1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
//! An add-on to [`std::future::Future`] that makes it easy to introduce a retry mechanism
//! with a backoff for functions that produce failible futures,
//! i.e. futures where the `Output` type is some `Result<T, backoff::Error<E>>`.
//! The `backoff::Error` wrapper is necessary so as to distinguish errors that are considered
//! *transient*, and thus make it likely that a future attempt at producing and blocking on
//! the same future could just as well succeed (e.g. the HTTP 503 Service Unavailable error),
//! and errors that are considered *permanent*, where no future attempts are presumed to have
//! a chance to succeed (e.g. the HTTP 404 Not Found error).
//!
//! The extension trait integrates with the `backoff` crate and expects a [`backoff::backoff::Backoff`]
//! value to describe the various properties of the retry & backoff mechanism to be used.
//!
//! ```rust
//! #![feature(async_await)]
//!
//! fn isahc_error_to_backoff(err: isahc::Error) -> backoff::Error<isahc::Error> {
//!     match err {
//!         isahc::Error::Aborted | isahc::Error::Io(_) | isahc::Error::Timeout =>
//!             backoff::Error::Transient(err),
//!         _ =>
//!             backoff::Error::Permanent(err)
//!     }
//! }
//!
//! async fn get_example_contents() -> Result<String, backoff::Error<isahc::Error>> {
//!     use isahc::ResponseExt;
//!
//!     let mut response = isahc::get_async("https://example.org")
//!         .await
//!         .map_err(isahc_error_to_backoff)?;
//!
//!     response
//!         .text_async()
//!         .await
//!         .map_err(|err: std::io::Error| backoff::Error::Transient(isahc::Error::Io(err)))
//! }
//!
//! async fn get_example_contents_with_retry() -> Result<String, isahc::Error> {
//!     use backoff_futures::BackoffExt;
//!
//!     let mut backoff = backoff::ExponentialBackoff::default();
//!     get_example_contents.with_backoff(&mut backoff)
//!         .await
//!         .map_err(|err| match err {
//!             backoff::Error::Transient(err) | backoff::Error::Permanent(err) => err
//!         })
//! }
//! ```
//!
//! See [`BackoffExt::with_backoff`] for more details.

#![cfg_attr(test, feature(async_await))]

#[cfg(test)] #[macro_use] extern crate matches;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use backoff::backoff::{Backoff};

enum BackoffState<Fut> {
    Pending,
    Delay(tokio_timer::Delay),
    Work(Fut)
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct BackoffFuture<'b, Fut, B, F> {
    state: BackoffState<Fut>,
    backoff: &'b mut B,
    f: F
}

pub trait BackoffExt<Fut, B, F> {
    /// Returns a future that, when polled, will first ask `self` for a new future (with an output
    /// type `Result<T, backoff::Error<_>>` to produce the expected result.
    ///
    /// If the underlying future is ready with an `Err` value, the nature of the error
    /// (permanent/transient) will determine whether polling the future will employ the provided
    /// `backoff` strategy and will result in the the work being retried.
    ///
    /// Specifically, `backoff::Error::Permanent` errors will be returned immediately.
    /// [`backoff::Error::Transient`] errors will, depending on the particular [`backoff::backoff::Backoff`],
    /// result in a retry attempt, most likely with a delay.
    ///
    /// If the underlying future is ready with an `Ok` value, it will be returned immediately.
    fn with_backoff(self, backoff: &mut B) -> BackoffFuture<'_, Fut, B, F>;
}

impl<Fut, T, E, B, F> BackoffExt<Fut, B, F> for F
     where F: FnMut() -> Fut,
           Fut: Future<Output = Result<T, backoff::Error<E>>> {
    fn with_backoff(self, backoff: &mut B) -> BackoffFuture<'_, Fut, B, F> {
        BackoffFuture {
            f: self,
            state: BackoffState::Pending,
            backoff
        }
    }
}

impl<Fut, F, B, T, E> Future for BackoffFuture<'_, Fut, B, F>
    where Fut: Future<Output = Result<T, backoff::Error<E>>>,
          F: FnMut() -> Fut + Unpin,
          B: Backoff + Unpin
{
    type Output = Fut::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use tokio_timer::Delay;
        use std::time::Instant;

        // The loop will be passed at most twice.
        loop {
            match self.as_mut().state {
                BackoffState::Work(_) => {
                    let fut = unsafe {
                        self.as_mut().map_unchecked_mut(|s| match s.state {
                            BackoffState::Work(ref mut f) => f,
                            _ => unreachable!()
                        })
                    };
        
                    match fut.poll(cx) {
                        Poll::Pending => return Poll::Pending,

                        Poll::Ready(value) => match value {
                            Ok(_) =>
                                return Poll::Ready(value),

                            Err(backoff::Error::Permanent(_)) =>
                                return Poll::Ready(value),

                            Err(backoff::Error::Transient(_)) => unsafe {
                                let mut s = self.as_mut().get_unchecked_mut();
                                match s.backoff.next_backoff() {
                                    Some(next) => {
                                        let delay = Delay::new(Instant::now() + next);
                                        s.state = BackoffState::Delay(delay);
                                    }
                                    None =>
                                        return Poll::Ready(value)
                                }
                            }
                        }
                    }
                }

                BackoffState::Delay(ref delay) if !delay.is_elapsed() =>
                    return Poll::Pending,

                _ => unsafe {
                    let mut s = self.as_mut().get_unchecked_mut();
                    s.state = BackoffState::Work((s.f)());
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use futures::Future;
    use super::BackoffExt;

    #[test]
    fn test_when_future_succeeds() {
        fn do_work() -> impl Future<Output = Result<u32, backoff::Error<()>>> {
            futures::future::ready(Ok(123))
        }

        let mut backoff = backoff::ExponentialBackoff::default();
        let result: Result<u32, backoff::Error<()>> =
            futures::executor::block_on(do_work.with_backoff(&mut backoff));
        assert_eq!(result.ok(), Some(123));
    }

    #[test]
    fn test_with_closure_when_future_succeeds() {
        let do_work = || {
            futures::future::lazy(|_| Ok(123))
        };

        let mut backoff = backoff::ExponentialBackoff::default();
        let result: Result<u32, backoff::Error<()>> =
            futures::executor::block_on(do_work.with_backoff(&mut backoff));
        assert_eq!(result.ok(), Some(123));
    }

    #[test]
    fn test_with_closure_when_future_fails_with_permanent_error() {
        let do_work = || {
            let result = Err(backoff::Error::Permanent(()));
            futures::future::ready(result)
        };

        let mut backoff = backoff::ExponentialBackoff::default();
        let result: Result<u32, backoff::Error<()>> =
            futures::executor::block_on(do_work.with_backoff(&mut backoff));
        assert_matches!(result.err(), Some(backoff::Error::Permanent(_)));
    }

    #[test]
    fn test_with_async_fn_when_future_succeeds() {
        async fn do_work() -> Result<u32, backoff::Error<()>> {
            futures::future::ready(Ok(123)).await
        }

        let mut backoff = backoff::ExponentialBackoff::default();
        let result: Result<u32, backoff::Error<()>> =
            futures::executor::block_on(do_work.with_backoff(&mut backoff));
        assert_eq!(result.ok(), Some(123));
    }
}