use core::borrow::Borrow;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::Poll;
use core::usize;
use alloc::sync::Arc;
#[cfg(all(feature = "std", not(target_family = "wasm")))]
use std::time::{Duration, Instant};
use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture};
pub struct Mutex<T: ?Sized> {
state: AtomicUsize,
lock_ops: Event,
data: UnsafeCell<T>,
}
unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
impl<T> Mutex<T> {
pub const fn new(data: T) -> Mutex<T> {
Mutex {
state: AtomicUsize::new(0),
lock_ops: Event::new(),
data: UnsafeCell::new(data),
}
}
pub fn into_inner(self) -> T {
self.data.into_inner()
}
}
impl<T: ?Sized> Mutex<T> {
#[inline]
pub fn lock(&self) -> Lock<'_, T> {
Lock::_new(LockInner {
mutex: self,
acquire_slow: None,
})
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn lock_blocking(&self) -> MutexGuard<'_, T> {
self.lock().wait()
}
#[inline]
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
.is_ok()
{
Some(MutexGuard(self))
} else {
None
}
}
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.data.get() }
}
pub(crate) unsafe fn unlock_unchecked(&self) {
self.state.fetch_sub(1, Ordering::Release);
self.lock_ops.notify(1);
}
}
impl<T: ?Sized> Mutex<T> {
#[inline]
pub fn lock_arc(self: &Arc<Self>) -> LockArc<T> {
LockArc::_new(LockArcInnards::Unpolled {
mutex: Some(self.clone()),
})
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn lock_arc_blocking(self: &Arc<Self>) -> MutexGuardArc<T> {
self.lock_arc().wait()
}
#[inline]
pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
if self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
.is_ok()
{
Some(MutexGuardArc(self.clone()))
} else {
None
}
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
struct Locked;
impl fmt::Debug for Locked {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<locked>")
}
}
match self.try_lock() {
None => f.debug_struct("Mutex").field("data", &Locked).finish(),
Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
}
}
}
impl<T> From<T> for Mutex<T> {
fn from(val: T) -> Mutex<T> {
Mutex::new(val)
}
}
impl<T: Default + ?Sized> Default for Mutex<T> {
fn default() -> Mutex<T> {
Mutex::new(Default::default())
}
}
easy_wrapper! {
pub struct Lock<'a, T: ?Sized>(LockInner<'a, T> => MutexGuard<'a, T>);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}
pin_project_lite::pin_project! {
struct LockInner<'a, T: ?Sized> {
mutex: &'a Mutex<T>,
#[pin]
acquire_slow: Option<AcquireSlow<&'a Mutex<T>, T>>,
}
}
unsafe impl<T: Send + ?Sized> Send for Lock<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for Lock<'_, T> {}
impl<T: ?Sized> fmt::Debug for Lock<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Lock { .. }")
}
}
impl<'a, T: ?Sized> EventListenerFuture for LockInner<'a, T> {
type Output = MutexGuard<'a, T>;
#[inline]
fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = self.project();
if this.acquire_slow.is_none() {
match this.mutex.try_lock() {
Some(guard) => return Poll::Ready(guard),
None => {
this.acquire_slow.set(Some(AcquireSlow::new(this.mutex)));
}
}
}
ready!(this
.acquire_slow
.as_pin_mut()
.unwrap()
.poll_with_strategy(strategy, context));
Poll::Ready(MutexGuard(this.mutex))
}
}
easy_wrapper! {
pub struct LockArc<T: ?Sized>(LockArcInnards<T> => MutexGuardArc<T>);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}
pin_project_lite::pin_project! {
#[project = LockArcInnardsProj]
enum LockArcInnards<T: ?Sized> {
Unpolled { mutex: Option<Arc<Mutex<T>>> },
AcquireSlow {
#[pin]
inner: AcquireSlow<Arc<Mutex<T>>, T>
},
}
}
unsafe impl<T: Send + ?Sized> Send for LockArc<T> {}
unsafe impl<T: Sync + ?Sized> Sync for LockArc<T> {}
impl<T: ?Sized> fmt::Debug for LockArcInnards<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("LockArc { .. }")
}
}
impl<T: ?Sized> EventListenerFuture for LockArcInnards<T> {
type Output = MutexGuardArc<T>;
fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>(
mut self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
if let LockArcInnardsProj::Unpolled { mutex } = self.as_mut().project() {
let mutex = mutex.take().expect("mutex taken more than once");
if let Some(guard) = mutex.try_lock_arc() {
return Poll::Ready(guard);
}
self.as_mut().set(LockArcInnards::AcquireSlow {
inner: AcquireSlow::new(mutex),
});
}
let value = match self.project() {
LockArcInnardsProj::AcquireSlow { inner } => {
ready!(inner.poll_with_strategy(strategy, context))
}
_ => unreachable!(),
};
Poll::Ready(MutexGuardArc(value))
}
}
pin_project_lite::pin_project! {
struct AcquireSlow<B: Borrow<Mutex<T>>, T: ?Sized> {
mutex: Option<B>,
#[pin]
listener: EventListener,
start: Start,
starved: bool,
#[pin]
_marker: PhantomData<T>,
}
impl<T: ?Sized, B: Borrow<Mutex<T>>> PinnedDrop for AcquireSlow<B, T> {
fn drop(this: Pin<&mut Self>) {
this.take_mutex();
}
}
}
struct Start {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
start: Option<Instant>,
}
impl<T: ?Sized, B: Borrow<Mutex<T>>> AcquireSlow<B, T> {
#[cold]
fn new(mutex: B) -> Self {
let listener = { EventListener::new() };
AcquireSlow {
mutex: Some(mutex),
listener,
start: Start {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
start: None,
},
starved: false,
_marker: PhantomData,
}
}
fn take_mutex(self: Pin<&mut Self>) -> Option<B> {
let this = self.project();
let mutex = this.mutex.take();
if *this.starved {
if let Some(mutex) = mutex.as_ref() {
mutex.borrow().state.fetch_sub(2, Ordering::Release);
}
}
mutex
}
}
impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow<B, T> {
type Output = B;
#[cold]
fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>(
mut self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
#[cfg(all(feature = "std", not(target_family = "wasm")))]
let start = *this.start.start.get_or_insert_with(Instant::now);
let mutex = Borrow::<Mutex<T>>::borrow(
this.mutex.as_ref().expect("future polled after completion"),
);
if !*this.starved {
loop {
if !this.listener.is_listening() {
this.listener.as_mut().listen(&mutex.lock_ops);
match mutex
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
0 => return Poll::Ready(self.take_mutex().unwrap()),
1 => {}
_ => break,
}
} else {
ready!(strategy.poll(this.listener.as_mut(), context));
match mutex
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
0 => return Poll::Ready(self.take_mutex().unwrap()),
1 => {}
_ => {
mutex.lock_ops.notify(1);
break;
}
}
#[cfg(all(feature = "std", not(target_family = "wasm")))]
if start.elapsed() > Duration::from_micros(500) {
break;
}
}
}
if mutex.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
crate::abort();
}
*this.starved = true;
}
loop {
if !this.listener.is_listening() {
this.listener.as_mut().listen(&mutex.lock_ops);
match mutex
.state
.compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
2 => return Poll::Ready(self.take_mutex().unwrap()),
s if s % 2 == 1 => {}
_ => {
mutex.lock_ops.notify(1);
}
}
} else {
ready!(strategy.poll(this.listener.as_mut(), context));
if mutex.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
return Poll::Ready(self.take_mutex().unwrap());
}
}
}
}
}
#[clippy::has_significant_drop]
pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
impl<'a, T: ?Sized> MutexGuard<'a, T> {
pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
guard.0
}
}
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
#[inline]
fn drop(&mut self) {
unsafe {
self.0.unlock_unchecked();
}
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
}
}
#[clippy::has_significant_drop]
pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);
unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
impl<T: ?Sized> MutexGuardArc<T> {
pub fn source(guard: &Self) -> &Arc<Mutex<T>>
where
T: Send,
{
&guard.0
}
}
impl<T: ?Sized> Drop for MutexGuardArc<T> {
#[inline]
fn drop(&mut self) {
unsafe {
self.0.unlock_unchecked();
}
}
}
impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}
impl<T: ?Sized> Deref for MutexGuardArc<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
}
}
struct CallOnDrop<F: Fn()>(F);
impl<F: Fn()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}