Skip to main content

amaters_cluster/
failover.rs

1//! Automatic failover coordination for Raft clusters.
2//!
3//! [`FailoverCoordinator`] wraps a [`FailureDetector`] and monitors the
4//! current leader.  When the failure detector reports that the leader has
5//! failed, the coordinator schedules an election with randomised jitter
6//! (to avoid thundering-herd simultaneous elections) and emits
7//! [`FailoverEvent`]s that the node event loop can act upon.
8//!
9//! Followers use [`FailoverCoordinator::leader_hint`] to redirect client
10//! requests to the current leader without requiring a full round-trip.
11
12use std::collections::hash_map::RandomState;
13use std::hash::BuildHasher;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use tracing::{debug, info, warn};
18
19use crate::error::{RaftError, RaftResult};
20use crate::heartbeat::FailureDetector;
21use crate::types::{FailureEvent, HeartbeatConfig, NodeId};
22
23// ── Events ──────────────────────────────────────────────────────────
24
/// Events produced by the [`FailoverCoordinator`] during each tick.
///
/// A (possibly empty) batch of these is returned by
/// [`FailoverCoordinator::tick`]; the caller's event loop decides how to react
/// (for example, starting an election on [`FailoverEvent::FailoverTimeout`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailoverEvent {
    /// The current leader has been detected as failed.
    ///
    /// When `election_triggered` is `true` the failure count reached
    /// `max_consecutive_failures`, so an election jitter timer is (or already
    /// was) scheduled; the election itself is signalled later through
    /// [`FailoverEvent::FailoverTimeout`].
    LeaderLost {
        /// Node ID of the old leader.
        old_leader: NodeId,
        /// Whether the failure threshold was reached, i.e. an election timer
        /// has been started (or was already running) as a result.
        election_triggered: bool,
    },
    /// A new leader has been acknowledged (set via [`FailoverCoordinator::set_leader`]).
    ///
    /// NOTE(review): nothing in this file constructs this variant —
    /// `set_leader` does not emit events. Confirm whether callers build it
    /// themselves.
    LeaderElected {
        /// Node ID of the new leader.
        new_leader: NodeId,
    },
    /// The election jitter timer expired without a new leader being set; the
    /// caller should now start an election.
    FailoverTimeout,
    /// A peer that is not the current leader has failed.
    PeerFailed {
        /// The failed peer.
        node_id: NodeId,
    },
    /// A previously-failed peer (leader or not) has recovered.
    PeerRecovered {
        /// The recovered peer.
        node_id: NodeId,
    },
}
54
55// ── Configuration ───────────────────────────────────────────────────
56
/// Tuning knobs for the failover coordinator.
///
/// All fields are public, so a config can be built without going through
/// [`FailoverConfig::new`] or [`FailoverConfig::validate`]. The methods below
/// therefore must not assume the config has been validated.
#[derive(Debug, Clone)]
pub struct FailoverConfig {
    /// Minimum election jitter in milliseconds (inclusive).
    pub election_jitter_min_ms: u64,
    /// Maximum election jitter in milliseconds (exclusive).
    pub election_jitter_max_ms: u64,
    /// How many consecutive leader failure detections before triggering
    /// an election.
    pub max_consecutive_failures: u32,
}

impl FailoverConfig {
    /// Create a new failover configuration.
    ///
    /// The values are stored verbatim; call [`validate`](Self::validate) to
    /// check them.
    pub fn new(
        election_jitter_min_ms: u64,
        election_jitter_max_ms: u64,
        max_consecutive_failures: u32,
    ) -> Self {
        Self {
            election_jitter_min_ms,
            election_jitter_max_ms,
            max_consecutive_failures,
        }
    }

    /// Validate the configuration, returning an error message on failure.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message if `election_jitter_min_ms` is 0, if
    /// `election_jitter_max_ms` is not strictly greater than
    /// `election_jitter_min_ms`, or if `max_consecutive_failures` is 0.
    pub fn validate(&self) -> Result<(), String> {
        if self.election_jitter_min_ms == 0 {
            return Err("election_jitter_min_ms must be > 0".to_string());
        }
        if self.election_jitter_max_ms <= self.election_jitter_min_ms {
            return Err(format!(
                "election_jitter_max_ms ({}) must be > election_jitter_min_ms ({})",
                self.election_jitter_max_ms, self.election_jitter_min_ms,
            ));
        }
        if self.max_consecutive_failures == 0 {
            return Err("max_consecutive_failures must be > 0".to_string());
        }
        Ok(())
    }

    /// Generate a pseudo-random jitter duration in `[min, max)`.
    ///
    /// The value comes from hashing the current wall-clock time with a
    /// freshly keyed [`RandomState`]. It is not cryptographically random; it
    /// only needs to spread elections out across nodes.
    ///
    /// If the range is empty or inverted (`max <= min`), the minimum is
    /// returned. `validate` rejects such configs, but an unvalidated one can
    /// still reach here. Without this guard the modulus would be zero (panic)
    /// or the subtraction would overflow.
    fn random_jitter(&self) -> Duration {
        let min = self.election_jitter_min_ms;
        let range = self.election_jitter_max_ms.saturating_sub(min);
        if range == 0 {
            return Duration::from_millis(min);
        }
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let random_value = RandomState::new().hash_one(now);
        // `random_value % range < max - min`, so this sum stays below `max`
        // and cannot overflow.
        Duration::from_millis(min + random_value % range)
    }
}

impl Default for FailoverConfig {
    /// 150–300 ms election jitter; trigger after 3 consecutive leader failures.
    fn default() -> Self {
        Self {
            election_jitter_min_ms: 150,
            election_jitter_max_ms: 300,
            max_consecutive_failures: 3,
        }
    }
}
122
123// ── Coordinator ─────────────────────────────────────────────────────
124
/// Internal election timer state.
///
/// Transitions performed in this file:
/// `Idle` → `Pending` (when the leader failure threshold is reached),
/// `Pending` → `Fired` (during a tick, once the jitter has elapsed), and back
/// to `Idle` on a leader change, leader recovery, `clear_leader` or `reset`.
#[derive(Debug)]
enum ElectionTimer {
    /// No election is pending.
    Idle,
    /// Waiting for jitter to expire before signalling the node to start
    /// an election.
    Pending {
        /// When the timer was started.
        started_at: Instant,
        /// How long to wait after `started_at` before firing.
        jitter: Duration,
    },
    /// The jitter expired and we signalled the caller to start an election;
    /// now we are waiting for a new leader to appear.
    Fired {
        /// When the timer fired.
        // NOTE(review): never read in this file; only visible through the
        // derived `Debug` output.
        fired_at: Instant,
    },
}
145
/// Coordinates automatic leader failover.
///
/// Wraps a [`FailureDetector`] and watches the current leader. Once the
/// leader has been seen as failed often enough, an election is scheduled
/// after a random jitter and [`FailoverEvent::FailoverTimeout`] is emitted
/// from [`tick`](FailoverCoordinator::tick) when the jitter expires.
///
/// # Usage
///
/// ```rust,ignore
/// let hb_config = HeartbeatConfig::new(100, 500, 3);
/// let fo_config = FailoverConfig::default();
/// let mut coord = FailoverCoordinator::new(hb_config, fo_config, self_id);
/// coord.track_peer(2)?;
/// coord.track_peer(3)?;
/// coord.set_leader(2);
///
/// // In the event loop:
/// for event in coord.tick()? {
///     match event {
///         FailoverEvent::LeaderLost { .. } => { /* prepare for election */ }
///         FailoverEvent::FailoverTimeout => { node.start_election(); }
///         _ => {}
///     }
/// }
/// ```
pub struct FailoverCoordinator {
    /// Underlying failure detector.
    detector: FailureDetector,
    /// Failover-specific configuration.
    config: FailoverConfig,
    /// This node's own ID.
    self_id: NodeId,
    /// Current known leader (None if unknown).
    current_leader: Option<NodeId>,
    /// Election timer state.
    election_timer: ElectionTimer,
    /// Consecutive leader-failure count, compared against
    /// `FailoverConfig::max_consecutive_failures`. Reset to 0 when the leader
    /// recovers or changes, or on `clear_leader`/`reset`.
    leader_failure_count: u32,
}
181
182impl FailoverCoordinator {
183    /// Create a new failover coordinator.
184    pub fn new(
185        heartbeat_config: HeartbeatConfig,
186        failover_config: FailoverConfig,
187        self_id: NodeId,
188    ) -> Self {
189        Self {
190            detector: FailureDetector::new(heartbeat_config, self_id),
191            config: failover_config,
192            self_id,
193            current_leader: None,
194            election_timer: ElectionTimer::Idle,
195            leader_failure_count: 0,
196        }
197    }
198
199    // ── Peer management (delegates to FailureDetector) ──────────────
200
201    /// Begin tracking a peer.
202    pub fn track_peer(&mut self, peer_id: NodeId) -> RaftResult<()> {
203        self.detector.track_peer(peer_id)
204    }
205
206    /// Stop tracking a peer.
207    pub fn remove_peer(&mut self, peer_id: NodeId) {
208        self.detector.remove_peer(peer_id);
209        if self.current_leader == Some(peer_id) {
210            self.current_leader = None;
211        }
212    }
213
214    /// Record a heartbeat from a peer.
215    pub fn record_heartbeat(&mut self, peer_id: NodeId) -> RaftResult<()> {
216        self.detector.record_heartbeat(peer_id)
217    }
218
219    // ── Leader tracking ─────────────────────────────────────────────
220
221    /// Set the current known leader.
222    pub fn set_leader(&mut self, leader_id: NodeId) {
223        let changed = self.current_leader != Some(leader_id);
224        self.current_leader = Some(leader_id);
225        if changed {
226            self.leader_failure_count = 0;
227            self.election_timer = ElectionTimer::Idle;
228            debug!(
229                self_id = self.self_id,
230                leader_id = leader_id,
231                "FailoverCoordinator: leader updated"
232            );
233        }
234    }
235
236    /// Clear the current leader (e.g. after stepping down).
237    pub fn clear_leader(&mut self) {
238        self.current_leader = None;
239        self.leader_failure_count = 0;
240        self.election_timer = ElectionTimer::Idle;
241    }
242
243    /// Return the current known leader, useful for client redirects.
244    pub fn leader_hint(&self) -> Option<NodeId> {
245        self.current_leader
246    }
247
248    /// Returns `true` if this node should redirect clients to the current leader.
249    ///
250    /// A node should redirect when there is a known leader that is not this node.
251    /// If the leader is unknown (election in progress), returns `false` so that
252    /// the caller can try locally or return a generic "leader unknown" error.
253    pub fn should_redirect(&self, my_id: NodeId) -> bool {
254        match self.current_leader {
255            Some(leader) => leader != my_id,
256            None => false,
257        }
258    }
259
260    // ── Tick ────────────────────────────────────────────────────────
261
262    /// Advance the coordinator by one tick.
263    ///
264    /// Checks the underlying failure detector and processes any leader
265    /// failure / recovery events.  Returns a (possibly empty) list of
266    /// [`FailoverEvent`]s the caller should act upon.
267    pub fn tick(&mut self) -> RaftResult<Vec<FailoverEvent>> {
268        let failure_events = self.detector.check_timeouts()?;
269        let mut out = Vec::new();
270
271        for fe in &failure_events {
272            match fe {
273                FailureEvent::NodeFailed { node_id, .. } => {
274                    if Some(*node_id) == self.current_leader {
275                        self.leader_failure_count = self.leader_failure_count.saturating_add(1);
276                        let should_trigger =
277                            self.leader_failure_count >= self.config.max_consecutive_failures;
278
279                        if should_trigger {
280                            self.schedule_election();
281                        }
282
283                        info!(
284                            self_id = self.self_id,
285                            leader = node_id,
286                            failure_count = self.leader_failure_count,
287                            triggered = should_trigger,
288                            "Leader failure detected"
289                        );
290
291                        out.push(FailoverEvent::LeaderLost {
292                            old_leader: *node_id,
293                            election_triggered: should_trigger,
294                        });
295                    } else {
296                        out.push(FailoverEvent::PeerFailed { node_id: *node_id });
297                    }
298                }
299                FailureEvent::NodeRecovered { node_id } => {
300                    if Some(*node_id) == self.current_leader {
301                        // Leader came back — cancel any pending election timer.
302                        self.leader_failure_count = 0;
303                        self.election_timer = ElectionTimer::Idle;
304                        debug!(
305                            self_id = self.self_id,
306                            leader = node_id,
307                            "Leader recovered, election timer cancelled"
308                        );
309                    }
310                    out.push(FailoverEvent::PeerRecovered { node_id: *node_id });
311                }
312            }
313        }
314
315        // Check election timer
316        match &self.election_timer {
317            ElectionTimer::Pending { started_at, jitter } => {
318                if started_at.elapsed() >= *jitter {
319                    info!(
320                        self_id = self.self_id,
321                        jitter_ms = jitter.as_millis() as u64,
322                        "Election jitter expired, triggering failover"
323                    );
324                    self.election_timer = ElectionTimer::Fired {
325                        fired_at: Instant::now(),
326                    };
327                    out.push(FailoverEvent::FailoverTimeout);
328                }
329            }
330            ElectionTimer::Fired { .. } | ElectionTimer::Idle => {}
331        }
332
333        Ok(out)
334    }
335
336    /// Reset all state (e.g. when this node becomes leader).
337    pub fn reset(&mut self) {
338        self.detector.reset_all();
339        self.leader_failure_count = 0;
340        self.election_timer = ElectionTimer::Idle;
341    }
342
343    /// Return the IDs of peers currently considered failed.
344    pub fn failed_peers(&self) -> Vec<NodeId> {
345        self.detector.failed_peers()
346    }
347
348    /// Return the IDs of peers currently considered alive.
349    pub fn alive_peers(&self) -> Vec<NodeId> {
350        self.detector.alive_peers()
351    }
352
353    /// Return the number of tracked peers.
354    pub fn peer_count(&self) -> usize {
355        self.detector.peer_count()
356    }
357
358    /// Whether an election is currently pending (jitter not yet expired).
359    pub fn is_election_pending(&self) -> bool {
360        matches!(self.election_timer, ElectionTimer::Pending { .. })
361    }
362
363    /// Whether the election timer has fired.
364    pub fn is_election_fired(&self) -> bool {
365        matches!(self.election_timer, ElectionTimer::Fired { .. })
366    }
367
368    // ── Internal ────────────────────────────────────────────────────
369
370    fn schedule_election(&mut self) {
371        if matches!(
372            self.election_timer,
373            ElectionTimer::Pending { .. } | ElectionTimer::Fired { .. }
374        ) {
375            // Already scheduled or already fired; do not reset the timer.
376            return;
377        }
378        let jitter = self.config.random_jitter();
379        debug!(
380            self_id = self.self_id,
381            jitter_ms = jitter.as_millis() as u64,
382            "Scheduling election with jitter"
383        );
384        self.election_timer = ElectionTimer::Pending {
385            started_at: Instant::now(),
386            jitter,
387        };
388    }
389}
390
391impl std::fmt::Debug for FailoverCoordinator {
392    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393        f.debug_struct("FailoverCoordinator")
394            .field("self_id", &self.self_id)
395            .field("current_leader", &self.current_leader)
396            .field("leader_failure_count", &self.leader_failure_count)
397            .field("peer_count", &self.detector.peer_count())
398            .finish()
399    }
400}
401
402// ── AlertEvent ──────────────────────────────────────────────────────
403
/// An event that the alerting subsystem can deliver to registered callbacks.
///
/// Delivered through [`AlertManager::emit`], which hands each registered
/// callback its own clone of the event.
#[derive(Debug, Clone)]
pub enum AlertEvent {
    /// A node has been presumed failed (heartbeat timeout or explicit mark).
    NodeFailed { node_id: NodeId },
    /// A previously-failed node is now reachable again.
    NodeRecovered { node_id: NodeId },
    /// Raft leader changed (old may be `None` for the very first election).
    LeaderChanged {
        old_leader: Option<NodeId>,
        new_leader: NodeId,
    },
    /// The cluster lost quorum: fewer than a majority of members are
    /// reachable. `cluster_size` is the total membership and `reachable` the
    /// number of members currently reachable (presumably including the
    /// reporting node — confirm with the emitter).
    QuorumLost {
        cluster_size: usize,
        reachable: usize,
    },
    /// A follower is lagging far behind the leader's commit index.
    /// `lag_entries` is the number of log entries it is behind (assumed to
    /// be measured against the leader's commit index — confirm with the
    /// emitter).
    SlowReplication { follower: NodeId, lag_entries: u64 },
}
424
425// ── AlertCallback / AlertManager ────────────────────────────────────
426
427/// A thread-safe, shared callback that receives [`AlertEvent`]s.
428pub type AlertCallback = Arc<dyn Fn(AlertEvent) + Send + Sync>;
429
430/// Fan-out alerting hub.
431///
432/// Register callbacks via [`register`][AlertManager::register]; emit events via
433/// [`emit`][AlertManager::emit].  Both methods are safe to call from multiple
434/// threads concurrently.
435pub struct AlertManager {
436    callbacks: Mutex<Vec<AlertCallback>>,
437}
438
439impl AlertManager {
440    /// Create an empty manager with no registered callbacks.
441    pub fn new() -> Self {
442        Self {
443            callbacks: Mutex::new(Vec::new()),
444        }
445    }
446
447    /// Register a new callback.
448    ///
449    /// The callback will be invoked synchronously (in the calling thread) for
450    /// every subsequent [`emit`][Self::emit] call.
451    pub fn register(&self, callback: AlertCallback) {
452        self.callbacks
453            .lock()
454            .unwrap_or_else(|e| e.into_inner())
455            .push(callback);
456    }
457
458    /// Emit an event to all registered callbacks.
459    ///
460    /// Callbacks are invoked in registration order.  A panicking callback does
461    /// not prevent the remaining callbacks from being invoked.
462    pub fn emit(&self, event: AlertEvent) {
463        let guard = self.callbacks.lock().unwrap_or_else(|e| e.into_inner());
464        for cb in guard.iter() {
465            // Use std::panic::catch_unwind so a bad callback cannot poison the
466            // lock or abort the cluster node.
467            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
468                cb(event.clone());
469            }));
470        }
471    }
472}
473
474impl Default for AlertManager {
475    fn default() -> Self {
476        Self::new()
477    }
478}
479
480impl std::fmt::Debug for AlertManager {
481    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
482        let count = self.callbacks.lock().map(|g| g.len()).unwrap_or(0);
483        f.debug_struct("AlertManager")
484            .field("callback_count", &count)
485            .finish()
486    }
487}
488
489// ── FailoverController ──────────────────────────────────────────────
490
491/// A higher-level node health monitor that combines heartbeat-timeout
492/// detection with explicit mark/recover operations.
493///
494/// Unlike [`FailoverCoordinator`] (which is tied to the Raft election
495/// machinery), `FailoverController` is a self-contained monitor suitable for
496/// use by the cluster topology dashboard, the alerting subsystem, or external
497/// health-check drivers.
498///
499/// # Thread safety
500///
501/// All methods use internal locks; the struct is `Send + Sync`.
502///
503/// # Test injection
504///
505/// Tests may call `set_last_seen` to back-date the last
506/// heartbeat timestamp for a node without actually waiting.
507pub struct FailoverController {
508    heartbeat_timeout: Duration,
509    last_seen: Mutex<std::collections::HashMap<NodeId, Instant>>,
510    failed_nodes: dashmap::DashSet<NodeId>,
511}
512
513impl FailoverController {
514    /// Create a new controller.
515    ///
516    /// `heartbeat_timeout` — how long a node can go without sending a
517    /// heartbeat before it is considered failed.
518    pub fn new(heartbeat_timeout: Duration) -> Self {
519        Self {
520            heartbeat_timeout,
521            last_seen: Mutex::new(std::collections::HashMap::new()),
522            failed_nodes: dashmap::DashSet::new(),
523        }
524    }
525
526    /// Record that a heartbeat was received from `node_id`.
527    ///
528    /// If the node was previously marked as failed it is automatically
529    /// recovered.
530    pub fn record_heartbeat(&self, node_id: NodeId) {
531        self.last_seen
532            .lock()
533            .unwrap_or_else(|e| e.into_inner())
534            .insert(node_id, Instant::now());
535        self.failed_nodes.remove(&node_id);
536    }
537
538    /// Check all known nodes; return the IDs of nodes that have not sent a
539    /// heartbeat within the timeout window.
540    ///
541    /// Nodes detected here are also added to the internal failed set (visible
542    /// via [`is_failed`][Self::is_failed] / [`failed_nodes`][Self::failed_nodes]).
543    pub fn detect_failed_nodes(&self) -> Vec<NodeId> {
544        let now = Instant::now();
545        let guard = self.last_seen.lock().unwrap_or_else(|e| e.into_inner());
546        let mut failed = Vec::new();
547        for (&node_id, &last) in guard.iter() {
548            if now.duration_since(last) >= self.heartbeat_timeout {
549                self.failed_nodes.insert(node_id);
550                failed.push(node_id);
551            }
552        }
553        failed
554    }
555
556    /// Explicitly mark a node as failed (e.g. after Raft loss-of-quorum).
557    pub fn mark_failed(&self, node_id: NodeId) {
558        self.failed_nodes.insert(node_id);
559    }
560
561    /// Mark a node as recovered.
562    ///
563    /// This removes it from the failed set.  Call
564    /// [`record_heartbeat`][Self::record_heartbeat] as well to reset the
565    /// timeout clock.
566    pub fn mark_recovered(&self, node_id: NodeId) {
567        self.failed_nodes.remove(&node_id);
568    }
569
570    /// Return `true` if `node_id` is currently considered failed.
571    pub fn is_failed(&self, node_id: NodeId) -> bool {
572        self.failed_nodes.contains(&node_id)
573    }
574
575    /// Return all currently failed node IDs as a `Vec`.
576    pub fn failed_nodes(&self) -> Vec<NodeId> {
577        self.failed_nodes.iter().map(|r| *r).collect()
578    }
579
580    // ── Test-only helpers ─────────────────────────────────────────────────────
581
582    /// Overwrite the last-seen timestamp for `node_id` to `instant`.
583    ///
584    /// Use this in tests to simulate time advancing without actually sleeping.
585    #[cfg(test)]
586    pub fn set_last_seen(&self, node_id: NodeId, instant: Instant) {
587        self.last_seen
588            .lock()
589            .unwrap_or_else(|e| e.into_inner())
590            .insert(node_id, instant);
591    }
592}
593
594impl std::fmt::Debug for FailoverController {
595    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596        let known = self.last_seen.lock().map(|g| g.len()).unwrap_or(0);
597        f.debug_struct("FailoverController")
598            .field("heartbeat_timeout", &self.heartbeat_timeout)
599            .field("known_nodes", &known)
600            .field("failed_count", &self.failed_nodes.len())
601            .finish()
602    }
603}
604
605// ── Tests ───────────────────────────────────────────────────────────
606
607#[cfg(test)]
608mod tests {
609    use super::*;
610    use std::thread;
611
612    fn fast_heartbeat_config() -> HeartbeatConfig {
613        // Very short timeouts so tests complete quickly
614        HeartbeatConfig::new(10, 30, 1)
615    }
616
617    fn fast_failover_config() -> FailoverConfig {
618        FailoverConfig {
619            election_jitter_min_ms: 10,
620            election_jitter_max_ms: 30,
621            max_consecutive_failures: 1,
622        }
623    }
624
625    #[test]
626    fn test_failover_config_default() {
627        let cfg = FailoverConfig::default();
628        assert_eq!(cfg.election_jitter_min_ms, 150);
629        assert_eq!(cfg.election_jitter_max_ms, 300);
630        assert_eq!(cfg.max_consecutive_failures, 3);
631        assert!(cfg.validate().is_ok());
632    }
633
634    #[test]
635    fn test_failover_config_validation() {
636        let bad1 = FailoverConfig::new(0, 300, 3);
637        assert!(bad1.validate().is_err());
638
639        let bad2 = FailoverConfig::new(300, 150, 3);
640        assert!(bad2.validate().is_err());
641
642        let bad3 = FailoverConfig::new(150, 300, 0);
643        assert!(bad3.validate().is_err());
644
645        let bad4 = FailoverConfig::new(150, 150, 3);
646        assert!(bad4.validate().is_err());
647    }
648
649    #[test]
650    fn test_failover_config_jitter_in_range() {
651        let cfg = FailoverConfig::new(100, 200, 3);
652        for _ in 0..20 {
653            let jitter = cfg.random_jitter();
654            assert!(jitter.as_millis() >= 100, "jitter too low: {:?}", jitter);
655            assert!(jitter.as_millis() < 200, "jitter too high: {:?}", jitter);
656        }
657    }
658
659    #[test]
660    fn test_coordinator_creation() {
661        let coord =
662            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
663        assert_eq!(coord.leader_hint(), None);
664        assert_eq!(coord.peer_count(), 0);
665        assert!(!coord.is_election_pending());
666    }
667
668    #[test]
669    fn test_leader_hint_tracking() {
670        let mut coord =
671            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
672        assert_eq!(coord.leader_hint(), None);
673
674        coord.set_leader(2);
675        assert_eq!(coord.leader_hint(), Some(2));
676
677        coord.set_leader(3);
678        assert_eq!(coord.leader_hint(), Some(3));
679
680        coord.clear_leader();
681        assert_eq!(coord.leader_hint(), None);
682    }
683
684    #[test]
685    fn test_leader_failure_triggers_election() {
686        let mut coord =
687            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
688        coord.track_peer(2).expect("track peer 2");
689        coord.track_peer(3).expect("track peer 3");
690        coord.set_leader(2);
691
692        // Let leader timeout
693        thread::sleep(Duration::from_millis(50));
694
695        let events = coord.tick().expect("tick");
696        let leader_lost = events.iter().any(|e| {
697            matches!(
698                e,
699                FailoverEvent::LeaderLost {
700                    old_leader: 2,
701                    election_triggered: true,
702                }
703            )
704        });
705        assert!(leader_lost, "Expected LeaderLost event, got: {:?}", events);
706        assert!(coord.is_election_pending());
707    }
708
709    #[test]
710    fn test_election_timer_fires_after_jitter() {
711        let mut coord =
712            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
713        coord.track_peer(2).expect("track peer 2");
714        coord.set_leader(2);
715
716        // Let leader timeout to trigger election scheduling
717        thread::sleep(Duration::from_millis(50));
718        let _ = coord.tick().expect("tick 1");
719
720        // Wait for jitter to expire
721        thread::sleep(Duration::from_millis(50));
722        let events = coord.tick().expect("tick 2");
723
724        let timeout_fired = events
725            .iter()
726            .any(|e| matches!(e, FailoverEvent::FailoverTimeout));
727        assert!(
728            timeout_fired,
729            "Expected FailoverTimeout event, got: {:?}",
730            events
731        );
732        assert!(coord.is_election_fired());
733    }
734
735    #[test]
736    fn test_leader_recovery_cancels_election() {
737        let mut coord =
738            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
739        coord.track_peer(2).expect("track peer 2");
740        coord.set_leader(2);
741
742        // Let leader timeout
743        thread::sleep(Duration::from_millis(50));
744        let _ = coord.tick().expect("tick");
745        assert!(coord.is_election_pending());
746
747        // Leader sends a heartbeat → recovery
748        coord.record_heartbeat(2).expect("record heartbeat");
749        let events = coord.tick().expect("tick after recovery");
750
751        let recovered = events
752            .iter()
753            .any(|e| matches!(e, FailoverEvent::PeerRecovered { node_id: 2 }));
754        assert!(recovered, "Expected PeerRecovered, got: {:?}", events);
755
756        // Election timer should be cancelled
757        assert!(!coord.is_election_pending());
758        assert!(!coord.is_election_fired());
759    }
760
761    #[test]
762    fn test_non_leader_failure_emits_peer_failed() {
763        let mut coord =
764            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
765        coord.track_peer(2).expect("track peer 2");
766        coord.track_peer(3).expect("track peer 3");
767        coord.set_leader(2);
768
769        // Let node 3 (non-leader) timeout while leader (node 2) stays alive
770        thread::sleep(Duration::from_millis(50));
771        // Keep leader heartbeat fresh so it doesn't time out itself
772        coord.record_heartbeat(2).expect("leader heartbeat refresh");
773
774        let events = coord.tick().expect("tick");
775        let peer_failed = events
776            .iter()
777            .any(|e| matches!(e, FailoverEvent::PeerFailed { node_id: 3 }));
778        assert!(peer_failed, "Expected PeerFailed for 3, got: {:?}", events);
779        assert!(
780            !coord.is_election_pending(),
781            "Non-leader failure should not trigger election"
782        );
783    }
784
785    #[test]
786    fn test_jitter_prevents_simultaneous_elections() {
787        // Two coordinators for different nodes, same leader.
788        // Their jitter values should typically differ.
789        let hb = fast_heartbeat_config();
790        let fo = FailoverConfig {
791            election_jitter_min_ms: 50,
792            election_jitter_max_ms: 200,
793            max_consecutive_failures: 1,
794        };
795
796        let mut c1 = FailoverCoordinator::new(hb.clone(), fo.clone(), 1);
797        let mut c2 = FailoverCoordinator::new(hb.clone(), fo.clone(), 3);
798
799        c1.track_peer(2).expect("c1 track 2");
800        c1.track_peer(3).expect("c1 track 3");
801        c1.set_leader(2);
802
803        c2.track_peer(1).expect("c2 track 1");
804        c2.track_peer(2).expect("c2 track 2");
805        c2.set_leader(2);
806
807        // Let leader timeout on both
808        thread::sleep(Duration::from_millis(50));
809        let _ = c1.tick().expect("c1 tick");
810        let _ = c2.tick().expect("c2 tick");
811
812        // Both should have scheduled an election, but the internal jitter
813        // values should be independent (they use RandomState which is
814        // seeded differently per invocation).
815        assert!(c1.is_election_pending());
816        assert!(c2.is_election_pending());
817    }
818
819    #[test]
820    fn test_max_consecutive_failures_threshold() {
821        let mut coord = FailoverCoordinator::new(
822            fast_heartbeat_config(),
823            FailoverConfig {
824                election_jitter_min_ms: 10,
825                election_jitter_max_ms: 30,
826                max_consecutive_failures: 3,
827            },
828            1,
829        );
830        coord.track_peer(2).expect("track peer 2");
831        coord.set_leader(2);
832
833        // First timeout: failure count 1 — not enough
834        thread::sleep(Duration::from_millis(50));
835        let events = coord.tick().expect("tick 1");
836        let triggered = events.iter().any(|e| {
837            matches!(
838                e,
839                FailoverEvent::LeaderLost {
840                    election_triggered: true,
841                    ..
842                }
843            )
844        });
845        assert!(
846            !triggered,
847            "Should not trigger election after 1 failure, got: {:?}",
848            events
849        );
850
851        // Since FailureDetector only emits NodeFailed once per peer
852        // (stays in failed state), subsequent ticks won't increment.
853        // This verifies the threshold behaviour: a single detection with
854        // max_consecutive_failures=3 does NOT trigger an election.
855        assert!(!coord.is_election_pending());
856    }
857
858    #[test]
859    fn test_set_new_leader_resets_state() {
860        let mut coord =
861            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
862        coord.track_peer(2).expect("track peer 2");
863        coord.track_peer(3).expect("track peer 3");
864        coord.set_leader(2);
865
866        // Let leader timeout and schedule election
867        thread::sleep(Duration::from_millis(50));
868        let _ = coord.tick().expect("tick");
869        assert!(coord.is_election_pending());
870
871        // New leader elected: resets everything
872        coord.set_leader(3);
873        assert!(!coord.is_election_pending());
874        assert!(!coord.is_election_fired());
875        assert_eq!(coord.leader_hint(), Some(3));
876    }
877
878    #[test]
879    fn test_reset_clears_all() {
880        let mut coord =
881            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
882        coord.track_peer(2).expect("track peer 2");
883        coord.set_leader(2);
884
885        thread::sleep(Duration::from_millis(50));
886        let _ = coord.tick().expect("tick");
887
888        coord.reset();
889        assert!(!coord.is_election_pending());
890        assert!(!coord.is_election_fired());
891        assert!(coord.failed_peers().is_empty());
892    }
893
894    #[test]
895    fn test_remove_leader_peer_clears_leader() {
896        let mut coord =
897            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
898        coord.track_peer(2).expect("track peer 2");
899        coord.set_leader(2);
900        assert_eq!(coord.leader_hint(), Some(2));
901
902        coord.remove_peer(2);
903        assert_eq!(coord.leader_hint(), None);
904    }
905
906    #[test]
907    fn test_debug_impl() {
908        let coord =
909            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
910        let dbg = format!("{:?}", coord);
911        assert!(dbg.contains("FailoverCoordinator"));
912        assert!(dbg.contains("self_id"));
913    }
914
915    // ── FailoverController tests ───────────────────────────────────────────────
916
917    /// After calling `set_last_seen` with a timestamp far in the past,
918    /// `detect_failed_nodes` must report that node as failed.
919    #[test]
920    fn test_failover_controller_detects_timeout() {
921        let timeout = Duration::from_millis(100);
922        let controller = FailoverController::new(timeout);
923
924        // Prime last_seen with a timestamp well before the timeout.
925        let old_instant = Instant::now() - Duration::from_millis(500);
926        controller.set_last_seen(42, old_instant);
927
928        let failed = controller.detect_failed_nodes();
929        assert!(
930            failed.contains(&42),
931            "node 42 should be detected as failed; got {:?}",
932            failed
933        );
934        assert!(controller.is_failed(42), "is_failed(42) should return true");
935    }
936
937    /// A node that is first failed, then recovered, must not appear in the
938    /// failed list.
939    #[test]
940    fn test_failover_controller_recovered_node() {
941        let controller = FailoverController::new(Duration::from_millis(100));
942
943        controller.mark_failed(7);
944        assert!(controller.is_failed(7));
945
946        controller.mark_recovered(7);
947        assert!(
948            !controller.is_failed(7),
949            "node 7 should no longer be failed"
950        );
951        assert!(
952            !controller.failed_nodes().contains(&7),
953            "failed_nodes() must not include recovered node 7"
954        );
955    }
956
957    /// record_heartbeat after a failure should clear the failed status.
958    #[test]
959    fn test_failover_controller_heartbeat_clears_failure() {
960        let controller = FailoverController::new(Duration::from_millis(100));
961        controller.mark_failed(3);
962        assert!(controller.is_failed(3));
963        controller.record_heartbeat(3);
964        assert!(!controller.is_failed(3));
965    }
966
967    // ── AlertManager tests ─────────────────────────────────────────────────────
968
969    /// Emitting one event to two registered callbacks must invoke both.
970    #[test]
971    fn test_alert_manager_emits_to_all_callbacks() {
972        use std::sync::atomic::{AtomicUsize, Ordering};
973
974        let manager = AlertManager::new();
975        let count = Arc::new(AtomicUsize::new(0));
976
977        let c1 = Arc::clone(&count);
978        manager.register(Arc::new(move |_evt: AlertEvent| {
979            c1.fetch_add(1, Ordering::Relaxed);
980        }));
981        let c2 = Arc::clone(&count);
982        manager.register(Arc::new(move |_evt: AlertEvent| {
983            c2.fetch_add(1, Ordering::Relaxed);
984        }));
985
986        manager.emit(AlertEvent::NodeFailed { node_id: 5 });
987
988        assert_eq!(
989            count.load(Ordering::Relaxed),
990            2,
991            "both callbacks should have been invoked"
992        );
993    }
994
995    /// Registering and emitting from multiple threads must not panic or
996    /// deadlock.
997    #[test]
998    fn test_alert_manager_thread_safe() {
999        use std::sync::atomic::{AtomicUsize, Ordering};
1000        use std::thread;
1001
1002        let manager = Arc::new(AlertManager::new());
1003        let received = Arc::new(AtomicUsize::new(0));
1004
1005        // Register from multiple threads.
1006        let mut handles = Vec::new();
1007        for _ in 0..4 {
1008            let mgr = Arc::clone(&manager);
1009            let recv = Arc::clone(&received);
1010            handles.push(thread::spawn(move || {
1011                mgr.register(Arc::new(move |_evt: AlertEvent| {
1012                    recv.fetch_add(1, Ordering::Relaxed);
1013                }));
1014            }));
1015        }
1016        for h in handles {
1017            h.join().expect("register thread must not panic");
1018        }
1019
1020        // Emit from multiple threads simultaneously.
1021        let mut emit_handles = Vec::new();
1022        for _ in 0..4 {
1023            let mgr = Arc::clone(&manager);
1024            emit_handles.push(thread::spawn(move || {
1025                mgr.emit(AlertEvent::NodeFailed { node_id: 1 });
1026            }));
1027        }
1028        for h in emit_handles {
1029            h.join().expect("emit thread must not panic");
1030        }
1031
1032        // 4 emits × 4 callbacks = 16 total invocations.
1033        let total = received.load(Ordering::Relaxed);
1034        assert_eq!(total, 16, "expected 16 invocations, got {}", total);
1035    }
1036
1037    /// AlertEvent::LeaderChanged carries both old and new leader IDs.
1038    #[test]
1039    fn test_alert_manager_leader_changed_event() {
1040        let manager = AlertManager::new();
1041        let events: Arc<Mutex<Vec<AlertEvent>>> = Arc::new(Mutex::new(Vec::new()));
1042
1043        let ev = Arc::clone(&events);
1044        manager.register(Arc::new(move |e: AlertEvent| {
1045            ev.lock().unwrap_or_else(|e| e.into_inner()).push(e);
1046        }));
1047
1048        manager.emit(AlertEvent::LeaderChanged {
1049            old_leader: Some(1),
1050            new_leader: 2,
1051        });
1052
1053        let guard = events.lock().unwrap_or_else(|e| e.into_inner());
1054        assert_eq!(guard.len(), 1);
1055        assert!(matches!(
1056            guard[0],
1057            AlertEvent::LeaderChanged {
1058                old_leader: Some(1),
1059                new_leader: 2,
1060            }
1061        ));
1062    }
1063
1064    /// After leader loss (set to None), should_redirect returns false because no
1065    /// leader hint is known. Once a new leader is elected and set, should_redirect
1066    /// returns true for non-leader nodes and false for the leader itself.
1067    #[test]
1068    fn test_failover_redirects_after_leader_loss() {
1069        let mut coord =
1070            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
1071
1072        // No leader known yet — should not redirect (unknown destination)
1073        assert!(
1074            !coord.should_redirect(1),
1075            "no redirect when leader is unknown"
1076        );
1077        assert!(
1078            !coord.should_redirect(2),
1079            "no redirect when leader is unknown"
1080        );
1081
1082        // Set node 2 as leader
1083        coord.set_leader(2);
1084        // Node 1 (self) should redirect to node 2
1085        assert!(
1086            coord.should_redirect(1),
1087            "node 1 should redirect when leader is node 2"
1088        );
1089        // Node 2 (the leader) should not redirect to itself
1090        assert!(
1091            !coord.should_redirect(2),
1092            "node 2 should not redirect when it is the leader"
1093        );
1094
1095        // Simulate leader loss
1096        coord.clear_leader();
1097        // After loss, no redirect (leader unknown — election in progress)
1098        assert!(
1099            !coord.should_redirect(1),
1100            "no redirect when leader just lost (election pending)"
1101        );
1102
1103        // New leader (node 3) elected after recovery
1104        coord.set_leader(3);
1105        assert!(
1106            coord.should_redirect(1),
1107            "node 1 should redirect to new leader node 3"
1108        );
1109        assert!(
1110            coord.should_redirect(2),
1111            "node 2 should redirect to new leader node 3"
1112        );
1113        assert!(
1114            !coord.should_redirect(3),
1115            "node 3 should not redirect to itself"
1116        );
1117    }
1118
1119    /// A follower failure (non-leader peer) must not change the redirect behaviour;
1120    /// the leader_hint should remain pointing to the known leader.
1121    #[test]
1122    fn test_failover_no_redirect_on_follower_loss() {
1123        let mut coord =
1124            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
1125        coord.track_peer(2).expect("track peer 2");
1126        coord.track_peer(3).expect("track peer 3");
1127        // Node 2 is the leader; node 3 is a follower
1128        coord.set_leader(2);
1129
1130        // Node 3 (follower) times out — keep leader heartbeat alive
1131        thread::sleep(Duration::from_millis(50));
1132        coord.record_heartbeat(2).expect("leader heartbeat");
1133        let events = coord.tick().expect("tick");
1134
1135        // Node 3 should be reported as failed
1136        let peer_failed = events
1137            .iter()
1138            .any(|e| matches!(e, FailoverEvent::PeerFailed { node_id: 3 }));
1139        assert!(peer_failed, "Expected PeerFailed for node 3");
1140
1141        // The leader hint must still point to node 2
1142        assert_eq!(
1143            coord.leader_hint(),
1144            Some(2),
1145            "leader hint should still be node 2 after follower loss"
1146        );
1147
1148        // Redirect logic: node 1 should still redirect to node 2
1149        assert!(
1150            coord.should_redirect(1),
1151            "node 1 should still redirect to leader 2 after follower 3 fails"
1152        );
1153        // No election should have been triggered by the follower failure
1154        assert!(
1155            !coord.is_election_pending(),
1156            "election must not be triggered by non-leader failure"
1157        );
1158    }
1159}