// jaeb 0.3.0
//
// Simple snapshot-driven event bus.
// Documentation
//! Tests for per-event-type snapshot-slot architecture.
//!
//! These tests validate structural properties of per-type snapshot slots
//! and dispatch lanes rather than general event bus semantics.

use jaeb::{EventBus, EventHandler, HandlerResult, SyncEventHandler};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tokio::time::Instant;

// ── Event types ──────────────────────────────────────────────────────

/// Test event carrying a `usize` payload; the `Alpha` handlers in this file
/// either record that payload or add it to a counter.
#[derive(Debug, Clone)]
struct Alpha(usize);

/// Second test event type, distinct from `Alpha`, also carrying a `usize`
/// payload.
#[derive(Debug, Clone)]
struct Beta(usize);

/// Third test event type; a unit struct with no payload.
#[derive(Debug, Clone)]
struct Gamma;

// ── Helpers ──────────────────────────────────────────────────────────

struct SyncAlphaRecorder {
    log: Arc<std::sync::Mutex<Vec<usize>>>,
}

impl SyncEventHandler<Alpha> for SyncAlphaRecorder {
    fn handle(&self, event: &Alpha) -> HandlerResult {
        self.log.lock().unwrap().push(event.0);
        Ok(())
    }
}

/// Async `Alpha` handler that announces it has started and then parks on a
/// barrier until another party arrives.
struct SlowAlphaHandler {
    /// Rendezvous point the handler waits on before finishing.
    barrier: Arc<Barrier>,
    /// Set to `true` as soon as the handler begins executing.
    reached: Arc<AtomicBool>,
}

impl EventHandler<Alpha> for SlowAlphaHandler {
    /// Raises the `reached` flag, then waits on the barrier before returning `Ok`.
    async fn handle(&self, _event: &Alpha) -> HandlerResult {
        let Self { barrier, reached } = self;
        // The flag is set first so an observer can see the handler started
        // even while it is still blocked on the barrier.
        reached.store(true, Ordering::SeqCst);
        barrier.wait().await;
        Ok(())
    }
}

/// Async `Beta` handler that announces it has started and then parks on a
/// barrier until another party arrives.
struct SlowBetaHandler {
    /// Rendezvous point the handler waits on before finishing.
    barrier: Arc<Barrier>,
    /// Set to `true` as soon as the handler begins executing.
    reached: Arc<AtomicBool>,
}

impl EventHandler<Beta> for SlowBetaHandler {
    /// Raises the `reached` flag, then waits on the barrier before returning `Ok`.
    async fn handle(&self, _event: &Beta) -> HandlerResult {
        let Self { barrier, reached } = self;
        // The flag is set first so an observer can see the handler started
        // even while it is still blocked on the barrier.
        reached.store(true, Ordering::SeqCst);
        barrier.wait().await;
        Ok(())
    }
}

/// Async `Alpha` handler that adds each event's payload to a shared total.
struct AlphaAsyncCounter(Arc<AtomicUsize>);

impl EventHandler<Alpha> for AlphaAsyncCounter {
    /// Atomically adds the event's payload to the wrapped counter.
    async fn handle(&self, event: &Alpha) -> HandlerResult {
        let Self(total) = self;
        total.fetch_add(event.0, Ordering::SeqCst);
        Ok(())
    }
}

/// Async `Beta` handler that adds each event's payload to a shared total.
struct BetaAsyncCounter(Arc<AtomicUsize>);

impl EventHandler<Beta> for BetaAsyncCounter {
    /// Atomically adds the event's payload to the wrapped counter.
    async fn handle(&self, event: &Beta) -> HandlerResult {
        let Self(total) = self;
        total.fetch_add(event.0, Ordering::SeqCst);
        Ok(())
    }
}

/// Async `Gamma` handler that counts how many events it has handled.
struct GammaAsyncCounter(Arc<AtomicUsize>);

impl EventHandler<Gamma> for GammaAsyncCounter {
    /// Atomically bumps the wrapped counter by one; the event itself is unused.
    async fn handle(&self, _event: &Gamma) -> HandlerResult {
        let Self(invocations) = self;
        invocations.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

/// Synchronous `Alpha` handler that counts how many events it has handled.
struct AlphaSyncCounter(Arc<AtomicUsize>);

impl SyncEventHandler<Alpha> for AlphaSyncCounter {
    /// Atomically bumps the wrapped counter by one; the payload is ignored.
    fn handle(&self, _event: &Alpha) -> HandlerResult {
        let Self(invocations) = self;
        invocations.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

// ── Tests ────────────────────────────────────────────────────────────

/// Slow handlers registered for two different event types must both be able
/// to start before either one is released. The test is meant to show that
/// per-type slots dispatch independently instead of blocking one another.
#[tokio::test]
async fn per_type_parallelism() {
    let bus = EventBus::new(64).expect("valid config");

    // One two-party barrier per event type: the handler is one party and
    // this test body is the other.
    let alpha_gate = Arc::new(Barrier::new(2));
    let beta_gate = Arc::new(Barrier::new(2));
    let alpha_started = Arc::new(AtomicBool::new(false));
    let beta_started = Arc::new(AtomicBool::new(false));

    let alpha_handler = SlowAlphaHandler {
        barrier: Arc::clone(&alpha_gate),
        reached: Arc::clone(&alpha_started),
    };
    let _ = bus.subscribe(alpha_handler).await.expect("subscribe alpha");

    let beta_handler = SlowBetaHandler {
        barrier: Arc::clone(&beta_gate),
        reached: Arc::clone(&beta_started),
    };
    let _ = bus.subscribe(beta_handler).await.expect("subscribe beta");

    // Both publishes have to return while the handlers are still parked on
    // their barriers; the barriers are only released further down.
    bus.publish(Alpha(1)).await.expect("publish alpha");
    bus.publish(Beta(1)).await.expect("publish beta");

    // Poll until both handlers have flagged that they started. If the two
    // types were serialised, the second flag could never be set before the
    // first barrier is released, and the deadline below would trip.
    let started_at = Instant::now();
    let deadline = Duration::from_secs(5);

    while !(alpha_started.load(Ordering::SeqCst) && beta_started.load(Ordering::SeqCst)) {
        assert!(
            started_at.elapsed() <= deadline,
            "timed out waiting for both handlers to start — type slots may not be parallel"
        );
        // Hand control back to the runtime so the handler tasks can run.
        tokio::task::yield_now().await;
    }

    // Join each barrier as its second party so the handlers can finish.
    alpha_gate.wait().await;
    beta_gate.wait().await;

    bus.shutdown().await.expect("shutdown");
}

/// Ten `Alpha` events published one after another must be seen by a sync
/// handler in exactly the order they were published.
#[tokio::test]
async fn fifo_within_event_type() {
    let bus = EventBus::new(64).expect("valid config");

    let recorded = Arc::new(std::sync::Mutex::new(Vec::new()));
    let recorder = SyncAlphaRecorder {
        log: Arc::clone(&recorded),
    };
    let _ = bus.subscribe(recorder).await.expect("subscribe");

    for seq in 0..10 {
        bus.publish(Alpha(seq)).await.expect("publish");
    }

    // Shut down before inspecting the log so no dispatch is still pending.
    bus.shutdown().await.expect("shutdown");

    let observed: Vec<usize> = recorded.lock().unwrap().clone();
    assert_eq!(
        observed,
        (0..10).collect::<Vec<usize>>(),
        "events should arrive in FIFO order within the same event type"
    );
}

/// A type slot should only appear once something subscribes to that event
/// type. This is checked indirectly through the bus stats: with a single
/// `Alpha` subscription, publishing a `Beta` must leave the number of
/// registered event types at one.
#[tokio::test]
async fn lazy_type_slot_registration() {
    let bus = EventBus::new(64).expect("valid config");
    let alpha_calls = Arc::new(AtomicUsize::new(0));

    // Alpha is the only event type that ever gets a subscriber.
    let alpha_handler = AlphaSyncCounter(Arc::clone(&alpha_calls));
    let _ = bus.subscribe(alpha_handler).await.expect("subscribe alpha");

    let before = bus.stats().await.expect("stats");
    assert_eq!(before.registered_event_types.len(), 1, "only one event type should have a slot");

    // Nothing listens for Beta, so this publish is expected to be a no-op.
    bus.publish(Beta(1)).await.expect("publish beta");

    // The publish above must not have registered a second event type.
    let after = bus.stats().await.expect("stats after publish");
    let slot_count = after.registered_event_types.len();
    assert_eq!(slot_count, 1, "publishing to an unregistered type should not create a slot");

    bus.shutdown().await.expect("shutdown");

    let fired = alpha_calls.load(Ordering::SeqCst);
    assert_eq!(fired, 0, "Alpha handler should not have fired");
}

/// Once the last listener for an event type is unsubscribed, the stats must
/// show no subscriptions and no registered event types; the empty slot is
/// expected to be removed.
#[tokio::test]
async fn type_slot_cleanup_on_empty() {
    let bus = EventBus::new(64).expect("valid config");
    let calls = Arc::new(AtomicUsize::new(0));

    let handler = AlphaSyncCounter(Arc::clone(&calls));
    let subscription = bus.subscribe(handler).await.expect("subscribe");

    let before = bus.stats().await.expect("stats before unsub");
    assert_eq!(before.total_subscriptions, 1);
    assert_eq!(before.registered_event_types.len(), 1, "slot should exist for Alpha");

    // Drop the only listener, which leaves the Alpha slot with nothing in it.
    let was_removed = subscription.unsubscribe().await.expect("unsubscribe");
    assert!(was_removed, "unsubscribe should return true");

    let after = bus.stats().await.expect("stats after unsub");
    assert_eq!(after.total_subscriptions, 0);
    assert_eq!(after.registered_event_types.len(), 0, "empty slot should be cleaned up");

    bus.shutdown().await.expect("shutdown");
}

/// Shutdown has to cover every active type slot. With async handlers on
/// three event types and one event published to each, shutdown must succeed
/// and every handler must have run by the time it returns.
#[tokio::test]
async fn shutdown_aggregates_all_type_slots() {
    let bus = EventBus::new(64).expect("valid config");

    let alpha_total = Arc::new(AtomicUsize::new(0));
    let beta_total = Arc::new(AtomicUsize::new(0));
    let gamma_total = Arc::new(AtomicUsize::new(0));

    let alpha_handler = AlphaAsyncCounter(Arc::clone(&alpha_total));
    let _ = bus.subscribe(alpha_handler).await.expect("subscribe alpha");

    let beta_handler = BetaAsyncCounter(Arc::clone(&beta_total));
    let _ = bus.subscribe(beta_handler).await.expect("subscribe beta");

    let gamma_handler = GammaAsyncCounter(Arc::clone(&gamma_total));
    let _ = bus.subscribe(gamma_handler).await.expect("subscribe gamma");

    bus.publish(Alpha(10)).await.expect("publish alpha");
    bus.publish(Beta(20)).await.expect("publish beta");
    bus.publish(Gamma).await.expect("publish gamma");

    // The counters are read only after shutdown, so it must have waited for
    // the in-flight handlers of all three types.
    bus.shutdown().await.expect("shutdown");

    // Alpha and Beta counters sum payloads; the Gamma counter counts calls.
    let expectations = [
        (&alpha_total, 10, "alpha handler should have executed"),
        (&beta_total, 20, "beta handler should have executed"),
        (&gamma_total, 1, "gamma handler should have executed"),
    ];
    for (total, want, why) in expectations {
        assert_eq!(total.load(Ordering::SeqCst), want, "{why}");
    }
}

/// A sync and an async listener on the same event type count as two
/// subscriptions but only one registered event type, and both receive a
/// published event.
#[tokio::test]
async fn multiple_listeners_share_type_slot() {
    let bus = EventBus::new(64).expect("valid config");

    let sync_calls = Arc::new(AtomicUsize::new(0));
    let async_total = Arc::new(AtomicUsize::new(0));

    let sync_handler = AlphaSyncCounter(Arc::clone(&sync_calls));
    let _ = bus.subscribe(sync_handler).await.expect("subscribe sync");

    let async_handler = AlphaAsyncCounter(Arc::clone(&async_total));
    let _ = bus.subscribe(async_handler).await.expect("subscribe async");

    // Two subscriptions, but both are for Alpha, so one event type.
    let stats = bus.stats().await.expect("stats");
    let subscription_count = stats.total_subscriptions;
    let type_count = stats.registered_event_types.len();
    assert_eq!(subscription_count, 2, "both listeners should be counted");
    assert_eq!(type_count, 1, "both should share a single type slot");

    bus.publish(Alpha(5)).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    // The sync counter counts calls (1); the async counter sums payloads (5).
    assert_eq!(sync_calls.load(Ordering::SeqCst), 1, "sync handler should fire");
    assert_eq!(async_total.load(Ordering::SeqCst), 5, "async handler should fire");
}