use do_over::{
bulkhead::Bulkhead,
circuit_breaker::CircuitBreaker,
error::DoOverError,
policy::Policy,
retry::RetryPolicy,
timeout::TimeoutPolicy,
wrap::Wrap,
};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Runs the three policy-composition demos one after another, printing a
/// horizontal rule between them.
#[tokio::main]
async fn main() {
    // The same 60-character rule is printed between every pair of demos.
    let divider = "─".repeat(60);

    println!("=== Do-Over Policy Composition Example ===\n");

    pattern1_retry_timeout().await;
    println!("\n{}\n", divider);

    pattern2_circuit_retry().await;
    println!("\n{}\n", divider);

    pattern3_full_stack().await;
}
/// Demonstrates a retry policy wrapping a timeout policy.
///
/// The operation handed to the policy sleeps 500ms on its first two
/// invocations and 50ms from the third invocation on, while the wrapped
/// timeout is configured with 200ms. A timeline of the attempts is printed,
/// followed by the outcome, the total elapsed time and the attempt count.
///
/// A failed outcome is reported rather than unwrapped, so an unexpected error
/// here does not panic and prevent the remaining demos from running.
async fn pattern1_retry_timeout() {
    println!("📌 Pattern 1: Retry wrapping Timeout");
    println!(" Wrap::new(Retry(3, 100ms), Timeout(200ms))");
    println!(" → Each retry attempt gets its own 200ms timeout\n");

    let policy = Wrap::new(
        RetryPolicy::fixed(3, Duration::from_millis(100)),
        TimeoutPolicy::new(Duration::from_millis(200)),
    );

    // Shared counter so every invocation of the operation can number itself.
    let attempt_count = Arc::new(AtomicUsize::new(0));
    let start = Instant::now();

    let result: Result<String, DoOverError<String>> = {
        let ac = Arc::clone(&attempt_count);
        policy
            .execute(|| {
                // Each invocation gets its own handle to the shared counter.
                let count = Arc::clone(&ac);
                async move {
                    let attempt = count.fetch_add(1, Ordering::SeqCst) + 1;
                    let elapsed = start.elapsed().as_millis();
                    println!(
                        " [+{:>4}ms] Attempt {}: Started",
                        elapsed, attempt
                    );
                    if attempt < 3 {
                        println!(
                            " [+{:>4}ms] Attempt {}: Processing slowly (will timeout)...",
                            elapsed, attempt
                        );
                        // Sleeps longer than the configured 200ms timeout.
                        tokio::time::sleep(Duration::from_millis(500)).await;
                        Ok("Should not reach here".to_string())
                    } else {
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        let elapsed = start.elapsed().as_millis();
                        println!(
                            " [+{:>4}ms] Attempt {}: ✅ Completed quickly!",
                            elapsed, attempt
                        );
                        Ok("Success on attempt 3".to_string())
                    }
                }
            })
            .await
    };

    let total_time = start.elapsed().as_millis();

    // Report the outcome either way instead of panicking on `Err`.
    match &result {
        Ok(value) => println!("\n Result: {:?}", value),
        Err(err) => println!("\n Result: ❌ {:?}", err),
    }
    println!(" Total time: {}ms", total_time);
    println!(" Total attempts: {}", attempt_count.load(Ordering::SeqCst));

    // The closing remark claims success, so only print it when that is true.
    if result.is_ok() {
        println!("\n 💡 Each attempt had its own timeout; operation succeeded on 3rd try");
    }
}
async fn pattern2_circuit_retry() {
println!("📌 Pattern 2: CircuitBreaker wrapping Retry");
println!(" Wrap::new(CircuitBreaker(3, 1s), Retry(2, 50ms))");
println!(" → Retries happen inside the circuit breaker\n");
let policy = Wrap::new(
CircuitBreaker::new(3, Duration::from_secs(1)),
RetryPolicy::fixed(2, Duration::from_millis(50)),
);
let failure_count = Arc::new(AtomicUsize::new(0));
let start = Instant::now();
println!(" Phase 1: Failing calls to open circuit");
for i in 1..=4 {
let fc = Arc::clone(&failure_count);
let s = start;
let result: Result<String, DoOverError<String>> = policy
.execute(|| {
let count = Arc::clone(&fc);
async move {
count.fetch_add(1, Ordering::SeqCst);
let elapsed = s.elapsed().as_millis();
println!(
" [+{:>4}ms] Call {}: Inner operation failing...",
elapsed, i
);
Err(DoOverError::Inner(format!("Service error")))
}
})
.await;
let elapsed = start.elapsed().as_millis();
match &result {
Err(DoOverError::Inner(_)) => {
println!(
" [+{:>4}ms] Call {}: ❌ Failed after retries",
elapsed, i
);
}
Err(DoOverError::CircuitOpen) => {
println!(
" [+{:>4}ms] Call {}: 🚫 Circuit is OPEN",
elapsed, i
);
}
_ => {}
}
}
let inner_calls = failure_count.load(Ordering::SeqCst);
println!("\n Summary:");
println!(" - Inner operation calls: {} (includes retries)", inner_calls);
println!(" - Circuit opened after threshold reached");
println!("\n 💡 Retries happened inside circuit breaker; failures accumulated to open circuit");
}
/// Demonstrates a three-layer stack: a bulkhead around a retry policy around
/// a timeout policy.
///
/// Four tasks are spawned 10ms apart, all sharing one policy instance. Each
/// task's operation counts its invocation, sleeps 200ms and returns `Ok`.
/// Every task prints its own outcome; once all tasks have been joined, the
/// total invocation count and elapsed time are printed.
async fn pattern3_full_stack() {
    println!("📌 Pattern 3: Full Resilience Stack");
    println!(" Bulkhead(2) → Retry(2, 100ms) → Timeout(500ms)");
    println!(" Recommended ordering for production systems\n");

    // Build the layers outermost-first, then share the stack across tasks.
    let bulkhead = Bulkhead::new(2);
    let retry_then_timeout = Wrap::new(
        RetryPolicy::fixed(2, Duration::from_millis(100)),
        TimeoutPolicy::new(Duration::from_millis(500)),
    );
    let shared_policy = Arc::new(Wrap::new(bulkhead, retry_then_timeout));

    let invocations = Arc::new(AtomicUsize::new(0));
    let began = Instant::now();

    println!(" Launching 4 concurrent requests (bulkhead limit is 2)...\n");

    let mut tasks = Vec::new();
    for request in 1..=4 {
        let policy = Arc::clone(&shared_policy);
        let counter = Arc::clone(&invocations);
        let origin = began;

        tasks.push(tokio::spawn(async move {
            let outcome: Result<String, DoOverError<String>> = policy
                .execute(|| {
                    // Fresh handle per invocation; the future takes ownership.
                    let counter = Arc::clone(&counter);
                    async move {
                        let call = counter.fetch_add(1, Ordering::SeqCst) + 1;
                        println!(
                            " [+{:>4}ms] Request {}, call {}: Processing...",
                            origin.elapsed().as_millis(),
                            request,
                            call
                        );
                        tokio::time::sleep(Duration::from_millis(200)).await;
                        println!(
                            " [+{:>4}ms] Request {}, call {}: ✅ Done",
                            origin.elapsed().as_millis(),
                            request,
                            call
                        );
                        Ok(format!("Response {}", request))
                    }
                })
                .await;

            let finished_at = origin.elapsed().as_millis();
            match &outcome {
                Ok(msg) => println!(" [+{:>4}ms] Request {}: ✅ {}", finished_at, request, msg),
                Err(DoOverError::BulkheadFull) => {
                    println!(" [+{:>4}ms] Request {}: 🚫 Rejected by bulkhead", finished_at, request)
                }
                Err(e) => println!(" [+{:>4}ms] Request {}: ❌ {:?}", finished_at, request, e),
            }
        }));

        // Stagger the launches slightly so the timeline is easier to read.
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // Wait for every spawned request before summarising.
    for task in tasks {
        task.await.unwrap();
    }

    let total_time = began.elapsed().as_millis();
    let total_calls = invocations.load(Ordering::SeqCst);

    println!("\n Summary:");
    println!(" - Total inner operation calls: {}", total_calls);
    println!(" - Total time: {}ms", total_time);
    println!("\n Policy execution order:");
    println!(" 1. Bulkhead: Limits to 2 concurrent executions");
    println!(" 2. Retry: Retries transient failures");
    println!(" 3. Timeout: Each attempt bounded to 500ms");
}