use crate::detectors::base::{Detector, DetectorConfig};
use crate::graph::GraphQueryExt;
use crate::models::{deterministic_finding_id, Finding, Severity};
use anyhow::Result;
use regex::Regex;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::LazyLock;
use tracing::info;
/// Case-insensitive matcher for lines that pull in a test library:
/// `import ... pytest` / `unittest` / `mock`, `from ... mock`,
/// `require('jest` / `require('sinon` (either quote style), and
/// `import ... @testing-library`.
static TEST_IMPORT: LazyLock<Regex> = LazyLock::new(|| {
    let pattern = r#"(?i)(import.*pytest|import.*unittest|import.*mock|from.*mock|require\(['"]jest|require\(['"]sinon|import.*@testing-library)"#;
    Regex::new(pattern).expect("valid regex")
});
/// Case-insensitive matcher for lines that *use* test tooling: mock/stub/fake/spy
/// receivers, `Mock(`/`MagicMock`/`patch(`, `jest.`/`sinon.`/`unittest.` calls,
/// `@pytest`/`@test`/`@Before`/`@After`/`@BeforeEach` annotations, common
/// expectation matchers (`.toBe(`, `.toEqual(`, ...), and the word `fixture`.
static TEST_USAGE: LazyLock<Regex> = LazyLock::new(|| {
    let pattern = r"(?i)(mock\.|Mock\(|MagicMock|patch\(|stub\.|fake\.|spy\.|jest\.|sinon\.|@pytest|@test|unittest\.|\.toBe\(|\.toEqual\(|\.toHaveBeenCalled|\.toThrow\(|fixture|@Before|@After|@BeforeEach)";
    Regex::new(pattern).expect("valid regex")
});
/// Case-insensitive matcher for debug leftovers: `DEBUG = True`, `if __debug__`,
/// `if DEBUG`, and `# TODO ... test` / `# FIXME ... test` comments.
static DEBUG_PATTERN: LazyLock<Regex> = LazyLock::new(|| {
    let pattern =
        r"(?i)(DEBUG\s*=\s*True|if\s+__debug__|if\s+DEBUG|#\s*TODO.*test|#\s*FIXME.*test)";
    Regex::new(pattern).expect("valid regex")
});
/// Detector that reports test utilities (mocks, assertions, fixtures, debug
/// flags) appearing in files that are not recognised as test files.
pub struct TestInProductionDetector {
    /// Root of the repository under analysis. Not read by this detector's
    /// own logic, hence the lint allowance.
    #[allow(dead_code)]
    repository_path: PathBuf,
    /// Upper bound on the number of findings `detect` should report.
    max_findings: usize,
}
impl TestInProductionDetector {
    // NOTE(review): presumably expands to the `new` constructor (used by the
    // registration impl and the tests) with 50 as the `max_findings` default;
    // confirm against the macro definition.
    crate::detectors::detector_new!(50);

    /// Buckets a line that already matched the test-usage pattern.
    ///
    /// Returns `(category, human-readable label)`. The line is lowercased and
    /// checked against keyword groups in a fixed order; the first group with a
    /// hit wins, and `("unknown", "Test code")` is the fallback.
    fn categorize_test_code(line: &str) -> (&'static str, &'static str) {
        let text = line.to_lowercase();
        let mentions = |needles: &[&str]| needles.iter().any(|needle| text.contains(*needle));

        if mentions(&["mock", "patch", "stub"]) {
            ("mock", "Mock/stub objects")
        } else if mentions(&["assert", "expect", "tobe"]) {
            ("assertion", "Test assertions")
        } else if mentions(&["fixture", "setup", "teardown"]) {
            ("fixture", "Test fixtures")
        } else if mentions(&["describe(", "it(", "test("]) {
            ("framework", "Test framework")
        } else if mentions(&["spy"]) {
            ("spy", "Spy functions")
        } else {
            ("unknown", "Test code")
        }
    }

    /// Reports whether anything outside the test tree depends on `file_path`.
    ///
    /// Despite the name, this inspects call edges rather than import edges: it
    /// returns `true` as soon as any function defined in `file_path` has a
    /// caller whose path is not a test path, and `false` otherwise.
    fn is_imported_by_production(graph: &dyn crate::graph::GraphQuery, file_path: &str) -> bool {
        let interner = graph.interner();
        graph
            .get_functions_in_file(file_path)
            .into_iter()
            .any(|function| {
                graph
                    .get_callers(function.qn(interner))
                    .into_iter()
                    .any(|caller| !crate::detectors::base::is_test_path(caller.path(interner)))
            })
    }
}
impl Detector for TestInProductionDetector {
    /// Stable identifier of this detector.
    fn name(&self) -> &'static str {
        "test-in-production"
    }

    /// One-line summary of what the detector reports.
    fn description(&self) -> &'static str {
        "Detects test code in production files"
    }

    /// File extensions this detector declares interest in.
    ///
    /// NOTE(review): this list contains `rs` but not `rb`, whereas `detect`
    /// scans `rb` but not `rs`. Which of the two is intended cannot be told
    /// from this file; reconcile them once confirmed.
    fn file_extensions(&self) -> &'static [&'static str] {
        &["py", "js", "ts", "jsx", "tsx", "java", "go", "rs"]
    }

    /// Scans non-test source files for test imports, test-tooling usage and
    /// debug flags, and emits one finding per offending file.
    ///
    /// Files are reported in path order and at most `max_findings` findings
    /// are returned. Severity is `High` when a function in the file is called
    /// from non-test code, `Medium` when mocks or assertions are present, and
    /// `Low` otherwise.
    fn detect(
        &self,
        ctx: &crate::detectors::analysis_context::AnalysisContext,
    ) -> Result<Vec<Finding>> {
        let graph = ctx.graph;
        let files = &ctx.as_file_provider();

        // Per file: (1-based line number, category) for every flagged line.
        let mut issues_per_file: HashMap<PathBuf, Vec<(u32, &'static str)>> = HashMap::new();

        for path in
            files.files_with_extensions(&["py", "js", "ts", "jsx", "tsx", "java", "rb", "go"])
        {
            let path_str = path.to_string_lossy().to_string();
            if is_excluded_path(&path_str) {
                continue;
            }
            let Some(content) = files.masked_content(path) else {
                continue;
            };

            let lines: Vec<&str> = content.lines().collect();
            let mut file_issues = Vec::new();
            for (i, line) in lines.iter().enumerate() {
                let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
                if crate::detectors::is_line_suppressed(line, prev_line) {
                    continue;
                }
                let trimmed = line.trim();
                if trimmed.starts_with("//") || trimmed.starts_with('#') || trimmed.starts_with('*')
                {
                    continue;
                }

                // Saturate rather than wrap on absurdly long files.
                let line_no = u32::try_from(i + 1).unwrap_or(u32::MAX);

                if TEST_USAGE.is_match(line) {
                    let (category, _) = Self::categorize_test_code(line);
                    file_issues.push((line_no, category));
                } else if TEST_IMPORT.is_match(line) {
                    // Record the import itself so that import-only files get a
                    // real location and a non-empty issue list. Lines already
                    // counted as usage are not counted a second time.
                    file_issues.push((line_no, "import"));
                }
                if DEBUG_PATTERN.is_match(line) {
                    file_issues.push((line_no, "debug"));
                }
            }

            if !file_issues.is_empty() {
                issues_per_file.insert(path.to_path_buf(), file_issues);
            }
        }

        // Sort by path so both the output order and the set of files that
        // survive the cap are deterministic.
        let mut sorted_issues: Vec<_> = issues_per_file.into_iter().collect();
        sorted_issues.sort_by(|(a, _), (b, _)| a.cmp(b));

        let mut findings = Vec::with_capacity(sorted_issues.len().min(self.max_findings));
        // One finding is produced per file, so capping the files caps the findings.
        for (file_path, issues) in sorted_issues.into_iter().take(self.max_findings) {
            let path_str = file_path.to_string_lossy().to_string();
            let is_used = Self::is_imported_by_production(graph, &path_str);

            // Ordered map: categories are listed alphabetically in the note.
            let mut by_type: std::collections::BTreeMap<&'static str, Vec<u32>> =
                std::collections::BTreeMap::new();
            for &(line, category) in &issues {
                by_type.entry(category).or_default().push(line);
            }

            let severity = if is_used {
                Severity::High
            } else if by_type.contains_key("mock") || by_type.contains_key("assertion") {
                Severity::Medium
            } else {
                Severity::Low
            };

            let mut notes: Vec<String> = by_type
                .iter()
                .map(|(category, lines)| format_category_note(category_description(category), lines))
                .collect();
            if is_used {
                notes.push("⚠️ This file is imported by production code!".to_string());
            }
            let context_notes = format!("\n\n**Found:**\n{}", notes.join("\n"));

            // Issues are pushed in line order, so first/last bound the range.
            let first_line = issues.first().map(|(l, _)| *l).unwrap_or(1);
            let last_line = issues.last().map(|(l, _)| *l).unwrap_or(first_line);

            findings.push(Finding {
                id: String::new(),
                detector: "TestInProductionDetector".to_string(),
                severity,
                title: format!(
                    "Test code in production: {} issue{}",
                    issues.len(),
                    if issues.len() == 1 { "" } else { "s" }
                ),
                description: format!(
                    "Test utilities and patterns found in what appears to be production code.{}",
                    context_notes
                ),
                affected_files: vec![file_path],
                line_start: Some(first_line),
                line_end: Some(last_line),
                // The four spaces before the trailing `\` after `if TYPE_CHECKING:`
                // are deliberate: a string continuation strips leading whitespace
                // on the next source line, so the Python indent must precede it.
                suggested_fix: Some(
                    "Options:\n\n\
                     1. **Move to test files:** If this is test code, move it to a test directory\n\
                     2. **Use TYPE_CHECKING:** For type hints only, wrap imports in `if TYPE_CHECKING:`\n\
                     3. **Environment check:** If needed at runtime, guard with environment variable\n\n\
                     ```python\n\
                     # Instead of:\n\
                     from unittest.mock import Mock\n\
                     \n\
                     # Use:\n\
                     from typing import TYPE_CHECKING\n\
                     if TYPE_CHECKING:\n    \
                     from unittest.mock import Mock\n\
                     ```"
                        .to_string(),
                ),
                estimated_effort: Some("15 minutes".to_string()),
                category: Some("code-quality".to_string()),
                cwe_id: Some("CWE-489".to_string()),
                why_it_matters: Some(if is_used {
                    "This test code is imported by production code. Test dependencies add bloat, \
                     security risks, and can cause unexpected behavior in production."
                        .to_string()
                } else {
                    "Test code in production files adds confusion and potential bloat. \
                     It may be accidentally executed or cause import errors in environments \
                     without test dependencies."
                        .to_string()
                }),
                ..Default::default()
            });
        }

        info!(
            "TestInProductionDetector found {} findings (graph-aware)",
            findings.len()
        );
        Ok(findings)
    }
}

/// Returns `true` for paths the detector never scans: recognised test paths
/// plus locations where test or debug helpers are expected to live.
fn is_excluded_path(path_str: &str) -> bool {
    const EXCLUDED_FRAGMENTS: [&str; 6] = [
        "__tests__",
        "fixtures",
        "conftest",
        "devtools",
        "debug",
        "/scripts/",
    ];
    crate::detectors::base::is_test_path(path_str)
        || EXCLUDED_FRAGMENTS
            .iter()
            .any(|fragment| path_str.contains(*fragment))
}

/// Maps an issue category to the label shown in the finding description.
fn category_description(category: &str) -> &'static str {
    match category {
        "mock" => "Mock/stub objects",
        "assertion" => "Test assertions",
        "fixture" => "Test fixtures",
        "framework" => "Test framework code",
        "spy" => "Spy functions",
        "debug" => "Debug flags",
        "import" => "Test framework imports",
        _ => "Test code",
    }
}

/// Renders one bullet of the "Found" list, e.g. `• Test fixtures (lines 3, 9)`
/// or `• Debug flags (line 4)`.
fn format_category_note(desc: &str, lines: &[u32]) -> String {
    let joined = lines
        .iter()
        .map(|line| line.to_string())
        .collect::<Vec<_>>()
        .join(", ");
    let plural = if lines.len() > 1 { "s" } else { "" };
    format!("• {} (line{} {})", desc, plural, joined)
}
impl crate::detectors::RegisteredDetector for TestInProductionDetector {
    /// Builds a shared instance of this detector from `init.repo_path`.
    fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
        let detector = Self::new(init.repo_path);
        std::sync::Arc::new(detector)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::builder::GraphBuilder;

    /// Runs the detector over a single mock file named `service.py` with the
    /// given contents, using an empty graph, and returns the findings.
    fn findings_for_service_py(source: &'static str) -> Vec<crate::models::Finding> {
        let store = GraphBuilder::new().freeze();
        let detector = TestInProductionDetector::new("/mock/repo");
        let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
            &store,
            vec![("service.py", source)],
        );
        detector.detect(&ctx).expect("detection should succeed")
    }

    #[test]
    fn test_detects_mock_in_production_code() {
        let findings = findings_for_service_py(
            "\nfrom unittest.mock import Mock\n\ndef get_client():\n    client = Mock()\n    return client\n",
        );

        assert!(
            !findings.is_empty(),
            "Should detect test code (Mock) in production file"
        );
        let from_this_detector = findings
            .iter()
            .any(|f| f.detector == "TestInProductionDetector");
        assert!(from_this_detector);
    }

    #[test]
    fn test_no_finding_for_clean_production_code() {
        let findings = findings_for_service_py(
            "\nimport os\n\ndef get_config():\n    return os.environ.get(\"APP_CONFIG\", \"default\")\n\ndef process_data(data):\n    return [item.strip() for item in data]\n",
        );

        assert!(
            findings.is_empty(),
            "Should not flag clean production code, but got: {:?}",
            findings.iter().map(|f| &f.title).collect::<Vec<_>>()
        );
    }
}