use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Duration;
use crate::types::Time;
/// Configuration for a [`Bulkhead`] concurrency limiter.
#[derive(Clone)]
pub struct BulkheadPolicy {
    /// Human-readable identifier, used by the registry and `Debug` output.
    pub name: String,
    /// Total permit capacity; weighted acquisitions consume several permits.
    pub max_concurrent: u32,
    /// Maximum number of waiters allowed in the FIFO queue at once.
    pub max_queue: u32,
    /// How long a queued entry may wait before it is failed with a timeout.
    pub queue_timeout: Duration,
    /// Weighted-permit mode flag. NOTE(review): nothing in this file reads
    /// this field — the acquisition APIs accept weights regardless; confirm
    /// intended semantics with external callers.
    pub weighted: bool,
    /// Optional callback invoked with a metrics snapshot when an enqueue is
    /// rejected because the queue is full.
    pub on_full: Option<FullCallback>,
}
impl BulkheadPolicy {
    /// Returns the policy with `max_concurrent` replaced by `max`.
    ///
    /// Builder-style helper. Made `const` for consistency with
    /// `BulkheadPolicyBuilder::max_concurrent`, which uses the same
    /// field-assignment pattern as a `const fn`.
    #[must_use]
    pub const fn concurrency(mut self, max: u32) -> Self {
        self.max_concurrent = max;
        self
    }
}
/// Callback fired when the bulkhead queue rejects an enqueue attempt;
/// receives a metrics snapshot taken after the rejection was counted.
pub type FullCallback = Arc<dyn Fn(&BulkheadMetrics) + Send + Sync>;
impl fmt::Debug for BulkheadPolicy {
    /// Formats the policy; for `on_full` only reports whether a callback is
    /// configured, since the closure itself is not `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("BulkheadPolicy");
        dbg.field("name", &self.name);
        dbg.field("max_concurrent", &self.max_concurrent);
        dbg.field("max_queue", &self.max_queue);
        dbg.field("queue_timeout", &self.queue_timeout);
        dbg.field("weighted", &self.weighted);
        dbg.field("on_full", &self.on_full.is_some());
        dbg.finish()
    }
}
impl Default for BulkheadPolicy {
    /// Conservative defaults: 10 permits, a queue of 100 waiters, a
    /// 5-second queue timeout, unweighted, and no full-queue callback.
    fn default() -> Self {
        let queue_timeout = Duration::from_secs(5);
        Self {
            name: String::from("default"),
            max_concurrent: 10,
            max_queue: 100,
            queue_timeout,
            weighted: false,
            on_full: None,
        }
    }
}
/// Point-in-time metrics snapshot for a [`Bulkhead`].
#[derive(Clone, Debug, Default)]
pub struct BulkheadMetrics {
    /// Permits currently in use (capacity minus available).
    pub active_permits: u32,
    /// Waiters still pending in the queue (granted-but-unclaimed entries
    /// are excluded).
    pub queue_depth: u32,
    /// Total acquisitions granted, both direct and via the queue.
    pub total_executed: u64,
    /// Total entries ever enqueued.
    pub total_queued: u64,
    /// Enqueue attempts rejected (over-weight or queue full).
    pub total_rejected: u64,
    /// Queued entries that exceeded the queue timeout.
    pub total_timeout: u64,
    /// Queued or granted entries cancelled by their owner.
    pub total_cancelled: u64,
    /// Mean wait of entries that left the pending state, in milliseconds.
    pub avg_queue_wait_ms: f64,
    /// Longest single recorded queue wait, in milliseconds.
    pub max_queue_wait_ms: u64,
    /// Fraction of capacity in use, in `[0.0, 1.0]` (0.0 if capacity is 0).
    pub utilization: f64,
}
/// Why a queued entry was rejected rather than granted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RejectionReason {
    /// The entry's deadline passed before permits became available.
    Timeout,
}
/// Resolution state of a queue entry: `None` = still pending,
/// `Some(Ok(()))` = granted (awaiting claim), `Some(Err(_))` = rejected.
type QueueEntryResult = Option<Result<(), RejectionReason>>;
/// A single waiter in the bulkhead's FIFO queue.
#[derive(Debug)]
struct QueueEntry {
    // Unique ticket handed back to the caller by `enqueue`.
    id: u64,
    // Number of permits this waiter needs.
    weight: u32,
    // Enqueue timestamp (milliseconds) used for wait-time accounting.
    enqueued_at_millis: u64,
    // Absolute deadline (milliseconds) after which the entry times out.
    deadline_millis: u64,
    // See `QueueEntryResult`; a granted entry stays in the queue as a
    // "zombie" until claimed via `check_entry` or cancelled.
    result: QueueEntryResult,
}
/// A bulkhead: bounds concurrent executions with a fixed pool of permits
/// plus a bounded FIFO wait queue, driven by explicit time values.
pub struct Bulkhead {
    // Immutable configuration.
    policy: BulkheadPolicy,
    // Permits currently free; decremented on grant, restored on release
    // (capped at `policy.max_concurrent`).
    available_permits: AtomicU32,
    // FIFO wait queue; granted entries linger here until claimed.
    queue: RwLock<VecDeque<QueueEntry>>,
    // Count of still-pending waiters, published separately from the queue
    // so `try_acquire` has a lock-free barging check.
    pending_queue_count: AtomicU32,
    // Monotonic ticket generator for queue entry ids.
    next_id: AtomicU64,
    // Accumulated queue wait time (ms) across resolved entries.
    total_wait_time_ms: AtomicU64,
    // Metric counters; see `BulkheadMetrics` for their meanings.
    total_executed_atomic: AtomicU64,
    total_queued_atomic: AtomicU64,
    total_rejected_atomic: AtomicU64,
    total_timeout_atomic: AtomicU64,
    total_cancelled_atomic: AtomicU64,
    max_queue_wait_ms_atomic: AtomicU64,
}
impl Bulkhead {
    /// Creates a bulkhead with every permit available and an empty queue.
    #[must_use]
    pub fn new(policy: BulkheadPolicy) -> Self {
        let available = policy.max_concurrent;
        let max_queue = policy.max_queue as usize;
        Self {
            policy,
            available_permits: AtomicU32::new(available),
            queue: RwLock::new(VecDeque::with_capacity(max_queue)),
            pending_queue_count: AtomicU32::new(0),
            next_id: AtomicU64::new(0),
            total_wait_time_ms: AtomicU64::new(0),
            total_executed_atomic: AtomicU64::new(0),
            total_queued_atomic: AtomicU64::new(0),
            total_rejected_atomic: AtomicU64::new(0),
            total_timeout_atomic: AtomicU64::new(0),
            total_cancelled_atomic: AtomicU64::new(0),
            max_queue_wait_ms_atomic: AtomicU64::new(0),
        }
    }
    /// Returns the policy name.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.policy.name
    }
    /// Returns the configured permit capacity.
    #[must_use]
    pub fn max_concurrent(&self) -> u32 {
        self.policy.max_concurrent
    }
    /// Returns the number of permits currently free.
    #[must_use]
    pub fn available(&self) -> u32 {
        self.available_permits.load(Ordering::Acquire)
    }
    /// Builds a metrics snapshot.
    ///
    /// Counters are read individually, so the snapshot is not atomic across
    /// fields. The average wait divides accumulated wait time by the number
    /// of entries that have left the pending state.
    #[must_use]
    #[allow(clippy::significant_drop_tightening, clippy::cast_precision_loss)]
    pub fn metrics(&self) -> BulkheadMetrics {
        let active = self.pending_queue_count.load(Ordering::Relaxed);
        // Permits in use = capacity minus what is still available.
        let used_permits =
            self.policy.max_concurrent - self.available_permits.load(Ordering::Acquire);
        let total_executed = self.total_executed_atomic.load(Ordering::Relaxed);
        let total_queued = self.total_queued_atomic.load(Ordering::Relaxed);
        // Entries no longer pending: granted, timed out, or cancelled.
        let completed_queued = total_queued.saturating_sub(u64::from(active));
        let avg_queue_wait_ms = if completed_queued > 0 {
            self.total_wait_time_ms.load(Ordering::Relaxed) as f64 / completed_queued as f64
        } else {
            0.0
        };
        BulkheadMetrics {
            active_permits: used_permits,
            queue_depth: active,
            total_executed,
            total_queued: self.total_queued_atomic.load(Ordering::Relaxed),
            total_rejected: self.total_rejected_atomic.load(Ordering::Relaxed),
            total_timeout: self.total_timeout_atomic.load(Ordering::Relaxed),
            total_cancelled: self.total_cancelled_atomic.load(Ordering::Relaxed),
            avg_queue_wait_ms,
            max_queue_wait_ms: self.max_queue_wait_ms_atomic.load(Ordering::Relaxed),
            utilization: if self.policy.max_concurrent > 0 {
                f64::from(used_permits) / f64::from(self.policy.max_concurrent)
            } else {
                0.0
            },
        }
    }
    /// Attempts to take `weight` permits immediately, without queueing.
    ///
    /// Returns `None` if any waiter is already pending (direct acquisition
    /// must not barge ahead of the FIFO queue) or if fewer than `weight`
    /// permits are free. On success the acquisition counts as executed and
    /// the returned guard releases the permits on drop.
    #[must_use]
    pub fn try_acquire(&self, weight: u32) -> Option<BulkheadPermit<'_>> {
        // Fast barging check against the published pending counter.
        if self.pending_queue_count.load(Ordering::Acquire) > 0 {
            return None;
        }
        // A waiter can be in the queue before its counter increment is
        // visible, so also scan for pending entries under the read lock.
        let queue = self.queue.read();
        if queue.iter().any(|entry| entry.result.is_none()) {
            return None;
        }
        // CAS loop: atomically deduct `weight` from available permits.
        let mut available = self.available_permits.load(Ordering::Acquire);
        loop {
            if available < weight {
                return None;
            }
            match self.available_permits.compare_exchange_weak(
                available,
                available - weight,
                Ordering::Acquire,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    self.total_executed_atomic.fetch_add(1, Ordering::Relaxed);
                    return Some(BulkheadPermit {
                        bulkhead: self,
                        weight,
                    });
                }
                Err(actual) => available = actual,
            }
        }
    }
    /// Drives the wait queue at time `now`: expires timed-out entries and
    /// grants free permits to pending entries in FIFO order. Returns the id
    /// of the first entry granted in this pass, if any.
    #[allow(clippy::cast_precision_loss)]
    pub fn process_queue(&self, now: Time) -> Option<u64> {
        let mut queue = self.queue.write();
        self.process_queue_inner(&mut queue, now)
    }
    /// Queue-driving core; the caller must hold the queue write lock.
    fn process_queue_inner(&self, queue: &mut VecDeque<QueueEntry>, now: Time) -> Option<u64> {
        let now_millis = now.as_millis();
        // Pass 1: mark entries whose deadline has passed as timed out.
        // They remain in the queue (still occupying queue capacity) until
        // claimed by `check_entry` or removed by `cancel_entry`.
        let mut timeout_count = 0u64;
        let mut timeout_wait_ms = 0u64;
        let mut max_individual_wait_ms = 0u64;
        for entry in queue.iter_mut() {
            if entry.result.is_none() && now_millis >= entry.deadline_millis {
                entry.result = Some(Err(RejectionReason::Timeout));
                timeout_count += 1;
                let wait = now_millis.saturating_sub(entry.enqueued_at_millis);
                timeout_wait_ms += wait;
                max_individual_wait_ms = max_individual_wait_ms.max(wait);
            }
        }
        if timeout_count > 0 {
            #[allow(clippy::cast_possible_truncation)]
            self.pending_queue_count
                .fetch_sub(timeout_count as u32, Ordering::Release);
            self.total_timeout_atomic
                .fetch_add(timeout_count, Ordering::Relaxed);
            self.total_wait_time_ms
                .fetch_add(timeout_wait_ms, Ordering::Relaxed);
            self.max_queue_wait_ms_atomic
                .fetch_max(max_individual_wait_ms, Ordering::Relaxed);
        }
        // Pass 2: grant permits strictly in FIFO order, stopping at the
        // first pending entry whose weight cannot be satisfied.
        let mut first_granted = None;
        for entry in queue.iter_mut() {
            if entry.result.is_none() {
                // CAS loop to deduct this entry's weight, or fail if short.
                let granted = {
                    let mut current = self.available_permits.load(Ordering::Acquire);
                    loop {
                        if current < entry.weight {
                            break false;
                        }
                        match self.available_permits.compare_exchange_weak(
                            current,
                            current - entry.weight,
                            Ordering::Acquire,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => break true,
                            Err(actual) => current = actual,
                        }
                    }
                };
                if !granted {
                    break;
                }
                // Granted: record wait stats; the entry stays queued as a
                // "zombie" until its owner claims it via `check_entry`.
                entry.result = Some(Ok(()));
                self.pending_queue_count.fetch_sub(1, Ordering::Release);
                let wait_ms = now_millis.saturating_sub(entry.enqueued_at_millis);
                self.total_wait_time_ms
                    .fetch_add(wait_ms, Ordering::Relaxed);
                self.total_executed_atomic.fetch_add(1, Ordering::Relaxed);
                self.max_queue_wait_ms_atomic
                    .fetch_max(wait_ms, Ordering::Relaxed);
                if first_granted.is_none() {
                    first_granted = Some(entry.id);
                }
            }
        }
        first_granted
    }
    /// Adds a waiter of the given weight to the queue, returning its ticket.
    ///
    /// # Errors
    /// [`BulkheadError::Full`] if `weight` exceeds total capacity (it could
    /// never be granted); [`BulkheadError::QueueFull`] when the queue is at
    /// capacity, invoking the `on_full` callback if configured.
    #[allow(clippy::significant_drop_tightening, clippy::cast_precision_loss)]
    pub fn enqueue(&self, weight: u32, now: Time) -> Result<u64, BulkheadError<()>> {
        if weight > self.policy.max_concurrent {
            self.total_rejected_atomic.fetch_add(1, Ordering::Relaxed);
            return Err(BulkheadError::Full);
        }
        let now_millis = now.as_millis();
        // Clamp the configured timeout into u64 millisecond range.
        let timeout_millis = self
            .policy
            .queue_timeout
            .as_millis()
            .min(u128::from(u64::MAX)) as u64;
        // Saturating add so huge timeouts never wrap the deadline.
        let deadline_millis = now_millis.saturating_add(timeout_millis);
        let mut queue = self.queue.write();
        if queue.len() >= self.policy.max_queue as usize {
            self.total_rejected_atomic.fetch_add(1, Ordering::Relaxed);
            if let Some(ref callback) = self.policy.on_full {
                // Release the write lock before metrics() and the user
                // callback run.
                drop(queue);
                callback(&self.metrics());
            }
            return Err(BulkheadError::QueueFull);
        }
        let entry_id = self.next_id.fetch_add(1, Ordering::Relaxed);
        queue.push_back(QueueEntry {
            id: entry_id,
            weight,
            enqueued_at_millis: now_millis,
            deadline_millis,
            result: None,
        });
        // Release-publish the pending count after the entry is visible in
        // the queue (try_acquire double-checks the queue for this reason).
        self.pending_queue_count.fetch_add(1, Ordering::Release);
        self.total_queued_atomic.fetch_add(1, Ordering::Relaxed);
        Ok(entry_id)
    }
    /// Polls a queued entry at time `now`, first driving the queue.
    ///
    /// Returns `Ok(Some(permit))` if the entry was granted (it is removed
    /// from the queue and its permits transfer to the returned guard),
    /// `Ok(None)` while still pending.
    ///
    /// # Errors
    /// [`BulkheadError::QueueTimeout`] if the entry timed out, or
    /// [`BulkheadError::Cancelled`] if `entry_id` is unknown (e.g. already
    /// cancelled or claimed).
    #[allow(clippy::option_if_let_else, clippy::significant_drop_tightening)]
    pub fn check_entry(
        &self,
        entry_id: u64,
        now: Time,
    ) -> Result<Option<BulkheadPermit<'_>>, BulkheadError<()>> {
        let mut queue = self.queue.write();
        let _ = self.process_queue_inner(&mut queue, now);
        let entry_idx = queue.iter().position(|e| e.id == entry_id);
        if let Some(idx) = entry_idx {
            match queue[idx].result {
                Some(Ok(())) => {
                    let entry = queue.remove(idx).expect("entry must exist");
                    Ok(Some(BulkheadPermit {
                        bulkhead: self,
                        weight: entry.weight,
                    }))
                }
                Some(Err(RejectionReason::Timeout)) => {
                    let entry = queue.remove(idx).expect("entry must exist");
                    let wait_ms = now.as_millis().saturating_sub(entry.enqueued_at_millis);
                    Err(BulkheadError::QueueTimeout {
                        waited: Duration::from_millis(wait_ms),
                    })
                }
                None => Ok(None),
            }
        } else {
            Err(BulkheadError::Cancelled)
        }
    }
    /// Cancels a queued entry, releasing its permits if it was already
    /// granted. Unknown ids are ignored; entries that already timed out are
    /// removed without further accounting (they were counted as timeouts).
    pub fn cancel_entry(&self, entry_id: u64, now: Time) {
        let mut queue = self.queue.write();
        if let Some(idx) = queue.iter().position(|e| e.id == entry_id) {
            let entry = queue.remove(idx).expect("entry must exist");
            let previous_result = entry.result;
            // Drop the lock before permit release / requeue processing.
            drop(queue);
            if matches!(previous_result, Some(Ok(()))) {
                // Granted but never claimed: return its permits and let the
                // next waiter have them.
                self.release_permit(entry.weight);
                self.total_cancelled_atomic.fetch_add(1, Ordering::Relaxed);
                let _ = self.process_queue(now);
            } else if previous_result.is_none() {
                // Still pending: account its wait and retire it.
                let wait_ms = now.as_millis().saturating_sub(entry.enqueued_at_millis);
                self.total_wait_time_ms
                    .fetch_add(wait_ms, Ordering::Relaxed);
                self.max_queue_wait_ms_atomic
                    .fetch_max(wait_ms, Ordering::Relaxed);
                self.pending_queue_count.fetch_sub(1, Ordering::Release);
                self.total_cancelled_atomic.fetch_add(1, Ordering::Relaxed);
                let _ = self.process_queue(now);
            }
        }
    }
    /// Returns `weight` permits to the pool, capping at `max_concurrent` so
    /// a release after `reset` cannot overflow capacity.
    fn release_permit(&self, weight: u32) {
        let max = self.policy.max_concurrent;
        let mut current = self.available_permits.load(Ordering::Acquire);
        loop {
            let new = current.saturating_add(weight).min(max);
            if new == current {
                break;
            }
            match self.available_permits.compare_exchange_weak(
                current,
                new,
                Ordering::Release,
                Ordering::Acquire,
            ) {
                Ok(_) => break,
                Err(actual) => current = actual,
            }
        }
    }
    /// Runs `op` under a single permit, failing fast (no queueing).
    ///
    /// # Errors
    /// [`BulkheadError::Full`] when no permit is available, or
    /// [`BulkheadError::Inner`] wrapping `op`'s error.
    pub fn call<T, E, F>(&self, op: F) -> Result<T, BulkheadError<E>>
    where
        F: FnOnce() -> Result<T, E>,
        E: fmt::Display,
    {
        self.call_weighted(1, op)
    }
    /// Runs `op` under `weight` permits, failing fast (no queueing).
    ///
    /// # Errors
    /// [`BulkheadError::Full`] when permits are unavailable, or
    /// [`BulkheadError::Inner`] wrapping `op`'s error. The permit guard is
    /// dropped even if `op` panics.
    pub fn call_weighted<T, E, F>(&self, weight: u32, op: F) -> Result<T, BulkheadError<E>>
    where
        F: FnOnce() -> Result<T, E>,
        E: fmt::Display,
    {
        let _permit = self.try_acquire(weight).ok_or(BulkheadError::Full)?;
        op().map_err(BulkheadError::Inner)
    }
    /// Restores full capacity and discards all queued entries.
    ///
    /// Outstanding permits released afterwards are capped by
    /// `release_permit`, so capacity can never exceed `max_concurrent`.
    /// Metric counters are intentionally left untouched.
    pub fn reset(&self) {
        self.available_permits
            .store(self.policy.max_concurrent, Ordering::Release);
        let mut queue = self.queue.write();
        queue.clear();
        self.pending_queue_count.store(0, Ordering::Release);
        drop(queue);
    }
}
impl fmt::Debug for Bulkhead {
    /// Compact debug view: name plus permit availability/capacity.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let free = self.available_permits.load(Ordering::Relaxed);
        let mut dbg = f.debug_struct("Bulkhead");
        dbg.field("name", &self.policy.name);
        dbg.field("available", &free);
        dbg.field("max_concurrent", &self.policy.max_concurrent);
        dbg.finish_non_exhaustive()
    }
}
/// RAII guard for permits acquired from a [`Bulkhead`]; the permits are
/// returned to the pool when the guard is dropped.
#[derive(Debug)]
pub struct BulkheadPermit<'a> {
    // Owning bulkhead the permits are returned to.
    bulkhead: &'a Bulkhead,
    // Number of permits this guard holds.
    weight: u32,
}
impl BulkheadPermit<'_> {
    /// Returns the number of permits held by this guard.
    #[must_use]
    pub fn weight(&self) -> u32 {
        self.weight
    }
    /// Explicitly releases the permits (equivalent to dropping the guard).
    pub fn release(self) {
        drop(self);
    }
}
impl Drop for BulkheadPermit<'_> {
    // Return the permits on drop; `release_permit` caps at capacity.
    fn drop(&mut self) {
        self.bulkhead.release_permit(self.weight);
    }
}
/// Errors produced by bulkhead admission and guarded execution.
#[derive(Debug, Clone)]
pub enum BulkheadError<E> {
    /// No permits were available for immediate acquisition.
    Full,
    /// The wait queue was at capacity.
    QueueFull,
    /// The entry waited longer than the configured queue timeout.
    QueueTimeout {
        waited: Duration,
    },
    /// The entry was cancelled (or is unknown) while waiting.
    Cancelled,
    /// The guarded operation itself failed.
    Inner(E),
}
impl<E: fmt::Display> fmt::Display for BulkheadError<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Full => f.write_str("bulkhead full"),
            Self::QueueFull => f.write_str("bulkhead queue full"),
            Self::QueueTimeout { waited } => {
                write!(f, "bulkhead queue timeout after {waited:?}")
            }
            Self::Cancelled => f.write_str("cancelled while waiting for bulkhead"),
            Self::Inner(e) => write!(f, "{e}"),
        }
    }
}
impl<E: fmt::Debug + fmt::Display> std::error::Error for BulkheadError<E> {}
/// Fluent builder for [`BulkheadPolicy`], starting from the defaults.
#[derive(Default)]
pub struct BulkheadPolicyBuilder {
    policy: BulkheadPolicy,
}
impl BulkheadPolicyBuilder {
    /// Creates a builder seeded with `BulkheadPolicy::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Sets the policy name.
    #[must_use]
    pub fn name(self, name: impl Into<String>) -> Self {
        let mut next = self;
        next.policy.name = name.into();
        next
    }
    /// Sets the total permit capacity.
    #[must_use]
    pub const fn max_concurrent(self, max: u32) -> Self {
        let mut next = self;
        next.policy.max_concurrent = max;
        next
    }
    /// Sets the maximum queue length.
    #[must_use]
    pub const fn max_queue(self, max: u32) -> Self {
        let mut next = self;
        next.policy.max_queue = max;
        next
    }
    /// Sets how long queued entries may wait before timing out.
    #[must_use]
    pub const fn queue_timeout(self, timeout: Duration) -> Self {
        let mut next = self;
        next.policy.queue_timeout = timeout;
        next
    }
    /// Enables or disables the weighted-mode flag.
    #[must_use]
    pub const fn weighted(self, enabled: bool) -> Self {
        let mut next = self;
        next.policy.weighted = enabled;
        next
    }
    /// Installs a callback invoked when an enqueue is rejected as full.
    #[must_use]
    pub fn on_full(self, callback: FullCallback) -> Self {
        let mut next = self;
        next.policy.on_full = Some(callback);
        next
    }
    /// Finishes the builder, yielding the configured policy.
    #[must_use]
    pub fn build(self) -> BulkheadPolicy {
        let Self { policy } = self;
        policy
    }
}
/// Keeps one [`Bulkhead`] per name, lazily created from a default policy.
pub struct BulkheadRegistry {
    // Name -> shared bulkhead instance.
    bulkheads: RwLock<HashMap<String, Arc<Bulkhead>>>,
    // Template applied (with the name substituted) when a bulkhead is
    // created on first access.
    default_policy: BulkheadPolicy,
}
// Initial map capacity; avoids early rehashing for typical registries.
const DEFAULT_BULKHEAD_REGISTRY_CAPACITY: usize = 16;
impl BulkheadRegistry {
    /// Creates a registry that stamps new bulkheads from `default_policy`.
    #[must_use]
    pub fn new(default_policy: BulkheadPolicy) -> Self {
        let bulkheads = RwLock::new(HashMap::with_capacity(DEFAULT_BULKHEAD_REGISTRY_CAPACITY));
        Self {
            bulkheads,
            default_policy,
        }
    }
    /// Fetches the bulkhead registered under `name`, creating it from the
    /// default policy (with `name` substituted) on first use.
    ///
    /// Uses a read-lock fast path, then re-checks under the write lock so
    /// concurrent callers cannot create duplicates.
    pub fn get_or_create(&self, name: &str) -> Arc<Bulkhead> {
        if let Some(existing) = self.bulkheads.read().get(name) {
            return Arc::clone(existing);
        }
        let mut map = self.bulkheads.write();
        let bulkhead = map.entry(name.to_string()).or_insert_with(|| {
            let policy = BulkheadPolicy {
                name: name.to_string(),
                ..self.default_policy.clone()
            };
            Arc::new(Bulkhead::new(policy))
        });
        Arc::clone(bulkhead)
    }
    /// Fetches the bulkhead registered under `name`, creating it from the
    /// supplied policy on first use. If the name already exists, `policy`
    /// is discarded and the existing instance is returned.
    pub fn get_or_create_with(&self, name: &str, policy: BulkheadPolicy) -> Arc<Bulkhead> {
        let mut map = self.bulkheads.write();
        let bulkhead = map
            .entry(name.to_string())
            .or_insert_with(|| Arc::new(Bulkhead::new(policy)));
        Arc::clone(bulkhead)
    }
    /// Snapshots the metrics of every registered bulkhead, keyed by name.
    #[must_use]
    pub fn all_metrics(&self) -> HashMap<String, BulkheadMetrics> {
        self.bulkheads
            .read()
            .iter()
            .map(|(name, bulkhead)| (name.clone(), bulkhead.metrics()))
            .collect()
    }
    /// Removes and returns the bulkhead registered under `name`, if any.
    pub fn remove(&self, name: &str) -> Option<Arc<Bulkhead>> {
        self.bulkheads.write().remove(name)
    }
}
impl fmt::Debug for BulkheadRegistry {
    /// Reports only the number of registered bulkheads.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let count = self.bulkheads.read().len();
        f.debug_struct("BulkheadRegistry")
            .field("count", &count)
            .finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::conformance::{ConformanceTarget, LabRuntimeTarget, TestConfig};
use crate::cx::Cx;
use crate::runtime::yield_now;
use crate::types::Budget;
use proptest::prelude::*;
use serde_json::Value;
use std::sync::Mutex;
// Shared test setup: logging + phase marker. NOTE(review): not called by
// any visible test here — presumably used by tests in the truncated
// remainder of the module.
fn init_test(name: &str) {
    crate::test_utils::init_test_logging();
    crate::test_phase!(name);
}
// A fresh bulkhead exposes its full permit capacity and zero utilization.
#[test]
fn new_bulkhead_has_full_capacity() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    });
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 10);
    assert_eq!(bh.metrics().active_permits, 0);
    assert!((bh.metrics().utilization - 0.0).abs() < f64::EPSILON);
}
// Acquire decrements availability; release restores it.
#[test]
fn try_acquire_reduces_available() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    });
    let permit = bh.try_acquire(1).unwrap();
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 9);
    assert_eq!(bh.metrics().active_permits, 1);
    permit.release();
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 10);
    assert_eq!(bh.metrics().active_permits, 0);
}
// Acquisition fails once all permits are held.
#[test]
fn try_acquire_fails_when_exhausted() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 2,
        ..Default::default()
    });
    let p1 = bh.try_acquire(1).unwrap();
    let p2 = bh.try_acquire(1).unwrap();
    let p3 = bh.try_acquire(1);
    assert!(p3.is_none());
    assert_eq!(bh.metrics().active_permits, 2);
    p1.release();
    p2.release();
}
// A weighted acquisition consumes `weight` permits at once.
#[test]
fn weighted_permit_consumes_multiple() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    });
    let permit = bh.try_acquire(5).unwrap();
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 5);
    assert_eq!(permit.weight(), 5);
    assert!(bh.try_acquire(6).is_none());
    let p2 = bh.try_acquire(5).unwrap();
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 0);
    permit.release();
    p2.release();
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 10);
}
// Zero-weight acquisitions succeed without consuming capacity.
#[test]
fn weighted_permit_zero_weight_allowed() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    });
    let permit = bh.try_acquire(0).unwrap();
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 10);
    permit.release();
}
// Enqueue returns a ticket and bumps the queued counter.
#[test]
fn enqueue_adds_to_queue() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let _p = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    assert!(entry_id < 1000);
    assert_eq!(bh.metrics().total_queued, 1);
}
// FIFO fairness: a released permit goes to the waiter, not to a barging
// try_acquire, and the waiter can then claim it via check_entry.
#[test]
fn try_acquire_yields_to_queued_entry() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        queue_timeout: Duration::from_secs(60),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let permit = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    permit.release();
    assert!(
        bh.try_acquire(1).is_none(),
        "queued entry should block barging try_acquire"
    );
    let granted_id = bh.process_queue(now);
    assert_eq!(granted_id, Some(entry_id));
    assert_eq!(
        bh.available(),
        0,
        "queued entry should consume the released permit"
    );
    let claimed = bh.check_entry(entry_id, now).unwrap();
    assert!(
        claimed.is_some(),
        "queued entry should be claimable after process_queue"
    );
}
// Anti-barging holds even when the entry is in the queue but the pending
// counter has not been incremented yet (simulated by direct queue insert).
#[test]
fn try_acquire_observes_waiter_before_counter_publish() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        queue_timeout: Duration::from_secs(60),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    {
        let mut queue = bh.queue.write();
        queue.push_back(QueueEntry {
            id: 42,
            weight: 1,
            enqueued_at_millis: now.as_millis(),
            deadline_millis: now.as_millis().saturating_add(60_000),
            result: None,
        });
    }
    assert!(
        bh.try_acquire(1).is_none(),
        "direct acquisition must not barge ahead of a waiter that is already in the queue"
    );
    assert_eq!(
        bh.available(),
        1,
        "failed barging attempt must not consume a permit"
    );
}
// Enqueue past max_queue is rejected and counted.
#[test]
fn enqueue_rejects_when_queue_full() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 2,
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let _p = bh.try_acquire(1).unwrap();
    bh.enqueue(1, now).unwrap();
    bh.enqueue(1, now).unwrap();
    let result = bh.enqueue(1, now);
    assert!(matches!(result, Err(BulkheadError::QueueFull)));
    assert_eq!(bh.metrics().total_rejected, 1);
}
// process_queue grants the waiter once a permit is free.
#[test]
fn process_queue_grants_when_permits_available() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        queue_timeout: Duration::from_secs(60),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let p = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    p.release();
    let granted = bh.process_queue(now);
    assert_eq!(granted, Some(entry_id));
}
// check_entry reports pending while full, then yields a permit after a
// release (check_entry drives the queue itself).
#[test]
fn check_entry_returns_permit_when_granted() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 2,
        max_queue: 10,
        queue_timeout: Duration::from_secs(60),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let p1 = bh.try_acquire(1).unwrap();
    let _p2 = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    let result = bh.check_entry(entry_id, now);
    assert!(matches!(result, Ok(None)));
    p1.release();
    let result = bh.check_entry(entry_id, now);
    assert!(matches!(result, Ok(Some(_))));
}
// An entry past its deadline resolves to QueueTimeout and is counted.
#[test]
fn queue_timeout_triggers_error() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        queue_timeout: Duration::from_millis(100),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let _p = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    let later = Time::from_millis(200);
    let result = bh.check_entry(entry_id, later);
    assert!(matches!(result, Err(BulkheadError::QueueTimeout { .. })));
    assert_eq!(bh.metrics().total_timeout, 1);
}
// Cancelling a pending entry bumps the cancelled counter.
#[test]
fn cancel_entry_triggers_cancellation() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        queue_timeout: Duration::from_secs(60),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let _p = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    bh.cancel_entry(entry_id, now);
    assert_eq!(bh.metrics().total_cancelled, 1);
}
// active_permits tracks the summed weight of held permits.
#[test]
fn metrics_track_active_permits() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    });
    assert_eq!(bh.metrics().active_permits, 0);
    let p1 = bh.try_acquire(1).unwrap();
    assert_eq!(bh.metrics().active_permits, 1);
    let p2 = bh.try_acquire(3).unwrap();
    assert_eq!(bh.metrics().active_permits, 4);
    p1.release();
    assert_eq!(bh.metrics().active_permits, 3);
    p2.release();
    assert_eq!(bh.metrics().active_permits, 0);
}
// utilization = used permits / capacity.
#[test]
fn metrics_calculate_utilization() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    });
    assert!((bh.metrics().utilization - 0.0).abs() < f64::EPSILON);
    let p1 = bh.try_acquire(5).unwrap();
    assert!((bh.metrics().utilization - 0.5).abs() < f64::EPSILON);
    let p2 = bh.try_acquire(5).unwrap();
    assert!((bh.metrics().utilization - 1.0).abs() < f64::EPSILON);
    p1.release();
    p2.release();
}
// Every metric starts at zero on a fresh bulkhead.
#[test]
fn metrics_initial_values() {
    let bh = Bulkhead::new(BulkheadPolicy {
        name: "test".into(),
        max_concurrent: 5,
        ..Default::default()
    });
    let m = bh.metrics();
    assert_eq!(m.active_permits, 0);
    assert_eq!(m.queue_depth, 0);
    assert_eq!(m.total_executed, 0);
    assert_eq!(m.total_queued, 0);
    assert_eq!(m.total_rejected, 0);
    assert_eq!(m.total_timeout, 0);
    assert_eq!(m.total_cancelled, 0);
    assert!((m.avg_queue_wait_ms - 0.0).abs() < f64::EPSILON);
    assert_eq!(m.max_queue_wait_ms, 0);
}
// get_or_create returns the same Arc for repeated names.
#[test]
fn registry_creates_named_bulkheads() {
    let registry = BulkheadRegistry::new(BulkheadPolicy::default());
    let bh1 = registry.get_or_create("service-a");
    let bh2 = registry.get_or_create("service-b");
    let bh3 = registry.get_or_create("service-a");
    assert!(Arc::ptr_eq(&bh1, &bh3));
    assert!(!Arc::ptr_eq(&bh1, &bh2));
}
// The created bulkhead's policy name is the requested name, not the
// default policy's name.
#[test]
fn registry_uses_provided_name() {
    let registry = BulkheadRegistry::new(BulkheadPolicy::default());
    let bh = registry.get_or_create("my-service");
    assert_eq!(bh.name(), "my-service");
}
// get_or_create_with applies the supplied policy for new names.
#[test]
fn registry_custom_policy() {
    let registry = BulkheadRegistry::new(BulkheadPolicy::default());
    let bh = registry.get_or_create_with(
        "custom",
        BulkheadPolicy {
            max_concurrent: 100,
            max_queue: 500,
            ..Default::default()
        },
    );
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 100);
}
// all_metrics reflects per-bulkhead state, keyed by name.
#[test]
fn registry_all_metrics() {
    let registry = BulkheadRegistry::new(BulkheadPolicy::default());
    let bh1 = registry.get_or_create("db");
    let bh2 = registry.get_or_create("api");
    let _p1 = bh1.try_acquire(1);
    let _p2 = bh2.try_acquire(3);
    let all = registry.all_metrics();
    assert_eq!(all.len(), 2);
    assert_eq!(all.get("db").unwrap().active_permits, 1);
    assert_eq!(all.get("api").unwrap().active_permits, 3);
}
// remove returns the same Arc and missing names yield None.
#[test]
fn registry_remove() {
    let registry = BulkheadRegistry::new(BulkheadPolicy::default());
    let bh1 = registry.get_or_create("temp");
    assert_eq!(registry.all_metrics().len(), 1);
    let removed = registry.remove("temp");
    assert!(removed.is_some());
    assert!(Arc::ptr_eq(&bh1, &removed.unwrap()));
    assert_eq!(registry.all_metrics().len(), 0);
    assert!(registry.remove("nonexistent").is_none());
}
// Hammering acquire/release from many threads leaves the pool balanced.
#[test]
fn concurrent_acquire_release_safe() {
    use std::thread;
    let bh = Arc::new(Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    }));
    let handles: Vec<_> = (0..100)
        .map(|_| {
            let bh = bh.clone();
            thread::spawn(move || {
                for _ in 0..100 {
                    if let Some(permit) = bh.try_acquire(1) {
                        std::thread::yield_now();
                        permit.release();
                    }
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(bh.available_permits.load(Ordering::SeqCst), 10);
}
// Peak observed concurrency never exceeds max_concurrent.
#[test]
fn concurrent_never_exceeds_max() {
    use std::sync::atomic::AtomicU32;
    use std::thread;
    let bh = Arc::new(Bulkhead::new(BulkheadPolicy {
        max_concurrent: 5,
        ..Default::default()
    }));
    let current = Arc::new(AtomicU32::new(0));
    let peak = Arc::new(AtomicU32::new(0));
    let handles: Vec<_> = (0..50)
        .map(|_| {
            let bh = bh.clone();
            let current = current.clone();
            let peak = peak.clone();
            thread::spawn(move || {
                for _ in 0..20 {
                    if let Some(permit) = bh.try_acquire(1) {
                        let c = current.fetch_add(1, Ordering::SeqCst) + 1;
                        peak.fetch_max(c, Ordering::SeqCst);
                        std::thread::yield_now();
                        current.fetch_sub(1, Ordering::SeqCst);
                        permit.release();
                    }
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert!(peak.load(Ordering::SeqCst) <= 5);
}
// call runs the closure under a permit and counts an execution.
#[test]
fn call_executes_and_records() {
    let bh = Bulkhead::new(BulkheadPolicy::default());
    let result = bh.call(|| Ok::<_, &str>(42));
    assert_eq!(result.unwrap(), 42);
    assert_eq!(bh.metrics().total_executed, 1);
}
// A failing closure surfaces as Inner and still counts as executed.
#[test]
fn call_handles_inner_error() {
    let bh = Bulkhead::new(BulkheadPolicy::default());
    let result: Result<i32, BulkheadError<&str>> = bh.call(|| Err("error"));
    assert!(matches!(result, Err(BulkheadError::Inner("error"))));
    assert_eq!(
        bh.metrics().total_executed,
        1,
        "inner-error calls still executed and should be counted"
    );
}
// Deadline math saturates: Duration::MAX at Time::MAX must not wrap into
// an instantly-expired deadline.
#[test]
fn enqueue_with_huge_timeout_does_not_wrap_deadline() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 2,
        queue_timeout: Duration::MAX,
        ..Default::default()
    });
    let now = Time::MAX;
    let _p = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    let state = bh.check_entry(entry_id, now);
    assert!(
        matches!(state, Ok(None)),
        "entry should remain pending at enqueue time even with huge timeout"
    );
}
// call fails fast with Full when no permit is free.
#[test]
fn call_rejects_when_full() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        ..Default::default()
    });
    let _p = bh.try_acquire(1).unwrap();
    let result: Result<i32, BulkheadError<&str>> = bh.call(|| Ok(42));
    assert!(matches!(result, Err(BulkheadError::Full)));
}
// The RAII permit guard unwinds cleanly: a panicking closure still
// releases its permit.
#[test]
fn call_releases_permit_on_panic() {
    use std::panic::{AssertUnwindSafe, catch_unwind};
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        ..Default::default()
    });
    assert_eq!(bh.available(), 1);
    let result = catch_unwind(AssertUnwindSafe(|| {
        bh.call(|| -> Result<(), &str> { panic!("intentional test panic") })
    }));
    assert!(result.is_err());
    assert_eq!(bh.available(), 1, "permit should be released after panic");
    let permit = bh.try_acquire(1);
    assert!(permit.is_some(), "should be able to acquire after panic");
}
// The builder stamps every configured field into the policy.
#[test]
fn builder_creates_policy() {
    let policy = BulkheadPolicyBuilder::new()
        .name("test")
        .max_concurrent(20)
        .max_queue(50)
        .queue_timeout(Duration::from_secs(30))
        .weighted(true)
        .build();
    assert_eq!(policy.name, "test");
    assert_eq!(policy.max_concurrent, 20);
    assert_eq!(policy.max_queue, 50);
    assert_eq!(policy.queue_timeout, Duration::from_secs(30));
    assert!(policy.weighted);
}
// Each error variant renders a distinct, recognizable message.
#[test]
fn error_display() {
    let full: BulkheadError<&str> = BulkheadError::Full;
    assert!(full.to_string().contains("full"));
    let queue_full: BulkheadError<&str> = BulkheadError::QueueFull;
    assert!(queue_full.to_string().contains("queue full"));
    let timeout: BulkheadError<&str> = BulkheadError::QueueTimeout {
        waited: Duration::from_millis(500),
    };
    assert!(timeout.to_string().contains("timeout"));
    let cancelled: BulkheadError<&str> = BulkheadError::Cancelled;
    assert!(cancelled.to_string().contains("cancelled"));
    let inner: BulkheadError<&str> = BulkheadError::Inner("inner error");
    assert_eq!(inner.to_string(), "inner error");
}
// reset restores full capacity even with permits outstanding.
#[test]
fn reset_restores_capacity() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    });
    let _p1 = bh.try_acquire(5).unwrap();
    let _p2 = bh.try_acquire(3).unwrap();
    assert_eq!(bh.available(), 2);
    bh.reset();
    assert_eq!(bh.available(), 10);
}
// Releasing pre-reset permits after a reset must not push availability
// past max_concurrent (release_permit caps at capacity).
#[test]
fn release_after_reset_does_not_exceed_max_concurrent() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 10,
        ..Default::default()
    });
    let p1 = bh.try_acquire(5).unwrap();
    let p2 = bh.try_acquire(3).unwrap();
    assert_eq!(bh.available(), 2);
    bh.reset();
    assert_eq!(bh.available(), 10);
    p1.release();
    assert_eq!(
        bh.available(),
        10,
        "available_permits must be capped at max_concurrent after reset + release"
    );
    p2.release();
    assert_eq!(
        bh.available(),
        10,
        "available_permits must still be capped after second release"
    );
    let mut permits: Vec<BulkheadPermit> = Vec::new();
    for _ in 0..10 {
        permits.push(bh.try_acquire(1).unwrap());
    }
    assert!(
        bh.try_acquire(1).is_none(),
        "must reject 11th permit even after reset + release"
    );
    for p in permits {
        p.release();
    }
}
// Cancelling a granted-but-unclaimed entry returns its permits.
#[test]
fn cancel_entry_releases_permit_if_already_granted() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let p1 = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    p1.release();
    let granted_id = bh.process_queue(now);
    assert_eq!(granted_id, Some(entry_id));
    assert_eq!(bh.available(), 0);
    bh.cancel_entry(entry_id, now);
    assert_eq!(
        bh.available(),
        1,
        "permit should be released upon cancellation of granted entry"
    );
}
// Cancelling a pending entry frees its queue slot immediately.
#[test]
fn cancel_removes_entry_from_queue() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let _p1 = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    bh.cancel_entry(entry_id, now);
    let metrics = bh.metrics();
    assert_eq!(
        metrics.queue_depth, 0,
        "queue should be empty after cancellation"
    );
    for _ in 0..10 {
        assert!(bh.enqueue(1, now).is_ok());
    }
}
// Cancelling an already-timed-out entry also frees its queue slot.
#[test]
fn cancel_removes_timed_out_entry_from_queue() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 10,
        queue_timeout: Duration::from_millis(1),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let _p1 = bh.try_acquire(1).unwrap();
    let entry_id = bh.enqueue(1, now).unwrap();
    let later = Time::from_millis(100);
    bh.process_queue(later);
    bh.cancel_entry(entry_id, now);
    let metrics = bh.metrics();
    assert_eq!(
        metrics.queue_depth, 0,
        "queue should be empty after cancellation of timed-out entry"
    );
    for _ in 0..10 {
        assert!(bh.enqueue(1, now).is_ok());
    }
}
// Cancelling an entry queued behind a granted-but-unclaimed ("zombie")
// entry must release its queue slot.
#[test]
fn cancelled_entry_behind_granted_zombie_frees_queue_slot() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 2,
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let p1 = bh.try_acquire(1).unwrap();
    let id1 = bh.enqueue(1, now).unwrap();
    let id2 = bh.enqueue(1, now).unwrap();
    p1.release();
    assert_eq!(bh.process_queue(now), Some(id1));
    bh.cancel_entry(id2, now);
    assert!(
        bh.enqueue(1, now).is_ok(),
        "actively cancelled entry must stop occupying queue capacity behind a granted zombie"
    );
}
// Granted-but-unclaimed entries still occupy queue capacity until claimed.
#[test]
fn zombies_fill_queue() {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        max_queue: 2,
        queue_timeout: Duration::from_secs(60),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    let p1 = bh.try_acquire(1).unwrap();
    let id1 = bh.enqueue(1, now).unwrap();
    let _id2 = bh.enqueue(1, now).unwrap();
    p1.release();
    let granted = bh.process_queue(now);
    assert_eq!(granted, Some(id1));
    let result = bh.enqueue(1, now);
    assert!(
        matches!(result, Err(BulkheadError::QueueFull)),
        "Zombies should count towards queue limit"
    );
    let permit = bh.check_entry(id1, now).unwrap().unwrap();
    permit.release();
    bh.enqueue(1, now).expect("enqueue should succeed");
}
/// Observable outcome of one queue-admission scenario; compared across the
/// two metamorphic runs driven by the property test below.
#[derive(Debug, PartialEq, Eq)]
struct QueueAdmissionRun {
    // Original positions (in the cancellation mask) of entries granted.
    granted_positions: Vec<usize>,
    // Pending waiters left at the end (expected 0).
    queue_depth: u32,
    // Free permits at the end (expected 1: the seed permit's capacity).
    available: u32,
    // Executions attributable to queued entries (seed acquire subtracted).
    queued_executed: u64,
    total_cancelled: u64,
}
/// Runs one admission scenario over a cancellation mask.
///
/// With `filter_cancelled == false`, every position is enqueued and the
/// marked ones are then cancelled; with `true`, marked positions are never
/// enqueued at all. Both paths should admit the same survivors in FIFO
/// order — the metamorphic invariant checked by the property test.
fn run_queue_admission(cancelled: &[bool], filter_cancelled: bool) -> QueueAdmissionRun {
    let bh = Bulkhead::new(BulkheadPolicy {
        max_concurrent: 1,
        // +1 so the full mask always fits in the queue.
        max_queue: cancelled.len() as u32 + 1,
        queue_timeout: Duration::from_secs(60),
        ..Default::default()
    });
    let now = Time::from_millis(0);
    // Hold the sole permit so every enqueue actually waits.
    let held = bh.try_acquire(1).expect("seed permit must exist");
    let mut entries = Vec::new();
    for (position, &is_cancelled) in cancelled.iter().enumerate() {
        if filter_cancelled && is_cancelled {
            continue;
        }
        let entry_id = bh.enqueue(1, now).expect("queue should have capacity");
        entries.push((position, entry_id, is_cancelled));
    }
    if !filter_cancelled {
        for &(_, entry_id, is_cancelled) in &entries {
            if is_cancelled {
                bh.cancel_entry(entry_id, now);
            }
        }
    }
    // Free the permit: survivors are now granted one at a time.
    held.release();
    let mut granted_positions = Vec::new();
    for &(position, entry_id, is_cancelled) in &entries {
        if !filter_cancelled && is_cancelled {
            assert!(
                matches!(bh.check_entry(entry_id, now), Err(BulkheadError::Cancelled)),
                "cancelled entry should not remain claimable"
            );
            continue;
        }
        let permit = bh
            .check_entry(entry_id, now)
            .expect("survivor should not reject")
            .expect("survivor should be granted in FIFO order");
        granted_positions.push(position);
        permit.release();
    }
    let metrics = bh.metrics();
    QueueAdmissionRun {
        granted_positions,
        queue_depth: metrics.queue_depth,
        available: bh.available(),
        // Subtract the seed try_acquire from the executed count.
        queued_executed: metrics.total_executed.saturating_sub(1),
        total_cancelled: metrics.total_cancelled,
    }
}
proptest! {
    #[test]
    fn metamorphic_queue_cancellation_matches_filtered_admission(cancelled in prop::collection::vec(any::<bool>(), 1..12)) {
        // Metamorphic relation: cancelling flagged entries while they wait in
        // the queue must be observationally equivalent to never enqueueing
        // them in the first place (except for the cancellation counter).
        let with_cancellation = run_queue_admission(&cancelled, false);
        let pre_filtered = run_queue_admission(&cancelled, true);
        let mut expected_positions = Vec::new();
        for (position, &flagged) in cancelled.iter().enumerate() {
            if !flagged {
                expected_positions.push(position);
            }
        }
        let survivor_count = expected_positions.len() as u64;
        let cancelled_count = cancelled.len() as u64 - survivor_count;
        // Grant order must match the survivor positions in both runs.
        prop_assert_eq!(&with_cancellation.granted_positions, &expected_positions);
        prop_assert_eq!(&pre_filtered.granted_positions, &expected_positions);
        prop_assert_eq!(
            &with_cancellation.granted_positions,
            &pre_filtered.granted_positions
        );
        // Both runs must drain the queue and restore the lone permit.
        prop_assert_eq!(with_cancellation.queue_depth, 0);
        prop_assert_eq!(pre_filtered.queue_depth, 0);
        prop_assert_eq!(with_cancellation.available, 1);
        prop_assert_eq!(pre_filtered.available, 1);
        prop_assert_eq!(with_cancellation.queued_executed, survivor_count);
        prop_assert_eq!(pre_filtered.queued_executed, survivor_count);
        // Only the run that actively cancels records cancellations.
        prop_assert_eq!(with_cancellation.total_cancelled, cancelled_count);
        prop_assert_eq!(pre_filtered.total_cancelled, 0);
    }
}
#[test]
fn conf_isolation_independent_bulkheads_isolated_overflow() {
    // Two bulkheads with separate policies: overflowing one must leave the
    // other's capacity and metrics completely untouched.
    let critical = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "critical-service".into(),
        max_concurrent: 2,
        max_queue: 1,
        ..Default::default()
    }));
    let batch = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "batch-service".into(),
        max_concurrent: 1,
        max_queue: 1,
        ..Default::default()
    }));
    let now = Time::from_millis(0);
    // Saturate the batch bulkhead: permit held, queue full, then overflow.
    let _batch_permit = batch.try_acquire(1).unwrap();
    let _batch_waiter = batch.enqueue(1, now).unwrap();
    let batch_overflow = batch.enqueue(1, now);
    assert!(matches!(batch_overflow, Err(BulkheadError::QueueFull)));
    // Critical bulkhead must retain its full, independent capacity. The
    // permits are kept bound so they stay active for the metrics checks.
    assert_eq!(critical.available(), 2);
    let first_critical = critical.try_acquire(1);
    assert!(
        first_critical.is_some(),
        "critical service should be unaffected by batch overflow"
    );
    let second_critical = critical.try_acquire(1);
    assert!(
        second_critical.is_some(),
        "critical service should maintain full capacity"
    );
    let batch_metrics = batch.metrics();
    let critical_metrics = critical.metrics();
    assert_eq!(batch_metrics.total_rejected, 1);
    assert_eq!(critical_metrics.total_rejected, 0);
    assert_eq!(critical_metrics.active_permits, 2);
    assert_eq!(batch_metrics.active_permits, 1);
}
#[test]
fn conf_isolation_concurrent_overflows_independent() {
    // Overflow two bulkheads simultaneously; each must account for its own
    // rejection and queue depth, and a third bulkhead stays unaffected.
    let db = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "database".into(),
        max_concurrent: 1,
        max_queue: 2,
        ..Default::default()
    }));
    let api = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "external-api".into(),
        max_concurrent: 1,
        max_queue: 1,
        ..Default::default()
    }));
    let now = Time::from_millis(0);
    // Saturate both permits, fill both queues.
    let _db_permit = db.try_acquire(1).unwrap();
    let _api_permit = api.try_acquire(1).unwrap();
    let _db_waiter_a = db.enqueue(1, now).unwrap();
    let _db_waiter_b = db.enqueue(1, now).unwrap();
    let _api_waiter = api.enqueue(1, now).unwrap();
    // Both next enqueues overflow, independently.
    let db_overflow = db.enqueue(1, now);
    let api_overflow = api.enqueue(1, now);
    assert!(matches!(db_overflow, Err(BulkheadError::QueueFull)));
    assert!(matches!(api_overflow, Err(BulkheadError::QueueFull)));
    assert_eq!(db.metrics().total_rejected, 1);
    assert_eq!(api.metrics().total_rejected, 1);
    assert_eq!(db.metrics().queue_depth, 2);
    assert_eq!(api.metrics().queue_depth, 1);
    // A freshly created bulkhead is isolated from both overflows.
    let unaffected = Bulkhead::new(BulkheadPolicy {
        name: "unaffected".into(),
        max_concurrent: 5,
        ..Default::default()
    });
    let weighted_permit = unaffected.try_acquire(3);
    assert!(
        weighted_permit.is_some(),
        "unaffected service should work during other overflows"
    );
    assert_eq!(unaffected.available(), 2);
}
#[test]
fn conf_isolation_overflow_recovery_isolated() {
    // Recovery (releasing a permit and granting queued work) on one bulkhead
    // must not disturb another bulkhead's permits or queue.
    let overloaded = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "overloaded".into(),
        max_concurrent: 1,
        max_queue: 2,
        ..Default::default()
    }));
    let stable = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "stable".into(),
        max_concurrent: 2,
        max_queue: 5,
        ..Default::default()
    }));
    let now = Time::from_millis(0);
    // Drive the overloaded bulkhead into rejection.
    let held = overloaded.try_acquire(1).unwrap();
    let _waiter_a = overloaded.enqueue(1, now).unwrap();
    let _waiter_b = overloaded.enqueue(1, now).unwrap();
    let overflow = overloaded.enqueue(1, now);
    assert!(matches!(overflow, Err(BulkheadError::QueueFull)));
    // Put the stable bulkhead into a known state: one permit, one waiter.
    let _stable_permit = stable.try_acquire(1).unwrap();
    let _stable_waiter = stable.enqueue(1, now).unwrap();
    assert_eq!(stable.available(), 1);
    // Recover the overloaded bulkhead: release and grant a queued entry.
    held.release();
    let _granted = overloaded.process_queue(now);
    assert_eq!(
        stable.available(),
        1,
        "stable service should be unaffected by recovery"
    );
    assert_eq!(stable.metrics().active_permits, 1);
    assert_eq!(stable.metrics().queue_depth, 1);
    let extra_stable = stable.try_acquire(1);
    assert!(
        extra_stable.is_some(),
        "stable service should remain functional"
    );
    let remaining = overloaded.available();
    assert_eq!(
        remaining, 0,
        "overloaded service should have granted queued request"
    );
}
#[test]
fn conf_isolation_weighted_permits_overflow_isolation() {
    // Weighted bulkheads: overflowing the heavy bulkhead must not affect the
    // light bulkhead's weighted capacity, queue, or metrics.
    //
    // Fix: the original crammed multiple statements per line (three acquire/
    // enqueue calls on one line; the final assertions fused with the closing
    // brace), violating rustfmt conventions. Reformatted one statement per
    // line; behavior is unchanged.
    let bh_heavy = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "heavy-ops".into(),
        max_concurrent: 10,
        max_queue: 3,
        weighted: true,
        ..Default::default()
    }));
    let bh_light = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "light-ops".into(),
        max_concurrent: 5,
        max_queue: 3,
        weighted: true,
        ..Default::default()
    }));
    let now = Time::from_millis(0);
    // Consume 8 of 10 weighted permits, then queue entries of weight 2 and 1.
    let _heavy_p1 = bh_heavy.try_acquire(8).unwrap();
    let _heavy_q1 = bh_heavy.enqueue(2, now).unwrap();
    let _heavy_q2 = bh_heavy.enqueue(1, now).unwrap();
    // A weight-3 entry no longer fits and must be rejected.
    let heavy_overflow = bh_heavy.enqueue(3, now);
    assert!(matches!(heavy_overflow, Err(BulkheadError::QueueFull)));
    // The light bulkhead is fully independent: 3 of 5 weighted permits taken.
    let light_p1 = bh_light.try_acquire(3);
    assert!(
        light_p1.is_some(),
        "light service should work despite heavy overflow"
    );
    assert_eq!(bh_light.available(), 2);
    let _light_q1 = bh_light.enqueue(2, now).unwrap();
    assert_eq!(bh_light.metrics().queue_depth, 1);
    assert_eq!(bh_heavy.metrics().total_rejected, 1);
    assert_eq!(bh_light.metrics().total_rejected, 0);
    // Utilization reflects each bulkhead's own weighted load only.
    assert!(bh_heavy.metrics().utilization > 0.8);
    assert!(bh_light.metrics().utilization > 0.5);
}
#[test]
fn conf_isolation_timeout_overflow_isolation() {
    // A queue timeout firing on one bulkhead must not time out or otherwise
    // disturb a second bulkhead configured with a much longer timeout.
    let fast = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "fast-timeout".into(),
        max_concurrent: 1,
        max_queue: 2,
        queue_timeout: Duration::from_millis(10),
        ..Default::default()
    }));
    let slow = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "slow-timeout".into(),
        max_concurrent: 1,
        max_queue: 2,
        queue_timeout: Duration::from_secs(60),
        ..Default::default()
    }));
    let now = Time::from_millis(0);
    // Saturate both bulkheads and queue one waiter on each.
    let _fast_permit = fast.try_acquire(1).unwrap();
    let fast_entry = fast.enqueue(1, now).unwrap();
    let _slow_permit = slow.try_acquire(1).unwrap();
    let slow_entry = slow.enqueue(1, now).unwrap();
    // Advance past the 10ms fast timeout but well short of the slow one.
    let later = Time::from_millis(50);
    fast.process_queue(later);
    let fast_result = fast.check_entry(fast_entry, later);
    assert!(matches!(
        fast_result,
        Err(BulkheadError::QueueTimeout { .. })
    ));
    assert_eq!(fast.metrics().total_timeout, 1);
    let slow_result = slow.check_entry(slow_entry, later);
    assert!(
        matches!(slow_result, Ok(None)),
        "slow bulkhead should still be waiting"
    );
    assert_eq!(slow.metrics().total_timeout, 0);
    assert_eq!(slow.metrics().queue_depth, 1);
    // The timed-out entry must have freed fast's queue capacity.
    let fast_retry = fast.enqueue(1, later);
    assert!(
        fast_retry.is_ok(),
        "fast bulkhead should recover queue capacity after timeout"
    );
    let slow_snapshot = slow.metrics();
    assert_eq!(
        slow_snapshot.queue_depth, 1,
        "slow bulkhead queue unchanged by fast timeout"
    );
}
#[test]
fn bulkhead_saturation_queues_and_recovers_under_lab_runtime() {
    // End-to-end saturation scenario under the deterministic lab runtime:
    // a holder task takes the only permit, a waiter task queues behind it,
    // and after the holder releases, the waiter is granted, releases, and
    // the bulkhead returns to an idle state with clean runtime invariants.
    init_test("bulkhead_saturation_queues_and_recovers_under_lab_runtime");
    // Fixed seed + bounded steps keep the schedule deterministic.
    let config = TestConfig::new()
        .with_seed(0xB011_CE11)
        .with_tracing(true)
        .with_max_steps(20_000);
    let mut runtime = LabRuntimeTarget::create_runtime(config);
    // Single permit, single queue slot: the tightest saturation setup.
    let bulkhead = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "lab-bulkhead".into(),
        max_concurrent: 1,
        max_queue: 1,
        queue_timeout: Duration::from_secs(1),
        ..Default::default()
    }));
    // Shared log of JSON checkpoints emitted by both tasks, inspected below.
    let checkpoints = Arc::new(Mutex::new(Vec::<Value>::new()));
    let (queue_entry_id, checkpoints, final_metrics, final_available) =
        LabRuntimeTarget::block_on(&mut runtime, async move {
            let cx = Cx::current().expect("lab runtime should install a current Cx");
            let holder_spawn_cx = cx.clone();
            let waiter_spawn_cx = cx.clone();
            let holder_bulkhead = Arc::clone(&bulkhead);
            let holder_checkpoints = Arc::clone(&checkpoints);
            let holder_task_cx = holder_spawn_cx.clone();
            // Holder task: grabs the only permit, yields twice so the waiter
            // can enqueue behind it, then releases the permit.
            let holder =
                LabRuntimeTarget::spawn(&holder_spawn_cx, Budget::INFINITE, async move {
                    let permit = holder_bulkhead
                        .try_acquire(1)
                        .expect("holder should acquire the only permit");
                    let acquired = serde_json::json!({
                        "phase": "holder_acquired",
                        "available": holder_bulkhead.available(),
                    });
                    tracing::info!(event = %acquired, "bulkhead_lab_checkpoint");
                    holder_checkpoints.lock().unwrap().push(acquired);
                    // Two yields give the waiter time to observe saturation.
                    yield_now().await;
                    yield_now().await;
                    permit.release();
                    let released = serde_json::json!({
                        "phase": "holder_released",
                        "available": holder_bulkhead.available(),
                        "time_ns": holder_task_cx.now().as_nanos(),
                    });
                    tracing::info!(event = %released, "bulkhead_lab_checkpoint");
                    holder_checkpoints.lock().unwrap().push(released);
                });
            // Let the holder run first so the bulkhead is saturated before
            // the waiter attempts to enqueue.
            yield_now().await;
            let waiter_bulkhead = Arc::clone(&bulkhead);
            let waiter_checkpoints = Arc::clone(&checkpoints);
            let waiter_task_cx = waiter_spawn_cx.clone();
            // Waiter task: enqueues while saturated, polls until granted,
            // releases its permit, and returns its queue entry id.
            let waiter =
                LabRuntimeTarget::spawn(&waiter_spawn_cx, Budget::INFINITE, async move {
                    let entry_id = waiter_bulkhead
                        .enqueue(1, waiter_task_cx.now())
                        .expect("waiter should enqueue while saturated");
                    let enqueued = serde_json::json!({
                        "phase": "waiter_enqueued",
                        "entry_id": entry_id,
                        "queue_depth": waiter_bulkhead.metrics().queue_depth,
                    });
                    tracing::info!(event = %enqueued, "bulkhead_lab_checkpoint");
                    waiter_checkpoints.lock().unwrap().push(enqueued);
                    // Poll-and-yield until the holder's release grants us.
                    let permit = loop {
                        match waiter_bulkhead.check_entry(entry_id, waiter_task_cx.now()) {
                            Ok(Some(permit)) => break permit,
                            Ok(None) => yield_now().await,
                            other => panic!("waiter queue check failed: {other:?}"),
                        }
                    };
                    let granted = serde_json::json!({
                        "phase": "waiter_granted",
                        "entry_id": entry_id,
                        "available": waiter_bulkhead.available(),
                    });
                    tracing::info!(event = %granted, "bulkhead_lab_checkpoint");
                    waiter_checkpoints.lock().unwrap().push(granted);
                    permit.release();
                    let released = serde_json::json!({
                        "phase": "waiter_released",
                        "entry_id": entry_id,
                        "available": waiter_bulkhead.available(),
                    });
                    tracing::info!(event = %released, "bulkhead_lab_checkpoint");
                    waiter_checkpoints.lock().unwrap().push(released);
                    entry_id
                });
            // Both tasks must complete successfully before inspecting state.
            let holder_outcome = holder.await;
            crate::assert_with_log!(
                matches!(holder_outcome, crate::types::Outcome::Ok(())),
                "holder task completes successfully",
                true,
                matches!(holder_outcome, crate::types::Outcome::Ok(()))
            );
            let waiter_outcome = waiter.await;
            crate::assert_with_log!(
                matches!(waiter_outcome, crate::types::Outcome::Ok(_)),
                "waiter task completes successfully",
                true,
                matches!(waiter_outcome, crate::types::Outcome::Ok(_))
            );
            let crate::types::Outcome::Ok(queue_entry_id) = waiter_outcome else {
                panic!("waiter task should finish successfully");
            };
            (
                queue_entry_id,
                checkpoints.lock().unwrap().clone(),
                bulkhead.metrics(),
                bulkhead.available(),
            )
        });
    // First queue entry ever created, so its id is 0.
    assert_eq!(queue_entry_id, 0);
    // After both releases the bulkhead must be fully idle again:
    // two executions (holder + waiter), one of which went through the queue.
    assert_eq!(final_metrics.active_permits, 0);
    assert_eq!(final_metrics.queue_depth, 0);
    assert_eq!(final_metrics.total_executed, 2);
    assert_eq!(final_metrics.total_queued, 1);
    assert_eq!(final_metrics.total_rejected, 0);
    assert_eq!(final_metrics.total_timeout, 0);
    assert_eq!(final_metrics.total_cancelled, 0);
    assert_eq!(final_available, 1);
    // Every phase of the scenario must have left a checkpoint behind.
    assert!(
        checkpoints
            .iter()
            .any(|event| event["phase"] == "holder_acquired"),
        "holder acquisition checkpoint should be recorded"
    );
    assert!(
        checkpoints
            .iter()
            .any(|event| event["phase"] == "waiter_enqueued"),
        "waiter enqueue checkpoint should be recorded"
    );
    assert!(
        checkpoints
            .iter()
            .any(|event| event["phase"] == "waiter_granted"),
        "waiter grant checkpoint should be recorded"
    );
    // The lab runtime's invariant oracles must report no violations.
    let violations = runtime.oracles.check_all(runtime.now());
    assert!(
        violations.is_empty(),
        "bulkhead lab-runtime saturation test should leave runtime invariants clean: {violations:?}"
    );
}
}