rustkernel_core/resilience/
health.rs1use crate::traits::HealthStatus;
26use serde::{Deserialize, Serialize};
27use std::time::{Duration, Instant};
28
29#[derive(Debug, Clone, Serialize)]
31pub struct HealthCheckResult {
32 pub status: HealthStatus,
34 pub kernel_id: Option<String>,
36 pub timestamp: chrono::DateTime<chrono::Utc>,
38 pub duration: Duration,
40 pub details: Option<HealthDetails>,
42 pub error: Option<String>,
44}
45
46impl HealthCheckResult {
47 pub fn healthy() -> Self {
49 Self {
50 status: HealthStatus::Healthy,
51 kernel_id: None,
52 timestamp: chrono::Utc::now(),
53 duration: Duration::ZERO,
54 details: None,
55 error: None,
56 }
57 }
58
59 pub fn unhealthy(error: impl Into<String>) -> Self {
61 Self {
62 status: HealthStatus::Unhealthy,
63 kernel_id: None,
64 timestamp: chrono::Utc::now(),
65 duration: Duration::ZERO,
66 details: None,
67 error: Some(error.into()),
68 }
69 }
70
71 pub fn degraded(reason: impl Into<String>) -> Self {
73 Self {
74 status: HealthStatus::Degraded,
75 kernel_id: None,
76 timestamp: chrono::Utc::now(),
77 duration: Duration::ZERO,
78 details: None,
79 error: Some(reason.into()),
80 }
81 }
82
83 pub fn with_kernel_id(mut self, id: impl Into<String>) -> Self {
85 self.kernel_id = Some(id.into());
86 self
87 }
88
89 pub fn with_duration(mut self, duration: Duration) -> Self {
91 self.duration = duration;
92 self
93 }
94
95 pub fn with_details(mut self, details: HealthDetails) -> Self {
97 self.details = Some(details);
98 self
99 }
100
101 pub fn is_healthy(&self) -> bool {
103 self.status == HealthStatus::Healthy
104 }
105
106 pub fn is_degraded(&self) -> bool {
108 self.status == HealthStatus::Degraded
109 }
110
111 pub fn is_unhealthy(&self) -> bool {
113 self.status == HealthStatus::Unhealthy
114 }
115}
116
117#[derive(Debug, Clone, Default, Serialize, Deserialize)]
119pub struct HealthDetails {
120 pub queue_depth: Option<u64>,
122 pub messages_processed: Option<u64>,
124 pub error_rate: Option<f64>,
126 pub avg_latency_us: Option<f64>,
128 pub gpu_memory_bytes: Option<u64>,
130 pub custom: std::collections::HashMap<String, serde_json::Value>,
132}
133
134impl HealthDetails {
135 pub fn new() -> Self {
137 Self::default()
138 }
139
140 pub fn with_queue_depth(mut self, depth: u64) -> Self {
142 self.queue_depth = Some(depth);
143 self
144 }
145
146 pub fn with_messages(mut self, count: u64) -> Self {
148 self.messages_processed = Some(count);
149 self
150 }
151
152 pub fn with_error_rate(mut self, rate: f64) -> Self {
154 self.error_rate = Some(rate);
155 self
156 }
157
158 pub fn with_latency(mut self, latency_us: f64) -> Self {
160 self.avg_latency_us = Some(latency_us);
161 self
162 }
163
164 pub fn with_gpu_memory(mut self, bytes: u64) -> Self {
166 self.gpu_memory_bytes = Some(bytes);
167 self
168 }
169
170 pub fn with_custom(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
172 if let Ok(json_value) = serde_json::to_value(value) {
173 self.custom.insert(key.into(), json_value);
174 }
175 self
176 }
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct HealthCheckConfig {
182 pub interval: Duration,
184 pub timeout: Duration,
186 pub failure_threshold: u32,
188 pub success_threshold: u32,
190 pub liveness_enabled: bool,
192 pub readiness_enabled: bool,
194}
195
196impl Default for HealthCheckConfig {
197 fn default() -> Self {
198 Self {
199 interval: Duration::from_secs(10),
200 timeout: Duration::from_secs(5),
201 failure_threshold: 3,
202 success_threshold: 1,
203 liveness_enabled: true,
204 readiness_enabled: true,
205 }
206 }
207}
208
209pub struct HealthProbe {
211 kernel_id: String,
213 config: HealthCheckConfig,
215 last_result: Option<HealthCheckResult>,
217 consecutive_failures: u32,
219 consecutive_successes: u32,
221}
222
223impl HealthProbe {
224 pub fn new(kernel_id: impl Into<String>) -> Self {
226 Self {
227 kernel_id: kernel_id.into(),
228 config: HealthCheckConfig::default(),
229 last_result: None,
230 consecutive_failures: 0,
231 consecutive_successes: 0,
232 }
233 }
234
235 pub fn with_interval(mut self, interval: Duration) -> Self {
237 self.config.interval = interval;
238 self
239 }
240
241 pub fn with_timeout(mut self, timeout: Duration) -> Self {
243 self.config.timeout = timeout;
244 self
245 }
246
247 pub fn with_failure_threshold(mut self, threshold: u32) -> Self {
249 self.config.failure_threshold = threshold;
250 self
251 }
252
253 pub fn kernel_id(&self) -> &str {
255 &self.kernel_id
256 }
257
258 pub fn last_result(&self) -> Option<&HealthCheckResult> {
260 self.last_result.as_ref()
261 }
262
263 pub async fn check<K: crate::traits::GpuKernel>(&mut self, kernel: &K) -> HealthCheckResult {
265 let start = Instant::now();
266
267 let status = match tokio::time::timeout(self.config.timeout, async {
269 kernel.health_check()
270 })
271 .await
272 {
273 Ok(status) => status,
274 Err(_) => {
275 self.record_failure();
277 let result = HealthCheckResult::unhealthy("Health check timed out")
278 .with_kernel_id(&self.kernel_id)
279 .with_duration(start.elapsed());
280 self.last_result = Some(result.clone());
281 return result;
282 }
283 };
284
285 let result = match status {
286 HealthStatus::Healthy => {
287 self.record_success();
288 HealthCheckResult::healthy()
289 }
290 HealthStatus::Degraded => {
291 self.record_failure();
292 HealthCheckResult::degraded("Kernel reported degraded status")
293 }
294 HealthStatus::Unhealthy => {
295 self.record_failure();
296 HealthCheckResult::unhealthy("Kernel reported unhealthy status")
297 }
298 HealthStatus::Unknown => {
299 self.record_failure();
300 HealthCheckResult::unhealthy("Kernel health unknown")
301 }
302 };
303
304 let result = result
305 .with_kernel_id(&self.kernel_id)
306 .with_duration(start.elapsed());
307
308 self.last_result = Some(result.clone());
309 result
310 }
311
312 fn record_success(&mut self) {
313 self.consecutive_successes += 1;
314 self.consecutive_failures = 0;
315 }
316
317 fn record_failure(&mut self) {
318 self.consecutive_failures += 1;
319 self.consecutive_successes = 0;
320 }
321
322 pub fn is_unhealthy(&self) -> bool {
324 self.consecutive_failures >= self.config.failure_threshold
325 }
326
327 pub fn is_healthy(&self) -> bool {
329 self.consecutive_successes >= self.config.success_threshold
330 }
331}
332
333pub trait HealthCheck {
335 fn check_health(&self) -> HealthCheckResult;
337
338 fn is_alive(&self) -> bool {
340 self.check_health().status != HealthStatus::Unhealthy
341 }
342
343 fn is_ready(&self) -> bool {
345 self.check_health().status == HealthStatus::Healthy
346 }
347}
348
349pub fn aggregate_health(results: &[HealthCheckResult]) -> HealthCheckResult {
351 if results.is_empty() {
352 return HealthCheckResult::healthy();
353 }
354
355 let mut unhealthy_count = 0;
356 let mut degraded_count = 0;
357 let mut errors = Vec::new();
358
359 for result in results {
360 match result.status {
361 HealthStatus::Unhealthy => {
362 unhealthy_count += 1;
363 if let Some(ref error) = result.error {
364 errors.push(error.clone());
365 }
366 }
367 HealthStatus::Degraded => {
368 degraded_count += 1;
369 }
370 _ => {}
371 }
372 }
373
374 if unhealthy_count > 0 {
375 HealthCheckResult::unhealthy(format!(
376 "{} unhealthy: {}",
377 unhealthy_count,
378 errors.join(", ")
379 ))
380 } else if degraded_count > 0 {
381 HealthCheckResult::degraded(format!("{} degraded", degraded_count))
382 } else {
383 HealthCheckResult::healthy()
384 }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    /// Each constructor yields a result whose matching predicate holds.
    #[test]
    fn test_health_check_result() {
        assert!(HealthCheckResult::healthy().is_healthy());

        let failed = HealthCheckResult::unhealthy("test error");
        assert!(failed.is_unhealthy());
        assert_eq!(failed.error.as_deref(), Some("test error"));

        assert!(HealthCheckResult::degraded("test degradation").is_degraded());
    }

    /// Builder methods populate exactly the fields they are named after.
    #[test]
    fn test_health_details() {
        let details = HealthDetails::new()
            .with_queue_depth(100)
            .with_error_rate(0.01)
            .with_latency(150.0);

        assert_eq!(details.queue_depth, Some(100));
        assert_eq!(details.error_rate, Some(0.01));
        assert_eq!(details.avg_latency_us, Some(150.0));
    }

    /// Only healthy inputs aggregate to a healthy result.
    #[test]
    fn test_aggregate_health_all_healthy() {
        let results = [
            HealthCheckResult::healthy(),
            HealthCheckResult::healthy(),
            HealthCheckResult::healthy(),
        ];

        assert!(aggregate_health(&results).is_healthy());
    }

    /// A single unhealthy input makes the aggregate unhealthy.
    #[test]
    fn test_aggregate_health_some_unhealthy() {
        let results = [
            HealthCheckResult::healthy(),
            HealthCheckResult::unhealthy("kernel1 failed"),
            HealthCheckResult::healthy(),
        ];

        assert!(aggregate_health(&results).is_unhealthy());
    }

    /// A degraded input, with nothing unhealthy, makes the aggregate degraded.
    #[test]
    fn test_aggregate_health_some_degraded() {
        let results = [
            HealthCheckResult::healthy(),
            HealthCheckResult::degraded("kernel1 slow"),
            HealthCheckResult::healthy(),
        ];

        assert!(aggregate_health(&results).is_degraded());
    }

    /// Builder overrides land in the probe's configuration.
    #[test]
    fn test_health_probe_creation() {
        let interval = Duration::from_secs(30);
        let timeout = Duration::from_secs(10);

        let probe = HealthProbe::new("graph/pagerank")
            .with_interval(interval)
            .with_timeout(timeout);

        assert_eq!(probe.kernel_id(), "graph/pagerank");
        assert_eq!(probe.config.interval, interval);
        assert_eq!(probe.config.timeout, timeout);
    }
}