1use std::convert::Infallible;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use http_body_util::Full;
18use hyper::body::Incoming;
19use hyper::service::Service;
20use hyper::{Request, Response, StatusCode};
21use metrics_exporter_prometheus::PrometheusHandle;
22
/// Shared, thread-safe readiness probe.
///
/// Calling the closure returns a boxed, sendable future that resolves to
/// `Ok(())` on success or `Err(reason)` with a textual explanation on failure.
/// The `'static` bounds are the defaults for `Box<dyn _>` / `Arc<dyn _>` and are
/// only written out here for clarity.
pub type ReadyCheck = Arc<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'static>>
        + Send
        + Sync
        + 'static,
>;
26
27#[derive(Clone)]
29pub struct HealthRouter<S> {
30 pub inner: S,
31 pub ready_check: Option<ReadyCheck>,
32 pub metrics_handle: Option<PrometheusHandle>,
33}
34
35impl<S> HealthRouter<S> {
36 pub fn new(inner: S, ready_check: Option<ReadyCheck>) -> Self {
37 Self {
38 inner,
39 ready_check,
40 metrics_handle: None,
41 }
42 }
43
44 #[must_use]
45 pub fn with_metrics(mut self, handle: PrometheusHandle) -> Self {
46 self.metrics_handle = Some(handle);
47 self
48 }
49}
50
// Response body type shared by the response helpers and the router's
// `Service` implementation in this file.
type RespBody = s3s::Body;
55
56fn make_text_response(status: StatusCode, body: &'static str) -> Response<RespBody> {
57 let bytes = Bytes::from_static(body.as_bytes());
58 Response::builder()
59 .status(status)
60 .header("content-type", "text/plain; charset=utf-8")
61 .header("content-length", bytes.len().to_string())
62 .body(s3s::Body::http_body(
63 Full::new(bytes).map_err(|never| match never {}),
64 ))
65 .expect("static response")
66}
67
68fn make_owned_text_response(
69 status: StatusCode,
70 content_type: &'static str,
71 body: String,
72) -> Response<RespBody> {
73 let bytes = Bytes::from(body.into_bytes());
74 Response::builder()
75 .status(status)
76 .header("content-type", content_type)
77 .header("content-length", bytes.len().to_string())
78 .body(s3s::Body::http_body(
79 Full::new(bytes).map_err(|never| match never {}),
80 ))
81 .expect("owned response")
82}
83
84impl<S> Service<Request<Incoming>> for HealthRouter<S>
85where
86 S: Service<Request<Incoming>, Response = Response<s3s::Body>, Error = s3s::HttpError>
87 + Clone
88 + Send
89 + 'static,
90 S::Future: Send + 'static,
91{
92 type Response = Response<RespBody>;
93 type Error = s3s::HttpError;
94 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
95
96 fn call(&self, req: Request<Incoming>) -> Self::Future {
97 let path = req.uri().path();
98 match (req.method(), path) {
99 (&hyper::Method::GET, "/health") | (&hyper::Method::HEAD, "/health") => {
100 Box::pin(async { Ok(make_text_response(StatusCode::OK, "ok\n")) })
101 }
102 (&hyper::Method::GET, "/metrics") | (&hyper::Method::HEAD, "/metrics") => {
103 let handle = self.metrics_handle.clone();
104 Box::pin(async move {
105 match handle {
106 Some(h) => {
107 let body = h.render();
108 Ok(make_owned_text_response(
109 StatusCode::OK,
110 "text/plain; version=0.0.4; charset=utf-8",
111 body,
112 ))
113 }
114 None => Ok(make_text_response(
115 StatusCode::SERVICE_UNAVAILABLE,
116 "metrics not configured\n",
117 )),
118 }
119 })
120 }
121 (&hyper::Method::GET, "/ready") | (&hyper::Method::HEAD, "/ready") => {
122 let check = self.ready_check.clone();
123 Box::pin(async move {
124 match check {
125 Some(f) => match f().await {
126 Ok(()) => Ok(make_text_response(StatusCode::OK, "ready\n")),
127 Err(reason) => {
128 tracing::warn!(%reason, "readiness check failed");
129 Ok(make_text_response(
130 StatusCode::SERVICE_UNAVAILABLE,
131 "not ready\n",
132 ))
133 }
134 },
135 None => Ok(make_text_response(StatusCode::OK, "ready (no check)\n")),
136 }
137 })
138 }
139 _ => {
140 let inner = self.inner.clone();
141 Box::pin(async move { inner.call(req).await })
142 }
143 }
144 }
145}
146
147trait FullExt<B> {
149 fn map_err<E, F: FnMut(Infallible) -> E>(
150 self,
151 f: F,
152 ) -> http_body_util::combinators::MapErr<Self, F>
153 where
154 Self: Sized;
155}
156impl<B> FullExt<B> for Full<B>
157where
158 B: bytes::Buf,
159{
160 fn map_err<E, F: FnMut(Infallible) -> E>(
161 self,
162 f: F,
163 ) -> http_body_util::combinators::MapErr<Self, F>
164 where
165 Self: Sized,
166 {
167 http_body_util::BodyExt::map_err(self, f)
168 }
169}