mod annotation;
mod evidence;
mod predict;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::detectors::ast_walk::AstWalkCtx;
use crate::detectors::base::Detector;
use crate::detectors::fast_search::{
contains_any, find_in, FIND_CHILD_PROCESS, FIND_COMMANDS_GETOUTPUT,
FIND_COMMANDS_GETSTATUSOUTPUT, FIND_EXEC_ASYNC, FIND_EXEC_COMMAND, FIND_EXEC_PAREN,
FIND_EXEC_SYNC, FIND_GETOUTPUT_PAREN, FIND_OS_EXEC_IMPORT, FIND_OS_POPEN, FIND_OS_SYSTEM,
FIND_PASSTHRU_PAREN, FIND_POPEN_PAREN, FIND_PROCESS_BUILDER, FIND_PROC_OPEN, FIND_PTY_SPAWN,
FIND_RUNTIME_GETRUNTIME, FIND_SHELLJS, FIND_SHELL_EXEC, FIND_SHELL_TRUE, FIND_SHELL_TRUE_JS,
FIND_SPAWN_SYNC, FIND_SUBPROCESS, FIND_SYSCALL_DOT, FIND_SYSTEM_PAREN,
};
use crate::detectors::security::ast_helpers::{
collect_named_args, node_text, python_kwarg_truthy,
receiver_chain_label as receiver_chain_label_shared, receiver_chain_label_go, unwrap_callee,
};
use crate::detectors::security::scan_inputs::{ScanAstInputs, ScanInputs};
use crate::detectors::taint::{TaintAnalysisResult, TaintAnalyzer, TaintCategory};
use crate::graph::GraphQueryExt;
use crate::models::{Finding, Severity};
use crate::parsers::lightweight::Language;
use anyhow::Result;
use regex::Regex;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::LazyLock;
/// File extensions this detector handles: returned from `file_extensions` and
/// used by `detect` to select which files to read.
const SUPPORTED_EXTS: &[&str] = &[
"py", "js", "ts", "jsx", "tsx", "go", "rb", "php", "java", "sh",
];
/// Subset of `SUPPORTED_EXTS` that `detect` routes to the AST scanner
/// (`scan_file_ast`); every other supported extension goes through the
/// line-based scanner (`scan_file_line`).
const AST_EXTS: &[&str] = &["py", "js", "ts", "jsx", "tsx", "go"];
/// Shape of the command argument found at a command-execution call site.
///
/// Together with the [`CommandApi`] this drives `CommandApi::severity_for` and
/// the "Argument shape" text in findings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CommandArgKind {
/// A string literal without interpolation.
StaticLiteral,
/// A list/tuple whose elements are all static literals (also used for an empty list).
StaticList,
/// A string built with interpolation, or a binary expression with a non-literal operand.
Interpolated,
/// A non-literal expression (in Python: identifier, attribute, subscript or call).
UserVariable,
/// A list/tuple whose first element (argv[0]) is not a static literal.
MixedListVarArgv0,
/// A list/tuple with a literal argv[0] and at least one non-literal later element.
MixedListLiteralArgv0,
/// A function value (in Python: a `lambda`).
FunctionLike,
/// Any argument shape the classifier does not recognise.
Unknown,
}
/// Command-execution API recognised at a call site.
///
/// The display name of each variant is given by `callee_label`; whether the
/// variant hands its argument to a shell is given by `is_shell_api`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CommandApi {
// --- Python ---
PyOsSystem,
PyOsPopen,
/// `subprocess.*` call without a truthy `shell` keyword.
PySubprocessNoShell,
/// `subprocess.*` call whose argv list starts with a shell binary and a "run command" flag.
PySubprocessShellC,
/// `subprocess.*` call with a truthy `shell` keyword.
PySubprocessShell,
PySubprocessGetOutput,
PyPtySpawn,
PyCommandsGetOutput,
// --- JavaScript / TypeScript ---
JsChildProcessExec,
JsChildProcessExecFile,
JsChildProcessSpawn,
JsSpawnShellC,
JsChildProcessFork,
JsShellJsExec,
// --- Go ---
GoExecCommandShellC,
GoExecCommand,
GoSyscallExec,
}
impl CommandApi {
pub(super) fn callee_label(self) -> &'static str {
match self {
CommandApi::PyOsSystem => "os.system",
CommandApi::PyOsPopen => "os.popen",
CommandApi::PySubprocessNoShell => "subprocess",
CommandApi::PySubprocessShellC => "subprocess.run([\"sh\", \"-c\", ...])",
CommandApi::PySubprocessShell => "subprocess (shell=True)",
CommandApi::PySubprocessGetOutput => "subprocess.getoutput",
CommandApi::PyPtySpawn => "pty.spawn",
CommandApi::PyCommandsGetOutput => "commands.getoutput",
CommandApi::JsChildProcessExec => "child_process.exec",
CommandApi::JsChildProcessExecFile => "child_process.execFile",
CommandApi::JsChildProcessSpawn => "child_process.spawn",
CommandApi::JsSpawnShellC => "child_process.spawn(\"sh\", [\"-c\", ...])",
CommandApi::JsChildProcessFork => "child_process.fork",
CommandApi::JsShellJsExec => "shelljs.exec",
CommandApi::GoExecCommandShellC => "exec.Command(\"sh\", \"-c\", ...)",
CommandApi::GoExecCommand => "exec.Command",
CommandApi::GoSyscallExec => "syscall.Exec",
}
}
pub(super) fn is_python(self) -> bool {
matches!(
self,
CommandApi::PyOsSystem
| CommandApi::PyOsPopen
| CommandApi::PySubprocessNoShell
| CommandApi::PySubprocessShellC
| CommandApi::PySubprocessShell
| CommandApi::PySubprocessGetOutput
| CommandApi::PyPtySpawn
| CommandApi::PyCommandsGetOutput
)
}
fn is_shell_api(self) -> bool {
matches!(
self,
CommandApi::PyOsSystem
| CommandApi::PyOsPopen
| CommandApi::PySubprocessShell
| CommandApi::PySubprocessShellC
| CommandApi::PySubprocessGetOutput
| CommandApi::PyCommandsGetOutput
| CommandApi::JsChildProcessExec
| CommandApi::JsSpawnShellC
| CommandApi::JsShellJsExec
| CommandApi::GoExecCommandShellC
)
}
pub(super) fn severity_for(self, kind: CommandArgKind, literal_text: Option<&str>) -> Severity {
if matches!(
self,
CommandApi::GoExecCommandShellC
| CommandApi::PySubprocessShellC
| CommandApi::JsSpawnShellC
) {
return Severity::Critical;
}
if self.is_shell_api() && kind == CommandArgKind::StaticLiteral {
if let Some(text) = literal_text {
if !shell_metachars_in(text).is_empty() {
return Severity::Medium;
}
}
}
if self.is_shell_api() {
match kind {
CommandArgKind::StaticLiteral => Severity::Low,
CommandArgKind::StaticList => Severity::Low,
CommandArgKind::Interpolated | CommandArgKind::UserVariable => Severity::Critical,
CommandArgKind::MixedListVarArgv0 => Severity::Critical,
CommandArgKind::MixedListLiteralArgv0 => Severity::High,
CommandArgKind::FunctionLike => Severity::Low,
CommandArgKind::Unknown => Severity::High,
}
} else {
match kind {
CommandArgKind::StaticLiteral => Severity::Low,
CommandArgKind::StaticList => Severity::Low,
CommandArgKind::Interpolated | CommandArgKind::UserVariable => Severity::High,
CommandArgKind::MixedListVarArgv0 => Severity::Critical,
CommandArgKind::MixedListLiteralArgv0 => Severity::Low,
CommandArgKind::FunctionLike => Severity::Low,
CommandArgKind::Unknown => Severity::High,
}
}
}
}
/// Detector for OS command-execution call sites (CWE-78).
pub struct CommandInjectionDetector {
// Repository root: used to relativize reported paths and passed to the intra-function taint pass.
repository_path: PathBuf,
// Cap on findings: checked per site/line inside each scanner and between files in `detect`.
max_findings: usize,
// Analyzer used by `detect` when no precomputed taint paths are available.
taint_analyzer: TaintAnalyzer,
// Optional precomputed taint paths; `detect` reads them with `.get()` and
// computes the paths itself when they are unset.
precomputed_cross: std::sync::OnceLock<Vec<crate::detectors::taint::TaintPath>>,
precomputed_intra: std::sync::OnceLock<Vec<crate::detectors::taint::TaintPath>>,
}
impl CommandInjectionDetector {
/// Creates a detector rooted at `repository_path` with a cap of 50 findings,
/// a fresh taint analyzer and no precomputed taint paths.
pub fn new(repository_path: impl Into<PathBuf>) -> Self {
    let repository_path = repository_path.into();
    Self {
        repository_path,
        max_findings: 50,
        taint_analyzer: TaintAnalyzer::new(),
        precomputed_cross: Default::default(),
        precomputed_intra: Default::default(),
    }
}
/// Converts `path` into the form stored in `Finding::affected_files` by
/// delegating to `detector_relative_path` with this detector's repository root.
fn relative_path(&self, path: &Path) -> PathBuf {
crate::detectors::detector_relative_path(&self.repository_path, path)
}
/// AST-based scan of a single file.
///
/// Uses the cached tree when one is supplied and parses the content
/// otherwise; returns no findings when the content contains a NUL character
/// or cannot be parsed. Every command site found by `collect_command_sites`
/// becomes one finding unless its line is suppressed. When `flag_on` is set,
/// Python sites are reported through the dual-branch builder instead of the
/// plain severity table.
fn scan_file_ast(&self, inputs: &ScanAstInputs<'_>, flag_on: bool) -> Vec<Finding> {
    let (path, content, ext) = (inputs.path(), inputs.content(), inputs.ext());
    let lang = inputs.lang;
    let mut findings = Vec::new();
    if content.contains('\0') {
        return findings;
    }

    // `parsed` keeps a freshly parsed tree alive for as long as `root` is used.
    let parsed;
    let root = if let Some(tree) = inputs.cached_tree {
        tree.root_node()
    } else {
        match parse_root_ext(content, lang, ext) {
            Some(tree) => {
                parsed = tree;
                parsed.root_node()
            }
            None => return findings,
        }
    };

    let bytes = content.as_bytes();
    let lines: Vec<&str> = content.lines().collect();
    let is_python_file = matches!(lang, Language::Python);

    // Import aliases are only collected for the language they belong to.
    let go_aliases = if matches!(lang, Language::Go) {
        collect_go_import_aliases(root, bytes)
    } else {
        GoImportAliases::default()
    };
    let (py_aliases, py_module_aliases) = if is_python_file {
        (
            super::python_imports::collect_python_from_imports(root, bytes),
            super::python_imports::collect_python_module_aliases(root, bytes),
        )
    } else {
        (HashMap::new(), HashMap::new())
    };

    let ctx = AstWalkCtx {
        lang,
        source: bytes,
    };
    let aliases = super::python_imports::PythonAliases::new(&py_aliases, &py_module_aliases);
    let mut sites: Vec<CommandSite> = Vec::new();
    collect_command_sites(&ctx, root, &go_aliases, &aliases, &mut sites);

    for site in sites {
        if findings.len() >= self.max_findings {
            break;
        }
        let row = site.call_node.start_position().row;
        let current_line = lines.get(row).copied();
        if let Some(line) = current_line {
            let previous = row.checked_sub(1).map(|above| lines[above]);
            if crate::detectors::is_line_suppressed(line, previous) {
                continue;
            }
        }
        let snippet = current_line.map(str::trim).unwrap_or("");
        let line_num = (row + 1) as u32;
        let literal = site.target_text.as_deref();

        let finding = if flag_on && is_python_file && site.api.is_python() {
            self.build_dual_branch_python_finding(
                path,
                line_num,
                site.api,
                site.arg_kind,
                snippet,
                site.call_node,
                bytes,
                &lines,
                literal,
                ext,
            )
        } else {
            let severity = site.api.severity_for(site.arg_kind, literal);
            self.build_finding(
                path,
                line_num,
                site.api,
                site.arg_kind,
                severity,
                snippet,
                ext,
                literal,
            )
        };
        findings.push(finding);
    }
    findings
}
/// Line-based scan used for files that do not go through the AST scanner.
///
/// Files larger than 500,000 bytes are skipped. Suppressed lines and lines
/// starting with `#` or `//` are ignored; every remaining line is offered to
/// `match_line_command`, and each match becomes a finding whose severity is
/// computed without literal text.
fn scan_file_line(&self, inputs: &ScanInputs<'_>) -> Vec<Finding> {
    let (path, content, ext) = (inputs.path, inputs.content, inputs.ext);
    let mut findings = Vec::new();
    if content.len() > 500_000 {
        return findings;
    }

    let lines: Vec<&str> = content.lines().collect();
    for (idx, &line) in lines.iter().enumerate() {
        if findings.len() >= self.max_findings {
            break;
        }
        let previous = idx.checked_sub(1).map(|above| lines[above]);
        if crate::detectors::is_line_suppressed(line, previous) {
            continue;
        }
        let code = line.trim_start();
        let is_comment = code.starts_with('#') || code.starts_with("//");
        if is_comment {
            continue;
        }
        let Some((api, arg_kind)) = match_line_command(line, ext) else {
            continue;
        };
        let severity = api.severity_for(arg_kind, None);
        let finding = self.build_finding(
            path,
            (idx + 1) as u32,
            api,
            arg_kind,
            severity,
            line.trim(),
            ext,
            None,
        );
        findings.push(finding);
    }
    findings
}
/// Builds the standard (single-branch) finding for one command site.
///
/// `literal_text` is only consulted when the API is a shell API and the
/// argument is a static literal; in that case any shell metacharacters found
/// in it are reflected in the title and the argument description.
fn build_finding(
    &self,
    path: &Path,
    line_num: u32,
    api: CommandApi,
    arg_kind: CommandArgKind,
    severity: Severity,
    snippet: &str,
    ext: &str,
    literal_text: Option<&str>,
) -> Finding {
    let api_name = api.callee_label();

    let metachars = match literal_text {
        Some(text) if api.is_shell_api() && arg_kind == CommandArgKind::StaticLiteral => {
            shell_metachars_in(text)
        }
        _ => Default::default(),
    };
    let has_metachars = !metachars.is_empty();

    let title = match metachars.first() {
        Some(first) => format!(
            "Potential command injection via {} (shell metacharacter `{}` in literal)",
            api_name, first,
        ),
        None => format!("Potential command injection via {}", api_name),
    };

    let arg_desc = match (arg_kind, has_metachars) {
        (CommandArgKind::StaticLiteral, true) => {
            "static string literal containing shell metacharacters (RCE risk)"
        }
        (CommandArgKind::StaticLiteral, false) => "static string literal (low risk)",
        (CommandArgKind::StaticList, _) => "list of static literals (low risk)",
        (CommandArgKind::Interpolated, _) => "string with variable interpolation (RCE risk)",
        (CommandArgKind::UserVariable, _) => "user-controlled expression (RCE risk)",
        (CommandArgKind::MixedListVarArgv0, _) => "list whose argv[0] is variable (RCE risk)",
        (CommandArgKind::MixedListLiteralArgv0, _) => {
            "list with fixed argv[0] and variable later arguments \
             (no shell-injection vector with shell=False; argument injection \
             into the target binary is still possible — CWE-88)"
        }
        (CommandArgKind::FunctionLike, _) => "function value",
        (CommandArgKind::Unknown, _) => "non-static argument",
    };

    // Language tag for the fenced code block in the description.
    let lang_label = match ext {
        "py" => "python",
        "js" | "jsx" => "javascript",
        "ts" | "tsx" => "typescript",
        "rb" => "ruby",
        "php" => "php",
        "go" => "go",
        "java" => "java",
        "sh" => "bash",
        _ => "",
    };

    let description = format!(
        "**Potential Command Injection (CWE-78)**\n\n\
         **API**: `{}`\n\n\
         **Argument shape**: {}\n\n\
         **Location**: {}:{}\n\n\
         **Code snippet**:\n```{}\n{}\n```\n\n\
         OS-command-execution APIs run their argument as a shell or \
         argv list. When that argument is anything other than a \
         constant the program author controls at write time, \
         attackers who can influence the value get arbitrary \
         command execution.",
        api_name,
        arg_desc,
        path.display(),
        line_num,
        lang_label,
        snippet,
    );

    let why_it_matters = "Attackers could execute arbitrary system commands by injecting shell \
                          metacharacters or by choosing the binary that runs."
        .to_string();

    Finding {
        id: String::new(),
        detector: "CommandInjectionDetector".to_string(),
        severity,
        title,
        description,
        affected_files: vec![self.relative_path(path)],
        line_start: Some(line_num),
        line_end: Some(line_num),
        suggested_fix: Some(self.recommend(api, ext)),
        estimated_effort: Some("45 minutes".to_string()),
        category: Some("security".to_string()),
        cwe_id: Some("CWE-78".to_string()),
        why_it_matters: Some(why_it_matters),
        ..Default::default()
    }
}
#[allow(clippy::too_many_arguments)]
fn build_dual_branch_python_finding(
&self,
path: &Path,
line_num: u32,
api: CommandApi,
arg_kind: CommandArgKind,
snippet: &str,
call_node: tree_sitter::Node<'_>,
source: &[u8],
lines: &[&str],
literal_text: Option<&str>,
ext: &str,
) -> Finding {
let api_label = api.callee_label();
let evidence = evidence::extract_python_evidence(call_node, source, lines);
let prediction = predict::predict(&evidence, api, arg_kind, literal_text);
let predicted_label = prediction.predicted;
let predicted_severity = prediction.predicted_severity;
let predicted_title = match predicted_label {
crate::dual_branch::BranchLabel::RealBug => {
format!("Potential command injection via {api_label}")
}
crate::dual_branch::BranchLabel::Benign => {
format!("Internal command exec via {api_label} (informational)")
}
};
let lang_label = match ext {
"py" => "python",
_ => "",
};
let predicted_description = format!(
"**Command injection (dual-branch, CWE-78)**\n\n\
**API**: `{}`\n\n\
**Location**: {}:{}\n\n\
**Code**:\n```{}\n{}\n```\n\n\
{}",
api_label,
path.display(),
line_num,
lang_label,
snippet,
match predicted_label {
crate::dual_branch::BranchLabel::RealBug => format!(
"The argument to `{api_label}` appears to be \
attacker-influenceable (request source, parameter \
not classified as config/literal, or `shell=True` \
with non-literal command). The predictor leans \
RealBug for this call site (see `prediction_reasons`)."
),
crate::dual_branch::BranchLabel::Benign => format!(
"The argument to `{api_label}` appears to be \
internal — fixed argv[0], all-literal list, or \
config-derived. The predictor leans Benign (see \
`prediction_reasons`); the original \
`severity_for`-table interpretation is carried in \
`alternative_branch`."
),
},
);
let predicted_fix = match predicted_label {
crate::dual_branch::BranchLabel::RealBug => Some(format!(
"{}\n\nIf this is a false positive (the command is \
internal/config-derived and not attacker-reachable), \
annotate the call site with `# repotoire: \
command-static[<reason>]` to collapse the finding to \
Info.",
self.recommend(api, ext)
)),
crate::dual_branch::BranchLabel::Benign => Some(
"If this is intentional internal use, annotate \
`# repotoire: command-static[<reason>]` to collapse \
the finding to Info definitively. If this IS \
attacker-reachable (the alternative branch), follow \
the standard remediation: pass arguments as a list \
with `shell=False`, and validate any user-controlled \
component against an allowlist."
.to_string(),
),
};
let mut finding = Finding {
id: String::new(),
detector: "CommandInjectionDetector".to_string(),
severity: predicted_severity,
title: predicted_title,
description: predicted_description,
affected_files: vec![self.relative_path(path)],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: predicted_fix,
estimated_effort: Some("45 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-78".to_string()),
why_it_matters: Some(
"Command injection lets attackers run arbitrary OS \
commands — but not every exec call site is \
attacker-reachable. The predictor's job is to \
distinguish, and to keep the alternative interpretation \
visible via --show-alternatives."
.to_string(),
),
..Default::default()
};
finding = finding.with_alternative_branch(prediction.alternative_branch);
for reason in prediction.reasons {
finding = finding.with_prediction_reason(reason);
}
for resolution in prediction.resolutions {
finding = finding.with_resolution_signal(resolution);
}
finding
}
/// Returns the remediation text for a finding.
///
/// The result is chosen by matching `(api, ext)` top to bottom, so arm order
/// matters: API-specific arms come first, the JavaScript arms only apply to
/// js/ts/jsx/tsx files, and the extension-only arms (`rb`, `php`, `java`)
/// catch whatever the earlier arms did not. Anything else gets a generic
/// one-line recommendation.
fn recommend(&self, api: CommandApi, ext: &str) -> String {
match (api, ext) {
// --- Python APIs (any extension) ---
(CommandApi::PyOsSystem | CommandApi::PyOsPopen, _) => {
"Avoid `os.system` / `os.popen` — they spawn a shell and pass the \
string through it.\n\n\
- Prefer `subprocess.run([\"cmd\", \"arg\"], shell=False)` with a \
fixed argv[0] and untrusted input only as later argv elements.\n\
- Validate any user-controlled argv[0] against an allowlist."
.to_string()
}
(
CommandApi::PySubprocessShell
| CommandApi::PySubprocessGetOutput
| CommandApi::PyCommandsGetOutput,
_,
) => "Avoid `subprocess` with `shell=True` (or `getoutput`, which is \
implicit shell=True). Replace with the list form: \
`subprocess.run([\"cmd\", \"arg\"], shell=False)`."
.to_string(),
(CommandApi::PySubprocessNoShell, _) => {
"Use a fixed-string argv[0]. Validate any user-controlled argv[0] \
against an allowlist of allowed binaries — even without shell=True \
the attacker can otherwise choose which program runs."
.to_string()
}
(CommandApi::PySubprocessShellC, _) => {
"`subprocess.run([\"sh\", \"-c\", user_input])` is a textbook \
shell-injection sink — the shell still interprets user_input as \
a command line. Drop the shell entirely: use \
`subprocess.run([\"binary\", \"arg1\", user_input], shell=False)` \
with a fixed binary and pass user data as later argv elements."
.to_string()
}
(CommandApi::PyPtySpawn, _) => {
"Avoid `pty.spawn` on user-controlled command strings. Use a fixed \
binary and pass user data as later argv elements."
.to_string()
}
// --- JavaScript/TypeScript APIs (only for js/ts/jsx/tsx files) ---
(
CommandApi::JsChildProcessExec | CommandApi::JsShellJsExec,
"js" | "ts" | "jsx" | "tsx",
) => "Avoid `child_process.exec` (it always spawns a shell). Use \
`child_process.execFile` or `spawn` with `[binary, args]` and \
`shell: false`. Never interpolate user input into a command \
string."
.to_string(),
(
CommandApi::JsChildProcessExecFile
| CommandApi::JsChildProcessSpawn
| CommandApi::JsChildProcessFork,
"js" | "ts" | "jsx" | "tsx",
) => "Use a fixed binary path for argv[0]. Validate any \
user-controlled argv[0] against an allowlist — even without \
`shell: true` the attacker can otherwise choose which program \
runs."
.to_string(),
(CommandApi::JsSpawnShellC, _) => {
"`child_process.spawn(\"sh\", [\"-c\", userInput])` is a \
textbook shell-injection sink — the shell still interprets \
userInput as a command line. Drop the shell entirely: use \
`child_process.spawn(\"binary\", [\"arg1\", userInput], { shell: false })` \
with a fixed binary and pass user data as later argv elements."
.to_string()
}
// --- Go APIs (any extension) ---
(CommandApi::GoExecCommandShellC, _) => {
"`exec.Command(\"sh\", \"-c\", userInput)` is a textbook \
shell-injection sink. Use `exec.Command(binary, arg1, arg2)` with \
a fixed binary and pass user input as later arguments. Never \
build a shell command string from user data."
.to_string()
}
(CommandApi::GoExecCommand | CommandApi::GoSyscallExec, _) => {
"Use a fixed binary path for the first argument. Validate any \
user-controlled argv[0] against an allowlist of allowed \
binaries. Use `filepath.Clean` for paths."
.to_string()
}
// --- Extension-only fallbacks for whatever API was not matched above ---
(_, "rb") => "Avoid `system` / `exec` / backtick-strings on user input. \
Use `Open3.capture2(['cmd', arg])` with an array form."
.to_string(),
(_, "php") => "Avoid `system` / `shell_exec` / `passthru` / `proc_open` / `exec` \
on user input. If you must shell out, validate with \
`escapeshellarg`/`escapeshellcmd` and use a fixed command."
.to_string(),
(_, "java") => "Use `ProcessBuilder` with an explicit argv list and a \
fixed binary; never concatenate user input into the command \
string."
.to_string(),
// Generic advice when no arm above applies.
_ => "Avoid passing user-controlled data to OS-command APIs.".to_string(),
}
}
}
impl Detector for CommandInjectionDetector {
/// Detector identifier; `detect` passes the same string to the dual-branch
/// configuration lookup.
fn name(&self) -> &'static str {
"command-injection"
}
/// One-line human-readable summary of what this detector reports.
fn description(&self) -> &'static str {
"Detects command injection vulnerabilities (AST-first; CWE-78)"
}
/// Always returns `true`.
// NOTE(review): presumably this opts the detector's findings out of a shared
// post-processing stage — confirm against the `Detector` trait documentation.
fn bypass_postprocessor(&self) -> bool {
true
}
// NOTE(review): presumably expands to the trait methods that fill
// `precomputed_cross` / `precomputed_intra` — confirm against the macro definition.
crate::detectors::impl_taint_precompute!();
/// Taint category associated with this detector: command injection.
fn taint_category(&self) -> Option<crate::detectors::taint::TaintCategory> {
Some(TaintCategory::CommandInjection)
}
/// Extensions this detector applies to (`SUPPORTED_EXTS`).
fn file_extensions(&self) -> &'static [&'static str] {
SUPPORTED_EXTS
}
/// Declares the `HAS_EXEC` content flag as this detector's requirement.
// NOTE(review): presumably the framework uses this to skip files without
// exec-like content — confirm against `ContentFlags`.
fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
crate::detectors::detector_context::ContentFlags::HAS_EXEC
}
/// Runs the detector over every supported file and post-processes the findings.
///
/// Steps:
/// 1. Gather taint paths (precomputed when available, otherwise computed
///    here) for the command-injection category.
/// 2. Scan each supported file, skipping files over 500,000 bytes and files
///    that contain neither a command keyword nor a backtick. Files with an
///    AST grammar use `scan_file_ast`; the rest use `scan_file_line`. The
///    `max_findings` cap is checked between files.
/// 3. For non-dual-branch findings, apply the first taint path that has an
///    endpoint (source or sink) at exactly the finding's file and line:
///    sanitized paths lower the severity to `Low`, unsanitized paths raise it
///    to `Critical`.
/// 4. Raise remaining `High`/`Medium` non-dual-branch findings to `Critical`
///    when the enclosing function's name looks like a request handler.
/// 5. Unless dual-branch mode is enabled, drop `Low` findings.
fn detect(
    &self,
    ctx: &crate::detectors::analysis_context::AnalysisContext,
) -> Result<Vec<Finding>> {
    let graph = ctx.graph;
    let files = &ctx.as_file_provider();
    let mut findings: Vec<Finding> = vec![];
    let flag_on = ctx.dual_branch.is_enabled_for("command-injection");

    let mut taint_paths = if let Some(cross) = self.precomputed_cross.get() {
        cross.clone()
    } else {
        self.taint_analyzer
            .trace_taint(graph, TaintCategory::CommandInjection)
    };
    let intra_paths = if let Some(intra) = self.precomputed_intra.get() {
        intra.clone()
    } else {
        crate::detectors::taint::run_intra_function_taint(
            &self.taint_analyzer,
            graph,
            TaintCategory::CommandInjection,
            &self.repository_path,
        )
    };
    taint_paths.extend(intra_paths);
    let taint_result = TaintAnalysisResult::from_paths(taint_paths);

    for path in files.files_with_extensions(SUPPORTED_EXTS) {
        if findings.len() >= self.max_findings {
            break;
        }
        let raw = match files.content(path) {
            Some(c) => c,
            None => continue,
        };
        // Cheap length check first: oversized files are skipped either way,
        // so there is no point running the keyword scan over them.
        if raw.len() > 500_000 {
            continue;
        }
        let raw_str: &str = &raw;
        if !contains_any(COMMAND_KEYWORD_FINDERS, raw_str) && !raw_str.contains('`') {
            continue;
        }
        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
        let has_ast_grammar = AST_EXTS.contains(&ext);
        let new_findings = if has_ast_grammar {
            let cached = files.tree(path);
            let lang = Language::from_path(path);
            let scan = ScanInputs::new(path, raw_str, ext);
            let ast_inputs = ScanAstInputs::new(scan, lang, cached.as_deref());
            self.scan_file_ast(&ast_inputs, flag_on)
        } else {
            let scan = ScanInputs::new(path, raw_str, ext);
            self.scan_file_line(&scan)
        };
        findings.extend(new_findings);
    }

    // Taint correlation (dual-branch findings carry their own prediction).
    for finding in &mut findings {
        if finding.is_dual_branch() {
            continue;
        }
        let file_path = finding
            .affected_files
            .first()
            .map(|p| p.to_string_lossy().to_string())
            .unwrap_or_default();
        let line = finding.line_start.unwrap_or(0);
        for taint in &taint_result.paths {
            // A path matches only when ONE of its endpoints sits at this exact
            // (file, line). Pairing the file of one endpoint with the line of
            // the other would attach unrelated cross-file paths to the finding.
            let at_sink = taint.sink_file == file_path && taint.sink_line == line;
            let at_source = taint.source_file == file_path && taint.source_line == line;
            if !(at_sink || at_source) {
                continue;
            }
            if taint.is_sanitized {
                finding.severity = Severity::Low;
                let note = format!(
                    "\n\n**Taint Analysis Note**: A sanitizer function (`{}`) \
                     was found in the data flow path, which may mitigate this \
                     vulnerability.",
                    taint.sanitizer.as_deref().unwrap_or("unknown")
                );
                finding.description.push_str(&note);
            } else {
                finding.severity = Severity::Critical;
                let note = format!(
                    "\n\n**Taint Analysis Confirmed**: Data flow analysis \
                     traced a path from user input to this command-execution \
                     sink without sanitization:\n\n`{}`",
                    taint.path_string()
                );
                finding.description.push_str(&note);
            }
            // Only the first matching path is applied.
            break;
        }
    }

    // Function names such as `getUser` / `postOrder` (HTTP verb + capital letter).
    static HANDLER_VERB_RE: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(r"^(get|post|put|delete|patch|head|options)[A-Z]").expect("valid regex")
    });

    // Route-handler escalation for findings still at High/Medium.
    for finding in &mut findings {
        if finding.is_dual_branch() {
            continue;
        }
        if !matches!(finding.severity, Severity::High | Severity::Medium) {
            continue;
        }
        if let (Some(file_path), Some(line)) =
            (finding.affected_files.first(), finding.line_start)
        {
            let path_str = file_path.to_string_lossy().to_string();
            let i = graph.interner();
            if let Some(func) = graph.find_function_at(&path_str, line) {
                let raw_name = func.node_name(i);
                let name_lower = raw_name.to_lowercase();
                let is_route = name_lower.contains("handler")
                    || name_lower.contains("route")
                    || name_lower.contains("endpoint")
                    || name_lower.contains("view")
                    || name_lower.contains("controller")
                    || name_lower.contains("middleware")
                    || name_lower.contains("request")
                    || name_lower.contains("response")
                    || HANDLER_VERB_RE.is_match(raw_name);
                if is_route {
                    finding.severity = Severity::Critical;
                }
            }
        }
    }

    if !flag_on {
        findings.retain(|f| f.severity != Severity::Low);
    }
    Ok(findings)
}
}
impl crate::detectors::RegisteredDetector for CommandInjectionDetector {
fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
std::sync::Arc::new(Self::new(init.repo_path))
}
}
/// Substring finders used by `detect` as a cheap pre-filter: a file is only
/// scanned when `contains_any` reports a hit for this list or the file
/// contains a backtick.
static COMMAND_KEYWORD_FINDERS: &[&LazyLock<memchr::memmem::Finder<'static>>] = &[
&FIND_OS_SYSTEM,
&FIND_OS_POPEN,
&FIND_SUBPROCESS,
&FIND_CHILD_PROCESS,
&FIND_EXEC_SYNC,
&FIND_EXEC_ASYNC,
&FIND_SPAWN_SYNC,
&FIND_SHELL_EXEC,
&FIND_PROC_OPEN,
&FIND_EXEC_COMMAND,
&FIND_RUNTIME_GETRUNTIME,
&FIND_PROCESS_BUILDER,
&FIND_SHELL_TRUE,
&FIND_SHELL_TRUE_JS,
&FIND_EXEC_PAREN,
&FIND_GETOUTPUT_PAREN,
&FIND_SYSTEM_PAREN,
&FIND_PASSTHRU_PAREN,
&FIND_POPEN_PAREN,
&FIND_SYSCALL_DOT,
&FIND_SHELLJS,
&FIND_PTY_SPAWN,
&FIND_COMMANDS_GETOUTPUT,
&FIND_COMMANDS_GETSTATUSOUTPUT,
&FIND_OS_EXEC_IMPORT,
];
/// One command-execution call site found by the AST walk.
struct CommandSite<'a> {
// The call node itself; its start row gives the reported line.
call_node: tree_sitter::Node<'a>,
// Which command-execution API the call resolves to.
api: CommandApi,
// Shape of the command argument.
arg_kind: CommandArgKind,
// Source text of the command argument; the Python matcher only fills this
// for static-literal arguments.
target_text: Option<String>,
}
/// Pre-order walk of the subtree rooted at `node`: the node itself is tested
/// with `match_command_site` first, then each child is visited recursively.
/// Matching sites are appended to `out` in visit order.
fn collect_command_sites<'a>(
    ctx: &AstWalkCtx<'a>,
    node: tree_sitter::Node<'a>,
    go_aliases: &GoImportAliases,
    py_aliases: &super::python_imports::PythonAliases<'_>,
    out: &mut Vec<CommandSite<'a>>,
) {
    out.extend(match_command_site(
        node, ctx.source, ctx.lang, go_aliases, py_aliases,
    ));

    let mut walker = node.walk();
    node.children(&mut walker)
        .for_each(|child| collect_command_sites(ctx, child, go_aliases, py_aliases, out));
}
/// Dispatches a single node to the language-specific call matcher.
///
/// Only Python `call` nodes and JavaScript/TypeScript/Go `call_expression`
/// nodes are considered; every other (node kind, language) pair yields `None`.
fn match_command_site<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    lang: Language,
    go_aliases: &GoImportAliases,
    py_aliases: &super::python_imports::PythonAliases<'_>,
) -> Option<CommandSite<'a>> {
    let kind = node.kind();
    match lang {
        Language::Python if kind == "call" => match_python_call(node, source, py_aliases),
        Language::JavaScript | Language::TypeScript if kind == "call_expression" => {
            match_js_call(node, source)
        }
        Language::Go if kind == "call_expression" => match_go_call(node, source, go_aliases),
        _ => None,
    }
}
/// Returns `true` when `name` is exactly one of the known shell (or
/// shell-launching) binary names or paths. The comparison is case-sensitive
/// and no path normalisation is performed.
fn is_shell_binary_name(name: &str) -> bool {
    const SHELL_BINARIES: &[&str] = &[
        // Bare names.
        "sh",
        "bash",
        "dash",
        "zsh",
        "ksh",
        "ash",
        "csh",
        "tcsh",
        "fish",
        // /bin
        "/bin/sh",
        "/bin/bash",
        "/bin/dash",
        "/bin/zsh",
        "/bin/ksh",
        "/bin/ash",
        "/bin/csh",
        "/bin/tcsh",
        // /usr/bin
        "/usr/bin/sh",
        "/usr/bin/bash",
        "/usr/bin/dash",
        "/usr/bin/zsh",
        "/usr/bin/ksh",
        "/usr/bin/env",
        // /usr/local/bin
        "/usr/local/bin/bash",
        // Windows shells.
        "cmd",
        "cmd.exe",
        "powershell",
        "powershell.exe",
        "pwsh",
        "pwsh.exe",
    ];
    SHELL_BINARIES.contains(&name)
}
/// Returns `true` when `flag` tells a shell to execute the following
/// argument as a command string.
///
/// Recognised forms:
/// * `-c` — POSIX shells (and PowerShell's short form). The upper-case `-C`
///   is deliberately NOT accepted: for bash it is the "noclobber" option.
/// * `/c`, `/C`, `/k`, `/K` — `cmd.exe` (run the command and exit / stay).
/// * `-Command` in any letter case — PowerShell parameter names are
///   case-insensitive.
fn is_shell_c_flag(flag: &str) -> bool {
    matches!(flag, "-c" | "/c" | "/C" | "/k" | "/K") || flag.eq_ignore_ascii_case("-command")
}
/// Maps a resolved Python `(module, name)` callee to a command API.
///
/// Returns the API together with the index of the argument that carries the
/// command (always `0` for the APIs handled here), or `None` when the callee
/// is not a recognised command-execution function. For the `subprocess`
/// run/call/Popen/check_* family the API depends on whether the call passes a
/// truthy `shell` keyword.
fn classify_python_command_callee(
    module: &str,
    name: &str,
    arg_nodes: &[tree_sitter::Node<'_>],
    source: &[u8],
) -> Option<(CommandApi, usize)> {
    let api = match module {
        "os" => match name {
            "system" => CommandApi::PyOsSystem,
            "popen" | "popen2" | "popen3" | "popen4" => CommandApi::PyOsPopen,
            _ => return None,
        },
        "subprocess" => match name {
            "run" | "call" | "Popen" | "check_output" | "check_call" => {
                if python_subprocess_shell_true(arg_nodes, source) {
                    CommandApi::PySubprocessShell
                } else {
                    CommandApi::PySubprocessNoShell
                }
            }
            "getoutput" | "getstatusoutput" => CommandApi::PySubprocessGetOutput,
            _ => return None,
        },
        "commands" => match name {
            "getoutput" | "getstatusoutput" => CommandApi::PyCommandsGetOutput,
            _ => return None,
        },
        "pty" if name == "spawn" => CommandApi::PyPtySpawn,
        _ => return None,
    };
    Some((api, 0))
}
/// Tries to interpret a Python `call` node as a command-execution site.
///
/// The callee is resolved either as `module.attr` (with module aliases
/// applied) or as a bare name brought in by a `from ... import ...`. The
/// command argument is then located and classified:
///
/// 1. the argument at the API's command index, when it is not a keyword
///    argument;
/// 2. otherwise the first non-keyword argument;
/// 3. otherwise the value of a keyword argument named `args`, `cmd`,
///    `command` or `argv` — the parameter names the handled stdlib functions
///    use for the command — so that calls such as
///    `subprocess.run(args=cmd, shell=True)` are not silently dropped.
///
/// Returns `None` when the callee is not recognised or no command argument
/// can be found.
fn match_python_call<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    aliases: &super::python_imports::PythonAliases<'_>,
) -> Option<CommandSite<'a>> {
    let func = node.child_by_field_name("function")?;
    let func = unwrap_callee(func);
    let args = node.child_by_field_name("arguments")?;
    let arg_nodes = collect_named_args(args);
    let (api, classified_arg_index) = match func.kind() {
        "attribute" => {
            let obj = func.child_by_field_name("object")?;
            let attr = func.child_by_field_name("attribute")?;
            let attr_text = node_text(attr, source)?;
            let raw_label = receiver_chain_label(obj, source);
            let obj_text = node_text(obj, source).unwrap_or("");
            // Prefer an alias for the exact receiver text, then for the
            // receiver-chain label, and fall back to the label itself.
            let obj_label = aliases
                .modules
                .get(obj_text)
                .or_else(|| aliases.modules.get(raw_label.as_str()))
                .cloned()
                .unwrap_or(raw_label);
            classify_python_command_callee(obj_label.as_str(), attr_text, &arg_nodes, source)?
        }
        "identifier" => {
            let name = node_text(func, source)?;
            let module = aliases.imports.get(name)?;
            classify_python_command_callee(module.as_str(), name, &arg_nodes, source)?
        }
        _ => return None,
    };

    let first = arg_nodes.get(classified_arg_index).copied()?;
    let target = if first.kind() != "keyword_argument" {
        first
    } else if let Some(positional) = arg_nodes
        .iter()
        .copied()
        .find(|a| a.kind() != "keyword_argument")
    {
        positional
    } else {
        // Every argument is a keyword argument: take the value of the one
        // that names the command parameter.
        arg_nodes.iter().copied().find_map(|kw| {
            let name = kw
                .child_by_field_name("name")
                .and_then(|n| node_text(n, source))?;
            if matches!(name, "args" | "cmd" | "command" | "argv") {
                kw.child_by_field_name("value")
            } else {
                None
            }
        })?
    };

    // `subprocess.run(["sh", "-c", x])` is a shell sink even without shell=True.
    if api == CommandApi::PySubprocessNoShell {
        if let Some(kind) = python_detect_shell_c(target, source) {
            return Some(CommandSite {
                call_node: node,
                api: CommandApi::PySubprocessShellC,
                arg_kind: kind,
                target_text: None,
            });
        }
    }

    let arg_kind = classify_command_arg_python(target, source);
    // The literal text is only kept for static literals.
    let target_text = if matches!(arg_kind, CommandArgKind::StaticLiteral) {
        node_text(target, source).map(|s| s.to_string())
    } else {
        None
    };
    Some(CommandSite {
        call_node: node,
        api,
        arg_kind,
        target_text,
    })
}
/// Reports whether the call's arguments carry a truthy `shell` keyword, as
/// decided by `python_kwarg_truthy`.
// NOTE(review): the trailing `true` is a flag defined by the helper — confirm
// its meaning against `python_kwarg_truthy`.
fn python_subprocess_shell_true(args: &[tree_sitter::Node<'_>], source: &[u8]) -> bool {
python_kwarg_truthy(args, "shell", source, true)
}
/// Extracts the text of a Python string literal without its quotes.
///
/// Returns `None` when `node` is not a `string` node or contains an
/// `interpolation` child. When the node has `string_content` children their
/// text is concatenated. Otherwise the raw node text is used with one leading
/// `r"`, `r'`, `"` or `'` and one trailing `"` or `'` removed.
fn python_string_literal_value(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<String> {
    if node.kind() != "string" {
        return None;
    }

    let mut content = String::new();
    let mut found_content = false;
    let mut walker = node.walk();
    for part in node.children(&mut walker) {
        let part_kind = part.kind();
        if part_kind == "interpolation" {
            return None;
        }
        if part_kind == "string_content" {
            if let Some(text) = node_text(part, source) {
                content.push_str(text);
                found_content = true;
            }
        }
    }
    if found_content {
        return Some(content);
    }

    // No content children: strip the delimiters from the raw text by hand.
    let raw = node_text(node, source)?;
    let without_open = ["r\"", "r'", "\"", "'"]
        .iter()
        .find_map(|open| raw.strip_prefix(*open))?;
    let without_close = match without_open.strip_suffix('"') {
        Some(rest) => rest,
        None => without_open.strip_suffix('\'')?,
    };
    Some(without_close.to_owned())
}
/// Detects the `["sh", "-c", <dynamic>]` shape in a Python list/tuple.
///
/// Returns `Some(CommandArgKind::MixedListVarArgv0)` when the sequence has at
/// least three named elements, the first is a literal shell binary name, the
/// second is a literal "run command" flag, and at least one later element is
/// neither a static literal nor function-like. Returns `None` otherwise.
fn python_detect_shell_c(target: tree_sitter::Node<'_>, source: &[u8]) -> Option<CommandArgKind> {
    if !matches!(target.kind(), "list" | "tuple") {
        return None;
    }

    let mut walker = target.walk();
    let items: Vec<tree_sitter::Node<'_>> = target
        .children(&mut walker)
        .filter(|child| child.is_named())
        .collect();

    let [argv0, flag, payload @ ..] = items.as_slice() else {
        return None;
    };
    if payload.is_empty() {
        return None;
    }

    let shell_name = python_string_literal_value(*argv0, source)?;
    if !is_shell_binary_name(&shell_name) {
        return None;
    }
    let flag_text = python_string_literal_value(*flag, source)?;
    if !is_shell_c_flag(&flag_text) {
        return None;
    }

    let has_dynamic_payload = payload.iter().any(|item| {
        let kind = classify_command_arg_python(*item, source);
        kind != CommandArgKind::StaticLiteral && kind != CommandArgKind::FunctionLike
    });
    has_dynamic_payload.then_some(CommandArgKind::MixedListVarArgv0)
}
/// Classifies the shape of a Python expression used as a command argument.
///
/// * `string`: `Interpolated` if it has an interpolation child, else `StaticLiteral`.
/// * `concatenated_string`: `Interpolated` if any part is, else `StaticLiteral`.
/// * `binary_operator`: `Interpolated` if any named operand is a variable,
///   interpolated or unknown; else `StaticLiteral`.
/// * `list` / `tuple`: decided by `classify_list_elements_py`.
/// * identifier / attribute / subscript / call: `UserVariable`.
/// * `lambda`: `FunctionLike`.
/// * parenthesized / awaited expression: the kind of its first named child.
/// * `conditional_expression`: all named children folded with `strongest_arg_kind`.
/// * anything else: `Unknown`.
#[allow(clippy::only_used_in_recursion)]
fn classify_command_arg_python(node: tree_sitter::Node<'_>, source: &[u8]) -> CommandArgKind {
    match node.kind() {
        "string" => {
            let mut walker = node.walk();
            let interpolated = node
                .children(&mut walker)
                .any(|part| part.kind() == "interpolation");
            if interpolated {
                CommandArgKind::Interpolated
            } else {
                CommandArgKind::StaticLiteral
            }
        }
        "concatenated_string" => {
            let mut walker = node.walk();
            let interpolated = node.children(&mut walker).any(|part| {
                classify_command_arg_python(part, source) == CommandArgKind::Interpolated
            });
            if interpolated {
                CommandArgKind::Interpolated
            } else {
                CommandArgKind::StaticLiteral
            }
        }
        "binary_operator" => {
            let mut walker = node.walk();
            let has_dynamic_operand = node
                .children(&mut walker)
                .filter(|operand| operand.is_named())
                .any(|operand| {
                    matches!(
                        classify_command_arg_python(operand, source),
                        CommandArgKind::UserVariable
                            | CommandArgKind::Interpolated
                            | CommandArgKind::Unknown
                    )
                });
            if has_dynamic_operand {
                CommandArgKind::Interpolated
            } else {
                CommandArgKind::StaticLiteral
            }
        }
        "list" | "tuple" => {
            let mut walker = node.walk();
            let items: Vec<tree_sitter::Node<'_>> = node
                .children(&mut walker)
                .filter(|child| child.is_named())
                .collect();
            classify_list_elements_py(&items, source)
        }
        "identifier" | "attribute" | "subscript" | "call" => CommandArgKind::UserVariable,
        "lambda" => CommandArgKind::FunctionLike,
        // Transparent wrappers: look through to the wrapped expression.
        "parenthesized_expression" | "await" => match node.named_child(0) {
            Some(inner) => classify_command_arg_python(inner, source),
            None => CommandArgKind::Unknown,
        },
        // NOTE(review): every named child is folded in, which includes the
        // condition expression as well as the two result branches — confirm
        // that the condition is meant to influence the result.
        "conditional_expression" => (0..node.named_child_count())
            .filter_map(|idx| node.named_child(idx))
            .fold(CommandArgKind::StaticLiteral, |strongest, part| {
                let kind = classify_command_arg_python(part, source);
                strongest_arg_kind(strongest, kind)
            }),
        _ => CommandArgKind::Unknown,
    }
}
/// Classifies the elements of a Python list/tuple used as an argv.
///
/// * empty, or every element a static literal → `StaticList`
/// * first element (argv[0]) not a static literal → `MixedListVarArgv0`
/// * literal argv[0] with at least one non-literal later element →
///   `MixedListLiteralArgv0`
///
/// The previous implementation tracked a separate `any_non_literal` flag that
/// was always the exact negation of `all_literal`, which left its final
/// `else` branch unreachable; the three outcomes above are unchanged.
fn classify_list_elements_py(elements: &[tree_sitter::Node<'_>], source: &[u8]) -> CommandArgKind {
    let Some((argv0, rest)) = elements.split_first() else {
        return CommandArgKind::StaticList;
    };

    let is_literal = |element: &tree_sitter::Node<'_>| {
        classify_command_arg_python(*element, source) == CommandArgKind::StaticLiteral
    };

    if !is_literal(argv0) {
        // The program to run is itself dynamic.
        return CommandArgKind::MixedListVarArgv0;
    }
    if rest.iter().all(is_literal) {
        CommandArgKind::StaticList
    } else {
        CommandArgKind::MixedListLiteralArgv0
    }
}
fn match_js_call<'a>(node: tree_sitter::Node<'a>, source: &'a [u8]) -> Option<CommandSite<'a>> {
let func = node.child_by_field_name("function")?;
let args = node.child_by_field_name("arguments")?;
let arg_nodes = collect_named_args(args);
let func = unwrap_callee(func);
let api = match func.kind() {
"identifier" => {
match node_text(func, source)? {
"exec" | "execAsync" => CommandApi::JsChildProcessExec,
"execSync" => CommandApi::JsChildProcessExec,
"execFile" | "execFileSync" => CommandApi::JsChildProcessExecFile,
"spawn" | "spawnSync" => CommandApi::JsChildProcessSpawn,
"fork" => CommandApi::JsChildProcessFork,
_ => return None,
}
}
"member_expression" => {
let obj = func.child_by_field_name("object")?;
let prop = func.child_by_field_name("property")?;
let prop_text = node_text(prop, source)?;
let recv = receiver_chain_label(obj, source);
let cp_aliases = matches!(recv.as_str(), "child_process" | "cp" | "childprocess");
let shelljs_aliases = matches!(recv.as_str(), "shelljs" | "shell" | "sh");
if cp_aliases {
match prop_text {
"exec" | "execAsync" | "execSync" => CommandApi::JsChildProcessExec,
"execFile" | "execFileSync" => CommandApi::JsChildProcessExecFile,
"spawn" | "spawnSync" => CommandApi::JsChildProcessSpawn,
"fork" => CommandApi::JsChildProcessFork,
_ => return None,
}
} else if shelljs_aliases && prop_text == "exec" {
CommandApi::JsShellJsExec
} else {
return None;
}
}
_ => return None,
};
let shell_option_true = matches!(
api,
CommandApi::JsChildProcessSpawn | CommandApi::JsChildProcessExecFile
) && js_spawn_options_shell_true(&arg_nodes, source);
let api = if shell_option_true {
CommandApi::JsChildProcessExec
} else {
api
};
if matches!(
api,
CommandApi::JsChildProcessSpawn | CommandApi::JsChildProcessExecFile
) {
if let Some(site) = js_detect_shell_c(node, &arg_nodes, source) {
return Some(site);
}
}
let first = arg_nodes.first().copied()?;
let arg_kind_first = classify_command_arg_js(first, source);
let arg_kind = if shell_option_true && matches!(arg_kind_first, CommandArgKind::StaticLiteral) {
match arg_nodes.get(1).copied() {
Some(second) => match classify_command_arg_js(second, source) {
CommandArgKind::StaticLiteral | CommandArgKind::StaticList => arg_kind_first,
other => other,
},
None => arg_kind_first,
}
} else {
arg_kind_first
};
let target_text = if matches!(arg_kind, CommandArgKind::StaticLiteral) {
node_text(first, source).map(|s| s.to_string())
} else {
None
};
Some(CommandSite {
call_node: node,
api,
arg_kind,
target_text,
})
}
/// Detects the `spawn("sh", ["-c", payload, ...])` shape.
///
/// Requires the first argument to be a string literal accepted by
/// `is_shell_binary_name`, the second to be an array whose first element is a
/// string literal accepted by `is_shell_c_flag`, and at least one later array
/// element that is neither a static literal nor function-like. Any other
/// shape yields `None`.
fn js_detect_shell_c<'a>(
    node: tree_sitter::Node<'a>,
    arg_nodes: &[tree_sitter::Node<'a>],
    source: &'a [u8],
) -> Option<CommandSite<'a>> {
    let binary = js_string_literal_value(*arg_nodes.first()?, source)?;
    if !is_shell_binary_name(binary.as_str()) {
        return None;
    }

    let argv = arg_nodes
        .get(1)
        .copied()
        .filter(|candidate| candidate.kind() == "array")?;
    let mut cursor = argv.walk();
    let items: Vec<tree_sitter::Node<'_>> = argv
        .children(&mut cursor)
        .filter(|child| child.is_named())
        .collect();

    // An empty array has no flag element, so `split_first` bails out here.
    let (flag_node, payload) = items.split_first()?;
    let flag = js_string_literal_value(*flag_node, source)?;
    if !is_shell_c_flag(flag.as_str()) {
        return None;
    }

    let has_dynamic_payload = payload.iter().any(|element| {
        !matches!(
            classify_command_arg_js(*element, source),
            CommandArgKind::StaticLiteral | CommandArgKind::FunctionLike
        )
    });
    if has_dynamic_payload {
        Some(CommandSite {
            call_node: node,
            api: CommandApi::JsSpawnShellC,
            arg_kind: CommandArgKind::MixedListVarArgv0,
            target_text: None,
        })
    } else {
        None
    }
}
/// Reports whether an options object among `args` enables the `shell` option.
///
/// Object-literal arguments are scanned from last to first. The first `shell`
/// pair found that has a value decides the answer: the literal `false` gives
/// `false`, and every other value (including `true`, strings and arbitrary
/// expressions) gives `true`. With no such pair the result is `false`.
fn js_spawn_options_shell_true(args: &[tree_sitter::Node<'_>], source: &[u8]) -> bool {
    for options in args.iter().rev().filter(|arg| arg.kind() == "object") {
        let mut cursor = options.walk();
        for pair in options
            .children(&mut cursor)
            .filter(|child| child.kind() == "pair")
        {
            let Some(key) = pair.child_by_field_name("key") else {
                continue;
            };
            // Keys may be written bare (`shell:`) or quoted (`"shell":`).
            let key_name = match key.kind() {
                "property_identifier" => node_text(key, source).map(|name| name.to_string()),
                "string" => js_string_literal_value(key, source),
                _ => None,
            };
            if key_name.as_deref() != Some("shell") {
                continue;
            }
            if let Some(value) = pair.child_by_field_name("value") {
                // Anything other than a literal `false` is treated as enabling the shell.
                return value.kind() != "false";
            }
        }
    }
    false
}
/// Classifies a JavaScript/TypeScript expression used as a command argument.
///
/// * `string`, or a template string without substitutions → `StaticLiteral`
/// * template string with a substitution → `Interpolated`
/// * binary expression → `Interpolated` when either operand is a variable,
///   interpolated or unknown; otherwise `StaticLiteral`
/// * array → delegated to `classify_list_elements_js`
/// * identifiers, member/subscript accesses and calls → `UserVariable`
/// * function expressions → `FunctionLike`
/// * parentheses, `await`, and the TypeScript wrappers (`as`, type assertion,
///   non-null, `satisfies`) → classification of their first named child
/// * ternary → the strongest of its two branches
/// * anything else → `Unknown`
// `source` is only threaded through the recursion (and into the list helper).
#[allow(clippy::only_used_in_recursion)]
fn classify_command_arg_js(node: tree_sitter::Node<'_>, source: &[u8]) -> CommandArgKind {
    match node.kind() {
        "string" => CommandArgKind::StaticLiteral,
        "identifier" | "member_expression" | "subscript_expression" | "call_expression" => {
            CommandArgKind::UserVariable
        }
        "arrow_function" | "function_expression" | "function" | "function_declaration" => {
            CommandArgKind::FunctionLike
        }
        "template_string" => {
            let mut cursor = node.walk();
            let has_substitution = node
                .children(&mut cursor)
                .any(|part| part.kind() == "template_substitution");
            if has_substitution {
                CommandArgKind::Interpolated
            } else {
                CommandArgKind::StaticLiteral
            }
        }
        "binary_expression" => {
            let operands = [
                node.child_by_field_name("left"),
                node.child_by_field_name("right"),
            ];
            let has_dynamic_operand = operands.iter().flatten().any(|operand| {
                matches!(
                    classify_command_arg_js(*operand, source),
                    CommandArgKind::UserVariable
                        | CommandArgKind::Interpolated
                        | CommandArgKind::Unknown
                )
            });
            if has_dynamic_operand {
                CommandArgKind::Interpolated
            } else {
                CommandArgKind::StaticLiteral
            }
        }
        "array" => {
            let mut cursor = node.walk();
            let items: Vec<tree_sitter::Node<'_>> = node
                .children(&mut cursor)
                .filter(|child| child.is_named())
                .collect();
            classify_list_elements_js(&items, source)
        }
        // Transparent wrappers: look through to the first named child.
        "parenthesized_expression"
        | "await_expression"
        | "as_expression"
        | "type_assertion_expression"
        | "non_null_expression"
        | "satisfies_expression" => (0..node.named_child_count())
            .find_map(|i| node.named_child(i))
            .map_or(CommandArgKind::Unknown, |inner| {
                classify_command_arg_js(inner, source)
            }),
        "ternary_expression" => {
            let branches = [
                node.child_by_field_name("consequence"),
                node.child_by_field_name("alternative"),
            ];
            branches
                .iter()
                .flatten()
                .fold(CommandArgKind::StaticLiteral, |strongest, branch| {
                    strongest_arg_kind(strongest, classify_command_arg_js(*branch, source))
                })
        }
        _ => CommandArgKind::Unknown,
    }
}
/// Summarises the elements of a JavaScript array command argument.
///
/// * no elements, or every element a static literal → `StaticList`
/// * first element (argv[0]) not a static literal → `MixedListVarArgv0`
/// * literal argv[0] followed by at least one non-literal → `MixedListLiteralArgv0`
fn classify_list_elements_js(elements: &[tree_sitter::Node<'_>], source: &[u8]) -> CommandArgKind {
    let Some((argv0, rest)) = elements.split_first() else {
        return CommandArgKind::StaticList;
    };
    // A non-literal program name dominates whatever follows it.
    if !matches!(
        classify_command_arg_js(*argv0, source),
        CommandArgKind::StaticLiteral
    ) {
        return CommandArgKind::MixedListVarArgv0;
    }
    let rest_all_literal = rest.iter().all(|element| {
        matches!(
            classify_command_arg_js(*element, source),
            CommandArgKind::StaticLiteral
        )
    });
    if rest_all_literal {
        CommandArgKind::StaticList
    } else {
        CommandArgKind::MixedListLiteralArgv0
    }
}
/// Recognises a Go call that launches a process and describes it.
///
/// Only selector calls are considered. After resolving the receiver label
/// through `aliases`, the recognised shapes are `exec.Command` (command name
/// at argument 0), `exec.CommandContext` (command name at argument 1) and
/// `syscall.Exec` / `syscall.StartProcess` (argument 0).
///
/// For the `exec` forms, a literal shell binary followed by a literal `-c`
/// style flag and a payload that is neither a static literal nor
/// function-like is reported as `GoExecCommandShellC`. Everything else is
/// classified by the command-name argument alone.
fn match_go_call<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    aliases: &GoImportAliases,
) -> Option<CommandSite<'a>> {
    let callee = node
        .child_by_field_name("function")
        .filter(|func| func.kind() == "selector_expression")?;
    let operand = callee.child_by_field_name("operand")?;
    let field = callee.child_by_field_name("field")?;
    let raw_label = receiver_chain_label_go(operand, source);
    let package = aliases.canonical(&raw_label);
    let method = node_text(field, source)?;
    let arguments = node.child_by_field_name("arguments")?;
    let call_args = collect_named_args(arguments);

    // `name_idx` is the position of the program name within the argument list.
    let (api_base, name_idx) = match (package, method) {
        ("exec", "Command") => (CommandApi::GoExecCommand, 0usize),
        ("exec", "CommandContext") => (CommandApi::GoExecCommand, 1usize),
        ("syscall", "Exec" | "StartProcess") => (CommandApi::GoSyscallExec, 0usize),
        _ => return None,
    };

    if api_base == CommandApi::GoExecCommand {
        let literal_at = |idx: usize| {
            call_args
                .get(idx)
                .and_then(|arg| go_string_literal_value(*arg, source))
        };
        let is_shell_dash_c = match (literal_at(name_idx), literal_at(name_idx + 1)) {
            (Some(name), Some(flag)) => {
                is_shell_binary_name(name.as_str()) && is_shell_c_flag(flag.as_str())
            }
            _ => false,
        };
        if is_shell_dash_c {
            // Only the argument right after the flag is inspected as the shell payload.
            let dynamic_payload = call_args
                .get(name_idx + 2)
                .map(|payload| classify_command_arg_go(*payload, source))
                .filter(|kind| {
                    !matches!(
                        kind,
                        CommandArgKind::StaticLiteral | CommandArgKind::FunctionLike
                    )
                });
            if let Some(kind) = dynamic_payload {
                return Some(CommandSite {
                    call_node: node,
                    api: CommandApi::GoExecCommandShellC,
                    arg_kind: kind,
                    target_text: None,
                });
            }
        }
    }

    let target = call_args.get(name_idx).copied()?;
    let arg_kind = classify_command_arg_go(target, source);
    let target_text = match arg_kind {
        CommandArgKind::StaticLiteral => node_text(target, source).map(|text| text.to_string()),
        _ => None,
    };
    Some(CommandSite {
        call_node: node,
        api: api_base,
        arg_kind,
        target_text,
    })
}
/// Extracts the text of a Go string literal, looking through parentheses.
///
/// For interpreted and raw string literals the text of the first content
/// child is returned. When the literal has no content child, the node text is
/// returned with one pair of surrounding `"` or backtick quotes removed.
/// Any other node kind yields `None`.
fn go_string_literal_value(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<String> {
    const CONTENT_KINDS: [&str; 3] = [
        "interpreted_string_literal_content",
        "raw_string_literal_content",
        "string_content",
    ];
    match node.kind() {
        "interpreted_string_literal" | "raw_string_literal" => {
            let mut cursor = node.walk();
            let content = node
                .children(&mut cursor)
                .find(|child| CONTENT_KINDS.contains(&child.kind()));
            if let Some(content) = content {
                return node_text(content, source).map(|text| text.to_string());
            }
            // No content child: strip the delimiters by hand.
            let raw = node_text(node, source)?;
            let unquoted = ['"', '`']
                .iter()
                .find_map(|&quote| raw.strip_prefix(quote)?.strip_suffix(quote))
                .unwrap_or(raw);
            Some(unquoted.to_string())
        }
        "parenthesized_expression" => (0..node.named_child_count())
            .find_map(|i| node.named_child(i))
            .and_then(|inner| go_string_literal_value(inner, source)),
        _ => None,
    }
}
/// Classifies a Go expression used as a command argument.
///
/// * interpreted/raw string literal → `StaticLiteral`
/// * binary expression → `Interpolated` when either operand is a variable,
///   interpolated or unknown; otherwise `StaticLiteral`
/// * identifiers, selectors, index expressions and calls → `UserVariable`
/// * function literal → `FunctionLike`
/// * parentheses → classification of the first named child
/// * composite literal → its elements are gathered and passed to
///   `classify_list_elements_go`
/// * anything else → `Unknown`
// `source` is only threaded through the recursion (and into the list helper).
#[allow(clippy::only_used_in_recursion)]
fn classify_command_arg_go(node: tree_sitter::Node<'_>, source: &[u8]) -> CommandArgKind {
    match node.kind() {
        "interpreted_string_literal" | "raw_string_literal" => CommandArgKind::StaticLiteral,
        "identifier" | "selector_expression" | "index_expression" | "call_expression" => {
            CommandArgKind::UserVariable
        }
        "func_literal" => CommandArgKind::FunctionLike,
        "binary_expression" => {
            let operands = [
                node.child_by_field_name("left"),
                node.child_by_field_name("right"),
            ];
            let has_dynamic_operand = operands.iter().flatten().any(|operand| {
                matches!(
                    classify_command_arg_go(*operand, source),
                    CommandArgKind::UserVariable
                        | CommandArgKind::Interpolated
                        | CommandArgKind::Unknown
                )
            });
            if has_dynamic_operand {
                CommandArgKind::Interpolated
            } else {
                CommandArgKind::StaticLiteral
            }
        }
        "parenthesized_expression" => (0..node.named_child_count())
            .find_map(|i| node.named_child(i))
            .map_or(CommandArgKind::Unknown, |inner| {
                classify_command_arg_go(inner, source)
            }),
        "composite_literal" => {
            let mut elements: Vec<tree_sitter::Node<'_>> = Vec::new();
            let mut outer_cursor = node.walk();
            for body in node
                .children(&mut outer_cursor)
                .filter(|child| child.kind() == "literal_value")
            {
                let mut body_cursor = body.walk();
                for entry in body
                    .children(&mut body_cursor)
                    .filter(|child| child.is_named())
                {
                    match entry.kind() {
                        // `key: value` entries contribute only their value.
                        "keyed_element" => elements.extend(entry.child_by_field_name("value")),
                        // Wrapper nodes contribute each named child.
                        "literal_element" => elements.extend(
                            (0..entry.named_child_count()).filter_map(|j| entry.named_child(j)),
                        ),
                        _ => elements.push(entry),
                    }
                }
            }
            classify_list_elements_go(&elements, source)
        }
        _ => CommandArgKind::Unknown,
    }
}
/// Summarises the elements of a Go composite-literal command argument.
///
/// * no elements, or every element a static literal → `StaticList`
/// * first element (argv[0]) not a static literal → `MixedListVarArgv0`
/// * literal argv[0] followed by at least one non-literal → `MixedListLiteralArgv0`
fn classify_list_elements_go(elements: &[tree_sitter::Node<'_>], source: &[u8]) -> CommandArgKind {
    let Some((argv0, rest)) = elements.split_first() else {
        return CommandArgKind::StaticList;
    };
    // A non-literal program name dominates whatever follows it.
    if !matches!(
        classify_command_arg_go(*argv0, source),
        CommandArgKind::StaticLiteral
    ) {
        return CommandArgKind::MixedListVarArgv0;
    }
    let rest_all_literal = rest.iter().all(|element| {
        matches!(
            classify_command_arg_go(*element, source),
            CommandArgKind::StaticLiteral
        )
    });
    if rest_all_literal {
        CommandArgKind::StaticList
    } else {
        CommandArgKind::MixedListLiteralArgv0
    }
}
/// Local package names under which a Go file refers to `os/exec` and `syscall`.
#[derive(Debug, Default, Clone)]
struct GoImportAliases {
    /// Names that resolve to the `exec` package.
    exec_aliases: std::collections::HashSet<String>,
    /// Names that resolve to the `syscall` package.
    syscall_aliases: std::collections::HashSet<String>,
}

impl GoImportAliases {
    /// Maps a receiver label to `"exec"` or `"syscall"` when it is a known
    /// alias, and returns the label unchanged otherwise. A label present in
    /// both sets resolves to `"exec"`.
    fn canonical<'s>(&self, label: &'s str) -> &'s str {
        let is_exec = self.exec_aliases.contains(label);
        let is_syscall = self.syscall_aliases.contains(label);
        match (is_exec, is_syscall) {
            (true, _) => "exec",
            (false, true) => "syscall",
            (false, false) => label,
        }
    }
}
/// Collects the local names under which a Go file imports `os/exec` and `syscall`.
///
/// The default names `exec` and `syscall` are always present. Every
/// `import_spec` found within the first few levels of the tree contributes
/// its explicit alias, or the last path segment when no alias is given.
/// Blank (`_`) and dot (`.`) imports are ignored.
///
/// Import paths written as raw string literals (backticks) are unquoted just
/// like interpreted ones, so an aliased import such as ``ex `os/exec` `` is
/// recognised too.
fn collect_go_import_aliases(root: tree_sitter::Node<'_>, source: &[u8]) -> GoImportAliases {
    /// Depth-limited walk: the root is depth 0 and nodes deeper than depth 4
    /// are not visited. Recursion stops at each `import_spec`.
    fn visit(node: tree_sitter::Node<'_>, source: &[u8], out: &mut GoImportAliases, depth: u8) {
        if depth > 4 {
            return;
        }
        if node.kind() == "import_spec" {
            handle_import_spec(node, source, out);
            return;
        }
        let mut cursor = node.walk();
        for child in node.children(&mut cursor) {
            visit(child, source, out, depth + 1);
        }
    }

    /// Records the local name of one import when it targets `os/exec` or `syscall`.
    fn handle_import_spec(node: tree_sitter::Node<'_>, source: &[u8], out: &mut GoImportAliases) {
        let Some(path) = node.child_by_field_name("path") else {
            return;
        };
        let Some(raw) = node_text(path, source) else {
            return;
        };
        // Go allows both "interpreted" and `raw` string literals as import paths.
        let pkg_path = raw
            .strip_prefix('"')
            .and_then(|s| s.strip_suffix('"'))
            .or_else(|| raw.strip_prefix('`').and_then(|s| s.strip_suffix('`')))
            .unwrap_or(raw);
        let local = match node.child_by_field_name("name") {
            Some(name) => node_text(name, source).map(|s| s.to_string()),
            None => pkg_path.rsplit('/').next().map(|s| s.to_string()),
        };
        let local = match local {
            Some(s) if !s.is_empty() && s != "_" && s != "." => s,
            _ => return,
        };
        // NOTE(review): aliases are stored lower-cased while
        // `GoImportAliases::canonical` does an exact lookup; this presumably
        // relies on receiver labels being lower-cased as well — confirm.
        let local_lower = local.to_lowercase();
        match pkg_path {
            "os/exec" => {
                out.exec_aliases.insert(local_lower);
            }
            "syscall" => {
                out.syscall_aliases.insert(local_lower);
            }
            _ => {}
        }
    }

    let mut out = GoImportAliases::default();
    out.exec_aliases.insert("exec".to_string());
    out.syscall_aliases.insert("syscall".to_string());
    visit(root, source, &mut out, 0);
    out
}
/// Computes the receiver label for `node` by delegating to the shared
/// receiver-chain helper, passing `call_expression_module_label` as its
/// optional third argument.
// NOTE(review): presumably the shared helper uses that callback to label
// call-expression receivers such as `require('child_process')` — confirm
// against `ast_helpers`.
fn receiver_chain_label(node: tree_sitter::Node<'_>, source: &[u8]) -> String {
receiver_chain_label_shared(node, source, Some(&call_expression_module_label))
}
/// Resolves `require('child_process')` / `import('child_process')` style calls
/// to the label `"child_process"`.
///
/// The callee must be an `identifier` or `import` node whose text is
/// `require` or `import`, and the first argument must be a string literal
/// naming `child_process` or `node:child_process`. Everything else yields
/// `None`.
fn call_expression_module_label(
    node: tree_sitter::Node<'_>,
    source: &[u8],
) -> Option<&'static str> {
    debug_assert_eq!(node.kind(), "call_expression");
    let callee = node.child_by_field_name("function")?;
    let callee_text = node_text(callee, source)?;
    let kind_ok = matches!(callee.kind(), "identifier" | "import");
    let name_ok = matches!(callee_text, "require" | "import");
    if !(kind_ok && name_ok) {
        return None;
    }
    let arguments = node.child_by_field_name("arguments")?;
    let call_args = collect_named_args(arguments);
    let module = js_string_literal_value(*call_args.first()?, source)?;
    if matches!(module.as_str(), "child_process" | "node:child_process") {
        Some("child_process")
    } else {
        None
    }
}
/// Returns the contents of a JavaScript `string` node without its quotes.
///
/// The texts of all `string_fragment` children are concatenated; other
/// children contribute nothing. When there are no fragments, the node text
/// is returned with its matching `"` or `'` delimiters removed. Yields
/// `None` for non-`string` nodes or when no delimiters can be stripped.
fn js_string_literal_value(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<String> {
    if node.kind() != "string" {
        return None;
    }
    let mut cursor = node.walk();
    let fragments: Vec<&str> = node
        .children(&mut cursor)
        .filter(|child| child.kind() == "string_fragment")
        .filter_map(|fragment| node_text(fragment, source))
        .collect();
    if !fragments.is_empty() {
        return Some(fragments.concat());
    }
    // No fragment children: fall back to trimming the delimiters by hand.
    let raw = node_text(node, source)?;
    ['"', '\'']
        .iter()
        .find_map(|&quote| raw.strip_prefix(quote)?.strip_suffix(quote))
        .map(|inner| inner.to_string())
}
/// Returns whichever of the two kinds ranks higher; `a` wins ties.
///
/// Ranking, strongest first: `MixedListVarArgv0`, `UserVariable`,
/// `Interpolated`, `MixedListLiteralArgv0`, `Unknown`, `FunctionLike`, then
/// `StaticList` and `StaticLiteral` (equal).
fn strongest_arg_kind(a: CommandArgKind, b: CommandArgKind) -> CommandArgKind {
    let rank = |kind: CommandArgKind| -> u8 {
        match kind {
            CommandArgKind::StaticLiteral | CommandArgKind::StaticList => 0,
            CommandArgKind::FunctionLike => 1,
            CommandArgKind::Unknown => 2,
            CommandArgKind::MixedListLiteralArgv0 => 3,
            CommandArgKind::Interpolated => 4,
            CommandArgKind::UserVariable => 5,
            CommandArgKind::MixedListVarArgv0 => 6,
        }
    };
    // Only a strictly stronger `b` displaces `a`.
    if rank(b) > rank(a) {
        b
    } else {
        a
    }
}
/// Line-based command detection for languages handled without an AST.
///
/// * `rb`: the first backtick pair on the line (interpolated when it contains
///   `#{`), otherwise a `system(` / `exec(` call.
/// * `php`: a call to `system`, `shell_exec`, `passthru`, `proc_open`,
///   `popen` or `exec`.
/// * `java`: `Runtime.getRuntime().exec(` or `new ProcessBuilder(`.
/// * any other extension (including `sh`): `None`.
///
/// Matches reuse existing `CommandApi` labels: `PyOsSystem` for everything
/// except PHP `shell_exec`, which is labelled `JsShellJsExec`. The PHP label
/// is taken from the function name the regex actually matched, so it stays
/// consistent with the argument that gets classified (and `shell_exec (`
/// with whitespace before the parenthesis is recognised).
fn match_line_command(line: &str, ext: &str) -> Option<(CommandApi, CommandArgKind)> {
    static RUBY_BACKTICK_RE: LazyLock<Regex> =
        LazyLock::new(|| Regex::new(r"`[^`\n]*`").expect("valid regex"));
    static RUBY_SYSTEM_RE: LazyLock<Regex> =
        LazyLock::new(|| Regex::new(r"(?:^|[^.>])\b(system|exec)\s*\(").expect("valid regex"));
    static PHP_SHELL_RE: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(r"(?:^|[^.>])\b(system|shell_exec|passthru|proc_open|popen|exec)\s*\(")
            .expect("valid regex")
    });
    static JAVA_RUNTIME_RE: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(r"(Runtime\.getRuntime\(\)\.exec|new\s+ProcessBuilder)\s*\(")
            .expect("valid regex")
    });
    match ext {
        "rb" => {
            if let Some(m) = RUBY_BACKTICK_RE.find(line) {
                let kind = if m.as_str().contains("#{") {
                    CommandArgKind::Interpolated
                } else {
                    CommandArgKind::StaticLiteral
                };
                return Some((CommandApi::PyOsSystem, kind));
            }
            let m = RUBY_SYSTEM_RE.find(line)?;
            Some((CommandApi::PyOsSystem, classify_line_arg(&line[m.end()..])))
        }
        "php" => {
            let caps = PHP_SHELL_RE.captures(line)?;
            // Group 0 is the whole match (ending just past `(`); group 1 is the function name.
            let whole = caps.get(0)?;
            let api = match caps.get(1).map(|name| name.as_str()) {
                Some("shell_exec") => CommandApi::JsShellJsExec,
                _ => CommandApi::PyOsSystem,
            };
            Some((api, classify_line_arg(&line[whole.end()..])))
        }
        "java" => {
            let m = JAVA_RUNTIME_RE.find(line)?;
            Some((CommandApi::PyOsSystem, classify_line_arg(&line[m.end()..])))
        }
        _ => None,
    }
}
/// Classifies the first call argument from the raw text that follows the
/// opening parenthesis on a source line.
///
/// * text starting with `[` or `array(` → `MixedListLiteralArgv0`
/// * a quoted literal → `Interpolated` when a double-quoted literal contains
///   `$` or `#{`, or when the literal is followed by `+` or `.`; otherwise
///   `StaticLiteral`
/// * text starting with `)` → `Unknown`
/// * anything else (including empty input) → `UserVariable`
///
/// A literal that is not closed on this line (no closing quote, or a trailing
/// backslash) is treated as running to the end of the input instead of
/// panicking on an out-of-range slice.
fn classify_line_arg(after_paren: &str) -> CommandArgKind {
    let trimmed = after_paren.trim_start();
    if trimmed.starts_with('[') || trimmed.starts_with("array(") {
        return CommandArgKind::MixedListLiteralArgv0;
    }
    let bytes = trimmed.as_bytes();
    let quote = match bytes.first() {
        Some(&q) if q == b'"' || q == b'\'' => q,
        Some(&b')') => return CommandArgKind::Unknown,
        _ => return CommandArgKind::UserVariable,
    };

    // Scan for the closing quote, skipping backslash-escaped bytes.
    let mut i = 1;
    let mut had_interp = false;
    while i < bytes.len() {
        let c = bytes[i];
        if c == b'\\' {
            i += 2;
            continue;
        }
        if c == quote {
            break;
        }
        // Only double-quoted literals interpolate (`#{...}` or `$var`).
        if quote == b'"' && (c == b'$' || (c == b'#' && bytes.get(i + 1) == Some(&b'{'))) {
            had_interp = true;
        }
        i += 1;
    }

    // `i` may sit at or past the end for an unterminated literal; `get`
    // returns `None` there rather than panicking like direct slicing would.
    let after_str = bytes
        .get(i + 1..)
        .and_then(|rest| std::str::from_utf8(rest).ok())
        .unwrap_or("");
    let tail = after_str.trim_start();
    let concat = tail.starts_with('+') || tail.starts_with('.');
    if had_interp || concat {
        CommandArgKind::Interpolated
    } else {
        CommandArgKind::StaticLiteral
    }
}
/// Lists the shell metacharacter tokens that occur in `s`.
///
/// Tokens are reported at most once each, in this fixed order:
/// `;`, `&&`, `||`, `|`, `` ` ``, `$(`, `>`, `<(`, `>(`.
///
/// A single `|` is reported only when it occurs outside every `||`, and a
/// `>` only when it occurs outside every `>(`, so that the compound tokens
/// are not double-counted while a genuine pipe or redirect elsewhere in the
/// same string is still reported.
fn shell_metachars_in(s: &str) -> Vec<&'static str> {
    /// True when `single` appears somewhere in `s` that is not part of an
    /// occurrence of `compound`.
    fn occurs_outside(s: &str, single: char, compound: &str) -> bool {
        s.split(compound).any(|segment| segment.contains(single))
    }

    let checks: [(&'static str, bool); 9] = [
        (";", s.contains(';')),
        ("&&", s.contains("&&")),
        ("||", s.contains("||")),
        ("|", occurs_outside(s, '|', "||")),
        ("`", s.contains('`')),
        ("$(", s.contains("$(")),
        (">", occurs_outside(s, '>', ">(")),
        ("<(", s.contains("<(")),
        (">(", s.contains(">(")),
    ];
    checks
        .iter()
        .filter(|(_, present)| *present)
        .map(|(token, _)| *token)
        .collect()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::base::Detector;
use crate::graph::builder::GraphBuilder;
#[test]
fn test_detects_os_system_with_user_input() {
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"vuln.py",
"import os\n\ndef run_command(user_input):\n os.system(\"ls \" + user_input)\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect os.system with user input concatenation"
);
assert!(
findings
.iter()
.any(|f| f.title.to_lowercase().contains("command injection")),
"Finding should mention command injection. Titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
assert!(
findings
.iter()
.any(|f| f.cwe_id.as_deref() == Some("CWE-78")),
"Finding should have CWE-78"
);
}
#[test]
fn test_no_findings_for_safe_subprocess() {
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("safe.py", "import subprocess\n\ndef list_files():\n result = subprocess.run([\"ls\", \"-la\"], capture_output=True)\n return result.stdout\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Safe subprocess usage with list args should have no findings, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_subprocess_shell_true_python() {
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("run.py", "import subprocess\n\ndef execute(user_input):\n subprocess.call(\"grep \" + user_input, shell=True)\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect subprocess.call with shell=True and user input"
);
assert!(
findings
.iter()
.any(|f| f.cwe_id.as_deref() == Some("CWE-78")),
"Finding should have CWE-78"
);
}
#[test]
fn test_detects_child_process_exec_with_template_js() {
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("handler.js", "const { exec } = require('child_process');\n\nfunction runCommand(req, res) {\n const userId = req.params.id;\n child_process.exec(`find /data -user ${userId}`);\n}\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect child_process.exec with template literal interpolation"
);
assert!(
findings
.iter()
.any(|f| f.title.to_lowercase().contains("command injection")),
"Finding should mention command injection. Titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_exec_in_comment() {
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("safe.js", "// Dangerous example: os.system(user_input) - never do this\nfunction safeFunc() {\n return 42;\n}\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"os.system in a comment should not produce findings, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_go_exec_command() {
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("handler.go", "package main\n\nimport (\n\t\"os/exec\"\n\t\"net/http\"\n)\n\nfunc runCmd(w http.ResponseWriter, r *http.Request) {\n\tcmd := r.FormValue(\"command\")\n\texec.Command(cmd)\n}\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect exec.Command with user input from r.FormValue. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
assert!(
findings
.iter()
.any(|f| f.title.to_lowercase().contains("exec.command")),
"Finding should mention exec.Command. Titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_subprocess_run_with_shell_true_python() {
let content = "import subprocess\n\ndef run(user_input):\n subprocess.run(user_input, shell=True)\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("vuln.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect subprocess.run with shell=True and user input. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
assert!(
findings
.iter()
.any(|f| f.cwe_id.as_deref() == Some("CWE-78")),
"Finding should carry CWE-78"
);
}
#[test]
fn test_detects_os_system_python() {
let content = "import os\n\ndef run(user_input):\n os.system(user_input)\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("vuln.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect os.system(user_input). Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_child_process_exec_javascript() {
let content = "const child_process = require('child_process');\n\nfunction run(req, res) {\n child_process.exec(req.body.cmd);\n}\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("handler.js", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect child_process.exec(req.body.cmd). Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_ruby_backtick_with_interpolation() {
let content = "def list_files(user_path)\n result = `ls #{user_path}`\n result\nend\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("vuln.rb", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
let _ = findings;
}
#[test]
fn test_detects_php_system_with_user_input() {
let content = "<?php\nfunction run() {\n system($_GET['cmd']);\n}\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("vuln.php", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
let _ = findings;
}
#[test]
fn test_skips_command_in_comment() {
let content = "import subprocess\n\ndef safe(x):\n # subprocess.run(thing, shell=True)\n return x\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("safe.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"subprocess.run inside a comment must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_skips_subprocess_run_with_static_list_args() {
let content =
"import subprocess\n\ndef list_files():\n subprocess.run([\"git\", \"status\"])\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("safe.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"subprocess.run with all-literal list args is safe. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_subprocess_run_with_user_in_list_first_position() {
let content = "import subprocess\n\ndef run(user_binary):\n subprocess.run([user_binary, \"--flag\"])\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("vuln.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"argv[0] user-controlled in subprocess.run list must fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_skips_string_literal_mentioning_subprocess() {
let content =
"def doc():\n msg = \"Use subprocess.run() to call commands\"\n return msg\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("docs.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"subprocess.run inside a string literal must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_skips_exec_as_method_name() {
let content =
"class Runner:\n def exec(self, cmd):\n return cmd\n\nr = Runner()\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("safe.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"exec as a method-name definition must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_subprocess_with_concatenation() {
let content = "import subprocess\n\ndef run(user_dir):\n subprocess.run(\"ls \" + user_dir, shell=True)\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("vuln.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"subprocess.run with `+`-concatenated tainted arg + shell=True must fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_python_fstring_in_subprocess() {
let content = "import subprocess\n\ndef run(user_dir):\n subprocess.run(f\"ls {user_dir}\", shell=True)\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("vuln.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"f-string with user_dir interpolation must fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_js_template_literal_in_exec() {
let content = "const child_process = require('child_process');\n\nfunction run(userDir) {\n child_process.exec(`ls ${userDir}`);\n}\n";
let store = GraphBuilder::new().freeze();
let detector = CommandInjectionDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("handler.js", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"template-literal interpolation in exec must fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
/// Mixed fixture: `os.system(user_input)` on line 4 and `os.system("date")`
/// on line 5 of the same function.
///
/// Asserts that:
/// * at least one finding is Critical (the parameter-driven call),
/// * every finding is either Critical or Low, and
/// * no finding located on the static-literal line (5) is Critical.
///
/// The last check is what actually enforces "static literal is Low or
/// filtered"; the count-based check alone would still pass if both calls
/// were reported as Critical.
#[test]
fn test_severity_critical_for_user_input_low_for_static_literal() {
    let content =
        "import os\n\ndef run(user_input):\n os.system(user_input)\n os.system(\"date\")\n";
    let store = GraphBuilder::new().freeze();
    let detector = CommandInjectionDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &store,
        vec![("mixed.py", content)],
    );
    let findings = detector.detect(&ctx).expect("detection should succeed");
    let summary: Vec<_> = findings
        .iter()
        .map(|f| (f.line_start, &f.title, &f.severity))
        .collect();
    let critical_count = findings
        .iter()
        .filter(|f| f.severity == Severity::Critical)
        .count();
    let low_count = findings
        .iter()
        .filter(|f| f.severity == Severity::Low)
        .count();
    assert!(
        critical_count >= 1,
        "Expected >=1 Critical for tainted os.system. Got: {:?}",
        summary
    );
    assert!(
        findings.len() <= critical_count + low_count,
        "Every finding must be Critical (tainted call) or Low (static literal). Got: {:?}",
        summary
    );
    // Line 5 is the `os.system("date")` call; it may be absent (filtered) or
    // Low, but it must never be escalated to Critical.
    assert!(
        findings
            .iter()
            .filter(|f| f.line_start == Some(5))
            .all(|f| f.severity != Severity::Critical),
        "Static-literal os.system should be Low or filtered, not Critical. Got: {:?}",
        summary
    );
}
/// Go fixture with two `exec.Command` calls fed from `r.FormValue`: one with
/// the value as the program name, one in `sh -c <value>` form. At least two
/// findings are expected.
#[test]
fn test_detects_go_exec_command_with_user_arg() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "package main\n\nimport (\n\t\"os/exec\"\n\t\"net/http\"\n)\n\nfunc runOne(r *http.Request) {\n\tuserBin := r.FormValue(\"bin\")\n\texec.Command(userBin, \"--flag\")\n}\n\nfunc runTwo(r *http.Request) {\n\tuserInput := r.FormValue(\"cmd\")\n\texec.Command(\"sh\", \"-c\", userInput)\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("handler.go", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    let fired = reported.len();
    assert!(
        fired >= 2,
        "Both exec.Command call shapes must fire. Got: {:?}",
        titles
    );
}
/// B1: an inline `require('child_process').exec(req.body.cmd)` call, with no
/// intermediate binding for the module, must yield at least one finding.
#[test]
fn test_b1_require_child_process_exec_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "function run(req, res) {\n require('child_process').exec(req.body.cmd);\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("handler.js", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        !reported.is_empty(),
        "require('child_process').exec(...) must produce a finding. Got: {:?}",
        titles
    );
}
/// B1: an inline `require('child_process').execSync(userInput)` call must
/// yield at least one finding.
#[test]
#[allow(non_snake_case)]
fn test_b1_require_child_process_execSync_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "function run(userInput) {\n require('child_process').execSync(userInput);\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("handler.js", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        !reported.is_empty(),
        "require('child_process').execSync(...) must produce a finding. Got: {:?}",
        titles
    );
}
/// B1: a dynamic-import receiver, `(await import('child_process')).exec(x)`,
/// must yield at least one finding.
#[test]
fn test_b1_await_import_child_process_exec_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "async function run(userInput) {\n (await import('child_process')).exec(userInput);\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("handler.js", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        !reported.is_empty(),
        "(await import('child_process')).exec(...) must produce a finding. Got: {:?}",
        titles
    );
}
/// B2: `os/exec` imported under the alias `e`; `e.Command(b)` must yield at
/// least one finding.
#[test]
fn test_b2_go_aliased_exec_command_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "package main\n\nimport e \"os/exec\"\n\nfunc handler(b string) {\n\te.Command(b)\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("handler.go", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        !reported.is_empty(),
        "Aliased exec.Command via `import e \"os/exec\"` must fire. Got: {:?}",
        titles
    );
}
/// B2: `os/exec` imported under the alias `x` inside a grouped import;
/// `x.CommandContext(ctx, b)` must yield at least one finding.
#[test]
fn test_b2_go_aliased_exec_commandcontext_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "package main\n\nimport (\n\tx \"os/exec\"\n\t\"context\"\n)\n\nfunc handler(ctx context.Context, b string) {\n\tx.CommandContext(ctx, b)\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("handler.go", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        !reported.is_empty(),
        "Aliased exec.CommandContext must fire. Got: {:?}",
        titles
    );
}
/// B4: PHP method calls on an object (`$obj->system`, `$obj->exec`,
/// `$obj->shell_exec`) must produce no findings.
#[test]
fn test_b4_php_object_method_does_not_fire() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "<?php\nfunction run($obj, $input) {\n $obj->system($input);\n $obj->exec($input);\n $obj->shell_exec($input);\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("safe.php", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        reported.is_empty(),
        "Object-method `$obj->system($x)` must not fire. Got: {:?}",
        titles
    );
}
/// B4: Ruby method calls with an explicit receiver (`obj.system`, `obj.exec`)
/// must produce no findings.
#[test]
fn test_b4_ruby_object_method_does_not_fire() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "def run(obj, arg)\n obj.system(arg)\n obj.exec(arg)\nend\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("safe.rb", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        reported.is_empty(),
        "Object-method `obj.system(x)` must not fire. Got: {:?}",
        titles
    );
}
/// B5: `exec.Command(("sh"), "-c", userInput)`, with the shell name wrapped
/// in parentheses, must still produce a Critical finding.
#[test]
fn test_b5_go_shell_c_with_parenthesized_name_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "package main\n\nimport (\n\t\"os/exec\"\n\t\"net/http\"\n)\n\nfunc handler(r *http.Request) {\n\tuserInput := r.FormValue(\"cmd\")\n\texec.Command((\"sh\"), \"-c\", userInput)\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("handler.go", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let has_critical = reported.iter().any(|f| f.severity == Severity::Critical);
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, &f.severity)).collect();
    assert!(
        has_critical,
        "Parenthesized shell name in shell-c form must produce Critical. Got: {:?}",
        summary
    );
}
/// B6: `child_process.spawn('cmd', userArgs, { shell: true })` must be
/// reported with a severity strictly above Low.
///
/// The assertion checks the severity itself rather than mere presence of a
/// finding, so a Low-severity report no longer satisfies the test.
#[test]
fn test_b6_js_spawn_with_shell_true_option_boosted() {
    let content = "const child_process = require('child_process');\nfunction run(userArgs) {\n child_process.spawn('cmd', userArgs, { shell: true });\n}\n";
    let store = GraphBuilder::new().freeze();
    let detector = CommandInjectionDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &store,
        vec![("handler.js", content)],
    );
    let findings = detector.detect(&ctx).expect("detection should succeed");
    assert!(
        findings.iter().any(|f| f.severity > Severity::Low),
        "spawn(..., {{ shell: true }}) must boost severity above Low. Got: {:?}",
        findings
            .iter()
            .map(|f| (&f.title, f.severity))
            .collect::<Vec<_>>()
    );
}
/// B6: `child_process.execFile('cmd', userArgs, { shell: true })` must be
/// reported with a severity strictly above Low.
///
/// The assertion checks the severity itself rather than mere presence of a
/// finding, so a Low-severity report no longer satisfies the test.
#[test]
fn test_b6_js_execfile_with_shell_true_option_boosted() {
    let content = "const child_process = require('child_process');\nfunction run(userArgs) {\n child_process.execFile('cmd', userArgs, { shell: true });\n}\n";
    let store = GraphBuilder::new().freeze();
    let detector = CommandInjectionDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &store,
        vec![("handler.js", content)],
    );
    let findings = detector.detect(&ctx).expect("detection should succeed");
    assert!(
        findings.iter().any(|f| f.severity > Severity::Low),
        "execFile(..., {{ shell: true }}) must boost severity above Low. Got: {:?}",
        findings
            .iter()
            .map(|f| (&f.title, f.severity))
            .collect::<Vec<_>>()
    );
}
/// B8: PHP `popen($_GET['cmd'], 'r')` must yield at least one finding.
#[test]
fn test_b8_php_popen_with_user_input_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "<?php\nfunction run() {\n popen($_GET['cmd'], 'r');\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("vuln.php", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        !reported.is_empty(),
        "popen($_GET['cmd'], 'r') must produce a finding. Got: {:?}",
        titles
    );
}
/// B9: `process.exec(input)` on a user-defined `process` object (no
/// `child_process` involved) must produce no findings.
#[test]
fn test_b9_process_exec_does_not_fire_as_child_process() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "class Runner {}\nconst process = new Runner();\nprocess.exec = function(x){};\nfunction run(input) {\n process.exec(input);\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("safe.js", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
    assert!(
        reported.is_empty(),
        "`process.exec(x)` is not a real Node API and must not fire. Got: {:?}",
        titles
    );
}
/// After `from os import system`, a bare `system(user_input)` call on line 4
/// of the fixture must be reported at that line.
#[test]
fn test_python_bare_system_after_from_import() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "from os import system\n\ndef run(user_input):\n system(user_input)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("h.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let hit_on_call_line = reported.iter().any(|f| f.line_start == Some(4));
    let summary: Vec<_> = reported.iter().map(|f| (f.line_start, &f.title)).collect();
    assert!(
        hit_on_call_line,
        "Should fire on `system(user_input)` after `from os import system`. Got: {:?}",
        summary
    );
}
/// After `from subprocess import run`, a bare `run(user_input, shell=True)`
/// on line 4 must be reported as Critical at that line.
#[test]
fn test_python_bare_subprocess_run_after_from_import() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "from subprocess import run\n\ndef go(user_input):\n run(user_input, shell=True)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("h.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let critical_on_call_line = reported
        .iter()
        .any(|f| f.line_start == Some(4) && f.severity == Severity::Critical);
    let summary: Vec<_> = reported
        .iter()
        .map(|f| (f.line_start, f.severity, &f.title))
        .collect();
    assert!(
        critical_on_call_line,
        "Should fire Critical on `run(user_input, shell=True)` after `from subprocess import run`. Got: {:?}",
        summary
    );
}
/// With `import subprocess as sp`, `sp.run(user_input, shell=True)` on line 4
/// must be reported as Critical at that line.
#[test]
fn test_python_aliased_module_subprocess_run_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "import subprocess as sp\n\ndef go(user_input):\n sp.run(user_input, shell=True)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("h.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let critical_on_call_line = reported
        .iter()
        .any(|f| f.line_start == Some(4) && f.severity == Severity::Critical);
    let summary: Vec<_> = reported
        .iter()
        .map(|f| (f.line_start, f.severity, &f.title))
        .collect();
    assert!(
        critical_on_call_line,
        "Should fire Critical on `sp.run(user_input, shell=True)` after `import subprocess as sp`. Got: {:?}",
        summary
    );
}
/// B15: a static `os.system` literal containing `;` must produce a Medium
/// finding.
#[test]
fn test_b15_static_literal_with_semicolon_chain_python_os_system() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import os\nos.system(\"ls; rm -rf /\")\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("a.py", source)]);
    let reported = scanner.detect(&analysis).expect("detection");
    let has_medium = reported.iter().any(|f| f.severity == Severity::Medium);
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    assert!(
        has_medium,
        "B15: static literal with `;` must fire Medium. Got: {:?}",
        summary
    );
}
/// B15: a static `subprocess.run(..., shell=True)` literal containing `|`
/// must produce a Medium finding.
#[test]
fn test_b15_static_literal_with_pipe_python_subprocess_shell_true() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import subprocess\nsubprocess.run(\"cat foo | grep bar\", shell=True)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("a.py", source)]);
    let reported = scanner.detect(&analysis).expect("detection");
    let has_medium = reported.iter().any(|f| f.severity == Severity::Medium);
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    assert!(
        has_medium,
        "B15: static literal with `|` must fire Medium. Got: {:?}",
        summary
    );
}
/// B15: a static `os.system` literal containing `$(` must produce a Medium
/// finding.
#[test]
fn test_b15_static_literal_with_dollar_paren_subst_python() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import os\nos.system(\"echo $(date)\")\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("a.py", source)]);
    let reported = scanner.detect(&analysis).expect("detection");
    let has_medium = reported.iter().any(|f| f.severity == Severity::Medium);
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    assert!(
        has_medium,
        "B15: static literal with `$(` must fire Medium. Got: {:?}",
        summary
    );
}
/// B15 control case: `os.system("date")` has no shell metacharacter, so no
/// finding at Medium severity or above may be produced.
#[test]
fn test_b15_static_literal_no_metachar_still_low_or_skipped() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import os\nos.system(\"date\")\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("a.py", source)]);
    let reported = scanner.detect(&analysis).expect("detection");
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    let medium_or_above = reported
        .iter()
        .filter(|f| f.severity >= Severity::Medium)
        .count();
    assert_eq!(
        medium_or_above,
        0,
        "Static literal `date` (no metachar) must not fire Medium+. Got: {:?}",
        summary
    );
}
/// `subprocess.Popen(["xdg-open", url])`: literal program name, variable
/// later argument, no `shell=True`. No High or Critical finding may appear.
#[test]
fn test_python_subprocess_list_literal_argv0_with_url_param_is_low() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "import subprocess\n\ndef open_url(url):\n subprocess.Popen([\"xdg-open\", url])\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("u.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    let high_or_critical = reported
        .iter()
        .filter(|f| matches!(f.severity, Severity::High | Severity::Critical))
        .count();
    assert_eq!(
        high_or_critical,
        0,
        "Literal argv[0] (`xdg-open`) + variable later arg with shell=False \
         must not produce High/Critical findings. Got: {:?}",
        summary
    );
}
/// `subprocess.call(["explorer", f"/select,{url}"])`: literal program name,
/// interpolated later argument, no `shell=True`. No High or Critical finding
/// may appear.
#[test]
fn test_python_subprocess_list_literal_argv0_with_filepath_is_low() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import subprocess\n\ndef show(url):\n subprocess.call([\"explorer\", f\"/select,{url}\"])\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("p.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    let high_or_critical = reported
        .iter()
        .filter(|f| matches!(f.severity, Severity::High | Severity::Critical))
        .count();
    assert_eq!(
        high_or_critical,
        0,
        "Literal argv[0] + interpolated later arg with shell=False must not \
         produce High/Critical findings. Got: {:?}",
        summary
    );
}
/// `subprocess.run(["sh", "-c", user_input])` must produce a Critical finding
/// even though `shell=True` is not passed.
#[test]
fn test_python_subprocess_sh_dash_c_user_input_critical() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import subprocess\n\ndef run(user_input):\n subprocess.run([\"sh\", \"-c\", user_input])\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("h.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let has_critical = reported.iter().any(|f| f.severity == Severity::Critical);
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    assert!(
        has_critical,
        "subprocess.run([\"sh\", \"-c\", user_input]) must fire Critical. Got: {:?}",
        summary
    );
}
/// Same as the `sh -c` case, repeated for `bash` and `/bin/bash` as the
/// program name: each variant must produce a Critical finding.
#[test]
fn test_python_subprocess_bash_dash_c_user_input_critical() {
    use crate::detectors::analysis_context::AnalysisContext;

    for shell in ["bash", "/bin/bash"] {
        let source = format!(
            "import subprocess\n\ndef run(user_input):\n subprocess.run([\"{}\", \"-c\", user_input])\n",
            shell
        );
        let graph = GraphBuilder::new().freeze();
        let scanner = CommandInjectionDetector::new("/mock/repo");
        let analysis =
            AnalysisContext::test_with_mock_files(&graph, vec![("h.py", source.as_str())]);
        let reported = scanner
            .detect(&analysis)
            .expect("detection should succeed");
        let has_critical = reported.iter().any(|f| f.severity == Severity::Critical);
        let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
        assert!(
            has_critical,
            "subprocess.run([\"{}\", \"-c\", user_input]) must fire Critical. Got: {:?}",
            shell,
            summary
        );
    }
}
/// `child_process.spawn("sh", ["-c", userInput])` must produce a Critical
/// finding.
#[test]
fn test_js_spawn_sh_dash_c_user_critical() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "const child_process = require('child_process');\nfunction run(userInput) {\n child_process.spawn(\"sh\", [\"-c\", userInput]);\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("h.js", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let has_critical = reported.iter().any(|f| f.severity == Severity::Critical);
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    assert!(
        has_critical,
        "child_process.spawn(\"sh\", [\"-c\", userInput]) must fire Critical. Got: {:?}",
        summary
    );
}
/// `child_process.spawn("xdg-open", [url])`: literal program name with a
/// variable argument. No High or Critical finding may appear.
#[test]
fn test_js_spawn_literal_argv0_with_user_arg_low() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "const child_process = require('child_process');\nfunction openUrl(url) {\n child_process.spawn(\"xdg-open\", [url]);\n}\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("o.js", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    let high_or_critical = reported
        .iter()
        .filter(|f| matches!(f.severity, Severity::High | Severity::Critical))
        .count();
    assert_eq!(
        high_or_critical,
        0,
        "spawn(\"xdg-open\", [url]) must not produce High/Critical. Got: {:?}",
        summary
    );
}
/// `subprocess.run([cmd, "--flag"])`: the program name itself is a variable,
/// so a Critical finding is required.
#[test]
fn test_python_subprocess_var_argv0_unchanged() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import subprocess\n\ndef go(cmd):\n subprocess.run([cmd, \"--flag\"])\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("v.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let has_critical = reported.iter().any(|f| f.severity == Severity::Critical);
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    assert!(
        has_critical,
        "subprocess.run([cmd, \"--flag\"]) (variable argv[0]) must stay Critical. Got: {:?}",
        summary
    );
}
/// A mixed literal/variable list passed with `shell=True` must still produce
/// a High or Critical finding.
#[test]
fn test_python_subprocess_shell_true_unchanged() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import subprocess\n\ndef go(x):\n subprocess.run([\"sh\", \"echo\", x], shell=True)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("s.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    let has_high_or_critical = reported
        .iter()
        .any(|f| matches!(f.severity, Severity::High | Severity::Critical));
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    assert!(
        has_high_or_critical,
        "shell=True with mixed list must still surface High/Critical. Got: {:?}",
        summary
    );
}
/// Runs the detector over a single mock file with dual-branch mode switched
/// on for the `command-injection` detector, and returns the findings.
///
/// `file` is the mock file name and `content` its source text. Panics if
/// detection returns an error.
fn run_dual_branch(file: &str, content: &str) -> Vec<Finding> {
    use crate::config::DualBranchConfig;
    use crate::detectors::analysis_context::AnalysisContext;
    use std::collections::HashMap;

    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    // Enable dual-branch globally and for this detector specifically.
    let dual_branch = DualBranchConfig {
        enabled: true,
        detectors: HashMap::from([("command-injection".to_string(), true)]),
    };
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![(file, content)])
        .with_dual_branch(dual_branch);
    scanner
        .detect(&analysis)
        .expect("detection should succeed")
}
/// Without a dual-branch config, the detector must still fire, and every
/// finding must have no `alternative_branch` and only zero-weight
/// prediction reasons.
#[test]
fn flag_off_emits_single_branch_unchanged() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import os\n\
                  def handler(request):\n\
                  \x20 os.system(\"ls \" + request.GET[\"q\"])\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = CommandInjectionDetector::new("/mock/repo");
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("vuln.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");
    assert!(!reported.is_empty(), "must still fire single-branch");
    for finding in &reported {
        assert!(
            finding.alternative_branch.is_none(),
            "no alternative_branch when flag off: {:?}",
            finding.title
        );
        let weighted: Vec<_> = finding
            .prediction_reasons
            .iter()
            .map(|r| (&r.kind, r.weight))
            .collect();
        let any_nonzero_weight = finding.prediction_reasons.iter().any(|r| r.weight != 0.0);
        assert!(
            !any_nonzero_weight,
            "no predictor-emitted (weight ≠ 0) reasons when flag off; \
             only weight-0 graph-enrichment reasons are allowed. reasons: {:?}",
            weighted
        );
    }
}
/// With dual-branch enabled, the first finding for a concatenated
/// `os.system` call must carry an alternative branch and at least one
/// prediction reason.
#[test]
fn flag_on_python_command_injection_emits_dual_branch() {
    let source = "import os\n\
                  def handler(request):\n\
                  \x20 os.system(\"ls \" + request.GET[\"q\"])\n";
    let reported = run_dual_branch("vuln.py", source);
    assert!(!reported.is_empty(), "must fire dual-branch");
    let first = &reported[0];
    let has_alternative = first.alternative_branch.is_some();
    assert!(
        has_alternative,
        "alternative_branch must be populated when flag on. title={:?}",
        first.title
    );
    let has_reasons = !first.prediction_reasons.is_empty();
    assert!(has_reasons, "at least one prediction reason");
}
/// Dual-branch matrix, predicted RealBug: a concatenated `os.system` call
/// must be High or Critical with a "command injection" title, and its
/// alternative branch must be Benign at Info severity.
#[test]
fn matrix_predicted_realbug_request_source() {
    use crate::dual_branch::BranchLabel;

    let source = "import os\n\
                  def handler(request):\n\
                  \x20 os.system(\"ls \" + request.GET[\"q\"])\n";
    let reported = run_dual_branch("vuln.py", source);
    assert!(!reported.is_empty());
    let primary = &reported[0];
    let is_high_or_critical = matches!(primary.severity, Severity::High | Severity::Critical);
    assert!(
        is_high_or_critical,
        "predicted RealBug uses 2D severity table — interpolated arg \
         to os.system is High/Critical, got {:?}",
        primary.severity
    );
    let lowered_title = primary.title.to_lowercase();
    assert!(
        lowered_title.contains("command injection"),
        "RealBug title; got {:?}",
        primary.title
    );
    let other = primary.alternative_branch.as_ref().unwrap();
    assert_eq!(other.label, BranchLabel::Benign);
    assert_eq!(other.severity, Severity::Info);
}
/// Dual-branch matrix, predicted Benign: an all-literal argument list must
/// surface as Info, with a RealBug alternative carrying a non-Info severity.
#[test]
fn matrix_predicted_benign_all_literals_list() {
    use crate::dual_branch::BranchLabel;

    let source = "import subprocess\n\
                  def cleanup():\n\
                  \x20 subprocess.run([\"rm\", \"-rf\", \"/tmp/cache\"])\n";
    let reported = run_dual_branch("internal.py", source);
    assert!(!reported.is_empty(), "must surface (drop-Low disabled)");
    let primary = reported
        .iter()
        .find(|f| f.is_dual_branch())
        .expect("must have at least one dual-branch finding");
    assert_eq!(
        primary.severity,
        Severity::Info,
        "predicted Benign → Info, got {:?}",
        primary.severity
    );
    let other = primary.alternative_branch.as_ref().unwrap();
    assert_eq!(other.label, BranchLabel::RealBug);
    let alternative_is_info = matches!(other.severity, Severity::Info);
    assert!(
        !alternative_is_info,
        "alternative carries non-Info severity (the original 2D \
         table interpretation); got {:?}",
        other.severity
    );
}
/// Dual-branch matrix: literal program name plus a variable later argument
/// must be Info, with RealBug as the alternative label.
#[test]
fn matrix_predicted_benign_literal_argv0_param_tail() {
    use crate::dual_branch::BranchLabel;

    let source = "import subprocess\n\
                  def lookup(name):\n\
                  \x20 subprocess.run([\"grep\", name, \"/etc/passwd\"])\n";
    let reported = run_dual_branch("tool.py", source);
    assert!(!reported.is_empty());
    let primary = reported
        .iter()
        .find(|f| f.is_dual_branch())
        .expect("dual-branch finding expected");
    assert_eq!(
        primary.severity,
        Severity::Info,
        "literal argv0 + var tail leans Benign → Info; got {:?}",
        primary.severity
    );
    let other = primary.alternative_branch.as_ref().unwrap();
    assert_eq!(other.label, BranchLabel::RealBug);
}
/// An all-literal call inside a function named `handler` must stay at Info
/// when it is a dual-branch finding.
#[test]
fn dual_branch_finding_skips_pass_c_handler_boost() {
    let source = "import subprocess\n\
                  def handler():\n\
                  \x20 subprocess.run([\"ls\", \"-la\"])\n";
    let reported = run_dual_branch("h.py", source);
    let primary = reported
        .iter()
        .find(|f| f.is_dual_branch())
        .expect("dual-branch finding expected");
    assert_eq!(
        primary.severity,
        Severity::Info,
        "Pass C skipped dual-branch findings — predicted Info \
         stays Info even in handler-scope. Got {:?}",
        primary.severity
    );
}
/// With dual-branch enabled, a static-literal `os.system` call must still
/// surface as a dual-branch finding rather than being dropped.
#[test]
fn dual_branch_drop_low_disabled_when_flag_on() {
    let source = "import os\n\
                  def cleanup():\n\
                  \x20 os.system(\"rm -rf /tmp/cache\")\n";
    let reported = run_dual_branch("static.py", source);
    let has_dual_branch = reported.iter().any(|f| f.is_dual_branch());
    let summary: Vec<_> = reported
        .iter()
        .map(|f| (&f.title, f.severity, f.is_dual_branch()))
        .collect();
    assert!(
        has_dual_branch,
        "drop-Low must be disabled when flag-on — static-literal \
         call should still surface as a dual-branch finding. \
         Findings: {:?}",
        summary
    );
}
/// A trailing `# repotoire: command-static[...]` annotation on an otherwise
/// suspicious call must collapse the finding to Info and populate
/// `resolution_signals`.
#[test]
fn collapsing_annotation_command_static_forces_benign() {
    let source = "import os\n\
                  def handler(request):\n\
                  \x20 os.system(\"ls \" + request.GET[\"q\"]) # repotoire: command-static[internal-tool]\n";
    let reported = run_dual_branch("annot.py", source);
    let primary = reported
        .iter()
        .find(|f| f.is_dual_branch())
        .expect("dual-branch finding expected");
    assert_eq!(
        primary.severity,
        Severity::Info,
        "annotation collapses to Info"
    );
    let has_signal = !primary.resolution_signals.is_empty();
    assert!(
        has_signal,
        "annotation must surface as a resolution_signal"
    );
}
/// A trailing `# repotoire: command-user-controlled[...]` annotation on an
/// all-literal call must yield a non-Info severity and populate
/// `resolution_signals`.
#[test]
fn collapsing_annotation_command_user_controlled_forces_realbug() {
    let source = "import subprocess\n\
                  def cleanup():\n\
                  \x20 subprocess.run([\"rm\", \"/tmp/cache\"]) # repotoire: command-user-controlled[admin-route]\n";
    let reported = run_dual_branch("annot.py", source);
    let primary = reported
        .iter()
        .find(|f| f.is_dual_branch())
        .expect("dual-branch finding expected");
    let is_info = matches!(primary.severity, Severity::Info);
    assert!(
        !is_info,
        "annotation forces RealBug, severity from 2D table, not \
         Info. Got {:?}",
        primary.severity
    );
    let has_signal = !primary.resolution_signals.is_empty();
    assert!(
        has_signal,
        "annotation must surface as a resolution_signal"
    );
}
/// `subprocess.Popen(["git"] + command, ...)` must produce a dual-branch
/// finding whose alternative branch is populated.
#[test]
fn real_gitpython_execute_signature() {
    let source = "import subprocess\n\
                  def execute(self, command):\n\
                  \x20 return subprocess.Popen([\"git\"] + command, stdout=subprocess.PIPE)\n";
    let reported = run_dual_branch("gitpython_execute.py", source);
    let primary = reported
        .iter()
        .find(|f| f.is_dual_branch())
        .expect("dual-branch finding expected");
    let has_alternative = primary.alternative_branch.is_some();
    assert!(has_alternative);
}
/// `subprocess.run(cmd, shell=True)` with `cmd` read from
/// `request.args.get` must be High or Critical, and its prediction reasons
/// must include a `KeywordArgument` with name `shell` and value `True`.
#[test]
fn real_flask_handler_shell_true() {
    use crate::dual_branch::PredictionReasonKind;

    let source = "import subprocess\n\
                  def run_command(request):\n\
                  \x20 cmd = request.args.get(\"cmd\")\n\
                  \x20 subprocess.run(cmd, shell=True)\n";
    let reported = run_dual_branch("flask_handler.py", source);
    let primary = reported
        .iter()
        .find(|f| f.is_dual_branch())
        .expect("dual-branch finding expected");

    let weighted: Vec<_> = primary
        .prediction_reasons
        .iter()
        .map(|r| (&r.kind, r.weight))
        .collect();
    let is_high_or_critical = matches!(primary.severity, Severity::High | Severity::Critical);
    assert!(
        is_high_or_critical,
        "shell=True + handler-named function → RealBug, severity \
         from 2D table (High/Critical). Got {:?}. Reasons: {:?}",
        primary.severity,
        weighted
    );

    let kinds: Vec<_> = primary.prediction_reasons.iter().map(|r| &r.kind).collect();
    let shell_true_seen = primary.prediction_reasons.iter().any(|r| {
        matches!(
            &r.kind,
            PredictionReasonKind::KeywordArgument { name, value }
                if name == "shell" && value == "True"
        )
    });
    assert!(
        shell_true_seen,
        "KeywordArgument(shell=True) signal must fire on shell=True. Reasons: {:?}",
        kinds
    );
}
/// `subprocess.run([sys.executable, "-m", "pip", "install", name], ...)`
/// must produce a dual-branch finding whose alternative branch has a
/// RealBug-or-Benign label and a non-empty title and description.
#[test]
fn real_pip_invoke_via_sys_executable() {
    use crate::dual_branch::BranchLabel;

    let source = "import subprocess, sys\n\
                  def install_package(name):\n\
                  \x20 subprocess.run([sys.executable, \"-m\", \"pip\", \"install\", name], check=True)\n";
    let reported = run_dual_branch("pip_invoke.py", source);
    let primary = reported
        .iter()
        .find(|f| f.is_dual_branch())
        .expect("dual-branch finding expected");
    let other = primary
        .alternative_branch
        .as_ref()
        .expect("alternative_branch populated");
    let label_is_set = matches!(other.label, BranchLabel::RealBug | BranchLabel::Benign);
    assert!(label_is_set, "alternative branch label must be set");
    let has_text = !other.title.is_empty() && !other.description.is_empty();
    assert!(
        has_text,
        "alternative branch must carry a title and description"
    );
}
}