use std::fmt::Write as FmtWrite;
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use super::resources::{read_cpu_times, read_meminfo, CpuTimes};
#[derive(Debug, Clone)]
pub struct GpuTimelineSample {
pub device: u8,
pub compute_util: u8,
pub vram_used_bytes: u64,
pub vram_allocated_bytes: u64,
pub vram_total_bytes: u64,
}
#[derive(Debug, Clone)]
pub struct TimelineSample {
pub elapsed_ms: u64,
pub cpu_util: f32,
pub ram_used_bytes: u64,
pub ram_total_bytes: u64,
pub gpus: Vec<GpuTimelineSample>,
}
#[derive(Debug, Clone)]
pub struct TimelineEvent {
pub elapsed_ms: u64,
pub kind: EventKind,
}
#[derive(Debug, Clone)]
pub enum EventKind {
EpochStart { epoch: usize },
EpochEnd { epoch: usize, loss: f64 },
SyncStart,
SyncEnd { duration_ms: f64 },
CpuAvgStart,
CpuAvgEnd { duration_ms: f64 },
AnchorChanged { from: usize, to: usize },
Throttle { rank: usize },
Idle { device: u8, duration_ms: f64 },
Custom { label: String },
}
#[derive(Debug, Clone)]
pub struct TimelineSummary {
pub total_ms: u64,
pub sample_count: usize,
pub event_count: usize,
pub gpu_idle_pct: Vec<f64>,
pub sync_count: usize,
pub avg_sync_ms: f64,
pub cpu_avg_count: usize,
pub avg_cpu_avg_ms: f64,
pub anchor_change_count: usize,
pub throttle_count: usize,
}
#[derive(Debug, Clone)]
pub struct TimelineBroadcast {
pub samples: Vec<TimelineSample>,
pub events: Vec<TimelineEvent>,
}
pub struct Timeline {
start: Instant,
poll_interval_ms: u64,
broadcast_interval_ms: u64,
samples: Mutex<Vec<TimelineSample>>,
events: Mutex<Vec<TimelineEvent>>,
stop_flag: AtomicBool,
poller_handle: Mutex<Option<JoinHandle<()>>>,
subscribers: Mutex<Vec<mpsc::Sender<TimelineBroadcast>>>,
pending_samples: Mutex<Vec<TimelineSample>>,
pending_events: Mutex<Vec<TimelineEvent>>,
}
impl Timeline {
pub fn new(poll_interval_ms: u64) -> Arc<Self> {
Arc::new(Self {
start: Instant::now(),
poll_interval_ms,
broadcast_interval_ms: 1000,
samples: Mutex::new(Vec::new()),
events: Mutex::new(Vec::new()),
stop_flag: AtomicBool::new(false),
poller_handle: Mutex::new(None),
subscribers: Mutex::new(Vec::new()),
pending_samples: Mutex::new(Vec::new()),
pending_events: Mutex::new(Vec::new()),
})
}
pub fn with_intervals(poll_interval_ms: u64, broadcast_interval_ms: u64) -> Arc<Self> {
Arc::new(Self {
start: Instant::now(),
poll_interval_ms,
broadcast_interval_ms,
samples: Mutex::new(Vec::new()),
events: Mutex::new(Vec::new()),
stop_flag: AtomicBool::new(false),
poller_handle: Mutex::new(None),
subscribers: Mutex::new(Vec::new()),
pending_samples: Mutex::new(Vec::new()),
pending_events: Mutex::new(Vec::new()),
})
}
pub fn subscribe(&self) -> mpsc::Receiver<TimelineBroadcast> {
let (tx, rx) = mpsc::channel();
self.subscribers.lock().unwrap().push(tx);
rx
}
pub fn start(self: &Arc<Self>) {
let mut handle = self.poller_handle.lock().unwrap();
if handle.is_some() {
return; }
self.stop_flag.store(false, Ordering::SeqCst);
let tl = Arc::clone(self);
*handle = Some(thread::spawn(move || tl.poll_loop()));
}
pub fn stop(&self) {
self.stop_flag.store(true, Ordering::SeqCst);
let handle = self.poller_handle.lock().unwrap().take();
if let Some(h) = handle {
let _ = h.join();
}
}
pub fn event(&self, kind: EventKind) {
let elapsed_ms = self.start.elapsed().as_millis() as u64;
let evt = TimelineEvent { elapsed_ms, kind };
self.events.lock().unwrap().push(evt.clone());
self.pending_events.lock().unwrap().push(evt);
}
pub fn elapsed_ms(&self) -> u64 {
self.start.elapsed().as_millis() as u64
}
pub fn idle_gaps(&self, device: u8, threshold_pct: u8, min_ms: u64) -> Vec<(u64, u64)> {
let samples = self.samples.lock().unwrap();
let mut gaps = Vec::new();
let mut gap_start: Option<u64> = None;
for s in samples.iter() {
let util = s
.gpus
.iter()
.find(|g| g.device == device)
.map(|g| g.compute_util)
.unwrap_or(100);
if util < threshold_pct {
if gap_start.is_none() {
gap_start = Some(s.elapsed_ms);
}
} else if let Some(start) = gap_start.take() {
let duration = s.elapsed_ms.saturating_sub(start);
if duration >= min_ms {
gaps.push((start, s.elapsed_ms));
}
}
}
if let Some(start) = gap_start {
if let Some(last) = samples.last() {
let duration = last.elapsed_ms.saturating_sub(start);
if duration >= min_ms {
gaps.push((start, last.elapsed_ms));
}
}
}
gaps
}
pub fn summary(&self) -> TimelineSummary {
let samples = self.samples.lock().unwrap();
let events = self.events.lock().unwrap();
let total_ms = samples.last().map(|s| s.elapsed_ms).unwrap_or(0);
let sample_count = samples.len();
let n_gpus = samples.first().map(|s| s.gpus.len()).unwrap_or(0);
let mut gpu_idle_pct = vec![0.0; n_gpus];
if sample_count > 0 {
for s in samples.iter() {
for (gi, g) in s.gpus.iter().enumerate() {
if g.compute_util < 5 {
gpu_idle_pct[gi] += 1.0;
}
}
}
for v in &mut gpu_idle_pct {
*v = *v / sample_count as f64 * 100.0;
}
}
let mut sync_count = 0usize;
let mut sync_total_ms = 0.0f64;
let mut sync_end_count = 0usize;
let mut cpu_avg_count = 0usize;
let mut cpu_avg_total_ms = 0.0f64;
let mut cpu_avg_end_count = 0usize;
let mut anchor_change_count = 0usize;
let mut throttle_count = 0usize;
for e in events.iter() {
match &e.kind {
EventKind::SyncStart => sync_count += 1,
EventKind::SyncEnd { duration_ms } => {
sync_total_ms += duration_ms;
sync_end_count += 1;
}
EventKind::CpuAvgStart => cpu_avg_count += 1,
EventKind::CpuAvgEnd { duration_ms } => {
cpu_avg_total_ms += duration_ms;
cpu_avg_end_count += 1;
}
EventKind::AnchorChanged { .. } => anchor_change_count += 1,
EventKind::Throttle { .. } => throttle_count += 1,
_ => {}
}
}
TimelineSummary {
total_ms,
sample_count,
event_count: events.len(),
gpu_idle_pct,
sync_count,
avg_sync_ms: if sync_end_count > 0 {
sync_total_ms / sync_end_count as f64
} else {
0.0
},
cpu_avg_count,
avg_cpu_avg_ms: if cpu_avg_end_count > 0 {
cpu_avg_total_ms / cpu_avg_end_count as f64
} else {
0.0
},
anchor_change_count,
throttle_count,
}
}
pub fn drain(&self) -> (Vec<TimelineSample>, Vec<TimelineEvent>) {
let mut samples = self.samples.lock().unwrap();
let mut events = self.events.lock().unwrap();
let s = std::mem::take(&mut *samples);
let e = std::mem::take(&mut *events);
(s, e)
}
pub fn sample_count(&self) -> usize {
self.samples.lock().unwrap().len()
}
pub fn save_json(&self, path: &str) -> io::Result<()> {
let samples = self.samples.lock().unwrap();
let events = self.events.lock().unwrap();
let mut out = String::with_capacity(samples.len() * 120 + events.len() * 80);
out.push_str("{\n\"samples\":[\n");
write_samples_json(&mut out, &samples);
out.push_str("],\n\"events\":[\n");
write_events_json(&mut out, &events);
out.push_str("]\n}\n");
let mut f = std::fs::File::create(path)?;
f.write_all(out.as_bytes())
}
pub fn save_csv(&self, path: &str) -> io::Result<()> {
let samples = self.samples.lock().unwrap();
let n_gpus = samples.first().map(|s| s.gpus.len()).unwrap_or(0);
let mut out = String::with_capacity(samples.len() * 80);
out.push_str("elapsed_ms,cpu_util,ram_used,ram_total");
for i in 0..n_gpus {
let _ = write!(
out,
",gpu{i}_util,gpu{i}_vram_alloc,gpu{i}_vram_used,gpu{i}_vram_total"
);
}
out.push('\n');
for s in samples.iter() {
let _ = write!(
out,
"{},{:.1},{},{}",
s.elapsed_ms, s.cpu_util, s.ram_used_bytes, s.ram_total_bytes,
);
for g in &s.gpus {
let _ = write!(
out,
",{},{},{},{}",
g.compute_util, g.vram_allocated_bytes, g.vram_used_bytes, g.vram_total_bytes,
);
}
out.push('\n');
}
let mut f = std::fs::File::create(path)?;
f.write_all(out.as_bytes())
}
pub fn save_html(&self, path: &str) -> io::Result<()> {
let samples = self.samples.lock().unwrap();
let events = self.events.lock().unwrap();
let template = include_str!("timeline.html");
let mut samples_json = String::with_capacity(samples.len() * 100);
write_samples_json(&mut samples_json, &samples);
let mut events_json = String::with_capacity(events.len() * 80);
write_events_json(&mut events_json, &events);
let inject = format!(
"<script>\nconst TIMELINE_SAMPLES=[{}];\nconst TIMELINE_EVENTS=[{}];\n</script>\n",
samples_json, events_json,
);
let html = template.replacen("<!-- TIMELINE_DATA -->", &inject, 1);
let mut f = std::fs::File::create(path)?;
f.write_all(html.as_bytes())
}
fn poll_loop(&self) {
let interval = Duration::from_millis(self.poll_interval_ms);
let broadcast_interval = Duration::from_millis(self.broadcast_interval_ms);
let mut prev_cpu: Option<CpuTimes> = None;
let mut last_broadcast = Instant::now();
let n_gpus = {
#[cfg(feature = "cuda")]
{
crate::tensor::cuda_device_count().max(0) as usize
}
#[cfg(not(feature = "cuda"))]
{
0usize
}
};
while !self.stop_flag.load(Ordering::SeqCst) {
let elapsed_ms = self.start.elapsed().as_millis() as u64;
let cur_cpu = read_cpu_times();
let cpu_util = match (&prev_cpu, &cur_cpu) {
(Some(prev), Some(cur)) => {
let dt = cur.total.saturating_sub(prev.total);
let di = cur.idle.saturating_sub(prev.idle);
if dt > 0 {
(dt.saturating_sub(di)) as f32 / dt as f32 * 100.0
} else {
0.0
}
}
_ => 0.0,
};
prev_cpu = cur_cpu;
let (ram_used, ram_total) = read_meminfo().unwrap_or((0, 0));
let mut gpus = Vec::with_capacity(n_gpus);
for i in 0..n_gpus {
#[cfg(feature = "cuda")]
let (compute_util, vram_used, vram_alloc, vram_total) = {
let idx = i as i32;
let util = crate::tensor::cuda_utilization_idx(idx)
.map(|u| u as u8)
.unwrap_or(0);
let (used, total) = crate::tensor::cuda_memory_info_idx(idx).unwrap_or((0, 0));
let alloc = crate::tensor::cuda_allocated_bytes_idx(idx).unwrap_or(0);
(util, used, alloc, total)
};
#[cfg(not(feature = "cuda"))]
let (compute_util, vram_used, vram_alloc, vram_total) = (0u8, 0u64, 0u64, 0u64);
gpus.push(GpuTimelineSample {
device: i as u8,
compute_util,
vram_used_bytes: vram_used,
vram_allocated_bytes: vram_alloc,
vram_total_bytes: vram_total,
});
}
let sample = TimelineSample {
elapsed_ms,
cpu_util,
ram_used_bytes: ram_used,
ram_total_bytes: ram_total,
gpus,
};
self.samples.lock().unwrap().push(sample.clone());
self.pending_samples.lock().unwrap().push(sample);
if last_broadcast.elapsed() >= broadcast_interval {
self.flush_broadcast();
last_broadcast = Instant::now();
}
let wake = Instant::now() + interval;
while Instant::now() < wake {
if self.stop_flag.load(Ordering::SeqCst) {
self.flush_broadcast();
return;
}
thread::sleep(Duration::from_millis(10));
}
}
}
fn flush_broadcast(&self) {
let samples = std::mem::take(&mut *self.pending_samples.lock().unwrap());
let events = std::mem::take(&mut *self.pending_events.lock().unwrap());
if samples.is_empty() && events.is_empty() {
return;
}
let batch = TimelineBroadcast { samples, events };
let mut subs = self.subscribers.lock().unwrap();
subs.retain(|tx| tx.send(batch.clone()).is_ok());
}
}
impl Drop for Timeline {
fn drop(&mut self) {
self.stop_flag.store(true, Ordering::SeqCst);
if let Some(h) = self.poller_handle.lock().unwrap().take() {
let _ = h.join();
}
}
}
impl std::fmt::Debug for Timeline {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Timeline")
.field("poll_interval_ms", &self.poll_interval_ms)
.field("broadcast_interval_ms", &self.broadcast_interval_ms)
.field("samples", &self.samples.lock().unwrap().len())
.field("events", &self.events.lock().unwrap().len())
.field("running", &!self.stop_flag.load(Ordering::Relaxed))
.finish()
}
}
fn write_samples_json(out: &mut String, samples: &[TimelineSample]) {
for (i, s) in samples.iter().enumerate() {
if i > 0 {
out.push_str(",\n");
}
let _ = write!(
out,
"{{\"t\":{},\"cpu\":{:.1},\"ram\":[{},{}],\"gpus\":[",
s.elapsed_ms, s.cpu_util, s.ram_used_bytes, s.ram_total_bytes,
);
for (gi, g) in s.gpus.iter().enumerate() {
if gi > 0 {
out.push(',');
}
let _ = write!(
out,
"{{\"d\":{},\"u\":{},\"vu\":{},\"va\":{},\"vt\":{}}}",
g.device,
g.compute_util,
g.vram_used_bytes,
g.vram_allocated_bytes,
g.vram_total_bytes,
);
}
out.push_str("]}");
}
}
fn write_events_json(out: &mut String, events: &[TimelineEvent]) {
for (i, e) in events.iter().enumerate() {
if i > 0 {
out.push_str(",\n");
}
let _ = write!(out, "{{\"t\":{},", e.elapsed_ms);
match &e.kind {
EventKind::EpochStart { epoch } => {
let _ = write!(out, "\"k\":\"epoch_start\",\"epoch\":{epoch}");
}
EventKind::EpochEnd { epoch, loss } => {
let _ = write!(
out,
"\"k\":\"epoch_end\",\"epoch\":{epoch},\"loss\":{loss:.6}"
);
}
EventKind::SyncStart => {
out.push_str("\"k\":\"sync_start\"");
}
EventKind::SyncEnd { duration_ms } => {
let _ = write!(out, "\"k\":\"sync_end\",\"ms\":{duration_ms:.3}");
}
EventKind::CpuAvgStart => {
out.push_str("\"k\":\"cpu_avg_start\"");
}
EventKind::CpuAvgEnd { duration_ms } => {
let _ = write!(out, "\"k\":\"cpu_avg_end\",\"ms\":{duration_ms:.3}");
}
EventKind::AnchorChanged { from, to } => {
let _ = write!(out, "\"k\":\"anchor\",\"from\":{from},\"to\":{to}");
}
EventKind::Throttle { rank } => {
let _ = write!(out, "\"k\":\"throttle\",\"rank\":{rank}");
}
EventKind::Idle {
device,
duration_ms,
} => {
let _ = write!(
out,
"\"k\":\"idle\",\"dev\":{device},\"ms\":{duration_ms:.1}"
);
}
EventKind::Custom { label } => {
let escaped = label.replace('\\', "\\\\").replace('"', "\\\"");
let _ = write!(out, "\"k\":\"custom\",\"label\":\"{escaped}\"");
}
}
out.push('}');
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_timeline_create_and_event() {
let tl = Timeline::new(100);
tl.event(EventKind::EpochStart { epoch: 0 });
tl.event(EventKind::SyncStart);
tl.event(EventKind::SyncEnd { duration_ms: 1.5 });
tl.event(EventKind::EpochEnd {
epoch: 0,
loss: 0.42,
});
let events = tl.events.lock().unwrap();
assert_eq!(events.len(), 4);
assert!(matches!(events[0].kind, EventKind::EpochStart { epoch: 0 }));
}
#[test]
fn test_idle_gaps() {
let tl = Timeline::new(100);
{
let mut samples = tl.samples.lock().unwrap();
for i in 0..20 {
let util = if (5..15).contains(&i) { 2 } else { 80 };
samples.push(TimelineSample {
elapsed_ms: i * 100,
cpu_util: 50.0,
ram_used_bytes: 1_000_000,
ram_total_bytes: 8_000_000,
gpus: vec![GpuTimelineSample {
device: 0,
compute_util: util,
vram_used_bytes: 0,
vram_allocated_bytes: 0,
vram_total_bytes: 0,
}],
});
}
}
let gaps = tl.idle_gaps(0, 5, 500);
assert_eq!(gaps.len(), 1);
assert_eq!(gaps[0], (500, 1500)); }
#[test]
fn test_summary() {
let tl = Timeline::new(100);
tl.event(EventKind::SyncStart);
tl.event(EventKind::SyncEnd { duration_ms: 2.0 });
tl.event(EventKind::SyncStart);
tl.event(EventKind::SyncEnd { duration_ms: 4.0 });
tl.event(EventKind::AnchorChanged { from: 10, to: 12 });
tl.event(EventKind::Throttle { rank: 1 });
let s = tl.summary();
assert_eq!(s.sync_count, 2);
assert!((s.avg_sync_ms - 3.0).abs() < 0.01);
assert_eq!(s.anchor_change_count, 1);
assert_eq!(s.throttle_count, 1);
}
#[test]
fn test_json_export() {
let tl = Timeline::new(100);
{
let mut samples = tl.samples.lock().unwrap();
samples.push(TimelineSample {
elapsed_ms: 0,
cpu_util: 45.0,
ram_used_bytes: 4_000_000_000,
ram_total_bytes: 8_000_000_000,
gpus: vec![GpuTimelineSample {
device: 0,
compute_util: 82,
vram_used_bytes: 2_000_000_000,
vram_allocated_bytes: 1_800_000_000,
vram_total_bytes: 8_000_000_000,
}],
});
}
tl.event(EventKind::SyncStart);
let mut buf = String::new();
let samples = tl.samples.lock().unwrap();
let events = tl.events.lock().unwrap();
write_samples_json(&mut buf, &samples);
assert!(buf.contains("\"t\":0"));
assert!(buf.contains("\"u\":82"));
let mut buf2 = String::new();
write_events_json(&mut buf2, &events);
assert!(buf2.contains("\"sync_start\""));
}
#[test]
fn test_subscribe_receives_batches() {
let tl = Timeline::with_intervals(50, 200);
let rx = tl.subscribe();
tl.event(EventKind::EpochStart { epoch: 0 });
tl.start();
std::thread::sleep(Duration::from_millis(350));
tl.stop();
let mut total_samples = 0;
let mut total_events = 0;
while let Ok(batch) = rx.try_recv() {
total_samples += batch.samples.len();
total_events += batch.events.len();
}
assert!(total_samples >= 1, "expected samples, got {total_samples}");
assert!(total_events >= 1, "expected events, got {total_events}");
}
#[test]
fn test_with_intervals() {
let tl = Timeline::with_intervals(50, 500);
assert_eq!(tl.poll_interval_ms, 50);
assert_eq!(tl.broadcast_interval_ms, 500);
}
}