use crate::error::{MetricsError, Result};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};

pub use super::config::{FaultToleranceConfig, NodeReplacementStrategy};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecoveryActionType {
    NodeFailover,
    DataReplication,
    NetworkHeal,
    ServiceRestart,
    ResourceScaling,
    ConfigRollback,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeHealthStatus {
    Healthy,
    Degraded,
    Failed,
    Unknown,
    Recovering,
    Maintenance,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Info,
    Warning,
    Error,
    Critical,
    Emergency,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RecoveryStrategy {
    Immediate,
    Graceful { delay: Duration },
    Manual,
    AutomaticWithFallback,
    Progressive,
}

#[derive(Debug)]
pub struct FaultRecoveryManager {
    config: FaultToleranceConfig,
    health_monitor: HealthMonitor,
    recovery_history: Arc<RwLock<VecDeque<RecoveryAction>>>,
    alert_thresholds: AlertThresholds,
    active_recoveries: Arc<Mutex<HashMap<String, RecoveryOperation>>>,
    replacement_queue: Arc<Mutex<VecDeque<NodeReplacementRequest>>>,
}

impl FaultRecoveryManager {
    pub fn new(config: FaultToleranceConfig) -> Self {
        Self {
            health_monitor: HealthMonitor::new(config.health_check_interval),
            alert_thresholds: AlertThresholds::default(),
            recovery_history: Arc::new(RwLock::new(VecDeque::new())),
            active_recoveries: Arc::new(Mutex::new(HashMap::new())),
            replacement_queue: Arc::new(Mutex::new(VecDeque::new())),
            config,
        }
    }

    pub fn start(&mut self) -> Result<()> {
        self.health_monitor.start()?;
        Ok(())
    }

    pub fn stop(&mut self) -> Result<()> {
        self.health_monitor.stop()?;
        Ok(())
    }

    pub fn register_node(&mut self, node_id: String, metrics: NodeMetrics) -> Result<()> {
        self.health_monitor.register_node(node_id, metrics)
    }

    pub fn unregister_node(&mut self, node_id: &str) -> Result<()> {
        self.health_monitor.unregister_node(node_id)
    }

    pub fn update_node_metrics(&mut self, node_id: &str, metrics: NodeMetrics) -> Result<()> {
        self.health_monitor
            .update_metrics(node_id, metrics.clone())?;

        if let Some(action) = self.evaluate_recovery_need(node_id, &metrics)? {
            self.trigger_recovery(action)?;
        }

        Ok(())
    }

    fn evaluate_recovery_need(
        &self,
        node_id: &str,
        metrics: &NodeMetrics,
    ) -> Result<Option<RecoveryAction>> {
        // Use wall-clock milliseconds for unique action IDs; `Instant::now().elapsed()`
        // would always be (approximately) zero.
        let now_millis = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();

        if metrics.cpu_usage > self.alert_thresholds.cpu_critical {
            return Ok(Some(RecoveryAction {
                id: format!("recovery_{}_cpu_{}", node_id, now_millis),
                action_type: RecoveryActionType::ResourceScaling,
                target_node: node_id.to_string(),
                severity: AlertSeverity::Critical,
                description: format!("High CPU usage: {}%", metrics.cpu_usage),
                strategy: RecoveryStrategy::Immediate,
                created_at: SystemTime::now(),
                started_at: None,
                completed_at: None,
                status: RecoveryStatus::Pending,
                error: None,
            }));
        }

        if metrics.memory_usage > self.alert_thresholds.memory_critical {
            return Ok(Some(RecoveryAction {
                id: format!("recovery_{}_memory_{}", node_id, now_millis),
                action_type: RecoveryActionType::ResourceScaling,
                target_node: node_id.to_string(),
                severity: AlertSeverity::Critical,
                description: format!("High memory usage: {}%", metrics.memory_usage),
                strategy: RecoveryStrategy::Immediate,
                created_at: SystemTime::now(),
                started_at: None,
                completed_at: None,
                status: RecoveryStatus::Pending,
                error: None,
            }));
        }

        let last_heartbeat_age = metrics.last_heartbeat.elapsed().unwrap_or_default();
        if last_heartbeat_age > Duration::from_secs(self.config.health_check_interval * 3) {
            return Ok(Some(RecoveryAction {
                id: format!("recovery_{}_heartbeat_{}", node_id, now_millis),
                action_type: RecoveryActionType::NodeFailover,
                target_node: node_id.to_string(),
                severity: AlertSeverity::Emergency,
                description: format!("Node unresponsive for {:?}", last_heartbeat_age),
                strategy: RecoveryStrategy::Immediate,
                created_at: SystemTime::now(),
                started_at: None,
                completed_at: None,
                status: RecoveryStatus::Pending,
                error: None,
            }));
        }

        Ok(None)
    }

    pub fn trigger_recovery(&mut self, action: RecoveryAction) -> Result<String> {
        if !self.config.auto_recovery && action.strategy != RecoveryStrategy::Manual {
            self.log_action(action);
            return Ok(
                "Recovery action logged but not executed (auto recovery disabled)".to_string(),
            );
        }

        let action_id = action.id.clone();
        let mut active_recoveries = self.active_recoveries.lock().expect("lock poisoned");

        let recovery_op = RecoveryOperation {
            action: action.clone(),
            progress: 0.0,
            estimated_completion: None,
        };

        active_recoveries.insert(action_id.clone(), recovery_op);
        drop(active_recoveries);

        match action.action_type {
            RecoveryActionType::NodeFailover => {
                self.execute_node_failover(&action)?;
            }
            RecoveryActionType::DataReplication => {
                self.execute_data_replication(&action)?;
            }
            RecoveryActionType::NetworkHeal => {
                self.execute_network_heal(&action)?;
            }
            RecoveryActionType::ServiceRestart => {
                self.execute_service_restart(&action)?;
            }
            RecoveryActionType::ResourceScaling => {
                self.execute_resource_scaling(&action)?;
            }
            RecoveryActionType::ConfigRollback => {
                self.execute_config_rollback(&action)?;
            }
        }

        self.log_action(action);
        Ok(action_id)
    }

    fn execute_node_failover(&self, action: &RecoveryAction) -> Result<()> {
        println!("Executing node failover for node: {}", action.target_node);

        match self.config.replacement_strategy {
            NodeReplacementStrategy::Immediate | NodeReplacementStrategy::HotStandby => {
                let mut queue = self.replacement_queue.lock().expect("lock poisoned");
                queue.push_back(NodeReplacementRequest {
                    failed_node: action.target_node.clone(),
                    replacement_type: self.config.replacement_strategy.clone(),
                    requested_at: SystemTime::now(),
                    priority: match action.severity {
                        AlertSeverity::Emergency | AlertSeverity::Critical => {
                            ReplacementPriority::High
                        }
                        _ => ReplacementPriority::Normal,
                    },
                });
            }
            _ => {}
        }

        Ok(())
    }

    fn execute_data_replication(&self, action: &RecoveryAction) -> Result<()> {
        println!(
            "Executing data replication for node: {}",
            action.target_node
        );
        Ok(())
    }

    fn execute_network_heal(&self, action: &RecoveryAction) -> Result<()> {
        println!("Executing network heal for node: {}", action.target_node);
        Ok(())
    }

    fn execute_service_restart(&self, action: &RecoveryAction) -> Result<()> {
        println!("Executing service restart for node: {}", action.target_node);
        Ok(())
    }

    fn execute_resource_scaling(&self, action: &RecoveryAction) -> Result<()> {
        println!(
            "Executing resource scaling for node: {}",
            action.target_node
        );
        Ok(())
    }

    fn execute_config_rollback(&self, action: &RecoveryAction) -> Result<()> {
        println!("Executing config rollback for node: {}", action.target_node);
        Ok(())
    }

    fn log_action(&self, action: RecoveryAction) {
        let mut history = self.recovery_history.write().expect("lock poisoned");
        history.push_back(action);

        // Cap the in-memory history to avoid unbounded growth.
        while history.len() > 10_000 {
            history.pop_front();
        }
    }

    pub fn get_recovery_history(&self) -> Vec<RecoveryAction> {
        let history = self.recovery_history.read().expect("lock poisoned");
        history.iter().cloned().collect()
    }

    pub fn get_active_recoveries(&self) -> Vec<RecoveryOperation> {
        let active = self.active_recoveries.lock().expect("lock poisoned");
        active.values().cloned().collect()
    }

    pub fn complete_recovery(
        &mut self,
        action_id: &str,
        success: bool,
        error: Option<String>,
    ) -> Result<()> {
        let mut active_recoveries = self.active_recoveries.lock().expect("lock poisoned");

        if let Some(mut recovery_op) = active_recoveries.remove(action_id) {
            recovery_op.action.completed_at = Some(SystemTime::now());
            recovery_op.action.status = if success {
                RecoveryStatus::Completed
            } else {
                RecoveryStatus::Failed
            };
            recovery_op.action.error = error;
            recovery_op.progress = 1.0;

            self.log_action(recovery_op.action);
        }

        Ok(())
    }

    pub fn get_health_summary(&self) -> HealthSummary {
        self.health_monitor.get_health_summary()
    }

    pub fn update_alert_thresholds(&mut self, thresholds: AlertThresholds) {
        self.alert_thresholds = thresholds;
    }

    pub fn process_replacement_requests(&mut self) -> Result<Vec<NodeReplacementRequest>> {
        let mut queue = self.replacement_queue.lock().expect("lock poisoned");
        let requests: Vec<_> = queue.drain(..).collect();
        Ok(requests)
    }
}
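
// Minimal usage sketch (comment only, not compiled here). The exact
// `FaultToleranceConfig::default()` values live in `super::config`, so treat
// the flow below as an assumption about a typical configuration:
//
//     let mut manager = FaultRecoveryManager::new(FaultToleranceConfig::default());
//     manager.start()?;
//     manager.register_node("node1".to_string(), NodeMetrics::healthy())?;
//     manager.update_node_metrics("node1", NodeMetrics::degraded())?;
//     let _pending = manager.process_replacement_requests()?;
//     manager.stop()?;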

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryAction {
    pub id: String,
    pub action_type: RecoveryActionType,
    pub target_node: String,
    pub severity: AlertSeverity,
    pub description: String,
    pub strategy: RecoveryStrategy,
    pub created_at: SystemTime,
    pub started_at: Option<SystemTime>,
    pub completed_at: Option<SystemTime>,
    pub status: RecoveryStatus,
    pub error: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RecoveryStatus {
    Pending,
    InProgress,
    Completed,
    Failed,
    Cancelled,
}

#[derive(Debug, Clone)]
pub struct RecoveryOperation {
    pub action: RecoveryAction,
    pub progress: f64,
    pub estimated_completion: Option<SystemTime>,
}

#[derive(Debug)]
pub struct HealthMonitor {
    nodes: Arc<RwLock<HashMap<String, NodeMonitoringInfo>>>,
    /// Health-check interval in seconds.
    check_interval: u64,
    is_monitoring: Arc<RwLock<bool>>,
}

impl HealthMonitor {
    pub fn new(check_interval: u64) -> Self {
        Self {
            nodes: Arc::new(RwLock::new(HashMap::new())),
            check_interval,
            is_monitoring: Arc::new(RwLock::new(false)),
        }
    }

    pub fn start(&mut self) -> Result<()> {
        let mut is_monitoring = self.is_monitoring.write().expect("lock poisoned");
        *is_monitoring = true;

        Ok(())
    }

    pub fn stop(&mut self) -> Result<()> {
        let mut is_monitoring = self.is_monitoring.write().expect("lock poisoned");
        *is_monitoring = false;

        Ok(())
    }

    pub fn register_node(&mut self, node_id: String, metrics: NodeMetrics) -> Result<()> {
        let mut nodes = self.nodes.write().expect("lock poisoned");

        let monitoring_info = NodeMonitoringInfo {
            node_id: node_id.clone(),
            current_metrics: metrics,
            health_status: NodeHealthStatus::Healthy,
            last_check: Instant::now(),
            failure_count: 0,
            recovery_attempts: 0,
            alerts: VecDeque::new(),
        };

        nodes.insert(node_id, monitoring_info);
        Ok(())
    }

    pub fn unregister_node(&mut self, node_id: &str) -> Result<()> {
        let mut nodes = self.nodes.write().expect("lock poisoned");
        nodes.remove(node_id);
        Ok(())
    }

    pub fn update_metrics(&mut self, node_id: &str, metrics: NodeMetrics) -> Result<()> {
        let mut nodes = self.nodes.write().expect("lock poisoned");

        if let Some(monitoring_info) = nodes.get_mut(node_id) {
            monitoring_info.current_metrics = metrics;
            monitoring_info.last_check = Instant::now();
            monitoring_info.health_status =
                self.determine_health_status(&monitoring_info.current_metrics);
        } else {
            return Err(MetricsError::FaultToleranceError(format!(
                "Node {} not registered for monitoring",
                node_id
            )));
        }

        Ok(())
    }

    fn determine_health_status(&self, metrics: &NodeMetrics) -> NodeHealthStatus {
        // A node that has missed three consecutive check intervals is considered failed.
        let heartbeat_age = metrics.last_heartbeat.elapsed().unwrap_or_default();
        if heartbeat_age > Duration::from_secs(self.check_interval * 3) {
            return NodeHealthStatus::Failed;
        }

        // Both the warning (85%) and critical (95%) resource levels map to Degraded here;
        // escalation to recovery actions is decided separately by the recovery manager.
        if metrics.cpu_usage > 85.0 || metrics.memory_usage > 85.0 {
            return NodeHealthStatus::Degraded;
        }

        NodeHealthStatus::Healthy
    }

    pub fn get_health_summary(&self) -> HealthSummary {
        let nodes = self.nodes.read().expect("lock poisoned");

        let mut summary = HealthSummary {
            total_nodes: nodes.len(),
            healthy_nodes: 0,
            degraded_nodes: 0,
            failed_nodes: 0,
            unknown_nodes: 0,
            recovering_nodes: 0,
            maintenance_nodes: 0,
            last_updated: SystemTime::now(),
        };

        for monitoring_info in nodes.values() {
            match monitoring_info.health_status {
                NodeHealthStatus::Healthy => summary.healthy_nodes += 1,
                NodeHealthStatus::Degraded => summary.degraded_nodes += 1,
                NodeHealthStatus::Failed => summary.failed_nodes += 1,
                NodeHealthStatus::Unknown => summary.unknown_nodes += 1,
                NodeHealthStatus::Recovering => summary.recovering_nodes += 1,
                NodeHealthStatus::Maintenance => summary.maintenance_nodes += 1,
            }
        }

        summary
    }

    pub fn get_node_health(&self, node_id: &str) -> Option<NodeHealthStatus> {
        let nodes = self.nodes.read().expect("lock poisoned");
        nodes.get(node_id).map(|info| info.health_status.clone())
    }

    pub fn list_nodes(&self) -> Vec<String> {
        let nodes = self.nodes.read().expect("lock poisoned");
        nodes.keys().cloned().collect()
    }
}

#[derive(Debug, Clone)]
pub struct NodeMonitoringInfo {
    pub node_id: String,
    pub current_metrics: NodeMetrics,
    pub health_status: NodeHealthStatus,
    pub last_check: Instant,
    pub failure_count: usize,
    pub recovery_attempts: usize,
    pub alerts: VecDeque<Alert>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeMetrics {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub disk_usage: f64,
    pub network_usage: f64,
    pub active_connections: usize,
    pub response_time_ms: f64,
    pub error_rate: f64,
    pub last_heartbeat: SystemTime,
    pub custom_metrics: HashMap<String, f64>,
}

impl NodeMetrics {
    pub fn healthy() -> Self {
        Self {
            cpu_usage: 10.0,
            memory_usage: 20.0,
            disk_usage: 30.0,
            network_usage: 5.0,
            active_connections: 10,
            response_time_ms: 50.0,
            error_rate: 0.001,
            last_heartbeat: SystemTime::now(),
            custom_metrics: HashMap::new(),
        }
    }

    pub fn degraded() -> Self {
        Self {
            cpu_usage: 90.0,
            memory_usage: 80.0,
            disk_usage: 70.0,
            network_usage: 60.0,
            active_connections: 100,
            response_time_ms: 500.0,
            error_rate: 0.05,
            last_heartbeat: SystemTime::now(),
            custom_metrics: HashMap::new(),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    // CPU and memory thresholds are percentages, response times are in
    // milliseconds, and error rates are fractions of requests (0.01 = 1%).
    pub cpu_warning: f64,
    pub cpu_critical: f64,
    pub memory_warning: f64,
    pub memory_critical: f64,
    pub response_time_warning: f64,
    pub response_time_critical: f64,
    pub error_rate_warning: f64,
    pub error_rate_critical: f64,
}

impl Default for AlertThresholds {
    fn default() -> Self {
        Self {
            cpu_warning: 80.0,
            cpu_critical: 95.0,
            memory_warning: 80.0,
            memory_critical: 95.0,
            response_time_warning: 1000.0,
            response_time_critical: 5000.0,
            error_rate_warning: 0.01,
            error_rate_critical: 0.05,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSummary {
    pub total_nodes: usize,
    pub healthy_nodes: usize,
    pub degraded_nodes: usize,
    pub failed_nodes: usize,
    pub unknown_nodes: usize,
    pub recovering_nodes: usize,
    pub maintenance_nodes: usize,
    pub last_updated: SystemTime,
}

impl HealthSummary {
    pub fn health_percentage(&self) -> f64 {
        if self.total_nodes == 0 {
            return 100.0;
        }

        (self.healthy_nodes as f64 / self.total_nodes as f64) * 100.0
    }

    pub fn is_healthy(&self) -> bool {
        // Overall healthy: no failed nodes and no more than total_nodes / 10 degraded nodes.
        self.failed_nodes == 0 && self.degraded_nodes <= (self.total_nodes / 10)
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    pub id: String,
    pub severity: AlertSeverity,
    pub message: String,
    pub timestamp: SystemTime,
    pub acknowledged: bool,
}

#[derive(Debug, Clone)]
pub struct NodeReplacementRequest {
    pub failed_node: String,
    pub replacement_type: NodeReplacementStrategy,
    pub requested_at: SystemTime,
    pub priority: ReplacementPriority,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ReplacementPriority {
    Low,
    Normal,
    High,
    Emergency,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fault_recovery_manager_creation() {
        let config = FaultToleranceConfig::default();
        let manager = FaultRecoveryManager::new(config);
        assert_eq!(manager.get_recovery_history().len(), 0);
    }
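
    // Added sketch: exercises `complete_recovery` by seeding an active recovery
    // directly (the test module can reach the private field), so the test does
    // not depend on the auto-recovery setting in `FaultToleranceConfig::default()`.
    #[test]
    fn test_complete_recovery_moves_action_to_history() {
        let mut manager = FaultRecoveryManager::new(FaultToleranceConfig::default());
        let action = RecoveryAction {
            id: "recovery_test".to_string(),
            action_type: RecoveryActionType::ServiceRestart,
            target_node: "node1".to_string(),
            severity: AlertSeverity::Warning,
            description: "Test recovery action".to_string(),
            strategy: RecoveryStrategy::Manual,
            created_at: SystemTime::now(),
            started_at: None,
            completed_at: None,
            status: RecoveryStatus::InProgress,
            error: None,
        };
        let operation = RecoveryOperation {
            action,
            progress: 0.5,
            estimated_completion: None,
        };
        manager
            .active_recoveries
            .lock()
            .expect("lock poisoned")
            .insert("recovery_test".to_string(), operation);

        manager
            .complete_recovery("recovery_test", true, None)
            .expect("completing a known recovery should succeed");

        assert!(manager.get_active_recoveries().is_empty());
        let history = manager.get_recovery_history();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].status, RecoveryStatus::Completed);
    }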

    #[test]
    fn test_health_monitor_creation() {
        let monitor = HealthMonitor::new(30);
        assert_eq!(monitor.list_nodes().len(), 0);
    }
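
    // Added sketch: runs the HealthMonitor lifecycle end to end (start, register,
    // update, summarize, stop) using the sample metrics defined in this module.
    #[test]
    fn test_health_monitor_lifecycle() {
        let mut monitor = HealthMonitor::new(30);
        monitor.start().expect("start should succeed");

        monitor
            .register_node("node1".to_string(), NodeMetrics::healthy())
            .expect("registration should succeed");
        monitor
            .update_metrics("node1", NodeMetrics::degraded())
            .expect("updating a registered node should succeed");

        let summary = monitor.get_health_summary();
        assert_eq!(summary.total_nodes, 1);
        assert_eq!(summary.degraded_nodes, 1);

        monitor.stop().expect("stop should succeed");
    }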

    #[test]
    fn test_node_registration() {
        let mut monitor = HealthMonitor::new(30);
        let metrics = NodeMetrics::healthy();

        monitor
            .register_node("node1".to_string(), metrics)
            .expect("registration should succeed");
        assert_eq!(monitor.list_nodes().len(), 1);
        assert_eq!(
            monitor.get_node_health("node1"),
            Some(NodeHealthStatus::Healthy)
        );
    }
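
    // Added sketch: unregistering removes the node, and updating metrics for an
    // unknown node surfaces an error instead of silently succeeding.
    #[test]
    fn test_node_unregistration_and_unknown_node_error() {
        let mut monitor = HealthMonitor::new(30);
        monitor
            .register_node("node1".to_string(), NodeMetrics::healthy())
            .expect("registration should succeed");

        monitor
            .unregister_node("node1")
            .expect("unregistration should succeed");
        assert!(monitor.list_nodes().is_empty());
        assert_eq!(monitor.get_node_health("node1"), None);

        // The node is gone, so metric updates must fail.
        assert!(monitor
            .update_metrics("node1", NodeMetrics::healthy())
            .is_err());
    }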

    #[test]
    fn test_health_status_determination() {
        let monitor = HealthMonitor::new(30);

        let healthy_metrics = NodeMetrics::healthy();
        assert_eq!(
            monitor.determine_health_status(&healthy_metrics),
            NodeHealthStatus::Healthy
        );

        let degraded_metrics = NodeMetrics::degraded();
        assert_eq!(
            monitor.determine_health_status(&degraded_metrics),
            NodeHealthStatus::Degraded
        );
    }
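
    // Added sketch: checks that the sample metrics constructors sit on the expected
    // sides of the default alert thresholds defined in this module.
    #[test]
    fn test_sample_metrics_against_default_thresholds() {
        let thresholds = AlertThresholds::default();
        let healthy = NodeMetrics::healthy();
        let degraded = NodeMetrics::degraded();

        assert!(healthy.cpu_usage < thresholds.cpu_warning);
        assert!(healthy.memory_usage < thresholds.memory_warning);
        assert!(healthy.error_rate < thresholds.error_rate_warning);

        assert!(degraded.cpu_usage > thresholds.cpu_warning);
        assert!(degraded.error_rate >= thresholds.error_rate_critical);
    }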

    #[test]
    fn test_recovery_action_creation() {
        let action = RecoveryAction {
            id: "test_action".to_string(),
            action_type: RecoveryActionType::NodeFailover,
            target_node: "node1".to_string(),
            severity: AlertSeverity::Critical,
            description: "Test recovery action".to_string(),
            strategy: RecoveryStrategy::Immediate,
            created_at: SystemTime::now(),
            started_at: None,
            completed_at: None,
            status: RecoveryStatus::Pending,
            error: None,
        };

        assert_eq!(action.status, RecoveryStatus::Pending);
        assert_eq!(action.severity, AlertSeverity::Critical);
    }
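
    // Added sketch: `process_replacement_requests` should drain queued replacement
    // requests; the queue is seeded directly since the test module can reach it.
    #[test]
    fn test_process_replacement_requests_drains_queue() {
        let mut manager = FaultRecoveryManager::new(FaultToleranceConfig::default());
        manager
            .replacement_queue
            .lock()
            .expect("lock poisoned")
            .push_back(NodeReplacementRequest {
                failed_node: "node1".to_string(),
                replacement_type: NodeReplacementStrategy::Immediate,
                requested_at: SystemTime::now(),
                priority: ReplacementPriority::High,
            });

        let requests = manager
            .process_replacement_requests()
            .expect("draining the queue should succeed");
        assert_eq!(requests.len(), 1);
        assert_eq!(requests[0].priority, ReplacementPriority::High);
        assert!(manager
            .process_replacement_requests()
            .expect("second drain should succeed")
            .is_empty());
    }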

    #[test]
    fn test_health_summary() {
        let summary = HealthSummary {
            total_nodes: 10,
            healthy_nodes: 8,
            degraded_nodes: 1,
            failed_nodes: 1,
            unknown_nodes: 0,
            recovering_nodes: 0,
            maintenance_nodes: 0,
            last_updated: SystemTime::now(),
        };

        assert_eq!(summary.health_percentage(), 80.0);
        assert!(!summary.is_healthy());
    }
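
    // Added sketch: an empty cluster reports 100% health and counts as healthy,
    // per the explicit zero-node guard in `health_percentage`.
    #[test]
    fn test_health_summary_empty_cluster() {
        let summary = HealthSummary {
            total_nodes: 0,
            healthy_nodes: 0,
            degraded_nodes: 0,
            failed_nodes: 0,
            unknown_nodes: 0,
            recovering_nodes: 0,
            maintenance_nodes: 0,
            last_updated: SystemTime::now(),
        };

        assert_eq!(summary.health_percentage(), 100.0);
        assert!(summary.is_healthy());
    }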

    #[test]
    fn test_alert_thresholds() {
        let thresholds = AlertThresholds::default();
        assert_eq!(thresholds.cpu_warning, 80.0);
        assert_eq!(thresholds.cpu_critical, 95.0);
        assert!(thresholds.cpu_critical > thresholds.cpu_warning);
    }
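
    // Added sketch: the severity and priority enums rely on derived `Ord`, which
    // follows declaration order, so escalation comparisons work as expected.
    #[test]
    fn test_severity_and_priority_ordering() {
        assert!(AlertSeverity::Emergency > AlertSeverity::Critical);
        assert!(AlertSeverity::Critical > AlertSeverity::Warning);
        assert!(AlertSeverity::Warning > AlertSeverity::Info);

        assert!(ReplacementPriority::Emergency > ReplacementPriority::High);
        assert!(ReplacementPriority::High > ReplacementPriority::Normal);
        assert!(ReplacementPriority::Normal > ReplacementPriority::Low);
    }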
}