use std::{future::Future, time::Duration};
use crate::Subscriber;
use futures::StreamExt;
use tokio::time::{sleep, timeout};
/// Polls a synchronous predicate until it holds or `deadline` elapses.
///
/// `cond` is evaluated first and then re-evaluated after every 10 ms sleep.
///
/// # Returns
///
/// `true` if `cond` returned `true` before the timeout fired, `false` if
/// `deadline` ran out first.
pub async fn wait_until<F>(mut cond: F, deadline: Duration) -> bool
where
    F: FnMut() -> bool,
{
    let poll_interval = Duration::from_millis(10);

    // Completes only once the predicate reports `true`; the enclosing
    // timeout is what bounds the wait.
    let satisfied = async {
        loop {
            if cond() {
                break;
            }
            sleep(poll_interval).await;
        }
    };

    matches!(timeout(deadline, satisfied).await, Ok(()))
}
/// Polls an asynchronous predicate until it resolves to `true` or `deadline`
/// elapses.
///
/// `cond` is called to produce a fresh future for every check; that future is
/// awaited, and if it resolves to `false` the helper sleeps 10 ms before
/// trying again. The whole loop, including the awaited predicate futures,
/// runs under a single timeout of `deadline`.
///
/// # Returns
///
/// `true` if a predicate future resolved to `true` before the timeout fired,
/// `false` if `deadline` ran out first.
pub async fn wait_until_async<F, Fut>(mut cond: F, deadline: Duration) -> bool
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    let poll_interval = Duration::from_millis(10);

    // Completes only once a predicate future yields `true`.
    let satisfied = async {
        loop {
            if cond().await {
                break;
            }
            sleep(poll_interval).await;
        }
    };

    match timeout(deadline, satisfied).await {
        Ok(()) => true,
        Err(_) => false,
    }
}
pub async fn wait_for_no_messages<S>(
subscriber: &mut S,
quiet_for: Duration,
) -> Result<(), S::Message>
where
S: Subscriber,
{
let mut stream = std::pin::pin!(subscriber.stream());
match timeout(quiet_for, stream.next()).await {
Err(_) | Ok(None | Some(Err(_))) => Ok(()),
Ok(Some(Ok(msg))) => Err(msg),
}
}
/// Waits up to `within` for the next item on the subscriber's stream and
/// returns the delivered message.
///
/// # Panics
///
/// Panics if no item arrives within `within`, if the stream ends before
/// yielding an item, or if the item is an error.
pub async fn next_message<S>(subscriber: &mut S, within: Duration) -> S::Message
where
    S: Subscriber,
    S::Error: std::fmt::Debug,
{
    let mut deliveries = std::pin::pin!(subscriber.stream());
    let outcome = timeout(within, deliveries.next()).await;

    // Peel the three layers in order: timeout, end-of-stream, stream error.
    outcome
        .expect("subscriber stream timed out")
        .expect("subscriber stream ended unexpectedly")
        .expect("subscriber stream yielded error")
}
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };

    use super::*;

    /// Returns a flag that starts `false` and is set to `true` by a spawned
    /// task after a 20 ms sleep.
    fn flag_raised_after_delay() -> Arc<AtomicBool> {
        let flag = Arc::new(AtomicBool::new(false));
        let setter = Arc::clone(&flag);
        tokio::spawn(async move {
            sleep(Duration::from_millis(20)).await;
            setter.store(true, Ordering::SeqCst);
        });
        flag
    }

    /// The sync poller reports success once the flag flips within the deadline.
    #[tokio::test]
    async fn wait_until_returns_true_when_condition_eventually_holds() {
        let ready = flag_raised_after_delay();
        let became_true =
            wait_until(|| ready.load(Ordering::SeqCst), Duration::from_millis(500)).await;
        assert!(became_true);
    }

    /// The sync poller reports failure when the predicate never holds.
    #[tokio::test]
    async fn wait_until_returns_false_on_timeout() {
        assert!(!wait_until(|| false, Duration::from_millis(50)).await);
    }

    /// The async poller covers both the success path and the timeout path.
    #[tokio::test]
    async fn wait_until_async_resolves_and_times_out() {
        let ready = flag_raised_after_delay();

        // Each call hands out a fresh future that owns its own handle to the flag.
        let poll_flag = || {
            let ready = Arc::clone(&ready);
            async move { ready.load(Ordering::SeqCst) }
        };
        assert!(wait_until_async(poll_flag, Duration::from_millis(500)).await);

        let timed_out = !wait_until_async(|| async { false }, Duration::from_millis(50)).await;
        assert!(timed_out, "a never-true condition must time out");
    }

    /// A message published before the call is returned by `next_message`.
    #[cfg(feature = "memory")]
    #[tokio::test]
    async fn next_message_returns_the_next_delivery() {
        use crate::{IncomingMessage, OutgoingMessage, Publisher, memory::MemoryBroker};

        let broker = MemoryBroker::new();
        let mut subscriber = broker.subscribe("conf-next");

        broker
            .publisher()
            .publish(OutgoingMessage::new("conf-next", b"hi".as_slice()))
            .await
            .unwrap();

        let delivered = next_message(&mut subscriber, Duration::from_secs(1)).await;
        assert_eq!(delivered.payload(), b"hi");
        delivered.ack().await.unwrap();
    }

    /// An idle topic yields `Ok`, and a published message yields `Err`.
    #[cfg(feature = "memory")]
    #[tokio::test]
    async fn wait_for_no_messages_distinguishes_quiet_from_delivery() {
        use crate::{OutgoingMessage, Publisher, memory::MemoryBroker};

        let broker = MemoryBroker::new();
        let mut subscriber = broker.subscribe("conf-quiet");

        // Nothing has been published yet, so the window must pass quietly.
        assert!(
            wait_for_no_messages(&mut subscriber, Duration::from_millis(50))
                .await
                .is_ok()
        );

        broker
            .publisher()
            .publish(OutgoingMessage::new("conf-quiet", b"surprise".as_slice()))
            .await
            .unwrap();

        let after_publish = wait_for_no_messages(&mut subscriber, Duration::from_millis(200)).await;
        assert!(after_publish.is_err());
    }
}