use crate::preprocess::PreprocessorConfig;
use crate::program_schema::ProgramSchema;
use crate::secret_resolver::default_secrets_directory;
use crate::transport::adhoc::AdHocInputConfig;
use crate::transport::clock::ClockConfig;
use crate::transport::datagen::DatagenInputConfig;
use crate::transport::delta_table::{DeltaTableReaderConfig, DeltaTableWriterConfig};
use crate::transport::file::{FileInputConfig, FileOutputConfig};
use crate::transport::http::HttpInputConfig;
use crate::transport::iceberg::IcebergReaderConfig;
use crate::transport::kafka::{KafkaInputConfig, KafkaOutputConfig};
use crate::transport::nats::NatsInputConfig;
use crate::transport::nexmark::NexmarkInputConfig;
use crate::transport::postgres::{PostgresReaderConfig, PostgresWriterConfig};
use crate::transport::pubsub::PubSubInputConfig;
use crate::transport::redis::RedisOutputConfig;
use crate::transport::s3::S3InputConfig;
use crate::transport::url::UrlInputConfig;
use core::fmt;
use feldera_ir::{MirNode, MirNodeId};
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::{borrow::Cow, cmp::max, collections::BTreeMap};
use utoipa::ToSchema;
use utoipa::openapi::{ObjectBuilder, OneOfBuilder, Ref, RefOr, Schema, SchemaType};
pub mod dev_tweaks;
pub use dev_tweaks::DevTweaks;
/// Cap on concurrently-initializing connectors used when
/// `RuntimeConfig::max_parallel_connector_init` is unset
/// (see `PipelineConfig::max_parallel_connector_init`).
const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;
/// Serde default for `ConnectorConfig::max_queued_records`.
pub const fn default_max_queued_records() -> u64 {
    1_000_000
}
/// Default per-worker batch-size cap.
// NOTE(review): not referenced in this file — presumably consumed elsewhere
// in the crate alongside `ConnectorConfig::max_worker_batch_size`; confirm.
pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;
/// Default clock resolution in microseconds (1 second); used by
/// `RuntimeConfig::default()` for `clock_resolution_usecs`.
pub const DEFAULT_CLOCK_RESOLUTION_USECS: u64 = 1_000_000;
/// Intermediate representation of a compiled program: the MIR node graph
/// together with the program's schema.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramIr {
    /// MIR nodes, keyed by node id.
    pub mir: HashMap<MirNodeId, MirNode>,
    /// Schema of the program (see [`ProgramSchema`]).
    pub program_schema: ProgramSchema,
}
/// Complete configuration of a pipeline: global runtime settings plus the
/// attached input and output connector endpoints.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub struct PipelineConfig {
    /// Global runtime settings, flattened into the top level of the
    /// serialized form.
    #[serde(flatten)]
    #[schema(inline)]
    pub global: RuntimeConfig,
    /// Multihost deployment settings, if any.
    pub multihost: Option<MultihostConfig>,
    /// Pipeline name.
    pub name: Option<String>,
    /// Alternative name for the pipeline.
    // NOTE(review): the exact relationship between `name` and `given_name`
    // is not visible in this file — confirm with callers.
    pub given_name: Option<String>,
    /// Location and cache mode of the storage layer; paired with
    /// `global.storage` via `with_storage()`/`storage()`.
    #[serde(default)]
    pub storage_config: Option<StorageConfig>,
    /// Directory from which secrets are resolved; falls back to
    /// `default_secrets_directory()` when unset (see `secrets_dir()`).
    pub secrets_dir: Option<String>,
    /// Input endpoints, keyed by endpoint name.
    #[serde(default)]
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,
    /// Output endpoints, keyed by endpoint name.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
    /// Compiled program IR, if available; omitted from `display_summary()`.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}
impl PipelineConfig {
pub fn max_parallel_connector_init(&self) -> u64 {
max(
self.global
.max_parallel_connector_init
.unwrap_or(DEFAULT_MAX_PARALLEL_CONNECTOR_INIT),
1,
)
}
pub fn with_storage(self, storage: Option<(StorageConfig, StorageOptions)>) -> Self {
let (storage_config, storage_options) = storage.unzip();
Self {
global: RuntimeConfig {
storage: storage_options,
..self.global
},
storage_config,
..self
}
}
pub fn storage(&self) -> Option<(&StorageConfig, &StorageOptions)> {
let storage_options = self.global.storage.as_ref();
let storage_config = self.storage_config.as_ref();
storage_config.zip(storage_options)
}
pub fn secrets_dir(&self) -> &Path {
match &self.secrets_dir {
Some(dir) => Path::new(dir.as_str()),
None => default_secrets_directory(),
}
}
pub fn display_summary(&self) -> String {
let summary = serde_json::json!({
"name": self.name,
"given_name": self.given_name,
"global": self.global,
"storage_config": self.storage_config,
"secrets_dir": self.secrets_dir,
"inputs": self.inputs,
"outputs": self.outputs
});
serde_json::to_string_pretty(&summary).unwrap_or_else(|_| "{}".to_string())
}
}
/// The connector- and program-related subset of [`PipelineConfig`].
#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone)]
pub struct PipelineConfigProgramInfo {
    /// Input endpoints, keyed by endpoint name.
    // NOTE(review): unlike `PipelineConfig::inputs`, this field carries no
    // `#[serde(default)]`, making it required when deserializing — confirm
    // whether that asymmetry is intentional.
    pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,
    /// Output endpoints, keyed by endpoint name.
    #[serde(default)]
    pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
    /// Compiled program IR, if available.
    #[serde(default)]
    pub program_ir: Option<ProgramIr>,
}
/// Settings for running a pipeline across multiple hosts.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct MultihostConfig {
    /// Number of hosts the pipeline is distributed over (default: 1).
    pub hosts: usize,
}
impl Default for MultihostConfig {
fn default() -> Self {
Self { hosts: 1 }
}
}
/// Location and cache mode of the pipeline's storage layer.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StorageConfig {
    /// Filesystem path where storage lives (see `path()`).
    pub path: String,
    /// How the OS cache interacts with storage I/O.
    #[serde(default)]
    pub cache: StorageCacheConfig,
}
impl StorageConfig {
    /// The storage location as a filesystem path.
    pub fn path(&self) -> &Path {
        self.path.as_ref()
    }
}
/// How storage I/O interacts with the operating-system cache.
#[derive(Copy, Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCacheConfig {
    /// Rely on the OS page cache (the default).
    #[default]
    PageCache,
    /// Bypass the OS page cache (`O_DIRECT` on Linux; see
    /// `to_custom_open_flags`) in favor of Feldera's own caching.
    FelderaCache,
}
impl StorageCacheConfig {
    /// Extra `open(2)` flags implied by the cache mode.
    ///
    /// `FelderaCache` adds `O_DIRECT` on Linux to bypass the OS page cache;
    /// every other combination (including `FelderaCache` on non-Linux unix
    /// targets) adds no flags.
    #[cfg(unix)]
    pub fn to_custom_open_flags(&self) -> i32 {
        match self {
            // Use the OS page cache: no special flags needed.
            StorageCacheConfig::PageCache => (),
            StorageCacheConfig::FelderaCache => {
                #[cfg(target_os = "linux")]
                return libc::O_DIRECT;
            }
        }
        0
    }
}
/// Tuning options for the storage layer.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct StorageOptions {
    /// Which storage backend to use (default, file, or object store).
    pub backend: StorageBackendConfig,
    // NOTE(review): the exact semantics of the two `min_*_bytes` thresholds
    // (presumably spill-to-storage cutoffs) are not visible in this file —
    // confirm against the storage implementation.
    pub min_storage_bytes: Option<usize>,
    pub min_step_storage_bytes: Option<usize>,
    /// Compression applied to stored data.
    pub compression: StorageCompression,
    /// Cache size in MiB.
    // NOTE(review): presumably the in-memory storage cache budget — confirm.
    pub cache_mib: Option<usize>,
}
/// Storage backend selection, serialized as `{"name": ..., "config": ...}`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum StorageBackendConfig {
    /// Let the system pick the backend.
    #[default]
    Default,
    /// File-based backend (boxed to keep the enum small).
    File(Box<FileBackendConfig>),
    /// Object-store backend.
    Object(ObjectStorageConfig),
}
impl Display for StorageBackendConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StorageBackendConfig::Default => write!(f, "default"),
StorageBackendConfig::File(_) => write!(f, "file"),
StorageBackendConfig::Object(_) => write!(f, "object"),
}
}
}
/// Compression applied to data written to storage.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum StorageCompression {
    /// Let the system pick the compression scheme.
    #[default]
    Default,
    /// No compression.
    None,
    /// Snappy compression.
    Snappy,
}
/// Which checkpoint to resume from when syncing from a bucket.
///
/// Serialized as the string `"latest"` or a UUID string (see the manual
/// `Serialize`/`Deserialize`/`ToSchema` impls below).
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StartFromCheckpoint {
    /// Resume from the most recent checkpoint.
    Latest,
    /// Resume from the checkpoint with this UUID.
    Uuid(uuid::Uuid),
}
impl ToSchema<'_> for StartFromCheckpoint {
    /// Hand-written OpenAPI schema mirroring the custom serde impls below:
    /// a nullable `oneOf` of either the literal string `"latest"` or a
    /// UUID-formatted string.
    fn schema() -> (
        &'static str,
        utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
    ) {
        (
            "StartFromCheckpoint",
            utoipa::openapi::RefOr::T(Schema::OneOf(
                OneOfBuilder::new()
                    // Alternative 1: the fixed string "latest".
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .enum_values(Some(["latest"].into_iter()))
                            .build(),
                    )
                    // Alternative 2: a UUID string.
                    .item(
                        ObjectBuilder::new()
                            .schema_type(SchemaType::String)
                            .format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
                                utoipa::openapi::KnownFormat::Uuid,
                            )))
                            .build(),
                    )
                    .nullable(true)
                    .build(),
            )),
        )
    }
}
impl<'de> Deserialize<'de> for StartFromCheckpoint {
    /// Deserializes from a string: `"latest"` maps to [`StartFromCheckpoint::Latest`],
    /// any other value must parse as a UUID.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct StartFromCheckpointVisitor;
        impl<'de> Visitor<'de> for StartFromCheckpointVisitor {
            type Value = StartFromCheckpoint;
            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a UUID string or the string \"latest\"")
            }
            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                if value == "latest" {
                    Ok(StartFromCheckpoint::Latest)
                } else {
                    // Anything other than "latest" must be a valid UUID;
                    // a parse failure becomes an invalid-value error.
                    uuid::Uuid::parse_str(value)
                        .map(StartFromCheckpoint::Uuid)
                        .map_err(|_| E::invalid_value(serde::de::Unexpected::Str(value), &self))
                }
            }
        }
        deserializer.deserialize_str(StartFromCheckpointVisitor)
    }
}
impl Serialize for StartFromCheckpoint {
    /// Mirror of the deserializer: the string `"latest"` or the UUID in its
    /// canonical hyphenated form.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let repr = match self {
            Self::Latest => "latest".to_string(),
            Self::Uuid(id) => id.to_string(),
        };
        serializer.serialize_str(&repr)
    }
}
/// Configuration for synchronizing checkpoints with an object-store bucket.
// NOTE(review): several knobs (`transfers`, `checkers`, `ignore_checksum`,
// `multi_thread_*`, `flags`) look like pass-throughs to an underlying sync
// tool — confirm against the sync implementation.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct SyncConfig {
    /// Object-store endpoint URL, if not the provider default.
    pub endpoint: Option<String>,
    /// Bucket checkpoints are pushed to (must differ from `read_bucket`).
    pub bucket: String,
    /// Object-store region.
    pub region: Option<String>,
    /// Object-store provider name.
    pub provider: Option<String>,
    /// Access credentials.
    pub access_key: Option<String>,
    pub secret_key: Option<String>,
    /// Checkpoint to resume from on startup; required in standby mode
    /// (see `validate`).
    pub start_from_checkpoint: Option<StartFromCheckpoint>,
    /// Fail startup if no checkpoint is found.
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub fail_if_no_checkpoint: bool,
    pub transfers: Option<u8>,
    pub checkers: Option<u8>,
    pub ignore_checksum: Option<bool>,
    pub multi_thread_streams: Option<u8>,
    pub multi_thread_cutoff: Option<String>,
    pub upload_concurrency: Option<u8>,
    /// Run as a standby that continuously pulls checkpoints; requires
    /// `start_from_checkpoint` (see `validate`).
    #[schema(default = std::primitive::bool::default)]
    #[serde(default)]
    pub standby: bool,
    /// Interval between checkpoint pulls in standby mode; defaults to 10.
    // NOTE(review): presumably seconds — confirm with the consumer.
    #[schema(default = default_pull_interval)]
    #[serde(default = "default_pull_interval")]
    pub pull_interval: u64,
    /// Interval between checkpoint pushes, if periodic pushing is enabled.
    #[serde(default)]
    pub push_interval: Option<u64>,
    /// Extra flags passed through verbatim.
    pub flags: Option<Vec<String>>,
    /// Minimum number of checkpoints retained; defaults to 10.
    #[schema(default = default_retention_min_count)]
    #[serde(default = "default_retention_min_count")]
    pub retention_min_count: u32,
    /// Minimum checkpoint age before eligibility for removal; defaults to 30.
    // NOTE(review): unit (seconds/minutes/days) not visible here — confirm.
    #[schema(default = default_retention_min_age)]
    #[serde(default = "default_retention_min_age")]
    pub retention_min_age: u32,
    /// Separate bucket to read checkpoints from; must differ from `bucket`
    /// (see `validate`).
    #[serde(default)]
    pub read_bucket: Option<String>,
}
/// Serde default for `SyncConfig::pull_interval`.
fn default_pull_interval() -> u64 {
    const PULL_INTERVAL: u64 = 10;
    PULL_INTERVAL
}
/// Serde default for `SyncConfig::retention_min_count`.
fn default_retention_min_count() -> u32 {
    const MIN_COUNT: u32 = 10;
    MIN_COUNT
}
/// Serde default for `SyncConfig::retention_min_age`.
fn default_retention_min_age() -> u32 {
    const MIN_AGE: u32 = 30;
    MIN_AGE
}
impl SyncConfig {
pub fn validate(&self) -> Result<(), String> {
if self.standby && self.start_from_checkpoint.is_none() {
return Err(r#"invalid sync config: `standby` set to `true` but `start_from_checkpoint` not set.
Standby mode requires `start_from_checkpoint` to be set.
Consider setting `start_from_checkpoint` to `"latest"`."#.to_owned());
}
if let Some(ref rb) = self.read_bucket
&& rb == &self.bucket
{
return Err(
"invalid sync config: `read_bucket` and `bucket` must point to different locations"
.to_owned(),
);
}
Ok(())
}
}
/// Reference to a pipeline template stored in a ConfigMap.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct PipelineTemplateConfig {
    /// Name of the ConfigMap holding the template.
    // NOTE(review): inferred from the `pipeline_template_configmap` field
    // name in `RuntimeConfig` — confirm.
    pub name: String,
    /// Key within the ConfigMap; defaults to "pipelineTemplate".
    #[schema(default = default_pipeline_template_key)]
    #[serde(default = "default_pipeline_template_key")]
    pub key: String,
}
/// Serde default for `PipelineTemplateConfig::key`.
fn default_pipeline_template_key() -> String {
    String::from("pipelineTemplate")
}
/// Configuration of an object-store storage backend.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ObjectStorageConfig {
    /// Object-store URL.
    pub url: String,
    /// Any additional provider-specific options, captured verbatim from the
    /// remaining key/value pairs of the serialized form.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, String>,
}
/// Configuration of the file-based storage backend.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct FileBackendConfig {
    /// Whether to perform I/O on dedicated async threads.
    // NOTE(review): inferred from the name — confirm against the backend.
    pub async_threads: Option<bool>,
    /// Artificial delay injected per I/O operation.
    // NOTE(review): unit and purpose (testing?) not visible here — confirm.
    pub ioop_delay: Option<u64>,
    /// Optional checkpoint synchronization with an object-store bucket.
    pub sync: Option<SyncConfig>,
}
/// Global runtime settings of a pipeline.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct RuntimeConfig {
    /// Number of worker threads (default: 8).
    pub workers: u16,
    /// Resident-set-size cap.
    // NOTE(review): presumably MiB, per the name — confirm with the enforcer.
    pub max_rss_mb: Option<u64>,
    /// Number of hosts (default: 1).
    pub hosts: usize,
    /// Storage options; `None` means storage is disabled. Accepts a bool or
    /// a full map on input (see `deserialize_storage_options`).
    #[serde(deserialize_with = "deserialize_storage_options")]
    pub storage: Option<StorageOptions>,
    /// Fault-tolerance settings. Accepts `null`, a map, or the strings
    /// "initial_state"/"latest_checkpoint" (see `deserialize_fault_tolerance`).
    #[serde(deserialize_with = "deserialize_fault_tolerance")]
    pub fault_tolerance: FtConfig,
    /// Enable the CPU profiler (default: true).
    pub cpu_profiler: bool,
    /// Enable tracing (default: false).
    pub tracing: bool,
    /// Jaeger agent endpoint for tracing (default: 127.0.0.1:6831).
    pub tracing_endpoint_jaeger: String,
    pub min_batch_size_records: u64,
    pub max_buffering_delay_usecs: u64,
    /// Resource requests/limits for deployment.
    pub resources: ResourceConfig,
    /// Clock resolution in microseconds; defaults to 1 second.
    pub clock_resolution_usecs: Option<u64>,
    /// CPU cores to pin worker threads to.
    // NOTE(review): inferred from the name — confirm semantics.
    pub pin_cpus: Vec<usize>,
    pub provisioning_timeout_secs: Option<u64>,
    /// Cap on concurrently-initializing connectors; see
    /// `PipelineConfig::max_parallel_connector_init` for the effective value.
    pub max_parallel_connector_init: Option<u64>,
    /// Opaque init-container spec passed through as JSON.
    pub init_containers: Option<serde_json::Value>,
    /// Take a checkpoint when suspending (default: true).
    pub checkpoint_during_suspend: bool,
    pub http_workers: Option<u64>,
    pub io_workers: Option<u64>,
    /// Extra environment variables for the pipeline process.
    #[serde(default)]
    pub env: BTreeMap<String, String>,
    /// Development-only tweaks; see [`DevTweaks`].
    pub dev_tweaks: DevTweaks,
    /// Logging configuration string.
    pub logging: Option<String>,
    /// Pipeline template ConfigMap reference, if any.
    pub pipeline_template_configmap: Option<PipelineTemplateConfig>,
}
/// Deserializes `RuntimeConfig::storage`, accepting several shapes for
/// backward compatibility:
/// `true` → default [`StorageOptions`]; `false`/`null`/missing → `None`;
/// a map → fully parsed [`StorageOptions`].
fn deserialize_storage_options<'de, D>(deserializer: D) -> Result<Option<StorageOptions>, D::Error>
where
    D: Deserializer<'de>,
{
    struct BoolOrStruct;
    impl<'de> Visitor<'de> for BoolOrStruct {
        type Value = Option<StorageOptions>;
        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("boolean or StorageOptions")
        }
        // `true` enables storage with all-default options; `false` disables it.
        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                false => Ok(None),
                true => Ok(Some(StorageOptions::default())),
            }
        }
        // `null` disables storage.
        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }
        // A missing value disables storage.
        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(None)
        }
        // A map is parsed as a full `StorageOptions`.
        fn visit_map<M>(self, map: M) -> Result<Option<StorageOptions>, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)).map(Some)
        }
    }
    deserializer.deserialize_any(BoolOrStruct)
}
/// Deserializes `RuntimeConfig::fault_tolerance`, accepting several shapes
/// for backward compatibility:
/// the strings `"initial_state"` or `"latest_checkpoint"` → default model
/// enabled; `null`/missing → default config (disabled); a map → fully parsed
/// [`FtConfig`].
fn deserialize_fault_tolerance<'de, D>(deserializer: D) -> Result<FtConfig, D::Error>
where
    D: Deserializer<'de>,
{
    struct StringOrStruct;
    impl<'de> Visitor<'de> for StringOrStruct {
        type Value = FtConfig;
        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str("none or FtConfig or 'initial_state' or 'latest_checkpoint'")
        }
        // Legacy string forms; both enable FT with the default model.
        // NOTE(review): the expecting message above mentions "none", but this
        // visitor rejects the bare string "none" — confirm whether that
        // should be accepted here too.
        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            match v {
                "initial_state" | "latest_checkpoint" => Ok(FtConfig {
                    model: Some(FtModel::default()),
                    ..FtConfig::default()
                }),
                _ => Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)),
            }
        }
        // `null` yields the default (FT disabled).
        fn visit_unit<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }
        // A missing value yields the default (FT disabled).
        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            Ok(FtConfig::default())
        }
        // A map is parsed as a full `FtConfig`.
        fn visit_map<M>(self, map: M) -> Result<FtConfig, M::Error>
        where
            M: MapAccess<'de>,
        {
            Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))
        }
    }
    deserializer.deserialize_any(StringOrStruct)
}
impl Default for RuntimeConfig {
    /// Defaults for a runnable pipeline: 8 workers, storage enabled with
    /// default options, fault tolerance disabled, CPU profiler on, tracing
    /// off, and a 1-second clock resolution.
    fn default() -> Self {
        Self {
            workers: 8,
            max_rss_mb: None,
            hosts: 1,
            storage: Some(StorageOptions::default()),
            fault_tolerance: FtConfig::default(),
            cpu_profiler: true,
            // Redundant block expressions (`{ false }`, `{ Some(...) }`)
            // removed; values are unchanged.
            tracing: false,
            tracing_endpoint_jaeger: "127.0.0.1:6831".to_string(),
            min_batch_size_records: 0,
            max_buffering_delay_usecs: 0,
            resources: ResourceConfig::default(),
            clock_resolution_usecs: Some(DEFAULT_CLOCK_RESOLUTION_USECS),
            pin_cpus: Vec::new(),
            provisioning_timeout_secs: None,
            max_parallel_connector_init: None,
            init_containers: None,
            checkpoint_during_suspend: true,
            io_workers: None,
            http_workers: None,
            env: BTreeMap::default(),
            dev_tweaks: DevTweaks::default(),
            logging: None,
            pipeline_template_configmap: None,
        }
    }
}
/// Fault-tolerance settings.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub struct FtConfig {
    /// Selected fault-tolerance model; `None` disables fault tolerance and
    /// round-trips as the string "none" (see `none_as_string`). When the
    /// field is absent in a map, `default_model` enables the default model.
    #[serde(with = "none_as_string")]
    #[serde(default = "default_model")]
    #[schema(
        schema_with = none_as_string_schema::<FtModel>,
    )]
    pub model: Option<FtModel>,
    /// Seconds between automatic checkpoints; defaults to 60 and is clamped
    /// to [1, 3600] when used (see `checkpoint_interval`).
    #[serde(default = "default_checkpoint_interval_secs")]
    pub checkpoint_interval_secs: Option<u64>,
}
fn default_model() -> Option<FtModel> {
Some(FtModel::default())
}
/// Serde default for `FtConfig::checkpoint_interval_secs`: one minute.
pub fn default_checkpoint_interval_secs() -> Option<u64> {
    const DEFAULT_SECS: u64 = 60;
    Some(DEFAULT_SECS)
}
impl Default for FtConfig {
fn default() -> Self {
Self {
model: None,
checkpoint_interval_secs: default_checkpoint_interval_secs(),
}
}
}
#[cfg(test)]
mod test {
    use super::deserialize_fault_tolerance;
    use crate::config::{FtConfig, FtModel};
    use serde::{Deserialize, Serialize};
    /// Exercises the lenient `fault_tolerance` deserializer and the
    /// `none_as_string` encoding of `FtConfig::model`.
    #[test]
    fn ft_config() {
        #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
        #[serde(default)]
        struct Wrapper {
            #[serde(deserialize_with = "deserialize_fault_tolerance")]
            config: FtConfig,
        }
        // Missing field, explicit null, and `"model": "none"` all yield the
        // disabled-FT default with the standard checkpoint interval.
        for s in [
            "{}",
            r#"{"config": null}"#,
            r#"{"config": {"model": "none"}}"#,
        ] {
            let config: Wrapper = serde_json::from_str(s).unwrap();
            assert_eq!(
                config,
                Wrapper {
                    config: FtConfig {
                        model: None,
                        checkpoint_interval_secs: Some(60)
                    }
                }
            );
        }
        // A disabled model serializes as the string "none".
        let s = serde_json::to_string(&Wrapper {
            config: FtConfig::default(),
        })
        .unwrap();
        assert!(s.contains("\"none\""));
        // Deserializing `FtConfig` directly: a missing `model` falls back to
        // `default_model`, i.e. fault tolerance enabled.
        for s in [r#"{"config": {}}"#, r#"{"checkpoint_interval_secs": 60}"#] {
            assert_eq!(
                serde_json::from_str::<FtConfig>(s).unwrap(),
                FtConfig {
                    model: Some(FtModel::default()),
                    checkpoint_interval_secs: Some(60)
                }
            );
        }
        // An explicit null interval is preserved as `None`.
        assert_eq!(
            serde_json::from_str::<FtConfig>(r#"{"checkpoint_interval_secs": null}"#).unwrap(),
            FtConfig {
                model: Some(FtModel::default()),
                checkpoint_interval_secs: None
            }
        );
    }
}
impl FtConfig {
    /// Whether fault tolerance is turned on, i.e. a model is selected.
    pub fn is_enabled(&self) -> bool {
        self.model.is_some()
    }

    /// Interval between automatic checkpoints, clamped to [1 s, 1 h].
    ///
    /// `None` when fault tolerance is disabled or no interval is configured.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        // No model selected => fault tolerance off => no checkpointing.
        self.model?;
        let secs = self.checkpoint_interval_secs?;
        Some(Duration::from_secs(secs.clamp(1, 3600)))
    }
}
mod none_as_string {
    //! (De)serialization of `Option<T>` where `None` is represented by the
    //! string `"none"` (case-insensitive on input) rather than `null`.
    use std::marker::PhantomData;
    use serde::de::{Deserialize, Deserializer, IntoDeserializer, Visitor};
    use serde::ser::{Serialize, Serializer};
    /// Serializes `Some(v)` as `v` itself and `None` as the string `"none"`.
    pub(super) fn serialize<S, T>(value: &Option<T>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
        T: Serialize,
    {
        match value.as_ref() {
            Some(value) => value.serialize(serializer),
            None => "none".serialize(serializer),
        }
    }
    /// Visitor that maps the string `"none"` (any case) to `None` and
    /// delegates every other string to `T`'s own deserializer.
    struct NoneAsString<T>(PhantomData<fn() -> T>);
    impl<'de, T> Visitor<'de> for NoneAsString<T>
    where
        T: Deserialize<'de>,
    {
        type Value = Option<T>;
        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("string")
        }
        fn visit_none<E>(self) -> Result<Self::Value, E>
        where
            E: serde::de::Error,
        {
            Ok(None)
        }
        fn visit_str<E>(self, value: &str) -> Result<Option<T>, E>
        where
            E: serde::de::Error,
        {
            // Case-insensitive comparison without allocating a lowercased
            // copy (was `&value.to_ascii_lowercase() == "none"`).
            if value.eq_ignore_ascii_case("none") {
                Ok(None)
            } else {
                Ok(Some(T::deserialize(value.into_deserializer())?))
            }
        }
    }
    /// Deserializes an `Option<T>` from a string per the module contract.
    pub(super) fn deserialize<'de, D, T>(deserializer: D) -> Result<Option<T>, D::Error>
    where
        D: Deserializer<'de>,
        T: Deserialize<'de>,
    {
        deserializer.deserialize_str(NoneAsString(PhantomData))
    }
}
/// OpenAPI schema for a field serialized via `none_as_string`: a `oneOf` of
/// either `T`'s own schema (by reference) or the literal string `"none"`,
/// with `T::default()` advertised as the default value.
fn none_as_string_schema<'a, T: ToSchema<'a> + Default + Serialize>() -> Schema {
    Schema::OneOf(
        OneOfBuilder::new()
            .item(RefOr::Ref(Ref::new(format!(
                "#/components/schemas/{}",
                T::schema().0
            ))))
            .item(
                ObjectBuilder::new()
                    .schema_type(SchemaType::String)
                    .enum_values(Some(vec!["none"])),
            )
            .default(Some(
                serde_json::to_value(T::default()).expect("Failed to serialize default value"),
            ))
            .build(),
    )
}
/// Fault-tolerance model.
// NOTE(review): variant semantics inferred from the standard at-least-once /
// exactly-once terminology — confirm against the runtime's guarantees.
#[derive(
    Debug, Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum FtModel {
    /// At-least-once delivery.
    AtLeastOnce,
    /// Exactly-once delivery (the default).
    #[default]
    ExactlyOnce,
}
impl FtModel {
    /// Like [`FtModel::as_str`], with `None` rendered as `"no"` (as in
    /// "no fault tolerance").
    pub fn option_as_str(value: Option<FtModel>) -> &'static str {
        match value {
            Some(model) => model.as_str(),
            None => "no",
        }
    }

    /// Snake-case name of the model, matching its serde representation.
    pub fn as_str(&self) -> &'static str {
        match self {
            FtModel::ExactlyOnce => "exactly_once",
            FtModel::AtLeastOnce => "at_least_once",
        }
    }
}
/// Error returned by [`FtModel::from_str`] for an unrecognized model name.
pub struct FtModelUnknown;
impl FromStr for FtModel {
    type Err = FtModelUnknown;
    /// Parses a model name, ignoring ASCII case.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.eq_ignore_ascii_case("exactly_once") {
            Ok(Self::ExactlyOnce)
        } else if s.eq_ignore_ascii_case("at_least_once") {
            Ok(Self::AtLeastOnce)
        } else {
            Err(FtModelUnknown)
        }
    }
}
/// Configuration of a single input endpoint: the stream it feeds plus its
/// connector settings.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct InputEndpointConfig {
    /// Name of the stream this endpoint is attached to.
    pub stream: Cow<'static, str>,
    /// Connector settings, flattened into the same serialized object.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
fn deserialize_start_after<'de, D>(deserializer: D) -> Result<Option<Vec<String>>, D::Error>
where
D: Deserializer<'de>,
{
let value = Option::<JsonValue>::deserialize(deserializer)?;
match value {
Some(JsonValue::String(s)) => Ok(Some(vec![s])),
Some(JsonValue::Array(arr)) => {
let vec = arr
.into_iter()
.map(|item| {
item.as_str()
.map(|s| s.to_string())
.ok_or_else(|| serde::de::Error::custom("invalid 'start_after' property: expected a string, an array of strings, or null"))
})
.collect::<Result<Vec<String>, _>>()?;
Ok(Some(vec))
}
Some(JsonValue::Null) | None => Ok(None),
_ => Err(serde::de::Error::custom(
"invalid 'start_after' property: expected a string, an array of strings, or null",
)),
}
}
/// Connector settings shared by input and output endpoints.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct ConnectorConfig {
    /// Transport used to move data in or out of the pipeline.
    pub transport: TransportConfig,
    /// Optional chain of preprocessing steps (see `crate::preprocess`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub preprocessor: Option<Vec<PreprocessorConfig>>,
    /// Data format configuration, if the transport requires one.
    pub format: Option<FormatConfig>,
    /// Index name.
    // NOTE(review): exact semantics not visible in this file — confirm.
    pub index: Option<String>,
    /// Output buffering settings, flattened into the same serialized object.
    #[serde(flatten)]
    pub output_buffer_config: OutputBufferConfig,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_batch_size: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_worker_batch_size: Option<u64>,
    /// Backpressure threshold; defaults to 1,000,000
    /// (`default_max_queued_records`).
    #[serde(default = "default_max_queued_records")]
    pub max_queued_records: u64,
    /// Start the endpoint paused (default: false); ignored by
    /// `equal_modulo_paused`.
    #[serde(default)]
    pub paused: bool,
    /// Free-form labels attached to the endpoint.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Endpoints to wait for before starting; accepts a single string, an
    /// array of strings, or null (see `deserialize_start_after`).
    #[serde(deserialize_with = "deserialize_start_after")]
    #[serde(default)]
    pub start_after: Option<Vec<String>>,
}
impl ConnectorConfig {
    /// Compares two connector configs while ignoring the `paused` flag.
    pub fn equal_modulo_paused(&self, other: &Self) -> bool {
        let normalize = |config: &Self| Self {
            paused: false,
            ..config.clone()
        };
        normalize(self) == normalize(other)
    }
}
/// Output buffering settings, flattened into [`ConnectorConfig`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct OutputBufferConfig {
    /// Enables output buffering; requires at least one of the two limits
    /// below to be set to a non-default value (see `validate`).
    pub enable_output_buffer: bool,
    /// Time limit in milliseconds; `usize::MAX` (the default) is unbounded.
    pub max_output_buffer_time_millis: usize,
    /// Record-count limit; `usize::MAX` (the default) is unbounded.
    pub max_output_buffer_size_records: usize,
}
impl Default for OutputBufferConfig {
fn default() -> Self {
Self {
enable_output_buffer: false,
max_output_buffer_size_records: usize::MAX,
max_output_buffer_time_millis: usize::MAX,
}
}
}
impl OutputBufferConfig {
    /// Rejects a configuration that enables buffering without bounding it:
    /// at least one of the two limits must differ from its (unbounded)
    /// default.
    pub fn validate(&self) -> Result<(), String> {
        let defaults = Self::default();
        let both_unbounded = self.max_output_buffer_size_records
            == defaults.max_output_buffer_size_records
            && self.max_output_buffer_time_millis == defaults.max_output_buffer_time_millis;
        if self.enable_output_buffer && both_unbounded {
            return Err(
                "when the 'enable_output_buffer' flag is set, one of 'max_output_buffer_size_records' and 'max_output_buffer_time_millis' settings must be specified"
                    .to_string(),
            );
        }
        Ok(())
    }
}
/// Configuration of a single output endpoint: the stream it reads plus its
/// connector settings.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OutputEndpointConfig {
    /// Name of the stream this endpoint is attached to.
    pub stream: Cow<'static, str>,
    /// Connector settings, flattened into the same serialized object.
    #[serde(flatten)]
    pub connector_config: ConnectorConfig,
}
/// Transport-specific configuration of a connector endpoint.
///
/// Serialized as `{"name": "<snake_case variant>", "config": {...}}`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(tag = "name", content = "config", rename_all = "snake_case")]
pub enum TransportConfig {
    FileInput(FileInputConfig),
    FileOutput(FileOutputConfig),
    NatsInput(NatsInputConfig),
    KafkaInput(KafkaInputConfig),
    KafkaOutput(KafkaOutputConfig),
    PubSubInput(PubSubInputConfig),
    UrlInput(UrlInputConfig),
    S3Input(S3InputConfig),
    DeltaTableInput(DeltaTableReaderConfig),
    DeltaTableOutput(DeltaTableWriterConfig),
    RedisOutput(RedisOutputConfig),
    // Boxed to keep the enum small.
    IcebergInput(Box<IcebergReaderConfig>),
    PostgresInput(PostgresReaderConfig),
    PostgresOutput(PostgresWriterConfig),
    /// Synthetic data generator.
    Datagen(DatagenInputConfig),
    /// Nexmark benchmark generator.
    Nexmark(NexmarkInputConfig),
    HttpInput(HttpInputConfig),
    /// HTTP output; carries no configuration.
    HttpOutput,
    AdHocInput(AdHocInputConfig),
    ClockInput(ClockConfig),
}
impl TransportConfig {
pub fn name(&self) -> String {
match self {
TransportConfig::FileInput(_) => "file_input".to_string(),
TransportConfig::FileOutput(_) => "file_output".to_string(),
TransportConfig::NatsInput(_) => "nats_input".to_string(),
TransportConfig::KafkaInput(_) => "kafka_input".to_string(),
TransportConfig::KafkaOutput(_) => "kafka_output".to_string(),
TransportConfig::PubSubInput(_) => "pub_sub_input".to_string(),
TransportConfig::UrlInput(_) => "url_input".to_string(),
TransportConfig::S3Input(_) => "s3_input".to_string(),
TransportConfig::DeltaTableInput(_) => "delta_table_input".to_string(),
TransportConfig::DeltaTableOutput(_) => "delta_table_output".to_string(),
TransportConfig::IcebergInput(_) => "iceberg_input".to_string(),
TransportConfig::PostgresInput(_) => "postgres_input".to_string(),
TransportConfig::PostgresOutput(_) => "postgres_output".to_string(),
TransportConfig::Datagen(_) => "datagen".to_string(),
TransportConfig::Nexmark(_) => "nexmark".to_string(),
TransportConfig::HttpInput(_) => "http_input".to_string(),
TransportConfig::HttpOutput => "http_output".to_string(),
TransportConfig::AdHocInput(_) => "adhoc_input".to_string(),
TransportConfig::RedisOutput(_) => "redis_output".to_string(),
TransportConfig::ClockInput(_) => "clock".to_string(),
}
}
pub fn is_transient(&self) -> bool {
matches!(
self,
TransportConfig::AdHocInput(_)
| TransportConfig::HttpInput(_)
| TransportConfig::HttpOutput
| TransportConfig::ClockInput(_)
)
}
pub fn is_http_input(&self) -> bool {
matches!(self, TransportConfig::HttpInput(_))
}
}
/// Data format attached to a connector endpoint.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, ToSchema)]
pub struct FormatConfig {
    /// Format name (e.g. the identifier of a registered parser/encoder).
    pub name: Cow<'static, str>,
    /// Format-specific configuration, kept as raw JSON.
    #[serde(default)]
    #[schema(value_type = Object)]
    pub config: JsonValue,
}
/// Resource requests and limits for deploying a pipeline.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct ResourceConfig {
    /// Minimum CPU cores; deserialized leniently via `serde_via_value`.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_min: Option<f64>,
    /// Maximum CPU cores; deserialized leniently via `serde_via_value`.
    #[serde(deserialize_with = "crate::serde_via_value::deserialize")]
    pub cpu_cores_max: Option<f64>,
    /// Memory request in MiB.
    pub memory_mb_min: Option<u64>,
    /// Memory limit in MiB.
    pub memory_mb_max: Option<u64>,
    /// Storage limit in MiB.
    pub storage_mb_max: Option<u64>,
    /// Storage class name.
    // NOTE(review): presumably a Kubernetes StorageClass — confirm.
    pub storage_class: Option<String>,
    /// Service account to run under.
    pub service_account_name: Option<String>,
    /// Namespace to deploy into.
    pub namespace: Option<String>,
}