arcly_http/resilience/bulkhead.rs
1//! Bulkhead — bounded concurrency per endpoint group.
2//!
3//! The circuit breaker protects against a dependency that *fails*; the
4//! bulkhead protects against one that is merely *slow*. Without it, a single
5//! laggy downstream lets in-flight requests pile up until every worker on the
6//! pod is parked on that dependency — the classic slow-loris cascade that a
7//! breaker never trips on (nothing is erroring, everything is just waiting).
8//!
//! Admission goes through `tokio::sync::Semaphore::try_acquire`, which never
//! waits: when no permit is free it fails at once, so saturation answers `503`
//! immediately and the caller retries on a healthier replica, instead of
//! queueing latency on this one.
12//!
13//! ```ignore
14//! static PAYMENT: Bulkhead = Bulkhead::new("payment", 32);
15//!
16//! async fn create_order(ctx: RequestContext /* ... */) -> Result<Json<Value>, HttpException> {
17//! let _permit = PAYMENT.try_enter()?; // 503 when saturated
18//! payment.charge(/* ... */).await // permit released on drop
19//! }
20//! ```
21
22use tokio::sync::{Semaphore, SemaphorePermit};
23
24use crate::web::Error;
25
26pub struct Bulkhead {
27 sem: Semaphore,
28 name: &'static str,
29}
30
31impl Bulkhead {
32 /// `const`-constructible so it can live in a `static` next to the handler.
33 pub const fn new(name: &'static str, max_concurrent: usize) -> Self {
34 Self {
35 sem: Semaphore::const_new(max_concurrent),
36 name,
37 }
38 }
39
40 /// Non-blocking admission: shed (`503`) instead of queueing when full.
41 /// Hold the returned permit for the protected section; drop releases it.
42 pub fn try_enter(&self) -> Result<SemaphorePermit<'_>, Error> {
43 let result = self.sem.try_acquire().map_err(|_| {
44 metrics::counter!("bulkhead_rejected_total", "name" => self.name).increment(1);
45 Error::ServiceUnavailable("bulkhead saturated")
46 });
47 // Approximate occupancy gauge — refreshed on every admission attempt.
48 metrics::gauge!("bulkhead_available", "name" => self.name)
49 .set(self.sem.available_permits() as f64);
50 result
51 }
52
53 /// Currently available slots (for dashboards/tests).
54 pub fn available(&self) -> usize {
55 self.sem.available_permits()
56 }
57}