1use do_over::{
12 circuit_breaker::CircuitBreaker,
13 error::DoOverError,
14 hedge::Hedge,
15 policy::Policy,
16 rate_limit::RateLimiter,
17 retry::RetryPolicy,
18 timeout::TimeoutPolicy,
19 wrap::Wrap,
20};
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
/// Shared flags and call counters for the simulated backend services.
///
/// Every field is an atomic, so the state can be read and updated through a
/// shared reference (the scenarios pass it around as `&Arc<ServiceState>`).
#[derive(Debug, Default)]
struct ServiceState {
    /// Inventory "rate limited" flag; the scenarios only ever store `false` here.
    inventory_rate_limited: AtomicBool,
    /// When `true`, the simulated payment call returns an error.
    payment_failing: AtomicBool,
    /// When `true`, the simulated notification call sleeps longer before succeeding.
    notification_slow: AtomicBool,
    /// Number of times the simulated inventory service body has run.
    inventory_calls: AtomicUsize,
    /// Number of times the simulated payment service body has run.
    payment_calls: AtomicUsize,
    /// Number of times the simulated notification service body has run.
    notification_calls: AtomicUsize,
}

impl ServiceState {
    /// Creates a state with every flag cleared and every counter at zero.
    fn new() -> Self {
        // `AtomicBool::default()` is `false` and `AtomicUsize::default()` is `0`,
        // which is exactly the initial state the scenarios expect.
        Self::default()
    }
}
47
48type InventoryPolicy = Wrap<RateLimiter, Wrap<RetryPolicy, TimeoutPolicy>>;
50type PaymentPolicy = Wrap<CircuitBreaker, Wrap<RetryPolicy, TimeoutPolicy>>;
51type NotificationPolicy = Wrap<Hedge, TimeoutPolicy>;
52
53fn create_inventory_policy() -> InventoryPolicy {
54 Wrap::new(
55 RateLimiter::new(3, Duration::from_secs(1)), Wrap::new(
57 RetryPolicy::fixed(2, Duration::from_millis(100)),
58 TimeoutPolicy::new(Duration::from_millis(500)),
59 ),
60 )
61}
62
63fn create_payment_policy() -> PaymentPolicy {
64 Wrap::new(
65 CircuitBreaker::new(3, Duration::from_secs(5)), Wrap::new(
67 RetryPolicy::fixed(2, Duration::from_millis(200)),
68 TimeoutPolicy::new(Duration::from_secs(1)),
69 ),
70 )
71}
72
73fn create_notification_policy() -> NotificationPolicy {
74 Wrap::new(
75 Hedge::new(Duration::from_millis(100)), TimeoutPolicy::new(Duration::from_millis(500)),
77 )
78}
79
80#[tokio::main]
81async fn main() {
82 println!("=== Do-Over Comprehensive Example: Order Processing System ===\n");
83 println!("Services:");
84 println!(" 📦 Inventory: RateLimiter(3/s) → Retry(2) → Timeout(500ms)");
85 println!(" 💳 Payment: CircuitBreaker(3, 5s) → Retry(2) → Timeout(1s)");
86 println!(" 📧 Notification: Hedge(100ms) → Timeout(500ms)\n");
87
88 let state = Arc::new(ServiceState::new());
89
90 scenario1_normal(&state).await;
92
93 println!("\n{}\n", "═".repeat(70));
94
95 scenario2_rate_limited(&state).await;
97
98 println!("\n{}\n", "═".repeat(70));
99
100 scenario3_payment_failures(&state).await;
102
103 println!("\n{}\n", "═".repeat(70));
104
105 scenario4_slow_notification(&state).await;
107}
108
109async fn scenario1_normal(state: &Arc<ServiceState>) {
110 println!("📌 Scenario 1: Normal Operation");
111 println!(" All services healthy - order should complete successfully\n");
112
113 state.inventory_rate_limited.store(false, Ordering::SeqCst);
115 state.payment_failing.store(false, Ordering::SeqCst);
116 state.notification_slow.store(false, Ordering::SeqCst);
117
118 let start = Instant::now();
119 process_order("ORD-001", state, &start).await;
120
121 println!(
122 "\n Total order processing time: {}ms",
123 start.elapsed().as_millis()
124 );
125}
126
127async fn scenario2_rate_limited(state: &Arc<ServiceState>) {
128 println!("📌 Scenario 2: Inventory Rate Limited");
129 println!(" Burst of orders exceeds rate limit (3/second)\n");
130
131 state.inventory_rate_limited.store(false, Ordering::SeqCst);
133 state.payment_failing.store(false, Ordering::SeqCst);
134 state.notification_slow.store(false, Ordering::SeqCst);
135 state.inventory_calls.store(0, Ordering::SeqCst);
136
137 let inventory_policy = create_inventory_policy();
138 let start = Instant::now();
139
140 println!(" Sending 5 rapid inventory check requests...\n");
141
142 for i in 1..=5 {
143 let elapsed = start.elapsed().as_millis();
144 let result: Result<String, DoOverError<String>> = inventory_policy
145 .execute(|| async {
146 state.inventory_calls.fetch_add(1, Ordering::SeqCst);
147 tokio::time::sleep(Duration::from_millis(50)).await;
148 Ok("Stock available".to_string())
149 })
150 .await;
151
152 match &result {
153 Ok(msg) => println!(" [+{:>4}ms] Order {}: ✅ {}", elapsed, i, msg),
154 Err(DoOverError::BulkheadFull) => {
155 println!(" [+{:>4}ms] Order {}: 🚫 Rate limited - try again later", elapsed, i)
156 }
157 Err(e) => println!(" [+{:>4}ms] Order {}: ❌ {:?}", elapsed, i, e),
158 }
159 }
160
161 println!(
162 "\n Inventory service calls made: {}",
163 state.inventory_calls.load(Ordering::SeqCst)
164 );
165 println!(" 💡 Rate limiter protected the inventory service from overload");
166}
167
168async fn scenario3_payment_failures(state: &Arc<ServiceState>) {
169 println!("📌 Scenario 3: Payment Service Failures");
170 println!(" Payment service is failing - circuit will open after 3 failures\n");
171
172 state.payment_failing.store(true, Ordering::SeqCst);
174 state.payment_calls.store(0, Ordering::SeqCst);
175
176 let payment_policy = create_payment_policy();
177 let start = Instant::now();
178
179 for i in 1..=5 {
180 let elapsed = start.elapsed().as_millis();
181 let failing = state.payment_failing.load(Ordering::SeqCst);
182
183 let result: Result<String, DoOverError<String>> = payment_policy
184 .execute(|| async {
185 state.payment_calls.fetch_add(1, Ordering::SeqCst);
186 tokio::time::sleep(Duration::from_millis(50)).await;
187 if failing {
188 Err(DoOverError::Inner("Payment gateway unavailable".to_string()))
189 } else {
190 Ok("Payment processed".to_string())
191 }
192 })
193 .await;
194
195 match &result {
196 Ok(msg) => println!(" [+{:>4}ms] Payment {}: ✅ {}", elapsed, i, msg),
197 Err(DoOverError::CircuitOpen) => {
198 println!(" [+{:>4}ms] Payment {}: 🚫 Circuit OPEN - failing fast", elapsed, i)
199 }
200 Err(DoOverError::Inner(e)) => {
201 println!(" [+{:>4}ms] Payment {}: ❌ {} (after retries)", elapsed, i, e)
202 }
203 Err(e) => println!(" [+{:>4}ms] Payment {}: ❌ {:?}", elapsed, i, e),
204 }
205
206 tokio::time::sleep(Duration::from_millis(100)).await;
207 }
208
209 println!(
210 "\n Payment service calls made: {} (includes retries)",
211 state.payment_calls.load(Ordering::SeqCst)
212 );
213 println!(" 💡 Circuit breaker opened after repeated failures, preventing further calls");
214
215 println!("\n Waiting for circuit reset (5s)...");
217 tokio::time::sleep(Duration::from_secs(5)).await;
218 state.payment_failing.store(false, Ordering::SeqCst);
219 println!(" → Payment service recovered, circuit will test on next request");
220
221 let elapsed = start.elapsed().as_millis();
222 let result: Result<String, DoOverError<String>> = payment_policy
223 .execute(|| async {
224 state.payment_calls.fetch_add(1, Ordering::SeqCst);
225 tokio::time::sleep(Duration::from_millis(50)).await;
226 Ok("Payment processed".to_string())
227 })
228 .await;
229
230 match &result {
231 Ok(msg) => println!(" [+{:>4}ms] Recovery test: ✅ {} - Circuit CLOSED", elapsed, msg),
232 Err(e) => println!(" [+{:>4}ms] Recovery test: {:?}", elapsed, e),
233 }
234}
235
236async fn scenario4_slow_notification(state: &Arc<ServiceState>) {
237 println!("📌 Scenario 4: Slow Notification Service");
238 println!(" Notification is slow - hedge request will win\n");
239
240 state.notification_slow.store(true, Ordering::SeqCst);
242 state.notification_calls.store(0, Ordering::SeqCst);
243
244 let notification_policy = create_notification_policy();
245 let start = Instant::now();
246
247 let slow = state.notification_slow.load(Ordering::SeqCst);
248 let calls = Arc::new(AtomicUsize::new(0));
249
250 let result: Result<String, DoOverError<String>> = {
251 let c = Arc::clone(&calls);
252 notification_policy
253 .execute(|| {
254 let count = Arc::clone(&c);
255 async move {
256 let call_num = count.fetch_add(1, Ordering::SeqCst) + 1;
257 let elapsed = start.elapsed().as_millis();
258 let is_primary = call_num == 1;
259
260 println!(
261 " [+{:>4}ms] Notification {} ({}) started",
262 elapsed,
263 call_num,
264 if is_primary { "primary" } else { "hedge" }
265 );
266
267 let delay = if is_primary && slow { 400 } else { 50 };
269 tokio::time::sleep(Duration::from_millis(delay)).await;
270
271 let elapsed = start.elapsed().as_millis();
272 println!(
273 " [+{:>4}ms] Notification {} ({}) completed",
274 elapsed,
275 call_num,
276 if is_primary { "primary" } else { "hedge" }
277 );
278
279 Ok(format!(
280 "Email sent via {}",
281 if is_primary { "primary" } else { "hedge" }
282 ))
283 }
284 })
285 .await
286 };
287
288 let elapsed = start.elapsed().as_millis();
289 let total_calls = calls.load(Ordering::SeqCst);
290
291 match &result {
292 Ok(msg) => println!("\n [+{:>4}ms] Result: ✅ {}", elapsed, msg),
293 Err(e) => println!("\n [+{:>4}ms] Result: {:?}", elapsed, e),
294 }
295
296 println!(" Notification calls made: {}", total_calls);
297 println!(" 💡 Hedge request completed first, reducing user-perceived latency");
298}
299
300async fn process_order(order_id: &str, state: &Arc<ServiceState>, start: &Instant) {
301 println!(" Processing order: {}", order_id);
302
303 let inventory_policy = create_inventory_policy();
304 let payment_policy = create_payment_policy();
305 let notification_policy = create_notification_policy();
306
307 let elapsed = start.elapsed().as_millis();
309 println!(" [+{:>4}ms] Step 1: Checking inventory...", elapsed);
310
311 let inventory_result: Result<String, DoOverError<String>> = inventory_policy
312 .execute(|| async {
313 state.inventory_calls.fetch_add(1, Ordering::SeqCst);
314 tokio::time::sleep(Duration::from_millis(100)).await;
315 Ok("Stock available".to_string())
316 })
317 .await;
318
319 let elapsed = start.elapsed().as_millis();
320 match &inventory_result {
321 Ok(msg) => println!(" [+{:>4}ms] → ✅ {}", elapsed, msg),
322 Err(e) => {
323 println!(" [+{:>4}ms] → ❌ {:?}", elapsed, e);
324 return;
325 }
326 }
327
328 let elapsed = start.elapsed().as_millis();
330 println!(" [+{:>4}ms] Step 2: Processing payment...", elapsed);
331
332 let payment_result: Result<String, DoOverError<String>> = payment_policy
333 .execute(|| async {
334 state.payment_calls.fetch_add(1, Ordering::SeqCst);
335 tokio::time::sleep(Duration::from_millis(150)).await;
336 if state.payment_failing.load(Ordering::SeqCst) {
337 Err(DoOverError::Inner("Payment failed".to_string()))
338 } else {
339 Ok("Payment processed".to_string())
340 }
341 })
342 .await;
343
344 let elapsed = start.elapsed().as_millis();
345 match &payment_result {
346 Ok(msg) => println!(" [+{:>4}ms] → ✅ {}", elapsed, msg),
347 Err(e) => {
348 println!(" [+{:>4}ms] → ❌ {:?}", elapsed, e);
349 return;
350 }
351 }
352
353 let elapsed = start.elapsed().as_millis();
355 println!(" [+{:>4}ms] Step 3: Sending notification...", elapsed);
356
357 let notification_result: Result<String, DoOverError<String>> = notification_policy
358 .execute(|| async {
359 state.notification_calls.fetch_add(1, Ordering::SeqCst);
360 let delay = if state.notification_slow.load(Ordering::SeqCst) {
361 400
362 } else {
363 50
364 };
365 tokio::time::sleep(Duration::from_millis(delay)).await;
366 Ok("Notification sent".to_string())
367 })
368 .await;
369
370 let elapsed = start.elapsed().as_millis();
371 match ¬ification_result {
372 Ok(msg) => println!(" [+{:>4}ms] → ✅ {}", elapsed, msg),
373 Err(e) => println!(" [+{:>4}ms] → ⚠️ {:?} (non-critical)", elapsed, e),
374 }
375
376 let elapsed = start.elapsed().as_millis();
377 println!("\n [+{:>4}ms] ✨ Order {} completed successfully!", elapsed, order_id);
378}