feldera_types/config.rs

//! Controller configuration.
//!
//! This module defines the controller configuration structure.  The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs.  We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::program_schema::ProgramSchema;
use crate::secret_resolver::default_secrets_directory;
use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nats::NatsInputConfig;
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use feldera_ir::{MirNode, MirNodeId};
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;

/// Program information included in the pipeline configuration.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// The MIR of the program.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Program schema.
    pub program_schema: ProgramSchema,
}

/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
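///
/// For illustration only, a serialized pipeline configuration might look
/// roughly like the JSON below (names and values are invented; `workers`
/// comes from the flattened [`RuntimeConfig`]):
///
/// ```json
/// {
///   "workers": 8,
///   "name": "pipeline-00000000-0000-0000-0000-000000000000",
///   "storage_config": { "path": "/var/lib/pipeline/state" },
///   "inputs": {
///     "my_table.my_connector": {
///       "stream": "my_table",
///       "transport": { "name": "datagen", "config": {} }
///     }
///   },
///   "outputs": {}
/// }
/// ```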
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Unique system-generated name of the pipeline (format: `pipeline-<uuid>`).
    /// It is unique across all tenants and cannot be changed.
    ///
    /// The `<uuid>` is also used in the naming of various resources that back the pipeline,
    /// and as such this name is useful to find/identify corresponding resources.
    pub name: Option<String>,

    /// Name given by the tenant to the pipeline. It is only unique within the same tenant, and can
    /// be changed by the tenant when the pipeline is stopped.
    ///
    /// Given a specific tenant, it can be used to find/identify a specific pipeline of theirs.
    pub given_name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`].  If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration.
    #[serde(default)]
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }

    /// Returns `self.secrets_dir`, or the default secrets directory if it isn't
    /// set.
    pub fn secrets_dir(&self) -> &Path {
        match &self.secrets_dir {
            Some(dir) => Path::new(dir.as_str()),
            None => default_secrets_directory(),
        }
    }

    /// Abbreviated config that can be printed in the log on pipeline startup.
    pub fn display_summary(&self) -> String {
        // TODO: we may want to further abbreviate connector config.
        let summary = serde_json::json!({
            "name": self.name,
            "given_name": self.given_name,
            "global": self.global,
            "storage_config": self.storage_config,
            "secrets_dir": self.secrets_dir,
            "inputs": self.inputs,
            "outputs": self.outputs
        });

        serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
    }
}

/// A subset of fields in `PipelineConfig` that are generated by the compiler.
/// These fields are shipped to the pipeline by the compilation server along with
/// the program binary.
// Note: An alternative would be to embed these fields in the program binary itself
// as static strings. This would work well for program IR, but it would require recompiling
// the program anytime a connector config changes, whereas today connector changes
// do not require recompilation.
#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone)]
pub struct PipelineConfigProgramInfo {
    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold.  `usize::MAX` would effectively disable
    /// storage for such batches.  The default is 10,485,760 (10 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.  A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold.  `usize::MAX`, the default,
    /// effectively disables storage for such batches.  If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
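///
/// Because of the `tag`/`content` serde attributes below, a backend is
/// selected in JSON with a `name` and an optional `config`.  A sketch (the
/// `url` value is invented):
///
/// ```json
/// { "name": "object", "config": { "url": "s3://my-bucket/state" } }
/// ```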
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::File(_) => write!(f, "file"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

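/// Which checkpoint to fetch from the object store at startup (see
/// [SyncConfig::start_from_checkpoint]).
///
/// Per the hand-written `Serialize`/`Deserialize` impls below, this is encoded
/// in JSON as either the string `"latest"` or a checkpoint UUID string, e.g.
/// (the UUID is a placeholder):
///
/// ```json
/// "latest"
/// ```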
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    Latest,
    Uuid(uuid::Uuid),
}

impl ToSchema<'_> for StartFromCheckpoint {
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}

impl<'de> Deserialize<'de> for StartFromCheckpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}

impl Serialize for StartFromCheckpoint {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
        }
    }
}

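/// Configuration for synchronizing checkpoints to an object store via `rclone`.
///
/// A minimal sketch of the JSON form, assuming a local MinIO endpoint
/// (endpoint and bucket are invented):
///
/// ```json
/// {
///   "endpoint": "http://localhost:9000",
///   "bucket": "my-bucket/checkpoints",
///   "provider": "Minio",
///   "start_from_checkpoint": "latest"
/// }
/// ```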
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for MinIO or the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If fetching the checkpoint fails and `fail_if_no_checkpoint` is `true`,
    /// the pipeline will fail to initialize.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip the post-copy checksum verification and only check file sizes.
    /// This can significantly improve throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded in parallel for
    /// multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will auto-activate and no newer checkpoints will
    /// be fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, the pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If a checkpoint UUID, the pipeline fetches this checkpoint once and
    ///   waits in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between each attempt to fetch the latest
    /// checkpoint from the object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3 specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in the object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,
}

fn default_pull_interval() -> u64 {
    10
}

fn default_retention_min_count() -> u32 {
    10
}

fn default_retention_min_age() -> u32 {
    30
}

impl SyncConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.standby && self.start_from_checkpoint.is_none() {
            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
Standby mode requires `start_from_checkpoint` to be set.
Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
        }

        Ok(())
    }
}

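/// Configuration for object storage (see [StorageBackendConfig::Object]).
///
/// For illustration, an S3 backend might be configured like this (bucket and
/// credential values are placeholders; keys other than `url` are flattened
/// into `other_options`):
///
/// ```json
/// {
///   "url": "s3://my-bucket/state",
///   "region": "us-east-1",
///   "access_key_id": "<key id>",
///   "secret_access_key": "<secret>"
/// }
/// ```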
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices.  Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    pub sync: Option<SyncConfig>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
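///
/// A small illustrative JSON fragment (values invented; unspecified fields
/// take their defaults):
///
/// ```json
/// {
///   "workers": 8,
///   "storage": { "backend": { "name": "default" } },
///   "fault_tolerance": { "model": "exactly_once" },
///   "clock_resolution_usecs": 1000000
/// }
/// ```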
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures.  This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage.  This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller, defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function.  The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs.  If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most once every `clock_resolution_usecs` microseconds.  If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads.  Specify at least twice as many CPU numbers as
    /// workers.  CPUs are generally numbered starting from 0.  The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_json::Value>,

    /// Deprecated: setting this to true or false no longer has any effect.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the HTTP server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case the
    /// pipeline HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async IO tasks.
    ///
    /// This affects some networking and file I/O operations,
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case
    /// ingress, egress, or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Optional settings for tweaking Feldera internals.
    ///
    /// The available key-value pairs change from one version of Feldera to
    /// another, so users should not depend on particular settings being
    /// available, or on their behavior.
    pub dev_tweaks: BTreeMap<String, serde_json::Value>,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process.  Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,
}

/// Accepts `true` and `false` and converts them to the new format.
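///
/// For example, all of these values are accepted for the `storage` field
/// (booleans are the legacy form; `true` maps to the default
/// [StorageOptions], `false` to `None`; the map shown is illustrative):
///
/// ```json
/// true
/// false
/// { "backend": { "name": "default" }, "compression": "default" }
/// ```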
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}

/// Accepts the very old `initial_state` and `latest_checkpoint` values as
/// enabling fault tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the usual way.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            tracing: {
                // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
                // to keep it on by default.
                false
            },
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: Some(DEFAULT_CLOCK_RESOLUTION_USECS),
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            io_workers: None,
            http_workers: None,
            dev_tweaks: BTreeMap::default(),
            logging: None,
        }
    }
}

/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`.  This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
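///
/// For example (illustrative; see the test module below), this JSON enables
/// exactly-once fault tolerance with a 60-second checkpoint interval:
///
/// ```json
/// { "model": "exactly_once", "checkpoint_interval_secs": 60 }
/// ```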
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds.  Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}

fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}

pub fn default_checkpoint_interval_secs() -> Option<u64> {
    Some(60)
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}

#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}

impl FtConfig {
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Returns the checkpoint interval, if fault tolerance is enabled, and
    /// otherwise `None`.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        if self.is_enabled() {
            self.checkpoint_interval_secs
                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
        } else {
            None
        }
    }
}

/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation.  I don't see a way to write
/// them automatically.
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if value.eq_ignore_ascii_case("none") {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}

/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once.  Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once.  Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
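///
/// All of the following forms are accepted (the labels are illustrative):
///
/// ```json
/// "label1"
/// ["label1", "label2"]
/// null
/// ```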
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}

/// A data connector's configuration.
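///
/// A sketch of a connector configuration in JSON (the transport and format
/// payloads are placeholders; see [TransportConfig] and [FormatConfig] for
/// the actual options):
///
/// ```json
/// {
///   "transport": { "name": "kafka_input", "config": {} },
///   "format": { "name": "json", "config": {} },
///   "max_batch_size": 10000,
///   "paused": false
/// }
/// ```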
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only.  It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement.  An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit.  The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the time the backpressure mechanism is triggered and the endpoint is
    /// paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with the specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}

impl ConnectorConfig {
    /// Compare two configs modulo the `paused` field.
    ///
    /// Used to compare checkpointed and current connector configs.
    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
        let mut a = self.clone();
        let mut b = other.clone();
        a.paused = false;
        b.paused = false;
        a == b
    }
}

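/// Output buffer configuration, flattened into [ConnectorConfig].
///
/// An illustrative JSON fragment that flushes the buffer at least once per
/// minute (values invented):
///
/// ```json
/// { "enable_output_buffer": true, "max_output_buffer_time_millis": 60000 }
/// ```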
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`.  When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied.  When this option is
    /// set, the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
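///
/// Serde's `tag`/`content` attributes below give each variant the JSON shape
/// `{"name": ..., "config": ...}`; for example (config contents elided):
///
/// ```json
/// { "name": "kafka_input", "config": {} }
/// ```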
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    NatsInput(NatsInputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Boxed to prevent rust from complaining about a large size difference
    // between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::NatsInput(_) => "nats_input".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
            TransportConfig::ClockInput(_) => "clock".to_string(),
        }
    }

    /// Returns true if the connector is transient, i.e., is created and destroyed
    /// at runtime on demand, rather than being configured as part of the pipeline.
    pub fn is_transient(&self) -> bool {
        matches!(
            self,
            TransportConfig::AdHocInput(_)
                | TransportConfig::HttpInput(_)
                | TransportConfig::HttpOutput
                | TransportConfig::ClockInput(_)
        )
    }
}

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
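///
/// For example (the `config` payload is format-specific and opaque to this
/// module; the empty object is a placeholder):
///
/// ```json
/// { "name": "json", "config": {} }
/// ```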
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}

#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,

    /// Kubernetes service account name to use for an instance of this pipeline.
    /// The account determines permissions and access controls.
    pub service_account_name: Option<String>,

    /// Kubernetes namespace to use for an instance of this pipeline.
    /// The namespace determines the scope of names for resources created
    /// for the pipeline.
    /// If not set, the pipeline will be deployed in the same namespace
    /// as the control-plane.
    pub namespace: Option<String>,
}