Skip to main content

reliability_toolkit/
bulkhead.rs

1//! Bulkhead — semaphore-backed concurrency cap.
2//!
3//! Wraps `tokio::sync::Semaphore`. Useful for isolating a downstream resource
4//! ("at most N in-flight calls to this database") so one runaway caller can't
5//! starve the rest of the system.
6//!
7//! Holding a [`BulkheadPermit`] keeps a slot reserved; dropping it releases.
8
9use std::sync::Arc;
10
11use tokio::sync::{OwnedSemaphorePermit, Semaphore};
12
13use crate::error::ToolkitError;
14
15/// A concurrency cap.
16#[derive(Clone, Debug)]
17pub struct Bulkhead {
18    sem: Arc<Semaphore>,
19    capacity: usize,
20}
21
22/// Active permit. Dropping it releases the slot.
23pub struct BulkheadPermit(#[allow(dead_code)] OwnedSemaphorePermit);
24
25impl std::fmt::Debug for BulkheadPermit {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("BulkheadPermit").finish()
28    }
29}
30
31impl Bulkhead {
32    /// Build a bulkhead allowing up to `capacity` concurrent permits.
33    ///
34    /// # Panics
35    ///
36    /// Panics if `capacity` is zero.
37    pub fn new(capacity: usize) -> Self {
38        assert!(capacity > 0, "capacity must be > 0");
39        Self {
40            sem: Arc::new(Semaphore::new(capacity)),
41            capacity,
42        }
43    }
44
45    /// Wait until a slot is free, then take it.
46    pub async fn acquire(&self) -> Result<BulkheadPermit, ToolkitError> {
47        match self.sem.clone().acquire_owned().await {
48            Ok(p) => Ok(BulkheadPermit(p)),
49            Err(_) => Err(ToolkitError::BulkheadClosed),
50        }
51    }
52
53    /// Try to take a slot without waiting. Returns `None` if all slots are busy.
54    pub fn try_acquire(&self) -> Option<BulkheadPermit> {
55        self.sem
56            .clone()
57            .try_acquire_owned()
58            .ok()
59            .map(BulkheadPermit)
60    }
61
62    /// Number of currently free slots.
63    pub fn available_permits(&self) -> usize {
64        self.sem.available_permits()
65    }
66
67    /// Total capacity (slots when fully idle).
68    pub fn capacity(&self) -> usize {
69        self.capacity
70    }
71
72    /// Close the bulkhead — pending acquires will return [`ToolkitError::BulkheadClosed`].
73    pub fn close(&self) {
74        self.sem.close();
75    }
76}