use crate::runtime::execution::ExecutionState;
use crate::runtime::task::clock::VectorClock;
use crate::runtime::task::{TaskId, TaskSet};
use crate::runtime::thread;
use std::cell::RefCell;
use std::fmt::{Debug, Display};
use std::ops::{Deref, DerefMut};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
use tracing::trace;
/// A reader-writer lock whose acquisitions and releases go through `ExecutionState`.
///
/// The protected value lives in a `std::sync::RwLock`; `state` carries the bookkeeping
/// (current holder, waiting tasks, vector clock) that is consulted before the inner lock is
/// touched.
pub struct RwLock<T: ?Sized> {
/// Bookkeeping shared (via `Rc`) with every guard handed out by this lock.
state: Rc<RefCell<RwLockState>>,
/// The standard-library lock that owns the protected value.
inner: std::sync::RwLock<T>,
}
/// Bookkeeping for one `RwLock`: who holds it, who is waiting, and its vector clock.
#[derive(Debug)]
struct RwLockState {
/// Current owner(s) of the lock, or `RwLockHolder::None` when it is free.
holder: RwLockHolder,
/// Tasks that have entered `lock` for read access and have not yet acquired it.
waiting_readers: TaskSet,
/// Tasks that have entered `lock` for write access and have not yet acquired it.
waiting_writers: TaskSet,
/// Vector clock stored with the lock; updated on acquisition and on write release.
clock: VectorClock,
}
/// Who currently holds the lock.
#[derive(PartialEq, Eq, Debug)]
enum RwLockHolder {
/// Held for shared access by every task in the set.
Read(TaskSet),
/// Held for exclusive access by a single task.
Write(TaskId),
/// Not held by anyone.
None,
}
/// The kind of access being requested or released.
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
enum RwLockType {
/// Shared (read) access.
Read,
/// Exclusive (write) access.
Write,
}
impl<T> RwLock<T> {
    /// Creates a new, unlocked `RwLock` protecting `value`.
    ///
    /// The lock starts with no holder, no waiting readers or writers, and a fresh vector
    /// clock.
    pub fn new(value: T) -> Self {
        let bookkeeping = Rc::new(RefCell::new(RwLockState {
            holder: RwLockHolder::None,
            waiting_readers: TaskSet::new(),
            waiting_writers: TaskSet::new(),
            clock: VectorClock::new(),
        }));

        Self {
            state: bookkeeping,
            inner: std::sync::RwLock::new(value),
        }
    }
}
impl<T: ?Sized> RwLock<T> {
    /// Acquires the lock for shared read access, blocking the current task until it is
    /// available.
    ///
    /// # Errors
    ///
    /// Returns a `PoisonError` wrapping the guard if the inner `std::sync::RwLock` is
    /// poisoned.
    ///
    /// # Panics
    ///
    /// Panics if the calling task already holds this lock, or if the inner lock reports
    /// `WouldBlock` after the bookkeeping granted access.
    pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
        self.lock(RwLockType::Read);

        // The bookkeeping says we own the lock, so the inner lock must be free for us.
        match self.inner.try_read() {
            Ok(guard) => Ok(RwLockReadGuard {
                inner: Some(guard),
                state: Rc::clone(&self.state),
                me: ExecutionState::me(),
            }),
            Err(TryLockError::Poisoned(err)) => Err(PoisonError::new(RwLockReadGuard {
                inner: Some(err.into_inner()),
                state: Rc::clone(&self.state),
                me: ExecutionState::me(),
            })),
            Err(TryLockError::WouldBlock) => panic!("rwlock state out of sync"),
        }
    }

    /// Acquires the lock for exclusive write access, blocking the current task until it is
    /// available.
    ///
    /// # Errors
    ///
    /// Returns a `PoisonError` wrapping the guard if the inner `std::sync::RwLock` is
    /// poisoned.
    ///
    /// # Panics
    ///
    /// Panics if the calling task already holds this lock, or if the inner lock reports
    /// `WouldBlock` after the bookkeeping granted access.
    pub fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>> {
        self.lock(RwLockType::Write);

        // The bookkeeping says we own the lock, so the inner lock must be free for us.
        match self.inner.try_write() {
            Ok(guard) => Ok(RwLockWriteGuard {
                inner: Some(guard),
                state: Rc::clone(&self.state),
                me: ExecutionState::me(),
            }),
            Err(TryLockError::Poisoned(err)) => Err(PoisonError::new(RwLockWriteGuard {
                inner: Some(err.into_inner()),
                state: Rc::clone(&self.state),
                me: ExecutionState::me(),
            })),
            Err(TryLockError::WouldBlock) => panic!("rwlock state out of sync"),
        }
    }

    /// Attempts to acquire the lock for shared read access without blocking.
    ///
    /// # Errors
    ///
    /// Returns `TryLockError::WouldBlock` if the lock could not be acquired (including when
    /// the calling task is already one of the readers), or `TryLockError::Poisoned` wrapping
    /// the guard if the inner lock is poisoned.
    ///
    /// # Panics
    ///
    /// Panics if the inner lock reports `WouldBlock` after the bookkeeping granted access.
    pub fn try_read(&self) -> TryLockResult<RwLockReadGuard<'_, T>> {
        if self.try_lock(RwLockType::Read) {
            match self.inner.try_read() {
                Ok(guard) => Ok(RwLockReadGuard {
                    inner: Some(guard),
                    state: Rc::clone(&self.state),
                    me: ExecutionState::me(),
                }),
                Err(TryLockError::Poisoned(err)) => Err(TryLockError::Poisoned(PoisonError::new(RwLockReadGuard {
                    inner: Some(err.into_inner()),
                    state: Rc::clone(&self.state),
                    me: ExecutionState::me(),
                }))),
                Err(TryLockError::WouldBlock) => panic!("rwlock state out of sync"),
            }
        } else {
            Err(TryLockError::WouldBlock)
        }
    }

    /// Attempts to acquire the lock for exclusive write access without blocking.
    ///
    /// # Errors
    ///
    /// Returns `TryLockError::WouldBlock` if the lock is currently held by anyone, or
    /// `TryLockError::Poisoned` wrapping the guard if the inner lock is poisoned.
    ///
    /// # Panics
    ///
    /// Panics if the inner lock reports `WouldBlock` after the bookkeeping granted access.
    pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<'_, T>> {
        if self.try_lock(RwLockType::Write) {
            match self.inner.try_write() {
                Ok(guard) => Ok(RwLockWriteGuard {
                    inner: Some(guard),
                    state: Rc::clone(&self.state),
                    me: ExecutionState::me(),
                }),
                Err(TryLockError::Poisoned(err)) => Err(TryLockError::Poisoned(PoisonError::new(RwLockWriteGuard {
                    inner: Some(err.into_inner()),
                    state: Rc::clone(&self.state),
                    me: ExecutionState::me(),
                }))),
                Err(TryLockError::WouldBlock) => panic!("rwlock state out of sync"),
            }
        } else {
            Err(TryLockError::WouldBlock)
        }
    }

    /// Consumes the lock and returns the protected value.
    ///
    /// The calling task's clock is updated with the clock stored in the lock before the value
    /// is handed back.
    ///
    /// # Errors
    ///
    /// Returns a `PoisonError` wrapping the value if the inner lock is poisoned.
    ///
    /// # Panics
    ///
    /// Panics if the bookkeeping still records a holder.
    pub fn into_inner(self) -> LockResult<T>
    where
        T: Sized,
    {
        let state = self.state.borrow();
        assert_eq!(state.holder, RwLockHolder::None);

        ExecutionState::with(|s| {
            s.update_clock(&state.clock);
        });

        self.inner.into_inner()
    }

    /// Blocking acquisition of the bookkeeping lock for access kind `typ`.
    ///
    /// Registers the current task as a waiter, blocks and yields if the lock is unavailable
    /// (held by a writer, or held by readers while a write is requested), then records the
    /// task as a holder, exchanges vector clocks with the lock, blocks the remaining waiters
    /// that lost the race, and finally yields to the scheduler.
    ///
    /// # Panics
    ///
    /// Panics if the current task already holds the lock, or if the task is resumed while the
    /// lock is in a state that does not permit the requested access.
    fn lock(&self, typ: RwLockType) {
        let me = ExecutionState::me();

        let mut state = self.state.borrow_mut();
        trace!(
            holder = ?state.holder,
            waiting_readers = ?state.waiting_readers,
            waiting_writers = ?state.waiting_writers,
            "acquiring {:?} lock on rwlock {:p}",
            typ,
            self.state,
        );

        // Register as a waiter up front so a releasing task knows to unblock us.
        if typ == RwLockType::Write {
            state.waiting_writers.insert(me);
        } else {
            state.waiting_readers.insert(me);
        }

        // Decide whether we have to block until the current holder releases.
        let should_switch = match &state.holder {
            RwLockHolder::Write(writer) => {
                if *writer == me {
                    panic!("deadlock! task {:?} tried to acquire a RwLock it already holds", me);
                }
                ExecutionState::with(|s| s.current_mut().block());
                true
            }
            RwLockHolder::Read(readers) => {
                if readers.contains(me) {
                    panic!("deadlock! task {:?} tried to acquire a RwLock it already holds", me);
                }
                // Readers may join other readers immediately; writers have to wait.
                if typ == RwLockType::Write {
                    ExecutionState::with(|s| s.current_mut().block());
                    true
                } else {
                    false
                }
            }
            RwLockHolder::None => false,
        };
        // End the RefCell borrow before yielding: other tasks will borrow the state too.
        drop(state);

        if should_switch {
            thread::switch();
        }

        // We were either never blocked or have been resumed; take the lock.
        let mut state = self.state.borrow_mut();
        match (typ, &mut state.holder) {
            (RwLockType::Write, RwLockHolder::None) => {
                state.holder = RwLockHolder::Write(me);
            }
            (RwLockType::Read, RwLockHolder::None) => {
                let mut readers = TaskSet::new();
                readers.insert(me);
                state.holder = RwLockHolder::Read(readers);
            }
            (RwLockType::Read, RwLockHolder::Read(readers)) => {
                readers.insert(me);
            }
            _ => {
                panic!(
                    "resumed a waiting {:?} thread while the lock was in state {:?}",
                    typ, state.holder
                );
            }
        }

        if typ == RwLockType::Write {
            state.waiting_writers.remove(me);
        } else {
            state.waiting_readers.remove(me);
        }

        trace!(
            holder = ?state.holder,
            waiting_readers = ?state.waiting_readers,
            waiting_writers = ?state.waiting_writers,
            "acquired {:?} lock on rwlock {:p}",
            typ,
            self.state
        );

        // Exchange clocks: fold the lock's clock into the current task, then the task's
        // clock into the lock.
        ExecutionState::with(|s| {
            s.update_clock(&state.clock);
            state.clock.update(s.get_clock(me));
        });

        // We won the race for the lock, so re-block the waiters that can no longer proceed.
        Self::block_waiters(&state, me, typ);
        drop(state);

        thread::switch();
    }

    /// Non-blocking acquisition of the bookkeeping lock for access kind `typ`.
    ///
    /// Returns `true` if the current task now holds the lock. A read attempt by a task that
    /// is already a reader returns `false`. Clocks are exchanged with the lock and the
    /// scheduler is given a chance to run whether or not the attempt succeeded.
    fn try_lock(&self, typ: RwLockType) -> bool {
        let me = ExecutionState::me();

        let mut state = self.state.borrow_mut();
        trace!(
            holder = ?state.holder,
            waiting_readers = ?state.waiting_readers,
            waiting_writers = ?state.waiting_writers,
            "trying to acquire {:?} lock on rwlock {:p}",
            typ,
            self.state,
        );

        let acquired = match (typ, &mut state.holder) {
            (RwLockType::Write, RwLockHolder::None) => {
                state.holder = RwLockHolder::Write(me);
                true
            }
            (RwLockType::Read, RwLockHolder::None) => {
                let mut readers = TaskSet::new();
                readers.insert(me);
                state.holder = RwLockHolder::Read(readers);
                true
            }
            (RwLockType::Read, RwLockHolder::Read(readers)) => {
                // `insert`'s result is the outcome: a task that is already a reader is
                // reported as a failed acquisition instead of re-entering.
                readers.insert(me)
            }
            _ => false,
        };

        trace!(
            "{} {:?} lock on rwlock {:p}",
            if acquired { "acquired" } else { "failed to acquire" },
            typ,
            self.state,
        );

        // The clock exchange happens on both the success and the failure path.
        ExecutionState::with(|s| {
            s.update_clock(&state.clock);
            state.clock.update(s.get_clock(me));
        });

        // Only a successful acquisition may re-block the other waiters. A failed attempt
        // changes nothing about who can make progress; in particular a failed write attempt
        // on a read-held lock must not block waiting readers that are free to join the
        // current readers.
        if acquired {
            Self::block_waiters(&state, me, typ);
        }
        drop(state);

        thread::switch();

        acquired
    }

    /// Blocks every waiter that cannot proceed now that `me` holds the lock as `typ`.
    ///
    /// Waiting writers are always blocked; waiting readers are blocked only when the lock was
    /// taken for writing, since readers can share a read-held lock.
    ///
    /// # Panics
    ///
    /// Panics if `me` is still registered as a waiter.
    fn block_waiters(state: &RwLockState, me: TaskId, typ: RwLockType) {
        if typ == RwLockType::Write {
            for tid in state.waiting_readers.iter() {
                assert_ne!(tid, me);
                ExecutionState::with(|s| s.get_mut(tid).block());
            }
        }

        for tid in state.waiting_writers.iter() {
            assert_ne!(tid, me);
            ExecutionState::with(|s| s.get_mut(tid).block());
        }
    }

    /// Unblocks waiters after `me` released a hold of kind `drop_type`.
    ///
    /// Every waiting reader is unblocked. Waiting writers are unblocked only if the lock is
    /// now completely free. Debug builds additionally assert that the woken tasks were in the
    /// expected blocked state.
    fn unblock_waiters(state: &RwLockState, me: TaskId, drop_type: RwLockType) {
        for tid in state.waiting_readers.iter() {
            debug_assert_ne!(tid, me);

            ExecutionState::with(|s| {
                let t = s.get_mut(tid);
                // After a read release, waiting readers may already be runnable; after a
                // write release they must all have been blocked.
                debug_assert!(drop_type == RwLockType::Read || t.blocked());
                t.unblock();
            });
        }

        // Writers can only proceed once nobody holds the lock at all.
        if state.holder == RwLockHolder::None {
            for tid in state.waiting_writers.iter() {
                debug_assert_ne!(tid, me);

                ExecutionState::with(|s| {
                    let t = s.get_mut(tid);
                    debug_assert!(t.blocked());
                    t.unblock();
                });
            }
        }
    }
}
// `RwLock` holds an `Rc<RefCell<_>>`, which is neither `Send` nor `Sync`, so these traits have
// to be asserted manually.
// SAFETY: NOTE(review): soundness relies on the runtime never touching `state` from two OS
// threads at the same time — confirm against the scheduler's execution model.
// NOTE(review): `std::sync::RwLock` requires `T: Send + Sync` for `Sync`; this impl only asks
// for `T: Send` — confirm the looser bound is intentional.
unsafe impl<T: Send + ?Sized> Send for RwLock<T> {}
unsafe impl<T: Send + ?Sized> Sync for RwLock<T> {}
// Unwind safety is asserted unconditionally for every `T`.
impl<T: ?Sized> UnwindSafe for RwLock<T> {}
impl<T: ?Sized> RefUnwindSafe for RwLock<T> {}
impl<T: Default + ?Sized> Default for RwLock<T> {
    /// Creates an unlocked `RwLock` protecting `T`'s default value.
    fn default() -> Self {
        let initial = T::default();
        Self::new(initial)
    }
}
impl<T: ?Sized + Debug> Debug for RwLock<T> {
    /// Formats the lock exactly as the inner `std::sync::RwLock` formats itself; the
    /// bookkeeping state is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        <std::sync::RwLock<T> as Debug>::fmt(&self.inner, f)
    }
}
/// Guard for shared read access to an `RwLock`; dropping it releases the read hold.
pub struct RwLockReadGuard<'a, T: ?Sized> {
/// Bookkeeping of the lock this guard belongs to.
state: Rc<RefCell<RwLockState>>,
/// The task that acquired the guard, as reported by `ExecutionState::me()` at creation.
me: TaskId,
/// The real std guard; set to `None` at the start of `drop` to release the inner lock.
inner: Option<std::sync::RwLockReadGuard<'a, T>>,
}
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
    type Target = T;

    /// Borrows the protected value through the inner std guard.
    ///
    /// Panics if the inner guard has already been released.
    fn deref(&self) -> &Self::Target {
        let guard = self.inner.as_ref().unwrap();
        &**guard
    }
}
// `?Sized` matches the guard's other impls (`Deref`, `Display`, `Drop`), so guards over
// unsized values such as `RwLock<[u8]>` or `RwLock<dyn Trait>` can be debug-printed too.
impl<T: ?Sized + Debug> Debug for RwLockReadGuard<'_, T> {
    /// Formats the guard by delegating to the inner std guard's `Debug` implementation.
    ///
    /// Panics if the inner guard has already been released.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        Debug::fmt(&self.inner.as_ref().unwrap(), f)
    }
}
impl<T: Display + ?Sized> Display for RwLockReadGuard<'_, T> {
    /// Displays the protected value itself, using `T`'s `Display` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let value: &T = self;
        Display::fmt(value, f)
    }
}
// Releases this task's shared hold on the lock when the guard goes out of scope.
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
/// Releases the inner std guard, removes the task from the reader set, wakes waiters and
/// yields to the scheduler.
///
/// Panics if the bookkeeping does not list this task as a reader.
fn drop(&mut self) {
// Release the real std guard first, before the bookkeeping is updated.
self.inner = None;
let mut state = self.state.borrow_mut();
trace!(
holder = ?state.holder,
waiting_readers = ?state.waiting_readers,
waiting_writers = ?state.waiting_writers,
"releasing Read lock on rwlock {:p}",
self.state
);
// Remove ourselves from the reader set; the last reader out marks the lock as free.
match &mut state.holder {
RwLockHolder::Read(readers) => {
let was_reader = readers.remove(self.me);
assert!(was_reader);
if readers.is_empty() {
state.holder = RwLockHolder::None;
}
}
_ => panic!("exiting a reader but rwlock is in the wrong state {:?}", state.holder),
}
// NOTE(review): presumably the execution is being torn down here, so no waiter is woken
// and no context switch is attempted — confirm against `ExecutionState::should_stop`.
if ExecutionState::should_stop() {
return;
}
// Wake the waiters, then end the RefCell borrow before handing control to the scheduler.
RwLock::<T>::unblock_waiters(&state, self.me, RwLockType::Read);
drop(state);
thread::switch();
}
}
/// Guard for exclusive write access to an `RwLock`; dropping it releases the write hold.
pub struct RwLockWriteGuard<'a, T: ?Sized> {
/// The real std guard; set to `None` at the start of `drop` to release the inner lock.
inner: Option<std::sync::RwLockWriteGuard<'a, T>>,
/// Bookkeeping of the lock this guard belongs to.
state: Rc<RefCell<RwLockState>>,
/// The task that acquired the guard, as reported by `ExecutionState::me()` at creation.
me: TaskId,
}
impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
    type Target = T;

    /// Borrows the protected value through the inner std guard.
    ///
    /// Panics if the inner guard has already been released.
    fn deref(&self) -> &Self::Target {
        let guard = self.inner.as_ref().unwrap();
        &**guard
    }
}
impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
    /// Mutably borrows the protected value through the inner std guard.
    ///
    /// Panics if the inner guard has already been released.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let guard = self.inner.as_mut().unwrap();
        &mut **guard
    }
}
// `?Sized` matches the guard's other impls (`Deref`, `DerefMut`, `Display`, `Drop`), so guards
// over unsized values such as `RwLock<[u8]>` or `RwLock<dyn Trait>` can be debug-printed too.
impl<T: ?Sized + Debug> Debug for RwLockWriteGuard<'_, T> {
    /// Formats the guard by delegating to the inner std guard's `Debug` implementation.
    ///
    /// Panics if the inner guard has already been released.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(&self.inner.as_ref().unwrap(), f)
    }
}
impl<T: Display + ?Sized> Display for RwLockWriteGuard<'_, T> {
    /// Displays the protected value itself, using `T`'s `Display` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let value: &T = self;
        Display::fmt(value, f)
    }
}
// Releases this task's exclusive hold on the lock when the guard goes out of scope.
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
/// Releases the inner std guard, marks the lock as free, updates the lock's clock, wakes
/// waiters and yields to the scheduler.
///
/// Panics if the bookkeeping does not record this task as the writer.
fn drop(&mut self) {
// Release the real std guard first, before the bookkeeping is updated.
self.inner = None;
let mut state = self.state.borrow_mut();
trace!(
holder = ?state.holder,
waiting_readers = ?state.waiting_readers,
waiting_writers = ?state.waiting_writers,
"releasing Write lock on rwlock {:p}",
self.state
);
assert_eq!(state.holder, RwLockHolder::Write(self.me));
state.holder = RwLockHolder::None;
// Increment the clock via `ExecutionState` and fold the result into the lock's clock.
ExecutionState::with(|s| {
let clock = s.increment_clock();
state.clock.update(clock);
});
// NOTE(review): presumably the execution is being torn down here, so no waiter is woken
// and no context switch is attempted — confirm against `ExecutionState::should_stop`.
if ExecutionState::should_stop() {
return;
}
// Wake the waiters, then end the RefCell borrow before handing control to the scheduler.
RwLock::<T>::unblock_waiters(&state, self.me, RwLockType::Write);
drop(state);
thread::switch();
}
}