Skip to main content

Bulkhead

Struct Bulkhead 

Source
pub struct Bulkhead { /* private fields */ }
Expand description

A policy that limits concurrent executions.

The bulkhead uses a semaphore to control how many operations can run simultaneously. When the limit is reached, new requests are either rejected immediately or queued (if a queue timeout is configured).

§Examples

use do_over::{policy::Policy, bulkhead::Bulkhead, error::DoOverError};
use std::time::Duration;

// Basic bulkhead - reject immediately when full
let bulkhead = Bulkhead::new(5);

// With queue timeout - wait for a slot
let bulkhead = Bulkhead::new(5)
    .with_queue_timeout(Duration::from_millis(500));

Implementations§

Source§

impl Bulkhead

Source

pub fn new(max_concurrent: usize) -> Self

Create a new bulkhead with the specified concurrency limit.

Without a queue timeout, requests are rejected immediately when no slots are available.

§Arguments
  • max_concurrent - Maximum number of concurrent executions
§Examples
use do_over::bulkhead::Bulkhead;

// Allow up to 10 concurrent operations
let bulkhead = Bulkhead::new(10);
Examples found in repository?
examples/bulkhead.rs (line 29)
24async fn basic_concurrency_example() {
25    println!("📌 Scenario 1: Basic Concurrency Limiting");
26    println!("   Configuration: max_concurrent=2, no queue");
27    println!("   Launching 5 concurrent requests...\n");
28
29    let bulkhead = Arc::new(Bulkhead::new(2));
30    let start = Instant::now();
31    let completed = Arc::new(AtomicUsize::new(0));
32    let rejected = Arc::new(AtomicUsize::new(0));
33
34    let mut handles = vec![];
35
36    for i in 1..=5 {
37        let bh = Arc::clone(&bulkhead);
38        let comp = Arc::clone(&completed);
39        let rej = Arc::clone(&rejected);
40        let s = start;
41
42        let handle = tokio::spawn(async move {
43            let result: Result<String, DoOverError<String>> = bh
44                .execute(|| async {
45                    let elapsed = s.elapsed().as_millis();
46                    println!(
47                        "   [+{:>4}ms] Request {}: 🔓 Acquired slot, processing...",
48                        elapsed, i
49                    );
50                    tokio::time::sleep(Duration::from_millis(200)).await;
51                    let elapsed = s.elapsed().as_millis();
52                    println!(
53                        "   [+{:>4}ms] Request {}: ✅ Completed, releasing slot",
54                        elapsed, i
55                    );
56                    Ok(format!("Request {} done", i))
57                })
58                .await;
59
60            match result {
61                Ok(_) => {
62                    comp.fetch_add(1, Ordering::SeqCst);
63                }
64                Err(DoOverError::BulkheadFull) => {
65                    let elapsed = s.elapsed().as_millis();
66                    println!(
67                        "   [+{:>4}ms] Request {}: 🚫 Rejected (BulkheadFull)",
68                        elapsed, i
69                    );
70                    rej.fetch_add(1, Ordering::SeqCst);
71                }
72                Err(e) => println!("   Request {}: Error - {:?}", i, e),
73            }
74        });
75        handles.push(handle);
76
77        // Small delay to make output more readable
78        tokio::time::sleep(Duration::from_millis(10)).await;
79    }
80
81    for handle in handles {
82        handle.await.unwrap();
83    }
84
85    let elapsed = start.elapsed().as_millis();
86    println!("\n   Summary:");
87    println!("   - Completed: {}", completed.load(Ordering::SeqCst));
88    println!("   - Rejected:  {}", rejected.load(Ordering::SeqCst));
89    println!("   - Total time: {}ms", elapsed);
90    println!("\n   💡 Only 2 requests could run concurrently, 3 were rejected immediately");
91}
92
93async fn queue_timeout_example() {
94    println!("📌 Scenario 2: With Queue Timeout");
95    println!("   Configuration: max_concurrent=2, queue_timeout=150ms");
96    println!("   Launching 5 concurrent requests...\n");
97
98    let bulkhead = Arc::new(Bulkhead::new(2).with_queue_timeout(Duration::from_millis(150)));
99    let start = Instant::now();
100    let completed = Arc::new(AtomicUsize::new(0));
101    let rejected = Arc::new(AtomicUsize::new(0));
102    let queued = Arc::new(AtomicUsize::new(0));
103
104    let mut handles = vec![];
105
106    for i in 1..=5 {
107        let bh = Arc::clone(&bulkhead);
108        let comp = Arc::clone(&completed);
109        let rej = Arc::clone(&rejected);
110        let q = Arc::clone(&queued);
111        let s = start;
112
113        let handle = tokio::spawn(async move {
114            let request_start = s.elapsed().as_millis();
115            let result: Result<String, DoOverError<String>> = bh
116                .execute(|| async {
117                    let elapsed = s.elapsed().as_millis();
118                    let wait_time = elapsed - request_start;
119                    if wait_time > 10 {
120                        println!(
121                            "   [+{:>4}ms] Request {}: ⏳ Waited {}ms in queue, now processing",
122                            elapsed, i, wait_time
123                        );
124                        q.fetch_add(1, Ordering::SeqCst);
125                    } else {
126                        println!(
127                            "   [+{:>4}ms] Request {}: 🔓 Acquired slot immediately",
128                            elapsed, i
129                        );
130                    }
131                    tokio::time::sleep(Duration::from_millis(100)).await;
132                    let elapsed = s.elapsed().as_millis();
133                    println!(
134                        "   [+{:>4}ms] Request {}: ✅ Completed",
135                        elapsed, i
136                    );
137                    Ok(format!("Request {} done", i))
138                })
139                .await;
140
141            match result {
142                Ok(_) => {
143                    comp.fetch_add(1, Ordering::SeqCst);
144                }
145                Err(DoOverError::BulkheadFull) => {
146                    let elapsed = s.elapsed().as_millis();
147                    println!(
148                        "   [+{:>4}ms] Request {}: 🚫 Queue timeout expired",
149                        elapsed, i
150                    );
151                    rej.fetch_add(1, Ordering::SeqCst);
152                }
153                Err(e) => println!("   Request {}: Error - {:?}", i, e),
154            }
155        });
156        handles.push(handle);
157
158        // Small delay between launching requests
159        tokio::time::sleep(Duration::from_millis(20)).await;
160    }
161
162    for handle in handles {
163        handle.await.unwrap();
164    }
165
166    let elapsed = start.elapsed().as_millis();
167    println!("\n   Summary:");
168    println!("   - Completed:     {}", completed.load(Ordering::SeqCst));
169    println!("   - Queued/Waited: {}", queued.load(Ordering::SeqCst));
170    println!("   - Rejected:      {}", rejected.load(Ordering::SeqCst));
171    println!("   - Total time:    {}ms", elapsed);
172    println!("\n   💡 Requests waited in queue up to 150ms before being rejected");
173}
More examples
Hide additional examples
examples/composition.rs (line 160)
152async fn pattern3_full_stack() {
153    println!("📌 Pattern 3: Full Resilience Stack");
154    println!("   Bulkhead(2) → Retry(2, 100ms) → Timeout(500ms)");
155    println!("   Recommended ordering for production systems\n");
156
157    // Note: Using Bulkhead + Retry + Timeout for this example to avoid
158    // CircuitBreaker clone issues in concurrent async context
159    let policy = Arc::new(Wrap::new(
160        Bulkhead::new(2),
161        Wrap::new(
162            RetryPolicy::fixed(2, Duration::from_millis(100)),
163            TimeoutPolicy::new(Duration::from_millis(500)),
164        ),
165    ));
166
167    let call_count = Arc::new(AtomicUsize::new(0));
168    let start = Instant::now();
169
170    println!("   Launching 4 concurrent requests (bulkhead limit is 2)...\n");
171
172    let mut handles = vec![];
173    for i in 1..=4 {
174        let p = Arc::clone(&policy);
175        let cc = Arc::clone(&call_count);
176        let s = start;
177
178        let handle = tokio::spawn(async move {
179            let result: Result<String, DoOverError<String>> = p
180                .execute(|| {
181                    let count = Arc::clone(&cc);
182                    async move {
183                        let call = count.fetch_add(1, Ordering::SeqCst) + 1;
184                        let elapsed = s.elapsed().as_millis();
185                        println!(
186                            "   [+{:>4}ms] Request {}, call {}: Processing...",
187                            elapsed, i, call
188                        );
189                        tokio::time::sleep(Duration::from_millis(200)).await;
190                        let elapsed = s.elapsed().as_millis();
191                        println!(
192                            "   [+{:>4}ms] Request {}, call {}: ✅ Done",
193                            elapsed, i, call
194                        );
195                        Ok(format!("Response {}", i))
196                    }
197                })
198                .await;
199
200            let elapsed = s.elapsed().as_millis();
201            match &result {
202                Ok(msg) => println!("   [+{:>4}ms] Request {}: ✅ {}", elapsed, i, msg),
203                Err(DoOverError::BulkheadFull) => {
204                    println!("   [+{:>4}ms] Request {}: 🚫 Rejected by bulkhead", elapsed, i)
205                }
206                Err(e) => println!("   [+{:>4}ms] Request {}: ❌ {:?}", elapsed, i, e),
207            }
208        });
209        handles.push(handle);
210
211        tokio::time::sleep(Duration::from_millis(10)).await;
212    }
213
214    for handle in handles {
215        handle.await.unwrap();
216    }
217
218    let total_time = start.elapsed().as_millis();
219    let total_calls = call_count.load(Ordering::SeqCst);
220
221    println!("\n   Summary:");
222    println!("   - Total inner operation calls: {}", total_calls);
223    println!("   - Total time: {}ms", total_time);
224    println!("\n   Policy execution order:");
225    println!("   1. Bulkhead: Limits to 2 concurrent executions");
226    println!("   2. Retry: Retries transient failures");
227    println!("   3. Timeout: Each attempt bounded to 500ms");
228}
Source

pub fn with_queue_timeout(self, timeout: Duration) -> Self

Set a queue timeout for waiting on a slot.

When set, requests will wait up to the specified duration for a slot to become available before being rejected.

§Arguments
  • timeout - Maximum time to wait for a slot
§Examples
use do_over::bulkhead::Bulkhead;
use std::time::Duration;

let bulkhead = Bulkhead::new(10)
    .with_queue_timeout(Duration::from_secs(1));
Examples found in repository?
examples/bulkhead.rs (line 98)
93async fn queue_timeout_example() {
94    println!("📌 Scenario 2: With Queue Timeout");
95    println!("   Configuration: max_concurrent=2, queue_timeout=150ms");
96    println!("   Launching 5 concurrent requests...\n");
97
98    let bulkhead = Arc::new(Bulkhead::new(2).with_queue_timeout(Duration::from_millis(150)));
99    let start = Instant::now();
100    let completed = Arc::new(AtomicUsize::new(0));
101    let rejected = Arc::new(AtomicUsize::new(0));
102    let queued = Arc::new(AtomicUsize::new(0));
103
104    let mut handles = vec![];
105
106    for i in 1..=5 {
107        let bh = Arc::clone(&bulkhead);
108        let comp = Arc::clone(&completed);
109        let rej = Arc::clone(&rejected);
110        let q = Arc::clone(&queued);
111        let s = start;
112
113        let handle = tokio::spawn(async move {
114            let request_start = s.elapsed().as_millis();
115            let result: Result<String, DoOverError<String>> = bh
116                .execute(|| async {
117                    let elapsed = s.elapsed().as_millis();
118                    let wait_time = elapsed - request_start;
119                    if wait_time > 10 {
120                        println!(
121                            "   [+{:>4}ms] Request {}: ⏳ Waited {}ms in queue, now processing",
122                            elapsed, i, wait_time
123                        );
124                        q.fetch_add(1, Ordering::SeqCst);
125                    } else {
126                        println!(
127                            "   [+{:>4}ms] Request {}: 🔓 Acquired slot immediately",
128                            elapsed, i
129                        );
130                    }
131                    tokio::time::sleep(Duration::from_millis(100)).await;
132                    let elapsed = s.elapsed().as_millis();
133                    println!(
134                        "   [+{:>4}ms] Request {}: ✅ Completed",
135                        elapsed, i
136                    );
137                    Ok(format!("Request {} done", i))
138                })
139                .await;
140
141            match result {
142                Ok(_) => {
143                    comp.fetch_add(1, Ordering::SeqCst);
144                }
145                Err(DoOverError::BulkheadFull) => {
146                    let elapsed = s.elapsed().as_millis();
147                    println!(
148                        "   [+{:>4}ms] Request {}: 🚫 Queue timeout expired",
149                        elapsed, i
150                    );
151                    rej.fetch_add(1, Ordering::SeqCst);
152                }
153                Err(e) => println!("   Request {}: Error - {:?}", i, e),
154            }
155        });
156        handles.push(handle);
157
158        // Small delay between launching requests
159        tokio::time::sleep(Duration::from_millis(20)).await;
160    }
161
162    for handle in handles {
163        handle.await.unwrap();
164    }
165
166    let elapsed = start.elapsed().as_millis();
167    println!("\n   Summary:");
168    println!("   - Completed:     {}", completed.load(Ordering::SeqCst));
169    println!("   - Queued/Waited: {}", queued.load(Ordering::SeqCst));
170    println!("   - Rejected:      {}", rejected.load(Ordering::SeqCst));
171    println!("   - Total time:    {}ms", elapsed);
172    println!("\n   💡 Requests waited in queue up to 150ms before being rejected");
173}

Trait Implementations§

Source§

impl Clone for Bulkhead

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<E> Policy<DoOverError<E>> for Bulkhead
where E: Send + Sync,

Source§

fn execute<'life0, 'async_trait, F, Fut, T>( &'life0 self, f: F, ) -> Pin<Box<dyn Future<Output = Result<T, DoOverError<E>>> + Send + 'async_trait>>
where F: Fn() -> Fut + Send + Sync + 'async_trait, Fut: Future<Output = Result<T, DoOverError<E>>> + Send + 'async_trait, T: Send + 'async_trait, Self: 'async_trait, 'life0: 'async_trait,

Execute an async operation with this policy’s resilience behavior. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.