rpc_json_client/
client.rs

1use crate::error::Error;
2use crate::RpcRequest;
3use crate::RpcResponse;
4use async_std::task::sleep;
5use futures::lock::Mutex;
6use isahc::Error as IsahcError;
7use log::{info, warn};
8use serde_json::json;
9use std::sync::Arc;
10use std::time::Duration;
11
//TODO: this should probably implement some kind of trait that is exposed at the top level.
//TODO: right now this only uses HTTP; we should make the transport extensible (TCP, UDP, HTTP, etc.).
//TODO: we need to move all of this state behind an inner trait and hand out RCs to it so that
//the client can be made cloneable.
16pub struct RpcClient {
17    url: String,
18    user: Option<String>,
19    password: Option<String>,
20    id: Arc<Mutex<u64>>,
21    retry: bool,
22    backup_urls: Vec<String>,
23}
24
25impl RpcClient {
26    pub fn new(
27        url: &str,
28        user: Option<String>,
29        password: Option<String>,
30        retry: bool,
31        backup_urls: Vec<String>,
32    ) -> Self {
33        RpcClient {
34            url: url.to_owned(),
35            user,
36            password,
37            id: Arc::new(Mutex::new(0)),
38            retry,
39            backup_urls,
40        }
41    }
42
43    async fn build_request(&self, method: &str, params: &[serde_json::value::Value]) -> RpcRequest {
44        let mut id = self.id.lock().await;
45        *id += 1;
46        RpcRequest {
47            method: method.to_owned(),
48            params: params.to_vec(),
49            id: json!(*id),
50            jsonrpc: Some("2.0".to_owned()),
51        }
52    }
53
54    pub async fn execute<T: for<'a> serde::de::Deserialize<'a>>(
55        &self,
56        method: &str,
57        params: &[serde_json::value::Value],
58    ) -> Result<T, Error> {
59        let request = self.build_request(method, params).await;
60
61        let response = self.send_request(&request).await?;
62
63        Ok(response.into_result()?)
64    }
65
66    pub async fn send_request(&self, request: &RpcRequest) -> Result<RpcResponse, Error> {
67        let response: RpcResponse = self.send_raw(&request).await?;
68
69        if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
70            return Err(Error::VersionMismatch);
71        }
72        if response.id != request.id {
73            return Err(Error::IdMismatch);
74        }
75        Ok(response)
76    }
77
78    /// The actual send logic used by both [send_request] and [send_batch].
79    async fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
80    where
81        B: serde::ser::Serialize,
82        R: for<'de> serde::de::Deserialize<'de>,
83    {
84        let retry_max = 5;
85        let mut retries = 0;
86        // Build request
87        // let request_raw = serde_json::to_vec(body)?;
88        // let request_raw = serde_json::to_vec(body).unwrap(); //TODO
89
90        // let request = Request::builder().method("POST").header("Content-Type", "application/json");
91        // let mut request_builder = Request::builder();
92        let mut current_url = self.url.clone();
93        //@todo current only supports 1 backup, let's improve this.
94        let current_backup_url = 0;
95
96        loop {
97            let mut req = surf::post(&current_url);
98            //@todo we might just want to set MIME here actually see: https://docs.rs/surf/1.0.2/surf/struct.Request.html#method.set_mime
99            // request_builder.uri(&self.url).method("POST").header("Content-Type", "application/json");
100
101            if let Some(ref user) = self.user {
102                let mut auth = user.clone();
103                auth.push(':');
104                if let Some(ref pass) = self.password {
105                    auth.push_str(&pass[..]);
106                }
107
108                let value = format!("Basic {}", &base64::encode(auth.as_bytes()));
109
110                req = req.header("Authorization", value);
111            }
112
113            let req = req.body(surf::Body::from_json(body)?);
114
115            let mut res = req.send().await?;
116
117            match res.body_json().await {
118                Ok(response) => return Ok(response),
119                Err(e) => {
120                    warn!("RPC Request failed with error: {}", e);
121
122                    //@todo define more conditions in which we'd want to try the backup URL. For
123                    //now, we just use timeouts.
124                    if let Some(err) = &e.downcast_ref::<IsahcError>() {
125                        match err {
126                            IsahcError::Timeout => {
127                                current_url = self.backup_urls[current_backup_url].clone();
128                            }
129                            _ => {}
130                        }
131                    }
132
133                    if !self.retry {
134                        return Err(Error::HttpError(e));
135                    }
136                }
137            }
138
139            if self.retry && retries < retry_max {
140                retries += 1;
141                info!("Retrying request... Retry count: {}", retries);
142                //Currently just sleeps the amount of time of retries.
143                sleep(Duration::from_secs(retries)).await;
144                //Just to be explicit
145                continue;
146            } else {
147                return Err(Error::FailedRetry);
148            }
149        }
150    }
151}