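//! CFG-based infinite loop and unreachable code detector.
//!
//! Intended to detect:
//! - Infinite loops (`while True` without a `break`, bare `while True`, etc.)
//! - Unreachable code after `return`/`raise`/`break`/`continue`
//! - High cyclomatic complexity
//!
//! As implemented in this module, `detect` only runs the graph-based
//! cyclomatic-complexity query; the loop and unreachable-code checks require
//! source-level CFG analysis.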
use std::path::Path;
use crate::detectors::base::{Detector, DetectorConfig, DetectorResult};
use crate::graph::GraphClient;
use crate::models::{Finding, Severity};
/// Infinite loop and control flow detector
///
/// Uses graph data and optionally CFG analysis to detect:
/// - Infinite loops
/// - Unreachable code
/// - High cyclomatic complexity
///
/// Built with `InfiniteLoopDetector::new()` (or `Default`) and adjusted through the
/// `with_*` builder methods.
pub struct InfiniteLoopDetector {
/// Detector configuration; `new()` sets it to `DetectorConfig::default()`.
// NOTE(review): not read anywhere in the visible code — confirm it is still needed.
config: DetectorConfig,
/// Repository path for source analysis (`None` until `with_repository_path` is called)
repository_path: Option<String>,
/// Maximum findings to report (default 100); used both as the query `LIMIT` and as the
/// cap on emitted findings
max_findings: usize,
/// Complexity threshold (default 15); functions are reported when their complexity is
/// strictly greater than this value
complexity_threshold: i64,
/// Whether to detect unreachable code (default `true`)
// NOTE(review): never read in the visible code — presumably reserved for CFG analysis.
detect_unreachable: bool,
/// Whether to detect infinite loops (default `true`)
// NOTE(review): never read in the visible code — presumably reserved for CFG analysis.
detect_infinite_loops: bool,
}
impl InfiniteLoopDetector {
/// Build a detector with the stock settings.
///
/// Defaults: no repository path, at most 100 findings, a complexity threshold of 15,
/// and both the unreachable-code and infinite-loop flags switched on.
pub fn new() -> Self {
    let config = DetectorConfig::default();
    Self {
        config,
        detect_infinite_loops: true,
        detect_unreachable: true,
        complexity_threshold: 15,
        max_findings: 100,
        repository_path: None,
    }
}
/// Builder step: remember the repository location used for source analysis.
///
/// The path is stored as a `String`; it is converted with `to_string_lossy`, so any
/// non-UTF-8 sequences are replaced rather than rejected.
pub fn with_repository_path<P: AsRef<Path>>(mut self, path: P) -> Self {
    let lossy = path.as_ref().to_string_lossy();
    self.repository_path = Some(lossy.into_owned());
    self
}
/// Set complexity threshold
pub fn with_complexity_threshold(mut self, threshold: i64) -> Self {
self.complexity_threshold = threshold;
self
}
/// Report functions whose cyclomatic complexity is above the configured threshold.
///
/// Issues a single query against `graph` selecting `Function` nodes whose `complexity`
/// is strictly greater than `self.complexity_threshold`, highest first, limited to
/// `self.max_findings` rows. Rows whose `func_name` is unavailable or empty are skipped.
///
/// Severity is `High` above 25, `Medium` above 20 and `Low` otherwise. The effort
/// estimate is "Medium (1-4 hours)" above 25 and "Small (30 min - 1 hour)" otherwise.
///
/// # Errors
///
/// Propagates any error returned by `graph.execute`.
fn find_high_complexity(&self, graph: &GraphClient) -> anyhow::Result<Vec<Finding>> {
    let threshold = self.complexity_threshold;
    let cap = self.max_findings;
    let mut findings = Vec::new();

    // The raw-string lines stay flush-left so the query text is unchanged.
    let query = format!(
        r#"
MATCH (f:Function)
WHERE f.complexity > {}
RETURN f.qualifiedName AS func_name,
f.name AS func_simple_name,
f.filePath AS file_path,
f.lineStart AS line_start,
f.complexity AS complexity
ORDER BY f.complexity DESC
LIMIT {}
"#,
        threshold, cap
    );

    for record in graph.execute(&query)? {
        let qualified_name = record.get_string("func_name").unwrap_or_default();
        let short_name = record.get_string("func_simple_name").unwrap_or_default();
        let path = record.get_string("file_path").unwrap_or_default();
        let line_start = record.get_i64("line_start");
        let score = record.get_i64("complexity").unwrap_or(0);

        // A row without a qualified name cannot be attributed to a function.
        if qualified_name.is_empty() {
            continue;
        }
        // Skipped rows do not count, so the cap is checked against emitted findings.
        if findings.len() >= cap {
            break;
        }

        let (severity, effort) = match score {
            s if s > 25 => (Severity::High, "Medium (1-4 hours)"),
            s if s > 20 => (Severity::Medium, "Small (30 min - 1 hour)"),
            _ => (Severity::Low, "Small (30 min - 1 hour)"),
        };

        let description = format!(
            "Function has cyclomatic complexity of {} (threshold: {}).\n\n\
             **Location**: {}\n\
             **Function**: {}\n\
             **Complexity**: {}\n\n\
             **Impact**: High complexity makes code harder to understand, test, and maintain.",
            score, threshold, path, short_name, score
        );
        let title = format!("High complexity in {}: {}", short_name, score);
        let id = format!("high_complexity_{}", qualified_name);
        let suggested_fix = format!(
            "Refactor function to reduce complexity from {} to below {}",
            score, threshold
        );
        let metadata = serde_json::json!({
            "cyclomatic_complexity": score,
            "function": short_name,
        });
        // An unknown file path yields an empty list rather than a list holding "".
        let affected_files = if path.is_empty() {
            Vec::new()
        } else {
            vec![path]
        };

        findings.push(Finding {
            id,
            detector: "InfiniteLoopDetector".to_string(),
            severity,
            title,
            description,
            affected_nodes: vec![qualified_name],
            affected_files,
            line_start,
            line_end: None,
            suggested_fix: Some(suggested_fix),
            estimated_effort: Some(effort.to_string()),
            // The complexity value is read straight from the graph, not estimated.
            confidence: 1.0,
            tags: vec![
                "complexity".to_string(),
                "maintainability".to_string(),
                "refactoring".to_string(),
            ],
            metadata,
        });
    }

    Ok(findings)
}
/// Map a loop-type tag to a human-readable remediation hint.
///
/// Recognised tags are `bare_while_true`, `while_true_no_break`, `for_infinite` and
/// `recursive_no_base`; any other tag yields a generic termination-condition hint.
fn suggest_infinite_loop_fix(loop_type: &str) -> &'static str {
    const KNOWN_FIXES: [(&str, &str); 4] = [
        (
            "bare_while_true",
            "Add a break condition or use a different loop structure",
        ),
        (
            "while_true_no_break",
            "Add a break statement or use a bounded loop",
        ),
        ("for_infinite", "Ensure the iterator has a finite length"),
        (
            "recursive_no_base",
            "Add a base case to the recursive function",
        ),
    ];
    const FALLBACK: &str = "Add proper termination condition to the loop";

    for (kind, fix) in KNOWN_FIXES.iter() {
        if *kind == loop_type {
            return *fix;
        }
    }
    FALLBACK
}
}
// `Default` simply delegates to `new()`, so both produce identical settings.
impl Default for InfiniteLoopDetector {
/// Returns the detector built by `Self::new()`.
fn default() -> Self {
Self::new()
}
}
impl Detector for InfiniteLoopDetector {
    /// Identifier reported for this detector.
    fn name(&self) -> &'static str {
        "InfiniteLoopDetector"
    }

    /// One-line summary of what the detector looks for.
    fn description(&self) -> &'static str {
        "Detects infinite loops, unreachable code, and high cyclomatic complexity"
    }

    /// Run the detector against `graph`.
    ///
    /// Only the graph-backed complexity query runs here. If that query fails, the error
    /// is logged as a warning and an empty list of findings is returned instead of an
    /// error.
    fn detect(&self, graph: &GraphClient) -> DetectorResult {
        // CFG-based infinite-loop and unreachable-code detection would need source file
        // analysis (the Python version uses repotoire_fast.analyze_cfg_batch, a Rust
        // library). For the CLI, detection is limited to what the graph provides.
        let findings = match self.find_high_complexity(graph) {
            Ok(found) => found,
            Err(e) => {
                tracing::warn!("Failed to find high complexity: {}", e);
                Vec::new()
            }
        };
        Ok(findings)
    }

    /// Always returns `false`.
    // NOTE(review): presumably means this detector does not rely on other detectors'
    // output — confirm against the `Detector` trait definition.
    fn is_dependent(&self) -> bool {
        false
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `new()` exposes the expected name and default settings.
    #[test]
    fn test_detector_creation() {
        let detector = InfiniteLoopDetector::new();
        assert_eq!(detector.name(), "InfiniteLoopDetector");
        assert_eq!(detector.complexity_threshold, 15);
        assert_eq!(detector.max_findings, 100);
        assert!(detector.repository_path.is_none());
        assert!(detector.detect_unreachable);
        assert!(detector.detect_infinite_loops);
    }

    /// `Default` must stay in sync with `new()`.
    #[test]
    fn test_default_matches_new() {
        let from_default = InfiniteLoopDetector::default();
        let from_new = InfiniteLoopDetector::new();
        assert_eq!(from_default.complexity_threshold, from_new.complexity_threshold);
        assert_eq!(from_default.max_findings, from_new.max_findings);
        assert_eq!(from_default.repository_path, from_new.repository_path);
    }

    /// Builder methods overwrite only the field they are named after.
    #[test]
    fn test_builder_methods() {
        let detector = InfiniteLoopDetector::new()
            .with_repository_path("/tmp/repo")
            .with_complexity_threshold(30);
        assert_eq!(detector.repository_path.as_deref(), Some("/tmp/repo"));
        assert_eq!(detector.complexity_threshold, 30);
        assert_eq!(detector.max_findings, 100);
    }

    /// Every known loop type has a specific hint; anything else gets the generic one.
    #[test]
    fn test_loop_fix_suggestions() {
        assert!(InfiniteLoopDetector::suggest_infinite_loop_fix("bare_while_true").contains("break"));
        assert!(
            InfiniteLoopDetector::suggest_infinite_loop_fix("while_true_no_break")
                .contains("break")
        );
        assert!(InfiniteLoopDetector::suggest_infinite_loop_fix("for_infinite").contains("finite"));
        assert!(InfiniteLoopDetector::suggest_infinite_loop_fix("recursive_no_base")
            .contains("base case"));
        assert!(
            InfiniteLoopDetector::suggest_infinite_loop_fix("not_a_known_type")
                .contains("termination")
        );
    }
}