use do_over::{bulkhead::Bulkhead, error::DoOverError, policy::Policy};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Program entry point.
///
/// Prints a banner, then runs the two bulkhead scenarios one after the other
/// with a horizontal rule printed between them.
#[tokio::main]
async fn main() {
    println!("=== Do-Over Bulkhead Policy Example ===\n");

    basic_concurrency_example().await;

    // A 60-character rule visually separates the two scenarios.
    let divider = "─".repeat(60);
    println!("\n{}\n", divider);

    queue_timeout_example().await;
}
/// Scenario 1: a bulkhead built with `Bulkhead::new(2)` and no queue timeout.
///
/// Spawns five tasks, 10 ms apart, that each run a 200 ms operation through
/// the shared bulkhead. Every task logs its progress with a timestamp
/// relative to the start of the scenario. Successful runs and
/// `DoOverError::BulkheadFull` results are tallied in atomic counters, and a
/// summary is printed once every task has been joined.
///
/// # Panics
///
/// Panics if any spawned task fails to join (the `JoinHandle` result is
/// unwrapped).
async fn basic_concurrency_example() {
    println!("📌 Scenario 1: Basic Concurrency Limiting");
    println!(" Configuration: max_concurrent=2, no queue");
    println!(" Launching 5 concurrent requests...\n");

    let limiter = Arc::new(Bulkhead::new(2));
    let started_at = Instant::now();
    let ok_count = Arc::new(AtomicUsize::new(0));
    let full_count = Arc::new(AtomicUsize::new(0));

    let mut tasks = Vec::new();
    for request_id in 1..=5 {
        // Per-task handles; they shadow the outer names only inside this iteration.
        let limiter = Arc::clone(&limiter);
        let ok_count = Arc::clone(&ok_count);
        let full_count = Arc::clone(&full_count);

        // `started_at` is `Copy`, so the `async move` block takes its own copy.
        tasks.push(tokio::spawn(async move {
            let outcome: Result<String, DoOverError<String>> = limiter
                .execute(|| async {
                    let at = started_at.elapsed().as_millis();
                    println!(
                        " [+{:>4}ms] Request {}: 🔓 Acquired slot, processing...",
                        at, request_id
                    );

                    // Simulated work while the slot is held.
                    tokio::time::sleep(Duration::from_millis(200)).await;

                    let at = started_at.elapsed().as_millis();
                    println!(
                        " [+{:>4}ms] Request {}: ✅ Completed, releasing slot",
                        at, request_id
                    );
                    Ok(format!("Request {} done", request_id))
                })
                .await;

            match outcome {
                Err(DoOverError::BulkheadFull) => {
                    let at = started_at.elapsed().as_millis();
                    println!(
                        " [+{:>4}ms] Request {}: 🚫 Rejected (BulkheadFull)",
                        at, request_id
                    );
                    full_count.fetch_add(1, Ordering::SeqCst);
                }
                Err(other) => println!(" Request {}: Error - {:?}", request_id, other),
                Ok(_) => {
                    ok_count.fetch_add(1, Ordering::SeqCst);
                }
            }
        }));

        // Stagger the launches so the log lines appear in request order.
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    for task in tasks {
        task.await.unwrap();
    }

    let total_ms = started_at.elapsed().as_millis();
    println!("\n Summary:");
    println!(" - Completed: {}", ok_count.load(Ordering::SeqCst));
    println!(" - Rejected: {}", full_count.load(Ordering::SeqCst));
    println!(" - Total time: {}ms", total_ms);
    println!("\n 💡 Only 2 requests could run concurrently, 3 were rejected immediately");
}
/// Scenario 2: a bulkhead built with `Bulkhead::new(2)` plus a 150 ms queue
/// timeout (`with_queue_timeout`).
///
/// Spawns five tasks, 20 ms apart, that each run a 100 ms operation through
/// the shared bulkhead. Each task records when it was submitted; once the
/// operation starts, a gap of more than 10 ms between submission and start is
/// reported (and counted) as time spent waiting. Successes and
/// `DoOverError::BulkheadFull` results are tallied, and a summary is printed
/// once every task has been joined.
///
/// # Panics
///
/// Panics if any spawned task fails to join (the `JoinHandle` result is
/// unwrapped).
async fn queue_timeout_example() {
    println!("📌 Scenario 2: With Queue Timeout");
    println!(" Configuration: max_concurrent=2, queue_timeout=150ms");
    println!(" Launching 5 concurrent requests...\n");

    let limiter = Arc::new(Bulkhead::new(2).with_queue_timeout(Duration::from_millis(150)));
    let started_at = Instant::now();
    let ok_count = Arc::new(AtomicUsize::new(0));
    let timed_out_count = Arc::new(AtomicUsize::new(0));
    let waited_count = Arc::new(AtomicUsize::new(0));

    let mut tasks = Vec::new();
    for request_id in 1..=5 {
        // Per-task handles; they shadow the outer names only inside this iteration.
        let limiter = Arc::clone(&limiter);
        let ok_count = Arc::clone(&ok_count);
        let timed_out_count = Arc::clone(&timed_out_count);
        let waited_count = Arc::clone(&waited_count);

        // `started_at` is `Copy`, so the `async move` block takes its own copy.
        tasks.push(tokio::spawn(async move {
            // Offset (ms since scenario start) at which this request was submitted.
            let submitted_at = started_at.elapsed().as_millis();

            let outcome: Result<String, DoOverError<String>> = limiter
                .execute(|| async {
                    let at = started_at.elapsed().as_millis();
                    let waited_ms = at - submitted_at;

                    // Gaps of 10 ms or less are treated as an immediate start.
                    if waited_ms <= 10 {
                        println!(
                            " [+{:>4}ms] Request {}: 🔓 Acquired slot immediately",
                            at, request_id
                        );
                    } else {
                        println!(
                            " [+{:>4}ms] Request {}: ⏳ Waited {}ms in queue, now processing",
                            at, request_id, waited_ms
                        );
                        waited_count.fetch_add(1, Ordering::SeqCst);
                    }

                    // Simulated work while the slot is held.
                    tokio::time::sleep(Duration::from_millis(100)).await;

                    let at = started_at.elapsed().as_millis();
                    println!(
                        " [+{:>4}ms] Request {}: ✅ Completed",
                        at, request_id
                    );
                    Ok(format!("Request {} done", request_id))
                })
                .await;

            match outcome {
                Err(DoOverError::BulkheadFull) => {
                    let at = started_at.elapsed().as_millis();
                    println!(
                        " [+{:>4}ms] Request {}: 🚫 Queue timeout expired",
                        at, request_id
                    );
                    timed_out_count.fetch_add(1, Ordering::SeqCst);
                }
                Err(other) => println!(" Request {}: Error - {:?}", request_id, other),
                Ok(_) => {
                    ok_count.fetch_add(1, Ordering::SeqCst);
                }
            }
        }));

        // Stagger the launches so later requests arrive while slots are busy.
        tokio::time::sleep(Duration::from_millis(20)).await;
    }

    for task in tasks {
        task.await.unwrap();
    }

    let total_ms = started_at.elapsed().as_millis();
    println!("\n Summary:");
    println!(" - Completed: {}", ok_count.load(Ordering::SeqCst));
    println!(" - Queued/Waited: {}", waited_count.load(Ordering::SeqCst));
    println!(" - Rejected: {}", timed_out_count.load(Ordering::SeqCst));
    println!(" - Total time: {}ms", total_ms);
    println!("\n 💡 Requests waited in queue up to 150ms before being rejected");
}