1use {
2 crate::config::ConfigMetrics,
3 http_body_util::{BodyExt, Full as BodyFull},
4 hyper::{
5 body::{Bytes, Incoming as BodyIncoming},
6 service::service_fn,
7 Request, Response, StatusCode,
8 },
9 hyper_util::{
10 rt::tokio::{TokioExecutor, TokioIo},
11 server::conn::auto::Builder as ServerBuilder,
12 },
13 prometheus::{proto::MetricFamily, TextEncoder},
14 std::future::Future,
15 tokio::{net::TcpListener, task::JoinError},
16 tracing::{error, info},
17};
18
19pub async fn spawn_server(
20 ConfigMetrics { endpoint }: ConfigMetrics,
21 gather_metrics: impl Fn() -> Vec<MetricFamily> + Clone + Send + 'static,
22 is_health_check: impl Fn() -> bool + Clone + Send + 'static,
23 is_ready_check: impl Fn() -> bool + Clone + Send + 'static,
24 shutdown: impl Future<Output = ()> + Send + 'static,
25) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
26 let listener = TcpListener::bind(endpoint).await?;
27 info!("start server at: {endpoint}");
28
29 Ok(tokio::spawn(async move {
30 tokio::pin!(shutdown);
31 loop {
32 let stream = tokio::select! {
33 maybe_conn = listener.accept() => {
34 match maybe_conn {
35 Ok((stream, _addr)) => stream,
36 Err(error) => {
37 error!("failed to accept new connection: {error}");
38 break;
39 }
40 }
41 }
42 () = &mut shutdown => {
43 info!("shutdown");
44 break
45 },
46 };
47 let gather_metrics = gather_metrics.clone();
48 let is_health_check = is_health_check.clone();
49 let is_ready_check = is_ready_check.clone();
50 tokio::spawn(async move {
51 if let Err(error) = ServerBuilder::new(TokioExecutor::new())
52 .serve_connection(
53 TokioIo::new(stream),
54 service_fn(move |req: Request<BodyIncoming>| {
55 let gather_metrics = gather_metrics.clone();
56 let is_health_check = is_health_check.clone();
57 let is_ready_check = is_ready_check.clone();
58 async move {
59 let (status, bytes) = match req.uri().path() {
60 "/health" => {
61 if is_health_check() {
62 (StatusCode::OK, Bytes::from("OK"))
63 } else {
64 (
65 StatusCode::INTERNAL_SERVER_ERROR,
66 Bytes::from("Service is unhealthy"),
67 )
68 }
69 }
70 "/metrics" => {
71 let metrics = TextEncoder::new()
72 .encode_to_string(&gather_metrics())
73 .unwrap_or_else(|error| {
74 error!(
75 "could not encode custom metrics: {}",
76 error
77 );
78 String::new()
79 });
80 (StatusCode::OK, Bytes::from(metrics))
81 }
82 "/ready" => {
83 if is_ready_check() {
84 (StatusCode::OK, Bytes::from("OK"))
85 } else {
86 (
87 StatusCode::INTERNAL_SERVER_ERROR,
88 Bytes::from("Service is not ready"),
89 )
90 }
91 }
92 _ => (StatusCode::NOT_FOUND, Bytes::new()),
93 };
94
95 Response::builder()
96 .status(status)
97 .body(BodyFull::new(bytes).boxed())
98 }
99 }),
100 )
101 .await
102 {
103 error!("failed to handle request: {error}");
104 }
105 });
106 }
107 }))
108}