1use http_body::Body;
2use http_body_util::Full;
3use hyper::{Request, Response, StatusCode};
4use hyper_util::rt::{TokioExecutor, TokioIo};
5use hyper_util::server::conn::auto::Builder;
6
7use crate::config::Config;
8use acton_reactive::prelude::*;
9use anyhow::Result;
10use bytes::Bytes;
11use tokio::net::TcpListener;
12use tokio_util::sync::CancellationToken;
13
14async fn health_check_handler<B>(req: Request<B>) -> Result<Response<Full<Bytes>>, hyper::Error>
15where
16 B: Body,
17{
18 if req.uri().path() == "/health" {
19 Ok(Response::builder()
20 .status(StatusCode::OK)
21 .body(Full::new(Bytes::from("")))
22 .unwrap())
23 } else {
24 Ok(Response::builder()
25 .status(StatusCode::NOT_FOUND)
26 .body(Full::new(Bytes::from("Not Found")))
27 .unwrap())
28 }
29}
30
31async fn health_check_adapter(
32 req: Request<hyper::body::Incoming>,
33) -> Result<Response<Full<Bytes>>, hyper::Error> {
34 health_check_handler(req).await
35}
36
/// Actor type for the health-check endpoint.
///
/// The struct itself carries no fields. [`HealthState::create`] registers the
/// lifecycle hooks that start the HTTP listener after the actor starts and
/// signal it to stop before the actor stops.
#[acton_actor]
pub struct HealthState;
41
42impl HealthState {
43 pub async fn create(
44 runtime: &mut ActorRuntime,
45 config: &Config,
46 ) -> anyhow::Result<ActorHandle> {
47 let actor_config = ActorConfig::new(Ern::with_root("health-check")?, None, None)?
48 .with_restart_policy(RestartPolicy::Permanent);
49
50 let mut builder = runtime.new_actor_with_config::<Self>(actor_config);
51
52 let cancel = CancellationToken::new();
53 let cancel_for_loop = cancel.clone();
54 let cancel_for_stop = cancel.clone();
55 let health_config = config.clone();
56
57 builder.after_start(move |_| {
58 let config = health_config.clone();
59 let cancel = cancel_for_loop.clone();
60
61 tokio::spawn(async move {
62 let addr_str = format!(
63 "{}:{}",
64 config.health_check_bind_address, config.health_check_port
65 );
66 let listener = match TcpListener::bind(&addr_str).await {
67 Ok(l) => {
68 tracing::info!("Health check server listening on {}", addr_str);
69 l
70 }
71 Err(e) => {
72 tracing::error!("Failed to bind health check server to {}: {}", addr_str, e);
73 return;
74 }
75 };
76
77 loop {
78 tokio::select! {
79 result = listener.accept() => {
80 match result {
81 Ok((stream, _)) => {
82 let io = TokioIo::new(stream);
83 let service = hyper::service::service_fn(health_check_adapter);
84
85 tokio::spawn(async move {
86 if let Err(err) = Builder::new(TokioExecutor::new())
87 .serve_connection(io, service)
88 .await
89 {
90 tracing::error!("Error serving health connection: {:?}", err);
91 }
92 });
93 }
94 Err(e) => {
95 tracing::error!("Health accept error: {}", e);
96 break;
97 }
98 }
99 }
100 _ = cancel.cancelled() => {
101 tracing::info!("Health server shutting down");
102 break;
103 }
104 }
105 }
106 });
107
108 Reply::ready()
109 });
110
111 builder.before_stop(move |_| {
112 cancel_for_stop.cancel();
113 Reply::ready()
114 });
115
116 Ok(builder.start().await)
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use http_body_util::Empty;
    use hyper::Request;
    use hyper::StatusCode;

    /// Sends a body-less request with the given method and path through
    /// `health_check_handler` and returns the status of the response.
    async fn status_for(method: hyper::Method, path: &str) -> StatusCode {
        let request = Request::builder()
            .method(method)
            .uri(path)
            .body(Empty::<Bytes>::new())
            .unwrap();
        health_check_handler(request).await.unwrap().status()
    }

    // GET on the health path succeeds; GET on another path is a 404.
    #[tokio::test]
    async fn test_health_check_handler() {
        let ok = status_for(hyper::Method::GET, "/health").await;
        assert_eq!(ok, StatusCode::OK);

        let missing = status_for(hyper::Method::GET, "/wrong").await;
        assert_eq!(missing, StatusCode::NOT_FOUND);
    }

    // The handler does not look at the method, so POST is accepted too.
    #[tokio::test]
    async fn test_health_check_post_method() {
        let status = status_for(hyper::Method::POST, "/health").await;
        assert_eq!(status, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_check_put_method() {
        let status = status_for(hyper::Method::PUT, "/health").await;
        assert_eq!(status, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_check_head_method() {
        let status = status_for(hyper::Method::HEAD, "/health").await;
        assert_eq!(status, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_check_root_path_returns_404() {
        let status = status_for(hyper::Method::GET, "/").await;
        assert_eq!(status, StatusCode::NOT_FOUND);
    }

    // Near-miss paths must not be treated as the health endpoint.
    #[tokio::test]
    async fn test_health_check_various_paths_return_404() {
        for path in ["/healthz", "/status", "/api/health", "/ready", "/healthcheck"] {
            let status = status_for(hyper::Method::GET, path).await;
            assert_eq!(status, StatusCode::NOT_FOUND, "GET {} should return 404", path);
        }
    }

    #[tokio::test]
    async fn test_health_check_post_wrong_path_returns_404() {
        let status = status_for(hyper::Method::POST, "/wrong").await;
        assert_eq!(status, StatusCode::NOT_FOUND);
    }
}