jsonrpsee_server/
server.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::error::Error as StdError;
28use std::future::Future;
29use std::net::{SocketAddr, TcpListener as StdTcpListener};
30use std::pin::Pin;
31use std::sync::Arc;
32use std::sync::atomic::AtomicU32;
33use std::task::Poll;
34use std::time::Duration;
35
36use crate::future::{ConnectionGuard, ServerHandle, SessionClose, SessionClosedFuture, StopHandle, session_close};
37use crate::middleware::rpc::{RpcService, RpcServiceCfg};
38use crate::transport::ws::BackgroundTaskParams;
39use crate::transport::{http, ws};
40use crate::utils::deserialize_with_ext;
41use crate::{Extensions, HttpBody, HttpRequest, HttpResponse, LOG_TARGET};
42
43use futures_util::future::{self, Either, FutureExt};
44use futures_util::io::{BufReader, BufWriter};
45use hyper::body::Bytes;
46use hyper_util::rt::{TokioExecutor, TokioIo};
47use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
48use jsonrpsee_core::middleware::{Batch, BatchEntry, BatchEntryErr, RpcServiceBuilder, RpcServiceT};
49use jsonrpsee_core::server::helpers::prepare_error;
50use jsonrpsee_core::server::{BoundedSubscriptions, ConnectionId, MethodResponse, MethodSink, Methods};
51use jsonrpsee_core::traits::IdProvider;
52use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES};
53use jsonrpsee_types::error::{
54	BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, ErrorCode, reject_too_big_batch_request,
55};
56use jsonrpsee_types::{ErrorObject, Id};
57use soketto::handshake::http::is_upgrade_request;
58use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
59use tokio::sync::{OwnedSemaphorePermit, mpsc, watch};
60use tokio_util::compat::TokioAsyncReadCompatExt;
61use tower::layer::util::Identity;
62use tower::{Layer, Service};
63use tracing::{Instrument, instrument};
64
/// Default maximum connections allowed.
const MAX_CONNECTIONS: u32 = 100;

/// Optional raw JSON value that is either borrowed for `'a` or owned.
// NOTE(review): presumably the payload of a notification — confirm at the use sites.
type Notif<'a> = Option<std::borrow::Cow<'a, JsonRawValue>>;
69
/// JSON RPC server.
///
/// Created by [`Builder::build`] or [`Builder::build_from_tcp`].
pub struct Server<HttpMiddleware = Identity, RpcMiddleware = Identity> {
	/// Listener that incoming TCP connections are accepted from.
	listener: TcpListener,
	/// Server configuration; cloned for every accepted connection.
	server_cfg: ServerConfig,
	/// RPC middleware; cloned for every accepted connection.
	rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
	/// HTTP middleware; passed by reference to every accepted connection.
	http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
77
78impl Server<Identity, Identity> {
79	/// Create a builder for the server.
80	pub fn builder() -> Builder<Identity, Identity> {
81		Builder::new()
82	}
83}
84
85impl<RpcMiddleware, HttpMiddleware> std::fmt::Debug for Server<RpcMiddleware, HttpMiddleware> {
86	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87		f.debug_struct("Server").field("listener", &self.listener).field("server_cfg", &self.server_cfg).finish()
88	}
89}
90
91impl<RpcMiddleware, HttpMiddleware> Server<RpcMiddleware, HttpMiddleware> {
92	/// Returns socket address to which the server is bound.
93	pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
94		self.listener.local_addr()
95	}
96}
97
98impl<HttpMiddleware, RpcMiddleware, Body> Server<HttpMiddleware, RpcMiddleware>
99where
100	RpcMiddleware: tower::Layer<RpcService> + Clone + Send + 'static,
101	<RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
102	HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
103	<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service:
104		Send + Clone + Service<HttpRequest, Response = HttpResponse<Body>, Error = BoxError>,
105	<<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<HttpRequest>>::Future: Send,
106	Body: http_body::Body<Data = Bytes> + Send + 'static,
107	<Body as http_body::Body>::Error: Into<BoxError>,
108	<Body as http_body::Body>::Data: Send,
109{
110	/// Start responding to connections requests.
111	///
112	/// This will run on the tokio runtime until the server is stopped or the `ServerHandle` is dropped.
113	pub fn start(mut self, methods: impl Into<Methods>) -> ServerHandle {
114		let methods = methods.into();
115		let (stop_tx, stop_rx) = watch::channel(());
116
117		let stop_handle = StopHandle::new(stop_rx);
118
119		match self.server_cfg.tokio_runtime.take() {
120			Some(rt) => rt.spawn(self.start_inner(methods, stop_handle)),
121			None => tokio::spawn(self.start_inner(methods, stop_handle)),
122		};
123
124		ServerHandle::new(stop_tx)
125	}
126
	/// Accept loop of the server.
	///
	/// Accepts connections from the listener and hands each one to `process_connection`
	/// until `try_accept_conn` reports a shutdown. Afterwards it waits until every clone of the
	/// completion sender that was handed out to the connections has been dropped.
	async fn start_inner(self, methods: Methods, stop_handle: StopHandle) {
		// Connection id that is handed to the next accepted connection.
		let mut id: u32 = 0;
		// A single guard, created from the configured connection limit, is shared by all connections.
		let connection_guard = ConnectionGuard::new(self.server_cfg.max_connections as usize);
		let listener = self.listener;

		// The pinned shutdown future is passed to `try_accept_conn` by value and is re-assigned
		// below from the `stop` value that comes back in the result, so that it can be reused
		// by the next iteration.
		let stopped = stop_handle.clone().shutdown();
		tokio::pin!(stopped);

		// Nothing is sent on this channel here; every connection gets a clone of the sender and
		// the receiver is only used to observe when all senders are gone.
		let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);

		loop {
			match try_accept_conn(&listener, stopped).await {
				AcceptConnection::Established { socket, remote_addr, stop } => {
					// NOTE(review): presumably spawns the per-connection task — confirm in `process_connection`.
					process_connection(ProcessConnection {
						http_middleware: &self.http_middleware,
						rpc_middleware: self.rpc_middleware.clone(),
						remote_addr,
						methods: methods.clone(),
						stop_handle: stop_handle.clone(),
						conn_id: id,
						server_cfg: self.server_cfg.clone(),
						conn_guard: &connection_guard,
						socket,
						drop_on_completion: drop_on_completion.clone(),
					});
					// The id wraps around instead of overflowing.
					id = id.wrapping_add(1);
					stopped = stop;
				}
				AcceptConnection::Err((e, stop)) => {
					// A failed accept is only logged; the loop keeps accepting.
					tracing::debug!(target: LOG_TARGET, "Error while awaiting a new connection: {:?}", e);
					stopped = stop;
				}
				AcceptConnection::Shutdown => break,
			}
		}

		// Drop the last Sender
		drop(drop_on_completion);

		// Once this channel is closed it is safe to assume that all connections have been gracefully shutdown
		while process_connection_awaiter.recv().await.is_some() {
			// Generally, messages should not be sent across this channel,
			// but we'll loop here to wait for `None` just to be on the safe side
		}
	}
172}
173
/// Static server configuration which is shared per connection.
///
/// Created via [`ServerConfig::builder`] / [`ServerConfigBuilder::build`].
#[derive(Debug, Clone)]
pub struct ServerConfig {
	/// Maximum size in bytes of a request.
	pub(crate) max_request_body_size: u32,
	/// Maximum size in bytes of a response.
	pub(crate) max_response_body_size: u32,
	/// Maximum number of incoming connections allowed.
	pub(crate) max_connections: u32,
	/// Maximum number of subscriptions per connection.
	pub(crate) max_subscriptions_per_connection: u32,
	/// Whether batch requests are supported by this server or not.
	pub(crate) batch_requests_config: BatchRequestConfig,
	/// Custom tokio runtime to run the server on.
	///
	/// When `None` the server task is spawned with `tokio::spawn`.
	pub(crate) tokio_runtime: Option<tokio::runtime::Handle>,
	/// Enable HTTP.
	pub(crate) enable_http: bool,
	/// Enable WS.
	pub(crate) enable_ws: bool,
	/// Number of messages that server is allowed to `buffer` until backpressure kicks in.
	pub(crate) message_buffer_capacity: u32,
	/// Ping settings; `None` means that WebSocket pings are disabled.
	pub(crate) ping_config: Option<PingConfig>,
	/// ID provider used for subscription IDs.
	pub(crate) id_provider: Arc<dyn IdProvider>,
	/// `TCP_NODELAY` settings.
	pub(crate) tcp_no_delay: bool,
}
202
/// The builder to configure and create a JSON-RPC server configuration.
///
/// Every field mirrors the field with the same name on [`ServerConfig`].
#[derive(Debug, Clone)]
pub struct ServerConfigBuilder {
	/// Maximum size in bytes of a request.
	max_request_body_size: u32,
	/// Maximum size in bytes of a response.
	max_response_body_size: u32,
	/// Maximum number of incoming connections allowed.
	max_connections: u32,
	/// Maximum number of subscriptions per connection.
	max_subscriptions_per_connection: u32,
	/// Whether batch requests are supported by this server or not.
	batch_requests_config: BatchRequestConfig,
	/// Custom tokio runtime to run the server on.
	tokio_runtime: Option<tokio::runtime::Handle>,
	/// Enable HTTP.
	enable_http: bool,
	/// Enable WS.
	enable_ws: bool,
	/// Number of messages that server is allowed to `buffer` until backpressure kicks in.
	message_buffer_capacity: u32,
	/// Ping settings; `None` means that WebSocket pings are disabled.
	ping_config: Option<PingConfig>,
	/// ID provider used for subscription IDs.
	id_provider: Arc<dyn IdProvider>,
	/// `TCP_NODELAY` settings.
	tcp_no_delay: bool,
}
231
/// Builder for [`TowerService`].
///
/// Cloning the builder shares the connection ID counter, because it is stored behind an `Arc`.
#[derive(Debug, Clone)]
pub struct TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
	/// ServerConfig
	pub(crate) server_cfg: ServerConfig,
	/// RPC middleware.
	pub(crate) rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
	/// HTTP middleware.
	pub(crate) http_middleware: tower::ServiceBuilder<HttpMiddleware>,
	/// Connection ID counter; the next ID is read and incremented by `build`.
	pub(crate) conn_id: Arc<AtomicU32>,
	/// Connection guard, created from the maximum number of connections.
	pub(crate) conn_guard: ConnectionGuard,
}
246
/// Configuration for batch request handling.
///
/// The default used by [`ServerConfigBuilder`] is [`BatchRequestConfig::Unlimited`].
#[derive(Debug, Copy, Clone)]
pub enum BatchRequestConfig {
	/// Batch requests are disabled.
	Disabled,
	/// Each batch request is limited to `len` and any batch request bigger than `len` will not be processed.
	Limit(u32),
	/// The batch request is unlimited.
	Unlimited,
}
257
/// Connection related state that is needed
/// to execute JSON-RPC calls.
#[derive(Debug, Clone)]
pub struct ConnectionState {
	/// Stop handle.
	pub(crate) stop_handle: StopHandle,
	/// Connection ID
	pub(crate) conn_id: u32,
	/// Semaphore permit of the connection.
	///
	/// It is never read; it is stored behind an `Arc` so that clones of the state share the
	/// same permit, which is only dropped together with the last clone.
	pub(crate) _conn_permit: Arc<OwnedSemaphorePermit>,
}
269
270impl ConnectionState {
271	/// Create a new connection state.
272	pub fn new(stop_handle: StopHandle, conn_id: u32, conn_permit: OwnedSemaphorePermit) -> ConnectionState {
273		Self { stop_handle, conn_id, _conn_permit: Arc::new(conn_permit) }
274	}
275}
276
/// Configuration for WebSocket ping/pong mechanism and it may be used to disconnect
/// an inactive connection.
///
/// jsonrpsee doesn't associate a pong frame with a specific ping frame; if
/// no pong frame is received within the `inactive_limit` then it's regarded
/// as missed.
///
/// Such that the `inactive_limit` should be configured to longer than a single
/// WebSocket ping takes or it might be missed and may end up
/// terminating the connection.
///
/// Default: ping_interval: 30 seconds, max failures: 1 and inactive limit: 40 seconds.
#[derive(Debug, Copy, Clone)]
pub struct PingConfig {
	/// Period which the server pings the connected client.
	pub(crate) ping_interval: Duration,
	/// Max allowed time for a connection to stay idle.
	pub(crate) inactive_limit: Duration,
	/// Max failures.
	pub(crate) max_failures: usize,
}

impl Default for PingConfig {
	/// Ping every 30 seconds, allow 40 seconds of inactivity and tolerate a single failure.
	fn default() -> Self {
		Self { ping_interval: Duration::from_secs(30), max_failures: 1, inactive_limit: Duration::from_secs(40) }
	}
}

impl PingConfig {
	/// Create a new PingConfig with the default settings.
	pub fn new() -> Self {
		Self::default()
	}

	/// Configure the interval when the WebSocket pings are sent out.
	pub fn ping_interval(mut self, ping_interval: Duration) -> Self {
		self.ping_interval = ping_interval;
		self
	}

	/// Configure how long to wait for the WebSocket pong.
	/// When this limit is expired it's regarded as the client is unresponsive.
	///
	/// You may configure how many times the client is allowed to be "inactive" by
	/// [`PingConfig::max_failures`].
	pub fn inactive_limit(mut self, inactivity_limit: Duration) -> Self {
		self.inactive_limit = inactivity_limit;
		self
	}

	/// Configure how many times the remote peer is allowed to be
	/// inactive until the connection is closed.
	///
	/// # Panics
	///
	/// This method panics if `max` == 0.
	pub fn max_failures(mut self, max: usize) -> Self {
		// State the violated precondition in the panic, like the other builder assertions do.
		assert!(max > 0, "max failures must be set to > 0");
		self.max_failures = max;
		self
	}
}
339
340impl Default for ServerConfig {
341	fn default() -> Self {
342		ServerConfig::builder().build()
343	}
344}
345
346impl ServerConfig {
347	/// Create a new builder for the [`ServerConfig`].
348	pub fn builder() -> ServerConfigBuilder {
349		ServerConfigBuilder::default()
350	}
351}
352
353impl Default for ServerConfigBuilder {
354	fn default() -> Self {
355		ServerConfigBuilder {
356			max_request_body_size: TEN_MB_SIZE_BYTES,
357			max_response_body_size: TEN_MB_SIZE_BYTES,
358			max_connections: MAX_CONNECTIONS,
359			max_subscriptions_per_connection: 1024,
360			batch_requests_config: BatchRequestConfig::Unlimited,
361			tokio_runtime: None,
362			enable_http: true,
363			enable_ws: true,
364			message_buffer_capacity: 1024,
365			ping_config: None,
366			id_provider: Arc::new(RandomIntegerIdProvider),
367			tcp_no_delay: true,
368		}
369	}
370}
371
372impl ServerConfigBuilder {
373	/// Create a new [`ServerConfigBuilder`].
374	pub fn new() -> Self {
375		Self::default()
376	}
377
378	/// Set the maximum size of a request body in bytes. Default is 10 MiB.
379	pub fn max_request_body_size(mut self, size: u32) -> Self {
380		self.max_request_body_size = size;
381		self
382	}
383
384	/// Set the maximum size of a response body in bytes. Default is 10 MiB.
385	pub fn max_response_body_size(mut self, size: u32) -> Self {
386		self.max_response_body_size = size;
387		self
388	}
389
390	/// Set the maximum number of connections allowed. Default is 100.
391	pub fn max_connections(mut self, max: u32) -> Self {
392		self.max_connections = max;
393		self
394	}
395
396	/// Set the maximum number of connections allowed. Default is 1024.
397	pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
398		self.max_subscriptions_per_connection = max;
399		self
400	}
401
402	/// Configure how [batch requests](https://www.jsonrpc.org/specification#batch) shall be handled
403	/// by the server.
404	///
405	/// Default: batch requests are allowed and can be arbitrary big but the maximum payload size is limited.
406	pub fn set_batch_request_config(mut self, cfg: BatchRequestConfig) -> Self {
407		self.batch_requests_config = cfg;
408		self
409	}
410
411	/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
412	///
413	/// Default: [`tokio::spawn`]
414	pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
415		self.tokio_runtime = Some(rt);
416		self
417	}
418
419	/// Configure the server to only serve JSON-RPC HTTP requests.
420	///
421	/// Default: both http and ws are enabled.
422	pub fn http_only(mut self) -> Self {
423		self.enable_http = true;
424		self.enable_ws = false;
425		self
426	}
427
428	/// Configure the server to only serve JSON-RPC WebSocket requests.
429	///
430	/// That implies that server just denies HTTP requests which isn't a WebSocket upgrade request
431	///
432	/// Default: both http and ws are enabled.
433	pub fn ws_only(mut self) -> Self {
434		self.enable_http = false;
435		self.enable_ws = true;
436		self
437	}
438
439	/// The server enforces backpressure which means that
440	/// `n` messages can be buffered and if the client
441	/// can't keep with up the server.
442	///
443	/// This `capacity` is applied per connection and
444	/// applies globally on the connection which implies
445	/// all JSON-RPC messages.
446	///
447	/// For example if a subscription produces plenty of new items
448	/// and the client can't keep up then no new messages are handled.
449	///
450	/// If this limit is exceeded then the server will "back-off"
451	/// and only accept new messages once the client reads pending messages.
452	///
453	/// # Panics
454	///
455	/// Panics if the buffer capacity is 0.
456	///
457	pub fn set_message_buffer_capacity(mut self, c: u32) -> Self {
458		assert!(c > 0, "buffer capacity must be set to > 0");
459		self.message_buffer_capacity = c;
460		self
461	}
462
463	/// Enable WebSocket ping/pong on the server.
464	///
465	/// Default: pings are disabled.
466	///
467	/// # Examples
468	///
469	/// ```rust
470	/// use std::{time::Duration, num::NonZeroUsize};
471	/// use jsonrpsee_server::{ServerConfigBuilder, PingConfig};
472	///
473	/// // Set the ping interval to 10 seconds but terminates the connection if a client is inactive for more than 2 minutes
474	/// let ping_cfg = PingConfig::new().ping_interval(Duration::from_secs(10)).inactive_limit(Duration::from_secs(60 * 2));
475	/// let builder = ServerConfigBuilder::default().enable_ws_ping(ping_cfg);
476	/// ```
477	pub fn enable_ws_ping(mut self, config: PingConfig) -> Self {
478		self.ping_config = Some(config);
479		self
480	}
481
482	/// Disable WebSocket ping/pong on the server.
483	///
484	/// Default: pings are disabled.
485	pub fn disable_ws_ping(mut self) -> Self {
486		self.ping_config = None;
487		self
488	}
489
490	/// Configure custom `subscription ID` provider for the server to use
491	/// to when getting new subscription calls.
492	///
493	/// You may choose static dispatch or dynamic dispatch because
494	/// `IdProvider` is implemented for `Box<T>`.
495	///
496	/// Default: [`RandomIntegerIdProvider`].
497	///
498	/// # Examples
499	///
500	/// ```rust
501	/// use jsonrpsee_server::{ServerConfigBuilder, RandomStringIdProvider, IdProvider};
502	///
503	/// // static dispatch
504	/// let builder1 = ServerConfigBuilder::default().set_id_provider(RandomStringIdProvider::new(16));
505	///
506	/// // or dynamic dispatch
507	/// let builder2 = ServerConfigBuilder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
508	/// ```
509	///
510	pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
511		self.id_provider = Arc::new(id_provider);
512		self
513	}
514
515	/// Configure `TCP_NODELAY` on the socket to the supplied value `nodelay`.
516	///
517	/// Default is `true`.
518	pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
519		self.tcp_no_delay = no_delay;
520		self
521	}
522
523	/// Build the [`ServerConfig`].
524	pub fn build(self) -> ServerConfig {
525		ServerConfig {
526			max_request_body_size: self.max_request_body_size,
527			max_response_body_size: self.max_response_body_size,
528			max_connections: self.max_connections,
529			max_subscriptions_per_connection: self.max_subscriptions_per_connection,
530			batch_requests_config: self.batch_requests_config,
531			tokio_runtime: self.tokio_runtime,
532			enable_http: self.enable_http,
533			enable_ws: self.enable_ws,
534			message_buffer_capacity: self.message_buffer_capacity,
535			ping_config: self.ping_config,
536			id_provider: self.id_provider,
537			tcp_no_delay: self.tcp_no_delay,
538		}
539	}
540}
541
/// Builder to configure and create a JSON-RPC server.
#[derive(Debug)]
pub struct Builder<HttpMiddleware, RpcMiddleware> {
	/// Server configuration that is handed to the built server.
	server_cfg: ServerConfig,
	/// Middleware that is invoked on every JSON-RPC call.
	rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
	/// Tower layers that are applied to the RPC service.
	http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
549
550impl Default for Builder<Identity, Identity> {
551	fn default() -> Self {
552		Builder {
553			server_cfg: ServerConfig::default(),
554			rpc_middleware: RpcServiceBuilder::new(),
555			http_middleware: tower::ServiceBuilder::new(),
556		}
557	}
558}
559
560impl Builder<Identity, Identity> {
561	/// Create a default server builder.
562	pub fn new() -> Self {
563		Self::default()
564	}
565
566	/// Create a server builder with the given [`ServerConfig`].
567	pub fn with_config(config: ServerConfig) -> Self {
568		Self { server_cfg: config, ..Default::default() }
569	}
570}
571
572impl<RpcMiddleware, HttpMiddleware> TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
573	/// Build a tower service.
574	pub fn build(
575		self,
576		methods: impl Into<Methods>,
577		stop_handle: StopHandle,
578	) -> TowerService<RpcMiddleware, HttpMiddleware> {
579		let conn_id = self.conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
580
581		let rpc_middleware = TowerServiceNoHttp {
582			rpc_middleware: self.rpc_middleware,
583			inner: ServiceData {
584				methods: methods.into(),
585				stop_handle,
586				conn_id,
587				conn_guard: self.conn_guard,
588				server_cfg: self.server_cfg,
589			},
590			on_session_close: None,
591		};
592
593		TowerService { rpc_middleware, http_middleware: self.http_middleware }
594	}
595
596	/// Configure the connection id.
597	///
598	/// This is incremented every time `build` is called.
599	pub fn connection_id(mut self, id: u32) -> Self {
600		self.conn_id = Arc::new(AtomicU32::new(id));
601		self
602	}
603
604	/// Configure the max allowed connections on the server.
605	pub fn max_connections(mut self, limit: u32) -> Self {
606		self.conn_guard = ConnectionGuard::new(limit as usize);
607		self
608	}
609
610	/// Configure rpc middleware.
611	pub fn set_rpc_middleware<T>(self, rpc_middleware: RpcServiceBuilder<T>) -> TowerServiceBuilder<T, HttpMiddleware> {
612		TowerServiceBuilder {
613			server_cfg: self.server_cfg,
614			rpc_middleware,
615			http_middleware: self.http_middleware,
616			conn_id: self.conn_id,
617			conn_guard: self.conn_guard,
618		}
619	}
620
621	/// Configure http middleware.
622	pub fn set_http_middleware<T>(
623		self,
624		http_middleware: tower::ServiceBuilder<T>,
625	) -> TowerServiceBuilder<RpcMiddleware, T> {
626		TowerServiceBuilder {
627			server_cfg: self.server_cfg,
628			rpc_middleware: self.rpc_middleware,
629			http_middleware,
630			conn_id: self.conn_id,
631			conn_guard: self.conn_guard,
632		}
633	}
634}
635
636impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
637	/// Configure the [`ServerConfig`].
638	pub fn set_config(mut self, cfg: ServerConfig) -> Self {
639		self.server_cfg = cfg;
640		self
641	}
642
643	/// Enable middleware that is invoked on every JSON-RPC call.
644	///
645	/// The middleware itself is very similar to the `tower middleware` but
646	/// it has a different service trait which takes &self instead &mut self
647	/// which means that you can't use built-in middleware from tower.
648	///
649	/// Another consequence of `&self` is that you must wrap any of the middleware state in
650	/// a type which is Send and provides interior mutability such `Arc<Mutex>`.
651	///
652	/// The builder itself exposes a similar API as the [`tower::ServiceBuilder`]
653	/// where it is possible to compose layers to the middleware.
654	///
655	/// To add a middleware [`crate::middleware::rpc::RpcServiceBuilder`] exposes a few different layer APIs that
656	/// is wrapped on top of the [`tower::ServiceBuilder`].
657	///
658	/// When the server is started these layers are wrapped in the [`crate::middleware::rpc::RpcService`] and
659	/// that's why the service APIs is not exposed.
660	/// ```
661	///
662	/// use std::{time::Instant, net::SocketAddr, sync::Arc};
663	/// use std::sync::atomic::{Ordering, AtomicUsize};
664	///
665	/// use jsonrpsee_server::middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceT, MethodResponse, Notification, Request, Batch};
666	/// use jsonrpsee_server::ServerBuilder;
667	///
668	/// #[derive(Clone)]
669	/// struct MyMiddleware<S> {
670	///     service: S,
671	///     count: Arc<AtomicUsize>,
672	/// }
673	///
674	/// impl<S> RpcServiceT for MyMiddleware<S>
675	/// where S: RpcServiceT + Send + Sync + Clone + 'static,
676	/// {
677	///    type MethodResponse = S::MethodResponse;
678	///    type BatchResponse = S::BatchResponse;
679	///    type NotificationResponse = S::NotificationResponse;
680	///
681	///    fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
682	///         tracing::info!("MyMiddleware processed call {}", req.method);
683	///         let count = self.count.clone();
684	///         let service = self.service.clone();
685	///
686	///         async move {
687	///             let rp = service.call(req).await;
688	///             // Modify the state.
689	///             count.fetch_add(1, Ordering::Relaxed);
690	///             rp
691	///         }
692	///    }
693	///
694	///    fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
695	///          self.service.batch(batch)
696	///    }
697	///
698	///    fn notification<'a>(&self, notif: Notification<'a>) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
699	///          self.service.notification(notif)
700	///    }
701	///
702	/// }
703	///
704	/// // Create a state per connection
705	/// // NOTE: The service type can be omitted once `start` is called on the server.
706	/// let m = RpcServiceBuilder::new().layer_fn(move |service: ()| MyMiddleware { service, count: Arc::new(AtomicUsize::new(0)) });
707	/// let builder = ServerBuilder::default().set_rpc_middleware(m);
708	/// ```
709	pub fn set_rpc_middleware<T>(self, rpc_middleware: RpcServiceBuilder<T>) -> Builder<HttpMiddleware, T> {
710		Builder { server_cfg: self.server_cfg, rpc_middleware, http_middleware: self.http_middleware }
711	}
712
713	/// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied to the RPC service.
714	///
715	/// Default: No tower layers are applied to the RPC service.
716	///
717	/// # Examples
718	///
719	/// ```rust
720	///
721	/// use std::time::Duration;
722	/// use std::net::SocketAddr;
723	///
724	/// #[tokio::main]
725	/// async fn main() {
726	///     let builder = tower::ServiceBuilder::new().timeout(Duration::from_secs(2));
727	///
728	///     let server = jsonrpsee_server::ServerBuilder::new()
729	///         .set_http_middleware(builder)
730	///         .build("127.0.0.1:0".parse::<SocketAddr>().unwrap())
731	///         .await
732	///         .unwrap();
733	/// }
734	/// ```
735	pub fn set_http_middleware<T>(self, http_middleware: tower::ServiceBuilder<T>) -> Builder<T, RpcMiddleware> {
736		Builder { server_cfg: self.server_cfg, http_middleware, rpc_middleware: self.rpc_middleware }
737	}
738
739	/// Convert the server builder to a [`TowerServiceBuilder`].
740	///
741	/// This can be used to utilize the [`TowerService`] from jsonrpsee.
742	///
743	/// # Examples
744	///
745	/// ```no_run
746	/// use jsonrpsee_server::{Methods, ServerConfig, ServerHandle, ws, stop_channel, serve_with_graceful_shutdown};
747	/// use tower::Service;
748	/// use std::{error::Error as StdError, net::SocketAddr};
749	/// use futures_util::future::{self, Either};
750	/// use hyper_util::rt::{TokioIo, TokioExecutor};
751	///
752	/// fn run_server() -> ServerHandle {
753	///     let (stop_handle, server_handle) = stop_channel();
754	///     let svc_builder = jsonrpsee_server::Server::builder()
755	///         .set_config(ServerConfig::builder().max_connections(33).build())
756	///         .to_service_builder();
757	///     let methods = Methods::new();
758	///     let stop_handle = stop_handle.clone();
759	///
760	///     tokio::spawn(async move {
761	///         let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await.unwrap();
762	///
763	///         loop {
764	///              // The `tokio::select!` macro is used to wait for either of the
765	///              // listeners to accept a new connection or for the server to be
766	///              // stopped.
767	///              let (sock, remote_addr) = tokio::select! {
768	///                  res = listener.accept() => {
769	///                      match res {
770	///                         Ok(sock) => sock,
771	///                         Err(e) => {
772	///                             tracing::error!("failed to accept v4 connection: {:?}", e);
773	///                             continue;
774	///                         }
775	///                       }
776	///                  }
777	///                  _ = stop_handle.clone().shutdown() => break,
778	///              };
779	///
780	///              let stop_handle2 = stop_handle.clone();
781	///              let svc_builder2 = svc_builder.clone();
782	///              let methods2 = methods.clone();
783	///
784	///              let svc = tower::service_fn(move |req| {
785	///                   let stop_handle = stop_handle2.clone();
786	///                   let svc_builder = svc_builder2.clone();
787	///                   let methods = methods2.clone();
788	///
789	///                   let mut svc = svc_builder.build(methods, stop_handle.clone());
790	///
791	///                   // It's not possible to know whether the websocket upgrade handshake failed or not here.
792	///                   let is_websocket = ws::is_upgrade_request(&req);
793	///
794	///                   if is_websocket {
795	///                       println!("websocket")
796	///                   } else {
797	///                       println!("http")
798	///                   }
799	///
800	///                   // Call the jsonrpsee service which
801	///                   // may upgrade it to a WebSocket connection
802	///                   // or treat it as "ordinary HTTP request".
803	///                   async move { svc.call(req).await }
804	///               });
805	///
806	///               // Upgrade the connection to a HTTP service with graceful shutdown.
807	///               tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown()));
808	///          }
809	///     });
810	///
811	///     server_handle
812	/// }
813	/// ```
814	pub fn to_service_builder(self) -> TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
815		let max_conns = self.server_cfg.max_connections as usize;
816
817		TowerServiceBuilder {
818			server_cfg: self.server_cfg,
819			rpc_middleware: self.rpc_middleware,
820			http_middleware: self.http_middleware,
821			conn_id: Arc::new(AtomicU32::new(0)),
822			conn_guard: ConnectionGuard::new(max_conns),
823		}
824	}
825
826	/// Finalize the configuration of the server. Consumes the [`Builder`].
827	///
828	/// ```rust
829	/// #[tokio::main]
830	/// async fn main() {
831	///   let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
832	///   let occupied_addr = listener.local_addr().unwrap();
833	///   let addrs: &[std::net::SocketAddr] = &[
834	///       occupied_addr,
835	///       "127.0.0.1:0".parse().unwrap(),
836	///   ];
837	///   assert!(jsonrpsee_server::ServerBuilder::default().build(occupied_addr).await.is_err());
838	///   assert!(jsonrpsee_server::ServerBuilder::default().build(addrs).await.is_ok());
839	/// }
840	/// ```
841	///
842	pub async fn build(self, addrs: impl ToSocketAddrs) -> std::io::Result<Server<HttpMiddleware, RpcMiddleware>> {
843		let listener = TcpListener::bind(addrs).await?;
844
845		Ok(Server {
846			listener,
847			server_cfg: self.server_cfg,
848			rpc_middleware: self.rpc_middleware,
849			http_middleware: self.http_middleware,
850		})
851	}
852
853	/// Finalizes the configuration of the server with customized TCP settings on the socket.
854	///
855	///
856	/// ```rust
857	/// use jsonrpsee_server::Server;
858	/// use socket2::{Domain, Socket, Type};
859	/// use std::time::Duration;
860	///
861	/// #[tokio::main]
862	/// async fn main() {
863	///   let addr = "127.0.0.1:0".parse().unwrap();
864	///   let domain = Domain::for_address(addr);
865	///   let socket = Socket::new(domain, Type::STREAM, None).unwrap();
866	///   socket.set_nonblocking(true).unwrap();
867	///
868	///   let address = addr.into();
869	///   socket.bind(&address).unwrap();
870	///
871	///   socket.listen(4096).unwrap();
872	///
873	///   let server = Server::builder().build_from_tcp(socket).unwrap();
874	/// }
875	/// ```
876	pub fn build_from_tcp(
877		self,
878		listener: impl Into<StdTcpListener>,
879	) -> std::io::Result<Server<HttpMiddleware, RpcMiddleware>> {
880		let listener = TcpListener::from_std(listener.into())?;
881
882		Ok(Server {
883			listener,
884			server_cfg: self.server_cfg,
885			rpc_middleware: self.rpc_middleware,
886			http_middleware: self.http_middleware,
887		})
888	}
889}
890
/// Data required by the server to handle requests.
///
/// One value is assembled per accepted connection in `process_connection` and
/// stored inside [`TowerServiceNoHttp`], which reads it on every request.
#[derive(Debug, Clone)]
struct ServiceData {
	/// Registered server methods, cloned into every `RpcService` that is created.
	methods: Methods,
	/// Stop handle, cloned into the `ConnectionState` created for each request.
	stop_handle: StopHandle,
	/// Connection ID, attached to each request as a `ConnectionId` extension.
	conn_id: u32,
	/// Connection guard from which a permit is acquired for every incoming request.
	conn_guard: ConnectionGuard,
	/// Server configuration (transport toggles, size limits, batch settings, ...).
	server_cfg: ServerConfig,
}
905
/// jsonrpsee tower service
///
/// This will enable both `http_middleware` and `rpc_middleware`
/// that may be enabled by [`Builder`] or [`TowerServiceBuilder`].
#[derive(Debug, Clone)]
pub struct TowerService<RpcMiddleware, HttpMiddleware> {
	/// The inner JSON-RPC service (without HTTP middleware); it is cloned and
	/// wrapped by `http_middleware` on every call.
	rpc_middleware: TowerServiceNoHttp<RpcMiddleware>,
	/// HTTP middleware stack that is layered on top of the inner service.
	http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
915
916impl<RpcMiddleware, HttpMiddleware> TowerService<RpcMiddleware, HttpMiddleware> {
917	/// A future that returns when the connection has been closed.
918	///
919	/// This method must be called before every [`TowerService::call`]
920	/// because the `SessionClosedFuture` may already been consumed or
921	/// not used.
922	pub fn on_session_closed(&mut self) -> SessionClosedFuture {
923		if let Some(n) = self.rpc_middleware.on_session_close.as_mut() {
924			// If it's called more then once another listener is created.
925			n.closed()
926		} else {
927			let (session_close, fut) = session_close();
928			self.rpc_middleware.on_session_close = Some(session_close);
929			fut
930		}
931	}
932}
933
934impl<RequestBody, ResponseBody, RpcMiddleware, HttpMiddleware> Service<HttpRequest<RequestBody>> for TowerService<RpcMiddleware, HttpMiddleware>
935where
936	RpcMiddleware: tower::Layer<RpcService> + Clone,
937	<RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT + Send + Sync + 'static,
938	HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
939	<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service:
940		Send + Service<HttpRequest<RequestBody>, Response = HttpResponse<ResponseBody>, Error = Box<(dyn StdError + Send + Sync + 'static)>>,
941	<<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<HttpRequest<RequestBody>>>::Future:
942		Send + 'static,
943	RequestBody: http_body::Body<Data = Bytes> + Send + 'static,
944	RequestBody::Error: Into<BoxError>,
945{
946	type Response = HttpResponse<ResponseBody>;
947	type Error = BoxError;
948	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
949
950	fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
951		Poll::Ready(Ok(()))
952	}
953
954	fn call(&mut self, request: HttpRequest<RequestBody>) -> Self::Future {
955		Box::pin(self.http_middleware.service(self.rpc_middleware.clone()).call(request))
956	}
957}
958
/// jsonrpsee tower service without HTTP specific middleware.
///
/// # Note
/// This is similar to [`hyper::service::service_fn`].
#[derive(Debug, Clone)]
pub struct TowerServiceNoHttp<L> {
	/// Connection data (methods, stop handle, connection id/guard and config).
	inner: ServiceData,
	/// RPC middleware that wraps the `RpcService` created for each request.
	rpc_middleware: RpcServiceBuilder<L>,
	/// Optional session-close notifier; it is taken (left as `None`) by `call`.
	on_session_close: Option<SessionClose>,
}
969
// Tower `Service` implementation that answers a single HTTP request either by
// upgrading it to a WebSocket connection or by executing it as a plain HTTP
// JSON-RPC call, depending on the request and the server configuration.
impl<Body, RpcMiddleware> Service<HttpRequest<Body>> for TowerServiceNoHttp<RpcMiddleware>
where
	RpcMiddleware: tower::Layer<RpcService>,
	<RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT<
			MethodResponse = MethodResponse,
			BatchResponse = MethodResponse,
			NotificationResponse = MethodResponse,
		> + Send
		+ Sync
		+ 'static,
	Body: http_body::Body<Data = Bytes> + Send + 'static,
	Body::Error: Into<BoxError>,
{
	type Response = HttpResponse;

	// The following associated type is required by the `impl<B, U, M: JsonRpcMiddleware> Server<B, L>` bounds.
	// It satisfies the server's bounds when the `tower::ServiceBuilder<B>` is not set (ie `B: Identity`).
	type Error = BoxError;

	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

	/// Always reports readiness; the connection limit is enforced inside `call`.
	fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	/// Handles one HTTP request.
	///
	/// The flow is:
	/// 1. Try to acquire a connection permit; without one the
	///    `too_many_requests` response is returned immediately.
	/// 2. Attach the connection guard and connection id to the request extensions.
	/// 3. If WebSocket is enabled and the request is an upgrade request, perform
	///    the handshake and spawn the WebSocket background task.
	/// 4. Otherwise, if HTTP is enabled and the request is not an upgrade request,
	///    execute it as a plain HTTP JSON-RPC call.
	/// 5. In every other case the `denied` response is returned.
	fn call(&mut self, request: HttpRequest<Body>) -> Self::Future {
		// Wrap the caller's body type into the crate's own `HttpBody`.
		let mut request = request.map(HttpBody::new);

		let conn_guard = &self.inner.conn_guard;
		let stop_handle = self.inner.stop_handle.clone();
		let conn_id = self.inner.conn_id;
		// The session-close notifier is moved out here; the field is `None` afterwards.
		let on_session_close = self.on_session_close.take();

		tracing::trace!(target: LOG_TARGET, "{:?}", request);

		// No free permit means the connection limit is reached: reject right away.
		let Some(conn_permit) = conn_guard.try_acquire() else {
			return async move { Ok(http::response::too_many_requests()) }.boxed();
		};

		// `conn` owns the permit, so the slot stays occupied for as long as `conn` lives.
		let conn = ConnectionState::new(stop_handle.clone(), conn_id, conn_permit);

		let max_conns = conn_guard.max_connections();
		let curr_conns = max_conns - conn_guard.available_connections();
		tracing::debug!(target: LOG_TARGET, "Accepting new connection {}/{}", curr_conns, max_conns);

		// Make the guard and the connection id available through the request extensions.
		let req_ext = request.extensions_mut();
		req_ext.insert::<ConnectionGuard>(conn_guard.clone());
		req_ext.insert::<ConnectionId>(conn.conn_id.into());

		let is_upgrade_request = is_upgrade_request(&request);

		if self.inner.server_cfg.enable_ws && is_upgrade_request {
			let this = self.inner.clone();

			let mut server = soketto::handshake::http::Server::new();

			// Let soketto inspect the request; on success it yields the handshake response.
			let response = match server.receive_request(&request) {
				Ok(response) => {
					// Channel carrying outgoing messages from the RPC layer to the WebSocket task.
					let (tx, rx) = mpsc::channel(this.server_cfg.message_buffer_capacity as usize);
					let sink = MethodSink::new(tx);

					// On each method call the `pending_calls` is cloned
					// then when all pending_calls are dropped
					// a graceful shutdown can occur.
					let (pending_calls, pending_calls_completed) = mpsc::channel::<()>(1);

					let cfg = RpcServiceCfg::CallsAndSubscriptions {
						bounded_subscriptions: BoundedSubscriptions::new(
							this.server_cfg.max_subscriptions_per_connection,
						),
						id_provider: this.server_cfg.id_provider.clone(),
						sink: sink.clone(),
						_pending_calls: pending_calls,
					};

					let rpc_service = RpcService::new(
						this.methods.clone(),
						this.server_cfg.max_response_body_size as usize,
						this.conn_id.into(),
						cfg,
					);

					let rpc_service = self.rpc_middleware.service(rpc_service);

					// The upgrade is awaited on a separate task; the handshake response
					// below is returned from `call` without waiting for it.
					tokio::spawn(
						async move {
							// Keep a copy of the extensions: `request` is consumed by `upgrade::on`.
							let extensions = request.extensions().clone();

							let upgraded = match hyper::upgrade::on(request).await {
								Ok(u) => u,
								Err(e) => {
									tracing::debug!(target: LOG_TARGET, "Could not upgrade connection: {}", e);
									return;
								}
							};

							let io = hyper_util::rt::TokioIo::new(upgraded);

							let stream = BufReader::new(BufWriter::new(io.compat()));
							let mut ws_builder = server.into_builder(stream);
							// Incoming WebSocket messages are capped at the configured max request body size.
							ws_builder.set_max_message_size(this.server_cfg.max_request_body_size as usize);
							let (sender, receiver) = ws_builder.finish();

							// `conn` (and with it the connection permit) moves into the background task.
							let params = BackgroundTaskParams {
								server_cfg: this.server_cfg,
								conn,
								ws_sender: sender,
								ws_receiver: receiver,
								rpc_service,
								sink,
								rx,
								pending_calls_completed,
								on_session_close,
								extensions,
							};

							ws::background_task(params).await;
						}
						.in_current_span(),
					);

					response.map(|()| HttpBody::empty())
				}
				Err(e) => {
					// NOTE(review): this response is built with `HttpResponse::new`, so no
					// explicit status code is set for the failed handshake.
					tracing::debug!(target: LOG_TARGET, "Could not upgrade connection: {}", e);
					HttpResponse::new(HttpBody::from(format!("Could not upgrade connection: {e}")))
				}
			};

			async { Ok(response) }.boxed()
		} else if self.inner.server_cfg.enable_http && !is_upgrade_request {
			let this = &self.inner;
			let max_response_size = this.server_cfg.max_response_body_size;
			let max_request_size = this.server_cfg.max_request_body_size;
			let methods = this.methods.clone();
			let batch_config = this.server_cfg.batch_requests_config;

			// Plain HTTP only supports calls (`OnlyCalls`).
			let rpc_service = self.rpc_middleware.service(RpcService::new(
				methods,
				max_response_size as usize,
				this.conn_id.into(),
				RpcServiceCfg::OnlyCalls,
			));

			Box::pin(async move {
				let rp = http::call_with_service(request, batch_config, max_request_size, rpc_service).await;
				// NOTE: The `conn guard` must be held until the response is processed
				// to respect the `max_connections` limit.
				drop(conn);
				Ok(rp)
			})
		} else {
			// NOTE: the `conn guard` is dropped when this function returns, which is fine
			// because this branch doesn't rely on any async operations.
			Box::pin(async { Ok(http::response::denied()) })
		}
	}
}
1128
/// Everything `process_connection` needs to serve one accepted TCP connection.
struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
	/// HTTP middleware stack used to wrap the per-connection service.
	http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
	/// RPC middleware handed to the per-connection `TowerServiceNoHttp`.
	rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
	/// Connection guard; cloned into the per-connection service data.
	conn_guard: &'a ConnectionGuard,
	/// Identifier of this connection.
	conn_id: u32,
	/// Server configuration.
	server_cfg: ServerConfig,
	/// Handle used to learn that the server is shutting down.
	stop_handle: StopHandle,
	/// The accepted TCP stream.
	socket: TcpStream,
	/// Sender that is dropped when the spawned connection task finishes.
	// NOTE(review): presumably the receiving side uses this to detect that
	// all connection tasks have terminated; confirm against the accept loop.
	drop_on_completion: mpsc::Sender<()>,
	/// Address of the remote peer; recorded on the `connection` tracing span.
	remote_addr: SocketAddr,
	/// Registered server methods.
	methods: Methods,
}
1141
/// Serves a single accepted TCP connection on a newly spawned tokio task.
///
/// Steps:
/// 1. Apply the configured `tcp_no_delay` setting to the socket; if that fails a
///    warning is logged and the function returns without serving the connection.
/// 2. Build the per-connection [`TowerServiceNoHttp`] and wrap it in the HTTP middleware.
/// 3. Spawn a task that drives the hyper connection (with upgrade support) until it
///    either completes or the stop handle signals shutdown; on shutdown a graceful
///    shutdown is requested and the connection is polled to completion.
///
/// `drop_on_completion` is dropped at the very end of the spawned task.
#[instrument(name = "connection", skip_all, fields(remote_addr = %params.remote_addr, conn_id = %params.conn_id), level = "INFO")]
fn process_connection<'a, RpcMiddleware, HttpMiddleware, Body>(params: ProcessConnection<HttpMiddleware, RpcMiddleware>)
where
	RpcMiddleware: 'static,
	HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
	<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service:
		Send + 'static + Clone + Service<HttpRequest, Response = HttpResponse<Body>, Error = BoxError>,
	<<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<HttpRequest>>::Future:
		Send + 'static,
	Body: http_body::Body<Data = Bytes> + Send + 'static,
	<Body as http_body::Body>::Error: Into<BoxError>,
	<Body as http_body::Body>::Data: Send,
{
	// `remote_addr` is only needed by the tracing span above, hence the `..`.
	let ProcessConnection {
		http_middleware,
		rpc_middleware,
		conn_guard,
		conn_id,
		server_cfg,
		socket,
		stop_handle,
		drop_on_completion,
		methods,
		..
	} = params;

	// Failing to configure the socket aborts the connection before any request is served.
	if let Err(e) = socket.set_nodelay(server_cfg.tcp_no_delay) {
		tracing::warn!(target: LOG_TARGET, "Could not set NODELAY on socket: {:?}", e);
		return;
	}

	// A new session-close notifier slot starts out empty for every connection.
	let tower_service = TowerServiceNoHttp {
		inner: ServiceData {
			server_cfg,
			methods,
			stop_handle: stop_handle.clone(),
			conn_id,
			conn_guard: conn_guard.clone(),
		},
		rpc_middleware,
		on_session_close: None,
	};

	let service = http_middleware.service(tower_service);

	tokio::spawn(async {
		// this requires Clone.
		let service = crate::utils::TowerToHyperService::new(service);
		let io = TokioIo::new(socket);
		let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());

		let conn = builder.serve_connection_with_upgrades(io, service);
		let stopped = stop_handle.shutdown();

		tokio::pin!(stopped, conn);

		// Race the connection against the shutdown signal.
		let res = match future::select(conn, stopped).await {
			// The connection finished on its own.
			Either::Left((conn, _)) => conn,
			// Shutdown was requested while the connection was still active.
			Either::Right((_, mut conn)) => {
				// NOTE: the connection should continue to be polled until shutdown can finish.
				// Thus, both lines below are needed and not a nit.
				conn.as_mut().graceful_shutdown();
				conn.await
			}
		};

		if let Err(e) = res {
			tracing::debug!(target: LOG_TARGET, "HTTP serve connection failed {:?}", e);
		}
		// Explicitly release the completion sender once the connection is fully done.
		drop(drop_on_completion)
	});
}
1214
/// Outcome of a single attempt to accept a connection, see `try_accept_conn`.
///
/// `S` is the "stopped" future; it is handed back whenever it did not
/// complete so that it can be reused for the next attempt.
enum AcceptConnection<S> {
	/// The stop future completed before a connection was accepted.
	Shutdown,
	/// A connection was accepted.
	Established { socket: TcpStream, remote_addr: SocketAddr, stop: S },
	/// Accepting failed with an I/O error.
	Err((std::io::Error, S)),
}
1220
1221async fn try_accept_conn<S>(listener: &TcpListener, stopped: S) -> AcceptConnection<S>
1222where
1223	S: Future + Unpin,
1224{
1225	let accept = listener.accept();
1226	tokio::pin!(accept);
1227
1228	match futures_util::future::select(accept, stopped).await {
1229		Either::Left((res, stop)) => match res {
1230			Ok((socket, remote_addr)) => AcceptConnection::Established { socket, remote_addr, stop },
1231			Err(e) => AcceptConnection::Err((e, stop)),
1232		},
1233		Either::Right(_) => AcceptConnection::Shutdown,
1234	}
1235}
1236
1237pub(crate) async fn handle_rpc_call<S>(
1238	body: &[u8],
1239	is_single: bool,
1240	batch_config: BatchRequestConfig,
1241	rpc_service: &S,
1242	extensions: Extensions,
1243) -> MethodResponse
1244where
1245	S: RpcServiceT<
1246			MethodResponse = MethodResponse,
1247			BatchResponse = MethodResponse,
1248			NotificationResponse = MethodResponse,
1249		> + Send,
1250{
1251	// Single request or notification
1252	if is_single {
1253		if let Ok(req) = deserialize_with_ext::call::from_slice(body, &extensions) {
1254			rpc_service.call(req).await
1255		} else if let Ok(notif) = deserialize_with_ext::notif::from_slice::<Notif>(body, &extensions) {
1256			rpc_service.notification(notif).await
1257		} else {
1258			let (id, code) = prepare_error(body);
1259			MethodResponse::error(id, ErrorObject::from(code))
1260		}
1261	}
1262	// Batch of requests.
1263	else {
1264		let max_len = match batch_config {
1265			BatchRequestConfig::Disabled => {
1266				let rp = MethodResponse::error(
1267					Id::Null,
1268					ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, None),
1269				);
1270				return rp;
1271			}
1272			BatchRequestConfig::Limit(limit) => limit as usize,
1273			BatchRequestConfig::Unlimited => usize::MAX,
1274		};
1275
1276		if let Ok(unchecked_batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(body) {
1277			if unchecked_batch.len() > max_len {
1278				return MethodResponse::error(Id::Null, reject_too_big_batch_request(max_len));
1279			}
1280
1281			let mut batch = Vec::with_capacity(unchecked_batch.len());
1282
1283			for call in unchecked_batch {
1284				if let Ok(req) = deserialize_with_ext::call::from_str(call.get(), &extensions) {
1285					batch.push(Ok(BatchEntry::Call(req)));
1286				} else if let Ok(notif) = deserialize_with_ext::notif::from_str::<Notif>(call.get(), &extensions) {
1287					batch.push(Ok(BatchEntry::Notification(notif)));
1288				} else {
1289					let id = match serde_json::from_str::<jsonrpsee_types::InvalidRequest>(call.get()) {
1290						Ok(err) => err.id,
1291						Err(_) => Id::Null,
1292					};
1293
1294					batch.push(Err(BatchEntryErr::new(id, ErrorCode::InvalidRequest.into())));
1295				}
1296			}
1297
1298			rpc_service.batch(Batch::from(batch)).await
1299		} else {
1300			MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError))
1301		}
1302	}
1303}