feldera_types/config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.
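//!
//! As a sketch (field values are illustrative), a minimal pipeline
//! configuration can be deserialized from JSON like this; `inputs` is the
//! only field below without a default:
//!
//! ```ignore
//! let json = r#"{ "workers": 4, "inputs": {} }"#;
//! let config: PipelineConfig = serde_json::from_str(json).unwrap();
//! assert_eq!(config.global.workers, 4);
//! ```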

use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::PostgresReaderConfig;
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use serde_yaml::Value as YamlValue;
use std::fmt::Display;
use std::path::Path;
use std::{borrow::Cow, collections::BTreeMap};
use utoipa::ToSchema;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `true`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `false`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            // The page cache needs no special open flags.
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                // Bypass the OS page cache so that the Feldera cache holds
                // the only in-memory copy of the data.  `O_DIRECT` is
                // Linux-specific; other Unix targets fall through to 0.
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// The minimum estimated number of bytes in a batch of data to write it to
    /// storage.  This is provided for debugging and fine-tuning and should
    /// ordinarily be left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage.
    ///
    /// The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
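///
/// A sketch of the serialized form, given the `name`/`config` tagging
/// declared below (the bucket URL is hypothetical):
///
/// ```ignore
/// let json = r#"{ "name": "object", "config": { "url": "s3://my-bucket/feldera" } }"#;
/// let backend: StorageBackendConfig = serde_json::from_str(json).unwrap();
/// ```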
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Configures fault tolerance with the specified startup behavior. Fault
    /// tolerance is disabled if this or `storage` is `None`.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: Option<FtConfig>,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  The pipeline will update the clock value and trigger incremental recomputation
    /// at most once every `clock_resolution_usecs` microseconds.
    ///
    /// It is set to 100 milliseconds (100,000 microseconds) by default.
    ///
    /// Set to `null` to disable periodic clock updates.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,
}

/// Accepts the legacy boolean form (`true` or `false`) and converts it to the
/// new [`StorageOptions`] format.
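///
/// A sketch of both accepted forms, as they would appear inside a
/// [`RuntimeConfig`]:
///
/// ```ignore
/// // Legacy boolean form: `true` maps to the default `StorageOptions`.
/// let rt: RuntimeConfig = serde_json::from_str(r#"{ "storage": true }"#).unwrap();
/// assert_eq!(rt.storage, Some(StorageOptions::default()));
///
/// // Structured form.
/// let rt: RuntimeConfig =
///     serde_json::from_str(r#"{ "storage": { "compression": "snappy" } }"#).unwrap();
/// ```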
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}

/// Accepts the legacy `'initial_state'` and `'latest_checkpoint'` strings and
/// converts them to the new [`FtConfig`] format.
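///
/// A sketch of the accepted forms, as they would appear inside a
/// [`RuntimeConfig`]:
///
/// ```ignore
/// // Legacy string form.
/// let rt: RuntimeConfig =
///     serde_json::from_str(r#"{ "fault_tolerance": "latest_checkpoint" }"#).unwrap();
/// assert_eq!(rt.fault_tolerance, Some(FtConfig::default()));
///
/// // Structured form.
/// let rt: RuntimeConfig =
///     serde_json::from_str(r#"{ "fault_tolerance": { "checkpoint_interval_secs": 120 } }"#).unwrap();
/// ```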
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<Option<FtConfig>, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = Option<FtConfig>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(Some(FtConfig::default())),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<FtConfig>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: None,
            fault_tolerance: None,
            cpu_profiler: true,
            tracing: {
                // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
                // to keep it on by default.
                false
            },
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: {
                // Every 100 ms.
                Some(100_000)
            },
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
        }
    }
}

/// Fault-tolerance configuration for runtime startup.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  A value of 0 disables automatic
    /// checkpointing.
    pub checkpoint_interval_secs: u64,
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            checkpoint_interval_secs: 60,
        }
    }
}

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
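///
/// A minimal sketch of the two accepted shapes (the labels are hypothetical);
/// `serde_json::Value` implements `Deserializer`, so the function can be
/// exercised directly:
///
/// ```ignore
/// let one = deserialize_start_after(serde_json::json!("backfill")).unwrap();
/// assert_eq!(one, Some(vec!["backfill".to_string()]));
///
/// let many = deserialize_start_after(serde_json::json!(["backfill", "seed"])).unwrap();
/// assert_eq!(many, Some(vec!["backfill".to_string(), "seed".to_string()]));
/// ```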
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}

/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL `CREATE INDEX` statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the time the backpressure mechanism is triggered and the time the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set, the buffer is flushed once data has been accumulated in it for
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
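    /// Checks that the output buffer settings are self-consistent.
    ///
    /// A sketch of the failure mode: enabling the buffer while leaving both
    /// bounds at their defaults is rejected, since the buffer would never be
    /// flushed.
    ///
    /// ```ignore
    /// let config = OutputBufferConfig {
    ///     enable_output_buffer: true,
    ///     ..Default::default()
    /// };
    /// assert!(config.validate().is_err());
    /// ```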
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
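///
/// Serialized as an adjacently tagged enum: `name` selects the variant and
/// `config` carries the variant-specific settings.  A sketch using the
/// config-less `HttpOutput` variant:
///
/// ```ignore
/// let t: TransportConfig = serde_json::from_str(r#"{ "name": "http_output" }"#).unwrap();
/// assert_eq!(t.name(), "http_output");
/// ```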
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Boxed to prevent Rust from complaining about the large size difference
    // between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
        }
    }
}

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
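///
/// A sketch of a typical value (the nested settings are hypothetical and
/// depend on the chosen format):
///
/// ```ignore
/// let yaml = "name: json\nconfig:\n  update_format: insert_delete";
/// let format: FormatConfig = serde_yaml::from_str(yaml).unwrap();
/// assert_eq!(format.name, "json");
/// ```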
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: YamlValue,
}

#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_min: Option<u64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_max: Option<u64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
}