feldera_types/config.rs
//! Controller configuration.
//!
//! This module defines the controller configuration structure. The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs. We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::preprocess::PreprocessorConfig;
use crate::program_schema::ProgramSchema;
use crate::secret_resolver::default_secrets_directory;
use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::{HttpInputConfig, HttpOutputConfig};
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nats::NatsInputConfig;
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{
    PostgresCdcReaderConfig, PostgresReaderConfig, PostgresWriterConfig,
};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use feldera_ir::{MirNode, MirNodeId};
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::ToSchema;
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};

pub mod dev_tweaks;
pub use dev_tweaks::DevTweaks;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;

pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;

/// Program information included in the pipeline configuration.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// The MIR of the program.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Program schema.
    pub program_schema: ProgramSchema,
}

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
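///
/// A minimal sketch of the JSON form (values illustrative): fields of the
/// flattened [`RuntimeConfig`] appear at the top level next to the endpoint
/// maps.
///
/// ```ignore
/// let config: PipelineConfig =
///     serde_json::from_str(r#"{ "workers": 4, "inputs": {}, "outputs": {} }"#).unwrap();
/// assert_eq!(config.global.workers, 4);
/// ```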
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Configuration for multihost pipelines.
    ///
    /// The presence of this field indicates that the pipeline is running in
    /// multihost mode. In the pod with ordinal 0, this triggers starting the
    /// coordinator process. In all pods, this tells the pipeline process to
    /// await a connection from the coordinator instead of initializing the
    /// pipeline immediately.
    pub multihost: Option<MultihostConfig>,

    /// Unique system-generated name of the pipeline (format: `pipeline-<uuid>`).
    /// It is unique across all tenants and cannot be changed.
    ///
    /// The `<uuid>` is also used in the naming of various resources that back the pipeline,
    /// and as such this name is useful to find/identify corresponding resources.
    pub name: Option<String>,

    /// Name given by the tenant to the pipeline. It is only unique within the same tenant, and can
    /// be changed by the tenant when the pipeline is stopped.
    ///
    /// Given a specific tenant, it can be used to find/identify a specific pipeline of theirs.
    pub given_name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`]. If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration.
    #[serde(default)]
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }

    /// Returns `self.secrets_dir`, or the default secrets directory if it
    /// isn't set.
    pub fn secrets_dir(&self) -> &Path {
        match &self.secrets_dir {
            Some(dir) => Path::new(dir.as_str()),
            None => default_secrets_directory(),
        }
    }

    /// Abbreviated config that can be printed in the log on pipeline startup.
    pub fn display_summary(&self) -> String {
        // TODO: we may want to further abbreviate connector config.
        let summary = serde_json::json!({
            "name": self.name,
            "given_name": self.given_name,
            "global": self.global,
            "storage_config": self.storage_config,
            "secrets_dir": self.secrets_dir,
            "inputs": self.inputs,
            "outputs": self.outputs
        });

        serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
    }
}

/// A subset of fields in `PipelineConfig` that are generated by the compiler.
/// These fields are shipped to the pipeline by the compilation server along with
/// the program binary.
// Note: An alternative would be to embed these fields in the program binary itself
// as static strings. This would work well for program IR, but it would require recompiling
// the program anytime a connector config changes, whereas today connector changes
// do not require recompilation.
#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone)]
pub struct PipelineConfigProgramInfo {
    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}

/// Configuration for a multihost Feldera pipeline.
///
/// This configuration is primarily for the coordinator.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct MultihostConfig {
    /// Number of hosts to launch.
    ///
    /// For the configuration to be truly multihost, this should be at least 2.
    /// A value of 1 still runs the multihost coordinator, but it only
    /// coordinates a single host.
    pub hosts: usize,
}

impl Default for MultihostConfig {
    fn default() -> Self {
        Self { hosts: 1 }
    }
}

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold. `usize::MAX` would effectively disable
    /// storage for such batches. The default is 10,485,760 (10 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset. A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold. `usize::MAX`, the default,
    /// effectively disables storage for such batches. If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost, but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
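///
/// A minimal sketch of the adjacently tagged JSON form (values illustrative):
/// `name` selects the variant and `config` carries its payload.
///
/// ```ignore
/// let backend: StorageBackendConfig = serde_json::from_str(
///     r#"{ "name": "object", "config": { "url": "s3://my-bucket/state" } }"#,
/// )
/// .unwrap();
/// ```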
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::File(_) => write!(f, "file"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    Latest,
    Uuid(uuid::Uuid),
}

impl ToSchema<'_> for StartFromCheckpoint {
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}

impl<'de> Deserialize<'de> for StartFromCheckpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}

impl Serialize for StartFromCheckpoint {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
        }
    }
}
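
// A sketch of how `StartFromCheckpoint` round-trips through serde under the
// rules implemented above: "latest" maps to `Latest` and any other string
// must parse as a UUID. The test values below are illustrative.
#[cfg(test)]
mod start_from_checkpoint_tests {
    use super::StartFromCheckpoint;

    #[test]
    fn parses_latest_and_uuid() {
        let latest: StartFromCheckpoint = serde_json::from_str(r#""latest""#).unwrap();
        assert_eq!(latest, StartFromCheckpoint::Latest);

        let id: StartFromCheckpoint =
            serde_json::from_str(r#""67e55044-10b1-426f-9247-bb680e5fe0c8""#).unwrap();
        assert!(matches!(id, StartFromCheckpoint::Uuid(_)));

        // Strings that are neither "latest" nor a valid UUID are rejected.
        assert!(serde_json::from_str::<StartFromCheckpoint>(r#""not-a-uuid""#).is_err());
    }
}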

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for Minio or the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,
    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If `fail_if_no_checkpoint` is `true`, the pipeline will fail to
    /// initialize when the checkpoint cannot be fetched.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip the post-copy checksum verification and only check the
    /// file sizes. This can significantly improve throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will activate automatically and no newer
    /// checkpoints will be fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, the pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If a checkpoint UUID, the pipeline fetches this checkpoint once and
    ///   waits in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between each attempt to fetch the latest
    /// checkpoint from object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// The interval (in seconds) between each push of checkpoints to object store.
    ///
    /// Default: disabled (no periodic push).
    #[serde(default)]
    pub push_interval: Option<u64>,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3 specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,

    /// A read-only bucket used as a fallback checkpoint source.
    ///
    /// When the pipeline has no local checkpoint and `bucket` contains no
    /// checkpoint either, it will attempt to fetch the checkpoint from this
    /// location instead. All connection settings (`endpoint`, `region`,
    /// `provider`, `access_key`, `secret_key`) are shared with `bucket`.
    ///
    /// The pipeline **never writes** to `read_bucket`.
    ///
    /// Must point to a different location than `bucket`.
    #[serde(default)]
    pub read_bucket: Option<String>,
}

fn default_pull_interval() -> u64 {
    10
}

fn default_retention_min_count() -> u32 {
    10
}

fn default_retention_min_age() -> u32 {
    30
}

impl SyncConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.standby && self.start_from_checkpoint.is_none() {
            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
Standby mode requires `start_from_checkpoint` to be set.
Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
        }

        if let Some(ref rb) = self.read_bucket
            && rb == &self.bucket
        {
            return Err(
                "invalid sync config: `read_bucket` and `bucket` must point to different locations"
                    .to_owned(),
            );
        }

        Ok(())
    }
}
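
// A sketch exercising the `validate` rules implemented above: `standby`
// requires `start_from_checkpoint`, and `read_bucket` must differ from
// `bucket`. The module, test names, and values are illustrative additions.
#[cfg(test)]
mod sync_config_tests {
    use super::SyncConfig;

    #[test]
    fn standby_requires_start_from_checkpoint() {
        let config = SyncConfig {
            bucket: "my-bucket".to_string(),
            standby: true,
            ..SyncConfig::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn read_bucket_must_differ_from_bucket() {
        let config = SyncConfig {
            bucket: "my-bucket".to_string(),
            read_bucket: Some("my-bucket".to_string()),
            ..SyncConfig::default()
        };
        assert!(config.validate().is_err());
    }
}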

/// Configuration for supplying a custom pipeline StatefulSet template via a Kubernetes ConfigMap.
///
/// Operators can provide a custom StatefulSet YAML that the Kubernetes runner will use when
/// creating pipeline StatefulSets for a pipeline. The custom template must be stored as the
/// value of a key in a ConfigMap in the same namespace as the pipeline; set `name` to the
/// ConfigMap name and `key` to the entry that contains the template.
///
/// Recommendations and requirements:
/// - **Start from the default template and modify it as needed.** The default template is stored
///   in a ConfigMap named `<release-name>-pipeline-template` (key `pipelineTemplate`) in the
///   release namespace and should be used as a reference.
/// - The template must contain a valid Kubernetes `StatefulSet` manifest in YAML form. The
///   runner substitutes variables in the template before parsing; therefore the final YAML
///   must be syntactically valid.
/// - The runner performs simple string substitution for the following placeholders. Please
///   ensure these placeholders are placed at appropriate locations for their semantics:
///   - `{id}`: pipeline Kubernetes name (used for object names and labels)
///   - `{namespace}`: Kubernetes namespace where the pipeline runs
///   - `{pipeline_executor_image}`: container image used to run the pipeline executor
///   - `{binary_ref}`: program binary reference passed as an argument
///   - `{program_info_ref}`: program info reference passed as an argument
///   - `{pipeline_storage_path}`: mount path for persistent pipeline storage
///   - `{storage_class_name}`: storage class name to use for PVCs (if applicable)
///   - `{deployment_id}`: UUID identifying the deployment instance
///   - `{deployment_initial}`: initial desired runtime status (e.g., `provisioning`)
///   - `{bootstrap_policy}`: bootstrap policy value when applicable
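///
/// A sketch of the JSON form (the ConfigMap name is illustrative):
///
/// ```ignore
/// let template: PipelineTemplateConfig =
///     serde_json::from_str(r#"{ "name": "my-pipeline-template" }"#).unwrap();
/// assert_eq!(template.key, "pipelineTemplate"); // the default key
/// ```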
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct PipelineTemplateConfig {
    /// Name of the ConfigMap containing the pipeline template.
    pub name: String,
    /// Key in the ConfigMap containing the pipeline template.
    ///
    /// If not set, defaults to `pipelineTemplate`.
    #[schema(default = default_pipeline_template_key)]
    #[serde(default = "default_pipeline_template_key")]
    pub key: String,
}

fn default_pipeline_template_key() -> String {
    "pipelineTemplate".to_string()
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices. Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
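///
/// A minimal sketch of the JSON form (values illustrative): `"storage": true`
/// uses the legacy boolean form accepted by `deserialize_storage_options`,
/// and an empty `fault_tolerance` object enables fault tolerance with the
/// default model.
///
/// ```ignore
/// let runtime: RuntimeConfig =
///     serde_json::from_str(r#"{ "workers": 4, "storage": true, "fault_tolerance": {} }"#)
///         .unwrap();
/// assert_eq!(runtime.workers, 4);
/// ```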
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// The maximum amount of memory, in megabytes, that the pipeline is allowed to use
    /// on each host.
    ///
    /// Setting this property activates memory pressure monitoring and backpressure
    /// mechanisms. The pipeline will track the amount of remaining memory and
    /// report the memory pressure level via the `memory_pressure` metric.
    ///
    /// As the memory pressure increases, the system will apply increasing backpressure
    /// to push state cached in memory to storage, preventing the pipeline from running
    /// out of memory at the cost of some performance degradation.
    ///
    /// It is strongly recommended to set this property to prevent the pipeline from
    /// running out of memory. The setting should not exceed the memory limit of the pipeline
    /// instance.
    ///
    /// When `max_rss_mb` is not specified but `resources.memory_mb_max` is set, the
    /// latter is used as the effective memory cap for the pipeline.
    ///
    /// See [documentation on the pipeline's memory usage](https://docs.feldera.com/operations/memory)
    /// for more details.
    pub max_rss_mb: Option<u64>,

    /// Number of DBSP hosts.
    ///
    /// The worker threads are evenly divided among the hosts. For single-host
    /// deployments, this should be 1 (the default).
    ///
    /// Multihost pipelines are an enterprise-only preview feature.
    pub hosts: usize,

    /// Storage configuration.
    ///
    /// - If this is `None`, the pipeline's state is kept in in-memory data
    ///   structures. This is useful if the pipeline's state will fit in
    ///   memory and if the pipeline is ephemeral and does not need to be
    ///   recovered after a restart. The pipeline will most likely run faster
    ///   since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage. This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function. The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs. If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most each `clock_resolution_usecs` microseconds. If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads. Specify at least twice as many CPU numbers as
    /// workers. CPUs are generally numbered starting from 0. The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_json::Value>,

    /// Deprecated: setting this to true or false no longer has any effect.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the HTTP server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case the
    /// pipeline HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async I/O tasks.
    ///
    /// This affects some networking and file I/O operations,
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case
    /// ingress, egress, or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Environment variables for the pipeline process.
    ///
    /// These are key-value pairs injected into the pipeline process environment.
    /// Some variable names are reserved by the platform and cannot be overridden
    /// (for example `RUST_LOG`, and variables in the `FELDERA_`,
    /// `KUBERNETES_`, and `TOKIO_` namespaces).
    #[serde(default)]
    pub env: BTreeMap<String, String>,

    /// Optional settings for tweaking Feldera internals.
    pub dev_tweaks: DevTweaks,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process. Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,

    /// ConfigMap containing a custom pipeline template (Enterprise only).
    ///
    /// This feature is only available in Feldera Enterprise. If set, the Kubernetes runner
    /// will read the template from the specified ConfigMap and use it instead of the default
    /// StatefulSet template for the configured pipeline.
    ///
    /// See the [`PipelineTemplateConfig`] documentation for details.
    pub pipeline_template_configmap: Option<PipelineTemplateConfig>,
}

/// Accepts "true" and "false" and converts them to the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}
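
// A sketch of the accepted input forms for `deserialize_storage_options`,
// using an illustrative wrapper struct: booleans and `null` map to the new
// format, and a map deserializes as `StorageOptions` directly.
#[cfg(test)]
mod storage_options_tests {
    use super::{StorageOptions, deserialize_storage_options};
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Wrapper {
        #[serde(default, deserialize_with = "deserialize_storage_options")]
        storage: Option<StorageOptions>,
    }

    #[test]
    fn accepts_bool_null_and_struct() {
        // Legacy `true` enables storage with default options.
        let w: Wrapper = serde_json::from_str(r#"{"storage": true}"#).unwrap();
        assert_eq!(w.storage, Some(StorageOptions::default()));

        // Legacy `false` and `null` both disable storage.
        let w: Wrapper = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert_eq!(w.storage, None);
        let w: Wrapper = serde_json::from_str(r#"{"storage": null}"#).unwrap();
        assert_eq!(w.storage, None);

        // The structured form deserializes `StorageOptions` directly.
        let w: Wrapper = serde_json::from_str(r#"{"storage": {"compression": "snappy"}}"#).unwrap();
        assert!(w.storage.is_some());
    }
}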

/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
/// tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            max_rss_mb: None,
            hosts: 1,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            tracing: {
                // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
                // to keep it on by default.
                false
            },
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: { Some(DEFAULT_CLOCK_RESOLUTION_USECS) },
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            io_workers: None,
            http_workers: None,
            env: BTreeMap::default(),
            dev_tweaks: DevTweaks::default(),
            logging: None,
            pipeline_template_configmap: None,
        }
    }
}

/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`. This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds. Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}

fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}

pub fn default_checkpoint_interval_secs() -> Option<u64> {
    Some(60)
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}

#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}

impl FtConfig {
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Returns the checkpoint interval if fault tolerance is enabled, and
    /// `None` otherwise.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        if self.is_enabled() {
            self.checkpoint_interval_secs
                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
        } else {
            None
        }
    }
}
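
// A sketch of the clamping behavior documented on `checkpoint_interval_secs`:
// values outside [1, 3600] are forced into range, and the interval is `None`
// whenever fault tolerance is disabled. The test values are illustrative.
#[cfg(test)]
mod checkpoint_interval_tests {
    use super::{FtConfig, FtModel};
    use std::time::Duration;

    #[test]
    fn interval_is_clamped_and_gated_on_ft() {
        let config = FtConfig {
            model: Some(FtModel::default()),
            checkpoint_interval_secs: Some(86_400),
        };
        assert_eq!(
            config.checkpoint_interval(),
            Some(Duration::from_secs(3600))
        );

        // Fault tolerance disabled: no checkpoint interval.
        assert_eq!(FtConfig::default().checkpoint_interval(), None);
    }
}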

/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation. I don't see a way to write
/// them automatically.
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if &value.to_ascii_lowercase() == "none" {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}

/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once. Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once. Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

impl InputEndpointConfig {
    pub fn new(stream: impl Into<Cow<'static, str>>, connector_config: ConnectorConfig) -> Self {
        Self {
            stream: stream.into(),
            connector_config,
        }
    }
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}
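
// A sketch of the forms accepted by `deserialize_start_after`, using an
// illustrative wrapper struct: a single string, an array of strings, or null.
#[cfg(test)]
mod start_after_tests {
    use super::deserialize_start_after;
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Wrapper {
        #[serde(default, deserialize_with = "deserialize_start_after")]
        start_after: Option<Vec<String>>,
    }

    #[test]
    fn accepts_string_array_or_null() {
        let w: Wrapper = serde_json::from_str(r#"{"start_after": "seed"}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["seed".to_string()]));

        let w: Wrapper = serde_json::from_str(r#"{"start_after": ["a", "b"]}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["a".to_string(), "b".to_string()]));

        let w: Wrapper = serde_json::from_str(r#"{"start_after": null}"#).unwrap();
        assert_eq!(w.start_after, None);

        // Anything else (e.g., a number) is rejected.
        assert!(serde_json::from_str::<Wrapper>(r#"{"start_after": 5}"#).is_err());
    }
}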

/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub preprocessor: Option<Vec<PreprocessorConfig>>,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only. It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement. An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum number of records from this connector to process in a single batch.
    ///
    /// When set, this caps how many records are taken from the connector’s input
    /// buffer and pushed through the circuit at once.
    ///
    /// This is typically configured lower than `max_queued_records` to allow the
    /// connector time to restart and refill its buffer while a batch is being
    /// processed.
    ///
    /// Not all input adapters honor this limit.
    ///
    /// If this is not set, the batch size is derived from `max_worker_batch_size`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_batch_size: Option<u64>,

    /// Maximum number of records processed per batch, per worker thread.
    ///
    /// When `max_batch_size` is not set, this setting is used to cap
    /// the number of records that can be taken from the connector’s input
    /// buffer and pushed through the circuit at once. The effective batch size is computed as:
    /// `max_worker_batch_size × workers`.
    /// This provides an alternative to `max_batch_size` that automatically adjusts the
    /// batch size as the number of worker threads changes, to maintain a constant
    /// amount of work per worker per batch.
    ///
    /// Defaults to 10,000 records per worker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_worker_batch_size: Option<u64>,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport
    /// endpoint nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is paused, during which more data may be queued.
1481 ///
1482 /// The default is 1 million.
1483 #[serde(default = "default_max_queued_records")]
1484 pub max_queued_records: u64,
1485
1486 /// Create connector in paused state.
1487 ///
1488 /// The default is `false`.
1489 #[serde(default)]
1490 pub paused: bool,
1491
1492 /// Arbitrary user-defined text labels associated with the connector.
1493 ///
1494 /// These labels can be used in conjunction with the `start_after` property
1495 /// to control the start order of connectors.
1496 #[serde(default)]
1497 pub labels: Vec<String>,
1498
1499 /// Start the connector after all connectors with specified labels.
1500 ///
1501 /// This property is used to control the start order of connectors.
1502 /// The connector will not start until all connectors with the specified
1503 /// labels have finished processing all inputs.
1504 #[serde(deserialize_with = "deserialize_start_after")]
1505 #[serde(default)]
1506 pub start_after: Option<Vec<String>>,
1507}
1508
1509impl ConnectorConfig {
1510 pub fn new(transport: TransportConfig, format: Option<FormatConfig>) -> Self {
1511 Self {
1512 transport,
1513 preprocessor: None,
1514 format,
1515 index: None,
1516 output_buffer_config: Default::default(),
1517 max_batch_size: None,
1518 max_worker_batch_size: None,
1519 max_queued_records: default_max_queued_records(),
1520 paused: false,
1521 labels: Vec::new(),
1522 start_after: None,
1523 }
1524 }
1525
1526 pub fn with_max_batch_size(mut self, max_batch_size: Option<u64>) -> Self {
1527 self.max_batch_size = max_batch_size;
1528 self
1529 }
1530
1531 pub fn with_max_queued_records(mut self, max_queued_records: u64) -> Self {
1532 self.max_queued_records = max_queued_records;
1533 self
1534 }
1535
1536 /// Compare two configs modulo the `paused` field.
1537 ///
1538 /// Used to compare checkpointed and current connector configs.
1539 pub fn equal_modulo_paused(&self, other: &Self) -> bool {
1540 let mut a = self.clone();
1541 let mut b = other.clone();
1542 a.paused = false;
1543 b.paused = false;
1544 a == b
1545 }
1546}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the
    /// pipeline pushes changes to the output transport from the rate of input
    /// changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly
    /// to the output transport. Some destinations may prefer to receive updates
    /// in fewer, bigger batches. For instance, when writing Parquet files,
    /// producing one bigger file every few minutes is usually better than
    /// creating a small file every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output
    /// buffering by setting the `enable_output_buffer` flag to `true`. When
    /// buffering is enabled, output updates produced by the pipeline are
    /// consolidated in an internal buffer and are pushed to the output
    /// transport when one of the following conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than
    ///   `max_output_buffer_time_millis` milliseconds;
    /// * the buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds that data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied. When this option is set,
    /// data is kept in the buffer for at most
    /// `max_output_buffer_time_millis` milliseconds before being flushed.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximum size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline, since updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, at least one of the 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}
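
// A minimal, runnable sketch of the validation rule above: enabling the
// buffer without bounding it by size or time is rejected; bounding it by
// either (here, an illustrative 10-second flush interval) is accepted.
#[cfg(test)]
mod output_buffer_config_sketch {
    use super::OutputBufferConfig;

    #[test]
    fn buffer_requires_a_flush_condition() {
        let mut config = OutputBufferConfig::default();
        config.enable_output_buffer = true;
        // Both bounds are still at their defaults, so validation fails.
        assert!(config.validate().is_err());

        // Flush at least every 10 seconds.
        config.max_output_buffer_time_millis = 10_000;
        assert!(config.validate().is_ok());
    }
}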

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

impl OutputEndpointConfig {
    pub fn new(stream: impl Into<Cow<'static, str>>, connector_config: ConnectorConfig) -> Self {
        Self {
            stream: stream.into(),
            connector_config,
        }
    }
}
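
// A sketch of the effect of `#[serde(flatten)]` above: the serialized
// endpoint carries `stream` and the `ConnectorConfig` fields at the same
// nesting level, e.g. `{"stream": "my_view", "transport": {...},
// "max_queued_records": 1000000, ...}`. The stream name "my_view" is
// illustrative only.
#[cfg(test)]
#[allow(dead_code)]
fn output_endpoint_shape_sketch(connector_config: ConnectorConfig) -> serde_json::Value {
    serde_json::to_value(OutputEndpointConfig::new("my_view", connector_config))
        .expect("endpoint config serializes to JSON")
}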

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    NatsInput(NatsInputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Boxed to prevent Rust from complaining about the large size difference
    // between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresCdcInput(PostgresCdcReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through the API.
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through the API.
    HttpOutput(HttpOutputConfig),
    /// Ad hoc input: cannot be instantiated through the API.
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::NatsInput(_) => "nats_input".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresCdcInput(_) => "postgres_cdc_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput(_) => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
            TransportConfig::ClockInput(_) => "clock".to_string(),
        }
    }

    /// Returns true if the connector is transient, i.e., is created and destroyed
    /// at runtime on demand, rather than being configured as part of the pipeline.
    pub fn is_transient(&self) -> bool {
        matches!(
            self,
            TransportConfig::AdHocInput(_)
                | TransportConfig::HttpInput(_)
                | TransportConfig::HttpOutput(_)
                | TransportConfig::ClockInput(_)
        )
    }

    pub fn is_http_input(&self) -> bool {
        matches!(self, TransportConfig::HttpInput(_))
    }
}
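
// A sketch relating `name()` to the serialized form: with
// `#[serde(tag = "name", content = "config")]`, every transport serializes
// as `{"name": "<tag>", "config": {...}}`. `name()` matches the serde tag
// for most variants, though not all (e.g., `ClockInput` serializes with the
// tag "clock_input" while `name()` returns "clock").
#[cfg(test)]
#[allow(dead_code)]
fn transport_tag_sketch(transport: &TransportConfig) -> String {
    let value = serde_json::to_value(transport).expect("transport serializes to JSON");
    // The serde tag lives under the "name" key.
    value["name"].as_str().unwrap_or_default().to_string()
}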

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}
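
// A minimal sketch of constructing a `FormatConfig` by hand; the `config`
// payload is format-specific, and the `"array"` knob shown here is
// illustrative only.
#[cfg(test)]
mod format_config_sketch {
    use super::FormatConfig;
    use std::borrow::Cow;

    #[test]
    fn build_format_config() {
        let format = FormatConfig {
            name: Cow::from("json"),
            config: serde_json::json!({ "array": false }),
        };
        assert_eq!(format.name, "json");
        assert!(format.config.is_object());
    }
}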

#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,

    /// Kubernetes service account name to use for an instance of this pipeline.
    /// The account determines permissions and access controls.
    pub service_account_name: Option<String>,

    /// Kubernetes namespace to use for an instance of this pipeline.
    /// The namespace determines the scope of names for resources created
    /// for the pipeline.
    /// If not set, the pipeline is deployed in the same namespace
    /// as the control plane.
    pub namespace: Option<String>,
}
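
// A minimal, runnable sketch of the effect of `#[serde(default)]` on
// `ResourceConfig`: any subset of fields may be given, and the rest fall
// back to `None`. The values below, including the "gp3" storage class, are
// illustrative only.
#[cfg(test)]
mod resource_config_sketch {
    use super::ResourceConfig;

    #[test]
    fn partial_resource_config_deserializes() {
        let config: ResourceConfig =
            serde_json::from_str(r#"{"memory_mb_max": 8192, "storage_class": "gp3"}"#)
                .expect("partial resource config deserializes");
        assert_eq!(config.memory_mb_max, Some(8192));
        assert_eq!(config.storage_class.as_deref(), Some("gp3"));
        assert_eq!(config.cpu_cores_min, None);
    }
}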