use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use sysinfo::{Pid, ProcessRefreshKind, System};
#[derive(Debug, Default)]
pub struct StageMetrics {
pub name: &'static str,
pub threads: usize,
pub wall_time: Duration,
pub input_wait: Duration,
pub output_wait: Duration,
pub items_processed: usize,
pub secondary_count: usize,
pub secondary_label: &'static str,
}
impl StageMetrics {
pub fn throughput(&self) -> Option<f64> {
let secs = self.wall_time.as_secs_f64();
if secs >= 0.01 {
Some(self.items_processed as f64 / secs)
} else {
None
}
}
pub fn active_time(&self) -> Duration {
self.wall_time
.saturating_sub(self.input_wait)
.saturating_sub(self.output_wait)
}
pub fn percentage_of(&self, total: Duration) -> f64 {
if total.as_nanos() > 0 {
(self.wall_time.as_nanos() as f64 / total.as_nanos() as f64) * 100.0
} else {
0.0
}
}
}
#[derive(Debug)]
pub struct StageTracker {
name: &'static str,
threads: usize,
start: Instant,
items: AtomicUsize,
secondary: AtomicUsize,
secondary_label: &'static str,
input_wait_ns: AtomicU64,
output_wait_ns: AtomicU64,
}
impl StageTracker {
pub fn new(name: &'static str, threads: usize) -> Self {
Self {
name,
threads,
start: Instant::now(),
items: AtomicUsize::new(0),
secondary: AtomicUsize::new(0),
secondary_label: "",
input_wait_ns: AtomicU64::new(0),
output_wait_ns: AtomicU64::new(0),
}
}
pub fn with_secondary(mut self, label: &'static str) -> Self {
self.secondary_label = label;
self
}
pub fn record_item(&self) {
self.items.fetch_add(1, Ordering::Relaxed);
}
pub fn record_items(&self, count: usize) {
self.items.fetch_add(count, Ordering::Relaxed);
}
pub fn record_secondary(&self, count: usize) {
self.secondary.fetch_add(count, Ordering::Relaxed);
}
pub fn record_input_wait(&self, duration: Duration) {
self.input_wait_ns
.fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
}
pub fn record_output_wait(&self, duration: Duration) {
self.output_wait_ns
.fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
}
pub fn finalize(self) -> StageMetrics {
StageMetrics {
name: self.name,
threads: self.threads,
wall_time: self.start.elapsed(),
input_wait: Duration::from_nanos(self.input_wait_ns.load(Ordering::Relaxed)),
output_wait: Duration::from_nanos(self.output_wait_ns.load(Ordering::Relaxed)),
items_processed: self.items.load(Ordering::Relaxed),
secondary_count: self.secondary.load(Ordering::Relaxed),
secondary_label: self.secondary_label,
}
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct MemorySnapshot {
pub rss: u64,
pub virtual_mem: u64,
}
impl MemorySnapshot {
pub fn current() -> Self {
let mut sys = System::new();
let pid = Pid::from_u32(std::process::id());
sys.refresh_processes_specifics(
sysinfo::ProcessesToUpdate::Some(&[pid]),
true,
ProcessRefreshKind::nothing().with_memory(),
);
if let Some(process) = sys.process(pid) {
Self {
rss: process.memory(),
virtual_mem: process.virtual_memory(),
}
} else {
Self::default()
}
}
pub fn rss_human(&self) -> String {
format_bytes(self.rss)
}
}
fn format_bytes(bytes: u64) -> String {
const KB: u64 = 1024;
const MB: u64 = KB * 1024;
const GB: u64 = MB * 1024;
if bytes >= GB {
format!("{:.1}GB", bytes as f64 / GB as f64)
} else if bytes >= MB {
format!("{:.1}MB", bytes as f64 / MB as f64)
} else if bytes >= KB {
format!("{:.1}KB", bytes as f64 / KB as f64)
} else {
format!("{bytes}B")
}
}
#[derive(Debug, Default)]
pub struct PipelineReport {
pub directory: String,
pub stages: Vec<StageMetrics>,
pub memory_start: MemorySnapshot,
pub memory_end: MemorySnapshot,
pub total_time: Duration,
}
impl PipelineReport {
pub fn new(directory: impl Into<String>) -> Self {
Self {
directory: directory.into(),
stages: Vec::new(),
memory_start: MemorySnapshot::current(),
memory_end: MemorySnapshot::default(),
total_time: Duration::ZERO,
}
}
pub fn add_stage(&mut self, metrics: StageMetrics) {
self.stages.push(metrics);
}
pub fn finalize(&mut self, total_time: Duration) {
self.total_time = total_time;
self.memory_end = MemorySnapshot::current();
}
pub fn bottleneck(&self) -> Option<&StageMetrics> {
self.stages
.iter()
.max_by(|a, b| a.wall_time.cmp(&b.wall_time))
}
pub fn log(&self) {
tracing::info!(target: "pipeline", "");
tracing::info!(target: "pipeline", "========================================");
tracing::info!(target: "pipeline", "PIPELINE TRACE: {}", self.directory);
tracing::info!(target: "pipeline", "========================================");
tracing::info!(target: "pipeline",
"{:<10} {:>7} {:>10} {:>14} {:>12}",
"Stage", "Threads", "Time", "Throughput", "Wait"
);
tracing::info!(target: "pipeline", "{}", "-".repeat(60));
for stage in &self.stages {
let throughput = match stage.throughput() {
Some(t) => format!("{t:.0}/s"),
None => "-".to_string(),
};
let wait = stage.input_wait + stage.output_wait;
let wait_str = if wait.as_millis() > 0 {
format!("{:.1}s", wait.as_secs_f64())
} else {
"-".to_string()
};
tracing::info!(target: "pipeline",
"{:<10} {:>7} {:>10} {:>14} {:>12}",
stage.name,
stage.threads,
format!("{:.2}s", stage.wall_time.as_secs_f64()),
throughput,
wait_str
);
if !stage.secondary_label.is_empty() && stage.secondary_count > 0 {
tracing::info!(target: "pipeline",
" {:>7} {:>10}",
"",
format!("{} {}", stage.secondary_count, stage.secondary_label)
);
}
}
tracing::info!(target: "pipeline", "{}", "-".repeat(60));
let mem_delta = self.memory_end.rss.saturating_sub(self.memory_start.rss);
let bottleneck = self
.bottleneck()
.map(|s| format!("{} ({:.0}%)", s.name, s.percentage_of(self.total_time)))
.unwrap_or_else(|| "-".to_string());
tracing::info!(target: "pipeline",
"Total: {:.2}s | Memory: {} -> {} (+{}) | Bottleneck: {}",
self.total_time.as_secs_f64(),
self.memory_start.rss_human(),
self.memory_end.rss_human(),
format_bytes(mem_delta),
bottleneck
);
tracing::info!(target: "pipeline", "");
}
}
#[derive(Debug)]
pub struct PipelineMetrics {
enabled: bool,
report: std::sync::Mutex<PipelineReport>,
}
impl PipelineMetrics {
pub fn new(directory: impl Into<String>, enabled: bool) -> Arc<Self> {
Arc::new(Self {
enabled,
report: std::sync::Mutex::new(PipelineReport::new(directory)),
})
}
pub fn is_enabled(&self) -> bool {
self.enabled
}
pub fn add_stage(&self, metrics: StageMetrics) {
if self.enabled {
if let Ok(mut report) = self.report.lock() {
report.add_stage(metrics);
}
}
}
pub fn finalize(&self, total_time: Duration) {
if self.enabled {
if let Ok(mut report) = self.report.lock() {
report.finalize(total_time);
}
}
}
pub fn log(&self) {
if self.enabled {
if let Ok(report) = self.report.lock() {
report.log();
}
}
}
pub fn finalize_and_log(&self, total_time: Duration) {
if self.enabled {
if let Ok(mut report) = self.report.lock() {
report.finalize(total_time);
report.log();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stage_tracker() {
let tracker = StageTracker::new("TEST", 4).with_secondary("symbols");
tracker.record_items(100);
tracker.record_secondary(500);
tracker.record_input_wait(Duration::from_millis(50));
std::thread::sleep(Duration::from_millis(10));
let metrics = tracker.finalize();
assert_eq!(metrics.name, "TEST");
assert_eq!(metrics.threads, 4);
assert_eq!(metrics.items_processed, 100);
assert_eq!(metrics.secondary_count, 500);
assert!(metrics.wall_time >= Duration::from_millis(10));
}
#[test]
fn test_memory_snapshot() {
let snapshot = MemorySnapshot::current();
assert!(snapshot.rss > 0);
}
#[test]
fn test_format_bytes() {
assert_eq!(format_bytes(500), "500B");
assert_eq!(format_bytes(1500), "1.5KB");
assert_eq!(format_bytes(1_500_000), "1.4MB");
assert_eq!(format_bytes(1_500_000_000), "1.4GB");
}
}