ruvector_replication/failover.rs

//! Automatic failover and high availability
//!
//! Provides failover management with health monitoring,
//! quorum-based decision making, and split-brain prevention.
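//!
//! A minimal usage sketch is shown below. The crate name
//! (`ruvector_replication`) and the re-export paths are assumptions taken
//! from this file's location, so the example is marked `ignore`; adjust the
//! `use` lines to the crate's actual public API.
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! use parking_lot::RwLock;
//! use ruvector_replication::failover::FailoverManager;
//! use ruvector_replication::{ReplicaRole, ReplicaSet};
//!
//! async fn run() {
//!     // Build a small replica set: one primary and one secondary.
//!     let mut set = ReplicaSet::new("cluster-1");
//!     set.add_replica("r1", "127.0.0.1:9001", ReplicaRole::Primary).unwrap();
//!     set.add_replica("r2", "127.0.0.1:9002", ReplicaRole::Secondary).unwrap();
//!
//!     // Start background health monitoring with the default policy.
//!     let manager = FailoverManager::new(Arc::new(RwLock::new(set)));
//!     manager.start_monitoring().await;
//! }
//! ```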

use crate::{Replica, ReplicaRole, ReplicaSet, ReplicationError, Result};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;

/// Health status of a replica
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
    /// Replica is healthy
    Healthy,
    /// Replica is degraded but operational
    Degraded,
    /// Replica is unhealthy
    Unhealthy,
    /// Replica is not responding
    Unresponsive,
}

/// Health check result
#[derive(Debug, Clone)]
pub struct HealthCheck {
    /// Replica ID
    pub replica_id: String,
    /// Health status
    pub status: HealthStatus,
    /// Response time in milliseconds
    pub response_time_ms: u64,
    /// Error message if unhealthy
    pub error: Option<String>,
    /// Timestamp of the check
    pub timestamp: DateTime<Utc>,
}

impl HealthCheck {
    /// Create a healthy check result
    pub fn healthy(replica_id: String, response_time_ms: u64) -> Self {
        Self {
            replica_id,
            status: HealthStatus::Healthy,
            response_time_ms,
            error: None,
            timestamp: Utc::now(),
        }
    }

    /// Create an unhealthy check result
    pub fn unhealthy(replica_id: String, error: String) -> Self {
        Self {
            replica_id,
            status: HealthStatus::Unhealthy,
            response_time_ms: 0,
            error: Some(error),
            timestamp: Utc::now(),
        }
    }

    /// Create an unresponsive check result
    pub fn unresponsive(replica_id: String) -> Self {
        Self {
            replica_id,
            status: HealthStatus::Unresponsive,
            response_time_ms: 0,
            error: Some("No response".to_string()),
            timestamp: Utc::now(),
        }
    }
}

/// Failover policy configuration
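///
/// All fields are public, so a policy can be constructed directly. The values
/// below are illustrative only:
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// let policy = FailoverPolicy {
///     auto_failover: true,
///     health_check_interval: Duration::from_secs(1),
///     health_check_timeout: Duration::from_millis(500),
///     failure_threshold: 5,
///     min_quorum: 3,
///     prevent_split_brain: true,
/// };
/// ```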
#[derive(Debug, Clone)]
pub struct FailoverPolicy {
    /// Enable automatic failover
    pub auto_failover: bool,
    /// Health check interval
    pub health_check_interval: Duration,
    /// Timeout for health checks
    pub health_check_timeout: Duration,
    /// Number of consecutive failures before failover
    pub failure_threshold: usize,
    /// Minimum quorum size for failover
    pub min_quorum: usize,
    /// Enable split-brain prevention
    pub prevent_split_brain: bool,
}

impl Default for FailoverPolicy {
    fn default() -> Self {
        Self {
            auto_failover: true,
            health_check_interval: Duration::from_secs(5),
            health_check_timeout: Duration::from_secs(2),
            failure_threshold: 3,
            min_quorum: 2,
            prevent_split_brain: true,
        }
    }
}

/// Manages automatic failover and health monitoring
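///
/// Health checks run on a fixed interval; a replica that fails
/// `failure_threshold` consecutive checks triggers an automatic failover
/// (when `auto_failover` is enabled), provided the replica set still has
/// quorum. Only one failover can be in progress at a time.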
pub struct FailoverManager {
    /// The replica set
    replica_set: Arc<RwLock<ReplicaSet>>,
    /// Failover policy
    policy: Arc<RwLock<FailoverPolicy>>,
    /// Health check history
    health_history: Arc<RwLock<Vec<HealthCheck>>>,
    /// Failure counts by replica
    failure_counts: Arc<RwLock<std::collections::HashMap<String, usize>>>,
    /// Whether failover is in progress
    failover_in_progress: Arc<RwLock<bool>>,
}

impl FailoverManager {
    /// Create a new failover manager
    pub fn new(replica_set: Arc<RwLock<ReplicaSet>>) -> Self {
        Self {
            replica_set,
            policy: Arc::new(RwLock::new(FailoverPolicy::default())),
            health_history: Arc::new(RwLock::new(Vec::new())),
            failure_counts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            failover_in_progress: Arc::new(RwLock::new(false)),
        }
    }

    /// Create with custom policy
    pub fn with_policy(replica_set: Arc<RwLock<ReplicaSet>>, policy: FailoverPolicy) -> Self {
        Self {
            replica_set,
            policy: Arc::new(RwLock::new(policy)),
            health_history: Arc::new(RwLock::new(Vec::new())),
            failure_counts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            failover_in_progress: Arc::new(RwLock::new(false)),
        }
    }

    /// Set the failover policy
    pub fn set_policy(&self, policy: FailoverPolicy) {
        *self.policy.write() = policy;
    }

    /// Get the current policy
    pub fn policy(&self) -> FailoverPolicy {
        self.policy.read().clone()
    }

    /// Start health monitoring
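    ///
    /// Spawns a background Tokio task, so this must be called from within a
    /// Tokio runtime. The check interval and timeout are captured from the
    /// policy when monitoring starts; later policy changes only affect the
    /// failure threshold and the auto-failover flag.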
    pub async fn start_monitoring(&self) {
        let policy = self.policy.read().clone();
        let replica_set = self.replica_set.clone();
        let health_history = self.health_history.clone();
        let failure_counts = self.failure_counts.clone();
        let failover_in_progress = self.failover_in_progress.clone();
        let manager_policy = self.policy.clone();

        tokio::spawn(async move {
            let mut interval_timer = interval(policy.health_check_interval);

            loop {
                interval_timer.tick().await;

                let replica_ids = {
                    let set = replica_set.read();
                    set.replica_ids()
                };

                for replica_id in replica_ids {
                    let health = Self::check_replica_health(
                        &replica_set,
                        &replica_id,
                        policy.health_check_timeout,
                    )
                    .await;

                    // Record health check
                    health_history.write().push(health.clone());

                    // Update failure count and check if failover is needed
                    // Use a scope to ensure lock is dropped before any await
                    let should_failover = {
                        let mut counts = failure_counts.write();
                        let count = counts.entry(replica_id.clone()).or_insert(0);

                        match health.status {
                            HealthStatus::Healthy => {
                                *count = 0;
                                false
                            }
                            HealthStatus::Degraded => {
                                // Don't increment for degraded
                                false
                            }
                            HealthStatus::Unhealthy | HealthStatus::Unresponsive => {
                                *count += 1;

                                // Check if failover is needed
                                let current_policy = manager_policy.read();
                                *count >= current_policy.failure_threshold
                                    && current_policy.auto_failover
                            }
                        }
                    }; // Lock is dropped here

                    // Trigger failover if needed (after lock is dropped)
                    if should_failover {
                        if let Err(e) =
                            Self::trigger_failover(&replica_set, &failover_in_progress).await
                        {
                            tracing::error!("Failover failed: {}", e);
                        }
                    }
                }

                // Trim health history to last 1000 entries
                let mut history = health_history.write();
                let len = history.len();
                if len > 1000 {
                    history.drain(0..len - 1000);
                }
            }
        });
    }

    /// Check health of a specific replica
    async fn check_replica_health(
        replica_set: &Arc<RwLock<ReplicaSet>>,
        replica_id: &str,
        timeout: Duration,
    ) -> HealthCheck {
        // In a real implementation, this would make a network call
        // For now, we simulate health checks based on replica status

        let replica = {
            let set = replica_set.read();
            set.get_replica(replica_id)
        };

        match replica {
            Some(replica) => {
                if replica.is_timed_out(timeout) {
                    HealthCheck::unresponsive(replica_id.to_string())
                } else if replica.is_healthy() {
                    HealthCheck::healthy(replica_id.to_string(), 10)
                } else {
                    HealthCheck::unhealthy(replica_id.to_string(), "Replica is lagging".to_string())
                }
            }
            None => HealthCheck::unhealthy(replica_id.to_string(), "Replica not found".to_string()),
        }
    }

    /// Trigger failover to a healthy secondary
    async fn trigger_failover(
        replica_set: &Arc<RwLock<ReplicaSet>>,
        failover_in_progress: &Arc<RwLock<bool>>,
    ) -> Result<()> {
        // Check if failover is already in progress
        {
            let mut in_progress = failover_in_progress.write();
            if *in_progress {
                return Ok(());
            }
            *in_progress = true;
        }

        tracing::warn!("Initiating failover");

        // Find candidate within a scope to drop the lock before await
        let candidate_id = {
            let set = replica_set.read();

            // Check quorum
            if !set.has_quorum() {
                *failover_in_progress.write() = false;
                return Err(ReplicationError::QuorumNotMet {
                    needed: set.get_quorum_size(),
                    available: set.get_healthy_replicas().len(),
                });
            }

            // Find best candidate for promotion. Clear the in-progress flag
            // before propagating an error so later attempts are not blocked.
            match Self::select_failover_candidate(&set) {
                Ok(candidate) => candidate.id.clone(),
                Err(e) => {
                    *failover_in_progress.write() = false;
                    return Err(e);
                }
            }
        }; // Read lock is dropped here
        // Promote the candidate (write lock re-acquired just for the promotion)
        let result = {
            let mut set = replica_set.write();
            set.promote_to_primary(&candidate_id)
        };

        match &result {
            Ok(()) => tracing::info!("Failover completed: promoted {} to primary", candidate_id),
            Err(e) => tracing::error!("Failover failed: {}", e),
        }

        // Clear failover flag
        *failover_in_progress.write() = false;

        result
    }

    /// Select the best candidate for failover
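    ///
    /// Considers only healthy secondaries, preferring the highest `priority`
    /// and breaking ties by the lowest `lag_ms`.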
    fn select_failover_candidate(replica_set: &ReplicaSet) -> Result<Replica> {
        let mut candidates: Vec<Replica> = replica_set
            .get_healthy_replicas()
            .into_iter()
            .filter(|r| r.role == ReplicaRole::Secondary)
            .collect();

        if candidates.is_empty() {
            return Err(ReplicationError::FailoverFailed(
                "No healthy secondary replicas available".to_string(),
            ));
        }

        // Sort by priority (highest first), then by lowest lag
        candidates.sort_by(|a, b| b.priority.cmp(&a.priority).then(a.lag_ms.cmp(&b.lag_ms)));

        Ok(candidates[0].clone())
    }

    /// Manually trigger failover
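    ///
    /// Promotes the given replica, or the best healthy secondary when no
    /// target is supplied. A sketch of a call site (setup elided; see the
    /// module-level example):
    ///
    /// ```rust,ignore
    /// // Promote "r2" explicitly, or pass `None` to let the manager choose.
    /// if let Err(e) = manager.manual_failover(Some("r2".to_string())).await {
    ///     eprintln!("manual failover failed: {e}");
    /// }
    /// ```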
    pub async fn manual_failover(&self, target_replica_id: Option<String>) -> Result<()> {
        let mut set = self.replica_set.write();

        // Check quorum
        if !set.has_quorum() {
            return Err(ReplicationError::QuorumNotMet {
                needed: set.get_quorum_size(),
                available: set.get_healthy_replicas().len(),
            });
        }

        let target = if let Some(id) = target_replica_id {
            set.get_replica(&id)
                .ok_or_else(|| ReplicationError::ReplicaNotFound(id))?
        } else {
            Self::select_failover_candidate(&set)?
        };

        set.promote_to_primary(&target.id)?;

        tracing::info!(
            "Manual failover completed: promoted {} to primary",
            target.id
        );
        Ok(())
    }

    /// Get health check history
    pub fn health_history(&self) -> Vec<HealthCheck> {
        self.health_history.read().clone()
    }

    /// Get recent health status for a replica
    pub fn recent_health(&self, replica_id: &str, limit: usize) -> Vec<HealthCheck> {
        let history = self.health_history.read();
        history
            .iter()
            .rev()
            .filter(|h| h.replica_id == replica_id)
            .take(limit)
            .cloned()
            .collect()
    }

    /// Check if failover is currently in progress
    pub fn is_failover_in_progress(&self) -> bool {
        *self.failover_in_progress.read()
    }

    /// Get failure count for a replica
    pub fn failure_count(&self, replica_id: &str) -> usize {
        self.failure_counts
            .read()
            .get(replica_id)
            .copied()
            .unwrap_or(0)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_check() {
        let check = HealthCheck::healthy("r1".to_string(), 15);
        assert_eq!(check.status, HealthStatus::Healthy);
        assert_eq!(check.response_time_ms, 15);

        let check = HealthCheck::unhealthy("r2".to_string(), "Error".to_string());
        assert_eq!(check.status, HealthStatus::Unhealthy);
        assert!(check.error.is_some());
    }

    #[test]
    fn test_failover_policy() {
        let policy = FailoverPolicy::default();
        assert!(policy.auto_failover);
        assert_eq!(policy.failure_threshold, 3);
    }

    #[test]
    fn test_failover_manager() {
        let mut replica_set = ReplicaSet::new("cluster-1");
        replica_set
            .add_replica("r1", "127.0.0.1:9001", ReplicaRole::Primary)
            .unwrap();
        replica_set
            .add_replica("r2", "127.0.0.1:9002", ReplicaRole::Secondary)
            .unwrap();

        let manager = FailoverManager::new(Arc::new(RwLock::new(replica_set)));
        assert!(!manager.is_failover_in_progress());
    }

    #[test]
    fn test_candidate_selection() {
        let mut replica_set = ReplicaSet::new("cluster-1");
        replica_set
            .add_replica("r1", "127.0.0.1:9001", ReplicaRole::Primary)
            .unwrap();
        replica_set
            .add_replica("r2", "127.0.0.1:9002", ReplicaRole::Secondary)
            .unwrap();
        replica_set
            .add_replica("r3", "127.0.0.1:9003", ReplicaRole::Secondary)
            .unwrap();

        let candidate = FailoverManager::select_failover_candidate(&replica_set).unwrap();
        assert_eq!(candidate.role, ReplicaRole::Secondary);
        assert!(candidate.is_healthy());
    }
}