#[cfg(test)]
use proptest::prelude::*;
#[derive(Debug, Clone, PartialEq)]
pub struct MockMutex {
pub locked: bool,
pub owner: Option<ThreadId>,
pub waiters: Vec<ThreadId>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ThreadId(pub u64);
#[derive(Debug, Clone, PartialEq)]
pub struct MockSemaphore {
pub permits: usize,
pub max_permits: usize,
pub waiters: Vec<ThreadId>,
pub acquired_permits: Vec<(ThreadId, usize)>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockBroadcast {
pub messages: Vec<MockMessage>,
pub receivers: Vec<MockReceiver>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockMessage {
pub sequence: u64,
pub content: String,
pub timestamp: u64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockReceiver {
pub id: u64,
pub last_seen: u64,
pub lag_count: u64,
pub lag_bound: u64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockWatch<T> {
pub value: T,
pub version: u64,
pub pending_updates: Vec<T>,
pub subscribers: Vec<ThreadId>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockNotify {
pub waiters: Vec<ThreadId>,
pub notify_count: u64,
pub woken_threads: Vec<ThreadId>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockPool<T> {
pub resources: Vec<T>,
pub reserved: Vec<(ThreadId, T)>,
pub total_capacity: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockOnceCell<T> {
pub value: Option<T>,
pub initialized: bool,
pub init_attempts: Vec<ThreadId>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockEpoch {
pub epoch_number: u64,
pub protected_objects: Vec<ObjectId>,
pub reclaimed_objects: Vec<ObjectId>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ObjectId(pub u64);
#[derive(Debug, Clone, PartialEq)]
pub struct MockPriorityLane {
pub priority: u8,
pub tasks: Vec<MockTask>,
pub processed: Vec<MockTask>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockTask {
pub id: u64,
pub priority: u8,
pub deadline: u64,
pub work_amount: u32,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockWorkStealer {
pub local_queue: Vec<MockTask>,
pub stealer_id: u64,
pub stolen_count: u64,
pub total_work: u64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockEdfScheduler {
pub tasks: Vec<MockTask>,
pub scheduled_order: Vec<u64>,
pub current_time: u64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct MockIntrusiveHeap {
pub nodes: Vec<HeapNode>,
pub size: usize,
}
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub struct HeapNode {
pub key: u64,
pub value: String,
pub index: usize,
}
impl MockMutex {
pub fn new() -> Self {
Self {
locked: false,
owner: None,
waiters: Vec::new(),
}
}
pub fn acquire(&mut self, thread_id: ThreadId) -> bool {
if !self.locked {
self.locked = true;
self.owner = Some(thread_id);
true
} else {
self.waiters.push(thread_id);
false
}
}
pub fn release(&mut self, thread_id: ThreadId) -> bool {
if self.locked && self.owner == Some(thread_id) {
self.locked = false;
self.owner = None;
if let Some(next_thread) = self.waiters.drain(0..1).next() {
self.locked = true;
self.owner = Some(next_thread);
}
true
} else {
false
}
}
pub fn can_reacquire(&self, thread_id: ThreadId) -> bool {
!self.locked || self.owner == Some(thread_id)
}
}
impl MockSemaphore {
pub fn new(permits: usize) -> Self {
Self {
permits,
max_permits: permits,
waiters: Vec::new(),
acquired_permits: Vec::new(),
}
}
pub fn acquire(&mut self, thread_id: ThreadId, count: usize) -> bool {
if self.permits >= count {
self.permits -= count;
self.acquired_permits.push((thread_id, count));
true
} else {
self.waiters.push(thread_id);
false
}
}
pub fn release(&mut self, thread_id: ThreadId, count: usize) -> bool {
if let Some(pos) = self
.acquired_permits
.iter()
.position(|(t, c)| *t == thread_id && *c == count)
{
self.acquired_permits.remove(pos);
self.permits += count;
while let Some(waiter) = self.waiters.pop() {
if self.permits >= 1 {
self.permits -= 1;
self.acquired_permits.push((waiter, 1));
} else {
self.waiters.push(waiter);
break;
}
}
true
} else {
false
}
}
pub fn conservation_holds(&self) -> bool {
let acquired_total: usize = self.acquired_permits.iter().map(|(_, c)| *c).sum();
self.permits + acquired_total == self.max_permits
}
}
impl MockBroadcast {
pub fn new() -> Self {
Self {
messages: Vec::new(),
receivers: Vec::new(),
}
}
pub fn send(&mut self, content: String) {
let sequence = self.messages.len() as u64;
let message = MockMessage {
sequence,
content,
timestamp: sequence,
};
self.messages.push(message);
for receiver in &mut self.receivers {
if receiver.last_seen < sequence {
receiver.lag_count = sequence - receiver.last_seen;
}
}
}
pub fn add_receiver(&mut self, lag_bound: u64) -> u64 {
let id = self.receivers.len() as u64;
self.receivers.push(MockReceiver {
id,
last_seen: self.messages.len() as u64,
lag_count: 0,
lag_bound,
});
id
}
pub fn receive(&mut self, receiver_id: u64) -> Option<MockMessage> {
if let Some(receiver) = self.receivers.iter_mut().find(|r| r.id == receiver_id) {
if receiver.last_seen < self.messages.len() as u64 {
let message = self.messages[receiver.last_seen as usize].clone();
receiver.last_seen += 1;
receiver.lag_count = receiver.lag_count.saturating_sub(1);
Some(message)
} else {
None
}
} else {
None
}
}
pub fn lag_bounds_monotonic(&self) -> bool {
for receiver in &self.receivers {
if receiver.lag_count > receiver.lag_bound {
return false;
}
}
true
}
}
impl<T: Clone> MockWatch<T> {
pub fn new(initial_value: T) -> Self {
Self {
value: initial_value,
version: 0,
pending_updates: Vec::new(),
subscribers: Vec::new(),
}
}
pub fn update(&mut self, new_value: T) {
self.pending_updates.push(new_value);
}
pub fn coalesce(&mut self) {
if let Some(final_value) = self.pending_updates.pop() {
self.value = final_value;
self.version += self.pending_updates.len() as u64 + 1;
self.pending_updates.clear();
}
}
pub fn subscribe(&mut self, thread_id: ThreadId) {
self.subscribers.push(thread_id);
}
pub fn coalescing_idempotent(&self, original_updates: &[T]) -> bool
where
T: PartialEq,
{
if let Some(last_update) = original_updates.last() {
&self.value == last_update && self.pending_updates.is_empty()
} else {
true
}
}
}
impl MockNotify {
pub fn new() -> Self {
Self {
waiters: Vec::new(),
notify_count: 0,
woken_threads: Vec::new(),
}
}
pub fn wait(&mut self, thread_id: ThreadId) {
self.waiters.push(thread_id);
}
pub fn notify_one(&mut self) {
if let Some(thread_id) = self.waiters.pop() {
self.woken_threads.push(thread_id);
self.notify_count += 1;
}
}
pub fn notify_all(&mut self) {
while let Some(thread_id) = self.waiters.pop() {
self.woken_threads.push(thread_id);
}
self.notify_count += 1;
}
pub fn single_waker_invariant_holds(&self) -> bool {
self.woken_threads.len() <= self.notify_count as usize
}
}
impl<T: Clone> MockPool<T> {
pub fn new(resources: Vec<T>) -> Self {
let capacity = resources.len();
Self {
resources,
reserved: Vec::new(),
total_capacity: capacity,
}
}
pub fn reserve(&mut self, thread_id: ThreadId) -> Option<T> {
if let Some(resource) = self.resources.pop() {
self.reserved.push((thread_id, resource.clone()));
Some(resource)
} else {
None
}
}
pub fn return_resource(&mut self, thread_id: ThreadId, resource: T) -> bool
where
T: PartialEq,
{
if let Some(pos) = self
.reserved
.iter()
.position(|(t, r)| *t == thread_id && *r == resource)
{
let (_, returned_resource) = self.reserved.remove(pos);
self.resources.push(returned_resource);
true
} else {
false
}
}
pub fn roundtrip_preserves_capacity(&self) -> bool {
self.resources.len() + self.reserved.len() == self.total_capacity
}
}
impl<T: Clone> MockOnceCell<T> {
pub fn new() -> Self {
Self {
value: None,
initialized: false,
init_attempts: Vec::new(),
}
}
pub fn get_or_init(&mut self, thread_id: ThreadId, init_value: T) -> T {
self.init_attempts.push(thread_id);
if !self.initialized {
self.value = Some(init_value.clone());
self.initialized = true;
init_value
} else {
self.value.as_ref().unwrap().clone()
}
}
pub fn single_init_guarantee_holds(&self) -> bool {
if self.initialized {
self.value.is_some()
} else {
self.value.is_none()
}
}
}
impl MockEpoch {
pub fn new(epoch_number: u64) -> Self {
Self {
epoch_number,
protected_objects: Vec::new(),
reclaimed_objects: Vec::new(),
}
}
pub fn protect(&mut self, object_id: ObjectId) {
if !self.reclaimed_objects.contains(&object_id) {
self.protected_objects.push(object_id);
}
}
pub fn reclaim(&mut self, object_id: ObjectId) -> bool {
if let Some(pos) = self
.protected_objects
.iter()
.position(|&id| id == object_id)
{
self.protected_objects.remove(pos);
self.reclaimed_objects.push(object_id);
true
} else {
false
}
}
pub fn reclamation_safety_holds(&self) -> bool {
self.reclaimed_objects.iter().all(|&reclaimed_id| {
!self.protected_objects.contains(&reclaimed_id)
})
}
}
impl MockPriorityLane {
pub fn new(priority: u8) -> Self {
Self {
priority,
tasks: Vec::new(),
processed: Vec::new(),
}
}
pub fn add_task(&mut self, task: MockTask) {
self.tasks.push(task);
}
pub fn process_next(&mut self) -> Option<MockTask> {
if let Some(task) = self.tasks.drain(0..1).next() {
self.processed.push(task.clone());
Some(task)
} else {
None
}
}
pub fn priority_ordering_preserved(lanes: &[Self]) -> bool {
let mut last_priority = 255;
for lane in lanes {
if lane.priority > last_priority {
return false; }
last_priority = lane.priority;
}
true
}
}
impl MockWorkStealer {
pub fn new(stealer_id: u64) -> Self {
Self {
local_queue: Vec::new(),
stealer_id,
stolen_count: 0,
total_work: 0,
}
}
pub fn add_work(&mut self, task: MockTask) {
self.total_work += task.work_amount as u64;
self.local_queue.push(task);
}
pub fn steal_work(&mut self, target: &mut Self) -> Option<MockTask> {
if let Some(task) = target.local_queue.pop() {
self.stolen_count += 1;
self.local_queue.push(task.clone());
Some(task)
} else {
None
}
}
pub fn fairness_coefficient(stealers: &[Self]) -> f64 {
if stealers.is_empty() {
return 1.0;
}
let total_work: u64 = stealers.iter().map(|s| s.total_work).sum();
let average_work = total_work as f64 / stealers.len() as f64;
let variance: f64 = stealers
.iter()
.map(|s| (s.total_work as f64 - average_work).powi(2))
.sum();
let coefficient_of_variation = if average_work > 0.0 {
(variance / stealers.len() as f64).sqrt() / average_work
} else {
0.0
};
1.0 / (1.0 + coefficient_of_variation)
}
}
impl MockEdfScheduler {
pub fn new() -> Self {
Self {
tasks: Vec::new(),
scheduled_order: Vec::new(),
current_time: 0,
}
}
pub fn add_task(&mut self, task: MockTask) {
self.tasks.push(task);
}
pub fn schedule_edf(&mut self) {
self.tasks.sort_by_key(|t| t.deadline);
self.scheduled_order.clear();
for task in &self.tasks {
self.scheduled_order.push(task.id);
}
}
pub fn edf_ordering_valid(&self) -> bool {
let mut last_deadline = 0;
for &task_id in &self.scheduled_order {
if let Some(task) = self.tasks.iter().find(|t| t.id == task_id) {
if task.deadline < last_deadline {
return false; }
last_deadline = task.deadline;
}
}
true
}
}
impl MockIntrusiveHeap {
pub fn new() -> Self {
Self {
nodes: Vec::new(),
size: 0,
}
}
pub fn insert(&mut self, key: u64, value: String) {
let index = self.size;
let node = HeapNode { key, value, index };
self.nodes.push(node);
self.size += 1;
self.heapify_up(index);
}
pub fn extract_min(&mut self) -> Option<HeapNode> {
if self.size == 0 {
return None;
}
let min_node = self.nodes[0].clone();
if self.size == 1 {
self.nodes.clear();
self.size = 0;
} else {
self.nodes[0] = self.nodes[self.size - 1].clone();
self.nodes[0].index = 0;
self.nodes.truncate(self.size - 1);
self.size -= 1;
self.heapify_down(0);
}
Some(min_node)
}
fn heapify_up(&mut self, mut index: usize) {
while index > 0 {
let parent = (index - 1) / 2;
if self.nodes[index].key >= self.nodes[parent].key {
break;
}
self.nodes.swap(index, parent);
self.nodes[index].index = index;
self.nodes[parent].index = parent;
index = parent;
}
}
fn heapify_down(&mut self, mut index: usize) {
loop {
let mut smallest = index;
let left = 2 * index + 1;
let right = 2 * index + 2;
if left < self.size && self.nodes[left].key < self.nodes[smallest].key {
smallest = left;
}
if right < self.size && self.nodes[right].key < self.nodes[smallest].key {
smallest = right;
}
if smallest == index {
break;
}
self.nodes.swap(index, smallest);
self.nodes[index].index = index;
self.nodes[smallest].index = smallest;
index = smallest;
}
}
pub fn min_key_invariant_holds(&self) -> bool {
for i in 0..self.size {
let left = 2 * i + 1;
let right = 2 * i + 2;
if left < self.size && self.nodes[i].key > self.nodes[left].key {
return false;
}
if right < self.size && self.nodes[i].key > self.nodes[right].key {
return false;
}
}
true
}
}
#[test]
fn test_mr_mutex_acquire_release_symmetry() {
proptest!(|(
thread_ids: Vec<u64>,
cycles in 1u32..=5
)| {
if thread_ids.is_empty() {
return Ok(());
}
let thread_id = ThreadId(thread_ids[0]);
let mut mutex = MockMutex::new();
for _ in 0..cycles {
let acquire_result = mutex.acquire(thread_id);
prop_assert!(acquire_result, "Mutex acquire should succeed for owner");
let release_result = mutex.release(thread_id);
prop_assert!(release_result, "Mutex release should succeed for owner");
prop_assert!(mutex.can_reacquire(thread_id),
"Mutex should be reacquirable after release cycle");
}
});
}
#[test]
fn test_mr_semaphore_permit_conservation() {
proptest!(|(
initial_permits in 1usize..=20usize,
operations: Vec<(u64, bool, usize)> // (thread_id, is_acquire, count)
)| {
let mut semaphore = MockSemaphore::new(initial_permits);
prop_assert!(semaphore.conservation_holds(),
"Initial semaphore state should satisfy conservation");
let mut successful_acquisitions = Vec::new();
for (thread_id, is_acquire, count) in operations {
let thread = ThreadId(thread_id);
let count = count.max(1).min(initial_permits);
if is_acquire {
let success = semaphore.acquire(thread, count);
if success {
successful_acquisitions.push((thread, count));
}
} else if let Some(pos) = successful_acquisitions.iter()
.position(|(t, c)| *t == thread && *c == count) {
let (thread, count) = successful_acquisitions.remove(pos);
let release_success = semaphore.release(thread, count);
prop_assert!(release_success, "Release should succeed for valid acquisition");
}
prop_assert!(semaphore.conservation_holds(),
"Semaphore permit conservation should hold: permits={}, acquired={:?}, max={}",
semaphore.permits, semaphore.acquired_permits, semaphore.max_permits);
}
});
}
#[test]
fn test_mr_broadcast_lag_bound_monotonicity() {
proptest!(|(
messages: Vec<String>,
receiver_bounds: Vec<u64>,
receive_patterns: Vec<bool> // true = receive, false = skip
)| {
if receiver_bounds.is_empty() || messages.is_empty() {
return Ok(());
}
let mut broadcast = MockBroadcast::new();
let mut receiver_ids = Vec::new();
for &bound in &receiver_bounds {
let bound = bound.max(1).min(100); let id = broadcast.add_receiver(bound);
receiver_ids.push(id);
}
for message in &messages {
broadcast.send(message.clone());
prop_assert!(broadcast.lag_bounds_monotonic(),
"Broadcast lag bounds should be monotonic after send");
}
for (i, &should_receive) in receive_patterns.iter().enumerate() {
if should_receive && !receiver_ids.is_empty() {
let receiver_id = receiver_ids[i % receiver_ids.len()];
let _ = broadcast.receive(receiver_id);
prop_assert!(broadcast.lag_bounds_monotonic(),
"Broadcast lag bounds should be monotonic after receive");
}
}
});
}
#[test]
fn test_mr_watch_coalescing_idempotency() {
proptest!(|(
initial_value: i32,
updates: Vec<i32>,
thread_ids: Vec<u64>
)| {
if updates.is_empty() {
return Ok(());
}
let mut watch = MockWatch::new(initial_value);
for &thread_id in &thread_ids {
watch.subscribe(ThreadId(thread_id));
}
let original_updates = updates.clone();
for update in updates {
watch.update(update);
}
watch.coalesce();
prop_assert!(watch.coalescing_idempotent(&original_updates),
"Watch coalescing should preserve final value: expected={:?}, actual={}",
original_updates.last(), watch.value);
let value_after_first = watch.value;
let version_after_first = watch.version;
watch.coalesce();
prop_assert_eq!(watch.value, value_after_first,
"Multiple coalescing should be idempotent");
prop_assert_eq!(watch.version, version_after_first,
"Version should not change on redundant coalescing");
});
}
#[test]
fn test_mr_notify_single_waker_invariant() {
proptest!(|(
waiters: Vec<u64>,
notify_operations: Vec<bool> // true = notify_one, false = notify_all
)| {
let mut notify = MockNotify::new();
for &waiter_id in &waiters {
notify.wait(ThreadId(waiter_id));
}
let initial_waiters = notify.waiters.len();
for &is_notify_one in ¬ify_operations {
if is_notify_one {
notify.notify_one();
} else {
notify.notify_all();
break; }
prop_assert!(notify.single_waker_invariant_holds(),
"Notify single waker invariant should hold: woken={}, notify_count={}",
notify.woken_threads.len(), notify.notify_count);
}
prop_assert!(notify.woken_threads.len() <= initial_waiters,
"Woken threads should not exceed initial waiters");
});
}
#[test]
fn test_mr_pool_reservation_return_roundtrip() {
proptest!(|(
resources: Vec<String>,
operations: Vec<(u64, bool)> // (thread_id, is_reserve)
)| {
if resources.is_empty() {
return Ok(());
}
let mut pool = MockPool::new(resources);
let mut reservations = Vec::new();
prop_assert!(pool.roundtrip_preserves_capacity(),
"Initial pool should preserve capacity");
for (thread_id, is_reserve) in operations {
let thread = ThreadId(thread_id);
if is_reserve {
if let Some(resource) = pool.reserve(thread) {
reservations.push((thread, resource));
}
} else if let Some((thread, resource)) = reservations.pop() {
let return_success = pool.return_resource(thread, resource);
prop_assert!(return_success, "Return should succeed for valid reservation");
}
prop_assert!(pool.roundtrip_preserves_capacity(),
"Pool roundtrip should preserve capacity: available={}, reserved={}, total={}",
pool.resources.len(), pool.reserved.len(), pool.total_capacity);
}
});
}
#[test]
fn test_mr_once_cell_single_init_guarantee() {
proptest!(|(
init_attempts: Vec<(u64, String)> // (thread_id, init_value)
)| {
if init_attempts.is_empty() {
return Ok(());
}
let mut once_cell = MockOnceCell::new();
let mut results = Vec::new();
for (thread_id, init_value) in &init_attempts {
let thread = ThreadId(*thread_id);
let result = once_cell.get_or_init(thread, init_value.clone());
results.push(result);
prop_assert!(once_cell.single_init_guarantee_holds(),
"OnceCell single init guarantee should hold after attempt");
}
if let Some(first_result) = results.first() {
for result in &results {
prop_assert_eq!(result, first_result,
"All init attempts should return same value");
}
}
if let Some((_, first_init_value)) = init_attempts.first() {
prop_assert_eq!(once_cell.value.as_ref().unwrap(), first_init_value,
"OnceCell should be initialized with first attempt value");
}
});
}
#[test]
fn test_mr_epoch_reclamation_safety() {
proptest!(|(
epoch_number: u64,
objects: Vec<u64>,
operations: Vec<bool> // true = protect, false = reclaim
)| {
let mut epoch = MockEpoch::new(epoch_number);
let mut protected_objects = Vec::new();
for (i, &is_protect) in operations.iter().enumerate() {
if i >= objects.len() {
break;
}
let object_id = ObjectId(objects[i]);
if is_protect {
epoch.protect(object_id);
if !protected_objects.contains(&object_id) {
protected_objects.push(object_id);
}
} else {
let reclaim_success = epoch.reclaim(object_id);
if reclaim_success {
prop_assert!(protected_objects.contains(&object_id),
"Reclamation should only succeed for protected objects");
}
}
prop_assert!(epoch.reclamation_safety_holds(),
"Epoch reclamation safety should hold after operation");
}
});
}
#[test]
fn test_mr_priority_lane_ordering() {
proptest!(|(
lane_priorities: Vec<u8>,
tasks_per_lane in 1usize..=5usize
)| {
if lane_priorities.is_empty() {
return Ok(());
}
let mut lanes: Vec<MockPriorityLane> = lane_priorities.iter()
.map(|&priority| MockPriorityLane::new(priority))
.collect();
for (i, lane) in lanes.iter_mut().enumerate() {
for j in 0..tasks_per_lane {
let task = MockTask {
id: (i * tasks_per_lane + j) as u64,
priority: lane.priority,
deadline: 100,
work_amount: 1,
};
lane.add_task(task);
}
}
lanes.sort_by(|a, b| b.priority.cmp(&a.priority));
let mut processed_tasks = Vec::new();
loop {
let mut made_progress = false;
for lane in &mut lanes {
if let Some(task) = lane.process_next() {
processed_tasks.push(task);
made_progress = true;
break; }
}
if !made_progress {
break;
}
}
prop_assert!(MockPriorityLane::priority_ordering_preserved(&lanes),
"Priority lane ordering should be preserved");
for i in 1..processed_tasks.len() {
prop_assert!(
processed_tasks[i-1].priority >= processed_tasks[i].priority,
"Processed tasks should maintain priority order: task[{}].priority={} >= task[{}].priority={}",
i-1, processed_tasks[i-1].priority, i, processed_tasks[i].priority
);
}
});
}
#[test]
fn test_mr_work_stealing_fairness() {
proptest!(|(
stealer_count in 2usize..=8usize,
total_tasks in 10usize..=50usize,
work_amounts: Vec<u32>
)| {
if work_amounts.is_empty() {
return Ok(());
}
let mut stealers: Vec<MockWorkStealer> = (0..stealer_count)
.map(|i| MockWorkStealer::new(i as u64))
.collect();
for (i, &work_amount) in work_amounts.iter().take(total_tasks).enumerate() {
let task = MockTask {
id: i as u64,
priority: 0,
deadline: 100,
work_amount,
};
let stealer_idx = i % stealer_count;
stealers[stealer_idx].add_work(task);
}
for _ in 0..(total_tasks / 4) {
let stealer_idx = 0; let target_idx = 1;
if stealer_idx < stealers.len() && target_idx < stealers.len() && stealer_idx != target_idx {
let (stealer_part, target_part) = if stealer_idx < target_idx {
let (left, right) = stealers.split_at_mut(target_idx);
(&mut left[stealer_idx], &mut right[0])
} else {
let (left, right) = stealers.split_at_mut(stealer_idx);
(&mut right[0], &mut left[target_idx])
};
let _ = stealer_part.steal_work(target_part);
}
}
let fairness = MockWorkStealer::fairness_coefficient(&stealers);
prop_assert!(fairness >= 0.0 && fairness <= 1.0,
"Fairness coefficient should be between 0 and 1: {}", fairness);
let total_work_final: u64 = stealers.iter()
.map(|s| s.total_work)
.sum();
let expected_total: u64 = work_amounts.iter().take(total_tasks)
.map(|&w| w as u64)
.sum();
prop_assert_eq!(total_work_final, expected_total,
"Work stealing should preserve total work");
});
}
#[test]
fn test_mr_edf_deadline_ordering() {
proptest!(|(
tasks_data: Vec<(u64, u64)> // (task_id, deadline)
)| {
if tasks_data.len() < 2 {
return Ok(());
}
let mut scheduler = MockEdfScheduler::new();
for (task_id, deadline) in &tasks_data {
let task = MockTask {
id: *task_id,
priority: 0,
deadline: *deadline,
work_amount: 1,
};
scheduler.add_task(task);
}
scheduler.schedule_edf();
prop_assert!(scheduler.edf_ordering_valid(),
"EDF scheduler should maintain deadline ordering");
prop_assert_eq!(scheduler.scheduled_order.len(), tasks_data.len(),
"All tasks should be scheduled");
let mut scheduled_set = std::collections::HashSet::new();
for &task_id in &scheduler.scheduled_order {
prop_assert!(scheduled_set.insert(task_id),
"Each task should be scheduled exactly once");
}
});
}
#[test]
fn test_mr_intrusive_heap_min_key_invariant() {
proptest!(|(
operations: Vec<(bool, u64, String)> // (is_insert, key, value)
)| {
let mut heap = MockIntrusiveHeap::new();
for (is_insert, key, value) in operations {
if is_insert {
heap.insert(key, value);
} else {
let _ = heap.extract_min();
}
prop_assert!(heap.min_key_invariant_holds(),
"Intrusive heap min-key invariant should hold after operation");
}
let mut extracted_keys = Vec::new();
while let Some(node) = heap.extract_min() {
extracted_keys.push(node.key);
prop_assert!(heap.min_key_invariant_holds(),
"Min-key invariant should hold during extraction");
}
for i in 1..extracted_keys.len() {
prop_assert!(extracted_keys[i-1] <= extracted_keys[i],
"Extracted keys should be in non-decreasing order: {} <= {}",
extracted_keys[i-1], extracted_keys[i]);
}
});
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_mock_implementations() {
let mut mutex = MockMutex::new();
let thread = ThreadId(1);
assert!(mutex.acquire(thread));
assert!(mutex.release(thread));
assert!(mutex.can_reacquire(thread));
let mut semaphore = MockSemaphore::new(5);
let thread = ThreadId(1);
assert!(semaphore.acquire(thread, 2));
assert!(semaphore.conservation_holds());
assert!(semaphore.release(thread, 2));
assert!(semaphore.conservation_holds());
let mut heap = MockIntrusiveHeap::new();
heap.insert(10, "ten".to_string());
heap.insert(5, "five".to_string());
heap.insert(15, "fifteen".to_string());
assert!(heap.min_key_invariant_holds());
let min = heap.extract_min().unwrap();
assert_eq!(min.key, 5);
assert!(heap.min_key_invariant_holds());
}
}