use std::future::{ready, Ready};
use crate::Policy;
pub fn concurrent() -> ConcurrentRetryPolicy {
ConcurrentRetryPolicy
}
#[derive(Debug, Clone, Copy)]
pub struct ConcurrentRetryPolicy;
impl<T, E> Policy<T, E> for ConcurrentRetryPolicy {
type ForceRetryFuture = Ready<()>;
type RetryFuture = Ready<Self>;
fn force_retry_after(&self) -> Self::ForceRetryFuture {
ready(())
}
fn retry(self, _result: Option<Result<&T, &E>>) -> Option<Self::RetryFuture> {
Some(ready(Self))
}
}
#[cfg(any(feature = "tokio", feature = "async-std"))]
mod _interval {
use super::*;
use std::time::Duration;
pub fn interval(force_retry_after: Duration) -> IntervalRetryPolicy {
IntervalRetryPolicy { force_retry_after }
}
#[derive(Debug, Clone, Copy)]
pub struct IntervalRetryPolicy {
force_retry_after: Duration,
}
impl<T, E> Policy<T, E> for IntervalRetryPolicy {
type ForceRetryFuture = crate::sleep::Sleep;
type RetryFuture = Ready<Self>;
fn force_retry_after(&self) -> Self::ForceRetryFuture {
crate::sleep::sleep(self.force_retry_after)
}
fn retry(self, _result: Option<Result<&T, &E>>) -> Option<Self::RetryFuture> {
Some(ready(self))
}
}
}
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub use _interval::*;
#[cfg(test)]
mod test {
use std::sync::{Arc, Mutex};
use crate::policies::attempts;
use crate::retry;
use crate::tests::run_test;
use std::future::pending;
mod concurrent {
use crate::policies::concurrent::concurrent;
use super::*;
#[test]
fn should_run_all_non_ready_futures() {
run_test(async {
let call_count = Arc::new(Mutex::new(0));
let create_fut = || async {
let call = {
let mut mutex_guard = call_count.lock().unwrap();
*mutex_guard += 1;
*mutex_guard
};
if call == 3 {
crate::tests::yield_now().await;
crate::tests::yield_now().await;
crate::tests::yield_now().await;
Ok::<(), ()>(())
} else {
pending().await
}
};
let result = retry(create_fut, attempts(concurrent(), 4)).await;
let guard = call_count.lock().unwrap();
assert_eq!(*guard, 4);
assert!(result.is_ok());
})
}
#[test]
fn should_run_futures_till_ready_one() {
run_test(async {
let call_count = Arc::new(Mutex::new(0));
let create_fut = || async {
let call = {
let mut mutex_guard = call_count.lock().unwrap();
*mutex_guard += 1;
*mutex_guard
};
if call == 2 {
Ok::<(), ()>(())
} else {
pending().await
}
};
let result = retry(create_fut, attempts(concurrent(), 5)).await;
let guard = call_count.lock().unwrap();
assert_eq!(*guard, 2);
assert!(result.is_ok());
})
}
}
#[cfg(any(feature = "tokio", feature = "async-std"))]
mod delayed_concurrent {
use std::time::{Duration, Instant};
use crate::policies::concurrent::interval;
use super::*;
#[test]
fn should_return_last_error_when_all_attempts() {
run_test(async {
let call_count = Arc::new(Mutex::new(0));
let create_fut = || async {
crate::tests::yield_now().await;
let mut mutex_guard = call_count.lock().unwrap();
*mutex_guard += 1;
if *mutex_guard == 1 {
Err::<(), _>(*mutex_guard)
} else {
Err(*mutex_guard)
}
};
let result = retry(
create_fut,
attempts(interval(Duration::from_secs(10000)), 2),
)
.await;
let guard = call_count.lock().unwrap();
assert_eq!(*guard, 3);
assert_eq!(result, Err(3));
})
}
#[test]
fn should_retry_after_delay() {
run_test(async {
let call_count = Arc::new(Mutex::new(0));
let create_fut = || async {
crate::tests::yield_now().await;
let call_count = {
let mut mutex_guard = call_count.lock().unwrap();
*mutex_guard += 1;
*mutex_guard
};
if call_count == 1 {
pending::<Result<(), ()>>().await
} else {
Ok(())
}
};
let now = Instant::now();
let result =
retry(create_fut, attempts(interval(Duration::from_millis(50)), 2)).await;
let guard = call_count.lock().unwrap();
assert_eq!(*guard, 2);
assert!(now.elapsed() >= Duration::from_millis(50));
assert!(result.is_ok());
})
}
}
}