ruststream 0.4.0

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
//! Async assertion helpers for end-user broker integration tests.
//!
//! These are intentionally broker-agnostic and operate on the `ruststream-core` traits, so
//! they can be reused by application tests against any broker that ships a `TestClient`.

use std::{future::Future, time::Duration};

use crate::Subscriber;
use futures::StreamExt;
use tokio::time::{sleep, timeout};

/// Repeatedly evaluates `cond` until it reports `true` or `deadline` runs out.
///
/// The condition is checked before the first pause, and then again after every
/// 10 ms sleep, all under a single [`timeout`] of length `deadline`.
///
/// Returns `true` when `cond` held before the deadline elapsed and `false` when the
/// deadline elapsed first.
pub async fn wait_until<F>(mut cond: F, deadline: Duration) -> bool
where
    F: FnMut() -> bool,
{
    // Pause between two consecutive evaluations of `cond`.
    let poll_interval = Duration::from_millis(10);

    let polling = async {
        loop {
            if cond() {
                break;
            }
            sleep(poll_interval).await;
        }
    };

    // `timeout` yields `Ok` only if the polling loop finished on its own.
    timeout(deadline, polling).await.is_ok()
}

/// Async-condition variant of the polling helper: calls `cond`, awaits the future it
/// returns, and repeats until that future resolves to `true` or `deadline` runs out.
///
/// A fresh future is requested from `cond` on every round, with a 10 ms sleep between
/// rounds. Time spent awaiting the condition itself counts towards `deadline`, because
/// the whole loop runs under one [`timeout`].
///
/// Returns `true` when the condition held before the deadline elapsed and `false` when
/// the deadline elapsed first.
pub async fn wait_until_async<F, Fut>(mut cond: F, deadline: Duration) -> bool
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    // Pause between two consecutive evaluations of `cond`.
    let poll_interval = Duration::from_millis(10);

    let polling = async {
        loop {
            if cond().await {
                break;
            }
            sleep(poll_interval).await;
        }
    };

    // `timeout` yields `Ok` only if the polling loop finished on its own.
    timeout(deadline, polling).await.is_ok()
}

/// Checks that `subscriber` delivers no message during the `quiet_for` window.
///
/// Takes at most one item from the subscriber's stream, waiting up to `quiet_for` for
/// it. The outcome is `Ok(())` when the window elapses without an item, when the stream
/// ends, or when the item is a stream error (the error is discarded). Only a
/// successfully delivered message is treated as a violation.
///
/// # Errors
///
/// Returns the unexpected message (`S::Message`) if one is delivered before the window
/// elapses.
pub async fn wait_for_no_messages<S>(
    subscriber: &mut S,
    quiet_for: Duration,
) -> Result<(), S::Message>
where
    S: Subscriber,
{
    let mut deliveries = std::pin::pin!(subscriber.stream());
    let outcome = timeout(quiet_for, deliveries.next()).await;

    // A real delivery is the only outcome that breaks the "quiet" expectation.
    if let Ok(Some(Ok(unexpected))) = outcome {
        return Err(unexpected);
    }

    // Timeout, end of stream, and stream errors all count as "no message".
    Ok(())
}

/// Waits up to `within` for the next item from `subscriber` and returns the message.
///
/// Shortcut for tests that simply expect one delivery and want any other outcome to
/// fail the test immediately.
///
/// # Panics
///
/// Panics if no item arrives within `within`, if the stream ends, or if the stream
/// yields an error.
pub async fn next_message<S>(subscriber: &mut S, within: Duration) -> S::Message
where
    S: Subscriber,
    S::Error: std::fmt::Debug,
{
    let mut deliveries = std::pin::pin!(subscriber.stream());
    let awaited = timeout(within, deliveries.next()).await;

    // Unwrap the three layers in order: timeout, end of stream, stream error.
    awaited
        .expect("subscriber stream timed out")
        .expect("subscriber stream ended unexpectedly")
        .expect("subscriber stream yielded error")
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };

    use super::*;

    /// Returns a flag that a spawned background task flips to `true` after `delay`.
    fn flag_set_after(delay: Duration) -> Arc<AtomicBool> {
        let flag = Arc::new(AtomicBool::new(false));
        let setter = Arc::clone(&flag);
        tokio::spawn(async move {
            sleep(delay).await;
            setter.store(true, Ordering::SeqCst);
        });
        flag
    }

    /// The sync helper reports success once the flag flips inside the deadline.
    #[tokio::test]
    async fn wait_until_returns_true_when_condition_eventually_holds() {
        let ready = flag_set_after(Duration::from_millis(20));
        let met = wait_until(|| ready.load(Ordering::SeqCst), Duration::from_millis(500)).await;
        assert!(met);
    }

    /// A condition that never holds makes the sync helper report a timeout.
    #[tokio::test]
    async fn wait_until_returns_false_on_timeout() {
        assert!(!wait_until(|| false, Duration::from_millis(50)).await);
    }

    /// The async helper covers both outcomes: eventual success and timeout.
    #[tokio::test]
    async fn wait_until_async_resolves_and_times_out() {
        let ready = flag_set_after(Duration::from_millis(20));
        let resolved = wait_until_async(
            || {
                // Each round gets its own handle so the future can own it.
                let ready = Arc::clone(&ready);
                async move { ready.load(Ordering::SeqCst) }
            },
            Duration::from_millis(500),
        )
        .await;
        assert!(resolved);

        let timed_out = !wait_until_async(|| async { false }, Duration::from_millis(50)).await;
        assert!(timed_out, "a never-true condition must time out");
    }

    /// A published message is handed back by `next_message` with its payload intact.
    #[cfg(feature = "memory")]
    #[tokio::test]
    async fn next_message_returns_the_next_delivery() {
        use crate::{IncomingMessage, OutgoingMessage, Publisher, memory::MemoryBroker};

        let broker = MemoryBroker::new();
        let mut subscriber = broker.subscribe("conf-next");
        let greeting = OutgoingMessage::new("conf-next", b"hi".as_slice());
        broker.publisher().publish(greeting).await.unwrap();

        let msg = next_message(&mut subscriber, Duration::from_secs(1)).await;
        assert_eq!(msg.payload(), b"hi");
        msg.ack().await.unwrap();
    }

    /// `wait_for_no_messages` is `Ok` while idle and `Err` once something is published.
    #[cfg(feature = "memory")]
    #[tokio::test]
    async fn wait_for_no_messages_distinguishes_quiet_from_delivery() {
        use crate::{OutgoingMessage, Publisher, memory::MemoryBroker};

        let broker = MemoryBroker::new();
        let mut subscriber = broker.subscribe("conf-quiet");

        // Nothing has been published yet, so the window must pass quietly.
        let quiet = wait_for_no_messages(&mut subscriber, Duration::from_millis(50)).await;
        assert!(quiet.is_ok());

        let surprise = OutgoingMessage::new("conf-quiet", b"surprise".as_slice());
        broker.publisher().publish(surprise).await.unwrap();

        // With a delivery pending, the helper must surface it as an error.
        let outcome = wait_for_no_messages(&mut subscriber, Duration::from_millis(200)).await;
        assert!(outcome.is_err());
    }
}