use crate::detectors::base::{Detector, DetectorConfig};
use crate::graph::GraphQueryExt;
use crate::models::{deterministic_finding_id, Finding, Severity};
use anyhow::Result;
use regex::Regex;
use std::path::PathBuf;
use std::sync::LazyLock;
use tracing::info;
static REGEX_CREATE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"(?i)(Regex::new|re\.compile|new RegExp|Pattern\.compile|regex!|/[^/]+/)")
.expect("valid regex")
});
static VULNERABLE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"\([^)]*[+*][^)]*\)[+*]|\.\*\.\*|\(\?:[^)]*\)[+*]{2}|\[[^\]]*\][+*]\[[^\]]*\][+*]")
.expect("valid regex")
});
fn is_vulnerable_pattern(pattern: &str) -> Option<&'static str> {
if pattern.contains(")+)") || pattern.contains(")*)*") || pattern.contains("+)+") {
return Some("nested quantifiers");
}
if pattern.contains("(a|a)") || pattern.contains("(.+)*") || pattern.contains("(.*)+") {
return Some("greedy quantifier on group");
}
let evil_patterns = [
"(a+)+", "(a*)*", "(a|aa)+", "(.*a){", "([a-zA-Z]+)*", ];
for evil in evil_patterns {
if pattern.contains(evil) {
return Some("classic ReDoS pattern");
}
}
None
}
pub struct RegexDosDetector {
#[allow(dead_code)] repository_path: PathBuf,
max_findings: usize,
}
impl RegexDosDetector {
crate::detectors::detector_new!(50);
fn find_function_context(
graph: &dyn crate::graph::GraphQuery,
file_path: &str,
line: u32,
) -> Option<(String, usize, bool)> {
let i = graph.interner();
graph.find_function_at(file_path, line).map(|f| {
let callers = graph.get_callers(f.qn(i));
let name_lower = f.node_name(i).to_lowercase();
let processes_input = name_lower.contains("validate")
|| name_lower.contains("parse")
|| name_lower.contains("match")
|| name_lower.contains("search")
|| name_lower.contains("filter")
|| name_lower.contains("handler")
|| name_lower.contains("route");
(f.node_name(i).to_string(), callers.len(), processes_input)
})
}
fn uses_user_input(lines: &[&str], current_line: usize) -> bool {
let start = current_line.saturating_sub(5);
let end = (current_line + 5).min(lines.len());
let context = lines[start..end].join(" ").to_lowercase();
context.contains("req.")
|| context.contains("request.")
|| context.contains("input")
|| context.contains("body")
|| context.contains("params")
|| context.contains("query")
|| context.contains("user")
|| context.contains("data")
}
fn extract_pattern(line: &str) -> Option<String> {
let patterns = [
(r#"new RegExp\(["']"#, r#"["']"#),
(r#"re\.compile\(r?["']"#, r#"["']"#),
(r#"Regex::new\(r?#?"#, r#"[#"]?"#),
(r#"/"#, r#"/"#),
];
for (start, end) in patterns {
if let Some(s_idx) = line.find(start) {
let after_start = &line[s_idx + start.len()..];
if let Some(e_idx) = after_start.find(end) {
return Some(after_start[..e_idx].to_string());
}
}
}
None
}
}
impl Detector for RegexDosDetector {
fn name(&self) -> &'static str {
"regex-dos"
}
fn description(&self) -> &'static str {
"Detects ReDoS vulnerable patterns"
}
fn bypass_postprocessor(&self) -> bool {
true
}
fn file_extensions(&self) -> &'static [&'static str] {
&["py", "js", "ts", "jsx", "tsx", "rb", "java", "go"]
}
fn detect(
&self,
ctx: &crate::detectors::analysis_context::AnalysisContext,
) -> Result<Vec<Finding>> {
let graph = ctx.graph;
let files = &ctx.as_file_provider();
let mut findings = vec![];
for path in
files.files_with_extensions(&["py", "js", "ts", "java", "rs", "go", "rb", "php"])
{
if findings.len() >= self.max_findings {
break;
}
let path_str = path.to_string_lossy().to_string();
if crate::detectors::base::is_test_path(&path_str) {
continue;
}
if path_str.contains("/detectors/") && path_str.ends_with(".rs") {
continue;
}
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
if let Some(content) = files.content(path) {
let lines: Vec<&str> = content.lines().collect();
for (i, line) in lines.iter().enumerate() {
let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
if crate::detectors::is_line_suppressed(line, prev_line) {
continue;
}
let trimmed = line.trim();
if trimmed.starts_with("//")
|| trimmed.starts_with("#")
|| trimmed.starts_with("*")
{
continue;
}
if !REGEX_CREATE.is_match(line) {
continue;
}
let is_vuln = VULNERABLE.is_match(line);
let pattern = Self::extract_pattern(line);
let extra_vuln = pattern.as_ref().and_then(|p| is_vulnerable_pattern(p));
if !is_vuln && extra_vuln.is_none() {
continue;
}
let line_num = (i + 1) as u32;
let func_context = Self::find_function_context(graph, &path_str, line_num);
let uses_input = Self::uses_user_input(&lines, i);
let mut severity = Severity::High;
if uses_input {
severity = Severity::Critical;
}
if let Some((_, _, processes_input)) = &func_context {
if *processes_input {
severity = Severity::Critical;
}
}
if !uses_input && func_context.is_none() {
severity = Severity::Medium;
}
let mut notes = Vec::new();
if let Some(vuln_type) = extra_vuln {
notes.push(format!("🔍 Pattern issue: {}", vuln_type));
} else {
notes.push("🔍 Pattern issue: nested quantifiers detected".to_string());
}
if let Some(pat) = &pattern {
let display_pat = if pat.len() > 50 {
format!("{}...", &pat[..50])
} else {
pat.clone()
};
notes.push(format!("📝 Pattern: `{}`", display_pat));
}
if let Some((func_name, callers, processes_input)) = &func_context {
notes.push(format!(
"📦 In function: `{}` ({} callers)",
func_name, callers
));
if *processes_input {
notes.push("⚠️ Function appears to process user input".to_string());
}
}
if uses_input {
notes.push("🎯 User input detected in context".to_string());
}
let context_notes = format!("\n\n**Analysis:**\n{}", notes.join("\n"));
let suggestion = match ext {
"js" | "ts" => "Rewrite the regex to avoid catastrophic backtracking:\n\
```javascript\n\
// Instead of:\n\
const regex = /(a+)+$/; // Vulnerable!\n\
\n\
// Use:\n\
const regex = /a+$/; // Non-nested quantifier\n\
\n\
// Or use possessive quantifiers (if supported):\n\
// Or add input length limits before regex matching:\n\
if (input.length > 1000) throw new Error('Input too long');\n\
```"
.to_string(),
"py" => "Rewrite the regex to avoid catastrophic backtracking:\n\
```python\n\
# Instead of:\n\
pattern = re.compile(r'(a+)+')\n\
\n\
# Use:\n\
pattern = re.compile(r'a+')\n\
\n\
# Or set a timeout using the regex module:\n\
import regex\n\
pattern = regex.compile(r'...', timeout=1.0)\n\
```"
.to_string(),
"rs" => "Consider using bounded repetition or atomic groups:\n\
```rust\n\
// Use the regex crate which has built-in protections\n\
// Or add length limits:\n\
if input.len() > 10_000 { return Err(\"Input too long\"); }\n\
```"
.to_string(),
_ => {
"Rewrite regex to avoid nested quantifiers and add input length limits."
.to_string()
}
};
findings.push(Finding {
id: String::new(),
detector: "RegexDosDetector".to_string(),
severity,
title: "Potential ReDoS vulnerability".to_string(),
description: format!(
"Regex with nested quantifiers may cause catastrophic backtracking, \
leading to denial of service.{}",
context_notes
),
affected_files: vec![path.to_path_buf()],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: Some(suggestion),
estimated_effort: Some("30 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-1333".to_string()),
why_it_matters: Some(
"ReDoS attacks exploit regex backtracking to consume CPU:\n\
• A malicious input can take exponential time to evaluate\n\
• Single requests can freeze the server\n\
• Input like 'aaaaaaaaaaaaaaaaaaaaaaaaaaaa!' on /(a+)+$/ can hang"
.to_string(),
),
..Default::default()
});
}
}
}
info!(
"RegexDosDetector found {} findings (graph-aware)",
findings.len()
);
Ok(findings)
}
}
impl crate::detectors::RegisteredDetector for RegexDosDetector {
fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
std::sync::Arc::new(Self::new(init.repo_path))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::builder::GraphBuilder;
#[test]
fn test_detects_redos_nested_quantifier() {
let store = GraphBuilder::new().freeze();
let detector = RegexDosDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("validate.py", "\nimport re\nuser_input = input(\"Enter data: \")\npattern = re.compile(r'(a+)+$')\npattern.match(user_input)\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect catastrophic backtracking regex (a+)+"
);
assert!(findings.iter().any(|f| f.detector == "RegexDosDetector"));
}
#[test]
fn test_no_finding_for_safe_regex() {
let store = GraphBuilder::new().freeze();
let detector = RegexDosDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"utils.py",
"\nimport re\npattern = re.compile(r'^[a-zA-Z0-9]+$')\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag safe regex without nested quantifiers, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
}