use crate::as_mut;
use crate::alloc::MemPool;
use crate::cell::VCell;
use crate::ptr::Ptr;
use crate::stm::{Journal, Log, Notifier, Logger};
use crate::{PSafe, RootObj, TxInSafe, TxOutSafe};
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::{TryLockError, TryLockResult};
#[allow(unused_imports)]
use std::{fmt, intrinsics};
/// A mutual-exclusion primitive whose protected value is reached through a
/// [`MutexGuard`] obtained with `lock` or `try_lock`, both of which take a
/// `Journal<A>`.
///
/// `A` is the memory pool type the mutex is associated with.
pub struct Mutex<T, A: MemPool> {
/// Zero-sized marker that ties this mutex to pool type `A`.
heap: PhantomData<A>,
/// Lock word and guard bookkeeping (see `MutexInner`).
// NOTE(review): presumably `VCell` keeps this state volatile, i.e. it is
// not meant to survive a restart — confirm against `crate::cell::VCell`.
inner: VCell<MutexInner, A>,
/// The protected value (`.1`) preceded by a one-byte flag (`.0`).
/// `get_mut` only calls `take_log` on the value while the flag is `0`, and
/// hands the flag's address to the log through `Notifier::NonAtomic`.
data: UnsafeCell<(u8, T)>,
}
/// Bookkeeping shared by all users of one `Mutex`: the raw lock and a flag
/// recording whether a `MutexGuard` currently exists.
struct MutexInner {
/// `true` from a successful `acquire()` until the matching `release()`,
/// which `MutexGuard` calls from its `Drop` impl.
borrowed: bool,
/// pthread build: `.1` is the raw mutex passed to the `pthread_mutex_*`
/// calls and `.2` is the attribute object used while initialising it.
// NOTE(review): `.0` is never read in this file; the address of the whole
// tuple is what gets passed to `Log::unlock_on_commit` — confirm its meaning
// there.
#[cfg(feature = "pthread")]
lock: (bool, libc::pthread_mutex_t, libc::pthread_mutexattr_t),
/// Non-pthread build: `.1` is the lock word, `0` when free and otherwise the
/// id of the owning thread.
// NOTE(review): `.0` is never read in this file — see the note above.
#[cfg(not(feature = "pthread"))]
lock: (bool, u64)
}
impl Default for MutexInner {
#[cfg(feature = "pthread")]
fn default() -> Self {
use std::mem::MaybeUninit;
let mut attr = MaybeUninit::<libc::pthread_mutexattr_t>::uninit();
let mut lock = libc::PTHREAD_MUTEX_INITIALIZER;
unsafe { init_lock(&mut lock, attr.as_mut_ptr()); }
MutexInner { borrowed: false, lock: (false, lock, unsafe { attr.assume_init() }) }
}
#[cfg(not(feature = "pthread"))]
fn default() -> Self {
MutexInner { borrowed: false, lock: (false, 0) }
}
}
impl MutexInner {
    /// Claims the single guard slot.
    ///
    /// Returns `true` and marks the state as borrowed when no guard is
    /// outstanding; returns `false` and changes nothing otherwise.
    fn acquire(&self) -> bool {
        let available = !self.borrowed;
        if available {
            // Mutation through `&self` goes via the crate's `as_mut` helper.
            as_mut(self).borrowed = true;
        }
        available
    }

    /// Gives the guard slot back so that a later `acquire` can succeed.
    fn release(&self) {
        let state = as_mut(self);
        state.borrowed = false;
    }
}
// Negative impl: a `Mutex` never implements `TxOutSafe`.
// NOTE(review): presumably this keeps a `Mutex` from being returned out of a
// transaction — confirm against the definition of `TxOutSafe`.
impl<T: ?Sized, A: MemPool> !TxOutSafe for Mutex<T, A> {}
// Unwind safety is asserted for every `T`, with no bound on `T` itself.
impl<T, A: MemPool> UnwindSafe for Mutex<T, A> {}
impl<T, A: MemPool> RefUnwindSafe for Mutex<T, A> {}
// `TxInSafe` and `PSafe` are asserted unconditionally, whatever `T` is.
// NOTE(review): these are `unsafe impl`s with no `T` bound — confirm that the
// contracts of both traits really hold for arbitrary `T`.
unsafe impl<T, A: MemPool> TxInSafe for Mutex<T, A> {}
unsafe impl<T, A: MemPool> PSafe for Mutex<T, A> {}
// Same bounds as `std::sync::Mutex`: sending or sharing the mutex across
// threads only requires `T: Send`, because access to `T` goes through the lock.
unsafe impl<T: Send, A: MemPool> Send for Mutex<T, A> {}
unsafe impl<T: Send, A: MemPool> Sync for Mutex<T, A> {}
impl<T, A: MemPool> Mutex<T, A> {
    /// Creates an unlocked mutex that protects `data`.
    ///
    /// `_journal` is not used by the body; it is only part of the signature.
    pub fn new(data: T, _journal: &Journal<A>) -> Mutex<T, A> {
        let inner = VCell::new(MutexInner::default());
        // The leading byte starts at 0, the state in which `get_mut` still
        // calls `take_log` on the value.
        let data = UnsafeCell::new((0, data));
        Self {
            heap: PhantomData,
            inner,
            data,
        }
    }
}
impl<T: PSafe, A: MemPool> Mutex<T, A> {
    /// Returns a mutable reference to the protected value.
    ///
    /// While the one-byte flag stored next to the value is `0`, `take_log` is
    /// first called on the value with `journal` and a `Notifier::NonAtomic`
    /// that points at that flag.
    #[inline]
    #[allow(clippy::mut_from_ref)]
    pub(crate) fn get_mut(&self, journal: &Journal<A>) -> &mut T {
        // SAFETY: within this file the only caller is
        // `MutexGuard::deref_mut`, and a guard is only constructed after the
        // lock has been taken.
        unsafe {
            let slot = &mut *self.data.get();
            let not_logged_yet = slot.0 == 0;
            if not_logged_yet {
                slot.1
                    .take_log(journal, Notifier::NonAtomic(Ptr::from_ref(&slot.0)));
            }
            &mut slot.1
        }
    }

    /// Reinterprets `&self` as `&mut Self` by casting through a raw pointer.
    // NOTE(review): no caller is visible in this file — confirm it is still
    // needed.
    #[inline]
    #[allow(clippy::mut_from_ref)]
    fn self_mut(&self) -> &mut Self {
        let this = self as *const Self as *mut Self;
        unsafe { &mut *this }
    }
}
impl<T, A: MemPool> Mutex<T, A> {
    /// Blocks until the calling thread owns the underlying lock, then claims
    /// the single guard slot.
    ///
    /// With the `pthread` feature this calls `pthread_mutex_lock`. Otherwise
    /// it spins on a compare-and-swap of the lock word from `0` to the
    /// caller's thread id, and also succeeds when the word already holds that
    /// id. On success the address of the lock tuple is passed to
    /// `Log::unlock_on_commit` together with `journal`.
    ///
    /// # Panics
    ///
    /// Panics, after undoing the lock operation, if a `MutexGuard` for this
    /// mutex is still alive.
    #[inline]
    fn raw_lock(&self, journal: &Journal<A>) {
        unsafe {
            let lock = &self.inner.lock.1 as *const _ as *mut _;
            #[cfg(feature = "pthread")]
            {
                libc::pthread_mutex_lock(lock);
            }
            #[cfg(not(feature = "pthread"))]
            {
                let tid = std::thread::current().id().as_u64().get();
                loop {
                    // The intrinsic returns `(previous value, swap succeeded)`.
                    let (previous, swapped) = intrinsics::atomic_cxchg_acqrel(lock, 0, tid);
                    // The lock is ours if we just installed our id, or if our
                    // id was already there from an earlier acquisition.
                    if swapped || previous == tid {
                        break;
                    }
                }
            }
            if self.inner.acquire() {
                Log::unlock_on_commit(&self.inner.lock as *const _ as u64, journal);
            } else {
                // NOTE(review): in the non-pthread build this store clears the
                // lock word even when this thread already owned it from an
                // earlier acquisition — confirm that is intended.
                #[cfg(feature = "pthread")]
                libc::pthread_mutex_unlock(lock);
                #[cfg(not(feature = "pthread"))]
                intrinsics::atomic_store_rel(lock, 0);
                panic!("Cannot have multiple instances of MutexGuard");
            }
        }
    }

    /// Acquires the mutex, blocking the current thread until it can, and
    /// returns a guard that gives access to the protected value.
    ///
    /// The returned guard borrows both the mutex and `journal`.
    ///
    /// # Panics
    ///
    /// Panics if a `MutexGuard` for this mutex is still alive.
    pub fn lock<'a>(&'a self, journal: &'a Journal<A>) -> MutexGuard<'a, T, A> {
        self.raw_lock(journal);
        // SAFETY: `raw_lock` only returns normally once the lock is held and
        // the guard slot has been claimed.
        unsafe { MutexGuard::new(self, journal) }
    }

    /// Makes a single attempt to take the underlying lock.
    ///
    /// Returns `false` when the lock is owned by someone else. When the lock
    /// is obtained, the guard slot is claimed and the lock is passed to
    /// `Log::unlock_on_commit`, exactly as `raw_lock` does.
    ///
    /// # Panics
    ///
    /// Panics, after undoing the lock operation, if the lock was obtained but
    /// a `MutexGuard` for this mutex is still alive.
    #[inline]
    fn raw_trylock(&self, journal: &Journal<A>) -> bool {
        unsafe {
            let lock = &self.inner.lock.1 as *const _ as *mut _;
            #[cfg(feature = "pthread")]
            let result = libc::pthread_mutex_trylock(lock) == 0;
            #[cfg(not(feature = "pthread"))]
            let result = {
                let tid = std::thread::current().id().as_u64().get();
                // The intrinsic returns `(previous value, swap succeeded)`.
                // Looking only at the previous value would report failure
                // right after a successful swap (previous == 0), leaving the
                // lock word set with nobody responsible for releasing it.
                let (previous, swapped) = intrinsics::atomic_cxchg_acqrel(lock, 0, tid);
                swapped || previous == tid
            };
            if result {
                if self.inner.acquire() {
                    Log::unlock_on_commit(&self.inner.lock as *const _ as u64, journal);
                    true
                } else {
                    #[cfg(feature = "pthread")]
                    libc::pthread_mutex_unlock(lock);
                    #[cfg(not(feature = "pthread"))]
                    intrinsics::atomic_store_rel(lock, 0);
                    panic!("Cannot have multiple instances of MutexGuard");
                }
            } else {
                false
            }
        }
    }

    /// Attempts to acquire the mutex without blocking.
    ///
    /// # Errors
    ///
    /// Returns `Err(TryLockError::WouldBlock)` when the lock could not be
    /// taken. This function never produces `TryLockError::Poisoned`.
    ///
    /// # Panics
    ///
    /// Panics if the lock was obtained but a `MutexGuard` for this mutex is
    /// still alive.
    pub fn try_lock<'a>(&'a self, journal: &'a Journal<A>) -> TryLockResult<MutexGuard<'a, T, A>> {
        if self.raw_trylock(journal) {
            // SAFETY: `raw_trylock` returned `true`, so the lock is held and
            // the guard slot has been claimed.
            unsafe { Ok(MutexGuard::new(self, journal)) }
        } else {
            Err(TryLockError::WouldBlock)
        }
    }
}
impl<T: RootObj<A>, A: MemPool> RootObj<A> for Mutex<T, A> {
    /// Builds the root mutex by initialising the inner `T` with `T::init` and
    /// wrapping the result with `Mutex::new`.
    fn init(journal: &Journal<A>) -> Self {
        let root_value = T::init(journal);
        Self::new(root_value, journal)
    }
}
impl<T: fmt::Debug, A: MemPool> fmt::Debug for Mutex<T, A> {
    /// Formats the mutex by delegating to the `Debug` impl of the
    /// `UnsafeCell` that wraps the data; the lock is not taken.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.data, f)
    }
}
/// Handle returned by `Mutex::lock` and `Mutex::try_lock` that dereferences to
/// the protected value.
///
/// Dropping the guard calls `release()` on the mutex's inner state, which
/// allows a new guard to be created.
pub struct MutexGuard<'a, T: 'a, A: MemPool> {
/// The mutex this guard was created from.
lock: &'a Mutex<T, A>,
/// Journal handed to `lock`/`try_lock`, kept as a raw pointer and
/// dereferenced in `deref_mut`.
journal: *const Journal<A>,
}
// Negative impl: a guard never implements `TxOutSafe`.
// NOTE(review): presumably this keeps a guard from leaving the transaction
// that created it — confirm against the definition of `TxOutSafe`.
impl<T: ?Sized, A: MemPool> !TxOutSafe for MutexGuard<'_, T, A> {}
// A guard is never `Send`, for any `T`.
impl<T: ?Sized, A: MemPool> !Send for MutexGuard<'_, T, A> {}
// A shared guard only hands out `&T`, so it is `Sync` whenever `T` is
// (the same bound `std::sync::MutexGuard` uses).
unsafe impl<T: Sync, A: MemPool> Sync for MutexGuard<'_, T, A> {}
impl<T: fmt::Debug, A: MemPool> fmt::Debug for MutexGuard<'_, T, A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: fmt::Display, A: MemPool> fmt::Display for MutexGuard<'_, T, A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
(**self).fmt(f)
}
}
impl<'mutex, T, A: MemPool> MutexGuard<'mutex, T, A> {
    /// Wraps `lock` and `journal` into a guard without touching the lock.
    ///
    /// # Safety
    ///
    /// The body performs no locking itself. Within this file it is only
    /// called right after `raw_lock` or `raw_trylock` has succeeded; callers
    /// are expected to uphold the same condition.
    unsafe fn new(
        lock: &'mutex Mutex<T, A>,
        journal: &'mutex Journal<A>,
    ) -> MutexGuard<'mutex, T, A> {
        Self {
            lock,
            journal: journal as *const Journal<A>,
        }
    }
}
impl<T, A: MemPool> Deref for MutexGuard<'_, T, A> {
    type Target = T;

    /// Gives shared access to the protected value.
    fn deref(&self) -> &T {
        let slot: *mut (u8, T) = self.lock.data.get();
        // SAFETY: the pointer comes from the mutex's own `UnsafeCell`, and the
        // mutex is borrowed for as long as this guard exists.
        unsafe { &(*slot).1 }
    }
}
impl<T: PSafe, A: MemPool> DerefMut for MutexGuard<'_, T, A> {
fn deref_mut(&mut self) -> &mut T {
unsafe { self.lock.get_mut(&*self.journal) }
}
}
impl<T, A: MemPool> Drop for MutexGuard<'_, T, A> {
    /// Frees the guard slot so that another guard can be created.
    ///
    /// Only `release()` is called here; the raw lock itself is not unlocked
    /// by this method.
    fn drop(&mut self) {
        let state = &self.lock.inner;
        state.release();
    }
}
/// Initialises `mutex` as a recursive pthread mutex.
///
/// The mutex is first overwritten with `PTHREAD_MUTEX_INITIALIZER`. `attr` is
/// then initialised, set to `PTHREAD_MUTEX_RECURSIVE`, used to initialise
/// `mutex`, and destroyed again before returning. The return code of each
/// pthread call is only checked with `debug_assert_eq!`.
///
/// # Safety
///
/// `mutex` and `attr` must both be valid for writes.
#[cfg(feature = "pthread")]
pub unsafe fn init_lock(mutex: *mut libc::pthread_mutex_t, attr: *mut libc::pthread_mutexattr_t) {
    *mutex = libc::PTHREAD_MUTEX_INITIALIZER;

    let attr_init_rc = libc::pthread_mutexattr_init(attr);
    debug_assert_eq!(attr_init_rc, 0);

    let set_type_rc = libc::pthread_mutexattr_settype(attr, libc::PTHREAD_MUTEX_RECURSIVE);
    debug_assert_eq!(set_type_rc, 0);

    let mutex_init_rc = libc::pthread_mutex_init(mutex, attr);
    debug_assert_eq!(mutex_init_rc, 0);

    let attr_destroy_rc = libc::pthread_mutexattr_destroy(attr);
    debug_assert_eq!(attr_destroy_rc, 0);
}