use crate::detectors::base::{Detector, DetectorConfig};
use crate::detectors::external_tool::{get_graph_context, run_external_tool};
use crate::graph::GraphStore;
use crate::models::{Finding, Severity};
use anyhow::Result;
use serde_json::Value as JsonValue;
use std::path::{Path, PathBuf};
use tracing::{info, warn};
use uuid::Uuid;
/// Detector that shells out to `pylint` and converts its JSON messages into findings.
pub struct PylintDetector {
/// Detector configuration exposed through `Detector::config`.
config: DetectorConfig,
/// Path handed to pylint as the scan target; also passed to `run_external_tool`
/// (presumably as the working directory — confirm against that helper).
repository_path: PathBuf,
/// Upper bound on how many pylint results `detect` turns into findings.
max_findings: usize,
/// Value forwarded to pylint as `-jN`.
parallel_jobs: usize,
/// When non-empty, pylint is invoked with `--disable=all --enable=<these>`.
enable_only: Vec<String>,
/// Rules passed as `--disable=<these>`; only consulted when `enable_only` is empty.
disable: Vec<String>,
}
impl PylintDetector {
/// Creates a detector for `repository_path` with default settings: at most
/// 100 findings, one pylint job per CPU reported by `num_cpus`, and no rule
/// filters.
pub fn new(repository_path: impl Into<PathBuf>) -> Self {
    let repository_path = repository_path.into();
    let parallel_jobs = num_cpus::get();

    Self {
        config: DetectorConfig::default(),
        repository_path,
        max_findings: 100,
        parallel_jobs,
        enable_only: vec![],
        disable: vec![],
    }
}
pub fn with_max_findings(mut self, max: usize) -> Self {
self.max_findings = max;
self
}
pub fn with_parallel_jobs(mut self, jobs: usize) -> Self {
self.parallel_jobs = jobs;
self
}
pub fn with_enable_only(mut self, rules: Vec<String>) -> Self {
self.enable_only = rules;
self
}
pub fn with_disable(mut self, rules: Vec<String>) -> Self {
self.disable = rules;
self
}
/// Runs pylint over the repository and returns its JSON messages.
///
/// The command line requests JSON output, recursive discovery and
/// `-j<parallel_jobs>`, followed by the rule filters and finally the
/// repository path. An empty vector is returned when the tool times out or
/// when `json_array()` yields nothing.
fn run_pylint(&self) -> Vec<JsonValue> {
    let mut argv: Vec<String> = ["pylint", "--output-format=json", "--recursive=y"]
        .iter()
        .map(|&flag| flag.to_owned())
        .collect();
    argv.push(format!("-j{}", self.parallel_jobs));

    // An enable-only list wins over the disable list; with neither set,
    // pylint runs with its own defaults.
    match (self.enable_only.is_empty(), self.disable.is_empty()) {
        (false, _) => {
            argv.push(String::from("--disable=all"));
            argv.push(format!("--enable={}", self.enable_only.join(",")));
        }
        (true, false) => argv.push(format!("--disable={}", self.disable.join(","))),
        (true, true) => {}
    }

    argv.push(self.repository_path.to_string_lossy().into_owned());

    // NOTE(review): 300 is presumably a timeout in seconds — confirm against
    // `run_external_tool`.
    let outcome = run_external_tool(&argv, "pylint", 300, Some(&self.repository_path), None);
    if outcome.timed_out {
        warn!("Pylint timed out");
        return Vec::new();
    }

    outcome.json_array().unwrap_or_default()
}
/// Translates a pylint message `type` into a [`Severity`].
///
/// Matching ignores case. Any type that is not recognised is reported as
/// `Severity::Low`.
fn map_severity(msg_type: &str) -> Severity {
    let kind = msg_type.to_lowercase();
    match kind.as_str() {
        "fatal" => Severity::Critical,
        "error" => Severity::High,
        "warning" => Severity::Medium,
        "info" => Severity::Info,
        "refactor" | "convention" => Severity::Low,
        _ => Severity::Low,
    }
}
/// Converts one pylint JSON message into a [`Finding`].
///
/// Returns `None` when the entry lacks a string `path`, an unsigned integer
/// `line` that fits in `u32`, or a string `message`. A missing `message-id`
/// or `symbol` defaults to the empty string, and a missing `type` is treated
/// as `"convention"`.
///
/// Absolute paths are reported relative to `repository_path` when they lie
/// underneath it; every other path is used exactly as pylint reported it.
fn create_finding(
    &self,
    result: &JsonValue,
    graph: &GraphStore,
) -> Option<Finding> {
    let file_path = result.get("path")?.as_str()?;

    // Reject out-of-range line numbers instead of letting `as u32` wrap them.
    let raw_line = result.get("line")?.as_u64()?;
    if raw_line > u64::from(u32::MAX) {
        return None;
    }
    let line = raw_line as u32;

    let message = result.get("message")?.as_str()?;
    let message_id = result.get("message-id").and_then(|m| m.as_str()).unwrap_or("");
    let symbol = result.get("symbol").and_then(|s| s.as_str()).unwrap_or("");
    let msg_type = result.get("type").and_then(|t| t.as_str()).unwrap_or("convention");

    let rel_path = if Path::new(file_path).is_absolute() {
        Path::new(file_path)
            .strip_prefix(&self.repository_path)
            .map(|p| p.to_string_lossy().to_string())
            // Absolute path outside the repository: keep it as reported.
            .unwrap_or_else(|_| file_path.to_string())
    } else {
        file_path.to_string()
    };

    let ctx = get_graph_context(graph, &rel_path, Some(line));
    let severity = Self::map_severity(msg_type);

    let mut description = format!(
        "{}\n\n\
         **Location**: {}:{}\n\
         **Rule**: {} ({})\n",
        message, rel_path, line, symbol, message_id
    );
    if let Some(loc) = ctx.file_loc {
        description.push_str(&format!("**File Size**: {} LOC\n", loc));
    }
    let complexity = ctx.max_complexity();
    if complexity > 0 {
        description.push_str(&format!("**Complexity**: {}\n", complexity));
    }
    if !ctx.affected_nodes.is_empty() {
        // Only the first three affected nodes are listed.
        description.push_str(&format!(
            "**Affected**: {}\n",
            ctx.affected_nodes.iter().take(3).cloned().collect::<Vec<_>>().join(", ")
        ));
    }

    Some(Finding {
        id: Uuid::new_v4().to_string(),
        detector: "PylintDetector".to_string(),
        severity,
        title: format!("Code quality: {}", symbol),
        description,
        affected_files: vec![PathBuf::from(&rel_path)],
        line_start: Some(line),
        line_end: Some(line),
        suggested_fix: Some(Self::suggest_fix(symbol, message)),
        estimated_effort: Some("Small (5-15 minutes)".to_string()),
        category: Some(Self::get_category_tag(symbol)),
        cwe_id: None,
        why_it_matters: None,
        ..Default::default()
    })
}
/// Returns a short remediation hint for a pylint `symbol`.
///
/// Both the legacy and the current names of renamed checks are recognised
/// (`missing-docstring` and its per-scope successors, `broad-except` and
/// `broad-exception-caught`). Symbols without a dedicated hint fall back to a
/// generic sentence that embeds pylint's own `message`.
fn suggest_fix(symbol: &str, message: &str) -> String {
    let advice = match symbol {
        "unused-import" => "Remove the unused import statement",
        "unused-variable" => "Remove the unused variable or prefix with underscore",
        "too-many-arguments" => "Refactor to use a data class or reduce parameters",
        "too-many-locals" => "Extract helper functions to reduce local variables",
        "line-too-long" => "Break the line into multiple lines",
        // Newer pylint releases split `missing-docstring` into per-scope checks.
        "missing-docstring"
        | "missing-module-docstring"
        | "missing-class-docstring"
        | "missing-function-docstring" => "Add a docstring explaining the purpose",
        // `broad-except` was renamed to `broad-exception-caught`.
        "broad-except" | "broad-exception-caught" => {
            "Catch specific exceptions instead of broad Exception"
        }
        "consider-using-enumerate" => "Use enumerate() for cleaner iteration",
        "consider-using-with" => "Use context manager (with statement)",
        "redefined-outer-name" => "Rename variable to avoid shadowing outer scope",
        _ => return format!("Review pylint suggestion: {}", message),
    };
    advice.to_string()
}
/// Derives a coarse category tag from a pylint `symbol`.
///
/// The rules are tried top to bottom and the first one with a fragment
/// contained in `symbol` wins; symbols matching no rule are tagged
/// `"general"`.
fn get_category_tag(symbol: &str) -> String {
    // (substrings to look for, resulting tag) — order is significant.
    const RULES: &[(&[&str], &str)] = &[
        (&["unused"], "unused_code"),
        (&["too-many"], "complexity"),
        (&["docstring"], "documentation"),
        (&["line-too-long", "whitespace", "indentation"], "style"),
        (&["except"], "error_handling"),
        (&["redefined", "builtin", "global"], "naming_scope"),
        (&["consider-using", "unnecessary"], "refactoring"),
        (&["duplicate"], "duplication"),
    ];

    RULES
        .iter()
        .find(|(fragments, _)| fragments.iter().any(|fragment| symbol.contains(*fragment)))
        .map_or("general", |(_, tag)| *tag)
        .to_string()
}
}
impl Detector for PylintDetector {
    /// Identifier of this detector.
    fn name(&self) -> &'static str {
        "PylintDetector"
    }

    /// One-line, human-readable summary of what the detector reports.
    fn description(&self) -> &'static str {
        "Detects code quality issues in Python using Pylint"
    }

    /// Runs pylint and converts its messages into findings.
    ///
    /// At most `max_findings` findings are returned. Entries that cannot be
    /// converted are skipped and do not count towards that cap. This method
    /// always returns `Ok`; a timed-out or empty pylint run yields an empty
    /// list.
    fn detect(&self, graph: &GraphStore) -> Result<Vec<Finding>> {
        info!("Running Pylint on {:?}", self.repository_path);
        let results = self.run_pylint();
        if results.is_empty() {
            info!("No pylint violations found");
            return Ok(Vec::new());
        }

        // Apply the cap after conversion so malformed entries do not use it
        // up. The chain is lazy, so conversion stops once the cap is reached.
        let findings: Vec<Finding> = results
            .iter()
            .filter_map(|r| self.create_finding(r, graph))
            .take(self.max_findings)
            .collect();

        info!("Created {} code quality findings", findings.len());
        Ok(findings)
    }

    /// Category under which this detector's findings are grouped.
    fn category(&self) -> &'static str {
        "code_quality"
    }

    /// Exposes the detector's configuration.
    fn config(&self) -> Option<&DetectorConfig> {
        Some(&self.config)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises every arm of `map_severity`, including the `info` arm, the
    /// fallback for unknown types, and case-insensitive matching.
    #[test]
    fn test_severity_mapping() {
        assert_eq!(PylintDetector::map_severity("fatal"), Severity::Critical);
        assert_eq!(PylintDetector::map_severity("error"), Severity::High);
        assert_eq!(PylintDetector::map_severity("warning"), Severity::Medium);
        assert_eq!(PylintDetector::map_severity("refactor"), Severity::Low);
        assert_eq!(PylintDetector::map_severity("convention"), Severity::Low);
        assert_eq!(PylintDetector::map_severity("info"), Severity::Info);

        // Unknown message types fall back to Low.
        assert_eq!(PylintDetector::map_severity("something-else"), Severity::Low);
        assert_eq!(PylintDetector::map_severity(""), Severity::Low);

        // Matching is case-insensitive.
        assert_eq!(PylintDetector::map_severity("ERROR"), Severity::High);
        assert_eq!(PylintDetector::map_severity("Warning"), Severity::Medium);
    }
}