jsonrpsee_http_client/
client.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::borrow::Cow as StdCow;
28use std::fmt;
29use std::sync::Arc;
30use std::time::Duration;
31
32use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClient, HttpTransportClientBuilder};
33use crate::types::{NotificationSer, RequestSer, Response};
34use crate::{HttpRequest, HttpResponse};
35use async_trait::async_trait;
36use hyper::body::Bytes;
37use hyper::http::HeaderMap;
38use jsonrpsee_core::client::{
39	generate_batch_id_range, BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT,
40};
41use jsonrpsee_core::params::BatchRequestBuilder;
42use jsonrpsee_core::traits::ToRpcParams;
43use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES};
44use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
45use serde::de::DeserializeOwned;
46use tokio::sync::Semaphore;
47use tower::layer::util::Identity;
48use tower::{Layer, Service};
49use tracing::instrument;
50
51#[cfg(feature = "tls")]
52use crate::{CertificateStore, CustomCertStore};
53
54/// HTTP client builder.
55///
56/// # Examples
57///
58/// ```no_run
59///
60/// use jsonrpsee_http_client::{HttpClientBuilder, HeaderMap, HeaderValue};
61///
62/// #[tokio::main]
63/// async fn main() {
64///     // Build custom headers used for every submitted request.
65///     let mut headers = HeaderMap::new();
66///     headers.insert("Any-Header-You-Like", HeaderValue::from_static("42"));
67///
68///     // Build client
69///     let client = HttpClientBuilder::default()
70///          .set_headers(headers)
71///          .build("http://localhost")
72///          .unwrap();
73///
74///     // use client....
75/// }
76/// ```
77#[derive(Debug)]
78pub struct HttpClientBuilder<L = Identity> {
79	max_request_size: u32,
80	max_response_size: u32,
81	request_timeout: Duration,
82	#[cfg(feature = "tls")]
83	certificate_store: CertificateStore,
84	id_kind: IdKind,
85	max_log_length: u32,
86	headers: HeaderMap,
87	service_builder: tower::ServiceBuilder<L>,
88	tcp_no_delay: bool,
89	max_concurrent_requests: Option<usize>,
90}
91
92impl<L> HttpClientBuilder<L> {
93	/// Set the maximum size of a request body in bytes. Default is 10 MiB.
94	pub fn max_request_size(mut self, size: u32) -> Self {
95		self.max_request_size = size;
96		self
97	}
98
99	/// Set the maximum size of a response in bytes. Default is 10 MiB.
100	pub fn max_response_size(mut self, size: u32) -> Self {
101		self.max_response_size = size;
102		self
103	}
104
105	/// Set request timeout (default is 60 seconds).
106	pub fn request_timeout(mut self, timeout: Duration) -> Self {
107		self.request_timeout = timeout;
108		self
109	}
110
111	/// Set the maximum number of concurrent requests. Default disabled.
112	pub fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
113		self.max_concurrent_requests = Some(max_concurrent_requests);
114		self
115	}
116
117	/// Force to use the rustls native certificate store.
118	///
119	/// Since multiple certificate stores can be optionally enabled, this option will
120	/// force the `native certificate store` to be used.
121	///
122	/// # Optional
123	///
124	/// This requires the optional `tls` feature.
125	///
126	/// # Example
127	///
128	/// ```no_run
129	/// use jsonrpsee_http_client::{HttpClientBuilder, CustomCertStore};
130	/// use rustls::{
131	///     client::danger::{self, HandshakeSignatureValid, ServerCertVerified},
132	///     pki_types::{CertificateDer, ServerName, UnixTime},
133	///     Error,
134	/// };
135	///
136	/// #[derive(Debug)]
137	/// struct NoCertificateVerification;
138	///
139	/// impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
140	///     fn verify_server_cert(
141	///         &self,
142	///         _: &CertificateDer<'_>,
143	///         _: &[CertificateDer<'_>],
144	///         _: &ServerName<'_>,
145	///         _: &[u8],
146	///         _: UnixTime,
147	///     ) -> Result<ServerCertVerified, Error> {
148	///         Ok(ServerCertVerified::assertion())
149	///     }
150	///
151	///     fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
152	///         vec![rustls::SignatureScheme::ECDSA_NISTP256_SHA256]
153	///     }
154	///
155	///     fn verify_tls12_signature(
156	///         &self,
157	///         _: &[u8],
158	///         _: &CertificateDer<'_>,
159	///         _: &rustls::DigitallySignedStruct,
160	///     ) -> Result<rustls::client::danger::HandshakeSignatureValid, Error> {
161	///         Ok(HandshakeSignatureValid::assertion())
162	///     }
163	///
164	///     fn verify_tls13_signature(
165	///         &self,
166	///         _: &[u8],
167	///         _: &CertificateDer<'_>,
168	///         _: &rustls::DigitallySignedStruct,
169	///     ) -> Result<HandshakeSignatureValid, Error> {
170	///         Ok(HandshakeSignatureValid::assertion())
171	///     }
172	/// }
173	///
174	/// let tls_cfg = CustomCertStore::builder()
175	///    .dangerous()
176	///    .with_custom_certificate_verifier(std::sync::Arc::new(NoCertificateVerification))
177	///    .with_no_client_auth();
178	///
179	/// // client builder with disabled certificate verification.
180	/// let client_builder = HttpClientBuilder::new().with_custom_cert_store(tls_cfg);
181	/// ```
182	#[cfg(feature = "tls")]
183	pub fn with_custom_cert_store(mut self, cfg: CustomCertStore) -> Self {
184		self.certificate_store = CertificateStore::Custom(cfg);
185		self
186	}
187
188	/// Configure the data type of the request object ID (default is number).
189	pub fn id_format(mut self, id_kind: IdKind) -> Self {
190		self.id_kind = id_kind;
191		self
192	}
193
194	/// Max length for logging for requests and responses in number characters.
195	///
196	/// Logs bigger than this limit will be truncated.
197	pub fn set_max_logging_length(mut self, max: u32) -> Self {
198		self.max_log_length = max;
199		self
200	}
201
202	/// Set a custom header passed to the server with every request (default is none).
203	///
204	/// The caller is responsible for checking that the headers do not conflict or are duplicated.
205	pub fn set_headers(mut self, headers: HeaderMap) -> Self {
206		self.headers = headers;
207		self
208	}
209
210	/// Configure `TCP_NODELAY` on the socket to the supplied value `nodelay`.
211	///
212	/// Default is `true`.
213	pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
214		self.tcp_no_delay = no_delay;
215		self
216	}
217
218	/// Set custom tower middleware.
219	pub fn set_http_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> HttpClientBuilder<T> {
220		HttpClientBuilder {
221			#[cfg(feature = "tls")]
222			certificate_store: self.certificate_store,
223			id_kind: self.id_kind,
224			headers: self.headers,
225			max_log_length: self.max_log_length,
226			max_request_size: self.max_request_size,
227			max_response_size: self.max_response_size,
228			service_builder,
229			request_timeout: self.request_timeout,
230			tcp_no_delay: self.tcp_no_delay,
231			max_concurrent_requests: self.max_concurrent_requests,
232		}
233	}
234}
235
236impl<B, S, L> HttpClientBuilder<L>
237where
238	L: Layer<transport::HttpBackend, Service = S>,
239	S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Clone,
240	B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
241	B::Data: Send,
242	B::Error: Into<BoxError>,
243{
244	/// Build the HTTP client with target to connect to.
245	pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient<S>, Error> {
246		let Self {
247			max_request_size,
248			max_response_size,
249			request_timeout,
250			#[cfg(feature = "tls")]
251			certificate_store,
252			id_kind,
253			headers,
254			max_log_length,
255			service_builder,
256			tcp_no_delay,
257			..
258		} = self;
259
260		let transport = HttpTransportClientBuilder {
261			max_request_size,
262			max_response_size,
263			headers,
264			max_log_length,
265			tcp_no_delay,
266			service_builder,
267			#[cfg(feature = "tls")]
268			certificate_store,
269		}
270		.build(target)
271		.map_err(|e| Error::Transport(e.into()))?;
272
273		let request_guard = self
274			.max_concurrent_requests
275			.map(|max_concurrent_requests| Arc::new(Semaphore::new(max_concurrent_requests)));
276
277		Ok(HttpClient {
278			transport,
279			id_manager: Arc::new(RequestIdManager::new(id_kind)),
280			request_timeout,
281			request_guard,
282		})
283	}
284}
285
286impl Default for HttpClientBuilder<Identity> {
287	fn default() -> Self {
288		Self {
289			max_request_size: TEN_MB_SIZE_BYTES,
290			max_response_size: TEN_MB_SIZE_BYTES,
291			request_timeout: Duration::from_secs(60),
292			#[cfg(feature = "tls")]
293			certificate_store: CertificateStore::Native,
294			id_kind: IdKind::Number,
295			max_log_length: 4096,
296			headers: HeaderMap::new(),
297			service_builder: tower::ServiceBuilder::new(),
298			tcp_no_delay: true,
299			max_concurrent_requests: None,
300		}
301	}
302}
303
304impl HttpClientBuilder<Identity> {
305	/// Create a new builder.
306	pub fn new() -> HttpClientBuilder<Identity> {
307		HttpClientBuilder::default()
308	}
309}
310
311/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
312#[derive(Debug, Clone)]
313pub struct HttpClient<S = HttpBackend> {
314	/// HTTP transport client.
315	transport: HttpTransportClient<S>,
316	/// Request timeout. Defaults to 60sec.
317	request_timeout: Duration,
318	/// Request ID manager.
319	id_manager: Arc<RequestIdManager>,
320	/// Concurrent requests limit guard.
321	request_guard: Option<Arc<Semaphore>>,
322}
323
324impl HttpClient<HttpBackend> {
325	/// Create a builder for the HttpClient.
326	pub fn builder() -> HttpClientBuilder<Identity> {
327		HttpClientBuilder::new()
328	}
329}
330
331#[async_trait]
332impl<B, S> ClientT for HttpClient<S>
333where
334	S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Send + Sync + Clone,
335	<S as Service<HttpRequest>>::Future: Send,
336	B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
337	B::Error: Into<BoxError>,
338	B::Data: Send,
339{
340	#[instrument(name = "notification", skip(self, params), level = "trace")]
341	async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
342	where
343		Params: ToRpcParams + Send,
344	{
345		let _permit = match self.request_guard.as_ref() {
346			Some(permit) => permit.acquire().await.ok(),
347			None => None,
348		};
349		let params = params.to_rpc_params()?;
350		let notif =
351			serde_json::to_string(&NotificationSer::borrowed(&method, params.as_deref())).map_err(Error::ParseError)?;
352
353		let fut = self.transport.send(notif);
354
355		match tokio::time::timeout(self.request_timeout, fut).await {
356			Ok(Ok(ok)) => Ok(ok),
357			Err(_) => Err(Error::RequestTimeout),
358			Ok(Err(e)) => Err(Error::Transport(e.into())),
359		}
360	}
361
362	#[instrument(name = "method_call", skip(self, params), level = "trace")]
363	async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
364	where
365		R: DeserializeOwned,
366		Params: ToRpcParams + Send,
367	{
368		let _permit = match self.request_guard.as_ref() {
369			Some(permit) => permit.acquire().await.ok(),
370			None => None,
371		};
372		let id = self.id_manager.next_request_id();
373		let params = params.to_rpc_params()?;
374
375		let request = RequestSer::borrowed(&id, &method, params.as_deref());
376		let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
377
378		let fut = self.transport.send_and_read_body(raw);
379		let body = match tokio::time::timeout(self.request_timeout, fut).await {
380			Ok(Ok(body)) => body,
381			Err(_e) => {
382				return Err(Error::RequestTimeout);
383			}
384			Ok(Err(e)) => {
385				return Err(Error::Transport(e.into()));
386			}
387		};
388
389		// NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get
390		// a better error message if `R` couldn't be decoded.
391		let response = ResponseSuccess::try_from(serde_json::from_slice::<Response<&JsonRawValue>>(&body)?)?;
392
393		let result = serde_json::from_str(response.result.get()).map_err(Error::ParseError)?;
394
395		if response.id == id {
396			Ok(result)
397		} else {
398			Err(InvalidRequestId::NotPendingRequest(response.id.to_string()).into())
399		}
400	}
401
402	#[instrument(name = "batch", skip(self, batch), level = "trace")]
403	async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<BatchResponse<'a, R>, Error>
404	where
405		R: DeserializeOwned + fmt::Debug + 'a,
406	{
407		let _permit = match self.request_guard.as_ref() {
408			Some(permit) => permit.acquire().await.ok(),
409			None => None,
410		};
411		let batch = batch.build()?;
412		let id = self.id_manager.next_request_id();
413		let id_range = generate_batch_id_range(id, batch.len() as u64)?;
414
415		let mut batch_request = Vec::with_capacity(batch.len());
416		for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
417			let id = self.id_manager.as_id_kind().into_id(id);
418			batch_request.push(RequestSer {
419				jsonrpc: TwoPointZero,
420				id,
421				method: method.into(),
422				params: params.map(StdCow::Owned),
423			});
424		}
425
426		let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
427
428		let body = match tokio::time::timeout(self.request_timeout, fut).await {
429			Ok(Ok(body)) => body,
430			Err(_e) => return Err(Error::RequestTimeout),
431			Ok(Err(e)) => return Err(Error::Transport(e.into())),
432		};
433
434		let json_rps: Vec<Response<&JsonRawValue>> = serde_json::from_slice(&body).map_err(Error::ParseError)?;
435
436		let mut responses = Vec::with_capacity(json_rps.len());
437		let mut successful_calls = 0;
438		let mut failed_calls = 0;
439
440		for _ in 0..json_rps.len() {
441			responses.push(Err(ErrorObject::borrowed(0, "", None)));
442		}
443
444		for rp in json_rps {
445			let id = rp.id.try_parse_inner_as_number()?;
446
447			let res = match ResponseSuccess::try_from(rp) {
448				Ok(r) => {
449					let result = serde_json::from_str(r.result.get())?;
450					successful_calls += 1;
451					Ok(result)
452				}
453				Err(err) => {
454					failed_calls += 1;
455					Err(err)
456				}
457			};
458
459			let maybe_elem = id
460				.checked_sub(id_range.start)
461				.and_then(|p| p.try_into().ok())
462				.and_then(|p: usize| responses.get_mut(p));
463
464			if let Some(elem) = maybe_elem {
465				*elem = res;
466			} else {
467				return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
468			}
469		}
470
471		Ok(BatchResponse::new(successful_calls, responses, failed_calls))
472	}
473}
474
475#[async_trait]
476impl<B, S> SubscriptionClientT for HttpClient<S>
477where
478	S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Send + Sync + Clone,
479	<S as Service<HttpRequest>>::Future: Send,
480	B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
481	B::Data: Send,
482	B::Error: Into<BoxError>,
483{
484	/// Send a subscription request to the server. Not implemented for HTTP; will always return
485	/// [`Error::HttpNotImplemented`].
486	#[instrument(name = "subscription", fields(method = _subscribe_method), skip(self, _params, _subscribe_method, _unsubscribe_method), level = "trace")]
487	async fn subscribe<'a, N, Params>(
488		&self,
489		_subscribe_method: &'a str,
490		_params: Params,
491		_unsubscribe_method: &'a str,
492	) -> Result<Subscription<N>, Error>
493	where
494		Params: ToRpcParams + Send,
495		N: DeserializeOwned,
496	{
497		Err(Error::HttpNotImplemented)
498	}
499
500	/// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
501	#[instrument(name = "subscribe_method", fields(method = _method), skip(self, _method), level = "trace")]
502	async fn subscribe_to_method<'a, N>(&self, _method: &'a str) -> Result<Subscription<N>, Error>
503	where
504		N: DeserializeOwned,
505	{
506		Err(Error::HttpNotImplemented)
507	}
508}