strev 0.5.0

Event-driven pub/sub messaging library with compile-time ack safety
Documentation
use std::num::NonZeroU32;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use bytes::Bytes;
use strev::middleware::{CircuitBreaker, CorrelationId, Retry, Throttle, Timeout};
use strev::{HandlerError, HandlerResult, Message, Publisher, Router, ShutdownSignal, Topic};
use strev_channel::Channel;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let channel = Channel::new(64);
    let topic = Topic::new("orders");

    let attempt_count = Arc::new(AtomicU32::new(0));
    let success_count = Arc::new(AtomicU32::new(0));

    let mut router = Router::new();

    router.add_middleware(CorrelationId);

    router.add_middleware(Timeout {
        duration: Duration::from_secs(5),
    });

    router.add_middleware(Throttle {
        max_per_second: NonZeroU32::new(50).unwrap(),
    });

    let attempts = attempt_count.clone();
    let successes = success_count.clone();
    router
        .add_consumer(
            "flaky_processor",
            topic.clone(),
            channel.clone(),
            move |msg: Message| {
                let attempts = attempts.clone();
                let successes = successes.clone();
                async move {
                    let n = attempts.fetch_add(1, Ordering::SeqCst);
                    let payload = String::from_utf8_lossy(msg.payload()).to_string();
                    let cid = msg
                        .metadata()
                        .get("correlation_id")
                        .unwrap_or("none")
                        .to_string();

                    if n.is_multiple_of(3) {
                        println!("[{cid}] FAIL: {payload} (attempt {n})");
                        let _ = msg.nack();
                        return Err(HandlerError::Processing("transient failure".into()));
                    }

                    successes.fetch_add(1, Ordering::SeqCst);
                    println!("[{cid}] OK: {payload}");
                    Ok(HandlerResult::ack(msg))
                }
            },
        )
        .with_middleware(Retry {
            max_attempts: NonZeroU32::new(3).unwrap(),
            initial_delay: Duration::from_millis(10),
            multiplier: 2.0,
            max_delay: Duration::from_millis(100),
        })
        .with_middleware(CircuitBreaker {
            max_failures: NonZeroU32::new(5).unwrap(),
            reset_timeout: Duration::from_secs(5),
        });

    let token = CancellationToken::new();
    let tc = token.clone();
    let handle = tokio::spawn(async move { router.run(ShutdownSignal::Token(tc)).await });

    tokio::time::sleep(Duration::from_millis(50)).await;

    for i in 0..8 {
        let msg = Message::new(Bytes::from(format!("order-{i}")));
        Publisher::publish(&channel, &topic, vec![msg])
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(30)).await;
    }

    tokio::time::sleep(Duration::from_millis(1000)).await;
    token.cancel();
    handle.await.unwrap().unwrap();

    println!("\n--- Stats ---");
    println!("total attempts: {}", attempt_count.load(Ordering::SeqCst));
    println!("successes: {}", success_count.load(Ordering::SeqCst));
}