use std::net::SocketAddr;
use axum::Router;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use tokio_util::sync::CancellationToken;
use crate::metrics::SharedRegistry;
pub fn router(registry: SharedRegistry) -> Router {
Router::new()
.route("/metrics", get(metrics))
.with_state(registry)
}
pub(crate) async fn run(
addr: SocketAddr,
registry: SharedRegistry,
shutdown: CancellationToken,
) -> std::io::Result<SocketAddr> {
let listener = tokio::net::TcpListener::bind(addr).await?;
let bound = listener.local_addr()?;
tracing::info!(%bound, "metrics server listening");
let app = router(registry);
tokio::spawn(async move {
let server = axum::serve(listener, app).with_graceful_shutdown(async move {
shutdown.cancelled().await;
});
if let Err(e) = server.await {
tracing::warn!(error = %e, "metrics server error");
}
});
Ok(bound)
}
async fn metrics(State(registry): State<SharedRegistry>) -> impl IntoResponse {
let mut buf = String::new();
let r = registry.lock().await;
if let Err(e) = prometheus_client::encoding::text::encode(&mut buf, &r) {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("encode: {e}")).into_response();
}
(
StatusCode::OK,
[(
"content-type",
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)],
buf,
)
.into_response()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::BrokerMetrics;
use assert2::assert;
use axum::body::Body;
use axum::http::Request;
use tower::ServiceExt as _;
#[tokio::test]
async fn metrics_route_returns_openmetrics() {
let m = BrokerMetrics::new();
m.record_produce("t", 42);
let app = router(m.registry);
let resp = app
.oneshot(
Request::builder()
.uri("/metrics")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert!(resp.status() == StatusCode::OK);
let ct = resp
.headers()
.get("content-type")
.unwrap()
.to_str()
.unwrap();
assert!(ct.starts_with("application/openmetrics-text"), "ct={ct}");
let body = axum::body::to_bytes(resp.into_body(), 64 * 1024)
.await
.unwrap();
let s = std::str::from_utf8(&body).unwrap();
assert!(s.contains("crabka_broker_topic_bytes_in_total"), "{s}");
assert!(s.contains("42"), "{s}");
assert!(s.contains("# EOF"), "{s}");
}
}