use crate::error::{CoreError, CoreResult, ErrorContext};

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Health status of a node in the cluster.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeHealth {
    /// Node is responding normally.
    Healthy,
    /// Node is responding but with reduced performance.
    Degraded,
    /// Node has stopped responding to health probes.
    Unresponsive,
    /// Node is considered failed.
    Failed,
    /// Node is in the process of recovering.
    Recovering,
}

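/// Strategy used to decide when a node should be considered failed.
///
/// A minimal construction sketch (marked `ignore` since the enclosing crate
/// path is not fixed here):
///
/// ```ignore
/// use std::time::Duration;
///
/// // Probe every 30 seconds; declare failure after 60 seconds of silence.
/// let strategy = FaultDetectionStrategy::Heartbeat {
///     interval: Duration::from_secs(30),
///     timeout: Duration::from_secs(60),
/// };
/// ```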
#[derive(Debug, Clone)]
pub enum FaultDetectionStrategy {
    /// Expect periodic heartbeats; a node that stays silent past `timeout`
    /// is considered failed.
    Heartbeat {
        interval: Duration,
        timeout: Duration,
    },
    /// Actively ping nodes every `interval`, waiting up to `timeout` for a reply.
    Ping {
        interval: Duration,
        timeout: Duration,
    },
    /// Poll a health-check endpoint every `interval`.
    HealthCheck {
        interval: Duration,
        endpoint: String,
    },
}

/// Action to take when a node is declared failed.
#[derive(Debug, Clone)]
pub enum RecoveryStrategy {
    /// Restart the failed node in place.
    Restart,
    /// Migrate the node's tasks to healthy peers.
    Migrate,
    /// Replace the node with a standby at the given address.
    Replace { standby_address: SocketAddr },
    /// Leave recovery to an operator.
    Manual,
}

/// Errors produced by the fault tolerance subsystem.
#[derive(Debug, Clone)]
pub enum FaultToleranceError {
    /// A mutex guarding shared state could not be acquired.
    LockError(String),
    /// The referenced node is not registered.
    NodeNotFound(String),
    /// The supplied configuration is invalid.
    InvalidConfiguration(String),
    /// A recovery action did not complete.
    RecoveryFailed(String),
    /// Any other failure.
    GeneralError(String),
}

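/// Metadata tracked for each node in the cluster.
///
/// A small usage sketch (marked `ignore` since the enclosing crate path is
/// not fixed here):
///
/// ```ignore
/// let mut node = NodeInfo::new("node1".to_string(), "127.0.0.1:8080".parse().unwrap());
/// node.update_health(NodeHealth::Degraded);
/// assert!(!node.is_healthy());
/// assert!(!node.has_failed());
/// ```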
#[derive(Debug, Clone)]
pub struct NodeInfo {
    pub node_id: String,
    pub address: SocketAddr,
    pub health: NodeHealth,
    pub last_seen: Instant,
    pub failure_count: usize,
    pub recovery_strategy: RecoveryStrategy,
}

impl NodeInfo {
    /// Create a new node record that starts out healthy.
    pub fn new(node_id: String, address: SocketAddr) -> Self {
        Self {
            node_id,
            address,
            health: NodeHealth::Healthy,
            last_seen: Instant::now(),
            failure_count: 0,
            recovery_strategy: RecoveryStrategy::Restart,
        }
    }

    /// Record a new health observation, bumping the failure count on failure.
    pub fn update_health(&mut self, health: NodeHealth) {
        if health == NodeHealth::Failed {
            self.failure_count += 1;
        }
        self.health = health;
        self.last_seen = Instant::now();
    }

    /// Whether the node is currently healthy.
    pub fn is_healthy(&self) -> bool {
        matches!(self.health, NodeHealth::Healthy)
    }

    /// Whether the node is failed or unresponsive.
    pub fn has_failed(&self) -> bool {
        matches!(self.health, NodeHealth::Failed | NodeHealth::Unresponsive)
    }
}

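/// Tracks node health across a cluster and drives failure detection and
/// recovery.
///
/// A minimal usage sketch (marked `ignore` since the enclosing crate path is
/// not fixed here):
///
/// ```ignore
/// use std::time::Duration;
///
/// let manager = FaultToleranceManager::new(
///     FaultDetectionStrategy::Heartbeat {
///         interval: Duration::from_secs(30),
///         timeout: Duration::from_secs(60),
///     },
///     3,
/// );
/// manager
///     .register_node(NodeInfo::new(
///         "node1".to_string(),
///         "127.0.0.1:8080".parse().unwrap(),
///     ))
///     .unwrap();
/// assert!(manager.is_cluster_healthy().unwrap());
/// ```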
#[derive(Debug)]
pub struct FaultToleranceManager {
    nodes: Arc<Mutex<HashMap<String, NodeInfo>>>,
    detection_strategy: FaultDetectionStrategy,
    #[allow(dead_code)]
    max_failures: usize,
    #[allow(dead_code)]
    failure_threshold: Duration,
}

impl FaultToleranceManager {
    /// Create a manager with the given detection strategy and the maximum
    /// number of tolerated failures per node. The failure threshold defaults
    /// to five minutes; both values are currently reserved for future use.
    pub fn new(detection_strategy: FaultDetectionStrategy, max_failures: usize) -> Self {
        Self {
            nodes: Arc::new(Mutex::new(HashMap::new())),
            detection_strategy,
            max_failures,
            failure_threshold: Duration::from_secs(300),
        }
    }

    /// Register or replace a node record, reporting lock failures as `CoreError`.
    pub fn add_node(&mut self, node_info: NodeInfo) -> CoreResult<()> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;
        nodes.insert(node_info.node_id.clone(), node_info);
        Ok(())
    }

    /// Update the recorded health of a node, erroring if it is unknown.
    pub fn update_node_health(&mut self, node_id: &str, health: NodeHealth) -> CoreResult<()> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        if let Some(node) = nodes.get_mut(node_id) {
            node.update_health(health);
        } else {
            return Err(CoreError::InvalidArgument(ErrorContext::new(format!(
                "Unknown node: {node_id}"
            ))));
        }
        Ok(())
    }

    /// Return a snapshot of all currently healthy nodes.
    pub fn get_healthy_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        Ok(nodes
            .values()
            .filter(|node| node.is_healthy())
            .cloned()
            .collect())
    }

    /// Return a snapshot of all failed or unresponsive nodes.
    pub fn get_failed_nodes(&self) -> CoreResult<Vec<NodeInfo>> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        Ok(nodes
            .values()
            .filter(|node| node.has_failed())
            .cloned()
            .collect())
    }

    /// Scan for nodes that have exceeded the detection timeout and return
    /// their IDs. Only nodes still marked healthy are reported, so nodes
    /// already failed or recovering are not flagged again.
    pub fn detect_failures(&self) -> CoreResult<Vec<String>> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let now = Instant::now();
        let mut failed_nodes = Vec::new();

        for (node_id, node) in nodes.iter() {
            // HealthCheck carries no explicit timeout, so fall back to 30s.
            let timeout = match &self.detection_strategy {
                FaultDetectionStrategy::Heartbeat { timeout, .. } => *timeout,
                FaultDetectionStrategy::Ping { timeout, .. } => *timeout,
                FaultDetectionStrategy::HealthCheck { .. } => Duration::from_secs(30),
            };

            if now.duration_since(node.last_seen) > timeout && node.is_healthy() {
                failed_nodes.push(node_id.clone());
            }
        }

        Ok(failed_nodes)
    }

    /// Trigger the recovery strategy configured for the given node. Unknown
    /// nodes are ignored rather than treated as errors.
    pub fn recover_node(&self, node_id: &str) -> CoreResult<()> {
        // Clone the strategy and release the lock before acting, so that a
        // slow recovery action does not block concurrent health updates.
        let strategy = {
            let nodes = self.nodes.lock().map_err(|_| {
                CoreError::InvalidState(ErrorContext::new(
                    "Failed to acquire nodes lock".to_string(),
                ))
            })?;
            nodes.get(node_id).map(|node| node.recovery_strategy.clone())
        };

        if let Some(strategy) = strategy {
            match strategy {
                RecoveryStrategy::Restart => self.restart_node(node_id)?,
                RecoveryStrategy::Migrate => self.migrate_tasks(node_id)?,
                RecoveryStrategy::Replace { standby_address } => {
                    self.replace_node(node_id, standby_address)?
                }
                RecoveryStrategy::Manual => {
                    println!("Manual intervention required for node: {node_id}");
                }
            }
        }

        Ok(())
    }

    /// Placeholder restart action; a real implementation would issue a
    /// restart command to the node.
    fn restart_node(&self, node_id: &str) -> CoreResult<()> {
        println!("Restarting node: {node_id}");
        Ok(())
    }

    /// Placeholder migration action; a real implementation would reassign
    /// the node's tasks to healthy peers.
    fn migrate_tasks(&self, node_id: &str) -> CoreResult<()> {
        println!("Migrating tasks from failed node: {node_id}");
        Ok(())
    }

    /// Placeholder replacement action; a real implementation would promote
    /// the standby node.
    fn replace_node(&self, node_id: &str, standby_address: SocketAddr) -> CoreResult<()> {
        println!("Replacing node {node_id} with standby at {standby_address}");
        Ok(())
    }

    /// The cluster is considered healthy when at least half of its nodes are
    /// healthy. An empty cluster counts as healthy.
    pub fn is_cluster_healthy(&self) -> CoreResult<bool> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let healthy_count = nodes.values().filter(|node| node.is_healthy()).count();
        let total_count = nodes.len();

        // healthy_count / total_count >= 1/2, written without division to
        // avoid floating point.
        Ok(healthy_count * 2 >= total_count)
    }

    /// Build a per-state count of all registered nodes.
    pub fn get_cluster_health_summary(&self) -> CoreResult<ClusterHealthSummary> {
        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let mut summary = ClusterHealthSummary::default();

        for node in nodes.values() {
            match node.health {
                NodeHealth::Healthy => summary.healthy_count += 1,
                NodeHealth::Degraded => summary.degraded_count += 1,
                NodeHealth::Unresponsive => summary.unresponsive_count += 1,
                NodeHealth::Failed => summary.failed_count += 1,
                NodeHealth::Recovering => summary.recovering_count += 1,
            }
        }

        summary.total_count = nodes.len();
        Ok(summary)
    }

    /// Register or replace a node record, reporting errors as
    /// `FaultToleranceError`.
    pub fn register_node(&self, node: NodeInfo) -> Result<(), FaultToleranceError> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            FaultToleranceError::LockError("Failed to acquire nodes lock".to_string())
        })?;

        nodes.insert(node.node_id.clone(), node);

        Ok(())
    }
}

/// Count of nodes in each health state across the cluster.
#[derive(Debug, Default)]
pub struct ClusterHealthSummary {
    pub total_count: usize,
    pub healthy_count: usize,
    pub degraded_count: usize,
    pub unresponsive_count: usize,
    pub failed_count: usize,
    pub recovering_count: usize,
}

impl ClusterHealthSummary {
    /// Percentage of nodes that are healthy; an empty cluster reports 100%.
    pub fn health_percentage(&self) -> f64 {
        if self.total_count == 0 {
            return 100.0;
        }
        (self.healthy_count as f64 / self.total_count as f64) * 100.0
    }

    /// The summary is considered healthy when at least 75% of nodes are
    /// healthy.
    pub fn is_healthy(&self) -> bool {
        self.health_percentage() >= 75.0
    }
}

/// Initialize the fault tolerance subsystem with a default configuration:
/// heartbeat detection every 30 seconds with a 60 second timeout, tolerating
/// up to 3 failures per node.
#[allow(dead_code)]
pub fn initialize_fault_tolerance() -> CoreResult<()> {
    let _manager = FaultToleranceManager::new(
        FaultDetectionStrategy::Heartbeat {
            interval: Duration::from_secs(30),
            timeout: Duration::from_secs(60),
        },
        3,
    );
    Ok(())
}

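// A minimal end-to-end sketch of the intended flow: register nodes, mark one
// failed, drive its recovery, then inspect cluster health. The node IDs and
// addresses here are illustrative only.
#[allow(dead_code)]
fn example_failover_flow() -> CoreResult<()> {
    let mut manager = FaultToleranceManager::new(
        FaultDetectionStrategy::Heartbeat {
            interval: Duration::from_secs(30),
            timeout: Duration::from_secs(60),
        },
        3,
    );

    // Register two hypothetical nodes.
    manager.add_node(NodeInfo::new("node-a".to_string(), "127.0.0.1:9001".parse().unwrap()))?;
    manager.add_node(NodeInfo::new("node-b".to_string(), "127.0.0.1:9002".parse().unwrap()))?;

    // Mark one node failed and drive its recovery strategy (Restart by default).
    manager.update_node_health("node-a", NodeHealth::Failed)?;
    manager.recover_node("node-a")?;

    // With one of two nodes healthy, the cluster still counts as healthy.
    assert!(manager.is_cluster_healthy()?);
    Ok(())
}
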
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn test_nodeinfo_creation() {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node = NodeInfo::new("node1".to_string(), address);

        assert_eq!(node.node_id, "node1");
        assert_eq!(node.address, address);
        assert_eq!(node.health, NodeHealth::Healthy);
        assert!(node.is_healthy());
        assert!(!node.has_failed());
    }

    #[test]
    fn test_node_health_update() {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut node = NodeInfo::new("node1".to_string(), address);

        node.update_health(NodeHealth::Failed);
        assert_eq!(node.health, NodeHealth::Failed);
        assert_eq!(node.failure_count, 1);
        assert!(node.has_failed());
    }

    #[test]
    fn test_fault_tolerance_manager() {
        let strategy = FaultDetectionStrategy::Heartbeat {
            interval: Duration::from_secs(30),
            timeout: Duration::from_secs(60),
        };
        let mut manager = FaultToleranceManager::new(strategy, 3);

        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node = NodeInfo::new("node1".to_string(), address);

        assert!(manager.register_node(node).is_ok());
        assert!(manager
            .update_node_health("node1", NodeHealth::Failed)
            .is_ok());

        let failed_nodes = manager.get_failed_nodes().unwrap();
        assert_eq!(failed_nodes.len(), 1);
        assert_eq!(failed_nodes[0].node_id, "node1");
    }

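    // A sketch of timeout-based detection: with a deliberately tiny timeout,
    // a registered node that stays silent is flagged by detect_failures().
    // The sleep-based timing here is an assumption made for illustration.
    #[test]
    fn test_detect_failures_after_timeout() {
        let strategy = FaultDetectionStrategy::Heartbeat {
            interval: Duration::from_millis(5),
            timeout: Duration::from_millis(10),
        };
        let manager = FaultToleranceManager::new(strategy, 3);

        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        manager.register_node(NodeInfo::new("node1".to_string(), address)).unwrap();

        // Let the heartbeat timeout elapse without any health update.
        std::thread::sleep(Duration::from_millis(50));

        let suspects = manager.detect_failures().unwrap();
        assert_eq!(suspects, vec!["node1".to_string()]);
    }
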
    #[test]
    fn test_cluster_health_summary() {
        let summary = ClusterHealthSummary {
            total_count: 10,
            healthy_count: 8,
            failed_count: 2,
            ..Default::default()
        };

        assert_eq!(summary.health_percentage(), 80.0);
        assert!(summary.is_healthy());
    }
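
    // A sketch of the aggregate health queries: three nodes in mixed states
    // should produce matching summary counts, and the majority rule in
    // is_cluster_healthy() should still report the cluster as healthy.
    #[test]
    fn test_cluster_health_queries() {
        let strategy = FaultDetectionStrategy::Ping {
            interval: Duration::from_secs(10),
            timeout: Duration::from_secs(30),
        };
        let mut manager = FaultToleranceManager::new(strategy, 3);

        for (i, health) in [NodeHealth::Healthy, NodeHealth::Healthy, NodeHealth::Failed]
            .into_iter()
            .enumerate()
        {
            let address =
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000 + i as u16);
            let node_id = format!("node{i}");
            manager.register_node(NodeInfo::new(node_id.clone(), address)).unwrap();
            manager.update_node_health(&node_id, health).unwrap();
        }

        let summary = manager.get_cluster_health_summary().unwrap();
        assert_eq!(summary.total_count, 3);
        assert_eq!(summary.healthy_count, 2);
        assert_eq!(summary.failed_count, 1);
        assert!(manager.is_cluster_healthy().unwrap());
    }

    // Recovery with the Manual strategy only logs a message, so it is safe
    // to exercise recover_node() directly in a test.
    #[test]
    fn test_recover_node_manual() {
        let strategy = FaultDetectionStrategy::Heartbeat {
            interval: Duration::from_secs(30),
            timeout: Duration::from_secs(60),
        };
        let manager = FaultToleranceManager::new(strategy, 3);

        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut node = NodeInfo::new("node1".to_string(), address);
        node.recovery_strategy = RecoveryStrategy::Manual;
        manager.register_node(node).unwrap();

        assert!(manager.recover_node("node1").is_ok());
        // Recovering an unknown node is a no-op rather than an error.
        assert!(manager.recover_node("ghost").is_ok());
    }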
}