feldera_types/config.rs
//! Controller configuration.
//!
//! This module defines the controller configuration structure. The leaves of
//! this structure are individual transport-specific and data-format-specific
//! endpoint configs. We represent these configs as opaque JSON values, so
//! that the entire configuration tree can be deserialized from a JSON file.

use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::PostgresReaderConfig;
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use serde_yaml::Value as YamlValue;
use std::fmt::Display;
use std::path::Path;
use std::{borrow::Cow, collections::BTreeMap};
use utoipa::ToSchema;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}

/// Default maximum batch size for connectors, in records.
///
/// If you change this, update the comment on
/// [ConnectorConfig::max_batch_size].
pub const fn default_max_batch_size() -> u64 {
    10_000
}

/// Pipeline deployment configuration.
///
/// It represents configuration entries directly provided by the user
/// (e.g., runtime configuration) and entries derived from the schema
/// of the compiled program (e.g., connectors). Storage configuration,
/// if applicable, is set by the runner.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct PipelineConfig {
    /// Global controller configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,

    /// Pipeline name.
    pub name: Option<String>,

    /// Configuration for persistent storage.
    ///
    /// If `global.storage` is set, this field must be set to some
    /// [`StorageConfig`]. If `global.storage` is unset, the pipeline ignores
    /// this field.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,

    /// Input endpoint configuration.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,

    /// Output endpoint configuration.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
}
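
// A minimal sketch (not part of the original file) showing how a whole
// `PipelineConfig` deserializes from JSON: the flattened `RuntimeConfig`
// fields such as `workers` appear at the top level, and only the `inputs`
// map is mandatory. The module name and field values are illustrative;
// `serde_json` is already a dependency of this crate (see the imports above).
#[cfg(test)]
mod pipeline_config_example_tests {
    use super::PipelineConfig;

    #[test]
    fn minimal_pipeline_config_parses() {
        let cfg: PipelineConfig =
            serde_json::from_str(r#"{"name": "demo", "workers": 4, "inputs": {}}"#).unwrap();
        // `workers` lands in the flattened `global` runtime configuration.
        assert_eq!(cfg.global.workers, 4);
        assert_eq!(cfg.name.as_deref(), Some("demo"));
        assert!(cfg.inputs.is_empty());
    }
}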

/// Configuration for persistent storage in a [`PipelineConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// A directory to keep pipeline state, as a path on the filesystem of the
    /// machine or container where the pipeline will run.
    ///
    /// When storage is enabled, this directory stores the data for
    /// [StorageBackendConfig::Default].
    ///
    /// When fault tolerance is enabled, this directory stores checkpoints and
    /// the log.
    pub path: String,

    /// How to cache access to storage in this pipeline.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}

impl StorageConfig {
    pub fn path(&self) -> &Path {
        Path::new(&self.path)
    }
}

/// How to cache access to storage within a Feldera pipeline.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Use the operating system's page cache as the primary storage cache.
    ///
    /// This is the default because it currently performs better than
    /// `FelderaCache`.
    #[default]
    PageCache,

    /// Use Feldera's internal cache implementation.
    ///
    /// This is under development. It will become the default when its
    /// performance exceeds that of `PageCache`.
    FelderaCache,
}

impl StorageCacheConfig {
    /// Returns the extra `open(2)` flags implied by this cache configuration.
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            // The OS page cache needs no special flags.
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                // With Feldera's own cache, bypass the OS page cache on Linux
                // to avoid caching the same data twice.
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}

/// Storage configuration for a pipeline.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// How to connect to the underlying storage.
    pub backend: StorageBackendConfig,

    /// The minimum estimated number of bytes in a batch of data to write it to
    /// storage. This is provided for debugging and fine-tuning and should
    /// ordinarily be left unset.
    ///
    /// A value of 0 will write even empty batches to storage, and nonzero
    /// values provide a threshold. `usize::MAX` would effectively disable
    /// storage.
    ///
    /// The default is 1,048,576 (1 MiB).
    pub min_storage_bytes: Option<usize>,

    /// The form of compression to use in data batches.
    ///
    /// Compression has a CPU cost but it can take better advantage of limited
    /// NVMe and network bandwidth, which means that it can increase overall
    /// performance.
    pub compression: StorageCompression,

    /// The maximum size of the in-memory storage cache, in MiB.
    ///
    /// If set, the specified cache size is spread across all the foreground and
    /// background threads. If unset, each foreground or background thread cache
    /// is limited to 256 MiB.
    pub cache_mib: Option<usize>,
}

/// Backend storage configuration.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Use the default storage configuration.
    ///
    /// This currently uses the local file system.
    #[default]
    Default,

    /// Object storage.
    Object(ObjectStorageConfig),
}

impl Display for StorageBackendConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageBackendConfig::Default => write!(f, "default"),
            StorageBackendConfig::Object(_) => write!(f, "object"),
        }
    }
}
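
// A minimal sketch (not part of the original file) of how the adjacently
// tagged `StorageBackendConfig` is spelled in JSON: the variant goes under
// "name" and its payload under "config". The module name and URL below are
// illustrative.
#[cfg(test)]
mod storage_backend_config_example_tests {
    use super::StorageOptions;

    #[test]
    fn tagged_backend_parses() {
        let opts: StorageOptions = serde_json::from_str(
            r#"{"backend": {"name": "object", "config": {"url": "s3://bucket/path"}}}"#,
        )
        .unwrap();
        // The `Display` impl above renders the `Object` variant as "object".
        assert_eq!(opts.backend.to_string(), "object");
    }
}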

/// Storage compression algorithm.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Use Feldera's default compression algorithm.
    ///
    /// The default may change as Feldera's performance is tuned and new
    /// algorithms are introduced.
    #[default]
    Default,

    /// Do not compress.
    None,

    /// Use [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression.
    Snappy,
}

/// Configuration for accessing object storage (S3, GCS, Azure).
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// URL.
    ///
    /// The following URL schemes are supported:
    ///
    /// * S3:
    ///   - `s3://<bucket>/<path>`
    ///   - `s3a://<bucket>/<path>`
    ///   - `https://s3.<region>.amazonaws.com/<bucket>`
    ///   - `https://<bucket>.s3.<region>.amazonaws.com`
    ///   - `https://<account_id>.r2.cloudflarestorage.com/<bucket>`
    /// * Google Cloud Storage:
    ///   - `gs://<bucket>/<path>`
    /// * Microsoft Azure Blob Storage:
    ///   - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
    ///   - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
    ///   - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
    ///   - `azure://<container>/<path>` (custom)
    ///   - `https://<account>.dfs.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net`
    ///   - `https://<account>.blob.core.windows.net/<container>`
    ///   - `https://<account>.dfs.fabric.microsoft.com`
    ///   - `https://<account>.dfs.fabric.microsoft.com/<container>`
    ///   - `https://<account>.blob.fabric.microsoft.com`
    ///   - `https://<account>.blob.fabric.microsoft.com/<container>`
    ///
    /// Settings derived from the URL will override other settings.
    pub url: String,

    /// Additional options as key-value pairs.
    ///
    /// The following keys are supported:
    ///
    /// * S3:
    ///   - `access_key_id`: AWS Access Key.
    ///   - `secret_access_key`: AWS Secret Access Key.
    ///   - `region`: Region.
    ///   - `default_region`: Default region.
    ///   - `endpoint`: Custom endpoint for communicating with S3,
    ///     e.g. `https://localhost:4566` for testing against a localstack
    ///     instance.
    ///   - `token`: Token to use for requests (passed to underlying provider).
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants).
    /// * Google Cloud Storage:
    ///   - `service_account`: Path to the service account file.
    ///   - `service_account_key`: The serialized service account key.
    ///   - `google_application_credentials`: Application credentials path.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html).
    /// * Microsoft Azure Blob Storage:
    ///   - `access_key`: Azure Access Key.
    ///   - `container_name`: Azure Container Name.
    ///   - `account`: Azure Account.
    ///   - `bearer_token_authorization`: Static bearer token for authorizing requests.
    ///   - `client_id`: Client ID for use in client secret or Kubernetes federated credential flow.
    ///   - `client_secret`: Client secret for use in client secret flow.
    ///   - `tenant_id`: Tenant ID for use in client secret or Kubernetes federated credential flow.
    ///   - `endpoint`: Override the endpoint for communicating with blob storage.
    ///   - [Other keys](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants).
    ///
    /// Options set through the URL take precedence over those set with these
    /// options.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}
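
// A minimal sketch (not part of the original file) showing that, because
// `other_options` is flattened, any key other than `url` is collected into
// the options map. The bucket name and region are illustrative.
#[cfg(test)]
mod object_storage_config_example_tests {
    use super::ObjectStorageConfig;

    #[test]
    fn unknown_keys_collect_into_other_options() {
        let cfg: ObjectStorageConfig =
            serde_json::from_str(r#"{"url": "s3://my-bucket/state", "region": "us-east-1"}"#)
                .unwrap();
        assert_eq!(cfg.url, "s3://my-bucket/state");
        assert_eq!(cfg.other_options["region"], "us-east-1");
    }
}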

/// Global pipeline configuration settings. This is the publicly
/// exposed type for users to configure pipelines.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of DBSP worker threads.
    ///
    /// Each DBSP "foreground" worker thread is paired with a "background"
    /// thread for LSM merging, making the total number of threads twice the
    /// specified number.
    pub workers: u16,

    /// Storage configuration.
    ///
    /// - If this is `None`, the default, the pipeline's state is kept in
    ///   in-memory data structures. This is useful if the pipeline's state
    ///   will fit in memory and if the pipeline is ephemeral and does not need
    ///   to be recovered after a restart. The pipeline will most likely run
    ///   faster since it does not need to access storage.
    ///
    /// - If set, the pipeline's state is kept on storage. This allows the
    ///   pipeline to work with state that will not fit into memory. It also
    ///   allows the state to be checkpointed and recovered across restarts.
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,

    /// Configures fault tolerance with the specified startup behavior. Fault
    /// tolerance is disabled if this or `storage` is `None`.
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: Option<FtConfig>,

    /// Enable CPU profiler.
    ///
    /// The default value is `true`.
    pub cpu_profiler: bool,

    /// Enable pipeline tracing.
    pub tracing: bool,

    /// Jaeger tracing endpoint to send tracing information to.
    pub tracing_endpoint_jaeger: String,

    /// Minimum input batch size.
    ///
    /// The controller delays pushing input records to the circuit until at
    /// least `min_batch_size_records` records have been received (total
    /// across all endpoints) or `max_buffering_delay_usecs` microseconds
    /// have passed since at least one input record has been buffered.
    /// Defaults to 0.
    pub min_batch_size_records: u64,

    /// Maximum delay in microseconds to wait for `min_batch_size_records` to
    /// get buffered by the controller. Defaults to 0.
    pub max_buffering_delay_usecs: u64,

    /// Resource reservations and limits. This is enforced
    /// only in Feldera Cloud.
    pub resources: ResourceConfig,

    /// Real-time clock resolution in microseconds.
    ///
    /// This parameter controls the execution of queries that use the `NOW()` function. The output of such
    /// queries depends on the real-time clock and can change over time without any external
    /// inputs. The pipeline will update the clock value and trigger incremental recomputation
    /// at most once every `clock_resolution_usecs` microseconds.
    ///
    /// It is set to 100 milliseconds (100,000 microseconds) by default.
    ///
    /// Set to `null` to disable periodic clock updates.
    pub clock_resolution_usecs: Option<u64>,

    /// Optionally, a list of CPU numbers for CPUs to which the pipeline may pin
    /// its worker threads. Specify at least twice as many CPU numbers as
    /// workers. CPUs are generally numbered starting from 0. The pipeline
    /// might not be able to honor CPU pinning requests.
    ///
    /// CPU pinning can make pipelines run faster and perform more consistently,
    /// as long as different pipelines running on the same machine are pinned to
    /// different CPUs.
    pub pin_cpus: Vec<usize>,

    /// Timeout in seconds for the `Provisioning` phase of the pipeline.
    /// Setting this value overrides the runner's default.
    pub provisioning_timeout_secs: Option<u64>,
}

/// Accepts the legacy boolean forms `true` and `false` and converts them to
/// the new format.
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;

    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }

        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(BoolOrStruct)
}
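
// A minimal sketch (not part of the original file) exercising the
// backward-compatible forms this deserializer accepts: the legacy boolean and
// the full `StorageOptions` struct. The module name is illustrative.
#[cfg(test)]
mod storage_options_compat_tests {
    use super::RuntimeConfig;

    #[test]
    fn bool_and_struct_forms_parse() {
        // Legacy form: `storage: true` maps to `Some(StorageOptions::default())`.
        let cfg: RuntimeConfig = serde_json::from_str(r#"{"storage": true}"#).unwrap();
        assert!(cfg.storage.is_some());

        // Legacy form: `storage: false` maps to `None`.
        let cfg: RuntimeConfig = serde_json::from_str(r#"{"storage": false}"#).unwrap();
        assert!(cfg.storage.is_none());

        // New form: a full `StorageOptions` object.
        let cfg: RuntimeConfig =
            serde_json::from_str(r#"{"storage": {"min_storage_bytes": 1048576}}"#).unwrap();
        assert_eq!(cfg.storage.unwrap().min_storage_bytes, Some(1048576));
    }
}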

/// Accepts the legacy `initial_state` and `latest_checkpoint` string values
/// and converts them to the new format.
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<Option<FtConfig>, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;

    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = Option<FtConfig>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }

        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(Some(FtConfig::default())),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }

        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }

        fn visit_map<M>(self, map: M) -> Result<Option<FtConfig>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }

    deserializer.deserialize_any(StringOrStruct)
}
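
// A minimal sketch (not part of the original file) of the legacy string forms
// this deserializer converts, and of `null` disabling fault tolerance. The
// module name is illustrative.
#[cfg(test)]
mod fault_tolerance_compat_tests {
    use super::RuntimeConfig;

    #[test]
    fn legacy_strings_parse() {
        // Legacy string values map to the default `FtConfig`.
        let cfg: RuntimeConfig =
            serde_json::from_str(r#"{"fault_tolerance": "initial_state"}"#).unwrap();
        assert_eq!(cfg.fault_tolerance.unwrap().checkpoint_interval_secs, 60);

        // `null` disables fault tolerance.
        let cfg: RuntimeConfig = serde_json::from_str(r#"{"fault_tolerance": null}"#).unwrap();
        assert!(cfg.fault_tolerance.is_none());
    }
}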

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            workers: 8,
            storage: None,
            fault_tolerance: None,
            cpu_profiler: true,
            tracing: {
                // We discovered that the jaeger crate can use up gigabytes of RAM,
                // so it's not harmless to keep it on by default.
                false
            },
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: {
                // Every 100 ms.
                Some(100_000)
            },
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
        }
    }
}

/// Fault-tolerance configuration for runtime startup.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Interval between automatic checkpoints, in seconds.
    ///
    /// The default is 60 seconds. A value of 0 disables automatic
    /// checkpointing.
    pub checkpoint_interval_secs: u64,
}

impl Default for FtConfig {
    fn default() -> Self {
        Self {
            checkpoint_interval_secs: 60,
        }
    }
}

/// Describes an input connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// The name of the input stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Deserialize the `start_after` property of a connector configuration.
///
/// It requires a non-standard deserialization because we want to accept
/// either a string or an array of strings.
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
    D: Deserializer<'de>,
{
    let value = Option::<JsonValue>::deserialize(deserializer)?;
    match value {
        Some(JsonValue::String(s)) => Ok(Some(vec![s])),
        Some(JsonValue::Array(arr)) => {
            let vec = arr
                .into_iter()
                .map(|item| {
                    item.as_str()
                        .map(|s| s.to_string())
                        .ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
                })
                .collect::<Result<Vec<String>, _>>()?;
            Ok(Some(vec))
        }
        Some(JsonValue::Null) | None => Ok(None),
        _ => Err(serde::de::Error::custom(
            "invalid 'start_after' property: expected a string, an array of strings, or null",
        )),
    }
}
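
// A minimal sketch (not part of the original file) of the two accepted
// `start_after` spellings. It assumes the adjacently tagged `HttpOutput`
// transport variant can be written as just `{"name": "http_output"}`; the
// module name and labels are illustrative.
#[cfg(test)]
mod start_after_example_tests {
    use super::ConnectorConfig;

    #[test]
    fn string_and_array_forms_parse() {
        // A single string is wrapped into a one-element vector.
        let cfg: ConnectorConfig = serde_json::from_str(
            r#"{"transport": {"name": "http_output"}, "start_after": "backfill"}"#,
        )
        .unwrap();
        assert_eq!(cfg.start_after, Some(vec!["backfill".to_string()]));

        // An array of strings is accepted as-is.
        let cfg: ConnectorConfig = serde_json::from_str(
            r#"{"transport": {"name": "http_output"}, "start_after": ["a", "b"]}"#,
        )
        .unwrap();
        assert_eq!(cfg.start_after, Some(vec!["a".to_string(), "b".to_string()]));
    }
}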

/// A data connector's configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport endpoint configuration.
    pub transport: TransportConfig,

    /// Parser configuration.
    pub format: Option<FormatConfig>,

    /// Name of the index that the connector is attached to.
    ///
    /// This property is valid for output connectors only. It is used with data
    /// transports and formats that expect output updates in the form of key/value
    /// pairs, where the key typically represents a unique ID associated with the
    /// table or view.
    ///
    /// To support such output formats, an output connector can be attached to an
    /// index created using the SQL `CREATE INDEX` statement. An index of a table
    /// or view contains the same updates as the table or view itself, indexed by
    /// one or more key columns.
    ///
    /// See individual connector documentation for details on how they work
    /// with indexes.
    pub index: Option<String>,

    /// Output buffer configuration.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,

    /// Maximum batch size, in records.
    ///
    /// This is the maximum number of records to process in one batch through
    /// the circuit. The time and space cost of processing a batch is
    /// asymptotically superlinear in the size of the batch, but very small
    /// batches are less efficient due to constant factors.
    ///
    /// This should usually be less than `max_queued_records`, to give the
    /// connector a round-trip time to restart and refill the buffer while
    /// batches are being processed.
    ///
    /// Some input adapters might not honor this setting.
    ///
    /// The default is 10,000.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: u64,

    /// Backpressure threshold.
    ///
    /// Maximum number of records queued by the endpoint before the endpoint
    /// is paused by the backpressure mechanism.
    ///
    /// For input endpoints, this setting bounds the number of records that have
    /// been received from the input transport but haven't yet been consumed by
    /// the circuit, because the circuit is still busy processing previous
    /// inputs.
    ///
    /// For output endpoints, this setting bounds the number of records that have
    /// been produced by the circuit but not yet sent via the output transport
    /// endpoint or stored in the output buffer (see `enable_output_buffer`).
    ///
    /// Note that this is not a hard bound: there can be a small delay between
    /// the time the backpressure mechanism is triggered and the time the
    /// endpoint is paused, during which more data may be queued.
    ///
    /// The default is 1 million.
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,

    /// Create connector in paused state.
    ///
    /// The default is `false`.
    #[serde(default)]
    pub paused: bool,

    /// Arbitrary user-defined text labels associated with the connector.
    ///
    /// These labels can be used in conjunction with the `start_after` property
    /// to control the start order of connectors.
    #[serde(default)]
    pub labels: Vec<String>,

    /// Start the connector after all connectors with the specified labels.
    ///
    /// This property is used to control the start order of connectors.
    /// The connector will not start until all connectors with the specified
    /// labels have finished processing all inputs.
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enable output buffering.
    ///
    /// The output buffering mechanism allows decoupling the rate at which the pipeline
    /// pushes changes to the output transport from the rate of input changes.
    ///
    /// By default, output updates produced by the pipeline are pushed directly to
    /// the output transport. Some destinations may prefer to receive updates in
    /// fewer, bigger batches. For instance, when writing Parquet files, producing
    /// one bigger file every few minutes is usually better than creating
    /// small files every few milliseconds.
    ///
    /// To achieve such input/output decoupling, users can enable output buffering by
    /// setting the `enable_output_buffer` flag to `true`. When buffering is enabled, output
    /// updates produced by the pipeline are consolidated in an internal buffer and are
    /// pushed to the output transport when one of several conditions is satisfied:
    ///
    /// * data has been accumulated in the buffer for more than
    ///   `max_output_buffer_time_millis` milliseconds;
    /// * the buffer size exceeds `max_output_buffer_size_records` records.
    ///
    /// This flag is `false` by default.
    // TODO: on-demand output triggered via the API.
    pub enable_output_buffer: bool,

    /// Maximum time in milliseconds data is kept in the output buffer.
    ///
    /// By default, data is kept in the buffer indefinitely until one of
    /// the other output conditions is satisfied. When this option is
    /// set, the buffer is flushed once its oldest data has been buffered
    /// for `max_output_buffer_time_millis` milliseconds.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_time_millis: usize,

    /// Maximum number of updates to be kept in the output buffer.
    ///
    /// This parameter bounds the maximum size of the buffer.
    /// Note that the size of the buffer is not always equal to the
    /// total number of updates output by the pipeline. Updates to the
    /// same record can overwrite or cancel previous updates.
    ///
    /// By default, the buffer can grow indefinitely until one of
    /// the other output conditions is satisfied.
    ///
    /// NOTE: this configuration option requires the `enable_output_buffer` flag
    /// to be set.
    pub max_output_buffer_size_records: usize,
}

impl Default for OutputBufferConfig {
    fn default() -> Self {
        Self {
            enable_output_buffer: false,
            max_output_buffer_size_records: usize::MAX,
            max_output_buffer_time_millis: usize::MAX,
        }
    }
}

impl OutputBufferConfig {
    pub fn validate(&self) -> Result<(), String> {
        if self.enable_output_buffer
            && self.max_output_buffer_size_records == Self::default().max_output_buffer_size_records
            && self.max_output_buffer_time_millis == Self::default().max_output_buffer_time_millis
        {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }

        Ok(())
    }
}
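
// A minimal sketch (not part of the original file) of the validation rule
// above: enabling the buffer without bounding its size or age is rejected,
// and setting either bound makes the configuration valid. The module name
// and the 10-second bound are illustrative.
#[cfg(test)]
mod output_buffer_validation_tests {
    use super::OutputBufferConfig;

    #[test]
    fn buffer_requires_a_flush_condition() {
        // No size or time bound: rejected.
        let cfg = OutputBufferConfig {
            enable_output_buffer: true,
            ..OutputBufferConfig::default()
        };
        assert!(cfg.validate().is_err());

        // A time bound alone is sufficient.
        let cfg = OutputBufferConfig {
            enable_output_buffer: true,
            max_output_buffer_time_millis: 10_000,
            ..OutputBufferConfig::default()
        };
        assert!(cfg.validate().is_ok());
    }
}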

/// Describes an output connector configuration.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// The name of the output stream of the circuit that this endpoint is
    /// connected to.
    pub stream: Cow<'static, str>,

    /// Connector configuration.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}

/// Transport-specific endpoint configuration passed to
/// `crate::OutputTransport::new_endpoint`
/// and `crate::InputTransport::new_endpoint`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Box the config to prevent Rust from complaining about the large size
    // difference between enum variants.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    Datagen(DatagenInputConfig),
    Nexmark(NexmarkInputConfig),
    /// Direct HTTP input: cannot be instantiated through the API.
    HttpInput(HttpInputConfig),
    /// Direct HTTP output: cannot be instantiated through the API.
    HttpOutput,
    /// Ad hoc input: cannot be instantiated through the API.
    AdHocInput(AdHocInputConfig),
}

impl TransportConfig {
    pub fn name(&self) -> String {
        match self {
            TransportConfig::FileInput(_) => "file_input".to_string(),
            TransportConfig::FileOutput(_) => "file_output".to_string(),
            TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
            TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
            TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
            TransportConfig::UrlInput(_) => "url_input".to_string(),
            TransportConfig::S3Input(_) => "s3_input".to_string(),
            TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
            TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
            TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
            TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
            TransportConfig::Datagen(_) => "datagen".to_string(),
            TransportConfig::Nexmark(_) => "nexmark".to_string(),
            TransportConfig::HttpInput(_) => "http_input".to_string(),
            TransportConfig::HttpOutput => "http_output".to_string(),
            TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
            TransportConfig::RedisOutput(_) => "redis_output".to_string(),
        }
    }
}
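
// A minimal sketch (not part of the original file) of the adjacently tagged
// wire format: serde puts the snake_case variant name under "name" and omits
// "config" for the unit variant, matching the string returned by `name()`.
// The module name is illustrative.
#[cfg(test)]
mod transport_config_example_tests {
    use super::TransportConfig;

    #[test]
    fn tagged_representation_matches_name() {
        let json = serde_json::to_value(TransportConfig::HttpOutput).unwrap();
        assert_eq!(json, serde_json::json!({"name": "http_output"}));
        assert_eq!(TransportConfig::HttpOutput.name(), "http_output");
    }
}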

/// Data format specification used to parse raw data received from the
/// endpoint or to encode data sent to the endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name, e.g., "csv", "json", "bincode", etc.
    pub name: Cow<'static, str>,

    /// Format-specific parser or encoder configuration.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: YamlValue,
}

#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// The minimum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_min: Option<u64>,

    /// The maximum number of CPU cores to reserve
    /// for an instance of this pipeline.
    pub cpu_cores_max: Option<u64>,

    /// The minimum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_min: Option<u64>,

    /// The maximum memory in megabytes to reserve
    /// for an instance of this pipeline.
    pub memory_mb_max: Option<u64>,

    /// The total storage in megabytes to reserve
    /// for an instance of this pipeline.
    pub storage_mb_max: Option<u64>,

    /// Storage class to use for an instance of this pipeline.
    /// The class determines storage performance such as IOPS and throughput.
    pub storage_class: Option<String>,
818}