backoff_futures/
lib.rs

1//! An add-on to [`std::future::Future`] that makes it easy to introduce a retry mechanism
2//! with a backoff for functions that produce failible futures,
3//! i.e. futures where the `Output` type is some `Result<T, backoff::Error<E>>`.
4//! The `backoff::Error` wrapper is necessary so as to distinguish errors that are considered
5//! *transient*, and thus make it likely that a future attempt at producing and blocking on
6//! the same future could just as well succeed (e.g. the HTTP 503 Service Unavailable error),
7//! and errors that are considered *permanent*, where no future attempts are presumed to have
8//! a chance to succeed (e.g. the HTTP 404 Not Found error).
9//!
10//! The extension trait integrates with the `backoff` crate and expects a [`backoff::backoff::Backoff`]
11//! value to describe the various properties of the retry & backoff mechanism to be used.
12//!
13//! ```rust
14//! fn isahc_error_to_backoff(err: isahc::Error) -> backoff::Error<isahc::Error> {
15//!     match err {
16//!         isahc::Error::Aborted | isahc::Error::Io(_) | isahc::Error::Timeout =>
17//!             backoff::Error::Transient(err),
18//!         _ =>
19//!             backoff::Error::Permanent(err)
20//!     }
21//! }
22//!
23//! async fn get_example_contents() -> Result<String, backoff::Error<isahc::Error>> {
24//!     use isahc::ResponseExt;
25//!
26//!     let mut response = isahc::get_async("https://example.org")
27//!         .await
28//!         .map_err(isahc_error_to_backoff)?;
29//!
30//!     response
31//!         .text_async()
32//!         .await
33//!         .map_err(|err: std::io::Error| backoff::Error::Transient(isahc::Error::Io(err)))
34//! }
35//!
36//! async fn get_example_contents_with_retry() -> Result<String, isahc::Error> {
37//!     use backoff_futures::BackoffExt;
38//!
39//!     let mut backoff = backoff::ExponentialBackoff::default();
40//!     get_example_contents.with_backoff(&mut backoff)
41//!         .await
42//!         .map_err(|err| match err {
43//!             backoff::Error::Transient(err) | backoff::Error::Permanent(err) => err
44//!         })
45//! }
46//! ```
47//!
48//! See [`BackoffExt::with_backoff`] for more details.
49
50#![allow(clippy::type_repetition_in_bounds)]
51
52use backoff::backoff::Backoff;
53use backoff::Error;
54use std::future::Future;
55use std::time::Duration;
56
57struct BackoffFutureBuilder<'b, B, F, Fut, T, E>
58where
59    B: Backoff,
60    F: FnMut() -> Fut,
61    Fut: Future<Output = Result<T, Error<E>>>,
62{
63    backoff: &'b mut B,
64    f: F,
65}
66
67impl<'b, B, F, Fut, T, E> BackoffFutureBuilder<'b, B, F, Fut, T, E>
68where
69    B: Backoff,
70    F: FnMut() -> Fut,
71    Fut: Future<Output = Result<T, Error<E>>>,
72{
73    async fn fut<N: FnMut(&Error<E>, Duration)>(mut self, mut notify: N) -> Result<T, Error<E>> {
74        loop {
75            let work_result = (self.f)().await;
76            match work_result {
77                Ok(_) | Err(Error::Permanent(_)) => return work_result,
78                Err(err @ Error::Transient(_)) => {
79                    if let Some(backoff_duration) = self.backoff.next_backoff() {
80                        notify(&err, backoff_duration);
81                        tokio::time::delay_for(backoff_duration).await
82                    } else {
83                        return Err(err);
84                    }
85                }
86            }
87        }
88    }
89}
90
91#[async_trait::async_trait(?Send)]
92pub trait BackoffExt<T, E, Fut, F> {
93    #[deprecated(since = "0.3.1", note = "Use the built-in `backoff::future` async support")]
94    /// Returns a future that, when polled, will first ask `self` for a new future (with an output
95    /// type `Result<T, backoff::Error<_>>` to produce the expected result.
96    ///
97    /// If the underlying future is ready with an `Err` value, the nature of the error
98    /// (permanent/transient) will determine whether polling the future will employ the provided
99    /// `backoff` strategy and will result in the work being retried.
100    ///
101    /// Specifically, [`backoff::Error::Permanent`] errors will be returned immediately.
102    /// [`backoff::Error::Transient`] errors will, depending on the particular [`backoff::backoff::Backoff`],
103    /// result in a retry attempt, most likely with a delay.
104    ///
105    /// If the underlying future is ready with an [`std::result::Result::Ok`] value, it will be returned immediately.
106    async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>>
107    where
108        B: Backoff,
109        T: 'async_trait,
110        E: 'async_trait,
111        Fut: 'async_trait;
112    
113    #[deprecated(since = "0.3.1", note = "Use the built-in `backoff::future` async support")]
114    /// Same as [`BackoffExt::with_backoff`] but takes an extra `notify` closure that will be called every time
115    /// a new backoff is employed on transient errors. The closure takes the new delay duration as an argument.
116    async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>>
117    where
118        B: Backoff,
119        N: FnMut(&Error<E>, Duration),
120        T: 'async_trait,
121        E: 'async_trait,
122        Fut: 'async_trait;
123}
124
125#[async_trait::async_trait(?Send)]
126impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F
127     where
128        F: FnMut() -> Fut,
129        Fut: Future<Output = Result<T, backoff::Error<E>>> {
130
131    async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>>
132    where
133        B: Backoff,
134        T: 'async_trait,
135        E: 'async_trait,
136        Fut: 'async_trait
137    {
138        let backoff_struct = BackoffFutureBuilder { backoff, f: self };
139        backoff_struct.fut(|_, _| {}).await
140    }
141
142    async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>>
143    where
144        B: Backoff,
145        N: FnMut(&Error<E>, Duration),
146        T: 'async_trait,
147        E: 'async_trait,
148        Fut: 'async_trait
149    {
150        let backoff_struct = BackoffFutureBuilder { backoff, f: self };
151        backoff_struct.fut(notify).await
152    }
153}
154
155#[cfg(test)]
156mod tests {
157    use super::BackoffExt;
158    use futures::Future;
159
160    #[test]
161    fn test_when_future_succeeds() {
162        fn do_work() -> impl Future<Output = Result<u32, backoff::Error<()>>> {
163            futures::future::ready(Ok(123))
164        }
165
166        let mut backoff = backoff::ExponentialBackoff::default();
167        let result: Result<u32, backoff::Error<()>> =
168            futures::executor::block_on(do_work.with_backoff(&mut backoff));
169        assert_eq!(result.ok(), Some(123));
170    }
171
172    #[test]
173    fn test_with_closure_when_future_succeeds() {
174        let do_work = || {
175            futures::future::lazy(|_| Ok(123))
176        };
177
178        let mut backoff = backoff::ExponentialBackoff::default();
179        let result: Result<u32, backoff::Error<()>> =
180            futures::executor::block_on(do_work.with_backoff(&mut backoff));
181        assert_eq!(result.ok(), Some(123));
182    }
183
184    #[test]
185    fn test_with_closure_when_future_fails_with_permanent_error() {
186        use matches::assert_matches;
187
188        let do_work = || {
189            let result = Err(backoff::Error::Permanent(()));
190            futures::future::ready(result)
191        };
192
193        let mut backoff = backoff::ExponentialBackoff::default();
194        let result: Result<u32, backoff::Error<()>> =
195            futures::executor::block_on(do_work.with_backoff(&mut backoff));
196        assert_matches!(result.err(), Some(backoff::Error::Permanent(_)));
197    }
198
199    #[test]
200    fn test_with_async_fn_when_future_succeeds() {
201        async fn do_work() -> Result<u32, backoff::Error<()>> {
202            Ok(123)
203        }
204
205        let mut backoff = backoff::ExponentialBackoff::default();
206        let result: Result<u32, backoff::Error<()>> =
207            futures::executor::block_on(do_work.with_backoff(&mut backoff));
208        assert_eq!(result.ok(), Some(123));
209    }
210
211    #[test]
212    fn test_with_async_fn_when_future_fails_for_some_time() {
213        static mut CALL_COUNTER: usize = 0;
214        const CALLS_TO_SUCCESS: usize = 5;
215
216        use std::time::Duration;
217
218        async fn do_work() -> Result<u32, backoff::Error<()>> {
219            unsafe {
220                CALL_COUNTER += 1;
221                if CALL_COUNTER == CALLS_TO_SUCCESS {
222                    Ok(123)
223                } else {
224                    Err(backoff::Error::Transient(()))
225                }
226            }
227        };
228
229        let mut backoff = backoff::ExponentialBackoff::default();
230        backoff.current_interval = Duration::from_millis(1);
231        backoff.initial_interval = Duration::from_millis(1);
232
233        let mut notify_counter = 0;
234
235        let mut runtime = tokio::runtime::Runtime::new()
236            .expect("tokio runtime creation");
237
238        let result = runtime.block_on(do_work.with_backoff_notify(&mut backoff, |e, d| {
239            notify_counter += 1;
240            println!("Error {:?}, waiting for: {}", e, d.as_millis());
241        }));
242
243        unsafe {
244            assert_eq!(CALL_COUNTER, CALLS_TO_SUCCESS);
245        }
246        assert_eq!(CALLS_TO_SUCCESS, notify_counter + 1);
247        assert_eq!(result.ok(), Some(123));
248    }
249}