Skip to main content

oxirs_stream/
failover.rs

1//! Failover mechanisms for connection pool
2//!
3//! Provides primary/secondary connection failover, automatic failover on connection failure,
4//! and comprehensive failover event notifications.
5
6use crate::connection_pool::{ConnectionFactory, PooledConnection};
7use anyhow::{anyhow, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{broadcast, RwLock};
13use tokio::time::{interval, sleep};
14use tracing::{error, info, warn};
15
/// Failover configuration.
///
/// Controls connection timeouts, the background health monitor, and the thresholds
/// used when switching between the primary and secondary endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailoverConfig {
    /// Period between rounds of the background health monitor.
    pub health_check_interval: Duration,
    /// Timeout applied to each health-check probe.
    pub health_check_timeout: Duration,
    /// Number of consecutive failures before failover.
    pub failure_threshold: u32,
    /// Number of consecutive successful primary probes required before failback.
    pub recovery_threshold: u32,
    /// Delay before attempting failback to the primary once it has recovered.
    pub failback_delay: Duration,
    /// Enable automatic failback to the primary when it recovers. When set, the
    /// monitor also probes the secondary on every round.
    pub auto_failback: bool,
    /// Timeout for each connection attempt made at construction, failover and failback.
    pub connection_timeout: Duration,
    /// When `false`, no [`FailoverEvent`]s are broadcast to subscribers.
    pub enable_notifications: bool,
}
36
37impl Default for FailoverConfig {
38    fn default() -> Self {
39        Self {
40            health_check_interval: Duration::from_secs(10),
41            health_check_timeout: Duration::from_secs(5),
42            failure_threshold: 3,
43            recovery_threshold: 3,
44            failback_delay: Duration::from_secs(60),
45            auto_failback: true,
46            connection_timeout: Duration::from_secs(30),
47            enable_notifications: true,
48        }
49    }
50}
51
/// Failover state.
///
/// `Primary`, `Secondary` and `Unavailable` are settled states; `FailingOver` and
/// `FailingBack` are transient while a switch is in progress. Defaults to `Primary`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub enum FailoverState {
    /// Using the primary connection.
    #[default]
    Primary,
    /// Failed over to the secondary connection.
    Secondary,
    /// Failing over from primary to secondary.
    FailingOver,
    /// Failing back from secondary to primary.
    FailingBack,
    /// Neither connection is usable.
    Unavailable,
}
67
/// Failover event broadcast to subscribers of `FailoverManager::subscribe`.
///
/// `from` / `to` / `connection` fields carry endpoint names (`ConnectionEndpoint::name`).
#[derive(Debug, Clone)]
pub enum FailoverEvent {
    /// Failover from `from` to `to` has started; `reason` is a human-readable cause.
    FailoverInitiated {
        from: String,
        to: String,
        reason: String,
    },
    /// Failover completed successfully; `duration` is how long the switch took.
    FailoverCompleted {
        from: String,
        to: String,
        duration: Duration,
    },
    /// Failover failed; `error` describes why `to` could not be reached.
    FailoverFailed {
        from: String,
        to: String,
        error: String,
    },
    /// Failback from `from` (secondary) to `to` (primary) has started.
    FailbackInitiated { from: String, to: String },
    /// Failback completed; `duration` is how long the switch took.
    FailbackCompleted {
        from: String,
        to: String,
        duration: Duration,
    },
    /// Failback failed; the manager stays on `from`. `error` describes the cause.
    FailbackFailed {
        from: String,
        to: String,
        error: String,
    },
    /// Health probes of `connection` keep failing. This is emitted periodically
    /// rather than on every failed probe.
    HealthCheckFailed {
        connection: String,
        consecutive_failures: u32,
    },
    /// `connection` passed a health probe after previously being marked unhealthy.
    ConnectionRecovered { connection: String },
    /// Neither endpoint is reachable (emitted after a failed failover).
    AllConnectionsUnavailable,
}
113
/// Failover statistics.
///
/// Counters are cumulative for the lifetime of the manager. Fields holding `Instant`s
/// are `#[serde(skip)]`ped because `Instant` values are opaque, process-local
/// timestamps.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FailoverStatistics {
    /// Failover attempts (successful + failed).
    pub total_failovers: u64,
    pub successful_failovers: u64,
    pub failed_failovers: u64,
    /// Failback attempts (successful + failed).
    pub total_failbacks: u64,
    pub successful_failbacks: u64,
    pub failed_failbacks: u64,
    /// Time spent in the `Primary` state. Computed from `state_changes` by
    /// `FailoverManager::get_statistics`; zero in the internally stored copy.
    pub primary_uptime: Duration,
    /// Time spent in the `Secondary` state; computed like `primary_uptime`.
    pub secondary_uptime: Duration,
    /// When the last successful failover completed.
    #[serde(skip)]
    pub last_failover: Option<Instant>,
    /// When the last successful failback completed.
    #[serde(skip)]
    pub last_failback: Option<Instant>,
    /// When the last failback attempt failed.
    #[serde(skip)]
    pub last_failback_failure: Option<Instant>,
    /// Most recently recorded settled state.
    pub current_state: FailoverState,
    /// History of settled states with the time each was entered. Transient states
    /// (`FailingOver` / `FailingBack`) are not recorded.
    #[serde(skip)]
    pub state_changes: Vec<(Instant, FailoverState)>,
}
135
/// Connection endpoint configuration: a named factory for connections to one server.
#[derive(Clone)]
pub struct ConnectionEndpoint<T: PooledConnection + Clone> {
    /// Human-readable name used in logs and in `FailoverEvent`s.
    pub name: String,
    /// Creates new connections to this endpoint.
    pub factory: Arc<dyn ConnectionFactory<T>>,
    /// Priority hint. Not read by `FailoverManager` in this module, which always
    /// treats one endpoint as primary and the other as secondary.
    pub priority: u32,
    /// Free-form metadata, carried along but not interpreted by the failover logic here.
    pub metadata: HashMap<String, String>,
}
144
145impl<T: PooledConnection + Clone> std::fmt::Debug for ConnectionEndpoint<T> {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct("ConnectionEndpoint")
148            .field("name", &self.name)
149            .field("factory", &"<ConnectionFactory>")
150            .field("priority", &self.priority)
151            .field("metadata", &self.metadata)
152            .finish()
153    }
154}
155
/// Failover manager for primary/secondary connections.
///
/// Owns the active connection and tracks which endpoint it came from. A background
/// task started by `new` probes the endpoints every `health_check_interval`; `stop`
/// ends it.
pub struct FailoverManager<T: PooledConnection + Clone> {
    config: FailoverConfig,
    /// Preferred endpoint.
    primary: ConnectionEndpoint<T>,
    /// Fallback endpoint used after a failover.
    secondary: ConnectionEndpoint<T>,
    /// The active connection, if any.
    current_connection: Arc<RwLock<Option<T>>>,
    /// Current position in the failover state machine.
    state: Arc<RwLock<FailoverState>>,
    statistics: Arc<RwLock<FailoverStatistics>>,
    /// Broadcast channel for `FailoverEvent`s (see `subscribe`).
    event_sender: broadcast::Sender<FailoverEvent>,
    /// Probe results shared with the background monitor.
    health_status: Arc<RwLock<HealthStatusTracker>>,
    /// Set to `true` by `stop` to end the background monitor.
    shutdown_signal: Arc<RwLock<bool>>,
}
168
/// Health-probe bookkeeping for both endpoints.
///
/// The `*_consecutive_*` counters track runs of identical probe outcomes, and
/// `*_last_check` holds the time of the most recent probe.
#[derive(Debug, Clone)]
struct HealthStatusTracker {
    primary_consecutive_failures: u32,
    primary_consecutive_successes: u32,
    secondary_consecutive_failures: u32,
    secondary_consecutive_successes: u32,
    primary_last_check: Option<Instant>,
    secondary_last_check: Option<Instant>,
    primary_healthy: bool,
    secondary_healthy: bool,
}

impl Default for HealthStatusTracker {
    /// No probes have run yet: every counter is zero, no check time is recorded, and
    /// both endpoints are optimistically assumed healthy.
    fn default() -> Self {
        let assumed_healthy = true;
        Self {
            primary_healthy: assumed_healthy,
            secondary_healthy: assumed_healthy,
            primary_last_check: None,
            secondary_last_check: None,
            primary_consecutive_failures: 0,
            primary_consecutive_successes: 0,
            secondary_consecutive_failures: 0,
            secondary_consecutive_successes: 0,
        }
    }
}
196
197impl<T: PooledConnection + Clone> FailoverManager<T> {
198    /// Create a new failover manager
199    pub async fn new(
200        config: FailoverConfig,
201        primary: ConnectionEndpoint<T>,
202        secondary: ConnectionEndpoint<T>,
203    ) -> Result<Self> {
204        let (event_sender, _) = broadcast::channel(1000);
205
206        // Try to establish primary connection first
207        let initial_connection = match tokio::time::timeout(
208            config.connection_timeout,
209            primary.factory.create_connection(),
210        )
211        .await
212        {
213            Ok(Ok(conn)) => {
214                info!("Successfully connected to primary: {}", primary.name);
215                Some(conn)
216            }
217            _ => {
218                warn!("Failed to connect to primary, trying secondary");
219                match tokio::time::timeout(
220                    config.connection_timeout,
221                    secondary.factory.create_connection(),
222                )
223                .await
224                {
225                    Ok(Ok(conn)) => {
226                        info!("Successfully connected to secondary: {}", secondary.name);
227                        Some(conn)
228                    }
229                    _ => {
230                        error!("Failed to connect to both primary and secondary");
231                        None
232                    }
233                }
234            }
235        };
236
237        let initial_state = if initial_connection.is_some() {
238            FailoverState::Primary
239        } else {
240            FailoverState::Unavailable
241        };
242
243        let mut statistics = FailoverStatistics {
244            current_state: initial_state.clone(),
245            ..Default::default()
246        };
247        statistics
248            .state_changes
249            .push((Instant::now(), initial_state.clone()));
250
251        let manager = Self {
252            config,
253            primary,
254            secondary,
255            current_connection: Arc::new(RwLock::new(initial_connection)),
256            state: Arc::new(RwLock::new(initial_state)),
257            statistics: Arc::new(RwLock::new(statistics)),
258            event_sender,
259            health_status: Arc::new(RwLock::new(HealthStatusTracker::default())),
260            shutdown_signal: Arc::new(RwLock::new(false)),
261        };
262
263        // Start health monitoring
264        manager.start_health_monitoring().await;
265
266        Ok(manager)
267    }
268
269    /// Get the current active connection
270    pub async fn get_connection(&self) -> Result<T> {
271        let state = self.state.read().await.clone();
272
273        match state {
274            FailoverState::Primary | FailoverState::Secondary => {
275                if let Some(conn) = self.current_connection.read().await.as_ref() {
276                    if conn.is_healthy().await {
277                        // Clone the connection if it implements Clone
278                        // For this example, we'll need to handle this differently
279                        // In real implementation, you'd return a reference or handle
280                        return Err(anyhow!(
281                            "Connection borrowing not implemented in this example"
282                        ));
283                    }
284                }
285
286                // Current connection is unhealthy, trigger failover
287                self.handle_connection_failure().await
288            }
289            FailoverState::FailingOver | FailoverState::FailingBack => {
290                // Wait for failover to complete
291                let mut retry_count = 0;
292                while retry_count < 10 {
293                    sleep(Duration::from_millis(100)).await;
294                    let current_state = self.state.read().await.clone();
295                    if !matches!(
296                        current_state,
297                        FailoverState::FailingOver | FailoverState::FailingBack
298                    ) {
299                        return self.get_connection().await;
300                    }
301                    retry_count += 1;
302                }
303                Err(anyhow!("Failover in progress timeout"))
304            }
305            FailoverState::Unavailable => Err(anyhow!("No connections available")),
306        }
307    }
308
309    /// Handle connection failure and trigger failover
310    async fn handle_connection_failure(&self) -> Result<T> {
311        let current_state = self.state.read().await.clone();
312
313        match current_state {
314            FailoverState::Primary => {
315                // Failover to secondary
316                self.failover_to_secondary().await
317            }
318            FailoverState::Secondary => {
319                // Try to failback to primary if it's recovered
320                if self.health_status.read().await.primary_healthy {
321                    self.failback_to_primary().await
322                } else {
323                    Err(anyhow!(
324                        "Secondary connection failed and primary is still unhealthy"
325                    ))
326                }
327            }
328            _ => Err(anyhow!(
329                "Connection failure in unexpected state: {:?}",
330                current_state
331            )),
332        }
333    }
334
335    /// Perform failover to secondary connection
336    async fn failover_to_secondary(&self) -> Result<T> {
337        let start_time = Instant::now();
338
339        // Update state
340        *self.state.write().await = FailoverState::FailingOver;
341
342        if self.config.enable_notifications {
343            let _ = self.event_sender.send(FailoverEvent::FailoverInitiated {
344                from: self.primary.name.clone(),
345                to: self.secondary.name.clone(),
346                reason: "Primary connection failure".to_string(),
347            });
348        }
349
350        // Attempt to create secondary connection
351        match tokio::time::timeout(
352            self.config.connection_timeout,
353            self.secondary.factory.create_connection(),
354        )
355        .await
356        {
357            Ok(Ok(conn)) => {
358                // Close old connection if exists
359                if let Some(mut old_conn) = self.current_connection.write().await.take() {
360                    let _ = old_conn.close().await;
361                }
362
363                *self.current_connection.write().await = Some(conn);
364                *self.state.write().await = FailoverState::Secondary;
365
366                let duration = start_time.elapsed();
367
368                // Update statistics
369                let mut stats = self.statistics.write().await;
370                stats.total_failovers += 1;
371                stats.successful_failovers += 1;
372                stats.last_failover = Some(Instant::now());
373                stats.current_state = FailoverState::Secondary;
374                stats
375                    .state_changes
376                    .push((Instant::now(), FailoverState::Secondary));
377
378                if self.config.enable_notifications {
379                    let _ = self.event_sender.send(FailoverEvent::FailoverCompleted {
380                        from: self.primary.name.clone(),
381                        to: self.secondary.name.clone(),
382                        duration,
383                    });
384                }
385
386                info!(
387                    "Successfully failed over from {} to {} in {:?}",
388                    self.primary.name, self.secondary.name, duration
389                );
390
391                Err(anyhow!(
392                    "Failover successful but connection borrowing not implemented"
393                ))
394            }
395            Ok(Err(e)) => {
396                *self.state.write().await = FailoverState::Unavailable;
397                let error_msg = e.to_string();
398
399                // Update statistics
400                let mut stats = self.statistics.write().await;
401                stats.total_failovers += 1;
402                stats.failed_failovers += 1;
403                stats.current_state = FailoverState::Unavailable;
404
405                if self.config.enable_notifications {
406                    let _ = self.event_sender.send(FailoverEvent::FailoverFailed {
407                        from: self.primary.name.clone(),
408                        to: self.secondary.name.clone(),
409                        error: error_msg.clone(),
410                    });
411
412                    let _ = self
413                        .event_sender
414                        .send(FailoverEvent::AllConnectionsUnavailable);
415                }
416
417                error!("Failover to secondary failed: {}", error_msg);
418                Err(anyhow!("Failover failed: {}", error_msg))
419            }
420            Err(_timeout_err) => {
421                *self.state.write().await = FailoverState::Unavailable;
422                let error_msg = "Connection timeout".to_string();
423
424                // Update statistics
425                let mut stats = self.statistics.write().await;
426                stats.total_failovers += 1;
427                stats.failed_failovers += 1;
428                stats.current_state = FailoverState::Unavailable;
429
430                if self.config.enable_notifications {
431                    let _ = self.event_sender.send(FailoverEvent::FailoverFailed {
432                        from: self.primary.name.clone(),
433                        to: self.secondary.name.clone(),
434                        error: error_msg.clone(),
435                    });
436
437                    let _ = self
438                        .event_sender
439                        .send(FailoverEvent::AllConnectionsUnavailable);
440                }
441
442                error!("Failover to secondary timed out");
443                Err(anyhow!("Failover failed: {}", error_msg))
444            }
445        }
446    }
447
448    /// Perform failback to primary connection
449    async fn failback_to_primary(&self) -> Result<T> {
450        let start_time = Instant::now();
451
452        // Update state
453        *self.state.write().await = FailoverState::FailingBack;
454
455        if self.config.enable_notifications {
456            let _ = self.event_sender.send(FailoverEvent::FailbackInitiated {
457                from: self.secondary.name.clone(),
458                to: self.primary.name.clone(),
459            });
460        }
461
462        // Attempt to create primary connection
463        match tokio::time::timeout(
464            self.config.connection_timeout,
465            self.primary.factory.create_connection(),
466        )
467        .await
468        {
469            Ok(Ok(conn)) => {
470                // Close old connection if exists
471                if let Some(mut old_conn) = self.current_connection.write().await.take() {
472                    let _ = old_conn.close().await;
473                }
474
475                *self.current_connection.write().await = Some(conn);
476                *self.state.write().await = FailoverState::Primary;
477
478                let duration = start_time.elapsed();
479
480                // Update statistics
481                let mut stats = self.statistics.write().await;
482                stats.total_failbacks += 1;
483                stats.successful_failbacks += 1;
484                stats.last_failback = Some(Instant::now());
485                stats.current_state = FailoverState::Primary;
486                stats
487                    .state_changes
488                    .push((Instant::now(), FailoverState::Primary));
489
490                if self.config.enable_notifications {
491                    let _ = self.event_sender.send(FailoverEvent::FailbackCompleted {
492                        from: self.secondary.name.clone(),
493                        to: self.primary.name.clone(),
494                        duration,
495                    });
496                }
497
498                info!(
499                    "Successfully failed back from {} to {} in {:?}",
500                    self.secondary.name, self.primary.name, duration
501                );
502
503                Err(anyhow!(
504                    "Failback successful but connection borrowing not implemented"
505                ))
506            }
507            Ok(Err(connection_err)) => {
508                // Connection creation failed
509                *self.state.write().await = FailoverState::Secondary;
510                let error_msg = connection_err.to_string();
511
512                let mut stats = self.statistics.write().await;
513                stats.total_failbacks += 1;
514                stats.failed_failbacks += 1;
515                stats.last_failback_failure = Some(Instant::now());
516
517                if self.config.enable_notifications {
518                    let _ = self.event_sender.send(FailoverEvent::FailbackFailed {
519                        from: self.secondary.name.clone(),
520                        to: self.primary.name.clone(),
521                        error: error_msg.clone(),
522                    });
523                }
524
525                warn!(
526                    "Failback from {} to {} failed: {}",
527                    self.secondary.name, self.primary.name, error_msg
528                );
529
530                Err(anyhow!("Failback failed: {}", error_msg))
531            }
532            Err(_timeout_err) => {
533                // Timeout occurred
534                *self.state.write().await = FailoverState::Secondary;
535                let error_msg = "Connection timeout".to_string();
536
537                // Update statistics
538                let mut stats = self.statistics.write().await;
539                stats.total_failbacks += 1;
540                stats.failed_failbacks += 1;
541                stats.last_failback_failure = Some(Instant::now());
542
543                if self.config.enable_notifications {
544                    let _ = self.event_sender.send(FailoverEvent::FailbackFailed {
545                        from: self.secondary.name.clone(),
546                        to: self.primary.name.clone(),
547                        error: error_msg.clone(),
548                    });
549                }
550
551                warn!(
552                    "Failback to primary failed: {}, staying on secondary",
553                    error_msg
554                );
555                Err(anyhow!("Failback failed: {}", error_msg))
556            }
557        }
558    }
559
560    /// Start health monitoring for automatic failover/failback
561    async fn start_health_monitoring(&self) {
562        let config = self.config.clone();
563        let primary = self.primary.clone();
564        let secondary = self.secondary.clone();
565        let state = self.state.clone();
566        let health_status = self.health_status.clone();
567        let event_sender = self.event_sender.clone();
568        let shutdown_signal = self.shutdown_signal.clone();
569
570        tokio::spawn(async move {
571            let mut check_interval = interval(config.health_check_interval);
572
573            loop {
574                check_interval.tick().await;
575
576                if *shutdown_signal.read().await {
577                    info!("Failover health monitoring shutting down");
578                    break;
579                }
580
581                let current_state = state.read().await.clone();
582
583                // Check primary health
584                match tokio::time::timeout(
585                    config.health_check_timeout,
586                    primary.factory.create_connection(),
587                )
588                .await
589                {
590                    Ok(Ok(conn)) => {
591                        if conn.is_healthy().await {
592                            let mut status = health_status.write().await;
593                            status.primary_consecutive_successes += 1;
594                            status.primary_consecutive_failures = 0;
595                            status.primary_last_check = Some(Instant::now());
596
597                            if !status.primary_healthy {
598                                status.primary_healthy = true;
599                                if config.enable_notifications {
600                                    let _ = event_sender.send(FailoverEvent::ConnectionRecovered {
601                                        connection: primary.name.clone(),
602                                    });
603                                }
604                            }
605
606                            // Auto-failback logic
607                            if config.auto_failback
608                                && current_state == FailoverState::Secondary
609                                && status.primary_consecutive_successes >= config.recovery_threshold
610                            {
611                                drop(status);
612                                info!("Primary connection recovered, initiating auto-failback");
613                                sleep(config.failback_delay).await;
614                                // Trigger failback through the manager
615                                // In real implementation, this would be done differently
616                            }
617                        }
618                    }
619                    _ => {
620                        let mut status = health_status.write().await;
621                        status.primary_consecutive_failures += 1;
622                        status.primary_consecutive_successes = 0;
623                        status.primary_healthy = false;
624                        status.primary_last_check = Some(Instant::now());
625
626                        if config.enable_notifications
627                            && status.primary_consecutive_failures % 3 == 0
628                        {
629                            let _ = event_sender.send(FailoverEvent::HealthCheckFailed {
630                                connection: primary.name.clone(),
631                                consecutive_failures: status.primary_consecutive_failures,
632                            });
633                        }
634                    }
635                }
636
637                // Check secondary health only if we're using it or as backup
638                if current_state == FailoverState::Secondary || config.auto_failback {
639                    match tokio::time::timeout(
640                        config.health_check_timeout,
641                        secondary.factory.create_connection(),
642                    )
643                    .await
644                    {
645                        Ok(Ok(conn)) => {
646                            if conn.is_healthy().await {
647                                let mut status = health_status.write().await;
648                                status.secondary_consecutive_successes += 1;
649                                status.secondary_consecutive_failures = 0;
650                                status.secondary_healthy = true;
651                                status.secondary_last_check = Some(Instant::now());
652                            }
653                        }
654                        _ => {
655                            let mut status = health_status.write().await;
656                            status.secondary_consecutive_failures += 1;
657                            status.secondary_consecutive_successes = 0;
658                            status.secondary_healthy = false;
659                            status.secondary_last_check = Some(Instant::now());
660                        }
661                    }
662                }
663            }
664        });
665    }
666
667    /// Get current failover state
668    pub async fn get_state(&self) -> FailoverState {
669        self.state.read().await.clone()
670    }
671
672    /// Get failover statistics
673    pub async fn get_statistics(&self) -> FailoverStatistics {
674        let mut stats = self.statistics.read().await.clone();
675
676        // Calculate uptimes
677        let now = Instant::now();
678        for (i, (timestamp, state)) in stats.state_changes.iter().enumerate() {
679            let duration = if i + 1 < stats.state_changes.len() {
680                stats.state_changes[i + 1].0.duration_since(*timestamp)
681            } else {
682                now.duration_since(*timestamp)
683            };
684
685            match state {
686                FailoverState::Primary => stats.primary_uptime += duration,
687                FailoverState::Secondary => stats.secondary_uptime += duration,
688                _ => {}
689            }
690        }
691
692        stats
693    }
694
695    /// Subscribe to failover events
696    pub fn subscribe(&self) -> broadcast::Receiver<FailoverEvent> {
697        self.event_sender.subscribe()
698    }
699
700    /// Manually trigger failover
701    pub async fn trigger_failover(&self) -> Result<()> {
702        let current_state = self.state.read().await.clone();
703
704        match current_state {
705            FailoverState::Primary => self.failover_to_secondary().await.map(|_| ()),
706            FailoverState::Secondary => self.failback_to_primary().await.map(|_| ()),
707            _ => Err(anyhow!(
708                "Cannot trigger failover in current state: {:?}",
709                current_state
710            )),
711        }
712    }
713
714    /// Stop health monitoring
715    pub async fn stop(&self) {
716        *self.shutdown_signal.write().await = true;
717    }
718}
719
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

    /// Connection stub whose health is driven by a shared flag.
    #[derive(Clone)]
    struct TestConnection {
        id: u32,
        healthy: Arc<AtomicBool>,
    }

    #[async_trait::async_trait]
    impl PooledConnection for TestConnection {
        async fn is_healthy(&self) -> bool {
            self.healthy.load(Ordering::Relaxed)
        }

        async fn close(&mut self) -> Result<()> {
            Ok(())
        }

        fn clone_connection(&self) -> Box<dyn PooledConnection> {
            // The boxed copy gets its own flag, seeded with the current value.
            let healthy_now = self.healthy.load(Ordering::Relaxed);
            Box::new(TestConnection {
                id: self.id,
                healthy: Arc::new(AtomicBool::new(healthy_now)),
            })
        }

        fn created_at(&self) -> Instant {
            Instant::now()
        }

        fn last_activity(&self) -> Instant {
            Instant::now()
        }

        fn update_activity(&mut self) {}
    }

    /// Factory producing sequentially numbered connections, or failing while
    /// `should_fail` is set.
    struct TestConnectionFactory {
        counter: Arc<AtomicU32>,
        should_fail: Arc<AtomicBool>,
    }

    #[async_trait::async_trait]
    impl ConnectionFactory<TestConnection> for TestConnectionFactory {
        async fn create_connection(&self) -> Result<TestConnection> {
            if self.should_fail.load(Ordering::Relaxed) {
                return Err(anyhow!("Simulated connection failure"));
            }

            Ok(TestConnection {
                id: self.counter.fetch_add(1, Ordering::Relaxed),
                healthy: Arc::new(AtomicBool::new(true)),
            })
        }
    }

    /// Build an endpoint whose factory numbers connections starting at `first_id`.
    fn endpoint(
        name: &str,
        priority: u32,
        first_id: u32,
        should_fail: Arc<AtomicBool>,
    ) -> ConnectionEndpoint<TestConnection> {
        ConnectionEndpoint {
            name: name.to_string(),
            factory: Arc::new(TestConnectionFactory {
                counter: Arc::new(AtomicU32::new(first_id)),
                should_fail,
            }),
            priority,
            metadata: HashMap::new(),
        }
    }

    /// Start a manager over a primary (ids from 0) and an always-available secondary
    /// (ids from 100). `primary_should_fail` lets a test take the primary down later.
    async fn start_manager(
        config: FailoverConfig,
        primary_should_fail: Arc<AtomicBool>,
    ) -> FailoverManager<TestConnection> {
        let primary = endpoint("primary", 1, 0, primary_should_fail);
        let secondary = endpoint("secondary", 2, 100, Arc::new(AtomicBool::new(false)));
        FailoverManager::new(config, primary, secondary)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_failover_manager_creation() {
        let manager =
            start_manager(FailoverConfig::default(), Arc::new(AtomicBool::new(false))).await;

        assert_eq!(manager.get_state().await, FailoverState::Primary);

        manager.stop().await;
    }

    #[tokio::test]
    async fn test_failover_events() {
        let config = FailoverConfig {
            enable_notifications: true,
            ..Default::default()
        };
        let primary_down = Arc::new(AtomicBool::new(false));
        let manager = start_manager(config, Arc::clone(&primary_down)).await;
        let mut events = manager.subscribe();

        // Take the primary down, then switch manually.
        primary_down.store(true, Ordering::Relaxed);
        let _ = manager.trigger_failover().await;

        let saw_completion = async {
            while let Ok(event) = events.recv().await {
                if matches!(event, FailoverEvent::FailoverCompleted { .. }) {
                    return;
                }
            }
        };
        tokio::time::timeout(Duration::from_secs(1), saw_completion)
            .await
            .expect("Should receive failover completed event");

        assert_eq!(manager.get_state().await, FailoverState::Secondary);

        manager.stop().await;
    }

    #[tokio::test]
    async fn test_failover_statistics() {
        let manager =
            start_manager(FailoverConfig::default(), Arc::new(AtomicBool::new(false))).await;

        let _ = manager.trigger_failover().await;

        let stats = manager.get_statistics().await;
        assert_eq!(stats.total_failovers, 1);
        assert_eq!(stats.successful_failovers, 1);
        assert_eq!(stats.current_state, FailoverState::Secondary);

        manager.stop().await;
    }
}