jsonrpsee_http_client/
transport.rs

1// Implementation note: hyper's API is not adapted to async/await at all, and there's
2// unfortunately a lot of boilerplate here that could be removed once/if it gets reworked.
3//
4// Additionally, despite the fact that hyper is capable of performing requests to multiple different
5// servers through the same `hyper::Client`, we don't use that feature on purpose. The reason is
6// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
7// the JSON-RPC request id to a value that might have already been used.
8
9use base64::Engine;
10use hyper::body::Bytes;
11use hyper::http::{HeaderMap, HeaderValue};
12use hyper_util::client::legacy::connect::HttpConnector;
13use hyper_util::client::legacy::Client;
14use hyper_util::rt::TokioExecutor;
15use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
16use jsonrpsee_core::BoxError;
17use jsonrpsee_core::{
18	http_helpers::{self, HttpError},
19	TEN_MB_SIZE_BYTES,
20};
21use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24use thiserror::Error;
25use tower::layer::util::Identity;
26use tower::{Layer, Service, ServiceExt};
27use url::Url;
28
29use crate::{HttpBody, HttpRequest, HttpResponse};
30
31#[cfg(feature = "tls")]
32use crate::{CertificateStore, CustomCertStore};
33
/// Media type used for both the `Content-Type` and `Accept` headers of every request.
const CONTENT_TYPE_JSON: &str = "application/json";
35
36/// Wrapper over HTTP transport and connector.
37#[derive(Debug)]
38pub enum HttpBackend<B = HttpBody> {
39	/// Hyper client with https connector.
40	#[cfg(feature = "tls")]
41	Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
42	/// Hyper client with http connector.
43	Http(Client<HttpConnector, B>),
44}
45
46impl<B> Clone for HttpBackend<B> {
47	fn clone(&self) -> Self {
48		match self {
49			Self::Http(inner) => Self::Http(inner.clone()),
50			#[cfg(feature = "tls")]
51			Self::Https(inner) => Self::Https(inner.clone()),
52		}
53	}
54}
55
56impl<B> tower::Service<HttpRequest<B>> for HttpBackend<B>
57where
58	B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
59	B::Data: Send,
60	B::Error: Into<BoxError>,
61{
62	type Response = HttpResponse<hyper::body::Incoming>;
63	type Error = Error;
64	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
65
66	fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67		match self {
68			Self::Http(inner) => inner.poll_ready(ctx),
69			#[cfg(feature = "tls")]
70			Self::Https(inner) => inner.poll_ready(ctx),
71		}
72		.map_err(|e| Error::Http(HttpError::Stream(e.into())))
73	}
74
75	fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
76		let resp = match self {
77			Self::Http(inner) => inner.call(req),
78			#[cfg(feature = "tls")]
79			Self::Https(inner) => inner.call(req),
80		};
81
82		Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
83	}
84}
85
86/// Builder for [`HttpTransportClient`].
87#[derive(Debug)]
88pub struct HttpTransportClientBuilder<L> {
89	/// Certificate store.
90	#[cfg(feature = "tls")]
91	pub(crate) certificate_store: CertificateStore,
92	/// Configurable max request body size
93	pub(crate) max_request_size: u32,
94	/// Configurable max response body size
95	pub(crate) max_response_size: u32,
96	/// Max length for logging for requests and responses
97	///
98	/// Logs bigger than this limit will be truncated.
99	pub(crate) max_log_length: u32,
100	/// Custom headers to pass with every request.
101	pub(crate) headers: HeaderMap,
102	/// Service builder
103	pub(crate) service_builder: tower::ServiceBuilder<L>,
104	/// TCP_NODELAY
105	pub(crate) tcp_no_delay: bool,
106}
107
108impl Default for HttpTransportClientBuilder<Identity> {
109	fn default() -> Self {
110		Self::new()
111	}
112}
113
114impl HttpTransportClientBuilder<Identity> {
115	/// Create a new [`HttpTransportClientBuilder`].
116	pub fn new() -> Self {
117		Self {
118			#[cfg(feature = "tls")]
119			certificate_store: CertificateStore::Native,
120			max_request_size: TEN_MB_SIZE_BYTES,
121			max_response_size: TEN_MB_SIZE_BYTES,
122			max_log_length: 1024,
123			headers: HeaderMap::new(),
124			service_builder: tower::ServiceBuilder::new(),
125			tcp_no_delay: true,
126		}
127	}
128}
129
130impl<L> HttpTransportClientBuilder<L> {
131	/// See docs [`crate::HttpClientBuilder::with_custom_cert_store`] for more information.
132	#[cfg(feature = "tls")]
133	pub fn with_custom_cert_store(mut self, cfg: CustomCertStore) -> Self {
134		self.certificate_store = CertificateStore::Custom(cfg);
135		self
136	}
137
138	/// Set the maximum size of a request body in bytes. Default is 10 MiB.
139	pub fn max_request_size(mut self, size: u32) -> Self {
140		self.max_request_size = size;
141		self
142	}
143
144	/// Set the maximum size of a response in bytes. Default is 10 MiB.
145	pub fn max_response_size(mut self, size: u32) -> Self {
146		self.max_response_size = size;
147		self
148	}
149
150	/// Set a custom header passed to the server with every request (default is none).
151	///
152	/// The caller is responsible for checking that the headers do not conflict or are duplicated.
153	pub fn set_headers(mut self, headers: HeaderMap) -> Self {
154		self.headers = headers;
155		self
156	}
157
158	/// Configure `TCP_NODELAY` on the socket to the supplied value `nodelay`.
159	///
160	/// Default is `true`.
161	pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
162		self.tcp_no_delay = no_delay;
163		self
164	}
165
166	/// Max length for logging for requests and responses in number characters.
167	///
168	/// Logs bigger than this limit will be truncated.
169	pub fn set_max_logging_length(mut self, max: u32) -> Self {
170		self.max_log_length = max;
171		self
172	}
173
174	/// Configure a tower service.
175	pub fn set_service<T>(self, service: tower::ServiceBuilder<T>) -> HttpTransportClientBuilder<T> {
176		HttpTransportClientBuilder {
177			#[cfg(feature = "tls")]
178			certificate_store: self.certificate_store,
179			headers: self.headers,
180			max_log_length: self.max_log_length,
181			max_request_size: self.max_request_size,
182			max_response_size: self.max_response_size,
183			service_builder: service,
184			tcp_no_delay: self.tcp_no_delay,
185		}
186	}
187
188	/// Build a [`HttpTransportClient`].
189	pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
190	where
191		L: Layer<HttpBackend, Service = S>,
192		S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
193		B: http_body::Body<Data = Bytes> + Send + 'static,
194		B::Data: Send,
195		B::Error: Into<BoxError>,
196	{
197		let Self {
198			#[cfg(feature = "tls")]
199			certificate_store,
200			max_request_size,
201			max_response_size,
202			max_log_length,
203			headers,
204			service_builder,
205			tcp_no_delay,
206		} = self;
207		let mut url = Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {e}")))?;
208
209		if url.host_str().is_none() {
210			return Err(Error::Url("Invalid host".into()));
211		}
212		url.set_fragment(None);
213
214		let client = match url.scheme() {
215			"http" => {
216				let mut connector = HttpConnector::new();
217				connector.set_nodelay(tcp_no_delay);
218				HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
219			}
220			#[cfg(feature = "tls")]
221			"https" => {
222				// Make sure that the TLS provider is set. If not, set a default one.
223				// Otherwise, creating `tls` configuration may panic if there are multiple
224				// providers available due to `rustls` features (e.g. both `ring` and `aws-lc-rs`).
225				// Function returns an error if the provider is already installed, and we're fine with it.
226				let _ = rustls::crypto::ring::default_provider().install_default();
227
228				let mut http_conn = HttpConnector::new();
229				http_conn.set_nodelay(tcp_no_delay);
230				http_conn.enforce_http(false);
231
232				let https_conn = match certificate_store {
233					CertificateStore::Native => {
234						use rustls_platform_verifier::ConfigVerifierExt;
235
236						hyper_rustls::HttpsConnectorBuilder::new()
237							.with_tls_config(rustls::ClientConfig::with_platform_verifier())
238							.https_or_http()
239							.enable_all_versions()
240							.wrap_connector(http_conn)
241					}
242
243					CertificateStore::Custom(tls_config) => hyper_rustls::HttpsConnectorBuilder::new()
244						.with_tls_config(tls_config)
245						.https_or_http()
246						.enable_all_versions()
247						.wrap_connector(http_conn),
248				};
249
250				HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
251			}
252			_ => {
253				#[cfg(feature = "tls")]
254				let err = "URL scheme not supported, expects 'http' or 'https'";
255				#[cfg(not(feature = "tls"))]
256				let err = "URL scheme not supported, expects 'http'";
257				return Err(Error::Url(err.into()));
258			}
259		};
260
261		// Cache request headers: 2 default headers, followed by user custom headers.
262		// Maintain order for headers in case of duplicate keys:
263		// https://datatracker.ietf.org/doc/html/rfc7230#section-3.2.2
264		let mut cached_headers = HeaderMap::with_capacity(2 + headers.len());
265		cached_headers.insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static(CONTENT_TYPE_JSON));
266		cached_headers.insert(hyper::header::ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
267		for (key, value) in headers.into_iter() {
268			if let Some(key) = key {
269				cached_headers.insert(key, value);
270			}
271		}
272
273		if let Some(pwd) = url.password() {
274			if !cached_headers.contains_key(hyper::header::AUTHORIZATION) {
275				let digest = base64::engine::general_purpose::STANDARD.encode(format!("{}:{pwd}", url.username()));
276				cached_headers.insert(
277					hyper::header::AUTHORIZATION,
278					HeaderValue::from_str(&format!("Basic {digest}"))
279						.map_err(|_| Error::Url("Header value `authorization basic user:pwd` invalid".into()))?,
280				);
281			}
282		}
283
284		Ok(HttpTransportClient {
285			target: url.as_str().to_owned(),
286			client: service_builder.service(client),
287			max_request_size,
288			max_response_size,
289			max_log_length,
290			headers: cached_headers,
291		})
292	}
293}
294
295/// HTTP Transport Client.
296#[derive(Debug, Clone)]
297pub struct HttpTransportClient<S> {
298	/// Target to connect to.
299	target: String,
300	/// HTTP client
301	client: S,
302	/// Configurable max request body size
303	max_request_size: u32,
304	/// Configurable max response body size
305	max_response_size: u32,
306	/// Max length for logging for requests and responses
307	///
308	/// Logs bigger than this limit will be truncated.
309	max_log_length: u32,
310	/// Custom headers to pass with every request.
311	headers: HeaderMap,
312}
313
314impl<B, S> HttpTransportClient<S>
315where
316	S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
317	B: http_body::Body<Data = Bytes> + Send + 'static,
318	B::Data: Send,
319	B::Error: Into<BoxError>,
320{
321	async fn inner_send(&self, body: String) -> Result<HttpResponse<B>, Error> {
322		if body.len() > self.max_request_size as usize {
323			return Err(Error::RequestTooLarge);
324		}
325
326		let mut req = HttpRequest::post(&self.target);
327		if let Some(headers) = req.headers_mut() {
328			*headers = self.headers.clone();
329		}
330
331		let req = req.body(body.into()).expect("URI and request headers are valid; qed");
332		let response = self.client.clone().ready().await?.call(req).await?;
333
334		if response.status().is_success() {
335			Ok(response)
336		} else {
337			Err(Error::Rejected { status_code: response.status().into() })
338		}
339	}
340
341	/// Send serialized message and wait until all bytes from the HTTP message body have been read.
342	pub(crate) async fn send_and_read_body(&self, body: String) -> Result<Vec<u8>, Error> {
343		tx_log_from_str(&body, self.max_log_length);
344
345		let response = self.inner_send(body).await?;
346		let (parts, body) = response.into_parts();
347
348		let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;
349
350		rx_log_from_bytes(&body, self.max_log_length);
351
352		Ok(body)
353	}
354
355	/// Send serialized message without reading the HTTP message body.
356	pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
357		let _ = self.inner_send(body).await?;
358
359		Ok(())
360	}
361}
362
363/// Error that can happen during a request.
364#[derive(Debug, Error)]
365pub enum Error {
366	/// Invalid URL.
367	#[error("Invalid Url: {0}")]
368	Url(String),
369
370	/// Error during the HTTP request, including networking errors and HTTP protocol errors.
371	#[error(transparent)]
372	Http(#[from] HttpError),
373
374	/// Server returned a non-success status code.
375	#[error("Request rejected `{status_code}`")]
376	Rejected {
377		/// HTTP Status code returned by the server.
378		status_code: u16,
379	},
380
381	/// Request body too large.
382	#[error("The request body was too large")]
383	RequestTooLarge,
384
385	/// Invalid certificate store.
386	#[error("Invalid certificate store")]
387	InvalidCertficateStore,
388}
389
#[cfg(test)]
mod tests {
	use super::*;

	/// Build a transport for `url` and return the target it stored.
	#[track_caller]
	fn target_for(url: &str) -> String {
		HttpTransportClientBuilder::new().build(url).unwrap().target
	}

	/// Assert that building a transport for `url` fails with [`Error::Url`].
	#[track_caller]
	fn assert_url_rejected(url: &str) {
		let err = HttpTransportClientBuilder::new().build(url).unwrap_err();
		assert!(matches!(err, Error::Url(_)));
	}

	#[test]
	fn invalid_http_url_rejected() {
		assert_url_rejected("ws://localhost:9933");
	}

	#[cfg(feature = "tls")]
	#[test]
	fn https_works() {
		assert_eq!(target_for("https://localhost"), "https://localhost/");
	}

	#[cfg(not(feature = "tls"))]
	#[test]
	fn https_fails_without_tls_feature() {
		assert_url_rejected("https://localhost");
	}

	#[test]
	fn faulty_port() {
		for url in ["http://localhost:-43", "http://localhost:-99999"] {
			assert_url_rejected(url);
		}
	}

	#[test]
	fn url_with_path_works() {
		assert_eq!(target_for("http://localhost/my-special-path"), "http://localhost/my-special-path");
	}

	#[test]
	fn url_with_query_works() {
		let url = "http://127.0.0.1/my?name1=value1&name2=value2";
		assert_eq!(target_for(url), url);
	}

	#[test]
	fn url_with_fragment_is_ignored() {
		assert_eq!(target_for("http://127.0.0.1/my.htm#ignore"), "http://127.0.0.1/my.htm");
	}

	#[test]
	fn url_default_port_is_omitted() {
		assert_eq!(target_for("http://127.0.0.1:80"), "http://127.0.0.1/");
	}

	#[cfg(feature = "tls")]
	#[test]
	fn https_custom_port_works() {
		assert_eq!(target_for("https://localhost:9999"), "https://localhost:9999/");
	}

	#[test]
	fn http_custom_port_works() {
		assert_eq!(target_for("http://localhost:9999"), "http://localhost:9999/");
	}

	#[tokio::test]
	async fn request_limit_works() {
		let request_limit = 80;
		let response_limit = 50;

		let client = HttpTransportClientBuilder::new()
			.max_request_size(request_limit)
			.max_response_size(response_limit)
			.build("http://localhost:9933")
			.unwrap();

		assert_eq!(client.max_request_size, request_limit);
		assert_eq!(client.max_response_size, response_limit);

		// One byte over the request limit must be refused before anything is sent.
		let oversized = "a".repeat(81);
		assert_eq!(oversized.len(), 81);
		let err = client.send(oversized).await.unwrap_err();
		assert!(matches!(err, Error::RequestTooLarge));
	}
}