Skip to main content

rivven_cluster/
config.rs

1//! Cluster configuration
2
3use serde::{Deserialize, Serialize};
4use std::net::SocketAddr;
5use std::path::PathBuf;
6use std::time::Duration;
7
8/// Cluster operating mode
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
10#[serde(rename_all = "lowercase")]
11pub enum ClusterMode {
12    /// Single node, no replication (default for simplicity)
13    #[default]
14    Standalone,
15    /// Multi-node cluster with replication
16    Cluster,
17}
18
19/// Cluster configuration
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct ClusterConfig {
22    /// Operating mode
23    pub mode: ClusterMode,
24
25    /// Node identifier (unique across cluster)
26    pub node_id: String,
27
28    /// Rack identifier for rack-aware placement
29    pub rack: Option<String>,
30
31    /// Data directory for Raft logs and state
32    pub data_dir: PathBuf,
33
34    /// Client-facing address
35    pub client_addr: SocketAddr,
36
37    /// Cluster communication address
38    pub cluster_addr: SocketAddr,
39
40    /// Advertised cluster address (for NAT/container environments)
41    pub advertise_addr: Option<SocketAddr>,
42
43    /// Seed nodes for initial cluster discovery
44    pub seeds: Vec<String>,
45
46    /// SWIM membership configuration
47    pub swim: SwimConfig,
48
49    /// Raft consensus configuration
50    pub raft: RaftConfig,
51
52    /// Replication configuration
53    pub replication: ReplicationConfig,
54
55    /// Topic defaults
56    pub topic_defaults: TopicDefaults,
57}
58
59impl Default for ClusterConfig {
60    fn default() -> Self {
61        Self::standalone()
62    }
63}
64
65impl ClusterConfig {
66    /// Create standalone configuration (single node, no cluster)
67    pub fn standalone() -> Self {
68        Self {
69            mode: ClusterMode::Standalone,
70            node_id: "standalone".to_string(),
71            rack: None,
72            data_dir: PathBuf::from("./data"),
73            client_addr: "0.0.0.0:9092".parse().unwrap(),
74            cluster_addr: "0.0.0.0:9093".parse().unwrap(),
75            advertise_addr: None,
76            seeds: vec![],
77            swim: SwimConfig::default(),
78            raft: RaftConfig::default(),
79            replication: ReplicationConfig::standalone(),
80            topic_defaults: TopicDefaults::standalone(),
81        }
82    }
83
84    /// Create cluster configuration builder
85    pub fn cluster() -> ClusterConfigBuilder {
86        ClusterConfigBuilder::new()
87    }
88
89    /// Check if running in cluster mode
90    pub fn is_cluster(&self) -> bool {
91        matches!(self.mode, ClusterMode::Cluster)
92    }
93
94    /// Get the advertised address (for other nodes to connect)
95    pub fn advertised_cluster_addr(&self) -> SocketAddr {
96        self.advertise_addr.unwrap_or(self.cluster_addr)
97    }
98}
99
100/// Builder for cluster configuration
101#[derive(Debug, Default)]
102pub struct ClusterConfigBuilder {
103    node_id: Option<String>,
104    rack: Option<String>,
105    data_dir: Option<PathBuf>,
106    client_addr: Option<SocketAddr>,
107    cluster_addr: Option<SocketAddr>,
108    advertise_addr: Option<SocketAddr>,
109    seeds: Vec<String>,
110    swim: Option<SwimConfig>,
111    raft: Option<RaftConfig>,
112    replication: Option<ReplicationConfig>,
113    topic_defaults: Option<TopicDefaults>,
114}
115
116impl ClusterConfigBuilder {
117    pub fn new() -> Self {
118        Self::default()
119    }
120
121    pub fn node_id(mut self, id: impl Into<String>) -> Self {
122        self.node_id = Some(id.into());
123        self
124    }
125
126    pub fn rack(mut self, rack: impl Into<String>) -> Self {
127        self.rack = Some(rack.into());
128        self
129    }
130
131    pub fn data_dir(mut self, dir: impl Into<PathBuf>) -> Self {
132        self.data_dir = Some(dir.into());
133        self
134    }
135
136    pub fn client_addr(mut self, addr: SocketAddr) -> Self {
137        self.client_addr = Some(addr);
138        self
139    }
140
141    pub fn cluster_addr(mut self, addr: SocketAddr) -> Self {
142        self.cluster_addr = Some(addr);
143        self
144    }
145
146    pub fn advertise_addr(mut self, addr: SocketAddr) -> Self {
147        self.advertise_addr = Some(addr);
148        self
149    }
150
151    pub fn seeds(mut self, seeds: Vec<impl Into<String>>) -> Self {
152        self.seeds = seeds.into_iter().map(|s| s.into()).collect();
153        self
154    }
155
156    pub fn swim(mut self, config: SwimConfig) -> Self {
157        self.swim = Some(config);
158        self
159    }
160
161    pub fn raft(mut self, config: RaftConfig) -> Self {
162        self.raft = Some(config);
163        self
164    }
165
166    pub fn replication(mut self, config: ReplicationConfig) -> Self {
167        self.replication = Some(config);
168        self
169    }
170
171    pub fn build(self) -> ClusterConfig {
172        ClusterConfig {
173            mode: ClusterMode::Cluster,
174            node_id: self.node_id.unwrap_or_else(|| {
175                let id = uuid::Uuid::new_v4().to_string();
176                tracing::warn!(
177                    node_id = %id,
178                    "node_id not configured — generated random UUID. \
179                     Restarting without explicit node_id creates a new identity, \
180                     losing all state associated with the previous node."
181                );
182                id
183            }),
184            rack: self.rack,
185            data_dir: self.data_dir.unwrap_or_else(|| PathBuf::from("./data")),
186            client_addr: self
187                .client_addr
188                .unwrap_or(std::net::SocketAddr::from(([0, 0, 0, 0], 9092))),
189            cluster_addr: self
190                .cluster_addr
191                .unwrap_or(std::net::SocketAddr::from(([0, 0, 0, 0], 9093))),
192            advertise_addr: self.advertise_addr,
193            seeds: self.seeds,
194            swim: self.swim.unwrap_or_default(),
195            raft: self.raft.unwrap_or_default(),
196            replication: self.replication.unwrap_or_default(),
197            topic_defaults: self.topic_defaults.unwrap_or_default(),
198        }
199    }
200}
201
202/// SWIM protocol configuration
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct SwimConfig {
205    /// Interval between probe rounds
206    pub ping_interval: Duration,
207
208    /// Timeout for direct ping
209    pub ping_timeout: Duration,
210
211    /// Number of indirect probes on ping failure
212    pub indirect_probes: usize,
213
214    /// Number of targets probed concurrently per round.
215    ///
216    /// The original SWIM paper probes one random target per interval, giving
217    /// O(N × interval) worst-case detection. Probing K targets concurrently
218    /// reduces this to O(N/K × interval) ≈ O(log N) when K = ⌈log₂(N)⌉.
219    pub probes_per_round: usize,
220
221    /// Multiplier for suspicion timeout (suspicion_mult * ping_interval)
222    pub suspicion_multiplier: u32,
223
224    /// Maximum number of updates to piggyback on messages
225    pub max_gossip_updates: usize,
226
227    /// Interval for full state sync
228    pub sync_interval: Duration,
229
230    /// Shared secret for HMAC-SHA256 authentication of SWIM messages.
231    /// When set, all outgoing messages include an HMAC tag and all incoming
232    /// messages are verified before processing. Prevents unauthorized nodes
233    /// from joining the cluster or injecting gossip messages.
234    #[serde(default, skip_serializing_if = "Option::is_none")]
235    pub auth_token: Option<String>,
236}
237
238impl Default for SwimConfig {
239    fn default() -> Self {
240        Self {
241            ping_interval: Duration::from_secs(1),
242            ping_timeout: Duration::from_millis(500),
243            indirect_probes: 3,
244            probes_per_round: 3,
245            suspicion_multiplier: 4,
246            max_gossip_updates: 10,
247            sync_interval: Duration::from_secs(30),
248            auth_token: None,
249        }
250    }
251}
252
253/// Raft consensus configuration
254#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct RaftConfig {
256    /// Minimum election timeout
257    pub election_timeout_min: Duration,
258
259    /// Maximum election timeout
260    pub election_timeout_max: Duration,
261
262    /// Heartbeat interval
263    pub heartbeat_interval: Duration,
264
265    /// Snapshot threshold (entries before snapshot)
266    pub snapshot_threshold: u64,
267
268    /// Maximum entries per append
269    pub max_entries_per_append: u64,
270
271    /// Replication batch size
272    pub replication_batch_size: u64,
273
274    /// Shared secret for Raft RPC authentication.
275    ///
276    /// When set, all outgoing Raft RPCs include an `X-Rivven-Cluster-Secret`
277    /// header, and all incoming RPCs are rejected unless their header matches.
278    /// This prevents unauthorized nodes from joining the Raft group or casting
279    /// votes. Use a strong random string (>= 32 bytes recommended).
280    ///
281    /// Prevents unauthenticated Raft RPC access.
282    #[serde(default, skip_serializing_if = "Option::is_none")]
283    pub cluster_secret: Option<String>,
284}
285
286impl Default for RaftConfig {
287    fn default() -> Self {
288        Self {
289            election_timeout_min: Duration::from_millis(150),
290            election_timeout_max: Duration::from_millis(300),
291            heartbeat_interval: Duration::from_millis(50),
292            snapshot_threshold: 10000,
293            max_entries_per_append: 100,
294            replication_batch_size: 1000,
295            cluster_secret: None,
296        }
297    }
298}
299
300/// Replication configuration
301#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct ReplicationConfig {
303    /// Default replication factor for new topics
304    pub default_replication_factor: u16,
305
306    /// Minimum in-sync replicas required for writes
307    pub min_isr: u16,
308
309    /// Maximum lag (in messages) before removing from ISR
310    pub replica_lag_max_messages: u64,
311
312    /// Maximum lag (in time) before removing from ISR
313    pub replica_lag_max_time: Duration,
314
315    /// Interval for follower fetch requests
316    pub fetch_interval: Duration,
317
318    /// Maximum bytes per fetch request
319    pub fetch_max_bytes: u32,
320
321    /// Allow unclean leader election (may lose data)
322    pub unclean_leader_election: bool,
323}
324
325impl Default for ReplicationConfig {
326    fn default() -> Self {
327        Self {
328            default_replication_factor: 3,
329            min_isr: 2,
330            replica_lag_max_messages: 10000,
331            replica_lag_max_time: Duration::from_secs(30),
332            fetch_interval: Duration::from_millis(100),
333            fetch_max_bytes: 10 * 1024 * 1024, // 10 MB
334            unclean_leader_election: false,
335        }
336    }
337}
338
339impl ReplicationConfig {
340    /// Standalone configuration (no replication)
341    pub fn standalone() -> Self {
342        Self {
343            default_replication_factor: 1,
344            min_isr: 1,
345            ..Default::default()
346        }
347    }
348}
349
350/// Default topic configuration
351#[derive(Debug, Clone, Serialize, Deserialize)]
352pub struct TopicDefaults {
353    /// Default number of partitions
354    pub partitions: u32,
355
356    /// Default replication factor
357    pub replication_factor: u16,
358
359    /// Default retention period
360    pub retention: Duration,
361
362    /// Default segment size
363    pub segment_size: u64,
364}
365
366impl Default for TopicDefaults {
367    fn default() -> Self {
368        Self {
369            partitions: 6,
370            replication_factor: 3,
371            retention: Duration::from_secs(7 * 24 * 60 * 60), // 7 days
372            segment_size: 1024 * 1024 * 1024,                 // 1 GB
373        }
374    }
375}
376
377impl TopicDefaults {
378    /// Standalone defaults (single partition, no replication)
379    pub fn standalone() -> Self {
380        Self {
381            partitions: 1,
382            replication_factor: 1,
383            ..Default::default()
384        }
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// The standalone preset reports standalone mode and a replication
    /// factor of one.
    #[test]
    fn test_standalone_config() {
        let standalone = ClusterConfig::standalone();

        assert!(!standalone.is_cluster());
        assert_eq!(standalone.mode, ClusterMode::Standalone);
        assert_eq!(standalone.replication.default_replication_factor, 1);
    }

    /// Values handed to the builder end up in the built configuration, and
    /// the result is in cluster mode.
    #[test]
    fn test_cluster_config_builder() {
        let builder = ClusterConfig::cluster()
            .node_id("node-1")
            .rack("rack-a")
            .seeds(vec!["node-1:9093", "node-2:9093"]);
        let built = builder.build();

        assert!(built.is_cluster());
        assert_eq!(built.node_id, "node-1");
        assert_eq!(built.rack, Some("rack-a".to_string()));
        assert_eq!(built.seeds.len(), 2);
    }
}