use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use jaeb::{EventBus, EventBusError, EventHandler, HandlerResult, SyncEventHandler, SyncMiddleware};
#[derive(Clone, Debug)]
struct Work {
value: usize,
}
struct NoOpSync;
impl SyncEventHandler<Work> for NoOpSync {
fn handle(&self, _event: &Work, _bus: &EventBus) -> HandlerResult {
Ok(())
}
}
struct SyncAccumulator {
sum: Arc<AtomicUsize>,
}
impl SyncEventHandler<Work> for SyncAccumulator {
fn handle(&self, event: &Work, _bus: &EventBus) -> HandlerResult {
self.sum.fetch_add(event.value, Ordering::SeqCst);
Ok(())
}
}
struct SlowAsync {
done: Arc<AtomicUsize>,
}
impl EventHandler<Work> for SlowAsync {
async fn handle(&self, _event: &Work, _bus: &EventBus) -> HandlerResult {
tokio::time::sleep(Duration::from_millis(100)).await;
self.done.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
struct VerySlowAsync {
done: Arc<AtomicUsize>,
}
impl EventHandler<Work> for VerySlowAsync {
async fn handle(&self, _event: &Work, _bus: &EventBus) -> HandlerResult {
tokio::time::sleep(Duration::from_secs(5)).await;
self.done.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
struct BlockingAsyncMiddleware {
started: Arc<AtomicUsize>,
release: Arc<tokio::sync::Notify>,
}
impl jaeb::Middleware for BlockingAsyncMiddleware {
async fn process(&self, _name: &'static str, _event: &(dyn std::any::Any + Send + Sync)) -> jaeb::MiddlewareDecision {
self.started.fetch_add(1, Ordering::SeqCst);
self.release.notified().await;
jaeb::MiddlewareDecision::Continue
}
}
struct SleepingSyncMiddleware {
started: Arc<AtomicUsize>,
release: Arc<tokio::sync::Notify>,
}
impl SyncMiddleware for SleepingSyncMiddleware {
fn process(&self, _name: &'static str, _event: &(dyn std::any::Any + Send + Sync)) -> jaeb::MiddlewareDecision {
self.started.fetch_add(1, Ordering::SeqCst);
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(self.release.notified()));
jaeb::MiddlewareDecision::Continue
}
}
#[tokio::test]
async fn shutdown_stops_new_operations() {
let bus = EventBus::builder().build().await.expect("valid config");
tokio::time::timeout(Duration::from_secs(2), bus.shutdown())
.await
.expect("shutdown timed out")
.expect("shutdown");
let reg_err = match bus.subscribe(NoOpSync).await {
Ok(_) => panic!("subscribe after shutdown unexpectedly succeeded"),
Err(err) => err,
};
assert_eq!(reg_err, EventBusError::Stopped);
let pub_err = bus.publish(Work { value: 1 }).await.expect_err("publish after shutdown");
assert_eq!(pub_err, EventBusError::Stopped);
}
#[tokio::test]
async fn unsubscribe_after_shutdown_returns_stopped() {
let bus = EventBus::builder().build().await.expect("valid config");
let sum = Arc::new(AtomicUsize::new(0));
let sub = bus.subscribe(SyncAccumulator { sum: Arc::clone(&sum) }).await.expect("subscribe");
let id = sub.id();
tokio::time::timeout(Duration::from_secs(2), bus.shutdown())
.await
.expect("shutdown timed out")
.expect("shutdown");
let err = bus.unsubscribe(id).await.expect_err("unsubscribe after shutdown");
assert_eq!(err, EventBusError::Stopped);
}
#[tokio::test]
async fn shutdown_waits_for_inflight_async_handlers() {
let bus = EventBus::builder().build().await.expect("valid config");
let done = Arc::new(AtomicUsize::new(0));
let _ = bus.subscribe(SlowAsync { done: Arc::clone(&done) }).await.expect("subscribe");
bus.publish(Work { value: 1 }).await.expect("publish");
tokio::time::timeout(Duration::from_secs(3), bus.shutdown())
.await
.expect("shutdown timed out")
.expect("shutdown");
assert_eq!(done.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn shutdown_returns_timeout_when_tasks_aborted() {
let bus = EventBus::builder()
.shutdown_timeout(Duration::from_millis(50))
.build()
.await
.expect("valid config");
let done = Arc::new(AtomicUsize::new(0));
let _ = bus.subscribe(VerySlowAsync { done: Arc::clone(&done) }).await.expect("subscribe");
bus.publish(Work { value: 1 }).await.expect("publish");
let result = tokio::time::timeout(Duration::from_secs(2), bus.shutdown())
.await
.expect("shutdown call timed out");
assert_eq!(result, Err(EventBusError::ShutdownTimeout));
assert_eq!(done.load(Ordering::SeqCst), 0);
let err = bus.publish(Work { value: 2 }).await.expect_err("publish after shutdown");
assert_eq!(err, EventBusError::Stopped);
}
#[tokio::test]
async fn shutdown_succeeds_when_tasks_finish_before_deadline() {
let bus = EventBus::builder()
.shutdown_timeout(Duration::from_secs(5))
.build()
.await
.expect("valid config");
let done = Arc::new(AtomicUsize::new(0));
let _ = bus.subscribe(SlowAsync { done: Arc::clone(&done) }).await.expect("subscribe");
bus.publish(Work { value: 1 }).await.expect("publish");
tokio::time::timeout(Duration::from_secs(3), bus.shutdown())
.await
.expect("shutdown timed out")
.expect("shutdown should succeed when tasks finish in time");
assert_eq!(done.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn shutdown_timeout_blocks_async_spawns_from_accepted_publish_still_in_middleware() {
let bus = EventBus::builder()
.shutdown_timeout(Duration::from_millis(50))
.build()
.await
.expect("valid config");
let middleware_started = Arc::new(AtomicUsize::new(0));
let middleware_release = Arc::new(tokio::sync::Notify::new());
let handler_done = Arc::new(AtomicUsize::new(0));
let _mw = bus
.add_middleware(BlockingAsyncMiddleware {
started: Arc::clone(&middleware_started),
release: Arc::clone(&middleware_release),
})
.await
.expect("add middleware");
let _sub = bus
.subscribe(VerySlowAsync {
done: Arc::clone(&handler_done),
})
.await
.expect("subscribe");
let publish_task = tokio::spawn({
let bus = bus.clone();
async move { bus.publish(Work { value: 1 }).await }
});
tokio::time::timeout(Duration::from_secs(2), async {
while middleware_started.load(Ordering::SeqCst) == 0 {
tokio::time::sleep(Duration::from_millis(5)).await;
}
})
.await
.expect("middleware should start");
let result = tokio::time::timeout(Duration::from_secs(2), bus.shutdown())
.await
.expect("shutdown call timed out");
assert_eq!(result, Err(EventBusError::ShutdownTimeout));
middleware_release.notify_waiters();
let publish_result = tokio::time::timeout(Duration::from_secs(2), publish_task)
.await
.expect("publish task timed out")
.expect("publish task panicked");
assert_eq!(publish_result, Ok(()));
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(
handler_done.load(Ordering::SeqCst),
0,
"async handler should not spawn after shutdown timeout"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shutdown_timeout_rejects_async_spawn_after_sync_middleware_completes() {
let bus = EventBus::builder()
.shutdown_timeout(Duration::from_millis(50))
.build()
.await
.expect("valid config");
let middleware_started = Arc::new(AtomicUsize::new(0));
let middleware_release = Arc::new(tokio::sync::Notify::new());
let handler_done = Arc::new(AtomicUsize::new(0));
let _mw = bus
.add_sync_middleware(SleepingSyncMiddleware {
started: Arc::clone(&middleware_started),
release: Arc::clone(&middleware_release),
})
.await
.expect("add middleware");
let _sub = bus
.subscribe(VerySlowAsync {
done: Arc::clone(&handler_done),
})
.await
.expect("subscribe");
let publish_task = tokio::spawn({
let bus = bus.clone();
async move { bus.publish(Work { value: 1 }).await }
});
tokio::time::timeout(Duration::from_secs(2), async {
while middleware_started.load(Ordering::SeqCst) == 0 {
tokio::time::sleep(Duration::from_millis(5)).await;
}
})
.await
.expect("middleware should start");
let result = tokio::time::timeout(Duration::from_secs(2), bus.shutdown())
.await
.expect("shutdown call timed out");
assert_eq!(result, Err(EventBusError::ShutdownTimeout));
middleware_release.notify_waiters();
let publish_result = tokio::time::timeout(Duration::from_secs(2), publish_task)
.await
.expect("publish task timed out")
.expect("publish task panicked");
assert_eq!(publish_result, Ok(()));
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(
handler_done.load(Ordering::SeqCst),
0,
"async handler should not spawn after timeout rejection"
);
}