polyc-runtime 0.1.3

Shared Unix-coherence runtime for polychrome binaries: logging, health/metrics side-server, signals.
Documentation
//! The `axum` side-server (PRD §11): liveness, readiness, and metrics served
//! on a port separate from the binary's main surface.

use std::{
    net::SocketAddr,
    sync::{
        Arc, LazyLock,
        atomic::{AtomicBool, Ordering},
    },
};

use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
use prometheus::{Encoder, IntGauge, TextEncoder, register_int_gauge};
use tokio_util::sync::CancellationToken;

/// Process build-info gauge, held at `1` for as long as the process runs.
///
/// It is registered in the default `prometheus` registry, so once forced it
/// shows up in every `/metrics` scrape.
static BUILD_INFO: LazyLock<IntGauge> = LazyLock::new(|| {
    register_int_gauge!("polychrome_build_info", "Always 1 for a live process")
        .inspect(|gauge| gauge.set(1))
        .expect("register polychrome_build_info")
});

/// Shared readiness flag, flipped on once the process is wired up and flipped
/// off as shutdown begins.
///
/// Cloning is cheap, and every clone observes the same flag. One handle can be
/// given to the side-server while another stays with the code that drives
/// readiness.
#[derive(Clone, Debug, Default)]
pub struct Health {
    ready: Arc<AtomicBool>,
}

impl Health {
    /// A fresh, not-yet-ready health handle.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set readiness; `/readyz` returns 200 only while `true`.
    pub fn set_ready(&self, ready: bool) {
        // The flag publishes no other data alongside it, so `Relaxed` suffices.
        self.ready.store(ready, Ordering::Relaxed);
    }

    /// Current readiness, i.e. whether `/readyz` would answer 200 right now.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        self.ready.load(Ordering::Relaxed)
    }
}

/// Serve the side-server until `shutdown` is cancelled, binding `addr` here.
///
/// Callers that gate readiness on their listeners actually binding should
/// bind up front and use [`serve_on`] instead.
///
/// # Errors
///
/// Returns an error if the listener cannot bind or `axum` serving fails.
pub async fn serve(
    addr: SocketAddr,
    health: Health,
    shutdown: CancellationToken,
) -> anyhow::Result<()> {
    let listener = tokio::net::TcpListener::bind(addr).await?;
    serve_on(listener, health, shutdown).await
}

/// Serve the side-server on a listener the caller has already bound, until
/// `shutdown` is cancelled.
///
/// The caller does the binding, so readiness can be switched on only once the
/// socket really exists.
///
/// # Errors
///
/// Returns an error if the listener's local address cannot be read or if
/// `axum` serving fails.
pub async fn serve_on(
    listener: tokio::net::TcpListener,
    health: Health,
    shutdown: CancellationToken,
) -> anyhow::Result<()> {
    let bound = listener.local_addr()?;

    // Routes: two liveness aliases, the readiness probe, and the scrape target.
    let router = Router::new()
        .route("/healthz", get(|| async { StatusCode::OK }))
        .route("/livez", get(|| async { StatusCode::OK }))
        .route("/readyz", get(readyz))
        .route("/metrics", get(metrics))
        .with_state(health);

    tracing::info!(addr = %bound, "side-server listening");

    let on_shutdown = async move { shutdown.cancelled().await };
    axum::serve(listener, router)
        .with_graceful_shutdown(on_shutdown)
        .await?;
    Ok(())
}

/// Readiness probe: 200 while the shared readiness flag is set, 503 otherwise.
async fn readyz(State(health): State<Health>) -> impl IntoResponse {
    let is_ready = health.ready.load(Ordering::Relaxed);
    if !is_ready {
        return StatusCode::SERVICE_UNAVAILABLE;
    }
    StatusCode::OK
}

/// Prometheus scrape target: encodes the default registry (process build info
/// plus whatever the binary has registered).
async fn metrics() -> impl IntoResponse {
    LazyLock::force(&BUILD_INFO);
    let families = prometheus::gather();
    let mut buf = Vec::new();
    let encoder = TextEncoder::new();
    if encoder.encode(&families, &mut buf).is_err() {
        return (StatusCode::INTERNAL_SERVER_ERROR, Vec::new()).into_response();
    }
    (
        [(axum::http::header::CONTENT_TYPE, encoder.format_type())],
        buf,
    )
        .into_response()
}

#[cfg(test)]
mod tests {
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]

    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio_util::sync::CancellationToken;

    use super::{Health, serve_on};

    /// Sends one `GET` over a raw HTTP/1.1 connection and returns the
    /// response's status line (empty if the server sent nothing back).
    async fn status_line(addr: std::net::SocketAddr, path: &str) -> String {
        let request =
            format!("GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
        let mut conn = tokio::net::TcpStream::connect(addr).await.unwrap();
        conn.write_all(request.as_bytes()).await.unwrap();

        let mut response = String::new();
        conn.read_to_string(&mut response).await.unwrap();
        response.lines().next().map(str::to_owned).unwrap_or_default()
    }

    #[tokio::test]
    async fn serve_on_takes_a_prebound_listener_and_readyz_tracks_the_flag() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(serve_on(listener, health.clone(), shutdown.clone()));

        // The socket already exists before the serving task is first polled
        // (the bind-then-ready contract). Readiness is still the flag's job.
        assert!(status_line(addr, "/readyz").await.contains("503"));
        assert!(status_line(addr, "/livez").await.contains("200"));
        for (ready, expected) in [(true, "200"), (false, "503")] {
            health.set_ready(ready);
            assert!(status_line(addr, "/readyz").await.contains(expected));
        }

        shutdown.cancel();
        server.await.unwrap().unwrap();
    }
}