1use std::{net::SocketAddr, time::Duration};
2
3use axum::{extract::Extension, routing, routing::Router, Server};
4use hyper::{Body, Request, Response};
5use prometheus::{Encoder, Registry, TextEncoder};
6use tokio::{sync::oneshot, task::JoinHandle};
7use tower_http::trace::TraceLayer;
8use tracing::{error, field::Empty, info, warn, Span};
9
10pub struct MetricsServer {
14 join_handle: JoinHandle<Result<(), hyper::Error>>,
15 closer: oneshot::Sender<()>,
16}
17
18impl MetricsServer {
19 pub fn new(bind_addr: SocketAddr) -> Self {
26 let app = Router::new().route("/metrics", routing::get(metrics_handler));
27
28 Self::new_(app, bind_addr)
29 }
30
31 pub fn new_with_registry(registry: Registry, bind_addr: SocketAddr) -> Self {
38 let app = Router::new();
39
40 let app = app
41 .route("/metrics", routing::get(metrics_handler_with_registry))
42 .layer(Extension(registry));
43
44 Self::new_(app, bind_addr)
45 }
46
47 fn new_(app: Router, bind_addr: SocketAddr) -> Self {
48 let app = app.layer(
49 TraceLayer::new_for_http()
50 .make_span_with(|request: &Request<_>| {
51 let span = tracing::info_span!(
55 "http-metrics-request",
56 status_code = Empty,
57 path = request.uri().path(),
58 query = Empty
59 );
60 if let Some(query) = request.uri().query() {
61 span.record("query", &tracing::field::display(query));
63 }
64 span
65 })
66 .on_response(|response: &Response<_>, latency: Duration, span: &Span| {
67 span.record("status_code", &tracing::field::display(response.status()));
68 info!("response generated in {:?}", latency)
69 }),
70 );
71
72 let (closer, rx) = oneshot::channel::<()>();
73
74 let join_handle = tokio::task::spawn(async move {
75 Server::bind(&bind_addr)
76 .serve(app.into_make_service())
77 .with_graceful_shutdown(async {
78 rx.await.ok();
79 })
80 .await
81 });
82
83 Self {
84 join_handle,
85 closer,
86 }
87 }
88
89 pub async fn shutdown(self) {
91 info!("Received signal, triggering metrics server shutdown");
92
93 let _ = self.closer.send(());
94 let fut = tokio::time::timeout(Duration::from_secs(3), self.join_handle);
95
96 match fut.await {
97 Err(e) => {
98 error!("Metrics server timed out during shutdown, error = {:?}", e);
99 }
100 Ok(Err(e)) => {
101 error!("Metrics server failed during shutdown, error = {:?}", e);
102 }
103 Ok(Ok(_)) => {
104 info!("Metrics server successfully exited");
105 }
106 }
107 }
108}
109
110async fn metrics_handler() -> Response<Body> {
111 let mut buffer = vec![];
112 let encoder = TextEncoder::new();
113 let metric_families = prometheus::gather();
114 match encoder.encode(&metric_families, &mut buffer) {
115 Ok(_) => Response::builder().status(200).body(buffer.into()).unwrap(),
116 Err(err) => {
117 warn!("Metrics not gathered: {:?}", err);
118 Response::builder().status(500).body(vec![].into()).unwrap()
119 }
120 }
121}
122
123async fn metrics_handler_with_registry(state: Extension<Registry>) -> Response<Body> {
124 let registry = state.0;
125 let mut buffer = vec![];
126 let encoder = TextEncoder::new();
127 let metric_families = registry.gather();
128 match encoder.encode(&metric_families, &mut buffer) {
129 Ok(_) => Response::builder().status(200).body(buffer.into()).unwrap(),
130 Err(err) => {
131 warn!("Metrics not gathered: {:?}", err);
132 Response::builder().status(500).body(vec![].into()).unwrap()
133 }
134 }
135}