ethers_providers/rpc/transports/retry.rs

use super::{common::JsonRpcError, http::ClientError};
use crate::{errors::ProviderError, JsonRpcClient};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
    fmt::Debug,
    sync::atomic::{AtomicU32, Ordering},
    time::Duration,
};
use thiserror::Error;
use tracing::trace;

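/// A [RetryPolicy] decides which errors from the wrapped [JsonRpcClient] the
/// [RetryClient] should retry and attempt to recover from, and may extract a
/// backoff hint from the error itself.
///
/// # Example
///
/// A minimal sketch of a custom policy that never retries; the `NeverRetry` type is
/// illustrative and not part of the crate, and the crate-root re-export of
/// [RetryPolicy] is assumed:
///
/// ```no_run
/// use ethers_providers::RetryPolicy;
/// use std::time::Duration;
///
/// #[derive(Debug)]
/// struct NeverRetry;
///
/// impl<E> RetryPolicy<E> for NeverRetry {
///     fn should_retry(&self, _error: &E) -> bool {
///         false
///     }
///
///     fn backoff_hint(&self, _error: &E) -> Option<Duration> {
///         None
///     }
/// }
/// ```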
pub trait RetryPolicy<E>: Send + Sync + Debug {
    /// Whether to retry the request based on the given `error`.
    fn should_retry(&self, error: &E) -> bool;

    /// Providers may include a backoff hint in the error response itself; if one can
    /// be extracted from `error`, return how long to wait before retrying.
    fn backoff_hint(&self, error: &E) -> Option<Duration>;
}

/// [RetryClient] wraps a [JsonRpcClient] and retries failed requests with a backoff,
/// using a [RetryPolicy] to decide which errors are recoverable (e.g. rate limiting)
/// and which should be returned immediately.
#[derive(Debug)]
pub struct RetryClient<T>
where
    T: JsonRpcClient,
    T::Error: crate::RpcError + Sync + Send + 'static,
{
    inner: T,
    /// Number of requests currently enqueued, i.e. in flight or waiting to be retried.
    requests_enqueued: AtomicU32,
    /// The policy that decides whether an error is worth retrying.
    policy: Box<dyn RetryPolicy<T::Error>>,
    /// How many connection timeouts should be retried.
    timeout_retries: u32,
    /// How many retries for rate limited responses.
    rate_limit_retries: u32,
    /// How long to wait initially before retrying.
    initial_backoff: Duration,
    /// Available compute units per second.
    compute_units_per_second: u64,
}

impl<T> RetryClient<T>
where
    T: JsonRpcClient,
    T::Error: Sync + Send + 'static,
{
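    /// Creates a new [RetryClient] that wraps the given client and adds retrying and
    /// backoff support.
    ///
    /// # Example
    ///
    /// A minimal construction sketch; the endpoint URL is a placeholder and the
    /// crate-root re-exports are assumed:
    ///
    /// ```no_run
    /// use ethers_providers::{Http, HttpRateLimitRetryPolicy, RetryClient};
    /// use url::Url;
    ///
    /// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
    /// // 10 retries for rate limit errors, 500 ms initial backoff
    /// let client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy::default()), 10, 500);
    /// ```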
    pub fn new(
        inner: T,
        policy: Box<dyn RetryPolicy<T::Error>>,
        max_retry: u32,
        // in milliseconds
        initial_backoff: u64,
    ) -> Self {
        RetryClientBuilder::default()
            .initial_backoff(Duration::from_millis(initial_backoff))
            .rate_limit_retries(max_retry)
            .build(inner, policy)
    }

    /// Sets the number of compute units available per second, used to estimate how
    /// long a request should wait when the compute budget is exhausted.
    pub fn set_compute_units(&mut self, cpus: u64) -> &mut Self {
        self.compute_units_per_second = cpus;
        self
    }
}

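/// Builder for a [RetryClient].
///
/// # Example
///
/// A construction sketch using the builder; the endpoint URL is a placeholder and the
/// crate-root re-exports are assumed:
///
/// ```no_run
/// use ethers_providers::{Http, HttpRateLimitRetryPolicy, RetryClientBuilder};
/// use std::time::Duration;
/// use url::Url;
///
/// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
/// let client = RetryClientBuilder::default()
///     .rate_limit_retries(10)
///     .timeout_retries(3)
///     .initial_backoff(Duration::from_millis(500))
///     .build(http, Box::new(HttpRateLimitRetryPolicy::default()));
/// ```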
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RetryClientBuilder {
    /// How many connection timeouts should be retried.
    timeout_retries: u32,
    /// How many retries for rate limited responses.
    rate_limit_retries: u32,
    /// How long to wait initially before retrying.
    initial_backoff: Duration,
    /// Available compute units per second.
    compute_units_per_second: u64,
}

impl RetryClientBuilder {
    /// Sets the number of retries for connection timeouts.
    pub fn timeout_retries(mut self, timeout_retries: u32) -> Self {
        self.timeout_retries = timeout_retries;
        self
    }

    /// Sets the number of retries for rate limited responses.
    pub fn rate_limit_retries(mut self, rate_limit_retries: u32) -> Self {
        self.rate_limit_retries = rate_limit_retries;
        self
    }

    /// Sets the number of compute units available per second.
    pub fn compute_units_per_second(mut self, compute_units_per_second: u64) -> Self {
        self.compute_units_per_second = compute_units_per_second;
        self
    }

    /// Sets the duration to wait before the first retry.
    pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
        self.initial_backoff = initial_backoff;
        self
    }

    /// Creates the [RetryClient] that wraps the given client and retry policy.
    pub fn build<T>(self, client: T, policy: Box<dyn RetryPolicy<T::Error>>) -> RetryClient<T>
    where
        T: JsonRpcClient,
        T::Error: Sync + Send + 'static,
    {
        let RetryClientBuilder {
            timeout_retries,
            rate_limit_retries,
            initial_backoff,
            compute_units_per_second,
        } = self;
        RetryClient {
            inner: client,
            requests_enqueued: AtomicU32::new(0),
            policy,
            timeout_retries,
            rate_limit_retries,
            initial_backoff,
            compute_units_per_second,
        }
    }
}

impl Default for RetryClientBuilder {
    fn default() -> Self {
        Self {
            timeout_retries: 3,
            rate_limit_retries: 10,
            initial_backoff: Duration::from_millis(1000),
            // compute units per second as offered e.g. by Alchemy's free tier
            compute_units_per_second: 330,
        }
    }
}

/// Error thrown by [RetryClient] when:
/// 1. the internal client returns an error we do not want to (or cannot) recover from,
/// 2. the params failed to serialize, or
/// 3. the request timed out, i.e. all retries were exhausted.
#[derive(Error, Debug)]
pub enum RetryClientError {
    /// Error returned by the wrapped provider.
    #[error(transparent)]
    ProviderError(ProviderError),
    /// All retries were exhausted.
    TimeoutError,
    /// (De)serialization error.
    #[error(transparent)]
    SerdeJson(serde_json::Error),
}

impl crate::RpcError for RetryClientError {
    fn as_error_response(&self) -> Option<&super::JsonRpcError> {
        if let RetryClientError::ProviderError(err) = self {
            err.as_error_response()
        } else {
            None
        }
    }

    fn as_serde_error(&self) -> Option<&serde_json::Error> {
        match self {
            RetryClientError::ProviderError(e) => e.as_serde_error(),
            RetryClientError::SerdeJson(e) => Some(e),
            _ => None,
        }
    }
}

impl std::fmt::Display for RetryClientError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}

impl From<RetryClientError> for ProviderError {
    fn from(src: RetryClientError) -> Self {
        match src {
            RetryClientError::ProviderError(err) => err,
            RetryClientError::TimeoutError => ProviderError::JsonRpcClientError(Box::new(src)),
            RetryClientError::SerdeJson(err) => err.into(),
        }
    }
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T> JsonRpcClient for RetryClient<T>
where
    T: JsonRpcClient + 'static,
    T::Error: Sync + Send + 'static,
{
    type Error = RetryClientError;

    async fn request<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
    where
        A: Debug + Serialize + Send + Sync,
        R: DeserializeOwned + Send,
    {
        // Helper type that caches the serialized `params` across retries, since the
        // inner provider takes the params by value on every attempt.
        enum RetryParams<Params> {
            Value(Params),
            Zst(()),
        }

        let params = if std::mem::size_of::<A>() == 0 {
            RetryParams::Zst(())
        } else {
            let params = serde_json::to_value(params).map_err(RetryClientError::SerdeJson)?;
            RetryParams::Value(params)
        };

        // Track how many requests were already enqueued when this one entered the queue.
        let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;

        let mut rate_limit_retry_number: u32 = 0;
        let mut timeout_retries: u32 = 0;

        loop {
            let err;

            // send the request with either the cached JSON params or the unit type
            {
                let resp = match params {
                    RetryParams::Value(ref params) => self.inner.request(method, params).await,
                    RetryParams::Zst(unit) => self.inner.request(method, unit).await,
                };
                match resp {
                    Ok(ret) => {
                        self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
                        return Ok(ret)
                    }
                    Err(err_) => err = err_,
                }
            }

            let should_retry = self.policy.should_retry(&err);
            if should_retry {
                rate_limit_retry_number += 1;
                if rate_limit_retry_number > self.rate_limit_retries {
                    trace!("request timed out after {} retries", self.rate_limit_retries);
                    return Err(RetryClientError::TimeoutError)
                }

                let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;

                // try to extract the backoff requested by the provider, otherwise fall
                // back to the configured initial backoff
                let mut next_backoff = self.policy.backoff_hint(&err).unwrap_or_else(|| {
                    Duration::from_millis(self.initial_backoff.as_millis() as u64)
                });

                // requests are weighted in compute units; `17` is used here as the
                // assumed average cost of a single request
                const AVG_COST: u64 = 17u64;
                let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs(
                    AVG_COST,
                    self.compute_units_per_second,
                    current_queued_requests,
                    ahead_in_queue,
                );
                next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budget);

                trace!("retrying and backing off for {:?}", next_backoff);

                #[cfg(target_arch = "wasm32")]
                futures_timer::Delay::new(next_backoff).await;

                #[cfg(not(target_arch = "wasm32"))]
                tokio::time::sleep(next_backoff).await;
            } else {
                let err: ProviderError = err.into();
                if timeout_retries < self.timeout_retries && maybe_connectivity(&err) {
                    timeout_retries += 1;
                    trace!(err = ?err, "retrying due to spurious network error");
                    continue
                }

                trace!(err = ?err, "should not retry");
                self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
                return Err(RetryClientError::ProviderError(err))
            }
        }
    }
}

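/// A [RetryPolicy] that retries requests rejected because of rate limiting, e.g.
/// HTTP status 429 (`TOO_MANY_REQUESTS`) or the JSON-RPC error codes and messages
/// used by common providers.
///
/// # Example
///
/// A minimal sketch of checking an error against the policy; the crate-root
/// re-exports are assumed:
///
/// ```no_run
/// use ethers_providers::{HttpClientError, HttpRateLimitRetryPolicy, JsonRpcError, RetryPolicy};
///
/// let err = HttpClientError::JsonRpcError(JsonRpcError {
///     code: 429,
///     message: "too many requests".to_string(),
///     data: None,
/// });
/// assert!(HttpRateLimitRetryPolicy.should_retry(&err));
/// ```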
#[derive(Debug, Default)]
pub struct HttpRateLimitRetryPolicy;

impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
    fn should_retry(&self, error: &ClientError) -> bool {
        fn should_retry_json_rpc_error(err: &JsonRpcError) -> bool {
            let JsonRpcError { code, message, .. } = err;
            // Alchemy reports rate limiting with this non-standard code.
            if *code == 429 {
                return true
            }

            // Infura uses -32005 for "limit exceeded".
            if *code == -32005 {
                return true
            }

            // Alternative Alchemy rate limit error for specific IPs.
            if *code == -32016 && message.contains("rate limit") {
                return true
            }

            match message.as_str() {
                // Commonly thrown by Infura, apparently a load balancer issue.
                "header not found" => true,
                // Also thrown by Infura when the daily request budget is exhausted.
                "daily request count exceeded, request rate limited" => true,
                _ => false,
            }
        }

        match error {
            ClientError::ReqwestError(err) => {
                err.status() == Some(http::StatusCode::TOO_MANY_REQUESTS)
            }
            ClientError::JsonRpcError(err) => should_retry_json_rpc_error(err),
            ClientError::SerdeJson { text, .. } => {
                // Some providers send the JSON-RPC error without an `id`, which fails to
                // deserialize as a response, but the text should still contain the error.
                #[derive(Deserialize)]
                struct Resp {
                    error: JsonRpcError,
                }

                if let Ok(resp) = serde_json::from_str::<Resp>(text) {
                    return should_retry_json_rpc_error(&resp.error)
                }
                false
            }
        }
    }

    fn backoff_hint(&self, error: &ClientError) -> Option<Duration> {
        if let ClientError::JsonRpcError(JsonRpcError { data, .. }) = error {
            let data = data.as_ref()?;

            // If the daily rate limit is exceeded, Infura returns the requested backoff
            // in the error response.
            let backoff_seconds = &data["rate"]["backoff_seconds"];
            if let Some(seconds) = backoff_seconds.as_u64() {
                return Some(Duration::from_secs(seconds))
            }
            if let Some(seconds) = backoff_seconds.as_f64() {
                return Some(Duration::from_secs(seconds as u64 + 1))
            }
        }

        None
    }
}

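/// Calculates how many whole seconds a request should wait, given the average cost of
/// a request in compute units, the endpoint's compute-units-per-second budget, the
/// number of requests currently queued, and how many of them were ahead of this one.
///
/// A worked example with the values used in the tests below (330 CUPS and an average
/// cost of 17 CU, i.e. a capacity of `330 / 17 = 19` requests per second):
///
/// ```text
/// queued = 19, ahead = 18  ->  within capacity       ->  wait 0 s
/// queued = 49, ahead = 48  ->  min(49, 48) / 19 = 2  ->  wait 2 s
/// queued = 49, ahead = 20  ->  min(49, 20) / 19 = 1  ->  wait 1 s
/// ```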
fn compute_unit_offset_in_secs(
    avg_cost: u64,
    compute_units_per_second: u64,
    current_queued_requests: u64,
    ahead_in_queue: u64,
) -> u64 {
    let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost);
    if current_queued_requests > request_capacity_per_second {
        current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second)
    } else {
        0
    }
}

/// Checks whether the `error` is likely the result of a connectivity issue, e.g. a
/// request timeout, a connection failure, or a 5xx response, in which case the
/// request is worth retrying.
fn maybe_connectivity(err: &ProviderError) -> bool {
    if let ProviderError::HTTPError(reqwest_err) = err {
        if reqwest_err.is_timeout() {
            return true
        }

        #[cfg(not(target_arch = "wasm32"))]
        if reqwest_err.is_connect() {
            return true
        }

        // Server error HTTP codes (5xx) are also treated as connectivity issues.
        if let Some(status) = reqwest_err.status() {
            let code = status.as_u16();
            if (500..600).contains(&code) {
                return true
            }
        }
    }
    false
}

#[cfg(test)]
mod tests {
    use super::*;

    const AVG_COST: u64 = 17u64;
    const COMPUTE_UNITS: u64 = 330u64;

    fn compute_offset(current_queued_requests: u64, ahead_in_queue: u64) -> u64 {
        compute_unit_offset_in_secs(
            AVG_COST,
            COMPUTE_UNITS,
            current_queued_requests,
            ahead_in_queue,
        )
    }

    #[test]
    fn can_measure_unit_offset_single_request() {
        let current_queued_requests = 1;
        let ahead_in_queue = 0;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        assert_eq!(to_wait, 0);

        let current_queued_requests = 19;
        let ahead_in_queue = 18;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        assert_eq!(to_wait, 0);
    }

    #[test]
    fn can_measure_unit_offset_1x_over_budget() {
        let current_queued_requests = 20;
        let ahead_in_queue = 19;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        assert_eq!(to_wait, 1);
    }

    #[test]
    fn can_measure_unit_offset_2x_over_budget() {
        let current_queued_requests = 49;
        let ahead_in_queue = 48;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        assert_eq!(to_wait, 2);

        let current_queued_requests = 49;
        let ahead_in_queue = 20;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        assert_eq!(to_wait, 1);
    }

    #[test]
    fn can_extract_backoff() {
        let resp = r#"{"rate": {"allowed_rps": 1, "backoff_seconds": 30, "current_rps": 1.1}, "see": "https://infura.io/dashboard"}"#;

        let err = ClientError::JsonRpcError(JsonRpcError {
            code: 0,
            message: "daily request count exceeded, request rate limited".to_string(),
            data: Some(serde_json::from_str(resp).unwrap()),
        });
        let backoff = HttpRateLimitRetryPolicy.backoff_hint(&err).unwrap();
        assert_eq!(backoff, Duration::from_secs(30));

        let err = ClientError::JsonRpcError(JsonRpcError {
            code: 0,
            message: "daily request count exceeded, request rate limited".to_string(),
            data: Some(serde_json::Value::String("blocked".to_string())),
        });
        let backoff = HttpRateLimitRetryPolicy.backoff_hint(&err);
        assert!(backoff.is_none());
    }

    #[test]
    fn test_alchemy_ip_rate_limit() {
        let s = "{\"code\":-32016,\"message\":\"Your IP has exceeded its requests per second capacity. To increase your rate limits, please sign up for a free Alchemy account at https://www.alchemy.com/optimism.\"}";
        let err: JsonRpcError = serde_json::from_str(s).unwrap();
        let err = ClientError::JsonRpcError(err);

        let should_retry = HttpRateLimitRetryPolicy.should_retry(&err);
        assert!(should_retry);
    }

    #[test]
    fn test_rate_limit_omitted_id() {
        let s = r#"{"jsonrpc":"2.0","error":{"code":-32016,"message":"Your IP has exceeded its requests per second capacity. To increase your rate limits, please sign up for a free Alchemy account at https://www.alchemy.com/optimism."},"id":null}"#;

        let err = ClientError::SerdeJson {
            err: serde::de::Error::custom("unexpected notification over HTTP transport"),
            text: s.to_string(),
        };

        let should_retry = HttpRateLimitRetryPolicy.should_retry(&err);
        assert!(should_retry);
    }
}