jsonrpsee_http_server/
server.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::future::Future;
28use std::net::{SocketAddr, TcpListener as StdTcpListener};
29use std::pin::Pin;
30use std::task::{Context, Poll};
31
32use crate::response;
33use crate::response::{internal_error, malformed};
34use futures_channel::mpsc;
35use futures_util::future::FutureExt;
36use futures_util::stream::{StreamExt, TryStreamExt};
37use hyper::header::{HeaderMap, HeaderValue};
38use hyper::server::conn::AddrStream;
39use hyper::server::{conn::AddrIncoming, Builder as HyperBuilder};
40use hyper::service::{make_service_fn, service_fn};
41use hyper::{Error as HyperError, Method};
42use jsonrpsee_core::error::{Error, GenericTransportError};
43use jsonrpsee_core::http_helpers::{self, read_body};
44use jsonrpsee_core::middleware::{self, HttpMiddleware as Middleware};
45use jsonrpsee_core::server::access_control::AccessControl;
46use jsonrpsee_core::server::helpers::{prepare_error, MethodResponse};
47use jsonrpsee_core::server::helpers::{BatchResponse, BatchResponseBuilder};
48use jsonrpsee_core::server::resource_limiting::Resources;
49use jsonrpsee_core::server::rpc_module::{MethodKind, Methods};
50use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing};
51use jsonrpsee_core::TEN_MB_SIZE_BYTES;
52use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
53use jsonrpsee_types::{Id, Notification, Params, Request};
54use serde_json::value::RawValue;
55use tokio::net::{TcpListener, ToSocketAddrs};
56use tracing_futures::Instrument;
57
/// A [`Notification`] whose generic payload is an optional, borrowed [`RawValue`].
type Notif<'a> = Notification<'a, Option<&'a RawValue>>;
59
/// Builder to create JSON-RPC HTTP server.
///
/// The type parameter `M` is the middleware type; it is `()` until replaced with
/// [`Builder::set_middleware`].
#[derive(Debug)]
pub struct Builder<M = ()> {
	/// Access control based on HTTP headers.
	access_control: AccessControl,
	/// Resource table; new kinds are added through [`Builder::register_resource`].
	resources: Resources,
	/// Maximum size of a request body in bytes.
	max_request_body_size: u32,
	/// Maximum size of a response body in bytes.
	max_response_body_size: u32,
	/// Whether batch requests are supported.
	batch_requests_supported: bool,
	/// Custom tokio runtime to run the server on.
	tokio_runtime: Option<tokio::runtime::Handle>,
	/// Middleware instance handed over to the server.
	middleware: M,
	/// Max length for logging of requests and responses.
	max_log_length: u32,
	/// Optional health endpoint, configured through [`Builder::health_api`].
	health_api: Option<HealthApi>,
}
75
76impl Default for Builder {
77	fn default() -> Self {
78		Self {
79			access_control: AccessControl::default(),
80			max_request_body_size: TEN_MB_SIZE_BYTES,
81			max_response_body_size: TEN_MB_SIZE_BYTES,
82			batch_requests_supported: true,
83			resources: Resources::default(),
84			tokio_runtime: None,
85			middleware: (),
86			max_log_length: 4096,
87			health_api: None,
88		}
89	}
90}
91
92impl Builder {
93	/// Create a default server builder.
94	pub fn new() -> Self {
95		Self::default()
96	}
97}
98
99impl<M> Builder<M> {
100	/// Add a middleware to the builder [`Middleware`](../jsonrpsee_core/middleware/trait.Middleware.html).
101	///
102	/// ```
103	/// use std::{time::Instant, net::SocketAddr};
104	///
105	/// use jsonrpsee_core::middleware::{HttpMiddleware, Headers, MethodKind, Params};
106	/// use jsonrpsee_http_server::HttpServerBuilder;
107	///
108	/// #[derive(Clone)]
109	/// struct MyMiddleware;
110	///
111	/// impl HttpMiddleware for MyMiddleware {
112	///     type Instant = Instant;
113	///
114	///     // Called once the HTTP request is received, it may be a single JSON-RPC call
115	///     // or batch.
116	///     fn on_request(&self, _remote_addr: SocketAddr, _headers: &Headers) -> Instant {
117	///         Instant::now()
118	///     }
119	///
120	///     // Called once a single JSON-RPC method call is processed, it may be called multiple times
121	///     // on batches.
122	///     fn on_call(&self, method_name: &str, params: Params, kind: MethodKind) {
123	///         println!("Call to method: '{}' params: {:?}, kind: {}", method_name, params, kind);
124	///     }
125	///
126	///     // Called once a single JSON-RPC call is completed, it may be called multiple times
127	///     // on batches.
128	///     fn on_result(&self, method_name: &str, success: bool, started_at: Instant) {
129	///         println!("Call to '{}' took {:?}", method_name, started_at.elapsed());
130	///     }
131	///
132	///     // Called the entire JSON-RPC is completed, called on once for both single calls or batches.
133	///     fn on_response(&self, result: &str, started_at: Instant) {
134	///         println!("complete JSON-RPC response: {}, took: {:?}", result, started_at.elapsed());
135	///     }
136	/// }
137	///
138	/// let builder = HttpServerBuilder::new().set_middleware(MyMiddleware);
139	/// ```
140	pub fn set_middleware<T: Middleware>(self, middleware: T) -> Builder<T> {
141		Builder {
142			access_control: self.access_control,
143			max_request_body_size: self.max_request_body_size,
144			max_response_body_size: self.max_response_body_size,
145			batch_requests_supported: self.batch_requests_supported,
146			resources: self.resources,
147			tokio_runtime: self.tokio_runtime,
148			middleware,
149			max_log_length: self.max_log_length,
150			health_api: self.health_api,
151		}
152	}
153
154	/// Sets the maximum size of a request body in bytes (default is 10 MiB).
155	pub fn max_request_body_size(mut self, size: u32) -> Self {
156		self.max_request_body_size = size;
157		self
158	}
159
160	/// Sets the maximum size of a response body in bytes (default is 10 MiB).
161	pub fn max_response_body_size(mut self, size: u32) -> Self {
162		self.max_response_body_size = size;
163		self
164	}
165
166	/// Sets access control settings.
167	pub fn set_access_control(mut self, acl: AccessControl) -> Self {
168		self.access_control = acl;
169		self
170	}
171
172	/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
173	/// By default, support is enabled.
174	pub fn batch_requests_supported(mut self, supported: bool) -> Self {
175		self.batch_requests_supported = supported;
176		self
177	}
178
179	/// Register a new resource kind. Errors if `label` is already registered, or if the number of
180	/// registered resources on this server instance would exceed 8.
181	///
182	/// See the module documentation for [`resource_limiting`](../jsonrpsee_utils/server/resource_limiting/index.html#resource-limiting)
183	/// for details.
184	pub fn register_resource(mut self, label: &'static str, capacity: u16, default: u16) -> Result<Self, Error> {
185		self.resources.register(label, capacity, default)?;
186
187		Ok(self)
188	}
189
190	/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
191	///
192	/// Default: [`tokio::spawn`]
193	pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
194		self.tokio_runtime = Some(rt);
195		self
196	}
197
198	/// Enable health endpoint.
199	/// Allows you to expose one of the methods under GET /<path> The method will be invoked with no parameters.
200	/// Error returned from the method will be converted to status 500 response.
201	/// Expects a tuple with (</path>, <rpc-method-name>).
202	///
203	/// Fails if the path is missing `/`.
204	pub fn health_api(mut self, path: impl Into<String>, method: impl Into<String>) -> Result<Self, Error> {
205		let path = path.into();
206
207		if !path.starts_with('/') {
208			return Err(Error::Custom(format!("Health endpoint path must start with `/` to work, got: {}", path)));
209		}
210
211		self.health_api = Some(HealthApi { path, method: method.into() });
212		Ok(self)
213	}
214
215	/// Finalizes the configuration of the server with customized TCP settings on the socket and on hyper.
216	///
217	/// ```rust
218	/// use jsonrpsee_http_server::HttpServerBuilder;
219	/// use socket2::{Domain, Socket, Type};
220	/// use std::net::TcpListener;
221	///
222	/// #[tokio::main]
223	/// async fn main() {
224	///   let addr = "127.0.0.1:0".parse().unwrap();
225	///   let domain = Domain::for_address(addr);
226	///   let socket = Socket::new(domain, Type::STREAM, None).unwrap();
227	///   socket.set_nonblocking(true).unwrap();
228	///
229	///   let address = addr.into();
230	///   socket.bind(&address).unwrap();
231	///   socket.listen(4096).unwrap();
232	///
233	///   let listener: TcpListener = socket.into();
234	///   let local_addr = listener.local_addr().ok();
235	///
236	///   // hyper does some settings on the provided socket, ensure that nothing breaks our "expected settings".
237	///
238	///   let listener = hyper::Server::from_tcp(listener)
239	///     .unwrap()
240	///     .tcp_sleep_on_accept_errors(true)
241	///     .tcp_keepalive(None)
242	///     .tcp_nodelay(true);
243	///
244	///   let server = HttpServerBuilder::new().build_from_hyper(listener, addr).unwrap();
245	/// }
246	/// ```
247	pub fn build_from_hyper(
248		self,
249		listener: hyper::server::Builder<AddrIncoming>,
250		local_addr: SocketAddr,
251	) -> Result<Server<M>, Error> {
252		Ok(Server {
253			access_control: self.access_control,
254			listener,
255			local_addr: Some(local_addr),
256			max_request_body_size: self.max_request_body_size,
257			max_response_body_size: self.max_response_body_size,
258			batch_requests_supported: self.batch_requests_supported,
259			resources: self.resources,
260			tokio_runtime: self.tokio_runtime,
261			middleware: self.middleware,
262			max_log_length: self.max_log_length,
263			health_api: self.health_api,
264		})
265	}
266
267	/// Finalizes the configuration of the server with customized TCP settings on the socket.
268	/// Note, that [`hyper`] might overwrite some of the TCP settings on the socket
269	/// if you want full-control of socket settings use [`Builder::build_from_hyper`] instead.
270	///
271	/// ```rust
272	/// use jsonrpsee_http_server::HttpServerBuilder;
273	/// use socket2::{Domain, Socket, Type};
274	/// use std::time::Duration;
275	///
276	/// #[tokio::main]
277	/// async fn main() {
278	///   let addr = "127.0.0.1:0".parse().unwrap();
279	///   let domain = Domain::for_address(addr);
280	///   let socket = Socket::new(domain, Type::STREAM, None).unwrap();
281	///   socket.set_nonblocking(true).unwrap();
282	///
283	///   let address = addr.into();
284	///   socket.bind(&address).unwrap();
285	///
286	///   socket.listen(4096).unwrap();
287	///
288	///   let server = HttpServerBuilder::new().build_from_tcp(socket).unwrap();
289	/// }
290	/// ```
291	pub fn build_from_tcp(self, listener: impl Into<StdTcpListener>) -> Result<Server<M>, Error> {
292		let listener = listener.into();
293		let local_addr = listener.local_addr().ok();
294
295		let listener = hyper::Server::from_tcp(listener)?;
296
297		Ok(Server {
298			listener,
299			local_addr,
300			access_control: self.access_control,
301			max_request_body_size: self.max_request_body_size,
302			max_response_body_size: self.max_response_body_size,
303			batch_requests_supported: self.batch_requests_supported,
304			resources: self.resources,
305			tokio_runtime: self.tokio_runtime,
306			middleware: self.middleware,
307			max_log_length: self.max_log_length,
308			health_api: self.health_api,
309		})
310	}
311
312	/// Finalizes the configuration of the server.
313	///
314	/// ```rust
315	/// #[tokio::main]
316	/// async fn main() {
317	///   let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
318	///   let occupied_addr = listener.local_addr().unwrap();
319	///   let addrs: &[std::net::SocketAddr] = &[
320	///       occupied_addr,
321	///       "127.0.0.1:0".parse().unwrap(),
322	///   ];
323	///   assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(occupied_addr).await.is_err());
324	///   assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(addrs).await.is_ok());
325	/// }
326	/// ```
327	pub async fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<M>, Error> {
328		let listener = TcpListener::bind(addrs).await?.into_std()?;
329
330		let local_addr = listener.local_addr().ok();
331		let listener = hyper::Server::from_tcp(listener)?.tcp_nodelay(true);
332
333		Ok(Server {
334			listener,
335			local_addr,
336			access_control: self.access_control,
337			max_request_body_size: self.max_request_body_size,
338			max_response_body_size: self.max_response_body_size,
339			batch_requests_supported: self.batch_requests_supported,
340			resources: self.resources,
341			tokio_runtime: self.tokio_runtime,
342			middleware: self.middleware,
343			max_log_length: self.max_log_length,
344			health_api: self.health_api,
345		})
346	}
347}
348
/// Configuration of the health endpoint: which RPC method is exposed under which `GET` path.
#[derive(Debug, Clone)]
struct HealthApi {
	/// URL path of the endpoint; compared against the request URI path.
	path: String,
	/// Name of the RPC method that is invoked when the endpoint is hit.
	method: String,
}
354
/// Handle used to run or stop the server.
///
/// Awaiting the handle (it implements [`Future`]) waits for the spawned server task to finish.
#[derive(Debug)]
pub struct ServerHandle {
	/// Sending half of the channel whose receiver drives the server's graceful shutdown.
	stop_sender: mpsc::Sender<()>,
	/// Join handle of the spawned server task; `None` once it has been taken out.
	pub(crate) handle: Option<tokio::task::JoinHandle<()>>,
}
361
362impl ServerHandle {
363	/// Requests server to stop. Returns an error if server was already stopped.
364	pub fn stop(mut self) -> Result<tokio::task::JoinHandle<()>, Error> {
365		let stop = self.stop_sender.try_send(()).map(|_| self.handle.take());
366		match stop {
367			Ok(Some(handle)) => Ok(handle),
368			_ => Err(Error::AlreadyStopped),
369		}
370	}
371}
372
373impl Future for ServerHandle {
374	type Output = ();
375
376	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377		let handle = match &mut self.handle {
378			Some(handle) => handle,
379			None => return Poll::Ready(()),
380		};
381
382		handle.poll_unpin(cx).map(|_| ())
383	}
384}
385
/// An HTTP JSON RPC server.
///
/// Created by one of the `build*` methods on [`Builder`] and consumed by [`Server::start`].
#[derive(Debug)]
pub struct Server<M = ()> {
	/// Hyper server.
	listener: HyperBuilder<AddrIncoming>,
	/// Local address
	local_addr: Option<SocketAddr>,
	/// Max request body size.
	max_request_body_size: u32,
	/// Max response body size.
	max_response_body_size: u32,
	/// Max length for logging for request and response
	///
	/// Logs bigger than this limit will be truncated.
	max_log_length: u32,
	/// Whether batch requests are supported by this server or not.
	batch_requests_supported: bool,
	/// Access control.
	access_control: AccessControl,
	/// Tracker for currently used resources on the server.
	resources: Resources,
	/// Custom tokio runtime to run the server on.
	tokio_runtime: Option<tokio::runtime::Handle>,
	/// Middleware that is notified about requests, calls, results and responses.
	middleware: M,
	/// Optional health endpoint served on `GET` requests.
	health_api: Option<HealthApi>,
}
412
413impl<M: Middleware> Server<M> {
414	/// Returns socket address to which the server is bound.
415	pub fn local_addr(&self) -> Result<SocketAddr, Error> {
416		self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
417	}
418
	/// Start the server.
	///
	/// Consumes the server, wires `methods` into a hyper service and spawns that service on the
	/// configured tokio runtime (or on the current one when none was configured). The returned
	/// [`ServerHandle`] can be awaited or used to stop the server.
	///
	/// # Errors
	///
	/// Returns an error if initializing the resources of `methods` fails.
	///
	/// NOTE(review): `tokio::runtime::Handle::current()` is used as the fallback runtime; per the
	/// tokio documentation it panics when called outside of a runtime — confirm for your caller.
	pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
		let max_request_body_size = self.max_request_body_size;
		let max_response_body_size = self.max_response_body_size;
		let max_log_length = self.max_log_length;
		let acl = self.access_control;
		// `tx` ends up in the `ServerHandle`, `rx` drives the graceful shutdown below.
		let (tx, mut rx) = mpsc::channel(1);
		let listener = self.listener;
		let resources = self.resources;
		let middleware = self.middleware;
		let batch_requests_supported = self.batch_requests_supported;
		let methods = methods.into().initialize_resources(&resources)?;
		let health_api = self.health_api;

		// Invoked once per incoming connection; the shared state is cloned for each connection.
		let make_service = make_service_fn(move |conn: &AddrStream| {
			let remote_addr = conn.remote_addr();
			let methods = methods.clone();
			let acl = acl.clone();
			let resources = resources.clone();
			let middleware = middleware.clone();
			let health_api = health_api.clone();

			async move {
				// Invoked once per HTTP request on the connection.
				Ok::<_, HyperError>(service_fn(move |request| {
					let request_start = middleware.on_request(remote_addr, request.headers());

					let methods = methods.clone();
					let acl = acl.clone();
					let resources = resources.clone();
					let middleware = middleware.clone();
					let health_api = health_api.clone();

					// Run some validation on the http request, then read the body and try to deserialize it into one of
					// two cases: a single RPC request or a batch of RPC requests.
					async move {
						let keys = request.headers().keys().map(|k| k.as_str());
						let cors_request_headers = http_helpers::get_cors_request_headers(request.headers());

						// A request without a readable `host` header is rejected as malformed.
						let host = match http_helpers::read_header_value(request.headers(), "host") {
							Some(origin) => origin,
							None => return Ok(malformed()),
						};
						let maybe_origin = http_helpers::read_header_value(request.headers(), "origin");

						if let Err(e) = acl.verify_host(host) {
							tracing::warn!("Denied request: {:?}", e);
							return Ok(response::host_not_allowed());
						}

						if let Err(e) = acl.verify_origin(maybe_origin, host) {
							tracing::warn!("Denied request: {:?}", e);
							return Ok(response::invalid_allow_origin());
						}

						if let Err(e) = acl.verify_headers(keys, cors_request_headers) {
							tracing::warn!("Denied request: {:?}", e);
							return Ok(response::invalid_allow_headers());
						}

						// Only `POST` and `OPTIONS` are allowed, plus `GET` on the configured health endpoint path.
						match *request.method() {
							// An OPTIONS request is a CORS preflight request. We've done our access check
							// above so we just need to tell the browser that the request is OK.
							Method::OPTIONS => {
								let origin = match maybe_origin {
									Some(origin) => origin,
									None => return Ok(malformed()),
								};

								let allowed_headers = acl.allowed_headers().to_cors_header_value();
								let allowed_header_bytes = allowed_headers.as_bytes();

								let res = hyper::Response::builder()
									.header("access-control-allow-origin", origin)
									.header("access-control-allow-methods", "POST")
									.header("access-control-allow-headers", allowed_header_bytes)
									.body(hyper::Body::empty())
									.unwrap_or_else(|e| {
										tracing::error!("Error forming preflight response: {}", e);
										internal_error()
									});

								Ok(res)
							}
							// The actual request. If it's a CORS request we need to remember to add
							// the access-control-allow-origin header (despite preflight) to allow it
							// to be read in a browser.
							Method::POST if content_type_is_json(&request) => {
								// Cloned up front because `request` is moved into the call below.
								let origin = return_origin_if_different_from_host(request.headers()).cloned();
								let mut res = process_validated_request(ProcessValidatedRequest {
									request,
									middleware,
									methods,
									resources,
									max_request_body_size,
									max_response_body_size,
									max_log_length,
									batch_requests_supported,
									request_start,
								})
								.await?;

								if let Some(origin) = origin {
									res.headers_mut().insert("access-control-allow-origin", origin);
								}
								Ok(res)
							}
							// `GET` is only served when it hits the configured health endpoint path.
							Method::GET => match health_api.as_ref() {
								Some(health) if health.path.as_str() == request.uri().path() => {
									process_health_request(
										health,
										middleware,
										methods,
										max_response_body_size,
										request_start,
										max_log_length,
									)
									.await
								}
								_ => Ok(response::method_not_allowed()),
							},
							// Error scenarios:
							// a `POST` that did not pass the content type guard above.
							Method::POST => Ok(response::unsupported_content_type()),
							_ => Ok(response::method_not_allowed()),
						}
					}
				}))
			}
		});

		let rt = match self.tokio_runtime.take() {
			Some(rt) => rt,
			None => tokio::runtime::Handle::current(),
		};

		let handle = rt.spawn(async move {
			let server = listener.serve(make_service);
			// The shutdown future completes when `rx` yields, i.e. on a stop message or once
			// the channel is closed; the result of serving is discarded.
			let _ = server.with_graceful_shutdown(async move { rx.next().await.map_or((), |_| ()) }).await;
		});

		Ok(ServerHandle { handle: Some(handle), stop_sender: tx })
	}
561}
562
563// Checks the origin and host headers. If they both exist, return the origin if it does not match the host.
564// If one of them doesn't exist (origin most probably), or they are identical, return None.
565fn return_origin_if_different_from_host(headers: &HeaderMap) -> Option<&HeaderValue> {
566	if let (Some(origin), Some(host)) = (headers.get("origin"), headers.get("host")) {
567		if origin != host {
568			Some(origin)
569		} else {
570			None
571		}
572	} else {
573		None
574	}
575}
576
577/// Checks that content type of received request is valid for JSON-RPC.
578fn content_type_is_json(request: &hyper::Request<hyper::Body>) -> bool {
579	is_json(request.headers().get("content-type"))
580}
581
582/// Returns true if the `content_type` header indicates a valid JSON message.
583fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
584	match content_type.and_then(|val| val.to_str().ok()) {
585		Some(content)
586			if content.eq_ignore_ascii_case("application/json")
587				|| content.eq_ignore_ascii_case("application/json; charset=utf-8")
588				|| content.eq_ignore_ascii_case("application/json;charset=utf-8") =>
589		{
590			true
591		}
592		_ => false,
593	}
594}
595
/// Everything `process_validated_request` needs to handle one HTTP request.
struct ProcessValidatedRequest<M: Middleware> {
	/// The HTTP request whose body carries the JSON-RPC payload.
	request: hyper::Request<hyper::Body>,
	/// Middleware notified about calls, results and the final response.
	middleware: M,
	/// Registered RPC methods.
	methods: Methods,
	/// Resources that method calls claim while they run.
	resources: Resources,
	/// Upper bound used when reading the request body.
	max_request_body_size: u32,
	/// Upper bound handed to the method callbacks and the batch response builder.
	max_response_body_size: u32,
	/// Max length used when logging requests and responses.
	max_log_length: u32,
	/// Whether batch requests are answered or rejected.
	batch_requests_supported: bool,
	/// Value produced by the middleware's `on_request` for this HTTP request.
	request_start: M::Instant,
}
607
608/// Process a verified request, it implies a POST request with content type JSON.
609async fn process_validated_request<M: Middleware>(
610	input: ProcessValidatedRequest<M>,
611) -> Result<hyper::Response<hyper::Body>, HyperError> {
612	let ProcessValidatedRequest {
613		request,
614		middleware,
615		methods,
616		resources,
617		max_request_body_size,
618		max_response_body_size,
619		max_log_length,
620		batch_requests_supported,
621		request_start,
622	} = input;
623
624	let (parts, body) = request.into_parts();
625
626	let (body, is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
627		Ok(r) => r,
628		Err(GenericTransportError::TooLarge) => return Ok(response::too_large(max_request_body_size)),
629		Err(GenericTransportError::Malformed) => return Ok(response::malformed()),
630		Err(GenericTransportError::Inner(e)) => {
631			tracing::error!("Internal error reading request body: {}", e);
632			return Ok(response::internal_error());
633		}
634	};
635
636	// Single request or notification
637	if is_single {
638		let call = CallData {
639			conn_id: 0,
640			middleware: &middleware,
641			methods: &methods,
642			max_response_body_size,
643			max_log_length,
644			resources: &resources,
645			request_start,
646		};
647		let response = process_single_request(body, call).await;
648		middleware.on_response(&response.result, request_start);
649		Ok(response::ok_response(response.result))
650	}
651	// Batch of requests or notifications
652	else if !batch_requests_supported {
653		let err = MethodResponse::error(
654			Id::Null,
655			ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
656		);
657		middleware.on_response(&err.result, request_start);
658		Ok(response::ok_response(err.result))
659	}
660	// Batch of requests or notifications
661	else {
662		let response = process_batch_request(Batch {
663			data: body,
664			call: CallData {
665				conn_id: 0,
666				middleware: &middleware,
667				methods: &methods,
668				max_response_body_size,
669				max_log_length,
670				resources: &resources,
671				request_start,
672			},
673		})
674		.await;
675		middleware.on_response(&response.result, request_start);
676		Ok(response::ok_response(response.result))
677	}
678}
679
680async fn process_health_request<M: Middleware>(
681	health_api: &HealthApi,
682	middleware: M,
683	methods: Methods,
684	max_response_body_size: u32,
685	request_start: M::Instant,
686	max_log_length: u32,
687) -> Result<hyper::Response<hyper::Body>, HyperError> {
688	let trace = RpcTracing::method_call(&health_api.method);
689	async {
690		tx_log_from_str("HTTP health API", max_log_length);
691		let response = match methods.method_with_name(&health_api.method) {
692			None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)),
693			Some((_name, method_callback)) => match method_callback.inner() {
694				MethodKind::Sync(callback) => {
695					(callback)(Id::Number(0), Params::new(None), max_response_body_size as usize)
696				}
697				MethodKind::Async(callback) => {
698					(callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await
699				}
700				MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
701					MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
702				}
703			},
704		};
705
706		rx_log_from_str(&response.result, max_log_length);
707		middleware.on_result(&health_api.method, response.success, request_start);
708		middleware.on_response(&response.result, request_start);
709
710		if response.success {
711			#[derive(serde::Deserialize)]
712			struct RpcPayload<'a> {
713				#[serde(borrow)]
714				result: &'a serde_json::value::RawValue,
715			}
716
717			let payload: RpcPayload = serde_json::from_str(&response.result)
718				.expect("valid JSON-RPC response must have a result field and be valid JSON; qed");
719			Ok(response::ok_response(payload.result.to_string()))
720		} else {
721			Ok(response::internal_error())
722		}
723	}
724	.instrument(trace.into_span())
725	.await
726}
727
/// A raw batch payload together with the context needed to execute its calls.
#[derive(Debug, Clone)]
struct Batch<'a, M: Middleware> {
	/// Raw request body; parsed as a JSON array by `process_batch_request`.
	data: Vec<u8>,
	/// Context cloned for every call in the batch.
	call: CallData<'a, M>,
}
733
/// Borrowed, per-request context shared by every method call executed for one HTTP request.
#[derive(Debug, Clone)]
struct CallData<'a, M: Middleware> {
	/// Connection id handed to async method callbacks (always `0` in this file).
	conn_id: usize,
	/// Middleware notified about each call and its result.
	middleware: &'a M,
	/// Registered RPC methods used to look up the callee.
	methods: &'a Methods,
	/// Upper bound handed to the method callbacks.
	max_response_body_size: u32,
	/// Max length used when logging requests and responses.
	max_log_length: u32,
	/// Resources a method claims before it runs.
	resources: &'a Resources,
	/// Value produced by the middleware's `on_request`, passed back to `on_result`.
	request_start: M::Instant,
}
744
/// A single method call, ready to be executed by `execute_call`.
#[derive(Debug, Clone)]
struct Call<'a, M: Middleware> {
	/// Parameters of the call.
	params: Params<'a>,
	/// Name of the method to look up.
	name: &'a str,
	/// Shared per-request context.
	call: CallData<'a, M>,
	/// Request id that is passed on to the method callback or the error response.
	id: Id<'a>,
}
752
753// Batch responses must be sent back as a single message so we read the results from each
754// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
755// complete batch response back to the client over `tx`.
756async fn process_batch_request<M>(b: Batch<'_, M>) -> BatchResponse
757where
758	M: Middleware,
759{
760	let Batch { data, call } = b;
761
762	if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
763		let max_response_size = call.max_response_body_size;
764		let batch = batch.into_iter().map(|req| Ok((req, call.clone())));
765
766		let batch_stream = futures_util::stream::iter(batch);
767
768		let trace = RpcTracing::batch();
769		return async {
770			let batch_response = batch_stream
771				.try_fold(
772					BatchResponseBuilder::new_with_limit(max_response_size as usize),
773					|batch_response, (req, call)| async move {
774						let params = Params::new(req.params.map(|params| params.get()));
775						let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await;
776						batch_response.append(&response)
777					},
778				)
779				.await;
780
781			match batch_response {
782				Ok(batch) => batch.finish(),
783				Err(batch_err) => batch_err,
784			}
785		}
786		.instrument(trace.into_span())
787		.await;
788	}
789
790	if let Ok(batch) = serde_json::from_slice::<Vec<Notif>>(&data) {
791		return if !batch.is_empty() {
792			BatchResponse { result: "".to_string(), success: true }
793		} else {
794			BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
795		};
796	}
797
798	// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
799	// Array with at least one value, the response from the Server MUST be a single
800	// Response object." – The Spec.
801	let (id, code) = prepare_error(&data);
802	BatchResponse::error(id, ErrorObject::from(code))
803}
804
805async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_, M>) -> MethodResponse {
806	if let Ok(req) = serde_json::from_slice::<Request>(&data) {
807		let trace = RpcTracing::method_call(&req.method);
808		async {
809			rx_log_from_json(&req, call.max_log_length);
810			let params = Params::new(req.params.map(|params| params.get()));
811			let name = &req.method;
812			let id = req.id;
813			execute_call(Call { name, params, id, call }).await
814		}
815		.instrument(trace.into_span())
816		.await
817	} else if let Ok(req) = serde_json::from_slice::<Notif>(&data) {
818		let trace = RpcTracing::notification(&req.method);
819		let span = trace.into_span();
820		let _enter = span.enter();
821		rx_log_from_json(&req, call.max_log_length);
822
823		MethodResponse { result: String::new(), success: true }
824	} else {
825		let (id, code) = prepare_error(&data);
826		MethodResponse::error(id, ErrorObject::from(code))
827	}
828}
829
830async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse {
831	let Call { name, id, params, call } = c;
832	let CallData { resources, methods, middleware, max_response_body_size, max_log_length, conn_id, request_start } =
833		call;
834
835	let response = match methods.method_with_name(name) {
836		None => {
837			middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
838			MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound))
839		}
840		Some((name, method)) => match &method.inner() {
841			MethodKind::Sync(callback) => {
842				middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
843
844				match method.claim(name, resources) {
845					Ok(guard) => {
846						let r = (callback)(id, params, max_response_body_size as usize);
847						drop(guard);
848						r
849					}
850					Err(err) => {
851						tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
852						MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
853					}
854				}
855			}
856			MethodKind::Async(callback) => {
857				middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
858				match method.claim(name, resources) {
859					Ok(guard) => {
860						let id = id.into_owned();
861						let params = params.into_owned();
862
863						(callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await
864					}
865					Err(err) => {
866						tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
867						MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
868					}
869				}
870			}
871			MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
872				middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
873				tracing::error!("Subscriptions not supported on HTTP");
874				MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
875			}
876		},
877	};
878
879	tx_log_from_str(&response.result, max_log_length);
880	middleware.on_result(name, response.success, request_start);
881	response
882}