composition/
composition.rs1use do_over::{
7 bulkhead::Bulkhead,
8 circuit_breaker::CircuitBreaker,
9 error::DoOverError,
10 policy::Policy,
11 retry::RetryPolicy,
12 timeout::TimeoutPolicy,
13 wrap::Wrap,
14};
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19#[tokio::main]
20async fn main() {
21 println!("=== Do-Over Policy Composition Example ===\n");
22
23 pattern1_retry_timeout().await;
25
26 println!("\n{}\n", "─".repeat(60));
27
28 pattern2_circuit_retry().await;
30
31 println!("\n{}\n", "─".repeat(60));
32
33 pattern3_full_stack().await;
35}
36
37async fn pattern1_retry_timeout() {
38 println!("📌 Pattern 1: Retry wrapping Timeout");
39 println!(" Wrap::new(Retry(3, 100ms), Timeout(200ms))");
40 println!(" → Each retry attempt gets its own 200ms timeout\n");
41
42 let policy = Wrap::new(
43 RetryPolicy::fixed(3, Duration::from_millis(100)),
44 TimeoutPolicy::new(Duration::from_millis(200)),
45 );
46
47 let attempt_count = Arc::new(AtomicUsize::new(0));
48 let start = Instant::now();
49
50 let result: Result<String, DoOverError<String>> = {
52 let ac = Arc::clone(&attempt_count);
53 policy
54 .execute(|| {
55 let count = Arc::clone(&ac);
56 async move {
57 let attempt = count.fetch_add(1, Ordering::SeqCst) + 1;
58 let elapsed = start.elapsed().as_millis();
59 println!(
60 " [+{:>4}ms] Attempt {}: Started",
61 elapsed, attempt
62 );
63
64 if attempt < 3 {
65 println!(
67 " [+{:>4}ms] Attempt {}: Processing slowly (will timeout)...",
68 elapsed, attempt
69 );
70 tokio::time::sleep(Duration::from_millis(500)).await;
71 Ok("Should not reach here".to_string())
72 } else {
73 tokio::time::sleep(Duration::from_millis(50)).await;
75 let elapsed = start.elapsed().as_millis();
76 println!(
77 " [+{:>4}ms] Attempt {}: ✅ Completed quickly!",
78 elapsed, attempt
79 );
80 Ok("Success on attempt 3".to_string())
81 }
82 }
83 })
84 .await
85 };
86
87 let total_time = start.elapsed().as_millis();
88 println!("\n Result: {:?}", result.unwrap());
89 println!(" Total time: {}ms", total_time);
90 println!(" Total attempts: {}", attempt_count.load(Ordering::SeqCst));
91 println!("\n 💡 Each attempt had its own timeout; operation succeeded on 3rd try");
92}
93
94async fn pattern2_circuit_retry() {
95 println!("📌 Pattern 2: CircuitBreaker wrapping Retry");
96 println!(" Wrap::new(CircuitBreaker(3, 1s), Retry(2, 50ms))");
97 println!(" → Retries happen inside the circuit breaker\n");
98
99 let policy = Wrap::new(
100 CircuitBreaker::new(3, Duration::from_secs(1)),
101 RetryPolicy::fixed(2, Duration::from_millis(50)),
102 );
103
104 let failure_count = Arc::new(AtomicUsize::new(0));
105 let start = Instant::now();
106
107 println!(" Phase 1: Failing calls to open circuit");
109 for i in 1..=4 {
110 let fc = Arc::clone(&failure_count);
111 let s = start;
112 let result: Result<String, DoOverError<String>> = policy
113 .execute(|| {
114 let count = Arc::clone(&fc);
115 async move {
116 count.fetch_add(1, Ordering::SeqCst);
117 let elapsed = s.elapsed().as_millis();
118 println!(
119 " [+{:>4}ms] Call {}: Inner operation failing...",
120 elapsed, i
121 );
122 Err(DoOverError::Inner(format!("Service error")))
123 }
124 })
125 .await;
126
127 let elapsed = start.elapsed().as_millis();
128 match &result {
129 Err(DoOverError::Inner(_)) => {
130 println!(
131 " [+{:>4}ms] Call {}: ❌ Failed after retries",
132 elapsed, i
133 );
134 }
135 Err(DoOverError::CircuitOpen) => {
136 println!(
137 " [+{:>4}ms] Call {}: 🚫 Circuit is OPEN",
138 elapsed, i
139 );
140 }
141 _ => {}
142 }
143 }
144
145 let inner_calls = failure_count.load(Ordering::SeqCst);
146 println!("\n Summary:");
147 println!(" - Inner operation calls: {} (includes retries)", inner_calls);
148 println!(" - Circuit opened after threshold reached");
149 println!("\n 💡 Retries happened inside circuit breaker; failures accumulated to open circuit");
150}
151
152async fn pattern3_full_stack() {
153 println!("📌 Pattern 3: Full Resilience Stack");
154 println!(" Bulkhead(2) → Retry(2, 100ms) → Timeout(500ms)");
155 println!(" Recommended ordering for production systems\n");
156
157 let policy = Arc::new(Wrap::new(
160 Bulkhead::new(2),
161 Wrap::new(
162 RetryPolicy::fixed(2, Duration::from_millis(100)),
163 TimeoutPolicy::new(Duration::from_millis(500)),
164 ),
165 ));
166
167 let call_count = Arc::new(AtomicUsize::new(0));
168 let start = Instant::now();
169
170 println!(" Launching 4 concurrent requests (bulkhead limit is 2)...\n");
171
172 let mut handles = vec![];
173 for i in 1..=4 {
174 let p = Arc::clone(&policy);
175 let cc = Arc::clone(&call_count);
176 let s = start;
177
178 let handle = tokio::spawn(async move {
179 let result: Result<String, DoOverError<String>> = p
180 .execute(|| {
181 let count = Arc::clone(&cc);
182 async move {
183 let call = count.fetch_add(1, Ordering::SeqCst) + 1;
184 let elapsed = s.elapsed().as_millis();
185 println!(
186 " [+{:>4}ms] Request {}, call {}: Processing...",
187 elapsed, i, call
188 );
189 tokio::time::sleep(Duration::from_millis(200)).await;
190 let elapsed = s.elapsed().as_millis();
191 println!(
192 " [+{:>4}ms] Request {}, call {}: ✅ Done",
193 elapsed, i, call
194 );
195 Ok(format!("Response {}", i))
196 }
197 })
198 .await;
199
200 let elapsed = s.elapsed().as_millis();
201 match &result {
202 Ok(msg) => println!(" [+{:>4}ms] Request {}: ✅ {}", elapsed, i, msg),
203 Err(DoOverError::BulkheadFull) => {
204 println!(" [+{:>4}ms] Request {}: 🚫 Rejected by bulkhead", elapsed, i)
205 }
206 Err(e) => println!(" [+{:>4}ms] Request {}: ❌ {:?}", elapsed, i, e),
207 }
208 });
209 handles.push(handle);
210
211 tokio::time::sleep(Duration::from_millis(10)).await;
212 }
213
214 for handle in handles {
215 handle.await.unwrap();
216 }
217
218 let total_time = start.elapsed().as_millis();
219 let total_calls = call_count.load(Ordering::SeqCst);
220
221 println!("\n Summary:");
222 println!(" - Total inner operation calls: {}", total_calls);
223 println!(" - Total time: {}ms", total_time);
224 println!("\n Policy execution order:");
225 println!(" 1. Bulkhead: Limits to 2 concurrent executions");
226 println!(" 2. Retry: Retries transient failures");
227 println!(" 3. Timeout: Each attempt bounded to 500ms");
228}