// comprehensive/comprehensive.rs

//! Comprehensive Example: Order Processing System
//!
//! This example simulates a real-world order processing system with multiple
//! services, each protected by a different combination of resilience policies.
//!
//! Services:
//! - Inventory Service: RateLimiter + Retry + Timeout
//! - Payment Service: CircuitBreaker + Retry + Timeout
//! - Notification Service: Hedge + Timeout
10
11use do_over::{
12    circuit_breaker::CircuitBreaker,
13    error::DoOverError,
14    hedge::Hedge,
15    policy::Policy,
16    rate_limit::RateLimiter,
17    retry::RetryPolicy,
18    timeout::TimeoutPolicy,
19    wrap::Wrap,
20};
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
/// Simulated service state shared by every scenario.
///
/// Every field is an atomic, so the state can be updated through a shared
/// reference (the scenarios receive it behind an `Arc`).
struct ServiceState {
    // Mode flags toggled by the scenarios.
    inventory_rate_limited: AtomicBool,
    payment_failing: AtomicBool,
    notification_slow: AtomicBool,
    // Counters bumped by the simulated service operations.
    inventory_calls: AtomicUsize,
    payment_calls: AtomicUsize,
    notification_calls: AtomicUsize,
}

impl ServiceState {
    /// Creates the initial state: every flag is `false`, every counter is `0`.
    fn new() -> Self {
        let cleared_flag = || AtomicBool::new(false);
        let zeroed_counter = || AtomicUsize::new(0);

        Self {
            inventory_rate_limited: cleared_flag(),
            payment_failing: cleared_flag(),
            notification_slow: cleared_flag(),
            inventory_calls: zeroed_counter(),
            payment_calls: zeroed_counter(),
            notification_calls: zeroed_counter(),
        }
    }
}
47
48// Policy types
49type InventoryPolicy = Wrap<RateLimiter, Wrap<RetryPolicy, TimeoutPolicy>>;
50type PaymentPolicy = Wrap<CircuitBreaker, Wrap<RetryPolicy, TimeoutPolicy>>;
51type NotificationPolicy = Wrap<Hedge, TimeoutPolicy>;
52
53fn create_inventory_policy() -> InventoryPolicy {
54    Wrap::new(
55        RateLimiter::new(3, Duration::from_secs(1)), // 3 requests per second
56        Wrap::new(
57            RetryPolicy::fixed(2, Duration::from_millis(100)),
58            TimeoutPolicy::new(Duration::from_millis(500)),
59        ),
60    )
61}
62
63fn create_payment_policy() -> PaymentPolicy {
64    Wrap::new(
65        CircuitBreaker::new(3, Duration::from_secs(5)), // Opens after 3 failures
66        Wrap::new(
67            RetryPolicy::fixed(2, Duration::from_millis(200)),
68            TimeoutPolicy::new(Duration::from_secs(1)),
69        ),
70    )
71}
72
73fn create_notification_policy() -> NotificationPolicy {
74    Wrap::new(
75        Hedge::new(Duration::from_millis(100)), // Start hedge after 100ms
76        TimeoutPolicy::new(Duration::from_millis(500)),
77    )
78}
79
80#[tokio::main]
81async fn main() {
82    println!("=== Do-Over Comprehensive Example: Order Processing System ===\n");
83    println!("Services:");
84    println!("  📦 Inventory: RateLimiter(3/s) → Retry(2) → Timeout(500ms)");
85    println!("  💳 Payment:   CircuitBreaker(3, 5s) → Retry(2) → Timeout(1s)");
86    println!("  📧 Notification: Hedge(100ms) → Timeout(500ms)\n");
87
88    let state = Arc::new(ServiceState::new());
89
90    // Scenario 1: Normal operation
91    scenario1_normal(&state).await;
92
93    println!("\n{}\n", "═".repeat(70));
94
95    // Scenario 2: Inventory rate limited
96    scenario2_rate_limited(&state).await;
97
98    println!("\n{}\n", "═".repeat(70));
99
100    // Scenario 3: Payment failures → circuit opens
101    scenario3_payment_failures(&state).await;
102
103    println!("\n{}\n", "═".repeat(70));
104
105    // Scenario 4: Slow notification → hedge wins
106    scenario4_slow_notification(&state).await;
107}
108
109async fn scenario1_normal(state: &Arc<ServiceState>) {
110    println!("📌 Scenario 1: Normal Operation");
111    println!("   All services healthy - order should complete successfully\n");
112
113    // Reset state
114    state.inventory_rate_limited.store(false, Ordering::SeqCst);
115    state.payment_failing.store(false, Ordering::SeqCst);
116    state.notification_slow.store(false, Ordering::SeqCst);
117
118    let start = Instant::now();
119    process_order("ORD-001", state, &start).await;
120
121    println!(
122        "\n   Total order processing time: {}ms",
123        start.elapsed().as_millis()
124    );
125}
126
127async fn scenario2_rate_limited(state: &Arc<ServiceState>) {
128    println!("📌 Scenario 2: Inventory Rate Limited");
129    println!("   Burst of orders exceeds rate limit (3/second)\n");
130
131    // Reset state
132    state.inventory_rate_limited.store(false, Ordering::SeqCst);
133    state.payment_failing.store(false, Ordering::SeqCst);
134    state.notification_slow.store(false, Ordering::SeqCst);
135    state.inventory_calls.store(0, Ordering::SeqCst);
136
137    let inventory_policy = create_inventory_policy();
138    let start = Instant::now();
139
140    println!("   Sending 5 rapid inventory check requests...\n");
141
142    for i in 1..=5 {
143        let elapsed = start.elapsed().as_millis();
144        let result: Result<String, DoOverError<String>> = inventory_policy
145            .execute(|| async {
146                state.inventory_calls.fetch_add(1, Ordering::SeqCst);
147                tokio::time::sleep(Duration::from_millis(50)).await;
148                Ok("Stock available".to_string())
149            })
150            .await;
151
152        match &result {
153            Ok(msg) => println!("   [+{:>4}ms] Order {}: ✅ {}", elapsed, i, msg),
154            Err(DoOverError::BulkheadFull) => {
155                println!("   [+{:>4}ms] Order {}: 🚫 Rate limited - try again later", elapsed, i)
156            }
157            Err(e) => println!("   [+{:>4}ms] Order {}: ❌ {:?}", elapsed, i, e),
158        }
159    }
160
161    println!(
162        "\n   Inventory service calls made: {}",
163        state.inventory_calls.load(Ordering::SeqCst)
164    );
165    println!("   💡 Rate limiter protected the inventory service from overload");
166}
167
168async fn scenario3_payment_failures(state: &Arc<ServiceState>) {
169    println!("📌 Scenario 3: Payment Service Failures");
170    println!("   Payment service is failing - circuit will open after 3 failures\n");
171
172    // Set payment to failing mode
173    state.payment_failing.store(true, Ordering::SeqCst);
174    state.payment_calls.store(0, Ordering::SeqCst);
175
176    let payment_policy = create_payment_policy();
177    let start = Instant::now();
178
179    for i in 1..=5 {
180        let elapsed = start.elapsed().as_millis();
181        let failing = state.payment_failing.load(Ordering::SeqCst);
182
183        let result: Result<String, DoOverError<String>> = payment_policy
184            .execute(|| async {
185                state.payment_calls.fetch_add(1, Ordering::SeqCst);
186                tokio::time::sleep(Duration::from_millis(50)).await;
187                if failing {
188                    Err(DoOverError::Inner("Payment gateway unavailable".to_string()))
189                } else {
190                    Ok("Payment processed".to_string())
191                }
192            })
193            .await;
194
195        match &result {
196            Ok(msg) => println!("   [+{:>4}ms] Payment {}: ✅ {}", elapsed, i, msg),
197            Err(DoOverError::CircuitOpen) => {
198                println!("   [+{:>4}ms] Payment {}: 🚫 Circuit OPEN - failing fast", elapsed, i)
199            }
200            Err(DoOverError::Inner(e)) => {
201                println!("   [+{:>4}ms] Payment {}: ❌ {} (after retries)", elapsed, i, e)
202            }
203            Err(e) => println!("   [+{:>4}ms] Payment {}: ❌ {:?}", elapsed, i, e),
204        }
205
206        tokio::time::sleep(Duration::from_millis(100)).await;
207    }
208
209    println!(
210        "\n   Payment service calls made: {} (includes retries)",
211        state.payment_calls.load(Ordering::SeqCst)
212    );
213    println!("   💡 Circuit breaker opened after repeated failures, preventing further calls");
214
215    // Show recovery
216    println!("\n   Waiting for circuit reset (5s)...");
217    tokio::time::sleep(Duration::from_secs(5)).await;
218    state.payment_failing.store(false, Ordering::SeqCst);
219    println!("   → Payment service recovered, circuit will test on next request");
220
221    let elapsed = start.elapsed().as_millis();
222    let result: Result<String, DoOverError<String>> = payment_policy
223        .execute(|| async {
224            state.payment_calls.fetch_add(1, Ordering::SeqCst);
225            tokio::time::sleep(Duration::from_millis(50)).await;
226            Ok("Payment processed".to_string())
227        })
228        .await;
229
230    match &result {
231        Ok(msg) => println!("   [+{:>4}ms] Recovery test: ✅ {} - Circuit CLOSED", elapsed, msg),
232        Err(e) => println!("   [+{:>4}ms] Recovery test: {:?}", elapsed, e),
233    }
234}
235
236async fn scenario4_slow_notification(state: &Arc<ServiceState>) {
237    println!("📌 Scenario 4: Slow Notification Service");
238    println!("   Notification is slow - hedge request will win\n");
239
240    // Set notification to slow mode
241    state.notification_slow.store(true, Ordering::SeqCst);
242    state.notification_calls.store(0, Ordering::SeqCst);
243
244    let notification_policy = create_notification_policy();
245    let start = Instant::now();
246
247    let slow = state.notification_slow.load(Ordering::SeqCst);
248    let calls = Arc::new(AtomicUsize::new(0));
249
250    let result: Result<String, DoOverError<String>> = {
251        let c = Arc::clone(&calls);
252        notification_policy
253            .execute(|| {
254                let count = Arc::clone(&c);
255                async move {
256                    let call_num = count.fetch_add(1, Ordering::SeqCst) + 1;
257                    let elapsed = start.elapsed().as_millis();
258                    let is_primary = call_num == 1;
259
260                    println!(
261                        "   [+{:>4}ms] Notification {} ({}) started",
262                        elapsed,
263                        call_num,
264                        if is_primary { "primary" } else { "hedge" }
265                    );
266
267                    // Primary is slow, hedge is fast
268                    let delay = if is_primary && slow { 400 } else { 50 };
269                    tokio::time::sleep(Duration::from_millis(delay)).await;
270
271                    let elapsed = start.elapsed().as_millis();
272                    println!(
273                        "   [+{:>4}ms] Notification {} ({}) completed",
274                        elapsed,
275                        call_num,
276                        if is_primary { "primary" } else { "hedge" }
277                    );
278
279                    Ok(format!(
280                        "Email sent via {}",
281                        if is_primary { "primary" } else { "hedge" }
282                    ))
283                }
284            })
285            .await
286    };
287
288    let elapsed = start.elapsed().as_millis();
289    let total_calls = calls.load(Ordering::SeqCst);
290
291    match &result {
292        Ok(msg) => println!("\n   [+{:>4}ms] Result: ✅ {}", elapsed, msg),
293        Err(e) => println!("\n   [+{:>4}ms] Result: {:?}", elapsed, e),
294    }
295
296    println!("   Notification calls made: {}", total_calls);
297    println!("   💡 Hedge request completed first, reducing user-perceived latency");
298}
299
300async fn process_order(order_id: &str, state: &Arc<ServiceState>, start: &Instant) {
301    println!("   Processing order: {}", order_id);
302
303    let inventory_policy = create_inventory_policy();
304    let payment_policy = create_payment_policy();
305    let notification_policy = create_notification_policy();
306
307    // Step 1: Check inventory
308    let elapsed = start.elapsed().as_millis();
309    println!("   [+{:>4}ms] Step 1: Checking inventory...", elapsed);
310
311    let inventory_result: Result<String, DoOverError<String>> = inventory_policy
312        .execute(|| async {
313            state.inventory_calls.fetch_add(1, Ordering::SeqCst);
314            tokio::time::sleep(Duration::from_millis(100)).await;
315            Ok("Stock available".to_string())
316        })
317        .await;
318
319    let elapsed = start.elapsed().as_millis();
320    match &inventory_result {
321        Ok(msg) => println!("   [+{:>4}ms]   → ✅ {}", elapsed, msg),
322        Err(e) => {
323            println!("   [+{:>4}ms]   → ❌ {:?}", elapsed, e);
324            return;
325        }
326    }
327
328    // Step 2: Process payment
329    let elapsed = start.elapsed().as_millis();
330    println!("   [+{:>4}ms] Step 2: Processing payment...", elapsed);
331
332    let payment_result: Result<String, DoOverError<String>> = payment_policy
333        .execute(|| async {
334            state.payment_calls.fetch_add(1, Ordering::SeqCst);
335            tokio::time::sleep(Duration::from_millis(150)).await;
336            if state.payment_failing.load(Ordering::SeqCst) {
337                Err(DoOverError::Inner("Payment failed".to_string()))
338            } else {
339                Ok("Payment processed".to_string())
340            }
341        })
342        .await;
343
344    let elapsed = start.elapsed().as_millis();
345    match &payment_result {
346        Ok(msg) => println!("   [+{:>4}ms]   → ✅ {}", elapsed, msg),
347        Err(e) => {
348            println!("   [+{:>4}ms]   → ❌ {:?}", elapsed, e);
349            return;
350        }
351    }
352
353    // Step 3: Send notification
354    let elapsed = start.elapsed().as_millis();
355    println!("   [+{:>4}ms] Step 3: Sending notification...", elapsed);
356
357    let notification_result: Result<String, DoOverError<String>> = notification_policy
358        .execute(|| async {
359            state.notification_calls.fetch_add(1, Ordering::SeqCst);
360            let delay = if state.notification_slow.load(Ordering::SeqCst) {
361                400
362            } else {
363                50
364            };
365            tokio::time::sleep(Duration::from_millis(delay)).await;
366            Ok("Notification sent".to_string())
367        })
368        .await;
369
370    let elapsed = start.elapsed().as_millis();
371    match &notification_result {
372        Ok(msg) => println!("   [+{:>4}ms]   → ✅ {}", elapsed, msg),
373        Err(e) => println!("   [+{:>4}ms]   → ⚠️  {:?} (non-critical)", elapsed, e),
374    }
375
376    let elapsed = start.elapsed().as_millis();
377    println!("\n   [+{:>4}ms] ✨ Order {} completed successfully!", elapsed, order_id);
378}