use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use jaeb::{EventBus, EventHandler, HandlerResult, SyncEventHandler};
#[derive(Clone, Debug)]
struct EventA;
#[derive(Clone, Debug)]
struct EventB;
struct SyncHandlerA;
impl SyncEventHandler<EventA> for SyncHandlerA {
fn handle(&self, _event: &EventA) -> HandlerResult {
Ok(())
}
fn name(&self) -> Option<&'static str> {
Some("handler-a")
}
}
struct SyncHandlerB;
impl SyncEventHandler<EventB> for SyncHandlerB {
fn handle(&self, _event: &EventB) -> HandlerResult {
Ok(())
}
}
struct SlowAsyncHandler {
started: Arc<AtomicBool>,
}
impl EventHandler<EventA> for SlowAsyncHandler {
async fn handle(&self, _event: &EventA) -> HandlerResult {
self.started.store(true, Ordering::SeqCst);
tokio::time::sleep(Duration::from_secs(10)).await;
Ok(())
}
fn name(&self) -> Option<&'static str> {
Some("slow-async")
}
}
#[tokio::test]
async fn stats_empty_bus() {
let bus = EventBus::builder().buffer_size(128).build().await.expect("valid config");
let stats = bus.stats().await.expect("stats");
assert_eq!(stats.total_subscriptions, 0);
assert!(stats.registered_event_types.is_empty());
assert!(stats.subscriptions_by_event.is_empty());
assert_eq!(stats.queue_capacity, 128);
assert_eq!(stats.publish_permits_available, 128);
assert_eq!(stats.publish_in_flight, 0);
assert_eq!(stats.in_flight_async, 0);
assert!(!stats.shutdown_called);
bus.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn stats_after_subscriptions() {
let bus = EventBus::builder().buffer_size(64).build().await.expect("valid config");
let _sub_a = bus.subscribe(SyncHandlerA).await.expect("subscribe a");
let _sub_b = bus.subscribe(SyncHandlerB).await.expect("subscribe b");
let stats = bus.stats().await.expect("stats");
assert_eq!(stats.total_subscriptions, 2);
assert_eq!(stats.registered_event_types.len(), 2);
let event_a_name = stats
.registered_event_types
.iter()
.find(|n| n.contains("EventA"))
.expect("EventA should be registered");
let a_listeners = &stats.subscriptions_by_event[event_a_name];
assert_eq!(a_listeners.len(), 1);
assert_eq!(a_listeners[0].name, Some("handler-a"));
let event_b_name = stats
.registered_event_types
.iter()
.find(|n| n.contains("EventB"))
.expect("EventB should be registered");
let b_listeners = &stats.subscriptions_by_event[event_b_name];
assert_eq!(b_listeners.len(), 1);
assert_eq!(b_listeners[0].name, None);
bus.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn stats_in_flight_async() {
let bus = EventBus::builder()
.buffer_size(64)
.shutdown_timeout(Duration::from_millis(100))
.build()
.await
.expect("valid config");
let started = Arc::new(AtomicBool::new(false));
let _sub = bus
.subscribe(SlowAsyncHandler {
started: Arc::clone(&started),
})
.await
.expect("subscribe");
bus.publish(EventA).await.expect("publish");
tokio::time::timeout(Duration::from_secs(2), async {
while !started.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(5)).await;
}
})
.await
.expect("handler should have started");
let stats = bus.stats().await.expect("stats");
assert!(
stats.in_flight_async >= 1,
"expected at least 1 in-flight async task, got {}",
stats.in_flight_async
);
let _ = bus.shutdown().await;
}
#[tokio::test]
async fn stats_shutdown_called() {
let bus = EventBus::builder().buffer_size(64).build().await.expect("valid config");
let stats = bus.stats().await.expect("stats before shutdown");
assert!(!stats.shutdown_called);
bus.shutdown().await.expect("shutdown");
let result = bus.stats().await;
assert!(result.is_err(), "stats after shutdown should fail");
}
#[tokio::test]
async fn stats_after_unsubscribe() {
let bus = EventBus::builder().buffer_size(64).build().await.expect("valid config");
let sub = bus.subscribe(SyncHandlerA).await.expect("subscribe");
let stats = bus.stats().await.expect("stats");
assert_eq!(stats.total_subscriptions, 1);
sub.unsubscribe().await.expect("unsubscribe");
let stats = bus.stats().await.expect("stats");
assert_eq!(stats.total_subscriptions, 0);
assert!(stats.registered_event_types.is_empty());
bus.shutdown().await.expect("shutdown");
}