use std::collections::HashMap;
use std::sync::{Arc, LazyLock, OnceLock, RwLock};
use std::time::Duration;
use crate::instant::Instant;
#[cfg(target_os = "macos")]
#[path = "threads/collector_macos.rs"]
mod collector;
#[cfg(target_os = "linux")]
#[path = "threads/collector_linux.rs"]
mod collector;
pub(crate) use crate::json::ThreadMetrics;
use crate::json::{format_bytes_signed, JsonThreadEntry, JsonThreadsList};
use crate::output::format_bytes;
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
pub(crate) fn thread_metrics_with_percentage(
mut metrics: ThreadMetrics,
prev: Option<&ThreadMetrics>,
elapsed_secs: f64,
) -> ThreadMetrics {
if let Some(prev_metrics) = prev {
if prev_metrics.os_tid == metrics.os_tid && elapsed_secs > 0.0 {
let cpu_delta = metrics.cpu_total - prev_metrics.cpu_total;
metrics.cpu_percent = Some((cpu_delta / elapsed_secs) * 100.0);
}
}
metrics
}
#[allow(dead_code)]
struct ThreadsState {
previous_metrics: HashMap<u64, ThreadMetrics>,
current_metrics: Vec<ThreadMetrics>,
last_sample_time: Instant,
sample_interval: Duration,
start_time: Instant,
max_cpu_percent: HashMap<u64, f64>,
}
type ThreadsStateRef = Arc<RwLock<ThreadsState>>;
static THREADS_STATE: OnceLock<ThreadsStateRef> = OnceLock::new();
static THREADS_INTERVAL_MS: LazyLock<u64> = LazyLock::new(|| {
std::env::var("HOTPATH_THREADS_INTERVAL_MS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(250)
});
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
pub(crate) fn init_threads_monitoring() {
THREADS_STATE.get_or_init(|| {
let sample_interval_ms = *THREADS_INTERVAL_MS;
let sample_interval = Duration::from_millis(sample_interval_ms);
let start_time = Instant::now();
let state = Arc::new(RwLock::new(ThreadsState {
previous_metrics: HashMap::new(),
current_metrics: Vec::new(),
last_sample_time: start_time,
sample_interval,
start_time,
max_cpu_percent: HashMap::new(),
}));
let state_clone = Arc::clone(&state);
std::thread::Builder::new()
.name("hp-threads".into())
.spawn(move || {
let _suspend = crate::lib_on::SuspendAllocTracking::new();
collector_loop(state_clone, sample_interval);
})
.expect("Failed to spawn thread-metrics-collector thread");
state
});
}
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
#[cfg(any(target_os = "macos", target_os = "linux"))]
fn collector_loop(state: ThreadsStateRef, interval: Duration) {
loop {
match collector::collect_thread_metrics() {
Ok(raw_metrics) => {
let mut state_guard = match state.write() {
Ok(guard) => guard,
Err(_) => continue,
};
let elapsed_secs = state_guard.last_sample_time.elapsed().as_secs_f64();
let profiler_elapsed = state_guard.start_time.elapsed().as_secs_f64();
let mut new_metrics = Vec::with_capacity(raw_metrics.len());
for metric in raw_metrics {
let prev = state_guard.previous_metrics.get(&metric.os_tid);
#[allow(unused_mut)]
let mut m_with_percent =
thread_metrics_with_percentage(metric.clone(), prev, elapsed_secs);
#[cfg(feature = "hotpath-alloc")]
if let Some((alloc, dealloc)) =
super::functions::alloc::core::get_thread_alloc_stats(m_with_percent.os_tid)
{
m_with_percent.alloc_bytes = Some(alloc);
m_with_percent.dealloc_bytes = Some(dealloc);
m_with_percent.mem_diff = Some(alloc as i64 - dealloc as i64);
}
if profiler_elapsed > 0.0 {
m_with_percent.cpu_percent_avg =
Some((m_with_percent.cpu_total / profiler_elapsed) * 100.0);
}
if let Some(pct) = m_with_percent.cpu_percent {
let max = state_guard
.max_cpu_percent
.entry(m_with_percent.os_tid)
.or_insert(0.0);
if pct > *max {
*max = pct;
}
m_with_percent.cpu_percent_max = Some(*max);
} else if let Some(&max) =
state_guard.max_cpu_percent.get(&m_with_percent.os_tid)
{
m_with_percent.cpu_percent_max = Some(max);
}
new_metrics.push(m_with_percent);
}
state_guard.previous_metrics =
new_metrics.iter().map(|m| (m.os_tid, m.clone())).collect();
state_guard.current_metrics = new_metrics;
state_guard.last_sample_time = Instant::now();
}
Err(e) => {
eprintln!("[hotpath] Failed to collect thread metrics: {}", e);
}
}
std::thread::sleep(interval);
}
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn collector_loop(_state: ThreadsStateRef, _interval: Duration) {
loop {
std::thread::sleep(Duration::from_secs(3600));
}
}
#[cfg(any(target_os = "macos", target_os = "linux"))]
fn get_rss_bytes() -> Option<u64> {
collector::get_rss_bytes()
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn get_rss_bytes() -> Option<u64> {
None
}
#[cfg_attr(feature = "hotpath-meta", hotpath_meta::measure(log = true))]
pub(crate) fn get_threads_json() -> JsonThreadsList {
let rss_bytes = get_rss_bytes();
if let Some(state) = THREADS_STATE.get() {
if let Ok(state_guard) = state.read() {
let current_elapsed_ns = state_guard.start_time.elapsed().as_nanos() as u64;
let (total_alloc, total_dealloc) =
state_guard
.current_metrics
.iter()
.fold((0u64, 0u64), |(alloc, dealloc), m| {
(
alloc + m.alloc_bytes.unwrap_or(0),
dealloc + m.dealloc_bytes.unwrap_or(0),
)
});
let has_alloc_data = state_guard
.current_metrics
.iter()
.any(|m| m.alloc_bytes.is_some());
let (total_alloc_bytes, total_dealloc_bytes, alloc_dealloc_diff) = if has_alloc_data {
let diff = total_alloc as i64 - total_dealloc as i64;
(
Some(format_bytes(total_alloc)),
Some(format_bytes(total_dealloc)),
Some(format_bytes_signed(diff)),
)
} else {
(None, None, None)
};
let mut sorted_metrics: Vec<&ThreadMetrics> =
state_guard.current_metrics.iter().collect();
#[cfg(feature = "hotpath-alloc")]
sorted_metrics
.sort_by(|a, b| b.alloc_bytes.unwrap_or(0).cmp(&a.alloc_bytes.unwrap_or(0)));
#[cfg(not(feature = "hotpath-alloc"))]
sorted_metrics.sort_by(|a, b| {
b.cpu_percent_max
.unwrap_or(0.0)
.total_cmp(&a.cpu_percent_max.unwrap_or(0.0))
});
return JsonThreadsList {
current_elapsed_ns,
sample_interval_ms: state_guard.sample_interval.as_millis() as u64,
data: sorted_metrics
.iter()
.map(|m| JsonThreadEntry::from(*m))
.collect(),
thread_count: state_guard.current_metrics.len(),
rss_bytes: rss_bytes.map(format_bytes),
total_alloc_bytes,
total_dealloc_bytes,
alloc_dealloc_diff,
};
}
}
JsonThreadsList {
current_elapsed_ns: 0,
sample_interval_ms: *THREADS_INTERVAL_MS,
data: Vec::new(),
thread_count: 0,
rss_bytes: rss_bytes.map(format_bytes),
total_alloc_bytes: None,
total_dealloc_bytes: None,
alloc_dealloc_diff: None,
}
}