
feldera_types/config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::program_schema::ProgramSchema;
use crate::secret_resolver::default_secrets_directory;
use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nats::NatsInputConfig;
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use feldera_ir::{MirNode, MirNodeId};
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::ToSchema;
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;

pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;

/// Program information included in the pipeline configuration.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// The MIR of the program.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Program schema.
    pub program_schema: ProgramSchema,
}

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Configuration for multihost pipelines.
    ///
    /// The presence of this field indicates that the pipeline is running in
    /// multihost mode.  In the pod with ordinal 0, this triggers starting the
    /// coordinator process.  In all pods, this tells the pipeline process to
    /// await a connection from the coordinator instead of initializing the
    /// pipeline immediately.
    pub multihost: Option<MultihostConfig>,

    /// Unique system-generated name of the pipeline (format: `pipeline-<uuid>`).
    /// It is unique across all tenants and cannot be changed.
    ///
    /// The `<uuid>` is also used in the naming of various resources that back the pipeline,
    /// and as such this name is useful to find/identify corresponding resources.
    pub name: Option<String>,

    /// Name given by the tenant to the pipeline. It is only unique within the same tenant, and can
    /// be changed by the tenant when the pipeline is stopped.
    ///
    /// Given a specific tenant, it can be used to find/identify a specific pipeline of theirs.
    pub given_name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration.
    #[serde(default)]
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }

    /// Returns `self.secrets_dir`, or the default secrets directory if it isn't
    /// set.
    pub fn secrets_dir(&self) -> &Path {
        match &self.secrets_dir {
            Some(dir) => Path::new(dir.as_str()),
            None => default_secrets_directory(),
        }
    }

    /// Abbreviated config that can be printed in the log on pipeline startup.
    pub fn display_summary(&self) -> String {
        // TODO: we may want to further abbreviate connector config.
        let summary = serde_json::json!({
            "name": self.name,
            "given_name": self.given_name,
            "global": self.global,
            "storage_config": self.storage_config,
            "secrets_dir": self.secrets_dir,
            "inputs": self.inputs,
            "outputs": self.outputs
        });

        serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
    }
}
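
// A minimal sanity sketch (an illustrative addition, not part of the upstream
// test suite): every field of `PipelineConfig` is either optional or carries a
// serde default, so an empty JSON object should deserialize into a fully
// defaulted configuration.
#[cfg(test)]
mod pipeline_config_example {
    use super::PipelineConfig;

    #[test]
    fn empty_json_deserializes_to_defaults() {
        let config: PipelineConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(config.global.workers, 8);
        assert!(config.inputs.is_empty());
        assert!(config.outputs.is_empty());
        // `max_parallel_connector_init` falls back to the documented default of 10.
        assert_eq!(config.max_parallel_connector_init(), 10);
    }
}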

/// A subset of fields in `PipelineConfig` that are generated by the compiler.
/// These fields are shipped to the pipeline by the compilation server along with
/// the program binary.
// Note: An alternative would be to embed these fields in the program binary itself
// as static strings. This would work well for program IR, but it would require recompiling
// the program anytime a connector config changes, whereas today connector changes
// do not require recompilation.
#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone)]
pub struct PipelineConfigProgramInfo {
    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}

/// Configuration for a multihost Feldera pipeline.
///
/// This configuration is primarily for the coordinator.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct MultihostConfig {
    /// Number of hosts to launch.
    ///
    /// For the configuration to be truly multihost, this should be at least 2.
    /// A value of 1 still runs the multihost coordinator but it only
    /// coordinates a single host.
    pub hosts: usize,
}

impl Default for MultihostConfig {
    fn default() -> Self {
        Self { hosts: 1 }
    }
}

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 10,485,760 (10 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::File(_) => write!(f, "file"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}
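
// Illustrative sketch (not part of the upstream test suite): the serde
// attributes above declare an adjacently tagged layout, so a backend is
// written in JSON as `{"name": ..., "config": ...}`. The S3 URL is a
// placeholder.
#[cfg(test)]
mod storage_backend_example {
    use super::StorageBackendConfig;

    #[test]
    fn adjacently_tagged_layout() {
        let backend: StorageBackendConfig = serde_json::from_str(
            r#"{"name": "object", "config": {"url": "s3://my-bucket/state"}}"#,
        )
        .unwrap();
        assert_eq!(backend.to_string(), "object");

        // Unit variants need no `config` member.
        let backend: StorageBackendConfig =
            serde_json::from_str(r#"{"name": "default"}"#).unwrap();
        assert_eq!(backend, StorageBackendConfig::Default);
    }
}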

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    Latest,
    Uuid(uuid::Uuid),
}

impl ToSchema<'_> for StartFromCheckpoint {
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}

impl<'de> Deserialize<'de> for StartFromCheckpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}

impl Serialize for StartFromCheckpoint {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
        }
    }
}
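
// Illustrative round trip (not part of the upstream test suite) for the
// hand-written serde impls above: the value is always a plain string, either
// "latest" or a UUID. The UUID below is a placeholder.
#[cfg(test)]
mod start_from_checkpoint_example {
    use super::StartFromCheckpoint;

    #[test]
    fn string_round_trip() {
        let latest: StartFromCheckpoint = serde_json::from_str(r#""latest""#).unwrap();
        assert_eq!(latest, StartFromCheckpoint::Latest);
        assert_eq!(serde_json::to_string(&latest).unwrap(), r#""latest""#);

        let uuid: StartFromCheckpoint =
            serde_json::from_str(r#""67e55044-10b1-426f-9247-bb680e5fe0c8""#).unwrap();
        assert!(matches!(uuid, StartFromCheckpoint::Uuid(_)));
    }
}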

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for MinIO or to use the provider's default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,
    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If the fetch fails and `fail_if_no_checkpoint` is `true`, the pipeline
    /// will fail to initialize.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip the post-copy checksum verification and only check file sizes.
    /// This can significantly improve throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will auto-activate and no newer checkpoints will
    /// be fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If checkpoint UUID, pipeline fetches this checkpoint once and waits
    ///   in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between each attempt to fetch the latest
    /// checkpoint from object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// The interval (in seconds) between each push of checkpoints to object store.
    ///
    /// Default: disabled (no periodic push).
    #[serde(default)]
    pub push_interval: Option<u64>,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3 specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,
}

fn default_pull_interval() -> u64 {
    10
}

fn default_retention_min_count() -> u32 {
    10
}

fn default_retention_min_age() -> u32 {
    30
}

impl SyncConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.standby && self.start_from_checkpoint.is_none() {
            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
Standby mode requires `start_from_checkpoint` to be set.
Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
        }

        Ok(())
    }
}
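
// Illustrative check of the `validate` contract above (not part of the
// upstream test suite): `standby` requires `start_from_checkpoint`. The
// bucket name is a placeholder.
#[cfg(test)]
mod sync_config_example {
    use super::{StartFromCheckpoint, SyncConfig};

    #[test]
    fn standby_requires_start_from_checkpoint() {
        let mut config = SyncConfig {
            bucket: "my-bucket".to_string(),
            standby: true,
            ..SyncConfig::default()
        };
        assert!(config.validate().is_err());

        config.start_from_checkpoint = Some(StartFromCheckpoint::Latest);
        assert!(config.validate().is_ok());
    }
}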

/// Configuration for supplying a custom pipeline StatefulSet template via a Kubernetes ConfigMap.
///
/// Operators can provide a custom StatefulSet YAML that the Kubernetes runner will use when
/// creating pipeline StatefulSets for a pipeline. The custom template must be stored as the
/// value of a key in a ConfigMap in the same namespace as the pipeline; set `name` to the
/// ConfigMap name and `key` to the entry that contains the template.
///
/// Recommendations and requirements:
/// - **Start from the default template and modify it as needed.** The default template is stored
///   in the ConfigMap named `<release-name>-pipeline-template` (key `pipelineTemplate`) in the
///   release namespace and should be used as a reference.
/// - The template must contain a valid Kubernetes `StatefulSet` manifest in YAML form. The
///   runner substitutes variables in the template before parsing; therefore the final YAML
///   must be syntactically valid.
/// - The runner performs simple string substitution for the following placeholders. Please ensure
///   each placeholder is placed at a location appropriate for its semantics:
///   - `{id}`: pipeline Kubernetes name (used for object names and labels)
///   - `{namespace}`: Kubernetes namespace where the pipeline runs
///   - `{pipeline_executor_image}`: container image used to run the pipeline executor
///   - `{binary_ref}`: program binary reference passed as an argument
///   - `{program_info_ref}`: program info reference passed as an argument
///   - `{pipeline_storage_path}`: mount path for persistent pipeline storage
///   - `{storage_class_name}`: storage class name to use for PVCs (if applicable)
///   - `{deployment_id}`: UUID identifying the deployment instance
///   - `{deployment_initial}`: initial desired runtime status (e.g., `provisioning`)
///   - `{bootstrap_policy}`: bootstrap policy value when applicable
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct PipelineTemplateConfig {
    /// Name of the ConfigMap containing the pipeline template.
    pub name: String,
    /// Key in the ConfigMap containing the pipeline template.
    ///
    /// If not set, defaults to `pipelineTemplate`.
    #[schema(default = default_pipeline_template_key)]
    #[serde(default = "default_pipeline_template_key")]
    pub key: String,
}

fn default_pipeline_template_key() -> String {
    "pipelineTemplate".to_string()
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// Number of DBSP hosts.
    ///
    /// The worker threads are evenly divided among the hosts.  For single-host
    /// deployments, this should be 1 (the default).
    ///
    /// Multihost pipelines are an enterprise-only preview feature.
    pub hosts: usize,

    /// Storage configuration.
    ///
    /// - If this is `None`, the pipeline's state is kept in
    ///   in-memory data-structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record was buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most each `clock_resolution_usecs` microseconds.  If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_json::Value>,

    /// Deprecated: this setting no longer has any effect.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the http server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case the
    /// pipeline HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async IO tasks.
    ///
    /// This affects some networking and file I/O operations,
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case
    /// ingress, egress or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Optional settings for tweaking Feldera internals.
    ///
    /// The available key-value pairs change from one version of Feldera to
    /// another, so users should not depend on particular settings being
    /// available, or on their behavior.
    pub dev_tweaks: BTreeMap<String, serde_json::Value>,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process.  Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,

    /// ConfigMap containing a custom pipeline template (Enterprise only).
    ///
    /// This feature is only available in Feldera Enterprise. If set, the Kubernetes runner
    /// will read the template from the specified ConfigMap and use it instead of the default
    /// StatefulSet template for the configured pipeline.
    ///
    /// See the [`PipelineTemplateConfig`] documentation for details.
    pub pipeline_template_configmap: Option<PipelineTemplateConfig>,
}

/// Accepts legacy `true` and `false` values and converts them to the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}
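
// Illustrative sketch of the boolean back-compat path handled above (not part
// of the upstream test suite): `true` maps to default storage options, while
// `false` maps to `None`.
#[cfg(test)]
mod storage_options_example {
    use super::{deserialize_storage_options, StorageOptions};
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_storage_options")]
        storage: Option<StorageOptions>,
    }

    #[test]
    fn legacy_bool_compat() {
        let w: Wrapper = serde_json::from_str(r#"{"storage": true}"#).unwrap();
        assert_eq!(w.storage, Some(StorageOptions::default()));

        let w: Wrapper = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert_eq!(w.storage, None);
    }
}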

/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
/// tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            hosts: 1,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            // We discovered that the jaeger crate can use up gigabytes of RAM,
            // so it's not harmless to keep it on by default.
            tracing: false,
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: Some(DEFAULT_CLOCK_RESOLUTION_USECS),
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            io_workers: None,
            http_workers: None,
            dev_tweaks: BTreeMap::default(),
            logging: None,
            pipeline_template_configmap: None,
        }
    }
}

1039
1040/// Fault-tolerance configuration.
1041///
1042/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
1043/// which is the configuration that one gets if [RuntimeConfig] omits fault
1044/// tolerance configuration.
1045///
1046/// The default value for [FtConfig::model] enables fault tolerance, as
1047/// `Some(FtModel::default())`.  This is the configuration that one gets if
1048/// [RuntimeConfig] includes a fault tolerance configuration but does not
1049/// specify a particular model.
1050#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1051#[serde(rename_all = "snake_case")]
1052pub struct FtConfig {
1053    /// Fault tolerance model to use.
1054    #[serde(with = "none_as_string")]
1055    #[serde(default = "default_model")]
1056    #[schema(
1057        schema_with = none_as_string_schema::<FtModel>,
1058    )]
1059    pub model: Option<FtModel>,
1060
1061    /// Interval between automatic checkpoints, in seconds.
1062    ///
1063    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
1064    /// be forced into that range.
1065    #[serde(default = "default_checkpoint_interval_secs")]
1066    pub checkpoint_interval_secs: Option<u64>,
1067}
1068
1069fn default_model() -> Option<FtModel> {
1070    Some(FtModel::default())
1071}
1072
1073pub fn default_checkpoint_interval_secs() -> Option<u64> {
1074    Some(60)
1075}
1076
1077impl Default for FtConfig {
1078    fn default() -> Self {
1079        Self {
1080            model: None,
1081            checkpoint_interval_secs: default_checkpoint_interval_secs(),
1082        }
1083    }
1084}
1085
1086#[cfg(test)]
1087mod test {
1088    use super::deserialize_fault_tolerance;
1089    use crate::config::{FtConfig, FtModel};
1090    use serde::{Deserialize, Serialize};
1091
1092    #[test]
1093    fn ft_config() {
1094        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
1095        #[serde(default)]
1096        struct Wrapper {
1097            #[serde(deserialize_with = "deserialize_fault_tolerance")]
1098            config: FtConfig,
1099        }
1100
1101        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
1102        for s in [
1103            "{}",
1104            r#"{"config": null}"#,
1105            r#"{"config": {"model": "none"}}"#,
1106        ] {
1107            let config: Wrapper = serde_json::from_str(s).unwrap();
1108            assert_eq!(
1109                config,
1110                Wrapper {
1111                    config: FtConfig {
1112                        model: None,
1113                        checkpoint_interval_secs: Some(60)
1114                    }
1115                }
1116            );
1117        }
1118
1119        // Serializing disabled FT produces explicit "none" form.
1120        let s = serde_json::to_string(&Wrapper {
1121            config: FtConfig::default(),
1122        })
1123        .unwrap();
1124        assert!(s.contains("\"none\""));
1125
1126        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
1127        // tolerance.
1128        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
1129            assert_eq!(
1130                serde_json::from_str::<FtConfig>(s).unwrap(),
1131                FtConfig {
1132                    model: Some(FtModel::default()),
1133                    checkpoint_interval_secs: Some(60)
1134                }
1135            );
1136        }
1137
1138        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
1139        assert_eq!(
1140            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
1141            FtConfig {
1142                model: Some(FtModel::default()),
1143                checkpoint_interval_secs: None
1144            }
1145        );
1146    }
1147}
1148
1149impl FtConfig {
1150    pub fn is_enabled(&self) -> bool {
1151        self.model.is_some()
1152    }
1153
1154    /// Returns the checkpoint interval, if fault tolerance is enabled, and
1155    /// otherwise `None`.
1156    pub fn checkpoint_interval(&self) -> Option<Duration> {
1157        if self.is_enabled() {
1158            self.checkpoint_interval_secs
1159                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
1160        } else {
1161            None
1162        }
1163    }
1164}
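
// Illustrative check of the documented clamping (not part of the upstream
// test suite): intervals outside 1..=3600 seconds are forced into that range,
// and disabled fault tolerance yields no interval at all.
#[cfg(test)]
mod checkpoint_interval_example {
    use super::{FtConfig, FtModel};
    use std::time::Duration;

    #[test]
    fn interval_clamping() {
        let config = FtConfig {
            model: Some(FtModel::ExactlyOnce),
            checkpoint_interval_secs: Some(10_000),
        };
        assert_eq!(config.checkpoint_interval(), Some(Duration::from_secs(3600)));

        assert_eq!(FtConfig::default().checkpoint_interval(), None);
    }
}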

/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
/// them automatically.
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if value.eq_ignore_ascii_case("none") {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}
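
// Illustrative sketch of the `none_as_string` convention (not part of the
// upstream test suite): the string "none" maps to `None`, and any other valid
// variant string maps to `Some(_)`.
#[cfg(test)]
mod none_as_string_example {
    use super::{none_as_string, FtModel};
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    struct Wrapper {
        #[serde(with = "none_as_string")]
        model: Option<FtModel>,
    }

    #[test]
    fn none_as_string_round_trip() {
        let w: Wrapper = serde_json::from_str(r#"{"model": "none"}"#).unwrap();
        assert_eq!(w, Wrapper { model: None });
        assert_eq!(serde_json::to_string(&w).unwrap(), r#"{"model":"none"}"#);

        let w: Wrapper = serde_json::from_str(r#"{"model": "at_least_once"}"#).unwrap();
        assert_eq!(
            w,
            Wrapper {
                model: Some(FtModel::AtLeastOnce)
            }
        );
    }
}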

/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}
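
// Illustrative check (not part of the upstream test suite): parsing is
// case-insensitive and agrees with `as_str`.
#[cfg(test)]
mod ft_model_example {
    use super::FtModel;
    use std::str::FromStr;

    #[test]
    fn parse_round_trip() {
        assert!(matches!(
            FtModel::from_str("Exactly_Once"),
            Ok(FtModel::ExactlyOnce)
        ));
        assert_eq!(FtModel::ExactlyOnce.as_str(), "exactly_once");
        assert!(FtModel::from_str("bogus").is_err());
    }
}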

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}
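
// Illustrative sketch of the flexible `start_after` syntax handled above (not
// part of the upstream test suite): a single label and a list of labels both
// deserialize to a vector.
#[cfg(test)]
mod start_after_example {
    use super::deserialize_start_after;
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_start_after", default)]
        start_after: Option<Vec<String>>,
    }

    #[test]
    fn string_or_array() {
        let w: Wrapper = serde_json::from_str(r#"{"start_after": "a"}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["a".to_string()]));

        let w: Wrapper = serde_json::from_str(r#"{"start_after": ["a", "b"]}"#).unwrap();
        assert_eq!(
            w.start_after,
            Some(vec!["a".to_string(), "b".to_string()])
        );
    }
}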

/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum number of records from this connector to process in a single batch.
    ///
    /// When set, this caps how many records are taken from the connector’s input
    /// buffer and pushed through the circuit at once.
    ///
    /// This is typically configured lower than `max_queued_records` to allow the
    /// connector time to restart and refill its buffer while a batch is being
    /// processed.
    ///
    /// Not all input adapters honor this limit.
    ///
    /// If this is not set, the batch size is derived from `max_worker_batch_size`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_batch_size: Option<u64>,

    /// Maximum number of records processed per batch, per worker thread.
    ///
    /// When `max_batch_size` is not set, this setting is used to cap
    /// the number of records that can be taken from the connector’s input
    /// buffer and pushed through the circuit at once.  The effective batch size is computed as:
    /// `max_worker_batch_size × workers`.
    ///
    /// This provides an alternative to `max_batch_size` that automatically adjusts batch
    /// size as the number of worker threads changes, to maintain a constant
    /// amount of work per worker per batch.
    ///
    /// Defaults to 10,000 records per worker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_worker_batch_size: Option<u64>,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
1409    /// Note that this is not a hard bound: there can be a small delay between
1410    /// the backpressure mechanism is triggered and the endpoint is paused, during
1411    /// which more data may be queued.
1412    ///
1413    /// The default is 1 million.
1414    #[serde(default = "default_max_queued_records")]
1415    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with the specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}
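
// Sketch (test-only, not part of the original source): a bare connector
// config picks up the serde defaults documented on the fields above. The
// `http_output` transport is used because it is a unit variant that needs no
// nested config.
#[cfg(test)]
mod connector_config_defaults_tests {
    use super::*;

    #[test]
    fn minimal_config_uses_documented_defaults() {
        let cfg: ConnectorConfig =
            serde_json::from_str(r#"{"transport": {"name": "http_output"}}"#).unwrap();
        assert_eq!(cfg.max_queued_records, default_max_queued_records());
        assert!(!cfg.paused);
        assert!(cfg.labels.is_empty());
        assert!(cfg.start_after.is_none());
    }
}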

impl ConnectorConfig {
    /// Compare two configs modulo the `paused` field.
    ///
    /// Used to compare checkpointed and current connector configs.
    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
        let mut a = self.clone();
        let mut b = other.clone();
        a.paused = false;
        b.paused = false;
        a == b
    }
}
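
// Sketch (test-only, not part of the original source): two configs that
// differ only in the `paused` flag compare equal modulo `paused`, but not
// under plain `==`.
#[cfg(test)]
mod equal_modulo_paused_tests {
    use super::*;

    #[test]
    fn paused_flag_is_ignored() {
        let a: ConnectorConfig =
            serde_json::from_str(r#"{"transport": {"name": "http_output"}, "paused": true}"#)
                .unwrap();
        let b: ConnectorConfig =
            serde_json::from_str(r#"{"transport": {"name": "http_output"}, "paused": false}"#)
                .unwrap();
        assert!(a.equal_modulo_paused(&b));
        assert_ne!(a, b);
    }
}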

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in
    /// fewer, bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating many
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled,
    /// output updates produced by the pipeline are consolidated in an internal buffer
    /// and are pushed to the output transport when one of the following conditions is
    /// satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set, the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, at least one of the 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}
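
// Sketch (test-only, not part of the original source): per `validate` above,
// enabling the buffer without bounding it by size or time is rejected;
// bounding it by either condition is accepted.
#[cfg(test)]
mod output_buffer_validation_tests {
    use super::*;

    #[test]
    fn unbounded_buffer_is_rejected() {
        let unbounded = OutputBufferConfig {
            enable_output_buffer: true,
            ..Default::default()
        };
        assert!(unbounded.validate().is_err());

        let bounded = OutputBufferConfig {
            enable_output_buffer: true,
            max_output_buffer_time_millis: 10_000,
            ..Default::default()
        };
        assert!(bounded.validate().is_ok());
    }
}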

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
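
// Sketch (test-only, not part of the original source): because of
// `#[serde(flatten)]`, the connector fields sit next to `stream` in the wire
// format rather than under a nested key. The stream name is illustrative.
#[cfg(test)]
mod output_endpoint_config_tests {
    use super::*;

    #[test]
    fn connector_fields_are_flattened() {
        let e: OutputEndpointConfig = serde_json::from_str(
            r#"{"stream": "my_view", "transport": {"name": "http_output"}}"#,
        )
        .unwrap();
        assert_eq!(e.stream, "my_view");
        assert!(e.connector_config.format.is_none());
    }
}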

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    NatsInput(NatsInputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Boxed to prevent Rust from complaining about the large size difference
    // between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through the API.
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through the API.
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through the API.
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}
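
// Sketch (test-only, not part of the original source): with the
// `tag = "name", content = "config"` attribute above, variants serialize in
// the adjacently tagged form `{"name": ..., "config": ...}`; unit variants
// such as `HttpOutput` carry no `config` key.
#[cfg(test)]
mod transport_config_wire_format_tests {
    use super::*;

    #[test]
    fn adjacently_tagged_unit_variant() {
        let t = TransportConfig::HttpOutput;
        assert_eq!(
            serde_json::to_value(&t).unwrap(),
            serde_json::json!({"name": "http_output"})
        );
    }
}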

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::NatsInput(_) => "nats_input".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
            TransportConfig::ClockInput(_) => "clock".to_string(),
        }
    }

    /// Returns true if the connector is transient, i.e., is created and destroyed
    /// at runtime on demand, rather than being configured as part of the pipeline.
    pub fn is_transient(&self) -> bool {
        matches!(
            self,
            TransportConfig::AdHocInput(_)
                | TransportConfig::HttpInput(_)
                | TransportConfig::HttpOutput
                | TransportConfig::ClockInput(_)
        )
    }
}
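
// Sketch (test-only, not part of the original source): `name` returns a
// snake_case label for the transport, and `is_transient` singles out the
// HTTP, ad hoc, and clock endpoints.
#[cfg(test)]
mod transport_name_tests {
    use super::*;

    #[test]
    fn http_output_name_and_transience() {
        let t = TransportConfig::HttpOutput;
        assert_eq!(t.name(), "http_output");
        assert!(t.is_transient());
    }
}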

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}
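
// Sketch (test-only, not part of the original source): the format name is
// paired with an opaque JSON blob that only the named parser/encoder
// interprets. The "json" name matches the doc comment above; the
// `update_format` key is illustrative, not prescribed by this module.
#[cfg(test)]
mod format_config_tests {
    use super::*;

    #[test]
    fn config_is_opaque_json() {
        let f: FormatConfig = serde_json::from_str(
            r#"{"name": "json", "config": {"update_format": "insert_delete"}}"#,
        )
        .unwrap();
        assert_eq!(f.name, "json");
        assert_eq!(f.config["update_format"], "insert_delete");
    }
}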

#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,

    /// Kubernetes service account name to use for an instance of this pipeline.
    /// The account determines permissions and access controls.
    pub service_account_name: Option<String>,

    /// Kubernetes namespace to use for an instance of this pipeline.
    /// The namespace determines the scope of names for resources created
    /// for the pipeline.
    /// If not set, the pipeline will be deployed in the same namespace
    /// as the control-plane.
    pub namespace: Option<String>,
}
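
// Sketch (test-only, not part of the original source): every resource field
// is optional, and the container-level `#[serde(default)]` lets an empty
// object deserialize to the all-`None` default.
#[cfg(test)]
mod resource_config_tests {
    use super::*;

    #[test]
    fn empty_object_is_all_defaults() {
        let r: ResourceConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(r, ResourceConfig::default());
        assert!(r.memory_mb_max.is_none());
    }
}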