use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
use anyhow::Result;
use chrono::{DateTime, Utc};
use uuid::Uuid;
use crate::bench::BenchRun;
use crate::release::pipeline::{ReleaseReport, query_release_history};
use crate::warehouse::dep_graph::{DepGraphSnapshot, query_dep_graph_snapshots};
use crate::warehouse::iceberg::IcebergWarehouse;
use crate::warehouse::{BenchFilter, Warehouse};
/// One entry on a repository's lane: what a single release report recorded
/// for that repository.
#[derive(Debug, Clone)]
pub struct LaneNode {
/// Id of the release report this entry was taken from.
pub release_id: Uuid,
/// Sort key for the entry. `load_timeline` fills this with the clock
/// reading taken while processing the report, not a value read from the report.
pub timestamp: DateTime<Utc>,
/// Git commit sha copied from the repo's entry in the report.
pub sha: String,
/// Git branch copied from the repo's entry in the report.
pub branch: String,
/// Git dirty flag copied from the repo's entry in the report.
pub dirty: bool,
/// Gate status string copied verbatim from the report.
pub gate_status: String,
/// Passed-test count copied from the report.
pub tests_passed: u32,
/// Failed-test count copied from the report.
pub tests_failed: u32,
/// Pairs copied from the report; presumably (package, version) — TODO confirm.
pub published_versions: Vec<(String, String)>,
}
/// All release entries collected for one repository.
#[derive(Debug, Clone)]
pub struct Lane {
/// Repository name this lane belongs to.
pub repo: String,
/// Entries for the repository; `load_timeline` sorts them by `timestamp`.
/// May be empty when the lane was discovered only through bench data.
pub nodes: Vec<LaneNode>,
}
/// A single benchmark run flattened into plottable numbers.
#[derive(Debug, Clone)]
pub struct BenchPoint {
/// When the run happened, as resolved by `bench_point_from`.
pub timestamp: DateTime<Utc>,
/// Name of the metric chosen as primary; `bench_point_from` picks the
/// largest-valued metric, or `"(none)"` when the run has no numeric metrics.
pub primary_metric_name: String,
/// Value of the primary metric (`0.0` when there is none).
pub primary_metric_value: f64,
/// Every numeric metric of the run as `("<result name>.<metric key>", value)`.
pub metrics: Vec<(String, f64)>,
/// Version string copied from the bench run.
pub version: String,
/// Machine string copied from the bench run.
pub machine: String,
}
/// Benchmark points recorded for one repository.
#[derive(Debug, Clone, Default)]
pub struct BenchHistory {
/// Repository name the points belong to.
pub repo: String,
/// Points for the repository; `load_timeline` sorts them by `timestamp`.
pub points: Vec<BenchPoint>,
}
impl BenchHistory {
    /// Returns the `(minimum, maximum)` of one metric across all points.
    ///
    /// With `metric == None` the primary metric value of every point is used.
    /// With `Some(name)` only points that carry a metric with exactly that
    /// name contribute; the first matching entry of each point is taken.
    ///
    /// Returns `None` when no point contributes a value. NaN values are
    /// skipped by `f64::min` / `f64::max`, so a selection consisting solely of
    /// NaN yields `(f64::INFINITY, f64::NEG_INFINITY)`.
    pub fn min_max(&self, metric: Option<&str>) -> Option<(f64, f64)> {
        let mut lowest = f64::INFINITY;
        let mut highest = f64::NEG_INFINITY;
        let mut contributed = false;

        for point in &self.points {
            let value = match metric {
                None => point.primary_metric_value,
                Some(name) => match point.metrics.iter().find(|(n, _)| n == name) {
                    Some((_, v)) => *v,
                    // This point does not report the requested metric.
                    None => continue,
                },
            };
            contributed = true;
            lowest = lowest.min(value);
            highest = highest.max(value);
        }

        if contributed {
            Some((lowest, highest))
        } else {
            None
        }
    }
}
/// Everything `load_timeline` gathers for one workspace: release lanes,
/// dependency-graph snapshots and benchmark history.
#[derive(Debug, Clone)]
pub struct Timeline {
/// Name of the workspace the data was queried for.
pub workspace_name: String,
/// One lane per repository.
pub lanes: Vec<Lane>,
/// Distinct release ids in the order they were first encountered.
pub release_order: Vec<Uuid>,
/// Maps a release id to the id of its dependency-graph snapshot.
pub release_snapshot: BTreeMap<Uuid, Uuid>,
/// Dependency-graph snapshots keyed by snapshot id.
pub snapshots: BTreeMap<Uuid, DepGraphSnapshot>,
/// Last snapshot returned by the snapshot query, if any; used as the
/// fallback in `snapshot_for`.
pub latest_snapshot: Option<DepGraphSnapshot>,
/// Benchmark history keyed by repository name.
pub bench_history: BTreeMap<String, BenchHistory>,
}
impl Timeline {
    /// True when no lane holds a release entry and no repository has a
    /// benchmark point.
    pub fn is_empty(&self) -> bool {
        let any_release_entry = self.lanes.iter().any(|lane| !lane.nodes.is_empty());
        !any_release_entry && !self.has_benches()
    }

    /// True when at least one release id was recorded.
    pub fn has_releases(&self) -> bool {
        !self.release_order.is_empty()
    }

    /// True when at least one repository has a benchmark point.
    pub fn has_benches(&self) -> bool {
        !self
            .bench_history
            .values()
            .all(|history| history.points.is_empty())
    }

    /// Looks up the dependency-graph snapshot associated with `release_id`.
    ///
    /// Falls back to `latest_snapshot` when the release is unknown or when the
    /// snapshot id it points at is not present in `snapshots`.
    pub fn snapshot_for(&self, release_id: &Uuid) -> Option<&DepGraphSnapshot> {
        if let Some(snapshot_id) = self.release_snapshot.get(release_id) {
            if let Some(snapshot) = self.snapshots.get(snapshot_id) {
                return Some(snapshot);
            }
        }
        self.latest_snapshot.as_ref()
    }
}
/// Builds the [`Timeline`] of `workspace_name` from the warehouse stored at
/// `warehouse_root`.
///
/// Steps, in order:
/// 1. query release history and dependency-graph snapshots;
/// 2. index snapshots by id and remember the last one returned;
/// 3. group the per-repo entries of every release report into lanes;
/// 4. if no lane came out of the reports, derive empty lanes from the
///    `repo=<name>` entries under `<warehouse_root>/bench_runs`;
/// 5. load the benchmark history of every lane.
///
/// # Errors
///
/// Fails when the warehouse cannot be opened or when either of the two
/// history queries fails. A bench-run query that does not succeed is not an
/// error: that lane simply gets the default (empty) run list.
pub fn load_timeline(warehouse_root: &Path, workspace_name: &str) -> Result<Timeline> {
    let warehouse = IcebergWarehouse::open(warehouse_root)?;

    let release_reports: Vec<ReleaseReport> =
        warehouse.block_on(query_release_history(&warehouse, workspace_name, None))?;
    let queried_snapshots =
        warehouse.block_on(query_dep_graph_snapshots(&warehouse, workspace_name, None))?;

    // Capture the last snapshot before the list is consumed by the index.
    let latest_snapshot = queried_snapshots.last().cloned();
    let mut snapshots: BTreeMap<Uuid, DepGraphSnapshot> = BTreeMap::new();
    snapshots.extend(
        queried_snapshots
            .into_iter()
            .map(|snapshot| (snapshot.snapshot_id, snapshot)),
    );

    let mut release_order: Vec<Uuid> = Vec::new();
    let mut release_snapshot: BTreeMap<Uuid, Uuid> = BTreeMap::new();
    let mut known_releases: BTreeSet<Uuid> = BTreeSet::new();
    let mut nodes_by_repo: BTreeMap<String, Vec<LaneNode>> = BTreeMap::new();

    for report in release_reports.iter() {
        // Only the first report seen for a release id defines its position
        // and its snapshot association.
        let first_sighting = known_releases.insert(report.release_id);
        if first_sighting {
            release_order.push(report.release_id);
            release_snapshot.insert(report.release_id, report.dep_graph_snapshot_id);
        }

        // NOTE(review): entries are stamped with the load-time clock, not with
        // a time taken from the report — confirm this is intended.
        let stamped_at = Utc::now();
        for repo_report in &report.repos {
            let node = LaneNode {
                release_id: report.release_id,
                timestamp: stamped_at,
                sha: repo_report.git.sha.clone(),
                branch: repo_report.git.branch.clone(),
                dirty: repo_report.git.dirty,
                gate_status: repo_report.gate_status.clone(),
                tests_passed: repo_report.tests_passed,
                tests_failed: repo_report.tests_failed,
                published_versions: repo_report.published_versions.clone(),
            };
            nodes_by_repo
                .entry(repo_report.repo.clone())
                .or_default()
                .push(node);
        }
    }

    let mut lanes: Vec<Lane> = Vec::with_capacity(nodes_by_repo.len());
    for (repo, mut nodes) in nodes_by_repo {
        nodes.sort_by_key(|node| node.timestamp);
        lanes.push(Lane { repo, nodes });
    }

    // No releases at all: fall back to the repositories that have bench data
    // on disk so their history can still be shown.
    if lanes.is_empty() {
        let bench_root = warehouse_root.join("bench_runs");
        if let Ok(entries) = std::fs::read_dir(&bench_root) {
            for entry in entries.flatten() {
                let entry_name = entry.file_name();
                if let Some(repo) = entry_name.to_string_lossy().strip_prefix("repo=") {
                    lanes.push(Lane {
                        repo: repo.to_string(),
                        nodes: Vec::new(),
                    });
                }
            }
            lanes.sort_by(|left, right| left.repo.cmp(&right.repo));
        }
    }

    let mut bench_history: BTreeMap<String, BenchHistory> = BTreeMap::new();
    for lane in lanes.iter() {
        let filter = BenchFilter::for_repo(&lane.repo);
        let runs = warehouse.query_bench_runs(&filter).unwrap_or_default();

        let mut points: Vec<BenchPoint> = Vec::new();
        for run in runs {
            points.push(bench_point_from(&run));
        }
        points.sort_by_key(|point| point.timestamp);

        let history = BenchHistory {
            repo: lane.repo.clone(),
            points,
        };
        bench_history.insert(lane.repo.clone(), history);
    }

    Ok(Timeline {
        workspace_name: workspace_name.to_string(),
        lanes,
        release_order,
        release_snapshot,
        snapshots,
        latest_snapshot,
        bench_history,
    })
}
fn bench_point_from(run: &BenchRun) -> BenchPoint {
let ts = run
.timestamp
.as_deref()
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|d| d.with_timezone(&Utc))
.or_else(|| {
chrono::NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
.ok()
.and_then(|d| d.and_hms_opt(0, 0, 0))
.map(|n| DateTime::<Utc>::from_naive_utc_and_offset(n, Utc))
})
.unwrap_or_else(Utc::now);
let mut metrics: Vec<(String, f64)> = Vec::new();
for r in &run.results {
for (k, v) in &r.metrics {
if let Some(f) = v.as_f64() {
metrics.push((format!("{}.{}", r.name, k), f));
} else if let Some(i) = v.as_i64() {
metrics.push((format!("{}.{}", r.name, k), i as f64));
}
}
}
let (primary_metric_name, primary_metric_value) = metrics
.iter()
.max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
.cloned()
.unwrap_or_else(|| (String::from("(none)"), 0.0));
BenchPoint {
timestamp: ts,
primary_metric_name,
primary_metric_value,
metrics,
version: run.version.clone(),
machine: run.machine.clone(),
}
}