use parking_lot::RwLock;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::error::{Result, RingKernelError};
use crate::runtime::KernelId;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
    Unknown,
}

impl HealthStatus {
    pub fn is_healthy(&self) -> bool {
        matches!(self, HealthStatus::Healthy | HealthStatus::Degraded)
    }

    pub fn is_unhealthy(&self) -> bool {
        matches!(self, HealthStatus::Unhealthy)
    }
}

#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    pub name: String,
    pub status: HealthStatus,
    pub message: Option<String>,
    pub duration: Duration,
    pub checked_at: Instant,
}

pub type HealthCheckFn =
    Arc<dyn Fn() -> Pin<Box<dyn Future<Output = HealthStatus> + Send>> + Send + Sync>;

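/// A single named health check backed by an async [`HealthCheckFn`].
///
/// # Example
///
/// A minimal construction sketch; the check name, closure body, and timeout are
/// illustrative, and the block is marked `ignore` because the crate path is not
/// fixed here.
///
/// ```ignore
/// let check_fn: HealthCheckFn = Arc::new(|| {
///     Box::pin(async { HealthStatus::Healthy })
///         as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
/// });
/// let check = HealthCheck::new("queue_depth", check_fn)
///     .readiness()
///     .timeout(Duration::from_secs(2));
/// ```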
pub struct HealthCheck {
    pub name: String,
    check_fn: HealthCheckFn,
    pub is_liveness: bool,
    pub is_readiness: bool,
    pub timeout: Duration,
    last_result: RwLock<Option<HealthCheckResult>>,
}

impl HealthCheck {
    pub fn new(name: impl Into<String>, check_fn: HealthCheckFn) -> Self {
        Self {
            name: name.into(),
            check_fn,
            is_liveness: false,
            is_readiness: false,
            timeout: Duration::from_secs(5),
            last_result: RwLock::new(None),
        }
    }

    pub fn liveness(mut self) -> Self {
        self.is_liveness = true;
        self
    }

    pub fn readiness(mut self) -> Self {
        self.is_readiness = true;
        self
    }

    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    pub async fn check(&self) -> HealthCheckResult {
        let start = Instant::now();
        let status = (self.check_fn)().await;
        let duration = start.elapsed();

        let result = HealthCheckResult {
            name: self.name.clone(),
            status,
            message: None,
            duration,
            checked_at: Instant::now(),
        };

        *self.last_result.write() = Some(result.clone());
        result
    }

    pub fn last_result(&self) -> Option<HealthCheckResult> {
        self.last_result.read().clone()
    }
}

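/// Registry of liveness and readiness checks with simple aggregation.
///
/// # Example
///
/// A minimal sketch of registering checks and reading the aggregate status; it
/// assumes a Tokio (or equivalent) async runtime, the `queue_ready` helper is
/// hypothetical, and the block is marked `ignore` because the crate path is not
/// fixed here.
///
/// ```ignore
/// let checker = HealthChecker::new();
/// checker.register_liveness("event_loop", || async { true });
/// checker.register_readiness("gpu_queue", || async { queue_ready().await });
///
/// assert!(checker.is_alive().await);
/// let status = checker.aggregate_status().await;
/// println!("overall health: {:?}", status);
/// ```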
pub struct HealthChecker {
    checks: RwLock<Vec<Arc<HealthCheck>>>,
    #[allow(dead_code)]
    check_interval: Duration,
    #[allow(dead_code)]
    running: std::sync::atomic::AtomicBool,
}

impl HealthChecker {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            checks: RwLock::new(Vec::new()),
            check_interval: Duration::from_secs(10),
            running: std::sync::atomic::AtomicBool::new(false),
        })
    }

    pub fn with_interval(self: Arc<Self>, interval: Duration) -> Arc<Self> {
        // The interval is currently not applied: `check_interval` is reserved for
        // a future background polling loop, so this builder is a no-op for now.
        let _ = interval;
        self
    }

    pub fn register(&self, check: HealthCheck) {
        self.checks.write().push(Arc::new(check));
    }

    pub fn register_liveness<F, Fut>(&self, name: impl Into<String>, check_fn: F)
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = bool> + Send + 'static,
    {
        let name = name.into();
        let check = HealthCheck::new(
            name,
            Arc::new(move || {
                let fut = check_fn();
                Box::pin(async move {
                    if fut.await {
                        HealthStatus::Healthy
                    } else {
                        HealthStatus::Unhealthy
                    }
                }) as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
            }),
        )
        .liveness();
        self.register(check);
    }

    pub fn register_readiness<F, Fut>(&self, name: impl Into<String>, check_fn: F)
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = bool> + Send + 'static,
    {
        let name = name.into();
        let check = HealthCheck::new(
            name,
            Arc::new(move || {
                let fut = check_fn();
                Box::pin(async move {
                    if fut.await {
                        HealthStatus::Healthy
                    } else {
                        HealthStatus::Unhealthy
                    }
                }) as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
            }),
        )
        .readiness();
        self.register(check);
    }

    pub async fn check_all(&self) -> Vec<HealthCheckResult> {
        let checks = self.checks.read().clone();
        let mut results = Vec::with_capacity(checks.len());

        for check in checks {
            results.push(check.check().await);
        }

        results
    }

    pub async fn check_liveness(&self) -> Vec<HealthCheckResult> {
        let checks = self.checks.read().clone();
        let mut results = Vec::new();

        for check in checks.iter().filter(|c| c.is_liveness) {
            results.push(check.check().await);
        }

        results
    }

    pub async fn check_readiness(&self) -> Vec<HealthCheckResult> {
        let checks = self.checks.read().clone();
        let mut results = Vec::new();

        for check in checks.iter().filter(|c| c.is_readiness) {
            results.push(check.check().await);
        }

        results
    }

    pub async fn is_alive(&self) -> bool {
        let results = self.check_liveness().await;
        results.iter().all(|r| r.status.is_healthy())
    }

    pub async fn is_ready(&self) -> bool {
        let results = self.check_readiness().await;
        results.iter().all(|r| r.status.is_healthy())
    }

    pub async fn aggregate_status(&self) -> HealthStatus {
        let results = self.check_all().await;

        if results.is_empty() {
            return HealthStatus::Unknown;
        }

        let all_healthy = results.iter().all(|r| r.status == HealthStatus::Healthy);
        let any_unhealthy = results.iter().any(|r| r.status == HealthStatus::Unhealthy);

        if all_healthy {
            HealthStatus::Healthy
        } else if any_unhealthy {
            HealthStatus::Unhealthy
        } else {
            HealthStatus::Degraded
        }
    }

    pub fn check_count(&self) -> usize {
        self.checks.read().len()
    }
}

impl Default for HealthChecker {
    fn default() -> Self {
        Self {
            checks: RwLock::new(Vec::new()),
            check_interval: Duration::from_secs(10),
            running: std::sync::atomic::AtomicBool::new(false),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    pub failure_threshold: u32,
    pub success_threshold: u32,
    pub recovery_timeout: Duration,
    pub window_duration: Duration,
    pub half_open_max_requests: u32,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            recovery_timeout: Duration::from_secs(30),
            window_duration: Duration::from_secs(60),
            half_open_max_requests: 3,
        }
    }
}

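/// Circuit breaker with closed → open → half-open transitions driven by
/// failure/success counters and a recovery timeout.
///
/// # Example
///
/// A minimal sketch of wrapping a fallible async call; `call_backend` is a
/// hypothetical operation and the block is marked `ignore` because it needs an
/// async runtime and the crate path is not fixed here.
///
/// ```ignore
/// let breaker = CircuitBreaker::with_config(CircuitBreakerConfig {
///     failure_threshold: 3,
///     ..Default::default()
/// });
///
/// let result = breaker
///     .execute(|| async { call_backend().await })
///     .await;
///
/// if result.is_err() && breaker.state() == CircuitState::Open {
///     // Requests are now rejected until the recovery timeout elapses.
/// }
/// ```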
pub struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: RwLock<CircuitState>,
    failure_count: AtomicU32,
    success_count: AtomicU32,
    opened_at: RwLock<Option<Instant>>,
    half_open_requests: AtomicU32,
    total_requests: AtomicU64,
    total_failures: AtomicU64,
    total_rejections: AtomicU64,
}

impl CircuitBreaker {
    pub fn new() -> Arc<Self> {
        Self::with_config(CircuitBreakerConfig::default())
    }

    pub fn with_config(config: CircuitBreakerConfig) -> Arc<Self> {
        Arc::new(Self {
            config,
            state: RwLock::new(CircuitState::Closed),
            failure_count: AtomicU32::new(0),
            success_count: AtomicU32::new(0),
            opened_at: RwLock::new(None),
            half_open_requests: AtomicU32::new(0),
            total_requests: AtomicU64::new(0),
            total_failures: AtomicU64::new(0),
            total_rejections: AtomicU64::new(0),
        })
    }

    pub fn state(&self) -> CircuitState {
        let current_state = *self.state.read();
        if current_state == CircuitState::Open {
            if let Some(opened_at) = *self.opened_at.read() {
                if opened_at.elapsed() >= self.config.recovery_timeout {
                    *self.state.write() = CircuitState::HalfOpen;
                    self.half_open_requests.store(0, Ordering::SeqCst);
                    self.success_count.store(0, Ordering::SeqCst);
                    return CircuitState::HalfOpen;
                }
            }
        }
        current_state
    }

    pub fn is_allowed(&self) -> bool {
        match self.state() {
            CircuitState::Closed => true,
            CircuitState::Open => false,
            CircuitState::HalfOpen => {
                self.half_open_requests.load(Ordering::SeqCst) < self.config.half_open_max_requests
            }
        }
    }

    pub fn record_success(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        let state = self.state();
        if state == CircuitState::HalfOpen {
            let success_count = self.success_count.fetch_add(1, Ordering::SeqCst) + 1;
            self.half_open_requests.fetch_sub(1, Ordering::SeqCst);

            if success_count >= self.config.success_threshold {
                self.close();
            }
        }
    }

    pub fn record_failure(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.total_failures.fetch_add(1, Ordering::Relaxed);

        let state = self.state();
        match state {
            CircuitState::Closed => {
                let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
                if failure_count >= self.config.failure_threshold {
                    self.open();
                }
            }
            CircuitState::HalfOpen => {
                self.half_open_requests.fetch_sub(1, Ordering::SeqCst);
                self.open();
            }
            CircuitState::Open => {}
        }
    }

    pub fn record_rejection(&self) {
        self.total_rejections.fetch_add(1, Ordering::Relaxed);
    }

    fn open(&self) {
        *self.state.write() = CircuitState::Open;
        *self.opened_at.write() = Some(Instant::now());
    }

    fn close(&self) {
        *self.state.write() = CircuitState::Closed;
        *self.opened_at.write() = None;
        self.failure_count.store(0, Ordering::SeqCst);
        self.success_count.store(0, Ordering::SeqCst);
    }

    pub fn reset(&self) {
        self.close();
    }

    fn acquire_half_open(&self) -> bool {
        if self.state() != CircuitState::HalfOpen {
            return true;
        }

        let current = self.half_open_requests.load(Ordering::SeqCst);
        if current >= self.config.half_open_max_requests {
            return false;
        }

        self.half_open_requests.fetch_add(1, Ordering::SeqCst);
        true
    }

    pub async fn execute<F, Fut, T, E>(&self, operation: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = std::result::Result<T, E>>,
        E: std::fmt::Display,
    {
        if !self.is_allowed() {
            self.record_rejection();
            return Err(RingKernelError::BackendError(
                "Circuit breaker is open".to_string(),
            ));
        }

        if !self.acquire_half_open() {
            self.record_rejection();
            return Err(RingKernelError::BackendError(
                "Circuit breaker half-open limit reached".to_string(),
            ));
        }

        match operation().await {
            Ok(result) => {
                self.record_success();
                Ok(result)
            }
            Err(e) => {
                self.record_failure();
                Err(RingKernelError::BackendError(format!(
                    "Operation failed: {}",
                    e
                )))
            }
        }
    }

    pub fn stats(&self) -> CircuitBreakerStats {
        CircuitBreakerStats {
            state: self.state(),
            total_requests: self.total_requests.load(Ordering::Relaxed),
            total_failures: self.total_failures.load(Ordering::Relaxed),
            total_rejections: self.total_rejections.load(Ordering::Relaxed),
            failure_count: self.failure_count.load(Ordering::Relaxed),
            success_count: self.success_count.load(Ordering::Relaxed),
        }
    }
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self {
            config: CircuitBreakerConfig::default(),
            state: RwLock::new(CircuitState::Closed),
            failure_count: AtomicU32::new(0),
            success_count: AtomicU32::new(0),
            opened_at: RwLock::new(None),
            half_open_requests: AtomicU32::new(0),
            total_requests: AtomicU64::new(0),
            total_failures: AtomicU64::new(0),
            total_rejections: AtomicU64::new(0),
        }
    }
}

#[derive(Debug, Clone)]
pub struct CircuitBreakerStats {
    pub state: CircuitState,
    pub total_requests: u64,
    pub total_failures: u64,
    pub total_rejections: u64,
    pub failure_count: u32,
    pub success_count: u32,
}

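/// Delay strategy used between retry attempts.
///
/// # Example
///
/// A small sketch of how delays grow per attempt; the values follow the
/// `delay()` implementation below and the block is marked `ignore` because the
/// crate path is not fixed here.
///
/// ```ignore
/// let backoff = BackoffStrategy::Exponential {
///     initial: Duration::from_millis(100),
///     max: Duration::from_secs(5),
///     multiplier: 2.0,
/// };
/// assert_eq!(backoff.delay(0), Duration::from_millis(100));
/// assert_eq!(backoff.delay(3), Duration::from_millis(800));
/// assert_eq!(backoff.delay(10), Duration::from_secs(5)); // capped at `max`
/// ```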
#[derive(Debug, Clone)]
pub enum BackoffStrategy {
    Fixed(Duration),
    Linear {
        initial: Duration,
        max: Duration,
    },
    Exponential {
        initial: Duration,
        max: Duration,
        multiplier: f64,
    },
    None,
}

impl BackoffStrategy {
    pub fn delay(&self, attempt: u32) -> Duration {
        match self {
            BackoffStrategy::Fixed(d) => *d,
            BackoffStrategy::Linear { initial, max } => {
                let delay = initial.mul_f64((attempt + 1) as f64);
                delay.min(*max)
            }
            BackoffStrategy::Exponential {
                initial,
                max,
                multiplier,
            } => {
                let factor = multiplier.powi(attempt as i32);
                let delay = initial.mul_f64(factor);
                delay.min(*max)
            }
            BackoffStrategy::None => Duration::ZERO,
        }
    }
}

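/// Retry policy combining a maximum attempt count, a [`BackoffStrategy`],
/// optional jitter, and an optional retryability predicate.
///
/// # Example
///
/// A minimal sketch of retrying a flaky async call; `fetch_state` is a
/// hypothetical operation and the block is marked `ignore` because it needs an
/// async runtime and the crate path is not fixed here.
///
/// ```ignore
/// let policy = RetryPolicy::new(5)
///     .with_backoff(BackoffStrategy::Fixed(Duration::from_millis(50)))
///     .with_retryable(|err| err.contains("timeout"));
///
/// // Returns Err(...) after 5 failed attempts or on the first non-retryable error.
/// let state = policy.execute(|| async { fetch_state().await }).await;
/// ```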
#[derive(Clone)]
pub struct RetryPolicy {
    pub max_attempts: u32,
    pub backoff: BackoffStrategy,
    pub jitter: bool,
    #[allow(clippy::type_complexity)]
    retryable: Option<Arc<dyn Fn(&str) -> bool + Send + Sync>>,
}

impl std::fmt::Debug for RetryPolicy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RetryPolicy")
            .field("max_attempts", &self.max_attempts)
            .field("backoff", &self.backoff)
            .field("jitter", &self.jitter)
            .field("retryable", &self.retryable.is_some())
            .finish()
    }
}

impl RetryPolicy {
    pub fn new(max_attempts: u32) -> Self {
        Self {
            max_attempts,
            backoff: BackoffStrategy::Exponential {
                initial: Duration::from_millis(100),
                max: Duration::from_secs(30),
                multiplier: 2.0,
            },
            jitter: true,
            retryable: None,
        }
    }

    pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
        self.backoff = backoff;
        self
    }

    pub fn without_jitter(mut self) -> Self {
        self.jitter = false;
        self
    }

    pub fn with_retryable<F>(mut self, predicate: F) -> Self
    where
        F: Fn(&str) -> bool + Send + Sync + 'static,
    {
        self.retryable = Some(Arc::new(predicate));
        self
    }

    pub fn is_retryable(&self, error: &str) -> bool {
        self.retryable.as_ref().map(|p| p(error)).unwrap_or(true)
    }

    pub fn get_delay(&self, attempt: u32) -> Duration {
        let base_delay = self.backoff.delay(attempt);

        if self.jitter && base_delay > Duration::ZERO {
            // Jitter factor in [0.75, 1.0) to spread out concurrent retries.
            let jitter_factor = 0.75 + (rand_u64() % 50) as f64 / 200.0;
            base_delay.mul_f64(jitter_factor)
        } else {
            base_delay
        }
    }

    pub async fn execute<F, Fut, T, E>(&self, mut operation: F) -> Result<T>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = std::result::Result<T, E>>,
        E: std::fmt::Display,
    {
        let mut last_error = String::new();

        for attempt in 0..self.max_attempts {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    last_error = format!("{}", e);

                    if !self.is_retryable(&last_error) {
                        return Err(RingKernelError::BackendError(format!(
                            "Non-retryable error: {}",
                            last_error
                        )));
                    }

                    if attempt + 1 >= self.max_attempts {
                        break;
                    }

                    let delay = self.get_delay(attempt);
                    tokio::time::sleep(delay).await;
                }
            }
        }

        Err(RingKernelError::BackendError(format!(
            "Operation failed after {} attempts: {}",
            self.max_attempts, last_error
        )))
    }
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self::new(3)
    }
}

/// Cheap pseudo-random source (hashes the current time and thread id), used for
/// retry jitter without pulling in a dedicated RNG dependency.
fn rand_u64() -> u64 {
    use std::hash::{Hash, Hasher};
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    std::time::SystemTime::now().hash(&mut hasher);
    std::thread::current().id().hash(&mut hasher);
    hasher.finish()
}

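/// Ordered degradation levels, from `Normal` (0) to `Critical` (4).
///
/// # Example
///
/// The levels are totally ordered, so they can be compared directly; marked
/// `ignore` because the crate path is not fixed here.
///
/// ```ignore
/// assert!(DegradationLevel::Critical > DegradationLevel::Light);
/// assert_eq!(DegradationLevel::Light.next_worse(), DegradationLevel::Moderate);
/// assert_eq!(DegradationLevel::Normal.next_better(), DegradationLevel::Normal);
/// ```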
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DegradationLevel {
    Normal = 0,
    Light = 1,
    Moderate = 2,
    Severe = 3,
    Critical = 4,
}

impl DegradationLevel {
    pub fn next_worse(self) -> Self {
        match self {
            DegradationLevel::Normal => DegradationLevel::Light,
            DegradationLevel::Light => DegradationLevel::Moderate,
            DegradationLevel::Moderate => DegradationLevel::Severe,
            DegradationLevel::Severe => DegradationLevel::Critical,
            DegradationLevel::Critical => DegradationLevel::Critical,
        }
    }

    pub fn next_better(self) -> Self {
        match self {
            DegradationLevel::Normal => DegradationLevel::Normal,
            DegradationLevel::Light => DegradationLevel::Normal,
            DegradationLevel::Moderate => DegradationLevel::Light,
            DegradationLevel::Severe => DegradationLevel::Moderate,
            DegradationLevel::Critical => DegradationLevel::Severe,
        }
    }
}

#[derive(Debug, Clone)]
pub struct LoadSheddingPolicy {
    pub queue_threshold: usize,
    pub cpu_threshold: f64,
    pub memory_threshold: f64,
    pub shed_ratio: f64,
}

impl Default for LoadSheddingPolicy {
    fn default() -> Self {
        Self {
            queue_threshold: 10000,
            cpu_threshold: 0.9,
            memory_threshold: 0.85,
            shed_ratio: 0.1,
        }
    }
}

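/// Tracks the current [`DegradationLevel`] and applies deterministic load
/// shedding scaled by that level.
///
/// # Example
///
/// A minimal sketch of raising the level and shedding work; marked `ignore`
/// because the crate path is not fixed here.
///
/// ```ignore
/// let manager = DegradationManager::new();
/// manager.on_level_change(|old, new| println!("degradation: {:?} -> {:?}", old, new));
/// manager.set_level(DegradationLevel::Moderate);
///
/// if manager.should_shed() {
///     // Drop or defer this request instead of processing it.
/// }
/// println!("shed ratio so far: {:.2}", manager.stats().shed_ratio);
/// ```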
pub struct DegradationManager {
    level: RwLock<DegradationLevel>,
    policy: LoadSheddingPolicy,
    #[allow(clippy::type_complexity)]
    callbacks: RwLock<Vec<Arc<dyn Fn(DegradationLevel, DegradationLevel) + Send + Sync>>>,
    shed_counter: AtomicU64,
    total_requests: AtomicU64,
    shed_requests: AtomicU64,
}

impl DegradationManager {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            level: RwLock::new(DegradationLevel::Normal),
            policy: LoadSheddingPolicy::default(),
            callbacks: RwLock::new(Vec::new()),
            shed_counter: AtomicU64::new(0),
            total_requests: AtomicU64::new(0),
            shed_requests: AtomicU64::new(0),
        })
    }

    pub fn with_policy(policy: LoadSheddingPolicy) -> Arc<Self> {
        Arc::new(Self {
            level: RwLock::new(DegradationLevel::Normal),
            policy,
            callbacks: RwLock::new(Vec::new()),
            shed_counter: AtomicU64::new(0),
            total_requests: AtomicU64::new(0),
            shed_requests: AtomicU64::new(0),
        })
    }

    pub fn level(&self) -> DegradationLevel {
        *self.level.read()
    }

    pub fn set_level(&self, new_level: DegradationLevel) {
        let old_level = *self.level.read();
        if old_level != new_level {
            *self.level.write() = new_level;

            let callbacks = self.callbacks.read().clone();
            for callback in callbacks {
                callback(old_level, new_level);
            }
        }
    }

    pub fn on_level_change<F>(&self, callback: F)
    where
        F: Fn(DegradationLevel, DegradationLevel) + Send + Sync + 'static,
    {
        self.callbacks.write().push(Arc::new(callback));
    }

    pub fn should_shed(&self) -> bool {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        let level = self.level();
        if level == DegradationLevel::Normal {
            return false;
        }

        let base_ratio = self.policy.shed_ratio;
        let level_factor = match level {
            DegradationLevel::Normal => 0.0,
            DegradationLevel::Light => 1.0,
            DegradationLevel::Moderate => 2.0,
            DegradationLevel::Severe => 3.0,
            DegradationLevel::Critical => 4.0,
        };

        let shed_probability = (base_ratio * level_factor).min(0.9);

        let counter = self.shed_counter.fetch_add(1, Ordering::Relaxed);
        let should_shed = (counter % 100) < (shed_probability * 100.0) as u64;

        if should_shed {
            self.shed_requests.fetch_add(1, Ordering::Relaxed);
        }

        should_shed
    }

    pub fn is_feature_disabled(&self, required_level: DegradationLevel) -> bool {
        self.level() > required_level
    }

    pub fn stats(&self) -> DegradationStats {
        let total = self.total_requests.load(Ordering::Relaxed);
        let shed = self.shed_requests.load(Ordering::Relaxed);

        DegradationStats {
            level: self.level(),
            total_requests: total,
            shed_requests: shed,
            shed_ratio: if total > 0 {
                shed as f64 / total as f64
            } else {
                0.0
            },
        }
    }
}

impl Default for DegradationManager {
    fn default() -> Self {
        Self {
            level: RwLock::new(DegradationLevel::Normal),
            policy: LoadSheddingPolicy::default(),
            callbacks: RwLock::new(Vec::new()),
            shed_counter: AtomicU64::new(0),
            total_requests: AtomicU64::new(0),
            shed_requests: AtomicU64::new(0),
        }
    }
}

#[derive(Debug, Clone)]
pub struct DegradationStats {
    pub level: DegradationLevel,
    pub total_requests: u64,
    pub shed_requests: u64,
    pub shed_ratio: f64,
}

#[derive(Debug, Clone)]
pub struct KernelHealth {
    pub kernel_id: KernelId,
    pub last_heartbeat: Instant,
    pub status: HealthStatus,
    pub failure_count: u32,
    pub messages_per_sec: f64,
    pub queue_depth: usize,
}

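/// Heartbeat-based watchdog for running kernels: tracks the last heartbeat,
/// derives a per-kernel [`HealthStatus`], and notifies callbacks when a kernel
/// becomes unhealthy.
///
/// # Example
///
/// A minimal sketch of watching a kernel and reacting to missed heartbeats;
/// the kernel name is illustrative and the block is marked `ignore` because
/// the crate path is not fixed here.
///
/// ```ignore
/// let watchdog = KernelWatchdog::new();
/// watchdog.on_unhealthy(|health| {
///     eprintln!("kernel {:?} unhealthy after {} misses", health.kernel_id, health.failure_count);
/// });
///
/// let id = KernelId::new("graph_runner");
/// watchdog.watch(id.clone());
/// watchdog.heartbeat(&id);             // call on every kernel heartbeat
/// let snapshot = watchdog.check_all(); // call periodically from a supervisor task
/// ```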
pub struct KernelWatchdog {
    kernels: RwLock<HashMap<KernelId, KernelHealth>>,
    heartbeat_timeout: Duration,
    #[allow(dead_code)]
    check_interval: Duration,
    failure_threshold: u32,
    #[allow(dead_code)]
    running: std::sync::atomic::AtomicBool,
    #[allow(clippy::type_complexity)]
    callbacks: RwLock<Vec<Arc<dyn Fn(&KernelHealth) + Send + Sync>>>,
}

impl KernelWatchdog {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            kernels: RwLock::new(HashMap::new()),
            heartbeat_timeout: Duration::from_secs(30),
            check_interval: Duration::from_secs(5),
            failure_threshold: 3,
            running: std::sync::atomic::AtomicBool::new(false),
            callbacks: RwLock::new(Vec::new()),
        })
    }

    pub fn with_heartbeat_timeout(self: Arc<Self>, timeout: Duration) -> Arc<Self> {
        // The timeout is currently not applied behind the shared `Arc`; the
        // default from `new()` stays in effect and this builder is a no-op for now.
        let _ = timeout;
        self
    }

    pub fn watch(&self, kernel_id: KernelId) {
        let health = KernelHealth {
            kernel_id: kernel_id.clone(),
            last_heartbeat: Instant::now(),
            status: HealthStatus::Healthy,
            failure_count: 0,
            messages_per_sec: 0.0,
            queue_depth: 0,
        };
        self.kernels.write().insert(kernel_id, health);
    }

    pub fn unwatch(&self, kernel_id: &KernelId) {
        self.kernels.write().remove(kernel_id);
    }

    pub fn heartbeat(&self, kernel_id: &KernelId) {
        if let Some(health) = self.kernels.write().get_mut(kernel_id) {
            health.last_heartbeat = Instant::now();
            health.failure_count = 0;
            if health.status == HealthStatus::Unhealthy {
                health.status = HealthStatus::Healthy;
            }
        }
    }

    pub fn update_metrics(&self, kernel_id: &KernelId, messages_per_sec: f64, queue_depth: usize) {
        if let Some(health) = self.kernels.write().get_mut(kernel_id) {
            health.messages_per_sec = messages_per_sec;
            health.queue_depth = queue_depth;
        }
    }

    pub fn check_all(&self) -> Vec<KernelHealth> {
        let now = Instant::now();
        let mut kernels = self.kernels.write();
        let mut results = Vec::with_capacity(kernels.len());

        for health in kernels.values_mut() {
            if now.duration_since(health.last_heartbeat) > self.heartbeat_timeout {
                health.failure_count += 1;
                if health.failure_count >= self.failure_threshold {
                    health.status = HealthStatus::Unhealthy;
                } else {
                    health.status = HealthStatus::Degraded;
                }
            }

            results.push(health.clone());
        }

        drop(kernels);

        let callbacks = self.callbacks.read().clone();
        for health in results
            .iter()
            .filter(|h| h.status == HealthStatus::Unhealthy)
        {
            for callback in &callbacks {
                callback(health);
            }
        }

        results
    }

    pub fn on_unhealthy<F>(&self, callback: F)
    where
        F: Fn(&KernelHealth) + Send + Sync + 'static,
    {
        self.callbacks.write().push(Arc::new(callback));
    }

    pub fn get_health(&self, kernel_id: &KernelId) -> Option<KernelHealth> {
        self.kernels.read().get(kernel_id).cloned()
    }

    pub fn unhealthy_kernels(&self) -> Vec<KernelHealth> {
        self.kernels
            .read()
            .values()
            .filter(|h| h.status == HealthStatus::Unhealthy)
            .cloned()
            .collect()
    }

    pub fn watched_count(&self) -> usize {
        self.kernels.read().len()
    }
}

impl Default for KernelWatchdog {
    fn default() -> Self {
        Self {
            kernels: RwLock::new(HashMap::new()),
            heartbeat_timeout: Duration::from_secs(30),
            check_interval: Duration::from_secs(5),
            failure_threshold: 3,
            running: std::sync::atomic::AtomicBool::new(false),
            callbacks: RwLock::new(Vec::new()),
        }
    }
}

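/// What the recovery layer should do in response to a kernel failure.
///
/// # Example
///
/// Policies have an ordered severity, and some require human intervention;
/// marked `ignore` because the crate path is not fixed here.
///
/// ```ignore
/// assert!(RecoveryPolicy::Restart.severity() < RecoveryPolicy::Escalate.severity());
/// assert!(RecoveryPolicy::Escalate.requires_intervention());
/// assert!(!RecoveryPolicy::Restart.requires_intervention());
/// ```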
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum RecoveryPolicy {
    Restart,
    Migrate,
    Checkpoint,
    #[default]
    Notify,
    Escalate,
    Circuit,
}

impl RecoveryPolicy {
    pub fn severity(&self) -> u8 {
        match self {
            RecoveryPolicy::Notify => 1,
            RecoveryPolicy::Checkpoint => 2,
            RecoveryPolicy::Restart => 3,
            RecoveryPolicy::Circuit => 4,
            RecoveryPolicy::Migrate => 5,
            RecoveryPolicy::Escalate => 6,
        }
    }

    pub fn requires_intervention(&self) -> bool {
        matches!(self, RecoveryPolicy::Notify | RecoveryPolicy::Escalate)
    }
}

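/// Configuration for automatic recovery: restart limits, delays, and the
/// per-[`FailureType`] policy map.
///
/// # Example
///
/// A minimal sketch of the builder; marked `ignore` because the crate path is
/// not fixed here.
///
/// ```ignore
/// let config = RecoveryConfig::builder()
///     .max_restart_attempts(2)
///     .restart_delay(Duration::from_secs(10))
///     .policy(FailureType::QueueOverflow, RecoveryPolicy::Circuit)
///     .build();
/// assert_eq!(config.policy_for(FailureType::QueueOverflow), RecoveryPolicy::Circuit);
/// ```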
#[derive(Debug, Clone)]
pub struct RecoveryConfig {
    pub max_restart_attempts: u32,
    pub restart_delay: Duration,
    pub checkpoint_before_restart: bool,
    pub migrate_on_device_error: bool,
    pub recovery_cooldown: Duration,
    pub policies: HashMap<FailureType, RecoveryPolicy>,
}

impl Default for RecoveryConfig {
    fn default() -> Self {
        let mut policies = HashMap::new();
        policies.insert(FailureType::Timeout, RecoveryPolicy::Restart);
        policies.insert(FailureType::Crash, RecoveryPolicy::Restart);
        policies.insert(FailureType::DeviceError, RecoveryPolicy::Migrate);
        policies.insert(FailureType::ResourceExhausted, RecoveryPolicy::Circuit);
        policies.insert(FailureType::Unknown, RecoveryPolicy::Notify);

        Self {
            max_restart_attempts: 3,
            restart_delay: Duration::from_secs(5),
            checkpoint_before_restart: true,
            migrate_on_device_error: true,
            recovery_cooldown: Duration::from_secs(60),
            policies,
        }
    }
}

impl RecoveryConfig {
    pub fn builder() -> RecoveryConfigBuilder {
        RecoveryConfigBuilder::new()
    }

    #[allow(clippy::field_reassign_with_default)]
    pub fn conservative() -> Self {
        let mut config = Self::default();
        config.max_restart_attempts = 1;
        config.checkpoint_before_restart = true;
        for policy in config.policies.values_mut() {
            if *policy == RecoveryPolicy::Restart {
                *policy = RecoveryPolicy::Notify;
            }
        }
        config
    }

    #[allow(clippy::field_reassign_with_default)]
    pub fn aggressive() -> Self {
        let mut config = Self::default();
        config.max_restart_attempts = 5;
        config.checkpoint_before_restart = false;
        config.restart_delay = Duration::from_secs(1);
        config.recovery_cooldown = Duration::from_secs(10);
        config
    }

    pub fn policy_for(&self, failure_type: FailureType) -> RecoveryPolicy {
        self.policies
            .get(&failure_type)
            .copied()
            .unwrap_or(RecoveryPolicy::Notify)
    }
}

#[derive(Debug, Default)]
pub struct RecoveryConfigBuilder {
    config: RecoveryConfig,
}

impl RecoveryConfigBuilder {
    pub fn new() -> Self {
        Self {
            config: RecoveryConfig::default(),
        }
    }

    pub fn max_restart_attempts(mut self, attempts: u32) -> Self {
        self.config.max_restart_attempts = attempts;
        self
    }

    pub fn restart_delay(mut self, delay: Duration) -> Self {
        self.config.restart_delay = delay;
        self
    }

    pub fn checkpoint_before_restart(mut self, enabled: bool) -> Self {
        self.config.checkpoint_before_restart = enabled;
        self
    }

    pub fn migrate_on_device_error(mut self, enabled: bool) -> Self {
        self.config.migrate_on_device_error = enabled;
        self
    }

    pub fn recovery_cooldown(mut self, cooldown: Duration) -> Self {
        self.config.recovery_cooldown = cooldown;
        self
    }

    pub fn policy(mut self, failure_type: FailureType, policy: RecoveryPolicy) -> Self {
        self.config.policies.insert(failure_type, policy);
        self
    }

    pub fn build(self) -> RecoveryConfig {
        self.config
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FailureType {
    Timeout,
    Crash,
    DeviceError,
    ResourceExhausted,
    QueueOverflow,
    StateCorruption,
    Unknown,
}

impl FailureType {
    pub fn description(&self) -> &'static str {
        match self {
            FailureType::Timeout => "Kernel heartbeat timeout",
            FailureType::Crash => "Kernel crash",
            FailureType::DeviceError => "GPU device error",
            FailureType::ResourceExhausted => "Resource exhaustion",
            FailureType::QueueOverflow => "Message queue overflow",
            FailureType::StateCorruption => "State corruption detected",
            FailureType::Unknown => "Unknown failure",
        }
    }
}

#[derive(Debug, Clone)]
pub struct RecoveryAction {
    pub kernel_id: KernelId,
    pub failure_type: FailureType,
    pub policy: RecoveryPolicy,
    pub attempt: u32,
    pub created_at: Instant,
    pub context: HashMap<String, String>,
}

impl RecoveryAction {
    pub fn new(kernel_id: KernelId, failure_type: FailureType, policy: RecoveryPolicy) -> Self {
        Self {
            kernel_id,
            failure_type,
            policy,
            attempt: 1,
            created_at: Instant::now(),
            context: HashMap::new(),
        }
    }

    pub fn with_context(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.context.insert(key.into(), value.into());
        self
    }

    pub fn with_attempt(mut self, attempt: u32) -> Self {
        self.attempt = attempt;
        self
    }
}

#[derive(Debug, Clone)]
pub struct RecoveryResult {
    pub action: RecoveryAction,
    pub success: bool,
    pub error: Option<String>,
    pub duration: Duration,
    pub next_action: Option<RecoveryPolicy>,
}

impl RecoveryResult {
    pub fn success(action: RecoveryAction, duration: Duration) -> Self {
        Self {
            action,
            success: true,
            error: None,
            duration,
            next_action: None,
        }
    }

    pub fn failure(action: RecoveryAction, error: String, duration: Duration) -> Self {
        Self {
            action,
            success: false,
            error: Some(error),
            duration,
            next_action: Some(RecoveryPolicy::Escalate),
        }
    }

    pub fn failure_with_next(
        action: RecoveryAction,
        error: String,
        duration: Duration,
        next: RecoveryPolicy,
    ) -> Self {
        Self {
            action,
            success: false,
            error: Some(error),
            duration,
            next_action: Some(next),
        }
    }
}

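/// Async callback invoked to carry out a [`RecoveryAction`] for a given policy.
///
/// # Example
///
/// A minimal sketch of boxing an async closure into this type; the restart
/// logic and duration are placeholders and the block is marked `ignore`
/// because the crate path is not fixed here.
///
/// ```ignore
/// let handler: RecoveryHandler = Arc::new(|action: &RecoveryAction| {
///     let action = action.clone();
///     Box::pin(async move {
///         // Restart logic would go here.
///         RecoveryResult::success(action, Duration::from_millis(5))
///     }) as Pin<Box<dyn Future<Output = RecoveryResult> + Send>>
/// });
/// ```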
pub type RecoveryHandler = Arc<
    dyn Fn(&RecoveryAction) -> Pin<Box<dyn Future<Output = RecoveryResult> + Send>> + Send + Sync,
>;

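/// Orchestrates recovery: picks a policy per failure, enforces cooldowns and
/// attempt limits, runs registered [`RecoveryHandler`]s, and keeps per-kernel
/// history and stats.
///
/// # Example
///
/// A minimal sketch of handling a detected failure; marked `ignore` because it
/// needs an async runtime and the crate path is not fixed here.
///
/// ```ignore
/// let manager = RecoveryManager::with_config(RecoveryConfig::conservative());
/// let kernel_id = KernelId::new("graph_runner");
///
/// if manager.should_recover(&kernel_id) {
///     let action = manager.determine_action(&kernel_id, FailureType::Timeout);
///     let result = manager.recover(action).await;
///     println!("recovery succeeded: {}", result.success);
/// }
/// ```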
pub struct RecoveryManager {
    config: RwLock<RecoveryConfig>,
    handlers: RwLock<HashMap<RecoveryPolicy, RecoveryHandler>>,
    history: RwLock<HashMap<KernelId, Vec<RecoveryResult>>>,
    attempts: RwLock<HashMap<KernelId, u32>>,
    last_recovery: RwLock<HashMap<KernelId, Instant>>,
    stats: RecoveryStats,
    enabled: std::sync::atomic::AtomicBool,
}

impl RecoveryManager {
    pub fn new() -> Self {
        Self::with_config(RecoveryConfig::default())
    }

    pub fn with_config(config: RecoveryConfig) -> Self {
        Self {
            config: RwLock::new(config),
            handlers: RwLock::new(HashMap::new()),
            history: RwLock::new(HashMap::new()),
            attempts: RwLock::new(HashMap::new()),
            last_recovery: RwLock::new(HashMap::new()),
            stats: RecoveryStats::default(),
            enabled: std::sync::atomic::AtomicBool::new(true),
        }
    }

    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, Ordering::SeqCst);
    }

    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::SeqCst)
    }

    pub fn set_config(&self, config: RecoveryConfig) {
        *self.config.write() = config;
    }

    pub fn config(&self) -> RecoveryConfig {
        self.config.read().clone()
    }

    pub fn register_handler(&self, policy: RecoveryPolicy, handler: RecoveryHandler) {
        self.handlers.write().insert(policy, handler);
    }

    pub fn should_recover(&self, kernel_id: &KernelId) -> bool {
        if !self.is_enabled() {
            return false;
        }

        let config = self.config.read();
        let last_recovery = self.last_recovery.read();

        if let Some(last) = last_recovery.get(kernel_id) {
            last.elapsed() >= config.recovery_cooldown
        } else {
            true
        }
    }

    pub fn determine_action(
        &self,
        kernel_id: &KernelId,
        failure_type: FailureType,
    ) -> RecoveryAction {
        let config = self.config.read();
        let attempts = self.attempts.read();

        let current_attempt = attempts.get(kernel_id).copied().unwrap_or(0) + 1;
        let policy = if current_attempt > config.max_restart_attempts {
            RecoveryPolicy::Escalate
        } else {
            config.policy_for(failure_type)
        };

        RecoveryAction::new(kernel_id.clone(), failure_type, policy).with_attempt(current_attempt)
    }

    pub async fn recover(&self, action: RecoveryAction) -> RecoveryResult {
        let kernel_id = action.kernel_id.clone();
        let policy = action.policy;

        {
            let mut attempts = self.attempts.write();
            let count = attempts.entry(kernel_id.clone()).or_insert(0);
            *count += 1;
        }

        self.last_recovery
            .write()
            .insert(kernel_id.clone(), Instant::now());

        let handler = self.handlers.read().get(&policy).cloned();

        // Count every recovery attempt, whether it is handled by a registered
        // handler or by the built-in default recovery.
        self.stats.attempts.fetch_add(1, Ordering::Relaxed);

        let result = if let Some(handler) = handler {
            handler(&action).await
        } else {
            self.default_recovery(&action).await
        };

        if result.success {
            self.stats.successes.fetch_add(1, Ordering::Relaxed);
            self.attempts.write().remove(&kernel_id);
        } else {
            self.stats.failures.fetch_add(1, Ordering::Relaxed);
        }

        self.history
            .write()
            .entry(kernel_id)
            .or_default()
            .push(result.clone());

        result
    }

    async fn default_recovery(&self, action: &RecoveryAction) -> RecoveryResult {
        let start = Instant::now();

        match action.policy {
            RecoveryPolicy::Notify => {
                RecoveryResult::success(action.clone(), start.elapsed())
            }
            RecoveryPolicy::Checkpoint => {
                RecoveryResult::success(action.clone(), start.elapsed())
            }
            RecoveryPolicy::Restart => {
                let config = self.config.read();
                if action.attempt > config.max_restart_attempts {
                    RecoveryResult::failure_with_next(
                        action.clone(),
                        "Max restart attempts exceeded".to_string(),
                        start.elapsed(),
                        RecoveryPolicy::Escalate,
                    )
                } else {
                    RecoveryResult::success(action.clone(), start.elapsed())
                }
            }
            RecoveryPolicy::Migrate => {
                RecoveryResult::success(action.clone(), start.elapsed())
            }
            RecoveryPolicy::Circuit => {
                RecoveryResult::success(action.clone(), start.elapsed())
            }
            RecoveryPolicy::Escalate => {
                RecoveryResult::failure(
                    action.clone(),
                    "Manual intervention required".to_string(),
                    start.elapsed(),
                )
            }
        }
    }

    pub fn get_history(&self, kernel_id: &KernelId) -> Vec<RecoveryResult> {
        self.history
            .read()
            .get(kernel_id)
            .cloned()
            .unwrap_or_default()
    }

    pub fn clear_history(&self) {
        self.history.write().clear();
        self.attempts.write().clear();
        self.last_recovery.write().clear();
    }

    pub fn stats(&self) -> RecoveryStatsSnapshot {
        RecoveryStatsSnapshot {
            attempts: self.stats.attempts.load(Ordering::Relaxed),
            successes: self.stats.successes.load(Ordering::Relaxed),
            failures: self.stats.failures.load(Ordering::Relaxed),
            kernels_tracked: self.history.read().len(),
        }
    }
}

impl Default for RecoveryManager {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Default)]
struct RecoveryStats {
    attempts: AtomicU64,
    successes: AtomicU64,
    failures: AtomicU64,
}

#[derive(Debug, Clone, Default)]
pub struct RecoveryStatsSnapshot {
    pub attempts: u64,
    pub successes: u64,
    pub failures: u64,
    pub kernels_tracked: usize,
}

impl RecoveryStatsSnapshot {
    pub fn success_rate(&self) -> f64 {
        if self.attempts == 0 {
            1.0
        } else {
            self.successes as f64 / self.attempts as f64
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_status() {
        assert!(HealthStatus::Healthy.is_healthy());
        assert!(HealthStatus::Degraded.is_healthy());
        assert!(!HealthStatus::Unhealthy.is_healthy());
        assert!(HealthStatus::Unhealthy.is_unhealthy());
    }

    #[tokio::test]
    async fn test_health_checker() {
        let checker = HealthChecker::new();

        checker.register_liveness("test_alive", || async { true });
        checker.register_readiness("test_ready", || async { true });

        assert_eq!(checker.check_count(), 2);
        assert!(checker.is_alive().await);
        assert!(checker.is_ready().await);
    }

    #[tokio::test]
    async fn test_health_checker_unhealthy() {
        let checker = HealthChecker::new();

        checker.register_liveness("failing_check", || async { false });

        assert!(!checker.is_alive().await);

        let status = checker.aggregate_status().await;
        assert_eq!(status, HealthStatus::Unhealthy);
    }

    #[test]
    fn test_circuit_breaker_initial_state() {
        let breaker = CircuitBreaker::new();
        assert_eq!(breaker.state(), CircuitState::Closed);
        assert!(breaker.is_allowed());
    }

    #[test]
    fn test_circuit_breaker_opens_on_failures() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        };
        let breaker = CircuitBreaker::with_config(config);

        breaker.record_failure();
        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Closed);

        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);
        assert!(!breaker.is_allowed());
    }

    #[test]
    fn test_circuit_breaker_reset() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            ..Default::default()
        };
        let breaker = CircuitBreaker::with_config(config);

        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);

        breaker.reset();
        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    #[test]
    fn test_backoff_strategy_fixed() {
        let backoff = BackoffStrategy::Fixed(Duration::from_secs(1));
        assert_eq!(backoff.delay(0), Duration::from_secs(1));
        assert_eq!(backoff.delay(5), Duration::from_secs(1));
    }

    #[test]
    fn test_backoff_strategy_exponential() {
        let backoff = BackoffStrategy::Exponential {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(10),
            multiplier: 2.0,
        };

        assert_eq!(backoff.delay(0), Duration::from_millis(100));
        assert_eq!(backoff.delay(1), Duration::from_millis(200));
        assert_eq!(backoff.delay(2), Duration::from_millis(400));
    }

    #[test]
    fn test_backoff_strategy_linear() {
        let backoff = BackoffStrategy::Linear {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(1),
        };

        assert_eq!(backoff.delay(0), Duration::from_millis(100));
        assert_eq!(backoff.delay(1), Duration::from_millis(200));
        assert_eq!(backoff.delay(9), Duration::from_secs(1));
    }

    #[tokio::test]
    async fn test_retry_policy_success() {
        let policy = RetryPolicy::new(3);

        let result: Result<i32> = policy.execute(|| async { Ok::<_, &str>(42) }).await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[test]
    fn test_degradation_manager_levels() {
        let manager = DegradationManager::new();

        assert_eq!(manager.level(), DegradationLevel::Normal);

        manager.set_level(DegradationLevel::Moderate);
        assert_eq!(manager.level(), DegradationLevel::Moderate);
    }

    #[test]
    fn test_degradation_feature_disabled() {
        let manager = DegradationManager::new();

        manager.set_level(DegradationLevel::Severe);

        assert!(!manager.is_feature_disabled(DegradationLevel::Critical));
        assert!(manager.is_feature_disabled(DegradationLevel::Moderate));
        assert!(manager.is_feature_disabled(DegradationLevel::Normal));
    }

    #[test]
    fn test_kernel_watchdog() {
        let watchdog = KernelWatchdog::new();

        let kernel_id = KernelId::new("test_kernel");
        watchdog.watch(kernel_id.clone());

        assert_eq!(watchdog.watched_count(), 1);

        watchdog.heartbeat(&kernel_id);
        let health = watchdog.get_health(&kernel_id).unwrap();
        assert_eq!(health.status, HealthStatus::Healthy);
    }

    #[test]
    fn test_kernel_watchdog_metrics() {
        let watchdog = KernelWatchdog::new();

        let kernel_id = KernelId::new("test_kernel");
        watchdog.watch(kernel_id.clone());

        watchdog.update_metrics(&kernel_id, 1000.0, 50);

        let health = watchdog.get_health(&kernel_id).unwrap();
        assert_eq!(health.messages_per_sec, 1000.0);
        assert_eq!(health.queue_depth, 50);
    }

    #[test]
    fn test_recovery_policy_severity() {
        assert!(RecoveryPolicy::Notify.severity() < RecoveryPolicy::Restart.severity());
        assert!(RecoveryPolicy::Restart.severity() < RecoveryPolicy::Migrate.severity());
        assert!(RecoveryPolicy::Migrate.severity() < RecoveryPolicy::Escalate.severity());
    }

    #[test]
    fn test_recovery_policy_requires_intervention() {
        assert!(RecoveryPolicy::Notify.requires_intervention());
        assert!(RecoveryPolicy::Escalate.requires_intervention());
        assert!(!RecoveryPolicy::Restart.requires_intervention());
        assert!(!RecoveryPolicy::Migrate.requires_intervention());
    }

    #[test]
    fn test_recovery_config_default() {
        let config = RecoveryConfig::default();
        assert_eq!(config.max_restart_attempts, 3);
        assert!(config.checkpoint_before_restart);
        assert!(config.migrate_on_device_error);
        assert_eq!(
            config.policy_for(FailureType::Timeout),
            RecoveryPolicy::Restart
        );
        assert_eq!(
            config.policy_for(FailureType::DeviceError),
            RecoveryPolicy::Migrate
        );
    }

    #[test]
    fn test_recovery_config_conservative() {
        let config = RecoveryConfig::conservative();
        assert_eq!(config.max_restart_attempts, 1);
        assert_eq!(
            config.policy_for(FailureType::Timeout),
            RecoveryPolicy::Notify
        );
    }

    #[test]
    fn test_recovery_config_aggressive() {
        let config = RecoveryConfig::aggressive();
        assert_eq!(config.max_restart_attempts, 5);
        assert!(!config.checkpoint_before_restart);
        assert_eq!(config.restart_delay, Duration::from_secs(1));
    }

    #[test]
    fn test_recovery_config_builder() {
        let config = RecoveryConfig::builder()
            .max_restart_attempts(10)
            .restart_delay(Duration::from_secs(2))
            .checkpoint_before_restart(false)
            .recovery_cooldown(Duration::from_secs(30))
            .policy(FailureType::Crash, RecoveryPolicy::Migrate)
            .build();

        assert_eq!(config.max_restart_attempts, 10);
        assert_eq!(config.restart_delay, Duration::from_secs(2));
        assert!(!config.checkpoint_before_restart);
        assert_eq!(config.recovery_cooldown, Duration::from_secs(30));
        assert_eq!(
            config.policy_for(FailureType::Crash),
            RecoveryPolicy::Migrate
        );
    }

    #[test]
    fn test_failure_type_description() {
        assert_eq!(
            FailureType::Timeout.description(),
            "Kernel heartbeat timeout"
        );
        assert_eq!(FailureType::Crash.description(), "Kernel crash");
        assert_eq!(FailureType::DeviceError.description(), "GPU device error");
    }

    #[test]
    fn test_recovery_action() {
        let kernel_id = KernelId::new("test_kernel");
        let action = RecoveryAction::new(
            kernel_id.clone(),
            FailureType::Timeout,
            RecoveryPolicy::Restart,
        )
        .with_context("reason", "heartbeat missed")
        .with_attempt(2);

        assert_eq!(action.kernel_id, kernel_id);
        assert_eq!(action.failure_type, FailureType::Timeout);
        assert_eq!(action.policy, RecoveryPolicy::Restart);
        assert_eq!(action.attempt, 2);
        assert_eq!(
            action.context.get("reason"),
            Some(&"heartbeat missed".to_string())
        );
    }

    #[test]
    fn test_recovery_result() {
        let action = RecoveryAction::new(
            KernelId::new("test"),
            FailureType::Crash,
            RecoveryPolicy::Restart,
        );

        let success = RecoveryResult::success(action.clone(), Duration::from_millis(100));
        assert!(success.success);
        assert!(success.error.is_none());
        assert!(success.next_action.is_none());

        let failure = RecoveryResult::failure(
            action.clone(),
            "Failed".to_string(),
            Duration::from_millis(50),
        );
        assert!(!failure.success);
        assert_eq!(failure.error, Some("Failed".to_string()));
        assert_eq!(failure.next_action, Some(RecoveryPolicy::Escalate));
    }

    #[test]
    fn test_recovery_manager_creation() {
        let manager = RecoveryManager::new();
        assert!(manager.is_enabled());

        let stats = manager.stats();
        assert_eq!(stats.attempts, 0);
        assert_eq!(stats.successes, 0);
        assert_eq!(stats.failures, 0);
    }

    #[test]
    fn test_recovery_manager_enable_disable() {
        let manager = RecoveryManager::new();

        assert!(manager.is_enabled());
        manager.set_enabled(false);
        assert!(!manager.is_enabled());
        manager.set_enabled(true);
        assert!(manager.is_enabled());
    }

    #[test]
    fn test_recovery_manager_determine_action() {
        let manager = RecoveryManager::new();
        let kernel_id = KernelId::new("test_kernel");

        let action = manager.determine_action(&kernel_id, FailureType::Timeout);
        assert_eq!(action.kernel_id, kernel_id);
        assert_eq!(action.failure_type, FailureType::Timeout);
        assert_eq!(action.policy, RecoveryPolicy::Restart);
        assert_eq!(action.attempt, 1);
    }

    #[test]
    fn test_recovery_manager_should_recover() {
        let config = RecoveryConfig::builder()
            .recovery_cooldown(Duration::from_millis(10))
            .build();
        let manager = RecoveryManager::with_config(config);
        let kernel_id = KernelId::new("test_kernel");

        assert!(manager.should_recover(&kernel_id));

        manager.set_enabled(false);
        assert!(!manager.should_recover(&kernel_id));
    }

    #[tokio::test]
    async fn test_recovery_manager_recover() {
        let manager = RecoveryManager::new();
        let kernel_id = KernelId::new("test_kernel");

        let action = RecoveryAction::new(
            kernel_id.clone(),
            FailureType::Timeout,
            RecoveryPolicy::Notify,
        );
        let result = manager.recover(action).await;

        assert!(result.success);

        let stats = manager.stats();
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.kernels_tracked, 1);

        let history = manager.get_history(&kernel_id);
        assert_eq!(history.len(), 1);
    }

    #[test]
    fn test_recovery_stats_snapshot_success_rate() {
        let stats = RecoveryStatsSnapshot {
            attempts: 10,
            successes: 8,
            failures: 2,
            kernels_tracked: 3,
        };

        assert!((stats.success_rate() - 0.8).abs() < 0.001);

        let empty = RecoveryStatsSnapshot::default();
        assert_eq!(empty.success_rate(), 1.0);
    }

    #[test]
    fn test_recovery_manager_clear_history() {
        let manager = RecoveryManager::new();
        let kernel_id = KernelId::new("test_kernel");

        manager.attempts.write().insert(kernel_id.clone(), 5);

        manager.clear_history();

        assert!(manager.get_history(&kernel_id).is_empty());
        assert!(manager.attempts.read().is_empty());
    }
}