strev 0.6.0

Event-driven pub/sub messaging library with compile-time ack safety
Documentation
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use strev::middleware::{correlation_id, set_correlation_id};
use strev::{
    HandlerResult, Message, Publisher, Router, ShutdownSignal, Subscriber, Topic, bulk_read,
    passthrough,
};
use strev_channel::Channel;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;

#[test]
fn message_copy_creates_new_uuid() {
    let msg = Message::new(Bytes::from("hello"));
    let copy = msg.copy();
    assert_ne!(msg.uuid(), copy.uuid());
    assert_eq!(msg.payload(), copy.payload());
    let _ = msg.ack();
    let _ = copy.ack();
}

#[test]
fn message_copy_preserves_metadata() {
    let mut msg = Message::new(Bytes::from("hello"));
    msg.metadata_mut().set("key", "value");
    let copy = msg.copy();
    assert_eq!(copy.metadata().get("key"), Some("value"));
    let _ = msg.ack();
    let _ = copy.ack();
}

#[test]
fn set_and_get_correlation_id() {
    let mut msg = Message::new(Bytes::from("test"));
    assert!(correlation_id(&msg).is_none());
    set_correlation_id(&mut msg, "abc-123");
    assert_eq!(correlation_id(&msg), Some("abc-123"));
    let _ = msg.ack();
}

#[tokio::test]
async fn passthrough_handler_produces_message() {
    let topic = Topic::new("output");
    let handler = passthrough(topic.clone());

    use strev::Handler;
    let msg = Message::new(Bytes::from("forwarded"));
    let result = handler.handle(msg).await.unwrap();
    assert!(result.outcome().is_acked());
    assert_eq!(result.produced().len(), 1);
    assert_eq!(result.produced()[0].topic, topic);
    assert_eq!(result.produced()[0].payload.as_ref(), b"forwarded");
}

#[tokio::test]
async fn bulk_read_collects_messages() {
    let channel = Channel::new(16);
    let topic = Topic::new("bulk");

    let mut stream = Subscriber::subscribe(&channel, &topic).await.unwrap();

    for i in 0..5 {
        let msg = Message::new(Bytes::from(format!("msg-{i}")));
        Publisher::publish(&channel, &topic, vec![msg])
            .await
            .unwrap();
    }

    let messages = bulk_read(&mut stream, 10, Duration::from_millis(200)).await;
    assert_eq!(messages.len(), 5);
    for msg in messages {
        let _ = msg.ack();
    }
}

#[tokio::test]
async fn bulk_read_respects_limit() {
    let channel = Channel::new(16);
    let topic = Topic::new("bulk");

    let mut stream = Subscriber::subscribe(&channel, &topic).await.unwrap();

    for i in 0..10 {
        let msg = Message::new(Bytes::from(format!("msg-{i}")));
        Publisher::publish(&channel, &topic, vec![msg])
            .await
            .unwrap();
    }

    let messages = bulk_read(&mut stream, 3, Duration::from_millis(200)).await;
    assert_eq!(messages.len(), 3);
    for msg in messages {
        let _ = msg.ack();
    }
}

#[tokio::test]
async fn bulk_read_respects_timeout() {
    let channel = Channel::new(16);
    let topic = Topic::new("bulk");

    let mut stream = Subscriber::subscribe(&channel, &topic).await.unwrap();

    let msg = Message::new(Bytes::from("only-one"));
    Publisher::publish(&channel, &topic, vec![msg])
        .await
        .unwrap();

    let start = std::time::Instant::now();
    let messages = bulk_read(&mut stream, 10, Duration::from_millis(100)).await;
    let elapsed = start.elapsed();

    assert_eq!(messages.len(), 1);
    assert!(elapsed >= Duration::from_millis(80));
    for msg in messages {
        let _ = msg.ack();
    }
}

#[test]
fn router_config_default_close_timeout() {
    use strev::RouterConfig;
    let config = RouterConfig::default();
    assert_eq!(config.close_timeout, Duration::from_secs(30));
}

#[test]
fn router_handler_names() {
    let channel = Channel::new(16);
    let mut router = Router::new();

    router.add_consumer(
        "handler_a",
        Topic::new("a"),
        channel.clone(),
        |msg: Message| async move { Ok(HandlerResult::ack(msg)) },
    );

    router.add_consumer(
        "handler_b",
        Topic::new("b"),
        channel.clone(),
        |msg: Message| async move { Ok(HandlerResult::ack(msg)) },
    );

    let names = router.handler_names();
    assert_eq!(names, vec!["handler_a", "handler_b"]);
}

#[test]
#[should_panic(expected = "duplicate handler name")]
fn router_rejects_duplicate_handler_names() {
    let channel = Channel::new(16);
    let mut router = Router::new();

    router.add_consumer(
        "same_name",
        Topic::new("a"),
        channel.clone(),
        |msg: Message| async move { Ok(HandlerResult::ack(msg)) },
    );

    router.add_consumer(
        "same_name",
        Topic::new("b"),
        channel.clone(),
        |msg: Message| async move { Ok(HandlerResult::ack(msg)) },
    );
}

#[tokio::test]
async fn router_with_publisher_decorator() {
    let channel = Channel::new(64);
    let topic_in = Topic::new("input");
    let topic_out = Topic::new("output");

    let mut router = Router::new();

    router.add_publisher_decorator(strev::MessageTransformPublisherDecorator {
        transform: Arc::new(|msg: &mut Message| {
            msg.metadata_mut().set("decorated", "true");
        }),
    });

    router.add_handler(
        "forward",
        topic_in.clone(),
        channel.clone(),
        channel.clone(),
        passthrough(topic_out.clone()),
    );

    let mut output = Subscriber::subscribe(&channel, &topic_out).await.unwrap();

    let token = CancellationToken::new();
    let tc = token.clone();
    let handle = tokio::spawn(async move { router.run(ShutdownSignal::Token(tc)).await });

    tokio::time::sleep(Duration::from_millis(50)).await;

    let msg = Message::new(Bytes::from("test"));
    Publisher::publish(&channel, &topic_in, vec![msg])
        .await
        .unwrap();

    let received = tokio::time::timeout(Duration::from_millis(500), output.next())
        .await
        .unwrap()
        .unwrap();

    assert_eq!(received.metadata().get("decorated"), Some("true"));
    let _ = received.ack();

    token.cancel();
    handle.await.unwrap().unwrap();
}