feldera_types/config.rs
1//! Controller configuration.
2//!
3//! This module defines the controller configuration structure. The leaves of
4//! this structure are individual transport-specific and data-format-specific
5//! endpoint configs. We represent these configs as opaque JSON values, so
6//! that the entire configuration tree can be deserialized from a JSON file.
7
8use crate::transport::adhoc::AdHocInputConfig;
9use crate::transport::datagen::DatagenInputConfig;
10use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
11use crate::transport::file::{FileInputConfig, FileOutputConfig};
12use crate::transport::http::HttpInputConfig;
13use crate::transport::iceberg::IcebergReaderConfig;
14use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
15use crate::transport::nexmark::NexmarkInputConfig;
16use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
17use crate::transport::pubsub::PubSubInputConfig;
18use crate::transport::redis::RedisOutputConfig;
19use crate::transport::s3::S3InputConfig;
20use crate::transport::url::UrlInputConfig;
21use core::fmt;
22use serde::de::{self, MapAccess, Visitor};
23use serde::{Deserialize, Deserializer, Serialize};
24use serde_json::Value as JsonValue;
25use serde_yaml::Value as YamlValue;
26use std::fmt::Display;
27use std::path::Path;
28use std::str::FromStr;
29use std::time::Duration;
30use std::{borrow::Cow, cmp::max, collections::BTreeMap};
31use utoipa::ToSchema;
32
33const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;
34
35/// Default value of `ConnectorConfig::max_queued_records`.
36pub const fn default_max_queued_records() -> u64 {
37 1_000_000
38}
39
40/// Default maximum batch size for connectors, in records.
41///
42/// If you change this then update the comment on
43/// [ConnectorConfig::max_batch_size].
44pub const fn default_max_batch_size() -> u64 {
45 10_000
46}
47
48/// Pipeline deployment configuration.
49/// It represents configuration entries directly provided by the user
50/// (e.g., runtime configuration) and entries derived from the schema
51/// of the compiled program (e.g., connectors). Storage configuration,
52/// if applicable, is set by the runner.
53#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
54pub struct PipelineConfig {
55 /// Global controller configuration.
56 #[serde(flatten)]
57 #[schema(inline)]
58 pub global: RuntimeConfig,
59
60 /// Pipeline name.
61 pub name: Option<String>,
62
63 /// Configuration for persistent storage
64 ///
65 /// If `global.storage` is `Some(_)`, this field must be set to some
66 /// [`StorageConfig`]. If `global.storage` is `None`, the pipeline ignores
67 /// this field.
68 #[serde(default)]
69 pub storage_config: Option<StorageConfig>,
70
71 /// Input endpoint configuration.
72 pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,
73
74 /// Output endpoint configuration.
75 #[serde(default)]
76 pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
77}
78
79impl PipelineConfig {
80 pub fn max_parallel_connector_init(&self) -> u64 {
81 max(
82 self.global
83 .max_parallel_connector_init
84 .unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
85 1,
86 )
87 }
88
89 pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
90 let (storage_config, storage_options) = storage.unzip();
91 Self {
92 global: RuntimeConfig {
93 storage: storage_options,
94 ..self.global
95 },
96 storage_config,
97 ..self
98 }
99 }
100
101 pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
102 let storage_options = self.global.storage.as_ref();
103 let storage_config = self.storage_config.as_ref();
104 storage_config.zip(storage_options)
105 }
106}
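
// A minimal usage sketch: it shows how `max_parallel_connector_init` falls
// back to the default of 10 and how `storage()` only reports a pairing when
// both `global.storage` and `storage_config` are present. The pipeline name
// and the empty endpoint maps are placeholder values.
#[cfg(test)]
mod pipeline_config_example {
    use super::{PipelineConfig, RuntimeConfig};
    use std::collections::BTreeMap;

    #[test]
    fn defaults_and_storage_pairing() {
        let config = PipelineConfig {
            global: RuntimeConfig::default(),
            name: Some("example".to_string()),
            storage_config: None,
            inputs: BTreeMap::new(),
            outputs: BTreeMap::new(),
        };

        // No explicit limit configured: fall back to the default of 10.
        assert_eq!(config.max_parallel_connector_init(), 10);

        // `global.storage` defaults to `Some(..)`, but without a
        // `storage_config` there is no usable storage pairing.
        assert!(config.storage().is_none());
    }
}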
107
108/// Configuration for persistent storage in a [`PipelineConfig`].
109#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
110pub struct StorageConfig {
111 /// A directory to keep pipeline state, as a path on the filesystem of the
112 /// machine or container where the pipeline will run.
113 ///
114 /// When storage is enabled, this directory stores the data for
115 /// [StorageBackendConfig::Default].
116 ///
117 /// When fault tolerance is enabled, this directory stores checkpoints and
118 /// the log.
119 pub path: String,
120
121 /// How to cache access to storage in this pipeline.
122 #[serde(default)]
123 pub cache: StorageCacheConfig,
124}
125
126impl StorageConfig {
127 pub fn path(&self) -> &Path {
128 Path::new(&self.path)
129 }
130}
131
132/// How to cache access to storage within a Feldera pipeline.
133#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
134#[serde(rename_all = "snake_case")]
135pub enum StorageCacheConfig {
136 /// Use the operating system's page cache as the primary storage cache.
137 ///
138 /// This is the default because it currently performs better than
139 /// `FelderaCache`.
140 #[default]
141 PageCache,
142
143 /// Use Feldera's internal cache implementation.
144 ///
145 /// This is under development. It will become the default when its
146 /// performance exceeds that of `PageCache`.
147 FelderaCache,
148}
149
150impl StorageCacheConfig {
151 #[cfg(unix)]
152 pub fn to_custom_open_flags(&self) -> i32 {
153 match self {
154 StorageCacheConfig::PageCache => (),
155 StorageCacheConfig::FelderaCache => {
156 #[cfg(target_os = "linux")]
157 return libc::O_DIRECT;
158 }
159 }
160 0
161 }
162}
163
164/// Storage configuration for a pipeline.
165#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
166#[serde(default)]
167pub struct StorageOptions {
168 /// How to connect to the underlying storage.
169 pub backend: StorageBackendConfig,
170
171 /// The minimum estimated number of bytes in a batch of data for it to be
172 /// written to storage. This is provided for debugging and fine-tuning and should
173 /// ordinarily be left unset.
174 ///
175 /// A value of 0 will write even empty batches to storage, and nonzero
176 /// values provide a threshold. `usize::MAX` would effectively disable
177 /// storage.
178 ///
179 /// The default is 1,048,576 (1 MiB).
180 pub min_storage_bytes: Option<usize>,
181
182 /// The form of compression to use in data batches.
183 ///
184 /// Compression has a CPU cost but it can take better advantage of limited
185 /// NVMe and network bandwidth, which means that it can increase overall
186 /// performance.
187 pub compression: StorageCompression,
188
189 /// The maximum size of the in-memory storage cache, in MiB.
190 ///
191 /// If set, the specified cache size is spread across all the foreground and
192 /// background threads. If unset, each foreground or background thread cache
193 /// is limited to 256 MiB.
194 pub cache_mib: Option<usize>,
195}
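
// A minimal configuration sketch: it parses a partial `storage` section,
// relying on `#[serde(default)]` for the omitted fields. The chosen
// compression and cache values are placeholders.
#[cfg(test)]
mod storage_options_example {
    use super::{StorageBackendConfig, StorageCompression, StorageOptions};

    #[test]
    fn partial_storage_section() {
        let options: StorageOptions =
            serde_json::from_str(r#"{"compression": "snappy", "cache_mib": 512}"#).unwrap();
        assert_eq!(options.compression, StorageCompression::Snappy);
        assert_eq!(options.cache_mib, Some(512));
        // Unspecified fields fall back to their defaults.
        assert_eq!(options.backend, StorageBackendConfig::Default);
        assert_eq!(options.min_storage_bytes, None);
    }
}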
196
197/// Backend storage configuration.
198#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
199#[serde(tag = "name", content = "config", rename_all = "snake_case")]
200pub enum StorageBackendConfig {
201 /// Use the default storage configuration.
202 ///
203 /// This currently uses the local file system.
204 #[default]
205 Default,
206
207 /// Object storage.
208 Object(ObjectStorageConfig),
209}
210
211impl Display for StorageBackendConfig {
212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213 match self {
214 StorageBackendConfig::Default => write!(f, "default"),
215 StorageBackendConfig::Object(_) => write!(f, "object"),
216 }
217 }
218}
219
220/// Storage compression algorithm.
221#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
222#[serde(rename_all = "snake_case")]
223pub enum StorageCompression {
224 /// Use Feldera's default compression algorithm.
225 ///
226 /// The default may change as Feldera's performance is tuned and new
227 /// algorithms are introduced.
228 #[default]
229 Default,
230
231 /// Do not compress.
232 None,
233
234 /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
235 Snappy,
236}
237
238#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
239pub struct ObjectStorageConfig {
240 /// URL.
241 ///
242 /// The following URL schemes are supported:
243 ///
244 /// * S3:
245 /// - `s3://<bucket>/<path>`
246 /// - `s3a://<bucket>/<path>`
247 /// - `https://s3.<region>.amazonaws.com/<bucket>`
248 /// - `https://<bucket>.s3.<region>.amazonaws.com`
249 /// - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
250 /// * Google Cloud Storage:
251 /// - `gs://<bucket>/<path>`
252 /// * Microsoft Azure Blob Storage:
253 /// - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
254 /// - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
255 /// - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
256 /// - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
257 /// - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
258 /// - `azure://<container>/<path>` (custom)
259 /// - `https://<account>.dfs.core.windows.net`
260 /// - `https://<account>.blob.core.windows.net`
261 /// - `https://<account>.blob.core.windows.net/<container>`
262 /// - `https://<account>.dfs.fabric.microsoft.com`
263 /// - `https://<account>.dfs.fabric.microsoft.com/<container>`
264 /// - `https://<account>.blob.fabric.microsoft.com`
265 /// - `https://<account>.blob.fabric.microsoft.com/<container>`
266 ///
267 /// Settings derived from the URL will override other settings.
268 pub url: String,
269
270 /// Additional options as key-value pairs.
271 ///
272 /// The following keys are supported:
273 ///
274 /// * S3:
275 /// - `access_key_id`: AWS Access Key.
276 /// - `secret_access_key`: AWS Secret Access Key.
277 /// - `region`: Region.
278 /// - `default_region`: Default region.
279 /// - `endpoint`: Custom endpoint for communicating with S3,
280 /// e.g. `https://localhost:4566` for testing against a localstack
281 /// instance.
282 /// - `token`: Token to use for requests (passed to underlying provider).
283 /// - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
284 /// * Google Cloud Storage:
285 /// - `service_account`: Path to the service account file.
286 /// - `service_account_key`: The serialized service account key.
287 /// - `google_application_credentials`: Application credentials path.
288 /// - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
289 /// * Microsoft Azure Blob Storage:
290 /// - `access_key`: Azure Access Key.
291 /// - `container_name`: Azure Container Name.
292 /// - `account`: Azure Account.
293 /// - `bearer_token_authorization`: Static bearer token for authorizing requests.
294 /// - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
295 /// - `client_secret`: Client secret for use in client secret flow.
296 /// - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
297 /// - `endpoint`: Override the endpoint for communicating with blob storage.
298 /// - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
299 ///
300 /// Options set through the URL take precedence over those set with these
301 /// options.
302 #[serde(flatten)]
303 pub other_options: BTreeMap<String, String>,
304}
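
// A minimal configuration sketch: it spells out the adjacently tagged
// `StorageBackendConfig` in JSON, with extra keys collected into
// `other_options` via `#[serde(flatten)]`. The bucket name and region are
// placeholder values.
#[cfg(test)]
mod object_storage_config_example {
    use super::StorageBackendConfig;

    #[test]
    fn object_backend_from_json() {
        let json = r#"{
            "name": "object",
            "config": {
                "url": "s3://example-bucket/feldera-state",
                "region": "us-east-1"
            }
        }"#;
        let backend: StorageBackendConfig = serde_json::from_str(json).unwrap();
        match backend {
            StorageBackendConfig::Object(cfg) => {
                assert_eq!(cfg.url, "s3://example-bucket/feldera-state");
                assert_eq!(cfg.other_options["region"], "us-east-1");
            }
            other => panic!("expected object backend, got {}", other),
        }
    }
}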
305
306/// Global pipeline configuration settings. This is the publicly
307/// exposed type for users to configure pipelines.
308#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
309#[serde(default)]
310pub struct RuntimeConfig {
311 /// Number of DBSP worker threads.
312 ///
313 /// Each DBSP "foreground" worker thread is paired with a "background"
314 /// thread for LSM merging, making the total number of threads twice the
315 /// specified number.
316 pub workers: u16,
317
318 /// Storage configuration.
319 ///
320 /// - If this is `None`, the default, the pipeline's state is kept in
321 /// in-memory data structures. This is useful if the pipeline's state
322 /// will fit in memory and if the pipeline is ephemeral and does not need
323 /// to be recovered after a restart. The pipeline will most likely run
324 /// faster since it does not need to access storage.
325 ///
326 /// - If set, the pipeline's state is kept on storage. This allows the
327 /// pipeline to work with state that will not fit into memory. It also
328 /// allows the state to be checkpointed and recovered across restarts.
329 #[serde(deserialize_with = "deserialize_storage_options")]
330 pub storage: Option<StorageOptions>,
331
332 /// Fault tolerance configuration.
333 #[serde(deserialize_with = "deserialize_fault_tolerance")]
334 pub fault_tolerance: FtConfig,
335
336 /// Enable CPU profiler.
337 ///
338 /// The default value is `true`.
339 pub cpu_profiler: bool,
340
341 /// Enable pipeline tracing.
342 pub tracing: bool,
343
344 /// Jaeger tracing endpoint to send tracing information to.
345 pub tracing_endpoint_jaeger: String,
346
347 /// Minimal input batch size.
348 ///
349 /// The controller delays pushing input records to the circuit until at
350 /// least `min_batch_size_records` records have been received (total
351 /// across all endpoints) or `max_buffering_delay_usecs` microseconds
352 /// have passed since at least one input records has been buffered.
353 /// Defaults to 0.
354 pub min_batch_size_records: u64,
355
356 /// Maximal delay in microseconds to wait for `min_batch_size_records` to
357 /// get buffered by the controller, defaults to 0.
358 pub max_buffering_delay_usecs: u64,
359
360 /// Resource reservations and limits. This is enforced
361 /// only in Feldera Cloud.
362 pub resources: ResourceConfig,
363
364 /// Real-time clock resolution in microseconds.
365 ///
366 /// This parameter controls the execution of queries that use the `NOW()` function. The output of such
367 /// queries depends on the real-time clock and can change over time without any external
368 /// inputs. The pipeline will update the clock value and trigger incremental recomputation
369 /// at most each `clock_resolution_usecs` microseconds.
370 ///
371 /// It is set to 100 milliseconds (100,000 microseconds) by default.
372 ///
373 /// Set to `null` to disable periodic clock updates.
374 pub clock_resolution_usecs: Option<u64>,
375
376 /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
377 /// its worker threads. Specify at least twice as many CPU numbers as
378 /// workers. CPUs are generally numbered starting from 0. The pipeline
379 /// might not be able to honor CPU pinning requests.
380 ///
381 /// CPU pinning can make pipelines run faster and perform more consistently,
382 /// as long as different pipelines running on the same machine are pinned to
383 /// different CPUs.
384 pub pin_cpus: Vec<usize>,
385
386 /// Timeout in seconds for the `Provisioning` phase of the pipeline.
387 /// Setting this value will override the default of the runner.
388 pub provisioning_timeout_secs: Option<u64>,
389
390 /// The maximum number of connectors initialized in parallel during pipeline
391 /// startup.
392 ///
393 /// At startup, the pipeline must initialize all of its input and output connectors.
394 /// Depending on the number and types of connectors, this can take a long time.
395 /// To accelerate the process, multiple connectors are initialized concurrently.
396 /// This option controls the maximum number of connectors that can be initialized
397 /// in parallel.
398 ///
399 /// The default is 10.
400 pub max_parallel_connector_init: Option<u64>,
401}
402
403/// Accepts "true" and "false" and converts them to the new format.
404fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
405where
406 D: Deserializer<'de>,
407{
408 struct BoolOrStruct;
409
410 impl<'de> Visitor<'de> for BoolOrStruct {
411 type Value = Option<StorageOptions>;
412
413 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
414 formatter.write_str("boolean or StorageOptions")
415 }
416
417 fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
418 where
419 E: de::Error,
420 {
421 match v {
422 false => Ok(None),
423 true => Ok(Some(StorageOptions::default())),
424 }
425 }
426
427 fn visit_unit<E>(self) -> Result<Self::Value, E>
428 where
429 E: de::Error,
430 {
431 Ok(None)
432 }
433
434 fn visit_none<E>(self) -> Result<Self::Value, E>
435 where
436 E: de::Error,
437 {
438 Ok(None)
439 }
440
441 fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
442 where
443 M: MapAccess<'de>,
444 {
445 Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
446 }
447 }
448
449 deserializer.deserialize_any(BoolOrStruct)
450}
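
// A minimal compatibility sketch: the legacy boolean form of `storage`
// still deserializes through `deserialize_storage_options`, alongside the
// structured form.
#[cfg(test)]
mod storage_options_compat_example {
    use super::{RuntimeConfig, StorageOptions};

    #[test]
    fn legacy_boolean_storage_flag() {
        let enabled: RuntimeConfig = serde_json::from_str(r#"{"storage": true}"#).unwrap();
        assert_eq!(enabled.storage, Some(StorageOptions::default()));

        let disabled: RuntimeConfig = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert_eq!(disabled.storage, None);
    }
}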
451
452/// Accepts the legacy string values 'initial_state' and 'latest_checkpoint' as
453/// enabling fault tolerance.
454///
455/// Accepts `null` as disabling fault tolerance.
456///
457/// Otherwise, deserializes [FtConfig] in the way that one might otherwise
458/// expect.
459fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
460where
461 D: Deserializer<'de>,
462{
463 struct StringOrStruct;
464
465 impl<'de> Visitor<'de> for StringOrStruct {
466 type Value = FtConfig;
467
468 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
469 formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
470 }
471
472 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
473 where
474 E: de::Error,
475 {
476 match v {
477 "initial_state" | "latest_checkpoint" => Ok(FtConfig {
478 model: Some(FtModel::default()),
479 ..FtConfig::default()
480 }),
481 _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
482 }
483 }
484
485 fn visit_unit<E>(self) -> Result<Self::Value, E>
486 where
487 E: de::Error,
488 {
489 Ok(FtConfig::default())
490 }
491
492 fn visit_none<E>(self) -> Result<Self::Value, E>
493 where
494 E: de::Error,
495 {
496 Ok(FtConfig::default())
497 }
498
499 fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
500 where
501 M: MapAccess<'de>,
502 {
503 Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
504 }
505 }
506
507 deserializer.deserialize_any(StringOrStruct)
508}
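
// A minimal compatibility sketch: the legacy string values accepted by
// `deserialize_fault_tolerance` enable fault tolerance with the default
// model. The wrapper struct exists only for this example.
#[cfg(test)]
mod fault_tolerance_compat_example {
    use super::{deserialize_fault_tolerance, FtConfig, FtModel};
    use serde::Deserialize;

    #[derive(Deserialize, Default, Debug, PartialEq, Eq)]
    #[serde(default)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_fault_tolerance")]
        config: FtConfig,
    }

    #[test]
    fn legacy_string_forms() {
        for s in [
            r#"{"config": "initial_state"}"#,
            r#"{"config": "latest_checkpoint"}"#,
        ] {
            let w: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(w.config.model, Some(FtModel::default()));
        }
    }
}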
509
510impl Default for RuntimeConfig {
511 fn default() -> Self {
512 Self {
513 workers: 8,
514 storage: Some(StorageOptions::default()),
515 fault_tolerance: FtConfig::default(),
516 cpu_profiler: true,
517 tracing: {
518 // We discovered that the jaeger crate can use up gigabytes of RAM, so it's not harmless
519 // to keep it on by default.
520 false
521 },
522 tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
523 min_batch_size_records: 0,
524 max_buffering_delay_usecs: 0,
525 resources: ResourceConfig::default(),
526 clock_resolution_usecs: {
527 // Every 100 ms.
528 Some(100_000)
529 },
530 pin_cpus: Vec::new(),
531 provisioning_timeout_secs: None,
532 max_parallel_connector_init: None,
533 }
534 }
535}
536
537/// Fault-tolerance configuration.
538///
539/// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
540/// which is the configuration that one gets if [RuntimeConfig] omits fault
541/// tolerance configuration.
542///
543/// The default value for [FtConfig::model] enables fault tolerance, as
544/// `Some(FtModel::default())`. This is the configuration that one gets if
545/// [RuntimeConfig] includes a fault tolerance configuration but does not
546/// specify a particular model.
547#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
548#[serde(rename_all = "snake_case")]
549pub struct FtConfig {
550 /// Fault tolerance model to use.
551 #[serde(with = "none_as_string")]
552 #[serde(default = "default_model")]
553 pub model: Option<FtModel>,
554
555 /// Interval between automatic checkpoints, in seconds.
556 ///
557 /// The default is 60 seconds. Values less than 1 or greater than 3600 will
558 /// be forced into that range.
559 #[serde(default = "default_checkpoint_interval_secs")]
560 pub checkpoint_interval_secs: u64,
561}
562
563fn default_model() -> Option<FtModel> {
564 Some(FtModel::default())
565}
566
567pub fn default_checkpoint_interval_secs() -> u64 {
568 60
569}
570
571impl Default for FtConfig {
572 fn default() -> Self {
573 Self {
574 model: None,
575 checkpoint_interval_secs: default_checkpoint_interval_secs(),
576 }
577 }
578}
579
580#[cfg(test)]
581mod test {
582 use super::deserialize_fault_tolerance;
583 use crate::config::{FtConfig, FtModel};
584 use serde::{Deserialize, Serialize};
585
586 #[test]
587 fn ft_config() {
588 #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
589 #[serde(default)]
590 struct Wrapper {
591 #[serde(deserialize_with = "deserialize_fault_tolerance")]
592 config: FtConfig,
593 }
594
595 // Omitting FtConfig, or specifying null, or specifying model "none", disables fault tolerance.
596 for s in [
597 "{}",
598 r#"{"config": null}"#,
599 r#"{"config": {"model": "none"}}"#,
600 ] {
601 let config: Wrapper = serde_json::from_str(s).unwrap();
602 assert_eq!(
603 config,
604 Wrapper {
605 config: FtConfig {
606 model: None,
607 ..FtConfig::default()
608 }
609 }
610 );
611 }
612
613 // Serializing disabled FT produces explicit "none" form.
614 let s = serde_json::to_string(&Wrapper {
615 config: FtConfig::default(),
616 })
617 .unwrap();
618 assert!(s.contains("\"none\""));
619
620 // `{}` for FtConfig, or `{...}` with `model` omitted, enables fault
621 // tolerance.
622 for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
623 assert_eq!(
624 serde_json::from_str::<FtConfig>(s).unwrap(),
625 FtConfig {
626 model: Some(FtModel::default()),
627 ..FtConfig::default()
628 }
629 );
630 }
631 }
632}
633
634impl FtConfig {
635 pub fn is_enabled(&self) -> bool {
636 self.model.is_some()
637 }
638
639 /// Returns the checkpoint interval, if fault tolerance is enabled, and
640 /// otherwise `None`.
641 pub fn checkpoint_interval(&self) -> Option<Duration> {
642 self.is_enabled()
643 .then(|| Duration::from_secs(self.checkpoint_interval_secs.clamp(1, 3600)))
644 }
645}
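
// A minimal behavior sketch: `checkpoint_interval_secs` is clamped into
// 1..=3600, and the interval is only reported while fault tolerance is
// enabled.
#[cfg(test)]
mod checkpoint_interval_example {
    use super::{FtConfig, FtModel};
    use std::time::Duration;

    #[test]
    fn clamped_checkpoint_interval() {
        let enabled = FtConfig {
            model: Some(FtModel::ExactlyOnce),
            checkpoint_interval_secs: 10_000,
        };
        // 10,000 seconds is clamped down to the 3,600-second maximum.
        assert_eq!(
            enabled.checkpoint_interval(),
            Some(Duration::from_secs(3600))
        );

        // Disabled fault tolerance reports no checkpoint interval.
        let disabled = FtConfig::default();
        assert_eq!(disabled.checkpoint_interval(), None);
    }
}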
646
647/// Serde implementation for de/serializing a string into `Option<T>` where
648/// `"none"` indicates `None` and any other string indicates `Some`.
649///
650/// This could be extended to handle non-strings by adding more forwarding
651/// `visit_*` methods to the Visitor implementation. I don't see a way to write
652/// them automatically.
653mod none_as_string {
654 use std::marker::PhantomData;
655
656 use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
657 use serde::ser::{Serialize, Serializer};
658
659 pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
660 where
661 S: Serializer,
662 T: Serialize,
663 {
664 match value.as_ref() {
665 Some(value) => value.serialize(serializer),
666 None => "none".serialize(serializer),
667 }
668 }
669
670 struct NoneAsString<T>(PhantomData<fn() -> T>);
671
672 impl<'de, T> Visitor<'de> for NoneAsString<T>
673 where
674 T: Deserialize<'de>,
675 {
676 type Value = Option<T>;
677
678 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
679 formatter.write_str("string")
680 }
681
682 fn visit_none<E>(self) -> Result<Self::Value, E>
683 where
684 E: serde::de::Error,
685 {
686 Ok(None)
687 }
688
689 fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
690 where
691 E: serde::de::Error,
692 {
693 if &value.to_ascii_lowercase() == "none" {
694 Ok(None)
695 } else {
696 Ok(Some(T::deserialize(value.into_deserializer())?))
697 }
698 }
699 }
700
701 pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
702 where
703 D: Deserializer<'de>,
704 T: Deserialize<'de>,
705 {
706 deserializer.deserialize_str(NoneAsString(PhantomData))
707 }
708}
709
710/// Fault tolerance model.
711#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
712#[serde(rename_all = "snake_case")]
713pub enum FtModel {
714 /// Each record is output exactly once. Crashes do not drop or duplicate
715 /// input or output.
716 #[default]
717 ExactlyOnce,
718
719 /// Each record is output at least once. Crashes may duplicate output, but
720 /// no input or output is dropped.
721 AtLeastOnce,
722}
723
724pub struct FtModelUnknown;
725
726impl FromStr for FtModel {
727 type Err = FtModelUnknown;
728
729 fn from_str(s: &str) -> Result<Self, Self::Err> {
730 match s.to_ascii_lowercase().as_str() {
731 "exactly_once" => Ok(Self::ExactlyOnce),
732 "at_least_once" => Ok(Self::AtLeastOnce),
733 _ => Err(FtModelUnknown),
734 }
735 }
736}
737
738/// Describes an input connector configuration
739#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
740pub struct InputEndpointConfig {
741 /// The name of the input stream of the circuit that this endpoint is
742 /// connected to.
743 pub stream: Cow<'static, str>,
744
745 /// Connector configuration.
746 #[serde(flatten)]
747 pub connector_config: ConnectorConfig,
748}
749
750/// Deserialize the `start_after` property of a connector configuration.
751/// It requires a non-standard deserialization because we want to accept
752/// either a string or an array of strings.
753fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
754where
755 D: Deserializer<'de>,
756{
757 let value = Option::<JsonValue>::deserialize(deserializer)?;
758 match value {
759 Some(JsonValue::String(s)) => Ok(Some(vec![s])),
760 Some(JsonValue::Array(arr)) => {
761 let vec = arr
762 .into_iter()
763 .map(|item| {
764 item.as_str()
765 .map(|s| s.to_string())
766 .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
767 })
768 .collect::<Result<Vec<String>, _>>()?;
769 Ok(Some(vec))
770 }
771 Some(JsonValue::Null) | None => Ok(None),
772 _ => Err(serde::de::Error::custom(
773 "invalid 'start_after' property: expected a string, an array of strings, or null",
774 )),
775 }
776}
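
// A minimal deserialization sketch: `start_after` accepts either a single
// string or an array of strings. The wrapper struct and the label names are
// placeholders for this example.
#[cfg(test)]
mod start_after_example {
    use super::deserialize_start_after;
    use serde::Deserialize;

    #[derive(Deserialize, Default, Debug, PartialEq, Eq)]
    #[serde(default)]
    struct Wrapper {
        #[serde(deserialize_with = "deserialize_start_after")]
        start_after: Option<Vec<String>>,
    }

    #[test]
    fn string_or_array() {
        let single: Wrapper = serde_json::from_str(r#"{"start_after": "backfill"}"#).unwrap();
        assert_eq!(single.start_after, Some(vec!["backfill".to_string()]));

        let multi: Wrapper =
            serde_json::from_str(r#"{"start_after": ["backfill", "dimensions"]}"#).unwrap();
        assert_eq!(
            multi.start_after,
            Some(vec!["backfill".to_string(), "dimensions".to_string()])
        );
    }
}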
777
778/// A data connector's configuration
779#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
780pub struct ConnectorConfig {
781 /// Transport endpoint configuration.
782 pub transport: TransportConfig,
783
784 /// Parser configuration.
785 pub format: Option<FormatConfig>,
786
787 /// Name of the index that the connector is attached to.
788 ///
789 /// This property is valid for output connectors only. It is used with data
790 /// transports and formats that expect output updates in the form of key/value
791 /// pairs, where the key typically represents a unique id associated with the
792 /// table or view.
793 ///
794 /// To support such output formats, an output connector can be attached to an
795 /// index created using the SQL CREATE INDEX statement. An index of a table
796 /// or view contains the same updates as the table or view itself, indexed by
797 /// one or more key columns.
798 ///
799 /// See individual connector documentation for details on how they work
800 /// with indexes.
801 pub index: Option<String>,
802
803 /// Output buffer configuration.
804 #[serde(flatten)]
805 pub output_buffer_config: OutputBufferConfig,
806
807 /// Maximum batch size, in records.
808 ///
809 /// This is the maximum number of records to process in one batch through
810 /// the circuit. The time and space cost of processing a batch is
811 /// asymptotically superlinear in the size of the batch, but very small
812 /// batches are less efficient due to constant factors.
813 ///
814 /// This should usually be less than `max_queued_records`, to give the
815 /// connector a round-trip time to restart and refill the buffer while
816 /// batches are being processed.
817 ///
818 /// Some input adapters might not honor this setting.
819 ///
820 /// The default is 10,000.
821 #[serde(default = "default_max_batch_size")]
822 pub max_batch_size: u64,
823
824 /// Backpressure threshold.
825 ///
826 /// Maximal number of records queued by the endpoint before the endpoint
827 /// is paused by the backpressure mechanism.
828 ///
829 /// For input endpoints, this setting bounds the number of records that have
830 /// been received from the input transport but haven't yet been consumed by
831 /// the circuit because the circuit is still busy processing
832 /// previous inputs.
833 ///
834 /// For output endpoints, this setting bounds the number of records that have
835 /// been produced by the circuit but not yet sent via the output transport endpoint
836 /// nor stored in the output buffer (see `enable_output_buffer`).
837 ///
838 /// Note that this is not a hard bound: there can be a small delay between the
839 /// moment the backpressure mechanism is triggered and the moment the endpoint
840 /// is paused, during which more data may be queued.
841 ///
842 /// The default is 1 million.
843 #[serde(default = "default_max_queued_records")]
844 pub max_queued_records: u64,
845
846 /// Create connector in paused state.
847 ///
848 /// The default is `false`.
849 #[serde(default)]
850 pub paused: bool,
851
852 /// Arbitrary user-defined text labels associated with the connector.
853 ///
854 /// These labels can be used in conjunction with the `start_after` property
855 /// to control the start order of connectors.
856 #[serde(default)]
857 pub labels: Vec<String>,
858
859 /// Start the connector after all connectors with specified labels.
860 ///
861 /// This property is used to control the start order of connectors.
862 /// The connector will not start until all connectors with the specified
863 /// labels have finished processing all inputs.
864 #[serde(deserialize_with = "deserialize_start_after")]
865 #[serde(default)]
866 pub start_after: Option<Vec<String>>,
867}
868
869#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
870#[serde(default)]
871pub struct OutputBufferConfig {
872 /// Enable output buffering.
873 ///
874 /// The output buffering mechanism allows decoupling the rate at which the pipeline
875 /// pushes changes to the output transport from the rate of input changes.
876 ///
877 /// By default, output updates produced by the pipeline are pushed directly to
878 /// the output transport. Some destinations may prefer to receive updates in fewer
879 /// bigger batches. For instance, when writing Parquet files, producing
880 /// one bigger file every few minutes is usually better than creating
881 /// small files every few milliseconds.
882 ///
883 /// To achieve such input/output decoupling, users can enable output buffering by
884 /// setting the `enable_output_buffer` flag to `true`. When buffering is enabled, output
885 /// updates produced by the pipeline are consolidated in an internal buffer and are
886 /// pushed to the output transport when one of several conditions is satisfied:
887 ///
888 /// * data has been accumulated in the buffer for more than `max_output_buffer_time_millis`
889 /// milliseconds.
890 /// * buffer size exceeds `max_output_buffer_size_records` records.
891 ///
892 /// This flag is `false` by default.
893 // TODO: on-demand output triggered via the API.
894 pub enable_output_buffer: bool,
895
896 /// Maximum time in milliseconds data is kept in the output buffer.
897 ///
898 /// By default, data is kept in the buffer indefinitely until one of
899 /// the other output conditions is satisfied. When this option is
900 /// set the buffer will be flushed at most every
901 /// `max_output_buffer_time_millis` milliseconds.
902 ///
903 /// NOTE: this configuration option requires the `enable_output_buffer` flag
904 /// to be set.
905 pub max_output_buffer_time_millis: usize,
906
907 /// Maximum number of updates to be kept in the output buffer.
908 ///
909 /// This parameter bounds the maximal size of the buffer.
910 /// Note that the size of the buffer is not always equal to the
911 /// total number of updates output by the pipeline. Updates to the
912 /// same record can overwrite or cancel previous updates.
913 ///
914 /// By default, the buffer can grow indefinitely until one of
915 /// the other output conditions is satisfied.
916 ///
917 /// NOTE: this configuration option requires the `enable_output_buffer` flag
918 /// to be set.
919 pub max_output_buffer_size_records: usize,
920}
921
922impl Default for OutputBufferConfig {
923 fn default() -> Self {
924 Self {
925 enable_output_buffer: false,
926 max_output_buffer_size_records: usize::MAX,
927 max_output_buffer_time_millis: usize::MAX,
928 }
929 }
930}
931
932impl OutputBufferConfig {
933 pub fn validate(&self) -> Result<(), String> {
934 if self.enable_output_buffer
935 && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
936 && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
937 {
938 return Err(
939 "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
940 .to_string(),
941 );
942 }
943
944 Ok(())
945 }
946}
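
// A minimal validation sketch: enabling the output buffer without either
// flush threshold is rejected, while setting one threshold is enough.
#[cfg(test)]
mod output_buffer_config_example {
    use super::OutputBufferConfig;

    #[test]
    fn buffer_requires_a_flush_condition() {
        let missing_threshold = OutputBufferConfig {
            enable_output_buffer: true,
            ..OutputBufferConfig::default()
        };
        assert!(missing_threshold.validate().is_err());

        let with_time_limit = OutputBufferConfig {
            enable_output_buffer: true,
            max_output_buffer_time_millis: 10_000,
            ..OutputBufferConfig::default()
        };
        assert!(with_time_limit.validate().is_ok());
    }
}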
947
948/// Describes an output connector configuration
949#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
950pub struct OutputEndpointConfig {
951 /// The name of the output stream of the circuit that this endpoint is
952 /// connected to.
953 pub stream: Cow<'static, str>,
954
955 /// Connector configuration.
956 #[serde(flatten)]
957 pub connector_config: ConnectorConfig,
958}
959
960/// Transport-specific endpoint configuration passed to
961/// `crate::OutputTransport::new_endpoint`
962/// and `crate::InputTransport::new_endpoint`.
963#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
964#[serde(tag = "name", content = "config", rename_all = "snake_case")]
965pub enum TransportConfig {
966 FileInput(FileInputConfig),
967 FileOutput(FileOutputConfig),
968 KafkaInput(KafkaInputConfig),
969 KafkaOutput(KafkaOutputConfig),
970 PubSubInput(PubSubInputConfig),
971 UrlInput(UrlInputConfig),
972 S3Input(S3InputConfig),
973 DeltaTableInput(DeltaTableReaderConfig),
974 DeltaTableOutput(DeltaTableWriterConfig),
975 RedisOutput(RedisOutputConfig),
976 // Prevent Rust from complaining about large size difference between enum variants.
977 IcebergInput(Box<IcebergReaderConfig>),
978 PostgresInput(PostgresReaderConfig),
979 PostgresOutput(PostgresWriterConfig),
980 Datagen(DatagenInputConfig),
981 Nexmark(NexmarkInputConfig),
982 /// Direct HTTP input: cannot be instantiated through API
983 HttpInput(HttpInputConfig),
984 /// Direct HTTP output: cannot be instantiated through API
985 HttpOutput,
986 /// Ad hoc input: cannot be instantiated through API
987 AdHocInput(AdHocInputConfig),
988}
989
990impl TransportConfig {
991 pub fn name(&self) -> String {
992 match self {
993 TransportConfig::FileInput(_) => "file_input".to_string(),
994 TransportConfig::FileOutput(_) => "file_output".to_string(),
995 TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
996 TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
997 TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
998 TransportConfig::UrlInput(_) => "url_input".to_string(),
999 TransportConfig::S3Input(_) => "s3_input".to_string(),
1000 TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
1001 TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
1002 TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
1003 TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
1004 TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
1005 TransportConfig::Datagen(_) => "datagen".to_string(),
1006 TransportConfig::Nexmark(_) => "nexmark".to_string(),
1007 TransportConfig::HttpInput(_) => "http_input".to_string(),
1008 TransportConfig::HttpOutput => "http_output".to_string(),
1009 TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
1010 TransportConfig::RedisOutput(_) => "redis_output".to_string(),
1011 }
1012 }
1013}
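
// A minimal naming sketch: `name()` matches the snake_case tag used by the
// `name`/`config` serde representation, shown here with the config-free
// `HttpOutput` variant.
#[cfg(test)]
mod transport_config_name_example {
    use super::TransportConfig;

    #[test]
    fn name_matches_serde_tag() {
        let transport = TransportConfig::HttpOutput;
        assert_eq!(transport.name(), "http_output");

        let json = serde_json::to_value(&transport).unwrap();
        assert_eq!(json["name"], "http_output");
    }
}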
1014
1015/// Data format specification used to parse raw data received from the
1016/// endpoint or to encode data sent to the endpoint.
1017#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
1018pub struct FormatConfig {
1019 /// Format name, e.g., "csv", "json", "bincode", etc.
1020 pub name: Cow<'static, str>,
1021
1022 /// Format-specific parser or encoder configuration.
1023 #[serde(default)]
1024 #[schema(value_type = Object)]
1025 pub config: YamlValue,
1026}
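
// A minimal format-configuration sketch: a YAML `format` section parses into
// `FormatConfig`, with the format-specific body kept as an opaque
// `serde_yaml::Value`. The nested `example_option` key is a placeholder.
#[cfg(test)]
mod format_config_example {
    use super::FormatConfig;

    #[test]
    fn opaque_format_body() {
        let yaml = "name: json\nconfig:\n  example_option: true\n";
        let format: FormatConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(format.name, "json");
        assert!(format.config.is_mapping());
    }
}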
1027
1028#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, ToSchema)]
1029#[serde(default)]
1030pub struct ResourceConfig {
1031 /// The minimum number of CPU cores to reserve
1032 /// for an instance of this pipeline
1033 pub cpu_cores_min: Option<u64>,
1034
1035 /// The maximum number of CPU cores to reserve
1036 /// for an instance of this pipeline
1037 pub cpu_cores_max: Option<u64>,
1038
1039 /// The minimum memory in Megabytes to reserve
1040 /// for an instance of this pipeline
1041 pub memory_mb_min: Option<u64>,
1042
1043 /// The maximum memory in Megabytes to reserve
1044 /// for an instance of this pipeline
1045 pub memory_mb_max: Option<u64>,
1046
1047 /// The total storage in Megabytes to reserve
1048 /// for an instance of this pipeline
1049 pub storage_mb_max: Option<u64>,
1050
1051 /// Storage class to use for an instance of this pipeline.
1052 /// The class determines storage performance such as IOPS and throughput.
1053 pub storage_class: Option<String>,
1054}