richat_shared/
metrics.rs

1use {
2    crate::config::ConfigMetrics,
3    http_body_util::{BodyExt, Full as BodyFull},
4    hyper::{
5        body::{Bytes, Incoming as BodyIncoming},
6        service::service_fn,
7        Request, Response, StatusCode,
8    },
9    hyper_util::{
10        rt::tokio::{TokioExecutor, TokioIo},
11        server::conn::auto::Builder as ServerBuilder,
12    },
13    prometheus::{proto::MetricFamily, TextEncoder},
14    std::future::Future,
15    tokio::{net::TcpListener, task::JoinError},
16    tracing::{error, info},
17};
18
19pub async fn spawn_server(
20    ConfigMetrics { endpoint }: ConfigMetrics,
21    gather_metrics: impl Fn() -> Vec<MetricFamily> + Clone + Send + 'static,
22    is_health_check: impl Fn() -> bool + Clone + Send + 'static,
23    is_ready_check: impl Fn() -> bool + Clone + Send + 'static,
24    shutdown: impl Future<Output = ()> + Send + 'static,
25) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
26    let listener = TcpListener::bind(endpoint).await?;
27    info!("start server at: {endpoint}");
28
29    Ok(tokio::spawn(async move {
30        tokio::pin!(shutdown);
31        loop {
32            let stream = tokio::select! {
33                maybe_conn = listener.accept() => {
34                    match maybe_conn {
35                        Ok((stream, _addr)) => stream,
36                        Err(error) => {
37                            error!("failed to accept new connection: {error}");
38                            break;
39                        }
40                    }
41                }
42                () = &mut shutdown => {
43                    info!("shutdown");
44                    break
45                },
46            };
47            let gather_metrics = gather_metrics.clone();
48            let is_health_check = is_health_check.clone();
49            let is_ready_check = is_ready_check.clone();
50            tokio::spawn(async move {
51                if let Err(error) = ServerBuilder::new(TokioExecutor::new())
52                    .serve_connection(
53                        TokioIo::new(stream),
54                        service_fn(move |req: Request<BodyIncoming>| {
55                            let gather_metrics = gather_metrics.clone();
56                            let is_health_check = is_health_check.clone();
57                            let is_ready_check = is_ready_check.clone();
58                            async move {
59                                let (status, bytes) = match req.uri().path() {
60                                    "/health" => {
61                                        if is_health_check() {
62                                            (StatusCode::OK, Bytes::from("OK"))
63                                        } else {
64                                            (
65                                                StatusCode::INTERNAL_SERVER_ERROR,
66                                                Bytes::from("Service is unhealthy"),
67                                            )
68                                        }
69                                    }
70                                    "/metrics" => {
71                                        let metrics = TextEncoder::new()
72                                            .encode_to_string(&gather_metrics())
73                                            .unwrap_or_else(|error| {
74                                                error!(
75                                                    "could not encode custom metrics: {}",
76                                                    error
77                                                );
78                                                String::new()
79                                            });
80                                        (StatusCode::OK, Bytes::from(metrics))
81                                    }
82                                    "/ready" => {
83                                        if is_ready_check() {
84                                            (StatusCode::OK, Bytes::from("OK"))
85                                        } else {
86                                            (
87                                                StatusCode::INTERNAL_SERVER_ERROR,
88                                                Bytes::from("Service is not ready"),
89                                            )
90                                        }
91                                    }
92                                    _ => (StatusCode::NOT_FOUND, Bytes::new()),
93                                };
94
95                                Response::builder()
96                                    .status(status)
97                                    .body(BodyFull::new(bytes).boxed())
98                            }
99                        }),
100                    )
101                    .await
102                {
103                    error!("failed to handle request: {error}");
104                }
105            });
106        }
107    }))
108}