vss_client/util/retry.rs
1use rand::Rng;
2use std::error::Error;
3use std::future::Future;
4use std::marker::PhantomData;
5use std::time::Duration;
6
7/// A function that performs and retries the given operation according to a retry policy.
8///
9/// **Caution**: A retry policy without the number of attempts capped by [`MaxAttemptsRetryPolicy`]
10/// decorator will result in infinite retries.
11///
12/// **Example**
13/// ```rust
14/// # use std::time::Duration;
15/// # use vss_client::error::VssError;
16/// # use vss_client::util::retry::{ExponentialBackoffRetryPolicy, retry, RetryPolicy};
17/// #
18/// # async fn operation() -> Result<i32, VssError> {
19/// # tokio::time::sleep(Duration::from_millis(10)).await;
20/// # Ok(42)
21/// # }
22/// #
23/// let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(100))
24/// .with_max_attempts(5)
25/// .with_max_total_delay(Duration::from_secs(2))
26/// .with_max_jitter(Duration::from_millis(30))
27/// .skip_retry_on_error(|e| matches!(e, VssError::InvalidRequestError(..)));
28///
29/// let result = retry(operation, &retry_policy);
30///```
31///
32/// To use a retry policy as a member in a [`Send`] & [`Sync`] safe struct which needs to have known
33/// size at compile time, we can specify its concrete type as follows:
34/// ```
35/// # use std::time::Duration;
36/// # use vss_client::error::VssError;
37/// # use vss_client::util::retry::{ExponentialBackoffRetryPolicy, FilteredRetryPolicy, retry, RetryPolicy};
38///
39/// type VssRetryPolicy = FilteredRetryPolicy<ExponentialBackoffRetryPolicy<VssError>, Box<dyn 'static + Send + Sync + Fn(&VssError) -> bool>>;
40///
41/// struct SomeStruct {
42/// retry_policy: VssRetryPolicy,
43/// }
44///
45/// impl SomeStruct {
46/// fn new() -> Self {
47/// let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(100))
48/// .skip_retry_on_error(Box::new(|e: &VssError| { matches!( e, VssError::NoSuchKeyError(..)) }) as _);
49/// Self { retry_policy }
50/// }
51/// }
52/// ```
53pub async fn retry<R, F, Fut, T, E>(mut operation: F, retry_policy: &R) -> Result<T, E>
54where
55 R: RetryPolicy<E = E>,
56 F: FnMut() -> Fut,
57 Fut: Future<Output = Result<T, E>>,
58 E: Error,
59{
60 let mut attempts_made = 0;
61 let mut accumulated_delay = Duration::ZERO;
62 loop {
63 match operation().await {
64 Ok(result) => return Ok(result),
65 Err(err) => {
66 attempts_made += 1;
67 if let Some(delay) = retry_policy.next_delay(&RetryContext {
68 attempts_made,
69 accumulated_delay,
70 error: &err,
71 }) {
72 tokio::time::sleep(delay).await;
73 accumulated_delay += delay;
74 } else {
75 return Err(err);
76 }
77 },
78 }
79 }
80}
81
82/// Provides the logic for how and when to perform retries.
83pub trait RetryPolicy: Sized {
84 /// The error type returned by the `operation` in `retry`.
85 type E: Error;
86
87 /// Returns the duration to wait before trying the next attempt.
88 /// `context` represents the context of a retry operation.
89 ///
90 /// If `None` is returned then no further retry attempt is made.
91 fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration>;
92
93 /// Returns a new `RetryPolicy` that respects the given maximum attempts.
94 fn with_max_attempts(self, max_attempts: u32) -> MaxAttemptsRetryPolicy<Self> {
95 MaxAttemptsRetryPolicy { inner_policy: self, max_attempts }
96 }
97
98 /// Returns a new `RetryPolicy` that respects the given total delay.
99 fn with_max_total_delay(self, max_total_delay: Duration) -> MaxTotalDelayRetryPolicy<Self> {
100 MaxTotalDelayRetryPolicy { inner_policy: self, max_total_delay }
101 }
102
103 /// Returns a new `RetryPolicy` that adds jitter(random delay) to underlying policy.
104 fn with_max_jitter(self, max_jitter: Duration) -> JitteredRetryPolicy<Self> {
105 JitteredRetryPolicy { inner_policy: self, max_jitter }
106 }
107
108 /// Skips retrying on errors that evaluate to `true` after applying `function`.
109 fn skip_retry_on_error<F>(self, function: F) -> FilteredRetryPolicy<Self, F>
110 where
111 F: 'static + Fn(&Self::E) -> bool,
112 {
113 FilteredRetryPolicy { inner_policy: self, function }
114 }
115}
116
/// Snapshot of a retry operation that is handed to a [`RetryPolicy`].
///
/// It carries what a policy needs to make its decision: the number of attempts made so far,
/// the delay accumulated between retries, and the error that triggered the retry.
pub struct RetryContext<'a, E>
where
    E: Error,
{
    /// Number of attempts made so far, i.e. before the next retry is attempted.
    attempts_made: u32,

    /// Total artificial delay already waited in between previous attempts. The time taken to
    /// execute the operation itself is not included.
    accumulated_delay: Duration,

    /// Error encountered in the previous attempt.
    error: &'a E,
}
133
134/// The exponential backoff strategy is a retry approach that doubles the delay between retries.
135/// A combined exponential backoff and jitter strategy is recommended that is ["Exponential Backoff and Jitter"](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/).
136/// This is helpful to avoid [Thundering Herd Problem](https://en.wikipedia.org/wiki/Thundering_herd_problem).
137pub struct ExponentialBackoffRetryPolicy<E> {
138 /// The base delay duration for the backoff algorithm. First retry is `base_delay` after first attempt.
139 base_delay: Duration,
140 phantom: PhantomData<E>,
141}
142
143impl<E: Error> ExponentialBackoffRetryPolicy<E> {
144 /// Constructs a new instance using `base_delay`.
145 ///
146 /// `base_delay` is the base delay duration for the backoff algorithm. First retry is `base_delay`
147 /// after first attempt.
148 pub fn new(base_delay: Duration) -> ExponentialBackoffRetryPolicy<E> {
149 Self { base_delay, phantom: PhantomData }
150 }
151}
152
153impl<E: Error> RetryPolicy for ExponentialBackoffRetryPolicy<E> {
154 type E = E;
155 fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration> {
156 let backoff_factor = 2_u32.pow(context.attempts_made) - 1;
157 let delay = self.base_delay * backoff_factor;
158 Some(delay)
159 }
160}
161
162/// Decorates the given `RetryPolicy` to respect the given maximum attempts.
163pub struct MaxAttemptsRetryPolicy<T: RetryPolicy> {
164 /// The underlying retry policy to use.
165 inner_policy: T,
166 /// The maximum number of attempts to retry.
167 max_attempts: u32,
168}
169
170impl<T: RetryPolicy> RetryPolicy for MaxAttemptsRetryPolicy<T> {
171 type E = T::E;
172 fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration> {
173 if self.max_attempts == context.attempts_made {
174 None
175 } else {
176 self.inner_policy.next_delay(context)
177 }
178 }
179}
180
181/// Decorates the given `RetryPolicy` to respect the given maximum total delay.
182pub struct MaxTotalDelayRetryPolicy<T: RetryPolicy> {
183 /// The underlying retry policy to use.
184 inner_policy: T,
185 /// The maximum accumulated delay that will be allowed over all attempts.
186 max_total_delay: Duration,
187}
188
189impl<T: RetryPolicy> RetryPolicy for MaxTotalDelayRetryPolicy<T> {
190 type E = T::E;
191 fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration> {
192 let next_delay = self.inner_policy.next_delay(context);
193 if let Some(next_delay) = next_delay {
194 if self.max_total_delay < context.accumulated_delay + next_delay {
195 return None;
196 }
197 }
198 next_delay
199 }
200}
201
202/// Decorates the given `RetryPolicy` and adds jitter (random delay) to it. This can make retries
203/// more spread out and less likely to all fail at once.
204pub struct JitteredRetryPolicy<T: RetryPolicy> {
205 /// The underlying retry policy to use.
206 inner_policy: T,
207 /// The maximum amount of random jitter to apply to the delay.
208 max_jitter: Duration,
209}
210
211impl<T: RetryPolicy> RetryPolicy for JitteredRetryPolicy<T> {
212 type E = T::E;
213 fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration> {
214 if let Some(base_delay) = self.inner_policy.next_delay(context) {
215 let mut rng = rand::thread_rng();
216 let jitter =
217 Duration::from_micros(rng.gen_range(0..self.max_jitter.as_micros() as u64));
218 Some(base_delay + jitter)
219 } else {
220 None
221 }
222 }
223}
224
225/// Decorates the given `RetryPolicy` by not retrying on errors that match the given function.
226pub struct FilteredRetryPolicy<T: RetryPolicy, F> {
227 inner_policy: T,
228 function: F,
229}
230
231impl<T, F, E> RetryPolicy for FilteredRetryPolicy<T, F>
232where
233 T: RetryPolicy<E = E>,
234 F: Fn(&E) -> bool,
235 E: Error,
236{
237 type E = T::E;
238 fn next_delay(&self, context: &RetryContext<E>) -> Option<Duration> {
239 if (self.function)(&context.error) {
240 None
241 } else {
242 self.inner_policy.next_delay(context)
243 }
244 }
245}