nornir 0.1.0

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Live release run pane — tails `<workspace>/.nornir/logs/release-run-*.events.ndjson`
//! in a background `std::thread` and renders the stream into the
//! egui pane next to Time Travel.
//!
//! No tokio dep, no SSE client: eframe's `App::update` runs sync at
//! ~60 Hz, and the producer is on the same machine for local dev.
//! The server's SSE endpoint is the same data over the wire for
//! future remote consumption — same JSON, same semantics — but the
//! egui pane doesn't need it when the file is local.

use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use eframe::egui::{self, Color32, RichText, ScrollArea};
use serde::Deserialize;

/// One record of a release-run event log (`*.events.ndjson`).
///
/// Every log line is a JSON object whose `"kind"` key selects the variant,
/// spelled in snake_case (`run_start`, `test_pass`, ...). The remaining keys
/// of the object map onto the fields of that variant.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LiveEvent {
    /// `run_start`: a run begins.
    RunStart { run_id: String, workspace: String },
    /// `repo_start`: work on `repo` at commit `sha` begins.
    RepoStart { repo: String, sha: String },
    /// `phase_start`: `phase` begins for `repo`.
    PhaseStart { repo: String, phase: String },
    /// `phase_end`: `phase` finished for `repo` with outcome `ok` after
    /// `duration_ms` milliseconds.
    PhaseEnd {
        repo: String,
        phase: String,
        ok: bool,
        duration_ms: u64,
    },
    /// `binary_start`: a test binary starts running.
    BinaryStart { repo: String, binary: String },
    /// `test_pass`: the test `name` passed.
    TestPass {
        repo: String,
        binary: String,
        name: String,
    },
    /// `test_fail`: the test `name` failed.
    TestFail {
        repo: String,
        binary: String,
        name: String,
    },
    /// `binary_done`: a test binary finished with the given pass/fail counts.
    BinaryDone {
        repo: String,
        binary: String,
        passed: u32,
        failed: u32,
    },
    /// `repo_end`: work on `repo` finished with outcome `ok`.
    RepoEnd { repo: String, ok: bool },
    /// `run_end`: the run identified by `run_id` finished with outcome `ok`.
    RunEnd { run_id: String, ok: bool },
}

/// State exchanged between the background tail thread (writer) and the egui
/// pane (reader); always accessed through the surrounding `Mutex`.
#[derive(Default)]
struct Shared {
    /// File currently being tailed, or `None` until one has been opened.
    file_path: Option<PathBuf>,
    /// Events parsed so far from `file_path`, in file order. Cleared whenever
    /// the tail switches to a different file.
    events: Vec<LiveEvent>,
    /// Most recent open/read failure message, shown in red at the top of the
    /// pane. Reset when a file is opened successfully.
    err: Option<String>,
    /// Set once a `run_end` event has been parsed from the current file and
    /// reset when the tail switches files.
    eof_run_ended: bool,
}

/// UI-side handle for the live release-run pane.
///
/// Owns the directory to watch and the state shared with the background
/// tail thread.
pub struct LiveRunState {
    /// State written by the tail thread and read while drawing.
    shared: Arc<Mutex<Shared>>,
    /// Directory scanned for `release-run-*.events.ndjson` files.
    log_dir: PathBuf,
    /// `true` once the tail thread has been requested, so it is spawned at
    /// most once.
    started: bool,
}

impl LiveRunState {
    /// Creates the pane state for the event logs found in `log_dir`.
    ///
    /// `log_dir` is `<workspace_root>/.nornir/logs`. Construction does
    /// not start the tail — that happens on first `draw()` so a user
    /// who never opens the tab pays nothing.
    pub fn new(log_dir: PathBuf) -> Self {
        Self {
            log_dir,
            shared: Arc::new(Mutex::new(Shared::default())),
            started: false,
        }
    }

    /// Spawns the background tail thread on the first call; later calls are
    /// no-ops.
    ///
    /// If the thread cannot be spawned, the failure is recorded in the shared
    /// error slot (and therefore shown in the pane) instead of panicking the
    /// UI thread.
    fn ensure_started(&mut self) {
        if self.started {
            return;
        }
        // Flip the flag before spawning so a failed spawn is attempted once,
        // not again on every frame.
        self.started = true;
        let shared = Arc::clone(&self.shared);
        let log_dir = self.log_dir.clone();
        let spawned = thread::Builder::new()
            .name("nornir-live-tail".into())
            .spawn(move || tail_loop(log_dir, shared));
        if let Err(e) = spawned {
            let mut state = self
                .shared
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            state.err = Some(format!("spawn tail thread failed: {e}"));
        }
    }

    /// Renders the pane: error banner, tailed file, run/repo/phase header,
    /// pass/fail totals and the scrolling event log.
    ///
    /// Starts the tail thread on first use and schedules a repaint so new
    /// events show up without user input.
    pub fn draw(&mut self, ui: &mut egui::Ui) {
        /// Maximum number of log rows rendered per frame.
        const MAX_ROWS: usize = 500;

        self.ensure_started();
        // A poisoned lock only means the tail thread panicked; the data it
        // left behind is still worth showing, so recover the guard rather
        // than panic on every frame.
        let shared = self
            .shared
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        if let Some(err) = &shared.err {
            ui.colored_label(Color32::RED, err);
        }
        ui.horizontal(|ui| {
            ui.label("tailing:");
            match &shared.file_path {
                Some(p) => ui.monospace(p.display().to_string()),
                None => ui.label("(waiting for next release-run-*.events.ndjson)"),
            };
        });
        ui.separator();

        // Header: replay the events to find the latest run and whatever
        // repo / phase / binary is active right now. The strings are borrowed
        // from the event list, so nothing is cloned per frame.
        let mut run_id = "";
        let mut workspace = "";
        let mut current_repo = "";
        let mut current_phase = "";
        let mut current_binary = "";
        let mut total_pass = 0u32;
        let mut total_fail = 0u32;
        let mut run_ok: Option<bool> = None;
        for ev in &shared.events {
            match ev {
                LiveEvent::RunStart { run_id: r, workspace: w } => {
                    run_id = r.as_str();
                    workspace = w.as_str();
                    current_repo = "";
                    current_phase = "";
                    current_binary = "";
                    run_ok = None;
                    total_pass = 0;
                    total_fail = 0;
                }
                LiveEvent::RepoStart { repo, .. } => {
                    current_repo = repo.as_str();
                    // A new repo has no phase or binary yet; do not keep
                    // showing the previous repo's.
                    current_phase = "";
                    current_binary = "";
                }
                LiveEvent::PhaseStart { phase, .. } => {
                    current_phase = phase.as_str();
                    // Any binary shown so far belonged to an earlier phase.
                    current_binary = "";
                }
                LiveEvent::BinaryStart { binary, .. } => current_binary = binary.as_str(),
                LiveEvent::BinaryDone { passed, failed, .. } => {
                    total_pass = total_pass.saturating_add(*passed);
                    total_fail = total_fail.saturating_add(*failed);
                }
                LiveEvent::RunEnd { ok, .. } => run_ok = Some(*ok),
                // These do not affect the header.
                LiveEvent::PhaseEnd { .. }
                | LiveEvent::TestPass { .. }
                | LiveEvent::TestFail { .. }
                | LiveEvent::RepoEnd { .. } => {}
            }
        }

        ui.horizontal(|ui| {
            ui.label(RichText::new("run").strong());
            ui.monospace(run_id);
            ui.separator();
            ui.label("workspace:");
            ui.monospace(workspace);
            ui.separator();
            match run_ok {
                None => ui.colored_label(Color32::LIGHT_BLUE, "● running"),
                Some(true) => ui.colored_label(Color32::LIGHT_GREEN, "✓ done"),
                Some(false) => ui.colored_label(Color32::LIGHT_RED, "✗ failed"),
            };
        });
        ui.horizontal(|ui| {
            ui.label(RichText::new("repo").strong());
            ui.monospace(current_repo);
            ui.separator();
            ui.label("phase:");
            ui.monospace(current_phase);
            ui.separator();
            ui.label("binary:");
            ui.monospace(short_bin(current_binary));
        });
        ui.horizontal(|ui| {
            ui.colored_label(Color32::LIGHT_GREEN, total_pass.to_string());
            ui.colored_label(Color32::LIGHT_RED, total_fail.to_string());
        });
        ui.separator();

        // Scrolling log: the last `MAX_ROWS` events in chronological order
        // (oldest first), with the view pinned to the newest row at the bottom.
        let first_row = shared.events.len().saturating_sub(MAX_ROWS);
        ScrollArea::vertical()
            .auto_shrink([false, false])
            .stick_to_bottom(true)
            .show(ui, |ui| {
                for ev in &shared.events[first_row..] {
                    render_event_row(ui, ev);
                }
            });

        // Drive the next frame so live updates appear without user input.
        ui.ctx().request_repaint_after(Duration::from_millis(200));
    }
}

/// Renders one log row for `ev` into `ui`.
///
/// `RunStart`, `PhaseStart` and `RepoEnd` have no row of their own and draw
/// nothing.
fn render_event_row(ui: &mut egui::Ui, ev: &LiveEvent) {
    // Status markers. These reuse the glyphs of the run-level status labels
    // so every row carries a visible, colour-independent pass/fail cue.
    const PASS: &str = "✓";
    const FAIL: &str = "✗";

    match ev {
        LiveEvent::BinaryStart { repo, binary } => {
            ui.horizontal(|ui| {
                ui.colored_label(Color32::LIGHT_BLUE, "📂");
                ui.monospace(format!("{repo}  {}", short_bin(binary)));
            });
        }
        LiveEvent::TestPass { name, .. } => {
            ui.horizontal(|ui| {
                ui.colored_label(Color32::LIGHT_GREEN, PASS);
                ui.label(name);
            });
        }
        LiveEvent::TestFail { name, .. } => {
            ui.horizontal(|ui| {
                ui.colored_label(Color32::LIGHT_RED, FAIL);
                ui.colored_label(Color32::LIGHT_RED, name);
            });
        }
        LiveEvent::BinaryDone { passed, failed, .. } => {
            ui.horizontal(|ui| {
                ui.colored_label(Color32::GRAY, "●");
                ui.label(format!("{passed} passed, {failed} failed"));
            });
        }
        LiveEvent::PhaseEnd { repo, phase, ok, duration_ms } => {
            let (col, mark) = if *ok {
                (Color32::LIGHT_GREEN, PASS)
            } else {
                (Color32::LIGHT_RED, FAIL)
            };
            ui.colored_label(col, format!(
                "{mark} {repo} {phase} ({:.1}s)",
                *duration_ms as f32 / 1000.0,
            ));
        }
        LiveEvent::RepoStart { repo, sha } => {
            ui.colored_label(Color32::WHITE, format!("{repo}  sha={}", short_sha(sha)));
        }
        LiveEvent::RunEnd { ok, .. } => {
            let col = if *ok { Color32::LIGHT_GREEN } else { Color32::LIGHT_RED };
            ui.colored_label(col, if *ok { "✓ run complete" } else { "✗ run failed" });
        }
        // Listed explicitly so adding a variant forces a decision here.
        LiveEvent::RunStart { .. }
        | LiveEvent::PhaseStart { .. }
        | LiveEvent::RepoEnd { .. } => {}
    }
}

/// Shortens a test-binary description to its first whitespace-delimited
/// token, e.g. `tests/foo.rs (target/debug/deps/foo-abcd)` becomes
/// `tests/foo.rs`.
///
/// Input that contains no token at all (empty or whitespace only) is
/// returned unchanged.
fn short_bin(s: &str) -> String {
    let head = match s.split_whitespace().next() {
        Some(first) => first,
        None => s,
    };
    head.to_owned()
}

/// Abbreviates a commit id to at most its first 12 characters.
///
/// The cut is made on a character boundary, so a value that is not plain
/// ASCII (the string comes from an external JSON log) is shortened safely
/// instead of panicking on a byte slice that splits a multi-byte character.
/// For ASCII input the result is the first 12 bytes, as before.
fn short_sha(s: &str) -> String {
    match s.char_indices().nth(12) {
        // `cut` is the byte offset of the 13th character: always a boundary.
        Some((cut, _)) => s[..cut].to_string(),
        None => s.to_string(),
    }
}

/// Body of the background tail thread; never returns.
///
/// Every 250 ms it:
/// 1. looks for the newest `release-run-*.events.ndjson` file in `log_dir`
///    and, if that is not the file already open, switches to it (resetting
///    the shared state);
/// 2. reads whatever new lines the current file has, parses each as a
///    [`LiveEvent`] and appends it to `shared.events`.
///
/// Open and read failures are published through `shared.err`. Lines that do
/// not parse as a known event are skipped.
fn tail_loop(log_dir: PathBuf, shared: Arc<Mutex<Shared>>) {
    let mut current: Option<(PathBuf, BufReader<File>)> = None;
    // Bytes of a line whose terminating '\n' has not been written yet. Kept
    // across polls so an event flushed in two pieces is parsed as a whole
    // instead of being dropped as two unparseable fragments. Raw bytes (not
    // `String`) so a flush that splits a multi-byte character is harmless.
    let mut pending: Vec<u8> = Vec::new();

    loop {
        // Re-scan for the newest events file every cycle so a fresh
        // run (new timestamp suffix) supersedes the previous one.
        if let Some(newest) = newest_events_file(&log_dir) {
            let already_open = matches!(&current, Some((open, _)) if *open == newest);
            if !already_open {
                match File::open(&newest) {
                    Ok(mut f) => {
                        let _ = f.seek(SeekFrom::Start(0));
                        {
                            let mut s = shared.lock().unwrap_or_else(|p| p.into_inner());
                            s.events.clear();
                            s.err = None;
                            s.eof_run_ended = false;
                            s.file_path = Some(newest.clone());
                        }
                        // A leftover fragment belongs to the previous file.
                        pending.clear();
                        current = Some((newest, BufReader::new(f)));
                    }
                    Err(e) => {
                        shared.lock().unwrap_or_else(|p| p.into_inner()).err =
                            Some(format!("open failed: {e}"));
                    }
                }
            }
        }

        if let Some((_, reader)) = current.as_mut() {
            loop {
                match reader.read_until(b'\n', &mut pending) {
                    Ok(0) => break, // EOF — wait, then retry / re-scan
                    Ok(_) => {
                        // Without a trailing '\n' the read stopped at EOF.
                        let terminated = pending.ends_with(b"\n");

                        let mut body: &[u8] = &pending;
                        while let [rest @ .., b'\r' | b'\n'] = body {
                            body = rest;
                        }
                        // Forward-compat: unknown variants simply fail to parse.
                        let event = if body.is_empty() {
                            None
                        } else {
                            serde_json::from_slice::<LiveEvent>(body).ok()
                        };

                        // An unterminated buffer that already parses is a
                        // complete final line written without a newline; a
                        // truncated JSON object can never parse, so accepting
                        // it here is safe.
                        let consumed = terminated || event.is_some();
                        if let Some(ev) = event {
                            let mut s = shared.lock().unwrap_or_else(|p| p.into_inner());
                            if matches!(ev, LiveEvent::RunEnd { .. }) {
                                s.eof_run_ended = true;
                            }
                            s.events.push(ev);
                        }
                        if consumed {
                            pending.clear();
                        }
                        if !terminated {
                            // At EOF; keep any partial line for the next poll.
                            break;
                        }
                    }
                    Err(e) => {
                        shared.lock().unwrap_or_else(|p| p.into_inner()).err =
                            Some(format!("read failed: {e}"));
                        break;
                    }
                }
            }
        }

        thread::sleep(Duration::from_millis(250));
    }
}

/// Returns the most recently modified `release-run-*.events.ndjson` file
/// directly inside `log_dir`.
///
/// Yields `None` when the directory cannot be read or holds no matching
/// file. Entries whose metadata or modification time is unavailable are
/// skipped. When several files share the newest timestamp, the one listed
/// first by the directory iterator wins.
fn newest_events_file(log_dir: &Path) -> Option<PathBuf> {
    let is_events_log = |path: &Path| {
        matches!(
            path.file_name().and_then(|name| name.to_str()),
            Some(name) if name.starts_with("release-run-") && name.ends_with(".events.ndjson")
        )
    };

    std::fs::read_dir(log_dir)
        .ok()?
        .flatten()
        .filter_map(|entry| {
            let path = entry.path();
            if !is_events_log(&path) {
                return None;
            }
            let modified = entry.metadata().ok()?.modified().ok()?;
            Some((modified, path))
        })
        // Strictly-greater comparison keeps the earliest-listed file on ties.
        .reduce(|best, candidate| if candidate.0 > best.0 { candidate } else { best })
        .map(|(_, path)| path)
}