soil_rpc/server/middleware/
node_health.rs1use std::{
10 error::Error,
11 future::Future,
12 pin::Pin,
13 task::{Context, Poll},
14};
15
16use futures::future::FutureExt;
17use http::{HeaderValue, Method, StatusCode, Uri};
18use jsonrpsee::{
19 server::{HttpBody, HttpRequest, HttpResponse},
20 types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess},
21};
22use tower::Service;
23
24const RPC_SYSTEM_HEALTH_CALL: &str = r#"{"jsonrpc":"2.0","method":"system_health","id":0}"#;
25const HEADER_VALUE_JSON: HeaderValue = HeaderValue::from_static("application/json; charset=utf-8");
26
27#[derive(Debug, Clone, Default)]
30pub struct NodeHealthProxyLayer;
31
32impl<S> tower::Layer<S> for NodeHealthProxyLayer {
33 type Service = NodeHealthProxy<S>;
34
35 fn layer(&self, service: S) -> Self::Service {
36 NodeHealthProxy::new(service)
37 }
38}
39
40pub struct NodeHealthProxy<S>(S);
42
43impl<S> NodeHealthProxy<S> {
44 pub fn new(service: S) -> Self {
46 Self(service)
47 }
48}
49
50impl<S> tower::Service<http::Request<hyper::body::Incoming>> for NodeHealthProxy<S>
51where
52 S: Service<HttpRequest, Response = HttpResponse>,
53 S::Response: 'static,
54 S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
55 S::Future: Send + 'static,
56{
57 type Response = S::Response;
58 type Error = Box<dyn Error + Send + Sync + 'static>;
59 type Future =
60 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
61
62 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63 self.0.poll_ready(cx).map_err(Into::into)
64 }
65
66 fn call(&mut self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
67 let mut req = req.map(|body| HttpBody::new(body));
68 let maybe_intercept = InterceptRequest::from_http(&req);
69
70 if let InterceptRequest::Health | InterceptRequest::Readiness = maybe_intercept {
72 *req.method_mut() = Method::POST;
74 *req.uri_mut() = Uri::from_static("/");
76
77 req.headers_mut().insert(http::header::CONTENT_TYPE, HEADER_VALUE_JSON);
79 req.headers_mut().insert(http::header::ACCEPT, HEADER_VALUE_JSON);
80
81 req = req.map(|_| HttpBody::from(RPC_SYSTEM_HEALTH_CALL));
83 }
84
85 let fut = self.0.call(req);
87
88 async move {
89 Ok(match maybe_intercept {
90 InterceptRequest::Deny => {
91 http_response(StatusCode::METHOD_NOT_ALLOWED, HttpBody::empty())
92 },
93 InterceptRequest::No => fut.await.map_err(|err| err.into())?,
94 InterceptRequest::Health => {
95 let res = fut.await.map_err(|err| err.into())?;
96 if let Ok(health) = parse_rpc_response(res.into_body()).await {
97 http_ok_response(serde_json::to_string(&health)?)
98 } else {
99 http_internal_error()
100 }
101 },
102 InterceptRequest::Readiness => {
103 let res = fut.await.map_err(|err| err.into())?;
104 match parse_rpc_response(res.into_body()).await {
105 Ok(health)
106 if (!health.is_syncing && health.peers > 0)
107 || !health.should_have_peers =>
108 {
109 http_ok_response(HttpBody::empty())
110 },
111 _ => http_internal_error(),
112 }
113 },
114 })
115 }
116 .boxed()
117 }
118}
119
120#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
122#[serde(rename_all = "camelCase")]
123struct Health {
124 pub peers: usize,
126 pub is_syncing: bool,
128 pub should_have_peers: bool,
132}
133
134fn http_ok_response<S: Into<HttpBody>>(body: S) -> HttpResponse {
135 http_response(StatusCode::OK, body)
136}
137
138fn http_response<S: Into<HttpBody>>(status_code: StatusCode, body: S) -> HttpResponse {
139 HttpResponse::builder()
140 .status(status_code)
141 .header(http::header::CONTENT_TYPE, HEADER_VALUE_JSON)
142 .body(body.into())
143 .expect("Header is valid; qed")
144}
145
146fn http_internal_error() -> HttpResponse {
147 http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, HttpBody::empty())
148}
149
150async fn parse_rpc_response(
151 body: HttpBody,
152) -> Result<Health, Box<dyn Error + Send + Sync + 'static>> {
153 use http_body_util::BodyExt;
154
155 let bytes = body.collect().await?.to_bytes();
156
157 let raw_rp = serde_json::from_slice::<RpcResponse<Health>>(&bytes)?;
158 let rp = RpcResponseSuccess::<Health>::try_from(raw_rp)?;
159
160 Ok(rp.result)
161}
162
163enum InterceptRequest {
165 Health,
167 Readiness,
171 No,
173 Deny,
177}
178
179impl InterceptRequest {
180 fn from_http(req: &HttpRequest) -> InterceptRequest {
181 match req.uri().path() {
182 "/health" => {
183 if req.method() == http::Method::GET {
184 InterceptRequest::Health
185 } else {
186 InterceptRequest::Deny
187 }
188 },
189 "/health/readiness" => {
190 if req.method() == http::Method::GET {
191 InterceptRequest::Readiness
192 } else {
193 InterceptRequest::Deny
194 }
195 },
196 _ => InterceptRequest::No,
198 }
199 }
200}