Skip to main content

composition/
composition.rs

1//! Policy Composition Example
2//!
3//! This example demonstrates how to compose multiple policies using Wrap.
4//! Execution flows from outer policy → inner policy → operation.
5
6use do_over::{
7    bulkhead::Bulkhead,
8    circuit_breaker::CircuitBreaker,
9    error::DoOverError,
10    policy::Policy,
11    retry::RetryPolicy,
12    timeout::TimeoutPolicy,
13    wrap::Wrap,
14};
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19#[tokio::main]
20async fn main() {
21    println!("=== Do-Over Policy Composition Example ===\n");
22
23    // Pattern 1: Retry + Timeout (each retry has its own timeout)
24    pattern1_retry_timeout().await;
25
26    println!("\n{}\n", "─".repeat(60));
27
28    // Pattern 2: CircuitBreaker + Retry (retries only when circuit is closed)
29    pattern2_circuit_retry().await;
30
31    println!("\n{}\n", "─".repeat(60));
32
33    // Pattern 3: Full stack - Bulkhead → CircuitBreaker → Retry → Timeout
34    pattern3_full_stack().await;
35}
36
37async fn pattern1_retry_timeout() {
38    println!("📌 Pattern 1: Retry wrapping Timeout");
39    println!("   Wrap::new(Retry(3, 100ms), Timeout(200ms))");
40    println!("   → Each retry attempt gets its own 200ms timeout\n");
41
42    let policy = Wrap::new(
43        RetryPolicy::fixed(3, Duration::from_millis(100)),
44        TimeoutPolicy::new(Duration::from_millis(200)),
45    );
46
47    let attempt_count = Arc::new(AtomicUsize::new(0));
48    let start = Instant::now();
49
50    // Scenario: First 2 attempts timeout, third succeeds quickly
51    let result: Result<String, DoOverError<String>> = {
52        let ac = Arc::clone(&attempt_count);
53        policy
54            .execute(|| {
55                let count = Arc::clone(&ac);
56                async move {
57                    let attempt = count.fetch_add(1, Ordering::SeqCst) + 1;
58                    let elapsed = start.elapsed().as_millis();
59                    println!(
60                        "   [+{:>4}ms] Attempt {}: Started",
61                        elapsed, attempt
62                    );
63
64                    if attempt < 3 {
65                        // First two attempts are slow - will timeout
66                        println!(
67                            "   [+{:>4}ms] Attempt {}: Processing slowly (will timeout)...",
68                            elapsed, attempt
69                        );
70                        tokio::time::sleep(Duration::from_millis(500)).await;
71                        Ok("Should not reach here".to_string())
72                    } else {
73                        // Third attempt is fast
74                        tokio::time::sleep(Duration::from_millis(50)).await;
75                        let elapsed = start.elapsed().as_millis();
76                        println!(
77                            "   [+{:>4}ms] Attempt {}: ✅ Completed quickly!",
78                            elapsed, attempt
79                        );
80                        Ok("Success on attempt 3".to_string())
81                    }
82                }
83            })
84            .await
85    };
86
87    let total_time = start.elapsed().as_millis();
88    println!("\n   Result: {:?}", result.unwrap());
89    println!("   Total time: {}ms", total_time);
90    println!("   Total attempts: {}", attempt_count.load(Ordering::SeqCst));
91    println!("\n   💡 Each attempt had its own timeout; operation succeeded on 3rd try");
92}
93
94async fn pattern2_circuit_retry() {
95    println!("📌 Pattern 2: CircuitBreaker wrapping Retry");
96    println!("   Wrap::new(CircuitBreaker(3, 1s), Retry(2, 50ms))");
97    println!("   → Retries happen inside the circuit breaker\n");
98
99    let policy = Wrap::new(
100        CircuitBreaker::new(3, Duration::from_secs(1)),
101        RetryPolicy::fixed(2, Duration::from_millis(50)),
102    );
103
104    let failure_count = Arc::new(AtomicUsize::new(0));
105    let start = Instant::now();
106
107    // Make several calls that fail to open the circuit
108    println!("   Phase 1: Failing calls to open circuit");
109    for i in 1..=4 {
110        let fc = Arc::clone(&failure_count);
111        let s = start;
112        let result: Result<String, DoOverError<String>> = policy
113            .execute(|| {
114                let count = Arc::clone(&fc);
115                async move {
116                    count.fetch_add(1, Ordering::SeqCst);
117                    let elapsed = s.elapsed().as_millis();
118                    println!(
119                        "   [+{:>4}ms] Call {}: Inner operation failing...",
120                        elapsed, i
121                    );
122                    Err(DoOverError::Inner(format!("Service error")))
123                }
124            })
125            .await;
126
127        let elapsed = start.elapsed().as_millis();
128        match &result {
129            Err(DoOverError::Inner(_)) => {
130                println!(
131                    "   [+{:>4}ms] Call {}: ❌ Failed after retries",
132                    elapsed, i
133                );
134            }
135            Err(DoOverError::CircuitOpen) => {
136                println!(
137                    "   [+{:>4}ms] Call {}: 🚫 Circuit is OPEN",
138                    elapsed, i
139                );
140            }
141            _ => {}
142        }
143    }
144
145    let inner_calls = failure_count.load(Ordering::SeqCst);
146    println!("\n   Summary:");
147    println!("   - Inner operation calls: {} (includes retries)", inner_calls);
148    println!("   - Circuit opened after threshold reached");
149    println!("\n   💡 Retries happened inside circuit breaker; failures accumulated to open circuit");
150}
151
152async fn pattern3_full_stack() {
153    println!("📌 Pattern 3: Full Resilience Stack");
154    println!("   Bulkhead(2) → Retry(2, 100ms) → Timeout(500ms)");
155    println!("   Recommended ordering for production systems\n");
156
157    // Note: Using Bulkhead + Retry + Timeout for this example to avoid
158    // CircuitBreaker clone issues in concurrent async context
159    let policy = Arc::new(Wrap::new(
160        Bulkhead::new(2),
161        Wrap::new(
162            RetryPolicy::fixed(2, Duration::from_millis(100)),
163            TimeoutPolicy::new(Duration::from_millis(500)),
164        ),
165    ));
166
167    let call_count = Arc::new(AtomicUsize::new(0));
168    let start = Instant::now();
169
170    println!("   Launching 4 concurrent requests (bulkhead limit is 2)...\n");
171
172    let mut handles = vec![];
173    for i in 1..=4 {
174        let p = Arc::clone(&policy);
175        let cc = Arc::clone(&call_count);
176        let s = start;
177
178        let handle = tokio::spawn(async move {
179            let result: Result<String, DoOverError<String>> = p
180                .execute(|| {
181                    let count = Arc::clone(&cc);
182                    async move {
183                        let call = count.fetch_add(1, Ordering::SeqCst) + 1;
184                        let elapsed = s.elapsed().as_millis();
185                        println!(
186                            "   [+{:>4}ms] Request {}, call {}: Processing...",
187                            elapsed, i, call
188                        );
189                        tokio::time::sleep(Duration::from_millis(200)).await;
190                        let elapsed = s.elapsed().as_millis();
191                        println!(
192                            "   [+{:>4}ms] Request {}, call {}: ✅ Done",
193                            elapsed, i, call
194                        );
195                        Ok(format!("Response {}", i))
196                    }
197                })
198                .await;
199
200            let elapsed = s.elapsed().as_millis();
201            match &result {
202                Ok(msg) => println!("   [+{:>4}ms] Request {}: ✅ {}", elapsed, i, msg),
203                Err(DoOverError::BulkheadFull) => {
204                    println!("   [+{:>4}ms] Request {}: 🚫 Rejected by bulkhead", elapsed, i)
205                }
206                Err(e) => println!("   [+{:>4}ms] Request {}: ❌ {:?}", elapsed, i, e),
207            }
208        });
209        handles.push(handle);
210
211        tokio::time::sleep(Duration::from_millis(10)).await;
212    }
213
214    for handle in handles {
215        handle.await.unwrap();
216    }
217
218    let total_time = start.elapsed().as_millis();
219    let total_calls = call_count.load(Ordering::SeqCst);
220
221    println!("\n   Summary:");
222    println!("   - Total inner operation calls: {}", total_calls);
223    println!("   - Total time: {}ms", total_time);
224    println!("\n   Policy execution order:");
225    println!("   1. Bulkhead: Limits to 2 concurrent executions");
226    println!("   2. Retry: Retries transient failures");
227    println!("   3. Timeout: Each attempt bounded to 500ms");
228}