use jaeb::{EventBus, EventHandler, HandlerResult, SyncEventHandler};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tokio::time::Instant;
#[derive(Clone, Debug)]
struct Alpha(usize);
#[derive(Clone, Debug)]
struct Beta(usize);
#[derive(Clone, Debug)]
struct Gamma;
struct SyncAlphaRecorder {
log: Arc<std::sync::Mutex<Vec<usize>>>,
}
impl SyncEventHandler<Alpha> for SyncAlphaRecorder {
fn handle(&self, event: &Alpha) -> HandlerResult {
self.log.lock().unwrap().push(event.0);
Ok(())
}
}
struct SlowAlphaHandler {
barrier: Arc<Barrier>,
reached: Arc<AtomicBool>,
}
impl EventHandler<Alpha> for SlowAlphaHandler {
async fn handle(&self, _event: &Alpha) -> HandlerResult {
self.reached.store(true, Ordering::SeqCst);
self.barrier.wait().await;
Ok(())
}
}
struct SlowBetaHandler {
barrier: Arc<Barrier>,
reached: Arc<AtomicBool>,
}
impl EventHandler<Beta> for SlowBetaHandler {
async fn handle(&self, _event: &Beta) -> HandlerResult {
self.reached.store(true, Ordering::SeqCst);
self.barrier.wait().await;
Ok(())
}
}
struct AlphaAsyncCounter(Arc<AtomicUsize>);
impl EventHandler<Alpha> for AlphaAsyncCounter {
async fn handle(&self, event: &Alpha) -> HandlerResult {
self.0.fetch_add(event.0, Ordering::SeqCst);
Ok(())
}
}
struct BetaAsyncCounter(Arc<AtomicUsize>);
impl EventHandler<Beta> for BetaAsyncCounter {
async fn handle(&self, event: &Beta) -> HandlerResult {
self.0.fetch_add(event.0, Ordering::SeqCst);
Ok(())
}
}
struct GammaAsyncCounter(Arc<AtomicUsize>);
impl EventHandler<Gamma> for GammaAsyncCounter {
async fn handle(&self, _event: &Gamma) -> HandlerResult {
self.0.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
struct AlphaSyncCounter(Arc<AtomicUsize>);
impl SyncEventHandler<Alpha> for AlphaSyncCounter {
fn handle(&self, _event: &Alpha) -> HandlerResult {
self.0.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
#[tokio::test]
async fn per_type_parallelism() {
let bus = EventBus::new(64).expect("valid config");
let barrier_alpha = Arc::new(Barrier::new(2));
let barrier_beta = Arc::new(Barrier::new(2));
let reached_alpha = Arc::new(AtomicBool::new(false));
let reached_beta = Arc::new(AtomicBool::new(false));
let _ = bus
.subscribe(SlowAlphaHandler {
barrier: Arc::clone(&barrier_alpha),
reached: Arc::clone(&reached_alpha),
})
.await
.expect("subscribe alpha");
let _ = bus
.subscribe(SlowBetaHandler {
barrier: Arc::clone(&barrier_beta),
reached: Arc::clone(&reached_beta),
})
.await
.expect("subscribe beta");
bus.publish(Alpha(1)).await.expect("publish alpha");
bus.publish(Beta(1)).await.expect("publish beta");
let start = Instant::now();
let timeout = Duration::from_secs(5);
loop {
if reached_alpha.load(Ordering::SeqCst) && reached_beta.load(Ordering::SeqCst) {
break;
}
if start.elapsed() > timeout {
panic!("timed out waiting for both handlers to start — type slots may not be parallel");
}
tokio::task::yield_now().await;
}
barrier_alpha.wait().await;
barrier_beta.wait().await;
bus.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn fifo_within_event_type() {
let bus = EventBus::new(64).expect("valid config");
let log = Arc::new(std::sync::Mutex::new(Vec::new()));
let _ = bus.subscribe(SyncAlphaRecorder { log: Arc::clone(&log) }).await.expect("subscribe");
for i in 0..10 {
bus.publish(Alpha(i)).await.expect("publish");
}
bus.shutdown().await.expect("shutdown");
let entries: Vec<usize> = log.lock().unwrap().clone();
let expected: Vec<usize> = (0..10).collect();
assert_eq!(entries, expected, "events should arrive in FIFO order within the same event type");
}
#[tokio::test]
async fn lazy_type_slot_registration() {
let bus = EventBus::new(64).expect("valid config");
let count = Arc::new(AtomicUsize::new(0));
let _ = bus.subscribe(AlphaSyncCounter(Arc::clone(&count))).await.expect("subscribe alpha");
let stats = bus.stats().await.expect("stats");
assert_eq!(stats.registered_event_types.len(), 1, "only one event type should have a slot");
bus.publish(Beta(1)).await.expect("publish beta");
let stats = bus.stats().await.expect("stats after publish");
assert_eq!(
stats.registered_event_types.len(),
1,
"publishing to an unregistered type should not create a slot"
);
bus.shutdown().await.expect("shutdown");
assert_eq!(count.load(Ordering::SeqCst), 0, "Alpha handler should not have fired");
}
#[tokio::test]
async fn type_slot_cleanup_on_empty() {
let bus = EventBus::new(64).expect("valid config");
let count = Arc::new(AtomicUsize::new(0));
let sub = bus.subscribe(AlphaSyncCounter(Arc::clone(&count))).await.expect("subscribe");
let stats = bus.stats().await.expect("stats before unsub");
assert_eq!(stats.total_subscriptions, 1);
assert_eq!(stats.registered_event_types.len(), 1, "slot should exist for Alpha");
let removed = sub.unsubscribe().await.expect("unsubscribe");
assert!(removed, "unsubscribe should return true");
let stats = bus.stats().await.expect("stats after unsub");
assert_eq!(stats.total_subscriptions, 0);
assert_eq!(stats.registered_event_types.len(), 0, "empty slot should be cleaned up");
bus.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn shutdown_aggregates_all_type_slots() {
let bus = EventBus::new(64).expect("valid config");
let alpha_count = Arc::new(AtomicUsize::new(0));
let beta_count = Arc::new(AtomicUsize::new(0));
let gamma_count = Arc::new(AtomicUsize::new(0));
let _ = bus.subscribe(AlphaAsyncCounter(Arc::clone(&alpha_count))).await.expect("subscribe alpha");
let _ = bus.subscribe(BetaAsyncCounter(Arc::clone(&beta_count))).await.expect("subscribe beta");
let _ = bus.subscribe(GammaAsyncCounter(Arc::clone(&gamma_count))).await.expect("subscribe gamma");
bus.publish(Alpha(10)).await.expect("publish alpha");
bus.publish(Beta(20)).await.expect("publish beta");
bus.publish(Gamma).await.expect("publish gamma");
bus.shutdown().await.expect("shutdown");
assert_eq!(alpha_count.load(Ordering::SeqCst), 10, "alpha handler should have executed");
assert_eq!(beta_count.load(Ordering::SeqCst), 20, "beta handler should have executed");
assert_eq!(gamma_count.load(Ordering::SeqCst), 1, "gamma handler should have executed");
}
#[tokio::test]
async fn multiple_listeners_share_type_slot() {
let bus = EventBus::new(64).expect("valid config");
let sync_count = Arc::new(AtomicUsize::new(0));
let async_count = Arc::new(AtomicUsize::new(0));
let _ = bus.subscribe(AlphaSyncCounter(Arc::clone(&sync_count))).await.expect("subscribe sync");
let _ = bus.subscribe(AlphaAsyncCounter(Arc::clone(&async_count))).await.expect("subscribe async");
let stats = bus.stats().await.expect("stats");
assert_eq!(stats.total_subscriptions, 2, "both listeners should be counted");
assert_eq!(stats.registered_event_types.len(), 1, "both should share a single type slot");
bus.publish(Alpha(5)).await.expect("publish");
bus.shutdown().await.expect("shutdown");
assert_eq!(sync_count.load(Ordering::SeqCst), 1, "sync handler should fire");
assert_eq!(async_count.load(Ordering::SeqCst), 5, "async handler should fire");
}