1use crate::types::NodeState;
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, VecDeque};
37use std::sync::Arc;
38use std::time::{Duration, Instant, SystemTime};
39use tokio::sync::RwLock;
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum BackoffStrategy {
48 Immediate,
50
51 Fixed { delay: Duration },
53
54 Exponential {
56 initial_delay: Duration,
57 max_delay: Duration,
58 multiplier: f64,
59 },
60
61 Linear {
63 initial_delay: Duration,
64 increment: Duration,
65 max_delay: Duration,
66 },
67}
68
69impl Default for BackoffStrategy {
70 fn default() -> Self {
71 Self::Exponential {
72 initial_delay: Duration::from_secs(1),
73 max_delay: Duration::from_secs(60),
74 multiplier: 2.0,
75 }
76 }
77}
78
79impl BackoffStrategy {
80 pub fn calculate_delay(&self, attempt: u32) -> Duration {
82 match self {
83 Self::Immediate => Duration::from_secs(0),
84 Self::Fixed { delay } => *delay,
85 Self::Exponential {
86 initial_delay,
87 max_delay,
88 multiplier,
89 } => {
90 let delay_secs = initial_delay.as_secs_f64() * multiplier.powi(attempt as i32);
91 let clamped = delay_secs.min(max_delay.as_secs_f64());
92 Duration::from_secs_f64(clamped)
93 }
94 Self::Linear {
95 initial_delay,
96 increment,
97 max_delay,
98 } => {
99 let delay_secs = initial_delay.as_secs_f64() + (increment.as_secs_f64() * attempt as f64);
100 let clamped = delay_secs.min(max_delay.as_secs_f64());
101 Duration::from_secs_f64(clamped)
102 }
103 }
104 }
105}
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
109pub enum DegradationMode {
110 Disable,
112
113 StopDependents,
115
116 ShutdownSystem,
118
119 Fallback,
121
122 ReducedMode,
124}
125
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
128pub enum RecoveryAction {
129 Restart,
131
132 Stop,
134
135 UseFallback,
137
138 Degrade,
140
141 Monitor,
143
144 Escalate,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct RecoveryPolicy {
151 pub max_restarts: u32,
153
154 pub restart_window: Duration,
156
157 pub backoff_strategy: BackoffStrategy,
159
160 pub degradation_mode: DegradationMode,
162
163 pub fallback_node: Option<String>,
165
166 pub critical: bool,
168
169 pub health_timeout: Duration,
171
172 pub auto_recover: bool,
174
175 pub recovery_script: Option<String>,
177}
178
179impl Default for RecoveryPolicy {
180 fn default() -> Self {
181 Self {
182 max_restarts: 3,
183 restart_window: Duration::from_secs(300), backoff_strategy: BackoffStrategy::default(),
185 degradation_mode: DegradationMode::Disable,
186 fallback_node: None,
187 critical: false,
188 health_timeout: Duration::from_secs(30),
189 auto_recover: true,
190 recovery_script: None,
191 }
192 }
193}
194
195impl RecoveryPolicy {
196 pub fn builder() -> RecoveryPolicyBuilder {
198 RecoveryPolicyBuilder::default()
199 }
200
201 pub fn should_recover(&self, history: &RecoveryHistory) -> bool {
203 if !self.auto_recover {
204 return false;
205 }
206
207 let window_start = Instant::now() - self.restart_window;
209 let recent_restarts = history
210 .attempts
211 .iter()
212 .filter(|attempt| attempt.timestamp >= window_start)
213 .count();
214
215 recent_restarts < self.max_restarts as usize
216 }
217
218 pub fn next_action(&self, history: &RecoveryHistory) -> RecoveryAction {
220 if !self.should_recover(history) {
221 return match self.degradation_mode {
223 DegradationMode::Disable => RecoveryAction::Stop,
224 DegradationMode::StopDependents => RecoveryAction::Stop,
225 DegradationMode::ShutdownSystem => RecoveryAction::Escalate,
226 DegradationMode::Fallback => {
227 if self.fallback_node.is_some() {
228 RecoveryAction::UseFallback
229 } else {
230 RecoveryAction::Stop
231 }
232 }
233 DegradationMode::ReducedMode => RecoveryAction::Degrade,
234 };
235 }
236
237 RecoveryAction::Restart
238 }
239
240 pub fn get_restart_delay(&self, attempt: u32) -> Duration {
242 self.backoff_strategy.calculate_delay(attempt)
243 }
244}
245
246#[derive(Default)]
248pub struct RecoveryPolicyBuilder {
249 policy: RecoveryPolicy,
250}
251
252impl RecoveryPolicyBuilder {
253 pub fn max_restarts(mut self, max: u32) -> Self {
254 self.policy.max_restarts = max;
255 self
256 }
257
258 pub fn restart_window(mut self, window: Duration) -> Self {
259 self.policy.restart_window = window;
260 self
261 }
262
263 pub fn backoff_strategy(mut self, strategy: BackoffStrategy) -> Self {
264 self.policy.backoff_strategy = strategy;
265 self
266 }
267
268 pub fn degradation_mode(mut self, mode: DegradationMode) -> Self {
269 self.policy.degradation_mode = mode;
270 self
271 }
272
273 pub fn fallback_node(mut self, node_id: String) -> Self {
274 self.policy.fallback_node = Some(node_id);
275 self
276 }
277
278 pub fn critical(mut self, critical: bool) -> Self {
279 self.policy.critical = critical;
280 self
281 }
282
283 pub fn health_timeout(mut self, timeout: Duration) -> Self {
284 self.policy.health_timeout = timeout;
285 self
286 }
287
288 pub fn auto_recover(mut self, auto: bool) -> Self {
289 self.policy.auto_recover = auto;
290 self
291 }
292
293 pub fn recovery_script(mut self, script: String) -> Self {
294 self.policy.recovery_script = Some(script);
295 self
296 }
297
298 pub fn build(self) -> RecoveryPolicy {
299 self.policy
300 }
301}
302
303#[derive(Debug, Clone, Serialize, Deserialize)]
309pub struct RecoveryAttempt {
310 #[serde(skip, default = "Instant::now")]
312 pub timestamp: Instant,
313
314 pub system_time: SystemTime,
316
317 pub action: RecoveryAction,
319
320 pub success: bool,
322
323 pub error: Option<String>,
325
326 pub duration: Duration,
328
329 pub previous_state: NodeState,
331
332 pub new_state: NodeState,
334}
335
336#[derive(Debug, Clone, Serialize, Deserialize)]
338pub struct RecoveryHistory {
339 pub node_id: String,
341
342 pub attempts: VecDeque<RecoveryAttempt>,
344
345 pub total_restarts: u64,
347
348 pub total_failures: u64,
350
351 #[serde(skip)]
353 pub last_success: Option<Instant>,
354
355 #[serde(skip)]
357 pub last_failure: Option<Instant>,
358
359 pub consecutive_failures: u32,
361
362 pub mtbf: Option<Duration>,
364
365 pub mttr: Option<Duration>,
367}
368
369impl RecoveryHistory {
370 pub fn new(node_id: String) -> Self {
372 Self {
373 node_id,
374 attempts: VecDeque::with_capacity(100),
375 total_restarts: 0,
376 total_failures: 0,
377 last_success: None,
378 last_failure: None,
379 consecutive_failures: 0,
380 mtbf: None,
381 mttr: None,
382 }
383 }
384
385 pub fn record_attempt(&mut self, attempt: RecoveryAttempt) {
387 if attempt.success {
388 self.total_restarts += 1;
389 self.last_success = Some(attempt.timestamp);
390 self.consecutive_failures = 0;
391 } else {
392 self.total_failures += 1;
393 self.last_failure = Some(attempt.timestamp);
394 self.consecutive_failures += 1;
395 }
396
397 if self.attempts.len() >= 100 {
399 self.attempts.pop_front();
400 }
401 self.attempts.push_back(attempt);
402
403 self.calculate_metrics();
405 }
406
407 fn calculate_metrics(&mut self) {
409 if self.attempts.len() < 2 {
410 return;
411 }
412
413 let mut failure_intervals = Vec::new();
414 let mut recovery_durations = Vec::new();
415
416 let mut last_success_time: Option<Instant> = None;
417
418 for attempt in &self.attempts {
419 if attempt.success {
420 if let Some(last_success) = last_success_time {
421 let interval = attempt.timestamp.duration_since(last_success);
422 failure_intervals.push(interval);
423 }
424 last_success_time = Some(attempt.timestamp);
425 recovery_durations.push(attempt.duration);
426 }
427 }
428
429 if !failure_intervals.is_empty() {
431 let total: Duration = failure_intervals.iter().sum();
432 self.mtbf = Some(total / failure_intervals.len() as u32);
433 }
434
435 if !recovery_durations.is_empty() {
437 let total: Duration = recovery_durations.iter().sum();
438 self.mttr = Some(total / recovery_durations.len() as u32);
439 }
440 }
441
442 pub fn success_rate(&self) -> f64 {
444 let total = self.total_restarts + self.total_failures;
445 if total == 0 {
446 return 1.0;
447 }
448 self.total_restarts as f64 / total as f64
449 }
450
451 pub fn recent_failures(&self, count: usize) -> usize {
453 self.attempts.iter().rev().take(count).filter(|a| !a.success).count()
454 }
455}
456
457pub struct RecoveryManager {
463 policies: Arc<RwLock<HashMap<String, RecoveryPolicy>>>,
465
466 histories: Arc<RwLock<HashMap<String, RecoveryHistory>>>,
468
469 default_policy: Arc<RwLock<RecoveryPolicy>>,
471
472 degraded_nodes: Arc<RwLock<Vec<String>>>,
474}
475
476impl RecoveryManager {
477 pub fn new() -> Self {
479 Self {
480 policies: Arc::new(RwLock::new(HashMap::new())),
481 histories: Arc::new(RwLock::new(HashMap::new())),
482 default_policy: Arc::new(RwLock::new(RecoveryPolicy::default())),
483 degraded_nodes: Arc::new(RwLock::new(Vec::new())),
484 }
485 }
486
487 pub async fn set_default_policy(&self, policy: RecoveryPolicy) {
489 *self.default_policy.write().await = policy;
490 }
491
492 pub async fn set_policy(&self, node_id: &str, policy: RecoveryPolicy) {
494 let mut policies = self.policies.write().await;
495 policies.insert(node_id.to_string(), policy);
496 }
497
498 pub async fn get_policy(&self, node_id: &str) -> RecoveryPolicy {
500 let policies = self.policies.read().await;
501 if let Some(policy) = policies.get(node_id) {
502 policy.clone()
503 } else {
504 self.default_policy.read().await.clone()
505 }
506 }
507
508 pub async fn get_history(&self, node_id: &str) -> RecoveryHistory {
510 let histories = self.histories.read().await;
511 if let Some(history) = histories.get(node_id) {
512 history.clone()
513 } else {
514 RecoveryHistory::new(node_id.to_string())
515 }
516 }
517
518 pub async fn record_attempt(&self, node_id: &str, attempt: RecoveryAttempt) {
520 let mut histories = self.histories.write().await;
521 let history = histories
522 .entry(node_id.to_string())
523 .or_insert_with(|| RecoveryHistory::new(node_id.to_string()));
524 history.record_attempt(attempt);
525 }
526
527 pub async fn determine_action(&self, node_id: &str, current_state: NodeState) -> RecoveryAction {
529 let policy = self.get_policy(node_id).await;
530 let history = self.get_history(node_id).await;
531
532 match current_state {
534 NodeState::Crashed | NodeState::Unhealthy => policy.next_action(&history),
535 _ => RecoveryAction::Monitor,
536 }
537 }
538
539 pub async fn get_restart_delay(&self, node_id: &str) -> Duration {
541 let policy = self.get_policy(node_id).await;
542 let history = self.get_history(node_id).await;
543
544 let window_start = Instant::now() - policy.restart_window;
545 let recent_attempts = history.attempts.iter().filter(|a| a.timestamp >= window_start).count();
546
547 policy.get_restart_delay(recent_attempts as u32)
548 }
549
550 pub async fn mark_degraded(&self, node_id: &str) {
552 let mut degraded = self.degraded_nodes.write().await;
553 if !degraded.contains(&node_id.to_string()) {
554 degraded.push(node_id.to_string());
555 }
556 }
557
558 pub async fn is_degraded(&self, node_id: &str) -> bool {
560 let degraded = self.degraded_nodes.read().await;
561 degraded.contains(&node_id.to_string())
562 }
563
564 pub async fn list_degraded(&self) -> Vec<String> {
566 self.degraded_nodes.read().await.clone()
567 }
568
569 pub async fn clear_degraded(&self, node_id: &str) {
571 let mut degraded = self.degraded_nodes.write().await;
572 degraded.retain(|id| id != node_id);
573 }
574
575 pub async fn get_stats(&self) -> RecoveryStats {
577 let histories = self.histories.read().await;
578
579 let total_nodes = histories.len();
580 let total_restarts: u64 = histories.values().map(|h| h.total_restarts).sum();
581 let total_failures: u64 = histories.values().map(|h| h.total_failures).sum();
582
583 let nodes_with_failures = histories.values().filter(|h| h.total_failures > 0).count();
584
585 let degraded_count = self.degraded_nodes.read().await.len();
586
587 RecoveryStats {
588 total_nodes,
589 total_restarts,
590 total_failures,
591 nodes_with_failures,
592 degraded_nodes: degraded_count,
593 avg_success_rate: if total_nodes > 0 {
594 histories.values().map(|h| h.success_rate()).sum::<f64>() / total_nodes as f64
595 } else {
596 1.0
597 },
598 }
599 }
600}
601
602impl Default for RecoveryManager {
603 fn default() -> Self {
604 Self::new()
605 }
606}
607
608#[derive(Debug, Clone, Serialize, Deserialize)]
610pub struct RecoveryStats {
611 pub total_nodes: usize,
612 pub total_restarts: u64,
613 pub total_failures: u64,
614 pub nodes_with_failures: usize,
615 pub degraded_nodes: usize,
616 pub avg_success_rate: f64,
617}
618
619