use std::path::{Path, PathBuf};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use walkdir::WalkDir;
use crate::ast::extract::extract_file;
use crate::error::TldrError;
use crate::metrics::calculate_all_complexities_file;
use crate::types::Language;
use crate::TldrResult;
/// A function reported in the hotspot list of a [`ComplexityReport`].
///
/// `analyze_complexity` builds one of these for each function flagged as a hotspot
/// (cyclomatic complexity strictly above the configured threshold), up to the
/// configured maximum number of hotspots.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComplexityHotspot {
/// Function name; class methods are reported as `Class.method`.
pub name: String,
/// Path of the file that contains the function.
pub file: PathBuf,
/// Line number of the function as reported by the extractor.
pub line: usize,
/// Cyclomatic complexity of the function.
pub cyclomatic: usize,
/// Cognitive complexity; `None` (and omitted from serialized output) when
/// cognitive complexity was not requested.
#[serde(skip_serializing_if = "Option::is_none")]
pub cognitive: Option<usize>,
/// Lines of code of the function as reported by the metrics calculation.
pub loc: usize,
/// 1-based rank of the function in the complexity-sorted list (1 = most complex).
pub rank: usize,
}
/// Complexity metrics for a single function or method.
///
/// Produced per file by `analyze_file_complexity`; `rank` and `is_hotspot` are
/// filled in later by `analyze_complexity` once all functions have been sorted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunctionComplexity {
/// Function name; class methods are reported as `Class.method`.
pub name: String,
/// Path of the file that contains the function.
pub file: PathBuf,
/// Line number of the function as reported by the extractor.
pub line: usize,
/// Cyclomatic complexity of the function.
pub cyclomatic: usize,
/// Cognitive complexity, or `0` when cognitive complexity was not requested.
pub cognitive: usize,
/// Lines of code of the function as reported by the metrics calculation.
pub loc: usize,
/// 1-based rank in the complexity-sorted list (1 = most complex); `0` until ranked.
pub rank: usize,
/// `true` when `cyclomatic` is strictly greater than the hotspot threshold.
pub is_hotspot: bool,
}
/// Aggregate statistics over every function included in a [`ComplexityReport`].
///
/// The derived `Default` yields an all-zero summary, which is also what an
/// analysis that finds no functions produces.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ComplexitySummary {
    /// Number of functions and methods that were analyzed.
    pub total_functions: usize,
    /// Mean cyclomatic complexity, or `0.0` when no functions were analyzed.
    pub avg_cyclomatic: f64,
    /// Highest cyclomatic complexity seen, or `0` when no functions were analyzed.
    pub max_cyclomatic: usize,
    /// Number of functions whose cyclomatic complexity exceeds the hotspot threshold.
    pub hotspot_count: usize,
    /// Sum of the lines of code of all analyzed functions.
    pub total_loc: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComplexityReport {
pub functions_analyzed: usize,
pub avg_cyclomatic: f64,
pub max_cyclomatic: usize,
pub hotspot_count: usize,
pub hotspots: Vec<ComplexityHotspot>,
pub functions: Vec<FunctionComplexity>,
pub summary: ComplexitySummary,
}
impl Default for ComplexityReport {
fn default() -> Self {
Self {
functions_analyzed: 0,
avg_cyclomatic: 0.0,
max_cyclomatic: 0,
hotspot_count: 0,
hotspots: Vec::new(),
functions: Vec::new(),
summary: ComplexitySummary::default(),
}
}
}
/// Tunable settings for a complexity analysis run.
#[derive(Debug, Clone)]
pub struct ComplexityOptions {
    /// A function is a hotspot when its cyclomatic complexity is strictly greater than this.
    pub hotspot_threshold: usize,
    /// Upper bound on the number of entries placed in the report's hotspot list.
    pub max_hotspots: usize,
    /// Whether cognitive complexity is recorded alongside cyclomatic complexity.
    pub include_cognitive: bool,
}

impl Default for ComplexityOptions {
    /// Returns the stock settings: threshold `10`, at most `20` hotspots, cognitive enabled.
    fn default() -> Self {
        let hotspot_threshold = 10;
        let max_hotspots = 20;
        let include_cognitive = true;

        ComplexityOptions {
            hotspot_threshold,
            max_hotspots,
            include_cognitive,
        }
    }
}
/// Analyzes function-level complexity for a single file or a whole directory tree.
///
/// When `path` is a file it is analyzed directly and `language` is not consulted.
/// Otherwise the tree under `path` is walked and every regular file whose language
/// is detected by `Language::from_path` is analyzed; if `language` is `Some`, only
/// files detected as that language are kept.
///
/// Functions are ranked by descending cyclomatic complexity (rank 1 is the most
/// complex). Ties are broken by file path, then line, then name, so the ranking and
/// the capped hotspot list do not depend on directory traversal order.
///
/// # Arguments
///
/// * `path` - File or directory to analyze.
/// * `language` - Optional language filter applied while walking a directory.
/// * `options` - Analysis settings; `None` uses `ComplexityOptions::default()`.
///
/// # Returns
///
/// A [`ComplexityReport`] with every analyzed function, the capped hotspot list and
/// aggregate statistics. A run that finds no functions yields zeroed aggregates.
///
/// # Errors
///
/// This function itself only ever returns `Ok`: directory entries that cannot be
/// read and files whose analysis fails are skipped rather than reported.
pub fn analyze_complexity(
    path: &Path,
    language: Option<Language>,
    options: Option<ComplexityOptions>,
) -> TldrResult<ComplexityReport> {
    let opts = options.unwrap_or_default();

    let file_paths: Vec<PathBuf> = if path.is_file() {
        vec![path.to_path_buf()]
    } else {
        WalkDir::new(path)
            .into_iter()
            // Best-effort traversal: unreadable entries are skipped, not fatal.
            .filter_map(|e| e.ok())
            .filter(|e| e.file_type().is_file())
            .filter(|e| match (Language::from_path(e.path()), language) {
                (Some(detected), Some(wanted)) => detected == wanted,
                (Some(_), None) => true,
                (None, _) => false,
            })
            .map(|e| e.path().to_path_buf())
            .collect()
    };

    // Files are analyzed in parallel; a file that fails to analyze contributes nothing.
    let per_file: Vec<Vec<FunctionComplexity>> = file_paths
        .par_iter()
        .filter_map(|file_path| analyze_file_complexity(file_path, opts.include_cognitive).ok())
        .collect();
    let mut all_functions: Vec<FunctionComplexity> = per_file.into_iter().flatten().collect();

    // Most complex first. The extra keys make the order of equally complex functions
    // deterministic instead of inheriting whatever order the directory walk produced.
    all_functions.sort_by(|a, b| {
        b.cyclomatic
            .cmp(&a.cyclomatic)
            .then_with(|| a.file.cmp(&b.file))
            .then_with(|| a.line.cmp(&b.line))
            .then_with(|| a.name.cmp(&b.name))
    });

    for (index, func) in all_functions.iter_mut().enumerate() {
        func.rank = index + 1;
        func.is_hotspot = func.cyclomatic > opts.hotspot_threshold;
    }

    let total_functions = all_functions.len();
    let total_cc: usize = all_functions.iter().map(|f| f.cyclomatic).sum();
    let total_loc: usize = all_functions.iter().map(|f| f.loc).sum();
    // The list is sorted descending, so the first entry carries the maximum.
    let max_cyclomatic = all_functions.first().map(|f| f.cyclomatic).unwrap_or(0);
    let avg_cyclomatic = if total_functions > 0 {
        total_cc as f64 / total_functions as f64
    } else {
        0.0
    };

    // `hotspot_count` counts every hotspot; the list below is capped at `max_hotspots`.
    let hotspot_count = all_functions.iter().filter(|f| f.is_hotspot).count();
    let hotspots: Vec<ComplexityHotspot> = all_functions
        .iter()
        .filter(|f| f.is_hotspot)
        .take(opts.max_hotspots)
        .map(|f| ComplexityHotspot {
            name: f.name.clone(),
            file: f.file.clone(),
            line: f.line,
            cyclomatic: f.cyclomatic,
            cognitive: if opts.include_cognitive {
                Some(f.cognitive)
            } else {
                None
            },
            loc: f.loc,
            rank: f.rank,
        })
        .collect();

    let summary = ComplexitySummary {
        total_functions,
        avg_cyclomatic,
        max_cyclomatic,
        hotspot_count,
        total_loc,
    };

    Ok(ComplexityReport {
        functions_analyzed: total_functions,
        avg_cyclomatic,
        max_cyclomatic,
        hotspot_count,
        hotspots,
        functions: all_functions,
        summary,
    })
}
/// Builds the per-function complexity records for a single source file.
///
/// Top-level functions come first, followed by class methods named `Class.method`.
/// Methods whose name both starts and ends with `__` are skipped, and any function
/// or method without an entry in the computed metrics is left out. `rank` and
/// `is_hotspot` are left at `0` / `false` for the caller to fill in.
///
/// When `include_cognitive` is `false`, the `cognitive` field is recorded as `0`.
///
/// # Errors
///
/// Returns `TldrError::UnsupportedLanguage` (carrying the file extension, or
/// `"unknown"`) when no language is detected for `file_path`, and propagates any
/// error from `calculate_all_complexities_file` or `extract_file`.
fn analyze_file_complexity(
    file_path: &Path,
    include_cognitive: bool,
) -> TldrResult<Vec<FunctionComplexity>> {
    // Only the presence of a detected language matters here; the value is unused.
    let unsupported = || {
        let extension = file_path
            .extension()
            .and_then(|ext| ext.to_str())
            .unwrap_or("unknown");
        TldrError::UnsupportedLanguage(extension.to_string())
    };
    Language::from_path(file_path).ok_or_else(unsupported)?;

    let metrics_map = calculate_all_complexities_file(file_path)?;
    let module = extract_file(file_path, None)?;
    // Shared borrow so the nested `move` closure below can copy it.
    let lookup = &metrics_map;

    let free_functions = module.functions.iter().filter_map(|func| {
        let metrics = lookup.get(&func.name)?;
        Some(FunctionComplexity {
            name: func.name.clone(),
            file: file_path.to_path_buf(),
            line: func.line_number as usize,
            cyclomatic: metrics.cyclomatic as usize,
            cognitive: if include_cognitive {
                metrics.cognitive as usize
            } else {
                0
            },
            loc: metrics.lines_of_code as usize,
            rank: 0,
            is_hotspot: false,
        })
    });

    // NOTE(review): methods are looked up by bare method name, so same-named
    // methods or functions presumably share one metrics entry; confirm how
    // `calculate_all_complexities_file` keys its map.
    let methods = module.classes.iter().flat_map(|class| {
        class
            .methods
            .iter()
            .filter(|method| !(method.name.starts_with("__") && method.name.ends_with("__")))
            .filter_map(move |method| {
                let metrics = lookup.get(&method.name)?;
                Some(FunctionComplexity {
                    name: format!("{}.{}", class.name, method.name),
                    file: file_path.to_path_buf(),
                    line: method.line_number as usize,
                    cyclomatic: metrics.cyclomatic as usize,
                    cognitive: if include_cognitive {
                        metrics.cognitive as usize
                    } else {
                        0
                    },
                    loc: metrics.lines_of_code as usize,
                    rank: 0,
                    is_hotspot: false,
                })
            })
    });

    Ok(free_functions.chain(methods).collect())
}
// Unit tests for `analyze_complexity`. Each test writes fixture source files
// into a fresh temporary directory and analyzes that directory.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Creates a fresh temporary directory for one test's fixtures.
fn create_test_dir() -> TempDir {
TempDir::new().unwrap()
}
// Writes `content` to `name` inside `dir`, creating parent directories as
// needed, and returns the full path of the written file.
fn write_file(dir: &TempDir, name: &str, content: &str) -> PathBuf {
let path = dir.path().join(name);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).unwrap();
}
std::fs::write(&path, content).unwrap();
path
}
// An empty Python file yields an empty report with zeroed aggregates.
#[test]
fn test_complexity_empty_file() {
let dir = create_test_dir();
write_file(&dir, "empty.py", "");
let report = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(report.functions_analyzed, 0);
assert_eq!(report.avg_cyclomatic, 0.0);
assert_eq!(report.max_cyclomatic, 0);
assert_eq!(report.hotspot_count, 0);
assert!(report.hotspots.is_empty());
}
// A single branch-free function has cyclomatic complexity 1 and is not a hotspot.
#[test]
fn test_complexity_simple_function() {
let dir = create_test_dir();
let source = r#"
def simple():
return 42
"#;
write_file(&dir, "simple.py", source);
let report = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(report.functions_analyzed, 1);
assert_eq!(report.max_cyclomatic, 1);
assert_eq!(report.hotspot_count, 0); }
// Four functions with complexities 1 through 4 average to 2.5 with a maximum of 4.
#[test]
fn test_complexity_average_calculation() {
let dir = create_test_dir();
let source = r#"
def func_cc1():
return 1
def func_cc2(a):
if a:
return 1
return 0
def func_cc3(a, b):
if a:
return 1
elif b:
return 2
return 0
def func_cc4(a, b, c):
if a:
return 1
elif b:
return 2
elif c:
return 3
return 0
"#;
write_file(&dir, "multiple.py", source);
let report = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(report.functions_analyzed, 4);
assert!((report.avg_cyclomatic - 2.5).abs() < 0.01);
assert_eq!(report.max_cyclomatic, 4);
}
// A heavily branched function exceeds the default threshold of 10 and is
// listed as the rank-1 hotspot.
#[test]
fn test_complexity_hotspot_detection() {
let dir = create_test_dir();
let source = r#"
def complex_function(a, b, c, d, e, f):
result = 0
if a > 0:
if b > 0:
result += 1
elif c > 0:
result += 2
else:
result += 3
elif d > 0:
if e > 0:
result += 4
elif f > 0:
result += 5
else:
result += 6
else:
if a < -10:
result -= 1
elif b < -10:
result -= 2
else:
result -= 3
for i in range(10):
if i % 2 == 0:
result += i
return result
"#;
write_file(&dir, "high_complexity.py", source);
let report = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(report.functions_analyzed, 1);
assert!(
report.max_cyclomatic > 10,
"Expected CC > 10, got {}",
report.max_cyclomatic
);
assert!(report.hotspot_count >= 1, "Expected at least 1 hotspot");
assert!(!report.hotspots.is_empty());
assert_eq!(report.hotspots[0].rank, 1);
}
// Functions come back sorted by descending cyclomatic complexity, and each
// rank equals the function's 1-based position in that list.
#[test]
fn test_complexity_sorted_descending() {
let dir = create_test_dir();
let source = r#"
def low_cc():
return 1
def medium_cc(a, b):
if a:
return 1
elif b:
return 2
return 0
def high_cc(a, b, c, d):
if a:
if b:
return 1
elif c:
return 2
elif d:
return 3
return 0
"#;
write_file(&dir, "sorted.py", source);
let report = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(report.functions_analyzed, 3);
for i in 1..report.functions.len() {
assert!(
report.functions[i - 1].cyclomatic >= report.functions[i].cyclomatic,
"Functions not sorted descending"
);
}
for (i, func) in report.functions.iter().enumerate() {
assert_eq!(func.rank, i + 1, "Rank mismatch for {}", func.name);
}
}
// The same function is not a hotspot under the default threshold but becomes
// one when the threshold is lowered to 5.
#[test]
fn test_complexity_threshold_configurable() {
let dir = create_test_dir();
let source = r#"
def moderate_cc(a, b, c, d, e):
if a:
return 1
elif b:
return 2
elif c:
return 3
elif d:
return 4
elif e:
return 5
return 0
"#;
write_file(&dir, "moderate.py", source);
let report_default = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(report_default.hotspot_count, 0);
let opts = ComplexityOptions {
hotspot_threshold: 5,
..Default::default()
};
let report_strict =
analyze_complexity(dir.path(), Some(Language::Python), Some(opts)).unwrap();
assert!(
report_strict.hotspot_count > 0,
"Expected hotspot with threshold=5"
);
}
// With one Python and one TypeScript file in the directory, the language
// filter restricts which files are analyzed. The TypeScript check only
// requires at most one function.
#[test]
fn test_complexity_multi_language() {
let dir = create_test_dir();
write_file(&dir, "test.py", "def py_func():\n return 1\n");
write_file(
&dir,
"test.ts",
"function tsFunc(): number {\n return 1;\n}\n",
);
let report_all = analyze_complexity(dir.path(), None, None).unwrap();
assert!(report_all.functions_analyzed >= 1);
let report_py = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(report_py.functions_analyzed, 1);
let report_ts = analyze_complexity(dir.path(), Some(Language::TypeScript), None).unwrap();
assert!(report_ts.functions_analyzed <= 1);
}
// The per-function record carries the name, a positive line, a complexity of
// at least 1, a positive line count and rank 1 for the only function.
#[test]
fn test_complexity_per_function_data() {
let dir = create_test_dir();
let source = r#"
def test_func(x, y):
if x > 0:
return y
return 0
"#;
write_file(&dir, "test.py", source);
let report = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(report.functions.len(), 1);
let func = &report.functions[0];
assert_eq!(func.name, "test_func");
assert!(func.line > 0);
assert!(func.cyclomatic >= 1);
assert!(func.loc > 0);
assert_eq!(func.rank, 1);
}
// With 30 generated functions and `max_hotspots` set to 10, the hotspot list
// is capped at 10 entries while `hotspot_count` still reports at least 10.
#[test]
fn test_complexity_max_hotspots() {
let dir = create_test_dir();
let mut source = String::new();
for i in 0..30 {
source.push_str(&format!(
"def func_{0}(a, b, c, d, e, f, g, h, i, j, k):\n \
if a:\n return 1\n \
elif b:\n return 2\n \
elif c:\n return 3\n \
elif d:\n return 4\n \
elif e:\n return 5\n \
elif f:\n return 6\n \
elif g:\n return 7\n \
elif h:\n return 8\n \
elif i:\n return 9\n \
elif j:\n return 10\n \
elif k:\n return 11\n \
return 0\n\n",
i
));
}
write_file(&dir, "many.py", &source);
let opts = ComplexityOptions {
max_hotspots: 10,
hotspot_threshold: 3, ..Default::default()
};
let report = analyze_complexity(dir.path(), Some(Language::Python), Some(opts)).unwrap();
assert!(
report.hotspots.len() <= 10,
"Expected at most 10 hotspots in list, got {}",
report.hotspots.len()
);
assert!(
report.hotspot_count >= 10,
"Expected at least 10 hotspots total, got {}",
report.hotspot_count
);
}
// Class methods are reported as `Class.method` next to standalone functions,
// and the branching method scores higher than the trivial one.
#[test]
fn test_complexity_class_methods() {
let dir = create_test_dir();
let source = r#"
class Calculator:
def add(self, a, b):
return a + b
def complex_calc(self, x, y, z):
if x > 0:
if y > 0:
return x + y
elif z > 0:
return x + z
return 0
def standalone():
return 42
"#;
write_file(&dir, "calc.py", source);
let report = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(
report.functions_analyzed, 3,
"Expected 3 functions (2 methods + 1 standalone), got {}",
report.functions_analyzed
);
let names: Vec<&str> = report.functions.iter().map(|f| f.name.as_str()).collect();
assert!(
names.contains(&"Calculator.add"),
"Missing Calculator.add, got {:?}",
names
);
assert!(
names.contains(&"Calculator.complex_calc"),
"Missing Calculator.complex_calc, got {:?}",
names
);
assert!(
names.contains(&"standalone"),
"Missing standalone, got {:?}",
names
);
let add = report
.functions
.iter()
.find(|f| f.name == "Calculator.add")
.unwrap();
let calc = report
.functions
.iter()
.find(|f| f.name == "Calculator.complex_calc")
.unwrap();
assert!(
calc.cyclomatic > add.cyclomatic,
"complex_calc CC ({}) should be > add CC ({})",
calc.cyclomatic,
add.cyclomatic
);
}
// Methods named `__init__` and `__repr__` are excluded, leaving only
// `MyClass.process` in the report.
#[test]
fn test_complexity_skips_dunder_methods() {
let dir = create_test_dir();
let source = r#"
class MyClass:
def __init__(self):
self.value = 0
def __repr__(self):
return f"MyClass({self.value})"
def process(self, x):
if x > 0:
return x
return 0
"#;
write_file(&dir, "dunders.py", source);
let report = analyze_complexity(dir.path(), Some(Language::Python), None).unwrap();
assert_eq!(
report.functions_analyzed, 1,
"Expected 1 function (process only), got {}",
report.functions_analyzed
);
assert_eq!(report.functions[0].name, "MyClass.process");
}
}