1#![warn(missing_docs)]
22
23pub mod middleware;
24pub mod utils;
25
26use std::{error::Error as StdError, net::SocketAddr, sync::Arc, time::Duration};
27
28use futures::future::BoxFuture;
29use jsonrpsee::{
30 core::BoxError,
31 server::{
32 serve_with_graceful_shutdown, stop_channel, ws, PingConfig, ServerHandle, StopHandle,
33 },
34 Methods, RpcModule,
35};
36use tower::Service;
37use utils::{
38 build_rpc_api, deny_unsafe, format_listen_addrs, get_proxy_ip, ListenAddrError, RpcSettings,
39};
40
41pub use ip_network::IpNetwork;
42pub use jsonrpsee::{
43 core::id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
44 server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
45};
46pub use middleware::{Metrics, MiddlewareLayer, NodeHealthProxyLayer, RpcMetrics};
47pub use utils::{RpcEndpoint, RpcMethods};
48
49const MEGABYTE: u32 = 1024 * 1024;
50
51pub fn create_rpc_runtime(max_connections: u32) -> std::io::Result<tokio::runtime::Runtime> {
56 tokio::runtime::Builder::new_multi_thread()
57 .thread_name("rpc")
58 .enable_all()
59 .max_blocking_threads((max_connections as usize).max(1))
60 .on_thread_start(|| {
61 sc_utils::metrics::TOKIO_THREADS_ALIVE.inc();
62 sc_utils::metrics::TOKIO_THREADS_TOTAL.inc();
63 middleware::RPC_THREADS_ALIVE.inc();
64 middleware::RPC_THREADS_TOTAL.inc();
65 })
66 .on_thread_stop(|| {
67 sc_utils::metrics::TOKIO_THREADS_ALIVE.dec();
68 middleware::RPC_THREADS_ALIVE.dec();
69 })
70 .build()
71}
72
73#[derive(Clone)]
79pub struct RpcSpawnHandle {
80 handle: tokio::runtime::Handle,
81}
82
83impl RpcSpawnHandle {
84 pub fn new(handle: tokio::runtime::Handle) -> Self {
86 Self { handle }
87 }
88}
89
90impl sp_core::traits::SpawnNamed for RpcSpawnHandle {
91 fn spawn_blocking(
92 &self,
93 _name: &'static str,
94 _group: Option<&'static str>,
95 future: BoxFuture<'static, ()>,
96 ) {
97 let handle = self.handle.clone();
98 self.handle.spawn_blocking(move || {
99 handle.block_on(future);
100 });
101 }
102
103 fn spawn(
104 &self,
105 _name: &'static str,
106 _group: Option<&'static str>,
107 future: BoxFuture<'static, ()>,
108 ) {
109 self.handle.spawn(future);
110 }
111}
112
113pub struct Server {
115 handle: ServerHandle,
117 listen_addrs: Vec<SocketAddr>,
119 rpc_runtime: Option<tokio::runtime::Runtime>,
121}
122
123impl Server {
124 pub fn new(
126 handle: ServerHandle,
127 listen_addrs: Vec<SocketAddr>,
128 rpc_runtime: tokio::runtime::Runtime,
129 ) -> Server {
130 Server { handle, listen_addrs, rpc_runtime: Some(rpc_runtime) }
131 }
132
133 pub fn handle(&self) -> &ServerHandle {
136 &self.handle
137 }
138
139 pub fn listen_addrs(&self) -> &[SocketAddr] {
141 &self.listen_addrs
142 }
143
144 pub fn spawn_handle(&self) -> Arc<dyn sp_core::traits::SpawnNamed> {
146 Arc::new(RpcSpawnHandle::new(
147 self.rpc_runtime
148 .as_ref()
149 .expect("rpc_runtime is only taken in Drop; qed")
150 .handle()
151 .clone(),
152 ))
153 }
154}
155
156impl Drop for Server {
157 fn drop(&mut self) {
158 let _ = self.handle.stop();
160
161 if let Some(runtime) = self.rpc_runtime.take() {
164 runtime.shutdown_background();
165 }
166 }
167}
168
169pub trait SubscriptionIdProvider:
171 jsonrpsee::core::traits::IdProvider + dyn_clone::DynClone
172{
173}
174
175dyn_clone::clone_trait_object!(SubscriptionIdProvider);
176
177pub struct Config<M: Send + Sync + 'static> {
179 pub endpoints: Vec<RpcEndpoint>,
181 pub metrics: Option<RpcMetrics>,
183 pub rpc_api: RpcModule<M>,
185 pub id_provider: Option<Box<dyn SubscriptionIdProvider>>,
187 pub request_logger_limit: u32,
189 pub rpc_runtime: tokio::runtime::Runtime,
191}
192
193#[derive(Debug, Clone)]
194struct PerConnection {
195 methods: Methods,
196 stop_handle: StopHandle,
197 metrics: Option<RpcMetrics>,
198 tokio_handle: tokio::runtime::Handle,
199}
200
201pub async fn start_server<M>(config: Config<M>) -> Result<Server, Box<dyn StdError + Send + Sync>>
203where
204 M: Send + Sync,
205{
206 let Config { endpoints, metrics, rpc_api, id_provider, request_logger_limit, rpc_runtime } =
207 config;
208
209 let rpc_handle = rpc_runtime.handle().clone();
210
211 let (stop_handle, server_handle) = stop_channel();
212 let cfg = PerConnection {
213 methods: build_rpc_api(rpc_api).into(),
214 metrics,
215 tokio_handle: rpc_handle.clone(),
216 stop_handle,
217 };
218
219 let mut local_addrs = Vec::new();
220
221 for endpoint in endpoints {
222 let allowed_to_fail = endpoint.is_optional;
223 let local_addr = endpoint.listen_addr;
224
225 let mut listener = match endpoint.bind().await {
226 Ok(l) => l,
227 Err(e) if allowed_to_fail => {
228 log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e);
229 continue;
230 },
231 Err(e) => return Err(e),
232 };
233 let local_addr = listener.local_addr();
234 local_addrs.push(local_addr);
235 let cfg = cfg.clone();
236
237 let RpcSettings {
238 batch_config,
239 max_connections,
240 max_payload_in_mb,
241 max_payload_out_mb,
242 max_buffer_capacity_per_connection,
243 max_subscriptions_per_connection,
244 rpc_methods,
245 rate_limit_trust_proxy_headers,
246 rate_limit_whitelisted_ips,
247 host_filter,
248 cors,
249 rate_limit,
250 } = listener.rpc_settings();
251
252 let http_middleware = tower::ServiceBuilder::new()
253 .option_layer(host_filter)
254 .layer(NodeHealthProxyLayer::default())
257 .layer(cors);
258
259 let mut builder = jsonrpsee::server::Server::builder()
260 .max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
261 .max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
262 .max_connections(max_connections)
263 .max_subscriptions_per_connection(max_subscriptions_per_connection)
264 .enable_ws_ping(
265 PingConfig::new()
266 .ping_interval(Duration::from_secs(30))
267 .inactive_limit(Duration::from_secs(60))
268 .max_failures(3),
269 )
270 .set_http_middleware(http_middleware)
271 .set_message_buffer_capacity(max_buffer_capacity_per_connection)
272 .set_batch_request_config(batch_config)
273 .custom_tokio_runtime(rpc_handle.clone());
274
275 if let Some(provider) = id_provider.clone() {
276 builder = builder.set_id_provider(provider);
277 } else {
278 builder = builder.set_id_provider(RandomStringIdProvider::new(16));
279 };
280
281 let service_builder = builder.to_service_builder();
282 let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);
283
284 rpc_handle.spawn(async move {
285 loop {
286 let (sock, remote_addr) = tokio::select! {
287 res = listener.accept() => {
288 match res {
289 Ok(s) => s,
290 Err(e) => {
291 log::debug!(target: "rpc", "Failed to accept connection: {:?}", e);
292 continue;
293 }
294 }
295 }
296 _ = cfg.stop_handle.clone().shutdown() => break,
297 };
298
299 let ip = remote_addr.ip();
300 let cfg2 = cfg.clone();
301 let service_builder2 = service_builder.clone();
302 let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();
303
304 let svc =
305 tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
306 req.extensions_mut().insert(deny_unsafe);
307
308 let PerConnection { methods, metrics, tokio_handle, stop_handle } =
309 cfg2.clone();
310 let service_builder = service_builder2.clone();
311
312 let proxy_ip =
313 if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };
314
315 let rate_limit_cfg = if rate_limit_whitelisted_ips2
316 .iter()
317 .any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
318 {
319 log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
320 None
321 } else {
322 if !rate_limit_whitelisted_ips2.is_empty() {
323 log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
324 }
325 rate_limit
326 };
327
328 let is_websocket = ws::is_upgrade_request(&req);
329 let transport_label = if is_websocket { "ws" } else { "http" };
330
331 let middleware_layer = match (metrics, rate_limit_cfg) {
332 (None, None) => None,
333 (Some(metrics), None) => Some(
334 MiddlewareLayer::new()
335 .with_metrics(Metrics::new(metrics, transport_label)),
336 ),
337 (None, Some(rate_limit)) =>
338 Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
339 (Some(metrics), Some(rate_limit)) => Some(
340 MiddlewareLayer::new()
341 .with_metrics(Metrics::new(metrics, transport_label))
342 .with_rate_limit_per_minute(rate_limit),
343 ),
344 };
345
346 let rpc_middleware = RpcServiceBuilder::new()
347 .rpc_logger(request_logger_limit)
348 .option_layer(middleware_layer.clone());
349 let mut svc = service_builder
350 .set_rpc_middleware(rpc_middleware)
351 .build(methods, stop_handle);
352
353 async move {
354 if is_websocket {
355 let on_disconnect = svc.on_session_closed();
356
357 tokio_handle.spawn(async move {
359 let now = std::time::Instant::now();
360 middleware_layer.as_ref().map(|m| m.ws_connect());
361 on_disconnect.await;
362 middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
363 });
364 }
365
366 svc.call(req).await.map_err(|e| BoxError::from(e))
370 }
371 });
372
373 cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
374 sock,
375 svc,
376 cfg.stop_handle.clone().shutdown(),
377 ));
378 }
379 });
380 }
381
382 if local_addrs.is_empty() {
383 return Err(Box::new(ListenAddrError));
384 }
385
386 log::info!("Running JSON-RPC server: addr={}", format_listen_addrs(&local_addrs));
395
396 Ok(Server::new(server_handle, local_addrs, rpc_runtime))
397}