1use std::{
5 net::SocketAddr,
6 sync::{
7 Arc, LazyLock,
8 atomic::{AtomicBool, Ordering},
9 },
10};
11
12use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
13use prometheus::{Encoder, IntGauge, TextEncoder, register_int_gauge};
14use tokio_util::sync::CancellationToken;
15
16static BUILD_INFO: LazyLock<IntGauge> = LazyLock::new(|| {
19 let gauge = register_int_gauge!("polychrome_build_info", "Always 1 for a live process")
20 .expect("register polychrome_build_info");
21 gauge.set(1);
22 gauge
23});
24
/// Shared readiness flag reported by the `/readyz` endpoint.
///
/// Clones share the same underlying flag through an [`Arc`], so a value set
/// through one handle is observed through every other clone. A freshly created
/// `Health` starts out not ready.
#[derive(Clone, Debug, Default)]
pub struct Health {
    /// Readiness flag; `false` until [`Health::set_ready`] stores `true`.
    ready: Arc<AtomicBool>,
}

impl Health {
    /// Creates a handle whose readiness flag is initially `false`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the readiness flag to `ready`.
    ///
    /// The new value is visible through every clone of this handle.
    pub fn set_ready(&self, ready: bool) {
        // `Relaxed` is enough here: the flag is a standalone boolean and
        // nothing in this module relies on it to order other memory accesses.
        self.ready.store(ready, Ordering::Relaxed);
    }
}
44
45pub async fn serve(
54 addr: SocketAddr,
55 health: Health,
56 shutdown: CancellationToken,
57) -> anyhow::Result<()> {
58 let listener = tokio::net::TcpListener::bind(addr).await?;
59 serve_on(listener, health, shutdown).await
60}
61
62pub async fn serve_on(
70 listener: tokio::net::TcpListener,
71 health: Health,
72 shutdown: CancellationToken,
73) -> anyhow::Result<()> {
74 let app = Router::new()
75 .route("/healthz", get(|| async { StatusCode::OK }))
76 .route("/livez", get(|| async { StatusCode::OK }))
77 .route("/readyz", get(readyz))
78 .route("/metrics", get(metrics))
79 .with_state(health);
80
81 let addr = listener.local_addr()?;
82 tracing::info!(%addr, "side-server listening");
83 axum::serve(listener, app)
84 .with_graceful_shutdown(async move { shutdown.cancelled().await })
85 .await?;
86 Ok(())
87}
88
89async fn readyz(State(health): State<Health>) -> impl IntoResponse {
91 if health.ready.load(Ordering::Relaxed) {
92 StatusCode::OK
93 } else {
94 StatusCode::SERVICE_UNAVAILABLE
95 }
96}
97
98async fn metrics() -> impl IntoResponse {
101 LazyLock::force(&BUILD_INFO);
102 let families = prometheus::gather();
103 let mut buf = Vec::new();
104 let encoder = TextEncoder::new();
105 if encoder.encode(&families, &mut buf).is_err() {
106 return (StatusCode::INTERNAL_SERVER_ERROR, Vec::new()).into_response();
107 }
108 (
109 [(axum::http::header::CONTENT_TYPE, encoder.format_type())],
110 buf,
111 )
112 .into_response()
113}
114
#[cfg(test)]
mod tests {
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]

    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio_util::sync::CancellationToken;

    use super::{Health, serve_on};

    /// Sends a bare HTTP/1.1 `GET` for `path` to `addr` and returns the first
    /// line of the response (empty if the response has no lines).
    async fn status_line(addr: std::net::SocketAddr, path: &str) -> String {
        let request =
            format!("GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");

        let mut conn = tokio::net::TcpStream::connect(addr).await.unwrap();
        conn.write_all(request.as_bytes()).await.unwrap();

        // `Connection: close` asks the server to close the connection after
        // responding, so reading to EOF yields the whole response.
        let mut response = String::new();
        conn.read_to_string(&mut response).await.unwrap();
        response.lines().next().unwrap_or_default().to_owned()
    }

    #[tokio::test]
    async fn serve_on_takes_a_prebound_listener_and_readyz_tracks_the_flag() {
        let health = Health::new();
        let shutdown = CancellationToken::new();

        // Port 0 lets the OS pick a free port; read it back before handing
        // the listener to the server.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(serve_on(listener, health.clone(), shutdown.clone()));

        // A fresh `Health` is not ready, while liveness is unconditional.
        assert!(status_line(addr, "/readyz").await.contains("503"));
        assert!(status_line(addr, "/livez").await.contains("200"));

        // Readiness follows the flag in both directions.
        health.set_ready(true);
        assert!(status_line(addr, "/readyz").await.contains("200"));
        health.set_ready(false);
        assert!(status_line(addr, "/readyz").await.contains("503"));

        // Cancelling the token must let the server task finish cleanly.
        shutdown.cancel();
        server.await.unwrap().unwrap();
    }
}