#![allow(unsafe_code)]
use parking_lot::Mutex as ParkingMutex;
use smallvec::SmallVec;
use std::cell::UnsafeCell;
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, Waker};
use super::waiter::{WaiterChain, WaiterId};
use crate::cx::Cx;
use crate::sync::lock_ordering::{self, LockRank};
const MAX_CONSECUTIVE_WRITERS_BEFORE_READER_BATCH: usize = 16;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwLockError {
Poisoned,
Cancelled,
PolledAfterCompletion,
}
impl std::fmt::Display for RwLockError {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Poisoned => write!(f, "rwlock poisoned"),
Self::Cancelled => write!(f, "rwlock acquisition cancelled"),
Self::PolledAfterCompletion => write!(f, "rwlock future polled after completion"),
}
}
}
impl std::error::Error for RwLockError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryReadError {
Locked,
Poisoned,
}
impl std::fmt::Display for TryReadError {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Locked => write!(f, "rwlock is write-locked"),
Self::Poisoned => write!(f, "rwlock poisoned"),
}
}
}
impl std::error::Error for TryReadError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryWriteError {
Locked,
Poisoned,
}
impl std::fmt::Display for TryWriteError {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Locked => write!(f, "rwlock is locked"),
Self::Poisoned => write!(f, "rwlock poisoned"),
}
}
}
impl std::error::Error for TryWriteError {}
#[derive(Debug, Clone, Default)]
struct State {
readers: usize,
writer_active: bool,
writer_waiters: usize,
reader_waiters: WaiterChain<u64>,
writer_queue: WaiterChain<u64>,
next_waiter_id: u64,
consecutive_writers_served: usize,
}
#[derive(Debug)]
pub struct RwLock<T> {
state: ParkingMutex<State>,
data: UnsafeCell<T>,
poisoned: AtomicBool,
name: &'static str,
rank: Option<LockRank>,
}
unsafe impl<T: Send> Send for RwLock<T> {}
unsafe impl<T: Send + Sync> Sync for RwLock<T> {}
impl<T> RwLock<T> {
#[inline]
#[must_use]
pub fn with_name(name: &'static str, value: T) -> Self {
let rank = LockRank::from_name(name);
Self {
state: ParkingMutex::new(State::default()),
data: UnsafeCell::new(value),
poisoned: AtomicBool::new(false),
name,
rank,
}
}
#[inline]
#[must_use]
pub fn new(value: T) -> Self {
Self::with_name("unknown", value)
}
#[inline]
pub fn into_inner(self) -> Result<T, RwLockError> {
if self.is_poisoned() {
return Err(RwLockError::Poisoned);
}
Ok(self.data.into_inner())
}
}
impl<T> RwLock<T> {
#[inline]
#[must_use]
pub fn is_poisoned(&self) -> bool {
self.poisoned.load(Ordering::Acquire)
}
#[inline]
pub fn read<'a, 'b, Caps>(&'a self, cx: &'b Cx<Caps>) -> ReadFuture<'a, 'b, T, Caps> {
ReadFuture {
lock: self,
cx,
waiter_id: None,
completed: false,
}
}
#[inline]
pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryReadError> {
self.try_acquire_read_state()?;
Ok(RwLockReadGuard { lock: self })
}
#[inline]
pub fn write<'a, 'b, Caps>(&'a self, cx: &'b Cx<Caps>) -> WriteFuture<'a, 'b, T, Caps> {
WriteFuture {
lock: self,
cx,
waiter_id: None,
counted: false,
completed: false,
}
}
#[inline]
pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryWriteError> {
self.try_acquire_write_state()?;
Ok(RwLockWriteGuard { lock: self })
}
#[inline]
pub fn get_mut(&mut self) -> Result<&mut T, RwLockError> {
if self.is_poisoned() {
return Err(RwLockError::Poisoned);
}
Ok(self.data.get_mut())
}
#[inline]
fn try_acquire_read_state(&self) -> Result<(), TryReadError> {
if let Some(rank) = self.rank {
lock_ordering::check_acquire(self.name, rank);
}
let mut state = self.state.lock();
if self.is_poisoned() {
return Err(TryReadError::Poisoned);
}
if state.writer_active || state.writer_waiters > 0 {
return Err(TryReadError::Locked);
}
state.readers += 1;
drop(state);
if let Some(rank) = self.rank {
lock_ordering::record_acquire(self.name, rank);
}
Ok(())
}
#[inline]
fn try_acquire_write_state(&self) -> Result<(), TryWriteError> {
if let Some(rank) = self.rank {
lock_ordering::check_acquire(self.name, rank);
}
let mut state = self.state.lock();
if self.is_poisoned() {
return Err(TryWriteError::Poisoned);
}
if state.writer_active || state.readers > 0 || state.writer_waiters > 0 {
return Err(TryWriteError::Locked);
}
state.writer_active = true;
drop(state);
if let Some(rank) = self.rank {
lock_ordering::record_acquire(self.name, rank);
}
Ok(())
}
#[inline]
fn pop_writer_waiter(state: &mut State) -> Option<Waker> {
state.writer_queue.pop_front().map(|(_, waker, _)| waker)
}
#[inline]
fn drain_reader_waiters(state: &mut State) -> SmallVec<[Waker; 4]> {
SmallVec::from_vec(state.reader_waiters.drain())
}
#[inline]
fn queued_waiter_wakers(state: &State) -> SmallVec<[Waker; 4]> {
let mut wakers = SmallVec::new();
wakers.extend(state.reader_waiters.clone_wakers());
wakers.extend(state.writer_queue.clone_wakers());
wakers
}
#[inline]
fn reader_arrived_before_writer(reader_id: u64, writer_id: u64) -> bool {
reader_id.wrapping_sub(writer_id).cast_signed() < 0
}
#[inline]
fn take_eligible_reader_waiters(state: &mut State) -> SmallVec<[Waker; 4]> {
let Some(first_writer_id) = state.writer_queue.front_tag().copied() else {
return Self::drain_reader_waiters(state);
};
let mut wakers = SmallVec::new();
while state.reader_waiters.front_tag().is_some_and(|reader_id| {
Self::reader_arrived_before_writer(*reader_id, first_writer_id)
}) {
if let Some((_, waker, _)) = state.reader_waiters.pop_front() {
wakers.push(waker);
}
}
wakers
}
#[inline]
fn take_forced_reader_turn(state: &mut State) -> SmallVec<[Waker; 4]> {
let mut wakers = SmallVec::new();
if let Some((_, waker, _)) = state.reader_waiters.pop_front() {
wakers.push(waker);
}
wakers
}
#[inline]
fn should_wake_writer(state: &State) -> bool {
if state.writer_queue.is_empty() {
return false;
}
if state.reader_waiters.is_empty() {
return true;
}
match (
state.writer_queue.front_tag().copied(),
state.reader_waiters.front_tag().copied(),
) {
(Some(writer_id), Some(reader_id)) => {
!Self::reader_arrived_before_writer(reader_id, writer_id)
}
_ => false,
}
}
#[inline]
fn release_reader(&self) {
let waker = {
let mut state = self.state.lock();
state.readers = state.readers.saturating_sub(1);
if state.readers == 0 && state.writer_waiters > 0 {
let waker = Self::pop_writer_waiter(&mut state);
if waker.is_some() {
state.writer_active = true;
if !state.reader_waiters.is_empty() {
state.consecutive_writers_served =
state.consecutive_writers_served.saturating_add(1);
} else {
state.consecutive_writers_served = 0;
}
}
waker
} else {
None
}
};
if let Some(waker) = waker {
waker.wake();
}
}
#[inline]
fn release_writer(&self) {
let (writer_waker, reader_wakers) = {
let mut state = self.state.lock();
state.writer_active = false;
if self.is_poisoned() {
let wakers = Self::queued_waiter_wakers(&state);
drop(state);
(None, wakers)
} else {
let force_reader_batch = state.consecutive_writers_served
>= MAX_CONSECUTIVE_WRITERS_BEFORE_READER_BATCH
&& !state.reader_waiters.is_empty();
let wake_writer = !force_reader_batch && Self::should_wake_writer(&state);
if wake_writer {
let waker = Self::pop_writer_waiter(&mut state);
if waker.is_some() {
state.writer_active = true;
if !state.reader_waiters.is_empty() {
state.consecutive_writers_served =
state.consecutive_writers_served.saturating_add(1);
} else {
state.consecutive_writers_served = 0;
}
}
(waker, SmallVec::new())
} else if force_reader_batch {
let wakers = Self::take_forced_reader_turn(&mut state);
state.readers += wakers.len();
state.consecutive_writers_served = 0;
drop(state);
(None, wakers)
} else {
let wakers = Self::take_eligible_reader_waiters(&mut state);
state.readers += wakers.len();
if !wakers.is_empty() {
state.consecutive_writers_served = 0;
}
drop(state);
(None, wakers)
}
}
};
if let Some(waker) = writer_waker {
waker.wake();
}
for waker in reader_wakers {
waker.wake();
}
}
#[inline]
fn abandon_read_waiter(&self, waiter_id: &mut Option<WaiterId>) {
let Some(waiter_id) = waiter_id.take() else {
return;
};
let writer_waker = {
let mut state = self.state.lock();
if state.reader_waiters.remove(waiter_id).is_some() {
None
} else {
state.readers = state.readers.saturating_sub(1);
if let Some(rank) = self.rank {
lock_ordering::record_release(self.name, rank);
}
if state.readers == 0 && state.writer_waiters > 0 {
let waker = Self::pop_writer_waiter(&mut state);
if waker.is_some() {
state.writer_active = true;
if !state.reader_waiters.is_empty() {
state.consecutive_writers_served =
state.consecutive_writers_served.saturating_add(1);
} else {
state.consecutive_writers_served = 0;
}
}
waker
} else {
None
}
}
};
if let Some(waker) = writer_waker {
waker.wake();
}
}
#[inline]
fn abandon_write_waiter(&self, waiter_id: &mut Option<WaiterId>, counted: &mut bool) {
if !*counted {
return;
}
let waiter_id = waiter_id.take();
let poisoned = self.is_poisoned();
let (writer_waker, reader_wakers) = {
let mut state = self.state.lock();
let result = if let Some(waiter_id) = waiter_id {
if state.writer_queue.remove(waiter_id).is_some() {
state.writer_waiters = state.writer_waiters.saturating_sub(1);
if state.writer_waiters == 0 && !state.writer_active {
if poisoned {
(None, SmallVec::<[Waker; 4]>::new())
} else {
let wakers = Self::drain_reader_waiters(&mut state);
state.readers += wakers.len();
(None, wakers)
}
} else {
(None, SmallVec::<[Waker; 4]>::new())
}
} else {
state.writer_waiters = state.writer_waiters.saturating_sub(1);
state.writer_active = false;
if let Some(rank) = self.rank {
lock_ordering::record_release(self.name, rank);
}
if poisoned {
let wakers = Self::queued_waiter_wakers(&state);
(None, wakers)
} else {
let force_reader_batch = state.consecutive_writers_served
>= MAX_CONSECUTIVE_WRITERS_BEFORE_READER_BATCH
&& !state.reader_waiters.is_empty();
let wake_writer = !force_reader_batch && Self::should_wake_writer(&state);
if wake_writer {
let waker = Self::pop_writer_waiter(&mut state);
if waker.is_some() {
state.writer_active = true;
if !state.reader_waiters.is_empty() {
state.consecutive_writers_served =
state.consecutive_writers_served.saturating_add(1);
} else {
state.consecutive_writers_served = 0;
}
}
(waker, SmallVec::<[Waker; 4]>::new())
} else if force_reader_batch {
let wakers = Self::take_forced_reader_turn(&mut state);
state.readers += wakers.len();
state.consecutive_writers_served = 0;
(None, wakers)
} else {
let wakers = Self::take_eligible_reader_waiters(&mut state);
state.readers += wakers.len();
if !wakers.is_empty() {
state.consecutive_writers_served = 0;
}
(None, wakers)
}
}
}
} else {
state.writer_waiters = state.writer_waiters.saturating_sub(1);
if state.writer_waiters == 0 && !state.writer_active {
if poisoned {
(None, SmallVec::<[Waker; 4]>::new())
} else {
let wakers = Self::drain_reader_waiters(&mut state);
state.readers += wakers.len();
(None, wakers)
}
} else {
(None, SmallVec::<[Waker; 4]>::new())
}
};
drop(state);
result
};
*counted = false;
if let Some(waker) = writer_waker {
waker.wake();
}
for waker in reader_wakers {
waker.wake();
}
}
#[cfg(test)]
fn debug_state(&self) -> State {
(*self.state.lock()).clone()
}
}
pub struct ReadFuture<'a, 'b, T, Caps = crate::cx::cap::All> {
lock: &'a RwLock<T>,
cx: &'b Cx<Caps>,
waiter_id: Option<WaiterId>,
completed: bool,
}
impl<'a, T, Caps> Future for ReadFuture<'a, '_, T, Caps> {
type Output = Result<RwLockReadGuard<'a, T>, RwLockError>;
#[inline]
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.completed {
return Poll::Ready(Err(RwLockError::PolledAfterCompletion));
}
if this.cx.checkpoint().is_err() {
this.lock.abandon_read_waiter(&mut this.waiter_id);
this.completed = true;
return Poll::Ready(Err(RwLockError::Cancelled));
}
let mut state = this.lock.state.lock();
if this.lock.is_poisoned() {
drop(state);
this.lock.abandon_read_waiter(&mut this.waiter_id);
this.completed = true;
return Poll::Ready(Err(RwLockError::Poisoned));
}
if let Some(waiter_id) = this.waiter_id {
if state
.reader_waiters
.update_waker(waiter_id, context.waker())
{
drop(state);
return Poll::Pending;
}
if let Some(rank) = this.lock.rank {
lock_ordering::check_acquire(this.lock.name, rank);
lock_ordering::record_acquire(this.lock.name, rank);
}
this.waiter_id = None;
drop(state);
this.completed = true;
return Poll::Ready(Ok(RwLockReadGuard { lock: this.lock }));
}
if !state.writer_active && state.writer_waiters == 0 {
if let Some(rank) = this.lock.rank {
lock_ordering::check_acquire(this.lock.name, rank);
}
state.readers += 1;
drop(state);
if let Some(rank) = this.lock.rank {
lock_ordering::record_acquire(this.lock.name, rank);
}
this.completed = true;
return Poll::Ready(Ok(RwLockReadGuard { lock: this.lock }));
}
let id = state.next_waiter_id;
state.next_waiter_id = state.next_waiter_id.wrapping_add(1);
let waiter_id = state
.reader_waiters
.push_back_tagged(context.waker().clone(), id);
drop(state);
this.waiter_id = Some(waiter_id);
Poll::Pending
}
}
impl<T, Caps> Drop for ReadFuture<'_, '_, T, Caps> {
fn drop(&mut self) {
self.lock.abandon_read_waiter(&mut self.waiter_id);
}
}
pub struct WriteFuture<'a, 'b, T, Caps = crate::cx::cap::All> {
lock: &'a RwLock<T>,
cx: &'b Cx<Caps>,
waiter_id: Option<WaiterId>,
counted: bool,
completed: bool,
}
impl<'a, T, Caps> Future for WriteFuture<'a, '_, T, Caps> {
type Output = Result<RwLockWriteGuard<'a, T>, RwLockError>;
#[inline]
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.completed {
return Poll::Ready(Err(RwLockError::PolledAfterCompletion));
}
if this.cx.checkpoint().is_err() {
this.lock
.abandon_write_waiter(&mut this.waiter_id, &mut this.counted);
this.completed = true;
return Poll::Ready(Err(RwLockError::Cancelled));
}
let mut state = this.lock.state.lock();
if this.lock.is_poisoned() {
drop(state);
this.lock
.abandon_write_waiter(&mut this.waiter_id, &mut this.counted);
this.completed = true;
return Poll::Ready(Err(RwLockError::Poisoned));
}
if let Some(waiter_id) = this.waiter_id {
if state.writer_queue.update_waker(waiter_id, context.waker()) {
drop(state);
return Poll::Pending;
}
if let Some(rank) = this.lock.rank {
lock_ordering::check_acquire(this.lock.name, rank);
lock_ordering::record_acquire(this.lock.name, rank);
}
this.waiter_id = None;
if this.counted {
state.writer_waiters = state.writer_waiters.saturating_sub(1);
this.counted = false;
}
drop(state);
this.completed = true;
return Poll::Ready(Ok(RwLockWriteGuard { lock: this.lock }));
}
let can_acquire =
!state.writer_active && state.readers == 0 && state.writer_queue.is_empty();
if can_acquire {
if let Some(rank) = this.lock.rank {
lock_ordering::check_acquire(this.lock.name, rank);
}
state.writer_active = true;
drop(state);
if let Some(rank) = this.lock.rank {
lock_ordering::record_acquire(this.lock.name, rank);
}
this.completed = true;
return Poll::Ready(Ok(RwLockWriteGuard { lock: this.lock }));
}
if !this.counted {
state.writer_waiters += 1;
this.counted = true;
}
let id = state.next_waiter_id;
state.next_waiter_id = state.next_waiter_id.wrapping_add(1);
let waiter_id = state
.writer_queue
.push_back_tagged(context.waker().clone(), id);
drop(state);
this.waiter_id = Some(waiter_id);
Poll::Pending
}
}
impl<T, Caps> Drop for WriteFuture<'_, '_, T, Caps> {
fn drop(&mut self) {
self.lock
.abandon_write_waiter(&mut self.waiter_id, &mut self.counted);
}
}
#[must_use = "guard will be immediately released if not held"]
pub struct RwLockReadGuard<'a, T> {
lock: &'a RwLock<T>,
}
unsafe impl<T: Send + Sync> Send for RwLockReadGuard<'_, T> {}
unsafe impl<T: Send + Sync> Sync for RwLockReadGuard<'_, T> {}
impl<T> Deref for RwLockReadGuard<'_, T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { &*self.lock.data.get() }
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for RwLockReadGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&**self, f)
}
}
impl<T> Drop for RwLockReadGuard<'_, T> {
#[inline]
fn drop(&mut self) {
self.lock.release_reader();
if let Some(rank) = self.lock.rank {
lock_ordering::record_release(self.lock.name, rank);
}
}
}
#[must_use = "guard will be immediately released if not held"]
pub struct RwLockWriteGuard<'a, T> {
lock: &'a RwLock<T>,
}
unsafe impl<T: Send> Send for RwLockWriteGuard<'_, T> {}
unsafe impl<T: Send + Sync> Sync for RwLockWriteGuard<'_, T> {}
impl<T> Deref for RwLockWriteGuard<'_, T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { &*self.lock.data.get() }
}
}
impl<T> DerefMut for RwLockWriteGuard<'_, T> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.lock.data.get() }
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for RwLockWriteGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&**self, f)
}
}
impl<T> Drop for RwLockWriteGuard<'_, T> {
#[inline]
fn drop(&mut self) {
if std::thread::panicking() {
self.lock.poisoned.store(true, Ordering::Release);
}
self.lock.release_writer();
if let Some(rank) = self.lock.rank {
lock_ordering::record_release(self.lock.name, rank);
}
}
}
impl<'a, T> RwLockWriteGuard<'a, T> {
pub fn downgrade(self) -> RwLockReadGuard<'a, T> {
let md = std::mem::ManuallyDrop::new(self);
let read_guard = RwLockReadGuard { lock: md.lock };
let reader_wakers = {
let mut state = md.lock.state.lock();
debug_assert!(state.writer_active, "downgrade called but no active writer");
state.writer_active = false;
state.readers = 1;
let wakers = RwLock::<T>::take_eligible_reader_waiters(&mut state);
state.readers += wakers.len();
if !wakers.is_empty() {
state.consecutive_writers_served = 0;
}
wakers
};
for waker in reader_wakers {
waker.wake();
}
read_guard
}
}
#[must_use = "guard will be immediately released if not held"]
pub struct OwnedRwLockReadGuard<T> {
lock: Arc<RwLock<T>>,
}
impl<T> OwnedRwLockReadGuard<T> {
pub fn read<Caps>(lock: Arc<RwLock<T>>, cx: &Cx<Caps>) -> OwnedReadFuture<'_, T, Caps> {
OwnedReadFuture {
lock,
cx,
waiter_id: None,
completed: false,
}
}
pub fn try_read(lock: Arc<RwLock<T>>) -> Result<Self, TryReadError> {
lock.try_acquire_read_state()?;
Ok(Self { lock })
}
pub fn with_read<F, R>(&self, f: F) -> R
where
F: FnOnce(&T) -> R,
{
assert!(!self.lock.is_poisoned(), "rwlock poisoned");
f(unsafe { &*self.lock.data.get() })
}
}
impl<T> Deref for OwnedRwLockReadGuard<T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { &*self.lock.data.get() }
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for OwnedRwLockReadGuard<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&**self, f)
}
}
impl<T> Drop for OwnedRwLockReadGuard<T> {
#[inline]
fn drop(&mut self) {
self.lock.release_reader();
if let Some(rank) = self.lock.rank {
lock_ordering::record_release(self.lock.name, rank);
}
}
}
#[must_use = "guard will be immediately released if not held"]
pub struct OwnedRwLockWriteGuard<T> {
lock: Arc<RwLock<T>>,
}
impl<T> OwnedRwLockWriteGuard<T> {
pub fn write<Caps>(lock: Arc<RwLock<T>>, cx: &Cx<Caps>) -> OwnedWriteFuture<'_, T, Caps> {
OwnedWriteFuture {
lock,
cx,
waiter_id: None,
counted: false,
completed: false,
}
}
pub fn try_write(lock: Arc<RwLock<T>>) -> Result<Self, TryWriteError> {
lock.try_acquire_write_state()?;
Ok(Self { lock })
}
pub fn with_write<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut T) -> R,
{
assert!(!self.lock.is_poisoned(), "rwlock poisoned");
f(unsafe { &mut *self.lock.data.get() })
}
}
impl<T> Deref for OwnedRwLockWriteGuard<T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { &*self.lock.data.get() }
}
}
impl<T> DerefMut for OwnedRwLockWriteGuard<T> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.lock.data.get() }
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for OwnedRwLockWriteGuard<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&**self, f)
}
}
impl<T> Drop for OwnedRwLockWriteGuard<T> {
#[inline]
fn drop(&mut self) {
if std::thread::panicking() {
self.lock.poisoned.store(true, Ordering::Release);
}
self.lock.release_writer();
if let Some(rank) = self.lock.rank {
lock_ordering::record_release(self.lock.name, rank);
}
}
}
impl<T> OwnedRwLockWriteGuard<T> {
pub fn downgrade(self) -> OwnedRwLockReadGuard<T> {
let md = std::mem::ManuallyDrop::new(self);
let lock = unsafe { std::ptr::read(&md.lock) };
let read_guard = OwnedRwLockReadGuard { lock };
let reader_wakers = {
let mut state = read_guard.lock.state.lock();
debug_assert!(state.writer_active, "downgrade called but no active writer");
state.writer_active = false;
state.readers = 1;
let wakers = RwLock::<T>::take_eligible_reader_waiters(&mut state);
state.readers += wakers.len();
if !wakers.is_empty() {
state.consecutive_writers_served = 0;
}
wakers
};
for waker in reader_wakers {
waker.wake();
}
read_guard
}
}
pub struct OwnedReadFuture<'b, T, Caps = crate::cx::cap::All> {
lock: Arc<RwLock<T>>,
cx: &'b Cx<Caps>,
waiter_id: Option<WaiterId>,
completed: bool,
}
impl<T, Caps> Future for OwnedReadFuture<'_, T, Caps> {
type Output = Result<OwnedRwLockReadGuard<T>, RwLockError>;
#[inline]
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.completed {
return Poll::Ready(Err(RwLockError::PolledAfterCompletion));
}
if this.cx.checkpoint().is_err() {
this.lock.abandon_read_waiter(&mut this.waiter_id);
this.completed = true;
return Poll::Ready(Err(RwLockError::Cancelled));
}
let mut state = this.lock.state.lock();
if this.lock.is_poisoned() {
drop(state);
this.lock.abandon_read_waiter(&mut this.waiter_id);
this.completed = true;
return Poll::Ready(Err(RwLockError::Poisoned));
}
if let Some(waiter_id) = this.waiter_id {
if state
.reader_waiters
.update_waker(waiter_id, context.waker())
{
drop(state);
return Poll::Pending;
}
if let Some(rank) = this.lock.rank {
lock_ordering::check_acquire(this.lock.name, rank);
lock_ordering::record_acquire(this.lock.name, rank);
}
this.waiter_id = None;
drop(state);
this.completed = true;
return Poll::Ready(Ok(OwnedRwLockReadGuard {
lock: Arc::clone(&this.lock),
}));
}
if !state.writer_active && state.writer_waiters == 0 {
if let Some(rank) = this.lock.rank {
lock_ordering::check_acquire(this.lock.name, rank);
}
state.readers += 1;
drop(state);
if let Some(rank) = this.lock.rank {
lock_ordering::record_acquire(this.lock.name, rank);
}
this.completed = true;
return Poll::Ready(Ok(OwnedRwLockReadGuard {
lock: Arc::clone(&this.lock),
}));
}
let id = state.next_waiter_id;
state.next_waiter_id = state.next_waiter_id.wrapping_add(1);
let waiter_id = state
.reader_waiters
.push_back_tagged(context.waker().clone(), id);
drop(state);
this.waiter_id = Some(waiter_id);
Poll::Pending
}
}
impl<T, Caps> Drop for OwnedReadFuture<'_, T, Caps> {
fn drop(&mut self) {
self.lock.abandon_read_waiter(&mut self.waiter_id);
}
}
pub struct OwnedWriteFuture<'b, T, Caps = crate::cx::cap::All> {
lock: Arc<RwLock<T>>,
cx: &'b Cx<Caps>,
waiter_id: Option<WaiterId>,
counted: bool,
completed: bool,
}
impl<T, Caps> Future for OwnedWriteFuture<'_, T, Caps> {
type Output = Result<OwnedRwLockWriteGuard<T>, RwLockError>;
#[inline]
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.completed {
return Poll::Ready(Err(RwLockError::PolledAfterCompletion));
}
if this.cx.checkpoint().is_err() {
this.lock
.abandon_write_waiter(&mut this.waiter_id, &mut this.counted);
this.completed = true;
return Poll::Ready(Err(RwLockError::Cancelled));
}
let mut state = this.lock.state.lock();
if this.lock.is_poisoned() {
drop(state);
this.lock
.abandon_write_waiter(&mut this.waiter_id, &mut this.counted);
this.completed = true;
return Poll::Ready(Err(RwLockError::Poisoned));
}
if let Some(waiter_id) = this.waiter_id {
if state.writer_queue.update_waker(waiter_id, context.waker()) {
drop(state);
return Poll::Pending;
}
if let Some(rank) = this.lock.rank {
lock_ordering::check_acquire(this.lock.name, rank);
lock_ordering::record_acquire(this.lock.name, rank);
}
this.waiter_id = None;
if this.counted {
state.writer_waiters = state.writer_waiters.saturating_sub(1);
this.counted = false;
}
drop(state);
this.completed = true;
return Poll::Ready(Ok(OwnedRwLockWriteGuard {
lock: Arc::clone(&this.lock),
}));
}
let can_acquire =
!state.writer_active && state.readers == 0 && state.writer_queue.is_empty();
if can_acquire {
if let Some(rank) = this.lock.rank {
lock_ordering::check_acquire(this.lock.name, rank);
}
state.writer_active = true;
drop(state);
if let Some(rank) = this.lock.rank {
lock_ordering::record_acquire(this.lock.name, rank);
}
this.completed = true;
return Poll::Ready(Ok(OwnedRwLockWriteGuard {
lock: Arc::clone(&this.lock),
}));
}
if !this.counted {
state.writer_waiters += 1;
this.counted = true;
}
let id = state.next_waiter_id;
state.next_waiter_id = state.next_waiter_id.wrapping_add(1);
let waiter_id = state
.writer_queue
.push_back_tagged(context.waker().clone(), id);
drop(state);
this.waiter_id = Some(waiter_id);
Poll::Pending
}
}
impl<T, Caps> Drop for OwnedWriteFuture<'_, T, Caps> {
fn drop(&mut self) {
self.lock
.abandon_write_waiter(&mut self.waiter_id, &mut self.counted);
}
}
#[cfg(test)]
#[allow(clippy::significant_drop_tightening)]
#[allow(dead_code)]
mod tests {
use super::*;
use crate::cx::cap;
use crate::test_utils::init_test_logging;
use crate::util::ArenaIndex;
use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::thread;
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
fn poll_once<T>(future: &mut (impl Future<Output = T> + Unpin)) -> Option<T> {
let waker = Waker::noop();
let mut cx = Context::from_waker(waker);
match std::pin::Pin::new(future).poll(&mut cx) {
Poll::Ready(v) => Some(v),
Poll::Pending => None,
}
}
fn poll_until_ready<T>(future: impl Future<Output = T>) -> T {
let waker = Waker::noop();
let mut cx = Context::from_waker(waker);
let mut future = std::pin::pin!(future);
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
fn read_blocking<'a, T>(lock: &'a RwLock<T>, cx: &Cx) -> RwLockReadGuard<'a, T> {
poll_until_ready(lock.read(cx)).expect("read failed")
}
fn write_blocking<'a, T>(lock: &'a RwLock<T>, cx: &Cx) -> RwLockWriteGuard<'a, T> {
poll_until_ready(lock.write(cx)).expect("write failed")
}
fn test_cx() -> Cx<cap::All> {
test_cx_with_slot(0)
}
fn test_cx_with_slot(slot: u32) -> Cx<cap::All> {
Cx::new(
crate::types::RegionId::from_arena(ArenaIndex::new(0, slot)),
crate::types::TaskId::from_arena(ArenaIndex::new(0, slot)),
crate::types::Budget::INFINITE,
)
}
#[test]
fn multiple_readers_allowed() {
init_test("multiple_readers_allowed");
let cx = test_cx();
let lock = RwLock::new(42_u32);
let guard1 = read_blocking(&lock, &cx);
let guard2 = read_blocking(&lock, &cx);
crate::assert_with_log!(*guard1 == 42, "guard1 value", 42u32, *guard1);
crate::assert_with_log!(*guard2 == 42, "guard2 value", 42u32, *guard2);
crate::test_complete!("multiple_readers_allowed");
}
#[test]
fn read_accepts_detached_no_cap_context() {
init_test("read_accepts_detached_no_cap_context");
let cx = Cx::<cap::None>::detached_cancel_context();
let lock = RwLock::new(42_u32);
let guard = poll_until_ready(lock.read(&cx)).expect("read should accept cap::None Cx");
crate::assert_with_log!(*guard == 42, "read guard value", 42u32, *guard);
crate::test_complete!("read_accepts_detached_no_cap_context");
}
#[test]
fn write_accepts_detached_no_cap_context() {
init_test("write_accepts_detached_no_cap_context");
let cx = Cx::<cap::None>::detached_cancel_context();
let lock = RwLock::new(5_u32);
let mut guard =
poll_until_ready(lock.write(&cx)).expect("write should accept cap::None Cx");
*guard = 7;
crate::assert_with_log!(*guard == 7, "write guard value", 7u32, *guard);
crate::test_complete!("write_accepts_detached_no_cap_context");
}
#[test]
fn owned_read_accepts_detached_no_cap_context() {
init_test("owned_read_accepts_detached_no_cap_context");
let cx = Cx::<cap::None>::detached_cancel_context();
let lock = StdArc::new(RwLock::new(42_u32));
let guard = poll_until_ready(OwnedRwLockReadGuard::read(StdArc::clone(&lock), &cx))
.expect("owned read should accept cap::None Cx");
crate::assert_with_log!(*guard == 42, "owned read guard value", 42u32, *guard);
crate::test_complete!("owned_read_accepts_detached_no_cap_context");
}
#[test]
fn owned_write_accepts_detached_no_cap_context() {
init_test("owned_write_accepts_detached_no_cap_context");
let cx = Cx::<cap::None>::detached_cancel_context();
let lock = StdArc::new(RwLock::new(5_u32));
let mut guard = poll_until_ready(OwnedRwLockWriteGuard::write(StdArc::clone(&lock), &cx))
.expect("owned write should accept cap::None Cx");
*guard = 7;
crate::assert_with_log!(*guard == 7, "owned write guard value", 7u32, *guard);
crate::test_complete!("owned_write_accepts_detached_no_cap_context");
}
#[test]
fn write_excludes_readers_and_writers() {
init_test("write_excludes_readers_and_writers");
let cx = test_cx();
let lock = RwLock::new(5_u32);
let mut write = write_blocking(&lock, &cx);
*write = 7;
let read_locked = matches!(lock.try_read(), Err(TryReadError::Locked));
crate::assert_with_log!(read_locked, "read locked", true, read_locked);
let write_locked = matches!(lock.try_write(), Err(TryWriteError::Locked));
crate::assert_with_log!(write_locked, "write locked", true, write_locked);
drop(write);
let read = read_blocking(&lock, &cx);
crate::assert_with_log!(*read == 7, "read after write", 7u32, *read);
crate::test_complete!("write_excludes_readers_and_writers");
}
#[test]
fn writer_waiting_blocks_new_readers() {
init_test("writer_waiting_blocks_new_readers");
let cx = test_cx();
let lock = StdArc::new(RwLock::new(1_u32));
let read_guard = read_blocking(&lock, &cx);
let writer_started = StdArc::new(AtomicBool::new(false));
let writer_lock = StdArc::clone(&lock);
let writer_flag = StdArc::clone(&writer_started);
let handle = thread::spawn(move || {
let cx = test_cx();
writer_flag.store(true, AtomicOrdering::Release);
let _guard = write_blocking(&writer_lock, &cx);
});
while !writer_started.load(AtomicOrdering::Acquire) {
std::thread::yield_now();
}
let mut success = false;
for _ in 0..100 {
if matches!(lock.try_read(), Err(TryReadError::Locked)) {
success = true;
break;
}
std::thread::yield_now();
std::thread::sleep(std::time::Duration::from_millis(1));
}
crate::assert_with_log!(success, "writer blocked readers", true, success);
drop(read_guard);
let _ = handle.join();
crate::test_complete!("writer_waiting_blocks_new_readers");
}
#[test]
fn try_write_does_not_bypass_waiting_writer_turn() {
init_test("try_write_does_not_bypass_waiting_writer_turn");
let cx = test_cx();
let lock = RwLock::new(1_u32);
let read_guard = read_blocking(&lock, &cx);
let mut queued_writer = lock.write(&cx);
let pending = poll_once(&mut queued_writer).is_none();
crate::assert_with_log!(pending, "writer queued while reader held", true, pending);
drop(read_guard);
let try_write_locked = matches!(lock.try_write(), Err(TryWriteError::Locked));
crate::assert_with_log!(
try_write_locked,
"try_write must not bypass queued writer",
true,
try_write_locked
);
let queued_guard = poll_until_ready(queued_writer).expect("queued writer should acquire");
drop(queued_guard);
crate::test_complete!("try_write_does_not_bypass_waiting_writer_turn");
}
#[test]
fn cancel_during_read_wait() {
init_test("cancel_during_read_wait");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let _write = write_blocking(&lock, &cx);
let mut fut = lock.read(&cx);
let pending = poll_once(&mut fut).is_none();
crate::assert_with_log!(pending, "read waits while writer held", true, pending);
cx.set_cancel_requested(true);
let cancelled = matches!(poll_once(&mut fut), Some(Err(RwLockError::Cancelled)));
crate::assert_with_log!(cancelled, "read cancelled", true, cancelled);
drop(fut);
let state = lock.debug_state();
let waiters = state.reader_waiters.len();
crate::assert_with_log!(waiters == 0, "reader waiters cleaned", 0usize, waiters);
crate::test_complete!("cancel_during_read_wait");
}
#[test]
fn cancel_queued_write_waiter_cleans_state_before_drop() {
init_test("cancel_queued_write_waiter_cleans_state_before_drop");
let cx = test_cx();
let cancel_cx = test_cx_with_slot(10);
let lock = RwLock::new(42_u32);
let read_guard = read_blocking(&lock, &cx);
let mut write_fut = lock.write(&cancel_cx);
let write_pending = poll_once(&mut write_fut).is_none();
crate::assert_with_log!(write_pending, "write waiter pending", true, write_pending);
let mut read_fut = lock.read(&cx);
let read_pending = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(
read_pending,
"reader blocked by queued writer",
true,
read_pending
);
cancel_cx.set_cancel_requested(true);
let cancelled = matches!(poll_once(&mut write_fut), Some(Err(RwLockError::Cancelled)));
crate::assert_with_log!(cancelled, "write waiter cancelled", true, cancelled);
let state = lock.debug_state();
crate::assert_with_log!(
state.writer_waiters == 0 && state.writer_queue.is_empty(),
"write waiter removed without drop",
true,
state.writer_waiters == 0 && state.writer_queue.is_empty()
);
let read_result = poll_once(&mut read_fut);
let reader_acquired = matches!(read_result, Some(Ok(_)));
crate::assert_with_log!(
reader_acquired,
"reader unblocked before cancelled writer future is dropped",
true,
reader_acquired
);
if let Some(Ok(guard)) = read_result {
drop(guard);
}
drop(read_guard);
drop(write_fut);
crate::test_complete!("cancel_queued_write_waiter_cleans_state_before_drop");
}
#[test]
fn test_rwlock_try_read_success() {
init_test("test_rwlock_try_read_success");
let lock = RwLock::new(42_u32);
let guard = lock.try_read().expect("try_read should succeed");
crate::assert_with_log!(*guard == 42, "read value", 42u32, *guard);
crate::test_complete!("test_rwlock_try_read_success");
}
#[test]
fn test_rwlock_try_write_success() {
init_test("test_rwlock_try_write_success");
let lock = RwLock::new(42_u32);
let mut guard = lock.try_write().expect("try_write should succeed");
*guard = 100;
crate::assert_with_log!(*guard == 100, "write value", 100u32, *guard);
crate::test_complete!("test_rwlock_try_write_success");
}
#[test]
fn test_rwlock_cancel_during_write_wait() {
init_test("test_rwlock_cancel_during_write_wait");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let _read = read_blocking(&lock, &cx);
let mut fut = lock.write(&cx);
let pending = poll_once(&mut fut).is_none();
crate::assert_with_log!(pending, "write waits while reader held", true, pending);
cx.set_cancel_requested(true);
let cancelled = matches!(poll_once(&mut fut), Some(Err(RwLockError::Cancelled)));
crate::assert_with_log!(cancelled, "write cancelled", true, cancelled);
drop(fut);
let state = lock.debug_state();
let waiters = state.writer_queue.len();
let writer_count = state.writer_waiters;
crate::assert_with_log!(
waiters == 0 && writer_count == 0,
"writer waiters cleaned",
true,
waiters == 0 && writer_count == 0
);
crate::test_complete!("test_rwlock_cancel_during_write_wait");
}
#[test]
fn cancel_pregranted_read_waiter_wakes_writer_before_drop() {
init_test("cancel_pregranted_read_waiter_wakes_writer_before_drop");
let cx = test_cx();
let cancel_cx = test_cx_with_slot(11);
let lock = RwLock::new(0_u32);
let active_writer = write_blocking(&lock, &cx);
let mut read_fut = lock.read(&cancel_cx);
let read_pending = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(read_pending, "reader queued", true, read_pending);
let mut writer_fut = lock.write(&cx);
let writer_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(writer_pending, "writer queued", true, writer_pending);
drop(active_writer);
cancel_cx.set_cancel_requested(true);
let cancelled = matches!(poll_once(&mut read_fut), Some(Err(RwLockError::Cancelled)));
crate::assert_with_log!(cancelled, "pre-granted reader cancelled", true, cancelled);
let state = lock.debug_state();
crate::assert_with_log!(
state.readers == 0 && state.reader_waiters.is_empty() && state.writer_active,
"pre-granted reader cleanup forwarded turn to writer",
true,
state.readers == 0 && state.reader_waiters.is_empty() && state.writer_active
);
let writer_result = poll_once(&mut writer_fut);
let writer_acquired = matches!(writer_result, Some(Ok(_)));
crate::assert_with_log!(
writer_acquired,
"writer acquires before cancelled reader future is dropped",
true,
writer_acquired
);
if let Some(Ok(guard)) = writer_result {
drop(guard);
}
drop(read_fut);
crate::test_complete!("cancel_pregranted_read_waiter_wakes_writer_before_drop");
}
#[test]
fn read_future_second_poll_fails_closed() {
init_test("read_future_second_poll_fails_closed");
let cx = test_cx();
let lock = RwLock::new(42_u32);
let mut fut = lock.read(&cx);
let first = poll_once(&mut fut);
let Some(Ok(guard)) = first else {
panic!("expected ready read guard");
};
let second = poll_once(&mut fut);
let done = matches!(second, Some(Err(RwLockError::PolledAfterCompletion)));
crate::assert_with_log!(done, "read future second poll fails closed", true, done);
drop(guard);
crate::test_complete!("read_future_second_poll_fails_closed");
}
#[test]
fn write_future_second_poll_fails_closed() {
init_test("write_future_second_poll_fails_closed");
let cx = test_cx();
let lock = RwLock::new(42_u32);
let mut fut = lock.write(&cx);
let first = poll_once(&mut fut);
let Some(Ok(mut guard)) = first else {
panic!("expected ready write guard");
};
*guard = 55;
let second = poll_once(&mut fut);
let done = matches!(second, Some(Err(RwLockError::PolledAfterCompletion)));
crate::assert_with_log!(done, "write future second poll fails closed", true, done);
drop(guard);
crate::test_complete!("write_future_second_poll_fails_closed");
}
#[test]
fn owned_read_future_second_poll_fails_closed() {
init_test("owned_read_future_second_poll_fails_closed");
let cx = test_cx();
let lock = StdArc::new(RwLock::new(42_u32));
let mut fut = OwnedRwLockReadGuard::read(StdArc::clone(&lock), &cx);
let first = poll_once(&mut fut);
let Some(Ok(guard)) = first else {
panic!("expected ready owned read guard");
};
let second = poll_once(&mut fut);
let done = matches!(second, Some(Err(RwLockError::PolledAfterCompletion)));
crate::assert_with_log!(
done,
"owned read future second poll fails closed",
true,
done
);
drop(guard);
crate::test_complete!("owned_read_future_second_poll_fails_closed");
}
#[test]
fn owned_write_future_second_poll_fails_closed() {
init_test("owned_write_future_second_poll_fails_closed");
let cx = test_cx();
let lock = StdArc::new(RwLock::new(42_u32));
let mut fut = OwnedRwLockWriteGuard::write(StdArc::clone(&lock), &cx);
let first = poll_once(&mut fut);
let Some(Ok(mut guard)) = first else {
panic!("expected ready owned write guard");
};
*guard = 77;
let second = poll_once(&mut fut);
let done = matches!(second, Some(Err(RwLockError::PolledAfterCompletion)));
crate::assert_with_log!(
done,
"owned write future second poll fails closed",
true,
done
);
drop(guard);
crate::test_complete!("owned_write_future_second_poll_fails_closed");
}
#[test]
fn test_rwlock_get_mut() {
init_test("test_rwlock_get_mut");
let mut lock = RwLock::new(42_u32);
*lock.get_mut().expect("rwlock should not be poisoned") = 100;
let value = *lock.get_mut().expect("rwlock should not be poisoned");
crate::assert_with_log!(value == 100, "get_mut works", 100u32, value);
crate::test_complete!("test_rwlock_get_mut");
}
#[test]
fn test_rwlock_into_inner() {
init_test("test_rwlock_into_inner");
let lock = RwLock::new(42_u32);
let value = lock.into_inner().expect("rwlock should not be poisoned");
crate::assert_with_log!(value == 42, "into_inner works", 42u32, value);
crate::test_complete!("test_rwlock_into_inner");
}
#[test]
fn test_rwlock_read_released_on_drop() {
init_test("test_rwlock_read_released_on_drop");
let cx = test_cx();
let lock = RwLock::new(42_u32);
{
let _guard = read_blocking(&lock, &cx);
}
let can_write = lock.try_write().is_ok();
crate::assert_with_log!(can_write, "can write after read drop", true, can_write);
crate::test_complete!("test_rwlock_read_released_on_drop");
}
#[test]
fn test_rwlock_write_released_on_drop() {
init_test("test_rwlock_write_released_on_drop");
let cx = test_cx();
let lock = RwLock::new(42_u32);
{
let _guard = write_blocking(&lock, &cx);
}
let can_read = lock.try_read().is_ok();
crate::assert_with_log!(can_read, "can read after write drop", true, can_read);
crate::test_complete!("test_rwlock_write_released_on_drop");
}
#[test]
fn test_writer_fifo_ordering() {
init_test("test_writer_fifo_ordering");
let cx = test_cx();
let lock = StdArc::new(RwLock::new(Vec::<u32>::new()));
let order = StdArc::new(parking_lot::Mutex::new(Vec::new()));
let read_guard = read_blocking(&lock, &cx);
let mut handles = Vec::new();
for id in 1..=3_u32 {
let lock_c = StdArc::clone(&lock);
let order_c = StdArc::clone(&order);
handles.push(thread::spawn(move || {
let cx = test_cx();
let mut guard = write_blocking(&lock_c, &cx);
order_c.lock().push(id);
guard.push(id);
}));
thread::sleep(std::time::Duration::from_millis(10));
}
drop(read_guard);
for h in handles {
let _ = h.join();
}
let final_order = order.lock().clone();
let data = lock.try_read().unwrap();
crate::assert_with_log!(
final_order == *data,
"writer FIFO order matches data",
true,
final_order == *data
);
crate::test_complete!("test_writer_fifo_ordering");
}
#[test]
fn release_writer_prefers_older_writer_over_reader() {
init_test("release_writer_prefers_older_writer_over_reader");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let active_writer = write_blocking(&lock, &cx);
let mut writer_fut = lock.write(&cx);
let writer_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(
writer_pending,
"queued writer is pending",
true,
writer_pending
);
let mut reader_fut = lock.read(&cx);
let reader_pending = poll_once(&mut reader_fut).is_none();
crate::assert_with_log!(
reader_pending,
"queued reader is pending",
true,
reader_pending
);
drop(active_writer);
let writer_result = poll_once(&mut writer_fut);
let writer_acquired = matches!(writer_result, Some(Ok(_)));
crate::assert_with_log!(
writer_acquired,
"older writer acquires before reader",
true,
writer_acquired
);
let reader_still_pending = poll_once(&mut reader_fut).is_none();
crate::assert_with_log!(
reader_still_pending,
"reader remains pending while writer holds lock",
true,
reader_still_pending
);
if let Some(Ok(writer_guard)) = writer_result {
drop(writer_guard);
}
let reader_result = poll_once(&mut reader_fut);
let reader_acquired = matches!(reader_result, Some(Ok(_)));
crate::assert_with_log!(
reader_acquired,
"reader acquires after writer releases",
true,
reader_acquired
);
crate::test_complete!("release_writer_prefers_older_writer_over_reader");
}
#[test]
fn release_writer_prefers_older_reader_over_writer() {
init_test("release_writer_prefers_older_reader_over_writer");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let active_writer = write_blocking(&lock, &cx);
let mut reader_fut = lock.read(&cx);
let reader_pending = poll_once(&mut reader_fut).is_none();
crate::assert_with_log!(
reader_pending,
"queued reader is pending",
true,
reader_pending
);
let mut writer_fut = lock.write(&cx);
let writer_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(
writer_pending,
"queued writer is pending",
true,
writer_pending
);
drop(active_writer);
let reader_result = poll_once(&mut reader_fut);
let reader_acquired = matches!(reader_result, Some(Ok(_)));
crate::assert_with_log!(
reader_acquired,
"older reader acquires before writer",
true,
reader_acquired
);
let writer_still_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(
writer_still_pending,
"writer remains pending while reader holds lock",
true,
writer_still_pending
);
if let Some(Ok(reader_guard)) = reader_result {
drop(reader_guard);
}
let writer_result = poll_once(&mut writer_fut);
let writer_acquired = matches!(writer_result, Some(Ok(_)));
crate::assert_with_log!(
writer_acquired,
"writer acquires after reader releases",
true,
writer_acquired
);
crate::test_complete!("release_writer_prefers_older_reader_over_writer");
}
#[test]
fn release_writer_does_not_wake_readers_younger_than_first_writer() {
init_test("release_writer_does_not_wake_readers_younger_than_first_writer");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let active_writer = write_blocking(&lock, &cx);
let mut older_reader_fut = lock.read(&cx);
let older_reader_pending = poll_once(&mut older_reader_fut).is_none();
crate::assert_with_log!(
older_reader_pending,
"older reader is pending",
true,
older_reader_pending
);
let mut writer_fut = lock.write(&cx);
let writer_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(writer_pending, "writer is pending", true, writer_pending);
let mut younger_reader_fut = lock.read(&cx);
let younger_reader_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
younger_reader_pending,
"younger reader is pending",
true,
younger_reader_pending
);
drop(active_writer);
let older_reader_result = poll_once(&mut older_reader_fut);
let older_reader_acquired = matches!(older_reader_result, Some(Ok(_)));
crate::assert_with_log!(
older_reader_acquired,
"older reader acquires first",
true,
older_reader_acquired
);
let younger_reader_still_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
younger_reader_still_pending,
"younger reader stays queued behind writer",
true,
younger_reader_still_pending
);
let writer_still_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(
writer_still_pending,
"writer is still queued while older reader holds lock",
true,
writer_still_pending
);
if let Some(Ok(older_reader_guard)) = older_reader_result {
drop(older_reader_guard);
}
let writer_result = poll_once(&mut writer_fut);
let writer_acquired = matches!(writer_result, Some(Ok(_)));
crate::assert_with_log!(
writer_acquired,
"writer acquires before younger reader",
true,
writer_acquired
);
let younger_reader_still_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
younger_reader_still_pending,
"younger reader remains queued while writer holds lock",
true,
younger_reader_still_pending
);
if let Some(Ok(writer_guard)) = writer_result {
drop(writer_guard);
}
let younger_reader_result = poll_once(&mut younger_reader_fut);
let younger_reader_acquired = matches!(younger_reader_result, Some(Ok(_)));
crate::assert_with_log!(
younger_reader_acquired,
"younger reader acquires after writer releases",
true,
younger_reader_acquired
);
crate::test_complete!("release_writer_does_not_wake_readers_younger_than_first_writer");
}
#[test]
fn downgrade_preserves_writer_preference_for_younger_readers() {
init_test("downgrade_preserves_writer_preference_for_younger_readers");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let active_writer = write_blocking(&lock, &cx);
let mut writer_fut = lock.write(&cx);
let writer_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(
writer_pending,
"writer queued behind active writer",
true,
writer_pending
);
let mut younger_reader_fut = lock.read(&cx);
let reader_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
reader_pending,
"younger reader queued behind writer",
true,
reader_pending
);
let downgraded_reader = active_writer.downgrade();
let state = lock.debug_state();
crate::assert_with_log!(
state.readers == 1
&& state.writer_waiters == 1
&& state.writer_queue.len() == 1
&& state.reader_waiters.len() == 1,
"downgrade keeps younger reader queued behind writer",
true,
state.readers == 1
&& state.writer_waiters == 1
&& state.writer_queue.len() == 1
&& state.reader_waiters.len() == 1
);
let younger_reader_still_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
younger_reader_still_pending,
"younger reader stays pending after downgrade",
true,
younger_reader_still_pending
);
let writer_still_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(
writer_still_pending,
"writer waits while downgraded reader is held",
true,
writer_still_pending
);
drop(downgraded_reader);
let writer_guard = match poll_once(&mut writer_fut) {
Some(Ok(guard)) => guard,
other => panic!("writer should acquire before younger reader: {other:?}"),
};
let younger_reader_still_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
younger_reader_still_pending,
"younger reader remains queued while writer holds lock",
true,
younger_reader_still_pending
);
drop(writer_guard);
let younger_reader_guard = match poll_once(&mut younger_reader_fut) {
Some(Ok(guard)) => guard,
other => panic!("younger reader should acquire after writer releases: {other:?}"),
};
drop(younger_reader_guard);
crate::test_complete!("downgrade_preserves_writer_preference_for_younger_readers");
}
#[test]
fn owned_downgrade_preserves_writer_preference_for_younger_readers() {
init_test("owned_downgrade_preserves_writer_preference_for_younger_readers");
let cx = test_cx();
let lock = StdArc::new(RwLock::new(0_u32));
let active_writer = OwnedRwLockWriteGuard::try_write(StdArc::clone(&lock))
.expect("owned writer should acquire");
let mut writer_fut = OwnedRwLockWriteGuard::write(StdArc::clone(&lock), &cx);
let writer_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(
writer_pending,
"owned writer queued behind active writer",
true,
writer_pending
);
let mut younger_reader_fut = OwnedRwLockReadGuard::read(StdArc::clone(&lock), &cx);
let reader_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
reader_pending,
"owned younger reader queued behind writer",
true,
reader_pending
);
let downgraded_reader = active_writer.downgrade();
let state = lock.debug_state();
crate::assert_with_log!(
state.readers == 1
&& state.writer_waiters == 1
&& state.writer_queue.len() == 1
&& state.reader_waiters.len() == 1,
"owned downgrade keeps younger reader queued behind writer",
true,
state.readers == 1
&& state.writer_waiters == 1
&& state.writer_queue.len() == 1
&& state.reader_waiters.len() == 1
);
let younger_reader_still_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
younger_reader_still_pending,
"owned younger reader stays pending after downgrade",
true,
younger_reader_still_pending
);
let writer_still_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(
writer_still_pending,
"owned writer waits while downgraded reader is held",
true,
writer_still_pending
);
drop(downgraded_reader);
let writer_guard = match poll_once(&mut writer_fut) {
Some(Ok(guard)) => guard,
other => panic!("owned writer should acquire before younger reader: {other:?}"),
};
let younger_reader_still_pending = poll_once(&mut younger_reader_fut).is_none();
crate::assert_with_log!(
younger_reader_still_pending,
"owned younger reader remains queued while writer holds lock",
true,
younger_reader_still_pending
);
drop(writer_guard);
let younger_reader_guard = match poll_once(&mut younger_reader_fut) {
Some(Ok(guard)) => guard,
other => {
panic!("owned younger reader should acquire after writer releases: {other:?}")
}
};
drop(younger_reader_guard);
crate::test_complete!("owned_downgrade_preserves_writer_preference_for_younger_readers");
}
#[test]
fn test_write_future_drop_wakes_readers_when_last_writer() {
init_test("test_write_future_drop_wakes_readers_when_last_writer");
let cx = test_cx();
let lock = RwLock::new(42_u32);
let write_guard = write_blocking(&lock, &cx);
let mut write_fut = lock.write(&cx);
let pending = poll_once(&mut write_fut).is_none();
crate::assert_with_log!(pending, "write future pending", true, pending);
let mut read_fut = lock.read(&cx);
let read_pending = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(read_pending, "read future pending", true, read_pending);
drop(write_guard);
drop(write_fut);
let read_result = poll_once(&mut read_fut);
let acquired = matches!(read_result, Some(Ok(_)));
crate::assert_with_log!(
acquired,
"reader acquired after writer drop",
true,
acquired
);
let state = lock.debug_state();
crate::assert_with_log!(
state.writer_waiters == 0,
"no writer waiters left",
0usize,
state.writer_waiters
);
crate::test_complete!("test_write_future_drop_wakes_readers_when_last_writer");
}
#[test]
fn test_read_future_drop_forwards_wake_to_writer() {
init_test("test_read_future_drop_forwards_wake_to_writer");
let cx = test_cx();
let lock = StdArc::new(RwLock::new(0_u32));
let write_guard = write_blocking(&lock, &cx);
let mut read_fut = lock.read(&cx);
let pending = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(pending, "read pending while writer active", true, pending);
let writer_lock = StdArc::clone(&lock);
let writer_done = StdArc::new(AtomicBool::new(false));
let writer_done_c = StdArc::clone(&writer_done);
let handle = thread::spawn(move || {
let cx = test_cx();
let _guard = write_blocking(&writer_lock, &cx);
writer_done_c.store(true, AtomicOrdering::Release);
});
thread::sleep(std::time::Duration::from_millis(20));
drop(write_guard);
drop(read_fut);
let _ = handle.join();
let done = writer_done.load(AtomicOrdering::Acquire);
crate::assert_with_log!(done, "second writer eventually acquired", true, done);
crate::test_complete!("test_read_future_drop_forwards_wake_to_writer");
}
#[test]
fn test_owned_read_guard_basic() {
init_test("test_owned_read_guard_basic");
let _cx = test_cx();
let lock = StdArc::new(RwLock::new(42_u32));
let guard =
OwnedRwLockReadGuard::try_read(StdArc::clone(&lock)).expect("try_read should succeed");
let value = guard.with_read(|v| *v);
crate::assert_with_log!(value == 42, "owned read guard value", 42u32, value);
drop(guard);
let can_write = lock.try_write().is_ok();
crate::assert_with_log!(can_write, "write after owned read drop", true, can_write);
crate::test_complete!("test_owned_read_guard_basic");
}
#[test]
fn test_owned_write_guard_basic() {
init_test("test_owned_write_guard_basic");
let _cx = test_cx();
let lock = StdArc::new(RwLock::new(42_u32));
let mut guard = OwnedRwLockWriteGuard::try_write(StdArc::clone(&lock))
.expect("try_write should succeed");
guard.with_write(|v| *v = 100);
drop(guard);
let read_guard = lock.try_read().expect("read after write drop");
crate::assert_with_log!(
*read_guard == 100,
"owned write persisted",
100u32,
*read_guard
);
crate::test_complete!("test_owned_write_guard_basic");
}
#[test]
fn owned_cancel_queued_read_waiter_cleans_state_before_drop() {
init_test("owned_cancel_queued_read_waiter_cleans_state_before_drop");
let cx = test_cx();
let cancel_cx = test_cx_with_slot(12);
let lock = StdArc::new(RwLock::new(0_u32));
let active_writer = write_blocking(lock.as_ref(), &cx);
let mut read_fut = OwnedRwLockReadGuard::read(StdArc::clone(&lock), &cancel_cx);
let read_pending = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(read_pending, "owned reader queued", true, read_pending);
let mut writer_fut = OwnedRwLockWriteGuard::write(StdArc::clone(&lock), &cx);
let writer_pending = poll_once(&mut writer_fut).is_none();
crate::assert_with_log!(writer_pending, "owned writer queued", true, writer_pending);
cancel_cx.set_cancel_requested(true);
let cancelled = matches!(poll_once(&mut read_fut), Some(Err(RwLockError::Cancelled)));
crate::assert_with_log!(cancelled, "owned reader cancelled", true, cancelled);
let state = lock.debug_state();
crate::assert_with_log!(
state.reader_waiters.is_empty(),
"owned reader waiter removed without drop",
true,
state.reader_waiters.is_empty()
);
drop(active_writer);
let writer_result = poll_once(&mut writer_fut);
let writer_acquired = matches!(writer_result, Some(Ok(_)));
crate::assert_with_log!(
writer_acquired,
"owned writer acquires before cancelled reader future is dropped",
true,
writer_acquired
);
if let Some(Ok(guard)) = writer_result {
drop(guard);
}
drop(read_fut);
crate::test_complete!("owned_cancel_queued_read_waiter_cleans_state_before_drop");
}
#[test]
fn owned_cancel_pregranted_write_waiter_unblocks_readers_before_drop() {
init_test("owned_cancel_pregranted_write_waiter_unblocks_readers_before_drop");
let cx = test_cx();
let cancel_cx = test_cx_with_slot(13);
let lock = StdArc::new(RwLock::new(42_u32));
let read_guard = read_blocking(lock.as_ref(), &cx);
let mut write_fut = OwnedRwLockWriteGuard::write(StdArc::clone(&lock), &cancel_cx);
let write_pending = poll_once(&mut write_fut).is_none();
crate::assert_with_log!(write_pending, "owned writer queued", true, write_pending);
let mut read_fut = OwnedRwLockReadGuard::read(StdArc::clone(&lock), &cx);
let read_pending = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(
read_pending,
"owned reader blocked by queued writer",
true,
read_pending
);
drop(read_guard);
cancel_cx.set_cancel_requested(true);
let cancelled = matches!(poll_once(&mut write_fut), Some(Err(RwLockError::Cancelled)));
crate::assert_with_log!(
cancelled,
"pre-granted owned writer cancelled",
true,
cancelled
);
let state = lock.debug_state();
crate::assert_with_log!(
!state.writer_active && state.writer_waiters == 0,
"pre-granted owned writer cleanup released writer slot",
true,
!state.writer_active && state.writer_waiters == 0
);
let read_result = poll_once(&mut read_fut);
let reader_acquired = matches!(read_result, Some(Ok(_)));
crate::assert_with_log!(
reader_acquired,
"owned reader acquires before cancelled writer future is dropped",
true,
reader_acquired
);
if let Some(Ok(guard)) = read_result {
drop(guard);
}
drop(write_fut);
crate::test_complete!("owned_cancel_pregranted_write_waiter_unblocks_readers_before_drop");
}
#[test]
fn test_multiple_writer_cascade() {
init_test("test_multiple_writer_cascade");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let write1 = write_blocking(&lock, &cx);
let mut write2_fut = lock.write(&cx);
let w2_pending = poll_once(&mut write2_fut).is_none();
crate::assert_with_log!(w2_pending, "writer 2 pending", true, w2_pending);
let mut write3_fut = lock.write(&cx);
let w3_pending = poll_once(&mut write3_fut).is_none();
crate::assert_with_log!(w3_pending, "writer 3 pending", true, w3_pending);
let state = lock.debug_state();
crate::assert_with_log!(
state.writer_waiters == 2,
"two writers waiting",
2usize,
state.writer_waiters
);
drop(write1);
let w2_result = poll_once(&mut write2_fut);
let w2_acquired = matches!(w2_result, Some(Ok(_)));
crate::assert_with_log!(w2_acquired, "writer 2 acquired", true, w2_acquired);
let w3_still_pending = poll_once(&mut write3_fut).is_none();
crate::assert_with_log!(
w3_still_pending,
"writer 3 still pending",
true,
w3_still_pending
);
if let Some(Ok(guard)) = w2_result {
drop(guard);
}
let w3_result = poll_once(&mut write3_fut);
let w3_acquired = matches!(w3_result, Some(Ok(_)));
crate::assert_with_log!(w3_acquired, "writer 3 acquired", true, w3_acquired);
crate::test_complete!("test_multiple_writer_cascade");
}
#[test]
fn test_try_read_blocked_by_writer_waiters() {
init_test("test_try_read_blocked_by_writer_waiters");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let read = read_blocking(&lock, &cx);
let mut write_fut = lock.write(&cx);
let pending = poll_once(&mut write_fut).is_none();
crate::assert_with_log!(pending, "writer queued", true, pending);
let try_read_guard = lock.try_read();
crate::assert_with_log!(
try_read_guard.is_err(),
"try_read blocked by writer waiter",
true,
try_read_guard.is_err()
);
drop(read);
crate::test_complete!("test_try_read_blocked_by_writer_waiters");
}
#[test]
fn cancel_only_write_waiter_unblocks_readers() {
init_test("cancel_only_write_waiter_unblocks_readers");
let cx = test_cx();
let lock = RwLock::new(42_u32);
let read_guard = read_blocking(&lock, &cx);
let cancel_cx: Cx = Cx::new(
crate::types::RegionId::from_arena(ArenaIndex::new(0, 10)),
crate::types::TaskId::from_arena(ArenaIndex::new(0, 10)),
crate::types::Budget::INFINITE,
);
let mut write_fut = lock.write(&cancel_cx);
let pending = poll_once(&mut write_fut).is_none();
crate::assert_with_log!(pending, "write waiter pending", true, pending);
let mut read_fut = lock.read(&cx);
let read_pending = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(
read_pending,
"reader blocked by writer waiter",
true,
read_pending
);
cancel_cx.set_cancel_requested(true);
let cancelled = matches!(poll_once(&mut write_fut), Some(Err(RwLockError::Cancelled)));
crate::assert_with_log!(cancelled, "write waiter cancelled", true, cancelled);
drop(write_fut);
let state = lock.debug_state();
crate::assert_with_log!(
state.writer_waiters == 0,
"writer_waiters cleared",
0usize,
state.writer_waiters
);
let read_result = poll_once(&mut read_fut);
let reader_acquired = matches!(read_result, Some(Ok(_)));
crate::assert_with_log!(
reader_acquired,
"reader unblocked after write cancel",
true,
reader_acquired
);
drop(read_guard);
crate::test_complete!("cancel_only_write_waiter_unblocks_readers");
}
#[test]
fn drop_write_future_cleans_writer_waiters_counter() {
init_test("drop_write_future_cleans_writer_waiters_counter");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let _read = read_blocking(&lock, &cx);
let mut w1 = lock.write(&cx);
let _ = poll_once(&mut w1);
let mut w2 = lock.write(&cx);
let _ = poll_once(&mut w2);
let state = lock.debug_state();
crate::assert_with_log!(
state.writer_waiters == 2,
"2 writer waiters",
2usize,
state.writer_waiters
);
drop(w1);
let state = lock.debug_state();
crate::assert_with_log!(
state.writer_waiters == 1,
"1 writer waiter after drop",
1usize,
state.writer_waiters
);
crate::assert_with_log!(
state.writer_queue.len() == 1,
"1 in writer queue after drop",
1usize,
state.writer_queue.len()
);
drop(w2);
let state = lock.debug_state();
crate::assert_with_log!(
state.writer_waiters == 0,
"0 writer waiters after both dropped",
0usize,
state.writer_waiters
);
crate::test_complete!("drop_write_future_cleans_writer_waiters_counter");
}
#[test]
fn rwlock_poison_propagation() {
init_test("rwlock_poison_propagation");
let lock = StdArc::new(RwLock::new(0_u32));
let l = StdArc::clone(&lock);
let handle = thread::spawn(move || {
let cx = test_cx();
let _guard = write_blocking(&l, &cx);
panic!("poison rwlock");
});
let _ = handle.join();
let poisoned = lock.is_poisoned();
crate::assert_with_log!(poisoned, "rwlock is poisoned", true, poisoned);
let try_read = lock.try_read();
let read_is_poisoned = matches!(try_read, Err(TryReadError::Poisoned));
crate::assert_with_log!(
read_is_poisoned,
"try_read Poisoned",
true,
read_is_poisoned
);
let try_write = lock.try_write();
let write_is_poisoned = matches!(try_write, Err(TryWriteError::Poisoned));
crate::assert_with_log!(
write_is_poisoned,
"try_write Poisoned",
true,
write_is_poisoned
);
let cx = test_cx();
let mut read_fut = lock.read(&cx);
let read_result = poll_once(&mut read_fut);
let read_poisoned = matches!(read_result, Some(Err(RwLockError::Poisoned)));
crate::assert_with_log!(read_poisoned, "read() Poisoned", true, read_poisoned);
let mut write_fut = lock.write(&cx);
let write_result = poll_once(&mut write_fut);
let write_poisoned = matches!(write_result, Some(Err(RwLockError::Poisoned)));
crate::assert_with_log!(write_poisoned, "write() Poisoned", true, write_poisoned);
crate::test_complete!("rwlock_poison_propagation");
}
#[test]
fn rwlock_error_debug_clone_copy_eq_display() {
let poisoned = RwLockError::Poisoned;
let cancelled = RwLockError::Cancelled;
let polled_after_completion = RwLockError::PolledAfterCompletion;
let dbg = format!("{poisoned:?}");
assert!(dbg.contains("Poisoned"));
let cloned = poisoned;
assert_eq!(cloned, RwLockError::Poisoned);
assert_ne!(poisoned, cancelled);
assert_ne!(poisoned, polled_after_completion);
assert!(poisoned.to_string().contains("poisoned"));
assert!(cancelled.to_string().contains("cancelled"));
assert!(
polled_after_completion
.to_string()
.contains("polled after completion")
);
}
#[test]
fn try_read_error_debug_clone_copy_eq_display() {
let locked = TryReadError::Locked;
let poisoned = TryReadError::Poisoned;
let dbg = format!("{locked:?}");
assert!(dbg.contains("Locked"));
let copied = locked;
assert_eq!(copied, TryReadError::Locked);
assert_ne!(locked, poisoned);
assert!(locked.to_string().contains("write-locked"));
assert!(poisoned.to_string().contains("poisoned"));
}
#[test]
fn try_write_error_debug_clone_copy_eq_display() {
let locked = TryWriteError::Locked;
let poisoned = TryWriteError::Poisoned;
let dbg = format!("{locked:?}");
assert!(dbg.contains("Locked"));
let copied = locked;
assert_eq!(copied, TryWriteError::Locked);
assert_ne!(locked, poisoned);
assert!(locked.to_string().contains("locked"));
assert!(poisoned.to_string().contains("poisoned"));
}
#[test]
fn rwlock_debug() {
let lock = RwLock::new(42_i32);
let dbg = format!("{lock:?}");
assert!(dbg.contains("RwLock"));
}
struct CountWaker(StdArc<std::sync::atomic::AtomicUsize>);
impl std::task::Wake for CountWaker {
fn wake(self: StdArc<Self>) {
self.0.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
#[test]
fn test_drop_queued_writer_wakes_readers_when_readers_active() {
init_test("test_drop_queued_writer_wakes_readers_when_readers_active");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let wake_state = StdArc::new(std::sync::atomic::AtomicUsize::new(0));
let waker = Waker::from(StdArc::new(CountWaker(wake_state.clone())));
let mut task_cx = Context::from_waker(&waker);
let mut fut_read1 = lock.read(&cx);
let Poll::Ready(Ok(_guard1)) = std::pin::Pin::new(&mut fut_read1).poll(&mut task_cx) else {
panic!("Expected Ready") };
let mut fut_write = lock.write(&cx);
let pending_write = std::pin::Pin::new(&mut fut_write).poll(&mut task_cx);
assert!(pending_write.is_pending());
let mut fut_read2 = lock.read(&cx);
let pending_read = std::pin::Pin::new(&mut fut_read2).poll(&mut task_cx);
assert!(pending_read.is_pending());
wake_state.store(0, AtomicOrdering::SeqCst);
drop(fut_write);
let wake_count = wake_state.load(AtomicOrdering::SeqCst);
crate::assert_with_log!(
wake_count > 0,
"reader woken after writer drop",
true,
wake_count > 0
);
crate::test_complete!("test_drop_queued_writer_wakes_readers_when_readers_active");
}
#[test]
fn bounded_writer_preference_eventually_admits_starved_reader() {
init_test("bounded_writer_preference_eventually_admits_starved_reader");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let wake_state = StdArc::new(std::sync::atomic::AtomicUsize::new(0));
let waker = Waker::from(StdArc::new(CountWaker(wake_state.clone())));
let mut task_cx = Context::from_waker(&waker);
const N: usize = MAX_CONSECUTIVE_WRITERS_BEFORE_READER_BATCH;
let mut fut_initial_w = lock.write(&cx);
let Poll::Ready(Ok(initial_w_guard)) =
std::pin::Pin::new(&mut fut_initial_w).poll(&mut task_cx)
else {
panic!("expected Ready on uncontended write")
};
let mut writer_futs: Vec<_> = (0..(N + 2)).map(|_| Box::pin(lock.write(&cx))).collect();
for f in &mut writer_futs {
assert!(
f.as_mut().poll(&mut task_cx).is_pending(),
"successor writers must queue"
);
}
let mut fut_starved_reader = lock.read(&cx);
assert!(
std::pin::Pin::new(&mut fut_starved_reader)
.poll(&mut task_cx)
.is_pending(),
"reader must initially queue behind active writer"
);
drop(initial_w_guard);
let mut readers_active = false;
let mut writers_drained = 0;
for cycle in 0..(N + 2) {
let mut popped = None;
for (i, f) in writer_futs.iter_mut().enumerate() {
if let Poll::Ready(Ok(_g)) = f.as_mut().poll(&mut task_cx) {
popped = Some(i);
break;
}
}
if let Some(i) = popped {
writers_drained += 1;
drop(writer_futs.remove(i));
} else {
if std::pin::Pin::new(&mut fut_starved_reader)
.poll(&mut task_cx)
.is_ready()
{
readers_active = true;
break;
}
}
if std::pin::Pin::new(&mut fut_starved_reader)
.poll(&mut task_cx)
.is_ready()
{
readers_active = true;
break;
}
assert!(
cycle < N + 1,
"reader should be admitted within N writer cycles, got {cycle}"
);
}
crate::assert_with_log!(
readers_active,
"starved reader admitted within N writer cycles",
true,
readers_active
);
crate::assert_with_log!(
writers_drained > 0 && writers_drained <= N + 1,
"writers drained within bound",
true,
writers_drained > 0 && writers_drained <= N + 1
);
crate::test_complete!("bounded_writer_preference_eventually_admits_starved_reader");
}
#[test]
fn forced_reader_turn_does_not_drain_younger_reader_batch_ahead_of_head_writer() {
init_test("forced_reader_turn_does_not_drain_younger_reader_batch_ahead_of_head_writer");
let cx = test_cx();
let lock = RwLock::new(0_u32);
const N: usize = MAX_CONSECUTIVE_WRITERS_BEFORE_READER_BATCH;
let active_writer = write_blocking(&lock, &cx);
let mut writer_futs: Vec<_> = (0..=N).map(|_| lock.write(&cx)).collect();
for fut in &mut writer_futs {
assert!(poll_once(fut).is_none(), "queued writer must wait");
}
let mut younger_reader_1 = lock.read(&cx);
let mut younger_reader_2 = lock.read(&cx);
assert!(
poll_once(&mut younger_reader_1).is_none(),
"first younger reader must queue behind the writers"
);
assert!(
poll_once(&mut younger_reader_2).is_none(),
"second younger reader must queue behind the writers"
);
drop(active_writer);
for cycle in 0..N {
let mut granted = None;
for (i, fut) in writer_futs.iter_mut().enumerate() {
match poll_once(fut) {
Some(Ok(guard)) => {
granted = Some((i, guard));
break;
}
Some(Err(err)) => panic!("queued writer failed on cycle {cycle}: {err:?}"),
None => {}
}
}
let (ready_index, guard) = granted.expect("one queued writer should acquire per cycle");
writer_futs.remove(ready_index);
drop(guard);
}
assert_eq!(writer_futs.len(), 1, "one head writer should remain queued");
let reader_guard = match poll_once(&mut younger_reader_1) {
Some(Ok(guard)) => guard,
other => panic!("forced reader turn should admit exactly one reader: {other:?}"),
};
assert!(
poll_once(&mut younger_reader_2).is_none(),
"second younger reader must remain queued behind the head writer"
);
assert!(
poll_once(&mut writer_futs[0]).is_none(),
"head writer must wait while the forced reader turn is held"
);
drop(reader_guard);
let writer_guard = match poll_once(&mut writer_futs[0]) {
Some(Ok(guard)) => guard,
other => {
panic!("head writer should run immediately after the forced reader turn: {other:?}")
}
};
assert!(
poll_once(&mut younger_reader_2).is_none(),
"remaining younger reader must still wait while the head writer runs"
);
drop(writer_guard);
let trailing_reader_guard = match poll_once(&mut younger_reader_2) {
Some(Ok(guard)) => guard,
other => panic!("remaining younger reader should run after the head writer: {other:?}"),
};
drop(trailing_reader_guard);
crate::test_complete!(
"forced_reader_turn_does_not_drain_younger_reader_batch_ahead_of_head_writer"
);
}
#[test]
fn writer_panic_wakes_all_queued_waiters_without_pregranting_slots() {
init_test("writer_panic_wakes_all_queued_waiters_without_pregranting_slots");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let active_writer = write_blocking(&lock, &cx);
let writer_wake_count = StdArc::new(std::sync::atomic::AtomicUsize::new(0));
let writer_waker = Waker::from(StdArc::new(CountWaker(writer_wake_count.clone())));
let mut writer_task_cx = Context::from_waker(&writer_waker);
let mut writer_fut = lock.write(&cx);
let writer_pending = std::pin::Pin::new(&mut writer_fut)
.poll(&mut writer_task_cx)
.is_pending();
crate::assert_with_log!(
writer_pending,
"writer waiter queued before poison",
true,
writer_pending
);
let reader_wake_count = StdArc::new(std::sync::atomic::AtomicUsize::new(0));
let reader_waker = Waker::from(StdArc::new(CountWaker(reader_wake_count.clone())));
let mut reader_task_cx = Context::from_waker(&reader_waker);
let mut reader_fut = lock.read(&cx);
let reader_pending = std::pin::Pin::new(&mut reader_fut)
.poll(&mut reader_task_cx)
.is_pending();
crate::assert_with_log!(
reader_pending,
"reader waiter queued before poison",
true,
reader_pending
);
let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _guard = active_writer;
panic!("poison rwlock");
}));
crate::assert_with_log!(
panic_result.is_err(),
"writer panic poisons the lock",
true,
panic_result.is_err()
);
let state = lock.debug_state();
crate::assert_with_log!(
!state.writer_active && state.readers == 0,
"poison handoff does not pregrant reader or writer slots",
true,
!state.writer_active && state.readers == 0
);
crate::assert_with_log!(
state.writer_waiters == 1
&& state.writer_queue.len() == 1
&& state.reader_waiters.len() == 1,
"poison handoff leaves queued waiters to fail closed on poll",
true,
state.writer_waiters == 1
&& state.writer_queue.len() == 1
&& state.reader_waiters.len() == 1
);
let writer_woken = writer_wake_count.load(AtomicOrdering::SeqCst) > 0;
let reader_woken = reader_wake_count.load(AtomicOrdering::SeqCst) > 0;
crate::assert_with_log!(
writer_woken,
"queued writer is woken on poison",
true,
writer_woken
);
crate::assert_with_log!(
reader_woken,
"queued reader is also woken on poison",
true,
reader_woken
);
let writer_result = std::pin::Pin::new(&mut writer_fut).poll(&mut writer_task_cx);
let writer_poisoned = matches!(writer_result, Poll::Ready(Err(RwLockError::Poisoned)));
crate::assert_with_log!(
writer_poisoned,
"queued writer fails closed with poison",
true,
writer_poisoned
);
let reader_result = std::pin::Pin::new(&mut reader_fut).poll(&mut reader_task_cx);
let reader_poisoned = matches!(reader_result, Poll::Ready(Err(RwLockError::Poisoned)));
crate::assert_with_log!(
reader_poisoned,
"queued reader fails closed with poison",
true,
reader_poisoned
);
let final_state = lock.debug_state();
crate::assert_with_log!(
!final_state.writer_active
&& final_state.readers == 0
&& final_state.writer_waiters == 0
&& final_state.writer_queue.is_empty()
&& final_state.reader_waiters.is_empty(),
"poisoned waiters clean themselves out without leaking reservations",
true,
!final_state.writer_active
&& final_state.readers == 0
&& final_state.writer_waiters == 0
&& final_state.writer_queue.is_empty()
&& final_state.reader_waiters.is_empty()
);
crate::test_complete!("writer_panic_wakes_all_queued_waiters_without_pregranting_slots");
}
}
#[cfg(test)]
mod metamorphic_tests {
use super::*;
use crate::cx::{Cx, cap};
use crate::lab::{LabConfig, LabRuntime};
use crate::types::{Budget, RegionId, TaskId};
use crate::util::{ArenaIndex, DetRng};
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
use proptest::prelude::*;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn test_cx() -> Cx<cap::All> {
test_cx_with_slot(0)
}
fn test_cx_with_slot(slot: u32) -> Cx<cap::All> {
Cx::new(
RegionId::from_arena(ArenaIndex::new(0, slot)),
TaskId::from_arena(ArenaIndex::new(0, slot)),
Budget::INFINITE,
)
}
fn block_on<F: Future>(f: F) -> F::Output {
let waker = std::task::Waker::noop().clone();
let mut cx = Context::from_waker(&waker);
let mut pinned = Box::pin(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => {}
}
}
}
fn poll_once<T>(future: &mut (impl Future<Output = T> + Unpin)) -> Option<T> {
let waker = Waker::noop();
let mut cx = Context::from_waker(waker);
match std::pin::Pin::new(future).poll(&mut cx) {
Poll::Ready(value) => Some(value),
Poll::Pending => None,
}
}
#[derive(Debug)]
struct CountWaker {
count: Arc<AtomicUsize>,
}
impl CountWaker {
fn new() -> (Self, Arc<AtomicUsize>) {
let count = Arc::new(AtomicUsize::new(0));
(
Self {
count: count.clone(),
},
count,
)
}
}
impl std::task::Wake for CountWaker {
fn wake(self: Arc<Self>) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
#[derive(Debug)]
struct RwLockTestHarness<T> {
lock: Arc<RwLock<T>>,
}
impl<T> RwLockTestHarness<T> {
fn new(value: T) -> Self {
Self {
lock: Arc::new(RwLock::new(value)),
}
}
fn lock(&self) -> Arc<RwLock<T>> {
self.lock.clone()
}
}
proptest! {
#[test]
fn mr_writer_preference_enforcement(
num_readers in 2usize..8,
_seed in any::<u64>(),
) {
let _runtime = std::rc::Rc::new(LabRuntime::new(LabConfig::default()));
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let _rng = DetRng::new(_seed);
let write_guard = block_on(lock.write(&cx)).expect("Initial write should succeed");
let writer_lock = lock.clone();
let mut write_fut = OwnedRwLockWriteGuard::write(writer_lock, &cx);
let (writer_waker, writer_wake_count) = CountWaker::new();
let writer_waker_obj = Waker::from(Arc::new(writer_waker));
let mut writer_task_cx = Context::from_waker(&writer_waker_obj);
let writer_poll = Pin::new(&mut write_fut).poll(&mut writer_task_cx);
prop_assert!(
writer_poll.is_pending(),
"MR1 VIOLATION: Second writer should be pending while first writer active"
);
let mut reader_results = Vec::new();
for _ in 0..num_readers {
let lock_clone = lock.clone();
let (count_waker, wake_count) = CountWaker::new();
let waker = Waker::from(Arc::new(count_waker));
let mut task_cx = Context::from_waker(&waker);
let mut read_fut = OwnedRwLockReadGuard::read(lock_clone, &cx);
let poll_result = Pin::new(&mut read_fut).poll(&mut task_cx);
prop_assert!(
poll_result.is_pending(),
"MR1 VIOLATION: Reader acquired lock while writer was active or queued"
);
reader_results.push((read_fut, wake_count));
}
drop(write_guard);
prop_assert!(
writer_wake_count.load(Ordering::SeqCst) > 0,
"MR1 VIOLATION: Queued writer was not woken when lock released"
);
for (_, wake_count) in &reader_results {
prop_assert_eq!(
wake_count.load(Ordering::SeqCst),
0,
"MR1 VIOLATION: Reader was woken before the older queued writer"
);
}
let writer_result = Pin::new(&mut write_fut).poll(&mut writer_task_cx);
prop_assert!(
matches!(writer_result, Poll::Ready(Ok(_))),
"MR1 VIOLATION: Queued writer failed to acquire after being woken"
);
}
}
proptest! {
#[test]
fn mr_reader_concurrency_capacity(
num_readers in 2usize..12,
_seed in any::<u64>(),
) {
let _runtime = std::rc::Rc::new(LabRuntime::new(LabConfig::default()));
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let _rng = DetRng::new(_seed);
prop_assert!(
matches!(lock.try_read(), Ok(_)),
"MR2 SETUP VIOLATION: Lock should be available for reads"
);
let mut read_guards = Vec::new();
for _ in 0..num_readers {
let guard = block_on(lock.read(&cx))
.expect("Concurrent reader should succeed");
read_guards.push(guard);
}
prop_assert!(
read_guards.len() == num_readers,
"MR2 VIOLATION: Not all concurrent readers succeeded. Got {}, expected {}",
read_guards.len(), num_readers
);
let state = lock.debug_state();
prop_assert_eq!(
state.readers,
num_readers,
"MR2 VIOLATION: Reader count mismatch while guards are held"
);
prop_assert!(
!state.writer_active && state.writer_waiters == 0,
"MR2 VIOLATION: Writer state should remain idle while only readers hold the lock"
);
prop_assert!(
state.reader_waiters.is_empty() && state.writer_queue.is_empty(),
"MR2 VIOLATION: Reader-only acquisition should not enqueue waiters"
);
let extra_reader = lock.try_read();
prop_assert!(
extra_reader.is_ok(),
"MR2 VIOLATION: Additional readers should still acquire immediately"
);
drop(extra_reader);
let writer_try_result = lock.try_write();
prop_assert!(
matches!(writer_try_result, Err(TryWriteError::Locked)),
"MR2 VIOLATION: Writer barged in while readers were active"
);
drop(read_guards);
let post_read_writer_try = lock.try_write();
prop_assert!(
post_read_writer_try.is_ok(),
"MR2 VIOLATION: Writer could not acquire after all readers released"
);
}
}
proptest! {
#[test]
fn mr_writer_cancellation_releases_preference(
num_readers_after_cancel in 2usize..6,
_seed in any::<u64>(),
) {
let _runtime = std::rc::Rc::new(LabRuntime::new(LabConfig::default()));
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let _rng = DetRng::new(_seed);
let blocking_writer = block_on(lock.write(&cx))
.expect("Blocking writer should acquire");
let cancelable_lock = lock.clone();
let mut cancelable_write_fut = OwnedRwLockWriteGuard::write(cancelable_lock, &cx);
let (cancel_waker, _cancel_wake_count) = CountWaker::new();
let cancel_waker_obj = Waker::from(Arc::new(cancel_waker));
let mut cancel_task_cx = Context::from_waker(&cancel_waker_obj);
let cancel_poll = Pin::new(&mut cancelable_write_fut).poll(&mut cancel_task_cx);
prop_assert!(
cancel_poll.is_pending(),
"MR3 SETUP VIOLATION: Cancelable writer should be pending"
);
let mut reader_futures = Vec::new();
let mut reader_wake_counts = Vec::new();
for _ in 0..num_readers_after_cancel {
let reader_lock = lock.clone();
let mut read_fut = OwnedRwLockReadGuard::read(reader_lock, &cx);
let (reader_waker, reader_wake_count) = CountWaker::new();
let reader_waker_obj = Waker::from(Arc::new(reader_waker));
let mut reader_task_cx = Context::from_waker(&reader_waker_obj);
let reader_poll = Pin::new(&mut read_fut).poll(&mut reader_task_cx);
prop_assert!(
reader_poll.is_pending(),
"MR3 SETUP VIOLATION: Reader should be blocked by writer preference"
);
reader_futures.push(read_fut);
reader_wake_counts.push(reader_wake_count);
}
drop(cancelable_write_fut);
drop(blocking_writer);
for (i, wake_count) in reader_wake_counts.iter().enumerate() {
prop_assert!(
wake_count.load(Ordering::SeqCst) > 0,
"MR3 VIOLATION: Reader {} not woken after writer cancellation", i
);
}
let mut completed_readers = 0;
for mut read_fut in reader_futures {
let (completion_waker, _) = CountWaker::new();
let completion_waker_obj = Waker::from(Arc::new(completion_waker));
let mut completion_task_cx = Context::from_waker(&completion_waker_obj);
let completion_poll = Pin::new(&mut read_fut).poll(&mut completion_task_cx);
if matches!(completion_poll, Poll::Ready(Ok(_))) {
completed_readers += 1;
}
}
prop_assert!(
completed_readers >= num_readers_after_cancel / 2,
"MR3 VIOLATION: Too few readers completed after writer cancellation. Got {}, expected at least {}",
completed_readers, num_readers_after_cancel / 2
);
}
}
proptest! {
#[test]
fn mr_reader_cancellation_ref_count_correctness(
num_readers_to_cancel in 1usize..6,
_seed in any::<u64>(),
) {
let _runtime = std::rc::Rc::new(LabRuntime::new(LabConfig::default()));
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let _rng = DetRng::new(_seed);
let mut reader_guards = Vec::new();
for _ in 0..num_readers_to_cancel {
let guard = block_on(lock.read(&cx))
.expect("Reader acquisition should succeed");
reader_guards.push(guard);
}
let writer_try_result = lock.try_write();
prop_assert!(
matches!(writer_try_result, Err(TryWriteError::Locked)),
"MR4 SETUP VIOLATION: Writer should be blocked by active readers"
);
let initial_reader_count = reader_guards.len();
reader_guards.clear();
let post_cancel_writer_try = lock.try_write();
prop_assert!(
post_cancel_writer_try.is_ok(),
"MR4 VIOLATION: Writer cannot acquire after {} readers cancelled - ref count likely leaked",
initial_reader_count
);
if let Ok(writer_guard) = post_cancel_writer_try {
let concurrent_reader_try = lock.try_read();
prop_assert!(
matches!(concurrent_reader_try, Err(TryReadError::Locked)),
"MR4 VIOLATION: Reader can acquire while writer active - exclusive access violated"
);
drop(writer_guard);
}
let post_writer_reader = lock.try_read();
prop_assert!(
post_writer_reader.is_ok(),
"MR4 VIOLATION: Readers cannot acquire after writer release - lock state corrupted"
);
}
}
proptest! {
#[test]
fn mr_writer_preference_under_cancellation_pressure(
num_cancellable_readers in 3usize..8,
num_persistent_readers in 2usize..5,
_seed in any::<u64>(),
) {
let _runtime = std::rc::Rc::new(LabRuntime::new(LabConfig::default()));
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let _rng = DetRng::new(_seed);
let blocking_writer = block_on(lock.write(&cx))
.expect("Initial writer should acquire");
let mut cancellable_readers = Vec::new();
for _ in 0..num_cancellable_readers {
let reader_lock = lock.clone();
let read_fut = OwnedRwLockReadGuard::read(reader_lock, &cx);
cancellable_readers.push(read_fut);
}
let priority_writer_lock = lock.clone();
let mut priority_write_fut = OwnedRwLockWriteGuard::write(priority_writer_lock, &cx);
let (priority_waker, priority_wake_count) = CountWaker::new();
let priority_waker_obj = Waker::from(Arc::new(priority_waker));
let mut priority_task_cx = Context::from_waker(&priority_waker_obj);
let priority_poll = Pin::new(&mut priority_write_fut).poll(&mut priority_task_cx);
prop_assert!(
priority_poll.is_pending(),
"MR5 SETUP VIOLATION: Priority writer should be pending"
);
let mut persistent_readers = Vec::new();
let mut persistent_wake_counts = Vec::new();
for _ in 0..num_persistent_readers {
let reader_lock = lock.clone();
let mut read_fut = OwnedRwLockReadGuard::read(reader_lock, &cx);
let (reader_waker, reader_wake_count) = CountWaker::new();
let reader_waker_obj = Waker::from(Arc::new(reader_waker));
let mut reader_task_cx = Context::from_waker(&reader_waker_obj);
let reader_poll = Pin::new(&mut read_fut).poll(&mut reader_task_cx);
prop_assert!(
reader_poll.is_pending(),
"MR5 SETUP VIOLATION: Persistent reader should be blocked by writer preference"
);
persistent_readers.push(read_fut);
persistent_wake_counts.push(reader_wake_count);
}
drop(cancellable_readers);
drop(blocking_writer);
prop_assert!(
priority_wake_count.load(Ordering::SeqCst) > 0,
"MR5 VIOLATION: Priority writer not woken despite preference policy"
);
let priority_result = Pin::new(&mut priority_write_fut).poll(&mut priority_task_cx);
prop_assert!(
matches!(priority_result, Poll::Ready(Ok(_))),
"MR5 VIOLATION: Priority writer failed to acquire despite being woken"
);
for (i, wake_count) in persistent_wake_counts.iter().enumerate() {
prop_assert!(
wake_count.load(Ordering::SeqCst) == 0,
"MR5 VIOLATION: Persistent reader {} was woken while writer active", i
);
}
}
}
#[derive(Debug)]
struct OlderReaderSuffixOutcome {
reader_ready_after_release: bool,
readers_while_guard_held: usize,
writer_waiters_while_reader_active: usize,
writer_wakes_before_reader: Vec<usize>,
}
#[derive(Debug, PartialEq, Eq)]
struct UpgradeWriterLivenessSignature {
active_readers_after_queue: usize,
queued_writer_waiters_after_queue: usize,
queued_reader_waiters_after_queue: usize,
late_reader_wakes_before_writer_turn: Vec<usize>,
late_readers_pending_while_writer_held: usize,
late_readers_ready_after_writer_release: usize,
final_readers: usize,
final_writer_waiters: usize,
final_reader_waiters: usize,
}
fn older_reader_admission_with_younger_writer_suffix(
extra_writers: usize,
) -> OlderReaderSuffixOutcome {
let _runtime = std::rc::Rc::new(LabRuntime::new(LabConfig::default()));
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let blocking_writer = block_on(lock.write(&cx)).expect("initial writer should acquire");
let mut older_reader_fut = OwnedRwLockReadGuard::read(lock.clone(), &cx);
let (reader_waker, _reader_wake_count) = CountWaker::new();
let reader_waker_obj = Waker::from(Arc::new(reader_waker));
let mut reader_task_cx = Context::from_waker(&reader_waker_obj);
assert!(
Pin::new(&mut older_reader_fut)
.poll(&mut reader_task_cx)
.is_pending(),
"older reader should queue behind the active writer"
);
let mut writer_futs = Vec::new();
let mut writer_wake_counts = Vec::new();
let mut writer_wakers = Vec::new();
for _ in 0..extra_writers {
let mut writer_fut = OwnedRwLockWriteGuard::write(lock.clone(), &cx);
let (writer_waker, wake_count) = CountWaker::new();
let writer_waker_obj = Waker::from(Arc::new(writer_waker));
let mut writer_task_cx = Context::from_waker(&writer_waker_obj);
assert!(
Pin::new(&mut writer_fut)
.poll(&mut writer_task_cx)
.is_pending(),
"younger writer should queue behind the older reader"
);
writer_futs.push(writer_fut);
writer_wake_counts.push(wake_count);
writer_wakers.push(writer_waker_obj);
}
drop(blocking_writer);
let reader_guard = match Pin::new(&mut older_reader_fut).poll(&mut reader_task_cx) {
Poll::Ready(Ok(guard)) => Some(guard),
_ => None,
};
let state_while_reader_active = lock.debug_state();
drop(reader_guard);
drop(writer_futs);
drop(writer_wakers);
OlderReaderSuffixOutcome {
reader_ready_after_release: state_while_reader_active.readers > 0,
readers_while_guard_held: state_while_reader_active.readers,
writer_waiters_while_reader_active: state_while_reader_active.writer_waiters,
writer_wakes_before_reader: writer_wake_counts
.iter()
.map(|count| count.load(Ordering::SeqCst))
.collect(),
}
}
fn upgrade_writer_liveness_signature(
prime_with_read: bool,
late_readers: usize,
) -> UpgradeWriterLivenessSignature {
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let blocking_peer_reader = block_on(lock.read(&cx)).expect("peer reader should acquire");
if prime_with_read {
let transient_reader = block_on(lock.read(&cx)).expect("upgrade reader should acquire");
drop(transient_reader);
}
let mut writer_fut = OwnedRwLockWriteGuard::write(lock.clone(), &cx);
let (writer_waker, writer_wake_count) = CountWaker::new();
let writer_waker_obj = Waker::from(Arc::new(writer_waker));
let mut writer_task_cx = Context::from_waker(&writer_waker_obj);
assert!(
std::pin::Pin::new(&mut writer_fut)
.poll(&mut writer_task_cx)
.is_pending(),
"writer should queue while the blocking reader is active"
);
let mut late_reader_futs = Vec::new();
let mut late_reader_wake_counts = Vec::new();
for _ in 0..late_readers {
let mut late_reader_fut = OwnedRwLockReadGuard::read(lock.clone(), &cx);
let (late_reader_waker, late_reader_wake_count) = CountWaker::new();
let late_reader_waker_obj = Waker::from(Arc::new(late_reader_waker));
let mut late_reader_task_cx = Context::from_waker(&late_reader_waker_obj);
assert!(
std::pin::Pin::new(&mut late_reader_fut)
.poll(&mut late_reader_task_cx)
.is_pending(),
"late reader should queue behind the waiting writer"
);
late_reader_futs.push((late_reader_fut, late_reader_waker_obj));
late_reader_wake_counts.push(late_reader_wake_count);
}
let queued_state = lock.debug_state();
drop(blocking_peer_reader);
assert!(
writer_wake_count.load(Ordering::SeqCst) > 0,
"writer should be woken once the last blocking reader releases"
);
let writer_guard = match std::pin::Pin::new(&mut writer_fut).poll(&mut writer_task_cx) {
Poll::Ready(Ok(guard)) => guard,
other => panic!("writer did not acquire after wake: {other:?}"),
};
let late_reader_wakes_before_writer_turn = late_reader_wake_counts
.iter()
.map(|count| count.load(Ordering::SeqCst))
.collect::<Vec<_>>();
let late_readers_pending_while_writer_held = late_reader_futs
.iter_mut()
.map(|(fut, waker)| {
let mut task_cx = Context::from_waker(waker);
std::pin::Pin::new(fut).poll(&mut task_cx)
})
.filter(|poll| poll.is_pending())
.count();
drop(writer_guard);
let mut admitted_late_readers = Vec::new();
for (mut late_reader_fut, late_reader_waker) in late_reader_futs {
let mut late_reader_task_cx = Context::from_waker(&late_reader_waker);
match std::pin::Pin::new(&mut late_reader_fut).poll(&mut late_reader_task_cx) {
Poll::Ready(Ok(guard)) => admitted_late_readers.push(guard),
other => panic!("late reader did not acquire after writer turn: {other:?}"),
}
}
let late_readers_ready_after_writer_release = admitted_late_readers.len();
drop(admitted_late_readers);
let final_state = lock.debug_state();
UpgradeWriterLivenessSignature {
active_readers_after_queue: queued_state.readers,
queued_writer_waiters_after_queue: queued_state.writer_waiters,
queued_reader_waiters_after_queue: queued_state.reader_waiters.len(),
late_reader_wakes_before_writer_turn,
late_readers_pending_while_writer_held,
late_readers_ready_after_writer_release,
final_readers: final_state.readers,
final_writer_waiters: final_state.writer_waiters,
final_reader_waiters: final_state.reader_waiters.len(),
}
}
proptest! {
#[test]
fn mr_older_reader_admission_invariant_to_younger_writer_suffix(
extra_writers in 1usize..6,
) {
let baseline = older_reader_admission_with_younger_writer_suffix(0);
let transformed = older_reader_admission_with_younger_writer_suffix(extra_writers);
prop_assert!(
baseline.reader_ready_after_release,
"MR6 SETUP VIOLATION: baseline older reader did not acquire after writer release"
);
prop_assert!(
transformed.reader_ready_after_release,
"MR6 VIOLATION: older reader was delayed by appended younger writers"
);
prop_assert_eq!(
transformed.readers_while_guard_held,
baseline.readers_while_guard_held,
"MR6 VIOLATION: appended younger writers changed active reader cardinality"
);
prop_assert_eq!(
transformed.writer_waiters_while_reader_active,
extra_writers,
"MR6 VIOLATION: younger writers should still be queued while the older reader runs"
);
for (i, wake_count) in transformed.writer_wakes_before_reader.iter().enumerate() {
prop_assert_eq!(
*wake_count,
0,
"MR6 VIOLATION: younger writer {} was woken before the older reader acquired",
i,
);
}
}
}
#[test]
fn waiting_writer_blocks_late_readers_until_writer_turn_completes() {
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let blocking_reader = block_on(lock.read(&cx)).expect("initial reader should acquire");
let mut writer_fut = OwnedRwLockWriteGuard::write(lock.clone(), &cx);
let (writer_waker, writer_wake_count) = CountWaker::new();
let writer_waker_obj = Waker::from(Arc::new(writer_waker));
let mut writer_task_cx = Context::from_waker(&writer_waker_obj);
assert!(
Pin::new(&mut writer_fut)
.poll(&mut writer_task_cx)
.is_pending(),
"writer should wait behind active reader"
);
let mut late_reader_fut = OwnedRwLockReadGuard::read(lock.clone(), &cx);
let (late_reader_waker, late_reader_wake_count) = CountWaker::new();
let late_reader_waker_obj = Waker::from(Arc::new(late_reader_waker));
let mut late_reader_task_cx = Context::from_waker(&late_reader_waker_obj);
assert!(
Pin::new(&mut late_reader_fut)
.poll(&mut late_reader_task_cx)
.is_pending(),
"late reader should queue behind waiting writer"
);
drop(blocking_reader);
assert!(
writer_wake_count.load(Ordering::SeqCst) > 0,
"writer should be woken when the blocking reader releases"
);
assert_eq!(
late_reader_wake_count.load(Ordering::SeqCst),
0,
"late reader must stay blocked until the queued writer runs"
);
let writer_guard = match Pin::new(&mut writer_fut).poll(&mut writer_task_cx) {
Poll::Ready(Ok(guard)) => guard,
other => panic!("writer did not acquire after wake: {other:?}"),
};
assert!(
Pin::new(&mut late_reader_fut)
.poll(&mut late_reader_task_cx)
.is_pending(),
"late reader must still be blocked while writer guard is held"
);
drop(writer_guard);
assert!(
late_reader_wake_count.load(Ordering::SeqCst) > 0,
"late reader should be woken after writer completes its turn"
);
assert!(
matches!(
Pin::new(&mut late_reader_fut).poll(&mut late_reader_task_cx),
Poll::Ready(Ok(_))
),
"late reader should acquire once writer turn completes"
);
}
#[test]
fn cancelled_waiting_writer_reopens_reader_admission() {
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let blocking_reader = block_on(lock.read(&cx)).expect("initial reader should acquire");
let mut cancelled_writer_fut = OwnedRwLockWriteGuard::write(lock.clone(), &cx);
let (writer_waker, _writer_wake_count) = CountWaker::new();
let writer_waker_obj = Waker::from(Arc::new(writer_waker));
let mut writer_task_cx = Context::from_waker(&writer_waker_obj);
assert!(
Pin::new(&mut cancelled_writer_fut)
.poll(&mut writer_task_cx)
.is_pending(),
"writer should queue while reader is active"
);
let mut reader_after_cancel_fut = OwnedRwLockReadGuard::read(lock.clone(), &cx);
let (reader_waker, reader_wake_count) = CountWaker::new();
let reader_waker_obj = Waker::from(Arc::new(reader_waker));
let mut reader_task_cx = Context::from_waker(&reader_waker_obj);
assert!(
Pin::new(&mut reader_after_cancel_fut)
.poll(&mut reader_task_cx)
.is_pending(),
"reader should be blocked while writer preference is active"
);
drop(cancelled_writer_fut);
let state_after_cancel = lock.debug_state();
assert_eq!(
state_after_cancel.writer_waiters, 0,
"cancelling the queued writer must release writer preference"
);
drop(blocking_reader);
assert!(
reader_wake_count.load(Ordering::SeqCst) > 0,
"reader should be woken once the cancelled writer no longer blocks admission"
);
assert!(
matches!(
Pin::new(&mut reader_after_cancel_fut).poll(&mut reader_task_cx),
Poll::Ready(Ok(_))
),
"reader should acquire after the cancelled writer is removed"
);
}
#[test]
fn metamorphic_read_then_write_preserves_liveness_under_reader_pressure() {
let baseline = upgrade_writer_liveness_signature(false, 2);
let transformed = upgrade_writer_liveness_signature(true, 2);
assert_eq!(
transformed, baseline,
"dropping an upgrader's read guard before queueing write must preserve the same writer-liveness signature under reader pressure"
);
assert_eq!(
baseline.active_readers_after_queue, 1,
"the transient upgrader read must not leak into the queued writer state"
);
assert_eq!(
baseline.queued_writer_waiters_after_queue, 1,
"exactly one writer should be queued in the upgrade-liveness scenario"
);
assert_eq!(
baseline.queued_reader_waiters_after_queue, 2,
"both late readers should remain queued behind the writer"
);
assert!(
baseline
.late_reader_wakes_before_writer_turn
.iter()
.all(|wake_count| *wake_count == 0),
"late readers must not be woken before the writer turn completes"
);
assert_eq!(
baseline.late_readers_pending_while_writer_held, 2,
"late readers must stay pending while the writer guard is held"
);
assert_eq!(
baseline.late_readers_ready_after_writer_release, 2,
"all queued late readers should be admitted once the writer releases"
);
assert_eq!(
(
baseline.final_readers,
baseline.final_writer_waiters,
baseline.final_reader_waiters
),
(0, 0, 0),
"the mixed read-then-write path must drain all waiter state"
);
}
#[test]
fn jxq2e6_writer_preference_holds_under_reader_cancellation_cascade() {
let lock = Arc::new(RwLock::new(0_u32));
let cx = test_cx();
let reader_1 = block_on(lock.read(&cx)).expect("reader-1 acquires");
let state = lock.debug_state();
assert_eq!(state.readers, 1, "jxq2e6: one active reader");
let waker = Waker::noop().clone();
let mut writer_task_cx = Context::from_waker(&waker);
let mut writer_fut = lock.write(&cx);
let pending = Pin::new(&mut writer_fut)
.poll(&mut writer_task_cx)
.is_pending();
assert!(pending, "jxq2e6: writer must queue while reader-1 holds");
let mut reader_2 = lock.read(&cx);
let mut reader_3 = lock.read(&cx);
let mut reader_4 = lock.read(&cx);
for r in [
Pin::new(&mut reader_2),
Pin::new(&mut reader_3),
Pin::new(&mut reader_4),
] {
assert!(
r.poll(&mut writer_task_cx).is_pending(),
"jxq2e6: queued reader must wait for writer-preference"
);
}
drop(reader_2);
drop(reader_3);
drop(reader_4);
let state_after_cancel = lock.debug_state();
assert!(
state_after_cancel.reader_waiters.is_empty(),
"jxq2e6: cancelled reader-waiter slots must clear (got {} waiters)",
state_after_cancel.reader_waiters.len()
);
assert_eq!(
state_after_cancel.writer_waiters, 1,
"jxq2e6: writer still queued"
);
drop(reader_1);
let writer_acquired = matches!(
Pin::new(&mut writer_fut).poll(&mut writer_task_cx),
Poll::Ready(Ok(_))
);
assert!(
writer_acquired,
"jxq2e6: writer MUST acquire after reader-1 release + cancelled-reader cleanup"
);
}
proptest! {
#[test]
fn mr_reader_writer_fairness_bound_invariant(
num_excess_writers in 1usize..8,
num_queued_readers in 2usize..5,
) {
let _runtime = std::rc::Rc::new(LabRuntime::new(LabConfig::default()));
let harness = RwLockTestHarness::new(0u64);
let lock = harness.lock();
let cx = test_cx();
let total_writers = MAX_CONSECUTIVE_WRITERS_BEFORE_READER_BATCH + num_excess_writers;
let initial_writer = block_on(lock.write(&cx)).expect("initial writer acquire");
let mut reader_wake_counts = Vec::new();
for i in 0..num_queued_readers {
let reader_lock = lock.clone();
let mut read_fut = OwnedRwLockReadGuard::read(reader_lock, &cx);
let (waker, count) = CountWaker::new();
let waker_obj = Waker::from(Arc::new(waker));
let mut task_cx = Context::from_waker(&waker_obj);
prop_assert!(
Pin::new(&mut read_fut).poll(&mut task_cx).is_pending(),
"Reader {} should be blocked", i
);
reader_wake_counts.push(count);
std::mem::drop(read_fut);
}
let mut writer_futures = Vec::new();
for _i in 0..total_writers {
let writer_lock = lock.clone();
let write_fut = OwnedRwLockWriteGuard::write(writer_lock, &cx);
writer_futures.push(write_fut);
}
drop(initial_writer);
let mut writers_served = 0;
for _ in 0..total_writers {
let mut found_ready = false;
for writer_fut in writer_futures.iter_mut() {
let (waker, _) = CountWaker::new();
let waker_obj = Waker::from(Arc::new(waker));
let mut task_cx = Context::from_waker(&waker_obj);
if let Poll::Ready(Ok(guard)) = Pin::new(writer_fut).poll(&mut task_cx) {
writers_served += 1;
drop(guard); found_ready = true;
break;
}
}
if !found_ready {
break;
}
if writers_served >= MAX_CONSECUTIVE_WRITERS_BEFORE_READER_BATCH {
let readers_granted = reader_wake_counts.iter()
.map(|c| c.load(Ordering::SeqCst))
.sum::<usize>();
prop_assert!(
readers_granted > 0,
"FAIRNESS VIOLATION: No readers granted after {} writers served (excess={})",
writers_served, num_excess_writers
);
break;
}
}
if writers_served >= MAX_CONSECUTIVE_WRITERS_BEFORE_READER_BATCH {
let total_reader_wakes = reader_wake_counts.iter()
.map(|c| c.load(Ordering::SeqCst))
.sum::<usize>();
prop_assert!(
total_reader_wakes > 0,
"BOUNDED FAIRNESS VIOLATED: Excess write pressure ({} beyond threshold) \
prevented reader service after {} writer cycles",
num_excess_writers, writers_served
);
}
}
}
#[test]
fn audit_rwlock_writer_starvation_prevention() {
init_test("audit_rwlock_writer_starvation_prevention");
let cx = test_cx();
let lock = RwLock::new(42u64);
let initial_reader = block_on(lock.read(&cx)).expect("initial reader should acquire");
let mut writer_fut = lock.write(&cx);
let writer_pending = poll_once(&mut writer_fut).is_none();
assert!(
writer_pending,
"writer should be pending while reader is active"
);
{
let state = lock.state.lock();
assert!(
state.writer_waiters > 0,
"writer_waiters should be > 0 after queuing writer, got {}",
state.writer_waiters
);
}
let mut new_reader_fut1 = lock.read(&cx);
let reader1_blocked = poll_once(&mut new_reader_fut1).is_none();
assert!(
reader1_blocked,
"new reader should be blocked when writer_waiters > 0"
);
let mut new_reader_fut2 = lock.read(&cx);
let reader2_blocked = poll_once(&mut new_reader_fut2).is_none();
assert!(
reader2_blocked,
"second new reader should also be blocked when writer_waiters > 0"
);
let try_read_result = lock.try_read();
assert!(
matches!(try_read_result, Err(TryReadError::Locked)),
"try_read should fail with Locked when writers are waiting, got {:?}",
try_read_result
);
drop(initial_reader);
let writer_guard = poll_once(&mut writer_fut);
assert!(
writer_guard.is_some() && writer_guard.as_ref().unwrap().is_ok(),
"writer should acquire lock after initial reader releases"
);
let reader1_still_blocked = poll_once(&mut new_reader_fut1).is_none();
let reader2_still_blocked = poll_once(&mut new_reader_fut2).is_none();
assert!(
reader1_still_blocked && reader2_still_blocked,
"readers should remain blocked while writer is active"
);
drop(writer_guard.unwrap().unwrap());
let reader1_acquired = poll_once(&mut new_reader_fut1);
let reader2_acquired = poll_once(&mut new_reader_fut2);
assert!(
reader1_acquired.is_some() && reader1_acquired.as_ref().unwrap().is_ok(),
"reader1 should acquire after writer releases"
);
assert!(
reader2_acquired.is_some() && reader2_acquired.as_ref().unwrap().is_ok(),
"reader2 should acquire after writer releases"
);
{
let state = lock.state.lock();
assert_eq!(
state.writer_waiters, 0,
"writer_waiters should be 0 after writer completes"
);
assert_eq!(state.readers, 2, "should have 2 active readers");
assert!(!state.writer_active, "no writer should be active");
}
crate::test_complete!("audit_rwlock_writer_starvation_prevention");
}
#[test]
fn audit_rwlock_no_read_to_write_upgrade() {
init_test("audit_rwlock_no_read_to_write_upgrade");
let cx = test_cx();
let lock = RwLock::new(0_u32);
let read_guard = block_on(lock.read(&cx)).expect("initial read guard should acquire");
assert_eq!(*read_guard, 0);
assert!(
matches!(lock.try_write(), Err(TryWriteError::Locked)),
"RwLock intentionally has no in-place read-to-write upgrade; try_write must fail while a read guard is held"
);
let mut write_fut = lock.write(&cx);
assert!(
poll_once(&mut write_fut).is_none(),
"write acquisition must wait until the read guard is dropped"
);
let state_while_read_held = lock.debug_state();
assert_eq!(state_while_read_held.readers, 1);
assert_eq!(state_while_read_held.writer_waiters, 1);
assert!(
!state_while_read_held.writer_active,
"writer must not become active while a read guard is held"
);
let mut late_reader_fut = lock.read(&cx);
assert!(
poll_once(&mut late_reader_fut).is_none(),
"late reader must queue behind the pending writer"
);
drop(read_guard);
let mut write_guard = poll_once(&mut write_fut)
.expect("writer should acquire after dropping read guard")
.expect("writer acquisition should succeed");
*write_guard = 7;
assert!(
poll_once(&mut late_reader_fut).is_none(),
"late reader must remain blocked while the writer guard is active"
);
drop(write_guard);
let late_reader = poll_once(&mut late_reader_fut)
.expect("late reader should acquire after writer releases")
.expect("late reader acquisition should succeed");
assert_eq!(
*late_reader, 7,
"late reader should observe the write made after the read guard was dropped"
);
let state_with_late_reader = lock.debug_state();
assert_eq!(state_with_late_reader.readers, 1);
assert_eq!(state_with_late_reader.writer_waiters, 0);
assert_eq!(state_with_late_reader.reader_waiters.len(), 0);
assert!(!state_with_late_reader.writer_active);
drop(late_reader);
let final_state = lock.debug_state();
assert_eq!(final_state.readers, 0);
assert_eq!(final_state.writer_waiters, 0);
assert_eq!(final_state.reader_waiters.len(), 0);
assert!(!final_state.writer_active);
let cancel_cx = test_cx_with_slot(14);
let cancel_lock = RwLock::new(1_u32);
let blocking_read =
block_on(cancel_lock.read(&cx)).expect("blocking read guard should acquire");
let mut cancelled_write_fut = cancel_lock.write(&cancel_cx);
assert!(
poll_once(&mut cancelled_write_fut).is_none(),
"write waiter should queue behind the active read guard before cancellation"
);
cancel_cx.set_cancel_requested(true);
assert!(
matches!(
poll_once(&mut cancelled_write_fut),
Some(Err(RwLockError::Cancelled))
),
"cancelled write waiter must return a cancellation error without acquiring the lock"
);
let state_after_cancel = cancel_lock.debug_state();
assert_eq!(state_after_cancel.readers, 1);
assert_eq!(state_after_cancel.writer_waiters, 0);
assert_eq!(state_after_cancel.writer_queue.len(), 0);
assert!(!state_after_cancel.writer_active);
drop(blocking_read);
let final_cancel_state = cancel_lock.debug_state();
assert_eq!(final_cancel_state.readers, 0);
assert_eq!(final_cancel_state.writer_waiters, 0);
assert_eq!(final_cancel_state.writer_queue.len(), 0);
assert_eq!(final_cancel_state.reader_waiters.len(), 0);
assert!(!final_cancel_state.writer_active);
crate::test_complete!("audit_rwlock_no_read_to_write_upgrade");
}
#[test]
fn abandon_waiter_calls_lock_ordering_record_release() {
init_test("abandon_waiter_calls_lock_ordering_record_release");
let cx = test_cx();
{
let lock = RwLock::with_name("test_abandon_read", 42_u32);
let _writer = block_on(lock.write(&cx)).expect("write");
let mut read_fut = lock.read(&cx);
let pending = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(pending, "reader queued", true, pending);
drop(_writer);
drop(read_fut);
let state = lock.debug_state();
crate::assert_with_log!(
state.readers == 0 && !state.writer_active,
"abandoned read grant cleaned up",
true,
state.readers == 0 && !state.writer_active
);
}
{
let lock = RwLock::with_name("test_abandon_write", 42_u32);
let _reader = block_on(lock.read(&cx)).expect("read");
let mut write_fut = lock.write(&cx);
let pending = poll_once(&mut write_fut).is_none();
crate::assert_with_log!(pending, "writer queued", true, pending);
drop(_reader);
drop(write_fut);
let state = lock.debug_state();
crate::assert_with_log!(
!state.writer_active && state.writer_waiters == 0,
"abandoned write grant cleaned up",
true,
!state.writer_active && state.writer_waiters == 0
);
}
crate::test_complete!("abandon_waiter_calls_lock_ordering_record_release");
}
#[cfg(debug_assertions)]
#[test]
fn queued_read_handoff_checks_lock_order_before_recording_grant() {
init_test("queued_read_handoff_checks_lock_order_before_recording_grant");
crate::sync::lock_ordering::clear_held_locks();
let cx = test_cx();
let regions_lock = RwLock::with_name("regions_table", 0_u32);
let tasks_lock = RwLock::with_name("tasks_queue", 0_u32);
let active_region_writer = block_on(regions_lock.write(&cx)).expect("region writer");
let tasks_guard = block_on(tasks_lock.write(&cx)).expect("tasks writer");
let mut read_fut = regions_lock.read(&cx);
let queued = poll_once(&mut read_fut).is_none();
crate::assert_with_log!(queued, "reader queued behind writer", true, queued);
drop(active_region_writer);
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = poll_once(&mut read_fut);
}));
assert!(
result.is_err(),
"pre-granted read waiter must reject Regions acquisition while Tasks is held"
);
drop(read_fut);
drop(tasks_guard);
crate::sync::lock_ordering::clear_held_locks();
let state = regions_lock.debug_state();
assert_eq!(state.readers, 0);
assert!(!state.writer_active);
crate::test_complete!("queued_read_handoff_checks_lock_order_before_recording_grant");
}
#[cfg(debug_assertions)]
#[test]
fn queued_write_handoff_checks_lock_order_before_recording_grant() {
init_test("queued_write_handoff_checks_lock_order_before_recording_grant");
crate::sync::lock_ordering::clear_held_locks();
let cx = test_cx();
let regions_lock = RwLock::with_name("regions_table", 0_u32);
let tasks_lock = RwLock::with_name("tasks_queue", 0_u32);
let active_region_reader = block_on(regions_lock.read(&cx)).expect("region reader");
let tasks_guard = block_on(tasks_lock.write(&cx)).expect("tasks writer");
let mut write_fut = regions_lock.write(&cx);
let queued = poll_once(&mut write_fut).is_none();
crate::assert_with_log!(queued, "writer queued behind reader", true, queued);
drop(active_region_reader);
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = poll_once(&mut write_fut);
}));
assert!(
result.is_err(),
"pre-granted write waiter must reject Regions acquisition while Tasks is held"
);
drop(write_fut);
drop(tasks_guard);
crate::sync::lock_ordering::clear_held_locks();
let state = regions_lock.debug_state();
assert_eq!(state.readers, 0);
assert_eq!(state.writer_waiters, 0);
assert!(!state.writer_active);
crate::test_complete!("queued_write_handoff_checks_lock_order_before_recording_grant");
}
}