Skip to main content

liminal_server/health/
endpoint.rs

1use std::io::{Read, Write};
2use std::net::{SocketAddr, TcpListener, TcpStream};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::thread::{self, JoinHandle};
6use std::time::Duration;
7
8use crate::ServerError;
9
10use super::checks::{SharedReadinessState, health_check, readiness_check};
11
/// Request path answered with the health check result.
const HEALTH_PATH: &str = "/health";
/// Request path answered with the readiness check result.
const READY_PATH: &str = "/ready";
/// `Content-Type` header value attached to JSON responses.
const APPLICATION_JSON: &str = "application/json";
/// Size, in bytes, of the buffer an incoming request is read into.
const READ_BUFFER_BYTES: usize = 2048;
16
/// Handle for a running health endpoint server.
///
/// Dropping the handle stops the worker thread as well; call
/// [`HealthServerHandle::shutdown`] instead when the shutdown result matters.
#[derive(Debug)]
pub struct HealthServerHandle {
    /// Address the health listener reported after binding.
    local_addr: SocketAddr,
    /// Stop flag shared with the worker thread; set to `true` to end the
    /// accept loop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the worker thread. It is an `Option` so it can be taken
    /// exactly once, whether by an explicit shutdown or by `Drop`.
    worker: Option<JoinHandle<Result<(), ServerError>>>,
}
24
25impl HealthServerHandle {
26    /// Returns the bound address for the health endpoint server.
27    #[must_use]
28    pub const fn local_addr(&self) -> SocketAddr {
29        self.local_addr
30    }
31
32    /// Stops the health endpoint server and waits for its worker thread to exit.
33    ///
34    /// # Errors
35    ///
36    /// Returns [`ServerError::HealthEndpoint`] if the worker thread cannot be
37    /// joined cleanly or if the server loop recorded a serving error.
38    pub fn shutdown(mut self) -> Result<(), ServerError> {
39        self.stop_worker()
40    }
41
42    fn stop_worker(&mut self) -> Result<(), ServerError> {
43        self.shutdown.store(true, Ordering::SeqCst);
44        let Some(worker) = self.worker.take() else {
45            return Ok(());
46        };
47
48        worker.join().map_err(|_| ServerError::HealthEndpoint {
49            message: "health endpoint worker thread terminated unexpectedly".to_owned(),
50        })?
51    }
52}
53
54impl Drop for HealthServerHandle {
55    fn drop(&mut self) {
56        if let Err(error) = self.stop_worker() {
57            tracing::debug!(%error, "health endpoint shutdown during drop failed");
58        }
59    }
60}
61
62/// Starts the health endpoint HTTP server on a distinct health bind address.
63///
64/// The returned server handle is independent from the main wire protocol
65/// listener. Binding the health endpoint does not mark the main listener ready.
66///
67/// # Errors
68///
69/// Returns [`ServerError::HealthEndpoint`] when the health listener cannot bind,
70/// cannot be configured for non-blocking accepts, or cannot report its local
71/// address.
72pub fn start_health_server(
73    bind_address: SocketAddr,
74    readiness: SharedReadinessState,
75) -> Result<HealthServerHandle, ServerError> {
76    let listener =
77        TcpListener::bind(bind_address).map_err(|error| ServerError::HealthEndpoint {
78            message: format!("failed to bind health endpoint at {bind_address}: {error}"),
79        })?;
80    listener
81        .set_nonblocking(true)
82        .map_err(|error| ServerError::HealthEndpoint {
83            message: format!("failed to configure health endpoint listener: {error}"),
84        })?;
85    let local_addr = listener
86        .local_addr()
87        .map_err(|error| ServerError::HealthEndpoint {
88            message: format!("failed to inspect health endpoint listener address: {error}"),
89        })?;
90    let shutdown = Arc::new(AtomicBool::new(false));
91    let worker_shutdown = Arc::clone(&shutdown);
92    let worker = thread::spawn(move || serve(&listener, &readiness, &worker_shutdown));
93
94    Ok(HealthServerHandle {
95        local_addr,
96        shutdown,
97        worker: Some(worker),
98    })
99}
100
101fn serve(
102    listener: &TcpListener,
103    readiness: &SharedReadinessState,
104    shutdown: &AtomicBool,
105) -> Result<(), ServerError> {
106    while !shutdown.load(Ordering::SeqCst) {
107        match listener.accept() {
108            Ok((stream, ..)) => {
109                // A per-connection error (e.g. a TCP probe that connects but sends no HTTP
110                // data within the read timeout) must NOT terminate the serve loop — otherwise
111                // a single port probe kills the health server for the process lifetime and
112                // subsequent liveness/readiness probes get connection-refused. Only fatal
113                // listener-level accept errors (below) terminate serving.
114                if let Err(error) = handle_connection(stream, readiness) {
115                    tracing::debug!(%error, "health endpoint connection error");
116                }
117            }
118            Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
119                thread::sleep(Duration::from_millis(10));
120            }
121            Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {}
122            Err(error) => {
123                return Err(ServerError::HealthEndpoint {
124                    message: format!("health endpoint accept failed: {error}"),
125                });
126            }
127        }
128    }
129
130    Ok(())
131}
132
133fn handle_connection(
134    mut stream: TcpStream,
135    readiness: &SharedReadinessState,
136) -> Result<(), ServerError> {
137    stream
138        .set_nonblocking(false)
139        .map_err(|error| ServerError::HealthEndpoint {
140            message: format!("failed to configure health request stream: {error}"),
141        })?;
142    stream
143        .set_read_timeout(Some(Duration::from_secs(2)))
144        .map_err(|error| ServerError::HealthEndpoint {
145            message: format!("failed to set health request read timeout: {error}"),
146        })?;
147
148    let mut buffer = [0_u8; READ_BUFFER_BYTES];
149    let bytes_read = stream
150        .read(&mut buffer)
151        .map_err(|error| ServerError::HealthEndpoint {
152            message: format!("failed to read health request: {error}"),
153        })?;
154
155    if bytes_read == 0 {
156        return Ok(());
157    }
158
159    let response = response_for_request(&buffer[..bytes_read], readiness)?;
160    stream
161        .write_all(&response)
162        .map_err(|error| ServerError::HealthEndpoint {
163            message: format!("failed to write health response: {error}"),
164        })?;
165    stream.flush().map_err(|error| ServerError::HealthEndpoint {
166        message: format!("failed to flush health response: {error}"),
167    })
168}
169
170fn response_for_request(
171    request: &[u8],
172    readiness: &SharedReadinessState,
173) -> Result<Vec<u8>, ServerError> {
174    let Ok(request) = std::str::from_utf8(request) else {
175        return Ok(empty_response(StatusCode::BadRequest));
176    };
177    let Some((method, path)) = parse_request_line(request) else {
178        return Ok(empty_response(StatusCode::BadRequest));
179    };
180
181    match (method, path) {
182        ("GET", HEALTH_PATH) => json_response(StatusCode::Ok, &health_check()),
183        ("GET", READY_PATH) => {
184            let status = readiness_check(&readiness.snapshot());
185            let status_code = if status.ready {
186                StatusCode::Ok
187            } else {
188                StatusCode::ServiceUnavailable
189            };
190            json_response(status_code, &status)
191        }
192        (_, HEALTH_PATH | READY_PATH) => Ok(empty_response(StatusCode::MethodNotAllowed)),
193        _ => Ok(empty_response(StatusCode::NotFound)),
194    }
195}
196
/// Extracts `(method, path)` from the first line of an HTTP request.
///
/// The first line is split on whitespace. At least three tokens are required
/// (method, path, and a third token such as the HTTP version); the third token
/// is not inspected and any further tokens are ignored. Returns `None` when
/// the request is empty or the first line has fewer than three tokens.
fn parse_request_line(request: &str) -> Option<(&str, &str)> {
    let mut tokens = request.lines().next()?.split_whitespace();

    match (tokens.next(), tokens.next(), tokens.next()) {
        (Some(method), Some(path), Some(_version)) => Some((method, path)),
        _ => None,
    }
}
206
207fn json_response<T>(status: StatusCode, value: &T) -> Result<Vec<u8>, ServerError>
208where
209    T: serde::Serialize,
210{
211    let body = serde_json::to_vec(value).map_err(|error| ServerError::HealthEndpoint {
212        message: format!("failed to serialize health response: {error}"),
213    })?;
214    Ok(response(status, Some(APPLICATION_JSON), &body))
215}
216
217fn empty_response(status: StatusCode) -> Vec<u8> {
218    response(status, None, &[])
219}
220
221fn response(status: StatusCode, content_type: Option<&str>, body: &[u8]) -> Vec<u8> {
222    let mut response = Vec::new();
223    let status_line = format!("HTTP/1.1 {} {}\r\n", status.code(), status.reason());
224    response.extend_from_slice(status_line.as_bytes());
225    response.extend_from_slice(format!("Content-Length: {}\r\n", body.len()).as_bytes());
226    response.extend_from_slice(b"Connection: close\r\n");
227    if let Some(content_type) = content_type {
228        response.extend_from_slice(format!("Content-Type: {content_type}\r\n").as_bytes());
229    }
230    response.extend_from_slice(b"\r\n");
231    response.extend_from_slice(body);
232    response
233}
234
/// The HTTP statuses this endpoint can answer with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatusCode {
    Ok,
    BadRequest,
    NotFound,
    MethodNotAllowed,
    ServiceUnavailable,
}

impl StatusCode {
    /// Numeric code and reason phrase of each status, kept side by side so the
    /// two can never drift apart.
    const fn parts(self) -> (u16, &'static str) {
        match self {
            Self::Ok => (200, "OK"),
            Self::BadRequest => (400, "Bad Request"),
            Self::NotFound => (404, "Not Found"),
            Self::MethodNotAllowed => (405, "Method Not Allowed"),
            Self::ServiceUnavailable => (503, "Service Unavailable"),
        }
    }

    /// Numeric status code written into the status line.
    const fn code(self) -> u16 {
        self.parts().0
    }

    /// Reason phrase written after the numeric code in the status line.
    const fn reason(self) -> &'static str {
        self.parts().1
    }
}
265
// Tests for the health endpoint. Some start a real server on a loopback socket
// and talk HTTP to it; others call `response_for_request` directly.
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream};
    use std::time::Duration;

    use serde_json::Value;

    use super::{response_for_request, start_health_server};
    use crate::health::checks::{
        ClusterReadiness, ReadinessCondition, ReadinessState, SharedReadinessState,
    };

    /// Loopback address with port 0; tests read the actual bound address back
    /// through `local_addr()` on the server handle.
    fn loopback_ephemeral() -> Result<SocketAddr, Box<dyn std::error::Error>> {
        Ok("127.0.0.1:0".parse()?)
    }

    /// Sends a `GET` for `path` to `address` and returns the raw response text.
    ///
    /// Reads until end of stream, so it relies on the server closing the
    /// connection after responding.
    fn get(address: SocketAddr, path: &str) -> Result<String, Box<dyn std::error::Error>> {
        let mut stream = TcpStream::connect(address)?;
        stream.set_read_timeout(Some(Duration::from_secs(2)))?;
        let request = format!("GET {path} HTTP/1.1\r\nHost: localhost\r\n\r\n");
        stream.write_all(request.as_bytes())?;

        let mut response = String::new();
        stream.read_to_string(&mut response)?;
        Ok(response)
    }

    /// Asserts that the response's status line starts with `HTTP/1.1 <status> `.
    fn assert_status(response: &str, status: u16) {
        let expected = format!("HTTP/1.1 {status} ");
        assert!(
            response.starts_with(&expected),
            "response status did not start with {expected}: {response}"
        );
    }

    /// Returns everything after the first blank line of the response.
    fn body(response: &str) -> Result<&str, Box<dyn std::error::Error>> {
        let Some((_headers, body)) = response.split_once("\r\n\r\n") else {
            return Err("response did not contain a header/body separator".into());
        };
        Ok(body)
    }

    /// Parses the response body as JSON.
    fn json_body(response: &str) -> Result<Value, Box<dyn std::error::Error>> {
        Ok(serde_json::from_str(body(response)?)?)
    }

    // `/health` answers 200 with JSON even when the readiness state is the default.
    #[test]
    fn health_endpoint_returns_json_200_regardless_of_readiness()
    -> Result<(), Box<dyn std::error::Error>> {
        let readiness = SharedReadinessState::new(ReadinessState::default());
        let server = start_health_server(loopback_ephemeral()?, readiness)?;

        let response = get(server.local_addr(), "/health")?;
        server.shutdown()?;

        assert_status(&response, 200);
        assert!(response.contains("Content-Type: application/json\r\n"));
        let body = json_body(&response)?;
        assert_eq!(body["status"], "healthy");

        Ok(())
    }

    // `/ready` answers 503 and lists `listener_bound` as the first unmet condition.
    #[test]
    fn ready_endpoint_returns_503_before_main_listener_binds()
    -> Result<(), Box<dyn std::error::Error>> {
        let readiness = SharedReadinessState::new(ReadinessState::new(
            true,
            false,
            ClusterReadiness::NotConfigured,
        ));
        let server = start_health_server(loopback_ephemeral()?, readiness)?;

        let response = get(server.local_addr(), "/ready")?;
        server.shutdown()?;

        assert_status(&response, 503);
        assert!(response.contains("Content-Type: application/json\r\n"));
        let body = json_body(&response)?;
        assert_eq!(body["ready"], false);
        assert_eq!(body["unmet_conditions"][0], "listener_bound");

        Ok(())
    }

    // `/ready` answers 200 with an empty `unmet_conditions` array once ready.
    #[test]
    fn ready_endpoint_returns_200_after_all_startup_gates() -> Result<(), Box<dyn std::error::Error>>
    {
        let readiness = SharedReadinessState::new(ReadinessState::ready_without_cluster());
        let server = start_health_server(loopback_ephemeral()?, readiness)?;

        let response = get(server.local_addr(), "/ready")?;
        server.shutdown()?;

        assert_status(&response, 200);
        let body = json_body(&response)?;
        assert_eq!(body["ready"], true);
        let Some(unmet_conditions) = body["unmet_conditions"].as_array() else {
            return Err("unmet_conditions should be an array".into());
        };
        assert!(unmet_conditions.is_empty());

        Ok(())
    }

    // Updates made through a clone of the shared state are visible to the
    // running server: 503 first, 200 after the flags are set.
    #[test]
    fn ready_endpoint_updates_from_shared_readiness_state() -> Result<(), Box<dyn std::error::Error>>
    {
        let readiness = SharedReadinessState::new(ReadinessState::default());
        let server = start_health_server(loopback_ephemeral()?, readiness.clone())?;

        let response = get(server.local_addr(), "/ready")?;
        assert_status(&response, 503);

        readiness.set_config_loaded(true);
        readiness.set_listener_bound(true);
        let response = get(server.local_addr(), "/ready")?;
        server.shutdown()?;

        assert_status(&response, 200);

        Ok(())
    }

    // Exercises `response_for_request` directly, without a socket.
    #[test]
    fn cluster_readiness_is_listed_when_configured_but_not_joined()
    -> Result<(), Box<dyn std::error::Error>> {
        let readiness = SharedReadinessState::new(ReadinessState::new(
            true,
            true,
            ClusterReadiness::Configured {
                membership_established: false,
            },
        ));
        let response = response_for_request(b"GET /ready HTTP/1.1\r\n\r\n", &readiness)?;
        let response = String::from_utf8(response)?;

        assert_status(&response, 503);
        let body = json_body(&response)?;
        assert_eq!(
            body["unmet_conditions"][0],
            serde_json::to_value(ReadinessCondition::ClusterMembershipEstablished)?
        );

        Ok(())
    }

    // Paths other than `/health` and `/ready` answer 404.
    #[test]
    fn unsupported_paths_are_not_served() -> Result<(), Box<dyn std::error::Error>> {
        let readiness = SharedReadinessState::default();
        let response = response_for_request(b"GET /metrics HTTP/1.1\r\n\r\n", &readiness)?;
        let response = String::from_utf8(response)?;

        assert_status(&response, 404);

        Ok(())
    }

    // A non-GET method on a known path answers 405.
    #[test]
    fn unsupported_methods_on_health_paths_are_rejected() -> Result<(), Box<dyn std::error::Error>>
    {
        let readiness = SharedReadinessState::default();
        let response = response_for_request(b"POST /health HTTP/1.1\r\n\r\n", &readiness)?;
        let response = String::from_utf8(response)?;

        assert_status(&response, 405);

        Ok(())
    }
}