// ruststream 0.4.0
//
// Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
// Documentation
//! Integration tests for the unified failure policy: handler panics and decode failures, settled
//! by the per-subscriber `on_failure(panic = .., decode = ..)` policy. Driven over `MemoryBroker`.
#![cfg(feature = "macros")]

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use ruststream::memory::MemoryBroker;
use ruststream::runtime::{
    AppInfo, HandlerResult, Router, RustStream, RustStreamError, TypedPublisher,
};
use ruststream::{OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;

/// Minimal JSON-serializable payload that the test handlers in this file decode from message
/// bodies.
#[derive(Debug, Serialize, Deserialize)]
struct Order {
    /// Order identifier; some handlers in this file panic on particular values on purpose.
    id: u32,
}

/// Builds the JSON-encoded bytes of an [`Order`] carrying the given `id`.
///
/// Panics if serialization fails.
fn order_bytes(id: u32) -> Vec<u8> {
    let order = Order { id };
    serde_json::to_vec(&order).unwrap()
}

// Counters keyed per handler so the parallel tests do not interfere; each handler is used by one
// test only.
/// Incremented by `dropping` for every order that gets past its poison check.
static DROP_DONE: AtomicUsize = AtomicUsize::new(0);
/// Incremented by `skipping` each time its body runs.
static SKIP_DONE: AtomicUsize = AtomicUsize::new(0);
/// Incremented by `rpcd` each time its body runs.
static RPC_DONE: AtomicUsize = AtomicUsize::new(0);
/// Advanced by `bd` by the number of orders in each batch it receives.
static BATCH_DONE: AtomicUsize = AtomicUsize::new(0);
/// Advanced by `bpd` by the number of orders in each batch it receives.
static BATCH_REPLY_DONE: AtomicUsize = AtomicUsize::new(0);

/// Default policy: a panic fails fast. Used by `handler_panic_fails_fast_and_run_returns_err`.
///
/// Subscribed to `boom` with no `on_failure` override. The body panics for every order whose id
/// is not `u32::MAX`.
#[subscriber("boom")]
async fn boom(order: &Order) -> HandlerResult {
    // The test publishes ids other than u32::MAX, so this assertion always fails (panics); the
    // trailing expression keeps the body typed as HandlerResult.
    assert_eq!(order.id, u32::MAX, "handler exploded");
    HandlerResult::Ack
}

/// `panic = drop` settles the offending message and keeps consuming. The poison id is 0.
///
/// Every non-poison order bumps `DROP_DONE` by one, which is how
/// `panic_drop_keeps_the_subscriber_consuming` observes that the handler ran.
#[subscriber("dropping", on_failure(panic = drop))]
async fn dropping(order: &Order) -> HandlerResult {
    // Panics on the poison id before the counter is touched, so a poison order is never counted.
    assert!(order.id != 0, "poison order must panic");
    DROP_DONE.fetch_add(1, Ordering::SeqCst);
    HandlerResult::Ack
}

/// `decode = fail_fast` tears the service down on a payload that cannot decode.
///
/// The body itself only acks; `decode_fail_fast_returns_err` feeds it a non-JSON payload so the
/// failure under test happens before the body would run.
#[subscriber("decodeff", on_failure(decode = fail_fast))]
async fn decode_ff(_order: &Order) -> HandlerResult {
    HandlerResult::Ack
}

/// `decode = skip` acks past a payload that cannot decode and keeps consuming.
///
/// Each invocation of the body bumps `SKIP_DONE` by one, which
/// `decode_skip_acks_past_bad_input_and_continues` uses to observe that the handler ran.
#[subscriber("skipping", on_failure(decode = skip))]
async fn skipping(_order: &Order) -> HandlerResult {
    SKIP_DONE.fetch_add(1, Ordering::SeqCst);
    HandlerResult::Ack
}

/// A batch handler under an explicit `panic = fail_fast`.
///
/// Panics for every non-empty batch. Used by `batch_handler_panic_fails_fast`.
#[subscriber(batch("batchboom"), on_failure(panic = fail_fast))]
async fn batch_boom(orders: &[Order]) -> HandlerResult {
    // The test always delivers a non-empty batch, so this assertion always fails (panics).
    assert!(orders.is_empty(), "batch handler exploded");
    HandlerResult::Ack
}

/// A publishing handler: exercises the single-message decode-failure path (default `decode = drop`).
#[subscriber("rpcd", publish("rpcd.out"))]
async fn rpcd(order: &Order) -> u32 {
    RPC_DONE.fetch_add(1, Ordering::SeqCst);
    order.id
}

/// A plain batch handler used to exercise the per-element batch decode-failure path.
///
/// Adds the size of every received batch to `BATCH_DONE` and acks.
#[subscriber(batch("bd"))]
async fn bd(orders: &[Order]) -> HandlerResult {
    let received = orders.len();
    BATCH_DONE.fetch_add(received, Ordering::SeqCst);
    HandlerResult::Ack
}

/// A batch publishing handler used to exercise the batch-publishing decode-failure path.
///
/// Adds the size of every received batch to `BATCH_REPLY_DONE` and returns the ids of the
/// orders, in input order.
#[subscriber(batch("bpd"), publish("bpd.out"))]
async fn bpd(orders: &[Order]) -> Vec<u32> {
    let received = orders.len();
    BATCH_REPLY_DONE.fetch_add(received, Ordering::SeqCst);

    let mut ids = Vec::with_capacity(received);
    for order in orders {
        ids.push(order.id);
    }
    ids
}

/// Runs `app` on a spawned task with a shutdown future that never resolves, and keeps
/// republishing `payload` to `topic` until that task finishes on its own.
///
/// Republishing is needed because the macro subscription only opens after connect, so an early
/// publish may not reach the handler. Publish errors are ignored on purpose.
///
/// Returns whatever `run_until` returned.
///
/// # Panics
///
/// Panics if the service has not finished within five seconds, or if the run task itself
/// panicked.
async fn run_until_torn_down(
    app: RustStream,
    publisher: impl Publisher,
    topic: &str,
    payload: Vec<u8>,
) -> Result<(), RustStreamError> {
    let service = tokio::spawn(app.run_until(std::future::pending::<()>()));

    // Publish, check whether the service is done, otherwise yield and go again.
    let drive = async {
        loop {
            let _ = publisher
                .publish(OutgoingMessage::new(topic, &payload))
                .await;
            if service.is_finished() {
                break service.await;
            }
            tokio::task::yield_now().await;
        }
    };

    let joined = tokio::time::timeout(Duration::from_secs(5), drive)
        .await
        .expect("service did not tear down within the deadline");
    joined.expect("run task panicked")
}

/// Keeps publishing `payload` to `topic` until `counter` moves away from the value it had when
/// driving started, which shows the subscription is live and the handler ran at least once.
///
/// Publish errors are ignored. Because publishing repeats until the counter is observed to
/// change, more than one copy of `payload` may have been published by the time this returns.
///
/// # Panics
///
/// Panics if the counter has not changed within five seconds.
async fn drive_until_seen(
    publisher: &impl Publisher,
    topic: &str,
    payload: &[u8],
    counter: &AtomicUsize,
) {
    let republish = async {
        let baseline = counter.load(Ordering::SeqCst);
        loop {
            if counter.load(Ordering::SeqCst) != baseline {
                break;
            }
            let _ = publisher
                .publish(OutgoingMessage::new(topic, payload))
                .await;
            tokio::task::yield_now().await;
        }
    };

    tokio::time::timeout(Duration::from_secs(5), republish)
        .await
        .expect("subscription never went live");
}

/// Re-evaluates `cond` until it returns `true`, yielding to the scheduler between checks.
///
/// # Panics
///
/// Panics if `cond` has not returned `true` before `timeout` elapses.
async fn wait_for(mut cond: impl FnMut() -> bool, timeout: Duration) {
    let poll = async {
        loop {
            if cond() {
                break;
            }
            tokio::task::yield_now().await;
        }
    };

    let result = tokio::time::timeout(timeout, poll).await;
    assert!(result.is_ok(), "condition not met within {timeout:?}");
}

/// Under the default failure policy, a panicking handler must end the run with a dispatch error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn handler_panic_fails_fast_and_run_returns_err() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("boom", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include(boom);
    });

    // Id 1 is not u32::MAX, so `boom` panics on it.
    let payload = order_bytes(1);
    let result = run_until_torn_down(app, publisher, "boom", payload).await;

    let is_dispatch_error = matches!(result, Err(RustStreamError::Dispatch(_)));
    assert!(
        is_dispatch_error,
        "a fail-fast panic must make run() return a dispatch error, got {result:?}",
    );
}

/// With `panic = drop`, a panicking message must neither stop the subscriber nor fail the run.
///
/// NOTE(review): `drive_until_seen` republishes in a loop, so extra copies of order 7 may still
/// be in flight when `before` is sampled; the final wait could then be satisfied by one of those
/// rather than by order 9 — confirm whether the check needs tightening.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn panic_drop_keeps_the_subscriber_consuming() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("dropping", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include(dropping);
    });

    // The run future resolves once `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let stop = {
        let signal = Arc::clone(&shutdown);
        async move { signal.notified().await }
    };
    let run = tokio::spawn(app.run_until(stop));

    drive_until_seen(&publisher, "dropping", &order_bytes(7), &DROP_DONE).await;
    let before = DROP_DONE.load(Ordering::SeqCst);

    // A poison order panics (dropped), then a good order must still be processed.
    for id in [0, 9] {
        let payload = order_bytes(id);
        publisher
            .publish(OutgoingMessage::new("dropping", &payload))
            .await
            .unwrap();
    }

    let advanced = || before < DROP_DONE.load(Ordering::SeqCst);
    wait_for(advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a dropped panic must not error the run: {result:?}"
    );
}

/// With `decode = fail_fast`, an undecodable payload must end the run with a dispatch error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn decode_fail_fast_returns_err() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("decodeff", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include(decode_ff);
    });

    // Not valid JSON, so it cannot decode into an `Order`.
    let malformed = b"not json".to_vec();
    let result = run_until_torn_down(app, publisher, "decodeff", malformed).await;

    let is_dispatch_error = matches!(result, Err(RustStreamError::Dispatch(_)));
    assert!(
        is_dispatch_error,
        "a fail-fast decode failure must make run() return a dispatch error, got {result:?}",
    );
}

/// With `decode = skip`, an undecodable payload must neither stop the subscriber nor fail the
/// run.
///
/// NOTE(review): `drive_until_seen` republishes in a loop, so extra copies of order 1 may still
/// be in flight when `before` is sampled; the final wait could then be satisfied by one of those
/// rather than by order 2 — confirm whether the check needs tightening.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn decode_skip_acks_past_bad_input_and_continues() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("skipping", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include(skipping);
    });

    // The run future resolves once `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let stop = {
        let signal = Arc::clone(&shutdown);
        async move { signal.notified().await }
    };
    let run = tokio::spawn(app.run_until(stop));

    drive_until_seen(&publisher, "skipping", &order_bytes(1), &SKIP_DONE).await;
    let before = SKIP_DONE.load(Ordering::SeqCst);

    // A malformed payload is skipped (acked past), then a good order is still processed.
    for payload in [b"not json".to_vec(), order_bytes(2)] {
        publisher
            .publish(OutgoingMessage::new("skipping", &payload))
            .await
            .unwrap();
    }

    let advanced = || before < SKIP_DONE.load(Ordering::SeqCst);
    wait_for(advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a skipped decode failure must not error the run: {result:?}"
    );
}

/// For a publishing handler with no `on_failure` override, an undecodable payload must neither
/// stop the subscriber nor fail the run.
///
/// NOTE(review): `drive_until_seen` republishes in a loop, so extra copies of order 1 may still
/// be in flight when `before` is sampled; the final wait could then be satisfied by one of those
/// rather than by order 2 — confirm whether the check needs tightening.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publishing_decode_failure_is_dropped_and_continues() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let typed_out = TypedPublisher::new(broker.publisher());
    let router = Router::<MemoryBroker>::new().include_publishing(rpcd, typed_out);
    let info = AppInfo::new("rpcd", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| b.include_router(router));

    // The run future resolves once `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let stop = {
        let signal = Arc::clone(&shutdown);
        async move { signal.notified().await }
    };
    let run = tokio::spawn(app.run_until(stop));

    drive_until_seen(&publisher, "rpcd", &order_bytes(1), &RPC_DONE).await;
    let before = RPC_DONE.load(Ordering::SeqCst);

    // Malformed payload first, then a good order that must still reach the handler.
    for payload in [b"not json".to_vec(), order_bytes(2)] {
        publisher
            .publish(OutgoingMessage::new("rpcd", &payload))
            .await
            .unwrap();
    }

    let advanced = || before < RPC_DONE.load(Ordering::SeqCst);
    wait_for(advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a dropped decode failure must not error the run: {result:?}"
    );
}

/// For a batch handler with no `on_failure` override, an undecodable payload must neither stop
/// the subscriber nor fail the run.
///
/// NOTE(review): `drive_until_seen` republishes in a loop, so extra copies of order 1 may still
/// be in flight when `before` is sampled; the final wait could then be satisfied by one of those
/// rather than by order 2 — confirm whether the check needs tightening.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_decode_failure_drops_the_bad_element() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("bd", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include_batch(bd);
    });

    // The run future resolves once `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let stop = {
        let signal = Arc::clone(&shutdown);
        async move { signal.notified().await }
    };
    let run = tokio::spawn(app.run_until(stop));

    drive_until_seen(&publisher, "bd", &order_bytes(1), &BATCH_DONE).await;
    let before = BATCH_DONE.load(Ordering::SeqCst);

    // Malformed payload first, then a good order that must still reach the handler.
    for payload in [b"not json".to_vec(), order_bytes(2)] {
        publisher
            .publish(OutgoingMessage::new("bd", &payload))
            .await
            .unwrap();
    }

    let advanced = || before < BATCH_DONE.load(Ordering::SeqCst);
    wait_for(advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a dropped batch element must not error the run: {result:?}"
    );
}

/// For a batch publishing handler with no `on_failure` override, an undecodable payload must
/// neither stop the subscriber nor fail the run.
///
/// NOTE(review): `drive_until_seen` republishes in a loop, so extra copies of order 1 may still
/// be in flight when `before` is sampled; the final wait could then be satisfied by one of those
/// rather than by order 2 — confirm whether the check needs tightening.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_publishing_decode_failure_is_dropped() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let typed_out = TypedPublisher::new(broker.publisher());
    let router = Router::<MemoryBroker>::new().include_batch_publishing(bpd, typed_out);
    let info = AppInfo::new("bpd", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| b.include_router(router));

    // The run future resolves once `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let stop = {
        let signal = Arc::clone(&shutdown);
        async move { signal.notified().await }
    };
    let run = tokio::spawn(app.run_until(stop));

    drive_until_seen(&publisher, "bpd", &order_bytes(1), &BATCH_REPLY_DONE).await;
    let before = BATCH_REPLY_DONE.load(Ordering::SeqCst);

    // Malformed payload first, then a good order that must still reach the handler.
    for payload in [b"not json".to_vec(), order_bytes(2)] {
        publisher
            .publish(OutgoingMessage::new("bpd", &payload))
            .await
            .unwrap();
    }

    let advanced = || before < BATCH_REPLY_DONE.load(Ordering::SeqCst);
    wait_for(advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a dropped batch reply element must not error the run: {result:?}"
    );
}

/// With an explicit `panic = fail_fast`, a panicking batch handler must end the run with a
/// dispatch error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_handler_panic_fails_fast() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("batchboom", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include_batch(batch_boom);
    });

    // `batch_boom` panics on any non-empty batch, so a single order is enough.
    let payload = order_bytes(1);
    let result = run_until_torn_down(app, publisher, "batchboom", payload).await;

    let is_dispatch_error = matches!(result, Err(RustStreamError::Dispatch(_)));
    assert!(
        is_dispatch_error,
        "a fail-fast batch panic must make run() return a dispatch error, got {result:?}",
    );
}