ethers_providers/rpc/transports/retry.rs

//! A [JsonRpcClient] implementation that retries requests filtered by [RetryPolicy]
//! with an exponential backoff.

use super::{common::JsonRpcError, http::ClientError};
use crate::{errors::ProviderError, JsonRpcClient};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
    fmt::Debug,
    sync::atomic::{AtomicU32, Ordering},
    time::Duration,
};
use thiserror::Error;
use tracing::trace;

/// [RetryPolicy] defines the logic for deciding which [JsonRpcClient::Error] instances the
/// client should retry and attempt to recover from.
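///
/// # Example
///
/// A minimal sketch of a custom policy (the `RetryAll` type below is purely illustrative and not
/// part of this crate):
///
/// ```
/// use ethers_providers::RetryPolicy;
/// use std::time::Duration;
///
/// // Hypothetical policy that retries every error and never suggests a backoff.
/// #[derive(Debug)]
/// struct RetryAll;
///
/// impl<E> RetryPolicy<E> for RetryAll {
///     fn should_retry(&self, _error: &E) -> bool {
///         true
///     }
///
///     fn backoff_hint(&self, _error: &E) -> Option<Duration> {
///         None
///     }
/// }
/// ```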
pub trait RetryPolicy<E>: Send + Sync + Debug {
    /// Whether to retry the request based on the given `error`
    fn should_retry(&self, error: &E) -> bool;

    /// Providers may include the `backoff` in the error response directly
    fn backoff_hint(&self, error: &E) -> Option<Duration>;
}

/// [RetryClient] is a wrapper around a [JsonRpcClient] that retries requests with an exponential
/// backoff, filtering which errors to retry via a [RetryPolicy].
///
/// The `RetryPolicy`, mainly aimed at rate-limiting errors, can be adjusted for specific
/// applications and endpoints. In addition to the `RetryPolicy`, errors caused by connectivity
/// issues, such as timed out connections or responses in the `5xx` range, can be retried
/// separately.
///
/// # Example
///
/// ```
/// # async fn demo() {
/// use ethers_providers::{Http, RetryClient, RetryClientBuilder, HttpRateLimitRetryPolicy};
/// use std::time::Duration;
/// use url::Url;
///
/// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
/// let client = RetryClientBuilder::default()
///     .rate_limit_retries(10)
///     .timeout_retries(3)
///     .initial_backoff(Duration::from_millis(500))
///     .build(http, Box::new(HttpRateLimitRetryPolicy::default()));
/// # }
/// ```
#[derive(Debug)]
pub struct RetryClient<T>
where
    T: JsonRpcClient,
    T::Error: crate::RpcError + Sync + Send + 'static,
{
    inner: T,
    requests_enqueued: AtomicU32,
    /// The policy to use to determine whether to retry a request due to rate limiting
    policy: Box<dyn RetryPolicy<T::Error>>,
    /// How many connection `TimedOut` errors should be retried.
    timeout_retries: u32,
    /// How many retries for rate limited responses
    rate_limit_retries: u32,
    /// How long to wait initially
    initial_backoff: Duration,
    /// Available compute units per second
    compute_units_per_second: u64,
}

impl<T> RetryClient<T>
where
    T: JsonRpcClient,
    T::Error: Sync + Send + 'static,
{
    /// Creates a new `RetryClient` that wraps a client and adds retry and backoff support
    ///
    /// # Example
    ///
    /// ```
    /// # async fn demo() {
    /// use ethers_providers::{Http, RetryClient, HttpRateLimitRetryPolicy};
    /// use std::time::Duration;
    /// use url::Url;
    ///
    /// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
    /// let backoff_timeout = 3000; // in ms
    /// let max_retries = 10;
    /// let client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy::default()), max_retries, backoff_timeout);
    /// # }
    /// ```
    pub fn new(
        inner: T,
        policy: Box<dyn RetryPolicy<T::Error>>,
        max_retry: u32,
        // in milliseconds
        initial_backoff: u64,
    ) -> Self {
        RetryClientBuilder::default()
            .initial_backoff(Duration::from_millis(initial_backoff))
            .rate_limit_retries(max_retry)
            .build(inner, policy)
    }

    /// Sets the free compute units per second limit.
    ///
    /// This is the maximum number of weighted requests that can be handled per second by the
    /// endpoint before the rate limit kicks in.
    ///
    /// This is used to estimate how long to wait before retrying.
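    ///
    /// # Example
    ///
    /// A usage sketch; the endpoint URL and the numbers below are illustrative only:
    ///
    /// ```
    /// # async fn demo() {
    /// use ethers_providers::{Http, RetryClient, HttpRateLimitRetryPolicy};
    /// use url::Url;
    ///
    /// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
    /// let mut client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy::default()), 10, 500);
    /// // e.g. a plan with a larger compute budget than the default of 330 CU/s
    /// client.set_compute_units(660);
    /// # }
    /// ```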
    pub fn set_compute_units(&mut self, cpus: u64) -> &mut Self {
        self.compute_units_per_second = cpus;
        self
    }
}

/// Builder for a [`RetryClient`]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RetryClientBuilder {
    /// How many connection `TimedOut` errors should be retried.
    timeout_retries: u32,
    /// How many retries for rate limited responses
    rate_limit_retries: u32,
    /// How long to wait initially
    initial_backoff: Duration,
    /// Available compute units per second
    compute_units_per_second: u64,
}

// === impl RetryClientBuilder ===

impl RetryClientBuilder {
    /// Sets the number of retries after a connection times out
    ///
    /// **Note:** this is only used for errors classified as connectivity issues, such as request
    /// timeouts, connection errors, and `5xx` responses (see `maybe_connectivity`).
    pub fn timeout_retries(mut self, timeout_retries: u32) -> Self {
        self.timeout_retries = timeout_retries;
        self
    }

    /// How many retries for rate limited responses
    pub fn rate_limit_retries(mut self, rate_limit_retries: u32) -> Self {
        self.rate_limit_retries = rate_limit_retries;
        self
    }

    /// Sets the number of assumed available compute units per second
    ///
    /// See also, <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
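    ///
    /// # Example
    ///
    /// A sketch of raising the assumed budget for an endpoint with a larger quota (the URL and
    /// the value are illustrative):
    ///
    /// ```
    /// # async fn demo() {
    /// use ethers_providers::{Http, RetryClientBuilder, HttpRateLimitRetryPolicy};
    /// use url::Url;
    ///
    /// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
    /// let client = RetryClientBuilder::default()
    ///     .compute_units_per_second(660)
    ///     .build(http, Box::new(HttpRateLimitRetryPolicy::default()));
    /// # }
    /// ```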
    pub fn compute_units_per_second(mut self, compute_units_per_second: u64) -> Self {
        self.compute_units_per_second = compute_units_per_second;
        self
    }

    /// Sets the duration to wait initially before retrying
    pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
        self.initial_backoff = initial_backoff;
        self
    }

    /// Creates the `RetryClient` with the configured settings
    pub fn build<T>(self, client: T, policy: Box<dyn RetryPolicy<T::Error>>) -> RetryClient<T>
    where
        T: JsonRpcClient,
        T::Error: Sync + Send + 'static,
    {
        let RetryClientBuilder {
            timeout_retries,
            rate_limit_retries,
            initial_backoff,
            compute_units_per_second,
        } = self;
        RetryClient {
            inner: client,
            requests_enqueued: AtomicU32::new(0),
            policy,
            timeout_retries,
            rate_limit_retries,
            initial_backoff,
            compute_units_per_second,
        }
    }
}

// Some sensible defaults
impl Default for RetryClientBuilder {
    fn default() -> Self {
        Self {
            timeout_retries: 3,
            // this should be enough to even out heavy loads
            rate_limit_retries: 10,
            initial_backoff: Duration::from_millis(1000),
            // alchemy's max compute units per second <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
            compute_units_per_second: 330,
        }
    }
}

/// Error thrown when:
/// 1. Internal client throws an error we do not wish to try to recover from.
/// 2. Params serialization failed.
/// 3. Request timed out i.e. max retries were already made.
#[derive(Error, Debug)]
pub enum RetryClientError {
    /// Internal provider error
    #[error(transparent)]
    ProviderError(ProviderError),
    /// Timeout while making requests
    #[error("Timeout while making requests")]
    TimeoutError,
    /// (De)Serialization error
    #[error(transparent)]
    SerdeJson(serde_json::Error),
}

impl crate::RpcError for RetryClientError {
    fn as_error_response(&self) -> Option<&super::JsonRpcError> {
        if let RetryClientError::ProviderError(err) = self {
            err.as_error_response()
        } else {
            None
        }
    }

    fn as_serde_error(&self) -> Option<&serde_json::Error> {
        match self {
            RetryClientError::ProviderError(e) => e.as_serde_error(),
            RetryClientError::SerdeJson(e) => Some(e),
            _ => None,
        }
    }
}

impl From<RetryClientError> for ProviderError {
    fn from(src: RetryClientError) -> Self {
        match src {
            RetryClientError::ProviderError(err) => err,
            RetryClientError::TimeoutError => ProviderError::JsonRpcClientError(Box::new(src)),
            RetryClientError::SerdeJson(err) => err.into(),
        }
    }
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T> JsonRpcClient for RetryClient<T>
where
    T: JsonRpcClient + 'static,
    T::Error: Sync + Send + 'static,
{
    type Error = RetryClientError;

    async fn request<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
    where
        A: Debug + Serialize + Send + Sync,
        R: DeserializeOwned + Send,
    {
        // Helper type that caches the `params` value across several retries
        // This is necessary because the wrapper provider is supposed to skip the `params` if it's
        // of size 0, see `crate::transports::common::Request`
        enum RetryParams<Params> {
            Value(Params),
            Zst(()),
        }

        let params = if std::mem::size_of::<A>() == 0 {
            RetryParams::Zst(())
        } else {
            let params = serde_json::to_value(params).map_err(RetryClientError::SerdeJson)?;
            RetryParams::Value(params)
        };

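        // Snapshot how many requests were already in flight when this one was issued; this is
        // used below to spread retries of queued requests over the available compute budget. The
        // counter is decremented again once the request resolves, either successfully or with a
        // non-retryable error.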
        let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;

        let mut rate_limit_retry_number: u32 = 0;
        let mut timeout_retries: u32 = 0;

        loop {
            let err;

            // hack to not hold `R` across an await in the sleep future and prevent requiring
            // R: Send + Sync
            {
                let resp = match params {
                    RetryParams::Value(ref params) => self.inner.request(method, params).await,
                    RetryParams::Zst(unit) => self.inner.request(method, unit).await,
                };
                match resp {
                    Ok(ret) => {
                        self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
                        return Ok(ret)
                    }
                    Err(err_) => err = err_,
                }
            }

            let should_retry = self.policy.should_retry(&err);
            if should_retry {
                rate_limit_retry_number += 1;
                if rate_limit_retry_number > self.rate_limit_retries {
                    trace!("request timed out after {} retries", self.rate_limit_retries);
                    return Err(RetryClientError::TimeoutError)
                }

                let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;

                // try to extract the requested backoff from the error or compute the next backoff
                // based on retry count
                let mut next_backoff = self.policy.backoff_hint(&err).unwrap_or_else(|| {
                    Duration::from_millis(self.initial_backoff.as_millis() as u64)
                });

                // requests are usually weighted and can vary from 10 CU to several 100 CU; cheaper
                // requests are more common. Some example alchemy weights:
                // - `eth_getStorageAt`: 17
                // - `eth_getBlockByNumber`: 16
                // - `eth_newFilter`: 20
                //
                // (coming from forking mode) assuming here that storage requests will be the
                // driver for rate limits, we choose `17` as the average cost of any request
                const AVG_COST: u64 = 17u64;
                let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs(
                    AVG_COST,
                    self.compute_units_per_second,
                    current_queued_requests,
                    ahead_in_queue,
                );
                next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budget);

                trace!("retrying and backing off for {:?}", next_backoff);

                #[cfg(target_arch = "wasm32")]
                futures_timer::Delay::new(next_backoff).await;

                #[cfg(not(target_arch = "wasm32"))]
                tokio::time::sleep(next_backoff).await;
            } else {
                let err: ProviderError = err.into();
                if timeout_retries < self.timeout_retries && maybe_connectivity(&err) {
                    timeout_retries += 1;
                    trace!(err = ?err, "retrying due to spurious network error");
                    continue
                }

                trace!(err = ?err, "should not retry");
                self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
                return Err(RetryClientError::ProviderError(err))
            }
        }
    }
}

/// Implements [RetryPolicy] that will retry requests that errored with
/// status code 429, i.e. TOO_MANY_REQUESTS
///
/// Infura often fails with a `"header not found"` rpc error which is apparently linked to load
/// balancing; this error is retried as well.
#[derive(Debug, Default)]
pub struct HttpRateLimitRetryPolicy;

impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
    fn should_retry(&self, error: &ClientError) -> bool {
        fn should_retry_json_rpc_error(err: &JsonRpcError) -> bool {
            let JsonRpcError { code, message, .. } = err;
            // alchemy throws it this way
            if *code == 429 {
                return true
            }

            // This is an infura error code for `exceeded project rate limit`
            if *code == -32005 {
                return true
            }

            // alternative alchemy error for specific IPs
            if *code == -32016 && message.contains("rate limit") {
                return true
            }

            match message.as_str() {
                // this is commonly thrown by infura and is apparently a load balancer issue, see also <https://github.com/MetaMask/metamask-extension/issues/7234>
                "header not found" => true,
                // also thrown by infura if out of budget for the day and ratelimited
                "daily request count exceeded, request rate limited" => true,
                _ => false,
            }
        }

        match error {
            ClientError::ReqwestError(err) => {
                err.status() == Some(http::StatusCode::TOO_MANY_REQUESTS)
            }
            ClientError::JsonRpcError(err) => should_retry_json_rpc_error(err),
            ClientError::SerdeJson { text, .. } => {
                // some providers send invalid JSON RPC in the error case (no `id:u64`), but the
                // text should be a `JsonRpcError`
                #[derive(Deserialize)]
                struct Resp {
                    error: JsonRpcError,
                }

                if let Ok(resp) = serde_json::from_str::<Resp>(text) {
                    return should_retry_json_rpc_error(&resp.error)
                }
                false
            }
        }
    }

    fn backoff_hint(&self, error: &ClientError) -> Option<Duration> {
        if let ClientError::JsonRpcError(JsonRpcError { data, .. }) = error {
            let data = data.as_ref()?;

            // if daily rate limit exceeded, infura returns the requested backoff in the error
            // response
            let backoff_seconds = &data["rate"]["backoff_seconds"];
            // infura rate limit error
            if let Some(seconds) = backoff_seconds.as_u64() {
                return Some(Duration::from_secs(seconds))
            }
            if let Some(seconds) = backoff_seconds.as_f64() {
                return Some(Duration::from_secs(seconds as u64 + 1))
            }
        }

        None
    }
}

/// Calculates an offset in seconds by taking into account the number of currently queued requests,
/// the number of requests that were ahead in the queue when the request was first issued, the
/// average cost of a weighted request (heuristic), and the number of available compute units per
/// second.
///
/// Returns the number of seconds (the unit in which the remote endpoint measures its compute
/// budget) a request is supposed to wait to avoid being rate limited. The budget per second is
/// `compute_units_per_second`; assuming an average cost of `avg_cost`, this allows (in theory)
/// `compute_units_per_second / avg_cost` requests per second without getting rate limited.
/// The number of concurrent requests and the position in the queue when the request was first
/// issued determine how many seconds, if any, the request should wait.
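///
/// For example, with the defaults used by this module (`compute_units_per_second = 330`,
/// `avg_cost = 17`) the capacity is `330 / 17 = 19` requests per second; a request issued with 19
/// requests ahead of it while 20 are queued is offset by `min(20, 19) / 19 = 1` second, as
/// exercised by the unit tests below.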
fn compute_unit_offset_in_secs(
    avg_cost: u64,
    compute_units_per_second: u64,
    current_queued_requests: u64,
    ahead_in_queue: u64,
) -> u64 {
    let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost);
    if current_queued_requests > request_capacity_per_second {
        current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second)
    } else {
        0
    }
}

/// Checks whether the `error` is the result of a connectivity issue, such as a request timeout or
/// a `5xx` response from the server.
fn maybe_connectivity(err: &ProviderError) -> bool {
    if let ProviderError::HTTPError(reqwest_err) = err {
        if reqwest_err.is_timeout() {
            return true
        }

        #[cfg(not(target_arch = "wasm32"))]
        if reqwest_err.is_connect() {
            return true
        }

        // Error HTTP codes (5xx) are considered connectivity issues and will prompt retry
        if let Some(status) = reqwest_err.status() {
            let code = status.as_u16();
            if (500..600).contains(&code) {
                return true
            }
        }
    }
    false
}

#[cfg(test)]
mod tests {
    use super::*;
    // assumed average cost of a request
    const AVG_COST: u64 = 17u64;
    const COMPUTE_UNITS: u64 = 330u64;

    fn compute_offset(current_queued_requests: u64, ahead_in_queue: u64) -> u64 {
        compute_unit_offset_in_secs(
            AVG_COST,
            COMPUTE_UNITS,
            current_queued_requests,
            ahead_in_queue,
        )
    }

    #[test]
    fn can_measure_unit_offset_single_request() {
        let current_queued_requests = 1;
        let ahead_in_queue = 0;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        assert_eq!(to_wait, 0);

        let current_queued_requests = 19;
        let ahead_in_queue = 18;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        assert_eq!(to_wait, 0);
    }

    #[test]
    fn can_measure_unit_offset_1x_over_budget() {
        let current_queued_requests = 20;
        let ahead_in_queue = 19;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        // need to wait 1 second
        assert_eq!(to_wait, 1);
    }

    #[test]
    fn can_measure_unit_offset_2x_over_budget() {
        let current_queued_requests = 49;
        let ahead_in_queue = 48;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        // need to wait 2 seconds
        assert_eq!(to_wait, 2);

        let current_queued_requests = 49;
        let ahead_in_queue = 20;
        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
        // need to wait 1 second
        assert_eq!(to_wait, 1);
    }

    #[test]
    fn can_extract_backoff() {
        let resp = r#"{"rate": {"allowed_rps": 1, "backoff_seconds": 30, "current_rps": 1.1}, "see": "https://infura.io/dashboard"}"#;

        let err = ClientError::JsonRpcError(JsonRpcError {
            code: 0,
            message: "daily request count exceeded, request rate limited".to_string(),
            data: Some(serde_json::from_str(resp).unwrap()),
        });
        let backoff = HttpRateLimitRetryPolicy.backoff_hint(&err).unwrap();
        assert_eq!(backoff, Duration::from_secs(30));

        let err = ClientError::JsonRpcError(JsonRpcError {
            code: 0,
            message: "daily request count exceeded, request rate limited".to_string(),
            data: Some(serde_json::Value::String("blocked".to_string())),
        });
        let backoff = HttpRateLimitRetryPolicy.backoff_hint(&err);
        assert!(backoff.is_none());
    }

    #[test]
    fn test_alchemy_ip_rate_limit() {
        let s = "{\"code\":-32016,\"message\":\"Your IP has exceeded its requests per second capacity. To increase your rate limits, please sign up for a free Alchemy account at https://www.alchemy.com/optimism.\"}";
        let err: JsonRpcError = serde_json::from_str(s).unwrap();
        let err = ClientError::JsonRpcError(err);

        let should_retry = HttpRateLimitRetryPolicy.should_retry(&err);
        assert!(should_retry);
    }

    #[test]
    fn test_rate_limit_omitted_id() {
        let s = r#"{"jsonrpc":"2.0","error":{"code":-32016,"message":"Your IP has exceeded its requests per second capacity. To increase your rate limits, please sign up for a free Alchemy account at https://www.alchemy.com/optimism."},"id":null}"#;

        let err = ClientError::SerdeJson {
            err: serde::de::Error::custom("unexpected notification over HTTP transport"),
            text: s.to_string(),
        };

        let should_retry = HttpRateLimitRetryPolicy.should_retry(&err);
        assert!(should_retry);
    }
}