#![no_std]
use core::future::Future;
use core::iter::{IntoIterator, Iterator};
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
use tokio::time::{sleep_until, Duration, Instant, Sleep};
pub mod strategy;
pin_project! {
#[project = RetryStateProj]
enum RetryState<A>
where
A: Action,
{
Running {
#[pin]
future: A::Future,
},
Sleeping {
#[pin]
future: Sleep,
},
}
}
impl<A: Action> RetryState<A> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> RetryFuturePoll<A> {
match self.project() {
RetryStateProj::Running { future } => RetryFuturePoll::Running(future.poll(cx)),
RetryStateProj::Sleeping { future } => RetryFuturePoll::Sleeping(future.poll(cx)),
}
}
}
enum RetryFuturePoll<A>
where
A: Action,
{
Running(Poll<Result<A::Item, A::Error>>),
Sleeping(Poll<()>),
}
pin_project! {
pub struct Retry<I, A>
where
I: Iterator<Item = Duration>,
A: Action,
{
#[pin]
retry_if: RetryIf<I, A, fn(&A::Error) -> bool>,
}
}
impl<I, A> Retry<I, A>
where
I: Iterator<Item = Duration>,
A: Action,
{
pub fn start<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Self {
Self {
retry_if: RetryIf::start(strategy, action, (|_| true) as fn(&A::Error) -> bool),
}
}
#[deprecated(since = "0.3.2", note = "renamed to `start()`")]
pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Self {
Self::start(strategy, action)
}
}
impl<I, A> Future for Retry<I, A>
where
I: Iterator<Item = Duration>,
A: Action,
{
type Output = Result<A::Item, A::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.retry_if.poll(cx)
}
}
pin_project! {
pub struct RetryIf<I, A, C>
where
I: Iterator<Item = Duration>,
A: Action,
C: Condition<A::Error>,
{
strategy: I,
#[pin]
state: RetryState<A>,
action: A,
condition: C,
}
}
impl<I, A, C> RetryIf<I, A, C>
where
I: Iterator<Item = Duration>,
A: Action,
C: Condition<A::Error>,
{
pub fn start<T: IntoIterator<IntoIter = I, Item = Duration>>(
strategy: T,
mut action: A,
condition: C,
) -> Self {
Self {
strategy: strategy.into_iter(),
state: RetryState::Running {
future: action.run(),
},
action,
condition,
}
}
#[deprecated(since = "0.3.2", note = "renamed to `start()`")]
pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
strategy: T,
action: A,
condition: C,
) -> Self {
Self::start(strategy, action, condition)
}
fn attempt(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<A::Item, A::Error>> {
let future = {
let this = self.as_mut().project();
this.action.run()
};
self.as_mut()
.project()
.state
.set(RetryState::Running { future });
self.poll(cx)
}
#[allow(clippy::type_complexity)]
fn retry(
mut self: Pin<&mut Self>,
err: A::Error,
cx: &mut Context<'_>,
) -> Result<Poll<Result<A::Item, A::Error>>, A::Error> {
match self.as_mut().project().strategy.next() {
None => Err(err),
Some(duration) => {
let deadline = Instant::now() + duration;
let future = sleep_until(deadline);
self.as_mut()
.project()
.state
.set(RetryState::Sleeping { future });
Ok(self.poll(cx))
}
}
}
}
impl<I, A, C> Future for RetryIf<I, A, C>
where
I: Iterator<Item = Duration>,
A: Action,
C: Condition<A::Error>,
{
type Output = Result<A::Item, A::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project().state.poll(cx) {
RetryFuturePoll::Running(poll_result) => match poll_result {
Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
if self.as_mut().project().condition.should_retry(&err) {
match self.retry(err, cx) {
Ok(poll) => poll,
Err(err) => Poll::Ready(Err(err)),
}
} else {
Poll::Ready(Err(err))
}
}
},
RetryFuturePoll::Sleeping(poll_result) => match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => self.attempt(cx),
},
}
}
}
pub trait Action {
type Future: Future<Output = Result<Self::Item, Self::Error>>;
type Item;
type Error;
fn run(&mut self) -> Self::Future;
}
impl<R, E, T: Future<Output = Result<R, E>>, F: FnMut() -> T> Action for F {
type Item = R;
type Error = E;
type Future = T;
fn run(&mut self) -> Self::Future {
self()
}
}
pub trait Condition<E> {
fn should_retry(&mut self, error: &E) -> bool;
}
impl<E, F: FnMut(&E) -> bool> Condition<E> for F {
fn should_retry(&mut self, error: &E) -> bool {
self(error)
}
}