richat_shared/
metrics.rs

1use {
2    crate::config::ConfigMetrics,
3    http_body_util::{BodyExt, Full as BodyFull},
4    hyper::{
5        body::{Bytes, Incoming as BodyIncoming},
6        service::service_fn,
7        Request, Response, StatusCode,
8    },
9    hyper_util::{
10        rt::tokio::{TokioExecutor, TokioIo},
11        server::conn::auto::Builder as ServerBuilder,
12    },
13    std::future::Future,
14    tokio::{net::TcpListener, task::JoinError, time::Duration},
15    tracing::{error, info},
16};
17
18#[inline]
19pub fn duration_to_seconds(d: Duration) -> f64 {
20    d.as_secs() as f64 + d.subsec_nanos() as f64 / 1e9
21}
22
23pub async fn spawn_server(
24    ConfigMetrics { endpoint }: ConfigMetrics,
25    gather_metrics: impl Fn() -> Bytes + Clone + Send + 'static,
26    is_health_check: impl Fn() -> bool + Clone + Send + 'static,
27    is_ready_check: impl Fn() -> bool + Clone + Send + 'static,
28    shutdown: impl Future<Output = ()> + Send + 'static,
29) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
30    let listener = TcpListener::bind(endpoint).await?;
31    info!("start server at: {endpoint}");
32
33    Ok(tokio::spawn(async move {
34        tokio::pin!(shutdown);
35        loop {
36            let stream = tokio::select! {
37                maybe_conn = listener.accept() => {
38                    match maybe_conn {
39                        Ok((stream, _addr)) => stream,
40                        Err(error) => {
41                            error!("failed to accept new connection: {error}");
42                            break;
43                        }
44                    }
45                }
46                () = &mut shutdown => {
47                    info!("shutdown");
48                    break
49                },
50            };
51            let gather_metrics = gather_metrics.clone();
52            let is_health_check = is_health_check.clone();
53            let is_ready_check = is_ready_check.clone();
54            tokio::spawn(async move {
55                if let Err(error) = ServerBuilder::new(TokioExecutor::new())
56                    .serve_connection(
57                        TokioIo::new(stream),
58                        service_fn(move |req: Request<BodyIncoming>| {
59                            let gather_metrics = gather_metrics.clone();
60                            let is_health_check = is_health_check.clone();
61                            let is_ready_check = is_ready_check.clone();
62                            async move {
63                                let (status, bytes) = match req.uri().path() {
64                                    "/health" => {
65                                        if is_health_check() {
66                                            (StatusCode::OK, Bytes::from("OK"))
67                                        } else {
68                                            (
69                                                StatusCode::INTERNAL_SERVER_ERROR,
70                                                Bytes::from("Service is unhealthy"),
71                                            )
72                                        }
73                                    }
74                                    "/metrics" => (StatusCode::OK, gather_metrics()),
75                                    "/ready" => {
76                                        if is_ready_check() {
77                                            (StatusCode::OK, Bytes::from("OK"))
78                                        } else {
79                                            (
80                                                StatusCode::INTERNAL_SERVER_ERROR,
81                                                Bytes::from("Service is not ready"),
82                                            )
83                                        }
84                                    }
85                                    _ => (StatusCode::NOT_FOUND, Bytes::new()),
86                                };
87
88                                Response::builder()
89                                    .status(status)
90                                    .body(BodyFull::new(bytes).boxed())
91                            }
92                        }),
93                    )
94                    .await
95                {
96                    error!("failed to handle request: {error}");
97                }
98            });
99        }
100    }))
101}