nornir 0.2.0

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Pure data model for the viz — no egui types here so we can
//! unit-test the loader without a display.

use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;

use anyhow::Result;
use chrono::{DateTime, Utc};
use uuid::Uuid;

use crate::bench::BenchRun;
use crate::release::pipeline::{ReleaseReport, query_release_history};
use crate::warehouse::dep_graph::{DepGraphSnapshot, query_dep_graph_snapshots};
use crate::warehouse::iceberg::IcebergWarehouse;
use crate::warehouse::{BenchFilter, Warehouse};

/// One node on a repo's swim lane — a single recorded release of
/// that repo, at a given git SHA, with the gate verdict.
///
/// `load_timeline` creates one node per repo row of every release
/// report and copies the row's git and gate fields into it.
#[derive(Debug, Clone)]
pub struct LaneNode {
    /// Id of the release report this node was built from. Several
    /// nodes (one per repo in the report) can share the same id.
    pub release_id: Uuid,
    /// Position of the node on the time axis; lanes are sorted by it.
    ///
    /// NOTE(review): `load_timeline` currently fills this with
    /// `Utc::now()` taken while loading, not with a time read from
    /// the release report — confirm whether that is intended.
    pub timestamp: DateTime<Utc>,
    /// Git SHA recorded for the repo in the release report.
    pub sha: String,
    /// Git branch recorded for the repo in the release report.
    pub branch: String,
    /// Git "dirty" flag recorded for the repo (presumably: the
    /// working tree had uncommitted changes — confirm with the
    /// release pipeline).
    pub dirty: bool,
    /// Gate verdict as reported, kept as a free-form string; the set
    /// of possible values is not defined in this module.
    pub gate_status: String,
    /// Number of passed tests as reported for this repo.
    pub tests_passed: u32,
    /// Number of failed tests as reported for this repo.
    pub tests_failed: u32,
    /// Pairs of strings copied from the report row (presumably
    /// `(crate name, version)` — confirm against the release pipeline).
    pub published_versions: Vec<(String, String)>,
}

/// All release nodes of one repo, i.e. one horizontal swim lane.
#[derive(Debug, Clone)]
pub struct Lane {
    /// Repo name; doubles as the key into `Timeline::bench_history`.
    pub repo: String,
    /// Nodes sorted ascending by `LaneNode::timestamp`. Empty for
    /// lanes synthesized from `bench_runs/repo=*` directories when no
    /// release has been recorded.
    pub nodes: Vec<LaneNode>,
}

/// One benchmark run reduced to what the sparkline needs. Built from
/// a `BenchRun` by `bench_point_from`.
#[derive(Debug, Clone)]
pub struct BenchPoint {
    /// When the run happened: the run's RFC 3339 `timestamp` if it
    /// parses, otherwise midnight UTC of its `%Y-%m-%d` `date`,
    /// otherwise the time at which the point was built.
    pub timestamp: DateTime<Utc>,
    /// Primary scalar plotted on the sparkline: the largest numeric
    /// metric across *all* results of the run (same heuristic across
    /// repos so every lane plots something). When the run has no
    /// numeric metric the name is `"(none)"` and the value is `0.0`.
    pub primary_metric_name: String,
    pub primary_metric_value: f64,
    /// All numeric metrics across all results, flattened for display.
    /// Each name has the form `"<result name>.<metric key>"`.
    pub metrics: Vec<(String, f64)>,
    /// Version string copied from the run.
    pub version: String,
    /// Machine identifier copied from the run.
    pub machine: String,
}

/// Benchmark points of a single repo over time.
#[derive(Debug, Clone, Default)]
pub struct BenchHistory {
    /// Repo the points belong to.
    pub repo: String,
    /// Points sorted ascending by timestamp when produced by
    /// `load_timeline`; empty when no bench run could be loaded.
    pub points: Vec<BenchPoint>,
}

impl BenchHistory {
    pub fn min_max(&self, metric: Option<&str>) -> Option<(f64, f64)> {
        let vals: Vec<f64> = self
            .points
            .iter()
            .filter_map(|p| match metric {
                None => Some(p.primary_metric_value),
                Some(name) => p.metrics.iter().find(|(n, _)| n == name).map(|(_, v)| *v),
            })
            .collect();
        if vals.is_empty() {
            return None;
        }
        let mn = vals.iter().cloned().fold(f64::INFINITY, f64::min);
        let mx = vals.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
        Some((mn, mx))
    }
}

/// Everything the timeline view needs for one workspace: per-repo
/// release lanes, release → dep-graph-snapshot lineage, and per-repo
/// benchmark history. Built by `load_timeline`.
#[derive(Debug, Clone)]
pub struct Timeline {
    /// Name of the workspace the timeline was loaded for.
    pub workspace_name: String,
    /// One lane per repo, ordered by repo name.
    pub lanes: Vec<Lane>,
    /// Distinct release ids in the order the release-history query
    /// returned them (first occurrence wins). Presumably that order
    /// is chronological — it depends on the query, confirm there.
    pub release_order: Vec<Uuid>,
    /// release_id → snapshot id pinned at release time.
    pub release_snapshot: BTreeMap<Uuid, Uuid>,
    /// snapshot_id → loaded snapshot.
    pub snapshots: BTreeMap<Uuid, DepGraphSnapshot>,
    /// Last snapshot returned by the snapshot query (presumably the
    /// most recent one — confirm the query's ordering). Used when
    /// nothing is selected and as the fallback of `snapshot_for`.
    pub latest_snapshot: Option<DepGraphSnapshot>,
    /// Per-repo benchmark history, keyed by repo name, with points
    /// ordered by timestamp. Holds one entry per lane.
    pub bench_history: BTreeMap<String, BenchHistory>,
}

impl Timeline {
    /// `true` when there is nothing to draw: no lane holds a release
    /// node and no repo has a benchmark point.
    pub fn is_empty(&self) -> bool {
        let any_release_node = self.lanes.iter().any(|lane| !lane.nodes.is_empty());
        !any_release_node && !self.has_benches()
    }

    /// `true` when at least one release id was recorded.
    pub fn has_releases(&self) -> bool {
        self.release_order.first().is_some()
    }

    /// `true` when at least one repo has a benchmark point.
    pub fn has_benches(&self) -> bool {
        !self
            .bench_history
            .values()
            .all(|history| history.points.is_empty())
    }

    /// Dep-graph snapshot to show for `release_id`.
    ///
    /// Prefers the snapshot pinned to the release. When the release
    /// has no pinned snapshot id, or that id was not loaded (e.g. the
    /// release pre-dates lineage tracking), the latest loaded
    /// snapshot is returned instead. `None` only when neither exists.
    pub fn snapshot_for(&self, release_id: &Uuid) -> Option<&DepGraphSnapshot> {
        let pinned = self
            .release_snapshot
            .get(release_id)
            .and_then(|snapshot_id| self.snapshots.get(snapshot_id));
        match pinned {
            Some(snapshot) => Some(snapshot),
            None => self.latest_snapshot.as_ref(),
        }
    }
}

/// Build the [`Timeline`] of `workspace_name` from the warehouse at
/// `warehouse_root`.
///
/// Steps:
/// 1. load the release history and the dep-graph snapshots,
/// 2. turn every repo row of every release into a [`LaneNode`] and
///    group the nodes into per-repo lanes sorted by timestamp,
/// 3. if that produced no lane at all, synthesize empty lanes from the
///    `bench_runs/repo=*` directories,
/// 4. load the benchmark history of every lane.
///
/// # Errors
///
/// Fails when the warehouse cannot be opened or when the release or
/// snapshot query fails. A failing bench-run query is *not* an error:
/// the affected repo simply gets an empty history.
pub fn load_timeline(warehouse_root: &Path, workspace_name: &str) -> Result<Timeline> {
    let store = IcebergWarehouse::open(warehouse_root)?;
    let reports: Vec<ReleaseReport> =
        store.block_on(query_release_history(&store, workspace_name, None))?;
    let snapshot_rows = store.block_on(query_dep_graph_snapshots(&store, workspace_name, None))?;

    // Remember the last row before the rows are moved into the index.
    let latest_snapshot = snapshot_rows.last().cloned();
    let snapshots: BTreeMap<Uuid, DepGraphSnapshot> = snapshot_rows
        .into_iter()
        .map(|snapshot| (snapshot.snapshot_id, snapshot))
        .collect();

    let mut seen: BTreeSet<Uuid> = BTreeSet::new();
    let mut release_order: Vec<Uuid> = Vec::new();
    let mut release_snapshot: BTreeMap<Uuid, Uuid> = BTreeMap::new();
    let mut by_repo: BTreeMap<String, Vec<LaneNode>> = BTreeMap::new();

    for report in &reports {
        let release_id = report.release_id;
        // Only the first report carrying a given id defines its order
        // and its pinned snapshot.
        if seen.insert(release_id) {
            release_order.push(release_id);
            release_snapshot.insert(release_id, report.dep_graph_snapshot_id);
        }

        // NOTE(review): nodes are stamped with the load-time clock, not
        // with a time taken from the report — confirm this is intended.
        let stamped_at = Utc::now();
        for row in &report.repos {
            let node = LaneNode {
                release_id,
                timestamp: stamped_at,
                sha: row.git.sha.clone(),
                branch: row.git.branch.clone(),
                dirty: row.git.dirty,
                gate_status: row.gate_status.clone(),
                tests_passed: row.tests_passed,
                tests_failed: row.tests_failed,
                published_versions: row.published_versions.clone(),
            };
            by_repo.entry(row.repo.clone()).or_default().push(node);
        }
    }

    // `by_repo` iterates in key order, so lanes come out sorted by repo.
    let mut lanes: Vec<Lane> = Vec::with_capacity(by_repo.len());
    for (repo, mut nodes) in by_repo {
        nodes.sort_by_key(|node| node.timestamp);
        lanes.push(Lane { repo, nodes });
    }

    // Bench-only fallback: with no release recorded, derive empty lanes
    // from the `bench_runs/repo=*/` partition directories so the
    // TimeTravel benchmarks reel still has data. An unreadable
    // directory just leaves the lane list empty.
    if lanes.is_empty() {
        if let Ok(entries) = std::fs::read_dir(warehouse_root.join("bench_runs")) {
            lanes.extend(entries.flatten().filter_map(|entry| {
                let file_name = entry.file_name();
                let file_name = file_name.to_string_lossy();
                file_name.strip_prefix("repo=").map(|repo| Lane {
                    repo: repo.to_string(),
                    nodes: Vec::new(),
                })
            }));
            lanes.sort_by(|left, right| left.repo.cmp(&right.repo));
        }
    }

    // Bench history per lane for the TimeTravel reels; a failed query
    // is treated as "no runs".
    let bench_history: BTreeMap<String, BenchHistory> = lanes
        .iter()
        .map(|lane| {
            let mut points: Vec<BenchPoint> = store
                .query_bench_runs(&BenchFilter::for_repo(&lane.repo))
                .unwrap_or_default()
                .into_iter()
                .map(|run| bench_point_from(&run))
                .collect();
            points.sort_by_key(|point| point.timestamp);

            let history = BenchHistory {
                repo: lane.repo.clone(),
                points,
            };
            (lane.repo.clone(), history)
        })
        .collect();

    Ok(Timeline {
        workspace_name: workspace_name.to_string(),
        lanes,
        release_order,
        release_snapshot,
        snapshots,
        latest_snapshot,
        bench_history,
    })
}

/// Reduce one [`BenchRun`] to the [`BenchPoint`] plotted on a lane.
///
/// * Timestamp: the run's `timestamp` parsed as RFC 3339; failing
///   that, midnight UTC of its `date` (`%Y-%m-%d`); failing that, the
///   current time.
/// * Metrics: every metric value that can be read as a number, from
///   every result, named `"<result name>.<metric key>"`.
/// * Primary metric: the entry with the largest value among all
///   collected metrics, or `("(none)", 0.0)` when there is none.
fn bench_point_from(run: &BenchRun) -> BenchPoint {
    let exact = run
        .timestamp
        .as_deref()
        .and_then(|raw| DateTime::parse_from_rfc3339(raw).ok())
        .map(|parsed| parsed.with_timezone(&Utc));
    let timestamp = match exact {
        Some(instant) => instant,
        // No usable timestamp: fall back to the calendar date, then to "now".
        None => chrono::NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
            .ok()
            .and_then(|day| day.and_hms_opt(0, 0, 0))
            .map(|midnight| DateTime::<Utc>::from_naive_utc_and_offset(midnight, Utc))
            .unwrap_or_else(Utc::now),
    };

    let mut metrics: Vec<(String, f64)> = Vec::new();
    for result in &run.results {
        for (key, value) in &result.metrics {
            // Float reading first, integer reading as a fallback;
            // non-numeric values are skipped.
            let numeric = value
                .as_f64()
                .or_else(|| value.as_i64().map(|whole| whole as f64));
            if let Some(number) = numeric {
                metrics.push((format!("{}.{}", result.name, key), number));
            }
        }
    }

    // Incomparable pairs (NaN) are treated as equal.
    let largest = metrics.iter().max_by(|lhs, rhs| {
        lhs.1
            .partial_cmp(&rhs.1)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    let (primary_metric_name, primary_metric_value) = match largest {
        Some((name, value)) => (name.clone(), *value),
        None => (String::from("(none)"), 0.0),
    };

    BenchPoint {
        timestamp,
        primary_metric_name,
        primary_metric_value,
        metrics,
        version: run.version.clone(),
        machine: run.machine.clone(),
    }
}