async_jsonrpc_client/http_client/
mod.rs1mod builder;
2#[cfg(test)]
3mod tests;
4
5use std::sync::{
6 atomic::{AtomicU64, Ordering},
7 Arc,
8};
9
10use jsonrpc_types::*;
11use serde::{de::DeserializeOwned, Serialize};
12
13pub use self::builder::HttpClientBuilder;
14use crate::{
15 error::HttpClientError,
16 transport::{BatchTransport, Transport},
17};
18
/// HTTP JSON-RPC client built on `surf`, compiled when the `http-async-std` feature is on.
///
/// Cloning is cheap with respect to the id counter: clones share the same `Arc<AtomicU64>`.
#[cfg(feature = "http-async-std")]
#[derive(Clone)]
pub struct HttpClient {
    /// Endpoint that every request is POSTed to.
    url: String,
    /// Counter supplying the numeric id of each outgoing call; shared between clones.
    id: Arc<AtomicU64>,
    /// Underlying `surf` HTTP client.
    client: surf::Client,
    /// Headers attached to every outgoing request.
    headers: http::header::HeaderMap,
    /// Optional limit on how long to wait for the HTTP response; `None` waits indefinitely.
    timeout: Option<std::time::Duration>,
}
29
/// HTTP JSON-RPC client built on `reqwest`, compiled when the `http-tokio` feature is on.
///
/// Clones share the same id counter because it lives behind an `Arc`.
#[cfg(feature = "http-tokio")]
#[derive(Clone)]
pub struct HttpClient {
    /// Endpoint that every request is POSTed to.
    url: String,
    /// Counter supplying the numeric id of each outgoing call; shared between clones.
    id: Arc<AtomicU64>,
    /// Underlying `reqwest` HTTP client.
    client: reqwest::Client,
}
38
39impl HttpClient {
40 pub fn new<U: Into<String>>(url: U) -> Result<Self, HttpClientError> {
42 HttpClientBuilder::new().build(url)
43 }
44
45 pub fn builder() -> HttpClientBuilder {
49 HttpClientBuilder::new()
50 }
51}
52
#[cfg(feature = "http-async-std")]
impl HttpClient {
    /// Serializes `request` to JSON, POSTs it to the configured URL with the configured
    /// headers, and decodes the response body as JSON into `RSP`.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `request` cannot be serialized to JSON,
    /// - a configured header value is not visible ASCII,
    /// - the HTTP exchange or reading the response body fails,
    /// - a timeout is configured and elapses before the response arrives,
    /// - the response body cannot be deserialized into `RSP`.
    async fn send_request<REQ, RSP>(&self, request: REQ) -> Result<RSP, HttpClientError>
    where
        REQ: Serialize,
        RSP: Serialize + DeserializeOwned,
    {
        // A `Serialize` impl is allowed to fail; report that to the caller rather than
        // panicking inside library code.
        let request = serde_json::to_string(&request)?;
        log::debug!("Request: {}", request);

        let mut builder = self
            .client
            .post(&self.url)
            .content_type(surf::http::mime::JSON)
            .body(request);
        for (header_name, header_value) in self.headers.iter() {
            // `HeaderValue` may hold opaque bytes, so `to_str` can fail; surface it as an
            // error instead of aborting the task.
            let header_value = header_value.to_str().map_err(|err| {
                anyhow::anyhow!("invalid value for header `{}`: {}", header_name, err)
            })?;
            builder = builder.header(header_name.as_str(), header_value);
        }

        let response = builder.send();
        let response = if let Some(duration) = self.timeout {
            // Race the request against a timer; whichever completes first wins.
            let timeout = async_std::task::sleep(duration);
            futures::pin_mut!(response, timeout);
            match futures::future::select(response, timeout).await {
                futures::future::Either::Left((response, _)) => response,
                futures::future::Either::Right((_, _)) => {
                    return Err(anyhow::anyhow!("http request timeout").into())
                }
            }
        } else {
            response.await
        };
        let mut response = response.map_err(|err| err.into_inner())?;

        let response = response
            .body_string()
            .await
            .map_err(|err| err.into_inner())?;
        log::debug!("Response: {}", response);

        Ok(serde_json::from_str::<RSP>(&response)?)
    }
}
94
#[cfg(feature = "http-tokio")]
impl HttpClient {
    /// POSTs `request` as a JSON body to the configured URL and decodes the JSON response
    /// into `RSP`.
    ///
    /// Request and response are additionally rendered to JSON for debug logging, but only
    /// when the `Debug` log level is enabled.
    ///
    /// # Errors
    ///
    /// Returns an error when sending the request fails or the response body cannot be
    /// decoded as JSON into `RSP`.
    async fn send_request<REQ, RSP>(&self, request: REQ) -> Result<RSP, HttpClientError>
    where
        REQ: Serialize,
        RSP: Serialize + DeserializeOwned,
    {
        // Logging is diagnostic only: a value that fails to serialize must not panic here,
        // otherwise the function's behaviour would depend on the active log level.
        if log::log_enabled!(log::Level::Debug) {
            match serde_json::to_string(&request) {
                Ok(body) => log::debug!("Request: {}", body),
                Err(err) => log::debug!("Request: <not serializable: {}>", err),
            }
        }

        let response = self.client.post(&self.url).json(&request).send().await?;
        let response: RSP = response.json().await?;

        if log::log_enabled!(log::Level::Debug) {
            match serde_json::to_string(&response) {
                Ok(body) => log::debug!("Response: {}", body),
                Err(err) => log::debug!("Response: <not serializable: {}>", err),
            }
        }
        Ok(response)
    }
}
116
117#[async_trait::async_trait]
118impl Transport for HttpClient {
119 type Error = HttpClientError;
120
121 async fn request<M>(&self, method: M, params: Option<Params>) -> Result<Output, Self::Error>
122 where
123 M: Into<String> + Send,
124 {
125 let id = self.id.fetch_add(1, Ordering::AcqRel);
126 let call = MethodCall::new(method, params, Id::Num(id));
127 self.send_request(call).await
128 }
129}
130
131#[async_trait::async_trait]
132impl BatchTransport for HttpClient {
133 async fn request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, <Self as Transport>::Error>
134 where
135 I: IntoIterator<Item = (M, Option<Params>)> + Send,
136 I::IntoIter: Send,
137 M: Into<String>,
138 {
139 let calls = batch
140 .into_iter()
141 .map(|(method, params)| {
142 let id = self.id.fetch_add(1, Ordering::AcqRel);
143 MethodCall::new(method, params, Id::Num(id))
144 })
145 .collect::<Vec<_>>();
146 self.send_request(calls).await
147 }
148}