gemachain_client/
http_sender.rs1use {
4 crate::{
5 client_error::Result,
6 rpc_custom_error,
7 rpc_request::{RpcError, RpcRequest, RpcResponseErrorData},
8 rpc_response::RpcSimulateTransactionResult,
9 rpc_sender::*,
10 },
11 log::*,
12 reqwest::{
13 self,
14 header::{CONTENT_TYPE, RETRY_AFTER},
15 StatusCode,
16 },
17 std::{
18 sync::{
19 atomic::{AtomicU64, Ordering},
20 Arc, RwLock,
21 },
22 thread::sleep,
23 time::{Duration, Instant},
24 },
25};
26
27pub struct HttpSender {
28 client: Arc<reqwest::blocking::Client>,
29 url: String,
30 request_id: AtomicU64,
31 stats: RwLock<RpcTransportStats>,
32}
33
34impl HttpSender {
36 pub fn new(url: String) -> Self {
41 Self::new_with_timeout(url, Duration::from_secs(30))
42 }
43
44 pub fn new_with_timeout(url: String, timeout: Duration) -> Self {
48 let client = Arc::new(
51 tokio::task::block_in_place(move || {
52 reqwest::blocking::Client::builder()
53 .timeout(timeout)
54 .build()
55 })
56 .expect("build rpc client"),
57 );
58
59 Self {
60 client,
61 url,
62 request_id: AtomicU64::new(0),
63 stats: RwLock::new(RpcTransportStats::default()),
64 }
65 }
66}
67
68#[derive(Deserialize, Debug)]
69struct RpcErrorObject {
70 code: i64,
71 message: String,
72}
73
74struct StatsUpdater<'a> {
75 stats: &'a RwLock<RpcTransportStats>,
76 request_start_time: Instant,
77 rate_limited_time: Duration,
78}
79
80impl<'a> StatsUpdater<'a> {
81 fn new(stats: &'a RwLock<RpcTransportStats>) -> Self {
82 Self {
83 stats,
84 request_start_time: Instant::now(),
85 rate_limited_time: Duration::default(),
86 }
87 }
88
89 fn add_rate_limited_time(&mut self, duration: Duration) {
90 self.rate_limited_time += duration;
91 }
92}
93
94impl<'a> Drop for StatsUpdater<'a> {
95 fn drop(&mut self) {
96 let mut stats = self.stats.write().unwrap();
97 stats.request_count += 1;
98 stats.elapsed_time += Instant::now().duration_since(self.request_start_time);
99 stats.rate_limited_time += self.rate_limited_time;
100 }
101}
102
103impl RpcSender for HttpSender {
104 fn get_transport_stats(&self) -> RpcTransportStats {
105 self.stats.read().unwrap().clone()
106 }
107
108 fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
109 let mut stats_updater = StatsUpdater::new(&self.stats);
110
111 let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
112 let request_json = request.build_request_json(request_id, params).to_string();
113
114 let mut too_many_requests_retries = 5;
115 loop {
116 let response = {
119 let client = self.client.clone();
120 let request_json = request_json.clone();
121 tokio::task::block_in_place(move || {
122 client
123 .post(&self.url)
124 .header(CONTENT_TYPE, "application/json")
125 .body(request_json)
126 .send()
127 })
128 };
129
130 match response {
131 Ok(response) => {
132 if !response.status().is_success() {
133 if response.status() == StatusCode::TOO_MANY_REQUESTS
134 && too_many_requests_retries > 0
135 {
136 let mut duration = Duration::from_millis(500);
137 if let Some(retry_after) = response.headers().get(RETRY_AFTER) {
138 if let Ok(retry_after) = retry_after.to_str() {
139 if let Ok(retry_after) = retry_after.parse::<u64>() {
140 if retry_after < 120 {
141 duration = Duration::from_secs(retry_after);
142 }
143 }
144 }
145 }
146
147 too_many_requests_retries -= 1;
148 debug!(
149 "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
150 response, too_many_requests_retries, duration
151 );
152
153 sleep(duration);
154 stats_updater.add_rate_limited_time(duration);
155 continue;
156 }
157 return Err(response.error_for_status().unwrap_err().into());
158 }
159
160 let response_text = tokio::task::block_in_place(move || response.text())?;
161
162 let json: serde_json::Value = serde_json::from_str(&response_text)?;
163 if json["error"].is_object() {
164 return match serde_json::from_value::<RpcErrorObject>(json["error"].clone())
165 {
166 Ok(rpc_error_object) => {
167 let data = match rpc_error_object.code {
168 rpc_custom_error::JSON_RPC_SERVER_ERROR_SEND_TRANSACTION_PREFLIGHT_FAILURE => {
169 match serde_json::from_value::<RpcSimulateTransactionResult>(json["error"]["data"].clone()) {
170 Ok(data) => RpcResponseErrorData::SendTransactionPreflightFailure(data),
171 Err(err) => {
172 debug!("Failed to deserialize RpcSimulateTransactionResult: {:?}", err);
173 RpcResponseErrorData::Empty
174 }
175 }
176 },
177 rpc_custom_error::JSON_RPC_SERVER_ERROR_NODE_UNHEALTHY => {
178 match serde_json::from_value::<rpc_custom_error::NodeUnhealthyErrorData>(json["error"]["data"].clone()) {
179 Ok(rpc_custom_error::NodeUnhealthyErrorData {num_slots_behind}) => RpcResponseErrorData::NodeUnhealthy {num_slots_behind},
180 Err(_err) => {
181 RpcResponseErrorData::Empty
182 }
183 }
184 },
185 _ => RpcResponseErrorData::Empty
186 };
187
188 Err(RpcError::RpcResponseError {
189 code: rpc_error_object.code,
190 message: rpc_error_object.message,
191 data,
192 }
193 .into())
194 }
195 Err(err) => Err(RpcError::RpcRequestError(format!(
196 "Failed to deserialize RPC error response: {} [{}]",
197 serde_json::to_string(&json["error"]).unwrap(),
198 err
199 ))
200 .into()),
201 };
202 }
203 return Ok(json["result"].clone());
204 }
205 Err(err) => {
206 return Err(err.into());
207 }
208 }
209 }
210 }
211}
212
213#[cfg(test)]
214mod tests {
215 use super::*;
216
217 #[tokio::test(flavor = "multi_thread")]
218 async fn http_sender_on_tokio_multi_thread() {
219 let http_sender = HttpSender::new("http://localhost:1234".to_string());
220 let _ = http_sender.send(RpcRequest::GetVersion, serde_json::Value::Null);
221 }
222
223 #[tokio::test(flavor = "current_thread")]
224 #[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")]
225 async fn http_sender_ontokio_current_thread_should_panic() {
226 let _ = HttpSender::new("http://localhost:1234".to_string());
229 }
230}