richat_shared/
metrics.rs

1use {
2    crate::config::ConfigMetrics,
3    http_body_util::{combinators::BoxBody, BodyExt, Empty as BodyEmpty, Full as BodyFull},
4    hyper::{
5        body::{Bytes, Incoming as BodyIncoming},
6        service::service_fn,
7        Request, Response, StatusCode,
8    },
9    hyper_util::{
10        rt::tokio::{TokioExecutor, TokioIo},
11        server::conn::auto::Builder as ServerBuilder,
12    },
13    prometheus::{proto::MetricFamily, TextEncoder},
14    std::{convert::Infallible, future::Future},
15    tokio::{net::TcpListener, task::JoinError},
16    tracing::{error, info},
17};
18
19pub async fn spawn_server(
20    ConfigMetrics { endpoint }: ConfigMetrics,
21    gather_metrics: impl Fn() -> Vec<MetricFamily> + Clone + Send + 'static,
22    shutdown: impl Future<Output = ()> + Send + 'static,
23) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
24    let listener = TcpListener::bind(endpoint).await?;
25    info!("start server at: {endpoint}");
26
27    Ok(tokio::spawn(async move {
28        tokio::pin!(shutdown);
29        loop {
30            let stream = tokio::select! {
31                maybe_conn = listener.accept() => {
32                    match maybe_conn {
33                        Ok((stream, _addr)) => stream,
34                        Err(error) => {
35                            error!("failed to accept new connection: {error}");
36                            break;
37                        }
38                    }
39                }
40                () = &mut shutdown => {
41                    info!("shutdown");
42                    break
43                },
44            };
45            let gather_metrics = gather_metrics.clone();
46            tokio::spawn(async move {
47                if let Err(error) = ServerBuilder::new(TokioExecutor::new())
48                    .serve_connection(
49                        TokioIo::new(stream),
50                        service_fn(move |req: Request<BodyIncoming>| {
51                            let gather_metrics = gather_metrics.clone();
52                            async move {
53                                match req.uri().path() {
54                                    "/metrics" => metrics_handler(&gather_metrics()),
55                                    _ => not_found_handler(),
56                                }
57                            }
58                        }),
59                    )
60                    .await
61                {
62                    error!("failed to handle request: {error}");
63                }
64            });
65        }
66    }))
67}
68
69fn metrics_handler(
70    metric_families: &[MetricFamily],
71) -> http::Result<Response<BoxBody<Bytes, Infallible>>> {
72    let metrics = TextEncoder::new()
73        .encode_to_string(metric_families)
74        .unwrap_or_else(|error| {
75            error!("could not encode custom metrics: {}", error);
76            String::new()
77        });
78    Response::builder()
79        .status(StatusCode::OK)
80        .body(BodyFull::new(Bytes::from(metrics)).boxed())
81}
82
83fn not_found_handler() -> http::Result<Response<BoxBody<Bytes, Infallible>>> {
84    Response::builder()
85        .status(StatusCode::NOT_FOUND)
86        .body(BodyEmpty::new().boxed())
87}