ringkernel_core/health.rs

//! Health monitoring and resilience infrastructure for RingKernel.
//!
//! This module provides production-ready health and resilience features:
//!
//! - **Health Checks** - Kernel liveness and readiness probes
//! - **Circuit Breakers** - Fault isolation and recovery
//! - **Retry Policies** - Configurable retry with backoff
//! - **Graceful Degradation** - Load shedding and fallback modes
//! - **Watchdog** - Automatic kernel health monitoring
//!
//! ## Usage
//!
//! ```ignore
//! use ringkernel_core::health::{
//!     CircuitBreaker, CircuitBreakerConfig, HealthChecker, RetryPolicy,
//! };
//! use std::time::Duration;
//!
//! // Create a health checker and register probes.
//! let checker = HealthChecker::new();
//! checker.register_liveness("kernel_alive", || async { true });
//! checker.register_readiness("queue_ready", || async { queue_depth < 1000 });
//!
//! // Create a circuit breaker with a custom failure threshold.
//! let breaker = CircuitBreaker::with_config(CircuitBreakerConfig {
//!     failure_threshold: 5,
//!     recovery_timeout: Duration::from_secs(30),
//!     ..CircuitBreakerConfig::default()
//! });
//!
//! // Execute a fallible operation with circuit breaker protection.
//! let result = breaker.execute(|| async { risky_operation() }).await;
//!
//! // Retry with exponential backoff and jitter.
//! let policy = RetryPolicy::new(3);
//! let value = policy.execute(|| async { risky_operation() }).await?;
//! ```

30use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::future::Future;
33use std::pin::Pin;
34use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
35use std::sync::Arc;
36use std::time::{Duration, Instant};
37
38use crate::error::{Result, RingKernelError};
39use crate::runtime::KernelId;
40
41// ============================================================================
42// Health Check Types
43// ============================================================================
44
45/// Health status of a component.
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum HealthStatus {
48    /// Component is healthy and operating normally.
49    Healthy,
50    /// Component is degraded but still functional.
51    Degraded,
52    /// Component is unhealthy and not functional.
53    Unhealthy,
54    /// Health status is unknown (check not yet run).
55    Unknown,
56}
57
58impl HealthStatus {
59    /// Check if status represents a healthy state.
60    pub fn is_healthy(&self) -> bool {
61        matches!(self, HealthStatus::Healthy | HealthStatus::Degraded)
62    }
63
64    /// Check if status represents an unhealthy state.
65    pub fn is_unhealthy(&self) -> bool {
66        matches!(self, HealthStatus::Unhealthy)
67    }
68}
69
70/// Result of a health check.
71#[derive(Debug, Clone)]
72pub struct HealthCheckResult {
73    /// Check name.
74    pub name: String,
75    /// Health status.
76    pub status: HealthStatus,
77    /// Human-readable message.
78    pub message: Option<String>,
79    /// Duration of the check.
80    pub duration: Duration,
81    /// Timestamp when check was performed.
82    pub checked_at: Instant,
83}
84
85/// Type alias for async health check functions.
86pub type HealthCheckFn =
87    Arc<dyn Fn() -> Pin<Box<dyn Future<Output = HealthStatus> + Send>> + Send + Sync>;
88
89/// A health check definition.
90pub struct HealthCheck {
91    /// Check name.
92    pub name: String,
93    /// Check function.
94    check_fn: HealthCheckFn,
95    /// Whether this is a liveness check.
96    pub is_liveness: bool,
97    /// Whether this is a readiness check.
98    pub is_readiness: bool,
99    /// Timeout for check execution.
100    pub timeout: Duration,
101    /// Last result.
102    last_result: RwLock<Option<HealthCheckResult>>,
103}
104
105impl HealthCheck {
106    /// Create a new health check.
107    pub fn new(name: impl Into<String>, check_fn: HealthCheckFn) -> Self {
108        Self {
109            name: name.into(),
110            check_fn,
111            is_liveness: false,
112            is_readiness: false,
113            timeout: Duration::from_secs(5),
114            last_result: RwLock::new(None),
115        }
116    }
117
118    /// Mark as liveness check.
119    pub fn liveness(mut self) -> Self {
120        self.is_liveness = true;
121        self
122    }
123
124    /// Mark as readiness check.
125    pub fn readiness(mut self) -> Self {
126        self.is_readiness = true;
127        self
128    }
129
130    /// Set timeout.
131    pub fn timeout(mut self, timeout: Duration) -> Self {
132        self.timeout = timeout;
133        self
134    }
135
    /// Execute the health check, enforcing the configured timeout.
    pub async fn check(&self) -> HealthCheckResult {
        let start = Instant::now();
        // A check that exceeds its timeout is reported as unhealthy instead of
        // blocking the caller indefinitely.
        let (status, message) = match tokio::time::timeout(self.timeout, (self.check_fn)()).await {
            Ok(status) => (status, None),
            Err(_) => (
                HealthStatus::Unhealthy,
                Some(format!("health check timed out after {:?}", self.timeout)),
            ),
        };
        let duration = start.elapsed();

        let result = HealthCheckResult {
            name: self.name.clone(),
            status,
            message,
            duration,
            checked_at: Instant::now(),
        };

        *self.last_result.write() = Some(result.clone());
        result
    }
153
154    /// Get last check result.
155    pub fn last_result(&self) -> Option<HealthCheckResult> {
156        self.last_result.read().clone()
157    }
158}
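
// A minimal sketch of building a check by hand when the `register_*` helpers
// on `HealthChecker` are not flexible enough. The check name and the
// always-healthy body are illustrative only.
//
//     let check_fn: HealthCheckFn = Arc::new(|| {
//         Box::pin(async { HealthStatus::Healthy })
//             as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
//     });
//     let check = HealthCheck::new("gpu_context_alive", check_fn)
//         .liveness()
//         .timeout(Duration::from_secs(2));
//     let result = check.check().await;
//     assert!(result.status.is_healthy());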
159
160/// Health checker that manages multiple health checks.
161pub struct HealthChecker {
162    /// Registered health checks.
163    checks: RwLock<Vec<Arc<HealthCheck>>>,
164    /// Check interval (used by async runtime loop).
165    #[allow(dead_code)]
166    check_interval: Duration,
167    /// Running state (used by async runtime loop).
168    #[allow(dead_code)]
169    running: std::sync::atomic::AtomicBool,
170}
171
172impl HealthChecker {
173    /// Create a new health checker.
174    pub fn new() -> Arc<Self> {
175        Arc::new(Self {
176            checks: RwLock::new(Vec::new()),
177            check_interval: Duration::from_secs(10),
178            running: std::sync::atomic::AtomicBool::new(false),
179        })
180    }
181
    /// Set the check interval.
    ///
    /// Note: the interval is currently fixed at construction; making it
    /// configurable would require interior mutability or a builder, so this
    /// call is presently a no-op.
    pub fn with_interval(self: Arc<Self>, interval: Duration) -> Arc<Self> {
        let _ = interval;
        self
    }
189
190    /// Register a health check.
191    pub fn register(&self, check: HealthCheck) {
192        self.checks.write().push(Arc::new(check));
193    }
194
195    /// Register a simple liveness check.
196    pub fn register_liveness<F, Fut>(&self, name: impl Into<String>, check_fn: F)
197    where
198        F: Fn() -> Fut + Send + Sync + 'static,
199        Fut: Future<Output = bool> + Send + 'static,
200    {
201        let name = name.into();
202        let check = HealthCheck::new(
203            name,
204            Arc::new(move || {
205                let fut = check_fn();
206                Box::pin(async move {
207                    if fut.await {
208                        HealthStatus::Healthy
209                    } else {
210                        HealthStatus::Unhealthy
211                    }
212                }) as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
213            }),
214        )
215        .liveness();
216        self.register(check);
217    }
218
219    /// Register a simple readiness check.
220    pub fn register_readiness<F, Fut>(&self, name: impl Into<String>, check_fn: F)
221    where
222        F: Fn() -> Fut + Send + Sync + 'static,
223        Fut: Future<Output = bool> + Send + 'static,
224    {
225        let name = name.into();
226        let check = HealthCheck::new(
227            name,
228            Arc::new(move || {
229                let fut = check_fn();
230                Box::pin(async move {
231                    if fut.await {
232                        HealthStatus::Healthy
233                    } else {
234                        HealthStatus::Unhealthy
235                    }
236                }) as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
237            }),
238        )
239        .readiness();
240        self.register(check);
241    }
242
243    /// Run all health checks.
244    pub async fn check_all(&self) -> Vec<HealthCheckResult> {
245        let checks = self.checks.read().clone();
246        let mut results = Vec::with_capacity(checks.len());
247
248        for check in checks {
249            results.push(check.check().await);
250        }
251
252        results
253    }
254
255    /// Run liveness checks only.
256    pub async fn check_liveness(&self) -> Vec<HealthCheckResult> {
257        let checks = self.checks.read().clone();
258        let mut results = Vec::new();
259
260        for check in checks.iter().filter(|c| c.is_liveness) {
261            results.push(check.check().await);
262        }
263
264        results
265    }
266
267    /// Run readiness checks only.
268    pub async fn check_readiness(&self) -> Vec<HealthCheckResult> {
269        let checks = self.checks.read().clone();
270        let mut results = Vec::new();
271
272        for check in checks.iter().filter(|c| c.is_readiness) {
273            results.push(check.check().await);
274        }
275
276        results
277    }
278
279    /// Get overall liveness status.
280    pub async fn is_alive(&self) -> bool {
281        let results = self.check_liveness().await;
282        results.iter().all(|r| r.status.is_healthy())
283    }
284
285    /// Get overall readiness status.
286    pub async fn is_ready(&self) -> bool {
287        let results = self.check_readiness().await;
288        results.iter().all(|r| r.status.is_healthy())
289    }
290
291    /// Get aggregate health status.
292    pub async fn aggregate_status(&self) -> HealthStatus {
293        let results = self.check_all().await;
294
295        if results.is_empty() {
296            return HealthStatus::Unknown;
297        }
298
299        let all_healthy = results.iter().all(|r| r.status == HealthStatus::Healthy);
300        let any_unhealthy = results.iter().any(|r| r.status == HealthStatus::Unhealthy);
301
302        if all_healthy {
303            HealthStatus::Healthy
304        } else if any_unhealthy {
305            HealthStatus::Unhealthy
306        } else {
307            HealthStatus::Degraded
308        }
309    }
310
311    /// Get check count.
312    pub fn check_count(&self) -> usize {
313        self.checks.read().len()
314    }
315}
316
317impl Default for HealthChecker {
318    fn default() -> Self {
319        Self {
320            checks: RwLock::new(Vec::new()),
321            check_interval: Duration::from_secs(10),
322            running: std::sync::atomic::AtomicBool::new(false),
323        }
324    }
325}
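
// A usage sketch for the checker itself; the probe names and bodies are
// placeholders, real probes would inspect queues, device contexts, etc.
//
//     let checker = HealthChecker::new();
//     checker.register_liveness("event_loop_alive", || async { true });
//     checker.register_readiness("queue_ready", || async { true });
//
//     assert!(checker.is_alive().await);
//     assert!(checker.is_ready().await);
//     assert_eq!(checker.aggregate_status().await, HealthStatus::Healthy);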
326
327// ============================================================================
328// Circuit Breaker
329// ============================================================================
330
331/// Circuit breaker state.
332#[derive(Debug, Clone, Copy, PartialEq, Eq)]
333pub enum CircuitState {
334    /// Circuit is closed (allowing requests).
335    Closed,
336    /// Circuit is open (rejecting requests).
337    Open,
338    /// Circuit is half-open (allowing test requests).
339    HalfOpen,
340}
341
342/// Circuit breaker configuration.
343#[derive(Debug, Clone)]
344pub struct CircuitBreakerConfig {
345    /// Number of failures before opening circuit.
346    pub failure_threshold: u32,
347    /// Number of successes to close circuit from half-open.
348    pub success_threshold: u32,
349    /// Duration to wait before transitioning from open to half-open.
350    pub recovery_timeout: Duration,
    /// Duration of the sliding window for counting failures.
    ///
    /// Note: failures are currently counted cumulatively until the circuit
    /// opens or is reset; the sliding window is not yet enforced.
    pub window_duration: Duration,
353    /// Maximum concurrent requests in half-open state.
354    pub half_open_max_requests: u32,
355}
356
357impl Default for CircuitBreakerConfig {
358    fn default() -> Self {
359        Self {
360            failure_threshold: 5,
361            success_threshold: 3,
362            recovery_timeout: Duration::from_secs(30),
363            window_duration: Duration::from_secs(60),
364            half_open_max_requests: 3,
365        }
366    }
367}
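
// Overriding only some fields is easiest with struct update syntax; the
// values below are illustrative, not recommendations.
//
//     let config = CircuitBreakerConfig {
//         failure_threshold: 10,
//         recovery_timeout: Duration::from_secs(10),
//         ..CircuitBreakerConfig::default()
//     };
//     let breaker = CircuitBreaker::with_config(config);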
368
369/// Circuit breaker for fault isolation.
370pub struct CircuitBreaker {
371    /// Configuration.
372    config: CircuitBreakerConfig,
373    /// Current state.
374    state: RwLock<CircuitState>,
375    /// Failure count in current window.
376    failure_count: AtomicU32,
377    /// Success count in half-open state.
378    success_count: AtomicU32,
379    /// Time when circuit opened.
380    opened_at: RwLock<Option<Instant>>,
381    /// Current requests in half-open state.
382    half_open_requests: AtomicU32,
383    /// Total requests.
384    total_requests: AtomicU64,
385    /// Total failures.
386    total_failures: AtomicU64,
387    /// Total rejections (due to open circuit).
388    total_rejections: AtomicU64,
389}
390
391impl CircuitBreaker {
392    /// Create a new circuit breaker with default config.
393    pub fn new() -> Arc<Self> {
394        Self::with_config(CircuitBreakerConfig::default())
395    }
396
397    /// Create with custom config.
398    pub fn with_config(config: CircuitBreakerConfig) -> Arc<Self> {
399        Arc::new(Self {
400            config,
401            state: RwLock::new(CircuitState::Closed),
402            failure_count: AtomicU32::new(0),
403            success_count: AtomicU32::new(0),
404            opened_at: RwLock::new(None),
405            half_open_requests: AtomicU32::new(0),
406            total_requests: AtomicU64::new(0),
407            total_failures: AtomicU64::new(0),
408            total_rejections: AtomicU64::new(0),
409        })
410    }
411
412    /// Get current state.
413    pub fn state(&self) -> CircuitState {
414        // Check if we should transition from open to half-open
415        let current_state = *self.state.read();
416        if current_state == CircuitState::Open {
417            if let Some(opened_at) = *self.opened_at.read() {
418                if opened_at.elapsed() >= self.config.recovery_timeout {
419                    *self.state.write() = CircuitState::HalfOpen;
420                    self.half_open_requests.store(0, Ordering::SeqCst);
421                    self.success_count.store(0, Ordering::SeqCst);
422                    return CircuitState::HalfOpen;
423                }
424            }
425        }
426        current_state
427    }
428
429    /// Check if circuit allows requests.
430    pub fn is_allowed(&self) -> bool {
431        match self.state() {
432            CircuitState::Closed => true,
433            CircuitState::Open => false,
434            CircuitState::HalfOpen => {
435                self.half_open_requests.load(Ordering::SeqCst) < self.config.half_open_max_requests
436            }
437        }
438    }
439
440    /// Record a successful operation.
441    pub fn record_success(&self) {
442        self.total_requests.fetch_add(1, Ordering::Relaxed);
443
444        let state = self.state();
445        if state == CircuitState::HalfOpen {
446            let success_count = self.success_count.fetch_add(1, Ordering::SeqCst) + 1;
447            self.half_open_requests.fetch_sub(1, Ordering::SeqCst);
448
449            if success_count >= self.config.success_threshold {
450                self.close();
451            }
452        }
453    }
454
455    /// Record a failed operation.
456    pub fn record_failure(&self) {
457        self.total_requests.fetch_add(1, Ordering::Relaxed);
458        self.total_failures.fetch_add(1, Ordering::Relaxed);
459
460        let state = self.state();
461        match state {
462            CircuitState::Closed => {
463                let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
464                if failure_count >= self.config.failure_threshold {
465                    self.open();
466                }
467            }
468            CircuitState::HalfOpen => {
469                self.half_open_requests.fetch_sub(1, Ordering::SeqCst);
470                self.open();
471            }
472            CircuitState::Open => {}
473        }
474    }
475
476    /// Record a rejection (request not attempted due to open circuit).
477    pub fn record_rejection(&self) {
478        self.total_rejections.fetch_add(1, Ordering::Relaxed);
479    }
480
481    /// Open the circuit.
482    fn open(&self) {
483        *self.state.write() = CircuitState::Open;
484        *self.opened_at.write() = Some(Instant::now());
485    }
486
487    /// Close the circuit.
488    fn close(&self) {
489        *self.state.write() = CircuitState::Closed;
490        *self.opened_at.write() = None;
491        self.failure_count.store(0, Ordering::SeqCst);
492        self.success_count.store(0, Ordering::SeqCst);
493    }
494
495    /// Force reset the circuit to closed state.
496    pub fn reset(&self) {
497        self.close();
498    }
499
500    /// Acquire permission to execute (for half-open state).
501    fn acquire_half_open(&self) -> bool {
502        if self.state() != CircuitState::HalfOpen {
503            return true;
504        }
505
506        let current = self.half_open_requests.load(Ordering::SeqCst);
507        if current >= self.config.half_open_max_requests {
508            return false;
509        }
510
511        self.half_open_requests.fetch_add(1, Ordering::SeqCst);
512        true
513    }
514
515    /// Execute an operation with circuit breaker protection.
516    pub async fn execute<F, Fut, T, E>(&self, operation: F) -> Result<T>
517    where
518        F: FnOnce() -> Fut,
519        Fut: Future<Output = std::result::Result<T, E>>,
520        E: std::fmt::Display,
521    {
522        if !self.is_allowed() {
523            self.record_rejection();
524            return Err(RingKernelError::BackendError(
525                "Circuit breaker is open".to_string(),
526            ));
527        }
528
529        if !self.acquire_half_open() {
530            self.record_rejection();
531            return Err(RingKernelError::BackendError(
532                "Circuit breaker half-open limit reached".to_string(),
533            ));
534        }
535
536        match operation().await {
537            Ok(result) => {
538                self.record_success();
539                Ok(result)
540            }
541            Err(e) => {
542                self.record_failure();
543                Err(RingKernelError::BackendError(format!(
544                    "Operation failed: {}",
545                    e
546                )))
547            }
548        }
549    }
550
551    /// Get circuit breaker statistics.
552    pub fn stats(&self) -> CircuitBreakerStats {
553        CircuitBreakerStats {
554            state: self.state(),
555            total_requests: self.total_requests.load(Ordering::Relaxed),
556            total_failures: self.total_failures.load(Ordering::Relaxed),
557            total_rejections: self.total_rejections.load(Ordering::Relaxed),
558            failure_count: self.failure_count.load(Ordering::Relaxed),
559            success_count: self.success_count.load(Ordering::Relaxed),
560        }
561    }
562}
563
564impl Default for CircuitBreaker {
565    fn default() -> Self {
566        Self {
567            config: CircuitBreakerConfig::default(),
568            state: RwLock::new(CircuitState::Closed),
569            failure_count: AtomicU32::new(0),
570            success_count: AtomicU32::new(0),
571            opened_at: RwLock::new(None),
572            half_open_requests: AtomicU32::new(0),
573            total_requests: AtomicU64::new(0),
574            total_failures: AtomicU64::new(0),
575            total_rejections: AtomicU64::new(0),
576        }
577    }
578}
579
580/// Circuit breaker statistics.
581#[derive(Debug, Clone)]
582pub struct CircuitBreakerStats {
583    /// Current state.
584    pub state: CircuitState,
585    /// Total requests attempted.
586    pub total_requests: u64,
587    /// Total failures.
588    pub total_failures: u64,
589    /// Total rejections.
590    pub total_rejections: u64,
591    /// Current failure count.
592    pub failure_count: u32,
593    /// Current success count (in half-open).
594    pub success_count: u32,
595}
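
// A sketch of wrapping a fallible async call with the breaker. `flush_batch`
// is hypothetical; any future resolving to `Result<T, E>` with `E: Display`
// works. A fresh breaker starts Closed, so the first call is always attempted.
//
//     let breaker = CircuitBreaker::new();
//     match breaker.execute(|| async { flush_batch().await }).await {
//         Ok(_) => { /* operation succeeded, counted as a success */ }
//         Err(_) => { /* rejected while open, or the operation failed */ }
//     }
//     let stats = breaker.stats();
//     assert_eq!(stats.total_requests, 1);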
596
597// ============================================================================
598// Retry Policy
599// ============================================================================
600
601/// Backoff strategy for retries.
602#[derive(Debug, Clone)]
603pub enum BackoffStrategy {
604    /// Fixed delay between retries.
605    Fixed(Duration),
606    /// Linear backoff (delay * attempt).
607    Linear {
608        /// Initial delay.
609        initial: Duration,
610        /// Maximum delay.
611        max: Duration,
612    },
613    /// Exponential backoff (delay * 2^attempt).
614    Exponential {
615        /// Initial delay.
616        initial: Duration,
617        /// Maximum delay.
618        max: Duration,
619        /// Multiplier (default 2.0).
620        multiplier: f64,
621    },
622    /// No delay between retries.
623    None,
624}
625
626impl BackoffStrategy {
627    /// Calculate delay for given attempt number (0-indexed).
628    pub fn delay(&self, attempt: u32) -> Duration {
629        match self {
630            BackoffStrategy::Fixed(d) => *d,
631            BackoffStrategy::Linear { initial, max } => {
632                let delay = initial.mul_f64((attempt + 1) as f64);
633                delay.min(*max)
634            }
635            BackoffStrategy::Exponential {
636                initial,
637                max,
638                multiplier,
639            } => {
640                let factor = multiplier.powi(attempt as i32);
641                let delay = initial.mul_f64(factor);
642                delay.min(*max)
643            }
644            BackoffStrategy::None => Duration::ZERO,
645        }
646    }
647}
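
// Worked example: with `initial = 100ms`, `multiplier = 2.0`, and `max = 1s`,
// the exponential schedule is 100ms, 200ms, 400ms, 800ms, then capped at 1s.
//
//     let backoff = BackoffStrategy::Exponential {
//         initial: Duration::from_millis(100),
//         max: Duration::from_secs(1),
//         multiplier: 2.0,
//     };
//     assert_eq!(backoff.delay(0), Duration::from_millis(100));
//     assert_eq!(backoff.delay(2), Duration::from_millis(400));
//     assert_eq!(backoff.delay(4), Duration::from_secs(1)); // 1.6s, capped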
648
649/// Retry policy configuration.
650#[derive(Clone)]
651pub struct RetryPolicy {
652    /// Maximum number of retry attempts.
653    pub max_attempts: u32,
654    /// Backoff strategy.
655    pub backoff: BackoffStrategy,
656    /// Whether to add jitter to delays.
657    pub jitter: bool,
658    /// Retryable error predicate.
659    #[allow(clippy::type_complexity)]
660    retryable: Option<Arc<dyn Fn(&str) -> bool + Send + Sync>>,
661}
662
663impl std::fmt::Debug for RetryPolicy {
664    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
665        f.debug_struct("RetryPolicy")
666            .field("max_attempts", &self.max_attempts)
667            .field("backoff", &self.backoff)
668            .field("jitter", &self.jitter)
669            .field("retryable", &self.retryable.is_some())
670            .finish()
671    }
672}
673
674impl RetryPolicy {
675    /// Create a new retry policy.
676    pub fn new(max_attempts: u32) -> Self {
677        Self {
678            max_attempts,
679            backoff: BackoffStrategy::Exponential {
680                initial: Duration::from_millis(100),
681                max: Duration::from_secs(30),
682                multiplier: 2.0,
683            },
684            jitter: true,
685            retryable: None,
686        }
687    }
688
689    /// Set backoff strategy.
690    pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
691        self.backoff = backoff;
692        self
693    }
694
695    /// Disable jitter.
696    pub fn without_jitter(mut self) -> Self {
697        self.jitter = false;
698        self
699    }
700
701    /// Set retryable error predicate.
702    pub fn with_retryable<F>(mut self, predicate: F) -> Self
703    where
704        F: Fn(&str) -> bool + Send + Sync + 'static,
705    {
706        self.retryable = Some(Arc::new(predicate));
707        self
708    }
709
710    /// Check if an error is retryable.
711    pub fn is_retryable(&self, error: &str) -> bool {
712        self.retryable.as_ref().map(|p| p(error)).unwrap_or(true)
713    }
714
715    /// Get delay for an attempt with optional jitter.
716    pub fn get_delay(&self, attempt: u32) -> Duration {
717        let base_delay = self.backoff.delay(attempt);
718
719        if self.jitter && base_delay > Duration::ZERO {
            // Apply up to 25% downward jitter (factor in [0.75, 1.0))
721            let jitter_factor = 0.75 + (rand_u64() % 50) as f64 / 200.0;
722            base_delay.mul_f64(jitter_factor)
723        } else {
724            base_delay
725        }
726    }
727
728    /// Execute an operation with retry.
729    pub async fn execute<F, Fut, T, E>(&self, mut operation: F) -> Result<T>
730    where
731        F: FnMut() -> Fut,
732        Fut: Future<Output = std::result::Result<T, E>>,
733        E: std::fmt::Display,
734    {
735        let mut last_error = String::new();
736
737        for attempt in 0..self.max_attempts {
738            match operation().await {
739                Ok(result) => return Ok(result),
740                Err(e) => {
741                    last_error = format!("{}", e);
742
743                    // Check if retryable
744                    if !self.is_retryable(&last_error) {
745                        return Err(RingKernelError::BackendError(format!(
746                            "Non-retryable error: {}",
747                            last_error
748                        )));
749                    }
750
751                    // Last attempt, don't wait
752                    if attempt + 1 >= self.max_attempts {
753                        break;
754                    }
755
756                    // Wait before retry
757                    let delay = self.get_delay(attempt);
758                    tokio::time::sleep(delay).await;
759                }
760            }
761        }
762
763        Err(RingKernelError::BackendError(format!(
764            "Operation failed after {} attempts: {}",
765            self.max_attempts, last_error
766        )))
767    }
768}
769
770impl Default for RetryPolicy {
771    fn default() -> Self {
772        Self::new(3)
773    }
774}
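
// A usage sketch; `load_segment` is a hypothetical async operation returning
// `Result<Segment, String>`. Errors whose text does not match the predicate
// fail immediately instead of being retried.
//
//     let policy = RetryPolicy::new(5)
//         .with_backoff(BackoffStrategy::Fixed(Duration::from_millis(50)))
//         .with_retryable(|err| err.contains("transient"));
//     let segment = policy.execute(|| async { load_segment().await }).await?;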
775
776/// Simple pseudo-random number generator for jitter.
777fn rand_u64() -> u64 {
778    use std::hash::{Hash, Hasher};
779    let mut hasher = std::collections::hash_map::DefaultHasher::new();
780    std::time::SystemTime::now().hash(&mut hasher);
781    std::thread::current().id().hash(&mut hasher);
782    hasher.finish()
783}
784
785// ============================================================================
786// Graceful Degradation
787// ============================================================================
788
789/// Degradation level for system operation.
790#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
791pub enum DegradationLevel {
792    /// Full functionality.
793    Normal = 0,
794    /// Minor degradation (e.g., increased latency acceptable).
795    Light = 1,
796    /// Moderate degradation (e.g., some features disabled).
797    Moderate = 2,
798    /// Severe degradation (e.g., read-only mode).
799    Severe = 3,
800    /// Critical (e.g., emergency mode only).
801    Critical = 4,
802}
803
804impl DegradationLevel {
805    /// Get the next worse degradation level.
806    ///
807    /// Returns Critical if already at Critical.
808    pub fn next_worse(self) -> Self {
809        match self {
810            DegradationLevel::Normal => DegradationLevel::Light,
811            DegradationLevel::Light => DegradationLevel::Moderate,
812            DegradationLevel::Moderate => DegradationLevel::Severe,
813            DegradationLevel::Severe => DegradationLevel::Critical,
814            DegradationLevel::Critical => DegradationLevel::Critical,
815        }
816    }
817
818    /// Get the next better degradation level.
819    ///
820    /// Returns Normal if already at Normal.
821    pub fn next_better(self) -> Self {
822        match self {
823            DegradationLevel::Normal => DegradationLevel::Normal,
824            DegradationLevel::Light => DegradationLevel::Normal,
825            DegradationLevel::Moderate => DegradationLevel::Light,
826            DegradationLevel::Severe => DegradationLevel::Moderate,
827            DegradationLevel::Critical => DegradationLevel::Severe,
828        }
829    }
830}
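
// The derived ordering treats higher variants as more degraded, which is what
// `DegradationManager::is_feature_disabled` (below) relies on:
//
//     assert!(DegradationLevel::Severe > DegradationLevel::Light);
//     assert_eq!(DegradationLevel::Moderate.next_worse(), DegradationLevel::Severe);
//     assert_eq!(DegradationLevel::Critical.next_worse(), DegradationLevel::Critical);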
831
832/// Load shedding policy.
833#[derive(Debug, Clone)]
834pub struct LoadSheddingPolicy {
835    /// Queue depth threshold for shedding.
836    pub queue_threshold: usize,
837    /// CPU utilization threshold (0.0-1.0).
838    pub cpu_threshold: f64,
839    /// Memory utilization threshold (0.0-1.0).
840    pub memory_threshold: f64,
841    /// Percentage of requests to shed (0.0-1.0).
842    pub shed_ratio: f64,
843}
844
845impl Default for LoadSheddingPolicy {
846    fn default() -> Self {
847        Self {
848            queue_threshold: 10000,
849            cpu_threshold: 0.9,
850            memory_threshold: 0.85,
851            shed_ratio: 0.1,
852        }
853    }
854}
855
856/// Graceful degradation manager.
857pub struct DegradationManager {
858    /// Current degradation level.
859    level: RwLock<DegradationLevel>,
860    /// Load shedding policy.
861    policy: LoadSheddingPolicy,
862    /// Level change callbacks.
863    #[allow(clippy::type_complexity)]
864    callbacks: RwLock<Vec<Arc<dyn Fn(DegradationLevel, DegradationLevel) + Send + Sync>>>,
865    /// Shed counter for probabilistic shedding.
866    shed_counter: AtomicU64,
867    /// Total requests.
868    total_requests: AtomicU64,
869    /// Shed requests.
870    shed_requests: AtomicU64,
871}
872
873impl DegradationManager {
874    /// Create a new degradation manager.
875    pub fn new() -> Arc<Self> {
876        Arc::new(Self {
877            level: RwLock::new(DegradationLevel::Normal),
878            policy: LoadSheddingPolicy::default(),
879            callbacks: RwLock::new(Vec::new()),
880            shed_counter: AtomicU64::new(0),
881            total_requests: AtomicU64::new(0),
882            shed_requests: AtomicU64::new(0),
883        })
884    }
885
886    /// Create with custom policy.
887    pub fn with_policy(policy: LoadSheddingPolicy) -> Arc<Self> {
888        Arc::new(Self {
889            level: RwLock::new(DegradationLevel::Normal),
890            policy,
891            callbacks: RwLock::new(Vec::new()),
892            shed_counter: AtomicU64::new(0),
893            total_requests: AtomicU64::new(0),
894            shed_requests: AtomicU64::new(0),
895        })
896    }
897
898    /// Get current degradation level.
899    pub fn level(&self) -> DegradationLevel {
900        *self.level.read()
901    }
902
903    /// Set degradation level.
904    pub fn set_level(&self, new_level: DegradationLevel) {
905        let old_level = *self.level.read();
906        if old_level != new_level {
907            *self.level.write() = new_level;
908
909            // Notify callbacks
910            let callbacks = self.callbacks.read().clone();
911            for callback in callbacks {
912                callback(old_level, new_level);
913            }
914        }
915    }
916
917    /// Register level change callback.
918    pub fn on_level_change<F>(&self, callback: F)
919    where
920        F: Fn(DegradationLevel, DegradationLevel) + Send + Sync + 'static,
921    {
922        self.callbacks.write().push(Arc::new(callback));
923    }
924
925    /// Check if request should be shed.
926    pub fn should_shed(&self) -> bool {
927        self.total_requests.fetch_add(1, Ordering::Relaxed);
928
929        let level = self.level();
930        if level == DegradationLevel::Normal {
931            return false;
932        }
933
934        // Increase shed probability based on degradation level
935        let base_ratio = self.policy.shed_ratio;
936        let level_factor = match level {
937            DegradationLevel::Normal => 0.0,
938            DegradationLevel::Light => 1.0,
939            DegradationLevel::Moderate => 2.0,
940            DegradationLevel::Severe => 3.0,
941            DegradationLevel::Critical => 4.0,
942        };
943
944        let shed_probability = (base_ratio * level_factor).min(0.9);
945
        // Counter-based shedding: deterministic, but it approximates the
        // target shed probability over every window of 100 requests.
947        let counter = self.shed_counter.fetch_add(1, Ordering::Relaxed);
948        let should_shed = (counter % 100) < (shed_probability * 100.0) as u64;
949
950        if should_shed {
951            self.shed_requests.fetch_add(1, Ordering::Relaxed);
952        }
953
954        should_shed
955    }
956
957    /// Check if a feature should be disabled at current level.
958    pub fn is_feature_disabled(&self, required_level: DegradationLevel) -> bool {
959        self.level() > required_level
960    }
961
962    /// Get shedding statistics.
963    pub fn stats(&self) -> DegradationStats {
964        let total = self.total_requests.load(Ordering::Relaxed);
965        let shed = self.shed_requests.load(Ordering::Relaxed);
966
967        DegradationStats {
968            level: self.level(),
969            total_requests: total,
970            shed_requests: shed,
971            shed_ratio: if total > 0 {
972                shed as f64 / total as f64
973            } else {
974                0.0
975            },
976        }
977    }
978}
979
980impl Default for DegradationManager {
981    fn default() -> Self {
982        Self {
983            level: RwLock::new(DegradationLevel::Normal),
984            policy: LoadSheddingPolicy::default(),
985            callbacks: RwLock::new(Vec::new()),
986            shed_counter: AtomicU64::new(0),
987            total_requests: AtomicU64::new(0),
988            shed_requests: AtomicU64::new(0),
989        }
990    }
991}
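
// A usage sketch. Note that the queue/CPU/memory thresholds in
// `LoadSheddingPolicy` are evaluated by the caller; this manager only applies
// the shed ratio once a degradation level has been set.
//
//     let manager = DegradationManager::new();
//     manager.on_level_change(|old, new| {
//         eprintln!("degradation changed: {:?} -> {:?}", old, new);
//     });
//     manager.set_level(DegradationLevel::Moderate);
//
//     if manager.should_shed() {
//         // Reject the request cheaply instead of queueing it.
//     }
//     assert!(manager.is_feature_disabled(DegradationLevel::Light));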
992
993/// Degradation statistics.
994#[derive(Debug, Clone)]
995pub struct DegradationStats {
996    /// Current level.
997    pub level: DegradationLevel,
998    /// Total requests.
999    pub total_requests: u64,
1000    /// Shed requests.
1001    pub shed_requests: u64,
1002    /// Actual shed ratio.
1003    pub shed_ratio: f64,
1004}
1005
1006// ============================================================================
1007// Kernel Health Watchdog
1008// ============================================================================
1009
1010/// Kernel health status for watchdog.
1011#[derive(Debug, Clone)]
1012pub struct KernelHealth {
1013    /// Kernel ID.
1014    pub kernel_id: KernelId,
1015    /// Last heartbeat time.
1016    pub last_heartbeat: Instant,
1017    /// Health status.
1018    pub status: HealthStatus,
1019    /// Consecutive failure count.
1020    pub failure_count: u32,
1021    /// Message processing rate.
1022    pub messages_per_sec: f64,
1023    /// Current queue depth.
1024    pub queue_depth: usize,
1025}
1026
1027/// Watchdog for monitoring kernel health.
1028pub struct KernelWatchdog {
1029    /// Watched kernels.
1030    kernels: RwLock<HashMap<KernelId, KernelHealth>>,
1031    /// Heartbeat timeout.
1032    heartbeat_timeout: Duration,
1033    /// Check interval (used by async runtime loop).
1034    #[allow(dead_code)]
1035    check_interval: Duration,
1036    /// Failure threshold before marking unhealthy.
1037    failure_threshold: u32,
1038    /// Running state (used by async runtime loop).
1039    #[allow(dead_code)]
1040    running: std::sync::atomic::AtomicBool,
1041    /// Unhealthy kernel callbacks.
1042    #[allow(clippy::type_complexity)]
1043    callbacks: RwLock<Vec<Arc<dyn Fn(&KernelHealth) + Send + Sync>>>,
1044}
1045
1046impl KernelWatchdog {
1047    /// Create a new kernel watchdog.
1048    pub fn new() -> Arc<Self> {
1049        Arc::new(Self {
1050            kernels: RwLock::new(HashMap::new()),
1051            heartbeat_timeout: Duration::from_secs(30),
1052            check_interval: Duration::from_secs(5),
1053            failure_threshold: 3,
1054            running: std::sync::atomic::AtomicBool::new(false),
1055            callbacks: RwLock::new(Vec::new()),
1056        })
1057    }
1058
    /// Set the heartbeat timeout.
    ///
    /// Note: the timeout is currently fixed at construction; this call is a
    /// no-op until interior mutability is added.
    pub fn with_heartbeat_timeout(self: Arc<Self>, timeout: Duration) -> Arc<Self> {
        let _ = timeout;
        self
    }
1064
1065    /// Register a kernel to watch.
1066    pub fn watch(&self, kernel_id: KernelId) {
1067        let health = KernelHealth {
1068            kernel_id: kernel_id.clone(),
1069            last_heartbeat: Instant::now(),
1070            status: HealthStatus::Healthy,
1071            failure_count: 0,
1072            messages_per_sec: 0.0,
1073            queue_depth: 0,
1074        };
1075        self.kernels.write().insert(kernel_id, health);
1076    }
1077
1078    /// Unregister a kernel.
1079    pub fn unwatch(&self, kernel_id: &KernelId) {
1080        self.kernels.write().remove(kernel_id);
1081    }
1082
    /// Record heartbeat from kernel.
    pub fn heartbeat(&self, kernel_id: &KernelId) {
        if let Some(health) = self.kernels.write().get_mut(kernel_id) {
            health.last_heartbeat = Instant::now();
            health.failure_count = 0;
            // A fresh heartbeat clears both Degraded and Unhealthy states.
            if health.status != HealthStatus::Healthy {
                health.status = HealthStatus::Healthy;
            }
        }
    }
1093
1094    /// Update kernel metrics.
1095    pub fn update_metrics(&self, kernel_id: &KernelId, messages_per_sec: f64, queue_depth: usize) {
1096        if let Some(health) = self.kernels.write().get_mut(kernel_id) {
1097            health.messages_per_sec = messages_per_sec;
1098            health.queue_depth = queue_depth;
1099        }
1100    }
1101
1102    /// Check all kernel health.
1103    pub fn check_all(&self) -> Vec<KernelHealth> {
1104        let now = Instant::now();
1105        let mut kernels = self.kernels.write();
1106        let mut results = Vec::with_capacity(kernels.len());
1107
1108        for health in kernels.values_mut() {
1109            // Check heartbeat timeout
1110            if now.duration_since(health.last_heartbeat) > self.heartbeat_timeout {
1111                health.failure_count += 1;
1112                if health.failure_count >= self.failure_threshold {
1113                    health.status = HealthStatus::Unhealthy;
1114                } else {
1115                    health.status = HealthStatus::Degraded;
1116                }
1117            }
1118
1119            results.push(health.clone());
1120        }
1121
1122        // Notify callbacks for unhealthy kernels
1123        drop(kernels);
1124        let callbacks = self.callbacks.read().clone();
1125        for health in results
1126            .iter()
1127            .filter(|h| h.status == HealthStatus::Unhealthy)
1128        {
1129            for callback in &callbacks {
1130                callback(health);
1131            }
1132        }
1133
1134        results
1135    }
1136
1137    /// Register unhealthy kernel callback.
1138    pub fn on_unhealthy<F>(&self, callback: F)
1139    where
1140        F: Fn(&KernelHealth) + Send + Sync + 'static,
1141    {
1142        self.callbacks.write().push(Arc::new(callback));
1143    }
1144
1145    /// Get health for specific kernel.
1146    pub fn get_health(&self, kernel_id: &KernelId) -> Option<KernelHealth> {
1147        self.kernels.read().get(kernel_id).cloned()
1148    }
1149
1150    /// Get all unhealthy kernels.
1151    pub fn unhealthy_kernels(&self) -> Vec<KernelHealth> {
1152        self.kernels
1153            .read()
1154            .values()
1155            .filter(|h| h.status == HealthStatus::Unhealthy)
1156            .cloned()
1157            .collect()
1158    }
1159
1160    /// Get watched kernel count.
1161    pub fn watched_count(&self) -> usize {
1162        self.kernels.read().len()
1163    }
1164}
1165
1166impl Default for KernelWatchdog {
1167    fn default() -> Self {
1168        Self {
1169            kernels: RwLock::new(HashMap::new()),
1170            heartbeat_timeout: Duration::from_secs(30),
1171            check_interval: Duration::from_secs(5),
1172            failure_threshold: 3,
1173            running: std::sync::atomic::AtomicBool::new(false),
1174            callbacks: RwLock::new(Vec::new()),
1175        }
1176    }
1177}
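
// A usage sketch; `kernel_id` is assumed to come from the runtime. Heartbeats
// and metrics are pushed by the kernel's host-side event loop, while
// `check_all` is expected to run on a periodic timer.
//
//     let watchdog = KernelWatchdog::new();
//     watchdog.on_unhealthy(|health| {
//         // Restart, migrate, or page an operator here.
//         let _ = health;
//     });
//     watchdog.watch(kernel_id.clone());
//
//     watchdog.heartbeat(&kernel_id);
//     watchdog.update_metrics(&kernel_id, 1_250.0, 42);
//     let statuses = watchdog.check_all();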
1178
1179// ============================================================================
1180// Automatic Recovery (Phase 5.2 - Enterprise Operational Excellence)
1181// ============================================================================
1182
1183/// Recovery policy for handling kernel failures.
1184#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
1185pub enum RecoveryPolicy {
1186    /// Restart the failed kernel.
1187    Restart,
1188    /// Migrate the kernel to a healthy GPU.
1189    Migrate,
1190    /// Create a checkpoint before recovery.
1191    Checkpoint,
1192    /// Notify operators but don't take action.
1193    #[default]
1194    Notify,
1195    /// Escalate to higher severity handling.
1196    Escalate,
1197    /// Open circuit breaker to prevent cascading failures.
1198    Circuit,
1199}
1200
1201impl RecoveryPolicy {
1202    /// Get the severity level of this policy.
1203    pub fn severity(&self) -> u8 {
1204        match self {
1205            RecoveryPolicy::Notify => 1,
1206            RecoveryPolicy::Checkpoint => 2,
1207            RecoveryPolicy::Restart => 3,
1208            RecoveryPolicy::Circuit => 4,
1209            RecoveryPolicy::Migrate => 5,
1210            RecoveryPolicy::Escalate => 6,
1211        }
1212    }
1213
1214    /// Check if this policy requires human intervention.
1215    pub fn requires_intervention(&self) -> bool {
1216        matches!(self, RecoveryPolicy::Notify | RecoveryPolicy::Escalate)
1217    }
1218}
1219
1220/// Configuration for automatic recovery.
1221#[derive(Debug, Clone)]
1222pub struct RecoveryConfig {
1223    /// Maximum restart attempts before escalating.
1224    pub max_restart_attempts: u32,
1225    /// Delay between restart attempts.
1226    pub restart_delay: Duration,
1227    /// Whether to checkpoint before restart.
1228    pub checkpoint_before_restart: bool,
1229    /// Whether to migrate on device errors.
1230    pub migrate_on_device_error: bool,
1231    /// Cooldown period between recovery attempts.
1232    pub recovery_cooldown: Duration,
1233    /// Policies for different failure types.
1234    pub policies: HashMap<FailureType, RecoveryPolicy>,
1235}
1236
1237impl Default for RecoveryConfig {
1238    fn default() -> Self {
1239        let mut policies = HashMap::new();
1240        policies.insert(FailureType::Timeout, RecoveryPolicy::Restart);
1241        policies.insert(FailureType::Crash, RecoveryPolicy::Restart);
1242        policies.insert(FailureType::DeviceError, RecoveryPolicy::Migrate);
1243        policies.insert(FailureType::ResourceExhausted, RecoveryPolicy::Circuit);
1244        policies.insert(FailureType::Unknown, RecoveryPolicy::Notify);
1245
1246        Self {
1247            max_restart_attempts: 3,
1248            restart_delay: Duration::from_secs(5),
1249            checkpoint_before_restart: true,
1250            migrate_on_device_error: true,
1251            recovery_cooldown: Duration::from_secs(60),
1252            policies,
1253        }
1254    }
1255}
1256
1257impl RecoveryConfig {
1258    /// Create a new builder.
1259    pub fn builder() -> RecoveryConfigBuilder {
1260        RecoveryConfigBuilder::new()
1261    }
1262
1263    /// Create a conservative config (notify-first).
1264    #[allow(clippy::field_reassign_with_default)]
1265    pub fn conservative() -> Self {
1266        let mut config = Self::default();
1267        config.max_restart_attempts = 1;
1268        config.checkpoint_before_restart = true;
1269        for policy in config.policies.values_mut() {
1270            if *policy == RecoveryPolicy::Restart {
1271                *policy = RecoveryPolicy::Notify;
1272            }
1273        }
1274        config
1275    }
1276
1277    /// Create an aggressive config (auto-recover).
1278    #[allow(clippy::field_reassign_with_default)]
1279    pub fn aggressive() -> Self {
1280        let mut config = Self::default();
1281        config.max_restart_attempts = 5;
1282        config.checkpoint_before_restart = false;
1283        config.restart_delay = Duration::from_secs(1);
1284        config.recovery_cooldown = Duration::from_secs(10);
1285        config
1286    }
1287
1288    /// Get recovery policy for a failure type.
1289    pub fn policy_for(&self, failure_type: FailureType) -> RecoveryPolicy {
1290        self.policies
1291            .get(&failure_type)
1292            .copied()
1293            .unwrap_or(RecoveryPolicy::Notify)
1294    }
1295}
1296
1297/// Builder for recovery configuration.
1298#[derive(Debug, Default)]
1299pub struct RecoveryConfigBuilder {
1300    config: RecoveryConfig,
1301}
1302
1303impl RecoveryConfigBuilder {
1304    /// Create a new builder.
1305    pub fn new() -> Self {
1306        Self {
1307            config: RecoveryConfig::default(),
1308        }
1309    }
1310
1311    /// Set maximum restart attempts.
1312    pub fn max_restart_attempts(mut self, attempts: u32) -> Self {
1313        self.config.max_restart_attempts = attempts;
1314        self
1315    }
1316
1317    /// Set restart delay.
1318    pub fn restart_delay(mut self, delay: Duration) -> Self {
1319        self.config.restart_delay = delay;
1320        self
1321    }
1322
1323    /// Enable/disable checkpoint before restart.
1324    pub fn checkpoint_before_restart(mut self, enabled: bool) -> Self {
1325        self.config.checkpoint_before_restart = enabled;
1326        self
1327    }
1328
1329    /// Enable/disable migration on device errors.
1330    pub fn migrate_on_device_error(mut self, enabled: bool) -> Self {
1331        self.config.migrate_on_device_error = enabled;
1332        self
1333    }
1334
1335    /// Set recovery cooldown.
1336    pub fn recovery_cooldown(mut self, cooldown: Duration) -> Self {
1337        self.config.recovery_cooldown = cooldown;
1338        self
1339    }
1340
1341    /// Set policy for a failure type.
1342    pub fn policy(mut self, failure_type: FailureType, policy: RecoveryPolicy) -> Self {
1343        self.config.policies.insert(failure_type, policy);
1344        self
1345    }
1346
1347    /// Build the configuration.
1348    pub fn build(self) -> RecoveryConfig {
1349        self.config
1350    }
1351}
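
// A configuration sketch: checkpoint on timeouts, never auto-migrate; the
// specific values are illustrative.
//
//     let config = RecoveryConfig::builder()
//         .max_restart_attempts(2)
//         .restart_delay(Duration::from_secs(10))
//         .migrate_on_device_error(false)
//         .policy(FailureType::Timeout, RecoveryPolicy::Checkpoint)
//         .build();
//     assert_eq!(config.policy_for(FailureType::Timeout), RecoveryPolicy::Checkpoint);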
1352
1353/// Types of kernel failures.
1354#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1355pub enum FailureType {
1356    /// Kernel timed out (no heartbeat).
1357    Timeout,
1358    /// Kernel crashed unexpectedly.
1359    Crash,
1360    /// GPU device error.
1361    DeviceError,
1362    /// Out of memory or other resources.
1363    ResourceExhausted,
1364    /// Message queue overflow.
1365    QueueOverflow,
1366    /// State corruption detected.
1367    StateCorruption,
1368    /// Unknown failure.
1369    Unknown,
1370}
1371
1372impl FailureType {
1373    /// Describe this failure type.
1374    pub fn description(&self) -> &'static str {
1375        match self {
1376            FailureType::Timeout => "Kernel heartbeat timeout",
1377            FailureType::Crash => "Kernel crash",
1378            FailureType::DeviceError => "GPU device error",
1379            FailureType::ResourceExhausted => "Resource exhaustion",
1380            FailureType::QueueOverflow => "Message queue overflow",
1381            FailureType::StateCorruption => "State corruption detected",
1382            FailureType::Unknown => "Unknown failure",
1383        }
1384    }
1385}
1386
1387/// A recovery action to be taken.
1388#[derive(Debug, Clone)]
1389pub struct RecoveryAction {
1390    /// Kernel ID.
1391    pub kernel_id: KernelId,
1392    /// Failure type that triggered recovery.
1393    pub failure_type: FailureType,
1394    /// Policy to apply.
1395    pub policy: RecoveryPolicy,
1396    /// Number of previous recovery attempts.
1397    pub attempt: u32,
1398    /// When the action was created.
1399    pub created_at: Instant,
1400    /// Additional context.
1401    pub context: HashMap<String, String>,
1402}
1403
1404impl RecoveryAction {
1405    /// Create a new recovery action.
1406    pub fn new(kernel_id: KernelId, failure_type: FailureType, policy: RecoveryPolicy) -> Self {
1407        Self {
1408            kernel_id,
1409            failure_type,
1410            policy,
1411            attempt: 1,
1412            created_at: Instant::now(),
1413            context: HashMap::new(),
1414        }
1415    }
1416
1417    /// Add context information.
1418    pub fn with_context(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
1419        self.context.insert(key.into(), value.into());
1420        self
1421    }
1422
1423    /// Set attempt number.
1424    pub fn with_attempt(mut self, attempt: u32) -> Self {
1425        self.attempt = attempt;
1426        self
1427    }
1428}
1429
1430/// Result of a recovery action.
1431#[derive(Debug, Clone)]
1432pub struct RecoveryResult {
1433    /// The action that was attempted.
1434    pub action: RecoveryAction,
1435    /// Whether recovery was successful.
1436    pub success: bool,
1437    /// Error message if failed.
1438    pub error: Option<String>,
1439    /// Duration of recovery attempt.
1440    pub duration: Duration,
1441    /// Next action if recovery failed.
1442    pub next_action: Option<RecoveryPolicy>,
1443}
1444
1445impl RecoveryResult {
1446    /// Create a successful result.
1447    pub fn success(action: RecoveryAction, duration: Duration) -> Self {
1448        Self {
1449            action,
1450            success: true,
1451            error: None,
1452            duration,
1453            next_action: None,
1454        }
1455    }
1456
1457    /// Create a failed result.
1458    pub fn failure(action: RecoveryAction, error: String, duration: Duration) -> Self {
1459        Self {
1460            action,
1461            success: false,
1462            error: Some(error),
1463            duration,
1464            next_action: Some(RecoveryPolicy::Escalate),
1465        }
1466    }
1467
1468    /// Create a failed result with a next action.
1469    pub fn failure_with_next(
1470        action: RecoveryAction,
1471        error: String,
1472        duration: Duration,
1473        next: RecoveryPolicy,
1474    ) -> Self {
1475        Self {
1476            action,
1477            success: false,
1478            error: Some(error),
1479            duration,
1480            next_action: Some(next),
1481        }
1482    }
1483}
1484
1485/// Type alias for recovery handler functions.
1486pub type RecoveryHandler = Arc<
1487    dyn Fn(&RecoveryAction) -> Pin<Box<dyn Future<Output = RecoveryResult> + Send>> + Send + Sync,
1488>;
1489
1490/// Manager for automatic kernel recovery.
1491pub struct RecoveryManager {
1492    /// Configuration.
1493    config: RwLock<RecoveryConfig>,
1494    /// Recovery handlers by policy.
1495    handlers: RwLock<HashMap<RecoveryPolicy, RecoveryHandler>>,
1496    /// Recovery history per kernel.
1497    history: RwLock<HashMap<KernelId, Vec<RecoveryResult>>>,
1498    /// Current attempt counts.
1499    attempts: RwLock<HashMap<KernelId, u32>>,
1500    /// Last recovery time per kernel.
1501    last_recovery: RwLock<HashMap<KernelId, Instant>>,
1502    /// Statistics.
1503    stats: RecoveryStats,
1504    /// Enabled flag.
1505    enabled: std::sync::atomic::AtomicBool,
1506}
1507
1508impl RecoveryManager {
1509    /// Create a new recovery manager with default config.
1510    pub fn new() -> Self {
1511        Self::with_config(RecoveryConfig::default())
1512    }
1513
1514    /// Create with specific configuration.
1515    pub fn with_config(config: RecoveryConfig) -> Self {
1516        Self {
1517            config: RwLock::new(config),
1518            handlers: RwLock::new(HashMap::new()),
1519            history: RwLock::new(HashMap::new()),
1520            attempts: RwLock::new(HashMap::new()),
1521            last_recovery: RwLock::new(HashMap::new()),
1522            stats: RecoveryStats::default(),
1523            enabled: std::sync::atomic::AtomicBool::new(true),
1524        }
1525    }
1526
1527    /// Enable/disable automatic recovery.
1528    pub fn set_enabled(&self, enabled: bool) {
1529        self.enabled.store(enabled, Ordering::SeqCst);
1530    }
1531
1532    /// Check if recovery is enabled.
1533    pub fn is_enabled(&self) -> bool {
1534        self.enabled.load(Ordering::SeqCst)
1535    }
1536
1537    /// Update configuration.
1538    pub fn set_config(&self, config: RecoveryConfig) {
1539        *self.config.write() = config;
1540    }
1541
1542    /// Get current configuration.
1543    pub fn config(&self) -> RecoveryConfig {
1544        self.config.read().clone()
1545    }
1546
1547    /// Register a recovery handler.
1548    pub fn register_handler(&self, policy: RecoveryPolicy, handler: RecoveryHandler) {
1549        self.handlers.write().insert(policy, handler);
1550    }
1551
1552    /// Check if recovery should be attempted (respects cooldown).
1553    pub fn should_recover(&self, kernel_id: &KernelId) -> bool {
1554        if !self.is_enabled() {
1555            return false;
1556        }
1557
1558        let config = self.config.read();
1559        let last_recovery = self.last_recovery.read();
1560
1561        if let Some(last) = last_recovery.get(kernel_id) {
1562            last.elapsed() >= config.recovery_cooldown
1563        } else {
1564            true
1565        }
1566    }
1567
1568    /// Determine recovery action for a failure.
1569    pub fn determine_action(
1570        &self,
1571        kernel_id: &KernelId,
1572        failure_type: FailureType,
1573    ) -> RecoveryAction {
1574        let config = self.config.read();
1575        let attempts = self.attempts.read();
1576
1577        let current_attempt = attempts.get(kernel_id).copied().unwrap_or(0) + 1;
1578        let policy = if current_attempt > config.max_restart_attempts {
1579            RecoveryPolicy::Escalate
1580        } else {
1581            config.policy_for(failure_type)
1582        };
1583
1584        RecoveryAction::new(kernel_id.clone(), failure_type, policy).with_attempt(current_attempt)
1585    }
1586
1587    /// Execute recovery for a kernel.
1588    pub async fn recover(&self, action: RecoveryAction) -> RecoveryResult {
1589        let _start = Instant::now();
1590        let kernel_id = action.kernel_id.clone();
1591        let policy = action.policy;
1592
1593        // Update attempt count
1594        {
1595            let mut attempts = self.attempts.write();
1596            let count = attempts.entry(kernel_id.clone()).or_insert(0);
1597            *count += 1;
1598        }
1599
1600        // Update last recovery time
1601        self.last_recovery
1602            .write()
1603            .insert(kernel_id.clone(), Instant::now());
1604
        // Look up a registered handler for this policy.
        let handler = self.handlers.read().get(&policy).cloned();

        // Count the attempt regardless of whether a custom handler exists, so
        // the success/failure counters stay consistent with `attempts`.
        self.stats.attempts.fetch_add(1, Ordering::Relaxed);

        let result = if let Some(handler) = handler {
            handler(&action).await
        } else {
            // No handler registered - fall back to the default behavior.
            self.default_recovery(&action).await
        };
1616
1617        // Update statistics
1618        if result.success {
1619            self.stats.successes.fetch_add(1, Ordering::Relaxed);
1620            // Reset attempt count on success
1621            self.attempts.write().remove(&kernel_id);
1622        } else {
1623            self.stats.failures.fetch_add(1, Ordering::Relaxed);
1624        }
1625
1626        // Store in history
1627        self.history
1628            .write()
1629            .entry(kernel_id)
1630            .or_default()
1631            .push(result.clone());
1632
1633        result
1634    }

    /// Default recovery behavior.
    async fn default_recovery(&self, action: &RecoveryAction) -> RecoveryResult {
        let start = Instant::now();

        match action.policy {
            RecoveryPolicy::Notify => {
                // Just log - no action taken
                RecoveryResult::success(action.clone(), start.elapsed())
            }
            RecoveryPolicy::Checkpoint => {
                // In a real implementation, this would trigger a checkpoint
                RecoveryResult::success(action.clone(), start.elapsed())
            }
            RecoveryPolicy::Restart => {
                // In a real implementation, this would restart the kernel
                let config = self.config.read();
                if action.attempt > config.max_restart_attempts {
                    RecoveryResult::failure_with_next(
                        action.clone(),
                        "Max restart attempts exceeded".to_string(),
                        start.elapsed(),
                        RecoveryPolicy::Escalate,
                    )
                } else {
                    RecoveryResult::success(action.clone(), start.elapsed())
                }
            }
            RecoveryPolicy::Migrate => {
                // In a real implementation, this would migrate to another GPU
                RecoveryResult::success(action.clone(), start.elapsed())
            }
            RecoveryPolicy::Circuit => {
                // Open circuit breaker
                RecoveryResult::success(action.clone(), start.elapsed())
            }
            RecoveryPolicy::Escalate => {
                // Escalation requires manual intervention
                RecoveryResult::failure(
                    action.clone(),
                    "Manual intervention required".to_string(),
                    start.elapsed(),
                )
            }
        }
    }

    /// Get recovery history for a kernel.
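    ///
    /// Returns an empty vector if no recoveries have been recorded for the
    /// kernel.
    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled):
    ///
    /// ```ignore
    /// let history = manager.get_history(&kernel_id);
    /// println!("{} recovery attempts recorded", history.len());
    /// ```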
    pub fn get_history(&self, kernel_id: &KernelId) -> Vec<RecoveryResult> {
        self.history
            .read()
            .get(kernel_id)
            .cloned()
            .unwrap_or_default()
    }

    /// Clear all recovery history, attempt counts, and last-recovery timestamps.
    pub fn clear_history(&self) {
        self.history.write().clear();
        self.attempts.write().clear();
        self.last_recovery.write().clear();
    }

    /// Get a snapshot of the recovery statistics.
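    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled):
    ///
    /// ```ignore
    /// let stats = manager.stats();
    /// if stats.success_rate() < 0.5 {
    ///     // Most recoveries are failing; consider escalating or paging an operator.
    /// }
    /// ```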
    pub fn stats(&self) -> RecoveryStatsSnapshot {
        RecoveryStatsSnapshot {
            attempts: self.stats.attempts.load(Ordering::Relaxed),
            successes: self.stats.successes.load(Ordering::Relaxed),
            failures: self.stats.failures.load(Ordering::Relaxed),
            kernels_tracked: self.history.read().len(),
        }
    }
}

impl Default for RecoveryManager {
    fn default() -> Self {
        Self::new()
    }
}

/// Recovery statistics (atomic counters).
#[derive(Default)]
struct RecoveryStats {
    attempts: AtomicU64,
    successes: AtomicU64,
    failures: AtomicU64,
}

/// Snapshot of recovery statistics.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStatsSnapshot {
    /// Total recovery attempts.
    pub attempts: u64,
    /// Successful recoveries.
    pub successes: u64,
    /// Failed recoveries.
    pub failures: u64,
    /// Number of kernels with recovery history.
    pub kernels_tracked: usize,
}

impl RecoveryStatsSnapshot {
    /// Calculate the success rate as `successes / attempts`.
    ///
    /// Returns `1.0` when no attempts have been recorded.
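    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled):
    ///
    /// ```ignore
    /// let snapshot = RecoveryStatsSnapshot {
    ///     attempts: 10,
    ///     successes: 8,
    ///     failures: 2,
    ///     kernels_tracked: 3,
    /// };
    /// assert!((snapshot.success_rate() - 0.8).abs() < 1e-9);
    /// ```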
    pub fn success_rate(&self) -> f64 {
        if self.attempts == 0 {
            1.0
        } else {
            self.successes as f64 / self.attempts as f64
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_status() {
        assert!(HealthStatus::Healthy.is_healthy());
        assert!(HealthStatus::Degraded.is_healthy());
        assert!(!HealthStatus::Unhealthy.is_healthy());
        assert!(HealthStatus::Unhealthy.is_unhealthy());
    }

    #[tokio::test]
    async fn test_health_checker() {
        let checker = HealthChecker::new();

        checker.register_liveness("test_alive", || async { true });
        checker.register_readiness("test_ready", || async { true });

        assert_eq!(checker.check_count(), 2);
        assert!(checker.is_alive().await);
        assert!(checker.is_ready().await);
    }

    #[tokio::test]
    async fn test_health_checker_unhealthy() {
        let checker = HealthChecker::new();

        checker.register_liveness("failing_check", || async { false });

        assert!(!checker.is_alive().await);

        let status = checker.aggregate_status().await;
        assert_eq!(status, HealthStatus::Unhealthy);
    }

    #[test]
    fn test_circuit_breaker_initial_state() {
        let breaker = CircuitBreaker::new();
        assert_eq!(breaker.state(), CircuitState::Closed);
        assert!(breaker.is_allowed());
    }

    #[test]
    fn test_circuit_breaker_opens_on_failures() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        };
        let breaker = CircuitBreaker::with_config(config);

        breaker.record_failure();
        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Closed);

        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);
        assert!(!breaker.is_allowed());
    }

    #[test]
    fn test_circuit_breaker_reset() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            ..Default::default()
        };
        let breaker = CircuitBreaker::with_config(config);

        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);

        breaker.reset();
        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    #[test]
    fn test_backoff_strategy_fixed() {
        let backoff = BackoffStrategy::Fixed(Duration::from_secs(1));
        assert_eq!(backoff.delay(0), Duration::from_secs(1));
        assert_eq!(backoff.delay(5), Duration::from_secs(1));
    }

    #[test]
    fn test_backoff_strategy_exponential() {
        let backoff = BackoffStrategy::Exponential {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(10),
            multiplier: 2.0,
        };

        assert_eq!(backoff.delay(0), Duration::from_millis(100));
        assert_eq!(backoff.delay(1), Duration::from_millis(200));
        assert_eq!(backoff.delay(2), Duration::from_millis(400));
    }

    #[test]
    fn test_backoff_strategy_linear() {
        let backoff = BackoffStrategy::Linear {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(1),
        };

        assert_eq!(backoff.delay(0), Duration::from_millis(100));
        assert_eq!(backoff.delay(1), Duration::from_millis(200));
        assert_eq!(backoff.delay(9), Duration::from_secs(1)); // Capped
    }

    #[tokio::test]
    async fn test_retry_policy_success() {
        let policy = RetryPolicy::new(3);

        let result: Result<i32> = policy.execute(|| async { Ok::<_, &str>(42) }).await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[test]
    fn test_degradation_manager_levels() {
        let manager = DegradationManager::new();

        assert_eq!(manager.level(), DegradationLevel::Normal);

        manager.set_level(DegradationLevel::Moderate);
        assert_eq!(manager.level(), DegradationLevel::Moderate);
    }

    #[test]
    fn test_degradation_feature_disabled() {
        let manager = DegradationManager::new();

        manager.set_level(DegradationLevel::Severe);

        assert!(!manager.is_feature_disabled(DegradationLevel::Critical));
        assert!(manager.is_feature_disabled(DegradationLevel::Moderate));
        assert!(manager.is_feature_disabled(DegradationLevel::Normal));
    }

    #[test]
    fn test_kernel_watchdog() {
        let watchdog = KernelWatchdog::new();

        let kernel_id = KernelId::new("test_kernel");
        watchdog.watch(kernel_id.clone());

        assert_eq!(watchdog.watched_count(), 1);

        watchdog.heartbeat(&kernel_id);
        let health = watchdog.get_health(&kernel_id).unwrap();
        assert_eq!(health.status, HealthStatus::Healthy);
    }

    #[test]
    fn test_kernel_watchdog_metrics() {
        let watchdog = KernelWatchdog::new();

        let kernel_id = KernelId::new("test_kernel");
        watchdog.watch(kernel_id.clone());

        watchdog.update_metrics(&kernel_id, 1000.0, 50);

        let health = watchdog.get_health(&kernel_id).unwrap();
        assert_eq!(health.messages_per_sec, 1000.0);
        assert_eq!(health.queue_depth, 50);
    }

    // Recovery tests
    #[test]
    fn test_recovery_policy_severity() {
        assert!(RecoveryPolicy::Notify.severity() < RecoveryPolicy::Restart.severity());
        assert!(RecoveryPolicy::Restart.severity() < RecoveryPolicy::Migrate.severity());
        assert!(RecoveryPolicy::Migrate.severity() < RecoveryPolicy::Escalate.severity());
    }

    #[test]
    fn test_recovery_policy_requires_intervention() {
        assert!(RecoveryPolicy::Notify.requires_intervention());
        assert!(RecoveryPolicy::Escalate.requires_intervention());
        assert!(!RecoveryPolicy::Restart.requires_intervention());
        assert!(!RecoveryPolicy::Migrate.requires_intervention());
    }

    #[test]
    fn test_recovery_config_default() {
        let config = RecoveryConfig::default();
        assert_eq!(config.max_restart_attempts, 3);
        assert!(config.checkpoint_before_restart);
        assert!(config.migrate_on_device_error);
        assert_eq!(
            config.policy_for(FailureType::Timeout),
            RecoveryPolicy::Restart
        );
        assert_eq!(
            config.policy_for(FailureType::DeviceError),
            RecoveryPolicy::Migrate
        );
    }

    #[test]
    fn test_recovery_config_conservative() {
        let config = RecoveryConfig::conservative();
        assert_eq!(config.max_restart_attempts, 1);
        assert_eq!(
            config.policy_for(FailureType::Timeout),
            RecoveryPolicy::Notify
        );
    }

    #[test]
    fn test_recovery_config_aggressive() {
        let config = RecoveryConfig::aggressive();
        assert_eq!(config.max_restart_attempts, 5);
        assert!(!config.checkpoint_before_restart);
        assert_eq!(config.restart_delay, Duration::from_secs(1));
    }

    #[test]
    fn test_recovery_config_builder() {
        let config = RecoveryConfig::builder()
            .max_restart_attempts(10)
            .restart_delay(Duration::from_secs(2))
            .checkpoint_before_restart(false)
            .recovery_cooldown(Duration::from_secs(30))
            .policy(FailureType::Crash, RecoveryPolicy::Migrate)
            .build();

        assert_eq!(config.max_restart_attempts, 10);
        assert_eq!(config.restart_delay, Duration::from_secs(2));
        assert!(!config.checkpoint_before_restart);
        assert_eq!(config.recovery_cooldown, Duration::from_secs(30));
        assert_eq!(
            config.policy_for(FailureType::Crash),
            RecoveryPolicy::Migrate
        );
    }

    #[test]
    fn test_failure_type_description() {
        assert_eq!(
            FailureType::Timeout.description(),
            "Kernel heartbeat timeout"
        );
        assert_eq!(FailureType::Crash.description(), "Kernel crash");
        assert_eq!(FailureType::DeviceError.description(), "GPU device error");
    }

    #[test]
    fn test_recovery_action() {
        let kernel_id = KernelId::new("test_kernel");
        let action = RecoveryAction::new(
            kernel_id.clone(),
            FailureType::Timeout,
            RecoveryPolicy::Restart,
        )
        .with_context("reason", "heartbeat missed")
        .with_attempt(2);

        assert_eq!(action.kernel_id, kernel_id);
        assert_eq!(action.failure_type, FailureType::Timeout);
        assert_eq!(action.policy, RecoveryPolicy::Restart);
        assert_eq!(action.attempt, 2);
        assert_eq!(
            action.context.get("reason"),
            Some(&"heartbeat missed".to_string())
        );
    }

    #[test]
    fn test_recovery_result() {
        let action = RecoveryAction::new(
            KernelId::new("test"),
            FailureType::Crash,
            RecoveryPolicy::Restart,
        );

        let success = RecoveryResult::success(action.clone(), Duration::from_millis(100));
        assert!(success.success);
        assert!(success.error.is_none());
        assert!(success.next_action.is_none());

        let failure = RecoveryResult::failure(
            action.clone(),
            "Failed".to_string(),
            Duration::from_millis(50),
        );
        assert!(!failure.success);
        assert_eq!(failure.error, Some("Failed".to_string()));
        assert_eq!(failure.next_action, Some(RecoveryPolicy::Escalate));
    }

    #[test]
    fn test_recovery_manager_creation() {
        let manager = RecoveryManager::new();
        assert!(manager.is_enabled());

        let stats = manager.stats();
        assert_eq!(stats.attempts, 0);
        assert_eq!(stats.successes, 0);
        assert_eq!(stats.failures, 0);
    }

    #[test]
    fn test_recovery_manager_enable_disable() {
        let manager = RecoveryManager::new();

        assert!(manager.is_enabled());
        manager.set_enabled(false);
        assert!(!manager.is_enabled());
        manager.set_enabled(true);
        assert!(manager.is_enabled());
    }

    #[test]
    fn test_recovery_manager_determine_action() {
        let manager = RecoveryManager::new();
        let kernel_id = KernelId::new("test_kernel");

        let action = manager.determine_action(&kernel_id, FailureType::Timeout);
        assert_eq!(action.kernel_id, kernel_id);
        assert_eq!(action.failure_type, FailureType::Timeout);
        assert_eq!(action.policy, RecoveryPolicy::Restart);
        assert_eq!(action.attempt, 1);
    }
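
    // A sketch of an additional test for the escalation path; it assumes the
    // default `max_restart_attempts` of 3 (asserted in
    // `test_recovery_config_default` above).
    #[test]
    fn test_recovery_manager_escalates_after_max_attempts() {
        let manager = RecoveryManager::new();
        let kernel_id = KernelId::new("test_kernel");

        // Simulate three prior attempts; the next one exceeds the limit.
        manager.attempts.write().insert(kernel_id.clone(), 3);

        let action = manager.determine_action(&kernel_id, FailureType::Timeout);
        assert_eq!(action.policy, RecoveryPolicy::Escalate);
        assert_eq!(action.attempt, 4);
    }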

    #[test]
    fn test_recovery_manager_should_recover() {
        let config = RecoveryConfig::builder()
            .recovery_cooldown(Duration::from_millis(10))
            .build();
        let manager = RecoveryManager::with_config(config);
        let kernel_id = KernelId::new("test_kernel");

        assert!(manager.should_recover(&kernel_id));

        // Disable recovery
        manager.set_enabled(false);
        assert!(!manager.should_recover(&kernel_id));
    }

    #[tokio::test]
    async fn test_recovery_manager_recover() {
        let manager = RecoveryManager::new();
        let kernel_id = KernelId::new("test_kernel");

        let action = RecoveryAction::new(
            kernel_id.clone(),
            FailureType::Timeout,
            RecoveryPolicy::Notify,
        );
        let result = manager.recover(action).await;

        assert!(result.success);

        let stats = manager.stats();
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.kernels_tracked, 1);

        let history = manager.get_history(&kernel_id);
        assert_eq!(history.len(), 1);
    }

    #[test]
    fn test_recovery_stats_snapshot_success_rate() {
        let stats = RecoveryStatsSnapshot {
            attempts: 10,
            successes: 8,
            failures: 2,
            kernels_tracked: 3,
        };

        assert!((stats.success_rate() - 0.8).abs() < 0.001);

        let empty = RecoveryStatsSnapshot::default();
        assert_eq!(empty.success_rate(), 1.0);
    }

    #[test]
    fn test_recovery_manager_clear_history() {
        let manager = RecoveryManager::new();
        let kernel_id = KernelId::new("test_kernel");

        // Add some history manually via attempts
        manager.attempts.write().insert(kernel_id.clone(), 5);

        manager.clear_history();

        assert!(manager.get_history(&kernel_id).is_empty());
        assert!(manager.attempts.read().is_empty());
    }
}