sc_rpc_server/middleware/
node_health.rs1use std::{
22 error::Error,
23 future::Future,
24 pin::Pin,
25 task::{Context, Poll},
26};
27
28use futures::future::FutureExt;
29use http::{HeaderValue, Method, StatusCode, Uri};
30use jsonrpsee::{
31 server::{HttpBody, HttpRequest, HttpResponse},
32 types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess},
33};
34use tower::Service;
35
36const RPC_SYSTEM_HEALTH_CALL: &str = r#"{"jsonrpc":"2.0","method":"system_health","id":0}"#;
37const HEADER_VALUE_JSON: HeaderValue = HeaderValue::from_static("application/json; charset=utf-8");
38
/// Stateless [`tower::Layer`] that wraps a service in a [`NodeHealthProxy`].
#[derive(Debug, Clone, Default)]
pub struct NodeHealthProxyLayer;
43
44impl<S> tower::Layer<S> for NodeHealthProxyLayer {
45 type Service = NodeHealthProxy<S>;
46
47 fn layer(&self, service: S) -> Self::Service {
48 NodeHealthProxy::new(service)
49 }
50}
51
/// Middleware that holds the wrapped (inner) service as its only field.
pub struct NodeHealthProxy<S>(S);

impl<S> NodeHealthProxy<S> {
    /// Creates a proxy around `service`.
    pub fn new(service: S) -> Self {
        NodeHealthProxy(service)
    }
}
61
62impl<S> tower::Service<http::Request<hyper::body::Incoming>> for NodeHealthProxy<S>
63where
64 S: Service<HttpRequest, Response = HttpResponse>,
65 S::Response: 'static,
66 S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
67 S::Future: Send + 'static,
68{
69 type Response = S::Response;
70 type Error = Box<dyn Error + Send + Sync + 'static>;
71 type Future =
72 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
73
74 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75 self.0.poll_ready(cx).map_err(Into::into)
76 }
77
78 fn call(&mut self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
79 let mut req = req.map(|body| HttpBody::new(body));
80 let maybe_intercept = InterceptRequest::from_http(&req);
81
82 if let InterceptRequest::Health | InterceptRequest::Readiness = maybe_intercept {
84 *req.method_mut() = Method::POST;
86 *req.uri_mut() = Uri::from_static("/");
88
89 req.headers_mut().insert(http::header::CONTENT_TYPE, HEADER_VALUE_JSON);
91 req.headers_mut().insert(http::header::ACCEPT, HEADER_VALUE_JSON);
92
93 req = req.map(|_| HttpBody::from(RPC_SYSTEM_HEALTH_CALL));
95 }
96
97 let fut = self.0.call(req);
99
100 async move {
101 Ok(match maybe_intercept {
102 InterceptRequest::Deny => {
103 http_response(StatusCode::METHOD_NOT_ALLOWED, HttpBody::empty())
104 },
105 InterceptRequest::No => fut.await.map_err(|err| err.into())?,
106 InterceptRequest::Health => {
107 let res = fut.await.map_err(|err| err.into())?;
108 if let Ok(health) = parse_rpc_response(res.into_body()).await {
109 http_ok_response(serde_json::to_string(&health)?)
110 } else {
111 http_internal_error()
112 }
113 },
114 InterceptRequest::Readiness => {
115 let res = fut.await.map_err(|err| err.into())?;
116 match parse_rpc_response(res.into_body()).await {
117 Ok(health)
118 if (!health.is_syncing && health.peers > 0) ||
119 !health.should_have_peers =>
120 {
121 http_ok_response(HttpBody::empty())
122 },
123 _ => http_internal_error(),
124 }
125 },
126 })
127 }
128 .boxed()
129 }
130}
131
132#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
134#[serde(rename_all = "camelCase")]
135struct Health {
136 pub peers: usize,
138 pub is_syncing: bool,
140 pub should_have_peers: bool,
144}
145
146fn http_ok_response<S: Into<HttpBody>>(body: S) -> HttpResponse {
147 http_response(StatusCode::OK, body)
148}
149
150fn http_response<S: Into<HttpBody>>(status_code: StatusCode, body: S) -> HttpResponse {
151 HttpResponse::builder()
152 .status(status_code)
153 .header(http::header::CONTENT_TYPE, HEADER_VALUE_JSON)
154 .body(body.into())
155 .expect("Header is valid; qed")
156}
157
158fn http_internal_error() -> HttpResponse {
159 http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, HttpBody::empty())
160}
161
162async fn parse_rpc_response(
163 body: HttpBody,
164) -> Result<Health, Box<dyn Error + Send + Sync + 'static>> {
165 use http_body_util::BodyExt;
166
167 let bytes = body.collect().await?.to_bytes();
168
169 let raw_rp = serde_json::from_slice::<RpcResponse<Health>>(&bytes)?;
170 let rp = RpcResponseSuccess::<Health>::try_from(raw_rp)?;
171
172 Ok(rp.result)
173}
174
/// Classification of an incoming HTTP request, produced by
/// `InterceptRequest::from_http`.
enum InterceptRequest {
    /// `GET /health`.
    Health,
    /// `GET /health/readiness`.
    Readiness,
    /// Any other path; the request is not intercepted.
    No,
    /// A health or readiness path requested with a method other than `GET`.
    Deny,
}
190
191impl InterceptRequest {
192 fn from_http(req: &HttpRequest) -> InterceptRequest {
193 match req.uri().path() {
194 "/health" => {
195 if req.method() == http::Method::GET {
196 InterceptRequest::Health
197 } else {
198 InterceptRequest::Deny
199 }
200 },
201 "/health/readiness" => {
202 if req.method() == http::Method::GET {
203 InterceptRequest::Readiness
204 } else {
205 InterceptRequest::Deny
206 }
207 },
208 _ => InterceptRequest::No,
210 }
211 }
212}