use crate::detectors::base::{Detector, DetectorConfig};
use crate::graph::GraphQueryExt;
use crate::models::{Finding, Severity};
use anyhow::Result;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use tracing::info;
pub struct EmptyCatchDetector {
#[allow(dead_code)] config: DetectorConfig,
#[allow(dead_code)] repository_path: PathBuf,
max_findings: usize,
}
impl EmptyCatchDetector {
pub fn new(repository_path: impl Into<PathBuf>) -> Self {
Self {
config: DetectorConfig::default(),
repository_path: repository_path.into(),
max_findings: 100,
}
}
fn find_try_block_start(lines: &[&str], catch_line: usize) -> Option<usize> {
for i in (0..catch_line).rev() {
let trimmed = lines[i].trim();
if trimmed.starts_with("try") || trimmed == "try:" || trimmed == "try {" {
return Some(i);
}
}
None
}
fn extract_calls(lines: &[&str], start: usize, end: usize) -> HashSet<String> {
use std::sync::LazyLock;
static CALL_RE: LazyLock<regex::Regex> = LazyLock::new(|| {
regex::Regex::new(r"\b([a-zA-Z_][a-zA-Z0-9_]*)\s*\(").expect("valid regex")
});
let call_re = &*CALL_RE;
let mut calls = HashSet::new();
for line in lines.get(start..end).unwrap_or(&[]) {
for cap in call_re.captures_iter(line) {
if let Some(m) = cap.get(1) {
let name = m.as_str();
if ![
"if", "for", "while", "print", "len", "str", "int", "float", "bool",
"list", "dict", "set",
]
.contains(&name)
{
calls.insert(name.to_string());
}
}
}
}
calls
}
fn assess_risk(
calls: &HashSet<String>,
graph: &dyn crate::graph::GraphQuery,
func_qn_map: &std::collections::HashMap<String, String>,
) -> (Severity, Vec<String>) {
let io_patterns = [
"read", "write", "open", "close", "fetch", "request", "send", "recv", "connect",
"query", "execute", "save", "load", "delete", "update",
];
let mut risk_notes = Vec::new();
let mut has_io = false;
for call in calls {
let call_lower = call.to_lowercase();
if io_patterns.iter().any(|p| call_lower.contains(p)) {
has_io = true;
risk_notes.push(format!("⚠️ `{}` appears to do I/O", call));
}
}
for call in calls {
if let Some(qn) = func_qn_map.get(call.as_str()) {
let callees = graph.get_callees(qn);
if callees.len() > 5 {
risk_notes.push(format!(
"📊 `{}` is complex ({} internal calls)",
call,
callees.len()
));
}
}
}
let severity = if has_io {
Severity::High } else if !risk_notes.is_empty() {
Severity::Medium
} else {
Severity::Low };
(severity, risk_notes)
}
fn scan_file(
&self,
path: &Path,
ext: &str,
graph: &dyn crate::graph::GraphQuery,
files: &dyn crate::detectors::file_provider::FileProvider,
func_qn_map: &std::collections::HashMap<String, String>,
) -> Vec<Finding> {
let mut findings = vec![];
let content = match files.content(path) {
Some(c) => c,
None => return findings,
};
let lines: Vec<&str> = content.lines().collect();
for (i, line) in lines.iter().enumerate() {
let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
if crate::detectors::is_line_suppressed(line, prev_line) {
continue;
}
let trimmed = line.trim();
let mut is_empty_catch = false;
let mut is_common_idiom = false;
let catch_line = i;
if ext == "py" && trimmed.starts_with("except") && trimmed.ends_with(":") {
if let Some(next) = lines.get(i + 1) {
let next_trimmed = next.trim();
if next_trimmed == "pass" || next_trimmed == "..." {
let except_body = trimmed
.strip_prefix("except")
.unwrap_or("")
.strip_suffix(":")
.unwrap_or("")
.trim();
let skip_entirely = ["ImportError", "ModuleNotFoundError"];
let should_skip = !except_body.is_empty()
&& skip_entirely.iter().any(|e| except_body.contains(e));
if should_skip {
} else {
is_empty_catch = true;
let broad_catches = [
"except:",
"except Exception:",
"except BaseException:",
"except Exception as",
"except BaseException as",
];
let is_broad_catch = except_body.is_empty()
|| broad_catches.iter().any(|b| trimmed.contains(b));
if !is_broad_catch {
is_common_idiom = true;
}
}
}
}
}
if matches!(ext, "js" | "ts" | "jsx" | "tsx" | "java" | "cs")
&& trimmed.contains("catch")
&& trimmed.contains("{")
&& trimmed.contains("}")
&& (trimmed.ends_with("{ }") || trimmed.ends_with("{}"))
{
is_empty_catch = true;
}
if is_empty_catch {
let cleanup_methods: &[&str] = &[
"close",
"_close",
"__del__",
"__exit__",
"__aexit__",
"shutdown",
"dispose",
"cleanup",
"teardown",
"finalize",
"_cleanup",
"_teardown",
"_dispose",
"_shutdown",
];
let mut in_cleanup = false;
if ext == "py" {
for j in (0..catch_line).rev() {
let lt = lines[j].trim();
if lt.starts_with("def ") {
if let Some(name_part) = lt.strip_prefix("def ") {
let func_name = name_part.split('(').next().unwrap_or("").trim();
if cleanup_methods.contains(&func_name) {
in_cleanup = true;
}
}
break;
}
if lt.starts_with("class ") {
break;
}
}
}
if in_cleanup {
continue;
}
let try_body_lines: Vec<&str> = if ext == "py" {
if let Some(try_start) = Self::find_try_block_start(&lines, catch_line) {
lines
.get((try_start + 1)..catch_line)
.unwrap_or(&[])
.iter()
.map(|l| l.trim())
.filter(|l| !l.is_empty())
.collect()
} else {
vec![]
}
} else {
vec![]
};
if ext == "py" && try_body_lines.len() <= 2 {
let has_import = try_body_lines
.iter()
.any(|l| l.starts_with("import ") || l.starts_with("from "));
if has_import {
continue;
}
}
let safe_exception_types: &[&str] = &[
"KeyError",
"AttributeError",
"TypeError",
"ValueError",
"FileNotFoundError",
"OSError",
"PermissionError",
"NotImplementedError",
"StopIteration",
"UnicodeDecodeError",
"UnicodeEncodeError",
"LookupError",
"IndexError",
];
if ext == "py" && try_body_lines.len() <= 2 {
let except_types_str = trimmed
.strip_prefix("except")
.unwrap_or("")
.strip_suffix(":")
.unwrap_or("")
.trim();
let except_types_str =
except_types_str.split(" as ").next().unwrap_or("").trim();
if !except_types_str.is_empty()
&& except_types_str != "Exception"
&& except_types_str != "BaseException"
{
let types_inner = except_types_str
.trim_start_matches('(')
.trim_end_matches(')');
let all_safe = types_inner
.split(',')
.map(|t| t.trim())
.filter(|t| !t.is_empty())
.all(|t| safe_exception_types.contains(&t));
if all_safe {
continue;
}
}
}
let (severity, risk_notes) =
if let Some(try_start) = Self::find_try_block_start(&lines, catch_line) {
let calls = Self::extract_calls(&lines, try_start, catch_line);
let (sev, notes) = Self::assess_risk(&calls, graph, func_qn_map);
if !calls.is_empty() {
(sev, notes)
} else {
(Severity::Medium, vec![])
}
} else {
(Severity::Medium, vec![])
};
let severity = if is_common_idiom {
Severity::Low
} else if severity == Severity::Low {
Severity::Medium
} else {
severity
};
let context_notes = if risk_notes.is_empty() {
String::new()
} else {
format!("\n\n**Risk Assessment:**\n{}", risk_notes.join("\n"))
};
let suggestion = if severity == Severity::High {
"This swallows I/O or network exceptions - very dangerous!\n\
At minimum, log the exception:\n\
```python\n\
except Exception as e:\n\
logger.error(f\"Operation failed: {e}\")\n\
```"
.to_string()
} else {
"Log the exception or handle it appropriately:\n\
- Add logging to track failures\n\
- Re-raise if recovery isn't possible\n\
- Handle specific exception types"
.to_string()
};
findings.push(Finding {
id: String::new(),
detector: "EmptyCatchDetector".to_string(),
severity,
title: "Empty catch block swallows exceptions".to_string(),
description: format!(
"This catch block silently swallows exceptions, hiding potential bugs.{}",
context_notes
),
affected_files: vec![path.to_path_buf()],
line_start: Some((catch_line + 1) as u32),
line_end: Some((catch_line + 1) as u32),
suggested_fix: Some(suggestion),
estimated_effort: Some("10 minutes".to_string()),
category: Some("error-handling".to_string()),
cwe_id: Some("CWE-390".to_string()),
why_it_matters: Some(
"Swallowed exceptions hide bugs and make debugging extremely difficult. \
When something fails silently, you may not know until much later."
.to_string(),
),
..Default::default()
});
}
}
findings
}
}
impl Detector for EmptyCatchDetector {
fn name(&self) -> &'static str {
"empty-catch-block"
}
fn description(&self) -> &'static str {
"Detects empty catch/except blocks"
}
fn file_extensions(&self) -> &'static [&'static str] {
&[
"py", "js", "ts", "jsx", "tsx", "rb", "java", "go", "rs", "c", "cpp", "cs",
]
}
fn detect(
&self,
ctx: &crate::detectors::analysis_context::AnalysisContext,
) -> Result<Vec<Finding>> {
let graph = ctx.graph;
let files = &ctx.as_file_provider();
let mut findings = vec![];
let gi = graph.interner();
let func_qn_map: std::collections::HashMap<String, String> = graph
.get_functions()
.into_iter()
.map(|f| (f.node_name(gi).to_string(), f.qn(gi).to_string()))
.collect();
for path in
files.files_with_extensions(&["py", "js", "ts", "jsx", "tsx", "java", "cs", "cpp"])
{
if findings.len() >= self.max_findings {
break;
}
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
findings.extend(self.scan_file(path, ext, graph, files, &func_qn_map));
}
info!(
"EmptyCatchDetector found {} findings (graph-aware)",
findings.len()
);
Ok(findings)
}
}
impl crate::detectors::RegisteredDetector for EmptyCatchDetector {
fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
std::sync::Arc::new(Self::new(init.repo_path))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::builder::GraphBuilder;
#[test]
fn test_detects_empty_except_pass_in_python() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"handler.py",
"def process():\n try:\n do_something()\n except:\n pass\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should detect empty except: pass block"
);
assert!(
findings[0].title.contains("Empty catch"),
"Title should mention empty catch, got: {}",
findings[0].title
);
}
#[test]
fn test_no_finding_for_handled_exception() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("handler.py", "def process():\n try:\n do_something()\n except ValueError as e:\n logger.error(e)\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag exception that is properly handled, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_except_importerror_pass() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"optional.py",
"try:\n import yaml\nexcept ImportError:\n pass\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag except ImportError: pass (optional import idiom). Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_except_keyerror_pass_single_line() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"lookup.py",
"try:\n value = cache[key]\nexcept KeyError:\n pass\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag safe single-line probe with KeyError. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_still_detects_bare_except_pass() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("bad.py", "try:\n do_something()\nexcept:\n pass\n")],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should still detect bare except: pass"
);
assert_ne!(
findings[0].severity,
Severity::Low,
"Bare except: pass should NOT be Low severity"
);
}
#[test]
fn test_still_detects_except_exception_pass() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"bad.py",
"try:\n do_something()\nexcept Exception:\n pass\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should still detect except Exception: pass (too broad)"
);
assert_ne!(
findings[0].severity,
Severity::Low,
"except Exception: pass should NOT be Low severity"
);
}
#[test]
fn test_specific_exception_gets_low_severity() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("views.py", "def get_user(pk):\n try:\n return User.objects.get(pk=pk)\n except User.DoesNotExist:\n pass\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(!findings.is_empty(), "Should still detect empty catch");
assert!(
findings.iter().all(|f| f.severity == Severity::Low),
"Specific named exception should be Low severity. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, &f.severity))
.collect::<Vec<_>>()
);
}
#[test]
fn test_broad_except_gets_higher_severity() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("handler.py", "def process():\n try:\n do_something()\n except Exception:\n pass\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(!findings.is_empty(), "Should detect broad except");
assert!(
findings.iter().any(|f| f.severity != Severity::Low),
"Broad 'except Exception:' should NOT be Low severity. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, &f.severity))
.collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_cleanup_method() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("response.py", "class Response:\n def close(self):\n for closer in self._closers:\n try:\n closer()\n except Exception:\n pass\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag empty catch in close() method. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_exit_method() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("cursor.py", "class Cursor:\n def __exit__(self, exc_type, exc_val, tb):\n try:\n self.close()\n except db.Error:\n pass\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag empty catch in __exit__ method. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_import_probing_with_broad_except() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("compat.py", "try:\n from yaml import CSafeLoader as SafeLoader\nexcept Exception:\n pass\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag import probing with broad except. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_safe_single_line_probe() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("utils.py", "def get_size(f):\n try:\n return os.path.getsize(f.name)\n except (OSError, TypeError):\n pass\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Should not flag safe single-line probe with specific exceptions. Found: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_still_detects_broad_except_in_regular_function() {
let store = GraphBuilder::new().freeze();
let detector = EmptyCatchDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("handler.py", "def process_data():\n try:\n result = complex_operation()\n save_to_db(result)\n except Exception:\n pass\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Should still flag broad except in regular function with multi-line try body"
);
}
}