mecha10_core/
recovery.rs

1//! Graceful Degradation and Automatic Recovery Policies
2//!
3//! This module provides sophisticated failure handling and recovery strategies
4//! for robust distributed robot systems.
5//!
6//! # Features
7//!
8//! - Recovery policies (restart, exponential backoff, circuit breaker)
9//! - Graceful degradation strategies
10//! - Dependency-aware recovery
11//! - Health-based decision making
12//! - Recovery history and analytics
13//! - Configurable recovery limits
14//!
15//! # Usage
16//!
17//! ```rust
18//! use mecha10::prelude::*;
19//!
20//! let policy = RecoveryPolicy::builder()
21//!     .max_restarts(5)
22//!     .backoff_strategy(BackoffStrategy::Exponential {
23//!         initial_delay: Duration::from_secs(1),
24//!         max_delay: Duration::from_secs(60),
25//!         multiplier: 2.0,
26//!     })
27//!     .degradation_mode(DegradationMode::Disable)
28//!     .build();
29//!
30//! let manager = RecoveryManager::new();
31//! manager.set_policy("camera_node", policy).await;
32//! ```
33
34use crate::types::NodeState;
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, VecDeque};
37use std::sync::Arc;
38use std::time::{Duration, Instant, SystemTime};
39use tokio::sync::RwLock;
40
41// ============================================================================
42// Recovery Policy Configuration
43// ============================================================================
44
45/// Backoff strategy for restart delays
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum BackoffStrategy {
48    /// No delay between restarts
49    Immediate,
50
51    /// Fixed delay between restarts
52    Fixed { delay: Duration },
53
54    /// Exponential backoff with jitter
55    Exponential {
56        initial_delay: Duration,
57        max_delay: Duration,
58        multiplier: f64,
59    },
60
61    /// Linear backoff
62    Linear {
63        initial_delay: Duration,
64        increment: Duration,
65        max_delay: Duration,
66    },
67}
68
69impl Default for BackoffStrategy {
70    fn default() -> Self {
71        Self::Exponential {
72            initial_delay: Duration::from_secs(1),
73            max_delay: Duration::from_secs(60),
74            multiplier: 2.0,
75        }
76    }
77}
78
79impl BackoffStrategy {
80    /// Calculate delay for nth restart attempt
81    pub fn calculate_delay(&self, attempt: u32) -> Duration {
82        match self {
83            Self::Immediate => Duration::from_secs(0),
84            Self::Fixed { delay } => *delay,
85            Self::Exponential {
86                initial_delay,
87                max_delay,
88                multiplier,
89            } => {
90                let delay_secs = initial_delay.as_secs_f64() * multiplier.powi(attempt as i32);
91                let clamped = delay_secs.min(max_delay.as_secs_f64());
92                Duration::from_secs_f64(clamped)
93            }
94            Self::Linear {
95                initial_delay,
96                increment,
97                max_delay,
98            } => {
99                let delay_secs = initial_delay.as_secs_f64() + (increment.as_secs_f64() * attempt as f64);
100                let clamped = delay_secs.min(max_delay.as_secs_f64());
101                Duration::from_secs_f64(clamped)
102            }
103        }
104    }
105}
106
107/// Degradation mode when a node cannot be recovered
108#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
109pub enum DegradationMode {
110    /// Disable the node and continue without it
111    Disable,
112
113    /// Stop dependent nodes (cascading shutdown)
114    StopDependents,
115
116    /// Shutdown entire system
117    ShutdownSystem,
118
119    /// Use fallback node if available
120    Fallback,
121
122    /// Run in reduced functionality mode
123    ReducedMode,
124}
125
126/// Recovery action to take
127#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
128pub enum RecoveryAction {
129    /// Restart the node
130    Restart,
131
132    /// Stop the node
133    Stop,
134
135    /// Replace with fallback
136    UseFallback,
137
138    /// Reduce functionality
139    Degrade,
140
141    /// No action (monitoring only)
142    Monitor,
143
144    /// Escalate to system-level decision
145    Escalate,
146}
147
148/// Comprehensive recovery policy
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct RecoveryPolicy {
151    /// Maximum number of restart attempts
152    pub max_restarts: u32,
153
154    /// Time window for counting restarts (resets after this duration)
155    pub restart_window: Duration,
156
157    /// Backoff strategy for delays between restarts
158    pub backoff_strategy: BackoffStrategy,
159
160    /// What to do when max restarts exceeded
161    pub degradation_mode: DegradationMode,
162
163    /// Optional fallback node ID
164    pub fallback_node: Option<String>,
165
166    /// Critical flag - if true, system cannot run without this node
167    pub critical: bool,
168
169    /// Health check timeout before considering node unhealthy
170    pub health_timeout: Duration,
171
172    /// Enable automatic recovery
173    pub auto_recover: bool,
174
175    /// Custom recovery script/command
176    pub recovery_script: Option<String>,
177}
178
179impl Default for RecoveryPolicy {
180    fn default() -> Self {
181        Self {
182            max_restarts: 3,
183            restart_window: Duration::from_secs(300), // 5 minutes
184            backoff_strategy: BackoffStrategy::default(),
185            degradation_mode: DegradationMode::Disable,
186            fallback_node: None,
187            critical: false,
188            health_timeout: Duration::from_secs(30),
189            auto_recover: true,
190            recovery_script: None,
191        }
192    }
193}
194
195impl RecoveryPolicy {
196    /// Create a new policy builder
197    pub fn builder() -> RecoveryPolicyBuilder {
198        RecoveryPolicyBuilder::default()
199    }
200
201    /// Check if we should attempt recovery
202    pub fn should_recover(&self, history: &RecoveryHistory) -> bool {
203        if !self.auto_recover {
204            return false;
205        }
206
207        // Count recent restarts within the window
208        let window_start = Instant::now() - self.restart_window;
209        let recent_restarts = history
210            .attempts
211            .iter()
212            .filter(|attempt| attempt.timestamp >= window_start)
213            .count();
214
215        recent_restarts < self.max_restarts as usize
216    }
217
218    /// Calculate next recovery action
219    pub fn next_action(&self, history: &RecoveryHistory) -> RecoveryAction {
220        if !self.should_recover(history) {
221            // Exceeded max restarts
222            return match self.degradation_mode {
223                DegradationMode::Disable => RecoveryAction::Stop,
224                DegradationMode::StopDependents => RecoveryAction::Stop,
225                DegradationMode::ShutdownSystem => RecoveryAction::Escalate,
226                DegradationMode::Fallback => {
227                    if self.fallback_node.is_some() {
228                        RecoveryAction::UseFallback
229                    } else {
230                        RecoveryAction::Stop
231                    }
232                }
233                DegradationMode::ReducedMode => RecoveryAction::Degrade,
234            };
235        }
236
237        RecoveryAction::Restart
238    }
239
240    /// Get delay before next restart attempt
241    pub fn get_restart_delay(&self, attempt: u32) -> Duration {
242        self.backoff_strategy.calculate_delay(attempt)
243    }
244}
245
246/// Builder for recovery policy
247#[derive(Default)]
248pub struct RecoveryPolicyBuilder {
249    policy: RecoveryPolicy,
250}
251
252impl RecoveryPolicyBuilder {
253    pub fn max_restarts(mut self, max: u32) -> Self {
254        self.policy.max_restarts = max;
255        self
256    }
257
258    pub fn restart_window(mut self, window: Duration) -> Self {
259        self.policy.restart_window = window;
260        self
261    }
262
263    pub fn backoff_strategy(mut self, strategy: BackoffStrategy) -> Self {
264        self.policy.backoff_strategy = strategy;
265        self
266    }
267
268    pub fn degradation_mode(mut self, mode: DegradationMode) -> Self {
269        self.policy.degradation_mode = mode;
270        self
271    }
272
273    pub fn fallback_node(mut self, node_id: String) -> Self {
274        self.policy.fallback_node = Some(node_id);
275        self
276    }
277
278    pub fn critical(mut self, critical: bool) -> Self {
279        self.policy.critical = critical;
280        self
281    }
282
283    pub fn health_timeout(mut self, timeout: Duration) -> Self {
284        self.policy.health_timeout = timeout;
285        self
286    }
287
288    pub fn auto_recover(mut self, auto: bool) -> Self {
289        self.policy.auto_recover = auto;
290        self
291    }
292
293    pub fn recovery_script(mut self, script: String) -> Self {
294        self.policy.recovery_script = Some(script);
295        self
296    }
297
298    pub fn build(self) -> RecoveryPolicy {
299        self.policy
300    }
301}
302
303// ============================================================================
304// Recovery History and Analytics
305// ============================================================================
306
307/// Single recovery attempt record
308#[derive(Debug, Clone, Serialize, Deserialize)]
309pub struct RecoveryAttempt {
310    /// When the attempt occurred (skipped during serialization, use system_time instead)
311    #[serde(skip, default = "Instant::now")]
312    pub timestamp: Instant,
313
314    /// System time for serialization
315    pub system_time: SystemTime,
316
317    /// What action was taken
318    pub action: RecoveryAction,
319
320    /// Whether it succeeded
321    pub success: bool,
322
323    /// Error message if failed
324    pub error: Option<String>,
325
326    /// How long the recovery took
327    pub duration: Duration,
328
329    /// Node state before recovery
330    pub previous_state: NodeState,
331
332    /// Node state after recovery
333    pub new_state: NodeState,
334}
335
336/// Recovery history for a node
337#[derive(Debug, Clone, Serialize, Deserialize)]
338pub struct RecoveryHistory {
339    /// Node identifier
340    pub node_id: String,
341
342    /// All recovery attempts (limited to last 100)
343    pub attempts: VecDeque<RecoveryAttempt>,
344
345    /// Total number of restarts
346    pub total_restarts: u64,
347
348    /// Total number of failures
349    pub total_failures: u64,
350
351    /// Last successful recovery time
352    #[serde(skip)]
353    pub last_success: Option<Instant>,
354
355    /// Last failure time
356    #[serde(skip)]
357    pub last_failure: Option<Instant>,
358
359    /// Current consecutive failures
360    pub consecutive_failures: u32,
361
362    /// Mean time between failures (MTBF)
363    pub mtbf: Option<Duration>,
364
365    /// Mean time to recovery (MTTR)
366    pub mttr: Option<Duration>,
367}
368
369impl RecoveryHistory {
370    /// Create new recovery history
371    pub fn new(node_id: String) -> Self {
372        Self {
373            node_id,
374            attempts: VecDeque::with_capacity(100),
375            total_restarts: 0,
376            total_failures: 0,
377            last_success: None,
378            last_failure: None,
379            consecutive_failures: 0,
380            mtbf: None,
381            mttr: None,
382        }
383    }
384
385    /// Record a recovery attempt
386    pub fn record_attempt(&mut self, attempt: RecoveryAttempt) {
387        if attempt.success {
388            self.total_restarts += 1;
389            self.last_success = Some(attempt.timestamp);
390            self.consecutive_failures = 0;
391        } else {
392            self.total_failures += 1;
393            self.last_failure = Some(attempt.timestamp);
394            self.consecutive_failures += 1;
395        }
396
397        // Keep only last 100 attempts
398        if self.attempts.len() >= 100 {
399            self.attempts.pop_front();
400        }
401        self.attempts.push_back(attempt);
402
403        // Recalculate metrics
404        self.calculate_metrics();
405    }
406
407    /// Calculate MTBF and MTTR
408    fn calculate_metrics(&mut self) {
409        if self.attempts.len() < 2 {
410            return;
411        }
412
413        let mut failure_intervals = Vec::new();
414        let mut recovery_durations = Vec::new();
415
416        let mut last_success_time: Option<Instant> = None;
417
418        for attempt in &self.attempts {
419            if attempt.success {
420                if let Some(last_success) = last_success_time {
421                    let interval = attempt.timestamp.duration_since(last_success);
422                    failure_intervals.push(interval);
423                }
424                last_success_time = Some(attempt.timestamp);
425                recovery_durations.push(attempt.duration);
426            }
427        }
428
429        // Calculate MTBF (mean time between failures)
430        if !failure_intervals.is_empty() {
431            let total: Duration = failure_intervals.iter().sum();
432            self.mtbf = Some(total / failure_intervals.len() as u32);
433        }
434
435        // Calculate MTTR (mean time to recovery)
436        if !recovery_durations.is_empty() {
437            let total: Duration = recovery_durations.iter().sum();
438            self.mttr = Some(total / recovery_durations.len() as u32);
439        }
440    }
441
442    /// Get success rate (0.0 to 1.0)
443    pub fn success_rate(&self) -> f64 {
444        let total = self.total_restarts + self.total_failures;
445        if total == 0 {
446            return 1.0;
447        }
448        self.total_restarts as f64 / total as f64
449    }
450
451    /// Get recent failures count (last N attempts)
452    pub fn recent_failures(&self, count: usize) -> usize {
453        self.attempts.iter().rev().take(count).filter(|a| !a.success).count()
454    }
455}
456
457// ============================================================================
458// Recovery Manager
459// ============================================================================
460
461/// Global recovery manager
462pub struct RecoveryManager {
463    /// Recovery policies per node
464    policies: Arc<RwLock<HashMap<String, RecoveryPolicy>>>,
465
466    /// Recovery history per node
467    histories: Arc<RwLock<HashMap<String, RecoveryHistory>>>,
468
469    /// Default policy
470    default_policy: Arc<RwLock<RecoveryPolicy>>,
471
472    /// Degraded nodes (running in reduced mode)
473    degraded_nodes: Arc<RwLock<Vec<String>>>,
474}
475
476impl RecoveryManager {
477    /// Create new recovery manager
478    pub fn new() -> Self {
479        Self {
480            policies: Arc::new(RwLock::new(HashMap::new())),
481            histories: Arc::new(RwLock::new(HashMap::new())),
482            default_policy: Arc::new(RwLock::new(RecoveryPolicy::default())),
483            degraded_nodes: Arc::new(RwLock::new(Vec::new())),
484        }
485    }
486
487    /// Set default recovery policy
488    pub async fn set_default_policy(&self, policy: RecoveryPolicy) {
489        *self.default_policy.write().await = policy;
490    }
491
492    /// Set recovery policy for a specific node
493    pub async fn set_policy(&self, node_id: &str, policy: RecoveryPolicy) {
494        let mut policies = self.policies.write().await;
495        policies.insert(node_id.to_string(), policy);
496    }
497
498    /// Get recovery policy for a node
499    pub async fn get_policy(&self, node_id: &str) -> RecoveryPolicy {
500        let policies = self.policies.read().await;
501        if let Some(policy) = policies.get(node_id) {
502            policy.clone()
503        } else {
504            self.default_policy.read().await.clone()
505        }
506    }
507
508    /// Get recovery history for a node
509    pub async fn get_history(&self, node_id: &str) -> RecoveryHistory {
510        let histories = self.histories.read().await;
511        if let Some(history) = histories.get(node_id) {
512            history.clone()
513        } else {
514            RecoveryHistory::new(node_id.to_string())
515        }
516    }
517
518    /// Record a recovery attempt
519    pub async fn record_attempt(&self, node_id: &str, attempt: RecoveryAttempt) {
520        let mut histories = self.histories.write().await;
521        let history = histories
522            .entry(node_id.to_string())
523            .or_insert_with(|| RecoveryHistory::new(node_id.to_string()));
524        history.record_attempt(attempt);
525    }
526
527    /// Determine recovery action for a node
528    pub async fn determine_action(&self, node_id: &str, current_state: NodeState) -> RecoveryAction {
529        let policy = self.get_policy(node_id).await;
530        let history = self.get_history(node_id).await;
531
532        // Only recover from crashed or unhealthy states
533        match current_state {
534            NodeState::Crashed | NodeState::Unhealthy => policy.next_action(&history),
535            _ => RecoveryAction::Monitor,
536        }
537    }
538
539    /// Calculate delay before next restart
540    pub async fn get_restart_delay(&self, node_id: &str) -> Duration {
541        let policy = self.get_policy(node_id).await;
542        let history = self.get_history(node_id).await;
543
544        let window_start = Instant::now() - policy.restart_window;
545        let recent_attempts = history.attempts.iter().filter(|a| a.timestamp >= window_start).count();
546
547        policy.get_restart_delay(recent_attempts as u32)
548    }
549
550    /// Mark node as degraded
551    pub async fn mark_degraded(&self, node_id: &str) {
552        let mut degraded = self.degraded_nodes.write().await;
553        if !degraded.contains(&node_id.to_string()) {
554            degraded.push(node_id.to_string());
555        }
556    }
557
558    /// Check if node is degraded
559    pub async fn is_degraded(&self, node_id: &str) -> bool {
560        let degraded = self.degraded_nodes.read().await;
561        degraded.contains(&node_id.to_string())
562    }
563
564    /// Get all degraded nodes
565    pub async fn list_degraded(&self) -> Vec<String> {
566        self.degraded_nodes.read().await.clone()
567    }
568
569    /// Clear degraded status
570    pub async fn clear_degraded(&self, node_id: &str) {
571        let mut degraded = self.degraded_nodes.write().await;
572        degraded.retain(|id| id != node_id);
573    }
574
575    /// Get recovery statistics
576    pub async fn get_stats(&self) -> RecoveryStats {
577        let histories = self.histories.read().await;
578
579        let total_nodes = histories.len();
580        let total_restarts: u64 = histories.values().map(|h| h.total_restarts).sum();
581        let total_failures: u64 = histories.values().map(|h| h.total_failures).sum();
582
583        let nodes_with_failures = histories.values().filter(|h| h.total_failures > 0).count();
584
585        let degraded_count = self.degraded_nodes.read().await.len();
586
587        RecoveryStats {
588            total_nodes,
589            total_restarts,
590            total_failures,
591            nodes_with_failures,
592            degraded_nodes: degraded_count,
593            avg_success_rate: if total_nodes > 0 {
594                histories.values().map(|h| h.success_rate()).sum::<f64>() / total_nodes as f64
595            } else {
596                1.0
597            },
598        }
599    }
600}
601
602impl Default for RecoveryManager {
603    fn default() -> Self {
604        Self::new()
605    }
606}
607
608/// Recovery statistics
609#[derive(Debug, Clone, Serialize, Deserialize)]
610pub struct RecoveryStats {
611    pub total_nodes: usize,
612    pub total_restarts: u64,
613    pub total_failures: u64,
614    pub nodes_with_failures: usize,
615    pub degraded_nodes: usize,
616    pub avg_success_rate: f64,
617}
618
619// ============================================================================
620// Tests
621// ============================================================================