Skip to main content

mail_laser/health/
mod.rs

1use http_body::Body;
2use http_body_util::Full;
3use hyper::{Request, Response, StatusCode};
4use hyper_util::rt::{TokioExecutor, TokioIo};
5use hyper_util::server::conn::auto::Builder;
6
7use crate::config::Config;
8use acton_reactive::prelude::*;
9use anyhow::Result;
10use bytes::Bytes;
11use tokio::net::TcpListener;
12use tokio_util::sync::CancellationToken;
13
14async fn health_check_handler<B>(req: Request<B>) -> Result<Response<Full<Bytes>>, hyper::Error>
15where
16    B: Body,
17{
18    if req.uri().path() == "/health" {
19        Ok(Response::builder()
20            .status(StatusCode::OK)
21            .body(Full::new(Bytes::from("")))
22            .unwrap())
23    } else {
24        Ok(Response::builder()
25            .status(StatusCode::NOT_FOUND)
26            .body(Full::new(Bytes::from("Not Found")))
27            .unwrap())
28    }
29}
30
31async fn health_check_adapter(
32    req: Request<hyper::body::Incoming>,
33) -> Result<Response<Full<Bytes>>, hyper::Error> {
34    health_check_handler(req).await
35}
36
37// --- HealthActor ---
38
39#[acton_actor]
40pub struct HealthState;
41
42impl HealthState {
43    pub async fn create(
44        runtime: &mut ActorRuntime,
45        config: &Config,
46    ) -> anyhow::Result<ActorHandle> {
47        let actor_config = ActorConfig::new(Ern::with_root("health-check")?, None, None)?
48            .with_restart_policy(RestartPolicy::Permanent);
49
50        let mut builder = runtime.new_actor_with_config::<Self>(actor_config);
51
52        let cancel = CancellationToken::new();
53        let cancel_for_loop = cancel.clone();
54        let cancel_for_stop = cancel.clone();
55        let health_config = config.clone();
56
57        builder.after_start(move |_| {
58            let config = health_config.clone();
59            let cancel = cancel_for_loop.clone();
60
61            tokio::spawn(async move {
62                let addr_str = format!(
63                    "{}:{}",
64                    config.health_check_bind_address, config.health_check_port
65                );
66                let listener = match TcpListener::bind(&addr_str).await {
67                    Ok(l) => {
68                        tracing::info!("Health check server listening on {}", addr_str);
69                        l
70                    }
71                    Err(e) => {
72                        tracing::error!("Failed to bind health check server to {}: {}", addr_str, e);
73                        return;
74                    }
75                };
76
77                loop {
78                    tokio::select! {
79                        result = listener.accept() => {
80                            match result {
81                                Ok((stream, _)) => {
82                                    let io = TokioIo::new(stream);
83                                    let service = hyper::service::service_fn(health_check_adapter);
84
85                                    tokio::spawn(async move {
86                                        if let Err(err) = Builder::new(TokioExecutor::new())
87                                            .serve_connection(io, service)
88                                            .await
89                                        {
90                                            tracing::error!("Error serving health connection: {:?}", err);
91                                        }
92                                    });
93                                }
94                                Err(e) => {
95                                    tracing::error!("Health accept error: {}", e);
96                                    break;
97                                }
98                            }
99                        }
100                        _ = cancel.cancelled() => {
101                            tracing::info!("Health server shutting down");
102                            break;
103                        }
104                    }
105                }
106            });
107
108            Reply::ready()
109        });
110
111        builder.before_stop(move |_| {
112            cancel_for_stop.cancel();
113            Reply::ready()
114        });
115
116        Ok(builder.start().await)
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use http_body_util::Empty;
    use hyper::Request;
    use hyper::StatusCode;

    /// Sends one request with an empty body through the handler and returns
    /// the status code of the response.
    async fn status_for(method: hyper::Method, path: &str) -> StatusCode {
        let req = Request::builder()
            .method(method)
            .uri(path)
            .body(Empty::<Bytes>::new())
            .unwrap();
        health_check_handler(req).await.unwrap().status()
    }

    // The health path answers 200 and an unrelated path answers 404.
    #[tokio::test]
    async fn test_health_check_handler() {
        let healthy = status_for(hyper::Method::GET, "/health").await;
        assert_eq!(healthy, StatusCode::OK);

        let missing = status_for(hyper::Method::GET, "/wrong").await;
        assert_eq!(missing, StatusCode::NOT_FOUND);
    }

    // The handler does not look at the method, so non-GET verbs also succeed.
    #[tokio::test]
    async fn test_health_check_post_method() {
        let status = status_for(hyper::Method::POST, "/health").await;
        assert_eq!(status, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_check_put_method() {
        let status = status_for(hyper::Method::PUT, "/health").await;
        assert_eq!(status, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_check_head_method() {
        let status = status_for(hyper::Method::HEAD, "/health").await;
        assert_eq!(status, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_check_root_path_returns_404() {
        let status = status_for(hyper::Method::GET, "/").await;
        assert_eq!(status, StatusCode::NOT_FOUND);
    }

    // Only the exact path matches; look-alike paths must be rejected.
    #[tokio::test]
    async fn test_health_check_various_paths_return_404() {
        for path in ["/healthz", "/status", "/api/health", "/ready", "/healthcheck"] {
            let status = status_for(hyper::Method::GET, path).await;
            assert_eq!(status, StatusCode::NOT_FOUND, "GET {} should return 404", path);
        }
    }

    #[tokio::test]
    async fn test_health_check_post_wrong_path_returns_404() {
        let status = status_for(hyper::Method::POST, "/wrong").await;
        assert_eq!(status, StatusCode::NOT_FOUND);
    }
}
211}