1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
mod builder;
#[cfg(test)]
mod tests;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use jsonrpc_types::*;
use serde::{de::DeserializeOwned, Serialize};
pub use self::builder::HttpClientBuilder;
use crate::{
error::HttpClientError,
transport::{BatchTransport, Transport},
};
#[cfg(feature = "http-async-std")]
#[derive(Clone)]
pub struct HttpClient {
url: String,
id: Arc<AtomicU64>,
client: surf::Client,
headers: http::header::HeaderMap,
timeout: Option<std::time::Duration>,
}
#[cfg(feature = "http-tokio")]
#[derive(Clone)]
pub struct HttpClient {
url: String,
id: Arc<AtomicU64>,
client: reqwest::Client,
}
impl HttpClient {
pub fn new<U: Into<String>>(url: U) -> Result<Self, HttpClientError> {
HttpClientBuilder::new().build(url)
}
pub fn builder() -> HttpClientBuilder {
HttpClientBuilder::new()
}
}
#[cfg(feature = "http-async-std")]
impl HttpClient {
async fn send_request<REQ, RSP>(&self, request: REQ) -> Result<RSP, HttpClientError>
where
REQ: Serialize,
RSP: Serialize + DeserializeOwned,
{
let request = serde_json::to_string(&request).expect("serialize request");
log::debug!("Request: {}", request);
let mut builder = self
.client
.post(&self.url)
.content_type(surf::http::mime::JSON)
.body(request);
for (header_name, header_value) in self.headers.iter() {
builder = builder.header(
header_name.as_str(),
header_value.to_str().expect("must be visible ascii"),
);
}
let response = builder.send();
let response = if let Some(duration) = self.timeout {
let timeout = async_std::task::sleep(duration);
futures::pin_mut!(response, timeout);
match futures::future::select(response, timeout).await {
futures::future::Either::Left((response, _)) => response,
futures::future::Either::Right((_, _)) => return Err(anyhow::anyhow!("http request timeout").into()),
}
} else {
response.await
};
let mut response = response.map_err(|err| err.into_inner())?;
let response = response.body_string().await.map_err(|err| err.into_inner())?;
log::debug!("Response: {}", response);
Ok(serde_json::from_str::<RSP>(&response)?)
}
}
#[cfg(feature = "http-tokio")]
impl HttpClient {
async fn send_request<REQ, RSP>(&self, request: REQ) -> Result<RSP, HttpClientError>
where
REQ: Serialize,
RSP: Serialize + DeserializeOwned,
{
log::debug!(
"Request: {}",
serde_json::to_string(&request).expect("serialize request")
);
let builder = self.client.post(&self.url).json(&request);
let response = builder.send().await?;
let response = response.json().await?;
log::debug!(
"Response: {}",
serde_json::to_string(&response).expect("serialize response")
);
Ok(response)
}
}
#[async_trait::async_trait]
impl Transport for HttpClient {
type Error = HttpClientError;
async fn request<M>(&self, method: M, params: Option<Params>) -> Result<Output, Self::Error>
where
M: Into<String> + Send,
{
let id = self.id.fetch_add(1, Ordering::AcqRel);
let call = MethodCall::new(method, params, Id::Num(id));
self.send_request(call).await
}
}
#[async_trait::async_trait]
impl BatchTransport for HttpClient {
async fn request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, <Self as Transport>::Error>
where
I: IntoIterator<Item = (M, Option<Params>)> + Send,
I::IntoIter: Send,
M: Into<String>,
{
let calls = batch
.into_iter()
.map(|(method, params)| {
let id = self.id.fetch_add(1, Ordering::AcqRel);
MethodCall::new(method, params, Id::Num(id))
})
.collect::<Vec<_>>();
self.send_request(calls).await
}
}