// polyc_runtime/health.rs

//! The `axum` side-server (PRD §11): liveness, readiness, and metrics on a
//! separate port from a binary's main surface.

use std::{
    net::SocketAddr,
    sync::{
        Arc, LazyLock,
        atomic::{AtomicBool, Ordering},
    },
};

use anyhow::Context as _;
use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
use prometheus::{Encoder, IntGauge, TextEncoder, register_int_gauge};
use tokio_util::sync::CancellationToken;

16/// `1` while the process is up; registered to the default registry so it is
17/// always present in a `/metrics` scrape.
18static BUILD_INFO: LazyLock<IntGauge> = LazyLock::new(|| {
19    let gauge = register_int_gauge!("polychrome_build_info", "Always 1 for a live process")
20        .expect("register polychrome_build_info");
21    gauge.set(1);
22    gauge
23});
24
/// Shared readiness flag, flipped on once the process is wired up and flipped
/// off as shutdown begins.
///
/// Cloning only bumps a reference count. Every clone observes the same flag,
/// so one handle can go to the side-server while another stays with the code
/// that decides readiness.
#[derive(Clone, Debug, Default)]
pub struct Health {
    /// `true` while `/readyz` should answer 200; starts `false` (the
    /// `Default` for `AtomicBool`).
    ready: Arc<AtomicBool>,
}

32impl Health {
33    /// A fresh, not-yet-ready health handle.
34    #[must_use]
35    pub fn new() -> Self {
36        Self::default()
37    }
38
39    /// Set readiness; `/readyz` returns 200 only while `true`.
40    pub fn set_ready(&self, ready: bool) {
41        self.ready.store(ready, Ordering::Relaxed);
42    }
43}
44
45/// Serve the side-server until `shutdown` is cancelled, binding `addr` here.
46///
47/// Callers that gate readiness on their listeners actually binding should
48/// bind up front and use [`serve_on`] instead.
49///
50/// # Errors
51///
52/// Returns an error if the listener cannot bind or `axum` serving fails.
53pub async fn serve(
54    addr: SocketAddr,
55    health: Health,
56    shutdown: CancellationToken,
57) -> anyhow::Result<()> {
58    let listener = tokio::net::TcpListener::bind(addr).await?;
59    serve_on(listener, health, shutdown).await
60}
61
62/// Serve the side-server on an already-bound listener until `shutdown` is
63/// cancelled. Binding is the caller's job, so readiness can be flipped on
64/// only once the socket actually exists.
65///
66/// # Errors
67///
68/// Returns an error if `axum` serving fails.
69pub async fn serve_on(
70    listener: tokio::net::TcpListener,
71    health: Health,
72    shutdown: CancellationToken,
73) -> anyhow::Result<()> {
74    let app = Router::new()
75        .route("/healthz", get(|| async { StatusCode::OK }))
76        .route("/livez", get(|| async { StatusCode::OK }))
77        .route("/readyz", get(readyz))
78        .route("/metrics", get(metrics))
79        .with_state(health);
80
81    let addr = listener.local_addr()?;
82    tracing::info!(%addr, "side-server listening");
83    axum::serve(listener, app)
84        .with_graceful_shutdown(async move { shutdown.cancelled().await })
85        .await?;
86    Ok(())
87}
88
89/// 200 once the process is ready, else 503.
90async fn readyz(State(health): State<Health>) -> impl IntoResponse {
91    if health.ready.load(Ordering::Relaxed) {
92        StatusCode::OK
93    } else {
94        StatusCode::SERVICE_UNAVAILABLE
95    }
96}
97
98/// Prometheus scrape target: encodes the default registry (process build info
99/// plus whatever the binary has registered).
100async fn metrics() -> impl IntoResponse {
101    LazyLock::force(&BUILD_INFO);
102    let families = prometheus::gather();
103    let mut buf = Vec::new();
104    let encoder = TextEncoder::new();
105    if encoder.encode(&families, &mut buf).is_err() {
106        return (StatusCode::INTERNAL_SERVER_ERROR, Vec::new()).into_response();
107    }
108    (
109        [(axum::http::header::CONTENT_TYPE, encoder.format_type())],
110        buf,
111    )
112        .into_response()
113}
114
#[cfg(test)]
mod tests {
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]

    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio_util::sync::CancellationToken;

    use super::{Health, serve_on};

    /// Send one `GET` over a fresh HTTP/1.1 connection and return the
    /// response's status line (empty if nothing came back).
    async fn status_line(addr: std::net::SocketAddr, path: &str) -> String {
        let request =
            format!("GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
        let mut conn = tokio::net::TcpStream::connect(addr).await.unwrap();
        conn.write_all(request.as_bytes()).await.unwrap();

        // `Connection: close` asks the server to hang up after one response,
        // so reading to EOF terminates.
        let mut response = String::new();
        conn.read_to_string(&mut response).await.unwrap();
        response.lines().next().map(str::to_owned).unwrap_or_default()
    }

    #[tokio::test]
    async fn serve_on_takes_a_prebound_listener_and_readyz_tracks_the_flag() {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let server = tokio::spawn(serve_on(listener, health.clone(), shutdown.clone()));

        // The socket is bound before the serving task is ever polled (the
        // bind-then-ready contract); readiness itself is still the flag's job.
        assert!(status_line(addr, "/readyz").await.contains("503"));
        assert!(status_line(addr, "/livez").await.contains("200"));

        for (ready, expected) in [(true, "200"), (false, "503")] {
            health.set_ready(ready);
            let line = status_line(addr, "/readyz").await;
            assert!(line.contains(expected), "ready={ready}: got {line:?}");
        }

        shutdown.cancel();
        server.await.unwrap().unwrap();
    }
}
159}