use std::{future::Future, time::Duration};
use crate::Subscriber;
use futures::StreamExt;
use tokio::time::{sleep, timeout};
pub async fn wait_until<F>(mut cond: F, deadline: Duration) -> bool
where
F: FnMut() -> bool,
{
timeout(deadline, async {
while !cond() {
sleep(Duration::from_millis(10)).await;
}
})
.await
.is_ok()
}
pub async fn wait_until_async<F, Fut>(mut cond: F, deadline: Duration) -> bool
where
F: FnMut() -> Fut,
Fut: Future<Output = bool>,
{
timeout(deadline, async {
while !cond().await {
sleep(Duration::from_millis(10)).await;
}
})
.await
.is_ok()
}
pub async fn wait_for_no_messages<S>(
subscriber: &mut S,
quiet_for: Duration,
) -> Result<(), S::Message>
where
S: Subscriber,
{
let mut stream = std::pin::pin!(subscriber.stream());
match timeout(quiet_for, stream.next()).await {
Err(_) | Ok(None | Some(Err(_))) => Ok(()),
Ok(Some(Ok(msg))) => Err(msg),
}
}
pub async fn next_message<S>(subscriber: &mut S, within: Duration) -> S::Message
where
S: Subscriber,
S::Error: std::fmt::Debug,
{
let mut stream = std::pin::pin!(subscriber.stream());
let item = timeout(within, stream.next())
.await
.expect("subscriber stream timed out");
let item = item.expect("subscriber stream ended unexpectedly");
item.expect("subscriber stream yielded error")
}
#[cfg(test)]
mod tests {
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use super::*;
#[tokio::test]
async fn wait_until_returns_true_when_condition_eventually_holds() {
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
tokio::spawn(async move {
sleep(Duration::from_millis(20)).await;
flag_clone.store(true, Ordering::SeqCst);
});
assert!(wait_until(|| flag.load(Ordering::SeqCst), Duration::from_millis(500)).await);
}
#[tokio::test]
async fn wait_until_returns_false_on_timeout() {
let outcome = wait_until(|| false, Duration::from_millis(50)).await;
assert!(!outcome);
}
}