use std::{
net::SocketAddr,
sync::{
Arc, LazyLock,
atomic::{AtomicBool, Ordering},
},
};
use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
use prometheus::{Encoder, IntGauge, TextEncoder, register_int_gauge};
use tokio_util::sync::CancellationToken;
/// `polychrome_build_info` gauge, registered on first access and pinned to `1`.
///
/// Initialization is lazy, so the metric only exists once something forces
/// this static (the `/metrics` handler does so before gathering).
///
/// # Panics
///
/// First access panics if the gauge cannot be registered.
static BUILD_INFO: LazyLock<IntGauge> = LazyLock::new(|| {
    register_int_gauge!("polychrome_build_info", "Always 1 for a live process")
        // Set the constant value while the gauge is still wrapped in the `Result`.
        .inspect(|build_info| build_info.set(1))
        .expect("register polychrome_build_info")
});
/// Cloneable handle to a shared readiness flag.
///
/// Every clone points at the same underlying flag, so a value flipped through
/// one handle is observed through all of them. A freshly created handle
/// reports "not ready".
#[derive(Clone, Debug, Default)]
pub struct Health {
    /// `true` while the process should be reported as ready.
    ready: Arc<AtomicBool>,
}

impl Health {
    /// Creates a handle whose readiness flag starts out `false`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Marks the process as ready (`true`) or not ready (`false`).
    ///
    /// The flag is an independent boolean, so the store uses relaxed ordering.
    pub fn set_ready(&self, ready: bool) {
        self.ready.store(ready, Ordering::Relaxed);
    }
}
pub async fn serve(
addr: SocketAddr,
health: Health,
shutdown: CancellationToken,
) -> anyhow::Result<()> {
let listener = tokio::net::TcpListener::bind(addr).await?;
serve_on(listener, health, shutdown).await
}
pub async fn serve_on(
listener: tokio::net::TcpListener,
health: Health,
shutdown: CancellationToken,
) -> anyhow::Result<()> {
let app = Router::new()
.route("/healthz", get(|| async { StatusCode::OK }))
.route("/livez", get(|| async { StatusCode::OK }))
.route("/readyz", get(readyz))
.route("/metrics", get(metrics))
.with_state(health);
let addr = listener.local_addr()?;
tracing::info!(%addr, "side-server listening");
axum::serve(listener, app)
.with_graceful_shutdown(async move { shutdown.cancelled().await })
.await?;
Ok(())
}
/// `/readyz` handler: `200 OK` while the shared readiness flag is set,
/// `503 Service Unavailable` otherwise.
async fn readyz(State(health): State<Health>) -> impl IntoResponse {
    let is_ready = health.ready.load(Ordering::Relaxed);
    if !is_ready {
        return StatusCode::SERVICE_UNAVAILABLE;
    }
    StatusCode::OK
}
async fn metrics() -> impl IntoResponse {
LazyLock::force(&BUILD_INFO);
let families = prometheus::gather();
let mut buf = Vec::new();
let encoder = TextEncoder::new();
if encoder.encode(&families, &mut buf).is_err() {
return (StatusCode::INTERNAL_SERVER_ERROR, Vec::new()).into_response();
}
(
[(axum::http::header::CONTENT_TYPE, encoder.format_type())],
buf,
)
.into_response()
}
#[cfg(test)]
mod tests {
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio_util::sync::CancellationToken;

    use super::{Health, serve_on};

    /// Issues a minimal `GET` for `path` over a fresh TCP connection and
    /// returns the first line of the response (empty if nothing came back).
    async fn status_line(addr: std::net::SocketAddr, path: &str) -> String {
        let request =
            format!("GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");

        let mut conn = tokio::net::TcpStream::connect(addr).await.unwrap();
        conn.write_all(request.as_bytes()).await.unwrap();

        // The request asks for `Connection: close`, so reading to EOF yields
        // the whole response.
        let mut response = String::new();
        conn.read_to_string(&mut response).await.unwrap();

        response
            .lines()
            .next()
            .map(str::to_owned)
            .unwrap_or_default()
    }

    #[tokio::test]
    async fn serve_on_takes_a_prebound_listener_and_readyz_tracks_the_flag() {
        // Port 0 lets the OS pick a free port; read the real address back.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let shutdown = CancellationToken::new();
        let health = Health::new();
        let server = tokio::spawn(serve_on(listener, health.clone(), shutdown.clone()));

        // A new handle is not ready, while liveness is unconditional.
        assert!(status_line(addr, "/readyz").await.contains("503"));
        assert!(status_line(addr, "/livez").await.contains("200"));

        // Readiness follows the flag in both directions.
        health.set_ready(true);
        assert!(status_line(addr, "/readyz").await.contains("200"));
        health.set_ready(false);
        assert!(status_line(addr, "/readyz").await.contains("503"));

        // Cancelling the token must make the server task finish cleanly.
        shutdown.cancel();
        server.await.unwrap().unwrap();
    }
}