use crate::detectors::base::{Detector, DetectorConfig};
use crate::graph::GraphStore;
use crate::models::{deterministic_finding_id, Finding, Severity};
use anyhow::Result;
use regex::Regex;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::OnceLock;
use tracing::info;
/// Process-wide cache for the compiled timeout regex; filled on first use by
/// [`timeout_pattern`].
static TIMEOUT_PATTERN: OnceLock<Regex> = OnceLock::new();

/// Returns the shared, lazily compiled regex used to spot timeout-like literals.
///
/// The pattern is case-insensitive. Capture group 1 is the keyword (`timeout`,
/// `sleep`, `delay`, ...), which must be followed by `(`, `=` or `:`. Capture
/// group 2 is a run of four or more decimal digits.
fn timeout_pattern() -> &'static Regex {
    const PATTERN: &str = r"(?i)(timeout|sleep|delay|wait|setTimeout|setInterval|read_timeout|write_timeout|connect_timeout)\s*[\(=:]\s*(\d{4,})";
    // The pattern is a compile-time constant, so a failure here is a programming error.
    TIMEOUT_PATTERN.get_or_init(|| Regex::new(PATTERN).expect("valid regex"))
}
/// Renders a millisecond count as a short human-readable duration.
///
/// The largest unit (minutes, then seconds) that divides `ms` exactly is used,
/// so the text never misstates the value: `90000` becomes `"90 seconds"` rather
/// than a truncated `"1 minutes"`, and `1500` stays `"1500 ms"`. A count of one
/// uses the singular unit (`"1 minute"`, `"1 second"`).
fn format_duration(ms: u64) -> String {
    const MS_PER_SECOND: u64 = 1_000;
    const MS_PER_MINUTE: u64 = 60_000;

    /// Formats `count` with `unit`, adding a plural `s` unless `count` is 1.
    fn with_unit(count: u64, unit: &str) -> String {
        if count == 1 {
            format!("1 {}", unit)
        } else {
            format!("{} {}s", count, unit)
        }
    }

    if ms >= MS_PER_MINUTE && ms % MS_PER_MINUTE == 0 {
        with_unit(ms / MS_PER_MINUTE, "minute")
    } else if ms >= MS_PER_SECOND && ms % MS_PER_SECOND == 0 {
        with_unit(ms / MS_PER_SECOND, "second")
    } else {
        // Not a whole number of seconds: report the exact millisecond value.
        format!("{} ms", ms)
    }
}
/// Proposes a constant name for a hardcoded timeout based on keywords in `context`.
///
/// `context` is lowercased and checked against an ordered rule table; the first
/// rule with any keyword occurring as a substring wins. Matching is by plain
/// substring, so e.g. `thread` satisfies the `read` rule. When no rule matches,
/// the name falls back to `TIMEOUT_<value>MS`.
fn suggest_constant_name(context: &str, value: u64) -> String {
    // Order matters: earlier rules take precedence over later ones.
    const RULES: &[(&[&str], &str)] = &[
        (&["connect"], "CONNECTION_TIMEOUT_MS"),
        (&["read"], "READ_TIMEOUT_MS"),
        (&["write"], "WRITE_TIMEOUT_MS"),
        (&["request", "http"], "REQUEST_TIMEOUT_MS"),
        (&["database", "db", "query"], "DB_TIMEOUT_MS"),
        (&["socket"], "SOCKET_TIMEOUT_MS"),
        (&["retry", "backoff"], "RETRY_DELAY_MS"),
        (&["poll", "interval"], "POLL_INTERVAL_MS"),
    ];

    let lowered = context.to_lowercase();
    RULES
        .iter()
        .find(|(keywords, _)| keywords.iter().any(|keyword| lowered.contains(*keyword)))
        .map(|(_, name)| (*name).to_string())
        .unwrap_or_else(|| format!("TIMEOUT_{}MS", value))
}
/// Detector that scans files under a repository for hardcoded timeout-like
/// numeric literals (see `timeout_pattern`).
pub struct HardcodedTimeoutDetector {
/// Root directory handed to the file walker in `count_occurrences` and `detect`.
repository_path: PathBuf,
/// Limit on the number of findings, checked in `detect`; `new` sets it to 50.
max_findings: usize,
}
impl HardcodedTimeoutDetector {
/// Creates a detector rooted at `repository_path` with the default findings
/// limit of 50.
pub fn new(repository_path: impl Into<PathBuf>) -> Self {
    const DEFAULT_MAX_FINDINGS: usize = 50;

    let repository_path = repository_path.into();
    Self {
        repository_path,
        max_findings: DEFAULT_MAX_FINDINGS,
    }
}
/// Classifies a source line by the keywords it contains.
///
/// Returns a human-readable context label and a flag that is `true` only for
/// the network/database category. Matching is a case-insensitive substring
/// search; categories are tried in order (network/database, I/O, UI/animation)
/// and the first hit wins. Lines with no keyword are labelled
/// `"General timeout"`.
fn analyze_context(line: &str) -> (String, bool) {
    const NETWORK_WORDS: &[&str] = &[
        "connect", "request", "http", "socket", "database", "query", "grpc", "rpc",
    ];
    const IO_WORDS: &[&str] = &["read", "write", "file", "stream"];
    const UI_WORDS: &[&str] = &["animation", "transition", "debounce", "throttle"];

    let lowered = line.to_lowercase();
    let mentions_any = |words: &[&str]| words.iter().any(|word| lowered.contains(*word));

    let (label, is_network) = if mentions_any(NETWORK_WORDS) {
        ("Network/database operation", true)
    } else if mentions_any(IO_WORDS) {
        ("I/O operation", false)
    } else if mentions_any(UI_WORDS) {
        ("UI/animation", false)
    } else {
        ("General timeout", false)
    };

    (label.to_string(), is_network)
}
/// Counts how often each numeric timeout value appears across the repository.
///
/// Every regular file yielded by the walker is considered, regardless of
/// extension, as long as the global cache can supply its content. At most one
/// value is counted per line (the first regex match), and values that do not
/// fit in a `u64` are ignored.
fn count_occurrences(&self) -> HashMap<u64, usize> {
    let mut tally: HashMap<u64, usize> = HashMap::new();

    let entries = ignore::WalkBuilder::new(&self.repository_path)
        .hidden(false)
        .git_ignore(true)
        .build()
        .filter_map(|entry| entry.ok());

    for entry in entries {
        let path = entry.path();
        if !path.is_file() {
            continue;
        }
        let Some(content) = crate::cache::global_cache().get_content(path) else {
            continue;
        };

        // Extract the parsed value of capture group 2 from each matching line.
        let values = content.lines().filter_map(|line| {
            let caps = timeout_pattern().captures(line)?;
            caps.get(2)?.as_str().parse::<u64>().ok()
        });
        for value in values {
            *tally.entry(value).or_default() += 1;
        }
    }

    tally
}
/// Looks up the name of the first function in `graph` whose file path equals
/// `file_path` and whose `line_start..=line_end` range includes `line`.
///
/// Returns `None` when no function returned by `get_functions` covers that
/// location.
fn find_containing_function(
    graph: &dyn crate::graph::GraphQuery,
    file_path: &str,
    line: u32,
) -> Option<String> {
    for func in graph.get_functions() {
        let same_file = func.file_path == file_path;
        let covers_line = func.line_start <= line && func.line_end >= line;
        if same_file && covers_line {
            return Some(func.name);
        }
    }
    None
}
}
impl Detector for HardcodedTimeoutDetector {
/// Returns this detector's name string, `hardcoded-timeout`.
///
/// Note that findings built in `detect` carry a different label,
/// `HardcodedTimeoutDetector`, in their `detector` field.
fn name(&self) -> &'static str {
"hardcoded-timeout"
}
/// Returns a one-line, human-readable summary of what this detector looks for.
fn description(&self) -> &'static str {
"Detects hardcoded timeout values"
}
/// Walks the repository and reports lines containing hardcoded timeout values.
///
/// A file is skipped when it is not a regular file, when `is_test_path` accepts
/// its path, when its path *relative to the repository root* contains `config`,
/// or when its extension is not one of the listed source extensions. Lines
/// whose trimmed text starts with `//` or `#` are skipped, as are matches whose
/// digits do not fit in a `u64`.
///
/// Severity is `Medium` when the line looks network/database related (see
/// `analyze_context`) or the same value occurs more than three times according
/// to `count_occurrences`; otherwise it is `Low`. The `graph` is only used to
/// name the function containing each match.
///
/// At most `max_findings` findings are returned; the limit is enforced per
/// line, so a single file cannot exceed it.
///
/// # Errors
///
/// This implementation always returns `Ok`; the `Result` return type comes from
/// the `Detector` trait signature.
fn detect(&self, graph: &dyn crate::graph::GraphQuery) -> Result<Vec<Finding>> {
    let mut findings = vec![];
    let occurrence_counts = self.count_occurrences();
    let walker = ignore::WalkBuilder::new(&self.repository_path)
        .hidden(false)
        .git_ignore(true)
        .build();

    'files: for entry in walker.filter_map(|e| e.ok()) {
        if findings.len() >= self.max_findings {
            break;
        }
        let path = entry.path();
        if !path.is_file() {
            continue;
        }
        let path_str = path.to_string_lossy().to_string();
        // Apply the "config" heuristic to the path inside the repository only;
        // otherwise a checkout located under e.g. `~/.config/` would have every
        // file skipped.
        let relative_path = path.strip_prefix(&self.repository_path).unwrap_or(path);
        if crate::detectors::base::is_test_path(&path_str)
            || relative_path.to_string_lossy().contains("config")
        {
            continue;
        }
        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
        if !matches!(
            ext,
            "py" | "js" | "ts" | "java" | "go" | "rs" | "rb" | "jsx" | "tsx"
        ) {
            continue;
        }
        let Some(content) = crate::cache::global_cache().get_content(path) else {
            continue;
        };

        for (i, line) in content.lines().enumerate() {
            // Enforce the cap per line so one file with many matches cannot
            // push the result past `max_findings`.
            if findings.len() >= self.max_findings {
                break 'files;
            }
            let trimmed = line.trim();
            if trimmed.starts_with("//") || trimmed.starts_with('#') {
                continue;
            }
            let Some(caps) = timeout_pattern().captures(line) else {
                continue;
            };
            let Some(val) = caps.get(2) else {
                continue;
            };
            // Skip literals that overflow `u64` instead of reporting them as a
            // bogus "0 ms" timeout; `count_occurrences` ignores them as well.
            let Ok(value) = val.as_str().parse::<u64>() else {
                continue;
            };

            let line_no = (i + 1) as u32;
            let duration = format_duration(value);
            let occurrences = occurrence_counts.get(&value).copied().unwrap_or(1);
            let (context, is_network) = Self::analyze_context(line);
            let containing_func = Self::find_containing_function(graph, &path_str, line_no);

            let severity = if is_network || occurrences > 3 {
                Severity::Medium
            } else {
                Severity::Low
            };

            let mut notes = Vec::new();
            notes.push(format!("⏱️ Duration: {}", duration));
            notes.push(format!("📍 Context: {}", context));
            if occurrences > 1 {
                notes.push(format!("📊 Same value used {} times", occurrences));
            }
            if let Some(func) = containing_func {
                notes.push(format!("📦 In function: `{}`", func));
            }
            let context_notes = format!("\n\n**Analysis:**\n{}", notes.join("\n"));

            let const_name = suggest_constant_name(line, value);
            let suggestion = if occurrences > 3 {
                format!(
                    "This timeout value appears {} times. Extract to a centralized config:\n\n\
                     ```python\n\
                     # config.py\n\
                     {} = {} # {}\n\
                     \n\
                     # usage\n\
                     from config import {}\n\
                     requests.get(url, timeout={}/1000)\n\
                     ```",
                    occurrences, const_name, value, duration, const_name, const_name
                )
            } else if is_network {
                format!(
                    "Network timeouts should be configurable:\n\n\
                     ```python\n\
                     import os\n\
                     {} = int(os.environ.get('{}', '{}'))\n\
                     ```",
                    const_name, const_name, value
                )
            } else {
                format!(
                    "Extract to a named constant:\n\n\
                     ```python\n\
                     {} = {} # {}\n\
                     ```",
                    const_name, value, duration
                )
            };

            findings.push(Finding {
                // NOTE(review): the id is left empty here; presumably it is
                // assigned elsewhere — confirm against the caller.
                id: String::new(),
                detector: "HardcodedTimeoutDetector".to_string(),
                severity,
                title: format!("Hardcoded timeout: {}", duration),
                description: format!(
                    "Magic timeout value `{}` makes configuration and tuning difficult.{}",
                    value, context_notes
                ),
                affected_files: vec![path.to_path_buf()],
                line_start: Some(line_no),
                line_end: Some(line_no),
                suggested_fix: Some(suggestion),
                estimated_effort: Some("5 minutes".to_string()),
                category: Some("maintainability".to_string()),
                cwe_id: None,
                why_it_matters: Some(
                    "Hardcoded timeouts are hard to find, tune, and change across environments. \
                     Network timeouts especially need to be configurable based on deployment."
                        .to_string(),
                ),
                ..Default::default()
            });
        }
    }

    info!(
        "HardcodedTimeoutDetector found {} findings (graph-aware)",
        findings.len()
    );
    Ok(findings)
}
}