use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use eframe::egui::{self, RichText, ScrollArea};
use serde::Deserialize;
use super::facett_theme::{Theme, GREEN, RED};
use crate::warehouse::iceberg::IcebergWarehouse;
use crate::warehouse::release_events::{
self, query_release_events, EventSelector, ReleaseEventRow,
};
/// One progress event of a release run, deserialized from a JSON object.
///
/// The enum is internally tagged: the `kind` field of the JSON object selects
/// the variant, and the remaining fields map onto the variant's fields.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "kind")]
pub enum LiveEvent {
/// `kind = "run_start"`: carries the run id and the workspace label.
#[serde(rename = "run_start")]
RunStart { run_id: String, workspace: String },
/// `kind = "repo_start"`: carries the repo name and a commit sha.
#[serde(rename = "repo_start")]
RepoStart { repo: String, sha: String },
/// `kind = "phase_start"`: a named phase began for `repo`.
#[serde(rename = "phase_start")]
PhaseStart { repo: String, phase: String },
/// `kind = "phase_end"`: a named phase finished, with outcome and duration in milliseconds.
#[serde(rename = "phase_end")]
PhaseEnd { repo: String, phase: String, ok: bool, duration_ms: u64 },
/// `kind = "binary_start"`: a test binary began for `repo`.
#[serde(rename = "binary_start")]
BinaryStart { repo: String, binary: String },
/// `kind = "test_pass"`: a single named test passed.
#[serde(rename = "test_pass")]
TestPass { repo: String, binary: String, name: String },
/// `kind = "test_fail"`: a single named test failed.
#[serde(rename = "test_fail")]
TestFail { repo: String, binary: String, name: String },
/// `kind = "binary_done"`: a test binary finished, with its pass/fail counts.
#[serde(rename = "binary_done")]
BinaryDone { repo: String, binary: String, passed: u32, failed: u32 },
/// `kind = "repo_end"`: a repo finished, with its outcome.
#[serde(rename = "repo_end")]
RepoEnd { repo: String, ok: bool },
/// `kind = "run_end"`: the run finished, with its overall outcome.
#[serde(rename = "run_end")]
RunEnd { run_id: String, ok: bool },
}
/// Where the durable baseline of release events is loaded from when hydrating.
#[derive(Clone)]
enum Durable {
/// Warehouse root path, opened read-only through `IcebergWarehouse`.
Local(PathBuf),
/// Server endpoint, token and workspace handed to `remote::fetch_release_events`.
Remote { endpoint: String, token: String, workspace: String },
/// No durable source: hydration returns immediately (set by `inject_for_test`).
None,
}
/// State shared (behind a mutex) between the UI and the background tail/stream thread.
#[derive(Default)]
struct Shared {
// Events currently shown; replaced wholesale by hydration, appended to by the background thread.
events: Vec<LiveEvent>,
// Last error message; drawn in red at the top of the view and exported by `state_json`.
err: Option<String>,
// Path of the events file currently being tailed (set by `tail_loop`).
file_path: Option<PathBuf>,
// Set when a `RunEnd` event arrives on the current file/stream.
// NOTE(review): not read anywhere in the visible part of this file — confirm it has a consumer.
eof_run_ended: bool,
// True once a hydration has succeeded (or rows were injected for a test).
hydrated_from_warehouse: bool,
// Run id selected by the last successful hydration, if any rows existed.
hydrated_run_id: Option<String>,
}
/// UI state of the live release-run view: a durable baseline plus a live tail.
pub struct LiveRunState {
// Directory scanned for `release-run-*.events.ndjson` files when tailing locally.
log_dir: PathBuf,
// `(endpoint, token)` for the streaming thread; when `Some`, it is used instead of file tailing.
remote: Option<(String, String)>,
// Source of the durable baseline used by hydration.
durable: Durable,
// State shared with the background thread.
shared: Arc<Mutex<Shared>>,
// Whether the first hydration and the background thread spawn have already happened.
started: bool,
// Theme used when drawing; replaced through `set_palette`.
theme: Theme,
}
impl LiveRunState {
/// Creates a local view: the baseline comes from the warehouse at
/// `warehouse_root`, the live tail from event files found in `log_dir`.
///
/// Nothing is loaded or spawned here; that happens on the first `draw`.
pub fn new(log_dir: PathBuf, warehouse_root: PathBuf) -> Self {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let durable = Durable::Local(warehouse_root);
    Self {
        theme: Theme::default(),
        started: false,
        remote: None,
        log_dir,
        durable,
        shared,
    }
}
/// Replaces the theme used when drawing this view.
pub fn set_palette(&mut self, t: Theme) {
self.theme = t;
}
/// Creates a remote view: both the durable baseline and the live stream come
/// from the server at `endpoint`, authenticated with `token`.
///
/// `log_dir` is left empty because no local file is tailed in this mode.
pub fn new_remote(endpoint: String, token: String, workspace: String) -> Self {
    // The streaming thread needs its own copy of the credentials.
    let remote = Some((endpoint.clone(), token.clone()));
    let durable = Durable::Remote { endpoint, token, workspace };
    Self {
        theme: Theme::default(),
        started: false,
        shared: Arc::new(Mutex::new(Shared::default())),
        log_dir: PathBuf::new(),
        remote,
        durable,
    }
}
/// Changes the workspace used for remote hydration.
///
/// Has no effect unless the durable source is `Durable::Remote`.
pub fn set_workspace(&mut self, workspace: String) {
    match &mut self.durable {
        Durable::Remote { workspace: current, .. } => *current = workspace,
        Durable::Local(_) | Durable::None => {}
    }
}
/// Test hook: loads `rows` as if they had been hydrated, without touching a
/// warehouse or server and without ever spawning the background thread.
#[doc(hidden)]
pub fn inject_for_test(&mut self, rows: Vec<ReleaseEventRow>) {
    // Detach from any durable source and mark the view as already started
    // so `draw` neither hydrates nor spawns a thread afterwards.
    self.durable = Durable::None;
    self.started = true;

    let (events, run_id) = events_from_release_rows(rows);

    let mut guard = self.shared.lock().unwrap();
    guard.err = None;
    guard.hydrated_run_id = run_id;
    guard.hydrated_from_warehouse = true;
    guard.events = events;
}
/// Forgets the previous hydration result and hydrates again from the
/// durable source.
pub fn reload(&mut self) {
    let mut guard = self
        .shared
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.hydrated_run_id = None;
    guard.hydrated_from_warehouse = false;
    // Release the lock first: hydration takes it again.
    drop(guard);

    self.hydrate_from_warehouse();
}
fn hydrate_from_warehouse(&mut self) {
let res = match &self.durable {
Durable::Local(root) => {
super::trace::emit_in(
"live.hydrate",
&serde_json::json!({ "source": "warehouse.release_events", "root": root.display().to_string() }),
);
IcebergWarehouse::open_read_only(root)
.and_then(|wh| wh.block_on(query_release_events(&wh, &EventSelector::All)))
}
Durable::Remote { endpoint, token, workspace } => {
super::trace::emit_in(
"live.hydrate",
&serde_json::json!({ "source": "Viz.ReleaseEvents", "endpoint": endpoint, "workspace": workspace }),
);
super::remote::fetch_release_events(endpoint, token, workspace)
}
Durable::None => return,
};
let mut s = self.shared.lock().unwrap_or_else(|p| p.into_inner());
match res {
Ok(rows) => {
let (events, run_id) = events_from_release_rows(rows);
let n = events.len();
s.events = events;
s.hydrated_from_warehouse = true;
s.hydrated_run_id = run_id.clone();
s.err = None;
super::trace::emit_out(
"live.hydrate",
&serde_json::json!({ "run_id": run_id, "events": n }),
);
}
Err(e) => {
s.err = Some(format!("{e:#}"));
super::trace::emit_out(
"live.hydrate",
&serde_json::json!({ "error": format!("{e:#}") }),
);
}
}
}
/// On the first call, hydrates the baseline and spawns the background
/// thread; later calls do nothing.
///
/// A remote view gets the streaming thread, a local view gets the file
/// tailing thread. Panics if the thread cannot be spawned.
fn ensure_started(&mut self) {
    if !self.started {
        self.started = true;
        self.hydrate_from_warehouse();

        let shared = Arc::clone(&self.shared);
        match self.remote.clone() {
            Some((endpoint, token)) => {
                thread::Builder::new()
                    .name("nornir-live-remote".into())
                    .spawn(move || remote_loop(endpoint, token, shared))
                    .expect("spawn live remote thread");
            }
            None => {
                let log_dir = self.log_dir.clone();
                thread::Builder::new()
                    .name("nornir-live-tail".into())
                    .spawn(move || tail_loop(log_dir, shared))
                    .expect("spawn tail thread");
            }
        }
    }
}
/// Draws the live-run view into `ui`.
///
/// The first call hydrates the baseline and starts the background thread.
/// The shared state stays locked for the whole body, so the background
/// thread cannot append events while a frame is being laid out.
pub fn draw(&mut self, ui: &mut egui::Ui) {
self.ensure_started();
let theme = self.theme;
let remote_ep = self.remote.as_ref().map(|(e, _)| e.clone());
let shared = self.shared.lock().unwrap_or_else(|p| p.into_inner());
// Most recent error, if any, goes first.
if let Some(err) = &shared.err {
ui.colored_label(RED, err);
}
// Row 1: where the durable baseline comes from and whether it has loaded.
ui.horizontal(|ui| {
ui.label("durable:");
match &self.durable {
Durable::Local(p) => ui.monospace(format!("warehouse {}", p.display())),
Durable::Remote { endpoint, workspace, .. } => {
ui.monospace(format!("server {endpoint} ws={workspace} (Viz.ReleaseEvents)"))
}
Durable::None => ui.monospace("(injected)"),
};
if shared.hydrated_from_warehouse {
ui.colored_label(GREEN, "✓ hydrated");
} else {
ui.weak("(loading…)");
}
});
// Row 2: where live events come from (server stream or tailed file).
ui.horizontal(|ui| {
if let Some(ep) = &remote_ep {
ui.label("live-tail:");
ui.monospace(format!("server {ep}"));
} else {
ui.label("live-tail:");
if let Some(p) = &shared.file_path {
ui.monospace(p.display().to_string());
} else {
ui.weak("(warehouse baseline; waiting for next release-run-*.events.ndjson)");
}
}
});
ui.separator();
// The summary is recomputed from the full event list on every frame.
let summary = RunSummary::from_events(&shared.events);
ui.horizontal(|ui| {
ui.label(RichText::new("run").strong());
ui.monospace(&summary.run_id);
ui.separator();
ui.label("workspace:");
ui.monospace(&summary.workspace);
ui.separator();
// `None` means no `RunEnd` has been seen since the last `RunStart`.
match summary.run_ok {
None => ui.colored_label(theme.accent, "● running"),
Some(true) => ui.colored_label(GREEN, "✓ done"),
Some(false) => ui.colored_label(RED, "✗ failed"),
};
});
ui.horizontal(|ui| {
ui.label(RichText::new("repo").strong());
ui.monospace(&summary.current_repo);
ui.separator();
ui.label("phase:");
ui.monospace(&summary.current_phase);
ui.separator();
ui.label("binary:");
ui.monospace(short_bin(&summary.current_binary));
});
ui.horizontal(|ui| {
ui.colored_label(GREEN, format!("✓ {}", summary.total_pass));
ui.colored_label(RED, format!("✗ {}", summary.total_fail));
});
ui.separator();
ScrollArea::vertical().auto_shrink([false, false]).stick_to_bottom(true).show(ui, |ui| {
// Only the last 500 events are rendered; the second `rev` restores chronological order.
for ev in shared.events.iter().rev().take(500).rev() {
render_event_row(ui, ev, &theme);
}
});
// Ask for another frame in 200 ms so new events show up without user input.
ui.ctx().request_repaint_after(Duration::from_millis(200));
}
pub fn state_json(&self) -> serde_json::Value {
let shared = self.shared.lock().unwrap_or_else(|p| p.into_inner());
let summary = RunSummary::from_events(&shared.events);
let lines: Vec<String> = shared.events.iter().map(event_line).collect();
serde_json::json!({
"durable_source": match &self.durable {
Durable::Local(p) => format!("warehouse {}", p.display()),
Durable::Remote { endpoint, workspace, .. } => {
format!("server {endpoint} ws={workspace} (Viz.ReleaseEvents)")
}
Durable::None => "injected".to_string(),
},
"hydrated_from_warehouse": shared.hydrated_from_warehouse,
"hydrated_run_id": shared.hydrated_run_id,
"live_tail": match &self.remote {
Some((e, _)) => format!("server {e} (Release.Progress)"),
None => shared.file_path.as_ref()
.map(|p| format!("file {}", p.display()))
.unwrap_or_else(|| "(none yet)".to_string()),
},
"error": shared.err,
"run_id": summary.run_id,
"workspace": summary.workspace,
"run_ok": summary.run_ok,
"current_repo": summary.current_repo,
"current_phase": summary.current_phase,
"total_pass": summary.total_pass,
"total_fail": summary.total_fail,
"events": shared.events.len(),
"lines": lines,
"palette": self.theme.name,
})
}
}
/// Converts warehouse rows into the live events of the newest run.
///
/// The newest run is the one owning the row with the greatest `ts_micros`
/// (the later row wins a tie). Its rows are ordered by `seq` and mapped:
/// op `"run"` with a start/end phase becomes `RunStart`/`RunEnd`, any other
/// op with a start/end phase becomes `PhaseStart`/`PhaseEnd`, and rows in
/// any other phase are skipped.
///
/// Returns the events and the selected run id, or `(vec![], None)` when
/// `rows` is empty.
fn events_from_release_rows(rows: Vec<ReleaseEventRow>) -> (Vec<LiveEvent>, Option<String>) {
    use release_events::{phase, status};

    // `max_by_key` returns the last of several equally-maximal rows.
    let Some(newest) = rows.iter().max_by_key(|r| r.ts_micros) else {
        return (Vec::new(), None);
    };
    let run_id = newest.run_id.clone();

    let mut selected: Vec<&ReleaseEventRow> =
        rows.iter().filter(|r| r.run_id == run_id).collect();
    selected.sort_by_key(|r| r.seq);

    let mut events: Vec<LiveEvent> = Vec::with_capacity(selected.len());
    for r in selected {
        let is_run = r.op.as_str() == "run";
        let p = r.phase.as_str();

        let ev = if p == phase::START {
            if is_run {
                // An empty detail falls back to the run id as workspace label.
                let workspace = if r.detail.is_empty() {
                    r.run_id.clone()
                } else {
                    r.detail.clone()
                };
                LiveEvent::RunStart { run_id: r.run_id.clone(), workspace }
            } else {
                LiveEvent::PhaseStart { repo: r.component.clone(), phase: r.op.clone() }
            }
        } else if p == phase::END {
            let ok = r.status == status::OK;
            if is_run {
                LiveEvent::RunEnd { run_id: r.run_id.clone(), ok }
            } else {
                LiveEvent::PhaseEnd {
                    repo: r.component.clone(),
                    phase: r.op.clone(),
                    ok,
                    // Missing or negative durations are reported as 0 ms.
                    duration_ms: r.elapsed_ms.unwrap_or(0).max(0) as u64,
                }
            }
        } else {
            continue;
        };
        events.push(ev);
    }

    (events, Some(run_id))
}
/// Headline figures for the view, folded from the event list by `from_events`.
/// String fields hold "—" until an event supplies a value.
struct RunSummary {
// Id and workspace label from the most recent `RunStart`.
run_id: String,
workspace: String,
// Repo, phase and binary named by the most recent matching events.
current_repo: String,
current_phase: String,
current_binary: String,
// Pass/fail counters; reset to 0 by every `RunStart`.
total_pass: u32,
total_fail: u32,
// `None` until a `RunEnd` arrives; reset to `None` by every `RunStart`.
run_ok: Option<bool>,
}
impl RunSummary {
    /// Folds `events` (oldest first) into a summary.
    ///
    /// String fields start as "—" and take the value of the latest event
    /// that names them. A `RunStart` resets the counters and the run
    /// outcome; a `RunEnd` sets the outcome.
    ///
    /// Counters saturate at `u32::MAX`. The counts come from parsed input,
    /// and a plain `+=` would panic in debug builds (inside `draw`) or wrap
    /// silently in release builds.
    fn from_events(events: &[LiveEvent]) -> Self {
        let mut s = RunSummary {
            run_id: "—".into(),
            workspace: "—".into(),
            current_repo: "—".into(),
            current_phase: "—".into(),
            current_binary: "—".into(),
            total_pass: 0,
            total_fail: 0,
            run_ok: None,
        };
        for ev in events {
            match ev {
                LiveEvent::RunStart { run_id, workspace } => {
                    s.run_id = run_id.clone();
                    s.workspace = workspace.clone();
                    s.run_ok = None;
                    s.total_pass = 0;
                    s.total_fail = 0;
                }
                LiveEvent::RepoStart { repo, .. } => s.current_repo = repo.clone(),
                LiveEvent::PhaseStart { repo, phase } | LiveEvent::PhaseEnd { repo, phase, .. } => {
                    s.current_repo = repo.clone();
                    s.current_phase = phase.clone();
                }
                LiveEvent::BinaryStart { binary, .. } => s.current_binary = binary.clone(),
                // NOTE(review): per-test events and the per-binary totals both feed
                // the same counters; if a producer emits both for one binary the
                // tests are counted twice — confirm against the event producer.
                LiveEvent::BinaryDone { passed, failed, .. } => {
                    s.total_pass = s.total_pass.saturating_add(*passed);
                    s.total_fail = s.total_fail.saturating_add(*failed);
                }
                LiveEvent::TestPass { .. } => s.total_pass = s.total_pass.saturating_add(1),
                LiveEvent::TestFail { .. } => s.total_fail = s.total_fail.saturating_add(1),
                LiveEvent::RunEnd { ok, .. } => s.run_ok = Some(*ok),
                // Listed explicitly (no catch-all) so a new variant forces a
                // decision here; a finished repo changes nothing in the summary.
                LiveEvent::RepoEnd { .. } => {}
            }
        }
        s
    }
}
/// Formats one event as the single text line exported by `state_json`.
fn event_line(ev: &LiveEvent) -> String {
    // Outcome glyph shared by the phase and repo lines.
    let mark = |ok: bool| if ok { "✓" } else { "✗" };

    match ev {
        LiveEvent::RunStart { run_id, workspace } => format!("▶ run {run_id} ({workspace})"),
        LiveEvent::RunEnd { ok: true, .. } => "✓ run complete".to_string(),
        LiveEvent::RunEnd { ok: false, .. } => "✗ run failed".to_string(),
        LiveEvent::RepoStart { repo, sha } => format!("▶ {repo} sha={}", short_sha(sha)),
        LiveEvent::RepoEnd { repo, ok } => format!("{} {repo} done", mark(*ok)),
        LiveEvent::PhaseStart { repo, phase } => format!("… {repo} {phase} start"),
        LiveEvent::PhaseEnd { repo, phase, ok, duration_ms } => {
            let seconds = *duration_ms as f32 / 1000.0;
            format!("{} {repo} {phase} ({:.1}s)", mark(*ok), seconds)
        }
        LiveEvent::BinaryStart { repo, binary } => format!("📂 {repo} {}", short_bin(binary)),
        LiveEvent::BinaryDone { passed, failed, .. } => {
            format!("≡ {passed} passed, {failed} failed")
        }
        LiveEvent::TestPass { name, .. } => format!("✓ {name}"),
        LiveEvent::TestFail { name, .. } => format!("✗ {name}"),
    }
}
/// Draws one event as a row of the scrolling event list.
///
/// Every variant is rendered. `RepoEnd` used to fall into a catch-all arm
/// and was dropped from the list even though `event_line` produces a line
/// for it; it is now drawn with the same text, coloured by outcome. The
/// match has no catch-all, so a new variant must be handled here.
fn render_event_row(ui: &mut egui::Ui, ev: &LiveEvent, theme: &Theme) {
    match ev {
        LiveEvent::BinaryStart { repo, binary } => {
            ui.horizontal(|ui| {
                ui.colored_label(theme.accent, "📂");
                ui.monospace(format!("{repo} {}", short_bin(binary)));
            });
        }
        LiveEvent::PhaseStart { repo, phase } => {
            ui.horizontal(|ui| {
                ui.colored_label(theme.accent, "…");
                ui.monospace(format!("{repo} {phase} start"));
            });
        }
        LiveEvent::TestPass { name, .. } => {
            ui.horizontal(|ui| {
                ui.colored_label(GREEN, "✓");
                ui.label(name);
            });
        }
        LiveEvent::TestFail { name, .. } => {
            ui.horizontal(|ui| {
                ui.colored_label(RED, "✗");
                ui.colored_label(RED, name);
            });
        }
        LiveEvent::BinaryDone { passed, failed, .. } => {
            ui.horizontal(|ui| {
                ui.colored_label(theme.text_dim, "≡");
                ui.label(format!("{passed} passed, {failed} failed"));
            });
        }
        LiveEvent::PhaseEnd { repo, phase, ok, duration_ms } => {
            let col = if *ok { GREEN } else { RED };
            ui.colored_label(col, format!(
                "{} {repo} {phase} ({:.1}s)",
                if *ok { "✓" } else { "✗" },
                *duration_ms as f32 / 1000.0,
            ));
        }
        LiveEvent::RepoStart { repo, sha } => {
            ui.colored_label(theme.text, format!("▶ {repo} sha={}", short_sha(sha)));
        }
        LiveEvent::RepoEnd { repo, ok } => {
            // Same text as `event_line` so the list and `state_json` agree.
            let col = if *ok { GREEN } else { RED };
            ui.colored_label(col, format!("{} {repo} done", if *ok { "✓" } else { "✗" }));
        }
        LiveEvent::RunStart { run_id, workspace } => {
            ui.colored_label(theme.text, format!("▶ run {run_id} ({workspace})"));
        }
        LiveEvent::RunEnd { ok, .. } => {
            let col = if *ok { GREEN } else { RED };
            ui.colored_label(col, if *ok { "✓ run complete" } else { "✗ run failed" });
        }
    }
}
/// Returns the first whitespace-separated word of `s`, or `s` unchanged when
/// it contains no word at all (empty or whitespace only).
fn short_bin(s: &str) -> String {
    match s.split_whitespace().next() {
        Some(first_word) => first_word.to_owned(),
        None => s.to_owned(),
    }
}
/// Returns at most the first 12 characters (not bytes) of `s`.
fn short_sha(s: &str) -> String {
    // `take` stops at the end of shorter inputs, so they come back unchanged.
    s.chars().take(12).collect()
}
/// Background thread body for remote mode: streams progress events from
/// `endpoint` into `shared`, reconnecting two seconds after the stream ends
/// or fails. Never returns.
///
/// The shared view is reset only when a stream delivers its first event.
/// Resetting at the top of every attempt would wipe the hydrated baseline
/// (and any hydration error) the moment this thread starts, and again on
/// every retry, even when the server is unreachable or has nothing to send.
///
/// A retry clears only the error that this loop itself reported. Locks are
/// taken poison-tolerantly, as the UI side does, so a panic elsewhere does
/// not silently end live updates.
fn remote_loop(endpoint: String, token: String, shared: Arc<Mutex<Shared>>) {
    use std::sync::atomic::{AtomicBool, Ordering};

    // Error text this loop wrote on the previous attempt, if any.
    let mut reported: Option<String> = None;
    loop {
        if let Some(prev) = reported.take() {
            let mut s = shared.lock().unwrap_or_else(|p| p.into_inner());
            // Leave errors written by someone else (e.g. hydration) alone.
            if s.err.as_deref() == Some(prev.as_str()) {
                s.err = None;
            }
        }

        let sink = Arc::clone(&shared);
        // An atomic keeps the callback free of `&mut` captures, so it places
        // no more demands on `stream_progress` than the previous callback did.
        let first_event = AtomicBool::new(true);
        let res = super::remote::stream_progress(&endpoint, &token, move |ev| {
            let mut s = sink.lock().unwrap_or_else(|p| p.into_inner());
            if first_event.swap(false, Ordering::Relaxed) {
                // Live data has arrived: it replaces whatever was shown.
                s.events.clear();
                s.err = None;
                s.eof_run_ended = false;
                s.file_path = None;
            }
            if matches!(&ev, LiveEvent::RunEnd { .. }) {
                s.eof_run_ended = true;
            }
            s.events.push(ev);
        });
        if let Err(e) = res {
            let msg = format!("stream from {endpoint}: {e:#}");
            shared.lock().unwrap_or_else(|p| p.into_inner()).err = Some(msg.clone());
            reported = Some(msg);
        }
        thread::sleep(Duration::from_secs(2));
    }
}
/// Background thread body for local mode: follows the newest
/// `release-run-*.events.ndjson` file in `log_dir` and appends each parsed
/// event to `shared`. Polls every 250 ms and never returns.
///
/// When a different file becomes the newest, the shared view is reset and
/// the new file is read from its start.
///
/// A line read at end-of-file before the writer has finished it is kept in
/// a buffer and completed on a later poll. Parsing the fragment right away
/// would fail and lose the event. Terminated lines that do not parse are
/// still skipped, as before. Locks are taken poison-tolerantly, as the UI
/// side does.
fn tail_loop(log_dir: PathBuf, shared: Arc<Mutex<Shared>>) {
    let mut current: Option<(PathBuf, BufReader<File>)> = None;
    // Text of the line being assembled; `read_line` appends to it.
    let mut pending = String::new();
    loop {
        if let Some(newest) = newest_events_file(&log_dir) {
            let already_open = matches!(&current, Some((p, _)) if *p == newest);
            if !already_open {
                match File::open(&newest) {
                    Ok(mut f) => {
                        let _ = f.seek(SeekFrom::Start(0));
                        {
                            let mut s = shared.lock().unwrap_or_else(|p| p.into_inner());
                            s.events.clear();
                            s.err = None;
                            s.eof_run_ended = false;
                            s.file_path = Some(newest.clone());
                        }
                        // A fragment from the previous file must not leak into this one.
                        pending.clear();
                        current = Some((newest, BufReader::new(f)));
                    }
                    Err(e) => {
                        shared.lock().unwrap_or_else(|p| p.into_inner()).err =
                            Some(format!("open failed: {e}"));
                    }
                }
            }
        }
        if let Some((_, reader)) = current.as_mut() {
            loop {
                match reader.read_line(&mut pending) {
                    // End of file for now; any fragment stays in `pending`.
                    Ok(0) => break,
                    Ok(_) => {
                        let terminated = pending.ends_with('\n');
                        let trimmed = pending.trim_end_matches(['\r', '\n']);
                        if !trimmed.is_empty() {
                            match serde_json::from_str::<LiveEvent>(trimmed) {
                                Ok(ev) => {
                                    let mut s =
                                        shared.lock().unwrap_or_else(|p| p.into_inner());
                                    if matches!(&ev, LiveEvent::RunEnd { .. }) {
                                        s.eof_run_ended = true;
                                    }
                                    s.events.push(ev);
                                }
                                // No newline yet and not valid JSON: the writer is
                                // mid-line. Keep the fragment and retry next poll.
                                Err(_) if !terminated => break,
                                // A complete but malformed line is skipped.
                                Err(_) => {}
                            }
                        }
                        pending.clear();
                    }
                    Err(e) => {
                        shared.lock().unwrap_or_else(|p| p.into_inner()).err =
                            Some(format!("read failed: {e}"));
                        // Do not glue a stale fragment onto whatever is read next.
                        pending.clear();
                        break;
                    }
                }
            }
        }
        thread::sleep(Duration::from_millis(250));
    }
}
/// Returns the most recently modified `release-run-*.events.ndjson` file
/// directly inside `log_dir`.
///
/// Returns `None` when the directory cannot be read or holds no such file.
/// Entries whose metadata or modification time cannot be read are skipped.
/// If several files share the newest timestamp, the first one encountered
/// is kept.
fn newest_events_file(log_dir: &Path) -> Option<PathBuf> {
    let entries = std::fs::read_dir(log_dir).ok()?;
    let mut newest: Option<(std::time::SystemTime, PathBuf)> = None;

    for entry in entries.flatten() {
        let path = entry.path();

        let is_events_file = matches!(
            path.file_name().and_then(|s| s.to_str()),
            Some(s) if s.starts_with("release-run-") && s.ends_with(".events.ndjson")
        );
        if !is_events_file {
            continue;
        }

        let Ok(modified) = entry.metadata().and_then(|m| m.modified()) else {
            continue;
        };

        let is_newer = match &newest {
            None => true,
            Some((best, _)) => modified > *best,
        };
        if is_newer {
            newest = Some((modified, path));
        }
    }

    newest.map(|(_, path)| path)
}
// Unit tests for the row → event conversion and the injected-state snapshot.
#[cfg(test)]
mod tests {
use super::*;
use crate::warehouse::release_events::{phase, status};
// Builds a `ReleaseEventRow`; `ts_micros` is derived from `seq`, and `repo`
// mirrors `comp`.
fn row(
run: &str,
seq: i64,
comp: &str,
op: &str,
ph: &str,
st: &str,
ms: Option<i64>,
detail: &str,
) -> ReleaseEventRow {
ReleaseEventRow {
run_id: run.into(),
seq,
ts_micros: seq * 1_000,
component: comp.into(),
repo: comp.into(),
op: op.into(),
phase: ph.into(),
status: st.into(),
detail: detail.into(),
depends_on: None,
elapsed_ms: ms,
}
}
// Rows of two runs, given out of `seq` order: only the run owning the newest
// timestamp is converted, and its events come out ordered by `seq`.
#[test]
fn release_rows_become_live_events_for_newest_run() {
let rows = vec![
row("old", 0, "old", "run", phase::START, status::RUNNING, None, "wsA"),
row("run-1", 0, "run-1", "run", phase::START, status::RUNNING, None, "holger"),
row("run-1", 2, "znippy", "test", phase::END, status::OK, Some(1200), "3 passed"),
row("run-1", 1, "znippy", "test", phase::START, status::RUNNING, None, ""),
row("run-1", 3, "run-1", "run", phase::END, status::OK, None, ""),
];
let (events, run_id) = events_from_release_rows(rows);
assert_eq!(run_id.as_deref(), Some("run-1"), "newest run hydrated");
let s = RunSummary::from_events(&events);
assert_eq!(s.run_id, "run-1");
assert_eq!(s.workspace, "holger", "run/start detail → workspace label");
assert_eq!(s.run_ok, Some(true), "run/end ok → done");
assert_eq!(s.current_repo, "znippy");
let lines: Vec<String> = events.iter().map(event_line).collect();
assert!(lines.iter().any(|l| l == "… znippy test start"), "got {lines:?}");
assert!(lines.iter().any(|l| l == "✓ znippy test (1.2s)"), "got {lines:?}");
assert!(lines.iter().any(|l| l == "✓ run complete"), "got {lines:?}");
}
// Injected rows must show up in `state_json` without any warehouse access.
#[test]
fn inject_hydrates_state_json_without_warehouse() {
let mut live = LiveRunState::new(PathBuf::new(), PathBuf::new());
live.inject_for_test(vec![
row("r", 0, "r", "run", phase::START, status::RUNNING, None, "ws"),
row("r", 1, "znippy", "test", phase::END, status::OK, Some(500), ""),
]);
let v = live.state_json();
assert_eq!(v["hydrated_from_warehouse"], true);
assert_eq!(v["hydrated_run_id"], "r");
assert_eq!(v["run_id"], "r");
assert_eq!(v["workspace"], "ws");
let lines: Vec<&str> = v["lines"].as_array().unwrap().iter().filter_map(|l| l.as_str()).collect();
assert!(lines.iter().any(|l| *l == "✓ znippy test (0.5s)"), "got {lines:?}");
}
}