#![allow(clippy::all)]
use crate::lab::LabConfig;
use crate::lab::runtime::LabRuntime;
use crate::sync::mutex::{LockError, Mutex, TryLockError};
use crate::types::Budget;
use crate::util::ArenaIndex;
use crate::{Cx, RegionId, TaskId};
use proptest::prelude::*;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::Arc;
use std::time::Duration;
/// Payload protected by the mutex under test.
///
/// The tests in this file mutate both fields through lock guards before
/// panicking or requesting cancellation.
#[derive(Clone, Debug, PartialEq)]
struct TestData {
    /// Arbitrary payload value, seeded per test case and bumped by guard holders.
    value: u32,
    /// Incremented once per simulated operation performed under the lock.
    counter: u64,
}
/// Baseline fixture: `value` starts at 42 and `counter` at 0.
impl Default for TestData {
    fn default() -> Self {
        let value = 42;
        let counter = 0;
        Self { value, counter }
    }
}
/// Panics with `panic_message` when `should_panic` is true; otherwise does nothing.
///
/// Tests call this while a mutex guard is alive so that the guard is dropped
/// during unwinding.
fn panic_injector(should_panic: bool, panic_message: &str) {
    if !should_panic {
        return;
    }
    panic!("{}", panic_message);
}
/// Builds a `Cx` for a test from raw region and task slot numbers.
///
/// Both ids are derived from arena indices whose second component is fixed at 0
/// (presumably the generation; confirm against `ArenaIndex::new`), and the
/// context is given `Budget::INFINITE`.
fn create_test_context(region_id: u32, task_id: u32) -> Cx {
    let region = RegionId::from_arena(ArenaIndex::new(region_id, 0));
    let task = TaskId::from_arena(ArenaIndex::new(task_id, 0));
    Cx::new(region, task, Budget::INFINITE)
}
/// MR1: once a guard holder panics, every later access path reports poison.
///
/// A helper thread locks the mutex, performs `panic_after_operations`
/// mutations and panics while the guard is alive. Afterwards, for each of
/// `num_subsequent_accesses` attempts, `try_lock` and the async `lock` must
/// both report `Poisoned` and `is_poisoned()` must be true. Finally the test
/// expects `get_mut` and `into_inner` on the uniquely owned mutex to panic.
#[test]
fn mr1_panic_poisoning_consistency() {
    proptest!(|(
        initial_value in 0u32..1000,
        panic_after_operations in 0usize..5,
        num_subsequent_accesses in 1usize..10
    )| {
        let mutex = Arc::new(Mutex::new(TestData {
            value: initial_value,
            counter: 0,
        }));

        // Poison the mutex from a separate thread so the panic does not abort
        // the test itself.
        let poisoner = {
            let shared = Arc::clone(&mutex);
            std::thread::spawn(move || {
                let cx = create_test_context(1, 1);
                futures_lite::future::block_on(async {
                    let mut guard = shared.lock(&cx).await.expect("initial lock should succeed");
                    for _ in 0..panic_after_operations {
                        guard.counter += 1;
                        guard.value = guard.value.wrapping_add(1);
                    }
                    // The guard is still alive here, so it is dropped during unwinding.
                    panic_injector(true, "deliberate panic to test poisoning");
                });
            })
        };
        // The join result is deliberately ignored; the assertions below
        // detect a mutex that was not poisoned.
        let _ = poisoner.join();

        for attempt in 0..num_subsequent_accesses {
            let cx = create_test_context(2, attempt as u32 + 2);

            let try_result = mutex.try_lock();
            prop_assert!(
                matches!(try_result, Err(TryLockError::Poisoned)),
                "try_lock attempt {} should return Poisoned, got {:?}", attempt, try_result
            );

            let lock_result = futures_lite::future::block_on(async {
                mutex.lock(&cx).await
            });
            prop_assert!(
                matches!(lock_result, Err(LockError::Poisoned)),
                "async lock attempt {} should return Poisoned, got {:?}", attempt, lock_result
            );

            prop_assert!(mutex.is_poisoned(), "is_poisoned() should return true after panic");
        }

        {
            // The helper thread has been joined, so its clone of the Arc is gone.
            let mut owned = Arc::try_unwrap(mutex).expect("should be sole owner now");

            let get_mut_outcome = catch_unwind(AssertUnwindSafe(|| {
                let _ = owned.get_mut();
            }));
            prop_assert!(get_mut_outcome.is_err(), "get_mut should panic on poisoned mutex");

            let into_inner_outcome = catch_unwind(AssertUnwindSafe(|| {
                let _ = owned.into_inner();
            }));
            prop_assert!(into_inner_outcome.is_err(), "into_inner should panic on poisoned mutex");
        }
    });
}
/// MR2: cancellation never poisons the mutex.
///
/// Two scenarios are chosen by `cancel_during_wait`:
/// * `true`: a second `lock` is started while the first guard is held, its
///   context is marked cancelled, and the pending lock must resolve to
///   `Cancelled`.
/// * `false`: a guard holder mutates the data, marks its own context
///   cancelled, mutates once more and releases the guard normally.
///
/// In both cases the mutex must afterwards be unpoisoned and lockable through
/// `try_lock` and the async `lock`.
#[test]
fn mr2_cancel_non_poisoning() {
    proptest!(|(
        initial_value in 0u32..1000,
        operations_before_cancel in 0usize..3,
        cancel_during_wait in prop::bool::ANY
    )| {
        let mutex = Arc::new(Mutex::new(TestData {
            value: initial_value,
            counter: 0,
        }));
        let _lab = LabRuntime::new(LabConfig::default());

        if cancel_during_wait {
            futures_lite::future::block_on(async {
                let contended = Arc::clone(&mutex);
                let holder_cx = create_test_context(1, 1);
                let waiter_cx = create_test_context(1, 2);

                let held = mutex.lock(&holder_cx).await.expect("first lock should succeed");

                // Create the competing lock future first, then cancel its
                // context before it is ever polled.
                let pending = contended.lock(&waiter_cx);
                waiter_cx.set_cancel_requested(true);
                let outcome = pending.await;
                prop_assert!(
                    matches!(outcome, Err(LockError::Cancelled)),
                    "cancelled wait should return Cancelled, got {:?}", outcome
                );

                drop(held);
                Ok::<(), TestCaseError>(())
            })?;
        } else {
            futures_lite::future::block_on(async {
                let holder_cx = create_test_context(1, 1);
                let mut guard = mutex.lock(&holder_cx).await.expect("lock should succeed");
                for _ in 0..operations_before_cancel {
                    guard.counter += 1;
                    guard.value = guard.value.wrapping_add(1);
                }
                // Cancellation is requested while the guard is held; the guard
                // is still usable and is released by a normal drop.
                holder_cx.set_cancel_requested(true);
                guard.value = guard.value.wrapping_mul(2);
                drop(guard);
                Ok::<(), TestCaseError>(())
            })?;
        }

        prop_assert!(!mutex.is_poisoned(), "mutex should not be poisoned after cancel");

        let cx = create_test_context(2, 1);
        let try_result = mutex.try_lock();
        prop_assert!(try_result.is_ok(), "try_lock should succeed after cancel, got {:?}", try_result);
        // Release the try_lock guard before the async lock below.
        drop(try_result);

        let _lab2 = LabRuntime::new(LabConfig::default());
        let lock_result = futures_lite::future::block_on(async {
            mutex.lock(&cx).await
        });
        prop_assert!(
            lock_result.is_ok(),
            "async lock should succeed after cancel, got {:?}", lock_result
        );
    });
}
/// MR3: a clean lock/unlock cycle leaves the mutex healthy, whereas the same
/// mutation sequence followed by a panic poisons it permanently.
///
/// Phase 1 applies the mutation sequence under the lock and releases the
/// guard normally; the observed state is checked against an independently
/// computed expectation and the mutex must not be poisoned.
/// Phase 2 repeats the same sequence on a helper thread that panics while the
/// guard is alive; afterwards `is_poisoned()` and repeated `try_lock` calls
/// must consistently report poison.
///
/// NOTE(review): despite the name, no poison-recovery API is exercised here;
/// only the stability of the poisoned state is checked.
#[test]
fn mr3_poison_recovery() {
    proptest!(|(
        initial_value in 0u32..1000,
        operations_before_panic in 0usize..5,
        final_operation_value in 0u32..100
    )| {
        let mutex = Arc::new(Mutex::new(TestData {
            value: initial_value,
            counter: 0,
        }));

        // Phase 1: clean pass. The snapshot is taken through the guard just
        // before it is released.
        let clean_state = {
            let cx = create_test_context(1, 1);
            let _lab = LabRuntime::new(LabConfig::default());
            futures_lite::future::block_on(async {
                let mut guard = mutex.lock(&cx).await.expect("initial lock should succeed");
                for i in 0..operations_before_panic {
                    guard.counter += 1;
                    guard.value = guard.value.wrapping_add(i as u32);
                }
                guard.value = guard.value.wrapping_add(final_operation_value);
                TestData {
                    value: guard.value,
                    counter: guard.counter,
                }
            })
        };

        // Previously this snapshot was computed and then ignored; verify it
        // against a model of the mutation sequence.
        let expected_value = (0..operations_before_panic)
            .fold(initial_value, |acc, i| acc.wrapping_add(i as u32))
            .wrapping_add(final_operation_value);
        prop_assert_eq!(
            clean_state,
            TestData {
                value: expected_value,
                counter: operations_before_panic as u64,
            },
            "clean pass should apply every mutation made through the guard"
        );
        prop_assert!(!mutex.is_poisoned(), "mutex should not be poisoned after a clean unlock");

        // Phase 2: same mutation sequence, but the holder panics with the
        // guard alive.
        let mutex_clone = Arc::clone(&mutex);
        let handle = std::thread::spawn(move || {
            let cx = create_test_context(2, 1);
            let _lab = LabRuntime::new(LabConfig::default());
            futures_lite::future::block_on(async {
                let mut guard = mutex_clone.lock(&cx).await.expect("lock for poison should succeed");
                for i in 0..operations_before_panic {
                    guard.counter += 1;
                    guard.value = guard.value.wrapping_add(i as u32);
                }
                guard.value = guard.value.wrapping_add(final_operation_value);
                panic!("deliberate panic for poison recovery test");
            });
        });
        let _ = handle.join().expect_err("should panic");

        prop_assert!(mutex.is_poisoned(), "mutex should be poisoned");

        // Two consecutive probes must agree: poison is a stable state.
        let poison_check_1 = mutex.try_lock();
        prop_assert!(matches!(poison_check_1, Err(TryLockError::Poisoned)),
            "poison check 1 should return Poisoned");
        let poison_check_2 = mutex.try_lock();
        prop_assert!(matches!(poison_check_2, Err(TryLockError::Poisoned)),
            "poison check 2 should return Poisoned (consistency)");
        prop_assert!(mutex.is_poisoned(), "poison state should remain stable");
    });
}
/// MR4: under concurrent access, every waiter observes either a successful
/// acquisition or `Poisoned`, and the mutex ends up poisoned.
///
/// `num_waiters` threads are released together by a barrier and try to lock
/// the mutex, each staggered by `stagger_delay_ms * i`, dropping the guard
/// immediately on success. A separate thread sleeps 5 ms, locks, performs
/// `operations_before_poison` mutations and panics while holding the guard.
///
/// Because waiters release the lock right away and the poisoner starts late,
/// any number of waiters may legitimately succeed before the poison lands.
/// The test therefore does not bound the number of successes; it requires
/// that each waiter reports exactly one outcome, that the outcome is `Ok` or
/// `Poisoned` (never `Cancelled` / `PolledAfterCompletion`), and that the
/// mutex is poisoned once all threads have finished.
#[test]
fn mr4_concurrent_poison_consistency() {
    proptest!(|(
        num_waiters in 2usize..8,
        operations_before_poison in 0usize..3,
        stagger_delay_ms in 0u64..10
    )| {
        let mutex = Arc::new(Mutex::new(TestData {
            value: 100,
            counter: 0,
        }));
        let _lab = LabRuntime::new(LabConfig::default());
        let waiter_results = Arc::new(std::sync::Mutex::new(Vec::new()));
        // Waiters plus the main thread rendezvous here so that all waiters
        // start at the same time.
        let barrier = Arc::new(std::sync::Barrier::new(num_waiters + 1));

        // Only the main thread touches the handles, so a plain Vec suffices.
        let waiter_handles: Vec<_> = (0..num_waiters)
            .map(|i| {
                let mutex_clone = Arc::clone(&mutex);
                let results_clone = Arc::clone(&waiter_results);
                let barrier_clone = Arc::clone(&barrier);
                std::thread::spawn(move || {
                    barrier_clone.wait();
                    if i > 0 {
                        std::thread::sleep(Duration::from_millis(stagger_delay_ms * i as u64));
                    }
                    let cx = create_test_context(i as u32 + 10, i as u32 + 10);
                    // `map(|_| ())` drops the guard immediately on success.
                    let result = futures_lite::future::block_on(async move {
                        mutex_clone.lock(&cx).await.map(|_| ())
                    });
                    results_clone.lock().unwrap().push((i, result));
                })
            })
            .collect();

        let mutex_for_poison = Arc::clone(&mutex);
        let poison_handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(5));
            let cx = create_test_context(1, 1);
            futures_lite::future::block_on(async {
                let mut guard = mutex_for_poison.lock(&cx).await.expect("poison thread should lock");
                for _ in 0..operations_before_poison {
                    guard.counter += 1;
                    guard.value = guard.value.wrapping_add(1);
                }
                panic!("deliberate panic to test concurrent poison consistency");
            });
        });

        barrier.wait();
        let _ = poison_handle.join().expect_err("poison thread should panic");
        for handle in waiter_handles {
            // A panicking waiter would otherwise just be a missing result.
            prop_assert!(handle.join().is_ok(), "waiter thread should not panic");
        }

        let results = waiter_results.lock().unwrap();
        prop_assert_eq!(results.len(), num_waiters,
            "every waiter should report exactly one result");

        let mut poison_count = 0;
        let mut success_count = 0;
        let mut cancel_count = 0;
        let mut other_count = 0;
        for (_waiter_id, result) in results.iter() {
            // Exhaustive on purpose: a new `LockError` variant must be
            // classified here explicitly.
            match result {
                Err(LockError::Poisoned) => poison_count += 1,
                Ok(_) => success_count += 1,
                Err(LockError::Cancelled) => cancel_count += 1,
                Err(LockError::PolledAfterCompletion) => other_count += 1,
            }
        }

        prop_assert!(cancel_count == 0 && other_count == 0,
            "no unexpected states: {} cancels, {} others", cancel_count, other_count);
        prop_assert_eq!(success_count + poison_count, num_waiters,
            "each waiter should either succeed or see poison: {} successes, {} poison",
            success_count, poison_count);
        prop_assert!(mutex.is_poisoned(), "mutex should be poisoned at end");
    });
}
/// Composite MR: cancelled lock attempts against a poisoned mutex never clear
/// the poison.
///
/// After a helper thread poisons the mutex, `recovery_attempts` lock calls are
/// made with an already-cancelled context. Each must fail with either
/// `Cancelled` or `Poisoned`, and the mutex must stay poisoned throughout,
/// including for a final `try_lock`.
#[test]
fn mr_composite_cancel_during_poison_recovery() {
    proptest!(|(
        initial_value in 0u32..1000,
        recovery_attempts in 1usize..5
    )| {
        let mutex = Arc::new(Mutex::new(TestData {
            value: initial_value,
            counter: 0,
        }));

        let poisoner = {
            let shared = Arc::clone(&mutex);
            std::thread::spawn(move || {
                let cx = create_test_context(1, 1);
                let _lab = LabRuntime::new(LabConfig::default());
                futures_lite::future::block_on(async {
                    let _guard = shared.lock(&cx).await.expect("lock for poison should succeed");
                    panic!("poison for recovery test");
                });
            })
        };
        let _ = poisoner.join().expect_err("should panic");
        prop_assert!(mutex.is_poisoned(), "mutex should be poisoned");

        for attempt in 0..recovery_attempts {
            let cx = create_test_context(2, attempt as u32 + 2);
            let _lab = LabRuntime::new(LabConfig::default());
            // Cancel before the lock future is even created.
            cx.set_cancel_requested(true);

            let outcome = futures_lite::future::block_on(async {
                mutex.lock(&cx).await
            });
            // Either error is accepted here.
            prop_assert!(
                matches!(outcome, Err(LockError::Cancelled) | Err(LockError::Poisoned)),
                "recovery attempt {} should return Cancelled or Poisoned, got {:?}", attempt, outcome
            );

            prop_assert!(mutex.is_poisoned(), "poison state should persist through cancel attempts");
        }

        let final_probe = mutex.try_lock();
        prop_assert!(matches!(final_probe, Err(TryLockError::Poisoned)),
            "try_lock should still return Poisoned after cancel attempts");
    });
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Deterministic smoke test covering both halves of the suite: a panic
    /// under the lock poisons one mutex, while a cancellation request issued
    /// under the lock leaves a second mutex healthy.
    #[test]
    fn integration_all_mrs_together() {
        // Panic path: the guard holder panics, so the mutex must be poisoned.
        let poisoned = Arc::new(Mutex::new(TestData::default()));
        let poisoner = {
            let shared = Arc::clone(&poisoned);
            std::thread::spawn(move || {
                let cx = create_test_context(1, 1);
                let _lab = LabRuntime::new(LabConfig::default());
                futures_lite::future::block_on(async {
                    let _guard = shared.lock(&cx).await.expect("lock");
                    panic!("test poison");
                });
            })
        };
        let _ = poisoner.join().expect_err("should panic");
        assert!(poisoned.is_poisoned());
        assert!(matches!(poisoned.try_lock(), Err(TryLockError::Poisoned)));

        // Cancel path: cancellation is requested while the guard is held and
        // the guard is then released by a normal drop.
        let clean_mutex = Arc::new(Mutex::new(TestData::default()));
        let cancel_cx = create_test_context(2, 2);
        futures_lite::future::block_on(async {
            let _guard = clean_mutex.lock(&cancel_cx).await.expect("clean lock");
            cancel_cx.set_cancel_requested(true);
        });
        assert!(!clean_mutex.is_poisoned());
        assert!(clean_mutex.try_lock().is_ok());
    }
}