feldera_types/config.rs
1//! Controller configuration.
2//!
3//! This module defines the controller configuration structure. The leaves of
4//! this structure are individual transport-specific and data-format-specific
5//! endpoint configs. We represent these configs as opaque JSON values, so
6//! that the entire configuration tree can be deserialized from a JSON file.
7
8use crate::transport::adhoc::AdHocInputConfig;
9use crate::transport::clock::ClockConfig;
10use crate::transport::datagen::DatagenInputConfig;
11use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
12use crate::transport::file::{FileInputConfig, FileOutputConfig};
13use crate::transport::http::HttpInputConfig;
14use crate::transport::iceberg::IcebergReaderConfig;
15use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
16use crate::transport::nexmark::NexmarkInputConfig;
17use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
18use crate::transport::pubsub::PubSubInputConfig;
19use crate::transport::redis::RedisOutputConfig;
20use crate::transport::s3::S3InputConfig;
21use crate::transport::url::UrlInputConfig;
22use core::fmt;
23use serde::de::{self, MapAccess, Visitor};
24use serde::{Deserialize, Deserializer, Serialize};
25use serde_json::Value as JsonValue;
26use serde_yaml::Value as YamlValue;
27use std::fmt::Display;
28use std::path::Path;
29use std::str::FromStr;
30use std::time::Duration;
31use std::{borrow::Cow, cmp::max, collections::BTreeMap};
32use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
33use utoipa::ToSchema;
34
35const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;
36
37/// Default value of `ConnectorConfig::max_queued_records`.
38pub const fn default_max_queued_records() -> u64 {
39 1_000_000
40}
41
42/// Default maximum batch size for connectors, in records.
43///
44/// If you change this then update the comment on
45/// [ConnectorConfig::max_batch_size].
46pub const fn default_max_batch_size() -> u64 {
47 10_000
48}
49
50pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;
51
52/// Pipeline deployment configuration.
53/// It represents configuration entries directly provided by the user
54/// (e.g., runtime configuration) and entries derived from the schema
55/// of the compiled program (e.g., connectors). Storage configuration,
56/// if applicable, is set by the runner.
57#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
58pub struct PipelineConfig {
59 /// Global controller configuration.
60 #[serde(flatten)]
61 #[schema(inline)]
62 pub global: RuntimeConfig,
63
64 /// Pipeline name.
65 pub name: Option<String>,
66
67 /// Configuration for persistent storage
68 ///
69 /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`]. If `global.storage` is `None`, the pipeline ignores
71 /// this field.
72 #[serde(default)]
73 pub storage_config: Option<StorageConfig>,
74
75 /// Input endpoint configuration.
76 pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,
77
78 /// Output endpoint configuration.
79 #[serde(default)]
80 pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
81}
82
83impl PipelineConfig {
84 pub fn max_parallel_connector_init(&self) -> u64 {
85 max(
86 self.global
87 .max_parallel_connector_init
88 .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
89 1,
90 )
91 }
92
93 pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
94 let (storage_config, storage_options) = storage.unzip();
95 Self {
96 global: RuntimeConfig {
97 storage: storage_options,
98 ..self.global
99 },
100 storage_config,
101 ..self
102 }
103 }
104
105 pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
106 let storage_options = self.global.storage.as_ref();
107 let storage_config = self.storage_config.as_ref();
108 storage_config.zip(storage_options)
109 }
110}
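
// A minimal sketch of how the accessors above behave. The pipeline name and the
// zero value for `max_parallel_connector_init` are hypothetical; the floor of 1
// and the fallback of 10 follow from the implementation above.
#[cfg(test)]
#[test]
fn pipeline_config_accessors_sketch() {
    let mut config = PipelineConfig {
        global: RuntimeConfig {
            max_parallel_connector_init: Some(0),
            ..RuntimeConfig::default()
        },
        name: Some("example-pipeline".to_string()),
        storage_config: None,
        inputs: BTreeMap::new(),
        outputs: BTreeMap::new(),
    };
    // A configured value of 0 is raised to the floor of 1.
    assert_eq!(config.max_parallel_connector_init(), 1);
    // When unset, the built-in default of 10 applies.
    config.global.max_parallel_connector_init = None;
    assert_eq!(config.max_parallel_connector_init(), 10);
    // `storage()` returns a pair only when both the runner-provided
    // `storage_config` and the user-provided `global.storage` are present.
    assert!(config.storage().is_none());
}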
111
112/// Configuration for persistent storage in a [`PipelineConfig`].
113#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
114pub struct StorageConfig {
115 /// A directory to keep pipeline state, as a path on the filesystem of the
116 /// machine or container where the pipeline will run.
117 ///
118 /// When storage is enabled, this directory stores the data for
119 /// [StorageBackendConfig::Default].
120 ///
121 /// When fault tolerance is enabled, this directory stores checkpoints and
122 /// the log.
123 pub path: String,
124
125 /// How to cache access to storage in this pipeline.
126 #[serde(default)]
127 pub cache: StorageCacheConfig,
128}
129
130impl StorageConfig {
131 pub fn path(&self) -> &Path {
132 Path::new(&self.path)
133 }
134}
135
136/// How to cache access to storage within a Feldera pipeline.
137#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
138#[serde(rename_all = "snake_case")]
139pub enum StorageCacheConfig {
140 /// Use the operating system's page cache as the primary storage cache.
141 ///
142 /// This is the default because it currently performs better than
143 /// `FelderaCache`.
144 #[default]
145 PageCache,
146
147 /// Use Feldera's internal cache implementation.
148 ///
149 /// This is under development. It will become the default when its
150 /// performance exceeds that of `PageCache`.
151 FelderaCache,
152}
153
154impl StorageCacheConfig {
155 #[cfg(unix)]
156 pub fn to_custom_open_flags(&self) -> i32 {
157 match self {
158 StorageCacheConfig::PageCache => (),
159 StorageCacheConfig::FelderaCache => {
160 #[cfg(target_os = "linux")]
161 return libc::O_DIRECT;
162 }
163 }
164 0
165 }
166}
167
168/// Storage configuration for a pipeline.
169#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
170#[serde(default)]
171pub struct StorageOptions {
172 /// How to connect to the underlying storage.
173 pub backend: StorageBackendConfig,
174
175 /// For a batch of data maintained as part of a persistent index during a
176 /// pipeline run, the minimum estimated number of bytes to write it to
177 /// storage.
178 ///
179 /// This is provided for debugging and fine-tuning and should ordinarily be
180 /// left unset.
181 ///
182 /// A value of 0 will write even empty batches to storage, and nonzero
183 /// values provide a threshold. `usize::MAX` would effectively disable
184 /// storage for such batches. The default is 1,048,576 (1 MiB).
185 pub min_storage_bytes: Option<usize>,
186
187 /// For a batch of data passed through the pipeline during a single step,
188 /// the minimum estimated number of bytes to write it to storage.
189 ///
190 /// This is provided for debugging and fine-tuning and should ordinarily be
191 /// left unset. A value of 0 will write even empty batches to storage, and
192 /// nonzero values provide a threshold. `usize::MAX`, the default,
193 /// effectively disables storage for such batches. If it is set to another
194 /// value, it should ordinarily be greater than or equal to
195 /// `min_storage_bytes`.
196 pub min_step_storage_bytes: Option<usize>,
197
198 /// The form of compression to use in data batches.
199 ///
200 /// Compression has a CPU cost but it can take better advantage of limited
201 /// NVMe and network bandwidth, which means that it can increase overall
202 /// performance.
203 pub compression: StorageCompression,
204
205 /// The maximum size of the in-memory storage cache, in MiB.
206 ///
207 /// If set, the specified cache size is spread across all the foreground and
208 /// background threads. If unset, each foreground or background thread cache
209 /// is limited to 256 MiB.
210 pub cache_mib: Option<usize>,
211}
212
213/// Backend storage configuration.
214#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
215#[serde(tag = "name", content = "config", rename_all = "snake_case")]
216pub enum StorageBackendConfig {
217 /// Use the default storage configuration.
218 ///
219 /// This currently uses the local file system.
220 #[default]
221 Default,
222
223 /// Use the local file system.
224 ///
225 /// This uses ordinary system file operations.
226 File(FileBackendConfig),
227
228 /// Object storage.
229 Object(ObjectStorageConfig),
230}
231
232impl Display for StorageBackendConfig {
233 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
234 match self {
235 StorageBackendConfig::Default => write!(f, "default"),
236 StorageBackendConfig::File(_) => write!(f, "file"),
237 StorageBackendConfig::Object(_) => write!(f, "object"),
238 }
239 }
240}
241
242/// Storage compression algorithm.
243#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
244#[serde(rename_all = "snake_case")]
245pub enum StorageCompression {
246 /// Use Feldera's default compression algorithm.
247 ///
248 /// The default may change as Feldera's performance is tuned and new
249 /// algorithms are introduced.
250 #[default]
251 Default,
252
253 /// Do not compress.
254 None,
255
256 /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
257 Snappy,
258}
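
// A sketch of the serialized form of the storage settings above, assuming JSON
// input. The backend uses the `name`/`config` tagging of `StorageBackendConfig`,
// and enum values use snake_case; the particular values (512 MiB cache, Snappy)
// are arbitrary examples, not recommendations.
#[cfg(test)]
#[test]
fn storage_options_from_json_sketch() {
    let opts: StorageOptions = serde_json::from_str(
        r#"{
            "backend": { "name": "file", "config": { "async_threads": true } },
            "compression": "snappy",
            "cache_mib": 512
        }"#,
    )
    .unwrap();
    assert!(matches!(opts.backend, StorageBackendConfig::File(_)));
    assert_eq!(opts.compression, StorageCompression::Snappy);
    assert_eq!(opts.cache_mib, Some(512));
    // Omitted fields fall back to their defaults.
    assert_eq!(opts.min_storage_bytes, None);
}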
259
260#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
261pub struct SyncConfig {
262 /// The endpoint URL for the storage service.
263 ///
264 /// This is typically required for custom or local S3-compatible storage providers like MinIO.
265 /// Example: `http://localhost:9000`
266 ///
267 /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
268 pub endpoint: Option<String>,
269
270 /// The name of the storage bucket.
271 ///
272 /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
273 pub bucket: String,
274
275 /// The region that this bucket is in.
276 ///
    /// Leave empty for MinIO or to use the default region (`us-east-1` for AWS).
278 pub region: Option<String>,
279
280 /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
281 ///
282 /// Used for provider-specific behavior in rclone.
283 /// If omitted, defaults to `"Other"`.
284 ///
285 /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
286 pub provider: Option<String>,
287
288 /// The access key used to authenticate with the storage provider.
289 ///
290 /// If not provided, rclone will fall back to environment-based credentials, such as
291 /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
292 /// this can be left empty to allow automatic authentication via the pod's service account.
293 pub access_key: Option<String>,
294
295 /// The secret key used together with the access key for authentication.
296 ///
297 /// If not provided, rclone will fall back to environment-based credentials, such as
298 /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
299 /// this can be left empty to allow automatic authentication via the pod's service account.
300 pub secret_key: Option<String>,
301
    /// If `true`, the pipeline will try to pull the latest checkpoint from the
    /// configured object store and resume from that point.
304 pub start_from_checkpoint: bool,
305}
306
307#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
308pub struct ObjectStorageConfig {
309 /// URL.
310 ///
311 /// The following URL schemes are supported:
312 ///
313 /// * S3:
314 /// - `s3://<bucket>/<path>`
315 /// - `s3a://<bucket>/<path>`
316 /// - `https://s3.<region>.amazonaws.com/<bucket>`
317 /// - `https://<bucket>.s3.<region>.amazonaws.com`
318 /// - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
319 /// * Google Cloud Storage:
320 /// - `gs://<bucket>/<path>`
321 /// * Microsoft Azure Blob Storage:
322 /// - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
323 /// - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
324 /// - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
325 /// - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
326 /// - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
327 /// - `azure://<container>/<path>` (custom)
328 /// - `https://<account>.dfs.core.windows.net`
329 /// - `https://<account>.blob.core.windows.net`
330 /// - `https://<account>.blob.core.windows.net/<container>`
331 /// - `https://<account>.dfs.fabric.microsoft.com`
332 /// - `https://<account>.dfs.fabric.microsoft.com/<container>`
333 /// - `https://<account>.blob.fabric.microsoft.com`
334 /// - `https://<account>.blob.fabric.microsoft.com/<container>`
335 ///
336 /// Settings derived from the URL will override other settings.
337 pub url: String,
338
339 /// Additional options as key-value pairs.
340 ///
341 /// The following keys are supported:
342 ///
343 /// * S3:
344 /// - `access_key_id`: AWS Access Key.
345 /// - `secret_access_key`: AWS Secret Access Key.
346 /// - `region`: Region.
347 /// - `default_region`: Default region.
348 /// - `endpoint`: Custom endpoint for communicating with S3,
349 /// e.g. `https://localhost:4566` for testing against a localstack
350 /// instance.
351 /// - `token`: Token to use for requests (passed to underlying provider).
352 /// - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
353 /// * Google Cloud Storage:
354 /// - `service_account`: Path to the service account file.
355 /// - `service_account_key`: The serialized service account key.
356 /// - `google_application_credentials`: Application credentials path.
357 /// - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
358 /// * Microsoft Azure Blob Storage:
359 /// - `access_key`: Azure Access Key.
360 /// - `container_name`: Azure Container Name.
361 /// - `account`: Azure Account.
362 /// - `bearer_token_authorization`: Static bearer token for authorizing requests.
363 /// - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
364 /// - `client_secret`: Client secret for use in client secret flow.
365 /// - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
366 /// - `endpoint`: Override the endpoint for communicating with blob storage.
367 /// - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
368 ///
369 /// Options set through the URL take precedence over those set with these
370 /// options.
371 #[serde(flatten)]
372 pub other_options: BTreeMap<String, String>,
373}
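
// A sketch of how the extra keys are captured: everything except `url` is
// collected into `other_options` through `#[serde(flatten)]`. The bucket name
// and the localstack endpoint below are placeholders.
#[cfg(test)]
#[test]
fn object_storage_options_sketch() {
    let cfg: ObjectStorageConfig = serde_json::from_str(
        r#"{
            "url": "s3://example-bucket/state",
            "region": "us-east-1",
            "endpoint": "https://localhost:4566"
        }"#,
    )
    .unwrap();
    assert_eq!(cfg.url, "s3://example-bucket/state");
    assert_eq!(
        cfg.other_options.get("region").map(String::as_str),
        Some("us-east-1")
    );
}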
374
375/// Configuration for local file system access.
376#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
377#[serde(default)]
378pub struct FileBackendConfig {
379 /// Whether to use background threads for file I/O.
380 ///
381 /// Background threads should improve performance, but they can reduce
382 /// performance if too few cores are available. This is provided for
383 /// debugging and fine-tuning and should ordinarily be left unset.
384 pub async_threads: Option<bool>,
385
386 /// Per-I/O operation sleep duration, in milliseconds.
387 ///
388 /// This is for simulating slow storage devices. Do not use this in
389 /// production.
390 pub ioop_delay: Option<u64>,
391
392 /// Configuration to synchronize checkpoints to object store.
393 pub sync: Option<SyncConfig>,
394}
395
396/// Global pipeline configuration settings. This is the publicly
397/// exposed type for users to configure pipelines.
398#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
399#[serde(default)]
400pub struct RuntimeConfig {
401 /// Number of DBSP worker threads.
402 ///
403 /// Each DBSP "foreground" worker thread is paired with a "background"
404 /// thread for LSM merging, making the total number of threads twice the
405 /// specified number.
406 ///
407 /// The typical sweet spot for the number of workers is between 4 and 16.
408 /// Each worker increases overall memory consumption for data structures
409 /// used during a step.
410 pub workers: u16,
411
412 /// Storage configuration.
413 ///
414 /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures. This is useful if the pipeline's state
416 /// will fit in memory and if the pipeline is ephemeral and does not need
417 /// to be recovered after a restart. The pipeline will most likely run
418 /// faster since it does not need to access storage.
419 ///
420 /// - If set, the pipeline's state is kept on storage. This allows the
421 /// pipeline to work with state that will not fit into memory. It also
422 /// allows the state to be checkpointed and recovered across restarts.
423 #[serde(deserialize_with = "deserialize_storage_options")]
424 pub storage: Option<StorageOptions>,
425
426 /// Fault tolerance configuration.
427 #[serde(deserialize_with = "deserialize_fault_tolerance")]
428 pub fault_tolerance: FtConfig,
429
430 /// Enable CPU profiler.
431 ///
432 /// The default value is `true`.
433 pub cpu_profiler: bool,
434
435 /// Enable pipeline tracing.
436 pub tracing: bool,
437
438 /// Jaeger tracing endpoint to send tracing information to.
439 pub tracing_endpoint_jaeger: String,
440
441 /// Minimal input batch size.
442 ///
443 /// The controller delays pushing input records to the circuit until at
444 /// least `min_batch_size_records` records have been received (total
445 /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
447 /// Defaults to 0.
448 pub min_batch_size_records: u64,
449
450 /// Maximal delay in microseconds to wait for `min_batch_size_records` to
451 /// get buffered by the controller, defaults to 0.
452 pub max_buffering_delay_usecs: u64,
453
454 /// Resource reservations and limits. This is enforced
455 /// only in Feldera Cloud.
456 pub resources: ResourceConfig,
457
458 /// Real-time clock resolution in microseconds.
459 ///
460 /// This parameter controls the execution of queries that use the `NOW()` function. The output of such
461 /// queries depends on the real-time clock and can change over time without any external
462 /// inputs. The pipeline will update the clock value and trigger incremental recomputation
    /// at most once every `clock_resolution_usecs` microseconds.
464 ///
465 /// It is set to 1 second (1,000,000 microseconds) by default.
466 ///
467 /// Set to `null` to disable periodic clock updates.
468 pub clock_resolution_usecs: Option<u64>,
469
470 /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
471 /// its worker threads. Specify at least twice as many CPU numbers as
472 /// workers. CPUs are generally numbered starting from 0. The pipeline
473 /// might not be able to honor CPU pinning requests.
474 ///
475 /// CPU pinning can make pipelines run faster and perform more consistently,
476 /// as long as different pipelines running on the same machine are pinned to
477 /// different CPUs.
478 pub pin_cpus: Vec<usize>,
479
480 /// Timeout in seconds for the `Provisioning` phase of the pipeline.
481 /// Setting this value will override the default of the runner.
482 pub provisioning_timeout_secs: Option<u64>,
483
484 /// The maximum number of connectors initialized in parallel during pipeline
485 /// startup.
486 ///
487 /// At startup, the pipeline must initialize all of its input and output connectors.
488 /// Depending on the number and types of connectors, this can take a long time.
489 /// To accelerate the process, multiple connectors are initialized concurrently.
490 /// This option controls the maximum number of connectors that can be initialized
491 /// in parallel.
492 ///
493 /// The default is 10.
494 pub max_parallel_connector_init: Option<u64>,
495
496 /// Specification of additional (sidecar) containers.
497 pub init_containers: Option<serde_yaml::Value>,
498
    /// Deprecated: setting this to `true` or `false` no longer has any effect.
500 pub checkpoint_during_suspend: bool,
501
502 /// Optional settings for tweaking Feldera internals.
503 ///
504 /// The available key-value pairs change from one version of Feldera to
505 /// another, so users should not depend on particular settings being
506 /// available, or on their behavior.
507 pub dev_tweaks: BTreeMap<String, serde_json::Value>,
508}
509
/// Accepts the legacy boolean values `true` and `false` and converts them to
/// the new format.
511fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
512where
513 D: Deserializer<'de>,
514{
515 struct BoolOrStruct;
516
517 impl<'de> Visitor<'de> for BoolOrStruct {
518 type Value = Option<StorageOptions>;
519
520 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
521 formatter.write_str("boolean or StorageOptions")
522 }
523
524 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
525 where
526 E: de::Error,
527 {
528 match v {
529 false => Ok(None),
530 true => Ok(Some(StorageOptions::default())),
531 }
532 }
533
534 fn visit_unit<E>(self) -> Result<Self::Value, E>
535 where
536 E: de::Error,
537 {
538 Ok(None)
539 }
540
541 fn visit_none<E>(self) -> Result<Self::Value, E>
542 where
543 E: de::Error,
544 {
545 Ok(None)
546 }
547
548 fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
549 where
550 M: MapAccess<'de>,
551 {
552 Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
553 }
554 }
555
556 deserializer.deserialize_any(BoolOrStruct)
557}
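
// A sketch of the accepted spellings of the `storage` field: the legacy boolean
// shorthand handled by the deserializer above and the structured object form.
// The 256 MiB cache value is arbitrary.
#[cfg(test)]
#[test]
fn storage_bool_or_struct_sketch() {
    // Legacy boolean forms.
    let rc: RuntimeConfig = serde_json::from_str(r#"{"storage": true}"#).unwrap();
    assert_eq!(rc.storage, Some(StorageOptions::default()));
    let rc: RuntimeConfig = serde_json::from_str(r#"{"storage": false}"#).unwrap();
    assert_eq!(rc.storage, None);
    // Structured form.
    let rc: RuntimeConfig = serde_json::from_str(r#"{"storage": {"cache_mib": 256}}"#).unwrap();
    assert_eq!(rc.storage.and_then(|s| s.cache_mib), Some(256));
}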
558
/// Accepts the legacy values 'initial_state' and 'latest_checkpoint' as
/// enabling fault tolerance.
561///
562/// Accepts `null` as disabling fault tolerance.
563///
564/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
565/// expect.
566fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
567where
568 D: Deserializer<'de>,
569{
570 struct StringOrStruct;
571
572 impl<'de> Visitor<'de> for StringOrStruct {
573 type Value = FtConfig;
574
575 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
576 formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
577 }
578
579 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
580 where
581 E: de::Error,
582 {
583 match v {
584 "initial_state" | "latest_checkpoint" => Ok(FtConfig {
585 model: Some(FtModel::default()),
586 ..FtConfig::default()
587 }),
588 _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
589 }
590 }
591
592 fn visit_unit<E>(self) -> Result<Self::Value, E>
593 where
594 E: de::Error,
595 {
596 Ok(FtConfig::default())
597 }
598
599 fn visit_none<E>(self) -> Result<Self::Value, E>
600 where
601 E: de::Error,
602 {
603 Ok(FtConfig::default())
604 }
605
606 fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
607 where
608 M: MapAccess<'de>,
609 {
610 Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
611 }
612 }
613
614 deserializer.deserialize_any(StringOrStruct)
615}
616
617impl Default for RuntimeConfig {
618 fn default() -> Self {
619 Self {
620 workers: 8,
621 storage: Some(StorageOptions::default()),
622 fault_tolerance: FtConfig::default(),
623 cpu_profiler: true,
624 tracing: {
625 // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
626 // to keep it on by default.
627 false
628 },
629 tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
630 min_batch_size_records: 0,
631 max_buffering_delay_usecs: 0,
632 resources: ResourceConfig::default(),
633 clock_resolution_usecs: { Some(DEFAULT_CLOCK_RESOLUTION_USECS) },
634 pin_cpus: Vec::new(),
635 provisioning_timeout_secs: None,
636 max_parallel_connector_init: None,
637 init_containers: None,
638 checkpoint_during_suspend: true,
639 dev_tweaks: BTreeMap::default(),
640 }
641 }
642}
643
644/// Fault-tolerance configuration.
645///
646/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
647/// which is the configuration that one gets if [RuntimeConfig] omits fault
648/// tolerance configuration.
649///
650/// The default value for [FtConfig::model] enables fault tolerance, as
651/// `Some(FtModel::default())`. This is the configuration that one gets if
652/// [RuntimeConfig] includes a fault tolerance configuration but does not
653/// specify a particular model.
654#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
655#[serde(rename_all = "snake_case")]
656pub struct FtConfig {
657 /// Fault tolerance model to use.
658 #[serde(with = "none_as_string")]
659 #[serde(default = "default_model")]
660 #[schema(
661 schema_with = none_as_string_schema::<FtModel>,
662 )]
663 pub model: Option<FtModel>,
664
665 /// Interval between automatic checkpoints, in seconds.
666 ///
667 /// The default is 60 seconds. Values less than 1 or greater than 3600 will
    /// be clamped to that range.
669 #[serde(default = "default_checkpoint_interval_secs")]
670 pub checkpoint_interval_secs: Option<u64>,
671}
672
673fn default_model() -> Option<FtModel> {
674 Some(FtModel::default())
675}
676
677pub fn default_checkpoint_interval_secs() -> Option<u64> {
678 Some(60)
679}
680
681impl Default for FtConfig {
682 fn default() -> Self {
683 Self {
684 model: None,
685 checkpoint_interval_secs: default_checkpoint_interval_secs(),
686 }
687 }
688}
689
690#[cfg(test)]
691mod test {
692 use super::deserialize_fault_tolerance;
693 use crate::config::{FtConfig, FtModel};
694 use serde::{Deserialize, Serialize};
695
696 #[test]
697 fn ft_config() {
698 #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
699 #[serde(default)]
700 struct Wrapper {
701 #[serde(deserialize_with = "deserialize_fault_tolerance")]
702 config: FtConfig,
703 }
704
705 // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
706 for s in [
707 "{}",
708 r#"{"config": null}"#,
709 r#"{"config": {"model": "none"}}"#,
710 ] {
711 let config: Wrapper = serde_json::from_str(s).unwrap();
712 assert_eq!(
713 config,
714 Wrapper {
715 config: FtConfig {
716 model: None,
717 checkpoint_interval_secs: Some(60)
718 }
719 }
720 );
721 }
722
723 // Serializing disabled FT produces explicit "none" form.
724 let s = serde_json::to_string(&Wrapper {
725 config: FtConfig::default(),
726 })
727 .unwrap();
728 assert!(s.contains("\"none\""));
729
730 // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
731 // tolerance.
732 for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
733 assert_eq!(
734 serde_json::from_str::<FtConfig>(s).unwrap(),
735 FtConfig {
736 model: Some(FtModel::default()),
737 checkpoint_interval_secs: Some(60)
738 }
739 );
740 }
741
742 // `"checkpoint_interval_secs": null` disables periodic checkpointing.
743 assert_eq!(
744 serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
745 FtConfig {
746 model: Some(FtModel::default()),
747 checkpoint_interval_secs: None
748 }
749 );
750 }
751}
752
753impl FtConfig {
754 pub fn is_enabled(&self) -> bool {
755 self.model.is_some()
756 }
757
758 /// Returns the checkpoint interval, if fault tolerance is enabled, and
759 /// otherwise `None`.
760 pub fn checkpoint_interval(&self) -> Option<Duration> {
761 if self.is_enabled() {
762 self.checkpoint_interval_secs
763 .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
764 } else {
765 None
766 }
767 }
768}
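
// A sketch of the clamping described above: an out-of-range interval is pulled
// back into the 1..=3600 second range, and the interval is ignored entirely when
// fault tolerance is disabled. The 7200-second value is an arbitrary example.
#[cfg(test)]
#[test]
fn checkpoint_interval_sketch() {
    let ft = FtConfig {
        model: Some(FtModel::ExactlyOnce),
        checkpoint_interval_secs: Some(7200),
    };
    assert_eq!(ft.checkpoint_interval(), Some(Duration::from_secs(3600)));

    let disabled = FtConfig {
        model: None,
        checkpoint_interval_secs: Some(60),
    };
    assert_eq!(disabled.checkpoint_interval(), None);
}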
769
770/// Serde implementation for de/serializing a string into `Option<T>` where
771/// `"none"` indicates `None` and any other string indicates `Some`.
772///
773/// This could be extended to handle non-strings by adding more forwarding
774/// `visit_*` methods to the Visitor implementation. I don't see a way to write
775/// them automatically.
776mod none_as_string {
777 use std::marker::PhantomData;
778
779 use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
780 use serde::ser::{Serialize, Serializer};
781
782 pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
783 where
784 S: Serializer,
785 T: Serialize,
786 {
787 match value.as_ref() {
788 Some(value) => value.serialize(serializer),
789 None => "none".serialize(serializer),
790 }
791 }
792
793 struct NoneAsString<T>(PhantomData<fn() -> T>);
794
795 impl<'de, T> Visitor<'de> for NoneAsString<T>
796 where
797 T: Deserialize<'de>,
798 {
799 type Value = Option<T>;
800
801 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
802 formatter.write_str("string")
803 }
804
805 fn visit_none<E>(self) -> Result<Self::Value, E>
806 where
807 E: serde::de::Error,
808 {
809 Ok(None)
810 }
811
812 fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
813 where
814 E: serde::de::Error,
815 {
            if value.eq_ignore_ascii_case("none") {
817 Ok(None)
818 } else {
819 Ok(Some(T::deserialize(value.into_deserializer())?))
820 }
821 }
822 }
823
824 pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
825 where
826 D: Deserializer<'de>,
827 T: Deserialize<'de>,
828 {
829 deserializer.deserialize_str(NoneAsString(PhantomData))
830 }
831}
832
833/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
834/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
835fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
836 Schema::OneOf(
837 OneOfBuilder::new()
838 .item(RefOr::Ref(Ref::new(format!(
839 "#/components/schemas/{}",
840 T::schema().0
841 ))))
842 .item(
843 ObjectBuilder::new()
844 .schema_type(SchemaType::String)
845 .enum_values(Some(vec!["none"])),
846 )
847 .default(Some(
848 serde_json::to_value(T::default()).expect("Failed to serialize default value"),
849 ))
850 .build(),
851 )
852}
853
854/// Fault tolerance model.
855///
856/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
857/// level" of fault tolerance than [Self::AtLeastOnce].
858#[derive(
859 Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
860)]
861#[serde(rename_all = "snake_case")]
862pub enum FtModel {
863 /// Each record is output at least once. Crashes may duplicate output, but
864 /// no input or output is dropped.
865 AtLeastOnce,
866
867 /// Each record is output exactly once. Crashes do not drop or duplicate
868 /// input or output.
869 #[default]
870 ExactlyOnce,
871}
872
873impl FtModel {
874 pub fn option_as_str(value: Option<FtModel>) -> &'static str {
875 value.map_or("no", |model| model.as_str())
876 }
877
878 pub fn as_str(&self) -> &'static str {
879 match self {
880 FtModel::AtLeastOnce => "at_least_once",
881 FtModel::ExactlyOnce => "exactly_once",
882 }
883 }
884}
885
886pub struct FtModelUnknown;
887
888impl FromStr for FtModel {
889 type Err = FtModelUnknown;
890
891 fn from_str(s: &str) -> Result<Self, Self::Err> {
892 match s.to_ascii_lowercase().as_str() {
893 "exactly_once" => Ok(Self::ExactlyOnce),
894 "at_least_once" => Ok(Self::AtLeastOnce),
895 _ => Err(FtModelUnknown),
896 }
897 }
898}
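
// A sketch of the string forms accepted and produced for `FtModel`. Parsing is
// ASCII case-insensitive; `option_as_str` spells a disabled model as "no".
#[cfg(test)]
#[test]
fn ft_model_strings_sketch() {
    assert!(matches!(
        FtModel::from_str("exactly_once"),
        Ok(FtModel::ExactlyOnce)
    ));
    assert!(matches!(
        "AT_LEAST_ONCE".parse::<FtModel>(),
        Ok(FtModel::AtLeastOnce)
    ));
    assert_eq!(FtModel::ExactlyOnce.as_str(), "exactly_once");
    assert_eq!(FtModel::option_as_str(None), "no");
}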
899
900/// Describes an input connector configuration
901#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
902pub struct InputEndpointConfig {
903 /// The name of the input stream of the circuit that this endpoint is
904 /// connected to.
905 pub stream: Cow<'static, str>,
906
907 /// Connector configuration.
908 #[serde(flatten)]
909 pub connector_config: ConnectorConfig,
910}
911
912/// Deserialize the `start_after` property of a connector configuration.
913/// It requires a non-standard deserialization because we want to accept
914/// either a string or an array of strings.
915fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
916where
917 D: Deserializer<'de>,
918{
919 let value = Option::<JsonValue>::deserialize(deserializer)?;
920 match value {
921 Some(JsonValue::String(s)) => Ok(Some(vec![s])),
922 Some(JsonValue::Array(arr)) => {
923 let vec = arr
924 .into_iter()
925 .map(|item| {
926 item.as_str()
927 .map(|s| s.to_string())
928 .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
929 })
930 .collect::<Result<Vec<String>, _>>()?;
931 Ok(Some(vec))
932 }
933 Some(JsonValue::Null) | None => Ok(None),
934 _ => Err(serde::de::Error::custom(
935 "invalid 'start_after' property: expected a string, an array of strings, or null",
936 )),
937 }
938}
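
// A sketch of the two accepted spellings of `start_after`. The connector uses
// the unit `http_output` transport purely to keep the example small, and the
// label names are placeholders; only deserialization is exercised here.
#[cfg(test)]
#[test]
fn start_after_sketch() {
    let single: ConnectorConfig = serde_json::from_str(
        r#"{
            "transport": { "name": "http_output" },
            "start_after": "bootstrap"
        }"#,
    )
    .unwrap();
    assert_eq!(single.start_after, Some(vec!["bootstrap".to_string()]));

    let multiple: ConnectorConfig = serde_json::from_str(
        r#"{
            "transport": { "name": "http_output" },
            "start_after": ["dimensions", "lookup-tables"]
        }"#,
    )
    .unwrap();
    assert_eq!(
        multiple.start_after,
        Some(vec!["dimensions".to_string(), "lookup-tables".to_string()])
    );
}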
939
940/// A data connector's configuration
941#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
942pub struct ConnectorConfig {
943 /// Transport endpoint configuration.
944 pub transport: TransportConfig,
945
946 /// Parser configuration.
947 pub format: Option<FormatConfig>,
948
949 /// Name of the index that the connector is attached to.
950 ///
951 /// This property is valid for output connectors only. It is used with data
952 /// transports and formats that expect output updates in the form of key/value
953 /// pairs, where the key typically represents a unique id associated with the
954 /// table or view.
955 ///
956 /// To support such output formats, an output connector can be attached to an
957 /// index created using the SQL CREATE INDEX statement. An index of a table
958 /// or view contains the same updates as the table or view itself, indexed by
959 /// one or more key columns.
960 ///
961 /// See individual connector documentation for details on how they work
962 /// with indexes.
963 pub index: Option<String>,
964
965 /// Output buffer configuration.
966 #[serde(flatten)]
967 pub output_buffer_config: OutputBufferConfig,
968
969 /// Maximum batch size, in records.
970 ///
971 /// This is the maximum number of records to process in one batch through
972 /// the circuit. The time and space cost of processing a batch is
973 /// asymptotically superlinear in the size of the batch, but very small
974 /// batches are less efficient due to constant factors.
975 ///
976 /// This should usually be less than `max_queued_records`, to give the
977 /// connector a round-trip time to restart and refill the buffer while
978 /// batches are being processed.
979 ///
980 /// Some input adapters might not honor this setting.
981 ///
982 /// The default is 10,000.
983 #[serde(default = "default_max_batch_size")]
984 pub max_batch_size: u64,
985
986 /// Backpressure threshold.
987 ///
988 /// Maximal number of records queued by the endpoint before the endpoint
989 /// is paused by the backpressure mechanism.
990 ///
991 /// For input endpoints, this setting bounds the number of records that have
992 /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
995 ///
996 /// For output endpoints, this setting bounds the number of records that have
997 /// been produced by the circuit but not yet sent via the output transport endpoint
998 /// nor stored in the output buffer (see `enable_output_buffer`).
999 ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is actually paused, during which more data may be queued.
1003 ///
1004 /// The default is 1 million.
1005 #[serde(default = "default_max_queued_records")]
1006 pub max_queued_records: u64,
1007
1008 /// Create connector in paused state.
1009 ///
1010 /// The default is `false`.
1011 #[serde(default)]
1012 pub paused: bool,
1013
1014 /// Arbitrary user-defined text labels associated with the connector.
1015 ///
1016 /// These labels can be used in conjunction with the `start_after` property
1017 /// to control the start order of connectors.
1018 #[serde(default)]
1019 pub labels: Vec<String>,
1020
1021 /// Start the connector after all connectors with specified labels.
1022 ///
1023 /// This property is used to control the start order of connectors.
1024 /// The connector will not start until all connectors with the specified
1025 /// labels have finished processing all inputs.
1026 #[serde(deserialize_with = "deserialize_start_after")]
1027 #[serde(default)]
1028 pub start_after: Option<Vec<String>>,
1029}
1030
1031#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1032#[serde(default)]
1033pub struct OutputBufferConfig {
1034 /// Enable output buffering.
1035 ///
1036 /// The output buffering mechanism allows decoupling the rate at which the pipeline
1037 /// pushes changes to the output transport from the rate of input changes.
1038 ///
1039 /// By default, output updates produced by the pipeline are pushed directly to
1040 /// the output transport. Some destinations may prefer to receive updates in fewer
1041 /// bigger batches. For instance, when writing Parquet files, producing
1042 /// one bigger file every few minutes is usually better than creating
1043 /// small files every few milliseconds.
1044 ///
1045 /// To achieve such input/output decoupling, users can enable output buffering by
1046 /// setting the `enable_output_buffer` flag to `true`. When buffering is enabled, output
1047 /// updates produced by the pipeline are consolidated in an internal buffer and are
1048 /// pushed to the output transport when one of several conditions is satisfied:
1049 ///
1050 /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
1051 /// milliseconds.
1052 /// * buffer size exceeds `max_output_buffer_size_records` records.
1053 ///
1054 /// This flag is `false` by default.
1055 // TODO: on-demand output triggered via the API.
1056 pub enable_output_buffer: bool,
1057
1058 /// Maximum time in milliseconds data is kept in the output buffer.
1059 ///
1060 /// By default, data is kept in the buffer indefinitely until one of
1061 /// the other output conditions is satisfied. When this option is
1062 /// set the buffer will be flushed at most every
1063 /// `max_output_buffer_time_millis` milliseconds.
1064 ///
1065 /// NOTE: this configuration option requires the `enable_output_buffer` flag
1066 /// to be set.
1067 pub max_output_buffer_time_millis: usize,
1068
1069 /// Maximum number of updates to be kept in the output buffer.
1070 ///
1071 /// This parameter bounds the maximal size of the buffer.
1072 /// Note that the size of the buffer is not always equal to the
1073 /// total number of updates output by the pipeline. Updates to the
1074 /// same record can overwrite or cancel previous updates.
1075 ///
1076 /// By default, the buffer can grow indefinitely until one of
1077 /// the other output conditions is satisfied.
1078 ///
1079 /// NOTE: this configuration option requires the `enable_output_buffer` flag
1080 /// to be set.
1081 pub max_output_buffer_size_records: usize,
1082}
1083
1084impl Default for OutputBufferConfig {
1085 fn default() -> Self {
1086 Self {
1087 enable_output_buffer: false,
1088 max_output_buffer_size_records: usize::MAX,
1089 max_output_buffer_time_millis: usize::MAX,
1090 }
1091 }
1092}
1093
1094impl OutputBufferConfig {
1095 pub fn validate(&self) -> Result<(), String> {
1096 if self.enable_output_buffer
1097 && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
1098 && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
1099 {
1100 return Err(
1101 "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
1102 .to_string(),
1103 );
1104 }
1105
1106 Ok(())
1107 }
1108}
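
// A sketch of the validation rule above: enabling the buffer without bounding
// either its size or its residence time is rejected. The 10-second flush
// interval is an arbitrary example.
#[cfg(test)]
#[test]
fn output_buffer_validation_sketch() {
    let unbounded = OutputBufferConfig {
        enable_output_buffer: true,
        ..OutputBufferConfig::default()
    };
    assert!(unbounded.validate().is_err());

    let bounded = OutputBufferConfig {
        enable_output_buffer: true,
        max_output_buffer_time_millis: 10_000,
        ..OutputBufferConfig::default()
    };
    assert!(bounded.validate().is_ok());
}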
1109
1110/// Describes an output connector configuration
1111#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1112pub struct OutputEndpointConfig {
1113 /// The name of the output stream of the circuit that this endpoint is
1114 /// connected to.
1115 pub stream: Cow<'static, str>,
1116
1117 /// Connector configuration.
1118 #[serde(flatten)]
1119 pub connector_config: ConnectorConfig,
1120}
1121
1122/// Transport-specific endpoint configuration passed to
1123/// `crate::OutputTransport::new_endpoint`
1124/// and `crate::InputTransport::new_endpoint`.
1125#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
1126#[serde(tag = "name", content = "config", rename_all = "snake_case")]
1127pub enum TransportConfig {
1128 FileInput(FileInputConfig),
1129 FileOutput(FileOutputConfig),
1130 KafkaInput(KafkaInputConfig),
1131 KafkaOutput(KafkaOutputConfig),
1132 PubSubInput(PubSubInputConfig),
1133 UrlInput(UrlInputConfig),
1134 S3Input(S3InputConfig),
1135 DeltaTableInput(DeltaTableReaderConfig),
1136 DeltaTableOutput(DeltaTableWriterConfig),
1137 RedisOutput(RedisOutputConfig),
1138 // Prevent rust from complaining about large size difference between enum variants.
1139 IcebergInput(Box<IcebergReaderConfig>),
1140 PostgresInput(PostgresReaderConfig),
1141 PostgresOutput(PostgresWriterConfig),
1142 Datagen(DatagenInputConfig),
1143 Nexmark(NexmarkInputConfig),
1144 /// Direct HTTP input: cannot be instantiated through API
1145 HttpInput(HttpInputConfig),
1146 /// Direct HTTP output: cannot be instantiated through API
1147 HttpOutput,
1148 /// Ad hoc input: cannot be instantiated through API
1149 AdHocInput(AdHocInputConfig),
1150 ClockInput(ClockConfig),
1151}
1152
1153impl TransportConfig {
1154 pub fn name(&self) -> String {
1155 match self {
1156 TransportConfig::FileInput(_) => "file_input".to_string(),
1157 TransportConfig::FileOutput(_) => "file_output".to_string(),
1158 TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
1159 TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
1160 TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
1161 TransportConfig::UrlInput(_) => "url_input".to_string(),
1162 TransportConfig::S3Input(_) => "s3_input".to_string(),
1163 TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
1164 TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
1165 TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
1166 TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
1167 TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
1168 TransportConfig::Datagen(_) => "datagen".to_string(),
1169 TransportConfig::Nexmark(_) => "nexmark".to_string(),
1170 TransportConfig::HttpInput(_) => "http_input".to_string(),
1171 TransportConfig::HttpOutput => "http_output".to_string(),
1172 TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
1173 TransportConfig::RedisOutput(_) => "redis_output".to_string(),
1174 TransportConfig::ClockInput(_) => "clock".to_string(),
1175 }
1176 }
1177}
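
// A sketch of the `name`/`config` representation used by `TransportConfig`.
// Unit variants such as `http_output` need only the `name` tag; payload-carrying
// variants add a `config` object specific to the transport.
#[cfg(test)]
#[test]
fn transport_config_tag_sketch() {
    let t: TransportConfig = serde_json::from_str(r#"{ "name": "http_output" }"#).unwrap();
    assert_eq!(t, TransportConfig::HttpOutput);
    assert_eq!(t.name(), "http_output");
}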
1178
1179/// Data format specification used to parse raw data received from the
1180/// endpoint or to encode data sent to the endpoint.
1181#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
1182pub struct FormatConfig {
1183 /// Format name, e.g., "csv", "json", "bincode", etc.
1184 pub name: Cow<'static, str>,
1185
1186 /// Format-specific parser or encoder configuration.
1187 #[serde(default)]
1188 #[schema(value_type = Object)]
1189 pub config: YamlValue,
1190}
1191
1192#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, ToSchema)]
1193#[serde(default)]
1194pub struct ResourceConfig {
1195 /// The minimum number of CPU cores to reserve
1196 /// for an instance of this pipeline
1197 pub cpu_cores_min: Option<u64>,
1198
1199 /// The maximum number of CPU cores to reserve
1200 /// for an instance of this pipeline
1201 pub cpu_cores_max: Option<u64>,
1202
1203 /// The minimum memory in Megabytes to reserve
1204 /// for an instance of this pipeline
1205 pub memory_mb_min: Option<u64>,
1206
1207 /// The maximum memory in Megabytes to reserve
1208 /// for an instance of this pipeline
1209 pub memory_mb_max: Option<u64>,
1210
1211 /// The total storage in Megabytes to reserve
1212 /// for an instance of this pipeline
1213 pub storage_mb_max: Option<u64>,
1214
1215 /// Storage class to use for an instance of this pipeline.
1216 /// The class determines storage performance such as IOPS and throughput.
1217 pub storage_class: Option<String>,
1218}