1use crate::compatibility::CompatibilityLevel;
4use serde::{Deserialize, Serialize};
5
/// Configuration for the registry.
///
/// When deserializing, only `storage` is mandatory; every other field carries a
/// serde default and may be omitted from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryConfig {
    /// Storage backend selection. This field has no serde default, so it must
    /// be present in deserialized input.
    pub storage: StorageConfig,

    /// Compatibility level. Falls back to `CompatibilityLevel::default()` when
    /// the field is omitted.
    #[serde(default)]
    pub compatibility: CompatibilityLevel,

    /// Schema-normalization flag. Defaults to `true` (via `default_true`) when
    /// the field is omitted.
    // NOTE(review): what normalization entails is decided by the consumer of
    // this flag, which is not visible in this file.
    #[serde(default = "default_true")]
    pub normalize_schemas: bool,

    /// ID generation strategy. Falls back to `IdGeneration::default()` when
    /// the field is omitted.
    #[serde(default)]
    pub id_generation: IdGeneration,

    /// Upper bound for the cache. Defaults to the value returned by
    /// `default_max_cache_size` when the field is omitted.
    // NOTE(review): the unit (presumably number of cached entries) is not
    // visible in this file; confirm against the cache implementation.
    #[serde(default = "default_max_cache_size")]
    pub max_cache_size: usize,
}
29
/// Serde default helper that always yields `true`.
///
/// Referenced by `#[serde(default = "default_true")]` so that a boolean field
/// is enabled when it is missing from the deserialized input.
fn default_true() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
33
/// Serde default helper for `RegistryConfig::max_cache_size`.
///
/// Returns `10_000`.
fn default_max_cache_size() -> usize {
    const DEFAULT_MAX_CACHE_SIZE: usize = 10_000;
    DEFAULT_MAX_CACHE_SIZE
}
37
38impl Default for RegistryConfig {
39 fn default() -> Self {
40 Self {
41 storage: StorageConfig::Memory,
42 compatibility: CompatibilityLevel::default(),
43 normalize_schemas: true,
44 id_generation: IdGeneration::default(),
45 max_cache_size: default_max_cache_size(),
46 }
47 }
48}
49
50impl RegistryConfig {
51 pub fn memory() -> Self {
53 Self {
54 storage: StorageConfig::Memory,
55 ..Default::default()
56 }
57 }
58
59 pub fn broker(config: BrokerStorageConfig) -> Self {
61 Self {
62 storage: StorageConfig::Broker(config),
63 ..Default::default()
64 }
65 }
66
67 pub fn with_compatibility(mut self, level: CompatibilityLevel) -> Self {
69 self.compatibility = level;
70 self
71 }
72
73 pub fn with_normalize(mut self, normalize: bool) -> Self {
75 self.normalize_schemas = normalize;
76 self
77 }
78}
79
/// Storage backend selection.
///
/// Serialized as an internally tagged enum: the variant is identified by a
/// `type` field whose value is the lowercased variant name (`memory`,
/// `broker`, `glue`). The default variant is `Memory`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum StorageConfig {
    /// In-memory storage; carries no settings. This is the default variant.
    #[default]
    Memory,

    /// Broker-backed storage configured by a `BrokerStorageConfig`; its
    /// fields are serialized alongside the `type` tag.
    Broker(BrokerStorageConfig),

    /// Glue-backed storage.
    // NOTE(review): presumably AWS Glue Schema Registry; the backend
    // implementation is not visible in this file, so confirm.
    Glue {
        /// Region name (required).
        region: String,
        /// Optional registry name.
        registry_name: Option<String>,
    },
}
105
/// Settings for broker-backed schema storage.
///
/// When deserializing, only `brokers` is mandatory; every other field has a
/// serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrokerStorageConfig {
    /// Broker address string (required in deserialized input). The `Default`
    /// impl uses `"localhost:9092"`.
    // NOTE(review): presumably a comma-separated `host:port` list; confirm
    // against the code that consumes this value.
    pub brokers: String,

    /// Topic name; defaults to `"_schemas"` when omitted.
    #[serde(default = "default_schema_topic")]
    pub topic: String,

    /// Replication factor; defaults to `3` when omitted.
    #[serde(default = "default_replication_factor")]
    pub replication_factor: u16,

    /// Partition count; defaults to `1` when omitted.
    #[serde(default = "default_partitions")]
    pub partitions: u32,

    /// Optional TLS settings; `None` when omitted.
    #[serde(default)]
    pub tls: Option<BrokerTlsConfig>,

    /// Optional authentication settings; `None` when omitted.
    #[serde(default)]
    pub auth: Option<BrokerAuthConfig>,

    /// Connect timeout in milliseconds; defaults to `10000` when omitted.
    #[serde(default = "default_timeout")]
    pub connect_timeout_ms: u64,

    /// Bootstrap timeout in milliseconds; defaults to `30000` when omitted.
    #[serde(default = "default_bootstrap_timeout")]
    pub bootstrap_timeout_ms: u64,
}
142
/// Serde default helper for `BrokerStorageConfig::topic`.
///
/// Returns an owned copy of `"_schemas"`.
fn default_schema_topic() -> String {
    const DEFAULT_SCHEMA_TOPIC: &str = "_schemas";
    String::from(DEFAULT_SCHEMA_TOPIC)
}
146
/// Serde default helper for `BrokerStorageConfig::replication_factor`.
///
/// Returns `3`.
fn default_replication_factor() -> u16 {
    const DEFAULT_REPLICATION_FACTOR: u16 = 3;
    DEFAULT_REPLICATION_FACTOR
}
150
/// Serde default helper for `BrokerStorageConfig::partitions`.
///
/// Returns `1`.
fn default_partitions() -> u32 {
    const DEFAULT_PARTITIONS: u32 = 1;
    DEFAULT_PARTITIONS
}
154
/// Serde default helper for `BrokerStorageConfig::connect_timeout_ms`.
///
/// Returns `10_000` (milliseconds, per the field name).
fn default_timeout() -> u64 {
    const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10_000;
    DEFAULT_CONNECT_TIMEOUT_MS
}
158
/// Serde default helper for `BrokerStorageConfig::bootstrap_timeout_ms`.
///
/// Returns `30_000` (milliseconds, per the field name).
fn default_bootstrap_timeout() -> u64 {
    const DEFAULT_BOOTSTRAP_TIMEOUT_MS: u64 = 30_000;
    DEFAULT_BOOTSTRAP_TIMEOUT_MS
}
162
163impl Default for BrokerStorageConfig {
164 fn default() -> Self {
165 Self {
166 brokers: "localhost:9092".to_string(),
167 topic: default_schema_topic(),
168 replication_factor: default_replication_factor(),
169 partitions: default_partitions(),
170 tls: None,
171 auth: None,
172 connect_timeout_ms: default_timeout(),
173 bootstrap_timeout_ms: default_bootstrap_timeout(),
174 }
175 }
176}
177
178impl BrokerStorageConfig {
179 pub fn new(brokers: impl Into<String>) -> Self {
181 Self {
182 brokers: brokers.into(),
183 ..Default::default()
184 }
185 }
186
187 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
189 self.topic = topic.into();
190 self
191 }
192
193 pub fn with_replication_factor(mut self, factor: u16) -> Self {
195 self.replication_factor = factor;
196 self
197 }
198
199 pub fn with_tls(mut self, config: BrokerTlsConfig) -> Self {
201 self.tls = Some(config);
202 self
203 }
204
205 pub fn with_auth(mut self, config: BrokerAuthConfig) -> Self {
207 self.auth = Some(config);
208 self
209 }
210}
211
/// TLS settings for the broker connection.
// NOTE(review): the derived `Debug` prints every field verbatim, including
// `client_key`. If that field can hold inline key material rather than a file
// path, consider redacting it the way `BrokerAuthConfig` redacts passwords.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrokerTlsConfig {
    /// Optional CA certificate.
    // NOTE(review): whether this is a file path or inline PEM is not visible
    // in this file; confirm against the code that consumes it.
    pub ca_cert: Option<String>,

    /// Optional client certificate (same path-vs-inline caveat as `ca_cert`).
    pub client_cert: Option<String>,

    /// Optional client private key (same path-vs-inline caveat as `ca_cert`).
    pub client_key: Option<String>,

    /// Flag that, judging by its name, disables certificate verification.
    /// Defaults to `false` when omitted from deserialized input.
    #[serde(default)]
    pub insecure_skip_verify: bool,
}
228
/// Authentication credentials for the broker connection.
///
/// Serialized as an internally tagged enum: the variant is identified by a
/// `type` field whose value is the lowercased variant name (`plain`,
/// `scramsha256`, `scramsha512`).
///
/// `Debug` is deliberately not derived; the hand-written impl in this file
/// prints `[REDACTED]` in place of the password. `Serialize` is derived,
/// so serialized output still contains the password in clear text.
// NOTE(review): the variant names presumably map to the SASL PLAIN,
// SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms; confirm against the client code.
#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum BrokerAuthConfig {
    /// Username/password pair tagged as `plain`.
    Plain { username: String, password: String },

    /// Username/password pair tagged as `scramsha256`.
    ScramSha256 { username: String, password: String },

    /// Username/password pair tagged as `scramsha512`.
    ScramSha512 { username: String, password: String },
}
242
243impl std::fmt::Debug for BrokerAuthConfig {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 match self {
246 Self::Plain { username, .. } => f
247 .debug_struct("Plain")
248 .field("username", username)
249 .field("password", &"[REDACTED]")
250 .finish(),
251 Self::ScramSha256 { username, .. } => f
252 .debug_struct("ScramSha256")
253 .field("username", username)
254 .field("password", &"[REDACTED]")
255 .finish(),
256 Self::ScramSha512 { username, .. } => f
257 .debug_struct("ScramSha512")
258 .field("username", username)
259 .field("password", &"[REDACTED]")
260 .finish(),
261 }
262 }
263}
264
/// ID generation strategy.
///
/// Serialized as the lowercased variant name (`autoincrement`, `hash`,
/// `external`). The default variant is `AutoIncrement`.
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names; the code that assigns IDs is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum IdGeneration {
    /// Default strategy; presumably sequentially increasing IDs.
    #[default]
    AutoIncrement,

    /// Presumably IDs derived from a hash of the schema.
    Hash,

    /// Presumably IDs supplied from outside the registry.
    External,
}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// `RegistryConfig::default()` selects in-memory storage and the
    /// documented default for every other field.
    #[test]
    fn test_default_config() {
        let config = RegistryConfig::default();
        assert!(matches!(config.storage, StorageConfig::Memory));
        assert_eq!(config.compatibility, CompatibilityLevel::Backward);
        assert!(config.normalize_schemas);
        assert_eq!(config.id_generation, IdGeneration::AutoIncrement);
        assert_eq!(config.max_cache_size, 10_000);
    }

    /// `RegistryConfig::memory()` selects in-memory storage.
    #[test]
    fn test_memory_config() {
        let config = RegistryConfig::memory();
        assert!(matches!(config.storage, StorageConfig::Memory));
    }

    /// `with_normalize` overrides the default and leaves storage untouched.
    #[test]
    fn test_with_normalize() {
        let config = RegistryConfig::memory().with_normalize(false);
        assert!(!config.normalize_schemas);
        assert!(matches!(config.storage, StorageConfig::Memory));
    }

    /// `RegistryConfig::broker` wraps the supplied broker settings unchanged.
    #[test]
    fn test_broker_config() {
        let broker_config = BrokerStorageConfig::new("localhost:9092")
            .with_topic("my_schemas")
            .with_replication_factor(3);

        let config = RegistryConfig::broker(broker_config);
        if let StorageConfig::Broker(bc) = config.storage {
            assert_eq!(bc.brokers, "localhost:9092");
            assert_eq!(bc.topic, "my_schemas");
            assert_eq!(bc.replication_factor, 3);
        } else {
            panic!("Expected Broker storage config");
        }
    }

    /// `BrokerStorageConfig::default()` matches the serde default helpers,
    /// including both timeouts.
    #[test]
    fn test_broker_storage_defaults() {
        let config = BrokerStorageConfig::default();
        assert_eq!(config.brokers, "localhost:9092");
        assert_eq!(config.topic, "_schemas");
        assert_eq!(config.replication_factor, 3);
        assert_eq!(config.partitions, 1);
        assert!(config.tls.is_none());
        assert!(config.auth.is_none());
        assert_eq!(config.connect_timeout_ms, 10_000);
        assert_eq!(config.bootstrap_timeout_ms, 30_000);
    }

    /// `with_tls` and `with_auth` store the supplied settings.
    #[test]
    fn test_broker_tls_and_auth_builders() {
        let tls = BrokerTlsConfig {
            ca_cert: Some("ca.pem".to_string()),
            client_cert: None,
            client_key: None,
            insecure_skip_verify: false,
        };
        let auth = BrokerAuthConfig::ScramSha256 {
            username: "alice".to_string(),
            password: "s3cret".to_string(),
        };

        let config = BrokerStorageConfig::new("localhost:9092")
            .with_tls(tls)
            .with_auth(auth);

        let ca_cert = config.tls.as_ref().and_then(|t| t.ca_cert.as_deref());
        assert_eq!(ca_cert, Some("ca.pem"));
        assert!(matches!(
            config.auth,
            Some(BrokerAuthConfig::ScramSha256 { .. })
        ));
    }

    /// Regression test: the `Debug` output of credentials must never contain
    /// the password, either directly or when nested in a storage config.
    #[test]
    fn test_auth_debug_redacts_password() {
        let username = "alice".to_string();
        let password = "s3cret".to_string();
        let variants = vec![
            BrokerAuthConfig::Plain {
                username: username.clone(),
                password: password.clone(),
            },
            BrokerAuthConfig::ScramSha256 {
                username: username.clone(),
                password: password.clone(),
            },
            BrokerAuthConfig::ScramSha512 {
                username: username.clone(),
                password: password.clone(),
            },
        ];

        for auth in variants {
            let rendered = format!("{:?}", auth);
            assert!(rendered.contains("alice"));
            assert!(rendered.contains("[REDACTED]"));
            assert!(!rendered.contains("s3cret"));

            let nested = format!("{:?}", BrokerStorageConfig::default().with_auth(auth));
            assert!(nested.contains("[REDACTED]"));
            assert!(!nested.contains("s3cret"));
        }
    }
}