1use {
2 crate::config::ConfigMetrics,
3 http_body_util::{combinators::BoxBody, BodyExt, Empty as BodyEmpty, Full as BodyFull},
4 hyper::{
5 body::{Bytes, Incoming as BodyIncoming},
6 service::service_fn,
7 Request, Response, StatusCode,
8 },
9 hyper_util::{
10 rt::tokio::{TokioExecutor, TokioIo},
11 server::conn::auto::Builder as ServerBuilder,
12 },
13 prometheus::{proto::MetricFamily, TextEncoder},
14 std::{convert::Infallible, future::Future},
15 tokio::{net::TcpListener, task::JoinError},
16 tracing::{error, info},
17};
18
19pub async fn spawn_server(
20 ConfigMetrics { endpoint }: ConfigMetrics,
21 gather_metrics: impl Fn() -> Vec<MetricFamily> + Clone + Send + 'static,
22 shutdown: impl Future<Output = ()> + Send + 'static,
23) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
24 let listener = TcpListener::bind(endpoint).await?;
25 info!("start server at: {endpoint}");
26
27 Ok(tokio::spawn(async move {
28 tokio::pin!(shutdown);
29 loop {
30 let stream = tokio::select! {
31 maybe_conn = listener.accept() => {
32 match maybe_conn {
33 Ok((stream, _addr)) => stream,
34 Err(error) => {
35 error!("failed to accept new connection: {error}");
36 break;
37 }
38 }
39 }
40 () = &mut shutdown => {
41 info!("shutdown");
42 break
43 },
44 };
45 let gather_metrics = gather_metrics.clone();
46 tokio::spawn(async move {
47 if let Err(error) = ServerBuilder::new(TokioExecutor::new())
48 .serve_connection(
49 TokioIo::new(stream),
50 service_fn(move |req: Request<BodyIncoming>| {
51 let gather_metrics = gather_metrics.clone();
52 async move {
53 match req.uri().path() {
54 "/metrics" => metrics_handler(&gather_metrics()),
55 _ => not_found_handler(),
56 }
57 }
58 }),
59 )
60 .await
61 {
62 error!("failed to handle request: {error}");
63 }
64 });
65 }
66 }))
67}
68
69fn metrics_handler(
70 metric_families: &[MetricFamily],
71) -> http::Result<Response<BoxBody<Bytes, Infallible>>> {
72 let metrics = TextEncoder::new()
73 .encode_to_string(metric_families)
74 .unwrap_or_else(|error| {
75 error!("could not encode custom metrics: {}", error);
76 String::new()
77 });
78 Response::builder()
79 .status(StatusCode::OK)
80 .body(BodyFull::new(Bytes::from(metrics)).boxed())
81}
82
83fn not_found_handler() -> http::Result<Response<BoxBody<Bytes, Infallible>>> {
84 Response::builder()
85 .status(StatusCode::NOT_FOUND)
86 .body(BodyEmpty::new().boxed())
87}