1use crate::compatibility::CompatibilityLevel;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct RegistryConfig {
9 pub storage: StorageConfig,
11
12 #[serde(default)]
14 pub compatibility: CompatibilityLevel,
15
16 #[serde(default = "default_true")]
18 pub normalize_schemas: bool,
19
20 #[serde(default)]
22 pub cache: CacheConfig,
23
24 #[serde(default)]
26 pub id_generation: IdGeneration,
27}
28
/// Serde default helper that always yields `true`.
///
/// Referenced by `#[serde(default = "default_true")]` on
/// `RegistryConfig::normalize_schemas`.
fn default_true() -> bool {
    true
}
32
33impl Default for RegistryConfig {
34 fn default() -> Self {
35 Self {
36 storage: StorageConfig::Memory,
37 compatibility: CompatibilityLevel::default(),
38 normalize_schemas: true,
39 cache: CacheConfig::default(),
40 id_generation: IdGeneration::default(),
41 }
42 }
43}
44
45impl RegistryConfig {
46 pub fn memory() -> Self {
48 Self {
49 storage: StorageConfig::Memory,
50 ..Default::default()
51 }
52 }
53
54 pub fn broker(config: BrokerStorageConfig) -> Self {
56 Self {
57 storage: StorageConfig::Broker(config),
58 ..Default::default()
59 }
60 }
61
62 pub fn with_compatibility(mut self, level: CompatibilityLevel) -> Self {
64 self.compatibility = level;
65 self
66 }
67
68 pub fn with_normalize(mut self, normalize: bool) -> Self {
70 self.normalize_schemas = normalize;
71 self
72 }
73}
74
75#[derive(Debug, Clone, Default, Serialize, Deserialize)]
82#[serde(tag = "type", rename_all = "lowercase")]
83pub enum StorageConfig {
84 #[default]
86 Memory,
87
88 Broker(BrokerStorageConfig),
93
94 Glue {
96 region: String,
97 registry_name: Option<String>,
98 },
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct BrokerStorageConfig {
104 pub brokers: String,
107
108 #[serde(default = "default_schema_topic")]
110 pub topic: String,
111
112 #[serde(default = "default_replication_factor")]
114 pub replication_factor: u16,
115
116 #[serde(default = "default_partitions")]
119 pub partitions: u32,
120
121 #[serde(default)]
123 pub tls: Option<BrokerTlsConfig>,
124
125 #[serde(default)]
127 pub auth: Option<BrokerAuthConfig>,
128
129 #[serde(default = "default_timeout")]
131 pub connect_timeout_ms: u64,
132
133 #[serde(default = "default_bootstrap_timeout")]
135 pub bootstrap_timeout_ms: u64,
136}
137
/// Default value for `BrokerStorageConfig::topic`: `"_schemas"`.
fn default_schema_topic() -> String {
    String::from("_schemas")
}
141
/// Default value for `BrokerStorageConfig::replication_factor`.
fn default_replication_factor() -> u16 {
    const REPLICATION_FACTOR: u16 = 3;
    REPLICATION_FACTOR
}
145
/// Default value for `BrokerStorageConfig::partitions`.
fn default_partitions() -> u32 {
    const PARTITIONS: u32 = 1;
    PARTITIONS
}
149
/// Default value for `BrokerStorageConfig::connect_timeout_ms`, in
/// milliseconds.
fn default_timeout() -> u64 {
    10_000
}
153
/// Default value for `BrokerStorageConfig::bootstrap_timeout_ms`, in
/// milliseconds.
fn default_bootstrap_timeout() -> u64 {
    30_000
}
157
158impl Default for BrokerStorageConfig {
159 fn default() -> Self {
160 Self {
161 brokers: "localhost:9092".to_string(),
162 topic: default_schema_topic(),
163 replication_factor: default_replication_factor(),
164 partitions: default_partitions(),
165 tls: None,
166 auth: None,
167 connect_timeout_ms: default_timeout(),
168 bootstrap_timeout_ms: default_bootstrap_timeout(),
169 }
170 }
171}
172
173impl BrokerStorageConfig {
174 pub fn new(brokers: impl Into<String>) -> Self {
176 Self {
177 brokers: brokers.into(),
178 ..Default::default()
179 }
180 }
181
182 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
184 self.topic = topic.into();
185 self
186 }
187
188 pub fn with_replication_factor(mut self, factor: u16) -> Self {
190 self.replication_factor = factor;
191 self
192 }
193
194 pub fn with_tls(mut self, config: BrokerTlsConfig) -> Self {
196 self.tls = Some(config);
197 self
198 }
199
200 pub fn with_auth(mut self, config: BrokerAuthConfig) -> Self {
202 self.auth = Some(config);
203 self
204 }
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct BrokerTlsConfig {
210 pub ca_cert: Option<String>,
212
213 pub client_cert: Option<String>,
215
216 pub client_key: Option<String>,
218
219 #[serde(default)]
221 pub insecure_skip_verify: bool,
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(tag = "type", rename_all = "lowercase")]
227pub enum BrokerAuthConfig {
228 Plain { username: String, password: String },
230
231 ScramSha256 { username: String, password: String },
233
234 ScramSha512 { username: String, password: String },
236}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct CacheConfig {
241 pub max_schemas: usize,
243
244 pub max_subjects: usize,
246
247 pub ttl_seconds: u64,
249}
250
251impl Default for CacheConfig {
252 fn default() -> Self {
253 Self {
254 max_schemas: 10000,
255 max_subjects: 1000,
256 ttl_seconds: 0, }
258 }
259}
260
261#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
263#[serde(rename_all = "lowercase")]
264pub enum IdGeneration {
265 #[default]
267 AutoIncrement,
268
269 Hash,
271
272 External,
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// `RegistryConfig::default()` selects in-memory storage, backward
    /// compatibility and schema normalization.
    #[test]
    fn test_default_config() {
        let RegistryConfig {
            storage,
            compatibility,
            normalize_schemas,
            ..
        } = RegistryConfig::default();

        assert!(matches!(storage, StorageConfig::Memory));
        assert_eq!(compatibility, CompatibilityLevel::Backward);
        assert!(normalize_schemas);
    }

    /// `RegistryConfig::memory()` selects in-memory storage.
    #[test]
    fn test_memory_config() {
        let storage = RegistryConfig::memory().storage;
        assert!(matches!(storage, StorageConfig::Memory));
    }

    /// `RegistryConfig::broker()` carries the supplied broker settings through
    /// unchanged.
    #[test]
    fn test_broker_config() {
        let broker_config = BrokerStorageConfig::new("localhost:9092")
            .with_topic("my_schemas")
            .with_replication_factor(3);

        match RegistryConfig::broker(broker_config).storage {
            StorageConfig::Broker(bc) => {
                assert_eq!(bc.brokers, "localhost:9092");
                assert_eq!(bc.topic, "my_schemas");
                assert_eq!(bc.replication_factor, 3);
            }
            _ => panic!("Expected Broker storage config"),
        }
    }

    /// `BrokerStorageConfig::default()` exposes the documented defaults.
    #[test]
    fn test_broker_storage_defaults() {
        let defaults = BrokerStorageConfig::default();

        assert_eq!(defaults.brokers, "localhost:9092");
        assert_eq!(defaults.topic, "_schemas");
        assert_eq!(defaults.replication_factor, 3);
        assert_eq!(defaults.partitions, 1);
        assert!(defaults.tls.is_none());
        assert!(defaults.auth.is_none());
    }
}