use std::path::PathBuf;
use std::time::{Duration, Instant};
use eframe::egui::{self, Color32, RichText, ScrollArea};
use super::facett_theme::{Theme, GREEN, RED};
use crate::warehouse::iceberg::{BenchTelemetryRow, IcebergWarehouse};
use crate::warehouse::{BenchFilter, Warehouse};
/// Minimum time between automatic re-reads of the source: `BenchLive::draw`
/// reloads once at least this much time has passed since the previous load.
const RELOAD_EVERY: Duration = Duration::from_millis(1500);
/// Where the pane reads its telemetry rows and bench runs from.
enum Src {
/// Warehouse root path; opened with `IcebergWarehouse::open_read_only` on each load.
Local(PathBuf),
/// Remote source; the three fields are handed unchanged to
/// `super::remote::fetch_bench_live` on each load.
Remote {
endpoint: String,
token: String,
// Replaceable at runtime through `BenchLive::set_workspace`.
workspace: String,
},
}
/// One row of the live pane: the telemetry recorded for a single bench,
/// paired with the pass/fail status resolved for it.
#[derive(Debug, Clone, PartialEq)]
pub struct LiveBench {
    /// Identifier of the run the telemetry row belongs to.
    pub run_id: String,
    /// Repository name carried over from the telemetry row.
    pub repo: String,
    /// Bench name carried over from the telemetry row.
    pub bench: String,
    /// Number of cores reported for the run.
    pub n_cores: u32,
    /// Average number of busy cores.
    pub cores_busy_avg: f64,
    /// Peak number of busy cores.
    pub cores_busy_max: u32,
    /// Average CPU percentage.
    pub cpu_pct_avg: f64,
    /// Peak CPU percentage.
    pub cpu_pct_max: f64,
    /// Peak memory figure from the telemetry row.
    pub mem_peak_mb: f64,
    /// Elapsed time figure from the telemetry row.
    pub elapsed_ms: f64,
    /// `Some(true)` = passed, `Some(false)` = failed, `None` = no status found.
    pub ok: Option<bool>,
}

impl LiveBench {
    /// Returns the three-point series drawn as the row's sparkline:
    /// average CPU %, peak CPU %, and peak busy cores as a percentage of
    /// `n_cores` (`0.0` when `n_cores` is zero, avoiding a division by zero).
    pub fn cpu_spark(&self) -> Vec<f64> {
        let peak_busy_pct = match self.n_cores {
            0 => 0.0,
            cores => f64::from(self.cores_busy_max) / f64::from(cores) * 100.0,
        };
        Vec::from([self.cpu_pct_avg, self.cpu_pct_max, peak_busy_pct])
    }

    /// Formats the busy-core summary, e.g. `cores-busy 6.5/8 (peak 8/8)`.
    pub fn cores_label(&self) -> String {
        let cores = self.n_cores;
        let average = self.cores_busy_avg;
        let peak = self.cores_busy_max;
        format!(
            "cores-busy {:.1}/{} (peak {}/{})",
            average, cores, peak, cores
        )
    }
}
pub struct BenchLive {
src: Src,
loaded: bool,
last_reload: Instant,
error: Option<String>,
benches: Vec<LiveBench>,
live_run_id: Option<String>,
theme: Theme,
}
impl BenchLive {
pub fn local(root: PathBuf) -> Self {
Self::with(Src::Local(root))
}
pub fn remote(endpoint: String, token: String, workspace: String) -> Self {
Self::with(Src::Remote {
endpoint,
token,
workspace,
})
}
pub fn set_workspace(&mut self, workspace: String) {
if let Src::Remote { workspace: w, .. } = &mut self.src {
*w = workspace;
}
self.reload();
}
fn with(src: Src) -> Self {
Self {
src,
loaded: false,
last_reload: Instant::now(),
error: None,
benches: Vec::new(),
live_run_id: None,
theme: Theme::default(),
}
}
pub fn set_palette(&mut self, t: Theme) {
self.theme = t;
}
#[doc(hidden)]
pub fn inject_for_test(
&mut self,
telemetry: Vec<BenchTelemetryRow>,
runs: Vec<(String, crate::bench::BenchRun)>,
) {
self.benches = fold_live(telemetry, &runs);
self.live_run_id = self.benches.first().map(|b| b.run_id.clone());
self.loaded = true;
self.last_reload = Instant::now() + Duration::from_secs(3600);
self.error = None;
self.emit_trace();
}
pub fn reload(&mut self) {
self.loaded = false;
self.error = None;
}
fn source_label(&self) -> String {
match &self.src {
Src::Local(p) => format!("local {}", p.display()),
Src::Remote { endpoint, workspace, .. } => {
format!("remote {endpoint} ws={workspace} (Viz.BenchTelemetry)")
}
}
}
fn load(&mut self) {
super::trace::emit_in(
"bench.live.load",
&serde_json::json!({ "source": self.source_label() }),
);
let res: anyhow::Result<(Vec<BenchTelemetryRow>, Vec<crate::bench::BenchRun>)> =
match &self.src {
Src::Local(root) => IcebergWarehouse::open_read_only(root).and_then(|wh| {
let telem = wh.query_bench_telemetry(None)?;
let runs = wh.query_bench_runs(&BenchFilter::default())?;
Ok((telem, runs))
}),
Src::Remote {
endpoint,
token,
workspace,
} => super::remote::fetch_bench_live(endpoint, token, workspace),
};
match res {
Ok((telem, runs)) => {
let keyed: Vec<(String, crate::bench::BenchRun)> =
runs.into_iter().map(|r| (String::new(), r)).collect();
self.benches = fold_live(telem, &keyed);
self.live_run_id = self.benches.first().map(|b| b.run_id.clone());
self.error = None;
self.emit_trace();
}
Err(e) => {
self.error = Some(format!("{e:#}"));
super::trace::emit_out("bench.live.load", &serde_json::json!({ "error": format!("{e:#}") }));
}
}
}
fn emit_trace(&self) {
super::trace::emit_out("bench.live.build", &self.benches_json());
let (green, red, neutral) = self.tallies();
super::trace::emit_end(
"bench.live",
&serde_json::json!({
"live_run_id": self.live_run_id,
"benches": self.benches.len(),
"green": green, "red": red, "neutral": neutral,
}),
);
}
fn tallies(&self) -> (usize, usize, usize) {
let mut green = 0;
let mut red = 0;
let mut neutral = 0;
for b in &self.benches {
match b.ok {
Some(true) => green += 1,
Some(false) => red += 1,
None => neutral += 1,
}
}
(green, red, neutral)
}
pub fn draw(&mut self, ui: &mut egui::Ui) {
if !self.loaded {
self.loaded = true;
self.load();
self.last_reload = Instant::now();
} else if self.last_reload.elapsed() >= RELOAD_EVERY {
self.load();
self.last_reload = Instant::now();
}
let theme = self.theme;
ui.heading(RichText::new("๐ก Bench โ live telemetry").color(theme.text));
if let Some(err) = self.error.clone() {
ui.colored_label(RED, format!("bench_telemetry read failed:\n{err}"));
ui.separator();
return;
}
ui.horizontal(|ui| {
ui.label(RichText::new("live run:").color(theme.text_dim));
ui.monospace(self.live_run_id.as_deref().unwrap_or("โ"));
ui.separator();
let (green, red, neutral) = self.tallies();
ui.colored_label(GREEN, format!("โ {green}"));
ui.colored_label(RED, format!("โ {red}"));
if neutral > 0 {
ui.colored_label(theme.text_dim, format!("ยท {neutral}"));
}
if ui.button("โป").on_hover_text("re-read bench_telemetry").clicked() {
self.reload();
}
});
if self.benches.is_empty() {
ui.add_space(8.0);
ui.weak("no bench telemetry recorded yet โ run a bench to light this up:");
ui.monospace("nornir bench <repo>");
ui.separator();
return;
}
ui.separator();
ScrollArea::vertical().auto_shrink([false, false]).max_height(220.0).show(ui, |ui| {
for b in &self.benches {
draw_live_row(ui, &theme, b);
}
});
ui.separator();
ui.ctx().request_repaint_after(Duration::from_millis(400));
}
pub fn state_json(&self) -> serde_json::Value {
let (green, red, neutral) = self.tallies();
serde_json::json!({
"palette": self.theme.name,
"source": self.source_label(),
"error": self.error,
"live_run_id": self.live_run_id,
"count": self.benches.len(),
"green": green, "red": red, "neutral": neutral,
"benches": self.benches_json_inner(),
})
}
fn benches_json(&self) -> serde_json::Value {
serde_json::json!({ "benches": self.benches_json_inner() })
}
fn benches_json_inner(&self) -> Vec<serde_json::Value> {
self.benches
.iter()
.map(|b| {
serde_json::json!({
"run_id": b.run_id,
"repo": b.repo,
"bench": b.bench,
"status": match b.ok {
Some(true) => "green",
Some(false) => "red",
None => "neutral",
},
"ok": b.ok,
"n_cores": b.n_cores,
"cores_busy_avg": round2(b.cores_busy_avg),
"cores_busy_max": b.cores_busy_max,
"cores_label": b.cores_label(),
"cpu_pct_avg": round2(b.cpu_pct_avg),
"cpu_pct_max": round2(b.cpu_pct_max),
"mem_peak_mb": round2(b.mem_peak_mb),
"elapsed_ms": round2(b.elapsed_ms),
"cpu_spark": b.cpu_spark().iter().map(|v| round2(*v)).collect::<Vec<_>>(),
})
})
.collect()
}
}
/// Turns raw telemetry rows into display rows.
///
/// Rows are ordered by `run_id` descending, then by bench name ascending, and
/// each one is paired with the status `bench_status` resolves from `runs`.
fn fold_live(
    mut telemetry: Vec<BenchTelemetryRow>,
    runs: &[(String, crate::bench::BenchRun)],
) -> Vec<LiveBench> {
    telemetry.sort_by(|lhs, rhs| {
        rhs.run_id
            .cmp(&lhs.run_id)
            .then_with(|| lhs.bench.cmp(&rhs.bench))
    });

    let mut folded = Vec::with_capacity(telemetry.len());
    for row in telemetry {
        // Resolve the status while `row` is still intact; its strings are
        // moved into the output row just below.
        let ok = bench_status(&row.repo, &row.bench, runs);
        folded.push(LiveBench {
            run_id: row.run_id,
            repo: row.repo,
            bench: row.bench,
            n_cores: row.n_cores,
            cores_busy_avg: row.cores_busy_avg,
            cores_busy_max: row.cores_busy_max,
            cpu_pct_avg: row.cpu_pct_avg,
            cpu_pct_max: row.cpu_pct_max,
            mem_peak_mb: row.mem_peak_mb,
            elapsed_ms: row.elapsed_ms,
            ok,
        });
    }
    folded
}
/// Looks up the pass/fail status of `bench` in `runs`.
///
/// A test matches when its name equals either the full bench name or its last
/// dot-separated segment. Runs are scanned in order and the first matching
/// test decides the result; `None` means no run has a matching test.
fn bench_status(repo: &str, bench: &str, runs: &[(String, crate::bench::BenchRun)]) -> Option<bool> {
    // `repo` does not take part in the lookup; it is only kept in the signature.
    let _ = repo;
    let leaf = bench.rsplit_once('.').map_or(bench, |(_, tail)| tail);
    runs.iter()
        .flat_map(|(_key, run)| run.tests.iter())
        .find(|outcome| outcome.name == bench || outcome.name == leaf)
        .map(|outcome| outcome.passed)
}
/// Draws one bench as a single horizontal row: status mark, bench name,
/// peak busy cores, CPU sparkline, peak CPU % and, when positive, peak memory.
fn draw_live_row(ui: &mut egui::Ui, theme: &Theme, b: &LiveBench) {
    // Everything that does not touch the UI is computed up front.
    let (mark, mark_col) = match b.ok {
        Some(true) => ("โ", GREEN),
        Some(false) => ("โ", RED),
        None => ("ยท", theme.text_dim),
    };
    let busy_col = busy_color(theme, b.cores_busy_max, b.n_cores);
    let spark = b.cpu_spark();

    ui.horizontal(|ui| {
        ui.colored_label(mark_col, mark);

        let name = RichText::new(&b.bench).monospace().size(12.0).color(theme.text);
        ui.add_sized([180.0, 16.0], egui::Label::new(name));

        let busy_text = format!("{}/{}", b.cores_busy_max, b.n_cores);
        ui.colored_label(busy_col, RichText::new(busy_text).monospace().strong());
        ui.label(RichText::new("cores").color(theme.text_dim));

        draw_cpu_spark(ui, theme, &spark);

        let cpu_text = format!("{:.0}% cpu", b.cpu_pct_max);
        ui.label(RichText::new(cpu_text).monospace().size(11.0).color(theme.text));

        if b.mem_peak_mb > 0.0 {
            let mem_text = format!("{:.0} MB", b.mem_peak_mb);
            ui.label(RichText::new(mem_text).color(theme.text_dim));
        }
    });
}
/// Picks the colour for the busy-cores figure.
///
/// With zero cores the dim text colour is used; otherwise the busy fraction
/// (clamped to `0..=1`) is inverted and scaled to 0–100 before being passed to
/// `Theme::health_color`.
fn busy_color(theme: &Theme, busy: u32, n: u32) -> Color32 {
    match n {
        0 => theme.text_dim,
        cores => {
            let frac = (f64::from(busy) / f64::from(cores)).clamp(0.0, 1.0);
            theme.health_color((1.0 - frac) * 100.0)
        }
    }
}
/// Draws `series` as a small polyline with a dot on its last point.
///
/// The 46x14 slot is always allocated so rows stay aligned; nothing is painted
/// for fewer than two points. Values are clamped to `0..=100` and mapped onto
/// the slot height with a one-pixel margin at top and bottom.
fn draw_cpu_spark(ui: &mut egui::Ui, theme: &Theme, series: &[f64]) {
    let slot = egui::Vec2::new(46.0, 14.0);
    let (rect, _response) = ui.allocate_exact_size(slot, egui::Sense::hover());
    let count = series.len();
    if count < 2 {
        return;
    }
    let painter = ui.painter_at(rect);

    let mut pts: Vec<egui::Pos2> = Vec::with_capacity(count);
    for (i, &value) in series.iter().enumerate() {
        let t = i as f32 / (count - 1) as f32;
        let x = rect.min.x + t * rect.width();
        let norm = (value.clamp(0.0, 100.0) / 100.0) as f32;
        let y = rect.max.y - norm * (rect.height() - 2.0) - 1.0;
        pts.push(egui::Pos2::new(x, y));
    }

    // Connect each point to its successor.
    for (from, to) in pts.iter().zip(pts.iter().skip(1)) {
        painter.line_segment([*from, *to], egui::Stroke::new(1.4, theme.point));
    }
    if let Some(&tip) = pts.last() {
        painter.circle_filled(tip, 1.8, theme.point);
    }
}
/// Rounds `f` to two decimal places (scale by 100, round to nearest, scale back).
fn round2(f: f64) -> f64 {
    const SCALE: f64 = 100.0;
    let scaled = f * SCALE;
    scaled.round() / SCALE
}
// Unit tests for the fold/status logic and the JSON snapshot; none of them draw any UI.
#[cfg(test)]
mod tests {
use super::*;
use crate::bench::{BenchResult, BenchRun, TestOutcome};
// Builds a telemetry row with the given identity and core figures; the CPU,
// memory and elapsed fields are fixed constants.
fn telem(run: &str, repo: &str, bench: &str, n: u32, busy_avg: f64, busy_max: u32) -> BenchTelemetryRow {
BenchTelemetryRow {
run_id: run.into(),
repo: repo.into(),
bench: bench.into(),
n_cores: n,
cpu_pct_avg: 50.0,
cpu_pct_max: 90.0,
cores_busy_avg: busy_avg,
cores_busy_max: busy_max,
mem_peak_mb: 256.0,
mem_pct_max: 30.0,
elapsed_ms: 1234.0,
}
}
// Builds a bench run from `(test name, passed)` pairs and a list of result names.
fn run_with_tests(tests: Vec<(&str, bool)>, results: Vec<&str>) -> BenchRun {
BenchRun {
date: "2026-06-13".into(),
timestamp: None,
version: "1.0".into(),
machine: "host".into(),
cores: 8,
results: results
.into_iter()
.map(|n| BenchResult { name: n.into(), metrics: Default::default() })
.collect(),
tests: tests
.into_iter()
.map(|(n, ok)| TestOutcome { name: n.into(), passed: ok, duration_ms: None, message: None })
.collect(),
}
}
// A failed test named after the bench's leaf segment (`compress`) makes the
// folded row red, and the core figures carry through unchanged.
#[test]
fn fold_pairs_status_and_cores() {
let telem_rows = vec![telem("run-2", "znippy", "znippy.compress", 8, 6.5, 8)];
let runs = vec![(
String::new(),
run_with_tests(vec![("compress", false)], vec!["znippy.compress"]),
)];
let live = fold_live(telem_rows, &runs);
assert_eq!(live.len(), 1);
let b = &live[0];
assert_eq!(b.bench, "znippy.compress");
assert_eq!(b.ok, Some(false), "red test โ red bench");
assert_eq!(b.n_cores, 8);
assert_eq!(b.cores_busy_max, 8);
assert!((b.cores_busy_avg - 6.5).abs() < 1e-9);
assert_eq!(b.cores_label(), "cores-busy 6.5/8 (peak 8/8)");
assert_eq!(b.cpu_spark(), vec![50.0, 90.0, 100.0]);
}
// Rows sort by run id descending, then bench name ascending; with no runs
// supplied the status stays `None`.
#[test]
fn newest_run_first_then_bench_name() {
let telem_rows = vec![
telem("run-1", "z", "z.b", 4, 2.0, 3),
telem("run-2", "z", "z.a", 4, 1.0, 2),
telem("run-2", "z", "z.c", 4, 4.0, 4),
];
let live = fold_live(telem_rows, &[]);
assert_eq!(live[0].run_id, "run-2");
assert_eq!(live[0].bench, "z.a");
assert_eq!(live[1].bench, "z.c");
assert_eq!(live[2].run_id, "run-1");
assert_eq!(live[0].ok, None);
}
// Injected data shows up in `state_json`: run id, counts, per-bench status
// and the derived label and sparkline fields.
#[test]
fn inject_renders_into_state_json() {
let mut live = BenchLive::local(PathBuf::new());
live.inject_for_test(
vec![
telem("run-9", "znippy", "znippy.compress", 8, 5.0, 7),
telem("run-9", "holger", "holger.ops", 8, 3.0, 4),
],
vec![
(String::new(), run_with_tests(vec![("compress", true)], vec!["znippy.compress"])),
(String::new(), run_with_tests(vec![("ops", false)], vec!["holger.ops"])),
],
);
let v = live.state_json();
assert_eq!(v["live_run_id"], "run-9");
assert_eq!(v["count"], 2);
assert_eq!(v["green"], 1);
assert_eq!(v["red"], 1);
let benches = v["benches"].as_array().unwrap();
let compress = benches.iter().find(|b| b["bench"] == "znippy.compress").unwrap();
assert_eq!(compress["status"], "green");
assert_eq!(compress["n_cores"], 8);
assert_eq!(compress["cores_busy_max"], 7);
assert_eq!(compress["cores_label"], "cores-busy 5.0/8 (peak 7/8)");
assert_eq!(compress["cpu_spark"].as_array().unwrap().len(), 3);
let ops = benches.iter().find(|b| b["bench"] == "holger.ops").unwrap();
assert_eq!(ops["status"], "red", "failed test โ red");
assert_eq!(ops["cores_busy_max"], 4);
}
}