Skip to main content

crabka_log/
config.rs

//! Tunables for [`Log`](crate::Log). Defaults match Apache Kafka 4.2.
2
3use std::time::Duration;
4
5use crabka_compression::CompressionType;
6
/// What a partition does with its old log segments (Kafka's per-topic
/// `cleanup.policy`).
///
/// * [`CleanupPolicy::Delete`] is the default. It removes whole segments by
///   age or size; that logic lives in `crate::retention`.
/// * [`CleanupPolicy::Compact`] keeps only the newest record per key. The
///   algorithm lives in `crate::compact` and is driven through
///   [`crate::Log::compact`].
///
/// Kafka's combined `compact,delete` setting has no variant here.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CleanupPolicy {
    /// Age- or size-based segment deletion. Selected by `Default`.
    #[default]
    Delete,
    /// Newest-wins deduplication by record key.
    Compact,
}
19
20/// Tunables for [`Log`](crate::Log) behavior.
21///
22/// Defaults match Apache Kafka 4.2 (`segment.bytes`, `segment.ms`,
23/// `retention.ms`, `index.interval.bytes`, etc.). The
24/// [`Default`](Self::default) impl is the recommended starting point;
25/// most production deployments will only override `retention_ms` and
26/// `retention_bytes`.
27#[derive(Debug, Clone)]
28pub struct LogConfig {
29    /// Roll the active segment when it exceeds this many bytes. Kafka default: 1 GiB.
30    pub segment_bytes: u64,
31
32    /// Roll the active segment when its first record is older than this. Kafka default: 7 days.
33    pub segment_ms: Duration,
34
35    /// Delete sealed segments older than this. `None` = unlimited. Kafka default: 7 days.
36    pub retention_ms: Option<Duration>,
37
38    /// Delete oldest sealed segments until the total `.log` size fits. `None` = unlimited.
39    pub retention_bytes: Option<u64>,
40
41    /// Write one `.index`/`.timeindex` entry per N bytes of `.log`. Kafka default: 4 KiB.
42    pub index_interval_bytes: u32,
43
44    /// fsync after every `append`. Default off; broker manages fsync separately.
45    pub flush_on_append: bool,
46
47    /// On open, CRC every batch in the active segment from the last index entry to EOF.
48    pub validate_on_open: bool,
49
50    /// Cleanup policy. Defaults to `Delete`. See [`CleanupPolicy`].
51    pub cleanup_policy: CleanupPolicy,
52
53    /// Broker-side recompression target. `None` is Kafka's
54    /// `compression.type=producer` (pass-through — store the batch
55    /// exactly as the producer sent it). `Some(c)` forces every batch
56    /// the broker accepts on this partition to be re-encoded to `c`
57    /// before write. Matches Kafka's per-topic `compression.type`
58    /// config: `gzip` / `snappy` / `lz4` / `zstd` / `uncompressed` map
59    /// to `Some(_)`; `producer` (the default) maps to `None`.
60    pub compression_type: Option<CompressionType>,
61
62    /// When `true`, this partition's sealed segments (KIP-405)
63    /// are eligible to be copied to the remote tier by the broker's
64    /// `RemoteLogManager`. Maps to Kafka's per-topic `remote.storage.enable`.
65    /// Default `false` (Kafka's default — tiered storage is opt-in per topic).
66    pub remote_storage_enable: bool,
67
68    /// Local-disk time-retention window for tiered
69    /// partitions (KIP-405). `None` inherits `retention_ms`. Default `None`.
70    pub local_retention_ms: Option<Duration>,
71
72    /// Local-disk size budget for tiered partitions (KIP-405).
73    /// `None` inherits `retention_bytes`. Default `None`.
74    pub local_retention_bytes: Option<u64>,
75}
76
77impl Default for LogConfig {
78    fn default() -> Self {
79        Self {
80            segment_bytes: 1024 * 1024 * 1024,
81            segment_ms: Duration::from_hours(7 * 24),
82            retention_ms: Some(Duration::from_hours(7 * 24)),
83            retention_bytes: None,
84            index_interval_bytes: 4096,
85            flush_on_append: false,
86            validate_on_open: true,
87            cleanup_policy: CleanupPolicy::Delete,
88            // Pass-through: producers' compression choice wins. Kafka's
89            // default. Operators flip this to a specific codec on
90            // topics where they want broker-side enforcement.
91            compression_type: None,
92            // Tiered storage is opt-in per topic (Kafka default false).
93            remote_storage_enable: false,
94            local_retention_ms: None,
95            local_retention_bytes: None,
96        }
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Kafka's `segment.ms` / `retention.ms` default (604800000 ms = 7 days),
    /// written in the units Kafka documents it in.
    const KAFKA_SEVEN_DAYS: Duration = Duration::from_millis(604_800_000);

    #[test]
    fn defaults_match_kafka_4x() {
        let c = LogConfig::default();
        assert!(c.segment_bytes == 1 << 30);
        assert!(c.segment_ms == KAFKA_SEVEN_DAYS);
        assert!(c.retention_ms == Some(KAFKA_SEVEN_DAYS));
        // Kafka `retention.bytes = -1`: unlimited.
        assert!(c.retention_bytes.is_none());
        assert!(c.index_interval_bytes == 4096);
        assert!(!c.flush_on_append);
        assert!(c.validate_on_open);
    }

    #[test]
    fn default_cleanup_policy_is_delete() {
        let c = LogConfig::default();
        assert!(c.cleanup_policy == CleanupPolicy::Delete);
        // The enum's own `Default` must agree with the config default.
        assert!(CleanupPolicy::default() == CleanupPolicy::Delete);
    }

    #[test]
    fn default_compression_is_producer_passthrough() {
        let c = LogConfig::default();
        assert!(c.compression_type.is_none());
    }

    #[test]
    fn default_remote_storage_is_disabled() {
        let c = LogConfig::default();
        assert!(!c.remote_storage_enable);
    }

    #[test]
    fn default_local_retention_is_none() {
        let c = LogConfig::default();
        assert!(c.local_retention_ms.is_none());
        assert!(c.local_retention_bytes.is_none());
    }
}