nornir 0.4.32

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
//! Pure data model for the viz — no egui types here so we can
//! unit-test the loader without a display.

use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;

use anyhow::Result;
use chrono::{DateTime, Utc};
use uuid::Uuid;

use crate::bench::BenchRun;
use crate::release::pipeline::{ReleaseReport, query_release_history};
use crate::warehouse::dep_graph::{DepGraphSnapshot, query_dep_graph_snapshots};
use crate::warehouse::iceberg::IcebergWarehouse;
use crate::warehouse::{BenchFilter, Warehouse};

/// One node on a repo's swim lane — a single recorded release of
/// that repo, at a given git SHA, with the gate verdict.
///
/// Nodes come from recorded release reports (`build_timeline`) or, when a
/// workspace has none, are synthesized from git tags (`git_history_lanes`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LaneNode {
    /// Release this node belongs to. Tag-synthesized nodes carry a freshly
    /// generated random id, because no release was recorded for them.
    pub release_id: Uuid,
    /// When the release happened: the pinned dep-graph snapshot's timestamp for
    /// recorded releases, the tagged commit's time for tag-synthesized nodes.
    pub timestamp: DateTime<Utc>,
    /// Git commit SHA the release was cut from.
    pub sha: String,
    /// Branch recorded for the release. Tag-synthesized nodes store the tag
    /// name here instead.
    pub branch: String,
    /// `git.dirty` flag copied from the release record; always `false` for
    /// tag-synthesized nodes.
    pub dirty: bool,
    /// Gate verdict string copied from the release record. Tag-synthesized
    /// nodes are hard-coded to `"succeeded"`.
    pub gate_status: String,
    /// Passed-test count from the release record (0 for tag-synthesized nodes).
    pub tests_passed: u32,
    /// Failed-test count from the release record (0 for tag-synthesized nodes).
    pub tests_failed: u32,
    /// `(crate, version)` pairs published by this release.
    pub published_versions: Vec<(String, String)>,
}

/// A single repo's swim lane: the repo name plus its release nodes.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Lane {
    /// Repository name the lane belongs to.
    pub repo: String,
    /// Release nodes, sorted by timestamp by the builders in this module. May
    /// be empty: the bench-only fallback in `build_timeline` creates lanes with
    /// no nodes.
    pub nodes: Vec<LaneNode>,
}

/// One benchmark run reduced to what the viz plots (built by
/// `bench_point_from`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BenchPoint {
    /// Time of the run.
    pub timestamp: DateTime<Utc>,
    /// Primary scalar plotted on the sparkline: the largest numeric metric
    /// across all of the run's results — same heuristic across repos so
    /// every lane plots something. Holds `"(none)"` / `0.0` when the run has
    /// no numeric metric at all.
    pub primary_metric_name: String,
    pub primary_metric_value: f64,
    /// All numeric metrics across all results, flattened for display. Each
    /// name has the form `"<result name>.<metric key>"`.
    pub metrics: Vec<(String, f64)>,
    /// `version` field copied from the bench run.
    pub version: String,
    /// `machine` field copied from the bench run.
    pub machine: String,
}

/// Benchmark history of a single repo.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct BenchHistory {
    /// Repository the points belong to.
    pub repo: String,
    /// Benchmark points; `build_timeline` stores them sorted by timestamp.
    pub points: Vec<BenchPoint>,
}

impl BenchHistory {
    /// Smallest and largest value of one metric over the whole history.
    ///
    /// With `metric == None` the primary metric of every point is used. With
    /// `Some(name)` only points that carry a metric called `name` contribute,
    /// each through its first entry with that name.
    ///
    /// Returns `None` when no point contributes a value. NaN values are
    /// ignored by `f64::min` / `f64::max`, so a history made solely of NaNs
    /// yields `(INFINITY, NEG_INFINITY)`.
    pub fn min_max(&self, metric: Option<&str>) -> Option<(f64, f64)> {
        let mut values = self
            .points
            .iter()
            .filter_map(|point| match metric {
                Some(wanted) => point
                    .metrics
                    .iter()
                    .find_map(|(label, value)| (label == wanted).then_some(*value)),
                None => Some(point.primary_metric_value),
            })
            .peekable();

        // Nothing to measure: report "no range" rather than the fold seeds.
        values.peek()?;

        let range = values.fold(
            (f64::INFINITY, f64::NEG_INFINITY),
            |(lowest, highest), value| (lowest.min(value), highest.max(value)),
        );
        Some(range)
    }
}

/// Everything the viz needs for one workspace: release lanes, dep-graph
/// snapshots and per-repo benchmark history. Serializable, since it is also
/// the payload of the `Viz.Timeline` RPC.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timeline {
    /// Workspace this timeline was built for.
    pub workspace_name: String,
    /// One swim lane per repo; `build_timeline` emits them ordered by repo name.
    pub lanes: Vec<Lane>,
    /// Release ids in x-axis order: a node's column is the index of its
    /// release id in this list. Recorded releases appear in the order the
    /// release-history query returned them (presumably chronological —
    /// confirm); tag-synthesized ones are sorted by timestamp.
    pub release_order: Vec<Uuid>,
    /// release_id → snapshot id pinned at release time.
    pub release_snapshot: BTreeMap<Uuid, Uuid>,
    /// snapshot_id → loaded snapshot.
    pub snapshots: BTreeMap<Uuid, DepGraphSnapshot>,
    /// Most recent snapshot — used when nothing is selected.
    pub latest_snapshot: Option<DepGraphSnapshot>,
    /// Per-repo benchmark history, ordered by timestamp.
    pub bench_history: BTreeMap<String, BenchHistory>,
}

impl Timeline {
    /// `true` when there is nothing to draw at all: no lane holds a release
    /// node and no repo has a benchmark point.
    pub fn is_empty(&self) -> bool {
        let no_release_nodes = self.lanes.iter().all(|lane| lane.nodes.is_empty());
        no_release_nodes && !self.has_benches()
    }

    /// `true` when at least one release id is known.
    pub fn has_releases(&self) -> bool {
        !self.release_order.is_empty()
    }

    /// `true` when at least one repo has one or more benchmark points.
    pub fn has_benches(&self) -> bool {
        self.bench_history
            .values()
            .any(|history| !history.points.is_empty())
    }

    /// Dep-graph snapshot to show for `release_id`.
    ///
    /// Prefers the snapshot pinned to that release. When the release has no
    /// pinned snapshot, or the pinned one was not loaded, the latest loaded
    /// snapshot is returned instead (`None` if there is none either).
    pub fn snapshot_for(&self, release_id: &Uuid) -> Option<&DepGraphSnapshot> {
        let pinned = self
            .release_snapshot
            .get(release_id)
            .and_then(|snapshot_id| self.snapshots.get(snapshot_id));
        match pinned {
            Some(snapshot) => Some(snapshot),
            None => self.latest_snapshot.as_ref(),
        }
    }
}

/// Split a release tag into `(crate, version)`.
///
/// * `skade-v0.4.8` → `("skade", "0.4.8")` — split at the last `-v` that is
///   followed by a digit;
/// * `v0.1.5` → `("", "0.1.5")` — bare version tag, no crate name;
/// * anything else → `(tag, "")`, i.e. the whole tag with an empty version.
fn parse_tag_version(tag: &str) -> (String, String) {
    let looks_like_version = |text: &str| text.starts_with(|c: char| c.is_ascii_digit());

    if let Some((name, version)) = tag.rsplit_once("-v") {
        if looks_like_version(version) {
            return (name.to_string(), version.to_string());
        }
    }

    match tag.strip_prefix('v') {
        Some(version) if looks_like_version(version) => (String::new(), version.to_string()),
        _ => (tag.to_string(), String::new()),
    }
}

/// Read every member git repo under `git_dir` and synthesize one [`Lane`] per
/// repo from its **tags** — the whole-git-history fallback for a workspace with
/// no recorded releases. Best-effort: unreadable repos/tags are skipped.
///
/// Every sub-directory of `git_dir` is treated as a member clone named after
/// the directory. Each tag that peels to a commit becomes a [`LaneNode`] with:
/// a fresh random `release_id`, the commit time as `timestamp`, the peeled
/// commit id as `sha`, the tag name in `branch`, `gate_status` fixed to
/// `"succeeded"`, zero test counts, and `published_versions` filled from
/// [`parse_tag_version`] when the tag carries a version.
///
/// Returns lanes sorted by repo name, each with nodes sorted by timestamp;
/// repos without usable tags produce no lane. An unreadable `git_dir` yields an
/// empty list. Progress and failures are reported on stderr.
fn git_history_lanes(git_dir: &Path) -> Vec<Lane> {
    let mut lanes = Vec::new();
    let Ok(rd) = std::fs::read_dir(git_dir) else { return lanes };
    for entry in rd.flatten() {
        let repo_path = entry.path();
        if !repo_path.is_dir() {
            continue;
        }
        let repo_name = entry.file_name().to_string_lossy().to_string();
        let repo = match gix::open(&repo_path) {
            Ok(r) => r,
            Err(e) => {
                eprintln!("git-fallback: open {repo_path:?} failed: {e}");
                continue;
            }
        };
        let platform = match repo.references() {
            Ok(p) => p,
            Err(e) => {
                eprintln!("git-fallback: references {repo_name} failed: {e}");
                continue;
            }
        };
        let tags = match platform.tags() {
            Ok(t) => t,
            Err(e) => {
                eprintln!("git-fallback: tags {repo_name} failed: {e}");
                continue;
            }
        };
        let mut nodes = Vec::new();
        for tag in tags.flatten() {
            let tag_name = tag.name().shorten().to_string();
            // Follow the tag down to the object it ultimately points at; tags
            // that cannot be peeled or do not end in a commit are skipped.
            let Ok(id) = tag.into_fully_peeled_id() else { continue };
            let sha = id.to_hex().to_string();
            let Ok(obj) = id.object() else { continue };
            let Ok(commit) = obj.try_into_commit() else { continue };
            // Unreadable commit time falls back to the epoch; an epoch value
            // chrono cannot represent falls back to "now".
            let secs = commit.time().map(|t| t.seconds).unwrap_or(0);
            let timestamp = DateTime::<Utc>::from_timestamp(secs, 0).unwrap_or_else(Utc::now);
            let (crate_name, version) = parse_tag_version(&tag_name);
            let published_versions =
                if version.is_empty() { Vec::new() } else { vec![(crate_name, version)] };
            nodes.push(LaneNode {
                release_id: Uuid::new_v4(),
                timestamp,
                sha,
                branch: tag_name,
                dirty: false,
                gate_status: "succeeded".to_string(),
                tests_passed: 0,
                tests_failed: 0,
                published_versions,
            });
        }
        // Success-path observability: one line per member so the journal shows
        // the fallback firing and how many tags each repo contributed (the error
        // lines above only fire on failure — a clean run was otherwise silent).
        // The separator keeps the repo name from running into the count.
        eprintln!("git-fallback: {repo_name}: {} tags", nodes.len());
        if !nodes.is_empty() {
            nodes.sort_by_key(|n| n.timestamp);
            lanes.push(Lane { repo: repo_name, nodes });
        }
    }
    lanes.sort_by(|a, b| a.repo.cmp(&b.repo));
    eprintln!("git-fallback: {git_dir:?}: {} repo lane(s) with tags", lanes.len());
    lanes
}

/// Open the Iceberg warehouse rooted at `warehouse_root` and build the
/// [`Timeline`] of `workspace_name` from it.
///
/// The root is also passed to [`build_timeline`] as its fallback directory, so
/// the path-based fallbacks are available to the local loader.
///
/// # Errors
/// Fails when the warehouse cannot be opened or the timeline cannot be built.
pub fn load_timeline(warehouse_root: &Path, workspace_name: &str) -> Result<Timeline> {
    let warehouse = IcebergWarehouse::open(warehouse_root)?;
    build_timeline(&warehouse, workspace_name, Some(warehouse_root))
}

/// Core timeline builder over an already-open warehouse handle. Shared by the
/// local loader (`load_timeline`) and the server's `Viz.Timeline` RPC, so the
/// viz can run either embedded (own warehouse) or as a thin gRPC client.
///
/// `bench_fallback_dir` is the warehouse root. It is consulted only while no
/// lane has been produced yet, and drives two fallbacks tried in this order:
/// 1. git history — tags of the clones under `<root>/../git/<member>/` become
///    lane nodes;
/// 2. bench-only — every `<root>/bench_runs/repo=*` entry becomes an empty lane.
///
/// Pass `None` when the path isn't known (e.g. server-side) — both fallbacks
/// are then skipped.
///
/// # Errors
/// Fails when the release-history or the dep-graph-snapshot query fails.
/// Bench-run query errors are not propagated: the affected repo simply gets an
/// empty bench history.
pub fn build_timeline(
    wh: &IcebergWarehouse,
    workspace_name: &str,
    bench_fallback_dir: Option<&Path>,
) -> Result<Timeline> {
    let reports: Vec<ReleaseReport> =
        wh.block_on(query_release_history(wh, workspace_name, None))?;
    let snapshots_vec = wh.block_on(query_dep_graph_snapshots(wh, workspace_name, None))?;
    let mut snapshots: BTreeMap<Uuid, DepGraphSnapshot> = BTreeMap::new();
    // NOTE(review): "latest" is simply the last row returned — this assumes the
    // query yields snapshots oldest-first; confirm against the query.
    let latest_snapshot = snapshots_vec.last().cloned();
    for s in snapshots_vec {
        snapshots.insert(s.snapshot_id, s);
    }

    let mut by_repo: BTreeMap<String, Vec<LaneNode>> = BTreeMap::new();
    let mut release_order: Vec<Uuid> = Vec::new();
    let mut release_snapshot: BTreeMap<Uuid, Uuid> = BTreeMap::new();
    let mut seen: BTreeSet<Uuid> = BTreeSet::new();

    for rep in &reports {
        // A release id is recorded once, in first-seen order, even when it
        // shows up in several reports.
        if seen.insert(rep.release_id) {
            release_order.push(rep.release_id);
            release_snapshot.insert(rep.release_id, rep.dep_graph_snapshot_id);
        }
        // Real release timestamp. The release_lineage table persists one
        // (`query_release_history` even reads it into its grouping key), but
        // `ReleaseReport` has no `timestamp` field to carry it out — so the viz
        // can't see it directly. Each release pins the dep-graph snapshot
        // captured during the SAME release run, and that `DepGraphSnapshot`
        // *does* reach us with a real `timestamp`; use it as the release's
        // recorded time. Falls back to `Utc::now()` only for pre-1.x rows whose
        // snapshot id is `Uuid::nil()` (no snapshot was captured).
        //
        // TODO: the exact fix is to add a `timestamp: DateTime<Utc>` field to
        // `ReleaseReport` (src/release/pipeline.rs) populated from the lineage
        // `tss` column already read at query time, then use it here. That file
        // is out of scope for this viz-only change.
        let when = snapshots
            .get(&rep.dep_graph_snapshot_id)
            .map(|s| s.timestamp)
            .unwrap_or_else(Utc::now);
        // One node per repo that took part in this release; all share `when`.
        for r in &rep.repos {
            by_repo.entry(r.repo.clone()).or_default().push(LaneNode {
                release_id: rep.release_id,
                timestamp: when,
                sha: r.git.sha.clone(),
                branch: r.git.branch.clone(),
                dirty: r.git.dirty,
                gate_status: r.gate_status.clone(),
                tests_passed: r.tests_passed,
                tests_failed: r.tests_failed,
                published_versions: r.published_versions.clone(),
            });
        }
    }

    // `by_repo` is a BTreeMap, so lanes come out ordered by repo name.
    let mut lanes: Vec<Lane> = by_repo
        .into_iter()
        .map(|(repo, mut nodes)| {
            nodes.sort_by_key(|n| n.timestamp);
            Lane { repo, nodes }
        })
        .collect();

    // Git-history fallback: when a workspace has NO recorded releases (empty
    // warehouse / never republished), read the WHOLE GIT HISTORY of each
    // monitored member — its tags become lane nodes — so the timeline shows the
    // repo's real release history instead of an empty placeholder. The monitor's
    // git clones live in `../git/<member>/` beside the warehouse root.
    if lanes.is_empty() {
        if let Some(git_dir) =
            bench_fallback_dir.and_then(|root| root.parent()).map(|p| p.join("git"))
        {
            lanes = git_history_lanes(&git_dir);
            // Each node's x-column is its index in `release_order`; without this
            // every fallback node resolves to col 0 and they pile up invisibly at
            // the far left. Build the order chronologically across all lanes.
            let mut chron: Vec<(DateTime<Utc>, Uuid)> = lanes
                .iter()
                .flat_map(|l| l.nodes.iter().map(|n| (n.timestamp, n.release_id)))
                .collect();
            chron.sort_by_key(|(t, _)| *t);
            release_order = chron.into_iter().map(|(_, id)| id).collect();
        }
    }

    // Bench-only fallback: if there were STILL no releases, scan the
    // `bench_runs/repo=*/` partition directories and synthesize empty lanes so
    // the TimeTravel benchmarks reel still has data.
    if lanes.is_empty() {
        if let Some(rd) = bench_fallback_dir
            .map(|root| root.join("bench_runs"))
            .and_then(|bench_dir| std::fs::read_dir(bench_dir).ok())
        {
            for entry in rd.flatten() {
                let name = entry.file_name().to_string_lossy().to_string();
                if let Some(repo) = name.strip_prefix("repo=") {
                    lanes.push(Lane { repo: repo.to_string(), nodes: Vec::new() });
                }
            }
            lanes.sort_by(|a, b| a.repo.cmp(&b.repo));
        }
    }

    // Load bench history per repo for the TimeTravel reels. Only repos that
    // ended up with a lane are queried; a failed query is treated as "no runs".
    let mut bench_history: BTreeMap<String, BenchHistory> = BTreeMap::new();
    for lane in &lanes {
        let runs = wh
            .query_bench_runs(&BenchFilter::for_repo(&lane.repo))
            .unwrap_or_default();
        let mut points: Vec<BenchPoint> = runs
            .into_iter()
            .map(|run| bench_point_from(&run))
            .collect();
        points.sort_by_key(|p| p.timestamp);
        bench_history.insert(
            lane.repo.clone(),
            BenchHistory { repo: lane.repo.clone(), points },
        );
    }

    Ok(Timeline {
        workspace_name: workspace_name.to_string(),
        lanes,
        release_order,
        release_snapshot,
        snapshots,
        latest_snapshot,
        bench_history,
    })
}

/// Flatten one stored [`BenchRun`] into the [`BenchPoint`] the viz plots.
///
/// * Timestamp — the run's RFC 3339 `timestamp` when present and parseable;
///   otherwise midnight UTC of its `%Y-%m-%d` `date`; otherwise the current
///   time.
/// * Metrics — every metric value readable as a number, over all results, named
///   `"<result name>.<metric key>"`.
/// * Primary metric — the entry with the greatest value among those (the later
///   entry wins a tie); `("(none)", 0.0)` when the run has no numeric metric.
fn bench_point_from(run: &BenchRun) -> BenchPoint {
    let explicit_stamp = run
        .timestamp
        .as_deref()
        .and_then(|raw| DateTime::parse_from_rfc3339(raw).ok())
        .map(|stamp| stamp.with_timezone(&Utc));
    let timestamp = match explicit_stamp {
        Some(stamp) => stamp,
        None => chrono::NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
            .ok()
            .and_then(|day| day.and_hms_opt(0, 0, 0))
            .map(|midnight| DateTime::<Utc>::from_naive_utc_and_offset(midnight, Utc))
            .unwrap_or_else(Utc::now),
    };

    let mut metrics: Vec<(String, f64)> = Vec::new();
    for result in &run.results {
        for (key, value) in &result.metrics {
            // Float reading first; the integer reading is only a fallback.
            let number = value
                .as_f64()
                .or_else(|| value.as_i64().map(|int| int as f64));
            if let Some(number) = number {
                metrics.push((format!("{}.{}", result.name, key), number));
            }
        }
    }

    let largest = metrics
        .iter()
        .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
    let (primary_metric_name, primary_metric_value) = match largest {
        Some((name, value)) => (name.clone(), *value),
        None => (String::from("(none)"), 0.0),
    };

    BenchPoint {
        timestamp,
        primary_metric_name,
        primary_metric_value,
        metrics,
        version: run.version.clone(),
        machine: run.machine.clone(),
    }
}

// Unit tests for the data model: the serde wire contract of `Timeline` and the
// tag-based git-history fallback.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::warehouse::dep_graph::CrossRepoEdge;
    use std::collections::BTreeSet;

    /// The viz `Timeline` must survive a JSON round-trip — that's the wire
    /// contract for the `Viz.Timeline` RPC (server serializes, client decodes).
    /// Covers Uuid, DateTime<Utc>, nested DepGraphSnapshot/CrossRepoEdge.
    #[test]
    fn timeline_json_roundtrip() {
        let sid = Uuid::new_v4();
        let rid = Uuid::new_v4();
        let now = Utc::now();
        let snap = DepGraphSnapshot {
            snapshot_id: sid,
            workspace_name: "ws".into(),
            timestamp: now,
            edges: vec![CrossRepoEdge {
                from: "a".into(),
                to: "b".into(),
                via: BTreeSet::from(["shared".to_string()]),
            }],
        };
        let mut snapshots = BTreeMap::new();
        snapshots.insert(sid, snap.clone());
        let mut release_snapshot = BTreeMap::new();
        release_snapshot.insert(rid, sid);
        let mut bench_history = BTreeMap::new();
        bench_history.insert(
            "a".to_string(),
            BenchHistory {
                repo: "a".into(),
                points: vec![BenchPoint {
                    timestamp: now,
                    primary_metric_name: "ops_sec".into(),
                    primary_metric_value: 1234.5,
                    metrics: vec![("x.ops_sec".into(), 1234.5)],
                    version: "0.1.0".into(),
                    machine: "m".into(),
                }],
            },
        );
        let tl = Timeline {
            workspace_name: "ws".into(),
            lanes: vec![Lane {
                repo: "a".into(),
                nodes: vec![LaneNode {
                    release_id: rid,
                    timestamp: now,
                    sha: "deadbeef".into(),
                    branch: "main".into(),
                    dirty: false,
                    gate_status: "succeeded".into(),
                    tests_passed: 3,
                    tests_failed: 0,
                    published_versions: vec![("a".into(), "0.1.0".into())],
                }],
            }],
            release_order: vec![rid],
            release_snapshot,
            snapshots,
            latest_snapshot: Some(snap),
            bench_history,
        };

        // Spot-check one value from each nested structure after decoding.
        let json = serde_json::to_string(&tl).expect("serialize");
        let back: Timeline = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.workspace_name, "ws");
        assert_eq!(back.lanes.len(), 1);
        assert_eq!(back.lanes[0].nodes[0].release_id, rid);
        assert_eq!(back.snapshots[&sid].edges[0].from, "a");
        assert_eq!(back.release_snapshot[&rid], sid);
        assert_eq!(back.bench_history["a"].points[0].primary_metric_value, 1234.5);
    }

    /// The git-history fallback (`git_history_lanes`) must turn a member repo's
    /// **tags** into lane nodes — the path that fills the timeline for a
    /// workspace with no recorded releases. Builds a real temp git repo with two
    /// version tags and asserts the lane, node count, and parsed versions.
    ///
    /// Shells out to the `git` executable, so it must be available on `PATH`.
    #[test]
    fn git_history_lanes_reads_member_tags() {
        use std::process::Command;
        let dir = tempfile::tempdir().unwrap();
        let git_dir = dir.path().join("git");
        let repo = git_dir.join("myrepo");
        std::fs::create_dir_all(&repo).unwrap();
        // Runs one git command inside the temp repo and fails the test if it
        // exits unsuccessfully. Author/committer identity is supplied through
        // the environment so commits do not depend on a global git config.
        let git = |args: &[&str]| {
            let ok = Command::new("git")
                .args(args)
                .current_dir(&repo)
                .env("GIT_AUTHOR_NAME", "t")
                .env("GIT_AUTHOR_EMAIL", "t@e")
                .env("GIT_COMMITTER_NAME", "t")
                .env("GIT_COMMITTER_EMAIL", "t@e")
                .output()
                .expect("run git")
                .status
                .success();
            assert!(ok, "git {args:?} failed");
        };
        // Two commits, one tag each: a bare `v…` tag and a `<crate>-v…` tag, so
        // both tag shapes handled by `parse_tag_version` are exercised.
        git(&["init", "-q", "-b", "main"]);
        std::fs::write(repo.join("f"), "1").unwrap();
        git(&["add", "."]);
        git(&["commit", "-q", "-m", "one"]);
        git(&["tag", "v0.1.0"]);
        std::fs::write(repo.join("f"), "2").unwrap();
        git(&["add", "."]);
        git(&["commit", "-q", "-m", "two"]);
        git(&["tag", "myrepo-v0.2.0"]);

        let lanes = git_history_lanes(&git_dir);
        assert_eq!(lanes.len(), 1, "one member repo → one lane");
        assert_eq!(lanes[0].repo, "myrepo");
        assert_eq!(lanes[0].nodes.len(), 2, "two tags → two nodes");
        let versions: Vec<String> = lanes[0]
            .nodes
            .iter()
            .flat_map(|n| n.published_versions.iter().map(|(_, v)| v.clone()))
            .collect();
        assert!(versions.contains(&"0.1.0".to_string()), "v0.1.0 parsed: {versions:?}");
        assert!(versions.contains(&"0.2.0".to_string()), "myrepo-v0.2.0 parsed: {versions:?}");
        assert!(lanes[0].nodes.iter().all(|n| n.sha.len() >= 7), "each node has a sha");
    }
}