1use crate::connection_pool::{ConnectionFactory, PooledConnection};
7use anyhow::{anyhow, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{broadcast, RwLock};
13use tokio::time::{interval, sleep};
14use tracing::{error, info, warn};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct FailoverConfig {
19 pub health_check_interval: Duration,
21 pub health_check_timeout: Duration,
23 pub failure_threshold: u32,
25 pub recovery_threshold: u32,
27 pub failback_delay: Duration,
29 pub auto_failback: bool,
31 pub connection_timeout: Duration,
33 pub enable_notifications: bool,
35}
36
37impl Default for FailoverConfig {
38 fn default() -> Self {
39 Self {
40 health_check_interval: Duration::from_secs(10),
41 health_check_timeout: Duration::from_secs(5),
42 failure_threshold: 3,
43 recovery_threshold: 3,
44 failback_delay: Duration::from_secs(60),
45 auto_failback: true,
46 connection_timeout: Duration::from_secs(30),
47 enable_notifications: true,
48 }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
54pub enum FailoverState {
55 #[default]
57 Primary,
58 Secondary,
60 FailingOver,
62 FailingBack,
64 Unavailable,
66}
67
68#[derive(Debug, Clone)]
70pub enum FailoverEvent {
71 FailoverInitiated {
73 from: String,
74 to: String,
75 reason: String,
76 },
77 FailoverCompleted {
79 from: String,
80 to: String,
81 duration: Duration,
82 },
83 FailoverFailed {
85 from: String,
86 to: String,
87 error: String,
88 },
89 FailbackInitiated { from: String, to: String },
91 FailbackCompleted {
93 from: String,
94 to: String,
95 duration: Duration,
96 },
97 FailbackFailed {
99 from: String,
100 to: String,
101 error: String,
102 },
103 HealthCheckFailed {
105 connection: String,
106 consecutive_failures: u32,
107 },
108 ConnectionRecovered { connection: String },
110 AllConnectionsUnavailable,
112}
113
114#[derive(Debug, Clone, Default, Serialize, Deserialize)]
116pub struct FailoverStatistics {
117 pub total_failovers: u64,
118 pub successful_failovers: u64,
119 pub failed_failovers: u64,
120 pub total_failbacks: u64,
121 pub successful_failbacks: u64,
122 pub failed_failbacks: u64,
123 pub primary_uptime: Duration,
124 pub secondary_uptime: Duration,
125 #[serde(skip)]
126 pub last_failover: Option<Instant>,
127 #[serde(skip)]
128 pub last_failback: Option<Instant>,
129 #[serde(skip)]
130 pub last_failback_failure: Option<Instant>,
131 pub current_state: FailoverState,
132 #[serde(skip)]
133 pub state_changes: Vec<(Instant, FailoverState)>,
134}
135
136#[derive(Clone)]
138pub struct ConnectionEndpoint<T: PooledConnection + Clone> {
139 pub name: String,
140 pub factory: Arc<dyn ConnectionFactory<T>>,
141 pub priority: u32,
142 pub metadata: HashMap<String, String>,
143}
144
145impl<T: PooledConnection + Clone> std::fmt::Debug for ConnectionEndpoint<T> {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 f.debug_struct("ConnectionEndpoint")
148 .field("name", &self.name)
149 .field("factory", &"<ConnectionFactory>")
150 .field("priority", &self.priority)
151 .field("metadata", &self.metadata)
152 .finish()
153 }
154}
155
156pub struct FailoverManager<T: PooledConnection + Clone> {
158 config: FailoverConfig,
159 primary: ConnectionEndpoint<T>,
160 secondary: ConnectionEndpoint<T>,
161 current_connection: Arc<RwLock<Option<T>>>,
162 state: Arc<RwLock<FailoverState>>,
163 statistics: Arc<RwLock<FailoverStatistics>>,
164 event_sender: broadcast::Sender<FailoverEvent>,
165 health_status: Arc<RwLock<HealthStatusTracker>>,
166 shutdown_signal: Arc<RwLock<bool>>,
167}
168
169#[derive(Debug, Clone)]
171struct HealthStatusTracker {
172 primary_consecutive_failures: u32,
173 primary_consecutive_successes: u32,
174 secondary_consecutive_failures: u32,
175 secondary_consecutive_successes: u32,
176 primary_last_check: Option<Instant>,
177 secondary_last_check: Option<Instant>,
178 primary_healthy: bool,
179 secondary_healthy: bool,
180}
181
182impl Default for HealthStatusTracker {
183 fn default() -> Self {
184 Self {
185 primary_consecutive_failures: 0,
186 primary_consecutive_successes: 0,
187 secondary_consecutive_failures: 0,
188 secondary_consecutive_successes: 0,
189 primary_last_check: None,
190 secondary_last_check: None,
191 primary_healthy: true,
192 secondary_healthy: true,
193 }
194 }
195}
196
197impl<T: PooledConnection + Clone> FailoverManager<T> {
198 pub async fn new(
200 config: FailoverConfig,
201 primary: ConnectionEndpoint<T>,
202 secondary: ConnectionEndpoint<T>,
203 ) -> Result<Self> {
204 let (event_sender, _) = broadcast::channel(1000);
205
206 let initial_connection = match tokio::time::timeout(
208 config.connection_timeout,
209 primary.factory.create_connection(),
210 )
211 .await
212 {
213 Ok(Ok(conn)) => {
214 info!("Successfully connected to primary: {}", primary.name);
215 Some(conn)
216 }
217 _ => {
218 warn!("Failed to connect to primary, trying secondary");
219 match tokio::time::timeout(
220 config.connection_timeout,
221 secondary.factory.create_connection(),
222 )
223 .await
224 {
225 Ok(Ok(conn)) => {
226 info!("Successfully connected to secondary: {}", secondary.name);
227 Some(conn)
228 }
229 _ => {
230 error!("Failed to connect to both primary and secondary");
231 None
232 }
233 }
234 }
235 };
236
237 let initial_state = if initial_connection.is_some() {
238 FailoverState::Primary
239 } else {
240 FailoverState::Unavailable
241 };
242
243 let mut statistics = FailoverStatistics {
244 current_state: initial_state.clone(),
245 ..Default::default()
246 };
247 statistics
248 .state_changes
249 .push((Instant::now(), initial_state.clone()));
250
251 let manager = Self {
252 config,
253 primary,
254 secondary,
255 current_connection: Arc::new(RwLock::new(initial_connection)),
256 state: Arc::new(RwLock::new(initial_state)),
257 statistics: Arc::new(RwLock::new(statistics)),
258 event_sender,
259 health_status: Arc::new(RwLock::new(HealthStatusTracker::default())),
260 shutdown_signal: Arc::new(RwLock::new(false)),
261 };
262
263 manager.start_health_monitoring().await;
265
266 Ok(manager)
267 }
268
269 pub async fn get_connection(&self) -> Result<T> {
271 let state = self.state.read().await.clone();
272
273 match state {
274 FailoverState::Primary | FailoverState::Secondary => {
275 if let Some(conn) = self.current_connection.read().await.as_ref() {
276 if conn.is_healthy().await {
277 return Err(anyhow!(
281 "Connection borrowing not implemented in this example"
282 ));
283 }
284 }
285
286 self.handle_connection_failure().await
288 }
289 FailoverState::FailingOver | FailoverState::FailingBack => {
290 let mut retry_count = 0;
292 while retry_count < 10 {
293 sleep(Duration::from_millis(100)).await;
294 let current_state = self.state.read().await.clone();
295 if !matches!(
296 current_state,
297 FailoverState::FailingOver | FailoverState::FailingBack
298 ) {
299 return self.get_connection().await;
300 }
301 retry_count += 1;
302 }
303 Err(anyhow!("Failover in progress timeout"))
304 }
305 FailoverState::Unavailable => Err(anyhow!("No connections available")),
306 }
307 }
308
309 async fn handle_connection_failure(&self) -> Result<T> {
311 let current_state = self.state.read().await.clone();
312
313 match current_state {
314 FailoverState::Primary => {
315 self.failover_to_secondary().await
317 }
318 FailoverState::Secondary => {
319 if self.health_status.read().await.primary_healthy {
321 self.failback_to_primary().await
322 } else {
323 Err(anyhow!(
324 "Secondary connection failed and primary is still unhealthy"
325 ))
326 }
327 }
328 _ => Err(anyhow!(
329 "Connection failure in unexpected state: {:?}",
330 current_state
331 )),
332 }
333 }
334
335 async fn failover_to_secondary(&self) -> Result<T> {
337 let start_time = Instant::now();
338
339 *self.state.write().await = FailoverState::FailingOver;
341
342 if self.config.enable_notifications {
343 let _ = self.event_sender.send(FailoverEvent::FailoverInitiated {
344 from: self.primary.name.clone(),
345 to: self.secondary.name.clone(),
346 reason: "Primary connection failure".to_string(),
347 });
348 }
349
350 match tokio::time::timeout(
352 self.config.connection_timeout,
353 self.secondary.factory.create_connection(),
354 )
355 .await
356 {
357 Ok(Ok(conn)) => {
358 if let Some(mut old_conn) = self.current_connection.write().await.take() {
360 let _ = old_conn.close().await;
361 }
362
363 *self.current_connection.write().await = Some(conn);
364 *self.state.write().await = FailoverState::Secondary;
365
366 let duration = start_time.elapsed();
367
368 let mut stats = self.statistics.write().await;
370 stats.total_failovers += 1;
371 stats.successful_failovers += 1;
372 stats.last_failover = Some(Instant::now());
373 stats.current_state = FailoverState::Secondary;
374 stats
375 .state_changes
376 .push((Instant::now(), FailoverState::Secondary));
377
378 if self.config.enable_notifications {
379 let _ = self.event_sender.send(FailoverEvent::FailoverCompleted {
380 from: self.primary.name.clone(),
381 to: self.secondary.name.clone(),
382 duration,
383 });
384 }
385
386 info!(
387 "Successfully failed over from {} to {} in {:?}",
388 self.primary.name, self.secondary.name, duration
389 );
390
391 Err(anyhow!(
392 "Failover successful but connection borrowing not implemented"
393 ))
394 }
395 Ok(Err(e)) => {
396 *self.state.write().await = FailoverState::Unavailable;
397 let error_msg = e.to_string();
398
399 let mut stats = self.statistics.write().await;
401 stats.total_failovers += 1;
402 stats.failed_failovers += 1;
403 stats.current_state = FailoverState::Unavailable;
404
405 if self.config.enable_notifications {
406 let _ = self.event_sender.send(FailoverEvent::FailoverFailed {
407 from: self.primary.name.clone(),
408 to: self.secondary.name.clone(),
409 error: error_msg.clone(),
410 });
411
412 let _ = self
413 .event_sender
414 .send(FailoverEvent::AllConnectionsUnavailable);
415 }
416
417 error!("Failover to secondary failed: {}", error_msg);
418 Err(anyhow!("Failover failed: {}", error_msg))
419 }
420 Err(_timeout_err) => {
421 *self.state.write().await = FailoverState::Unavailable;
422 let error_msg = "Connection timeout".to_string();
423
424 let mut stats = self.statistics.write().await;
426 stats.total_failovers += 1;
427 stats.failed_failovers += 1;
428 stats.current_state = FailoverState::Unavailable;
429
430 if self.config.enable_notifications {
431 let _ = self.event_sender.send(FailoverEvent::FailoverFailed {
432 from: self.primary.name.clone(),
433 to: self.secondary.name.clone(),
434 error: error_msg.clone(),
435 });
436
437 let _ = self
438 .event_sender
439 .send(FailoverEvent::AllConnectionsUnavailable);
440 }
441
442 error!("Failover to secondary timed out");
443 Err(anyhow!("Failover failed: {}", error_msg))
444 }
445 }
446 }
447
448 async fn failback_to_primary(&self) -> Result<T> {
450 let start_time = Instant::now();
451
452 *self.state.write().await = FailoverState::FailingBack;
454
455 if self.config.enable_notifications {
456 let _ = self.event_sender.send(FailoverEvent::FailbackInitiated {
457 from: self.secondary.name.clone(),
458 to: self.primary.name.clone(),
459 });
460 }
461
462 match tokio::time::timeout(
464 self.config.connection_timeout,
465 self.primary.factory.create_connection(),
466 )
467 .await
468 {
469 Ok(Ok(conn)) => {
470 if let Some(mut old_conn) = self.current_connection.write().await.take() {
472 let _ = old_conn.close().await;
473 }
474
475 *self.current_connection.write().await = Some(conn);
476 *self.state.write().await = FailoverState::Primary;
477
478 let duration = start_time.elapsed();
479
480 let mut stats = self.statistics.write().await;
482 stats.total_failbacks += 1;
483 stats.successful_failbacks += 1;
484 stats.last_failback = Some(Instant::now());
485 stats.current_state = FailoverState::Primary;
486 stats
487 .state_changes
488 .push((Instant::now(), FailoverState::Primary));
489
490 if self.config.enable_notifications {
491 let _ = self.event_sender.send(FailoverEvent::FailbackCompleted {
492 from: self.secondary.name.clone(),
493 to: self.primary.name.clone(),
494 duration,
495 });
496 }
497
498 info!(
499 "Successfully failed back from {} to {} in {:?}",
500 self.secondary.name, self.primary.name, duration
501 );
502
503 Err(anyhow!(
504 "Failback successful but connection borrowing not implemented"
505 ))
506 }
507 Ok(Err(connection_err)) => {
508 *self.state.write().await = FailoverState::Secondary;
510 let error_msg = connection_err.to_string();
511
512 let mut stats = self.statistics.write().await;
513 stats.total_failbacks += 1;
514 stats.failed_failbacks += 1;
515 stats.last_failback_failure = Some(Instant::now());
516
517 if self.config.enable_notifications {
518 let _ = self.event_sender.send(FailoverEvent::FailbackFailed {
519 from: self.secondary.name.clone(),
520 to: self.primary.name.clone(),
521 error: error_msg.clone(),
522 });
523 }
524
525 warn!(
526 "Failback from {} to {} failed: {}",
527 self.secondary.name, self.primary.name, error_msg
528 );
529
530 Err(anyhow!("Failback failed: {}", error_msg))
531 }
532 Err(_timeout_err) => {
533 *self.state.write().await = FailoverState::Secondary;
535 let error_msg = "Connection timeout".to_string();
536
537 let mut stats = self.statistics.write().await;
539 stats.total_failbacks += 1;
540 stats.failed_failbacks += 1;
541 stats.last_failback_failure = Some(Instant::now());
542
543 if self.config.enable_notifications {
544 let _ = self.event_sender.send(FailoverEvent::FailbackFailed {
545 from: self.secondary.name.clone(),
546 to: self.primary.name.clone(),
547 error: error_msg.clone(),
548 });
549 }
550
551 warn!(
552 "Failback to primary failed: {}, staying on secondary",
553 error_msg
554 );
555 Err(anyhow!("Failback failed: {}", error_msg))
556 }
557 }
558 }
559
560 async fn start_health_monitoring(&self) {
562 let config = self.config.clone();
563 let primary = self.primary.clone();
564 let secondary = self.secondary.clone();
565 let state = self.state.clone();
566 let health_status = self.health_status.clone();
567 let event_sender = self.event_sender.clone();
568 let shutdown_signal = self.shutdown_signal.clone();
569
570 tokio::spawn(async move {
571 let mut check_interval = interval(config.health_check_interval);
572
573 loop {
574 check_interval.tick().await;
575
576 if *shutdown_signal.read().await {
577 info!("Failover health monitoring shutting down");
578 break;
579 }
580
581 let current_state = state.read().await.clone();
582
583 match tokio::time::timeout(
585 config.health_check_timeout,
586 primary.factory.create_connection(),
587 )
588 .await
589 {
590 Ok(Ok(conn)) => {
591 if conn.is_healthy().await {
592 let mut status = health_status.write().await;
593 status.primary_consecutive_successes += 1;
594 status.primary_consecutive_failures = 0;
595 status.primary_last_check = Some(Instant::now());
596
597 if !status.primary_healthy {
598 status.primary_healthy = true;
599 if config.enable_notifications {
600 let _ = event_sender.send(FailoverEvent::ConnectionRecovered {
601 connection: primary.name.clone(),
602 });
603 }
604 }
605
606 if config.auto_failback
608 && current_state == FailoverState::Secondary
609 && status.primary_consecutive_successes >= config.recovery_threshold
610 {
611 drop(status);
612 info!("Primary connection recovered, initiating auto-failback");
613 sleep(config.failback_delay).await;
614 }
617 }
618 }
619 _ => {
620 let mut status = health_status.write().await;
621 status.primary_consecutive_failures += 1;
622 status.primary_consecutive_successes = 0;
623 status.primary_healthy = false;
624 status.primary_last_check = Some(Instant::now());
625
626 if config.enable_notifications
627 && status.primary_consecutive_failures % 3 == 0
628 {
629 let _ = event_sender.send(FailoverEvent::HealthCheckFailed {
630 connection: primary.name.clone(),
631 consecutive_failures: status.primary_consecutive_failures,
632 });
633 }
634 }
635 }
636
637 if current_state == FailoverState::Secondary || config.auto_failback {
639 match tokio::time::timeout(
640 config.health_check_timeout,
641 secondary.factory.create_connection(),
642 )
643 .await
644 {
645 Ok(Ok(conn)) => {
646 if conn.is_healthy().await {
647 let mut status = health_status.write().await;
648 status.secondary_consecutive_successes += 1;
649 status.secondary_consecutive_failures = 0;
650 status.secondary_healthy = true;
651 status.secondary_last_check = Some(Instant::now());
652 }
653 }
654 _ => {
655 let mut status = health_status.write().await;
656 status.secondary_consecutive_failures += 1;
657 status.secondary_consecutive_successes = 0;
658 status.secondary_healthy = false;
659 status.secondary_last_check = Some(Instant::now());
660 }
661 }
662 }
663 }
664 });
665 }
666
667 pub async fn get_state(&self) -> FailoverState {
669 self.state.read().await.clone()
670 }
671
672 pub async fn get_statistics(&self) -> FailoverStatistics {
674 let mut stats = self.statistics.read().await.clone();
675
676 let now = Instant::now();
678 for (i, (timestamp, state)) in stats.state_changes.iter().enumerate() {
679 let duration = if i + 1 < stats.state_changes.len() {
680 stats.state_changes[i + 1].0.duration_since(*timestamp)
681 } else {
682 now.duration_since(*timestamp)
683 };
684
685 match state {
686 FailoverState::Primary => stats.primary_uptime += duration,
687 FailoverState::Secondary => stats.secondary_uptime += duration,
688 _ => {}
689 }
690 }
691
692 stats
693 }
694
695 pub fn subscribe(&self) -> broadcast::Receiver<FailoverEvent> {
697 self.event_sender.subscribe()
698 }
699
700 pub async fn trigger_failover(&self) -> Result<()> {
702 let current_state = self.state.read().await.clone();
703
704 match current_state {
705 FailoverState::Primary => self.failover_to_secondary().await.map(|_| ()),
706 FailoverState::Secondary => self.failback_to_primary().await.map(|_| ()),
707 _ => Err(anyhow!(
708 "Cannot trigger failover in current state: {:?}",
709 current_state
710 )),
711 }
712 }
713
714 pub async fn stop(&self) {
716 *self.shutdown_signal.write().await = true;
717 }
718}
719
720#[cfg(test)]
721mod tests {
722 use super::*;
723 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
724
725 #[derive(Clone)]
726 struct TestConnection {
727 id: u32,
728 healthy: Arc<AtomicBool>,
729 }
730
731 #[async_trait::async_trait]
732 impl PooledConnection for TestConnection {
733 async fn is_healthy(&self) -> bool {
734 self.healthy.load(Ordering::Relaxed)
735 }
736
737 async fn close(&mut self) -> Result<()> {
738 Ok(())
739 }
740
741 fn clone_connection(&self) -> Box<dyn PooledConnection> {
742 Box::new(TestConnection {
743 id: self.id,
744 healthy: Arc::new(AtomicBool::new(self.healthy.load(Ordering::Relaxed))),
745 })
746 }
747
748 fn created_at(&self) -> Instant {
749 Instant::now()
750 }
751
752 fn last_activity(&self) -> Instant {
753 Instant::now()
754 }
755
756 fn update_activity(&mut self) {}
757 }
758
759 struct TestConnectionFactory {
760 counter: Arc<AtomicU32>,
761 should_fail: Arc<AtomicBool>,
762 }
763
764 #[async_trait::async_trait]
765 impl ConnectionFactory<TestConnection> for TestConnectionFactory {
766 async fn create_connection(&self) -> Result<TestConnection> {
767 if self.should_fail.load(Ordering::Relaxed) {
768 return Err(anyhow!("Simulated connection failure"));
769 }
770
771 let id = self.counter.fetch_add(1, Ordering::Relaxed);
772 Ok(TestConnection {
773 id,
774 healthy: Arc::new(AtomicBool::new(true)),
775 })
776 }
777 }
778
779 #[tokio::test]
780 async fn test_failover_manager_creation() {
781 let config = FailoverConfig::default();
782
783 let primary_factory = Arc::new(TestConnectionFactory {
784 counter: Arc::new(AtomicU32::new(0)),
785 should_fail: Arc::new(AtomicBool::new(false)),
786 });
787
788 let secondary_factory = Arc::new(TestConnectionFactory {
789 counter: Arc::new(AtomicU32::new(100)),
790 should_fail: Arc::new(AtomicBool::new(false)),
791 });
792
793 let primary = ConnectionEndpoint {
794 name: "primary".to_string(),
795 factory: primary_factory,
796 priority: 1,
797 metadata: HashMap::new(),
798 };
799
800 let secondary = ConnectionEndpoint {
801 name: "secondary".to_string(),
802 factory: secondary_factory,
803 priority: 2,
804 metadata: HashMap::new(),
805 };
806
807 let manager = FailoverManager::new(config, primary, secondary)
808 .await
809 .unwrap();
810
811 assert_eq!(manager.get_state().await, FailoverState::Primary);
812
813 manager.stop().await;
814 }
815
816 #[tokio::test]
817 async fn test_failover_events() {
818 let config = FailoverConfig {
819 enable_notifications: true,
820 ..Default::default()
821 };
822
823 let primary_should_fail = Arc::new(AtomicBool::new(false));
824 let primary_factory = Arc::new(TestConnectionFactory {
825 counter: Arc::new(AtomicU32::new(0)),
826 should_fail: primary_should_fail.clone(),
827 });
828
829 let secondary_factory = Arc::new(TestConnectionFactory {
830 counter: Arc::new(AtomicU32::new(100)),
831 should_fail: Arc::new(AtomicBool::new(false)),
832 });
833
834 let primary = ConnectionEndpoint {
835 name: "primary".to_string(),
836 factory: primary_factory,
837 priority: 1,
838 metadata: HashMap::new(),
839 };
840
841 let secondary = ConnectionEndpoint {
842 name: "secondary".to_string(),
843 factory: secondary_factory,
844 priority: 2,
845 metadata: HashMap::new(),
846 };
847
848 let manager = FailoverManager::new(config, primary, secondary)
849 .await
850 .unwrap();
851 let mut event_receiver = manager.subscribe();
852
853 primary_should_fail.store(true, Ordering::Relaxed);
855
856 let _ = manager.trigger_failover().await;
858
859 tokio::time::timeout(Duration::from_secs(1), async {
861 while let Ok(event) = event_receiver.recv().await {
862 if matches!(event, FailoverEvent::FailoverCompleted { .. }) {
863 return;
864 }
865 }
866 })
867 .await
868 .expect("Should receive failover completed event");
869
870 assert_eq!(manager.get_state().await, FailoverState::Secondary);
871
872 manager.stop().await;
873 }
874
875 #[tokio::test]
876 async fn test_failover_statistics() {
877 let config = FailoverConfig::default();
878
879 let primary_factory = Arc::new(TestConnectionFactory {
880 counter: Arc::new(AtomicU32::new(0)),
881 should_fail: Arc::new(AtomicBool::new(false)),
882 });
883
884 let secondary_factory = Arc::new(TestConnectionFactory {
885 counter: Arc::new(AtomicU32::new(100)),
886 should_fail: Arc::new(AtomicBool::new(false)),
887 });
888
889 let primary = ConnectionEndpoint {
890 name: "primary".to_string(),
891 factory: primary_factory,
892 priority: 1,
893 metadata: HashMap::new(),
894 };
895
896 let secondary = ConnectionEndpoint {
897 name: "secondary".to_string(),
898 factory: secondary_factory,
899 priority: 2,
900 metadata: HashMap::new(),
901 };
902
903 let manager = FailoverManager::new(config, primary, secondary)
904 .await
905 .unwrap();
906
907 let _ = manager.trigger_failover().await;
909
910 let stats = manager.get_statistics().await;
911 assert_eq!(stats.total_failovers, 1);
912 assert_eq!(stats.successful_failovers, 1);
913 assert_eq!(stats.current_state, FailoverState::Secondary);
914
915 manager.stop().await;
916 }
917}