Skip to main content

bulkhead/
bulkhead.rs

1//! Bulkhead Policy Example
2//!
3//! This example demonstrates the Bulkhead policy for concurrency limiting.
4//! It shows how concurrent operations are limited and excess requests are rejected.
5
6use do_over::{bulkhead::Bulkhead, error::DoOverError, policy::Policy};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11#[tokio::main]
12async fn main() {
13    println!("=== Do-Over Bulkhead Policy Example ===\n");
14
15    // Scenario 1: Basic concurrency limiting
16    basic_concurrency_example().await;
17
18    println!("\n{}\n", "─".repeat(60));
19
20    // Scenario 2: With queue timeout
21    queue_timeout_example().await;
22}
23
24async fn basic_concurrency_example() {
25    println!("📌 Scenario 1: Basic Concurrency Limiting");
26    println!("   Configuration: max_concurrent=2, no queue");
27    println!("   Launching 5 concurrent requests...\n");
28
29    let bulkhead = Arc::new(Bulkhead::new(2));
30    let start = Instant::now();
31    let completed = Arc::new(AtomicUsize::new(0));
32    let rejected = Arc::new(AtomicUsize::new(0));
33
34    let mut handles = vec![];
35
36    for i in 1..=5 {
37        let bh = Arc::clone(&bulkhead);
38        let comp = Arc::clone(&completed);
39        let rej = Arc::clone(&rejected);
40        let s = start;
41
42        let handle = tokio::spawn(async move {
43            let result: Result<String, DoOverError<String>> = bh
44                .execute(|| async {
45                    let elapsed = s.elapsed().as_millis();
46                    println!(
47                        "   [+{:>4}ms] Request {}: 🔓 Acquired slot, processing...",
48                        elapsed, i
49                    );
50                    tokio::time::sleep(Duration::from_millis(200)).await;
51                    let elapsed = s.elapsed().as_millis();
52                    println!(
53                        "   [+{:>4}ms] Request {}: ✅ Completed, releasing slot",
54                        elapsed, i
55                    );
56                    Ok(format!("Request {} done", i))
57                })
58                .await;
59
60            match result {
61                Ok(_) => {
62                    comp.fetch_add(1, Ordering::SeqCst);
63                }
64                Err(DoOverError::BulkheadFull) => {
65                    let elapsed = s.elapsed().as_millis();
66                    println!(
67                        "   [+{:>4}ms] Request {}: 🚫 Rejected (BulkheadFull)",
68                        elapsed, i
69                    );
70                    rej.fetch_add(1, Ordering::SeqCst);
71                }
72                Err(e) => println!("   Request {}: Error - {:?}", i, e),
73            }
74        });
75        handles.push(handle);
76
77        // Small delay to make output more readable
78        tokio::time::sleep(Duration::from_millis(10)).await;
79    }
80
81    for handle in handles {
82        handle.await.unwrap();
83    }
84
85    let elapsed = start.elapsed().as_millis();
86    println!("\n   Summary:");
87    println!("   - Completed: {}", completed.load(Ordering::SeqCst));
88    println!("   - Rejected:  {}", rejected.load(Ordering::SeqCst));
89    println!("   - Total time: {}ms", elapsed);
90    println!("\n   💡 Only 2 requests could run concurrently, 3 were rejected immediately");
91}
92
93async fn queue_timeout_example() {
94    println!("📌 Scenario 2: With Queue Timeout");
95    println!("   Configuration: max_concurrent=2, queue_timeout=150ms");
96    println!("   Launching 5 concurrent requests...\n");
97
98    let bulkhead = Arc::new(Bulkhead::new(2).with_queue_timeout(Duration::from_millis(150)));
99    let start = Instant::now();
100    let completed = Arc::new(AtomicUsize::new(0));
101    let rejected = Arc::new(AtomicUsize::new(0));
102    let queued = Arc::new(AtomicUsize::new(0));
103
104    let mut handles = vec![];
105
106    for i in 1..=5 {
107        let bh = Arc::clone(&bulkhead);
108        let comp = Arc::clone(&completed);
109        let rej = Arc::clone(&rejected);
110        let q = Arc::clone(&queued);
111        let s = start;
112
113        let handle = tokio::spawn(async move {
114            let request_start = s.elapsed().as_millis();
115            let result: Result<String, DoOverError<String>> = bh
116                .execute(|| async {
117                    let elapsed = s.elapsed().as_millis();
118                    let wait_time = elapsed - request_start;
119                    if wait_time > 10 {
120                        println!(
121                            "   [+{:>4}ms] Request {}: ⏳ Waited {}ms in queue, now processing",
122                            elapsed, i, wait_time
123                        );
124                        q.fetch_add(1, Ordering::SeqCst);
125                    } else {
126                        println!(
127                            "   [+{:>4}ms] Request {}: 🔓 Acquired slot immediately",
128                            elapsed, i
129                        );
130                    }
131                    tokio::time::sleep(Duration::from_millis(100)).await;
132                    let elapsed = s.elapsed().as_millis();
133                    println!(
134                        "   [+{:>4}ms] Request {}: ✅ Completed",
135                        elapsed, i
136                    );
137                    Ok(format!("Request {} done", i))
138                })
139                .await;
140
141            match result {
142                Ok(_) => {
143                    comp.fetch_add(1, Ordering::SeqCst);
144                }
145                Err(DoOverError::BulkheadFull) => {
146                    let elapsed = s.elapsed().as_millis();
147                    println!(
148                        "   [+{:>4}ms] Request {}: 🚫 Queue timeout expired",
149                        elapsed, i
150                    );
151                    rej.fetch_add(1, Ordering::SeqCst);
152                }
153                Err(e) => println!("   Request {}: Error - {:?}", i, e),
154            }
155        });
156        handles.push(handle);
157
158        // Small delay between launching requests
159        tokio::time::sleep(Duration::from_millis(20)).await;
160    }
161
162    for handle in handles {
163        handle.await.unwrap();
164    }
165
166    let elapsed = start.elapsed().as_millis();
167    println!("\n   Summary:");
168    println!("   - Completed:     {}", completed.load(Ordering::SeqCst));
169    println!("   - Queued/Waited: {}", queued.load(Ordering::SeqCst));
170    println!("   - Rejected:      {}", rejected.load(Ordering::SeqCst));
171    println!("   - Total time:    {}ms", elapsed);
172    println!("\n   💡 Requests waited in queue up to 150ms before being rejected");
173}