// ruststream 0.4.0
//
// Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance
// harness for broker authors.
// Documentation
//! Optional conformance suites, one per capability trait.
//!
//! A broker crate that implements a capability ([`RequestReply`], [`BatchSubscriber`],
//! [`TransactionalPublisher`]) runs the matching suite to prove the implementation honours the
//! trait contract; brokers without the capability simply do not call it. Like
//! [`harness::lifecycle`](super::harness::lifecycle), every suite is built from caller-supplied
//! factories so it stays broker-agnostic, and the in-memory broker is the executable reference
//! that passes all of them.

use std::time::Duration;

use futures::StreamExt;

use super::harness::{expect_next, expect_no_more};
use crate::{
    AckError, BatchSubscriber, Broker, Headers, IncomingMessage, OutgoingMessage, Publisher,
    RequestReply, Subscriber, SubscriptionSource, TransactionalPublisher,
};

// Generous bound for operations that are expected to complete: it is the timeout handed to the
// answered request in `request_reply` and the per-batch wait in `batches`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
// Short timeout handed to the request that is expected to go unanswered, so the negative case
// in `request_reply` resolves quickly.
const MISS_TIMEOUT: Duration = Duration::from_millis(100);

/// Verifies the [`RequestReply`] contract.
///
/// A request reaches a responder with a usable `reply-to` header, the correlated reply resolves
/// the request, and a request nobody answers fails once its timeout elapses.
///
/// The factories mirror [`harness::lifecycle`](super::harness::lifecycle): `make_source` opens
/// the responder's subscription, `make_requester` produces the [`RequestReply`] publisher under
/// test, and `make_publisher` produces the plain publisher the responder replies through.
///
/// # Examples
///
/// ```no_run
/// # #[cfg(feature = "memory")]
/// # async fn run() {
/// use ruststream::conformance::capabilities;
/// use ruststream::memory::{MemoryBroker, MemorySource};
///
/// capabilities::request_reply(
///     MemoryBroker::new,
///     |name| MemorySource::new(name),
///     |broker| broker.requester(),
///     |broker| broker.publisher(),
/// )
/// .await;
/// # }
/// ```
///
/// # Panics
///
/// Panics with a descriptive message if any step violates the contract.
pub async fn request_reply<B, MkBroker, Src, MkSrc, Req, MkReq, Pub, MkPub>(
    make_broker: MkBroker,
    make_source: MkSrc,
    make_requester: MkReq,
    make_publisher: MkPub,
) where
    B: Broker,
    MkBroker: Fn() -> B,
    Src: SubscriptionSource<B> + Send,
    Src::Subscriber: Send,
    MkSrc: Fn(&str) -> Src,
    Req: RequestReply,
    MkReq: Fn(&B) -> Req,
    Pub: Publisher,
    MkPub: Fn(&B) -> Pub,
{
    const SUBJECT: &str = "conformance.request_reply";

    let broker = make_broker();
    Broker::connect(&broker).await.expect("broker must connect");

    let mut responder = make_source(SUBJECT)
        .subscribe(&broker)
        .await
        .expect("responder subscription must open after connect");
    let publisher = make_publisher(&broker);
    let requester = make_requester(&broker);

    let respond = async {
        let mut stream = std::pin::pin!(responder.stream());
        let msg = expect_next(&mut stream, "request_reply responder").await;
        assert_eq!(
            msg.payload(),
            b"ping",
            "responder must receive the request payload"
        );
        let reply_to = msg
            .headers()
            .reply_to()
            .expect("a request must carry a usable reply-to header")
            .to_owned();

        // Echo the correlation id when the requester set one; replies must at minimum go to
        // the reply-to destination.
        let mut headers = Headers::new();
        if let Some(correlation_id) = msg.headers().correlation_id() {
            headers.insert("correlation-id", correlation_id.to_owned());
        }
        publisher
            .publish(OutgoingMessage::new(&reply_to, b"pong".as_slice()).with_headers(headers))
            .await
            .expect("reply publish failed");
        match msg.ack().await {
            Ok(()) | Err(AckError::Unsupported) => {}
            Err(other) => panic!("ack must succeed or be unsupported, got: {other:?}"),
        }
    };
    let request = requester.request(
        OutgoingMessage::new(SUBJECT, b"ping".as_slice()),
        DEFAULT_TIMEOUT,
    );

    let (reply, ()) = futures::join!(request, respond);
    let reply = reply.expect("request must resolve once the responder replies");
    assert_eq!(
        reply.payload(),
        b"pong",
        "the correlated reply must carry the responder payload"
    );

    let unanswered = requester
        .request(
            OutgoingMessage::new("conformance.request_reply.void", b"ping".as_slice()),
            MISS_TIMEOUT,
        )
        .await;
    assert!(
        unanswered.is_err(),
        "a request nobody answers must fail once its timeout elapses",
    );

    Broker::shutdown(&broker)
        .await
        .expect("broker must shut down cleanly");
}

/// Verifies the [`BatchSubscriber`] contract.
///
/// Ten messages are published to a single subject and then read back through the subscriber's
/// batch stream. Each yielded batch must be non-empty, and the concatenation of all batches
/// must equal the published payloads in publish order. Every received message is acked; an
/// `Unsupported` ack result is tolerated.
///
/// # Examples
///
/// ```no_run
/// # #[cfg(feature = "memory")]
/// # async fn run() {
/// use ruststream::conformance::capabilities;
/// use ruststream::memory::{MemoryBroker, MemorySource};
///
/// capabilities::batches(
///     MemoryBroker::new,
///     |name| MemorySource::new(name),
///     |broker| broker.publisher(),
/// )
/// .await;
/// # }
/// ```
///
/// # Panics
///
/// Panics with a descriptive message if any step violates the contract.
pub async fn batches<B, MkBroker, Src, MkSrc, Pub, MkPub>(
    make_broker: MkBroker,
    make_source: MkSrc,
    make_publisher: MkPub,
) where
    B: Broker,
    MkBroker: Fn() -> B,
    Src: SubscriptionSource<B> + Send,
    Src::Subscriber: BatchSubscriber + Send,
    MkSrc: Fn(&str) -> Src,
    Pub: Publisher,
    MkPub: Fn(&B) -> Pub,
{
    const SUBJECT: &str = "conformance.batches";
    const COUNT: u32 = 10;

    let broker = make_broker();
    Broker::connect(&broker).await.expect("broker must connect");

    let mut subscriber = make_source(SUBJECT)
        .subscribe(&broker)
        .await
        .expect("subscription must open after connect");
    let publisher = make_publisher(&broker);

    // Message `i` carries the big-endian bytes of `i`, which makes delivery order checkable
    // from the payloads alone.
    let expected: Vec<Vec<u8>> = (0..COUNT).map(|i| i.to_be_bytes().to_vec()).collect();
    for payload in &expected {
        publisher
            .publish(OutgoingMessage::new(SUBJECT, payload.as_slice()))
            .await
            .expect("publish failed");
    }

    let mut received = Vec::new();
    let mut stream = std::pin::pin!(subscriber.batches());
    // Keep pulling batches until as many messages arrived as were published; each pull is
    // bounded so a stalled stream fails the suite.
    while received.len() < expected.len() {
        let polled = tokio::time::timeout(DEFAULT_TIMEOUT, stream.next()).await;
        let yielded = polled
            .expect("batches: stream timed out")
            .expect("batches: stream ended unexpectedly");
        let batch = match yielded {
            Ok(batch) => batch,
            Err(err) => panic!("batches: stream yielded error: {err:?}"),
        };

        // Materialise the batch so emptiness can be checked before any message is consumed.
        let messages: Vec<_> = batch.into_iter().collect();
        assert!(!messages.is_empty(), "a yielded batch must not be empty");
        for msg in messages {
            received.push(msg.payload().to_vec());
            if let Err(other) = msg.ack().await {
                assert!(
                    matches!(other, AckError::Unsupported),
                    "ack must succeed or be unsupported, got: {other:?}"
                );
            }
        }
    }

    assert_eq!(
        received, expected,
        "batched deliveries must preserve publish order across batches",
    );

    Broker::shutdown(&broker)
        .await
        .expect("broker must shut down cleanly");
}

/// Verifies the [`TransactionalPublisher`] contract.
///
/// Nothing published inside a transaction is visible before `commit`, a commit makes every
/// buffered message visible in publish order, and an abort discards the buffer.
///
/// The suite opens a subscription, then runs two transactions through the same publisher:
///
/// 1. `first` and `second` are published inside a transaction; the subscriber must see nothing
///    until `commit`, after which both arrive in that order and are acked (an `Unsupported`
///    ack result is tolerated).
/// 2. `discarded` is published inside a second transaction that is aborted; the subscriber
///    must see nothing afterwards.
///
/// # Examples
///
/// ```no_run
/// # #[cfg(feature = "memory")]
/// # async fn run() {
/// use ruststream::conformance::capabilities;
/// use ruststream::memory::{MemoryBroker, MemorySource};
///
/// capabilities::transactions(
///     MemoryBroker::new,
///     |name| MemorySource::new(name),
///     |broker| broker.publisher(),
/// )
/// .await;
/// # }
/// ```
///
/// # Panics
///
/// Panics with a descriptive message if any step violates the contract.
pub async fn transactions<B, MkBroker, Src, MkSrc, Pub, MkPub>(
    make_broker: MkBroker,
    make_source: MkSrc,
    make_publisher: MkPub,
) where
    B: Broker,
    MkBroker: Fn() -> B,
    Src: SubscriptionSource<B> + Send,
    Src::Subscriber: Send,
    MkSrc: Fn(&str) -> Src,
    Pub: TransactionalPublisher,
    MkPub: Fn(&B) -> Pub,
{
    const SUBJECT: &str = "conformance.transactions";

    let broker = make_broker();
    Broker::connect(&broker).await.expect("broker must connect");

    let mut subscriber = make_source(SUBJECT)
        .subscribe(&broker)
        .await
        .expect("subscription must open after connect");
    let publisher = make_publisher(&broker);
    let mut stream = std::pin::pin!(subscriber.stream());

    // Committed transaction: messages stay invisible until `commit`.
    publisher
        .begin_transaction()
        .await
        .expect("begin_transaction failed");
    publisher
        .publish(OutgoingMessage::new(SUBJECT, b"first".as_slice()))
        .await
        .expect("publish inside transaction failed");
    publisher
        .publish(OutgoingMessage::new(SUBJECT, b"second".as_slice()))
        .await
        .expect("publish inside transaction failed");
    expect_no_more(&mut stream, "transactions: before commit").await;

    publisher.commit().await.expect("commit failed");
    let first = expect_next(&mut stream, "transactions: first after commit").await;
    assert_eq!(
        first.payload(),
        b"first",
        "commit must make buffered messages visible in publish order",
    );
    match first.ack().await {
        Ok(()) | Err(AckError::Unsupported) => {}
        Err(other) => panic!("ack must succeed or be unsupported, got: {other:?}"),
    }
    let second = expect_next(&mut stream, "transactions: second after commit").await;
    // Carry the same descriptive message as the first check so an ordering violation on the
    // second message is reported just as clearly.
    assert_eq!(
        second.payload(),
        b"second",
        "commit must make buffered messages visible in publish order",
    );
    match second.ack().await {
        Ok(()) | Err(AckError::Unsupported) => {}
        Err(other) => panic!("ack must succeed or be unsupported, got: {other:?}"),
    }

    // Aborted transaction: the buffered message must never reach the subscriber.
    publisher
        .begin_transaction()
        .await
        .expect("begin_transaction failed");
    publisher
        .publish(OutgoingMessage::new(SUBJECT, b"discarded".as_slice()))
        .await
        .expect("publish inside transaction failed");
    publisher.abort().await.expect("abort failed");
    expect_no_more(&mut stream, "transactions: after abort").await;

    Broker::shutdown(&broker)
        .await
        .expect("broker must shut down cleanly");
}