use crate::detectors::base::{Detector, DetectorConfig};
use crate::models::{deterministic_finding_id, Finding, Severity};
use anyhow::Result;
use regex::Regex;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::LazyLock;
use tracing::info;
/// Case-insensitive pattern for catch clauses that trap (nearly) every error.
///
/// Recognised forms:
/// - Python bare `except:` and `except Exception` / `except BaseException`
///   (optionally followed by `as name`),
/// - `catch (Exception)`, `catch (Exception e)`, `catch (final Throwable t)` and
///   friends — the parameter name is optional so both C#-style and Java-style
///   clauses match,
/// - `catch (e)` with a single-character binding, as commonly written in JavaScript,
/// - `catch {` with no binding at all.
///
/// Specific types such as `catch (IOException e)` or `except ValueError` do not match.
/// Only `is_match` is used on this pattern, so every group is non-capturing.
static BROAD_EXCEPT: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(
        r"(?i)(?:except\s*:|except\s+(?:Exception|BaseException)\b|catch\s*\(\s*(?:final\s+)?(?:(?:Exception|Error|Throwable|BaseException)(?:\s+\w+)?|\w)\s*\)|catch\s*\{)",
    )
    .expect("valid regex")
});
/// Detector that reports catch clauses which trap every error instead of a
/// specific exception type (see `BROAD_EXCEPT` for the recognised syntax).
pub struct BroadExceptionDetector {
/// Not read anywhere in this file, hence the `dead_code` allowance.
/// NOTE(review): presumably the repository root handed to the constructor — confirm
/// against the `detector_new!` macro.
#[allow(dead_code)] repository_path: PathBuf,
/// Upper bound on reported findings; `detect` stops visiting further files once
/// this many findings have been collected.
max_findings: usize,
}
impl BroadExceptionDetector {
crate::detectors::detector_new!(50);
fn analyze_try_block(lines: &[&str], catch_line: usize) -> HashSet<String> {
let call_re = Regex::new(r"\b([a-zA-Z_][a-zA-Z0-9_]*)\s*\(").expect("valid regex");
let mut calls = HashSet::new();
let mut try_start = None;
for i in (0..catch_line).rev() {
let trimmed = lines[i].trim();
if trimmed.starts_with("try") {
try_start = Some(i);
break;
}
}
if let Some(start) = try_start {
for line in lines.get(start..catch_line).unwrap_or(&[]) {
for cap in call_re.captures_iter(line) {
if let Some(m) = cap.get(1) {
let name = m.as_str();
if !["try", "if", "for", "while", "print"].contains(&name) {
calls.insert(name.to_string());
}
}
}
}
}
calls
}
/// Maps the calls found in a try block to exception types worth catching instead
/// of a blanket handler.
///
/// Each call name is lower-cased and matched by substring against four keyword
/// groups (file I/O, network, parsing, database). Every group that matches at
/// least one call contributes one suggestion string, chosen by file extension:
/// `py`, `java`, the JavaScript family (`js`, `ts`, `jsx`, `tsx`), or a generic
/// fallback for anything else. The database group has no JavaScript-specific
/// text and uses the fallback there.
///
/// The result is sorted and free of duplicates; it is empty when nothing matches.
fn suggest_exceptions(calls: &HashSet<String>, ext: &str) -> Vec<String> {
    /// One family of operations and the advice to give per language.
    struct Category {
        keywords: &'static [&'static str],
        python: &'static str,
        java: &'static str,
        javascript: Option<&'static str>,
        fallback: &'static str,
    }

    const CATEGORIES: [Category; 4] = [
        Category {
            keywords: &["open", "read", "write", "close"],
            python: "IOError, FileNotFoundError, PermissionError",
            java: "IOException, FileNotFoundException",
            javascript: Some("Error (check error.code for ENOENT, EACCES)"),
            fallback: "File I/O exceptions",
        },
        Category {
            keywords: &["fetch", "request", "get", "post", "connect", "send"],
            python: "requests.RequestException, urllib.error.URLError, ConnectionError",
            java: "IOException, SocketException, HttpClientErrorException",
            javascript: Some("TypeError (network errors), AbortError"),
            fallback: "Network/HTTP exceptions",
        },
        Category {
            keywords: &["parse", "json", "loads", "dumps", "decode", "encode"],
            python: "json.JSONDecodeError, ValueError, UnicodeDecodeError",
            java: "JsonParseException, NumberFormatException",
            javascript: Some("SyntaxError (for JSON.parse)"),
            fallback: "Parse/decode exceptions",
        },
        Category {
            keywords: &["query", "execute", "commit", "rollback", "cursor"],
            python: "sqlite3.Error, psycopg2.Error, pymysql.Error",
            java: "SQLException, DataAccessException",
            javascript: None,
            fallback: "Database exceptions",
        },
    ];

    // Lower-case each call once rather than once per category.
    let lowered: Vec<String> = calls.iter().map(|call| call.to_lowercase()).collect();

    let mut suggestions: Vec<String> = CATEGORIES
        .iter()
        .filter(|category| {
            lowered
                .iter()
                .any(|call| category.keywords.iter().any(|keyword| call.contains(keyword)))
        })
        .map(|category| match ext {
            "py" => category.python,
            "java" => category.java,
            // JSX/TSX are scanned by the detector too and share JS error semantics.
            "js" | "ts" | "jsx" | "tsx" => category.javascript.unwrap_or(category.fallback),
            _ => category.fallback,
        })
        .map(str::to_owned)
        .collect();

    suggestions.sort_unstable();
    suggestions.dedup();
    suggestions
}
}
impl Detector for BroadExceptionDetector {
    /// Stable identifier of this detector.
    fn name(&self) -> &'static str {
        "broad-exception"
    }

    /// One-line human-readable summary of what the detector looks for.
    fn description(&self) -> &'static str {
        "Detects overly broad exception catching"
    }

    /// This detector works on file text only and never consults the code graph.
    fn requires_graph(&self) -> bool {
        false
    }

    /// Extensions of the files this detector scans.
    fn file_extensions(&self) -> &'static [&'static str] {
        &["py", "js", "ts", "jsx", "tsx", "java", "go", "rs"]
    }

    /// Scans every file with a supported extension for broad catch clauses.
    ///
    /// A line is reported when it matches `BROAD_EXCEPT`, is not suppressed, and
    /// none of the next three lines mentions `raise` or `throw` (which would mean
    /// the error is propagated rather than swallowed). Each finding carries the
    /// calls seen in the preceding try block and, where they hint at a category
    /// of operation, the specific exceptions to catch instead. Severity is
    /// `Medium` when at least two suggestion groups apply, otherwise `Low`.
    ///
    /// At most `max_findings` findings are returned.
    fn detect(
        &self,
        ctx: &crate::detectors::analysis_context::AnalysisContext,
    ) -> Result<Vec<Finding>> {
        let files = &ctx.as_file_provider();
        let mut findings = vec![];

        // Single source of truth for the extension list: `file_extensions()`.
        'files: for path in files.files_with_extensions(self.file_extensions()) {
            if findings.len() >= self.max_findings {
                break;
            }
            let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
            let Some(content) = files.content(path) else {
                continue;
            };
            let lines: Vec<&str> = content.lines().collect();

            for (i, line) in lines.iter().enumerate() {
                let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
                if crate::detectors::is_line_suppressed(line, prev_line) {
                    continue;
                }
                if !BROAD_EXCEPT.is_match(line) {
                    continue;
                }
                if broad_except_is_reraised(&lines, i) {
                    continue;
                }

                let calls = Self::analyze_try_block(&lines, i);
                let suggestions = Self::suggest_exceptions(&calls, ext);

                // `HashSet` iteration order is randomised per process; sort before
                // truncating so the same input always yields the same finding text.
                let mut call_names: Vec<&str> = calls.iter().map(String::as_str).collect();
                call_names.sort_unstable();
                call_names.truncate(5);
                let context_notes = if call_names.is_empty() {
                    String::new()
                } else {
                    format!(
                        "\n\n**Analysis:**\n📞 Try block calls: {}",
                        call_names.join(", ")
                    )
                };

                let severity = if suggestions.len() >= 2 {
                    Severity::Medium
                } else {
                    Severity::Low
                };

                // 1-based line number; `None` in the (theoretical) case it does not fit in u32.
                let line_number = u32::try_from(i + 1).ok();

                findings.push(Finding {
                    id: String::new(),
                    detector: "BroadExceptionDetector".to_string(),
                    severity,
                    title: "Broad exception catch".to_string(),
                    description: format!(
                        "Catching generic Exception hides bugs and makes debugging difficult.{}",
                        context_notes
                    ),
                    affected_files: vec![path.to_path_buf()],
                    line_start: line_number,
                    line_end: line_number,
                    suggested_fix: Some(broad_except_fix_text(&suggestions, ext)),
                    estimated_effort: Some("10 minutes".to_string()),
                    category: Some("error-handling".to_string()),
                    cwe_id: None,
                    why_it_matters: Some(
                        "Broad exception catches mask unexpected errors like TypeErrors or \
                         AttributeErrors that indicate bugs in your code."
                            .to_string(),
                    ),
                    ..Default::default()
                });

                // Enforce the cap inside a file as well, not only between files.
                if findings.len() >= self.max_findings {
                    break 'files;
                }
            }
        }

        info!("BroadExceptionDetector found {} findings", findings.len());
        Ok(findings)
    }
}

/// Reports whether any of the (up to) three lines after `catch_idx` mentions
/// `raise` or `throw`. Works at the end of a file, where fewer than three lines remain.
fn broad_except_is_reraised(lines: &[&str], catch_idx: usize) -> bool {
    lines
        .iter()
        .skip(catch_idx + 1)
        .take(3)
        .any(|line| line.contains("raise") || line.contains("throw"))
}

/// Builds the remediation text for a finding: a bullet list of the suggested
/// exception types, plus a Python `except` example for `.py` files only (the
/// snippet is Python syntax and would be misleading for other languages).
fn broad_except_fix_text(suggestions: &[String], ext: &str) -> String {
    let Some(first) = suggestions.first() else {
        return "Catch specific exceptions instead of generic Exception.".to_string();
    };

    let bullets = suggestions
        .iter()
        .map(|s| format!(" • {}", s))
        .collect::<Vec<_>>()
        .join("\n");
    let mut text = format!(
        "Based on the operations in your try block, consider catching:\n{}",
        bullets
    );

    if ext == "py" {
        // The handler body is indented explicitly so the snippet is valid Python.
        text.push_str(&format!(
            "\n\nExample:\n```python\nexcept ({}) as e:\n    logger.error(f\"Operation failed: {{e}}\")\n```",
            first
        ));
    }
    text
}
impl crate::detectors::RegisteredDetector for BroadExceptionDetector {
    /// Builds a detector from `init.repo_path` and returns it as a shared
    /// `Detector` trait object.
    fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
        use std::sync::Arc;

        let detector = Self::new(init.repo_path);
        Arc::new(detector)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::detectors::analysis_context::AnalysisContext;
    use crate::graph::builder::GraphBuilder;

    /// Runs the detector over a single mock file called `handler.py` containing
    /// `source` and returns whatever it reports.
    fn findings_for(source: &'static str) -> Vec<Finding> {
        let graph = GraphBuilder::new().freeze();
        let detector = BroadExceptionDetector::new("/mock/repo");
        let ctx = AnalysisContext::test_with_mock_files(&graph, vec![("handler.py", source)]);
        detector.detect(&ctx).expect("detection should succeed")
    }

    #[test]
    fn test_detects_bare_except() {
        let reported = findings_for(
            "def process():\n    try:\n        do_work()\n    except:\n        log(\"something failed\")\n",
        );

        assert!(!reported.is_empty(), "Should detect bare except:");
        let first_title = &reported[0].title;
        assert!(
            first_title.contains("Broad exception"),
            "Title should mention broad exception, got: {}",
            first_title
        );
    }

    #[test]
    fn test_no_finding_for_specific_exception() {
        let reported = findings_for(
            "def process():\n    try:\n        do_work()\n    except ValueError as e:\n        log(f\"bad value: {e}\")\n",
        );

        let titles: Vec<_> = reported.iter().map(|f| &f.title).collect();
        assert!(
            reported.is_empty(),
            "Should not flag specific except ValueError, but got: {:?}",
            titles
        );
    }
}