Skip to main content

feldera_types/
config.rs

1//! Controller configuration.
2//!
3//! This module defines the controller configuration structure.  The leaves of
4//! this structure are individual transport-specific and data-format-specific
5//! endpoint configs.  We represent these configs as opaque JSON values, so
6//! that the entire configuration tree can be deserialized from a JSON file.
7
8use crate::preprocess::PreprocessorConfig;
9use crate::program_schema::ProgramSchema;
10use crate::secret_resolver::default_secrets_directory;
11use crate::transport::adhoc::AdHocInputConfig;
12use crate::transport::clock::ClockConfig;
13use crate::transport::datagen::DatagenInputConfig;
14use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
15use crate::transport::file::{FileInputConfig, FileOutputConfig};
16use crate::transport::http::{HttpInputConfig, HttpOutputConfig};
17use crate::transport::iceberg::IcebergReaderConfig;
18use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
19use crate::transport::nats::NatsInputConfig;
20use crate::transport::nexmark::NexmarkInputConfig;
21use crate::transport::postgres::{
22    PostgresCdcReaderConfig, PostgresReaderConfig, PostgresWriterConfig,
23};
24use crate::transport::pubsub::PubSubInputConfig;
25use crate::transport::redis::RedisOutputConfig;
26use crate::transport::s3::S3InputConfig;
27use crate::transport::url::UrlInputConfig;
28use core::fmt;
29use feldera_ir::{MirNode, MirNodeId};
30use serde::de::{self, MapAccess, Visitor};
31use serde::{Deserialize, Deserializer, Serialize};
32use serde_json::Value as JsonValue;
33use std::collections::HashMap;
34use std::fmt::Display;
35use std::path::Path;
36use std::str::FromStr;
37use std::time::Duration;
38use std::{borrow::Cow, cmp::max, collections::BTreeMap};
39use utoipa::ToSchema;
40use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
41
42pub mod dev_tweaks;
43pub use dev_tweaks::DevTweaks;
44
/// Fallback used by [`PipelineConfig::max_parallel_connector_init`] when the
/// runtime configuration leaves `max_parallel_connector_init` unset.
const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;
46
/// Returns the default for `ConnectorConfig::max_queued_records`
/// (one million).
pub const fn default_max_queued_records() -> u64 {
    10_u64.pow(6)
}
51
// NOTE(review): not referenced in the visible part of this file; presumably the
// default for a per-worker batch-size setting — confirm at the use site.
pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;

// NOTE(review): not referenced in the visible part of this file; presumably the
// default clock resolution in microseconds (1_000_000 µs = 1 s) — confirm at
// the use site.
pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;
55
/// Program information included in the pipeline configuration.
///
/// Carried in [`PipelineConfig::program_ir`] and
/// [`PipelineConfigProgramInfo::program_ir`].
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// The MIR of the program, keyed by MIR node id.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Program schema.
    pub program_schema: ProgramSchema,
}
64
/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global controller configuration.
    // Flattened: the `RuntimeConfig` fields are (de)serialized at the top level
    // of this object rather than under a `global` key.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Configuration for multihost pipelines.
    ///
    /// The presence of this field indicates that the pipeline is running in
    /// multihost mode.  In the pod with ordinal 0, this triggers starting the
    /// coordinator process.  In all pods, this tells the pipeline process to
    /// await a connection from the coordinator instead of initializing the
    /// pipeline immediately.
    pub multihost: Option<MultihostConfig>,

    /// Unique system-generated name of the pipeline (format: `pipeline-<uuid>`).
    /// It is unique across all tenants and cannot be changed.
    ///
    /// The `<uuid>` is also used in the naming of various resources that back the pipeline,
    /// and as such this name is useful to find/identify corresponding resources.
    pub name: Option<String>,

    /// Name given by the tenant to the pipeline. It is only unique within the same tenant, and can
    /// be changed by the tenant when the pipeline is stopped.
    ///
    /// Given a specific tenant, it can be used to find/identify a specific pipeline of theirs.
    pub given_name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration.
    #[serde(default)]
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}
124
125impl PipelineConfig {
126    pub fn max_parallel_connector_init(&self) -> u64 {
127        max(
128            self.global
129                .max_parallel_connector_init
130                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
131            1,
132        )
133    }
134
135    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
136        let (storage_config, storage_options) = storage.unzip();
137        Self {
138            global: RuntimeConfig {
139                storage: storage_options,
140                ..self.global
141            },
142            storage_config,
143            ..self
144        }
145    }
146
147    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
148        let storage_options = self.global.storage.as_ref();
149        let storage_config = self.storage_config.as_ref();
150        storage_config.zip(storage_options)
151    }
152
153    /// Returns `self.secrets_dir`, or the default secrets directory if it isn't
154    /// set.
155    pub fn secrets_dir(&self) -> &Path {
156        match &self.secrets_dir {
157            Some(dir) => Path::new(dir.as_str()),
158            None => default_secrets_directory(),
159        }
160    }
161
162    /// Abbreviated config that can be printed in the log on pipeline startup.
163    pub fn display_summary(&self) -> String {
164        // TODO: we may want to further abbreviate connector config.
165        let summary = serde_json::json!({
166            "name": self.name,
167            "given_name": self.given_name,
168            "global": self.global,
169            "storage_config": self.storage_config,
170            "secrets_dir": self.secrets_dir,
171            "inputs": self.inputs,
172            "outputs": self.outputs
173        });
174
175        serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
176    }
177}
178
/// A subset of fields in `PipelineConfig` that are generated by the compiler.
/// These fields are shipped to the pipeline by the compilation server along with
/// the program binary.
// Note: An alternative would be to embed these fields in the program binary itself
// as static strings. This would work well for program IR, but it would require recompiling
// the program anytime a connector config changes, whereas today connector changes
// do not require recompilation.
#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone)]
pub struct PipelineConfigProgramInfo {
    /// Input endpoint configuration.
    // NOTE(review): unlike `outputs` and `program_ir` below (and unlike
    // `PipelineConfig::inputs`), this field has no `#[serde(default)]`, so it
    // must be present when deserializing — confirm that this is intentional.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}
199
/// Configuration for a multihost Feldera pipeline.
///
/// This configuration is primarily for the coordinator.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct MultihostConfig {
    /// Number of hosts to launch.
    ///
    /// For the configuration to be truly multihost, this should be at least 2.
    /// A value of 1 still runs the multihost coordinator, but it only
    /// coordinates a single host.
    pub hosts: usize,
}
212
213impl Default for MultihostConfig {
214    fn default() -> Self {
215        Self { hosts: 1 }
216    }
217}
218
/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    ///
    /// Falls back to [`StorageCacheConfig::default`] when omitted.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}
236
237impl StorageConfig {
238    pub fn path(&self) -> &Path {
239        Path::new(&self.path)
240    }
241}
242
/// How to cache access to storage within a Feldera pipeline.
// Variants are (de)serialized in snake_case: `page_cache`, `feldera_cache`.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}
260
261impl StorageCacheConfig {
262    #[cfg(unix)]
263    pub fn to_custom_open_flags(&self) -> i32 {
264        match self {
265            StorageCacheConfig::PageCache => (),
266            StorageCacheConfig::FelderaCache => {
267                #[cfg(target_os = "linux")]
268                return libc::O_DIRECT;
269            }
270        }
271        0
272    }
273}
274
/// Storage configuration for a pipeline.
// `#[serde(default)]` on the struct: any field missing from the input takes
// its value from `StorageOptions::default()`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 10,048,576 (10 MiB).
    // NOTE(review): the figure above is inconsistent — 10 MiB is 10,485,760
    // bytes, not 10,048,576.  The actual default is applied outside this file;
    // confirm it and correct whichever number is wrong.
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}
319
/// Backend storage configuration.
// Adjacently tagged: the variant name (snake_case) is stored under `name` and
// its payload, if any, under `config`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}
338
339impl Display for StorageBackendConfig {
340    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341        match self {
342            StorageBackendConfig::Default => write!(f, "default"),
343            StorageBackendConfig::File(_) => write!(f, "file"),
344            StorageBackendConfig::Object(_) => write!(f, "object"),
345        }
346    }
347}
348
/// Storage compression algorithm.
// Variants are (de)serialized in snake_case: `default`, `none`, `snappy`.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}
366
/// Which checkpoint to start from, as used by
/// `SyncConfig::start_from_checkpoint`.
///
/// (De)serialized as a plain string: either `"latest"` or a UUID.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    /// The string `"latest"`.
    Latest,
    /// A specific checkpoint, identified by its UUID.
    Uuid(uuid::Uuid),
}
372
373impl ToSchema<'_> for StartFromCheckpoint {
374    fn schema() -> (
375        &'static str,
376        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
377    ) {
378        (
379            "StartFromCheckpoint",
380            utoipa::openapi::RefOr::T(Schema::OneOf(
381                OneOfBuilder::new()
382                    .item(
383                        ObjectBuilder::new()
384                            .schema_type(SchemaType::String)
385                            .enum_values(Some(["latest"].into_iter()))
386                            .build(),
387                    )
388                    .item(
389                        ObjectBuilder::new()
390                            .schema_type(SchemaType::String)
391                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
392                                utoipa::openapi::KnownFormat::Uuid,
393                            )))
394                            .build(),
395                    )
396                    .nullable(true)
397                    .build(),
398            )),
399        )
400    }
401}
402
403impl<'de> Deserialize<'de> for StartFromCheckpoint {
404    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
405    where
406        D: Deserializer<'de>,
407    {
408        struct StartFromCheckpointVisitor;
409
410        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
411            type Value = StartFromCheckpoint;
412
413            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
414                formatter.write_str("a UUID string or the string \"latest\"")
415            }
416
417            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
418            where
419                E: de::Error,
420            {
421                if value == "latest" {
422                    Ok(StartFromCheckpoint::Latest)
423                } else {
424                    uuid::Uuid::parse_str(value)
425                        .map(StartFromCheckpoint::Uuid)
426                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
427                }
428            }
429        }
430
431        deserializer.deserialize_str(StartFromCheckpointVisitor)
432    }
433}
434
435impl Serialize for StartFromCheckpoint {
436    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
437    where
438        S: serde::Serializer,
439    {
440        match self {
441            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
442            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
443        }
444    }
445}
446
447#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
448pub struct SyncConfig {
449    /// The endpoint URL for the storage service.
450    ///
451    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
452    /// Example: `http://localhost:9000`
453    ///
454    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
455    pub endpoint: Option<String>,
456
457    /// The name of the storage bucket.
458    ///
459    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
460    pub bucket: String,
461
462    /// The region that this bucket is in.
463    ///
464    /// Leave empty for Minio or the default region (`us-east-1` for AWS).
465    pub region: Option<String>,
466
467    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
468    ///
469    /// Used for provider-specific behavior in rclone.
470    /// If omitted, defaults to `"Other"`.
471    ///
472    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
473    pub provider: Option<String>,
474
475    /// The access key used to authenticate with the storage provider.
476    ///
477    /// If not provided, rclone will fall back to environment-based credentials, such as
478    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
479    /// this can be left empty to allow automatic authentication via the pod's service account.
480    pub access_key: Option<String>,
481
482    /// The secret key used together with the access key for authentication.
483    ///
484    /// If not provided, rclone will fall back to environment-based credentials, such as
485    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
486    /// this can be left empty to allow automatic authentication via the pod's service account.
487    pub secret_key: Option<String>,
488
489    /// When set, the pipeline will try fetch the specified checkpoint from the
490    /// object store.
491    ///
492    /// If `fail_if_no_checkpoint` is `true`, the pipeline will fail to initialize.
493    pub start_from_checkpoint: Option<StartFromCheckpoint>,
494
495    /// When true, the pipeline will fail to initialize if fetching the
496    /// specified checkpoint fails (missing, download error).
497    /// When false, the pipeline will start from scratch instead.
498    ///
499    /// False by default.
500    #[schema(default = std::primitive::bool::default)]
501    #[serde(default)]
502    pub fail_if_no_checkpoint: bool,
503
504    /// The number of file transfers to run in parallel.
505    /// Default: 20
506    pub transfers: Option<u8>,
507
508    /// The number of checkers to run in parallel.
509    /// Default: 20
510    pub checkers: Option<u8>,
511
512    /// Set to skip post copy check of checksums, and only check the file sizes.
513    /// This can significantly improve the throughput.
514    /// Defualt: false
515    pub ignore_checksum: Option<bool>,
516
517    /// Number of streams to use for multi-thread downloads.
518    /// Default: 10
519    pub multi_thread_streams: Option<u8>,
520
521    /// Use multi-thread download for files above this size.
522    /// Format: `[size][Suffix]` (Example: 1G, 500M)
523    /// Supported suffixes: k|M|G|T
524    /// Default: 100M
525    pub multi_thread_cutoff: Option<String>,
526
527    /// The number of chunks of the same file that are uploaded for multipart uploads.
528    /// Default: 10
529    pub upload_concurrency: Option<u8>,
530
531    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
532    /// start until activation (`POST /activate`).
533    /// If this pipeline was previously activated and the storage has not been
534    /// cleared, the pipeline will auto activate, no newer checkpoints will be
535    /// fetched.
536    ///
537    /// Standby behavior depends on `start_from_checkpoint`:
538    /// - If `latest`, pipeline continuously fetches the latest available
539    ///   checkpoint until activated.
540    /// - If checkpoint UUID, pipeline fetches this checkpoint once and waits
541    ///   in standby until activated.
542    ///
543    /// Default: `false`
544    #[schema(default = std::primitive::bool::default)]
545    #[serde(default)]
546    pub standby: bool,
547
548    /// The interval (in seconds) between each attempt to fetch the latest
549    /// checkpoint from object store while in standby mode.
550    ///
551    /// Applies only when `start_from_checkpoint` is set to `latest`.
552    ///
553    /// Default: 10 seconds
554    #[schema(default = default_pull_interval)]
555    #[serde(default = "default_pull_interval")]
556    pub pull_interval: u64,
557
558    /// The interval (in seconds) between each push of checkpoints to object store.
559    ///
560    /// Default: disabled (no periodic push).
561    #[serde(default)]
562    pub push_interval: Option<u64>,
563
564    /// Extra flags to pass to `rclone`.
565    ///
566    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
567    /// Use with caution.
568    ///
569    /// Refer to the docs to see the supported flags:
570    /// - [Global flags](https://rclone.org/flags/)
571    /// - [S3 specific flags](https://rclone.org/s3/)
572    pub flags: Option<Vec<String>>,
573
574    /// The minimum number of checkpoints to retain in object store.
575    /// No checkpoints will be deleted if the total count is below this threshold.
576    ///
577    /// Default: 10
578    #[schema(default = default_retention_min_count)]
579    #[serde(default = "default_retention_min_count")]
580    pub retention_min_count: u32,
581
582    /// The minimum age (in days) a checkpoint must reach before it becomes
583    /// eligible for deletion. All younger checkpoints will be preserved.
584    ///
585    /// Default: 30
586    #[schema(default = default_retention_min_age)]
587    #[serde(default = "default_retention_min_age")]
588    pub retention_min_age: u32,
589
590    /// A read-only bucket used as a fallback checkpoint source.
591    ///
592    /// When the pipeline has no local checkpoint and `bucket` contains no
593    /// checkpoint either, it will attempt to fetch the checkpoint from this
594    /// location instead.  All connection settings (`endpoint`, `region`,
595    /// `provider`, `access_key`, `secret_key`) are shared with `bucket`.
596    ///
597    /// The pipeline **never writes** to `read_bucket`.
598    ///
599    /// Must point to a different location than `bucket`.
600    #[serde(default)]
601    pub read_bucket: Option<String>,
602}
603
/// Default for `SyncConfig::pull_interval`, in seconds.
fn default_pull_interval() -> u64 {
    10_u64
}
607
/// Default for `SyncConfig::retention_min_count` (number of checkpoints).
fn default_retention_min_count() -> u32 {
    10_u32
}
611
/// Default for `SyncConfig::retention_min_age`, in days.
fn default_retention_min_age() -> u32 {
    30_u32
}
615
616impl SyncConfig {
617    pub fn validate(&self) -> Result<(), String> {
618        if self.standby && self.start_from_checkpoint.is_none() {
619            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
620Standby mode requires `start_from_checkpoint` to be set.
621Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
622        }
623
624        if let Some(ref rb) = self.read_bucket
625            && rb == &self.bucket
626        {
627            return Err(
628                "invalid sync config: `read_bucket` and `bucket` must point to different locations"
629                    .to_owned(),
630            );
631        }
632
633        Ok(())
634    }
635}
636
/// Configuration for supplying a custom pipeline StatefulSet template via a Kubernetes ConfigMap.
///
/// Operators can provide a custom StatefulSet YAML that the Kubernetes runner will use when
/// creating pipeline StatefulSets for a pipeline. The custom template must be stored as the
/// value of a key in a ConfigMap in the same namespace as the pipeline; set `name` to the
/// ConfigMap name and `key` to the entry that contains the template.
///
/// Recommendations and requirements:
/// - **Start from the default template and modify it as needed.** The default template is stored
///   in the ConfigMap named `<release-name>-pipeline-template`, under the key `pipelineTemplate`,
///   in the release namespace, and should be used as a reference.
/// - The template must contain a valid Kubernetes `StatefulSet` manifest in YAML form. The
///   runner substitutes variables in the template before parsing; therefore the final YAML
///   must be syntactically valid.
/// - The runner performs simple string substitution for the following placeholders. Please ensure
///   that these placeholders are placed at locations appropriate for their semantics:
///   - `{id}`: pipeline Kubernetes name (used for object names and labels)
///   - `{namespace}`: Kubernetes namespace where the pipeline runs
///   - `{pipeline_executor_image}`: container image used to run the pipeline executor
///   - `{binary_ref}`: program binary reference passed as an argument
///   - `{program_info_ref}`: program info reference passed as an argument
///   - `{pipeline_storage_path}`: mount path for persistent pipeline storage
///   - `{storage_class_name}`: storage class name to use for PVCs (if applicable)
///   - `{deployment_id}`: UUID identifying the deployment instance
///   - `{deployment_initial}`: initial desired runtime status (e.g., `provisioning`)
///   - `{bootstrap_policy}`: bootstrap policy value when applicable
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct PipelineTemplateConfig {
    /// Name of the ConfigMap containing the pipeline template.
    pub name: String,
    /// Key in the ConfigMap containing the pipeline template.
    ///
    /// If not set, defaults to `pipelineTemplate`.
    #[schema(default = default_pipeline_template_key)]
    #[serde(default = "default_pipeline_template_key")]
    pub key: String,
}
674
/// Default for `PipelineTemplateConfig::key`: the ConfigMap key
/// `pipelineTemplate`.
fn default_pipeline_template_key() -> String {
    String::from("pipelineTemplate")
}
678
/// Configuration for an object storage backend.
///
/// Used by `StorageBackendConfig::Object`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    // Flattened: these entries are (de)serialized as siblings of `url` rather
    // than under an `other_options` key.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}
746
/// Configuration for local file system access.
///
/// Used by `StorageBackendConfig::File`.
// `#[serde(default)]` on the struct: any field missing from the input takes
// its value from `FileBackendConfig::default()`, i.e. `None`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}
767
768/// Global pipeline configuration settings. This is the publicly
769/// exposed type for users to configure pipelines.
770#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
771#[serde(default)]
772pub struct RuntimeConfig {
773    /// Number of DBSP worker threads.
774    ///
775    /// Each DBSP "foreground" worker thread is paired with a "background"
776    /// thread for LSM merging, making the total number of threads twice the
777    /// specified number.
778    ///
779    /// The typical sweet spot for the number of workers is between 4 and 16.
780    /// Each worker increases overall memory consumption for data structures
781    /// used during a step.
782    pub workers: u16,
783
784    /// The maximum amount of memory, in Megabytes, that the pipeline is allowed to use
785    /// on each host.
786    ///
787    /// Setting this property activates memory pressure monitoring and backpressure
788    /// mechanisms. The pipeline will track the amount of remaining memory and
789    /// report the memory pressure level via the `memory_pressure` metric.
790    ///
791    /// As the memory pressure increases, the system will apply increasing backpressure
792    /// to push state cached in memory to storage, preventing the pipeline from running
793    /// out of memory at the cost of some performance degradation.
794    ///
795    /// It is strongly recommended to set this property to prevent the pipeline from
796    /// running out of memory. The setting should not exceed the memory limit of the pipeline
797    /// instance.
798    ///
799    /// When `max_rss_mb` is not specified but `resources.memory_mb_max` is set, the
800    /// latter is used as the effective memory cap for the pipeline.
801    ///
802    /// See [documentation on the pipeline's memory usage](https://docs.feldera.com/operations/memory)
803    /// for more details.
804    pub max_rss_mb: Option<u64>,
805
806    /// DataFusion memory pool size, in MB, shared by the ad-hoc query
807    /// engine and the Delta Lake / Iceberg connectors.
808    ///
809    /// Carved out of `max_rss_mb` (falling back to
810    /// `resources.memory_mb_max`); the remainder goes to the DBSP circuit,
811    /// so the two do not double-book RAM.
812    ///
813    /// Unset: defaults to 5% of the effective budget, capped at 2 GB.
814    /// Pipelines that don't run heavy ad-hoc / Delta / Iceberg workloads
815    /// can leave this unset.
816    ///
817    /// Sort/aggregate-heavy ad-hoc queries (especially at high `workers`
818    /// counts) should set this explicitly. An under-sized pool surfaces as
819    /// `ResourcesExhausted` on the failing query — the pipeline keeps
820    /// running and only that query fails.
821    ///
822    /// No pool limit applied if no overall budget is configured.
823    ///
824    /// See [documentation on the pipeline's memory usage](https://docs.feldera.com/operations/memory)
825    /// for more details.
826    pub datafusion_memory_mb: Option<u64>,
827
828    /// Number of DBSP hosts.
829    ///
830    /// The worker threads are evenly divided among the hosts.  For single-host
831    /// deployments, this should be 1 (the default).
832    ///
833    /// Multihost pipelines are an enterprise-only preview feature.
834    pub hosts: usize,
835
836    /// Storage configuration.
837    ///
838    /// - If this is `None`, the default, the pipeline's state is kept in
839    ///   in-memory data-structures.  This is useful if the pipeline's state
840    ///   will fit in memory and if the pipeline is ephemeral and does not need
841    ///   to be recovered after a restart. The pipeline will most likely run
842    ///   faster since it does not need to access storage.
843    ///
844    /// - If set, the pipeline's state is kept on storage.  This allows the
845    ///   pipeline to work with state that will not fit into memory. It also
846    ///   allows the state to be checkpointed and recovered across restarts.
847    #[serde(deserialize_with = "deserialize_storage_options")]
848    pub storage: Option<StorageOptions>,
849
850    /// Fault tolerance configuration.
851    #[serde(deserialize_with = "deserialize_fault_tolerance")]
852    pub fault_tolerance: FtConfig,
853
854    /// Enable CPU profiler.
855    ///
856    /// The default value is `true`.
857    pub cpu_profiler: bool,
858
859    /// Enable pipeline tracing.
860    pub tracing: bool,
861
862    /// Jaeger tracing endpoint to send tracing information to.
863    pub tracing_endpoint_jaeger: String,
864
865    /// Minimal input batch size.
866    ///
867    /// The controller delays pushing input records to the circuit until at
868    /// least `min_batch_size_records` records have been received (total
869    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
870    /// have passed since at least one input records has been buffered.
871    /// Defaults to 0.
872    pub min_batch_size_records: u64,
873
874    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
875    /// get buffered by the controller, defaults to 0.
876    pub max_buffering_delay_usecs: u64,
877
878    /// Resource reservations and limits. This is enforced
879    /// only in Feldera Cloud.
880    pub resources: ResourceConfig,
881
882    /// Real-time clock resolution in microseconds.
883    ///
884    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
885    /// queries depends on the real-time clock and can change over time without any external
886    /// inputs.  If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
887    /// recomputation at most each `clock_resolution_usecs` microseconds.  If the query does not use
888    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
889    ///
890    /// It is set to 1 second (1,000,000 microseconds) by default.
891    pub clock_resolution_usecs: Option<u64>,
892
893    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
894    /// its worker threads.  Specify at least twice as many CPU numbers as
895    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
896    /// might not be able to honor CPU pinning requests.
897    ///
898    /// CPU pinning can make pipelines run faster and perform more consistently,
899    /// as long as different pipelines running on the same machine are pinned to
900    /// different CPUs.
901    pub pin_cpus: Vec<usize>,
902
903    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
904    /// Setting this value will override the default of the runner.
905    pub provisioning_timeout_secs: Option<u64>,
906
907    /// The maximum number of connectors initialized in parallel during pipeline
908    /// startup.
909    ///
910    /// At startup, the pipeline must initialize all of its input and output connectors.
911    /// Depending on the number and types of connectors, this can take a long time.
912    /// To accelerate the process, multiple connectors are initialized concurrently.
913    /// This option controls the maximum number of connectors that can be initialized
914    /// in parallel.
915    ///
916    /// The default is 10.
917    pub max_parallel_connector_init: Option<u64>,
918
919    /// Specification of additional (sidecar) containers.
920    pub init_containers: Option<serde_json::Value>,
921
922    /// Deprecated: setting this true or false does not have an effect anymore.
923    pub checkpoint_during_suspend: bool,
924
925    /// Sets the number of available runtime threads for the http server.
926    ///
927    /// In most cases, this does not need to be set explicitly and
928    /// the default is sufficient. Can be increased in case the
929    /// pipeline HTTP API operations are a bottleneck.
930    ///
931    /// If not specified, the default is set to `workers`.
932    pub http_workers: Option<u64>,
933
934    /// Sets the number of available runtime threads for async IO tasks.
935    ///
936    /// This affects some networking and file I/O operations
937    /// especially adapters and ad-hoc queries.
938    ///
939    /// In most cases, this does not need to be set explicitly and
940    /// the default is sufficient. Can be increased in case
941    /// ingress, egress or ad-hoc queries are a bottleneck.
942    ///
943    /// If not specified, the default is set to `workers`.
944    pub io_workers: Option<u64>,
945
946    /// Environment variables for the pipeline process.
947    ///
948    /// These are key-value pairs injected into the pipeline process environment.
949    /// Some variable names are reserved by the platform and cannot be overridden
950    /// (for example `RUST_LOG`, and variables in the `FELDERA_`,
951    /// `KUBERNETES_`, and `TOKIO_` namespaces).
952    #[serde(default)]
953    pub env: BTreeMap<String, String>,
954
955    /// Optional settings for tweaking Feldera internals.
956    pub dev_tweaks: DevTweaks,
957
958    /// Log filtering directives.
959    ///
960    /// If set to a valid [tracing-subscriber] filter, this controls the log
961    /// messages emitted by the pipeline process.  Otherwise, or if the filter
962    /// has invalid syntax, messages at "info" severity and higher are written
963    /// to the log and all others are discarded.
964    ///
965    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
966    pub logging: Option<String>,
967
968    /// ConfigMap containing a custom pipeline template (Enterprise only).
969    ///
970    /// This feature is only available in Feldera Enterprise. If set, the Kubernetes runner
971    /// will read the template from the specified ConfigMap and use it instead of the default
972    /// StatefulSet template for the configured pipeline.
973    ///
974    /// check [`PipelineTemplateConfig`] documentation for details.
975    pub pipeline_template_configmap: Option<PipelineTemplateConfig>,
976}
977
978/// Accepts "true" and "false" and converts them to the new format.
979fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
980where
981    D: Deserializer<'de>,
982{
983    struct BoolOrStruct;
984
985    impl<'de> Visitor<'de> for BoolOrStruct {
986        type Value = Option<StorageOptions>;
987
988        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
989            formatter.write_str("boolean or StorageOptions")
990        }
991
992        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
993        where
994            E: de::Error,
995        {
996            match v {
997                false => Ok(None),
998                true => Ok(Some(StorageOptions::default())),
999            }
1000        }
1001
1002        fn visit_unit<E>(self) -> Result<Self::Value, E>
1003        where
1004            E: de::Error,
1005        {
1006            Ok(None)
1007        }
1008
1009        fn visit_none<E>(self) -> Result<Self::Value, E>
1010        where
1011            E: de::Error,
1012        {
1013            Ok(None)
1014        }
1015
1016        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
1017        where
1018            M: MapAccess<'de>,
1019        {
1020            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
1021        }
1022    }
1023
1024    deserializer.deserialize_any(BoolOrStruct)
1025}
1026
1027/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
1028/// tolerance.
1029///
1030/// Accepts `null` as disabling fault tolerance.
1031///
1032/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
1033/// expect.
1034fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
1035where
1036    D: Deserializer<'de>,
1037{
1038    struct StringOrStruct;
1039
1040    impl<'de> Visitor<'de> for StringOrStruct {
1041        type Value = FtConfig;
1042
1043        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1044            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
1045        }
1046
1047        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
1048        where
1049            E: de::Error,
1050        {
1051            match v {
1052                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
1053                    model: Some(FtModel::default()),
1054                    ..FtConfig::default()
1055                }),
1056                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
1057            }
1058        }
1059
1060        fn visit_unit<E>(self) -> Result<Self::Value, E>
1061        where
1062            E: de::Error,
1063        {
1064            Ok(FtConfig::default())
1065        }
1066
1067        fn visit_none<E>(self) -> Result<Self::Value, E>
1068        where
1069            E: de::Error,
1070        {
1071            Ok(FtConfig::default())
1072        }
1073
1074        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
1075        where
1076            M: MapAccess<'de>,
1077        {
1078            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
1079        }
1080    }
1081
1082    deserializer.deserialize_any(StringOrStruct)
1083}
1084
1085impl Default for RuntimeConfig {
1086    fn default() -> Self {
1087        Self {
1088            workers: 8,
1089            max_rss_mb: None,
1090            datafusion_memory_mb: None,
1091            hosts: 1,
1092            storage: Some(StorageOptions::default()),
1093            fault_tolerance: FtConfig::default(),
1094            cpu_profiler: true,
1095            tracing: {
1096                // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
1097                // to keep it on by default.
1098                false
1099            },
1100            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
1101            min_batch_size_records: 0,
1102            max_buffering_delay_usecs: 0,
1103            resources: ResourceConfig::default(),
1104            clock_resolution_usecs: { Some(DEFAULT_CLOCK_RESOLUTION_USECS) },
1105            pin_cpus: Vec::new(),
1106            provisioning_timeout_secs: None,
1107            max_parallel_connector_init: None,
1108            init_containers: None,
1109            checkpoint_during_suspend: true,
1110            io_workers: None,
1111            http_workers: None,
1112            env: BTreeMap::default(),
1113            dev_tweaks: DevTweaks::default(),
1114            logging: None,
1115            pipeline_template_configmap: None,
1116        }
1117    }
1118}
1119
1120/// Upper bound on the default DataFusion pool size, in MB.
1121///
1122/// Spill-to-disk handles overflow; reserving more starves the circuit.
1123pub const DEFAULT_DATAFUSION_MEMORY_MB_CEILING: u64 = 2048;
1124
1125/// Default DataFusion pool size as a percentage of the pipeline's
1126/// effective memory budget. The remainder is left for the DBSP circuit.
1127pub const DEFAULT_DATAFUSION_MEMORY_PERCENT: u64 = 5;
1128
1129impl RuntimeConfig {
1130    /// Pipeline's effective memory budget in MB: `max_rss_mb`, falling back
1131    /// to `resources.memory_mb_max` (the k8s pod limit).
1132    pub fn effective_memory_mb(&self) -> Option<u64> {
1133        self.max_rss_mb.or(self.resources.memory_mb_max)
1134    }
1135
1136    /// Resolved DataFusion pool size in MB: explicit `datafusion_memory_mb`
1137    /// if set, else 5% of the effective budget capped at
1138    /// `DEFAULT_DATAFUSION_MEMORY_MB_CEILING`. `None` if no budget is
1139    /// configured.
1140    pub fn resolved_datafusion_memory_mb(&self) -> Option<u64> {
1141        if let Some(explicit) = self.datafusion_memory_mb {
1142            return Some(explicit);
1143        }
1144        let effective = self.effective_memory_mb()?;
1145        let fraction = effective * DEFAULT_DATAFUSION_MEMORY_PERCENT / 100;
1146        Some(fraction.min(DEFAULT_DATAFUSION_MEMORY_MB_CEILING))
1147    }
1148}
1149
1150/// Fault-tolerance configuration.
1151///
1152/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
1153/// which is the configuration that one gets if [RuntimeConfig] omits fault
1154/// tolerance configuration.
1155///
1156/// The default value for [FtConfig::model] enables fault tolerance, as
1157/// `Some(FtModel::default())`.  This is the configuration that one gets if
1158/// [RuntimeConfig] includes a fault tolerance configuration but does not
1159/// specify a particular model.
1160#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1161#[serde(rename_all = "snake_case")]
1162pub struct FtConfig {
1163    /// Fault tolerance model to use.
1164    #[serde(with = "none_as_string")]
1165    #[serde(default = "default_model")]
1166    #[schema(
1167        schema_with = none_as_string_schema::<FtModel>,
1168    )]
1169    pub model: Option<FtModel>,
1170
1171    /// Interval between automatic checkpoints, in seconds.
1172    ///
1173    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
1174    /// be forced into that range.
1175    #[serde(default = "default_checkpoint_interval_secs")]
1176    pub checkpoint_interval_secs: Option<u64>,
1177}
1178
1179fn default_model() -> Option<FtModel> {
1180    Some(FtModel::default())
1181}
1182
/// Default interval between automatic checkpoints: 60 seconds.
pub fn default_checkpoint_interval_secs() -> Option<u64> {
    const DEFAULT_INTERVAL_SECS: u64 = 60;
    Some(DEFAULT_INTERVAL_SECS)
}
1186
1187impl Default for FtConfig {
1188    fn default() -> Self {
1189        Self {
1190            model: None,
1191            checkpoint_interval_secs: default_checkpoint_interval_secs(),
1192        }
1193    }
1194}
1195
/// Unit tests for the DataFusion memory resolution and for the accepted
/// encodings of the fault tolerance configuration.
#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{
        DEFAULT_DATAFUSION_MEMORY_MB_CEILING, FtConfig, FtModel, ResourceConfig, RuntimeConfig,
    };
    use serde::{Deserialize, Serialize};

    #[test]
    fn resolved_datafusion_memory_explicit_passes_through() {
        let config = RuntimeConfig {
            max_rss_mb: Some(8_000),
            datafusion_memory_mb: Some(1_500),
            ..Default::default()
        };
        assert_eq!(config.resolved_datafusion_memory_mb(), Some(1_500));
    }

    #[test]
    fn resolved_datafusion_memory_unconfigured_returns_none() {
        let config = RuntimeConfig::default();
        assert!(config.max_rss_mb.is_none());
        assert!(config.resources.memory_mb_max.is_none());
        assert_eq!(config.resolved_datafusion_memory_mb(), None);
    }

    #[test]
    fn resolved_datafusion_memory_small_budget_scales_down() {
        // Small pipelines must provision cleanly; the default just shrinks.
        let config = RuntimeConfig {
            max_rss_mb: Some(256),
            ..Default::default()
        };
        assert_eq!(config.resolved_datafusion_memory_mb(), Some(12));

        let config = RuntimeConfig {
            max_rss_mb: Some(512),
            ..Default::default()
        };
        assert_eq!(config.resolved_datafusion_memory_mb(), Some(25));
    }

    #[test]
    fn resolved_datafusion_memory_clamps_to_ceiling_for_large_budgets() {
        // 5% of 64 GB = 3.2 GB, above the 2 GB ceiling.
        let config = RuntimeConfig {
            max_rss_mb: Some(64_000),
            ..Default::default()
        };
        assert_eq!(
            config.resolved_datafusion_memory_mb(),
            Some(DEFAULT_DATAFUSION_MEMORY_MB_CEILING),
        );
    }

    #[test]
    fn resolved_datafusion_memory_midrange_uses_five_percent() {
        // 5% of 16 GB = 800 MB, inside the clamp range.
        let config = RuntimeConfig {
            max_rss_mb: Some(16_000),
            ..Default::default()
        };
        assert_eq!(config.resolved_datafusion_memory_mb(), Some(800));
    }

    #[test]
    fn resolved_datafusion_memory_falls_back_to_resources() {
        // No max_rss_mb, but resources.memory_mb_max is set.
        let config = RuntimeConfig {
            max_rss_mb: None,
            resources: ResourceConfig {
                memory_mb_max: Some(16_000),
                ..Default::default()
            },
            ..Default::default()
        };
        assert_eq!(config.resolved_datafusion_memory_mb(), Some(800));
    }

    #[test]
    fn ft_config() {
        // Mimics how `RuntimeConfig` embeds `FtConfig`, so that the custom
        // `deserialize_fault_tolerance` entry point is exercised.
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        let disabled = FtConfig {
            model: None,
            checkpoint_interval_secs: Some(60),
        };
        let enabled = FtConfig {
            model: Some(FtModel::default()),
            checkpoint_interval_secs: Some(60),
        };

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(config, Wrapper { config: disabled });
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // Going through the wrapper, an empty `FtConfig` object and the
        // legacy strings all enable fault tolerance with the default model.
        for s in [
            r#"{"config": {}}"#,
            r#"{"config": "initial_state"}"#,
            r#"{"config": "latest_checkpoint"}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(config, Wrapper { config: enabled });
        }

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in ["{}", r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(serde_json::from_str::<FtConfig>(s).unwrap(), enabled);
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}
1331
1332impl FtConfig {
1333    pub fn is_enabled(&self) -> bool {
1334        self.model.is_some()
1335    }
1336
1337    /// Returns the checkpoint interval, if fault tolerance is enabled, and
1338    /// otherwise `None`.
1339    pub fn checkpoint_interval(&self) -> Option<Duration> {
1340        if self.is_enabled() {
1341            self.checkpoint_interval_secs
1342                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
1343        } else {
1344            None
1345        }
1346    }
1347}
1348
1349/// Serde implementation for de/serializing a string into `Option<T>` where
1350/// `"none"` indicates `None` and any other string indicates `Some`.
1351///
1352/// This could be extended to handle non-strings by adding more forwarding
1353/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
1354/// them automatically.
1355mod none_as_string {
1356    use std::marker::PhantomData;
1357
1358    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
1359    use serde::ser::{Serialize, Serializer};
1360
1361    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
1362    where
1363        S: Serializer,
1364        T: Serialize,
1365    {
1366        match value.as_ref() {
1367            Some(value) => value.serialize(serializer),
1368            None => "none".serialize(serializer),
1369        }
1370    }
1371
1372    struct NoneAsString<T>(PhantomData<fn() -> T>);
1373
1374    impl<'de, T> Visitor<'de> for NoneAsString<T>
1375    where
1376        T: Deserialize<'de>,
1377    {
1378        type Value = Option<T>;
1379
1380        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1381            formatter.write_str("string")
1382        }
1383
1384        fn visit_none<E>(self) -> Result<Self::Value, E>
1385        where
1386            E: serde::de::Error,
1387        {
1388            Ok(None)
1389        }
1390
1391        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
1392        where
1393            E: serde::de::Error,
1394        {
1395            if &value.to_ascii_lowercase() == "none" {
1396                Ok(None)
1397            } else {
1398                Ok(Some(T::deserialize(value.into_deserializer())?))
1399            }
1400        }
1401    }
1402
1403    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
1404    where
1405        D: Deserializer<'de>,
1406        T: Deserialize<'de>,
1407    {
1408        deserializer.deserialize_str(NoneAsString(PhantomData))
1409    }
1410}
1411
1412/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
1413/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
1414fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
1415    Schema::OneOf(
1416        OneOfBuilder::new()
1417            .item(RefOr::Ref(Ref::new(format!(
1418                "#/components/schemas/{}",
1419                T::schema().0
1420            ))))
1421            .item(
1422                ObjectBuilder::new()
1423                    .schema_type(SchemaType::String)
1424                    .enum_values(Some(vec!["none"])),
1425            )
1426            .default(Some(
1427                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
1428            ))
1429            .build(),
1430    )
1431}
1432
1433/// Fault tolerance model.
1434///
1435/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
1436/// level" of fault tolerance than [Self::AtLeastOnce].
1437#[derive(
1438    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
1439)]
1440#[serde(rename_all = "snake_case")]
1441pub enum FtModel {
1442    /// Each record is output at least once.  Crashes may duplicate output, but
1443    /// no input or output is dropped.
1444    AtLeastOnce,
1445
1446    /// Each record is output exactly once.  Crashes do not drop or duplicate
1447    /// input or output.
1448    #[default]
1449    ExactlyOnce,
1450}
1451
1452impl FtModel {
1453    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
1454        value.map_or("no", |model| model.as_str())
1455    }
1456
1457    pub fn as_str(&self) -> &'static str {
1458        match self {
1459            FtModel::AtLeastOnce => "at_least_once",
1460            FtModel::ExactlyOnce => "exactly_once",
1461        }
1462    }
1463}
1464
1465pub struct FtModelUnknown;
1466
1467impl FromStr for FtModel {
1468    type Err = FtModelUnknown;
1469
1470    fn from_str(s: &str) -> Result<Self, Self::Err> {
1471        match s.to_ascii_lowercase().as_str() {
1472            "exactly_once" => Ok(Self::ExactlyOnce),
1473            "at_least_once" => Ok(Self::AtLeastOnce),
1474            _ => Err(FtModelUnknown),
1475        }
1476    }
1477}
1478
1479/// Describes an input connector configuration
1480#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1481pub struct InputEndpointConfig {
1482    /// The name of the input stream of the circuit that this endpoint is
1483    /// connected to.
1484    pub stream: Cow<'static, str>,
1485
1486    /// Connector configuration.
1487    #[serde(flatten)]
1488    pub connector_config: ConnectorConfig,
1489}
1490
1491impl InputEndpointConfig {
1492    pub fn new(stream: impl Into<Cow<'static, str>>, connector_config: ConnectorConfig) -> Self {
1493        Self {
1494            stream: stream.into(),
1495            connector_config,
1496        }
1497    }
1498}
1499
1500/// Deserialize the `start_after` property of a connector configuration.
1501/// It requires a non-standard deserialization because we want to accept
1502/// either a string or an array of strings.
1503fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
1504where
1505    D: Deserializer<'de>,
1506{
1507    let value = Option::<JsonValue>::deserialize(deserializer)?;
1508    match value {
1509        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
1510        Some(JsonValue::Array(arr)) => {
1511            let vec = arr
1512                .into_iter()
1513                .map(|item| {
1514                    item.as_str()
1515                        .map(|s| s.to_string())
1516                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
1517                })
1518                .collect::<Result<Vec<String>, _>>()?;
1519            Ok(Some(vec))
1520        }
1521        Some(JsonValue::Null) | None => Ok(None),
1522        _ => Err(serde::de::Error::custom(
1523            "invalid 'start_after' property: expected a string, an array of strings, or null",
1524        )),
1525    }
1526}
1527
1528#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1529pub struct ConnectorConfig {
1530    /// Send a full snapshot of a materialized view when the connector first
1531    /// starts. Valid for output connectors only.
1532    ///
1533    /// When `true`, the pipeline emits the current contents of the view as the
1534    /// initial batch the first time the connector runs. The view must be
1535    /// materialized (declared with `CREATE MATERIALIZED VIEW`).
1536    ///
1537    /// The snapshot is sent exactly once per connector lifetime: it does not
1538    /// fire again when the pipeline resumes from a checkpoint. Modifying the
1539    /// connector configuration or invoking the reset API triggers a fresh
1540    /// snapshot when the connector supports reset (e.g., Delta Lake in
1541    /// `truncate` mode and Postgres).
1542    #[serde(default)]
1543    pub send_snapshot: bool,
1544
1545    /// Transport endpoint configuration.
1546    pub transport: TransportConfig,
1547
1548    #[serde(skip_serializing_if = "Option::is_none")]
1549    pub preprocessor: Option<Vec<PreprocessorConfig>>,
1550
1551    /// Parser configuration.
1552    pub format: Option<FormatConfig>,
1553
1554    /// Name of the index that the connector is attached to.
1555    ///
1556    /// This property is valid for output connectors only.  It is used with data
1557    /// transports and formats that expect output updates in the form of key/value
1558    /// pairs, where the key typically represents a unique id associated with the
1559    /// table or view.
1560    ///
1561    /// To support such output formats, an output connector can be attached to an
1562    /// index created using the SQL CREATE INDEX statement.  An index of a table
1563    /// or view contains the same updates as the table or view itself, indexed by
1564    /// one or more key columns.
1565    ///
1566    /// See individual connector documentation for details on how they work
1567    /// with indexes.
1568    pub index: Option<String>,
1569
1570    /// Output buffer configuration.
1571    #[serde(flatten)]
1572    pub output_buffer_config: OutputBufferConfig,
1573
1574    /// Maximum number of records from this connector to process in a single batch.
1575    ///
1576    /// When set, this caps how many records are taken from the connector’s input
1577    /// buffer and pushed through the circuit at once.
1578    ///
1579    /// This is typically configured lower than `max_queued_records` to allow the
1580    /// connector time to restart and refill its buffer while a batch is being
1581    /// processed.
1582    ///
1583    /// Not all input adapters honor this limit.
1584    ///
1585    /// If this is not set, the batch size is derived from `max_worker_batch_size`.
1586    #[serde(skip_serializing_if = "Option::is_none")]
1587    pub max_batch_size: Option<u64>,
1588
1589    /// Maximum number of records processed per batch, per worker thread.
1590    ///
1591    /// When `max_batch_size` is not set, this setting is used to cap
1592    /// the number of records that can be taken from the connector’s input
1593    /// buffer and pushed through the circuit at once.  The effective batch size is computed as:
1594    /// `max_worker_batch_size × workers`.
1595    ///
1596    /// This provides an alternative to `max_batch_size` that automatically adjusts batch
1597    /// size as the number of worker threads changes to maintain constant amount of
1598    /// work per worker per batch.
1599    ///
1600    /// Defaults to 10,000 records per worker.
1601    #[serde(skip_serializing_if = "Option::is_none")]
1602    pub max_worker_batch_size: Option<u64>,
1603
1604    /// Backpressure threshold.
1605    ///
1606    /// Maximal number of records queued by the endpoint before the endpoint
1607    /// is paused by the backpressure mechanism.
1608    ///
1609    /// For input endpoints, this setting bounds the number of records that have
1610    /// been received from the input transport but haven't yet been consumed by
1611    /// the circuit, since the circuit is still busy processing
1612    /// previous inputs.
1613    ///
1614    /// For output endpoints, this setting bounds the number of records that have
1615    /// been produced by the circuit but not yet sent via the output transport endpoint
1616    /// nor stored in the output buffer (see `enable_output_buffer`).
1617    ///
1618    /// Note that this is not a hard bound: there can be a small delay between
1619    /// the moment the backpressure mechanism is triggered and the moment the
1620    /// endpoint is paused, during which more data may be queued.
1621    ///
1622    /// The default is 1 million.
1623    #[serde(default = "default_max_queued_records")]
1624    pub max_queued_records: u64,
1625
1626    /// Create connector in paused state.
1627    ///
1628    /// The default is `false`.
1629    #[serde(default)]
1630    pub paused: bool,
1631
1632    /// Arbitrary user-defined text labels associated with the connector.
1633    ///
1634    /// These labels can be used in conjunction with the `start_after` property
1635    /// to control the start order of connectors.
1636    #[serde(default)]
1637    pub labels: Vec<String>,
1638
1639    /// Start the connector after all connectors with specified labels.
1640    ///
1641    /// This property is used to control the start order of connectors.
1642    /// The connector will not start until all connectors with the specified
1643    /// labels have finished processing all inputs.
1644    #[serde(deserialize_with = "deserialize_start_after")]
1645    #[serde(default)]
1646    pub start_after: Option<Vec<String>>,
1647}
1648
1649impl ConnectorConfig {
1650    pub fn new(transport: TransportConfig, format: Option<FormatConfig>) -> Self {
1651        Self {
1652            send_snapshot: false,
1653            transport,
1654            preprocessor: None,
1655            format,
1656            index: None,
1657            output_buffer_config: Default::default(),
1658            max_batch_size: None,
1659            max_worker_batch_size: None,
1660            max_queued_records: default_max_queued_records(),
1661            paused: false,
1662            labels: Vec::new(),
1663            start_after: None,
1664        }
1665    }
1666
1667    pub fn with_max_batch_size(mut self, max_batch_size: Option<u64>) -> Self {
1668        self.max_batch_size = max_batch_size;
1669        self
1670    }
1671
1672    pub fn with_max_queued_records(mut self, max_queued_records: u64) -> Self {
1673        self.max_queued_records = max_queued_records;
1674        self
1675    }
1676
1677    /// Compare two configs modulo the `paused` field.
1678    ///
1679    /// Used to compare checkpointed and current connector configs.
1680    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
1681        let mut a = self.clone();
1682        let mut b = other.clone();
1683        a.paused = false;
1684        b.paused = false;
1685        a == b
1686    }
1687}
1688
1689#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1690#[serde(default)]
1691pub struct OutputBufferConfig {
1692    /// Enable output buffering.
1693    ///
1694    /// The output buffering mechanism allows decoupling the rate at which the pipeline
1695    /// pushes changes to the output transport from the rate of input changes.
1696    ///
1697    /// By default, output updates produced by the pipeline are pushed directly to
1698    /// the output transport. Some destinations may prefer to receive updates in fewer
1699    /// bigger batches. For instance, when writing Parquet files, producing
1700    /// one bigger file every few minutes is usually better than creating
1701    /// small files every few milliseconds.
1702    ///
1703    /// To achieve such input/output decoupling, users can enable output buffering by
1704    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
1705    /// updates produced by the pipeline are consolidated in an internal buffer and are
1706    /// pushed to the output transport when one of several conditions is satisfied:
1707    ///
1708    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
1709    ///   milliseconds.
1710    /// * buffer size exceeds `max_output_buffer_size_records` records.
1711    ///
1712    /// This flag is `false` by default.
1713    // TODO: on-demand output triggered via the API.
1714    pub enable_output_buffer: bool,
1715
1716    /// Maximum time in milliseconds data is kept in the output buffer.
1717    ///
1718    /// By default, data is kept in the buffer indefinitely until one of
1719    /// the other output conditions is satisfied.  When this option is
1720    /// set the buffer will be flushed at most every
1721    /// `max_output_buffer_time_millis` milliseconds.
1722    ///
1723    /// NOTE: this configuration option requires the `enable_output_buffer` flag
1724    /// to be set.
1725    pub max_output_buffer_time_millis: usize,
1726
1727    /// Maximum number of updates to be kept in the output buffer.
1728    ///
1729    /// This parameter bounds the maximal size of the buffer.
1730    /// Note that the size of the buffer is not always equal to the
1731    /// total number of updates output by the pipeline. Updates to the
1732    /// same record can overwrite or cancel previous updates.
1733    ///
1734    /// By default, the buffer can grow indefinitely until one of
1735    /// the other output conditions is satisfied.
1736    ///
1737    /// NOTE: this configuration option requires the `enable_output_buffer` flag
1738    /// to be set.
1739    pub max_output_buffer_size_records: usize,
1740}
1741
1742impl Default for OutputBufferConfig {
1743    fn default() -> Self {
1744        Self {
1745            enable_output_buffer: false,
1746            max_output_buffer_size_records: usize::MAX,
1747            max_output_buffer_time_millis: usize::MAX,
1748        }
1749    }
1750}
1751
1752impl OutputBufferConfig {
1753    pub fn validate(&self) -> Result<(), String> {
1754        if self.enable_output_buffer
1755            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
1756            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
1757        {
1758            return Err(
1759                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
1760                    .to_string(),
1761            );
1762        }
1763
1764        Ok(())
1765    }
1766}
1767
1768/// Describes an output connector configuration
1769#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1770pub struct OutputEndpointConfig {
1771    /// The name of the output stream of the circuit that this endpoint is
1772    /// connected to.
1773    pub stream: Cow<'static, str>,
1774
1775    /// Connector configuration.
1776    #[serde(flatten)]
1777    pub connector_config: ConnectorConfig,
1778}
1779
1780impl OutputEndpointConfig {
1781    pub fn new(stream: impl Into<Cow<'static, str>>, connector_config: ConnectorConfig) -> Self {
1782        Self {
1783            stream: stream.into(),
1784            connector_config,
1785        }
1786    }
1787}
1788
1789/// Transport-specific endpoint configuration passed to
1790/// `crate::OutputTransport::new_endpoint`
1791/// and `crate::InputTransport::new_endpoint`.
1792#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1793#[serde(tag = "name", content = "config", rename_all = "snake_case")]
1794pub enum TransportConfig {
1795    FileInput(FileInputConfig),
1796    FileOutput(FileOutputConfig),
1797    NatsInput(NatsInputConfig),
1798    KafkaInput(KafkaInputConfig),
1799    KafkaOutput(KafkaOutputConfig),
1800    PubSubInput(PubSubInputConfig),
1801    UrlInput(UrlInputConfig),
1802    S3Input(S3InputConfig),
1803    DeltaTableInput(DeltaTableReaderConfig),
1804    DeltaTableOutput(DeltaTableWriterConfig),
1805    RedisOutput(RedisOutputConfig),
1806    // Prevent rust from complaining about large size difference between enum variants.
1807    IcebergInput(Box<IcebergReaderConfig>),
1808    PostgresInput(PostgresReaderConfig),
1809    PostgresCdcInput(PostgresCdcReaderConfig),
1810    PostgresOutput(PostgresWriterConfig),
1811    Datagen(DatagenInputConfig),
1812    Nexmark(NexmarkInputConfig),
1813    /// Direct HTTP input: cannot be instantiated through API
1814    HttpInput(HttpInputConfig),
1815    /// Direct HTTP output: cannot be instantiated through API
1816    HttpOutput(HttpOutputConfig),
1817    /// Ad hoc input: cannot be instantiated through API
1818    AdHocInput(AdHocInputConfig),
1819    ClockInput(ClockConfig),
1820}
1821
1822impl TransportConfig {
1823    pub fn name(&self) -> String {
1824        match self {
1825            TransportConfig::FileInput(_) => "file_input".to_string(),
1826            TransportConfig::FileOutput(_) => "file_output".to_string(),
1827            TransportConfig::NatsInput(_) => "nats_input".to_string(),
1828            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
1829            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
1830            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
1831            TransportConfig::UrlInput(_) => "url_input".to_string(),
1832            TransportConfig::S3Input(_) => "s3_input".to_string(),
1833            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
1834            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
1835            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
1836            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
1837            TransportConfig::PostgresCdcInput(_) => "postgres_cdc_input".to_string(),
1838            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
1839            TransportConfig::Datagen(_) => "datagen".to_string(),
1840            TransportConfig::Nexmark(_) => "nexmark".to_string(),
1841            TransportConfig::HttpInput(_) => "http_input".to_string(),
1842            TransportConfig::HttpOutput(_) => "http_output".to_string(),
1843            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
1844            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
1845            TransportConfig::ClockInput(_) => "clock".to_string(),
1846        }
1847    }
1848
1849    /// Returns true if the connector is transient, i.e., is created and destroyed
1850    /// at runtime on demand, rather than being configured as part of the pipeline.
1851    pub fn is_transient(&self) -> bool {
1852        matches!(
1853            self,
1854            TransportConfig::AdHocInput(_)
1855                | TransportConfig::HttpInput(_)
1856                | TransportConfig::HttpOutput(_)
1857                | TransportConfig::ClockInput(_)
1858        )
1859    }
1860
1861    pub fn is_http_input(&self) -> bool {
1862        matches!(self, TransportConfig::HttpInput(_))
1863    }
1864}
1865
1866/// Data format specification used to parse raw data received from the
1867/// endpoint or to encode data sent to the endpoint.
1868#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
1869pub struct FormatConfig {
1870    /// Format name, e.g., "csv", "json", "bincode", etc.
1871    pub name: Cow<'static, str>,
1872
1873    /// Format-specific parser or encoder configuration.
1874    #[serde(default)]
1875    #[schema(value_type = Object)]
1876    pub config: JsonValue,
1877}
1878
1879#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
1880#[serde(default)]
1881pub struct ResourceConfig {
1882    /// The minimum number of CPU cores to reserve
1883    /// for an instance of this pipeline
1884    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
1885    pub cpu_cores_min: Option<f64>,
1886
1887    /// The maximum number of CPU cores to reserve
1888    /// for an instance of this pipeline
1889    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
1890    pub cpu_cores_max: Option<f64>,
1891
1892    /// The minimum memory in Megabytes to reserve
1893    /// for an instance of this pipeline
1894    pub memory_mb_min: Option<u64>,
1895
1896    /// The maximum memory in Megabytes to reserve
1897    /// for an instance of this pipeline
1898    pub memory_mb_max: Option<u64>,
1899
1900    /// The total storage in Megabytes to reserve
1901    /// for an instance of this pipeline
1902    pub storage_mb_max: Option<u64>,
1903
1904    /// Storage class to use for an instance of this pipeline.
1905    /// The class determines storage performance such as IOPS and throughput.
1906    pub storage_class: Option<String>,
1907
1908    /// Kubernetes service account name to use for an instance of this pipeline.
1909    /// The account determines permissions and access controls.
1910    pub service_account_name: Option<String>,
1911
1912    /// Kubernetes namespace to use for an instance of this pipeline.
1913    /// The namespace determines the scope of names for resources created
1914    /// for the pipeline.
1915    /// If not set, the pipeline will be deployed in the same namespace
1916    /// as the control-plane.
1917    pub namespace: Option<String>,
1918}