use std::io::{BufRead, BufReader};
use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::time::{Duration, Instant};
use crate::warehouse::test_results::status;
pub const DEFAULT_STALL_SECS: u64 = 120;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Runner {
Nextest,
CargoTest,
}
impl Runner {
pub fn label(self) -> &'static str {
match self {
Runner::Nextest => "cargo nextest run",
Runner::CargoTest => "cargo test",
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct TestCase {
pub suite: String,
pub name: String,
pub status: String,
pub duration_ms: f64,
pub message: String,
}
#[derive(Debug, Clone)]
pub struct MatrixRun {
pub runner: Runner,
pub cases: Vec<TestCase>,
pub stalled: bool,
}
impl MatrixRun {
pub fn passed(&self) -> usize {
self.cases.iter().filter(|c| c.status == status::PASS).count()
}
pub fn failed(&self) -> usize {
self.cases.iter().filter(|c| c.status == status::FAIL).count()
}
pub fn ignored(&self) -> usize {
self.cases.iter().filter(|c| c.status == status::IGNORED).count()
}
pub fn stalled_count(&self) -> usize {
self.cases.iter().filter(|c| c.status == status::STALLED).count()
}
pub fn green(&self) -> bool {
!self.cases.iter().any(|c| status::is_red(&c.status))
}
}
pub fn detect_runner() -> Runner {
if nextest_available() {
Runner::Nextest
} else {
Runner::CargoTest
}
}
fn nextest_available() -> bool {
Command::new("cargo")
.args(["nextest", "--version"])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false)
}
pub fn stall_secs() -> u64 {
std::env::var("NORNIR_TEST_STALL_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_STALL_SECS)
}
pub fn run_matrix(repo_root: &Path, runner: Runner) -> std::io::Result<MatrixRun> {
let mut cmd = Command::new("cargo");
match runner {
Runner::Nextest => {
cmd.args(["nextest", "run", "--message-format", "libtest-json"])
.env("NEXTEST_EXPERIMENTAL_LIBTEST_JSON", "1");
}
Runner::CargoTest => {
cmd.args(["test", "--no-fail-fast"]);
}
}
cmd.current_dir(repo_root)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut child = cmd.spawn()?;
let stdout = child.stdout.take().expect("piped stdout");
let stderr = child.stderr.take().expect("piped stderr");
let (tx, rx) = mpsc::channel::<String>();
let tx_err = tx.clone();
let h_out = std::thread::spawn(move || {
for line in BufReader::new(stdout).lines().map_while(Result::ok) {
if tx.send(line).is_err() {
break;
}
}
});
let h_err = std::thread::spawn(move || {
for line in BufReader::new(stderr).lines().map_while(Result::ok) {
if tx_err.send(format!("\u{1}STDERR\u{1}{line}")).is_err() {
break;
}
}
});
let stall = stall_secs();
let mut parser = Parser::new(runner);
let mut stalled = false;
let poll = Duration::from_millis(500);
let mut last_activity = Instant::now();
loop {
match rx.recv_timeout(poll) {
Ok(line) => {
last_activity = Instant::now();
if let Some(rest) = line.strip_prefix("\u{1}STDERR\u{1}") {
let _ = rest;
} else {
parser.feed(&line);
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {
if stall > 0 && last_activity.elapsed() >= Duration::from_secs(stall) {
let _ = child.kill();
stalled = true;
parser.push_stalled(stall);
break;
}
if let Ok(Some(_)) = child.try_wait() {
while let Ok(line) = rx.try_recv() {
if let Some(rest) = line.strip_prefix("\u{1}STDERR\u{1}") {
let _ = rest;
} else {
parser.feed(&line);
}
}
break;
}
}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
let _ = child.wait();
let _ = h_out.join();
let _ = h_err.join();
Ok(MatrixRun { runner, cases: parser.into_cases(), stalled })
}
struct Parser {
runner: Runner,
cases: Vec<TestCase>,
current_suite: String,
}
impl Parser {
fn new(runner: Runner) -> Self {
Self { runner, cases: Vec::new(), current_suite: String::new() }
}
fn feed(&mut self, line: &str) {
match self.runner {
Runner::Nextest => self.feed_json(line),
Runner::CargoTest => self.feed_cargo(line),
}
}
fn feed_json(&mut self, line: &str) {
let line = line.trim();
if !line.starts_with('{') {
return;
}
let Ok(v) = serde_json::from_str::<serde_json::Value>(line) else { return };
if v.get("type").and_then(|t| t.as_str()) != Some("test") {
return;
}
let event = v.get("event").and_then(|e| e.as_str()).unwrap_or("");
let st = match event {
"ok" => status::PASS,
"failed" => status::FAIL,
"ignored" => status::IGNORED,
_ => return,
};
let raw_name = v.get("name").and_then(|n| n.as_str()).unwrap_or("").to_string();
let (suite, name) = match raw_name.split_once('$') {
Some((s, n)) => (s.to_string(), n.to_string()),
None => (String::new(), raw_name),
};
let duration_ms = v
.get("exec_time")
.and_then(|t| t.as_f64())
.map(|s| s * 1000.0)
.unwrap_or(0.0);
let message = v
.get("stdout")
.and_then(|s| s.as_str())
.map(first_failure_line)
.unwrap_or_default();
self.cases.push(TestCase { suite, name, status: st.into(), duration_ms, message });
}
fn feed_cargo(&mut self, line: &str) {
let t = line.trim();
if let Some(idx) = t.find("Running ") {
if let Some(open) = t[idx..].rfind('(') {
let inside = &t[idx + open + 1..];
if let Some(close) = inside.find(')') {
let path = &inside[..close];
self.current_suite = suite_from_path(path);
}
}
return;
}
let Some(rest) = t.strip_prefix("test ") else { return };
let Some((name, verdict)) = rest.rsplit_once(" ... ") else { return };
let name = name.trim();
if name == "result:" || name.is_empty() {
return;
}
let st = match verdict.trim() {
"ok" => status::PASS,
"FAILED" => status::FAIL,
v if v.starts_with("ignored") => status::IGNORED,
_ => return,
};
self.cases.push(TestCase {
suite: self.current_suite.clone(),
name: name.to_string(),
status: st.into(),
duration_ms: 0.0,
message: String::new(),
});
}
fn push_stalled(&mut self, stall_secs: u64) {
self.cases.push(TestCase {
suite: String::new(),
name: "<test-run>".into(),
status: status::STALLED.into(),
duration_ms: (stall_secs as f64) * 1000.0,
message: format!("no test output for {stall_secs}s — watchdog killed the run"),
});
}
fn into_cases(self) -> Vec<TestCase> {
self.cases
}
}
fn suite_from_path(path: &str) -> String {
let file = Path::new(path).file_name().and_then(|f| f.to_str()).unwrap_or(path);
match file.rsplit_once('-') {
Some((stem, hash)) if hash.chars().all(|c| c.is_ascii_hexdigit()) => stem.to_string(),
_ => file.to_string(),
}
}
fn first_failure_line(stdout: &str) -> String {
for line in stdout.lines() {
let l = line.trim();
if l.contains("panicked at") || l.contains("assertion") || l.starts_with("Error:") {
return l.chars().take(240).collect();
}
}
String::new()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_cargo_test_default_grammar() {
let mut p = Parser::new(Runner::CargoTest);
p.feed(" Running unittests src/lib.rs (target/debug/deps/nornir-3f9a1c0011223344)");
p.feed("test warehouse::tests::round_trip ... ok");
p.feed("test warehouse::tests::flaky ... FAILED");
p.feed("test warehouse::tests::skipped ... ignored");
p.feed("test result: FAILED. 1 passed; 1 failed; 1 ignored");
let cases = p.into_cases();
assert_eq!(cases.len(), 3, "3 cases parsed, not the summary line: {cases:?}");
assert_eq!(cases[0].suite, "nornir", "suite from binary path");
assert_eq!(cases[0].name, "warehouse::tests::round_trip");
assert_eq!(cases[0].status, status::PASS);
assert_eq!(cases[1].status, status::FAIL);
assert_eq!(cases[2].status, status::IGNORED);
}
#[test]
fn parse_nextest_libtest_json_events() {
let mut p = Parser::new(Runner::Nextest);
p.feed(r#"{"type":"suite","event":"started","test_count":2}"#);
p.feed(r#"{"type":"test","event":"started","name":"nornir::bin$mod::a"}"#);
p.feed(r#"{"type":"test","event":"ok","name":"nornir::bin$mod::a","exec_time":0.012}"#);
p.feed(r#"{"type":"test","event":"failed","name":"nornir::bin$mod::b","exec_time":0.5,"stdout":"thread 'x' panicked at src/y.rs:3:1:\nassertion `left == right` failed"}"#);
let cases = p.into_cases();
assert_eq!(cases.len(), 2, "only the two terminal events become rows: {cases:?}");
assert_eq!(cases[0].suite, "nornir::bin");
assert_eq!(cases[0].name, "mod::a");
assert_eq!(cases[0].status, status::PASS);
assert!((cases[0].duration_ms - 12.0).abs() < 0.001, "exec_time 0.012s → 12ms");
assert_eq!(cases[1].status, status::FAIL);
assert!(cases[1].message.contains("panicked at"), "failure line captured: {:?}", cases[1].message);
}
#[test]
fn matrix_run_counts_and_green() {
let cases = vec![
TestCase { suite: "s".into(), name: "a".into(), status: status::PASS.into(), duration_ms: 1.0, message: String::new() },
TestCase { suite: "s".into(), name: "b".into(), status: status::FAIL.into(), duration_ms: 2.0, message: "boom".into() },
TestCase { suite: "s".into(), name: "c".into(), status: status::IGNORED.into(), duration_ms: 0.0, message: String::new() },
];
let run = MatrixRun { runner: Runner::CargoTest, cases, stalled: false };
assert_eq!((run.passed(), run.failed(), run.ignored()), (1, 1, 1));
assert!(!run.green(), "a failing case makes the run red");
}
#[test]
fn watchdog_pushes_red_stalled_case() {
let mut p = Parser::new(Runner::CargoTest);
p.feed("test s::slow ... ok");
p.push_stalled(120);
let cases = p.into_cases();
assert_eq!(cases.len(), 2);
let stalled = cases.iter().find(|c| c.status == status::STALLED).unwrap();
assert!(stalled.message.contains("120s"), "stall note carries the threshold");
assert!(status::is_red(&stalled.status), "stalled is a red verdict");
}
#[test]
fn suite_from_path_strips_hash() {
assert_eq!(suite_from_path("target/debug/deps/nornir-3f9a1c00"), "nornir");
assert_eq!(suite_from_path("target/debug/deps/release_pipeline-aabbcc"), "release_pipeline");
assert_eq!(suite_from_path("target/debug/deps/weird_name"), "weird_name");
}
}