Skip to main content

arcly_http/resilience/
bulkhead.rs

1//! Bulkhead — bounded concurrency per endpoint group.
2//!
3//! The circuit breaker protects against a dependency that *fails*; the
4//! bulkhead protects against one that is merely *slow*. Without it, a single
5//! laggy downstream lets in-flight requests pile up until every worker on the
6//! pod is parked on that dependency — the classic slow-loris cascade that a
7//! breaker never trips on (nothing is erroring, everything is just waiting).
8//!
//! `tokio::sync::Semaphore::try_acquire` never waits: it either takes a permit
//! or fails on the spot — saturation answers `503` immediately so the caller
//! retries on a healthier replica, instead of queueing latency on this one.
12//!
13//! ```ignore
14//! static PAYMENT: Bulkhead = Bulkhead::new("payment", 32);
15//!
16//! async fn create_order(ctx: RequestContext /* ... */) -> Result<Json<Value>, HttpException> {
17//!     let _permit = PAYMENT.try_enter()?;   // 503 when saturated
18//!     payment.charge(/* ... */).await               // permit released on drop
19//! }
20//! ```
21
22use tokio::sync::{Semaphore, SemaphorePermit};
23
24use crate::web::Error;
25
26pub struct Bulkhead {
27    sem: Semaphore,
28    name: &'static str,
29}
30
31impl Bulkhead {
32    /// `const`-constructible so it can live in a `static` next to the handler.
33    pub const fn new(name: &'static str, max_concurrent: usize) -> Self {
34        Self {
35            sem: Semaphore::const_new(max_concurrent),
36            name,
37        }
38    }
39
40    /// Non-blocking admission: shed (`503`) instead of queueing when full.
41    /// Hold the returned permit for the protected section; drop releases it.
42    pub fn try_enter(&self) -> Result<SemaphorePermit<'_>, Error> {
43        let result = self.sem.try_acquire().map_err(|_| {
44            metrics::counter!("bulkhead_rejected_total", "name" => self.name).increment(1);
45            Error::ServiceUnavailable("bulkhead saturated")
46        });
47        // Approximate occupancy gauge — refreshed on every admission attempt.
48        metrics::gauge!("bulkhead_available", "name" => self.name)
49            .set(self.sem.available_permits() as f64);
50        result
51    }
52
53    /// Currently available slots (for dashboards/tests).
54    pub fn available(&self) -> usize {
55        self.sem.available_permits()
56    }
57}