// feldera_types/config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::program_schema::ProgramSchema;
use crate::secret_resolver::default_secrets_directory;
use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nats::NatsInputConfig;
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use feldera_ir::{MirNode, MirNodeId};
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;

/// Program information included in the pipeline configuration.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// The MIR of the program.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Program schema.
    pub program_schema: ProgramSchema,
}

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }

    /// Returns `self.secrets_dir`, or the default secrets directory if it isn't
    /// set.
    pub fn secrets_dir(&self) -> &Path {
        match &self.secrets_dir {
            Some(dir) => Path::new(dir.as_str()),
            None => default_secrets_directory(),
        }
    }

    /// Abbreviated config that can be printed in the log on pipeline startup.
    pub fn display_summary(&self) -> String {
        // TODO: we may want to further abbreviate connector config.
        let summary = serde_json::json!({
            "name": self.name,
            "global": self.global,
            "storage_config": self.storage_config,
            "secrets_dir": self.secrets_dir,
            "inputs": self.inputs,
            "outputs": self.outputs
        });

        serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
    }
}

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}
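
// The `snake_case` renaming means the variants above appear as `"page_cache"`
// and `"feldera_cache"` in JSON. A minimal illustrative check (this test
// module is ours, not from the original source):
#[cfg(test)]
mod storage_cache_config_test {
    use super::StorageCacheConfig;

    #[test]
    fn snake_case_names() {
        assert_eq!(
            serde_json::from_str::<StorageCacheConfig>(r#""page_cache""#).unwrap(),
            StorageCacheConfig::PageCache
        );
        assert_eq!(
            serde_json::to_string(&StorageCacheConfig::FelderaCache).unwrap(),
            r#""feldera_cache""#
        );
    }
}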

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 10,485,760 (10 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::File(_) => write!(f, "file"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}
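
// With `tag = "name"` and `content = "config"`, a backend is written as a
// `{"name": ..., "config": ...}` object. A minimal sketch (the test module
// and the example URL are ours, not from the original source):
#[cfg(test)]
mod storage_backend_config_test {
    use super::StorageBackendConfig;

    #[test]
    fn tagged_representation() {
        assert_eq!(StorageBackendConfig::default().to_string(), "default");
        let backend: StorageBackendConfig =
            serde_json::from_str(r#"{"name": "object", "config": {"url": "s3://bucket/path"}}"#)
                .unwrap();
        assert_eq!(backend.to_string(), "object");
    }
}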

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    Latest,
    Uuid(uuid::Uuid),
}

impl ToSchema<'_> for StartFromCheckpoint {
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}

impl<'de> Deserialize<'de> for StartFromCheckpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}

impl Serialize for StartFromCheckpoint {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
        }
    }
}
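
// The custom serde impls above encode `StartFromCheckpoint` as a bare string:
// either `"latest"` or a UUID. An illustrative round-trip (the test module
// is ours, not from the original source; the UUID is an arbitrary example):
#[cfg(test)]
mod start_from_checkpoint_test {
    use super::StartFromCheckpoint;

    #[test]
    fn string_round_trip() {
        assert_eq!(
            serde_json::from_str::<StartFromCheckpoint>(r#""latest""#).unwrap(),
            StartFromCheckpoint::Latest
        );
        let uuid = uuid::Uuid::parse_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        let json = serde_json::to_string(&StartFromCheckpoint::Uuid(uuid)).unwrap();
        assert_eq!(json, r#""67e55044-10b1-426f-9247-bb680e5fe0c8""#);
        assert_eq!(
            serde_json::from_str::<StartFromCheckpoint>(&json).unwrap(),
            StartFromCheckpoint::Uuid(uuid)
        );
        // Anything that is neither "latest" nor a valid UUID is rejected.
        assert!(serde_json::from_str::<StartFromCheckpoint>(r#""nonsense""#).is_err());
    }
}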

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for MinIO, or to use the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If fetching fails and `fail_if_no_checkpoint` is `true`, the pipeline
    /// will fail to initialize.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip the post-copy checksum check and only compare file sizes.
    /// This can significantly improve throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline auto-activates and no newer checkpoints are
    /// fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, the pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If a checkpoint UUID, the pipeline fetches this checkpoint once and
    ///   waits in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between attempts to fetch the latest
    /// checkpoint from the object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3-specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in the object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,
}

fn default_pull_interval() -> u64 {
    10
}

fn default_retention_min_count() -> u32 {
    10
}

fn default_retention_min_age() -> u32 {
    30
}

impl SyncConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.standby && self.start_from_checkpoint.is_none() {
            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
Standby mode requires `start_from_checkpoint` to be set.
Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
        }

        Ok(())
    }
}
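
// Illustrative use of `SyncConfig::validate` (the test module is ours, not
// from the original source): standby mode is only valid together with
// `start_from_checkpoint`.
#[cfg(test)]
mod sync_config_test {
    use super::{StartFromCheckpoint, SyncConfig};

    #[test]
    fn standby_requires_start_from_checkpoint() {
        let standby_only = SyncConfig {
            standby: true,
            ..SyncConfig::default()
        };
        assert!(standby_only.validate().is_err());

        let standby_with_checkpoint = SyncConfig {
            standby: true,
            start_from_checkpoint: Some(StartFromCheckpoint::Latest),
            ..SyncConfig::default()
        };
        assert!(standby_with_checkpoint.validate().is_ok());
    }
}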

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most once every `clock_resolution_usecs` microseconds.  If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_json::Value>,

    /// Deprecated: this setting no longer has any effect.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the HTTP server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. It can be increased if the
    /// pipeline's HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async I/O tasks.
    ///
    /// This affects some networking and file I/O operations,
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. It can be increased if
    /// ingress, egress, or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Optional settings for tweaking Feldera internals.
    ///
    /// The available key-value pairs change from one version of Feldera to
    /// another, so users should not depend on particular settings being
    /// available, or on their behavior.
    pub dev_tweaks: BTreeMap<String, serde_json::Value>,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process.  Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,
}

/// Accepts "true" and "false" and converts them to the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}

/// Accepts the legacy strings 'initial_state' and 'latest_checkpoint' as
/// enabling fault tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            // We discovered that the jaeger crate can use up gigabytes of RAM,
            // so it's not harmless to keep it on by default.
            tracing: false,
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: Some(DEFAULT_CLOCK_RESOLUTION_USECS),
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            io_workers: None,
            http_workers: None,
            dev_tweaks: BTreeMap::default(),
            logging: None,
        }
    }
}
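
// Both custom deserializers above exist for backward compatibility: `storage`
// was once a boolean, and `fault_tolerance` was once a string. A minimal
// sketch of the accepted legacy forms (the test module is ours, not from the
// original source):
#[cfg(test)]
mod runtime_config_compat_test {
    use super::{FtModel, RuntimeConfig, StorageOptions};

    #[test]
    fn legacy_storage_and_fault_tolerance() {
        let config: RuntimeConfig =
            serde_json::from_str(r#"{"storage": true, "fault_tolerance": "latest_checkpoint"}"#)
                .unwrap();
        assert_eq!(config.storage, Some(StorageOptions::default()));
        assert_eq!(config.fault_tolerance.model, Some(FtModel::default()));

        // The legacy boolean `false` maps to "no storage".
        let config: RuntimeConfig = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert_eq!(config.storage, None);
    }
}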

/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`.  This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}

fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}

pub fn default_checkpoint_interval_secs() -> Option<u64> {
    Some(60)
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}

#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}

impl FtConfig {
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Returns the checkpoint interval, if fault tolerance is enabled, and
    /// otherwise `None`.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        if self.is_enabled() {
            self.checkpoint_interval_secs
                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
        } else {
            None
        }
    }
}
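
// The 1..=3600 second clamping described on `checkpoint_interval_secs`,
// illustrated (the test module is ours, not from the original source):
#[cfg(test)]
mod ft_config_interval_test {
    use super::{FtConfig, FtModel};
    use std::time::Duration;

    #[test]
    fn checkpoint_interval_clamped() {
        let ft = FtConfig {
            model: Some(FtModel::default()),
            checkpoint_interval_secs: Some(7200),
        };
        // 7200 seconds is forced down into the 1..=3600 range.
        assert_eq!(ft.checkpoint_interval(), Some(Duration::from_secs(3600)));
        // With fault tolerance disabled, there is no checkpoint interval.
        assert_eq!(FtConfig::default().checkpoint_interval(), None);
    }
}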

/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
/// them automatically.
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if value.eq_ignore_ascii_case("none") {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}

/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}
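
// `FtModel` parsing is case-insensitive, mirroring the "none" handling in
// `none_as_string` above. An illustrative check (the test module is ours,
// not from the original source):
#[cfg(test)]
mod ft_model_test {
    use super::FtModel;
    use std::str::FromStr;

    #[test]
    fn parse_is_case_insensitive() {
        assert!(matches!(
            FtModel::from_str("EXACTLY_ONCE"),
            Ok(FtModel::ExactlyOnce)
        ));
        assert!(matches!(
            FtModel::from_str("at_least_once"),
            Ok(FtModel::AtLeastOnce)
        ));
        // Unknown strings are rejected.
        assert!(FtModel::from_str("unknown").is_err());
        // `None` reads as "no" fault tolerance.
        assert_eq!(FtModel::option_as_str(None), "no");
    }
}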

/// Describes an input connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}
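
// The string-or-array flexibility of `deserialize_start_after`, illustrated
// with a hypothetical wrapper struct (the test module and the `Wrapper`
// struct are ours, not from the original source):
#[cfg(test)]
mod start_after_test {
    use super::deserialize_start_after;
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_start_after", default)]
        start_after: Option<Vec<String>>,
    }

    #[test]
    fn string_or_array() {
        let w: Wrapper = serde_json::from_str(r#"{"start_after": "a"}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["a".to_string()]));

        let w: Wrapper = serde_json::from_str(r#"{"start_after": ["a", "b"]}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["a".to_string(), "b".to_string()]));

        let w: Wrapper = serde_json::from_str(r#"{"start_after": null}"#).unwrap();
        assert_eq!(w.start_after, None);

        // A number is neither a string nor an array of strings.
        assert!(serde_json::from_str::<Wrapper>(r#"{"start_after": 5}"#).is_err());
    }
}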

/// A data connector's configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}

impl ConnectorConfig {
    /// Compare two configs modulo the `paused` field.
    ///
    /// Used to compare checkpointed and current connector configs.
    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
        let mut a = self.clone();
        let mut b = other.clone();
        a.paused = false;
        b.paused = false;
        a == b
    }
}
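
// A sketch of `equal_modulo_paused` in action (the test module and the choice
// of `HttpOutput` as a stand-in transport are ours, not from the original
// source):
#[cfg(test)]
mod connector_config_test {
    use super::{
        default_max_batch_size, default_max_queued_records, ConnectorConfig, OutputBufferConfig,
        TransportConfig,
    };

    #[test]
    fn equal_modulo_paused() {
        let base = ConnectorConfig {
            transport: TransportConfig::HttpOutput,
            format: None,
            index: None,
            output_buffer_config: OutputBufferConfig::default(),
            max_batch_size: default_max_batch_size(),
            max_queued_records: default_max_queued_records(),
            paused: false,
            labels: Vec::new(),
            start_after: None,
        };
        // Configs differing only in `paused` are considered equal.
        let paused = ConnectorConfig {
            paused: true,
            ..base.clone()
        };
        assert!(base.equal_modulo_paused(&paused));
        // Configs differing in any other field are not.
        let smaller_batches = ConnectorConfig {
            max_batch_size: 1,
            ..base.clone()
        };
        assert!(!base.equal_modulo_paused(&smaller_batches));
    }
}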

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}
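
// Enabling the buffer without bounding it is rejected by `validate`, since
// such a buffer would never flush. An illustrative check (the test module is
// ours, not from the original source):
#[cfg(test)]
mod output_buffer_config_test {
    use super::OutputBufferConfig;

    #[test]
    fn buffer_requires_a_bound() {
        assert!(OutputBufferConfig::default().validate().is_ok());

        let unbounded = OutputBufferConfig {
            enable_output_buffer: true,
            ..OutputBufferConfig::default()
        };
        assert!(unbounded.validate().is_err());

        let time_bounded = OutputBufferConfig {
            enable_output_buffer: true,
            max_output_buffer_time_millis: 10_000,
            ..OutputBufferConfig::default()
        };
        assert!(time_bounded.validate().is_ok());
    }
}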

/// Describes an output connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    NatsInput(NatsInputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Prevent rust from complaining about large size difference between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::NatsInput(_) => "nats_input".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
            TransportConfig::ClockInput(_) => "clock".to_string(),
        }
    }

    /// Returns true if the connector is transient, i.e., is created and destroyed
    /// at runtime on demand, rather than being configured as part of the pipeline.
    pub fn is_transient(&self) -> bool {
        matches!(
            self,
            TransportConfig::AdHocInput(_)
                | TransportConfig::HttpInput(_)
                | TransportConfig::HttpOutput
                | TransportConfig::ClockInput(_)
        )
    }
}
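
// The `name()` strings mostly mirror the `snake_case` serde tags above. A
// minimal sketch of the adjacently tagged encoding and the transient flag
// (the test module is ours, not from the original source):
#[cfg(test)]
mod transport_config_test {
    use super::TransportConfig;

    #[test]
    fn tags_and_transience() {
        // Unit variants need no "config" field in the adjacently tagged form.
        let config: TransportConfig = serde_json::from_str(r#"{"name": "http_output"}"#).unwrap();
        assert_eq!(config, TransportConfig::HttpOutput);
        assert_eq!(config.name(), "http_output");
        // Direct HTTP endpoints are created on demand, so they are transient.
        assert!(config.is_transient());
    }
}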

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}

#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,

    /// Kubernetes service account name to use for an instance of this pipeline.
    /// The account determines permissions and access controls.
    pub service_account_name: Option<String>,

    /// Kubernetes namespace to use for an instance of this pipeline.
    /// The namespace determines the scope of names for resources created
    /// for the pipeline.
    /// If not set, the pipeline will be deployed in the same namespace
    /// as the control-plane.
    pub namespace: Option<String>,
}