use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use jaeb::{EventBus, EventBusError, EventHandler, FailurePolicy, HandlerResult, SyncEventHandler};
/// Payload-free event type published on the bus by the tests in this file.
#[derive(Debug, Clone)]
struct Job;
/// Async handler that fails on its very first invocation and succeeds on
/// every later one.
///
/// `attempts` is a shared counter, so the owner of the other `Arc` clone can
/// observe how many times `handle` has run.
struct FailOnceAsync {
    attempts: Arc<AtomicUsize>,
}

impl EventHandler<Job> for FailOnceAsync {
    /// Increments the shared counter and returns an error only when the
    /// counter was zero beforehand (i.e. on the first call).
    async fn handle(&self, _event: &Job) -> HandlerResult {
        // `fetch_add` yields the value held *before* the increment.
        match self.attempts.fetch_add(1, Ordering::SeqCst) {
            0 => Err("first attempt fails".into()),
            _ => Ok(()),
        }
    }
}
/// Synchronous handler that records each invocation and always reports
/// failure.
struct AlwaysFailSync {
    attempts: Arc<AtomicUsize>,
}

impl SyncEventHandler<Job> for AlwaysFailSync {
    /// Counts the call on the shared counter, then returns an error
    /// unconditionally.
    fn handle(&self, _event: &Job) -> HandlerResult {
        let counter = &self.attempts;
        counter.fetch_add(1, Ordering::SeqCst);
        Err("always fails".into())
    }
}
/// Async handler that fails for its first `fail_count` invocations and
/// succeeds from then on.
///
/// `attempts` is a shared counter of how many times `handle` has been called.
struct FailNTimesAsync {
    attempts: Arc<AtomicUsize>,
    fail_count: usize,
}

impl EventHandler<Job> for FailNTimesAsync {
    /// Increments the shared counter; succeeds once the pre-increment count
    /// has reached `fail_count`, otherwise returns an error naming the
    /// zero-based attempt index.
    async fn handle(&self, _event: &Job) -> HandlerResult {
        // `n` is the number of calls that happened before this one.
        let n = self.attempts.fetch_add(1, Ordering::SeqCst);
        if n >= self.fail_count {
            return Ok(());
        }
        Err(format!("attempt {n} fails").into())
    }
}
/// Async handler that records each invocation and always reports failure.
struct AlwaysFailAsync {
    attempts: Arc<AtomicUsize>,
}

impl EventHandler<Job> for AlwaysFailAsync {
    /// Counts the call on the shared counter, then returns an error
    /// unconditionally.
    async fn handle(&self, _event: &Job) -> HandlerResult {
        self.attempts.fetch_add(1, Ordering::SeqCst);
        let outcome: HandlerResult = Err("always fails".into());
        outcome
    }
}
/// An async handler that fails once, subscribed with `max_retries = 1`, must
/// end up being invoked twice for a single published event.
#[tokio::test]
async fn retry_policy_retries_async_handler() {
    let bus = EventBus::new(16).expect("valid config");
    let attempts = Arc::new(AtomicUsize::new(0));

    let policy = FailurePolicy::default()
        .with_max_retries(1)
        .with_retry_delay(Duration::from_millis(1));
    let handler = FailOnceAsync {
        attempts: Arc::clone(&attempts),
    };

    // The value returned by the subscribe call is intentionally discarded.
    let _ = bus.subscribe_with_policy(handler, policy).await.expect("subscribe");

    bus.publish(Job).await.expect("publish");
    // The counter is inspected only after shutdown has returned.
    // NOTE(review): presumably shutdown waits for in-flight deliveries — confirm.
    bus.shutdown().await.expect("shutdown");

    // One failed attempt plus one retry.
    assert_eq!(attempts.load(Ordering::SeqCst), 2);
}
/// Subscribing a sync handler with a policy that requests retries must fail
/// with `EventBusError::SyncRetryNotSupported`.
#[tokio::test]
async fn sync_subscribe_with_retries_is_rejected() {
    let bus = EventBus::new(16).expect("valid config");

    let policy = FailurePolicy::default().with_max_retries(1);
    let handler = AlwaysFailSync {
        attempts: Arc::new(AtomicUsize::new(0)),
    };

    // The subscribe call itself is expected to return an error.
    let err = bus.subscribe_with_policy(handler, policy).await.unwrap_err();
    assert_eq!(err, EventBusError::SyncRetryNotSupported);

    bus.shutdown().await.expect("shutdown");
}
/// A sync handler registered through plain `subscribe` that always fails
/// must be invoked exactly once for a single published event.
#[tokio::test]
async fn sync_handler_single_attempt_on_failure() {
    let bus = EventBus::new(16).expect("valid config");
    let attempts = Arc::new(AtomicUsize::new(0));

    let handler = AlwaysFailSync {
        attempts: Arc::clone(&attempts),
    };
    // The value returned by the subscribe call is intentionally discarded.
    let _ = bus.subscribe(handler).await.expect("subscribe");

    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    let observed = attempts.load(Ordering::SeqCst);
    assert_eq!(observed, 1);
}
/// With `max_retries = 3` and a handler that fails its first three calls,
/// a single published event must produce four handler invocations.
#[tokio::test]
async fn retry_multiple_attempts() {
    let bus = EventBus::new(16).expect("valid config");
    let attempts = Arc::new(AtomicUsize::new(0));

    let policy = FailurePolicy::default()
        .with_max_retries(3)
        .with_retry_delay(Duration::from_millis(1));
    let handler = FailNTimesAsync {
        attempts: Arc::clone(&attempts),
        fail_count: 3,
    };

    // The value returned by the subscribe call is intentionally discarded.
    let _ = bus.subscribe_with_policy(handler, policy).await.expect("subscribe");

    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    // Three failing attempts followed by the one that succeeds.
    assert_eq!(attempts.load(Ordering::SeqCst), 4);
}
/// With a 50 ms retry delay and one retry, publishing to an always-failing
/// handler and shutting down must take at least 45 ms in total and produce
/// two handler invocations.
#[tokio::test]
async fn retry_delay_is_respected() {
    let bus = EventBus::new(16).expect("valid config");
    let attempts = Arc::new(AtomicUsize::new(0));

    // The policy is built with `with_dead_letter(false)`.
    let policy = FailurePolicy::default()
        .with_max_retries(1)
        .with_retry_delay(Duration::from_millis(50))
        .with_dead_letter(false);
    let handler = AlwaysFailAsync {
        attempts: Arc::clone(&attempts),
    };

    // The value returned by the subscribe call is intentionally discarded.
    let _ = bus.subscribe_with_policy(handler, policy).await.expect("subscribe");

    // Time the publish + shutdown sequence as a whole.
    let started_at = Instant::now();
    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");
    let elapsed = started_at.elapsed();

    assert_eq!(attempts.load(Ordering::SeqCst), 2);

    // The bound is 5 ms below the configured delay.
    // NOTE(review): presumably slack for timer granularity — confirm.
    let minimum = Duration::from_millis(45);
    assert!(elapsed >= minimum, "expected >= 45ms, got {elapsed:?}");
}