1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use futures_timer::Delay;
use pin_project::pin_project;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

mod backoff;
pub use backoff::*;

/// Error returned by [`Retry`] when a failed attempt will not be retried.
///
/// This happens when the backoff scheduler yields no delay for a next attempt.
/// The error of the last failed attempt is not carried here; it is handed to
/// [`Retryable::report_error`] before the future resolves.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Cancelled;

impl std::fmt::Display for Cancelled {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("retry cancelled: no further attempts scheduled")
    }
}

// Lets callers propagate `Cancelled` with `?` into `Box<dyn Error>` and similar.
impl std::error::Error for Cancelled {}

/// Build a future that runs `task` repeatedly until one attempt succeeds.
///
/// After each failed attempt `scheduler` is asked for the delay before the next
/// one. When it yields no delay, the returned future resolves to
/// `Err(Cancelled)` instead of retrying again.
///
/// Nothing runs until the returned [`Retry`] is polled.
pub fn retry<R, S>(task: R, scheduler: S) -> Retry<R>
where
    R: Retryable,
    R::Error: std::fmt::Debug,
    S: Backoff + 'static,
{
    // Type-erase the scheduler so `Retry` is generic over the task only.
    let scheduler: Box<dyn Backoff> = Box::new(scheduler);

    Retry {
        state: RetryState::Pending,
        retryable: task,
        scheduler,
        trying_fut: None,
        waiting_fut: None,
    }
}

/// Retryable must be implemented for a task that can be retried any number of times.
///
/// Every attempt is a fresh future obtained from `call`. All errors will be reported
/// with `report_error`. The default implementation will report the error with
/// `tracing::error!()`.
pub trait Retryable {
    /// Value produced by a successful attempt.
    type Item;
    /// Error produced by a failed attempt. It must be `Debug` so that the default
    /// `report_error` can log it.
    type Error: std::fmt::Debug;
    /// Future driving a single attempt.
    type Future: Future<Output = Result<Self::Item, Self::Error>>;

    /// Set up a new attempt at completing the task.
    fn call(&self) -> Self::Future;

    /// Report the error of the last attempt to complete the task.
    ///
    /// `next_retry` is the delay before the next attempt, or `None` when no
    /// further attempt is scheduled.
    fn report_error(&self, error: &Self::Error, next_retry: Option<Duration>) {
        // `next_retry` is logged with `{:?}`, so it prints as `Some(..)` or `None`.
        tracing::error!(
            "error after retry: {:?} (will retry in {:?})",
            error,
            next_retry
        );
    }
}

/// Future returned by [`retry`].
///
/// It resolves to `Ok(item)` as soon as one attempt succeeds, or to
/// `Err(Cancelled)` when an attempt fails and the scheduler yields no delay for
/// another one.
#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Retry<R>
where
    R: Retryable,
    R::Error: std::fmt::Debug,
{
    /// The task; `call` is invoked on it once per attempt.
    retryable: R,
    /// Decides how long to wait before the next attempt, if any.
    scheduler: Box<dyn Backoff>,
    /// Which step of the attempt/wait cycle the future is in.
    state: RetryState,

    /// The attempt currently being polled. Set when an attempt starts and
    /// cleared when a failed attempt is followed by a wait.
    #[pin]
    trying_fut: Option<R::Future>,

    /// The timer for the pause between two attempts. Set when a wait starts
    /// and cleared when the next attempt starts.
    #[pin]
    waiting_fut: Option<Delay>,
}

/// Step of the attempt/wait cycle a `Retry` future is currently in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetryState {
    /// No attempt is in flight; the next poll starts a new one.
    Pending,
    /// An attempt future exists and is being polled.
    Trying,
    /// The last attempt failed and the delay before the next one is running.
    Waiting,
}

/// Drives the attempt/wait cycle: start an attempt, poll it, and on failure
/// either wait for the scheduled delay and start over, or give up.
impl<R> Future for Retry<R>
where
    R: Retryable,
    R::Error: std::fmt::Debug,
{
    type Output = Result<R::Item, Cancelled>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Keep stepping until an inner future is pending or the retry is finished.
        // Looping after a state change polls the new inner future right away.
        loop {
            match *this.state {
                RetryState::Pending => {
                    // Drop any finished timer, then start a fresh attempt.
                    this.waiting_fut.set(None);
                    let attempt = this.retryable.call();
                    this.trying_fut.set(Some(attempt));
                    *this.state = RetryState::Trying;
                }
                RetryState::Trying => {
                    // `trying_fut` is always populated before the state becomes `Trying`.
                    let attempt = this.trying_fut.as_mut().as_pin_mut().unwrap();
                    let failure = match attempt.poll(ctx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Ok(item)) => return Poll::Ready(Ok(item)),
                        Poll::Ready(Err(failure)) => failure,
                    };

                    // Ask for the next delay first so the report can include it.
                    let delay = this.scheduler.next_retry();
                    this.retryable.report_error(&failure, delay);

                    if let Some(delay) = delay {
                        this.trying_fut.set(None);
                        this.waiting_fut.set(Some(Delay::new(delay)));
                        *this.state = RetryState::Waiting;
                    } else {
                        // The scheduler has no further attempt to offer.
                        return Poll::Ready(Err(Cancelled));
                    }
                }
                RetryState::Waiting => {
                    // `waiting_fut` is always populated before the state becomes `Waiting`.
                    let timer = this.waiting_fut.as_mut().as_pin_mut().unwrap();
                    if timer.poll(ctx).is_pending() {
                        return Poll::Pending;
                    }
                    *this.state = RetryState::Pending;
                }
            }
        }
    }
}

/// Any zero-argument `Fn` that returns a fallible future is retryable: each
/// attempt simply invokes it again. Errors are reported through the default
/// `report_error`.
impl<F, Fut, I, E> Retryable for F
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<I, E>>,
    E: std::fmt::Debug,
{
    type Item = I;
    type Error = E;
    type Future = Fut;

    fn call(&self) -> Fut {
        (self)()
    }
}

#[cfg(test)]
mod tests {
    // Placeholder test: it only confirms that the test harness builds and runs.
    #[test]
    fn it_works() {
        let sum = 2 + 2;
        assert_eq!(sum, 4);
    }
}