use do_over::{
circuit_breaker::CircuitBreaker,
error::DoOverError,
hedge::Hedge,
policy::Policy,
rate_limit::RateLimiter,
retry::RetryPolicy,
timeout::TimeoutPolicy,
wrap::Wrap,
};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Shared atomic flags and counters for the simulated services.
///
/// The boolean fields are fault-injection toggles that the scenarios set
/// before running; the `usize` fields count how many times each simulated
/// service body was invoked. All fields are atomics, so the state can be
/// mutated through a shared reference (`&Arc<ServiceState>`).
#[derive(Debug, Default)]
struct ServiceState {
    /// Inventory fault toggle. In the visible code it is only ever cleared
    /// by the scenarios and never read.
    inventory_rate_limited: AtomicBool,
    /// When `true`, the simulated payment call returns an `Inner` error.
    payment_failing: AtomicBool,
    /// When `true`, the simulated notification call sleeps longer.
    notification_slow: AtomicBool,
    /// Number of times the simulated inventory call body ran.
    inventory_calls: AtomicUsize,
    /// Number of times the simulated payment call body ran.
    payment_calls: AtomicUsize,
    /// Number of times the simulated notification call body ran.
    notification_calls: AtomicUsize,
}

impl ServiceState {
    /// Creates a state with every flag cleared and every counter at zero.
    ///
    /// Delegates to the derived `Default`: `AtomicBool` defaults to `false`
    /// and `AtomicUsize` defaults to `0`, which is exactly the healthy
    /// starting state the scenarios expect.
    fn new() -> Self {
        Self::default()
    }
}
// Policy stack for the inventory service. The banner printed by `main`
// describes it as RateLimiter → Retry → Timeout.
// NOTE(review): presumably the first `Wrap` type parameter is the outer
// policy and the second the inner one — confirm against the `do_over` docs.
type InventoryPolicy = Wrap<RateLimiter, Wrap<RetryPolicy, TimeoutPolicy>>;
// Policy stack for the payment service. The banner printed by `main`
// describes it as CircuitBreaker → Retry → Timeout.
type PaymentPolicy = Wrap<CircuitBreaker, Wrap<RetryPolicy, TimeoutPolicy>>;
// Policy stack for the notification service. The banner printed by `main`
// describes it as Hedge → Timeout.
type NotificationPolicy = Wrap<Hedge, TimeoutPolicy>;
/// Builds the inventory policy stack: a rate limiter wrapping a retry policy
/// wrapping a timeout.
///
/// Parameters used: `RateLimiter::new(3, 1s)` (shown as "3/s" in the banner),
/// `RetryPolicy::fixed(2, 100ms)` and `TimeoutPolicy::new(500ms)`.
fn create_inventory_policy() -> InventoryPolicy {
    let limiter = RateLimiter::new(3, Duration::from_secs(1));
    let retry = RetryPolicy::fixed(2, Duration::from_millis(100));
    let timeout = TimeoutPolicy::new(Duration::from_millis(500));
    Wrap::new(limiter, Wrap::new(retry, timeout))
}
/// Builds the payment policy stack: a circuit breaker wrapping a retry policy
/// wrapping a timeout.
///
/// Parameters used: `CircuitBreaker::new(3, 5s)`,
/// `RetryPolicy::fixed(2, 200ms)` and `TimeoutPolicy::new(1s)`.
fn create_payment_policy() -> PaymentPolicy {
    let breaker = CircuitBreaker::new(3, Duration::from_secs(5));
    let retry = RetryPolicy::fixed(2, Duration::from_millis(200));
    let timeout = TimeoutPolicy::new(Duration::from_secs(1));
    Wrap::new(breaker, Wrap::new(retry, timeout))
}
/// Builds the notification policy stack: a hedge policy wrapping a timeout.
///
/// Parameters used: `Hedge::new(100ms)` and `TimeoutPolicy::new(500ms)`.
fn create_notification_policy() -> NotificationPolicy {
    let hedge = Hedge::new(Duration::from_millis(100));
    let timeout = TimeoutPolicy::new(Duration::from_millis(500));
    Wrap::new(hedge, timeout)
}
/// Entry point of the demo.
///
/// Prints a banner describing the three policy stacks, then runs the four
/// scenarios in order against a single shared `ServiceState`, printing a
/// divider line between consecutive scenarios.
#[tokio::main]
async fn main() {
    println!("=== Do-Over Comprehensive Example: Order Processing System ===\n");
    println!("Services:");
    println!(" 📦 Inventory: RateLimiter(3/s) → Retry(2) → Timeout(500ms)");
    println!(" 💳 Payment: CircuitBreaker(3, 5s) → Retry(2) → Timeout(1s)");
    println!(" 📧 Notification: Hedge(100ms) → Timeout(500ms)\n");

    let shared = Arc::new(ServiceState::new());
    // Built once and reused between scenarios.
    let divider = "═".repeat(70);

    scenario1_normal(&shared).await;
    println!("\n{}\n", divider);

    scenario2_rate_limited(&shared).await;
    println!("\n{}\n", divider);

    scenario3_payment_failures(&shared).await;
    println!("\n{}\n", divider);

    scenario4_slow_notification(&shared).await;
}
/// Scenario 1: processes a single order ("ORD-001") with every fault flag
/// cleared and reports the total wall-clock time taken.
async fn scenario1_normal(state: &Arc<ServiceState>) {
    println!("📌 Scenario 1: Normal Operation");
    println!(" All services healthy - order should complete successfully\n");

    // Clear every fault toggle so all simulated services behave normally.
    for flag in [
        &state.inventory_rate_limited,
        &state.payment_failing,
        &state.notification_slow,
    ] {
        flag.store(false, Ordering::SeqCst);
    }

    let started = Instant::now();
    process_order("ORD-001", state, &started).await;

    let total_ms = started.elapsed().as_millis();
    println!("\n Total order processing time: {}ms", total_ms);
}
/// Scenario 2: issues five back-to-back inventory checks through one
/// inventory policy instance and prints the outcome of each.
///
/// A `DoOverError::BulkheadFull` result is reported as "rate limited"; any
/// other error is printed with its `Debug` form. Afterwards the number of
/// times the simulated inventory body actually ran is printed.
async fn scenario2_rate_limited(state: &Arc<ServiceState>) {
    println!("📌 Scenario 2: Inventory Rate Limited");
    println!(" Burst of orders exceeds rate limit (3/second)\n");

    // Start from a healthy state with a fresh inventory call counter.
    state.inventory_rate_limited.store(false, Ordering::SeqCst);
    state.payment_failing.store(false, Ordering::SeqCst);
    state.notification_slow.store(false, Ordering::SeqCst);
    state.inventory_calls.store(0, Ordering::SeqCst);

    let policy = create_inventory_policy();
    let burst_started = Instant::now();
    println!(" Sending 5 rapid inventory check requests...\n");

    for order_no in 1..=5 {
        // Captured before the call: the printed offset is when the request
        // was issued, not when it finished.
        let issued_at_ms = burst_started.elapsed().as_millis();

        let outcome: Result<String, DoOverError<String>> = policy
            .execute(|| async {
                state.inventory_calls.fetch_add(1, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(50)).await;
                Ok("Stock available".to_string())
            })
            .await;

        match outcome {
            Ok(msg) => println!(" [+{:>4}ms] Order {}: ✅ {}", issued_at_ms, order_no, msg),
            Err(DoOverError::BulkheadFull) => {
                println!(
                    " [+{:>4}ms] Order {}: 🚫 Rate limited - try again later",
                    issued_at_ms, order_no
                )
            }
            Err(other) => println!(" [+{:>4}ms] Order {}: ❌ {:?}", issued_at_ms, order_no, other),
        }
    }

    let calls_made = state.inventory_calls.load(Ordering::SeqCst);
    println!("\n Inventory service calls made: {}", calls_made);
    println!(" 💡 Rate limiter protected the inventory service from overload");
}
/// Scenario 3: drives five payment attempts through one payment policy
/// instance while the payment service is flagged as failing, then waits five
/// seconds, clears the failure flag and sends one more "recovery" request.
///
/// Each attempt's outcome is printed; `DoOverError::CircuitOpen` is reported
/// as the circuit failing fast and `DoOverError::Inner` as a failure after
/// retries. The total number of payment body invocations is printed before
/// the recovery wait.
async fn scenario3_payment_failures(state: &Arc<ServiceState>) {
    println!("📌 Scenario 3: Payment Service Failures");
    println!(" Payment service is failing - circuit will open after 3 failures\n");

    state.payment_failing.store(true, Ordering::SeqCst);
    state.payment_calls.store(0, Ordering::SeqCst);

    let policy = create_payment_policy();
    let started = Instant::now();

    for attempt in 1..=5 {
        // Offset of when this attempt was issued.
        let issued_at_ms = started.elapsed().as_millis();
        // Snapshot the flag once per attempt; the closure may run several times.
        let gateway_down = state.payment_failing.load(Ordering::SeqCst);

        let outcome: Result<String, DoOverError<String>> = policy
            .execute(|| async {
                state.payment_calls.fetch_add(1, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(50)).await;
                if gateway_down {
                    Err(DoOverError::Inner("Payment gateway unavailable".to_string()))
                } else {
                    Ok("Payment processed".to_string())
                }
            })
            .await;

        match outcome {
            Ok(msg) => println!(" [+{:>4}ms] Payment {}: ✅ {}", issued_at_ms, attempt, msg),
            Err(DoOverError::CircuitOpen) => {
                println!(
                    " [+{:>4}ms] Payment {}: 🚫 Circuit OPEN - failing fast",
                    issued_at_ms, attempt
                )
            }
            Err(DoOverError::Inner(reason)) => {
                println!(
                    " [+{:>4}ms] Payment {}: ❌ {} (after retries)",
                    issued_at_ms, attempt, reason
                )
            }
            Err(other) => println!(" [+{:>4}ms] Payment {}: ❌ {:?}", issued_at_ms, attempt, other),
        }

        // Small pause between attempts.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    let calls_made = state.payment_calls.load(Ordering::SeqCst);
    println!("\n Payment service calls made: {} (includes retries)", calls_made);
    println!(" 💡 Circuit breaker opened after repeated failures, preventing further calls");

    println!("\n Waiting for circuit reset (5s)...");
    tokio::time::sleep(Duration::from_secs(5)).await;
    state.payment_failing.store(false, Ordering::SeqCst);
    println!(" → Payment service recovered, circuit will test on next request");

    // Recovery probe: the simulated service now always succeeds.
    let probe_at_ms = started.elapsed().as_millis();
    let probe: Result<String, DoOverError<String>> = policy
        .execute(|| async {
            state.payment_calls.fetch_add(1, Ordering::SeqCst);
            tokio::time::sleep(Duration::from_millis(50)).await;
            Ok("Payment processed".to_string())
        })
        .await;

    match probe {
        Ok(msg) => println!(
            " [+{:>4}ms] Recovery test: ✅ {} - Circuit CLOSED",
            probe_at_ms, msg
        ),
        Err(err) => println!(" [+{:>4}ms] Recovery test: {:?}", probe_at_ms, err),
    }
}
/// Scenario 4: sends one notification through the notification policy while
/// the service is flagged as slow.
///
/// A local counter numbers each invocation of the operation: invocation 1 is
/// labelled "primary" and sleeps 400 ms when the slow flag is set; every
/// later invocation is labelled "hedge" and sleeps 50 ms. Start/completion
/// lines are printed per invocation, followed by the final result and the
/// number of invocations observed.
async fn scenario4_slow_notification(state: &Arc<ServiceState>) {
    println!("📌 Scenario 4: Slow Notification Service");
    println!(" Notification is slow - hedge request will win\n");

    state.notification_slow.store(true, Ordering::SeqCst);
    state.notification_calls.store(0, Ordering::SeqCst);

    let policy = create_notification_policy();
    let started = Instant::now();
    let slow = state.notification_slow.load(Ordering::SeqCst);

    // Counts invocations of the operation so each one can be labelled.
    let calls = Arc::new(AtomicUsize::new(0));
    let shared_counter = Arc::clone(&calls);

    let outcome: Result<String, DoOverError<String>> = policy
        .execute(|| {
            // Each invocation gets its own handle to move into the future.
            let counter = Arc::clone(&shared_counter);
            async move {
                let call_num = counter.fetch_add(1, Ordering::SeqCst) + 1;
                let is_primary = call_num == 1;
                let role = if is_primary { "primary" } else { "hedge" };

                println!(
                    " [+{:>4}ms] Notification {} ({}) started",
                    started.elapsed().as_millis(),
                    call_num,
                    role
                );

                // Only the first invocation is slowed down.
                let delay = if is_primary && slow { 400 } else { 50 };
                tokio::time::sleep(Duration::from_millis(delay)).await;

                println!(
                    " [+{:>4}ms] Notification {} ({}) completed",
                    started.elapsed().as_millis(),
                    call_num,
                    role
                );

                Ok(format!("Email sent via {}", role))
            }
        })
        .await;

    let finished_at_ms = started.elapsed().as_millis();
    let total_calls = calls.load(Ordering::SeqCst);

    match outcome {
        Ok(msg) => println!("\n [+{:>4}ms] Result: ✅ {}", finished_at_ms, msg),
        Err(err) => println!("\n [+{:>4}ms] Result: {:?}", finished_at_ms, err),
    }
    println!(" Notification calls made: {}", total_calls);
    println!(" 💡 Hedge request completed first, reducing user-perceived latency");
}
/// Runs one order through the three simulated services in sequence:
/// inventory check, payment, then notification.
///
/// * `order_id` – identifier used only in the printed log lines.
/// * `state` – shared flags/counters; each simulated call bumps its counter,
///   and the payment/notification bodies consult their fault flags.
/// * `start` – reference instant; every log line shows the offset from it.
///
/// A failure in the inventory or payment step is printed and aborts the
/// order (early return). A notification failure is printed as non-critical
/// and the order is still reported as completed.
async fn process_order(order_id: &str, state: &Arc<ServiceState>, start: &Instant) {
    println!(" Processing order: {}", order_id);

    // Fresh policy instances per order, so no limiter/breaker state is
    // carried over from other orders.
    let inventory_policy = create_inventory_policy();
    let payment_policy = create_payment_policy();
    let notification_policy = create_notification_policy();

    // Step 1: inventory.
    let elapsed = start.elapsed().as_millis();
    println!(" [+{:>4}ms] Step 1: Checking inventory...", elapsed);
    let inventory_result: Result<String, DoOverError<String>> = inventory_policy
        .execute(|| async {
            state.inventory_calls.fetch_add(1, Ordering::SeqCst);
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok("Stock available".to_string())
        })
        .await;
    let elapsed = start.elapsed().as_millis();
    match &inventory_result {
        Ok(msg) => println!(" [+{:>4}ms] → ✅ {}", elapsed, msg),
        Err(e) => {
            println!(" [+{:>4}ms] → ❌ {:?}", elapsed, e);
            return;
        }
    }

    // Step 2: payment. The failure flag is read inside the body, so it is
    // re-evaluated on every invocation of the operation.
    let elapsed = start.elapsed().as_millis();
    println!(" [+{:>4}ms] Step 2: Processing payment...", elapsed);
    let payment_result: Result<String, DoOverError<String>> = payment_policy
        .execute(|| async {
            state.payment_calls.fetch_add(1, Ordering::SeqCst);
            tokio::time::sleep(Duration::from_millis(150)).await;
            if state.payment_failing.load(Ordering::SeqCst) {
                Err(DoOverError::Inner("Payment failed".to_string()))
            } else {
                Ok("Payment processed".to_string())
            }
        })
        .await;
    let elapsed = start.elapsed().as_millis();
    match &payment_result {
        Ok(msg) => println!(" [+{:>4}ms] → ✅ {}", elapsed, msg),
        Err(e) => {
            println!(" [+{:>4}ms] → ❌ {:?}", elapsed, e);
            return;
        }
    }

    // Step 3: notification (best effort).
    let elapsed = start.elapsed().as_millis();
    println!(" [+{:>4}ms] Step 3: Sending notification...", elapsed);
    let notification_result: Result<String, DoOverError<String>> = notification_policy
        .execute(|| async {
            state.notification_calls.fetch_add(1, Ordering::SeqCst);
            let delay = if state.notification_slow.load(Ordering::SeqCst) {
                400
            } else {
                50
            };
            tokio::time::sleep(Duration::from_millis(delay)).await;
            Ok("Notification sent".to_string())
        })
        .await;
    let elapsed = start.elapsed().as_millis();
    // Fix: the scrutinee was the mangled token `¬ification_result`; it must
    // borrow `notification_result`.
    match &notification_result {
        Ok(msg) => println!(" [+{:>4}ms] → ✅ {}", elapsed, msg),
        // A failed notification does not fail the order.
        Err(e) => println!(" [+{:>4}ms] → ⚠️ {:?} (non-critical)", elapsed, e),
    }

    let elapsed = start.elapsed().as_millis();
    println!("\n [+{:>4}ms] ✨ Order {} completed successfully!", elapsed, order_id);
}