// sdf_parser_core/config/mod.rs

1pub mod transform;
2pub mod types;
3pub mod import;
4pub mod dev;
5pub mod utils;
6
7use sdf_common::LATEST_STABLE_DATAFLOW;
8
9pub const SERVICE_DEFINITION_CONFIG_STABLE_VERSION: &str = LATEST_STABLE_DATAFLOW;
10pub const VERSION_NOT_SUPPORTED_ERROR: &str = "ApiVersion not supported, try upgrading to 0.5.0";
11
12pub use wrapper::*;
13
14mod wrapper {
15
16    use schemars::JsonSchema;
17    use serde::{Deserialize, Serialize};
18
19    use super::*;
20
21    use self::types::MetadataType;
22
23    /// Common metadata
24    #[derive(Serialize, Deserialize, Debug, Clone, Default, JsonSchema)]
25    pub struct Metadata {
26        pub name: String,
27        pub version: String,
28        pub namespace: String,
29    }
30
31    #[derive(Serialize, Deserialize, Debug, Default, Clone, JsonSchema)]
32    pub struct DefaultConfigs {
33        #[serde(skip_serializing_if = "Option::is_none", default)]
34        pub consumer: Option<ConsumerConfigWrapper>,
35        #[serde(skip_serializing_if = "Option::is_none", default)]
36        pub converter: Option<SerdeConverter>,
37        #[serde(skip_serializing_if = "Option::is_none", default)]
38        pub producer: Option<ProducerConfigWrapper>,
39    }
40
41    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
42    #[serde(rename_all = "kebab-case")]
43    pub struct TopicWrapper {
44        pub name: Option<String>,
45        #[serde(skip_serializing_if = "Option::is_none", default)]
46        pub consumer: Option<ConsumerConfigWrapper>,
47        #[serde(skip_serializing_if = "Option::is_none", default)]
48        pub producer: Option<ProducerConfigWrapper>,
49        #[serde(
50            skip_serializing_if = "Option::is_none",
51            default,
52            alias = "remote_cluster_profile"
53        )]
54        pub remote_cluster_profile: Option<String>,
55        pub schema: SchemaWrapper,
56    }
57
58    impl TopicWrapper {
59        pub fn ty(&self) -> &str {
60            self.schema.value.ty.ty()
61        }
62
63        pub fn with_topic_id(&self, topic_id: &str) -> Self {
64            Self {
65                name: self.name.clone().or_else(|| Some(topic_id.into())),
66                consumer: self.consumer.clone(),
67                producer: self.producer.clone(),
68                remote_cluster_profile: self.remote_cluster_profile.clone(),
69                schema: SchemaWrapper {
70                    key: self.schema.clone().key,
71                    value: self.schema.value.clone(),
72                },
73            }
74        }
75    }
76
77    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
78    pub struct SchemaWrapper {
79        #[serde(default)]
80        pub key: Option<SerdeConfig>,
81        pub value: SerdeConfig,
82    }
83
84    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
85    pub struct SerdeConfig {
86        #[serde(flatten)]
87        pub ty: MetadataType,
88        pub converter: Option<SerdeConverter>,
89    }
90
91    #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, JsonSchema)]
92    #[serde(rename_all = "kebab-case")]
93    pub enum SerdeConverter {
94        Json,
95        Raw,
96    }
97
98    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
99    pub struct ConsumerConfigWrapper {
100        #[serde(skip_serializing_if = "Option::is_none", default)]
101        pub default_starting_offset: Option<OffsetWrapper>,
102        #[serde(skip_serializing_if = "Option::is_none", default)]
103        pub max_bytes: Option<i32>,
104        #[serde(skip_serializing_if = "Option::is_none", default)]
105        pub isolation: Option<Isolation>,
106    }
107
108    #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, JsonSchema)]
109    #[serde(tag = "position", content = "value")]
110    pub enum OffsetWrapper {
111        Offset(i64),
112        Beginning(u32),
113        End(u32),
114    }
115
116    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
117    pub enum Isolation {
118        ReadUncommitted,
119        ReadCommitted,
120    }
121
122    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
123    #[serde(rename_all = "kebab-case")]
124    pub enum Compression {
125        Gzip,
126        Snappy,
127        Lz4,
128        Zstd,
129    }
130
131    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
132    pub struct ProducerConfigWrapper {
133        #[serde(skip_serializing_if = "Option::is_none", default)]
134        pub batch_size: Option<i64>,
135        #[serde(skip_serializing_if = "Option::is_none", default)]
136        pub linger_ms: Option<u64>,
137        #[serde(skip_serializing_if = "Option::is_none", default)]
138        pub compression: Option<Compression>,
139        #[serde(skip_serializing_if = "Option::is_none", default)]
140        pub timeout_ms: Option<u64>,
141        #[serde(skip_serializing_if = "Option::is_none", default)]
142        pub isolation: Option<Isolation>,
143    }
144}