allframe_core/health/
server.rs

1//! HTTP health check server
2//!
3//! Provides a simple HTTP server that exposes health check endpoints.
4
5use std::{convert::Infallible, future::Future, net::SocketAddr, pin::Pin, sync::Arc};
6
7use hyper::{
8    body::Incoming, server::conn::http1, service::service_fn, Method, Request, Response, StatusCode,
9};
10use hyper_util::rt::TokioIo;
11use tokio::net::TcpListener;
12
13use super::{HealthCheck, HealthReport};
14
15/// HTTP health check server
16///
17/// Exposes `/health` and `/ready` endpoints for health monitoring.
18pub struct HealthServer<H: HealthCheck + 'static> {
19    health: Arc<H>,
20    addr: SocketAddr,
21}
22
23impl<H: HealthCheck + 'static> HealthServer<H> {
24    /// Create a new health server
25    pub fn new(health: H) -> Self {
26        Self {
27            health: Arc::new(health),
28            addr: ([0, 0, 0, 0], 8081).into(),
29        }
30    }
31
32    /// Set the address to bind to
33    pub fn addr(mut self, addr: impl Into<SocketAddr>) -> Self {
34        self.addr = addr.into();
35        self
36    }
37
38    /// Set the port (binds to 0.0.0.0)
39    pub fn port(mut self, port: u16) -> Self {
40        self.addr = ([0, 0, 0, 0], port).into();
41        self
42    }
43
44    /// Start the health server
45    ///
46    /// This will block until the server is shut down.
47    pub async fn serve(self) -> Result<(), HealthServerError> {
48        let listener = TcpListener::bind(self.addr)
49            .await
50            .map_err(|e| HealthServerError::Bind(e.to_string()))?;
51
52        loop {
53            let (stream, _) = listener
54                .accept()
55                .await
56                .map_err(|e| HealthServerError::Accept(e.to_string()))?;
57
58            let io = TokioIo::new(stream);
59            let health = Arc::clone(&self.health);
60
61            tokio::spawn(async move {
62                let service = service_fn(move |req| {
63                    let health = Arc::clone(&health);
64                    async move { handle_request(req, health).await }
65                });
66
67                if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
68                    // Connection errors are usually not fatal (client disconnected, etc.)
69                    if !e.is_incomplete_message() {
70                        #[cfg(feature = "otel")]
71                        tracing::debug!(error = %e, "Health server connection error");
72                    }
73                }
74            });
75        }
76    }
77
78    /// Start the health server with graceful shutdown
79    pub async fn serve_with_shutdown<F>(self, shutdown: F) -> Result<(), HealthServerError>
80    where
81        F: Future<Output = ()> + Send,
82    {
83        let listener = TcpListener::bind(self.addr)
84            .await
85            .map_err(|e| HealthServerError::Bind(e.to_string()))?;
86
87        tokio::pin!(shutdown);
88
89        loop {
90            tokio::select! {
91                _ = &mut shutdown => {
92                    return Ok(());
93                }
94                result = listener.accept() => {
95                    let (stream, _) = result
96                        .map_err(|e| HealthServerError::Accept(e.to_string()))?;
97
98                    let io = TokioIo::new(stream);
99                    let health = Arc::clone(&self.health);
100
101                    tokio::spawn(async move {
102                        let service = service_fn(move |req| {
103                            let health = Arc::clone(&health);
104                            async move { handle_request(req, health).await }
105                        });
106
107                        let _ = http1::Builder::new().serve_connection(io, service).await;
108                    });
109                }
110            }
111        }
112    }
113
114    /// Get a health report without starting the server
115    pub fn check(&self) -> Pin<Box<dyn Future<Output = HealthReport> + Send + '_>> {
116        self.health.check_all()
117    }
118}
119
120async fn handle_request<H: HealthCheck>(
121    req: Request<Incoming>,
122    health: Arc<H>,
123) -> Result<Response<String>, Infallible> {
124    let response = match (req.method(), req.uri().path()) {
125        (&Method::GET, "/health") | (&Method::GET, "/healthz") => {
126            let report = health.check_all().await;
127            let status_code = match report.status.http_status_code() {
128                200 => StatusCode::OK,
129                503 => StatusCode::SERVICE_UNAVAILABLE,
130                _ => StatusCode::INTERNAL_SERVER_ERROR,
131            };
132            let body = report
133                .to_json()
134                .unwrap_or_else(|_| r#"{"error":"serialization failed"}"#.to_string());
135
136            Response::builder()
137                .status(status_code)
138                .header("Content-Type", "application/json")
139                .body(body)
140                .unwrap()
141        }
142        (&Method::GET, "/ready") | (&Method::GET, "/readyz") => {
143            // Readiness check - just returns 200 if the server is running
144            Response::builder()
145                .status(StatusCode::OK)
146                .header("Content-Type", "application/json")
147                .body(r#"{"ready":true}"#.to_string())
148                .unwrap()
149        }
150        (&Method::GET, "/live") | (&Method::GET, "/livez") => {
151            // Liveness check - always returns 200 if the server is running
152            Response::builder()
153                .status(StatusCode::OK)
154                .header("Content-Type", "application/json")
155                .body(r#"{"alive":true}"#.to_string())
156                .unwrap()
157        }
158        _ => Response::builder()
159            .status(StatusCode::NOT_FOUND)
160            .body("Not Found".to_string())
161            .unwrap(),
162    };
163
164    Ok(response)
165}
166
/// Ways in which running the health server can fail.
///
/// Each variant carries the text of the underlying error.
#[derive(Clone, Debug)]
pub enum HealthServerError {
    /// The listener could not be bound to the configured address.
    Bind(String),
    /// Accepting an incoming connection failed.
    Accept(String),
}

impl std::fmt::Display for HealthServerError {
    /// Writes a fixed, variant-specific prefix followed by the carried message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (prefix, detail) = match self {
            Self::Bind(detail) => ("Failed to bind: ", detail),
            Self::Accept(detail) => ("Failed to accept connection: ", detail),
        };
        f.write_str(prefix)?;
        f.write_str(detail)
    }
}

// No `source()` override: the underlying error is stored only as text.
impl std::error::Error for HealthServerError {}
186
#[cfg(test)]
mod tests {
    use super::*;
    use crate::health::{AlwaysHealthy, SimpleHealthCheck};

    /// `check` must produce a report without the server ever being started.
    #[tokio::test]
    async fn test_health_server_check() {
        let server = HealthServer::new(
            SimpleHealthCheck::new().add_dependency(AlwaysHealthy::new("test")),
        );

        assert!(server.check().await.is_healthy());
    }

    /// The rendered error must include the message it was built from.
    #[test]
    fn test_health_server_error_display() {
        let rendered = HealthServerError::Bind("address in use".into()).to_string();

        assert!(rendered.contains("address in use"));
    }
}