Skip to main content

rivven_core/
topic_config.rs

//! Topic Configuration Management
//!
//! Provides configurable topic settings for production deployments:
//!
//! | Config Key | Type | Default | Description |
//! |------------|------|---------|-------------|
//! | `retention.ms` | i64 | 604800000 | Message retention time (7 days; -1 = infinite) |
//! | `retention.bytes` | i64 | -1 | Max bytes per partition (-1 = unlimited) |
//! | `max.message.bytes` | i64 | 1048576 | Max message size (1 MB) |
//! | `segment.bytes` | i64 | 1073741824 | Segment file size (1 GB) |
//! | `segment.ms` | i64 | 604800000 | Segment roll time (7 days) |
//! | `cleanup.policy` | string | "delete" | "delete", "compact", or "compact,delete" |
//! | `min.insync.replicas` | i32 | 1 | Min replicas for ack |
//! | `compression.type` | string | "producer" | "none", "producer", "lz4", "zstd", "snappy", or "gzip" |
15
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::time::Duration;
20use tokio::sync::broadcast;
21
/// Default message retention: 7 days, expressed in milliseconds.
pub const DEFAULT_RETENTION_MS: i64 = 7 * 86_400_000;

/// Default per-partition retention size; `-1` means unlimited.
pub const DEFAULT_RETENTION_BYTES: i64 = -1;

/// Default maximum message size: 1 MiB (1,048,576 bytes).
pub const DEFAULT_MAX_MESSAGE_BYTES: i64 = 1 << 20;

/// Default segment file size: 1 GiB (1,073,741,824 bytes).
pub const DEFAULT_SEGMENT_BYTES: i64 = 1 << 30;

/// Default segment roll interval: 7 days, expressed in milliseconds.
pub const DEFAULT_SEGMENT_MS: i64 = 7 * 86_400_000;
36
/// Cleanup policy options
///
/// Selects how old log data is reclaimed. The textual form of each variant
/// is defined by this type's `Display` and `FromStr` implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CleanupPolicy {
    /// Delete old segments based on retention (this is the default variant)
    #[default]
    Delete,
    /// Compact log keeping only latest value per key
    Compact,
    /// Both delete and compact
    CompactDelete,
}
48
49impl std::fmt::Display for CleanupPolicy {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            CleanupPolicy::Delete => write!(f, "delete"),
53            CleanupPolicy::Compact => write!(f, "compact"),
54            CleanupPolicy::CompactDelete => write!(f, "compact,delete"),
55        }
56    }
57}
58
59impl std::str::FromStr for CleanupPolicy {
60    type Err = String;
61
62    fn from_str(s: &str) -> Result<Self, Self::Err> {
63        match s.to_lowercase().as_str() {
64            "delete" => Ok(CleanupPolicy::Delete),
65            "compact" => Ok(CleanupPolicy::Compact),
66            "compact,delete" | "delete,compact" => Ok(CleanupPolicy::CompactDelete),
67            _ => Err(format!("Invalid cleanup policy: {}", s)),
68        }
69    }
70}
71
/// Compression type options
///
/// The textual form of each variant is defined by this type's `Display`
/// and `FromStr` implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum CompressionType {
    /// No compression
    None,
    /// Use producer's compression (this is the default variant)
    #[default]
    Producer,
    /// LZ4 compression
    Lz4,
    /// Zstd compression
    Zstd,
    /// Snappy compression
    Snappy,
    /// Gzip compression
    Gzip,
}
89
90impl std::fmt::Display for CompressionType {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        match self {
93            CompressionType::None => write!(f, "none"),
94            CompressionType::Producer => write!(f, "producer"),
95            CompressionType::Lz4 => write!(f, "lz4"),
96            CompressionType::Zstd => write!(f, "zstd"),
97            CompressionType::Snappy => write!(f, "snappy"),
98            CompressionType::Gzip => write!(f, "gzip"),
99        }
100    }
101}
102
103impl std::str::FromStr for CompressionType {
104    type Err = String;
105
106    fn from_str(s: &str) -> Result<Self, Self::Err> {
107        match s.to_lowercase().as_str() {
108            "none" => Ok(CompressionType::None),
109            "producer" => Ok(CompressionType::Producer),
110            "lz4" => Ok(CompressionType::Lz4),
111            "zstd" => Ok(CompressionType::Zstd),
112            "snappy" => Ok(CompressionType::Snappy),
113            "gzip" => Ok(CompressionType::Gzip),
114            _ => Err(format!("Invalid compression type: {}", s)),
115        }
116    }
117}
118
/// Topic configuration
///
/// Each field is exposed under a dotted configuration key (for example
/// `retention_ms` as `retention.ms`); `TopicConfig::to_map` and
/// `TopicConfig::apply` define that mapping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfig {
    /// Message retention time in milliseconds
    pub retention_ms: i64,

    /// Max bytes to retain per partition (-1 = unlimited)
    pub retention_bytes: i64,

    /// Maximum message size in bytes
    pub max_message_bytes: i64,

    /// Segment file size in bytes
    pub segment_bytes: i64,

    /// Segment roll time in milliseconds
    pub segment_ms: i64,

    /// Cleanup policy
    pub cleanup_policy: CleanupPolicy,

    /// Minimum in-sync replicas for ack
    pub min_insync_replicas: i32,

    /// Compression type
    pub compression_type: CompressionType,
}
146
147impl Default for TopicConfig {
148    fn default() -> Self {
149        Self {
150            retention_ms: DEFAULT_RETENTION_MS,
151            retention_bytes: DEFAULT_RETENTION_BYTES,
152            max_message_bytes: DEFAULT_MAX_MESSAGE_BYTES,
153            segment_bytes: DEFAULT_SEGMENT_BYTES,
154            segment_ms: DEFAULT_SEGMENT_MS,
155            cleanup_policy: CleanupPolicy::default(),
156            min_insync_replicas: 1,
157            compression_type: CompressionType::default(),
158        }
159    }
160}
161
162impl TopicConfig {
163    /// Create new topic config with defaults
164    pub fn new() -> Self {
165        Self::default()
166    }
167
168    /// Get retention duration
169    pub fn retention_duration(&self) -> Duration {
170        Duration::from_millis(self.retention_ms as u64)
171    }
172
173    /// Get segment roll duration
174    pub fn segment_roll_duration(&self) -> Duration {
175        Duration::from_millis(self.segment_ms as u64)
176    }
177
178    /// Convert to HashMap for describe response
179    pub fn to_map(&self) -> HashMap<String, ConfigValue> {
180        let mut map = HashMap::new();
181
182        map.insert(
183            "retention.ms".to_string(),
184            ConfigValue {
185                value: self.retention_ms.to_string(),
186                is_default: self.retention_ms == DEFAULT_RETENTION_MS,
187                is_read_only: false,
188                is_sensitive: false,
189            },
190        );
191
192        map.insert(
193            "retention.bytes".to_string(),
194            ConfigValue {
195                value: self.retention_bytes.to_string(),
196                is_default: self.retention_bytes == DEFAULT_RETENTION_BYTES,
197                is_read_only: false,
198                is_sensitive: false,
199            },
200        );
201
202        map.insert(
203            "max.message.bytes".to_string(),
204            ConfigValue {
205                value: self.max_message_bytes.to_string(),
206                is_default: self.max_message_bytes == DEFAULT_MAX_MESSAGE_BYTES,
207                is_read_only: false,
208                is_sensitive: false,
209            },
210        );
211
212        map.insert(
213            "segment.bytes".to_string(),
214            ConfigValue {
215                value: self.segment_bytes.to_string(),
216                is_default: self.segment_bytes == DEFAULT_SEGMENT_BYTES,
217                is_read_only: false,
218                is_sensitive: false,
219            },
220        );
221
222        map.insert(
223            "segment.ms".to_string(),
224            ConfigValue {
225                value: self.segment_ms.to_string(),
226                is_default: self.segment_ms == DEFAULT_SEGMENT_MS,
227                is_read_only: false,
228                is_sensitive: false,
229            },
230        );
231
232        map.insert(
233            "cleanup.policy".to_string(),
234            ConfigValue {
235                value: self.cleanup_policy.to_string(),
236                is_default: self.cleanup_policy == CleanupPolicy::default(),
237                is_read_only: false,
238                is_sensitive: false,
239            },
240        );
241
242        map.insert(
243            "min.insync.replicas".to_string(),
244            ConfigValue {
245                value: self.min_insync_replicas.to_string(),
246                is_default: self.min_insync_replicas == 1,
247                is_read_only: false,
248                is_sensitive: false,
249            },
250        );
251
252        map.insert(
253            "compression.type".to_string(),
254            ConfigValue {
255                value: self.compression_type.to_string(),
256                is_default: self.compression_type == CompressionType::default(),
257                is_read_only: false,
258                is_sensitive: false,
259            },
260        );
261
262        map
263    }
264
265    /// Apply a configuration change
266    pub fn apply(&mut self, key: &str, value: Option<&str>) -> Result<(), String> {
267        match key {
268            "retention.ms" => {
269                let val: i64 = match value {
270                    Some(v) => v
271                        .parse()
272                        .map_err(|e| format!("Invalid retention.ms: {}", e))?,
273                    None => DEFAULT_RETENTION_MS,
274                };
275                if val < -1 {
276                    return Err("retention.ms must be >= -1 (-1 = infinite)".into());
277                }
278                self.retention_ms = val;
279            }
280            "retention.bytes" => {
281                self.retention_bytes = match value {
282                    Some(v) => v
283                        .parse()
284                        .map_err(|e| format!("Invalid retention.bytes: {}", e))?,
285                    None => DEFAULT_RETENTION_BYTES,
286                };
287            }
288            "max.message.bytes" => {
289                let val: i64 = match value {
290                    Some(v) => v
291                        .parse()
292                        .map_err(|e| format!("Invalid max.message.bytes: {}", e))?,
293                    None => DEFAULT_MAX_MESSAGE_BYTES,
294                };
295                if val <= 0 {
296                    return Err("max.message.bytes must be > 0".into());
297                }
298                self.max_message_bytes = val;
299            }
300            "segment.bytes" => {
301                let val: i64 = match value {
302                    Some(v) => v
303                        .parse()
304                        .map_err(|e| format!("Invalid segment.bytes: {}", e))?,
305                    None => DEFAULT_SEGMENT_BYTES,
306                };
307                if val < 1024 {
308                    return Err("segment.bytes must be >= 1024".into());
309                }
310                self.segment_bytes = val;
311            }
312            "segment.ms" => {
313                self.segment_ms = match value {
314                    Some(v) => v
315                        .parse()
316                        .map_err(|e| format!("Invalid segment.ms: {}", e))?,
317                    None => DEFAULT_SEGMENT_MS,
318                };
319            }
320            "cleanup.policy" => {
321                self.cleanup_policy = match value {
322                    Some(v) => v.parse()?,
323                    None => CleanupPolicy::default(),
324                };
325            }
326            "min.insync.replicas" => {
327                let val: i32 = match value {
328                    Some(v) => v
329                        .parse()
330                        .map_err(|e| format!("Invalid min.insync.replicas: {}", e))?,
331                    None => 1,
332                };
333                if val <= 0 {
334                    return Err("min.insync.replicas must be > 0".into());
335                }
336                self.min_insync_replicas = val;
337            }
338            "compression.type" => {
339                self.compression_type = match value {
340                    Some(v) => v.parse()?,
341                    None => CompressionType::default(),
342                };
343            }
344            _ => {
345                return Err(format!("Unknown configuration key: {}", key));
346            }
347        }
348        Ok(())
349    }
350}
351
/// Configuration value with metadata
///
/// One entry of the map produced by `TopicConfig::to_map`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigValue {
    /// Current value, rendered as a string
    pub value: String,
    /// Whether this is the default value
    pub is_default: bool,
    /// Whether this config is read-only
    pub is_read_only: bool,
    /// Whether this config is sensitive
    pub is_sensitive: bool,
}
364
/// Notification about a config change
///
/// Sent by `TopicConfigManager` on its broadcast channel.
#[derive(Debug, Clone)]
pub struct ConfigChangeEvent {
    /// Topic name
    pub topic: String,
    /// Changed keys (dotted configuration key names)
    pub changed_keys: Vec<String>,
}
373
/// Topic configuration manager
///
/// Manages per-topic configurations with thread-safe access.
/// Supports change notifications via a broadcast channel.
pub struct TopicConfigManager {
    /// Per-topic configurations, keyed by topic name
    configs: RwLock<HashMap<String, TopicConfig>>,
    /// Change notification sender; receivers are handed out by `subscribe`
    change_tx: broadcast::Sender<ConfigChangeEvent>,
}
384
385impl Default for TopicConfigManager {
386    fn default() -> Self {
387        Self::new()
388    }
389}
390
391impl TopicConfigManager {
392    /// Create a new topic config manager
393    pub fn new() -> Self {
394        let (change_tx, _) = broadcast::channel(256);
395        Self {
396            configs: RwLock::new(HashMap::new()),
397            change_tx,
398        }
399    }
400
401    /// Subscribe to config change notifications
402    pub fn subscribe(&self) -> broadcast::Receiver<ConfigChangeEvent> {
403        self.change_tx.subscribe()
404    }
405
406    /// Get or create config for a topic
407    pub fn get_or_default(&self, topic: &str) -> TopicConfig {
408        let configs = self.configs.read();
409        configs.get(topic).cloned().unwrap_or_default()
410    }
411
412    /// Get config for a topic
413    pub fn get(&self, topic: &str) -> Option<TopicConfig> {
414        let configs = self.configs.read();
415        configs.get(topic).cloned()
416    }
417
418    /// Set config for a topic
419    pub fn set(&self, topic: &str, config: TopicConfig) {
420        let keys: Vec<String> = config.to_map().keys().cloned().collect();
421        {
422            let mut configs = self.configs.write();
423            configs.insert(topic.to_string(), config);
424        }
425        let _ = self.change_tx.send(ConfigChangeEvent {
426            topic: topic.to_string(),
427            changed_keys: keys,
428        });
429    }
430
431    /// Apply configuration changes to a topic
432    pub fn apply_changes(
433        &self,
434        topic: &str,
435        changes: &[(String, Option<String>)],
436    ) -> Result<usize, String> {
437        let changed_keys: Vec<String> = changes.iter().map(|(k, _)| k.clone()).collect();
438        let changed = {
439            let mut configs = self.configs.write();
440            let config = configs.entry(topic.to_string()).or_default();
441
442            let mut changed = 0;
443            for (key, value) in changes {
444                config.apply(key, value.as_deref())?;
445                changed += 1;
446            }
447            changed
448        };
449
450        if changed > 0 {
451            let _ = self.change_tx.send(ConfigChangeEvent {
452                topic: topic.to_string(),
453                changed_keys,
454            });
455        }
456
457        Ok(changed)
458    }
459
460    /// Remove config for a topic (reverts to defaults)
461    pub fn remove(&self, topic: &str) {
462        let mut configs = self.configs.write();
463        configs.remove(topic);
464    }
465
466    /// List all configured topics
467    pub fn list_topics(&self) -> Vec<String> {
468        let configs = self.configs.read();
469        configs.keys().cloned().collect()
470    }
471
472    /// Describe configurations for topics
473    pub fn describe(&self, topics: &[String]) -> Vec<(String, HashMap<String, ConfigValue>)> {
474        let configs = self.configs.read();
475
476        if topics.is_empty() {
477            // Return all topics
478            configs
479                .iter()
480                .map(|(name, config)| (name.clone(), config.to_map()))
481                .collect()
482        } else {
483            // Return specific topics
484            topics
485                .iter()
486                .map(|name| {
487                    let config = configs.get(name).cloned().unwrap_or_default();
488                    (name.clone(), config.to_map())
489                })
490                .collect()
491        }
492    }
493}
494
495// ============================================================================
496// Tests
497// ============================================================================
498
/// Unit tests for config parsing, `TopicConfig::apply` / `to_map`, and the
/// `TopicConfigManager` (storage, describe, and change notifications).
#[cfg(test)]
mod tests {
    use super::*;

    /// A default config carries the documented default constants.
    #[test]
    fn test_topic_config_defaults() {
        let config = TopicConfig::default();
        assert_eq!(config.retention_ms, DEFAULT_RETENTION_MS);
        assert_eq!(config.retention_bytes, DEFAULT_RETENTION_BYTES);
        assert_eq!(config.max_message_bytes, DEFAULT_MAX_MESSAGE_BYTES);
        assert_eq!(config.cleanup_policy, CleanupPolicy::Delete);
    }

    /// Each cleanup policy string parses to its variant.
    #[test]
    fn test_cleanup_policy_parse() {
        assert_eq!(
            "delete".parse::<CleanupPolicy>().unwrap(),
            CleanupPolicy::Delete
        );
        assert_eq!(
            "compact".parse::<CleanupPolicy>().unwrap(),
            CleanupPolicy::Compact
        );
        assert_eq!(
            "compact,delete".parse::<CleanupPolicy>().unwrap(),
            CleanupPolicy::CompactDelete
        );
    }

    /// A sample of compression type strings parse to their variants.
    #[test]
    fn test_compression_type_parse() {
        assert_eq!(
            "lz4".parse::<CompressionType>().unwrap(),
            CompressionType::Lz4
        );
        assert_eq!(
            "zstd".parse::<CompressionType>().unwrap(),
            CompressionType::Zstd
        );
        assert_eq!(
            "producer".parse::<CompressionType>().unwrap(),
            CompressionType::Producer
        );
    }

    /// `apply` with `Some(value)` updates the matching field.
    #[test]
    fn test_apply_config_changes() {
        let mut config = TopicConfig::default();

        config.apply("retention.ms", Some("86400000")).unwrap();
        assert_eq!(config.retention_ms, 86400000);

        config.apply("cleanup.policy", Some("compact")).unwrap();
        assert_eq!(config.cleanup_policy, CleanupPolicy::Compact);

        config.apply("compression.type", Some("lz4")).unwrap();
        assert_eq!(config.compression_type, CompressionType::Lz4);
    }

    /// `apply` with `None` resets the field to its default.
    #[test]
    fn test_apply_reset_to_default() {
        let mut config = TopicConfig {
            retention_ms: 123456,
            ..Default::default()
        };

        config.apply("retention.ms", None).unwrap();
        assert_eq!(config.retention_ms, DEFAULT_RETENTION_MS);
    }

    /// Unknown keys are rejected.
    #[test]
    fn test_invalid_config_key() {
        let mut config = TopicConfig::default();
        let result = config.apply("invalid.key", Some("value"));
        assert!(result.is_err());
    }

    /// `to_map` exposes the dotted keys and flags default values.
    #[test]
    fn test_config_to_map() {
        let config = TopicConfig::default();
        let map = config.to_map();

        assert!(map.contains_key("retention.ms"));
        assert!(map.contains_key("cleanup.policy"));
        assert!(map.get("retention.ms").unwrap().is_default);
    }

    /// The manager returns defaults for unknown topics and stores applied changes.
    #[test]
    fn test_topic_config_manager() {
        let manager = TopicConfigManager::new();

        // Get default config for non-existent topic
        let config = manager.get_or_default("test-topic");
        assert_eq!(config.retention_ms, DEFAULT_RETENTION_MS);

        // Apply changes
        let changes = vec![
            ("retention.ms".to_string(), Some("3600000".to_string())),
            ("cleanup.policy".to_string(), Some("compact".to_string())),
        ];
        let changed = manager.apply_changes("test-topic", &changes).unwrap();
        assert_eq!(changed, 2);

        // Verify changes
        let config = manager.get("test-topic").unwrap();
        assert_eq!(config.retention_ms, 3600000);
        assert_eq!(config.cleanup_policy, CleanupPolicy::Compact);
    }

    /// `describe` reports a stored custom value and marks it non-default.
    #[test]
    fn test_describe_configs() {
        let manager = TopicConfigManager::new();

        // Set custom config
        let config = TopicConfig {
            retention_ms: 86400000,
            ..Default::default()
        };
        manager.set("topic-a", config);

        // Describe specific topic
        let descriptions = manager.describe(&["topic-a".to_string()]);
        assert_eq!(descriptions.len(), 1);
        assert_eq!(descriptions[0].0, "topic-a");
        assert_eq!(
            descriptions[0].1.get("retention.ms").unwrap().value,
            "86400000"
        );
        assert!(!descriptions[0].1.get("retention.ms").unwrap().is_default);
    }

    /// `retention_duration` converts the millisecond field to a `Duration`.
    #[test]
    fn test_retention_duration() {
        let config = TopicConfig::default();
        let duration = config.retention_duration();
        assert_eq!(duration, Duration::from_millis(DEFAULT_RETENTION_MS as u64));
    }

    /// `apply_changes` notifies subscribers with the topic and changed keys.
    #[test]
    fn test_config_change_notification() {
        let manager = TopicConfigManager::new();
        let mut rx = manager.subscribe();

        // Apply changes should send notification
        let changes = vec![("retention.ms".to_string(), Some("3600000".to_string()))];
        manager.apply_changes("my-topic", &changes).unwrap();

        let event = rx.try_recv().unwrap();
        assert_eq!(event.topic, "my-topic");
        assert!(event.changed_keys.contains(&"retention.ms".to_string()));
    }

    /// `set` notifies subscribers with a non-empty key list.
    #[test]
    fn test_config_set_notification() {
        let manager = TopicConfigManager::new();
        let mut rx = manager.subscribe();

        let config = TopicConfig {
            retention_ms: 86400000,
            ..Default::default()
        };
        manager.set("topic-b", config);

        let event = rx.try_recv().unwrap();
        assert_eq!(event.topic, "topic-b");
        assert!(!event.changed_keys.is_empty());
    }
}