1use {
2 crate::config::ConfigMetrics,
3 http_body_util::{BodyExt, Full as BodyFull},
4 hyper::{
5 body::{Bytes, Incoming as BodyIncoming},
6 service::service_fn,
7 Request, Response, StatusCode,
8 },
9 hyper_util::{
10 rt::tokio::{TokioExecutor, TokioIo},
11 server::conn::auto::Builder as ServerBuilder,
12 },
13 std::future::Future,
14 tokio::{net::TcpListener, task::JoinError, time::Duration},
15 tracing::{error, info},
16};
17
/// Converts a [`Duration`] into a floating-point number of seconds.
///
/// The fractional part carries the sub-second nanoseconds, so
/// `Duration::from_millis(1500)` yields `1.5`.
#[inline]
pub fn duration_to_seconds(d: Duration) -> f64 {
    // Standard-library equivalent of `secs as f64 + subsec_nanos as f64 / 1e9`.
    d.as_secs_f64()
}
22
23pub async fn spawn_server(
24 ConfigMetrics { endpoint }: ConfigMetrics,
25 gather_metrics: impl Fn() -> Bytes + Clone + Send + 'static,
26 is_health_check: impl Fn() -> bool + Clone + Send + 'static,
27 is_ready_check: impl Fn() -> bool + Clone + Send + 'static,
28 shutdown: impl Future<Output = ()> + Send + 'static,
29) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
30 let listener = TcpListener::bind(endpoint).await?;
31 info!("start server at: {endpoint}");
32
33 Ok(tokio::spawn(async move {
34 tokio::pin!(shutdown);
35 loop {
36 let stream = tokio::select! {
37 maybe_conn = listener.accept() => {
38 match maybe_conn {
39 Ok((stream, _addr)) => stream,
40 Err(error) => {
41 error!("failed to accept new connection: {error}");
42 break;
43 }
44 }
45 }
46 () = &mut shutdown => {
47 info!("shutdown");
48 break
49 },
50 };
51 let gather_metrics = gather_metrics.clone();
52 let is_health_check = is_health_check.clone();
53 let is_ready_check = is_ready_check.clone();
54 tokio::spawn(async move {
55 if let Err(error) = ServerBuilder::new(TokioExecutor::new())
56 .serve_connection(
57 TokioIo::new(stream),
58 service_fn(move |req: Request<BodyIncoming>| {
59 let gather_metrics = gather_metrics.clone();
60 let is_health_check = is_health_check.clone();
61 let is_ready_check = is_ready_check.clone();
62 async move {
63 let (status, bytes) = match req.uri().path() {
64 "/health" => {
65 if is_health_check() {
66 (StatusCode::OK, Bytes::from("OK"))
67 } else {
68 (
69 StatusCode::INTERNAL_SERVER_ERROR,
70 Bytes::from("Service is unhealthy"),
71 )
72 }
73 }
74 "/metrics" => (StatusCode::OK, gather_metrics()),
75 "/ready" => {
76 if is_ready_check() {
77 (StatusCode::OK, Bytes::from("OK"))
78 } else {
79 (
80 StatusCode::INTERNAL_SERVER_ERROR,
81 Bytes::from("Service is not ready"),
82 )
83 }
84 }
85 _ => (StatusCode::NOT_FOUND, Bytes::new()),
86 };
87
88 Response::builder()
89 .status(status)
90 .body(BodyFull::new(bytes).boxed())
91 }
92 }),
93 )
94 .await
95 {
96 error!("failed to handle request: {error}");
97 }
98 });
99 }
100 }))
101}