reliakit_bulkhead/lib.rs
1//! Clock-agnostic concurrency limiter.
2//!
3//! `reliakit-bulkhead` caps how many operations may be *in flight* at once. It
4//! is a counting semaphore: you acquire a permit before starting work and
5//! release it when the work finishes. When no permit is available the request is
6//! rejected immediately so load is shed instead of piling up.
7//!
8//! It does not block, sleep, spawn tasks, or read the clock — acquiring a permit
9//! either succeeds now or fails now. That keeps it usable from synchronous code,
10//! any async runtime, and `no_std` / embedded targets, with deterministic tests.
11//!
12//! Where [`reliakit-ratelimit`](https://docs.rs/reliakit-ratelimit) caps the
13//! *rate* of operations over time, a [`Bulkhead`] caps the *number running at
14//! once*. The two compose: a rate limiter decides how often to start work, a
15//! bulkhead bounds how much runs concurrently.
16//!
17//! # Example
18//!
19//! ```
20//! use reliakit_bulkhead::Bulkhead;
21//!
22//! // Allow at most two concurrent operations.
23//! let mut bulkhead = Bulkhead::new(2);
24//!
25//! assert!(bulkhead.try_acquire_one()); // 1 in flight
26//! assert!(bulkhead.try_acquire_one()); // 2 in flight
27//! assert!(!bulkhead.try_acquire_one()); // full: rejected, shed load
28//!
29//! bulkhead.release_one(); // one operation finished
30//! assert!(bulkhead.try_acquire_one()); // room again
31//! ```
32//!
33//! # Releasing permits
34//!
35//! Every successful acquire must be matched by a release, including on the error
36//! path, or the bulkhead will slowly fill and reject everything. The crate keeps
37//! the model explicit (no RAII guard) so it stays `Copy` and `no_std` with no
38//! borrowing constraints; pair acquire/release yourself, e.g. with a `scopeguard`
39//! or a manual `Drop` wrapper in your own code.
40
41#![no_std]
42#![forbid(unsafe_code)]
43#![warn(missing_docs)]
44
45/// A concurrency limiter: a counting semaphore that caps in-flight operations.
46///
47/// `Bulkhead` is a small, `Copy` value holding a fixed `capacity` and the number
48/// of permits currently held (`in_flight`). [`try_acquire`](Self::try_acquire)
49/// takes permits when room exists and reports whether it succeeded;
50/// [`release`](Self::release) returns them.
51///
52/// The capacity is clamped to at least `1` at construction, so a bulkhead can
53/// always admit one operation. The invariant `in_flight <= capacity` holds on
54/// every public path, so [`available`](Self::available) never underflows.
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
56pub struct Bulkhead {
57 capacity: usize,
58 in_flight: usize,
59}
60
61impl Bulkhead {
62 /// Creates a bulkhead allowing at most `capacity` concurrent permits.
63 ///
64 /// `capacity` is clamped to a minimum of `1`: a bulkhead always admits at
65 /// least one operation, so a `0` would only ever reject and is treated as
66 /// `1`.
67 pub const fn new(capacity: usize) -> Self {
68 let capacity = if capacity == 0 { 1 } else { capacity };
69 Self {
70 capacity,
71 in_flight: 0,
72 }
73 }
74
75 /// Returns the maximum number of concurrent permits.
76 pub const fn capacity(&self) -> usize {
77 self.capacity
78 }
79
80 /// Returns the number of permits currently held.
81 pub const fn in_flight(&self) -> usize {
82 self.in_flight
83 }
84
85 /// Returns how many more permits can be acquired right now.
86 pub const fn available(&self) -> usize {
87 self.capacity - self.in_flight
88 }
89
90 /// Returns `true` when no further permits are available.
91 pub const fn is_full(&self) -> bool {
92 self.in_flight >= self.capacity
93 }
94
95 /// Returns `true` when no permits are held.
96 pub const fn is_empty(&self) -> bool {
97 self.in_flight == 0
98 }
99
100 /// Tries to acquire `permits` permits at once.
101 ///
102 /// Returns `true` and reserves them if at least `permits` are available;
103 /// otherwise returns `false` and changes nothing (no partial acquire). A
104 /// request for more than [`capacity`](Self::capacity) always fails.
105 /// Acquiring `0` permits always succeeds and reserves nothing.
106 pub fn try_acquire(&mut self, permits: usize) -> bool {
107 if permits > self.capacity {
108 return false;
109 }
110 if self.available() >= permits {
111 self.in_flight += permits;
112 true
113 } else {
114 false
115 }
116 }
117
118 /// Tries to acquire a single permit. See [`try_acquire`](Self::try_acquire).
119 pub fn try_acquire_one(&mut self) -> bool {
120 self.try_acquire(1)
121 }
122
123 /// Releases `permits` permits back to the bulkhead.
124 ///
125 /// Saturates at zero, so releasing more than are held simply empties the
126 /// bulkhead rather than underflowing — a release without a matching acquire
127 /// cannot drive `in_flight` negative or panic.
128 pub fn release(&mut self, permits: usize) {
129 self.in_flight = self.in_flight.saturating_sub(permits);
130 }
131
132 /// Releases a single permit. See [`release`](Self::release).
133 pub fn release_one(&mut self) {
134 self.release(1);
135 }
136
137 /// Releases every held permit, returning the bulkhead to empty.
138 pub fn reset(&mut self) {
139 self.in_flight = 0;
140 }
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146
147 #[test]
148 fn new_starts_empty() {
149 let b = Bulkhead::new(3);
150 assert_eq!(b.capacity(), 3);
151 assert_eq!(b.in_flight(), 0);
152 assert_eq!(b.available(), 3);
153 assert!(b.is_empty());
154 assert!(!b.is_full());
155 }
156
157 #[test]
158 fn capacity_clamped_to_one() {
159 let mut b = Bulkhead::new(0);
160 assert_eq!(b.capacity(), 1);
161 assert!(b.try_acquire_one());
162 assert!(!b.try_acquire_one());
163 }
164
165 #[test]
166 fn acquire_until_full_then_reject() {
167 let mut b = Bulkhead::new(2);
168 assert!(b.try_acquire_one());
169 assert!(b.try_acquire_one());
170 assert!(b.is_full());
171 assert!(!b.try_acquire_one());
172 assert_eq!(b.in_flight(), 2);
173 }
174
175 #[test]
176 fn release_frees_room() {
177 let mut b = Bulkhead::new(1);
178 assert!(b.try_acquire_one());
179 assert!(!b.try_acquire_one());
180 b.release_one();
181 assert!(b.is_empty());
182 assert!(b.try_acquire_one());
183 }
184
185 #[test]
186 fn batch_acquire_all_or_nothing() {
187 let mut b = Bulkhead::new(5);
188 assert!(b.try_acquire(3));
189 assert_eq!(b.available(), 2);
190 // Not enough room for 3 more: nothing is taken.
191 assert!(!b.try_acquire(3));
192 assert_eq!(b.available(), 2);
193 assert!(b.try_acquire(2));
194 assert!(b.is_full());
195 }
196
197 #[test]
198 fn acquire_more_than_capacity_always_fails() {
199 let mut b = Bulkhead::new(4);
200 assert!(!b.try_acquire(5));
201 assert_eq!(b.in_flight(), 0);
202 }
203
204 #[test]
205 fn acquire_zero_succeeds_and_reserves_nothing() {
206 let mut b = Bulkhead::new(2);
207 assert!(b.try_acquire(0));
208 assert_eq!(b.in_flight(), 0);
209 }
210
211 #[test]
212 fn release_saturates_at_zero() {
213 let mut b = Bulkhead::new(2);
214 assert!(b.try_acquire_one());
215 b.release(100);
216 assert_eq!(b.in_flight(), 0);
217 assert!(b.is_empty());
218 // A spurious release on an empty bulkhead stays at zero.
219 b.release_one();
220 assert_eq!(b.in_flight(), 0);
221 }
222
223 #[test]
224 fn reset_clears_all_permits() {
225 let mut b = Bulkhead::new(4);
226 assert!(b.try_acquire(3));
227 b.reset();
228 assert!(b.is_empty());
229 assert_eq!(b.available(), 4);
230 }
231
232 #[test]
233 fn available_never_underflows_at_capacity() {
234 let mut b = Bulkhead::new(usize::MAX);
235 assert!(b.try_acquire(usize::MAX));
236 assert!(b.is_full());
237 assert_eq!(b.available(), 0);
238 assert!(!b.try_acquire_one());
239 }
240}