strev 0.6.0

Event-driven pub/sub messaging library with compile-time ack safety
Documentation
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use strev::middleware::{Deduplicator, InMemoryDeduplicateRepository};
use strev::{HandlerError, HandlerResult, Message, Publisher, Router, ShutdownSignal, Topic};
use strev_channel::Channel;
use tokio_util::sync::CancellationToken;

#[derive(Serialize, Deserialize, Debug)]
struct OrderEvent {
    order_id: String,
    action: String,
}

#[tokio::main]
async fn main() {
    let channel = Channel::new(64);
    let topic = Topic::new("orders");

    let processed = Arc::new(AtomicU32::new(0));

    let mut router = Router::new();

    let count = processed.clone();
    router
        .add_consumer(
            "order_processor",
            topic.clone(),
            channel.clone(),
            move |msg: Message| {
                let count = count.clone();
                async move {
                    let (event, msg): (OrderEvent, _) = match msg.try_deserialize() {
                        Ok(v) => v,
                        Err((e, msg)) => {
                            let _ = msg.nack();
                            return Err(HandlerError::Processing(Box::new(e)));
                        }
                    };

                    count.fetch_add(1, Ordering::SeqCst);
                    println!("processing order {}: {}", event.order_id, event.action);
                    Ok(HandlerResult::ack(msg))
                }
            },
        )
        .with_middleware(Deduplicator {
            repository: Arc::new(InMemoryDeduplicateRepository::new(Duration::from_secs(60))),
            key_factory: Some(Arc::new(|msg: &Message| {
                String::from_utf8_lossy(msg.payload()).to_string()
            })),
        });

    let token = CancellationToken::new();
    let tc = token.clone();
    let handle = tokio::spawn(async move { router.run(ShutdownSignal::Token(tc)).await });

    tokio::time::sleep(Duration::from_millis(50)).await;

    let events = vec![
        OrderEvent {
            order_id: "ORD-001".into(),
            action: "created".into(),
        },
        OrderEvent {
            order_id: "ORD-001".into(),
            action: "created".into(),
        },
        OrderEvent {
            order_id: "ORD-002".into(),
            action: "created".into(),
        },
        OrderEvent {
            order_id: "ORD-001".into(),
            action: "created".into(),
        },
        OrderEvent {
            order_id: "ORD-003".into(),
            action: "created".into(),
        },
    ];

    println!("publishing {} messages (with duplicates)...", events.len());
    for event in &events {
        let payload = serde_json::to_vec(event).unwrap();
        Publisher::publish(&channel, &topic, vec![Message::new(Bytes::from(payload))])
            .await
            .unwrap();
    }

    tokio::time::sleep(Duration::from_millis(500)).await;
    token.cancel();
    handle.await.unwrap().unwrap();

    println!(
        "\npublished: {}, processed (deduplicated): {}",
        events.len(),
        processed.load(Ordering::SeqCst)
    );
}