1use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::sync::RwLock;
19use std::time::Duration;
20
/// Default for `retention.ms`: seven days, expressed in milliseconds.
pub const DEFAULT_RETENTION_MS: i64 = 7 * 24 * 3_600_000;

/// Default for `retention.bytes`.
///
/// NOTE(review): `-1` presumably means "no size limit" (Kafka convention) —
/// confirm against the code that enforces retention.
pub const DEFAULT_RETENTION_BYTES: i64 = -1;

/// Default for `max.message.bytes`: 1 MiB.
pub const DEFAULT_MAX_MESSAGE_BYTES: i64 = 1 << 20;

/// Default for `segment.bytes`: 1 GiB.
pub const DEFAULT_SEGMENT_BYTES: i64 = 1 << 30;

/// Default for `segment.ms`: seven days, expressed in milliseconds.
pub const DEFAULT_SEGMENT_MS: i64 = 7 * 24 * 3_600_000;
35
/// Value of the `cleanup.policy` topic setting.
///
/// The textual forms are `delete`, `compact` and `compact,delete` (see the
/// `Display` and `FromStr` implementations). The default is
/// [`CleanupPolicy::Delete`].
///
/// NOTE(review): the variant names suggest Kafka's cleanup semantics
/// (time/size based deletion vs. key based compaction) — confirm against the
/// code that actually cleans the log.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CleanupPolicy {
    /// Rendered as `delete`; this is the default policy.
    #[default]
    Delete,
    /// Rendered as `compact`.
    Compact,
    /// Both policies combined; rendered as `compact,delete`.
    CompactDelete,
}
47
48impl std::fmt::Display for CleanupPolicy {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 CleanupPolicy::Delete => write!(f, "delete"),
52 CleanupPolicy::Compact => write!(f, "compact"),
53 CleanupPolicy::CompactDelete => write!(f, "compact,delete"),
54 }
55 }
56}
57
58impl std::str::FromStr for CleanupPolicy {
59 type Err = String;
60
61 fn from_str(s: &str) -> Result<Self, Self::Err> {
62 match s.to_lowercase().as_str() {
63 "delete" => Ok(CleanupPolicy::Delete),
64 "compact" => Ok(CleanupPolicy::Compact),
65 "compact,delete" | "delete,compact" => Ok(CleanupPolicy::CompactDelete),
66 _ => Err(format!("Invalid cleanup policy: {}", s)),
67 }
68 }
69}
70
/// Value of the `compression.type` topic setting.
///
/// Each variant is rendered and parsed by its lowercase name (`none`,
/// `producer`, `lz4`, `zstd`, `snappy`, `gzip`). The default is
/// [`CompressionType::Producer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CompressionType {
    /// Rendered as `none`.
    None,
    /// Rendered as `producer`; this is the default.
    ///
    /// NOTE(review): presumably "keep whatever codec the producer used"
    /// (Kafka convention) — confirm against the write path.
    #[default]
    Producer,
    /// Rendered as `lz4`.
    Lz4,
    /// Rendered as `zstd`.
    Zstd,
    /// Rendered as `snappy`.
    Snappy,
    /// Rendered as `gzip`.
    Gzip,
}
88
89impl std::fmt::Display for CompressionType {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 CompressionType::None => write!(f, "none"),
93 CompressionType::Producer => write!(f, "producer"),
94 CompressionType::Lz4 => write!(f, "lz4"),
95 CompressionType::Zstd => write!(f, "zstd"),
96 CompressionType::Snappy => write!(f, "snappy"),
97 CompressionType::Gzip => write!(f, "gzip"),
98 }
99 }
100}
101
102impl std::str::FromStr for CompressionType {
103 type Err = String;
104
105 fn from_str(s: &str) -> Result<Self, Self::Err> {
106 match s.to_lowercase().as_str() {
107 "none" => Ok(CompressionType::None),
108 "producer" => Ok(CompressionType::Producer),
109 "lz4" => Ok(CompressionType::Lz4),
110 "zstd" => Ok(CompressionType::Zstd),
111 "snappy" => Ok(CompressionType::Snappy),
112 "gzip" => Ok(CompressionType::Gzip),
113 _ => Err(format!("Invalid compression type: {}", s)),
114 }
115 }
116}
117
/// Per-topic configuration.
///
/// Each field backs one dotted configuration key (shown on the field), which
/// is the name used by [`TopicConfig::to_map`] and [`TopicConfig::apply`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfig {
    /// `retention.ms` — retention time in milliseconds
    /// (default [`DEFAULT_RETENTION_MS`]).
    pub retention_ms: i64,

    /// `retention.bytes` — size-based retention
    /// (default [`DEFAULT_RETENTION_BYTES`]).
    pub retention_bytes: i64,

    /// `max.message.bytes` (default [`DEFAULT_MAX_MESSAGE_BYTES`]).
    pub max_message_bytes: i64,

    /// `segment.bytes` (default [`DEFAULT_SEGMENT_BYTES`]).
    pub segment_bytes: i64,

    /// `segment.ms` — segment roll interval in milliseconds
    /// (default [`DEFAULT_SEGMENT_MS`]).
    pub segment_ms: i64,

    /// `cleanup.policy` (default [`CleanupPolicy::Delete`]).
    pub cleanup_policy: CleanupPolicy,

    /// `min.insync.replicas` (default `1`).
    pub min_insync_replicas: i32,

    /// `compression.type` (default [`CompressionType::Producer`]).
    pub compression_type: CompressionType,
}
145
146impl Default for TopicConfig {
147 fn default() -> Self {
148 Self {
149 retention_ms: DEFAULT_RETENTION_MS,
150 retention_bytes: DEFAULT_RETENTION_BYTES,
151 max_message_bytes: DEFAULT_MAX_MESSAGE_BYTES,
152 segment_bytes: DEFAULT_SEGMENT_BYTES,
153 segment_ms: DEFAULT_SEGMENT_MS,
154 cleanup_policy: CleanupPolicy::default(),
155 min_insync_replicas: 1,
156 compression_type: CompressionType::default(),
157 }
158 }
159}
160
161impl TopicConfig {
162 pub fn new() -> Self {
164 Self::default()
165 }
166
167 pub fn retention_duration(&self) -> Duration {
169 Duration::from_millis(self.retention_ms as u64)
170 }
171
172 pub fn segment_roll_duration(&self) -> Duration {
174 Duration::from_millis(self.segment_ms as u64)
175 }
176
177 pub fn to_map(&self) -> HashMap<String, ConfigValue> {
179 let mut map = HashMap::new();
180
181 map.insert(
182 "retention.ms".to_string(),
183 ConfigValue {
184 value: self.retention_ms.to_string(),
185 is_default: self.retention_ms == DEFAULT_RETENTION_MS,
186 is_read_only: false,
187 is_sensitive: false,
188 },
189 );
190
191 map.insert(
192 "retention.bytes".to_string(),
193 ConfigValue {
194 value: self.retention_bytes.to_string(),
195 is_default: self.retention_bytes == DEFAULT_RETENTION_BYTES,
196 is_read_only: false,
197 is_sensitive: false,
198 },
199 );
200
201 map.insert(
202 "max.message.bytes".to_string(),
203 ConfigValue {
204 value: self.max_message_bytes.to_string(),
205 is_default: self.max_message_bytes == DEFAULT_MAX_MESSAGE_BYTES,
206 is_read_only: false,
207 is_sensitive: false,
208 },
209 );
210
211 map.insert(
212 "segment.bytes".to_string(),
213 ConfigValue {
214 value: self.segment_bytes.to_string(),
215 is_default: self.segment_bytes == DEFAULT_SEGMENT_BYTES,
216 is_read_only: false,
217 is_sensitive: false,
218 },
219 );
220
221 map.insert(
222 "segment.ms".to_string(),
223 ConfigValue {
224 value: self.segment_ms.to_string(),
225 is_default: self.segment_ms == DEFAULT_SEGMENT_MS,
226 is_read_only: false,
227 is_sensitive: false,
228 },
229 );
230
231 map.insert(
232 "cleanup.policy".to_string(),
233 ConfigValue {
234 value: self.cleanup_policy.to_string(),
235 is_default: self.cleanup_policy == CleanupPolicy::default(),
236 is_read_only: false,
237 is_sensitive: false,
238 },
239 );
240
241 map.insert(
242 "min.insync.replicas".to_string(),
243 ConfigValue {
244 value: self.min_insync_replicas.to_string(),
245 is_default: self.min_insync_replicas == 1,
246 is_read_only: false,
247 is_sensitive: false,
248 },
249 );
250
251 map.insert(
252 "compression.type".to_string(),
253 ConfigValue {
254 value: self.compression_type.to_string(),
255 is_default: self.compression_type == CompressionType::default(),
256 is_read_only: false,
257 is_sensitive: false,
258 },
259 );
260
261 map
262 }
263
264 pub fn apply(&mut self, key: &str, value: Option<&str>) -> Result<(), String> {
266 match key {
267 "retention.ms" => {
268 self.retention_ms = match value {
269 Some(v) => v
270 .parse()
271 .map_err(|e| format!("Invalid retention.ms: {}", e))?,
272 None => DEFAULT_RETENTION_MS,
273 };
274 }
275 "retention.bytes" => {
276 self.retention_bytes = match value {
277 Some(v) => v
278 .parse()
279 .map_err(|e| format!("Invalid retention.bytes: {}", e))?,
280 None => DEFAULT_RETENTION_BYTES,
281 };
282 }
283 "max.message.bytes" => {
284 self.max_message_bytes = match value {
285 Some(v) => v
286 .parse()
287 .map_err(|e| format!("Invalid max.message.bytes: {}", e))?,
288 None => DEFAULT_MAX_MESSAGE_BYTES,
289 };
290 }
291 "segment.bytes" => {
292 self.segment_bytes = match value {
293 Some(v) => v
294 .parse()
295 .map_err(|e| format!("Invalid segment.bytes: {}", e))?,
296 None => DEFAULT_SEGMENT_BYTES,
297 };
298 }
299 "segment.ms" => {
300 self.segment_ms = match value {
301 Some(v) => v
302 .parse()
303 .map_err(|e| format!("Invalid segment.ms: {}", e))?,
304 None => DEFAULT_SEGMENT_MS,
305 };
306 }
307 "cleanup.policy" => {
308 self.cleanup_policy = match value {
309 Some(v) => v.parse()?,
310 None => CleanupPolicy::default(),
311 };
312 }
313 "min.insync.replicas" => {
314 self.min_insync_replicas = match value {
315 Some(v) => v
316 .parse()
317 .map_err(|e| format!("Invalid min.insync.replicas: {}", e))?,
318 None => 1,
319 };
320 }
321 "compression.type" => {
322 self.compression_type = match value {
323 Some(v) => v.parse()?,
324 None => CompressionType::default(),
325 };
326 }
327 _ => {
328 return Err(format!("Unknown configuration key: {}", key));
329 }
330 }
331 Ok(())
332 }
333}
334
/// One configuration entry as reported by [`TopicConfig::to_map`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigValue {
    /// The setting rendered as a string.
    pub value: String,
    /// `true` when the value equals the built-in default for that setting.
    pub is_default: bool,
    /// Always `false` for entries produced by [`TopicConfig::to_map`].
    pub is_read_only: bool,
    /// Always `false` for entries produced by [`TopicConfig::to_map`].
    pub is_sensitive: bool,
}
347
/// Registry of per-topic configurations.
///
/// All access goes through `&self` methods that take the internal
/// [`RwLock`]; each method panics if that lock has been poisoned.
pub struct TopicConfigManager {
    /// Stored configuration for each topic, keyed by topic name.
    configs: RwLock<HashMap<String, TopicConfig>>,
}
355
356impl Default for TopicConfigManager {
357 fn default() -> Self {
358 Self::new()
359 }
360}
361
362impl TopicConfigManager {
363 pub fn new() -> Self {
365 Self {
366 configs: RwLock::new(HashMap::new()),
367 }
368 }
369
370 pub fn get_or_default(&self, topic: &str) -> TopicConfig {
372 let configs = self
373 .configs
374 .read()
375 .expect("topic config manager lock poisoned");
376 configs.get(topic).cloned().unwrap_or_default()
377 }
378
379 pub fn get(&self, topic: &str) -> Option<TopicConfig> {
381 let configs = self
382 .configs
383 .read()
384 .expect("topic config manager lock poisoned");
385 configs.get(topic).cloned()
386 }
387
388 pub fn set(&self, topic: &str, config: TopicConfig) {
390 let mut configs = self
391 .configs
392 .write()
393 .expect("topic config manager lock poisoned");
394 configs.insert(topic.to_string(), config);
395 }
396
397 pub fn apply_changes(
399 &self,
400 topic: &str,
401 changes: &[(String, Option<String>)],
402 ) -> Result<usize, String> {
403 let mut configs = self
404 .configs
405 .write()
406 .expect("topic config manager lock poisoned");
407 let config = configs.entry(topic.to_string()).or_default();
408
409 let mut changed = 0;
410 for (key, value) in changes {
411 config.apply(key, value.as_deref())?;
412 changed += 1;
413 }
414
415 Ok(changed)
416 }
417
418 pub fn remove(&self, topic: &str) {
420 let mut configs = self
421 .configs
422 .write()
423 .expect("topic config manager lock poisoned");
424 configs.remove(topic);
425 }
426
427 pub fn list_topics(&self) -> Vec<String> {
429 let configs = self
430 .configs
431 .read()
432 .expect("topic config manager lock poisoned");
433 configs.keys().cloned().collect()
434 }
435
436 pub fn describe(&self, topics: &[String]) -> Vec<(String, HashMap<String, ConfigValue>)> {
438 let configs = self
439 .configs
440 .read()
441 .expect("topic config manager lock poisoned");
442
443 if topics.is_empty() {
444 configs
446 .iter()
447 .map(|(name, config)| (name.clone(), config.to_map()))
448 .collect()
449 } else {
450 topics
452 .iter()
453 .map(|name| {
454 let config = configs.get(name).cloned().unwrap_or_default();
455 (name.clone(), config.to_map())
456 })
457 .collect()
458 }
459 }
460}
461
// Unit tests for the topic configuration types and the config manager.
#[cfg(test)]
mod tests {
    use super::*;

    // A default config carries the built-in default constants.
    #[test]
    fn test_topic_config_defaults() {
        let config = TopicConfig::default();
        assert_eq!(config.retention_ms, DEFAULT_RETENTION_MS);
        assert_eq!(config.retention_bytes, DEFAULT_RETENTION_BYTES);
        assert_eq!(config.max_message_bytes, DEFAULT_MAX_MESSAGE_BYTES);
        assert_eq!(config.cleanup_policy, CleanupPolicy::Delete);
    }

    // Each cleanup policy parses from its textual form.
    #[test]
    fn test_cleanup_policy_parse() {
        assert_eq!(
            "delete".parse::<CleanupPolicy>().unwrap(),
            CleanupPolicy::Delete
        );
        assert_eq!(
            "compact".parse::<CleanupPolicy>().unwrap(),
            CleanupPolicy::Compact
        );
        assert_eq!(
            "compact,delete".parse::<CleanupPolicy>().unwrap(),
            CleanupPolicy::CompactDelete
        );
    }

    // A sample of compression types parses from lowercase names.
    #[test]
    fn test_compression_type_parse() {
        assert_eq!(
            "lz4".parse::<CompressionType>().unwrap(),
            CompressionType::Lz4
        );
        assert_eq!(
            "zstd".parse::<CompressionType>().unwrap(),
            CompressionType::Zstd
        );
        assert_eq!(
            "producer".parse::<CompressionType>().unwrap(),
            CompressionType::Producer
        );
    }

    // `apply` with `Some(value)` parses and stores the new value.
    #[test]
    fn test_apply_config_changes() {
        let mut config = TopicConfig::default();

        config.apply("retention.ms", Some("86400000")).unwrap();
        assert_eq!(config.retention_ms, 86400000);

        config.apply("cleanup.policy", Some("compact")).unwrap();
        assert_eq!(config.cleanup_policy, CleanupPolicy::Compact);

        config.apply("compression.type", Some("lz4")).unwrap();
        assert_eq!(config.compression_type, CompressionType::Lz4);
    }

    // `apply` with `None` restores the built-in default.
    #[test]
    fn test_apply_reset_to_default() {
        let mut config = TopicConfig {
            retention_ms: 123456,
            ..Default::default()
        };

        config.apply("retention.ms", None).unwrap();
        assert_eq!(config.retention_ms, DEFAULT_RETENTION_MS);
    }

    // Unknown keys are rejected with an error.
    #[test]
    fn test_invalid_config_key() {
        let mut config = TopicConfig::default();
        let result = config.apply("invalid.key", Some("value"));
        assert!(result.is_err());
    }

    // `to_map` exposes settings under dotted keys and flags defaults.
    #[test]
    fn test_config_to_map() {
        let config = TopicConfig::default();
        let map = config.to_map();

        assert!(map.contains_key("retention.ms"));
        assert!(map.contains_key("cleanup.policy"));
        assert!(map.get("retention.ms").unwrap().is_default);
    }

    // End-to-end: defaults for unknown topics, then a batch of changes.
    #[test]
    fn test_topic_config_manager() {
        let manager = TopicConfigManager::new();

        // Nothing stored yet, so the default configuration is returned.
        let config = manager.get_or_default("test-topic");
        assert_eq!(config.retention_ms, DEFAULT_RETENTION_MS);

        // Apply two changes in one batch.
        let changes = vec![
            ("retention.ms".to_string(), Some("3600000".to_string())),
            ("cleanup.policy".to_string(), Some("compact".to_string())),
        ];
        let changed = manager.apply_changes("test-topic", &changes).unwrap();
        assert_eq!(changed, 2);

        // The stored configuration reflects both changes.
        let config = manager.get("test-topic").unwrap();
        assert_eq!(config.retention_ms, 3600000);
        assert_eq!(config.cleanup_policy, CleanupPolicy::Compact);
    }

    // `describe` reports stored values and marks non-default ones.
    #[test]
    fn test_describe_configs() {
        let manager = TopicConfigManager::new();

        // Store a config whose retention differs from the default.
        let config = TopicConfig {
            retention_ms: 86400000,
            ..Default::default()
        };
        manager.set("topic-a", config);

        // Describe just that topic.
        let descriptions = manager.describe(&["topic-a".to_string()]);
        assert_eq!(descriptions.len(), 1);
        assert_eq!(descriptions[0].0, "topic-a");
        assert_eq!(
            descriptions[0].1.get("retention.ms").unwrap().value,
            "86400000"
        );
        assert!(!descriptions[0].1.get("retention.ms").unwrap().is_default);
    }

    // The default retention converts to the matching `Duration`.
    #[test]
    fn test_retention_duration() {
        let config = TopicConfig::default();
        let duration = config.retention_duration();
        assert_eq!(duration, Duration::from_millis(DEFAULT_RETENTION_MS as u64));
    }
}