feldera_types/config.rs
1//! Controller configuration.
2//!
3//! This module defines the controller configuration structure. The leaves of
4//! this structure are individual transport-specific and data-format-specific
5//! endpoint configs. We represent these configs as opaque JSON values, so
6//! that the entire configuration tree can be deserialized from a JSON file.
7
8use crate::preprocess::PreprocessorConfig;
9use crate::program_schema::ProgramSchema;
10use crate::secret_resolver::default_secrets_directory;
11use crate::transport::adhoc::AdHocInputConfig;
12use crate::transport::clock::ClockConfig;
13use crate::transport::datagen::DatagenInputConfig;
14use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
15use crate::transport::file::{FileInputConfig, FileOutputConfig};
16use crate::transport::http::HttpInputConfig;
17use crate::transport::iceberg::IcebergReaderConfig;
18use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
19use crate::transport::nats::NatsInputConfig;
20use crate::transport::nexmark::NexmarkInputConfig;
21use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
22use crate::transport::pubsub::PubSubInputConfig;
23use crate::transport::redis::RedisOutputConfig;
24use crate::transport::s3::S3InputConfig;
25use crate::transport::url::UrlInputConfig;
26use core::fmt;
27use feldera_ir::{MirNode, MirNodeId};
28use serde::de::{self, MapAccess, Visitor};
29use serde::{Deserialize, Deserializer, Serialize};
30use serde_json::Value as JsonValue;
31use std::collections::HashMap;
32use std::fmt::Display;
33use std::path::Path;
34use std::str::FromStr;
35use std::time::Duration;
36use std::{borrow::Cow, cmp::max, collections::BTreeMap};
37use utoipa::ToSchema;
38use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
39
/// Default for `RuntimeConfig::max_parallel_connector_init`; applied by
/// `PipelineConfig::max_parallel_connector_init` when the user leaves the
/// option unset.
const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;
41
/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    // One million records may be buffered per connector by default.
    1_000_000
}
46
/// Default maximum worker batch size, in records.
pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;
48
/// Default real-time clock resolution: 1,000,000 microseconds (1 second).
/// See `RuntimeConfig::clock_resolution_usecs`.
pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;
50
/// Program information included in the pipeline configuration.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// The MIR (mid-level intermediate representation) of the program,
    /// keyed by MIR node id.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Schema of the compiled program (tables, views, etc.).
    pub program_schema: ProgramSchema,
}
59
/// Pipeline deployment configuration.
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Configuration for multihost pipelines.
    ///
    /// The presence of this field indicates that the pipeline is running in
    /// multihost mode. In the pod with ordinal 0, this triggers starting the
    /// coordinator process. In all pods, this tells the pipeline process to
    /// await a connection from the coordinator instead of initializing the
    /// pipeline immediately.
    pub multihost: Option<MultihostConfig>,

    /// Unique system-generated name of the pipeline (format: `pipeline-<uuid>`).
    /// It is unique across all tenants and cannot be changed.
    ///
    /// The `<uuid>` is also used in the naming of various resources that back the pipeline,
    /// and as such this name is useful to find/identify corresponding resources.
    pub name: Option<String>,

    /// Name given by the tenant to the pipeline. It is only unique within the same tenant, and can
    /// be changed by the tenant when the pipeline is stopped.
    ///
    /// Given a specific tenant, it can be used to find/identify a specific pipeline of theirs.
    pub given_name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`]. If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration, keyed by endpoint name.
    #[serde(default)]
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration, keyed by endpoint name.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}
119
120impl PipelineConfig {
121 pub fn max_parallel_connector_init(&self) -> u64 {
122 max(
123 self.global
124 .max_parallel_connector_init
125 .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
126 1,
127 )
128 }
129
130 pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
131 let (storage_config, storage_options) = storage.unzip();
132 Self {
133 global: RuntimeConfig {
134 storage: storage_options,
135 ..self.global
136 },
137 storage_config,
138 ..self
139 }
140 }
141
142 pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
143 let storage_options = self.global.storage.as_ref();
144 let storage_config = self.storage_config.as_ref();
145 storage_config.zip(storage_options)
146 }
147
148 /// Returns `self.secrets_dir`, or the default secrets directory if it isn't
149 /// set.
150 pub fn secrets_dir(&self) -> &Path {
151 match &self.secrets_dir {
152 Some(dir) => Path::new(dir.as_str()),
153 None => default_secrets_directory(),
154 }
155 }
156
157 /// Abbreviated config that can be printed in the log on pipeline startup.
158 pub fn display_summary(&self) -> String {
159 // TODO: we may want to further abbreviate connector config.
160 let summary = serde_json::json!({
161 "name": self.name,
162 "given_name": self.given_name,
163 "global": self.global,
164 "storage_config": self.storage_config,
165 "secrets_dir": self.secrets_dir,
166 "inputs": self.inputs,
167 "outputs": self.outputs
168 });
169
170 serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
171 }
172}
173
/// A subset of fields in `PipelineConfig` that are generated by the compiler.
/// These fields are shipped to the pipeline by the compilation server along with
/// the program binary.
// Note: An alternative would be to embed these fields in the program binary itself
// as static strings. This would work well for program IR, but it would require recompiling
// the program anytime a connector config changes, whereas today connector changes
// do not require recompilation.
#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone)]
pub struct PipelineConfigProgramInfo {
    /// Input endpoint configuration.
    //
    // NOTE(review): unlike `outputs` and `program_ir`, this field has no
    // `#[serde(default)]`, so it is required when deserializing — confirm
    // this asymmetry is intentional.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,

    /// Program information.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}
194
/// Configuration for a multihost Feldera pipeline.
///
/// This configuration is primarily for the coordinator.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct MultihostConfig {
    /// Number of hosts to launch.
    ///
    /// For the configuration to be truly multihost, this should be at least 2.
    /// A value of 1 still runs the multihost coordinator but it only
    /// coordinates a single host.
    pub hosts: usize,
}
207
208impl Default for MultihostConfig {
209 fn default() -> Self {
210 Self { hosts: 1 }
211 }
212}
213
/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    /// Defaults to [`StorageCacheConfig::PageCache`].
    #[serde(default)]
    pub cache: StorageCacheConfig,
}
231
232impl StorageConfig {
233 pub fn path(&self) -> &Path {
234 Path::new(&self.path)
235 }
236}
237
/// How to cache access to storage within a Feldera pipeline.
///
/// Serialized in `snake_case` (`page_cache` / `feldera_cache`).
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}
255
impl StorageCacheConfig {
    /// Extra `open(2)` flags to use for this cache mode.
    ///
    /// Returns `O_DIRECT` for `FelderaCache` on Linux (bypassing the OS page
    /// cache), and 0 (no extra flags) otherwise: for `PageCache`, and for
    /// `FelderaCache` on non-Linux Unix targets where `O_DIRECT` is not
    /// available.
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                // On non-Linux targets this arm is empty and control falls
                // through to the `0` below.
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}
269
/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold. `usize::MAX` would effectively disable
    /// storage for such batches. The default is 10,485,760 (10 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset. A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold. `usize::MAX`, the default,
    /// effectively disables storage for such batches. If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}
314
/// Backend storage configuration.
///
/// Serialized in externally-adjacent form: `name` carries the variant tag
/// (in `snake_case`) and `config` carries the variant's payload, e.g.
/// `{"name": "file", "config": {...}}`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}
333
334impl Display for StorageBackendConfig {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 match self {
337 StorageBackendConfig::Default => write!(f, "default"),
338 StorageBackendConfig::File(_) => write!(f, "file"),
339 StorageBackendConfig::Object(_) => write!(f, "object"),
340 }
341 }
342}
343
/// Storage compression algorithm.
///
/// Serialized in `snake_case` (`default` / `none` / `snappy`).
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}
361
/// Selects which checkpoint to fetch from the object store on startup.
///
/// Serialized as a string: either the literal `"latest"` or a checkpoint UUID
/// (see the hand-written `Serialize`/`Deserialize` impls in this module).
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    /// Fetch the most recent available checkpoint.
    Latest,
    /// Fetch the checkpoint with this specific UUID.
    Uuid(uuid::Uuid),
}
367
impl ToSchema<'_> for StartFromCheckpoint {
    // Hand-written OpenAPI schema mirroring the custom string serialization:
    // the value is either the string enum "latest" or a UUID-formatted string.
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    // Alternative 1: the literal string "latest".
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    // Alternative 2: a UUID string.
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}
397
impl<'de> Deserialize<'de> for StartFromCheckpoint {
    /// Deserializes from a string: `"latest"` maps to [`StartFromCheckpoint::Latest`];
    /// any other string must parse as a UUID, otherwise deserialization fails.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // Visitor accepting either the literal "latest" or a UUID string.
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    // Not "latest": the value must be a valid UUID.
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}
429
430impl Serialize for StartFromCheckpoint {
431 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
432 where
433 S: serde::Serializer,
434 {
435 match self {
436 StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
437 StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
438 }
439 }
440}
441
/// Configuration for synchronizing checkpoints with an S3-compatible object
/// store (implemented via `rclone`).
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for Minio or the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider)
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If fetching fails and `fail_if_no_checkpoint` is `true`, the pipeline
    /// will fail to initialize.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip post copy check of checksums, and only check the file sizes.
    /// This can significantly improve the throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will auto activate; no newer checkpoints will be
    /// fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If checkpoint UUID, pipeline fetches this checkpoint once and waits
    ///   in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between each attempt to fetch the latest
    /// checkpoint from object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// The interval (in seconds) between each push of checkpoints to object store.
    ///
    /// Default: disabled (no periodic push).
    #[serde(default)]
    pub push_interval: Option<u64>,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3 specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,
}
585
/// Default of `SyncConfig::pull_interval`: 10 seconds.
fn default_pull_interval() -> u64 {
    10
}
589
/// Default of `SyncConfig::retention_min_count`: keep at least 10 checkpoints.
fn default_retention_min_count() -> u32 {
    10
}
593
/// Default of `SyncConfig::retention_min_age`: 30 days.
fn default_retention_min_age() -> u32 {
    30
}
597
598impl SyncConfig {
599 pub fn validate(&self) -> Result<(), String> {
600 if self.standby && self.start_from_checkpoint.is_none() {
601 return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
602Standby mode requires `start_from_checkpoint` to be set.
603Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
604 }
605
606 Ok(())
607 }
608}
609
/// Configuration for supplying a custom pipeline StatefulSet template via a Kubernetes ConfigMap.
///
/// Operators can provide a custom StatefulSet YAML that the Kubernetes runner will use when
/// creating pipeline StatefulSets for a pipeline. The custom template must be stored as the
/// value of a key in a ConfigMap in the same namespace as the pipeline; set `name` to the
/// ConfigMap name and `key` to the entry that contains the template.
///
/// Recommendations and requirements:
/// - **Start from the default template and modify it as needed.** The default template is present
///   in the ConfigMap named `<release-name>-pipeline-template`, with key `pipelineTemplate`, in the
///   release namespace and should be used as a reference.
/// - The template must contain a valid Kubernetes `StatefulSet` manifest in YAML form. The
///   runner substitutes variables in the template before parsing; therefore the final YAML
///   must be syntactically valid.
/// - The runner performs simple string substitution for the following placeholders. Please ensure
///   these placeholders are placed at the appropriate locations for their semantics:
///   - `{id}`: pipeline Kubernetes name (used for object names and labels)
///   - `{namespace}`: Kubernetes namespace where the pipeline runs
///   - `{pipeline_executor_image}`: container image used to run the pipeline executor
///   - `{binary_ref}`: program binary reference passed as an argument
///   - `{program_info_ref}`: program info reference passed as an argument
///   - `{pipeline_storage_path}`: mount path for persistent pipeline storage
///   - `{storage_class_name}`: storage class name to use for PVCs (if applicable)
///   - `{deployment_id}`: UUID identifying the deployment instance
///   - `{deployment_initial}`: initial desired runtime status (e.g., `provisioning`)
///   - `{bootstrap_policy}`: bootstrap policy value when applicable
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct PipelineTemplateConfig {
    /// Name of the ConfigMap containing the pipeline template.
    pub name: String,
    /// Key in the ConfigMap containing the pipeline template.
    ///
    /// If not set, defaults to `pipelineTemplate`.
    #[schema(default = default_pipeline_template_key)]
    #[serde(default = "default_pipeline_template_key")]
    pub key: String,
}
647
/// Default of `PipelineTemplateConfig::key`.
fn default_pipeline_template_key() -> String {
    String::from("pipelineTemplate")
}
651
/// Configuration for object storage (S3, Google Cloud Storage, Azure Blob
/// Storage) used as a pipeline storage backend.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}
719
/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices while testing. Do not use
    /// this in production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to object store.
    /// See [`SyncConfig`].
    pub sync: Option<SyncConfig>,
}
740
741/// Global pipeline configuration settings. This is the publicly
742/// exposed type for users to configure pipelines.
743#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
744#[serde(default)]
745pub struct RuntimeConfig {
746 /// Number of DBSP worker threads.
747 ///
748 /// Each DBSP "foreground" worker thread is paired with a "background"
749 /// thread for LSM merging, making the total number of threads twice the
750 /// specified number.
751 ///
752 /// The typical sweet spot for the number of workers is between 4 and 16.
753 /// Each worker increases overall memory consumption for data structures
754 /// used during a step.
755 pub workers: u16,
756
757 /// Number of DBSP hosts.
758 ///
759 /// The worker threads are evenly divided among the hosts. For single-host
760 /// deployments, this should be 1 (the default).
761 ///
762 /// Multihost pipelines are an enterprise-only preview feature.
763 pub hosts: usize,
764
765 /// Storage configuration.
766 ///
767 /// - If this is `None`, the default, the pipeline's state is kept in
768 /// in-memory data-structures. This is useful if the pipeline's state
769 /// will fit in memory and if the pipeline is ephemeral and does not need
770 /// to be recovered after a restart. The pipeline will most likely run
771 /// faster since it does not need to access storage.
772 ///
773 /// - If set, the pipeline's state is kept on storage. This allows the
774 /// pipeline to work with state that will not fit into memory. It also
775 /// allows the state to be checkpointed and recovered across restarts.
776 #[serde(deserialize_with = "deserialize_storage_options")]
777 pub storage: Option<StorageOptions>,
778
779 /// Fault tolerance configuration.
780 #[serde(deserialize_with = "deserialize_fault_tolerance")]
781 pub fault_tolerance: FtConfig,
782
783 /// Enable CPU profiler.
784 ///
785 /// The default value is `true`.
786 pub cpu_profiler: bool,
787
788 /// Enable pipeline tracing.
789 pub tracing: bool,
790
791 /// Jaeger tracing endpoint to send tracing information to.
792 pub tracing_endpoint_jaeger: String,
793
794 /// Minimal input batch size.
795 ///
796 /// The controller delays pushing input records to the circuit until at
797 /// least `min_batch_size_records` records have been received (total
798 /// across all endpoints) or `max_buffering_delay_usecs` microseconds
799 /// have passed since at least one input records has been buffered.
800 /// Defaults to 0.
801 pub min_batch_size_records: u64,
802
803 /// Maximal delay in microseconds to wait for `min_batch_size_records` to
804 /// get buffered by the controller, defaults to 0.
805 pub max_buffering_delay_usecs: u64,
806
807 /// Resource reservations and limits. This is enforced
808 /// only in Feldera Cloud.
809 pub resources: ResourceConfig,
810
811 /// Real-time clock resolution in microseconds.
812 ///
813 /// This parameter controls the execution of queries that use the `NOW()` function. The output of such
814 /// queries depends on the real-time clock and can change over time without any external
815 /// inputs. If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
816 /// recomputation at most each `clock_resolution_usecs` microseconds. If the query does not use
817 /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
818 ///
819 /// It is set to 1 second (1,000,000 microseconds) by default.
820 pub clock_resolution_usecs: Option<u64>,
821
822 /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
823 /// its worker threads. Specify at least twice as many CPU numbers as
824 /// workers. CPUs are generally numbered starting from 0. The pipeline
825 /// might not be able to honor CPU pinning requests.
826 ///
827 /// CPU pinning can make pipelines run faster and perform more consistently,
828 /// as long as different pipelines running on the same machine are pinned to
829 /// different CPUs.
830 pub pin_cpus: Vec<usize>,
831
832 /// Timeout in seconds for the `Provisioning` phase of the pipeline.
833 /// Setting this value will override the default of the runner.
834 pub provisioning_timeout_secs: Option<u64>,
835
836 /// The maximum number of connectors initialized in parallel during pipeline
837 /// startup.
838 ///
839 /// At startup, the pipeline must initialize all of its input and output connectors.
840 /// Depending on the number and types of connectors, this can take a long time.
841 /// To accelerate the process, multiple connectors are initialized concurrently.
842 /// This option controls the maximum number of connectors that can be initialized
843 /// in parallel.
844 ///
845 /// The default is 10.
846 pub max_parallel_connector_init: Option<u64>,
847
848 /// Specification of additional (sidecar) containers.
849 pub init_containers: Option<serde_json::Value>,
850
851 /// Deprecated: setting this true or false does not have an effect anymore.
852 pub checkpoint_during_suspend: bool,
853
854 /// Sets the number of available runtime threads for the http server.
855 ///
856 /// In most cases, this does not need to be set explicitly and
857 /// the default is sufficient. Can be increased in case the
858 /// pipeline HTTP API operations are a bottleneck.
859 ///
860 /// If not specified, the default is set to `workers`.
861 pub http_workers: Option<u64>,
862
863 /// Sets the number of available runtime threads for async IO tasks.
864 ///
865 /// This affects some networking and file I/O operations
866 /// especially adapters and ad-hoc queries.
867 ///
868 /// In most cases, this does not need to be set explicitly and
869 /// the default is sufficient. Can be increased in case
870 /// ingress, egress or ad-hoc queries are a bottleneck.
871 ///
872 /// If not specified, the default is set to `workers`.
873 pub io_workers: Option<u64>,
874
875 /// Environment variables for the pipeline process.
876 ///
877 /// These are key-value pairs injected into the pipeline process environment.
878 /// Some variable names are reserved by the platform and cannot be overridden
879 /// (for example `RUST_LOG`, and variables in the `FELDERA_`,
880 /// `KUBERNETES_`, and `TOKIO_` namespaces).
881 #[serde(default)]
882 pub env: BTreeMap<String, String>,
883
884 /// Optional settings for tweaking Feldera internals.
885 ///
886 /// The available key-value pairs change from one version of Feldera to
887 /// another, so users should not depend on particular settings being
888 /// available, or on their behavior.
889 pub dev_tweaks: BTreeMap<String, serde_json::Value>,
890
891 /// Log filtering directives.
892 ///
893 /// If set to a valid [tracing-subscriber] filter, this controls the log
894 /// messages emitted by the pipeline process. Otherwise, or if the filter
895 /// has invalid syntax, messages at "info" severity and higher are written
896 /// to the log and all others are discarded.
897 ///
898 /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
899 pub logging: Option<String>,
900
901 /// ConfigMap containing a custom pipeline template (Enterprise only).
902 ///
903 /// This feature is only available in Feldera Enterprise. If set, the Kubernetes runner
904 /// will read the template from the specified ConfigMap and use it instead of the default
905 /// StatefulSet template for the configured pipeline.
906 ///
907 /// check [`PipelineTemplateConfig`] documentation for details.
908 pub pipeline_template_configmap: Option<PipelineTemplateConfig>,
909}
910
911/// Accepts "true" and "false" and converts them to the new format.
912fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
913where
914 D: Deserializer<'de>,
915{
916 struct BoolOrStruct;
917
918 impl<'de> Visitor<'de> for BoolOrStruct {
919 type Value = Option<StorageOptions>;
920
921 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
922 formatter.write_str("boolean or StorageOptions")
923 }
924
925 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
926 where
927 E: de::Error,
928 {
929 match v {
930 false => Ok(None),
931 true => Ok(Some(StorageOptions::default())),
932 }
933 }
934
935 fn visit_unit<E>(self) -> Result<Self::Value, E>
936 where
937 E: de::Error,
938 {
939 Ok(None)
940 }
941
942 fn visit_none<E>(self) -> Result<Self::Value, E>
943 where
944 E: de::Error,
945 {
946 Ok(None)
947 }
948
949 fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
950 where
951 M: MapAccess<'de>,
952 {
953 Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
954 }
955 }
956
957 deserializer.deserialize_any(BoolOrStruct)
958}
959
/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
/// tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    // Visitor that dispatches on the concrete input type: string (legacy
    // forms), unit/none (disabled), or map (regular FtConfig).
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        // Legacy string forms: both enable fault tolerance with the default
        // model; any other string is rejected.
        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        // `null` disables fault tolerance (FtConfig::default() has model: None).
        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        // A missing value also disables fault tolerance.
        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        // The regular form: a map deserialized as `FtConfig` (note that
        // FtConfig's own serde defaults apply here, so an empty map *enables*
        // fault tolerance via `default_model`).
        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}
1017
1018impl Default for RuntimeConfig {
1019 fn default() -> Self {
1020 Self {
1021 workers: 8,
1022 hosts: 1,
1023 storage: Some(StorageOptions::default()),
1024 fault_tolerance: FtConfig::default(),
1025 cpu_profiler: true,
1026 tracing: {
1027 // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
1028 // to keep it on by default.
1029 false
1030 },
1031 tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
1032 min_batch_size_records: 0,
1033 max_buffering_delay_usecs: 0,
1034 resources: ResourceConfig::default(),
1035 clock_resolution_usecs: { Some(DEFAULT_CLOCK_RESOLUTION_USECS) },
1036 pin_cpus: Vec::new(),
1037 provisioning_timeout_secs: None,
1038 max_parallel_connector_init: None,
1039 init_containers: None,
1040 checkpoint_during_suspend: true,
1041 io_workers: None,
1042 http_workers: None,
1043 env: BTreeMap::default(),
1044 dev_tweaks: BTreeMap::default(),
1045 logging: None,
1046 pipeline_template_configmap: None,
1047 }
1048 }
1049}
1050
/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`. This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    ///
    /// Serialized via `none_as_string`: `None` round-trips as the literal
    /// string `"none"` rather than as JSON `null`.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds. Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}
1079
1080fn default_model() -> Option<FtModel> {
1081 Some(FtModel::default())
1082}
1083
/// Serde/`Default` value for [FtConfig::checkpoint_interval_secs]: one minute.
pub fn default_checkpoint_interval_secs() -> Option<u64> {
    const DEFAULT_SECS: u64 = 60;
    Some(DEFAULT_SECS)
}
1087
1088impl Default for FtConfig {
1089 fn default() -> Self {
1090 Self {
1091 model: None,
1092 checkpoint_interval_secs: default_checkpoint_interval_secs(),
1093 }
1094 }
1095}
1096
#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    /// Exercises the legacy-tolerant deserialization of [FtConfig].
    #[test]
    fn ft_config() {
        // Mirrors how `RuntimeConfig` embeds `FtConfig` with the custom
        // deserializer attribute.
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}
1159
1160impl FtConfig {
1161 pub fn is_enabled(&self) -> bool {
1162 self.model.is_some()
1163 }
1164
1165 /// Returns the checkpoint interval, if fault tolerance is enabled, and
1166 /// otherwise `None`.
1167 pub fn checkpoint_interval(&self) -> Option<Duration> {
1168 if self.is_enabled() {
1169 self.checkpoint_interval_secs
1170 .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
1171 } else {
1172 None
1173 }
1174 }
1175}
1176
1177/// Serde implementation for de/serializing a string into `Option<T>` where
1178/// `"none"` indicates `None` and any other string indicates `Some`.
1179///
1180/// This could be extended to handle non-strings by adding more forwarding
1181/// `visit_*` methods to the Visitor implementation. I don't see a way to write
1182/// them automatically.
1183mod none_as_string {
1184 use std::marker::PhantomData;
1185
1186 use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
1187 use serde::ser::{Serialize, Serializer};
1188
1189 pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
1190 where
1191 S: Serializer,
1192 T: Serialize,
1193 {
1194 match value.as_ref() {
1195 Some(value) => value.serialize(serializer),
1196 None => "none".serialize(serializer),
1197 }
1198 }
1199
1200 struct NoneAsString<T>(PhantomData<fn() -> T>);
1201
1202 impl<'de, T> Visitor<'de> for NoneAsString<T>
1203 where
1204 T: Deserialize<'de>,
1205 {
1206 type Value = Option<T>;
1207
1208 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1209 formatter.write_str("string")
1210 }
1211
1212 fn visit_none<E>(self) -> Result<Self::Value, E>
1213 where
1214 E: serde::de::Error,
1215 {
1216 Ok(None)
1217 }
1218
1219 fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
1220 where
1221 E: serde::de::Error,
1222 {
1223 if &value.to_ascii_lowercase() == "none" {
1224 Ok(None)
1225 } else {
1226 Ok(Some(T::deserialize(value.into_deserializer())?))
1227 }
1228 }
1229 }
1230
1231 pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
1232 where
1233 D: Deserializer<'de>,
1234 T: Deserialize<'de>,
1235 {
1236 deserializer.deserialize_str(NoneAsString(PhantomData))
1237 }
1238}
1239
/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            // Alternative 1: `T`'s own schema, referenced by its registered
            // component name (the first element of `T::schema()`).
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            // Alternative 2: the literal string "none", representing `None`.
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            // Advertise `T::default()` (serialized to JSON) as the default.
            // Panics only if `T`'s Serialize impl fails, which indicates a bug.
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}
1260
/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
// NOTE: the derived `PartialOrd`/`Ord` depend on the variant declaration
// order below (`AtLeastOnce < ExactlyOnce`); do not reorder the variants.
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once. Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once. Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}
1279
1280impl FtModel {
1281 pub fn option_as_str(value: Option<FtModel>) -> &'static str {
1282 value.map_or("no", |model| model.as_str())
1283 }
1284
1285 pub fn as_str(&self) -> &'static str {
1286 match self {
1287 FtModel::AtLeastOnce => "at_least_once",
1288 FtModel::ExactlyOnce => "exactly_once",
1289 }
1290 }
1291}
1292
/// Error returned by [FtModel]'s `FromStr` impl for an unrecognized model name.
pub struct FtModelUnknown;
1294
1295impl FromStr for FtModel {
1296 type Err = FtModelUnknown;
1297
1298 fn from_str(s: &str) -> Result<Self, Self::Err> {
1299 match s.to_ascii_lowercase().as_str() {
1300 "exactly_once" => Ok(Self::ExactlyOnce),
1301 "at_least_once" => Ok(Self::AtLeastOnce),
1302 _ => Err(FtModelUnknown),
1303 }
1304 }
1305}
1306
/// Describes an input connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    ///
    /// Flattened during (de)serialization: the connector's fields appear
    /// directly at the top level of the endpoint configuration object.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
1318
1319/// Deserialize the `start_after` property of a connector configuration.
1320/// It requires a non-standard deserialization because we want to accept
1321/// either a string or an array of strings.
1322fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
1323where
1324 D: Deserializer<'de>,
1325{
1326 let value = Option::<JsonValue>::deserialize(deserializer)?;
1327 match value {
1328 Some(JsonValue::String(s)) => Ok(Some(vec![s])),
1329 Some(JsonValue::Array(arr)) => {
1330 let vec = arr
1331 .into_iter()
1332 .map(|item| {
1333 item.as_str()
1334 .map(|s| s.to_string())
1335 .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
1336 })
1337 .collect::<Result<Vec<String>, _>>()?;
1338 Ok(Some(vec))
1339 }
1340 Some(JsonValue::Null) | None => Ok(None),
1341 _ => Err(serde::de::Error::custom(
1342 "invalid 'start_after' property: expected a string, an array of strings, or null",
1343 )),
1344 }
1345}
1346
/// A data connector's configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Optional chain of preprocessors applied to the data.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub preprocessor: Option<Vec<PreprocessorConfig>>,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only. It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement. An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum number of records from this connector to process in a single batch.
    ///
    /// When set, this caps how many records are taken from the connector’s input
    /// buffer and pushed through the circuit at once.
    ///
    /// This is typically configured lower than `max_queued_records` to allow the
    /// connector time to restart and refill its buffer while a batch is being
    /// processed.
    ///
    /// Not all input adapters honor this limit.
    ///
    /// If this is not set, the batch size is derived from `max_worker_batch_size`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_batch_size: Option<u64>,

    /// Maximum number of records processed per batch, per worker thread.
    ///
    /// When `max_batch_size` is not set, this setting is used to cap
    /// the number of records that can be taken from the connector’s input
    /// buffer and pushed through the circuit at once. The effective batch size is computed as:
    /// `max_worker_batch_size × workers`.
    ///
    /// This provides an alternative to `max_batch_size` that automatically adjusts batch
    /// size as the number of worker threads changes to maintain constant amount of
    /// work per worker per batch.
    ///
    /// Defaults to 10,000 records per worker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_worker_batch_size: Option<u64>,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    ///
    /// Accepts either a single string or an array of strings.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}
1453
1454impl ConnectorConfig {
1455 /// Compare two configs modulo the `paused` field.
1456 ///
1457 /// Used to compare checkpointed and current connector configs.
1458 pub fn equal_modulo_paused(&self, other: &Self) -> bool {
1459 let mut a = self.clone();
1460 let mut b = other.clone();
1461 a.paused = false;
1462 b.paused = false;
1463 a == b
1464 }
1465}
1466
/// Output buffering configuration shared by all output connectors.
///
/// All fields are optional in the serialized form (`#[serde(default)]`);
/// missing fields take the values from [OutputBufferConfig::default].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`. When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied. When this option is
    /// set the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}
1519
1520impl Default for OutputBufferConfig {
1521 fn default() -> Self {
1522 Self {
1523 enable_output_buffer: false,
1524 max_output_buffer_size_records: usize::MAX,
1525 max_output_buffer_time_millis: usize::MAX,
1526 }
1527 }
1528}
1529
1530impl OutputBufferConfig {
1531 pub fn validate(&self) -> Result<(), String> {
1532 if self.enable_output_buffer
1533 && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
1534 && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
1535 {
1536 return Err(
1537 "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
1538 .to_string(),
1539 );
1540 }
1541
1542 Ok(())
1543 }
1544}
1545
/// Describes an output connector configuration
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    ///
    /// Flattened during (de)serialization: the connector's fields appear
    /// directly at the top level of the endpoint configuration object.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
1557
/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
///
/// Serialized in adjacently-tagged form, i.e., as an object
/// `{"name": ..., "config": ...}` with variant names in snake_case.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    NatsInput(NatsInputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Prevent rust from complaining about large size difference between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through API
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through API
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through API
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}
1589
1590impl TransportConfig {
1591 pub fn name(&self) -> String {
1592 match self {
1593 TransportConfig::FileInput(_) => "file_input".to_string(),
1594 TransportConfig::FileOutput(_) => "file_output".to_string(),
1595 TransportConfig::NatsInput(_) => "nats_input".to_string(),
1596 TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
1597 TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
1598 TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
1599 TransportConfig::UrlInput(_) => "url_input".to_string(),
1600 TransportConfig::S3Input(_) => "s3_input".to_string(),
1601 TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
1602 TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
1603 TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
1604 TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
1605 TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
1606 TransportConfig::Datagen(_) => "datagen".to_string(),
1607 TransportConfig::Nexmark(_) => "nexmark".to_string(),
1608 TransportConfig::HttpInput(_) => "http_input".to_string(),
1609 TransportConfig::HttpOutput => "http_output".to_string(),
1610 TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
1611 TransportConfig::RedisOutput(_) => "redis_output".to_string(),
1612 TransportConfig::ClockInput(_) => "clock".to_string(),
1613 }
1614 }
1615
1616 /// Returns true if the connector is transient, i.e., is created and destroyed
1617 /// at runtime on demand, rather than being configured as part of the pipeline.
1618 pub fn is_transient(&self) -> bool {
1619 matches!(
1620 self,
1621 TransportConfig::AdHocInput(_)
1622 | TransportConfig::HttpInput(_)
1623 | TransportConfig::HttpOutput
1624 | TransportConfig::ClockInput(_)
1625 )
1626 }
1627}
1628
/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    ///
    /// Kept as an opaque JSON value; defaults to JSON `null` when omitted.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}
1641
/// Resource reservations and limits for a pipeline instance.
///
/// All fields are optional in the serialized form (`#[serde(default)]`).
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline
    // NOTE(review): deserialized via `serde_via_value` — presumably to accept
    // lenient input encodings; confirm against that module.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in Megabytes to reserve
    /// for an instance of this pipeline
    pub memory_mb_max: Option<u64>,

    /// The total storage in Megabytes to reserve
    /// for an instance of this pipeline
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,

    /// Kubernetes service account name to use for an instance of this pipeline.
    /// The account determines permissions and access controls.
    pub service_account_name: Option<String>,

    /// Kubernetes namespace to use for an instance of this pipeline.
    /// The namespace determines the scope of names for resources created
    /// for the pipeline.
    /// If not set, the pipeline will be deployed in the same namespace
    /// as the control-plane.
    pub namespace: Option<String>,
}