elara_runtime/health_server.rs
1//! Health Check HTTP Server for ELARA Runtime
2//!
3//! This module provides a production-grade HTTP server for exposing health check
4//! endpoints. It is designed to integrate with Kubernetes liveness and readiness
5//! probes, load balancers, and monitoring systems.
6//!
7//! # Endpoints
8//!
9//! ## `/health` - Overall Health Status
10//!
11//! Returns the overall health status of the node, including all registered health
12//! checks. This endpoint is suitable for general health monitoring and alerting.
13//!
14//! **Response Codes:**
15//! - `200 OK` - Node is Healthy or Degraded (can serve traffic)
16//! - `503 Service Unavailable` - Node is Unhealthy (should not serve traffic)
17//!
18//! **Response Body:**
//! ```json
//! {
//!   "status": "healthy" | "degraded" | "unhealthy",
//!   "timestamp": "2024-01-15T10:30:00Z",
//!   "checks": {
//!     "connections": {
//!       "status": "healthy"
//!     },
//!     "memory": {
//!       "status": "healthy"
//!     }
//!   }
//! }
//! ```
//!
//! Each check may also carry a `"reason"` string. The key is omitted entirely
//! (it is never serialized as `null`) when a check has no reason to report.
35//!
36//! ## `/ready` - Readiness Probe
37//!
38//! Kubernetes readiness probe endpoint. Indicates whether the node is ready to
39//! accept traffic. A node may be alive but not ready (e.g., still initializing,
40//! warming up caches, establishing connections).
41//!
42//! **Response Codes:**
43//! - `200 OK` - Node is ready to accept traffic
44//! - `503 Service Unavailable` - Node is not ready
45//!
46//! ## `/live` - Liveness Probe
47//!
48//! Kubernetes liveness probe endpoint. Indicates whether the node is alive and
49//! functioning. If this check fails, Kubernetes will restart the pod.
50//!
51//! **Response Codes:**
52//! - `200 OK` - Node is alive
53//! - `503 Service Unavailable` - Node is deadlocked or unresponsive
54//!
55//! # Architecture
56//!
57//! The health server is designed to be:
58//! - **Non-blocking**: Uses async/await and Tokio runtime
59//! - **Fast**: Leverages cached health check results (no expensive checks on request)
60//! - **Lightweight**: Minimal overhead, suitable for high-frequency polling
61//! - **Production-ready**: Proper error handling, logging, and graceful shutdown
62//!
63//! # Example
64//!
65//! ```rust,no_run
66//! use elara_runtime::health::{HealthChecker, MemoryHealthCheck};
67//! use elara_runtime::health_server::{HealthServer, HealthServerConfig};
68//! use std::sync::Arc;
69//! use std::time::Duration;
70//!
71//! #[tokio::main]
72//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
73//! // Create health checker
74//! let mut checker = HealthChecker::new(Duration::from_secs(30));
75//! checker.add_check(Box::new(MemoryHealthCheck::new(1800)));
76//! let checker = Arc::new(checker);
77//!
78//! // Configure and start health server
79//! let config = HealthServerConfig {
80//! bind_address: "0.0.0.0:8080".parse()?,
81//! };
82//!
83//! let server = HealthServer::new(checker, config);
84//! server.serve().await?;
85//!
86//! Ok(())
87//! }
88//! ```
89//!
90//! # Kubernetes Integration
91//!
92//! Example Kubernetes deployment configuration:
93//!
94//! ```yaml
95//! apiVersion: v1
96//! kind: Pod
97//! metadata:
98//! name: elara-node
99//! spec:
100//! containers:
101//! - name: elara
102//! image: elara-node:latest
103//! ports:
104//! - containerPort: 8080
105//! name: health
106//! livenessProbe:
107//! httpGet:
108//! path: /live
109//! port: health
110//! initialDelaySeconds: 30
111//! periodSeconds: 10
112//! timeoutSeconds: 5
113//! failureThreshold: 3
114//! readinessProbe:
115//! httpGet:
116//! path: /ready
117//! port: health
118//! initialDelaySeconds: 10
119//! periodSeconds: 5
120//! timeoutSeconds: 3
121//! failureThreshold: 2
122//! ```
123
124use crate::health::{HealthChecker, HealthCheckResult, HealthStatus};
125use axum::{
126 extract::State,
127 http::StatusCode,
128 response::{IntoResponse, Response},
129 routing::get,
130 Json, Router,
131};
132use serde::{Deserialize, Serialize};
133use std::net::SocketAddr;
134use std::sync::Arc;
135use std::time::SystemTime;
136use tracing::{error, info, warn};
137
/// Configuration for the health check HTTP server.
///
/// The only setting is the socket address the listener binds to. Use
/// [`Default`] to get the conventional `0.0.0.0:8080`.
#[derive(Debug, Clone)]
pub struct HealthServerConfig {
    /// Address to bind the HTTP server to (e.g., "0.0.0.0:8080")
    pub bind_address: SocketAddr,
}

impl Default for HealthServerConfig {
    /// Returns a configuration that binds to `0.0.0.0:8080`
    /// (every IPv4 interface, port 8080).
    fn default() -> Self {
        Self {
            // Built from octets and a port instead of parsing a string, so
            // there is no fallible step and no `unwrap()` that could panic.
            bind_address: SocketAddr::from(([0, 0, 0, 0], 8080)),
        }
    }
}
152
153/// Health check HTTP server.
154///
155/// Provides HTTP endpoints for health monitoring, Kubernetes probes,
156/// and load balancer health checks.
157pub struct HealthServer {
158 /// Health checker instance
159 checker: Arc<HealthChecker>,
160 /// Server configuration
161 config: HealthServerConfig,
162}
163
164impl HealthServer {
165 /// Creates a new HealthServer.
166 ///
167 /// # Arguments
168 ///
169 /// * `checker` - Arc reference to the HealthChecker
170 /// * `config` - Server configuration
171 ///
172 /// # Example
173 ///
174 /// ```rust,no_run
175 /// use elara_runtime::health::HealthChecker;
176 /// use elara_runtime::health_server::{HealthServer, HealthServerConfig};
177 /// use std::sync::Arc;
178 /// use std::time::Duration;
179 ///
180 /// let checker = Arc::new(HealthChecker::new(Duration::from_secs(30)));
181 /// let config = HealthServerConfig::default();
182 /// let server = HealthServer::new(checker, config);
183 /// ```
184 pub fn new(checker: Arc<HealthChecker>, config: HealthServerConfig) -> Self {
185 Self { checker, config }
186 }
187
188 /// Creates a new HealthServer with default configuration.
189 ///
190 /// Binds to `0.0.0.0:8080` by default.
191 pub fn with_default_config(checker: Arc<HealthChecker>) -> Self {
192 Self::new(checker, HealthServerConfig::default())
193 }
194
195 /// Starts the health check HTTP server.
196 ///
197 /// This method runs the server until it is shut down. It should be
198 /// spawned as a background task in production deployments.
199 ///
200 /// # Returns
201 ///
202 /// Returns `Ok(())` if the server shuts down gracefully, or an error
203 /// if the server fails to start or encounters a fatal error.
204 ///
205 /// # Example
206 ///
207 /// ```rust,no_run
208 /// use elara_runtime::health::HealthChecker;
209 /// use elara_runtime::health_server::HealthServer;
210 /// use std::sync::Arc;
211 /// use std::time::Duration;
212 ///
213 /// #[tokio::main]
214 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
215 /// let checker = Arc::new(HealthChecker::new(Duration::from_secs(30)));
216 /// let server = HealthServer::with_default_config(checker);
217 ///
218 /// // Run server (blocks until shutdown)
219 /// server.serve().await?;
220 ///
221 /// Ok(())
222 /// }
223 /// ```
224 pub async fn serve(self) -> Result<(), std::io::Error> {
225 let app = self.create_router();
226 let addr = self.config.bind_address;
227
228 info!("Starting health check server on {}", addr);
229
230 let listener = tokio::net::TcpListener::bind(addr).await?;
231
232 info!("Health check server listening on {}", addr);
233 info!(" - /health - Overall health status");
234 info!(" - /ready - Readiness probe");
235 info!(" - /live - Liveness probe");
236
237 axum::serve(listener, app).await?;
238
239 info!("Health check server shut down");
240 Ok(())
241 }
242
243 /// Creates the Axum router with all health check endpoints.
244 ///
245 /// This method is public to allow testing and custom server configurations.
246 pub fn create_router(&self) -> Router {
247 Router::new()
248 .route("/health", get(health_handler))
249 .route("/ready", get(ready_handler))
250 .route("/live", get(live_handler))
251 .with_state(self.checker.clone())
252 }
253}
254
255/// JSON response for health check endpoints.
256#[derive(Debug, Clone, Serialize, Deserialize)]
257pub struct HealthResponse {
258 /// Overall status: "healthy", "degraded", or "unhealthy"
259 pub status: String,
260
261 /// ISO 8601 timestamp when the health check was performed
262 pub timestamp: String,
263
264 /// Individual health check results
265 pub checks: std::collections::HashMap<String, CheckResponse>,
266}
267
268/// JSON response for an individual health check.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct CheckResponse {
271 /// Status: "healthy", "degraded", or "unhealthy"
272 pub status: String,
273
274 /// Optional reason for degraded/unhealthy status
275 #[serde(skip_serializing_if = "Option::is_none")]
276 pub reason: Option<String>,
277}
278
279impl From<&HealthStatus> for HealthResponse {
280 fn from(status: &HealthStatus) -> Self {
281 let checks = status
282 .checks
283 .iter()
284 .map(|(name, result)| {
285 let check_response = CheckResponse {
286 status: result_to_status_string(result),
287 reason: result.reason().map(|s| s.to_string()),
288 };
289 (name.clone(), check_response)
290 })
291 .collect();
292
293 Self {
294 status: result_to_status_string(&status.overall),
295 timestamp: format_timestamp(status.timestamp),
296 checks,
297 }
298 }
299}
300
301/// Converts a HealthCheckResult to a status string.
302fn result_to_status_string(result: &HealthCheckResult) -> String {
303 match result {
304 HealthCheckResult::Healthy => "healthy".to_string(),
305 HealthCheckResult::Degraded { .. } => "degraded".to_string(),
306 HealthCheckResult::Unhealthy { .. } => "unhealthy".to_string(),
307 }
308}
309
310/// Formats a timestamp as ISO 8601 string.
311fn format_timestamp(instant: std::time::Instant) -> String {
312 // Convert Instant to SystemTime for ISO 8601 formatting
313 // Note: This is an approximation since Instant doesn't have a fixed epoch
314 let now = SystemTime::now();
315 let elapsed = instant.elapsed();
316
317 // Subtract elapsed time from now to get approximate timestamp
318 let timestamp = now
319 .checked_sub(elapsed)
320 .unwrap_or(now);
321
322 // Format as ISO 8601
323 humantime::format_rfc3339(timestamp).to_string()
324}
325
326/// Determines the HTTP status code based on health check result.
327fn result_to_status_code(result: &HealthCheckResult) -> StatusCode {
328 match result {
329 HealthCheckResult::Healthy => StatusCode::OK,
330 HealthCheckResult::Degraded { .. } => StatusCode::OK,
331 HealthCheckResult::Unhealthy { .. } => StatusCode::SERVICE_UNAVAILABLE,
332 }
333}
334
335/// Handler for `/health` endpoint - Overall health status.
336///
337/// Returns the complete health status including all registered checks.
338/// This endpoint is suitable for general health monitoring and alerting.
339///
340/// **Response Codes:**
341/// - `200 OK` - Node is Healthy or Degraded
342/// - `503 Service Unavailable` - Node is Unhealthy
343async fn health_handler(
344 State(checker): State<Arc<HealthChecker>>,
345) -> Response {
346 let status = checker.check_health();
347 let status_code = result_to_status_code(&status.overall);
348 let response = HealthResponse::from(&status);
349
350 // Log unhealthy status for monitoring
351 if status.is_unhealthy() {
352 warn!(
353 status = "unhealthy",
354 reason = ?status.overall.reason(),
355 "Health check failed"
356 );
357 } else if status.is_degraded() {
358 warn!(
359 status = "degraded",
360 reason = ?status.overall.reason(),
361 "Health check degraded"
362 );
363 }
364
365 (status_code, Json(response)).into_response()
366}
367
368/// Handler for `/ready` endpoint - Readiness probe.
369///
370/// Kubernetes readiness probe endpoint. Indicates whether the node is ready
371/// to accept traffic. A node may be alive but not ready (e.g., still
372/// initializing, warming up caches, establishing connections).
373///
374/// **Response Codes:**
375/// - `200 OK` - Node is ready to accept traffic
376/// - `503 Service Unavailable` - Node is not ready
377///
378/// **Implementation Note:**
379/// Currently uses the same logic as `/health`. In a production deployment,
380/// you may want to implement separate readiness checks that verify:
381/// - All required connections are established
382/// - Caches are warmed up
383/// - Initial state synchronization is complete
384async fn ready_handler(
385 State(checker): State<Arc<HealthChecker>>,
386) -> Response {
387 let status = checker.check_health();
388 let status_code = result_to_status_code(&status.overall);
389 let response = HealthResponse::from(&status);
390
391 if status.is_unhealthy() {
392 warn!("Readiness probe failed: node not ready");
393 }
394
395 (status_code, Json(response)).into_response()
396}
397
398/// Handler for `/live` endpoint - Liveness probe.
399///
400/// Kubernetes liveness probe endpoint. Indicates whether the node is alive
401/// and functioning. If this check fails, Kubernetes will restart the pod.
402///
403/// **Response Codes:**
404/// - `200 OK` - Node is alive
405/// - `503 Service Unavailable` - Node is deadlocked or unresponsive
406///
407/// **Implementation Note:**
408/// Currently uses the same logic as `/health`. In a production deployment,
409/// you may want to implement separate liveness checks that verify:
410/// - Event loop is not deadlocked
411/// - Critical threads are responsive
412/// - No fatal errors have occurred
413///
414/// Liveness checks should be more lenient than readiness checks to avoid
415/// unnecessary restarts.
416async fn live_handler(
417 State(checker): State<Arc<HealthChecker>>,
418) -> Response {
419 let status = checker.check_health();
420
421 // For liveness, we're more lenient - only fail if truly unhealthy
422 // Degraded status is still considered "alive"
423 let status_code = if status.is_unhealthy() {
424 error!("Liveness probe failed: node unhealthy");
425 StatusCode::SERVICE_UNAVAILABLE
426 } else {
427 StatusCode::OK
428 };
429
430 let response = HealthResponse::from(&status);
431
432 (status_code, Json(response)).into_response()
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::health::{HealthCheck, HealthCheckResult};
    use std::time::Duration;

    /// Duration argument passed to every `HealthChecker::new` call below.
    const CHECKER_DURATION: Duration = Duration::from_secs(30);

    /// Stub check that always reports `Healthy`.
    struct TestHealthyCheck;

    impl HealthCheck for TestHealthyCheck {
        fn name(&self) -> &str {
            "test_healthy"
        }

        fn check(&self) -> HealthCheckResult {
            HealthCheckResult::Healthy
        }
    }

    /// Stub check that always reports `Degraded` with a fixed reason.
    struct TestDegradedCheck;

    impl HealthCheck for TestDegradedCheck {
        fn name(&self) -> &str {
            "test_degraded"
        }

        fn check(&self) -> HealthCheckResult {
            let reason = "Test degradation".to_string();
            HealthCheckResult::Degraded { reason }
        }
    }

    /// Stub check that always reports `Unhealthy` with a fixed reason.
    struct TestUnhealthyCheck;

    impl HealthCheck for TestUnhealthyCheck {
        fn name(&self) -> &str {
            "test_unhealthy"
        }

        fn check(&self) -> HealthCheckResult {
            let reason = "Test failure".to_string();
            HealthCheckResult::Unhealthy { reason }
        }
    }

    // Each variant maps to its lowercase label.
    #[test]
    fn test_result_to_status_string() {
        let degraded = HealthCheckResult::Degraded {
            reason: "test".to_string(),
        };
        let unhealthy = HealthCheckResult::Unhealthy {
            reason: "test".to_string(),
        };

        assert_eq!(
            result_to_status_string(&HealthCheckResult::Healthy),
            "healthy"
        );
        assert_eq!(result_to_status_string(&degraded), "degraded");
        assert_eq!(result_to_status_string(&unhealthy), "unhealthy");
    }

    // Only `Unhealthy` produces a 503; the other variants produce 200.
    #[test]
    fn test_result_to_status_code() {
        let degraded = HealthCheckResult::Degraded {
            reason: "test".to_string(),
        };
        let unhealthy = HealthCheckResult::Unhealthy {
            reason: "test".to_string(),
        };

        assert_eq!(
            result_to_status_code(&HealthCheckResult::Healthy),
            StatusCode::OK
        );
        assert_eq!(result_to_status_code(&degraded), StatusCode::OK);
        assert_eq!(
            result_to_status_code(&unhealthy),
            StatusCode::SERVICE_UNAVAILABLE
        );
    }

    // A healthy plus a degraded check yields an overall "degraded" response
    // that lists both checks and carries the degraded reason.
    #[test]
    fn test_health_response_from_status() {
        let mut checker = HealthChecker::new(CHECKER_DURATION);
        checker.add_check(Box::new(TestHealthyCheck));
        checker.add_check(Box::new(TestDegradedCheck));

        let response = HealthResponse::from(&checker.check_health());

        assert_eq!(response.status, "degraded");
        assert_eq!(response.checks.len(), 2);
        assert_eq!(response.checks["test_healthy"].status, "healthy");

        let degraded = &response.checks["test_degraded"];
        assert_eq!(degraded.status, "degraded");
        assert_eq!(degraded.reason, Some("Test degradation".to_string()));
    }

    #[tokio::test]
    async fn test_health_handler_healthy() {
        let mut checker = HealthChecker::new(CHECKER_DURATION);
        checker.add_check(Box::new(TestHealthyCheck));

        let response = health_handler(State(Arc::new(checker))).await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_handler_degraded() {
        let mut checker = HealthChecker::new(CHECKER_DURATION);
        checker.add_check(Box::new(TestDegradedCheck));

        let response = health_handler(State(Arc::new(checker))).await;

        // Degraded still returns 200 OK
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_handler_unhealthy() {
        let mut checker = HealthChecker::new(CHECKER_DURATION);
        checker.add_check(Box::new(TestUnhealthyCheck));

        let response = health_handler(State(Arc::new(checker))).await;

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn test_ready_handler() {
        let mut checker = HealthChecker::new(CHECKER_DURATION);
        checker.add_check(Box::new(TestHealthyCheck));

        let response = ready_handler(State(Arc::new(checker))).await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_live_handler_degraded_is_alive() {
        let mut checker = HealthChecker::new(CHECKER_DURATION);
        checker.add_check(Box::new(TestDegradedCheck));

        let response = live_handler(State(Arc::new(checker))).await;

        // Degraded is still considered "alive" for liveness probe
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_live_handler_unhealthy() {
        let mut checker = HealthChecker::new(CHECKER_DURATION);
        checker.add_check(Box::new(TestUnhealthyCheck));

        let response = live_handler(State(Arc::new(checker))).await;

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[test]
    fn test_health_server_config_default() {
        let HealthServerConfig { bind_address } = HealthServerConfig::default();
        assert_eq!(bind_address.to_string(), "0.0.0.0:8080");
    }

    // Construction alone must succeed; nothing is bound or served here.
    #[test]
    fn test_health_server_creation() {
        let checker = Arc::new(HealthChecker::new(CHECKER_DURATION));
        let _server = HealthServer::new(checker, HealthServerConfig::default());
    }
}