// feldera_types/config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use serde_yaml::Value as YamlValue;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }
}
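
// A minimal sketch of the JSON form described in the module docs: `inputs` is
// the only required field and everything else falls back to the defaults
// defined in this file.
#[cfg(test)]
mod pipeline_config_test {
    use crate::config::PipelineConfig;

    #[test]
    fn minimal_pipeline_config() {
        let config: PipelineConfig = serde_json::from_str(r#"{"inputs": {}}"#).unwrap();
        assert_eq!(config.global.workers, 8);
        // Storage options default to `Some(_)`, but without a `storage_config`
        // the `storage()` accessor still returns `None`.
        assert!(config.storage().is_none());
    }
}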

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(FileBackendConfig),

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::File(_) => write!(f, "file"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for MinIO or to use the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// If `true`, will try to pull the latest checkpoint from the configured
    /// object store and resume from that point.
    pub start_from_checkpoint: bool,
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}
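
// A sketch showing that any key other than `url` is collected into
// `other_options` via `serde(flatten)`; the endpoint value is a placeholder.
#[cfg(test)]
mod object_storage_config_test {
    use crate::config::ObjectStorageConfig;

    #[test]
    fn extra_keys_are_flattened() {
        let json = r#"{"url": "s3://bucket/path", "endpoint": "https://localhost:4566"}"#;
        let config: ObjectStorageConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.url, "s3://bucket/path");
        assert_eq!(
            config.other_options.get("endpoint").map(String::as_str),
            Some("https://localhost:4566")
        );
    }
}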

/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the pipeline's state is kept in in-memory data
    ///   structures.  This is useful if the pipeline's state will fit in
    ///   memory and if the pipeline is ephemeral and does not need to be
    ///   recovered after a restart. The pipeline will most likely run faster
    ///   since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  The pipeline will update the clock value and trigger incremental recomputation
    /// at most once every `clock_resolution_usecs` microseconds.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    ///
    /// Set to `null` to disable periodic clock updates.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_yaml::Value>,

    /// * If `true`, the suspend operation will first atomically checkpoint the pipeline before
    ///   deprovisioning the compute resources. When resuming, the pipeline will start from this
    ///   checkpoint.
    /// * If `false`, then the pipeline will be suspended without creating an additional checkpoint.
    ///   When resuming, it will pick up the latest checkpoint made by the periodic checkpointer or
    ///   by invoking the `/checkpoint` API.
    pub checkpoint_during_suspend: bool,

    /// Optional settings for tweaking Feldera internals.
    ///
    /// The available key-value pairs change from one version of Feldera to
    /// another, so users should not depend on particular settings being
    /// available, or on their behavior.
    pub dev_tweaks: BTreeMap<String, serde_json::Value>,
}

/// Accepts "true" and "false" and converts them to the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}
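
// A sketch of the legacy boolean forms accepted above: `true` enables storage
// with default options, while `false` and `null` disable it.
#[cfg(test)]
mod storage_options_compat_test {
    use super::deserialize_storage_options;
    use crate::config::StorageOptions;
    use serde::Deserialize;

    #[derive(Deserialize, Default)]
    #[serde(default)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_storage_options")]
        storage: Option<StorageOptions>,
    }

    #[test]
    fn bool_compat() {
        let w: Wrapper = serde_json::from_str(r#"{"storage": true}"#).unwrap();
        assert_eq!(w.storage, Some(StorageOptions::default()));

        let w: Wrapper = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert_eq!(w.storage, None);

        let w: Wrapper = serde_json::from_str(r#"{"storage": null}"#).unwrap();
        assert_eq!(w.storage, None);
    }
}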

/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
/// tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            // We discovered that the jaeger crate can use up gigabytes of RAM,
            // so it's not harmless to keep it on by default.
            tracing: false,
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: Some(DEFAULT_CLOCK_RESOLUTION_USECS),
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            dev_tweaks: BTreeMap::default(),
        }
    }
}

/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`.  This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}

fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}

pub fn default_checkpoint_interval_secs() -> Option<u64> {
    Some(60)
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}

#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}

impl FtConfig {
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Returns the checkpoint interval, if fault tolerance is enabled, and
    /// otherwise `None`.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        if self.is_enabled() {
            self.checkpoint_interval_secs
                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
        } else {
            None
        }
    }
}
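
// A sketch of the clamping behavior documented on `checkpoint_interval_secs`:
// out-of-range values are forced into [1, 3600] seconds, and a disabled model
// yields no interval at all.
#[cfg(test)]
mod checkpoint_interval_test {
    use crate::config::{FtConfig, FtModel};
    use std::time::Duration;

    #[test]
    fn interval_clamped() {
        let config = FtConfig {
            model: Some(FtModel::default()),
            checkpoint_interval_secs: Some(100_000),
        };
        assert_eq!(config.checkpoint_interval(), Some(Duration::from_secs(3600)));

        let disabled = FtConfig { model: None, ..config };
        assert_eq!(disabled.checkpoint_interval(), None);
    }
}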

/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
/// them automatically.
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if value.eq_ignore_ascii_case("none") {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}
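
// A sketch of the `none_as_string` behavior through `FtConfig::model`: the
// string "none" (any case) maps to `None`, while a model name maps to
// `Some(_)`.
#[cfg(test)]
mod none_as_string_test {
    use crate::config::{FtConfig, FtModel};

    #[test]
    fn model_string_forms() {
        let config: FtConfig = serde_json::from_str(r#"{"model": "at_least_once"}"#).unwrap();
        assert_eq!(config.model, Some(FtModel::AtLeastOnce));

        let config: FtConfig = serde_json::from_str(r#"{"model": "NONE"}"#).unwrap();
        assert_eq!(config.model, None);
    }
}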

/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}
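
// `from_str` is case-insensitive and accepts exactly the names produced by
// `as_str`; a quick round-trip sketch.
#[cfg(test)]
mod ft_model_test {
    use super::FtModel;
    use std::str::FromStr;

    #[test]
    fn from_str_round_trip() {
        for model in [FtModel::AtLeastOnce, FtModel::ExactlyOnce] {
            assert_eq!(FtModel::from_str(model.as_str()).ok(), Some(model));
        }
        assert!(FtModel::from_str("Exactly_Once").is_ok());
        assert!(FtModel::from_str("bogus").is_err());
    }
}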

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}
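
// A sketch of the three accepted `start_after` forms: a single string, an
// array of strings, or null.
#[cfg(test)]
mod start_after_test {
    use super::deserialize_start_after;
    use serde::Deserialize;

    #[derive(Deserialize, Default)]
    #[serde(default)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_start_after")]
        start_after: Option<Vec<String>>,
    }

    #[test]
    fn string_or_array() {
        let w: Wrapper = serde_json::from_str(r#"{"start_after": "raw"}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["raw".to_string()]));

        let w: Wrapper = serde_json::from_str(r#"{"start_after": ["a", "b"]}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["a".to_string(), "b".to_string()]));

        let w: Wrapper = serde_json::from_str(r#"{"start_after": null}"#).unwrap();
        assert_eq!(w.start_after, None);
    }
}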

/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport
    /// endpoint nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}
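
// A sketch of the connector defaults: when the tuning knobs are omitted, the
// documented defaults apply.  `http_output` serves as the transport because
// it needs no config payload.
#[cfg(test)]
mod connector_config_test {
    use crate::config::{default_max_batch_size, default_max_queued_records, ConnectorConfig};

    #[test]
    fn defaults() {
        let json = r#"{"transport": {"name": "http_output"}}"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.max_batch_size, default_max_batch_size());
        assert_eq!(config.max_queued_records, default_max_queued_records());
        assert!(!config.paused);
        assert_eq!(config.start_after, None);
    }
}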

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}
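
// A sketch of the validation rule above: enabling the buffer without setting
// either flush condition is rejected.
#[cfg(test)]
mod output_buffer_config_test {
    use crate::config::OutputBufferConfig;

    #[test]
    fn requires_flush_condition() {
        let invalid = OutputBufferConfig {
            enable_output_buffer: true,
            ..Default::default()
        };
        assert!(invalid.validate().is_err());

        let valid = OutputBufferConfig {
            enable_output_buffer: true,
            max_output_buffer_time_millis: 10_000,
            ..Default::default()
        };
        assert!(valid.validate().is_ok());
    }
}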

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Box the config to prevent Rust from complaining about the large size
    // difference between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
            TransportConfig::ClockInput(_) => "clock".to_string(),
        }
    }
}
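
// The enum uses an adjacently tagged serde representation (`name`/`config`);
// a spot check with the unit variant, which serializes with no `config`
// payload and whose tag happens to match `name()`.
#[cfg(test)]
mod transport_config_test {
    use crate::config::TransportConfig;

    #[test]
    fn unit_variant_tag() {
        let config = TransportConfig::HttpOutput;
        assert_eq!(
            serde_json::to_string(&config).unwrap(),
            r#"{"name":"http_output"}"#
        );
        assert_eq!(config.name(), "http_output");
    }
}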

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: YamlValue,
}

#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_min: Option<u64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_max: Option<u64>,

    /// The minimum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
}