#![allow(unsafe_code)]
use parking_lot::Mutex as ParkingMutex;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, Waker};
use crate::cx::Cx;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockError {
Poisoned,
Cancelled,
PolledAfterCompletion,
}
impl std::fmt::Display for LockError {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Poisoned => write!(f, "mutex poisoned"),
Self::Cancelled => write!(f, "mutex lock cancelled"),
Self::PolledAfterCompletion => write!(f, "mutex future polled after completion"),
}
}
}
impl std::error::Error for LockError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryLockError {
Locked,
Poisoned,
}
impl std::fmt::Display for TryLockError {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Locked => write!(f, "mutex is locked"),
Self::Poisoned => write!(f, "mutex poisoned"),
}
}
}
impl std::error::Error for TryLockError {}
#[derive(Debug)]
pub struct Mutex<T> {
data: UnsafeCell<T>,
poisoned: AtomicBool,
state: ParkingMutex<MutexState>,
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
#[derive(Debug)]
struct MutexState {
locked: bool,
waiters: VecDeque<Waiter>,
granted_waiter: Option<u64>,
next_waiter_id: u64,
}
#[derive(Debug)]
struct Waiter {
waker: Waker,
id: u64,
}
impl<T> Mutex<T> {
#[inline]
#[must_use]
pub fn new(value: T) -> Self {
Self {
data: UnsafeCell::new(value),
poisoned: AtomicBool::new(false),
state: ParkingMutex::new(MutexState {
locked: false,
waiters: VecDeque::with_capacity(4),
granted_waiter: None,
next_waiter_id: 0,
}),
}
}
#[inline]
#[must_use]
pub fn is_poisoned(&self) -> bool {
self.poisoned.load(Ordering::Acquire)
}
#[inline]
#[must_use]
pub fn is_locked(&self) -> bool {
self.state.lock().locked
}
#[inline]
#[must_use]
pub fn waiters(&self) -> usize {
self.state.lock().waiters.len()
}
#[inline]
pub fn lock<'a, 'b>(&'a self, cx: &'b Cx) -> LockFuture<'a, 'b, T> {
LockFuture {
mutex: self,
cx,
waiter_id: None,
completed: false,
}
}
#[inline]
pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
let mut state = self.state.lock();
if self.is_poisoned() {
return Err(TryLockError::Poisoned);
}
if state.locked || state.granted_waiter.is_some() || !state.waiters.is_empty() {
return Err(TryLockError::Locked);
}
state.locked = true;
drop(state);
Ok(MutexGuard { mutex: self })
}
#[inline]
pub fn get_mut(&mut self) -> &mut T {
assert!(!self.is_poisoned(), "mutex is poisoned");
self.data.get_mut()
}
#[inline]
pub fn into_inner(self) -> T {
assert!(!self.is_poisoned(), "mutex is poisoned");
self.data.into_inner()
}
#[inline]
fn poison(&self) {
self.poisoned.store(true, Ordering::Release);
}
#[inline]
fn unlock(&self) {
let waker_to_wake = {
let mut state = self.state.lock();
state.locked = false;
if let Some(waiter) = state.waiters.pop_front() {
state.granted_waiter = Some(waiter.id);
Some(waiter.waker)
} else {
state.granted_waiter = None;
None
}
};
if let Some(waker) = waker_to_wake {
waker.wake();
}
}
}
impl<T: Default> Default for Mutex<T> {
#[inline]
fn default() -> Self {
Self::new(T::default())
}
}
pub struct LockFuture<'a, 'b, T> {
mutex: &'a Mutex<T>,
cx: &'b Cx,
waiter_id: Option<u64>,
completed: bool,
}
impl<T> LockFuture<'_, '_, T> {
#[inline]
fn grant_next_waiter(state: &mut MutexState) -> Option<Waker> {
if let Some(waiter) = state.waiters.pop_front() {
state.granted_waiter = Some(waiter.id);
Some(waiter.waker)
} else {
state.granted_waiter = None;
None
}
}
#[inline]
fn cleanup_waiter(&mut self) {
if let Some(waiter_id) = self.waiter_id.take() {
let waker_to_wake = {
let mut state = self.mutex.state.lock();
if state.granted_waiter == Some(waiter_id) {
state.granted_waiter = None;
if !state.locked {
Self::grant_next_waiter(&mut state)
} else {
None
}
} else {
let pos = state.waiters.iter().position(|w| w.id == waiter_id);
let is_head = pos == Some(0);
if let Some(p) = pos {
state.waiters.remove(p);
}
if !state.locked && state.granted_waiter.is_none() && is_head {
Self::grant_next_waiter(&mut state)
} else {
None
}
}
};
if let Some(waker) = waker_to_wake {
waker.wake();
}
}
}
}
impl<'a, T> Future for LockFuture<'a, '_, T> {
type Output = Result<MutexGuard<'a, T>, LockError>;
#[inline]
#[allow(clippy::if_not_else, clippy::option_if_let_else)]
fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
if self.completed {
return Poll::Ready(Err(LockError::PolledAfterCompletion));
}
if let Err(_e) = self.cx.checkpoint() {
self.completed = true;
self.cleanup_waiter();
return Poll::Ready(Err(LockError::Cancelled));
}
let mut state = self.mutex.state.lock();
if self.mutex.is_poisoned() {
self.completed = true;
drop(state);
self.cleanup_waiter();
return Poll::Ready(Err(LockError::Poisoned));
}
if let Some(waiter_id) = self.waiter_id {
if state.granted_waiter == Some(waiter_id) {
if !state.locked {
state.granted_waiter = None;
state.locked = true;
self.waiter_id = None;
self.completed = true;
return Poll::Ready(Ok(MutexGuard { mutex: self.mutex }));
}
state.granted_waiter = None;
let new_id = state.next_waiter_id;
state.next_waiter_id = state.next_waiter_id.wrapping_add(1);
state.waiters.push_front(Waiter {
waker: context.waker().clone(),
id: new_id,
});
drop(state);
self.waiter_id = Some(new_id);
return Poll::Pending;
}
}
if !state.locked && state.granted_waiter.is_none() && self.waiter_id.is_none() {
state.locked = true;
self.completed = true;
return Poll::Ready(Ok(MutexGuard { mutex: self.mutex }));
}
if let Some(waiter_id) = self.waiter_id {
if let Some(existing) = state.waiters.iter_mut().find(|w| w.id == waiter_id) {
if !existing.waker.will_wake(context.waker()) {
existing.waker.clone_from(context.waker());
}
} else {
let new_id = state.next_waiter_id;
state.next_waiter_id = state.next_waiter_id.wrapping_add(1);
state.waiters.push_front(Waiter {
waker: context.waker().clone(),
id: new_id,
});
drop(state);
self.waiter_id = Some(new_id);
return Poll::Pending;
}
} else {
let id = state.next_waiter_id;
state.next_waiter_id = state.next_waiter_id.wrapping_add(1);
state.waiters.push_back(Waiter {
waker: context.waker().clone(),
id,
});
drop(state);
self.waiter_id = Some(id);
return Poll::Pending;
}
drop(state);
Poll::Pending
}
}
impl<T> Drop for LockFuture<'_, '_, T> {
fn drop(&mut self) {
self.cleanup_waiter();
}
}
#[must_use = "guard will be immediately released if not held"]
pub struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
}
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
impl<T: std::fmt::Debug> std::fmt::Debug for MutexGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MutexGuard").field("data", &**self).finish()
}
}
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
unsafe { &*self.mutex.data.get() }
}
}
impl<T> DerefMut for MutexGuard<'_, T> {
#[inline]
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.data.get() }
}
}
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
if std::thread::panicking() {
self.mutex.poison();
}
self.mutex.unlock();
}
}
#[must_use = "guard will be immediately released if not held"]
pub struct OwnedMutexGuard<T> {
mutex: Arc<Mutex<T>>,
}
unsafe impl<T: Send> Send for OwnedMutexGuard<T> {}
unsafe impl<T: Sync> Sync for OwnedMutexGuard<T> {}
impl<T> OwnedMutexGuard<T> {
pub async fn lock(mutex: Arc<Mutex<T>>, cx: &Cx) -> Result<Self, LockError> {
let _borrowed_guard = std::mem::ManuallyDrop::new(mutex.as_ref().lock(cx).await?);
Ok(Self { mutex })
}
#[inline]
pub fn try_lock(mutex: Arc<Mutex<T>>) -> Result<Self, TryLockError> {
{
let mut state = mutex.state.lock();
if mutex.is_poisoned() {
return Err(TryLockError::Poisoned);
}
if state.locked || state.granted_waiter.is_some() || !state.waiters.is_empty() {
return Err(TryLockError::Locked);
}
state.locked = true;
}
Ok(Self { mutex })
}
}
impl<T> Deref for OwnedMutexGuard<T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
unsafe { &*self.mutex.data.get() }
}
}
impl<T> DerefMut for OwnedMutexGuard<T> {
#[inline]
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.data.get() }
}
}
impl<T> Drop for OwnedMutexGuard<T> {
fn drop(&mut self) {
if std::thread::panicking() {
self.mutex.poison();
}
self.mutex.unlock();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::conformance::{ConformanceTarget, LabRuntimeTarget, TestConfig};
use crate::runtime::yield_now;
use crate::test_utils::init_test_logging;
use crate::types::Budget;
use crate::util::ArenaIndex;
use crate::{RegionId, TaskId};
use serde_json::Value;
use std::sync::Mutex as StdMutex;
fn test_cx() -> Cx {
Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
TaskId::from_arena(ArenaIndex::new(0, 0)),
Budget::INFINITE,
)
}
fn init_test(test_name: &str) {
init_test_logging();
crate::test_phase!(test_name);
}
fn poll_once<T, F: Future<Output = T> + Unpin>(future: &mut F) -> Option<T> {
let waker = Waker::noop();
let mut cx = Context::from_waker(waker);
match Pin::new(future).poll(&mut cx) {
Poll::Ready(v) => Some(v),
Poll::Pending => None,
}
}
fn poll_until_ready<T, F: Future<Output = T> + Unpin>(future: &mut F) -> T {
let waker = Waker::noop();
let mut cx = Context::from_waker(waker);
loop {
match Pin::new(&mut *future).poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
fn poll_pinned_until_ready<T, F: Future<Output = T>>(mut future: Pin<&mut F>) -> T {
let waker = Waker::noop();
let mut cx = Context::from_waker(waker);
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
fn lock_blocking<'a, T>(mutex: &'a Mutex<T>, cx: &Cx) -> MutexGuard<'a, T> {
let mut fut = mutex.lock(cx);
poll_until_ready(&mut fut).expect("lock failed")
}
#[test]
fn new_mutex_is_unlocked() {
init_test("new_mutex_is_unlocked");
let mutex = Mutex::new(42);
let ok = mutex.try_lock().is_ok();
crate::assert_with_log!(ok, "mutex should start unlocked", true, ok);
crate::test_complete!("new_mutex_is_unlocked");
}
#[test]
fn lock_acquires_mutex() {
init_test("lock_acquires_mutex");
let cx = test_cx();
let mutex = Mutex::new(42);
let mut future = mutex.lock(&cx);
let guard = poll_once(&mut future)
.expect("should complete immediately")
.expect("lock failed");
crate::assert_with_log!(*guard == 42, "guard should read value", 42, *guard);
crate::test_complete!("lock_acquires_mutex");
}
#[test]
fn test_mutex_try_lock_success() {
init_test("test_mutex_try_lock_success");
let mutex = Mutex::new(42);
let guard = mutex.try_lock().expect("should succeed");
crate::assert_with_log!(*guard == 42, "guard value", 42, *guard);
drop(guard);
crate::test_complete!("test_mutex_try_lock_success");
}
#[test]
fn test_mutex_try_lock_fail() {
init_test("test_mutex_try_lock_fail");
let cx = test_cx();
let mutex = Mutex::new(42);
let mut fut = mutex.lock(&cx);
let _guard = poll_once(&mut fut).expect("immediate").expect("lock");
let result = mutex.try_lock();
let is_locked = matches!(result, Err(TryLockError::Locked));
crate::assert_with_log!(is_locked, "should be locked", true, is_locked);
crate::test_complete!("test_mutex_try_lock_fail");
}
#[test]
fn test_mutex_cancel_waiting() {
init_test("test_mutex_cancel_waiting");
let cx = test_cx();
let mutex = Mutex::new(42);
let mut fut1 = mutex.lock(&cx);
let _guard = poll_once(&mut fut1).expect("immediate").expect("lock");
let cancel_cx = Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 1)),
TaskId::from_arena(ArenaIndex::new(0, 1)),
Budget::INFINITE,
);
let mut fut2 = mutex.lock(&cancel_cx);
let pending = poll_once(&mut fut2).is_none();
crate::assert_with_log!(pending, "should be pending", true, pending);
cancel_cx.set_cancel_requested(true);
let result = poll_once(&mut fut2);
let cancelled = matches!(result, Some(Err(LockError::Cancelled)));
crate::assert_with_log!(cancelled, "should be cancelled", true, cancelled);
crate::test_complete!("test_mutex_cancel_waiting");
}
#[test]
fn test_mutex_no_queue_growth() {
init_test("test_mutex_no_queue_growth");
let cx = test_cx();
let mutex = Mutex::new(42);
let mut fut1 = mutex.lock(&cx);
let _guard = poll_once(&mut fut1).expect("immediate").expect("lock");
let mut fut2 = mutex.lock(&cx);
for _ in 0..100 {
let _ = poll_once(&mut fut2);
}
let waiters = mutex.waiters();
crate::assert_with_log!(waiters <= 1, "waiters bounded", true, waiters <= 1);
crate::test_complete!("test_mutex_no_queue_growth");
}
#[test]
fn test_mutex_get_mut() {
init_test("test_mutex_get_mut");
let mut mutex = Mutex::new(42);
*mutex.get_mut() = 100;
let value = *mutex.get_mut();
crate::assert_with_log!(value == 100, "get_mut works", 100, value);
crate::test_complete!("test_mutex_get_mut");
}
#[test]
fn test_mutex_into_inner() {
init_test("test_mutex_into_inner");
let mutex = Mutex::new(42);
let value = mutex.into_inner();
crate::assert_with_log!(value == 42, "into_inner works", 42, value);
crate::test_complete!("test_mutex_into_inner");
}
#[test]
fn test_mutex_drop_releases_lock() {
init_test("test_mutex_drop_releases_lock");
let cx = test_cx();
let mutex = Mutex::new(42);
{
let mut fut = mutex.lock(&cx);
let _guard = poll_once(&mut fut).expect("immediate").expect("lock");
}
let can_lock = mutex.try_lock().is_ok();
crate::assert_with_log!(can_lock, "should be unlocked", true, can_lock);
crate::test_complete!("test_mutex_drop_releases_lock");
}
#[test]
#[ignore = "stress test; run manually"]
fn stress_test_mutex_high_contention() {
init_test("stress_test_mutex_high_contention");
let threads = 8usize;
let iters = 2_000usize;
let mutex = Arc::new(Mutex::new(0usize));
let mut handles = Vec::with_capacity(threads);
for _ in 0..threads {
let mutex = Arc::clone(&mutex);
handles.push(std::thread::spawn(move || {
let cx = test_cx();
for _ in 0..iters {
let mut guard = lock_blocking(&mutex, &cx);
*guard += 1;
}
}));
}
for handle in handles {
handle.join().expect("thread join failed");
}
let final_value = *mutex.try_lock().expect("final lock failed");
let expected = threads * iters;
crate::assert_with_log!(
final_value == expected,
"final count matches",
expected,
final_value
);
crate::test_complete!("stress_test_mutex_high_contention");
}
#[test]
fn mutex_contention_under_lab_runtime() {
init_test("mutex_contention_under_lab_runtime");
let config = TestConfig::new()
.with_seed(0x6D57_E110)
.with_tracing(true)
.with_max_steps(20_000);
let mut runtime = LabRuntimeTarget::create_runtime(config);
let mutex = Arc::new(Mutex::new(0u32));
let checkpoints = Arc::new(StdMutex::new(Vec::<Value>::new()));
let (holder_value, waiter_value, final_value, checkpoints) =
LabRuntimeTarget::block_on(&mut runtime, async move {
let cx = Cx::current().expect("lab runtime should install a current Cx");
let holder_spawn_cx = cx.clone();
let waiter_spawn_cx = cx.clone();
let holder_mutex = Arc::clone(&mutex);
let holder_checkpoints = Arc::clone(&checkpoints);
let holder_task_cx = holder_spawn_cx.clone();
let holder =
LabRuntimeTarget::spawn(&holder_spawn_cx, Budget::INFINITE, async move {
let mut guard = holder_mutex
.lock(&holder_task_cx)
.await
.expect("holder lock should succeed");
*guard = 1;
let acquired = serde_json::json!({
"phase": "holder_acquired",
"value": *guard,
});
tracing::info!(event = %acquired, "mutex_lab_checkpoint");
holder_checkpoints.lock().unwrap().push(acquired);
yield_now().await;
yield_now().await;
let released = serde_json::json!({
"phase": "holder_releasing",
"value": *guard,
});
tracing::info!(event = %released, "mutex_lab_checkpoint");
holder_checkpoints.lock().unwrap().push(released);
*guard
});
yield_now().await;
let waiter_mutex = Arc::clone(&mutex);
let waiter_checkpoints = Arc::clone(&checkpoints);
let waiter_task_cx = waiter_spawn_cx.clone();
let waiter =
LabRuntimeTarget::spawn(&waiter_spawn_cx, Budget::INFINITE, async move {
let waiting = serde_json::json!({
"phase": "waiter_waiting",
});
tracing::info!(event = %waiting, "mutex_lab_checkpoint");
waiter_checkpoints.lock().unwrap().push(waiting);
let mut guard = waiter_mutex
.lock(&waiter_task_cx)
.await
.expect("waiter lock should succeed");
let observed = *guard;
let acquired = serde_json::json!({
"phase": "waiter_acquired",
"observed": observed,
});
tracing::info!(event = %acquired, "mutex_lab_checkpoint");
waiter_checkpoints.lock().unwrap().push(acquired);
*guard += 1;
let updated = serde_json::json!({
"phase": "waiter_updated",
"value": *guard,
});
tracing::info!(event = %updated, "mutex_lab_checkpoint");
waiter_checkpoints.lock().unwrap().push(updated);
*guard
});
let holder_outcome = holder.await;
crate::assert_with_log!(
matches!(holder_outcome, crate::types::Outcome::Ok(_)),
"holder task completes successfully",
true,
matches!(holder_outcome, crate::types::Outcome::Ok(_))
);
let crate::types::Outcome::Ok(holder_value) = holder_outcome else {
panic!("holder task should finish successfully");
};
let waiter_outcome = waiter.await;
crate::assert_with_log!(
matches!(waiter_outcome, crate::types::Outcome::Ok(_)),
"waiter task completes successfully",
true,
matches!(waiter_outcome, crate::types::Outcome::Ok(_))
);
let crate::types::Outcome::Ok(waiter_value) = waiter_outcome else {
panic!("waiter task should finish successfully");
};
let final_value = *mutex.try_lock().expect("final lock should succeed");
(
holder_value,
waiter_value,
final_value,
checkpoints.lock().unwrap().clone(),
)
});
assert_eq!(holder_value, 1);
assert_eq!(waiter_value, 2);
assert_eq!(final_value, 2);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "holder_acquired"),
"holder acquisition checkpoint should be recorded"
);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "waiter_acquired" && event["observed"] == 1),
"waiter should observe the holder's update"
);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "waiter_updated" && event["value"] == 2),
"waiter update checkpoint should be recorded"
);
let violations = runtime.oracles.check_all(runtime.now());
assert!(
violations.is_empty(),
"mutex lab-runtime contention test should leave runtime invariants clean: {violations:?}"
);
}
#[test]
fn mutex_fifo_cancel_middle_preserves_order() {
init_test("mutex_fifo_cancel_middle_preserves_order");
let cx1 = test_cx();
let cx2 = Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 2)),
TaskId::from_arena(ArenaIndex::new(0, 2)),
Budget::INFINITE,
);
let cx3 = test_cx();
let mutex = Mutex::new(0u32);
let mut fut_hold = mutex.lock(&cx1);
let guard = poll_once(&mut fut_hold).expect("immediate").expect("lock");
let mut fut1 = mutex.lock(&cx1);
let _ = poll_once(&mut fut1);
let mut fut2 = mutex.lock(&cx2);
let _ = poll_once(&mut fut2);
let mut fut3 = mutex.lock(&cx3);
let _ = poll_once(&mut fut3);
let waiters = mutex.waiters();
crate::assert_with_log!(waiters == 3, "3 waiters queued", 3usize, waiters);
cx2.set_cancel_requested(true);
let result2 = poll_once(&mut fut2);
let cancelled = matches!(result2, Some(Err(LockError::Cancelled)));
crate::assert_with_log!(cancelled, "middle cancelled", true, cancelled);
drop(guard);
let guard1 = poll_once(&mut fut1)
.expect("first acquires")
.expect("no error");
crate::assert_with_log!(true, "first waiter acquires", true, true);
let third_pending = poll_once(&mut fut3).is_none();
crate::assert_with_log!(third_pending, "third pending", true, third_pending);
drop(guard1);
crate::test_complete!("mutex_fifo_cancel_middle_preserves_order");
}
#[test]
fn mutex_guard_deref_mut() {
init_test("mutex_guard_deref_mut");
let cx = test_cx();
let mutex = Mutex::new(vec![1, 2, 3]);
let mut fut = mutex.lock(&cx);
let mut guard = poll_once(&mut fut).expect("immediate").expect("lock");
guard.push(4);
let len = guard.len();
crate::assert_with_log!(len == 4, "mutated via deref_mut", 4usize, len);
drop(guard);
let mut fut2 = mutex.lock(&cx);
let guard2 = poll_once(&mut fut2).expect("immediate").expect("lock");
let persisted = guard2.as_slice() == [1, 2, 3, 4];
crate::assert_with_log!(persisted, "mutation persisted", true, persisted);
crate::test_complete!("mutex_guard_deref_mut");
}
#[test]
fn mutex_is_locked_is_poisoned() {
init_test("mutex_is_locked_is_poisoned");
let cx = test_cx();
let mutex = Mutex::new(0);
let unlocked = !mutex.is_locked();
crate::assert_with_log!(unlocked, "starts unlocked", true, unlocked);
let not_poisoned = !mutex.is_poisoned();
crate::assert_with_log!(not_poisoned, "not poisoned", true, not_poisoned);
let mut fut = mutex.lock(&cx);
let _guard = poll_once(&mut fut).expect("immediate").expect("lock");
let locked = mutex.is_locked();
crate::assert_with_log!(locked, "locked after acquire", true, locked);
crate::test_complete!("mutex_is_locked_is_poisoned");
}
#[test]
fn drop_woken_future_passes_baton() {
init_test("drop_woken_future_passes_baton");
let cx = test_cx();
let mutex = Mutex::new(42);
let mut fut_hold = mutex.lock(&cx);
let guard = poll_once(&mut fut_hold).expect("immediate").expect("lock");
let mut fut_a = mutex.lock(&cx);
let _ = poll_once(&mut fut_a);
let mut fut_b = mutex.lock(&cx);
let _ = poll_once(&mut fut_b);
let waiters = mutex.waiters();
crate::assert_with_log!(waiters == 2, "2 waiters queued", 2usize, waiters);
drop(guard);
drop(fut_a);
let guard_b = poll_once(&mut fut_b)
.expect("should complete after baton pass")
.expect("no error");
crate::assert_with_log!(*guard_b == 42, "waiter B acquired", 42, *guard_b);
crate::test_complete!("drop_woken_future_passes_baton");
}
#[test]
fn try_lock_does_not_bypass_granted_waiter() {
init_test("try_lock_does_not_bypass_granted_waiter");
let cx = test_cx();
let mutex = Mutex::new(0u32);
let mut fut_hold = mutex.lock(&cx);
let guard = poll_once(&mut fut_hold).expect("immediate").expect("lock");
let mut fut_w = mutex.lock(&cx);
let _ = poll_once(&mut fut_w);
drop(guard);
let steal_blocked = matches!(mutex.try_lock(), Err(TryLockError::Locked));
crate::assert_with_log!(
steal_blocked,
"try_lock blocked by granted waiter",
true,
steal_blocked
);
let guard_w = poll_once(&mut fut_w)
.expect("should complete")
.expect("no error");
crate::assert_with_log!(
*guard_w == 0,
"granted waiter acquired before try_lock",
0u32,
*guard_w
);
crate::test_complete!("try_lock_does_not_bypass_granted_waiter");
}
#[test]
fn owned_try_lock_does_not_bypass_granted_waiter() {
init_test("owned_try_lock_does_not_bypass_granted_waiter");
let cx = test_cx();
let mutex = Arc::new(Mutex::new(9u32));
let mut fut_hold = mutex.as_ref().lock(&cx);
let guard = poll_once(&mut fut_hold).expect("immediate").expect("lock");
let mut fut_waiter = mutex.as_ref().lock(&cx);
let waiter_pending = poll_once(&mut fut_waiter).is_none();
crate::assert_with_log!(waiter_pending, "waiter queued", true, waiter_pending);
drop(guard);
let owned_blocked = matches!(
OwnedMutexGuard::try_lock(Arc::clone(&mutex)),
Err(TryLockError::Locked)
);
crate::assert_with_log!(
owned_blocked,
"owned try_lock blocked by granted waiter",
true,
owned_blocked
);
let waiter_guard = poll_once(&mut fut_waiter)
.expect("granted waiter acquires")
.expect("no error");
crate::assert_with_log!(
*waiter_guard == 9,
"granted waiter acquired before owned try_lock",
9u32,
*waiter_guard
);
crate::test_complete!("owned_try_lock_does_not_bypass_granted_waiter");
}
#[test]
fn cancel_head_waiter_does_not_skip_granted_predecessor() {
init_test("cancel_head_waiter_does_not_skip_granted_predecessor");
let cx1 = test_cx();
let cx2 = Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 6)),
TaskId::from_arena(ArenaIndex::new(0, 6)),
Budget::INFINITE,
);
let cx3 = test_cx();
let mutex = Mutex::new(0u32);
let mut fut_hold = mutex.lock(&cx1);
let guard = poll_once(&mut fut_hold).expect("immediate").expect("lock");
let mut fut1 = mutex.lock(&cx1);
let _ = poll_once(&mut fut1);
let mut fut2 = mutex.lock(&cx2);
let _ = poll_once(&mut fut2);
let mut fut3 = mutex.lock(&cx3);
let _ = poll_once(&mut fut3);
drop(guard);
cx2.set_cancel_requested(true);
let result2 = poll_once(&mut fut2);
let cancelled = matches!(result2, Some(Err(LockError::Cancelled)));
crate::assert_with_log!(cancelled, "head waiter cancelled", true, cancelled);
let third_pending = poll_once(&mut fut3).is_none();
crate::assert_with_log!(
third_pending,
"third waiter stays pending behind granted predecessor",
true,
third_pending
);
let guard1 = poll_once(&mut fut1)
.expect("granted predecessor acquires")
.expect("no error");
crate::assert_with_log!(
*guard1 == 0,
"granted predecessor acquires first",
0u32,
*guard1
);
crate::test_complete!("cancel_head_waiter_does_not_skip_granted_predecessor");
}
#[test]
fn new_waiter_does_not_bypass_granted_waiter() {
init_test("new_waiter_does_not_bypass_granted_waiter");
let cx1 = test_cx();
let cx2 = test_cx();
let mutex = Mutex::new(7u32);
let mut fut_hold = mutex.lock(&cx1);
let guard = poll_once(&mut fut_hold).expect("immediate").expect("lock");
let mut older_waiter = mutex.lock(&cx1);
let older_pending = poll_once(&mut older_waiter).is_none();
crate::assert_with_log!(older_pending, "older waiter queued", true, older_pending);
drop(guard);
let mut newer_waiter = mutex.lock(&cx2);
let newer_pending = poll_once(&mut newer_waiter).is_none();
crate::assert_with_log!(
newer_pending,
"new waiter cannot bypass granted waiter",
true,
newer_pending
);
let older_guard = poll_once(&mut older_waiter)
.expect("older waiter acquires")
.expect("no error");
crate::assert_with_log!(
*older_guard == 7,
"older waiter acquires",
7u32,
*older_guard
);
drop(older_guard);
let newer_guard = poll_once(&mut newer_waiter)
.expect("newer waiter acquires after older waiter")
.expect("no error");
crate::assert_with_log!(
*newer_guard == 7,
"newer waiter acquires second",
7u32,
*newer_guard
);
crate::test_complete!("new_waiter_does_not_bypass_granted_waiter");
}
#[test]
fn test_owned_mutex_guard_try_lock() {
init_test("test_owned_mutex_guard_try_lock");
let mutex = Arc::new(Mutex::new(42_u32));
let mut guard =
OwnedMutexGuard::try_lock(Arc::clone(&mutex)).expect("try_lock should succeed");
crate::assert_with_log!(*guard == 42, "owned guard reads value", 42u32, *guard);
*guard = 100;
crate::assert_with_log!(*guard == 100, "owned guard writes value", 100u32, *guard);
let locked = OwnedMutexGuard::try_lock(Arc::clone(&mutex)).is_err();
crate::assert_with_log!(locked, "try_lock fails while held", true, locked);
drop(guard);
let guard2 = OwnedMutexGuard::try_lock(Arc::clone(&mutex)).expect("try_lock after drop");
crate::assert_with_log!(*guard2 == 100, "mutation persisted", 100u32, *guard2);
crate::test_complete!("test_owned_mutex_guard_try_lock");
}
#[test]
fn test_owned_mutex_guard_async_lock() {
init_test("test_owned_mutex_guard_async_lock");
let cx = test_cx();
let mutex = Arc::new(Mutex::new(0_u32));
let mut fut = std::pin::pin!(OwnedMutexGuard::lock(Arc::clone(&mutex), &cx));
let mut guard = poll_pinned_until_ready(fut.as_mut()).expect("async lock should succeed");
*guard = 99;
drop(guard);
let guard2 = OwnedMutexGuard::try_lock(Arc::clone(&mutex)).expect("try_lock after async");
crate::assert_with_log!(*guard2 == 99, "async mutation persisted", 99u32, *guard2);
crate::test_complete!("test_owned_mutex_guard_async_lock");
}
#[test]
fn test_mutex_default() {
init_test("test_mutex_default");
let mutex: Mutex<u32> = Mutex::default();
let guard = mutex.try_lock().expect("default mutex should be unlocked");
crate::assert_with_log!(*guard == 0, "default value", 0u32, *guard);
crate::test_complete!("test_mutex_default");
}
#[test]
fn mutex_poison_propagation_on_panic() {
init_test("mutex_poison_propagation_on_panic");
let mutex = Arc::new(Mutex::new(42_u32));
let m = Arc::clone(&mutex);
let handle = std::thread::spawn(move || {
let cx = test_cx();
let _guard = lock_blocking(&m, &cx);
panic!("deliberate panic to poison mutex");
});
let _ = handle.join();
let poisoned = mutex.is_poisoned();
crate::assert_with_log!(poisoned, "mutex should be poisoned", true, poisoned);
let try_result = mutex.try_lock();
let is_poisoned = matches!(try_result, Err(TryLockError::Poisoned));
crate::assert_with_log!(is_poisoned, "try_lock returns Poisoned", true, is_poisoned);
let cx = test_cx();
let mut fut = mutex.lock(&cx);
let lock_result = poll_once(&mut fut);
let lock_poisoned = matches!(lock_result, Some(Err(LockError::Poisoned)));
crate::assert_with_log!(lock_poisoned, "lock returns Poisoned", true, lock_poisoned);
crate::test_complete!("mutex_poison_propagation_on_panic");
}
#[test]
#[should_panic(expected = "mutex is poisoned")]
fn mutex_get_mut_panics_when_poisoned() {
let mutex = Arc::new(Mutex::new(42_u32));
let m = Arc::clone(&mutex);
let handle = std::thread::spawn(move || {
let cx = test_cx();
let _guard = lock_blocking(&m, &cx);
panic!("poison");
});
let _ = handle.join();
let mut mutex = Arc::try_unwrap(mutex).expect("sole owner");
let _ = mutex.get_mut();
}
#[test]
#[should_panic(expected = "mutex is poisoned")]
fn mutex_into_inner_panics_when_poisoned() {
let mutex = Arc::new(Mutex::new(42_u32));
let m = Arc::clone(&mutex);
let handle = std::thread::spawn(move || {
let cx = test_cx();
let _guard = lock_blocking(&m, &cx);
panic!("poison");
});
let _ = handle.join();
let mutex = Arc::try_unwrap(mutex).expect("sole owner");
let _ = mutex.into_inner();
}
#[test]
fn mutex_cancel_cleans_waiter_on_drop() {
init_test("mutex_cancel_cleans_waiter_on_drop");
let cx = test_cx();
let mutex = Mutex::new(0_u32);
let mut fut_hold = mutex.lock(&cx);
let _guard = poll_once(&mut fut_hold).expect("immediate").expect("lock");
let cancel_cx = Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 5)),
TaskId::from_arena(ArenaIndex::new(0, 5)),
Budget::INFINITE,
);
let mut fut_wait = mutex.lock(&cancel_cx);
let pending = poll_once(&mut fut_wait).is_none();
crate::assert_with_log!(pending, "waiter is pending", true, pending);
let waiters_before = mutex.waiters();
crate::assert_with_log!(
waiters_before == 1,
"1 waiter queued",
1usize,
waiters_before
);
cancel_cx.set_cancel_requested(true);
let result = poll_once(&mut fut_wait);
let cancelled = matches!(result, Some(Err(LockError::Cancelled)));
crate::assert_with_log!(cancelled, "waiter cancelled", true, cancelled);
drop(fut_wait);
let waiters_after = mutex.waiters();
crate::assert_with_log!(
waiters_after == 0,
"no leaked waiters after cancel+drop",
0usize,
waiters_after
);
crate::test_complete!("mutex_cancel_cleans_waiter_on_drop");
}
#[test]
fn mutex_queued_waiter_sees_poison_after_holder_panics() {
init_test("mutex_queued_waiter_sees_poison_after_holder_panics");
let mutex = Arc::new(Mutex::new(0_u32));
let cx = test_cx();
let mut fut_wait = mutex.lock(&cx);
let m2 = Arc::clone(&mutex);
let handle = std::thread::spawn(move || {
let cx = test_cx();
let _guard = lock_blocking(&m2, &cx);
std::thread::sleep(std::time::Duration::from_millis(50));
panic!("poison while waiter is queued");
});
std::thread::sleep(std::time::Duration::from_millis(10));
let pending = poll_once(&mut fut_wait).is_none();
crate::assert_with_log!(pending, "waiter is pending", true, pending);
let _ = handle.join();
let result = poll_once(&mut fut_wait);
let poisoned = matches!(result, Some(Err(LockError::Poisoned)));
crate::assert_with_log!(poisoned, "queued waiter sees poison", true, poisoned);
crate::test_complete!("mutex_queued_waiter_sees_poison_after_holder_panics");
}
#[test]
fn owned_mutex_try_lock_returns_poisoned() {
init_test("owned_mutex_try_lock_returns_poisoned");
let mutex = Arc::new(Mutex::new(0_u32));
let m = Arc::clone(&mutex);
let handle = std::thread::spawn(move || {
let cx = test_cx();
let _guard = lock_blocking(&m, &cx);
panic!("poison");
});
let _ = handle.join();
let result = OwnedMutexGuard::try_lock(Arc::clone(&mutex));
let is_poisoned = matches!(result, Err(TryLockError::Poisoned));
crate::assert_with_log!(
is_poisoned,
"OwnedMutexGuard::try_lock Poisoned",
true,
is_poisoned
);
crate::test_complete!("owned_mutex_try_lock_returns_poisoned");
}
#[test]
fn lock_error_debug_clone_copy_eq_display() {
let poisoned = LockError::Poisoned;
let cancelled = LockError::Cancelled;
let copied = poisoned;
let cloned = poisoned;
assert_eq!(copied, cloned);
assert_eq!(copied, LockError::Poisoned);
assert_ne!(poisoned, cancelled);
assert!(format!("{poisoned:?}").contains("Poisoned"));
assert!(format!("{cancelled:?}").contains("Cancelled"));
assert!(poisoned.to_string().contains("poisoned"));
assert!(cancelled.to_string().contains("cancelled"));
}
#[test]
fn try_lock_error_debug_clone_copy_eq_display() {
let locked = TryLockError::Locked;
let poisoned = TryLockError::Poisoned;
let copied = locked;
let cloned = locked;
assert_eq!(copied, cloned);
assert_ne!(locked, poisoned);
assert!(format!("{locked:?}").contains("Locked"));
assert!(locked.to_string().contains("locked"));
assert!(poisoned.to_string().contains("poisoned"));
}
}