1use serde::{Deserialize, Serialize};
4use std::net::SocketAddr;
5use std::path::PathBuf;
6use std::time::Duration;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
10#[serde(rename_all = "lowercase")]
11pub enum ClusterMode {
12 #[default]
14 Standalone,
15 Cluster,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct ClusterConfig {
22 pub mode: ClusterMode,
24
25 pub node_id: String,
27
28 pub rack: Option<String>,
30
31 pub data_dir: PathBuf,
33
34 pub client_addr: SocketAddr,
36
37 pub cluster_addr: SocketAddr,
39
40 pub advertise_addr: Option<SocketAddr>,
42
43 pub seeds: Vec<String>,
45
46 pub swim: SwimConfig,
48
49 pub raft: RaftConfig,
51
52 pub replication: ReplicationConfig,
54
55 pub topic_defaults: TopicDefaults,
57}
58
59impl Default for ClusterConfig {
60 fn default() -> Self {
61 Self::standalone()
62 }
63}
64
65impl ClusterConfig {
66 pub fn standalone() -> Self {
68 Self {
69 mode: ClusterMode::Standalone,
70 node_id: "standalone".to_string(),
71 rack: None,
72 data_dir: PathBuf::from("./data"),
73 client_addr: "0.0.0.0:9092".parse().unwrap(),
74 cluster_addr: "0.0.0.0:9093".parse().unwrap(),
75 advertise_addr: None,
76 seeds: vec![],
77 swim: SwimConfig::default(),
78 raft: RaftConfig::default(),
79 replication: ReplicationConfig::standalone(),
80 topic_defaults: TopicDefaults::standalone(),
81 }
82 }
83
84 pub fn cluster() -> ClusterConfigBuilder {
86 ClusterConfigBuilder::new()
87 }
88
89 pub fn is_cluster(&self) -> bool {
91 matches!(self.mode, ClusterMode::Cluster)
92 }
93
94 pub fn advertised_cluster_addr(&self) -> SocketAddr {
96 self.advertise_addr.unwrap_or(self.cluster_addr)
97 }
98}
99
100#[derive(Debug, Default)]
102pub struct ClusterConfigBuilder {
103 node_id: Option<String>,
104 rack: Option<String>,
105 data_dir: Option<PathBuf>,
106 client_addr: Option<SocketAddr>,
107 cluster_addr: Option<SocketAddr>,
108 advertise_addr: Option<SocketAddr>,
109 seeds: Vec<String>,
110 swim: Option<SwimConfig>,
111 raft: Option<RaftConfig>,
112 replication: Option<ReplicationConfig>,
113 topic_defaults: Option<TopicDefaults>,
114}
115
116impl ClusterConfigBuilder {
117 pub fn new() -> Self {
118 Self::default()
119 }
120
121 pub fn node_id(mut self, id: impl Into<String>) -> Self {
122 self.node_id = Some(id.into());
123 self
124 }
125
126 pub fn rack(mut self, rack: impl Into<String>) -> Self {
127 self.rack = Some(rack.into());
128 self
129 }
130
131 pub fn data_dir(mut self, dir: impl Into<PathBuf>) -> Self {
132 self.data_dir = Some(dir.into());
133 self
134 }
135
136 pub fn client_addr(mut self, addr: SocketAddr) -> Self {
137 self.client_addr = Some(addr);
138 self
139 }
140
141 pub fn cluster_addr(mut self, addr: SocketAddr) -> Self {
142 self.cluster_addr = Some(addr);
143 self
144 }
145
146 pub fn advertise_addr(mut self, addr: SocketAddr) -> Self {
147 self.advertise_addr = Some(addr);
148 self
149 }
150
151 pub fn seeds(mut self, seeds: Vec<impl Into<String>>) -> Self {
152 self.seeds = seeds.into_iter().map(|s| s.into()).collect();
153 self
154 }
155
156 pub fn swim(mut self, config: SwimConfig) -> Self {
157 self.swim = Some(config);
158 self
159 }
160
161 pub fn raft(mut self, config: RaftConfig) -> Self {
162 self.raft = Some(config);
163 self
164 }
165
166 pub fn replication(mut self, config: ReplicationConfig) -> Self {
167 self.replication = Some(config);
168 self
169 }
170
171 pub fn build(self) -> ClusterConfig {
172 ClusterConfig {
173 mode: ClusterMode::Cluster,
174 node_id: self.node_id.unwrap_or_else(|| {
175 let id = uuid::Uuid::new_v4().to_string();
176 tracing::warn!(
177 node_id = %id,
178 "node_id not configured — generated random UUID. \
179 Restarting without explicit node_id creates a new identity, \
180 losing all state associated with the previous node."
181 );
182 id
183 }),
184 rack: self.rack,
185 data_dir: self.data_dir.unwrap_or_else(|| PathBuf::from("./data")),
186 client_addr: self
187 .client_addr
188 .unwrap_or(std::net::SocketAddr::from(([0, 0, 0, 0], 9092))),
189 cluster_addr: self
190 .cluster_addr
191 .unwrap_or(std::net::SocketAddr::from(([0, 0, 0, 0], 9093))),
192 advertise_addr: self.advertise_addr,
193 seeds: self.seeds,
194 swim: self.swim.unwrap_or_default(),
195 raft: self.raft.unwrap_or_default(),
196 replication: self.replication.unwrap_or_default(),
197 topic_defaults: self.topic_defaults.unwrap_or_default(),
198 }
199 }
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct SwimConfig {
205 pub ping_interval: Duration,
207
208 pub ping_timeout: Duration,
210
211 pub indirect_probes: usize,
213
214 pub probes_per_round: usize,
220
221 pub suspicion_multiplier: u32,
223
224 pub max_gossip_updates: usize,
226
227 pub sync_interval: Duration,
229
230 #[serde(default, skip_serializing_if = "Option::is_none")]
235 pub auth_token: Option<String>,
236}
237
238impl Default for SwimConfig {
239 fn default() -> Self {
240 Self {
241 ping_interval: Duration::from_secs(1),
242 ping_timeout: Duration::from_millis(500),
243 indirect_probes: 3,
244 probes_per_round: 3,
245 suspicion_multiplier: 4,
246 max_gossip_updates: 10,
247 sync_interval: Duration::from_secs(30),
248 auth_token: None,
249 }
250 }
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct RaftConfig {
256 pub election_timeout_min: Duration,
258
259 pub election_timeout_max: Duration,
261
262 pub heartbeat_interval: Duration,
264
265 pub snapshot_threshold: u64,
267
268 pub max_entries_per_append: u64,
270
271 pub replication_batch_size: u64,
273
274 #[serde(default, skip_serializing_if = "Option::is_none")]
283 pub cluster_secret: Option<String>,
284}
285
286impl Default for RaftConfig {
287 fn default() -> Self {
288 Self {
289 election_timeout_min: Duration::from_millis(150),
290 election_timeout_max: Duration::from_millis(300),
291 heartbeat_interval: Duration::from_millis(50),
292 snapshot_threshold: 10000,
293 max_entries_per_append: 100,
294 replication_batch_size: 1000,
295 cluster_secret: None,
296 }
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct ReplicationConfig {
303 pub default_replication_factor: u16,
305
306 pub min_isr: u16,
308
309 pub replica_lag_max_messages: u64,
311
312 pub replica_lag_max_time: Duration,
314
315 pub fetch_interval: Duration,
317
318 pub fetch_max_bytes: u32,
320
321 pub unclean_leader_election: bool,
323}
324
325impl Default for ReplicationConfig {
326 fn default() -> Self {
327 Self {
328 default_replication_factor: 3,
329 min_isr: 2,
330 replica_lag_max_messages: 10000,
331 replica_lag_max_time: Duration::from_secs(30),
332 fetch_interval: Duration::from_millis(100),
333 fetch_max_bytes: 10 * 1024 * 1024, unclean_leader_election: false,
335 }
336 }
337}
338
339impl ReplicationConfig {
340 pub fn standalone() -> Self {
342 Self {
343 default_replication_factor: 1,
344 min_isr: 1,
345 ..Default::default()
346 }
347 }
348}
349
350#[derive(Debug, Clone, Serialize, Deserialize)]
352pub struct TopicDefaults {
353 pub partitions: u32,
355
356 pub replication_factor: u16,
358
359 pub retention: Duration,
361
362 pub segment_size: u64,
364}
365
366impl Default for TopicDefaults {
367 fn default() -> Self {
368 Self {
369 partitions: 6,
370 replication_factor: 3,
371 retention: Duration::from_secs(7 * 24 * 60 * 60), segment_size: 1024 * 1024 * 1024, }
374 }
375}
376
377impl TopicDefaults {
378 pub fn standalone() -> Self {
380 Self {
381 partitions: 1,
382 replication_factor: 1,
383 ..Default::default()
384 }
385 }
386}
387
388#[cfg(test)]
389mod tests {
390 use super::*;
391
392 #[test]
393 fn test_standalone_config() {
394 let config = ClusterConfig::standalone();
395 assert!(!config.is_cluster());
396 assert_eq!(config.mode, ClusterMode::Standalone);
397 assert_eq!(config.replication.default_replication_factor, 1);
398 }
399
400 #[test]
401 fn test_cluster_config_builder() {
402 let config = ClusterConfig::cluster()
403 .node_id("node-1")
404 .rack("rack-a")
405 .seeds(vec!["node-1:9093", "node-2:9093"])
406 .build();
407
408 assert!(config.is_cluster());
409 assert_eq!(config.node_id, "node-1");
410 assert_eq!(config.rack, Some("rack-a".to_string()));
411 assert_eq!(config.seeds.len(), 2);
412 }
413}