use crate::{Replica, ReplicaRole, ReplicaSet, ReplicationError, Result};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;

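/// Health state of a replica as observed by the failover monitor.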
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
    Unresponsive,
}

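/// Result of a single health check against one replica.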
#[derive(Debug, Clone)]
pub struct HealthCheck {
    pub replica_id: String,
    pub status: HealthStatus,
    pub response_time_ms: u64,
    pub error: Option<String>,
    pub timestamp: DateTime<Utc>,
}

impl HealthCheck {
    pub fn healthy(replica_id: String, response_time_ms: u64) -> Self {
        Self {
            replica_id,
            status: HealthStatus::Healthy,
            response_time_ms,
            error: None,
            timestamp: Utc::now(),
        }
    }

    pub fn unhealthy(replica_id: String, error: String) -> Self {
        Self {
            replica_id,
            status: HealthStatus::Unhealthy,
            response_time_ms: 0,
            error: Some(error),
            timestamp: Utc::now(),
        }
    }

    pub fn unresponsive(replica_id: String) -> Self {
        Self {
            replica_id,
            status: HealthStatus::Unresponsive,
            response_time_ms: 0,
            error: Some("No response".to_string()),
            timestamp: Utc::now(),
        }
    }
}

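/// Settings that govern health checking and automatic failover.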
#[derive(Debug, Clone)]
pub struct FailoverPolicy {
    pub auto_failover: bool,
    pub health_check_interval: Duration,
    pub health_check_timeout: Duration,
    pub failure_threshold: usize,
    pub min_quorum: usize,
    pub prevent_split_brain: bool,
}

impl Default for FailoverPolicy {
    fn default() -> Self {
        Self {
            auto_failover: true,
            health_check_interval: Duration::from_secs(5),
            health_check_timeout: Duration::from_secs(2),
            failure_threshold: 3,
            min_quorum: 2,
            prevent_split_brain: true,
        }
    }
}

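/// Monitors replica health and coordinates automatic and manual failover
/// for a shared [`ReplicaSet`].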
pub struct FailoverManager {
    replica_set: Arc<RwLock<ReplicaSet>>,
    policy: Arc<RwLock<FailoverPolicy>>,
    health_history: Arc<RwLock<Vec<HealthCheck>>>,
    failure_counts: Arc<RwLock<std::collections::HashMap<String, usize>>>,
    failover_in_progress: Arc<RwLock<bool>>,
}

impl FailoverManager {
    pub fn new(replica_set: Arc<RwLock<ReplicaSet>>) -> Self {
        Self {
            replica_set,
            policy: Arc::new(RwLock::new(FailoverPolicy::default())),
            health_history: Arc::new(RwLock::new(Vec::new())),
            failure_counts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            failover_in_progress: Arc::new(RwLock::new(false)),
        }
    }

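    /// Creates a manager with a caller-supplied policy instead of the default.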
    pub fn with_policy(replica_set: Arc<RwLock<ReplicaSet>>, policy: FailoverPolicy) -> Self {
        Self {
            replica_set,
            policy: Arc::new(RwLock::new(policy)),
            health_history: Arc::new(RwLock::new(Vec::new())),
            failure_counts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            failover_in_progress: Arc::new(RwLock::new(false)),
        }
    }

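    /// Replaces the current failover policy.
    ///
    /// A monitoring task started earlier keeps the check interval and timeout
    /// it captured at startup; only the failure threshold and auto-failover
    /// flag are re-read on each check.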
    pub fn set_policy(&self, policy: FailoverPolicy) {
        *self.policy.write() = policy;
    }

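    /// Returns a snapshot of the current failover policy.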
    pub fn policy(&self) -> FailoverPolicy {
        self.policy.read().clone()
    }

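    /// Spawns a background task that periodically checks every replica,
    /// records the result, and triggers failover once a replica reaches the
    /// configured failure threshold while auto-failover is enabled. The
    /// in-memory history is capped at the most recent 1,000 checks, and the
    /// spawned task runs for the life of the process.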
    pub async fn start_monitoring(&self) {
        let policy = self.policy.read().clone();
        let replica_set = self.replica_set.clone();
        let health_history = self.health_history.clone();
        let failure_counts = self.failure_counts.clone();
        let failover_in_progress = self.failover_in_progress.clone();
        let manager_policy = self.policy.clone();

        tokio::spawn(async move {
            let mut interval_timer = interval(policy.health_check_interval);

            loop {
                interval_timer.tick().await;

                let replica_ids = {
                    let set = replica_set.read();
                    set.replica_ids()
                };

                for replica_id in replica_ids {
                    let health = Self::check_replica_health(
                        &replica_set,
                        &replica_id,
                        policy.health_check_timeout,
                    )
                    .await;

                    health_history.write().push(health.clone());

                    let should_failover = {
                        let mut counts = failure_counts.write();
                        let count = counts.entry(replica_id.clone()).or_insert(0);

                        match health.status {
                            HealthStatus::Healthy => {
                                *count = 0;
                                false
                            }
                            HealthStatus::Degraded => {
                                false
                            }
                            HealthStatus::Unhealthy | HealthStatus::Unresponsive => {
                                *count += 1;

                                let current_policy = manager_policy.read();
                                *count >= current_policy.failure_threshold
                                    && current_policy.auto_failover
                            }
                        }
                    };

                    if should_failover {
                        if let Err(e) =
                            Self::trigger_failover(&replica_set, &failover_in_progress).await
                        {
                            tracing::error!("Failover failed: {}", e);
                        }
                    }
                }

                let mut history = health_history.write();
                let len = history.len();
                if len > 1000 {
                    history.drain(0..len - 1000);
                }
            }
        });
    }

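    /// Derives a replica's health from its recorded state rather than an
    /// active network probe; the response time reported for a healthy replica
    /// is a nominal placeholder.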
    async fn check_replica_health(
        replica_set: &Arc<RwLock<ReplicaSet>>,
        replica_id: &str,
        timeout: Duration,
    ) -> HealthCheck {
        let replica = {
            let set = replica_set.read();
            set.get_replica(replica_id)
        };

        match replica {
            Some(replica) => {
                if replica.is_timed_out(timeout) {
                    HealthCheck::unresponsive(replica_id.to_string())
                } else if replica.is_healthy() {
                    HealthCheck::healthy(replica_id.to_string(), 10)
                } else {
                    HealthCheck::unhealthy(replica_id.to_string(), "Replica is lagging".to_string())
                }
            }
            None => HealthCheck::unhealthy(replica_id.to_string(), "Replica not found".to_string()),
        }
    }

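    /// Promotes the best healthy secondary to primary, guarding against
    /// concurrent failovers and refusing to act without quorum.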
    async fn trigger_failover(
        replica_set: &Arc<RwLock<ReplicaSet>>,
        failover_in_progress: &Arc<RwLock<bool>>,
    ) -> Result<()> {
        {
            let mut in_progress = failover_in_progress.write();
            if *in_progress {
                return Ok(());
            }
            *in_progress = true;
        }

        tracing::warn!("Initiating failover");

        let candidate_id = {
            let set = replica_set.read();

            if !set.has_quorum() {
                *failover_in_progress.write() = false;
                return Err(ReplicationError::QuorumNotMet {
                    needed: set.get_quorum_size(),
                    available: set.get_healthy_replicas().len(),
                });
            }

            // Clear the in-progress flag on this error path as well, so a
            // failed candidate selection cannot block future failover attempts.
            let candidate = match Self::select_failover_candidate(&set) {
                Ok(candidate) => candidate,
                Err(e) => {
                    *failover_in_progress.write() = false;
                    return Err(e);
                }
            };
            candidate.id.clone()
        };

        let result = {
            let mut set = replica_set.write();
            set.promote_to_primary(&candidate_id)
        };

        match &result {
            Ok(()) => tracing::info!("Failover completed: promoted {} to primary", candidate_id),
            Err(e) => tracing::error!("Failover failed: {}", e),
        }

        *failover_in_progress.write() = false;

        result
    }

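    /// Picks the healthy secondary with the highest priority, breaking ties
    /// by the lowest replication lag.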
    fn select_failover_candidate(replica_set: &ReplicaSet) -> Result<Replica> {
        let mut candidates: Vec<Replica> = replica_set
            .get_healthy_replicas()
            .into_iter()
            .filter(|r| r.role == ReplicaRole::Secondary)
            .collect();

        if candidates.is_empty() {
            return Err(ReplicationError::FailoverFailed(
                "No healthy secondary replicas available".to_string(),
            ));
        }

        candidates.sort_by(|a, b| b.priority.cmp(&a.priority).then(a.lag_ms.cmp(&b.lag_ms)));

        Ok(candidates[0].clone())
    }

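    /// Fails over on demand, either to a specific replica or to the best
    /// available candidate. Requires quorum.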
    pub async fn manual_failover(&self, target_replica_id: Option<String>) -> Result<()> {
        let mut set = self.replica_set.write();

        if !set.has_quorum() {
            return Err(ReplicationError::QuorumNotMet {
                needed: set.get_quorum_size(),
                available: set.get_healthy_replicas().len(),
            });
        }

        let target = if let Some(id) = target_replica_id {
            set.get_replica(&id)
                .ok_or_else(|| ReplicationError::ReplicaNotFound(id))?
        } else {
            Self::select_failover_candidate(&set)?
        };

        set.promote_to_primary(&target.id)?;

        tracing::info!(
            "Manual failover completed: promoted {} to primary",
            target.id
        );
        Ok(())
    }

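    /// Returns a copy of the recorded health-check history.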
    pub fn health_history(&self) -> Vec<HealthCheck> {
        self.health_history.read().clone()
    }

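    /// Returns up to `limit` of the most recent health checks for one replica,
    /// newest first.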
    pub fn recent_health(&self, replica_id: &str, limit: usize) -> Vec<HealthCheck> {
        let history = self.health_history.read();
        history
            .iter()
            .rev()
            .filter(|h| h.replica_id == replica_id)
            .take(limit)
            .cloned()
            .collect()
    }

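    /// Reports whether an automatic failover is currently underway.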
    pub fn is_failover_in_progress(&self) -> bool {
        *self.failover_in_progress.read()
    }

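    /// Returns the number of consecutive failed health checks recorded for a
    /// replica; zero if it has never failed or has since recovered.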
    pub fn failure_count(&self, replica_id: &str) -> usize {
        self.failure_counts
            .read()
            .get(replica_id)
            .copied()
            .unwrap_or(0)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_check() {
        let check = HealthCheck::healthy("r1".to_string(), 15);
        assert_eq!(check.status, HealthStatus::Healthy);
        assert_eq!(check.response_time_ms, 15);

        let check = HealthCheck::unhealthy("r2".to_string(), "Error".to_string());
        assert_eq!(check.status, HealthStatus::Unhealthy);
        assert!(check.error.is_some());
    }

    #[test]
    fn test_failover_policy() {
        let policy = FailoverPolicy::default();
        assert!(policy.auto_failover);
        assert_eq!(policy.failure_threshold, 3);
    }

    #[test]
    fn test_failover_manager() {
        let mut replica_set = ReplicaSet::new("cluster-1");
        replica_set
            .add_replica("r1", "127.0.0.1:9001", ReplicaRole::Primary)
            .unwrap();
        replica_set
            .add_replica("r2", "127.0.0.1:9002", ReplicaRole::Secondary)
            .unwrap();

        let manager = FailoverManager::new(Arc::new(RwLock::new(replica_set)));
        assert!(!manager.is_failover_in_progress());
    }

    #[test]
    fn test_candidate_selection() {
        let mut replica_set = ReplicaSet::new("cluster-1");
        replica_set
            .add_replica("r1", "127.0.0.1:9001", ReplicaRole::Primary)
            .unwrap();
        replica_set
            .add_replica("r2", "127.0.0.1:9002", ReplicaRole::Secondary)
            .unwrap();
        replica_set
            .add_replica("r3", "127.0.0.1:9003", ReplicaRole::Secondary)
            .unwrap();

        let candidate = FailoverManager::select_failover_candidate(&replica_set).unwrap();
        assert!(candidate.role == ReplicaRole::Secondary);
        assert!(candidate.is_healthy());
    }
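
    // A minimal extra check, exercising only APIs defined in this module:
    // a custom policy supplied via `with_policy`, plus the defaults reported
    // for a replica that has never been health-checked. The test name and
    // values here are illustrative.
    #[test]
    fn test_policy_override_and_defaults() {
        let replica_set = Arc::new(RwLock::new(ReplicaSet::new("cluster-1")));
        let policy = FailoverPolicy {
            failure_threshold: 5,
            ..FailoverPolicy::default()
        };
        let manager = FailoverManager::with_policy(replica_set, policy);

        assert_eq!(manager.policy().failure_threshold, 5);
        assert_eq!(manager.failure_count("r1"), 0);
        assert!(manager.recent_health("r1", 10).is_empty());
        assert!(manager.health_history().is_empty());
    }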
}