Skip to main content

s4_server/
routing.rs

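//! HTTP routing layer for the `/health`, `/ready` and `/metrics` endpoints.
//!
//! Answering health probes on the same port as the S3 server keeps integration
//! with AWS ALB / NLB and k8s readiness probes simple.
//!
//! - `GET`/`HEAD /health` → always `200 OK` (returned whenever the server process is alive)
//! - `GET`/`HEAD /ready` → awaits the `ready_check` future: 200 on `Ok(())`,
//!   503 otherwise (e.g. backend unreachable); 200 when no check is configured
//! - `GET`/`HEAD /metrics` → rendered Prometheus metrics, or 503 when no handle is configured
//! - every other request → delegated to the inner S3Service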
10
11use std::convert::Infallible;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use http_body_util::Full;
18use hyper::body::Incoming;
19use hyper::service::Service;
20use hyper::{Request, Response, StatusCode};
21use metrics_exporter_prometheus::PrometheusHandle;
22
/// Boxed, `Send` future returned by a readiness check.
///
/// It resolves to `Ok(())` when the check passes and to `Err(reason)`, with
/// `reason` as a `String`, when it does not.
type ReadyCheckFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;

/// Shared readiness-check callback.
///
/// Calling it returns a boxed future yielding `Result<(), String>`. The
/// callback itself is bounded by `Send + Sync` so it can be used across tasks.
pub type ReadyCheck = Arc<dyn Fn() -> ReadyCheckFuture + Send + Sync>;
26
/// hyper `Service` that combines an inner service with health / readiness /
/// metrics handlers.
///
/// `GET`/`HEAD` requests for `/health`, `/ready` and `/metrics` are answered
/// by the router itself; every other request is passed to `inner`.
#[derive(Clone)]
pub struct HealthRouter<S> {
    /// Wrapped service that receives every request the router does not answer itself.
    pub inner: S,
    /// Optional readiness check awaited on `/ready`; when `None`, `/ready`
    /// always answers 200.
    pub ready_check: Option<ReadyCheck>,
    /// Optional Prometheus handle rendered on `/metrics`; when `None`,
    /// `/metrics` answers 503.
    pub metrics_handle: Option<PrometheusHandle>,
}
34
35impl<S> HealthRouter<S> {
36    pub fn new(inner: S, ready_check: Option<ReadyCheck>) -> Self {
37        Self {
38            inner,
39            ready_check,
40            metrics_handle: None,
41        }
42    }
43
44    #[must_use]
45    pub fn with_metrics(mut self, handle: PrometheusHandle) -> Self {
46        self.metrics_handle = Some(handle);
47        self
48    }
49}
50
/// Response body type used by the `/health`, `/ready` and `/metrics` handlers.
///
/// To stay compatible with the body type of the inner S3Service, handler
/// payloads are `Full<Bytes>` bodies wrapped into `s3s::Body` through
/// `s3s::Body::http_body`.
type RespBody = s3s::Body;
55
56fn make_text_response(status: StatusCode, body: &'static str) -> Response<RespBody> {
57    let bytes = Bytes::from_static(body.as_bytes());
58    Response::builder()
59        .status(status)
60        .header("content-type", "text/plain; charset=utf-8")
61        .header("content-length", bytes.len().to_string())
62        .body(s3s::Body::http_body(
63            Full::new(bytes).map_err(|never| match never {}),
64        ))
65        .expect("static response")
66}
67
68fn make_owned_text_response(
69    status: StatusCode,
70    content_type: &'static str,
71    body: String,
72) -> Response<RespBody> {
73    let bytes = Bytes::from(body.into_bytes());
74    Response::builder()
75        .status(status)
76        .header("content-type", content_type)
77        .header("content-length", bytes.len().to_string())
78        .body(s3s::Body::http_body(
79            Full::new(bytes).map_err(|never| match never {}),
80        ))
81        .expect("owned response")
82}
83
84impl<S> Service<Request<Incoming>> for HealthRouter<S>
85where
86    S: Service<Request<Incoming>, Response = Response<s3s::Body>, Error = s3s::HttpError>
87        + Clone
88        + Send
89        + 'static,
90    S::Future: Send + 'static,
91{
92    type Response = Response<RespBody>;
93    type Error = s3s::HttpError;
94    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
95
96    fn call(&self, req: Request<Incoming>) -> Self::Future {
97        let path = req.uri().path();
98        match (req.method(), path) {
99            (&hyper::Method::GET, "/health") | (&hyper::Method::HEAD, "/health") => {
100                Box::pin(async { Ok(make_text_response(StatusCode::OK, "ok\n")) })
101            }
102            (&hyper::Method::GET, "/metrics") | (&hyper::Method::HEAD, "/metrics") => {
103                let handle = self.metrics_handle.clone();
104                Box::pin(async move {
105                    match handle {
106                        Some(h) => {
107                            let body = h.render();
108                            Ok(make_owned_text_response(
109                                StatusCode::OK,
110                                "text/plain; version=0.0.4; charset=utf-8",
111                                body,
112                            ))
113                        }
114                        None => Ok(make_text_response(
115                            StatusCode::SERVICE_UNAVAILABLE,
116                            "metrics not configured\n",
117                        )),
118                    }
119                })
120            }
121            (&hyper::Method::GET, "/ready") | (&hyper::Method::HEAD, "/ready") => {
122                let check = self.ready_check.clone();
123                Box::pin(async move {
124                    match check {
125                        Some(f) => match f().await {
126                            Ok(()) => Ok(make_text_response(StatusCode::OK, "ready\n")),
127                            Err(reason) => {
128                                tracing::warn!(%reason, "readiness check failed");
129                                Ok(make_text_response(
130                                    StatusCode::SERVICE_UNAVAILABLE,
131                                    "not ready\n",
132                                ))
133                            }
134                        },
135                        None => Ok(make_text_response(StatusCode::OK, "ready (no check)\n")),
136                    }
137                })
138            }
139            _ => {
140                let inner = self.inner.clone();
141                Box::pin(async move { inner.call(req).await })
142            }
143        }
144    }
145}
146
147/// `Infallible` を anything に変換するためのトリック (`Full::map_err` 用)
148trait FullExt<B> {
149    fn map_err<E, F: FnMut(Infallible) -> E>(
150        self,
151        f: F,
152    ) -> http_body_util::combinators::MapErr<Self, F>
153    where
154        Self: Sized;
155}
156impl<B> FullExt<B> for Full<B>
157where
158    B: bytes::Buf,
159{
160    fn map_err<E, F: FnMut(Infallible) -> E>(
161        self,
162        f: F,
163    ) -> http_body_util::combinators::MapErr<Self, F>
164    where
165        Self: Sized,
166    {
167        http_body_util::BodyExt::map_err(self, f)
168    }
169}