use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::detectors::ast_walk::AstWalkCtx;
use crate::detectors::base::{Detector, DetectorConfig};
use crate::detectors::detector_context::ContentFlags;
use crate::detectors::fast_search::{find_in, *};
use crate::detectors::security::ast_helpers::{
collect_named_args, node_text, receiver_chain_label as receiver_chain_label_shared,
unwrap_callee,
};
use crate::detectors::security::scan_inputs::{ScanAstInputs, ScanInputs};
use crate::detectors::taint::{TaintAnalysisResult, TaintAnalyzer, TaintCategory};
use crate::models::{Finding, Severity};
use crate::parsers::lightweight::Language;
use anyhow::Result;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{LazyLock, OnceLock};
/// File extensions this detector reports through `file_extensions()` and uses
/// to enumerate candidate files in `detect`.
const SUPPORTED_EXTS: &[&str] = &["py", "js", "ts", "jsx", "tsx", "go"];
/// File extensions that `detect` lets through to the tree-sitter based scan.
/// Currently the same list as `SUPPORTED_EXTS`.
const AST_EXTS: &[&str] = &["py", "js", "ts", "jsx", "tsx", "go"];
/// Detector for path traversal (CWE-22) in Python, JavaScript/TypeScript and Go.
///
/// Combines an AST scan of file/path API call sites with taint-analysis paths.
pub struct PathTraversalDetector {
// Repository root; stripped from file paths to produce repo-relative paths in findings.
repository_path: PathBuf,
// Cap on how many findings the AST scan collects.
max_findings: usize,
// Taint engine used by `detect` when no precomputed paths are available.
taint_analyzer: TaintAnalyzer,
// Precomputed paths that `detect` uses instead of calling `trace_taint`, when set.
// NOTE(review): presumably filled in via `impl_taint_precompute!` — confirm.
precomputed_cross: OnceLock<Vec<crate::detectors::taint::TaintPath>>,
// Precomputed paths that `detect` uses instead of `run_intra_function_taint`, when set.
precomputed_intra: OnceLock<Vec<crate::detectors::taint::TaintPath>>,
}
impl PathTraversalDetector {
    /// Creates a detector for the repository rooted at `repository_path`.
    ///
    /// The finding cap starts at 50 and both precomputed taint slots start empty.
    pub fn new(repository_path: impl Into<PathBuf>) -> Self {
        let repository_path = repository_path.into();
        Self {
            repository_path,
            max_findings: 50,
            taint_analyzer: TaintAnalyzer::new(),
            precomputed_cross: OnceLock::new(),
            precomputed_intra: OnceLock::new(),
        }
    }
}
impl Detector for PathTraversalDetector {
    /// Stable identifier of this detector.
    fn name(&self) -> &'static str {
        "path-traversal"
    }

    /// One-line human readable summary.
    fn description(&self) -> &'static str {
        "Detects path traversal vulnerabilities"
    }

    fn bypass_postprocessor(&self) -> bool {
        true
    }

    crate::detectors::impl_taint_precompute!();

    /// This detector consumes taint paths of the path-traversal category.
    fn taint_category(&self) -> Option<crate::detectors::taint::TaintCategory> {
        Some(TaintCategory::PathTraversal)
    }

    fn file_extensions(&self) -> &'static [&'static str] {
        SUPPORTED_EXTS
    }

    /// Files are relevant when they carry file-operation or path-operation flags.
    fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
        ContentFlags::FILE_OPS.union(ContentFlags::PATH_OPS)
    }

    /// Runs the AST scan over every candidate file, merges in taint-analysis
    /// results and returns all findings whose severity is above `Low`.
    fn detect(
        &self,
        ctx: &crate::detectors::analysis_context::AnalysisContext,
    ) -> Result<Vec<Finding>> {
        let graph = ctx.graph;
        let det_ctx = &ctx.detector_ctx;
        let files = &ctx.as_file_provider();
        let mut findings: Vec<Finding> = Vec::new();

        // Prefer precomputed taint paths; fall back to running the analysis here.
        let mut all_paths = match self.precomputed_cross.get() {
            Some(paths) => paths.clone(),
            None => self
                .taint_analyzer
                .trace_taint(graph, TaintCategory::PathTraversal),
        };
        let intra = match self.precomputed_intra.get() {
            Some(paths) => paths.clone(),
            None => crate::detectors::taint::run_intra_function_taint(
                &self.taint_analyzer,
                graph,
                TaintCategory::PathTraversal,
                &self.repository_path,
            ),
        };
        all_paths.extend(intra);
        let taint_result = TaintAnalysisResult::from_paths(all_paths);

        // With no content flags at all, fall back to a keyword prefilter per file.
        let no_flags_available = det_ctx.content_flags.is_empty();

        for path in files.files_with_extensions(SUPPORTED_EXTS) {
            if findings.len() >= self.max_findings {
                break;
            }

            let flags = det_ctx.content_flags.get(path).copied().unwrap_or_default();
            let relevant = no_flags_available
                || flags.has(ContentFlags::FILE_OPS)
                || flags.has(ContentFlags::PATH_OPS);
            if !relevant {
                continue;
            }

            let Some(raw) = files.content(path) else {
                continue;
            };
            let raw_str: &str = &raw;
            if no_flags_available && !contains_any(PATH_KEYWORD_FINDERS, raw_str) {
                continue;
            }
            // Very large files are skipped.
            if raw_str.len() > 500_000 {
                continue;
            }

            let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
            if !AST_EXTS.contains(&ext) {
                continue;
            }

            let lang = Language::from_path(path);
            let cached = files.tree(path);
            let ast_inputs =
                ScanAstInputs::new(ScanInputs::new(path, raw_str, ext), lang, cached.as_deref());
            findings.extend(self.scan_file_ast(&ast_inputs, &taint_result));
        }

        merge_taint_paths(&mut findings, &taint_result, &self.repository_path);
        findings.retain(|f| f.severity != Severity::Low);
        Ok(findings)
    }
}
impl crate::detectors::RegisteredDetector for PathTraversalDetector {
    /// Builds the detector from registry init data, rooted at `init.repo_path`.
    fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
        let detector = Self::new(init.repo_path);
        std::sync::Arc::new(detector)
    }
}
/// Substring finders used by `detect` as a cheap prefilter: when no content
/// flags are available at all, a file is only scanned if at least one of these
/// finders matches its text. The patterns themselves live in `fast_search`.
static PATH_KEYWORD_FINDERS: &[&LazyLock<memchr::memmem::Finder<'static>>] = &[
&FIND_OPEN_PAREN,
&FIND_READ_FILE,
&FIND_WRITE_FILE,
&FIND_PATH_JOIN,
&FIND_PATH_RESOLVE,
&FIND_OS_PATH,
&FIND_SEND_FILE,
&FIND_SEND_FILE_SNAKE,
&FIND_SERVE_FILE,
&FIND_UNLINK,
&FIND_RMDIR,
&FIND_MKDIR,
&FIND_COPY_FILE,
&FIND_RENAME_PAREN,
&FIND_OS_REMOVE,
&FIND_SHUTIL,
&FIND_FILEPATH,
&FIND_PATHLIB,
&FIND_CREATE_READ_STREAM,
&FIND_CREATE_WRITE_STREAM,
&FIND_APPEND_FILE,
&FIND_STAT_SYNC,
&FIND_ACCESS_SYNC,
];
/// Family of path-handling API that a matched call site belongs to. Selects the
/// title, description, fix text and severity rules of the resulting finding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PathApi {
// Direct file-system call (e.g. Python `open`/`os.remove`, Node `fs.readFile`, Go `os.Open`).
FileOp,
// Path-building call (Python `os.path.join`, JS `path.join`/`path.resolve`, Go `filepath.Join`/`Clean`).
PathJoin,
// File-serving call (e.g. Flask `send_file`, `res.sendFile`/`res.download`, Go `http.ServeFile`).
SendFile,
}
/// Syntactic shape of the path argument at a call site, as determined by the
/// per-language `classify_path_arg_*` functions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PathArgKind {
// String literal with no interpolation.
StaticLiteral,
// String literal containing an interpolation / template substitution.
Interpolated,
// Identifier, attribute/member access, subscript/index or call expression.
// The name notwithstanding, this is a syntactic category; user control is not proven.
UserVariable,
// Binary expression mixing a static literal with a non-literal operand.
Concatenation,
// Any node kind the classifier does not recognise.
Unknown,
}
impl PathApi {
    /// Finding title for this API family.
    fn title(self) -> &'static str {
        match self {
            Self::FileOp => "Potential path traversal in file operation",
            Self::PathJoin => "Path traversal via path.join with user input",
            Self::SendFile => "Path traversal in file download",
        }
    }

    /// Finding description for this API family (before any taint notes are appended).
    fn base_description(self) -> &'static str {
        match self {
            Self::FileOp => {
                "File operation with user-controlled input detected. An attacker could use '../' \
                 sequences to access files outside the intended directory."
            }
            Self::PathJoin => {
                "path.join() with user input does NOT prevent path traversal. Joining '/base' \
                 with '../etc/passwd' results in '/etc/passwd'."
            }
            Self::SendFile => {
                "File download/send function with user-controlled path. Attackers could download \
                 arbitrary files from the server."
            }
        }
    }

    /// Remediation text for this API family.
    fn suggested_fix(self) -> &'static str {
        match self {
            Self::FileOp => {
                "1. Use path.basename() to extract filename only\n\
                 2. Validate resolved path is within allowed directory\n\
                 3. Use a whitelist of allowed filenames if possible"
            }
            Self::PathJoin => {
                "After joining, verify the resolved path starts with your base directory:\n\
                 ```\nconst resolved = path.resolve(baseDir, userInput);\n\
                 if (!resolved.startsWith(path.resolve(baseDir))) { throw new Error('Invalid path'); }\n```"
            }
            Self::SendFile => {
                "Use res.download() with { root: '/safe/base/dir' } option, or validate resolved \
                 path is within allowed directory."
            }
        }
    }

    /// Impact statement for this API family.
    fn why_it_matters(self) -> &'static str {
        match self {
            Self::FileOp => {
                "Attackers could read sensitive files like /etc/passwd or overwrite critical \
                 system files."
            }
            Self::PathJoin => {
                "path.join() is commonly misunderstood as safe, but it preserves '../' sequences \
                 allowing directory escape."
            }
            Self::SendFile => {
                "Attackers could download sensitive configuration files, source code, or \
                 credentials from the server."
            }
        }
    }

    /// Maps the argument shape and the presence of a user-input marker to a severity.
    ///
    /// Static literals are always `Low`. Unclassifiable arguments are `Low` for
    /// `PathJoin`, otherwise `Medium` with a marker and `Low` without. Dynamic
    /// shapes are `High` with a marker and `Medium` without.
    fn severity_for(self, kind: PathArgKind, has_user_marker: bool) -> Severity {
        let (with_marker, without_marker) = match kind {
            PathArgKind::StaticLiteral => return Severity::Low,
            PathArgKind::Unknown if matches!(self, Self::PathJoin) => return Severity::Low,
            PathArgKind::Unknown => (Severity::Medium, Severity::Low),
            PathArgKind::Interpolated
            | PathArgKind::UserVariable
            | PathArgKind::Concatenation => (Severity::High, Severity::Medium),
        };
        if has_user_marker {
            with_marker
        } else {
            without_marker
        }
    }
}
/// One call site of a recognised path-handling API, produced by the
/// `match_*_call` functions and consumed by `scan_file_ast`.
struct PathSite<'a> {
// The call node itself; its start row gives the reported line.
call_node: tree_sitter::Node<'a>,
// Which API family the callee belongs to.
api: PathApi,
// Classified shape of the path argument (or of all arguments for `PathJoin`).
arg_kind: PathArgKind,
// Source text of all arguments joined with spaces; searched for user-input markers.
arg_text: String,
}
impl PathTraversalDetector {
    /// Scans one source file's syntax tree for path-API call sites and converts
    /// them into findings.
    ///
    /// Uses the cached tree from `inputs` when present, otherwise parses the
    /// content. Returns no findings for content containing NUL bytes or content
    /// that fails to parse. Call sites suppressed by an inline marker, and
    /// sites whose severity resolves to `Low`, are skipped. Severities are
    /// downgraded for files that look like tests. At most `max_findings`
    /// findings are returned.
    ///
    /// `_taint_result` is currently unused here; taint paths are merged by the caller.
    fn scan_file_ast(
        &self,
        inputs: &ScanAstInputs<'_>,
        _taint_result: &TaintAnalysisResult,
    ) -> Vec<Finding> {
        let path = inputs.path();
        let content = inputs.content();
        let ext = inputs.ext();
        let lang = inputs.lang;
        let cached_tree = inputs.cached_tree;
        let mut findings = vec![];
        if content.contains('\0') {
            return findings;
        }
        // `owned` keeps a freshly parsed tree alive for as long as `root` borrows it.
        let owned;
        let root = match cached_tree {
            Some(tree) => tree.root_node(),
            None => match parse_root_ext(content, lang, ext) {
                Some(t) => {
                    owned = t;
                    owned.root_node()
                }
                None => return findings,
            },
        };
        let bytes = content.as_bytes();
        let lines: Vec<&str> = content.lines().collect();
        // Import aliases are only collected for Python; other languages get empty maps.
        let py_aliases = if matches!(lang, Language::Python) {
            super::python_imports::collect_python_from_imports(root, bytes)
        } else {
            HashMap::new()
        };
        let py_module_aliases = if matches!(lang, Language::Python) {
            super::python_imports::collect_python_module_aliases(root, bytes)
        } else {
            HashMap::new()
        };
        let mut sites: Vec<PathSite> = Vec::new();
        let ctx = AstWalkCtx {
            lang,
            source: bytes,
        };
        let aliases = super::python_imports::PythonAliases::new(&py_aliases, &py_module_aliases);
        collect_path_sites(&ctx, root, &aliases, &mut sites);
        let rel_path = path
            .strip_prefix(&self.repository_path)
            .unwrap_or(path)
            .to_path_buf();
        // Decide "is this a test file?" from the repo-relative path only. Using
        // the absolute path would let a parent directory of the checkout (for
        // example `/tmp/test-run/`) mark every file as a test and silence all
        // findings. The leading '/' lets directory markers match at the repo root.
        let rel_str = format!("/{}", rel_path.to_string_lossy());
        let is_test_file = ["/test", "/tests/", "_test.", ".test.", "/spec/", "_spec."]
            .iter()
            .any(|marker| rel_str.contains(*marker));
        for site in sites {
            if findings.len() >= self.max_findings {
                break;
            }
            let line_idx = site.call_node.start_position().row;
            if let Some(line) = lines.get(line_idx) {
                let prev = line_idx.checked_sub(1).and_then(|i| lines.get(i).copied());
                if crate::detectors::is_line_suppressed(line, prev) {
                    continue;
                }
            }
            // Findings use 1-based line numbers; tree-sitter rows are 0-based.
            let line_num = (line_idx + 1) as u32;
            let has_user_marker = contains_any(USER_INPUT_FINDERS, &site.arg_text);
            let mut severity = site.api.severity_for(site.arg_kind, has_user_marker);
            if is_test_file {
                severity = match severity {
                    Severity::Critical => Severity::Medium,
                    Severity::High => Severity::Low,
                    Severity::Medium => Severity::Low,
                    other => other,
                };
            }
            if severity == Severity::Low {
                continue;
            }
            findings.push(Finding {
                id: String::new(),
                detector: "PathTraversalDetector".to_string(),
                severity,
                title: site.api.title().to_string(),
                description: site.api.base_description().to_string(),
                affected_files: vec![rel_path.clone()],
                line_start: Some(line_num),
                line_end: Some(line_num),
                suggested_fix: Some(site.api.suggested_fix().to_string()),
                estimated_effort: Some("30 minutes".to_string()),
                category: Some("security".to_string()),
                cwe_id: Some("CWE-22".to_string()),
                why_it_matters: Some(site.api.why_it_matters().to_string()),
                ..Default::default()
            });
        }
        findings
    }
}
/// Substring finders run against the concatenated argument text of a call
/// site in `scan_file_ast`; any hit sets `has_user_marker`, which raises the
/// severity. The patterns are defined in `fast_search`.
/// NOTE(review): judging by their names they cover request / CLI input
/// accessors across the supported languages — confirm against `fast_search`.
static USER_INPUT_FINDERS: &[&LazyLock<memchr::memmem::Finder<'static>>] = &[
&FIND_REQ_PARAMS,
&FIND_REQ_QUERY,
&FIND_REQ_BODY,
&FIND_REQ_FILE,
&FIND_REQUEST_GET,
&FIND_REQUEST_POST,
&FIND_REQUEST_FILES,
&FIND_REQUEST_ARGS,
&FIND_REQUEST_FORM,
&FIND_REQUEST_DATA,
&FIND_REQUEST_VALUES,
&FIND_PARAMS_BRACKET,
&FIND_INPUT_PAREN,
&FIND_SYS_ARGV,
&FIND_PROCESS_ARGV,
&FIND_R_URL,
&FIND_C_PARAM,
&FIND_C_QUERY,
&FIND_FORM_VALUE,
&FIND_R_FORM,
&FIND_QUERY_BRACKET,
&FIND_QUERY_GET,
&FIND_BODY_BRACKET,
&FIND_BODY_GET,
];
/// Walks the subtree rooted at `node` in pre-order and appends every matched
/// path-API call site to `out`.
fn collect_path_sites<'a>(
    ctx: &AstWalkCtx<'a>,
    node: tree_sitter::Node<'a>,
    py_aliases: &super::python_imports::PythonAliases<'_>,
    out: &mut Vec<PathSite<'a>>,
) {
    // `Option` iterates over zero or one item, so this pushes only on a match.
    out.extend(match_path_site(node, ctx.source, ctx.lang, py_aliases));

    let mut walker = node.walk();
    for descendant in node.children(&mut walker) {
        collect_path_sites(ctx, descendant, py_aliases, out);
    }
}
/// Dispatches a node to the language-specific call matcher.
///
/// Only Python `call` nodes and JavaScript/TypeScript/Go `call_expression`
/// nodes are considered; everything else yields `None`.
fn match_path_site<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    lang: Language,
    py_aliases: &super::python_imports::PythonAliases<'_>,
) -> Option<PathSite<'a>> {
    let kind = node.kind();
    match lang {
        Language::Python if kind == "call" => match_python_call(node, source, py_aliases),
        Language::JavaScript | Language::TypeScript if kind == "call_expression" => {
            match_js_call(node, source)
        }
        Language::Go if kind == "call_expression" => match_go_call(node, source),
        _ => None,
    }
}
/// Maps a resolved Python `(module, name)` callee to its API family and the
/// index of the path argument. Returns `None` for callees that are not tracked.
///
/// An empty `module` denotes the bare builtin `open`.
fn classify_python_path_callee(module: &str, name: &str) -> Option<(PathApi, usize)> {
    let api = match module {
        "" | "builtins" | "io" if name == "open" => PathApi::FileOp,
        "os" if matches!(
            name,
            "remove" | "unlink" | "rmdir" | "mkdir" | "rename" | "chmod"
        ) =>
        {
            PathApi::FileOp
        }
        "shutil" if matches!(name, "copy" | "copyfile" | "copy2" | "move" | "rmtree") => {
            PathApi::FileOp
        }
        "os.path" if name == "join" => PathApi::PathJoin,
        "pathlib" if name == "Path" => PathApi::FileOp,
        "flask" if matches!(name, "send_file" | "send_from_directory") => PathApi::SendFile,
        "django.http" | "starlette.responses" if name == "FileResponse" => PathApi::SendFile,
        _ => return None,
    };
    // Every tracked Python callee takes the path as its first argument.
    Some((api, 0))
}
/// Recognises a Python `call` node that invokes a tracked path API.
///
/// Attribute callees (`mod.func`) resolve the receiver text through the module
/// alias map; bare identifiers resolve through the from-import map, except
/// `open`, which is treated as the builtin. Returns `None` when the callee is
/// not tracked or the call has no usable path argument.
fn match_python_call<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    aliases: &super::python_imports::PythonAliases<'_>,
) -> Option<PathSite<'a>> {
    let callee = unwrap_callee(node.child_by_field_name("function")?);
    let arg_nodes = collect_named_args(node.child_by_field_name("arguments")?);

    let (api, idx) = match callee.kind() {
        "attribute" => {
            let receiver = callee.child_by_field_name("object")?;
            let member = callee.child_by_field_name("attribute")?;
            let member_name = node_text(member, source)?;
            let receiver_text = node_text(receiver, source).unwrap_or("");
            // Resolve `import x as y` style aliases; otherwise use the receiver verbatim.
            let module_label = match aliases.modules.get(receiver_text) {
                Some(resolved) => resolved.clone(),
                None => receiver_text.to_string(),
            };
            classify_python_path_callee(&module_label, member_name)?
        }
        "identifier" => {
            let name = node_text(callee, source)?;
            let module = if name == "open" {
                String::new()
            } else {
                aliases.imports.get(name).cloned()?
            };
            classify_python_path_callee(&module, name)?
        }
        _ => return None,
    };

    let candidate = arg_nodes.get(idx).copied()?;
    // If the slot holds a keyword argument, fall back to the first positional one.
    let target = match candidate.kind() {
        "keyword_argument" => arg_nodes
            .iter()
            .copied()
            .find(|a| a.kind() != "keyword_argument")?,
        _ => candidate,
    };

    // Join calls are judged on all their arguments, other APIs on the path argument.
    let arg_kind = match api {
        PathApi::PathJoin => classify_path_args_python(&arg_nodes, source),
        PathApi::FileOp | PathApi::SendFile => classify_path_arg_python(target, source),
    };

    Some(PathSite {
        call_node: node,
        api,
        arg_kind,
        arg_text: collect_arg_text(&arg_nodes, source),
    })
}
/// Returns the highest-ranked classification among the positional arguments of
/// a Python call; keyword arguments are ignored. An empty list is `StaticLiteral`.
fn classify_path_args_python(args: &[tree_sitter::Node<'_>], source: &[u8]) -> PathArgKind {
    args.iter()
        .filter(|arg| arg.kind() != "keyword_argument")
        .fold(PathArgKind::StaticLiteral, |best, arg| {
            strongest_kind(best, classify_path_arg_python(*arg, source))
        })
}
/// Classifies the syntactic shape of a Python expression used as a path argument.
///
/// * string literals are `StaticLiteral` unless they contain an interpolation;
/// * `+`-style binary operators mixing literal and non-literal operands are
///   `Concatenation`, all-literal ones are `StaticLiteral`, otherwise `UserVariable`;
/// * identifiers, attributes, subscripts and calls are `UserVariable`;
/// * parentheses and `await` are transparent;
/// * a conditional expression is judged by its two value branches only;
/// * anything else is `Unknown`.
// `source` is only threaded through the recursion, hence the lint allowance.
#[allow(clippy::only_used_in_recursion)]
fn classify_path_arg_python(node: tree_sitter::Node<'_>, source: &[u8]) -> PathArgKind {
    match node.kind() {
        "string" => {
            let mut cursor = node.walk();
            let interpolated = node
                .children(&mut cursor)
                .any(|child| child.kind() == "interpolation");
            if interpolated {
                PathArgKind::Interpolated
            } else {
                PathArgKind::StaticLiteral
            }
        }
        "concatenated_string" => {
            // Adjacent literals: interpolated if any part is.
            let mut cursor = node.walk();
            for child in node.children(&mut cursor) {
                if classify_path_arg_python(child, source) == PathArgKind::Interpolated {
                    return PathArgKind::Interpolated;
                }
            }
            PathArgKind::StaticLiteral
        }
        "binary_operator" => {
            let left = node.child_by_field_name("left");
            let right = node.child_by_field_name("right");
            let mut found_var = false;
            let mut found_lit = false;
            for opt in [left, right].iter().flatten() {
                match classify_path_arg_python(*opt, source) {
                    PathArgKind::UserVariable
                    | PathArgKind::Interpolated
                    | PathArgKind::Concatenation
                    | PathArgKind::Unknown => found_var = true,
                    PathArgKind::StaticLiteral => found_lit = true,
                }
            }
            if found_var && found_lit {
                PathArgKind::Concatenation
            } else if found_var {
                PathArgKind::UserVariable
            } else {
                PathArgKind::StaticLiteral
            }
        }
        "identifier" | "attribute" | "subscript" | "call" => PathArgKind::UserVariable,
        "parenthesized_expression" => {
            for i in 0..node.named_child_count() {
                if let Some(c) = node.named_child(i) {
                    return classify_path_arg_python(c, source);
                }
            }
            PathArgKind::Unknown
        }
        "await" => {
            let mut strongest = PathArgKind::StaticLiteral;
            for i in 0..node.named_child_count() {
                if let Some(c) = node.named_child(i) {
                    strongest = strongest_kind(strongest, classify_path_arg_python(c, source));
                }
            }
            strongest
        }
        "conditional_expression" => {
            // `<value> if <condition> else <value>`: the grammar exposes the three
            // operands as unnamed-field children in source order. The condition
            // (second operand) never becomes the path, so it must not influence
            // the result; otherwise `"a" if flag else "b"` would be reported as a
            // variable path. This mirrors the JS ternary handling.
            let mut strongest = PathArgKind::StaticLiteral;
            let mut operand_pos = 0usize;
            for i in 0..node.named_child_count() {
                let Some(c) = node.named_child(i) else {
                    continue;
                };
                // Comments are named extras and are not operands.
                if c.kind() == "comment" {
                    continue;
                }
                let is_condition = operand_pos == 1;
                operand_pos += 1;
                if is_condition {
                    continue;
                }
                strongest = strongest_kind(strongest, classify_path_arg_python(c, source));
            }
            strongest
        }
        _ => PathArgKind::Unknown,
    }
}
/// Recognises a JavaScript/TypeScript `call_expression` that invokes a tracked
/// path API: bare or `fs.`-qualified file functions, `path.join`/`path.resolve`,
/// and `res.sendFile`/`res.download`. The path is always the first argument.
fn match_js_call<'a>(node: tree_sitter::Node<'a>, source: &'a [u8]) -> Option<PathSite<'a>> {
    /// File-system function names whose first argument is a path.
    fn is_file_op_name(name: &str) -> bool {
        matches!(
            name,
            "readFile"
                | "readFileSync"
                | "writeFile"
                | "writeFileSync"
                | "appendFile"
                | "unlink"
                | "unlinkSync"
                | "rmdir"
                | "mkdir"
                | "copyFile"
                | "rename"
                | "stat"
                | "statSync"
                | "access"
                | "accessSync"
                | "createReadStream"
                | "createWriteStream"
                | "open"
        )
    }

    let raw_callee = node.child_by_field_name("function")?;
    let arg_nodes = collect_named_args(node.child_by_field_name("arguments")?);
    let callee = unwrap_callee(raw_callee);

    let api = match callee.kind() {
        "identifier" => {
            if !is_file_op_name(node_text(callee, source)?) {
                return None;
            }
            PathApi::FileOp
        }
        "member_expression" => {
            let object = callee.child_by_field_name("object")?;
            let property = callee.child_by_field_name("property")?;
            let method = node_text(property, source)?;
            let receiver = receiver_chain_label_js(object, source);
            match (receiver.as_str(), method) {
                ("fs" | "fsp" | "fspromises", name) if is_file_op_name(name) => PathApi::FileOp,
                ("path", "join" | "resolve") => PathApi::PathJoin,
                ("res" | "response", "sendFile" | "download") => PathApi::SendFile,
                _ => return None,
            }
        }
        _ => return None,
    };

    let target = arg_nodes.first().copied()?;
    // Join calls are judged on all their arguments, other APIs on the path argument.
    let arg_kind = match api {
        PathApi::PathJoin => classify_path_args_js(&arg_nodes, source),
        PathApi::FileOp | PathApi::SendFile => classify_path_arg_js(target, source),
    };

    Some(PathSite {
        call_node: node,
        api,
        arg_kind,
        arg_text: collect_arg_text(&arg_nodes, source),
    })
}
/// Returns the highest-ranked classification among all arguments of a
/// JavaScript/TypeScript call. An empty list is `StaticLiteral`.
fn classify_path_args_js(args: &[tree_sitter::Node<'_>], source: &[u8]) -> PathArgKind {
    args.iter().fold(PathArgKind::StaticLiteral, |best, arg| {
        strongest_kind(best, classify_path_arg_js(*arg, source))
    })
}
/// Classifies the syntactic shape of a JavaScript/TypeScript expression used
/// as a path argument.
///
/// Plain strings and substitution-free template strings are `StaticLiteral`;
/// template strings with a substitution are `Interpolated`; binary expressions
/// are judged by their operands; identifiers, member/subscript accesses and
/// calls are `UserVariable`; wrapper expressions (parentheses, `await`, type
/// assertions, `!`, `satisfies`) defer to their first named child; ternaries
/// take the stronger of their two branches; everything else is `Unknown`.
// `source` is only threaded through the recursion, hence the lint allowance.
#[allow(clippy::only_used_in_recursion)]
fn classify_path_arg_js(node: tree_sitter::Node<'_>, source: &[u8]) -> PathArgKind {
    match node.kind() {
        "string" => PathArgKind::StaticLiteral,
        "template_string" => {
            let mut cursor = node.walk();
            let has_substitution = node
                .children(&mut cursor)
                .any(|part| part.kind() == "template_substitution");
            if has_substitution {
                PathArgKind::Interpolated
            } else {
                PathArgKind::StaticLiteral
            }
        }
        "binary_expression" => {
            let operands = [
                node.child_by_field_name("left"),
                node.child_by_field_name("right"),
            ];
            let mut saw_dynamic = false;
            let mut saw_literal = false;
            for operand in operands.iter().flatten() {
                if classify_path_arg_js(*operand, source) == PathArgKind::StaticLiteral {
                    saw_literal = true;
                } else {
                    saw_dynamic = true;
                }
            }
            match (saw_dynamic, saw_literal) {
                (true, true) => PathArgKind::Concatenation,
                (true, false) => PathArgKind::UserVariable,
                (false, _) => PathArgKind::StaticLiteral,
            }
        }
        "identifier" | "member_expression" | "subscript_expression" | "call_expression" => {
            PathArgKind::UserVariable
        }
        "parenthesized_expression"
        | "await_expression"
        | "as_expression"
        | "type_assertion_expression"
        | "non_null_expression"
        | "satisfies_expression" => (0..node.named_child_count())
            .find_map(|i| node.named_child(i))
            .map_or(PathArgKind::Unknown, |inner| {
                classify_path_arg_js(inner, source)
            }),
        "ternary_expression" => {
            let branches = [
                node.child_by_field_name("consequence"),
                node.child_by_field_name("alternative"),
            ];
            branches
                .iter()
                .flatten()
                .fold(PathArgKind::StaticLiteral, |best, branch| {
                    strongest_kind(best, classify_path_arg_js(*branch, source))
                })
        }
        _ => PathArgKind::Unknown,
    }
}
/// Recognises a Go `call_expression` of the form `pkg.Func(...)` that invokes a
/// tracked path API and records which argument carries the path.
///
/// Tracked callees: file operations in `os` and `ioutil` (path is argument 0),
/// `filepath.Join`/`filepath.Clean` (all arguments are considered), and
/// `http.ServeFile` (path is argument 2). Returns `None` for any other callee
/// or when the path argument is absent.
fn match_go_call<'a>(node: tree_sitter::Node<'a>, source: &'a [u8]) -> Option<PathSite<'a>> {
    let func = node.child_by_field_name("function")?;
    if func.kind() != "selector_expression" {
        return None;
    }
    let operand = func.child_by_field_name("operand")?;
    let field = func.child_by_field_name("field")?;
    let operand_text = node_text(operand, source)?;
    let field_text = node_text(field, source)?;
    let args = node.child_by_field_name("arguments")?;
    let arg_nodes = collect_named_args(args);
    let (api, idx) = match (operand_text, field_text) {
        // `os.ReadFile`/`WriteFile`/`ReadDir` are the current replacements for
        // the deprecated `ioutil` functions and take the path first as well.
        (
            "os",
            "Open" | "Create" | "OpenFile" | "Remove" | "RemoveAll" | "Rename" | "Mkdir"
            | "MkdirAll" | "Chmod" | "ReadFile" | "WriteFile" | "ReadDir",
        ) => (PathApi::FileOp, 0),
        ("ioutil", "ReadFile" | "WriteFile" | "ReadDir") => (PathApi::FileOp, 0),
        ("filepath", "Join" | "Clean") => (PathApi::PathJoin, 0),
        // Signature is ServeFile(w, r, name): the file name is the THIRD
        // argument. Index 1 is the *http.Request, which would classify every
        // call as a variable path, even one serving a constant file.
        ("http", "ServeFile") => (PathApi::SendFile, 2),
        _ => return None,
    };
    let target = arg_nodes.get(idx).copied()?;
    let arg_kind = if api == PathApi::PathJoin {
        classify_path_args_go(&arg_nodes, source)
    } else {
        classify_path_arg_go(target, source)
    };
    let arg_text = collect_arg_text(&arg_nodes, source);
    Some(PathSite {
        call_node: node,
        api,
        arg_kind,
        arg_text,
    })
}
/// Returns the highest-ranked classification among all arguments of a Go call.
/// An empty list is `StaticLiteral`.
fn classify_path_args_go(args: &[tree_sitter::Node<'_>], source: &[u8]) -> PathArgKind {
    args.iter().fold(PathArgKind::StaticLiteral, |best, arg| {
        strongest_kind(best, classify_path_arg_go(*arg, source))
    })
}
/// Classifies the syntactic shape of a Go expression used as a path argument.
///
/// Interpreted and raw string literals are `StaticLiteral`; binary expressions
/// are judged by their operands; identifiers, selectors, index expressions and
/// calls are `UserVariable`; parentheses defer to their first named child;
/// everything else is `Unknown`.
// `source` is only threaded through the recursion, hence the lint allowance.
#[allow(clippy::only_used_in_recursion)]
fn classify_path_arg_go(node: tree_sitter::Node<'_>, source: &[u8]) -> PathArgKind {
    match node.kind() {
        "interpreted_string_literal" | "raw_string_literal" => PathArgKind::StaticLiteral,
        "binary_expression" => {
            let operands = [
                node.child_by_field_name("left"),
                node.child_by_field_name("right"),
            ];
            let mut saw_dynamic = false;
            let mut saw_literal = false;
            for operand in operands.iter().flatten() {
                if classify_path_arg_go(*operand, source) == PathArgKind::StaticLiteral {
                    saw_literal = true;
                } else {
                    saw_dynamic = true;
                }
            }
            match (saw_dynamic, saw_literal) {
                (true, true) => PathArgKind::Concatenation,
                (true, false) => PathArgKind::UserVariable,
                (false, _) => PathArgKind::StaticLiteral,
            }
        }
        "identifier" | "selector_expression" | "index_expression" | "call_expression" => {
            PathArgKind::UserVariable
        }
        "parenthesized_expression" => (0..node.named_child_count())
            .find_map(|i| node.named_child(i))
            .map_or(PathArgKind::Unknown, |inner| {
                classify_path_arg_go(inner, source)
            }),
        _ => PathArgKind::Unknown,
    }
}
/// Folds taint-analysis paths into the AST findings.
///
/// For each taint path, every finding on the same file and sink line is
/// updated: an unsanitized path raises it to `Critical` and appends the traced
/// flow; a sanitized path lowers it to `Low` and appends a note naming the
/// sanitizer. An unsanitized path with no matching finding produces a new
/// `Critical` finding at the sink.
///
/// A finding already raised to `Critical` is never lowered by a later
/// sanitized path, so the outcome does not depend on the order of
/// `taint_result.paths`: one unsanitized flow into a sink is enough to keep
/// it reported.
/// NOTE(review): this relies on incoming AST findings never being `Critical`
/// on their own, which holds for `PathApi::severity_for` — confirm if other
/// callers appear.
fn merge_taint_paths(
    findings: &mut Vec<Finding>,
    taint_result: &TaintAnalysisResult,
    repo_root: &Path,
) {
    for taint in &taint_result.paths {
        let abs_sink = Path::new(&taint.sink_file);
        let rel_sink = abs_sink.strip_prefix(repo_root).unwrap_or(abs_sink);
        let sink_line = taint.sink_line;
        let mut matched = false;
        for f in findings.iter_mut() {
            // Findings may carry either the repo-relative or the absolute path.
            let file_match = f
                .affected_files
                .first()
                .map(|p| p == rel_sink || p == abs_sink)
                .unwrap_or(false);
            let line_match = f.line_start == Some(sink_line);
            if !(file_match && line_match) {
                continue;
            }
            matched = true;
            if taint.is_sanitized {
                // An unsanitized flow into this sink was already confirmed;
                // a sanitized sibling flow must not hide it.
                if f.severity == Severity::Critical {
                    continue;
                }
                f.severity = Severity::Low;
                f.description = format!(
                    "{}\n\n**Taint Analysis Note**: A sanitizer function (`{}`) was found in \
                     the data flow path, which may mitigate this vulnerability.",
                    f.description,
                    taint.sanitizer.as_deref().unwrap_or("unknown")
                );
            } else {
                f.severity = Severity::Critical;
                f.description = format!(
                    "{}\n\n**Taint Analysis Confirmed**: Data flow analysis traced a path from \
                     user input to this file sink without sanitization:\n\n`{}`",
                    f.description,
                    taint.path_string()
                );
            }
        }
        if !matched && !taint.is_sanitized {
            findings.push(Finding {
                id: String::new(),
                detector: "PathTraversalDetector".to_string(),
                severity: Severity::Critical,
                title: "Path traversal confirmed by taint analysis".to_string(),
                description: format!(
                    "{}\n\n**Taint Analysis Confirmed**: Data flow analysis traced a path from \
                     user input to this file sink without sanitization:\n\n`{}`",
                    PathApi::FileOp.base_description(),
                    taint.path_string()
                ),
                affected_files: vec![rel_sink.to_path_buf()],
                line_start: Some(sink_line),
                line_end: Some(sink_line),
                suggested_fix: Some(PathApi::FileOp.suggested_fix().to_string()),
                estimated_effort: Some("30 minutes".to_string()),
                category: Some("security".to_string()),
                cwe_id: Some("CWE-22".to_string()),
                why_it_matters: Some(PathApi::FileOp.why_it_matters().to_string()),
                ..Default::default()
            });
        }
    }
}
/// Computes the receiver label for a JS/TS member call by delegating to the
/// shared helper, passing `require_module_label` as the callback for call
/// nodes so that `require('fs')` style receivers can be labelled.
/// NOTE(review): exact label format is defined by the shared helper in
/// `ast_helpers` — confirm there.
fn receiver_chain_label_js(node: tree_sitter::Node<'_>, source: &[u8]) -> String {
receiver_chain_label_shared(node, source, Some(&require_module_label))
}
/// Maps a `require('<module>')` / `import('<module>')` call node to the short
/// label of the module it loads: `"fs"` for the fs modules (including the
/// promises and `node:` variants) and `"path"` for the path module.
///
/// Returns `None` for any other callee, a non-literal first argument, or an
/// untracked module. Expects `node` to be a `call_expression`.
fn require_module_label(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<&'static str> {
    debug_assert_eq!(node.kind(), "call_expression");

    let callee = node.child_by_field_name("function")?;
    let callee_text = node_text(callee, source)?;
    let kind_ok = matches!(callee.kind(), "identifier" | "import");
    let name_ok = matches!(callee_text, "require" | "import");
    if !(kind_ok && name_ok) {
        return None;
    }

    let arg_nodes = collect_named_args(node.child_by_field_name("arguments")?);
    let specifier = js_string_literal_value(*arg_nodes.first()?, source)?;
    let label = match specifier.as_str() {
        "fs" | "node:fs" | "fs/promises" | "node:fs/promises" => "fs",
        "path" | "node:path" => "path",
        _ => return None,
    };
    Some(label)
}
/// Extracts the text of a JS `string` node by concatenating its
/// `string_fragment` children.
///
/// Returns `None` when `node` is not a `string` or when no fragment text could
/// be read (for example an empty literal).
fn js_string_literal_value(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<String> {
    if node.kind() != "string" {
        return None;
    }

    let mut text = String::new();
    let mut fragments_read = 0usize;
    let mut cursor = node.walk();
    let fragments = node
        .children(&mut cursor)
        .filter(|part| part.kind() == "string_fragment");
    for fragment in fragments {
        if let Some(piece) = node_text(fragment, source) {
            text.push_str(piece);
            fragments_read += 1;
        }
    }

    if fragments_read == 0 {
        return None;
    }
    Some(text)
}
/// Concatenates the source text of every argument, each followed by a single
/// space. Arguments whose text cannot be read are skipped.
fn collect_arg_text(args: &[tree_sitter::Node<'_>], source: &[u8]) -> String {
    args.iter()
        .filter_map(|arg| node_text(*arg, source))
        .fold(String::new(), |mut joined, piece| {
            joined.push_str(piece);
            joined.push(' ');
            joined
        })
}
/// Returns whichever of `a` and `b` ranks higher, preferring `a` on a tie.
///
/// Ranking, lowest to highest: `StaticLiteral`, `Unknown`, `Concatenation`,
/// `Interpolated`, `UserVariable`.
fn strongest_kind(a: PathArgKind, b: PathArgKind) -> PathArgKind {
    let rank_of = |kind: PathArgKind| -> u8 {
        match kind {
            PathArgKind::StaticLiteral => 0,
            PathArgKind::Unknown => 1,
            PathArgKind::Concatenation => 2,
            PathArgKind::Interpolated => 3,
            PathArgKind::UserVariable => 4,
        }
    };
    if rank_of(a) >= rank_of(b) {
        a
    } else {
        b
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::base::Detector;
use crate::graph::builder::GraphBuilder;
#[test]
fn test_detects_open_with_user_input() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("vuln.py", "def download(request):\n filename = request.GET.get(\"file\")\n f = open(request.GET[\"file\"], \"r\")\n return f.read()\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect open() with user-controlled path from request"
);
assert!(
findings
.iter()
.any(|f| f.title.to_lowercase().contains("path traversal")),
"Finding should mention path traversal. Titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
assert!(
findings
.iter()
.any(|f| f.cwe_id.as_deref() == Some("CWE-22")),
"Finding should have CWE-22"
);
}
#[test]
fn test_no_findings_for_hardcoded_path() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("safe.py", "def read_config():\n with open(\"config/settings.json\", \"r\") as f:\n return json.load(f)\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Hardcoded path should have no path traversal findings, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_get_full_path() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("views.py", "from django.http import HttpResponseRedirect\n\ndef my_view(request):\n return HttpResponseRedirect(request.get_full_path())\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag request.get_full_path() as path traversal. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_list_remove() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("library.py", "def process(request):\n params = list(request.GET.keys())\n params.remove('page')\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag list.remove() as file operation. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_still_detects_real_path_traversal() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("download.py", "import os\n\ndef download(request):\n filepath = os.path.join('/uploads', request.GET.get('file'))\n return open(filepath, 'r').read()\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should still detect real path traversal with request.GET"
);
}
#[test]
fn test_detects_path_join_with_req_params_js() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("download.js", "const path = require('path');\n\nfunction getFile(req, res) {\n const filePath = path.join('/uploads', req.params.filename);\n res.sendFile(filePath);\n}\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect path.join with req.params user input in JS"
);
assert!(
findings
.iter()
.any(|f| f.cwe_id.as_deref() == Some("CWE-22")),
"Finding should have CWE-22"
);
}
#[test]
fn test_detects_readfile_with_request_query_ts() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("serve.ts", "import fs from 'fs';\n\nfunction serveFile(req: Request, res: Response) {\n const name = req.query.file;\n const data = fs.readFileSync('/data/' + req.query.file);\n res.send(data);\n}\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect readFileSync with req.query in TypeScript"
);
}
#[test]
fn test_no_finding_for_path_traversal_in_comment() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("safe.py", "# Vulnerable: open(request.GET['file'], 'r')\ndef read_config():\n with open('config.json', 'r') as f:\n return f.read()\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Path traversal pattern in a comment should not produce findings, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_sendfile_with_user_input_js() {
let store = GraphBuilder::new().freeze();
let detector = PathTraversalDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("server.js", "const express = require('express');\n\napp.get('/download', (req, res) => {\n const file = req.query.file;\n res.sendFile(req.query.file);\n});\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect sendFile with user-controlled req.query"
);
assert!(
findings
.iter()
.any(|f| f.title.to_lowercase().contains("path traversal")),
"Finding should mention path traversal. Titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
/// Python `open(request.args.get('file'))` must yield at least one finding.
#[test]
fn test_detects_open_with_user_input_python_ast() {
    let source = "def f(request):\n return open(request.args.get('file')).read()\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let mock_files = vec![("v.py", source)];
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph, mock_files,
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let fired = !findings.is_empty();
    assert!(fired, "Should detect open() with user input via AST");
}
/// JavaScript `fs.readFile(req.params.filename)` must yield at least one finding.
#[test]
fn test_detects_fs_readfile_with_req_param_js_ast() {
    let source = "const fs = require('fs');\nfs.readFile(req.params.filename);\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let mock_files = vec![("v.js", source)];
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph, mock_files,
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let fired = !findings.is_empty();
    assert!(fired, "Should detect fs.readFile with req.params via AST");
}
/// Python `open('/etc/hosts')` takes a string literal only, so the detector
/// is expected to stay silent.
#[test]
fn test_skips_open_with_static_literal_python() {
    let source = "def f():\n return open('/etc/hosts').read()\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("v.py", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let titles: Vec<_> = findings.iter().map(|finding| &finding.title).collect();
    assert!(
        findings.is_empty(),
        "Static-literal open() must not fire. Got: {:?}",
        titles
    );
}
/// The only `open(...)` in this fixture lives in a `#` comment; the function
/// body just returns a constant, so no finding is expected.
#[test]
fn test_skips_open_in_comment() {
    let source = "# open(user_input)\ndef f():\n return 1\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("v.py", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let titles: Vec<_> = findings.iter().map(|finding| &finding.title).collect();
    assert!(
        findings.is_empty(),
        "Comment-only sink must not fire. Got: {:?}",
        titles
    );
}
/// `require('fs').readFile(req.body.path)` calls the sink directly on the
/// `require(...)` expression, with no intermediate binding; it must still
/// produce a finding.
#[test]
fn test_b1_require_fs_readfile_via_require_alias_js() {
    let source = "require('fs').readFile(req.body.path);\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("v.js", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let titles: Vec<_> = findings.iter().map(|finding| &finding.title).collect();
    assert!(
        !findings.is_empty(),
        "require('fs').readFile(...) should fire via receiver descent. Got: {:?}",
        titles
    );
}
/// `import os.path as op` followed by `op.join('/base', <request value>)` must
/// produce a finding.
///
/// Note: despite the test name, the call exercised here is `op.join`, not
/// `open`.
#[test]
fn test_python_aliased_module_open() {
    let source = "import os.path as op\ndef f(request):\n return op.join('/base', request.args.get('x'))\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("v.py", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let titles: Vec<_> = findings.iter().map(|finding| &finding.title).collect();
    assert!(
        !findings.is_empty(),
        "Aliased `import os.path as op` then `op.join(base, user)` should fire. Got: {:?}",
        titles
    );
}
/// A class that merely *defines* a method called `open` contains no call to a
/// file sink, so the detector must report nothing.
#[test]
fn test_skips_open_method_name_python() {
    let source = "class Reader:\n def open(self):\n return self\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("v.py", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let titles: Vec<_> = findings.iter().map(|finding| &finding.title).collect();
    assert!(
        findings.is_empty(),
        "Method named `open` is a definition, not a call site. Got: {:?}",
        titles
    );
}
/// `os.path.join('/base', '../' + request.args.get('x'))` — the request value
/// is concatenated onto a literal before being joined — must produce a finding.
#[test]
fn test_detects_path_join_with_concatenation_python() {
    let source = "import os\ndef f(request):\n return os.path.join('/base', '../' + request.args.get('x'))\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("v.py", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let titles: Vec<_> = findings.iter().map(|finding| &finding.title).collect();
    assert!(
        !findings.is_empty(),
        "os.path.join(base, '../' + user) should fire. Got: {:?}",
        titles
    );
}
/// `pathlib.Path(request.args.get('x')).read_text()` must produce a finding.
#[test]
fn test_detects_pathlib_path_with_user_input_python() {
    let source = "import pathlib\ndef f(request):\n return pathlib.Path(request.args.get('x')).read_text()\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("v.py", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    let titles: Vec<_> = findings.iter().map(|finding| &finding.title).collect();
    assert!(
        !findings.is_empty(),
        "pathlib.Path(user_input).read_text() should fire on the Path() constructor. Got: {:?}",
        titles
    );
}
/// `res.sendFile(req.params.path)` inside an Express-style route handler must
/// produce a finding whose title mentions either "download" or "traversal"
/// (compared case-insensitively).
#[test]
fn test_detects_express_sendfile_with_req_path_js() {
    let store = GraphBuilder::new().freeze();
    let detector = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &store,
        vec![(
            "v.js",
            "app.get('/x', (req, res) => { res.sendFile(req.params.path); });\n",
        )],
    );
    let findings = detector.detect(&ctx).expect("detection should succeed");
    assert!(
        !findings.is_empty(),
        "res.sendFile(req.params.path) should fire. Got: {:?}",
        findings.iter().map(|f| &f.title).collect::<Vec<_>>()
    );
    assert!(
        findings.iter().any(|f| {
            // Lowercase once per finding instead of once per keyword.
            let title = f.title.to_lowercase();
            title.contains("download") || title.contains("traversal")
        }),
        // Report the actual titles on failure, as the sibling tests do.
        "Finding title should mention download or traversal. Titles: {:?}",
        findings.iter().map(|f| &f.title).collect::<Vec<_>>()
    );
}
/// `open(request.args.get('file'))` in a file that imports `request` from
/// flask must fire, and the first finding must carry an elevated severity.
///
/// Note: the assertion accepts `High` *or* `Critical`, so this test does not
/// pin `Critical` specifically despite its name.
#[test]
fn test_taint_confirmed_boosts_to_critical() {
    let source = "from flask import request\ndef serve():\n return open(request.args.get('file')).read()\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("app.py", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");
    assert!(!findings.is_empty(), "Direct user-input sink should fire");

    let first = &findings[0];
    let elevated = matches!(first.severity, Severity::High | Severity::Critical);
    assert!(
        elevated,
        "Direct user-input sink (marker on the same expression) must be High or Critical; got {:?}",
        first.severity
    );
}
/// One function contains two `open(...)` calls: a string-literal path on
/// source line 2 and a request-derived path on source line 3. Exactly one
/// finding is expected, located on line 3, with severity `High` or `Critical`.
#[test]
fn test_severity_critical_for_user_input_low_for_static_literal() {
    let source = "def f(request):\n a = open('/etc/hosts').read()\n b = open(request.args.get('x')).read()\n return a + b\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PathTraversalDetector::new("/mock/repo");
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![("v.py", source)],
    );
    let findings = scanner.detect(&ctx).expect("detection should succeed");

    // Title, severity and line for every finding, shown if the count is wrong.
    let summary: Vec<_> = findings
        .iter()
        .map(|finding| (&finding.title, &finding.severity, finding.line_start))
        .collect();
    assert_eq!(
        findings.len(),
        1,
        "Expected exactly the user-input line to fire. Got: {:?}",
        summary
    );

    let only = &findings[0];
    let elevated = matches!(only.severity, Severity::High | Severity::Critical);
    assert!(
        elevated,
        "User-input sink should be at least High; got {:?}",
        only.severity
    );
    assert_eq!(
        only.line_start,
        Some(3),
        "Should fire on the user-input line (line 3), not the static-literal line (line 2)"
    );
}
}