use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use eframe::egui::{self, Color32, RichText, ScrollArea};
use serde::Deserialize;
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "kind")]
pub enum LiveEvent {
#[serde(rename = "run_start")]
RunStart { run_id: String, workspace: String },
#[serde(rename = "repo_start")]
RepoStart { repo: String, sha: String },
#[serde(rename = "phase_start")]
PhaseStart { repo: String, phase: String },
#[serde(rename = "phase_end")]
PhaseEnd { repo: String, phase: String, ok: bool, duration_ms: u64 },
#[serde(rename = "binary_start")]
BinaryStart { repo: String, binary: String },
#[serde(rename = "test_pass")]
TestPass { repo: String, binary: String, name: String },
#[serde(rename = "test_fail")]
TestFail { repo: String, binary: String, name: String },
#[serde(rename = "binary_done")]
BinaryDone { repo: String, binary: String, passed: u32, failed: u32 },
#[serde(rename = "repo_end")]
RepoEnd { repo: String, ok: bool },
#[serde(rename = "run_end")]
RunEnd { run_id: String, ok: bool },
}
#[derive(Default)]
struct Shared {
events: Vec<LiveEvent>,
err: Option<String>,
file_path: Option<PathBuf>,
eof_run_ended: bool,
}
pub struct LiveRunState {
log_dir: PathBuf,
remote: Option<(String, String)>,
shared: Arc<Mutex<Shared>>,
started: bool,
}
impl LiveRunState {
pub fn new(log_dir: PathBuf) -> Self {
Self {
log_dir,
remote: None,
shared: Arc::new(Mutex::new(Shared::default())),
started: false,
}
}
pub fn new_remote(endpoint: String, token: String) -> Self {
Self {
log_dir: PathBuf::new(),
remote: Some((endpoint, token)),
shared: Arc::new(Mutex::new(Shared::default())),
started: false,
}
}
fn ensure_started(&mut self) {
if self.started { return; }
self.started = true;
let shared = Arc::clone(&self.shared);
if let Some((endpoint, token)) = self.remote.clone() {
thread::Builder::new()
.name("nornir-live-remote".into())
.spawn(move || remote_loop(endpoint, token, shared))
.expect("spawn live remote thread");
} else {
let log_dir = self.log_dir.clone();
thread::Builder::new()
.name("nornir-live-tail".into())
.spawn(move || tail_loop(log_dir, shared))
.expect("spawn tail thread");
}
}
pub fn draw(&mut self, ui: &mut egui::Ui) {
self.ensure_started();
let remote_ep = self.remote.as_ref().map(|(e, _)| e.clone());
let shared = self.shared.lock().unwrap_or_else(|p| p.into_inner());
if let Some(err) = &shared.err {
ui.colored_label(Color32::RED, err);
}
ui.horizontal(|ui| {
if let Some(ep) = &remote_ep {
ui.label("streaming:");
ui.monospace(format!("server {ep}"));
} else {
ui.label("tailing:");
if let Some(p) = &shared.file_path {
ui.monospace(p.display().to_string());
} else {
ui.label("(waiting for next release-run-*.events.ndjson)");
}
}
});
ui.separator();
let mut run_id = String::from("—");
let mut workspace = String::from("—");
let mut current_repo = String::from("—");
let mut current_phase = String::from("—");
let mut current_binary = String::from("—");
let mut total_pass = 0u32;
let mut total_fail = 0u32;
let mut run_ok: Option<bool> = None;
for ev in &shared.events {
match ev {
LiveEvent::RunStart { run_id: r, workspace: w } => {
run_id = r.clone();
workspace = w.clone();
run_ok = None;
total_pass = 0;
total_fail = 0;
}
LiveEvent::RepoStart { repo, .. } => current_repo = repo.clone(),
LiveEvent::PhaseStart { phase, .. } => current_phase = phase.clone(),
LiveEvent::BinaryStart { binary, .. } => current_binary = binary.clone(),
LiveEvent::BinaryDone { passed, failed, .. } => {
total_pass += passed;
total_fail += failed;
}
LiveEvent::RunEnd { ok, .. } => run_ok = Some(*ok),
_ => {}
}
}
ui.horizontal(|ui| {
ui.label(RichText::new("run").strong());
ui.monospace(&run_id);
ui.separator();
ui.label("workspace:");
ui.monospace(&workspace);
ui.separator();
match run_ok {
None => ui.colored_label(Color32::LIGHT_BLUE, "● running"),
Some(true) => ui.colored_label(Color32::LIGHT_GREEN, "✓ done"),
Some(false) => ui.colored_label(Color32::LIGHT_RED, "✗ failed"),
};
});
ui.horizontal(|ui| {
ui.label(RichText::new("repo").strong());
ui.monospace(¤t_repo);
ui.separator();
ui.label("phase:");
ui.monospace(¤t_phase);
ui.separator();
ui.label("binary:");
ui.monospace(short_bin(¤t_binary));
});
ui.horizontal(|ui| {
ui.colored_label(Color32::LIGHT_GREEN, format!("✓ {total_pass}"));
ui.colored_label(Color32::LIGHT_RED, format!("✗ {total_fail}"));
});
ui.separator();
ScrollArea::vertical().auto_shrink([false, false]).stick_to_bottom(true).show(ui, |ui| {
for ev in shared.events.iter().rev().take(500).rev() {
render_event_row(ui, ev);
}
});
ui.ctx().request_repaint_after(Duration::from_millis(200));
}
}
fn render_event_row(ui: &mut egui::Ui, ev: &LiveEvent) {
match ev {
LiveEvent::BinaryStart { repo, binary } => {
ui.horizontal(|ui| {
ui.colored_label(Color32::LIGHT_BLUE, "📂");
ui.monospace(format!("{repo} {}", short_bin(binary)));
});
}
LiveEvent::TestPass { name, .. } => {
ui.horizontal(|ui| {
ui.colored_label(Color32::LIGHT_GREEN, "✓");
ui.label(name);
});
}
LiveEvent::TestFail { name, .. } => {
ui.horizontal(|ui| {
ui.colored_label(Color32::LIGHT_RED, "✗");
ui.colored_label(Color32::LIGHT_RED, name);
});
}
LiveEvent::BinaryDone { passed, failed, .. } => {
ui.horizontal(|ui| {
ui.colored_label(Color32::GRAY, "≡");
ui.label(format!("{passed} passed, {failed} failed"));
});
}
LiveEvent::PhaseEnd { repo, phase, ok, duration_ms } => {
let col = if *ok { Color32::LIGHT_GREEN } else { Color32::LIGHT_RED };
ui.colored_label(col, format!(
"{} {repo} {phase} ({:.1}s)",
if *ok { "✓" } else { "✗" },
*duration_ms as f32 / 1000.0,
));
}
LiveEvent::RepoStart { repo, sha } => {
ui.colored_label(Color32::WHITE, format!("▶ {repo} sha={}", short_sha(sha)));
}
LiveEvent::RunEnd { ok, .. } => {
let col = if *ok { Color32::LIGHT_GREEN } else { Color32::LIGHT_RED };
ui.colored_label(col, if *ok { "✓ run complete" } else { "✗ run failed" });
}
_ => {}
}
}
fn short_bin(s: &str) -> String {
s.split_whitespace().next().unwrap_or(s).to_string()
}
fn short_sha(s: &str) -> String {
if s.chars().count() > 12 { s.chars().take(12).collect() } else { s.to_string() }
}
fn remote_loop(endpoint: String, token: String, shared: Arc<Mutex<Shared>>) {
loop {
{
let mut s = shared.lock().unwrap();
s.events.clear();
s.err = None;
s.eof_run_ended = false;
s.file_path = None;
}
let sink = Arc::clone(&shared);
let res = super::remote::stream_progress(&endpoint, &token, move |ev| {
let mut s = sink.lock().unwrap();
if let LiveEvent::RunEnd { .. } = &ev {
s.eof_run_ended = true;
}
s.events.push(ev);
});
if let Err(e) = res {
shared.lock().unwrap().err = Some(format!("stream from {endpoint}: {e:#}"));
}
thread::sleep(Duration::from_secs(2));
}
}
fn tail_loop(log_dir: PathBuf, shared: Arc<Mutex<Shared>>) {
let mut current: Option<(PathBuf, BufReader<File>)> = None;
loop {
if let Some(newest) = newest_events_file(&log_dir) {
let pick = match ¤t {
Some((p, _)) if *p == newest => None,
_ => Some(newest),
};
if let Some(path) = pick {
match File::open(&path) {
Ok(mut f) => {
let _ = f.seek(SeekFrom::Start(0));
let mut s = shared.lock().unwrap();
s.events.clear();
s.err = None;
s.eof_run_ended = false;
s.file_path = Some(path.clone());
drop(s);
current = Some((path, BufReader::new(f)));
}
Err(e) => {
shared.lock().unwrap().err = Some(format!("open failed: {e}"));
}
}
}
}
if let Some((_, reader)) = current.as_mut() {
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line) {
Ok(0) => break, Ok(_) => {
let trimmed = line.trim_end_matches(['\r', '\n']);
if trimmed.is_empty() { continue; }
match serde_json::from_str::<LiveEvent>(trimmed) {
Ok(ev) => {
let mut s = shared.lock().unwrap();
if let LiveEvent::RunEnd { .. } = &ev {
s.eof_run_ended = true;
}
s.events.push(ev);
}
Err(_) => {
}
}
}
Err(e) => {
shared.lock().unwrap().err = Some(format!("read failed: {e}"));
break;
}
}
}
}
thread::sleep(Duration::from_millis(250));
}
}
fn newest_events_file(log_dir: &Path) -> Option<PathBuf> {
let rd = std::fs::read_dir(log_dir).ok()?;
let mut best: Option<(std::time::SystemTime, PathBuf)> = None;
for entry in rd.flatten() {
let p = entry.path();
let ok_name = p
.file_name()
.and_then(|s| s.to_str())
.map(|s| s.starts_with("release-run-") && s.ends_with(".events.ndjson"))
.unwrap_or(false);
if !ok_name { continue; }
if let Ok(m) = entry.metadata() {
if let Ok(t) = m.modified() {
if best.as_ref().map_or(true, |(bt, _)| t > *bt) {
best = Some((t, p));
}
}
}
}
best.map(|(_, p)| p)
}