use super::predict::{extract_protocol_required_rfc, Evidence};
use crate::detectors::security::ast_helpers::{
collect_named_args, enclosing_python_function, node_text,
};
use tree_sitter::Node;
/// Collects the syntactic evidence surrounding a Python call node.
///
/// The returned [`Evidence`] starts from `Evidence::default()` and is filled with:
/// - the name of the enclosing function (as located by `enclosing_python_function`)
///   and of the enclosing class, when present;
/// - for the first argument that is neither a keyword argument nor a comment: its
///   text when it is a bare identifier, and whether it syntactically contains an
///   `os.urandom(...)` call;
/// - whether `usedforsecurity=False` is passed;
/// - whether the call result is sliced;
/// - whatever `extract_protocol_required_rfc` finds on the line where the call starts.
///
/// `lines` is indexed by the call's starting row. Fields for which nothing is found
/// keep their default values.
pub(super) fn extract_python_evidence<'a>(
    call_node: Node<'a>,
    source: &'a [u8],
    lines: &[&str],
) -> Evidence {
    let mut evidence = Evidence::default();

    // Enclosing scopes: function name first, then class name.
    let function_name = enclosing_python_function(call_node)
        .and_then(|definition| definition.child_by_field_name("name"))
        .and_then(|ident| node_text(ident, source));
    if let Some(name) = function_name {
        evidence.enclosing_function = Some(name.to_string());
    }
    evidence.enclosing_class = enclosing_python_class_name(call_node, source);

    // Argument-derived signals only exist when the call has an argument list.
    if let Some(arg_list) = call_node.child_by_field_name("arguments") {
        let call_args = collect_named_args(arg_list);
        let leading_positional = call_args
            .iter()
            .copied()
            .find(|candidate| !matches!(candidate.kind(), "keyword_argument" | "comment"));
        if let Some(positional) = leading_positional {
            evidence.first_arg_ident = python_arg_as_identifier(positional, source);
            evidence.input_includes_urandom = expression_contains_os_urandom(positional, source);
        }
        evidence.usedforsecurity_false = has_usedforsecurity_false(&call_args, source);
    }

    evidence.result_truncated = call_result_is_sliced(call_node);

    // The annotation is looked up on the source line where the call begins.
    let row = call_node.start_position().row;
    if let Some(text) = lines.get(row) {
        evidence.protocol_required_annotation = extract_protocol_required_rfc(text);
    }

    evidence
}
/// Returns the name of the nearest `class_definition` ancestor of `node`.
///
/// The walk starts at `node`'s parent and stops with `None` as soon as a `module`
/// node is reached or the ancestors run out. A class without a `name` field, or
/// whose name text cannot be read, also yields `None`.
fn enclosing_python_class_name<'a>(node: Node<'a>, source: &'a [u8]) -> Option<String> {
    let mut ancestor = node.parent();
    while let Some(scope) = ancestor {
        match scope.kind() {
            "class_definition" => {
                return scope
                    .child_by_field_name("name")
                    .and_then(|ident| node_text(ident, source))
                    .map(str::to_string);
            }
            "module" => return None,
            _ => ancestor = scope.parent(),
        }
    }
    None
}
/// Returns the text of `arg` when it is a bare `identifier` node, `None` for any
/// other node kind (literals, calls, binary expressions, ...).
fn python_arg_as_identifier<'a>(arg: Node<'a>, source: &'a [u8]) -> Option<String> {
    match arg.kind() {
        "identifier" => node_text(arg, source).map(str::to_string),
        _ => None,
    }
}
/// Reports whether `expr` is, or contains anywhere in its subtree, a call of the
/// form `os.urandom(...)`.
///
/// This is a purely syntactic check on the given expression; it does not follow
/// variables back to their assignments.
fn expression_contains_os_urandom<'a>(expr: Node<'a>, source: &'a [u8]) -> bool {
    if is_os_urandom_call(expr, source) {
        return true;
    }

    // Bound to a local so the iterator borrowing `cursor` is dropped before `cursor`.
    let mut cursor = expr.walk();
    let found_in_subtree = expr
        .children(&mut cursor)
        .any(|child| expression_contains_os_urandom(child, source));
    found_in_subtree
}
/// Reports whether `node` is a `call` whose callee is the attribute access
/// `os.urandom` — i.e. the object text is exactly `os` and the attribute text is
/// exactly `urandom`.
fn is_os_urandom_call<'a>(node: Node<'a>, source: &'a [u8]) -> bool {
    if node.kind() != "call" {
        return false;
    }

    // The callee must be an attribute access; plain names such as `urandom(8)` do not match.
    let callee = match node.child_by_field_name("function") {
        Some(function) if function.kind() == "attribute" => function,
        _ => return false,
    };

    match (
        callee.child_by_field_name("object"),
        callee.child_by_field_name("attribute"),
    ) {
        (Some(receiver), Some(member)) => {
            node_text(receiver, source) == Some("os")
                && node_text(member, source) == Some("urandom")
        }
        _ => false,
    }
}
/// Reports whether `args` contains a `usedforsecurity` keyword argument whose value
/// node has kind `false` (the literal `False`).
///
/// A missing keyword, `True`, or any non-literal value (e.g. a variable) yields `false`.
fn has_usedforsecurity_false(args: &[Node<'_>], source: &[u8]) -> bool {
    use crate::detectors::security::ast_helpers::python_kwarg_value;

    matches!(
        python_kwarg_value(args, "usedforsecurity", source),
        Some(kwarg_value) if kwarg_value.kind() == "false"
    )
}
/// Reports whether the value produced by `call_node` is sliced, as in
/// `hashlib.sha1(s).hexdigest()[:8]`.
///
/// Walks up at most five ancestors, stepping through `attribute`, `call` and
/// `parenthesized_expression` nodes. On reaching a `subscript`, the answer is `true`
/// only when the expression being walked is the subscripted value and the subscript
/// has a `slice` child; a plain index such as `[0]` is not a truncation. Any other
/// ancestor kind, or running out of ancestors or hops, yields `false`.
fn call_result_is_sliced(call_node: Node<'_>) -> bool {
    // Upper bound on ancestor hops before giving up.
    const MAX_HOPS: usize = 5;

    let mut current = call_node;
    for _ in 0..MAX_HOPS {
        let Some(parent) = current.parent() else {
            return false;
        };
        match parent.kind() {
            "subscript" => {
                // Only count the slice when our chain is the thing being subscripted.
                // If the chain sits inside the brackets (e.g. `table[h.digest(), 1:2]`)
                // the slice applies to `table`, not to the hash result.
                let is_subscripted_value = matches!(
                    parent.child_by_field_name("value"),
                    Some(value) if value.id() == current.id()
                );
                if !is_subscripted_value {
                    return false;
                }

                // Bound to a local so the iterator borrowing `cursor` is dropped first.
                let mut cursor = parent.walk();
                let has_slice = parent
                    .children(&mut cursor)
                    .any(|child| child.kind() == "slice");
                return has_slice;
            }
            // These wrappers keep us on the same value chain; keep climbing.
            "attribute" | "call" | "parenthesized_expression" => current = parent,
            _ => return false,
        }
    }
    false
}
// Unit tests for the Python evidence extractor. Each test parses a small Python
// snippet, locates a call by its trailing attribute name (e.g. `sha1`, `md5`), and
// checks individual fields of the resulting `Evidence`.
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::parsers::lightweight::Language;
/// Finds, depth-first from the tree root, the first `call` node whose callee is an
/// `attribute` with attribute name equal to `attr_name`.
///
/// Panics when the tree contains no such call.
fn first_call_with_attr<'tree>(
tree: &'tree tree_sitter::Tree,
source: &[u8],
attr_name: &str,
) -> tree_sitter::Node<'tree> {
// Recursive pre-order search: test the node itself, then each child in order.
fn walk<'a>(
node: tree_sitter::Node<'a>,
source: &[u8],
attr_name: &str,
) -> Option<tree_sitter::Node<'a>> {
if node.kind() == "call" {
if let Some(func) = node.child_by_field_name("function") {
if func.kind() == "attribute" {
if let Some(attr) = func.child_by_field_name("attribute") {
if node_text(attr, source) == Some(attr_name) {
return Some(node);
}
}
}
}
}
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if let Some(found) = walk(child, source, attr_name) {
return Some(found);
}
}
None
}
walk(tree.root_node(), source, attr_name)
.unwrap_or_else(|| panic!("no call ending in `.{attr_name}` in source"))
}
/// Parses `source` as Python, locates the first call ending in `.attr`, and runs
/// `extract_python_evidence` on it with the source split into lines.
fn extract(source: &str, attr: &str) -> Evidence {
let bytes = source.as_bytes();
let tree = parse_root_ext(source, Language::Python, "py").expect("parse python");
let lines: Vec<&str> = source.lines().collect();
let call = first_call_with_attr(&tree, bytes, attr);
extract_python_evidence(call, bytes, &lines)
}
/// A call inside a method reports both the class name and the method name.
#[test]
fn extracts_enclosing_class_and_function() {
let src = "
class DigestAuth:
def _get_client_nonce(self, s):
return hashlib.sha1(s).hexdigest()[:8]
";
let ev = extract(src, "sha1");
assert_eq!(ev.enclosing_class.as_deref(), Some("DigestAuth"));
assert_eq!(ev.enclosing_function.as_deref(), Some("_get_client_nonce"));
}
/// A call at module level has neither an enclosing class nor an enclosing function.
#[test]
fn module_level_call_has_no_enclosing_class() {
let src = "import hashlib\nh = hashlib.sha1(b'data')\n";
let ev = extract(src, "sha1");
assert!(ev.enclosing_class.is_none());
assert!(ev.enclosing_function.is_none());
}
/// A call inside a free function reports the function name but no class.
#[test]
fn function_level_call_has_no_enclosing_class() {
let src = "
def hash_thing(x):
return hashlib.sha1(x).digest()
";
let ev = extract(src, "sha1");
assert!(ev.enclosing_class.is_none());
assert_eq!(ev.enclosing_function.as_deref(), Some("hash_thing"));
}
/// A bare identifier as first argument is captured by name.
#[test]
fn first_arg_identifier_extracted() {
let src = "import hashlib\nh = hashlib.sha1(password)\n";
let ev = extract(src, "sha1");
assert_eq!(ev.first_arg_ident.as_deref(), Some("password"));
}
/// A bytes literal as first argument produces no identifier.
#[test]
fn first_arg_string_literal_yields_no_identifier() {
let src = "import hashlib\nh = hashlib.sha1(b'data')\n";
let ev = extract(src, "sha1");
assert!(ev.first_arg_ident.is_none());
}
/// A binary expression as first argument produces no identifier.
#[test]
fn first_arg_expression_yields_no_identifier() {
let src = "import hashlib\nh = hashlib.sha1(s + nonce)\n";
let ev = extract(src, "sha1");
assert!(ev.first_arg_ident.is_none());
}
/// A nested call as first argument produces no identifier.
#[test]
fn first_arg_call_yields_no_identifier() {
let src = "import hashlib\nh = hashlib.sha1(get_password())\n";
let ev = extract(src, "sha1");
assert!(ev.first_arg_ident.is_none());
}
/// Slicing the hexdigest (`[:8]`) sets `result_truncated`.
#[test]
fn hexdigest_truncated_is_detected() {
let src = "import hashlib\nh = hashlib.sha1(s).hexdigest()[:8]\n";
let ev = extract(src, "sha1");
assert!(ev.result_truncated);
}
/// A hexdigest with no subscript leaves `result_truncated` unset.
#[test]
fn untruncated_hexdigest_is_not_detected() {
let src = "import hashlib\nh = hashlib.sha1(s).hexdigest()\n";
let ev = extract(src, "sha1");
assert!(!ev.result_truncated);
}
/// Indexing with a single integer (`[0]`) is not treated as truncation.
#[test]
fn digest_indexed_with_int_is_not_truncation() {
let src = "import hashlib\nh = hashlib.sha1(s).digest()[0]\n";
let ev = extract(src, "sha1");
assert!(!ev.result_truncated);
}
/// `os.urandom(...)` passed directly as the first argument is detected.
#[test]
fn input_with_os_urandom_is_detected() {
let src = "import os, hashlib\nh = hashlib.sha1(os.urandom(8)).hexdigest()\n";
let ev = extract(src, "sha1");
assert!(ev.input_includes_urandom);
}
/// `os.urandom(...)` nested inside a larger first-argument expression is detected.
#[test]
fn input_with_nested_os_urandom_is_detected() {
let src = "import os, hashlib\nh = hashlib.sha1(os.urandom(8) + nonce).hexdigest()\n";
let ev = extract(src, "sha1");
assert!(ev.input_includes_urandom);
}
/// A first argument with no `os.urandom` call leaves the flag unset.
#[test]
fn input_without_os_urandom_is_not_detected() {
let src = "import hashlib\nh = hashlib.sha1(b'data').hexdigest()\n";
let ev = extract(src, "sha1");
assert!(!ev.input_includes_urandom);
}
/// `usedforsecurity=False` sets `usedforsecurity_false`.
#[test]
fn usedforsecurity_false_kwarg_is_detected() {
let src = "import hashlib\nh = hashlib.md5(b'data', usedforsecurity=False)\n";
let ev = extract(src, "md5");
assert!(ev.usedforsecurity_false);
}
/// `usedforsecurity=True` does not set `usedforsecurity_false`.
#[test]
fn usedforsecurity_true_is_not_detected_as_false() {
let src = "import hashlib\nh = hashlib.md5(b'data', usedforsecurity=True)\n";
let ev = extract(src, "md5");
assert!(!ev.usedforsecurity_false);
}
/// A variable as the keyword value is not treated as `False`.
#[test]
fn usedforsecurity_variable_is_not_detected_as_false() {
let src = "import hashlib\nh = hashlib.md5(b'data', usedforsecurity=flag)\n";
let ev = extract(src, "md5");
assert!(!ev.usedforsecurity_false);
}
/// Omitting the keyword entirely leaves `usedforsecurity_false` unset.
#[test]
fn missing_usedforsecurity_kwarg_is_false() {
let src = "import hashlib\nh = hashlib.md5(b'data')\n";
let ev = extract(src, "md5");
assert!(!ev.usedforsecurity_false);
}
/// A `protocol-required[...]` comment on the call's line yields the bracketed value.
#[test]
fn protocol_required_annotation_extracted() {
let src = "import hashlib\nh = hashlib.sha1(s) # repotoire: protocol-required[RFC7616]\n";
let ev = extract(src, "sha1");
assert_eq!(ev.protocol_required_annotation.as_deref(), Some("RFC7616"),);
}
/// A call line with no trailing comment yields no annotation.
#[test]
fn no_annotation_yields_none() {
let src = "import hashlib\nh = hashlib.sha1(s)\n";
let ev = extract(src, "sha1");
assert!(ev.protocol_required_annotation.is_none());
}
/// A `repotoire:` comment of a different kind yields no protocol-required annotation.
#[test]
fn unrelated_annotation_yields_none() {
let src = "import hashlib\nh = hashlib.sha1(s) # repotoire: low-entropy[md5]\n";
let ev = extract(src, "sha1");
assert!(ev.protocol_required_annotation.is_none());
}
/// End-to-end example where the hashed value is a local variable assigned earlier.
/// The class, the identifier `s` and the truncation are reported, but the urandom
/// flag stays unset because only the argument expression itself is inspected.
#[test]
fn worked_example_extracts_all_four_signals() {
let src = "
import os
import hashlib
class DigestAuth:
def _get_client_nonce(self):
s = b''
s = os.urandom(8) + s
return hashlib.sha1(s).hexdigest()[:8]
";
let ev = extract(src, "sha1");
assert_eq!(ev.enclosing_class.as_deref(), Some("DigestAuth"));
assert_eq!(ev.first_arg_ident.as_deref(), Some("s"));
assert!(ev.result_truncated);
assert!(
!ev.input_includes_urandom,
"first_arg_ident=`s` doesn't itself contain os.urandom; data flow is out of scope"
);
assert!(!ev.usedforsecurity_false);
assert!(ev.protocol_required_annotation.is_none());
}
/// End-to-end example with `os.urandom(8)` passed inline: class, truncation and the
/// urandom flag are reported, and there is no first-argument identifier.
#[test]
fn worked_example_inline_extracts_all_four_signals() {
let src = "
import os
import hashlib
class DigestAuth:
def _get_client_nonce(self):
return hashlib.sha1(os.urandom(8)).hexdigest()[:8]
";
let ev = extract(src, "sha1");
assert_eq!(ev.enclosing_class.as_deref(), Some("DigestAuth"));
assert!(ev.first_arg_ident.is_none());
assert!(ev.result_truncated);
assert!(ev.input_includes_urandom);
}
}