svc_utils/
metrics.rs

1use std::{net::SocketAddr, time::Duration};
2
3use axum::{extract::Extension, routing, routing::Router, Server};
4use hyper::{Body, Request, Response};
5use prometheus::{Encoder, Registry, TextEncoder};
6use tokio::{sync::oneshot, task::JoinHandle};
7use tower_http::trace::TraceLayer;
8use tracing::{error, field::Empty, info, warn, Span};
9
10/// Http server with graceful shutdown that serves prometheus metrics
11///
12/// Runs in a separate tokio task
13pub struct MetricsServer {
14    join_handle: JoinHandle<Result<(), hyper::Error>>,
15    closer: oneshot::Sender<()>,
16}
17
18impl MetricsServer {
19    /// Create new server with prometheus default registry. This will spawn a new tokio task.
20    ///
21    /// # Arguments
22    ///
23    /// * `registry` - prometheus registry to gather metrics from
24    /// * `bind_addr` - address to bind server to
25    pub fn new(bind_addr: SocketAddr) -> Self {
26        let app = Router::new().route("/metrics", routing::get(metrics_handler));
27
28        Self::new_(app, bind_addr)
29    }
30
31    /// Create new server with a given registry. This will spawn a new tokio task.
32    ///
33    /// # Arguments
34    ///
35    /// * `registry` - prometheus registry to gather metrics from
36    /// * `bind_addr` - address to bind server to
37    pub fn new_with_registry(registry: Registry, bind_addr: SocketAddr) -> Self {
38        let app = Router::new();
39
40        let app = app
41            .route("/metrics", routing::get(metrics_handler_with_registry))
42            .layer(Extension(registry));
43
44        Self::new_(app, bind_addr)
45    }
46
47    fn new_(app: Router, bind_addr: SocketAddr) -> Self {
48        let app = app.layer(
49            TraceLayer::new_for_http()
50                .make_span_with(|request: &Request<_>| {
51                    // TODO: Option will be recorded simpler
52                    // when https://github.com/tokio-rs/tracing/pull/1393 lands
53
54                    let span = tracing::info_span!(
55                        "http-metrics-request",
56                        status_code = Empty,
57                        path = request.uri().path(),
58                        query = Empty
59                    );
60                    if let Some(query) = request.uri().query() {
61                        // clippy in CI doesn't like the simple '&query' here
62                        span.record("query", &tracing::field::display(query));
63                    }
64                    span
65                })
66                .on_response(|response: &Response<_>, latency: Duration, span: &Span| {
67                    span.record("status_code", &tracing::field::display(response.status()));
68                    info!("response generated in {:?}", latency)
69                }),
70        );
71
72        let (closer, rx) = oneshot::channel::<()>();
73
74        let join_handle = tokio::task::spawn(async move {
75            Server::bind(&bind_addr)
76                .serve(app.into_make_service())
77                .with_graceful_shutdown(async {
78                    rx.await.ok();
79                })
80                .await
81        });
82
83        Self {
84            join_handle,
85            closer,
86        }
87    }
88
89    /// Shutdowns the server
90    pub async fn shutdown(self) {
91        info!("Received signal, triggering metrics server shutdown");
92
93        let _ = self.closer.send(());
94        let fut = tokio::time::timeout(Duration::from_secs(3), self.join_handle);
95
96        match fut.await {
97            Err(e) => {
98                error!("Metrics server timed out during shutdown, error = {:?}", e);
99            }
100            Ok(Err(e)) => {
101                error!("Metrics server failed during shutdown, error = {:?}", e);
102            }
103            Ok(Ok(_)) => {
104                info!("Metrics server successfully exited");
105            }
106        }
107    }
108}
109
110async fn metrics_handler() -> Response<Body> {
111    let mut buffer = vec![];
112    let encoder = TextEncoder::new();
113    let metric_families = prometheus::gather();
114    match encoder.encode(&metric_families, &mut buffer) {
115        Ok(_) => Response::builder().status(200).body(buffer.into()).unwrap(),
116        Err(err) => {
117            warn!("Metrics not gathered: {:?}", err);
118            Response::builder().status(500).body(vec![].into()).unwrap()
119        }
120    }
121}
122
123async fn metrics_handler_with_registry(state: Extension<Registry>) -> Response<Body> {
124    let registry = state.0;
125    let mut buffer = vec![];
126    let encoder = TextEncoder::new();
127    let metric_families = registry.gather();
128    match encoder.encode(&metric_families, &mut buffer) {
129        Ok(_) => Response::builder().status(200).body(buffer.into()).unwrap(),
130        Err(err) => {
131            warn!("Metrics not gathered: {:?}", err);
132            Response::builder().status(500).body(vec![].into()).unwrap()
133        }
134    }
135}