Skip to main content

soil_rpc/server/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Substrate RPC servers.
8
9#![warn(missing_docs)]
10
11pub mod middleware;
12pub mod utils;
13
14use std::{error::Error as StdError, net::SocketAddr, time::Duration};
15
16use jsonrpsee::{
17	core::BoxError,
18	server::{
19		serve_with_graceful_shutdown, stop_channel, ws, PingConfig, ServerHandle, StopHandle,
20	},
21	Methods, RpcModule,
22};
23use tower::Service;
24use utils::{
25	build_rpc_api, deny_unsafe, format_listen_addrs, get_proxy_ip, ListenAddrError, RpcSettings,
26};
27
28pub use ip_network::IpNetwork;
29pub use jsonrpsee::{
30	core::id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
31	server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
32};
33pub use middleware::{Metrics, MiddlewareLayer, NodeHealthProxyLayer, RpcMetrics};
34pub use utils::{RpcEndpoint, RpcMethods};
35
/// Number of bytes in one mebibyte; used to turn the `*_in_mb` payload settings into bytes.
const MEGABYTE: u32 = 1 << 20;
37
38/// Type to encapsulate the server handle and listening address.
39pub struct Server {
40	/// Handle to the rpc server
41	handle: ServerHandle,
42	/// Listening address of the server
43	listen_addrs: Vec<SocketAddr>,
44}
45
46impl Server {
47	/// Creates a new Server.
48	pub fn new(handle: ServerHandle, listen_addrs: Vec<SocketAddr>) -> Server {
49		Server { handle, listen_addrs }
50	}
51
52	/// Returns the `jsonrpsee::server::ServerHandle` for this Server. Can be used to stop the
53	/// server.
54	pub fn handle(&self) -> &ServerHandle {
55		&self.handle
56	}
57
58	/// The listen address for the running RPC service.
59	pub fn listen_addrs(&self) -> &[SocketAddr] {
60		&self.listen_addrs
61	}
62}
63
64impl Drop for Server {
65	fn drop(&mut self) {
66		// This doesn't not wait for the server to be stopped but fires the signal.
67		let _ = self.handle.stop();
68	}
69}
70
71/// Trait for providing subscription IDs that can be cloned.
72pub trait SubscriptionIdProvider:
73	jsonrpsee::core::traits::IdProvider + dyn_clone::DynClone
74{
75}
76
77dyn_clone::clone_trait_object!(SubscriptionIdProvider);
78
79/// RPC server configuration.
80#[derive(Debug)]
81pub struct Config<M: Send + Sync + 'static> {
82	/// RPC interfaces to start.
83	pub endpoints: Vec<RpcEndpoint>,
84	/// Metrics.
85	pub metrics: Option<RpcMetrics>,
86	/// RPC API.
87	pub rpc_api: RpcModule<M>,
88	/// Subscription ID provider.
89	pub id_provider: Option<Box<dyn SubscriptionIdProvider>>,
90	/// Tokio runtime handle.
91	pub tokio_handle: tokio::runtime::Handle,
92	/// RPC logger capacity (default: 1024).
93	pub request_logger_limit: u32,
94}
95
96#[derive(Debug, Clone)]
97struct PerConnection {
98	methods: Methods,
99	stop_handle: StopHandle,
100	metrics: Option<RpcMetrics>,
101	tokio_handle: tokio::runtime::Handle,
102}
103
104/// Start RPC server listening on given address.
105pub async fn start_server<M>(config: Config<M>) -> Result<Server, Box<dyn StdError + Send + Sync>>
106where
107	M: Send + Sync,
108{
109	let Config { endpoints, metrics, tokio_handle, rpc_api, id_provider, request_logger_limit } =
110		config;
111
112	let (stop_handle, server_handle) = stop_channel();
113	let cfg = PerConnection {
114		methods: build_rpc_api(rpc_api).into(),
115		metrics,
116		tokio_handle: tokio_handle.clone(),
117		stop_handle,
118	};
119
120	let mut local_addrs = Vec::new();
121
122	for endpoint in endpoints {
123		let allowed_to_fail = endpoint.is_optional;
124		let local_addr = endpoint.listen_addr;
125
126		let mut listener = match endpoint.bind().await {
127			Ok(l) => l,
128			Err(e) if allowed_to_fail => {
129				log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e);
130				continue;
131			},
132			Err(e) => return Err(e),
133		};
134		let local_addr = listener.local_addr();
135		local_addrs.push(local_addr);
136		let cfg = cfg.clone();
137
138		let RpcSettings {
139			batch_config,
140			max_connections,
141			max_payload_in_mb,
142			max_payload_out_mb,
143			max_buffer_capacity_per_connection,
144			max_subscriptions_per_connection,
145			rpc_methods,
146			rate_limit_trust_proxy_headers,
147			rate_limit_whitelisted_ips,
148			host_filter,
149			cors,
150			rate_limit,
151		} = listener.rpc_settings();
152
153		let http_middleware = tower::ServiceBuilder::new()
154			.option_layer(host_filter)
155			// Proxy `GET /health, /health/readiness` requests to the internal
156			// `system_health` method.
157			.layer(NodeHealthProxyLayer::default())
158			.layer(cors);
159
160		let mut builder = jsonrpsee::server::Server::builder()
161			.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
162			.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
163			.max_connections(max_connections)
164			.max_subscriptions_per_connection(max_subscriptions_per_connection)
165			.enable_ws_ping(
166				PingConfig::new()
167					.ping_interval(Duration::from_secs(30))
168					.inactive_limit(Duration::from_secs(60))
169					.max_failures(3),
170			)
171			.set_http_middleware(http_middleware)
172			.set_message_buffer_capacity(max_buffer_capacity_per_connection)
173			.set_batch_request_config(batch_config)
174			.custom_tokio_runtime(cfg.tokio_handle.clone());
175
176		if let Some(provider) = id_provider.clone() {
177			builder = builder.set_id_provider(provider);
178		} else {
179			builder = builder.set_id_provider(RandomStringIdProvider::new(16));
180		};
181
182		let service_builder = builder.to_service_builder();
183		let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);
184
185		tokio_handle.spawn(async move {
186			loop {
187				let (sock, remote_addr) = tokio::select! {
188					res = listener.accept() => {
189						match res {
190							Ok(s) => s,
191							Err(e) => {
192								log::debug!(target: "rpc", "Failed to accept connection: {:?}", e);
193								continue;
194							}
195						}
196					}
197					_ = cfg.stop_handle.clone().shutdown() => break,
198				};
199
200				let ip = remote_addr.ip();
201				let cfg2 = cfg.clone();
202				let service_builder2 = service_builder.clone();
203				let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();
204
205				let svc =
206					tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
207						req.extensions_mut().insert(deny_unsafe);
208
209						let PerConnection { methods, metrics, tokio_handle, stop_handle } =
210							cfg2.clone();
211						let service_builder = service_builder2.clone();
212
213						let proxy_ip =
214							if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };
215
216						let rate_limit_cfg = if rate_limit_whitelisted_ips2
217							.iter()
218							.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
219						{
220							log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
221							None
222						} else {
223							if !rate_limit_whitelisted_ips2.is_empty() {
224								log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
225							}
226							rate_limit
227						};
228
229						let is_websocket = ws::is_upgrade_request(&req);
230						let transport_label = if is_websocket { "ws" } else { "http" };
231
232						let middleware_layer = match (metrics, rate_limit_cfg) {
233							(None, None) => None,
234							(Some(metrics), None) => Some(
235								MiddlewareLayer::new()
236									.with_metrics(Metrics::new(metrics, transport_label)),
237							),
238							(None, Some(rate_limit)) =>
239								Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
240							(Some(metrics), Some(rate_limit)) => Some(
241								MiddlewareLayer::new()
242									.with_metrics(Metrics::new(metrics, transport_label))
243									.with_rate_limit_per_minute(rate_limit),
244							),
245						};
246
247						let rpc_middleware = RpcServiceBuilder::new()
248							.rpc_logger(request_logger_limit)
249							.option_layer(middleware_layer.clone());
250						let mut svc = service_builder
251							.set_rpc_middleware(rpc_middleware)
252							.build(methods, stop_handle);
253
254						async move {
255							if is_websocket {
256								let on_disconnect = svc.on_session_closed();
257
258								// Spawn a task to handle when the connection is closed.
259								tokio_handle.spawn(async move {
260									let now = std::time::Instant::now();
261									middleware_layer.as_ref().map(|m| m.ws_connect());
262									on_disconnect.await;
263									middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
264								});
265							}
266
267							// https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
268							// to be `Box<dyn std::error::Error + Send + Sync>` so we need to
269							// convert it to a concrete type as workaround.
270							svc.call(req).await.map_err(|e| BoxError::from(e))
271						}
272					});
273
274				cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
275					sock,
276					svc,
277					cfg.stop_handle.clone().shutdown(),
278				));
279			}
280		});
281	}
282
283	if local_addrs.is_empty() {
284		return Err(Box::new(ListenAddrError));
285	}
286
287	// The previous logging format was before
288	// `Running JSON-RPC server: addr=127.0.0.1:9944, allowed origins=["*"]`
289	//
290	// The new format is `Running JSON-RPC server: addr=<addr1, addr2, .. addr_n>`
291	// with the exception that for a single address it will be `Running JSON-RPC server: addr=addr,`
292	// with a trailing comma.
293	//
294	// This is to make it work with old scripts/utils that parse the logs.
295	log::info!("Running JSON-RPC server: addr={}", format_listen_addrs(&local_addrs));
296
297	Ok(Server::new(server_handle, local_addrs))
298}