use std::io::Read;
use std::path::Path;
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
/// Timeout, in seconds, that the checker functions in this file pass to
/// `run_command_with_timeout`.
const DEFAULT_TIMEOUT_SECS: u64 = 30;
/// Outcome of a single external command, as produced by `run_command_with_timeout`.
#[derive(Debug, Clone)]
pub struct CommandResult {
/// Whether the process exited with a successful status; `false` for spawn
/// failures, timeouts and wait errors.
pub success: bool,
/// Text captured from the child's standard output (empty when the process
/// never started).
pub stdout: String,
/// Text captured from the child's standard error; spawn failures, timeouts
/// and wait errors put their own diagnostic message here.
pub stderr: String,
/// `true` only when the process was killed for exceeding the timeout.
pub timed_out: bool,
/// Exit code reported by the process; `None` when no exit status was
/// obtained (spawn failure, timeout, wait error) or the status has no code.
pub exit_code: Option<i32>,
}
/// Summary of a unittest run, as produced by `parse_unittest_output`.
#[derive(Debug, Clone)]
pub struct TestResult {
/// `true` only when the command succeeded and both `failed` and `errors` are zero.
pub all_passed: bool,
/// Number of tests counted as passed.
pub passed: u32,
/// Number of tests counted as failures.
pub failed: u32,
/// Number of tests counted as errors.
pub errors: u32,
/// Captured stdout and stderr joined by a newline when the run did not
/// fully pass; empty when it did.
pub error_output: String,
}
/// Runs `program` with `args`, captures both output streams, and enforces a
/// wall-clock limit of `timeout_secs` seconds.
///
/// Stdout and stderr are piped and drained on background threads while the
/// child is polled every 100 ms. When `cwd` is given the child starts in that
/// directory.
///
/// The returned `CommandResult` reports one of four outcomes:
/// * spawn failure: `stderr` is ``failed to spawn `<program>`: <error>``;
/// * normal exit: captured streams plus the process's success flag and exit code;
/// * timeout: once more than `timeout_secs` seconds have elapsed the child is
///   killed and reaped, `timed_out` is set, and `timed out after <n>s` is
///   appended to the captured stderr on its own line;
/// * polling error: the captured stderr with trailing whitespace trimmed,
///   followed by a `try_wait error: <error>` line.
pub fn run_command_with_timeout(
    program: &str,
    args: &[&str],
    timeout_secs: u64,
    cwd: Option<&Path>,
) -> CommandResult {
    use std::fmt::Write as _;

    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let mut command = Command::new(program);
    command.args(args);
    command.stdout(Stdio::piped());
    command.stderr(Stdio::piped());
    if let Some(dir) = cwd {
        command.current_dir(dir);
    }

    let mut child = match command.spawn() {
        Ok(child) => child,
        Err(spawn_err) => {
            return CommandResult {
                success: false,
                stdout: String::new(),
                stderr: format!("failed to spawn `{program}`: {spawn_err}"),
                timed_out: false,
                exit_code: None,
            };
        }
    };

    // Start draining both pipes before polling so output is consumed while
    // the child is still running.
    let out_handle = spawn_pipe_reader(child.stdout.take());
    let err_handle = spawn_pipe_reader(child.stderr.take());

    let started = Instant::now();
    let limit = Duration::from_secs(timeout_secs);

    // Keep polling while the child is alive and within its time budget; any
    // other observation (exit, budget exceeded, polling error) ends the loop.
    let outcome = loop {
        match child.try_wait() {
            Ok(None) if started.elapsed() <= limit => std::thread::sleep(POLL_INTERVAL),
            settled => break settled,
        }
    };

    match outcome {
        Ok(Some(status)) => CommandResult {
            success: status.success(),
            stdout: join_reader(out_handle),
            stderr: join_reader(err_handle),
            timed_out: false,
            exit_code: status.code(),
        },
        Ok(None) => {
            // Still running past the limit: kill it and reap it, ignoring
            // failures of either step.
            let _ = child.kill();
            let _ = child.wait();
            // NOTE(review): these joins wait for end-of-stream on both pipes;
            // if the killed child left descendants that still hold them open
            // this may block beyond the timeout. Confirm.
            let stdout = join_reader(out_handle);
            let mut stderr = join_reader(err_handle);
            if !(stderr.is_empty() || stderr.ends_with('\n')) {
                stderr.push('\n');
            }
            let _ = write!(stderr, "timed out after {timeout_secs}s");
            CommandResult {
                success: false,
                stdout,
                stderr,
                timed_out: true,
                exit_code: None,
            }
        }
        Err(wait_err) => {
            let stdout = join_reader(out_handle);
            let captured = join_reader(err_handle);
            CommandResult {
                success: false,
                stdout,
                stderr: format!("{}\ntry_wait error: {wait_err}", captured.trim_end()),
                timed_out: false,
                exit_code: None,
            }
        }
    }
}
/// Drains `pipe` to end-of-stream on a background thread and returns a handle
/// that yields the collected text.
///
/// A `None` pipe yields an empty string. Output is decoded as UTF-8 with
/// invalid sequences replaced by U+FFFD, so a single non-UTF-8 byte from the
/// child no longer discards everything else it printed. Read errors are
/// ignored: whatever arrived before the error is still returned.
fn spawn_pipe_reader<P>(pipe: Option<P>) -> std::thread::JoinHandle<String>
where
    P: Read + Send + 'static,
{
    std::thread::spawn(move || {
        let Some(mut reader) = pipe else {
            return String::new();
        };
        let mut bytes = Vec::new();
        // Best effort: keep the bytes read so far even if the read fails.
        let _ = reader.read_to_end(&mut bytes);
        // Reuse the buffer when it is already valid UTF-8; only fall back to
        // the allocating lossy conversion when it is not.
        match String::from_utf8(bytes) {
            Ok(text) => text,
            Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
        }
    })
}
/// Waits for a reader thread to finish and returns the text it collected.
///
/// If the thread panicked, an empty string is returned instead.
fn join_reader(handle: std::thread::JoinHandle<String>) -> String {
    match handle.join() {
        Ok(text) => text,
        Err(_panic_payload) => String::new(),
    }
}
/// Runs `python -m py_compile <path>` under the default timeout and returns
/// the raw command result.
///
/// The path is passed as text using a lossy UTF-8 conversion, and the command
/// runs in the current working directory.
pub fn run_python_syntax_check(path: &Path) -> CommandResult {
    let target = path.to_string_lossy();
    let argv: [&str; 3] = ["-m", "py_compile", &*target];
    run_command_with_timeout("python", &argv, DEFAULT_TIMEOUT_SECS, None)
}
/// Runs the Python test file at `path` with `python -m unittest <stem> -v`
/// and summarises the output.
///
/// The command runs in the file's directory with the file stem as the module
/// name, under the default timeout. A stem that is missing or not valid UTF-8
/// is passed as an empty string.
pub fn run_python_unittest(path: &Path) -> TestResult {
    // `Path::parent` returns `Some("")`, not `None`, for a bare file name such
    // as `test_foo.py`. An empty working directory makes the spawn fail, so
    // treat it the same as "no parent" and run in the current directory.
    let workdir = match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => Path::new("."),
    };
    let module = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    let result = run_command_with_timeout(
        "python",
        &["-m", "unittest", module, "-v"],
        DEFAULT_TIMEOUT_SECS,
        Some(workdir),
    );
    parse_unittest_output(&result)
}
/// Summarises the captured output of a `unittest` run.
///
/// Stdout and stderr are joined with a newline and scanned line by line:
/// lines ending in `... ok`, `... FAIL` and `... ERROR` (after trimming
/// trailing whitespace) are counted as passed, failed and errored tests.
///
/// When no such lines exist and the command did not succeed, the counts are
/// taken from the first line starting with `FAILED`, e.g.
/// `FAILED (failures=2, errors=1)`. `failures=` and `errors=` are read
/// separately so errors are not reported as failures. If that still yields
/// nothing, one failure is recorded so a failed run never reports zero
/// problems.
///
/// `all_passed` requires a successful command with zero failures and errors;
/// `error_output` carries the combined output unless everything passed.
pub fn parse_unittest_output(result: &CommandResult) -> TestResult {
    let combined = format!("{}\n{}", result.stdout, result.stderr);

    let mut passed: u32 = 0;
    let mut failed: u32 = 0;
    let mut errors: u32 = 0;
    for line in combined.lines().map(str::trim_end) {
        if line.ends_with("... ok") {
            passed += 1;
        } else if line.ends_with("... FAIL") {
            failed += 1;
        } else if line.ends_with("... ERROR") {
            errors += 1;
        }
    }

    let no_per_test_lines = passed == 0 && failed == 0 && errors == 0;
    if no_per_test_lines && !result.success {
        if let Some(summary) = combined.lines().find(|line| line.starts_with("FAILED")) {
            let (summary_failures, summary_errors) = unittest_summary_counts(summary);
            failed = summary_failures;
            errors = summary_errors;
        }
        // A non-zero exit with nothing attributable still counts as a failure.
        if failed == 0 && errors == 0 {
            failed = 1;
        }
    }

    let all_passed = result.success && failed == 0 && errors == 0;
    TestResult {
        all_passed,
        passed,
        failed,
        errors,
        error_output: if all_passed { String::new() } else { combined },
    }
}

/// Extracts `(failures, errors)` from a summary line such as
/// `FAILED (failures=2, errors=1)`; keys other than those two are ignored.
fn unittest_summary_counts(summary: &str) -> (u32, u32) {
    let inner = summary.split_once('(').map_or("", |(_, rest)| rest);
    let inner = inner.split_once(')').map_or(inner, |(body, _)| body);

    let mut failures = 0;
    let mut errors = 0;
    for item in inner.split(',') {
        let Some((key, value)) = item.split_once('=') else {
            continue;
        };
        let count = value.trim().parse::<u32>().unwrap_or(0);
        // Match the whole key so `expected failures=N` is not mistaken for
        // `failures=N`.
        match key.trim() {
            "failures" => failures = count,
            "errors" => errors = count,
            _ => {}
        }
    }
    (failures, errors)
}
/// Returns `true` when `content` contains any of the substrings used here to
/// recognise Python tests: `def test_`, `class Test`, `import unittest` or
/// `from unittest`.
///
/// This is a plain substring search; the text is not parsed as Python.
#[must_use]
pub fn has_python_tests(content: &str) -> bool {
    const MARKERS: [&str; 4] = [
        "def test_",
        "class Test",
        "import unittest",
        "from unittest",
    ];
    MARKERS.iter().any(|&marker| content.contains(marker))
}
/// Outcome of `check_python_imports`.
#[derive(Debug, Clone)]
pub struct ImportCheckResult {
/// Top-level module names the helper script reported as not importable.
pub missing: Vec<String>,
/// Empty when the check produced a usable answer; otherwise a message saying
/// the check timed out or failed.
pub check_error: String,
}
/// Reports which absolute imports in the Python file at `path` cannot be
/// imported by the local `python`.
///
/// A helper script is run with `python -c`. It parses the file, tries to
/// import the first dotted component of every `import x` / `from x import y`
/// statement, and prints the names that raise `ImportError`, comma separated,
/// before exiting with status 1. Relative imports are skipped, and a module
/// whose import raises any other exception is not reported. If the file
/// cannot be read or parsed the script exits 0, so nothing is reported missing.
///
/// Result:
/// * script succeeded: no missing modules, empty `check_error`;
/// * exit status 1 with non-empty stdout: the printed names in `missing`;
/// * anything else: `check_error` is `import check timed out` or
///   `import check failed: <stderr>`.
pub fn check_python_imports(path: &Path) -> ImportCheckResult {
    // One literal per script line. A `\`-newline continuation inside a single
    // string literal discards the leading whitespace of the following line,
    // which would strip the indentation Python needs to parse the script.
    const SCRIPT: &str = concat!(
        "import sys, ast\n",
        "if len(sys.argv) < 2:\n",
        "    sys.exit(0)\n",
        "try:\n",
        "    with open(sys.argv[1], 'r', encoding='utf-8') as f:\n",
        "        tree = ast.parse(f.read())\n",
        "except Exception:\n",
        "    sys.exit(0)\n",
        "missing = []\n",
        "seen = set()\n",
        "def try_mod(mod):\n",
        "    if mod in seen:\n",
        "        return\n",
        "    seen.add(mod)\n",
        "    try:\n",
        "        __import__(mod)\n",
        "    except ImportError:\n",
        "        missing.append(mod)\n",
        "    except Exception:\n",
        "        pass\n",
        "for node in ast.walk(tree):\n",
        "    if isinstance(node, ast.Import):\n",
        "        for alias in node.names:\n",
        "            try_mod(alias.name.split('.')[0])\n",
        "    elif isinstance(node, ast.ImportFrom):\n",
        "        if node.level == 0 and node.module:\n",
        "            try_mod(node.module.split('.')[0])\n",
        "if missing:\n",
        "    print(','.join(missing))\n",
        "    sys.exit(1)\n",
        "sys.exit(0)\n",
    );

    let path_str = path.to_string_lossy();
    let result = run_command_with_timeout(
        "python",
        &["-c", SCRIPT, &path_str],
        DEFAULT_TIMEOUT_SECS,
        None,
    );

    if result.success {
        return ImportCheckResult {
            missing: Vec::new(),
            check_error: String::new(),
        };
    }

    // Exit status 1 alone is ambiguous (an uncaught Python exception also
    // exits 1), so the module list on stdout must be present as well.
    let listed = result.stdout.trim();
    if result.exit_code == Some(1) && !listed.is_empty() {
        return ImportCheckResult {
            missing: listed
                .split(',')
                .filter(|name| !name.is_empty())
                .map(str::to_string)
                .collect(),
            check_error: String::new(),
        };
    }

    let check_error = if result.timed_out {
        "import check timed out".to_string()
    } else {
        format!("import check failed: {}", result.stderr.trim())
    };
    ImportCheckResult {
        missing: Vec::new(),
        check_error,
    }
}
/// Compiles the Rust file at `path` as a 2021-edition library with `rustc`,
/// under the default timeout, and returns the raw command result.
///
/// Build output goes to a `claudette-rustc` directory inside the system
/// temporary directory. The directory is created on a best-effort basis: a
/// failure to create it is ignored here.
pub fn run_rust_syntax_check(path: &Path) -> CommandResult {
    let scratch = std::env::temp_dir().join("claudette-rustc");
    let _ = std::fs::create_dir_all(&scratch);

    let source = path.to_string_lossy();
    let out_dir = scratch.to_string_lossy();
    let argv: [&str; 7] = [
        "--edition",
        "2021",
        "--crate-type",
        "lib",
        "--out-dir",
        &*out_dir,
        &*source,
    ];
    run_command_with_timeout("rustc", &argv, DEFAULT_TIMEOUT_SECS, None)
}
#[must_use]
pub fn has_rust_tests(content: &str) -> bool {
content.contains("#[test]") || content.contains("#[cfg(test)]")
}
/// Runs `node --check <path>` under the default timeout and returns the raw
/// command result.
///
/// The path is passed as text using a lossy UTF-8 conversion, and the command
/// runs in the current working directory.
pub fn run_js_syntax_check(path: &Path) -> CommandResult {
    let script = path.to_string_lossy();
    let argv: [&str; 2] = ["--check", &*script];
    run_command_with_timeout("node", &argv, DEFAULT_TIMEOUT_SECS, None)
}
/// Runs `npx tsc --noEmit --strict <path>` under the default timeout and
/// returns the raw command result.
///
/// The path is passed as text using a lossy UTF-8 conversion, and the command
/// runs in the current working directory.
pub fn run_ts_syntax_check(path: &Path) -> CommandResult {
    let source = path.to_string_lossy();
    let argv: [&str; 4] = ["tsc", "--noEmit", "--strict", &*source];
    run_command_with_timeout("npx", &argv, DEFAULT_TIMEOUT_SECS, None)
}
// Unit tests. The parsing and detection tests work on fixed strings; the
// import-check and large-output tests invoke `python` through the real helpers.
#[cfg(test)]
mod tests {
use super::*;
// Builds a `CommandResult` with the given success flag and streams. The exit
// code is 0 on success and 1 otherwise; `timed_out` is always false.
fn cmd_result(success: bool, stdout: &str, stderr: &str) -> CommandResult {
CommandResult {
success,
stdout: stdout.to_string(),
stderr: stderr.to_string(),
timed_out: false,
exit_code: if success { Some(0) } else { Some(1) },
}
}
// Three verbose `... ok` lines on stderr with a successful exit: everything
// passes and no error output is kept.
#[test]
fn parse_unittest_all_pass() {
let result = cmd_result(
true,
"",
"test_greet (__main__.TestUser.test_greet) ... ok\n\
test_get_name (__main__.TestUser.test_get_name) ... ok\n\
test_is_adult (__main__.TestUser.test_is_adult) ... ok\n\
\n----------------------------------------------------------------------\n\
Ran 3 tests in 0.001s\n\n\
OK\n",
);
let tr = parse_unittest_output(&result);
assert!(tr.all_passed);
assert_eq!(tr.passed, 3);
assert_eq!(tr.failed, 0);
assert_eq!(tr.errors, 0);
assert!(tr.error_output.is_empty());
}
// One `... FAIL` line among passes: counts are split accordingly and the
// failure details remain available in `error_output`.
#[test]
fn parse_unittest_one_failure() {
let result = cmd_result(
false,
"",
"test_greet (__main__.TestUser.test_greet) ... ok\n\
test_get_age (__main__.TestUser.test_get_age) ... FAIL\n\
test_is_adult (__main__.TestUser.test_is_adult) ... ok\n\
\n======================================================================\n\
FAIL: test_get_age (__main__.TestUser.test_get_age)\n\
AssertionError: 40 != 4\n\
\n----------------------------------------------------------------------\n\
Ran 3 tests in 0.001s\n\n\
FAILED (failures=1)\n",
);
let tr = parse_unittest_output(&result);
assert!(!tr.all_passed);
assert_eq!(tr.passed, 2);
assert_eq!(tr.failed, 1);
assert_eq!(tr.errors, 0);
assert!(tr.error_output.contains("FAIL"));
assert!(tr.error_output.contains("40 != 4"));
}
// `... FAIL` and `... ERROR` lines are counted independently of each other.
#[test]
fn parse_unittest_mixed_fail_and_error() {
let result = cmd_result(
false,
"",
"test_a ... ok\n\
test_b ... FAIL\n\
test_c ... ERROR\n\
test_d ... ok\n\
\nRan 4 tests in 0.002s\n\n\
FAILED (failures=1, errors=1)\n",
);
let tr = parse_unittest_output(&result);
assert!(!tr.all_passed);
assert_eq!(tr.passed, 2);
assert_eq!(tr.failed, 1);
assert_eq!(tr.errors, 1);
}
// A failed command with no output at all must still report one failure.
#[test]
fn parse_unittest_empty_output_nonzero_exit() {
let result = cmd_result(false, "", "");
let tr = parse_unittest_output(&result);
assert!(!tr.all_passed);
assert_eq!(
tr.failed, 1,
"should infer at least 1 failure from exit code"
);
}
// Non-verbose successful output has no per-test lines, so `passed` stays 0
// even though the run is reported as fully passing.
#[test]
fn parse_unittest_success_no_verbose_lines() {
let result = cmd_result(true, "", "Ran 5 tests in 0.001s\n\nOK\n");
let tr = parse_unittest_output(&result);
assert!(tr.all_passed);
assert_eq!(tr.passed, 0, "can't count without verbose lines");
assert_eq!(tr.failed, 0);
}
// Each of the four recognised substrings is enough on its own.
#[test]
fn has_python_tests_detects_patterns() {
assert!(has_python_tests("def test_foo(): pass"));
assert!(has_python_tests("class TestUser(unittest.TestCase):"));
assert!(has_python_tests("import unittest"));
assert!(has_python_tests("from unittest import TestCase"));
}
// Ordinary definitions and statements are not mistaken for tests.
#[test]
fn has_python_tests_returns_false_for_plain_code() {
assert!(!has_python_tests("def greet(): pass"));
assert!(!has_python_tests("class User:\n pass"));
assert!(!has_python_tests("x = 42"));
}
// Writes `body` to `claudette-import-check-<tag>.py` in the system temporary
// directory and returns its path. The name is fixed per tag, so each test
// uses its own tag.
fn write_temp_py(tag: &str, body: &str) -> std::path::PathBuf {
let path = std::env::temp_dir().join(format!("claudette-import-check-{tag}.py"));
std::fs::write(&path, body).expect("write temp file");
path
}
// NOTE(review): in the three `check_python_imports_*` tests the assertion
// only runs when `check_error` is empty, so each test passes without checking
// anything whenever the import check itself fails for any reason. Confirm
// this is intended beyond tolerating a missing `python`.
// Standard-library imports must not be reported as missing.
#[test]
fn check_python_imports_allows_stdlib_only() {
let path = write_temp_py(
"stdlib",
"import os\nimport sys\nfrom pathlib import Path\n",
);
let result = check_python_imports(&path);
let _ = std::fs::remove_file(&path);
if result.check_error.is_empty() {
assert!(
result.missing.is_empty(),
"stdlib imports should resolve, got missing: {:?}",
result.missing
);
}
}
// A module name that cannot exist must be the only entry in `missing`.
#[test]
fn check_python_imports_flags_obvious_miss() {
let path = write_temp_py("miss", "import claudette_definitely_not_a_real_module\n");
let result = check_python_imports(&path);
let _ = std::fs::remove_file(&path);
if result.check_error.is_empty() {
assert_eq!(
result.missing,
vec!["claudette_definitely_not_a_real_module".to_string()],
"expected the bogus module name, got missing={:?}",
result.missing,
);
}
}
// A child that writes 200_000 bytes to stdout must finish within the
// timeout with all of it captured. Returns early when `python` cannot be
// spawned.
#[test]
fn run_command_drains_large_output_without_timeout() {
let body = "import sys; sys.stdout.write('x' * 200_000); sys.stdout.flush()";
let result = run_command_with_timeout("python", &["-c", body], 10, None);
if !result.success
&& result.exit_code.is_none()
&& result.stderr.starts_with("failed to spawn")
{
eprintln!("skipping: python not on PATH");
return;
}
assert!(!result.timed_out, "should not time out: {result:?}");
assert!(result.success, "child should exit 0: {result:?}");
assert_eq!(result.stdout.len(), 200_000);
}
// `from . import sibling` is a relative import and must not be reported.
#[test]
fn check_python_imports_skips_relative_imports() {
let path = write_temp_py("relative", "from . import sibling\nimport os\n");
let result = check_python_imports(&path);
let _ = std::fs::remove_file(&path);
if result.check_error.is_empty() {
assert!(
result.missing.is_empty(),
"relative imports should be skipped, got missing: {:?}",
result.missing,
);
}
}
}