use std::future::{pending, ready, Pending, Ready};
use crate::Policy;
pub fn sequential() -> SequentialRetryPolicy {
SequentialRetryPolicy
}
#[derive(Debug, Clone, Copy)]
pub struct SequentialRetryPolicy;
impl<T, E> Policy<T, E> for SequentialRetryPolicy {
type ForceRetryFuture = Pending<()>;
type RetryFuture = Ready<Self>;
fn force_retry_after(&self) -> Self::ForceRetryFuture {
pending()
}
fn retry(self, _result: Option<Result<&T, &E>>) -> Option<Self::RetryFuture> {
Some(ready(self))
}
}
#[cfg(any(feature = "tokio", feature = "async-std"))]
mod retry_backoff {
use super::*;
use pin_project_lite::pin_project;
use std::time::Duration;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
pub fn backoff<I>(backoff: I) -> BackoffRetry<I> {
BackoffRetry { backoff }
}
#[derive(Debug, Clone, Copy)]
pub struct BackoffRetry<I> {
backoff: I,
}
impl<I, T, E> Policy<T, E> for BackoffRetry<I>
where
I: Iterator<Item = Duration>,
{
type ForceRetryFuture = Pending<()>;
type RetryFuture = SeqDelay<I>;
fn force_retry_after(&self) -> Self::ForceRetryFuture {
pending()
}
fn retry(mut self, _result: Option<Result<&T, &E>>) -> Option<Self::RetryFuture> {
let delay = self.backoff.next().map(crate::sleep::sleep)?;
Some(SeqDelay {
backoff: Some(self.backoff),
delay,
})
}
}
pin_project! {
pub struct SeqDelay<I>
{
backoff: Option<I>,
#[pin]
delay: crate::sleep::Sleep,
}
}
impl<I> Future for SeqDelay<I>
where
I: Iterator<Item = Duration>,
{
type Output = BackoffRetry<I>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.delay.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(backoff(
this.backoff.take().expect("SeqDelay Backoff must be some"),
)),
}
}
}
}
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub use retry_backoff::*;
#[cfg(all(any(feature = "tokio", feature = "async-std"), test))]
mod tests {
use std::sync::{Arc, Mutex};
use crate::policies::attempts;
use crate::retry;
use crate::tests::run_test;
mod retry_backoff {
use std::time::{Duration, Instant};
use crate::{backoff::exponential, policies::sequential::backoff};
use super::*;
#[test]
fn should_run_next_after_backoff() {
run_test(async {
let create_fut = || async {
crate::tests::yield_now().await;
Err::<(), ()>(())
};
let now = Instant::now();
let policy = backoff(exponential(
Duration::from_millis(50),
2,
Some(Duration::from_secs(1)),
));
let result = retry(create_fut, attempts(policy, 2)).await;
assert!(now.elapsed() > Duration::from_millis(150));
assert!(result.is_err());
})
}
#[test]
fn should_stop_retrying_when_backoff_iter_exhausted() {
run_test(async {
let create_fut = || async {
crate::tests::yield_now().await;
Err::<(), ()>(())
};
let policy = backoff(std::iter::empty());
let result = retry(create_fut, policy).await;
assert!(result.is_err())
})
}
}
mod attempts {
use std::time::Duration;
use crate::policies::sequential::sequential;
use super::*;
#[test]
fn should_run_futures_sequentially() {
run_test(async {
let call_count = Arc::new(Mutex::new(0));
let pass_allowed = Arc::new(Mutex::new(false));
let create_fut = {
let call_count = call_count.clone();
let pass_allowed = pass_allowed.clone();
move || {
let call_count = call_count.clone();
let pass_allowed = pass_allowed.clone();
async move {
{
let mut mutex_guard = call_count.lock().unwrap();
*mutex_guard += 1;
}
loop {
{
if *pass_allowed.lock().unwrap() {
break;
}
}
crate::tests::yield_now().await
}
Err::<(), ()>(())
}
}
};
crate::tests::spawn(
async move { retry(create_fut, attempts(sequential(), 2)).await },
);
crate::sleep::sleep(Duration::from_millis(10)).await;
{
let guard = call_count.lock().unwrap();
assert_eq!(*guard, 1);
}
*pass_allowed.lock().unwrap() = true;
crate::sleep::sleep(Duration::from_millis(10)).await;
let guard = call_count.lock().unwrap();
assert_eq!(*guard, 3);
})
}
}
}