Skip to main content

amaters_cluster/
config.rs

//! Cluster node configuration: TOML file + environment variable overrides + dynamic reload.
//!
//! # Usage
//!
//! ```rust,no_run
//! use amaters_cluster::config::NodeConfig;
//!
//! let cfg = NodeConfig::from_toml(r#"
//!     bind_addr = "0.0.0.0:7001"
//!     node_id = 1
//!     "#).expect("valid config");
//! assert_eq!(cfg.node_id, 1);
//! ```
//!
//! `from_toml` neither applies environment overrides nor validates.
//! [`NodeConfig::load`] reads a file and applies the `AMATERS_*` overrides.
//! Call [`NodeConfig::validate`] on the result before using it.
//!
//! # Hot-reloadable fields
//!
//! The following fields can be updated at runtime without restarting the node
//! (accessible via [`NodeConfig::dynamic`]):
//!
//! - `heartbeat_interval_ms`
//! - `compaction_threshold`
//!
//! Fields that require a full restart:
//!
//! - `bind_addr`
//! - `node_id`
//! - `peers`
//! - `election_timeout_ms`
//! - `data_dir`
//! - `metrics_addr`
//! - `key_rotation_interval_secs`
//! - `key_retention_count`
31
32use serde::{Deserialize, Serialize};
33use std::path::PathBuf;
34
35// ---------------------------------------------------------------------------
36// Default helpers (used by serde)
37// ---------------------------------------------------------------------------
38
/// Serde default for `heartbeat_interval_ms`, in milliseconds.
fn default_heartbeat_ms() -> u64 {
    150_u64
}

/// Serde default for `election_timeout_ms`, in milliseconds.
///
/// Defined as twice the default heartbeat, so the defaults satisfy the
/// `election_timeout_ms >= 2 * heartbeat_interval_ms` rule enforced by
/// `NodeConfig::validate`.
fn default_election_timeout_ms() -> u64 {
    2 * default_heartbeat_ms()
}

/// Serde default for `compaction_threshold`: log entries before a snapshot.
fn default_compaction_threshold() -> usize {
    10_000_usize
}

/// Serde default for `metrics_addr`.
fn default_metrics_addr() -> String {
    String::from("0.0.0.0:9091")
}

/// Serde default for `key_retention_count`: key versions kept in history.
fn default_key_retention_count() -> usize {
    3_usize
}
58
59// ---------------------------------------------------------------------------
60// NodeConfig
61// ---------------------------------------------------------------------------
62
63/// Full cluster node configuration.
64///
65/// Can be loaded from a TOML file with [`NodeConfig::load`] or from a TOML
66/// string with [`NodeConfig::from_toml`].  After loading, call
67/// [`NodeConfig::apply_env_overrides`] to layer environment variable
68/// overrides on top.
69///
70/// # Field restart requirements
71///
72/// | Field | Hot-reloadable |
73/// |-------|---------------|
74/// | `heartbeat_interval_ms` | Yes (via [`DynamicConfig`]) |
75/// | `compaction_threshold` | Yes (via [`DynamicConfig`]) |
76/// | All other fields | No — requires restart |
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct NodeConfig {
79    /// Bind address for Raft RPC server (e.g. `"0.0.0.0:7001"`).
80    pub bind_addr: String,
81
82    /// Node ID — must be unique across the cluster and `> 0`.
83    pub node_id: u64,
84
85    /// Peer addresses as `"node_id=addr"` strings, e.g. `["2=10.0.0.2:7001"]`.
86    #[serde(default)]
87    pub peers: Vec<String>,
88
89    /// Raft heartbeat interval in milliseconds (default 150).
90    ///
91    /// Hot-reloadable.
92    #[serde(default = "default_heartbeat_ms")]
93    pub heartbeat_interval_ms: u64,
94
95    /// Raft election timeout in milliseconds (default 300).
96    ///
97    /// Must be `>= 2 * heartbeat_interval_ms`.  Requires restart to change.
98    #[serde(default = "default_election_timeout_ms")]
99    pub election_timeout_ms: u64,
100
101    /// Log compaction threshold: number of entries before triggering a
102    /// snapshot (default 10 000).
103    ///
104    /// Hot-reloadable.
105    #[serde(default = "default_compaction_threshold")]
106    pub compaction_threshold: usize,
107
108    /// Data directory for Raft log persistence.  `None` disables persistence.
109    #[serde(default)]
110    pub data_dir: Option<PathBuf>,
111
112    /// Metrics HTTP endpoint address (default `"0.0.0.0:9091"`).
113    ///
114    /// Requires restart to change.
115    #[serde(default = "default_metrics_addr")]
116    pub metrics_addr: String,
117
118    /// Optional automatic-rotation interval for the log-encryption master
119    /// key, in seconds.  `None` disables time-based rotation; rotation can
120    /// still be triggered manually via [`crate::key_rotation::KeyManager::rotate`].
121    ///
122    /// The background tokio task that drives time-based rotation is
123    /// **deferred** to a future cycle; this field reserves the
124    /// configuration surface for when it lands.
125    #[serde(default)]
126    pub key_rotation_interval_secs: Option<u64>,
127
128    /// Number of [`crate::key_rotation::KeyVersion`]s the
129    /// [`crate::key_rotation::KeyManager`] retains in its history (current
130    /// and previous keys). Default 3. Lower values reclaim memory faster
131    /// at the cost of decrypting fewer historical entries after rotation.
132    #[serde(default = "default_key_retention_count")]
133    pub key_retention_count: usize,
134}
135
136impl NodeConfig {
137    /// Load from a TOML file, then layer environment variable overrides.
138    ///
139    /// Env vars override individual fields:
140    ///
141    /// | Env var | Field |
142    /// |---------|-------|
143    /// | `AMATERS_BIND_ADDR` | `bind_addr` |
144    /// | `AMATERS_NODE_ID` | `node_id` |
145    /// | `AMATERS_PEERS` | `peers` (comma-separated `id=addr` pairs) |
146    /// | `AMATERS_HEARTBEAT_INTERVAL_MS` | `heartbeat_interval_ms` |
147    /// | `AMATERS_ELECTION_TIMEOUT_MS` | `election_timeout_ms` |
148    /// | `AMATERS_COMPACTION_THRESHOLD` | `compaction_threshold` |
149    /// | `AMATERS_DATA_DIR` | `data_dir` |
150    /// | `AMATERS_METRICS_ADDR` | `metrics_addr` |
151    pub fn load(path: &std::path::Path) -> Result<Self, ConfigError> {
152        let raw = std::fs::read_to_string(path)?;
153        let mut cfg = Self::from_toml(&raw)?;
154        cfg.apply_env_overrides();
155        Ok(cfg)
156    }
157
158    /// Parse from a TOML string directly.
159    ///
160    /// Useful in tests and when the configuration is provided via a
161    /// secret store rather than a file on disk.  Does **not** apply env
162    /// overrides; call [`apply_env_overrides`](Self::apply_env_overrides) if
163    /// you also want those.
164    pub fn from_toml(toml_str: &str) -> Result<Self, ConfigError> {
165        let cfg: Self = toml::from_str(toml_str)?;
166        Ok(cfg)
167    }
168
169    /// Apply environment variable overrides from the current process
170    /// environment.
171    ///
172    /// Unset variables leave the corresponding field unchanged.
173    pub fn apply_env_overrides(&mut self) {
174        if let Ok(v) = std::env::var("AMATERS_BIND_ADDR") {
175            self.bind_addr = v;
176        }
177        if let Ok(v) = std::env::var("AMATERS_NODE_ID") {
178            if let Ok(n) = v.parse::<u64>() {
179                self.node_id = n;
180            }
181        }
182        if let Ok(v) = std::env::var("AMATERS_PEERS") {
183            // Comma-separated list of "node_id=addr" pairs
184            self.peers = v
185                .split(',')
186                .map(|s| s.trim().to_string())
187                .filter(|s| !s.is_empty())
188                .collect();
189        }
190        if let Ok(v) = std::env::var("AMATERS_HEARTBEAT_INTERVAL_MS") {
191            if let Ok(n) = v.parse::<u64>() {
192                self.heartbeat_interval_ms = n;
193            }
194        }
195        if let Ok(v) = std::env::var("AMATERS_ELECTION_TIMEOUT_MS") {
196            if let Ok(n) = v.parse::<u64>() {
197                self.election_timeout_ms = n;
198            }
199        }
200        if let Ok(v) = std::env::var("AMATERS_COMPACTION_THRESHOLD") {
201            if let Ok(n) = v.parse::<usize>() {
202                self.compaction_threshold = n;
203            }
204        }
205        if let Ok(v) = std::env::var("AMATERS_DATA_DIR") {
206            self.data_dir = Some(PathBuf::from(v));
207        }
208        if let Ok(v) = std::env::var("AMATERS_METRICS_ADDR") {
209            self.metrics_addr = v;
210        }
211        if let Ok(v) = std::env::var("AMATERS_KEY_ROTATION_INTERVAL_SECS") {
212            if let Ok(n) = v.parse::<u64>() {
213                self.key_rotation_interval_secs = Some(n);
214            }
215        }
216        if let Ok(v) = std::env::var("AMATERS_KEY_RETENTION_COUNT") {
217            if let Ok(n) = v.parse::<usize>() {
218                self.key_retention_count = n;
219            }
220        }
221    }
222
223    /// Extract the hot-reloadable subset of this configuration.
224    ///
225    /// The returned [`DynamicConfig`] can be stored in an
226    /// `Arc<parking_lot::RwLock<DynamicConfig>>` and updated in place when
227    /// the configuration is reloaded (e.g. on `SIGHUP` or via an admin RPC)
228    /// without restarting the node.
229    pub fn dynamic(&self) -> DynamicConfig {
230        DynamicConfig {
231            heartbeat_interval_ms: self.heartbeat_interval_ms,
232            compaction_threshold: self.compaction_threshold,
233        }
234    }
235
236    /// Validate configuration fields.
237    ///
238    /// Returns a list of [`ConfigError::Validation`] variants describing each
239    /// detected problem.  An empty return value means the configuration is
240    /// valid.
241    pub fn validate(&self) -> Vec<ConfigError> {
242        let mut errors = Vec::new();
243
244        if self.bind_addr.is_empty() {
245            errors.push(ConfigError::Validation {
246                field: "bind_addr".to_string(),
247                reason: "must not be empty".to_string(),
248            });
249        } else if !self.bind_addr.contains(':') {
250            errors.push(ConfigError::Validation {
251                field: "bind_addr".to_string(),
252                reason: "must contain a ':' separator (e.g. \"0.0.0.0:7001\")".to_string(),
253            });
254        }
255
256        if self.node_id == 0 {
257            errors.push(ConfigError::Validation {
258                field: "node_id".to_string(),
259                reason: "must be > 0 (0 is reserved as a sentinel)".to_string(),
260            });
261        }
262
263        if self.heartbeat_interval_ms == 0 {
264            errors.push(ConfigError::Validation {
265                field: "heartbeat_interval_ms".to_string(),
266                reason: "must be > 0".to_string(),
267            });
268        }
269
270        // election_timeout must be at least 2× the heartbeat interval so
271        // that followers have a realistic chance to receive a heartbeat
272        // before timing out.
273        if self.heartbeat_interval_ms > 0
274            && self.election_timeout_ms < 2 * self.heartbeat_interval_ms
275        {
276            errors.push(ConfigError::Validation {
277                field: "election_timeout_ms".to_string(),
278                reason: format!(
279                    "must be >= 2 * heartbeat_interval_ms ({} >= {})",
280                    self.election_timeout_ms,
281                    2 * self.heartbeat_interval_ms,
282                ),
283            });
284        }
285
286        errors
287    }
288}
289
290// ---------------------------------------------------------------------------
291// DynamicConfig
292// ---------------------------------------------------------------------------
293
/// The subset of [`NodeConfig`] that can be hot-reloaded at runtime.
///
/// Store this in an `Arc<parking_lot::RwLock<DynamicConfig>>` inside the
/// cluster node.  When the node receives a `SIGHUP` or an admin RPC requesting
/// a config reload, parse a new [`NodeConfig`] and replace the inner value:
///
/// ```rust,ignore
/// *dynamic_config.write() = new_node_config.dynamic();
/// ```
///
/// Readers that consult the shared value on every tick (e.g. the Raft event
/// loop's heartbeat timer) pick up new values on their next tick without a
/// restart.
///
/// `PartialEq` is derived so a reload handler can skip the write when the
/// newly parsed values are unchanged:
///
/// ```rust,ignore
/// let fresh = new_node_config.dynamic();
/// if *dynamic_config.read() != fresh {
///     *dynamic_config.write() = fresh;
/// }
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct DynamicConfig {
    /// Raft heartbeat interval in milliseconds.
    pub heartbeat_interval_ms: u64,
    /// Log compaction threshold (entries before snapshot).
    pub compaction_threshold: usize,
}
314
315// ---------------------------------------------------------------------------
316// ConfigError
317// ---------------------------------------------------------------------------
318
319/// Errors that can occur while loading or validating a [`NodeConfig`].
320#[derive(Debug, thiserror::Error)]
321pub enum ConfigError {
322    /// The configuration file could not be read.
323    #[error("IO error: {0}")]
324    Io(#[from] std::io::Error),
325
326    /// The TOML source could not be parsed.
327    #[error("TOML parse error: {0}")]
328    TomlParse(#[from] toml::de::Error),
329
330    /// A field failed semantic validation.
331    #[error("Validation error: field '{field}' — {reason}")]
332    Validation { field: String, reason: String },
333}
334
335// ---------------------------------------------------------------------------
336// Tests
337// ---------------------------------------------------------------------------
338
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    const MINIMAL_TOML: &str = r#"
bind_addr = "0.0.0.0:7001"
node_id = 1
"#;

    /// Serialises the tests in this module that read or write process
    /// environment variables.
    ///
    /// `std::env::set_var` / `remove_var` are only sound while no other
    /// thread reads the environment, and the test harness runs tests on
    /// parallel threads.  `NodeConfig::load` reads the `AMATERS_*` variables
    /// and `std::env::temp_dir` reads `TMPDIR`, so tests calling either also
    /// take this lock.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquire [`ENV_LOCK`], recovering from poisoning so that one failed
    /// test does not make every other env-touching test fail as well.
    fn env_lock() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Sets environment variables on construction and removes them on drop,
    /// so cleanup also happens when an assertion fails mid-test.
    ///
    /// Create it only while holding the guard returned by [`env_lock`].
    /// Declare it *after* that guard, so it is dropped (and the variables
    /// removed) before the lock is released.
    struct ScopedEnv {
        names: Vec<&'static str>,
    }

    impl ScopedEnv {
        fn set(vars: &[(&'static str, &str)]) -> Self {
            for (name, value) in vars {
                // SAFETY: the caller holds `ENV_LOCK`, and every test in this
                // module that touches the environment does so under that
                // lock, so no other test here reads or writes it concurrently.
                // NOTE(review): tests in *other* modules of this binary are
                // not covered by the lock; if any of them read env vars, they
                // should share a crate-wide lock.
                unsafe { std::env::set_var(name, value) };
            }
            Self {
                names: vars.iter().map(|(name, _)| *name).collect(),
            }
        }
    }

    impl Drop for ScopedEnv {
        fn drop(&mut self) {
            for name in &self.names {
                // SAFETY: same as in `ScopedEnv::set`.  This runs while the
                // creating test still holds `ENV_LOCK`.
                unsafe { std::env::remove_var(name) };
            }
        }
    }

    /// Parsing a minimal TOML string fills required fields and applies
    /// serde defaults for optional ones.
    #[test]
    fn test_config_from_toml() {
        let cfg = NodeConfig::from_toml(MINIMAL_TOML).expect("valid TOML");

        assert_eq!(cfg.bind_addr, "0.0.0.0:7001");
        assert_eq!(cfg.node_id, 1);
        assert!(cfg.peers.is_empty());
        assert_eq!(cfg.heartbeat_interval_ms, 150);
        assert_eq!(cfg.election_timeout_ms, 300);
        assert_eq!(cfg.compaction_threshold, 10_000);
        assert!(cfg.data_dir.is_none());
        assert_eq!(cfg.metrics_addr, "0.0.0.0:9091");
        assert!(
            cfg.key_rotation_interval_secs.is_none(),
            "rotation interval defaults to None (manual rotation only)"
        );
        assert_eq!(cfg.key_retention_count, 3);
    }

    /// The top-level key-rotation fields (`key_rotation_interval_secs`,
    /// `key_retention_count`) are read from TOML when present.
    #[test]
    fn test_config_encryption_fields_from_toml() {
        let toml = r#"
bind_addr = "0.0.0.0:7001"
node_id = 1
key_rotation_interval_secs = 86400
key_retention_count = 5
"#;
        let cfg = NodeConfig::from_toml(toml).expect("valid TOML");
        assert_eq!(cfg.key_rotation_interval_secs, Some(86_400));
        assert_eq!(cfg.key_retention_count, 5);
    }

    /// Parsing a fully-specified TOML overrides all default values.
    #[test]
    fn test_config_from_toml_full() {
        let toml = r#"
bind_addr = "127.0.0.1:8001"
node_id = 5
peers = ["2=10.0.0.2:7001", "3=10.0.0.3:7001"]
heartbeat_interval_ms = 50
election_timeout_ms = 200
compaction_threshold = 5000
data_dir = "/var/data/raft"
metrics_addr = "0.0.0.0:9999"
key_rotation_interval_secs = 3600
key_retention_count = 4
"#;
        let cfg = NodeConfig::from_toml(toml).expect("valid TOML");

        assert_eq!(cfg.bind_addr, "127.0.0.1:8001");
        assert_eq!(cfg.node_id, 5);
        assert_eq!(cfg.peers, vec!["2=10.0.0.2:7001", "3=10.0.0.3:7001"]);
        assert_eq!(cfg.heartbeat_interval_ms, 50);
        assert_eq!(cfg.election_timeout_ms, 200);
        assert_eq!(cfg.compaction_threshold, 5000);
        assert_eq!(cfg.data_dir, Some(PathBuf::from("/var/data/raft")));
        assert_eq!(cfg.metrics_addr, "0.0.0.0:9999");
        assert_eq!(cfg.key_rotation_interval_secs, Some(3600));
        assert_eq!(cfg.key_retention_count, 4);
    }

    /// Environment variables override the TOML-loaded values when
    /// `apply_env_overrides` is called.
    #[test]
    fn test_config_env_override() {
        let _lock = env_lock();
        let mut cfg = NodeConfig::from_toml(MINIMAL_TOML).expect("valid TOML");

        // Declared after `_lock`, so the variables are removed before the
        // lock is released, even if an assertion below panics.
        let _env = ScopedEnv::set(&[
            ("AMATERS_BIND_ADDR", "10.0.0.1:9000"),
            ("AMATERS_NODE_ID", "42"),
            // Whitespace and a trailing comma exercise trimming and the
            // empty-entry filter.
            ("AMATERS_PEERS", "2=10.0.0.2:7001, 3=10.0.0.3:7001,"),
            ("AMATERS_HEARTBEAT_INTERVAL_MS", "75"),
            ("AMATERS_ELECTION_TIMEOUT_MS", "400"),
            ("AMATERS_COMPACTION_THRESHOLD", "2000"),
            ("AMATERS_DATA_DIR", "/tmp/amaters-raft"),
            ("AMATERS_METRICS_ADDR", "127.0.0.1:8080"),
            ("AMATERS_KEY_ROTATION_INTERVAL_SECS", "3600"),
            ("AMATERS_KEY_RETENTION_COUNT", "7"),
        ]);

        cfg.apply_env_overrides();

        assert_eq!(cfg.bind_addr, "10.0.0.1:9000");
        assert_eq!(cfg.node_id, 42);
        assert_eq!(cfg.peers, vec!["2=10.0.0.2:7001", "3=10.0.0.3:7001"]);
        assert_eq!(cfg.heartbeat_interval_ms, 75);
        assert_eq!(cfg.election_timeout_ms, 400);
        assert_eq!(cfg.compaction_threshold, 2000);
        assert_eq!(cfg.data_dir, Some(PathBuf::from("/tmp/amaters-raft")));
        assert_eq!(cfg.metrics_addr, "127.0.0.1:8080");
        assert_eq!(cfg.key_rotation_interval_secs, Some(3600));
        assert_eq!(cfg.key_retention_count, 7);
    }

    /// Numeric env vars that fail to parse are ignored; the fields keep
    /// their previous values.
    #[test]
    fn test_config_env_override_ignores_unparseable_numbers() {
        let _lock = env_lock();
        let mut cfg = NodeConfig::from_toml(MINIMAL_TOML).expect("valid TOML");

        let _env = ScopedEnv::set(&[
            ("AMATERS_NODE_ID", "not-a-number"),
            ("AMATERS_HEARTBEAT_INTERVAL_MS", "-5"),
        ]);

        cfg.apply_env_overrides();

        assert_eq!(cfg.node_id, 1);
        assert_eq!(cfg.heartbeat_interval_ms, 150);
    }

    /// A zero `node_id` must produce a validation error.
    #[test]
    fn test_config_validation_zero_node_id() {
        let toml = r#"
bind_addr = "0.0.0.0:7001"
node_id = 0
"#;
        let cfg = NodeConfig::from_toml(toml).expect("parse should succeed");
        let errors = cfg.validate();

        assert!(
            !errors.is_empty(),
            "expected validation errors for node_id = 0"
        );
        let has_node_id_error = errors
            .iter()
            .any(|e| matches!(e, ConfigError::Validation { field, .. } if field == "node_id"));
        assert!(
            has_node_id_error,
            "expected a Validation error for 'node_id'"
        );
    }

    /// When `election_timeout_ms < 2 * heartbeat_interval_ms`, validation must
    /// report an out-of-range error on `election_timeout_ms`.
    #[test]
    fn test_config_validation_out_of_range() {
        let toml = r#"
bind_addr = "0.0.0.0:7001"
node_id = 1
heartbeat_interval_ms = 200
election_timeout_ms = 300
"#;
        // 300 < 2 * 200 = 400 → must fail validation
        let cfg = NodeConfig::from_toml(toml).expect("parse should succeed");
        let errors = cfg.validate();

        assert!(
            !errors.is_empty(),
            "expected validation error: election_timeout_ms 300 < 2*200 = 400"
        );
        let has_timeout_error = errors.iter().any(|e| {
            matches!(e, ConfigError::Validation { field, .. } if field == "election_timeout_ms")
        });
        assert!(
            has_timeout_error,
            "expected a Validation error for 'election_timeout_ms'"
        );
    }

    /// A fully valid config must produce zero validation errors.
    #[test]
    fn test_config_validation_passes_for_valid_config() {
        let cfg = NodeConfig::from_toml(MINIMAL_TOML).expect("valid TOML");
        let errors = cfg.validate();
        assert!(
            errors.is_empty(),
            "expected no validation errors, got: {:?}",
            errors
        );
    }

    /// `dynamic()` returns the hot-reloadable subset with matching values.
    #[test]
    fn test_config_dynamic_extraction() {
        let toml = r#"
bind_addr = "0.0.0.0:7001"
node_id = 1
heartbeat_interval_ms = 100
compaction_threshold = 5000
"#;
        let cfg = NodeConfig::from_toml(toml).expect("valid TOML");
        let dyn_cfg = cfg.dynamic();

        assert_eq!(dyn_cfg.heartbeat_interval_ms, 100);
        assert_eq!(dyn_cfg.compaction_threshold, 5000);
    }

    /// Verifies that a config file on disk can be parsed and loaded.
    ///
    /// Field values are asserted via `from_toml(file_contents)`, which does
    /// not consult the environment.  `NodeConfig::load` itself is only
    /// checked for `Ok`, because the surrounding process environment (e.g.
    /// CI) may legitimately define `AMATERS_*` variables.  The env lock is
    /// held throughout, since both `temp_dir()` and `load()` read the
    /// environment.
    #[test]
    fn test_config_load_from_file() {
        let _lock = env_lock();

        // Include the process id so concurrent test runs cannot collide.
        let path = std::env::temp_dir().join(format!(
            "amaters_cluster_test_config_load_{}.toml",
            std::process::id()
        ));
        std::fs::write(&path, MINIMAL_TOML).expect("write temp config");

        // Do all file I/O first and remove the file before asserting, so a
        // failed assertion does not leave it behind.
        let raw = std::fs::read_to_string(&path);
        let loaded = NodeConfig::load(&path);
        let _ = std::fs::remove_file(&path);

        let raw = raw.expect("read temp config");
        let cfg = NodeConfig::from_toml(&raw).expect("parse TOML from file");
        assert_eq!(cfg.bind_addr, "0.0.0.0:7001");
        assert_eq!(cfg.node_id, 1);

        loaded.expect("load() must succeed for a valid file");
    }
}