use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures_core::ready;
use pin_project::pin_project;
use crate::{backoff::Backoff, error::Error};
use crate::retry::{NoopNotify, Notify};
/// Abstraction over a timer: given a duration, it hands back a future to await.
///
/// `Retry` stores the returned future in its `delay` slot and polls it to
/// completion before polling the operation's future again.
pub trait Sleeper {
/// Future returned by `sleep`; it resolves to `()` and must be `Send + 'static`.
type Sleep: Future<Output = ()> + Send + 'static;
/// Creates the delay future for the requested duration `dur`.
// NOTE(review): implementations are presumably expected to complete once `dur`
// has elapsed; the trait signature alone cannot enforce that.
fn sleep(&self, dur: Duration) -> Self::Sleep;
}
/// Retries `operation` according to the `backoff` policy.
///
/// Thin wrapper around [`retry_notify`] that passes [`NoopNotify`] as the
/// notifier. Only compiled when the `tokio` or `async-std` feature is enabled.
///
/// The returned [`Retry`] future resolves to `Ok` with the operation's value, or
/// to `Err` with the unwrapped error once retrying stops.
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub fn retry<I, E, Fn, Fut, B>(
    backoff: B,
    operation: Fn,
) -> Retry<impl Sleeper, B, NoopNotify, Fn, Fut>
where
    B: Backoff,
    Fn: FnMut() -> Fut,
    Fut: Future<Output = Result<I, Error<E>>>,
{
    let notifier = NoopNotify;
    retry_notify(backoff, operation, notifier)
}
/// Retries `operation` according to the `backoff` policy, passing every retried
/// error to `notify` together with the delay chosen for it.
///
/// `backoff` is reset before it is used. Note that `operation` is invoked once
/// immediately, because `Retry::new` calls it to obtain the first future.
/// Only compiled when the `tokio` or `async-std` feature is enabled.
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub fn retry_notify<I, E, Fn, Fut, B, N>(
    mut backoff: B,
    operation: Fn,
    notify: N,
) -> Retry<impl Sleeper, B, N, Fn, Fut>
where
    B: Backoff,
    Fn: FnMut() -> Fut,
    Fut: Future<Output = Result<I, Error<E>>>,
    N: Notify<E>,
{
    // Start from a clean policy state in case the caller reuses a backoff value.
    backoff.reset();
    let sleeper = rt_sleeper();
    Retry::new(sleeper, backoff, notify, operation)
}
/// Future that repeatedly runs an operation until it succeeds, fails
/// permanently, or the backoff policy stops yielding delays.
#[pin_project]
pub struct Retry<S: Sleeper, B, N, Fn, Fut> {
/// Produces the delay futures awaited between attempts.
sleeper: S,
/// Policy queried for the next delay after a transient error.
backoff: B,
/// Delay currently being awaited, or `None` when no wait is in progress.
#[pin]
delay: OptionPinned<S::Sleep>,
/// Factory called to create a fresh future for each attempt.
operation: Fn,
/// Future of the attempt that is polled once any pending delay has finished.
#[pin]
fut: Fut,
/// Receives each transient error along with the delay selected for it.
notify: N,
}
impl<S, B, N, Fn, Fut, I, E> Retry<S, B, N, Fn, Fut>
where
    S: Sleeper,
    Fn: FnMut() -> Fut,
    Fut: Future<Output = Result<I, Error<E>>>,
{
    /// Builds a `Retry` from its parts.
    ///
    /// `operation` is called once right away to create the future of the first
    /// attempt, and no delay is scheduled initially. Unlike `retry_notify`, this
    /// constructor does not reset `backoff`.
    pub fn new(sleeper: S, backoff: B, notify: N, mut operation: Fn) -> Self {
        let first_attempt = operation();
        Self {
            fut: first_attempt,
            delay: OptionPinned::None,
            sleeper,
            backoff,
            notify,
            operation,
        }
    }
}
/// `Option`-like container whose payload is structurally pinned, so the
/// contained value can be polled in place through the `OptionProj` projection.
#[pin_project(project = OptionProj)]
enum OptionPinned<T> {
/// A value is present; `#[pin]` makes the projection expose it as pinned.
Some(#[pin] T),
/// No value is present.
None,
}
impl<S, B, N, Fn, Fut, I, E> Future for Retry<S, B, N, Fn, Fut>
where
S: Sleeper,
B: Backoff,
N: Notify<E>,
Fn: FnMut() -> Fut,
Fut: Future<Output = Result<I, Error<E>>>,
{
type Output = Result<I, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
if let OptionProj::Some(delay) = this.delay.as_mut().project() {
ready!(delay.poll(cx));
this.delay.set(OptionPinned::None);
}
match ready!(this.fut.as_mut().poll(cx)) {
Ok(v) => return Poll::Ready(Ok(v)),
Err(Error::Permanent(e)) => return Poll::Ready(Err(e)),
Err(Error::Transient(e)) => match this.backoff.next_backoff() {
Some(duration) => {
this.notify.notify(e, duration);
this.delay
.set(OptionPinned::Some(this.sleeper.sleep(duration)));
this.fut.set((this.operation)());
}
None => return Poll::Ready(Err(e)),
},
}
}
}
}
// Each runtime feature provides its own `rt_sleeper` definition, so enabling
// both at once is rejected here with an explicit message.
#[cfg(all(feature = "tokio", feature = "async-std"))]
compile_error!("Feature \"tokio\" and \"async-std\" cannot be enabled at the same time");
/// Returns the sleeper used by `retry_notify` when the `async-std` feature is enabled.
#[cfg(feature = "async-std")]
fn rt_sleeper() -> impl Sleeper {
AsyncStdSleeper
}
/// Returns the sleeper used by `retry_notify` when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
fn rt_sleeper() -> impl Sleeper {
TokioSleeper
}
/// `Sleeper` implementation that delegates to `::tokio_1::time::sleep`.
// NOTE(review): the returned future presumably has to be polled inside a Tokio
// runtime with the time driver enabled; confirm against the Tokio docs.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
struct TokioSleeper;
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl Sleeper for TokioSleeper {
/// The timer future type returned by `::tokio_1::time::sleep`.
type Sleep = ::tokio_1::time::Sleep;
/// Creates a Tokio timer future for `dur`.
fn sleep(&self, dur: Duration) -> Self::Sleep {
::tokio_1::time::sleep(dur)
}
}
/// `Sleeper` implementation that delegates to `::async_std_1::task::sleep`.
#[cfg(feature = "async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))]
struct AsyncStdSleeper;

#[cfg(feature = "async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))]
impl Sleeper for AsyncStdSleeper {
    /// Boxed, type-erased timer future; boxing gives the associated type a
    /// nameable form.
    type Sleep = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Creates an async-std timer future for `dur` and pins it on the heap.
    fn sleep(&self, dur: Duration) -> Self::Sleep {
        let timer = ::async_std_1::task::sleep(dur);
        Box::pin(timer)
    }
}