1use {
2 crate::config::ConfigMetrics,
3 http_body_util::{BodyExt, Full as BodyFull},
4 hyper::{
5 body::{Bytes, Incoming as BodyIncoming},
6 service::service_fn,
7 Request, Response, StatusCode,
8 },
9 hyper_util::{
10 rt::tokio::{TokioExecutor, TokioIo},
11 server::conn::auto::Builder as ServerBuilder,
12 },
13 std::future::Future,
14 tokio::{net::TcpListener, task::JoinError},
15 tracing::{error, info},
16};
17
18pub async fn spawn_server(
19 ConfigMetrics { endpoint }: ConfigMetrics,
20 gather_metrics: impl Fn() -> Bytes + Clone + Send + 'static,
21 is_health_check: impl Fn() -> bool + Clone + Send + 'static,
22 is_ready_check: impl Fn() -> bool + Clone + Send + 'static,
23 shutdown: impl Future<Output = ()> + Send + 'static,
24) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
25 let listener = TcpListener::bind(endpoint).await?;
26 info!("start server at: {endpoint}");
27
28 Ok(tokio::spawn(async move {
29 tokio::pin!(shutdown);
30 loop {
31 let stream = tokio::select! {
32 maybe_conn = listener.accept() => {
33 match maybe_conn {
34 Ok((stream, _addr)) => stream,
35 Err(error) => {
36 error!("failed to accept new connection: {error}");
37 break;
38 }
39 }
40 }
41 () = &mut shutdown => {
42 info!("shutdown");
43 break
44 },
45 };
46 let gather_metrics = gather_metrics.clone();
47 let is_health_check = is_health_check.clone();
48 let is_ready_check = is_ready_check.clone();
49 tokio::spawn(async move {
50 if let Err(error) = ServerBuilder::new(TokioExecutor::new())
51 .serve_connection(
52 TokioIo::new(stream),
53 service_fn(move |req: Request<BodyIncoming>| {
54 let gather_metrics = gather_metrics.clone();
55 let is_health_check = is_health_check.clone();
56 let is_ready_check = is_ready_check.clone();
57 async move {
58 let (status, bytes) = match req.uri().path() {
59 "/health" => {
60 if is_health_check() {
61 (StatusCode::OK, Bytes::from("OK"))
62 } else {
63 (
64 StatusCode::INTERNAL_SERVER_ERROR,
65 Bytes::from("Service is unhealthy"),
66 )
67 }
68 }
69 "/metrics" => (StatusCode::OK, gather_metrics()),
70 "/ready" => {
71 if is_ready_check() {
72 (StatusCode::OK, Bytes::from("OK"))
73 } else {
74 (
75 StatusCode::INTERNAL_SERVER_ERROR,
76 Bytes::from("Service is not ready"),
77 )
78 }
79 }
80 _ => (StatusCode::NOT_FOUND, Bytes::new()),
81 };
82
83 Response::builder()
84 .status(status)
85 .body(BodyFull::new(bytes).boxed())
86 }
87 }),
88 )
89 .await
90 {
91 error!("failed to handle request: {error}");
92 }
93 });
94 }
95 }))
96}