use super::annotation::parse_python_comment;
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
/// XML parsing API family that a flagged call site is attributed to.
///
/// `predict` substitutes `Unknown` when the evidence carries no API.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum XmlApi {
/// Labelled `defusedxml`; `predict` short-circuits to Benign for this API.
Defusedxml,
/// Labelled `lxml.etree`.
LxmlEtree,
/// Labelled `xml.etree.ElementTree`; counted as stdlib-unsafe.
StdlibElementTree,
/// Labelled `xml.sax / xml.dom`; counted as stdlib-unsafe.
StdlibOther,
/// Fallback with the generic label `XML parser`.
Unknown,
}
impl XmlApi {
    /// Display name of the parser family, interpolated into finding titles,
    /// descriptions and annotation examples.
    pub(super) fn callee_label(self) -> &'static str {
        match self {
            Self::Unknown => "XML parser",
            Self::StdlibOther => "xml.sax / xml.dom",
            Self::StdlibElementTree => "xml.etree.ElementTree",
            Self::LxmlEtree => "lxml.etree",
            Self::Defusedxml => "defusedxml",
        }
    }

    /// Returns `true` for the two stdlib variants (`StdlibElementTree` and
    /// `StdlibOther`) and `false` for everything else.
    pub(super) fn is_stdlib_unsafe(self) -> bool {
        match self {
            Self::StdlibElementTree | Self::StdlibOther => true,
            Self::Defusedxml | Self::LxmlEtree | Self::Unknown => false,
        }
    }

    /// Returns `true` for every variant except `Unknown`.
    pub(super) fn is_python(self) -> bool {
        match self {
            Self::Unknown => false,
            Self::Defusedxml
            | Self::LxmlEtree
            | Self::StdlibElementTree
            | Self::StdlibOther => true,
        }
    }
}
// Signed evidence weights used by `predict`. Positive weights push the summed
// score toward Benign, negative weights toward RealBug.

/// Added when the evidence reports a `defusedxml` import.
const W_IMPORT_DEFUSEDXML: f32 = 0.30;
/// Added when the evidence reports an `lxml.etree` import.
const W_IMPORT_LXML_ETREE: f32 = 0.10;
/// Added when the call passes `resolve_entities=False`.
const W_KW_RESOLVE_ENTITIES_FALSE: f32 = 0.40;
/// Added when the call passes `no_network=True`.
const W_KW_NO_NETWORK_TRUE: f32 = 0.30;
/// Added when the call passes `forbid_dtd=True`.
const W_KW_FORBID_DTD_TRUE: f32 = 0.40;
/// Added when the enclosing function name matches the test lexicon.
const W_ENCLOSING_TEST_FUNCTION: f32 = 0.15;
/// Added when user input is reported as flowing into the parse call.
const W_USER_INPUT_FLOW: f32 = -0.50;
/// Added when the enclosing function name matches the handler lexicon
/// (only checked if the test lexicon did not match).
const W_ENCLOSING_HANDLER: f32 = -0.30;
/// Added when the API is one of the stdlib variants.
const W_STDLIB_UNSAFE_PARSER: f32 = -0.20;
/// Weight recorded on the single reason emitted by the defusedxml-call
/// short-circuit in `predict`; that path returns early, so it is never summed.
const W_API_DEFUSEDXML_CALL: f32 = 1.0;
/// Substrings that mark a piece of source text as carrying user input.
///
/// `matches_user_input` lowercases the text before testing containment, so
/// every entry here must itself be lowercase.
const USER_INPUT_SUBSTRINGS: &[&str] = &[
"req.body",
"req.file",
"req.files",
"request.data",
"request.body",
"request.files",
"request.get_data",
"uploaded",
"file_content",
"getinputstream",
];
/// Substrings that mark a (lowercased) function name as test/fixture code.
/// `predict` checks this list before the handler list.
const TEST_FUNCTION_SUBSTRINGS: &[&str] = &["test_", "_test", "fixture", "setup", "teardown"];
/// Substrings that mark a (lowercased) function name as a request handler.
/// Only consulted by `predict` when the test lexicon did not match.
const HANDLER_FUNCTION_SUBSTRINGS: &[&str] = &[
"handler",
"route",
"endpoint",
"view",
"controller",
"middleware",
"request",
"response",
"upload",
"import",
"parse",
];
/// Signals gathered about one XML parse call site; the input to `predict`.
///
/// The derived `Default` yields "no evidence": every option is `None` and
/// every flag is `false`.
#[derive(Debug, Clone, Default, PartialEq)]
pub(super) struct Evidence {
/// Parser API of the call; `predict` treats `None` as `XmlApi::Unknown`.
pub api: Option<XmlApi>,
/// Name of the enclosing function, matched against the test and handler
/// lexicons.
pub enclosing_function: Option<String>,
/// Name of the enclosing class. Not read by any code visible in this file.
pub enclosing_class: Option<String>,
/// A `defusedxml` import was seen (contributes `W_IMPORT_DEFUSEDXML`).
pub import_defusedxml: bool,
/// An `lxml.etree` import was seen (contributes `W_IMPORT_LXML_ETREE`).
pub import_lxml_etree: bool,
/// `resolve_entities=False` was seen (contributes
/// `W_KW_RESOLVE_ENTITIES_FALSE`).
pub kw_resolve_entities_false: bool,
/// `no_network=True` was seen (contributes `W_KW_NO_NETWORK_TRUE`).
pub kw_no_network_true: bool,
/// `forbid_dtd=True` was seen (contributes `W_KW_FORBID_DTD_TRUE`).
pub kw_forbid_dtd_true: bool,
/// User input reaches the parser (contributes `W_USER_INPUT_FLOW`) and
/// raises the RealBug severity from High to Critical.
pub has_user_input_flow: bool,
/// Reason text of an `xxe-safe` annotation; when present `predict` returns
/// Benign immediately. Checked before `xxe_vulnerable_annotation`.
pub xxe_safe_annotation: Option<String>,
/// Source text of an `xxe-vulnerable` annotation; when present (and no
/// `xxe-safe` annotation exists) `predict` returns RealBug immediately.
pub xxe_vulnerable_annotation: Option<String>,
}
impl Evidence {
    /// Test-only constructor for evidence with no signals set; identical to
    /// the derived `Default` value.
    #[cfg(test)]
    pub(super) fn empty() -> Self {
        <Self as Default>::default()
    }
}
/// Result of `predict`: the chosen branch plus the opposite interpretation.
#[derive(Debug, Clone)]
pub(super) struct Prediction {
/// Branch the predictor chose.
pub predicted: BranchLabel,
/// The opposite branch, with its own severity, title, description and
/// suggested fix.
pub alternative_branch: AlternativeBranch,
/// Severity assigned to the predicted branch.
pub predicted_severity: Severity,
/// Weighted reasons that contributed to the verdict.
pub reasons: Vec<PredictionReason>,
/// Resolution signals; filled by the annotation/defusedxml short-circuits
/// and left empty by the weighted-sum path.
pub resolutions: Vec<ResolutionSignal>,
}
pub(super) fn predict(evidence: &Evidence) -> Prediction {
let api = evidence.api.unwrap_or(XmlApi::Unknown);
let api_label = api.callee_label();
if let Some(reason) = &evidence.xxe_safe_annotation {
return collapse(
BranchLabel::Benign,
api,
evidence.has_user_input_flow,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: xxe-safe[{reason}]"),
},
description: format!(
"`xxe-safe[{reason}]` annotation declares this XML \
parse as protected (caller-side validation, XSD \
pre-check, etc.); the finding collapses to Info."
),
example: Some(format!("{api_label}(...) # repotoire: xxe-safe[{reason}]")),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("xxe-safe[{reason}] annotation"),
},
weight: 1.0,
note: format!(
"Annotated as caller-validated ({reason}); not an \
XXE risk."
),
},
);
}
if let Some(source) = &evidence.xxe_vulnerable_annotation {
return collapse(
BranchLabel::RealBug,
api,
evidence.has_user_input_flow,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: xxe-vulnerable[{source}]"),
},
description: format!(
"`xxe-vulnerable[{source}]` annotation declares this \
XML parser as exposed to attacker-controlled XML; \
the finding stays at the existing severity."
),
example: Some(format!(
"{api_label}(...) # repotoire: xxe-vulnerable[{source}]"
)),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("xxe-vulnerable[{source}] annotation"),
},
weight: -1.0,
note: format!("Annotated as XXE-exposed (source: {source})."),
},
);
}
if matches!(api, XmlApi::Defusedxml) {
return collapse(
BranchLabel::Benign,
api,
evidence.has_user_input_flow,
ResolutionSignal {
kind: ResolutionKind::StructuralPattern {
description: "call on defusedxml API (safe-by-construction)".to_string(),
},
description: "`defusedxml.*` parsers are safe-by-construction against XXE: \
entity resolution, DTD processing, and external network \
fetches are all disabled by default. The input source \
(user-controlled or not) is irrelevant once entity \
resolution is off at the parser level."
.to_string(),
example: Some(format!("{api_label}(...)")),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "parse call uses defusedxml (safe-by-construction)".to_string(),
},
weight: W_API_DEFUSEDXML_CALL,
note: "The call site is on a `defusedxml.*` API. defusedxml \
disables all entity-resolution defaults, so user-\
controlled XML cannot mount an XXE attack. Strongest \
Benign signal in the v0 model."
.to_string(),
},
);
}
let mut sum: f32 = 0.0;
let mut reasons: Vec<PredictionReason> = Vec::new();
if evidence.import_defusedxml {
sum += W_IMPORT_DEFUSEDXML;
reasons.push(PredictionReason {
kind: PredictionReasonKind::ImportPresence {
module: "defusedxml".to_string(),
},
weight: W_IMPORT_DEFUSEDXML,
note: "`defusedxml` is safe-by-default for XML parsing. \
The import is weak signal (file-scoped, not call-\
scoped per v0); a stdlib parse in the same file can \
still flip the verdict to RealBug."
.to_string(),
});
}
if evidence.import_lxml_etree {
sum += W_IMPORT_LXML_ETREE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::ImportPresence {
module: "lxml.etree".to_string(),
},
weight: W_IMPORT_LXML_ETREE,
note: "`lxml.etree` is capable of safe configuration via \
`resolve_entities=False` / `no_network=True` but is \
not safe-by-default. Weak signal on its own; pairs \
with the kwarg signals."
.to_string(),
});
}
if evidence.kw_resolve_entities_false {
sum += W_KW_RESOLVE_ENTITIES_FALSE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::KeywordArgument {
name: "resolve_entities".to_string(),
value: "False".to_string(),
},
weight: W_KW_RESOLVE_ENTITIES_FALSE,
note: "`resolve_entities=False` on the lxml parser disables \
external entity resolution; the OWASP-recommended \
protection against XXE."
.to_string(),
});
}
if evidence.kw_no_network_true {
sum += W_KW_NO_NETWORK_TRUE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::KeywordArgument {
name: "no_network".to_string(),
value: "True".to_string(),
},
weight: W_KW_NO_NETWORK_TRUE,
note: "`no_network=True` closes the SSRF-via-XXE leg by \
preventing the parser from fetching external entities \
over the network."
.to_string(),
});
}
if evidence.kw_forbid_dtd_true {
sum += W_KW_FORBID_DTD_TRUE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::KeywordArgument {
name: "forbid_dtd".to_string(),
value: "True".to_string(),
},
weight: W_KW_FORBID_DTD_TRUE,
note: "`forbid_dtd=True` is the defusedxml opt-in to refuse \
any DOCTYPE declaration; closes the entire XXE class."
.to_string(),
});
}
if evidence.has_user_input_flow {
sum += W_USER_INPUT_FLOW;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "user input flows to parser within 10 lines".to_string(),
},
weight: W_USER_INPUT_FLOW,
note: "Request body / uploaded data / file content flows \
into the parse call within a 10-line lookback window. \
Attacker controls the XML being parsed."
.to_string(),
});
}
if api.is_stdlib_unsafe() {
sum += W_STDLIB_UNSAFE_PARSER;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "parser uses stdlib xml module without protection".to_string(),
},
weight: W_STDLIB_UNSAFE_PARSER,
note: "The stdlib `xml.*` modules are documented as unsafe \
by default in Python ≤ 3.11 \
(https://docs.python.org/3/library/xml.html#xml-vulnerabilities). \
Without explicit hardening, this parser will resolve \
external entities."
.to_string(),
});
}
if let Some(fn_name) = &evidence.enclosing_function {
if matches_test_function(fn_name) {
sum += W_ENCLOSING_TEST_FUNCTION;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "function".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_TEST_FUNCTION,
note: format!(
"Enclosing function `{fn_name}` looks like a \
test/fixture; test code rarely the actionable \
security target."
),
});
} else if matches_handler_function(fn_name) {
sum += W_ENCLOSING_HANDLER;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "request_handler".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_HANDLER,
note: format!(
"Enclosing function `{fn_name}` looks like a request \
handler (`handler`/`route`/`endpoint`/`view`/\
`controller`/`upload`/`parse`); higher prior on \
attacker-reachable code."
),
});
}
}
let predicted = if sum > 0.0 {
BranchLabel::Benign
} else {
BranchLabel::RealBug
};
build_prediction(
predicted,
api,
evidence.has_user_input_flow,
reasons,
Vec::new(),
)
}
/// Returns `true` when `text`, compared case-insensitively, contains any
/// entry of `USER_INPUT_SUBSTRINGS`.
pub(super) fn matches_user_input(text: &str) -> bool {
    let haystack = text.to_lowercase();
    for needle in USER_INPUT_SUBSTRINGS {
        if haystack.contains(*needle) {
            return true;
        }
    }
    false
}
/// Returns `true` when the lowercased function `name` contains any entry of
/// `TEST_FUNCTION_SUBSTRINGS`.
fn matches_test_function(name: &str) -> bool {
    let haystack = name.to_lowercase();
    for needle in TEST_FUNCTION_SUBSTRINGS {
        if haystack.contains(*needle) {
            return true;
        }
    }
    false
}
/// Returns `true` when the lowercased function `name` contains any entry of
/// `HANDLER_FUNCTION_SUBSTRINGS`.
fn matches_handler_function(name: &str) -> bool {
    let haystack = name.to_lowercase();
    for needle in HANDLER_FUNCTION_SUBSTRINGS {
        if haystack.contains(*needle) {
            return true;
        }
    }
    false
}
/// Builds a prediction for a short-circuit verdict: exactly one reason and
/// exactly one resolution signal.
fn collapse(
    label: BranchLabel,
    api: XmlApi,
    has_user_input: bool,
    resolution: ResolutionSignal,
    reason: PredictionReason,
) -> Prediction {
    let reasons = vec![reason];
    let resolutions = vec![resolution];
    build_prediction(label, api, has_user_input, reasons, resolutions)
}
/// Assembles a [`Prediction`] for `predicted` and fills in the opposite
/// branch (severity, title, description, suggested fix) as the alternative.
fn build_prediction(
    predicted: BranchLabel,
    api: XmlApi,
    has_user_input: bool,
    reasons: Vec<PredictionReason>,
    resolutions: Vec<ResolutionSignal>,
) -> Prediction {
    let callee = api.callee_label();
    let other = predicted.opposite();

    Prediction {
        predicted,
        alternative_branch: AlternativeBranch {
            label: other,
            severity: severity_for_branch(other, has_user_input),
            title: title_for_branch(other, callee),
            description: description_for_branch(other, callee),
            suggested_fix: suggested_fix_for_branch(other, callee),
        },
        predicted_severity: severity_for_branch(predicted, has_user_input),
        reasons,
        resolutions,
    }
}
/// Maps a branch to its severity: Benign is always Info; RealBug is Critical
/// when user input reaches the parser and High otherwise.
fn severity_for_branch(label: BranchLabel, has_user_input: bool) -> Severity {
    match (label, has_user_input) {
        (BranchLabel::Benign, _) => Severity::Info,
        (BranchLabel::RealBug, true) => Severity::Critical,
        (BranchLabel::RealBug, false) => Severity::High,
    }
}
/// Finding title for the given branch, naming the parser via `api_label`.
fn title_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => {
            format!("XML parse via {api_label} appears safely configured (informational)")
        }
        BranchLabel::RealBug => {
            format!("Potential XXE vulnerability in {api_label} parse")
        }
    }
}
/// Long-form finding description for the given branch, naming the parser via
/// `api_label`.
fn description_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => format!(
            "The `{api_label}` parser appears to be either safe-by-\
             default (defusedxml) or explicitly hardened \
             (`resolve_entities=False`, `no_network=True`). The call \
             site is carried as Info; the RealBug interpretation is \
             preserved in `alternative_branch` in case the predictor \
             is wrong."
        ),
        BranchLabel::RealBug => format!(
            "The `{api_label}` parser appears to be operating without \
             protections against external entity resolution. XXE \
             vulnerabilities allow attackers to read arbitrary files \
             (`file:///etc/passwd`), perform SSRF \
             (`http://internal-server/`), launch denial-of-service \
             attacks (billion laughs), and port-scan internal networks."
        ),
    }
}
/// Remediation text for the given branch. Both branches currently produce
/// `Some`; the API label is accepted but unused.
fn suggested_fix_for_branch(label: BranchLabel, _api_label: &str) -> Option<String> {
    let advice = match label {
        BranchLabel::Benign => {
            "If this is intentional safe usage, annotate \
             `# repotoire: xxe-safe[<reason>]` to collapse the finding \
             to Info definitively."
        }
        BranchLabel::RealBug => {
            "Use `defusedxml` (the safe-by-default Python XML library) \
             or explicitly disable external entity resolution on the \
             existing parser. Example: `from defusedxml.ElementTree \
             import parse; parse(blob)`. For lxml: \
             `etree.XMLParser(resolve_entities=False, no_network=True)`."
        }
    };
    Some(advice.to_string())
}
/// Extracts the reason from an `xxe-safe` annotation on `line`.
///
/// Returns `None` when the line carries no parseable annotation or the
/// annotation is of another kind. An annotation without arguments yields
/// `"unspecified"`; otherwise the first argument is returned.
pub(super) fn extract_xxe_safe_reason(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line)?;
    if annotation.kind == "xxe-safe" {
        let reason = annotation
            .args
            .first()
            .cloned()
            .unwrap_or_else(|| "unspecified".to_string());
        Some(reason)
    } else {
        None
    }
}
/// Extracts the source from an `xxe-vulnerable` annotation on `line`.
///
/// Returns `None` when the line carries no parseable annotation or the
/// annotation is of another kind. An annotation without arguments yields
/// `"unspecified"`; otherwise the first argument is returned.
pub(super) fn extract_xxe_vulnerable_source(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line)?;
    if annotation.kind == "xxe-vulnerable" {
        let source = annotation
            .args
            .first()
            .cloned()
            .unwrap_or_else(|| "unspecified".to_string());
        Some(source)
    } else {
        None
    }
}
// Unit tests for the XXE predictor: weighted-sum verdicts, short-circuit
// paths, weight-sign invariants, lexicon matching and annotation extraction.
#[cfg(test)]
mod tests {
use super::*;
// lxml with both hardening kwargs sums to a positive score: Benign/Info,
// with RealBug/High kept as the alternative (no user input flow).
#[test]
fn lxml_with_explicit_protection_predicts_benign_info() {
let evidence = Evidence {
api: Some(XmlApi::LxmlEtree),
import_lxml_etree: true,
kw_resolve_entities_false: true,
kw_no_network_true: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert_eq!(p.alternative_branch.label, BranchLabel::RealBug);
assert_eq!(p.alternative_branch.severity, Severity::High);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!(
(total - (W_IMPORT_LXML_ETREE + W_KW_RESOLVE_ENTITIES_FALSE + W_KW_NO_NETWORK_TRUE))
.abs()
< 1e-6,
"expected +0.80, got {total}"
);
}
// Stdlib parser + user input + handler-like function: all three negative
// signals fire, giving RealBug at Critical severity.
#[test]
fn stdlib_etree_with_user_input_in_handler_predicts_realbug_critical() {
let evidence = Evidence {
api: Some(XmlApi::StdlibElementTree),
has_user_input_flow: true,
enclosing_function: Some("handle_upload".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
assert_eq!(p.alternative_branch.label, BranchLabel::Benign);
assert_eq!(p.alternative_branch.severity, Severity::Info);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!(
(total - (W_STDLIB_UNSAFE_PARSER + W_USER_INPUT_FLOW + W_ENCLOSING_HANDLER)).abs()
< 1e-6,
"expected -1.00, got {total}"
);
}
// Pins current v0 behavior: a defusedxml import (+0.30) outweighs the
// stdlib-parser penalty (-0.20) on its own, so the verdict is Benign.
#[test]
fn unused_defusedxml_import_with_stdlib_parse_predicts_benign() {
let evidence = Evidence {
api: Some(XmlApi::StdlibElementTree),
import_defusedxml: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!(
(total - (W_IMPORT_DEFUSEDXML + W_STDLIB_UNSAFE_PARSER)).abs() < 1e-6,
"expected +0.10, got {total}"
);
}
// Same import, but user input and a handler-like function push the sum to
// -0.70 (0.30 - 0.50 - 0.20 - 0.30), flipping the verdict to RealBug.
#[test]
fn unused_defusedxml_in_handler_with_user_input_correctly_flips_realbug() {
let evidence = Evidence {
api: Some(XmlApi::StdlibElementTree),
import_defusedxml: true,
has_user_input_flow: true,
enclosing_function: Some("handle_upload".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total - -0.70).abs() < 1e-6, "expected -0.70, got {total}");
}
// A call on the defusedxml API takes the short-circuit path and must emit
// the reason weighted W_API_DEFUSEDXML_CALL.
#[test]
fn defusedxml_canonical_usage_predicts_benign() {
let evidence = Evidence {
api: Some(XmlApi::Defusedxml),
import_defusedxml: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert!(
p.reasons.iter().any(|r| r.weight == W_API_DEFUSEDXML_CALL),
"must emit the defusedxml-call reason"
);
}
// The defusedxml short-circuit ignores negative signals entirely.
#[test]
fn defusedxml_call_with_user_input_stays_benign() {
let evidence = Evidence {
api: Some(XmlApi::Defusedxml),
import_defusedxml: true,
has_user_input_flow: true,
enclosing_function: Some("handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(
p.predicted,
BranchLabel::Benign,
"defusedxml call stays Benign even with user input + handler"
);
assert_eq!(p.predicted_severity, Severity::Info);
}
// An xxe-safe annotation overrides otherwise damning evidence and records
// a single SourceAnnotation resolution.
#[test]
fn xxe_safe_annotation_collapses_to_benign() {
let evidence = Evidence {
api: Some(XmlApi::StdlibElementTree),
has_user_input_flow: true,
enclosing_function: Some("handle_upload".to_string()),
xxe_safe_annotation: Some("xsd-validated-upstream".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::SourceAnnotation { .. }
));
}
// An xxe-vulnerable annotation is checked before the defusedxml
// short-circuit, so even a defusedxml call collapses to RealBug (High,
// because no user input flow is set).
#[test]
fn xxe_vulnerable_annotation_collapses_to_realbug() {
let evidence = Evidence {
api: Some(XmlApi::Defusedxml),
import_defusedxml: true,
kw_forbid_dtd_true: true,
xxe_vulnerable_annotation: Some("audited-untrusted-source".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
}
// With no evidence the sum is zero, which must resolve to RealBug.
#[test]
fn empty_evidence_tiebreaks_realbug() {
let p = predict(&Evidence::empty());
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
}
// Sign invariant: RealBug-leaning weights stay negative. The lint is
// allowed because asserting on constants is the point of this test.
#[test]
#[allow(clippy::assertions_on_constants)]
fn realbug_signal_weights_are_negative() {
assert!(W_USER_INPUT_FLOW < 0.0);
assert!(W_ENCLOSING_HANDLER < 0.0);
assert!(W_STDLIB_UNSAFE_PARSER < 0.0);
}
// Sign invariant: Benign-leaning weights stay positive. The lint is
// allowed because asserting on constants is the point of this test.
#[test]
#[allow(clippy::assertions_on_constants)]
fn benign_signal_weights_are_positive() {
assert!(W_IMPORT_DEFUSEDXML > 0.0);
assert!(W_IMPORT_LXML_ETREE > 0.0);
assert!(W_KW_RESOLVE_ENTITIES_FALSE > 0.0);
assert!(W_KW_NO_NETWORK_TRUE > 0.0);
assert!(W_KW_FORBID_DTD_TRUE > 0.0);
assert!(W_ENCLOSING_TEST_FUNCTION > 0.0);
}
// Calibration invariant: the "unused import" baseline (import + stdlib
// penalty) is positive but smaller than either the user-input or the
// handler penalty, so one such signal is enough to flip the verdict.
#[test]
#[allow(clippy::assertions_on_constants)]
fn defusedxml_weight_is_overcome_by_single_negative_signal() {
let unused_import_baseline = W_IMPORT_DEFUSEDXML + W_STDLIB_UNSAFE_PARSER;
assert!(unused_import_baseline > 0.0);
assert!(unused_import_baseline < W_KW_RESOLVE_ENTITIES_FALSE.abs());
assert!(W_USER_INPUT_FLOW.abs() > unused_import_baseline);
assert!(W_ENCLOSING_HANDLER.abs() > unused_import_baseline);
}
// Lexicon check for user-input markers, including one non-match.
#[test]
fn user_input_lexicon() {
assert!(matches_user_input("blob = request.data"));
assert!(matches_user_input("uploaded_xml = req.files['x']"));
assert!(matches_user_input("content = request.get_data()"));
assert!(!matches_user_input("blob = open('config.xml').read()"));
}
// Lexicon check for handler-like function names, including one non-match.
#[test]
fn handler_lexicon() {
assert!(matches_handler_function("handle_upload"));
assert!(matches_handler_function("parse_route"));
assert!(matches_handler_function("import_data"));
assert!(!matches_handler_function("calculate_total"));
}
// Lexicon check for test-like function names, including one non-match.
#[test]
fn test_function_lexicon() {
assert!(matches_test_function("test_xxe_parse"));
assert!(matches_test_function("xxe_test"));
assert!(matches_test_function("setup_fixture"));
assert!(!matches_test_function("parse_data"));
}
// The bracketed argument of an xxe-safe annotation is returned as the reason.
#[test]
fn extract_xxe_safe_with_reason() {
assert_eq!(
extract_xxe_safe_reason("ET.parse(blob) # repotoire: xxe-safe[xsd-validated]"),
Some("xsd-validated".to_string())
);
}
// A bare xxe-safe annotation falls back to the "unspecified" reason.
#[test]
fn extract_xxe_safe_without_reason() {
assert_eq!(
extract_xxe_safe_reason("ET.parse(blob) # repotoire: xxe-safe"),
Some("unspecified".to_string())
);
}
// The bracketed argument of an xxe-vulnerable annotation is returned as
// the source.
#[test]
fn extract_xxe_vulnerable_with_source() {
assert_eq!(
extract_xxe_vulnerable_source("ET.parse(blob) # repotoire: xxe-vulnerable[audited]"),
Some("audited".to_string())
);
}
// Annotations of other kinds must not be mistaken for xxe-safe.
#[test]
fn extract_xxe_safe_ignores_other_kinds() {
assert_eq!(
extract_xxe_safe_reason("subprocess.run(...) # repotoire: command-static[ok]"),
None
);
assert_eq!(
extract_xxe_safe_reason("os.path.join(...) # repotoire: internal-path[ok]"),
None
);
}
// Annotations of other kinds must not be mistaken for xxe-vulnerable.
#[test]
fn extract_xxe_vulnerable_ignores_other_kinds() {
assert_eq!(
extract_xxe_vulnerable_source(
"subprocess.run(...) # repotoire: command-user-controlled[GET]"
),
None
);
}
// Only the two stdlib variants count as stdlib-unsafe.
#[test]
fn xml_api_is_stdlib_unsafe() {
assert!(XmlApi::StdlibElementTree.is_stdlib_unsafe());
assert!(XmlApi::StdlibOther.is_stdlib_unsafe());
assert!(!XmlApi::Defusedxml.is_stdlib_unsafe());
assert!(!XmlApi::LxmlEtree.is_stdlib_unsafe());
assert!(!XmlApi::Unknown.is_stdlib_unsafe());
}
}