use do_over::bulkhead::Bulkhead;
use do_over::circuit_breaker::CircuitBreaker;
use do_over::error::DoOverError;
use do_over::hedge::Hedge;
use do_over::policy::Policy;
use do_over::rate_limit::RateLimiter;
use do_over::retry::RetryPolicy;
use do_over::timeout::TimeoutPolicy;
use do_over::wrap::Wrap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// Checks that a fixed-delay retry policy keeps invoking the operation until it succeeds.
///
/// The operation fails on its first two invocations and succeeds on the third, so the
/// test expects `Ok("ok")` after exactly three invocations.
#[tokio::test]
async fn retry_eventually_succeeds() {
    let policy = RetryPolicy::fixed(3, Duration::from_millis(10));
    let invocations = AtomicUsize::new(0);

    let outcome = policy
        .execute(|| async {
            // `fetch_add` yields the previous value, i.e. the zero-based call index.
            match invocations.fetch_add(1, Ordering::SeqCst) {
                0 | 1 => Err("fail"),
                _ => Ok("ok"),
            }
        })
        .await;

    assert_eq!(outcome.unwrap(), "ok");
    assert_eq!(invocations.load(Ordering::SeqCst), 3);
}
/// Checks that a permanently failing operation is retried until the policy gives up.
///
/// With `RetryPolicy::fixed(2, ..)` the test expects three invocations in total and
/// an `Err` as the final result.
#[tokio::test]
async fn retry_exhausts_all_attempts() {
    let policy = RetryPolicy::fixed(2, Duration::from_millis(10));
    let invocations = AtomicUsize::new(0);

    let outcome: Result<&str, &str> = policy
        .execute(|| async {
            invocations.fetch_add(1, Ordering::SeqCst);
            Err("permanent failure")
        })
        .await;

    assert!(outcome.is_err());
    let total_invocations = invocations.load(Ordering::SeqCst);
    assert_eq!(total_invocations, 3);
}
/// Checks that a retry count of zero still runs the operation exactly once.
///
/// The operation always fails; the test expects an `Err` and a single invocation.
#[tokio::test]
async fn retry_zero_retries_executes_once() {
    let policy = RetryPolicy::fixed(0, Duration::from_millis(10));
    let invocations = AtomicUsize::new(0);

    let outcome: Result<&str, &str> = policy
        .execute(|| async {
            invocations.fetch_add(1, Ordering::SeqCst);
            Err("fail")
        })
        .await;

    assert!(outcome.is_err());
    let total_invocations = invocations.load(Ordering::SeqCst);
    assert_eq!(total_invocations, 1);
}
/// Checks that an exponential-backoff retry policy waits between attempts.
///
/// The operation always fails, so the policy runs through every attempt. The test
/// expects an `Err` result, at least 100 ms of elapsed wall-clock time, and four
/// invocations of the operation.
#[tokio::test]
async fn retry_exponential_backoff() {
    let counter = AtomicUsize::new(0);
    let retry = RetryPolicy::exponential(3, Duration::from_millis(10), 2.0);

    let start = std::time::Instant::now();
    let result: Result<&str, &str> = retry
        .execute(|| async {
            counter.fetch_add(1, Ordering::SeqCst);
            Err("fail")
        })
        .await;
    let elapsed = start.elapsed();

    assert!(result.is_err());
    // NOTE(review): the 100 ms floor depends on the delay schedule produced by
    // `RetryPolicy::exponential(3, 10ms, 2.0)`, which is not visible here — confirm
    // that the summed delays really exceed it.
    assert!(elapsed >= Duration::from_millis(100));
    // The counter used to be incremented but never checked. The fixed-delay tests in
    // this file expect `n` retries to produce `n + 1` invocations; assuming the
    // exponential constructor counts retries the same way, three retries mean four calls.
    assert_eq!(counter.load(Ordering::SeqCst), 4);
}
/// Checks that an operation finishing before the timeout returns its own value.
///
/// The operation sleeps 10 ms under a 100 ms timeout; the test expects `Ok("success")`.
#[tokio::test]
async fn timeout_succeeds_within_limit() {
    const LIMIT: Duration = Duration::from_millis(100);
    const WORK: Duration = Duration::from_millis(10);

    let policy = TimeoutPolicy::new(LIMIT);
    let outcome: Result<&str, DoOverError<&str>> = policy
        .execute(|| async {
            tokio::time::sleep(WORK).await;
            Ok("success")
        })
        .await;

    let value = outcome.unwrap();
    assert_eq!(value, "success");
}
/// Checks that an operation running longer than the timeout is reported as timed out.
///
/// The operation sleeps 200 ms under a 50 ms timeout; the test expects
/// `Err(DoOverError::Timeout)`.
#[tokio::test]
async fn timeout_fails_when_exceeded() {
    const LIMIT: Duration = Duration::from_millis(50);
    const WORK: Duration = Duration::from_millis(200);

    let policy = TimeoutPolicy::new(LIMIT);
    let outcome: Result<&str, DoOverError<&str>> = policy
        .execute(|| async {
            tokio::time::sleep(WORK).await;
            Ok("success")
        })
        .await;

    let timed_out = matches!(outcome, Err(DoOverError::Timeout));
    assert!(timed_out);
}
/// Exercises a zero-length timeout around an operation that completes without awaiting.
///
/// Either outcome is accepted: the operation's `Ok` value or `DoOverError::Timeout`.
/// Any other error fails the test.
#[tokio::test]
async fn timeout_zero_duration() {
    let policy = TimeoutPolicy::new(Duration::ZERO);

    let outcome: Result<&str, DoOverError<&str>> =
        policy.execute(|| async { Ok("instant") }).await;

    // Which of the two wins is not pinned down by this test.
    let acceptable = matches!(outcome, Ok(_) | Err(DoOverError::Timeout));
    assert!(acceptable);
}
/// Checks that the breaker rejects calls once three failures have been recorded.
///
/// After three failing calls against `CircuitBreaker::new(3, 60s)`, the test expects
/// the next call to return `DoOverError::CircuitOpen` even though its operation
/// would have succeeded.
#[tokio::test]
async fn circuit_breaker_opens_after_threshold() {
    let breaker = CircuitBreaker::new(3, Duration::from_secs(60));

    // Feed the breaker three failures; the individual results are irrelevant here.
    for _failure in 0..3 {
        let _: Result<(), DoOverError<&str>> = breaker
            .execute(|| async { Err(DoOverError::Inner("error")) })
            .await;
    }

    let probe: Result<(), DoOverError<&str>> = breaker.execute(|| async { Ok(()) }).await;

    let rejected = matches!(probe, Err(DoOverError::CircuitOpen));
    assert!(rejected);
}
/// Checks that a successful call stops earlier failures from counting toward the threshold.
///
/// Sequence: two failures, one success, two more failures, then a final call. With a
/// threshold of three the test expects the final call to succeed, which only holds
/// if the intermediate success cleared the earlier failures.
#[tokio::test]
async fn circuit_breaker_resets_on_success() {
    let breaker = CircuitBreaker::new(3, Duration::from_secs(60));

    // Phase 1: two failures, one short of the threshold.
    for _failure in 0..2 {
        let _: Result<(), DoOverError<&str>> = breaker
            .execute(|| async { Err(DoOverError::Inner("error")) })
            .await;
    }

    // Phase 2: a success in between.
    let _: Result<(), DoOverError<&str>> = breaker.execute(|| async { Ok(()) }).await;

    // Phase 3: two further failures.
    for _failure in 0..2 {
        let _: Result<(), DoOverError<&str>> = breaker
            .execute(|| async { Err(DoOverError::Inner("error")) })
            .await;
    }

    // Phase 4: the breaker must still let this call through.
    let final_call: Result<(), DoOverError<&str>> =
        breaker.execute(|| async { Ok(()) }).await;
    assert!(final_call.is_ok());
}
/// Checks that a two-slot bulkhead never runs more than two operations at once.
///
/// Five tasks are spawned and each admitted operation holds its slot for 50 ms.
/// The test expects two successes, three `DoOverError::BulkheadFull` rejections,
/// and an observed peak concurrency of at most two.
#[tokio::test]
async fn bulkhead_limits_concurrency() {
    let bulkhead = Arc::new(Bulkhead::new(2));
    let concurrent = Arc::new(AtomicUsize::new(0));
    let max_concurrent = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::with_capacity(5);
    for _ in 0..5 {
        let bh = Arc::clone(&bulkhead);
        let conc = Arc::clone(&concurrent);
        let max = Arc::clone(&max_concurrent);

        handles.push(tokio::spawn(async move {
            let result: Result<(), DoOverError<()>> = bh
                .execute(|| async {
                    // Number of operations in flight, including this one.
                    let c = conc.fetch_add(1, Ordering::SeqCst) + 1;
                    max.fetch_max(c, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    conc.fetch_sub(1, Ordering::SeqCst);
                    Ok(())
                })
                .await;
            result
        }));
    }

    // A panicked or cancelled task surfaces here through `unwrap`.
    let results: Vec<_> = futures::future::join_all(handles)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();

    let successes = results.iter().filter(|r| r.is_ok()).count();
    let rejections = results
        .iter()
        .filter(|r| matches!(r, Err(DoOverError::BulkheadFull)))
        .count();

    assert_eq!(successes, 2);
    assert_eq!(rejections, 3);

    // The peak was previously recorded but never checked, so the concurrency limit
    // itself was not actually verified.
    let peak = max_concurrent.load(Ordering::SeqCst);
    assert!(
        peak <= 2,
        "bulkhead admitted {} concurrent operations, expected at most 2",
        peak
    );
}
/// Checks that a single-slot bulkhead with a queue timeout lets later callers through.
///
/// Three tasks are spawned 10 ms apart; each admitted operation holds the slot for
/// 50 ms and the queue timeout is 200 ms. The test expects at least two successes.
#[tokio::test]
async fn bulkhead_with_queue_allows_waiting() {
    let bulkhead = Arc::new(Bulkhead::new(1).with_queue_timeout(Duration::from_millis(200)));
    let started = Arc::new(AtomicUsize::new(0));

    let mut tasks = Vec::new();
    for _ in 0..3 {
        let task_bulkhead = Arc::clone(&bulkhead);
        let task_started = Arc::clone(&started);

        let task = tokio::spawn(async move {
            let outcome: Result<(), DoOverError<()>> = task_bulkhead
                .execute(|| async {
                    task_started.fetch_add(1, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok(())
                })
                .await;
            outcome
        });
        tasks.push(task);

        // Stagger the spawns so the tasks reach the bulkhead in order.
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    let mut successes = 0;
    for joined in futures::future::join_all(tasks).await {
        // A panicked or cancelled task surfaces here through `unwrap`.
        if joined.unwrap().is_ok() {
            successes += 1;
        }
    }

    assert!(successes >= 2);
}
/// Checks that five back-to-back calls all pass a limiter built with `new(5, 1s)`.
#[tokio::test]
async fn rate_limiter_allows_burst() {
    let limiter = RateLimiter::new(5, Duration::from_secs(1));

    let mut accepted = 0usize;
    for _call in 0..5 {
        let outcome: Result<(), DoOverError<()>> =
            limiter.execute(|| async { Ok(()) }).await;
        accepted += usize::from(outcome.is_ok());
    }

    assert_eq!(accepted, 5);
}
/// Checks that calls beyond the limiter's allowance are rejected.
///
/// Five back-to-back calls are made against `RateLimiter::new(3, 1s)`; the test
/// expects three successes and two rejections.
#[tokio::test]
async fn rate_limiter_rejects_excess() {
    let limiter = RateLimiter::new(3, Duration::from_secs(1));
    let mut successes = 0;
    let mut rejections = 0;

    for _ in 0..5 {
        let result: Result<(), DoOverError<()>> = limiter.execute(|| async { Ok(()) }).await;
        match result {
            Ok(()) => successes += 1,
            // NOTE(review): rejections are matched as `BulkheadFull`, as in the refill
            // test; the limiter appears to reuse that variant — confirm.
            Err(DoOverError::BulkheadFull) => rejections += 1,
            // Fail loudly instead of silently dropping an unexpected error, which
            // would otherwise only show up as an unexplained count mismatch below.
            Err(other) => panic!("unexpected rate limiter error: {:?}", other),
        }
    }

    assert_eq!(successes, 3);
    assert_eq!(rejections, 2);
}
/// Checks that the limiter accepts calls again once its interval has passed.
///
/// Two calls use up a `RateLimiter::new(2, 100ms)`, a third is expected to be
/// rejected, and after sleeping 110 ms a fourth call is expected to succeed.
#[tokio::test]
async fn rate_limiter_refills_after_interval() {
    let limiter = RateLimiter::new(2, Duration::from_millis(100));

    // Use up the allowance; these results are not inspected.
    for _call in 0..2 {
        let _: Result<(), DoOverError<()>> = limiter.execute(|| async { Ok(()) }).await;
    }

    let while_exhausted: Result<(), DoOverError<()>> =
        limiter.execute(|| async { Ok(()) }).await;
    // NOTE(review): the rejection is matched as `BulkheadFull`; the limiter appears
    // to reuse that variant — confirm.
    let rejected = matches!(while_exhausted, Err(DoOverError::BulkheadFull));
    assert!(rejected);

    // Wait slightly longer than the 100 ms interval.
    tokio::time::sleep(Duration::from_millis(110)).await;

    let after_wait: Result<(), DoOverError<()>> =
        limiter.execute(|| async { Ok(()) }).await;
    assert!(after_wait.is_ok());
}
/// Checks that a hedged call around an immediately successful operation returns its value.
#[tokio::test]
async fn hedge_returns_first_result() {
    let policy = Hedge::new(Duration::from_millis(50));

    let outcome: Result<&str, DoOverError<&str>> =
        policy.execute(|| async { Ok("result") }).await;

    let value = outcome.unwrap();
    assert_eq!(value, "result");
}
/// Runs a slow (200 ms) operation through a hedge configured with a 50 ms delay.
///
/// The result is discarded and the test only requires that the operation was invoked
/// at least once.
// NOTE(review): the test name suggests a second invocation is expected, but `>= 1`
// does not verify one — confirm whether a stricter bound is intended.
#[tokio::test]
async fn hedge_starts_backup_after_delay() {
    let policy = Hedge::new(Duration::from_millis(50));
    let invocations = Arc::new(AtomicUsize::new(0));

    let _: Result<&str, DoOverError<&str>> = policy
        .execute(|| {
            // Each invocation gets its own handle so the future can own it.
            let seen = Arc::clone(&invocations);
            async move {
                seen.fetch_add(1, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(200)).await;
                Ok("slow")
            }
        })
        .await;

    let total = invocations.load(Ordering::SeqCst);
    assert!(total >= 1);
}
/// Checks a retry policy composed with a timeout policy through `Wrap`.
///
/// The first two invocations sleep 200 ms, longer than the 100 ms timeout, and the
/// third returns immediately. The test expects `Ok("success")` after three invocations.
// NOTE(review): presumably the first `Wrap::new` argument is the outer policy, so
// each retry gets its own timeout — confirm against `Wrap`.
#[tokio::test]
async fn wrap_retry_with_timeout() {
    let retry = RetryPolicy::fixed(2, Duration::from_millis(50));
    let timeout = TimeoutPolicy::new(Duration::from_millis(100));
    let policy = Wrap::new(retry, timeout);

    let invocations = Arc::new(AtomicUsize::new(0));

    let outcome: Result<&str, DoOverError<&str>> = policy
        .execute(|| {
            let seen = Arc::clone(&invocations);
            async move {
                // Zero-based index of this invocation.
                let index = seen.fetch_add(1, Ordering::SeqCst);
                let is_slow = index < 2;
                if is_slow {
                    tokio::time::sleep(Duration::from_millis(200)).await;
                }
                Ok("success")
            }
        })
        .await;

    assert_eq!(outcome.unwrap(), "success");
    assert_eq!(invocations.load(Ordering::SeqCst), 3);
}
/// Checks a bulkhead composed with a retry policy through `Wrap`.
///
/// The operation fails on its first invocation and succeeds on the second. The test
/// expects `Ok("success")` and exactly two invocations.
#[tokio::test]
async fn wrap_bulkhead_with_retry() {
    let policy = Wrap::new(
        Bulkhead::new(5),
        RetryPolicy::fixed(2, Duration::from_millis(10)),
    );

    let counter = Arc::new(AtomicUsize::new(0));
    let cc = Arc::clone(&counter);

    let result: Result<&str, DoOverError<&str>> = policy
        .execute(|| {
            let count = Arc::clone(&cc);
            async move {
                // `fetch_add` returns the previous value: 0 on the first invocation.
                let c = count.fetch_add(1, Ordering::SeqCst);
                if c < 1 {
                    Err(DoOverError::Inner("transient"))
                } else {
                    Ok("success")
                }
            }
        })
        .await;

    assert_eq!(result.unwrap(), "success");
    // The counter was previously tracked but never checked. One failure followed by
    // one successful retry means exactly two invocations; more would mean the
    // operation was run again after it had already succeeded.
    assert_eq!(counter.load(Ordering::SeqCst), 2);
}
/// Checks that an operation succeeding on its first invocation is not retried.
///
/// The test expects `Ok("immediate")` and exactly one invocation.
#[tokio::test]
async fn retry_with_immediate_success() {
    let invocations = AtomicUsize::new(0);
    let policy = RetryPolicy::fixed(5, Duration::from_millis(100));

    let outcome: Result<&str, &str> = policy
        .execute(|| async {
            invocations.fetch_add(1, Ordering::SeqCst);
            Ok("immediate")
        })
        .await;

    let value = outcome.unwrap();
    assert_eq!(value, "immediate");
    let total_invocations = invocations.load(Ordering::SeqCst);
    assert_eq!(total_invocations, 1);
}
/// Checks that a breaker with a threshold of one opens after a single failure.
///
/// After one failing call, the test expects the next call to be rejected with
/// `DoOverError::CircuitOpen` even though its operation would have succeeded.
#[tokio::test]
async fn circuit_breaker_threshold_one() {
    let breaker = CircuitBreaker::new(1, Duration::from_secs(60));

    // A single failure; its result is not inspected.
    let _: Result<(), DoOverError<&str>> = breaker
        .execute(|| async { Err(DoOverError::Inner("error")) })
        .await;

    let probe: Result<(), DoOverError<&str>> = breaker.execute(|| async { Ok(()) }).await;

    let rejected = matches!(probe, Err(DoOverError::CircuitOpen));
    assert!(rejected);
}
/// Checks that a one-slot bulkhead admits a single, uncontended call.
#[tokio::test]
async fn bulkhead_single_slot() {
    let single_slot = Bulkhead::new(1);

    let outcome: Result<&str, DoOverError<&str>> =
        single_slot.execute(|| async { Ok("success") }).await;

    let value = outcome.unwrap();
    assert_eq!(value, "success");
}