scirs2_core/distributed/fault_tolerance.rs

//! Fault tolerance mechanisms for distributed systems
//!
//! This module provides fault detection, recovery, and resilience mechanisms
//! for distributed computing environments.
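//!
//! # Examples
//!
//! A minimal usage sketch (the import path below is assumed; adjust it to match
//! how the crate actually re-exports this module):
//!
//! ```ignore
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//! use std::time::Duration;
//! use scirs2_core::distributed::fault_tolerance::{
//!     FaultDetectionStrategy, FaultToleranceManager, NodeHealth, NodeInfo,
//! };
//!
//! // Monitor nodes with heartbeat-based detection: 30 s interval, 60 s timeout.
//! let mut manager = FaultToleranceManager::new(
//!     FaultDetectionStrategy::Heartbeat {
//!         interval: Duration::from_secs(30),
//!         timeout: Duration::from_secs(60),
//!     },
//!     3,
//! );
//!
//! // Register a node, report a health change, and inspect the cluster state.
//! let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
//! manager.register_node(NodeInfo::new("node1".to_string(), address)).unwrap();
//! manager.update_node_health("node1", NodeHealth::Degraded).unwrap();
//!
//! let summary = manager.get_cluster_health_summary().unwrap();
//! assert_eq!(summary.degraded_count, 1);
//! ```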

use crate::error::{CoreError, CoreResult, ErrorContext};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Node health status
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeHealth {
    /// Node is healthy and responsive
    Healthy,
    /// Node is degraded but functional
    Degraded,
    /// Node is unresponsive
    Unresponsive,
    /// Node has failed
    Failed,
    /// Node is recovering
    Recovering,
}

/// Fault detection strategy
#[derive(Debug, Clone)]
pub enum FaultDetectionStrategy {
    /// Heartbeat-based detection
    Heartbeat {
        interval: Duration,
        timeout: Duration,
    },
    /// Ping-based detection
    Ping {
        interval: Duration,
        timeout: Duration,
    },
    /// Application-level health checks
    HealthCheck {
        interval: Duration,
        endpoint: String,
    },
}

/// Recovery strategy for failed nodes
#[derive(Debug, Clone)]
pub enum RecoveryStrategy {
    /// Restart the node
    Restart,
    /// Migrate tasks to healthy nodes
    Migrate,
    /// Replace with standby node
    Replace { standbyaddress: SocketAddr },
    /// Manual intervention required
    Manual,
}

/// Fault tolerance error types
#[derive(Debug, Clone)]
pub enum FaultToleranceError {
    /// Lock acquisition failed
    LockError(String),
    /// Node not found
    NodeNotFound(String),
    /// Invalid configuration
    InvalidConfiguration(String),
    /// Recovery failed
    RecoveryFailed(String),
    /// General error
    GeneralError(String),
}

/// Node information for fault tolerance
#[derive(Debug, Clone)]
pub struct NodeInfo {
    pub nodeid: String,
    pub address: SocketAddr,
    pub health: NodeHealth,
    pub last_seen: Instant,
    pub failure_count: usize,
    pub recovery_strategy: RecoveryStrategy,
}

impl NodeInfo {
    /// Create new node info
    pub fn new(nodeid: String, address: SocketAddr) -> Self {
        Self {
            nodeid,
            address,
            health: NodeHealth::Healthy,
            last_seen: Instant::now(),
            failure_count: 0,
            recovery_strategy: RecoveryStrategy::Restart,
        }
    }

    /// Update node health status
    pub fn update_health(&mut self, health: NodeHealth) {
        if health == NodeHealth::Failed {
            self.failure_count += 1;
        }
        self.health = health;
        self.last_seen = Instant::now();
    }

    /// Check if node is healthy
    pub fn is_healthy(&self) -> bool {
        matches!(self.health, NodeHealth::Healthy)
    }

    /// Check if node has failed
    pub fn has_failed(&self) -> bool {
        matches!(self.health, NodeHealth::Failed | NodeHealth::Unresponsive)
    }
}

/// Fault tolerance manager
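///
/// # Examples
///
/// A sketch of the detection-and-recovery loop (the import path is assumed;
/// adjust it to the crate's actual re-exports):
///
/// ```ignore
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
/// use std::time::Duration;
/// use scirs2_core::distributed::fault_tolerance::{
///     FaultDetectionStrategy, FaultToleranceManager, NodeInfo, RecoveryStrategy,
/// };
///
/// let manager = FaultToleranceManager::new(
///     FaultDetectionStrategy::Ping {
///         interval: Duration::from_secs(10),
///         timeout: Duration::from_secs(30),
///     },
///     3,
/// );
///
/// // Register a node that should be replaced by a standby if it fails.
/// let mut node = NodeInfo::new(
///     "worker-1".to_string(),
///     SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 8080),
/// );
/// node.recovery_strategy = RecoveryStrategy::Replace {
///     standbyaddress: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)), 8080),
/// };
/// manager.register_node(node).unwrap();
///
/// // Periodically look for nodes that have exceeded the detection timeout and
/// // trigger each one's configured recovery strategy.
/// for nodeid in manager.detect_failures().unwrap() {
///     manager.initiate_recovery(&nodeid).unwrap();
/// }
/// ```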
#[derive(Debug)]
pub struct FaultToleranceManager {
    nodes: Arc<Mutex<HashMap<String, NodeInfo>>>,
    detection_strategy: FaultDetectionStrategy,
    #[allow(dead_code)]
    maxfailures: usize,
    #[allow(dead_code)]
    failure_threshold: Duration,
}

impl FaultToleranceManager {
    /// Create a new fault tolerance manager with a maximum failure count
    pub fn with_max_failures(detection_strategy: FaultDetectionStrategy, maxfailures: usize) -> Self {
        Self {
            nodes: Arc::new(Mutex::new(HashMap::new())),
            detection_strategy,
            maxfailures,
            failure_threshold: Duration::from_secs(300), // 5 minutes
        }
    }

    /// Create a new fault tolerance manager (alias for `with_max_failures`)
    pub fn new(detection_strategy: FaultDetectionStrategy, maxfailures: usize) -> Self {
        Self::with_max_failures(detection_strategy, maxfailures)
    }

    /// Register a node for monitoring
    pub fn register_node_info(&mut self, nodeinfo: NodeInfo) -> CoreResult<()> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;
        nodes.insert(nodeinfo.nodeid.clone(), nodeinfo);
        Ok(())
    }

    /// Update node health status
    pub fn update_node_health(&mut self, nodeid: &str, health: NodeHealth) -> CoreResult<()> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        if let Some(node) = nodes.get_mut(nodeid) {
            node.update_health(health);
        } else {
            return Err(CoreError::InvalidArgument(ErrorContext::new(format!(
                "Unknown node: {nodeid}"
            ))));
        }
        Ok(())
    }

    /// Get all healthy nodes
    pub fn get_healthy_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        Ok(nodes
            .values()
            .filter(|node| node.is_healthy())
            .cloned()
            .collect())
    }

    /// Get all failed nodes
    pub fn get_failed_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        Ok(nodes
            .values()
            .filter(|node| node.has_failed())
            .cloned()
            .collect())
    }

    /// Detect failed nodes based on timeout
    pub fn detect_failures(&self) -> CoreResult<Vec<String>> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let now = Instant::now();
        let mut failed_nodes = Vec::new();

        for (nodeid, node) in nodes.iter() {
            // Health-check strategies carry no explicit timeout, so fall back to 30 seconds.
            let timeout = match &self.detection_strategy {
                FaultDetectionStrategy::Heartbeat { timeout, .. } => *timeout,
                FaultDetectionStrategy::Ping { timeout, .. } => *timeout,
                FaultDetectionStrategy::HealthCheck { .. } => Duration::from_secs(30),
            };

            // Only report nodes still marked healthy; nodes already flagged as
            // failed or unresponsive are not reported again.
            if now.duration_since(node.last_seen) > timeout && node.is_healthy() {
                failed_nodes.push(nodeid.clone());
            }
        }

        Ok(failed_nodes)
    }

    /// Initiate recovery for a failed node using its configured strategy
    pub fn initiate_recovery(&self, nodeid: &str) -> CoreResult<()> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        if let Some(node) = nodes.get(nodeid) {
            match &node.recovery_strategy {
                RecoveryStrategy::Restart => {
                    self.restart_node(nodeid)?;
                }
                RecoveryStrategy::Migrate => {
                    self.migrate_tasks(nodeid)?;
                }
                RecoveryStrategy::Replace { standbyaddress } => {
                    self.replace_node(nodeid, *standbyaddress)?;
                }
                RecoveryStrategy::Manual => {
                    println!("Manual intervention required for node: {nodeid}");
                }
            }
        }

        Ok(())
    }

    fn restart_node(&self, nodeid: &str) -> CoreResult<()> {
        // In a real implementation, this would trigger node restart
        println!("Restarting node: {nodeid}");
        Ok(())
    }

    fn migrate_tasks(&self, nodeid: &str) -> CoreResult<()> {
        // In a real implementation, this would migrate tasks to healthy nodes
        println!("Migrating tasks from failed node: {nodeid}");
        Ok(())
    }

    fn replace_node(&self, nodeid: &str, standbyaddress: SocketAddr) -> CoreResult<()> {
        // In a real implementation, this would activate standby node
        println!("Replacing node {nodeid} with standby at {standbyaddress}");
        Ok(())
    }

    /// Check if the cluster has sufficient healthy nodes
    pub fn is_cluster_healthy(&self) -> CoreResult<bool> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let healthy_count = nodes.values().filter(|node| node.is_healthy()).count();
        let total_count = nodes.len();

        // Require at least 50% of nodes to be healthy; the integer comparison
        // avoids floating point, and an empty cluster trivially passes.
        Ok(healthy_count * 2 >= total_count)
    }

    /// Get cluster health summary
    pub fn get_cluster_health_summary(&self) -> CoreResult<ClusterHealthSummary> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let mut summary = ClusterHealthSummary::default();

        for node in nodes.values() {
            match node.health {
                NodeHealth::Healthy => summary.healthy_count += 1,
                NodeHealth::Degraded => summary.degraded_count += 1,
                NodeHealth::Unresponsive => summary.unresponsive_count += 1,
                NodeHealth::Failed => summary.failed_count += 1,
                NodeHealth::Recovering => summary.recovering_count += 1,
            }
        }

        summary.total_count = nodes.len();
        Ok(summary)
    }

    /// Register a node with the fault tolerance manager
    pub fn register_node(&self, node: NodeInfo) -> Result<(), FaultToleranceError> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            FaultToleranceError::LockError("Failed to acquire nodes lock".to_string())
        })?;

        nodes.insert(node.nodeid.clone(), node);

        Ok(())
    }
}

/// Cluster health summary
#[derive(Debug, Default)]
pub struct ClusterHealthSummary {
    pub total_count: usize,
    pub healthy_count: usize,
    pub degraded_count: usize,
    pub unresponsive_count: usize,
    pub failed_count: usize,
    pub recovering_count: usize,
}

impl ClusterHealthSummary {
    /// Calculate health percentage (an empty cluster counts as 100% healthy)
    pub fn health_percentage(&self) -> f64 {
        if self.total_count == 0 {
            return 100.0;
        }
        (self.healthy_count as f64 / self.total_count as f64) * 100.0
    }

    /// Check if the cluster is in good health (at least 75% of nodes healthy)
    pub fn is_healthy(&self) -> bool {
        self.health_percentage() >= 75.0
    }
}

/// Initialize fault tolerance system
#[allow(dead_code)]
pub fn initialize_fault_tolerance() -> CoreResult<()> {
    let _manager = FaultToleranceManager::new(
        FaultDetectionStrategy::Heartbeat {
            interval: Duration::from_secs(30),
            timeout: Duration::from_secs(60),
        },
        3,
    );
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn test_nodeinfo_creation() {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node = NodeInfo::new("node1".to_string(), address);

        assert_eq!(node.nodeid, "node1");
        assert_eq!(node.address, address);
        assert_eq!(node.health, NodeHealth::Healthy);
        assert!(node.is_healthy());
        assert!(!node.has_failed());
    }

    #[test]
    fn test_node_health_update() {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut node = NodeInfo::new("node1".to_string(), address);

        node.update_health(NodeHealth::Failed);
        assert_eq!(node.health, NodeHealth::Failed);
        assert_eq!(node.failure_count, 1);
        assert!(node.has_failed());
    }

    #[test]
    fn test_fault_tolerance_manager() {
        let strategy = FaultDetectionStrategy::Heartbeat {
            interval: Duration::from_secs(30),
            timeout: Duration::from_secs(60),
        };
        let mut manager = FaultToleranceManager::new(strategy, 3);

        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node = NodeInfo::new("node1".to_string(), address);

        assert!(manager.register_node(node).is_ok());
        assert!(manager
            .update_node_health("node1", NodeHealth::Failed)
            .is_ok());

        let failed_nodes = manager.get_failed_nodes().unwrap();
        assert_eq!(failed_nodes.len(), 1);
        assert_eq!(failed_nodes[0].nodeid, "node1");
    }

    #[test]
    fn test_cluster_health_summary() {
        let summary = ClusterHealthSummary {
            total_count: 10,
            healthy_count: 8,
            failed_count: 2,
            ..Default::default()
        };

        assert_eq!(summary.health_percentage(), 80.0);
        assert!(summary.is_healthy());
    }
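
    // An additional test sketching the cluster-level helpers; it relies only on
    // items defined in this module, so it should run as-is.
    #[test]
    fn test_cluster_health_checks() {
        let strategy = FaultDetectionStrategy::Ping {
            interval: Duration::from_secs(10),
            timeout: Duration::from_secs(30),
        };
        let mut manager = FaultToleranceManager::new(strategy, 3);

        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9090);
        manager
            .register_node(NodeInfo::new("node1".to_string(), address))
            .unwrap();
        manager
            .register_node(NodeInfo::new("node2".to_string(), address))
            .unwrap();

        // Both nodes healthy: the cluster meets the 50% threshold.
        assert!(manager.is_cluster_healthy().unwrap());

        // Fail one of two nodes: 1 * 2 >= 2 still holds, so the cluster stays healthy.
        manager
            .update_node_health("node2", NodeHealth::Failed)
            .unwrap();
        assert!(manager.is_cluster_healthy().unwrap());

        let summary = manager.get_cluster_health_summary().unwrap();
        assert_eq!(summary.total_count, 2);
        assert_eq!(summary.healthy_count, 1);
        assert_eq!(summary.failed_count, 1);
        assert_eq!(summary.health_percentage(), 50.0);
        assert!(!summary.is_healthy());
    }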
}