maybe_backoff/
future.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5    time::Duration,
6};
7
8use futures_core::ready;
9use pin_project_lite::pin_project;
10
11use crate::{backoff::Backoff, error::Error};
12
13use crate::retry::{NoopNotify, Notify};
14
15pub trait Sleeper {
16    type Sleep: Future<Output = ()> + Send + 'static;
17    fn sleep(&self, dur: Duration) -> Self::Sleep;
18}
19
20/// Retries given `operation` according to the [`Backoff`] policy
21/// [`Backoff`] is reset before it is used.
22/// The returned future can be spawned onto a compatible runtime.
23///
24/// Only available through the `tokio` and `async-std` feature flags.
25///
26/// # Example
27///
28/// ```rust
29/// use maybe_backoff::ExponentialBackoff;
30///
31/// async fn f() -> Result<(), maybe_backoff::Error<&'static str>> {
32///     // Business logic...
33///     Err(maybe_backoff::Error::Permanent("error"))
34/// }
35///
36/// # async fn go() {
37/// maybe_backoff::future::retry(ExponentialBackoff::default(), f).await.err().unwrap();
38/// # }
39/// # fn main() { futures_executor::block_on(go()); }
40/// ```
41#[cfg(any(feature = "tokio", feature = "async-std"))]
42pub fn retry<I, E, Fn, Fut, B>(
43    backoff: B,
44    operation: Fn,
45) -> Retry<impl Sleeper, B, NoopNotify, Fn, Fut>
46where
47    B: Backoff,
48    Fn: FnMut() -> Fut,
49    Fut: Future<Output = Result<I, Error<E>>>,
50{
51    retry_notify(backoff, operation, NoopNotify)
52}
53
54/// Retries given `operation` according to the [`Backoff`] policy.
55/// Calls `notify` on failed attempts (in case of [`Error::Transient`]).
56/// [`Backoff`] is reset before it is used.
57/// The returned future can be spawned onto a compatible runtime.
58///
59/// Only available through the `tokio` and `async-std` feature flags.
60///
61/// # Async `notify`
62///
63/// `notify` can be neither `async fn` or [`Future`]. If you need to perform some async
64/// operations inside `notify`, consider using your runtimes task-spawning functionality.
65///
66/// The reason behind this is that [`Retry`] future cannot be responsible for polling
67/// `notify` future, because can easily be dropped _before_ `notify` is completed.
68/// So, considering the fact that most of the time no async operations are required in
69/// `notify`, it's up to the caller to decide how async `notify` should be performed.
70///
71/// # Example
72///
73/// ```rust
74/// use maybe_backoff::backoff::Stop;
75///
76/// async fn f() -> Result<(), maybe_backoff::Error<&'static str>> {
77///     // Business logic...
78///     Err(maybe_backoff::Error::transient("error"))
79/// }
80///
81/// # async fn go() {
82/// let err = maybe_backoff::future::retry_notify(Stop {}, f, |e, dur| {
83///     println!("Error happened at {:?}: {}", dur, e)
84/// })
85/// .await
86/// .err()
87/// .unwrap();
88/// assert_eq!(err, "error");
89/// # }
90/// # fn main() { futures_executor::block_on(go()); }
91/// ```
92#[cfg(any(feature = "tokio", feature = "async-std"))]
93pub fn retry_notify<I, E, Fn, Fut, B, N>(
94    mut backoff: B,
95    operation: Fn,
96    notify: N,
97) -> Retry<impl Sleeper, B, N, Fn, Fut>
98where
99    B: Backoff,
100    Fn: FnMut() -> Fut,
101    Fut: Future<Output = Result<I, Error<E>>>,
102    N: Notify<E>,
103{
104    backoff.reset();
105    Retry::new(rt_sleeper(), backoff, notify, operation)
106}
107
108pin_project! {
109    /// Retry implementation.
110    pub struct Retry<S: Sleeper, B, N, Fn, Fut> {
111        // The [`Sleeper`] that we generate the `delay` futures from.
112        sleeper: S,
113
114        // [`Backoff`] implementation to count next [`Retry::delay`] with.
115        backoff: B,
116
117        // [`Future`] which delays execution before next [`Retry::operation`] invocation.
118        #[pin]
119        delay: OptionPinned<S::Sleep>,
120
121        // Operation to be retried. It must return [`Future`].
122        operation: Fn,
123
124        // [`Future`] being resolved once [`Retry::operation`] is completed.
125        #[pin]
126        fut: Fut,
127
128        // [`Notify`] implementation to track [`Retry`] ticks.
129        notify: N,
130    }
131}
132
133impl<S, B, N, Fn, Fut, I, E> Retry<S, B, N, Fn, Fut>
134where
135    S: Sleeper,
136    Fn: FnMut() -> Fut,
137    Fut: Future<Output = Result<I, Error<E>>>,
138{
139    pub fn new(sleeper: S, backoff: B, notify: N, mut operation: Fn) -> Self {
140        let fut = operation();
141        Retry {
142            sleeper,
143            backoff,
144            delay: OptionPinned::None,
145            operation,
146            fut,
147            notify,
148        }
149    }
150}
151
152pin_project! {
153    #[project = OptionProj]
154    enum OptionPinned<T> {
155        Some {
156            #[pin]
157            inner: T,
158        },
159        None,
160    }
161}
162
163impl<S, B, N, Fn, Fut, I, E> Future for Retry<S, B, N, Fn, Fut>
164where
165    S: Sleeper,
166    B: Backoff,
167    N: Notify<E>,
168    Fn: FnMut() -> Fut,
169    Fut: Future<Output = Result<I, Error<E>>>,
170{
171    type Output = Result<I, E>;
172
173    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
174        let mut this = self.project();
175
176        loop {
177            if let OptionProj::Some { inner: delay } = this.delay.as_mut().project() {
178                ready!(delay.poll(cx));
179                this.delay.set(OptionPinned::None);
180            }
181
182            match ready!(this.fut.as_mut().poll(cx)) {
183                Ok(v) => return Poll::Ready(Ok(v)),
184                Err(Error::Permanent(e)) => return Poll::Ready(Err(e)),
185                Err(Error::Transient { err, retry_after }) => {
186                    match retry_after.or_else(|| this.backoff.next_backoff()) {
187                        Some(duration) => {
188                            this.notify.notify(err, duration);
189                            this.delay.set(OptionPinned::Some {
190                                inner: this.sleeper.sleep(duration),
191                            });
192                            this.fut.set((this.operation)());
193                        }
194                        None => return Poll::Ready(Err(err)),
195                    }
196                }
197            }
198        }
199    }
200}
201
202#[cfg(all(feature = "tokio", feature = "async-std"))]
203compile_error!("Feature \"tokio\" and \"async-std\" cannot be enabled at the same time");
204
205#[cfg(feature = "async-std")]
206fn rt_sleeper() -> impl Sleeper {
207    AsyncStdSleeper
208}
209
210#[cfg(feature = "tokio")]
211fn rt_sleeper() -> impl Sleeper {
212    TokioSleeper
213}
214
215#[cfg(feature = "tokio")]
216#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
217struct TokioSleeper;
218#[cfg(feature = "tokio")]
219#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
220impl Sleeper for TokioSleeper {
221    type Sleep = ::tokio_1::time::Sleep;
222    fn sleep(&self, dur: Duration) -> Self::Sleep {
223        ::tokio_1::time::sleep(dur)
224    }
225}
226
227#[cfg(feature = "async-std")]
228#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))]
229struct AsyncStdSleeper;
230
231#[cfg(feature = "async-std")]
232#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))]
233impl Sleeper for AsyncStdSleeper {
234    type Sleep = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
235    fn sleep(&self, dur: Duration) -> Self::Sleep {
236        Box::pin(::async_std_1::task::sleep(dur))
237    }
238}