use crate::detectors::base::{DetectionSummary, Detector, DetectorResult, ProgressCallback};
use crate::graph::GraphStore;
use crate::models::Finding;
use anyhow::Result;
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, warn};
/// Default upper bound on the number of findings an engine returns from a run.
const MAX_FINDINGS_LIMIT: usize = 10_000;
/// Runs a set of registered detectors against a graph and gathers their findings.
pub struct DetectorEngine {
/// Registered detectors, in registration order.
detectors: Vec<Arc<dyn Detector>>,
/// Thread count handed to the rayon pool that runs independent detectors.
workers: usize,
/// Upper bound on the number of findings `run` returns.
max_findings: usize,
/// Optional hook invoked as detectors finish, with the detector name,
/// the number completed so far, and the total number of detectors.
progress_callback: Option<ProgressCallback>,
}
impl DetectorEngine {
/// Creates an engine with no detectors registered.
///
/// `workers` is the number of threads used to run independent detectors.
/// Passing `0` selects the value reported by
/// `std::thread::available_parallelism` (or 4 if that call fails),
/// capped at 16. The findings cap starts at `MAX_FINDINGS_LIMIT` and no
/// progress callback is installed.
pub fn new(workers: usize) -> Self {
    let workers = match workers {
        0 => std::thread::available_parallelism()
            .map_or(4, |cores| cores.get())
            .min(16),
        explicit => explicit,
    };

    Self {
        detectors: Vec::new(),
        workers,
        max_findings: MAX_FINDINGS_LIMIT,
        progress_callback: None,
    }
}
pub fn default() -> Self {
Self::new(0)
}
pub fn with_max_findings(mut self, max: usize) -> Self {
self.max_findings = max;
self
}
pub fn with_progress_callback(mut self, callback: ProgressCallback) -> Self {
self.progress_callback = Some(callback);
self
}
/// Appends `detector` to the list of detectors to run, logging its name at
/// debug level.
pub fn register(&mut self, detector: Arc<dyn Detector>) {
    let name = detector.name();
    debug!("Registering detector: {}", name);
    self.detectors.push(detector);
}
/// Registers each detector yielded by `detectors`, in iteration order.
pub fn register_all(&mut self, detectors: impl IntoIterator<Item = Arc<dyn Detector>>) {
    detectors
        .into_iter()
        .for_each(|detector| self.register(detector));
}
pub fn detector_count(&self) -> usize {
self.detectors.len()
}
/// Returns the names of all registered detectors, in registration order.
pub fn detector_names(&self) -> Vec<&'static str> {
    let mut names = Vec::with_capacity(self.detectors.len());
    for detector in &self.detectors {
        names.push(detector.name());
    }
    names
}
pub fn run(&self, graph: &GraphStore) -> Result<Vec<Finding>> {
let start = Instant::now();
info!(
"Starting detection with {} detectors on {} workers",
self.detectors.len(),
self.workers
);
let (independent, dependent): (Vec<_>, Vec<_>) = self.detectors
.iter()
.cloned()
.partition(|d| !d.is_dependent());
info!(
"Detectors: {} independent, {} dependent",
independent.len(),
dependent.len()
);
let completed = Arc::new(AtomicUsize::new(0));
let total = self.detectors.len();
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(self.workers)
.build()?;
let independent_results: Vec<DetectorResult> = pool.install(|| {
independent
.par_iter()
.map(|detector| {
let result = self.run_single_detector(detector, graph);
let done = completed.fetch_add(1, Ordering::SeqCst) + 1;
if let Some(ref callback) = self.progress_callback {
callback(detector.name(), done, total);
}
result
})
.collect()
});
let mut all_findings: Vec<Finding> = Vec::new();
let mut summary = DetectionSummary::default();
for result in independent_results {
summary.add_result(&result);
if result.success {
all_findings.extend(result.findings);
} else if let Some(err) = &result.error {
warn!("Detector {} failed: {}", result.detector_name, err);
}
}
for detector in dependent {
let result = self.run_single_detector(&detector, graph);
let done = completed.fetch_add(1, Ordering::SeqCst) + 1;
if let Some(ref callback) = self.progress_callback {
callback(detector.name(), done, total);
}
summary.add_result(&result);
if result.success {
all_findings.extend(result.findings);
} else if let Some(err) = &result.error {
warn!("Detector {} failed: {}", result.detector_name, err);
}
}
all_findings.sort_by(|a, b| b.severity.cmp(&a.severity));
if all_findings.len() > self.max_findings {
warn!(
"Truncating findings from {} to {} (max limit)",
all_findings.len(),
self.max_findings
);
all_findings.truncate(self.max_findings);
}
let duration = start.elapsed();
info!(
"Detection complete: {} findings from {}/{} detectors in {:?}",
all_findings.len(),
summary.detectors_succeeded,
summary.detectors_run,
duration
);
Ok(all_findings)
}
pub fn run_detailed(&self, graph: &GraphStore) -> Result<(Vec<DetectorResult>, DetectionSummary)> {
let start = Instant::now();
let (independent, dependent): (Vec<_>, Vec<_>) = self.detectors
.iter()
.cloned()
.partition(|d| !d.is_dependent());
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(self.workers)
.build()?;
let mut all_results: Vec<DetectorResult> = pool.install(|| {
independent
.par_iter()
.map(|detector| self.run_single_detector(detector, graph))
.collect()
});
for detector in dependent {
all_results.push(self.run_single_detector(&detector, graph));
}
let mut summary = DetectionSummary::default();
for result in &all_results {
summary.add_result(result);
}
summary.total_duration_ms = start.elapsed().as_millis() as u64;
Ok((all_results, summary))
}
/// Runs one detector against `graph`, timing it and converting every
/// outcome into a `DetectorResult`.
///
/// * On success the findings are truncated to the detector's own
///   `config().max_findings`, when one is set.
/// * An error returned by the detector becomes a failure result carrying
///   the error's string form.
/// * A panic inside the detector is caught and becomes a failure result
///   whose message is prefixed with `Panic: `.
fn run_single_detector(&self, detector: &Arc<dyn Detector>, graph: &GraphStore) -> DetectorResult {
    /// Pulls a readable message out of a panic payload, which is a `&str`
    /// or a `String` for ordinary `panic!` invocations.
    fn describe_panic(payload: &(dyn std::any::Any + Send)) -> String {
        payload
            .downcast_ref::<&str>()
            .map(|msg| msg.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_else(|| "Unknown panic".to_string())
    }

    let name = detector.name().to_string();
    let started = Instant::now();
    let elapsed_ms = || started.elapsed().as_millis() as u64;
    debug!("Running detector: {}", name);

    // Catch panics here so one misbehaving detector does not unwind into
    // the caller of this method.
    let outcome =
        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| detector.detect(graph)));

    match outcome {
        Ok(Ok(mut findings)) => {
            let duration = elapsed_ms();
            // `truncate` is a no-op when the list is already short enough.
            if let Some(max) = detector.config().and_then(|config| config.max_findings) {
                findings.truncate(max);
            }
            debug!(
                "Detector {} found {} findings in {}ms",
                name,
                findings.len(),
                duration
            );
            DetectorResult::success(name, findings, duration)
        }
        Ok(Err(e)) => {
            let duration = elapsed_ms();
            debug!("Detector {} skipped (query error): {}", name, e);
            DetectorResult::failure(name, e.to_string(), duration)
        }
        Err(payload) => {
            let duration = elapsed_ms();
            let panic_msg = describe_panic(&*payload);
            error!("Detector {} panicked: {}", name, panic_msg);
            DetectorResult::failure(name, format!("Panic: {}", panic_msg), duration)
        }
    }
}
}
impl Default for DetectorEngine {
fn default() -> Self {
Self::new(0)
}
}
/// Step-by-step builder for a `DetectorEngine`.
pub struct DetectorEngineBuilder {
/// Worker count passed to `DetectorEngine::new` by `build`.
workers: usize,
/// Findings cap applied to the built engine.
max_findings: usize,
/// Detectors to register on the built engine, in the order they were added.
detectors: Vec<Arc<dyn Detector>>,
/// Progress callback to install on the built engine, if one was supplied.
progress_callback: Option<ProgressCallback>,
}
impl DetectorEngineBuilder {
/// Creates a builder with no detectors, no progress callback, the default
/// findings cap (`MAX_FINDINGS_LIMIT`) and a worker count of `0`.
pub fn new() -> Self {
    // `build` forwards this to `DetectorEngine::new`, where 0 means
    // "choose the worker count automatically".
    let workers = 0;

    Self {
        detectors: Vec::new(),
        progress_callback: None,
        max_findings: MAX_FINDINGS_LIMIT,
        workers,
    }
}
pub fn workers(mut self, workers: usize) -> Self {
self.workers = workers;
self
}
pub fn max_findings(mut self, max: usize) -> Self {
self.max_findings = max;
self
}
/// Queues a single detector for registration on the built engine.
pub fn detector(mut self, detector: Arc<dyn Detector>) -> Self {
    self.detectors.extend(std::iter::once(detector));
    self
}
/// Queues every detector yielded by `detectors`, in iteration order, after
/// any detectors already queued.
pub fn detectors(mut self, detectors: impl IntoIterator<Item = Arc<dyn Detector>>) -> Self {
    for detector in detectors {
        self.detectors.push(detector);
    }
    self
}
pub fn on_progress(mut self, callback: ProgressCallback) -> Self {
self.progress_callback = Some(callback);
self
}
pub fn build(self) -> DetectorEngine {
let mut engine = DetectorEngine::new(self.workers)
.with_max_findings(self.max_findings);
if let Some(callback) = self.progress_callback {
engine = engine.with_progress_callback(callback);
}
engine.register_all(self.detectors);
engine
}
}
impl Default for DetectorEngineBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::Severity;
use std::path::PathBuf;
/// Test double that reports a fixed number of synthetic findings.
struct MockDetector {
/// Name returned from `Detector::name`.
name: &'static str,
/// How many findings `detect` produces.
findings_count: usize,
/// Value returned from `is_dependent`.
dependent: bool,
}
impl Detector for MockDetector {
    /// Returns the configured mock name.
    fn name(&self) -> &'static str {
        self.name
    }

    fn description(&self) -> &'static str {
        "Mock detector for testing"
    }

    /// Produces `findings_count` medium-severity findings; the graph is ignored.
    fn detect(&self, _graph: &GraphStore) -> Result<Vec<Finding>> {
        let mut findings = Vec::with_capacity(self.findings_count);
        for index in 0..self.findings_count {
            findings.push(Finding {
                id: format!("{}-{}", self.name, index),
                detector: self.name.to_string(),
                severity: Severity::Medium,
                title: format!("Finding {}", index),
                description: "Test finding".to_string(),
                affected_files: vec![PathBuf::from("test.py")],
                line_start: Some(1),
                line_end: Some(10),
                suggested_fix: None,
                estimated_effort: None,
                category: None,
                cwe_id: None,
                why_it_matters: None,
                ..Default::default()
            });
        }
        Ok(findings)
    }

    /// Reports the configured `dependent` flag.
    fn is_dependent(&self) -> bool {
        self.dependent
    }
}
#[test]
fn test_engine_creation() {
    // An explicit worker count is stored as given and no detectors exist yet.
    let requested_workers = 4;
    let engine = DetectorEngine::new(requested_workers);

    assert_eq!(engine.workers, requested_workers);
    assert_eq!(engine.detector_count(), 0);
}
#[test]
fn test_engine_default_workers() {
    // Zero requests an automatic worker count, which must land in 1..=16.
    let auto_workers = DetectorEngine::new(0).workers;

    assert!(auto_workers > 0);
    assert!(auto_workers <= 16);
}
#[test]
fn test_register_detectors() {
    // One independent and one dependent mock, registered in this order.
    let mocks: Vec<Arc<dyn Detector>> = vec![
        Arc::new(MockDetector {
            name: "Detector1",
            findings_count: 5,
            dependent: false,
        }),
        Arc::new(MockDetector {
            name: "Detector2",
            findings_count: 3,
            dependent: true,
        }),
    ];

    let mut engine = DetectorEngine::new(2);
    for mock in mocks {
        engine.register(mock);
    }

    assert_eq!(engine.detector_count(), 2);
    assert_eq!(engine.detector_names(), vec!["Detector1", "Detector2"]);
}
#[test]
fn test_builder() {
    // Every builder setting must be reflected on the built engine.
    let mock = Arc::new(MockDetector {
        name: "Test",
        findings_count: 1,
        dependent: false,
    });

    let builder = DetectorEngineBuilder::new()
        .workers(4)
        .max_findings(100)
        .detector(mock);
    let engine = builder.build();

    assert_eq!(engine.workers, 4);
    assert_eq!(engine.max_findings, 100);
    assert_eq!(engine.detector_count(), 1);
}
}