use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use jaeb::{EventBus, EventHandler, HandlerResult, RetryStrategy, SubscriptionPolicy, SyncEventHandler};
/// Payload-free event type: the handlers in this file are implemented for it
/// and the bus tests publish it.
#[derive(Clone, Debug)]
struct Job;
/// Async `Job` handler that errors on its first invocation and succeeds on
/// every later one.
struct FailOnceAsync {
    /// Invocation counter shared with the test that created the handler.
    attempts: Arc<AtomicUsize>,
}

impl EventHandler<Job> for FailOnceAsync {
    /// Counts the call, then fails only if no call had been counted before it.
    async fn handle(&self, _event: &Job) -> HandlerResult {
        // `fetch_add` returns the value held *before* the increment, so `0`
        // identifies the very first invocation.
        match self.attempts.fetch_add(1, Ordering::SeqCst) {
            0 => Err("first attempt fails".into()),
            _ => Ok(()),
        }
    }
}
/// Synchronous `Job` handler that returns an error on every invocation.
struct AlwaysFailSync {
    /// Invocation counter shared with the test that created the handler.
    attempts: Arc<AtomicUsize>,
}

impl SyncEventHandler<Job> for AlwaysFailSync {
    /// Counts the call and then reports failure unconditionally.
    fn handle(&self, _event: &Job) -> HandlerResult {
        let Self { attempts } = self;
        attempts.fetch_add(1, Ordering::SeqCst);
        Err("always fails".into())
    }
}
/// Async `Job` handler that errors for its first `fail_count` invocations and
/// succeeds afterwards.
struct FailNTimesAsync {
    /// Invocation counter shared with the test that created the handler.
    attempts: Arc<AtomicUsize>,
    /// Number of leading invocations that must return an error.
    fail_count: usize,
}

impl EventHandler<Job> for FailNTimesAsync {
    /// Counts the call; succeeds once `fail_count` earlier calls have been
    /// recorded, otherwise returns an error naming the zero-based attempt.
    async fn handle(&self, _event: &Job) -> HandlerResult {
        // `n` is the number of calls recorded before this one.
        let n = self.attempts.fetch_add(1, Ordering::SeqCst);
        if n >= self.fail_count {
            return Ok(());
        }
        Err(format!("attempt {n} fails").into())
    }
}
/// Async `Job` handler that returns an error on every invocation.
struct AlwaysFailAsync {
    /// Invocation counter shared with the test that created the handler.
    attempts: Arc<AtomicUsize>,
}

impl EventHandler<Job> for AlwaysFailAsync {
    /// Counts the call and then reports failure unconditionally.
    async fn handle(&self, _event: &Job) -> HandlerResult {
        let counter = &self.attempts;
        counter.fetch_add(1, Ordering::SeqCst);
        Err("always fails".into())
    }
}
/// A handler that fails only on its first call must be invoked twice when the
/// policy allows one retry with a fixed 1 ms delay.
#[tokio::test]
async fn retry_policy_retries_async_handler() {
    let bus = EventBus::new(16).expect("valid config");
    let call_count = Arc::new(AtomicUsize::new(0));
    let retry_once = SubscriptionPolicy::default()
        .with_max_retries(1)
        .with_retry_strategy(RetryStrategy::Fixed(Duration::from_millis(1)));
    let handler = FailOnceAsync {
        attempts: Arc::clone(&call_count),
    };

    let _ = bus
        .subscribe_with_policy(handler, retry_once)
        .await
        .expect("subscribe");

    bus.publish(Job).await.expect("publish");
    // NOTE(review): the counter is read only after `shutdown` completes;
    // presumably shutdown waits for in-flight handler work — confirm.
    bus.shutdown().await.expect("shutdown");

    // One failed call plus one successful retry.
    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 2);
}
/// A sync handler that always errors, registered through plain `subscribe`
/// (no explicit policy), must be invoked exactly once per published event.
#[tokio::test]
async fn sync_handler_single_attempt_on_failure() {
    let bus = EventBus::new(16).expect("valid config");
    let call_count = Arc::new(AtomicUsize::new(0));
    let handler = AlwaysFailSync {
        attempts: Arc::clone(&call_count),
    };

    let _ = bus.subscribe(handler).await.expect("subscribe");

    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 1);
}
/// A handler that fails three times and then succeeds must be invoked four
/// times when the policy allows three retries with a fixed 1 ms delay.
#[tokio::test]
async fn retry_multiple_attempts() {
    let bus = EventBus::new(16).expect("valid config");
    let call_count = Arc::new(AtomicUsize::new(0));
    let three_retries = SubscriptionPolicy::default()
        .with_max_retries(3)
        .with_retry_strategy(RetryStrategy::Fixed(Duration::from_millis(1)));
    let handler = FailNTimesAsync {
        attempts: Arc::clone(&call_count),
        fail_count: 3,
    };

    let _ = bus
        .subscribe_with_policy(handler, three_retries)
        .await
        .expect("subscribe");

    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");

    // Three failing calls followed by the one that succeeds.
    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 4);
}
/// With one retry and a fixed 50 ms strategy, an always-failing handler must
/// be invoked twice and publish-to-shutdown must take at least 45 ms.
#[tokio::test]
async fn retry_delay_is_respected() {
    let bus = EventBus::new(16).expect("valid config");
    let call_count = Arc::new(AtomicUsize::new(0));
    let fixed_delay_policy = SubscriptionPolicy::default()
        .with_max_retries(1)
        .with_retry_strategy(RetryStrategy::Fixed(Duration::from_millis(50)))
        .with_dead_letter(false);
    let handler = AlwaysFailAsync {
        attempts: Arc::clone(&call_count),
    };

    let _ = bus
        .subscribe_with_policy(handler, fixed_delay_policy)
        .await
        .expect("subscribe");

    // The clock starts only after subscribing, so it covers publish and shutdown.
    let began = Instant::now();
    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");
    let elapsed = began.elapsed();

    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 2);

    // The floor is 5 ms below the configured 50 ms delay.
    let floor = Duration::from_millis(45);
    assert!(elapsed >= floor, "expected >= 45ms, got {elapsed:?}");
}
/// With two retries and an exponential strategy (base 25 ms, max 200 ms), an
/// always-failing handler must be invoked three times and publish-to-shutdown
/// must take at least 65 ms.
#[tokio::test]
async fn retry_exponential_backoff() {
    let bus = EventBus::new(16).expect("valid config");
    let call_count = Arc::new(AtomicUsize::new(0));
    let backoff = RetryStrategy::Exponential {
        base: Duration::from_millis(25),
        max: Duration::from_millis(200),
    };
    let backoff_policy = SubscriptionPolicy::default()
        .with_max_retries(2)
        .with_retry_strategy(backoff)
        .with_dead_letter(false);
    let handler = AlwaysFailAsync {
        attempts: Arc::clone(&call_count),
    };

    let _ = bus
        .subscribe_with_policy(handler, backoff_policy)
        .await
        .expect("subscribe");

    let began = Instant::now();
    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");
    let elapsed = began.elapsed();

    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 3);

    // NOTE(review): presumably the two retry delays are 25 ms and 50 ms
    // (75 ms total), leaving 10 ms of slack — confirm.
    let floor = Duration::from_millis(65);
    assert!(elapsed >= floor, "expected >= 65ms, got {elapsed:?}");
}
/// With three retries and an exponential strategy whose max (60 ms) is close
/// to its base (50 ms), an always-failing handler must be invoked four times
/// and publish-to-shutdown must take at least 150 ms but less than 500 ms.
#[tokio::test]
async fn retry_exponential_caps_at_max() {
    let bus = EventBus::new(16).expect("valid config");
    let call_count = Arc::new(AtomicUsize::new(0));
    let capped_backoff = RetryStrategy::Exponential {
        base: Duration::from_millis(50),
        max: Duration::from_millis(60),
    };
    let capped_policy = SubscriptionPolicy::default()
        .with_max_retries(3)
        .with_retry_strategy(capped_backoff)
        .with_dead_letter(false);
    let handler = AlwaysFailAsync {
        attempts: Arc::clone(&call_count),
    };

    let _ = bus
        .subscribe_with_policy(handler, capped_policy)
        .await
        .expect("subscribe");

    let began = Instant::now();
    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");
    let elapsed = began.elapsed();

    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 4);

    // NOTE(review): presumably the delays are 50 + 60 + 60 ms when the cap
    // applies; the upper bound would catch uncapped doubling — confirm.
    let floor = Duration::from_millis(150);
    let ceiling = Duration::from_millis(500);
    assert!(elapsed >= floor, "expected >= 150ms, got {elapsed:?}");
    assert!(elapsed < ceiling, "expected < 500ms, got {elapsed:?}");
}
/// With two retries and a jittered exponential strategy (base 10 ms, max
/// 100 ms), an always-failing handler must be invoked three times and
/// publish-to-shutdown must finish in under 500 ms.
#[tokio::test]
async fn retry_exponential_with_jitter_is_bounded() {
    let bus = EventBus::new(16).expect("valid config");
    let call_count = Arc::new(AtomicUsize::new(0));
    let jittered_backoff = RetryStrategy::ExponentialWithJitter {
        base: Duration::from_millis(10),
        max: Duration::from_millis(100),
    };
    let jitter_policy = SubscriptionPolicy::default()
        .with_max_retries(2)
        .with_retry_strategy(jittered_backoff)
        .with_dead_letter(false);
    let handler = AlwaysFailAsync {
        attempts: Arc::clone(&call_count),
    };

    let _ = bus
        .subscribe_with_policy(handler, jitter_policy)
        .await
        .expect("subscribe");

    let began = Instant::now();
    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");
    let elapsed = began.elapsed();

    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 3);

    // Only an upper bound is checked; no minimum elapsed time is asserted.
    let ceiling = Duration::from_millis(500);
    assert!(elapsed < ceiling, "expected < 500ms, got {elapsed:?}");
}
/// With two retries and no retry strategy configured, an always-failing
/// handler must be invoked three times and publish-to-shutdown must finish in
/// under 100 ms.
#[tokio::test]
async fn retry_no_strategy_retries_immediately() {
    let bus = EventBus::new(16).expect("valid config");
    let call_count = Arc::new(AtomicUsize::new(0));
    // No `with_retry_strategy` call: the policy's default strategy is used.
    let bare_retry_policy = SubscriptionPolicy::default()
        .with_max_retries(2)
        .with_dead_letter(false);
    let handler = AlwaysFailAsync {
        attempts: Arc::clone(&call_count),
    };

    let _ = bus
        .subscribe_with_policy(handler, bare_retry_policy)
        .await
        .expect("subscribe");

    let began = Instant::now();
    bus.publish(Job).await.expect("publish");
    bus.shutdown().await.expect("shutdown");
    let elapsed = began.elapsed();

    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 3);

    let ceiling = Duration::from_millis(100);
    assert!(elapsed < ceiling, "expected < 100ms, got {elapsed:?}");
}
/// A fixed strategy must report the same 42 ms delay for attempt indices
/// 0, 1 and 10.
#[test]
fn retry_strategy_delay_for_attempt_fixed() {
    let configured = Duration::from_millis(42);
    let strategy = RetryStrategy::Fixed(configured);

    for attempt in [0, 1, 10] {
        assert_eq!(strategy.delay_for_attempt(attempt), configured);
    }
}
/// An exponential strategy (base 10 ms, max 100 ms) must double its delay per
/// attempt index and stay at the max once doubling would exceed it.
#[test]
fn retry_strategy_delay_for_attempt_exponential() {
    let strategy = RetryStrategy::Exponential {
        base: Duration::from_millis(10),
        max: Duration::from_millis(100),
    };

    // (attempt index, expected delay in milliseconds)
    let expectations = [
        (0, 10),
        (1, 20),
        (2, 40),
        (3, 80),
        // 160 ms uncapped, clamped to the 100 ms max.
        (4, 100),
        (10, 100),
    ];

    for (attempt, expected_ms) in expectations {
        assert_eq!(
            strategy.delay_for_attempt(attempt),
            Duration::from_millis(expected_ms)
        );
    }
}
/// For attempt indices 0 through 9, a jittered exponential strategy (base
/// 10 ms, max 100 ms) must never report a delay above the capped exponential
/// value `min(base * 2^attempt, max)`.
#[test]
fn retry_strategy_delay_for_attempt_jitter_bounded() {
    let base = Duration::from_millis(10);
    let max = Duration::from_millis(100);
    let strategy = RetryStrategy::ExponentialWithJitter { base, max };

    for attempt in 0..10 {
        let delay = strategy.delay_for_attempt(attempt);

        // 2^attempt, falling back to `u32::MAX` if the shift would overflow.
        let multiplier = 1u32.checked_shl(attempt as u32).unwrap_or(u32::MAX);
        let uncapped = base.saturating_mul(multiplier);
        let exp_cap = uncapped.min(max);

        assert!(delay <= exp_cap, "attempt {attempt}: {delay:?} > cap {exp_cap:?}");
    }
}