crabka_log/config.rs
1//! Tunables for `Log`. Defaults match Apache Kafka 4.2.
2
3use std::time::Duration;
4
5use crabka_compression::CompressionType;
6
7/// Per-topic policy for what to do with old log segments.
8///
9/// `Delete` (default): age- or size-based segment deletion via
10/// `crate::retention`. `Compact`: newest-wins dedup-by-key,
11/// implemented in `crate::compact` and invoked through
12/// [`crate::Log::compact`].
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
14pub enum CleanupPolicy {
15 #[default]
16 Delete,
17 Compact,
18}
19
20/// Tunables for [`Log`](crate::Log) behavior.
21///
22/// Defaults match Apache Kafka 4.2 (`segment.bytes`, `segment.ms`,
23/// `retention.ms`, `index.interval.bytes`, etc.). The
24/// [`Default`](Self::default) impl is the recommended starting point;
25/// most production deployments will only override `retention_ms` and
26/// `retention_bytes`.
27#[derive(Debug, Clone)]
28pub struct LogConfig {
29 /// Roll the active segment when it exceeds this many bytes. Kafka default: 1 GiB.
30 pub segment_bytes: u64,
31
32 /// Roll the active segment when its first record is older than this. Kafka default: 7 days.
33 pub segment_ms: Duration,
34
35 /// Delete sealed segments older than this. `None` = unlimited. Kafka default: 7 days.
36 pub retention_ms: Option<Duration>,
37
38 /// Delete oldest sealed segments until the total `.log` size fits. `None` = unlimited.
39 pub retention_bytes: Option<u64>,
40
41 /// Write one `.index`/`.timeindex` entry per N bytes of `.log`. Kafka default: 4 KiB.
42 pub index_interval_bytes: u32,
43
44 /// fsync after every `append`. Default off; broker manages fsync separately.
45 pub flush_on_append: bool,
46
47 /// On open, CRC every batch in the active segment from the last index entry to EOF.
48 pub validate_on_open: bool,
49
50 /// Cleanup policy. Defaults to `Delete`. See [`CleanupPolicy`].
51 pub cleanup_policy: CleanupPolicy,
52
53 /// Broker-side recompression target. `None` is Kafka's
54 /// `compression.type=producer` (pass-through — store the batch
55 /// exactly as the producer sent it). `Some(c)` forces every batch
56 /// the broker accepts on this partition to be re-encoded to `c`
57 /// before write. Matches Kafka's per-topic `compression.type`
58 /// config: `gzip` / `snappy` / `lz4` / `zstd` / `uncompressed` map
59 /// to `Some(_)`; `producer` (the default) maps to `None`.
60 pub compression_type: Option<CompressionType>,
61
62 /// When `true`, this partition's sealed segments (KIP-405)
63 /// are eligible to be copied to the remote tier by the broker's
64 /// `RemoteLogManager`. Maps to Kafka's per-topic `remote.storage.enable`.
65 /// Default `false` (Kafka's default — tiered storage is opt-in per topic).
66 pub remote_storage_enable: bool,
67
68 /// Local-disk time-retention window for tiered
69 /// partitions (KIP-405). `None` inherits `retention_ms`. Default `None`.
70 pub local_retention_ms: Option<Duration>,
71
72 /// Local-disk size budget for tiered partitions (KIP-405).
73 /// `None` inherits `retention_bytes`. Default `None`.
74 pub local_retention_bytes: Option<u64>,
75}
76
77impl Default for LogConfig {
78 fn default() -> Self {
79 Self {
80 segment_bytes: 1024 * 1024 * 1024,
81 segment_ms: Duration::from_hours(7 * 24),
82 retention_ms: Some(Duration::from_hours(7 * 24)),
83 retention_bytes: None,
84 index_interval_bytes: 4096,
85 flush_on_append: false,
86 validate_on_open: true,
87 cleanup_policy: CleanupPolicy::Delete,
88 // Pass-through: producers' compression choice wins. Kafka's
89 // default. Operators flip this to a specific codec on
90 // topics where they want broker-side enforcement.
91 compression_type: None,
92 // Tiered storage is opt-in per topic (Kafka default false).
93 remote_storage_enable: false,
94 local_retention_ms: None,
95 local_retention_bytes: None,
96 }
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103 use assert2::assert;
104
105 #[test]
106 fn defaults_match_kafka_4x() {
107 let c = LogConfig::default();
108 assert!(c.segment_bytes == 1 << 30);
109 assert!(c.index_interval_bytes == 4096);
110 assert!(!c.flush_on_append);
111 assert!(c.validate_on_open);
112 }
113
114 #[test]
115 fn default_cleanup_policy_is_delete() {
116 let c = LogConfig::default();
117 assert!(c.cleanup_policy == CleanupPolicy::Delete);
118 }
119
120 #[test]
121 fn default_compression_is_producer_passthrough() {
122 let c = LogConfig::default();
123 assert!(c.compression_type == None);
124 }
125
126 #[test]
127 fn default_local_retention_is_none() {
128 let c = LogConfig::default();
129 assert!(c.local_retention_ms == None);
130 assert!(c.local_retention_bytes == None);
131 }
132}