Skip to main content

soil_rpc/server/middleware/
node_health.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Middleware for handling `/health` and `/health/readiness` endpoints.
8
9use std::{
10	error::Error,
11	future::Future,
12	pin::Pin,
13	task::{Context, Poll},
14};
15
16use futures::future::FutureExt;
17use http::{HeaderValue, Method, StatusCode, Uri};
18use jsonrpsee::{
19	server::{HttpBody, HttpRequest, HttpResponse},
20	types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess},
21};
22use tower::Service;
23
24const RPC_SYSTEM_HEALTH_CALL: &str = r#"{"jsonrpc":"2.0","method":"system_health","id":0}"#;
25const HEADER_VALUE_JSON: HeaderValue = HeaderValue::from_static("application/json; charset=utf-8");
26
27/// Layer that applies [`NodeHealthProxy`] which
28/// proxies `/health` and `/health/readiness` endpoints.
29#[derive(Debug, Clone, Default)]
30pub struct NodeHealthProxyLayer;
31
32impl<S> tower::Layer<S> for NodeHealthProxyLayer {
33	type Service = NodeHealthProxy<S>;
34
35	fn layer(&self, service: S) -> Self::Service {
36		NodeHealthProxy::new(service)
37	}
38}
39
40/// Middleware that proxies `/health` and `/health/readiness` endpoints.
41pub struct NodeHealthProxy<S>(S);
42
43impl<S> NodeHealthProxy<S> {
44	/// Creates a new [`NodeHealthProxy`].
45	pub fn new(service: S) -> Self {
46		Self(service)
47	}
48}
49
50impl<S> tower::Service<http::Request<hyper::body::Incoming>> for NodeHealthProxy<S>
51where
52	S: Service<HttpRequest, Response = HttpResponse>,
53	S::Response: 'static,
54	S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
55	S::Future: Send + 'static,
56{
57	type Response = S::Response;
58	type Error = Box<dyn Error + Send + Sync + 'static>;
59	type Future =
60		Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
61
62	fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63		self.0.poll_ready(cx).map_err(Into::into)
64	}
65
66	fn call(&mut self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
67		let mut req = req.map(|body| HttpBody::new(body));
68		let maybe_intercept = InterceptRequest::from_http(&req);
69
70		// Modify the request and proxy it to `system_health`
71		if let InterceptRequest::Health | InterceptRequest::Readiness = maybe_intercept {
72			// RPC methods are accessed with `POST`.
73			*req.method_mut() = Method::POST;
74			// Precautionary remove the URI.
75			*req.uri_mut() = Uri::from_static("/");
76
77			// Requests must have the following headers:
78			req.headers_mut().insert(http::header::CONTENT_TYPE, HEADER_VALUE_JSON);
79			req.headers_mut().insert(http::header::ACCEPT, HEADER_VALUE_JSON);
80
81			// Adjust the body to reflect the method call.
82			req = req.map(|_| HttpBody::from(RPC_SYSTEM_HEALTH_CALL));
83		}
84
85		// Call the inner service and get a future that resolves to the response.
86		let fut = self.0.call(req);
87
88		async move {
89			Ok(match maybe_intercept {
90				InterceptRequest::Deny => {
91					http_response(StatusCode::METHOD_NOT_ALLOWED, HttpBody::empty())
92				},
93				InterceptRequest::No => fut.await.map_err(|err| err.into())?,
94				InterceptRequest::Health => {
95					let res = fut.await.map_err(|err| err.into())?;
96					if let Ok(health) = parse_rpc_response(res.into_body()).await {
97						http_ok_response(serde_json::to_string(&health)?)
98					} else {
99						http_internal_error()
100					}
101				},
102				InterceptRequest::Readiness => {
103					let res = fut.await.map_err(|err| err.into())?;
104					match parse_rpc_response(res.into_body()).await {
105						Ok(health)
106							if (!health.is_syncing && health.peers > 0)
107								|| !health.should_have_peers =>
108						{
109							http_ok_response(HttpBody::empty())
110						},
111						_ => http_internal_error(),
112					}
113				},
114			})
115		}
116		.boxed()
117	}
118}
119
120// NOTE: This is duplicated here to avoid dependency to the `RPC API`.
121#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
122#[serde(rename_all = "camelCase")]
123struct Health {
124	/// Number of connected peers
125	pub peers: usize,
126	/// Is the node syncing
127	pub is_syncing: bool,
128	/// Should this node have any peers
129	///
130	/// Might be false for local chains or when running without discovery.
131	pub should_have_peers: bool,
132}
133
134fn http_ok_response<S: Into<HttpBody>>(body: S) -> HttpResponse {
135	http_response(StatusCode::OK, body)
136}
137
138fn http_response<S: Into<HttpBody>>(status_code: StatusCode, body: S) -> HttpResponse {
139	HttpResponse::builder()
140		.status(status_code)
141		.header(http::header::CONTENT_TYPE, HEADER_VALUE_JSON)
142		.body(body.into())
143		.expect("Header is valid; qed")
144}
145
146fn http_internal_error() -> HttpResponse {
147	http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, HttpBody::empty())
148}
149
150async fn parse_rpc_response(
151	body: HttpBody,
152) -> Result<Health, Box<dyn Error + Send + Sync + 'static>> {
153	use http_body_util::BodyExt;
154
155	let bytes = body.collect().await?.to_bytes();
156
157	let raw_rp = serde_json::from_slice::<RpcResponse<Health>>(&bytes)?;
158	let rp = RpcResponseSuccess::<Health>::try_from(raw_rp)?;
159
160	Ok(rp.result)
161}
162
163/// Whether the request should be treated as ordinary RPC call or be modified.
164enum InterceptRequest {
165	/// Proxy `/health` to `system_health`.
166	Health,
167	/// Checks if node has at least one peer and is not doing major syncing.
168	///
169	/// Returns HTTP status code 200 on success otherwise HTTP status code 500 is returned.
170	Readiness,
171	/// Treat as a ordinary RPC call and don't modify the request or response.
172	No,
173	/// Deny health or readiness calls that is not HTTP GET request.
174	///
175	/// Returns HTTP status code 405.
176	Deny,
177}
178
179impl InterceptRequest {
180	fn from_http(req: &HttpRequest) -> InterceptRequest {
181		match req.uri().path() {
182			"/health" => {
183				if req.method() == http::Method::GET {
184					InterceptRequest::Health
185				} else {
186					InterceptRequest::Deny
187				}
188			},
189			"/health/readiness" => {
190				if req.method() == http::Method::GET {
191					InterceptRequest::Readiness
192				} else {
193					InterceptRequest::Deny
194				}
195			},
196			// Forward all other requests to the RPC server.
197			_ => InterceptRequest::No,
198		}
199	}
200}