jaeb 0.2.2

simple actor based event bus
Documentation
// SPDX-License-Identifier: MIT
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use jaeb::{EventBus, EventBusError, EventHandler, HandlerResult};

// ── Event types ──────────────────────────────────────────────────────

#[derive(Clone, Debug)]
struct Msg;

// ── Handlers ─────────────────────────────────────────────────────────

struct SlowHandler {
    count: Arc<AtomicUsize>,
}

impl EventHandler<Msg> for SlowHandler {
    async fn handle(&self, _event: &Msg) -> HandlerResult {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        self.count.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

// ── Tests ────────────────────────────────────────────────────────────

#[tokio::test]
async fn try_publish_reports_channel_full() {
    let bus = EventBus::new(1).expect("valid config");

    bus.try_publish(Msg).expect("first try_publish should succeed");

    let err = bus.try_publish(Msg).expect_err("second try_publish should fail");
    assert_eq!(err, EventBusError::ChannelFull);

    bus.shutdown().await.expect("shutdown");
}

#[tokio::test]
async fn try_publish_after_shutdown_returns_actor_stopped() {
    let bus = EventBus::new(16).expect("valid config");
    bus.shutdown().await.expect("shutdown");

    let err = bus.try_publish(Msg).expect_err("try_publish after shutdown");
    assert_eq!(err, EventBusError::ActorStopped);
}

/// Fill the channel buffer with `try_publish`, then verify that `publish`
/// (the blocking variant) eventually succeeds once the actor drains the
/// queue.  This tests the interaction between backpressure and the actor
/// loop under a small buffer.
#[tokio::test]
async fn publish_succeeds_after_channel_drains() {
    let bus = EventBus::new(2).expect("valid config");
    let count = Arc::new(AtomicUsize::new(0));

    let _ = bus.subscribe(SlowHandler { count: Arc::clone(&count) }).await.expect("subscribe");

    // Fill the buffer. try_publish is non-blocking and succeeds as long as
    // there is capacity.
    bus.try_publish(Msg).expect("try_publish 1");
    bus.try_publish(Msg).expect("try_publish 2");

    // Buffer should be full now.
    assert_eq!(bus.try_publish(Msg).unwrap_err(), EventBusError::ChannelFull);

    // The blocking `publish` should succeed once the actor processes a
    // message and frees a slot. Use a timeout to avoid hanging if something
    // goes wrong.
    tokio::time::timeout(std::time::Duration::from_secs(5), bus.publish(Msg))
        .await
        .expect("publish should not time out")
        .expect("publish should succeed after drain");

    bus.shutdown().await.expect("shutdown");

    // All 3 events (2 via try_publish + 1 via publish) should have been
    // handled.
    assert_eq!(count.load(Ordering::SeqCst), 3, "all events should have been processed");
}