// feldera_types/config.rs

1//! Controller configuration.
2//!
3//! This module defines the controller configuration structure.  The leaves of
4//! this structure are individual transport-specific and data-format-specific
5//! endpoint configs.  We represent these configs as opaque JSON values, so
6//! that the entire configuration tree can be deserialized from a JSON file.
7
8use crate::preprocess::PreprocessorConfig;
9use crate::program_schema::ProgramSchema;
10use crate::secret_resolver::default_secrets_directory;
11use crate::transport::adhoc::AdHocInputConfig;
12use crate::transport::clock::ClockConfig;
13use crate::transport::datagen::DatagenInputConfig;
14use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
15use crate::transport::file::{FileInputConfig, FileOutputConfig};
16use crate::transport::http::HttpInputConfig;
17use crate::transport::iceberg::IcebergReaderConfig;
18use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
19use crate::transport::nats::NatsInputConfig;
20use crate::transport::nexmark::NexmarkInputConfig;
21use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
22use crate::transport::pubsub::PubSubInputConfig;
23use crate::transport::redis::RedisOutputConfig;
24use crate::transport::s3::S3InputConfig;
25use crate::transport::url::UrlInputConfig;
26use core::fmt;
27use feldera_ir::{MirNode, MirNodeId};
28use serde::de::{self, MapAccess, Visitor};
29use serde::{Deserialize, Deserializer, Serialize};
30use serde_json::Value as JsonValue;
31use std::collections::HashMap;
32use std::fmt::Display;
33use std::path::Path;
34use std::str::FromStr;
35use std::time::Duration;
36use std::{borrow::Cow, cmp::max, collections::BTreeMap};
37use utoipa::ToSchema;
38use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
39
40pub mod dev_tweaks;
41pub use dev_tweaks::DevTweaks;
42
/// Default cap on the number of connectors initialized in parallel; see
/// [`PipelineConfig::max_parallel_connector_init`].
const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;
44
/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    // One million records.
    1_000_000
}
49
/// Default maximum batch size, in records, per worker.
pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;

/// Default clock resolution in microseconds (1,000,000 us = 1 second).
pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;
53
/// Program information included in the pipeline configuration.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// The MIR of the program, keyed by MIR node id.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Program schema.
    pub program_schema: ProgramSchema,
}
62
/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global controller configuration.
    ///
    /// Flattened, so its fields appear at the top level of the serialized
    /// configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Configuration for multihost pipelines.
    ///
    /// The presence of this field indicates that the pipeline is running in
    /// multihost mode.  In the pod with ordinal 0, this triggers starting the
    /// coordinator process.  In all pods, this tells the pipeline process to
    /// await a connection from the coordinator instead of initializing the
    /// pipeline immediately.
    pub multihost: Option<MultihostConfig>,

    /// Unique system-generated name of the pipeline (format: `pipeline-<uuid>`).
    /// It is unique across all tenants and cannot be changed.
    ///
    /// The `<uuid>` is also used in the naming of various resources that back the pipeline,
    /// and as such this name is useful to find/identify corresponding resources.
    pub name: Option<String>,

    /// Name given by the tenant to the pipeline. It is only unique within the same tenant, and can
    /// be changed by the tenant when the pipeline is stopped.
    ///
    /// Given a specific tenant, it can be used to find/identify a specific pipeline of theirs.
    pub given_name: Option<String>,

    /// Configuration for persistent storage
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration, keyed by endpoint name.
    #[serde(default)]
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration, keyed by endpoint name.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}
122
123impl PipelineConfig {
124    pub fn max_parallel_connector_init(&self) -> u64 {
125        max(
126            self.global
127                .max_parallel_connector_init
128                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
129            1,
130        )
131    }
132
133    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
134        let (storage_config, storage_options) = storage.unzip();
135        Self {
136            global: RuntimeConfig {
137                storage: storage_options,
138                ..self.global
139            },
140            storage_config,
141            ..self
142        }
143    }
144
145    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
146        let storage_options = self.global.storage.as_ref();
147        let storage_config = self.storage_config.as_ref();
148        storage_config.zip(storage_options)
149    }
150
151    /// Returns `self.secrets_dir`, or the default secrets directory if it isn't
152    /// set.
153    pub fn secrets_dir(&self) -> &Path {
154        match &self.secrets_dir {
155            Some(dir) => Path::new(dir.as_str()),
156            None => default_secrets_directory(),
157        }
158    }
159
160    /// Abbreviated config that can be printed in the log on pipeline startup.
161    pub fn display_summary(&self) -> String {
162        // TODO: we may want to further abbreviate connector config.
163        let summary = serde_json::json!({
164            "name": self.name,
165            "given_name": self.given_name,
166            "global": self.global,
167            "storage_config": self.storage_config,
168            "secrets_dir": self.secrets_dir,
169            "inputs": self.inputs,
170            "outputs": self.outputs
171        });
172
173        serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
174    }
175}
176
177/// A subset of fields in `PipelineConfig` that are generated by the compiler.
178/// These fields are shipped to the pipeline by the compilation server along with
179/// the program binary.
180// Note: An alternative would be to embed these fields in the program binary itself
181// as static strings. This would work well for program IR, but it would require recompiling
182// the program anytime a connector config changes, whereas today connector changes
183// do not require recompilation.
184#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone)]
185pub struct PipelineConfigProgramInfo {
186    /// Input endpoint configuration.
187    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,
188
189    /// Output endpoint configuration.
190    #[serde(default)]
191    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
192
193    /// Program information.
194    #[serde(default)]
195    pub program_ir: Option<ProgramIr>,
196}
197
/// Configuration for a multihost Feldera pipeline.
///
/// This configuration is primarily for the coordinator.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct MultihostConfig {
    /// Number of hosts to launch.
    ///
    /// For the configuration to be truly multihost, this should be at least 2.
    /// A value of 1 still runs the multihost coordinator but it only
    /// coordinates a single host.  The `Default` impl uses 1.
    pub hosts: usize,
}
210
211impl Default for MultihostConfig {
212    fn default() -> Self {
213        Self { hosts: 1 }
214    }
215}
216
/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline (defaults to the OS
    /// page cache; see [`StorageCacheConfig`]).
    #[serde(default)]
    pub cache: StorageCacheConfig,
}
234
235impl StorageConfig {
236    pub fn path(&self) -> &Path {
237        Path::new(&self.path)
238    }
239}
240
/// How to cache access to storage within a Feldera pipeline.
///
/// Selected via [`StorageConfig::cache`].
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}
258
259impl StorageCacheConfig {
260    #[cfg(unix)]
261    pub fn to_custom_open_flags(&self) -> i32 {
262        match self {
263            StorageCacheConfig::PageCache => (),
264            StorageCacheConfig::FelderaCache => {
265                #[cfg(target_os = "linux")]
266                return libc::O_DIRECT;
267            }
268        }
269        0
270    }
271}
272
/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 10,485,760 (10 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}
317
/// Backend storage configuration.
///
/// Serialized in adjacently tagged form, `{"name": "...", "config": ...}`,
/// with snake_case variant names.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}
336
337impl Display for StorageBackendConfig {
338    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
339        match self {
340            StorageBackendConfig::Default => write!(f, "default"),
341            StorageBackendConfig::File(_) => write!(f, "file"),
342            StorageBackendConfig::Object(_) => write!(f, "object"),
343        }
344    }
345}
346
/// Storage compression algorithm.
///
/// Serialized as a snake_case string (e.g. `"snappy"`).
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}
364
/// Which checkpoint to fetch from the object store on startup; see
/// `SyncConfig::start_from_checkpoint`.
///
/// Serialized as either the string `"latest"` or a checkpoint UUID string
/// (see the hand-written `Serialize`/`Deserialize` impls below).
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    /// Fetch the most recent available checkpoint.
    Latest,
    /// Fetch the checkpoint with this specific UUID.
    Uuid(uuid::Uuid),
}
370
// Hand-written OpenAPI schema matching the custom string (de)serialization:
// the value is either the string constant "latest" or a UUID-formatted string.
impl ToSchema<'_> for StartFromCheckpoint {
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    // Variant 1: a string enum with the single value "latest".
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    // Variant 2: a string with UUID format.
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    // The value as a whole may also be null.
                    .nullable(true)
                    .build(),
            )),
        )
    }
}
400
// Hand-written impl: the type deserializes from a plain string rather than
// the derived externally tagged enum representation.
impl<'de> Deserialize<'de> for StartFromCheckpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        /// Accepts the literal `"latest"` or any string `uuid` can parse.
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    // Anything other than the literal "latest" must be a valid UUID.
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}
432
433impl Serialize for StartFromCheckpoint {
434    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
435    where
436        S: serde::Serializer,
437    {
438        match self {
439            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
440            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
441        }
442    }
443}
444
/// Configuration for synchronizing checkpoints with an S3-compatible object
/// store (transfers are performed with `rclone`).
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for Minio or the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If `fail_if_no_checkpoint` is `true`, the pipeline will fail to initialize.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip post copy check of checksums, and only check the file sizes.
    /// This can significantly improve the throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will auto activate, no newer checkpoints will be
    /// fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If checkpoint UUID, pipeline fetches this checkpoint once and waits
    ///   in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between each attempt to fetch the latest
    /// checkpoint from object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// The interval (in seconds) between each push of checkpoints to object store.
    ///
    /// Default: disabled (no periodic push).
    #[serde(default)]
    pub push_interval: Option<u64>,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3 specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,

    /// A read-only bucket used as a fallback checkpoint source.
    ///
    /// When the pipeline has no local checkpoint and `bucket` contains no
    /// checkpoint either, it will attempt to fetch the checkpoint from this
    /// location instead.  All connection settings (`endpoint`, `region`,
    /// `provider`, `access_key`, `secret_key`) are shared with `bucket`.
    ///
    /// The pipeline **never writes** to `read_bucket`.
    ///
    /// Must point to a different location than `bucket`.
    #[serde(default)]
    pub read_bucket: Option<String>,
}
601
/// Default standby checkpoint pull interval, in seconds.
fn default_pull_interval() -> u64 {
    10
}
605
/// Default minimum number of checkpoints to retain in the object store.
fn default_retention_min_count() -> u32 {
    10
}
609
/// Default minimum checkpoint age, in days, before deletion eligibility.
fn default_retention_min_age() -> u32 {
    30
}
613
614impl SyncConfig {
615    pub fn validate(&self) -> Result<(), String> {
616        if self.standby && self.start_from_checkpoint.is_none() {
617            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
618Standby mode requires `start_from_checkpoint` to be set.
619Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
620        }
621
622        if let Some(ref rb) = self.read_bucket
623            && rb == &self.bucket
624        {
625            return Err(
626                "invalid sync config: `read_bucket` and `bucket` must point to different locations"
627                    .to_owned(),
628            );
629        }
630
631        Ok(())
632    }
633}
634
/// Configuration for supplying a custom pipeline StatefulSet template via a Kubernetes ConfigMap.
///
/// Operators can provide a custom StatefulSet YAML that the Kubernetes runner will use when
/// creating pipeline StatefulSets for a pipeline. The custom template must be stored as the
/// value of a key in a ConfigMap in the same namespace as the pipeline; set `name` to the
/// ConfigMap name and `key` to the entry that contains the template.
///
/// Recommendations and requirements:
/// - **Start from the default template and modify it as needed.** The default template is present
///   in ConfigMap named as `<release-name>-pipeline-template`, with key `pipelineTemplate` in the release
///   namespace and should be used as a reference.
/// - The template must contain a valid Kubernetes `StatefulSet` manifest in YAML form. The
///   runner substitutes variables in the template before parsing; therefore the final YAML
///   must be syntactically valid.
/// - The runner performs simple string substitution for the following placeholders. Please ensure these
///   placeholders are placed at appropriate location for their semantics:
///   - `{id}`: pipeline Kubernetes name (used for object names and labels)
///   - `{namespace}`: Kubernetes namespace where the pipeline runs
///   - `{pipeline_executor_image}`: container image used to run the pipeline executor
///   - `{binary_ref}`: program binary reference passed as an argument
///   - `{program_info_ref}`: program info reference passed as an argument
///   - `{pipeline_storage_path}`: mount path for persistent pipeline storage
///   - `{storage_class_name}`: storage class name to use for PVCs (if applicable)
///   - `{deployment_id}`: UUID identifying the deployment instance
///   - `{deployment_initial}`: initial desired runtime status (e.g., `provisioning`)
///   - `{bootstrap_policy}`: bootstrap policy value when applicable
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct PipelineTemplateConfig {
    /// Name of the ConfigMap containing the pipeline template.
    ///
    /// The ConfigMap must live in the same namespace as the pipeline.
    pub name: String,
    /// Key in the ConfigMap containing the pipeline template.
    ///
    /// If not set, defaults to `pipelineTemplate`.
    #[schema(default = default_pipeline_template_key)]
    #[serde(default = "default_pipeline_template_key")]
    pub key: String,
}
672
/// Default ConfigMap key that holds the pipeline template.
fn default_pipeline_template_key() -> String {
    String::from("pipelineTemplate")
}
676
/// Object storage backend configuration (Amazon S3, Google Cloud Storage, or
/// Microsoft Azure Blob Storage), selected by the `url` scheme.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}
744
/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store; see
    /// [`SyncConfig`].
    pub sync: Option<SyncConfig>,
}
765
/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
///
/// Deserialized with `#[serde(default)]`, so every field may be omitted;
/// see [`RuntimeConfig::default`] for the defaults.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// The maximum amount of memory, in Megabytes, that the pipeline is allowed to use
    /// on each host.
    ///
    /// Setting this property activates memory pressure monitoring and backpressure
    /// mechanisms. The pipeline will track the amount of remaining memory and
    /// report the memory pressure level via the `memory_pressure` metric.
    ///
    /// As the memory pressure increases, the system will apply increasing backpressure
    /// to push state cached in memory to storage, preventing the pipeline from running
    /// out of memory at the cost of some performance degradation.
    ///
    /// It is strongly recommended to set this property to prevent the pipeline from
    /// running out of memory. The setting should not exceed the memory limit of the pipeline
    /// instance.
    ///
    /// When `max_rss_mb` is not specified but `resources.memory_mb_max` is set, the
    /// latter is used as the effective memory cap for the pipeline.
    ///
    /// See [documentation on the pipeline's memory usage](https://docs.feldera.com/operations/memory)
    /// for more details.
    pub max_rss_mb: Option<u64>,

    /// Number of DBSP hosts.
    ///
    /// The worker threads are evenly divided among the hosts.  For single-host
    /// deployments, this should be 1 (the default).
    ///
    /// Multihost pipelines are an enterprise-only preview feature.
    pub hosts: usize,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data-structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    ///
    /// For backward compatibility, the deserializer also accepts `true` and
    /// `false` in place of a structured value.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    ///
    /// For backward compatibility, the deserializer also accepts the legacy
    /// strings `"initial_state"` and `"latest_checkpoint"`.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most each `clock_resolution_usecs` microseconds.  If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_json::Value>,

    /// Deprecated: setting this true or false does not have an effect anymore.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the http server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case the
    /// pipeline HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async IO tasks.
    ///
    /// This affects some networking and file I/O operations
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case
    /// ingress, egress or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Environment variables for the pipeline process.
    ///
    /// These are key-value pairs injected into the pipeline process environment.
    /// Some variable names are reserved by the platform and cannot be overridden
    /// (for example `RUST_LOG`, and variables in the `FELDERA_`,
    /// `KUBERNETES_`, and `TOKIO_` namespaces).
    #[serde(default)]
    pub env: BTreeMap<String, String>,

    /// Optional settings for tweaking Feldera internals.
    pub dev_tweaks: DevTweaks,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process.  Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,

    /// ConfigMap containing a custom pipeline template (Enterprise only).
    ///
    /// This feature is only available in Feldera Enterprise. If set, the Kubernetes runner
    /// will read the template from the specified ConfigMap and use it instead of the default
    /// StatefulSet template for the configured pipeline.
    ///
    /// check [`PipelineTemplateConfig`] documentation for details.
    pub pipeline_template_configmap: Option<PipelineTemplateConfig>,
}
953
954/// Accepts "true" and "false" and converts them to the new format.
955fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
956where
957    D: Deserializer<'de>,
958{
959    struct BoolOrStruct;
960
961    impl<'de> Visitor<'de> for BoolOrStruct {
962        type Value = Option<StorageOptions>;
963
964        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
965            formatter.write_str("boolean or StorageOptions")
966        }
967
968        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
969        where
970            E: de::Error,
971        {
972            match v {
973                false => Ok(None),
974                true => Ok(Some(StorageOptions::default())),
975            }
976        }
977
978        fn visit_unit<E>(self) -> Result<Self::Value, E>
979        where
980            E: de::Error,
981        {
982            Ok(None)
983        }
984
985        fn visit_none<E>(self) -> Result<Self::Value, E>
986        where
987            E: de::Error,
988        {
989            Ok(None)
990        }
991
992        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
993        where
994            M: MapAccess<'de>,
995        {
996            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
997        }
998    }
999
1000    deserializer.deserialize_any(BoolOrStruct)
1001}
1002
1003/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
1004/// tolerance.
1005///
1006/// Accepts `null` as disabling fault tolerance.
1007///
1008/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
1009/// expect.
1010fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
1011where
1012    D: Deserializer<'de>,
1013{
1014    struct StringOrStruct;
1015
1016    impl<'de> Visitor<'de> for StringOrStruct {
1017        type Value = FtConfig;
1018
1019        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1020            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
1021        }
1022
1023        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1024        where
1025            E: de::Error,
1026        {
1027            match v {
1028                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
1029                    model: Some(FtModel::default()),
1030                    ..FtConfig::default()
1031                }),
1032                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
1033            }
1034        }
1035
1036        fn visit_unit<E>(self) -> Result<Self::Value, E>
1037        where
1038            E: de::Error,
1039        {
1040            Ok(FtConfig::default())
1041        }
1042
1043        fn visit_none<E>(self) -> Result<Self::Value, E>
1044        where
1045            E: de::Error,
1046        {
1047            Ok(FtConfig::default())
1048        }
1049
1050        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
1051        where
1052            M: MapAccess<'de>,
1053        {
1054            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
1055        }
1056    }
1057
1058    deserializer.deserialize_any(StringOrStruct)
1059}
1060
1061impl Default for RuntimeConfig {
1062    fn default() -> Self {
1063        Self {
1064            workers: 8,
1065            max_rss_mb: None,
1066            hosts: 1,
1067            storage: Some(StorageOptions::default()),
1068            fault_tolerance: FtConfig::default(),
1069            cpu_profiler: true,
1070            tracing: {
1071                // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
1072                // to keep it on by default.
1073                false
1074            },
1075            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
1076            min_batch_size_records: 0,
1077            max_buffering_delay_usecs: 0,
1078            resources: ResourceConfig::default(),
1079            clock_resolution_usecs: { Some(DEFAULT_CLOCK_RESOLUTION_USECS) },
1080            pin_cpus: Vec::new(),
1081            provisioning_timeout_secs: None,
1082            max_parallel_connector_init: None,
1083            init_containers: None,
1084            checkpoint_during_suspend: true,
1085            io_workers: None,
1086            http_workers: None,
1087            env: BTreeMap::default(),
1088            dev_tweaks: DevTweaks::default(),
1089            logging: None,
1090            pipeline_template_configmap: None,
1091        }
1092    }
1093}
1094
/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`.  This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    ///
    /// Serialized as a string, with `"none"` standing in for `None`
    /// (see [none_as_string]).
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}
1123
/// Default value of [FtConfig::model] when an `FtConfig` is present but does
/// not specify a model: fault tolerance enabled with the default model.
fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}
1127
/// Default value of [FtConfig::checkpoint_interval_secs]: one checkpoint per
/// minute.
pub fn default_checkpoint_interval_secs() -> Option<u64> {
    Some(60)
}
1131
impl Default for FtConfig {
    /// Returns the configuration with fault tolerance disabled (see the
    /// [FtConfig] type documentation for how this differs from the serde
    /// field defaults).
    fn default() -> Self {
        Self {
            // `None` disables fault tolerance.
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}
1140
#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Expected configurations used throughout the test (`FtConfig` is `Copy`).
        let disabled = FtConfig {
            model: None,
            checkpoint_interval_secs: Some(60),
        };
        let enabled = FtConfig {
            model: Some(FtModel::default()),
            checkpoint_interval_secs: Some(60),
        };

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for input in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let parsed: Wrapper = serde_json::from_str(input).unwrap();
            assert_eq!(parsed, Wrapper { config: disabled });
        }

        // Serializing disabled FT produces explicit "none" form.
        let serialized = serde_json::to_string(&Wrapper { config: disabled }).unwrap();
        assert!(serialized.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for input in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(serde_json::from_str::<FtConfig>(input).unwrap(), enabled);
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        let no_checkpoints =
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap();
        assert_eq!(
            no_checkpoints,
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}
1203
1204impl FtConfig {
1205    pub fn is_enabled(&self) -> bool {
1206        self.model.is_some()
1207    }
1208
1209    /// Returns the checkpoint interval, if fault tolerance is enabled, and
1210    /// otherwise `None`.
1211    pub fn checkpoint_interval(&self) -> Option<Duration> {
1212        if self.is_enabled() {
1213            self.checkpoint_interval_secs
1214                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
1215        } else {
1216            None
1217        }
1218    }
1219}
1220
1221/// Serde implementation for de/serializing a string into `Option<T>` where
1222/// `"none"` indicates `None` and any other string indicates `Some`.
1223///
1224/// This could be extended to handle non-strings by adding more forwarding
1225/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
1226/// them automatically.
1227mod none_as_string {
1228    use std::marker::PhantomData;
1229
1230    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
1231    use serde::ser::{Serialize, Serializer};
1232
1233    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
1234    where
1235        S: Serializer,
1236        T: Serialize,
1237    {
1238        match value.as_ref() {
1239            Some(value) => value.serialize(serializer),
1240            None => "none".serialize(serializer),
1241        }
1242    }
1243
1244    struct NoneAsString<T>(PhantomData<fn() -> T>);
1245
1246    impl<'de, T> Visitor<'de> for NoneAsString<T>
1247    where
1248        T: Deserialize<'de>,
1249    {
1250        type Value = Option<T>;
1251
1252        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1253            formatter.write_str("string")
1254        }
1255
1256        fn visit_none<E>(self) -> Result<Self::Value, E>
1257        where
1258            E: serde::de::Error,
1259        {
1260            Ok(None)
1261        }
1262
1263        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
1264        where
1265            E: serde::de::Error,
1266        {
1267            if &value.to_ascii_lowercase() == "none" {
1268                Ok(None)
1269            } else {
1270                Ok(Some(T::deserialize(value.into_deserializer())?))
1271            }
1272        }
1273    }
1274
1275    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
1276    where
1277        D: Deserializer<'de>,
1278        T: Deserialize<'de>,
1279    {
1280        deserializer.deserialize_str(NoneAsString(PhantomData))
1281    }
1282}
1283
1284/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
1285/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
1286fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
1287    Schema::OneOf(
1288        OneOfBuilder::new()
1289            .item(RefOr::Ref(Ref::new(format!(
1290                "#/components/schemas/{}",
1291                T::schema().0
1292            ))))
1293            .item(
1294                ObjectBuilder::new()
1295                    .schema_type(SchemaType::String)
1296                    .enum_values(Some(vec!["none"])),
1297            )
1298            .default(Some(
1299                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
1300            ))
1301            .build(),
1302    )
1303}
1304
/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
///
/// Serialized in snake case, i.e. as `"at_least_once"` or `"exactly_once"`.
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}
1323
1324impl FtModel {
1325    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
1326        value.map_or("no", |model| model.as_str())
1327    }
1328
1329    pub fn as_str(&self) -> &'static str {
1330        match self {
1331            FtModel::AtLeastOnce => "at_least_once",
1332            FtModel::ExactlyOnce => "exactly_once",
1333        }
1334    }
1335}
1336
/// Error returned by [FtModel]'s [FromStr] implementation for an unrecognized
/// model name.
pub struct FtModelUnknown;
1338
1339impl FromStr for FtModel {
1340    type Err = FtModelUnknown;
1341
1342    fn from_str(s: &str) -> Result<Self, Self::Err> {
1343        match s.to_ascii_lowercase().as_str() {
1344            "exactly_once" => Ok(Self::ExactlyOnce),
1345            "at_least_once" => Ok(Self::AtLeastOnce),
1346            _ => Err(FtModelUnknown),
1347        }
1348    }
1349}
1350
/// Describes an input connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    ///
    /// Flattened, so `ConnectorConfig`'s fields appear directly at this level
    /// in the serialized form.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
1362
1363/// Deserialize the `start_after` property of a connector configuration.
1364/// It requires a non-standard deserialization because we want to accept
1365/// either a string or an array of strings.
1366fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
1367where
1368    D: Deserializer<'de>,
1369{
1370    let value = Option::<JsonValue>::deserialize(deserializer)?;
1371    match value {
1372        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
1373        Some(JsonValue::Array(arr)) => {
1374            let vec = arr
1375                .into_iter()
1376                .map(|item| {
1377                    item.as_str()
1378                        .map(|s| s.to_string())
1379                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
1380                })
1381                .collect::<Result<Vec<String>, _>>()?;
1382            Ok(Some(vec))
1383        }
1384        Some(JsonValue::Null) | None => Ok(None),
1385        _ => Err(serde::de::Error::custom(
1386            "invalid 'start_after' property: expected a string, an array of strings, or null",
1387        )),
1388    }
1389}
1390
/// A data connector's configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Optional preprocessor configurations applied by this connector.
    ///
    /// Omitted from the serialized form when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub preprocessor: Option<Vec<PreprocessorConfig>>,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum number of records from this connector to process in a single batch.
    ///
    /// When set, this caps how many records are taken from the connector’s input
    /// buffer and pushed through the circuit at once.
    ///
    /// This is typically configured lower than `max_queued_records` to allow the
    /// connector time to restart and refill its buffer while a batch is being
    /// processed.
    ///
    /// Not all input adapters honor this limit.
    ///
    /// If this is not set, the batch size is derived from `max_worker_batch_size`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_batch_size: Option<u64>,

    /// Maximum number of records processed per batch, per worker thread.
    ///
    /// When `max_batch_size` is not set, this setting is used to cap
    /// the number of records that can be taken from the connector’s input
    /// buffer and pushed through the circuit at once.  The effective batch size is computed as:
    /// `max_worker_batch_size × workers`.
    ///
    /// This provides an alternative to `max_batch_size` that automatically adjusts batch
    /// size as the number of worker threads changes to maintain constant amount of
    /// work per worker per batch.
    ///
    /// Defaults to 10,000 records per worker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_worker_batch_size: Option<u64>,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, which is still busy processing previous inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the backpressure mechanism is triggered and the endpoint is paused, during
    /// which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    ///
    /// Accepts either a single string or an array of strings
    /// (see `deserialize_start_after`).
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}
1497
1498impl ConnectorConfig {
1499    /// Compare two configs modulo the `paused` field.
1500    ///
1501    /// Used to compare checkpointed and current connector configs.
1502    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
1503        let mut a = self.clone();
1504        let mut b = other.clone();
1505        a.paused = false;
1506        b.paused = false;
1507        a == b
1508    }
1509}
1510
/// Output buffering options, flattened into [ConnectorConfig].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}
1563
1564impl Default for OutputBufferConfig {
1565    fn default() -> Self {
1566        Self {
1567            enable_output_buffer: false,
1568            max_output_buffer_size_records: usize::MAX,
1569            max_output_buffer_time_millis: usize::MAX,
1570        }
1571    }
1572}
1573
1574impl OutputBufferConfig {
1575    pub fn validate(&self) -> Result<(), String> {
1576        if self.enable_output_buffer
1577            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
1578            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
1579        {
1580            return Err(
1581                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
1582                    .to_string(),
1583            );
1584        }
1585
1586        Ok(())
1587    }
1588}
1589
/// Describes an output connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    // `flatten` inlines the `ConnectorConfig` fields into this struct's
    // serialized form, so on the wire an `OutputEndpointConfig` is a
    // `ConnectorConfig` object plus a `stream` field.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
1601
/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
// Serialized as an adjacently tagged enum, e.g.:
//   { "name": "kafka_input", "config": { ... } }
// Variant names map to snake_case tags via `rename_all`.
// (Plain `//` comments here on purpose: `///` doc comments on variants would
// change the utoipa-generated OpenAPI schema.)
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    // File-based transports.
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    // Message-broker transports.
    NatsInput(NatsInputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    // Object-store / URL transports.
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    // Table-format and database transports.
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Prevent rust from complaining about large size difference between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    // Synthetic data generators (testing/benchmarking).
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}
1633
1634impl TransportConfig {
1635    pub fn name(&self) -> String {
1636        match self {
1637            TransportConfig::FileInput(_) => "file_input".to_string(),
1638            TransportConfig::FileOutput(_) => "file_output".to_string(),
1639            TransportConfig::NatsInput(_) => "nats_input".to_string(),
1640            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
1641            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
1642            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
1643            TransportConfig::UrlInput(_) => "url_input".to_string(),
1644            TransportConfig::S3Input(_) => "s3_input".to_string(),
1645            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
1646            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
1647            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
1648            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
1649            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
1650            TransportConfig::Datagen(_) => "datagen".to_string(),
1651            TransportConfig::Nexmark(_) => "nexmark".to_string(),
1652            TransportConfig::HttpInput(_) => "http_input".to_string(),
1653            TransportConfig::HttpOutput => "http_output".to_string(),
1654            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
1655            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
1656            TransportConfig::ClockInput(_) => "clock".to_string(),
1657        }
1658    }
1659
1660    /// Returns true if the connector is transient, i.e., is created and destroyed
1661    /// at runtime on demand, rather than being configured as part of the pipeline.
1662    pub fn is_transient(&self) -> bool {
1663        matches!(
1664            self,
1665            TransportConfig::AdHocInput(_)
1666                | TransportConfig::HttpInput(_)
1667                | TransportConfig::HttpOutput
1668                | TransportConfig::ClockInput(_)
1669        )
1670    }
1671
1672    pub fn is_http_input(&self) -> bool {
1673        matches!(self, TransportConfig::HttpInput(_))
1674    }
1675}
1676
/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    // Kept as an opaque JSON value so the whole config tree can be
    // deserialized without knowing every format's schema (see module docs).
    // `#[serde(default)]` makes the field optional; when omitted it
    // deserializes to `JsonValue::Null` (serde_json's `Value::default()`).
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}
1689
// Resource reservations for a pipeline instance (CPU, memory, storage, and
// Kubernetes placement settings).  Struct-level `#[serde(default)]` makes
// every field optional when deserializing; missing fields become `None`.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline
    // NOTE(review): custom deserializer — presumably to accept scalars that
    // arrive in alternate encodings (e.g. strings); confirm against
    // `crate::serde_via_value`.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline
    // Same custom deserialization path as `cpu_cores_min`.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,

    /// Kubernetes service account name to use for an instance of this pipeline.
    /// The account determines permissions and access controls.
    pub service_account_name: Option<String>,

    /// Kubernetes namespace to use for an instance of this pipeline.
    /// The namespace determines the scope of names for resources created
    /// for the pipeline.
    /// If not set, the pipeline will be deployed in the same namespace
    /// as the control-plane.
    pub namespace: Option<String>,
}
1729}