fure/
lib.rs

1//! A crate for retrying futures.
2//!
3//! [`Policy`] trait will help you define different retry policies.
4//!
5//! Some builtin policies can be found in [`policies`] module.
6//!
//! By default this crate uses `tokio` timers for [`crate::policies::interval`] and [`crate::policies::backoff`] policies,
8//! but `async-std` is also available as feature `async-std`.
9//! # Examples.
10//! ## Interval retry.
11//! Starts with sending a request, setting up a 1 second timer, and waits for either of them.
12//!
13//! If the timer completes first (it means that the request didn't complete in 1 second) one more request fires.
14//!
//! If the request completes first and it has an [`Ok`] response, it is returned; if it has an [`Err`] response, the timer resets and a new request fires.
16//!
17//! At most 4 requests will be fired.
18//! ```
19//! # async fn run() -> Result<(), reqwest::Error> {
20//! use fure::policies::{interval, attempts};
21//! use std::time::Duration;
22//!
23//! let get_body = || async {
24//!     reqwest::get("https://www.rust-lang.org")
25//!         .await?
26//!         .text()
27//!         .await
28//! };
29//! let policy = attempts(interval(Duration::from_secs(1)), 3);
30//! let body = fure::retry(get_body, policy).await?;
31//! println!("body = {}", body);
32//! # Ok(())
33//! # }
34//! ```
35//! ## Sequential retry with backoff.
36//! Retries failed requests with an exponential backoff and a jitter.
37//! ```
38//! # async fn run() -> Result<(), reqwest::Error> {
39//! use fure::{policies::{backoff, cond}, backoff::{exponential, jitter}};
40//! use std::time::Duration;
41//!
42//! let get_body = || async {
43//!     reqwest::get("https://www.rust-lang.org")
44//!         .await?
45//!         .text()
46//!         .await
47//! };
48//! let exp_backoff = exponential(Duration::from_secs(1), 2, Some(Duration::from_secs(10)))
49//!     .map(jitter);
50//! let policy = cond(backoff(exp_backoff), |result| !matches!(result, Some(Ok(_))));
51//! let body = fure::retry(get_body, policy).await?;
52//! println!("body = {}", body);
53//! # Ok(())
54//! # }
55//! ```
56//! ## Implementing your own policy.
//! It behaves like the interval policy above, but if it hits `TOO_MANY_REQUESTS` it will wait for some seconds before sending the next request.
58//! ```
59//! # async fn run() -> Result<(), reqwest::Error> {
60//! use std::{future::{Future, ready}, pin::Pin, time::Duration};
61//! use fure::Policy;
62//! use reqwest::{Error, Response, StatusCode};
63//!
64//! struct RetryPolicy;
65//!
66//! impl Policy<Response, Error> for RetryPolicy {
67//!     type ForceRetryFuture = tokio::time::Sleep;
68//!
69//!     type RetryFuture = Pin<Box<dyn Future<Output = Self>>>;
70//!
71//!     fn force_retry_after(&self) -> Self::ForceRetryFuture {
72//!         tokio::time::sleep(Duration::from_millis(100))
73//!     }
74//!
75//!     fn retry(
76//!         self,
77//!         result: Option<Result<&Response, &Error>>,
78//!     ) -> Option<Self::RetryFuture> {
79//!         match result {
80//!             Some(Ok(response)) => match response.status() {
81//!                 StatusCode::OK => None,
82//!                 StatusCode::TOO_MANY_REQUESTS => {
83//!                     let retry_after_secs: u64 = response
84//!                         .headers()
85//!                         .get("Retry-After")
86//!                         .and_then(|x| x.to_str().ok()?.parse().ok())
87//!                         .unwrap_or(1);
88//!                     Some(Box::pin(async move {
89//!                         tokio::time::sleep(Duration::from_secs(retry_after_secs)).await;
90//!                         self
91//!                     }))
92//!                 }
93//!                 _ => Some(Box::pin(ready(self))),
94//!             },
95//!             _ => Some(Box::pin(ready(self))),
96//!         }
97//!     }
98//! }
99//!
100//! let get_response = || reqwest::get("https://www.rust-lang.org");
101//! let response = fure::retry(get_response, RetryPolicy).await?;
102//! println!("body = {}", response.text().await?);
103//! # Ok(())
104//! # }
105//! ```
106
// The two timer backends are mutually exclusive: fail the build with a clear
// message when both the `tokio` and `async-std` features are enabled at once.
#[cfg(all(feature = "tokio", feature = "async-std"))]
compile_error!("`tokio` and `async-std` features must not be enabled together");
109
110use std::future::Future;
111
112/// Backoff utilities for [`crate::policies::backoff`] policy.
113pub mod backoff;
114
115/// Some builtin implementations of [`Policy`].
116pub mod policies;
117
118/// Sleep functions for `tokio` and `async-std`.
119#[cfg(any(feature = "tokio", feature = "async-std"))]
120pub mod sleep;
121
122pub use future::Retry;
123
124mod future;
125
126/// Runs futures created by [`CreateFuture`] accorrding to [`Policy`].
127/// ## Simple concurrent policy
128/// Runs at most 4 concurrent futures and waits a successful one.
129/// ```
130/// # async fn run() -> Result<(), reqwest::Error> {
131/// use fure::policies::{concurrent, attempts};
132///
133/// let get_body = || async {
134///     reqwest::get("https://www.rust-lang.org")
135///         .await?
136///         .text()
137///         .await
138/// };
139/// let body = fure::retry(get_body, attempts(concurrent(), 3)).await?;
140/// println!("body = {}", body);
141/// # Ok(())
142/// # }
143/// ```
144pub fn retry<P, T, E, CF>(create_f: CF, policy: P) -> Retry<P, T, E, CF>
145where
146    P: Policy<T, E>,
147    CF: CreateFuture<T, E>,
148{
149    Retry::new(policy, create_f)
150}
151
/// A trait is used to create futures which then will be run.
///
/// It is implemented for every `FnMut() -> F` where `F` is a future that
/// resolves to `Result<T, E>`, so a plain closure can be used as a factory.
pub trait CreateFuture<T, E> {
    /// Future type produced by [`CreateFuture::create`]; it resolves to `Result<T, E>`.
    type Output: Future<Output = Result<T, E>>;

    /// Creates a new future. Takes `&mut self`, so the factory may keep and
    /// update its own state between calls.
    fn create(&mut self) -> Self::Output;
}
158
159impl<T, E, F, FN> CreateFuture<T, E> for FN
160where
161    FN: FnMut() -> F,
162    F: Future<Output = Result<T, E>>,
163{
164    type Output = F;
165
166    fn create(&mut self) -> Self::Output {
167        self()
168    }
169}
170
/// A generic policy used to determine how a future should be retried.
pub trait Policy<T, E>: Sized {
    /// Future type returned by [`Policy::force_retry_after`].
    type ForceRetryFuture: Future;

    /// Future type returned by [`Policy::retry`]; it resolves to the next policy.
    type RetryFuture: Future<Output = Self>;

    /// This method is called right after your future is created. If the returned
    /// future completes before your future does, [`Policy::retry`] will be called
    /// with a [`None`] argument, without cancelling your future.
    ///
    /// If your future completes first, the current [`Self::ForceRetryFuture`] will be
    /// dropped and this method will be called again if needed.
    fn force_retry_after(&self) -> Self::ForceRetryFuture;

    /// Asks the policy whether a new future should be created and polled using [`CreateFuture`].
    ///
    /// If the future should be created, return [`Some`] with a future that resolves
    /// to the new policy; otherwise return [`None`].
    ///
    /// The future will be created only after this method resolves the new policy.
    ///
    /// This method is passed a reference to the future's result, or [`None`] if it is
    /// called after [`Policy::force_retry_after`].
    fn retry(self, result: Option<Result<&T, &E>>) -> Option<Self::RetryFuture>;
}
195
#[cfg(test)]
mod tests {
    use crate::{retry, Policy};
    use std::{
        future::{pending, ready, Future, Ready},
        pin::Pin,
        sync::{Arc, Mutex},
    };

    /// Drives `f` to completion on a current-thread `tokio` runtime with timers enabled.
    #[cfg(all(not(feature = "async-std"), test))]
    pub fn run_test(f: impl std::future::Future + Send + 'static) {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();
        runtime.block_on(f);
    }

    /// Drives `f` to completion with `async-std`'s blocking executor.
    #[cfg(all(feature = "async-std", test))]
    pub fn run_test(f: impl std::future::Future + Send + 'static) {
        let _ = async_std::task::block_on(f);
    }

    // Runtime-agnostic task helpers, re-exported from whichever runtime is active.
    #[cfg(all(not(feature = "async-std"), test))]
    pub use tokio::task::{spawn, yield_now};

    #[cfg(all(feature = "async-std", test))]
    pub use async_std::task::{spawn, yield_now};

    /// Counts how many futures the factory creates under a policy whose forced-retry
    /// delay never fires for `retry == 1` and fires immediately otherwise.
    #[test]
    fn should_drop_previous_delay_after_retry() {
        struct RetryTest {
            retry: usize,
        }

        impl<T, E> Policy<T, E> for RetryTest {
            type ForceRetryFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
            type RetryFuture = Ready<Self>;

            fn force_retry_after(&self) -> Self::ForceRetryFuture {
                match self.retry {
                    // This delay never completes on its own.
                    1 => Box::pin(pending()),
                    // Every other delay is ready immediately.
                    _ => Box::pin(ready(())),
                }
            }

            fn retry(self, _result: Option<Result<&T, &E>>) -> Option<Self::RetryFuture> {
                match self.retry {
                    // Stop once the counter has reached 5.
                    5 => None,
                    current => Some(ready(Self { retry: current + 1 })),
                }
            }
        }

        run_test(async {
            let counter = Arc::new(Mutex::new(0));
            let make_future = || {
                let counter = counter.clone();
                async move {
                    // Bump the shared counter; the guard is released before any await.
                    let nth = {
                        let mut created = counter.lock().unwrap();
                        *created += 1;
                        *created
                    };
                    match nth {
                        // Second future: yield once, then fail.
                        2 => {
                            crate::tests::yield_now().await;
                            Err(())
                        }
                        // Third future: never completes.
                        3 => pending().await,
                        // All other futures fail immediately.
                        _ => Err::<(), ()>(()),
                    }
                }
            };

            let _ = retry(make_future, RetryTest { retry: 0 }).await;

            assert_eq!(*counter.lock().unwrap(), 6);
        });
    }
}