use crate::detectors::base::{Detector, DetectorConfig};
use crate::graph::GraphQueryExt;
use crate::models::{deterministic_finding_id, Finding, Severity};
use anyhow::Result;
use regex::Regex;
use std::path::PathBuf;
use std::sync::LazyLock;
use tracing::info;
/// Case-insensitive regex that recognises names of XML parsing APIs and
/// libraries (for example `etree.parse`, `DocumentBuilder`, `DOMParser`,
/// `xml2js`, `libxml`).
///
/// The regex is compiled lazily on first use. A match only means the line
/// mentions an XML parser; `detect` applies further filters before reporting.
static XXE_PATTERN: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"(?i)(xml\.parse|parseXML|XMLParser|DocumentBuilder|SAXParser|etree\.parse|lxml\.etree|xml\.etree|DOMParser|XMLReader|xml\.dom|minidom|pulldom|xml2js|fast-xml-parser|libxml)").expect("valid regex")
});
/// Case-sensitive regex for tokens that hint at externally supplied data
/// (request bodies/files, uploads, `input`, `read(`, `getInputStream`).
///
/// This is a textual heuristic only: it matches substrings and performs no
/// data-flow analysis. Compiled lazily on first use.
static USER_INPUT: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"(req\.(body|file|files)|request\.(data|files)|uploaded|file_content|input|read\(|getInputStream)").expect("valid regex")
});
/// Returns the substrings whose presence indicates that XXE hardening has been
/// applied, keyed by file extension (without the leading dot).
///
/// Callers in this file compare these markers case-insensitively against file
/// content. Unknown extensions yield an empty list, meaning "no known
/// protection markers" rather than "protected".
///
/// Only options that *restrict* the parser belong here. In particular the PHP
/// flags `LIBXML_NOENT` (substitute entities) and `LIBXML_DTDLOAD` (load the
/// external subset) are deliberately absent: they enable the behaviour XXE
/// relies on, so treating them as protection would hide real vulnerabilities.
fn get_protection_patterns(ext: &str) -> Vec<&'static str> {
    match ext {
        "py" => vec![
            "resolve_entities=False",
            "no_network=True",
            "defusedxml",
            "forbid_dtd=True",
            "forbid_entities=True",
            "feature_external_ges",
            "feature_external_pes",
            "DTDForbidden",
            "EntitiesForbidden",
            "ExternalReferenceForbidden",
            "defused",
        ],
        "java" => vec![
            "FEATURE_SECURE_PROCESSING",
            "FEATURE_EXTERNAL_GENERAL_ENTITIES",
            "FEATURE_EXTERNAL_PARAMETER_ENTITIES",
            "FEATURE_DISALLOW_DOCTYPE_DECL",
            "setExpandEntityReferences(false)",
        ],
        "js" | "ts" => vec![
            "noent: false",
            "nonet: true",
            "dtdload: false",
            "dtdvalid: false",
            "explicitEntities: false",
        ],
        "php" => vec![
            // Forbids network access while loading documents.
            "LIBXML_NONET",
            // Explicit opt-out of external entity loading (PHP < 8.0).
            "libxml_disable_entity_loader",
        ],
        "cs" => vec![
            "DtdProcessing.Prohibit",
            "XmlResolver = null",
            "ProhibitDtd = true",
        ],
        "rb" => vec![
            "nonet: true",
            "noent: false",
            "Nokogiri::XML::ParseOptions::NONET",
        ],
        _ => vec![],
    }
}
/// Returns a Markdown snippet showing how to parse XML safely for the given
/// file extension (without the leading dot).
///
/// Extensions without a dedicated example get a one-line generic
/// recommendation. The text is surfaced verbatim as a finding's suggested fix,
/// so every example must itself be safe to copy: the PHP snippet therefore
/// avoids `LIBXML_NOENT` and `LIBXML_DTDLOAD`, which turn entity substitution
/// and external DTD loading *on*.
fn get_fix_example(ext: &str) -> &'static str {
    match ext {
        "py" => {
            "```python\n\
             # Use defusedxml (recommended)\n\
             import defusedxml.ElementTree as ET\n\
             tree = ET.parse(xml_file)\n\
             \n\
             # Or configure lxml safely\n\
             from lxml import etree\n\
             parser = etree.XMLParser(\n\
             resolve_entities=False,\n\
             no_network=True,\n\
             dtd_validation=False\n\
             )\n\
             tree = etree.parse(xml_file, parser)\n\
             ```"
        }
        "java" => {
            "```java\n\
             DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();\n\
             \n\
             // Disable XXE\n\
             dbf.setFeature(\"http://apache.org/xml/features/disallow-doctype-decl\", true);\n\
             dbf.setFeature(\"http://xml.org/sax/features/external-general-entities\", false);\n\
             dbf.setFeature(\"http://xml.org/sax/features/external-parameter-entities\", false);\n\
             dbf.setXIncludeAware(false);\n\
             dbf.setExpandEntityReferences(false);\n\
             \n\
             DocumentBuilder db = dbf.newDocumentBuilder();\n\
             ```"
        }
        "js" | "ts" => {
            "```javascript\n\
             // Use a safe parser\n\
             const { XMLParser } = require('fast-xml-parser');\n\
             const parser = new XMLParser({\n\
             allowBooleanAttributes: true,\n\
             // No external entity resolution by default\n\
             });\n\
             \n\
             // Or configure libxmljs safely\n\
             const libxmljs = require('libxmljs');\n\
             const doc = libxmljs.parseXml(xmlString, {\n\
             noent: false, // Don't expand entities\n\
             nonet: true, // Don't fetch from network\n\
             dtdload: false\n\
             });\n\
             ```"
        }
        "php" => {
            "```php\n\
             // Disable entity loading (PHP < 8.0; deprecated and unnecessary on PHP >= 8.0)\n\
             libxml_disable_entity_loader(true);\n\
             \n\
             // Never pass LIBXML_NOENT or LIBXML_DTDLOAD: they ENABLE entity\n\
             // substitution and external DTD loading. Forbid network access instead.\n\
             $doc = new DOMDocument();\n\
             $doc->loadXML($xml, LIBXML_NONET);\n\
             \n\
             // The same rule applies to SimpleXML\n\
             $xml = simplexml_load_string($data, 'SimpleXMLElement', LIBXML_NONET);\n\
             ```"
        }
        "cs" => {
            "```csharp\n\
             XmlReaderSettings settings = new XmlReaderSettings();\n\
             settings.DtdProcessing = DtdProcessing.Prohibit;\n\
             settings.XmlResolver = null;\n\
             \n\
             using (XmlReader reader = XmlReader.Create(stream, settings))\n\
             {\n\
             // Process XML safely\n\
             }\n\
             ```"
        }
        _ => "Disable external entity resolution in your XML parser configuration.",
    }
}
/// Detector for XML External Entity (XXE, CWE-611) issues.
///
/// Combines a line-oriented pattern scan with intra-function taint paths; see
/// the `Detector::detect` implementation for the exact heuristics.
pub struct XxeDetector {
/// Repository root; handed to the intra-function taint analysis in `detect`.
repository_path: PathBuf,
/// Soft cap on the number of findings reported per run (checked in `detect`).
max_findings: usize,
/// Slot for precomputed cross-function taint paths.
/// NOTE(review): not read directly in this file; presumably filled and
/// consumed through `impl_taint_precompute!` — confirm against the macro.
precomputed_cross: std::sync::OnceLock<Vec<crate::detectors::taint::TaintPath>>,
/// Slot for precomputed intra-function taint paths. When set, `detect` uses
/// these instead of running the taint analysis itself.
precomputed_intra: std::sync::OnceLock<Vec<crate::detectors::taint::TaintPath>>,
}
impl XxeDetector {
    /// Creates a detector for the repository rooted at `repository_path`.
    ///
    /// The finding cap defaults to 50 and both taint caches start empty.
    pub fn new(repository_path: impl Into<PathBuf>) -> Self {
        Self {
            repository_path: repository_path.into(),
            max_findings: 50,
            precomputed_cross: std::sync::OnceLock::new(),
            precomputed_intra: std::sync::OnceLock::new(),
        }
    }

    /// Returns `true` when `content` contains, case-insensitively, any of the
    /// protection markers registered for `ext` in `get_protection_patterns`.
    ///
    /// Extensions without registered markers are never considered protected.
    fn has_protection(content: &str, ext: &str) -> bool {
        let patterns = get_protection_patterns(ext);
        if patterns.is_empty() {
            // Nothing to look for: skip lower-casing the whole content.
            return false;
        }
        let content_lower = content.to_lowercase();
        patterns
            .iter()
            .any(|p| content_lower.contains(&p.to_lowercase()))
    }

    /// Returns `true` when the parse call at index `parse_line` (0-based), or
    /// any of the up to ten lines before it, matches `USER_INPUT`.
    ///
    /// The parse line itself is part of the window so that direct uses such as
    /// `parse(req.body)` are recognised. Indices are clamped to `lines`, so an
    /// out-of-range `parse_line` yields a shorter (possibly empty) window
    /// instead of a panic.
    fn has_user_input_flow(lines: &[&str], parse_line: usize) -> bool {
        let end = parse_line.saturating_add(1).min(lines.len());
        let start = parse_line.saturating_sub(10).min(end);
        let context = lines[start..end].join(" ");
        USER_INPUT.is_match(&context)
    }
}
impl Detector for XxeDetector {
    /// Short identifier of this detector.
    fn name(&self) -> &'static str {
        "xxe"
    }

    /// One-line human-readable summary.
    fn description(&self) -> &'static str {
        "Detects XXE vulnerabilities"
    }

    /// This detector opts out of the shared post-processing step.
    fn bypass_postprocessor(&self) -> bool {
        true
    }

    crate::detectors::impl_taint_precompute!();

    /// Taint category used for this detector's taint paths.
    ///
    /// NOTE(review): XXE reuses the `PathTraversal` category here and in
    /// `detect`; confirm that no dedicated category is intended.
    fn taint_category(&self) -> Option<crate::detectors::taint::TaintCategory> {
        Some(crate::detectors::taint::TaintCategory::PathTraversal)
    }

    /// Extensions advertised for this detector.
    ///
    /// NOTE(review): this list differs from the one scanned in `detect`
    /// (which omits `jsx`/`tsx` and adds `cs`/`go`) — confirm which is intended.
    fn file_extensions(&self) -> &'static [&'static str] {
        &["py", "js", "ts", "jsx", "tsx", "rb", "php", "java"]
    }

    /// Content flags a file must carry for this detector to be relevant.
    fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
        crate::detectors::detector_context::ContentFlags::HAS_SERIALIZE
    }

    /// Scans the files in `ctx` for XML parsing without XXE hardening.
    ///
    /// Two passes contribute findings:
    ///
    /// 1. A line scan. A line is reported when it matches `XXE_PATTERN`, is not
    ///    suppressed, is not an import, contains a recognisable parse/constructor
    ///    call, and has no protection marker in the surrounding ~30 lines.
    ///    Severity is `Critical` when user-input-looking tokens appear near the
    ///    call (see `has_user_input_flow`), otherwise `High`.
    /// 2. Unsanitized intra-function taint paths, de-duplicated against pass 1
    ///    by `(file, line)`.
    ///
    /// At most `max_findings` findings are returned; the cap is checked before
    /// every push so neither pass can overshoot it.
    fn detect(
        &self,
        ctx: &crate::detectors::analysis_context::AnalysisContext,
    ) -> Result<Vec<Finding>> {
        let graph = ctx.graph;
        let files = &ctx.as_file_provider();
        let mut findings = vec![];

        for path in
            files.files_with_extensions(&["py", "js", "ts", "java", "php", "cs", "rb", "go"])
        {
            if findings.len() >= self.max_findings {
                break;
            }
            let path_str = path.to_string_lossy().to_string();
            if crate::detectors::base::is_test_path(&path_str) {
                continue;
            }
            let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
            if let Some(content) = files.content(path) {
                // File-level pre-check: the per-line window below only needs
                // to run when the file mentions a protection marker at all.
                let file_has_any_protection = Self::has_protection(&content, ext);
                let lines: Vec<&str> = content.lines().collect();
                for (i, line) in lines.iter().enumerate() {
                    // Enforce the cap inside a file as well, so one large
                    // file cannot push the result past `max_findings`.
                    if findings.len() >= self.max_findings {
                        break;
                    }
                    let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
                    if crate::detectors::is_line_suppressed(line, prev_line) {
                        continue;
                    }
                    if !XXE_PATTERN.is_match(line) {
                        continue;
                    }
                    // Import statements mention parser modules without parsing.
                    let trimmed = line.trim();
                    if trimmed.starts_with("from ") || trimmed.starts_with("import ") {
                        continue;
                    }
                    let has_parse_call = line.contains(".parse(")
                        || line.contains(".parseString(")
                        || line.contains("XMLParser(")
                        || line.contains("DocumentBuilder")
                        || line.contains("SAXParser(")
                        || line.contains("XMLReader(");
                    if !has_parse_call {
                        continue;
                    }
                    // JS object literals such as `"DOMParser": false,` are
                    // static data (e.g. lint globals), not parser usage.
                    if ext == "js"
                        && (trimmed.ends_with("false,")
                            || trimmed.ends_with("true,")
                            || trimmed.ends_with("false")
                            || trimmed.ends_with("true"))
                    {
                        continue;
                    }
                    if file_has_any_protection {
                        // Only protection close to the call counts: look at
                        // 15 lines before through 14 lines after this line.
                        let local_start = i.saturating_sub(15);
                        let local_end = (i + 15).min(lines.len());
                        let local_context = lines[local_start..local_end].join("\n");
                        if Self::has_protection(&local_context, ext) {
                            continue;
                        }
                    }
                    let has_user_input = Self::has_user_input_flow(&lines, i);
                    // Findings use 1-based line numbers.
                    let line_no = (i + 1) as u32;
                    // Graph context: enclosing function and whether any caller
                    // name looks like an externally reachable entry point.
                    let func_context = graph.find_function_at(&path_str, line_no).map(|f| {
                        let callers =
                            graph.get_callers(f.qn(crate::graph::interner::global_interner()));
                        let has_external_callers = callers.iter().any(|c| {
                            let name = c
                                .node_name(crate::graph::interner::global_interner())
                                .to_lowercase();
                            name.contains("route")
                                || name.contains("handler")
                                || name.contains("api")
                                || name.contains("upload")
                                || name.contains("import")
                                || name.contains("parse")
                        });
                        (
                            f.node_name(crate::graph::interner::global_interner())
                                .to_string(),
                            has_external_callers,
                        )
                    });
                    let severity = if has_user_input {
                        Severity::Critical
                    } else {
                        Severity::High
                    };
                    let mut notes = Vec::new();
                    if has_user_input {
                        notes.push("⚠️ User input flows to XML parser".to_string());
                    }
                    if let Some((func_name, external)) = &func_context {
                        notes.push(format!("📦 In function: `{}`", func_name));
                        if *external {
                            notes.push("🌐 Called from route handlers".to_string());
                        }
                    }
                    notes.push(format!("❌ No XXE protection detected for {}", ext));
                    let context_notes = format!("\n\n**Analysis:**\n{}", notes.join("\n"));
                    findings.push(Finding {
                        id: String::new(),
                        detector: "XxeDetector".to_string(),
                        severity,
                        title: "XML External Entity (XXE) vulnerability".to_string(),
                        description: format!(
                            "XML parser processes external entities without proper restrictions.{}",
                            context_notes
                        ),
                        affected_files: vec![path.to_path_buf()],
                        line_start: Some(line_no),
                        line_end: Some(line_no),
                        suggested_fix: Some(get_fix_example(ext).to_string()),
                        estimated_effort: Some("20 minutes".to_string()),
                        category: Some("security".to_string()),
                        cwe_id: Some("CWE-611".to_string()),
                        why_it_matters: Some(
                            "XXE vulnerabilities allow attackers to:\n\
                             • Read arbitrary files from the server (file:///etc/passwd)\n\
                             • Perform SSRF attacks (http://internal-server/)\n\
                             • Denial of service (billion laughs attack)\n\
                             • Port scanning of internal networks"
                                .to_string(),
                        ),
                        ..Default::default()
                    });
                }
            }
        }

        // Prefer precomputed taint paths; otherwise run the analysis here.
        let intra_paths = if let Some(intra) = self.precomputed_intra.get() {
            intra.clone()
        } else {
            let taint_analyzer = crate::detectors::taint::TaintAnalyzer::new();
            crate::detectors::taint::run_intra_function_taint(
                &taint_analyzer,
                graph,
                crate::detectors::taint::TaintCategory::PathTraversal,
                &self.repository_path,
            )
        };

        // Locations already reported by the line scan, keyed by (file, line).
        let mut seen: std::collections::HashSet<(String, u32)> = findings
            .iter()
            .filter_map(|f| {
                f.affected_files
                    .first()
                    .map(|p| (p.to_string_lossy().to_string(), f.line_start.unwrap_or(0)))
            })
            .collect();

        for path in intra_paths.iter().filter(|p| !p.is_sanitized) {
            // Check the cap before pushing so a full result never grows.
            if findings.len() >= self.max_findings {
                break;
            }
            let loc = (path.sink_file.clone(), path.sink_line);
            if !seen.insert(loc) {
                continue;
            }
            findings.push(crate::detectors::taint::taint_path_to_finding(
                path,
                "XxeDetector",
                "XML External Entity Injection",
            ));
        }

        info!(
            "XxeDetector found {} findings (graph-aware + taint)",
            findings.len()
        );
        Ok(findings)
    }
}
impl crate::detectors::RegisteredDetector for XxeDetector {
fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
std::sync::Arc::new(Self::new(init.repo_path))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::builder::GraphBuilder;

    /// Runs a fresh detector over one in-memory file backed by an empty graph
    /// and returns whatever it reports.
    fn scan(file_name: &'static str, source: &'static str) -> Vec<Finding> {
        let store = GraphBuilder::new().freeze();
        let detector = XxeDetector::new("/mock/repo");
        let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
            &store,
            vec![(file_name, source)],
        );
        detector.detect(&ctx).expect("detection should succeed")
    }

    /// Finding titles, used to make assertion failures readable.
    fn titles(findings: &[Finding]) -> Vec<&String> {
        findings.iter().map(|f| &f.title).collect()
    }

    /// Unhardened `lxml` parsing must be reported.
    #[test]
    fn test_detects_xxe_without_protection() {
        let reported = scan(
            "parser.py",
            "\nfrom lxml import etree\ntree = etree.parse(xml_file)\n",
        );
        assert!(
            !reported.is_empty(),
            "Should detect XML parsing without XXE protection"
        );
        assert!(reported.iter().any(|f| f.detector == "XxeDetector"));
    }

    /// Parsing through `defusedxml` counts as protected.
    #[test]
    fn test_no_finding_with_defusedxml() {
        let reported = scan(
            "safe_parser.py",
            "\nimport defusedxml.ElementTree as ET\ntree = ET.parse(xml_file)\n",
        );
        assert!(
            reported.is_empty(),
            "Should not flag XML parsing with defusedxml protection, but got: {:?}",
            titles(&reported)
        );
    }

    /// Import statements alone are not parser usage.
    #[test]
    fn test_no_finding_for_import_only() {
        let reported = scan(
            "parser.py",
            "from xml.dom import minidom, pulldom\nimport xml.etree.ElementTree as ET\n",
        );
        assert!(
            reported.is_empty(),
            "Should not flag import-only lines. Found: {:?}",
            titles(&reported)
        );
    }

    /// A hand-rolled defused parser close to the call suppresses the finding.
    #[test]
    fn test_no_finding_with_custom_defused_parser() {
        let reported = scan(
            "serializer.py",
            "class DefusedExpatParser:\n    feature_external_ges = False\n    feature_external_pes = False\n    def reset(self):\n        raise DTDForbidden()\n\ndef deserialize(stream):\n    event_stream = pulldom.parse(stream, parser)\n",
        );
        assert!(
            reported.is_empty(),
            "Should not flag XML parsing when custom defused parser exists. Found: {:?}",
            titles(&reported)
        );
    }

    /// Object-literal entries that merely name a parser are static data.
    #[test]
    fn test_no_finding_for_js_static_data() {
        let reported = scan(
            "globals.js",
            "var globals = {\n  \"DOMParser\": false,\n  \"XMLHttpRequest\": false,\n};\n",
        );
        assert!(
            reported.is_empty(),
            "Should not flag JS static data. Found: {:?}",
            titles(&reported)
        );
    }
}