use super::predict::{
extract_xxe_safe_reason, extract_xxe_vulnerable_source, matches_user_input, Evidence, XmlApi,
};
use crate::detectors::security::ast_helpers::{
collect_named_args, enclosing_python_function, node_text, python_kwarg_truthy,
};
use std::collections::HashSet;
use tree_sitter::Node;
/// A Python call expression that `collect_python_xml_sites` selected as an
/// XML parsing site, together with the API family it was classified as.
pub(super) struct PythonXmlSite<'a> {
/// The tree-sitter `call` node of the parse invocation.
pub call_node: Node<'a>,
/// API family produced by `classify_xml_api` for this call.
pub api: XmlApi,
}
/// Collects every Python call under `module_root` that looks like an XML
/// parse entry point.
///
/// Returns an empty list immediately when `collect_xml_imports` finds no XML
/// module imported at the top level of the module. Otherwise every node of
/// the tree is visited and a `call` node is kept when:
///
/// * its callee text passes `is_xml_parse_callee`, and
/// * `classify_xml_api` assigns it an API for which `is_python()` is true.
///
/// Sites are returned in traversal order. The traversal uses an explicit
/// stack, so later siblings are visited before earlier ones; the result is
/// therefore not in source order.
pub(super) fn collect_python_xml_sites<'a>(
    module_root: Node<'a>,
    source: &'a [u8],
) -> Vec<PythonXmlSite<'a>> {
    let imports = collect_xml_imports(module_root, source);
    if imports.is_empty() {
        return Vec::new();
    }

    let mut sites = Vec::new();
    // `Node::children` re-seats the cursor on the node it is called on, so a
    // single cursor can be reused for the whole traversal instead of cloning
    // a fresh one for every visited node.
    let mut cursor = module_root.walk();
    let mut stack: Vec<Node<'_>> = vec![module_root];

    while let Some(node) = stack.pop() {
        // Queue the children first so non-call nodes are still descended into.
        stack.extend(node.children(&mut cursor));

        if node.kind() != "call" {
            continue;
        }
        let Some(func) = node.child_by_field_name("function") else {
            continue;
        };
        let func_text = node_text(func, source).unwrap_or("");
        if !is_xml_parse_callee(func_text) {
            continue;
        }
        let api = classify_xml_api(node, source, &imports);
        if !api.is_python() {
            continue;
        }
        sites.push(PythonXmlSite {
            call_node: node,
            api,
        });
    }
    sites
}
/// Reports whether the final dotted segment of a callee expression is one of
/// the recognised XML parse / parser-constructor names.
///
/// Only the text after the last `.` is compared (the whole text when there is
/// no dot), and the comparison is case-sensitive.
fn is_xml_parse_callee(func_text: &str) -> bool {
    const PARSE_ENTRY_POINTS: [&str; 10] = [
        "parse",
        "parseString",
        "parseFile",
        "fromstring",
        "XMLParser",
        "XMLReader",
        "XMLTreeBuilder",
        "DocumentBuilder",
        "SAXParser",
        "iterparse",
    ];

    let callee = match func_text.rfind('.') {
        Some(dot) => &func_text[dot + 1..],
        None => func_text,
    };
    PARSE_ENTRY_POINTS.contains(&callee)
}
/// Builds the `Evidence` record for a single Python XML parse call.
///
/// * `call_node` - the `call` node being analysed.
/// * `module_root` - root of the same tree; scanned for top-level imports.
/// * `source` - the bytes the tree was parsed from.
/// * `lines` - the source split into lines, indexed by the call's start row.
///
/// The record is filled from:
///
/// * module imports (`import_defusedxml`, `import_lxml_etree`),
/// * the enclosing function and class names, when present,
/// * the API classification from `classify_xml_api`,
/// * hardening keyword arguments on the call itself and on `XMLParser(...)`
///   calls found by `scan_inline_xmlparser_kwargs`,
/// * a textual look-back over up to ten lines before the call for
///   user-controlled input (`matches_user_input`),
/// * `xxe-safe` / `xxe-vulnerable` annotations on the call's own line.
///
/// A call row that lies beyond `lines` never panics: the look-back window is
/// clamped to the available lines and the annotation lookup is skipped.
pub(super) fn extract_python_evidence<'a>(
    call_node: Node<'a>,
    module_root: Node<'a>,
    source: &'a [u8],
    lines: &[&str],
) -> Evidence {
    let mut ev = Evidence::default();

    // Import-level signals.
    let imports = collect_xml_imports(module_root, source);
    ev.import_defusedxml = imports
        .iter()
        .any(|m| m == "defusedxml" || m.starts_with("defusedxml."));
    ev.import_lxml_etree = imports
        .iter()
        .any(|m| m == "lxml" || m.starts_with("lxml."));

    // Lexical context of the call.
    if let Some(fn_node) = enclosing_python_function(call_node) {
        if let Some(name_node) = fn_node.child_by_field_name("name") {
            if let Some(name) = node_text(name_node, source) {
                ev.enclosing_function = Some(name.to_string());
            }
        }
    }
    ev.enclosing_class = enclosing_python_class_name(call_node, source);
    ev.api = Some(classify_xml_api(call_node, source, &imports));

    // Hardening keyword arguments passed directly to the call.
    let mut all_args: Vec<Node<'_>> = Vec::new();
    if let Some(args_node) = call_node.child_by_field_name("arguments") {
        all_args = collect_named_args(args_node);
    }
    ev.kw_resolve_entities_false = python_kwarg_falsy(&all_args, "resolve_entities", source);
    ev.kw_no_network_true = python_kwarg_truthy(&all_args, "no_network", source, false);
    ev.kw_forbid_dtd_true = python_kwarg_truthy(&all_args, "forbid_dtd", source, false);

    // Hardening keyword arguments on an inline `XMLParser(...)` argument.
    // This runs after the direct scan because it only ever raises flags.
    let mut cursor = call_node.walk();
    for arg in call_node.children(&mut cursor) {
        if arg.kind() == "argument_list" {
            scan_inline_xmlparser_kwargs(arg, source, &mut ev);
        }
    }

    // Look back over at most ten lines preceding the call. Both bounds are
    // clamped so that a row outside `lines` cannot cause a slice panic.
    // NOTE(review): the call's own line is not part of the window, so input
    // referenced directly in the call arguments is not matched here; confirm
    // that this is intended.
    let line_idx = call_node.start_position().row;
    let window_end = line_idx.min(lines.len());
    let window_start = line_idx.saturating_sub(10).min(window_end);
    ev.has_user_input_flow = lines[window_start..window_end]
        .iter()
        .any(|prev| matches_user_input(prev));

    // Annotations are read from the line on which the call starts.
    if let Some(line) = lines.get(line_idx) {
        ev.xxe_safe_annotation = extract_xxe_safe_reason(line);
        ev.xxe_vulnerable_annotation = extract_xxe_vulnerable_source(line);
    }
    ev
}
/// Gathers the names of XML-related modules imported by statements that are
/// direct children of `root`.
///
/// For `import a.b` and `import a.b as c` the dotted module name is recorded;
/// for `from a.b import c` the `module_name` text (`a.b`) is recorded. Only
/// names accepted by `is_xml_module` are kept. Imports nested inside other
/// statements are not visited.
fn collect_xml_imports<'a>(root: Node<'a>, source: &'a [u8]) -> HashSet<String> {
    let mut modules = HashSet::new();
    let mut stmt_cursor = root.walk();

    for stmt in root.children(&mut stmt_cursor) {
        if stmt.kind() == "import_from_statement" {
            let from_module = stmt
                .child_by_field_name("module_name")
                .and_then(|n| node_text(n, source));
            if let Some(name) = from_module {
                if is_xml_module(name) {
                    modules.insert(name.to_string());
                }
            }
            continue;
        }
        if stmt.kind() != "import_statement" {
            continue;
        }

        let mut item_cursor = stmt.walk();
        for item in stmt.children(&mut item_cursor) {
            if !item.is_named() {
                continue;
            }
            // `import x.y` exposes the module directly, `import x.y as z`
            // keeps it in the `name` field of the aliased import.
            let imported = match item.kind() {
                "dotted_name" => node_text(item, source),
                "aliased_import" => item
                    .child_by_field_name("name")
                    .and_then(|n| node_text(n, source)),
                _ => None,
            };
            match imported {
                Some(name) if is_xml_module(name) => {
                    modules.insert(name.to_string());
                }
                _ => {}
            }
        }
    }
    modules
}
/// Reports whether `name` is one of the known XML modules or a submodule of
/// one of them.
///
/// A match requires either equality with a table entry or the entry followed
/// by a `.`, so `xml.sax.handler` matches while `xmllint` does not.
fn is_xml_module(name: &str) -> bool {
    const XML_MODULES: &[&str] = &[
        "defusedxml",
        "lxml",
        "lxml.etree",
        "xml",
        "xml.etree",
        "xml.etree.ElementTree",
        "xml.etree.cElementTree",
        "xml.sax",
        "xml.dom",
        "xml.dom.minidom",
        "xml.dom.pulldom",
    ];
    // Strip the candidate prefix and inspect what is left; this avoids
    // building a temporary "<module>." string for every table entry.
    XML_MODULES.iter().any(|module| {
        name.strip_prefix(*module)
            .map_or(false, |rest| rest.is_empty() || rest.starts_with('.'))
    })
}
/// Maps names bound by top-level XML imports to the dotted path they refer to.
///
/// * `import xml.etree.ElementTree as ET` yields `ET -> xml.etree.ElementTree`.
/// * `from lxml import etree` yields `etree -> lxml.etree`.
/// * `from xml import sax as s` yields `s -> xml.sax`.
///
/// For `from` imports the imported member is appended to the module path.
/// Without it, `from xml import sax` would resolve to just `xml` and the
/// `xml.sax` / `xml.dom` / `xml.etree` prefix checks in `classify_xml_api`
/// could not recognise the binding.
///
/// Plain `import a.b` statements bind no alias and contribute nothing. Only
/// statements that are direct children of `root` are inspected, and only
/// modules accepted by `is_xml_module` are recorded.
fn collect_xml_aliases<'a>(
    root: Node<'a>,
    source: &'a [u8],
) -> std::collections::HashMap<String, String> {
    let mut map = std::collections::HashMap::new();
    let mut cursor = root.walk();
    for top in root.children(&mut cursor) {
        match top.kind() {
            "import_statement" => {
                let mut nc = top.walk();
                for child in top.children(&mut nc) {
                    // Only `import module as alias` introduces a new name.
                    if child.kind() != "aliased_import" {
                        continue;
                    }
                    let module = child
                        .child_by_field_name("name")
                        .and_then(|n| node_text(n, source));
                    let alias = child
                        .child_by_field_name("alias")
                        .and_then(|n| node_text(n, source));
                    if let (Some(m), Some(a)) = (module, alias) {
                        if is_xml_module(m) {
                            map.insert(a.to_string(), m.to_string());
                        }
                    }
                }
            }
            "import_from_statement" => {
                let Some(module_node) = top.child_by_field_name("module_name") else {
                    continue;
                };
                let Some(module) = node_text(module_node, source) else {
                    continue;
                };
                if !is_xml_module(module) {
                    continue;
                }
                let module_node_id = module_node.id();
                let mut nc = top.walk();
                for child in top.children(&mut nc) {
                    // Skip punctuation/keywords and the module path itself.
                    if !child.is_named() || child.id() == module_node_id {
                        continue;
                    }
                    // `imported` is the member taken from the module,
                    // `bound` is the local name it is available under.
                    let (imported, bound) = match child.kind() {
                        "dotted_name" => {
                            let name = node_text(child, source);
                            (name, name)
                        }
                        "aliased_import" => (
                            child
                                .child_by_field_name("name")
                                .and_then(|n| node_text(n, source)),
                            child
                                .child_by_field_name("alias")
                                .and_then(|n| node_text(n, source)),
                        ),
                        _ => continue,
                    };
                    let Some(bound) = bound else {
                        continue;
                    };
                    let target = match imported {
                        Some(member) => format!("{module}.{member}"),
                        None => module.to_string(),
                    };
                    map.insert(bound.to_string(), target);
                }
            }
            _ => {}
        }
    }
    map
}
/// Decides which XML API family a call belongs to.
///
/// Resolution happens in two stages:
///
/// 1. The callee's leftmost identifier is looked up in the alias table from
///    `collect_xml_aliases`. When no alias matches, the full dotted callee
///    text is used instead, so an unaliased call such as `xml.sax.parse(...)`
///    is recognised by its own prefix. The resolved path is then matched
///    against the `defusedxml`, `lxml`, `xml.etree`, `xml.sax` and `xml.dom`
///    prefixes, in that order.
/// 2. If the path matches nothing, the module's import set is used as a
///    coarse guess: defusedxml (only when the leftmost identifier starts with
///    `defused`), then lxml, then `xml.etree`, then `xml.sax` / `xml.dom`.
///
/// Returns `XmlApi::Unknown` when the call has no `function` field or nothing
/// matches.
///
/// NOTE(review): the alias table is rebuilt from the module root on every
/// call to this function.
fn classify_xml_api<'a>(
    call_node: Node<'a>,
    source: &'a [u8],
    imports: &HashSet<String>,
) -> XmlApi {
    let module_root = walk_to_module(call_node);
    let aliases = collect_xml_aliases(module_root, source);
    let Some(func) = call_node.child_by_field_name("function") else {
        return XmlApi::Unknown;
    };
    let func_text = node_text(func, source).unwrap_or("");
    let leftmost = leftmost_identifier(func_text);

    // Stage 1: resolve through the alias table, falling back to the whole
    // dotted callee. Using only the leftmost segment here would make the
    // dotted `xml.*` prefixes below unreachable for unaliased calls.
    let resolved_module = aliases
        .get(leftmost)
        .cloned()
        .unwrap_or_else(|| func_text.to_string());
    if resolved_module.starts_with("defusedxml") {
        return XmlApi::Defusedxml;
    }
    if resolved_module == "lxml" || resolved_module.starts_with("lxml.") {
        return XmlApi::LxmlEtree;
    }
    if resolved_module.starts_with("xml.etree") {
        return XmlApi::StdlibElementTree;
    }
    if resolved_module.starts_with("xml.sax") || resolved_module.starts_with("xml.dom") {
        return XmlApi::StdlibOther;
    }

    // Stage 2: the callee could not be tied to a module; guess from imports.
    if imports.iter().any(|m| m.starts_with("defusedxml")) && leftmost.starts_with("defused") {
        return XmlApi::Defusedxml;
    }
    if imports.contains("lxml.etree") || imports.contains("lxml") {
        return XmlApi::LxmlEtree;
    }
    if imports.iter().any(|m| m.starts_with("xml.etree")) {
        return XmlApi::StdlibElementTree;
    }
    if imports
        .iter()
        .any(|m| m.starts_with("xml.sax") || m.starts_with("xml.dom"))
    {
        return XmlApi::StdlibOther;
    }
    XmlApi::Unknown
}
/// Returns the part of `text` before its first `.`, or all of `text` when it
/// contains no dot.
fn leftmost_identifier(text: &str) -> &str {
    match text.find('.') {
        Some(dot) => &text[..dot],
        None => text,
    }
}
/// Climbs parent links from `node` and returns the topmost ancestor, i.e. the
/// first node that has no parent. Returns `node` itself when it has none.
fn walk_to_module(node: Node<'_>) -> Node<'_> {
    let mut topmost = node;
    loop {
        match topmost.parent() {
            Some(parent) => topmost = parent,
            None => return topmost,
        }
    }
}
/// Reports whether the keyword argument `name` is given the literal `False`.
///
/// Among `args`, the first `keyword_argument` whose name equals `name` and
/// whose value text can be read decides the result: `true` only when that
/// text, trimmed, is exactly `False`. Any other value, or the absence of the
/// keyword, gives `false`.
fn python_kwarg_falsy(args: &[Node<'_>], name: &str, source: &[u8]) -> bool {
    let first_value_text = args
        .iter()
        .filter(|arg| arg.kind() == "keyword_argument")
        .filter(|arg| {
            let keyword = arg
                .child_by_field_name("name")
                .and_then(|n| node_text(n, source));
            keyword == Some(name)
        })
        .find_map(|arg| {
            arg.child_by_field_name("value")
                .and_then(|v| node_text(v, source))
        });
    matches!(first_value_text, Some(text) if text.trim() == "False")
}
/// Looks for `XMLParser(...)` calls passed inline in `arg_list` and records
/// their hardening keyword arguments on `ev`.
///
/// Both positional arguments (`parse(src, XMLParser(...))`) and keyword
/// arguments (`parse(src, parser=XMLParser(...))`) are inspected; for a
/// keyword argument the call is taken from its `value` field. A call counts
/// as a parser constructor when its callee text ends with `XMLParser`.
///
/// Flags are only ever raised here, never cleared, so evidence already found
/// on the outer call is preserved.
fn scan_inline_xmlparser_kwargs<'a>(arg_list: Node<'a>, source: &'a [u8], ev: &mut Evidence) {
    let mut cursor = arg_list.walk();
    for child in arg_list.children(&mut cursor) {
        // `parser=XMLParser(...)` wraps the call in a keyword_argument node;
        // unwrap it so the keyword form is treated like the positional form.
        let candidate = if child.kind() == "keyword_argument" {
            match child.child_by_field_name("value") {
                Some(value) => value,
                None => continue,
            }
        } else {
            child
        };
        if candidate.kind() != "call" {
            continue;
        }
        let Some(func) = candidate.child_by_field_name("function") else {
            continue;
        };
        let func_text = node_text(func, source).unwrap_or("");
        if !func_text.ends_with("XMLParser") {
            continue;
        }
        let Some(inner_args) = candidate.child_by_field_name("arguments") else {
            continue;
        };
        let inner = collect_named_args(inner_args);

        // `||` short-circuits, so a flag that is already set is not re-checked.
        ev.kw_resolve_entities_false = ev.kw_resolve_entities_false
            || python_kwarg_falsy(&inner, "resolve_entities", source);
        ev.kw_no_network_true =
            ev.kw_no_network_true || python_kwarg_truthy(&inner, "no_network", source, false);
        ev.kw_forbid_dtd_true =
            ev.kw_forbid_dtd_true || python_kwarg_truthy(&inner, "forbid_dtd", source, false);
    }
}
/// Returns the name of the nearest `class_definition` that encloses `node`.
///
/// Ancestors are examined from the parent upwards. The search stops with
/// `None` at a `module` node, when the ancestors run out, or when the class
/// found has no readable `name` field.
fn enclosing_python_class_name<'a>(node: Node<'a>, source: &'a [u8]) -> Option<String> {
    let mut next = node.parent();
    while let Some(ancestor) = next {
        match ancestor.kind() {
            "class_definition" => {
                return ancestor
                    .child_by_field_name("name")
                    .and_then(|name| node_text(name, source))
                    .map(str::to_string);
            }
            "module" => return None,
            _ => next = ancestor.parent(),
        }
    }
    None
}
// Unit tests for Python XXE evidence extraction. Each test parses a small
// Python snippet, locates one call in it and checks the `Evidence` produced
// by `extract_python_evidence`, or exercises a pure helper directly.
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::parsers::lightweight::Language;
// Returns the first `call` node, in pre-order, whose callee text ends with
// the dotted segment `attr_name`. Panics when the tree has no such call.
fn first_call_with_attr<'tree>(
tree: &'tree tree_sitter::Tree,
source: &[u8],
attr_name: &str,
) -> tree_sitter::Node<'tree> {
// Recursive pre-order search: the node itself is tested before its children.
fn walk<'a>(
node: tree_sitter::Node<'a>,
source: &[u8],
attr_name: &str,
) -> Option<tree_sitter::Node<'a>> {
if node.kind() == "call" {
if let Some(func) = node.child_by_field_name("function") {
let text = node_text(func, source).unwrap_or("");
let last = text.rsplit('.').next().unwrap_or("");
if last == attr_name {
return Some(node);
}
}
}
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if let Some(found) = walk(child, source, attr_name) {
return Some(found);
}
}
None
}
walk(tree.root_node(), source, attr_name)
.unwrap_or_else(|| panic!("no call to {} found in source", attr_name))
}
// Parses `src` as Python, finds the first call to `attr` and extracts its
// evidence, using `src.lines()` as the line table.
fn extract(src: &str, attr: &str) -> Evidence {
let tree = parse_root_ext(src, Language::Python, "py").expect("parse python");
let root = tree.root_node();
let call = first_call_with_attr(&tree, src.as_bytes(), attr);
let lines: Vec<&str> = src.lines().collect();
extract_python_evidence(call, root, src.as_bytes(), &lines)
}
// An aliased defusedxml import sets the import flag and the API class.
#[test]
fn detects_defusedxml_import() {
let src = "import defusedxml.ElementTree as ET\nET.parse('x.xml')\n";
let ev = extract(src, "parse");
assert!(ev.import_defusedxml);
assert!(!ev.import_lxml_etree);
assert_eq!(ev.api, Some(XmlApi::Defusedxml));
}
// `from lxml import etree` is recognised as lxml.
#[test]
fn detects_lxml_etree_import() {
let src = "from lxml import etree\netree.parse('x.xml')\n";
let ev = extract(src, "parse");
assert!(ev.import_lxml_etree);
assert!(!ev.import_defusedxml);
assert_eq!(ev.api, Some(XmlApi::LxmlEtree));
}
// An aliased stdlib ElementTree import sets neither third-party flag.
#[test]
fn detects_stdlib_etree_import() {
let src = "import xml.etree.ElementTree as ET\nET.parse('x.xml')\n";
let ev = extract(src, "parse");
assert!(!ev.import_defusedxml);
assert!(!ev.import_lxml_etree);
assert_eq!(ev.api, Some(XmlApi::StdlibElementTree));
}
// An unaliased, fully dotted `xml.sax.parse` call is classified as StdlibOther.
#[test]
fn detects_xml_sax_import() {
let src = "import xml.sax\nxml.sax.parse('x.xml', handler)\n";
let ev = extract(src, "parse");
assert_eq!(ev.api, Some(XmlApi::StdlibOther));
}
// A name imported from `xml.dom` is classified as StdlibOther.
#[test]
fn detects_xml_dom_minidom_import() {
let src = "from xml.dom import minidom\nminidom.parse('x.xml')\n";
let ev = extract(src, "parse");
assert_eq!(ev.api, Some(XmlApi::StdlibOther));
}
// `resolve_entities=False` on an inline positional XMLParser is picked up.
#[test]
fn detects_resolve_entities_false_on_parser_constructor() {
let src = "\
from lxml import etree\n\
etree.parse(blob, etree.XMLParser(resolve_entities=False))\n";
let ev = extract(src, "parse");
assert!(ev.kw_resolve_entities_false);
}
// `no_network=True` on an inline positional XMLParser is picked up.
#[test]
fn detects_no_network_true() {
let src = "\
from lxml import etree\n\
etree.parse(blob, etree.XMLParser(no_network=True))\n";
let ev = extract(src, "parse");
assert!(ev.kw_no_network_true);
}
// `forbid_dtd=True` passed directly to the parse call is picked up.
#[test]
fn detects_forbid_dtd_true() {
let src = "\
import defusedxml.ElementTree as ET\n\
ET.parse(blob, forbid_dtd=True)\n";
let ev = extract(src, "parse");
assert!(ev.kw_forbid_dtd_true);
}
// An explicit `True` must not be reported as the hardening signal.
#[test]
fn resolve_entities_true_does_not_fire_falsy_signal() {
let src = "\
from lxml import etree\n\
etree.parse(blob, etree.XMLParser(resolve_entities=True))\n";
let ev = extract(src, "parse");
assert!(!ev.kw_resolve_entities_false);
}
// A variable as the keyword value is not treated as `False`.
#[test]
fn non_literal_kwarg_value_is_not_classified_falsy() {
let src = "\
from lxml import etree\n\
etree.parse(blob, etree.XMLParser(resolve_entities=flag))\n";
let ev = extract(src, "parse");
assert!(!ev.kw_resolve_entities_false);
}
// `request.data` on the line before the call is inside the look-back window.
#[test]
fn detects_request_data_within_lookback_window() {
let src = "\
import xml.etree.ElementTree as ET\n\
def handle(request):\n\
\x20 blob = request.data\n\
\x20 return ET.parse(blob)\n";
let ev = extract(src, "parse");
assert!(ev.has_user_input_flow);
}
// Reading a local file is not reported as user input.
#[test]
fn no_user_input_flow_for_local_file_read() {
let src = "\
import xml.etree.ElementTree as ET\n\
blob = open('config.xml').read()\n\
ET.parse(blob)\n";
let ev = extract(src, "parse");
assert!(!ev.has_user_input_flow);
}
// `request.files[...]` on the preceding line counts as user input.
#[test]
fn detects_uploaded_file_input() {
let src = "\
import xml.etree.ElementTree as ET\n\
uploaded = request.files['xml']\n\
ET.parse(uploaded)\n";
let ev = extract(src, "parse");
assert!(ev.has_user_input_flow);
}
// The name of the surrounding `def` is recorded.
#[test]
fn detects_enclosing_function() {
let src = "\
import xml.etree.ElementTree as ET\n\
def handle_upload(request):\n\
\x20 ET.parse(request.data)\n";
let ev = extract(src, "parse");
assert_eq!(ev.enclosing_function, Some("handle_upload".to_string()));
}
// The name of the surrounding `class` is recorded.
#[test]
fn detects_enclosing_class() {
let src = "\
import xml.etree.ElementTree as ET\n\
class UploadHandler:\n\
\x20 def parse(self, blob):\n\
\x20 ET.parse(blob)\n";
let ev = extract(src, "parse");
assert_eq!(ev.enclosing_class, Some("UploadHandler".to_string()));
}
// A module-level call has no enclosing class.
#[test]
fn no_enclosing_class_at_module_level() {
let src = "\
import xml.etree.ElementTree as ET\n\
ET.parse('x.xml')\n";
let ev = extract(src, "parse");
assert_eq!(ev.enclosing_class, None);
}
// An `xxe-safe[...]` comment on the call line yields the bracketed reason.
#[test]
fn detects_xxe_safe_annotation() {
let src = "\
import xml.etree.ElementTree as ET\n\
ET.parse(blob) # repotoire: xxe-safe[xsd-validated]\n";
let ev = extract(src, "parse");
assert_eq!(ev.xxe_safe_annotation, Some("xsd-validated".to_string()));
assert_eq!(ev.xxe_vulnerable_annotation, None);
}
// An `xxe-vulnerable[...]` comment on the call line yields the bracketed text.
#[test]
fn detects_xxe_vulnerable_annotation() {
let src = "\
import xml.etree.ElementTree as ET\n\
ET.parse(blob) # repotoire: xxe-vulnerable[audited]\n";
let ev = extract(src, "parse");
assert_eq!(ev.xxe_vulnerable_annotation, Some("audited".to_string()));
assert_eq!(ev.xxe_safe_annotation, None);
}
// Annotations of another kind leave both XXE annotation fields empty.
#[test]
fn ignores_unrelated_annotation_kinds() {
let src = "\
import xml.etree.ElementTree as ET\n\
ET.parse(blob) # repotoire: command-static[ok]\n";
let ev = extract(src, "parse");
assert_eq!(ev.xxe_safe_annotation, None);
assert_eq!(ev.xxe_vulnerable_annotation, None);
}
// Direct checks of the first-segment helper.
#[test]
fn leftmost_identifier_handles_dotted_chains() {
assert_eq!(leftmost_identifier("ET.parse"), "ET");
assert_eq!(leftmost_identifier("xml.etree.ElementTree.parse"), "xml");
assert_eq!(leftmost_identifier("parse"), "parse");
}
// Known modules and their submodules match; look-alikes such as `xmllint` do not.
#[test]
fn is_xml_module_matches_exact_and_submodules() {
assert!(is_xml_module("defusedxml"));
assert!(is_xml_module("defusedxml.ElementTree"));
assert!(is_xml_module("lxml.etree"));
assert!(is_xml_module("xml.etree.ElementTree"));
assert!(is_xml_module("xml.sax"));
assert!(is_xml_module("xml.dom.minidom"));
assert!(!is_xml_module("os"));
assert!(!is_xml_module("subprocess"));
assert!(!is_xml_module("xmllint"));
}
// The parser is built on a separate line and passed as a variable, so its
// keyword arguments are not seen and both hardening flags stay false.
#[test]
fn worked_example_canonical_lxml_safe_extraction() {
let src = "\
from lxml import etree\n\
def parse_user_xml(blob):\n\
\x20 parser = etree.XMLParser(resolve_entities=False, no_network=True)\n\
\x20 return etree.parse(blob, parser)\n";
let ev = extract(src, "parse");
assert!(ev.import_lxml_etree);
assert_eq!(ev.api, Some(XmlApi::LxmlEtree));
assert!(!ev.kw_resolve_entities_false);
assert!(!ev.kw_no_network_true);
}
// Same hardening as above, but with the parser constructed inline in the
// call, where both keyword arguments are detected.
#[test]
fn worked_example_canonical_lxml_safe_inline_extraction() {
let src = "\
from lxml import etree\n\
def parse_user_xml(blob):\n\
\x20 return etree.parse(blob, etree.XMLParser(resolve_entities=False, no_network=True))\n";
let ev = extract(src, "parse");
assert!(ev.import_lxml_etree);
assert_eq!(ev.api, Some(XmlApi::LxmlEtree));
assert!(ev.kw_resolve_entities_false);
assert!(ev.kw_no_network_true);
}
// Stdlib ElementTree parsing of request data inside a handler function.
#[test]
fn worked_example_canonical_realbug_extraction() {
let src = "\
import xml.etree.ElementTree as ET\n\
def handle_upload(request):\n\
\x20 blob = request.data\n\
\x20 return ET.parse(blob)\n";
let ev = extract(src, "parse");
assert!(!ev.import_defusedxml);
assert_eq!(ev.api, Some(XmlApi::StdlibElementTree));
assert!(ev.has_user_input_flow);
assert_eq!(ev.enclosing_function, Some("handle_upload".to_string()));
}
// Pins current behaviour: an unused defusedxml import still sets
// `import_defusedxml`, while the call itself resolves to stdlib ElementTree.
#[test]
fn unused_defusedxml_import_pins_v0_limitation() {
let src = "\
import defusedxml.ElementTree # not used\n\
import xml.etree.ElementTree as ET\n\
def parse_blob(blob):\n\
\x20 return ET.parse(blob)\n";
let ev = extract(src, "parse");
assert!(ev.import_defusedxml);
assert_eq!(ev.api, Some(XmlApi::StdlibElementTree));
}
// The annotation is recorded together with the other signals, not instead of them.
#[test]
fn xxe_safe_annotation_records_alongside_other_signals() {
let src = "\
import xml.etree.ElementTree as ET\n\
def handle_upload(request):\n\
\x20 blob = request.data\n\
\x20 return ET.parse(blob) # repotoire: xxe-safe[xsd-validated]\n";
let ev = extract(src, "parse");
assert_eq!(ev.xxe_safe_annotation, Some("xsd-validated".to_string()));
assert!(ev.has_user_input_flow);
assert_eq!(ev.enclosing_function, Some("handle_upload".to_string()));
assert_eq!(ev.api, Some(XmlApi::StdlibElementTree));
}
}