Skip to main content

jsonrpsee_http_client/
transport.rs

1// Implementation note: hyper's API is not adapted to async/await at all, and there's
2// unfortunately a lot of boilerplate here that could be removed once/if it gets reworked.
3//
4// Additionally, despite the fact that hyper is capable of performing requests to multiple different
5// servers through the same `hyper::Client`, we don't use that feature on purpose. The reason is
6// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
7// the JSON-RPC request id to a value that might have already been used.
8
9use base64::Engine;
10use hyper::body::Bytes;
11use hyper::http::{HeaderMap, HeaderValue};
12use hyper_util::client::legacy::Client;
13use hyper_util::client::legacy::connect::HttpConnector;
14use hyper_util::rt::TokioExecutor;
15use jsonrpsee_core::BoxError;
16use jsonrpsee_core::{
17	TEN_MB_SIZE_BYTES,
18	http_helpers::{self, HttpError},
19};
20use std::future::Future;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23use thiserror::Error;
24use tower::layer::util::Identity;
25use tower::{Layer, Service, ServiceExt};
26use url::Url;
27
28use crate::{HttpBody, HttpRequest, HttpResponse};
29
30#[cfg(feature = "tls")]
31use crate::{CertificateStore, CustomCertStore};
32
33const CONTENT_TYPE_JSON: &str = "application/json";
34
35/// Wrapper over HTTP transport and connector.
36#[derive(Debug)]
37pub enum HttpBackend<B = HttpBody> {
38	/// Hyper client with https connector.
39	#[cfg(feature = "tls")]
40	Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
41	/// Hyper client with http connector.
42	Http(Client<HttpConnector, B>),
43}
44
45impl<B> Clone for HttpBackend<B> {
46	fn clone(&self) -> Self {
47		match self {
48			Self::Http(inner) => Self::Http(inner.clone()),
49			#[cfg(feature = "tls")]
50			Self::Https(inner) => Self::Https(inner.clone()),
51		}
52	}
53}
54
55impl<B> tower::Service<HttpRequest<B>> for HttpBackend<B>
56where
57	B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
58	B::Data: Send,
59	B::Error: Into<BoxError>,
60{
61	type Response = HttpResponse<hyper::body::Incoming>;
62	type Error = Error;
63	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
64
65	fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66		match self {
67			Self::Http(inner) => inner.poll_ready(ctx),
68			#[cfg(feature = "tls")]
69			Self::Https(inner) => inner.poll_ready(ctx),
70		}
71		.map_err(|e| Error::Http(HttpError::Stream(e.into())))
72	}
73
74	fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
75		let resp = match self {
76			Self::Http(inner) => inner.call(req),
77			#[cfg(feature = "tls")]
78			Self::Https(inner) => inner.call(req),
79		};
80
81		Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
82	}
83}
84
85/// Builder for [`HttpTransportClient`].
86#[derive(Debug)]
87pub struct HttpTransportClientBuilder<L> {
88	/// Certificate store.
89	#[cfg(feature = "tls")]
90	pub(crate) certificate_store: CertificateStore,
91	/// Configurable max request body size
92	pub(crate) max_request_size: u32,
93	/// Configurable max response body size
94	pub(crate) max_response_size: u32,
95	/// Custom headers to pass with every request.
96	pub(crate) headers: HeaderMap,
97	/// Service builder
98	pub(crate) service_builder: tower::ServiceBuilder<L>,
99	/// TCP_NODELAY
100	pub(crate) tcp_no_delay: bool,
101}
102
103impl Default for HttpTransportClientBuilder<Identity> {
104	fn default() -> Self {
105		Self::new()
106	}
107}
108
109impl HttpTransportClientBuilder<Identity> {
110	/// Create a new [`HttpTransportClientBuilder`].
111	pub fn new() -> Self {
112		Self {
113			#[cfg(feature = "tls")]
114			certificate_store: CertificateStore::Native,
115			max_request_size: TEN_MB_SIZE_BYTES,
116			max_response_size: TEN_MB_SIZE_BYTES,
117			headers: HeaderMap::new(),
118			service_builder: tower::ServiceBuilder::new(),
119			tcp_no_delay: true,
120		}
121	}
122}
123
124impl<L> HttpTransportClientBuilder<L> {
125	/// See docs [`crate::HttpClientBuilder::with_custom_cert_store`] for more information.
126	#[cfg(feature = "tls")]
127	pub fn with_custom_cert_store(mut self, cfg: CustomCertStore) -> Self {
128		self.certificate_store = CertificateStore::Custom(cfg);
129		self
130	}
131
132	/// Set the maximum size of a request body in bytes. Default is 10 MiB.
133	pub fn max_request_size(mut self, size: u32) -> Self {
134		self.max_request_size = size;
135		self
136	}
137
138	/// Set the maximum size of a response in bytes. Default is 10 MiB.
139	pub fn max_response_size(mut self, size: u32) -> Self {
140		self.max_response_size = size;
141		self
142	}
143
144	/// Set a custom header passed to the server with every request (default is none).
145	///
146	/// The caller is responsible for checking that the headers do not conflict or are duplicated.
147	pub fn set_headers(mut self, headers: HeaderMap) -> Self {
148		self.headers = headers;
149		self
150	}
151
152	/// Configure `TCP_NODELAY` on the socket to the supplied value `nodelay`.
153	///
154	/// Default is `true`.
155	pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
156		self.tcp_no_delay = no_delay;
157		self
158	}
159
160	/// Configure a tower service.
161	pub fn set_service<T>(self, service: tower::ServiceBuilder<T>) -> HttpTransportClientBuilder<T> {
162		HttpTransportClientBuilder {
163			#[cfg(feature = "tls")]
164			certificate_store: self.certificate_store,
165			headers: self.headers,
166			max_request_size: self.max_request_size,
167			max_response_size: self.max_response_size,
168			service_builder: service,
169			tcp_no_delay: self.tcp_no_delay,
170		}
171	}
172
173	/// Build a [`HttpTransportClient`].
174	pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
175	where
176		L: Layer<HttpBackend, Service = S>,
177		S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
178		B: http_body::Body<Data = Bytes> + Send + 'static,
179		B::Data: Send,
180		B::Error: Into<BoxError>,
181	{
182		let Self {
183			#[cfg(feature = "tls")]
184			certificate_store,
185			max_request_size,
186			max_response_size,
187			headers,
188			service_builder,
189			tcp_no_delay,
190		} = self;
191		let mut url = Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {e}")))?;
192
193		if url.host_str().is_none() {
194			return Err(Error::Url("Invalid host".into()));
195		}
196		url.set_fragment(None);
197
198		let client = match url.scheme() {
199			"http" => {
200				let mut connector = HttpConnector::new();
201				connector.set_nodelay(tcp_no_delay);
202				HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
203			}
204			#[cfg(feature = "tls")]
205			"https" => {
206				// Make sure that the TLS provider is set. If not, set a default one.
207				// Otherwise, creating `tls` configuration may panic if there are multiple
208				// providers available due to `rustls` features (e.g. both `ring` and `aws-lc-rs`).
209				// Function returns an error if the provider is already installed, and we're fine with it.
210				let _ = rustls::crypto::ring::default_provider().install_default();
211
212				let mut http_conn = HttpConnector::new();
213				http_conn.set_nodelay(tcp_no_delay);
214				http_conn.enforce_http(false);
215
216				let https_conn = match certificate_store {
217					CertificateStore::Native => {
218						use rustls_platform_verifier::ConfigVerifierExt;
219
220						hyper_rustls::HttpsConnectorBuilder::new()
221							.with_tls_config(rustls::ClientConfig::with_platform_verifier())
222							.https_or_http()
223							.enable_all_versions()
224							.wrap_connector(http_conn)
225					}
226
227					CertificateStore::Custom(tls_config) => hyper_rustls::HttpsConnectorBuilder::new()
228						.with_tls_config(tls_config)
229						.https_or_http()
230						.enable_all_versions()
231						.wrap_connector(http_conn),
232				};
233
234				HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
235			}
236			_ => {
237				#[cfg(feature = "tls")]
238				let err = "URL scheme not supported, expects 'http' or 'https'";
239				#[cfg(not(feature = "tls"))]
240				let err = "URL scheme not supported, expects 'http'";
241				return Err(Error::Url(err.into()));
242			}
243		};
244
245		// Cache request headers: 2 default headers, followed by user custom headers.
246		// Maintain order for headers in case of duplicate keys:
247		// https://datatracker.ietf.org/doc/html/rfc7230#section-3.2.2
248		let mut cached_headers = HeaderMap::with_capacity(2 + headers.len());
249		cached_headers.insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static(CONTENT_TYPE_JSON));
250		cached_headers.insert(hyper::header::ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
251		for (key, value) in headers.into_iter() {
252			if let Some(key) = key {
253				cached_headers.insert(key, value);
254			}
255		}
256
257		if let Some(pwd) = url.password() {
258			if !cached_headers.contains_key(hyper::header::AUTHORIZATION) {
259				let digest = base64::engine::general_purpose::STANDARD.encode(format!("{}:{pwd}", url.username()));
260				cached_headers.insert(
261					hyper::header::AUTHORIZATION,
262					HeaderValue::from_str(&format!("Basic {digest}"))
263						.map_err(|_| Error::Url("Header value `authorization basic user:pwd` invalid".into()))?,
264				);
265			}
266		}
267		let _ = url.set_password(None);
268		let _ = url.set_username("");
269
270		Ok(HttpTransportClient {
271			target: url.as_str().to_owned(),
272			client: service_builder.service(client),
273			max_request_size,
274			max_response_size,
275			headers: cached_headers,
276		})
277	}
278}
279
280/// HTTP Transport Client.
281#[derive(Debug, Clone)]
282pub struct HttpTransportClient<S> {
283	/// Target to connect to.
284	target: String,
285	/// HTTP client
286	client: S,
287	/// Configurable max request body size
288	max_request_size: u32,
289	/// Configurable max response body size
290	max_response_size: u32,
291	/// Custom headers to pass with every request.
292	headers: HeaderMap,
293}
294
295impl<B, S> HttpTransportClient<S>
296where
297	S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
298	B: http_body::Body<Data = Bytes> + Send + 'static,
299	B::Data: Send,
300	B::Error: Into<BoxError>,
301{
302	async fn inner_send(&self, body: String) -> Result<HttpResponse<B>, Error> {
303		if body.len() > self.max_request_size as usize {
304			return Err(Error::RequestTooLarge);
305		}
306
307		let mut req = HttpRequest::post(&self.target);
308		if let Some(headers) = req.headers_mut() {
309			*headers = self.headers.clone();
310		}
311
312		let req = req.body(body.into()).expect("URI and request headers are valid; qed");
313		let response = self.client.clone().ready().await?.call(req).await?;
314
315		if response.status().is_success() {
316			Ok(response)
317		} else {
318			Err(Error::Rejected { status_code: response.status().into() })
319		}
320	}
321
322	/// Send serialized message and wait until all bytes from the HTTP message body have been read.
323	pub(crate) async fn send_and_read_body(&self, body: String) -> Result<Vec<u8>, Error> {
324		let response = self.inner_send(body).await?;
325
326		let (parts, body) = response.into_parts();
327		let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;
328
329		Ok(body)
330	}
331
332	/// Send serialized message without reading the HTTP message body.
333	pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
334		self.inner_send(body).await?;
335		Ok(())
336	}
337}
338
339/// Error that can happen during a request.
340#[derive(Debug, Error)]
341pub enum Error {
342	/// Invalid URL.
343	#[error("Invalid Url: {0}")]
344	Url(String),
345
346	/// Error during the HTTP request, including networking errors and HTTP protocol errors.
347	#[error(transparent)]
348	Http(#[from] HttpError),
349
350	/// Server returned a non-success status code.
351	#[error("Request rejected `{status_code}`")]
352	Rejected {
353		/// HTTP Status code returned by the server.
354		status_code: u16,
355	},
356
357	/// Request body too large.
358	#[error("The request body was too large")]
359	RequestTooLarge,
360
361	/// Invalid certificate store.
362	#[error("Invalid certificate store")]
363	InvalidCertficateStore,
364}
365
366#[cfg(test)]
367mod tests {
368	use super::*;
369
370	#[test]
371	fn invalid_http_url_rejected() {
372		let err = HttpTransportClientBuilder::new().build("ws://localhost:9933").unwrap_err();
373		assert!(matches!(err, Error::Url(_)));
374	}
375
376	#[cfg(feature = "tls")]
377	#[test]
378	fn https_works() {
379		let client = HttpTransportClientBuilder::new().build("https://localhost").unwrap();
380		assert_eq!(&client.target, "https://localhost/");
381	}
382
383	#[cfg(not(feature = "tls"))]
384	#[test]
385	fn https_fails_without_tls_feature() {
386		let err = HttpTransportClientBuilder::new().build("https://localhost").unwrap_err();
387		assert!(matches!(err, Error::Url(_)));
388	}
389
390	#[test]
391	fn faulty_port() {
392		let err = HttpTransportClientBuilder::new().build("http://localhost:-43").unwrap_err();
393		assert!(matches!(err, Error::Url(_)));
394
395		let err = HttpTransportClientBuilder::new().build("http://localhost:-99999").unwrap_err();
396		assert!(matches!(err, Error::Url(_)));
397	}
398
399	#[test]
400	fn url_with_path_works() {
401		let client = HttpTransportClientBuilder::new().build("http://localhost/my-special-path").unwrap();
402		assert_eq!(&client.target, "http://localhost/my-special-path");
403	}
404
405	#[test]
406	fn url_with_query_works() {
407		let client = HttpTransportClientBuilder::new().build("http://127.0.0.1/my?name1=value1&name2=value2").unwrap();
408		assert_eq!(&client.target, "http://127.0.0.1/my?name1=value1&name2=value2");
409	}
410
411	#[test]
412	fn url_with_fragment_is_ignored() {
413		let client = HttpTransportClientBuilder::new().build("http://127.0.0.1/my.htm#ignore").unwrap();
414		assert_eq!(&client.target, "http://127.0.0.1/my.htm");
415	}
416
417	#[test]
418	fn url_default_port_is_omitted() {
419		let client = HttpTransportClientBuilder::new().build("http://127.0.0.1:80").unwrap();
420		assert_eq!(&client.target, "http://127.0.0.1/");
421	}
422
423	#[cfg(feature = "tls")]
424	#[test]
425	fn https_custom_port_works() {
426		let client = HttpTransportClientBuilder::new().build("https://localhost:9999").unwrap();
427		assert_eq!(&client.target, "https://localhost:9999/");
428	}
429
430	#[test]
431	fn http_custom_port_works() {
432		let client = HttpTransportClientBuilder::new().build("http://localhost:9999").unwrap();
433		assert_eq!(&client.target, "http://localhost:9999/");
434	}
435
436	#[tokio::test]
437	async fn request_limit_works() {
438		let eighty_bytes_limit = 80;
439		let fifty_bytes_limit = 50;
440
441		let client = HttpTransportClientBuilder::new()
442			.max_request_size(eighty_bytes_limit)
443			.max_response_size(fifty_bytes_limit)
444			.build("http://localhost:9933")
445			.unwrap();
446
447		assert_eq!(client.max_request_size, eighty_bytes_limit);
448		assert_eq!(client.max_response_size, fifty_bytes_limit);
449
450		let body = "a".repeat(81);
451		assert_eq!(body.len(), 81);
452		let response = client.send(body).await.unwrap_err();
453		assert!(matches!(response, Error::RequestTooLarge));
454	}
455}