do-over 0.1.0

Async resilience policies for Rust inspired by Polly
Documentation
//! Bulkhead Policy Example
//!
//! This example demonstrates the Bulkhead policy for concurrency limiting.
//! It shows how concurrent operations are limited and excess requests are rejected.

use do_over::{bulkhead::Bulkhead, error::DoOverError, policy::Policy};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    println!("=== Do-Over Bulkhead Policy Example ===\n");

    // Scenario 1: Basic concurrency limiting
    basic_concurrency_example().await;

    println!("\n{}\n", "".repeat(60));

    // Scenario 2: With queue timeout
    queue_timeout_example().await;
}

async fn basic_concurrency_example() {
    println!("📌 Scenario 1: Basic Concurrency Limiting");
    println!("   Configuration: max_concurrent=2, no queue");
    println!("   Launching 5 concurrent requests...\n");

    let bulkhead = Arc::new(Bulkhead::new(2));
    let start = Instant::now();
    let completed = Arc::new(AtomicUsize::new(0));
    let rejected = Arc::new(AtomicUsize::new(0));

    let mut handles = vec![];

    for i in 1..=5 {
        let bh = Arc::clone(&bulkhead);
        let comp = Arc::clone(&completed);
        let rej = Arc::clone(&rejected);
        let s = start;

        let handle = tokio::spawn(async move {
            let result: Result<String, DoOverError<String>> = bh
                .execute(|| async {
                    let elapsed = s.elapsed().as_millis();
                    println!(
                        "   [+{:>4}ms] Request {}: 🔓 Acquired slot, processing...",
                        elapsed, i
                    );
                    tokio::time::sleep(Duration::from_millis(200)).await;
                    let elapsed = s.elapsed().as_millis();
                    println!(
                        "   [+{:>4}ms] Request {}: ✅ Completed, releasing slot",
                        elapsed, i
                    );
                    Ok(format!("Request {} done", i))
                })
                .await;

            match result {
                Ok(_) => {
                    comp.fetch_add(1, Ordering::SeqCst);
                }
                Err(DoOverError::BulkheadFull) => {
                    let elapsed = s.elapsed().as_millis();
                    println!(
                        "   [+{:>4}ms] Request {}: 🚫 Rejected (BulkheadFull)",
                        elapsed, i
                    );
                    rej.fetch_add(1, Ordering::SeqCst);
                }
                Err(e) => println!("   Request {}: Error - {:?}", i, e),
            }
        });
        handles.push(handle);

        // Small delay to make output more readable
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let elapsed = start.elapsed().as_millis();
    println!("\n   Summary:");
    println!("   - Completed: {}", completed.load(Ordering::SeqCst));
    println!("   - Rejected:  {}", rejected.load(Ordering::SeqCst));
    println!("   - Total time: {}ms", elapsed);
    println!("\n   💡 Only 2 requests could run concurrently, 3 were rejected immediately");
}

async fn queue_timeout_example() {
    println!("📌 Scenario 2: With Queue Timeout");
    println!("   Configuration: max_concurrent=2, queue_timeout=150ms");
    println!("   Launching 5 concurrent requests...\n");

    let bulkhead = Arc::new(Bulkhead::new(2).with_queue_timeout(Duration::from_millis(150)));
    let start = Instant::now();
    let completed = Arc::new(AtomicUsize::new(0));
    let rejected = Arc::new(AtomicUsize::new(0));
    let queued = Arc::new(AtomicUsize::new(0));

    let mut handles = vec![];

    for i in 1..=5 {
        let bh = Arc::clone(&bulkhead);
        let comp = Arc::clone(&completed);
        let rej = Arc::clone(&rejected);
        let q = Arc::clone(&queued);
        let s = start;

        let handle = tokio::spawn(async move {
            let request_start = s.elapsed().as_millis();
            let result: Result<String, DoOverError<String>> = bh
                .execute(|| async {
                    let elapsed = s.elapsed().as_millis();
                    let wait_time = elapsed - request_start;
                    if wait_time > 10 {
                        println!(
                            "   [+{:>4}ms] Request {}: ⏳ Waited {}ms in queue, now processing",
                            elapsed, i, wait_time
                        );
                        q.fetch_add(1, Ordering::SeqCst);
                    } else {
                        println!(
                            "   [+{:>4}ms] Request {}: 🔓 Acquired slot immediately",
                            elapsed, i
                        );
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    let elapsed = s.elapsed().as_millis();
                    println!(
                        "   [+{:>4}ms] Request {}: ✅ Completed",
                        elapsed, i
                    );
                    Ok(format!("Request {} done", i))
                })
                .await;

            match result {
                Ok(_) => {
                    comp.fetch_add(1, Ordering::SeqCst);
                }
                Err(DoOverError::BulkheadFull) => {
                    let elapsed = s.elapsed().as_millis();
                    println!(
                        "   [+{:>4}ms] Request {}: 🚫 Queue timeout expired",
                        elapsed, i
                    );
                    rej.fetch_add(1, Ordering::SeqCst);
                }
                Err(e) => println!("   Request {}: Error - {:?}", i, e),
            }
        });
        handles.push(handle);

        // Small delay between launching requests
        tokio::time::sleep(Duration::from_millis(20)).await;
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let elapsed = start.elapsed().as_millis();
    println!("\n   Summary:");
    println!("   - Completed:     {}", completed.load(Ordering::SeqCst));
    println!("   - Queued/Waited: {}", queued.load(Ordering::SeqCst));
    println!("   - Rejected:      {}", rejected.load(Ordering::SeqCst));
    println!("   - Total time:    {}ms", elapsed);
    println!("\n   💡 Requests waited in queue up to 150ms before being rejected");
}