1pub mod transform;
2pub mod types;
3pub mod import;
4pub mod dev;
5pub mod utils;
6
7use sdf_common::LATEST_STABLE_DATAFLOW;
8
9pub const SERVICE_DEFINITION_CONFIG_STABLE_VERSION: &str = LATEST_STABLE_DATAFLOW;
/// Message text for an unsupported `ApiVersion`; it tells the user to upgrade
/// to 0.5.0. NOTE(review): the place where this is raised is not visible here.
pub const VERSION_NOT_SUPPORTED_ERROR: &str = "ApiVersion not supported, try upgrading to 0.5.0";
11
12pub use wrapper::*;
13
14mod wrapper {
15
16 use schemars::JsonSchema;
17 use serde::{Deserialize, Serialize};
18
19 use super::*;
20
21 use self::types::MetadataType;
22
23 #[derive(Serialize, Deserialize, Debug, Clone, Default, JsonSchema)]
25 pub struct Metadata {
26 pub name: String,
27 pub version: String,
28 pub namespace: String,
29 }
30
31 #[derive(Serialize, Deserialize, Debug, Default, Clone, JsonSchema)]
32 pub struct DefaultConfigs {
33 #[serde(skip_serializing_if = "Option::is_none", default)]
34 pub consumer: Option<ConsumerConfigWrapper>,
35 #[serde(skip_serializing_if = "Option::is_none", default)]
36 pub converter: Option<SerdeConverter>,
37 #[serde(skip_serializing_if = "Option::is_none", default)]
38 pub producer: Option<ProducerConfigWrapper>,
39 }
40
41 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
42 #[serde(rename_all = "kebab-case")]
43 pub struct TopicWrapper {
44 pub name: Option<String>,
45 #[serde(skip_serializing_if = "Option::is_none", default)]
46 pub consumer: Option<ConsumerConfigWrapper>,
47 #[serde(skip_serializing_if = "Option::is_none", default)]
48 pub producer: Option<ProducerConfigWrapper>,
49 #[serde(
50 skip_serializing_if = "Option::is_none",
51 default,
52 alias = "remote_cluster_profile"
53 )]
54 pub remote_cluster_profile: Option<String>,
55 pub schema: SchemaWrapper,
56 }
57
58 impl TopicWrapper {
59 pub fn ty(&self) -> &str {
60 self.schema.value.ty.ty()
61 }
62
63 pub fn with_defaults(&self, topic_id: &str, defaults: &DefaultConfigs) -> Self {
64 Self {
65 name: self.name.clone().or_else(|| Some(topic_id.into())),
66 consumer: self.consumer.clone().or(defaults.consumer.clone()),
67 producer: self.producer.clone().or(defaults.producer.clone()),
68 remote_cluster_profile: self.remote_cluster_profile.clone(),
69 schema: SchemaWrapper {
70 key: self
71 .schema
72 .clone()
73 .key
74 .map(|inner| inner.with_defaults(defaults)),
75 value: self.schema.value.with_defaults(defaults),
76 },
77 }
78 }
79 }
80
81 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
82 pub struct SchemaWrapper {
83 #[serde(default)]
84 pub key: Option<SerdeConfig>,
85 pub value: SerdeConfig,
86 }
87
88 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
89 pub struct SerdeConfig {
90 #[serde(flatten)]
91 pub ty: MetadataType,
92 pub converter: Option<SerdeConverter>,
93 }
94
95 impl SerdeConfig {
96 pub fn with_defaults(&self, defaults: &DefaultConfigs) -> Self {
97 Self {
98 ty: self.ty.clone(),
99 converter: self.converter.clone().or(defaults.converter.clone()),
100 }
101 }
102 }
103
104 #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, JsonSchema)]
105 #[serde(rename_all = "kebab-case")]
106 pub enum SerdeConverter {
107 Json,
108 Raw,
109 }
110
111 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
112 pub struct ConsumerConfigWrapper {
113 #[serde(skip_serializing_if = "Option::is_none", default)]
114 pub default_starting_offset: Option<OffsetWrapper>,
115 #[serde(skip_serializing_if = "Option::is_none", default)]
116 pub max_bytes: Option<i32>,
117 #[serde(skip_serializing_if = "Option::is_none", default)]
118 pub isolation: Option<Isolation>,
119 }
120
121 #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, JsonSchema)]
122 #[serde(tag = "position", content = "value")]
123 pub enum OffsetWrapper {
124 Offset(i64),
125 Beginning(u32),
126 End(u32),
127 }
128
129 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
130 pub enum Isolation {
131 ReadUncommitted,
132 ReadCommitted,
133 }
134
135 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
136 #[serde(rename_all = "kebab-case")]
137 pub enum Compression {
138 Gzip,
139 Snappy,
140 Lz4,
141 Zstd,
142 }
143
144 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
145 pub struct ProducerConfigWrapper {
146 #[serde(skip_serializing_if = "Option::is_none", default)]
147 pub batch_size: Option<i64>,
148 #[serde(skip_serializing_if = "Option::is_none", default)]
149 pub linger_ms: Option<u64>,
150 #[serde(skip_serializing_if = "Option::is_none", default)]
151 pub compression: Option<Compression>,
152 #[serde(skip_serializing_if = "Option::is_none", default)]
153 pub timeout_ms: Option<u64>,
154 #[serde(skip_serializing_if = "Option::is_none", default)]
155 pub isolation: Option<Isolation>,
156 }
157}