pub struct Bulkhead { /* private fields */ }

Expand description
A policy that limits concurrent executions.
The bulkhead uses a semaphore to control how many operations can run simultaneously. When the limit is reached, new requests are either rejected immediately or queued (if a queue timeout is configured).
§Examples
use do_over::{policy::Policy, bulkhead::Bulkhead, error::DoOverError};
use std::time::Duration;
// Basic bulkhead - reject immediately when full
let bulkhead = Bulkhead::new(5);
// With queue timeout - wait for a slot
let bulkhead = Bulkhead::new(5)
    .with_queue_timeout(Duration::from_millis(500));

Implementations§
Source§impl Bulkhead
impl Bulkhead
Source
pub fn new(max_concurrent: usize) -> Self
pub fn new(max_concurrent: usize) -> Self
Create a new bulkhead with the specified concurrency limit.
Without a queue timeout, requests are rejected immediately when no slots are available.
§Arguments
max_concurrent - Maximum number of concurrent executions
§Examples
use do_over::bulkhead::Bulkhead;
// Allow up to 10 concurrent operations
let bulkhead = Bulkhead::new(10);

Examples found in repository?
examples/bulkhead.rs (line 29)
24async fn basic_concurrency_example() {
25 println!("📌 Scenario 1: Basic Concurrency Limiting");
26 println!(" Configuration: max_concurrent=2, no queue");
27 println!(" Launching 5 concurrent requests...\n");
28
29 let bulkhead = Arc::new(Bulkhead::new(2));
30 let start = Instant::now();
31 let completed = Arc::new(AtomicUsize::new(0));
32 let rejected = Arc::new(AtomicUsize::new(0));
33
34 let mut handles = vec![];
35
36 for i in 1..=5 {
37 let bh = Arc::clone(&bulkhead);
38 let comp = Arc::clone(&completed);
39 let rej = Arc::clone(&rejected);
40 let s = start;
41
42 let handle = tokio::spawn(async move {
43 let result: Result<String, DoOverError<String>> = bh
44 .execute(|| async {
45 let elapsed = s.elapsed().as_millis();
46 println!(
47 " [+{:>4}ms] Request {}: 🔓 Acquired slot, processing...",
48 elapsed, i
49 );
50 tokio::time::sleep(Duration::from_millis(200)).await;
51 let elapsed = s.elapsed().as_millis();
52 println!(
53 " [+{:>4}ms] Request {}: ✅ Completed, releasing slot",
54 elapsed, i
55 );
56 Ok(format!("Request {} done", i))
57 })
58 .await;
59
60 match result {
61 Ok(_) => {
62 comp.fetch_add(1, Ordering::SeqCst);
63 }
64 Err(DoOverError::BulkheadFull) => {
65 let elapsed = s.elapsed().as_millis();
66 println!(
67 " [+{:>4}ms] Request {}: 🚫 Rejected (BulkheadFull)",
68 elapsed, i
69 );
70 rej.fetch_add(1, Ordering::SeqCst);
71 }
72 Err(e) => println!(" Request {}: Error - {:?}", i, e),
73 }
74 });
75 handles.push(handle);
76
77 // Small delay to make output more readable
78 tokio::time::sleep(Duration::from_millis(10)).await;
79 }
80
81 for handle in handles {
82 handle.await.unwrap();
83 }
84
85 let elapsed = start.elapsed().as_millis();
86 println!("\n Summary:");
87 println!(" - Completed: {}", completed.load(Ordering::SeqCst));
88 println!(" - Rejected: {}", rejected.load(Ordering::SeqCst));
89 println!(" - Total time: {}ms", elapsed);
90 println!("\n 💡 Only 2 requests could run concurrently, 3 were rejected immediately");
91}
92
93async fn queue_timeout_example() {
94 println!("📌 Scenario 2: With Queue Timeout");
95 println!(" Configuration: max_concurrent=2, queue_timeout=150ms");
96 println!(" Launching 5 concurrent requests...\n");
97
98 let bulkhead = Arc::new(Bulkhead::new(2).with_queue_timeout(Duration::from_millis(150)));
99 let start = Instant::now();
100 let completed = Arc::new(AtomicUsize::new(0));
101 let rejected = Arc::new(AtomicUsize::new(0));
102 let queued = Arc::new(AtomicUsize::new(0));
103
104 let mut handles = vec![];
105
106 for i in 1..=5 {
107 let bh = Arc::clone(&bulkhead);
108 let comp = Arc::clone(&completed);
109 let rej = Arc::clone(&rejected);
110 let q = Arc::clone(&queued);
111 let s = start;
112
113 let handle = tokio::spawn(async move {
114 let request_start = s.elapsed().as_millis();
115 let result: Result<String, DoOverError<String>> = bh
116 .execute(|| async {
117 let elapsed = s.elapsed().as_millis();
118 let wait_time = elapsed - request_start;
119 if wait_time > 10 {
120 println!(
121 " [+{:>4}ms] Request {}: ⏳ Waited {}ms in queue, now processing",
122 elapsed, i, wait_time
123 );
124 q.fetch_add(1, Ordering::SeqCst);
125 } else {
126 println!(
127 " [+{:>4}ms] Request {}: 🔓 Acquired slot immediately",
128 elapsed, i
129 );
130 }
131 tokio::time::sleep(Duration::from_millis(100)).await;
132 let elapsed = s.elapsed().as_millis();
133 println!(
134 " [+{:>4}ms] Request {}: ✅ Completed",
135 elapsed, i
136 );
137 Ok(format!("Request {} done", i))
138 })
139 .await;
140
141 match result {
142 Ok(_) => {
143 comp.fetch_add(1, Ordering::SeqCst);
144 }
145 Err(DoOverError::BulkheadFull) => {
146 let elapsed = s.elapsed().as_millis();
147 println!(
148 " [+{:>4}ms] Request {}: 🚫 Queue timeout expired",
149 elapsed, i
150 );
151 rej.fetch_add(1, Ordering::SeqCst);
152 }
153 Err(e) => println!(" Request {}: Error - {:?}", i, e),
154 }
155 });
156 handles.push(handle);
157
158 // Small delay between launching requests
159 tokio::time::sleep(Duration::from_millis(20)).await;
160 }
161
162 for handle in handles {
163 handle.await.unwrap();
164 }
165
166 let elapsed = start.elapsed().as_millis();
167 println!("\n Summary:");
168 println!(" - Completed: {}", completed.load(Ordering::SeqCst));
169 println!(" - Queued/Waited: {}", queued.load(Ordering::SeqCst));
170 println!(" - Rejected: {}", rejected.load(Ordering::SeqCst));
171 println!(" - Total time: {}ms", elapsed);
172 println!("\n 💡 Requests waited in queue up to 150ms before being rejected");
173}

More examples
examples/composition.rs (line 160)
152async fn pattern3_full_stack() {
153 println!("📌 Pattern 3: Full Resilience Stack");
154 println!(" Bulkhead(2) → Retry(2, 100ms) → Timeout(500ms)");
155 println!(" Recommended ordering for production systems\n");
156
157 // Note: Using Bulkhead + Retry + Timeout for this example to avoid
158 // CircuitBreaker clone issues in concurrent async context
159 let policy = Arc::new(Wrap::new(
160 Bulkhead::new(2),
161 Wrap::new(
162 RetryPolicy::fixed(2, Duration::from_millis(100)),
163 TimeoutPolicy::new(Duration::from_millis(500)),
164 ),
165 ));
166
167 let call_count = Arc::new(AtomicUsize::new(0));
168 let start = Instant::now();
169
170 println!(" Launching 4 concurrent requests (bulkhead limit is 2)...\n");
171
172 let mut handles = vec![];
173 for i in 1..=4 {
174 let p = Arc::clone(&policy);
175 let cc = Arc::clone(&call_count);
176 let s = start;
177
178 let handle = tokio::spawn(async move {
179 let result: Result<String, DoOverError<String>> = p
180 .execute(|| {
181 let count = Arc::clone(&cc);
182 async move {
183 let call = count.fetch_add(1, Ordering::SeqCst) + 1;
184 let elapsed = s.elapsed().as_millis();
185 println!(
186 " [+{:>4}ms] Request {}, call {}: Processing...",
187 elapsed, i, call
188 );
189 tokio::time::sleep(Duration::from_millis(200)).await;
190 let elapsed = s.elapsed().as_millis();
191 println!(
192 " [+{:>4}ms] Request {}, call {}: ✅ Done",
193 elapsed, i, call
194 );
195 Ok(format!("Response {}", i))
196 }
197 })
198 .await;
199
200 let elapsed = s.elapsed().as_millis();
201 match &result {
202 Ok(msg) => println!(" [+{:>4}ms] Request {}: ✅ {}", elapsed, i, msg),
203 Err(DoOverError::BulkheadFull) => {
204 println!(" [+{:>4}ms] Request {}: 🚫 Rejected by bulkhead", elapsed, i)
205 }
206 Err(e) => println!(" [+{:>4}ms] Request {}: ❌ {:?}", elapsed, i, e),
207 }
208 });
209 handles.push(handle);
210
211 tokio::time::sleep(Duration::from_millis(10)).await;
212 }
213
214 for handle in handles {
215 handle.await.unwrap();
216 }
217
218 let total_time = start.elapsed().as_millis();
219 let total_calls = call_count.load(Ordering::SeqCst);
220
221 println!("\n Summary:");
222 println!(" - Total inner operation calls: {}", total_calls);
223 println!(" - Total time: {}ms", total_time);
224 println!("\n Policy execution order:");
225 println!(" 1. Bulkhead: Limits to 2 concurrent executions");
226 println!(" 2. Retry: Retries transient failures");
227 println!(" 3. Timeout: Each attempt bounded to 500ms");
228}

Source
pub fn with_queue_timeout(self, timeout: Duration) -> Self
pub fn with_queue_timeout(self, timeout: Duration) -> Self
Set a queue timeout for waiting on a slot.
When set, requests will wait up to the specified duration for a slot to become available before being rejected.
§Arguments
timeout - Maximum time to wait for a slot
§Examples
use do_over::bulkhead::Bulkhead;
use std::time::Duration;
let bulkhead = Bulkhead::new(10)
    .with_queue_timeout(Duration::from_secs(1));

Examples found in repository?
examples/bulkhead.rs (line 98)
93async fn queue_timeout_example() {
94 println!("📌 Scenario 2: With Queue Timeout");
95 println!(" Configuration: max_concurrent=2, queue_timeout=150ms");
96 println!(" Launching 5 concurrent requests...\n");
97
98 let bulkhead = Arc::new(Bulkhead::new(2).with_queue_timeout(Duration::from_millis(150)));
99 let start = Instant::now();
100 let completed = Arc::new(AtomicUsize::new(0));
101 let rejected = Arc::new(AtomicUsize::new(0));
102 let queued = Arc::new(AtomicUsize::new(0));
103
104 let mut handles = vec![];
105
106 for i in 1..=5 {
107 let bh = Arc::clone(&bulkhead);
108 let comp = Arc::clone(&completed);
109 let rej = Arc::clone(&rejected);
110 let q = Arc::clone(&queued);
111 let s = start;
112
113 let handle = tokio::spawn(async move {
114 let request_start = s.elapsed().as_millis();
115 let result: Result<String, DoOverError<String>> = bh
116 .execute(|| async {
117 let elapsed = s.elapsed().as_millis();
118 let wait_time = elapsed - request_start;
119 if wait_time > 10 {
120 println!(
121 " [+{:>4}ms] Request {}: ⏳ Waited {}ms in queue, now processing",
122 elapsed, i, wait_time
123 );
124 q.fetch_add(1, Ordering::SeqCst);
125 } else {
126 println!(
127 " [+{:>4}ms] Request {}: 🔓 Acquired slot immediately",
128 elapsed, i
129 );
130 }
131 tokio::time::sleep(Duration::from_millis(100)).await;
132 let elapsed = s.elapsed().as_millis();
133 println!(
134 " [+{:>4}ms] Request {}: ✅ Completed",
135 elapsed, i
136 );
137 Ok(format!("Request {} done", i))
138 })
139 .await;
140
141 match result {
142 Ok(_) => {
143 comp.fetch_add(1, Ordering::SeqCst);
144 }
145 Err(DoOverError::BulkheadFull) => {
146 let elapsed = s.elapsed().as_millis();
147 println!(
148 " [+{:>4}ms] Request {}: 🚫 Queue timeout expired",
149 elapsed, i
150 );
151 rej.fetch_add(1, Ordering::SeqCst);
152 }
153 Err(e) => println!(" Request {}: Error - {:?}", i, e),
154 }
155 });
156 handles.push(handle);
157
158 // Small delay between launching requests
159 tokio::time::sleep(Duration::from_millis(20)).await;
160 }
161
162 for handle in handles {
163 handle.await.unwrap();
164 }
165
166 let elapsed = start.elapsed().as_millis();
167 println!("\n Summary:");
168 println!(" - Completed: {}", completed.load(Ordering::SeqCst));
169 println!(" - Queued/Waited: {}", queued.load(Ordering::SeqCst));
170 println!(" - Rejected: {}", rejected.load(Ordering::SeqCst));
171 println!(" - Total time: {}ms", elapsed);
172 println!("\n 💡 Requests waited in queue up to 150ms before being rejected");
173}

Trait Implementations§
Auto Trait Implementations§
impl Freeze for Bulkhead
impl RefUnwindSafe for Bulkhead
impl Send for Bulkhead
impl Sync for Bulkhead
impl Unpin for Bulkhead
impl UnwindSafe for Bulkhead
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more