Skip to main content

rivven_core/
topic_config.rs

1//! Topic Configuration Management
2//!
3//! Provides configurable topic settings for production deployments:
4//!
5//! | Config Key | Type | Default | Description |
6//! |------------|------|---------|-------------|
7//! | `retention.ms` | i64 | 604800000 | Message retention time (7 days) |
8//! | `retention.bytes` | i64 | -1 | Max bytes per partition (-1 = unlimited) |
9//! | `max.message.bytes` | i64 | 1048576 | Max message size (1 MB) |
10//! | `segment.bytes` | i64 | 1073741824 | Segment file size (1 GB) |
11//! | `segment.ms` | i64 | 604800000 | Segment roll time (7 days) |
12//! | `cleanup.policy` | string | "delete" | "delete", "compact" or "compact,delete" |
13//! | `min.insync.replicas` | i32 | 1 | Min replicas for ack |
14//! | `compression.type` | string | "producer" | Compression algorithm |
15
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::sync::RwLock;
19use std::time::Duration;
20
/// Default message retention time: 7 days, expressed in milliseconds.
pub const DEFAULT_RETENTION_MS: i64 = 7 * 86_400_000;

/// Default per-partition retention size; `-1` means unlimited.
pub const DEFAULT_RETENTION_BYTES: i64 = -1;

/// Default maximum message size: 1,048,576 bytes.
pub const DEFAULT_MAX_MESSAGE_BYTES: i64 = 1 << 20;

/// Default segment file size: 1,073,741,824 bytes.
pub const DEFAULT_SEGMENT_BYTES: i64 = 1 << 30;

/// Default segment roll time: 7 days, expressed in milliseconds.
pub const DEFAULT_SEGMENT_MS: i64 = 7 * 86_400_000;
35
36/// Cleanup policy options
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
38pub enum CleanupPolicy {
39    /// Delete old segments based on retention
40    #[default]
41    Delete,
42    /// Compact log keeping only latest value per key
43    Compact,
44    /// Both delete and compact
45    CompactDelete,
46}
47
48impl std::fmt::Display for CleanupPolicy {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            CleanupPolicy::Delete => write!(f, "delete"),
52            CleanupPolicy::Compact => write!(f, "compact"),
53            CleanupPolicy::CompactDelete => write!(f, "compact,delete"),
54        }
55    }
56}
57
58impl std::str::FromStr for CleanupPolicy {
59    type Err = String;
60
61    fn from_str(s: &str) -> Result<Self, Self::Err> {
62        match s.to_lowercase().as_str() {
63            "delete" => Ok(CleanupPolicy::Delete),
64            "compact" => Ok(CleanupPolicy::Compact),
65            "compact,delete" | "delete,compact" => Ok(CleanupPolicy::CompactDelete),
66            _ => Err(format!("Invalid cleanup policy: {}", s)),
67        }
68    }
69}
70
71/// Compression type options
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
73pub enum CompressionType {
74    /// No compression
75    None,
76    /// Use producer's compression
77    #[default]
78    Producer,
79    /// LZ4 compression
80    Lz4,
81    /// Zstd compression
82    Zstd,
83    /// Snappy compression
84    Snappy,
85    /// Gzip compression
86    Gzip,
87}
88
89impl std::fmt::Display for CompressionType {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            CompressionType::None => write!(f, "none"),
93            CompressionType::Producer => write!(f, "producer"),
94            CompressionType::Lz4 => write!(f, "lz4"),
95            CompressionType::Zstd => write!(f, "zstd"),
96            CompressionType::Snappy => write!(f, "snappy"),
97            CompressionType::Gzip => write!(f, "gzip"),
98        }
99    }
100}
101
102impl std::str::FromStr for CompressionType {
103    type Err = String;
104
105    fn from_str(s: &str) -> Result<Self, Self::Err> {
106        match s.to_lowercase().as_str() {
107            "none" => Ok(CompressionType::None),
108            "producer" => Ok(CompressionType::Producer),
109            "lz4" => Ok(CompressionType::Lz4),
110            "zstd" => Ok(CompressionType::Zstd),
111            "snappy" => Ok(CompressionType::Snappy),
112            "gzip" => Ok(CompressionType::Gzip),
113            _ => Err(format!("Invalid compression type: {}", s)),
114        }
115    }
116}
117
118/// Topic configuration
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct TopicConfig {
121    /// Message retention time in milliseconds
122    pub retention_ms: i64,
123
124    /// Max bytes to retain per partition (-1 = unlimited)
125    pub retention_bytes: i64,
126
127    /// Maximum message size in bytes
128    pub max_message_bytes: i64,
129
130    /// Segment file size in bytes
131    pub segment_bytes: i64,
132
133    /// Segment roll time in milliseconds
134    pub segment_ms: i64,
135
136    /// Cleanup policy
137    pub cleanup_policy: CleanupPolicy,
138
139    /// Minimum in-sync replicas for ack
140    pub min_insync_replicas: i32,
141
142    /// Compression type
143    pub compression_type: CompressionType,
144}
145
146impl Default for TopicConfig {
147    fn default() -> Self {
148        Self {
149            retention_ms: DEFAULT_RETENTION_MS,
150            retention_bytes: DEFAULT_RETENTION_BYTES,
151            max_message_bytes: DEFAULT_MAX_MESSAGE_BYTES,
152            segment_bytes: DEFAULT_SEGMENT_BYTES,
153            segment_ms: DEFAULT_SEGMENT_MS,
154            cleanup_policy: CleanupPolicy::default(),
155            min_insync_replicas: 1,
156            compression_type: CompressionType::default(),
157        }
158    }
159}
160
161impl TopicConfig {
162    /// Create new topic config with defaults
163    pub fn new() -> Self {
164        Self::default()
165    }
166
167    /// Get retention duration
168    pub fn retention_duration(&self) -> Duration {
169        Duration::from_millis(self.retention_ms as u64)
170    }
171
172    /// Get segment roll duration
173    pub fn segment_roll_duration(&self) -> Duration {
174        Duration::from_millis(self.segment_ms as u64)
175    }
176
177    /// Convert to HashMap for describe response
178    pub fn to_map(&self) -> HashMap<String, ConfigValue> {
179        let mut map = HashMap::new();
180
181        map.insert(
182            "retention.ms".to_string(),
183            ConfigValue {
184                value: self.retention_ms.to_string(),
185                is_default: self.retention_ms == DEFAULT_RETENTION_MS,
186                is_read_only: false,
187                is_sensitive: false,
188            },
189        );
190
191        map.insert(
192            "retention.bytes".to_string(),
193            ConfigValue {
194                value: self.retention_bytes.to_string(),
195                is_default: self.retention_bytes == DEFAULT_RETENTION_BYTES,
196                is_read_only: false,
197                is_sensitive: false,
198            },
199        );
200
201        map.insert(
202            "max.message.bytes".to_string(),
203            ConfigValue {
204                value: self.max_message_bytes.to_string(),
205                is_default: self.max_message_bytes == DEFAULT_MAX_MESSAGE_BYTES,
206                is_read_only: false,
207                is_sensitive: false,
208            },
209        );
210
211        map.insert(
212            "segment.bytes".to_string(),
213            ConfigValue {
214                value: self.segment_bytes.to_string(),
215                is_default: self.segment_bytes == DEFAULT_SEGMENT_BYTES,
216                is_read_only: false,
217                is_sensitive: false,
218            },
219        );
220
221        map.insert(
222            "segment.ms".to_string(),
223            ConfigValue {
224                value: self.segment_ms.to_string(),
225                is_default: self.segment_ms == DEFAULT_SEGMENT_MS,
226                is_read_only: false,
227                is_sensitive: false,
228            },
229        );
230
231        map.insert(
232            "cleanup.policy".to_string(),
233            ConfigValue {
234                value: self.cleanup_policy.to_string(),
235                is_default: self.cleanup_policy == CleanupPolicy::default(),
236                is_read_only: false,
237                is_sensitive: false,
238            },
239        );
240
241        map.insert(
242            "min.insync.replicas".to_string(),
243            ConfigValue {
244                value: self.min_insync_replicas.to_string(),
245                is_default: self.min_insync_replicas == 1,
246                is_read_only: false,
247                is_sensitive: false,
248            },
249        );
250
251        map.insert(
252            "compression.type".to_string(),
253            ConfigValue {
254                value: self.compression_type.to_string(),
255                is_default: self.compression_type == CompressionType::default(),
256                is_read_only: false,
257                is_sensitive: false,
258            },
259        );
260
261        map
262    }
263
264    /// Apply a configuration change
265    pub fn apply(&mut self, key: &str, value: Option<&str>) -> Result<(), String> {
266        match key {
267            "retention.ms" => {
268                self.retention_ms = match value {
269                    Some(v) => v
270                        .parse()
271                        .map_err(|e| format!("Invalid retention.ms: {}", e))?,
272                    None => DEFAULT_RETENTION_MS,
273                };
274            }
275            "retention.bytes" => {
276                self.retention_bytes = match value {
277                    Some(v) => v
278                        .parse()
279                        .map_err(|e| format!("Invalid retention.bytes: {}", e))?,
280                    None => DEFAULT_RETENTION_BYTES,
281                };
282            }
283            "max.message.bytes" => {
284                self.max_message_bytes = match value {
285                    Some(v) => v
286                        .parse()
287                        .map_err(|e| format!("Invalid max.message.bytes: {}", e))?,
288                    None => DEFAULT_MAX_MESSAGE_BYTES,
289                };
290            }
291            "segment.bytes" => {
292                self.segment_bytes = match value {
293                    Some(v) => v
294                        .parse()
295                        .map_err(|e| format!("Invalid segment.bytes: {}", e))?,
296                    None => DEFAULT_SEGMENT_BYTES,
297                };
298            }
299            "segment.ms" => {
300                self.segment_ms = match value {
301                    Some(v) => v
302                        .parse()
303                        .map_err(|e| format!("Invalid segment.ms: {}", e))?,
304                    None => DEFAULT_SEGMENT_MS,
305                };
306            }
307            "cleanup.policy" => {
308                self.cleanup_policy = match value {
309                    Some(v) => v.parse()?,
310                    None => CleanupPolicy::default(),
311                };
312            }
313            "min.insync.replicas" => {
314                self.min_insync_replicas = match value {
315                    Some(v) => v
316                        .parse()
317                        .map_err(|e| format!("Invalid min.insync.replicas: {}", e))?,
318                    None => 1,
319                };
320            }
321            "compression.type" => {
322                self.compression_type = match value {
323                    Some(v) => v.parse()?,
324                    None => CompressionType::default(),
325                };
326            }
327            _ => {
328                return Err(format!("Unknown configuration key: {}", key));
329            }
330        }
331        Ok(())
332    }
333}
334
335/// Configuration value with metadata
336#[derive(Debug, Clone, Serialize, Deserialize)]
337pub struct ConfigValue {
338    /// Current value
339    pub value: String,
340    /// Whether this is the default value
341    pub is_default: bool,
342    /// Whether this config is read-only
343    pub is_read_only: bool,
344    /// Whether this config is sensitive
345    pub is_sensitive: bool,
346}
347
348/// Topic configuration manager
349///
350/// Manages per-topic configurations with thread-safe access.
351pub struct TopicConfigManager {
352    /// Per-topic configurations
353    configs: RwLock<HashMap<String, TopicConfig>>,
354}
355
356impl Default for TopicConfigManager {
357    fn default() -> Self {
358        Self::new()
359    }
360}
361
362impl TopicConfigManager {
363    /// Create a new topic config manager
364    pub fn new() -> Self {
365        Self {
366            configs: RwLock::new(HashMap::new()),
367        }
368    }
369
370    /// Get or create config for a topic
371    pub fn get_or_default(&self, topic: &str) -> TopicConfig {
372        let configs = self
373            .configs
374            .read()
375            .expect("topic config manager lock poisoned");
376        configs.get(topic).cloned().unwrap_or_default()
377    }
378
379    /// Get config for a topic
380    pub fn get(&self, topic: &str) -> Option<TopicConfig> {
381        let configs = self
382            .configs
383            .read()
384            .expect("topic config manager lock poisoned");
385        configs.get(topic).cloned()
386    }
387
388    /// Set config for a topic
389    pub fn set(&self, topic: &str, config: TopicConfig) {
390        let mut configs = self
391            .configs
392            .write()
393            .expect("topic config manager lock poisoned");
394        configs.insert(topic.to_string(), config);
395    }
396
397    /// Apply configuration changes to a topic
398    pub fn apply_changes(
399        &self,
400        topic: &str,
401        changes: &[(String, Option<String>)],
402    ) -> Result<usize, String> {
403        let mut configs = self
404            .configs
405            .write()
406            .expect("topic config manager lock poisoned");
407        let config = configs.entry(topic.to_string()).or_default();
408
409        let mut changed = 0;
410        for (key, value) in changes {
411            config.apply(key, value.as_deref())?;
412            changed += 1;
413        }
414
415        Ok(changed)
416    }
417
418    /// Remove config for a topic (reverts to defaults)
419    pub fn remove(&self, topic: &str) {
420        let mut configs = self
421            .configs
422            .write()
423            .expect("topic config manager lock poisoned");
424        configs.remove(topic);
425    }
426
427    /// List all configured topics
428    pub fn list_topics(&self) -> Vec<String> {
429        let configs = self
430            .configs
431            .read()
432            .expect("topic config manager lock poisoned");
433        configs.keys().cloned().collect()
434    }
435
436    /// Describe configurations for topics
437    pub fn describe(&self, topics: &[String]) -> Vec<(String, HashMap<String, ConfigValue>)> {
438        let configs = self
439            .configs
440            .read()
441            .expect("topic config manager lock poisoned");
442
443        if topics.is_empty() {
444            // Return all topics
445            configs
446                .iter()
447                .map(|(name, config)| (name.clone(), config.to_map()))
448                .collect()
449        } else {
450            // Return specific topics
451            topics
452                .iter()
453                .map(|name| {
454                    let config = configs.get(name).cloned().unwrap_or_default();
455                    (name.clone(), config.to_map())
456                })
457                .collect()
458        }
459    }
460}
461
462// ============================================================================
463// Tests
464// ============================================================================
465
#[cfg(test)]
mod tests {
    use super::*;

    /// A default config carries the default constants and the delete policy.
    #[test]
    fn test_topic_config_defaults() {
        let defaults = TopicConfig::default();
        assert_eq!(defaults.retention_ms, DEFAULT_RETENTION_MS);
        assert_eq!(defaults.retention_bytes, DEFAULT_RETENTION_BYTES);
        assert_eq!(defaults.max_message_bytes, DEFAULT_MAX_MESSAGE_BYTES);
        assert_eq!(defaults.cleanup_policy, CleanupPolicy::Delete);
    }

    /// Each supported cleanup policy string parses to its variant.
    #[test]
    fn test_cleanup_policy_parse() {
        let cases = [
            ("delete", CleanupPolicy::Delete),
            ("compact", CleanupPolicy::Compact),
            ("compact,delete", CleanupPolicy::CompactDelete),
        ];
        for (text, expected) in cases {
            assert_eq!(text.parse::<CleanupPolicy>().unwrap(), expected);
        }
    }

    /// Compression type names parse to their variants.
    #[test]
    fn test_compression_type_parse() {
        let cases = [
            ("lz4", CompressionType::Lz4),
            ("zstd", CompressionType::Zstd),
            ("producer", CompressionType::Producer),
        ];
        for (text, expected) in cases {
            assert_eq!(text.parse::<CompressionType>().unwrap(), expected);
        }
    }

    /// Applying a value updates the matching field.
    #[test]
    fn test_apply_config_changes() {
        let mut config = TopicConfig::default();

        config.apply("retention.ms", Some("86400000")).unwrap();
        assert_eq!(config.retention_ms, 86_400_000);

        config.apply("cleanup.policy", Some("compact")).unwrap();
        assert_eq!(config.cleanup_policy, CleanupPolicy::Compact);

        config.apply("compression.type", Some("lz4")).unwrap();
        assert_eq!(config.compression_type, CompressionType::Lz4);
    }

    /// Applying `None` restores the default for that key.
    #[test]
    fn test_apply_reset_to_default() {
        let mut config = TopicConfig {
            retention_ms: 123_456,
            ..TopicConfig::default()
        };

        let outcome = config.apply("retention.ms", None);
        assert!(outcome.is_ok());
        assert_eq!(config.retention_ms, DEFAULT_RETENTION_MS);
    }

    /// An unknown key is rejected.
    #[test]
    fn test_invalid_config_key() {
        let mut config = TopicConfig::default();
        assert!(config.apply("invalid.key", Some("value")).is_err());
    }

    /// The describe map contains the expected keys and default flags.
    #[test]
    fn test_config_to_map() {
        let described = TopicConfig::default().to_map();

        for key in ["retention.ms", "cleanup.policy"] {
            assert!(described.contains_key(key));
        }
        assert!(described.get("retention.ms").unwrap().is_default);
    }

    /// The manager hands out defaults for unknown topics and stores changes.
    #[test]
    fn test_topic_config_manager() {
        let manager = TopicConfigManager::new();

        // An unknown topic yields the default configuration.
        let initial = manager.get_or_default("test-topic");
        assert_eq!(initial.retention_ms, DEFAULT_RETENTION_MS);

        // Apply two changes in one batch.
        let changes = vec![
            ("retention.ms".to_string(), Some("3600000".to_string())),
            ("cleanup.policy".to_string(), Some("compact".to_string())),
        ];
        let applied = manager.apply_changes("test-topic", &changes).unwrap();
        assert_eq!(applied, 2);

        // The stored configuration reflects both changes.
        let stored = manager.get("test-topic").unwrap();
        assert_eq!(stored.retention_ms, 3_600_000);
        assert_eq!(stored.cleanup_policy, CleanupPolicy::Compact);
    }

    /// Describing a topic reports its custom value as non-default.
    #[test]
    fn test_describe_configs() {
        let manager = TopicConfigManager::new();

        // Store a configuration with a custom retention time.
        manager.set(
            "topic-a",
            TopicConfig {
                retention_ms: 86_400_000,
                ..TopicConfig::default()
            },
        );

        // Describe only that topic.
        let descriptions = manager.describe(&["topic-a".to_string()]);
        assert_eq!(descriptions.len(), 1);

        let (name, entries) = &descriptions[0];
        assert_eq!(name, "topic-a");

        let retention = entries.get("retention.ms").unwrap();
        assert_eq!(retention.value, "86400000");
        assert!(!retention.is_default);
    }

    /// The retention duration equals the default retention in milliseconds.
    #[test]
    fn test_retention_duration() {
        let expected = Duration::from_millis(DEFAULT_RETENTION_MS as u64);
        assert_eq!(TopicConfig::default().retention_duration(), expected);
    }
}
603}