#![allow(clippy::all)]
#![allow(dead_code)]
use crate::combinator::bulkhead::{Bulkhead, BulkheadError, BulkheadPolicy};
use crate::types::Time;
use crate::util::DetRng;
use proptest::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::time::Duration;
/// A synthetic unit of work that the metamorphic tests route to a bulkhead.
#[derive(Clone, Debug)]
struct TestWorkUnit {
    /// Identifier recorded in the per-bulkhead bookkeeping.
    id: u64,
    /// Weight handed to the bulkhead when a permit is requested.
    weight: u32,
    /// Nominal processing time in milliseconds (stored only).
    processing_time_ms: u64,
    /// Set by [`TestWorkUnit::with_cancel`]; `false` for freshly built units.
    should_cancel: bool,
    /// Set by [`TestWorkUnit::with_priority`]; `0` for freshly built units.
    priority: u32,
}

impl TestWorkUnit {
    /// Builds a unit that is not flagged for cancellation and has priority `0`.
    fn new(id: u64, weight: u32, processing_time_ms: u64) -> Self {
        let (should_cancel, priority) = (false, 0);
        Self {
            id,
            weight,
            processing_time_ms,
            should_cancel,
            priority,
        }
    }

    /// Returns the same unit with `priority` replaced.
    fn with_priority(self, priority: u32) -> Self {
        Self { priority, ..self }
    }

    /// Returns the same unit flagged for cancellation.
    fn with_cancel(self) -> Self {
        Self {
            should_cancel: true,
            ..self
        }
    }
}
/// Shared bookkeeping that the metamorphic checks use to observe what happened
/// across one or more bulkheads.
///
/// The three maps are keyed by bulkhead name and hold work/entry ids; the
/// atomics are plain event counters.
#[derive(Default, Debug)]
struct GlobalBulkheadState {
    /// Ids appended by `record_processed`, grouped by bulkhead name.
    processed_per_bulkhead: parking_lot::Mutex<HashMap<String, Vec<u64>>>,
    /// Ids appended by `record_rejected`, grouped by bulkhead name.
    rejected_per_bulkhead: parking_lot::Mutex<HashMap<String, Vec<u64>>>,
    /// Ids appended by `record_cancelled`, grouped by bulkhead name.
    cancelled_per_bulkhead: parking_lot::Mutex<HashMap<String, Vec<u64>>>,
    /// Incremented once per `record_acquisition` call.
    total_acquisitions: AtomicU64,
    /// Incremented once per `record_release` call.
    total_releases: AtomicU64,
    /// Highest value ever passed to `update_peak_workers`.
    peak_concurrent_workers: AtomicU32,
    /// Incremented by `record_contamination`; `verify_isolation` requires zero.
    contamination_events: AtomicU32,
    /// Incremented alongside every `record_rejected` call.
    queue_overflow_events: AtomicU32,
    /// Incremented alongside every `record_cancelled` call.
    cancellation_events: AtomicU32,
}
impl GlobalBulkheadState {
fn new() -> Arc<Self> {
Arc::new(Self::default())
}
fn record_processed(&self, bulkhead_name: &str, work_id: u64) {
self.processed_per_bulkhead
.lock()
.entry(bulkhead_name.to_string())
.or_default()
.push(work_id);
}
fn record_rejected(&self, bulkhead_name: &str, work_id: u64) {
self.rejected_per_bulkhead
.lock()
.entry(bulkhead_name.to_string())
.or_default()
.push(work_id);
self.queue_overflow_events.fetch_add(1, Ordering::SeqCst);
}
fn record_cancelled(&self, bulkhead_name: &str, work_id: u64) {
self.cancelled_per_bulkhead
.lock()
.entry(bulkhead_name.to_string())
.or_default()
.push(work_id);
self.cancellation_events.fetch_add(1, Ordering::SeqCst);
}
fn record_acquisition(&self) {
self.total_acquisitions.fetch_add(1, Ordering::SeqCst);
}
fn record_release(&self) {
self.total_releases.fetch_add(1, Ordering::SeqCst);
}
fn record_contamination(&self) {
self.contamination_events.fetch_add(1, Ordering::SeqCst);
}
fn update_peak_workers(&self, current: u32) {
let mut peak = self.peak_concurrent_workers.load(Ordering::SeqCst);
while current > peak {
match self.peak_concurrent_workers.compare_exchange_weak(
peak,
current,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break,
Err(actual) => peak = actual,
}
}
}
fn verify_isolation(&self) -> bool {
self.contamination_events.load(Ordering::SeqCst) == 0
}
fn total_processed(&self) -> usize {
self.processed_per_bulkhead
.lock()
.values()
.map(|v| v.len())
.sum()
}
fn total_rejected(&self) -> usize {
self.rejected_per_bulkhead
.lock()
.values()
.map(|v| v.len())
.sum()
}
fn bulkhead_stats(&self, name: &str) -> BulkheadStats {
let processed = self
.processed_per_bulkhead
.lock()
.get(name)
.map_or(0, |v| v.len());
let rejected = self
.rejected_per_bulkhead
.lock()
.get(name)
.map_or(0, |v| v.len());
let cancelled = self
.cancelled_per_bulkhead
.lock()
.get(name)
.map_or(0, |v| v.len());
BulkheadStats {
processed,
rejected,
cancelled,
}
}
}
/// Snapshot of how many ids were recorded for a single bulkhead name, as
/// returned by `GlobalBulkheadState::bulkhead_stats`.
#[derive(Clone, Debug, PartialEq)]
struct BulkheadStats {
    /// Number of ids recorded as processed.
    processed: usize,
    /// Number of ids recorded as rejected.
    rejected: usize,
    /// Number of ids recorded as cancelled.
    cancelled: usize,
}
/// Parameter bundle describing a burst-load scenario.
///
/// NOTE(review): in the code visible here only `bulkhead_count`,
/// `workers_per_bulkhead` and `total_work_units` are read back (by
/// `test_burst_load_patterns`); the remaining fields are populated but unused.
#[derive(Clone, Debug)]
struct BurstTestConfig {
    /// How many bulkheads the scenario creates.
    bulkhead_count: u32,
    /// Concurrency limit applied to each bulkhead.
    workers_per_bulkhead: u32,
    /// Queue capacity per bulkhead.
    queue_size_per_bulkhead: u32,
    /// Total number of work units to distribute.
    total_work_units: u32,
    /// Shape of the generated workload.
    work_pattern: WorkPattern,
    /// Fraction of work to cancel — presumably in `0.0..=1.0`; TODO confirm.
    cancellation_ratio: f32,
}
/// Workload shape selector carried by `BurstTestConfig`.
#[derive(Clone, Debug)]
enum WorkPattern {
    /// Evenly spread work.
    Uniform,
    /// Bursty work; `burst_ratio` parameterizes the burst.
    Burst {
        /// Burst intensity — presumably a fraction in `0.0..=1.0`; TODO confirm.
        burst_ratio: f32,
    },
    /// Randomized work driven by `seed`.
    Random {
        /// Seed for the random pattern.
        seed: u64,
    },
}
/// Upper bound on the number of bulkheads.
/// NOTE(review): not referenced by the code visible in this part of the file.
const MAX_BULKHEADS: u32 = 8;
/// Upper bound on workers per bulkhead.
/// NOTE(review): not referenced by the code visible in this part of the file.
const MAX_WORKERS_PER_BULKHEAD: u32 = 16;
/// Upper bound on generated work units.
/// NOTE(review): not referenced by the code visible in this part of the file.
const MAX_WORK_UNITS: u32 = 200;
/// Queue capacity (`max_queue`) given to every bulkhead that
/// `mr1_isolation_invariant` builds.
const MAX_QUEUE_SIZE: u32 = 50;
/// MR1 — isolation: work routed to one bulkhead must never be accounted to
/// another, and no work id may be processed twice.
///
/// Builds `bulkhead_count` bulkheads (each with `workers_per_bulkhead`
/// concurrent permits and `MAX_QUEUE_SIZE` queue slots), assigns work ids
/// `0..total_work_units` round-robin by `id % bulkhead_count`, then drives each
/// unit through `try_acquire` / `release` on its assigned bulkhead.
///
/// Returns `true` when no contamination was recorded and every processed id is
/// unique. If a duplicate id is found the function returns `false` before
/// reaching `assert_with_log!`.
///
/// `_seed` only feeds a `DetRng` that is constructed and otherwise unused.
///
/// # Panics
///
/// Panics with a remainder-by-zero if `bulkhead_count == 0` while
/// `total_work_units > 0`.
fn mr1_isolation_invariant(
    bulkhead_count: u32,
    workers_per_bulkhead: u32,
    total_work_units: u32,
    _seed: u64,
) -> bool {
    let global_state = GlobalBulkheadState::new();

    let bulkheads: Vec<(String, Arc<Bulkhead>)> = (0..bulkhead_count)
        .map(|i| {
            let name = format!("bulkhead_{}", i);
            let policy = BulkheadPolicy {
                name: name.clone(),
                max_concurrent: workers_per_bulkhead,
                max_queue: MAX_QUEUE_SIZE,
                queue_timeout: Duration::from_millis(100),
                weighted: false,
                on_full: None,
            };
            (name, Arc::new(Bulkhead::new(policy)))
        })
        .collect();

    let _rng = DetRng::new(_seed);

    // Round-robin routing: work id N belongs to bulkhead N % bulkhead_count.
    let mut work_assignments: HashMap<String, Vec<TestWorkUnit>> = HashMap::new();
    for work_id in 0..total_work_units {
        let target_bulkhead = work_id as usize % bulkheads.len();
        let bulkhead_name = &bulkheads[target_bulkhead].0;
        work_assignments
            .entry(bulkhead_name.clone())
            .or_default()
            .push(TestWorkUnit::new(u64::from(work_id), 1, 10));
    }

    for (index, (bulkhead_name, bulkhead)) in bulkheads.iter().enumerate() {
        if let Some(work_units) = work_assignments.get(bulkhead_name) {
            for work_unit in work_units {
                match bulkhead.try_acquire(work_unit.weight) {
                    Some(permit) => {
                        global_state.record_acquisition();
                        global_state.record_processed(bulkhead_name, work_unit.id);
                        // A unit handled by a bulkhead other than its owner is
                        // contamination. Compare indices exactly: a substring
                        // test on the name would treat "bulkhead_12" as a match
                        // for owner 1.
                        let owner = work_unit.id % u64::from(bulkhead_count);
                        if owner != index as u64 {
                            global_state.record_contamination();
                        }
                        global_state.record_release();
                        permit.release();
                    }
                    None => {
                        global_state.record_rejected(bulkhead_name, work_unit.id);
                    }
                }
            }
        }
    }

    let isolation_maintained = global_state.verify_isolation();

    // Every processed id must appear exactly once across all bulkheads.
    let ids_unique = {
        let processed = global_state.processed_per_bulkhead.lock();
        let mut seen = std::collections::HashSet::new();
        // Bound to a local so the iterator is dropped before the lock guard.
        let unique = processed.values().flatten().all(|id| seen.insert(*id));
        unique
    };
    if !ids_unique {
        return false;
    }

    crate::assert_with_log!(
        isolation_maintained,
        "MR1: Isolation invariant maintained",
        true,
        isolation_maintained
    );
    isolation_maintained
}
/// MR2 — rejection accuracy: a burst larger than `max_workers + queue_size`
/// must be rejected for exactly the excess.
///
/// Each of the `work_burst_size` requests first tries to acquire a permit of
/// weight 1 (held until the function returns); on failure it is enqueued at
/// time 0. Only `BulkheadError::Full` counts as a rejection; a successful
/// enqueue or any other error is ignored.
///
/// Returns `true` when the observed rejection count equals the expected
/// excess and also equals the bulkhead's `total_rejected` metric.
/// `_seed` is unused.
fn mr2_rejection_accuracy(
    max_workers: u32,
    queue_size: u32,
    work_burst_size: u32,
    _seed: u64,
) -> bool {
    let global_state = GlobalBulkheadState::new();
    let bulkhead = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "test_bulkhead".to_string(),
        max_concurrent: max_workers,
        max_queue: queue_size,
        queue_timeout: Duration::from_millis(50),
        weighted: false,
        on_full: None,
    }));

    // Anything beyond running slots plus queue slots has nowhere to go.
    let expected_rejections = work_burst_size.saturating_sub(max_workers + queue_size);

    let mut held_permits = Vec::new();
    let mut observed_rejections: u32 = 0;
    for work_id in 0..work_burst_size {
        if let Some(permit) = bulkhead.try_acquire(1) {
            global_state.record_acquisition();
            held_permits.push(permit);
            continue;
        }
        // No permit available: fall back to the queue.
        if let Err(BulkheadError::Full) = bulkhead.enqueue(1, Time::from_millis(0)) {
            observed_rejections += 1;
            global_state.record_rejected("test_bulkhead", u64::from(work_id));
        }
    }

    let reported_rejections = bulkhead.metrics().total_rejected;
    let accuracy_maintained = observed_rejections == expected_rejections
        && reported_rejections == u64::from(observed_rejections);
    crate::assert_with_log!(
        accuracy_maintained,
        "MR2: Rejection accuracy maintained",
        true,
        accuracy_maintained
    );
    accuracy_maintained
}
/// MR3 — cancel propagation: cancelling queued entries must be observed and
/// must not grow the queue.
///
/// Takes up to `worker_count` permits of weight 1, enqueues up to
/// `in_flight_count` entries at time 0 (stopping at the first enqueue error),
/// cancels every queued entry, runs `process_queue`, and then checks that at
/// least one cancellation was recorded and that the queue depth did not
/// increase. Held permits are released before returning.
///
/// `_seed` is unused.
fn mr3_cancel_propagation(worker_count: u32, in_flight_count: u32, _seed: u64) -> bool {
    let global_state = GlobalBulkheadState::new();
    let bulkhead = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "cancel_test".to_string(),
        max_concurrent: worker_count,
        max_queue: in_flight_count,
        queue_timeout: Duration::from_millis(1000),
        weighted: false,
        on_full: None,
    }));
    let cancel_flag = Arc::new(AtomicBool::new(false));

    // Occupy the running slots.
    let mut permits = Vec::new();
    for _ in 0..worker_count {
        if let Some(permit) = bulkhead.try_acquire(1) {
            global_state.record_acquisition();
            permits.push(permit);
        }
    }

    // Queue the in-flight work; the first failed enqueue ends the run.
    let now = Time::from_millis(0);
    let queue_entries: Vec<_> = (worker_count..worker_count + in_flight_count)
        .map_while(|_| bulkhead.enqueue(1, now).ok())
        .collect();

    let queued_before = bulkhead.metrics().queue_depth;

    cancel_flag.store(true, Ordering::SeqCst);
    for &entry_id in &queue_entries {
        bulkhead.cancel_entry(entry_id, now);
        global_state.record_cancelled("cancel_test", entry_id);
    }
    let _processed_after_cancel = bulkhead.process_queue(now);

    let queued_after = bulkhead.metrics().queue_depth;
    let propagation_correct = global_state.cancellation_events.load(Ordering::SeqCst) > 0
        && queued_after <= queued_before;
    crate::assert_with_log!(
        propagation_correct,
        "MR3: Cancel propagation correct",
        true,
        propagation_correct
    );

    permits.into_iter().for_each(|permit| {
        permit.release();
    });
    propagation_correct
}
/// MR4 — metrics accuracy: the bulkhead's counters must agree with what the
/// caller observed.
///
/// Issues `operation_count` weight-1 `try_acquire` calls. Successes are counted
/// as executed and failures as rejected. Fewer than `concurrency_level` permits
/// are ever held at once: the permit that would reach the limit is released
/// straight away. All remaining permits are released at the end.
///
/// Returns `true` when `total_executed` and `total_rejected` match the local
/// counts, recorded acquisitions equal recorded releases, and `active_permits`
/// is zero. `_seed` is unused.
fn mr4_metrics_accuracy(
    worker_count: u32,
    operation_count: u32,
    concurrency_level: u32,
    _seed: u64,
) -> bool {
    let global_state = GlobalBulkheadState::new();
    let bulkhead = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "metrics_test".to_string(),
        max_concurrent: worker_count,
        max_queue: operation_count,
        queue_timeout: Duration::from_millis(100),
        weighted: false,
        on_full: None,
    }));

    let mut executed: u64 = 0;
    let mut rejected: u64 = 0;
    let mut permits = Vec::new();
    for op_id in 0..operation_count {
        if let Some(permit) = bulkhead.try_acquire(1) {
            executed += 1;
            global_state.record_acquisition();
            // Holding this permit would reach the limit, so hand it back now;
            // otherwise keep it until the final sweep.
            if permits.len() + 1 >= concurrency_level as usize {
                permit.release();
                global_state.record_release();
            } else {
                permits.push(permit);
            }
        } else {
            rejected += 1;
            global_state.record_rejected("metrics_test", u64::from(op_id));
        }
    }

    for permit in permits {
        permit.release();
        global_state.record_release();
    }

    let final_metrics = bulkhead.metrics();
    let acquisitions = global_state.total_acquisitions.load(Ordering::SeqCst);
    let releases = global_state.total_releases.load(Ordering::SeqCst);

    let accuracy_maintained = final_metrics.total_executed == executed
        && final_metrics.total_rejected == rejected
        && acquisitions == releases
        && final_metrics.active_permits == 0;
    crate::assert_with_log!(
        accuracy_maintained,
        "MR4: Metrics accuracy maintained",
        true,
        accuracy_maintained
    );
    accuracy_maintained
}
/// MR5 — determinism: replaying the same weight sequence with the same
/// parameters must yield an identical `DeterministicResult`.
///
/// Runs `run_deterministic_sequence` twice with the same arguments and returns
/// whether the two results compare equal.
fn mr5_deterministic_behavior(worker_count: u32, work_sequence: Vec<u32>, _seed: u64) -> bool {
    let replay = || run_deterministic_sequence(worker_count, &work_sequence, _seed);
    let first_run = replay();
    let second_run = replay();

    let determinism_maintained = first_run == second_run;
    crate::assert_with_log!(
        determinism_maintained,
        "MR5: Deterministic behavior maintained",
        true,
        determinism_maintained
    );
    determinism_maintained
}
/// Replays `work_sequence` against a fresh weighted bulkhead and captures the
/// outcome.
///
/// Each weight in the sequence is passed to `try_acquire`, and the result is
/// logged as `Acquired` or `Rejected`. After every third operation (indices
/// 2, 5, 8, …) the oldest held permit, if any, is released. All remaining
/// permits are released oldest-first before the final metrics are read.
///
/// `_seed` only feeds a `DetRng` that is constructed and otherwise unused.
fn run_deterministic_sequence(
    worker_count: u32,
    work_sequence: &[u32],
    _seed: u64,
) -> DeterministicResult {
    let _rng = DetRng::new(_seed);
    let bulkhead = Arc::new(Bulkhead::new(BulkheadPolicy {
        name: "deterministic_test".to_string(),
        max_concurrent: worker_count,
        max_queue: 20,
        queue_timeout: Duration::from_millis(100),
        weighted: true,
        on_full: None,
    }));

    let mut results = Vec::with_capacity(work_sequence.len());
    // Permits are released oldest-first, so hold them in a queue: popping the
    // front of a `VecDeque` is O(1), whereas `Vec::remove(0)` shifts every
    // remaining element.
    let mut permits = std::collections::VecDeque::new();
    for (i, &weight) in work_sequence.iter().enumerate() {
        let op_result = match bulkhead.try_acquire(weight) {
            Some(permit) => {
                permits.push_back(permit);
                OperationResult::Acquired { weight }
            }
            None => OperationResult::Rejected { weight },
        };
        results.push(op_result);

        if i % 3 == 2 {
            if let Some(oldest) = permits.pop_front() {
                oldest.release();
            }
        }
    }

    for permit in permits {
        permit.release();
    }

    let final_metrics = bulkhead.metrics();
    DeterministicResult {
        operations: results,
        final_executed: final_metrics.total_executed,
        final_rejected: final_metrics.total_rejected,
        final_active: final_metrics.active_permits,
    }
}
/// Everything `run_deterministic_sequence` observed during one replay; two
/// replays are considered deterministic when these values compare equal.
#[derive(Clone, Debug, PartialEq)]
struct DeterministicResult {
    /// Per-operation outcome, in submission order.
    operations: Vec<OperationResult>,
    /// `total_executed` metric read after all permits were released.
    final_executed: u64,
    /// `total_rejected` metric read after all permits were released.
    final_rejected: u64,
    /// `active_permits` metric read after all permits were released.
    final_active: u32,
}
/// Outcome of a single `try_acquire` attempt in a deterministic replay.
#[derive(Clone, Debug, PartialEq)]
enum OperationResult {
    /// A permit of the given weight was granted.
    Acquired {
        /// Weight that was requested.
        weight: u32,
    },
    /// No permit was granted for the given weight.
    Rejected {
        /// Weight that was requested.
        weight: u32,
    },
}
/// Property-based and fixed-input drivers for the five metamorphic relations.
#[cfg(test)]
mod tests {
    use super::*;

    proptest! {
        /// MR1 over small random topologies.
        #[test]
        fn test_mr1_isolation_invariant(
            bulkhead_count in 1u32..=4,
            workers_per_bulkhead in 1u32..=8,
            total_work_units in 10u32..=40,
            seed in any::<u64>(),
        ) {
            let holds = mr1_isolation_invariant(
                bulkhead_count,
                workers_per_bulkhead,
                total_work_units,
                seed,
            );
            prop_assert!(holds);
        }

        /// MR2 over random capacities and burst sizes.
        #[test]
        fn test_mr2_rejection_accuracy(
            max_workers in 1u32..=8,
            queue_size in 1u32..=10,
            work_burst_size in 1u32..=30,
            seed in any::<u64>(),
        ) {
            let holds = mr2_rejection_accuracy(max_workers, queue_size, work_burst_size, seed);
            prop_assert!(holds);
        }

        /// MR3 over random worker and in-flight counts.
        #[test]
        fn test_mr3_cancel_propagation(
            worker_count in 1u32..=6,
            in_flight_count in 1u32..=12,
            seed in any::<u64>(),
        ) {
            let holds = mr3_cancel_propagation(worker_count, in_flight_count, seed);
            prop_assert!(holds);
        }

        /// MR4 over random operation counts and concurrency levels.
        #[test]
        fn test_mr4_metrics_accuracy(
            worker_count in 1u32..=8,
            operation_count in 5u32..=25,
            concurrency_level in 1u32..=5,
            seed in any::<u64>(),
        ) {
            let holds = mr4_metrics_accuracy(
                worker_count,
                operation_count,
                concurrency_level,
                seed,
            );
            prop_assert!(holds);
        }

        /// MR5 over random weight sequences.
        #[test]
        fn test_mr5_deterministic_behavior(
            worker_count in 1u32..=6,
            work_sequence in prop::collection::vec(1u32..=3, 5..15),
            seed in any::<u64>(),
        ) {
            let holds = mr5_deterministic_behavior(worker_count, work_sequence, seed);
            prop_assert!(holds);
        }
    }

    /// Runs all five relations once with one fixed seed.
    #[test]
    fn test_bulkhead_metamorphic_integration() {
        let _global_state = GlobalBulkheadState::new();
        let seed = 12345;

        assert!(mr1_isolation_invariant(3, 4, 50, seed));
        assert!(mr2_rejection_accuracy(4, 8, 20, seed));
        assert!(mr3_cancel_propagation(6, 10, seed));
        assert!(mr4_metrics_accuracy(5, 20, 3, seed));
        assert!(mr5_deterministic_behavior(4, vec![1, 2, 1, 3, 1, 2], seed));
    }

    /// Boundary inputs: minimal and maximal sizes, empty and single-element
    /// sequences, and a zero-length queue.
    #[test]
    fn test_bulkhead_edge_cases() {
        // Smallest configurations.
        assert!(mr1_isolation_invariant(1, 1, 2, 1111));
        assert!(mr2_rejection_accuracy(1, 1, 5, 2222));

        // Largest configurations used by the property tests.
        assert!(mr1_isolation_invariant(4, 8, 40, 3333));
        assert!(mr2_rejection_accuracy(8, 10, 30, 4444));

        // Empty and single-element sequences.
        assert!(mr5_deterministic_behavior(2, vec![], 5555));
        assert!(mr5_deterministic_behavior(3, vec![1], 6666));

        // No queue at all.
        assert!(mr2_rejection_accuracy(1, 0, 10, 7777));
    }

    /// Isolation under a burst-shaped configuration.
    #[test]
    fn test_burst_load_patterns() {
        let BurstTestConfig {
            bulkhead_count,
            workers_per_bulkhead,
            total_work_units,
            ..
        } = BurstTestConfig {
            bulkhead_count: 2,
            workers_per_bulkhead: 3,
            queue_size_per_bulkhead: 5,
            total_work_units: 20,
            work_pattern: WorkPattern::Burst { burst_ratio: 0.8 },
            cancellation_ratio: 0.1,
        };

        assert!(mr1_isolation_invariant(
            bulkhead_count,
            workers_per_bulkhead,
            total_work_units,
            8888
        ));
    }

    /// Cancel propagation with more queued than running, fewer queued than
    /// running, and a single worker.
    #[test]
    fn test_cancellation_scenarios() {
        for (worker_count, in_flight_count, seed) in [(4, 8, 9999), (6, 2, 1010), (1, 5, 1212)] {
            assert!(mr3_cancel_propagation(worker_count, in_flight_count, seed));
        }
    }
}