feldera_types/config.rs
//! Controller configuration.
//!
//! This module defines the controller configuration structure. The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs. We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::secret_resolver::default_secrets_directory;
use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use serde_yaml::Value as YamlValue;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
use utoipa::ToSchema;

const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this then update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;

/// Pipeline deployment configuration.
///
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is `Some(_)`, this field must be set to some
    /// [`StorageConfig`]. If `global.storage` is `None`, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Directory containing values of secrets.
    ///
    /// If this is not set, a default directory is used.
    pub secrets_dir: Option<String>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}

impl PipelineConfig {
    pub fn max_parallel_connector_init(&self) -> u64 {
        max(
            self.global
                .max_parallel_connector_init
                .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
            1,
        )
    }

    pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
        let (storage_config, storage_options) = storage.unzip();
        Self {
            global: RuntimeConfig {
                storage: storage_options,
                ..self.global
            },
            storage_config,
            ..self
        }
    }

    pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
        let storage_options = self.global.storage.as_ref();
        let storage_config = self.storage_config.as_ref();
        storage_config.zip(storage_options)
    }

    /// Returns `self.secrets_dir`, or the default secrets directory if it
    /// isn't set.
    pub fn secrets_dir(&self) -> &Path {
        match &self.secrets_dir {
            Some(dir) => Path::new(dir.as_str()),
            None => default_secrets_directory(),
        }
    }
}

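// A minimal usage sketch for `PipelineConfig` (assumes, like the `test`
// module below, that `serde_json` is available in tests): only `inputs` is
// required in the JSON form, and storage is attached and queried through
// `with_storage`/`storage`.
#[cfg(test)]
mod pipeline_config_test {
    use super::{PipelineConfig, StorageConfig, StorageOptions};

    #[test]
    fn with_storage_round_trip() {
        let config: PipelineConfig = serde_json::from_str(r#"{"inputs": {}}"#).unwrap();
        // `storage()` requires both the options and the runner-provided
        // `storage_config`, so it is `None` here.
        assert!(config.storage().is_none());
        let config = config.with_storage(Some((
            StorageConfig {
                path: "/tmp/pipeline".to_string(),
                cache: Default::default(),
            },
            StorageOptions::default(),
        )));
        assert!(config.storage().is_some());
    }
}
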
/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// For a batch of data maintained as part of a persistent index during a
    /// pipeline run, the minimum estimated number of bytes to write it to
    /// storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold. `usize::MAX` would effectively disable
    /// storage for such batches. The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// For a batch of data passed through the pipeline during a single step,
    /// the minimum estimated number of bytes to write it to storage.
    ///
    /// This is provided for debugging and fine-tuning and should ordinarily be
    /// left unset. A value of 0 will write even empty batches to storage, and
    /// nonzero values provide a threshold. `usize::MAX`, the default,
    /// effectively disables storage for such batches. If it is set to another
    /// value, it should ordinarily be greater than or equal to
    /// `min_storage_bytes`.
    pub min_step_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Use the local file system.
    ///
    /// This uses ordinary system file operations.
    File(Box<FileBackendConfig>),

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::File(_) => write!(f, "file"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    Latest,
    Uuid(uuid::Uuid),
}

impl ToSchema<'_> for StartFromCheckpoint {
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}

impl<'de> Deserialize<'de> for StartFromCheckpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct StartFromCheckpointVisitor;

        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }

            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }

        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}

impl Serialize for StartFromCheckpoint {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            StartFromCheckpoint::Latest => serializer.serialize_str("latest"),
            StartFromCheckpoint::Uuid(uuid) => serializer.serialize_str(&uuid.to_string()),
        }
    }
}

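// A small round-trip sketch for the custom serde implementations above:
// "latest" maps to `Latest` and a UUID string maps to `Uuid`.
#[cfg(test)]
mod start_from_checkpoint_test {
    use super::StartFromCheckpoint;

    #[test]
    fn round_trip() {
        assert_eq!(
            serde_json::from_str::<StartFromCheckpoint>(r#""latest""#).unwrap(),
            StartFromCheckpoint::Latest
        );
        let uuid = uuid::Uuid::nil();
        let json = serde_json::to_string(&StartFromCheckpoint::Uuid(uuid)).unwrap();
        assert_eq!(
            serde_json::from_str::<StartFromCheckpoint>(&json).unwrap(),
            StartFromCheckpoint::Uuid(uuid)
        );
    }
}
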
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// The endpoint URL for the storage service.
    ///
    /// This is typically required for custom or local S3-compatible storage providers like MinIO.
    /// Example: `http://localhost:9000`
    ///
    /// Relevant rclone config key: [`endpoint`](https://rclone.org/s3/#s3-endpoint)
    pub endpoint: Option<String>,

    /// The name of the storage bucket.
    ///
    /// This may include a path to a folder inside the bucket (e.g., `my-bucket/data`).
    pub bucket: String,

    /// The region that this bucket is in.
    ///
    /// Leave empty for MinIO or the default region (`us-east-1` for AWS).
    pub region: Option<String>,

    /// The name of the cloud storage provider (e.g., `"AWS"`, `"Minio"`).
    ///
    /// Used for provider-specific behavior in rclone.
    /// If omitted, defaults to `"Other"`.
    ///
    /// See the [rclone S3 provider documentation](https://rclone.org/s3/#s3-provider).
    pub provider: Option<String>,

    /// The access key used to authenticate with the storage provider.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_ACCESS_KEY_ID`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub access_key: Option<String>,

    /// The secret key used together with the access key for authentication.
    ///
    /// If not provided, rclone will fall back to environment-based credentials, such as
    /// `RCLONE_S3_SECRET_ACCESS_KEY`. In Kubernetes environments using IRSA (IAM Roles for Service Accounts),
    /// this can be left empty to allow automatic authentication via the pod's service account.
    pub secret_key: Option<String>,

    /// When set, the pipeline will try to fetch the specified checkpoint from
    /// the object store.
    ///
    /// If `fail_if_no_checkpoint` is `true`, the pipeline will fail to
    /// initialize when the checkpoint cannot be fetched.
    pub start_from_checkpoint: Option<StartFromCheckpoint>,

    /// When true, the pipeline will fail to initialize if fetching the
    /// specified checkpoint fails (missing, download error).
    /// When false, the pipeline will start from scratch instead.
    ///
    /// False by default.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,

    /// The number of file transfers to run in parallel.
    /// Default: 20
    pub transfers: Option<u8>,

    /// The number of checkers to run in parallel.
    /// Default: 20
    pub checkers: Option<u8>,

    /// Set to skip the post-copy checksum check and only compare file sizes.
    /// This can significantly improve throughput.
    /// Default: false
    pub ignore_checksum: Option<bool>,

    /// Number of streams to use for multi-thread downloads.
    /// Default: 10
    pub multi_thread_streams: Option<u8>,

    /// Use multi-thread download for files above this size.
    /// Format: `[size][Suffix]` (Example: 1G, 500M)
    /// Supported suffixes: k|M|G|T
    /// Default: 100M
    pub multi_thread_cutoff: Option<String>,

    /// The number of chunks of the same file that are uploaded for multipart uploads.
    /// Default: 10
    pub upload_concurrency: Option<u8>,

    /// When `true`, the pipeline starts in **standby** mode; processing doesn't
    /// start until activation (`POST /activate`).
    /// If this pipeline was previously activated and the storage has not been
    /// cleared, the pipeline will activate automatically and no newer
    /// checkpoints will be fetched.
    ///
    /// Standby behavior depends on `start_from_checkpoint`:
    /// - If `latest`, the pipeline continuously fetches the latest available
    ///   checkpoint until activated.
    /// - If a checkpoint UUID, the pipeline fetches this checkpoint once and
    ///   waits in standby until activated.
    ///
    /// Default: `false`
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,

    /// The interval (in seconds) between each attempt to fetch the latest
    /// checkpoint from the object store while in standby mode.
    ///
    /// Applies only when `start_from_checkpoint` is set to `latest`.
    ///
    /// Default: 10 seconds
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,

    /// Extra flags to pass to `rclone`.
    ///
    /// WARNING: Supplying incorrect or conflicting flags can break `rclone`.
    /// Use with caution.
    ///
    /// Refer to the docs to see the supported flags:
    /// - [Global flags](https://rclone.org/flags/)
    /// - [S3-specific flags](https://rclone.org/s3/)
    pub flags: Option<Vec<String>>,

    /// The minimum number of checkpoints to retain in the object store.
    /// No checkpoints will be deleted if the total count is below this threshold.
    ///
    /// Default: 10
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,

    /// The minimum age (in days) a checkpoint must reach before it becomes
    /// eligible for deletion. All younger checkpoints will be preserved.
    ///
    /// Default: 30
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,
}

fn default_pull_interval() -> u64 {
    10
}

fn default_retention_min_count() -> u32 {
    10
}

fn default_retention_min_age() -> u32 {
    30
}

impl SyncConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.standby && self.start_from_checkpoint.is_none() {
            return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
Standby mode requires `start_from_checkpoint` to be set.
Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
        }

        Ok(())
    }
}

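// Sketch of the invariant checked by `validate` above: `standby` without
// `start_from_checkpoint` is rejected, and setting a checkpoint fixes it.
#[cfg(test)]
mod sync_config_test {
    use super::{StartFromCheckpoint, SyncConfig};

    #[test]
    fn standby_requires_checkpoint() {
        let mut config = SyncConfig {
            standby: true,
            ..SyncConfig::default()
        };
        assert!(config.validate().is_err());
        config.start_from_checkpoint = Some(StartFromCheckpoint::Latest);
        assert!(config.validate().is_ok());
    }
}
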
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}

/// Configuration for local file system access.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to use background threads for file I/O.
    ///
    /// Background threads should improve performance, but they can reduce
    /// performance if too few cores are available. This is provided for
    /// debugging and fine-tuning and should ordinarily be left unset.
    pub async_threads: Option<bool>,

    /// Per-I/O operation sleep duration, in milliseconds.
    ///
    /// This is for simulating slow storage devices. Do not use this in
    /// production.
    pub ioop_delay: Option<u64>,

    /// Configuration to synchronize checkpoints to an object store.
    pub sync: Option<SyncConfig>,
}

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    ///
    /// The typical sweet spot for the number of workers is between 4 and 16.
    /// Each worker increases overall memory consumption for data structures
    /// used during a step.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures. This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage. This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Fault tolerance configuration.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimal input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximal delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller. Defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function. The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs. If the query uses `NOW()`, the pipeline will update the clock value and trigger incremental
    /// recomputation at most every `clock_resolution_usecs` microseconds. If the query does not use
    /// `NOW()`, then clock value updates are suppressed and the pipeline ignores this setting.
    ///
    /// It is set to 1 second (1,000,000 microseconds) by default.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads. Specify at least twice as many CPU numbers as
    /// workers. CPUs are generally numbered starting from 0. The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value will override the default of the runner.
    pub provisioning_timeout_secs: Option<u64>,

    /// The maximum number of connectors initialized in parallel during pipeline
    /// startup.
    ///
    /// At startup, the pipeline must initialize all of its input and output connectors.
    /// Depending on the number and types of connectors, this can take a long time.
    /// To accelerate the process, multiple connectors are initialized concurrently.
    /// This option controls the maximum number of connectors that can be initialized
    /// in parallel.
    ///
    /// The default is 10.
    pub max_parallel_connector_init: Option<u64>,

    /// Specification of additional (sidecar) containers.
    pub init_containers: Option<serde_yaml::Value>,

    /// Deprecated: this setting no longer has any effect.
    pub checkpoint_during_suspend: bool,

    /// Sets the number of available runtime threads for the HTTP server.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case the
    /// pipeline HTTP API operations are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub http_workers: Option<u64>,

    /// Sets the number of available runtime threads for async I/O tasks.
    ///
    /// This affects some networking and file I/O operations,
    /// especially adapters and ad-hoc queries.
    ///
    /// In most cases, this does not need to be set explicitly and
    /// the default is sufficient. Can be increased in case
    /// ingress, egress, or ad-hoc queries are a bottleneck.
    ///
    /// If not specified, the default is set to `workers`.
    pub io_workers: Option<u64>,

    /// Optional settings for tweaking Feldera internals.
    ///
    /// The available key-value pairs change from one version of Feldera to
    /// another, so users should not depend on particular settings being
    /// available, or on their behavior.
    pub dev_tweaks: BTreeMap<String, serde_json::Value>,

    /// Log filtering directives.
    ///
    /// If set to a valid [tracing-subscriber] filter, this controls the log
    /// messages emitted by the pipeline process. Otherwise, or if the filter
    /// has invalid syntax, messages at "info" severity and higher are written
    /// to the log and all others are discarded.
    ///
    /// [tracing-subscriber]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
    pub logging: Option<String>,
}

/// Accepts "true" and "false" and converts them to the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}

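// Sketch of the accepted forms: the legacy booleans and the structured
// `StorageOptions` map both deserialize through the visitor above.
#[cfg(test)]
mod storage_options_test {
    use super::{deserialize_storage_options, StorageOptions};
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_storage_options")]
        storage: Option<StorageOptions>,
    }

    #[test]
    fn bool_or_struct() {
        let w: Wrapper = serde_json::from_str(r#"{"storage": true}"#).unwrap();
        assert_eq!(w.storage, Some(StorageOptions::default()));
        let w: Wrapper = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert_eq!(w.storage, None);
        let w: Wrapper = serde_json::from_str(r#"{"storage": {"compression": "snappy"}}"#).unwrap();
        assert!(w.storage.is_some());
    }
}
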
/// Accepts very old 'initial_state' and 'latest_checkpoint' as enabling fault
/// tolerance.
///
/// Accepts `null` as disabling fault tolerance.
///
/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
/// expect.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }

        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            // We discovered that the jaeger crate can use up gigabytes of RAM,
            // so it's not harmless to keep it on by default.
            tracing: false,
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: Some(DEFAULT_CLOCK_RESOLUTION_USECS),
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            io_workers: None,
            http_workers: None,
            dev_tweaks: BTreeMap::default(),
            logging: None,
        }
    }
}

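// Sketch: with the struct-level `#[serde(default)]`, a minimal document is
// enough; every omitted field takes its value from the `Default` impl above.
#[cfg(test)]
mod runtime_config_test {
    use super::RuntimeConfig;

    #[test]
    fn minimal_yaml() {
        let config: RuntimeConfig = serde_yaml::from_str("workers: 4").unwrap();
        assert_eq!(config.workers, 4);
        assert!(config.cpu_profiler);
        assert!(!config.tracing);
    }
}
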
/// Fault-tolerance configuration.
///
/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
/// which is the configuration that one gets if [RuntimeConfig] omits fault
/// tolerance configuration.
///
/// The default value for [FtConfig::model] enables fault tolerance, as
/// `Some(FtModel::default())`. This is the configuration that one gets if
/// [RuntimeConfig] includes a fault tolerance configuration but does not
/// specify a particular model.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Fault tolerance model to use.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(schema_with = none_as_string_schema::<FtModel>)]
    pub model: Option<FtModel>,

    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds. Values less than 1 or greater than 3600 will
    /// be forced into that range.
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}

fn default_model() -> Option<FtModel> {
    Some(FtModel::default())
}

pub fn default_checkpoint_interval_secs() -> Option<u64> {
    Some(60)
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            model: None,
            checkpoint_interval_secs: default_checkpoint_interval_secs(),
        }
    }
}

#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};

    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }

        // Omitting FtConfig, or specifying null, or specifying model "none",
        // disables fault tolerance.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }

        // Serializing disabled FT produces the explicit "none" form.
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));

        // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
        // tolerance.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }

        // `"checkpoint_interval_secs": null` disables periodic checkpointing.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}

impl FtConfig {
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Returns the checkpoint interval if fault tolerance is enabled, and
    /// otherwise `None`.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        if self.is_enabled() {
            self.checkpoint_interval_secs
                .map(|interval| Duration::from_secs(interval.clamp(1, 3600)))
        } else {
            None
        }
    }
}

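// Sketch of the clamping behavior documented on `checkpoint_interval_secs`:
// out-of-range intervals are forced into [1, 3600] seconds, and the interval
// is suppressed entirely when fault tolerance is disabled.
#[cfg(test)]
mod checkpoint_interval_test {
    use super::{FtConfig, FtModel};
    use std::time::Duration;

    #[test]
    fn interval_is_clamped() {
        let config = FtConfig {
            model: Some(FtModel::default()),
            checkpoint_interval_secs: Some(10_000),
        };
        assert_eq!(
            config.checkpoint_interval(),
            Some(Duration::from_secs(3600))
        );
        assert_eq!(FtConfig::default().checkpoint_interval(), None);
    }
}
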
/// Serde implementation for de/serializing a string into `Option<T>` where
/// `"none"` indicates `None` and any other string indicates `Some`.
///
/// This could be extended to handle non-strings by adding more forwarding
/// `visit_*` methods to the Visitor implementation. I don't see a way to write
/// them automatically.
mod none_as_string {
    use std::marker::PhantomData;

    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};

    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }

    struct NoneAsString<T>(PhantomData<fn() -> T>);

    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;

        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }

        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            if value.eq_ignore_ascii_case("none") {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }

    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}

/// Generates an OpenAPI schema for an `Option<T>` field serialized with `none_as_string`.
/// The schema is a `oneOf` with a reference to `T`'s schema and a `"none"` string enum.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}

/// Fault tolerance model.
///
/// The ordering is significant: we consider [Self::ExactlyOnce] to be a "higher
/// level" of fault tolerance than [Self::AtLeastOnce].
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// Each record is output at least once. Crashes may duplicate output, but
    /// no input or output is dropped.
    AtLeastOnce,

    /// Each record is output exactly once. Crashes do not drop or duplicate
    /// input or output.
    #[default]
    ExactlyOnce,
}

impl FtModel {
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        value.map_or("no", |model| model.as_str())
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::AtLeastOnce => "at_least_once",
            FtModel::ExactlyOnce => "exactly_once",
        }
    }
}

pub struct FtModelUnknown;

impl FromStr for FtModel {
    type Err = FtModelUnknown;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "exactly_once" => Ok(Self::ExactlyOnce),
            "at_least_once" => Ok(Self::AtLeastOnce),
            _ => Err(FtModelUnknown),
        }
    }
}

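// Sketch: `FromStr` parsing is case-insensitive and rejects unknown names.
#[cfg(test)]
mod ft_model_test {
    use super::FtModel;
    use std::str::FromStr;

    #[test]
    fn parse() {
        assert_eq!(
            FtModel::from_str("Exactly_Once").ok(),
            Some(FtModel::ExactlyOnce)
        );
        assert!(FtModel::from_str("unknown").is_err());
    }
}
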
/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
///
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}

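// Sketch of the three accepted shapes for `start_after`: a bare string, an
// array of strings, or null.
#[cfg(test)]
mod start_after_test {
    use super::deserialize_start_after;
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_start_after", default)]
        start_after: Option<Vec<String>>,
    }

    #[test]
    fn string_or_array() {
        let w: Wrapper = serde_json::from_str(r#"{"start_after": "a"}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["a".to_string()]));
        let w: Wrapper = serde_json::from_str(r#"{"start_after": ["a", "b"]}"#).unwrap();
        assert_eq!(w.start_after, Some(vec!["a".to_string(), "b".to_string()]));
        let w: Wrapper = serde_json::from_str(r#"{"start_after": null}"#).unwrap();
        assert_eq!(w.start_after, None);
    }
}
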
/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only. It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique id associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL CREATE INDEX statement. An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit. The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximal number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport endpoint
    /// nor stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the moment the backpressure mechanism is triggered and the moment the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with the specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}

impl ConnectorConfig {
    /// Compare two configs modulo the `paused` field.
    ///
    /// Used to compare checkpointed and current connector configs.
    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
        let mut a = self.clone();
        let mut b = other.clone();
        a.paused = false;
        b.paused = false;
        a == b
    }
}

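// Sketch of `equal_modulo_paused`: two configs that differ only in `paused`
// compare equal (the transport here is chosen arbitrarily for the example).
#[cfg(test)]
mod connector_config_test {
    use super::{
        default_max_batch_size, default_max_queued_records, ConnectorConfig, OutputBufferConfig,
        TransportConfig,
    };

    #[test]
    fn paused_is_ignored() {
        let a = ConnectorConfig {
            transport: TransportConfig::HttpOutput,
            format: None,
            index: None,
            output_buffer_config: OutputBufferConfig::default(),
            max_batch_size: default_max_batch_size(),
            max_queued_records: default_max_queued_records(),
            paused: false,
            labels: Vec::new(),
            start_after: None,
        };
        let mut b = a.clone();
        b.paused = true;
        assert!(a.equal_modulo_paused(&b));
        assert_ne!(a, b);
    }
}
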
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in fewer
    /// bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`. When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
    ///   milliseconds.
    /// * buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied. When this option is
    /// set, the buffer will be flushed at most every
    /// `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximal size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}

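// Sketch of the `validate` rule above: enabling the buffer without either
// bound is rejected; setting one of the bounds makes it valid.
#[cfg(test)]
mod output_buffer_test {
    use super::OutputBufferConfig;

    #[test]
    fn buffer_requires_a_bound() {
        let mut config = OutputBufferConfig {
            enable_output_buffer: true,
            ..OutputBufferConfig::default()
        };
        assert!(config.validate().is_err());
        config.max_output_buffer_time_millis = 1_000;
        assert!(config.validate().is_ok());
    }
}
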
/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Boxed to prevent Rust from complaining about the large size difference
    // between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through the API.
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through the API.
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through the API.
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
            TransportConfig::ClockInput(_) => "clock".to_string(),
        }
    }

    /// Returns true if the connector is transient, i.e., it is created and
    /// destroyed at runtime on demand rather than being configured as part of
    /// the pipeline.
    pub fn is_transient(&self) -> bool {
        matches!(
            self,
            TransportConfig::AdHocInput(_)
                | TransportConfig::HttpInput(_)
                | TransportConfig::HttpOutput
                | TransportConfig::ClockInput(_)
        )
    }
}

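// Sketch of the internally tagged representation produced by the serde
// attributes above (`tag = "name"`, `content = "config"`).
#[cfg(test)]
mod transport_config_test {
    use super::TransportConfig;

    #[test]
    fn tagged_representation() {
        let config: TransportConfig = serde_json::from_str(r#"{"name": "http_output"}"#).unwrap();
        assert_eq!(config.name(), "http_output");
        assert!(config.is_transient());
    }
}
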
/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: YamlValue,
}

#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,

    /// The minimum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
}