#![allow(dead_code)]
use super::predict::{
extract_command_static_reason, extract_command_user_controlled_source, matches_config_object,
matches_request_object, Argv0Origin, Evidence, FirstArgOrigin,
};
use crate::detectors::security::ast_helpers::{
collect_named_args, enclosing_python_function, node_text, python_function_param_names,
python_kwarg_truthy,
};
use tree_sitter::Node;
pub(super) fn extract_python_evidence<'a>(
call_node: Node<'a>,
source: &'a [u8],
lines: &[&str],
) -> Evidence {
let mut ev = Evidence::default();
let enclosing_fn = enclosing_python_function(call_node);
if let Some(fn_node) = enclosing_fn {
if let Some(name_node) = fn_node.child_by_field_name("name") {
if let Some(name) = node_text(name_node, source) {
ev.enclosing_function = Some(name.to_string());
}
}
}
ev.enclosing_class = enclosing_python_class_name(call_node, source);
let param_names: Vec<String> = enclosing_fn
.map(|fn_node| python_function_param_names(fn_node, source))
.unwrap_or_default();
let mut all_args: Vec<Node<'_>> = Vec::new();
let mut positional_args: Vec<Node<'_>> = Vec::new();
if let Some(args_node) = call_node.child_by_field_name("arguments") {
all_args = collect_named_args(args_node);
positional_args = all_args
.iter()
.filter(|a| a.kind() != "keyword_argument" && a.kind() != "comment")
.copied()
.collect();
}
ev.kw_shell_true = python_kwarg_truthy(&all_args, "shell", source, true);
if let Some(first) = positional_args.first() {
match first.kind() {
"list" | "tuple" => {
let (argv0, all_literals) = classify_list_form_args(*first, source, ¶m_names);
ev.argv0_origin = Some(argv0);
ev.argv_list_all_literals = all_literals;
}
_ => {
ev.first_arg_origin = Some(classify_first_arg_origin(*first, source, ¶m_names));
}
}
}
let line_idx = call_node.start_position().row;
if let Some(line) = lines.get(line_idx) {
ev.command_static_annotation = extract_command_static_reason(line);
ev.command_user_controlled_annotation = extract_command_user_controlled_source(line);
}
ev
}
/// Classifies the origin of a string-form command argument.
///
/// Rules, by node kind:
///
/// - `string`: `Literal` unless one of its children is an `interpolation`,
///   in which case `Unknown`.
/// - `concatenated_string`: `Literal` only when every child classifies as
///   `Literal`; otherwise `Unknown`.
/// - `identifier`: `Parameter { name }` when the identifier text is listed in
///   `param_names`; otherwise `Unknown`.
/// - `attribute` / `call` / `subscript`: the node text is tested with
///   `matches_request_object` first (`RequestSource`), then
///   `matches_config_object` (`ConfigSource`); otherwise `Unknown`.
/// - `parenthesized_expression`: the classification of its first named child.
/// - anything else, or a node whose text cannot be read: `Unknown`.
fn classify_first_arg_origin(
    node: Node<'_>,
    source: &[u8],
    param_names: &[String],
) -> FirstArgOrigin {
    match node.kind() {
        "string" => {
            let mut walker = node.walk();
            let interpolated = node
                .children(&mut walker)
                .any(|part| part.kind() == "interpolation");
            if interpolated {
                FirstArgOrigin::Unknown
            } else {
                FirstArgOrigin::Literal
            }
        }
        "concatenated_string" => {
            let mut walker = node.walk();
            // Stops at the first piece that is not a plain literal.
            let every_piece_literal = node.children(&mut walker).all(|piece| {
                classify_first_arg_origin(piece, source, param_names) == FirstArgOrigin::Literal
            });
            if every_piece_literal {
                FirstArgOrigin::Literal
            } else {
                FirstArgOrigin::Unknown
            }
        }
        "identifier" => match node_text(node, source) {
            Some(ident) if param_names.iter().any(|p| p == ident) => FirstArgOrigin::Parameter {
                name: ident.to_string(),
            },
            _ => FirstArgOrigin::Unknown,
        },
        "attribute" | "call" | "subscript" => match node_text(node, source) {
            // Request sources take precedence over config sources.
            Some(expr) if matches_request_object(expr) => FirstArgOrigin::RequestSource,
            Some(expr) if matches_config_object(expr) => FirstArgOrigin::ConfigSource,
            _ => FirstArgOrigin::Unknown,
        },
        "parenthesized_expression" => match node.named_child(0) {
            Some(inner) => classify_first_arg_origin(inner, source, param_names),
            None => FirstArgOrigin::Unknown,
        },
        _ => FirstArgOrigin::Unknown,
    }
}
/// Inspects a `list` / `tuple` argv expression.
///
/// Returns a pair:
///
/// - the origin of the first element as decided by `classify_argv0`, or
///   `Argv0Origin::Other` when the sequence has no elements;
/// - `true` when the sequence is non-empty and every element is a static
///   string literal according to `is_static_string_literal`.
///
/// `comment` nodes among the named children are not counted as elements.
fn classify_list_form_args(
    list_node: Node<'_>,
    source: &[u8],
    param_names: &[String],
) -> (Argv0Origin, bool) {
    let mut walker = list_node.walk();
    let items: Vec<Node<'_>> = list_node
        .named_children(&mut walker)
        .filter(|item| item.kind() != "comment")
        .collect();

    // An empty sequence is never reported as "all literals".
    let every_item_literal = !items.is_empty()
        && items
            .iter()
            .all(|item| is_static_string_literal(*item, source));

    let head_origin = match items.first() {
        Some(head) => classify_argv0(*head, source, param_names),
        None => Argv0Origin::Other,
    };

    (head_origin, every_item_literal)
}
/// Reports whether `node` is a string literal with no interpolation.
///
/// - A `string` node qualifies when none of its children is an
///   `interpolation`.
/// - A `concatenated_string` node qualifies when every child qualifies.
/// - Every other node kind yields `false`.
///
/// `_source` is not read; it is only forwarded on recursion.
fn is_static_string_literal(node: Node<'_>, _source: &[u8]) -> bool {
    let kind = node.kind();
    if kind != "string" && kind != "concatenated_string" {
        return false;
    }

    let mut walker = node.walk();
    if kind == "string" {
        let has_interpolation = node
            .children(&mut walker)
            .any(|part| part.kind() == "interpolation");
        return !has_interpolation;
    }

    let every_piece_static = node
        .children(&mut walker)
        .all(|piece| is_static_string_literal(piece, _source));
    every_piece_static
}
/// Classifies the first element (`argv[0]`) of a list-form command.
///
/// - `string` / `concatenated_string`: `Literal` when
///   `is_static_string_literal` accepts the node, otherwise `Other`.
/// - `identifier`: `Parameter { name }` when the identifier text is listed in
///   `param_names`, otherwise `Other`.
/// - any other node kind, or an identifier whose text cannot be read: `Other`.
fn classify_argv0(node: Node<'_>, source: &[u8], param_names: &[String]) -> Argv0Origin {
    let kind = node.kind();

    if kind == "string" || kind == "concatenated_string" {
        return if is_static_string_literal(node, source) {
            Argv0Origin::Literal
        } else {
            Argv0Origin::Other
        };
    }

    if kind != "identifier" {
        return Argv0Origin::Other;
    }

    match node_text(node, source) {
        Some(ident) if param_names.iter().any(|p| p == ident) => Argv0Origin::Parameter {
            name: ident.to_string(),
        },
        _ => Argv0Origin::Other,
    }
}
/// Finds the name of the nearest `class_definition` ancestor of `node`.
///
/// The walk starts at `node`'s parent and moves upward. It returns `None`
/// when a `module` node is reached first, when the ancestors run out, or when
/// the class node has no `name` field or its text cannot be read.
fn enclosing_python_class_name<'a>(node: Node<'a>, source: &'a [u8]) -> Option<String> {
    let mut ancestor = node.parent();
    while let Some(current) = ancestor {
        match current.kind() {
            "class_definition" => {
                return current
                    .child_by_field_name("name")
                    .and_then(|ident| node_text(ident, source))
                    .map(str::to_string);
            }
            // Reaching the module means there is no enclosing class.
            "module" => return None,
            _ => ancestor = current.parent(),
        }
    }
    None
}
// Unit tests for `extract_python_evidence`: each test parses a small Python
// snippet, locates one call in it, and checks the extracted `Evidence` fields.
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::parsers::lightweight::Language;
// Returns the first `call` node (depth-first, pre-order) whose callee is
// either an attribute access ending in `attr_name` or a bare identifier
// equal to `attr_name`. Panics when the tree contains no such call.
fn first_call_with_attr<'tree>(
tree: &'tree tree_sitter::Tree,
source: &[u8],
attr_name: &str,
) -> tree_sitter::Node<'tree> {
// Recursive helper: checks `node` itself, then descends into its children.
fn walk<'a>(
node: tree_sitter::Node<'a>,
source: &[u8],
attr_name: &str,
) -> Option<tree_sitter::Node<'a>> {
if node.kind() == "call" {
if let Some(func) = node.child_by_field_name("function") {
if func.kind() == "attribute" {
// `obj.attr(...)`: compare the trailing attribute name.
if let Some(attr) = func.child_by_field_name("attribute") {
if node_text(attr, source) == Some(attr_name) {
return Some(node);
}
}
} else if func.kind() == "identifier"
&& node_text(func, source) == Some(attr_name)
{
// `name(...)`: compare the bare callee name.
return Some(node);
}
}
}
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if let Some(found) = walk(child, source, attr_name) {
return Some(found);
}
}
None
}
walk(tree.root_node(), source, attr_name)
.unwrap_or_else(|| panic!("no call ending in `.{attr_name}` in source"))
}
// Parses `source` as Python, finds the first call matching `attr`, and runs
// `extract_python_evidence` on it with the source split into lines.
fn extract(source: &str, attr: &str) -> Evidence {
let bytes = source.as_bytes();
let tree = parse_root_ext(source, Language::Python, "py").expect("parse python");
let lines: Vec<&str> = source.lines().collect();
let call = first_call_with_attr(&tree, bytes, attr);
extract_python_evidence(call, bytes, &lines)
}
// A call inside a method reports both the class and the function name.
#[test]
fn extracts_enclosing_class_and_function() {
let src = "
import subprocess
class CmdRunner:
def run(self, name):
return subprocess.run(['echo', name])
";
let ev = extract(src, "run");
assert_eq!(ev.enclosing_class.as_deref(), Some("CmdRunner"));
assert_eq!(ev.enclosing_function.as_deref(), Some("run"));
}
// A call at module level has neither an enclosing function nor a class.
#[test]
fn module_level_call_has_no_enclosing_function() {
let src = "import os\nos.system('date')\n";
let ev = extract(src, "system");
assert!(ev.enclosing_function.is_none());
assert!(ev.enclosing_class.is_none());
}
// `shell=True` sets `kw_shell_true`.
#[test]
fn shell_true_literal_detected() {
let src = "
import subprocess
def run(cmd):
return subprocess.run(cmd, shell=True)
";
let ev = extract(src, "run");
assert!(ev.kw_shell_true);
}
// `shell=False` leaves `kw_shell_true` unset.
#[test]
fn shell_false_literal_not_detected() {
let src = "
import subprocess
def run(cmd):
return subprocess.run(['echo', cmd], shell=False)
";
let ev = extract(src, "run");
assert!(!ev.kw_shell_true);
}
// A call without a `shell=` keyword leaves `kw_shell_true` unset.
#[test]
fn shell_absent_kwarg_not_detected() {
let src = "
import subprocess
def run(cmd):
return subprocess.run(['echo', cmd])
";
let ev = extract(src, "run");
assert!(!ev.kw_shell_true);
}
// A non-literal `shell=` value is conservatively treated as truthy.
#[test]
fn shell_non_literal_treated_as_truthy_conservative() {
let src = "
import subprocess
def run(cmd, enable_shell):
return subprocess.run(cmd, shell=enable_shell)
";
let ev = extract(src, "run");
assert!(
ev.kw_shell_true,
"non-literal shell= value should be treated as truthy"
);
}
// String form with a plain literal: `first_arg_origin` is `Literal` and
// `argv0_origin` stays `None`.
#[test]
fn string_form_literal_first_arg() {
let src = "
import os
def run():
os.system('date')
";
let ev = extract(src, "system");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::Literal));
assert!(ev.argv0_origin.is_none(), "string-form: argv0_origin None");
}
// String form with a function parameter: `Parameter` carrying its name.
#[test]
fn string_form_parameter_first_arg() {
let src = "
import os
def run(cmd):
os.system(cmd)
";
let ev = extract(src, "system");
match ev.first_arg_origin {
Some(FirstArgOrigin::Parameter { ref name }) if name == "cmd" => {}
other => panic!("expected Parameter {{ cmd }}, got {other:?}"),
}
}
// String form reading from `request.GET[...]`: `RequestSource`.
#[test]
fn string_form_request_source_first_arg() {
let src = "
import os
def view(request):
os.system(request.GET['cmd'])
";
let ev = extract(src, "system");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::RequestSource));
}
// String form reading from `os.environ.get(...)`: `ConfigSource`.
#[test]
fn string_form_config_source_first_arg() {
let src = "
import os
def run():
os.system(os.environ.get('CMD'))
";
let ev = extract(src, "system");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::ConfigSource));
}
// An f-string with interpolation is classified as `Unknown`.
#[test]
fn string_form_fstring_interpolation_is_unknown() {
let src = "
import os
def run(name):
os.system(f'echo {name}')
";
let ev = extract(src, "system");
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::Unknown));
}
// List form of only string literals: all-literals flag set, argv0 `Literal`,
// and `first_arg_origin` stays `None`.
#[test]
fn list_form_all_static_literals() {
let src = "
import subprocess
def run():
subprocess.run(['git', 'status', '--porcelain'])
";
let ev = extract(src, "run");
assert!(ev.argv_list_all_literals);
assert_eq!(ev.argv0_origin, Some(Argv0Origin::Literal));
assert!(ev.first_arg_origin.is_none());
}
// A variable element clears the all-literals flag; argv0 is still `Literal`.
#[test]
fn list_form_with_variable_element_not_all_literals() {
let src = "
import subprocess
def run(branch):
subprocess.run(['git', 'checkout', branch])
";
let ev = extract(src, "run");
assert!(!ev.argv_list_all_literals);
assert_eq!(ev.argv0_origin, Some(Argv0Origin::Literal));
}
// A function parameter as argv0 yields `Parameter` carrying its name.
#[test]
fn list_form_param_argv0() {
let src = "
import subprocess
def run(binary, arg):
subprocess.run([binary, arg])
";
let ev = extract(src, "run");
assert!(!ev.argv_list_all_literals);
match ev.argv0_origin {
Some(Argv0Origin::Parameter { ref name }) if name == "binary" => {}
other => panic!("expected Parameter {{ binary }}, got {other:?}"),
}
}
// A call expression as argv0 falls into `Other`.
#[test]
fn list_form_other_argv0() {
let src = "
import subprocess
def run():
subprocess.run([get_binary(), 'arg'])
";
let ev = extract(src, "run");
assert_eq!(ev.argv0_origin, Some(Argv0Origin::Other));
assert!(!ev.argv_list_all_literals);
}
// A tuple argument is handled the same way as a list.
#[test]
fn list_form_tuple_works_same_as_list() {
let src = "
import subprocess
def run():
subprocess.run(('git', 'status'))
";
let ev = extract(src, "run");
assert!(ev.argv_list_all_literals);
assert_eq!(ev.argv0_origin, Some(Argv0Origin::Literal));
}
// An interpolated f-string as argv0 is `Other`, not `Literal`.
#[test]
fn list_form_fstring_argv0_is_other() {
let src = "
import subprocess
def run(x):
subprocess.run([f'/bin/{x}', 'arg'])
";
let ev = extract(src, "run");
assert!(!ev.argv_list_all_literals);
assert_eq!(ev.argv0_origin, Some(Argv0Origin::Other));
}
// A `command-static[...]` comment on the call line is captured; the
// user-controlled annotation stays `None`.
#[test]
fn command_static_annotation_extracted() {
let src = "
import subprocess
def run(branch):
subprocess.run(['git', 'checkout', branch]) # repotoire: command-static[allowlisted]
";
let ev = extract(src, "run");
assert_eq!(ev.command_static_annotation.as_deref(), Some("allowlisted"));
assert!(ev.command_user_controlled_annotation.is_none());
}
// A `command-user-controlled[...]` comment on the call line is captured; the
// static annotation stays `None`.
#[test]
fn command_user_controlled_annotation_extracted() {
let src = "
import subprocess
def run(branch):
subprocess.run(['git', 'checkout', branch]) # repotoire: command-user-controlled[GET]
";
let ev = extract(src, "run");
assert_eq!(
ev.command_user_controlled_annotation.as_deref(),
Some("GET")
);
assert!(ev.command_static_annotation.is_none());
}
// An unrelated `internal-path[...]` annotation sets neither command field.
#[test]
fn internal_path_annotation_ignored_by_command_extractor() {
let src = "
import subprocess
def run(p):
subprocess.run(['cat', p]) # repotoire: internal-path[ok]
";
let ev = extract(src, "run");
assert!(ev.command_static_annotation.is_none());
assert!(ev.command_user_controlled_annotation.is_none());
}
// Without any annotation comment both annotation fields are `None`.
#[test]
fn no_annotation_yields_none() {
let src = "
import subprocess
def run(p):
subprocess.run(['cat', p])
";
let ev = extract(src, "run");
assert!(ev.command_static_annotation.is_none());
assert!(ev.command_user_controlled_annotation.is_none());
}
// Combined case: an interpolated f-string command with `shell=True`.
#[test]
fn worked_example_2_shell_true_param_interpolation() {
let src = "
import subprocess
def run(name):
subprocess.run(f'echo {name}', shell=True)
";
let ev = extract(src, "run");
assert!(ev.kw_shell_true);
assert_eq!(ev.first_arg_origin, Some(FirstArgOrigin::Unknown));
assert!(ev.argv0_origin.is_none());
}
// Combined case: a literal argv0 followed by a parameter, without `shell=`.
#[test]
fn worked_example_4_mixed_list_literal_argv0() {
let src = "
import subprocess
def open_url(url):
subprocess.run(['xdg-open', url])
";
let ev = extract(src, "run");
assert_eq!(ev.argv0_origin, Some(Argv0Origin::Literal));
assert!(!ev.argv_list_all_literals);
assert!(!ev.kw_shell_true);
}
}