use std::collections::HashMap;
use std::io::Write;
use prettytable::{color, Attr, Cell, Row, Table};
use crate::channels::{
channel_to_json, compare_channel_entries, resolve_label, ChannelEntry, CHANNELS_STATE,
};
use crate::debug::{
get_sorted_debug_dbg_entries, get_sorted_debug_gauge_entries, get_sorted_debug_val_entries,
};
use crate::futures::{compare_future_stats, FutureEntry, FUTURES_STATE};
use crate::json::JsonDebugEntry;
use crate::json::{
JsonChannelsList, JsonFutureEntry, JsonFuturesList, JsonMutexEntry, JsonMutexesList,
JsonRwLockEntry, JsonRwLocksList, JsonStreamEntry, JsonStreamsList,
};
use crate::mutexes::{compare_mutex_entries, MutexEntry, MUTEXES_STATE};
use crate::output::{
format_bytes, format_duration, format_percentile_header, format_percentile_key, format_rate,
};
use crate::output_on::write_section_header;
use crate::rw_locks::{compare_rw_lock_entries, RwLockEntry, RwLockKind, RW_LOCKS_STATE};
use crate::streams::{compare_stream_stats, StreamStats, STREAMS_STATE};
/// Builds a bold table-header cell for `text`.
///
/// When `crate::output::use_colors()` reports that colours are enabled the
/// cell is additionally given a cyan foreground.
fn styled_header(text: &str) -> Cell {
    let colored = crate::output::use_colors();
    let header = Cell::new(text).with_style(Attr::Bold);
    if colored {
        header.with_style(Attr::ForegroundColor(color::CYAN))
    } else {
        header
    }
}
/// Prints `table`, choosing the output path based on the colour setting.
///
/// Without colours the table is written to `writer`. With colours enabled the
/// table goes through `Table::print_tty(false)`, which is not given `writer`,
/// so in that mode the table does not pass through `writer` at all.
/// Printing errors are ignored in both cases.
fn print_table(table: &Table, writer: &mut dyn Write) {
    if !crate::output::use_colors() {
        let _ = table.print(writer);
        return;
    }
    let _ = table.print_tty(false);
}
/// Shuts down channel tracking and returns a sorted snapshot of its stats.
///
/// Steps, in order:
/// 1. flush the pending batch via `flush_channel_batch`;
/// 2. take the sender out of `shutdown_tx` (if still present) and send `()`,
///    ignoring a send error;
/// 3. take the receiver out of `completion_rx` (if still present) and wait on
///    `recv()`, ignoring its result;
/// 4. clone every value of `inner.stats` and sort with
///    `compare_channel_entries`.
///
/// Returns an empty vector when `CHANNELS_STATE` is not set or the `inner`
/// lock cannot be read. Both endpoints are `take`n, so a later call skips
/// steps 2 and 3.
pub(crate) fn shutdown_channels() -> Vec<ChannelEntry> {
    crate::channels::flush_channel_batch();

    let state = match CHANNELS_STATE.get() {
        Some(state) => state,
        None => return Vec::new(),
    };

    // The slot guard stays held while the signal is sent.
    if let Ok(mut shutdown_slot) = state.shutdown_tx.lock() {
        if let Some(tx) = shutdown_slot.take() {
            let _ = tx.send(());
        }
    }

    // The slot guard is released before waiting on the receiver.
    let completion = state
        .completion_rx
        .lock()
        .ok()
        .and_then(|mut slot| slot.take());
    if let Some(rx) = completion {
        let _ = rx.recv();
    }

    let mut channels: Vec<ChannelEntry> = match state.inner.read() {
        Ok(inner) => inner.stats.values().cloned().collect(),
        Err(_) => return Vec::new(),
    };
    channels.sort_by(compare_channel_entries);
    channels
}
/// Writes the "channels" section: one row of throughput figures per channel.
///
/// Nothing is written when `channels` is empty. When fewer entries are shown
/// than `total_count`, a ` (shown/total)` suffix follows the section header.
/// A channel without a recorded maximum queue size shows `-` in that column.
/// Write errors are ignored.
pub(crate) fn report_channels_table(
    channels: &[ChannelEntry],
    total_count: usize,
    writer: &mut dyn Write,
) {
    if channels.is_empty() {
        return;
    }

    write_section_header(writer, "channels", "Channel throughput statistics.");
    let shown = channels.len();
    if shown < total_count {
        let _ = write!(writer, " ({}/{})", shown, total_count);
    }
    let _ = writeln!(writer);

    let titles = [
        "Channel",
        "Type",
        "Sent",
        "Received",
        "Sent/s",
        "Recv/s",
        "Max queue",
    ];
    let mut table = Table::new();
    table.add_row(Row::new(
        titles.iter().map(|title| styled_header(title)).collect(),
    ));

    for entry in channels {
        let name = resolve_label(entry.source, entry.label.as_deref(), Some(entry.iter));
        let queue_peak = match entry.max_queue_size {
            Some(size) => size.to_string(),
            None => "-".to_string(),
        };
        let cells = vec![
            Cell::new(&name),
            Cell::new(&entry.channel_type.to_string()),
            Cell::new(&entry.sent_count.to_string()),
            Cell::new(&entry.received_count.to_string()),
            Cell::new(&format_rate(entry.sent_per_sec())),
            Cell::new(&format_rate(entry.received_per_sec())),
            Cell::new(&queue_peak),
        ];
        table.add_row(Row::new(cells));
    }

    print_table(&table, writer);
    let _ = writeln!(writer);
}
pub(crate) fn report_channel_latency_table(
channels: &[ChannelEntry],
percentiles: &[f64],
writer: &mut dyn Write,
) {
let rows: Vec<&ChannelEntry> = channels
.iter()
.filter(|c| c.has_proc_hist() && c.received_count > 0)
.collect();
if rows.is_empty() {
return;
}
write_section_header(
writer,
"channels latency",
"Channel send->receive latency statistics (wrap channels only).",
);
let _ = writeln!(writer);
let mut header = vec![
styled_header("Channel"),
styled_header("Msgs"),
styled_header("Avg"),
];
for &p in percentiles {
header.push(styled_header(&format_percentile_header(p)));
}
let mut table = Table::new();
table.add_row(Row::new(header));
for channel in rows {
let label = resolve_label(channel.source, channel.label.as_deref(), Some(channel.iter));
let mut row = vec![
Cell::new(&label),
Cell::new(&channel.received_count.to_string()),
Cell::new(&format_duration(channel.proc_avg_nanos())),
];
for &p in percentiles {
row.push(Cell::new(&format_duration(
channel.proc_percentile_nanos(p),
)));
}
table.add_row(Row::new(row));
}
print_table(&table, writer);
let _ = writeln!(writer);
}
/// Packages `channels` into a `JsonChannelsList`.
///
/// Each entry is converted with `channel_to_json` using `percentiles`, which
/// are also copied into the result. `elapsed` is stored in nanoseconds; the
/// `as u64` cast keeps only the low 64 bits of the nanosecond count.
pub(crate) fn collect_channels_json(
    channels: &[ChannelEntry],
    elapsed: std::time::Duration,
    percentiles: &[f64],
) -> JsonChannelsList {
    let current_elapsed_ns = elapsed.as_nanos() as u64;
    let data = channels
        .iter()
        .map(|channel| channel_to_json(channel, percentiles))
        .collect();
    JsonChannelsList {
        current_elapsed_ns,
        percentiles: percentiles.to_vec(),
        data,
    }
}
/// Shuts down rw-lock tracking and returns a sorted snapshot of its stats.
///
/// Steps, in order:
/// 1. flush the pending batch via `flush_rw_lock_batch`;
/// 2. take the sender out of `shutdown_tx` (if still present) and send `()`,
///    ignoring a send error;
/// 3. take the receiver out of `completion_rx` (if still present) and wait on
///    `recv()`, ignoring its result;
/// 4. clone every value of `inner.stats` and sort with
///    `compare_rw_lock_entries`.
///
/// Returns an empty vector when `RW_LOCKS_STATE` is not set or the `inner`
/// lock cannot be read. Both endpoints are `take`n, so a later call skips
/// steps 2 and 3.
pub(crate) fn shutdown_rw_locks() -> Vec<RwLockEntry> {
    crate::lib_on::rw_locks::flush_rw_lock_batch();

    let state = match RW_LOCKS_STATE.get() {
        Some(state) => state,
        None => return Vec::new(),
    };

    // The slot guard stays held while the signal is sent.
    if let Ok(mut shutdown_slot) = state.shutdown_tx.lock() {
        if let Some(tx) = shutdown_slot.take() {
            let _ = tx.send(());
        }
    }

    // The slot guard is released before waiting on the receiver.
    let completion = state
        .completion_rx
        .lock()
        .ok()
        .and_then(|mut slot| slot.take());
    if let Some(rx) = completion {
        let _ = rx.recv();
    }

    let mut rw_locks: Vec<RwLockEntry> = match state.inner.read() {
        Ok(inner) => inner.stats.values().cloned().collect(),
        Err(_) => return Vec::new(),
    };
    rw_locks.sort_by(compare_rw_lock_entries);
    rw_locks
}
/// Writes the "rw_locks" section: a read subtable followed by a write subtable.
///
/// Nothing is written when `rw_locks` is empty or when no entry has a
/// non-zero read or write count. In the latter case both subtables would be
/// skipped, so emitting the section header would leave it dangling with no
/// content; `report_mutexes_table` suppresses its section the same way.
///
/// When fewer entries are shown than `total_count`, a ` (shown/total)` suffix
/// follows the section header. Write errors are ignored.
pub(crate) fn report_rw_locks_table(
    rw_locks: &[RwLockEntry],
    total_count: usize,
    percentiles: &[f64],
    writer: &mut dyn Write,
) {
    // `any` is false for an empty slice, so this also covers the empty case.
    let has_activity = rw_locks
        .iter()
        .any(|lock| lock.count(RwLockKind::Read) > 0 || lock.count(RwLockKind::Write) > 0);
    if !has_activity {
        return;
    }

    write_section_header(writer, "rw_locks", "RwLock wait & acquire time statistics.");
    if rw_locks.len() < total_count {
        let _ = write!(writer, " ({}/{})", rw_locks.len(), total_count);
    }
    let _ = writeln!(writer);

    report_rw_locks_subtable(rw_locks, RwLockKind::Read, percentiles, writer);
    report_rw_locks_subtable(rw_locks, RwLockKind::Write, percentiles, writer);
}
/// Writes one rw-lock table for a single `kind` (reads or writes).
///
/// Only entries whose `count(kind)` is non-zero are listed; nothing is written
/// when none qualify. Columns are: label, count, wait average, one wait column
/// per percentile, acquire average, one acquire column per percentile.
/// Write errors are ignored.
fn report_rw_locks_subtable(
    rw_locks: &[RwLockEntry],
    kind: RwLockKind,
    percentiles: &[f64],
    writer: &mut dyn Write,
) {
    let active: Vec<&RwLockEntry> = rw_locks
        .iter()
        .filter(|entry| entry.count(kind) > 0)
        .collect();
    if active.is_empty() {
        return;
    }

    let count_title = match kind {
        RwLockKind::Read => "Reads",
        RwLockKind::Write => "Writes",
    };

    let mut titles = vec![
        styled_header("RwLock"),
        styled_header(count_title),
        styled_header("Wait avg"),
    ];
    titles.extend(
        percentiles
            .iter()
            .map(|&p| styled_header(&format!("Wait {}", format_percentile_header(p)))),
    );
    titles.push(styled_header("Acq avg"));
    titles.extend(
        percentiles
            .iter()
            .map(|&p| styled_header(&format!("Acq {}", format_percentile_header(p)))),
    );

    let mut table = Table::new();
    table.add_row(Row::new(titles));

    for entry in active {
        let name = resolve_label(entry.source, entry.label.as_deref(), Some(entry.iter));
        let mut cells = vec![
            Cell::new(&name),
            Cell::new(&entry.count(kind).to_string()),
            Cell::new(&format_duration(entry.wait_avg_nanos(kind))),
        ];
        cells.extend(
            percentiles
                .iter()
                .map(|&p| Cell::new(&format_duration(entry.wait_percentile_nanos(kind, p)))),
        );
        cells.push(Cell::new(&format_duration(entry.acquire_avg_nanos(kind))));
        cells.extend(
            percentiles
                .iter()
                .map(|&p| Cell::new(&format_duration(entry.acquire_percentile_nanos(kind, p)))),
        );
        table.add_row(Row::new(cells));
    }

    print_table(&table, writer);
    let _ = writeln!(writer);
}
fn rw_lock_to_json(rw_lock: &RwLockEntry, percentiles: &[f64]) -> JsonRwLockEntry {
let label = resolve_label(rw_lock.source, rw_lock.label.as_deref(), Some(rw_lock.iter));
let mut read_wait_percentiles = HashMap::new();
let mut write_wait_percentiles = HashMap::new();
let mut read_acquire_percentiles = HashMap::new();
let mut write_acquire_percentiles = HashMap::new();
for &p in percentiles {
let key = format_percentile_key(p);
read_wait_percentiles.insert(
key.clone(),
format_duration(rw_lock.wait_percentile_nanos(RwLockKind::Read, p)),
);
write_wait_percentiles.insert(
key.clone(),
format_duration(rw_lock.wait_percentile_nanos(RwLockKind::Write, p)),
);
read_acquire_percentiles.insert(
key.clone(),
format_duration(rw_lock.acquire_percentile_nanos(RwLockKind::Read, p)),
);
write_acquire_percentiles.insert(
key,
format_duration(rw_lock.acquire_percentile_nanos(RwLockKind::Write, p)),
);
}
JsonRwLockEntry {
id: rw_lock.id,
source: rw_lock.source.to_string(),
label,
has_custom_label: rw_lock.label.is_some(),
type_name: rw_lock.type_name.to_string(),
read_count: rw_lock.read_count,
write_count: rw_lock.write_count,
read_wait_avg: format_duration(rw_lock.wait_avg_nanos(RwLockKind::Read)),
write_wait_avg: format_duration(rw_lock.wait_avg_nanos(RwLockKind::Write)),
read_acquire_avg: format_duration(rw_lock.acquire_avg_nanos(RwLockKind::Read)),
write_acquire_avg: format_duration(rw_lock.acquire_avg_nanos(RwLockKind::Write)),
read_wait_percentiles,
write_wait_percentiles,
read_acquire_percentiles,
write_acquire_percentiles,
iter: rw_lock.iter,
}
}
/// Packages `rw_locks` into a `JsonRwLocksList`.
///
/// Each entry is converted with `rw_lock_to_json` using `percentiles`, which
/// are also copied into the result. `elapsed` is stored in nanoseconds; the
/// `as u64` cast keeps only the low 64 bits of the nanosecond count.
pub(crate) fn collect_rw_locks_json(
    rw_locks: &[RwLockEntry],
    elapsed: std::time::Duration,
    percentiles: &[f64],
) -> JsonRwLocksList {
    let current_elapsed_ns = elapsed.as_nanos() as u64;
    let data = rw_locks
        .iter()
        .map(|entry| rw_lock_to_json(entry, percentiles))
        .collect();
    JsonRwLocksList {
        current_elapsed_ns,
        percentiles: percentiles.to_vec(),
        data,
    }
}
/// Shuts down mutex tracking and returns a sorted snapshot of its stats.
///
/// Steps, in order:
/// 1. flush the pending batch via `flush_mutex_batch`;
/// 2. take the sender out of `shutdown_tx` (if still present) and send `()`,
///    ignoring a send error;
/// 3. take the receiver out of `completion_rx` (if still present) and wait on
///    `recv()`, ignoring its result;
/// 4. clone every value of `inner.stats` and sort with
///    `compare_mutex_entries`.
///
/// Returns an empty vector when `MUTEXES_STATE` is not set or the `inner`
/// lock cannot be read. Both endpoints are `take`n, so a later call skips
/// steps 2 and 3.
pub(crate) fn shutdown_mutexes() -> Vec<MutexEntry> {
    crate::lib_on::mutexes::flush_mutex_batch();

    let state = match MUTEXES_STATE.get() {
        Some(state) => state,
        None => return Vec::new(),
    };

    // The slot guard stays held while the signal is sent.
    if let Ok(mut shutdown_slot) = state.shutdown_tx.lock() {
        if let Some(tx) = shutdown_slot.take() {
            let _ = tx.send(());
        }
    }

    // The slot guard is released before waiting on the receiver.
    let completion = state
        .completion_rx
        .lock()
        .ok()
        .and_then(|mut slot| slot.take());
    if let Some(rx) = completion {
        let _ = rx.recv();
    }

    let mut mutexes: Vec<MutexEntry> = match state.inner.read() {
        Ok(inner) => inner.stats.values().cloned().collect(),
        Err(_) => return Vec::new(),
    };
    mutexes.sort_by(compare_mutex_entries);
    mutexes
}
/// Writes the "mutexes" section: wait and acquire timings per mutex.
///
/// Only entries with a non-zero `count` are listed, and nothing is written
/// when none qualify. The ` (shown/total)` suffix compares the length of the
/// `mutexes` slice (not the filtered rows) with `total_count`. Columns are:
/// label, lock count, wait average, one wait column per percentile, acquire
/// average, one acquire column per percentile. Write errors are ignored.
pub(crate) fn report_mutexes_table(
    mutexes: &[MutexEntry],
    total_count: usize,
    percentiles: &[f64],
    writer: &mut dyn Write,
) {
    let active: Vec<&MutexEntry> = mutexes.iter().filter(|entry| entry.count > 0).collect();
    if active.is_empty() {
        return;
    }

    write_section_header(writer, "mutexes", "Mutex wait & acquire time statistics.");
    let listed = mutexes.len();
    if listed < total_count {
        let _ = write!(writer, " ({}/{})", listed, total_count);
    }
    let _ = writeln!(writer);

    let mut titles = vec![
        styled_header("Mutex"),
        styled_header("Locks"),
        styled_header("Wait avg"),
    ];
    titles.extend(
        percentiles
            .iter()
            .map(|&p| styled_header(&format!("Wait {}", format_percentile_header(p)))),
    );
    titles.push(styled_header("Acq avg"));
    titles.extend(
        percentiles
            .iter()
            .map(|&p| styled_header(&format!("Acq {}", format_percentile_header(p)))),
    );

    let mut table = Table::new();
    table.add_row(Row::new(titles));

    for entry in active {
        let name = resolve_label(entry.source, entry.label.as_deref(), Some(entry.iter));
        let mut cells = vec![
            Cell::new(&name),
            Cell::new(&entry.count.to_string()),
            Cell::new(&format_duration(entry.wait_avg_nanos())),
        ];
        cells.extend(
            percentiles
                .iter()
                .map(|&p| Cell::new(&format_duration(entry.wait_percentile_nanos(p)))),
        );
        cells.push(Cell::new(&format_duration(entry.acquire_avg_nanos())));
        cells.extend(
            percentiles
                .iter()
                .map(|&p| Cell::new(&format_duration(entry.acquire_percentile_nanos(p)))),
        );
        table.add_row(Row::new(cells));
    }

    print_table(&table, writer);
    let _ = writeln!(writer);
}
fn mutex_to_json(mutex: &MutexEntry, percentiles: &[f64]) -> JsonMutexEntry {
let label = resolve_label(mutex.source, mutex.label.as_deref(), Some(mutex.iter));
let mut wait_percentiles = HashMap::new();
let mut acquire_percentiles = HashMap::new();
for &p in percentiles {
let key = format_percentile_key(p);
wait_percentiles.insert(key.clone(), format_duration(mutex.wait_percentile_nanos(p)));
acquire_percentiles.insert(key, format_duration(mutex.acquire_percentile_nanos(p)));
}
JsonMutexEntry {
id: mutex.id,
source: mutex.source.to_string(),
label,
has_custom_label: mutex.label.is_some(),
type_name: mutex.type_name.to_string(),
count: mutex.count,
wait_avg: format_duration(mutex.wait_avg_nanos()),
acquire_avg: format_duration(mutex.acquire_avg_nanos()),
wait_percentiles,
acquire_percentiles,
iter: mutex.iter,
}
}
/// Packages `mutexes` into a `JsonMutexesList`.
///
/// Each entry is converted with `mutex_to_json` using `percentiles`, which are
/// also copied into the result. `elapsed` is stored in nanoseconds; the
/// `as u64` cast keeps only the low 64 bits of the nanosecond count.
pub(crate) fn collect_mutexes_json(
    mutexes: &[MutexEntry],
    elapsed: std::time::Duration,
    percentiles: &[f64],
) -> JsonMutexesList {
    let current_elapsed_ns = elapsed.as_nanos() as u64;
    let data = mutexes
        .iter()
        .map(|entry| mutex_to_json(entry, percentiles))
        .collect();
    JsonMutexesList {
        current_elapsed_ns,
        percentiles: percentiles.to_vec(),
        data,
    }
}
/// Shuts down stream tracking and returns a sorted snapshot of its stats.
///
/// Steps, in order:
/// 1. flush the pending batch via `flush_stream_batch`;
/// 2. take the sender out of `shutdown_tx` (if still present) and send `()`,
///    ignoring a send error;
/// 3. take the receiver out of `completion_rx` (if still present) and wait on
///    `recv()`, ignoring its result;
/// 4. clone every value of `inner.stats` and sort with
///    `compare_stream_stats`.
///
/// Returns an empty vector when `STREAMS_STATE` is not set or the `inner`
/// lock cannot be read. Both endpoints are `take`n, so a later call skips
/// steps 2 and 3.
pub(crate) fn shutdown_streams() -> Vec<StreamStats> {
    crate::streams::flush_stream_batch();

    let state = match STREAMS_STATE.get() {
        Some(state) => state,
        None => return Vec::new(),
    };

    // The slot guard stays held while the signal is sent.
    if let Ok(mut shutdown_slot) = state.shutdown_tx.lock() {
        if let Some(tx) = shutdown_slot.take() {
            let _ = tx.send(());
        }
    }

    // The slot guard is released before waiting on the receiver.
    let completion = state
        .completion_rx
        .lock()
        .ok()
        .and_then(|mut slot| slot.take());
    if let Some(rx) = completion {
        let _ = rx.recv();
    }

    let mut streams: Vec<StreamStats> = match state.inner.read() {
        Ok(inner) => inner.stats.values().cloned().collect(),
        Err(_) => return Vec::new(),
    };
    streams.sort_by(compare_stream_stats);
    streams
}
/// Writes the "streams" section: state and yielded-item count per stream.
///
/// Nothing is written when `streams` is empty. When fewer entries are shown
/// than `total_count`, a ` (shown/total)` suffix follows the section header.
/// Write errors are ignored.
pub(crate) fn report_streams_table(
    streams: &[StreamStats],
    total_count: usize,
    writer: &mut dyn Write,
) {
    if streams.is_empty() {
        return;
    }

    write_section_header(writer, "streams", "Stream yield statistics.");
    let shown = streams.len();
    if shown < total_count {
        let _ = write!(writer, " ({}/{})", shown, total_count);
    }
    let _ = writeln!(writer);

    let mut table = Table::new();
    table.add_row(Row::new(
        ["Stream", "State", "Yielded"]
            .iter()
            .map(|title| styled_header(title))
            .collect(),
    ));

    for entry in streams {
        let name = resolve_label(entry.source, entry.label.as_deref(), Some(entry.iter));
        let cells = vec![
            Cell::new(&name),
            Cell::new(entry.state.as_str()),
            Cell::new(&entry.items_yielded.to_string()),
        ];
        table.add_row(Row::new(cells));
    }

    print_table(&table, writer);
    let _ = writeln!(writer);
}
/// Packages `streams` into a `JsonStreamsList`.
///
/// Each entry is converted through `JsonStreamEntry::from`. `elapsed` is
/// stored in nanoseconds; the `as u64` cast keeps only the low 64 bits of the
/// nanosecond count.
pub(crate) fn collect_streams_json(
    streams: &[StreamStats],
    elapsed: std::time::Duration,
) -> JsonStreamsList {
    let current_elapsed_ns = elapsed.as_nanos() as u64;
    let data = streams
        .iter()
        .map(|stream| JsonStreamEntry::from(stream))
        .collect();
    JsonStreamsList {
        current_elapsed_ns,
        data,
    }
}
/// Shuts down future tracking and returns a sorted snapshot of its stats.
///
/// Steps, in order:
/// 1. flush the pending batch via `flush_future_batch`;
/// 2. take the sender out of `shutdown_tx` (if still present) and send `()`,
///    ignoring a send error;
/// 3. take the receiver out of `completion_rx` (if still present) and wait on
///    `recv()`, ignoring its result;
/// 4. clone every value of `inner.stats` and sort with
///    `compare_future_stats`.
///
/// Returns an empty vector when `FUTURES_STATE` is not set or the `inner`
/// lock cannot be read. Both endpoints are `take`n, so a later call skips
/// steps 2 and 3.
pub(crate) fn shutdown_futures() -> Vec<FutureEntry> {
    crate::lib_on::futures::flush_future_batch();

    let state = match FUTURES_STATE.get() {
        Some(state) => state,
        None => return Vec::new(),
    };

    // The slot guard stays held while the signal is sent.
    if let Ok(mut shutdown_slot) = state.shutdown_tx.lock() {
        if let Some(tx) = shutdown_slot.take() {
            let _ = tx.send(());
        }
    }

    // The slot guard is released before waiting on the receiver.
    let completion = state
        .completion_rx
        .lock()
        .ok()
        .and_then(|mut slot| slot.take());
    if let Some(rx) = completion {
        let _ = rx.recv();
    }

    let mut futures: Vec<FutureEntry> = match state.inner.read() {
        Ok(inner) => inner.stats.values().cloned().collect(),
        Err(_) => return Vec::new(),
    };
    futures.sort_by(compare_future_stats);
    futures
}
/// Writes the "futures" section: call, poll and allocation figures per future.
///
/// Nothing is written when `futures` is empty. When fewer entries are shown
/// than `total_count`, a ` (shown/total)` suffix follows the section header.
///
/// Cells fall back to `-` when a figure is unavailable:
/// * "Avg Poll" when the poll count is zero (`checked_div` yields `None`);
/// * "Avg Alloc" when no allocation total exists or the call count is zero;
/// * "Total Alloc" when no allocation total exists.
///
/// Write errors are ignored.
pub(crate) fn report_futures_table(
    futures: &[FutureEntry],
    total_count: usize,
    writer: &mut dyn Write,
) {
    if futures.is_empty() {
        return;
    }

    write_section_header(writer, "futures", "Future poll and lifecycle statistics.");
    let shown = futures.len();
    if shown < total_count {
        let _ = write!(writer, " ({}/{})", shown, total_count);
    }
    let _ = writeln!(writer);

    let titles = [
        "Future",
        "Calls",
        "Polls",
        "Avg Poll",
        "Total Poll",
        "Avg Alloc",
        "Total Alloc",
    ];
    let mut table = Table::new();
    table.add_row(Row::new(
        titles.iter().map(|title| styled_header(title)).collect(),
    ));

    for entry in futures {
        let name = resolve_label(entry.source, entry.label.as_deref(), None);
        let calls = entry.logs_count;
        let polls = entry.total_polls();
        let poll_time = entry.total_poll_duration_ns();
        let alloc_total = entry.total_poll_alloc_bytes();

        let avg_poll = poll_time
            .checked_div(polls)
            .map_or_else(|| "-".to_string(), |avg| format_duration(avg));
        let avg_alloc = alloc_total
            .filter(|_| calls > 0)
            .map_or_else(|| "-".to_string(), |bytes| format_bytes(bytes / calls));
        let total_alloc = match alloc_total {
            Some(bytes) => format_bytes(bytes),
            None => "-".to_string(),
        };

        let cells = vec![
            Cell::new(&name),
            Cell::new(&calls.to_string()),
            Cell::new(&polls.to_string()),
            Cell::new(&avg_poll),
            Cell::new(&format_duration(poll_time)),
            Cell::new(&avg_alloc),
            Cell::new(&total_alloc),
        ];
        table.add_row(Row::new(cells));
    }

    print_table(&table, writer);
    let _ = writeln!(writer);
}
/// Packages `futures` into a `JsonFuturesList`.
///
/// Each entry is converted through `JsonFutureEntry::from`. `elapsed` is
/// stored in nanoseconds; the `as u64` cast keeps only the low 64 bits of the
/// nanosecond count.
pub(crate) fn collect_futures_json(
    futures: &[FutureEntry],
    elapsed: std::time::Duration,
) -> JsonFuturesList {
    let current_elapsed_ns = elapsed.as_nanos() as u64;
    let data = futures
        .iter()
        .map(|future| JsonFutureEntry::from(future))
        .collect();
    JsonFuturesList {
        current_elapsed_ns,
        data,
    }
}
/// Writes the "threads" section: CPU and, when available, memory per thread.
///
/// Thread data comes from `crate::threads::get_threads_json()`. Nothing is
/// written when it contains no threads. A non-zero `limit` smaller than the
/// thread count truncates the list to `limit` entries; `0` means no limit.
///
/// The Alloc/Dealloc/Diff columns appear only when at least one displayed
/// thread has `alloc_bytes`. Missing per-thread values show `-`. After the
/// section header, a parenthesised summary lists whichever of RSS, Alloc,
/// Dealloc and Diff totals are present, plus `shown/total` when truncated.
/// Write errors are ignored.
#[cfg(feature = "threads")]
pub(crate) fn report_threads_table(writer: &mut dyn Write, limit: usize) {
    let mut snapshot = crate::threads::get_threads_json();
    let total_count = snapshot.data.len();
    if total_count == 0 {
        return;
    }
    if limit != 0 && total_count > limit {
        snapshot.data.truncate(limit);
    }

    write_section_header(writer, "threads", "Thread CPU and memory statistics.");

    // Decided on the (possibly truncated) list that will actually be shown.
    let show_memory = snapshot
        .data
        .iter()
        .any(|thread| thread.alloc_bytes.is_some());

    let mut titles: Vec<Cell> = ["Thread", "Status", "CPU%", "Max%", "Avg%"]
        .iter()
        .map(|title| styled_header(title))
        .collect();
    if show_memory {
        titles.extend(
            ["Alloc", "Dealloc", "Diff"]
                .iter()
                .map(|title| styled_header(title)),
        );
    }

    let mut table = Table::new();
    table.add_row(Row::new(titles));

    for thread in &snapshot.data {
        let mut cells = vec![
            Cell::new(&thread.name),
            Cell::new(&thread.status),
            Cell::new(thread.cpu_percent.as_deref().unwrap_or("-")),
            Cell::new(thread.cpu_percent_max.as_deref().unwrap_or("-")),
            Cell::new(thread.cpu_percent_avg.as_deref().unwrap_or("-")),
        ];
        if show_memory {
            cells.push(Cell::new(thread.alloc_bytes.as_deref().unwrap_or("-")));
            cells.push(Cell::new(thread.dealloc_bytes.as_deref().unwrap_or("-")));
            cells.push(Cell::new(thread.mem_diff.as_deref().unwrap_or("-")));
        }
        table.add_row(Row::new(cells));
    }

    // Extending a Vec with an Option appends its value only when it is Some.
    let mut summary: Vec<String> = Vec::new();
    summary.extend(
        snapshot
            .rss_bytes
            .as_ref()
            .map(|rss| format!("RSS: {}", rss)),
    );
    summary.extend(
        snapshot
            .total_alloc_bytes
            .as_ref()
            .map(|alloc| format!("Alloc: {}", alloc)),
    );
    summary.extend(
        snapshot
            .total_dealloc_bytes
            .as_ref()
            .map(|dealloc| format!("Dealloc: {}", dealloc)),
    );
    summary.extend(
        snapshot
            .alloc_dealloc_diff
            .as_ref()
            .map(|diff| format!("Diff: {}", diff)),
    );
    let shown = snapshot.data.len();
    if shown < total_count {
        summary.push(format!("{}/{}", shown, total_count));
    }

    if !summary.is_empty() {
        let _ = write!(writer, " ({})", summary.join(", "));
    }
    let _ = writeln!(writer);
    print_table(&table, writer);
    let _ = writeln!(writer);
}
/// Returns the threads JSON snapshot, optionally truncated.
///
/// A non-zero `limit` smaller than the number of threads truncates `data` to
/// `limit` entries; `0` means no limit. All other fields of the snapshot are
/// returned as produced by `crate::threads::get_threads_json()`.
#[cfg(feature = "threads")]
pub(crate) fn collect_threads_json(limit: usize) -> crate::json::JsonThreadsList {
    let mut snapshot = crate::threads::get_threads_json();
    let total = snapshot.data.len();
    let needs_trim = limit != 0 && total > limit;
    if needs_trim {
        snapshot.data.truncate(limit);
    }
    snapshot
}
pub(crate) fn report_debug_table(writer: &mut dyn Write) {
let dbg_entries = get_sorted_debug_dbg_entries();
let val_entries = get_sorted_debug_val_entries();
let gauge_entries = get_sorted_debug_gauge_entries();
if dbg_entries.is_empty() && val_entries.is_empty() && gauge_entries.is_empty() {
return;
}
write_section_header(writer, "debug", "Debug last values (dbg!, val!, gauge!).");
let header = vec![
styled_header("Type"),
styled_header("Key/Expr"),
styled_header("Value"),
styled_header("Updates"),
styled_header("Source"),
];
let mut table = Table::new();
table.add_row(Row::new(header));
let mut entries: Vec<JsonDebugEntry> = Vec::new();
entries.extend(dbg_entries.iter().map(JsonDebugEntry::from));
entries.extend(val_entries.iter().map(JsonDebugEntry::from));
entries.extend(gauge_entries.iter().map(JsonDebugEntry::from));
for entry in &entries {
let value = entry.last_value.as_deref().unwrap_or("-");
table.add_row(Row::new(vec![
Cell::new(entry.entry_type.as_str()),
Cell::new(&entry.expression),
Cell::new(value),
Cell::new(&entry.log_count.to_string()),
Cell::new(&entry.source_display),
]));
}
let _ = writeln!(writer);
print_table(&table, writer);
let _ = writeln!(writer);
}
/// Collects all debug entries into a `JsonDebugList`.
///
/// Entries appear in three groups, in this order: dbg, val, gauge, each in the
/// order returned by its `get_sorted_debug_*_entries` getter. `elapsed` is
/// stored in nanoseconds; the `as u64` cast keeps only the low 64 bits of the
/// nanosecond count.
pub(crate) fn collect_debug_json(elapsed: std::time::Duration) -> crate::json::JsonDebugList {
    let dbg_entries = get_sorted_debug_dbg_entries();
    let val_entries = get_sorted_debug_val_entries();
    let gauge_entries = get_sorted_debug_gauge_entries();

    let entries: Vec<JsonDebugEntry> = dbg_entries
        .iter()
        .map(JsonDebugEntry::from)
        .chain(val_entries.iter().map(JsonDebugEntry::from))
        .chain(gauge_entries.iter().map(JsonDebugEntry::from))
        .collect();

    let current_elapsed_ns = elapsed.as_nanos() as u64;
    crate::json::JsonDebugList {
        current_elapsed_ns,
        entries,
    }
}