use normalize_facts::extract::compute_complexity;
use normalize_languages::parsers::{grammar_loader, parse_with_grammar};
use normalize_languages::support_for_path;
use normalize_output::diagnostics::{DiagnosticsReport, Issue, Severity};
use std::path::Path;
use streaming_iterator::StreamingIterator;
use crate::cache::{FileRule, run_file_rule};
use normalize_rules_config::WalkConfig;
/// A single function or method whose measured complexity met the rule's
/// threshold, as produced by `HighComplexityRule::check_file`.
#[derive(serde::Serialize, serde::Deserialize)]
pub struct HighComplexityFinding {
/// Path of the file containing the function, relative to the scan root
/// (or the path as given when it does not lie under the root).
rel_path: String,
/// Name of the function or method.
name: String,
/// 1-based line on which the definition starts.
start_line: usize,
/// Measured complexity of the function.
complexity: usize,
}
/// File rule that reports functions and methods with high complexity.
pub struct HighComplexityRule {
/// Functions whose complexity is greater than or equal to this value are reported.
pub threshold: usize,
}
/// Identifier used for the engine name, the rule id and the diagnostics source.
const RULE_ID: &str = "high-complexity";

/// Flags functions and methods whose complexity is at least
/// [`HighComplexityRule::threshold`].
impl FileRule for HighComplexityRule {
    type Finding = HighComplexityFinding;

    /// Name identifying this rule's engine.
    fn engine_name(&self) -> &str {
        RULE_ID
    }

    /// The threshold is the only configuration that influences the findings,
    /// so its decimal form serves as the configuration hash.
    fn config_hash(&self) -> String {
        self.threshold.to_string()
    }

    /// Returns one finding per function or method in `path` whose complexity
    /// is greater than or equal to the threshold.
    ///
    /// The check is best-effort: an empty list is returned when the file has
    /// no language support, cannot be read as UTF-8 text, cannot be parsed, or
    /// when its grammar or tags query cannot be loaded.
    ///
    /// `root` is only used to make the reported path relative.
    fn check_file(&self, path: &Path, root: &Path) -> Vec<Self::Finding> {
        let Some(support) = support_for_path(path) else {
            return Vec::new();
        };
        let Ok(content) = std::fs::read_to_string(path) else {
            return Vec::new();
        };
        let grammar_name = support.grammar_name();
        let Some(tree) = parse_with_grammar(grammar_name, &content) else {
            return Vec::new();
        };

        let loader = grammar_loader();
        let Some(tags_scm) = loader.get_tags(grammar_name) else {
            return Vec::new();
        };
        let Ok(ts_lang) = loader.get(grammar_name) else {
            return Vec::new();
        };
        let Ok(tags_query) = tree_sitter::Query::new(&ts_lang, &tags_scm) else {
            return Vec::new();
        };
        // Optional: when the grammar ships no usable complexity query the
        // generic `compute_complexity` fallback is used instead. The language
        // loaded above is reused rather than looked up a second time.
        let complexity_query = loader
            .get_complexity(grammar_name)
            .and_then(|scm| tree_sitter::Query::new(&ts_lang, &scm).ok());

        let rel_path = path
            .strip_prefix(root)
            .unwrap_or(path)
            .to_string_lossy()
            .into_owned();

        let capture_names = tags_query.capture_names();
        let mut qcursor = tree_sitter::QueryCursor::new();
        let mut matches = qcursor.matches(&tags_query, tree.root_node(), content.as_bytes());

        // A tags query may capture the same syntax node through more than one
        // pattern (for example as both a function and a method); each node is
        // reported at most once.
        let mut seen = std::collections::HashSet::new();
        let mut results = Vec::new();

        // `matches` is a streaming iterator, so it cannot drive a `for` loop.
        while let Some(m) = matches.next() {
            for capture in m.captures {
                let cn = capture_names[capture.index as usize];
                if !matches!(cn, "definition.function" | "definition.method") {
                    continue;
                }
                let node = capture.node;
                if !seen.insert(node.id()) {
                    continue;
                }
                let Some(name) = support.node_name(&node, &content) else {
                    continue;
                };
                let complexity = match &complexity_query {
                    Some(cq) => count_complexity_with_query(&node, cq, &content),
                    None => compute_complexity(&node, support, content.as_bytes()),
                };
                if complexity >= self.threshold {
                    results.push(HighComplexityFinding {
                        rel_path: rel_path.clone(),
                        name: name.to_string(),
                        // tree-sitter rows are 0-based; diagnostics use 1-based lines.
                        start_line: node.start_position().row + 1,
                        complexity,
                    });
                }
            }
        }
        results
    }

    /// Converts the per-file findings into a report whose issues are ordered
    /// from the most to the least complex function.
    ///
    /// `files_checked` is copied into the report unchanged.
    fn to_diagnostics(
        &self,
        findings: Vec<(std::path::PathBuf, Vec<Self::Finding>)>,
        _root: &Path,
        files_checked: usize,
    ) -> DiagnosticsReport {
        let threshold = self.threshold;

        let mut ranked: Vec<HighComplexityFinding> = findings
            .into_iter()
            .flat_map(|(_path, file_findings)| file_findings)
            .collect();
        // Order on the numeric field rather than on text parsed back out of
        // the message. The sort is stable, so equally complex functions keep
        // the order in which they were reported.
        ranked.sort_by_key(|f| std::cmp::Reverse(f.complexity));

        let issues: Vec<Issue> = ranked
            .into_iter()
            .map(|f| Issue {
                file: f.rel_path,
                line: Some(f.start_line),
                column: None,
                end_line: None,
                end_column: None,
                rule_id: RULE_ID.into(),
                message: format!(
                    "function `{}` has cyclomatic complexity {} (threshold: {threshold})",
                    f.name, f.complexity
                ),
                severity: Severity::Warning,
                source: RULE_ID.into(),
                related: vec![],
                suggestion: Some(
                    "consider extracting helper functions to reduce complexity".into(),
                ),
            })
            .collect();

        DiagnosticsReport {
            issues,
            files_checked,
            sources_run: vec![RULE_ID.into()],
            tool_errors: vec![],
            daemon_cached: false,
        }
    }
}
/// Computes the complexity of `node` with a tree-sitter complexity query.
///
/// The result starts at 1 and grows by one for every capture named
/// `complexity` that the query yields while restricted to the node's byte
/// range. When the query declares no such capture the result is 1.
fn count_complexity_with_query(
    node: &tree_sitter::Node,
    query: &tree_sitter::Query,
    content: &str,
) -> usize {
    let target = match query
        .capture_names()
        .iter()
        .position(|n| *n == "complexity")
    {
        Some(idx) => idx,
        None => return 1,
    };

    let mut cursor = tree_sitter::QueryCursor::new();
    // Only consider matches that fall inside this definition.
    cursor.set_byte_range(node.byte_range());
    let mut hits = cursor.matches(query, *node, content.as_bytes());

    let mut total = 1usize;
    while let Some(hit) = hits.next() {
        total += hit
            .captures
            .iter()
            .filter(|c| c.index as usize == target)
            .count();
    }
    total
}
/// Builds the high-complexity diagnostics report for `root`.
///
/// Creates a [`HighComplexityRule`] with the given `threshold` and hands it
/// to `run_file_rule`, forwarding `explicit_files` and `walk_config`
/// unchanged.
pub fn build_high_complexity_report(
    root: &Path,
    threshold: usize,
    explicit_files: Option<&[std::path::PathBuf]>,
    walk_config: &WalkConfig,
) -> DiagnosticsReport {
    run_file_rule(
        &HighComplexityRule { threshold },
        root,
        explicit_files,
        walk_config,
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write as _;

    /// Writes a Python file `name` into `dir` containing a single function
    /// with `branch_count` consecutive `if` statements and returns its path.
    ///
    /// The tests below expect such a function to have a complexity of
    /// `branch_count + 1`.
    fn make_python_function_with_branches(
        dir: &std::path::Path,
        name: &str,
        branch_count: usize,
    ) -> std::path::PathBuf {
        let path = dir.join(name);
        let mut f = std::fs::File::create(&path).unwrap();
        writeln!(f, "def complex_function(x):").unwrap();
        for i in 0..branch_count {
            // Each `return` must be nested one level deeper than its `if`,
            // otherwise the generated file is not valid Python.
            writeln!(f, "    if x == {i}:").unwrap();
            writeln!(f, "        return {i}").unwrap();
        }
        writeln!(f, "    return -1").unwrap();
        path
    }

    /// A function just below the default threshold is not reported.
    #[test]
    fn test_default_threshold_not_triggered() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_python_function_with_branches(dir.path(), "low.py", 18);
        let rule = HighComplexityRule { threshold: 20 };
        let findings = rule.check_file(&path, dir.path());
        assert!(
            findings.is_empty(),
            "complexity 19 should not trigger default threshold of 20; got {} findings",
            findings.len()
        );
    }

    /// A function exactly at the default threshold is reported.
    #[test]
    fn test_default_threshold_triggered() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_python_function_with_branches(dir.path(), "high.py", 19);
        let rule = HighComplexityRule { threshold: 20 };
        let findings = rule.check_file(&path, dir.path());
        assert!(
            !findings.is_empty(),
            "complexity 20 should trigger default threshold of 20"
        );
    }

    /// Lowering the threshold makes a moderately complex function reportable.
    #[test]
    fn test_custom_threshold_lower() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_python_function_with_branches(dir.path(), "medium.py", 5);
        let rule = HighComplexityRule { threshold: 5 };
        let findings = rule.check_file(&path, dir.path());
        assert!(
            !findings.is_empty(),
            "complexity 6 should trigger custom threshold of 5"
        );
    }

    /// Raising the threshold suppresses a function the default would report.
    #[test]
    fn test_custom_threshold_higher() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_python_function_with_branches(dir.path(), "medium.py", 19);
        let rule = HighComplexityRule { threshold: 30 };
        let findings = rule.check_file(&path, dir.path());
        assert!(
            findings.is_empty(),
            "complexity 20 should not trigger custom threshold of 30"
        );
    }
}