#[derive(Debug, Clone, Default)]
pub struct LockMetricsSnapshot {
pub name: &'static str,
pub acquisitions: u64,
pub contentions: u64,
pub wait_ns: u64,
pub hold_ns: u64,
pub max_wait_ns: u64,
pub max_hold_ns: u64,
pub p95_wait_ns: u64,
pub p999_wait_ns: u64,
pub p95_hold_ns: u64,
pub p999_hold_ns: u64,
pub instrumentation_mode: &'static str,
}
#[cfg(feature = "lock-metrics")]
mod inner {
use super::LockMetricsSnapshot;
use crate::sync::lock_ordering::{self, LockRank};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{LockResult, Mutex, MutexGuard, PoisonError};
use std::time::Instant;
#[derive(Debug, Default)]
#[repr(C, align(64))]
struct Metrics {
acquisitions: AtomicU64,
contentions: AtomicU64,
wait_ns: AtomicU64,
max_wait_ns: AtomicU64,
_pad: [u8; 32],
hold_ns: AtomicU64,
max_hold_ns: AtomicU64,
wait_samples: Mutex<Vec<u64>>,
hold_samples: Mutex<Vec<u64>>,
}
const MAX_SAMPLES: usize = 10000;
impl Metrics {
fn update_max(current: &AtomicU64, value: u64) {
current.fetch_max(value, Ordering::Relaxed);
}
fn record_acquire(&self, wait_ns: u64, contended: bool) {
self.acquisitions.fetch_add(1, Ordering::Relaxed);
self.wait_ns.fetch_add(wait_ns, Ordering::Relaxed);
Self::update_max(&self.max_wait_ns, wait_ns);
let mut samples = self
.wait_samples
.lock()
.unwrap_or_else(PoisonError::into_inner);
if samples.len() >= MAX_SAMPLES {
samples.drain(0..MAX_SAMPLES / 4);
}
samples.push(wait_ns);
if contended {
self.contentions.fetch_add(1, Ordering::Relaxed);
}
}
fn record_hold(&self, hold_ns: u64) {
self.hold_ns.fetch_add(hold_ns, Ordering::Relaxed);
Self::update_max(&self.max_hold_ns, hold_ns);
let mut samples = self
.hold_samples
.lock()
.unwrap_or_else(PoisonError::into_inner);
if samples.len() >= MAX_SAMPLES {
samples.drain(0..MAX_SAMPLES / 4);
}
samples.push(hold_ns);
}
fn quantile(samples: &Mutex<Vec<u64>>, numerator: usize, denominator: usize) -> u64 {
let mut values = samples
.lock()
.unwrap_or_else(PoisonError::into_inner)
.clone();
if values.is_empty() {
return 0;
}
values.sort_unstable();
let last_index = values.len() - 1;
let rank = last_index
.saturating_mul(numerator)
.saturating_add(denominator / 2)
/ denominator;
values[rank.min(last_index)]
}
fn snapshot(&self, name: &'static str) -> LockMetricsSnapshot {
LockMetricsSnapshot {
name,
acquisitions: self.acquisitions.load(Ordering::Relaxed),
contentions: self.contentions.load(Ordering::Relaxed),
wait_ns: self.wait_ns.load(Ordering::Relaxed),
hold_ns: self.hold_ns.load(Ordering::Relaxed),
max_wait_ns: self.max_wait_ns.load(Ordering::Relaxed),
max_hold_ns: self.max_hold_ns.load(Ordering::Relaxed),
p95_wait_ns: Self::quantile(&self.wait_samples, 95, 100),
p999_wait_ns: Self::quantile(&self.wait_samples, 999, 1000),
p95_hold_ns: Self::quantile(&self.hold_samples, 95, 100),
p999_hold_ns: Self::quantile(&self.hold_samples, 999, 1000),
instrumentation_mode: "opt_in_lock_metrics",
}
}
fn reset(&self) {
self.acquisitions.store(0, Ordering::SeqCst);
self.contentions.store(0, Ordering::SeqCst);
self.wait_ns.store(0, Ordering::SeqCst);
self.hold_ns.store(0, Ordering::SeqCst);
self.max_wait_ns.store(0, Ordering::SeqCst);
self.max_hold_ns.store(0, Ordering::SeqCst);
self.wait_samples
.lock()
.unwrap_or_else(PoisonError::into_inner)
.clear();
self.hold_samples
.lock()
.unwrap_or_else(PoisonError::into_inner)
.clear();
}
}
#[derive(Debug)]
pub struct ContendedMutex<T> {
inner: Mutex<T>,
metrics: Metrics,
name: &'static str,
rank: Option<LockRank>,
}
impl<T> ContendedMutex<T> {
pub fn new(name: &'static str, value: T) -> Self {
let rank = LockRank::from_name(name);
Self {
inner: Mutex::new(value),
metrics: Metrics::default(),
name,
rank,
}
}
pub fn lock(&self) -> LockResult<ContendedMutexGuard<'_, T>> {
if let Some(rank) = self.rank {
lock_ordering::check_acquire(self.name, rank);
}
let start = Instant::now();
let (result, contended) = match self.inner.try_lock() {
Ok(guard) => (Ok(guard), false),
Err(std::sync::TryLockError::Poisoned(poison)) => (Err(poison), false),
Err(std::sync::TryLockError::WouldBlock) => (self.inner.lock(), true),
};
let acquired_at = Instant::now();
let wait_ns =
u64::try_from(acquired_at.duration_since(start).as_nanos()).unwrap_or(u64::MAX);
self.metrics.record_acquire(wait_ns, contended);
if let Some(rank) = self.rank {
lock_ordering::record_acquire(self.name, rank);
}
match result {
Ok(guard) => Ok(ContendedMutexGuard {
guard: Some(guard),
acquired_at,
metrics: &self.metrics,
name: self.name,
rank: self.rank,
}),
Err(poison) => Err(PoisonError::new(ContendedMutexGuard {
guard: Some(poison.into_inner()),
acquired_at,
metrics: &self.metrics,
name: self.name,
rank: self.rank,
})),
}
}
pub fn try_lock(
&self,
) -> Result<ContendedMutexGuard<'_, T>, std::sync::TryLockError<ContendedMutexGuard<'_, T>>>
{
match self.inner.try_lock() {
Ok(guard) => {
if let Some(rank) = self.rank {
lock_ordering::check_acquire(self.name, rank);
lock_ordering::record_acquire(self.name, rank);
}
let acquired_at = Instant::now();
self.metrics.record_acquire(0, false);
Ok(ContendedMutexGuard {
guard: Some(guard),
acquired_at,
metrics: &self.metrics,
name: self.name,
rank: self.rank,
})
}
Err(std::sync::TryLockError::WouldBlock) => {
Err(std::sync::TryLockError::WouldBlock)
}
Err(std::sync::TryLockError::Poisoned(poison)) => {
if let Some(rank) = self.rank {
lock_ordering::check_acquire(self.name, rank);
lock_ordering::record_acquire(self.name, rank);
}
let acquired_at = Instant::now();
self.metrics.record_acquire(0, false);
Err(std::sync::TryLockError::Poisoned(PoisonError::new(
ContendedMutexGuard {
guard: Some(poison.into_inner()),
acquired_at,
metrics: &self.metrics,
name: self.name,
rank: self.rank,
},
)))
}
}
}
pub fn snapshot(&self) -> LockMetricsSnapshot {
self.metrics.snapshot(self.name)
}
pub fn reset_metrics(&self) {
self.metrics.reset();
}
pub fn name(&self) -> &'static str {
self.name
}
}
pub struct ContendedMutexGuard<'a, T> {
guard: Option<MutexGuard<'a, T>>,
acquired_at: Instant,
metrics: &'a Metrics,
name: &'static str,
rank: Option<LockRank>,
}
impl<T> std::ops::Deref for ContendedMutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
self.guard.as_ref().expect("guard used after drop")
}
}
impl<T> std::ops::DerefMut for ContendedMutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
self.guard.as_mut().expect("guard used after drop")
}
}
impl<T> Drop for ContendedMutexGuard<'_, T> {
fn drop(&mut self) {
let hold_ns = u64::try_from(self.acquired_at.elapsed().as_nanos()).unwrap_or(u64::MAX);
drop(self.guard.take());
if let Some(rank) = self.rank {
lock_ordering::record_release(self.name, rank);
}
self.metrics.record_hold(hold_ns);
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for ContendedMutexGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ContendedMutexGuard")
.field("data", &self.guard)
.finish()
}
}
}
#[cfg(not(feature = "lock-metrics"))]
mod inner {
use super::LockMetricsSnapshot;
use crate::sync::lock_ordering::{self, LockRank};
use std::sync::{LockResult, Mutex, MutexGuard, PoisonError};
#[derive(Debug)]
pub struct ContendedMutex<T> {
inner: Mutex<T>,
name: &'static str,
rank: Option<LockRank>,
}
impl<T> ContendedMutex<T> {
#[inline]
pub fn new(name: &'static str, value: T) -> Self {
let rank = LockRank::from_name(name);
Self {
inner: Mutex::new(value),
name,
rank,
}
}
#[inline]
pub fn lock(&self) -> LockResult<ContendedMutexGuard<'_, T>> {
if let Some(rank) = self.rank {
lock_ordering::check_acquire(self.name, rank);
}
match self.inner.lock() {
Ok(guard) => {
if let Some(rank) = self.rank {
lock_ordering::record_acquire(self.name, rank);
}
Ok(ContendedMutexGuard {
guard,
name: self.name,
rank: self.rank,
})
}
Err(poison) => {
if let Some(rank) = self.rank {
lock_ordering::record_acquire(self.name, rank);
}
Err(PoisonError::new(ContendedMutexGuard {
guard: poison.into_inner(),
name: self.name,
rank: self.rank,
}))
}
}
}
pub fn try_lock(
&self,
) -> Result<ContendedMutexGuard<'_, T>, std::sync::TryLockError<ContendedMutexGuard<'_, T>>>
{
match self.inner.try_lock() {
Ok(guard) => {
if let Some(rank) = self.rank {
lock_ordering::check_acquire(self.name, rank);
lock_ordering::record_acquire(self.name, rank);
}
Ok(ContendedMutexGuard {
guard,
name: self.name,
rank: self.rank,
})
}
Err(std::sync::TryLockError::WouldBlock) => {
Err(std::sync::TryLockError::WouldBlock)
}
Err(std::sync::TryLockError::Poisoned(poison)) => {
if let Some(rank) = self.rank {
lock_ordering::check_acquire(self.name, rank);
lock_ordering::record_acquire(self.name, rank);
}
Err(std::sync::TryLockError::Poisoned(PoisonError::new(
ContendedMutexGuard {
guard: poison.into_inner(),
name: self.name,
rank: self.rank,
},
)))
}
}
}
pub fn snapshot(&self) -> LockMetricsSnapshot {
LockMetricsSnapshot {
name: self.name,
instrumentation_mode: "disabled",
..Default::default()
}
}
pub fn reset_metrics(&self) {}
pub fn name(&self) -> &'static str {
self.name
}
}
pub struct ContendedMutexGuard<'a, T> {
guard: MutexGuard<'a, T>,
name: &'static str,
rank: Option<LockRank>,
}
impl<T> std::ops::Deref for ContendedMutexGuard<'_, T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
&self.guard
}
}
impl<T> std::ops::DerefMut for ContendedMutexGuard<'_, T> {
#[inline]
fn deref_mut(&mut self) -> &mut T {
&mut self.guard
}
}
impl<T> Drop for ContendedMutexGuard<'_, T> {
fn drop(&mut self) {
if let Some(rank) = self.rank {
lock_ordering::record_release(self.name, rank);
}
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for ContendedMutexGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ContendedMutexGuard")
.field("data", &*self.guard)
.finish()
}
}
}
pub use inner::{ContendedMutex, ContendedMutexGuard};
#[cfg(test)]
#[allow(clippy::significant_drop_tightening)]
mod tests {
use super::*;
use std::sync::Arc;
#[cfg(feature = "lock-metrics")]
use std::thread;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[test]
fn basic_lock_unlock() {
init_test("basic_lock_unlock");
let m = ContendedMutex::new("test", 42);
{
let guard = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
crate::assert_with_log!(*guard == 42, "value", 42, *guard);
drop(guard);
}
crate::test_complete!("basic_lock_unlock");
}
#[test]
fn mutate_through_guard() {
init_test("mutate_through_guard");
let m = ContendedMutex::new("test", 0);
{
let mut guard = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
*guard = 99;
}
let guard = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
crate::assert_with_log!(*guard == 99, "mutated value", 99, *guard);
drop(guard);
crate::test_complete!("mutate_through_guard");
}
#[test]
fn try_lock_succeeds_when_free() {
init_test("try_lock_succeeds_when_free");
let m = ContendedMutex::new("test", 42);
let guard = m.try_lock().expect("should succeed");
crate::assert_with_log!(*guard == 42, "try_lock value", 42, *guard);
drop(guard);
crate::test_complete!("try_lock_succeeds_when_free");
}
#[test]
fn try_lock_fails_when_held() {
init_test("try_lock_fails_when_held");
let m = ContendedMutex::new("test", 42);
let _guard = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
let is_err = m.try_lock().is_err();
crate::assert_with_log!(is_err, "try_lock fails", true, is_err);
crate::test_complete!("try_lock_fails_when_held");
}
#[test]
fn snapshot_returns_name() {
init_test("snapshot_returns_name");
let m = ContendedMutex::new("my-shard", 0);
let snap = m.snapshot();
crate::assert_with_log!(snap.name == "my-shard", "name", "my-shard", snap.name);
crate::test_complete!("snapshot_returns_name");
}
#[test]
fn name_accessor() {
init_test("name_accessor");
let m = ContendedMutex::new("tasks", 0);
crate::assert_with_log!(m.name() == "tasks", "name", "tasks", m.name());
crate::test_complete!("name_accessor");
}
#[test]
fn reset_metrics_no_panic() {
init_test("reset_metrics_no_panic");
let m = ContendedMutex::new("test", 0);
{
let _g = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
}
m.reset_metrics();
let snap = m.snapshot();
crate::assert_with_log!(
snap.acquisitions == 0,
"acquisitions after reset",
0u64,
snap.acquisitions
);
crate::test_complete!("reset_metrics_no_panic");
}
#[cfg(feature = "lock-metrics")]
#[test]
fn metrics_track_acquisitions() {
init_test("metrics_track_acquisitions");
let m = ContendedMutex::new("test", 0);
for _ in 0..10 {
let _g = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
}
let snap = m.snapshot();
crate::assert_with_log!(
snap.acquisitions == 10,
"acquisitions",
10u64,
snap.acquisitions
);
crate::test_complete!("metrics_track_acquisitions");
}
#[cfg(feature = "lock-metrics")]
#[test]
fn metrics_track_hold_time() {
init_test("metrics_track_hold_time");
let m = ContendedMutex::new("test", 0);
{
let _g = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
std::thread::sleep(std::time::Duration::from_millis(5));
}
let snap = m.snapshot();
crate::assert_with_log!(
snap.hold_ns >= 4_000_000,
"hold_ns >= 4ms",
true,
snap.hold_ns >= 4_000_000
);
crate::assert_with_log!(
snap.max_hold_ns >= 4_000_000,
"max_hold_ns >= 4ms",
true,
snap.max_hold_ns >= 4_000_000
);
crate::test_complete!("metrics_track_hold_time");
}
#[cfg(feature = "lock-metrics")]
#[test]
fn metrics_track_contention() {
init_test("metrics_track_contention");
let m = Arc::new(ContendedMutex::new("test", 0));
let guard = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
let m2 = Arc::clone(&m);
let handle = thread::spawn(move || {
let _g = m2.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
});
thread::sleep(std::time::Duration::from_millis(10));
drop(guard);
handle.join().expect("thread panicked");
let snap = m.snapshot();
crate::assert_with_log!(
snap.contentions >= 1,
"contentions >= 1",
true,
snap.contentions >= 1
);
crate::assert_with_log!(snap.wait_ns > 0, "wait_ns > 0", true, snap.wait_ns > 0);
crate::test_complete!("metrics_track_contention");
}
#[cfg(feature = "lock-metrics")]
#[test]
fn reset_clears_all_metrics() {
init_test("reset_clears_all_metrics");
let m = ContendedMutex::new("test", 0);
{
let _g = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
}
let before = m.snapshot();
crate::assert_with_log!(
before.acquisitions == 1,
"before reset",
1u64,
before.acquisitions
);
m.reset_metrics();
let after = m.snapshot();
crate::assert_with_log!(
after.acquisitions == 0,
"after reset acquisitions",
0u64,
after.acquisitions
);
crate::assert_with_log!(
after.hold_ns == 0,
"after reset hold_ns",
0u64,
after.hold_ns
);
crate::test_complete!("reset_clears_all_metrics");
}
#[cfg(feature = "lock-metrics")]
#[test]
fn poisoned_lock_does_not_count_as_contention() {
init_test("poisoned_lock_does_not_count_as_contention");
let m = Arc::new(ContendedMutex::new("test", 0u8));
let m2 = Arc::clone(&m);
let poisoner = thread::spawn(move || {
let _guard = m2.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
panic!("intentional poison");
});
let _ = poisoner.join();
let poison_err = m.lock().expect_err("lock should be poisoned");
drop(poison_err.into_inner());
let snap = m.snapshot();
crate::assert_with_log!(
snap.contentions == 0,
"poison is not contention",
0u64,
snap.contentions
);
crate::test_complete!("poisoned_lock_does_not_count_as_contention");
}
#[test]
fn lock_metrics_snapshot_debug_clone_default() {
let snap = LockMetricsSnapshot::default();
let dbg = format!("{snap:?}");
assert!(dbg.contains("LockMetricsSnapshot"));
assert_eq!(snap.acquisitions, 0);
assert_eq!(snap.contentions, 0);
assert_eq!(snap.wait_ns, 0);
assert_eq!(snap.hold_ns, 0);
assert_eq!(snap.max_wait_ns, 0);
assert_eq!(snap.max_hold_ns, 0);
assert_eq!(snap.p95_wait_ns, 0);
assert_eq!(snap.p999_wait_ns, 0);
assert_eq!(snap.p95_hold_ns, 0);
assert_eq!(snap.p999_hold_ns, 0);
let cloned = snap.clone();
assert_eq!(cloned.name, snap.name);
}
#[cfg(feature = "lock-metrics")]
#[test]
fn metrics_snapshot_reports_tail_latencies() {
init_test("metrics_snapshot_reports_tail_latencies");
let m = ContendedMutex::new("tasks", 0u32);
for _ in 0..4 {
let mut guard = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
*guard += 1;
}
let snap = m.snapshot();
crate::assert_with_log!(
snap.instrumentation_mode == "opt_in_lock_metrics",
"instrumentation mode",
"opt_in_lock_metrics",
snap.instrumentation_mode
);
crate::assert_with_log!(
snap.acquisitions == 4,
"acquisitions",
4u64,
snap.acquisitions
);
crate::assert_with_log!(
snap.p95_wait_ns <= snap.max_wait_ns,
"p95 wait <= max wait",
true,
snap.p95_wait_ns <= snap.max_wait_ns
);
crate::assert_with_log!(
snap.p999_wait_ns <= snap.max_wait_ns,
"p999 wait <= max wait",
true,
snap.p999_wait_ns <= snap.max_wait_ns
);
crate::assert_with_log!(
snap.p95_hold_ns <= snap.max_hold_ns,
"p95 hold <= max hold",
true,
snap.p95_hold_ns <= snap.max_hold_ns
);
crate::assert_with_log!(
snap.p999_hold_ns <= snap.max_hold_ns,
"p999 hold <= max hold",
true,
snap.p999_hold_ns <= snap.max_hold_ns
);
crate::test_complete!("metrics_snapshot_reports_tail_latencies");
}
#[test]
fn contended_mutex_debug() {
let m = ContendedMutex::new("test", 42_i32);
let dbg = format!("{m:?}");
assert!(dbg.contains("ContendedMutex"));
}
#[test]
fn contended_mutex_guard_debug() {
let m = ContendedMutex::new("test", 42_i32);
let guard = m.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
let dbg = format!("{guard:?}");
assert!(dbg.contains("ContendedMutexGuard"));
drop(guard);
}
#[test]
fn try_lock_returns_poisoned_after_panic() {
init_test("try_lock_returns_poisoned_after_panic");
let m = Arc::new(ContendedMutex::new("test", 7u32));
let m2 = Arc::clone(&m);
let poisoner = std::thread::spawn(move || {
let _guard = m2.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
panic!("deliberate poison");
});
let _ = poisoner.join();
let result = m.try_lock();
let is_poisoned = matches!(result, Err(std::sync::TryLockError::Poisoned(_)));
crate::assert_with_log!(is_poisoned, "try_lock returns Poisoned", true, is_poisoned);
if let Err(std::sync::TryLockError::Poisoned(pe)) = m.try_lock() {
let guard = pe.into_inner();
crate::assert_with_log!(*guard == 7, "data preserved", 7u32, *guard);
}
crate::test_complete!("try_lock_returns_poisoned_after_panic");
}
#[cfg(feature = "lock-metrics")]
#[test]
fn hold_time_recorded_on_panic_in_critical_section() {
init_test("hold_time_recorded_on_panic_in_critical_section");
let m = Arc::new(ContendedMutex::new("test", 0u32));
let m2 = Arc::clone(&m);
let handle = std::thread::spawn(move || {
let _guard = m2.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
std::thread::sleep(std::time::Duration::from_millis(5));
panic!("panic while holding guard");
});
let _ = handle.join();
let snap = m.snapshot();
crate::assert_with_log!(
snap.hold_ns >= 4_000_000,
"hold_ns recorded despite panic",
true,
snap.hold_ns >= 4_000_000
);
crate::test_complete!("hold_time_recorded_on_panic_in_critical_section");
}
}