use std::collections::HashSet;
use std::io::{self, IsTerminal, Read};
use std::process::ExitCode;
use std::sync::LazyLock;
use std::time::Duration;
use regex::Regex;
use crate::cmd::{combine_output, user_has_flag};
use crate::output::canonical::{TestEntry, TestOutcome, TestResult, TestSummary};
use crate::output::{strip_ansi, ParseResult};
use crate::runner::{CommandOutput, CommandRunner};
pub(crate) fn run(args: &[String], show_stats: bool) -> anyhow::Result<ExitCode> {
if args.iter().any(|a| matches!(a.as_str(), "--help" | "-h")) {
print_pytest_help();
}
let output = if io::stdin().is_terminal() {
let final_args = build_args(args);
let arg_refs: Vec<&str> = final_args.iter().map(String::as_str).collect();
run_pytest(&arg_refs)?
} else {
let stdin_output = read_stdin()?;
if stdin_output.stdout.trim().is_empty() {
let final_args = build_args(args);
let arg_refs: Vec<&str> = final_args.iter().map(String::as_str).collect();
run_pytest(&arg_refs)?
} else {
stdin_output
}
};
let combined = combine_output(&output);
let cleaned = strip_ansi(&combined);
let result = parse(&cleaned);
emit_result(&result, &output)?;
if show_stats {
let (orig, comp) = crate::process::count_token_pair(&cleaned, result.content());
crate::process::report_token_stats(orig, comp, "");
}
if crate::analytics::is_analytics_enabled() {
crate::analytics::try_record_command(
cleaned,
result.content().to_string(),
format!("skim test pytest {}", args.join(" ")),
crate::analytics::CommandType::Test,
output.duration,
Some(result.tier_name()),
);
}
let code = match output.exit_code {
Some(0) => ExitCode::SUCCESS,
Some(_) => ExitCode::FAILURE,
None => {
match &result {
ParseResult::Full(tr) | ParseResult::Degraded(tr, _) => {
if tr.summary.fail > 0 {
ExitCode::FAILURE
} else {
ExitCode::SUCCESS
}
}
ParseResult::Passthrough(_) => ExitCode::FAILURE,
}
}
};
Ok(code)
}
fn print_pytest_help() {
println!("skim test pytest [ARGS...]");
println!();
println!(" Run pytest and parse its output into a structured summary.");
println!();
println!(" BEHAVIOR:");
println!(" - Injects --tb=short and -q unless you override them");
println!(" - Parses output into PASS/FAIL/SKIP counts with failure details");
println!(" - Supports piped input: pytest ... | skim test pytest");
println!();
println!(" FLAGS MANAGED BY SKIM:");
println!(" --tb=short Injected unless --tb is already set");
println!(" -q Injected unless -q/-v/--quiet/--verbose is set");
println!();
println!("--- pytest native help follows ---");
println!();
}
fn build_args(user_args: &[String]) -> Vec<String> {
let mut args: Vec<String> = user_args.to_vec();
if !user_has_flag(user_args, &["--tb"]) {
args.push("--tb=short".to_string());
}
if !user_has_flag(user_args, &["-q", "--quiet", "-v", "--verbose"]) {
args.push("-q".to_string());
}
args
}
fn run_pytest(args: &[&str]) -> anyhow::Result<CommandOutput> {
let runner = CommandRunner::new(Some(Duration::from_secs(300)));
runner
.run("pytest", args)
.map_err(|e| anyhow::anyhow!("{e}\n\nHint: Is pytest installed? Try: pip install pytest"))
}
const MAX_STDIN_BYTES: usize = 64 * 1024 * 1024;
fn read_stdin() -> anyhow::Result<CommandOutput> {
let mut buf = Vec::new();
let mut chunk = [0u8; 8 * 1024];
let stdin = io::stdin();
let mut handle = stdin.lock();
loop {
let n = handle.read(&mut chunk)?;
if n == 0 {
break;
}
if buf.len() + n > MAX_STDIN_BYTES {
anyhow::bail!("stdin exceeded {} byte limit", MAX_STDIN_BYTES);
}
buf.extend_from_slice(&chunk[..n]);
}
Ok(CommandOutput {
stdout: String::from_utf8_lossy(&buf).into_owned(),
stderr: String::new(),
exit_code: None,
duration: Duration::ZERO,
})
}
fn parse(output: &str) -> ParseResult<TestResult> {
if let Some(result) = tier1_parse(output) {
return ParseResult::Full(result);
}
ParseResult::Passthrough(output.to_string())
}
static SUMMARY_LINE_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"=*\s*(?:\d+\s+(?:passed|failed|skipped|error)(?:,\s+)?)+\s+in\s+([\d.]+)s\s*=*")
.expect("summary line regex is valid")
});
static SUMMARY_PAIR_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"(\d+)\s+(passed|failed|skipped|error)").expect("summary pair regex is valid")
});
struct SummaryCounts {
pass: usize,
fail: usize,
skip: usize,
duration_ms: Option<u64>,
}
fn parse_summary_line(line: &str) -> Option<SummaryCounts> {
let line_caps = SUMMARY_LINE_RE.captures(line)?;
let duration_ms = line_caps.get(1).and_then(|m| {
let secs: f64 = m.as_str().parse().ok()?;
Some((secs * 1000.0) as u64)
});
let mut pass: usize = 0;
let mut fail: usize = 0;
let mut skip: usize = 0;
for caps in SUMMARY_PAIR_RE.captures_iter(line) {
let count: usize = caps[1].parse().unwrap_or(0);
match &caps[2] {
"passed" => pass = count,
"failed" => fail += count,
"skipped" => skip = count,
"error" => fail += count,
_ => {}
}
}
Some(SummaryCounts {
pass,
fail,
skip,
duration_ms,
})
}
fn tier1_parse(output: &str) -> Option<TestResult> {
let mut entries: Vec<TestEntry> = Vec::new();
let mut in_failures = false;
let mut in_summary_info = false;
let mut current_failure_name: Option<String> = None;
let mut current_failure_detail: Vec<String> = Vec::new();
let mut summary_counts: Option<SummaryCounts> = None;
for line in output.lines() {
let trimmed = line.trim();
if let Some(counts) = parse_summary_line(trimmed) {
summary_counts = Some(counts);
continue;
}
if trimmed.starts_with("===") && trimmed.contains("FAILURES") {
in_failures = true;
in_summary_info = false;
continue;
}
if trimmed.starts_with("===") && trimmed.contains("short test summary info") {
in_summary_info = true;
in_failures = false;
flush_failure(
&mut entries,
&mut current_failure_name,
&mut current_failure_detail,
);
continue;
}
if trimmed.starts_with("===") && trimmed.ends_with("===") {
if in_failures {
flush_failure(
&mut entries,
&mut current_failure_name,
&mut current_failure_detail,
);
}
in_failures = false;
in_summary_info = false;
continue;
}
if in_failures {
if trimmed.starts_with('_') && trimmed.ends_with('_') {
flush_failure(
&mut entries,
&mut current_failure_name,
&mut current_failure_detail,
);
let name = trimmed.trim_matches('_').trim().to_string();
if !name.is_empty() {
current_failure_name = Some(name);
}
} else if current_failure_name.is_some() {
current_failure_detail.push(line.to_string());
}
continue;
}
if in_summary_info {
let rest = trimmed
.strip_prefix("FAILED ")
.or_else(|| trimmed.strip_prefix("ERROR "));
if let Some(rest) = rest {
let (name, detail) = if let Some(dash_pos) = rest.find(" - ") {
(
rest[..dash_pos].to_string(),
Some(rest[dash_pos + 3..].to_string()),
)
} else {
(rest.to_string(), None)
};
entries.push(TestEntry {
name,
outcome: TestOutcome::Fail,
detail,
});
}
continue;
}
if trimmed.ends_with(" PASSED") {
let name = trimmed.trim_end_matches(" PASSED").to_string();
entries.push(TestEntry {
name,
outcome: TestOutcome::Pass,
detail: None,
});
} else if trimmed.ends_with(" FAILED") {
let name = trimmed.trim_end_matches(" FAILED").to_string();
entries.push(TestEntry {
name,
outcome: TestOutcome::Fail,
detail: None,
});
} else if trimmed.ends_with(" SKIPPED") {
let name = trimmed.trim_end_matches(" SKIPPED").to_string();
entries.push(TestEntry {
name,
outcome: TestOutcome::Skip,
detail: None,
});
}
}
flush_failure(
&mut entries,
&mut current_failure_name,
&mut current_failure_detail,
);
let counts = summary_counts?;
let mut seen = HashSet::new();
entries.retain(|e| seen.insert(e.name.clone()));
let summary = TestSummary {
pass: counts.pass,
fail: counts.fail,
skip: counts.skip,
duration_ms: counts.duration_ms,
};
Some(TestResult::new(summary, entries))
}
fn flush_failure(
entries: &mut Vec<TestEntry>,
name: &mut Option<String>,
detail_lines: &mut Vec<String>,
) {
if let Some(test_name) = name.take() {
let detail = if detail_lines.is_empty() {
None
} else {
let trimmed: Vec<&str> = detail_lines
.iter()
.map(|l| l.trim())
.filter(|l| !l.is_empty())
.collect();
if trimmed.is_empty() {
None
} else {
Some(trimmed.join("\n"))
}
};
entries.push(TestEntry {
name: test_name,
outcome: TestOutcome::Fail,
detail,
});
detail_lines.clear();
}
}
fn emit_result(result: &ParseResult<TestResult>, output: &CommandOutput) -> anyhow::Result<()> {
use std::io::Write;
let stdout = io::stdout();
let stderr = io::stderr();
let mut out = stdout.lock();
let mut err = stderr.lock();
match result {
ParseResult::Full(tr) => {
writeln!(out, "{tr}")?;
}
ParseResult::Degraded(tr, _markers) => {
writeln!(out, "{tr}")?;
result.emit_markers(&mut err)?;
}
ParseResult::Passthrough(raw) => {
write!(out, "{raw}")?;
result.emit_markers(&mut err)?;
}
}
if !output.stderr.is_empty() && !result.is_passthrough() {
write!(err, "{}", output.stderr)?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn load_fixture(name: &str) -> String {
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("fixtures")
.join("cmd")
.join("test")
.join(name);
std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("Failed to load fixture {}: {e}", path.display()))
}
#[test]
fn test_tier1_all_pass() {
let input = load_fixture("pytest_pass.txt");
let result = parse(&input);
assert!(
result.is_full(),
"expected Full, got {:?}",
result.tier_name()
);
if let ParseResult::Full(tr) = &result {
assert_eq!(tr.summary.pass, 5, "expected 5 passed");
assert_eq!(tr.summary.fail, 0, "expected 0 failed");
assert_eq!(tr.summary.skip, 0, "expected 0 skipped");
}
}
#[test]
fn test_tier1_with_failures() {
let input = load_fixture("pytest_fail.txt");
let result = parse(&input);
assert!(
result.is_full(),
"expected Full, got {:?}",
result.tier_name()
);
if let ParseResult::Full(tr) = &result {
assert_eq!(tr.summary.pass, 2, "expected 2 passed");
assert!(tr.summary.fail > 0, "expected failures");
assert_eq!(tr.summary.fail, 1, "expected 1 failed");
let fail_entries: Vec<_> = tr
.entries
.iter()
.filter(|e| e.outcome == TestOutcome::Fail)
.collect();
assert!(
!fail_entries.is_empty(),
"expected at least one FAIL entry, got none"
);
}
}
#[test]
fn test_tier1_mixed() {
let input = load_fixture("pytest_mixed.txt");
let result = parse(&input);
assert!(
result.is_full(),
"expected Full, got {:?}",
result.tier_name()
);
if let ParseResult::Full(tr) = &result {
assert_eq!(tr.summary.pass, 4, "expected 4 passed");
assert_eq!(tr.summary.fail, 1, "expected 1 failed");
assert_eq!(tr.summary.skip, 1, "expected 1 skipped");
}
}
#[test]
fn test_passthrough() {
let input = "totally unrelated output\nno pytest here";
let result = parse(input);
assert!(
result.is_passthrough(),
"expected Passthrough, got {:?}",
result.tier_name()
);
}
#[test]
fn test_flag_injection_skipped_with_verbose() {
let user_args: Vec<String> = vec!["-v".to_string(), "tests/".to_string()];
let built = build_args(&user_args);
assert!(
!built.contains(&"-q".to_string()),
"-q should not be injected when -v is present: {built:?}"
);
assert!(
built.contains(&"--tb=short".to_string()),
"--tb=short should be injected: {built:?}"
);
}
#[test]
fn test_flag_injection_skipped_with_tb() {
let user_args: Vec<String> = vec!["--tb=long".to_string()];
let built = build_args(&user_args);
assert!(
!built.contains(&"--tb=short".to_string()),
"--tb=short should not be injected when --tb=long is present: {built:?}"
);
assert!(
built.contains(&"-q".to_string()),
"-q should be injected: {built:?}"
);
}
#[test]
fn test_flag_injection_default() {
let user_args: Vec<String> = vec!["tests/".to_string()];
let built = build_args(&user_args);
assert!(
built.contains(&"--tb=short".to_string()),
"--tb=short should be injected by default: {built:?}"
);
assert!(
built.contains(&"-q".to_string()),
"-q should be injected by default: {built:?}"
);
}
#[test]
fn test_flag_injection_skipped_with_quiet() {
let user_args: Vec<String> = vec!["--quiet".to_string()];
let built = build_args(&user_args);
assert!(
!built.contains(&"-q".to_string()),
"-q should not be injected when --quiet is present: {built:?}"
);
}
#[test]
fn test_user_has_flag_exact_match() {
let args = vec!["-v".to_string(), "tests/".to_string()];
assert!(user_has_flag(&args, &["-v"]));
}
#[test]
fn test_user_has_flag_with_equals() {
let args = vec!["--tb=long".to_string()];
assert!(user_has_flag(&args, &["--tb"]));
}
#[test]
fn test_user_has_flag_not_present() {
let args = vec!["tests/".to_string()];
assert!(!user_has_flag(&args, &["-v", "--verbose"]));
}
#[test]
fn test_summary_passed_only() {
let line =
"============================== 5 passed in 0.12s ===============================";
let counts = parse_summary_line(line).expect("should match");
assert_eq!(counts.pass, 5);
assert_eq!(counts.fail, 0);
assert_eq!(counts.skip, 0);
assert_eq!(counts.duration_ms, Some(120));
}
#[test]
fn test_summary_all_groups() {
let line = "======= 10 passed, 2 failed, 3 skipped, 1 error in 1.50s =======";
let counts = parse_summary_line(line).expect("should match");
assert_eq!(counts.pass, 10);
assert_eq!(counts.fail, 3); assert_eq!(counts.skip, 3);
assert_eq!(counts.duration_ms, Some(1500));
}
#[test]
fn test_summary_no_match_on_garbage() {
assert!(parse_summary_line("hello world").is_none());
}
#[test]
fn test_summary_failed_only_no_passed() {
let line = "=== 3 failed in 0.15s ===";
let counts = parse_summary_line(line).expect("should match failed-only summary");
assert_eq!(counts.pass, 0, "no passed tests");
assert_eq!(counts.fail, 3, "3 failed tests");
assert_eq!(counts.skip, 0, "no skipped tests");
assert_eq!(counts.duration_ms, Some(150));
}
#[test]
fn test_summary_failed_and_error_no_passed() {
let line = "=== 1 failed, 2 error in 0.30s ===";
let counts = parse_summary_line(line).expect("should match failed+error summary");
assert_eq!(counts.pass, 0);
assert_eq!(counts.fail, 3); assert_eq!(counts.skip, 0);
assert_eq!(counts.duration_ms, Some(300));
}
#[test]
fn test_summary_duration_extraction() {
let line = "============== 4 passed, 1 failed, 1 skipped in 0.20s =============";
let counts = parse_summary_line(line).expect("should match");
assert_eq!(counts.duration_ms, Some(200));
}
#[test]
fn test_summary_quiet_mode_no_equals() {
let line = "2 passed in 0.00s";
let counts = parse_summary_line(line).expect("should match quiet mode");
assert_eq!(counts.pass, 2);
assert_eq!(counts.fail, 0);
assert_eq!(counts.duration_ms, Some(0));
}
#[test]
fn test_tier1_all_failures() {
let input = load_fixture("pytest_all_fail.txt");
let result = parse(&input);
assert!(
result.is_full(),
"expected Full for all-failures output, got {:?}",
result.tier_name()
);
if let ParseResult::Full(tr) = &result {
assert_eq!(tr.summary.pass, 0, "expected 0 passed");
assert_eq!(tr.summary.fail, 3, "expected 3 failed");
assert_eq!(tr.summary.skip, 0, "expected 0 skipped");
assert!(
tr.summary.duration_ms.is_some(),
"duration should be extracted"
);
let fail_entries: Vec<_> = tr
.entries
.iter()
.filter(|e| e.outcome == TestOutcome::Fail)
.collect();
assert!(
!fail_entries.is_empty(),
"expected at least one FAIL entry for all-failures fixture"
);
}
}
#[test]
fn test_tier1_extracts_duration() {
let input = load_fixture("pytest_pass.txt");
let result = parse(&input);
if let ParseResult::Full(tr) = &result {
assert!(
tr.summary.duration_ms.is_some(),
"duration_ms should be populated from summary line"
);
assert_eq!(tr.summary.duration_ms, Some(120));
}
}
#[test]
fn test_summary_line_extracts_duration() {
let input = "============== 4 passed, 1 failed, 1 skipped in 0.20s ==============";
let counts = parse_summary_line(input);
assert!(counts.is_some());
let counts = counts.unwrap();
assert_eq!(counts.duration_ms, Some(200));
}
#[test]
fn test_tier1_deduplicates_entries() {
let input = "\
tests/test_a.py::test_one PASSED
tests/test_b.py::test_two FAILED
=========================== short test summary info ============================
FAILED tests/test_b.py::test_two - assert 1 == 2
========================= 1 passed, 1 failed in 0.10s =========================";
let result = parse(input);
if let ParseResult::Full(tr) = &result {
let fail_entries: Vec<_> = tr
.entries
.iter()
.filter(|e| e.outcome == TestOutcome::Fail)
.collect();
assert_eq!(
fail_entries.len(),
1,
"test_two should be deduplicated to a single entry, got {}",
fail_entries.len()
);
assert_eq!(fail_entries[0].name, "tests/test_b.py::test_two");
}
}
}