gemachain_client/
http_sender.rs

1//! The standard [`RpcSender`] over HTTP.
2
3use {
4    crate::{
5        client_error::Result,
6        rpc_custom_error,
7        rpc_request::{RpcError, RpcRequest, RpcResponseErrorData},
8        rpc_response::RpcSimulateTransactionResult,
9        rpc_sender::*,
10    },
11    log::*,
12    reqwest::{
13        self,
14        header::{CONTENT_TYPE, RETRY_AFTER},
15        StatusCode,
16    },
17    std::{
18        sync::{
19            atomic::{AtomicU64, Ordering},
20            Arc, RwLock,
21        },
22        thread::sleep,
23        time::{Duration, Instant},
24    },
25};
26
27pub struct HttpSender {
28    client: Arc<reqwest::blocking::Client>,
29    url: String,
30    request_id: AtomicU64,
31    stats: RwLock<RpcTransportStats>,
32}
33
34/// The standard [`RpcSender`] over HTTP.
35impl HttpSender {
36    /// Create an HTTP RPC sender.
37    ///
38    /// The URL is an HTTP URL, usually for port 8899, as in
39    /// "http://localhost:8899". The sender has a default timeout of 30 seconds.
40    pub fn new(url: String) -> Self {
41        Self::new_with_timeout(url, Duration::from_secs(30))
42    }
43
44    /// Create an HTTP RPC sender.
45    ///
46    /// The URL is an HTTP URL, usually for port 8899.
47    pub fn new_with_timeout(url: String, timeout: Duration) -> Self {
48        // `reqwest::blocking::Client` panics if run in a tokio async context.  Shuttle the
49        // request to a different tokio thread to avoid this
50        let client = Arc::new(
51            tokio::task::block_in_place(move || {
52                reqwest::blocking::Client::builder()
53                    .timeout(timeout)
54                    .build()
55            })
56            .expect("build rpc client"),
57        );
58
59        Self {
60            client,
61            url,
62            request_id: AtomicU64::new(0),
63            stats: RwLock::new(RpcTransportStats::default()),
64        }
65    }
66}
67
68#[derive(Deserialize, Debug)]
69struct RpcErrorObject {
70    code: i64,
71    message: String,
72}
73
74struct StatsUpdater<'a> {
75    stats: &'a RwLock<RpcTransportStats>,
76    request_start_time: Instant,
77    rate_limited_time: Duration,
78}
79
80impl<'a> StatsUpdater<'a> {
81    fn new(stats: &'a RwLock<RpcTransportStats>) -> Self {
82        Self {
83            stats,
84            request_start_time: Instant::now(),
85            rate_limited_time: Duration::default(),
86        }
87    }
88
89    fn add_rate_limited_time(&mut self, duration: Duration) {
90        self.rate_limited_time += duration;
91    }
92}
93
94impl<'a> Drop for StatsUpdater<'a> {
95    fn drop(&mut self) {
96        let mut stats = self.stats.write().unwrap();
97        stats.request_count += 1;
98        stats.elapsed_time += Instant::now().duration_since(self.request_start_time);
99        stats.rate_limited_time += self.rate_limited_time;
100    }
101}
102
103impl RpcSender for HttpSender {
104    fn get_transport_stats(&self) -> RpcTransportStats {
105        self.stats.read().unwrap().clone()
106    }
107
108    fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
109        let mut stats_updater = StatsUpdater::new(&self.stats);
110
111        let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
112        let request_json = request.build_request_json(request_id, params).to_string();
113
114        let mut too_many_requests_retries = 5;
115        loop {
116            // `reqwest::blocking::Client` panics if run in a tokio async context.  Shuttle the
117            // request to a different tokio thread to avoid this
118            let response = {
119                let client = self.client.clone();
120                let request_json = request_json.clone();
121                tokio::task::block_in_place(move || {
122                    client
123                        .post(&self.url)
124                        .header(CONTENT_TYPE, "application/json")
125                        .body(request_json)
126                        .send()
127                })
128            };
129
130            match response {
131                Ok(response) => {
132                    if !response.status().is_success() {
133                        if response.status() == StatusCode::TOO_MANY_REQUESTS
134                            && too_many_requests_retries > 0
135                        {
136                            let mut duration = Duration::from_millis(500);
137                            if let Some(retry_after) = response.headers().get(RETRY_AFTER) {
138                                if let Ok(retry_after) = retry_after.to_str() {
139                                    if let Ok(retry_after) = retry_after.parse::<u64>() {
140                                        if retry_after < 120 {
141                                            duration = Duration::from_secs(retry_after);
142                                        }
143                                    }
144                                }
145                            }
146
147                            too_many_requests_retries -= 1;
148                            debug!(
149                                "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
150                                response, too_many_requests_retries, duration
151                            );
152
153                            sleep(duration);
154                            stats_updater.add_rate_limited_time(duration);
155                            continue;
156                        }
157                        return Err(response.error_for_status().unwrap_err().into());
158                    }
159
160                    let response_text = tokio::task::block_in_place(move || response.text())?;
161
162                    let json: serde_json::Value = serde_json::from_str(&response_text)?;
163                    if json["error"].is_object() {
164                        return match serde_json::from_value::<RpcErrorObject>(json["error"].clone())
165                        {
166                            Ok(rpc_error_object) => {
167                                let data = match rpc_error_object.code {
168                                    rpc_custom_error::JSON_RPC_SERVER_ERROR_SEND_TRANSACTION_PREFLIGHT_FAILURE => {
169                                        match serde_json::from_value::<RpcSimulateTransactionResult>(json["error"]["data"].clone()) {
170                                            Ok(data) => RpcResponseErrorData::SendTransactionPreflightFailure(data),
171                                            Err(err) => {
172                                                debug!("Failed to deserialize RpcSimulateTransactionResult: {:?}", err);
173                                                RpcResponseErrorData::Empty
174                                            }
175                                        }
176                                    },
177                                    rpc_custom_error::JSON_RPC_SERVER_ERROR_NODE_UNHEALTHY => {
178                                        match serde_json::from_value::<rpc_custom_error::NodeUnhealthyErrorData>(json["error"]["data"].clone()) {
179                                            Ok(rpc_custom_error::NodeUnhealthyErrorData {num_slots_behind}) => RpcResponseErrorData::NodeUnhealthy {num_slots_behind},
180                                            Err(_err) => {
181                                                RpcResponseErrorData::Empty
182                                            }
183                                        }
184                                    },
185                                    _ => RpcResponseErrorData::Empty
186                                };
187
188                                Err(RpcError::RpcResponseError {
189                                    code: rpc_error_object.code,
190                                    message: rpc_error_object.message,
191                                    data,
192                                }
193                                .into())
194                            }
195                            Err(err) => Err(RpcError::RpcRequestError(format!(
196                                "Failed to deserialize RPC error response: {} [{}]",
197                                serde_json::to_string(&json["error"]).unwrap(),
198                                err
199                            ))
200                            .into()),
201                        };
202                    }
203                    return Ok(json["result"].clone());
204                }
205                Err(err) => {
206                    return Err(err.into());
207                }
208            }
209        }
210    }
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test(flavor = "multi_thread")]
    async fn http_sender_on_tokio_multi_thread() {
        // Building and using the sender must not panic on the multi-threaded
        // runtime; the outcome of the request itself is ignored.
        let url = String::from("http://localhost:1234");
        let sender = HttpSender::new(url);
        let _ = sender.send(RpcRequest::GetVersion, serde_json::Value::Null);
    }

    #[tokio::test(flavor = "current_thread")]
    #[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")]
    async fn http_sender_ontokio_current_thread_should_panic() {
        // `HttpSender::new()` will panic in the tokio current-thread runtime due to its
        // `tokio::task::block_in_place()` usage, and there doesn't seem to be a way to
        // detect whether the tokio runtime is multi_thread or current_thread...
        let url = String::from("http://localhost:1234");
        let _ = HttpSender::new(url);
    }
}