1use std::collections::hash_map::RandomState;
13use std::hash::BuildHasher;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use tracing::{debug, info, warn};
18
19use crate::error::{RaftError, RaftResult};
20use crate::heartbeat::FailureDetector;
21use crate::types::{FailureEvent, HeartbeatConfig, NodeId};
22
23#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum FailoverEvent {
28 LeaderLost {
31 old_leader: NodeId,
33 election_triggered: bool,
35 },
36 LeaderElected {
38 new_leader: NodeId,
40 },
41 FailoverTimeout,
43 PeerFailed {
45 node_id: NodeId,
47 },
48 PeerRecovered {
50 node_id: NodeId,
52 },
53}
54
/// Tuning knobs for leader failover.
///
/// [`FailoverConfig::new`] does not check its arguments; call
/// [`FailoverConfig::validate`] before relying on a config built from
/// untrusted values.
#[derive(Debug, Clone)]
pub struct FailoverConfig {
    /// Inclusive lower bound, in milliseconds, of the randomized election delay.
    pub election_jitter_min_ms: u64,
    /// Exclusive upper bound, in milliseconds, of the randomized election delay.
    pub election_jitter_max_ms: u64,
    /// How many failure reports for the current leader are needed before an
    /// election timer is scheduled.
    pub max_consecutive_failures: u32,
}

impl FailoverConfig {
    /// Builds a config from raw values without validating them.
    pub fn new(
        election_jitter_min_ms: u64,
        election_jitter_max_ms: u64,
        max_consecutive_failures: u32,
    ) -> Self {
        Self {
            election_jitter_min_ms,
            election_jitter_max_ms,
            max_consecutive_failures,
        }
    }

    /// Checks that the config is usable.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the minimum jitter is zero, when
    /// the maximum jitter is not strictly greater than the minimum, or when the
    /// failure threshold is zero.
    pub fn validate(&self) -> Result<(), String> {
        if self.election_jitter_min_ms == 0 {
            return Err("election_jitter_min_ms must be > 0".to_string());
        }
        if self.election_jitter_max_ms <= self.election_jitter_min_ms {
            return Err(format!(
                "election_jitter_max_ms ({}) must be > election_jitter_min_ms ({})",
                self.election_jitter_max_ms, self.election_jitter_min_ms,
            ));
        }
        if self.max_consecutive_failures == 0 {
            return Err("max_consecutive_failures must be > 0".to_string());
        }
        Ok(())
    }

    /// Picks a pseudo-random election delay in `[min, max)` milliseconds.
    ///
    /// Configs that [`validate`](Self::validate) would reject because
    /// `max <= min` do not panic here: the delay collapses to exactly `min`.
    fn random_jitter(&self) -> Duration {
        let min = self.election_jitter_min_ms;
        // `saturating_sub` keeps an inverted range (max < min) from underflowing,
        // and the empty-range check avoids a `% 0` panic below.
        let range = self.election_jitter_max_ms.saturating_sub(min);
        if range == 0 {
            return Duration::from_millis(min);
        }
        // `RandomState::new()` is initialized with random keys; hashing the
        // current time further varies the value between calls.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let random_value = RandomState::new().hash_one(now);
        // min + (x % range) < max <= u64::MAX, so the addition cannot overflow.
        Duration::from_millis(min + random_value % range)
    }
}

impl Default for FailoverConfig {
    /// 150–300 ms election jitter and a threshold of 3 leader failure reports.
    fn default() -> Self {
        Self {
            election_jitter_min_ms: 150,
            election_jitter_max_ms: 300,
            max_consecutive_failures: 3,
        }
    }
}
122
/// State of the randomized election delay owned by a `FailoverCoordinator`.
///
/// Transitions: `Idle` -> `Pending` when an election is scheduled, `Pending`
/// -> `Fired` once the jitter has elapsed during a tick, and back to `Idle`
/// when the leader changes, recovers, or state is reset.
#[derive(Debug)]
enum ElectionTimer {
    /// Nothing is scheduled.
    Idle,
    /// An election fires once `jitter` has elapsed since `started_at`.
    Pending {
        /// When the timer was armed.
        started_at: Instant,
        /// Randomized delay to wait before firing.
        jitter: Duration,
    },
    /// The delay elapsed and a timeout was reported.
    Fired {
        /// When the timer expired.
        fired_at: Instant,
    },
}
145
146pub struct FailoverCoordinator {
168 detector: FailureDetector,
170 config: FailoverConfig,
172 self_id: NodeId,
174 current_leader: Option<NodeId>,
176 election_timer: ElectionTimer,
178 leader_failure_count: u32,
180}
181
182impl FailoverCoordinator {
183 pub fn new(
185 heartbeat_config: HeartbeatConfig,
186 failover_config: FailoverConfig,
187 self_id: NodeId,
188 ) -> Self {
189 Self {
190 detector: FailureDetector::new(heartbeat_config, self_id),
191 config: failover_config,
192 self_id,
193 current_leader: None,
194 election_timer: ElectionTimer::Idle,
195 leader_failure_count: 0,
196 }
197 }
198
199 pub fn track_peer(&mut self, peer_id: NodeId) -> RaftResult<()> {
203 self.detector.track_peer(peer_id)
204 }
205
206 pub fn remove_peer(&mut self, peer_id: NodeId) {
208 self.detector.remove_peer(peer_id);
209 if self.current_leader == Some(peer_id) {
210 self.current_leader = None;
211 }
212 }
213
214 pub fn record_heartbeat(&mut self, peer_id: NodeId) -> RaftResult<()> {
216 self.detector.record_heartbeat(peer_id)
217 }
218
219 pub fn set_leader(&mut self, leader_id: NodeId) {
223 let changed = self.current_leader != Some(leader_id);
224 self.current_leader = Some(leader_id);
225 if changed {
226 self.leader_failure_count = 0;
227 self.election_timer = ElectionTimer::Idle;
228 debug!(
229 self_id = self.self_id,
230 leader_id = leader_id,
231 "FailoverCoordinator: leader updated"
232 );
233 }
234 }
235
236 pub fn clear_leader(&mut self) {
238 self.current_leader = None;
239 self.leader_failure_count = 0;
240 self.election_timer = ElectionTimer::Idle;
241 }
242
243 pub fn leader_hint(&self) -> Option<NodeId> {
245 self.current_leader
246 }
247
248 pub fn should_redirect(&self, my_id: NodeId) -> bool {
254 match self.current_leader {
255 Some(leader) => leader != my_id,
256 None => false,
257 }
258 }
259
260 pub fn tick(&mut self) -> RaftResult<Vec<FailoverEvent>> {
268 let failure_events = self.detector.check_timeouts()?;
269 let mut out = Vec::new();
270
271 for fe in &failure_events {
272 match fe {
273 FailureEvent::NodeFailed { node_id, .. } => {
274 if Some(*node_id) == self.current_leader {
275 self.leader_failure_count = self.leader_failure_count.saturating_add(1);
276 let should_trigger =
277 self.leader_failure_count >= self.config.max_consecutive_failures;
278
279 if should_trigger {
280 self.schedule_election();
281 }
282
283 info!(
284 self_id = self.self_id,
285 leader = node_id,
286 failure_count = self.leader_failure_count,
287 triggered = should_trigger,
288 "Leader failure detected"
289 );
290
291 out.push(FailoverEvent::LeaderLost {
292 old_leader: *node_id,
293 election_triggered: should_trigger,
294 });
295 } else {
296 out.push(FailoverEvent::PeerFailed { node_id: *node_id });
297 }
298 }
299 FailureEvent::NodeRecovered { node_id } => {
300 if Some(*node_id) == self.current_leader {
301 self.leader_failure_count = 0;
303 self.election_timer = ElectionTimer::Idle;
304 debug!(
305 self_id = self.self_id,
306 leader = node_id,
307 "Leader recovered, election timer cancelled"
308 );
309 }
310 out.push(FailoverEvent::PeerRecovered { node_id: *node_id });
311 }
312 }
313 }
314
315 match &self.election_timer {
317 ElectionTimer::Pending { started_at, jitter } => {
318 if started_at.elapsed() >= *jitter {
319 info!(
320 self_id = self.self_id,
321 jitter_ms = jitter.as_millis() as u64,
322 "Election jitter expired, triggering failover"
323 );
324 self.election_timer = ElectionTimer::Fired {
325 fired_at: Instant::now(),
326 };
327 out.push(FailoverEvent::FailoverTimeout);
328 }
329 }
330 ElectionTimer::Fired { .. } | ElectionTimer::Idle => {}
331 }
332
333 Ok(out)
334 }
335
336 pub fn reset(&mut self) {
338 self.detector.reset_all();
339 self.leader_failure_count = 0;
340 self.election_timer = ElectionTimer::Idle;
341 }
342
343 pub fn failed_peers(&self) -> Vec<NodeId> {
345 self.detector.failed_peers()
346 }
347
348 pub fn alive_peers(&self) -> Vec<NodeId> {
350 self.detector.alive_peers()
351 }
352
353 pub fn peer_count(&self) -> usize {
355 self.detector.peer_count()
356 }
357
358 pub fn is_election_pending(&self) -> bool {
360 matches!(self.election_timer, ElectionTimer::Pending { .. })
361 }
362
363 pub fn is_election_fired(&self) -> bool {
365 matches!(self.election_timer, ElectionTimer::Fired { .. })
366 }
367
368 fn schedule_election(&mut self) {
371 if matches!(
372 self.election_timer,
373 ElectionTimer::Pending { .. } | ElectionTimer::Fired { .. }
374 ) {
375 return;
377 }
378 let jitter = self.config.random_jitter();
379 debug!(
380 self_id = self.self_id,
381 jitter_ms = jitter.as_millis() as u64,
382 "Scheduling election with jitter"
383 );
384 self.election_timer = ElectionTimer::Pending {
385 started_at: Instant::now(),
386 jitter,
387 };
388 }
389}
390
391impl std::fmt::Debug for FailoverCoordinator {
392 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393 f.debug_struct("FailoverCoordinator")
394 .field("self_id", &self.self_id)
395 .field("current_leader", &self.current_leader)
396 .field("leader_failure_count", &self.leader_failure_count)
397 .field("peer_count", &self.detector.peer_count())
398 .finish()
399 }
400}
401
402#[derive(Debug, Clone)]
406pub enum AlertEvent {
407 NodeFailed { node_id: NodeId },
409 NodeRecovered { node_id: NodeId },
411 LeaderChanged {
413 old_leader: Option<NodeId>,
414 new_leader: NodeId,
415 },
416 QuorumLost {
418 cluster_size: usize,
419 reachable: usize,
420 },
421 SlowReplication { follower: NodeId, lag_entries: u64 },
423}
424
425pub type AlertCallback = Arc<dyn Fn(AlertEvent) + Send + Sync>;
429
430pub struct AlertManager {
436 callbacks: Mutex<Vec<AlertCallback>>,
437}
438
439impl AlertManager {
440 pub fn new() -> Self {
442 Self {
443 callbacks: Mutex::new(Vec::new()),
444 }
445 }
446
447 pub fn register(&self, callback: AlertCallback) {
452 self.callbacks
453 .lock()
454 .unwrap_or_else(|e| e.into_inner())
455 .push(callback);
456 }
457
458 pub fn emit(&self, event: AlertEvent) {
463 let guard = self.callbacks.lock().unwrap_or_else(|e| e.into_inner());
464 for cb in guard.iter() {
465 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
468 cb(event.clone());
469 }));
470 }
471 }
472}
473
474impl Default for AlertManager {
475 fn default() -> Self {
476 Self::new()
477 }
478}
479
480impl std::fmt::Debug for AlertManager {
481 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
482 let count = self.callbacks.lock().map(|g| g.len()).unwrap_or(0);
483 f.debug_struct("AlertManager")
484 .field("callback_count", &count)
485 .finish()
486 }
487}
488
489pub struct FailoverController {
508 heartbeat_timeout: Duration,
509 last_seen: Mutex<std::collections::HashMap<NodeId, Instant>>,
510 failed_nodes: dashmap::DashSet<NodeId>,
511}
512
513impl FailoverController {
514 pub fn new(heartbeat_timeout: Duration) -> Self {
519 Self {
520 heartbeat_timeout,
521 last_seen: Mutex::new(std::collections::HashMap::new()),
522 failed_nodes: dashmap::DashSet::new(),
523 }
524 }
525
526 pub fn record_heartbeat(&self, node_id: NodeId) {
531 self.last_seen
532 .lock()
533 .unwrap_or_else(|e| e.into_inner())
534 .insert(node_id, Instant::now());
535 self.failed_nodes.remove(&node_id);
536 }
537
538 pub fn detect_failed_nodes(&self) -> Vec<NodeId> {
544 let now = Instant::now();
545 let guard = self.last_seen.lock().unwrap_or_else(|e| e.into_inner());
546 let mut failed = Vec::new();
547 for (&node_id, &last) in guard.iter() {
548 if now.duration_since(last) >= self.heartbeat_timeout {
549 self.failed_nodes.insert(node_id);
550 failed.push(node_id);
551 }
552 }
553 failed
554 }
555
556 pub fn mark_failed(&self, node_id: NodeId) {
558 self.failed_nodes.insert(node_id);
559 }
560
561 pub fn mark_recovered(&self, node_id: NodeId) {
567 self.failed_nodes.remove(&node_id);
568 }
569
570 pub fn is_failed(&self, node_id: NodeId) -> bool {
572 self.failed_nodes.contains(&node_id)
573 }
574
575 pub fn failed_nodes(&self) -> Vec<NodeId> {
577 self.failed_nodes.iter().map(|r| *r).collect()
578 }
579
580 #[cfg(test)]
586 pub fn set_last_seen(&self, node_id: NodeId, instant: Instant) {
587 self.last_seen
588 .lock()
589 .unwrap_or_else(|e| e.into_inner())
590 .insert(node_id, instant);
591 }
592}
593
594impl std::fmt::Debug for FailoverController {
595 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596 let known = self.last_seen.lock().map(|g| g.len()).unwrap_or(0);
597 f.debug_struct("FailoverController")
598 .field("heartbeat_timeout", &self.heartbeat_timeout)
599 .field("known_nodes", &known)
600 .field("failed_count", &self.failed_nodes.len())
601 .finish()
602 }
603}
604
// Unit tests for the failover module. Several coordinator tests are
// timing-based: they sleep past the "fast" heartbeat settings so tracked peers
// time out, then call `tick`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    // Heartbeat settings small enough that the 50 ms sleeps below let tracked
    // peers be reported as failed.
    // NOTE(review): the meaning of the three arguments is defined by
    // `HeartbeatConfig::new` elsewhere (presumably interval_ms, timeout_ms,
    // and a miss threshold) — confirm.
    fn fast_heartbeat_config() -> HeartbeatConfig {
        HeartbeatConfig::new(10, 30, 1)
    }

    // 10–30 ms election jitter; a single leader failure report schedules an election.
    fn fast_failover_config() -> FailoverConfig {
        FailoverConfig {
            election_jitter_min_ms: 10,
            election_jitter_max_ms: 30,
            max_consecutive_failures: 1,
        }
    }

    #[test]
    fn test_failover_config_default() {
        let cfg = FailoverConfig::default();
        assert_eq!(cfg.election_jitter_min_ms, 150);
        assert_eq!(cfg.election_jitter_max_ms, 300);
        assert_eq!(cfg.max_consecutive_failures, 3);
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn test_failover_config_validation() {
        // Zero minimum jitter.
        let bad1 = FailoverConfig::new(0, 300, 3);
        assert!(bad1.validate().is_err());

        // Inverted jitter window.
        let bad2 = FailoverConfig::new(300, 150, 3);
        assert!(bad2.validate().is_err());

        // Zero failure threshold.
        let bad3 = FailoverConfig::new(150, 300, 0);
        assert!(bad3.validate().is_err());

        // Empty jitter window (max == min).
        let bad4 = FailoverConfig::new(150, 150, 3);
        assert!(bad4.validate().is_err());
    }

    #[test]
    fn test_failover_config_jitter_in_range() {
        let cfg = FailoverConfig::new(100, 200, 3);
        // Jitter must stay within [min, max) on every draw.
        for _ in 0..20 {
            let jitter = cfg.random_jitter();
            assert!(jitter.as_millis() >= 100, "jitter too low: {:?}", jitter);
            assert!(jitter.as_millis() < 200, "jitter too high: {:?}", jitter);
        }
    }

    #[test]
    fn test_coordinator_creation() {
        let coord =
            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
        assert_eq!(coord.leader_hint(), None);
        assert_eq!(coord.peer_count(), 0);
        assert!(!coord.is_election_pending());
    }

    #[test]
    fn test_leader_hint_tracking() {
        let mut coord =
            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
        assert_eq!(coord.leader_hint(), None);

        coord.set_leader(2);
        assert_eq!(coord.leader_hint(), Some(2));

        coord.set_leader(3);
        assert_eq!(coord.leader_hint(), Some(3));

        coord.clear_leader();
        assert_eq!(coord.leader_hint(), None);
    }

    #[test]
    fn test_leader_failure_triggers_election() {
        let mut coord =
            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
        coord.track_peer(2).expect("track peer 2");
        coord.track_peer(3).expect("track peer 3");
        coord.set_leader(2);

        // No heartbeats: let every tracked peer, including leader 2, time out.
        thread::sleep(Duration::from_millis(50));

        let events = coord.tick().expect("tick");
        let leader_lost = events.iter().any(|e| {
            matches!(
                e,
                FailoverEvent::LeaderLost {
                    old_leader: 2,
                    election_triggered: true,
                }
            )
        });
        assert!(leader_lost, "Expected LeaderLost event, got: {:?}", events);
        // Threshold is 1, so the first leader failure arms the timer.
        assert!(coord.is_election_pending());
    }

    #[test]
    fn test_election_timer_fires_after_jitter() {
        let mut coord =
            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
        coord.track_peer(2).expect("track peer 2");
        coord.set_leader(2);

        // First tick: leader times out and the election timer is armed.
        thread::sleep(Duration::from_millis(50));
        let _ = coord.tick().expect("tick 1");

        // Sleep past the 30 ms maximum jitter so the second tick fires the timer.
        thread::sleep(Duration::from_millis(50));
        let events = coord.tick().expect("tick 2");

        let timeout_fired = events
            .iter()
            .any(|e| matches!(e, FailoverEvent::FailoverTimeout));
        assert!(
            timeout_fired,
            "Expected FailoverTimeout event, got: {:?}",
            events
        );
        assert!(coord.is_election_fired());
    }

    #[test]
    fn test_leader_recovery_cancels_election() {
        let mut coord =
            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
        coord.track_peer(2).expect("track peer 2");
        coord.set_leader(2);

        // Leader times out; election becomes pending.
        thread::sleep(Duration::from_millis(50));
        let _ = coord.tick().expect("tick");
        assert!(coord.is_election_pending());

        // A fresh leader heartbeat should surface as a recovery on the next tick.
        coord.record_heartbeat(2).expect("record heartbeat");
        let events = coord.tick().expect("tick after recovery");

        let recovered = events
            .iter()
            .any(|e| matches!(e, FailoverEvent::PeerRecovered { node_id: 2 }));
        assert!(recovered, "Expected PeerRecovered, got: {:?}", events);

        // Leader recovery returns the timer to idle.
        assert!(!coord.is_election_pending());
        assert!(!coord.is_election_fired());
    }

    #[test]
    fn test_non_leader_failure_emits_peer_failed() {
        let mut coord =
            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
        coord.track_peer(2).expect("track peer 2");
        coord.track_peer(3).expect("track peer 3");
        coord.set_leader(2);

        // Let both peers go stale, then refresh only the leader so that just
        // follower 3 is reported failed.
        thread::sleep(Duration::from_millis(50));
        coord.record_heartbeat(2).expect("leader heartbeat refresh");

        let events = coord.tick().expect("tick");
        let peer_failed = events
            .iter()
            .any(|e| matches!(e, FailoverEvent::PeerFailed { node_id: 3 }));
        assert!(peer_failed, "Expected PeerFailed for 3, got: {:?}", events);
        assert!(
            !coord.is_election_pending(),
            "Non-leader failure should not trigger election"
        );
    }

    // NOTE(review): despite its name, this test only checks that both
    // coordinators arm a timer. It does not assert that their jitters differ
    // (which would be inherently probabilistic).
    #[test]
    fn test_jitter_prevents_simultaneous_elections() {
        let hb = fast_heartbeat_config();
        // Wider jitter window than the fast config.
        let fo = FailoverConfig {
            election_jitter_min_ms: 50,
            election_jitter_max_ms: 200,
            max_consecutive_failures: 1,
        };

        let mut c1 = FailoverCoordinator::new(hb.clone(), fo.clone(), 1);
        let mut c2 = FailoverCoordinator::new(hb.clone(), fo.clone(), 3);

        c1.track_peer(2).expect("c1 track 2");
        c1.track_peer(3).expect("c1 track 3");
        c1.set_leader(2);

        c2.track_peer(1).expect("c2 track 1");
        c2.track_peer(2).expect("c2 track 2");
        c2.set_leader(2);

        // Both observers see leader 2 time out.
        thread::sleep(Duration::from_millis(50));
        let _ = c1.tick().expect("c1 tick");
        let _ = c2.tick().expect("c2 tick");

        assert!(c1.is_election_pending());
        assert!(c2.is_election_pending());
    }

    #[test]
    fn test_max_consecutive_failures_threshold() {
        // Threshold of 3: a single leader failure report must not schedule an election.
        let mut coord = FailoverCoordinator::new(
            fast_heartbeat_config(),
            FailoverConfig {
                election_jitter_min_ms: 10,
                election_jitter_max_ms: 30,
                max_consecutive_failures: 3,
            },
            1,
        );
        coord.track_peer(2).expect("track peer 2");
        coord.set_leader(2);

        thread::sleep(Duration::from_millis(50));
        let events = coord.tick().expect("tick 1");
        let triggered = events.iter().any(|e| {
            matches!(
                e,
                FailoverEvent::LeaderLost {
                    election_triggered: true,
                    ..
                }
            )
        });
        assert!(
            !triggered,
            "Should not trigger election after 1 failure, got: {:?}",
            events
        );

        assert!(!coord.is_election_pending());
    }

    #[test]
    fn test_set_new_leader_resets_state() {
        let mut coord =
            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
        coord.track_peer(2).expect("track peer 2");
        coord.track_peer(3).expect("track peer 3");
        coord.set_leader(2);

        thread::sleep(Duration::from_millis(50));
        let _ = coord.tick().expect("tick");
        assert!(coord.is_election_pending());

        // Switching to a different leader cancels the pending election.
        coord.set_leader(3);
        assert!(!coord.is_election_pending());
        assert!(!coord.is_election_fired());
        assert_eq!(coord.leader_hint(), Some(3));
    }

    #[test]
    fn test_reset_clears_all() {
        let mut coord =
            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
        coord.track_peer(2).expect("track peer 2");
        coord.set_leader(2);

        thread::sleep(Duration::from_millis(50));
        let _ = coord.tick().expect("tick");

        // `reset` clears detector failures and election state.
        coord.reset();
        assert!(!coord.is_election_pending());
        assert!(!coord.is_election_fired());
        assert!(coord.failed_peers().is_empty());
    }

    #[test]
    fn test_remove_leader_peer_clears_leader() {
        let mut coord =
            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
        coord.track_peer(2).expect("track peer 2");
        coord.set_leader(2);
        assert_eq!(coord.leader_hint(), Some(2));

        coord.remove_peer(2);
        assert_eq!(coord.leader_hint(), None);
    }

    #[test]
    fn test_debug_impl() {
        let coord =
            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);
        let dbg = format!("{:?}", coord);
        assert!(dbg.contains("FailoverCoordinator"));
        assert!(dbg.contains("self_id"));
    }

    // A last-heartbeat time older than the timeout must be detected and marked failed.
    #[test]
    fn test_failover_controller_detects_timeout() {
        let timeout = Duration::from_millis(100);
        let controller = FailoverController::new(timeout);

        // NOTE(review): `Instant - Duration` panics if the result would precede
        // the platform clock's origin; `checked_sub` would be more defensive.
        let old_instant = Instant::now() - Duration::from_millis(500);
        controller.set_last_seen(42, old_instant);

        let failed = controller.detect_failed_nodes();
        assert!(
            failed.contains(&42),
            "node 42 should be detected as failed; got {:?}",
            failed
        );
        assert!(controller.is_failed(42), "is_failed(42) should return true");
    }

    // A manual failure mark can be cleared again and is then absent from the
    // failed-node snapshot.
    #[test]
    fn test_failover_controller_recovered_node() {
        let controller = FailoverController::new(Duration::from_millis(100));

        controller.mark_failed(7);
        assert!(controller.is_failed(7));

        controller.mark_recovered(7);
        assert!(
            !controller.is_failed(7),
            "node 7 should no longer be failed"
        );
        assert!(
            !controller.failed_nodes().contains(&7),
            "failed_nodes() must not include recovered node 7"
        );
    }

    // Recording a heartbeat clears a failure mark.
    #[test]
    fn test_failover_controller_heartbeat_clears_failure() {
        let controller = FailoverController::new(Duration::from_millis(100));
        controller.mark_failed(3);
        assert!(controller.is_failed(3));
        controller.record_heartbeat(3);
        assert!(!controller.is_failed(3));
    }

    // Every registered callback receives each emitted alert exactly once.
    #[test]
    fn test_alert_manager_emits_to_all_callbacks() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let manager = AlertManager::new();
        let count = Arc::new(AtomicUsize::new(0));

        let c1 = Arc::clone(&count);
        manager.register(Arc::new(move |_evt: AlertEvent| {
            c1.fetch_add(1, Ordering::Relaxed);
        }));
        let c2 = Arc::clone(&count);
        manager.register(Arc::new(move |_evt: AlertEvent| {
            c2.fetch_add(1, Ordering::Relaxed);
        }));

        manager.emit(AlertEvent::NodeFailed { node_id: 5 });

        assert_eq!(
            count.load(Ordering::Relaxed),
            2,
            "both callbacks should have been invoked"
        );
    }

    // Concurrent registration from 4 threads, then concurrent emission from 4
    // threads: 4 callbacks x 4 emits = 16 invocations.
    #[test]
    fn test_alert_manager_thread_safe() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::thread;

        let manager = Arc::new(AlertManager::new());
        let received = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        // Register one counting callback per thread.
        for _ in 0..4 {
            let mgr = Arc::clone(&manager);
            let recv = Arc::clone(&received);
            handles.push(thread::spawn(move || {
                mgr.register(Arc::new(move |_evt: AlertEvent| {
                    recv.fetch_add(1, Ordering::Relaxed);
                }));
            }));
        }
        for h in handles {
            h.join().expect("register thread must not panic");
        }

        let mut emit_handles = Vec::new();
        // Emit one alert per thread; each reaches all 4 callbacks.
        for _ in 0..4 {
            let mgr = Arc::clone(&manager);
            emit_handles.push(thread::spawn(move || {
                mgr.emit(AlertEvent::NodeFailed { node_id: 1 });
            }));
        }
        for h in emit_handles {
            h.join().expect("emit thread must not panic");
        }

        let total = received.load(Ordering::Relaxed);
        assert_eq!(total, 16, "expected 16 invocations, got {}", total);
    }

    // The event payload reaches callbacks intact.
    #[test]
    fn test_alert_manager_leader_changed_event() {
        let manager = AlertManager::new();
        let events: Arc<Mutex<Vec<AlertEvent>>> = Arc::new(Mutex::new(Vec::new()));

        let ev = Arc::clone(&events);
        manager.register(Arc::new(move |e: AlertEvent| {
            ev.lock().unwrap_or_else(|e| e.into_inner()).push(e);
        }));

        manager.emit(AlertEvent::LeaderChanged {
            old_leader: Some(1),
            new_leader: 2,
        });

        let guard = events.lock().unwrap_or_else(|e| e.into_inner());
        assert_eq!(guard.len(), 1);
        assert!(matches!(
            guard[0],
            AlertEvent::LeaderChanged {
                old_leader: Some(1),
                new_leader: 2,
            }
        ));
    }

    // `should_redirect` follows the leader hint through set/clear/set.
    #[test]
    fn test_failover_redirects_after_leader_loss() {
        let mut coord =
            FailoverCoordinator::new(HeartbeatConfig::default(), FailoverConfig::default(), 1);

        // No leader known yet: nobody redirects.
        assert!(
            !coord.should_redirect(1),
            "no redirect when leader is unknown"
        );
        assert!(
            !coord.should_redirect(2),
            "no redirect when leader is unknown"
        );

        coord.set_leader(2);
        // Non-leaders redirect to 2.
        assert!(
            coord.should_redirect(1),
            "node 1 should redirect when leader is node 2"
        );
        // The leader itself does not redirect.
        assert!(
            !coord.should_redirect(2),
            "node 2 should not redirect when it is the leader"
        );

        coord.clear_leader();
        // Leader unknown again.
        assert!(
            !coord.should_redirect(1),
            "no redirect when leader just lost (election pending)"
        );

        coord.set_leader(3);
        assert!(
            coord.should_redirect(1),
            "node 1 should redirect to new leader node 3"
        );
        assert!(
            coord.should_redirect(2),
            "node 2 should redirect to new leader node 3"
        );
        assert!(
            !coord.should_redirect(3),
            "node 3 should not redirect to itself"
        );
    }

    // A follower failure must not disturb the leader hint or redirection.
    #[test]
    fn test_failover_no_redirect_on_follower_loss() {
        let mut coord =
            FailoverCoordinator::new(fast_heartbeat_config(), fast_failover_config(), 1);
        coord.track_peer(2).expect("track peer 2");
        coord.track_peer(3).expect("track peer 3");
        coord.set_leader(2);

        // Let both peers go stale, then refresh only leader 2.
        thread::sleep(Duration::from_millis(50));
        coord.record_heartbeat(2).expect("leader heartbeat");
        let events = coord.tick().expect("tick");

        let peer_failed = events
            .iter()
            .any(|e| matches!(e, FailoverEvent::PeerFailed { node_id: 3 }));
        assert!(peer_failed, "Expected PeerFailed for node 3");

        assert_eq!(
            coord.leader_hint(),
            Some(2),
            "leader hint should still be node 2 after follower loss"
        );

        assert!(
            coord.should_redirect(1),
            "node 1 should still redirect to leader 2 after follower 3 fails"
        );
        assert!(
            !coord.is_election_pending(),
            "election must not be triggered by non-leader failure"
        );
    }
}