amaters_cluster/config.rs
1//! Cluster node configuration: TOML file + environment variable overrides + dynamic reload.
2//!
3//! # Usage
4//!
5//! ```rust,no_run
6//! use amaters_cluster::config::NodeConfig;
7//!
8//! let cfg = NodeConfig::from_toml(r#"
9//! bind_addr = "0.0.0.0:7001"
10//! node_id = 1
11//! "#).expect("valid config");
12//! assert_eq!(cfg.node_id, 1);
13//! ```
14//!
15//! # Hot-reloadable fields
16//!
17//! The following fields can be updated at runtime without restarting the node
18//! (accessible via [`NodeConfig::dynamic`]):
19//!
20//! - `heartbeat_interval_ms`
21//! - `compaction_threshold`
22//!
23//! Fields that require a full restart:
24//!
25//! - `bind_addr`
26//! - `node_id`
27//! - `peers`
28//! - `election_timeout_ms`
29//! - `data_dir`
//! - `metrics_addr`
//! - `key_rotation_interval_secs`
//! - `key_retention_count`
31
32use serde::{Deserialize, Serialize};
33use std::path::PathBuf;
34
35// ---------------------------------------------------------------------------
36// Default helpers (used by serde)
37// ---------------------------------------------------------------------------
38
/// Serde default for `heartbeat_interval_ms`, in milliseconds.
fn default_heartbeat_ms() -> u64 {
    const HEARTBEAT_MS: u64 = 150;
    HEARTBEAT_MS
}
42
/// Serde default for `election_timeout_ms`, in milliseconds.
fn default_election_timeout_ms() -> u64 {
    const ELECTION_TIMEOUT_MS: u64 = 300;
    ELECTION_TIMEOUT_MS
}
46
/// Serde default for `compaction_threshold`, in log entries.
fn default_compaction_threshold() -> usize {
    const COMPACTION_THRESHOLD: usize = 10_000;
    COMPACTION_THRESHOLD
}
50
/// Serde default for `metrics_addr`.
fn default_metrics_addr() -> String {
    const METRICS_ADDR: &str = "0.0.0.0:9091";
    String::from(METRICS_ADDR)
}
54
/// Serde default for `key_retention_count`, in key versions.
fn default_key_retention_count() -> usize {
    const KEY_RETENTION_COUNT: usize = 3;
    KEY_RETENTION_COUNT
}
58
59// ---------------------------------------------------------------------------
60// NodeConfig
61// ---------------------------------------------------------------------------
62
63/// Full cluster node configuration.
64///
65/// Can be loaded from a TOML file with [`NodeConfig::load`] or from a TOML
66/// string with [`NodeConfig::from_toml`]. After loading, call
67/// [`NodeConfig::apply_env_overrides`] to layer environment variable
68/// overrides on top.
69///
70/// # Field restart requirements
71///
72/// | Field | Hot-reloadable |
73/// |-------|---------------|
74/// | `heartbeat_interval_ms` | Yes (via [`DynamicConfig`]) |
75/// | `compaction_threshold` | Yes (via [`DynamicConfig`]) |
76/// | All other fields | No — requires restart |
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct NodeConfig {
79 /// Bind address for Raft RPC server (e.g. `"0.0.0.0:7001"`).
80 pub bind_addr: String,
81
82 /// Node ID — must be unique across the cluster and `> 0`.
83 pub node_id: u64,
84
85 /// Peer addresses as `"node_id=addr"` strings, e.g. `["2=10.0.0.2:7001"]`.
86 #[serde(default)]
87 pub peers: Vec<String>,
88
89 /// Raft heartbeat interval in milliseconds (default 150).
90 ///
91 /// Hot-reloadable.
92 #[serde(default = "default_heartbeat_ms")]
93 pub heartbeat_interval_ms: u64,
94
95 /// Raft election timeout in milliseconds (default 300).
96 ///
97 /// Must be `>= 2 * heartbeat_interval_ms`. Requires restart to change.
98 #[serde(default = "default_election_timeout_ms")]
99 pub election_timeout_ms: u64,
100
101 /// Log compaction threshold: number of entries before triggering a
102 /// snapshot (default 10 000).
103 ///
104 /// Hot-reloadable.
105 #[serde(default = "default_compaction_threshold")]
106 pub compaction_threshold: usize,
107
108 /// Data directory for Raft log persistence. `None` disables persistence.
109 #[serde(default)]
110 pub data_dir: Option<PathBuf>,
111
112 /// Metrics HTTP endpoint address (default `"0.0.0.0:9091"`).
113 ///
114 /// Requires restart to change.
115 #[serde(default = "default_metrics_addr")]
116 pub metrics_addr: String,
117
118 /// Optional automatic-rotation interval for the log-encryption master
119 /// key, in seconds. `None` disables time-based rotation; rotation can
120 /// still be triggered manually via [`crate::key_rotation::KeyManager::rotate`].
121 ///
122 /// The background tokio task that drives time-based rotation is
123 /// **deferred** to a future cycle; this field reserves the
124 /// configuration surface for when it lands.
125 #[serde(default)]
126 pub key_rotation_interval_secs: Option<u64>,
127
128 /// Number of [`crate::key_rotation::KeyVersion`]s the
129 /// [`crate::key_rotation::KeyManager`] retains in its history (current
130 /// and previous keys). Default 3. Lower values reclaim memory faster
131 /// at the cost of decrypting fewer historical entries after rotation.
132 #[serde(default = "default_key_retention_count")]
133 pub key_retention_count: usize,
134}
135
136impl NodeConfig {
137 /// Load from a TOML file, then layer environment variable overrides.
138 ///
139 /// Env vars override individual fields:
140 ///
141 /// | Env var | Field |
142 /// |---------|-------|
143 /// | `AMATERS_BIND_ADDR` | `bind_addr` |
144 /// | `AMATERS_NODE_ID` | `node_id` |
145 /// | `AMATERS_PEERS` | `peers` (comma-separated `id=addr` pairs) |
146 /// | `AMATERS_HEARTBEAT_INTERVAL_MS` | `heartbeat_interval_ms` |
147 /// | `AMATERS_ELECTION_TIMEOUT_MS` | `election_timeout_ms` |
148 /// | `AMATERS_COMPACTION_THRESHOLD` | `compaction_threshold` |
149 /// | `AMATERS_DATA_DIR` | `data_dir` |
150 /// | `AMATERS_METRICS_ADDR` | `metrics_addr` |
151 pub fn load(path: &std::path::Path) -> Result<Self, ConfigError> {
152 let raw = std::fs::read_to_string(path)?;
153 let mut cfg = Self::from_toml(&raw)?;
154 cfg.apply_env_overrides();
155 Ok(cfg)
156 }
157
158 /// Parse from a TOML string directly.
159 ///
160 /// Useful in tests and when the configuration is provided via a
161 /// secret store rather than a file on disk. Does **not** apply env
162 /// overrides; call [`apply_env_overrides`](Self::apply_env_overrides) if
163 /// you also want those.
164 pub fn from_toml(toml_str: &str) -> Result<Self, ConfigError> {
165 let cfg: Self = toml::from_str(toml_str)?;
166 Ok(cfg)
167 }
168
169 /// Apply environment variable overrides from the current process
170 /// environment.
171 ///
172 /// Unset variables leave the corresponding field unchanged.
173 pub fn apply_env_overrides(&mut self) {
174 if let Ok(v) = std::env::var("AMATERS_BIND_ADDR") {
175 self.bind_addr = v;
176 }
177 if let Ok(v) = std::env::var("AMATERS_NODE_ID") {
178 if let Ok(n) = v.parse::<u64>() {
179 self.node_id = n;
180 }
181 }
182 if let Ok(v) = std::env::var("AMATERS_PEERS") {
183 // Comma-separated list of "node_id=addr" pairs
184 self.peers = v
185 .split(',')
186 .map(|s| s.trim().to_string())
187 .filter(|s| !s.is_empty())
188 .collect();
189 }
190 if let Ok(v) = std::env::var("AMATERS_HEARTBEAT_INTERVAL_MS") {
191 if let Ok(n) = v.parse::<u64>() {
192 self.heartbeat_interval_ms = n;
193 }
194 }
195 if let Ok(v) = std::env::var("AMATERS_ELECTION_TIMEOUT_MS") {
196 if let Ok(n) = v.parse::<u64>() {
197 self.election_timeout_ms = n;
198 }
199 }
200 if let Ok(v) = std::env::var("AMATERS_COMPACTION_THRESHOLD") {
201 if let Ok(n) = v.parse::<usize>() {
202 self.compaction_threshold = n;
203 }
204 }
205 if let Ok(v) = std::env::var("AMATERS_DATA_DIR") {
206 self.data_dir = Some(PathBuf::from(v));
207 }
208 if let Ok(v) = std::env::var("AMATERS_METRICS_ADDR") {
209 self.metrics_addr = v;
210 }
211 if let Ok(v) = std::env::var("AMATERS_KEY_ROTATION_INTERVAL_SECS") {
212 if let Ok(n) = v.parse::<u64>() {
213 self.key_rotation_interval_secs = Some(n);
214 }
215 }
216 if let Ok(v) = std::env::var("AMATERS_KEY_RETENTION_COUNT") {
217 if let Ok(n) = v.parse::<usize>() {
218 self.key_retention_count = n;
219 }
220 }
221 }
222
223 /// Extract the hot-reloadable subset of this configuration.
224 ///
225 /// The returned [`DynamicConfig`] can be stored in an
226 /// `Arc<parking_lot::RwLock<DynamicConfig>>` and updated in place when
227 /// the configuration is reloaded (e.g. on `SIGHUP` or via an admin RPC)
228 /// without restarting the node.
229 pub fn dynamic(&self) -> DynamicConfig {
230 DynamicConfig {
231 heartbeat_interval_ms: self.heartbeat_interval_ms,
232 compaction_threshold: self.compaction_threshold,
233 }
234 }
235
236 /// Validate configuration fields.
237 ///
238 /// Returns a list of [`ConfigError::Validation`] variants describing each
239 /// detected problem. An empty return value means the configuration is
240 /// valid.
241 pub fn validate(&self) -> Vec<ConfigError> {
242 let mut errors = Vec::new();
243
244 if self.bind_addr.is_empty() {
245 errors.push(ConfigError::Validation {
246 field: "bind_addr".to_string(),
247 reason: "must not be empty".to_string(),
248 });
249 } else if !self.bind_addr.contains(':') {
250 errors.push(ConfigError::Validation {
251 field: "bind_addr".to_string(),
252 reason: "must contain a ':' separator (e.g. \"0.0.0.0:7001\")".to_string(),
253 });
254 }
255
256 if self.node_id == 0 {
257 errors.push(ConfigError::Validation {
258 field: "node_id".to_string(),
259 reason: "must be > 0 (0 is reserved as a sentinel)".to_string(),
260 });
261 }
262
263 if self.heartbeat_interval_ms == 0 {
264 errors.push(ConfigError::Validation {
265 field: "heartbeat_interval_ms".to_string(),
266 reason: "must be > 0".to_string(),
267 });
268 }
269
270 // election_timeout must be at least 2× the heartbeat interval so
271 // that followers have a realistic chance to receive a heartbeat
272 // before timing out.
273 if self.heartbeat_interval_ms > 0
274 && self.election_timeout_ms < 2 * self.heartbeat_interval_ms
275 {
276 errors.push(ConfigError::Validation {
277 field: "election_timeout_ms".to_string(),
278 reason: format!(
279 "must be >= 2 * heartbeat_interval_ms ({} >= {})",
280 self.election_timeout_ms,
281 2 * self.heartbeat_interval_ms,
282 ),
283 });
284 }
285
286 errors
287 }
288}
289
290// ---------------------------------------------------------------------------
291// DynamicConfig
292// ---------------------------------------------------------------------------
293
/// The subset of [`NodeConfig`] that can be hot-reloaded at runtime.
///
/// Store this in an `Arc<parking_lot::RwLock<DynamicConfig>>` inside the
/// cluster node. When the node receives a `SIGHUP` or an admin RPC requesting
/// a config reload, parse a new [`NodeConfig`] and replace the inner value:
///
/// ```rust,ignore
/// *dynamic_config.write() = new_node_config.dynamic();
/// ```
///
/// The Raft event loop reads from `Arc<RwLock<DynamicConfig>>` for the
/// heartbeat interval, so changes take effect on the **next tick** without
/// restarting the node.
///
/// The type implements `PartialEq`/`Eq`, so a reload handler can compare the
/// new value with the current one and skip the write when nothing changed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DynamicConfig {
    /// Raft heartbeat interval in milliseconds.
    pub heartbeat_interval_ms: u64,
    /// Log compaction threshold (entries before snapshot).
    pub compaction_threshold: usize,
}
314
315// ---------------------------------------------------------------------------
316// ConfigError
317// ---------------------------------------------------------------------------
318
319/// Errors that can occur while loading or validating a [`NodeConfig`].
320#[derive(Debug, thiserror::Error)]
321pub enum ConfigError {
322 /// The configuration file could not be read.
323 #[error("IO error: {0}")]
324 Io(#[from] std::io::Error),
325
326 /// The TOML source could not be parsed.
327 #[error("TOML parse error: {0}")]
328 TomlParse(#[from] toml::de::Error),
329
330 /// A field failed semantic validation.
331 #[error("Validation error: field '{field}' — {reason}")]
332 Validation { field: String, reason: String },
333}
334
335// ---------------------------------------------------------------------------
336// Tests
337// ---------------------------------------------------------------------------
338
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard};

    const MINIMAL_TOML: &str = r#"
bind_addr = "0.0.0.0:7001"
node_id = 1
"#;

    /// Serialises every test in this module that reads or writes the process
    /// environment. Mutating the environment while another thread reads it
    /// is a data race regardless of which variable is touched, so a unique
    /// name prefix alone is not sufficient isolation.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquire [`ENV_LOCK`], recovering from poisoning so that one failed
    /// test does not cascade into every other environment-touching test.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// RAII guard that sets a group of environment variables while holding
    /// [`ENV_LOCK`] and restores their previous state on drop — including
    /// when the owning test panics on a failed assertion.
    struct ScopedEnv {
        /// Variable name paired with the value it had before the override.
        saved: Vec<(&'static str, Option<OsString>)>,
        /// Held for the guard's lifetime; released after `Drop::drop` runs.
        _lock: MutexGuard<'static, ()>,
    }

    impl ScopedEnv {
        /// Take the environment lock and apply every `(name, value)` pair.
        fn set(vars: &[(&'static str, &str)]) -> Self {
            let lock = lock_env();
            let saved = vars
                .iter()
                .map(|&(name, value)| {
                    let previous = std::env::var_os(name);
                    // SAFETY: `ENV_LOCK` is held, so no other test in this
                    // module accesses the environment concurrently.
                    // NOTE(review): tests in other modules of the same
                    // binary that touch the environment are not covered by
                    // this lock — confirm none exist or share the lock.
                    unsafe { std::env::set_var(name, value) };
                    (name, previous)
                })
                .collect();
            Self { saved, _lock: lock }
        }
    }

    impl Drop for ScopedEnv {
        fn drop(&mut self) {
            for (name, previous) in self.saved.drain(..) {
                // SAFETY: `_lock` is still held here (fields are dropped
                // only after this function returns); same reasoning as in
                // `ScopedEnv::set`.
                unsafe {
                    match previous {
                        Some(value) => std::env::set_var(name, value),
                        None => std::env::remove_var(name),
                    }
                }
            }
        }
    }

    /// Parsing a minimal TOML string fills required fields and applies
    /// serde defaults for optional ones.
    #[test]
    fn test_config_from_toml() {
        let cfg = NodeConfig::from_toml(MINIMAL_TOML).expect("valid TOML");

        assert_eq!(cfg.bind_addr, "0.0.0.0:7001");
        assert_eq!(cfg.node_id, 1);
        assert!(cfg.peers.is_empty());
        assert_eq!(cfg.heartbeat_interval_ms, 150);
        assert_eq!(cfg.election_timeout_ms, 300);
        assert_eq!(cfg.compaction_threshold, 10_000);
        assert!(cfg.data_dir.is_none());
        assert_eq!(cfg.metrics_addr, "0.0.0.0:9091");
        assert!(
            cfg.key_rotation_interval_secs.is_none(),
            "rotation interval defaults to None (manual rotation only)"
        );
        assert_eq!(cfg.key_retention_count, 3);
    }

    /// Parsing a TOML with explicit key-rotation fields applies them.
    #[test]
    fn test_config_encryption_fields_from_toml() {
        let toml = r#"
bind_addr = "0.0.0.0:7001"
node_id = 1
key_rotation_interval_secs = 86400
key_retention_count = 5
"#;
        let cfg = NodeConfig::from_toml(toml).expect("valid TOML");
        assert_eq!(cfg.key_rotation_interval_secs, Some(86_400));
        assert_eq!(cfg.key_retention_count, 5);
    }

    /// Parsing a fully-specified TOML overrides all default values.
    #[test]
    fn test_config_from_toml_full() {
        let toml = r#"
bind_addr = "127.0.0.1:8001"
node_id = 5
peers = ["2=10.0.0.2:7001", "3=10.0.0.3:7001"]
heartbeat_interval_ms = 50
election_timeout_ms = 200
compaction_threshold = 5000
data_dir = "/var/data/raft"
metrics_addr = "0.0.0.0:9999"
"#;
        let cfg = NodeConfig::from_toml(toml).expect("valid TOML");

        assert_eq!(cfg.bind_addr, "127.0.0.1:8001");
        assert_eq!(cfg.node_id, 5);
        assert_eq!(cfg.peers, vec!["2=10.0.0.2:7001", "3=10.0.0.3:7001"]);
        assert_eq!(cfg.heartbeat_interval_ms, 50);
        assert_eq!(cfg.election_timeout_ms, 200);
        assert_eq!(cfg.compaction_threshold, 5000);
        assert_eq!(cfg.data_dir, Some(PathBuf::from("/var/data/raft")));
        assert_eq!(cfg.metrics_addr, "0.0.0.0:9999");
    }

    /// Environment variables override the TOML-loaded values when
    /// `apply_env_overrides` is called.
    #[test]
    fn test_config_env_override() {
        let mut cfg = NodeConfig::from_toml(MINIMAL_TOML).expect("valid TOML");

        // The guard holds the environment lock for the rest of the test and
        // restores the variables when it goes out of scope, even if one of
        // the assertions below panics.
        let _env = ScopedEnv::set(&[
            ("AMATERS_BIND_ADDR", "10.0.0.1:9000"),
            ("AMATERS_NODE_ID", "42"),
            ("AMATERS_PEERS", "2=10.0.0.2:7001,3=10.0.0.3:7001"),
            ("AMATERS_HEARTBEAT_INTERVAL_MS", "75"),
            ("AMATERS_ELECTION_TIMEOUT_MS", "400"),
            ("AMATERS_COMPACTION_THRESHOLD", "2000"),
            ("AMATERS_DATA_DIR", "/var/data/raft-env"),
            ("AMATERS_METRICS_ADDR", "127.0.0.1:8080"),
            ("AMATERS_KEY_ROTATION_INTERVAL_SECS", "3600"),
            ("AMATERS_KEY_RETENTION_COUNT", "7"),
        ]);

        cfg.apply_env_overrides();

        assert_eq!(cfg.bind_addr, "10.0.0.1:9000");
        assert_eq!(cfg.node_id, 42);
        assert_eq!(cfg.peers, vec!["2=10.0.0.2:7001", "3=10.0.0.3:7001"]);
        assert_eq!(cfg.heartbeat_interval_ms, 75);
        assert_eq!(cfg.election_timeout_ms, 400);
        assert_eq!(cfg.compaction_threshold, 2000);
        assert_eq!(cfg.data_dir, Some(PathBuf::from("/var/data/raft-env")));
        assert_eq!(cfg.metrics_addr, "127.0.0.1:8080");
        assert_eq!(cfg.key_rotation_interval_secs, Some(3600));
        assert_eq!(cfg.key_retention_count, 7);
    }

    /// A zero `node_id` must produce a validation error.
    #[test]
    fn test_config_validation_missing_field() {
        let toml = r#"
bind_addr = "0.0.0.0:7001"
node_id = 0
"#;
        let cfg = NodeConfig::from_toml(toml).expect("parse should succeed");
        let errors = cfg.validate();

        assert!(
            !errors.is_empty(),
            "expected validation errors for node_id = 0"
        );
        let has_node_id_error = errors
            .iter()
            .any(|e| matches!(e, ConfigError::Validation { field, .. } if field == "node_id"));
        assert!(
            has_node_id_error,
            "expected a Validation error for 'node_id'"
        );
    }

    /// When `election_timeout_ms < 2 * heartbeat_interval_ms`, validation must
    /// report an out-of-range error on `election_timeout_ms`.
    #[test]
    fn test_config_validation_out_of_range() {
        let toml = r#"
bind_addr = "0.0.0.0:7001"
node_id = 1
heartbeat_interval_ms = 200
election_timeout_ms = 300
"#;
        // 300 < 2 * 200 = 400 → must fail validation
        let cfg = NodeConfig::from_toml(toml).expect("parse should succeed");
        let errors = cfg.validate();

        assert!(
            !errors.is_empty(),
            "expected validation error: election_timeout_ms 300 < 2*200 = 400"
        );
        let has_timeout_error = errors.iter().any(|e| {
            matches!(e, ConfigError::Validation { field, .. } if field == "election_timeout_ms")
        });
        assert!(
            has_timeout_error,
            "expected a Validation error for 'election_timeout_ms'"
        );
    }

    /// A fully valid config must produce zero validation errors.
    #[test]
    fn test_config_validation_passes_for_valid_config() {
        let cfg = NodeConfig::from_toml(MINIMAL_TOML).expect("valid TOML");
        let errors = cfg.validate();
        assert!(
            errors.is_empty(),
            "expected no validation errors, got: {:?}",
            errors
        );
    }

    /// `dynamic()` returns the hot-reloadable subset with matching values.
    #[test]
    fn test_config_dynamic_extraction() {
        let toml = r#"
bind_addr = "0.0.0.0:7001"
node_id = 1
heartbeat_interval_ms = 100
compaction_threshold = 5000
"#;
        let cfg = NodeConfig::from_toml(toml).expect("valid TOML");
        let dyn_cfg = cfg.dynamic();

        assert_eq!(dyn_cfg.heartbeat_interval_ms, 100);
        assert_eq!(dyn_cfg.compaction_threshold, 5000);
    }

    /// Verifies that `NodeConfig::load` reads from an actual temp file.
    ///
    /// Field values are asserted on the raw file-parse path
    /// (`from_toml(file_contents)`), because `load` also applies whatever
    /// `AMATERS_*` variables the surrounding environment happens to define.
    /// `load` itself is only checked for returning `Ok`.
    ///
    /// The environment lock is held throughout: both `temp_dir()` and
    /// `load()` read the process environment, which must not overlap with
    /// the writes performed by `test_config_env_override`.
    #[test]
    fn test_config_load_from_file() {
        let _env = lock_env();

        // Include the process id so that concurrently running test
        // processes do not clobber each other's file.
        let path = std::env::temp_dir().join(format!(
            "amaters_cluster_test_config_load_{}.toml",
            std::process::id()
        ));
        std::fs::write(&path, MINIMAL_TOML).expect("write temp config");

        // Verify raw TOML parse from the file contents (no env override path)
        let raw = std::fs::read_to_string(&path).expect("read temp config");
        let cfg = NodeConfig::from_toml(&raw).expect("parse TOML from file");
        assert_eq!(cfg.bind_addr, "0.0.0.0:7001");
        assert_eq!(cfg.node_id, 1);

        // Verify that NodeConfig::load itself succeeds (env overrides are fine
        // here — we just check it doesn't error out).
        let loaded = NodeConfig::load(&path);

        // Clean up before asserting so a failure does not leak the file.
        let _ = std::fs::remove_file(&path);

        loaded.expect("load() must succeed for a valid file");
    }
}