use parking_lot::RwLock;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::error::{Result, RingKernelError};
use crate::runtime::KernelId;

/// Coarse health state reported by a check or aggregated across checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    /// Fully operational.
    Healthy,
    /// Operational but impaired; still counts as healthy for liveness/readiness.
    Degraded,
    /// Not operational.
    Unhealthy,
    /// No information available yet.
    Unknown,
}

impl HealthStatus {
    /// Returns `true` for `Healthy` and `Degraded`.
    pub fn is_healthy(&self) -> bool {
        matches!(self, HealthStatus::Healthy | HealthStatus::Degraded)
    }

    /// Returns `true` only for `Unhealthy`.
    pub fn is_unhealthy(&self) -> bool {
        matches!(self, HealthStatus::Unhealthy)
    }
}

/// Outcome of a single execution of a health check.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    /// Name of the check that produced this result.
    pub name: String,
    /// Status reported by the check.
    pub status: HealthStatus,
    /// Optional human-readable detail (e.g. a timeout notice).
    pub message: Option<String>,
    /// How long the check took to run.
    pub duration: Duration,
    /// When the check completed.
    pub checked_at: Instant,
}

/// Boxed async function that performs a health check.
pub type HealthCheckFn =
    Arc<dyn Fn() -> Pin<Box<dyn Future<Output = HealthStatus> + Send>> + Send + Sync>;

/// A named health check with liveness/readiness classification and a timeout.
pub struct HealthCheck {
    /// Name of the check.
    pub name: String,
    check_fn: HealthCheckFn,
    /// Whether this check participates in liveness probes.
    pub is_liveness: bool,
    /// Whether this check participates in readiness probes.
    pub is_readiness: bool,
    /// Maximum time the check is allowed to run.
    pub timeout: Duration,
    last_result: RwLock<Option<HealthCheckResult>>,
}

impl HealthCheck {
    /// Creates a check with a 5-second default timeout, not yet classified as
    /// liveness or readiness.
    pub fn new(name: impl Into<String>, check_fn: HealthCheckFn) -> Self {
        Self {
            name: name.into(),
            check_fn,
            is_liveness: false,
            is_readiness: false,
            timeout: Duration::from_secs(5),
            last_result: RwLock::new(None),
        }
    }

    /// Marks this check as a liveness check.
    pub fn liveness(mut self) -> Self {
        self.is_liveness = true;
        self
    }

    /// Marks this check as a readiness check.
    pub fn readiness(mut self) -> Self {
        self.is_readiness = true;
        self
    }

    /// Sets the maximum duration the check may run before it is considered failed.
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Runs the check, bounding it by the configured timeout, and caches the result.
    pub async fn check(&self) -> HealthCheckResult {
        let start = Instant::now();
        // A check that does not complete within its timeout is reported as unhealthy.
        let (status, message) = match tokio::time::timeout(self.timeout, (self.check_fn)()).await {
            Ok(status) => (status, None),
            Err(_) => (
                HealthStatus::Unhealthy,
                Some(format!("check timed out after {:?}", self.timeout)),
            ),
        };
        let duration = start.elapsed();

        let result = HealthCheckResult {
            name: self.name.clone(),
            status,
            message,
            duration,
            checked_at: Instant::now(),
        };

        *self.last_result.write() = Some(result.clone());
        result
    }

    /// Returns the most recent result, if the check has run at least once.
    pub fn last_result(&self) -> Option<HealthCheckResult> {
        self.last_result.read().clone()
    }
}

/// Registry of health checks used for liveness/readiness probing.
pub struct HealthChecker {
    checks: RwLock<Vec<Arc<HealthCheck>>>,
    /// Intended interval for background checking; currently unused.
    #[allow(dead_code)]
    check_interval: Duration,
    /// Flag for a background checking loop; currently unused.
    #[allow(dead_code)]
    running: std::sync::atomic::AtomicBool,
}

impl HealthChecker {
    /// Creates a checker with a 10-second default check interval.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            checks: RwLock::new(Vec::new()),
            check_interval: Duration::from_secs(10),
            running: std::sync::atomic::AtomicBool::new(false),
        })
    }

    /// Builder-style interval setter. The interval cannot be changed through the
    /// shared `Arc`, and no background loop uses it yet, so this is currently a no-op.
    pub fn with_interval(self: Arc<Self>, interval: Duration) -> Arc<Self> {
        let _ = interval;
        self
    }

    /// Registers a health check.
    pub fn register(&self, check: HealthCheck) {
        self.checks.write().push(Arc::new(check));
    }

    /// Registers a liveness check from a boolean-returning async closure.
    pub fn register_liveness<F, Fut>(&self, name: impl Into<String>, check_fn: F)
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = bool> + Send + 'static,
    {
        let name = name.into();
        let check = HealthCheck::new(
            name,
            Arc::new(move || {
                let fut = check_fn();
                Box::pin(async move {
                    if fut.await {
                        HealthStatus::Healthy
                    } else {
                        HealthStatus::Unhealthy
                    }
                }) as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
            }),
        )
        .liveness();
        self.register(check);
    }

    /// Registers a readiness check from a boolean-returning async closure.
    pub fn register_readiness<F, Fut>(&self, name: impl Into<String>, check_fn: F)
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = bool> + Send + 'static,
    {
        let name = name.into();
        let check = HealthCheck::new(
            name,
            Arc::new(move || {
                let fut = check_fn();
                Box::pin(async move {
                    if fut.await {
                        HealthStatus::Healthy
                    } else {
                        HealthStatus::Unhealthy
                    }
                }) as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
            }),
        )
        .readiness();
        self.register(check);
    }

    /// Runs every registered check and returns the results in registration order.
    pub async fn check_all(&self) -> Vec<HealthCheckResult> {
        let checks = self.checks.read().clone();
        let mut results = Vec::with_capacity(checks.len());

        for check in checks {
            results.push(check.check().await);
        }

        results
    }

    /// Runs only the liveness checks.
    pub async fn check_liveness(&self) -> Vec<HealthCheckResult> {
        let checks = self.checks.read().clone();
        let mut results = Vec::new();

        for check in checks.iter().filter(|c| c.is_liveness) {
            results.push(check.check().await);
        }

        results
    }

    /// Runs only the readiness checks.
    pub async fn check_readiness(&self) -> Vec<HealthCheckResult> {
        let checks = self.checks.read().clone();
        let mut results = Vec::new();

        for check in checks.iter().filter(|c| c.is_readiness) {
            results.push(check.check().await);
        }

        results
    }

    /// Returns `true` if every liveness check is healthy or degraded.
    pub async fn is_alive(&self) -> bool {
        let results = self.check_liveness().await;
        results.iter().all(|r| r.status.is_healthy())
    }

    /// Returns `true` if every readiness check is healthy or degraded.
    pub async fn is_ready(&self) -> bool {
        let results = self.check_readiness().await;
        results.iter().all(|r| r.status.is_healthy())
    }

    /// Aggregates all checks: `Unknown` if none are registered, `Healthy` if all
    /// pass, `Unhealthy` if any fail, `Degraded` otherwise.
    pub async fn aggregate_status(&self) -> HealthStatus {
        let results = self.check_all().await;

        if results.is_empty() {
            return HealthStatus::Unknown;
        }

        let all_healthy = results.iter().all(|r| r.status == HealthStatus::Healthy);
        let any_unhealthy = results.iter().any(|r| r.status == HealthStatus::Unhealthy);

        if all_healthy {
            HealthStatus::Healthy
        } else if any_unhealthy {
            HealthStatus::Unhealthy
        } else {
            HealthStatus::Degraded
        }
    }

    /// Number of registered checks.
    pub fn check_count(&self) -> usize {
        self.checks.read().len()
    }
}

impl Default for HealthChecker {
    fn default() -> Self {
        Self {
            checks: RwLock::new(Vec::new()),
            check_interval: Duration::from_secs(10),
            running: std::sync::atomic::AtomicBool::new(false),
        }
    }
}

/// State of a circuit breaker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Requests flow normally.
    Closed,
    /// Requests are rejected until the recovery timeout elapses.
    Open,
    /// A limited number of trial requests are allowed through.
    HalfOpen,
}

/// Tunable parameters for [`CircuitBreaker`].
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failures in the closed state before the circuit opens.
    pub failure_threshold: u32,
    /// Successes in the half-open state required to close the circuit.
    pub success_threshold: u32,
    /// How long the circuit stays open before moving to half-open.
    pub recovery_timeout: Duration,
    /// Sliding window for counting failures (currently not applied).
    pub window_duration: Duration,
    /// Maximum concurrent trial requests while half-open.
    pub half_open_max_requests: u32,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            recovery_timeout: Duration::from_secs(30),
            window_duration: Duration::from_secs(60),
            half_open_max_requests: 3,
        }
    }
}

/// Circuit breaker that rejects requests after repeated failures and probes for
/// recovery with a limited number of half-open trial requests.
pub struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: RwLock<CircuitState>,
    failure_count: AtomicU32,
    success_count: AtomicU32,
    opened_at: RwLock<Option<Instant>>,
    half_open_requests: AtomicU32,
    total_requests: AtomicU64,
    total_failures: AtomicU64,
    total_rejections: AtomicU64,
}

impl CircuitBreaker {
    /// Creates a breaker with the default configuration.
    pub fn new() -> Arc<Self> {
        Self::with_config(CircuitBreakerConfig::default())
    }

    /// Creates a breaker with a custom configuration.
    pub fn with_config(config: CircuitBreakerConfig) -> Arc<Self> {
        Arc::new(Self {
            config,
            state: RwLock::new(CircuitState::Closed),
            failure_count: AtomicU32::new(0),
            success_count: AtomicU32::new(0),
            opened_at: RwLock::new(None),
            half_open_requests: AtomicU32::new(0),
            total_requests: AtomicU64::new(0),
            total_failures: AtomicU64::new(0),
            total_rejections: AtomicU64::new(0),
        })
    }

    /// Returns the current state, transitioning `Open -> HalfOpen` once the
    /// recovery timeout has elapsed.
    pub fn state(&self) -> CircuitState {
        let current_state = *self.state.read();
        if current_state == CircuitState::Open {
            if let Some(opened_at) = *self.opened_at.read() {
                if opened_at.elapsed() >= self.config.recovery_timeout {
                    *self.state.write() = CircuitState::HalfOpen;
                    self.half_open_requests.store(0, Ordering::SeqCst);
                    self.success_count.store(0, Ordering::SeqCst);
                    return CircuitState::HalfOpen;
                }
            }
        }
        current_state
    }

    /// Returns `true` if a request may proceed in the current state.
    pub fn is_allowed(&self) -> bool {
        match self.state() {
            CircuitState::Closed => true,
            CircuitState::Open => false,
            CircuitState::HalfOpen => {
                self.half_open_requests.load(Ordering::SeqCst) < self.config.half_open_max_requests
            }
        }
    }

    /// Records a successful request; in the half-open state enough successes
    /// close the circuit.
    pub fn record_success(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        let state = self.state();
        if state == CircuitState::HalfOpen {
            let success_count = self.success_count.fetch_add(1, Ordering::SeqCst) + 1;
            // Release the half-open slot; saturate at zero so a caller that did not
            // go through `execute()` cannot underflow the counter.
            let _ = self.half_open_requests.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
                Some(v.saturating_sub(1))
            });

            if success_count >= self.config.success_threshold {
                self.close();
            }
        }
    }

    /// Records a failed request; in the closed state enough failures open the
    /// circuit, and any failure while half-open reopens it immediately.
    pub fn record_failure(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.total_failures.fetch_add(1, Ordering::Relaxed);

        let state = self.state();
        match state {
            CircuitState::Closed => {
                let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
                if failure_count >= self.config.failure_threshold {
                    self.open();
                }
            }
            CircuitState::HalfOpen => {
                let _ = self.half_open_requests.fetch_update(
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                    |v| Some(v.saturating_sub(1)),
                );
                self.open();
            }
            CircuitState::Open => {}
        }
    }

    /// Records a request that was rejected because the circuit was not accepting traffic.
    pub fn record_rejection(&self) {
        self.total_rejections.fetch_add(1, Ordering::Relaxed);
    }

    fn open(&self) {
        *self.state.write() = CircuitState::Open;
        *self.opened_at.write() = Some(Instant::now());
    }

    fn close(&self) {
        *self.state.write() = CircuitState::Closed;
        *self.opened_at.write() = None;
        self.failure_count.store(0, Ordering::SeqCst);
        self.success_count.store(0, Ordering::SeqCst);
    }

    /// Forces the circuit back to the closed state and clears the counters.
    pub fn reset(&self) {
        self.close();
    }

    /// Reserves a half-open trial slot; returns `false` if the half-open limit is reached.
    fn acquire_half_open(&self) -> bool {
        if self.state() != CircuitState::HalfOpen {
            return true;
        }

        let current = self.half_open_requests.load(Ordering::SeqCst);
        if current >= self.config.half_open_max_requests {
            return false;
        }

        self.half_open_requests.fetch_add(1, Ordering::SeqCst);
        true
    }

    /// Runs `operation` through the breaker: rejected when open or over the
    /// half-open limit, otherwise the outcome is recorded as success or failure.
    pub async fn execute<F, Fut, T, E>(&self, operation: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = std::result::Result<T, E>>,
        E: std::fmt::Display,
    {
        if !self.is_allowed() {
            self.record_rejection();
            return Err(RingKernelError::BackendError(
                "Circuit breaker is open".to_string(),
            ));
        }

        if !self.acquire_half_open() {
            self.record_rejection();
            return Err(RingKernelError::BackendError(
                "Circuit breaker half-open limit reached".to_string(),
            ));
        }

        match operation().await {
            Ok(result) => {
                self.record_success();
                Ok(result)
            }
            Err(e) => {
                self.record_failure();
                Err(RingKernelError::BackendError(format!(
                    "Operation failed: {}",
                    e
                )))
            }
        }
    }

    /// Returns a snapshot of the breaker's counters and current state.
    pub fn stats(&self) -> CircuitBreakerStats {
        CircuitBreakerStats {
            state: self.state(),
            total_requests: self.total_requests.load(Ordering::Relaxed),
            total_failures: self.total_failures.load(Ordering::Relaxed),
            total_rejections: self.total_rejections.load(Ordering::Relaxed),
            failure_count: self.failure_count.load(Ordering::Relaxed),
            success_count: self.success_count.load(Ordering::Relaxed),
        }
    }
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self {
            config: CircuitBreakerConfig::default(),
            state: RwLock::new(CircuitState::Closed),
            failure_count: AtomicU32::new(0),
            success_count: AtomicU32::new(0),
            opened_at: RwLock::new(None),
            half_open_requests: AtomicU32::new(0),
            total_requests: AtomicU64::new(0),
            total_failures: AtomicU64::new(0),
            total_rejections: AtomicU64::new(0),
        }
    }
}

/// Point-in-time snapshot of circuit breaker counters.
#[derive(Debug, Clone)]
pub struct CircuitBreakerStats {
    /// Current state of the breaker.
    pub state: CircuitState,
    /// Total requests observed (successes and failures).
    pub total_requests: u64,
    /// Total failed requests.
    pub total_failures: u64,
    /// Total requests rejected without being run.
    pub total_rejections: u64,
    /// Failures counted toward opening the circuit.
    pub failure_count: u32,
    /// Successes counted toward closing the circuit while half-open.
    pub success_count: u32,
}

/// Strategy for computing the delay between retry attempts.
#[derive(Debug, Clone)]
pub enum BackoffStrategy {
    /// The same delay for every attempt.
    Fixed(Duration),
    /// Delay grows linearly with the attempt number, capped at `max`.
    Linear {
        /// Delay for the first attempt.
        initial: Duration,
        /// Upper bound on the delay.
        max: Duration,
    },
    /// Delay grows geometrically with the attempt number, capped at `max`.
    Exponential {
        /// Delay for the first attempt.
        initial: Duration,
        /// Upper bound on the delay.
        max: Duration,
        /// Growth factor applied per attempt.
        multiplier: f64,
    },
    /// No delay between attempts.
    None,
}

impl BackoffStrategy {
    /// Computes the delay before retrying after the given zero-based attempt.
    pub fn delay(&self, attempt: u32) -> Duration {
        match self {
            BackoffStrategy::Fixed(d) => *d,
            BackoffStrategy::Linear { initial, max } => {
                let delay = initial.mul_f64((attempt + 1) as f64);
                delay.min(*max)
            }
            BackoffStrategy::Exponential {
                initial,
                max,
                multiplier,
            } => {
                let factor = multiplier.powi(attempt as i32);
                let delay = initial.mul_f64(factor);
                delay.min(*max)
            }
            BackoffStrategy::None => Duration::ZERO,
        }
    }
}

/// Retry policy with configurable backoff, optional jitter, and an optional
/// predicate deciding which errors are retryable.
#[derive(Clone)]
pub struct RetryPolicy {
    /// Maximum number of attempts, including the first one.
    pub max_attempts: u32,
    /// Backoff strategy used between attempts.
    pub backoff: BackoffStrategy,
    /// Whether to randomize delays to avoid synchronized retries.
    pub jitter: bool,
    #[allow(clippy::type_complexity)]
    retryable: Option<Arc<dyn Fn(&str) -> bool + Send + Sync>>,
}

impl std::fmt::Debug for RetryPolicy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RetryPolicy")
            .field("max_attempts", &self.max_attempts)
            .field("backoff", &self.backoff)
            .field("jitter", &self.jitter)
            .field("retryable", &self.retryable.is_some())
            .finish()
    }
}

impl RetryPolicy {
    /// Creates a policy with exponential backoff (100ms initial, 30s cap,
    /// multiplier 2.0) and jitter enabled.
    pub fn new(max_attempts: u32) -> Self {
        Self {
            max_attempts,
            backoff: BackoffStrategy::Exponential {
                initial: Duration::from_millis(100),
                max: Duration::from_secs(30),
                multiplier: 2.0,
            },
            jitter: true,
            retryable: None,
        }
    }

    /// Replaces the backoff strategy.
    pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
        self.backoff = backoff;
        self
    }

    /// Disables jitter so delays are fully deterministic.
    pub fn without_jitter(mut self) -> Self {
        self.jitter = false;
        self
    }

    /// Sets a predicate that decides, from the error message, whether a failure is retryable.
    pub fn with_retryable<F>(mut self, predicate: F) -> Self
    where
        F: Fn(&str) -> bool + Send + Sync + 'static,
    {
        self.retryable = Some(Arc::new(predicate));
        self
    }

    /// Returns `true` if the error should be retried; defaults to `true` when no predicate is set.
    pub fn is_retryable(&self, error: &str) -> bool {
        self.retryable.as_ref().map(|p| p(error)).unwrap_or(true)
    }

    /// Computes the delay before the next attempt, applying jitter in the
    /// 0.75x..1.0x range when enabled.
    pub fn get_delay(&self, attempt: u32) -> Duration {
        let base_delay = self.backoff.delay(attempt);

        if self.jitter && base_delay > Duration::ZERO {
            let jitter_factor = 0.75 + (rand_u64() % 50) as f64 / 200.0;
            base_delay.mul_f64(jitter_factor)
        } else {
            base_delay
        }
    }

    /// Runs `operation`, retrying retryable failures with backoff until it
    /// succeeds or `max_attempts` is exhausted.
    pub async fn execute<F, Fut, T, E>(&self, mut operation: F) -> Result<T>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = std::result::Result<T, E>>,
        E: std::fmt::Display,
    {
        let mut last_error = String::new();

        for attempt in 0..self.max_attempts {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    last_error = format!("{}", e);

                    if !self.is_retryable(&last_error) {
                        return Err(RingKernelError::BackendError(format!(
                            "Non-retryable error: {}",
                            last_error
                        )));
                    }

                    if attempt + 1 >= self.max_attempts {
                        break;
                    }

                    let delay = self.get_delay(attempt);
                    tokio::time::sleep(delay).await;
                }
            }
        }

        Err(RingKernelError::BackendError(format!(
            "Operation failed after {} attempts: {}",
            self.max_attempts, last_error
        )))
    }
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self::new(3)
    }
}

/// Cheap, non-cryptographic pseudo-random number derived from the current time
/// and thread id; only used to add jitter to retry delays.
fn rand_u64() -> u64 {
    use std::hash::{Hash, Hasher};
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    std::time::SystemTime::now().hash(&mut hasher);
    std::thread::current().id().hash(&mut hasher);
    hasher.finish()
}

/// How aggressively the system is shedding load, from none to critical.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DegradationLevel {
    /// Full functionality.
    Normal = 0,
    /// Minor load shedding.
    Light = 1,
    /// Noticeable load shedding.
    Moderate = 2,
    /// Heavy load shedding.
    Severe = 3,
    /// Maximum load shedding; only essential work proceeds.
    Critical = 4,
}

impl DegradationLevel {
    /// Returns the next more degraded level, saturating at `Critical`.
    pub fn next_worse(self) -> Self {
        match self {
            DegradationLevel::Normal => DegradationLevel::Light,
            DegradationLevel::Light => DegradationLevel::Moderate,
            DegradationLevel::Moderate => DegradationLevel::Severe,
            DegradationLevel::Severe => DegradationLevel::Critical,
            DegradationLevel::Critical => DegradationLevel::Critical,
        }
    }

    /// Returns the next less degraded level, saturating at `Normal`.
    pub fn next_better(self) -> Self {
        match self {
            DegradationLevel::Normal => DegradationLevel::Normal,
            DegradationLevel::Light => DegradationLevel::Normal,
            DegradationLevel::Moderate => DegradationLevel::Light,
            DegradationLevel::Severe => DegradationLevel::Moderate,
            DegradationLevel::Critical => DegradationLevel::Severe,
        }
    }
}

/// Thresholds and shedding ratio that drive load shedding decisions.
#[derive(Debug, Clone)]
pub struct LoadSheddingPolicy {
    /// Queue depth above which shedding should kick in.
    pub queue_threshold: usize,
    /// CPU utilization (0.0..1.0) above which shedding should kick in.
    pub cpu_threshold: f64,
    /// Memory utilization (0.0..1.0) above which shedding should kick in.
    pub memory_threshold: f64,
    /// Base fraction of requests to shed per degradation level.
    pub shed_ratio: f64,
}

impl Default for LoadSheddingPolicy {
    fn default() -> Self {
        Self {
            queue_threshold: 10000,
            cpu_threshold: 0.9,
            memory_threshold: 0.85,
            shed_ratio: 0.1,
        }
    }
}

/// Tracks the current degradation level, notifies listeners of changes, and
/// decides which requests to shed.
pub struct DegradationManager {
    level: RwLock<DegradationLevel>,
    policy: LoadSheddingPolicy,
    #[allow(clippy::type_complexity)]
    callbacks: RwLock<Vec<Arc<dyn Fn(DegradationLevel, DegradationLevel) + Send + Sync>>>,
    shed_counter: AtomicU64,
    total_requests: AtomicU64,
    shed_requests: AtomicU64,
}

impl DegradationManager {
    /// Creates a manager at `Normal` level with the default shedding policy.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            level: RwLock::new(DegradationLevel::Normal),
            policy: LoadSheddingPolicy::default(),
            callbacks: RwLock::new(Vec::new()),
            shed_counter: AtomicU64::new(0),
            total_requests: AtomicU64::new(0),
            shed_requests: AtomicU64::new(0),
        })
    }

    /// Creates a manager with a custom load shedding policy.
    pub fn with_policy(policy: LoadSheddingPolicy) -> Arc<Self> {
        Arc::new(Self {
            level: RwLock::new(DegradationLevel::Normal),
            policy,
            callbacks: RwLock::new(Vec::new()),
            shed_counter: AtomicU64::new(0),
            total_requests: AtomicU64::new(0),
            shed_requests: AtomicU64::new(0),
        })
    }

    /// Returns the current degradation level.
    pub fn level(&self) -> DegradationLevel {
        *self.level.read()
    }

    /// Sets the degradation level, invoking registered callbacks if it changed.
    pub fn set_level(&self, new_level: DegradationLevel) {
        let old_level = *self.level.read();
        if old_level != new_level {
            *self.level.write() = new_level;

            let callbacks = self.callbacks.read().clone();
            for callback in callbacks {
                callback(old_level, new_level);
            }
        }
    }

    /// Registers a callback invoked with `(old_level, new_level)` on level changes.
    pub fn on_level_change<F>(&self, callback: F)
    where
        F: Fn(DegradationLevel, DegradationLevel) + Send + Sync + 'static,
    {
        self.callbacks.write().push(Arc::new(callback));
    }

    /// Decides whether the current request should be shed. Shedding is
    /// deterministic: a rolling counter sheds `shed_ratio * level_factor` of
    /// every 100 requests, capped at 90%.
    pub fn should_shed(&self) -> bool {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        let level = self.level();
        if level == DegradationLevel::Normal {
            return false;
        }

        let base_ratio = self.policy.shed_ratio;
        let level_factor = match level {
            DegradationLevel::Normal => 0.0,
            DegradationLevel::Light => 1.0,
            DegradationLevel::Moderate => 2.0,
            DegradationLevel::Severe => 3.0,
            DegradationLevel::Critical => 4.0,
        };

        let shed_probability = (base_ratio * level_factor).min(0.9);

        let counter = self.shed_counter.fetch_add(1, Ordering::Relaxed);
        let should_shed = (counter % 100) < (shed_probability * 100.0) as u64;

        if should_shed {
            self.shed_requests.fetch_add(1, Ordering::Relaxed);
        }

        should_shed
    }

    /// Returns `true` if a feature that tolerates degradation only up to
    /// `required_level` should currently be disabled.
    pub fn is_feature_disabled(&self, required_level: DegradationLevel) -> bool {
        self.level() > required_level
    }

    /// Returns a snapshot of shedding activity.
    pub fn stats(&self) -> DegradationStats {
        let total = self.total_requests.load(Ordering::Relaxed);
        let shed = self.shed_requests.load(Ordering::Relaxed);

        DegradationStats {
            level: self.level(),
            total_requests: total,
            shed_requests: shed,
            shed_ratio: if total > 0 {
                shed as f64 / total as f64
            } else {
                0.0
            },
        }
    }
}

impl Default for DegradationManager {
    fn default() -> Self {
        Self {
            level: RwLock::new(DegradationLevel::Normal),
            policy: LoadSheddingPolicy::default(),
            callbacks: RwLock::new(Vec::new()),
            shed_counter: AtomicU64::new(0),
            total_requests: AtomicU64::new(0),
            shed_requests: AtomicU64::new(0),
        }
    }
}

/// Snapshot of load shedding activity.
#[derive(Debug, Clone)]
pub struct DegradationStats {
    /// Current degradation level.
    pub level: DegradationLevel,
    /// Total requests evaluated by `should_shed`.
    pub total_requests: u64,
    /// Requests that were shed.
    pub shed_requests: u64,
    /// Observed shed ratio (`shed_requests / total_requests`).
    pub shed_ratio: f64,
}

/// Health snapshot of a single watched kernel.
#[derive(Debug, Clone)]
pub struct KernelHealth {
    /// Identifier of the kernel.
    pub kernel_id: KernelId,
    /// Time of the last heartbeat received.
    pub last_heartbeat: Instant,
    /// Current health status.
    pub status: HealthStatus,
    /// Consecutive checks in which the heartbeat had timed out.
    pub failure_count: u32,
    /// Recent message throughput.
    pub messages_per_sec: f64,
    /// Current queue depth.
    pub queue_depth: usize,
}

/// Watchdog that tracks kernel heartbeats and flags kernels whose heartbeats
/// have stopped arriving.
pub struct KernelWatchdog {
    kernels: RwLock<HashMap<KernelId, KernelHealth>>,
    heartbeat_timeout: Duration,
    #[allow(dead_code)]
    check_interval: Duration,
    failure_threshold: u32,
    #[allow(dead_code)]
    running: std::sync::atomic::AtomicBool,
    #[allow(clippy::type_complexity)]
    callbacks: RwLock<Vec<Arc<dyn Fn(&KernelHealth) + Send + Sync>>>,
}

impl KernelWatchdog {
    /// Creates a watchdog with a 30-second heartbeat timeout and a failure
    /// threshold of 3 missed checks.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            kernels: RwLock::new(HashMap::new()),
            heartbeat_timeout: Duration::from_secs(30),
            check_interval: Duration::from_secs(5),
            failure_threshold: 3,
            running: std::sync::atomic::AtomicBool::new(false),
            callbacks: RwLock::new(Vec::new()),
        })
    }

    /// Builder-style timeout setter. The timeout cannot be changed through the
    /// shared `Arc`, so this is currently a no-op.
    pub fn with_heartbeat_timeout(self: Arc<Self>, timeout: Duration) -> Arc<Self> {
        let _ = timeout;
        self
    }

    /// Starts watching a kernel, recording it as healthy with a fresh heartbeat.
    pub fn watch(&self, kernel_id: KernelId) {
        let health = KernelHealth {
            kernel_id: kernel_id.clone(),
            last_heartbeat: Instant::now(),
            status: HealthStatus::Healthy,
            failure_count: 0,
            messages_per_sec: 0.0,
            queue_depth: 0,
        };
        self.kernels.write().insert(kernel_id, health);
    }

    /// Stops watching a kernel.
    pub fn unwatch(&self, kernel_id: &KernelId) {
        self.kernels.write().remove(kernel_id);
    }

    /// Records a heartbeat, clearing the failure count and recovering an unhealthy kernel.
    pub fn heartbeat(&self, kernel_id: &KernelId) {
        if let Some(health) = self.kernels.write().get_mut(kernel_id) {
            health.last_heartbeat = Instant::now();
            health.failure_count = 0;
            if health.status == HealthStatus::Unhealthy {
                health.status = HealthStatus::Healthy;
            }
        }
    }

    /// Updates throughput and queue depth metrics for a watched kernel.
    pub fn update_metrics(&self, kernel_id: &KernelId, messages_per_sec: f64, queue_depth: usize) {
        if let Some(health) = self.kernels.write().get_mut(kernel_id) {
            health.messages_per_sec = messages_per_sec;
            health.queue_depth = queue_depth;
        }
    }

    /// Evaluates every watched kernel: kernels whose heartbeat has timed out are
    /// marked degraded, then unhealthy once the failure threshold is reached, and
    /// `on_unhealthy` callbacks are invoked for each unhealthy kernel.
    pub fn check_all(&self) -> Vec<KernelHealth> {
        let now = Instant::now();
        let mut kernels = self.kernels.write();
        let mut results = Vec::with_capacity(kernels.len());

        for health in kernels.values_mut() {
            if now.duration_since(health.last_heartbeat) > self.heartbeat_timeout {
                health.failure_count += 1;
                if health.failure_count >= self.failure_threshold {
                    health.status = HealthStatus::Unhealthy;
                } else {
                    health.status = HealthStatus::Degraded;
                }
            }

            results.push(health.clone());
        }

        drop(kernels);

        // Invoke callbacks outside the lock to avoid deadlocks from re-entrant calls.
        let callbacks = self.callbacks.read().clone();
        for health in results
            .iter()
            .filter(|h| h.status == HealthStatus::Unhealthy)
        {
            for callback in &callbacks {
                callback(health);
            }
        }

        results
    }

    /// Registers a callback invoked for each kernel found unhealthy during `check_all`.
    pub fn on_unhealthy<F>(&self, callback: F)
    where
        F: Fn(&KernelHealth) + Send + Sync + 'static,
    {
        self.callbacks.write().push(Arc::new(callback));
    }

    /// Returns the latest health snapshot for a kernel, if it is being watched.
    pub fn get_health(&self, kernel_id: &KernelId) -> Option<KernelHealth> {
        self.kernels.read().get(kernel_id).cloned()
    }

    /// Returns the kernels currently marked unhealthy.
    pub fn unhealthy_kernels(&self) -> Vec<KernelHealth> {
        self.kernels
            .read()
            .values()
            .filter(|h| h.status == HealthStatus::Unhealthy)
            .cloned()
            .collect()
    }

    /// Number of kernels being watched.
    pub fn watched_count(&self) -> usize {
        self.kernels.read().len()
    }
}

impl Default for KernelWatchdog {
    fn default() -> Self {
        Self {
            kernels: RwLock::new(HashMap::new()),
            heartbeat_timeout: Duration::from_secs(30),
            check_interval: Duration::from_secs(5),
            failure_threshold: 3,
            running: std::sync::atomic::AtomicBool::new(false),
            callbacks: RwLock::new(Vec::new()),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_status() {
        assert!(HealthStatus::Healthy.is_healthy());
        assert!(HealthStatus::Degraded.is_healthy());
        assert!(!HealthStatus::Unhealthy.is_healthy());
        assert!(HealthStatus::Unhealthy.is_unhealthy());
    }

    #[tokio::test]
    async fn test_health_checker() {
        let checker = HealthChecker::new();

        checker.register_liveness("test_alive", || async { true });
        checker.register_readiness("test_ready", || async { true });

        assert_eq!(checker.check_count(), 2);
        assert!(checker.is_alive().await);
        assert!(checker.is_ready().await);
    }

    #[tokio::test]
    async fn test_health_checker_unhealthy() {
        let checker = HealthChecker::new();

        checker.register_liveness("failing_check", || async { false });

        assert!(!checker.is_alive().await);

        let status = checker.aggregate_status().await;
        assert_eq!(status, HealthStatus::Unhealthy);
    }

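    // A minimal sketch (added alongside the original tests) showing direct use of
    // `HealthCheck`: the result of `check()` is cached and retrievable via `last_result()`.
    #[tokio::test]
    async fn test_health_check_caches_last_result() {
        let check = HealthCheck::new(
            "cached",
            Arc::new(|| {
                Box::pin(async { HealthStatus::Healthy })
                    as Pin<Box<dyn Future<Output = HealthStatus> + Send>>
            }),
        )
        .liveness();

        assert!(check.last_result().is_none());

        let result = check.check().await;
        assert_eq!(result.status, HealthStatus::Healthy);
        assert_eq!(check.last_result().unwrap().status, HealthStatus::Healthy);
    }
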
    #[test]
    fn test_circuit_breaker_initial_state() {
        let breaker = CircuitBreaker::new();
        assert_eq!(breaker.state(), CircuitState::Closed);
        assert!(breaker.is_allowed());
    }

    #[test]
    fn test_circuit_breaker_opens_on_failures() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        };
        let breaker = CircuitBreaker::with_config(config);

        breaker.record_failure();
        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Closed);

        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);
        assert!(!breaker.is_allowed());
    }

    #[test]
    fn test_circuit_breaker_reset() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            ..Default::default()
        };
        let breaker = CircuitBreaker::with_config(config);

        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);

        breaker.reset();
        assert_eq!(breaker.state(), CircuitState::Closed);
    }

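    // A minimal sketch (added alongside the original tests) of the recovery path:
    // after the recovery timeout the breaker goes half-open, and enough successful
    // trial requests through `execute()` close it again. The short timeout here is
    // chosen purely to keep the test fast.
    #[tokio::test]
    async fn test_circuit_breaker_half_open_recovery() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            success_threshold: 2,
            recovery_timeout: Duration::from_millis(10),
            ..Default::default()
        };
        let breaker = CircuitBreaker::with_config(config);

        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);

        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(breaker.state(), CircuitState::HalfOpen);

        // Two successful trial requests meet `success_threshold` and close the circuit.
        for _ in 0..2 {
            let result: Result<u32> = breaker.execute(|| async { Ok::<_, &str>(1) }).await;
            assert!(result.is_ok());
        }
        assert_eq!(breaker.state(), CircuitState::Closed);
    }
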
    #[test]
    fn test_backoff_strategy_fixed() {
        let backoff = BackoffStrategy::Fixed(Duration::from_secs(1));
        assert_eq!(backoff.delay(0), Duration::from_secs(1));
        assert_eq!(backoff.delay(5), Duration::from_secs(1));
    }

    #[test]
    fn test_backoff_strategy_exponential() {
        let backoff = BackoffStrategy::Exponential {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(10),
            multiplier: 2.0,
        };

        assert_eq!(backoff.delay(0), Duration::from_millis(100));
        assert_eq!(backoff.delay(1), Duration::from_millis(200));
        assert_eq!(backoff.delay(2), Duration::from_millis(400));
    }

    #[test]
    fn test_backoff_strategy_linear() {
        let backoff = BackoffStrategy::Linear {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(1),
        };

        assert_eq!(backoff.delay(0), Duration::from_millis(100));
        assert_eq!(backoff.delay(1), Duration::from_millis(200));
        assert_eq!(backoff.delay(9), Duration::from_secs(1));
    }

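    // A minimal sketch (added alongside the original tests) checking that jitter keeps
    // the delay within the 0.75x..1.0x band of the base exponential backoff, and that
    // disabling jitter makes the delay exact.
    #[test]
    fn test_retry_policy_jitter_bounds() {
        let policy = RetryPolicy::new(3);
        let delay = policy.get_delay(0);
        assert!(delay >= Duration::from_millis(75));
        assert!(delay <= Duration::from_millis(100));

        let no_jitter = RetryPolicy::new(3).without_jitter();
        assert_eq!(no_jitter.get_delay(0), Duration::from_millis(100));
    }
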
    #[tokio::test]
    async fn test_retry_policy_success() {
        let policy = RetryPolicy::new(3);

        let result: Result<i32> = policy.execute(|| async { Ok::<_, &str>(42) }).await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

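    // A minimal sketch (added alongside the original tests) of the retry loop under
    // transient failures: the first two attempts fail, the third succeeds, and no
    // backoff delay is used so the test stays fast.
    #[tokio::test]
    async fn test_retry_policy_retries_until_success() {
        let attempts = Arc::new(AtomicU32::new(0));
        let policy = RetryPolicy::new(3).with_backoff(BackoffStrategy::None);

        let counter = attempts.clone();
        let result: Result<u32> = policy
            .execute(move || {
                let counter = counter.clone();
                async move {
                    let attempt = counter.fetch_add(1, Ordering::SeqCst);
                    if attempt < 2 {
                        Err("transient failure")
                    } else {
                        Ok(attempt)
                    }
                }
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(attempts.load(Ordering::SeqCst), 3);
    }
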
    #[test]
    fn test_degradation_manager_levels() {
        let manager = DegradationManager::new();

        assert_eq!(manager.level(), DegradationLevel::Normal);

        manager.set_level(DegradationLevel::Moderate);
        assert_eq!(manager.level(), DegradationLevel::Moderate);
    }

    #[test]
    fn test_degradation_feature_disabled() {
        let manager = DegradationManager::new();

        manager.set_level(DegradationLevel::Severe);

        assert!(!manager.is_feature_disabled(DegradationLevel::Critical));
        assert!(manager.is_feature_disabled(DegradationLevel::Moderate));
        assert!(manager.is_feature_disabled(DegradationLevel::Normal));
    }

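    // A minimal sketch (added alongside the original tests) of deterministic load
    // shedding under the default policy: at `Critical` some, but not all, of the
    // first 100 requests are shed, and the stats reflect the observed count.
    #[test]
    fn test_degradation_manager_sheds_under_load() {
        let manager = DegradationManager::new();
        manager.set_level(DegradationLevel::Critical);

        let shed = (0..100).filter(|_| manager.should_shed()).count();
        assert!(shed > 0);
        assert!(shed < 100);

        let stats = manager.stats();
        assert_eq!(stats.total_requests, 100);
        assert_eq!(stats.shed_requests as usize, shed);
    }

    // Level transitions saturate at the extremes.
    #[test]
    fn test_degradation_level_transitions() {
        assert_eq!(DegradationLevel::Normal.next_worse(), DegradationLevel::Light);
        assert_eq!(
            DegradationLevel::Critical.next_worse(),
            DegradationLevel::Critical
        );
        assert_eq!(DegradationLevel::Light.next_better(), DegradationLevel::Normal);
        assert_eq!(
            DegradationLevel::Normal.next_better(),
            DegradationLevel::Normal
        );
    }
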
    #[test]
    fn test_kernel_watchdog() {
        let watchdog = KernelWatchdog::new();

        let kernel_id = KernelId::new("test_kernel");
        watchdog.watch(kernel_id.clone());

        assert_eq!(watchdog.watched_count(), 1);

        watchdog.heartbeat(&kernel_id);
        let health = watchdog.get_health(&kernel_id).unwrap();
        assert_eq!(health.status, HealthStatus::Healthy);
    }

    #[test]
    fn test_kernel_watchdog_metrics() {
        let watchdog = KernelWatchdog::new();

        let kernel_id = KernelId::new("test_kernel");
        watchdog.watch(kernel_id.clone());

        watchdog.update_metrics(&kernel_id, 1000.0, 50);

        let health = watchdog.get_health(&kernel_id).unwrap();
        assert_eq!(health.messages_per_sec, 1000.0);
        assert_eq!(health.queue_depth, 50);
    }
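
    // A minimal sketch (added alongside the original tests): with a fresh heartbeat
    // `check_all` leaves the kernel healthy, and `unwatch` removes it from the set.
    #[test]
    fn test_kernel_watchdog_check_all_and_unwatch() {
        let watchdog = KernelWatchdog::new();
        let kernel_id = KernelId::new("test_kernel");
        watchdog.watch(kernel_id.clone());

        let results = watchdog.check_all();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].status, HealthStatus::Healthy);
        assert!(watchdog.unhealthy_kernels().is_empty());

        watchdog.unwatch(&kernel_id);
        assert_eq!(watchdog.watched_count(), 0);
    }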
}