sdf_parser_core/config/
mod.rs

1pub mod transform;
2pub mod types;
3pub mod import;
4pub mod dev;
5pub mod utils;
6
7use sdf_common::LATEST_STABLE_DATAFLOW;
8
/// Stable version string for service-definition configs.
///
/// This is a re-export under a config-specific name of
/// `sdf_common::LATEST_STABLE_DATAFLOW`; the two always hold the same value.
pub const SERVICE_DEFINITION_CONFIG_STABLE_VERSION: &str = LATEST_STABLE_DATAFLOW;
/// Error text for a config whose API version is not supported.
///
/// NOTE(review): the suggested upgrade target (`0.5.0`) is hard-coded here and
/// is not derived from `LATEST_STABLE_DATAFLOW` — confirm the two are kept in
/// sync when the stable version changes.
pub const VERSION_NOT_SUPPORTED_ERROR: &str = "ApiVersion not supported, try upgrading to 0.5.0";
11
12pub use wrapper::*;
13
14mod wrapper {
15
16    use schemars::JsonSchema;
17    use serde::{Deserialize, Serialize};
18
19    use super::*;
20
21    use self::types::MetadataType;
22
    /// Common metadata
    // Name / version / namespace triple, all required strings.
    // NOTE: additional commentary here uses plain `//` on purpose — `///` text
    // on a `JsonSchema` type is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, Default, JsonSchema)]
    pub struct Metadata {
        // Name of the described item.
        pub name: String,
        // Version string; no format is enforced by this type.
        pub version: String,
        // Namespace the item belongs to.
        pub namespace: String,
    }
30
    // Fallback settings used by the `with_defaults` methods in this module:
    // a value here is applied only where the topic / serde config leaves the
    // corresponding option unset.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Default, Clone, JsonSchema)]
    pub struct DefaultConfigs {
        // Consumer settings for topics that do not declare their own.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub consumer: Option<ConsumerConfigWrapper>,
        // Converter for key/value schemas that do not declare their own.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub converter: Option<SerdeConverter>,
        // Producer settings for topics that do not declare their own.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub producer: Option<ProducerConfigWrapper>,
    }
40
    // Configuration of a single topic: optional name, optional consumer and
    // producer settings, optional remote cluster profile, and a required schema.
    // Field names are (de)serialized in kebab-case.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    #[serde(rename_all = "kebab-case")]
    pub struct TopicWrapper {
        // Explicit topic name; `with_defaults` substitutes the topic id when unset.
        pub name: Option<String>,
        // Topic-specific consumer settings; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub consumer: Option<ConsumerConfigWrapper>,
        // Topic-specific producer settings; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub producer: Option<ProducerConfigWrapper>,
        // Serialized as `remote-cluster-profile` (kebab-case); the alias also
        // accepts the snake_case spelling `remote_cluster_profile` on input.
        #[serde(
            skip_serializing_if = "Option::is_none",
            default,
            alias = "remote_cluster_profile"
        )]
        pub remote_cluster_profile: Option<String>,
        // Key (optional) and value (required) schema of the topic's records.
        pub schema: SchemaWrapper,
    }
57
58    impl TopicWrapper {
59        pub fn ty(&self) -> &str {
60            self.schema.value.ty.ty()
61        }
62
63        pub fn with_defaults(&self, topic_id: &str, defaults: &DefaultConfigs) -> Self {
64            Self {
65                name: self.name.clone().or_else(|| Some(topic_id.into())),
66                consumer: self.consumer.clone().or(defaults.consumer.clone()),
67                producer: self.producer.clone().or(defaults.producer.clone()),
68                remote_cluster_profile: self.remote_cluster_profile.clone(),
69                schema: SchemaWrapper {
70                    key: self
71                        .schema
72                        .clone()
73                        .key
74                        .map(|inner| inner.with_defaults(defaults)),
75                    value: self.schema.value.with_defaults(defaults),
76                },
77            }
78        }
79    }
80
    // Schema of a topic's records: an optional key config and a required
    // value config.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    pub struct SchemaWrapper {
        // Key config; `None` when the input omits it.
        #[serde(default)]
        pub key: Option<SerdeConfig>,
        // Value config; required.
        pub value: SerdeConfig,
    }
87
    // A type description plus the converter used for it.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    pub struct SerdeConfig {
        // Flattened: the fields of `MetadataType` appear directly alongside
        // `converter` instead of under a nested `ty` key.
        #[serde(flatten)]
        pub ty: MetadataType,
        // Explicit converter; `with_defaults` falls back to the configured
        // default when this is `None`.
        pub converter: Option<SerdeConverter>,
    }
94
95    impl SerdeConfig {
96        pub fn with_defaults(&self, defaults: &DefaultConfigs) -> Self {
97            Self {
98                ty: self.ty.clone(),
99                converter: self.converter.clone().or(defaults.converter.clone()),
100            }
101        }
102    }
103
    // Converter selection for a key or value schema.
    // Variants are (de)serialized in kebab-case: `json`, `raw`.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, JsonSchema)]
    #[serde(rename_all = "kebab-case")]
    pub enum SerdeConverter {
        Json,
        Raw,
    }
110
    // Optional consumer settings. Every field may be omitted on input and is
    // left out of the output when `None`.
    // There is no `rename_all` here, so keys keep their snake_case field names
    // (e.g. `default_starting_offset`), unlike the kebab-case `TopicWrapper`.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    pub struct ConsumerConfigWrapper {
        // Offset position to start from — presumably used when the consumer
        // has no stored offset; confirm against the consumer implementation.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub default_starting_offset: Option<OffsetWrapper>,
        // Byte limit; exact scope (per fetch / per batch) is not visible here.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub max_bytes: Option<i32>,
        // Isolation level for reads.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub isolation: Option<Isolation>,
    }
120
    // Offset position, in serde's adjacently tagged form: the variant name is
    // stored under `position` and its payload under `value`, e.g.
    // `{"position": "Offset", "value": 5}`.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, JsonSchema)]
    #[serde(tag = "position", content = "value")]
    pub enum OffsetWrapper {
        // Signed offset value — presumably an absolute offset; TODO confirm.
        Offset(i64),
        // Presumably a distance counted from the beginning; TODO confirm.
        Beginning(u32),
        // Presumably a distance counted back from the end; TODO confirm.
        End(u32),
    }
128
    // Isolation level selectable for consumers and producers.
    // NOTE(review): unlike the sibling enums there is no `rename_all`, so the
    // variants (de)serialize under their Rust names (`ReadUncommitted`,
    // `ReadCommitted`) — confirm this is the intended wire format.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    pub enum Isolation {
        ReadUncommitted,
        ReadCommitted,
    }
134
    // Compression algorithm selectable for a producer.
    // Variants are (de)serialized in kebab-case: `gzip`, `snappy`, `lz4`, `zstd`.
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    #[serde(rename_all = "kebab-case")]
    pub enum Compression {
        Gzip,
        Snappy,
        Lz4,
        Zstd,
    }
143
    // Optional producer settings. Every field may be omitted on input and is
    // left out of the output when `None`.
    // There is no `rename_all` here, so keys keep their snake_case field names
    // (e.g. `batch_size`, `linger_ms`).
    // NOTE: plain `//` comments on purpose — `///` text on a `JsonSchema` type
    // is emitted as the schema description.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    pub struct ProducerConfigWrapper {
        // Batch size; the unit (bytes vs. records) is not visible here — TODO confirm.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub batch_size: Option<i64>,
        // Linger time, in milliseconds per the field name.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub linger_ms: Option<u64>,
        // Compression algorithm for produced data.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub compression: Option<Compression>,
        // Timeout, in milliseconds per the field name.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub timeout_ms: Option<u64>,
        // Isolation level.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub isolation: Option<Isolation>,
    }
157}