use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
use anyhow::Result;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use crate::bench::BenchRun;
use crate::release::pipeline::{ReleaseReport, query_release_history};
use crate::warehouse::dep_graph::{DepGraphSnapshot, query_dep_graph_snapshots};
use crate::warehouse::iceberg::IcebergWarehouse;
use crate::warehouse::{BenchFilter, Warehouse};
/// One repository's entry for a single release, shown as a node on that repo's lane.
///
/// `build_timeline` creates one node per repo listed in each release report.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LaneNode {
/// Identifier of the release report this node was created from.
pub release_id: Uuid,
/// Time attached to the node; `build_timeline` sorts each lane by this value.
pub timestamp: DateTime<Utc>,
/// Git commit SHA recorded for the repo in the release report.
pub sha: String,
/// Git branch recorded for the repo in the release report.
pub branch: String,
/// Git "dirty" flag from the release report (presumably uncommitted changes — confirm).
pub dirty: bool,
/// Gate status string copied from the release report (the tests use `"succeeded"`).
pub gate_status: String,
/// Passed-test count copied from the release report.
pub tests_passed: u32,
/// Failed-test count copied from the release report.
pub tests_failed: u32,
/// Published versions as string pairs; presumably `(name, version)` — TODO confirm.
pub published_versions: Vec<(String, String)>,
}
/// All timeline nodes belonging to one repository.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Lane {
/// Repository name; also used as the key into `Timeline::bench_history`.
pub repo: String,
/// Nodes of this repo. `build_timeline` sorts them by `timestamp`; the list is
/// empty for lanes discovered only through the bench-directory fallback.
pub nodes: Vec<LaneNode>,
}
/// A single benchmark run flattened into one chartable sample.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BenchPoint {
/// When the run happened. `bench_point_from` derives it from the run's RFC 3339
/// timestamp, else from its `%Y-%m-%d` date at midnight UTC, else the current time.
pub timestamp: DateTime<Utc>,
/// Name of the metric chosen as primary (`"(none)"` when the run had no numeric metrics).
pub primary_metric_name: String,
/// Value of the primary metric (`0.0` when the run had no numeric metrics).
pub primary_metric_value: f64,
/// Every numeric metric of the run as `("<result name>.<metric key>", value)`.
pub metrics: Vec<(String, f64)>,
/// Version string copied from the bench run.
pub version: String,
/// Machine identifier copied from the bench run.
pub machine: String,
}
/// Benchmark samples recorded for one repository.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct BenchHistory {
/// Repository the samples belong to.
pub repo: String,
/// Samples for the repo; `build_timeline` stores them sorted by `timestamp`.
pub points: Vec<BenchPoint>,
}
impl BenchHistory {
    /// Returns the smallest and largest value of one metric across all points.
    ///
    /// * `metric == None` — every point contributes its `primary_metric_value`.
    /// * `metric == Some(name)` — a point contributes the value of its first entry in
    ///   `metrics` whose name equals `name`; points without such an entry are skipped.
    ///
    /// NaN samples are ignored because they cannot be ordered. Infinite samples are
    /// kept and may therefore appear in the result.
    ///
    /// Returns `None` when no usable sample exists: there are no points, the metric
    /// is unknown, or every sample is NaN. Otherwise returns `Some((min, max))`.
    pub fn min_max(&self, metric: Option<&str>) -> Option<(f64, f64)> {
        self.points
            .iter()
            .filter_map(|point| match metric {
                None => Some(point.primary_metric_value),
                Some(name) => point
                    .metrics
                    .iter()
                    .find(|(n, _)| n == name)
                    .map(|(_, value)| *value),
            })
            // Without this filter an all-NaN series would surface the fold seeds as
            // an inverted (+inf, -inf) range instead of "no data".
            .filter(|value| !value.is_nan())
            // Single pass, no temporary Vec: the range starts at the first sample.
            .fold(None, |range, value| match range {
                None => Some((value, value)),
                Some((lo, hi)) => Some((lo.min(value), hi.max(value))),
            })
    }
}
/// Everything needed to render a workspace's release and benchmark timeline.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timeline {
/// Name of the workspace the timeline was built for.
pub workspace_name: String,
/// One lane per repository.
pub lanes: Vec<Lane>,
/// Release ids in the order they were first seen in the release history, without duplicates.
pub release_order: Vec<Uuid>,
/// Maps a release id to the id of the dependency-graph snapshot recorded for it.
pub release_snapshot: BTreeMap<Uuid, Uuid>,
/// Dependency-graph snapshots keyed by their snapshot id.
pub snapshots: BTreeMap<Uuid, DepGraphSnapshot>,
/// The last snapshot returned by the snapshot query, if any; used as a fallback
/// by `snapshot_for`.
pub latest_snapshot: Option<DepGraphSnapshot>,
/// Benchmark history keyed by repository name.
pub bench_history: BTreeMap<String, BenchHistory>,
}
impl Timeline {
    /// True when there is nothing to draw: no lane holds a node and no bench
    /// history holds a point.
    pub fn is_empty(&self) -> bool {
        let any_node = self.lanes.iter().any(|lane| !lane.nodes.is_empty());
        !any_node && !self.has_benches()
    }

    /// True when at least one release id was recorded.
    pub fn has_releases(&self) -> bool {
        !self.release_order.is_empty()
    }

    /// True when at least one repository has at least one bench point.
    pub fn has_benches(&self) -> bool {
        let all_blank = self
            .bench_history
            .values()
            .all(|history| history.points.is_empty());
        !all_blank
    }

    /// Looks up the dependency-graph snapshot to show for `release_id`.
    ///
    /// Prefers the snapshot mapped to the release. When the release is unknown, or
    /// its snapshot id is not present in `snapshots`, falls back to
    /// `latest_snapshot` (which may itself be `None`).
    pub fn snapshot_for(&self, release_id: &Uuid) -> Option<&DepGraphSnapshot> {
        let pinned = self
            .release_snapshot
            .get(release_id)
            .and_then(|snapshot_id| self.snapshots.get(snapshot_id));
        match pinned {
            Some(snapshot) => Some(snapshot),
            None => self.latest_snapshot.as_ref(),
        }
    }
}
/// Opens the Iceberg warehouse rooted at `warehouse_root` and builds the timeline
/// for `workspace_name`.
///
/// The warehouse root is also handed to [`build_timeline`] as the bench fallback
/// directory.
///
/// # Errors
/// Returns the error from opening the warehouse or from `build_timeline`.
pub fn load_timeline(warehouse_root: &Path, workspace_name: &str) -> Result<Timeline> {
    let warehouse = IcebergWarehouse::open(warehouse_root)?;
    let bench_fallback_dir = Some(warehouse_root);
    build_timeline(&warehouse, workspace_name, bench_fallback_dir)
}
pub fn build_timeline(
wh: &IcebergWarehouse,
workspace_name: &str,
bench_fallback_dir: Option<&Path>,
) -> Result<Timeline> {
let reports: Vec<ReleaseReport> =
wh.block_on(query_release_history(wh, workspace_name, None))?;
let snapshots_vec = wh.block_on(query_dep_graph_snapshots(wh, workspace_name, None))?;
let mut snapshots: BTreeMap<Uuid, DepGraphSnapshot> = BTreeMap::new();
let latest_snapshot = snapshots_vec.last().cloned();
for s in snapshots_vec {
snapshots.insert(s.snapshot_id, s);
}
let mut by_repo: BTreeMap<String, Vec<LaneNode>> = BTreeMap::new();
let mut release_order: Vec<Uuid> = Vec::new();
let mut release_snapshot: BTreeMap<Uuid, Uuid> = BTreeMap::new();
let mut seen: BTreeSet<Uuid> = BTreeSet::new();
for rep in &reports {
if seen.insert(rep.release_id) {
release_order.push(rep.release_id);
release_snapshot.insert(rep.release_id, rep.dep_graph_snapshot_id);
}
let when = Utc::now();
for r in &rep.repos {
by_repo.entry(r.repo.clone()).or_default().push(LaneNode {
release_id: rep.release_id,
timestamp: when,
sha: r.git.sha.clone(),
branch: r.git.branch.clone(),
dirty: r.git.dirty,
gate_status: r.gate_status.clone(),
tests_passed: r.tests_passed,
tests_failed: r.tests_failed,
published_versions: r.published_versions.clone(),
});
}
}
let mut lanes: Vec<Lane> = by_repo
.into_iter()
.map(|(repo, mut nodes)| {
nodes.sort_by_key(|n| n.timestamp);
Lane { repo, nodes }
})
.collect();
if lanes.is_empty() {
if let Some(rd) = bench_fallback_dir
.map(|root| root.join("bench_runs"))
.and_then(|bench_dir| std::fs::read_dir(bench_dir).ok())
{
for entry in rd.flatten() {
let name = entry.file_name().to_string_lossy().to_string();
if let Some(repo) = name.strip_prefix("repo=") {
lanes.push(Lane { repo: repo.to_string(), nodes: Vec::new() });
}
}
lanes.sort_by(|a, b| a.repo.cmp(&b.repo));
}
}
let mut bench_history: BTreeMap<String, BenchHistory> = BTreeMap::new();
for lane in &lanes {
let runs = wh
.query_bench_runs(&BenchFilter::for_repo(&lane.repo))
.unwrap_or_default();
let mut points: Vec<BenchPoint> = runs
.into_iter()
.map(|run| bench_point_from(&run))
.collect();
points.sort_by_key(|p| p.timestamp);
bench_history.insert(
lane.repo.clone(),
BenchHistory { repo: lane.repo.clone(), points },
);
}
Ok(Timeline {
workspace_name: workspace_name.to_string(),
lanes,
release_order,
release_snapshot,
snapshots,
latest_snapshot,
bench_history,
})
}
/// Flattens one [`BenchRun`] into a [`BenchPoint`].
///
/// * Timestamp: the run's RFC 3339 `timestamp` if present and parseable; otherwise
///   its `date` (`%Y-%m-%d`) at midnight UTC; otherwise the current time.
/// * Metrics: every metric value readable as a number, named
///   `"<result name>.<metric key>"`, in iteration order.
/// * Primary metric: the metric with the greatest value, or `("(none)", 0.0)` when
///   the run has no numeric metrics.
fn bench_point_from(run: &BenchRun) -> BenchPoint {
    let from_timestamp = run
        .timestamp
        .as_deref()
        .and_then(|raw| DateTime::parse_from_rfc3339(raw).ok())
        .map(|parsed| parsed.with_timezone(&Utc));
    let timestamp = match from_timestamp {
        Some(exact) => exact,
        // Coarser fallback: the calendar date, then "now" as a last resort.
        None => chrono::NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
            .ok()
            .and_then(|day| day.and_hms_opt(0, 0, 0))
            .map(|midnight| DateTime::<Utc>::from_naive_utc_and_offset(midnight, Utc))
            .unwrap_or_else(Utc::now),
    };

    let mut metrics: Vec<(String, f64)> = Vec::new();
    for result in &run.results {
        for (key, value) in &result.metrics {
            // Float reading first, integer reading second; non-numeric values are dropped.
            let number = value
                .as_f64()
                .or_else(|| value.as_i64().map(|int| int as f64));
            if let Some(number) = number {
                metrics.push((format!("{}.{}", result.name, key), number));
            }
        }
    }

    // Incomparable values (NaN) are treated as equal by the comparator.
    let largest = metrics
        .iter()
        .max_by(|lhs, rhs| {
            lhs.1
                .partial_cmp(&rhs.1)
                .unwrap_or(std::cmp::Ordering::Equal)
        })
        .cloned();
    let (primary_metric_name, primary_metric_value) = match largest {
        Some(pair) => pair,
        None => (String::from("(none)"), 0.0),
    };

    BenchPoint {
        timestamp,
        primary_metric_name,
        primary_metric_value,
        metrics,
        version: run.version.clone(),
        machine: run.machine.clone(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::warehouse::dep_graph::CrossRepoEdge;
    use std::collections::BTreeSet;

    /// A fully populated `Timeline` must survive a JSON serialize/deserialize cycle.
    #[test]
    fn timeline_json_roundtrip() {
        let snapshot_id = Uuid::new_v4();
        let release_id = Uuid::new_v4();
        let stamp = Utc::now();

        // One snapshot with a single cross-repo edge a -> b.
        let edge = CrossRepoEdge {
            from: "a".into(),
            to: "b".into(),
            via: BTreeSet::from(["shared".to_string()]),
        };
        let snapshot = DepGraphSnapshot {
            snapshot_id,
            workspace_name: "ws".into(),
            timestamp: stamp,
            edges: vec![edge],
        };
        let snapshots = BTreeMap::from([(snapshot_id, snapshot.clone())]);
        let release_snapshot = BTreeMap::from([(release_id, snapshot_id)]);

        // One bench point for repo "a".
        let point = BenchPoint {
            timestamp: stamp,
            primary_metric_name: "ops_sec".into(),
            primary_metric_value: 1234.5,
            metrics: vec![("x.ops_sec".into(), 1234.5)],
            version: "0.1.0".into(),
            machine: "m".into(),
        };
        let history = BenchHistory {
            repo: "a".into(),
            points: vec![point],
        };
        let bench_history = BTreeMap::from([("a".to_string(), history)]);

        // One lane for repo "a" holding a single release node.
        let node = LaneNode {
            release_id,
            timestamp: stamp,
            sha: "deadbeef".into(),
            branch: "main".into(),
            dirty: false,
            gate_status: "succeeded".into(),
            tests_passed: 3,
            tests_failed: 0,
            published_versions: vec![("a".into(), "0.1.0".into())],
        };
        let lane = Lane {
            repo: "a".into(),
            nodes: vec![node],
        };

        let original = Timeline {
            workspace_name: "ws".into(),
            lanes: vec![lane],
            release_order: vec![release_id],
            release_snapshot,
            snapshots,
            latest_snapshot: Some(snapshot),
            bench_history,
        };

        let json = serde_json::to_string(&original).expect("serialize");
        let restored: Timeline = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(restored.workspace_name, "ws");
        assert_eq!(restored.lanes.len(), 1);
        assert_eq!(restored.lanes[0].nodes[0].release_id, release_id);
        assert_eq!(restored.snapshots[&snapshot_id].edges[0].from, "a");
        assert_eq!(restored.release_snapshot[&release_id], snapshot_id);
        assert_eq!(
            restored.bench_history["a"].points[0].primary_metric_value,
            1234.5
        );
    }
}