rpc_json_client/
client.rs1use crate::error::Error;
2use crate::RpcRequest;
3use crate::RpcResponse;
4use async_std::task::sleep;
5use futures::lock::Mutex;
6use isahc::Error as IsahcError;
7use log::{info, warn};
8use serde_json::json;
9use std::sync::Arc;
10use std::time::Duration;
11
12pub struct RpcClient {
17 url: String,
18 user: Option<String>,
19 password: Option<String>,
20 id: Arc<Mutex<u64>>,
21 retry: bool,
22 backup_urls: Vec<String>,
23}
24
25impl RpcClient {
26 pub fn new(
27 url: &str,
28 user: Option<String>,
29 password: Option<String>,
30 retry: bool,
31 backup_urls: Vec<String>,
32 ) -> Self {
33 RpcClient {
34 url: url.to_owned(),
35 user,
36 password,
37 id: Arc::new(Mutex::new(0)),
38 retry,
39 backup_urls,
40 }
41 }
42
43 async fn build_request(&self, method: &str, params: &[serde_json::value::Value]) -> RpcRequest {
44 let mut id = self.id.lock().await;
45 *id += 1;
46 RpcRequest {
47 method: method.to_owned(),
48 params: params.to_vec(),
49 id: json!(*id),
50 jsonrpc: Some("2.0".to_owned()),
51 }
52 }
53
54 pub async fn execute<T: for<'a> serde::de::Deserialize<'a>>(
55 &self,
56 method: &str,
57 params: &[serde_json::value::Value],
58 ) -> Result<T, Error> {
59 let request = self.build_request(method, params).await;
60
61 let response = self.send_request(&request).await?;
62
63 Ok(response.into_result()?)
64 }
65
66 pub async fn send_request(&self, request: &RpcRequest) -> Result<RpcResponse, Error> {
67 let response: RpcResponse = self.send_raw(&request).await?;
68
69 if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
70 return Err(Error::VersionMismatch);
71 }
72 if response.id != request.id {
73 return Err(Error::IdMismatch);
74 }
75 Ok(response)
76 }
77
78 async fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
80 where
81 B: serde::ser::Serialize,
82 R: for<'de> serde::de::Deserialize<'de>,
83 {
84 let retry_max = 5;
85 let mut retries = 0;
86 let mut current_url = self.url.clone();
93 let current_backup_url = 0;
95
96 loop {
97 let mut req = surf::post(¤t_url);
98 if let Some(ref user) = self.user {
102 let mut auth = user.clone();
103 auth.push(':');
104 if let Some(ref pass) = self.password {
105 auth.push_str(&pass[..]);
106 }
107
108 let value = format!("Basic {}", &base64::encode(auth.as_bytes()));
109
110 req = req.header("Authorization", value);
111 }
112
113 let req = req.body(surf::Body::from_json(body)?);
114
115 let mut res = req.send().await?;
116
117 match res.body_json().await {
118 Ok(response) => return Ok(response),
119 Err(e) => {
120 warn!("RPC Request failed with error: {}", e);
121
122 if let Some(err) = &e.downcast_ref::<IsahcError>() {
125 match err {
126 IsahcError::Timeout => {
127 current_url = self.backup_urls[current_backup_url].clone();
128 }
129 _ => {}
130 }
131 }
132
133 if !self.retry {
134 return Err(Error::HttpError(e));
135 }
136 }
137 }
138
139 if self.retry && retries < retry_max {
140 retries += 1;
141 info!("Retrying request... Retry count: {}", retries);
142 sleep(Duration::from_secs(retries)).await;
144 continue;
146 } else {
147 return Err(Error::FailedRetry);
148 }
149 }
150 }
151}