strev 0.6.0

Event-driven pub/sub messaging library with compile-time ack safety
Documentation
use std::num::NonZeroU32;
use std::time::Duration;

use bytes::Bytes;
use strev::middleware::{CorrelationId, Retry, Timeout};
use strev::{HandlerResult, Message, Publisher, Router, ShutdownSignal, Topic};
use strev_channel::Channel;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let channel = Channel::new(64);
    let topic = Topic::new("incoming_messages");

    let mut router = Router::new();

    router.add_middleware(CorrelationId);
    router.add_middleware(Retry {
        max_attempts: NonZeroU32::new(3).unwrap(),
        initial_delay: Duration::from_millis(100),
        multiplier: 2.0,
        max_delay: Duration::from_secs(1),
    });

    router
        .add_consumer(
            "print_messages",
            topic.clone(),
            channel.clone(),
            |msg: Message| async move {
                let correlation_id = msg
                    .metadata()
                    .get("correlation_id")
                    .unwrap_or("none")
                    .to_string();

                let text = String::from_utf8_lossy(msg.payload());
                println!("[{correlation_id}] received: {text}");

                Ok(HandlerResult::ack(msg))
            },
        )
        .with_middleware(Timeout {
            duration: Duration::from_secs(5),
        });

    let token = CancellationToken::new();
    let token_clone = token.clone();

    let router_handle =
        tokio::spawn(async move { router.run(ShutdownSignal::Token(token_clone)).await });

    tokio::time::sleep(Duration::from_millis(50)).await;

    for i in 0..5 {
        let msg = Message::new(Bytes::from(format!("Hello, world! #{i}")));
        Publisher::publish(&channel, &topic, vec![msg])
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(200)).await;
    }

    tokio::time::sleep(Duration::from_millis(200)).await;
    token.cancel();
    router_handle.await.unwrap().unwrap();
    println!("router shut down gracefully");
}