1use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::time::Duration;
20use tokio::sync::broadcast;
21
/// Default value for `retention.ms`: seven days, expressed in milliseconds.
pub const DEFAULT_RETENTION_MS: i64 = 7 * 86_400_000;

/// Default value for `retention.bytes`.
///
/// NOTE(review): `-1` presumably means "no size limit" — confirm against the
/// code that enforces size-based retention.
pub const DEFAULT_RETENTION_BYTES: i64 = -1;

/// Default value for `max.message.bytes`: 1 MiB.
pub const DEFAULT_MAX_MESSAGE_BYTES: i64 = 1 << 20;

/// Default value for `segment.bytes`: 1 GiB.
pub const DEFAULT_SEGMENT_BYTES: i64 = 1 << 30;

/// Default value for `segment.ms`: seven days, expressed in milliseconds.
pub const DEFAULT_SEGMENT_MS: i64 = 7 * 86_400_000;
36
/// Cleanup policy configured for a topic (the `cleanup.policy` setting).
///
/// The textual forms produced by `Display` are `"delete"`, `"compact"` and
/// `"compact,delete"`. The default is [`CleanupPolicy::Delete`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CleanupPolicy {
    /// Rendered as `"delete"`.
    ///
    /// NOTE(review): presumably old data is discarded once retention limits
    /// are exceeded — confirm against the log cleaner.
    #[default]
    Delete,
    /// Rendered as `"compact"`.
    ///
    /// NOTE(review): presumably key-based log compaction — confirm.
    Compact,
    /// Rendered as `"compact,delete"`; combines both policies.
    CompactDelete,
}
48
49impl std::fmt::Display for CleanupPolicy {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 CleanupPolicy::Delete => write!(f, "delete"),
53 CleanupPolicy::Compact => write!(f, "compact"),
54 CleanupPolicy::CompactDelete => write!(f, "compact,delete"),
55 }
56 }
57}
58
59impl std::str::FromStr for CleanupPolicy {
60 type Err = String;
61
62 fn from_str(s: &str) -> Result<Self, Self::Err> {
63 match s.to_lowercase().as_str() {
64 "delete" => Ok(CleanupPolicy::Delete),
65 "compact" => Ok(CleanupPolicy::Compact),
66 "compact,delete" | "delete,compact" => Ok(CleanupPolicy::CompactDelete),
67 _ => Err(format!("Invalid cleanup policy: {}", s)),
68 }
69 }
70}
71
/// Compression setting for a topic (the `compression.type` setting).
///
/// The textual forms produced by `Display` are the lowercase variant names
/// (`"none"`, `"producer"`, `"lz4"`, `"zstd"`, `"snappy"`, `"gzip"`). The
/// default is [`CompressionType::Producer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CompressionType {
    /// Rendered as `"none"`.
    None,
    /// Rendered as `"producer"`.
    ///
    /// NOTE(review): presumably "keep whatever codec the producer used" —
    /// confirm against the write path.
    #[default]
    Producer,
    /// Rendered as `"lz4"`.
    Lz4,
    /// Rendered as `"zstd"`.
    Zstd,
    /// Rendered as `"snappy"`.
    Snappy,
    /// Rendered as `"gzip"`.
    Gzip,
}
89
90impl std::fmt::Display for CompressionType {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 match self {
93 CompressionType::None => write!(f, "none"),
94 CompressionType::Producer => write!(f, "producer"),
95 CompressionType::Lz4 => write!(f, "lz4"),
96 CompressionType::Zstd => write!(f, "zstd"),
97 CompressionType::Snappy => write!(f, "snappy"),
98 CompressionType::Gzip => write!(f, "gzip"),
99 }
100 }
101}
102
103impl std::str::FromStr for CompressionType {
104 type Err = String;
105
106 fn from_str(s: &str) -> Result<Self, Self::Err> {
107 match s.to_lowercase().as_str() {
108 "none" => Ok(CompressionType::None),
109 "producer" => Ok(CompressionType::Producer),
110 "lz4" => Ok(CompressionType::Lz4),
111 "zstd" => Ok(CompressionType::Zstd),
112 "snappy" => Ok(CompressionType::Snappy),
113 "gzip" => Ok(CompressionType::Gzip),
114 _ => Err(format!("Invalid compression type: {}", s)),
115 }
116 }
117}
118
/// Per-topic configuration values.
///
/// Each field corresponds to one dotted configuration key (for example
/// `retention_ms` is exposed as `"retention.ms"` by `to_map` and updated
/// through `apply`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfig {
    /// Value of `retention.ms`, in milliseconds. `apply` accepts values
    /// `>= -1`, where `-1` means infinite retention.
    pub retention_ms: i64,

    /// Value of `retention.bytes`.
    ///
    /// NOTE(review): the default is `-1`, presumably "no size limit" — confirm.
    pub retention_bytes: i64,

    /// Value of `max.message.bytes`. `apply` requires it to be `> 0`.
    pub max_message_bytes: i64,

    /// Value of `segment.bytes`. `apply` requires it to be `>= 1024`.
    pub segment_bytes: i64,

    /// Value of `segment.ms`, in milliseconds.
    pub segment_ms: i64,

    /// Value of `cleanup.policy`.
    pub cleanup_policy: CleanupPolicy,

    /// Value of `min.insync.replicas`. `apply` requires it to be `> 0`.
    pub min_insync_replicas: i32,

    /// Value of `compression.type`.
    pub compression_type: CompressionType,
}
146
147impl Default for TopicConfig {
148 fn default() -> Self {
149 Self {
150 retention_ms: DEFAULT_RETENTION_MS,
151 retention_bytes: DEFAULT_RETENTION_BYTES,
152 max_message_bytes: DEFAULT_MAX_MESSAGE_BYTES,
153 segment_bytes: DEFAULT_SEGMENT_BYTES,
154 segment_ms: DEFAULT_SEGMENT_MS,
155 cleanup_policy: CleanupPolicy::default(),
156 min_insync_replicas: 1,
157 compression_type: CompressionType::default(),
158 }
159 }
160}
161
162impl TopicConfig {
163 pub fn new() -> Self {
165 Self::default()
166 }
167
168 pub fn retention_duration(&self) -> Duration {
170 Duration::from_millis(self.retention_ms as u64)
171 }
172
173 pub fn segment_roll_duration(&self) -> Duration {
175 Duration::from_millis(self.segment_ms as u64)
176 }
177
178 pub fn to_map(&self) -> HashMap<String, ConfigValue> {
180 let mut map = HashMap::new();
181
182 map.insert(
183 "retention.ms".to_string(),
184 ConfigValue {
185 value: self.retention_ms.to_string(),
186 is_default: self.retention_ms == DEFAULT_RETENTION_MS,
187 is_read_only: false,
188 is_sensitive: false,
189 },
190 );
191
192 map.insert(
193 "retention.bytes".to_string(),
194 ConfigValue {
195 value: self.retention_bytes.to_string(),
196 is_default: self.retention_bytes == DEFAULT_RETENTION_BYTES,
197 is_read_only: false,
198 is_sensitive: false,
199 },
200 );
201
202 map.insert(
203 "max.message.bytes".to_string(),
204 ConfigValue {
205 value: self.max_message_bytes.to_string(),
206 is_default: self.max_message_bytes == DEFAULT_MAX_MESSAGE_BYTES,
207 is_read_only: false,
208 is_sensitive: false,
209 },
210 );
211
212 map.insert(
213 "segment.bytes".to_string(),
214 ConfigValue {
215 value: self.segment_bytes.to_string(),
216 is_default: self.segment_bytes == DEFAULT_SEGMENT_BYTES,
217 is_read_only: false,
218 is_sensitive: false,
219 },
220 );
221
222 map.insert(
223 "segment.ms".to_string(),
224 ConfigValue {
225 value: self.segment_ms.to_string(),
226 is_default: self.segment_ms == DEFAULT_SEGMENT_MS,
227 is_read_only: false,
228 is_sensitive: false,
229 },
230 );
231
232 map.insert(
233 "cleanup.policy".to_string(),
234 ConfigValue {
235 value: self.cleanup_policy.to_string(),
236 is_default: self.cleanup_policy == CleanupPolicy::default(),
237 is_read_only: false,
238 is_sensitive: false,
239 },
240 );
241
242 map.insert(
243 "min.insync.replicas".to_string(),
244 ConfigValue {
245 value: self.min_insync_replicas.to_string(),
246 is_default: self.min_insync_replicas == 1,
247 is_read_only: false,
248 is_sensitive: false,
249 },
250 );
251
252 map.insert(
253 "compression.type".to_string(),
254 ConfigValue {
255 value: self.compression_type.to_string(),
256 is_default: self.compression_type == CompressionType::default(),
257 is_read_only: false,
258 is_sensitive: false,
259 },
260 );
261
262 map
263 }
264
265 pub fn apply(&mut self, key: &str, value: Option<&str>) -> Result<(), String> {
267 match key {
268 "retention.ms" => {
269 let val: i64 = match value {
270 Some(v) => v
271 .parse()
272 .map_err(|e| format!("Invalid retention.ms: {}", e))?,
273 None => DEFAULT_RETENTION_MS,
274 };
275 if val < -1 {
276 return Err("retention.ms must be >= -1 (-1 = infinite)".into());
277 }
278 self.retention_ms = val;
279 }
280 "retention.bytes" => {
281 self.retention_bytes = match value {
282 Some(v) => v
283 .parse()
284 .map_err(|e| format!("Invalid retention.bytes: {}", e))?,
285 None => DEFAULT_RETENTION_BYTES,
286 };
287 }
288 "max.message.bytes" => {
289 let val: i64 = match value {
290 Some(v) => v
291 .parse()
292 .map_err(|e| format!("Invalid max.message.bytes: {}", e))?,
293 None => DEFAULT_MAX_MESSAGE_BYTES,
294 };
295 if val <= 0 {
296 return Err("max.message.bytes must be > 0".into());
297 }
298 self.max_message_bytes = val;
299 }
300 "segment.bytes" => {
301 let val: i64 = match value {
302 Some(v) => v
303 .parse()
304 .map_err(|e| format!("Invalid segment.bytes: {}", e))?,
305 None => DEFAULT_SEGMENT_BYTES,
306 };
307 if val < 1024 {
308 return Err("segment.bytes must be >= 1024".into());
309 }
310 self.segment_bytes = val;
311 }
312 "segment.ms" => {
313 self.segment_ms = match value {
314 Some(v) => v
315 .parse()
316 .map_err(|e| format!("Invalid segment.ms: {}", e))?,
317 None => DEFAULT_SEGMENT_MS,
318 };
319 }
320 "cleanup.policy" => {
321 self.cleanup_policy = match value {
322 Some(v) => v.parse()?,
323 None => CleanupPolicy::default(),
324 };
325 }
326 "min.insync.replicas" => {
327 let val: i32 = match value {
328 Some(v) => v
329 .parse()
330 .map_err(|e| format!("Invalid min.insync.replicas: {}", e))?,
331 None => 1,
332 };
333 if val <= 0 {
334 return Err("min.insync.replicas must be > 0".into());
335 }
336 self.min_insync_replicas = val;
337 }
338 "compression.type" => {
339 self.compression_type = match value {
340 Some(v) => v.parse()?,
341 None => CompressionType::default(),
342 };
343 }
344 _ => {
345 return Err(format!("Unknown configuration key: {}", key));
346 }
347 }
348 Ok(())
349 }
350}
351
/// A single configuration entry as reported by `TopicConfig::to_map`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigValue {
    /// The setting's current value rendered as a string.
    pub value: String,
    /// `true` when the current value equals the setting's default.
    pub is_default: bool,
    /// Whether the setting is read-only. `TopicConfig::to_map` always
    /// reports `false`.
    pub is_read_only: bool,
    /// Whether the value is sensitive. `TopicConfig::to_map` always
    /// reports `false`.
    pub is_sensitive: bool,
}
364
/// Notification broadcast by `TopicConfigManager` after a topic's
/// configuration has been stored or modified.
#[derive(Debug, Clone)]
pub struct ConfigChangeEvent {
    /// Name of the topic whose configuration changed.
    pub topic: String,
    /// Dotted configuration keys reported as changed. `set` lists every key;
    /// `apply_changes` lists the keys it was asked to change.
    pub changed_keys: Vec<String>,
}
373
/// Registry of per-topic configurations that also broadcasts change events.
pub struct TopicConfigManager {
    /// Stored configurations keyed by topic name, guarded by a read-write lock.
    configs: RwLock<HashMap<String, TopicConfig>>,
    /// Sending half of the broadcast channel used to publish
    /// `ConfigChangeEvent`s; receivers are handed out by `subscribe`.
    change_tx: broadcast::Sender<ConfigChangeEvent>,
}
384
385impl Default for TopicConfigManager {
386 fn default() -> Self {
387 Self::new()
388 }
389}
390
391impl TopicConfigManager {
392 pub fn new() -> Self {
394 let (change_tx, _) = broadcast::channel(256);
395 Self {
396 configs: RwLock::new(HashMap::new()),
397 change_tx,
398 }
399 }
400
401 pub fn subscribe(&self) -> broadcast::Receiver<ConfigChangeEvent> {
403 self.change_tx.subscribe()
404 }
405
406 pub fn get_or_default(&self, topic: &str) -> TopicConfig {
408 let configs = self.configs.read();
409 configs.get(topic).cloned().unwrap_or_default()
410 }
411
412 pub fn get(&self, topic: &str) -> Option<TopicConfig> {
414 let configs = self.configs.read();
415 configs.get(topic).cloned()
416 }
417
418 pub fn set(&self, topic: &str, config: TopicConfig) {
420 let keys: Vec<String> = config.to_map().keys().cloned().collect();
421 {
422 let mut configs = self.configs.write();
423 configs.insert(topic.to_string(), config);
424 }
425 let _ = self.change_tx.send(ConfigChangeEvent {
426 topic: topic.to_string(),
427 changed_keys: keys,
428 });
429 }
430
431 pub fn apply_changes(
433 &self,
434 topic: &str,
435 changes: &[(String, Option<String>)],
436 ) -> Result<usize, String> {
437 let changed_keys: Vec<String> = changes.iter().map(|(k, _)| k.clone()).collect();
438 let changed = {
439 let mut configs = self.configs.write();
440 let config = configs.entry(topic.to_string()).or_default();
441
442 let mut changed = 0;
443 for (key, value) in changes {
444 config.apply(key, value.as_deref())?;
445 changed += 1;
446 }
447 changed
448 };
449
450 if changed > 0 {
451 let _ = self.change_tx.send(ConfigChangeEvent {
452 topic: topic.to_string(),
453 changed_keys,
454 });
455 }
456
457 Ok(changed)
458 }
459
460 pub fn remove(&self, topic: &str) {
462 let mut configs = self.configs.write();
463 configs.remove(topic);
464 }
465
466 pub fn list_topics(&self) -> Vec<String> {
468 let configs = self.configs.read();
469 configs.keys().cloned().collect()
470 }
471
472 pub fn describe(&self, topics: &[String]) -> Vec<(String, HashMap<String, ConfigValue>)> {
474 let configs = self.configs.read();
475
476 if topics.is_empty() {
477 configs
479 .iter()
480 .map(|(name, config)| (name.clone(), config.to_map()))
481 .collect()
482 } else {
483 topics
485 .iter()
486 .map(|name| {
487 let config = configs.get(name).cloned().unwrap_or_default();
488 (name.clone(), config.to_map())
489 })
490 .collect()
491 }
492 }
493}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// A default config carries the module-level default values.
    #[test]
    fn test_topic_config_defaults() {
        let defaults = TopicConfig::default();
        assert_eq!(defaults.retention_ms, DEFAULT_RETENTION_MS);
        assert_eq!(defaults.retention_bytes, DEFAULT_RETENTION_BYTES);
        assert_eq!(defaults.max_message_bytes, DEFAULT_MAX_MESSAGE_BYTES);
        assert_eq!(defaults.cleanup_policy, CleanupPolicy::Delete);
    }

    /// Each supported cleanup-policy string parses to its variant.
    #[test]
    fn test_cleanup_policy_parse() {
        let cases = [
            ("delete", CleanupPolicy::Delete),
            ("compact", CleanupPolicy::Compact),
            ("compact,delete", CleanupPolicy::CompactDelete),
        ];
        for (text, expected) in cases {
            let parsed: CleanupPolicy = text.parse().unwrap();
            assert_eq!(parsed, expected);
        }
    }

    /// A sample of compression-type strings parses to the matching variants.
    #[test]
    fn test_compression_type_parse() {
        let cases = [
            ("lz4", CompressionType::Lz4),
            ("zstd", CompressionType::Zstd),
            ("producer", CompressionType::Producer),
        ];
        for (text, expected) in cases {
            let parsed: CompressionType = text.parse().unwrap();
            assert_eq!(parsed, expected);
        }
    }

    /// `apply` updates the targeted field for numeric and enum settings.
    #[test]
    fn test_apply_config_changes() {
        let mut cfg = TopicConfig::default();

        cfg.apply("retention.ms", Some("86400000")).unwrap();
        assert_eq!(cfg.retention_ms, 86400000);

        cfg.apply("cleanup.policy", Some("compact")).unwrap();
        assert_eq!(cfg.cleanup_policy, CleanupPolicy::Compact);

        cfg.apply("compression.type", Some("lz4")).unwrap();
        assert_eq!(cfg.compression_type, CompressionType::Lz4);
    }

    /// Applying `None` restores the default value of a key.
    #[test]
    fn test_apply_reset_to_default() {
        let mut cfg = TopicConfig {
            retention_ms: 123456,
            ..Default::default()
        };

        cfg.apply("retention.ms", None).unwrap();
        assert_eq!(cfg.retention_ms, DEFAULT_RETENTION_MS);
    }

    /// Unknown keys are rejected.
    #[test]
    fn test_invalid_config_key() {
        let mut cfg = TopicConfig::default();
        assert!(cfg.apply("invalid.key", Some("value")).is_err());
    }

    /// `to_map` exposes dotted keys and flags default values.
    #[test]
    fn test_config_to_map() {
        let entries = TopicConfig::default().to_map();

        assert!(entries.contains_key("retention.ms"));
        assert!(entries.contains_key("cleanup.policy"));
        assert!(entries["retention.ms"].is_default);
    }

    /// The manager falls back to defaults and stores applied changes.
    #[test]
    fn test_topic_config_manager() {
        let manager = TopicConfigManager::new();

        // Nothing stored yet: defaults are returned.
        let before = manager.get_or_default("test-topic");
        assert_eq!(before.retention_ms, DEFAULT_RETENTION_MS);

        let changes = vec![
            ("retention.ms".to_string(), Some("3600000".to_string())),
            ("cleanup.policy".to_string(), Some("compact".to_string())),
        ];
        let applied = manager.apply_changes("test-topic", &changes).unwrap();
        assert_eq!(applied, 2);

        // The changes are now visible through `get`.
        let after = manager.get("test-topic").unwrap();
        assert_eq!(after.retention_ms, 3600000);
        assert_eq!(after.cleanup_policy, CleanupPolicy::Compact);
    }

    /// `describe` reports the stored value and its non-default status.
    #[test]
    fn test_describe_configs() {
        let manager = TopicConfigManager::new();

        manager.set(
            "topic-a",
            TopicConfig {
                retention_ms: 86400000,
                ..Default::default()
            },
        );

        let descriptions = manager.describe(&["topic-a".to_string()]);
        assert_eq!(descriptions.len(), 1);

        let (name, entries) = &descriptions[0];
        assert_eq!(name, "topic-a");

        let retention = entries.get("retention.ms").unwrap();
        assert_eq!(retention.value, "86400000");
        assert!(!retention.is_default);
    }

    /// The default retention converts to the matching `Duration`.
    #[test]
    fn test_retention_duration() {
        let expected = Duration::from_millis(DEFAULT_RETENTION_MS as u64);
        assert_eq!(TopicConfig::default().retention_duration(), expected);
    }

    /// `apply_changes` publishes an event naming the topic and changed key.
    #[test]
    fn test_config_change_notification() {
        let manager = TopicConfigManager::new();
        let mut events = manager.subscribe();

        let changes = vec![("retention.ms".to_string(), Some("3600000".to_string()))];
        manager.apply_changes("my-topic", &changes).unwrap();

        let event = events.try_recv().unwrap();
        assert_eq!(event.topic, "my-topic");
        assert!(event.changed_keys.contains(&"retention.ms".to_string()));
    }

    /// `set` publishes an event with a non-empty key list.
    #[test]
    fn test_config_set_notification() {
        let manager = TopicConfigManager::new();
        let mut events = manager.subscribe();

        manager.set(
            "topic-b",
            TopicConfig {
                retention_ms: 86400000,
                ..Default::default()
            },
        );

        let event = events.try_recv().unwrap();
        assert_eq!(event.topic, "topic-b");
        assert!(!event.changed_keys.is_empty());
    }
}