use super::wait_wake::{futex_wait, futex_wake, futex_wake_all};
use core::sync::atomic::{
AtomicU32,
Ordering::{Acquire, Relaxed, Release},
};
pub type MovableRwLock = RwLock;
pub struct RwLock {
state: AtomicU32,
writer_notify: AtomicU32,
}
const READ_LOCKED: u32 = 1;
const MASK: u32 = (1 << 30) - 1;
const WRITE_LOCKED: u32 = MASK;
const MAX_READERS: u32 = MASK - 1;
const READERS_WAITING: u32 = 1 << 30;
const WRITERS_WAITING: u32 = 1 << 31;
fn is_unlocked(state: u32) -> bool {
state & MASK == 0
}
fn is_write_locked(state: u32) -> bool {
state & MASK == WRITE_LOCKED
}
fn has_readers_waiting(state: u32) -> bool {
state & READERS_WAITING != 0
}
fn has_writers_waiting(state: u32) -> bool {
state & WRITERS_WAITING != 0
}
fn is_read_lockable(state: u32) -> bool {
state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
}
fn has_reached_max_readers(state: u32) -> bool {
state & MASK == MAX_READERS
}
impl RwLock {
#[inline]
pub const fn new() -> Self {
Self {
state: AtomicU32::new(0),
writer_notify: AtomicU32::new(0),
}
}
#[inline]
pub unsafe fn try_read(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| {
is_read_lockable(s).then(|| s + READ_LOCKED)
})
.is_ok()
}
#[inline]
pub unsafe fn read(&self) {
let state = self.state.load(Relaxed);
if !is_read_lockable(state)
|| self
.state
.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
.is_err()
{
self.read_contended();
}
}
#[inline]
pub unsafe fn read_unlock(&self) {
let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
if is_unlocked(state) && has_writers_waiting(state) {
self.wake_writer_or_readers(state);
}
}
#[cold]
fn read_contended(&self) {
let mut state = self.spin_read();
loop {
if is_read_lockable(state) {
match self
.state
.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
{
Ok(_) => return, Err(s) => {
state = s;
continue;
}
}
}
if has_reached_max_readers(state) {
panic!("too many active read locks on RwLock");
}
if !has_readers_waiting(state) {
if let Err(s) =
self.state
.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}
futex_wait(&self.state, state | READERS_WAITING, None);
state = self.spin_read();
}
}
#[inline]
pub unsafe fn try_write(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| {
is_unlocked(s).then(|| s + WRITE_LOCKED)
})
.is_ok()
}
#[inline]
pub unsafe fn write(&self) {
if self
.state
.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed)
.is_err()
{
self.write_contended();
}
}
#[inline]
pub unsafe fn write_unlock(&self) {
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
debug_assert!(is_unlocked(state));
if has_writers_waiting(state) || has_readers_waiting(state) {
self.wake_writer_or_readers(state);
}
}
#[cold]
fn write_contended(&self) {
let mut state = self.spin_write();
let mut other_writers_waiting = 0;
loop {
if is_unlocked(state) {
match self.state.compare_exchange_weak(
state,
state | WRITE_LOCKED | other_writers_waiting,
Acquire,
Relaxed,
) {
Ok(_) => return, Err(s) => {
state = s;
continue;
}
}
}
if !has_writers_waiting(state) {
if let Err(s) =
self.state
.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}
other_writers_waiting = WRITERS_WAITING;
let seq = self.writer_notify.load(Acquire);
let s = self.state.load(Relaxed);
if is_unlocked(state) || !has_writers_waiting(s) {
state = s;
continue;
}
futex_wait(&self.writer_notify, seq, None);
state = self.spin_write();
}
}
#[cold]
fn wake_writer_or_readers(&self, mut state: u32) {
assert!(is_unlocked(state));
if state == WRITERS_WAITING {
match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
Ok(_) => {
self.wake_writer();
return;
}
Err(s) => {
state = s;
}
}
}
if state == READERS_WAITING + WRITERS_WAITING {
if self
.state
.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed)
.is_err()
{
return;
}
if self.wake_writer() {
return;
}
state = READERS_WAITING;
}
if state == READERS_WAITING {
if self
.state
.compare_exchange(state, 0, Relaxed, Relaxed)
.is_ok()
{
futex_wake_all(&self.state);
}
}
}
fn wake_writer(&self) -> bool {
self.writer_notify.fetch_add(1, Release);
futex_wake(&self.writer_notify)
}
fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
let mut spin = 100; loop {
let state = self.state.load(Relaxed);
if f(state) || spin == 0 {
return state;
}
core::hint::spin_loop();
spin -= 1;
}
}
fn spin_write(&self) -> u32 {
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}
fn spin_read(&self) -> u32 {
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
})
}
}