use crate::ir::data_surface::{TaintSinkType, TaintSourceType};
use crate::ir::{ArgumentSource, ScanTarget};
use crate::rules::{
AttackCategory, Confidence, Detector, Evidence, Finding, RuleMetadata, Severity,
};
pub struct MetadataSsrfDetector;
const METADATA_ENDPOINTS: &[&str] = &[
"169.254.169.254", "metadata.google.internal", "metadata.google", "100.100.100.200", "169.254.170.2", ];
const PRIVATE_PATTERNS: &[&str] = &[
"10.", "172.16.", "172.17.", "172.18.", "172.19.", "172.20.", "172.21.", "172.22.", "172.23.", "172.24.",
"172.25.", "172.26.", "172.27.", "172.28.", "172.29.", "172.30.", "172.31.",
"192.168.", "127.", "0.", "[::1]", "[fd", "[fe80:", ];
fn is_metadata_or_private(url: &str) -> Option<&'static str> {
let url_lower = url.to_lowercase();
if METADATA_ENDPOINTS.iter().any(|ep| url_lower.contains(ep)) {
return Some("cloud metadata endpoint");
}
if PRIVATE_PATTERNS.iter().any(|pat| url_lower.contains(pat)) {
return Some("private network");
}
None
}
impl Detector for MetadataSsrfDetector {
fn metadata(&self) -> RuleMetadata {
RuleMetadata {
id: "SHIELD-013".into(),
name: "Metadata SSRF".into(),
description: "Tool arguments flow to HTTP requests that could target \
cloud metadata endpoints or private networks"
.into(),
default_severity: Severity::Critical,
attack_category: AttackCategory::Ssrf,
cwe_id: Some("CWE-918".into()),
}
}
fn run(&self, target: &ScanTarget) -> Vec<Finding> {
let mut findings = Vec::new();
for path in &target.data.taint_paths {
if matches!(path.source.source_type, TaintSourceType::ToolArgument)
&& matches!(path.sink.sink_type, TaintSinkType::HttpRequest)
{
findings.push(Finding {
rule_id: "SHIELD-013".into(),
rule_name: "Metadata SSRF".into(),
severity: Severity::Critical,
confidence: Confidence::High,
attack_category: AttackCategory::Ssrf,
message: format!(
"Tool parameter '{}' flows to HTTP request '{}' without URL \
validation — could target cloud metadata or private networks",
path.source.description, path.sink.description
),
location: Some(path.sink.location.clone()),
evidence: vec![
Evidence {
description: format!(
"Source: tool parameter '{}'",
path.source.description
),
location: Some(path.source.location.clone()),
snippet: None,
},
Evidence {
description: format!(
"Sink: HTTP request via '{}'",
path.sink.description
),
location: Some(path.sink.location.clone()),
snippet: None,
},
],
taint_path: Some(path.clone()),
remediation: Some(
"Validate URLs against an allowlist before making requests. \
Block private IP ranges (10.x, 172.16-31.x, 192.168.x), \
link-local (169.254.x), and cloud metadata endpoints \
(169.254.169.254)."
.into(),
),
cwe_id: Some("CWE-918".into()),
});
}
}
for net_op in &target.execution.network_operations {
if let ArgumentSource::Literal(ref url) = net_op.url_arg {
if let Some(target_type) = is_metadata_or_private(url) {
findings.push(Finding {
rule_id: "SHIELD-013".into(),
rule_name: "Metadata SSRF".into(),
severity: Severity::Critical,
confidence: Confidence::High,
attack_category: AttackCategory::Ssrf,
message: format!(
"'{}' makes request to {} ({})",
net_op.function, target_type, url
),
location: Some(net_op.location.clone()),
evidence: vec![Evidence {
description: format!("Hardcoded {} URL: {}", target_type, url),
location: Some(net_op.location.clone()),
snippet: None,
}],
taint_path: None,
remediation: Some(
"Remove direct access to metadata endpoints and private \
networks. Use cloud provider SDKs for metadata access."
.into(),
),
cwe_id: Some("CWE-918".into()),
});
}
}
}
findings
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ir::data_surface::*;
use crate::ir::execution_surface::*;
use crate::ir::*;
use std::path::PathBuf;
fn loc() -> SourceLocation {
SourceLocation {
file: PathBuf::from("test.py"),
line: 5,
column: 0,
end_line: None,
end_column: None,
}
}
fn empty_target() -> ScanTarget {
ScanTarget {
name: "test".into(),
framework: Framework::Mcp,
root_path: PathBuf::from("."),
tools: vec![],
execution: ExecutionSurface::default(),
data: DataSurface::default(),
dependencies: Default::default(),
provenance: Default::default(),
source_files: vec![],
}
}
#[test]
fn detects_taint_path_to_http() {
let mut target = empty_target();
target.data.taint_paths.push(TaintPath {
source: TaintSource {
source_type: TaintSourceType::ToolArgument,
description: "url".into(),
location: loc(),
},
sink: TaintSink {
sink_type: TaintSinkType::HttpRequest,
description: "requests.get".into(),
location: loc(),
},
through: vec![],
confidence: 0.9,
});
let findings = MetadataSsrfDetector.run(&target);
assert_eq!(findings.len(), 1);
assert_eq!(findings[0].rule_id, "SHIELD-013");
assert_eq!(findings[0].severity, Severity::Critical);
assert!(findings[0].taint_path.is_some());
}
#[test]
fn detects_literal_metadata_url() {
let mut target = empty_target();
target.execution.network_operations.push(NetworkOperation {
function: "requests.get".into(),
url_arg: ArgumentSource::Literal("http://169.254.169.254/latest/meta-data/".into()),
method: Some("GET".into()),
sends_data: false,
location: loc(),
});
let findings = MetadataSsrfDetector.run(&target);
assert_eq!(findings.len(), 1);
assert_eq!(findings[0].rule_id, "SHIELD-013");
assert!(findings[0].message.contains("cloud metadata endpoint"));
}
#[test]
fn detects_literal_private_ip() {
let mut target = empty_target();
target.execution.network_operations.push(NetworkOperation {
function: "fetch".into(),
url_arg: ArgumentSource::Literal("http://192.168.1.1/admin".into()),
method: Some("GET".into()),
sends_data: false,
location: loc(),
});
let findings = MetadataSsrfDetector.run(&target);
assert_eq!(findings.len(), 1);
assert_eq!(findings[0].rule_id, "SHIELD-013");
assert!(findings[0].message.contains("private network"));
}
#[test]
fn no_finding_for_public_url() {
let mut target = empty_target();
target.execution.network_operations.push(NetworkOperation {
function: "requests.get".into(),
url_arg: ArgumentSource::Literal("https://api.example.com/data".into()),
method: Some("GET".into()),
sends_data: false,
location: loc(),
});
let findings = MetadataSsrfDetector.run(&target);
assert!(findings.is_empty());
}
#[test]
fn no_finding_for_sanitized_arg() {
let mut target = empty_target();
target.execution.network_operations.push(NetworkOperation {
function: "requests.get".into(),
url_arg: ArgumentSource::Sanitized {
sanitizer: "validate_url".into(),
},
method: Some("GET".into()),
sends_data: false,
location: loc(),
});
let findings = MetadataSsrfDetector.run(&target);
assert!(findings.is_empty());
}
#[test]
fn detects_alibaba_metadata() {
let mut target = empty_target();
target.execution.network_operations.push(NetworkOperation {
function: "urllib.request.urlopen".into(),
url_arg: ArgumentSource::Literal("http://100.100.100.200/latest/meta-data/".into()),
method: Some("GET".into()),
sends_data: false,
location: loc(),
});
let findings = MetadataSsrfDetector.run(&target);
assert_eq!(findings.len(), 1);
assert!(findings[0].message.contains("cloud metadata endpoint"));
}
#[test]
fn detects_gcp_metadata() {
let mut target = empty_target();
target.execution.network_operations.push(NetworkOperation {
function: "requests.get".into(),
url_arg: ArgumentSource::Literal(
"http://metadata.google.internal/computeMetadata/v1/".into(),
),
method: Some("GET".into()),
sends_data: false,
location: loc(),
});
let findings = MetadataSsrfDetector.run(&target);
assert_eq!(findings.len(), 1);
assert!(findings[0].message.contains("cloud metadata endpoint"));
}
#[test]
fn no_overlap_with_parameter_url() {
let mut target = empty_target();
target.execution.network_operations.push(NetworkOperation {
function: "requests.get".into(),
url_arg: ArgumentSource::Parameter { name: "url".into() },
method: Some("GET".into()),
sends_data: false,
location: loc(),
});
let findings = MetadataSsrfDetector.run(&target);
assert!(
findings.is_empty(),
"Phase 2 should not fire on Parameter sources (that's SHIELD-003)"
);
}
}