richat_shared/
metrics.rs

1use {
2    crate::config::ConfigMetrics,
3    http_body_util::{BodyExt, Full as BodyFull},
4    hyper::{
5        body::{Bytes, Incoming as BodyIncoming},
6        service::service_fn,
7        Request, Response, StatusCode,
8    },
9    hyper_util::{
10        rt::tokio::{TokioExecutor, TokioIo},
11        server::conn::auto::Builder as ServerBuilder,
12    },
13    std::future::Future,
14    tokio::{net::TcpListener, task::JoinError},
15    tracing::{error, info},
16};
17
18pub async fn spawn_server(
19    ConfigMetrics { endpoint }: ConfigMetrics,
20    gather_metrics: impl Fn() -> Bytes + Clone + Send + 'static,
21    is_health_check: impl Fn() -> bool + Clone + Send + 'static,
22    is_ready_check: impl Fn() -> bool + Clone + Send + 'static,
23    shutdown: impl Future<Output = ()> + Send + 'static,
24) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
25    let listener = TcpListener::bind(endpoint).await?;
26    info!("start server at: {endpoint}");
27
28    Ok(tokio::spawn(async move {
29        tokio::pin!(shutdown);
30        loop {
31            let stream = tokio::select! {
32                maybe_conn = listener.accept() => {
33                    match maybe_conn {
34                        Ok((stream, _addr)) => stream,
35                        Err(error) => {
36                            error!("failed to accept new connection: {error}");
37                            break;
38                        }
39                    }
40                }
41                () = &mut shutdown => {
42                    info!("shutdown");
43                    break
44                },
45            };
46            let gather_metrics = gather_metrics.clone();
47            let is_health_check = is_health_check.clone();
48            let is_ready_check = is_ready_check.clone();
49            tokio::spawn(async move {
50                if let Err(error) = ServerBuilder::new(TokioExecutor::new())
51                    .serve_connection(
52                        TokioIo::new(stream),
53                        service_fn(move |req: Request<BodyIncoming>| {
54                            let gather_metrics = gather_metrics.clone();
55                            let is_health_check = is_health_check.clone();
56                            let is_ready_check = is_ready_check.clone();
57                            async move {
58                                let (status, bytes) = match req.uri().path() {
59                                    "/health" => {
60                                        if is_health_check() {
61                                            (StatusCode::OK, Bytes::from("OK"))
62                                        } else {
63                                            (
64                                                StatusCode::INTERNAL_SERVER_ERROR,
65                                                Bytes::from("Service is unhealthy"),
66                                            )
67                                        }
68                                    }
69                                    "/metrics" => (StatusCode::OK, gather_metrics()),
70                                    "/ready" => {
71                                        if is_ready_check() {
72                                            (StatusCode::OK, Bytes::from("OK"))
73                                        } else {
74                                            (
75                                                StatusCode::INTERNAL_SERVER_ERROR,
76                                                Bytes::from("Service is not ready"),
77                                            )
78                                        }
79                                    }
80                                    _ => (StatusCode::NOT_FOUND, Bytes::new()),
81                                };
82
83                                Response::builder()
84                                    .status(status)
85                                    .body(BodyFull::new(bytes).boxed())
86                            }
87                        }),
88                    )
89                    .await
90                {
91                    error!("failed to handle request: {error}");
92                }
93            });
94        }
95    }))
96}