mod annotation;
mod evidence;
mod predict;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::detectors::ast_walk::AstWalkCtx;
use crate::detectors::base::{Detector, DetectorConfig};
use crate::detectors::detector_context::ContentFlags;
use crate::detectors::fast_search::{find_in, *};
use crate::detectors::security::ast_helpers::{
collect_named_args, node_text, receiver_chain_label as receiver_chain_label_shared,
unwrap_callee,
};
use crate::detectors::security::scan_inputs::{ScanAstInputs, ScanInputs};
use crate::detectors::taint::{TaintAnalysisResult, TaintAnalyzer, TaintCategory};
use crate::models::{Finding, Severity};
use crate::parsers::lightweight::Language;
use anyhow::Result;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{LazyLock, OnceLock};
/// File extensions this detector requests from the file provider and reports
/// through `Detector::file_extensions`.
const SUPPORTED_EXTS: &[&str] = &["py", "js", "ts", "jsx", "tsx", "go"];
/// File extensions eligible for the AST scan; `detect` skips files whose
/// extension is not listed here. Currently identical to `SUPPORTED_EXTS`.
const AST_EXTS: &[&str] = &["py", "js", "ts", "jsx", "tsx", "go"];
/// Detector that reports path traversal (CWE-22) candidates found at file-system,
/// path-join and file-download call sites.
pub struct PathTraversalDetector {
// Prefix stripped from file paths before they are written into findings.
repository_path: PathBuf,
// Upper bound checked before scanning each file and before emitting each site.
max_findings: usize,
// Used to trace taint paths when no precomputed result is available.
taint_analyzer: TaintAnalyzer,
// Cross-function taint paths; read in `detect` when set.
// NOTE(review): presumably populated via `impl_taint_precompute!` — confirm.
precomputed_cross: OnceLock<Vec<crate::detectors::taint::TaintPath>>,
// Intra-function taint paths; read in `detect` when set.
precomputed_intra: OnceLock<Vec<crate::detectors::taint::TaintPath>>,
}
impl PathTraversalDetector {
    /// Creates a detector rooted at `repository_path`.
    ///
    /// The finding cap starts at 50, the taint analyzer is freshly constructed,
    /// and both precomputed taint slots start out unset.
    pub fn new(repository_path: impl Into<PathBuf>) -> Self {
        let repository_path: PathBuf = repository_path.into();
        Self {
            repository_path,
            max_findings: 50,
            taint_analyzer: TaintAnalyzer::new(),
            precomputed_cross: OnceLock::new(),
            precomputed_intra: OnceLock::new(),
        }
    }
}
impl Detector for PathTraversalDetector {
/// Stable identifier of this detector.
fn name(&self) -> &'static str {
"path-traversal"
}
/// One-line human-readable summary of what the detector looks for.
fn description(&self) -> &'static str {
"Detects path traversal vulnerabilities"
}
/// This detector opts out of the shared post-processing step.
fn bypass_postprocessor(&self) -> bool {
true
}
// Macro-generated trait items.
// NOTE(review): presumably the setters for `precomputed_cross` / `precomputed_intra` — confirm.
crate::detectors::impl_taint_precompute!();
/// Taint category whose paths this detector consumes.
fn taint_category(&self) -> Option<crate::detectors::taint::TaintCategory> {
Some(TaintCategory::PathTraversal)
}
/// Extensions of the files this detector wants to see.
fn file_extensions(&self) -> &'static [&'static str] {
SUPPORTED_EXTS
}
/// Content flags of interest: the union of `FILE_OPS` and `PATH_OPS`.
fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
crate::detectors::detector_context::ContentFlags::FILE_OPS
.union(crate::detectors::detector_context::ContentFlags::PATH_OPS)
}
/// Runs the detector over every supported file in `ctx`.
///
/// Steps: gather taint paths (precomputed when available, otherwise traced
/// here), AST-scan each candidate file, merge the taint paths into the
/// findings, then drop every finding whose severity is `Low`.
fn detect(
&self,
ctx: &crate::detectors::analysis_context::AnalysisContext,
) -> Result<Vec<Finding>> {
let graph = ctx.graph;
let det_ctx = &ctx.detector_ctx;
let files = &ctx.as_file_provider();
let mut findings: Vec<Finding> = vec![];
// Cross-function taint paths: reuse the precomputed set when present.
let mut taint_paths = if let Some(cross) = self.precomputed_cross.get() {
cross.clone()
} else {
self.taint_analyzer
.trace_taint(graph, TaintCategory::PathTraversal)
};
// Intra-function taint paths: same precomputed-or-compute policy.
let intra_paths = if let Some(intra) = self.precomputed_intra.get() {
intra.clone()
} else {
crate::detectors::taint::run_intra_function_taint(
&self.taint_analyzer,
graph,
TaintCategory::PathTraversal,
&self.repository_path,
)
};
taint_paths.extend(intra_paths);
let taint_result = TaintAnalysisResult::from_paths(taint_paths);
let dual_branch_policy = DualBranchPolicy {
flag_on: ctx.dual_branch.is_enabled_for("path-traversal"),
};
for path in files.files_with_extensions(SUPPORTED_EXTS) {
// The cap is checked before each file; a single file's scan may still push
// the total past `max_findings`.
if findings.len() >= self.max_findings {
break;
}
let flags = det_ctx.content_flags.get(path).copied().unwrap_or_default();
// When no content flags exist at all, every file is considered.
let should_check = flags.has(ContentFlags::FILE_OPS)
|| flags.has(ContentFlags::PATH_OPS)
|| det_ctx.content_flags.is_empty();
if !should_check {
continue;
}
let raw = match files.content(path) {
Some(c) => c,
None => continue,
};
let raw_str: &str = &raw;
// Without content flags, fall back to a keyword pre-filter on the raw text.
if det_ctx.content_flags.is_empty() && !contains_any(PATH_KEYWORD_FINDERS, raw_str) {
continue;
}
// Skip files larger than 500_000 bytes.
if raw_str.len() > 500_000 {
continue;
}
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
if !AST_EXTS.contains(&ext) {
continue;
}
let lang = Language::from_path(path);
// A cached parse tree, when the provider has one; otherwise the scan parses the file itself.
let cached = files.tree(path);
let scan = ScanInputs::new(path, raw_str, ext);
let ast_inputs = ScanAstInputs::new(scan, lang, cached.as_deref());
let new_findings = self.scan_file_ast(&ast_inputs, &taint_result, &dual_branch_policy);
findings.extend(new_findings);
}
// Taint paths can raise or lower existing findings and add new ones.
merge_taint_paths(&mut findings, &taint_result, &self.repository_path);
// Low-severity findings (including sanitized taint matches) are not reported.
findings.retain(|f| f.severity != Severity::Low);
Ok(findings)
}
}
impl crate::detectors::RegisteredDetector for PathTraversalDetector {
    /// Builds the detector from the registry's init data, using its
    /// `repo_path` as the repository root, and returns it as a shared
    /// trait object.
    fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
        let detector = Self::new(init.repo_path);
        std::sync::Arc::new(detector)
    }
}
/// Substring finders used by `detect` as a pre-filter when no content flags
/// are available: a file is scanned only if at least one of them matches.
/// The `FIND_*` statics are not defined in this file (presumably they come
/// from the `fast_search` glob import).
static PATH_KEYWORD_FINDERS: &[&LazyLock<memchr::memmem::Finder<'static>>] = &[
&FIND_OPEN_PAREN,
&FIND_READ_FILE,
&FIND_WRITE_FILE,
&FIND_PATH_JOIN,
&FIND_PATH_RESOLVE,
&FIND_OS_PATH,
&FIND_SEND_FILE,
&FIND_SEND_FILE_SNAKE,
&FIND_SERVE_FILE,
&FIND_UNLINK,
&FIND_RMDIR,
&FIND_MKDIR,
&FIND_COPY_FILE,
&FIND_RENAME_PAREN,
&FIND_OS_REMOVE,
&FIND_SHUTIL,
&FIND_FILEPATH,
&FIND_PATHLIB,
&FIND_CREATE_READ_STREAM,
&FIND_CREATE_WRITE_STREAM,
&FIND_APPEND_FILE,
&FIND_STAT_SYNC,
&FIND_ACCESS_SYNC,
];
/// Family of API a matched call site belongs to; selects the finding's title,
/// description, suggested fix and severity rules.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PathApi {
/// File-system operation such as open/read/write/remove/rename.
FileOp,
/// Path joining or resolving call (e.g. `os.path.join`, `path.join`, `filepath.Join`).
PathJoin,
/// File download / send call (e.g. `send_file`, `res.sendFile`, `http.ServeFile`).
SendFile,
}
/// Syntactic shape of the path argument at a call site, as determined by the
/// per-language `classify_path_arg_*` functions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PathArgKind {
/// A string literal without interpolation.
StaticLiteral,
/// A string containing an interpolation / template substitution.
Interpolated,
/// An identifier, attribute/member access, subscript/index or call result.
UserVariable,
/// A binary expression mixing a literal with a non-literal operand.
Concatenation,
/// Any node shape the classifiers do not recognise.
Unknown,
}
impl PathApi {
    /// Headline used as the finding title for this API family.
    fn title(self) -> &'static str {
        match self {
            Self::FileOp => "Potential path traversal in file operation",
            Self::PathJoin => "Path traversal via path.join with user input",
            Self::SendFile => "Path traversal in file download",
        }
    }

    /// Explanation of the risk, used as the finding description.
    fn base_description(self) -> &'static str {
        match self {
            Self::FileOp => {
                "File operation with user-controlled input detected. An attacker could use '../' \
                sequences to access files outside the intended directory."
            }
            Self::PathJoin => {
                "path.join() with user input does NOT prevent path traversal. Joining '/base' \
                with '../etc/passwd' results in '/etc/passwd'."
            }
            Self::SendFile => {
                "File download/send function with user-controlled path. Attackers could download \
                arbitrary files from the server."
            }
        }
    }

    /// Remediation advice attached to the finding.
    fn suggested_fix(self) -> &'static str {
        match self {
            Self::FileOp => {
                "1. Use path.basename() to extract filename only\n\
                2. Validate resolved path is within allowed directory\n\
                3. Use a whitelist of allowed filenames if possible"
            }
            Self::PathJoin => {
                "After joining, verify the resolved path starts with your base directory:\n\
                ```\nconst resolved = path.resolve(baseDir, userInput);\n\
                if (!resolved.startsWith(path.resolve(baseDir))) { throw new Error('Invalid path'); }\n```"
            }
            Self::SendFile => {
                "Use res.download() with { root: '/safe/base/dir' } option, or validate resolved \
                path is within allowed directory."
            }
        }
    }

    /// Impact statement attached to the finding.
    fn why_it_matters(self) -> &'static str {
        match self {
            Self::FileOp => {
                "Attackers could read sensitive files like /etc/passwd or overwrite critical \
                system files."
            }
            Self::PathJoin => {
                "path.join() is commonly misunderstood as safe, but it preserves '../' sequences \
                allowing directory escape."
            }
            Self::SendFile => {
                "Attackers could download sensitive configuration files, source code, or \
                credentials from the server."
            }
        }
    }

    /// Maps the argument shape to a severity.
    ///
    /// * Static literals are always `Low`.
    /// * Unknown shapes are `Low` for path joins; for other APIs they are
    ///   `Medium` with a user-input marker and `Low` without.
    /// * Interpolated, variable and concatenated arguments are `High` with a
    ///   user-input marker and `Medium` without.
    fn severity_for(self, kind: PathArgKind, has_user_marker: bool) -> Severity {
        match kind {
            PathArgKind::StaticLiteral => Severity::Low,
            PathArgKind::Unknown if self == Self::PathJoin => Severity::Low,
            PathArgKind::Unknown => {
                if has_user_marker {
                    Severity::Medium
                } else {
                    Severity::Low
                }
            }
            PathArgKind::Interpolated | PathArgKind::UserVariable | PathArgKind::Concatenation => {
                if has_user_marker {
                    Severity::High
                } else {
                    Severity::Medium
                }
            }
        }
    }
}
/// One matched call site, produced by the per-language `match_*_call` functions.
struct PathSite<'a> {
// The call node itself; its start row gives the reported line.
call_node: tree_sitter::Node<'a>,
// API family the callee was classified as.
api: PathApi,
// Shape of the path argument (or, for path joins, the strongest shape among all arguments).
arg_kind: PathArgKind,
// Source text of all collected arguments, each followed by a space; searched for user-input markers.
arg_text: String,
}
/// Decides whether a call site is reported through the dual-branch finding path.
struct DualBranchPolicy {
// Set in `detect` from `ctx.dual_branch.is_enabled_for("path-traversal")`.
flag_on: bool,
}
impl DualBranchPolicy {
    /// Returns `true` only when the flag is on and the file is Python.
    /// The API family is accepted for interface purposes but not consulted.
    fn applies_to(&self, lang: Language, _api: PathApi) -> bool {
        self.flag_on && matches!(lang, Language::Python)
    }
}
impl PathTraversalDetector {
/// Scans one file's syntax tree for path-related call sites and turns them
/// into findings.
///
/// Returns an empty list when the content contains a NUL character or when no
/// tree is available and parsing fails. Sites on suppressed lines are skipped.
/// When the dual-branch policy applies, each site becomes a dual-branch
/// finding; otherwise severity is derived from the argument shape and
/// user-input markers, downgraded for test files, and `Low` results are
/// dropped. `_taint_result` is currently unused here.
fn scan_file_ast(
&self,
inputs: &ScanAstInputs<'_>,
_taint_result: &TaintAnalysisResult,
dual_branch_policy: &DualBranchPolicy,
) -> Vec<Finding> {
let path = inputs.path();
let content = inputs.content();
let ext = inputs.ext();
let lang = inputs.lang;
let cached_tree = inputs.cached_tree;
let mut findings = vec![];
// Content with an embedded NUL character is not scanned.
if content.contains('\0') {
return findings;
}
// `owned` keeps a freshly parsed tree alive for as long as `root` borrows from it.
let owned;
let root = match cached_tree {
Some(tree) => tree.root_node(),
None => match parse_root_ext(content, lang, ext) {
Some(t) => {
owned = t;
owned.root_node()
}
None => return findings,
},
};
let bytes = content.as_bytes();
let lines: Vec<&str> = content.lines().collect();
// Import alias tables are only collected for Python; other languages get empty maps.
let py_aliases = if matches!(lang, Language::Python) {
super::python_imports::collect_python_from_imports(root, bytes)
} else {
HashMap::new()
};
let py_module_aliases = if matches!(lang, Language::Python) {
super::python_imports::collect_python_module_aliases(root, bytes)
} else {
HashMap::new()
};
let mut sites: Vec<PathSite> = Vec::new();
let ctx = AstWalkCtx {
lang,
source: bytes,
};
let aliases = super::python_imports::PythonAliases::new(&py_aliases, &py_module_aliases);
collect_path_sites(&ctx, root, &aliases, &mut sites);
// Findings carry the path relative to the repository root when it is a prefix.
let rel_path = path
.strip_prefix(&self.repository_path)
.unwrap_or(path)
.to_path_buf();
let file_str = path.to_string_lossy();
// Substring heuristic for test/spec files; used below to downgrade severity.
let is_test_file = file_str.contains("/test")
|| file_str.contains("/tests/")
|| file_str.contains("_test.")
|| file_str.contains(".test.")
|| file_str.contains("/spec/")
|| file_str.contains("_spec.");
for site in sites {
if findings.len() >= self.max_findings {
break;
}
let line_idx = site.call_node.start_position().row;
// Honour suppression markers on the call's line or the line before it.
if let Some(line) = lines.get(line_idx) {
let prev = if line_idx > 0 {
Some(lines[line_idx - 1])
} else {
None
};
if crate::detectors::is_line_suppressed(line, prev) {
continue;
}
}
// Tree-sitter rows are 0-based; reported lines are 1-based.
let line_num = (line_idx + 1) as u32;
if dual_branch_policy.applies_to(lang, site.api) {
let snippet = lines.get(line_idx).map(|s| s.trim()).unwrap_or("");
findings.push(self.build_dual_branch_python_finding(
&rel_path,
line_num,
site.api,
snippet,
site.call_node,
bytes,
&lines,
));
continue;
}
let has_user_marker = contains_any(USER_INPUT_FINDERS, &site.arg_text);
let mut severity = site.api.severity_for(site.arg_kind, has_user_marker);
// Test files: Critical becomes Medium, High and Medium become Low.
if is_test_file {
severity = match severity {
Severity::Critical => Severity::Medium,
Severity::High => Severity::Low,
Severity::Medium => Severity::Low,
other => other,
};
}
if severity == Severity::Low {
continue;
}
findings.push(Finding {
id: String::new(),
detector: "PathTraversalDetector".to_string(),
severity,
title: site.api.title().to_string(),
description: site.api.base_description().to_string(),
affected_files: vec![rel_path.clone()],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: Some(site.api.suggested_fix().to_string()),
estimated_effort: Some("30 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-22".to_string()),
why_it_matters: Some(site.api.why_it_matters().to_string()),
..Default::default()
});
}
findings
}
/// Builds a dual-branch finding for a Python call site.
///
/// Evidence is extracted from the call node and passed to the predictor; the
/// predicted label and severity drive the title, description and suggested
/// fix. The alternative branch, prediction reasons and resolution signals
/// from the prediction are attached to the finding.
fn build_dual_branch_python_finding(
&self,
rel_path: &Path,
line_num: u32,
api: PathApi,
snippet: &str,
call_node: tree_sitter::Node<'_>,
source: &[u8],
lines: &[&str],
) -> Finding {
// Representative API name shown in the finding text for each family.
let api_label = match api {
PathApi::FileOp => "open",
PathApi::PathJoin => "os.path.join",
PathApi::SendFile => "send_file",
};
let evidence = evidence::extract_python_evidence(call_node, source, lines);
let prediction = predict::predict(&evidence, api_label);
let predicted_label = prediction.predicted;
let predicted_severity = prediction.predicted_severity;
let predicted_title = match predicted_label {
crate::dual_branch::BranchLabel::RealBug => {
format!("Path traversal via {api_label}")
}
crate::dual_branch::BranchLabel::Benign => {
format!("Internal path-join in {api_label} (informational)")
}
};
let predicted_description = format!(
"**Path traversal (dual-branch)**\n\n\
**API**: `{}`\n\n\
**Location**: {}:{}\n\n\
**Code**:\n```\n{}\n```\n\n\
{}",
api_label,
rel_path.display(),
line_num,
snippet,
match predicted_label {
crate::dual_branch::BranchLabel::RealBug => format!(
"The path argument to `{api_label}` appears to originate from \
user-controlled input. The predictor leans RealBug for this \
call site (see `prediction_reasons`)."
),
crate::dual_branch::BranchLabel::Benign => format!(
"The path argument to `{api_label}` appears to be \
internal/literal/config-derived. The predictor leans Benign \
(see `prediction_reasons`); the High-severity interpretation \
is carried in `alternative_branch`."
),
},
);
let predicted_fix = match predicted_label {
crate::dual_branch::BranchLabel::RealBug => Some(
"Validate the path component against an allowlist, or wrap with \
`os.path.basename(...)` to strip `..` sequences. For file-serving \
endpoints, use `flask.send_from_directory` or \
`pathlib.Path.resolve` with a base-prefix check.\n\n\
If this is a false positive (the path is internal/config-derived \
and not attacker-reachable), annotate the call site with \
`# repotoire: internal-path[<reason>]` to collapse the finding \
to Info."
.to_string(),
),
crate::dual_branch::BranchLabel::Benign => Some(
"If this is intentional internal use, annotate \
`# repotoire: internal-path[<reason>]` to collapse the finding to \
Info definitively. If this IS attacker-reachable (the alternative \
branch), validate against an allowlist or wrap with \
`os.path.basename(...)`."
.to_string(),
),
};
let mut finding = Finding {
id: String::new(),
detector: "PathTraversalDetector".to_string(),
severity: predicted_severity,
title: predicted_title,
description: predicted_description,
affected_files: vec![rel_path.to_path_buf()],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: predicted_fix,
estimated_effort: Some("30 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-22".to_string()),
why_it_matters: Some(
"Path traversal lets attackers read or write files outside the \
intended directory — but not every path-join call site is \
attacker-reachable. The predictor's job is to distinguish."
.to_string(),
),
..Default::default()
};
// Attach the non-predicted interpretation and the predictor's supporting signals.
finding = finding.with_alternative_branch(prediction.alternative_branch);
for reason in prediction.reasons {
finding = finding.with_prediction_reason(reason);
}
for resolution in prediction.resolutions {
finding = finding.with_resolution_signal(resolution);
}
finding
}
}
/// Substring finders searched for in a call site's argument text; a match
/// sets `has_user_marker`, which raises the severity chosen by
/// `PathApi::severity_for`. The `FIND_*` statics are not defined in this file
/// (presumably they come from the `fast_search` glob import).
static USER_INPUT_FINDERS: &[&LazyLock<memchr::memmem::Finder<'static>>] = &[
&FIND_REQ_PARAMS,
&FIND_REQ_QUERY,
&FIND_REQ_BODY,
&FIND_REQ_FILE,
&FIND_REQUEST_GET,
&FIND_REQUEST_POST,
&FIND_REQUEST_FILES,
&FIND_REQUEST_ARGS,
&FIND_REQUEST_FORM,
&FIND_REQUEST_DATA,
&FIND_REQUEST_VALUES,
&FIND_PARAMS_BRACKET,
&FIND_INPUT_PAREN,
&FIND_SYS_ARGV,
&FIND_PROCESS_ARGV,
&FIND_R_URL,
&FIND_C_PARAM,
&FIND_C_QUERY,
&FIND_FORM_VALUE,
&FIND_R_FORM,
&FIND_QUERY_BRACKET,
&FIND_QUERY_GET,
&FIND_BODY_BRACKET,
&FIND_BODY_GET,
];
/// Pre-order walk of the subtree rooted at `node`, appending every node that
/// `match_path_site` recognises to `out`. A matched node's children are still
/// visited.
fn collect_path_sites<'a>(
    ctx: &AstWalkCtx<'a>,
    node: tree_sitter::Node<'a>,
    py_aliases: &super::python_imports::PythonAliases<'_>,
    out: &mut Vec<PathSite<'a>>,
) {
    // `Option` is iterable, so this pushes the site only when one was matched.
    out.extend(match_path_site(node, ctx.source, ctx.lang, py_aliases));

    let mut walker = node.walk();
    for descendant in node.children(&mut walker) {
        collect_path_sites(ctx, descendant, py_aliases, out);
    }
}
/// Dispatches a node to the language-specific call matcher.
///
/// Only Python `call` nodes and JavaScript/TypeScript/Go `call_expression`
/// nodes are considered; everything else yields `None`.
fn match_path_site<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    lang: Language,
    py_aliases: &super::python_imports::PythonAliases<'_>,
) -> Option<PathSite<'a>> {
    let kind = node.kind();
    match lang {
        Language::Python if kind == "call" => match_python_call(node, source, py_aliases),
        Language::JavaScript | Language::TypeScript if kind == "call_expression" => {
            match_js_call(node, source)
        }
        Language::Go if kind == "call_expression" => match_go_call(node, source),
        _ => None,
    }
}
/// Maps a Python `(module, function)` pair to its API family and the index of
/// the argument to inspect, or `None` when the callee is not tracked.
///
/// An empty `module` stands for an unqualified name (only `open` is accepted
/// that way). Every tracked callee currently inspects argument 0.
fn classify_python_path_callee(module: &str, name: &str) -> Option<(PathApi, usize)> {
    let api = match (module, name) {
        ("" | "builtins" | "io", "open") => PathApi::FileOp,
        ("os", "remove" | "unlink" | "rmdir" | "mkdir" | "rename" | "chmod") => PathApi::FileOp,
        ("shutil", "copy" | "copyfile" | "copy2" | "move" | "rmtree") => PathApi::FileOp,
        ("pathlib", "Path") => PathApi::FileOp,
        ("os.path", "join") => PathApi::PathJoin,
        ("flask", "send_file" | "send_from_directory") => PathApi::SendFile,
        ("django.http" | "starlette.responses", "FileResponse") => PathApi::SendFile,
        _ => return None,
    };
    Some((api, 0))
}
/// Tries to interpret a Python `call` node as a tracked path API call.
///
/// Attribute callees (`recv.attr(...)`) are resolved through the module alias
/// table, falling back to the receiver's literal text. Bare identifiers are
/// resolved through the from-import table, except `open`, which is always
/// treated as the unqualified builtin. If the inspected argument is a keyword
/// argument, the first non-keyword argument is used instead; when there is
/// none, the call is not reported.
fn match_python_call<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    aliases: &super::python_imports::PythonAliases<'_>,
) -> Option<PathSite<'a>> {
    let callee = unwrap_callee(node.child_by_field_name("function")?);
    let arg_list = node.child_by_field_name("arguments")?;
    let arg_nodes = collect_named_args(arg_list);

    let (api, idx) = if callee.kind() == "attribute" {
        let receiver = callee.child_by_field_name("object")?;
        let member = callee.child_by_field_name("attribute")?;
        let member_name = node_text(member, source)?;
        let receiver_text = node_text(receiver, source).unwrap_or("");
        let module_label = aliases
            .modules
            .get(receiver_text)
            .cloned()
            .unwrap_or_else(|| receiver_text.to_string());
        classify_python_path_callee(&module_label, member_name)?
    } else if callee.kind() == "identifier" {
        let name = node_text(callee, source)?;
        let module = if name == "open" {
            "".to_string()
        } else {
            aliases.imports.get(name).cloned()?
        };
        classify_python_path_callee(&module, name)?
    } else {
        return None;
    };

    let candidate = arg_nodes.get(idx).copied()?;
    let target = match candidate.kind() {
        "keyword_argument" => arg_nodes
            .iter()
            .copied()
            .find(|arg| arg.kind() != "keyword_argument")?,
        _ => candidate,
    };

    // Path joins are judged by all of their arguments, other APIs by the target only.
    let arg_kind = match api {
        PathApi::PathJoin => classify_path_args_python(&arg_nodes, source),
        PathApi::FileOp | PathApi::SendFile => classify_path_arg_python(target, source),
    };

    Some(PathSite {
        call_node: node,
        api,
        arg_kind,
        arg_text: collect_arg_text(&arg_nodes, source),
    })
}
fn classify_path_args_python(args: &[tree_sitter::Node<'_>], source: &[u8]) -> PathArgKind {
let mut strongest = PathArgKind::StaticLiteral;
for a in args {
if a.kind() == "keyword_argument" {
continue;
}
let k = classify_path_arg_python(*a, source);
strongest = strongest_kind(strongest, k);
}
strongest
}
#[allow(clippy::only_used_in_recursion)]
fn classify_path_arg_python(node: tree_sitter::Node<'_>, source: &[u8]) -> PathArgKind {
match node.kind() {
"string" => {
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if child.kind() == "interpolation" {
return PathArgKind::Interpolated;
}
}
PathArgKind::StaticLiteral
}
"concatenated_string" => {
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if classify_path_arg_python(child, source) == PathArgKind::Interpolated {
return PathArgKind::Interpolated;
}
}
PathArgKind::StaticLiteral
}
"binary_operator" => {
let left = node.child_by_field_name("left");
let right = node.child_by_field_name("right");
let mut found_var = false;
let mut found_lit = false;
for opt in [left, right].iter().flatten() {
match classify_path_arg_python(*opt, source) {
PathArgKind::UserVariable
| PathArgKind::Interpolated
| PathArgKind::Concatenation
| PathArgKind::Unknown => found_var = true,
PathArgKind::StaticLiteral => found_lit = true,
}
}
if found_var && found_lit {
PathArgKind::Concatenation
} else if found_var {
PathArgKind::UserVariable
} else {
PathArgKind::StaticLiteral
}
}
"identifier" | "attribute" | "subscript" | "call" => PathArgKind::UserVariable,
"parenthesized_expression" => {
for i in 0..node.named_child_count() {
if let Some(c) = node.named_child(i) {
return classify_path_arg_python(c, source);
}
}
PathArgKind::Unknown
}
"await" | "conditional_expression" => {
let mut strongest = PathArgKind::StaticLiteral;
for i in 0..node.named_child_count() {
if let Some(c) = node.named_child(i) {
strongest = strongest_kind(strongest, classify_path_arg_python(c, source));
}
}
strongest
}
_ => PathArgKind::Unknown,
}
}
/// Tries to interpret a JavaScript/TypeScript `call_expression` as a tracked
/// path API call.
///
/// Recognised callees are bare file-operation names, `fs`/`fsp`/`fspromises`
/// methods with those same names, `path.join` / `path.resolve`, and
/// `res`/`response` `.sendFile` / `.download`. The first argument is the one
/// inspected; calls without arguments are not reported.
fn match_js_call<'a>(node: tree_sitter::Node<'a>, source: &'a [u8]) -> Option<PathSite<'a>> {
    /// Function names treated as file operations, both bare and on an fs receiver.
    fn is_fs_file_op(name: &str) -> bool {
        matches!(
            name,
            "readFile"
                | "readFileSync"
                | "writeFile"
                | "writeFileSync"
                | "appendFile"
                | "unlink"
                | "unlinkSync"
                | "rmdir"
                | "mkdir"
                | "copyFile"
                | "rename"
                | "stat"
                | "statSync"
                | "access"
                | "accessSync"
                | "createReadStream"
                | "createWriteStream"
                | "open"
        )
    }

    let raw_callee = node.child_by_field_name("function")?;
    let arg_list = node.child_by_field_name("arguments")?;
    let arg_nodes = collect_named_args(arg_list);
    let callee = unwrap_callee(raw_callee);

    let api = match callee.kind() {
        "identifier" => {
            if !is_fs_file_op(node_text(callee, source)?) {
                return None;
            }
            PathApi::FileOp
        }
        "member_expression" => {
            let object = callee.child_by_field_name("object")?;
            let property = callee.child_by_field_name("property")?;
            let method = node_text(property, source)?;
            let receiver = receiver_chain_label_js(object, source);
            match (receiver.as_str(), method) {
                ("fs" | "fsp" | "fspromises", name) if is_fs_file_op(name) => PathApi::FileOp,
                ("path", "join" | "resolve") => PathApi::PathJoin,
                ("res" | "response", "sendFile" | "download") => PathApi::SendFile,
                _ => return None,
            }
        }
        _ => return None,
    };

    // Every tracked JS API takes its path as the first argument.
    let target = arg_nodes.first().copied()?;
    // Path joins are judged by all of their arguments, other APIs by the target only.
    let arg_kind = match api {
        PathApi::PathJoin => classify_path_args_js(&arg_nodes, source),
        PathApi::FileOp | PathApi::SendFile => classify_path_arg_js(target, source),
    };

    Some(PathSite {
        call_node: node,
        api,
        arg_kind,
        arg_text: collect_arg_text(&arg_nodes, source),
    })
}
/// Classifies every argument and returns the strongest kind (per
/// `strongest_kind`); `StaticLiteral` when `args` is empty.
fn classify_path_args_js(args: &[tree_sitter::Node<'_>], source: &[u8]) -> PathArgKind {
    args.iter().fold(PathArgKind::StaticLiteral, |strongest, arg| {
        strongest_kind(strongest, classify_path_arg_js(*arg, source))
    })
}
/// Classifies the syntactic shape of a JavaScript/TypeScript expression used
/// as a path.
///
/// Plain strings and substitution-free template strings are `StaticLiteral`;
/// template strings with a substitution are `Interpolated`. Identifiers,
/// member/subscript accesses and calls are `UserVariable` unless
/// `is_trusted_node_global` accepts them, in which case they count as
/// `StaticLiteral`. Wrapper expressions (parentheses, `await`, TS casts and
/// assertions) take the kind of their first named child. Ternaries take the
/// strongest kind of their two branches. Anything else is `Unknown`.
// `source` is only threaded through the recursion, hence the lint allowance.
#[allow(clippy::only_used_in_recursion)]
fn classify_path_arg_js(node: tree_sitter::Node<'_>, source: &[u8]) -> PathArgKind {
    match node.kind() {
        "string" => PathArgKind::StaticLiteral,
        "template_string" => {
            let mut walker = node.walk();
            let has_substitution = node
                .children(&mut walker)
                .any(|part| part.kind() == "template_substitution");
            if has_substitution {
                PathArgKind::Interpolated
            } else {
                PathArgKind::StaticLiteral
            }
        }
        "binary_expression" => {
            let operands = [
                node.child_by_field_name("left"),
                node.child_by_field_name("right"),
            ];
            let mut saw_dynamic = false;
            let mut saw_literal = false;
            for operand in operands.iter().flatten() {
                if classify_path_arg_js(*operand, source) == PathArgKind::StaticLiteral {
                    saw_literal = true;
                } else {
                    saw_dynamic = true;
                }
            }
            match (saw_dynamic, saw_literal) {
                (true, true) => PathArgKind::Concatenation,
                (true, false) => PathArgKind::UserVariable,
                (false, _) => PathArgKind::StaticLiteral,
            }
        }
        "identifier" | "member_expression" | "subscript_expression" | "call_expression" => {
            if is_trusted_node_global(node, source) {
                PathArgKind::StaticLiteral
            } else {
                PathArgKind::UserVariable
            }
        }
        "parenthesized_expression"
        | "await_expression"
        | "as_expression"
        | "type_assertion_expression"
        | "non_null_expression"
        | "satisfies_expression" => (0..node.named_child_count())
            .find_map(|i| node.named_child(i))
            .map_or(PathArgKind::Unknown, |inner| {
                classify_path_arg_js(inner, source)
            }),
        "ternary_expression" => {
            let branches = [
                node.child_by_field_name("consequence"),
                node.child_by_field_name("alternative"),
            ];
            branches
                .iter()
                .flatten()
                .fold(PathArgKind::StaticLiteral, |strongest, branch| {
                    strongest_kind(strongest, classify_path_arg_js(*branch, source))
                })
        }
        _ => PathArgKind::Unknown,
    }
}
/// Returns `true` for expressions treated as trusted, non-user-controlled
/// path sources: the identifiers `__dirname` / `__filename`, the member
/// expression `process.cwd`, and a call whose callee is that member
/// expression.
fn is_trusted_node_global(node: tree_sitter::Node<'_>, source: &[u8]) -> bool {
    match node.kind() {
        "identifier" => matches!(
            node_text(node, source),
            Some("__dirname" | "__filename")
        ),
        "member_expression" => {
            let Some(object) = node.child_by_field_name("object") else {
                return false;
            };
            let Some(property) = node.child_by_field_name("property") else {
                return false;
            };
            node_text(object, source) == Some("process")
                && node_text(property, source) == Some("cwd")
        }
        "call_expression" => node
            .child_by_field_name("function")
            .is_some_and(|callee| {
                callee.kind() == "member_expression" && is_trusted_node_global(callee, source)
            }),
        _ => false,
    }
}
/// Tries to interpret a Go `call_expression` as a tracked path API call.
///
/// Only `pkg.Func(...)` selector calls are considered, matched on the literal
/// operand and field text: selected `os` and `ioutil` file operations,
/// `filepath.Join` / `filepath.Clean`, and `http.ServeFile`. Returns `None`
/// for any other callee or when the inspected argument is missing.
fn match_go_call<'a>(node: tree_sitter::Node<'a>, source: &'a [u8]) -> Option<PathSite<'a>> {
    let func = node.child_by_field_name("function")?;
    if func.kind() != "selector_expression" {
        return None;
    }
    let operand = func.child_by_field_name("operand")?;
    let field = func.child_by_field_name("field")?;
    let operand_text = node_text(operand, source)?;
    let field_text = node_text(field, source)?;
    let args = node.child_by_field_name("arguments")?;
    let arg_nodes = collect_named_args(args);
    // `idx` is the position of the path argument within the call.
    let (api, idx) = match (operand_text, field_text) {
        (
            "os",
            "Open" | "Create" | "OpenFile" | "Remove" | "RemoveAll" | "Rename" | "Mkdir"
            | "MkdirAll" | "Chmod",
        ) => (PathApi::FileOp, 0),
        ("ioutil", "ReadFile" | "WriteFile" | "ReadDir") => (PathApi::FileOp, 0),
        ("filepath", "Join" | "Clean") => (PathApi::PathJoin, 0),
        // Signature is `ServeFile(w ResponseWriter, r *Request, name string)`:
        // the served path is the third argument. Index 1 is the request, which
        // is always an identifier and would make every call look like variable
        // input regardless of the actual path.
        ("http", "ServeFile") => (PathApi::SendFile, 2),
        _ => return None,
    };
    let target = arg_nodes.get(idx).copied()?;
    // Path joins are judged by all of their arguments, other APIs by the target only.
    let arg_kind = if api == PathApi::PathJoin {
        classify_path_args_go(&arg_nodes, source)
    } else {
        classify_path_arg_go(target, source)
    };
    let arg_text = collect_arg_text(&arg_nodes, source);
    Some(PathSite {
        call_node: node,
        api,
        arg_kind,
        arg_text,
    })
}
fn classify_path_args_go(args: &[tree_sitter::Node<'_>], source: &[u8]) -> PathArgKind {
let mut strongest = PathArgKind::StaticLiteral;
for a in args {
let k = classify_path_arg_go(*a, source);
strongest = strongest_kind(strongest, k);
}
strongest
}
/// Classifies the syntactic shape of a Go expression used as a path.
///
/// String literals are `StaticLiteral`; identifiers, selectors, index
/// expressions and calls are `UserVariable`; a parenthesized expression takes
/// the kind of its first named child; a binary expression is `Concatenation`
/// when it mixes a literal with a non-literal, `UserVariable` when it has only
/// non-literals, and `StaticLiteral` otherwise. Anything else is `Unknown`.
// `source` is only threaded through the recursion, hence the lint allowance.
#[allow(clippy::only_used_in_recursion)]
fn classify_path_arg_go(node: tree_sitter::Node<'_>, source: &[u8]) -> PathArgKind {
    let kind = node.kind();

    if matches!(kind, "interpreted_string_literal" | "raw_string_literal") {
        return PathArgKind::StaticLiteral;
    }
    if matches!(
        kind,
        "identifier" | "selector_expression" | "index_expression" | "call_expression"
    ) {
        return PathArgKind::UserVariable;
    }
    if kind == "parenthesized_expression" {
        return (0..node.named_child_count())
            .find_map(|i| node.named_child(i))
            .map_or(PathArgKind::Unknown, |inner| {
                classify_path_arg_go(inner, source)
            });
    }
    if kind != "binary_expression" {
        return PathArgKind::Unknown;
    }

    let operands = [
        node.child_by_field_name("left"),
        node.child_by_field_name("right"),
    ];
    let mut saw_dynamic = false;
    let mut saw_literal = false;
    for operand in operands.iter().flatten() {
        if classify_path_arg_go(*operand, source) == PathArgKind::StaticLiteral {
            saw_literal = true;
        } else {
            saw_dynamic = true;
        }
    }
    match (saw_dynamic, saw_literal) {
        (true, true) => PathArgKind::Concatenation,
        (true, false) => PathArgKind::UserVariable,
        (false, _) => PathArgKind::StaticLiteral,
    }
}
/// Folds taint-analysis results into the AST findings.
///
/// For each taint path, findings whose first affected file and start line
/// match the sink are updated: a sanitized path lowers the finding to `Low`
/// and appends a note; an unsanitized path raises it to `Critical` and
/// appends the traced path. Dual-branch findings count as matched but are
/// left untouched. An unsanitized path with no matching finding produces a
/// new `Critical` finding. Paths are processed in order, so a later path
/// hitting the same sink overwrites the severity set by an earlier one.
fn merge_taint_paths(
findings: &mut Vec<Finding>,
taint_result: &TaintAnalysisResult,
repo_root: &Path,
) {
for taint in &taint_result.paths {
let abs_sink = Path::new(&taint.sink_file);
// Findings may carry either the repo-relative or the original sink path; both are compared.
let rel_sink = abs_sink.strip_prefix(repo_root).unwrap_or(abs_sink);
let sink_line = taint.sink_line;
let sits_at_sink = |f: &Finding| {
let file_match = f
.affected_files
.first()
.map(|p| p == rel_sink || p == abs_sink)
.unwrap_or(false);
file_match && f.line_start == Some(sink_line)
};
let mut matched = false;
for f in findings.iter_mut() {
if !sits_at_sink(f) {
continue;
}
matched = true;
// Dual-branch findings keep their predicted severity and text.
if f.is_dual_branch() {
continue;
}
if taint.is_sanitized {
f.severity = Severity::Low;
f.description = format!(
"{}\n\n**Taint Analysis Note**: A sanitizer function (`{}`) was found in \
the data flow path, which may mitigate this vulnerability.",
f.description,
taint.sanitizer.as_deref().unwrap_or("unknown")
);
} else {
f.severity = Severity::Critical;
f.description = format!(
"{}\n\n**Taint Analysis Confirmed**: Data flow analysis traced a path from \
user input to this file sink without sanitization:\n\n`{}`",
f.description,
taint.path_string()
);
}
}
// No AST finding at the sink: report the unsanitized flow on its own,
// using the generic file-operation wording.
if !matched && !taint.is_sanitized {
findings.push(Finding {
id: String::new(),
detector: "PathTraversalDetector".to_string(),
severity: Severity::Critical,
title: "Path traversal confirmed by taint analysis".to_string(),
description: format!(
"{}\n\n**Taint Analysis Confirmed**: Data flow analysis traced a path from \
user input to this file sink without sanitization:\n\n`{}`",
PathApi::FileOp.base_description(),
taint.path_string()
),
affected_files: vec![rel_sink.to_path_buf()],
line_start: Some(sink_line),
line_end: Some(sink_line),
suggested_fix: Some(PathApi::FileOp.suggested_fix().to_string()),
estimated_effort: Some("30 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-22".to_string()),
why_it_matters: Some(PathApi::FileOp.why_it_matters().to_string()),
..Default::default()
});
}
}
}
/// Computes the receiver label for a JS member call by delegating to the
/// shared helper, passing `require_module_label` as the optional resolver.
/// NOTE(review): presumably the resolver is consulted for call-expression
/// receivers such as `require('fs')` — confirm against `ast_helpers`.
fn receiver_chain_label_js(node: tree_sitter::Node<'_>, source: &[u8]) -> String {
receiver_chain_label_shared(node, source, Some(&require_module_label))
}
/// Resolves `require("<module>")` / `import("<module>")` calls to a canonical
/// receiver label.
///
/// Returns `"fs"` for `fs`, `node:fs`, `fs/promises` and `node:fs/promises`,
/// `"path"` for `path` and `node:path`, and `None` for any other callee,
/// non-literal specifier or module. `node` is expected to be a
/// `call_expression` (checked in debug builds only).
fn require_module_label(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<&'static str> {
    debug_assert_eq!(node.kind(), "call_expression");

    let callee = node.child_by_field_name("function")?;
    let callee_name = node_text(callee, source)?;
    let kind_ok = matches!(callee.kind(), "identifier" | "import");
    let name_ok = matches!(callee_name, "require" | "import");
    if !(kind_ok && name_ok) {
        return None;
    }

    let arg_list = node.child_by_field_name("arguments")?;
    let arg_nodes = collect_named_args(arg_list);
    let specifier_node = *arg_nodes.first()?;
    let specifier = js_string_literal_value(specifier_node, source)?;

    match specifier.as_str() {
        "fs" | "node:fs" | "fs/promises" | "node:fs/promises" => Some("fs"),
        "path" | "node:path" => Some("path"),
        _ => None,
    }
}
/// Returns the concatenated text of a JS `string` node's `string_fragment`
/// children.
///
/// Yields `None` when `node` is not a `string` or when no fragment text could
/// be read (for example an empty literal with no fragment child).
fn js_string_literal_value(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<String> {
    if node.kind() != "string" {
        return None;
    }

    let mut walker = node.walk();
    let fragments: Vec<&str> = node
        .children(&mut walker)
        .filter(|part| part.kind() == "string_fragment")
        .filter_map(|part| node_text(part, source))
        .collect();

    if fragments.is_empty() {
        None
    } else {
        Some(fragments.concat())
    }
}
/// Concatenates the source text of every argument, appending a single space
/// after each one. Arguments whose text cannot be read are skipped.
fn collect_arg_text(args: &[tree_sitter::Node<'_>], source: &[u8]) -> String {
    args.iter()
        .filter_map(|arg| node_text(*arg, source))
        .fold(String::new(), |mut joined, piece| {
            joined.push_str(piece);
            joined.push(' ');
            joined
        })
}
/// Returns whichever of `a` and `b` ranks higher, preferring `a` on a tie.
///
/// Ranking, weakest to strongest: `StaticLiteral`, `Unknown`,
/// `Concatenation`, `Interpolated`, `UserVariable`.
fn strongest_kind(a: PathArgKind, b: PathArgKind) -> PathArgKind {
    let rank = |kind: PathArgKind| -> u8 {
        match kind {
            PathArgKind::StaticLiteral => 0,
            PathArgKind::Unknown => 1,
            PathArgKind::Concatenation => 2,
            PathArgKind::Interpolated => 3,
            PathArgKind::UserVariable => 4,
        }
    };
    if rank(a) >= rank(b) {
        a
    } else {
        b
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::base::Detector;
use crate::graph::builder::GraphBuilder;
#[test]
fn test_detects_open_with_user_input() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("vuln.py", "def download(request):\n filename = request.GET.get(\"file\")\n f = open(request.GET[\"file\"], \"r\")\n return f.read()\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect open() with user-controlled path from request"
);
assert!(
findings
.iter()
.any(|f| f.title.to_lowercase().contains("path traversal")),
"Finding should mention path traversal. Titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
assert!(
findings
.iter()
.any(|f| f.cwe_id.as_deref() == Some("CWE-22")),
"Finding should have CWE-22"
);
}
// Python fixture: `open()` on a hard-coded string path inside a `with` block.
// Expects the detector to report nothing.
#[test]
fn test_no_findings_for_hardcoded_path() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("safe.py", "def read_config():\n with open(\"config/settings.json\", \"r\") as f:\n return json.load(f)\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Hardcoded path should have no path traversal findings, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: `HttpResponseRedirect(request.get_full_path())` with no file
// operation. Expects no findings, i.e. `get_full_path()` is not treated as a
// path-traversal sink.
#[test]
fn test_no_finding_for_get_full_path() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("views.py", "from django.http import HttpResponseRedirect\n\ndef my_view(request):\n return HttpResponseRedirect(request.get_full_path())\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag request.get_full_path() as path traversal. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: `params.remove('page')` on a list built from request keys.
// Expects no findings, i.e. a list's `.remove()` is not mistaken for a file
// removal.
#[test]
fn test_no_finding_for_list_remove() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("library.py", "def process(request):\n params = list(request.GET.keys())\n params.remove('page')\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag list.remove() as file operation. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: `os.path.join('/uploads', request.GET.get('file'))` whose
// result is then passed to `open()`. Expects at least one finding.
#[test]
fn test_still_detects_real_path_traversal() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("download.py", "import os\n\ndef download(request):\n filepath = os.path.join('/uploads', request.GET.get('file'))\n return open(filepath, 'r').read()\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should still detect real path traversal with request.GET"
);
}
// JavaScript fixture: `path.join('/uploads', req.params.filename)` followed by
// `res.sendFile(filePath)`. Expects at least one finding and at least one
// finding tagged CWE-22.
#[test]
fn test_detects_path_join_with_req_params_js() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("download.js", "const path = require('path');\n\nfunction getFile(req, res) {\n const filePath = path.join('/uploads', req.params.filename);\n res.sendFile(filePath);\n}\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect path.join with req.params user input in JS"
);
assert!(
findings
.iter()
.any(|f| f.cwe_id.as_deref() == Some("CWE-22")),
"Finding should have CWE-22"
);
}
// TypeScript fixture: `fs.readFileSync('/data/' + req.query.file)`.
// Expects at least one finding.
#[test]
fn test_detects_readfile_with_request_query_ts() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("serve.ts", "import fs from 'fs';\n\nfunction serveFile(req: Request, res: Response) {\n const name = req.query.file;\n const data = fs.readFileSync('/data/' + req.query.file);\n res.send(data);\n}\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect readFileSync with req.query in TypeScript"
);
}
// Python fixture: the vulnerable-looking `open(request.GET['file'], 'r')` only
// appears inside a `#` comment; the real code opens a literal path.
// Expects no findings.
#[test]
fn test_no_finding_for_path_traversal_in_comment() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("safe.py", "# Vulnerable: open(request.GET['file'], 'r')\ndef read_config():\n with open('config.json', 'r') as f:\n return f.read()\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Path traversal pattern in a comment should not produce findings, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// JavaScript fixture: an Express route calling `res.sendFile(req.query.file)`.
// Expects at least one finding and at least one title containing
// "path traversal" (case-insensitive).
#[test]
fn test_detects_sendfile_with_user_input_js() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("server.js", "const express = require('express');\n\napp.get('/download', (req, res) => {\n const file = req.query.file;\n res.sendFile(req.query.file);\n});\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect sendFile with user-controlled req.query"
);
assert!(
findings
.iter()
.any(|f| f.title.to_lowercase().contains("path traversal")),
"Finding should mention path traversal. Titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: `open(request.args.get('file')).read()` in a single
// expression. Expects at least one finding.
#[test]
fn test_detects_open_with_user_input_python_ast() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"v.py",
"def f(request):\n return open(request.args.get('file')).read()\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect open() with user input via AST"
);
}
// JavaScript fixture: top-level `fs.readFile(req.params.filename)` after
// `const fs = require('fs')`. Expects at least one finding.
#[test]
fn test_detects_fs_readfile_with_req_param_js_ast() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"v.js",
"const fs = require('fs');\nfs.readFile(req.params.filename);\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect fs.readFile with req.params via AST"
);
}
// Python fixture: `open('/etc/hosts')` with a string-literal argument.
// Expects no findings.
#[test]
fn test_skips_open_with_static_literal_python() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("v.py", "def f():\n return open('/etc/hosts').read()\n")],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Static-literal open() must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: `open(user_input)` appears only in a `#` comment and the
// function body contains no call. Expects no findings.
#[test]
fn test_skips_open_in_comment() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("v.py", "# open(user_input)\ndef f():\n return 1\n")],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Comment-only sink must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// JavaScript fixture: `require('fs').readFile(req.body.path)`, where the
// receiver is the `require(...)` call itself rather than a bound name.
// Expects at least one finding.
#[test]
fn test_b1_require_fs_readfile_via_require_alias_js() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("v.js", "require('fs').readFile(req.body.path);\n")],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"require('fs').readFile(...) should fire via receiver descent. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: `import os.path as op` then `op.join('/base', <request arg>)`.
// Expects at least one finding. Note that despite the test name, the fixture
// exercises an aliased `join`, not `open`.
#[test]
fn test_python_aliased_module_open() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("v.py", "import os.path as op\ndef f(request):\n return op.join('/base', request.args.get('x'))\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Aliased `import os.path as op` then `op.join(base, user)` should fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: a class that *defines* a method named `open` but never
// calls one. Expects no findings.
#[test]
fn test_skips_open_method_name_python() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"v.py",
"class Reader:\n def open(self):\n return self\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Method named `open` is a definition, not a call site. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: `os.path.join('/base', '../' + request.args.get('x'))`, a
// join whose second argument is a concatenation involving request data.
// Expects at least one finding.
#[test]
fn test_detects_path_join_with_concatenation_python() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("v.py", "import os\ndef f(request):\n return os.path.join('/base', '../' + request.args.get('x'))\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"os.path.join(base, '../' + user) should fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// Python fixture: `pathlib.Path(request.args.get('x')).read_text()`.
// Expects at least one finding.
#[test]
fn test_detects_pathlib_path_with_user_input_python() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("v.py", "import pathlib\ndef f(request):\n return pathlib.Path(request.args.get('x')).read_text()\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"pathlib.Path(user_input).read_text() should fire on the Path() constructor. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
// JavaScript fixture: an Express route calling `res.sendFile(req.params.path)`.
// Expects at least one finding, and at least one title that mentions either
// "download" or "traversal" (case-insensitive).
#[test]
fn test_detects_express_sendfile_with_req_path_js() {
    let store = GraphBuilder::new().freeze();
    let detector = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &store,
        vec![(
            "v.js",
            "app.get('/x', (req, res) => { res.sendFile(req.params.path); });\n",
        )],
    );
    let findings = detector.detect(&ctx).expect("detection should succeed");
    assert!(
        !findings.is_empty(),
        "res.sendFile(req.params.path) should fire. Got: {:?}",
        findings.iter().map(|f| &f.title).collect::<Vec<_>>()
    );
    assert!(
        findings.iter().any(|f| {
            // Lowercase once per finding instead of once per substring check.
            let title = f.title.to_lowercase();
            title.contains("download") || title.contains("traversal")
        }),
        "Finding title should mention \"download\" or \"traversal\". Titles: {:?}",
        findings.iter().map(|f| &f.title).collect::<Vec<_>>()
    );
}
// Python (Flask) fixture: `open(request.args.get('file')).read()`.
// Expects a finding whose severity is High or Critical. Note that the
// assertion accepts either level even though the test name says "critical".
#[test]
fn test_taint_confirmed_boosts_to_critical() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("app.py", "from flask import request\ndef serve():\n return open(request.args.get('file')).read()\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(!findings.is_empty(), "Direct user-input sink should fire");
// Only the first reported finding is inspected.
let f = &findings[0];
assert!(matches!(f.severity, Severity::High | Severity::Critical),
"Direct user-input sink (marker on the same expression) must be High or Critical; got {:?}", f.severity);
}
// TypeScript fixture: three `path.join(...)` calls whose first argument is
// `process.cwd()`, `__dirname` or `__filename`, followed only by string
// literals. Expects no findings; on failure the message lists each title with
// its `line_start`.
#[test]
fn repro_path_join_with_process_cwd_should_not_fire() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"docs.ts",
"import path from 'path';\n\nconst DOCS_CLI_DIR = path.join(process.cwd(), 'src', 'app', 'docs', 'cli');\nconst DIR2 = path.join(__dirname, 'fixtures');\nconst DIR3 = path.join(__filename, 'sibling');\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"path.join(process.cwd()/__dirname/__filename, ...static...) is trusted, should NOT fire. Got: {:?}",
findings
.iter()
.map(|f| format!("{} (line {:?})", f.title, f.line_start))
.collect::<Vec<_>>()
);
}
// TypeScript fixture: `path.join(req.body.userPath, 'foo')`.
// Expects at least one finding (the user-input counterpart of the
// `process.cwd()` / `__dirname` / `__filename` repro test).
#[test]
fn repro_path_join_with_user_input_still_fires() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"vuln.ts",
"import path from 'path';\n\nfunction handler(req: Request) {\n return path.join(req.body.userPath, 'foo');\n}\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"path.join(req.body.userPath, ...) MUST still fire as path traversal"
);
}
// Python fixture with two `open()` calls: a string-literal path on line 2 and
// a request-derived path on line 3. Expects exactly one finding, with severity
// High or Critical, reported at `line_start == Some(3)`.
// Note: despite the test name, the static-literal call is expected to yield no
// finding at all (not a Low one), and High is accepted as well as Critical.
#[test]
fn test_severity_critical_for_user_input_low_for_static_literal() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("v.py", "def f(request):\n a = open('/etc/hosts').read()\n b = open(request.args.get('x')).read()\n return a + b\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert_eq!(
findings.len(),
1,
"Expected exactly the user-input line to fire. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, &f.severity, f.line_start))
.collect::<Vec<_>>()
);
let f = &findings[0];
assert!(
matches!(f.severity, Severity::High | Severity::Critical),
"User-input sink should be at least High; got {:?}",
f.severity
);
assert_eq!(
f.line_start,
Some(3),
"Should fire on the user-input line (line 3), not the static-literal line (line 2)"
);
}
// Test helper: runs the path-traversal detector over a single mock file with
// dual-branch mode switched on.
//
// `file` is the mock file name and `content` its source text. The analysis
// context is built with a `DualBranchConfig` that has `enabled: true` and the
// "path-traversal" detector mapped to `true`. Returns the detector's findings;
// panics if detection returns an error.
fn run_dual_branch(file: &str, content: &str) -> Vec<Finding> {
use crate::config::DualBranchConfig;
use std::collections::HashMap;
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let mut detectors = HashMap::new();
detectors.insert("path-traversal".to_string(), true);
let cfg = DualBranchConfig {
enabled: true,
detectors,
};
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(file, content)],
)
.with_dual_branch(cfg);
detector.detect(&ctx).expect("detection should succeed")
}
// Runs the detector WITHOUT attaching a dual-branch config (the context is
// built directly, not through `run_dual_branch`). Expects at least one
// finding, and for every finding: no `alternative_branch`, and every
// prediction reason carrying weight exactly 0.0.
#[test]
fn flag_off_emits_single_branch_unchanged() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"vuln.py",
"def download(request):\n return open(request.GET[\"file\"], \"r\")\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(!findings.is_empty(), "must still fire single-branch");
for f in &findings {
assert!(
f.alternative_branch.is_none(),
"no alternative_branch when flag off: {:?}",
f.title
);
assert!(
f.prediction_reasons.iter().all(|r| r.weight == 0.0),
"no predictor-emitted (weight ≠ 0) reasons when flag off; \
only weight-0 graph-enrichment reasons are allowed. reasons: {:?}",
f.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect::<Vec<_>>()
);
}
}
// Same Python fixture as the flag-off test, but run through `run_dual_branch`.
// Expects the first finding to carry an `alternative_branch` and at least one
// prediction reason.
#[test]
fn flag_on_python_path_traversal_emits_dual_branch() {
let findings = run_dual_branch(
"vuln.py",
"def download(request):\n return open(request.GET[\"file\"], \"r\")\n",
);
assert!(!findings.is_empty(), "must fire dual-branch");
let f = &findings[0];
assert!(
f.alternative_branch.is_some(),
"alternative_branch must be populated when flag on. title={:?}",
f.title
);
assert!(
!f.prediction_reasons.is_empty(),
"at least one prediction reason"
);
}
// Dual-branch matrix case: request-sourced `open()` in Python.
// Expects the first finding to be High with a "path traversal" title, and its
// alternative branch to be labelled Benign with severity Info.
#[test]
fn matrix_predicted_realbug_actual_realbug() {
    let findings = run_dual_branch(
        "vuln.py",
        "def download(request):\n return open(request.GET[\"file\"], \"r\")\n",
    );
    assert!(
        !findings.is_empty(),
        "request-sourced open() must produce a finding"
    );
    let f = &findings[0];
    assert_eq!(
        f.severity,
        Severity::High,
        "predicted RealBug must surface as High. title={:?}",
        f.title
    );
    assert!(
        f.title.to_lowercase().contains("path traversal"),
        "RealBug title; got {:?}",
        f.title
    );
    let alt = f
        .alternative_branch
        .as_ref()
        .expect("dual-branch finding must carry an alternative branch");
    assert_eq!(
        alt.label,
        crate::dual_branch::BranchLabel::Benign,
        "alternative branch must be the Benign reading"
    );
    assert_eq!(
        alt.severity,
        Severity::Info,
        "Benign alternative must be Info"
    );
}
// Dual-branch matrix case: `os.path.join('/etc/myapp', 'config.json')` with
// only string-literal arguments. Expects the first finding to be Info, and its
// alternative branch to be labelled RealBug with severity High.
#[test]
fn matrix_predicted_benign_actual_benign_literal_first() {
let findings = run_dual_branch(
"internal.py",
"import os\n\
def make_config_path():\n\
\x20 return os.path.join('/etc/myapp', 'config.json')\n",
);
assert!(!findings.is_empty(), "must produce a finding");
let f = &findings[0];
assert_eq!(
f.severity,
Severity::Info,
"predicted Benign for literal first arg. title={:?}, reasons={:?}",
f.title,
f.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect::<Vec<_>>()
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::RealBug);
assert_eq!(alt.severity, Severity::High);
}
// Dual-branch matrix case: `open(os.path.join(folder, 'data.json'))` where
// `folder` is a function parameter. Expects some finding whose title contains
// "path traversal" (case-insensitive) and that it has severity High.
#[test]
fn matrix_predicted_realbug_actual_benign_synthetic() {
let findings = run_dual_branch(
"internal.py",
"import os\n\
def serve(folder):\n\
\x20 return open(os.path.join(folder, 'data.json'))\n",
);
let f = findings
.iter()
.find(|f| f.title.to_lowercase().contains("path traversal"))
.expect("expected at least one path-traversal finding");
assert_eq!(
f.severity,
Severity::High,
"Parameter first arg leans RealBug. title={:?}",
f.title
);
}
// Dual-branch matrix case: `open(os.path.join(name, 'data'), 'r')` inside a
// function named `test_serve` in `helpers.py`.
// Picks the first finding whose title contains "path" (case-insensitive),
// requires it to be dual-branched, then checks that the severity agrees with
// the sign of the summed prediction-reason weights.
#[test]
fn matrix_predicted_benign_actual_realbug_basename_misuse() {
let findings = run_dual_branch(
"helpers.py",
"import os\n\
def test_serve(name):\n\
\x20 return open(os.path.join(name, 'data'), 'r')\n",
);
let f = findings
.iter()
.find(|f| f.title.to_lowercase().contains("path"))
.unwrap_or_else(|| {
panic!(
"expected at least one path finding; got titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
)
});
assert!(
f.is_dual_branch(),
"must be dual-branched; got {:?}",
f.title
);
let total: f32 = f.prediction_reasons.iter().map(|r| r.weight).sum();
let signals: Vec<_> = f
.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect();
// The test does not pin which way the prediction goes; it only requires
// the severity to be consistent with the weight total: a strictly
// positive sum must give Info, a zero or negative sum must give High.
if total > 0.0 {
assert_eq!(
f.severity,
Severity::Info,
"positive sum {total} should predict Benign/Info; signals={signals:?}"
);
} else {
assert_eq!(
f.severity,
Severity::High,
"non-positive sum {total} should predict RealBug/High (tiebreak conservative); \
signals={signals:?}"
);
}
}
// Python fixture: a request-sourced `open()` whose line ends with the comment
// `# repotoire: internal-path[validated-by-caller]`.
// Expects the first finding to be Info with exactly one resolution signal, of
// kind `SourceAnnotation`, whose `syntax` contains both "internal-path" and
// "validated-by-caller".
#[test]
fn internal_path_annotation_collapses_via_detect() {
let findings = run_dual_branch(
"guarded.py",
"def serve(request):\n\
\x20 return open(request.GET[\"file\"], \"r\") # repotoire: internal-path[validated-by-caller]\n",
);
assert!(!findings.is_empty(), "must produce a finding");
let f = &findings[0];
assert_eq!(
f.severity,
Severity::Info,
"internal-path annotation must collapse to Info even with request-source arg"
);
assert_eq!(
f.resolution_signals.len(),
1,
"exactly one resolution signal"
);
match &f.resolution_signals[0].kind {
crate::dual_branch::ResolutionKind::SourceAnnotation { syntax } => {
assert!(syntax.contains("internal-path"));
assert!(syntax.contains("validated-by-caller"));
}
other => panic!("unexpected resolution kind: {other:?}"),
}
}
// Python fixture: `os.path.join('/var/www', x)` whose line ends with the
// comment `# repotoire: user-controlled[GET]`.
// Expects the first finding to be High with exactly one resolution signal, of
// kind `SourceAnnotation`, whose `syntax` contains both "user-controlled" and
// "GET".
#[test]
fn user_controlled_annotation_collapses_via_detect() {
    let findings = run_dual_branch(
        "annotated.py",
        "import os\n\
         def make():\n\
         \x20 return os.path.join('/var/www', x) # repotoire: user-controlled[GET]\n",
    );
    assert!(!findings.is_empty(), "must produce a finding");
    let f = &findings[0];
    assert_eq!(
        f.severity,
        Severity::High,
        "user-controlled annotation must collapse to High. title={:?}",
        f.title
    );
    assert_eq!(
        f.resolution_signals.len(),
        1,
        "exactly one resolution signal"
    );
    match &f.resolution_signals[0].kind {
        crate::dual_branch::ResolutionKind::SourceAnnotation { syntax } => {
            assert!(
                syntax.contains("user-controlled"),
                "annotation syntax should name the directive; got {syntax:?}"
            );
            assert!(
                syntax.contains("GET"),
                "annotation syntax should keep the bracketed argument; got {syntax:?}"
            );
        }
        other => panic!("unexpected resolution kind: {other:?}"),
    }
}
// JavaScript fixture run with dual-branch mode enabled.
// For every finding returned (the test does not require any), expects no
// `alternative_branch` and only weight-0.0 prediction reasons.
#[test]
fn dual_branch_does_not_affect_non_python_languages() {
let findings = run_dual_branch(
"vuln.js",
"const fs = require('fs');\n\
function download(req) {\n\
\x20 return fs.readFile(req.query.file, 'utf8');\n\
}\n",
);
for f in &findings {
assert!(
f.alternative_branch.is_none(),
"JS path must not be dual-branched in Phase 2b: {:?}",
f.title
);
assert!(
f.prediction_reasons.iter().all(|r| r.weight == 0.0),
"JS path must not carry predictor (weight ≠ 0) reasons in Phase 2b"
);
}
}
// Python fixture: `os.path.join('/etc', 'config')` with literal arguments, run
// with dual-branch mode enabled. Expects the first finding to be dual-branched
// and to keep severity Info.
#[test]
fn dual_branch_findings_skip_taint_critical_bump() {
let findings = run_dual_branch(
"internal.py",
"import os\n\
def make():\n\
\x20 return os.path.join('/etc', 'config')\n",
);
assert!(
!findings.is_empty(),
"literal first arg must produce a finding"
);
let f = &findings[0];
assert!(
f.is_dual_branch(),
"literal first arg → dual-branch finding emitted"
);
assert_eq!(
f.severity,
Severity::Info,
"predicted Benign severity preserved (no taint Critical bump)"
);
}
// Python fixture: `os.path.join(folder, app_name)` where `folder` is a
// function parameter (defaulting to None and possibly reassigned from the
// environment). Run with dual-branch mode enabled.
// Picks the first finding whose title contains "path" and expects:
//   - it is dual-branched with severity High;
//   - a `FirstArgIdentifier` prediction reason naming "folder";
//   - an alternative branch labelled Benign with severity Info;
//   - a `suggested_fix` that mentions "internal-path".
#[test]
fn click_utils_489_simplified_with_folder_as_param() {
let findings = run_dual_branch(
"click_utils_simplified.py",
"import os\n\
def get_app_dir(app_name, folder=None):\n\
\x20 if folder is None:\n\
\x20 folder = os.environ.get('XDG_CONFIG_HOME', '~/.config')\n\
\x20 return os.path.join(folder, app_name)\n",
);
let f = findings
.iter()
.find(|f| f.title.to_lowercase().contains("path"))
.unwrap_or_else(|| {
panic!(
"expected a path finding for the os.path.join call; got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
)
});
assert!(f.is_dual_branch());
assert_eq!(
f.severity,
Severity::High,
"first arg is parameter `folder` → RealBug. reasons: {:?}",
f.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect::<Vec<_>>()
);
assert!(
f.prediction_reasons.iter().any(|r| matches!(
&r.kind,
crate::dual_branch::PredictionReasonKind::FirstArgIdentifier { name } if name == "folder"
)),
"FirstArgIdentifier signal must fire for `folder`; got: {:?}",
f.prediction_reasons
.iter()
.map(|r| &r.kind)
.collect::<Vec<_>>()
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::Benign);
assert_eq!(alt.severity, Severity::Info);
assert!(
f.suggested_fix
.as_deref()
.unwrap_or("")
.contains("internal-path"),
"RealBug suggested_fix should mention the internal-path annotation. \
got: {:?}",
f.suggested_fix
);
}
// Python fixture: `os.path.join(folder, app_name)` where `folder` is a LOCAL
// variable assigned from `os.environ.get(key)`, not a parameter. Run with
// dual-branch mode enabled.
// Picks the first finding whose title contains "path" and expects it to be
// dual-branched with severity High, and to carry NO `FirstArgIdentifier`
// reason naming "folder". The assertion message documents this as a known v0
// limitation to revisit once a v1 extractor lands.
#[test]
fn click_utils_489_real_signature_local_var_folder() {
let findings = run_dual_branch(
"click_utils_real.py",
"import os\n\
def get_app_dir(app_name, roaming=True, force_posix=False):\n\
\x20 key = 'APPDATA' if roaming else 'LOCALAPPDATA'\n\
\x20 folder = os.environ.get(key)\n\
\x20 return os.path.join(folder, app_name)\n",
);
let f = findings
.iter()
.find(|f| f.title.to_lowercase().contains("path"))
.unwrap_or_else(|| {
panic!(
"expected a path finding; got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
)
});
assert!(f.is_dual_branch());
assert_eq!(f.severity, Severity::High, "tiebreak → RealBug → High");
let has_folder_signal = f.prediction_reasons.iter().any(|r| matches!(
&r.kind,
crate::dual_branch::PredictionReasonKind::FirstArgIdentifier { name } if name == "folder"
));
assert!(
!has_folder_signal,
"v0 limitation: `folder` is a local var, not a param — the \
extractor should NOT emit a FirstArgIdentifier signal. If \
this assertion now fails, v1 extractor has landed; update \
this test and the decisions doc together. reasons: {:?}",
f.prediction_reasons
.iter()
.map(|r| &r.kind)
.collect::<Vec<_>>()
);
}
}