use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use jaeb::{EventBus, EventHandler, FailurePolicy, HandlerResult, SyncEventHandler};
#[derive(Clone, Debug)]
struct Job;
struct FailOnceAsync {
attempts: Arc<AtomicUsize>,
}
impl EventHandler<Job> for FailOnceAsync {
async fn handle(&self, _event: &Job) -> HandlerResult {
let n = self.attempts.fetch_add(1, Ordering::SeqCst);
if n == 0 { Err("first attempt fails".into()) } else { Ok(()) }
}
}
struct FailOnceSync {
attempts: Arc<AtomicUsize>,
}
impl SyncEventHandler<Job> for FailOnceSync {
fn handle(&self, _event: &Job) -> HandlerResult {
let n = self.attempts.fetch_add(1, Ordering::SeqCst);
if n == 0 { Err("first attempt fails".into()) } else { Ok(()) }
}
}
struct FailNTimesAsync {
attempts: Arc<AtomicUsize>,
fail_count: usize,
}
impl EventHandler<Job> for FailNTimesAsync {
async fn handle(&self, _event: &Job) -> HandlerResult {
let n = self.attempts.fetch_add(1, Ordering::SeqCst);
if n < self.fail_count {
Err(format!("attempt {n} fails").into())
} else {
Ok(())
}
}
}
struct AlwaysFailAsync {
attempts: Arc<AtomicUsize>,
}
impl EventHandler<Job> for AlwaysFailAsync {
async fn handle(&self, _event: &Job) -> HandlerResult {
self.attempts.fetch_add(1, Ordering::SeqCst);
Err("always fails".into())
}
}
#[tokio::test]
async fn retry_policy_retries_async_handler() {
let bus = EventBus::new(16);
let attempts = Arc::new(AtomicUsize::new(0));
let policy = FailurePolicy::default().with_max_retries(1).with_retry_delay(Duration::from_millis(1));
bus.subscribe_with_policy(
FailOnceAsync {
attempts: Arc::clone(&attempts),
},
policy,
)
.await
.expect("subscribe");
bus.publish(Job).await.expect("publish");
tokio::time::sleep(Duration::from_millis(30)).await;
assert_eq!(attempts.load(Ordering::SeqCst), 2);
bus.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn retry_policy_retries_sync_handler() {
let bus = EventBus::new(16);
let attempts = Arc::new(AtomicUsize::new(0));
let policy = FailurePolicy::default().with_max_retries(1).with_retry_delay(Duration::from_millis(1));
bus.subscribe_with_policy(
FailOnceSync {
attempts: Arc::clone(&attempts),
},
policy,
)
.await
.expect("subscribe");
bus.publish(Job).await.expect("publish");
assert_eq!(attempts.load(Ordering::SeqCst), 2);
bus.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn retry_multiple_attempts() {
let bus = EventBus::new(16);
let attempts = Arc::new(AtomicUsize::new(0));
let policy = FailurePolicy::default().with_max_retries(3).with_retry_delay(Duration::from_millis(1));
bus.subscribe_with_policy(
FailNTimesAsync {
attempts: Arc::clone(&attempts),
fail_count: 3,
},
policy,
)
.await
.expect("subscribe");
bus.publish(Job).await.expect("publish");
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(attempts.load(Ordering::SeqCst), 4);
bus.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn retry_delay_is_respected() {
let bus = EventBus::new(16);
let attempts = Arc::new(AtomicUsize::new(0));
let policy = FailurePolicy::default()
.with_max_retries(1)
.with_retry_delay(Duration::from_millis(50))
.with_dead_letter(false);
bus.subscribe_with_policy(
AlwaysFailAsync {
attempts: Arc::clone(&attempts),
},
policy,
)
.await
.expect("subscribe");
let start = Instant::now();
bus.publish(Job).await.expect("publish");
tokio::time::sleep(Duration::from_millis(150)).await;
let elapsed = start.elapsed();
assert_eq!(attempts.load(Ordering::SeqCst), 2);
assert!(elapsed >= Duration::from_millis(40), "expected >= 40ms, got {elapsed:?}");
bus.shutdown().await.expect("shutdown");
}