feldera_types/
config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.
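//!
//! For example, a minimal pipeline configuration might be deserialized as
//! follows (an illustrative sketch: the endpoint name, the file path, and the
//! `path` key of the file transport config are hypothetical):
//!
//! ```ignore
//! let yaml = r#"
//! workers: 4
//! inputs:
//!   my_input:
//!     stream: my_table
//!     transport:
//!       name: file_input
//!       config:
//!         path: /tmp/input.csv
//!     format:
//!       name: csv
//! "#;
//! let config: PipelineConfig = serde_yaml::from_str(yaml).unwrap();
//! assert_eq!(config.global.workers, 4);
//! ```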

use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::PostgresReaderConfig;
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use serde_yaml::Value as YamlValue;
use std::fmt::Display;
use std::path::Path;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

/// Pipeline deployment configuration.
///
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `true`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `false`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}

impl PipelineConfig {
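    /// Returns the effective limit on concurrent connector initialization:
    /// the configured `max_parallel_connector_init` if set, otherwise the
    /// default of 10, clamped to at least 1.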
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }
}

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
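    /// Returns additional `open(2)` flags to use for storage files under this
    /// cache policy: `O_DIRECT` (bypassing the OS page cache) for
    /// `FelderaCache` on Linux, and 0 otherwise.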
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
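///
/// # Example
///
/// A sketch of the YAML form of these options (the bucket URL and the cache
/// size are hypothetical values):
///
/// ```ignore
/// let opts: StorageOptions = serde_yaml::from_str(
///     r#"
/// backend:
///   name: object
///   config:
///     url: s3://my-bucket/feldera
/// compression: snappy
/// cache_mib: 512
/// "#,
/// )
/// .unwrap();
/// assert_eq!(opts.compression, StorageCompression::Snappy);
/// ```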
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// The minimum estimated size, in bytes, that a batch of data must reach
    /// before it is written to storage.  This is provided for debugging and
    /// fine-tuning and should ordinarily be left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage.
    ///
    /// The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost, but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

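/// Configuration of an object-store storage backend.
///
/// # Example
///
/// A sketch of an S3 configuration (bucket name, region, and credentials are
/// hypothetical); keys other than `url` are collected into `other_options`:
///
/// ```ignore
/// let cfg: ObjectStorageConfig = serde_yaml::from_str(
///     r#"
/// url: s3://my-bucket/feldera
/// region: us-east-1
/// access_key_id: AKIAXXXXXXXX
/// "#,
/// )
/// .unwrap();
/// assert_eq!(cfg.other_options["region"], "us-east-1");
/// ```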
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://<account_id>.r2.cloudflarestorage.com/<bucket>`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Configures fault tolerance with the specified startup behavior. Fault
    /// tolerance is disabled if this field or `storage` is `None`.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: Option<FtConfig>,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  The pipeline will update the clock value and trigger incremental recomputation
    /// at most once every `clock_resolution_usecs` microseconds.
    ///
    /// It is set to 100 milliseconds (100,000 microseconds) by default.
    ///
    /// Set to `null` to disable periodic clock updates.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value overrides the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,
}

/// Accepts "true" and "false" and converts them to the new format.
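///
/// For example, both of the following `RuntimeConfig` fragments enable
/// storage (a sketch of the two accepted forms):
///
/// ```ignore
/// // Legacy boolean form.
/// let rc: RuntimeConfig = serde_yaml::from_str("storage: true").unwrap();
/// assert_eq!(rc.storage, Some(StorageOptions::default()));
///
/// // Structured form.
/// let rc: RuntimeConfig =
///     serde_yaml::from_str("storage: { compression: snappy }").unwrap();
/// assert!(rc.storage.is_some());
/// ```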
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}

/// Accepts old 'initial_state' and 'latest_checkpoint' and converts them to the
/// new format.
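///
/// For example (a sketch of the accepted forms):
///
/// ```ignore
/// // Legacy string form; both strings map to the default FtConfig.
/// let rc: RuntimeConfig =
///     serde_yaml::from_str("fault_tolerance: latest_checkpoint").unwrap();
/// assert_eq!(rc.fault_tolerance, Some(FtConfig::default()));
///
/// // Structured form.
/// let rc: RuntimeConfig =
///     serde_yaml::from_str("fault_tolerance: { checkpoint_interval_secs: 30 }").unwrap();
/// assert_eq!(rc.fault_tolerance, Some(FtConfig { checkpoint_interval_secs: 30 }));
/// ```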
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<Option<FtConfig>, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = Option<FtConfig>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(Some(FtConfig::default())),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<FtConfig>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: None,
            fault_tolerance: None,
            cpu_profiler: true,
            tracing: {
                // We discovered that the jaeger crate can use up gigabytes of RAM, so it's
                // not harmless to keep it on by default.
                false
            },
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: {
                // Every 100 ms.
                Some(100_000)
            },
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
        }
    }
}

/// Fault-tolerance configuration for runtime startup.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  A value of 0 disables automatic
    /// checkpointing.
    pub checkpoint_interval_secs: u64,
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            checkpoint_interval_secs: 60,
        }
    }
}

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
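///
/// For example, both of these connector fragments yield the same
/// `start_after` value (a sketch; `http_output` is used here as a transport
/// that needs no configuration, and `alpha` is a hypothetical label):
///
/// ```ignore
/// let single: ConnectorConfig =
///     serde_yaml::from_str("transport: { name: http_output }\nstart_after: alpha").unwrap();
/// let list: ConnectorConfig =
///     serde_yaml::from_str("transport: { name: http_output }\nstart_after: [alpha]").unwrap();
/// assert_eq!(single.start_after, list.start_after);
/// ```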
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}

/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer,
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set, data is kept in the buffer for at most
    /// `max_output_buffer_time_millis` milliseconds before being flushed.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
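    /// Checks that the buffer configuration is self-consistent: enabling the
    /// output buffer requires at least one of the flush conditions
    /// (`max_output_buffer_size_records`, `max_output_buffer_time_millis`) to
    /// be set to a non-default value.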
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
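///
/// With the `name`/`config` tagging declared below, a transport is written as
/// a tagged object.  For example (a sketch using the configuration-free
/// `http_output` variant):
///
/// ```ignore
/// let t: TransportConfig = serde_yaml::from_str("name: http_output").unwrap();
/// assert_eq!(t.name(), "http_output");
/// ```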
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Boxed to prevent Rust from complaining about a large size difference
    // between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through the API.
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through the API.
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through the API.
    AdHocInput(AdHocInputConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
        }
    }
}

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
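///
/// # Example
///
/// A sketch of a CSV format specification; the `delimiter` key is a
/// hypothetical format-specific setting carried opaquely in `config`:
///
/// ```ignore
/// let fmt: FormatConfig =
///     serde_yaml::from_str("{ name: csv, config: { delimiter: \",\" } }").unwrap();
/// assert_eq!(fmt.name, "csv");
/// ```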
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: YamlValue,
}

#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_min: Option<u64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_max: Option<u64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
}