use std::collections::{BTreeMap, VecDeque};
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::{Context, Result};
use arrow::array::{
Array, BooleanArray, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::Utc;
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use uuid::Uuid;
use crate::warehouse::dep_graph::WorkspaceGraph;
use crate::warehouse::iceberg::{self as wh_iceberg, IcebergWarehouse};
use crate::workspace::descriptor::WorkspaceDescriptor;
/// Knobs controlling a single `run_pipeline` invocation.
#[derive(Debug, Clone)]
pub struct PipelineOptions {
/// When true, repos that pass every gate are recorded as `succeeded_dry_run`
/// instead of `succeeded`; the flag is also stored on every lineage row.
pub dry_run: bool,
/// Copied into the returned `ReleaseReport` and every persisted lineage row.
pub dep_graph_snapshot_id: Uuid,
/// When true, the run is refused before any repo executes if any repo in the
/// build order reports a dirty working tree.
pub refuse_if_dirty: bool,
/// Optional sink for structured progress events (run/repo/phase/test events).
pub progress: Option<crate::release::progress::ProgressWriter>,
/// Optional per-repo gates (bench no-regression, guard-intact); `None`
/// disables both.
pub gate_config: Option<PipelineGateConfig>,
}
/// Gate settings consulted per repo by `run_pipeline`.
#[derive(Debug, Clone)]
pub struct PipelineGateConfig {
/// Root handed to the guard manifest reader and integrity check for the
/// `guard_intact` gate.
pub workspace_root: PathBuf,
/// Gate switches keyed by repo name; a repo without an entry has no gates enabled.
pub repo_gates: BTreeMap<String, crate::config::Gates>,
}
impl Default for PipelineOptions {
fn default() -> Self {
Self {
dry_run: true,
dep_graph_snapshot_id: Uuid::nil(),
refuse_if_dirty: true,
progress: None,
gate_config: None,
}
}
}
/// Returns true when `status` is one of the gate statuses that count as a pass
/// (`succeeded` for real runs, `succeeded_dry_run` for dry runs).
pub(crate) fn gate_succeeded(status: &str) -> bool {
    const SUCCESS_STATUSES: [&str; 2] = ["succeeded", "succeeded_dry_run"];
    SUCCESS_STATUSES.contains(&status)
}
/// Git state of one repo: HEAD commit, branch, and working-tree dirtiness.
#[derive(Debug, Clone)]
pub struct RepoGitState {
/// HEAD commit id as rendered by `gix` (hex); banners show its first 12 chars.
pub sha: String,
/// Shortened branch name, or `(detached)` when HEAD is not on a branch.
pub branch: String,
/// Whether uncommitted changes were detected (best-effort; see `is_dirty`).
pub dirty: bool,
}
/// Outcome of one repo within a release run; persisted as one `release_lineage` row.
#[derive(Debug, Clone)]
pub struct RepoReleaseRecord {
/// Repo name as it appears in the workspace build order.
pub repo: String,
/// Zero-based position of the repo in the build order.
pub build_order_idx: usize,
/// Git state captured before the run started.
pub git: RepoGitState,
/// `succeeded`, `succeeded_dry_run`, or a `failed_*` reason (see `gate_succeeded`).
pub gate_status: String,
/// Test counts parsed from `cargo test` output.
pub tests_passed: u32,
pub tests_failed: u32,
/// `(crate, version)` pairs; persisted as a comma-separated `crate@version` list.
pub published_versions: Vec<(String, String)>,
}
/// Result of one release run, either freshly produced or rebuilt from
/// `release_lineage` rows.
#[derive(Debug, Clone)]
pub struct ReleaseReport {
pub release_id: Uuid,
pub workspace_name: String,
pub dep_graph_snapshot_id: Uuid,
/// Per-repo records in build order; a run stops after the first failing repo,
/// so later repos may be absent.
pub repos: Vec<RepoReleaseRecord>,
}
/// Runs the release pipeline over every repo in `graph`'s build order.
///
/// 1. Resolves each repo's source checkout and snapshots its git state (HEAD sha,
///    branch, dirtiness) before anything executes. With `opts.refuse_if_dirty`,
///    any dirty repo aborts the run with a list of offending repos.
/// 2. For each repo in build order, the loop does the following:
///    - runs `cargo test`;
///    - runs the optional `nornir-bench` example, applying an optional
///      no-regression gate against this repo's bench history on the same machine;
///    - checks the optional `guard_intact` gate.
///
///    The first repo whose gate status is not a success stops the loop, so later
///    repos get no record.
/// 3. Appends one `release_lineage` row per processed repo and returns the report.
///
/// When `opts.progress` is set, the pipeline emits `RunStart`, `RepoStart`,
/// `PhaseStart`/`PhaseEnd` (test phase), `RepoEnd` and `RunEnd` events.
///
/// # Errors
/// The pipeline fails in any of these cases:
/// - the build order cannot be derived;
/// - sources cannot be resolved, or a repo has no source path;
/// - git state cannot be read;
/// - the dirty-tree check refuses the run;
/// - `cargo test` cannot be spawned or waited on;
/// - bench history cannot be read, or a passing bench run cannot be persisted;
/// - lineage rows cannot be written.
///
/// Test, bench and gate *failures* are not errors: they are recorded as gate
/// statuses. NOTE(review): on an early error no lineage is persisted and
/// `RunEnd` is not emitted.
pub async fn run_pipeline(
    wh: &IcebergWarehouse,
    desc: &WorkspaceDescriptor,
    graph: &WorkspaceGraph,
    opts: &PipelineOptions,
) -> Result<ReleaseReport> {
    use crate::release::progress::{now, ReleaseEvent};

    let release_id = Uuid::new_v4();
    let workspace_name = desc.workspace.name.clone();
    let order = graph.build_order().context("derive build order")?;
    let sources: BTreeMap<String, PathBuf> = crate::workspace::resolve::resolve_sources(desc)
        .context("resolve workspace sources")?;
    let mut records: Vec<RepoReleaseRecord> = Vec::new();
    if let Some(p) = &opts.progress {
        p.emit(&ReleaseEvent::RunStart {
            ts: now(),
            run_id: release_id.to_string(),
            workspace: workspace_name.clone(),
        });
    }

    // Snapshot every repo up front so a missing source path or unreadable repo
    // fails the run before any test or bench has been executed.
    let mut pre: BTreeMap<String, RepoGitState> = BTreeMap::new();
    for repo in &order {
        let root = sources
            .get(repo)
            .ok_or_else(|| anyhow::anyhow!("no source path for repo `{repo}` in descriptor"))?;
        let git = read_git_state(root)
            .with_context(|| format!("read git state for {}", root.display()))?;
        pre.insert(repo.clone(), git);
    }
    if opts.refuse_if_dirty {
        let dirty: Vec<&String> = order.iter().filter(|r| pre[*r].dirty).collect();
        if !dirty.is_empty() {
            let detail = dirty
                .iter()
                .map(|r| {
                    let sha = &pre[*r].sha;
                    // Clamped like the per-repo banner so a short id cannot panic the slice.
                    format!(" - {r} (HEAD={})", &sha[..sha.len().min(12)])
                })
                .collect::<Vec<_>>()
                .join("\n");
            anyhow::bail!(
                "release refused: {} repo(s) have uncommitted changes:\n{detail}\n\
                 commit the work, or `git reset --hard <last-clean-sha>` to roll back, \
                 then re-run. (Set refuse_if_dirty=false to override.)",
                dirty.len()
            );
        }
    }

    for (idx, repo) in order.iter().enumerate() {
        let root = sources.get(repo).expect("checked above");
        let git = pre.remove(repo).expect("checked above");
        println!(
            "\n\u{2501}\u{2501}\u{2501} [{i}/{n}] {repo} \u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}\u{2501}",
            i = idx + 1,
            n = order.len(),
        );
        println!(
            "📍 sha={} branch={} dirty={}",
            &git.sha[..git.sha.len().min(12)],
            git.branch,
            if git.dirty { "yes" } else { "no" }
        );
        if let Some(p) = &opts.progress {
            p.emit(&ReleaseEvent::RepoStart {
                ts: now(),
                repo: repo.clone(),
                sha: git.sha.clone(),
            });
            p.emit(&ReleaseEvent::PhaseStart {
                ts: now(),
                repo: repo.clone(),
                phase: "test".to_string(),
            });
        }

        // ---- test phase ----
        let phase_start = Instant::now();
        let (tests_passed, tests_failed, test_ok) =
            run_cargo_test(root, Some(repo), opts.progress.as_ref())
                .with_context(|| format!("cargo test for repo `{repo}`"))?;
        println!(
            " \u{2192} test totals: {tests_passed} passed, {tests_failed} failed ({:.1}s) {}",
            phase_start.elapsed().as_secs_f32(),
            if test_ok { "\u{2713}" } else { "\u{2717} FAILED" }
        );
        if let Some(p) = &opts.progress {
            p.emit(&ReleaseEvent::PhaseEnd {
                ts: now(),
                repo: repo.clone(),
                phase: "test".to_string(),
                ok: test_ok,
                duration_ms: phase_start.elapsed().as_millis() as u64,
            });
        }

        // ---- bench phase ----
        // NOTE(review): the bench runs, and a passing run is persisted, even when
        // tests failed above; the repo's gate status still reports `failed_test`.
        let bench_start = Instant::now();
        let bench_status = match run_bench_example(root) {
            Ok(None) => {
                println!("\u{26A1} bench: skipped (no examples/nornir-bench.rs)");
                "skipped".to_string()
            }
            Ok(Some(run)) => {
                let n_res = run.results.len();
                let n_failed = run.tests.iter().filter(|t| !t.passed).count();
                println!(
                    "\u{26A1} bench ({:.1}s): {} result(s), {} test fail(s)",
                    bench_start.elapsed().as_secs_f32(),
                    n_res,
                    n_failed,
                );
                for r in &run.results {
                    let mut kv: Vec<String> = r
                        .metrics
                        .iter()
                        .filter_map(|(k, v)| v.as_f64().map(|f| format!("{k}={f:.2}")))
                        .collect();
                    kv.sort();
                    println!(" \u{2022} {:30} {}", r.name, kv.join(" "));
                }
                if n_failed > 0 {
                    for t in run.tests.iter().filter(|t| !t.passed) {
                        println!(" \u{2717} test {}", t.name);
                    }
                    "failed_bench".to_string()
                } else {
                    // Optional no-regression gate: compare against this repo's bench
                    // history on the same machine before the run is persisted.
                    let regression_gate = opts
                        .gate_config
                        .as_ref()
                        .and_then(|gc| gc.repo_gates.get(repo))
                        .filter(|g| g.no_regression);
                    let regression = match regression_gate {
                        Some(g) => {
                            // A non-positive configured threshold falls back to 10%.
                            let pct = if g.max_regression_pct > 0.0 {
                                g.max_regression_pct
                            } else {
                                10.0
                            };
                            let hist = wh
                                .query_bench_runs_async(&crate::warehouse::BenchFilter {
                                    repo: Some(repo.clone()),
                                    machine: Some(run.machine.clone()),
                                    limit: None,
                                })
                                .await
                                .with_context(|| format!("read bench history for `{repo}`"))?;
                            crate::release::gate::no_regression_against(&run, &hist, pct).err()
                        }
                        None => None,
                    };
                    if let Some(e) = regression {
                        println!(" \u{2717} regression gate: {e:#}");
                        "failed_regression".to_string()
                    } else {
                        wh.append_bench_run_async(repo, &run)
                            .await
                            .with_context(|| format!("persist bench run for `{repo}`"))?;
                        println!(" \u{2713} persisted into bench_runs/bench_results");
                        "ok".to_string()
                    }
                }
            }
            Err(e) => {
                println!("\u{26A1} bench: \u{2717} error: {e:#}");
                "failed_bench_exec".to_string()
            }
        };

        // ---- guard_intact gate ----
        let guard_failure: Option<String> = match &opts.gate_config {
            Some(gc) if gc.repo_gates.get(repo).map(|g| g.guard_intact).unwrap_or(false) => {
                match crate::guard::read_manifest(&gc.workspace_root) {
                    Ok(manifest) => crate::guard::intact(&gc.workspace_root, &manifest)
                        .err()
                        .map(|e| format!("{e:#}")),
                    Err(e) => Some(format!("guard manifest unavailable: {e:#}")),
                }
            }
            _ => None,
        };

        // First failing gate wins: tests, then bench, then guard.
        let bench_ok = matches!(bench_status.as_str(), "ok" | "skipped");
        let (gate_status, published) = if !test_ok {
            ("failed_test".to_string(), Vec::new())
        } else if !bench_ok {
            (bench_status.clone(), Vec::new())
        } else if let Some(reason) = &guard_failure {
            println!(" \u{2717} guard_intact gate: {reason}");
            ("failed_guard_intact".to_string(), Vec::new())
        } else if opts.dry_run {
            ("succeeded_dry_run".to_string(), Vec::new())
        } else {
            ("succeeded".to_string(), Vec::new())
        };
        let repo_ok = gate_succeeded(&gate_status);
        records.push(RepoReleaseRecord {
            repo: repo.clone(),
            build_order_idx: idx,
            git,
            gate_status: gate_status.clone(),
            tests_passed,
            tests_failed,
            published_versions: published,
        });
        if let Some(p) = &opts.progress {
            p.emit(&ReleaseEvent::RepoEnd {
                ts: now(),
                repo: repo.clone(),
                ok: repo_ok,
            });
        }
        // Downstream repos depend on this one; stop at the first failure.
        if !repo_ok {
            break;
        }
    }

    persist(wh, release_id, &workspace_name, &opts.dep_graph_snapshot_id, &records, opts.dry_run)
        .await?;
    let final_ok = records.iter().all(|r| gate_succeeded(&r.gate_status));
    if let Some(p) = &opts.progress {
        p.emit(&ReleaseEvent::RunEnd {
            ts: now(),
            run_id: release_id.to_string(),
            ok: final_ok,
        });
    }
    Ok(ReleaseReport {
        release_id,
        workspace_name,
        dep_graph_snapshot_id: opts.dep_graph_snapshot_id,
        repos: records,
    })
}
fn read_git_state(root: &Path) -> Result<RepoGitState> {
let repo = gix::open(root).with_context(|| format!("gix::open {}", root.display()))?;
let head = repo.head().context("read HEAD")?;
let sha = head
.id()
.ok_or_else(|| anyhow::anyhow!("HEAD has no commit (unborn?)"))?
.to_string();
let branch = match repo.head_name()? {
Some(name) => name.shorten().to_string(),
None => "(detached)".to_string(),
};
let dirty = is_dirty(&repo);
Ok(RepoGitState { sha, branch, dirty })
}
/// Best-effort check for uncommitted changes in `repo`.
///
/// Returns `true` as soon as the index-vs-worktree status iterator yields one
/// successful entry. Per-entry errors are skipped. Any failure to build the
/// status platform or the iterator yields `false`, i.e. the repo is treated as clean.
///
/// NOTE(review): failing open lets `refuse_if_dirty` pass a tree whose status
/// could not be computed; consider propagating the error instead.
/// NOTE(review): only the index→worktree comparison is consulted, so changes that
/// are staged (HEAD tree vs index) may not be reported. Confirm this against the
/// gix `status` API, and check whether untracked files are included.
fn is_dirty(repo: &gix::Repository) -> bool {
let Ok(platform) = repo.status(gix::progress::Discard) else {
return false;
};
// Empty pathspec list: presumably "no path filter", i.e. the whole worktree.
let Ok(iter) = platform.into_index_worktree_iter(Vec::<gix::bstr::BString>::new()) else {
return false;
};
iter.filter_map(Result::ok).next().is_some()
}
/// Runs `cargo test --workspace` in `root` and tallies results from libtest output.
///
/// Tests run serially (`-j 1`, `--test-threads=1`) with `--no-fail-fast`. Two
/// environment variables are honoured; each counts as set when it is non-empty
/// and not `0`:
/// - `NORNIR_HEAVY_TESTS` adds `--include-ignored`;
/// - `NORNIR_VERBOSE` also echoes `Compiling` lines and passing tests.
///
/// Failing test names are echoed as they appear and summarised at the end.
///
/// When `progress` is set, `BinaryStart`, `TestPass`, `TestFail` and `BinaryDone`
/// events are emitted, labelled with `repo_for_events` (empty string if `None`).
///
/// Returns `(passed, failed, ok)`:
/// - `passed` and `failed` are summed from the `test result:` summary lines;
///   `failed` is raised to the number of `... FAILED` lines seen if that is larger.
/// - `ok` is true only when cargo exited successfully and nothing failed.
///
/// # Errors
/// Fails only if cargo cannot be spawned or waited on. Test failures and build
/// errors are reported through the returned tuple.
pub fn run_cargo_test(
    root: &Path,
    repo_for_events: Option<&str>,
    progress: Option<&crate::release::progress::ProgressWriter>,
) -> Result<(u32, u32, bool)> {
    use crate::release::progress::{now, ReleaseEvent};
    let repo_label = repo_for_events.unwrap_or("").to_string();
    let emit = |ev: ReleaseEvent| {
        if let Some(p) = progress {
            p.emit(&ev);
        }
    };
    let heavy = std::env::var("NORNIR_HEAVY_TESTS")
        .map(|v| !v.is_empty() && v != "0")
        .unwrap_or(false);
    let verbose = std::env::var("NORNIR_VERBOSE")
        .map(|v| !v.is_empty() && v != "0")
        .unwrap_or(false);
    // Everything after `--` goes to the libtest harness rather than cargo.
    let mut args: Vec<&str> = vec![
        "test", "--workspace", "--no-fail-fast", "-j", "1",
        "--", "--test-threads=1",
    ];
    if heavy {
        args.push("--include-ignored");
    }
    println!("🧪 cargo {}", args.join(" "));
    let mut child = Command::new("cargo")
        .args(&args)
        .current_dir(root)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .context("spawn cargo test")?;
    let stdout = child.stdout.take().expect("piped stdout");
    let stderr = child.stderr.take().expect("piped stderr");

    // Cargo announces each test binary on stderr (`Running <target> (<path>)`),
    // while libtest prints `running N tests` for it on stdout. The stderr thread
    // queues the announced names; the stdout loop pops one per `running N tests`.
    // NOTE(review): the pipes are read independently. If a stdout line is consumed
    // before its stderr announcement, the pop yields an empty name and later
    // binaries are labelled one step late. Announcements that do not start with
    // `Running ` never enqueue a name.
    let pending_bins: Arc<Mutex<VecDeque<String>>> = Arc::new(Mutex::new(VecDeque::new()));
    let bins_for_stderr = Arc::clone(&pending_bins);
    let progress_for_stderr = progress.cloned();
    let repo_for_stderr = repo_label.clone();
    let stderr_handle = std::thread::spawn(move || {
        for line in read_lines_lossy(stderr) {
            let trimmed = line.trim_start();
            if let Some(what) = trimmed.strip_prefix("Running ") {
                let what = what.trim().to_string();
                println!(" 📂 {what}");
                if let Some(p) = &progress_for_stderr {
                    p.emit(&ReleaseEvent::BinaryStart {
                        ts: now(),
                        repo: repo_for_stderr.clone(),
                        binary: what.clone(),
                    });
                }
                bins_for_stderr.lock().unwrap().push_back(what);
            } else if verbose && trimmed.starts_with("Compiling ") {
                println!(" 🔨 {}", trimmed.trim_start_matches("Compiling "));
            } else if trimmed.starts_with("error") {
                println!(" {line}");
            }
        }
    });

    let mut passed = 0u32;
    let mut failed = 0u32;
    let mut failure_names: Vec<String> = Vec::new();
    let mut current_bin: String = String::new();
    let mut bin_passed: u32 = 0;
    let mut bin_failed: u32 = 0;
    for line in read_lines_lossy(stdout) {
        let trimmed = line.trim_start();
        // The summary check must come before the generic `test ` prefix below:
        // `test result: ...` also starts with "test ", and checking the prefix
        // first would swallow every summary line.
        if trimmed.starts_with("test result:") {
            // e.g. `test result: ok. 5 passed; 0 failed; 1 ignored; ...`
            for chunk in trimmed.split(';') {
                let chunk = chunk.trim();
                if let Some(n) = chunk.strip_suffix(" passed") {
                    if let Some(n) = n.split_whitespace().last() {
                        passed += n.parse::<u32>().unwrap_or(0);
                    }
                } else if let Some(n) = chunk.strip_suffix(" failed") {
                    if let Some(n) = n.split_whitespace().last() {
                        failed += n.parse::<u32>().unwrap_or(0);
                    }
                }
            }
            println!(" \u{2261} {trimmed}");
            emit(ReleaseEvent::BinaryDone {
                ts: now(),
                repo: repo_label.clone(),
                binary: current_bin.clone(),
                passed: bin_passed,
                failed: bin_failed,
            });
        } else if let Some(rest) = trimmed.strip_prefix("running ") {
            if let Some((n, _)) = rest.split_once(' ') {
                if let Ok(n) = n.parse::<u32>() {
                    current_bin = pending_bins
                        .lock()
                        .unwrap()
                        .pop_front()
                        .unwrap_or_default();
                    let bin_short = current_bin
                        .split_whitespace()
                        .next()
                        .unwrap_or(current_bin.as_str());
                    println!(" \u{2192} {n} test(s) {bin_short}");
                    bin_passed = 0;
                    bin_failed = 0;
                }
            }
        } else if let Some(name) = trimmed.strip_prefix("test ") {
            if let Some(nm) = name.strip_suffix(" ... FAILED") {
                println!(" \u{2717} {nm}");
                failure_names.push(format!(
                    "{}::{nm}",
                    current_bin.split_whitespace().next().unwrap_or("?")
                ));
                bin_failed += 1;
                emit(ReleaseEvent::TestFail {
                    ts: now(),
                    repo: repo_label.clone(),
                    binary: current_bin.clone(),
                    name: nm.to_string(),
                });
            } else if let Some(nm) = name.strip_suffix(" ... ok") {
                if verbose {
                    println!(" \u{2713} {nm}");
                }
                bin_passed += 1;
                emit(ReleaseEvent::TestPass {
                    ts: now(),
                    repo: repo_label.clone(),
                    binary: current_bin.clone(),
                    name: nm.to_string(),
                });
            }
        }
    }
    let status = child.wait().context("wait cargo test")?;
    let _ = stderr_handle.join();
    // Never report fewer failures than we actually saw by name.
    if failure_names.len() as u32 > failed {
        failed = failure_names.len() as u32;
    }
    if !failure_names.is_empty() {
        println!(" ✗ {} failure(s):", failure_names.len());
        for fn_ in &failure_names {
            println!(" • {fn_}");
        }
    }
    Ok((passed, failed, status.success() && failed == 0))
}

/// Yields the lines of `reader`, decoding each one lossily.
///
/// Unlike `BufRead::lines`, a line that is not valid UTF-8 (tests may print
/// arbitrary bytes) does not end the stream; invalid sequences become U+FFFD.
/// Iteration stops at EOF or on the first I/O error. A trailing `\n` or `\r\n`
/// is stripped, matching `lines()`.
fn read_lines_lossy<R: std::io::Read>(reader: R) -> impl Iterator<Item = String> {
    let mut reader = BufReader::new(reader);
    let mut buf: Vec<u8> = Vec::new();
    std::iter::from_fn(move || {
        buf.clear();
        match reader.read_until(b'\n', &mut buf) {
            Ok(0) | Err(_) => None,
            Ok(_) => {
                if buf.ends_with(b"\n") {
                    buf.pop();
                    if buf.ends_with(b"\r") {
                        buf.pop();
                    }
                }
                Some(String::from_utf8_lossy(&buf).into_owned())
            }
        }
    })
}
/// Appends one `release_lineage` row per repo record.
///
/// All rows of a call share the same `release_id`, `workspace_name`,
/// `dep_graph_snapshot_id`, `dry_run` flag, and a single UTC timestamp taken here
/// (microseconds). The columns are built positionally and checked by
/// `RecordBatch::try_new` against the arrow schema derived from the table's
/// current Iceberg schema. An empty `records` slice is a no-op; the table is not
/// even loaded.
///
/// NOTE(review): the trailing seven columns are written as all-null. They are
/// presumably schema fields this writer does not populate; confirm against the
/// `release_lineage` schema whenever it changes.
///
/// # Errors
/// Propagates failures loading the table, converting its schema, building the
/// batch, or appending it.
async fn persist(
    wh: &IcebergWarehouse,
    release_id: Uuid,
    workspace_name: &str,
    dep_graph_snapshot_id: &Uuid,
    records: &[RepoReleaseRecord],
    dry_run: bool,
) -> Result<()> {
    if records.is_empty() {
        return Ok(());
    }
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(wh_iceberg::TABLE_RELEASE_LINEAGE))
        .await?;
    let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let written_at = Utc::now().timestamp_micros();
    let row_count = records.len();

    // Per-record columns.
    let order_idx: Vec<i32> = records.iter().map(|r| r.build_order_idx as i32).collect();
    let repo_names: Vec<String> = records.iter().map(|r| r.repo.clone()).collect();
    let shas: Vec<String> = records.iter().map(|r| r.git.sha.clone()).collect();
    let branches: Vec<String> = records.iter().map(|r| r.git.branch.clone()).collect();
    let dirty_flags: Vec<bool> = records.iter().map(|r| r.git.dirty).collect();
    let statuses: Vec<String> = records.iter().map(|r| r.gate_status.clone()).collect();
    let passed_counts: Vec<i32> = records.iter().map(|r| r.tests_passed as i32).collect();
    let failed_counts: Vec<i32> = records.iter().map(|r| r.tests_failed as i32).collect();
    let published: Vec<String> = records
        .iter()
        .map(|r| {
            r.published_versions
                .iter()
                .map(|(krate, version)| format!("{krate}@{version}"))
                .collect::<Vec<_>>()
                .join(",")
        })
        .collect();

    let null_strings =
        || -> Arc<dyn Array> { Arc::new(StringArray::from(vec![None::<String>; row_count])) };

    let cols: Vec<Arc<dyn Array>> = vec![
        // Columns repeated identically on every row of this release.
        Arc::new(StringArray::from(vec![release_id.to_string(); row_count])),
        Arc::new(StringArray::from(vec![workspace_name.to_string(); row_count])),
        Arc::new(TimestampMicrosecondArray::from(vec![written_at; row_count]).with_timezone("+00:00")),
        Arc::new(StringArray::from(vec![dep_graph_snapshot_id.to_string(); row_count])),
        // Per-record columns.
        Arc::new(Int32Array::from(order_idx)),
        Arc::new(StringArray::from(repo_names)),
        Arc::new(StringArray::from(shas)),
        Arc::new(StringArray::from(branches)),
        Arc::new(BooleanArray::from(dirty_flags)),
        Arc::new(StringArray::from(statuses)),
        Arc::new(Int32Array::from(passed_counts)),
        Arc::new(Int32Array::from(failed_counts)),
        Arc::new(BooleanArray::from(vec![dry_run; row_count])),
        Arc::new(StringArray::from(published)),
        // Unpopulated trailing columns.
        null_strings(),
        null_strings(),
        null_strings(),
        null_strings(),
        Arc::new(BooleanArray::from(vec![None::<bool>; row_count])),
        null_strings(),
        Arc::new(Int32Array::from(vec![None::<i32>; row_count])),
    ];
    let batch = RecordBatch::try_new(schema, cols)?;
    wh_iceberg::append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}
/// Reads `release_lineage` back into one [`ReleaseReport`] per release.
///
/// Rows are grouped by `(timestamp, release_id)`. `persist` stamps every row of a
/// release with the same timestamp, so each group is one release. Reports come
/// out oldest-first, and each report's repos are sorted by build order.
///
/// * `workspace_name`: only rows for this workspace are kept; an empty string keeps all.
/// * `limit`: when `Some(n)`, only the `n` most recent releases are returned.
///
/// # Errors
/// Fails in any of these cases:
/// - the table cannot be loaded or scanned;
/// - a column cannot be read as its expected arrow type;
/// - a row's `release_id` is not a valid UUID.
///
/// An unparsable dependency-snapshot id is tolerated and read as the nil UUID.
pub async fn query_release_history(
    wh: &IcebergWarehouse,
    workspace_name: &str,
    limit: Option<usize>,
) -> Result<Vec<ReleaseReport>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(wh_iceberg::TABLE_RELEASE_LINEAGE))
        .await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;
    // The timestamp comes first in the key so BTreeMap iteration is chronological.
    // Release ids are random v4 UUIDs, so a UUID-first key would order releases
    // randomly and make `limit` keep an arbitrary subset.
    let mut by_release: BTreeMap<(i64, Uuid), (String, Uuid, Vec<RepoReleaseRecord>)> =
        BTreeMap::new();
    for batch in &batches {
        // Column positions mirror the order `persist` writes them in.
        let rid = down::<StringArray>(batch, 0)?;
        let wss = down::<StringArray>(batch, 1)?;
        let tss = down::<TimestampMicrosecondArray>(batch, 2)?;
        let dgs = down::<StringArray>(batch, 3)?;
        let oid = down::<Int32Array>(batch, 4)?;
        let rep = down::<StringArray>(batch, 5)?;
        let sha = down::<StringArray>(batch, 6)?;
        let br = down::<StringArray>(batch, 7)?;
        let dty = down::<BooleanArray>(batch, 8)?;
        let st = down::<StringArray>(batch, 9)?;
        let tp = down::<Int32Array>(batch, 10)?;
        let tf = down::<Int32Array>(batch, 11)?;
        let _dr = down::<BooleanArray>(batch, 12)?;
        let pub_csv = down::<StringArray>(batch, 13)?;
        for i in 0..batch.num_rows() {
            if !workspace_name.is_empty() && wss.value(i) != workspace_name {
                continue;
            }
            let release_uuid = Uuid::parse_str(rid.value(i))
                .with_context(|| format!("parse release_id `{}`", rid.value(i)))?;
            let snap_uuid = Uuid::parse_str(dgs.value(i)).unwrap_or(Uuid::nil());
            let key = (tss.value(i), release_uuid);
            let entry = by_release
                .entry(key)
                .or_insert_with(|| (wss.value(i).to_string(), snap_uuid, Vec::new()));
            // `crate@version` pairs joined by ','; malformed entries are skipped.
            let published = if pub_csv.value(i).is_empty() {
                Vec::new()
            } else {
                pub_csv
                    .value(i)
                    .split(',')
                    .filter_map(|s| {
                        let (c, v) = s.split_once('@')?;
                        Some((c.to_string(), v.to_string()))
                    })
                    .collect()
            };
            entry.2.push(RepoReleaseRecord {
                repo: rep.value(i).to_string(),
                build_order_idx: oid.value(i) as usize,
                git: RepoGitState {
                    sha: sha.value(i).to_string(),
                    branch: br.value(i).to_string(),
                    dirty: dty.value(i),
                },
                gate_status: st.value(i).to_string(),
                tests_passed: tp.value(i) as u32,
                tests_failed: tf.value(i) as u32,
                published_versions: published,
            });
        }
    }
    let mut out: Vec<ReleaseReport> = by_release
        .into_iter()
        .map(|((_ts, release_id), (ws, snap, mut repos))| {
            repos.sort_by_key(|r| r.build_order_idx);
            ReleaseReport {
                release_id,
                workspace_name: ws,
                dep_graph_snapshot_id: snap,
                repos,
            }
        })
        .collect();
    if let Some(n) = limit {
        // `out` is oldest-first; keep the newest `n` (the tail).
        let drop_n = out.len().saturating_sub(n);
        out.drain(..drop_n);
    }
    Ok(out)
}
/// Borrows column `idx` of `batch` as the concrete arrow array type `T`.
///
/// # Errors
/// Fails when `batch` has no column at `idx` (e.g. a narrower schema than
/// expected), or when the column is not a `T`.
fn down<T: 'static>(batch: &RecordBatch, idx: usize) -> Result<&T> {
    // `columns().get` instead of `column(idx)`, which would panic out of range.
    let column = batch.columns().get(idx).ok_or_else(|| {
        anyhow::anyhow!("column {idx} out of range (batch has {} columns)", batch.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow::anyhow!("column {idx} downcast failed"))
}
/// Builds and runs the repo's `nornir-bench` example, if it has one, and parses
/// the last non-blank stdout line as a JSON `crate::bench::BenchRun`.
///
/// The example is searched for, in order, in these locations:
/// - `examples/`;
/// - `xtask/examples/`, run with `-p xtask`;
/// - `bench/examples/`, run from the `bench` directory.
///
/// Returns `Ok(None)` when none of them exists.
///
/// # Errors
/// Fails in any of these cases:
/// - cargo cannot be spawned;
/// - cargo exits unsuccessfully (its stderr is included in the message);
/// - stdout contains only blank lines;
/// - the last non-blank line is not valid `BenchRun` JSON.
pub fn run_bench_example(root: &Path) -> Result<Option<crate::bench::BenchRun>> {
    // (marker file relative to `root`, cargo arguments, subdirectory to run in;
    //  an empty subdirectory means `root` itself)
    const LAYOUTS: [(&str, &[&str], &str); 3] = [
        (
            "examples/nornir-bench.rs",
            &["run", "--release", "--example", "nornir-bench", "--quiet"],
            "",
        ),
        (
            "xtask/examples/nornir-bench.rs",
            &["run", "-p", "xtask", "--release", "--example", "nornir-bench", "--quiet"],
            "",
        ),
        (
            "bench/examples/nornir-bench.rs",
            &["run", "--release", "--example", "nornir-bench", "--quiet"],
            "bench",
        ),
    ];
    let Some((_, cargo_args, subdir)) = LAYOUTS
        .iter()
        .find(|(marker, _, _)| root.join(marker).is_file())
    else {
        return Ok(None);
    };
    let run_dir: PathBuf = if subdir.is_empty() {
        root.to_path_buf()
    } else {
        root.join(subdir)
    };

    let output = Command::new("cargo")
        .args(cargo_args.iter())
        .current_dir(&run_dir)
        .output()
        .context("spawn cargo run --example nornir-bench")?;
    if !output.status.success() {
        anyhow::bail!(
            "cargo {} exited {} for {}\nstderr:\n{}",
            cargo_args.join(" "),
            output.status,
            root.display(),
            String::from_utf8_lossy(&output.stderr)
        );
    }

    // The example prints its JSON result as the final non-blank line.
    let stdout = String::from_utf8_lossy(&output.stdout);
    let last_line = stdout
        .lines()
        .rfind(|l| !l.trim().is_empty())
        .ok_or_else(|| anyhow::anyhow!("nornir-bench produced no stdout"))?;
    serde_json::from_str::<crate::bench::BenchRun>(last_line)
        .map(Some)
        .with_context(|| format!("parse nornir-bench stdout as BenchRun:\n{last_line}"))
}
#[cfg(test)]
mod gate_status_tests {
    use super::gate_succeeded;

    /// Statuses that must count as a passing repo.
    const SUCCESS: &[&str] = &["succeeded", "succeeded_dry_run"];

    /// Failure statuses recorded by the pipeline, the bench-only `skipped`
    /// status, and the empty string; none may count as success.
    const NOT_SUCCESS: &[&str] = &[
        "failed_test",
        "failed_bench",
        "failed_bench_exec",
        "failed_regression",
        "failed_guard_intact",
        "skipped",
        "",
    ];

    #[test]
    fn only_success_statuses_count_as_succeeded() {
        for ok in SUCCESS {
            assert!(gate_succeeded(ok));
        }
        for bad in NOT_SUCCESS {
            assert!(!gate_succeeded(bad), "`{bad}` must not be a success status");
        }
    }
}