async_json_rpc/clients/
http.rs

1use std::{
2    error, fmt,
3    pin::Pin,
4    sync::{
5        atomic::{AtomicUsize, Ordering},
6        Arc,
7    },
8};
9
10use futures_core::{
11    task::{Context, Poll},
12    Future,
13};
14use futures_util::TryFutureExt;
15use hyper::client::HttpConnector;
16use hyper::{
17    body::to_bytes,
18    header::{AUTHORIZATION, CONTENT_TYPE},
19    Body, Client as HyperClient, Error as HyperError, Request as HttpRequest,
20    Response as HttpResponse,
21};
22use hyper_tls::HttpsConnector;
23use tower_service::Service;
24use tower_util::ServiceExt;
25
26use super::{Error, RequestFactory};
27use crate::objects::{Request, RequestBuilder, Response};
28
29pub type HttpError<E> = Error<ConnectionError<E>>;
30
31/// Error specific to HTTP connections.
32#[derive(Debug)]
33pub enum ConnectionError<E> {
34    Poll(E),
35    Service(E),
36    Body(HyperError),
37}
38
39impl<E: fmt::Display> fmt::Display for ConnectionError<E> {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            Self::Poll(err) => write!(f, "polling error, {}", err),
43            Self::Service(err) => write!(f, "service error, {}", err),
44            Self::Body(err) => write!(f, "body error, {}", err),
45        }
46    }
47}
48
49impl<E: fmt::Display + fmt::Debug> error::Error for ConnectionError<E> {}
50
/// Where to reach the server and, optionally, how to authenticate against it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Credentials {
    /// Address that requests are posted to.
    url: String,
    /// User name for basic authentication, if any.
    user: Option<String>,
    /// Password for basic authentication, if any.
    password: Option<String>,
}
57
58/// A handle to a remote HTTP JSON-RPC server.
59#[derive(Clone, Debug)]
60pub struct Client<S> {
61    credentials: Arc<Credentials>,
62    nonce: Arc<AtomicUsize>,
63    inner_service: S,
64}
65
66impl<S> Client<S> {
67    /// Creates a new HTTP client from a [`Service`].
68    ///
69    /// [`Service`]: tower::Service
70    pub fn from_service(
71        service: S,
72        url: String,
73        user: Option<String>,
74        password: Option<String>,
75    ) -> Self {
76        let credentials = Arc::new(Credentials {
77            url,
78            user,
79            password,
80        });
81        Client {
82            credentials,
83            inner_service: service,
84            nonce: Arc::new(AtomicUsize::new(0)),
85        }
86    }
87
88    /// Increment nonce and return the last value.
89    pub fn next_nonce(&self) -> usize {
90        self.nonce.load(Ordering::AcqRel)
91    }
92}
93
94impl Client<HyperClient<HttpConnector>> {
95    /// Creates a new HTTP client.
96    pub fn new(url: String, user: Option<String>, password: Option<String>) -> Self {
97        Self::from_service(HyperClient::new(), url, user, password)
98    }
99}
100
101impl Client<HyperClient<HttpsConnector<HttpConnector>>> {
102    /// Creates a new HTTPS client.
103    pub fn new_tls(url: String, user: Option<String>, password: Option<String>) -> Self {
104        let https = HttpsConnector::new();
105        let service = HyperClient::builder().build::<_, Body>(https);
106        Self::from_service(service, url, user, password)
107    }
108}
109
/// Boxed, pinned, `Send` future that resolves to `Result<R, E>`.
type FutResponse<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'static>>;
111
112impl<S> Service<Request> for Client<S>
113where
114    S: Service<HttpRequest<Body>, Response = HttpResponse<Body>>,
115    S::Error: 'static,
116    S::Future: Send + 'static,
117{
118    type Response = Response;
119    type Error = Error<ConnectionError<S::Error>>;
120    type Future = FutResponse<Self::Response, Self::Error>;
121
122    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123        self.inner_service
124            .poll_ready(cx)
125            .map_err(ConnectionError::Poll)
126            .map_err(Error::Connection)
127    }
128
129    fn call(&mut self, request: Request) -> Self::Future {
130        let json_raw = serde_json::to_vec(&request).unwrap(); // This is safe
131        let body = Body::from(json_raw);
132        let mut builder = hyper::Request::post(&self.credentials.url);
133
134        // Add authorization
135        if let Some(ref user) = self.credentials.user {
136            let pass_str = match &self.credentials.password {
137                Some(some) => some,
138                None => "",
139            };
140            builder = builder.header(
141                AUTHORIZATION,
142                format!(
143                    "Basic {}",
144                    base64::encode(&format!("{}:{}", user, pass_str))
145                ),
146            )
147        };
148
149        // Add headers and body
150        let request = builder
151            .header(CONTENT_TYPE, "application/json")
152            .body(body)
153            .unwrap(); // This is safe
154
155        // Send request
156        let fut = self
157            .inner_service
158            .call(request)
159            .map_err(ConnectionError::Service)
160            .map_err(Error::Connection)
161            .and_then(|response| async move {
162                let body = to_bytes(response.into_body())
163                    .await
164                    .map_err(ConnectionError::Body)
165                    .map_err(Error::Connection)?;
166                Ok(serde_json::from_slice(&body).map_err(Error::Json)?)
167            });
168
169        Box::pin(fut)
170    }
171}
172
173impl<S> Client<S>
174where
175    S: Service<HttpRequest<Body>, Response = HttpResponse<Body>> + Clone,
176    S::Error: 'static,
177    S::Future: Send + 'static,
178{
179    pub async fn send(
180        &self,
181        request: Request,
182    ) -> Result<Response, Error<ConnectionError<S::Error>>> {
183        self.clone().oneshot(request).await
184    }
185}
186
187impl<C> RequestFactory for Client<C> {
188    /// Build the request.
189    fn build_request(&self) -> RequestBuilder {
190        let id = serde_json::Value::Number(self.nonce.fetch_add(1, Ordering::AcqRel).into());
191        Request::build().id(id)
192    }
193}