use std::fmt;
use std::future::{Future, poll_fn};
use std::panic::Location;
use std::pin::{Pin, pin};
use std::task::Poll;
use std::time::{Duration, Instant};
use test_better_core::{ErrorKind, TestError, TestResult};
/// Error produced when a future does not finish inside its time limit.
///
/// Returned by [`run_within`] when the timer completes before the future does.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Elapsed {
    /// The time limit that was exceeded.
    pub limit: Duration,
}

impl fmt::Display for Elapsed {
    /// Renders the failure together with the `Debug` form of the limit (for example `5ms`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "future did not complete within {:?}",
            self.limit
        ))
    }
}

// No underlying cause to expose, so the default `source()` is sufficient.
impl std::error::Error for Elapsed {}
/// Marker trait used as a compile-time gate on the async timing assertions.
///
/// It has no methods. It appears as an extra bound on [`run_within`],
/// [`eventually`] and [`eventually_with`]; when none of the runtime features
/// is enabled there is no implementation at all, so those bounds fail and the
/// compiler reports the message and notes below instead of a generic
/// "trait not implemented" error.
#[diagnostic::on_unimplemented(
message = "this async timing assertion needs a runtime, but no runtime feature is enabled",
note = "enable exactly one runtime feature on `test-better`: `tokio`, `async-std`, or `smol`",
note = "or, for `eventually`, use the runtime-free `eventually_blocking`"
)]
pub trait RuntimeAvailable {}
// Blanket implementation: as soon as any runtime feature is on, every type
// (sized or not) satisfies the bound and the gate becomes a no-op.
#[cfg(any(feature = "tokio", feature = "async-std", feature = "smol"))]
impl<T: ?Sized> RuntimeAvailable for T {}
/// Returns a boxed future that sleeps for `duration` using Tokio's timer.
///
/// Selected whenever the `tokio` feature is enabled, regardless of the other
/// runtime features. The timer is created inside the `async` block, i.e. when
/// the returned future is first polled, not when this function is called.
#[cfg(feature = "tokio")]
fn selected_sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    let nap = async move { tokio::time::sleep(duration).await };
    Box::pin(nap)
}

/// Returns a boxed future that sleeps for `duration` using async-std.
///
/// Selected when `async-std` is enabled and `tokio` is not.
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
fn selected_sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    let nap = async move { async_std::task::sleep(duration).await };
    Box::pin(nap)
}

/// Returns a boxed future that sleeps for `duration` using a smol timer.
///
/// Selected when `smol` is the only runtime feature enabled. The value the
/// timer resolves to is discarded.
#[cfg(all(feature = "smol", not(any(feature = "tokio", feature = "async-std"))))]
fn selected_sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    let nap = async move {
        smol::Timer::after(duration).await;
    };
    Box::pin(nap)
}

/// Placeholder used when no runtime feature is enabled: a future that never completes.
///
/// It exists so the rest of the module still type-checks without a runtime;
/// the public async entry points are gated by `RuntimeAvailable`, which has no
/// implementations in this configuration.
#[cfg(not(any(feature = "tokio", feature = "async-std", feature = "smol")))]
fn selected_sleep(_duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    let never = std::future::pending::<()>();
    Box::pin(never)
}
/// Drives `fut` and `timer` together and reports whichever finishes first.
///
/// Resolves to `Ok(output)` when `fut` completes and to `Err(())` when `timer`
/// completes. On every poll `fut` is tried before `timer`, so the future wins
/// when both are ready at the same time.
async fn race<F, S>(fut: F, timer: S) -> Result<F::Output, ()>
where
    F: Future,
    S: Future<Output = ()>,
{
    // Pin both futures to this frame so they can be polled repeatedly in place.
    let mut work = pin!(fut);
    let mut deadline = pin!(timer);
    poll_fn(move |cx| match work.as_mut().poll(cx) {
        Poll::Ready(value) => Poll::Ready(Ok(value)),
        // Only consult the timer once the future has declined to finish.
        // `Poll<()>::map(Err)` turns a fired timer into `Ready(Err(()))`
        // and leaves `Pending` untouched.
        Poll::Pending => deadline.as_mut().poll(cx).map(Err),
    })
    .await
}
/// Awaits `fut`, giving up once `limit` has passed on the selected runtime's timer.
///
/// The `RuntimeAvailable` bound makes this a compile error when no runtime
/// feature is enabled.
///
/// # Errors
///
/// Returns [`Elapsed`] carrying `limit` when the timer completes before `fut`.
/// If both are ready on the same poll, the future's output is returned.
pub async fn run_within<F>(limit: Duration, fut: F) -> Result<F::Output, Elapsed>
where
    F: Future + RuntimeAvailable,
{
    let timer = selected_sleep(limit);
    race(fut, timer).await.map_err(|()| Elapsed { limit })
}
/// Schedule for the pauses taken between successive probes of an `eventually` assertion.
///
/// The retry loops in this file start with a pause of `initial` and, after
/// each failed probe, replace it with the previous pause multiplied by
/// `factor` and capped at `ceiling`. The cap is applied when a pause is grown;
/// `initial` itself is used as written.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Backoff {
    /// Pause taken after the first failed probe.
    pub initial: Duration,
    /// Upper bound for every pause produced by growing the previous one.
    pub ceiling: Duration,
    /// Multiplier applied to the previous pause. A value of `0` is treated as
    /// `1` (constant pause) so that the schedule never shrinks to zero.
    pub factor: u32,
}

impl Default for Backoff {
    /// Starts at 1 ms and doubles up to a ceiling of 100 ms.
    fn default() -> Self {
        Self {
            initial: Duration::from_millis(1),
            ceiling: Duration::from_millis(100),
            factor: 2,
        }
    }
}

impl Backoff {
    /// Computes the pause that follows `previous`.
    ///
    /// The multiplication saturates instead of overflowing, and the result
    /// never exceeds `ceiling`.
    fn next_nap(&self, previous: Duration) -> Duration {
        // A zero factor would multiply the pause down to nothing, turning the
        // retry loop into a busy spin until the timeout; treat it as "do not grow".
        let growth = self.factor.max(1);
        previous.saturating_mul(growth).min(self.ceiling)
    }
}
/// Builds the timeout error reported when an `eventually` probe never succeeds.
///
/// The message states the configured `timeout`, how many `probes` were made
/// (pluralised unless exactly one), and the `elapsed` time actually spent.
/// The error is tagged with `ErrorKind::Timeout` and attributed to `location`.
fn eventually_error(
    timeout: Duration,
    elapsed: Duration,
    probes: u32,
    location: &'static Location<'static>,
) -> TestError {
    let plural = match probes {
        1 => "",
        _ => "s",
    };
    let message = format!(
        "condition was not met within {timeout:?}: gave up after {probes} probe{plural} \
         over {elapsed:?}"
    );
    TestError::new(ErrorKind::Timeout)
        .with_message(message)
        .with_location(location)
}
/// Repeatedly awaits `probe` until it yields `true` or `timeout` has elapsed.
///
/// The probe always runs at least once. Between failed probes the task sleeps
/// on the selected runtime's timer for the current backoff pause, shortened so
/// it never extends past the deadline. The deadline is checked only between
/// probes; a probe that is in flight is not interrupted.
///
/// # Errors
///
/// Returns the timeout error built by `eventually_error`, attributed to
/// `location`, when a probe fails and no time is left.
async fn eventually_impl<F, Fut>(
    timeout: Duration,
    backoff: Backoff,
    mut probe: F,
    location: &'static Location<'static>,
) -> TestResult
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    let started = Instant::now();
    let mut pause = backoff.initial;
    let mut attempts: u32 = 0;
    loop {
        attempts = attempts.saturating_add(1);
        if probe().await {
            return Ok(());
        }
        let spent = started.elapsed();
        // `checked_sub` is `None` once the deadline has passed and `Some(0)`
        // exactly on it; both mean there is no time left to sleep.
        match timeout.checked_sub(spent) {
            Some(remaining) if !remaining.is_zero() => {
                selected_sleep(pause.min(remaining)).await;
                pause = backoff.next_nap(pause);
            }
            _ => return Err(eventually_error(timeout, spent, attempts, location)),
        }
    }
}
/// Repeatedly calls `probe` on the current thread until it returns `true` or
/// `timeout` has elapsed.
///
/// The probe always runs at least once. Between failed probes the thread is
/// put to sleep with `std::thread::sleep` for the current backoff pause,
/// shortened so it never extends past the deadline.
///
/// # Errors
///
/// Returns the timeout error built by `eventually_error`, attributed to
/// `location`, when a probe fails and no time is left.
fn eventually_blocking_impl<F>(
    timeout: Duration,
    backoff: Backoff,
    mut probe: F,
    location: &'static Location<'static>,
) -> TestResult
where
    F: FnMut() -> bool,
{
    let started = Instant::now();
    let mut pause = backoff.initial;
    let mut attempts: u32 = 0;
    loop {
        attempts = attempts.saturating_add(1);
        if probe() {
            return Ok(());
        }
        let spent = started.elapsed();
        // `checked_sub` is `None` once the deadline has passed and `Some(0)`
        // exactly on it; both mean there is no time left to sleep.
        match timeout.checked_sub(spent) {
            Some(remaining) if !remaining.is_zero() => {
                std::thread::sleep(pause.min(remaining));
                pause = backoff.next_nap(pause);
            }
            _ => return Err(eventually_error(timeout, spent, attempts, location)),
        }
    }
}
/// Asserts that the async `probe` yields `true` within `timeout`, using the
/// default [`Backoff`] between attempts.
///
/// Requires a runtime feature (see `RuntimeAvailable`).
///
/// # Errors
///
/// The returned future resolves to a timeout error, attributed to the caller
/// of this function, if the probe never succeeds in time.
#[track_caller]
pub fn eventually<F, Fut>(timeout: Duration, probe: F) -> impl Future<Output = TestResult>
where
    F: FnMut() -> Fut + RuntimeAvailable,
    Fut: Future<Output = bool>,
{
    // Capture the call site now, while `#[track_caller]` still applies; the
    // future itself is polled later from somewhere else.
    let call_site = Location::caller();
    let schedule = Backoff::default();
    eventually_impl(timeout, schedule, probe, call_site)
}
/// Asserts that the async `probe` yields `true` within `timeout`, pausing
/// between attempts according to the supplied `backoff`.
///
/// Requires a runtime feature (see `RuntimeAvailable`).
///
/// # Errors
///
/// The returned future resolves to a timeout error, attributed to the caller
/// of this function, if the probe never succeeds in time.
#[track_caller]
pub fn eventually_with<F, Fut>(
    timeout: Duration,
    backoff: Backoff,
    probe: F,
) -> impl Future<Output = TestResult>
where
    F: FnMut() -> Fut + RuntimeAvailable,
    Fut: Future<Output = bool>,
{
    // Capture the call site now, while `#[track_caller]` still applies; the
    // future itself is polled later from somewhere else.
    let call_site = Location::caller();
    eventually_impl(timeout, backoff, probe, call_site)
}
/// Runtime-free variant of `eventually`: calls `probe` on the current thread
/// until it returns `true` or `timeout` elapses, using the default [`Backoff`].
///
/// # Errors
///
/// Returns a timeout error, attributed to the caller of this function, if the
/// probe never succeeds in time.
#[track_caller]
pub fn eventually_blocking<F>(timeout: Duration, probe: F) -> TestResult
where
    F: FnMut() -> bool,
{
    let call_site = Location::caller();
    let schedule = Backoff::default();
    eventually_blocking_impl(timeout, schedule, probe, call_site)
}
/// Runtime-free variant of `eventually_with`: calls `probe` on the current
/// thread until it returns `true` or `timeout` elapses, pausing between
/// attempts according to the supplied `backoff`.
///
/// # Errors
///
/// Returns a timeout error, attributed to the caller of this function, if the
/// probe never succeeds in time.
#[track_caller]
pub fn eventually_blocking_with<F>(timeout: Duration, backoff: Backoff, probe: F) -> TestResult
where
    F: FnMut() -> bool,
{
    let call_site = Location::caller();
    eventually_blocking_impl(timeout, backoff, probe, call_site)
}
#[cfg(test)]
mod tests {
// Unit tests for the racing helper, the backoff schedule, and the blocking
// `eventually` variants. The async `eventually` entry points are not exercised
// here; `race` is driven with `pollster::block_on`, so no async runtime
// feature is needed.
use super::*;
use std::cell::Cell;
use std::future::{pending, ready};
use test_better_matchers::{check, eq, ge, is_true};
// A ready future beats a timer that never fires.
#[test]
fn race_returns_the_future_output_when_it_is_ready_first() -> TestResult {
let outcome = pollster::block_on(race(ready(7), pending::<()>()));
check!(outcome).satisfies(eq(Ok(7)))
}
// A fired timer is reported as `Err(())` when the future is still pending.
#[test]
fn race_reports_the_timer_when_the_future_is_not_ready() -> TestResult {
let outcome = pollster::block_on(race(pending::<i32>(), ready(())));
check!(outcome).satisfies(eq(Err(())))
}
// Tie-break: the future is polled first, so it wins when both are ready.
#[test]
fn race_prefers_the_future_when_both_are_ready() -> TestResult {
let outcome = pollster::block_on(race(ready("done"), ready(())));
check!(outcome).satisfies(eq(Ok("done")))
}
// 10 ms * 2 = 20 ms is below the 25 ms ceiling; 20 ms * 2 = 40 ms is capped to it.
#[test]
fn backoff_grows_by_factor_and_stops_at_the_ceiling() -> TestResult {
let backoff = Backoff {
initial: Duration::from_millis(10),
ceiling: Duration::from_millis(25),
factor: 2,
};
check!(backoff.next_nap(Duration::from_millis(10)))
.satisfies(eq(Duration::from_millis(20)))?;
check!(backoff.next_nap(Duration::from_millis(20))).satisfies(eq(Duration::from_millis(25)))
}
// The probe turns true on its third call; no further calls may happen after that.
#[test]
fn eventually_blocking_stops_as_soon_as_the_probe_passes() -> TestResult {
let calls = Cell::new(0u32);
eventually_blocking(Duration::from_secs(5), || {
calls.set(calls.get() + 1);
calls.get() >= 3
})?;
check!(calls.get()).satisfies(eq(3))
}
// An immediately-true probe is called exactly once.
#[test]
fn eventually_blocking_passes_immediately_when_the_probe_is_already_true() -> TestResult {
let calls = Cell::new(0u32);
eventually_blocking(Duration::from_secs(5), || {
calls.set(calls.get() + 1);
true
})?;
check!(calls.get()).satisfies(eq(1))
}
// With `initial == ceiling` the nap stays at 5 ms, so a 40 ms budget allows
// several probes; the rendered error must mention the exact probe count.
#[test]
fn eventually_blocking_reports_elapsed_and_probe_count_on_timeout() -> TestResult {
let calls = Cell::new(0u32);
let error = eventually_blocking_with(
Duration::from_millis(40),
Backoff {
initial: Duration::from_millis(5),
ceiling: Duration::from_millis(5),
factor: 2,
},
|| {
calls.set(calls.get() + 1);
false
},
)
.expect_err("a probe that is never true must time out");
let rendered = error.to_string();
check!(rendered.contains("was not met within")).satisfies(is_true())?;
check!(calls.get()).satisfies(ge(2))?;
check!(rendered.contains(&format!("{} probe", calls.get()))).satisfies(is_true())
}
// The failure is classified as a timeout rather than some other error kind.
#[test]
fn eventually_blocking_failure_kind_is_timeout() -> TestResult {
let error = eventually_blocking(Duration::from_millis(1), || false)
.expect_err("an always-false probe times out");
check!(error.kind).satisfies(eq(ErrorKind::Timeout))
}
}