Skip to main content

oxirs_cluster/
node_monitor.rs

//! Cluster node health monitoring with heartbeat tracking.
//!
//! A [`NodeMonitor`] tracks the liveness of cluster nodes by recording
//! incoming heartbeats and marking as `Dead` any node that has been silent
//! for at least a configurable `timeout_ms`.
6
7use std::collections::HashMap;
8
// ── Node metadata ─────────────────────────────────────────────────────────────

/// Role a node plays in the cluster.
///
/// Stored in [`NodeInfo::role`] as descriptive metadata; `NodeMonitor` does not
/// act on it. Being a fieldless enum, it is `Copy` and `Hash`, so it can be
/// passed by value and used as a set or map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeRole {
    Leader,
    Follower,
    Candidate,
    Observer,
}

20/// Static information about a registered node.
21#[derive(Debug, Clone)]
22pub struct NodeInfo {
23    /// Unique identifier for this node.
24    pub id: String,
25    /// Network address (e.g. `"10.0.0.1:7000"`).
26    pub address: String,
27    /// Role of this node when it was registered.
28    pub role: NodeRole,
29    /// Unix timestamp (ms) when the node joined the cluster.
30    pub joined_at: u64,
31}
32
33impl NodeInfo {
34    /// Create a new NodeInfo.
35    pub fn new(
36        id: impl Into<String>,
37        address: impl Into<String>,
38        role: NodeRole,
39        joined_at: u64,
40    ) -> Self {
41        Self {
42            id: id.into(),
43            address: address.into(),
44            role,
45            joined_at,
46        }
47    }
48}
49
// ── Node state ────────────────────────────────────────────────────────────────

/// Current health state of a node, updated by heartbeat arrival and timeout detection.
///
/// Fieldless, so it is `Copy` and `Hash` and cheap to compare or store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeState {
    /// Heartbeats are arriving within the timeout window. This is the state of
    /// a freshly registered node and of any node that just sent a heartbeat.
    Alive,
    /// Intended for a node that has missed at least one heartbeat interval —
    /// it may be slow or partitioned.
    ///
    /// NOTE: `NodeMonitor` in this module never assigns this state itself; it
    /// only moves nodes between `Alive` and `Dead`.
    Suspected,
    /// The node has not been heard from for at least `timeout_ms` milliseconds.
    Dead,
}

// ── Heartbeat record ──────────────────────────────────────────────────────────

/// A single heartbeat receipt.
///
/// Every field is plain `String`/`u64` data, so the record supports full
/// equality (`Eq`) and hashing in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HeartbeatRecord {
    /// ID of the node that sent the heartbeat.
    pub node_id: String,
    /// Unix timestamp (ms) when the heartbeat was received.
    pub received_at: u64,
    /// Round-trip latency in milliseconds.
    pub latency_ms: u64,
}

76// ── Monitor ───────────────────────────────────────────────────────────────────
77
78/// Tracks node health based on heartbeat records and configurable timeouts.
79pub struct NodeMonitor {
80    nodes: HashMap<String, NodeInfo>,
81    states: HashMap<String, NodeState>,
82    heartbeats: HashMap<String, Vec<HeartbeatRecord>>,
83    timeout_ms: u64,
84    max_history: usize,
85}
86
87impl NodeMonitor {
88    /// Create a new monitor.
89    ///
90    /// - `timeout_ms` — a node is considered dead when no heartbeat has been
91    ///   received for this many milliseconds.
92    pub fn new(timeout_ms: u64) -> Self {
93        Self {
94            nodes: HashMap::new(),
95            states: HashMap::new(),
96            heartbeats: HashMap::new(),
97            timeout_ms,
98            max_history: 100,
99        }
100    }
101
102    /// Set the maximum heartbeat history to retain per node.
103    pub fn with_max_history(mut self, max_history: usize) -> Self {
104        self.max_history = max_history;
105        self
106    }
107
108    /// Register a new node.  Overwrites any previous registration for the same ID.
109    pub fn register(&mut self, node: NodeInfo) {
110        self.states.insert(node.id.clone(), NodeState::Alive);
111        self.heartbeats.entry(node.id.clone()).or_default();
112        self.nodes.insert(node.id.clone(), node);
113    }
114
115    /// Record a heartbeat received from a node.
116    ///
117    /// Sets the node's state to `Alive` and appends the record to its history.
118    /// Returns `false` if the node is not registered.
119    pub fn record_heartbeat(&mut self, node_id: &str, received_at: u64, latency_ms: u64) -> bool {
120        if !self.nodes.contains_key(node_id) {
121            return false;
122        }
123        self.states.insert(node_id.to_string(), NodeState::Alive);
124        let records = self.heartbeats.entry(node_id.to_string()).or_default();
125        records.push(HeartbeatRecord {
126            node_id: node_id.to_string(),
127            received_at,
128            latency_ms,
129        });
130        // Trim to max history.
131        if records.len() > self.max_history {
132            let drain_count = records.len() - self.max_history;
133            records.drain(..drain_count);
134        }
135        true
136    }
137
138    /// Scan all registered nodes and mark those whose last heartbeat is older
139    /// than `timeout_ms` as `Dead`.
140    ///
141    /// Returns the IDs of nodes that transitioned to `Dead` in this call.
142    pub fn check_timeouts(&mut self, now: u64) -> Vec<String> {
143        let mut timed_out = Vec::new();
144        for (id, records) in &self.heartbeats {
145            let last_seen = records.last().map(|r| r.received_at).unwrap_or(0);
146            let elapsed = now.saturating_sub(last_seen);
147            if elapsed >= self.timeout_ms {
148                if self.states.get(id) != Some(&NodeState::Dead) {
149                    timed_out.push(id.clone());
150                }
151            }
152        }
153        for id in &timed_out {
154            self.states.insert(id.clone(), NodeState::Dead);
155        }
156        timed_out
157    }
158
159    /// Return the current state of a node.
160    pub fn state(&self, node_id: &str) -> Option<&NodeState> {
161        self.states.get(node_id)
162    }
163
164    /// Return references to all nodes whose state is `Alive`.
165    pub fn alive_nodes(&self) -> Vec<&NodeInfo> {
166        self.nodes
167            .values()
168            .filter(|n| self.states.get(&n.id) == Some(&NodeState::Alive))
169            .collect()
170    }
171
172    /// Return references to all nodes whose state is `Dead`.
173    pub fn dead_nodes(&self) -> Vec<&NodeInfo> {
174        self.nodes
175            .values()
176            .filter(|n| self.states.get(&n.id) == Some(&NodeState::Dead))
177            .collect()
178    }
179
180    /// Compute the average heartbeat latency for a node.
181    ///
182    /// Returns `None` if the node has no heartbeat history.
183    pub fn avg_latency(&self, node_id: &str) -> Option<f64> {
184        let records = self.heartbeats.get(node_id)?;
185        if records.is_empty() {
186            return None;
187        }
188        let sum: u64 = records.iter().map(|r| r.latency_ms).sum();
189        Some(sum as f64 / records.len() as f64)
190    }
191
192    /// Total number of registered nodes.
193    pub fn node_count(&self) -> usize {
194        self.nodes.len()
195    }
196
197    /// Deregister a node.  Returns `true` if it existed.
198    pub fn remove(&mut self, node_id: &str) -> bool {
199        if self.nodes.remove(node_id).is_some() {
200            self.states.remove(node_id);
201            self.heartbeats.remove(node_id);
202            true
203        } else {
204            false
205        }
206    }
207
208    /// Return the most recent heartbeat record for a node, if any.
209    pub fn last_heartbeat(&self, node_id: &str) -> Option<&HeartbeatRecord> {
210        self.heartbeats.get(node_id)?.last()
211    }
212
213    /// Number of heartbeat records stored for a node.
214    pub fn heartbeat_count(&self, node_id: &str) -> usize {
215        self.heartbeats.get(node_id).map_or(0, |v| v.len())
216    }
217}
218
// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Timeout (ms) used by `default_monitor`: five seconds.
    const DEFAULT_TIMEOUT_MS: u64 = 5000;

    /// Build a follower on a loopback address that joined at time 0.
    fn follower_node(id: &str) -> NodeInfo {
        NodeInfo::new(id, "127.0.0.1:7000", NodeRole::Follower, 0)
    }

    /// Monitor with the default five-second timeout and no nodes.
    fn default_monitor() -> NodeMonitor {
        NodeMonitor::new(DEFAULT_TIMEOUT_MS)
    }

    /// Monitor with `timeout_ms` and the given followers registered in order.
    fn monitor_with(timeout_ms: u64, ids: &[&str]) -> NodeMonitor {
        let mut mon = NodeMonitor::new(timeout_ms);
        for &id in ids {
            mon.register(follower_node(id));
        }
        mon
    }

    /// `true` when `id` appears in a list returned by `check_timeouts`.
    fn reported(timed_out: &[String], id: &str) -> bool {
        timed_out.iter().any(|entry| entry == id)
    }

    // ── register / node_count ─────────────────────────────────────────────────

    #[test]
    fn test_new_empty() {
        let mon = default_monitor();
        assert_eq!(mon.node_count(), 0);
    }

    #[test]
    fn test_register_increments_count() {
        let mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        assert_eq!(mon.node_count(), 1);
    }

    #[test]
    fn test_register_multiple_nodes() {
        let mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1", "n2", "n3"]);
        assert_eq!(mon.node_count(), 3);
    }

    #[test]
    fn test_register_overwrites_same_id() {
        let mut mon = default_monitor();
        let first = NodeInfo::new("n1", "addr1", NodeRole::Follower, 0);
        let second = NodeInfo::new("n1", "addr2", NodeRole::Leader, 100);
        mon.register(first);
        mon.register(second);
        assert_eq!(mon.node_count(), 1);
        assert_eq!(mon.nodes["n1"].address, "addr2");
    }

    #[test]
    fn test_new_node_state_is_alive() {
        let mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        assert_eq!(mon.state("n1"), Some(&NodeState::Alive));
    }

    // ── record_heartbeat ──────────────────────────────────────────────────────

    #[test]
    fn test_record_heartbeat_returns_true_for_known_node() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        let accepted = mon.record_heartbeat("n1", 1000, 5);
        assert!(accepted);
    }

    #[test]
    fn test_record_heartbeat_returns_false_for_unknown_node() {
        let mut mon = default_monitor();
        let accepted = mon.record_heartbeat("unknown", 1000, 5);
        assert!(!accepted);
    }

    #[test]
    fn test_record_heartbeat_sets_alive() {
        let mut mon = monitor_with(1000, &["n1"]);
        // Force the node into `Dead` so the heartbeat has something to revive.
        mon.states.insert("n1".to_string(), NodeState::Dead);
        mon.record_heartbeat("n1", 1000, 5);
        assert_eq!(mon.state("n1"), Some(&NodeState::Alive));
    }

    #[test]
    fn test_record_heartbeat_increments_history() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        for &(at, latency) in &[(1000, 5), (2000, 6)] {
            mon.record_heartbeat("n1", at, latency);
        }
        assert_eq!(mon.heartbeat_count("n1"), 2);
    }

    // ── check_timeouts ────────────────────────────────────────────────────────

    #[test]
    fn test_check_timeouts_no_timeout_within_window() {
        let mut mon = monitor_with(5000, &["n1"]);
        mon.record_heartbeat("n1", 1000, 5);
        // 5999 - 1000 = 4999 ms of silence, still inside the 5000 ms window.
        assert!(mon.check_timeouts(5999).is_empty());
    }

    #[test]
    fn test_check_timeouts_marks_dead() {
        let mut mon = monitor_with(5000, &["n1"]);
        mon.record_heartbeat("n1", 1000, 5);
        // 6001 - 1000 = 5001 ms of silence.
        let timed_out = mon.check_timeouts(6001);
        assert!(reported(&timed_out, "n1"));
        assert_eq!(mon.state("n1"), Some(&NodeState::Dead));
    }

    #[test]
    fn test_check_timeouts_no_heartbeat_marks_dead() {
        // Never heard from and joined at 0: by t = 5000 the window has passed.
        let mut mon = monitor_with(5000, &["n1"]);
        let timed_out = mon.check_timeouts(5000);
        assert!(reported(&timed_out, "n1"));
    }

    #[test]
    fn test_check_timeouts_already_dead_not_returned_again() {
        let mut mon = monitor_with(5000, &["n1"]);
        mon.check_timeouts(6000);
        // A second scan must not re-report a node that is already `Dead`.
        let second = mon.check_timeouts(7000);
        assert!(!reported(&second, "n1"));
    }

    // ── alive_nodes / dead_nodes ──────────────────────────────────────────────

    #[test]
    fn test_alive_nodes_all_alive() {
        let mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1", "n2"]);
        assert_eq!(mon.alive_nodes().len(), 2);
    }

    #[test]
    fn test_dead_nodes_empty_initially() {
        let mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        assert!(mon.dead_nodes().is_empty());
    }

    #[test]
    fn test_alive_dead_after_timeout() {
        let mut mon = monitor_with(1000, &["n1", "n2"]);
        for id in ["n1", "n2"].iter() {
            mon.record_heartbeat(id, 0, 5);
        }
        mon.check_timeouts(1001); // both exceed the 1000 ms window
        assert_eq!(mon.dead_nodes().len(), 2);
        assert_eq!(mon.alive_nodes().len(), 0);
    }

    // ── avg_latency ───────────────────────────────────────────────────────────

    #[test]
    fn test_avg_latency_none_for_no_history() {
        let mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        assert_eq!(mon.avg_latency("n1"), None);
    }

    #[test]
    fn test_avg_latency_single_record() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        mon.record_heartbeat("n1", 1000, 10);
        assert_eq!(mon.avg_latency("n1"), Some(10.0));
    }

    #[test]
    fn test_avg_latency_multiple_records() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        for &(at, latency) in &[(1000, 10), (2000, 20), (3000, 30)] {
            mon.record_heartbeat("n1", at, latency);
        }
        // (10 + 20 + 30) / 3 = 20
        let avg = mon.avg_latency("n1").unwrap();
        assert!((avg - 20.0).abs() < 0.001);
    }

    #[test]
    fn test_avg_latency_unknown_node_none() {
        let mon = default_monitor();
        assert!(mon.avg_latency("unknown").is_none());
    }

    // ── remove ────────────────────────────────────────────────────────────────

    #[test]
    fn test_remove_existing_returns_true() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        assert!(mon.remove("n1"));
    }

    #[test]
    fn test_remove_decrements_count() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        mon.remove("n1");
        assert_eq!(mon.node_count(), 0);
    }

    #[test]
    fn test_remove_missing_returns_false() {
        let mut mon = default_monitor();
        assert!(!mon.remove("nobody"));
    }

    #[test]
    fn test_remove_clears_state() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        mon.remove("n1");
        assert!(mon.state("n1").is_none());
    }

    // ── roles ─────────────────────────────────────────────────────────────────

    /// Register a single node with `role` and read back the stored role.
    fn stored_role(id: &str, address: &str, role: NodeRole) -> NodeRole {
        let mut mon = default_monitor();
        mon.register(NodeInfo::new(id, address, role, 0));
        mon.nodes[id].role.clone()
    }

    #[test]
    fn test_leader_role_preserved() {
        let role = stored_role("leader", "10.0.0.1:7000", NodeRole::Leader);
        assert_eq!(role, NodeRole::Leader);
    }

    #[test]
    fn test_observer_role_preserved() {
        let role = stored_role("obs", "10.0.0.2:7000", NodeRole::Observer);
        assert_eq!(role, NodeRole::Observer);
    }

    // ── last_heartbeat ────────────────────────────────────────────────────────

    #[test]
    fn test_last_heartbeat_none_if_no_history() {
        let mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        assert!(mon.last_heartbeat("n1").is_none());
    }

    #[test]
    fn test_last_heartbeat_returns_most_recent() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        mon.record_heartbeat("n1", 1000, 5);
        mon.record_heartbeat("n1", 2000, 7);
        let latest = mon.last_heartbeat("n1").expect("should have record");
        assert_eq!(latest.received_at, 2000);
    }

    // ── heartbeat_count ───────────────────────────────────────────────────────

    #[test]
    fn test_heartbeat_count_zero_initially() {
        let mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        assert_eq!(mon.heartbeat_count("n1"), 0);
    }

    #[test]
    fn test_heartbeat_count_increments() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        for &at in &[1000, 2000] {
            mon.record_heartbeat("n1", at, 5);
        }
        assert_eq!(mon.heartbeat_count("n1"), 2);
    }

    #[test]
    fn test_max_history_trims_records() {
        let mut mon = NodeMonitor::new(5000).with_max_history(3);
        mon.register(follower_node("n1"));
        (0..10u64).for_each(|i| {
            mon.record_heartbeat("n1", i * 100, 5);
        });
        assert_eq!(mon.heartbeat_count("n1"), 3);
    }

    // ── Additional coverage ───────────────────────────────────────────────────

    #[test]
    fn test_node_info_fields() {
        let info = NodeInfo::new("id1", "10.0.0.1:7000", NodeRole::Candidate, 42);
        assert_eq!(info.id, "id1");
        assert_eq!(info.address, "10.0.0.1:7000");
        assert_eq!(info.role, NodeRole::Candidate);
        assert_eq!(info.joined_at, 42);
    }

    #[test]
    fn test_node_state_suspected() {
        let observed = NodeState::Suspected;
        assert_eq!(observed, NodeState::Suspected);
    }

    #[test]
    fn test_heartbeat_record_fields() {
        let record = HeartbeatRecord {
            node_id: String::from("n1"),
            received_at: 999,
            latency_ms: 12,
        };
        assert_eq!(record.node_id, "n1");
        assert_eq!(record.received_at, 999);
        assert_eq!(record.latency_ms, 12);
    }

    #[test]
    fn test_avg_latency_zero_latency() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        mon.record_heartbeat("n1", 1000, 0);
        assert_eq!(mon.avg_latency("n1"), Some(0.0));
    }

    #[test]
    fn test_check_timeouts_at_exact_boundary() {
        let mut mon = monitor_with(5000, &["n1"]);
        mon.record_heartbeat("n1", 1000, 5);
        // Exactly 5000 ms of silence is enough to time out (the check is `>=`).
        let timed_out = mon.check_timeouts(6000);
        assert!(reported(&timed_out, "n1"));
    }

    #[test]
    fn test_multiple_registrations_independent() {
        let mut mon = default_monitor();
        mon.register(NodeInfo::new("a", "addr_a", NodeRole::Leader, 0));
        mon.register(NodeInfo::new("b", "addr_b", NodeRole::Follower, 0));
        assert_eq!(mon.node_count(), 2);
        assert!(["a", "b"].iter().all(|id| mon.state(id).is_some()));
    }

    #[test]
    fn test_remove_then_re_register() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1"]);
        mon.remove("n1");
        mon.register(NodeInfo::new("n1", "new_addr", NodeRole::Leader, 100));
        assert_eq!(mon.node_count(), 1);
    }

    #[test]
    fn test_alive_nodes_excludes_dead() {
        let mut mon = monitor_with(1000, &["n1", "n2"]);
        mon.record_heartbeat("n1", 0, 5);
        mon.record_heartbeat("n2", 0, 5);
        mon.check_timeouts(1001); // both exceed the 1000 ms window
        assert!(mon.alive_nodes().is_empty());
    }

    #[test]
    fn test_check_timeouts_partial() {
        let mut mon = monitor_with(5000, &["n1", "n2"]);
        mon.record_heartbeat("n1", 1000, 5); // silent for 5001 ms at t = 6001
        mon.record_heartbeat("n2", 5000, 5); // silent for 1001 ms at t = 6001
        let timed_out = mon.check_timeouts(6001);
        assert!(reported(&timed_out, "n1"));
        assert!(!reported(&timed_out, "n2"));
    }

    #[test]
    fn test_heartbeat_count_unknown_node() {
        let mon = default_monitor();
        assert_eq!(mon.heartbeat_count("ghost"), 0);
    }

    #[test]
    fn test_state_unknown_node_none() {
        let mon = default_monitor();
        assert_eq!(mon.state("unknown"), None);
    }

    #[test]
    fn test_last_heartbeat_unknown_node_none() {
        let mon = default_monitor();
        assert_eq!(mon.last_heartbeat("unknown"), None);
    }

    #[test]
    fn test_node_monitor_new_timeout() {
        let mon = NodeMonitor::new(3000);
        assert_eq!(mon.timeout_ms, 3000);
    }

    #[test]
    fn test_record_heartbeat_for_multiple_nodes() {
        let mut mon = monitor_with(DEFAULT_TIMEOUT_MS, &["n1", "n2"]);
        assert!(mon.record_heartbeat("n1", 1000, 5));
        assert!(mon.record_heartbeat("n2", 2000, 10));
        for id in ["n1", "n2"].iter() {
            assert_eq!(mon.heartbeat_count(id), 1);
        }
    }
}