Skip to main content

elara_runtime/
health_server.rs

1//! Health Check HTTP Server for ELARA Runtime
2//!
3//! This module provides a production-grade HTTP server for exposing health check
4//! endpoints. It is designed to integrate with Kubernetes liveness and readiness
5//! probes, load balancers, and monitoring systems.
6//!
7//! # Endpoints
8//!
9//! ## `/health` - Overall Health Status
10//!
11//! Returns the overall health status of the node, including all registered health
12//! checks. This endpoint is suitable for general health monitoring and alerting.
13//!
14//! **Response Codes:**
15//! - `200 OK` - Node is Healthy or Degraded (can serve traffic)
16//! - `503 Service Unavailable` - Node is Unhealthy (should not serve traffic)
17//!
18//! **Response Body:**
//! ```json
//! {
//!   "status": "healthy" | "degraded" | "unhealthy",
//!   "timestamp": "2024-01-15T10:30:00Z",
//!   "checks": {
//!     "connections": {
//!       "status": "healthy"
//!     },
//!     "memory": {
//!       "status": "degraded",
//!       "reason": "<why the check is degraded>"
//!     }
//!   }
//! }
//! ```
//!
//! The `reason` key is omitted entirely (it is not serialized as `null`) when a
//! check reports no reason.
35//!
36//! ## `/ready` - Readiness Probe
37//!
38//! Kubernetes readiness probe endpoint. Indicates whether the node is ready to
39//! accept traffic. A node may be alive but not ready (e.g., still initializing,
40//! warming up caches, establishing connections).
41//!
42//! **Response Codes:**
43//! - `200 OK` - Node is ready to accept traffic
44//! - `503 Service Unavailable` - Node is not ready
45//!
46//! ## `/live` - Liveness Probe
47//!
48//! Kubernetes liveness probe endpoint. Indicates whether the node is alive and
49//! functioning. If this check fails, Kubernetes will restart the pod.
50//!
51//! **Response Codes:**
52//! - `200 OK` - Node is alive
53//! - `503 Service Unavailable` - Node is deadlocked or unresponsive
54//!
55//! # Architecture
56//!
57//! The health server is designed to be:
58//! - **Non-blocking**: Uses async/await and Tokio runtime
59//! - **Fast**: Leverages cached health check results (no expensive checks on request)
60//! - **Lightweight**: Minimal overhead, suitable for high-frequency polling
61//! - **Production-ready**: Proper error handling, logging, and graceful shutdown
62//!
63//! # Example
64//!
65//! ```rust,no_run
66//! use elara_runtime::health::{HealthChecker, MemoryHealthCheck};
67//! use elara_runtime::health_server::{HealthServer, HealthServerConfig};
68//! use std::sync::Arc;
69//! use std::time::Duration;
70//!
71//! #[tokio::main]
72//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
73//!     // Create health checker
74//!     let mut checker = HealthChecker::new(Duration::from_secs(30));
75//!     checker.add_check(Box::new(MemoryHealthCheck::new(1800)));
76//!     let checker = Arc::new(checker);
77//!
78//!     // Configure and start health server
79//!     let config = HealthServerConfig {
80//!         bind_address: "0.0.0.0:8080".parse()?,
81//!     };
82//!
83//!     let server = HealthServer::new(checker, config);
84//!     server.serve().await?;
85//!
86//!     Ok(())
87//! }
88//! ```
89//!
90//! # Kubernetes Integration
91//!
92//! Example Kubernetes deployment configuration:
93//!
94//! ```yaml
95//! apiVersion: v1
96//! kind: Pod
97//! metadata:
98//!   name: elara-node
99//! spec:
100//!   containers:
101//!   - name: elara
102//!     image: elara-node:latest
103//!     ports:
104//!     - containerPort: 8080
105//!       name: health
106//!     livenessProbe:
107//!       httpGet:
108//!         path: /live
109//!         port: health
110//!       initialDelaySeconds: 30
111//!       periodSeconds: 10
112//!       timeoutSeconds: 5
113//!       failureThreshold: 3
114//!     readinessProbe:
115//!       httpGet:
116//!         path: /ready
117//!         port: health
118//!       initialDelaySeconds: 10
119//!       periodSeconds: 5
120//!       timeoutSeconds: 3
121//!       failureThreshold: 2
122//! ```
123
124use crate::health::{HealthChecker, HealthCheckResult, HealthStatus};
125use axum::{
126    extract::State,
127    http::StatusCode,
128    response::{IntoResponse, Response},
129    routing::get,
130    Json, Router,
131};
132use serde::{Deserialize, Serialize};
133use std::net::SocketAddr;
134use std::sync::Arc;
135use std::time::SystemTime;
136use tracing::{error, info, warn};
137
/// Configuration for the health check HTTP server.
///
/// The [`Default`] value binds to `0.0.0.0:8080`, i.e. port 8080 on every
/// IPv4 interface.
#[derive(Debug, Clone)]
pub struct HealthServerConfig {
    /// Address to bind the HTTP server to (e.g., "0.0.0.0:8080")
    pub bind_address: SocketAddr,
}

impl Default for HealthServerConfig {
    /// Returns a configuration bound to `0.0.0.0:8080`.
    fn default() -> Self {
        Self {
            // Built from typed parts rather than parsed from a string, so
            // construction is infallible and needs no `unwrap()`.
            bind_address: SocketAddr::from(([0, 0, 0, 0], 8080)),
        }
    }
}
152
153/// Health check HTTP server.
154///
155/// Provides HTTP endpoints for health monitoring, Kubernetes probes,
156/// and load balancer health checks.
157pub struct HealthServer {
158    /// Health checker instance
159    checker: Arc<HealthChecker>,
160    /// Server configuration
161    config: HealthServerConfig,
162}
163
164impl HealthServer {
165    /// Creates a new HealthServer.
166    ///
167    /// # Arguments
168    ///
169    /// * `checker` - Arc reference to the HealthChecker
170    /// * `config` - Server configuration
171    ///
172    /// # Example
173    ///
174    /// ```rust,no_run
175    /// use elara_runtime::health::HealthChecker;
176    /// use elara_runtime::health_server::{HealthServer, HealthServerConfig};
177    /// use std::sync::Arc;
178    /// use std::time::Duration;
179    ///
180    /// let checker = Arc::new(HealthChecker::new(Duration::from_secs(30)));
181    /// let config = HealthServerConfig::default();
182    /// let server = HealthServer::new(checker, config);
183    /// ```
184    pub fn new(checker: Arc<HealthChecker>, config: HealthServerConfig) -> Self {
185        Self { checker, config }
186    }
187    
188    /// Creates a new HealthServer with default configuration.
189    ///
190    /// Binds to `0.0.0.0:8080` by default.
191    pub fn with_default_config(checker: Arc<HealthChecker>) -> Self {
192        Self::new(checker, HealthServerConfig::default())
193    }
194    
195    /// Starts the health check HTTP server.
196    ///
197    /// This method runs the server until it is shut down. It should be
198    /// spawned as a background task in production deployments.
199    ///
200    /// # Returns
201    ///
202    /// Returns `Ok(())` if the server shuts down gracefully, or an error
203    /// if the server fails to start or encounters a fatal error.
204    ///
205    /// # Example
206    ///
207    /// ```rust,no_run
208    /// use elara_runtime::health::HealthChecker;
209    /// use elara_runtime::health_server::HealthServer;
210    /// use std::sync::Arc;
211    /// use std::time::Duration;
212    ///
213    /// #[tokio::main]
214    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
215    ///     let checker = Arc::new(HealthChecker::new(Duration::from_secs(30)));
216    ///     let server = HealthServer::with_default_config(checker);
217    ///     
218    ///     // Run server (blocks until shutdown)
219    ///     server.serve().await?;
220    ///     
221    ///     Ok(())
222    /// }
223    /// ```
224    pub async fn serve(self) -> Result<(), std::io::Error> {
225        let app = self.create_router();
226        let addr = self.config.bind_address;
227        
228        info!("Starting health check server on {}", addr);
229        
230        let listener = tokio::net::TcpListener::bind(addr).await?;
231        
232        info!("Health check server listening on {}", addr);
233        info!("  - /health - Overall health status");
234        info!("  - /ready  - Readiness probe");
235        info!("  - /live   - Liveness probe");
236        
237        axum::serve(listener, app).await?;
238        
239        info!("Health check server shut down");
240        Ok(())
241    }
242    
243    /// Creates the Axum router with all health check endpoints.
244    ///
245    /// This method is public to allow testing and custom server configurations.
246    pub fn create_router(&self) -> Router {
247        Router::new()
248            .route("/health", get(health_handler))
249            .route("/ready", get(ready_handler))
250            .route("/live", get(live_handler))
251            .with_state(self.checker.clone())
252    }
253}
254
/// JSON response for health check endpoints.
///
/// This is the body returned by the `/health`, `/ready` and `/live` handlers
/// in this module; it is built from a `HealthStatus` via the `From` impl below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthResponse {
    /// Overall status: "healthy", "degraded", or "unhealthy"
    pub status: String,

    /// ISO 8601 (RFC 3339) timestamp when the health check was performed.
    ///
    /// Derived from a monotonic `Instant`, so it is an approximation of the
    /// wall-clock time.
    pub timestamp: String,

    /// Individual health check results, keyed by check name
    pub checks: std::collections::HashMap<String, CheckResponse>,
}
267
/// JSON response for an individual health check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckResponse {
    /// Status: "healthy", "degraded", or "unhealthy"
    pub status: String,

    /// Optional reason for degraded/unhealthy status.
    ///
    /// When this is `None` the `reason` key is left out of the serialized
    /// JSON altogether (it is not emitted as `null`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}
278
279impl From<&HealthStatus> for HealthResponse {
280    fn from(status: &HealthStatus) -> Self {
281        let checks = status
282            .checks
283            .iter()
284            .map(|(name, result)| {
285                let check_response = CheckResponse {
286                    status: result_to_status_string(result),
287                    reason: result.reason().map(|s| s.to_string()),
288                };
289                (name.clone(), check_response)
290            })
291            .collect();
292        
293        Self {
294            status: result_to_status_string(&status.overall),
295            timestamp: format_timestamp(status.timestamp),
296            checks,
297        }
298    }
299}
300
301/// Converts a HealthCheckResult to a status string.
302fn result_to_status_string(result: &HealthCheckResult) -> String {
303    match result {
304        HealthCheckResult::Healthy => "healthy".to_string(),
305        HealthCheckResult::Degraded { .. } => "degraded".to_string(),
306        HealthCheckResult::Unhealthy { .. } => "unhealthy".to_string(),
307    }
308}
309
310/// Formats a timestamp as ISO 8601 string.
311fn format_timestamp(instant: std::time::Instant) -> String {
312    // Convert Instant to SystemTime for ISO 8601 formatting
313    // Note: This is an approximation since Instant doesn't have a fixed epoch
314    let now = SystemTime::now();
315    let elapsed = instant.elapsed();
316    
317    // Subtract elapsed time from now to get approximate timestamp
318    let timestamp = now
319        .checked_sub(elapsed)
320        .unwrap_or(now);
321    
322    // Format as ISO 8601
323    humantime::format_rfc3339(timestamp).to_string()
324}
325
326/// Determines the HTTP status code based on health check result.
327fn result_to_status_code(result: &HealthCheckResult) -> StatusCode {
328    match result {
329        HealthCheckResult::Healthy => StatusCode::OK,
330        HealthCheckResult::Degraded { .. } => StatusCode::OK,
331        HealthCheckResult::Unhealthy { .. } => StatusCode::SERVICE_UNAVAILABLE,
332    }
333}
334
335/// Handler for `/health` endpoint - Overall health status.
336///
337/// Returns the complete health status including all registered checks.
338/// This endpoint is suitable for general health monitoring and alerting.
339///
340/// **Response Codes:**
341/// - `200 OK` - Node is Healthy or Degraded
342/// - `503 Service Unavailable` - Node is Unhealthy
343async fn health_handler(
344    State(checker): State<Arc<HealthChecker>>,
345) -> Response {
346    let status = checker.check_health();
347    let status_code = result_to_status_code(&status.overall);
348    let response = HealthResponse::from(&status);
349    
350    // Log unhealthy status for monitoring
351    if status.is_unhealthy() {
352        warn!(
353            status = "unhealthy",
354            reason = ?status.overall.reason(),
355            "Health check failed"
356        );
357    } else if status.is_degraded() {
358        warn!(
359            status = "degraded",
360            reason = ?status.overall.reason(),
361            "Health check degraded"
362        );
363    }
364    
365    (status_code, Json(response)).into_response()
366}
367
368/// Handler for `/ready` endpoint - Readiness probe.
369///
370/// Kubernetes readiness probe endpoint. Indicates whether the node is ready
371/// to accept traffic. A node may be alive but not ready (e.g., still
372/// initializing, warming up caches, establishing connections).
373///
374/// **Response Codes:**
375/// - `200 OK` - Node is ready to accept traffic
376/// - `503 Service Unavailable` - Node is not ready
377///
378/// **Implementation Note:**
379/// Currently uses the same logic as `/health`. In a production deployment,
380/// you may want to implement separate readiness checks that verify:
381/// - All required connections are established
382/// - Caches are warmed up
383/// - Initial state synchronization is complete
384async fn ready_handler(
385    State(checker): State<Arc<HealthChecker>>,
386) -> Response {
387    let status = checker.check_health();
388    let status_code = result_to_status_code(&status.overall);
389    let response = HealthResponse::from(&status);
390    
391    if status.is_unhealthy() {
392        warn!("Readiness probe failed: node not ready");
393    }
394    
395    (status_code, Json(response)).into_response()
396}
397
398/// Handler for `/live` endpoint - Liveness probe.
399///
400/// Kubernetes liveness probe endpoint. Indicates whether the node is alive
401/// and functioning. If this check fails, Kubernetes will restart the pod.
402///
403/// **Response Codes:**
404/// - `200 OK` - Node is alive
405/// - `503 Service Unavailable` - Node is deadlocked or unresponsive
406///
407/// **Implementation Note:**
408/// Currently uses the same logic as `/health`. In a production deployment,
409/// you may want to implement separate liveness checks that verify:
410/// - Event loop is not deadlocked
411/// - Critical threads are responsive
412/// - No fatal errors have occurred
413///
414/// Liveness checks should be more lenient than readiness checks to avoid
415/// unnecessary restarts.
416async fn live_handler(
417    State(checker): State<Arc<HealthChecker>>,
418) -> Response {
419    let status = checker.check_health();
420    
421    // For liveness, we're more lenient - only fail if truly unhealthy
422    // Degraded status is still considered "alive"
423    let status_code = if status.is_unhealthy() {
424        error!("Liveness probe failed: node unhealthy");
425        StatusCode::SERVICE_UNAVAILABLE
426    } else {
427        StatusCode::OK
428    };
429    
430    let response = HealthResponse::from(&status);
431    
432    (status_code, Json(response)).into_response()
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::health::{HealthCheck, HealthCheckResult};
    use std::time::Duration;

    /// Fixture check that always reports `Healthy`.
    struct TestHealthyCheck;
    impl HealthCheck for TestHealthyCheck {
        fn name(&self) -> &str {
            "test_healthy"
        }
        fn check(&self) -> HealthCheckResult {
            HealthCheckResult::Healthy
        }
    }

    /// Fixture check that always reports `Degraded`.
    struct TestDegradedCheck;
    impl HealthCheck for TestDegradedCheck {
        fn name(&self) -> &str {
            "test_degraded"
        }
        fn check(&self) -> HealthCheckResult {
            let reason = "Test degradation".to_string();
            HealthCheckResult::Degraded { reason }
        }
    }

    /// Fixture check that always reports `Unhealthy`.
    struct TestUnhealthyCheck;
    impl HealthCheck for TestUnhealthyCheck {
        fn name(&self) -> &str {
            "test_unhealthy"
        }
        fn check(&self) -> HealthCheckResult {
            let reason = "Test failure".to_string();
            HealthCheckResult::Unhealthy { reason }
        }
    }

    #[test]
    fn test_result_to_status_string() {
        let cases = [
            (HealthCheckResult::Healthy, "healthy"),
            (
                HealthCheckResult::Degraded {
                    reason: "test".to_string(),
                },
                "degraded",
            ),
            (
                HealthCheckResult::Unhealthy {
                    reason: "test".to_string(),
                },
                "unhealthy",
            ),
        ];

        for (result, expected) in &cases {
            assert_eq!(result_to_status_string(result), *expected);
        }
    }

    #[test]
    fn test_result_to_status_code() {
        let cases = [
            (HealthCheckResult::Healthy, StatusCode::OK),
            (
                HealthCheckResult::Degraded {
                    reason: "test".to_string(),
                },
                StatusCode::OK,
            ),
            (
                HealthCheckResult::Unhealthy {
                    reason: "test".to_string(),
                },
                StatusCode::SERVICE_UNAVAILABLE,
            ),
        ];

        for (result, expected) in &cases {
            assert_eq!(result_to_status_code(result), *expected);
        }
    }

    #[test]
    fn test_health_response_from_status() {
        let mut checker = HealthChecker::new(Duration::from_secs(30));
        checker.add_check(Box::new(TestHealthyCheck));
        checker.add_check(Box::new(TestDegradedCheck));

        let report = checker.check_health();
        let response = HealthResponse::from(&report);

        // One degraded check is enough to degrade the overall status.
        assert_eq!(response.status, "degraded");
        assert_eq!(response.checks.len(), 2);

        let healthy = &response.checks["test_healthy"];
        assert_eq!(healthy.status, "healthy");

        let degraded = &response.checks["test_degraded"];
        assert_eq!(degraded.status, "degraded");
        assert_eq!(degraded.reason, Some("Test degradation".to_string()));
    }

    #[tokio::test]
    async fn test_health_handler_healthy() {
        let mut checker = HealthChecker::new(Duration::from_secs(30));
        checker.add_check(Box::new(TestHealthyCheck));

        let response = health_handler(State(Arc::new(checker))).await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_handler_degraded() {
        let mut checker = HealthChecker::new(Duration::from_secs(30));
        checker.add_check(Box::new(TestDegradedCheck));

        let response = health_handler(State(Arc::new(checker))).await;

        // Degraded still returns 200 OK
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_handler_unhealthy() {
        let mut checker = HealthChecker::new(Duration::from_secs(30));
        checker.add_check(Box::new(TestUnhealthyCheck));

        let response = health_handler(State(Arc::new(checker))).await;

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn test_ready_handler() {
        let mut checker = HealthChecker::new(Duration::from_secs(30));
        checker.add_check(Box::new(TestHealthyCheck));

        let response = ready_handler(State(Arc::new(checker))).await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_live_handler_degraded_is_alive() {
        let mut checker = HealthChecker::new(Duration::from_secs(30));
        checker.add_check(Box::new(TestDegradedCheck));

        let response = live_handler(State(Arc::new(checker))).await;

        // Degraded is still considered "alive" for liveness probe
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_live_handler_unhealthy() {
        let mut checker = HealthChecker::new(Duration::from_secs(30));
        checker.add_check(Box::new(TestUnhealthyCheck));

        let response = live_handler(State(Arc::new(checker))).await;

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[test]
    fn test_health_server_config_default() {
        let rendered = HealthServerConfig::default().bind_address.to_string();
        assert_eq!(rendered, "0.0.0.0:8080");
    }

    #[test]
    fn test_health_server_creation() {
        // Construction alone must succeed; nothing is bound or served here.
        let checker = Arc::new(HealthChecker::new(Duration::from_secs(30)));
        let _server = HealthServer::new(checker, HealthServerConfig::default());
    }
}