use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use jaeb::NoRetryPolicy;
use jaeb::{DeadLetter, EventBus, EventHandler, HandlerResult, SyncEventHandler};
/// Unit-struct event published by the tests that exercise once-subscriptions.
///
/// It carries no data; the handlers in this file ignore the event value.
#[derive(Clone)]
struct Ping;
/// Synchronous `Ping` handler that records each invocation in a shared atomic counter.
struct SyncCounter(Arc<AtomicUsize>);

impl SyncEventHandler<Ping> for SyncCounter {
    /// Increments the shared counter by one and reports success.
    fn handle(&self, _event: &Ping) -> HandlerResult {
        let Self(hits) = self;
        hits.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}
/// Registers a synchronous once-handler, publishes three `Ping`s, shuts the bus
/// down, and asserts that the handler's counter ended at exactly one.
#[tokio::test]
async fn once_sync_handler_fires_once() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    // Keep the subscription handle bound for the whole test body.
    let handler = SyncCounter(Arc::clone(&hits));
    let _subscription = bus
        .subscribe_once::<Ping, _, _>(handler)
        .await
        .expect("subscribe_once");

    for label in ["publish 1", "publish 2", "publish 3"] {
        bus.publish(Ping).await.expect(label);
    }

    bus.shutdown().await.expect("shutdown");

    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 1, "sync once-handler should fire exactly once");
}
/// Asynchronous `Ping` handler that records each invocation in a shared atomic counter.
struct AsyncCounter(Arc<AtomicUsize>);

impl EventHandler<Ping> for AsyncCounter {
    /// Increments the shared counter by one and reports success.
    async fn handle(&self, _event: &Ping) -> HandlerResult {
        let Self(hits) = self;
        hits.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}
/// Registers an asynchronous once-handler, publishes three `Ping`s, shuts the
/// bus down, and asserts that the handler's counter ended at exactly one.
#[tokio::test]
async fn once_async_handler_fires_once() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    // Keep the subscription handle bound for the whole test body.
    let handler = AsyncCounter(Arc::clone(&hits));
    let _subscription = bus
        .subscribe_once::<Ping, _, _>(handler)
        .await
        .expect("subscribe_once");

    for label in ["publish 1", "publish 2", "publish 3"] {
        bus.publish(Ping).await.expect(label);
    }

    // The count is only inspected after shutdown has completed.
    bus.shutdown().await.expect("shutdown");

    let fired = hits.load(Ordering::SeqCst);
    assert_eq!(fired, 1, "async once-handler should fire exactly once");
}
/// Synchronous `Ping` handler that fails on every invocation.
struct FailingHandler;

impl SyncEventHandler<Ping> for FailingHandler {
    /// Always returns an error built from a fixed message.
    fn handle(&self, _event: &Ping) -> HandlerResult {
        let reason = "intentional failure";
        Err(reason.into())
    }
}
/// Synchronous `DeadLetter` handler that counts how many dead letters it receives.
struct DeadLetterCounter(Arc<AtomicUsize>);

impl SyncEventHandler<DeadLetter> for DeadLetterCounter {
    /// Increments the shared counter by one and reports success.
    fn handle(&self, _event: &DeadLetter) -> HandlerResult {
        let Self(seen) = self;
        seen.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}
/// Subscribes a dead-letter counter and an always-failing once-handler (with a
/// default `NoRetryPolicy`), publishes two `Ping`s, shuts the bus down, and
/// asserts that exactly one dead letter was counted.
#[tokio::test]
async fn once_handler_failure_emits_dead_letter_and_removes() {
    let bus = EventBus::new(64).expect("valid config");
    let dead_letters = Arc::new(AtomicUsize::new(0));

    // The dead-letter listener is registered before the failing handler.
    let dead_letter_handler = DeadLetterCounter(Arc::clone(&dead_letters));
    let _dead_letter_subscription = bus
        .subscribe_dead_letters(dead_letter_handler)
        .await
        .expect("subscribe dead letters");

    let _failing_subscription = bus
        .subscribe_once_with_policy::<Ping, _, _>(FailingHandler, NoRetryPolicy::default())
        .await
        .expect("subscribe_once_with_policy");

    for label in ["publish 1", "publish 2"] {
        bus.publish(Ping).await.expect(label);
    }

    bus.shutdown().await.expect("shutdown");

    let observed = dead_letters.load(Ordering::SeqCst);
    assert_eq!(
        observed, 1,
        "exactly one dead letter should be emitted (handler fired once, then removed)"
    );
}
/// Subscribes a once-handler, publishes a single `Ping`, then calls
/// `unsubscribe` on the returned handle and asserts that it reports `false`.
#[tokio::test]
async fn once_handler_unsubscribe_after_removal_returns_false() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let handler = SyncCounter(Arc::clone(&hits));
    let subscription = bus
        .subscribe_once::<Ping, _, _>(handler)
        .await
        .expect("subscribe_once");

    bus.publish(Ping).await.expect("publish");

    // The explicit unsubscribe happens only after the publish has returned.
    let was_removed = subscription.unsubscribe().await.expect("unsubscribe");
    assert!(
        !was_removed,
        "unsubscribe after auto-removal should return false"
    );

    bus.shutdown().await.expect("shutdown");
}
/// Checks `stats().total_subscriptions` around a single publish: it must be 1
/// after subscribing a once-handler and 0 after one `Ping` has been published.
#[tokio::test]
async fn once_handler_visible_in_stats_until_fired() {
    let bus = EventBus::new(64).expect("valid config");
    let hits = Arc::new(AtomicUsize::new(0));

    let handler = SyncCounter(Arc::clone(&hits));
    let _subscription = bus
        .subscribe_once::<Ping, _, _>(handler)
        .await
        .expect("subscribe_once");

    // Snapshot taken before any event is published.
    let before = bus.stats().await.expect("stats before");
    assert_eq!(
        before.total_subscriptions, 1,
        "once-listener should appear before firing"
    );

    bus.publish(Ping).await.expect("publish");

    // Snapshot taken after the single publish has returned.
    let after = bus.stats().await.expect("stats after");
    assert_eq!(
        after.total_subscriptions, 0,
        "once-listener should be removed after firing"
    );

    bus.shutdown().await.expect("shutdown");
}