// feldera_types/config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use serde_yaml::Value as YamlValue;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }
}

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,
    /// The minimum estimated size, in bytes, for a batch of data to be
    /// written to storage.  This is provided for debugging and fine-tuning
    /// and should ordinarily be left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage.
    ///
    /// The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the pipeline's state is kept in in-memory data
    ///   structures.  This is useful if the pipeline's state will fit in
    ///   memory and if the pipeline is ephemeral and does not need to be
    ///   recovered after a restart. The pipeline will most likely run faster
    ///   since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record was buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  The pipeline will update the clock value and trigger incremental recomputation
    /// at most each `clock_resolution_usecs` microseconds.
    ///
    /// It is set to 100 milliseconds (100,000 microseconds) by default.
    ///
    /// Set to `null` to disable periodic clock updates.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,
}

/// Accepts "true" and "false" and converts them to the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}

/// Accepts the legacy values 'initial_state' and 'latest_checkpoint' as
/// enabling fault tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            tracing: {
                // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
                // to keep it on by default.
                false
            },
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: {
                // Every 100 ms.
                Some(100_000)
            },
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
        }
    }
}

/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`.  This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: u64,
}

fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}

pub fn default_checkpoint_interval_secs() -> u64 {
    60
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}

#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        ..FtConfig::default()
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }
            );
        }
    }
}

impl FtConfig {
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Returns the checkpoint interval, if fault tolerance is enabled, and
    /// otherwise `None`.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        self.is_enabled()
            .then(|| Duration::from_secs(self.checkpoint_interval_secs.clamp(1, 3600)))
    }
}

/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
/// them automatically.
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if value.eq_ignore_ascii_case("none") {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}

/// Describes an input connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}

/// A data connector's configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in
    /// fewer, bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set, the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}

/// Describes an output connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Prevent Rust from complaining about a large size difference between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
        }
    }
}

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: YamlValue,
}

#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline
    pub cpu_cores_min: Option<u64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline
    pub cpu_cores_max: Option<u64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
}