feldera_types/config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::PostgresReaderConfig;
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use serde_yaml::Value as YamlValue;
use std::fmt::Display;
use std::path::Path;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this, then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }
}
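
// A minimal sketch (not part of the public API) showing how `with_storage`
// and `storage()` pair up: `storage()` only returns `Some` once both the
// `StorageConfig` and the `StorageOptions` halves are present. The path and
// pipeline name below are hypothetical examples.
#[cfg(test)]
mod pipeline_storage_sketch {
    use super::*;

    #[test]
    fn with_storage_round_trip() {
        let config: PipelineConfig =
            serde_json::from_str(r#"{"name": "example", "inputs": {}}"#).unwrap();
        // No `storage_config` was provided, so the accessor returns `None`.
        assert!(config.storage().is_none());

        let config = config.with_storage(Some((
            StorageConfig {
                path: "/tmp/pipeline-state".to_string(),
                cache: StorageCacheConfig::default(),
            },
            StorageOptions::default(),
        )));
        assert!(config.storage().is_some());
    }
}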

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                // The Feldera cache bypasses the OS page cache, so open files
                // with `O_DIRECT` where available; the flag is Linux-specific,
                // and other Unix systems fall through to 0 below.
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// The minimum estimated size, in bytes, at which a batch of data is
    /// written to storage.  This is provided for debugging and fine-tuning and
    /// should ordinarily be left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// writing batches to storage.
    ///
    /// The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}
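
// A brief sketch of the on-the-wire shape implied by
// `#[serde(tag = "name", content = "config")]`: the unit variant carries only
// the tag, while `Object` nests its settings under "config". The bucket URL
// is a placeholder.
#[cfg(test)]
mod storage_backend_repr_sketch {
    use super::*;

    #[test]
    fn tagged_representation() {
        let backend: StorageBackendConfig =
            serde_json::from_str(r#"{"name": "default"}"#).unwrap();
        assert_eq!(backend, StorageBackendConfig::Default);

        let backend: StorageBackendConfig =
            serde_json::from_str(r#"{"name": "object", "config": {"url": "s3://bucket/path"}}"#)
                .unwrap();
        assert!(matches!(backend, StorageBackendConfig::Object(_)));
    }
}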

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}
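
// Sketch: because `other_options` is flattened, any key besides "url" is
// collected into the map and passed through to the object-store provider.
// The option values here are placeholders, not working credentials.
#[cfg(test)]
mod object_storage_options_sketch {
    use super::*;

    #[test]
    fn extra_keys_are_collected() {
        let config: ObjectStorageConfig = serde_json::from_str(
            r#"{"url": "s3://my-bucket/state", "region": "us-east-1", "endpoint": "https://localhost:4566"}"#,
        )
        .unwrap();
        assert_eq!(config.url, "s3://my-bucket/state");
        assert_eq!(config.other_options.len(), 2);
        assert_eq!(
            config.other_options.get("region").map(String::as_str),
            Some("us-east-1")
        );
    }
}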

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the pipeline's state is kept in in-memory data
    ///   structures.  This is useful if the pipeline's state will fit in
    ///   memory and if the pipeline is ephemeral and does not need to be
    ///   recovered after a restart. The pipeline will most likely run faster
    ///   since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Configures fault tolerance with the specified startup behavior. Fault
    /// tolerance is disabled if this or `storage` is `None`.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: Option<FtConfig>,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller. Defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()`
    /// function.  The output of such queries depends on the real-time clock
    /// and can change over time without any external inputs.  The pipeline
    /// will update the clock value and trigger incremental recomputation at
    /// most each `clock_resolution_usecs` microseconds.
    ///
    /// It is set to 100 milliseconds (100,000 microseconds) by default.
    ///
    /// Set to `null` to disable periodic clock updates.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,
}
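
// An illustrative `RuntimeConfig` fragment, as a user might supply it; every
// key maps to a field documented above, and unspecified fields fall back to
// `RuntimeConfig::default()`. The values are arbitrary.
#[cfg(test)]
mod runtime_config_sketch {
    use super::*;

    #[test]
    fn representative_config() {
        let config: RuntimeConfig = serde_json::from_str(
            r#"{
                "workers": 4,
                "storage": {"compression": "snappy"},
                "fault_tolerance": {"checkpoint_interval_secs": 120},
                "clock_resolution_usecs": 1000000
            }"#,
        )
        .unwrap();
        assert_eq!(config.workers, 4);
        assert_eq!(config.clock_resolution_usecs, Some(1_000_000));
        assert_eq!(config.max_buffering_delay_usecs, 0);
    }
}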

/// Accepts `true` and `false` and converts them to the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}
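
// A sketch of the compatibility behavior above: the legacy booleans map to
// `None`/`Some(StorageOptions::default())`, and a map is parsed as the
// structured form.
#[cfg(test)]
mod storage_options_compat_sketch {
    use super::*;

    #[test]
    fn accepts_bool_or_struct() {
        let config: RuntimeConfig = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert_eq!(config.storage, None);

        let config: RuntimeConfig = serde_json::from_str(r#"{"storage": true}"#).unwrap();
        assert_eq!(config.storage, Some(StorageOptions::default()));

        let config: RuntimeConfig =
            serde_json::from_str(r#"{"storage": {"compression": "none"}}"#).unwrap();
        assert_eq!(
            config.storage.unwrap().compression,
            StorageCompression::None
        );
    }
}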

/// Accepts the old `initial_state` and `latest_checkpoint` strings and
/// converts them to the new format.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<Option<FtConfig>, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = Option<FtConfig>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(Some(FtConfig::default())),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<FtConfig>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}
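
// A sketch of the legacy-string path above: the old "initial_state" and
// "latest_checkpoint" values both collapse to `FtConfig::default()`, while a
// map is parsed as a regular `FtConfig`.
#[cfg(test)]
mod fault_tolerance_compat_sketch {
    use super::*;

    #[test]
    fn accepts_legacy_string_or_struct() {
        let config: RuntimeConfig =
            serde_json::from_str(r#"{"fault_tolerance": "latest_checkpoint"}"#).unwrap();
        assert_eq!(config.fault_tolerance, Some(FtConfig::default()));

        let config: RuntimeConfig =
            serde_json::from_str(r#"{"fault_tolerance": {"checkpoint_interval_secs": 30}}"#)
                .unwrap();
        assert_eq!(
            config.fault_tolerance,
            Some(FtConfig {
                checkpoint_interval_secs: 30
            })
        );
    }
}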

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: Some(StorageOptions::default()),
            fault_tolerance: None,
            cpu_profiler: true,
            tracing: {
                // We discovered that the jaeger crate can use up gigabytes of
                // RAM, so it's not harmless to keep it on by default.
                false
            },
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: {
                // Every 100 ms.
                Some(100_000)
            },
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
        }
    }
}

/// Fault-tolerance configuration for runtime startup.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  A value of 0 disables automatic
    /// checkpointing.
    pub checkpoint_interval_secs: u64,
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            checkpoint_interval_secs: 60,
        }
    }
}

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}
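
// Sketch of the two accepted shapes for `start_after`: a single label or an
// array of labels. The transport and the labels here are placeholders; only
// the shape of the `start_after` value matters.
#[cfg(test)]
mod start_after_sketch {
    use super::*;

    fn parse(start_after: &str) -> ConnectorConfig {
        serde_json::from_str(&format!(
            r#"{{"transport": {{"name": "http_output"}}, "start_after": {start_after}}}"#
        ))
        .unwrap()
    }

    #[test]
    fn string_or_array() {
        assert_eq!(
            parse(r#""seed""#).start_after,
            Some(vec!["seed".to_string()])
        );
        assert_eq!(
            parse(r#"["seed", "backfill"]"#).start_after,
            Some(vec!["seed".to_string(), "backfill".to_string()])
        );
    }
}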

/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport
    /// endpoint or stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}
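
// An illustrative connector configuration fragment; only fields defined in
// `ConnectorConfig` appear, and omitted fields take the documented defaults.
// The format name and label are placeholders.
#[cfg(test)]
mod connector_config_sketch {
    use super::*;

    #[test]
    fn defaults_fill_missing_fields() {
        let config: ConnectorConfig = serde_json::from_str(
            r#"{
                "transport": {"name": "http_output"},
                "format": {"name": "csv"},
                "max_batch_size": 5000,
                "labels": ["seed"]
            }"#,
        )
        .unwrap();
        assert_eq!(config.max_batch_size, 5_000);
        assert_eq!(config.max_queued_records, default_max_queued_records());
        assert!(!config.paused);
        assert!(!config.output_buffer_config.enable_output_buffer);
    }
}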

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in
    /// fewer, bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set, the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}
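
// A sketch of the rule enforced by `validate`: enabling the output buffer
// without bounding it by size or time is rejected, while setting either bound
// makes the configuration valid.
#[cfg(test)]
mod output_buffer_validation_sketch {
    use super::*;

    #[test]
    fn unbounded_buffer_is_rejected() {
        let unbounded = OutputBufferConfig {
            enable_output_buffer: true,
            ..Default::default()
        };
        assert!(unbounded.validate().is_err());

        let bounded = OutputBufferConfig {
            enable_output_buffer: true,
            max_output_buffer_time_millis: 1_000,
            ..Default::default()
        };
        assert!(bounded.validate().is_ok());
    }
}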

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Prevent Rust from complaining about the large size difference between
    // enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
        }
    }
}
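
// A small sketch relating the serde tag to `name()` for one variant: the
// adjacently tagged representation of `HttpOutput` uses the same
// "http_output" string that `name()` returns. (Tags follow serde's
// snake_case renaming, which may differ from `name()` for some variants.)
#[cfg(test)]
mod transport_name_sketch {
    use super::*;

    #[test]
    fn http_output_tag_matches_name() {
        let transport = TransportConfig::HttpOutput;
        let json = serde_json::to_value(&transport).unwrap();
        assert_eq!(json["name"], serde_json::json!(transport.name()));
    }
}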

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: YamlValue,
}

#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_min: Option<u64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_max: Option<u64>,

    /// The minimum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
}