use std::io::Read as IoRead;
use std::path::Path;
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
const DEFAULT_TIMEOUT_SECS: u64 = 30;
const ALLOWED_ENV_NAMES: &[&str] = &[
"PATH",
"HOME",
"USER",
"SHELL",
"LANG",
"TZ",
"TERM",
"TMPDIR",
"TMP",
"TEMP",
"VIRTUAL_ENV",
"PYTHONUNBUFFERED",
"PYTHONDONTWRITEBYTECODE",
"USERNAME",
"USERPROFILE",
"HOMEDRIVE",
"HOMEPATH",
"SYSTEMROOT",
"SYSTEMDRIVE",
"WINDIR",
"COMSPEC",
"PROCESSOR_ARCHITECTURE",
"PROCESSOR_IDENTIFIER",
"NUMBER_OF_PROCESSORS",
"OS",
"PATHEXT",
];
const ALLOWED_ENV_PREFIXES: &[&str] = &["LC_"];
fn is_allowed_env_name(name: &str) -> bool {
let upper = name.to_ascii_uppercase();
if ALLOWED_ENV_NAMES.iter().any(|n| *n == upper) {
return true;
}
ALLOWED_ENV_PREFIXES.iter().any(|p| upper.starts_with(p))
}
#[derive(Debug)]
pub struct ToolResult {
pub success: bool,
pub stdout: String,
pub stderr: String,
pub timed_out: bool,
}
pub fn run_tool(program: &str, args: &[&str], cwd: &Path) -> ToolResult {
run_tool_with_timeout(program, args, cwd, DEFAULT_TIMEOUT_SECS)
}
pub fn run_tool_with_timeout(
program: &str,
args: &[&str],
cwd: &Path,
timeout_secs: u64,
) -> ToolResult {
if !tool_exists(program) {
return ToolResult {
success: false,
stdout: String::new(),
stderr: format!("{} not found in PATH", program),
timed_out: false,
};
}
let mut cmd = Command::new(program);
cmd.args(args)
.current_dir(cwd)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let allowed: Vec<(String, String)> = std::env::vars()
.filter(|(k, _)| is_allowed_env_name(k))
.collect();
cmd.env_clear();
for (k, v) in allowed {
cmd.env(k, v);
}
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
return ToolResult {
success: false,
stdout: String::new(),
stderr: format!("Failed to spawn {}: {}", program, e),
timed_out: false,
}
}
};
let deadline = Instant::now() + Duration::from_secs(timeout_secs);
loop {
match child.try_wait() {
Ok(Some(status)) => {
let mut stdout = String::new();
let mut stderr = String::new();
if let Some(mut out) = child.stdout.take() {
let _ = out.read_to_string(&mut stdout);
}
if let Some(mut err) = child.stderr.take() {
let _ = err.read_to_string(&mut stderr);
}
return ToolResult {
success: status.success(),
stdout,
stderr,
timed_out: false,
};
}
Ok(None) => {
if Instant::now() >= deadline {
let _ = child.kill();
let _ = child.wait();
return ToolResult {
success: false,
stdout: String::new(),
stderr: format!("Killed: timed out after {}s", timeout_secs),
timed_out: true,
};
}
std::thread::sleep(Duration::from_millis(200));
}
Err(e) => {
return ToolResult {
success: false,
stdout: String::new(),
stderr: format!("Failed to wait for {}: {}", program, e),
timed_out: false,
}
}
}
}
}
pub fn run_tool_sandboxed(
program: &str,
args: &[&str],
cwd: &Path,
timeout_secs: u64,
deny_network: bool,
) -> ToolResult {
if !deny_network || !cfg!(target_os = "macos") || !tool_exists("sandbox-exec") {
return run_tool_with_timeout(program, args, cwd, timeout_secs);
}
let profile = "(version 1)\n(allow default)\n(deny network*)";
let mut sbox_args = vec!["-p", profile, program];
sbox_args.extend_from_slice(args);
run_tool_with_timeout("sandbox-exec", &sbox_args, cwd, timeout_secs)
}
pub fn validate_path_within(root: &Path, relative: &str) -> Result<std::path::PathBuf, String> {
use std::path::Component;
let trimmed = relative.trim();
if trimmed.is_empty() {
return Err("Empty path".to_string());
}
if trimmed.contains('\0') {
return Err(format!("Null byte in path: {:?}", trimmed));
}
if trimmed.starts_with('/') || trimmed.starts_with('\\') {
return Err(format!("Absolute path rejected: {}", trimmed));
}
let bytes = trimmed.as_bytes();
if bytes.len() >= 2 && bytes[1] == b':' && bytes[0].is_ascii_alphabetic() {
return Err(format!("Drive-letter path rejected: {}", trimmed));
}
let rel_path = Path::new(trimmed);
if rel_path.is_absolute() {
return Err(format!("Absolute path rejected: {}", trimmed));
}
for comp in rel_path.components() {
match comp {
Component::ParentDir => {
return Err(format!("Parent-dir traversal rejected: {}", trimmed));
}
Component::Prefix(_) | Component::RootDir => {
return Err(format!("Absolute or prefixed path rejected: {}", trimmed));
}
_ => {}
}
}
let joined = root.join(rel_path);
let canon_root = match std::fs::canonicalize(root) {
Ok(c) => c,
Err(_) => return Ok(joined),
};
let canon_joined = match std::fs::canonicalize(&joined) {
Ok(c) => c,
Err(_) => {
let mut probe = joined.clone();
loop {
match probe.parent() {
Some(parent) if parent != probe => {
probe = parent.to_path_buf();
if let Ok(c) = std::fs::canonicalize(&probe) {
break c;
}
}
_ => return Ok(joined),
}
}
}
};
if !canon_joined.starts_with(&canon_root) {
return Err(format!(
"Path escapes root directory (symlink?): {}",
trimmed
));
}
Ok(joined)
}
pub fn tool_exists(program: &str) -> bool {
let path = std::path::Path::new(program);
if path.is_absolute() || program.contains(std::path::MAIN_SEPARATOR) {
return path.exists();
}
let lookup = if cfg!(windows) { "where" } else { "which" };
Command::new(lookup)
.arg(program)
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
#[derive(Debug, Clone)]
pub struct LintIssue {
pub file: String,
pub line: Option<u32>,
pub column: Option<u32>,
pub severity: String,
pub message: String,
pub rule: String,
}
impl std::fmt::Display for LintIssue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(line) = self.line {
write!(
f,
"{}:{}: [{}] {}",
self.file, line, self.rule, self.message
)
} else {
write!(f, "{}: [{}] {}", self.file, self.rule, self.message)
}
}
}
pub fn parse_ruff_output(output: &str) -> Vec<LintIssue> {
let mut issues = Vec::new();
for line in output.lines() {
let parts: Vec<&str> = line.splitn(4, ':').collect();
if parts.len() >= 4 {
let file = parts[0].trim().to_string();
let line_num = parts[1].trim().parse().ok();
let col = parts[2].trim().parse().ok();
let rest = parts[3].trim();
let (rule, msg) = rest.split_once(' ').unwrap_or(("", rest));
issues.push(LintIssue {
file,
line: line_num,
column: col,
severity: "warning".into(),
message: msg.to_string(),
rule: rule.to_string(),
});
}
}
issues
}
#[derive(Debug)]
pub struct TestResult {
pub passed: u32,
pub failed: u32,
pub errors: u32,
pub output: String,
}
pub fn parse_pytest_output(stdout: &str, stderr: &str) -> TestResult {
let combined = format!("{}\n{}", stdout, stderr);
let mut passed = 0u32;
let mut failed = 0u32;
let mut errors = 0u32;
for line in combined.lines() {
let lower = line.to_lowercase();
let trimmed_lower = lower.trim();
let words: Vec<&str> = line.split_whitespace().collect();
for pair in words.windows(2) {
if let Ok(n) = pair[0]
.trim_matches(|c: char| c == ',' || c == '=')
.parse::<u32>()
{
let what = pair[1].to_lowercase();
if n < 10000 {
if what.starts_with("passed") && passed == 0 {
passed = n;
} else if what.starts_with("failed") && failed == 0 {
failed = n;
} else if what.starts_with("error") && errors == 0 {
errors = n;
}
}
}
}
if trimmed_lower.starts_with("failed ") && trimmed_lower.contains("::") {
failed += 1;
}
if trimmed_lower.starts_with("error ") && trimmed_lower.contains("::") {
errors += 1;
}
if trimmed_lower.ends_with("passed") || trimmed_lower.ends_with("passed.") {
if let Some(n_str) = trimmed_lower.split_whitespace().next() {
if let Ok(n) = n_str.parse::<u32>() {
if passed == 0 && n < 10000 {
passed = n;
}
}
}
}
}
TestResult {
passed,
failed,
errors,
output: combined,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_ruff_output() {
let output =
"app/main.py:10:5: E501 Line too long (120 > 88)\napp/main.py:25:1: F401 Unused import";
let issues = parse_ruff_output(output);
assert_eq!(issues.len(), 2);
assert_eq!(issues[0].line, Some(10));
assert_eq!(issues[0].rule, "E501");
assert_eq!(issues[1].rule, "F401");
}
#[test]
fn test_parse_pytest_summary_line() {
let stdout = "5 passed, 2 failed, 1 error in 3.2s";
let result = parse_pytest_output(stdout, "");
assert_eq!(result.passed, 5);
assert_eq!(result.failed, 2);
assert_eq!(result.errors, 1);
}
#[test]
fn test_parse_pytest_equals_format() {
let stdout = "===== 3 passed in 0.5s =====";
let result = parse_pytest_output(stdout, "");
assert_eq!(result.passed, 3);
}
#[test]
fn test_parse_pytest_q_format() {
let stdout = "3 passed";
let result = parse_pytest_output(stdout, "");
assert_eq!(result.passed, 3);
}
#[test]
fn test_tool_exists() {
assert!(tool_exists("ls"));
assert!(!tool_exists("nonexistent_tool_xyz_12345"));
}
#[test]
fn test_validate_path_safe() {
let root = std::path::Path::new("/tmp/test_root");
assert!(validate_path_within(root, "app/main.py").is_ok());
assert!(validate_path_within(root, "tests/test_foo.py").is_ok());
assert!(validate_path_within(root, "README.md").is_ok());
}
#[test]
fn test_validate_path_traversal_blocked() {
let root = std::path::Path::new("/tmp/test_root");
assert!(validate_path_within(root, "../etc/passwd").is_err());
assert!(validate_path_within(root, "app/../../etc/shadow").is_err());
assert!(validate_path_within(root, "/etc/passwd").is_err());
assert!(validate_path_within(root, "\\windows\\system32").is_err());
assert!(validate_path_within(root, "file\0.py").is_err());
}
#[test]
fn test_env_var_stripping() {
unsafe {
std::env::set_var("TEST_API_KEY", "secret123");
}
let result = run_tool("env", &[], std::path::Path::new("/tmp"));
assert!(
!result.stdout.contains("secret123"),
"API key leaked to subprocess!"
);
unsafe {
std::env::remove_var("TEST_API_KEY");
}
}
#[test]
fn test_timeout_kills_process() {
let result = run_tool_with_timeout("sleep", &["30"], std::path::Path::new("/tmp"), 2);
assert!(result.timed_out, "Process should have timed out");
assert!(!result.success);
}
#[test]
fn test_validate_path_dotdot_in_filename_allowed() {
let root = std::path::Path::new("/tmp/test_root");
assert!(validate_path_within(root, "file..py").is_ok());
assert!(validate_path_within(root, "a..b.txt").is_ok());
assert!(validate_path_within(root, "my.backup..tar").is_ok());
}
#[test]
fn test_validate_path_windows_drive_rejected() {
let root = std::path::Path::new("/tmp/test_root");
assert!(validate_path_within(root, "C:\\windows\\system32").is_err());
assert!(validate_path_within(root, "D:foo").is_err());
}
#[cfg(unix)]
#[test]
fn test_validate_path_symlink_escape_rejected() {
use std::os::unix::fs::symlink;
let root = std::env::temp_dir().join(format!("bcf-symlink-{}", std::process::id()));
std::fs::create_dir_all(&root).unwrap();
let link = root.join("escape");
let _ = std::fs::remove_file(&link);
symlink("/etc", &link).unwrap();
let result = validate_path_within(&root, "escape/passwd");
assert!(
result.is_err(),
"planted-symlink escape should be rejected, got Ok({:?})",
result
);
let _ = std::fs::remove_file(&link);
let _ = std::fs::remove_dir(&root);
}
#[test]
fn test_env_allowlist_keeps_essentials() {
assert!(is_allowed_env_name("PATH"));
assert!(is_allowed_env_name("HOME"));
assert!(is_allowed_env_name("USER"));
assert!(is_allowed_env_name("VIRTUAL_ENV"));
assert!(is_allowed_env_name("LC_ALL"));
assert!(is_allowed_env_name("LC_CTYPE"));
assert!(is_allowed_env_name("path"));
}
#[test]
fn test_env_allowlist_blocks_known_secrets() {
assert!(!is_allowed_env_name("ANTHROPIC_API_KEY"));
assert!(!is_allowed_env_name("XAI_API_KEY"));
assert!(!is_allowed_env_name("BRAVE_API_KEY"));
assert!(!is_allowed_env_name("OPENAI_API_KEY"));
assert!(!is_allowed_env_name("GH_TOKEN"));
assert!(!is_allowed_env_name("GITHUB_TOKEN"));
assert!(!is_allowed_env_name("HF_TOKEN"));
assert!(!is_allowed_env_name("AWS_ACCESS_KEY_ID"));
assert!(!is_allowed_env_name("AWS_SECRET_ACCESS_KEY"));
assert!(!is_allowed_env_name("OLLAMA_HOST"));
assert!(!is_allowed_env_name("KUBECONFIG"));
assert!(!is_allowed_env_name("SSH_AUTH_SOCK"));
assert!(!is_allowed_env_name("DATABASE_URL"));
assert!(!is_allowed_env_name("POSTGRES_URL"));
assert!(!is_allowed_env_name("REDIS_URL"));
}
}