1use std::io::{Read, Write};
2use std::net::{SocketAddr, TcpListener, TcpStream};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::thread::{self, JoinHandle};
6use std::time::Duration;
7
8use crate::ServerError;
9
10use super::checks::{SharedReadinessState, health_check, readiness_check};
11
/// Route that always answers 200 with the liveness JSON, independent of readiness.
const HEALTH_PATH: &str = "/health";
/// Route that answers 200 or 503 depending on the current readiness snapshot.
const READY_PATH: &str = "/ready";
/// `Content-Type` header value used for every JSON response body.
const APPLICATION_JSON: &str = "application/json";
/// Upper bound, in bytes, on how much of a request is read from one connection.
const READ_BUFFER_BYTES: usize = 2 * 1024;
16
17#[derive(Debug)]
19pub struct HealthServerHandle {
20 local_addr: SocketAddr,
21 shutdown: Arc<AtomicBool>,
22 worker: Option<JoinHandle<Result<(), ServerError>>>,
23}
24
25impl HealthServerHandle {
26 #[must_use]
28 pub const fn local_addr(&self) -> SocketAddr {
29 self.local_addr
30 }
31
32 pub fn shutdown(mut self) -> Result<(), ServerError> {
39 self.stop_worker()
40 }
41
42 fn stop_worker(&mut self) -> Result<(), ServerError> {
43 self.shutdown.store(true, Ordering::SeqCst);
44 let Some(worker) = self.worker.take() else {
45 return Ok(());
46 };
47
48 worker.join().map_err(|_| ServerError::HealthEndpoint {
49 message: "health endpoint worker thread terminated unexpectedly".to_owned(),
50 })?
51 }
52}
53
54impl Drop for HealthServerHandle {
55 fn drop(&mut self) {
56 if let Err(error) = self.stop_worker() {
57 tracing::debug!(%error, "health endpoint shutdown during drop failed");
58 }
59 }
60}
61
62pub fn start_health_server(
73 bind_address: SocketAddr,
74 readiness: SharedReadinessState,
75) -> Result<HealthServerHandle, ServerError> {
76 let listener =
77 TcpListener::bind(bind_address).map_err(|error| ServerError::HealthEndpoint {
78 message: format!("failed to bind health endpoint at {bind_address}: {error}"),
79 })?;
80 listener
81 .set_nonblocking(true)
82 .map_err(|error| ServerError::HealthEndpoint {
83 message: format!("failed to configure health endpoint listener: {error}"),
84 })?;
85 let local_addr = listener
86 .local_addr()
87 .map_err(|error| ServerError::HealthEndpoint {
88 message: format!("failed to inspect health endpoint listener address: {error}"),
89 })?;
90 let shutdown = Arc::new(AtomicBool::new(false));
91 let worker_shutdown = Arc::clone(&shutdown);
92 let worker = thread::spawn(move || serve(&listener, &readiness, &worker_shutdown));
93
94 Ok(HealthServerHandle {
95 local_addr,
96 shutdown,
97 worker: Some(worker),
98 })
99}
100
101fn serve(
102 listener: &TcpListener,
103 readiness: &SharedReadinessState,
104 shutdown: &AtomicBool,
105) -> Result<(), ServerError> {
106 while !shutdown.load(Ordering::SeqCst) {
107 match listener.accept() {
108 Ok((stream, ..)) => {
109 if let Err(error) = handle_connection(stream, readiness) {
115 tracing::debug!(%error, "health endpoint connection error");
116 }
117 }
118 Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
119 thread::sleep(Duration::from_millis(10));
120 }
121 Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {}
122 Err(error) => {
123 return Err(ServerError::HealthEndpoint {
124 message: format!("health endpoint accept failed: {error}"),
125 });
126 }
127 }
128 }
129
130 Ok(())
131}
132
133fn handle_connection(
134 mut stream: TcpStream,
135 readiness: &SharedReadinessState,
136) -> Result<(), ServerError> {
137 stream
138 .set_nonblocking(false)
139 .map_err(|error| ServerError::HealthEndpoint {
140 message: format!("failed to configure health request stream: {error}"),
141 })?;
142 stream
143 .set_read_timeout(Some(Duration::from_secs(2)))
144 .map_err(|error| ServerError::HealthEndpoint {
145 message: format!("failed to set health request read timeout: {error}"),
146 })?;
147
148 let mut buffer = [0_u8; READ_BUFFER_BYTES];
149 let bytes_read = stream
150 .read(&mut buffer)
151 .map_err(|error| ServerError::HealthEndpoint {
152 message: format!("failed to read health request: {error}"),
153 })?;
154
155 if bytes_read == 0 {
156 return Ok(());
157 }
158
159 let response = response_for_request(&buffer[..bytes_read], readiness)?;
160 stream
161 .write_all(&response)
162 .map_err(|error| ServerError::HealthEndpoint {
163 message: format!("failed to write health response: {error}"),
164 })?;
165 stream.flush().map_err(|error| ServerError::HealthEndpoint {
166 message: format!("failed to flush health response: {error}"),
167 })
168}
169
170fn response_for_request(
171 request: &[u8],
172 readiness: &SharedReadinessState,
173) -> Result<Vec<u8>, ServerError> {
174 let Ok(request) = std::str::from_utf8(request) else {
175 return Ok(empty_response(StatusCode::BadRequest));
176 };
177 let Some((method, path)) = parse_request_line(request) else {
178 return Ok(empty_response(StatusCode::BadRequest));
179 };
180
181 match (method, path) {
182 ("GET", HEALTH_PATH) => json_response(StatusCode::Ok, &health_check()),
183 ("GET", READY_PATH) => {
184 let status = readiness_check(&readiness.snapshot());
185 let status_code = if status.ready {
186 StatusCode::Ok
187 } else {
188 StatusCode::ServiceUnavailable
189 };
190 json_response(status_code, &status)
191 }
192 (_, HEALTH_PATH | READY_PATH) => Ok(empty_response(StatusCode::MethodNotAllowed)),
193 _ => Ok(empty_response(StatusCode::NotFound)),
194 }
195}
196
/// Splits an HTTP request line into its method and the path of its request
/// target.
///
/// Only the first line of `request` is inspected. It must contain at least
/// three whitespace-separated tokens (method, target, version); anything after
/// the third token is ignored. A query string (`?…`) is stripped from the
/// target so routing matches on the path alone. For example,
/// `GET /ready?verbose=1 HTTP/1.1` yields `("GET", "/ready")`.
///
/// Returns `None` when `request` is empty or its first line has fewer than
/// three tokens.
fn parse_request_line(request: &str) -> Option<(&str, &str)> {
    let request_line = request.lines().next()?;
    let mut parts = request_line.split_whitespace();
    let method = parts.next()?;
    let target = parts.next()?;
    // The HTTP version is required for a well-formed request line but not used.
    parts.next()?;

    let path = target.split_once('?').map_or(target, |(path, _query)| path);
    Some((method, path))
}
206
207fn json_response<T>(status: StatusCode, value: &T) -> Result<Vec<u8>, ServerError>
208where
209 T: serde::Serialize,
210{
211 let body = serde_json::to_vec(value).map_err(|error| ServerError::HealthEndpoint {
212 message: format!("failed to serialize health response: {error}"),
213 })?;
214 Ok(response(status, Some(APPLICATION_JSON), &body))
215}
216
217fn empty_response(status: StatusCode) -> Vec<u8> {
218 response(status, None, &[])
219}
220
221fn response(status: StatusCode, content_type: Option<&str>, body: &[u8]) -> Vec<u8> {
222 let mut response = Vec::new();
223 let status_line = format!("HTTP/1.1 {} {}\r\n", status.code(), status.reason());
224 response.extend_from_slice(status_line.as_bytes());
225 response.extend_from_slice(format!("Content-Length: {}\r\n", body.len()).as_bytes());
226 response.extend_from_slice(b"Connection: close\r\n");
227 if let Some(content_type) = content_type {
228 response.extend_from_slice(format!("Content-Type: {content_type}\r\n").as_bytes());
229 }
230 response.extend_from_slice(b"\r\n");
231 response.extend_from_slice(body);
232 response
233}
234
/// The HTTP statuses the health endpoint can emit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatusCode {
    /// 200: the request succeeded (health, or readiness when ready).
    Ok,
    /// 400: the request line was missing, not UTF-8, or malformed.
    BadRequest,
    /// 404: the path is not one of the served routes.
    NotFound,
    /// 405: a served route was requested with an unsupported method.
    MethodNotAllowed,
    /// 503: readiness was requested while startup gates are unmet.
    ServiceUnavailable,
}

impl StatusCode {
    /// Numeric code and reason phrase written in the status line, kept in one
    /// table so the two can never drift apart.
    const fn parts(self) -> (u16, &'static str) {
        match self {
            Self::Ok => (200, "OK"),
            Self::BadRequest => (400, "Bad Request"),
            Self::NotFound => (404, "Not Found"),
            Self::MethodNotAllowed => (405, "Method Not Allowed"),
            Self::ServiceUnavailable => (503, "Service Unavailable"),
        }
    }

    /// Numeric status code, e.g. `404`.
    const fn code(self) -> u16 {
        self.parts().0
    }

    /// Reason phrase, e.g. `"Not Found"`.
    const fn reason(self) -> &'static str {
        self.parts().1
    }
}
265
#[cfg(test)]
mod tests {
    //! Tests for the health endpoint: end-to-end requests over loopback TCP
    //! against a real server, plus direct calls to `response_for_request` for
    //! routing cases that need no socket.

    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream};
    use std::time::Duration;

    use serde_json::Value;

    use super::{response_for_request, start_health_server};
    use crate::health::checks::{
        ClusterReadiness, ReadinessCondition, ReadinessState, SharedReadinessState,
    };

    /// Loopback address with port 0, so each test server gets an OS-assigned port.
    fn loopback_ephemeral() -> Result<SocketAddr, Box<dyn std::error::Error>> {
        Ok("127.0.0.1:0".parse()?)
    }

    /// Sends a minimal HTTP/1.1 `GET path` to `address` and returns the raw
    /// response text.
    ///
    /// Reads until EOF. The server drops the connection after one response, so
    /// EOF marks the end of the response. The 2-second read timeout makes a
    /// hung server fail the test instead of blocking it.
    fn get(address: SocketAddr, path: &str) -> Result<String, Box<dyn std::error::Error>> {
        let mut stream = TcpStream::connect(address)?;
        stream.set_read_timeout(Some(Duration::from_secs(2)))?;
        let request = format!("GET {path} HTTP/1.1\r\nHost: localhost\r\n\r\n");
        stream.write_all(request.as_bytes())?;

        let mut response = String::new();
        stream.read_to_string(&mut response)?;
        Ok(response)
    }

    /// Asserts that `response` starts with the `HTTP/1.1 <status> ` status line prefix.
    fn assert_status(response: &str, status: u16) {
        let expected = format!("HTTP/1.1 {status} ");
        assert!(
            response.starts_with(&expected),
            "response status did not start with {expected}: {response}"
        );
    }

    /// Returns everything after the first blank line (`\r\n\r\n`) of `response`.
    fn body(response: &str) -> Result<&str, Box<dyn std::error::Error>> {
        let Some((_headers, body)) = response.split_once("\r\n\r\n") else {
            return Err("response did not contain a header/body separator".into());
        };
        Ok(body)
    }

    /// Parses the body of `response` as JSON.
    fn json_body(response: &str) -> Result<Value, Box<dyn std::error::Error>> {
        Ok(serde_json::from_str(body(response)?)?)
    }

    /// `/health` reports healthy even with default readiness state, which
    /// another test below shows answers `/ready` with 503.
    #[test]
    fn health_endpoint_returns_json_200_regardless_of_readiness()
    -> Result<(), Box<dyn std::error::Error>> {
        let readiness = SharedReadinessState::new(ReadinessState::default());
        let server = start_health_server(loopback_ephemeral()?, readiness)?;

        let response = get(server.local_addr(), "/health")?;
        server.shutdown()?;

        assert_status(&response, 200);
        assert!(response.contains("Content-Type: application/json\r\n"));
        let body = json_body(&response)?;
        assert_eq!(body["status"], "healthy");

        Ok(())
    }

    /// With the listener gate unmet, `/ready` answers 503 and lists
    /// `listener_bound` first among the unmet conditions.
    #[test]
    fn ready_endpoint_returns_503_before_main_listener_binds()
    -> Result<(), Box<dyn std::error::Error>> {
        // NOTE(review): argument order presumed to be (config_loaded,
        // listener_bound, cluster), per the test name and assertions; confirm
        // against `ReadinessState::new`.
        let readiness = SharedReadinessState::new(ReadinessState::new(
            true,
            false,
            ClusterReadiness::NotConfigured,
        ));
        let server = start_health_server(loopback_ephemeral()?, readiness)?;

        let response = get(server.local_addr(), "/ready")?;
        server.shutdown()?;

        assert_status(&response, 503);
        assert!(response.contains("Content-Type: application/json\r\n"));
        let body = json_body(&response)?;
        assert_eq!(body["ready"], false);
        assert_eq!(body["unmet_conditions"][0], "listener_bound");

        Ok(())
    }

    /// Once every startup gate is satisfied, `/ready` answers 200 with an empty
    /// `unmet_conditions` array.
    #[test]
    fn ready_endpoint_returns_200_after_all_startup_gates() -> Result<(), Box<dyn std::error::Error>>
    {
        let readiness = SharedReadinessState::new(ReadinessState::ready_without_cluster());
        let server = start_health_server(loopback_ephemeral()?, readiness)?;

        let response = get(server.local_addr(), "/ready")?;
        server.shutdown()?;

        assert_status(&response, 200);
        let body = json_body(&response)?;
        assert_eq!(body["ready"], true);
        let Some(unmet_conditions) = body["unmet_conditions"].as_array() else {
            return Err("unmet_conditions should be an array".into());
        };
        assert!(unmet_conditions.is_empty());

        Ok(())
    }

    /// Updates made through a clone of the shared readiness state after the
    /// server starts are reflected in later `/ready` responses.
    #[test]
    fn ready_endpoint_updates_from_shared_readiness_state() -> Result<(), Box<dyn std::error::Error>>
    {
        let readiness = SharedReadinessState::new(ReadinessState::default());
        // The server gets a clone; the test keeps `readiness` to mutate it below.
        let server = start_health_server(loopback_ephemeral()?, readiness.clone())?;

        let response = get(server.local_addr(), "/ready")?;
        assert_status(&response, 503);

        readiness.set_config_loaded(true);
        readiness.set_listener_bound(true);
        let response = get(server.local_addr(), "/ready")?;
        server.shutdown()?;

        assert_status(&response, 200);

        Ok(())
    }

    /// With config and listener gates met but cluster membership not yet
    /// established, the cluster condition is reported as unmet. This calls
    /// `response_for_request` directly, without a socket.
    #[test]
    fn cluster_readiness_is_listed_when_configured_but_not_joined()
    -> Result<(), Box<dyn std::error::Error>> {
        let readiness = SharedReadinessState::new(ReadinessState::new(
            true,
            true,
            ClusterReadiness::Configured {
                membership_established: false,
            },
        ));
        let response = response_for_request(b"GET /ready HTTP/1.1\r\n\r\n", &readiness)?;
        let response = String::from_utf8(response)?;

        assert_status(&response, 503);
        let body = json_body(&response)?;
        // Compare against the condition's own serialization rather than a
        // hard-coded string, so a rename of its wire form cannot drift.
        assert_eq!(
            body["unmet_conditions"][0],
            serde_json::to_value(ReadinessCondition::ClusterMembershipEstablished)?
        );

        Ok(())
    }

    /// Paths other than `/health` and `/ready` answer 404. Readiness contents
    /// do not matter for this route.
    #[test]
    fn unsupported_paths_are_not_served() -> Result<(), Box<dyn std::error::Error>> {
        let readiness = SharedReadinessState::default();
        let response = response_for_request(b"GET /metrics HTTP/1.1\r\n\r\n", &readiness)?;
        let response = String::from_utf8(response)?;

        assert_status(&response, 404);

        Ok(())
    }

    /// Non-GET methods on the served paths answer 405 Method Not Allowed.
    #[test]
    fn unsupported_methods_on_health_paths_are_rejected() -> Result<(), Box<dyn std::error::Error>>
    {
        let readiness = SharedReadinessState::default();
        let response = response_for_request(b"POST /health HTTP/1.1\r\n\r\n", &readiness)?;
        let response = String::from_utf8(response)?;

        assert_status(&response, 405);

        Ok(())
    }
}