use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use jaeb::{EventBus, EventHandler, HandlerResult};
#[derive(Clone, Debug)]
struct Ping;
struct SlowHandler {
count: Arc<AtomicUsize>,
}
impl EventHandler<Ping> for SlowHandler {
async fn handle(&self, _event: &Ping) -> HandlerResult {
tokio::time::sleep(Duration::from_millis(50)).await;
self.count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
#[tokio::test]
async fn semaphore_limited_bus_shuts_down_cleanly() {
let bus = EventBus::builder()
.buffer_size(64)
.max_concurrent_async(1)
.shutdown_timeout(Duration::from_millis(100))
.build()
.await
.expect("valid config");
let count = Arc::new(AtomicUsize::new(0));
let _ = bus.subscribe(SlowHandler { count: Arc::clone(&count) }).await.expect("subscribe");
for _ in 0..5 {
bus.publish(Ping).await.expect("publish");
}
let _ = bus.shutdown().await;
assert!(count.load(Ordering::SeqCst) >= 1);
}
#[tokio::test]
async fn semaphore_limited_handlers_execute_normally() {
let bus = EventBus::builder()
.buffer_size(64)
.max_concurrent_async(2)
.build()
.await
.expect("valid config");
let count = Arc::new(AtomicUsize::new(0));
let _ = bus.subscribe(SlowHandler { count: Arc::clone(&count) }).await.expect("subscribe");
for _ in 0..4 {
bus.publish(Ping).await.expect("publish");
}
bus.shutdown().await.expect("shutdown");
assert_eq!(count.load(Ordering::SeqCst), 4);
}