use crossbeam_channel::{
bounded, unbounded, Receiver as CbReceiver, Select, Sender as CbSender, TryRecvError,
};
use hdrhistogram::Histogram;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock as StdRwLock};
use crate::batch::{register_thread_batch, BatchRegistry, BatchedMeasurement, MeasurementBatch};
use crate::instant::Instant;
use crate::lib_on::hotpath_guard::{
WORKER_BATCH_SIZE, WORKER_FLUSH_INTERVAL_MS, WORKER_SHUTDOWN_DRAIN_LIMIT,
};
use crate::lib_on::START_TIME;
use crate::metrics_server::METRICS_SERVER_PORT;
pub(crate) mod wrapper;
pub use wrapper::std::{Mutex, MutexGuard};
static MUTEX_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
fn next_mutex_id() -> u32 {
MUTEX_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug)]
pub(crate) enum MutexEvent {
Created {
id: u32,
source: &'static str,
label: Option<String>,
type_name: &'static str,
},
Released {
id: u32,
wait_nanos: u64,
acquire_nanos: u64,
elapsed_ns: u64,
},
}
#[derive(Debug, Clone)]
pub(crate) struct MutexEntry {
pub(crate) id: u32,
pub(crate) source: &'static str,
pub(crate) label: Option<String>,
pub(crate) type_name: &'static str,
pub(crate) count: u64,
pub(crate) wait_total_nanos: u64,
pub(crate) acquire_total_nanos: u64,
wait_hist: Option<Histogram<u64>>,
acquire_hist: Option<Histogram<u64>>,
pub(crate) iter: u32,
}
impl MutexEntry {
const LOW_NS: u64 = 1;
const HIGH_NS: u64 = 1_000_000_000_000; const SIGFIGS: u8 = 3;
fn new_histogram() -> Histogram<u64> {
Histogram::<u64>::new_with_bounds(Self::LOW_NS, Self::HIGH_NS, Self::SIGFIGS)
.expect("hdrhistogram init")
}
#[inline]
fn record(hist: &mut Option<Histogram<u64>>, nanos: u64) {
if let Some(ref mut hist) = hist {
hist.record(nanos.clamp(Self::LOW_NS, Self::HIGH_NS))
.unwrap();
}
}
pub(crate) fn wait_avg_nanos(&self) -> u64 {
self.wait_total_nanos.checked_div(self.count).unwrap_or(0)
}
pub(crate) fn acquire_avg_nanos(&self) -> u64 {
self.acquire_total_nanos
.checked_div(self.count)
.unwrap_or(0)
}
fn percentile(hist: &Option<Histogram<u64>>, count: u64, p: f64) -> u64 {
match hist {
Some(hist) if count > 0 => hist.value_at_percentile(p.clamp(0.0, 100.0)),
_ => 0,
}
}
pub(crate) fn wait_percentile_nanos(&self, p: f64) -> u64 {
Self::percentile(&self.wait_hist, self.count, p)
}
pub(crate) fn acquire_percentile_nanos(&self, p: f64) -> u64 {
Self::percentile(&self.acquire_hist, self.count, p)
}
}
pub(crate) struct MutexesInternalState {
pub(crate) stats: HashMap<u32, MutexEntry>,
}
#[cfg(feature = "hotpath-meta")]
pub(crate) type MutexEventTx = hotpath_meta::wrap::crossbeam::Sender<Vec<MutexEvent>>;
#[cfg(not(feature = "hotpath-meta"))]
pub(crate) type MutexEventTx = CbSender<Vec<MutexEvent>>;
pub(crate) struct MutexesState {
pub(crate) event_tx: MutexEventTx,
pub(crate) inner: Arc<StdRwLock<MutexesInternalState>>,
pub(crate) shutdown_tx: StdMutex<Option<CbSender<()>>>,
pub(crate) completion_rx: StdMutex<Option<CbReceiver<()>>>,
}
pub(crate) static MUTEXES_STATE: OnceLock<MutexesState> = OnceLock::new();
pub(crate) fn get_sorted_mutex_entries() -> Vec<MutexEntry> {
let Some(state) = MUTEXES_STATE.get() else {
return Vec::new();
};
let guard = state.inner.read().unwrap();
let mut stats: Vec<MutexEntry> = guard.stats.values().cloned().collect();
stats.sort_by(compare_mutex_entries);
stats
}
pub(crate) fn get_mutexes_json() -> crate::json::JsonMutexesList {
let entries = get_sorted_mutex_entries();
let elapsed = std::time::Duration::from_nanos(crate::lib_on::current_elapsed_ns());
crate::lib_on::report::collect_mutexes_json(
&entries,
elapsed,
&crate::lib_on::hotpath_guard::configured_percentiles(),
)
}
#[inline]
pub(crate) fn elapsed_nanos(start: Instant) -> u64 {
start.elapsed().as_nanos() as u64
}
static EVENT_REGISTRY: BatchRegistry<MutexEvent> = BatchRegistry::new();
thread_local! {
static EVENT_BATCH: std::sync::Arc<std::sync::Mutex<MeasurementBatch<MutexEvent>>> =
register_thread_batch(&EVENT_REGISTRY);
}
#[inline]
pub(crate) fn send_mutex_event(event: MutexEvent) {
let _suspend = crate::lib_on::SuspendAllocTracking::new();
EVENT_BATCH.with(|b| {
if let Ok(mut b) = b.lock() {
b.add(event);
}
});
}
pub(crate) fn flush_mutex_batch() {
EVENT_REGISTRY.flush_all();
}
impl BatchedMeasurement for MutexEvent {
type Tx = MutexEventTx;
fn elapsed_since_start_ns(&self) -> u64 {
match self {
MutexEvent::Released { elapsed_ns, .. } => *elapsed_ns,
_ => 0,
}
}
fn fetch_sender() -> Option<Self::Tx> {
Some(MUTEXES_STATE.get()?.event_tx.clone())
}
fn send_batch(tx: &Self::Tx, batch: Vec<Self>) {
let _ = tx.send(batch);
}
fn is_flush_boundary(&self) -> bool {
matches!(self, MutexEvent::Created { .. })
}
}
fn process_mutex_event(state: &mut MutexesInternalState, event: MutexEvent) {
match event {
MutexEvent::Created {
id,
source,
label,
type_name,
} => {
let iter = state.stats.values().filter(|s| s.source == source).count() as u32;
state.stats.insert(
id,
MutexEntry {
id,
source,
label,
type_name,
count: 0,
wait_total_nanos: 0,
acquire_total_nanos: 0,
wait_hist: Some(MutexEntry::new_histogram()),
acquire_hist: Some(MutexEntry::new_histogram()),
iter,
},
);
}
MutexEvent::Released {
id,
wait_nanos,
acquire_nanos,
elapsed_ns: _,
} => {
if let Some(entry) = state.stats.get_mut(&id) {
entry.count += 1;
entry.wait_total_nanos += wait_nanos;
entry.acquire_total_nanos += acquire_nanos;
MutexEntry::record(&mut entry.wait_hist, wait_nanos);
MutexEntry::record(&mut entry.acquire_hist, acquire_nanos);
}
}
}
}
pub(crate) fn register_mutex<T>(source: &'static str, label: Option<String>) -> u32 {
let type_name = std::any::type_name::<T>();
init_mutexes_state();
let id = next_mutex_id();
send_mutex_event(MutexEvent::Created {
id,
source,
label,
type_name,
});
id
}
fn flush_mutex_buffer(buffer: &mut Vec<MutexEvent>, inner: &Arc<StdRwLock<MutexesInternalState>>) {
if buffer.is_empty() {
return;
}
if let Ok(mut shared) = inner.write() {
for e in buffer.drain(..) {
process_mutex_event(&mut shared, e);
}
}
}
pub(crate) fn init_mutexes_state() -> &'static MutexesState {
MUTEXES_STATE.get_or_init(|| {
START_TIME.get_or_init(Instant::now);
let (event_tx, event_rx) = unbounded::<Vec<MutexEvent>>();
#[cfg(feature = "hotpath-meta")]
let (event_tx, event_rx) = hotpath_meta::channel!(
(event_tx, event_rx),
wrap = true,
log = true,
label = "hp-mx-events"
);
let (shutdown_tx, shutdown_rx) = bounded::<()>(1);
let (completion_tx, completion_rx) = bounded::<()>(1);
let inner = Arc::new(StdRwLock::new(MutexesInternalState {
stats: HashMap::new(),
}));
let inner_clone = Arc::clone(&inner);
std::thread::Builder::new()
.name("hp-mutexes".into())
.spawn(move || {
let mut local_buffer: Vec<MutexEvent> = Vec::with_capacity(WORKER_BATCH_SIZE);
let flush_interval = std::time::Duration::from_millis(WORKER_FLUSH_INTERVAL_MS);
let mut select = Select::new();
let _shutdown_idx = select.recv(&shutdown_rx);
#[cfg(feature = "hotpath-meta")]
let _event_idx = select.recv(event_rx.select_handle());
#[cfg(not(feature = "hotpath-meta"))]
let _event_idx = select.recv(&event_rx);
loop {
if select.ready_timeout(flush_interval).is_err() {
flush_mutex_buffer(&mut local_buffer, &inner_clone);
continue;
}
if !matches!(shutdown_rx.try_recv(), Err(TryRecvError::Empty)) {
for _ in 0..WORKER_SHUTDOWN_DRAIN_LIMIT {
match event_rx.try_recv() {
Ok(events) => local_buffer.extend(events),
Err(_) => break,
}
}
flush_mutex_buffer(&mut local_buffer, &inner_clone);
break;
}
match event_rx.try_recv() {
Ok(events) => {
local_buffer.extend(events);
if local_buffer.len() >= WORKER_BATCH_SIZE {
flush_mutex_buffer(&mut local_buffer, &inner_clone);
}
}
Err(TryRecvError::Disconnected) => {
flush_mutex_buffer(&mut local_buffer, &inner_clone);
break;
}
Err(TryRecvError::Empty) => {}
}
}
let _ = completion_tx.send(());
})
.expect("Failed to spawn mutex-stats-collector thread");
crate::metrics_server::start_metrics_server_once(*METRICS_SERVER_PORT);
MutexesState {
event_tx,
inner,
shutdown_tx: StdMutex::new(Some(shutdown_tx)),
completion_rx: StdMutex::new(Some(completion_rx)),
}
})
}
pub(crate) fn compare_mutex_entries(a: &MutexEntry, b: &MutexEntry) -> std::cmp::Ordering {
match (a.label.is_some(), b.label.is_some()) {
(true, false) => std::cmp::Ordering::Less,
(false, true) => std::cmp::Ordering::Greater,
(true, true) => a
.label
.as_ref()
.unwrap()
.cmp(b.label.as_ref().unwrap())
.then_with(|| a.iter.cmp(&b.iter)),
(false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
}
}
#[doc(hidden)]
pub trait InstrumentMutex {
type Output;
fn instrument(self, source: &'static str, label: Option<String>) -> Self::Output;
}
#[macro_export]
macro_rules! mutex {
($expr:expr) => {{
const MUTEX_ID: &'static str = concat!(file!(), ":", line!());
$crate::InstrumentMutex::instrument($expr, MUTEX_ID, None)
}};
($expr:expr, label = $label:expr) => {{
const MUTEX_ID: &'static str = concat!(file!(), ":", line!());
$crate::InstrumentMutex::instrument($expr, MUTEX_ID, Some($label.to_string()))
}};
}