use crate::core::futex::{futex_wait, futex_wake_all};
use crate::sync::Backoff;
use crossbeam_utils::CachePadded;
use std::fmt;
use std::sync::atomic;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Integer type of the packed lock word stored in `InnerMutex::state`.
type State = usize;
/// Lock word with no holder and no parked-waiter flags.
const UNLOCKED: State = 0;
/// Bit 0: set by a reader before it blocks on the readers futex.
const READERS_PARKED: State = 1 << 0;
/// Bit 1: set by a writer before it blocks on the writers futex.
const WRITERS_PARKED: State = 1 << 1;
/// Increment representing one shared holder; the reader count occupies bit 2 and up.
const ONE_READER: State = 1 << 2;
/// Every bit above the two parked flags. All of them set encodes an exclusive
/// holder; the same value doubles as the mask of the reader-count field.
const ONE_WRITER: State = State::MAX ^ (READERS_PARKED | WRITERS_PARKED);
/// Returns `true` when every bit covered by `ONE_WRITER` is set in `state`,
/// which is how an exclusive (writer) lock is encoded in the lock word.
#[inline]
fn is_writer_locked(state: State) -> bool {
    let holder_bits = state & ONE_WRITER;
    holder_bits == ONE_WRITER
}
/// Extracts the number of shared holders encoded in `state`.
///
/// A writer-locked word reports zero readers; otherwise the count is the
/// holder field shifted down past the two parked-flag bits.
#[inline]
fn readers_count(state: State) -> usize {
    if is_writer_locked(state) {
        return 0;
    }
    let holder_bits = state & ONE_WRITER;
    holder_bits >> 2
}
/// Heap-allocated state shared by every `RawMutex` handle pointing at it.
///
/// Each field is wrapped in `CachePadded` so the words do not share a cache line.
struct InnerMutex {
/// Packed lock word: parked flags in the two low bits, reader count (or the
/// all-ones writer pattern) in the remaining bits.
state: CachePadded<AtomicUsize>,
/// Word that blocked readers wait on; `unlock_exclusive_slow` increments it
/// before waking readers.
readers_futex: CachePadded<AtomicUsize>,
/// Word that blocked writers wait on; incremented before writers are woken.
writers_futex: CachePadded<AtomicUsize>,
/// Number of live `RawMutex` handles sharing this allocation.
ref_count: CachePadded<AtomicUsize>,
}
impl InnerMutex {
    /// Creates an unlocked lock state owned by exactly one handle
    /// (`ref_count` starts at 1, every other word at 0).
    fn new() -> Self {
        let padded = |initial: usize| CachePadded::new(AtomicUsize::new(initial));
        Self {
            state: padded(0),
            readers_futex: padded(0),
            writers_futex: padded(0),
            ref_count: padded(1),
        }
    }
}
/// Reference-counted handle to a heap-allocated `InnerMutex`.
///
/// Cloning a handle bumps the shared reference count and yields another handle
/// to the same lock; the allocation is freed when the last handle is dropped.
#[repr(transparent)]
pub(crate) struct RawMutex {
/// Pointer obtained from `Box::into_raw` in `new`; released in `Drop`.
ptr: *const InnerMutex,
}
// SAFETY: the pointee consists solely of atomics, and the allocation is only
// freed by the handle that brings `ref_count` down to zero, so handles may be
// moved to and shared between threads.
unsafe impl Send for RawMutex {}
unsafe impl Sync for RawMutex {}
// The raw pointer field suppresses the automatic unwind-safety impls, so they
// are declared explicitly here.
impl std::panic::UnwindSafe for RawMutex {}
impl std::panic::RefUnwindSafe for RawMutex {}
impl RawMutex {
/// Allocates a fresh, unlocked `InnerMutex` on the heap and returns the first
/// handle to it.
pub(crate) fn new() -> Self {
    let boxed = Box::new(InnerMutex::new());
    Self {
        ptr: Box::into_raw(boxed),
    }
}
/// Borrows the shared lock state behind this handle.
#[inline(always)]
fn inner(&self) -> &InnerMutex {
// SAFETY: `ptr` comes from `Box::into_raw` in `new` (or is copied from a live
// handle in `clone`) and is only freed in `Drop` once the last handle is gone,
// so it is valid for as long as `self` is borrowed.
unsafe { &*self.ptr }
}
/// Returns the current number of handles sharing this lock (an `Acquire`
/// snapshot that may be stale by the time the caller reads it).
pub(crate) fn get_ref_count(&self) -> usize {
    let inner = self.inner();
    inner.ref_count.load(Ordering::Acquire)
}
/// Returns how many shared (reader) holders the lock word currently records;
/// zero while a writer holds the lock.
pub(crate) fn get_shared_locked(&self) -> usize {
    let snapshot = self.inner().state.load(Ordering::Acquire);
    readers_count(snapshot)
}
/// Returns `true` if the lock word currently encodes an exclusive holder.
#[inline]
pub(crate) fn is_locked_exclusive(&self) -> bool {
    let snapshot = self.inner().state.load(Ordering::Acquire);
    is_writer_locked(snapshot)
}
/// Returns `true` if at least one reader currently holds the lock.
#[inline]
pub(crate) fn is_locked_shared(&self) -> bool {
    let snapshot = self.inner().state.load(Ordering::Acquire);
    // `readers_count` already yields 0 for a writer-locked word, so a non-zero
    // count implies the lock is not held exclusively.
    readers_count(snapshot) > 0
}
pub(crate) fn is_locked(&self) -> bool {
self.inner().state.load(Ordering::Acquire) != UNLOCKED
}
/// Attempts to take the exclusive lock without blocking.
///
/// Succeeds only when the lock word is exactly `UNLOCKED`; any holder or any
/// parked flag makes the single compare-exchange fail and `false` is returned.
#[inline]
pub(crate) fn try_lock_exclusive(&self) -> bool {
    let attempt = self.inner().state.compare_exchange(
        UNLOCKED,
        ONE_WRITER,
        Ordering::Acquire,
        Ordering::Relaxed,
    );
    attempt.is_ok()
}
/// Acquires the exclusive lock, blocking until it is available.
///
/// Tries one uncontended `UNLOCKED -> ONE_WRITER` transition first and defers
/// to `lock_exclusive_slow` if that fails.
pub(crate) fn lock_exclusive(&self) {
    let fast_path = self.inner().state.compare_exchange(
        UNLOCKED,
        ONE_WRITER,
        Ordering::Acquire,
        Ordering::Relaxed,
    );
    if fast_path.is_ok() {
        return;
    }
    self.lock_exclusive_slow();
}
/// Releases the exclusive lock.
///
/// The fast path handles the case where no waiter has set a parked flag
/// (`ONE_WRITER -> UNLOCKED`); otherwise `unlock_exclusive_slow` takes over
/// and wakes waiters.
pub(crate) fn unlock_exclusive(&self) {
    let fast_path = self.inner().state.compare_exchange(
        ONE_WRITER,
        UNLOCKED,
        Ordering::Release,
        Ordering::Relaxed,
    );
    if fast_path.is_ok() {
        return;
    }
    self.unlock_exclusive_slow();
}
/// Contended path of `lock_exclusive`: spin with backoff, then park on the
/// writers futex until the lock can be taken.
///
/// Each round: (1) grab the lock if no reader or writer holds it, (2) spin
/// while the backoff budget lasts, (3) publish `WRITERS_PARKED`, (4) sleep on
/// `writers_futex` until an unlocker changes the picture, then start over.
#[cold]
fn lock_exclusive_slow(&self) {
    let inner = self.inner();
    let backoff = Backoff::new();
    let mut state = inner.state.load(Ordering::Relaxed);
    loop {
        // No holder bits set: try to become the writer. `READERS_PARKED` is
        // carried over so the eventual unlock still wakes sleeping readers;
        // `WRITERS_PARKED` is dropped and re-published by writers that still wait.
        while state & ONE_WRITER == 0 {
            let new_state = (state & READERS_PARKED) | ONE_WRITER;
            match inner.state.compare_exchange_weak(
                state,
                new_state,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(e) => state = e,
            }
        }
        if state & WRITERS_PARKED == 0 {
            if !backoff.is_completed() {
                backoff.snooze();
                // Refresh the snapshot after every snooze; otherwise the spin
                // phase keeps testing the same stale value and can never
                // notice that the lock was released in the meantime.
                state = inner.state.load(Ordering::Relaxed);
                continue;
            }
            // Spin budget exhausted: announce that a writer is about to sleep.
            if let Err(e) = inner.state.compare_exchange_weak(
                state,
                state | WRITERS_PARKED,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                state = e;
                continue;
            }
        }
        loop {
            // Sample the futex word before the state so that an unlock landing
            // between the two loads changes the word we are about to wait on.
            let w_key = inner.writers_futex.load(Ordering::Acquire);
            let state = inner.state.load(Ordering::Acquire);
            // Stop waiting when the lock is free, or when our parked flag was
            // cleared (it must be re-published before sleeping again).
            if (state & ONE_WRITER == 0) || (state & WRITERS_PARKED == 0) {
                break;
            }
            // NOTE(review): presumably `futex_wait` returns immediately when the
            // word no longer equals `w_key` -- confirm against crate::core::futex.
            futex_wait(&inner.writers_futex, w_key);
        }
        backoff.reset();
        state = inner.state.load(Ordering::Relaxed);
    }
}
/// Contended path of `unlock_exclusive`: releases the writer lock and wakes
/// parked waiters.
///
/// If writers are parked, both parked flags are kept in the lock word and the
/// writers are woken; otherwise the word becomes `UNLOCKED` and parked readers
/// (if any) are woken.
///
/// The transition is a compare-exchange loop so that the stored value is
/// always derived from the value it replaces. Computing it from an earlier
/// load and storing it unconditionally could erase a parked flag that a waiter
/// set in between, leaving that waiter asleep with nobody obliged to wake it.
#[inline]
fn unlock_exclusive_slow(&self) {
    let inner = self.inner();
    let mut state = inner.state.load(Ordering::Relaxed);
    loop {
        let parked = state & (READERS_PARKED | WRITERS_PARKED);
        let next_state = if parked & WRITERS_PARKED != 0 {
            parked
        } else {
            UNLOCKED
        };
        match inner.state.compare_exchange_weak(
            state,
            next_state,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => break,
            Err(observed) => state = observed,
        }
    }
    // `state` is now exactly the word that was replaced.
    if state & WRITERS_PARKED != 0 {
        // Bump the futex word before waking so a writer that sampled it but has
        // not gone to sleep yet sees a changed value.
        inner.writers_futex.fetch_add(1, Ordering::Release);
        futex_wake_all(&*inner.writers_futex);
    } else if state & READERS_PARKED != 0 {
        inner.readers_futex.fetch_add(1, Ordering::Release);
        futex_wake_all(&*inner.readers_futex);
    }
}
/// Attempts to take a shared lock without blocking: one optimistic attempt,
/// then a retry loop if that attempt did not succeed.
#[inline]
pub(crate) fn try_lock_shared(&self) -> bool {
    if self.try_lock_shared_fast() {
        return true;
    }
    self.try_lock_shared_slow()
}
#[inline(always)]
fn try_lock_shared_fast(&self) -> bool {
let inner = self.inner();
let state = inner.state.load(Ordering::Relaxed);
if let Some(new_state) = state.checked_add(ONE_READER)
&& new_state & ONE_WRITER != ONE_WRITER
&& state & WRITERS_PARKED == 0
{
return inner
.state
.compare_exchange(state, new_state, Ordering::Acquire, Ordering::Relaxed)
.is_ok();
}
false
}
/// Retry loop behind `try_lock_shared`: keeps attempting to add a reader until
/// it succeeds or the lock word shows that a reader cannot be admitted
/// (writer-locked, reader count saturated, or a writer parked).
#[cold]
fn try_lock_shared_slow(&self) -> bool {
    let inner = self.inner();
    let mut current = inner.state.load(Ordering::Relaxed);
    loop {
        let Some(with_reader) = current.checked_add(ONE_READER) else {
            return false;
        };
        if with_reader & ONE_WRITER == ONE_WRITER || current & WRITERS_PARKED != 0 {
            return false;
        }
        match inner.state.compare_exchange_weak(
            current,
            with_reader,
            Ordering::Acquire,
            Ordering::Relaxed,
        ) {
            Ok(_) => return true,
            // Lost a race (or failed spuriously): retry against the observed value.
            Err(observed) => current = observed,
        }
    }
}
/// Acquires a shared lock, blocking in `lock_shared_slow` if the optimistic
/// attempt does not succeed.
pub(crate) fn lock_shared(&self) {
    if self.try_lock_shared_fast() {
        return;
    }
    self.lock_shared_slow();
}
/// Releases one shared lock and, if this was the last reader while a writer is
/// parked, bumps the writers futex word and wakes all parked writers.
pub(crate) fn unlock_shared(&self) {
    let inner = self.inner();
    let before = inner.state.fetch_sub(ONE_READER, Ordering::Release);
    let was_last_reader = readers_count(before) == 1;
    let writer_waiting = before & WRITERS_PARKED != 0;
    if was_last_reader && writer_waiting {
        inner.writers_futex.fetch_add(1, Ordering::Release);
        futex_wake_all(&*inner.writers_futex);
    }
}
/// Contended path of `lock_shared`: spin with backoff, then park on the
/// readers futex until a reader can be admitted.
///
/// Each round: (1) try to add a reader unless a writer holds the lock or is
/// parked, (2) spin while the backoff budget lasts, (3) publish
/// `READERS_PARKED`, (4) sleep on `readers_futex` and start over when woken.
#[cold]
fn lock_shared_slow(&self) {
    let inner = self.inner();
    let backoff = Backoff::new();
    loop {
        let mut state = inner.state.load(Ordering::Relaxed);
        // `checked_add` overflows (yielding `None`) when the word is writer-locked.
        while let Some(new_state) = state.checked_add(ONE_READER) {
            if state & WRITERS_PARKED != 0 || new_state & ONE_WRITER == ONE_WRITER {
                break;
            }
            if inner
                .state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                return;
            }
            state = inner.state.load(Ordering::Relaxed);
        }
        if state & READERS_PARKED == 0 {
            if !backoff.is_completed() {
                backoff.snooze();
                continue;
            }
            // Spin budget exhausted: announce that a reader is about to sleep.
            if inner
                .state
                .compare_exchange_weak(
                    state,
                    state | READERS_PARKED,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                )
                .is_err()
            {
                continue;
            }
        }
        // Sample the futex word before re-reading the state so that an unlock
        // landing between the two loads changes the word we are about to wait on.
        let r_key = inner.readers_futex.load(Ordering::Acquire);
        let state = inner.state.load(Ordering::Acquire);
        let acquirable = state & ONE_WRITER != ONE_WRITER && state & WRITERS_PARKED == 0;
        // Also retry when `READERS_PARKED` is no longer set: an unlock has
        // already consumed the flag, and the current holder may have taken the
        // lock through a fast path that will not wake readers on release.
        // Sleeping without the flag published could therefore miss every wake-up.
        let parked_flag_lost = state & READERS_PARKED == 0;
        if acquirable || parked_flag_lost {
            backoff.reset();
            continue;
        }
        // NOTE(review): presumably `futex_wait` returns immediately when the word
        // no longer equals `r_key` -- confirm against crate::core::futex.
        futex_wait(&inner.readers_futex, r_key);
        backoff.reset();
    }
}
/// Releases every shared lock currently recorded in the lock word in a single
/// atomic step, preserving the parked flags, and wakes parked writers if the
/// replaced word had `WRITERS_PARKED` set. Does nothing when no reader holds
/// the lock.
pub(crate) fn unlock_all_shared(&self) {
    let inner = self.inner();
    loop {
        let current = inner.state.load(Ordering::Acquire);
        let holders = readers_count(current);
        if holders == 0 {
            return;
        }
        // Subtract the whole reader field at once.
        let cleared = current.wrapping_sub(ONE_READER.wrapping_mul(holders));
        debug_assert!(
            !is_writer_locked(cleared),
            "unlock_all_shared resulted in writer lock set"
        );
        debug_assert!(
            readers_count(cleared) == 0,
            "unlock_all_shared didn't remove all readers"
        );
        let swapped = inner
            .state
            .compare_exchange(current, cleared, Ordering::Release, Ordering::Relaxed)
            .is_ok();
        if !swapped {
            // The word changed underneath us; recompute from a fresh snapshot.
            continue;
        }
        if readers_count(cleared) == 0 && (current & WRITERS_PARKED != 0) {
            inner.writers_futex.fetch_add(1, Ordering::Release);
            futex_wake_all(&*inner.writers_futex);
        }
        return;
    }
}
/// Converts an exclusive lock held by the caller into a single shared lock
/// without releasing it in between.
///
/// The holder bits are replaced by one reader while both parked flags are
/// preserved. If readers are parked they are woken so they can join the new
/// shared phase. The caller is expected to hold the exclusive lock; the word
/// is not checked for that.
pub(crate) fn downgrade(&self) {
    let inner = self.inner();
    let mut state = inner.state.load(Ordering::Relaxed);
    loop {
        // Keep only the parked flags, then add exactly one reader.
        let new_state = (state & !ONE_WRITER) + ONE_READER;
        match inner.state.compare_exchange(
            state,
            new_state,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => {
                if state & READERS_PARKED != 0 {
                    // Bump the futex word before waking, as the other wake sites
                    // do. A reader that has sampled the word but not yet gone to
                    // sleep would otherwise miss this wake-up.
                    inner.readers_futex.fetch_add(1, Ordering::Release);
                    futex_wake_all(&*inner.readers_futex);
                }
                break;
            }
            Err(s) => state = s,
        }
    }
}
}
impl Clone for RawMutex {
fn clone(&self) -> Self {
self.inner().ref_count.fetch_add(1, Ordering::Relaxed);
RawMutex { ptr: self.ptr }
}
}
impl Drop for RawMutex {
    /// Decrements the shared reference count and frees the `InnerMutex`
    /// allocation when this was the last handle.
    fn drop(&mut self) {
        let previous = self.inner().ref_count.fetch_sub(1, Ordering::Release);
        if previous != 1 {
            // Other handles still reference the allocation.
            return;
        }
        // Pairs with the `Release` decrements of the other handles so their
        // accesses happen-before the deallocation below.
        atomic::fence(Ordering::Acquire);
        // SAFETY: the pointer came from `Box::into_raw` in `new`, and the count
        // reaching zero means no other handle can observe it any more.
        unsafe { drop(Box::from_raw(self.ptr as *mut InnerMutex)) };
    }
}
impl Default for RawMutex {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for RawMutex {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.inner().state.load(Ordering::Acquire);
let readers = readers_count(state);
f.debug_struct("Mutex")
.field("state", &format!("{:b}", state))
.field("exclusive_locked", &self.is_locked_exclusive())
.field("readers_count", &readers)
.field("readers_parked", &(state & READERS_PARKED != 0))
.field("writers_parked", &(state & WRITERS_PARKED != 0))
.field("ref_count", &self.inner().ref_count.load(Ordering::Acquire))
.finish()
}
}