use super::protocol_state_machines::{
CancelProtocolValidator, CancelStateMachine, ObligationContext, ObligationEvent,
ObligationStateMachine, RegionContext, RegionEvent, RegionStateMachine, TaskContext, TaskEvent,
TaskState, TaskStateMachine, TransitionResult, ValidationLevel,
};
use crate::types::{ObligationId, RegionId, TaskId, Time};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
#[cfg(test)]
use proptest::prelude::*;
#[derive(Debug, Clone)]
pub struct BugInjectionConfig {
pub violation_types: Vec<ProtocolViolationType>,
pub injection_probability: f64,
pub random_seed: Option<u64>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtocolViolationType {
RegionSkipDrain,
TaskCompleteAfterCancel,
ObligationDoubleCommit,
ObligationDoubleAbort,
ChannelUseAfterClose,
InvalidStateTransition,
ResourceLeak,
StateUpdateRace,
}
pub struct BugInjector {
config: BugInjectionConfig,
injected_bugs: AtomicU64,
detected_bugs: AtomicU64,
random_state: AtomicU64,
}
impl BugInjector {
pub fn new(config: BugInjectionConfig) -> Self {
Self {
random_state: AtomicU64::new(config.random_seed.unwrap_or(42)),
config,
injected_bugs: AtomicU64::new(0),
detected_bugs: AtomicU64::new(0),
}
}
pub fn should_inject(&self, violation_type: ProtocolViolationType) -> bool {
if !self.config.violation_types.contains(&violation_type) {
return false;
}
let current = self.random_state.load(Ordering::Relaxed);
let next = current.wrapping_mul(1103515245).wrapping_add(12345);
self.random_state.store(next, Ordering::Relaxed);
let probability = (next % 1000) as f64 / 1000.0;
probability < self.config.injection_probability
}
pub fn record_injection(&self, _violation_type: ProtocolViolationType) {
self.injected_bugs.fetch_add(1, Ordering::Relaxed);
}
pub fn record_detection(&self) {
self.detected_bugs.fetch_add(1, Ordering::Relaxed);
}
pub fn stats(&self) -> BugInjectionStats {
let injected = self.injected_bugs.load(Ordering::Relaxed);
let detected = self.detected_bugs.load(Ordering::Relaxed);
BugInjectionStats {
bugs_injected: injected,
bugs_detected: detected,
detection_rate: if injected == 0 {
1.0
} else {
detected as f64 / injected as f64
},
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct BugInjectionStats {
pub bugs_injected: u64,
pub bugs_detected: u64,
pub detection_rate: f64,
}
#[cfg(test)]
pub fn region_event_strategy() -> impl Strategy<Value = RegionEvent> {
prop_oneof![
Just(RegionEvent::Activate),
Just(RegionEvent::TaskSpawned),
Just(RegionEvent::TaskCompleted),
Just(RegionEvent::TaskDrained),
Just(RegionEvent::Cancel {
reason: "property cancel".to_string(),
}),
Just(RegionEvent::FinalizerRegistered),
Just(RegionEvent::FinalizerStarted),
Just(RegionEvent::FinalizerCompleted),
Just(RegionEvent::RequestClose),
]
}
#[cfg(test)]
pub fn task_event_strategy() -> impl Strategy<Value = TaskEvent> {
prop_oneof![
Just(TaskEvent::Start),
Just(TaskEvent::Complete),
Just(TaskEvent::RequestCancel),
Just(TaskEvent::DrainComplete),
prop::string::string_regex(r"[a-zA-Z0-9 ]{1,50}")
.unwrap()
.prop_map(|msg| TaskEvent::Panic { message: msg }),
]
}
#[cfg(test)]
pub fn obligation_event_strategy() -> impl Strategy<Value = ObligationEvent> {
prop_oneof![
Just(ObligationEvent::Reserve { token: 1 }),
Just(ObligationEvent::Commit),
Just(ObligationEvent::Abort {
reason: "property abort".to_string(),
}),
]
}
pub struct PropertyTestHarness {
validation_level: ValidationLevel,
bug_injector: Option<BugInjector>,
}
impl PropertyTestHarness {
pub fn new(validation_level: ValidationLevel, bug_injector: Option<BugInjector>) -> Self {
Self {
validation_level,
bug_injector,
}
}
pub fn test_region_transitions(&mut self, events: Vec<RegionEvent>) -> Result<(), String> {
let region_id = RegionId::new_for_test(10, 0);
let context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: self.validation_level,
};
let mut state_machine = RegionStateMachine::new(region_id, self.validation_level);
for (i, event) in events.iter().enumerate() {
if let Some(ref injector) = self.bug_injector {
if matches!(event, RegionEvent::TaskDrained)
&& injector.should_inject(ProtocolViolationType::RegionSkipDrain)
{
injector.record_injection(ProtocolViolationType::RegionSkipDrain);
let result = state_machine.transition(RegionEvent::RequestClose, &context);
if !result.is_valid() {
injector.record_detection();
return Err(format!(
"Injected skipped-drain violation detected at step {i}: {result:?}"
));
}
}
}
let result = state_machine.transition(event.clone(), &context);
if !result.is_valid() {
if let Some(ref injector) = self.bug_injector {
injector.record_detection();
}
return Err(format!("Invalid transition at step {i}: {result:?}"));
}
}
Ok(())
}
pub fn test_task_transitions(&mut self, events: Vec<TaskEvent>) -> Result<(), String> {
let task_id = TaskId::new_for_test(20, 0);
let region_id = RegionId::new_for_test(20, 0);
let context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: self.validation_level,
};
let mut state_machine = TaskStateMachine::new(task_id, region_id, self.validation_level);
for (i, event) in events.iter().enumerate() {
let mut event = event.clone();
if let Some(ref injector) = self.bug_injector {
if matches!(state_machine.current_state(), TaskState::CancelRequested)
&& matches!(event, TaskEvent::DrainComplete)
&& injector.should_inject(ProtocolViolationType::TaskCompleteAfterCancel)
{
injector.record_injection(ProtocolViolationType::TaskCompleteAfterCancel);
event = TaskEvent::Complete;
}
}
let result = state_machine.transition(event, &context);
if !result.is_valid() {
if let Some(ref injector) = self.bug_injector {
injector.record_detection();
}
return Err(format!("Invalid transition at step {i}: {result:?}"));
}
}
Ok(())
}
pub fn test_obligation_transitions(
&mut self,
events: Vec<ObligationEvent>,
) -> Result<(), String> {
let obligation_id = ObligationId::new_for_test(30, 0);
let context = ObligationContext {
obligation_id,
region_id: RegionId::new_for_test(30, 0),
created_at: Time::ZERO,
validation_level: self.validation_level,
};
let mut state_machine = ObligationStateMachine::new(obligation_id, self.validation_level);
for (i, event) in events.iter().enumerate() {
let event = event.clone();
if let Some(ref injector) = self.bug_injector {
let inject_double_commit = matches!(event, ObligationEvent::Commit)
&& injector.should_inject(ProtocolViolationType::ObligationDoubleCommit);
let inject_double_abort = matches!(event, ObligationEvent::Abort { .. })
&& injector.should_inject(ProtocolViolationType::ObligationDoubleAbort);
if inject_double_commit || inject_double_abort {
let first = state_machine.transition(event.clone(), &context);
if !first.is_valid() {
injector.record_detection();
return Err(format!(
"Injected obligation setup failed at step {i}: {first:?}"
));
}
let injected_kind = if inject_double_commit {
ProtocolViolationType::ObligationDoubleCommit
} else {
ProtocolViolationType::ObligationDoubleAbort
};
injector.record_injection(injected_kind);
let second = state_machine.transition(event, &context);
if !second.is_valid() {
injector.record_detection();
return Err(format!(
"Injected duplicate obligation violation detected at step {i}: {second:?}"
));
}
continue;
}
}
let result = state_machine.transition(event, &context);
if !result.is_valid() {
if let Some(ref injector) = self.bug_injector {
injector.record_detection();
}
return Err(format!("Invalid transition at step {i}: {result:?}"));
}
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct PerformanceMeasurement {
pub validation_overhead_pct: f64,
pub memory_overhead_bytes: u64,
pub avg_latency_ns: u64,
pub p99_latency_ns: u64,
pub throughput_ops_per_sec: f64,
}
#[derive(Debug, Clone)]
pub struct PerformanceTestConfig {
pub num_operations: usize,
pub num_warmup: usize,
pub validation_level: ValidationLevel,
}
pub struct PerformanceTestHarness {
config: PerformanceTestConfig,
}
impl PerformanceTestHarness {
pub fn new(config: PerformanceTestConfig) -> Self {
Self { config }
}
pub fn measure_validation_overhead(&self) -> PerformanceMeasurement {
let with_validation = self.run_validation_benchmark(true);
let without_validation = self.run_validation_benchmark(false);
let overhead_pct = if without_validation.total_time_ns == 0 {
0.0
} else {
((with_validation.total_time_ns as f64 - without_validation.total_time_ns as f64)
/ without_validation.total_time_ns as f64)
* 100.0
};
PerformanceMeasurement {
validation_overhead_pct: overhead_pct,
memory_overhead_bytes: with_validation
.memory_usage
.saturating_sub(without_validation.memory_usage),
avg_latency_ns: with_validation.avg_latency_ns,
p99_latency_ns: with_validation.p99_latency_ns,
throughput_ops_per_sec: with_validation.throughput_ops_per_sec,
}
}
fn run_validation_benchmark(&self, enable_validation: bool) -> BenchmarkResult {
let validation_level = if enable_validation {
self.config.validation_level
} else {
ValidationLevel::None
};
let mut validator = CancelProtocolValidator::new(validation_level);
let mut latencies = Vec::with_capacity(self.config.num_operations);
for _ in 0..self.config.num_warmup {
let _ = self.simulate_cancel_protocol_operation(&mut validator);
}
let memory_before = Self::estimate_memory_usage(&validator);
let start_time = Instant::now();
for _ in 0..self.config.num_operations {
let op_start = Instant::now();
let _ = self.simulate_cancel_protocol_operation(&mut validator);
let op_duration = op_start.elapsed();
latencies.push(op_duration.as_nanos() as u64);
}
let total_time = start_time.elapsed();
let memory_after = Self::estimate_memory_usage(&validator);
if latencies.is_empty() {
return BenchmarkResult {
total_time_ns: total_time.as_nanos() as u64,
memory_usage: memory_after.saturating_sub(memory_before),
avg_latency_ns: 0,
p99_latency_ns: 0,
throughput_ops_per_sec: 0.0,
};
}
latencies.sort_unstable();
let avg_latency_ns = latencies.iter().sum::<u64>() / latencies.len() as u64;
let p99_index = (latencies.len() as f64 * 0.99) as usize;
let p99_latency_ns = latencies[p99_index.min(latencies.len() - 1)];
let throughput_ops_per_sec = self.config.num_operations as f64 / total_time.as_secs_f64();
BenchmarkResult {
total_time_ns: total_time.as_nanos() as u64,
memory_usage: memory_after.saturating_sub(memory_before),
avg_latency_ns,
p99_latency_ns,
throughput_ops_per_sec,
}
}
fn simulate_cancel_protocol_operation(
&self,
validator: &mut CancelProtocolValidator,
) -> Result<(), String> {
let region_id = RegionId::new_for_test(40, 0);
let task_id = TaskId::new_for_test(40, 0);
let obligation_id = ObligationId::new_for_test(40, 0);
validator.register_region(region_id);
validator.register_task(task_id, region_id);
validator.register_obligation(obligation_id);
let region_context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: self.config.validation_level,
};
let task_context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: self.config.validation_level,
};
let obligation_context = ObligationContext {
obligation_id,
region_id,
created_at: Time::ZERO,
validation_level: self.config.validation_level,
};
Self::ensure_valid(validator.validate_region_transition(
region_id,
RegionEvent::Activate,
®ion_context,
))?;
Self::ensure_valid(validator.validate_region_transition(
region_id,
RegionEvent::TaskSpawned,
®ion_context,
))?;
Self::ensure_valid(validator.validate_task_transition(
task_id,
TaskEvent::Start,
&task_context,
))?;
Self::ensure_valid(validator.validate_obligation_transition(
obligation_id,
ObligationEvent::Reserve { token: 40 },
&obligation_context,
))?;
Self::ensure_valid(validator.validate_obligation_transition(
obligation_id,
ObligationEvent::Commit,
&obligation_context,
))?;
Self::ensure_valid(validator.validate_task_transition(
task_id,
TaskEvent::Complete,
&task_context,
))?;
Self::ensure_valid(validator.validate_region_transition(
region_id,
RegionEvent::TaskCompleted,
®ion_context,
))?;
Self::ensure_valid(validator.validate_region_transition(
region_id,
RegionEvent::RequestClose,
®ion_context,
))?;
Ok(())
}
fn ensure_valid(result: TransitionResult) -> Result<(), String> {
if result.is_valid() {
Ok(())
} else {
Err(format!("validator transition failed: {result:?}"))
}
}
fn estimate_memory_usage(validator: &CancelProtocolValidator) -> u64 {
let (regions, tasks, obligations, channels, io_ops, timers, _) = validator.stats();
let bytes = regions * std::mem::size_of::<RegionStateMachine>()
+ tasks * std::mem::size_of::<TaskStateMachine>()
+ obligations * std::mem::size_of::<ObligationStateMachine>()
+ channels * std::mem::size_of::<super::protocol_state_machines::ChannelStateMachine>()
+ io_ops * std::mem::size_of::<super::protocol_state_machines::IoStateMachine>()
+ timers * std::mem::size_of::<super::protocol_state_machines::TimerStateMachine>();
bytes as u64
}
}
#[derive(Debug, Clone)]
struct BenchmarkResult {
total_time_ns: u64,
memory_usage: u64,
avg_latency_ns: u64,
p99_latency_ns: u64,
throughput_ops_per_sec: f64,
}
#[derive(Debug, Clone)]
pub struct IntegrationTestConfig {
pub num_concurrent_regions: usize,
pub num_tasks_per_region: usize,
pub num_obligations_per_task: usize,
pub validation_level: ValidationLevel,
}
pub struct IntegrationTestHarness {
config: IntegrationTestConfig,
}
impl IntegrationTestHarness {
pub fn new(config: IntegrationTestConfig) -> Self {
Self { config }
}
pub fn test_concurrent_regions(&self) -> Result<(), String> {
for i in 0..self.config.num_concurrent_regions {
let config = self.config.clone();
Self::simulate_region_lifecycle_sync(i, config)?;
}
Ok(())
}
fn simulate_region_lifecycle_sync(
region_idx: usize,
config: IntegrationTestConfig,
) -> Result<(), String> {
let mut validator = CancelProtocolValidator::new(config.validation_level);
let region_index = 100 + region_idx as u32;
let region_id = RegionId::new_for_test(region_index, 0);
validator.register_region(region_id);
let region_context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: config.validation_level,
};
PerformanceTestHarness::ensure_valid(validator.validate_region_transition(
region_id,
RegionEvent::Activate,
®ion_context,
))?;
for task_idx in 0..config.num_tasks_per_region {
let task_id = TaskId::new_for_test(region_index, task_idx as u32);
validator.register_task(task_id, region_id);
let task_context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: config.validation_level,
};
PerformanceTestHarness::ensure_valid(validator.validate_region_transition(
region_id,
RegionEvent::TaskSpawned,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(validator.validate_task_transition(
task_id,
TaskEvent::Start,
&task_context,
))?;
for obligation_idx in 0..config.num_obligations_per_task {
let obligation_id = ObligationId::new_for_test(
region_index,
(task_idx * config.num_obligations_per_task + obligation_idx) as u32,
);
validator.register_obligation(obligation_id);
let obligation_context = ObligationContext {
obligation_id,
region_id,
created_at: Time::ZERO,
validation_level: config.validation_level,
};
PerformanceTestHarness::ensure_valid(validator.validate_obligation_transition(
obligation_id,
ObligationEvent::Reserve {
token: 1 + obligation_idx as u64,
},
&obligation_context,
))?;
PerformanceTestHarness::ensure_valid(validator.validate_obligation_transition(
obligation_id,
ObligationEvent::Commit,
&obligation_context,
))?;
}
PerformanceTestHarness::ensure_valid(validator.validate_task_transition(
task_id,
TaskEvent::Complete,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(validator.validate_region_transition(
region_id,
RegionEvent::TaskCompleted,
®ion_context,
))?;
}
PerformanceTestHarness::ensure_valid(validator.validate_region_transition(
region_id,
RegionEvent::RequestClose,
®ion_context,
))?;
Ok(())
}
pub fn test_error_reporting(&self) -> Result<(), String> {
let mut validator = CancelProtocolValidator::new(self.config.validation_level);
let region_id = RegionId::new_for_test(200, 0);
let context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: self.config.validation_level,
};
let result =
validator.validate_region_transition(region_id, RegionEvent::RequestClose, &context);
match result {
TransitionResult::Invalid { reason, .. } if reason.contains("not registered") => Ok(()),
other => Err(format!("unexpected validation result: {other:?}")),
}
}
pub fn test_validation_level_config(&self) -> Result<(), String> {
let test_cases = [
ValidationLevel::None,
ValidationLevel::Basic,
ValidationLevel::Full,
ValidationLevel::Debug,
];
for (idx, level) in test_cases.into_iter().enumerate() {
Self::simulate_region_lifecycle_sync(
idx,
IntegrationTestConfig {
num_concurrent_regions: 1,
num_tasks_per_region: 1,
num_obligations_per_task: 1,
validation_level: level,
},
)?;
}
Ok(())
}
}
pub struct FalsePositiveTestHarness {
validator: CancelProtocolValidator,
}
impl FalsePositiveTestHarness {
pub fn new(validation_level: ValidationLevel) -> Self {
Self {
validator: CancelProtocolValidator::new(validation_level),
}
}
pub fn test_valid_sequences(&mut self) -> Result<(), String> {
let test_sequences = vec![
self.test_simple_region_lifecycle(),
self.test_nested_region_lifecycle(),
self.test_concurrent_task_completion(),
self.test_obligation_lifecycle(),
self.test_cancel_propagation(),
self.test_cancel_propagation_nested(),
];
for (i, result) in test_sequences.into_iter().enumerate() {
result.map_err(|e| format!("Valid sequence {} failed validation: {}", i, e))?;
}
Ok(())
}
fn test_cancel_propagation_nested(&mut self) -> Result<(), String> {
let parent = RegionId::new_for_test(305, 0);
let child = RegionId::new_for_test(305, 1);
let grandchild = RegionId::new_for_test(305, 2);
let parent_task = TaskId::new_for_test(305, 0);
let child_task = TaskId::new_for_test(305, 1);
let grandchild_task = TaskId::new_for_test(305, 2);
let region_ctx = |region_id: RegionId, parent_id: Option<RegionId>| RegionContext {
region_id,
parent_region: parent_id,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let task_ctx = |task_id: TaskId, region_id: RegionId| TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_region(parent);
self.validator.register_region(child);
self.validator.register_region(grandchild);
self.validator.register_task(parent_task, parent);
self.validator.register_task(child_task, child);
self.validator.register_task(grandchild_task, grandchild);
for (rid, parent_of_rid) in [
(parent, None),
(child, Some(parent)),
(grandchild, Some(child)),
] {
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
rid,
RegionEvent::Activate,
®ion_ctx(rid, parent_of_rid),
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
rid,
RegionEvent::TaskSpawned,
®ion_ctx(rid, parent_of_rid),
))?;
}
for (tid, rid) in [
(parent_task, parent),
(child_task, child),
(grandchild_task, grandchild),
] {
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
tid,
TaskEvent::Start,
&task_ctx(tid, rid),
))?;
}
for (rid, parent_of_rid) in [
(parent, None),
(child, Some(parent)),
(grandchild, Some(child)),
] {
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
rid,
RegionEvent::Cancel {
reason: format!("nested cancel from {parent:?}"),
},
®ion_ctx(rid, parent_of_rid),
))?;
}
for (tid, rid) in [
(grandchild_task, grandchild),
(child_task, child),
(parent_task, parent),
] {
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
tid,
TaskEvent::RequestCancel,
&task_ctx(tid, rid),
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
tid,
TaskEvent::DrainComplete,
&task_ctx(tid, rid),
))?;
}
for (rid, parent_of_rid) in [
(grandchild, Some(child)),
(child, Some(parent)),
(parent, None),
] {
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
rid,
RegionEvent::TaskDrained,
®ion_ctx(rid, parent_of_rid),
))?;
}
Ok(())
}
fn test_simple_region_lifecycle(&mut self) -> Result<(), String> {
let region_id = RegionId::new_for_test(300, 0);
let task_id = TaskId::new_for_test(300, 0);
let region_context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let task_context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_region(region_id);
self.validator.register_task(task_id, region_id);
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::Activate,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::TaskSpawned,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::Start,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::Complete,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::TaskCompleted,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::RequestClose,
®ion_context,
))?;
Ok(())
}
fn test_nested_region_lifecycle(&mut self) -> Result<(), String> {
let parent_region = RegionId::new_for_test(301, 0);
let child_region = RegionId::new_for_test(301, 1);
let parent_context = RegionContext {
region_id: parent_region,
parent_region: None,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let child_context = RegionContext {
region_id: child_region,
parent_region: Some(parent_region),
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_region(parent_region);
self.validator.register_region(child_region);
for (region_id, context) in [
(parent_region, &parent_context),
(child_region, &child_context),
] {
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::Activate,
context,
))?;
}
for (region_id, context) in [
(child_region, &child_context),
(parent_region, &parent_context),
] {
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::RequestClose,
context,
))?;
}
Ok(())
}
fn test_concurrent_task_completion(&mut self) -> Result<(), String> {
let region_id = RegionId::new_for_test(302, 0);
let region_context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_region(region_id);
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::Activate,
®ion_context,
))?;
let task_ids: Vec<_> = (0..5).map(|idx| TaskId::new_for_test(302, idx)).collect();
for &task_id in &task_ids {
let task_context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_task(task_id, region_id);
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::TaskSpawned,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::Start,
&task_context,
))?;
}
for &task_id in task_ids.iter().rev() {
let task_context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::Complete,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::TaskCompleted,
®ion_context,
))?;
}
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::RequestClose,
®ion_context,
))?;
Ok(())
}
fn test_obligation_lifecycle(&mut self) -> Result<(), String> {
let region_id = RegionId::new_for_test(303, 0);
let task_id = TaskId::new_for_test(303, 0);
let obligation_id = ObligationId::new_for_test(303, 0);
let region_context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let task_context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let obligation_context = ObligationContext {
obligation_id,
region_id,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_region(region_id);
self.validator.register_task(task_id, region_id);
self.validator.register_obligation(obligation_id);
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::Activate,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::TaskSpawned,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::Start,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_obligation_transition(
obligation_id,
ObligationEvent::Reserve { token: 303 },
&obligation_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_obligation_transition(
obligation_id,
ObligationEvent::Commit,
&obligation_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::Complete,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::TaskCompleted,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::RequestClose,
®ion_context,
))?;
Ok(())
}
fn test_cancel_propagation(&mut self) -> Result<(), String> {
let region_id = RegionId::new_for_test(304, 0);
let task_id = TaskId::new_for_test(304, 0);
let region_context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let task_context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_region(region_id);
self.validator.register_task(task_id, region_id);
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::Activate,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::TaskSpawned,
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::Start,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::Cancel {
reason: "test cancel".to_string(),
},
®ion_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::RequestCancel,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_task_transition(
task_id,
TaskEvent::DrainComplete,
&task_context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::TaskDrained,
®ion_context,
))?;
Ok(())
}
pub fn test_edge_cases(&mut self) -> Result<(), String> {
for idx in 0..100 {
let region_id = RegionId::new_for_test(400, idx);
let context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_region(region_id);
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::Activate,
&context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::RequestClose,
&context,
))?;
}
let region_id = RegionId::new_for_test(401, 0);
let task_id = TaskId::new_for_test(401, 0);
let context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
self.validator.register_region(region_id);
self.validator.register_task(task_id, region_id);
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::Activate,
&context,
))?;
PerformanceTestHarness::ensure_valid(self.validator.validate_region_transition(
region_id,
RegionEvent::RequestClose,
&context,
))?;
Ok(())
}
}
pub struct CancelProtocolTestSuite {
pub bug_injection: BugInjectionStats,
pub performance: PerformanceMeasurement,
pub false_positive_count: u64,
pub total_tests_run: u64,
}
impl CancelProtocolTestSuite {
pub fn run_full_suite() -> Result<Self, String> {
let mut total_tests = 0u64;
let false_positives = 0u64;
let bug_injection_config = BugInjectionConfig {
violation_types: vec![
ProtocolViolationType::RegionSkipDrain,
ProtocolViolationType::TaskCompleteAfterCancel,
ProtocolViolationType::ObligationDoubleCommit,
],
injection_probability: 1.0,
random_seed: Some(42),
};
let bug_injector = BugInjector::new(bug_injection_config);
let mut property_harness =
PropertyTestHarness::new(ValidationLevel::Full, Some(bug_injector));
total_tests += 1;
let _ = property_harness.test_task_transitions(vec![
TaskEvent::Start,
TaskEvent::RequestCancel,
TaskEvent::DrainComplete,
]);
let _ = property_harness.test_obligation_transitions(vec![
ObligationEvent::Reserve { token: 1 },
ObligationEvent::Commit,
]);
let bug_injection_stats = property_harness.bug_injector.as_ref().unwrap().stats();
if bug_injection_stats.bugs_injected != bug_injection_stats.bugs_detected {
return Err(format!(
"bug injection detection mismatch: injected={}, detected={}",
bug_injection_stats.bugs_injected, bug_injection_stats.bugs_detected
));
}
let perf_config = PerformanceTestConfig {
num_operations: 256,
num_warmup: 32,
validation_level: ValidationLevel::Full,
};
let perf_harness = PerformanceTestHarness::new(perf_config);
let performance_results = perf_harness.measure_validation_overhead();
total_tests += 1;
let mut fp_harness = FalsePositiveTestHarness::new(ValidationLevel::Full);
match fp_harness.test_valid_sequences() {
Ok(_) => {}
Err(e) => {
return Err(format!("false positive detected: {e}"));
}
}
total_tests += 1;
let integration_config = IntegrationTestConfig {
num_concurrent_regions: 10,
num_tasks_per_region: 5,
num_obligations_per_task: 2,
validation_level: ValidationLevel::Full,
};
let integration_harness = IntegrationTestHarness::new(integration_config);
integration_harness.test_error_reporting()?;
integration_harness.test_validation_level_config()?;
total_tests += 2;
Ok(Self {
bug_injection: bug_injection_stats,
performance: performance_results,
false_positive_count: false_positives,
total_tests_run: total_tests,
})
}
pub fn generate_report(&self) -> String {
format!(
r"
# Cancel Protocol Validator Test Suite Results
## Summary
- Total tests run: {}
- False positives: {}
- Bug detection rate: {:.2}%
## Bug Injection Testing
- Bugs injected: {}
- Bugs detected: {}
- Detection rate: {:.2}%
## Performance Testing
- Validation overhead: {:.2}%
- Memory overhead: {} bytes
- Average latency: {} ns
- P99 latency: {} ns
- Throughput: {:.0} ops/sec
## Performance Targets
- Debug overhead target: <5% (actual: {:.2}%)
- Production overhead target: <0.1% (estimated from debug)
- Memory overhead: acceptable if <1MB per 1000 entities
## Recommendations
{}
",
self.total_tests_run,
self.false_positive_count,
if self.bug_injection.bugs_injected > 0 {
self.bug_injection.detection_rate * 100.0
} else {
100.0
},
self.bug_injection.bugs_injected,
self.bug_injection.bugs_detected,
self.bug_injection.detection_rate * 100.0,
self.performance.validation_overhead_pct,
self.performance.memory_overhead_bytes,
self.performance.avg_latency_ns,
self.performance.p99_latency_ns,
self.performance.throughput_ops_per_sec,
self.performance.validation_overhead_pct,
self.generate_recommendations()
)
}
fn generate_recommendations(&self) -> String {
let mut recommendations = Vec::new();
if self.bug_injection.detection_rate < 1.0 {
recommendations
.push("- Improve bug detection: some injected violations were not caught");
}
if self.performance.validation_overhead_pct > 5.0 {
recommendations.push("- Optimize validation performance: overhead exceeds 5% target");
}
if self.false_positive_count > 0 {
recommendations
.push("- Fix false positives: valid operations should never trigger assertions");
}
if self.performance.memory_overhead_bytes > 1024 * 1024 {
recommendations.push("- Optimize memory usage: overhead exceeds 1MB guidelines");
}
if recommendations.is_empty() {
recommendations.push("- All tests passed within acceptable parameters");
}
recommendations.join("\n")
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use insta::{assert_json_snapshot, assert_snapshot};
use serde_json::{Value, json};
fn render_transition_result(result: &TransitionResult) -> Value {
match result {
TransitionResult::Valid => json!({
"kind": "valid",
}),
TransitionResult::Invalid {
reason,
current_state,
attempted_transition,
} => json!({
"kind": "invalid",
"reason": reason,
"current_state": current_state,
"attempted_transition": attempted_transition,
}),
TransitionResult::InvariantViolation { invariant, context } => json!({
"kind": "invariant_violation",
"invariant": invariant,
"context": context,
}),
}
}
fn render_validator_stats(validator: &CancelProtocolValidator) -> Value {
let (regions, tasks, obligations, channels, io_ops, timers, violations) = validator.stats();
json!({
"regions": regions,
"tasks": tasks,
"obligations": obligations,
"channels": channels,
"io_operations": io_ops,
"timers": timers,
"violations": violations,
})
}
fn render_transition_step<E: std::fmt::Debug>(
step: usize,
event: &E,
result: &TransitionResult,
) -> Value {
json!({
"step": step,
"event": format!("{event:?}"),
"result": render_transition_result(result),
})
}
#[test]
fn test_bug_injector_creation() {
let config = BugInjectionConfig {
violation_types: vec![ProtocolViolationType::RegionSkipDrain],
injection_probability: 0.5,
random_seed: Some(42),
};
let injector = BugInjector::new(config);
let stats = injector.stats();
assert_eq!(stats.bugs_injected, 0);
assert_eq!(stats.bugs_detected, 0);
assert_eq!(stats.detection_rate, 1.0);
}
#[test]
fn test_performance_harness_creation() {
let config = PerformanceTestConfig {
num_operations: 100,
num_warmup: 10,
validation_level: ValidationLevel::Full,
};
let harness = PerformanceTestHarness::new(config);
assert!(harness.config.num_operations == 100);
}
#[test]
fn test_property_harness_basic() {
let mut harness = PropertyTestHarness::new(ValidationLevel::Full, None);
let events = vec![
RegionEvent::Activate,
RegionEvent::TaskSpawned,
RegionEvent::TaskCompleted,
RegionEvent::RequestClose,
];
assert!(harness.test_region_transitions(events).is_ok());
}
#[test]
fn test_false_positive_harness() {
let mut harness = FalsePositiveTestHarness::new(ValidationLevel::Full);
assert!(harness.test_simple_region_lifecycle().is_ok());
assert!(harness.test_edge_cases().is_ok());
}
#[test]
fn test_integration_harness_config() {
let config = IntegrationTestConfig {
num_concurrent_regions: 5,
num_tasks_per_region: 3,
num_obligations_per_task: 1,
validation_level: ValidationLevel::Full,
};
let harness = IntegrationTestHarness::new(config);
assert!(harness.test_validation_level_config().is_ok());
}
#[test]
fn golden_task_clean_cancel_sequence() {
let task_id = TaskId::new_for_test(401, 0);
let region_id = RegionId::new_for_test(401, 0);
let context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let mut machine = TaskStateMachine::new(task_id, region_id, ValidationLevel::Full);
let events = vec![
TaskEvent::Start,
TaskEvent::RequestCancel,
TaskEvent::DrainComplete,
];
let steps: Vec<_> = events
.into_iter()
.enumerate()
.map(|(step, event)| {
let result = machine.transition(event.clone(), &context);
render_transition_step(step, &event, &result)
})
.collect();
assert_json_snapshot!(
"cancel_protocol_task_clean_cancel_sequence",
json!({
"scenario": "task_clean_cancel_sequence",
"steps": steps,
"final_state": format!("{:?}", machine.current_state()),
"state_description": machine.state_description(),
"is_terminal": machine.is_terminal(),
})
);
}
#[test]
fn golden_task_panic_after_cancel_request() {
let task_id = TaskId::new_for_test(402, 0);
let region_id = RegionId::new_for_test(402, 0);
let context = TaskContext {
task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let mut machine = TaskStateMachine::new(task_id, region_id, ValidationLevel::Full);
let events = vec![
TaskEvent::Start,
TaskEvent::RequestCancel,
TaskEvent::Panic {
message: "panic during finalize".to_string(),
},
];
let steps: Vec<_> = events
.into_iter()
.enumerate()
.map(|(step, event)| {
let result = machine.transition(event.clone(), &context);
render_transition_step(step, &event, &result)
})
.collect();
assert_json_snapshot!(
"cancel_protocol_task_panic_after_cancel_request",
json!({
"scenario": "task_panic_after_cancel_request",
"steps": steps,
"final_state": format!("{:?}", machine.current_state()),
"state_description": machine.state_description(),
"is_terminal": machine.is_terminal(),
})
);
}
#[test]
fn golden_obligation_abort_sequence() {
let obligation_id = ObligationId::new_for_test(403, 0);
let context = ObligationContext {
obligation_id,
region_id: RegionId::new_for_test(403, 0),
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let mut machine = ObligationStateMachine::new(obligation_id, ValidationLevel::Full);
let events = vec![
ObligationEvent::Reserve { token: 7 },
ObligationEvent::Abort {
reason: "race loser aborted".to_string(),
},
];
let steps: Vec<_> = events
.into_iter()
.enumerate()
.map(|(step, event)| {
let result = machine.transition(event.clone(), &context);
render_transition_step(step, &event, &result)
})
.collect();
assert_json_snapshot!(
"cancel_protocol_obligation_abort_sequence",
json!({
"scenario": "obligation_abort_sequence",
"steps": steps,
"final_state": format!("{:?}", machine.current_state()),
"state_description": machine.state_description(),
"is_terminal": machine.is_terminal(),
})
);
}
#[test]
fn golden_obligation_duplicate_commit_violation() {
let obligation_id = ObligationId::new_for_test(404, 0);
let context = ObligationContext {
obligation_id,
region_id: RegionId::new_for_test(404, 0),
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let mut machine = ObligationStateMachine::new(obligation_id, ValidationLevel::Full);
let events = vec![
ObligationEvent::Reserve { token: 7 },
ObligationEvent::Commit,
ObligationEvent::Commit,
];
let steps: Vec<_> = events
.into_iter()
.enumerate()
.map(|(step, event)| {
let result = machine.transition(event.clone(), &context);
render_transition_step(step, &event, &result)
})
.collect();
assert_json_snapshot!(
"cancel_protocol_obligation_duplicate_commit_violation",
json!({
"scenario": "obligation_duplicate_commit_violation",
"steps": steps,
"final_state": format!("{:?}", machine.current_state()),
"state_description": machine.state_description(),
"is_terminal": machine.is_terminal(),
})
);
}
#[test]
fn golden_validator_diagnostic_matrix() {
let mut validator = CancelProtocolValidator::new(ValidationLevel::Full);
let region_id = RegionId::new_for_test(405, 0);
let registered_obligation_id = ObligationId::new_for_test(405, 0);
let unregistered_task_id = TaskId::new_for_test(406, 0);
let unregistered_obligation_id = ObligationId::new_for_test(406, 0);
let region_context = RegionContext {
region_id,
parent_region: None,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let task_context = TaskContext {
task_id: unregistered_task_id,
region_id,
spawned_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let registered_obligation_context = ObligationContext {
obligation_id: registered_obligation_id,
region_id,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
let unregistered_obligation_context = ObligationContext {
obligation_id: unregistered_obligation_id,
region_id,
created_at: Time::ZERO,
validation_level: ValidationLevel::Full,
};
validator.register_region(region_id);
validator.register_obligation(registered_obligation_id);
let activate =
validator.validate_region_transition(region_id, RegionEvent::Activate, ®ion_context);
let region_invalid = validator.validate_region_transition(
region_id,
RegionEvent::TaskCompleted,
®ion_context,
);
let zero_token_invariant = validator.validate_obligation_transition(
registered_obligation_id,
ObligationEvent::Reserve { token: 0 },
®istered_obligation_context,
);
let unregistered_task = validator.validate_task_transition(
unregistered_task_id,
TaskEvent::Complete,
&task_context,
);
let unregistered_obligation = validator.validate_obligation_transition(
unregistered_obligation_id,
ObligationEvent::Commit,
&unregistered_obligation_context,
);
assert_json_snapshot!(
"cancel_protocol_validator_diagnostic_matrix",
json!({
"scenario": "validator_diagnostic_matrix",
"activate": render_transition_result(&activate),
"region_invalid_complete_without_tasks": render_transition_result(®ion_invalid),
"zero_token_invariant_violation": render_transition_result(&zero_token_invariant),
"unregistered_task": render_transition_result(&unregistered_task),
"unregistered_obligation": render_transition_result(&unregistered_obligation),
"validator_stats": render_validator_stats(&validator),
"violation_count": validator.violation_count(),
})
);
}
#[test]
fn golden_cancel_protocol_test_suite_report() {
let suite = CancelProtocolTestSuite {
bug_injection: BugInjectionStats {
bugs_injected: 4,
bugs_detected: 3,
detection_rate: 0.75,
},
performance: PerformanceMeasurement {
validation_overhead_pct: 6.25,
memory_overhead_bytes: 2_048,
avg_latency_ns: 128,
p99_latency_ns: 512,
throughput_ops_per_sec: 4_096.0,
},
false_positive_count: 1,
total_tests_run: 16,
};
assert_snapshot!("cancel_protocol_test_suite_report", suite.generate_report());
}
proptest! {
#[test]
fn property_test_region_events(events in prop::collection::vec(region_event_strategy(), 1..20)) {
let mut harness = PropertyTestHarness::new(ValidationLevel::Full, None);
let result = harness.test_region_transitions(events);
match result {
Ok(_) => {
}
Err(error) => {
assert!(!error.is_empty(), "Error messages should not be empty");
assert!(error.len() < 1000, "Error messages should be reasonable length");
}
}
}
#[test]
fn property_test_task_events(events in prop::collection::vec(task_event_strategy(), 1..15)) {
let mut harness = PropertyTestHarness::new(ValidationLevel::Full, None);
let result = harness.test_task_transitions(events);
match result {
Ok(_) => {
}
Err(error) => {
assert!(!error.is_empty());
}
}
}
}
}