use std::{
cell::UnsafeCell,
hint::spin_loop,
marker::PhantomData,
ops::{Deref, DerefMut},
ptr::addr_of,
sync::atomic::{AtomicU32, Ordering},
};
use atomic_wait::wake_one;
use crate::poison::{self, LockResult, TryLockError, TryLockResult};
// Values stored in `Mutex::futex`.
/// The mutex is free; `lock`/`try_lock` compare-exchange from this value.
const UNLOCKED: u32 = 0;
/// Written by the uncontended fast path (`lock`/`try_lock`) when it acquires the mutex.
const LOCKED: u32 = 1;
/// Written by a contender that last observed `UNLOCKED` or `LOCKED`.
/// A guard dropped while the futex holds this value issues one `wake_one`.
const CONTENDED: u32 = 2;
/// Written by a contender that last observed `CONTENDED`.
/// A guard dropped while the futex holds this value issues two `wake_one` calls.
const EXTRA_CONTENDED: u32 = 3;
/// A mutual-exclusion lock protecting a value of type `T`.
///
/// Blocking is done through `atomic_wait` on a single `AtomicU32` state word,
/// and a poison flag records the outcome reported by `crate::poison`.
pub struct Mutex<T: ?Sized> {
/// Lock state word: one of `UNLOCKED`, `LOCKED`, `CONTENDED`, `EXTRA_CONTENDED`.
futex: AtomicU32,
/// Counter bumped (relaxed) when `lock` acquires the mutex; `spin` reads it
/// to notice that the lock changed hands while it was spinning.
lock_epoch: AtomicU32,
/// Poison state consulted when a guard is created and updated when it is dropped.
poison: poison::Flag,
/// The protected value; only reached through a guard or `get_mut`.
data: UnsafeCell<T>,
}
impl<T: ?Sized> Mutex<T> {
#[inline]
pub fn lock(&self) -> LockResult<MutexGuard<T>> {
if self
.futex
.compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
self.lock_epoch.fetch_add(1, Ordering::Relaxed);
return MutexGuard::new(self);
}
self.lock_contended()
}
#[cold]
#[allow(clippy::comparison_chain)] fn lock_contended(&self) -> LockResult<MutexGuard<T>> {
loop {
let state = self.spin();
let expect = if state < CONTENDED {
if self.futex.swap(CONTENDED, Ordering::Acquire) == UNLOCKED {
self.lock_epoch.fetch_add(1, Ordering::Relaxed);
return MutexGuard::new(self);
}
CONTENDED
} else if state == CONTENDED {
if self.futex.swap(EXTRA_CONTENDED, Ordering::Acquire) == UNLOCKED {
self.lock_epoch.fetch_add(1, Ordering::Relaxed);
return MutexGuard::new(self);
}
EXTRA_CONTENDED
} else {
EXTRA_CONTENDED
};
atomic_wait::wait(&self.futex, expect);
}
}
#[cold]
fn spin(&self) -> u32 {
let mut spin = 400;
let mut epoch = 0;
loop {
let v = self.futex.load(Ordering::Relaxed);
if v != LOCKED || spin == 0 {
break v;
}
let now = self.lock_epoch.load(Ordering::Relaxed);
if now != epoch {
epoch = now;
spin = 400;
std::thread::yield_now();
}
spin_loop();
spin -= 1;
}
}
#[inline]
pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
match self
.futex
.compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
{
Ok(_) => Ok(MutexGuard::new(self)?),
Err(_) => Err(TryLockError::WouldBlock),
}
}
pub fn get_mut(&mut self) -> LockResult<&mut T> {
let data = self.data.get_mut();
poison::map_result(self.poison.borrow(), |()| data)
}
}
impl<T> Mutex<T> {
#[inline]
pub const fn new(data: T) -> Self {
Self {
data: UnsafeCell::new(data),
lock_epoch: AtomicU32::new(0),
poison: poison::Flag::new(),
futex: AtomicU32::new(UNLOCKED),
}
}
}
impl<T> From<T> for Mutex<T> {
fn from(t: T) -> Self {
Mutex::new(t)
}
}
impl<T: ?Sized + Default> Default for Mutex<T> {
    /// Creates a mutex protecting `T::default()`.
    fn default() -> Self {
        let initial = T::default();
        Mutex::new(initial)
    }
}
impl<T: ?Sized + std::fmt::Debug> std::fmt::Debug for Mutex<T> {
    /// Formats the mutex as `Mutex { data: .., poisoned: .., .. }`.
    ///
    /// The data is shown only if the lock can be taken without blocking
    /// (including when the result is a poison error); otherwise the
    /// placeholder `<locked>` is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Mutex");
        match self.try_lock() {
            Err(TryLockError::WouldBlock) => {
                builder.field("data", &format_args!("<locked>"));
            }
            Err(TryLockError::Poisoned(poisoned)) => {
                // The poison error still carries the guard; look through it.
                builder.field("data", &&**poisoned.get_ref());
            }
            Ok(held) => {
                builder.field("data", &&*held);
            }
        }
        // Read the flag only after the temporary guard (if any) is gone.
        let is_poisoned = self.poison.get();
        builder
            .field("poisoned", &is_poisoned)
            .finish_non_exhaustive()
    }
}
// SAFETY (review note): the mutex owns its `T`, so moving the mutex to another
// thread moves the value with it; `T: Send` is required for that.
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
// SAFETY (review note): a shared `&Mutex<T>` lets any thread lock it and obtain
// `&mut T` through the guard, which effectively sends access to `T` across
// threads; hence `T: Send` (and not `T: Sync`) is the bound.
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
// SAFETY (review note): `&MutexGuard<T>` only exposes `&T` via `Deref`, so
// sharing the guard between threads is sound when `T: Sync`.
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
/// Guard representing a held [`Mutex`].
///
/// It dereferences to the protected value and, when dropped, updates the
/// poison flag, resets the futex word to `UNLOCKED` and wakes waiters if the
/// word indicated contention.
#[must_use = "if unused the Mutex will immediately unlock"]
#[clippy::has_significant_drop]
pub struct MutexGuard<'a, T: ?Sized + 'a> {
/// The mutex this guard keeps locked.
lock: &'a Mutex<T>,
/// Poison bookkeeping captured at acquisition, handed back to the flag on drop.
poison: poison::Guard,
/// Marker that opts the guard out of `Send` (see `PhantomUnsend`).
_phantom: PhantomUnsend,
}
impl<'a, T: ?Sized> MutexGuard<'a, T> {
fn new(lock: &'a Mutex<T>) -> LockResult<Self> {
poison::map_result(lock.poison.guard(), |guard| Self {
lock,
poison: guard,
_phantom: PhantomData,
})
}
}
/// Zero-sized marker used to make a type `!Send`.
///
/// `std::sync::MutexGuard` is not `Send`, so a `PhantomData` of it removes the
/// auto-implemented `Send` from any struct that contains this marker.
pub type PhantomUnsend = PhantomData<std::sync::MutexGuard<'static, ()>>;
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
    type Target = T;

    /// Shared access to the value protected by the locked mutex.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let data: *mut T = self.lock.data.get();
        // SAFETY: a guard exists only while its mutex is held, so no other
        // guard can hand out a conflicting `&mut T`; the pointer comes from
        // the mutex's own `UnsafeCell` and is valid for the guard's lifetime.
        unsafe { &*data }
    }
}
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
    /// Exclusive access to the value protected by the locked mutex.
    #[inline]
    fn deref_mut(&mut self) -> &mut T {
        let data: *mut T = self.lock.data.get();
        // SAFETY: the guard proves the mutex is held, and `&mut self` proves
        // this is the only live borrow through the guard, so creating a unique
        // reference into the `UnsafeCell` cannot alias.
        unsafe { &mut *data }
    }
}
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
    /// Releases the mutex and wakes parked threads if the futex word showed
    /// contention.
    #[inline]
    fn drop(&mut self) {
        let mutex = self.lock;
        // Poison bookkeeping must happen while the lock is still held.
        mutex.poison.done(&self.poison);

        let futex = addr_of!(mutex.futex);
        // Release the lock; the previous value tells us how many waiters to wake.
        match mutex.futex.swap(UNLOCKED, Ordering::Release) {
            CONTENDED => wake_one(futex),
            EXTRA_CONTENDED => {
                wake_one(futex);
                wake_one(futex);
            }
            // `LOCKED` (or anything else): nobody advertised that they are waiting.
            _ => {}
        }
    }
}
impl<T: ?Sized + std::fmt::Debug> std::fmt::Debug for MutexGuard<'_, T> {
    /// Formats the guarded value itself, exactly as `T`'s own `Debug` would.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &T = self;
        std::fmt::Debug::fmt(inner, f)
    }
}
impl<T: ?Sized + std::fmt::Display> std::fmt::Display for MutexGuard<'_, T> {
    /// Formats the guarded value itself, exactly as `T`'s own `Display` would.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &T = self;
        std::fmt::Display::fmt(inner, f)
    }
}
#[cfg(test)]
mod test {
    use crate::Mutex;
    use std::sync::Arc;

    /// A thread that panics while holding the guard must leave the mutex
    /// poisoned, so a later `lock()` reports an error.
    #[test]
    fn poisoned() {
        let shared = Arc::new(Mutex::new(()));

        let worker = {
            let shared = Arc::clone(&shared);
            std::thread::spawn(move || {
                let _g = shared.lock().expect("lock must succeed");
                panic!("bail while locked");
            })
        };
        // The worker is expected to panic, so the join result is ignored.
        let _ = worker.join();

        if let Ok(_guard) = shared.lock() {
            panic!("must not lock");
        }
    }
}