fure/lib.rs
1//! A crate for retrying futures.
2//!
3//! [`Policy`] trait will help you define different retry policies.
4//!
5//! Some builtin policies can be found in the [`policies`] module.
6//!
7//! By default this crate uses `tokio` timers for the [`crate::policies::interval`] and [`crate::policies::backoff`] policies,
8//! but `async-std` is also available via the `async-std` feature.
9//! # Examples.
10//! ## Interval retry.
11//! Starts with sending a request, setting up a 1 second timer, and waits for either of them.
12//!
13//! If the timer completes first (it means that the request didn't complete in 1 second) one more request fires.
14//!
15//! If the request completes first with an [`Ok`] response, that response is returned; if it completes with an [`Err`] response, the timer resets and a new request fires.
16//!
17//! At most 4 requests will be fired.
18//! ```
19//! # async fn run() -> Result<(), reqwest::Error> {
20//! use fure::policies::{interval, attempts};
21//! use std::time::Duration;
22//!
23//! let get_body = || async {
24//! reqwest::get("https://www.rust-lang.org")
25//! .await?
26//! .text()
27//! .await
28//! };
29//! let policy = attempts(interval(Duration::from_secs(1)), 3);
30//! let body = fure::retry(get_body, policy).await?;
31//! println!("body = {}", body);
32//! # Ok(())
33//! # }
34//! ```
35//! ## Sequential retry with backoff.
36//! Retries failed requests with an exponential backoff and a jitter.
37//! ```
38//! # async fn run() -> Result<(), reqwest::Error> {
39//! use fure::{policies::{backoff, cond}, backoff::{exponential, jitter}};
40//! use std::time::Duration;
41//!
42//! let get_body = || async {
43//! reqwest::get("https://www.rust-lang.org")
44//! .await?
45//! .text()
46//! .await
47//! };
48//! let exp_backoff = exponential(Duration::from_secs(1), 2, Some(Duration::from_secs(10)))
49//! .map(jitter);
50//! let policy = cond(backoff(exp_backoff), |result| !matches!(result, Some(Ok(_))));
51//! let body = fure::retry(get_body, policy).await?;
52//! println!("body = {}", body);
53//! # Ok(())
54//! # }
55//! ```
56//! ## Implementing your own policy.
57//! It behaves like the interval policy above, but if it hits `TOO_MANY_REQUESTS` it waits for the number of seconds given by the `Retry-After` header (1 second if the header is missing or cannot be parsed) before sending the next request.
58//! ```
59//! # async fn run() -> Result<(), reqwest::Error> {
60//! use std::{future::{Future, ready}, pin::Pin, time::Duration};
61//! use fure::Policy;
62//! use reqwest::{Error, Response, StatusCode};
63//!
64//! struct RetryPolicy;
65//!
66//! impl Policy<Response, Error> for RetryPolicy {
67//! type ForceRetryFuture = tokio::time::Sleep;
68//!
69//! type RetryFuture = Pin<Box<dyn Future<Output = Self>>>;
70//!
71//! fn force_retry_after(&self) -> Self::ForceRetryFuture {
72//! tokio::time::sleep(Duration::from_millis(100))
73//! }
74//!
75//! fn retry(
76//! self,
77//! result: Option<Result<&Response, &Error>>,
78//! ) -> Option<Self::RetryFuture> {
79//! match result {
80//! Some(Ok(response)) => match response.status() {
81//! StatusCode::OK => None,
82//! StatusCode::TOO_MANY_REQUESTS => {
83//! let retry_after_secs: u64 = response
84//! .headers()
85//! .get("Retry-After")
86//! .and_then(|x| x.to_str().ok()?.parse().ok())
87//! .unwrap_or(1);
88//! Some(Box::pin(async move {
89//! tokio::time::sleep(Duration::from_secs(retry_after_secs)).await;
90//! self
91//! }))
92//! }
93//! _ => Some(Box::pin(ready(self))),
94//! },
95//! _ => Some(Box::pin(ready(self))),
96//! }
97//! }
98//! }
99//!
100//! let get_response = || reqwest::get("https://www.rust-lang.org");
101//! let response = fure::retry(get_response, RetryPolicy).await?;
102//! println!("body = {}", response.text().await?);
103//! # Ok(())
104//! # }
105//! ```
106
107#[cfg(all(feature = "tokio", feature = "async-std"))]
108compile_error!("`tokio` and `async-std` features must not be enabled together");
109
110use std::future::Future;
111
112/// Backoff utilities for [`crate::policies::backoff`] policy.
113pub mod backoff;
114
115/// Some builtin implementations of [`Policy`].
116pub mod policies;
117
118/// Sleep functions for `tokio` and `async-std`.
119#[cfg(any(feature = "tokio", feature = "async-std"))]
120pub mod sleep;
121
122pub use future::Retry;
123
124mod future;
125
126/// Runs futures created by [`CreateFuture`] accorrding to [`Policy`].
127/// ## Simple concurrent policy
128/// Runs at most 4 concurrent futures and waits a successful one.
129/// ```
130/// # async fn run() -> Result<(), reqwest::Error> {
131/// use fure::policies::{concurrent, attempts};
132///
133/// let get_body = || async {
134/// reqwest::get("https://www.rust-lang.org")
135/// .await?
136/// .text()
137/// .await
138/// };
139/// let body = fure::retry(get_body, attempts(concurrent(), 3)).await?;
140/// println!("body = {}", body);
141/// # Ok(())
142/// # }
143/// ```
144pub fn retry<P, T, E, CF>(create_f: CF, policy: P) -> Retry<P, T, E, CF>
145where
146 P: Policy<T, E>,
147 CF: CreateFuture<T, E>,
148{
149 Retry::new(policy, create_f)
150}
151
/// Creates the futures which will then be run.
///
/// `T` and `E` are the success and error types of the [`Result`] that every created future
/// resolves to.
pub trait CreateFuture<T, E> {
    /// The type of future returned by [`CreateFuture::create`].
    type Output: Future<Output = Result<T, E>>;

    /// Produces a new future.
    ///
    /// Takes `&mut self`, so an implementation may update its own state on every call.
    fn create(&mut self) -> Self::Output;
}
158
159impl<T, E, F, FN> CreateFuture<T, E> for FN
160where
161 FN: FnMut() -> F,
162 F: Future<Output = Result<T, E>>,
163{
164 type Output = F;
165
166 fn create(&mut self) -> Self::Output {
167 self()
168 }
169}
170
/// A generic policy that determines how a future should be retried.
pub trait Policy<T, E>: Sized {
    /// The future produced by [`Policy::force_retry_after`].
    type ForceRetryFuture: Future;

    /// The future produced by [`Policy::retry`]; it resolves to the policy to use next.
    type RetryFuture: Future<Output = Self>;

    /// Called right after your future has been started.
    ///
    /// If the returned future completes before yours does, [`Policy::retry`] is called with
    /// [`None`] and your future is not cancelled.
    ///
    /// If your future completes first, the current [`Self::ForceRetryFuture`] is dropped and
    /// this method is called again if needed.
    fn force_retry_after(&self) -> Self::ForceRetryFuture;

    /// Asks the policy whether a new future should be created with [`CreateFuture`] and polled.
    ///
    /// Return [`Some`] with a future resolving to the new policy if a future should be created,
    /// otherwise return [`None`].
    ///
    /// The new future is created only after the returned [`Self::RetryFuture`] resolves.
    ///
    /// `result` is a reference to your future's result, or [`None`] when this method is called
    /// after [`Policy::force_retry_after`].
    fn retry(self, result: Option<Result<&T, &E>>) -> Option<Self::RetryFuture>;
}
195
#[cfg(test)]
mod tests {
    use crate::{retry, Policy};
    use std::{
        future::{pending, ready, Future, Ready},
        pin::Pin,
        sync::{Arc, Mutex},
    };

    // Runtime-specific task helpers, re-exported under one name so test code does not depend
    // on which async runtime feature is active. The enclosing module is already `cfg(test)`,
    // so only the feature needs checking here.
    #[cfg(not(feature = "async-std"))]
    pub use tokio::task::{spawn, yield_now};

    #[cfg(feature = "async-std")]
    pub use async_std::task::{spawn, yield_now};

    /// Blocks on `f` using a current-thread `tokio` runtime with the time driver enabled.
    #[cfg(not(feature = "async-std"))]
    pub fn run_test(f: impl Future + Send + 'static) {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();
        runtime.block_on(f);
    }

    /// Blocks on `f` using `async-std`.
    #[cfg(feature = "async-std")]
    pub fn run_test(f: impl Future + Send + 'static) {
        async_std::task::block_on(f);
    }

    #[test]
    fn should_drop_previous_delay_after_retry() {
        /// Test policy that counts how many times it has been asked to retry.
        struct RetryTest {
            retry: usize,
        }

        impl<T, E> Policy<T, E> for RetryTest {
            type ForceRetryFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
            type RetryFuture = Ready<Self>;

            fn force_retry_after(&self) -> Self::ForceRetryFuture {
                match self.retry {
                    // After the first retry the delay never fires on its own.
                    1 => Box::pin(pending()),
                    // In every other state the delay is ready immediately.
                    _ => Box::pin(ready(())),
                }
            }

            fn retry(self, _result: Option<Result<&T, &E>>) -> Option<Self::RetryFuture> {
                match self.retry {
                    // Stop once five retries have been granted.
                    5 => None,
                    granted => Some(ready(Self { retry: granted + 1 })),
                }
            }
        }

        run_test(async {
            let call_count = Arc::new(Mutex::new(0));
            let create_fut = || {
                let call_count = Arc::clone(&call_count);
                async move {
                    // Bump the shared counter inside its own scope so the guard is released
                    // before any `.await` below.
                    let call = {
                        let mut seen = call_count.lock().unwrap();
                        *seen += 1;
                        *seen
                    };
                    match call {
                        // The second future yields once before failing.
                        2 => {
                            yield_now().await;
                            Err(())
                        }
                        // The third future never completes.
                        3 => pending().await,
                        // Every other future fails immediately.
                        _ => Err::<(), ()>(()),
                    }
                }
            };

            let _ = retry(create_fut, RetryTest { retry: 0 }).await;

            // One initial future plus five retries.
            assert_eq!(*call_count.lock().unwrap(), 6);
        });
    }
}
287}