1use do_over::{bulkhead::Bulkhead, error::DoOverError, policy::Policy};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11#[tokio::main]
12async fn main() {
13 println!("=== Do-Over Bulkhead Policy Example ===\n");
14
15 basic_concurrency_example().await;
17
18 println!("\n{}\n", "─".repeat(60));
19
20 queue_timeout_example().await;
22}
23
24async fn basic_concurrency_example() {
25 println!("📌 Scenario 1: Basic Concurrency Limiting");
26 println!(" Configuration: max_concurrent=2, no queue");
27 println!(" Launching 5 concurrent requests...\n");
28
29 let bulkhead = Arc::new(Bulkhead::new(2));
30 let start = Instant::now();
31 let completed = Arc::new(AtomicUsize::new(0));
32 let rejected = Arc::new(AtomicUsize::new(0));
33
34 let mut handles = vec![];
35
36 for i in 1..=5 {
37 let bh = Arc::clone(&bulkhead);
38 let comp = Arc::clone(&completed);
39 let rej = Arc::clone(&rejected);
40 let s = start;
41
42 let handle = tokio::spawn(async move {
43 let result: Result<String, DoOverError<String>> = bh
44 .execute(|| async {
45 let elapsed = s.elapsed().as_millis();
46 println!(
47 " [+{:>4}ms] Request {}: 🔓 Acquired slot, processing...",
48 elapsed, i
49 );
50 tokio::time::sleep(Duration::from_millis(200)).await;
51 let elapsed = s.elapsed().as_millis();
52 println!(
53 " [+{:>4}ms] Request {}: ✅ Completed, releasing slot",
54 elapsed, i
55 );
56 Ok(format!("Request {} done", i))
57 })
58 .await;
59
60 match result {
61 Ok(_) => {
62 comp.fetch_add(1, Ordering::SeqCst);
63 }
64 Err(DoOverError::BulkheadFull) => {
65 let elapsed = s.elapsed().as_millis();
66 println!(
67 " [+{:>4}ms] Request {}: 🚫 Rejected (BulkheadFull)",
68 elapsed, i
69 );
70 rej.fetch_add(1, Ordering::SeqCst);
71 }
72 Err(e) => println!(" Request {}: Error - {:?}", i, e),
73 }
74 });
75 handles.push(handle);
76
77 tokio::time::sleep(Duration::from_millis(10)).await;
79 }
80
81 for handle in handles {
82 handle.await.unwrap();
83 }
84
85 let elapsed = start.elapsed().as_millis();
86 println!("\n Summary:");
87 println!(" - Completed: {}", completed.load(Ordering::SeqCst));
88 println!(" - Rejected: {}", rejected.load(Ordering::SeqCst));
89 println!(" - Total time: {}ms", elapsed);
90 println!("\n 💡 Only 2 requests could run concurrently, 3 were rejected immediately");
91}
92
93async fn queue_timeout_example() {
94 println!("📌 Scenario 2: With Queue Timeout");
95 println!(" Configuration: max_concurrent=2, queue_timeout=150ms");
96 println!(" Launching 5 concurrent requests...\n");
97
98 let bulkhead = Arc::new(Bulkhead::new(2).with_queue_timeout(Duration::from_millis(150)));
99 let start = Instant::now();
100 let completed = Arc::new(AtomicUsize::new(0));
101 let rejected = Arc::new(AtomicUsize::new(0));
102 let queued = Arc::new(AtomicUsize::new(0));
103
104 let mut handles = vec![];
105
106 for i in 1..=5 {
107 let bh = Arc::clone(&bulkhead);
108 let comp = Arc::clone(&completed);
109 let rej = Arc::clone(&rejected);
110 let q = Arc::clone(&queued);
111 let s = start;
112
113 let handle = tokio::spawn(async move {
114 let request_start = s.elapsed().as_millis();
115 let result: Result<String, DoOverError<String>> = bh
116 .execute(|| async {
117 let elapsed = s.elapsed().as_millis();
118 let wait_time = elapsed - request_start;
119 if wait_time > 10 {
120 println!(
121 " [+{:>4}ms] Request {}: ⏳ Waited {}ms in queue, now processing",
122 elapsed, i, wait_time
123 );
124 q.fetch_add(1, Ordering::SeqCst);
125 } else {
126 println!(
127 " [+{:>4}ms] Request {}: 🔓 Acquired slot immediately",
128 elapsed, i
129 );
130 }
131 tokio::time::sleep(Duration::from_millis(100)).await;
132 let elapsed = s.elapsed().as_millis();
133 println!(
134 " [+{:>4}ms] Request {}: ✅ Completed",
135 elapsed, i
136 );
137 Ok(format!("Request {} done", i))
138 })
139 .await;
140
141 match result {
142 Ok(_) => {
143 comp.fetch_add(1, Ordering::SeqCst);
144 }
145 Err(DoOverError::BulkheadFull) => {
146 let elapsed = s.elapsed().as_millis();
147 println!(
148 " [+{:>4}ms] Request {}: 🚫 Queue timeout expired",
149 elapsed, i
150 );
151 rej.fetch_add(1, Ordering::SeqCst);
152 }
153 Err(e) => println!(" Request {}: Error - {:?}", i, e),
154 }
155 });
156 handles.push(handle);
157
158 tokio::time::sleep(Duration::from_millis(20)).await;
160 }
161
162 for handle in handles {
163 handle.await.unwrap();
164 }
165
166 let elapsed = start.elapsed().as_millis();
167 println!("\n Summary:");
168 println!(" - Completed: {}", completed.load(Ordering::SeqCst));
169 println!(" - Queued/Waited: {}", queued.load(Ordering::SeqCst));
170 println!(" - Rejected: {}", rejected.load(Ordering::SeqCst));
171 println!(" - Total time: {}ms", elapsed);
172 println!("\n 💡 Requests waited in queue up to 150ms before being rejected");
173}