use super::predict::{
extract_internal_path_reason, extract_user_controlled_source, matches_config_object,
matches_request_object, Evidence, FirstArgOrigin,
};
use crate::detectors::security::ast_helpers::{
collect_named_args, enclosing_python_function, node_text, python_function_param_names,
};
use tree_sitter::Node;
/// Gathers the [`Evidence`] signals for a single Python call expression.
///
/// The following fields are filled in, each independently of the others:
///
/// * `enclosing_function` / `enclosing_class` — names of the function and
///   class (if any) that lexically contain `call_node`.
/// * `first_arg_origin` — classification of the first positional argument
///   (keyword arguments and comments inside the argument list are skipped).
///   Left at its default when the call has no positional arguments.
/// * `basename_applied` — `true` when any positional argument contains a
///   `basename(...)` call somewhere in its subtree.
/// * `internal_path_annotation` / `user_controlled_annotation` — parsed from
///   the text of the line on which the call starts.
///
/// `source` is the raw buffer the tree was parsed from, and `lines` is that
/// same text split into lines; it is indexed with the call's start row, and an
/// out-of-range row simply leaves both annotation fields untouched.
pub(super) fn extract_python_evidence<'a>(
    call_node: Node<'a>,
    source: &'a [u8],
    lines: &[&str],
) -> Evidence {
    let mut ev = Evidence::default();

    // Lexical context: enclosing function (if any) and enclosing class (if any).
    let enclosing_fn = enclosing_python_function(call_node);
    if let Some(name) = enclosing_fn
        .and_then(|fn_node| fn_node.child_by_field_name("name"))
        .and_then(|name_node| node_text(name_node, source))
    {
        ev.enclosing_function = Some(name.to_string());
    }
    ev.enclosing_class = enclosing_python_class_name(call_node, source);

    // Parameter names of the enclosing function; empty at module level.
    let param_names: Vec<String> = enclosing_fn
        .map(|fn_node| python_function_param_names(fn_node, source))
        .unwrap_or_default();

    // Positional arguments only: keyword arguments and stray comments inside
    // the argument list are not candidates for "first argument".
    let positional_args: Vec<Node<'_>> = call_node
        .child_by_field_name("arguments")
        .map(|args_node| {
            collect_named_args(args_node)
                .into_iter()
                .filter(|arg| !matches!(arg.kind(), "keyword_argument" | "comment"))
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    if let Some(&first) = positional_args.first() {
        ev.first_arg_origin = Some(classify_first_arg_origin(first, source, &param_names));
    }
    ev.basename_applied = positional_args
        .iter()
        .any(|&arg| expression_contains_os_path_basename(arg, source));

    // Annotations are only looked up on the line where the call begins.
    if let Some(line) = lines.get(call_node.start_position().row) {
        ev.internal_path_annotation = extract_internal_path_reason(line);
        ev.user_controlled_annotation = extract_user_controlled_source(line);
    }

    ev
}
/// Classifies where the value of a call's first positional argument comes from.
///
/// Decision table, keyed on the syntax node kind:
///
/// * `string` — `Literal`, unless it has an `interpolation` child, in which
///   case `Unknown`.
/// * `concatenated_string` — `Literal` only when every child classifies as
///   `Literal`; otherwise `Unknown`.
/// * `identifier` — `Parameter { name }` when the text equals one of
///   `param_names`; otherwise `Unknown`.
/// * `attribute` / `call` / `subscript` — `RequestSource` or `ConfigSource`
///   when the expression text matches the respective predicate (request is
///   checked first); otherwise `Unknown`.
/// * `parenthesized_expression` — the classification of its first named child.
/// * anything else — `Unknown`.
fn classify_first_arg_origin(
    node: Node<'_>,
    source: &[u8],
    param_names: &[String],
) -> FirstArgOrigin {
    match node.kind() {
        "string" => {
            let mut walker = node.walk();
            let has_interpolation = node
                .children(&mut walker)
                .any(|part| part.kind() == "interpolation");
            if has_interpolation {
                FirstArgOrigin::Unknown
            } else {
                FirstArgOrigin::Literal
            }
        }
        "concatenated_string" => {
            let mut walker = node.walk();
            // Stops at the first piece that is not a plain literal.
            let all_literal = node.children(&mut walker).all(|piece| {
                classify_first_arg_origin(piece, source, param_names) == FirstArgOrigin::Literal
            });
            if all_literal {
                FirstArgOrigin::Literal
            } else {
                FirstArgOrigin::Unknown
            }
        }
        "identifier" => match node_text(node, source) {
            Some(ident) if param_names.iter().any(|p| p == ident) => FirstArgOrigin::Parameter {
                name: ident.to_string(),
            },
            _ => FirstArgOrigin::Unknown,
        },
        "attribute" | "call" | "subscript" => match node_text(node, source) {
            // Request matching takes precedence over config matching.
            Some(expr) if matches_request_object(expr) => FirstArgOrigin::RequestSource,
            Some(expr) if matches_config_object(expr) => FirstArgOrigin::ConfigSource,
            _ => FirstArgOrigin::Unknown,
        },
        "parenthesized_expression" => node
            .named_child(0)
            .map_or(FirstArgOrigin::Unknown, |inner| {
                classify_first_arg_origin(inner, source, param_names)
            }),
        _ => FirstArgOrigin::Unknown,
    }
}
fn enclosing_python_class_name<'a>(node: Node<'a>, source: &'a [u8]) -> Option<String> {
let mut cur = node.parent()?;
loop {
if cur.kind() == "class_definition" {
let name = cur.child_by_field_name("name")?;
return node_text(name, source).map(str::to_string);
}
if cur.kind() == "module" {
return None;
}
cur = cur.parent()?;
}
}
/// Reports whether `expr` or any node in its subtree is a `basename` call as
/// recognised by [`is_os_path_basename_call`].
///
/// The node itself is tested first; otherwise each child is searched
/// recursively, in order, stopping at the first hit.
fn expression_contains_os_path_basename<'a>(expr: Node<'a>, source: &'a [u8]) -> bool {
    if is_os_path_basename_call(expr, source) {
        return true;
    }
    let mut walker = expr.walk();
    // Bound to a local so the child iterator's borrow of `walker` ends here.
    let found_in_subtree = expr
        .children(&mut walker)
        .any(|descendant| expression_contains_os_path_basename(descendant, source));
    found_in_subtree
}
/// Reports whether `node` is a call to `basename` in one of the accepted forms.
///
/// Accepted callee shapes:
///
/// * a bare identifier `basename(...)`;
/// * `path.basename(...)`, where the receiver is the identifier `path`;
/// * `<expr>.path.basename(...)`, where the receiver is an attribute access
///   whose final attribute is `path` (for example `os.path.basename`). The
///   expression before `.path` is not inspected.
///
/// Any other node kind or callee shape yields `false`.
fn is_os_path_basename_call<'a>(node: Node<'a>, source: &'a [u8]) -> bool {
    if node.kind() != "call" {
        return false;
    }
    let Some(callee) = node.child_by_field_name("function") else {
        return false;
    };

    // Bare `basename(...)`.
    if callee.kind() == "identifier" {
        return node_text(callee, source) == Some("basename");
    }
    if callee.kind() != "attribute" {
        return false;
    }

    // `<receiver>.basename(...)`: the method name must be exactly `basename`.
    let method_name = callee
        .child_by_field_name("attribute")
        .and_then(|attr| node_text(attr, source));
    if method_name != Some("basename") {
        return false;
    }
    let Some(receiver) = callee.child_by_field_name("object") else {
        return false;
    };

    // The receiver must be `path` itself or end in `.path`.
    let receiver_tail = match receiver.kind() {
        "identifier" => node_text(receiver, source),
        "attribute" => receiver
            .child_by_field_name("attribute")
            .and_then(|attr| node_text(attr, source)),
        _ => None,
    };
    receiver_tail == Some("path")
}
// Unit tests: each test parses a small Python snippet, locates a call by the
// name of its callee, runs `extract_python_evidence` on it and checks
// individual `Evidence` fields.
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::parsers::lightweight::Language;
// Returns the first `call` node, in pre-order (node before its children),
// whose callee is either an attribute access whose attribute text equals
// `attr_name` or a bare identifier equal to `attr_name`.
// Panics when the tree contains no such call.
fn first_call_with_attr<'tree>(
tree: &'tree tree_sitter::Tree,
source: &[u8],
attr_name: &str,
) -> tree_sitter::Node<'tree> {
// Recursive pre-order search used by the outer function.
fn walk<'a>(
node: tree_sitter::Node<'a>,
source: &[u8],
attr_name: &str,
) -> Option<tree_sitter::Node<'a>> {
if node.kind() == "call" {
if let Some(func) = node.child_by_field_name("function") {
if func.kind() == "attribute" {
// Method-style callee, e.g. `os.path.join` matches "join".
if let Some(attr) = func.child_by_field_name("attribute") {
if node_text(attr, source) == Some(attr_name) {
return Some(node);
}
}
} else if func.kind() == "identifier"
&& node_text(func, source) == Some(attr_name)
{
// Bare callee, e.g. `open(...)` matches "open".
return Some(node);
}
}
}
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if let Some(found) = walk(child, source, attr_name) {
return Some(found);
}
}
None
}
walk(tree.root_node(), source, attr_name)
.unwrap_or_else(|| panic!("no call ending in `.{attr_name}` in source"))
}
// Test driver: parses `source` as Python, splits it into lines, finds the
// first call whose callee matches `attr`, and returns the extracted evidence.
fn extract(source: &str, attr: &str) -> Evidence {
let bytes = source.as_bytes();
let tree = parse_root_ext(source, Language::Python, "py").expect("parse python");
let lines: Vec<&str> = source.lines().collect();
let call = first_call_with_attr(&tree, bytes, attr);
extract_python_evidence(call, bytes, &lines)
}
// A call inside a method reports both the method name and the class name.
#[test]
fn extracts_enclosing_class_and_function() {
let src = "
import os
class FileServer:
def serve(self, name):
return open(os.path.join('/var/www', name))
";
let ev = extract(src, "open");
assert_eq!(ev.enclosing_class.as_deref(), Some("FileServer"));
assert_eq!(ev.enclosing_function.as_deref(), Some("serve"));
}
// A call at module level has neither an enclosing function nor a class.
#[test]
fn module_level_call_has_no_enclosing_function() {
let src = "import os\nopen(os.path.join('/tmp', 'x'))\n";
let ev = extract(src, "open");
assert!(ev.enclosing_function.is_none());
assert!(ev.enclosing_class.is_none());
}
// A plain string literal in first position is classified as Literal.
#[test]
fn literal_first_arg_classified_as_literal() {
let src = "import os\np = os.path.join('/var/www', x)\n";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::Literal));
}
// A bytes literal in first position is also expected to be Literal.
#[test]
fn bytes_literal_first_arg_classified_as_literal() {
let src = "import os\np = os.path.join(b'/var/www', x)\n";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::Literal));
}
// An f-string containing an interpolation is not a literal: Unknown.
#[test]
fn fstring_with_interpolation_first_arg_is_unknown() {
let src = "import os\np = os.path.join(f'/var/{base}', x)\n";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::Unknown));
}
// Only the FIRST positional argument is classified: here it is the literal
// '/var/www', so the parameter `name` in second position does not matter.
// NOTE(review): the test name says "classified_as_parameter" but the
// assertion expects Literal — the name looks stale; consider renaming.
#[test]
fn first_arg_matching_parameter_classified_as_parameter() {
let src = "
import os
def serve(name):
return open(os.path.join('/var/www', name))
";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::Literal));
}
// An identifier in first position that names a parameter of the enclosing
// function is classified as Parameter carrying that name.
#[test]
fn parameter_in_first_position_classified_as_parameter() {
let src = "
import os
def get_path(folder, name):
return os.path.join(folder, name)
";
let ev = extract(src, "join");
match ev.first_arg_origin {
Some(FirstArgOrigin::Parameter { ref name }) if name == "folder" => {}
other => panic!("expected Parameter {{ folder }}, got {other:?}"),
}
}
// An identifier that is a local variable rather than a parameter: Unknown.
#[test]
fn non_param_identifier_in_first_position_is_unknown() {
let src = "
import os
def make():
folder = compute_folder()
return os.path.join(folder, 'x')
";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::Unknown));
}
// A subscript on `request.GET` is classified as RequestSource.
#[test]
fn request_args_first_arg_classified_as_request() {
let src = "
def view(request):
return open(request.GET['file'])
";
let ev = extract(src, "open");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::RequestSource));
}
// A subscript on `request.args` is classified as RequestSource.
#[test]
fn flask_request_first_arg_classified_as_request() {
let src = "
import os
from flask import request
def view():
return os.path.join(request.args['name'])
";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::RequestSource));
}
// An attribute on `settings` is classified as ConfigSource.
#[test]
fn settings_attribute_first_arg_classified_as_config() {
let src = "
import os
from django.conf import settings
def make():
return os.path.join(settings.BASE_DIR, 'static')
";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::ConfigSource));
}
// A call to `os.environ.get(...)` is classified as ConfigSource.
#[test]
fn os_environ_get_first_arg_classified_as_config() {
let src = "
import os
def make():
return os.path.join(os.environ.get('HOME'), 'data')
";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::ConfigSource));
}
// A call to `os.path.expanduser(...)` is classified as ConfigSource.
#[test]
fn os_path_expanduser_first_arg_classified_as_config() {
let src = "
import os
def make():
return os.path.join(os.path.expanduser('~'), 'data')
";
let ev = extract(src, "join");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::ConfigSource));
}
// `os.path.basename(...)` anywhere among the positional arguments sets
// `basename_applied`.
#[test]
fn os_path_basename_wrapping_detected() {
let src = "
import os
def serve(name):
return open(os.path.join('/var/www', os.path.basename(name)))
";
let ev = extract(src, "join");
assert!(ev.basename_applied);
}
// Without any basename call the flag stays false.
#[test]
fn no_basename_wrapping_not_detected() {
let src = "
import os
def serve(name):
return open(os.path.join('/var/www', name))
";
let ev = extract(src, "join");
assert!(!ev.basename_applied);
}
// A bare `basename(...)` call (imported by name) is detected as well.
#[test]
fn bare_basename_call_detected() {
let src = "
import os
from os.path import basename
def serve(name):
return open(os.path.join('/var/www', basename(name)))
";
let ev = extract(src, "join");
assert!(ev.basename_applied);
}
// A trailing `internal-path[...]` comment on the call's line is captured,
// and the user-controlled annotation stays unset.
#[test]
fn internal_path_annotation_extracted() {
let src = "import os\np = os.path.join(folder, x) # repotoire: internal-path[validated]\n";
let ev = extract(src, "join");
assert_eq!(ev.internal_path_annotation.as_deref(), Some("validated"));
assert!(ev.user_controlled_annotation.is_none());
}
// A trailing `user-controlled[...]` comment on the call's line is captured,
// and the internal-path annotation stays unset.
#[test]
fn user_controlled_annotation_extracted() {
let src = "import os\np = os.path.join(folder, x) # repotoire: user-controlled[GET]\n";
let ev = extract(src, "join");
assert_eq!(ev.user_controlled_annotation.as_deref(), Some("GET"));
assert!(ev.internal_path_annotation.is_none());
}
// No comment on the line: both annotation fields are None.
#[test]
fn no_annotation_yields_none() {
let src = "import os\np = os.path.join(folder, x)\n";
let ev = extract(src, "join");
assert!(ev.internal_path_annotation.is_none());
assert!(ev.user_controlled_annotation.is_none());
}
// A `repotoire:` comment of a different kind is ignored by both extractors.
#[test]
fn unrelated_annotation_yields_none() {
let src = "import os\np = os.path.join(folder, x) # repotoire: protocol-required[RFC]\n";
let ev = extract(src, "join");
assert!(ev.internal_path_annotation.is_none());
assert!(ev.user_controlled_annotation.is_none());
}
// End-to-end shape check: `folder` is a parameter that is conditionally
// reassigned inside the function, and it is still classified as Parameter.
// NOTE(review): the test name suggests the fixture is modelled on a specific
// finding in click's utils module — confirm against the project's corpus.
#[test]
fn click_utils_489_shape_extracts_canonical_signals() {
let src = "
import os
def get_app_dir(app_name, folder=None):
if folder is None:
folder = os.environ.get('XDG_CONFIG_HOME', '~/.config')
return os.path.join(folder, app_name)
";
let ev = extract(src, "join");
assert_eq!(ev.enclosing_function.as_deref(), Some("get_app_dir"));
match ev.first_arg_origin {
Some(FirstArgOrigin::Parameter { ref name }) if name == "folder" => {}
other => panic!("expected Parameter {{ folder }}, got {other:?}"),
}
assert!(!ev.basename_applied);
assert!(ev.internal_path_annotation.is_none());
}
}