feldera_types/
config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::secret_resolver::default_secrets_directory;
use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }

    /// Returns `self.secrets_dir`, or the default secrets directory if it isn't
    /// set.
    pub fn secrets_dir(&self) -> &Path {
        match &self.secrets_dir {
            Some(dir) => Path::new(dir.as_str()),
            None => default_secrets_directory(),
        }
    }
}
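
// Illustrative sketch (an added example, not part of the original source):
// `max_parallel_connector_init` falls back to the default of 10 when unset
// and clamps explicit values to at least 1.  The empty pipeline below is a
// hypothetical minimal configuration.
#[cfg(test)]
mod max_parallel_connector_init_test {
    use super::{PipelineConfig, RuntimeConfig};
    use std::collections::BTreeMap;

    #[test]
    fn clamped_to_at_least_one() {
        let mut config = PipelineConfig {
            global: RuntimeConfig::default(),
            name: None,
            storage_config: None,
            secrets_dir: None,
            inputs: BTreeMap::new(),
            outputs: BTreeMap::new(),
        };
        // Unset: the default of 10 applies.
        assert_eq!(config.max_parallel_connector_init(), 10);
        // Zero is forced up to 1 so that startup can make progress.
        config.global.max_parallel_connector_init = Some(0);
        assert_eq!(config.max_parallel_connector_init(), 1);
    }
}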

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::File(_) => write!(f, "file"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}
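
// Illustrative sketch (added example): with `#[serde(tag = "name", content =
// "config")]` above, backend configs are written as adjacently tagged JSON.
// The URL below is a hypothetical placeholder.
#[cfg(test)]
mod storage_backend_repr_test {
    use super::StorageBackendConfig;

    #[test]
    fn adjacently_tagged_json() {
        // A unit variant deserializes from just the tag...
        let default: StorageBackendConfig =
            serde_json::from_str(r#"{"name": "default"}"#).unwrap();
        assert_eq!(default, StorageBackendConfig::Default);
        assert_eq!(default.to_string(), "default");

        // ...while a data-carrying variant nests its payload under `config`.
        let object: StorageBackendConfig = serde_json::from_str(
            r#"{"name": "object", "config": {"url": "s3://example-bucket/state"}}"#,
        )
        .unwrap();
        assert_eq!(object.to_string(), "object");
    }
}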

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    Latest,
    Uuid(uuid::Uuid),
}

impl ToSchema<'_> for StartFromCheckpoint {
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}

impl<'de> Deserialize<'de> for StartFromCheckpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}

impl Serialize for StartFromCheckpoint {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
        }
    }
}
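
// Illustrative sketch (added example) of the wire format implemented by the
// manual serde impls above: either the literal string "latest" or a
// checkpoint UUID.  The UUID below is an arbitrary example value.
#[cfg(test)]
mod start_from_checkpoint_test {
    use super::StartFromCheckpoint;

    #[test]
    fn latest_or_uuid() {
        let latest: StartFromCheckpoint = serde_json::from_str(r#""latest""#).unwrap();
        assert_eq!(latest, StartFromCheckpoint::Latest);

        let uuid: StartFromCheckpoint =
            serde_json::from_str(r#""67e55044-10b1-426f-9247-bb680e5fe0c8""#).unwrap();
        assert!(matches!(uuid, StartFromCheckpoint::Uuid(_)));

        // Anything else is rejected.
        assert!(serde_json::from_str::<StartFromCheckpoint>(r#""not-a-uuid""#).is_err());
    }
}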

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for MinIO or the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If fetching fails and `fail_if_no_checkpoint` is `true`, the pipeline
    /// will fail to initialize.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip the post-copy check of checksums and only check the file
    /// sizes.  This can significantly improve throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will activate automatically and no newer
    /// checkpoints will be fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, the pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If a checkpoint UUID, the pipeline fetches this checkpoint once and
    ///   waits in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between attempts to fetch the latest
    /// checkpoint from the object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3-specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in the object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,
}

fn default_pull_interval() -> u64 {
    10
}

fn default_retention_min_count() -> u32 {
    10
}

fn default_retention_min_age() -> u32 {
    30
}

impl SyncConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.standby && self.start_from_checkpoint.is_none() {
            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
Standby mode requires `start_from_checkpoint` to be set.
Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
        }

        Ok(())
    }
}
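
// Illustrative sketch (added example) of the invariant enforced by
// `SyncConfig::validate`: standby mode requires a checkpoint to wait for.
// The bucket name is a placeholder.
#[cfg(test)]
mod sync_config_validate_test {
    use super::{StartFromCheckpoint, SyncConfig};

    #[test]
    fn standby_requires_checkpoint() {
        let mut config = SyncConfig {
            bucket: "example-bucket".to_string(),
            standby: true,
            ..SyncConfig::default()
        };
        // Standby with no checkpoint to fetch is rejected...
        assert!(config.validate().is_err());

        // ...but standby combined with `latest` is accepted.
        config.start_from_checkpoint = Some(StartFromCheckpoint::Latest);
        assert!(config.validate().is_ok());
    }
}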

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller.  Defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most each `clock_resolution_usecs` microseconds.  If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_json::Value>,

    /// Deprecated: setting this true or false does not have an effect anymore.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the HTTP server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case the
    /// pipeline HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async I/O tasks.
    ///
    /// This affects some networking and file I/O operations,
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case
    /// ingress, egress, or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Optional settings for tweaking Feldera internals.
    ///
    /// The available key-value pairs change from one version of Feldera to
    /// another, so users should not depend on particular settings being
    /// available, or on their behavior.
    pub dev_tweaks: BTreeMap<String, serde_json::Value>,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process.  Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,
}

/// Accepts "true" and "false" and converts them to the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}
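
// Illustrative sketch (added example) of the backward-compatible forms
// accepted above: `storage` may be a legacy boolean or a full
// `StorageOptions` object.
#[cfg(test)]
mod storage_options_compat_test {
    use super::RuntimeConfig;

    #[test]
    fn bool_or_struct() {
        // `false` disables storage.
        let config: RuntimeConfig = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert!(config.storage.is_none());

        // `true` enables storage with default options, as does an empty map.
        for s in [r#"{"storage": true}"#, r#"{"storage": {}}"#] {
            let config: RuntimeConfig = serde_json::from_str(s).unwrap();
            assert!(config.storage.is_some());
        }
    }
}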

/// Accepts the very old 'initial_state' and 'latest_checkpoint' values as
/// enabling fault tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            // We discovered that the jaeger crate can use up gigabytes of RAM,
            // so it's not harmless to keep tracing on by default.
            tracing: false,
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: Some(DEFAULT_CLOCK_RESOLUTION_USECS),
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            io_workers: None,
            http_workers: None,
            dev_tweaks: BTreeMap::default(),
            logging: None,
        }
    }
}

/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`.  This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}

fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}

pub fn default_checkpoint_interval_secs() -> Option<u64> {
    Some(60)
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}
#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}

impl FtConfig {
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Returns the checkpoint interval, if fault tolerance is enabled, and
    /// otherwise `None`.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        if self.is_enabled() {
            self.checkpoint_interval_secs
                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
        } else {
            None
        }
    }
}
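
// Illustrative sketch (added example) of the clamping documented on
// `checkpoint_interval_secs`: values are forced into [1, 3600] seconds, and
// the interval is `None` when fault tolerance is disabled.
#[cfg(test)]
mod checkpoint_interval_test {
    use super::{FtConfig, FtModel};
    use std::time::Duration;

    #[test]
    fn clamped_to_range() {
        let enabled = FtConfig {
            model: Some(FtModel::default()),
            checkpoint_interval_secs: Some(10_000),
        };
        assert_eq!(enabled.checkpoint_interval(), Some(Duration::from_secs(3600)));

        // With fault tolerance disabled, there is no checkpoint interval.
        let disabled = FtConfig {
            model: None,
            ..enabled
        };
        assert_eq!(disabled.checkpoint_interval(), None);
    }
}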

/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
/// them automatically.
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if &value.to_ascii_lowercase() == "none" {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}

/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}
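
// Illustrative sketch (added example): `ExactlyOnce` is a strictly "higher"
// fault-tolerance level than `AtLeastOnce` under the derived ordering, and
// parsing is case-insensitive.
#[cfg(test)]
mod ft_model_test {
    use super::FtModel;

    #[test]
    fn ordering_and_parsing() {
        assert!(FtModel::ExactlyOnce > FtModel::AtLeastOnce);
        assert!(matches!(
            "EXACTLY_ONCE".parse::<FtModel>(),
            Ok(FtModel::ExactlyOnce)
        ));
        assert!("bogus".parse::<FtModel>().is_err());
    }
}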

/// Describes an input connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}
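
// Illustrative sketch (added example) of the flexible `start_after` syntax
// handled above: a single label and a list of labels are both accepted.
// `http_output` is used as the transport only because it needs no
// configuration; the labels are placeholders.
#[cfg(test)]
mod start_after_test {
    use super::ConnectorConfig;

    #[test]
    fn string_or_list() {
        let single: ConnectorConfig = serde_json::from_str(
            r#"{"transport": {"name": "http_output"}, "start_after": "backfill"}"#,
        )
        .unwrap();
        assert_eq!(single.start_after, Some(vec!["backfill".to_string()]));

        let list: ConnectorConfig = serde_json::from_str(
            r#"{"transport": {"name": "http_output"}, "start_after": ["a", "b"]}"#,
        )
        .unwrap();
        assert_eq!(list.start_after, Some(vec!["a".to_string(), "b".to_string()]));
    }
}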

/// A data connector's configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the time the backpressure mechanism is triggered and the time the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with the specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}

impl ConnectorConfig {
    /// Compare two configs modulo the `paused` field.
    ///
    /// Used to compare checkpointed and current connector configs.
    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
        let mut a = self.clone();
        let mut b = other.clone();
        a.paused = false;
        b.paused = false;
        a == b
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer,
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set, the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}
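
// Illustrative sketch (added example) of the rule enforced by
// `OutputBufferConfig::validate`: enabling the output buffer without
// bounding it in time or size is rejected.
#[cfg(test)]
mod output_buffer_validate_test {
    use super::OutputBufferConfig;

    #[test]
    fn buffer_requires_a_bound() {
        let mut config = OutputBufferConfig {
            enable_output_buffer: true,
            ..OutputBufferConfig::default()
        };
        assert!(config.validate().is_err());

        // Bounding the buffer by time (or by size) makes it valid.
        config.max_output_buffer_time_millis = 10_000;
        assert!(config.validate().is_ok());
    }
}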

/// Describes an output connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Prevent rust from complaining about large size difference between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
            TransportConfig::ClockInput(_) => "clock".to_string(),
        }
    }

    /// Returns true if the connector is transient, i.e., is created and destroyed
    /// at runtime on demand, rather than being configured as part of the pipeline.
    pub fn is_transient(&self) -> bool {
        matches!(
            self,
            TransportConfig::AdHocInput(_)
                | TransportConfig::HttpInput(_)
                | TransportConfig::HttpOutput
                | TransportConfig::ClockInput(_)
        )
    }
}
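
// Illustrative sketch (added example): for most variants, the runtime name
// returned by `TransportConfig::name` matches the serde tag stored under
// `name` (`ClockInput` is an exception: its tag is `clock_input` but its
// runtime name is `clock`).
#[cfg(test)]
mod transport_config_test {
    use super::TransportConfig;

    #[test]
    fn name_matches_serde_tag() {
        let config = TransportConfig::HttpOutput;
        let json = serde_json::to_value(&config).unwrap();
        assert_eq!(json["name"], "http_output");
        assert_eq!(config.name(), "http_output");
        assert!(config.is_transient());
    }
}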

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}

#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
}