1use base64::Engine;
10use hyper::body::Bytes;
11use hyper::http::{HeaderMap, HeaderValue};
12use hyper_util::client::legacy::Client;
13use hyper_util::client::legacy::connect::HttpConnector;
14use hyper_util::rt::TokioExecutor;
15use jsonrpsee_core::BoxError;
16use jsonrpsee_core::{
17 TEN_MB_SIZE_BYTES,
18 http_helpers::{self, HttpError},
19};
20use std::future::Future;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23use thiserror::Error;
24use tower::layer::util::Identity;
25use tower::{Layer, Service, ServiceExt};
26use url::Url;
27
28use crate::{HttpBody, HttpRequest, HttpResponse};
29
30#[cfg(feature = "tls")]
31use crate::{CertificateStore, CustomCertStore};
32
33const CONTENT_TYPE_JSON: &str = "application/json";
34
35#[derive(Debug)]
37pub enum HttpBackend<B = HttpBody> {
38 #[cfg(feature = "tls")]
40 Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
41 Http(Client<HttpConnector, B>),
43}
44
45impl<B> Clone for HttpBackend<B> {
46 fn clone(&self) -> Self {
47 match self {
48 Self::Http(inner) => Self::Http(inner.clone()),
49 #[cfg(feature = "tls")]
50 Self::Https(inner) => Self::Https(inner.clone()),
51 }
52 }
53}
54
55impl<B> tower::Service<HttpRequest<B>> for HttpBackend<B>
56where
57 B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
58 B::Data: Send,
59 B::Error: Into<BoxError>,
60{
61 type Response = HttpResponse<hyper::body::Incoming>;
62 type Error = Error;
63 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
64
65 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66 match self {
67 Self::Http(inner) => inner.poll_ready(ctx),
68 #[cfg(feature = "tls")]
69 Self::Https(inner) => inner.poll_ready(ctx),
70 }
71 .map_err(|e| Error::Http(HttpError::Stream(e.into())))
72 }
73
74 fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
75 let resp = match self {
76 Self::Http(inner) => inner.call(req),
77 #[cfg(feature = "tls")]
78 Self::Https(inner) => inner.call(req),
79 };
80
81 Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
82 }
83}
84
85#[derive(Debug)]
87pub struct HttpTransportClientBuilder<L> {
88 #[cfg(feature = "tls")]
90 pub(crate) certificate_store: CertificateStore,
91 pub(crate) max_request_size: u32,
93 pub(crate) max_response_size: u32,
95 pub(crate) headers: HeaderMap,
97 pub(crate) service_builder: tower::ServiceBuilder<L>,
99 pub(crate) tcp_no_delay: bool,
101}
102
103impl Default for HttpTransportClientBuilder<Identity> {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109impl HttpTransportClientBuilder<Identity> {
110 pub fn new() -> Self {
112 Self {
113 #[cfg(feature = "tls")]
114 certificate_store: CertificateStore::Native,
115 max_request_size: TEN_MB_SIZE_BYTES,
116 max_response_size: TEN_MB_SIZE_BYTES,
117 headers: HeaderMap::new(),
118 service_builder: tower::ServiceBuilder::new(),
119 tcp_no_delay: true,
120 }
121 }
122}
123
124impl<L> HttpTransportClientBuilder<L> {
125 #[cfg(feature = "tls")]
127 pub fn with_custom_cert_store(mut self, cfg: CustomCertStore) -> Self {
128 self.certificate_store = CertificateStore::Custom(cfg);
129 self
130 }
131
132 pub fn max_request_size(mut self, size: u32) -> Self {
134 self.max_request_size = size;
135 self
136 }
137
138 pub fn max_response_size(mut self, size: u32) -> Self {
140 self.max_response_size = size;
141 self
142 }
143
144 pub fn set_headers(mut self, headers: HeaderMap) -> Self {
148 self.headers = headers;
149 self
150 }
151
152 pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
156 self.tcp_no_delay = no_delay;
157 self
158 }
159
160 pub fn set_service<T>(self, service: tower::ServiceBuilder<T>) -> HttpTransportClientBuilder<T> {
162 HttpTransportClientBuilder {
163 #[cfg(feature = "tls")]
164 certificate_store: self.certificate_store,
165 headers: self.headers,
166 max_request_size: self.max_request_size,
167 max_response_size: self.max_response_size,
168 service_builder: service,
169 tcp_no_delay: self.tcp_no_delay,
170 }
171 }
172
173 pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
175 where
176 L: Layer<HttpBackend, Service = S>,
177 S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
178 B: http_body::Body<Data = Bytes> + Send + 'static,
179 B::Data: Send,
180 B::Error: Into<BoxError>,
181 {
182 let Self {
183 #[cfg(feature = "tls")]
184 certificate_store,
185 max_request_size,
186 max_response_size,
187 headers,
188 service_builder,
189 tcp_no_delay,
190 } = self;
191 let mut url = Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {e}")))?;
192
193 if url.host_str().is_none() {
194 return Err(Error::Url("Invalid host".into()));
195 }
196 url.set_fragment(None);
197
198 let client = match url.scheme() {
199 "http" => {
200 let mut connector = HttpConnector::new();
201 connector.set_nodelay(tcp_no_delay);
202 HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
203 }
204 #[cfg(feature = "tls")]
205 "https" => {
206 let _ = rustls::crypto::ring::default_provider().install_default();
211
212 let mut http_conn = HttpConnector::new();
213 http_conn.set_nodelay(tcp_no_delay);
214 http_conn.enforce_http(false);
215
216 let https_conn = match certificate_store {
217 CertificateStore::Native => {
218 use rustls_platform_verifier::ConfigVerifierExt;
219
220 hyper_rustls::HttpsConnectorBuilder::new()
221 .with_tls_config(rustls::ClientConfig::with_platform_verifier())
222 .https_or_http()
223 .enable_all_versions()
224 .wrap_connector(http_conn)
225 }
226
227 CertificateStore::Custom(tls_config) => hyper_rustls::HttpsConnectorBuilder::new()
228 .with_tls_config(tls_config)
229 .https_or_http()
230 .enable_all_versions()
231 .wrap_connector(http_conn),
232 };
233
234 HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
235 }
236 _ => {
237 #[cfg(feature = "tls")]
238 let err = "URL scheme not supported, expects 'http' or 'https'";
239 #[cfg(not(feature = "tls"))]
240 let err = "URL scheme not supported, expects 'http'";
241 return Err(Error::Url(err.into()));
242 }
243 };
244
245 let mut cached_headers = HeaderMap::with_capacity(2 + headers.len());
249 cached_headers.insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static(CONTENT_TYPE_JSON));
250 cached_headers.insert(hyper::header::ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
251 for (key, value) in headers.into_iter() {
252 if let Some(key) = key {
253 cached_headers.insert(key, value);
254 }
255 }
256
257 if let Some(pwd) = url.password() {
258 if !cached_headers.contains_key(hyper::header::AUTHORIZATION) {
259 let digest = base64::engine::general_purpose::STANDARD.encode(format!("{}:{pwd}", url.username()));
260 cached_headers.insert(
261 hyper::header::AUTHORIZATION,
262 HeaderValue::from_str(&format!("Basic {digest}"))
263 .map_err(|_| Error::Url("Header value `authorization basic user:pwd` invalid".into()))?,
264 );
265 }
266 }
267 let _ = url.set_password(None);
268 let _ = url.set_username("");
269
270 Ok(HttpTransportClient {
271 target: url.as_str().to_owned(),
272 client: service_builder.service(client),
273 max_request_size,
274 max_response_size,
275 headers: cached_headers,
276 })
277 }
278}
279
280#[derive(Debug, Clone)]
282pub struct HttpTransportClient<S> {
283 target: String,
285 client: S,
287 max_request_size: u32,
289 max_response_size: u32,
291 headers: HeaderMap,
293}
294
295impl<B, S> HttpTransportClient<S>
296where
297 S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
298 B: http_body::Body<Data = Bytes> + Send + 'static,
299 B::Data: Send,
300 B::Error: Into<BoxError>,
301{
302 async fn inner_send(&self, body: String) -> Result<HttpResponse<B>, Error> {
303 if body.len() > self.max_request_size as usize {
304 return Err(Error::RequestTooLarge);
305 }
306
307 let mut req = HttpRequest::post(&self.target);
308 if let Some(headers) = req.headers_mut() {
309 *headers = self.headers.clone();
310 }
311
312 let req = req.body(body.into()).expect("URI and request headers are valid; qed");
313 let response = self.client.clone().ready().await?.call(req).await?;
314
315 if response.status().is_success() {
316 Ok(response)
317 } else {
318 Err(Error::Rejected { status_code: response.status().into() })
319 }
320 }
321
322 pub(crate) async fn send_and_read_body(&self, body: String) -> Result<Vec<u8>, Error> {
324 let response = self.inner_send(body).await?;
325
326 let (parts, body) = response.into_parts();
327 let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;
328
329 Ok(body)
330 }
331
332 pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
334 self.inner_send(body).await?;
335 Ok(())
336 }
337}
338
339#[derive(Debug, Error)]
341pub enum Error {
342 #[error("Invalid Url: {0}")]
344 Url(String),
345
346 #[error(transparent)]
348 Http(#[from] HttpError),
349
350 #[error("Request rejected `{status_code}`")]
352 Rejected {
353 status_code: u16,
355 },
356
357 #[error("The request body was too large")]
359 RequestTooLarge,
360
361 #[error("Invalid certificate store")]
363 InvalidCertficateStore,
364}
365
366#[cfg(test)]
367mod tests {
368 use super::*;
369
370 #[test]
371 fn invalid_http_url_rejected() {
372 let err = HttpTransportClientBuilder::new().build("ws://localhost:9933").unwrap_err();
373 assert!(matches!(err, Error::Url(_)));
374 }
375
376 #[cfg(feature = "tls")]
377 #[test]
378 fn https_works() {
379 let client = HttpTransportClientBuilder::new().build("https://localhost").unwrap();
380 assert_eq!(&client.target, "https://localhost/");
381 }
382
383 #[cfg(not(feature = "tls"))]
384 #[test]
385 fn https_fails_without_tls_feature() {
386 let err = HttpTransportClientBuilder::new().build("https://localhost").unwrap_err();
387 assert!(matches!(err, Error::Url(_)));
388 }
389
390 #[test]
391 fn faulty_port() {
392 let err = HttpTransportClientBuilder::new().build("http://localhost:-43").unwrap_err();
393 assert!(matches!(err, Error::Url(_)));
394
395 let err = HttpTransportClientBuilder::new().build("http://localhost:-99999").unwrap_err();
396 assert!(matches!(err, Error::Url(_)));
397 }
398
399 #[test]
400 fn url_with_path_works() {
401 let client = HttpTransportClientBuilder::new().build("http://localhost/my-special-path").unwrap();
402 assert_eq!(&client.target, "http://localhost/my-special-path");
403 }
404
405 #[test]
406 fn url_with_query_works() {
407 let client = HttpTransportClientBuilder::new().build("http://127.0.0.1/my?name1=value1&name2=value2").unwrap();
408 assert_eq!(&client.target, "http://127.0.0.1/my?name1=value1&name2=value2");
409 }
410
411 #[test]
412 fn url_with_fragment_is_ignored() {
413 let client = HttpTransportClientBuilder::new().build("http://127.0.0.1/my.htm#ignore").unwrap();
414 assert_eq!(&client.target, "http://127.0.0.1/my.htm");
415 }
416
417 #[test]
418 fn url_default_port_is_omitted() {
419 let client = HttpTransportClientBuilder::new().build("http://127.0.0.1:80").unwrap();
420 assert_eq!(&client.target, "http://127.0.0.1/");
421 }
422
423 #[cfg(feature = "tls")]
424 #[test]
425 fn https_custom_port_works() {
426 let client = HttpTransportClientBuilder::new().build("https://localhost:9999").unwrap();
427 assert_eq!(&client.target, "https://localhost:9999/");
428 }
429
430 #[test]
431 fn http_custom_port_works() {
432 let client = HttpTransportClientBuilder::new().build("http://localhost:9999").unwrap();
433 assert_eq!(&client.target, "http://localhost:9999/");
434 }
435
436 #[tokio::test]
437 async fn request_limit_works() {
438 let eighty_bytes_limit = 80;
439 let fifty_bytes_limit = 50;
440
441 let client = HttpTransportClientBuilder::new()
442 .max_request_size(eighty_bytes_limit)
443 .max_response_size(fifty_bytes_limit)
444 .build("http://localhost:9933")
445 .unwrap();
446
447 assert_eq!(client.max_request_size, eighty_bytes_limit);
448 assert_eq!(client.max_response_size, fifty_bytes_limit);
449
450 let body = "a".repeat(81);
451 assert_eq!(body.len(), 81);
452 let response = client.send(body).await.unwrap_err();
453 assert!(matches!(response, Error::RequestTooLarge));
454 }
455}