hyperstack_server/
http_health.rs

1use crate::health::HealthMonitor;
2use anyhow::Result;
3use http_body_util::Full;
4use hyper::body::Bytes;
5use hyper::server::conn::http1;
6use hyper::service::service_fn;
7use hyper::{Request, Response, StatusCode};
8use hyper_util::rt::TokioIo;
9use std::convert::Infallible;
10use std::net::SocketAddr;
11use std::sync::Arc;
12use tokio::net::TcpListener;
13use tracing::{error, info};
14
15/// Configuration for the HTTP health server
16#[derive(Clone, Debug)]
17pub struct HttpHealthConfig {
18    pub bind_address: SocketAddr,
19}
20
21impl Default for HttpHealthConfig {
22    fn default() -> Self {
23        Self {
24            bind_address: "[::]:8081".parse().expect("valid socket address"),
25        }
26    }
27}
28
29impl HttpHealthConfig {
30    pub fn new(bind_address: impl Into<SocketAddr>) -> Self {
31        Self {
32            bind_address: bind_address.into(),
33        }
34    }
35}
36
37/// HTTP server that exposes health endpoints
38pub struct HttpHealthServer {
39    bind_addr: SocketAddr,
40    health_monitor: Option<HealthMonitor>,
41}
42
43impl HttpHealthServer {
44    pub fn new(bind_addr: SocketAddr) -> Self {
45        Self {
46            bind_addr,
47            health_monitor: None,
48        }
49    }
50
51    pub fn with_health_monitor(mut self, monitor: HealthMonitor) -> Self {
52        self.health_monitor = Some(monitor);
53        self
54    }
55
56    pub async fn start(self) -> Result<()> {
57        info!("Starting HTTP health server on {}", self.bind_addr);
58
59        let listener = TcpListener::bind(&self.bind_addr).await?;
60        info!("HTTP health server listening on {}", self.bind_addr);
61
62        let health_monitor = Arc::new(self.health_monitor);
63
64        loop {
65            match listener.accept().await {
66                Ok((stream, _addr)) => {
67                    let io = TokioIo::new(stream);
68                    let monitor = health_monitor.clone();
69
70                    tokio::spawn(async move {
71                        let service = service_fn(move |req| {
72                            let monitor = monitor.clone();
73                            async move { handle_request(req, monitor).await }
74                        });
75
76                        if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
77                            error!("HTTP connection error: {}", e);
78                        }
79                    });
80                }
81                Err(e) => {
82                    error!("Failed to accept HTTP connection: {}", e);
83                }
84            }
85        }
86    }
87}
88
89async fn handle_request(
90    req: Request<hyper::body::Incoming>,
91    health_monitor: Arc<Option<HealthMonitor>>,
92) -> Result<Response<Full<Bytes>>, Infallible> {
93    let path = req.uri().path();
94
95    match path {
96        "/health" | "/healthz" => {
97            // Basic health check - server is running
98            Ok(Response::builder()
99                .status(StatusCode::OK)
100                .header("Content-Type", "text/plain")
101                .body(Full::new(Bytes::from("OK")))
102                .unwrap())
103        }
104        "/ready" | "/readiness" => {
105            // Readiness check - check if stream is healthy
106            if let Some(monitor) = health_monitor.as_ref() {
107                if monitor.is_healthy().await {
108                    Ok(Response::builder()
109                        .status(StatusCode::OK)
110                        .header("Content-Type", "text/plain")
111                        .body(Full::new(Bytes::from("READY")))
112                        .unwrap())
113                } else {
114                    Ok(Response::builder()
115                        .status(StatusCode::SERVICE_UNAVAILABLE)
116                        .header("Content-Type", "text/plain")
117                        .body(Full::new(Bytes::from("NOT READY")))
118                        .unwrap())
119                }
120            } else {
121                // No health monitor configured, assume ready
122                Ok(Response::builder()
123                    .status(StatusCode::OK)
124                    .header("Content-Type", "text/plain")
125                    .body(Full::new(Bytes::from("READY")))
126                    .unwrap())
127            }
128        }
129        "/status" => {
130            // Detailed status endpoint
131            if let Some(monitor) = health_monitor.as_ref() {
132                let status = monitor.status().await;
133                let error_count = monitor.error_count().await;
134                let is_healthy = monitor.is_healthy().await;
135
136                let status_json = serde_json::json!({
137                    "healthy": is_healthy,
138                    "status": format!("{:?}", status),
139                    "error_count": error_count
140                });
141
142                let status_code = if is_healthy {
143                    StatusCode::OK
144                } else {
145                    StatusCode::SERVICE_UNAVAILABLE
146                };
147
148                Ok(Response::builder()
149                    .status(status_code)
150                    .header("Content-Type", "application/json")
151                    .body(Full::new(Bytes::from(status_json.to_string())))
152                    .unwrap())
153            } else {
154                let status_json = serde_json::json!({
155                    "healthy": true,
156                    "status": "no_monitor",
157                    "error_count": 0
158                });
159
160                Ok(Response::builder()
161                    .status(StatusCode::OK)
162                    .header("Content-Type", "application/json")
163                    .body(Full::new(Bytes::from(status_json.to_string())))
164                    .unwrap())
165            }
166        }
167        _ => Ok(Response::builder()
168            .status(StatusCode::NOT_FOUND)
169            .header("Content-Type", "text/plain")
170            .body(Full::new(Bytes::from("Not Found")))
171            .unwrap()),
172    }
173}