allframe_core/health/
server.rs1use std::{convert::Infallible, future::Future, net::SocketAddr, pin::Pin, sync::Arc};
6
7use hyper::{
8 body::Incoming, server::conn::http1, service::service_fn, Method, Request, Response, StatusCode,
9};
10use hyper_util::rt::TokioIo;
11use tokio::net::TcpListener;
12
13use super::{HealthCheck, HealthReport};
14
15pub struct HealthServer<H: HealthCheck + 'static> {
19 health: Arc<H>,
20 addr: SocketAddr,
21}
22
23impl<H: HealthCheck + 'static> HealthServer<H> {
24 pub fn new(health: H) -> Self {
26 Self {
27 health: Arc::new(health),
28 addr: ([0, 0, 0, 0], 8081).into(),
29 }
30 }
31
32 pub fn addr(mut self, addr: impl Into<SocketAddr>) -> Self {
34 self.addr = addr.into();
35 self
36 }
37
38 pub fn port(mut self, port: u16) -> Self {
40 self.addr = ([0, 0, 0, 0], port).into();
41 self
42 }
43
44 pub async fn serve(self) -> Result<(), HealthServerError> {
48 let listener = TcpListener::bind(self.addr)
49 .await
50 .map_err(|e| HealthServerError::Bind(e.to_string()))?;
51
52 loop {
53 let (stream, _) = listener
54 .accept()
55 .await
56 .map_err(|e| HealthServerError::Accept(e.to_string()))?;
57
58 let io = TokioIo::new(stream);
59 let health = Arc::clone(&self.health);
60
61 tokio::spawn(async move {
62 let service = service_fn(move |req| {
63 let health = Arc::clone(&health);
64 async move { handle_request(req, health).await }
65 });
66
67 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
68 if !e.is_incomplete_message() {
70 #[cfg(feature = "otel")]
71 tracing::debug!(error = %e, "Health server connection error");
72 }
73 }
74 });
75 }
76 }
77
78 pub async fn serve_with_shutdown<F>(self, shutdown: F) -> Result<(), HealthServerError>
80 where
81 F: Future<Output = ()> + Send,
82 {
83 let listener = TcpListener::bind(self.addr)
84 .await
85 .map_err(|e| HealthServerError::Bind(e.to_string()))?;
86
87 tokio::pin!(shutdown);
88
89 loop {
90 tokio::select! {
91 _ = &mut shutdown => {
92 return Ok(());
93 }
94 result = listener.accept() => {
95 let (stream, _) = result
96 .map_err(|e| HealthServerError::Accept(e.to_string()))?;
97
98 let io = TokioIo::new(stream);
99 let health = Arc::clone(&self.health);
100
101 tokio::spawn(async move {
102 let service = service_fn(move |req| {
103 let health = Arc::clone(&health);
104 async move { handle_request(req, health).await }
105 });
106
107 let _ = http1::Builder::new().serve_connection(io, service).await;
108 });
109 }
110 }
111 }
112 }
113
114 pub fn check(&self) -> Pin<Box<dyn Future<Output = HealthReport> + Send + '_>> {
116 self.health.check_all()
117 }
118}
119
120async fn handle_request<H: HealthCheck>(
121 req: Request<Incoming>,
122 health: Arc<H>,
123) -> Result<Response<String>, Infallible> {
124 let response = match (req.method(), req.uri().path()) {
125 (&Method::GET, "/health") | (&Method::GET, "/healthz") => {
126 let report = health.check_all().await;
127 let status_code = match report.status.http_status_code() {
128 200 => StatusCode::OK,
129 503 => StatusCode::SERVICE_UNAVAILABLE,
130 _ => StatusCode::INTERNAL_SERVER_ERROR,
131 };
132 let body = report
133 .to_json()
134 .unwrap_or_else(|_| r#"{"error":"serialization failed"}"#.to_string());
135
136 Response::builder()
137 .status(status_code)
138 .header("Content-Type", "application/json")
139 .body(body)
140 .unwrap()
141 }
142 (&Method::GET, "/ready") | (&Method::GET, "/readyz") => {
143 Response::builder()
145 .status(StatusCode::OK)
146 .header("Content-Type", "application/json")
147 .body(r#"{"ready":true}"#.to_string())
148 .unwrap()
149 }
150 (&Method::GET, "/live") | (&Method::GET, "/livez") => {
151 Response::builder()
153 .status(StatusCode::OK)
154 .header("Content-Type", "application/json")
155 .body(r#"{"alive":true}"#.to_string())
156 .unwrap()
157 }
158 _ => Response::builder()
159 .status(StatusCode::NOT_FOUND)
160 .body("Not Found".to_string())
161 .unwrap(),
162 };
163
164 Ok(response)
165}
166
167#[derive(Debug, Clone)]
169pub enum HealthServerError {
170 Bind(String),
172 Accept(String),
174}
175
176impl std::fmt::Display for HealthServerError {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 match self {
179 HealthServerError::Bind(msg) => write!(f, "Failed to bind: {}", msg),
180 HealthServerError::Accept(msg) => write!(f, "Failed to accept connection: {}", msg),
181 }
182 }
183}
184
185impl std::error::Error for HealthServerError {}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190 use crate::health::{AlwaysHealthy, SimpleHealthCheck};
191
192 #[tokio::test]
193 async fn test_health_server_check() {
194 let checker = SimpleHealthCheck::new().add_dependency(AlwaysHealthy::new("test"));
195 let server = HealthServer::new(checker);
196 let report = server.check().await;
197 assert!(report.is_healthy());
198 }
199
200 #[test]
201 fn test_health_server_error_display() {
202 let err = HealthServerError::Bind("address in use".into());
203 assert!(err.to_string().contains("address in use"));
204 }
205}