1use base64::Engine;
10use hyper::body::Bytes;
11use hyper::http::{HeaderMap, HeaderValue};
12use hyper_util::client::legacy::connect::HttpConnector;
13use hyper_util::client::legacy::Client;
14use hyper_util::rt::TokioExecutor;
15use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
16use jsonrpsee_core::BoxError;
17use jsonrpsee_core::{
18 http_helpers::{self, HttpError},
19 TEN_MB_SIZE_BYTES,
20};
21use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24use thiserror::Error;
25use tower::layer::util::Identity;
26use tower::{Layer, Service, ServiceExt};
27use url::Url;
28
29use crate::{HttpBody, HttpRequest, HttpResponse};
30
31#[cfg(feature = "tls")]
32use crate::{CertificateStore, CustomCertStore};
33
34const CONTENT_TYPE_JSON: &str = "application/json";
35
36#[derive(Debug)]
38pub enum HttpBackend<B = HttpBody> {
39 #[cfg(feature = "tls")]
41 Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
42 Http(Client<HttpConnector, B>),
44}
45
46impl<B> Clone for HttpBackend<B> {
47 fn clone(&self) -> Self {
48 match self {
49 Self::Http(inner) => Self::Http(inner.clone()),
50 #[cfg(feature = "tls")]
51 Self::Https(inner) => Self::Https(inner.clone()),
52 }
53 }
54}
55
56impl<B> tower::Service<HttpRequest<B>> for HttpBackend<B>
57where
58 B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
59 B::Data: Send,
60 B::Error: Into<BoxError>,
61{
62 type Response = HttpResponse<hyper::body::Incoming>;
63 type Error = Error;
64 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
65
66 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67 match self {
68 Self::Http(inner) => inner.poll_ready(ctx),
69 #[cfg(feature = "tls")]
70 Self::Https(inner) => inner.poll_ready(ctx),
71 }
72 .map_err(|e| Error::Http(HttpError::Stream(e.into())))
73 }
74
75 fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
76 let resp = match self {
77 Self::Http(inner) => inner.call(req),
78 #[cfg(feature = "tls")]
79 Self::Https(inner) => inner.call(req),
80 };
81
82 Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
83 }
84}
85
86#[derive(Debug)]
88pub struct HttpTransportClientBuilder<L> {
89 #[cfg(feature = "tls")]
91 pub(crate) certificate_store: CertificateStore,
92 pub(crate) max_request_size: u32,
94 pub(crate) max_response_size: u32,
96 pub(crate) max_log_length: u32,
100 pub(crate) headers: HeaderMap,
102 pub(crate) service_builder: tower::ServiceBuilder<L>,
104 pub(crate) tcp_no_delay: bool,
106}
107
108impl Default for HttpTransportClientBuilder<Identity> {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl HttpTransportClientBuilder<Identity> {
115 pub fn new() -> Self {
117 Self {
118 #[cfg(feature = "tls")]
119 certificate_store: CertificateStore::Native,
120 max_request_size: TEN_MB_SIZE_BYTES,
121 max_response_size: TEN_MB_SIZE_BYTES,
122 max_log_length: 1024,
123 headers: HeaderMap::new(),
124 service_builder: tower::ServiceBuilder::new(),
125 tcp_no_delay: true,
126 }
127 }
128}
129
130impl<L> HttpTransportClientBuilder<L> {
131 #[cfg(feature = "tls")]
133 pub fn with_custom_cert_store(mut self, cfg: CustomCertStore) -> Self {
134 self.certificate_store = CertificateStore::Custom(cfg);
135 self
136 }
137
138 pub fn max_request_size(mut self, size: u32) -> Self {
140 self.max_request_size = size;
141 self
142 }
143
144 pub fn max_response_size(mut self, size: u32) -> Self {
146 self.max_response_size = size;
147 self
148 }
149
150 pub fn set_headers(mut self, headers: HeaderMap) -> Self {
154 self.headers = headers;
155 self
156 }
157
158 pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
162 self.tcp_no_delay = no_delay;
163 self
164 }
165
166 pub fn set_max_logging_length(mut self, max: u32) -> Self {
170 self.max_log_length = max;
171 self
172 }
173
174 pub fn set_service<T>(self, service: tower::ServiceBuilder<T>) -> HttpTransportClientBuilder<T> {
176 HttpTransportClientBuilder {
177 #[cfg(feature = "tls")]
178 certificate_store: self.certificate_store,
179 headers: self.headers,
180 max_log_length: self.max_log_length,
181 max_request_size: self.max_request_size,
182 max_response_size: self.max_response_size,
183 service_builder: service,
184 tcp_no_delay: self.tcp_no_delay,
185 }
186 }
187
188 pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
190 where
191 L: Layer<HttpBackend, Service = S>,
192 S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
193 B: http_body::Body<Data = Bytes> + Send + 'static,
194 B::Data: Send,
195 B::Error: Into<BoxError>,
196 {
197 let Self {
198 #[cfg(feature = "tls")]
199 certificate_store,
200 max_request_size,
201 max_response_size,
202 max_log_length,
203 headers,
204 service_builder,
205 tcp_no_delay,
206 } = self;
207 let mut url = Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {e}")))?;
208
209 if url.host_str().is_none() {
210 return Err(Error::Url("Invalid host".into()));
211 }
212 url.set_fragment(None);
213
214 let client = match url.scheme() {
215 "http" => {
216 let mut connector = HttpConnector::new();
217 connector.set_nodelay(tcp_no_delay);
218 HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
219 }
220 #[cfg(feature = "tls")]
221 "https" => {
222 let _ = rustls::crypto::ring::default_provider().install_default();
227
228 let mut http_conn = HttpConnector::new();
229 http_conn.set_nodelay(tcp_no_delay);
230 http_conn.enforce_http(false);
231
232 let https_conn = match certificate_store {
233 CertificateStore::Native => {
234 use rustls_platform_verifier::ConfigVerifierExt;
235
236 hyper_rustls::HttpsConnectorBuilder::new()
237 .with_tls_config(rustls::ClientConfig::with_platform_verifier())
238 .https_or_http()
239 .enable_all_versions()
240 .wrap_connector(http_conn)
241 }
242
243 CertificateStore::Custom(tls_config) => hyper_rustls::HttpsConnectorBuilder::new()
244 .with_tls_config(tls_config)
245 .https_or_http()
246 .enable_all_versions()
247 .wrap_connector(http_conn),
248 };
249
250 HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
251 }
252 _ => {
253 #[cfg(feature = "tls")]
254 let err = "URL scheme not supported, expects 'http' or 'https'";
255 #[cfg(not(feature = "tls"))]
256 let err = "URL scheme not supported, expects 'http'";
257 return Err(Error::Url(err.into()));
258 }
259 };
260
261 let mut cached_headers = HeaderMap::with_capacity(2 + headers.len());
265 cached_headers.insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static(CONTENT_TYPE_JSON));
266 cached_headers.insert(hyper::header::ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
267 for (key, value) in headers.into_iter() {
268 if let Some(key) = key {
269 cached_headers.insert(key, value);
270 }
271 }
272
273 if let Some(pwd) = url.password() {
274 if !cached_headers.contains_key(hyper::header::AUTHORIZATION) {
275 let digest = base64::engine::general_purpose::STANDARD.encode(format!("{}:{pwd}", url.username()));
276 cached_headers.insert(
277 hyper::header::AUTHORIZATION,
278 HeaderValue::from_str(&format!("Basic {digest}"))
279 .map_err(|_| Error::Url("Header value `authorization basic user:pwd` invalid".into()))?,
280 );
281 }
282 }
283
284 Ok(HttpTransportClient {
285 target: url.as_str().to_owned(),
286 client: service_builder.service(client),
287 max_request_size,
288 max_response_size,
289 max_log_length,
290 headers: cached_headers,
291 })
292 }
293}
294
295#[derive(Debug, Clone)]
297pub struct HttpTransportClient<S> {
298 target: String,
300 client: S,
302 max_request_size: u32,
304 max_response_size: u32,
306 max_log_length: u32,
310 headers: HeaderMap,
312}
313
314impl<B, S> HttpTransportClient<S>
315where
316 S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
317 B: http_body::Body<Data = Bytes> + Send + 'static,
318 B::Data: Send,
319 B::Error: Into<BoxError>,
320{
321 async fn inner_send(&self, body: String) -> Result<HttpResponse<B>, Error> {
322 if body.len() > self.max_request_size as usize {
323 return Err(Error::RequestTooLarge);
324 }
325
326 let mut req = HttpRequest::post(&self.target);
327 if let Some(headers) = req.headers_mut() {
328 *headers = self.headers.clone();
329 }
330
331 let req = req.body(body.into()).expect("URI and request headers are valid; qed");
332 let response = self.client.clone().ready().await?.call(req).await?;
333
334 if response.status().is_success() {
335 Ok(response)
336 } else {
337 Err(Error::Rejected { status_code: response.status().into() })
338 }
339 }
340
341 pub(crate) async fn send_and_read_body(&self, body: String) -> Result<Vec<u8>, Error> {
343 tx_log_from_str(&body, self.max_log_length);
344
345 let response = self.inner_send(body).await?;
346 let (parts, body) = response.into_parts();
347
348 let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;
349
350 rx_log_from_bytes(&body, self.max_log_length);
351
352 Ok(body)
353 }
354
355 pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
357 let _ = self.inner_send(body).await?;
358
359 Ok(())
360 }
361}
362
363#[derive(Debug, Error)]
365pub enum Error {
366 #[error("Invalid Url: {0}")]
368 Url(String),
369
370 #[error(transparent)]
372 Http(#[from] HttpError),
373
374 #[error("Request rejected `{status_code}`")]
376 Rejected {
377 status_code: u16,
379 },
380
381 #[error("The request body was too large")]
383 RequestTooLarge,
384
385 #[error("Invalid certificate store")]
387 InvalidCertficateStore,
388}
389
390#[cfg(test)]
391mod tests {
392 use super::*;
393
394 #[test]
395 fn invalid_http_url_rejected() {
396 let err = HttpTransportClientBuilder::new().build("ws://localhost:9933").unwrap_err();
397 assert!(matches!(err, Error::Url(_)));
398 }
399
400 #[cfg(feature = "tls")]
401 #[test]
402 fn https_works() {
403 let client = HttpTransportClientBuilder::new().build("https://localhost").unwrap();
404 assert_eq!(&client.target, "https://localhost/");
405 }
406
407 #[cfg(not(feature = "tls"))]
408 #[test]
409 fn https_fails_without_tls_feature() {
410 let err = HttpTransportClientBuilder::new().build("https://localhost").unwrap_err();
411 assert!(matches!(err, Error::Url(_)));
412 }
413
414 #[test]
415 fn faulty_port() {
416 let err = HttpTransportClientBuilder::new().build("http://localhost:-43").unwrap_err();
417 assert!(matches!(err, Error::Url(_)));
418
419 let err = HttpTransportClientBuilder::new().build("http://localhost:-99999").unwrap_err();
420 assert!(matches!(err, Error::Url(_)));
421 }
422
423 #[test]
424 fn url_with_path_works() {
425 let client = HttpTransportClientBuilder::new().build("http://localhost/my-special-path").unwrap();
426 assert_eq!(&client.target, "http://localhost/my-special-path");
427 }
428
429 #[test]
430 fn url_with_query_works() {
431 let client = HttpTransportClientBuilder::new().build("http://127.0.0.1/my?name1=value1&name2=value2").unwrap();
432 assert_eq!(&client.target, "http://127.0.0.1/my?name1=value1&name2=value2");
433 }
434
435 #[test]
436 fn url_with_fragment_is_ignored() {
437 let client = HttpTransportClientBuilder::new().build("http://127.0.0.1/my.htm#ignore").unwrap();
438 assert_eq!(&client.target, "http://127.0.0.1/my.htm");
439 }
440
441 #[test]
442 fn url_default_port_is_omitted() {
443 let client = HttpTransportClientBuilder::new().build("http://127.0.0.1:80").unwrap();
444 assert_eq!(&client.target, "http://127.0.0.1/");
445 }
446
447 #[cfg(feature = "tls")]
448 #[test]
449 fn https_custom_port_works() {
450 let client = HttpTransportClientBuilder::new().build("https://localhost:9999").unwrap();
451 assert_eq!(&client.target, "https://localhost:9999/");
452 }
453
454 #[test]
455 fn http_custom_port_works() {
456 let client = HttpTransportClientBuilder::new().build("http://localhost:9999").unwrap();
457 assert_eq!(&client.target, "http://localhost:9999/");
458 }
459
460 #[tokio::test]
461 async fn request_limit_works() {
462 let eighty_bytes_limit = 80;
463 let fifty_bytes_limit = 50;
464
465 let client = HttpTransportClientBuilder::new()
466 .max_request_size(eighty_bytes_limit)
467 .max_response_size(fifty_bytes_limit)
468 .build("http://localhost:9933")
469 .unwrap();
470
471 assert_eq!(client.max_request_size, eighty_bytes_limit);
472 assert_eq!(client.max_response_size, fifty_bytes_limit);
473
474 let body = "a".repeat(81);
475 assert_eq!(body.len(), 81);
476 let response = client.send(body).await.unwrap_err();
477 assert!(matches!(response, Error::RequestTooLarge));
478 }
479}