use crate::detectors::base::{Detector, DetectorConfig};
use crate::graph::GraphQueryExt;
use crate::models::{Finding, Severity};
use anyhow::Result;
use regex::Regex;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::LazyLock;
use tracing::info;
/// Matches a Python function header at the start of a single line (optionally
/// prefixed with `async`); capture group 1 is the function name.
///
/// The pattern is anchored because it is applied line by line: without the
/// anchor, the word `def` inside a trailing comment or a string literal
/// (e.g. `x = 1  # see def helper(`) would be mistaken for a definition.
static GENERATOR_DEF: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^\s*(?:async\s+)?def\s+(\w+)\s*\(").expect("valid regex")
});

/// Matches the `yield` keyword as a whole word.
static YIELD_STMT: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\byield\b").expect("valid regex"));

/// Matches a `yield from` delegation.
static YIELD_FROM: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\byield\s+from\b").expect("valid regex"));

/// Matches `list(name(`; capture group 1 is the wrapped callee's name.
///
/// The leading `\b` stops identifiers that merely end in `list`
/// (e.g. `playlist(tracks(`) from being read as a `list()` call.
static LIST_CALL: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\blist\s*\(\s*(\w+)\s*\(").expect("valid regex"));
/// Detector that reports Python generator functions which yield only once
/// outside a loop, or whose calls appear wrapped in `list()`.
pub struct GeneratorMisuseDetector {
/// Detector configuration, exposed through `Detector::config`.
config: DetectorConfig,
/// Repository root supplied at construction. Nothing in this file reads it,
/// hence the `dead_code` allowance.
#[allow(dead_code)] repository_path: PathBuf,
/// Number of findings at which `detect` stops visiting further files.
max_findings: usize,
}
impl GeneratorMisuseDetector {
/// Creates a detector rooted at the current directory (`"."`) with a fresh
/// [`DetectorConfig`] and a limit of 50 findings.
pub fn new() -> Self {
    let config = DetectorConfig::new();
    let repository_path = PathBuf::from(".");
    Self {
        config,
        repository_path,
        max_findings: 50,
    }
}
/// Creates a detector rooted at `repository_path` with a fresh
/// [`DetectorConfig`] and a limit of 50 findings.
pub fn with_path(repository_path: impl Into<PathBuf>) -> Self {
    let config = DetectorConfig::new();
    let root: PathBuf = repository_path.into();
    Self {
        config,
        repository_path: root,
        max_findings: 50,
    }
}
/// Reports whether the text at `yield_byte_offset` in `line` is not live code,
/// i.e. it sits inside a string literal or after a `#` comment marker.
///
/// The prefix is walked with a small quote-tracking state machine rather than
/// by counting quote characters, so that
/// - an apostrophe inside a double-quoted string (`"don't"`) and
/// - a backslash-escaped quote (`"say \"hi\""`)
/// do not flip the result.
///
/// Only the single line is examined; strings that span several lines are not
/// tracked. An offset that is out of range or not on a character boundary
/// yields `false` instead of panicking.
fn yield_is_in_string(line: &str, yield_byte_offset: usize) -> bool {
    let Some(prefix) = line.get(..yield_byte_offset) else {
        return false;
    };

    // `Some(q)` while inside a literal opened by quote character `q`.
    let mut open_quote: Option<char> = None;
    let mut escaped = false;

    for c in prefix.chars() {
        match open_quote {
            Some(quote) => {
                if escaped {
                    // The character after a backslash never closes the literal.
                    escaped = false;
                } else if c == '\\' {
                    escaped = true;
                } else if c == quote {
                    open_quote = None;
                }
            }
            None => match c {
                '\'' | '"' => open_quote = Some(c),
                // A `#` outside any literal starts a comment: the rest of the
                // line, including the keyword, is not code.
                '#' => return true,
                _ => {}
            },
        }
    }

    open_quote.is_some()
}
/// Counts the `yield` statements in the body of the function whose `def`
/// line is `lines[func_start]` and reports whether a counted `yield` was
/// preceded by a loop statement in that body.
///
/// * `indent` - number of leading whitespace characters on the `def` line.
///
/// Returns `(count, yield_in_loop)`. A live `yield from` short-circuits with
/// a count of `2`, so delegating generators are never seen as single-yield.
///
/// Loop detection looks only at statements that *begin* with `for`/`while`
/// (optionally `async`). A plain substring test for `"for "` would also fire
/// on comprehensions, string literals and trailing comments such as
/// `yield x  # value for the caller`. A loop that starts after the yield
/// cannot contain it, so only loops seen before the yield are counted.
///
/// NOTE(review): the scan stops at the first non-blank line indented no
/// deeper than the `def`, so a multi-line signature whose closing `):` sits
/// at the `def`'s own indentation ends the scan before the body is reached.
fn count_yields(lines: &[&str], func_start: usize, indent: usize) -> (usize, bool) {
    // True when `code` (already trimmed) opens a `for` or `while` statement.
    fn is_loop_header(code: &str) -> bool {
        let code = code.strip_prefix("async ").map_or(code, str::trim_start);
        ["for", "while"].iter().any(|keyword| {
            code.strip_prefix(keyword)
                .is_some_and(|rest| rest.starts_with(|c: char| c.is_whitespace() || c == '('))
        })
    }

    let mut count = 0;
    let mut seen_loop = false;
    let mut yield_in_loop = false;

    for line in lines.iter().skip(func_start + 1) {
        let trimmed = line.trim();
        let current_indent = line.chars().take_while(|c| c.is_whitespace()).count();
        if !trimmed.is_empty() && current_indent <= indent {
            break;
        }
        if trimmed.starts_with('#') {
            continue;
        }
        if is_loop_header(trimmed) {
            seen_loop = true;
        }
        if let Some(m) = YIELD_FROM.find(line) {
            // Ignore "yield from" that only appears inside a string/comment.
            if !Self::yield_is_in_string(line, m.start()) {
                return (2, seen_loop);
            }
        }
        if let Some(m) = YIELD_STMT.find(line) {
            if Self::yield_is_in_string(line, m.start()) {
                continue;
            }
            count += 1;
            yield_in_loop |= seen_loop;
        }
    }

    (count, yield_in_loop)
}
/// Reports whether the body of the function starting at `lines[func_start]`
/// contains both a `try:` and a `finally:` statement.
///
/// The body is every line after `func_start` up to (not including) the first
/// non-blank line whose leading whitespace is no wider than `indent`.
fn is_resource_management_yield(lines: &[&str], func_start: usize, indent: usize) -> bool {
    let body = lines.iter().skip(func_start + 1).take_while(|line| {
        let leading = line.chars().take_while(|c| c.is_whitespace()).count();
        line.trim().is_empty() || leading > indent
    });

    let (mut saw_try, mut saw_finally) = (false, false);
    for statement in body.map(|line| line.trim()) {
        saw_try |= statement.starts_with("try:");
        saw_finally |= statement.starts_with("finally:");
    }

    saw_try && saw_finally
}
/// Reports whether `content` contains any of the import snippets for
/// FastAPI, Starlette or `contextlib` listed below.
///
/// This is a plain substring test over the whole file text.
fn has_framework_yield_import(content: &str) -> bool {
    const IMPORT_MARKERS: [&str; 5] = [
        "from fastapi",
        "from starlette",
        "from contextlib import contextmanager",
        "from contextlib import asynccontextmanager",
        "import contextlib",
    ];
    IMPORT_MARKERS
        .iter()
        .any(|marker| content.contains(*marker))
}
/// Reports whether any decorator stacked directly above the `def` at
/// `lines[func_start]` mentions `contextmanager` (this also covers
/// `asynccontextmanager` and `contextlib.contextmanager`).
///
/// Lines are walked upwards from the `def`. Blank lines and `#` comment
/// lines are skipped; the walk ends at the first line that is not a
/// decorator. Every decorator in the stack is inspected, so
///
/// ```python
/// @contextmanager
/// @log_calls
/// def managed(): ...
/// ```
///
/// is recognised even though `@contextmanager` is not the nearest one.
/// A `func_start` beyond the end of `lines` is tolerated.
fn has_contextmanager_decorator(lines: &[&str], func_start: usize) -> bool {
    lines
        .iter()
        .take(func_start)
        .rev()
        .map(|line| line.trim())
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .take_while(|line| line.starts_with('@'))
        .any(|decorator| decorator.contains("contextmanager"))
}
/// Collects the names of all callables that appear as `list(name(...)`
/// anywhere in the provided Python files, excluding common builtins.
///
/// `_graph` is accepted but not consulted; the scan is purely textual and
/// driven by the `LIST_CALL` pattern.
fn find_list_wrapped_generators(
    &self,
    _graph: &dyn crate::graph::GraphQuery,
    files: &dyn crate::detectors::file_provider::FileProvider,
) -> HashSet<String> {
    // Builtin callables whose results are routinely passed to `list()`;
    // these are never reported as wrapped generators.
    const BUILTINS: &[&str] = &[
        "list",
        "dict",
        "set",
        "tuple",
        "str",
        "int",
        "float",
        "bool",
        "map",
        "filter",
        "range",
        "zip",
        "sorted",
        "reversed",
        "enumerate",
        "iter",
        "next",
        "type",
        "super",
        "print",
        "len",
        "max",
        "min",
        "sum",
        "any",
        "all",
    ];

    let mut wrapped = HashSet::new();
    for path in files.files_with_extension("py") {
        let Some(content) = files.content(path) else {
            continue;
        };
        wrapped.extend(
            LIST_CALL
                .captures_iter(&content)
                .filter_map(|caps| caps.get(1))
                .map(|callee| callee.as_str())
                .filter(|name| !BUILTINS.contains(name))
                .map(|name| name.to_string()),
        );
    }
    wrapped
}
/// Reports whether any graph-recorded caller of `func_name` lives in a file
/// that looks like it iterates the generator lazily.
///
/// A caller file counts as lazy when its text contains `"for "` and
/// `"<func_name>("` but not `"list(<func_name>("`. The check is a coarse,
/// whole-file substring heuristic. Caller files are read from disk; files
/// that cannot be read are skipped.
///
/// Returns `false` when `func_name` is not present in `func_map`.
fn is_consumed_lazily(
    func_name: &str,
    graph: &dyn crate::graph::GraphQuery,
    func_map: &std::collections::HashMap<String, crate::graph::store_models::CodeNode>,
) -> bool {
    let i = graph.interner();
    let Some(func) = func_map.get(func_name) else {
        return false;
    };

    // Built once: the search patterns do not depend on the caller.
    let call_pattern = format!("{}(", func_name);
    let list_pattern = format!("list({}(", func_name);

    for caller in graph.get_callers(func.qn(i)) {
        let Ok(content) = std::fs::read_to_string(caller.path(i)) else {
            continue;
        };
        let has_lazy = content.contains("for ") && content.contains(&call_pattern);
        if has_lazy && !content.contains(&list_pattern) {
            return true;
        }
    }
    false
}
/// Reports whether any provided Python file contains a `for` (or
/// `async for`) statement whose line calls `func_name`.
///
/// The call must be to exactly `func_name`: a match preceded by an
/// identifier character is rejected, so `for x in get_items(...)` does not
/// count as consuming a generator named `items`.
fn is_consumed_lazily_in_files(
    func_name: &str,
    files: &dyn crate::detectors::file_provider::FileProvider,
) -> bool {
    // True when `line` contains `call_pattern` at an identifier boundary.
    fn calls_function(line: &str, call_pattern: &str) -> bool {
        line.match_indices(call_pattern).any(|(at, _)| {
            !line[..at]
                .chars()
                .next_back()
                .is_some_and(|prev| prev.is_alphanumeric() || prev == '_')
        })
    }

    let call_pattern = format!("{}(", func_name);
    for path in files.files_with_extension("py") {
        let Some(content) = files.content(path) else {
            continue;
        };
        // Cheap whole-file pre-filter before the per-line scan.
        if !content.contains(&call_pattern) {
            continue;
        }
        let iterated = content.lines().map(str::trim).any(|statement| {
            let statement = statement
                .strip_prefix("async ")
                .map_or(statement, str::trim_start);
            statement.starts_with("for ") && calls_function(statement, &call_pattern)
        });
        if iterated {
            return true;
        }
    }
    false
}
}
impl Default for GeneratorMisuseDetector {
fn default() -> Self {
Self::new()
}
}
impl Detector for GeneratorMisuseDetector {
/// Identifier of this detector.
fn name(&self) -> &'static str {
    const NAME: &str = "GeneratorMisuseDetector";
    NAME
}
/// One-line summary of what this detector reports.
fn description(&self) -> &'static str {
    const SUMMARY: &str = "Detects single-yield generators that add unnecessary complexity";
    SUMMARY
}
/// Borrows the configuration this detector was constructed with.
fn config(&self) -> Option<&DetectorConfig> {
    let config = &self.config;
    Some(config)
}
/// File extensions this detector applies to: Python sources only.
fn file_extensions(&self) -> &'static [&'static str] {
    const PYTHON_ONLY: &[&str] = &["py"];
    PYTHON_ONLY
}
fn detect(
&self,
ctx: &crate::detectors::analysis_context::AnalysisContext,
) -> Result<Vec<Finding>> {
let graph = ctx.graph;
let files = &ctx.as_file_provider();
let mut findings = vec![];
let list_wrapped = self.find_list_wrapped_generators(graph, files);
let mut func_map: Option<
std::collections::HashMap<String, crate::graph::store_models::CodeNode>,
> = None;
for path in files.files_with_extension("py") {
if findings.len() >= self.max_findings {
break;
}
let path_str = path.to_string_lossy().to_string();
if crate::detectors::base::is_test_path(&path_str) {
continue;
}
let raw = match files.content(path) {
Some(c) => c,
None => continue,
};
if !raw.contains("yield") {
continue;
}
if let Some(content) = Some(raw) {
let lines: Vec<&str> = content.lines().collect();
for (i, line) in lines.iter().enumerate() {
let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
if crate::detectors::is_line_suppressed(line, prev_line) {
continue;
}
if let Some(caps) = GENERATOR_DEF.captures(line) {
let func_name = caps.get(1).map(|m| m.as_str()).unwrap_or("");
let indent = line.chars().take_while(|c| c.is_whitespace()).count();
let (yield_count, yield_in_loop) = Self::count_yields(&lines, i, indent);
if yield_count == 0 {
continue;
}
if yield_count == 1 && !yield_in_loop {
if Self::has_contextmanager_decorator(&lines, i) {
continue;
}
if Self::has_framework_yield_import(&content) {
continue;
}
if Self::is_resource_management_yield(&lines, i, indent) {
continue;
}
let polymorphic_methods = [
"get_template_sources",
"subwidgets",
"chunks",
"__iter__",
"__aiter__",
"__next__",
"__anext__",
];
if polymorphic_methods.contains(&func_name)
|| func_name.starts_with("iter_")
{
continue;
}
findings.push(Finding {
id: String::new(),
detector: "GeneratorMisuseDetector".to_string(),
severity: Severity::Low,
title: format!("Single-yield generator: `{}`", func_name),
description: format!(
"Generator `{}` only yields once and not in a loop. \
Consider using a simple function with return instead.\n\n\
**Why it matters:** Single-yield generators add complexity \
without the lazy evaluation benefits.",
func_name
),
affected_files: vec![path.to_path_buf()],
line_start: Some((i + 1) as u32),
line_end: None,
suggested_fix: Some(format!(
"Convert to a simple function:\n\n\
```python\n\
# Instead of:\n\
def {}(...):\n\
yield some_value\n\
\n\
# Use:\n\
def {}(...):\n\
return some_value\n\
```",
func_name, func_name
)),
estimated_effort: Some("10 minutes".to_string()),
category: Some("code-quality".to_string()),
cwe_id: None,
why_it_matters: Some(
"Single-yield generators require callers to use next() or iterate, \
adding complexity without benefits.".to_string()
),
..Default::default()
});
}
if list_wrapped.contains(func_name)
&& !Self::is_consumed_lazily(
func_name,
graph,
func_map.get_or_insert_with(|| {
let gi = graph.interner();
graph
.get_functions()
.into_iter()
.map(|f| (f.node_name(gi).to_string(), f))
.collect()
}),
)
&& !Self::is_consumed_lazily_in_files(func_name, files)
{
findings.push(Finding {
id: String::new(),
detector: "GeneratorMisuseDetector".to_string(),
severity: Severity::Low,
title: format!("Generator always list()-wrapped: `{}`", func_name),
description: format!(
"Generator `{}` is always wrapped in `list()`, defeating lazy evaluation.\n\n\
**Analysis:** No callers consume this generator lazily.",
func_name
),
affected_files: vec![path.to_path_buf()],
line_start: Some((i + 1) as u32),
line_end: None,
suggested_fix: Some(format!(
"Consider returning a list directly:\n\n\
```python\n\
# Instead of:\n\
def {}(...):\n\
for item in items:\n\
yield transform(item)\n\
\n\
# result = list({}(...)) # Always converted\n\
\n\
# Use:\n\
def {}(...):\n\
return [transform(item) for item in items]\n\
```",
func_name, func_name, func_name
)),
estimated_effort: Some("15 minutes".to_string()),
category: Some("performance".to_string()),
cwe_id: None,
why_it_matters: Some(
"Generators wrapped in list() lose lazy evaluation benefits \
and add unnecessary overhead.".to_string()
),
..Default::default()
});
}
}
}
}
}
info!(
"GeneratorMisuseDetector found {} findings (graph-aware)",
findings.len()
);
Ok(findings)
}
}
/// Registry hook: builds the detector from the repository path carried by
/// the `DetectorInit` value.
impl crate::detectors::RegisteredDetector for GeneratorMisuseDetector {
    fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
        let detector = Self::with_path(init.repo_path);
        std::sync::Arc::new(detector)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::builder::GraphBuilder;

    /// Runs `detector` over the given in-memory `(path, content)` files
    /// against an empty graph and returns its findings.
    fn run(
        detector: &GeneratorMisuseDetector,
        files: Vec<(&'static str, &'static str)>,
    ) -> Vec<Finding> {
        let store = GraphBuilder::new().freeze();
        let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
            &store, files,
        );
        detector.detect(&ctx).expect("detection should succeed")
    }

    /// Titles of `findings`, used in assertion failure messages.
    fn titles(findings: &[Finding]) -> Vec<&String> {
        findings.iter().map(|f| &f.title).collect::<Vec<_>>()
    }

    #[test]
    fn test_detects_single_yield_generator() {
        let detector = GeneratorMisuseDetector::with_path("/mock/repo");
        let findings = run(
            &detector,
            vec![("utils.py", "\ndef single_value():\n yield 42\n")],
        );
        assert!(!findings.is_empty(), "Should detect single-yield generator");
        assert!(findings
            .iter()
            .any(|f| f.title.contains("Single-yield generator")));
    }

    #[test]
    fn test_no_finding_for_generator_with_loop() {
        let detector = GeneratorMisuseDetector::with_path("/mock/repo");
        let findings = run(
            &detector,
            vec![(
                "utils.py",
                "\ndef multi_yield(items):\n for item in items:\n yield item * 2\n",
            )],
        );
        assert!(
            findings.is_empty(),
            "Should not flag generator with yield inside a loop, but got: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_no_finding_for_fastapi_dependency() {
        let detector = GeneratorMisuseDetector::with_path("/mock/repo");
        let findings = run(
            &detector,
            vec![
                ("deps.py", "from fastapi import Depends\n\ndef get_db():\n db = SessionLocal()\n try:\n yield db\n finally:\n db.close()\n"),
            ],
        );
        assert!(
            findings.is_empty(),
            "Should not flag FastAPI try/yield/finally dependency. Found: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_no_finding_for_contextmanager() {
        let detector = GeneratorMisuseDetector::with_path("/mock/repo");
        let findings = run(
            &detector,
            vec![
                ("utils.py", "from contextlib import contextmanager\n\n@contextmanager\ndef managed_resource():\n resource = acquire()\n try:\n yield resource\n finally:\n release(resource)\n"),
            ],
        );
        assert!(
            findings.is_empty(),
            "Should not flag contextmanager try/yield/finally. Found: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_no_finding_for_yield_from() {
        let detector = GeneratorMisuseDetector::new();
        let findings = run(
            &detector,
            vec![(
                "iterators.py",
                "def __iter__(self):\n yield from self.items\n",
            )],
        );
        assert!(
            findings.is_empty(),
            "Should not flag yield from as single-yield. Found: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_no_finding_for_yield_in_string() {
        let detector = GeneratorMisuseDetector::new();
        let findings = run(
            &detector,
            vec![
                ("paginator.py", "def _check(self):\n warnings.warn(\"Pagination may yield inconsistent results\")\n"),
            ],
        );
        assert!(
            findings.is_empty(),
            "Should not flag 'yield' inside string literal. Found: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_no_finding_for_contextmanager_without_finally() {
        let detector = GeneratorMisuseDetector::new();
        let findings = run(
            &detector,
            vec![
                ("errors.py", "from contextlib import contextmanager\n\n@contextmanager\ndef wrap_errors():\n try:\n yield\n except DatabaseError:\n raise\n"),
            ],
        );
        assert!(
            findings.is_empty(),
            "Should not flag @contextmanager even without finally. Found: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_no_finding_for_polymorphic_single_yield() {
        let detector = GeneratorMisuseDetector::with_path("/mock/repo");
        let findings = run(
            &detector,
            vec![
                ("loaders/locmem.py", "class Loader(BaseLoader):\n def get_template_sources(self, template_name):\n yield Origin(name=template_name, loader=self)\n"),
                ("widgets.py", "class Widget:\n def subwidgets(self, name, value):\n yield self.get_context(name, value)\n"),
                ("files/uploadedfile.py", "class InMemoryUploadedFile(UploadedFile):\n def chunks(self, chunk_size=None):\n yield self.read()\n"),
            ],
        );
        assert!(
            findings.is_empty(),
            "Should not flag polymorphic interface methods. Found: {:?}",
            titles(&findings)
        );
    }
}