use crate::core::detector;
use crate::core::locks::NEXT_LOCK_ID;
use crate::core::types::{LockId, ThreadId, get_current_thread_id};
#[cfg(feature = "logging-and-visualization")]
use crate::core::{Events, logger};
use parking_lot::{Mutex as ParkingLotMutex, MutexGuard as ParkingLotMutexGuard};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
/// A mutex built on `parking_lot::Mutex<T>` that additionally carries a lock id, the id of
/// the thread that created it, and an atomically published owner thread id.
///
/// The id and owner are what the `detector::mutex` hooks called from `lock`, `try_lock`,
/// `into_inner` and `Drop` operate on.
pub struct Mutex<T> {
/// Identifier taken from `NEXT_LOCK_ID` in `new`; passed to every `detector::mutex` call.
id: LockId,
/// The underlying lock that actually guards the value.
inner: ParkingLotMutex<T>,
/// Result of `get_current_thread_id()` at construction time.
creator_thread_id: ThreadId,
/// Thread id of the current holder as published by `lock`/`try_lock`, or 0 when no owner
/// is published (initial value, and the value stored when a guard is dropped).
owner: AtomicUsize,
}
/// Guard returned by `Mutex::lock` and `Mutex::try_lock`.
///
/// Wraps the `parking_lot` guard and remembers which thread acquired which lock so that
/// dropping the guard can clear the published owner and report the release.
pub struct MutexGuard<'a, T> {
/// Id of the thread that acquired the lock.
thread_id: ThreadId,
/// Id of the `Mutex` this guard was created from.
lock_id: LockId,
/// The underlying `parking_lot` guard; `Deref`/`DerefMut` forward to it.
guard: ParkingLotMutexGuard<'a, T>,
/// Borrow of the mutex's `owner` atomic; reset to 0 when the guard is dropped.
owner_atomic: &'a AtomicUsize,
/// `true` when the acquisition was reported through `detector::mutex::complete_acquire`;
/// in that case `Drop` reports the release through `detector::mutex::release_mutex`.
tracked_globally: bool,
}
impl<T> Mutex<T> {
pub fn new(value: T) -> Self {
let id = NEXT_LOCK_ID.fetch_add(1, Ordering::SeqCst);
let creator_thread_id = get_current_thread_id();
detector::mutex::create_mutex(id, Some(creator_thread_id));
Mutex {
id,
inner: ParkingLotMutex::new(value),
creator_thread_id,
owner: AtomicUsize::new(0),
}
}
/// Returns the id assigned to this mutex in `new`.
pub fn id(&self) -> LockId {
self.id
}
/// Returns the id of the thread that constructed this mutex.
pub fn creator_thread_id(&self) -> ThreadId {
self.creator_thread_id
}
/// Acquires the mutex, blocking the current thread until it is available.
///
/// An uncontended `try_lock` on the inner mutex is attempted first (this fast path is
/// compiled out when the `stress-test` feature is enabled). Otherwise the attempt is reported
/// to `detector::mutex::acquire_slow` together with the currently published owner, if any.
/// If that call returns deadlock info which is not judged stale, the info is handed to
/// `detector::deadlock_handling::process_deadlock`. The thread then blocks on the inner
/// mutex regardless.
///
/// The lock is released when the returned guard is dropped.
pub fn lock(&self) -> MutexGuard<'_, T> {
let thread_id = get_current_thread_id();
let tid_usize = thread_id;
// Fast path: an immediately successful acquisition never reaches `acquire_slow` below.
#[cfg(not(feature = "stress-test"))]
if let Some(guard) = self.inner.try_lock() {
// Publish this thread as the owner; the slow path of contending threads reads it.
self.owner.store(tid_usize, Ordering::Release);
// The attempt event is logged only after the try has already succeeded.
#[cfg(feature = "logging-and-visualization")]
{
if logger::LOGGING_ENABLED.load(Ordering::Relaxed) {
logger::log_interaction_event(thread_id, self.id, Events::MutexAttempt);
}
}
// Fast-path acquisitions are reported to the detector only with `lock-order-graph`.
#[cfg(feature = "lock-order-graph")]
detector::mutex::complete_acquire(thread_id, self.id);
#[cfg(feature = "logging-and-visualization")]
{
if logger::LOGGING_ENABLED.load(Ordering::Relaxed) {
logger::log_interaction_event(thread_id, self.id, Events::MutexAcquired);
}
}
return MutexGuard {
thread_id,
lock_id: self.id,
guard,
owner_atomic: &self.owner,
// Matches the cfg on `complete_acquire` above: the guard reports its release to the
// detector only if the acquisition was reported.
tracked_globally: cfg!(feature = "lock-order-graph"),
};
}
// Slow path: the lock is held by someone (or the fast path is compiled out).
let mut current_owner_val = self.owner.load(Ordering::Acquire);
// The lock can be held while `owner` is still 0, e.g. when the holder has taken the inner
// lock but not yet stored its id. Wait briefly for an owner to appear.
if current_owner_val == 0 && self.inner.is_locked() {
let mut spin_count = 0;
while current_owner_val == 0 {
// Busy-spin for the first 100 iterations, then yield to the scheduler instead.
if spin_count < 100 {
std::hint::spin_loop();
} else {
std::thread::yield_now();
}
current_owner_val = self.owner.load(Ordering::Relaxed);
spin_count += 1;
// Every 16th iteration, give up waiting if the lock has been released meanwhile.
if spin_count % 16 == 0 && !self.inner.is_locked() {
break;
}
}
// The loop reloads with `Relaxed`; the `Acquire` fence is issued once after it.
std::sync::atomic::fence(Ordering::Acquire);
}
// 0 is the "no owner published" sentinel.
let current_owner = if current_owner_val == 0 {
None
} else {
Some(current_owner_val as ThreadId)
};
// Report the pending acquisition; `Some(info)` means the detector has a deadlock to report.
let deadlock_info = detector::mutex::acquire_slow(thread_id, self.id, current_owner);
if let Some(info) = deadlock_info {
// If an owner was observed, re-read it and let `verify_deadlock_edges` decide whether the
// report still holds; a `false` result marks the report as stale.
let is_stale = if let Some(expected_owner) = current_owner {
let actual_owner = self.owner.load(Ordering::Relaxed);
!detector::deadlock_handling::verify_deadlock_edges(
&info,
thread_id,
self.id,
expected_owner,
actual_owner,
)
} else {
// No owner was observed, so there is nothing to re-check: never treated as stale.
false
};
if !is_stale {
detector::deadlock_handling::process_deadlock(info);
}
}
// Block until the inner lock is obtained.
let guard = self.inner.lock();
// Tell the detector the lock is now held, then publish this thread as the owner.
detector::mutex::complete_acquire(thread_id, self.id);
self.owner.store(tid_usize, Ordering::Release);
MutexGuard {
thread_id,
lock_id: self.id,
guard,
owner_atomic: &self.owner,
// `complete_acquire` is unconditional on this path, so the release is always reported.
tracked_globally: true,
}
}
/// Attempts to acquire the mutex without blocking.
///
/// Returns `Some(guard)` when the inner `parking_lot` mutex could be taken immediately and
/// `None` otherwise. On failure no event is logged and the detector is not called from here.
///
/// On success the calling thread is published as the owner. The acquisition is reported to
/// `detector::mutex::complete_acquire` only when the `lock-order-graph` feature is enabled;
/// the returned guard remembers that in `tracked_globally`.
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
    let thread_id = get_current_thread_id();

    // Contended: hand `None` straight back to the caller.
    let guard = self.inner.try_lock()?;

    // Publish ownership before anything else so contending `lock` callers can observe it.
    self.owner.store(thread_id, Ordering::Release);

    // The attempt event is emitted only once the attempt is known to have succeeded.
    #[cfg(feature = "logging-and-visualization")]
    if logger::LOGGING_ENABLED.load(Ordering::Relaxed) {
        logger::log_interaction_event(thread_id, self.id, Events::MutexAttempt);
    }

    #[cfg(feature = "lock-order-graph")]
    detector::mutex::complete_acquire(thread_id, self.id);

    #[cfg(feature = "logging-and-visualization")]
    if logger::LOGGING_ENABLED.load(Ordering::Relaxed) {
        logger::log_interaction_event(thread_id, self.id, Events::MutexAcquired);
    }

    Some(MutexGuard {
        thread_id,
        lock_id: self.id,
        guard,
        owner_atomic: &self.owner,
        // Same condition as the `complete_acquire` call above.
        tracked_globally: cfg!(feature = "lock-order-graph"),
    })
}
/// Consumes the mutex and returns the protected value.
///
/// `detector::mutex::destroy_mutex` is called once for this lock id. The mutex's own `Drop`
/// (which would call it again) is suppressed by wrapping `self` in `ManuallyDrop`.
pub fn into_inner(self) -> T
where
    T: Sized,
{
    detector::mutex::destroy_mutex(self.id);

    // `Mutex` implements `Drop`, so `inner` cannot be moved out by destructuring.
    let this = std::mem::ManuallyDrop::new(self);

    // SAFETY: `this` is wrapped in `ManuallyDrop` and never touched again, so `Mutex::drop`
    // does not run and `inner` is neither dropped nor read a second time; the value read
    // here is the sole owner of the inner mutex. The remaining fields are left behind in
    // the wrapper: `owner` is a plain atomic and the two ids are `Copy` (they are returned
    // by value from `&self` getters), so none of them needs to be dropped.
    let inner = unsafe { std::ptr::read(&this.inner) };
    inner.into_inner()
}
/// Returns a mutable reference to the protected value.
///
/// Takes `&mut self` and simply forwards to the inner mutex's `get_mut`; the owner field and
/// the detector are not touched.
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut()
}
}
impl<T> Drop for Mutex<T> {
/// Calls `detector::mutex::destroy_mutex` with this mutex's id when the mutex is dropped.
fn drop(&mut self) {
detector::mutex::destroy_mutex(self.id);
}
}
impl<T> Deref for MutexGuard<'_, T> {
    type Target = T;

    /// Shared access to the protected value, borrowed through the inner `parking_lot` guard.
    fn deref(&self) -> &Self::Target {
        &*self.guard
    }
}
impl<T> DerefMut for MutexGuard<'_, T> {
    /// Exclusive access to the protected value, borrowed through the inner `parking_lot` guard.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.guard
    }
}
/// Crate-internal helpers on the guard.
impl<'a, T> MutexGuard<'a, T> {
/// Gives crate-internal code mutable access to the wrapped `parking_lot` guard.
pub(crate) fn inner_guard(&mut self) -> &mut ParkingLotMutexGuard<'a, T> {
&mut self.guard
}
/// Returns the id of the mutex this guard was created from.
pub(crate) fn lock_id(&self) -> LockId {
self.lock_id
}
/// Resets the mutex's published owner to 0 ("no owner"). Only the atomic is written; the
/// inner guard is untouched.
pub(crate) fn clear_ownership(&self) {
self.owner_atomic.store(0, Ordering::Release);
}
/// Publishes this guard's thread id as the mutex's owner again.
// NOTE(review): presumably paired with `clear_ownership` around a wait that temporarily
// gives up the lock — confirm against the callers.
pub(crate) fn restore_ownership(&self) {
self.owner_atomic.store(self.thread_id, Ordering::Release);
}
}
impl<T> Drop for MutexGuard<'_, T> {
/// Clears the published owner, then either reports the release to the detector (tracked
/// acquisitions) or, for untracked ones, logs a `MutexReleased` event when logging is
/// compiled in and enabled.
fn drop(&mut self) {
// Reset the owner first. The `guard` field is dropped only after this body returns, so
// the inner lock is still held while the steps below run.
self.owner_atomic.store(0, Ordering::Release);
if self.tracked_globally {
// The acquisition was reported via `complete_acquire`, so report the release as well.
detector::mutex::release_mutex(self.thread_id, self.lock_id);
} else {
// Untracked acquisitions bypass the detector; only the log event is emitted here.
#[cfg(feature = "logging-and-visualization")]
if logger::LOGGING_ENABLED.load(Ordering::Relaxed) {
logger::log_interaction_event(self.thread_id, self.lock_id, Events::MutexReleased);
}
}
}
}
impl<T: Default> Default for Mutex<T> {
fn default() -> Mutex<T> {
Mutex::new(Default::default())
}
}
impl<T> From<T> for Mutex<T> {
fn from(t: T) -> Self {
Mutex::new(t)
}
}