// sc_rpc_server/src/lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate RPC servers.
20
21#![warn(missing_docs)]
22
23pub mod middleware;
24pub mod utils;
25
26use std::{error::Error as StdError, net::SocketAddr, sync::Arc, time::Duration};
27
28use futures::future::BoxFuture;
29use jsonrpsee::{
30	core::BoxError,
31	server::{
32		serve_with_graceful_shutdown, stop_channel, ws, PingConfig, ServerHandle, StopHandle,
33	},
34	Methods, RpcModule,
35};
36use tower::Service;
37use utils::{
38	build_rpc_api, deny_unsafe, format_listen_addrs, get_proxy_ip, ListenAddrError, RpcSettings,
39};
40
41pub use ip_network::IpNetwork;
42pub use jsonrpsee::{
43	core::id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
44	server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
45};
46pub use middleware::{Metrics, MiddlewareLayer, NodeHealthProxyLayer, RpcMetrics};
47pub use utils::{RpcEndpoint, RpcMethods};
48
/// Number of bytes in one megabyte; payload limits are configured in MB and
/// converted to bytes with this factor.
const MEGABYTE: u32 = 1024 * 1024;
50
51/// Creates a dedicated tokio runtime for RPC operations.
52///
53/// This runtime isolates RPC blocking operations from the rest of the node
54/// by limiting the number of blocking threads to `max_connections`.
55pub fn create_rpc_runtime(max_connections: u32) -> std::io::Result<tokio::runtime::Runtime> {
56	tokio::runtime::Builder::new_multi_thread()
57		.thread_name("rpc")
58		.enable_all()
59		.max_blocking_threads((max_connections as usize).max(1))
60		.on_thread_start(|| {
61			sc_utils::metrics::TOKIO_THREADS_ALIVE.inc();
62			sc_utils::metrics::TOKIO_THREADS_TOTAL.inc();
63			middleware::RPC_THREADS_ALIVE.inc();
64			middleware::RPC_THREADS_TOTAL.inc();
65		})
66		.on_thread_stop(|| {
67			sc_utils::metrics::TOKIO_THREADS_ALIVE.dec();
68			middleware::RPC_THREADS_ALIVE.dec();
69		})
70		.build()
71}
72
/// Spawn handle for RPC tasks that uses the dedicated RPC runtime.
///
/// This ensures all RPC-related task spawning (including rpc-spec-v2 APIs like
/// chainHead and transactionWatch) runs on the isolated RPC runtime rather than
/// the main node runtime.
#[derive(Clone)]
pub struct RpcSpawnHandle {
	// Handle to the dedicated RPC tokio runtime; cloning a `Handle` is cheap.
	handle: tokio::runtime::Handle,
}

impl RpcSpawnHandle {
	/// Create a new RpcSpawnHandle from a tokio runtime handle.
	pub fn new(handle: tokio::runtime::Handle) -> Self {
		Self { handle }
	}
}
89
impl sp_core::traits::SpawnNamed for RpcSpawnHandle {
	/// Runs `future` to completion on one of the RPC runtime's blocking
	/// threads, driving it with `Handle::block_on`. The task name and group
	/// are ignored.
	///
	/// NOTE(review): this occupies a blocking thread for the entire lifetime
	/// of the future, so long-lived "blocking" tasks each consume one of the
	/// `max_blocking_threads` slots.
	fn spawn_blocking(
		&self,
		_name: &'static str,
		_group: Option<&'static str>,
		future: BoxFuture<'static, ()>,
	) {
		let handle = self.handle.clone();
		self.handle.spawn_blocking(move || {
			handle.block_on(future);
		});
	}

	/// Spawns `future` as an ordinary async task on the RPC runtime. The task
	/// name and group are ignored.
	fn spawn(
		&self,
		_name: &'static str,
		_group: Option<&'static str>,
		future: BoxFuture<'static, ()>,
	) {
		self.handle.spawn(future);
	}
}
112
/// Type to encapsulate the server handle and listening address.
pub struct Server {
	/// Handle to the rpc server
	handle: ServerHandle,
	/// Listening address of the server
	listen_addrs: Vec<SocketAddr>,
	/// Dedicated RPC runtime (kept alive for the lifetime of the server).
	///
	/// `Some` for the entire life of the `Server`; it is only taken out in
	/// `Drop`, where it is shut down in the background.
	rpc_runtime: Option<tokio::runtime::Runtime>,
}
122
123impl Server {
124	/// Creates a new Server.
125	pub fn new(
126		handle: ServerHandle,
127		listen_addrs: Vec<SocketAddr>,
128		rpc_runtime: tokio::runtime::Runtime,
129	) -> Server {
130		Server { handle, listen_addrs, rpc_runtime: Some(rpc_runtime) }
131	}
132
133	/// Returns the `jsonrpsee::server::ServerHandle` for this Server. Can be used to stop the
134	/// server.
135	pub fn handle(&self) -> &ServerHandle {
136		&self.handle
137	}
138
139	/// The listen address for the running RPC service.
140	pub fn listen_addrs(&self) -> &[SocketAddr] {
141		&self.listen_addrs
142	}
143
144	/// Returns the spawn handle for tasks on the dedicated RPC runtime.
145	pub fn spawn_handle(&self) -> Arc<dyn sp_core::traits::SpawnNamed> {
146		Arc::new(RpcSpawnHandle::new(
147			self.rpc_runtime
148				.as_ref()
149				.expect("rpc_runtime is only taken in Drop; qed")
150				.handle()
151				.clone(),
152		))
153	}
154}
155
impl Drop for Server {
	fn drop(&mut self) {
		// This does not wait for the server to be stopped but fires the signal.
		let _ = self.handle.stop();

		// Use `shutdown_background()` to avoid blocking, which would panic if
		// we are being dropped from within an async context.
		if let Some(runtime) = self.rpc_runtime.take() {
			runtime.shutdown_background();
		}
	}
}
168
/// Trait for providing subscription IDs that can be cloned.
pub trait SubscriptionIdProvider:
	jsonrpsee::core::traits::IdProvider + dyn_clone::DynClone
{
}

// Generates `impl Clone for Box<dyn SubscriptionIdProvider>` so boxed
// providers can be cloned (e.g. once per endpoint in `start_server`).
dyn_clone::clone_trait_object!(SubscriptionIdProvider);
176
/// RPC server configuration.
pub struct Config<M: Send + Sync + 'static> {
	/// RPC interfaces to start.
	pub endpoints: Vec<RpcEndpoint>,
	/// Metrics.
	pub metrics: Option<RpcMetrics>,
	/// RPC API module.
	pub rpc_api: RpcModule<M>,
	/// Subscription ID provider.
	///
	/// When `None`, a `RandomStringIdProvider` producing 16-character IDs is
	/// used.
	pub id_provider: Option<Box<dyn SubscriptionIdProvider>>,
	/// RPC logger capacity (default: 1024).
	pub request_logger_limit: u32,
	/// Dedicated RPC runtime.
	///
	/// Ownership is moved into the returned [`Server`], which keeps the
	/// runtime alive until it is dropped.
	pub rpc_runtime: tokio::runtime::Runtime,
}
192
/// State shared with every accepted connection; cloned once per endpoint and
/// again per connection in `start_server`.
#[derive(Debug, Clone)]
struct PerConnection {
	/// The full set of RPC methods served to the connection.
	methods: Methods,
	/// Lets per-connection tasks observe server shutdown.
	stop_handle: StopHandle,
	/// Optional RPC metrics recorder.
	metrics: Option<RpcMetrics>,
	/// Runtime handle on which connection tasks are spawned.
	tokio_handle: tokio::runtime::Handle,
}
200
/// Start RPC server listening on given address.
///
/// Binds every endpoint in [`Config::endpoints`], spawns one accept-loop task
/// per successfully bound listener onto the dedicated RPC runtime, and returns
/// a [`Server`] owning both the server handle and the runtime.
///
/// # Errors
///
/// Fails if a non-optional endpoint cannot be bound, or with
/// [`ListenAddrError`] when no endpoint at all was bound.
pub async fn start_server<M>(config: Config<M>) -> Result<Server, Box<dyn StdError + Send + Sync>>
where
	M: Send + Sync,
{
	let Config { endpoints, metrics, rpc_api, id_provider, request_logger_limit, rpc_runtime } =
		config;

	// All server tasks are spawned on the dedicated RPC runtime.
	let rpc_handle = rpc_runtime.handle().clone();

	// `stop_handle` is observed by the accept loops; `server_handle` is handed
	// to the caller (inside `Server`) to trigger shutdown.
	let (stop_handle, server_handle) = stop_channel();
	let cfg = PerConnection {
		methods: build_rpc_api(rpc_api).into(),
		metrics,
		tokio_handle: rpc_handle.clone(),
		stop_handle,
	};

	let mut local_addrs = Vec::new();

	for endpoint in endpoints {
		let allowed_to_fail = endpoint.is_optional;
		let local_addr = endpoint.listen_addr;

		// Optional endpoints are skipped on bind failure; mandatory ones abort
		// server startup.
		let mut listener = match endpoint.bind().await {
			Ok(l) => l,
			Err(e) if allowed_to_fail => {
				log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e);
				continue;
			},
			Err(e) => return Err(e),
		};
		// Re-read the actual bound address (relevant e.g. when port 0 was requested).
		let local_addr = listener.local_addr();
		local_addrs.push(local_addr);
		let cfg = cfg.clone();

		let RpcSettings {
			batch_config,
			max_connections,
			max_payload_in_mb,
			max_payload_out_mb,
			max_buffer_capacity_per_connection,
			max_subscriptions_per_connection,
			rpc_methods,
			rate_limit_trust_proxy_headers,
			rate_limit_whitelisted_ips,
			host_filter,
			cors,
			rate_limit,
		} = listener.rpc_settings();

		let http_middleware = tower::ServiceBuilder::new()
			.option_layer(host_filter)
			// Proxy `GET /health, /health/readiness` requests to the internal
			// `system_health` method.
			.layer(NodeHealthProxyLayer::default())
			.layer(cors);

		let mut builder = jsonrpsee::server::Server::builder()
			.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
			.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
			.max_connections(max_connections)
			.max_subscriptions_per_connection(max_subscriptions_per_connection)
			.enable_ws_ping(
				PingConfig::new()
					.ping_interval(Duration::from_secs(30))
					.inactive_limit(Duration::from_secs(60))
					.max_failures(3),
			)
			.set_http_middleware(http_middleware)
			.set_message_buffer_capacity(max_buffer_capacity_per_connection)
			.set_batch_request_config(batch_config)
			.custom_tokio_runtime(rpc_handle.clone());

		// Fall back to random 16-character string subscription IDs when no
		// custom provider was configured.
		if let Some(provider) = id_provider.clone() {
			builder = builder.set_id_provider(provider);
		} else {
			builder = builder.set_id_provider(RandomStringIdProvider::new(16));
		};

		let service_builder = builder.to_service_builder();
		// Whether unsafe RPC methods are rejected depends on the listen
		// address and the configured `RpcMethods` policy.
		let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);

		// Accept loop: one task per endpoint, running until shutdown fires.
		rpc_handle.spawn(async move {
			loop {
				let (sock, remote_addr) = tokio::select! {
					res = listener.accept() => {
						match res {
							Ok(s) => s,
							Err(e) => {
								log::debug!(target: "rpc", "Failed to accept connection: {:?}", e);
								continue;
							}
						}
					}
					_ = cfg.stop_handle.clone().shutdown() => break,
				};

				let ip = remote_addr.ip();
				let cfg2 = cfg.clone();
				let service_builder2 = service_builder.clone();
				let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();

				// Per-request service: injects deny-unsafe, applies optional
				// rate limiting and metrics, then delegates to jsonrpsee.
				let svc =
					tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
						req.extensions_mut().insert(deny_unsafe);

						let PerConnection { methods, metrics, tokio_handle, stop_handle } =
							cfg2.clone();
						let service_builder = service_builder2.clone();

						// Only honour proxy headers (e.g. `X-Forwarded-For`)
						// when explicitly configured to trust them.
						let proxy_ip =
							if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };

						// Whitelisted IPs are exempt from rate limiting; the
						// proxy IP takes precedence over the socket IP when set.
						let rate_limit_cfg = if rate_limit_whitelisted_ips2
							.iter()
							.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
						{
							log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
							None
						} else {
							if !rate_limit_whitelisted_ips2.is_empty() {
								log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
							}
							rate_limit
						};

						let is_websocket = ws::is_upgrade_request(&req);
						let transport_label = if is_websocket { "ws" } else { "http" };

						// Compose the middleware from whichever of metrics and
						// rate limiting are enabled (possibly neither).
						let middleware_layer = match (metrics, rate_limit_cfg) {
							(None, None) => None,
							(Some(metrics), None) => Some(
								MiddlewareLayer::new()
									.with_metrics(Metrics::new(metrics, transport_label)),
							),
							(None, Some(rate_limit)) =>
								Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
							(Some(metrics), Some(rate_limit)) => Some(
								MiddlewareLayer::new()
									.with_metrics(Metrics::new(metrics, transport_label))
									.with_rate_limit_per_minute(rate_limit),
							),
						};

						let rpc_middleware = RpcServiceBuilder::new()
							.rpc_logger(request_logger_limit)
							.option_layer(middleware_layer.clone());
						let mut svc = service_builder
							.set_rpc_middleware(rpc_middleware)
							.build(methods, stop_handle);

						async move {
							if is_websocket {
								let on_disconnect = svc.on_session_closed();

								// Spawn a task to handle when the connection is closed.
								tokio_handle.spawn(async move {
									let now = std::time::Instant::now();
									middleware_layer.as_ref().map(|m| m.ws_connect());
									on_disconnect.await;
									middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
								});
							}

							// https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
							// to be `Box<dyn std::error::Error + Send + Sync>` so we need to
							// convert it to a concrete type as workaround.
							svc.call(req).await.map_err(|e| BoxError::from(e))
						}
					});

				// Serve the connection; resolves when the connection closes or
				// shutdown is signalled.
				cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
					sock,
					svc,
					cfg.stop_handle.clone().shutdown(),
				));
			}
		});
	}

	if local_addrs.is_empty() {
		return Err(Box::new(ListenAddrError));
	}

	// The previous logging format was before
	// `Running JSON-RPC server: addr=127.0.0.1:9944, allowed origins=["*"]`
	//
	// The new format is `Running JSON-RPC server: addr=<addr1, addr2, .. addr_n>`
	// with the exception that for a single address it will be `Running JSON-RPC server: addr=addr,`
	// with a trailing comma.
	//
	// This is to make it work with old scripts/utils that parse the logs.
	log::info!("Running JSON-RPC server: addr={}", format_listen_addrs(&local_addrs));

	Ok(Server::new(server_handle, local_addrs, rpc_runtime))
}