// ruststream 0.2.3
//
// Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a
// conformance harness for broker authors.
// Documentation
//! Async assertion helpers for end-user broker integration tests.
//!
//! These are intentionally broker-agnostic and operate on the `ruststream-core` traits, so
//! they can be reused by application tests against any broker that ships a `TestClient`.

use std::{future::Future, time::Duration};

use crate::Subscriber;
use futures::StreamExt;
use tokio::time::{sleep, timeout};

/// Repeatedly evaluates `cond` until it reports `true` or `deadline` runs out.
///
/// The predicate is checked immediately and then re-checked after every 10 ms pause. The
/// whole polling loop is wrapped in a single `tokio::time::timeout`, so the deadline covers
/// all attempts together rather than each attempt individually.
///
/// `cond` is invoked synchronously inside the polling loop; the timeout can only take effect
/// at the pauses between invocations.
///
/// Returns `true` when `cond` held before the deadline expired, `false` when it did not.
pub async fn wait_until<F>(mut cond: F, deadline: Duration) -> bool
where
    F: FnMut() -> bool,
{
    // Completes only once the predicate holds; otherwise it keeps napping and retrying.
    let poll = async {
        loop {
            if cond() {
                break;
            }
            sleep(Duration::from_millis(10)).await;
        }
    };

    let outcome = timeout(deadline, poll).await;
    outcome.is_ok()
}

/// Repeatedly awaits the future produced by `cond` until it resolves to `true` or `deadline`
/// runs out.
///
/// A fresh future is created by calling `cond` for every attempt, with a 10 ms pause between
/// attempts. The whole polling loop is wrapped in a single `tokio::time::timeout`, so an
/// attempt that is still in flight when the deadline expires is dropped.
///
/// Returns `true` when an attempt resolved to `true` before the deadline, `false` otherwise.
pub async fn wait_until_async<F, Fut>(mut cond: F, deadline: Duration) -> bool
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    // Completes only once an attempt succeeds; otherwise it keeps napping and retrying.
    let poll = async {
        loop {
            if cond().await {
                break;
            }
            sleep(Duration::from_millis(10)).await;
        }
    };

    let outcome = timeout(deadline, poll).await;
    outcome.is_ok()
}

/// Checks that `subscriber` delivers no message during the `quiet_for` window.
///
/// Waits for at most `quiet_for` on the next item of the subscriber's stream. The subscriber
/// counts as quiet when the wait times out, when the stream ends, or when the stream yields
/// an error; only a successfully delivered message is reported as a failure.
///
/// # Errors
///
/// Returns the unexpected message if one is delivered before the window elapses.
pub async fn wait_for_no_messages<S>(
    subscriber: &mut S,
    quiet_for: Duration,
) -> Result<(), S::Message>
where
    S: Subscriber,
{
    let deliveries = subscriber.stream();
    let mut deliveries = std::pin::pin!(deliveries);

    // Anything other than a successful delivery (timeout, end of stream, stream error) means
    // no message was observed.
    let Ok(Some(Ok(unexpected))) = timeout(quiet_for, deliveries.next()).await else {
        return Ok(());
    };
    Err(unexpected)
}

/// Returns the next message delivered by `subscriber`, waiting at most `within`.
///
/// Shorthand for tests that expect exactly one delivery and want any other outcome to fail
/// the test immediately.
///
/// # Panics
///
/// Panics if no item arrives within `within`, if the stream ends, or if the stream yields an
/// error.
pub async fn next_message<S>(subscriber: &mut S, within: Duration) -> S::Message
where
    S: Subscriber,
    S::Error: std::fmt::Debug,
{
    let deliveries = subscriber.stream();
    let mut deliveries = std::pin::pin!(deliveries);

    // Unwrap the three layers in order: timeout result, end-of-stream option, stream result.
    timeout(within, deliveries.next())
        .await
        .expect("subscriber stream timed out")
        .expect("subscriber stream ended unexpectedly")
        .expect("subscriber stream yielded error")
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };

    use super::*;

    /// A flag set by a background task after 20 ms must be observed within the 500 ms deadline.
    #[tokio::test]
    async fn wait_until_returns_true_when_condition_eventually_holds() {
        let ready = Arc::new(AtomicBool::new(false));
        let setter = Arc::clone(&ready);
        tokio::spawn(async move {
            sleep(Duration::from_millis(20)).await;
            setter.store(true, Ordering::SeqCst);
        });

        let deadline = Duration::from_millis(500);
        let observed = wait_until(|| ready.load(Ordering::SeqCst), deadline).await;
        assert!(observed);
    }

    /// A predicate that never holds must make the helper report `false` once the deadline passes.
    #[tokio::test]
    async fn wait_until_returns_false_on_timeout() {
        let deadline = Duration::from_millis(50);
        assert!(!wait_until(|| false, deadline).await);
    }
}