vss_client/util/
retry.rs

1use rand::Rng;
2use std::error::Error;
3use std::future::Future;
4use std::marker::PhantomData;
5use std::time::Duration;
6
7/// A function that performs and retries the given operation according to a retry policy.
8///
9/// **Caution**: A retry policy without the number of attempts capped by [`MaxAttemptsRetryPolicy`]
10/// decorator will result in infinite retries.
11///
12/// **Example**
13/// ```rust
14/// # use std::time::Duration;
15/// # use vss_client::error::VssError;
16/// # use vss_client::util::retry::{ExponentialBackoffRetryPolicy, retry, RetryPolicy};
17/// #
18/// # async fn operation() -> Result<i32, VssError>  {
19/// # 	tokio::time::sleep(Duration::from_millis(10)).await;
20/// # 	Ok(42)
21/// # }
22/// #
23/// let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(100))
24/// 	.with_max_attempts(5)
25/// 	.with_max_total_delay(Duration::from_secs(2))
26/// 	.with_max_jitter(Duration::from_millis(30))
27/// 	.skip_retry_on_error(|e| matches!(e, VssError::InvalidRequestError(..)));
28///
29/// let result = retry(operation, &retry_policy);
30///```
31///
32/// To use a retry policy as a member in a [`Send`] & [`Sync`] safe struct which needs to have known
33/// size at compile time, we can specify its concrete type as follows:
34/// ```
35/// # use std::time::Duration;
36/// # use vss_client::error::VssError;
37/// # use vss_client::util::retry::{ExponentialBackoffRetryPolicy, FilteredRetryPolicy, retry, RetryPolicy};
38///
39/// type VssRetryPolicy = FilteredRetryPolicy<ExponentialBackoffRetryPolicy<VssError>, Box<dyn 'static + Send + Sync + Fn(&VssError) -> bool>>;
40///
41/// struct SomeStruct {
42/// 	retry_policy: VssRetryPolicy,
43/// }
44///
45/// impl SomeStruct {
46/// 	fn new() -> Self {
47/// 		let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(100))
48/// 			.skip_retry_on_error(Box::new(|e: &VssError| { matches!( e, VssError::NoSuchKeyError(..)) }) as _);
49/// 		Self { retry_policy }
50/// 	}
51/// }
52/// ```
53pub async fn retry<R, F, Fut, T, E>(mut operation: F, retry_policy: &R) -> Result<T, E>
54where
55	R: RetryPolicy<E = E>,
56	F: FnMut() -> Fut,
57	Fut: Future<Output = Result<T, E>>,
58	E: Error,
59{
60	let mut attempts_made = 0;
61	let mut accumulated_delay = Duration::ZERO;
62	loop {
63		match operation().await {
64			Ok(result) => return Ok(result),
65			Err(err) => {
66				attempts_made += 1;
67				if let Some(delay) = retry_policy.next_delay(&RetryContext {
68					attempts_made,
69					accumulated_delay,
70					error: &err,
71				}) {
72					tokio::time::sleep(delay).await;
73					accumulated_delay += delay;
74				} else {
75					return Err(err);
76				}
77			},
78		}
79	}
80}
81
82/// Provides the logic for how and when to perform retries.
83pub trait RetryPolicy: Sized {
84	/// The error type returned by the `operation` in `retry`.
85	type E: Error;
86
87	/// Returns the duration to wait before trying the next attempt.
88	/// `context` represents the context of a retry operation.
89	///
90	/// If `None` is returned then no further retry attempt is made.
91	fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration>;
92
93	/// Returns a new `RetryPolicy` that respects the given maximum attempts.
94	fn with_max_attempts(self, max_attempts: u32) -> MaxAttemptsRetryPolicy<Self> {
95		MaxAttemptsRetryPolicy { inner_policy: self, max_attempts }
96	}
97
98	/// Returns a new `RetryPolicy` that respects the given total delay.
99	fn with_max_total_delay(self, max_total_delay: Duration) -> MaxTotalDelayRetryPolicy<Self> {
100		MaxTotalDelayRetryPolicy { inner_policy: self, max_total_delay }
101	}
102
103	/// Returns a new `RetryPolicy` that adds jitter(random delay) to underlying policy.
104	fn with_max_jitter(self, max_jitter: Duration) -> JitteredRetryPolicy<Self> {
105		JitteredRetryPolicy { inner_policy: self, max_jitter }
106	}
107
108	/// Skips retrying on errors that evaluate to `true` after applying `function`.
109	fn skip_retry_on_error<F>(self, function: F) -> FilteredRetryPolicy<Self, F>
110	where
111		F: 'static + Fn(&Self::E) -> bool,
112	{
113		FilteredRetryPolicy { inner_policy: self, function }
114	}
115}
116
/// Represents the context of a retry operation.
///
/// The context holds key information about the retry operation. This includes how many attempts
/// have been made until now, the accumulated delay between retries, and the error that triggered
/// the retry. It is passed to [`RetryPolicy::next_delay`]. The public accessors below let
/// policies implemented outside this module base their decision on it.
#[derive(Debug)]
pub struct RetryContext<'a, E: Error> {
	/// The number attempts made until now, before attempting the next retry.
	attempts_made: u32,

	/// The amount of artificial delay we have already waited in between previous
	/// attempts. Does not include the time taken to execute the operation.
	accumulated_delay: Duration,

	/// The error encountered in the previous attempt.
	error: &'a E,
}

impl<'a, E: Error> RetryContext<'a, E> {
	/// Returns the number of attempts made until now, before attempting the next retry.
	pub fn attempts_made(&self) -> u32 {
		self.attempts_made
	}

	/// Returns the artificial delay already waited in between previous attempts.
	///
	/// This does not include the time taken to execute the operation itself.
	pub fn accumulated_delay(&self) -> Duration {
		self.accumulated_delay
	}

	/// Returns the error encountered in the previous attempt.
	pub fn error(&self) -> &'a E {
		self.error
	}
}
133
/// A retry policy with exponential backoff.
///
/// The delay before the retry that follows the `n`-th attempt is `base_delay * (2^n - 1)`. That
/// gives `base_delay`, `3 * base_delay`, `7 * base_delay`, … so each delay is roughly double the
/// previous one.
///
/// Combining exponential backoff with jitter is recommended, as described in
/// ["Exponential Backoff and Jitter"](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/).
/// This helps to avoid the [Thundering Herd Problem](https://en.wikipedia.org/wiki/Thundering_herd_problem).
pub struct ExponentialBackoffRetryPolicy<E> {
	/// The base delay of the backoff algorithm. The first retry happens `base_delay` after the
	/// first attempt.
	base_delay: Duration,
	/// Ties the policy to the error type `E` without storing a value of that type.
	phantom: PhantomData<E>,
}

impl<E: Error> ExponentialBackoffRetryPolicy<E> {
	/// Creates an exponential backoff policy from `base_delay`.
	///
	/// `base_delay` is the delay between the first attempt and the first retry. Later delays
	/// grow from it exponentially.
	pub fn new(base_delay: Duration) -> Self {
		let phantom = PhantomData;
		Self { base_delay, phantom }
	}
}
152
153impl<E: Error> RetryPolicy for ExponentialBackoffRetryPolicy<E> {
154	type E = E;
155	fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration> {
156		let backoff_factor = 2_u32.pow(context.attempts_made) - 1;
157		let delay = self.base_delay * backoff_factor;
158		Some(delay)
159	}
160}
161
162/// Decorates the given `RetryPolicy` to respect the given maximum attempts.
163pub struct MaxAttemptsRetryPolicy<T: RetryPolicy> {
164	/// The underlying retry policy to use.
165	inner_policy: T,
166	/// The maximum number of attempts to retry.
167	max_attempts: u32,
168}
169
170impl<T: RetryPolicy> RetryPolicy for MaxAttemptsRetryPolicy<T> {
171	type E = T::E;
172	fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration> {
173		if self.max_attempts == context.attempts_made {
174			None
175		} else {
176			self.inner_policy.next_delay(context)
177		}
178	}
179}
180
181/// Decorates the given `RetryPolicy` to respect the given maximum total delay.
182pub struct MaxTotalDelayRetryPolicy<T: RetryPolicy> {
183	/// The underlying retry policy to use.
184	inner_policy: T,
185	/// The maximum accumulated delay that will be allowed over all attempts.
186	max_total_delay: Duration,
187}
188
189impl<T: RetryPolicy> RetryPolicy for MaxTotalDelayRetryPolicy<T> {
190	type E = T::E;
191	fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration> {
192		let next_delay = self.inner_policy.next_delay(context);
193		if let Some(next_delay) = next_delay {
194			if self.max_total_delay < context.accumulated_delay + next_delay {
195				return None;
196			}
197		}
198		next_delay
199	}
200}
201
202/// Decorates the given `RetryPolicy` and adds jitter (random delay) to it. This can make retries
203/// more spread out and less likely to all fail at once.
204pub struct JitteredRetryPolicy<T: RetryPolicy> {
205	/// The underlying retry policy to use.
206	inner_policy: T,
207	/// The maximum amount of random jitter to apply to the delay.
208	max_jitter: Duration,
209}
210
211impl<T: RetryPolicy> RetryPolicy for JitteredRetryPolicy<T> {
212	type E = T::E;
213	fn next_delay(&self, context: &RetryContext<Self::E>) -> Option<Duration> {
214		if let Some(base_delay) = self.inner_policy.next_delay(context) {
215			let mut rng = rand::thread_rng();
216			let jitter =
217				Duration::from_micros(rng.gen_range(0..self.max_jitter.as_micros() as u64));
218			Some(base_delay + jitter)
219		} else {
220			None
221		}
222	}
223}
224
225/// Decorates the given `RetryPolicy` by not retrying on errors that match the given function.
226pub struct FilteredRetryPolicy<T: RetryPolicy, F> {
227	inner_policy: T,
228	function: F,
229}
230
231impl<T, F, E> RetryPolicy for FilteredRetryPolicy<T, F>
232where
233	T: RetryPolicy<E = E>,
234	F: Fn(&E) -> bool,
235	E: Error,
236{
237	type E = T::E;
238	fn next_delay(&self, context: &RetryContext<E>) -> Option<Duration> {
239		if (self.function)(&context.error) {
240			None
241		} else {
242			self.inner_policy.next_delay(context)
243		}
244	}
245}