feldera_types/
config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.
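//!
//! As a rough sketch (the exact key set is defined by the types in this
//! module), a pipeline configuration can be deserialized like this:
//!
//! ```rust,ignore
//! // Hypothetical minimal config: no connectors, default runtime settings.
//! let json = r#"{"name": "demo", "workers": 4, "inputs": {}}"#;
//! let config: PipelineConfig = serde_json::from_str(json).expect("valid config");
//! assert_eq!(config.global.workers, 4);
//! ```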

use crate::secret_resolver::default_secrets_directory;
use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, DeserializeOwned, Error as _, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use serde_yaml::Value as YamlValue;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;

/// Pipeline deployment configuration.
///
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
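///
/// # Example
///
/// A minimal sketch (the JSON keys come from the fields below; `workers`
/// belongs to the flattened [`RuntimeConfig`]):
///
/// ```rust,ignore
/// let json = r#"{"workers": 4, "inputs": {}}"#;
/// let config: PipelineConfig = serde_json::from_str(json).unwrap();
/// // The runner has not attached a `StorageConfig`, so `storage()` yields
/// // no (config, options) pair even though `global.storage` defaults to set.
/// assert!(config.storage().is_none());
/// ```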
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }

    /// Returns `self.secrets_dir`, or the default secrets directory if it
    /// isn't set.
    pub fn secrets_dir(&self) -> &Path {
        match &self.secrets_dir {
            Some(dir) => Path::new(dir.as_str()),
            None => default_secrets_directory(),
        }
    }
}

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost, but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
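///
/// # Example
///
/// A sketch of the serialized form implied by the `tag`/`content` serde
/// attributes below (the exact option set lives in the variant configs):
///
/// ```rust,ignore
/// let backend: StorageBackendConfig =
///     serde_json::from_str(r#"{"name": "object", "config": {"url": "s3://bucket/path"}}"#)
///         .unwrap();
/// assert_eq!(backend.to_string(), "object");
/// ```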
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::File(_) => write!(f, "file"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

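/// Which checkpoint to fetch from the object store at startup: either the
/// latest available one or a specific checkpoint identified by UUID.
///
/// A sketch of the two accepted string forms, per the custom `Deserialize`
/// impl below:
///
/// ```rust,ignore
/// let latest: StartFromCheckpoint = serde_json::from_str(r#""latest""#).unwrap();
/// let by_id: StartFromCheckpoint =
///     serde_json::from_str(r#""00000000-0000-0000-0000-000000000000""#).unwrap();
/// ```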
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    Latest,
    Uuid(uuid::Uuid),
}

impl ToSchema<'_> for StartFromCheckpoint {
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}

impl<'de> Deserialize<'de> for StartFromCheckpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}

impl Serialize for StartFromCheckpoint {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
        }
    }
}
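/// Configuration for synchronizing checkpoints to an S3-compatible object
/// store via `rclone`.
///
/// A sketch of the one invariant enforced by [`SyncConfig::validate`]:
/// `standby` requires `start_from_checkpoint`:
///
/// ```rust,ignore
/// let sync = SyncConfig {
///     standby: true,
///     start_from_checkpoint: Some(StartFromCheckpoint::Latest),
///     ..Default::default()
/// };
/// assert!(sync.validate().is_ok());
/// ```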
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for MinIO or the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If `fail_if_no_checkpoint` is `true` and the checkpoint cannot be
    /// fetched, the pipeline will fail to initialize.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip the post-copy check of checksums and only check the file
    /// sizes.  This can significantly improve throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will auto-activate, and no newer checkpoints will
    /// be fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, the pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If a checkpoint UUID, the pipeline fetches this checkpoint once and
    ///   waits in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between attempts to fetch the latest
    /// checkpoint from the object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3-specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in the object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,
}

fn default_pull_interval() -> u64 {
    10
}

fn default_retention_min_count() -> u32 {
    10
}

fn default_retention_min_age() -> u32 {
    30
}

impl SyncConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.standby && self.start_from_checkpoint.is_none() {
            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
Standby mode requires `start_from_checkpoint` to be set.
Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
        }

        Ok(())
    }
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most every `clock_resolution_usecs` microseconds.  If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_yaml::Value>,

    /// Deprecated: setting this to true or false no longer has any effect.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the HTTP server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case the
    /// pipeline HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async I/O tasks.
    ///
    /// This affects some networking and file I/O operations,
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case
    /// ingress, egress, or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Optional settings for tweaking Feldera internals.
    ///
    /// The available key-value pairs change from one version of Feldera to
    /// another, so users should not depend on particular settings being
    /// available, or on their behavior.
    pub dev_tweaks: BTreeMap<String, serde_json::Value>,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process.  Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,
}

/// Accepts "true" and "false" and converts them to the new format.
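///
/// A sketch of the two accepted forms, assuming this function is wired in via
/// `#[serde(deserialize_with = "deserialize_storage_options")]` as above:
///
/// ```rust,ignore
/// // Legacy boolean form.
/// let rt: RuntimeConfig = serde_json::from_str(r#"{"storage": true}"#).unwrap();
/// assert_eq!(rt.storage, Some(StorageOptions::default()));
/// // Structured form.
/// let rt: RuntimeConfig =
///     serde_json::from_str(r#"{"storage": {"compression": "snappy"}}"#).unwrap();
/// ```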
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}

/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
/// tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
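///
/// A sketch of the accepted forms (see also the tests in this module):
///
/// ```rust,ignore
/// // Legacy string form: enables fault tolerance with the default model.
/// let rt: RuntimeConfig =
///     serde_json::from_str(r#"{"fault_tolerance": "latest_checkpoint"}"#).unwrap();
/// assert!(rt.fault_tolerance.is_enabled());
/// // `null` disables fault tolerance.
/// let rt: RuntimeConfig = serde_json::from_str(r#"{"fault_tolerance": null}"#).unwrap();
/// assert!(!rt.fault_tolerance.is_enabled());
/// ```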
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            // We discovered that the jaeger crate can use up gigabytes of RAM,
            // so it's not harmless to keep tracing on by default.
            tracing: false,
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: Some(DEFAULT_CLOCK_RESOLUTION_USECS),
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            io_workers: None,
            http_workers: None,
            dev_tweaks: BTreeMap::default(),
            logging: None,
        }
    }
}

/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`.  This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}

fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}

pub fn default_checkpoint_interval_secs() -> Option<u64> {
    Some(60)
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}

#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}

impl FtConfig {
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Returns the checkpoint interval, if fault tolerance is enabled, and
    /// otherwise `None`.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        if self.is_enabled() {
            self.checkpoint_interval_secs
                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
        } else {
            None
        }
    }
}

/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
/// them automatically.
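///
/// A sketch of the resulting wire format, using [FtConfig::model] as the
/// field that opts into this module:
///
/// ```rust,ignore
/// let ft: FtConfig = serde_json::from_str(r#"{"model": "none"}"#).unwrap();
/// assert_eq!(ft.model, None);
/// let ft: FtConfig = serde_json::from_str(r#"{"model": "at_least_once"}"#).unwrap();
/// assert_eq!(ft.model, Some(FtModel::AtLeastOnce));
/// ```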
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if value.eq_ignore_ascii_case("none") {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}

/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
///
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
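///
/// A sketch of the accepted forms (`StartAfter` here is a hypothetical
/// wrapper, for illustration only):
///
/// ```rust,ignore
/// #[derive(serde::Deserialize)]
/// struct StartAfter {
///     #[serde(deserialize_with = "deserialize_start_after", default)]
///     start_after: Option<Vec<String>>,
/// }
/// let one: StartAfter = serde_json::from_str(r#"{"start_after": "raw"}"#).unwrap();
/// assert_eq!(one.start_after, Some(vec!["raw".to_string()]));
/// let many: StartAfter = serde_json::from_str(r#"{"start_after": ["a", "b"]}"#).unwrap();
/// ```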
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}

/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the time the backpressure mechanism is triggered and the time the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with the specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
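///
/// A sketch of the serialized form implied by the `tag`/`content` serde
/// attributes below, using the unit `HttpOutput` variant so no
/// transport-specific fields need to be assumed:
///
/// ```rust,ignore
/// let t: TransportConfig = serde_json::from_str(r#"{"name": "http_output"}"#).unwrap();
/// assert_eq!(t.name(), "http_output");
/// ```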
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Boxed to prevent Rust from complaining about a large size difference
    // between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through the API.
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through the API.
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through the API.
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
            TransportConfig::ClockInput(_) => "clock".to_string(),
        }
    }
}

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: YamlValue,
}

#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "deserialize_via_value")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "deserialize_via_value")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
}

/// Use this as a serde deserialization function to work around `serde_json`
/// [issues] with nested `f64`.  It works in two steps, first deserializing a
/// `serde_json::Value` from `deserializer`, then deserializing `T` from that
/// `serde_json::Value`.
///
/// [issues]: https://github.com/serde-rs/json/issues/1157
fn deserialize_via_value<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
    D: Deserializer<'de>,
    T: DeserializeOwned,
{
    serde_json::from_value(serde_json::Value::deserialize(deserializer)?).map_err(D::Error::custom)
}