use crate::detectors::base::{Detector, DetectorConfig};
use crate::detectors::fast_search::{find_in, *};
use crate::detectors::taint::{TaintAnalysisResult, TaintAnalyzer, TaintCategory};
use crate::models::{deterministic_finding_id, Finding, Severity};
use anyhow::Result;
use regex::Regex;
use std::path::PathBuf;
use std::sync::LazyLock;
/// Case-insensitive pattern that flags a line as containing an outbound HTTP
/// client call worth inspecting.
///
/// Alternatives covered:
/// - Python `requests.<verb>` for every request-issuing helper of the library
///   (`get`, `post`, `put`, `patch`, `delete`, `head`, `options`, `request`),
/// - `fetch(`, `axios.`, `http.get`,
/// - the substrings `urllib`, `urlopen`, `HttpClient` and `curl`.
///
/// The pattern is unanchored, so it matches anywhere in the line. It is compiled
/// once on first use; the `expect` can only fire if the literal itself is edited
/// into an invalid regex.
static HTTP_CLIENT: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"(?i)(requests\.(get|post|put|patch|delete|head|options|request)|fetch\(|axios\.|http\.get|urllib|urlopen|HttpClient|curl)").expect("valid regex")
});
/// Detector for server-side request forgery (SSRF) candidates.
///
/// Combines a line-based scan for HTTP client calls with taint-analysis results
/// for `TaintCategory::Ssrf` (see the `Detector::detect` implementation).
pub struct SsrfDetector {
// Repository root; handed to `run_intra_function_taint` when no precomputed
// intra-function paths are available.
repository_path: PathBuf,
// Cap on the number of findings; `detect` stops visiting further files once reached.
max_findings: usize,
// Analyzer used to trace taint when the precomputed slots below are empty.
taint_analyzer: TaintAnalyzer,
// When set, used by `detect` instead of calling `TaintAnalyzer::trace_taint`.
// NOTE(review): presumably populated through `impl_taint_precompute!` — confirm.
precomputed_cross: std::sync::OnceLock<Vec<crate::detectors::taint::TaintPath>>,
// When set, used by `detect` instead of calling `run_intra_function_taint`.
// NOTE(review): presumably populated through `impl_taint_precompute!` — confirm.
precomputed_intra: std::sync::OnceLock<Vec<crate::detectors::taint::TaintPath>>,
}
impl SsrfDetector {
    /// Number of findings after which a freshly built detector stops scanning.
    const DEFAULT_MAX_FINDINGS: usize = 50;

    /// Builds a detector rooted at `repository_path`.
    ///
    /// The detector starts with a new [`TaintAnalyzer`], the default findings cap,
    /// and both precomputed taint slots unset.
    pub fn new(repository_path: impl Into<PathBuf>) -> Self {
        let repository_path = repository_path.into();
        Self {
            repository_path,
            max_findings: Self::DEFAULT_MAX_FINDINGS,
            taint_analyzer: TaintAnalyzer::new(),
            precomputed_cross: std::sync::OnceLock::new(),
            precomputed_intra: std::sync::OnceLock::new(),
        }
    }
}
impl Detector for SsrfDetector {
fn name(&self) -> &'static str {
"ssrf"
}
fn description(&self) -> &'static str {
"Detects SSRF vulnerabilities"
}
fn bypass_postprocessor(&self) -> bool {
true
}
crate::detectors::impl_taint_precompute!();
fn taint_category(&self) -> Option<crate::detectors::taint::TaintCategory> {
Some(TaintCategory::Ssrf)
}
fn file_extensions(&self) -> &'static [&'static str] {
&["py", "js", "ts", "jsx", "tsx", "rb", "php", "java", "go"]
}
fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
crate::detectors::detector_context::ContentFlags::HAS_HTTP_CLIENT
}
fn detect(
&self,
ctx: &crate::detectors::analysis_context::AnalysisContext,
) -> Result<Vec<Finding>> {
let graph = ctx.graph;
let files = &ctx.as_file_provider();
let mut findings = vec![];
let mut taint_paths = if let Some(cross) = self.precomputed_cross.get() {
cross.clone()
} else {
self.taint_analyzer.trace_taint(graph, TaintCategory::Ssrf)
};
let intra_paths = if let Some(intra) = self.precomputed_intra.get() {
intra.clone()
} else {
crate::detectors::taint::run_intra_function_taint(
&self.taint_analyzer,
graph,
TaintCategory::Ssrf,
&self.repository_path,
)
};
taint_paths.extend(intra_paths);
let taint_result = TaintAnalysisResult::from_paths(taint_paths);
for path in files
.files_with_extensions(&["py", "js", "ts", "jsx", "tsx", "rb", "php", "java", "go"])
{
if findings.len() >= self.max_findings {
break;
}
if let Some(content) = files.content(path) {
let file_str = path.to_string_lossy();
let lines: Vec<&str> = content.lines().collect();
for (i, line) in lines.iter().enumerate() {
let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
if crate::detectors::is_line_suppressed(line, prev_line) {
continue;
}
if HTTP_CLIENT.is_match(line) {
if find_in(&FIND_FETCH_SLASH_SINGLE, line)
|| find_in(&FIND_FETCH_BACKTICK_SLASH, line)
|| find_in(&FIND_FETCH_DQUOTE_SLASH, line)
{
continue;
}
if find_in(&FIND_API_URL, line)
|| find_in(&FIND_BASE_URL, line)
|| find_in(&FIND_SERVER_URL, line)
|| find_in(&FIND_BACKEND_URL, line)
|| find_in(&FIND_API_URL_CAMEL, line)
|| find_in(&FIND_BASE_URL_CAMEL, line)
{
let has_dynamic_path = find_in(&FIND_PARAMS, line)
|| find_in(&FIND_DOT_QUERY, line)
|| (find_in(&FIND_DOLLAR_BRACE, line)
&& !line.contains("${API_URL")
&& !line.contains("${BASE_URL")
&& !line.contains("${SERVER_URL"));
if !has_dynamic_path {
continue;
}
}
let is_env_sourced = {
let context_start = i.saturating_sub(20);
let context = &lines[context_start..=i];
let context_str = context.join("\n").to_lowercase();
find_in(&FIND_PROCESS_ENV, &context_str)
|| context_str.contains("env.get(")
|| context_str.contains("os.environ")
|| context_str.contains("std::env")
|| context_str.contains("config.")
|| context_str.contains("options.base")
|| context_str.contains("baseurl")
|| context_str.contains("base_url")
};
if is_env_sourced {
continue;
}
let has_user_input = find_in(&FIND_REQ_DOT, line)
|| find_in(&FIND_REQUEST_BODY, line)
|| find_in(&FIND_REQUEST_QUERY, line)
|| find_in(&FIND_REQUEST_PARAMS, line)
|| find_in(&FIND_CTX_PARAMS, line)
|| find_in(&FIND_CTX_QUERY, line);
if has_user_input {
let line_num = (i + 1) as u32;
let matching_taint = taint_result.paths.iter().find(|p| {
(p.sink_file == file_str || p.source_file == file_str)
&& (p.sink_line == line_num || p.source_line == line_num)
});
let (severity, description) = match matching_taint {
Some(taint_path) if taint_path.is_sanitized => {
(Severity::Low, format!(
"HTTP request with user-controlled URL.\n\n\
**Taint Analysis Note**: A sanitizer function (`{}`) was found \
in the data flow path, which may mitigate this vulnerability.",
taint_path.sanitizer.as_deref().unwrap_or("unknown")
))
}
Some(taint_path) => {
(Severity::Critical, format!(
"HTTP request with user-controlled URL.\n\n\
**Taint Analysis Confirmed**: Data flow analysis traced a path \
from user input to this SSRF sink without sanitization:\n\n\
`{}`",
taint_path.path_string()
))
}
None => {
(
Severity::High,
"HTTP request with user-controlled URL.".to_string(),
)
}
};
findings.push(Finding {
id: String::new(),
detector: "SsrfDetector".to_string(),
severity,
title: "Potential SSRF vulnerability".to_string(),
description,
affected_files: vec![path.to_path_buf()],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: Some(
"Validate URL against allowlist, block internal IPs."
.to_string(),
),
estimated_effort: Some("45 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-918".to_string()),
why_it_matters: Some(
"Attackers could access internal services.".to_string(),
),
..Default::default()
});
}
}
}
}
}
findings.retain(|f| f.severity != Severity::Low);
Ok(findings)
}
}
impl crate::detectors::RegisteredDetector for SsrfDetector {
    /// Builds a detector for the repository path carried by `init` and returns it
    /// as a shared trait object.
    fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
        let detector = Self::new(init.repo_path);
        std::sync::Arc::new(detector)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::detectors::base::Detector;
    use crate::graph::builder::GraphBuilder;

    /// Runs a fresh detector over one in-memory file backed by an empty graph
    /// and returns the findings.
    fn scan(file_name: &'static str, source: &'static str) -> Vec<Finding> {
        let store = GraphBuilder::new().freeze();
        let detector = SsrfDetector::new("/mock/repo");
        let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
            &store,
            vec![(file_name, source)],
        );
        let findings = detector.detect(&ctx).expect("detection should succeed");
        findings
    }

    /// Titles of all findings, for assertion failure messages.
    fn titles(findings: &[Finding]) -> Vec<&String> {
        findings.iter().map(|f| &f.title).collect::<Vec<_>>()
    }

    /// Whether any finding is tagged with the SSRF CWE identifier.
    fn has_cwe_918(findings: &[Finding]) -> bool {
        findings
            .iter()
            .any(|f| f.cwe_id.as_deref() == Some("CWE-918"))
    }

    /// Whether any finding title mentions SSRF.
    fn mentions_ssrf(findings: &[Finding]) -> bool {
        findings.iter().any(|f| f.title.contains("SSRF"))
    }

    #[test]
    fn test_detects_requests_get_with_user_input() {
        let findings = scan(
            "vuln.py",
            "import requests\n\ndef fetch_url(req):\n url = req.body.get(\"url\")\n response = requests.get(req.body[\"url\"])\n return response.text\n",
        );
        assert!(
            !findings.is_empty(),
            "Should detect requests.get with user-controlled URL from req.body"
        );
        assert!(
            mentions_ssrf(&findings),
            "Finding should mention SSRF. Titles: {:?}",
            titles(&findings)
        );
        assert!(has_cwe_918(&findings), "Finding should have CWE-918");
    }

    #[test]
    fn test_no_findings_for_hardcoded_url() {
        let findings = scan(
            "safe.py",
            "import requests\n\ndef fetch_data():\n response = requests.get(\"https://api.example.com/data\")\n return response.json()\n",
        );
        assert!(
            findings.is_empty(),
            "Hardcoded URL should have no SSRF findings, but got: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_detects_fetch_with_user_input_in_js() {
        let findings = scan(
            "proxy.js",
            "async function proxyRequest(req, res) {\n const targetUrl = req.body.url;\n const response = await fetch(req.body.url);\n const data = await response.json();\n res.json(data);\n}\n",
        );
        assert!(
            !findings.is_empty(),
            "Should detect fetch() with user-controlled URL from req.body"
        );
        assert!(has_cwe_918(&findings), "Finding should have CWE-918");
    }

    #[test]
    fn test_detects_urllib_with_user_input_in_python() {
        let findings = scan(
            "handler.py",
            "from urllib.request import urlopen\n\ndef fetch(request):\n url = request.query.get('target')\n response = urlopen(request.query['target'])\n return response.read()\n",
        );
        assert!(
            !findings.is_empty(),
            "Should detect urlopen with user-controlled URL from request.query"
        );
        assert!(mentions_ssrf(&findings), "Finding should mention SSRF");
    }

    #[test]
    fn test_no_finding_for_env_sourced_url() {
        let findings = scan(
            "client.py",
            "import os\nimport requests\n\ndef call_api():\n base = os.environ.get('API_HOST')\n response = requests.get(base + '/health')\n return response.status_code\n",
        );
        assert!(
            findings.is_empty(),
            "URL sourced from environment variable should not trigger SSRF, but got: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_no_finding_for_relative_fetch() {
        let findings = scan(
            "api.js",
            "async function loadData(req, res) {\n const data = await fetch('/api/users');\n res.json(await data.json());\n}\n",
        );
        assert!(
            findings.is_empty(),
            "Relative URL fetch should not trigger SSRF, but got: {:?}",
            titles(&findings)
        );
    }
}