Skip to main content

reliakit_bulkhead/
lib.rs

1//! Clock-agnostic concurrency limiter.
2//!
3//! `reliakit-bulkhead` caps how many operations may be *in flight* at once. It
4//! is a counting semaphore: you acquire a permit before starting work and
5//! release it when the work finishes. When no permit is available the request is
6//! rejected immediately so load is shed instead of piling up.
7//!
8//! It does not block, sleep, spawn tasks, or read the clock — acquiring a permit
9//! either succeeds now or fails now. That keeps it usable from synchronous code,
10//! any async runtime, and `no_std` / embedded targets, with deterministic tests.
11//!
12//! Where [`reliakit-ratelimit`](https://docs.rs/reliakit-ratelimit) caps the
13//! *rate* of operations over time, a [`Bulkhead`] caps the *number running at
14//! once*. The two compose: a rate limiter decides how often to start work, a
15//! bulkhead bounds how much runs concurrently.
16//!
17//! # Example
18//!
19//! ```
20//! use reliakit_bulkhead::Bulkhead;
21//!
22//! // Allow at most two concurrent operations.
23//! let mut bulkhead = Bulkhead::new(2);
24//!
25//! assert!(bulkhead.try_acquire_one()); // 1 in flight
26//! assert!(bulkhead.try_acquire_one()); // 2 in flight
27//! assert!(!bulkhead.try_acquire_one()); // full: rejected, shed load
28//!
29//! bulkhead.release_one(); // one operation finished
30//! assert!(bulkhead.try_acquire_one()); // room again
31//! ```
32//!
33//! # Releasing permits
34//!
35//! Every successful acquire must be matched by a release, including on the error
36//! path, or the bulkhead will slowly fill and reject everything. The crate keeps
37//! the model explicit (no RAII guard) so it stays `Copy` and `no_std` with no
38//! borrowing constraints; pair acquire/release yourself, e.g. with a `scopeguard`
39//! or a manual `Drop` wrapper in your own code.
40
41#![no_std]
42#![forbid(unsafe_code)]
43#![warn(missing_docs)]
44
45/// A concurrency limiter: a counting semaphore that caps in-flight operations.
46///
47/// `Bulkhead` is a small, `Copy` value holding a fixed `capacity` and the number
48/// of permits currently held (`in_flight`). [`try_acquire`](Self::try_acquire)
49/// takes permits when room exists and reports whether it succeeded;
50/// [`release`](Self::release) returns them.
51///
52/// The capacity is clamped to at least `1` at construction, so a bulkhead can
53/// always admit one operation. The invariant `in_flight <= capacity` holds on
54/// every public path, so [`available`](Self::available) never underflows.
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
56pub struct Bulkhead {
57    capacity: usize,
58    in_flight: usize,
59}
60
61impl Bulkhead {
62    /// Creates a bulkhead allowing at most `capacity` concurrent permits.
63    ///
64    /// `capacity` is clamped to a minimum of `1`: a bulkhead always admits at
65    /// least one operation, so a `0` would only ever reject and is treated as
66    /// `1`.
67    pub const fn new(capacity: usize) -> Self {
68        let capacity = if capacity == 0 { 1 } else { capacity };
69        Self {
70            capacity,
71            in_flight: 0,
72        }
73    }
74
75    /// Returns the maximum number of concurrent permits.
76    pub const fn capacity(&self) -> usize {
77        self.capacity
78    }
79
80    /// Returns the number of permits currently held.
81    pub const fn in_flight(&self) -> usize {
82        self.in_flight
83    }
84
85    /// Returns how many more permits can be acquired right now.
86    pub const fn available(&self) -> usize {
87        self.capacity - self.in_flight
88    }
89
90    /// Returns `true` when no further permits are available.
91    pub const fn is_full(&self) -> bool {
92        self.in_flight >= self.capacity
93    }
94
95    /// Returns `true` when no permits are held.
96    pub const fn is_empty(&self) -> bool {
97        self.in_flight == 0
98    }
99
100    /// Tries to acquire `permits` permits at once.
101    ///
102    /// Returns `true` and reserves them if at least `permits` are available;
103    /// otherwise returns `false` and changes nothing (no partial acquire). A
104    /// request for more than [`capacity`](Self::capacity) always fails.
105    /// Acquiring `0` permits always succeeds and reserves nothing.
106    pub fn try_acquire(&mut self, permits: usize) -> bool {
107        if permits > self.capacity {
108            return false;
109        }
110        if self.available() >= permits {
111            self.in_flight += permits;
112            true
113        } else {
114            false
115        }
116    }
117
118    /// Tries to acquire a single permit. See [`try_acquire`](Self::try_acquire).
119    pub fn try_acquire_one(&mut self) -> bool {
120        self.try_acquire(1)
121    }
122
123    /// Releases `permits` permits back to the bulkhead.
124    ///
125    /// Saturates at zero, so releasing more than are held simply empties the
126    /// bulkhead rather than underflowing — a release without a matching acquire
127    /// cannot drive `in_flight` negative or panic.
128    pub fn release(&mut self, permits: usize) {
129        self.in_flight = self.in_flight.saturating_sub(permits);
130    }
131
132    /// Releases a single permit. See [`release`](Self::release).
133    pub fn release_one(&mut self) {
134        self.release(1);
135    }
136
137    /// Releases every held permit, returning the bulkhead to empty.
138    pub fn reset(&mut self) {
139        self.in_flight = 0;
140    }
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146
147    #[test]
148    fn new_starts_empty() {
149        let b = Bulkhead::new(3);
150        assert_eq!(b.capacity(), 3);
151        assert_eq!(b.in_flight(), 0);
152        assert_eq!(b.available(), 3);
153        assert!(b.is_empty());
154        assert!(!b.is_full());
155    }
156
157    #[test]
158    fn capacity_clamped_to_one() {
159        let mut b = Bulkhead::new(0);
160        assert_eq!(b.capacity(), 1);
161        assert!(b.try_acquire_one());
162        assert!(!b.try_acquire_one());
163    }
164
165    #[test]
166    fn acquire_until_full_then_reject() {
167        let mut b = Bulkhead::new(2);
168        assert!(b.try_acquire_one());
169        assert!(b.try_acquire_one());
170        assert!(b.is_full());
171        assert!(!b.try_acquire_one());
172        assert_eq!(b.in_flight(), 2);
173    }
174
175    #[test]
176    fn release_frees_room() {
177        let mut b = Bulkhead::new(1);
178        assert!(b.try_acquire_one());
179        assert!(!b.try_acquire_one());
180        b.release_one();
181        assert!(b.is_empty());
182        assert!(b.try_acquire_one());
183    }
184
185    #[test]
186    fn batch_acquire_all_or_nothing() {
187        let mut b = Bulkhead::new(5);
188        assert!(b.try_acquire(3));
189        assert_eq!(b.available(), 2);
190        // Not enough room for 3 more: nothing is taken.
191        assert!(!b.try_acquire(3));
192        assert_eq!(b.available(), 2);
193        assert!(b.try_acquire(2));
194        assert!(b.is_full());
195    }
196
197    #[test]
198    fn acquire_more_than_capacity_always_fails() {
199        let mut b = Bulkhead::new(4);
200        assert!(!b.try_acquire(5));
201        assert_eq!(b.in_flight(), 0);
202    }
203
204    #[test]
205    fn acquire_zero_succeeds_and_reserves_nothing() {
206        let mut b = Bulkhead::new(2);
207        assert!(b.try_acquire(0));
208        assert_eq!(b.in_flight(), 0);
209    }
210
211    #[test]
212    fn release_saturates_at_zero() {
213        let mut b = Bulkhead::new(2);
214        assert!(b.try_acquire_one());
215        b.release(100);
216        assert_eq!(b.in_flight(), 0);
217        assert!(b.is_empty());
218        // A spurious release on an empty bulkhead stays at zero.
219        b.release_one();
220        assert_eq!(b.in_flight(), 0);
221    }
222
223    #[test]
224    fn reset_clears_all_permits() {
225        let mut b = Bulkhead::new(4);
226        assert!(b.try_acquire(3));
227        b.reset();
228        assert!(b.is_empty());
229        assert_eq!(b.available(), 4);
230    }
231
232    #[test]
233    fn available_never_underflows_at_capacity() {
234        let mut b = Bulkhead::new(usize::MAX);
235        assert!(b.try_acquire(usize::MAX));
236        assert!(b.is_full());
237        assert_eq!(b.available(), 0);
238        assert!(!b.try_acquire_one());
239    }
240}