// rustkernel_core/resilience/health.rs

//! Health Checking
//!
//! Provides health-checking infrastructure for kernels and the runtime.
//!
//! # Features
//!
//! - Liveness probes (is the kernel alive?)
//! - Readiness probes (is the kernel ready to serve?)
//! - Health aggregation for kernel groups
//! - Degradation mode support
//!
//! # Example
//!
//! ```rust,ignore
//! use rustkernel_core::resilience::health::HealthProbe;
//! use std::time::Duration;
//!
//! // `check` takes `&mut self` because it records the result and updates the
//! // probe's consecutive success/failure counters, so the probe must be `mut`.
//! let mut probe = HealthProbe::new("graph/pagerank")
//!     .with_interval(Duration::from_secs(10))
//!     .with_timeout(Duration::from_secs(5));
//!
//! let result = probe.check(&kernel).await;
//! println!("Health: {:?}", result.status);
//! ```
24
25use crate::traits::HealthStatus;
26use serde::{Deserialize, Serialize};
27use std::time::{Duration, Instant};
28
29/// Health check result
30#[derive(Debug, Clone, Serialize)]
31pub struct HealthCheckResult {
32    /// Overall health status
33    pub status: HealthStatus,
34    /// Kernel ID (if applicable)
35    pub kernel_id: Option<String>,
36    /// Check timestamp
37    pub timestamp: chrono::DateTime<chrono::Utc>,
38    /// Check duration
39    pub duration: Duration,
40    /// Additional details
41    pub details: Option<HealthDetails>,
42    /// Error message (if unhealthy)
43    pub error: Option<String>,
44}
45
46impl HealthCheckResult {
47    /// Create a healthy result
48    pub fn healthy() -> Self {
49        Self {
50            status: HealthStatus::Healthy,
51            kernel_id: None,
52            timestamp: chrono::Utc::now(),
53            duration: Duration::ZERO,
54            details: None,
55            error: None,
56        }
57    }
58
59    /// Create an unhealthy result
60    pub fn unhealthy(error: impl Into<String>) -> Self {
61        Self {
62            status: HealthStatus::Unhealthy,
63            kernel_id: None,
64            timestamp: chrono::Utc::now(),
65            duration: Duration::ZERO,
66            details: None,
67            error: Some(error.into()),
68        }
69    }
70
71    /// Create a degraded result
72    pub fn degraded(reason: impl Into<String>) -> Self {
73        Self {
74            status: HealthStatus::Degraded,
75            kernel_id: None,
76            timestamp: chrono::Utc::now(),
77            duration: Duration::ZERO,
78            details: None,
79            error: Some(reason.into()),
80        }
81    }
82
83    /// Set kernel ID
84    pub fn with_kernel_id(mut self, id: impl Into<String>) -> Self {
85        self.kernel_id = Some(id.into());
86        self
87    }
88
89    /// Set duration
90    pub fn with_duration(mut self, duration: Duration) -> Self {
91        self.duration = duration;
92        self
93    }
94
95    /// Set details
96    pub fn with_details(mut self, details: HealthDetails) -> Self {
97        self.details = Some(details);
98        self
99    }
100
101    /// Check if healthy
102    pub fn is_healthy(&self) -> bool {
103        self.status == HealthStatus::Healthy
104    }
105
106    /// Check if degraded
107    pub fn is_degraded(&self) -> bool {
108        self.status == HealthStatus::Degraded
109    }
110
111    /// Check if unhealthy
112    pub fn is_unhealthy(&self) -> bool {
113        self.status == HealthStatus::Unhealthy
114    }
115}
116
117/// Additional health check details
118#[derive(Debug, Clone, Default, Serialize, Deserialize)]
119pub struct HealthDetails {
120    /// Queue depth
121    pub queue_depth: Option<u64>,
122    /// Messages processed
123    pub messages_processed: Option<u64>,
124    /// Error rate
125    pub error_rate: Option<f64>,
126    /// Average latency in microseconds
127    pub avg_latency_us: Option<f64>,
128    /// GPU memory usage in bytes
129    pub gpu_memory_bytes: Option<u64>,
130    /// Custom metrics
131    pub custom: std::collections::HashMap<String, serde_json::Value>,
132}
133
134impl HealthDetails {
135    /// Create new health details
136    pub fn new() -> Self {
137        Self::default()
138    }
139
140    /// Set queue depth
141    pub fn with_queue_depth(mut self, depth: u64) -> Self {
142        self.queue_depth = Some(depth);
143        self
144    }
145
146    /// Set messages processed
147    pub fn with_messages(mut self, count: u64) -> Self {
148        self.messages_processed = Some(count);
149        self
150    }
151
152    /// Set error rate
153    pub fn with_error_rate(mut self, rate: f64) -> Self {
154        self.error_rate = Some(rate);
155        self
156    }
157
158    /// Set average latency
159    pub fn with_latency(mut self, latency_us: f64) -> Self {
160        self.avg_latency_us = Some(latency_us);
161        self
162    }
163
164    /// Set GPU memory
165    pub fn with_gpu_memory(mut self, bytes: u64) -> Self {
166        self.gpu_memory_bytes = Some(bytes);
167        self
168    }
169
170    /// Add custom metric
171    pub fn with_custom(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
172        if let Ok(json_value) = serde_json::to_value(value) {
173            self.custom.insert(key.into(), json_value);
174        }
175        self
176    }
177}
178
179/// Health check configuration
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct HealthCheckConfig {
182    /// Check interval
183    pub interval: Duration,
184    /// Check timeout
185    pub timeout: Duration,
186    /// Number of consecutive failures before unhealthy
187    pub failure_threshold: u32,
188    /// Number of consecutive successes before healthy
189    pub success_threshold: u32,
190    /// Enable liveness checks
191    pub liveness_enabled: bool,
192    /// Enable readiness checks
193    pub readiness_enabled: bool,
194}
195
196impl Default for HealthCheckConfig {
197    fn default() -> Self {
198        Self {
199            interval: Duration::from_secs(10),
200            timeout: Duration::from_secs(5),
201            failure_threshold: 3,
202            success_threshold: 1,
203            liveness_enabled: true,
204            readiness_enabled: true,
205        }
206    }
207}
208
209/// Health probe for a kernel
210pub struct HealthProbe {
211    /// Kernel ID
212    kernel_id: String,
213    /// Configuration
214    config: HealthCheckConfig,
215    /// Last check result
216    last_result: Option<HealthCheckResult>,
217    /// Consecutive failures
218    consecutive_failures: u32,
219    /// Consecutive successes
220    consecutive_successes: u32,
221}
222
223impl HealthProbe {
224    /// Create a new health probe
225    pub fn new(kernel_id: impl Into<String>) -> Self {
226        Self {
227            kernel_id: kernel_id.into(),
228            config: HealthCheckConfig::default(),
229            last_result: None,
230            consecutive_failures: 0,
231            consecutive_successes: 0,
232        }
233    }
234
235    /// Set check interval
236    pub fn with_interval(mut self, interval: Duration) -> Self {
237        self.config.interval = interval;
238        self
239    }
240
241    /// Set check timeout
242    pub fn with_timeout(mut self, timeout: Duration) -> Self {
243        self.config.timeout = timeout;
244        self
245    }
246
247    /// Set failure threshold
248    pub fn with_failure_threshold(mut self, threshold: u32) -> Self {
249        self.config.failure_threshold = threshold;
250        self
251    }
252
253    /// Get the kernel ID
254    pub fn kernel_id(&self) -> &str {
255        &self.kernel_id
256    }
257
258    /// Get last check result
259    pub fn last_result(&self) -> Option<&HealthCheckResult> {
260        self.last_result.as_ref()
261    }
262
263    /// Check kernel health
264    pub async fn check<K: crate::traits::GpuKernel>(&mut self, kernel: &K) -> HealthCheckResult {
265        let start = Instant::now();
266
267        // Perform health check with timeout
268        let status = match tokio::time::timeout(self.config.timeout, async {
269            kernel.health_check()
270        })
271        .await
272        {
273            Ok(status) => status,
274            Err(_) => {
275                // Timeout - treat as unhealthy
276                self.record_failure();
277                let result = HealthCheckResult::unhealthy("Health check timed out")
278                    .with_kernel_id(&self.kernel_id)
279                    .with_duration(start.elapsed());
280                self.last_result = Some(result.clone());
281                return result;
282            }
283        };
284
285        let result = match status {
286            HealthStatus::Healthy => {
287                self.record_success();
288                HealthCheckResult::healthy()
289            }
290            HealthStatus::Degraded => {
291                self.record_failure();
292                HealthCheckResult::degraded("Kernel reported degraded status")
293            }
294            HealthStatus::Unhealthy => {
295                self.record_failure();
296                HealthCheckResult::unhealthy("Kernel reported unhealthy status")
297            }
298            HealthStatus::Unknown => {
299                self.record_failure();
300                HealthCheckResult::unhealthy("Kernel health unknown")
301            }
302        };
303
304        let result = result
305            .with_kernel_id(&self.kernel_id)
306            .with_duration(start.elapsed());
307
308        self.last_result = Some(result.clone());
309        result
310    }
311
312    fn record_success(&mut self) {
313        self.consecutive_successes += 1;
314        self.consecutive_failures = 0;
315    }
316
317    fn record_failure(&mut self) {
318        self.consecutive_failures += 1;
319        self.consecutive_successes = 0;
320    }
321
322    /// Check if kernel should be considered unhealthy
323    pub fn is_unhealthy(&self) -> bool {
324        self.consecutive_failures >= self.config.failure_threshold
325    }
326
327    /// Check if kernel should be considered healthy
328    pub fn is_healthy(&self) -> bool {
329        self.consecutive_successes >= self.config.success_threshold
330    }
331}
332
333/// Health check trait for components
334pub trait HealthCheck {
335    /// Perform a health check
336    fn check_health(&self) -> HealthCheckResult;
337
338    /// Check if component is alive (liveness)
339    fn is_alive(&self) -> bool {
340        self.check_health().status != HealthStatus::Unhealthy
341    }
342
343    /// Check if component is ready (readiness)
344    fn is_ready(&self) -> bool {
345        self.check_health().status == HealthStatus::Healthy
346    }
347}
348
349/// Aggregate health from multiple checks
350pub fn aggregate_health(results: &[HealthCheckResult]) -> HealthCheckResult {
351    if results.is_empty() {
352        return HealthCheckResult::healthy();
353    }
354
355    let mut unhealthy_count = 0;
356    let mut degraded_count = 0;
357    let mut errors = Vec::new();
358
359    for result in results {
360        match result.status {
361            HealthStatus::Unhealthy => {
362                unhealthy_count += 1;
363                if let Some(ref error) = result.error {
364                    errors.push(error.clone());
365                }
366            }
367            HealthStatus::Degraded => {
368                degraded_count += 1;
369            }
370            _ => {}
371        }
372    }
373
374    if unhealthy_count > 0 {
375        HealthCheckResult::unhealthy(format!(
376            "{} unhealthy: {}",
377            unhealthy_count,
378            errors.join(", ")
379        ))
380    } else if degraded_count > 0 {
381        HealthCheckResult::degraded(format!("{} degraded", degraded_count))
382    } else {
383        HealthCheckResult::healthy()
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_check_result() {
        let healthy = HealthCheckResult::healthy();
        assert!(healthy.is_healthy());

        let unhealthy = HealthCheckResult::unhealthy("test error");
        assert!(unhealthy.is_unhealthy());
        assert_eq!(unhealthy.error.as_deref(), Some("test error"));

        let degraded = HealthCheckResult::degraded("test degradation");
        assert!(degraded.is_degraded());
    }

    #[test]
    fn test_health_check_result_builders() {
        let result = HealthCheckResult::degraded("slow")
            .with_kernel_id("graph/pagerank")
            .with_duration(Duration::from_millis(12))
            .with_details(HealthDetails::new().with_queue_depth(7));

        assert!(result.is_degraded());
        assert!(!result.is_healthy());
        assert!(!result.is_unhealthy());
        assert_eq!(result.kernel_id.as_deref(), Some("graph/pagerank"));
        assert_eq!(result.duration, Duration::from_millis(12));
        assert_eq!(result.error.as_deref(), Some("slow"));
        assert_eq!(result.details.and_then(|d| d.queue_depth), Some(7));
    }

    #[test]
    fn test_health_details() {
        let details = HealthDetails::new()
            .with_queue_depth(100)
            .with_error_rate(0.01)
            .with_latency(150.0)
            .with_messages(42)
            .with_gpu_memory(1 << 20);

        assert_eq!(details.queue_depth, Some(100));
        assert_eq!(details.error_rate, Some(0.01));
        assert_eq!(details.avg_latency_us, Some(150.0));
        assert_eq!(details.messages_processed, Some(42));
        assert_eq!(details.gpu_memory_bytes, Some(1 << 20));
        assert!(details.custom.is_empty());
    }

    #[test]
    fn test_health_details_custom_metric() {
        let details = HealthDetails::new()
            .with_custom("sm_occupancy", 0.5)
            .with_custom("sm_occupancy", 0.75);

        // A later value for the same key replaces the earlier one.
        assert_eq!(details.custom.len(), 1);
        assert_eq!(
            details.custom.get("sm_occupancy"),
            Some(&serde_json::Value::from(0.75))
        );
    }

    #[test]
    fn test_health_check_config_default() {
        let config = HealthCheckConfig::default();
        assert_eq!(config.interval, Duration::from_secs(10));
        assert_eq!(config.timeout, Duration::from_secs(5));
        assert_eq!(config.failure_threshold, 3);
        assert_eq!(config.success_threshold, 1);
        assert!(config.liveness_enabled);
        assert!(config.readiness_enabled);
    }

    #[test]
    fn test_aggregate_health_empty() {
        assert!(aggregate_health(&[]).is_healthy());
    }

    #[test]
    fn test_aggregate_health_all_healthy() {
        let results = vec![
            HealthCheckResult::healthy(),
            HealthCheckResult::healthy(),
            HealthCheckResult::healthy(),
        ];

        let aggregate = aggregate_health(&results);
        assert!(aggregate.is_healthy());
    }

    #[test]
    fn test_aggregate_health_some_unhealthy() {
        let results = vec![
            HealthCheckResult::healthy(),
            HealthCheckResult::unhealthy("kernel1 failed"),
            HealthCheckResult::healthy(),
        ];

        let aggregate = aggregate_health(&results);
        assert!(aggregate.is_unhealthy());
    }

    #[test]
    fn test_aggregate_health_some_degraded() {
        let results = vec![
            HealthCheckResult::healthy(),
            HealthCheckResult::degraded("kernel1 slow"),
            HealthCheckResult::healthy(),
        ];

        let aggregate = aggregate_health(&results);
        assert!(aggregate.is_degraded());
    }

    #[test]
    fn test_aggregate_health_unhealthy_dominates_degraded() {
        let results = vec![
            HealthCheckResult::degraded("kernel1 slow"),
            HealthCheckResult::unhealthy("kernel2 failed"),
        ];

        let aggregate = aggregate_health(&results);
        assert!(aggregate.is_unhealthy());
    }

    #[test]
    fn test_health_probe_creation() {
        let probe = HealthProbe::new("graph/pagerank")
            .with_interval(Duration::from_secs(30))
            .with_timeout(Duration::from_secs(10));

        assert_eq!(probe.kernel_id(), "graph/pagerank");
        assert_eq!(probe.config.interval, Duration::from_secs(30));
        assert_eq!(probe.config.timeout, Duration::from_secs(10));
        assert!(probe.last_result().is_none());
    }

    #[test]
    fn test_health_probe_threshold_tracking() {
        let mut probe = HealthProbe::new("graph/pagerank").with_failure_threshold(2);

        // No checks yet: neither streak has reached its threshold.
        assert!(!probe.is_healthy());
        assert!(!probe.is_unhealthy());

        probe.record_failure();
        assert!(!probe.is_unhealthy());
        probe.record_failure();
        assert!(probe.is_unhealthy());

        // A single success resets the failure streak (default success threshold is 1).
        probe.record_success();
        assert!(probe.is_healthy());
        assert!(!probe.is_unhealthy());
    }
}