use crate::types::{RegionId, TaskId, Time};
use proptest::prelude::*;
use std::collections::{HashMap, HashSet, VecDeque};
/// Upper bound on how many tasks a single generated property-test case may contain.
const MAX_TASKS_PER_TEST: usize = 20;
/// Scheduling classes used by the scheduler model in these tests.
///
/// `priority_to_order` ranks the variants in declaration order, with
/// `Cancel` ranked first and `Ready` ranked last.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum PriorityClass {
    /// Cancellation work; ranked ahead of every other class.
    Cancel,
    /// Timed work; ranked between `Cancel` and `Ready`.
    Timed,
    /// Ready work; ranked after every other class.
    Ready,
}
/// Builds a `TaskId` for tests from a numeric index.
///
/// The index is forwarded as the first argument of `TaskId::new_for_test`;
/// the second argument is always `0`.
///
/// # Panics
///
/// Panics if `index` does not fit in a `u32`. A silent truncation would let
/// two different indices map to the same id and hide duplication bugs.
fn test_task_id(index: u64) -> TaskId {
    assert!(
        index <= u64::from(u32::MAX),
        "test task index {} does not fit in u32",
        index
    );
    // The guard above makes this narrowing cast lossless.
    TaskId::new_for_test(index as u32, 0)
}
/// Builds a `RegionId` for tests from a numeric index.
///
/// The index is forwarded as the first argument of `RegionId::new_for_test`;
/// the second argument is always `0`.
fn test_region_id(index: u32) -> RegionId {
    let fixed_second_arg = 0;
    RegionId::new_for_test(index, fixed_second_arg)
}
/// Lightweight stand-in for a scheduled task, used only by the tests in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TestTask {
    /// Identifier of the task.
    id: TaskId,
    /// Scheduling class the task belongs to.
    priority: PriorityClass,
    /// Region the task is associated with.
    region: RegionId,
    /// Caller-supplied flag, stored as given.
    is_send: bool,
    /// Spawn timestamp; `TestTask::new` always sets the same fixed value.
    spawn_time: Time,
}
impl TestTask {
fn new(id: TaskId, priority: PriorityClass, region: RegionId, is_send: bool) -> Self {
Self {
id,
priority,
region,
is_send,
spawn_time: Time::from_nanos(1_000_000_000),
}
}
}
/// MR1: the dispatch order must respect priority classes.
///
/// Builds a random batch of tasks, derives the expected dispatch order with a
/// stable sort on `priority_to_order`, and checks that
/// (a) the order contains as many tasks as were submitted,
/// (b) ranks never decrease along the dispatch order, and
/// (c) tasks of the same class keep their submission order.
#[test]
fn mr1_priority_ordering_preservation() {
    proptest!(|(
        worker_count in 1usize..=4,
        // Bounding the length in the strategy (instead of `prop_assume!`)
        // keeps proptest from discarding most generated inputs.
        task_priorities in prop::collection::vec(any::<u8>(), 1..=MAX_TASKS_PER_TEST)
    )| {
        // Sanity check on the generated worker count; the ordering model
        // below does not depend on it.
        prop_assert!(worker_count > 0);

        let tasks: Vec<TestTask> = task_priorities
            .iter()
            .enumerate()
            .map(|(i, &priority_val)| {
                let priority = match priority_val % 3 {
                    0 => PriorityClass::Cancel,
                    1 => PriorityClass::Timed,
                    2 => PriorityClass::Ready,
                    _ => unreachable!(),
                };
                TestTask::new(test_task_id(i as u64 + 1), priority, test_region_id(1), true)
            })
            .collect();

        // `sort_by_key` is stable, so ties keep their submission order.
        let mut expected_order = tasks.clone();
        expected_order.sort_by_key(|task| priority_to_order(task.priority));

        prop_assert_eq!(
            expected_order.len(),
            tasks.len(),
            "Dispatch order must contain every submitted task"
        );

        // (b) No lower-ranked class may be dispatched before a higher-ranked one.
        for pair in expected_order.windows(2) {
            let earlier_rank = priority_to_order(pair[0].priority);
            let later_rank = priority_to_order(pair[1].priority);
            prop_assert!(
                earlier_rank <= later_rank,
                "Priority inversion: task {:?} (rank {}) dispatched before task {:?} (rank {})",
                pair[0].id,
                earlier_rank,
                pair[1].id,
                later_rank
            );
        }

        // (c) Within one class the submission order is preserved.
        for class in [PriorityClass::Cancel, PriorityClass::Timed, PriorityClass::Ready] {
            let submitted: Vec<&TestTask> =
                tasks.iter().filter(|task| task.priority == class).collect();
            let dispatched: Vec<&TestTask> =
                expected_order.iter().filter(|task| task.priority == class).collect();
            prop_assert_eq!(
                submitted,
                dispatched,
                "Tasks of class {:?} must keep their submission order",
                class
            );
        }
    });
}
fn priority_to_order(priority: PriorityClass) -> u8 {
match priority {
PriorityClass::Cancel => 0,
PriorityClass::Timed => 1,
PriorityClass::Ready => 2,
}
}
/// MR2: the cancel-streak counter grows by one per consecutive cancel
/// dispatch, never exceeds the fairness limit, and is reset whenever a
/// non-cancel dispatch happens.
///
/// A byte whose value is a multiple of four models a request to dispatch
/// cancel work. If the streak already sits at the limit, the model yields
/// instead and the streak resets to zero.
#[test]
fn mr2_fairness_counter_monotonicity() {
    proptest!(|(
        cancel_limit in 1u32..=10,
        // Length bounded by the strategy so no generated input is rejected.
        dispatch_sequence in prop::collection::vec(any::<u8>(), 1..=50)
    )| {
        let effective_limit = cancel_limit;
        let mut cancel_streak = 0u32;

        for &dispatch_type in &dispatch_sequence {
            let wants_cancel = dispatch_type % 4 == 0;
            let streak_before = cancel_streak;
            // Cancel work is only dispatched while the streak is below the limit.
            let dispatched_cancel = wants_cancel && streak_before < effective_limit;
            cancel_streak = if dispatched_cancel { streak_before + 1 } else { 0 };

            prop_assert!(
                cancel_streak <= effective_limit,
                "Cancel streak {} exceeded limit {}",
                cancel_streak,
                effective_limit
            );

            if dispatched_cancel {
                prop_assert_eq!(
                    cancel_streak,
                    streak_before + 1,
                    "Cancel streak must grow by exactly one per cancel dispatch"
                );
            } else {
                prop_assert_eq!(
                    cancel_streak,
                    0,
                    "Cancel streak must reset after a non-cancel dispatch"
                );
            }

            // The fairness rule looks at the streak *before* this dispatch:
            // reaching the limit is allowed, going past it is not.
            if streak_before == effective_limit {
                prop_assert!(
                    !dispatched_cancel,
                    "Fairness violation: cancel work continued after hitting limit"
                );
            }
        }
    });
}
/// MR3: work is conserved — after every operation, the pending plus the
/// completed tasks equal the initial tasks plus those spawned so far.
///
/// Operations are decoded from a byte modulo four: `0` spawns a task, `1`
/// completes one pending task (if any), `2` and `3` leave the model unchanged.
#[test]
fn mr3_work_conservation() {
    proptest!(|(
        // Lengths are bounded by the strategies so no generated input is rejected.
        initial_tasks in prop::collection::vec(any::<u8>(), 1..=15),
        operations in prop::collection::vec(any::<u8>(), 0..=20)
    )| {
        let mut task_set = HashSet::new();
        let mut completed_tasks = HashSet::new();
        let mut next_task_id = 1u64;

        for &task_type in &initial_tasks {
            let priority = match task_type % 3 {
                0 => PriorityClass::Cancel,
                1 => PriorityClass::Timed,
                2 => PriorityClass::Ready,
                _ => unreachable!(),
            };
            let task = TestTask::new(
                test_task_id(next_task_id),
                priority,
                test_region_id(1),
                true,
            );
            task_set.insert(task.id);
            next_task_id += 1;
        }

        let initial_count = task_set.len();
        // Running count of spawn operations applied so far. The expected
        // total is derived from this, not from the value of the current op.
        let mut spawned_so_far = 0usize;

        for &op in &operations {
            match op % 4 {
                0 => {
                    task_set.insert(test_task_id(next_task_id));
                    next_task_id += 1;
                    spawned_so_far += 1;
                }
                1 => {
                    // Which pending task completes is irrelevant to the invariant.
                    let next_pending = task_set.iter().next().copied();
                    if let Some(task_id) = next_pending {
                        task_set.remove(&task_id);
                        completed_tasks.insert(task_id);
                    }
                }
                // No-ops in this model.
                2 | 3 => {}
                _ => unreachable!(),
            }

            // Every id handed out above comes from a fresh counter value, so
            // the total relies on `test_task_id` yielding distinct ids.
            let current_total = task_set.len() + completed_tasks.len();
            let expected_total = initial_count + spawned_so_far;
            prop_assert_eq!(
                current_total,
                expected_total,
                "Task conservation violation: {} tasks in system but expected {}",
                current_total,
                expected_total
            );
        }
    });
}
/// MR4: a FIFO queue hands items back in push order, and whatever is left
/// after popping a prefix is the untouched suffix of the push sequence.
#[test]
fn mr4_queue_consistency() {
    proptest!(|(
        // Length bounded by the strategy; only the pop count still needs an
        // assumption, and that one accepts the large majority of inputs.
        push_sequence in prop::collection::vec(any::<u16>(), 1..=20),
        pop_count in 0usize..=10
    )| {
        prop_assume!(pop_count <= push_sequence.len());

        let mut queue: VecDeque<u16> = VecDeque::new();
        for &task_id in &push_sequence {
            queue.push_back(task_id);
        }
        prop_assert_eq!(queue.len(), push_sequence.len(),
            "Queue length should match push count");

        let mut popped_order = Vec::with_capacity(pop_count);
        for _ in 0..pop_count {
            if let Some(task_id) = queue.pop_front() {
                popped_order.push(task_id);
            }
        }
        // A short pop would otherwise go unnoticed by the comparisons below.
        prop_assert_eq!(popped_order.len(), pop_count,
            "Queue ran dry before {} items were popped", pop_count);

        for (i, (&popped_task, &pushed_task)) in
            popped_order.iter().zip(push_sequence.iter()).enumerate()
        {
            prop_assert_eq!(popped_task, pushed_task,
                "FIFO violation: position {} should be {} but got {}",
                i, pushed_task, popped_task);
        }

        let remaining: Vec<u16> = queue.into_iter().collect();
        let expected_remaining = &push_sequence[pop_count..];
        prop_assert_eq!(remaining.len(), expected_remaining.len(),
            "Remaining queue length mismatch");
        for (i, (&remaining_task, &expected_task)) in
            remaining.iter().zip(expected_remaining.iter()).enumerate()
        {
            prop_assert_eq!(remaining_task, expected_task,
                "Remaining queue order violation at position {}", i);
        }
    });
}
/// MR5: every steal between two distinct workers is classified exactly once
/// as same-cohort or cross-cohort, and the classification agrees with the
/// workers' positions.
///
/// Workers are grouped into cohorts of `cohort_size` consecutive indices.
/// Each byte of `steal_opportunities` encodes a stealer (value modulo the
/// worker count) and a target (quotient modulo the worker count).
#[test]
fn mr5_stealing_locality_preservation() {
    proptest!(|(
        worker_count in 2usize..=8,
        cohort_size in 1usize..=4,
        // Length bounded by the strategy so it never causes a rejection.
        steal_opportunities in prop::collection::vec(any::<u8>(), 1..=30)
    )| {
        prop_assume!(cohort_size <= worker_count);

        // Ceiling division: the last cohort may be only partially filled.
        let cohort_count = (worker_count + cohort_size - 1) / cohort_size;
        let mut attempted_steals = 0u32;
        let mut same_cohort_steals = 0u32;
        let mut cross_cohort_steals = 0u32;

        for &steal_op in &steal_opportunities {
            let stealer_worker = steal_op as usize % worker_count;
            let target_worker = (steal_op as usize / worker_count) % worker_count;
            if stealer_worker == target_worker {
                // A worker cannot steal from itself.
                continue;
            }
            // Counted independently of the classification below, so the
            // final accounting check is not true by construction.
            attempted_steals += 1;

            let stealer_cohort = stealer_worker / cohort_size;
            let target_cohort = target_worker / cohort_size;
            prop_assert!(
                stealer_cohort < cohort_count,
                "Stealer cohort {} should be < cohort count {}",
                stealer_cohort,
                cohort_count
            );
            prop_assert!(
                target_cohort < cohort_count,
                "Target cohort {} should be < cohort count {}",
                target_cohort,
                cohort_count
            );

            if stealer_cohort == target_cohort {
                // Members of one cohort occupy a window of `cohort_size`
                // consecutive worker indices.
                let distance =
                    stealer_worker.max(target_worker) - stealer_worker.min(target_worker);
                prop_assert!(
                    distance < cohort_size,
                    "Same-cohort steal between workers {} and {} spans more than cohort size {}",
                    stealer_worker,
                    target_worker,
                    cohort_size
                );
                same_cohort_steals += 1;
            } else {
                cross_cohort_steals += 1;
            }
        }

        prop_assert_eq!(
            same_cohort_steals + cross_cohort_steals,
            attempted_steals,
            "Every steal must be classified exactly once"
        );
    });
}
/// MR6: every injection attempt is either accepted or throttled, and while
/// the governor drain is active only attempts flagged as critical get through.
///
/// Each element of `injection_attempts` says whether that attempt is critical.
#[test]
fn mr6_backpressure_compliance() {
    proptest!(|(
        injection_attempts: Vec<bool>,
        governor_drain_active: bool
    )| {
        prop_assume!(!injection_attempts.is_empty() && injection_attempts.len() <= 25);

        // Single pass that tallies (accepted, throttled). An attempt is let
        // through unless the drain is active and the attempt is not critical.
        let (accepted_injections, throttled_injections) = injection_attempts.iter().fold(
            (0u32, 0u32),
            |(accepted, throttled), &is_critical| {
                if !governor_drain_active || is_critical {
                    (accepted + 1, throttled)
                } else {
                    (accepted, throttled + 1)
                }
            },
        );

        let total_attempts = injection_attempts.len() as u32;
        prop_assert_eq!(accepted_injections + throttled_injections, total_attempts,
            "Injection accounting mismatch: {} + {} ≠ {}",
            accepted_injections, throttled_injections, total_attempts);

        if governor_drain_active {
            let critical_count = injection_attempts
                .iter()
                .copied()
                .filter(|&critical| critical)
                .count() as u32;
            prop_assert_eq!(accepted_injections, critical_count,
                "During drain mode, only critical tasks should be accepted");
        }
    });
}
/// MR7: waking the same tasks in a different order affects the same set of
/// tasks and wakes each of them the same number of times.
///
/// Each generated tuple carries a task id, a wake index and a shuffle key.
/// Wake order A uses the wake indices as generated. Wake order B is the same
/// indices reordered by shuffle key, so B is always a permutation of A —
/// the precondition for the two orders being equivalent.
#[test]
fn mr7_waker_determinism() {
    proptest!(|(
        wake_plan in prop::collection::vec(
            (any::<u16>(), any::<usize>(), any::<u64>()),
            1..=10
        )
    )| {
        let task_ids: Vec<u16> = wake_plan.iter().map(|&(task_id, _, _)| task_id).collect();
        let wake_order_a: Vec<usize> =
            wake_plan.iter().map(|&(_, wake_idx, _)| wake_idx).collect();

        let mut reordered_plan = wake_plan.clone();
        reordered_plan.sort_by_key(|&(_, _, shuffle_key)| shuffle_key);
        let wake_order_b: Vec<usize> =
            reordered_plan.iter().map(|&(_, wake_idx, _)| wake_idx).collect();

        // Resolves wake indices to task ids, wrapping out-of-range indices.
        let resolve = |order: &[usize]| -> Vec<u16> {
            order
                .iter()
                .map(|&order_idx| task_ids[order_idx % task_ids.len()])
                .collect()
        };
        let tasks_a = resolve(&wake_order_a);
        let tasks_b = resolve(&wake_order_b);

        let mut sorted_a = tasks_a.clone();
        let mut sorted_b = tasks_b.clone();
        sorted_a.sort_unstable();
        sorted_b.sort_unstable();
        prop_assert_eq!(sorted_a, sorted_b,
            "Equivalent wake operations should affect the same task set");

        let mut count_a: HashMap<u16, usize> = HashMap::new();
        let mut count_b: HashMap<u16, usize> = HashMap::new();
        for task in tasks_a {
            *count_a.entry(task).or_insert(0) += 1;
        }
        for task in tasks_b {
            *count_b.entry(task).or_insert(0) += 1;
        }
        prop_assert_eq!(count_a, count_b,
            "Wake count distribution should be equivalent");
    });
}
/// MR8: processing tasks in batches of any size yields the same aggregate
/// result as processing them one at a time.
///
/// "Processing" is modelled as summing the task bytes.
#[test]
fn mr8_batch_processing_invariance() {
    proptest!(|(
        // Both lengths and the batch sizes themselves are bounded by the
        // strategies. Filtering arbitrary `usize` values down to 1..=10 with
        // `prop_assume!` would reject practically every generated input.
        tasks in prop::collection::vec(any::<u8>(), 1..=20),
        batch_sizes in prop::collection::vec(1usize..=10, 1..=5)
    )| {
        let individual_result: u64 = tasks.iter().map(|&task| u64::from(task)).sum();

        for &batch_size in &batch_sizes {
            // `chunks` yields a shorter final batch when the length is not a
            // multiple of the batch size; `batch_size` is never zero here.
            let batch_result: u64 = tasks
                .chunks(batch_size)
                .map(|batch| batch.iter().map(|&task| u64::from(task)).sum::<u64>())
                .sum();

            prop_assert_eq!(individual_result, batch_result,
                "Batch processing with size {} should equal individual processing: {} vs {}",
                batch_size, batch_result, individual_result);
        }
    });
}
/// Hand-written scenarios that combine several of the relations above.
#[cfg(test)]
mod integration_tests {
    use super::*;

    /// Priority and fairness together: a short cancel burst, a ready
    /// dispatch, then a cancel burst that exactly reaches the fairness limit
    /// and must be followed by ready work.
    #[test]
    fn mr_composition_priority_with_fairness() {
        let cancel_limit = 5u32;
        let mut cancel_streak = 0u32;

        // The second burst is derived from the limit so the scenario really
        // reaches it; a fixed-length burst shorter than the limit would
        // never exercise the fairness rule.
        let mut dispatch_sequence = vec![
            PriorityClass::Cancel,
            PriorityClass::Cancel,
            PriorityClass::Ready,
        ];
        dispatch_sequence
            .extend(std::iter::repeat(PriorityClass::Cancel).take(cancel_limit as usize));
        dispatch_sequence.push(PriorityClass::Ready);

        let mut dispatched_ready_after_limit = false;
        for priority in &dispatch_sequence {
            match priority {
                PriorityClass::Cancel => {
                    if cancel_streak < cancel_limit {
                        cancel_streak += 1;
                    } else {
                        panic!("Cancel work should not continue after hitting fairness limit");
                    }
                }
                PriorityClass::Timed | PriorityClass::Ready => {
                    if cancel_streak == cancel_limit {
                        dispatched_ready_after_limit = true;
                    }
                    cancel_streak = 0;
                }
            }
        }

        assert!(
            dispatched_ready_after_limit,
            "Ready work should be dispatched after cancel limit"
        );
    }

    /// Checks that the validation logic notices deliberately injected bugs:
    /// a priority inversion and a duplicated work item.
    #[test]
    fn mr_validation_catches_scheduler_bugs() {
        let high_priority = PriorityClass::Cancel;
        let low_priority = PriorityClass::Ready;
        assert!(
            priority_to_order(high_priority) < priority_to_order(low_priority),
            "Priority ordering should prevent inversion"
        );

        // Inject a duplication bug: the same work item is queued twice.
        let mut queue = VecDeque::new();
        queue.push_back(42u16);
        queue.push_back(42u16);
        let first = queue.pop_front().expect("two items were queued");
        let second = queue.pop_front().expect("two items were queued");

        // The duplicate must be observable, otherwise the check is blind to it.
        assert_eq!(
            first, second,
            "Injected work duplication should be detected: {} vs {}",
            first, second
        );
        assert!(
            queue.is_empty(),
            "Queue should be drained after popping both items"
        );
    }
}