Skip to main content

feldera_types/
config.rs

1//! Controller configuration.
2//!
3//! This module defines the controller configuration structure.  The leaves of
4//! this structure are individual transport-specific and data-format-specific
5//! endpoint configs.  We represent these configs as opaque JSON values, so
6//! that the entire configuration tree can be deserialized from a JSON file.
7
8use crate::preprocess::PreprocessorConfig;
9use crate::program_schema::ProgramSchema;
10use crate::secret_resolver::default_secrets_directory;
11use crate::transport::adhoc::AdHocInputConfig;
12use crate::transport::clock::ClockConfig;
13use crate::transport::datagen::DatagenInputConfig;
14use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
15use crate::transport::file::{FileInputConfig, FileOutputConfig};
16use crate::transport::http::HttpInputConfig;
17use crate::transport::iceberg::IcebergReaderConfig;
18use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
19use crate::transport::nats::NatsInputConfig;
20use crate::transport::nexmark::NexmarkInputConfig;
21use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
22use crate::transport::pubsub::PubSubInputConfig;
23use crate::transport::redis::RedisOutputConfig;
24use crate::transport::s3::S3InputConfig;
25use crate::transport::url::UrlInputConfig;
26use core::fmt;
27use feldera_ir::{MirNode, MirNodeId};
28use serde::de::{self, MapAccess, Visitor};
29use serde::{Deserialize, Deserializer, Serialize};
30use serde_json::Value as JsonValue;
31use std::collections::HashMap;
32use std::fmt::Display;
33use std::path::Path;
34use std::str::FromStr;
35use std::time::Duration;
36use std::{borrow::Cow, cmp::max, collections::BTreeMap};
37use utoipa::ToSchema;
38use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
39
/// Default value of `RuntimeConfig::max_parallel_connector_init`
/// (see [`PipelineConfig::max_parallel_connector_init`]).
const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum number of records in a worker batch.
// NOTE(review): consumers of this constant are outside this file; confirm exact semantics there.
pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;

/// Default clock resolution in microseconds (1,000,000 µs = 1 second).
pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;
50
/// Program information included in the pipeline configuration.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// The MIR of the program.
    ///
    /// Maps each MIR node id to its node definition.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Program schema.
    pub program_schema: ProgramSchema,
}
59
/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Configuration for multihost pipelines.
    ///
    /// The presence of this field indicates that the pipeline is running in
    /// multihost mode.  In the pod with ordinal 0, this triggers starting the
    /// coordinator process.  In all pods, this tells the pipeline process to
    /// await a connection from the coordinator instead of initializing the
    /// pipeline immediately.
    pub multihost: Option<MultihostConfig>,

    /// Unique system-generated name of the pipeline (format: `pipeline-<uuid>`).
    /// It is unique across all tenants and cannot be changed.
    ///
    /// The `<uuid>` is also used in the naming of various resources that back the pipeline,
    /// and as such this name is useful to find/identify corresponding resources.
    pub name: Option<String>,

    /// Name given by the tenant to the pipeline. It is only unique within the same tenant, and can
    /// be changed by the tenant when the pipeline is stopped.
    ///
    /// Given a specific tenant, it can be used to find/identify a specific pipeline of theirs.
    pub given_name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration, keyed by endpoint name.
    #[serde(default)]
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration, keyed by endpoint name.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}
119
120impl PipelineConfig {
121    pub fn max_parallel_connector_init(&self) -> u64 {
122        max(
123            self.global
124                .max_parallel_connector_init
125                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
126            1,
127        )
128    }
129
130    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
131        let (storage_config, storage_options) = storage.unzip();
132        Self {
133            global: RuntimeConfig {
134                storage: storage_options,
135                ..self.global
136            },
137            storage_config,
138            ..self
139        }
140    }
141
142    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
143        let storage_options = self.global.storage.as_ref();
144        let storage_config = self.storage_config.as_ref();
145        storage_config.zip(storage_options)
146    }
147
148    /// Returns `self.secrets_dir`, or the default secrets directory if it isn't
149    /// set.
150    pub fn secrets_dir(&self) -> &Path {
151        match &self.secrets_dir {
152            Some(dir) => Path::new(dir.as_str()),
153            None => default_secrets_directory(),
154        }
155    }
156
157    /// Abbreviated config that can be printed in the log on pipeline startup.
158    pub fn display_summary(&self) -> String {
159        // TODO: we may want to further abbreviate connector config.
160        let summary = serde_json::json!({
161            "name": self.name,
162            "given_name": self.given_name,
163            "global": self.global,
164            "storage_config": self.storage_config,
165            "secrets_dir": self.secrets_dir,
166            "inputs": self.inputs,
167            "outputs": self.outputs
168        });
169
170        serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
171    }
172}
173
174/// A subset of fields in `PipelineConfig` that are generated by the compiler.
175/// These fields are shipped to the pipeline by the compilation server along with
176/// the program binary.
177// Note: An alternative would be to embed these fields in the program binary itself
178// as static strings. This would work well for program IR, but it would require recompiling
179// the program anytime a connector config changes, whereas today connector changes
180// do not require recompilation.
181#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone)]
182pub struct PipelineConfigProgramInfo {
183    /// Input endpoint configuration.
184    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,
185
186    /// Output endpoint configuration.
187    #[serde(default)]
188    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
189
190    /// Program information.
191    #[serde(default)]
192    pub program_ir: Option<ProgramIr>,
193}
194
/// Configuration for a multihost Feldera pipeline.
///
/// This configuration is primarily for the coordinator.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct MultihostConfig {
    /// Number of hosts to launch.
    ///
    /// For the configuration to be truly multihost, this should be at least 2.
    /// A value of 1 still runs the multihost coordinator but it only
    /// coordinates a single host.
    ///
    /// Defaults to 1 (see the `Default` impl).
    pub hosts: usize,
}
207
208impl Default for MultihostConfig {
209    fn default() -> Self {
210        Self { hosts: 1 }
211    }
212}
213
/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    ///
    /// Accessible as a [`Path`] via [`StorageConfig::path`].
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}
231
232impl StorageConfig {
233    pub fn path(&self) -> &Path {
234        Path::new(&self.path)
235    }
236}
237
/// How to cache access to storage within a Feldera pipeline.
///
/// Serialized in snake case (`page_cache`, `feldera_cache`).
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}
255
256impl StorageCacheConfig {
257    #[cfg(unix)]
258    pub fn to_custom_open_flags(&self) -> i32 {
259        match self {
260            StorageCacheConfig::PageCache => (),
261            StorageCacheConfig::FelderaCache => {
262                #[cfg(target_os = "linux")]
263                return libc::O_DIRECT;
264            }
265        }
266        0
267    }
268}
269
/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 10,048,576 (10 MiB).
    // NOTE(review): 10,048,576 bytes is not 10 MiB (10 MiB = 10,485,760).
    // Confirm the actual default against the code that consumes this option.
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}
314
/// Backend storage configuration.
///
/// Serialized in adjacently-tagged form, e.g.
/// `{"name": "file", "config": {...}}`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}
333
334impl Display for StorageBackendConfig {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        match self {
337            StorageBackendConfig::Default => write!(f, "default"),
338            StorageBackendConfig::File(_) => write!(f, "file"),
339            StorageBackendConfig::Object(_) => write!(f, "object"),
340        }
341    }
342}
343
/// Storage compression algorithm.
///
/// Serialized in snake case (`default`, `none`, `snappy`).
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}
361
/// Which checkpoint to fetch from the object store at startup.
///
/// Serialized as a plain string: either `"latest"` or a checkpoint UUID
/// (see the hand-written `Serialize`/`Deserialize` impls for this type).
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    /// Fetch the most recent available checkpoint.
    Latest,
    /// Fetch the checkpoint identified by this UUID.
    Uuid(uuid::Uuid),
}
367
368impl ToSchema<'_> for StartFromCheckpoint {
369    fn schema() -> (
370        &'static str,
371        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
372    ) {
373        (
374            "StartFromCheckpoint",
375            utoipa::openapi::RefOr::T(Schema::OneOf(
376                OneOfBuilder::new()
377                    .item(
378                        ObjectBuilder::new()
379                            .schema_type(SchemaType::String)
380                            .enum_values(Some(["latest"].into_iter()))
381                            .build(),
382                    )
383                    .item(
384                        ObjectBuilder::new()
385                            .schema_type(SchemaType::String)
386                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
387                                utoipa::openapi::KnownFormat::Uuid,
388                            )))
389                            .build(),
390                    )
391                    .nullable(true)
392                    .build(),
393            )),
394        )
395    }
396}
397
398impl<'de> Deserialize<'de> for StartFromCheckpoint {
399    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
400    where
401        D: Deserializer<'de>,
402    {
403        struct StartFromCheckpointVisitor;
404
405        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
406            type Value = StartFromCheckpoint;
407
408            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
409                formatter.write_str("a UUID string or the string \"latest\"")
410            }
411
412            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
413            where
414                E: de::Error,
415            {
416                if value == "latest" {
417                    Ok(StartFromCheckpoint::Latest)
418                } else {
419                    uuid::Uuid::parse_str(value)
420                        .map(StartFromCheckpoint::Uuid)
421                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
422                }
423            }
424        }
425
426        deserializer.deserialize_str(StartFromCheckpointVisitor)
427    }
428}
429
430impl Serialize for StartFromCheckpoint {
431    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
432    where
433        S: serde::Serializer,
434    {
435        match self {
436            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
437            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
438        }
439    }
440}
441
/// Configuration for synchronizing checkpoints to an S3-compatible object
/// store (implemented via `rclone`; see `flags`).
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for Minio or the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If `fail_if_no_checkpoint` is `true`, the pipeline will fail to initialize.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip post copy check of checksums, and only check the file sizes.
    /// This can significantly improve the throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will auto activate, no newer checkpoints will be
    /// fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If checkpoint UUID, pipeline fetches this checkpoint once and waits
    ///   in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between each attempt to fetch the latest
    /// checkpoint from object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// The interval (in seconds) between each push of checkpoints to object store.
    ///
    /// Default: disabled (no periodic push).
    #[serde(default)]
    pub push_interval: Option<u64>,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3 specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,

    /// A read-only bucket used as a fallback checkpoint source.
    ///
    /// When the pipeline has no local checkpoint and `bucket` contains no
    /// checkpoint either, it will attempt to fetch the checkpoint from this
    /// location instead.  All connection settings (`endpoint`, `region`,
    /// `provider`, `access_key`, `secret_key`) are shared with `bucket`.
    ///
    /// The pipeline **never writes** to `read_bucket`.
    ///
    /// Must point to a different location than `bucket`
    /// (enforced by [`SyncConfig::validate`]).
    #[serde(default)]
    pub read_bucket: Option<String>,
}
598
/// Default value of `SyncConfig::pull_interval`: 10 seconds.
// `const fn` for consistency with `default_max_queued_records` above.
const fn default_pull_interval() -> u64 {
    10
}

/// Default value of `SyncConfig::retention_min_count`: retain at least 10 checkpoints.
const fn default_retention_min_count() -> u32 {
    10
}

/// Default value of `SyncConfig::retention_min_age`: 30 days.
const fn default_retention_min_age() -> u32 {
    30
}
610
611impl SyncConfig {
612    pub fn validate(&self) -> Result<(), String> {
613        if self.standby && self.start_from_checkpoint.is_none() {
614            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
615Standby mode requires `start_from_checkpoint` to be set.
616Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
617        }
618
619        if let Some(ref rb) = self.read_bucket
620            && rb == &self.bucket
621        {
622            return Err(
623                "invalid sync config: `read_bucket` and `bucket` must point to different locations"
624                    .to_owned(),
625            );
626        }
627
628        Ok(())
629    }
630}
631
/// Configuration for supplying a custom pipeline StatefulSet template via a Kubernetes ConfigMap.
///
/// Operators can provide a custom StatefulSet YAML that the Kubernetes runner will use when
/// creating pipeline StatefulSets for a pipeline. The custom template must be stored as the
/// value of a key in a ConfigMap in the same namespace as the pipeline; set `name` to the
/// ConfigMap name and `key` to the entry that contains the template.
///
/// Recommendations and requirements:
/// - **Start from the default template and modify it as needed.** The default template is present
///   in the ConfigMap named `<release-name>-pipeline-template`, with key `pipelineTemplate`, in the
///   release namespace and should be used as a reference.
/// - The template must contain a valid Kubernetes `StatefulSet` manifest in YAML form. The
///   runner substitutes variables in the template before parsing; therefore the final YAML
///   must be syntactically valid.
/// - The runner performs simple string substitution for the following placeholders. Please ensure
///   these placeholders are placed at the appropriate locations for their semantics:
///   - `{id}`: pipeline Kubernetes name (used for object names and labels)
///   - `{namespace}`: Kubernetes namespace where the pipeline runs
///   - `{pipeline_executor_image}`: container image used to run the pipeline executor
///   - `{binary_ref}`: program binary reference passed as an argument
///   - `{program_info_ref}`: program info reference passed as an argument
///   - `{pipeline_storage_path}`: mount path for persistent pipeline storage
///   - `{storage_class_name}`: storage class name to use for PVCs (if applicable)
///   - `{deployment_id}`: UUID identifying the deployment instance
///   - `{deployment_initial}`: initial desired runtime status (e.g., `provisioning`)
///   - `{bootstrap_policy}`: bootstrap policy value when applicable
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct PipelineTemplateConfig {
    /// Name of the ConfigMap containing the pipeline template.
    pub name: String,
    /// Key in the ConfigMap containing the pipeline template.
    ///
    /// If not set, defaults to `pipelineTemplate`.
    #[schema(default = default_pipeline_template_key)]
    #[serde(default = "default_pipeline_template_key")]
    pub key: String,
}
669
/// Default value of `PipelineTemplateConfig::key`.
fn default_pipeline_template_key() -> String {
    String::from("pipelineTemplate")
}
673
/// Configuration for an object storage backend (S3, Google Cloud Storage,
/// or Azure Blob Storage), identified by URL plus provider-specific options.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}
741
/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}
762
/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
///
/// All fields are optional in the serialized form (`#[serde(default)]`);
/// missing fields take their values from [`RuntimeConfig::default`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// Number of DBSP hosts.
    ///
    /// The worker threads are evenly divided among the hosts.  For single-host
    /// deployments, this should be 1 (the default).
    ///
    /// Multihost pipelines are an enterprise-only preview feature.
    pub hosts: usize,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data-structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    ///
    /// Legacy configs may use `true`/`false` here; the custom deserializer
    /// converts those to the new structured form.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    ///
    /// Also accepts legacy string forms (`"initial_state"`,
    /// `"latest_checkpoint"`) via the custom deserializer.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input records has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most each `clock_resolution_usecs` microseconds.  If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_json::Value>,

    /// Deprecated: setting this true or false does not have an effect anymore.
    ///
    /// NOTE(review): [`RuntimeConfig::default`] still initializes this to
    /// `true`; kept only for serialization compatibility.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the http server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case the
    /// pipeline HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async IO tasks.
    ///
    /// This affects some networking and file I/O operations
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case
    /// ingress, egress or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Environment variables for the pipeline process.
    ///
    /// These are key-value pairs injected into the pipeline process environment.
    /// Some variable names are reserved by the platform and cannot be overridden
    /// (for example `RUST_LOG`, and variables in the `FELDERA_`,
    /// `KUBERNETES_`, and `TOKIO_` namespaces).
    #[serde(default)]
    pub env: BTreeMap<String, String>,

    /// Optional settings for tweaking Feldera internals.
    ///
    /// The available key-value pairs change from one version of Feldera to
    /// another, so users should not depend on particular settings being
    /// available, or on their behavior.
    pub dev_tweaks: BTreeMap<String, serde_json::Value>,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process.  Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,

    /// ConfigMap containing a custom pipeline template (Enterprise only).
    ///
    /// This feature is only available in Feldera Enterprise. If set, the Kubernetes runner
    /// will read the template from the specified ConfigMap and use it instead of the default
    /// StatefulSet template for the configured pipeline.
    ///
    /// check [`PipelineTemplateConfig`] documentation for details.
    pub pipeline_template_configmap: Option<PipelineTemplateConfig>,
}
932
933/// Accepts "true" and "false" and converts them to the new format.
934fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
935where
936    D: Deserializer<'de>,
937{
938    struct BoolOrStruct;
939
940    impl<'de> Visitor<'de> for BoolOrStruct {
941        type Value = Option<StorageOptions>;
942
943        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
944            formatter.write_str("boolean or StorageOptions")
945        }
946
947        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
948        where
949            E: de::Error,
950        {
951            match v {
952                false => Ok(None),
953                true => Ok(Some(StorageOptions::default())),
954            }
955        }
956
957        fn visit_unit<E>(self) -> Result<Self::Value, E>
958        where
959            E: de::Error,
960        {
961            Ok(None)
962        }
963
964        fn visit_none<E>(self) -> Result<Self::Value, E>
965        where
966            E: de::Error,
967        {
968            Ok(None)
969        }
970
971        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
972        where
973            M: MapAccess<'de>,
974        {
975            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
976        }
977    }
978
979    deserializer.deserialize_any(BoolOrStruct)
980}
981
982/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
983/// tolerance.
984///
985/// Accepts `null` as disabling fault tolerance.
986///
987/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
988/// expect.
989fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
990where
991    D: Deserializer<'de>,
992{
993    struct StringOrStruct;
994
995    impl<'de> Visitor<'de> for StringOrStruct {
996        type Value = FtConfig;
997
998        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
999            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
1000        }
1001
1002        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1003        where
1004            E: de::Error,
1005        {
1006            match v {
1007                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
1008                    model: Some(FtModel::default()),
1009                    ..FtConfig::default()
1010                }),
1011                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
1012            }
1013        }
1014
1015        fn visit_unit<E>(self) -> Result<Self::Value, E>
1016        where
1017            E: de::Error,
1018        {
1019            Ok(FtConfig::default())
1020        }
1021
1022        fn visit_none<E>(self) -> Result<Self::Value, E>
1023        where
1024            E: de::Error,
1025        {
1026            Ok(FtConfig::default())
1027        }
1028
1029        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
1030        where
1031            M: MapAccess<'de>,
1032        {
1033            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
1034        }
1035    }
1036
1037    deserializer.deserialize_any(StringOrStruct)
1038}
1039
1040impl Default for RuntimeConfig {
1041    fn default() -> Self {
1042        Self {
1043            workers: 8,
1044            hosts: 1,
1045            storage: Some(StorageOptions::default()),
1046            fault_tolerance: FtConfig::default(),
1047            cpu_profiler: true,
1048            tracing: {
1049                // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
1050                // to keep it on by default.
1051                false
1052            },
1053            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
1054            min_batch_size_records: 0,
1055            max_buffering_delay_usecs: 0,
1056            resources: ResourceConfig::default(),
1057            clock_resolution_usecs: { Some(DEFAULT_CLOCK_RESOLUTION_USECS) },
1058            pin_cpus: Vec::new(),
1059            provisioning_timeout_secs: None,
1060            max_parallel_connector_init: None,
1061            init_containers: None,
1062            checkpoint_during_suspend: true,
1063            io_workers: None,
1064            http_workers: None,
1065            env: BTreeMap::default(),
1066            dev_tweaks: BTreeMap::default(),
1067            logging: None,
1068            pipeline_template_configmap: None,
1069        }
1070    }
1071}
1072
/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`.  This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
///
/// Note the deliberate asymmetry: the serde field default (`default_model`)
/// differs from the Rust [Default] impl, which sets `model: None`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    ///
    /// Serialized via `none_as_string`: `None` appears as the string `"none"`.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}
1101
1102fn default_model() -> Option<FtModel> {
1103    Some(FtModel::default())
1104}
1105
/// Serde default for [`FtConfig::checkpoint_interval_secs`]: 60 seconds.
pub fn default_checkpoint_interval_secs() -> Option<u64> {
    const DEFAULT_SECS: u64 = 60;
    Some(DEFAULT_SECS)
}
1109
1110impl Default for FtConfig {
1111    fn default() -> Self {
1112        Self {
1113            model: None,
1114            checkpoint_interval_secs: default_checkpoint_interval_secs(),
1115        }
1116    }
1117}
1118
#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    /// Exercises the legacy-friendly `deserialize_fault_tolerance` adapter and
    /// the `none_as_string` encoding of `FtConfig::model`.
    #[test]
    fn ft_config() {
        // Mimics how `RuntimeConfig` wires the custom deserializer to its
        // `fault_tolerance` field.
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}
1181
1182impl FtConfig {
1183    pub fn is_enabled(&self) -> bool {
1184        self.model.is_some()
1185    }
1186
1187    /// Returns the checkpoint interval, if fault tolerance is enabled, and
1188    /// otherwise `None`.
1189    pub fn checkpoint_interval(&self) -> Option<Duration> {
1190        if self.is_enabled() {
1191            self.checkpoint_interval_secs
1192                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
1193        } else {
1194            None
1195        }
1196    }
1197}
1198
1199/// Serde implementation for de/serializing a string into `Option<T>` where
1200/// `"none"` indicates `None` and any other string indicates `Some`.
1201///
1202/// This could be extended to handle non-strings by adding more forwarding
1203/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
1204/// them automatically.
1205mod none_as_string {
1206    use std::marker::PhantomData;
1207
1208    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
1209    use serde::ser::{Serialize, Serializer};
1210
1211    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
1212    where
1213        S: Serializer,
1214        T: Serialize,
1215    {
1216        match value.as_ref() {
1217            Some(value) => value.serialize(serializer),
1218            None => "none".serialize(serializer),
1219        }
1220    }
1221
1222    struct NoneAsString<T>(PhantomData<fn() -> T>);
1223
1224    impl<'de, T> Visitor<'de> for NoneAsString<T>
1225    where
1226        T: Deserialize<'de>,
1227    {
1228        type Value = Option<T>;
1229
1230        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1231            formatter.write_str("string")
1232        }
1233
1234        fn visit_none<E>(self) -> Result<Self::Value, E>
1235        where
1236            E: serde::de::Error,
1237        {
1238            Ok(None)
1239        }
1240
1241        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
1242        where
1243            E: serde::de::Error,
1244        {
1245            if &value.to_ascii_lowercase() == "none" {
1246                Ok(None)
1247            } else {
1248                Ok(Some(T::deserialize(value.into_deserializer())?))
1249            }
1250        }
1251    }
1252
1253    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
1254    where
1255        D: Deserializer<'de>,
1256        T: Deserialize<'de>,
1257    {
1258        deserializer.deserialize_str(NoneAsString(PhantomData))
1259    }
1260}
1261
1262/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
1263/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
1264fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
1265    Schema::OneOf(
1266        OneOfBuilder::new()
1267            .item(RefOr::Ref(Ref::new(format!(
1268                "#/components/schemas/{}",
1269                T::schema().0
1270            ))))
1271            .item(
1272                ObjectBuilder::new()
1273                    .schema_type(SchemaType::String)
1274                    .enum_values(Some(vec!["none"])),
1275            )
1276            .default(Some(
1277                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
1278            ))
1279            .build(),
1280    )
1281}
1282
/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].  The derived
/// `PartialOrd`/`Ord` rely on the variant declaration order below, so
/// `AtLeastOnce < ExactlyOnce` — do not reorder the variants.
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}
1301
1302impl FtModel {
1303    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
1304        value.map_or("no", |model| model.as_str())
1305    }
1306
1307    pub fn as_str(&self) -> &'static str {
1308        match self {
1309            FtModel::AtLeastOnce => "at_least_once",
1310            FtModel::ExactlyOnce => "exactly_once",
1311        }
1312    }
1313}
1314
1315pub struct FtModelUnknown;
1316
1317impl FromStr for FtModel {
1318    type Err = FtModelUnknown;
1319
1320    fn from_str(s: &str) -> Result<Self, Self::Err> {
1321        match s.to_ascii_lowercase().as_str() {
1322            "exactly_once" => Ok(Self::ExactlyOnce),
1323            "at_least_once" => Ok(Self::AtLeastOnce),
1324            _ => Err(FtModelUnknown),
1325        }
1326    }
1327}
1328
/// Describes an input connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    ///
    /// Flattened: the [ConnectorConfig] fields appear at the top level of the
    /// serialized endpoint object, alongside `stream`.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
1340
1341/// Deserialize the `start_after` property of a connector configuration.
1342/// It requires a non-standard deserialization because we want to accept
1343/// either a string or an array of strings.
1344fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
1345where
1346    D: Deserializer<'de>,
1347{
1348    let value = Option::<JsonValue>::deserialize(deserializer)?;
1349    match value {
1350        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
1351        Some(JsonValue::Array(arr)) => {
1352            let vec = arr
1353                .into_iter()
1354                .map(|item| {
1355                    item.as_str()
1356                        .map(|s| s.to_string())
1357                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
1358                })
1359                .collect::<Result<Vec<String>, _>>()?;
1360            Ok(Some(vec))
1361        }
1362        Some(JsonValue::Null) | None => Ok(None),
1363        _ => Err(serde::de::Error::custom(
1364            "invalid 'start_after' property: expected a string, an array of strings, or null",
1365        )),
1366    }
1367}
1368
/// A data connector's configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Optional chain of preprocessors applied to the input stream.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub preprocessor: Option<Vec<PreprocessorConfig>>,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    ///
    /// Flattened: the [OutputBufferConfig] fields appear at the top level of
    /// the serialized connector object.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum number of records from this connector to process in a single batch.
    ///
    /// When set, this caps how many records are taken from the connector’s input
    /// buffer and pushed through the circuit at once.
    ///
    /// This is typically configured lower than `max_queued_records` to allow the
    /// connector time to restart and refill its buffer while a batch is being
    /// processed.
    ///
    /// Not all input adapters honor this limit.
    ///
    /// If this is not set, the batch size is derived from `max_worker_batch_size`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_batch_size: Option<u64>,

    /// Maximum number of records processed per batch, per worker thread.
    ///
    /// When `max_batch_size` is not set, this setting is used to cap
    /// the number of records that can be taken from the connector’s input
    /// buffer and pushed through the circuit at once.  The effective batch size is computed as:
    /// `max_worker_batch_size × workers`.
    ///
    /// This provides an alternative to `max_batch_size` that automatically adjusts batch
    /// size as the number of worker threads changes to maintain constant amount of
    /// work per worker per batch.
    ///
    /// Defaults to 10,000 records per worker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_worker_batch_size: Option<u64>,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, since the circuit is still busy processing
    /// previous inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the backpressure mechanism is triggered and the endpoint is paused, during
    /// which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    ///
    /// Accepts either a single string or an array of strings; see
    /// `deserialize_start_after`.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}
1475
1476impl ConnectorConfig {
1477    /// Compare two configs modulo the `paused` field.
1478    ///
1479    /// Used to compare checkpointed and current connector configs.
1480    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
1481        let mut a = self.clone();
1482        let mut b = other.clone();
1483        a.paused = false;
1484        b.paused = false;
1485        a == b
1486    }
1487}
1488
/// Output buffering configuration, flattened into [ConnectorConfig].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}
1541
1542impl Default for OutputBufferConfig {
1543    fn default() -> Self {
1544        Self {
1545            enable_output_buffer: false,
1546            max_output_buffer_size_records: usize::MAX,
1547            max_output_buffer_time_millis: usize::MAX,
1548        }
1549    }
1550}
1551
1552impl OutputBufferConfig {
1553    pub fn validate(&self) -> Result<(), String> {
1554        if self.enable_output_buffer
1555            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
1556            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
1557        {
1558            return Err(
1559                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
1560                    .to_string(),
1561            );
1562        }
1563
1564        Ok(())
1565    }
1566}
1567
/// Describes an output connector configuration
// Serialized form: because of `#[serde(flatten)]`, the fields of
// `ConnectorConfig` appear at the same JSON level as `stream`, rather than
// under a nested "connector_config" key.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
1579
/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
// Serialized form: adjacently tagged, e.g.
// `{"name": "kafka_input", "config": {...}}`, with variant names converted
// to snake_case for the "name" tag.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    NatsInput(NatsInputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Prevent rust from complaining about large size difference between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    // Carries no payload: HTTP output endpoints need no static configuration.
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}
1611
1612impl TransportConfig {
1613    pub fn name(&self) -> String {
1614        match self {
1615            TransportConfig::FileInput(_) => "file_input".to_string(),
1616            TransportConfig::FileOutput(_) => "file_output".to_string(),
1617            TransportConfig::NatsInput(_) => "nats_input".to_string(),
1618            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
1619            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
1620            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
1621            TransportConfig::UrlInput(_) => "url_input".to_string(),
1622            TransportConfig::S3Input(_) => "s3_input".to_string(),
1623            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
1624            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
1625            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
1626            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
1627            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
1628            TransportConfig::Datagen(_) => "datagen".to_string(),
1629            TransportConfig::Nexmark(_) => "nexmark".to_string(),
1630            TransportConfig::HttpInput(_) => "http_input".to_string(),
1631            TransportConfig::HttpOutput => "http_output".to_string(),
1632            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
1633            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
1634            TransportConfig::ClockInput(_) => "clock".to_string(),
1635        }
1636    }
1637
1638    /// Returns true if the connector is transient, i.e., is created and destroyed
1639    /// at runtime on demand, rather than being configured as part of the pipeline.
1640    pub fn is_transient(&self) -> bool {
1641        matches!(
1642            self,
1643            TransportConfig::AdHocInput(_)
1644                | TransportConfig::HttpInput(_)
1645                | TransportConfig::HttpOutput
1646                | TransportConfig::ClockInput(_)
1647        )
1648    }
1649}
1650
/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    // Kept as opaque JSON so the whole configuration tree can be deserialized
    // without this crate knowing every format's schema; defaults to
    // `JsonValue::Null` when the field is absent.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}
1663
// Resource reservations and deployment settings for a pipeline instance.
// All fields are optional; `#[serde(default)]` on the struct means any field
// may be omitted from the config file (falling back to `None`).
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline
    // Custom deserializer — presumably to accept values that plain serde f64
    // deserialization would reject (e.g. numeric strings); see
    // `crate::serde_via_value` to confirm.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,

    /// Kubernetes service account name to use for an instance of this pipeline.
    /// The account determines permissions and access controls.
    pub service_account_name: Option<String>,

    /// Kubernetes namespace to use for an instance of this pipeline.
    /// The namespace determines the scope of names for resources created
    /// for the pipeline.
    /// If not set, the pipeline will be deployed in the same namespace
    /// as the control-plane.
    pub namespace: Option<String>,
}