async_jsonrpc_client/http_client/
mod.rs

1mod builder;
2#[cfg(test)]
3mod tests;
4
5use std::sync::{
6    atomic::{AtomicU64, Ordering},
7    Arc,
8};
9
10use jsonrpc_types::*;
11use serde::{de::DeserializeOwned, Serialize};
12
13pub use self::builder::HttpClientBuilder;
14use crate::{
15    error::HttpClientError,
16    transport::{BatchTransport, Transport},
17};
18
19/// HTTP JSON-RPC client
20#[cfg(feature = "http-async-std")]
21#[derive(Clone)]
22pub struct HttpClient {
23    url: String,
24    id: Arc<AtomicU64>,
25    client: surf::Client,
26    headers: http::header::HeaderMap,
27    timeout: Option<std::time::Duration>,
28}
29
30/// HTTP JSON-RPC client
31#[cfg(feature = "http-tokio")]
32#[derive(Clone)]
33pub struct HttpClient {
34    url: String,
35    id: Arc<AtomicU64>,
36    client: reqwest::Client,
37}
38
39impl HttpClient {
40    /// Creates a new HTTP JSON-RPC client with given `url`.
41    pub fn new<U: Into<String>>(url: U) -> Result<Self, HttpClientError> {
42        HttpClientBuilder::new().build(url)
43    }
44
45    /// Creates a `HttpClientBuilder` to configure a `HttpClient`.
46    ///
47    /// This is the same as `HttpClientBuilder::new()`.
48    pub fn builder() -> HttpClientBuilder {
49        HttpClientBuilder::new()
50    }
51}
52
53#[cfg(feature = "http-async-std")]
54impl HttpClient {
55    async fn send_request<REQ, RSP>(&self, request: REQ) -> Result<RSP, HttpClientError>
56    where
57        REQ: Serialize,
58        RSP: Serialize + DeserializeOwned,
59    {
60        let request = serde_json::to_string(&request).expect("serialize request");
61        log::debug!("Request: {}", request);
62
63        let mut builder = self
64            .client
65            .post(&self.url)
66            .content_type(surf::http::mime::JSON)
67            .body(request);
68        for (header_name, header_value) in self.headers.iter() {
69            builder = builder.header(
70                header_name.as_str(),
71                header_value.to_str().expect("must be visible ascii"),
72            );
73        }
74
75        let response = builder.send();
76        let response = if let Some(duration) = self.timeout {
77            let timeout = async_std::task::sleep(duration);
78            futures::pin_mut!(response, timeout);
79            match futures::future::select(response, timeout).await {
80                futures::future::Either::Left((response, _)) => response,
81                futures::future::Either::Right((_, _)) => return Err(anyhow::anyhow!("http request timeout").into()),
82            }
83        } else {
84            response.await
85        };
86        let mut response = response.map_err(|err| err.into_inner())?;
87
88        let response = response.body_string().await.map_err(|err| err.into_inner())?;
89        log::debug!("Response: {}", response);
90
91        Ok(serde_json::from_str::<RSP>(&response)?)
92    }
93}
94
95#[cfg(feature = "http-tokio")]
96impl HttpClient {
97    async fn send_request<REQ, RSP>(&self, request: REQ) -> Result<RSP, HttpClientError>
98    where
99        REQ: Serialize,
100        RSP: Serialize + DeserializeOwned,
101    {
102        log::debug!(
103            "Request: {}",
104            serde_json::to_string(&request).expect("serialize request")
105        );
106        let builder = self.client.post(&self.url).json(&request);
107        let response = builder.send().await?;
108        let response = response.json().await?;
109        log::debug!(
110            "Response: {}",
111            serde_json::to_string(&response).expect("serialize response")
112        );
113        Ok(response)
114    }
115}
116
117#[async_trait::async_trait]
118impl Transport for HttpClient {
119    type Error = HttpClientError;
120
121    async fn request<M>(&self, method: M, params: Option<Params>) -> Result<Output, Self::Error>
122    where
123        M: Into<String> + Send,
124    {
125        let id = self.id.fetch_add(1, Ordering::AcqRel);
126        let call = MethodCall::new(method, params, Id::Num(id));
127        self.send_request(call).await
128    }
129}
130
131#[async_trait::async_trait]
132impl BatchTransport for HttpClient {
133    async fn request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, <Self as Transport>::Error>
134    where
135        I: IntoIterator<Item = (M, Option<Params>)> + Send,
136        I::IntoIter: Send,
137        M: Into<String>,
138    {
139        let calls = batch
140            .into_iter()
141            .map(|(method, params)| {
142                let id = self.id.fetch_add(1, Ordering::AcqRel);
143                MethodCall::new(method, params, Id::Num(id))
144            })
145            .collect::<Vec<_>>();
146        self.send_request(calls).await
147    }
148}