// do-over 0.1.0
//
// Async resilience policies for Rust inspired by Polly
// Documentation
//! Comprehensive Example: Order Processing System
//!
//! This example simulates a real-world order processing system with multiple
//! services, each protected by different resilience policies.
//!
//! Services:
//! - Inventory Service: Rate limited + Retry + Timeout
//! - Payment Service: CircuitBreaker + Retry + Timeout
//! - Notification Service: Hedge + Timeout

use do_over::{
    circuit_breaker::CircuitBreaker,
    error::DoOverError,
    hedge::Hedge,
    policy::Policy,
    rate_limit::RateLimiter,
    retry::RetryPolicy,
    timeout::TimeoutPolicy,
    wrap::Wrap,
};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Shared, atomically updated state for the simulated backend services.
///
/// The boolean flags switch a service into a degraded mode and the counters
/// record how many times each simulated service body has executed.
struct ServiceState {
    /// Fault flag for the inventory service (only ever reset by the scenarios).
    inventory_rate_limited: AtomicBool,
    /// When set, the simulated payment call returns an error.
    payment_failing: AtomicBool,
    /// When set, the simulated notification call sleeps longer.
    notification_slow: AtomicBool,
    /// Number of times the inventory service body ran.
    inventory_calls: AtomicUsize,
    /// Number of times the payment service body ran.
    payment_calls: AtomicUsize,
    /// Number of times the notification service body ran.
    notification_calls: AtomicUsize,
}

impl ServiceState {
    /// Creates a state with every fault flag cleared and every counter at zero.
    fn new() -> Self {
        let cleared = || AtomicBool::new(false);
        let zeroed = || AtomicUsize::new(0);

        Self {
            inventory_rate_limited: cleared(),
            payment_failing: cleared(),
            notification_slow: cleared(),
            inventory_calls: zeroed(),
            payment_calls: zeroed(),
            notification_calls: zeroed(),
        }
    }
}

// Policy types
//
// Each alias names the concrete composed type returned by the matching
// `create_*_policy` function, so the nested `Wrap<..>` generics are spelled
// out only once.
// NOTE(review): presumably the first `Wrap` parameter is the outer policy and
// the second the inner one — confirm against the `Wrap` documentation.

/// Inventory stack: a `RateLimiter` combined with a retry-plus-timeout pair.
type InventoryPolicy = Wrap<RateLimiter, Wrap<RetryPolicy, TimeoutPolicy>>;
/// Payment stack: a `CircuitBreaker` combined with a retry-plus-timeout pair.
type PaymentPolicy = Wrap<CircuitBreaker, Wrap<RetryPolicy, TimeoutPolicy>>;
/// Notification stack: a `Hedge` combined with a `TimeoutPolicy`.
type NotificationPolicy = Wrap<Hedge, TimeoutPolicy>;

/// Builds the policy stack that guards the inventory service.
///
/// Combines a rate limiter (3 requests per second) with a fixed-delay retry
/// (configured with `2` and a 100ms delay) and a 500ms timeout.
fn create_inventory_policy() -> InventoryPolicy {
    // Components are constructed in the same order as they are nested.
    let limiter = RateLimiter::new(3, Duration::from_secs(1)); // 3 requests per second
    let retry = RetryPolicy::fixed(2, Duration::from_millis(100));
    let timeout = TimeoutPolicy::new(Duration::from_millis(500));

    let retry_with_timeout = Wrap::new(retry, timeout);
    Wrap::new(limiter, retry_with_timeout)
}

/// Builds the policy stack that guards the payment service.
///
/// Combines a circuit breaker (opens after 3 failures, configured with a 5s
/// duration) with a fixed-delay retry (configured with `2` and a 200ms delay)
/// and a 1s timeout.
fn create_payment_policy() -> PaymentPolicy {
    // Components are constructed in the same order as they are nested.
    let breaker = CircuitBreaker::new(3, Duration::from_secs(5)); // Opens after 3 failures
    let retry = RetryPolicy::fixed(2, Duration::from_millis(200));
    let timeout = TimeoutPolicy::new(Duration::from_secs(1));

    let retry_with_timeout = Wrap::new(retry, timeout);
    Wrap::new(breaker, retry_with_timeout)
}

/// Builds the policy stack that guards the notification service.
///
/// Combines a hedge (a second attempt is started after 100ms) with a 500ms
/// timeout.
fn create_notification_policy() -> NotificationPolicy {
    let hedge = Hedge::new(Duration::from_millis(100)); // Start hedge after 100ms
    let timeout = TimeoutPolicy::new(Duration::from_millis(500));

    Wrap::new(hedge, timeout)
}

/// Entry point: prints the banner, then runs the four demo scenarios in
/// sequence against one shared [`ServiceState`], printing a divider line
/// between consecutive scenarios.
#[tokio::main]
async fn main() {
    println!("=== Do-Over Comprehensive Example: Order Processing System ===\n");
    println!("Services:");
    println!("  📦 Inventory: RateLimiter(3/s) → Retry(2) → Timeout(500ms)");
    println!("  💳 Payment:   CircuitBreaker(3, 5s) → Retry(2) → Timeout(1s)");
    println!("  📧 Notification: Hedge(100ms) → Timeout(500ms)\n");

    let state = Arc::new(ServiceState::new());

    // Divider printed between scenarios. The pattern must be non-empty:
    // repeating "" yields an empty string, so no visible rule would be drawn.
    let divider = "-".repeat(70);

    // Scenario 1: Normal operation
    scenario1_normal(&state).await;

    println!("\n{}\n", divider);

    // Scenario 2: Inventory rate limited
    scenario2_rate_limited(&state).await;

    println!("\n{}\n", divider);

    // Scenario 3: Payment failures → circuit opens
    scenario3_payment_failures(&state).await;

    println!("\n{}\n", divider);

    // Scenario 4: Slow notification → hedge wins
    scenario4_slow_notification(&state).await;
}

/// Scenario 1: processes a single order with every fault flag cleared and
/// prints the total wall-clock time the order took.
async fn scenario1_normal(state: &Arc<ServiceState>) {
    println!("📌 Scenario 1: Normal Operation");
    println!("   All services healthy - order should complete successfully\n");

    // Clear every fault flag so all three services behave normally.
    for fault_flag in [
        &state.inventory_rate_limited,
        &state.payment_failing,
        &state.notification_slow,
    ] {
        fault_flag.store(false, Ordering::SeqCst);
    }

    let started = Instant::now();
    process_order("ORD-001", state, &started).await;

    let total_ms = started.elapsed().as_millis();
    println!("\n   Total order processing time: {}ms", total_ms);
}

/// Scenario 2: fires five back-to-back inventory checks through one shared
/// inventory policy and reports, per request, whether it succeeded, was
/// rejected with `DoOverError::BulkheadFull` (shown as "rate limited"), or
/// failed for another reason. Finishes by printing how many times the
/// simulated inventory service body actually ran.
async fn scenario2_rate_limited(state: &Arc<ServiceState>) {
    println!("📌 Scenario 2: Inventory Rate Limited");
    println!("   Burst of orders exceeds rate limit (3/second)\n");

    // Start from a clean slate: no faults, inventory counter at zero.
    for fault_flag in [
        &state.inventory_rate_limited,
        &state.payment_failing,
        &state.notification_slow,
    ] {
        fault_flag.store(false, Ordering::SeqCst);
    }
    state.inventory_calls.store(0, Ordering::SeqCst);

    // One policy instance is shared by the whole burst.
    let limiter_stack = create_inventory_policy();
    let burst_started = Instant::now();

    println!("   Sending 5 rapid inventory check requests...\n");

    for order_no in 1..=5 {
        // Timestamp taken when the request is issued, not when it completes.
        let sent_at_ms = burst_started.elapsed().as_millis();

        let outcome: Result<String, DoOverError<String>> = limiter_stack
            .execute(|| async {
                state.inventory_calls.fetch_add(1, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(50)).await;
                Ok("Stock available".to_string())
            })
            .await;

        match outcome {
            Err(DoOverError::BulkheadFull) => {
                println!("   [+{:>4}ms] Order {}: 🚫 Rate limited - try again later", sent_at_ms, order_no)
            }
            Err(other) => println!("   [+{:>4}ms] Order {}: ❌ {:?}", sent_at_ms, order_no, other),
            Ok(msg) => println!("   [+{:>4}ms] Order {}: ✅ {}", sent_at_ms, order_no, msg),
        }
    }

    let service_hits = state.inventory_calls.load(Ordering::SeqCst);
    println!("\n   Inventory service calls made: {}", service_hits);
    println!("   💡 Rate limiter protected the inventory service from overload");
}

/// Scenario 3: drives five payment attempts through one shared payment policy
/// while the payment service is flagged as failing, printing the outcome of
/// each attempt. Afterwards it waits five seconds, clears the failure flag and
/// sends one more request through the same policy as a recovery test.
async fn scenario3_payment_failures(state: &Arc<ServiceState>) {
    println!("📌 Scenario 3: Payment Service Failures");
    println!("   Payment service is failing - circuit will open after 3 failures\n");

    // Put the payment service into failing mode and restart its call counter.
    state.payment_failing.store(true, Ordering::SeqCst);
    state.payment_calls.store(0, Ordering::SeqCst);

    // The same policy instance is used for the failing burst and the recovery test.
    let breaker_stack = create_payment_policy();
    let scenario_started = Instant::now();

    for attempt_no in 1..=5 {
        // Timestamp taken when the attempt is issued, not when it completes.
        let sent_at_ms = scenario_started.elapsed().as_millis();
        // Snapshot the flag once per attempt; retries of this attempt see the same value.
        let gateway_down = state.payment_failing.load(Ordering::SeqCst);

        let outcome: Result<String, DoOverError<String>> = breaker_stack
            .execute(|| async {
                state.payment_calls.fetch_add(1, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(50)).await;
                if !gateway_down {
                    Ok("Payment processed".to_string())
                } else {
                    Err(DoOverError::Inner("Payment gateway unavailable".to_string()))
                }
            })
            .await;

        match outcome {
            Err(DoOverError::CircuitOpen) => {
                println!("   [+{:>4}ms] Payment {}: 🚫 Circuit OPEN - failing fast", sent_at_ms, attempt_no)
            }
            Err(DoOverError::Inner(reason)) => {
                println!("   [+{:>4}ms] Payment {}: ❌ {} (after retries)", sent_at_ms, attempt_no, reason)
            }
            Err(other) => println!("   [+{:>4}ms] Payment {}: ❌ {:?}", sent_at_ms, attempt_no, other),
            Ok(msg) => println!("   [+{:>4}ms] Payment {}: ✅ {}", sent_at_ms, attempt_no, msg),
        }

        // Short pause between attempts.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    let service_hits = state.payment_calls.load(Ordering::SeqCst);
    println!("\n   Payment service calls made: {} (includes retries)", service_hits);
    println!("   💡 Circuit breaker opened after repeated failures, preventing further calls");

    // Show recovery
    println!("\n   Waiting for circuit reset (5s)...");
    tokio::time::sleep(Duration::from_secs(5)).await;
    state.payment_failing.store(false, Ordering::SeqCst);
    println!("   → Payment service recovered, circuit will test on next request");

    let probe_sent_ms = scenario_started.elapsed().as_millis();
    let probe: Result<String, DoOverError<String>> = breaker_stack
        .execute(|| async {
            state.payment_calls.fetch_add(1, Ordering::SeqCst);
            tokio::time::sleep(Duration::from_millis(50)).await;
            Ok("Payment processed".to_string())
        })
        .await;

    match probe {
        Err(err) => println!("   [+{:>4}ms] Recovery test: {:?}", probe_sent_ms, err),
        Ok(msg) => println!("   [+{:>4}ms] Recovery test: ✅ {} - Circuit CLOSED", probe_sent_ms, msg),
    }
}

/// Scenario 4: sends one notification through the hedged policy while the
/// notification service is flagged as slow. The first invocation of the
/// operation (the "primary") sleeps 400ms; any later invocation (the "hedge")
/// sleeps 50ms. Each invocation logs when it starts and completes, and the
/// scenario ends by printing the result and the number of invocations made.
async fn scenario4_slow_notification(state: &Arc<ServiceState>) {
    println!("📌 Scenario 4: Slow Notification Service");
    println!("   Notification is slow - hedge request will win\n");

    // Put the notification service into slow mode and restart its counter.
    state.notification_slow.store(true, Ordering::SeqCst);
    state.notification_calls.store(0, Ordering::SeqCst);

    let hedged_stack = create_notification_policy();
    let began = Instant::now();

    let slow_mode = state.notification_slow.load(Ordering::SeqCst);
    // Local invocation counter; it also decides which invocation is the primary.
    let invocations = Arc::new(AtomicUsize::new(0));

    let outcome: Result<String, DoOverError<String>> = hedged_stack
        .execute(|| {
            // Every invocation gets its own handle to the shared counter.
            let counter = Arc::clone(&invocations);
            async move {
                let call_num = counter.fetch_add(1, Ordering::SeqCst) + 1;
                let started_ms = began.elapsed().as_millis();
                let is_primary = call_num == 1;
                let role = if is_primary { "primary" } else { "hedge" };

                println!(
                    "   [+{:>4}ms] Notification {} ({}) started",
                    started_ms, call_num, role
                );

                // Primary is slow, hedge is fast
                let delay_ms = if is_primary && slow_mode { 400 } else { 50 };
                tokio::time::sleep(Duration::from_millis(delay_ms)).await;

                let finished_ms = began.elapsed().as_millis();
                println!(
                    "   [+{:>4}ms] Notification {} ({}) completed",
                    finished_ms, call_num, role
                );

                Ok(format!("Email sent via {}", role))
            }
        })
        .await;

    let total_ms = began.elapsed().as_millis();
    let total_calls = invocations.load(Ordering::SeqCst);

    match outcome {
        Err(err) => println!("\n   [+{:>4}ms] Result: {:?}", total_ms, err),
        Ok(msg) => println!("\n   [+{:>4}ms] Result: ✅ {}", total_ms, msg),
    }

    println!("   Notification calls made: {}", total_calls);
    println!("   💡 Hedge request completed first, reducing user-perceived latency");
}

/// Runs one order through the three-step pipeline: inventory check, payment,
/// then notification, each behind its own freshly created policy.
///
/// * `order_id` - identifier used only in log output.
/// * `state` - shared simulation state; counters are incremented and fault
///   flags are read by the simulated service bodies.
/// * `start` - reference instant; every log line shows milliseconds since it.
///
/// An inventory or payment failure is logged and aborts the order. A
/// notification failure is logged as non-critical and the order still
/// completes.
async fn process_order(order_id: &str, state: &Arc<ServiceState>, start: &Instant) {
    println!("   Processing order: {}", order_id);

    // Each order gets its own policy instances.
    let inventory_policy = create_inventory_policy();
    let payment_policy = create_payment_policy();
    let notification_policy = create_notification_policy();

    // Milliseconds elapsed since the caller-supplied start instant.
    let since_start_ms = || start.elapsed().as_millis();

    // Step 1: Check inventory
    println!("   [+{:>4}ms] Step 1: Checking inventory...", since_start_ms());

    let inventory_result: Result<String, DoOverError<String>> = inventory_policy
        .execute(|| async {
            state.inventory_calls.fetch_add(1, Ordering::SeqCst);
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok("Stock available".to_string())
        })
        .await;

    let stock_msg = match inventory_result {
        Ok(msg) => msg,
        Err(err) => {
            // Without stock confirmation the order cannot proceed.
            println!("   [+{:>4}ms]   → ❌ {:?}", since_start_ms(), err);
            return;
        }
    };
    println!("   [+{:>4}ms]   → ✅ {}", since_start_ms(), stock_msg);

    // Step 2: Process payment
    println!("   [+{:>4}ms] Step 2: Processing payment...", since_start_ms());

    let payment_result: Result<String, DoOverError<String>> = payment_policy
        .execute(|| async {
            state.payment_calls.fetch_add(1, Ordering::SeqCst);
            tokio::time::sleep(Duration::from_millis(150)).await;
            if !state.payment_failing.load(Ordering::SeqCst) {
                Ok("Payment processed".to_string())
            } else {
                Err(DoOverError::Inner("Payment failed".to_string()))
            }
        })
        .await;

    let payment_msg = match payment_result {
        Ok(msg) => msg,
        Err(err) => {
            // A failed payment aborts the order before any notification is sent.
            println!("   [+{:>4}ms]   → ❌ {:?}", since_start_ms(), err);
            return;
        }
    };
    println!("   [+{:>4}ms]   → ✅ {}", since_start_ms(), payment_msg);

    // Step 3: Send notification
    println!("   [+{:>4}ms] Step 3: Sending notification...", since_start_ms());

    let notification_result: Result<String, DoOverError<String>> = notification_policy
        .execute(|| async {
            state.notification_calls.fetch_add(1, Ordering::SeqCst);
            let slow = state.notification_slow.load(Ordering::SeqCst);
            let delay_ms = if slow { 400 } else { 50 };
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            Ok("Notification sent".to_string())
        })
        .await;

    // A notification failure is reported but does not abort the order.
    match notification_result {
        Err(err) => println!("   [+{:>4}ms]   → ⚠️  {:?} (non-critical)", since_start_ms(), err),
        Ok(msg) => println!("   [+{:>4}ms]   → ✅ {}", since_start_ms(), msg),
    }

    println!(
        "\n   [+{:>4}ms] ✨ Order {} completed successfully!",
        since_start_ms(),
        order_id
    );
}