#![cfg(feature = "std")]
use core::cell::Cell;
use core::time::Duration;
use std::cell::RefCell;
use std::rc::Rc;
use relentless::{
AsyncRetryExt, RetryError, RetryExt, RetryPolicy, RetryState, Stop, StopReason, Wait,
predicate, stop, wait,
};
// Attempt budget passed to `stop::attempts` by most tests; the default-policy
// tests also expect exactly this many attempts.
const MAX_ATTEMPTS: u32 = 3;
// Value returned by operations that succeed.
const SUCCESS_VALUE: i32 = 42;
// Error payload returned by operations that fail.
const ERROR_VALUE: &str = "fail";
// Short delay (1 ms) used by tests that configure an explicit wait.
const WAIT_DURATION: Duration = Duration::from_millis(1);
// First delay the default-policy tests expect to be handed to the sleeper.
const DEFAULT_INITIAL_WAIT: Duration = Duration::from_millis(100);
// Second delay the default-policy tests expect to be handed to the sleeper.
const DEFAULT_SECOND_WAIT: Duration = Duration::from_millis(200);
// Complete sleep sequence expected when neither stop nor wait is overridden.
const DEFAULT_WAIT_SEQUENCE: [Duration; 2] = [DEFAULT_INITIAL_WAIT, DEFAULT_SECOND_WAIT];
// Consultation count at which `StatefulStop` starts answering "stop".
const STATEFUL_STOP_THRESHOLD: u32 = 2;
// Smallest `Ok` value accepted by the predicate in the `until` tests.
const UNTIL_TARGET: u32 = 3;
// Milliseconds the fake clock advances on every attempt in the timeout tests.
const CLOCK_STEP_MILLIS: u64 = 100;
// Deadline passed to `.timeout(..)` in the timeout tests; shorter than one clock step.
const TIMEOUT_DEADLINE: Duration = Duration::from_millis(50);
/// Sleep callback that returns immediately, so retry loops never block the test thread.
fn instant_sleep(_dur: Duration) {
    // Intentionally empty: the requested delay is discarded.
}
struct StatefulStop {
consultations: Cell<u32>,
threshold: u32,
}
impl Stop for StatefulStop {
fn should_stop(&self, _state: &relentless::RetryState) -> bool {
let next = self.consultations.get().saturating_add(1);
self.consultations.set(next);
next >= self.threshold
}
}
struct StatefulWait {
calls: Cell<u32>,
}
impl Wait for StatefulWait {
fn next_wait(&self, _state: &relentless::RetryState) -> Duration {
let next = self.calls.get().saturating_add(1);
self.calls.set(next);
Duration::from_millis(u64::from(next))
}
}
/// A closure that fails until its `MAX_ATTEMPTS`-th call is retried through
/// `.retry()` until it succeeds, using exactly `MAX_ATTEMPTS` calls.
#[test]
fn retry_ext_closure_form_retries_until_success() {
    let calls = Rc::new(Cell::new(0_u32));
    let calls_in_op = Rc::clone(&calls);

    // Counts every invocation and only succeeds once the budget is reached.
    let flaky = move || {
        let made = calls_in_op.get().saturating_add(1);
        calls_in_op.set(made);
        if made >= MAX_ATTEMPTS {
            Ok(SUCCESS_VALUE)
        } else {
            Err(ERROR_VALUE)
        }
    };

    let outcome: Result<i32, RetryError<i32, &str>> = flaky
        .retry()
        .stop(stop::attempts(MAX_ATTEMPTS))
        .wait(wait::fixed(WAIT_DURATION))
        .sleep(|_dur| {})
        .call();

    assert_eq!(outcome, Ok(SUCCESS_VALUE));
    assert_eq!(calls.get(), MAX_ATTEMPTS);
}
/// Always-successful operation kept as a plain `fn` item so tests can exercise the
/// function-item and `fn() -> Result<..>` pointer forms of the retry API.
// The `Result` wrapper is deliberate: the tests need a `fn() -> Result<i32, &'static str>`.
#[allow(clippy::unnecessary_wraps)]
fn do_work() -> Result<i32, &'static str> {
    let value = SUCCESS_VALUE;
    Ok(value)
}
/// `.retry()` is callable directly on a named function, not only on closures.
#[test]
fn retry_ext_function_pointer_form_works() {
    let outcome = do_work.retry().call();

    assert_eq!(outcome, Ok(SUCCESS_VALUE));
}
/// The `DefaultSyncRetryBuilder` alias can be written out as a binding's type and
/// the resulting builder still runs.
#[test]
fn default_sync_retry_builder_alias_is_nameable() {
    type SyncWorkFn = fn() -> Result<i32, &'static str>;
    type Builder = relentless::DefaultSyncRetryBuilder<SyncWorkFn, i32, &'static str>;

    // Coerce the fn item to a pointer so the builder's type can be spelled out.
    let work: SyncWorkFn = do_work;
    let builder: Builder = work.retry();

    assert_eq!(builder.call(), Ok(SUCCESS_VALUE));
}
/// The `DefaultSyncRetryBuilderWithStats` alias can be written out as a binding's
/// type; running it yields both the result and the attempt statistics.
#[test]
fn default_sync_retry_builder_with_stats_alias_is_nameable() {
    type SyncWorkFn = fn() -> Result<i32, &'static str>;
    type SleepFn = fn(Duration);
    type Builder =
        relentless::DefaultSyncRetryBuilderWithStats<SyncWorkFn, SleepFn, i32, &'static str>;

    // Both callbacks are coerced to fn pointers so every type parameter is nameable.
    let work: SyncWorkFn = do_work;
    let sleeper: SleepFn = instant_sleep;
    let builder: Builder = work.retry().sleep(sleeper).with_stats();

    let (outcome, stats) = builder.call();
    assert_eq!(outcome, Ok(SUCCESS_VALUE));
    assert_eq!(stats.attempts, 1);
}
/// With no stop or wait override, an always-failing operation is attempted
/// `MAX_ATTEMPTS` times and the sleeper receives `DEFAULT_WAIT_SEQUENCE`.
#[test]
fn retry_ext_uses_default_policy_when_not_overridden() {
    let calls = Rc::new(Cell::new(0_u32));
    let recorded_sleeps = Rc::new(RefCell::new(Vec::new()));

    let outcome = {
        // Shadowed handles are moved into the closures; the outer ones stay readable.
        let calls = Rc::clone(&calls);
        let recorded_sleeps = Rc::clone(&recorded_sleeps);
        (move || {
            calls.set(calls.get().saturating_add(1));
            Err::<i32, &str>(ERROR_VALUE)
        })
        .retry()
        .sleep(move |dur| recorded_sleeps.borrow_mut().push(dur))
        .call()
    };

    assert!(matches!(outcome, Err(RetryError::Exhausted { .. })));
    assert_eq!(calls.get(), MAX_ATTEMPTS);
    assert_eq!(*recorded_sleeps.borrow(), DEFAULT_WAIT_SEQUENCE);
}
/// `.with_stats()` makes `.call()` return the statistics alongside the result, and
/// the reported attempt count matches the number of real invocations.
#[test]
fn retry_ext_with_stats_reports_attempts() {
    let calls = Rc::new(Cell::new(0_u32));
    let calls_in_op = Rc::clone(&calls);

    let always_fails = move || {
        calls_in_op.set(calls_in_op.get().saturating_add(1));
        Err(ERROR_VALUE)
    };

    let (outcome, stats): (Result<i32, RetryError<i32, &str>>, relentless::RetryStats) =
        always_fails
            .retry()
            .stop(stop::attempts(2))
            .sleep(|_dur| {})
            .with_stats()
            .call();

    assert!(matches!(outcome, Err(RetryError::Exhausted { .. })));
    assert_eq!(stats.attempts, 2);
    assert_eq!(calls.get(), 2);
}
/// Stop and wait strategies that keep interior state (`Cell` counters) work with
/// the sync builder: the run ends when `StatefulStop` reaches its threshold and the
/// sleeper sees the delay produced by `StatefulWait`.
#[test]
fn retry_ext_stateful_stop_and_wait_work() {
    let attempts = Rc::new(Cell::new(0_u32));
    let attempts_ref = Rc::clone(&attempts);
    let sleeps = Rc::new(RefCell::new(Vec::new()));
    let sleeps_ref = Rc::clone(&sleeps);

    let result = (move || {
        attempts_ref.set(attempts_ref.get().saturating_add(1));
        Err::<i32, &str>(ERROR_VALUE)
    })
    .retry()
    .stop(StatefulStop {
        consultations: Cell::new(0),
        threshold: STATEFUL_STOP_THRESHOLD,
    })
    .wait(StatefulWait {
        calls: Cell::new(0),
    })
    .sleep(move |dur| sleeps_ref.borrow_mut().push(dur))
    .call();

    assert!(matches!(result, Err(RetryError::Exhausted { .. })));
    assert_eq!(attempts.get(), STATEFUL_STOP_THRESHOLD);
    // `StatefulWait` yields N ms on its N-th call, so the single recorded sleep is
    // 1 ms. It is spelled out literally because the value comes from `StatefulWait`,
    // not from the unrelated `WAIT_DURATION` constant.
    assert_eq!(*sleeps.borrow(), vec![Duration::from_millis(1)]);
}
/// Hooks registered on the sync builder fire where expected: `before_attempt` and
/// `after_attempt` once per attempt (the last attempt has no next delay) and
/// `on_exit` exactly once with the final attempt number and stop reason.
#[test]
fn retry_ext_hooks_match_policy_hook_points() {
    let before_log: RefCell<Vec<u32>> = RefCell::new(Vec::new());
    let after_log: RefCell<Vec<(u32, Option<Duration>)>> = RefCell::new(Vec::new());
    let exit_log: RefCell<Vec<(u32, bool, StopReason)>> = RefCell::new(Vec::new());

    // The outcome is irrelevant here; only the hook invocations are inspected.
    let _ = (|| Err::<i32, _>(ERROR_VALUE))
        .retry()
        .stop(stop::attempts(MAX_ATTEMPTS))
        .wait(wait::fixed(WAIT_DURATION))
        .before_attempt(|state| before_log.borrow_mut().push(state.attempt))
        .after_attempt(|attempt: &relentless::AttemptState<i32, &str>| {
            let entry = (attempt.attempt, attempt.next_delay);
            after_log.borrow_mut().push(entry);
        })
        .on_exit(|exit: &relentless::ExitState<i32, &str>| {
            let entry = (exit.attempt, exit.outcome.is_err(), exit.stop_reason);
            exit_log.borrow_mut().push(entry);
        })
        .sleep(instant_sleep)
        .call();

    assert_eq!(*before_log.borrow(), vec![1, 2, 3]);
    assert_eq!(
        *after_log.borrow(),
        vec![
            (1, Some(WAIT_DURATION)),
            (2, Some(WAIT_DURATION)),
            (3, None),
        ]
    );
    assert_eq!(
        *exit_log.borrow(),
        vec![(MAX_ATTEMPTS, true, StopReason::Exhausted)]
    );
}
/// When the retry predicate keeps matching an `Ok` value, running out of attempts
/// reports `Exhausted` and carries that last `Ok` value.
#[test]
fn retry_ext_condition_not_met_for_ok_exhaustion() {
    // Every `Ok` value is treated as "retry again".
    let retry_every_ok = predicate::ok(|_value: &i32| true);

    let outcome = (|| Ok::<i32, &str>(-1))
        .retry()
        .stop(stop::attempts(2))
        .when(retry_every_ok)
        .sleep(instant_sleep)
        .call();

    assert!(matches!(
        outcome,
        Err(RetryError::Exhausted { last: Ok(-1), .. })
    ));
}
/// An error that the `when` predicate does not accept ends the run with
/// `Rejected`, carrying the offending error.
#[test]
fn retry_ext_non_retryable_error_returns_immediately() {
    // Only the literal "retryable" qualifies; `ERROR_VALUE` does not.
    let only_retryable = predicate::error(|err: &&str| *err == "retryable");

    let outcome = (|| Err::<i32, &str>(ERROR_VALUE))
        .retry()
        .stop(stop::attempts(MAX_ATTEMPTS))
        .when(only_retryable)
        .sleep(instant_sleep)
        .call();

    assert!(matches!(
        outcome,
        Err(RetryError::Rejected {
            last: ERROR_VALUE,
            ..
        })
    ));
}
/// `.until(..)` keeps retrying an `Ok`-returning operation until the predicate
/// accepts the value; here that happens on attempt `UNTIL_TARGET`.
#[test]
fn retry_ext_until_retries_until_predicate_fires() {
    let calls = Rc::new(Cell::new(0_u32));
    let calls_in_op = Rc::clone(&calls);

    // Returns its own 1-based call number as the `Ok` value.
    let counting_op = move || {
        let made = calls_in_op.get().saturating_add(1);
        calls_in_op.set(made);
        Ok::<u32, &str>(made)
    };

    let outcome = counting_op
        .retry()
        .stop(stop::attempts(5))
        .until(predicate::ok(move |v: &u32| *v >= UNTIL_TARGET))
        .sleep(instant_sleep)
        .call();

    assert_eq!(outcome, Ok(UNTIL_TARGET));
    assert_eq!(calls.get(), UNTIL_TARGET);
}
/// A `.timeout(..)` deadline ends the run long before the attempt budget does.
///
/// The fake clock advances `CLOCK_STEP_MILLIS` per attempt, which already exceeds
/// `TIMEOUT_DEADLINE` after the first attempt.
#[test]
fn retry_ext_timeout_stops_on_deadline() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Deliberately generous: if the deadline were ignored, the run would only end
    // after this many attempts (and would still report `Exhausted`).
    const ATTEMPT_BUDGET: u32 = 100;

    let clock_millis = Arc::new(AtomicU64::new(0));
    let clock_ref = Arc::clone(&clock_millis);
    let result = (|| {
        clock_ref.fetch_add(CLOCK_STEP_MILLIS, Ordering::Relaxed);
        Err::<i32, &str>(ERROR_VALUE)
    })
    .retry()
    .stop(stop::attempts(ATTEMPT_BUDGET))
    .wait(wait::fixed(WAIT_DURATION))
    .elapsed_clock_fn(move || Duration::from_millis(clock_millis.load(Ordering::Relaxed)))
    .timeout(TIMEOUT_DEADLINE)
    .sleep(instant_sleep)
    .call();

    assert!(matches!(result, Err(RetryError::Exhausted { .. })));
    // `Exhausted` alone cannot tell the deadline apart from the attempt budget.
    // The clock moves one step per attempt, so it reveals how many attempts ran.
    let elapsed_millis = clock_ref.load(Ordering::Relaxed);
    assert!(
        elapsed_millis < u64::from(ATTEMPT_BUDGET) * CLOCK_STEP_MILLIS,
        "timeout was not honoured: the whole attempt budget was consumed"
    );
}
/// The sync builder's `Debug` output mentions its type name.
#[test]
fn sync_retry_builder_debug_format() {
    let builder = (|| Ok::<i32, &str>(1)).retry().sleep(instant_sleep);

    let rendered = format!("{builder:?}");

    assert!(rendered.contains("SyncRetryBuilder"));
}
/// The stats-collecting sync builder's `Debug` output mentions its type name.
#[test]
fn sync_retry_builder_with_stats_debug_format() {
    let builder = (|| Ok::<i32, &str>(1))
        .retry()
        .sleep(instant_sleep)
        .with_stats();

    let rendered = format!("{builder:?}");

    assert!(rendered.contains("SyncRetryBuilderWithStats"));
}
#[cfg(feature = "alloc")]
mod async_tests {
use core::future::{Future, ready};
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use std::sync::Arc;
use super::*;
/// Builds a `Waker` whose wake-up does nothing.
fn noop_waker() -> Waker {
    /// Wake handler that ignores every notification.
    struct NoopWake;

    impl std::task::Wake for NoopWake {
        fn wake(self: Arc<Self>) {
            // Nothing to schedule: this module's `block_on` re-polls in a loop.
        }
    }

    let handler = Arc::new(NoopWake);
    Waker::from(handler)
}
/// Minimal single-threaded executor: polls `future` in a loop with a no-op waker
/// until it completes, yielding the thread between polls.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut pinned = Box::pin(future);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    loop {
        if let Poll::Ready(output) = pinned.as_mut().poll(&mut cx) {
            return output;
        }
        // The waker never reschedules anything, so simply poll again after yielding.
        std::thread::yield_now();
    }
}
/// Async counterpart of `do_work`: a plain `fn` returning an already-completed
/// future that resolves to `Ok(SUCCESS_VALUE)`.
fn do_async_work() -> core::future::Ready<Result<i32, &'static str>> {
    let outcome = Ok(SUCCESS_VALUE);
    ready(outcome)
}
/// Async sleep callback usable as a `fn` pointer: ignores the delay and returns an
/// already-completed future.
fn ready_sleep(_dur: Duration) -> core::future::Ready<()> {
    // The requested delay is discarded.
    core::future::ready(())
}
/// The async extension retries a closure that fails until its `MAX_ATTEMPTS`-th
/// call, mirroring the synchronous closure-form test.
#[test]
fn async_retry_ext_retries_until_success() {
    let attempts = Rc::new(Cell::new(0_u32));
    let attempts_ref = Rc::clone(&attempts);

    let future = (move || {
        attempts_ref.set(attempts_ref.get().saturating_add(1));
        if attempts_ref.get() < MAX_ATTEMPTS {
            ready(Err(ERROR_VALUE))
        } else {
            ready(Ok(SUCCESS_VALUE))
        }
    })
    .retry_async()
    .stop(stop::attempts(MAX_ATTEMPTS))
    // Shared constant, as in the sync counterpart, rather than a duplicated literal.
    .wait(wait::fixed(WAIT_DURATION))
    .sleep(|_dur| ready(()));

    let result: Result<i32, RetryError<i32, &str>> = block_on(future);
    assert_eq!(result, Ok(SUCCESS_VALUE));
    assert_eq!(attempts.get(), MAX_ATTEMPTS);
}
/// With no stop or wait override, the async builder attempts an always-failing
/// operation `MAX_ATTEMPTS` times and the sleeper receives `DEFAULT_WAIT_SEQUENCE`.
#[test]
fn async_retry_ext_uses_default_policy_when_not_overridden() {
    let calls = Rc::new(Cell::new(0_u32));
    let calls_in_op = Rc::clone(&calls);
    let recorded_sleeps = Rc::new(RefCell::new(Vec::new()));
    let sleep_sink = Rc::clone(&recorded_sleeps);

    let retrying = (move || {
        calls_in_op.set(calls_in_op.get().saturating_add(1));
        ready::<Result<i32, &str>>(Err(ERROR_VALUE))
    })
    .retry_async()
    .sleep(move |dur| {
        sleep_sink.borrow_mut().push(dur);
        ready(())
    });
    let outcome = block_on(retrying);

    assert!(matches!(outcome, Err(RetryError::Exhausted { .. })));
    assert_eq!(calls.get(), MAX_ATTEMPTS);
    assert_eq!(*recorded_sleeps.borrow(), DEFAULT_WAIT_SEQUENCE);
}
/// The `DefaultAsyncRetryBuilder` alias can be written out as a binding's type and
/// the builder still resolves once a sleeper is attached.
#[test]
fn default_async_retry_builder_alias_is_nameable() {
    type AsyncWork = core::future::Ready<Result<i32, &'static str>>;
    type AsyncWorkFn = fn() -> core::future::Ready<Result<i32, &'static str>>;
    type Builder =
        relentless::DefaultAsyncRetryBuilder<AsyncWorkFn, AsyncWork, i32, &'static str>;

    // Coerce the fn item to a pointer so the builder's type can be spelled out.
    let work: AsyncWorkFn = do_async_work;
    let builder: Builder = work.retry_async();

    let outcome: Result<i32, RetryError<i32, &str>> =
        block_on(builder.sleep(|_dur: Duration| async {}));
    assert_eq!(outcome, Ok(SUCCESS_VALUE));
}
/// The `DefaultAsyncRetryBuilderWithStats` alias can be written out as a binding's
/// type; awaiting it yields both the result and the attempt statistics.
#[test]
fn default_async_retry_builder_with_stats_alias_is_nameable() {
    type AsyncWork = core::future::Ready<Result<i32, &'static str>>;
    type AsyncWorkFn = fn() -> core::future::Ready<Result<i32, &'static str>>;
    type SleepFuture = core::future::Ready<()>;
    type SleepFn = fn(Duration) -> core::future::Ready<()>;
    type Builder = relentless::DefaultAsyncRetryBuilderWithStats<
        AsyncWorkFn,
        AsyncWork,
        SleepFn,
        i32,
        &'static str,
        SleepFuture,
    >;

    // Both callbacks are coerced to fn pointers so every type parameter is nameable.
    let work: AsyncWorkFn = do_async_work;
    let sleeper: SleepFn = ready_sleep;
    let builder: Builder = work.retry_async().sleep(sleeper).with_stats();

    let (outcome, stats): (Result<i32, RetryError<i32, &str>>, relentless::RetryStats) =
        block_on(builder);
    assert_eq!(outcome, Ok(SUCCESS_VALUE));
    assert_eq!(stats.attempts, 1);
}
/// Hooks registered on the async builder fire where expected: `before_attempt` and
/// `after_attempt` once per attempt (the last attempt has no next delay) and
/// `on_exit` exactly once with the final attempt number and stop reason.
#[test]
fn async_retry_ext_hooks_match_policy_hook_points() {
    let before_log: RefCell<Vec<u32>> = RefCell::new(Vec::new());
    let after_log: RefCell<Vec<(u32, Option<Duration>)>> = RefCell::new(Vec::new());
    let exit_log: RefCell<Vec<(u32, bool, StopReason)>> = RefCell::new(Vec::new());

    let retrying = (|| ready::<Result<i32, &str>>(Err(ERROR_VALUE)))
        .retry_async()
        .stop(stop::attempts(MAX_ATTEMPTS))
        .wait(wait::fixed(WAIT_DURATION))
        .before_attempt(|state| before_log.borrow_mut().push(state.attempt))
        .after_attempt(|attempt: &relentless::AttemptState<i32, &str>| {
            let entry = (attempt.attempt, attempt.next_delay);
            after_log.borrow_mut().push(entry);
        })
        .on_exit(|exit: &relentless::ExitState<i32, &str>| {
            let entry = (exit.attempt, exit.outcome.is_err(), exit.stop_reason);
            exit_log.borrow_mut().push(entry);
        })
        .sleep(|_dur| ready(()));

    // The outcome is irrelevant here; only the hook invocations are inspected.
    let _ = block_on(retrying);

    assert_eq!(*before_log.borrow(), vec![1, 2, 3]);
    assert_eq!(
        *after_log.borrow(),
        vec![
            (1, Some(WAIT_DURATION)),
            (2, Some(WAIT_DURATION)),
            (3, None),
        ]
    );
    assert_eq!(
        *exit_log.borrow(),
        vec![(MAX_ATTEMPTS, true, StopReason::Exhausted)]
    );
}
/// `.with_stats()` on the async builder yields the statistics alongside the result,
/// reporting two attempts for a two-attempt budget.
#[test]
fn async_retry_ext_with_stats_reports_attempts() {
    let retrying = (|| ready::<Result<i32, &str>>(Err(ERROR_VALUE)))
        .retry_async()
        .stop(stop::attempts(2))
        .when(predicate::any_error())
        .sleep(|_dur| ready(()))
        .with_stats();

    let (outcome, stats) = block_on(retrying);

    assert!(matches!(outcome, Err(RetryError::Exhausted { .. })));
    assert_eq!(stats.attempts, 2);
}
/// Stop and wait strategies that keep interior state (`Cell` counters) work with
/// the async builder: the run ends when `StatefulStop` reaches its threshold and
/// the sleeper sees the delay produced by `StatefulWait`.
#[test]
fn async_retry_ext_stateful_stop_and_wait_work() {
    let attempts = Rc::new(Cell::new(0_u32));
    let attempts_ref = Rc::clone(&attempts);
    let sleeps = Rc::new(RefCell::new(Vec::new()));
    let sleeps_ref = Rc::clone(&sleeps);

    let result = block_on(
        (move || {
            attempts_ref.set(attempts_ref.get().saturating_add(1));
            ready::<Result<i32, &str>>(Err(ERROR_VALUE))
        })
        .retry_async()
        .stop(StatefulStop {
            consultations: Cell::new(0),
            threshold: STATEFUL_STOP_THRESHOLD,
        })
        .wait(StatefulWait {
            calls: Cell::new(0),
        })
        .sleep(move |dur| {
            sleeps_ref.borrow_mut().push(dur);
            ready(())
        }),
    );

    assert!(matches!(result, Err(RetryError::Exhausted { .. })));
    assert_eq!(attempts.get(), STATEFUL_STOP_THRESHOLD);
    // `StatefulWait` yields N ms on its N-th call, so the single recorded sleep is
    // 1 ms. It is spelled out literally because the value comes from `StatefulWait`,
    // not from the unrelated `WAIT_DURATION` constant.
    assert_eq!(*sleeps.borrow(), vec![Duration::from_millis(1)]);
}
/// Polling the retry future again after it has already returned `Ready` panics.
#[test]
fn async_retry_ext_repoll_after_completion_panics() {
    let mut retrying = Box::pin(
        (|| ready(Ok::<i32, &str>(SUCCESS_VALUE)))
            .retry_async()
            .stop(stop::attempts(1))
            .sleep(|_dur| ready(())),
    );
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // The operation succeeds at once, so the very first poll completes the future.
    let first_poll = retrying.as_mut().poll(&mut cx);
    assert_eq!(first_poll, Poll::Ready(Ok(SUCCESS_VALUE)));

    // `AssertUnwindSafe` lets the closure borrow the future and context mutably.
    let second_poll = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let _ = retrying.as_mut().poll(&mut cx);
    }));
    assert!(second_poll.is_err());
}
/// The free function `relentless::retry_async` attempts an always-failing
/// operation `MAX_ATTEMPTS` times when nothing is overridden.
#[test]
fn free_function_retry_async_uses_default_policy() {
    let calls = Rc::new(Cell::new(0_u32));
    let calls_in_op = Rc::clone(&calls);

    let retrying = relentless::retry_async(move |_| {
        calls_in_op.set(calls_in_op.get().saturating_add(1));
        ready::<Result<i32, &str>>(Err(ERROR_VALUE))
    })
    .sleep(|_dur| ready(()));
    let outcome = block_on(retrying);

    assert!(matches!(outcome, Err(RetryError::Exhausted { .. })));
    assert_eq!(calls.get(), MAX_ATTEMPTS);
}
/// `.until(..)` on the async builder keeps retrying an `Ok`-returning operation
/// until the predicate accepts the value; here that happens on attempt `UNTIL_TARGET`.
#[test]
fn async_retry_ext_until_retries_until_predicate_fires() {
    let calls = Rc::new(Cell::new(0_u32));
    let calls_in_op = Rc::clone(&calls);

    // Resolves to its own 1-based call number as the `Ok` value.
    let counting_op = move || {
        let made = calls_in_op.get().saturating_add(1);
        calls_in_op.set(made);
        ready(Ok::<u32, &str>(made))
    };

    let retrying = counting_op
        .retry_async()
        .stop(stop::attempts(5))
        .until(predicate::ok(move |v: &u32| *v >= UNTIL_TARGET))
        .sleep(|_dur| ready(()));
    let outcome = block_on(retrying);

    assert_eq!(outcome, Ok(UNTIL_TARGET));
    assert_eq!(calls.get(), UNTIL_TARGET);
}
/// A `.timeout(..)` deadline ends the async run long before the attempt budget does.
///
/// The fake clock advances `CLOCK_STEP_MILLIS` per attempt, which already exceeds
/// `TIMEOUT_DEADLINE` after the first attempt.
#[test]
fn async_retry_ext_timeout_stops_on_deadline() {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Deliberately generous: if the deadline were ignored, the run would only end
    // after this many attempts (and would still report `Exhausted`).
    const ATTEMPT_BUDGET: u32 = 100;

    let clock_millis = Arc::new(AtomicU64::new(0));
    let clock_ref = Arc::clone(&clock_millis);
    // Third handle kept outside the closures so the clock can be read afterwards.
    let clock_probe = Arc::clone(&clock_millis);

    let result = block_on(
        (move || {
            clock_ref.fetch_add(CLOCK_STEP_MILLIS, Ordering::Relaxed);
            ready(Err::<i32, &str>(ERROR_VALUE))
        })
        .retry_async()
        .stop(stop::attempts(ATTEMPT_BUDGET))
        .wait(wait::fixed(WAIT_DURATION))
        .elapsed_clock_fn(move || Duration::from_millis(clock_millis.load(Ordering::Relaxed)))
        .timeout(TIMEOUT_DEADLINE)
        .sleep(|_dur| ready(())),
    );

    assert!(matches!(result, Err(RetryError::Exhausted { .. })));
    // `Exhausted` alone cannot tell the deadline apart from the attempt budget.
    // The clock moves one step per attempt, so it reveals how many attempts ran.
    let elapsed_millis = clock_probe.load(Ordering::Relaxed);
    assert!(
        elapsed_millis < u64::from(ATTEMPT_BUDGET) * CLOCK_STEP_MILLIS,
        "timeout was not honoured: the whole attempt budget was consumed"
    );
}
/// The async builder's `Debug` output mentions its type name.
#[test]
fn async_retry_builder_debug_format() {
    let builder = (|| ready(Ok::<i32, &str>(1)))
        .retry_async()
        .sleep(|_dur: Duration| ready(()));

    let rendered = format!("{builder:?}");

    assert!(rendered.contains("AsyncRetryBuilder"));
}
/// The stats-collecting async builder's `Debug` output mentions its type name.
#[test]
fn async_retry_builder_with_stats_debug_format() {
    let builder = (|| ready(Ok::<i32, &str>(1)))
        .retry_async()
        .sleep(|_dur: Duration| ready(()))
        .with_stats();

    let rendered = format!("{builder:?}");

    assert!(rendered.contains("AsyncRetryBuilderWithStats"));
}
}
/// Retrying through the extension trait and through `RetryPolicy::default()` gives
/// the same outcome for an always-failing operation: `Exhausted`, carrying the
/// last error.
#[test]
fn policy_and_extension_forms_are_equivalent_for_basic_case() {
    // Extension form: the closure takes no arguments.
    let via_extension = (|| Err::<i32, &str>(ERROR_VALUE))
        .retry()
        .sleep(instant_sleep)
        .call();

    // Policy form: the closure receives the retry state, unused here.
    let via_policy = RetryPolicy::default()
        .retry(|_state: RetryState| Err::<i32, &str>(ERROR_VALUE))
        .sleep(instant_sleep)
        .call();

    assert!(matches!(
        via_extension,
        Err(RetryError::Exhausted {
            last: Err(ERROR_VALUE),
            ..
        })
    ));
    assert!(matches!(
        via_policy,
        Err(RetryError::Exhausted {
            last: Err(ERROR_VALUE),
            ..
        })
    ));
}