//! Django Security Detector
//!
//! Graph-enhanced detection of Django security issues.
//! Uses graph to:
//! - Identify which views are affected by csrf_exempt
//! - Check if raw SQL is in exposed endpoints
//! - Trace authentication/authorization coverage
use crate::detectors::base::{Detector, DetectorConfig};
use crate::graph::GraphQueryExt;
use crate::models::{deterministic_finding_id, Finding, Severity};
use anyhow::Result;
use regex::Regex;
use std::path::PathBuf;
use std::sync::LazyLock;
use tracing::info;
/// Matches `csrf_exempt` used as a decorator (`@csrf_exempt`) or called as a
/// function (`csrf_exempt(`).
static CSRF_EXEMPT: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"@csrf_exempt|csrf_exempt\(").expect("valid regex"));

/// Matches a literal `DEBUG = True` assignment.
///
/// The word boundaries keep longer identifiers such as `TEMPLATE_DEBUG` or
/// `SQL_DEBUG` from being reported as the global `DEBUG` switch.
static DEBUG_TRUE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\bDEBUG\s*=\s*True\b").expect("valid regex"));

/// Matches the raw-SQL entry points: `.raw(`, `.extra(`, `RawSQL(` and
/// `cursor.execute`.
static RAW_SQL: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"\.raw\(|\.extra\(|RawSQL\(|cursor\.execute").expect("valid regex")
});

/// Matches `SECRET_KEY` assigned to a quoted literal of at least 10 characters.
static SECRET_KEY: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r#"SECRET_KEY\s*=\s*['"][^'"]{10,}['"]"#).expect("valid regex"));

/// Matches an `ALLOWED_HOSTS` list or tuple (on a single line) that contains a
/// bare `'*'` entry at any position, e.g. `['*']` or `('example.com', "*")`.
///
/// Sub-domain wildcards such as `'*.example.com'` are intentionally not matched:
/// the `*` must be the entire quoted entry.
static ALLOWED_HOSTS: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r#"ALLOWED_HOSTS\s*=\s*[\[(][^\])]*['"]\*['"]"#).expect("valid regex")
});
/// Detector that scans Python sources for common Django security problems:
/// `csrf_exempt` usage, `DEBUG = True`, hardcoded `SECRET_KEY`, wildcard
/// `ALLOWED_HOSTS` and raw SQL.
pub struct DjangoSecurityDetector {
/// Repository root. Not read by the code in this file (hence the
/// `dead_code` allowance below).
#[allow(dead_code)] // Part of detector pattern, used for file scanning
repository_path: PathBuf,
/// Finding budget: `detect` stops visiting further files once at least this
/// many findings have been collected.
// NOTE(review): presumably initialised to 50 by `detector_new!(50)` — confirm.
max_findings: usize,
}
impl DjangoSecurityDetector {
    crate::detectors::detector_new!(50);

    /// Look up the function that encloses `line` of `file_path` in the graph.
    ///
    /// Returns `None` when the graph knows no function at that location.
    /// Otherwise returns `(name, caller_count, is_view)` where `is_view` is a
    /// name-based heuristic: the lower-cased name contains `view`, `api` or
    /// `handler`, or starts with an HTTP verb (`get`, `post`, `put`, `delete`,
    /// `patch`).
    fn find_containing_function(
        graph: &dyn crate::graph::GraphQuery,
        file_path: &str,
        line: u32,
    ) -> Option<(String, usize, bool)> {
        const VIEW_MARKERS: [&str; 3] = ["view", "api", "handler"];
        const VIEW_PREFIXES: [&str; 5] = ["get", "post", "put", "delete", "patch"];

        let i = graph.interner();
        graph.find_function_at(file_path, line).map(|f| {
            let callers = graph.get_callers(f.qn(i));
            let name_lower = f.node_name(i).to_lowercase();
            // Check if this is a view function
            let is_view = VIEW_MARKERS.iter().any(|m| name_lower.contains(m))
                || VIEW_PREFIXES.iter().any(|p| name_lower.starts_with(p));
            (f.node_name(i).to_string(), callers.len(), is_view)
        })
    }

    /// Report whether an authentication/authorization decorator is attached to
    /// the same definition as the line at index `func_line` (0-based).
    ///
    /// `func_line` may point either at a decorator line (e.g. `@csrf_exempt`)
    /// or at the `def` line itself. The scan covers:
    ///
    /// * up to 5 lines *above* `func_line`, stopping at the first `def` /
    ///   `async def` / `class` line so that decorators belonging to the
    ///   previous definition are not attributed to this one;
    /// * `func_line` itself and up to 5 lines *below* it, stopping at the
    ///   definition line, so decorators stacked under `@csrf_exempt` are seen.
    ///
    /// An out-of-range `func_line` is clamped to `lines.len()` instead of
    /// panicking.
    fn has_auth_decorator(lines: &[&str], func_line: usize) -> bool {
        const WINDOW: usize = 5;
        const AUTH_DECORATORS: [&str; 6] = [
            "@login_required",
            "@permission_required",
            "@user_passes_test",
            "@staff_member_required",
            "@authentication_classes",
            "@permission_classes",
        ];

        /// True for a line that starts a Python function or class definition.
        fn is_definition(line: &str) -> bool {
            let code = line.trim_start();
            code.starts_with("def ") || code.starts_with("async def ") || code.starts_with("class ")
        }

        /// True when the line mentions one of the known auth decorators
        /// (case-insensitive, matching the original behaviour).
        fn mentions_auth(line: &str) -> bool {
            let lowered = line.to_lowercase();
            AUTH_DECORATORS.iter().any(|d| lowered.contains(d))
        }

        let anchor = func_line.min(lines.len());

        // Decorators stacked above the anchor line.
        let above = lines[..anchor]
            .iter()
            .rev()
            .copied()
            .take(WINDOW)
            .take_while(|l| !is_definition(l))
            .any(mentions_auth);
        if above {
            return true;
        }

        // The anchor line itself plus decorators stacked below it, up to the
        // definition they decorate.
        lines[anchor..]
            .iter()
            .copied()
            .take(WINDOW + 1)
            .take_while(|l| !is_definition(l))
            .any(mentions_auth)
    }
}
impl Detector for DjangoSecurityDetector {
    /// Stable identifier of this detector.
    fn name(&self) -> &'static str {
        "django-security"
    }

    /// One-line human-readable summary.
    fn description(&self) -> &'static str {
        "Detects Django security issues"
    }

    /// This detector opts out of the shared post-processing stage.
    fn bypass_postprocessor(&self) -> bool {
        true
    }

    /// Only Python files are scanned.
    fn file_extensions(&self) -> &'static [&'static str] {
        &["py"]
    }

    /// Only run when the codebase was flagged as containing Django content.
    fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
        crate::detectors::detector_context::ContentFlags::HAS_DJANGO
    }

    /// Scan every `.py` file line by line and report:
    ///
    /// * `csrf_exempt` usage (CWE-352), severity depending on auth decorators
    ///   and whether the enclosing function looks like a view;
    /// * `DEBUG = True` (CWE-215), hardcoded `SECRET_KEY` (CWE-798) and
    ///   wildcard `ALLOWED_HOSTS` (CWE-16) in non-dev, non-local, non-test
    ///   settings files;
    /// * raw SQL usage (CWE-89) outside ORM/database-internal paths.
    ///
    /// Returns at most `max_findings` findings. Returns an empty list when no
    /// Python file shows any Django marker.
    fn detect(
        &self,
        ctx: &crate::detectors::analysis_context::AnalysisContext,
    ) -> Result<Vec<Finding>> {
        /// ORM/database-internal paths — these files ARE the database layer,
        /// so raw SQL in them is expected.
        fn is_database_layer_path(path: &str) -> bool {
            let lower_path = path.to_lowercase();
            lower_path.contains("db/backends/")
                || lower_path.contains("db/models/sql/")
                || lower_path.contains("db/models/expressions")
                || lower_path.contains("db/models/constraints")
                || lower_path.contains("db/models/fields/")
                || lower_path.contains("db/models/query")
                || lower_path.contains("db/migrations/")
                || lower_path.contains("core/cache/backends/")
                || lower_path.contains("/migrations/")
                || lower_path.starts_with("migrations/")
                || lower_path.contains("contrib/postgres/")
                || lower_path.contains("management/commands/")
                || lower_path.ends_with("management.py")
        }

        let graph = ctx.graph;
        let files = &ctx.as_file_provider();

        // Codebase-level pre-filter: skip if no file uses Django
        let has_django = files.files_with_extension("py").iter().any(|p| {
            files.content(p).is_some_and(|c| {
                c.contains("django")
                    || c.contains("ALLOWED_HOSTS")
                    || c.contains("SECRET_KEY")
                    || c.contains("csrf_exempt")
                    || c.contains(".objects.")
                    || c.contains("cursor.execute")
            })
        });
        if !has_django {
            return Ok(vec![]);
        }

        let mut findings = vec![];
        for path in files.files_with_extension("py") {
            if findings.len() >= self.max_findings {
                break;
            }
            let path_str = path.to_string_lossy().to_string();
            if let Some(content) = files.content(path) {
                let lines: Vec<&str> = content.lines().collect();
                let fname = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                // Per-file facts, computed once instead of on every matching line.
                // Settings checks apply only to production-like settings files
                // outside test paths.
                let is_checked_settings = fname.contains("settings")
                    && !fname.contains("dev")
                    && !fname.contains("local")
                    && !crate::detectors::base::is_test_path(&path_str);
                let is_csrf_definition_module = path_str.contains("decorators/csrf");
                // Resolved lazily: only files that actually contain raw SQL pay
                // for the lower-casing of the path.
                let mut database_layer: Option<bool> = None;

                for (i, line) in lines.iter().enumerate() {
                    // Enforce the finding budget inside large files too.
                    if findings.len() >= self.max_findings {
                        break;
                    }
                    let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
                    if crate::detectors::is_line_suppressed(line, prev_line) {
                        continue;
                    }
                    let line_num = (i + 1) as u32;
                    let trimmed = line.trim_start();
                    // Commented-out Python code is not live configuration/SQL.
                    let is_comment = trimmed.starts_with('#');

                    // Check CSRF exemption.
                    // Skipped for decorator definition modules
                    // (e.g. django/views/decorators/csrf.py) and for comments
                    // that merely mention csrf_exempt. These are conditions
                    // rather than `continue`s so the remaining checks still
                    // run for the line.
                    if CSRF_EXEMPT.is_match(line)
                        && !is_csrf_definition_module
                        && !is_comment
                        && !trimmed.starts_with("//")
                    {
                        let func_context =
                            Self::find_containing_function(graph, &path_str, line_num);
                        let has_auth = Self::has_auth_decorator(&lines, i);

                        // Severity based on context
                        let severity = if has_auth {
                            Severity::Medium // At least has auth
                        } else if func_context
                            .as_ref()
                            .map(|(_, _, is_view)| *is_view)
                            .unwrap_or(false)
                        {
                            Severity::Critical // View without CSRF or auth
                        } else {
                            Severity::High
                        };

                        let mut notes = Vec::new();
                        if let Some((func_name, callers, is_view)) = &func_context {
                            notes.push(format!(
                                "📦 Function: `{}` ({} callers)",
                                func_name, callers
                            ));
                            if *is_view {
                                notes.push("🌐 Appears to be a view function".to_string());
                            }
                        }
                        if has_auth {
                            notes.push("✅ Has authentication decorator".to_string());
                        } else {
                            notes.push("❌ No authentication decorator found".to_string());
                        }
                        let context_notes = if notes.is_empty() {
                            String::new()
                        } else {
                            format!("\n\n**Analysis:**\n{}", notes.join("\n"))
                        };

                        findings.push(Finding {
                            id: String::new(),
                            detector: "DjangoSecurityDetector".to_string(),
                            severity,
                            title: "CSRF protection disabled".to_string(),
                            description: format!(
                                "@csrf_exempt removes CSRF protection from this view.{}",
                                context_notes
                            ),
                            affected_files: vec![path.to_path_buf()],
                            line_start: Some(line_num),
                            line_end: Some(line_num),
                            suggested_fix: Some(
                                "Options:\n\
                                 1. Remove @csrf_exempt and use CSRF tokens properly\n\
                                 2. If this is an API endpoint, use DRF authentication:\n\
                                 ```python\n\
                                 @api_view(['POST'])\n\
                                 @authentication_classes([TokenAuthentication])\n\
                                 def my_view(request):\n\
                                 ...\n\
                                 ```"
                                .to_string(),
                            ),
                            estimated_effort: Some("20 minutes".to_string()),
                            category: Some("security".to_string()),
                            cwe_id: Some("CWE-352".to_string()),
                            why_it_matters: Some(
                                "CSRF exemption allows attackers to trick users into performing \
                                 unintended actions through malicious websites or links."
                                    .to_string(),
                            ),
                            ..Default::default()
                        });
                    }

                    // Check DEBUG setting
                    if is_checked_settings && !is_comment && DEBUG_TRUE.is_match(line) {
                        findings.push(Finding {
                            id: String::new(),
                            detector: "DjangoSecurityDetector".to_string(),
                            severity: Severity::Critical,
                            title: "DEBUG = True in settings".to_string(),
                            description: format!(
                                "Debug mode is enabled in `{}`.\n\n\
                                 **Impact:**\n\
                                 • Stack traces exposed to users\n\
                                 • Configuration details leaked\n\
                                 • Database queries visible\n\
                                 • Template variables exposed",
                                fname
                            ),
                            affected_files: vec![path.to_path_buf()],
                            line_start: Some(line_num),
                            line_end: Some(line_num),
                            suggested_fix: Some(
                                "Use environment variables:\n\
                                 ```python\n\
                                 DEBUG = os.environ.get('DEBUG', 'False').lower() == 'true'\n\
                                 ```\n\
                                 Or use django-environ:\n\
                                 ```python\n\
                                 import environ\n\
                                 env = environ.Env(DEBUG=(bool, False))\n\
                                 DEBUG = env('DEBUG')\n\
                                 ```"
                                .to_string(),
                            ),
                            estimated_effort: Some("5 minutes".to_string()),
                            category: Some("security".to_string()),
                            cwe_id: Some("CWE-215".to_string()),
                            why_it_matters: Some(
                                "Debug mode leaks sensitive information to attackers.".to_string(),
                            ),
                            ..Default::default()
                        });
                    }

                    // Check hardcoded SECRET_KEY.
                    // Deliberately NOT skipped for comment lines: a key left in
                    // a comment is still a secret committed to the repository.
                    if is_checked_settings
                        && SECRET_KEY.is_match(line)
                        && !line.contains("os.environ")
                        && !line.contains("env(")
                    {
                        findings.push(Finding {
                            id: String::new(),
                            detector: "DjangoSecurityDetector".to_string(),
                            severity: Severity::Critical,
                            title: "Hardcoded SECRET_KEY".to_string(),
                            description: "SECRET_KEY is hardcoded in settings file.".to_string(),
                            affected_files: vec![path.to_path_buf()],
                            line_start: Some(line_num),
                            line_end: Some(line_num),
                            suggested_fix: Some(
                                "Move to environment variable:\n\
                                 ```python\n\
                                 SECRET_KEY = os.environ['SECRET_KEY']\n\
                                 ```"
                                .to_string(),
                            ),
                            estimated_effort: Some("5 minutes".to_string()),
                            category: Some("security".to_string()),
                            cwe_id: Some("CWE-798".to_string()),
                            why_it_matters: Some(
                                "Leaked SECRET_KEY allows session hijacking and data tampering."
                                    .to_string(),
                            ),
                            ..Default::default()
                        });
                    }

                    // Check ALLOWED_HOSTS wildcard (same file filter as the
                    // DEBUG and SECRET_KEY checks, including the test-path
                    // exclusion).
                    if is_checked_settings && !is_comment && ALLOWED_HOSTS.is_match(line) {
                        findings.push(Finding {
                            id: String::new(),
                            detector: "DjangoSecurityDetector".to_string(),
                            severity: Severity::High,
                            title: "ALLOWED_HOSTS allows all hosts".to_string(),
                            description: "ALLOWED_HOSTS = ['*'] allows any host.".to_string(),
                            affected_files: vec![path.to_path_buf()],
                            line_start: Some(line_num),
                            line_end: Some(line_num),
                            suggested_fix: Some("Specify allowed hosts explicitly.".to_string()),
                            estimated_effort: Some("5 minutes".to_string()),
                            category: Some("security".to_string()),
                            cwe_id: Some("CWE-16".to_string()),
                            why_it_matters: Some("Allows HTTP Host header attacks.".to_string()),
                            ..Default::default()
                        });
                    }

                    // Check raw SQL (skipping ORM/database-internal paths)
                    if !is_comment
                        && RAW_SQL.is_match(line)
                        && !*database_layer
                            .get_or_insert_with(|| is_database_layer_path(&path_str))
                    {
                        let func_context =
                            Self::find_containing_function(graph, &path_str, line_num);

                        // Check for user input.
                        // Plain `+ ` concatenation is deliberately not treated
                        // as a marker: it is too broad and triggers on safe
                        // concatenation.
                        let has_user_input = line.contains("request.")
                            || line.contains("f\"")
                            || line.contains("f'")
                            || line.contains(".format(");

                        let severity = if has_user_input {
                            Severity::Critical
                        } else if func_context
                            .as_ref()
                            .map(|(_, _, is_view)| *is_view)
                            .unwrap_or(false)
                        {
                            Severity::High // In a view = exposed
                        } else {
                            Severity::Medium
                        };

                        let mut notes = Vec::new();
                        if has_user_input {
                            notes.push(
                                "⚠️ String interpolation detected - possible SQL injection"
                                    .to_string(),
                            );
                        }
                        if let Some((func_name, callers, is_view)) = &func_context {
                            notes.push(format!(
                                "📦 In function: `{}` ({} callers)",
                                func_name, callers
                            ));
                            if *is_view {
                                notes.push("🌐 In view function (exposed)".to_string());
                            }
                        }
                        let context_notes = if notes.is_empty() {
                            String::new()
                        } else {
                            format!("\n\n**Analysis:**\n{}", notes.join("\n"))
                        };

                        findings.push(Finding {
                            id: String::new(),
                            detector: "DjangoSecurityDetector".to_string(),
                            severity,
                            title: "Raw SQL usage".to_string(),
                            description: format!(
                                "Raw SQL bypasses Django's ORM protections.{}",
                                context_notes
                            ),
                            affected_files: vec![path.to_path_buf()],
                            line_start: Some(line_num),
                            line_end: Some(line_num),
                            suggested_fix: Some(
                                "Use Django ORM methods or parameterized queries:\n\
                                 ```python\n\
                                 # Instead of:\n\
                                 cursor.execute(f\"SELECT * FROM users WHERE id = {user_id}\")\n\
                                 \n\
                                 # Use:\n\
                                 User.objects.filter(id=user_id)\n\
                                 # Or:\n\
                                 cursor.execute(\"SELECT * FROM users WHERE id = %s\", [user_id])\n\
                                 ```"
                                .to_string(),
                            ),
                            estimated_effort: Some("30 minutes".to_string()),
                            category: Some("security".to_string()),
                            cwe_id: Some("CWE-89".to_string()),
                            why_it_matters: Some(
                                "Raw SQL with user input can lead to SQL injection.".to_string(),
                            ),
                            ..Default::default()
                        });
                    }
                }
            }
        }

        // A single line can yield several findings, so trim any overshoot.
        findings.truncate(self.max_findings);

        info!(
            "DjangoSecurityDetector found {} findings (graph-aware)",
            findings.len()
        );
        Ok(findings)
    }
}
impl crate::detectors::RegisteredDetector for DjangoSecurityDetector {
    /// Construct the detector for the repository described by `init` and hand
    /// it back as a shared trait object.
    fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
        let detector = Self::new(init.repo_path);
        std::sync::Arc::new(detector)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::builder::GraphBuilder;

    /// Run the detector over in-memory `(path, content)` files with an empty graph.
    fn scan(files: Vec<(&'static str, &'static str)>) -> Vec<Finding> {
        let store = GraphBuilder::new().freeze();
        let detector = DjangoSecurityDetector::new("/mock/repo");
        let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
            &store, files,
        );
        detector.detect(&ctx).expect("detection should succeed")
    }

    /// Findings whose title contains `needle`.
    fn titled<'a>(findings: &'a [Finding], needle: &str) -> Vec<&'a Finding> {
        findings
            .iter()
            .filter(|f| f.title.contains(needle))
            .collect()
    }

    /// Titles of the given findings, for assertion messages.
    fn titles<'a>(findings: impl IntoIterator<Item = &'a Finding>) -> Vec<&'a String> {
        findings.into_iter().map(|f| &f.title).collect()
    }

    #[test]
    fn test_detects_csrf_exempt() {
        let findings = scan(vec![
            ("views.py", "from django.views.decorators.csrf import csrf_exempt\n\n@csrf_exempt\ndef webhook(request):\n return JsonResponse({\"ok\": True})\n"),
        ]);
        assert!(!findings.is_empty(), "Should detect @csrf_exempt usage");
        assert!(
            findings.iter().any(|f| f.title.contains("CSRF")),
            "Finding should mention CSRF. Titles: {:?}",
            titles(&findings)
        );
        assert!(
            findings
                .iter()
                .any(|f| f.cwe_id.as_deref() == Some("CWE-352")),
            "Finding should have CWE-352"
        );
    }

    #[test]
    fn test_detects_debug_true() {
        // File name must contain "settings" but not "dev" or "local"
        let findings = scan(vec![(
            "settings.py",
            "import os\n\nDEBUG = True\nALLOWED_HOSTS = []\n",
        )]);
        assert!(
            findings.iter().any(|f| f.title.contains("DEBUG")),
            "Should detect DEBUG = True in settings.py. Titles: {:?}",
            titles(&findings)
        );
        assert!(
            findings
                .iter()
                .any(|f| f.title.contains("DEBUG") && f.severity == Severity::Critical),
            "DEBUG = True should be Critical severity"
        );
    }

    #[test]
    fn test_detects_raw_sql() {
        let findings = scan(vec![
            ("queries.py", "from django.db import connection\n\ndef get_users(name):\n cursor = connection.cursor()\n cursor.execute(\"SELECT * FROM users WHERE name = %s\", [name])\n return cursor.fetchall()\n\ndef get_posts():\n return Post.objects.raw(\"SELECT * FROM posts\")\n"),
        ]);
        let raw_sql_findings = titled(&findings, "Raw SQL");
        assert!(
            raw_sql_findings.len() >= 2,
            "Should detect both cursor.execute and .raw() usage, found {}",
            raw_sql_findings.len()
        );
        assert!(
            raw_sql_findings
                .iter()
                .any(|f| f.cwe_id.as_deref() == Some("CWE-89")),
            "Raw SQL findings should have CWE-89"
        );
    }

    #[test]
    fn test_detects_wildcard_allowed_hosts() {
        let findings = scan(vec![
            ("settings.py", "import os\n\nSECRET_KEY = os.environ['SECRET_KEY']\nDEBUG = False\nALLOWED_HOSTS = ['*']\n"),
        ]);
        assert!(
            findings.iter().any(|f| f.title.contains("ALLOWED_HOSTS")),
            "Should detect ALLOWED_HOSTS = ['*']. Titles: {:?}",
            titles(&findings)
        );
        assert!(
            findings
                .iter()
                .any(|f| f.title.contains("ALLOWED_HOSTS") && f.severity == Severity::High),
            "Wildcard ALLOWED_HOSTS should be High severity"
        );
    }

    #[test]
    fn test_clean_django_no_findings() {
        let findings = scan(vec![
            ("views.py", "from django.http import JsonResponse\nfrom django.views.decorators.http import require_POST\nfrom django.contrib.auth.decorators import login_required\n\n@login_required\n@require_POST\ndef create_item(request):\n name = request.POST.get('name')\n item = Item.objects.create(name=name)\n return JsonResponse({\"id\": item.id})\n\ndef list_items(request):\n items = Item.objects.filter(active=True).values('id', 'name')\n return JsonResponse({\"items\": list(items)})\n"),
        ]);
        assert!(
            findings.is_empty(),
            "Clean Django view code should produce no findings, but got: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_no_finding_for_debug_in_test_settings() {
        let findings = scan(vec![(
            "tests/settings.py",
            "DEBUG = True\nSECRET_KEY = 'test-secret-key-for-testing'\n",
        )]);
        assert!(
            findings.is_empty(),
            "Should not flag DEBUG=True or SECRET_KEY in test settings. Found: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_raw_sql_concat_not_critical_without_user_input() {
        let findings = scan(vec![
            ("queries.py", "def get_table_data(table_name):\n return Model.objects.raw(\"SELECT * FROM \" + table_name)\n"),
        ]);
        let raw_sql = titled(&findings, "Raw SQL");
        assert!(!raw_sql.is_empty(), "Should still detect raw SQL usage");
        assert!(
            !raw_sql.iter().any(|f| f.severity == Severity::Critical),
            "Raw SQL with '+ ' alone (no request./f-string) should not be Critical. Got: {:?}",
            raw_sql
                .iter()
                .map(|f| (&f.title, &f.severity))
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_no_raw_sql_finding_for_db_backend() {
        let findings = scan(vec![
            ("db/backends/postgresql/introspection.py", "def get_table_list(self, cursor):\n cursor.execute(\"SELECT c.relname FROM pg_catalog.pg_class c\")\n"),
        ]);
        let raw_sql_findings = titled(&findings, "Raw SQL");
        assert!(
            raw_sql_findings.is_empty(),
            "Should not flag raw SQL in db/backends/. Found: {:?}",
            titles(raw_sql_findings.iter().copied())
        );
    }

    #[test]
    fn test_still_detects_debug_in_production_settings() {
        let findings = scan(vec![(
            "settings.py",
            "import os\n\nDEBUG = True\nALLOWED_HOSTS = []\n",
        )]);
        assert!(
            findings.iter().any(|f| f.title.contains("DEBUG")),
            "Should still detect DEBUG = True in production settings"
        );
    }

    #[test]
    fn test_no_raw_sql_finding_for_orm_internals() {
        let findings = scan(vec![
            ("db/models/constraints.py", "def as_sql(self, compiler, connection):\n return cursor.execute(sql)\n"),
            ("db/models/query.py", "class QuerySet:\n def raw(self, raw_query):\n return RawSQL(raw_query)\n"),
            ("contrib/postgres/operations.py", "def database_forwards(self):\n cursor.execute(\"CREATE EXTENSION\")\n"),
        ]);
        let raw_sql_findings = titled(&findings, "Raw SQL");
        assert!(
            raw_sql_findings.is_empty(),
            "Should not flag raw SQL in ORM internals. Found: {:?}",
            raw_sql_findings
                .iter()
                .map(|f| (&f.title, &f.affected_files))
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_no_csrf_finding_for_decorator_definition_module() {
        let findings = scan(vec![
            ("views/decorators/csrf.py", "from functools import wraps\n\ndef csrf_exempt(view_func):\n @wraps(view_func)\n def wrapper(*args, **kwargs):\n return view_func(*args, **kwargs)\n wrapper.csrf_exempt = True\n return wrapper\n"),
        ]);
        let csrf_findings = titled(&findings, "CSRF");
        assert!(
            csrf_findings.is_empty(),
            "Should not flag CSRF in decorator definition module. Found: {:?}",
            titles(csrf_findings.iter().copied())
        );
    }

    #[test]
    fn test_no_csrf_finding_for_comment() {
        let findings = scan(vec![
            ("views/base.py", "class View:\n # Copy possible attributes set by decorators, e.g. @csrf_exempt\n pass\n"),
        ]);
        let csrf_findings = titled(&findings, "CSRF");
        assert!(
            csrf_findings.is_empty(),
            "Should not flag CSRF mentioned in comments. Found: {:?}",
            titles(csrf_findings.iter().copied())
        );
    }

    #[test]
    fn test_no_raw_sql_finding_for_management_command() {
        let findings = scan(vec![
            ("management/commands/loaddata.py", "class Command(BaseCommand):\n def handle(self):\n cursor.execute(line)\n"),
            ("contrib/sites/management.py", "def create_default_site(app_config):\n cursor.execute(command)\n"),
        ]);
        let raw_sql_findings = titled(&findings, "Raw SQL");
        assert!(
            raw_sql_findings.is_empty(),
            "Should not flag raw SQL in management commands. Found: {:?}",
            titles(raw_sql_findings.iter().copied())
        );
    }

    #[test]
    fn test_raw_sql_with_fstring_is_critical() {
        // cursor.execute(f"SELECT ... {user_input}") should be Critical severity
        // because the f-string indicates user input interpolation.
        let findings = scan(vec![(
            "views.py",
            "def search_users(request):\n name = request.GET['name']\n cursor.execute(f\"SELECT * FROM users WHERE name = '{name}'\")\n return JsonResponse({'results': cursor.fetchall()})\n",
        )]);
        let raw_sql = titled(&findings, "Raw SQL");
        assert!(!raw_sql.is_empty(), "Should detect raw SQL with f-string");
        assert!(
            raw_sql.iter().any(|f| f.severity == Severity::Critical),
            "Raw SQL with f-string interpolation should be Critical. Got: {:?}",
            raw_sql
                .iter()
                .map(|f| (&f.title, &f.severity))
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_parameterized_query_still_detected_but_not_critical() {
        // cursor.execute("SELECT ... WHERE id=%s", [id]) is parameterized —
        // the detector still flags cursor.execute but at Medium severity
        // (no f-string/request. interpolation markers).
        let findings = scan(vec![(
            "queries.py",
            "def get_user(user_id):\n cursor = connection.cursor()\n cursor.execute(\"SELECT * FROM users WHERE id = %s\", [user_id])\n return cursor.fetchone()\n",
        )]);
        let raw_sql = titled(&findings, "Raw SQL");
        assert!(
            !raw_sql.is_empty(),
            "Parameterized cursor.execute is still flagged as raw SQL"
        );
        assert!(
            !raw_sql.iter().any(|f| f.severity == Severity::Critical),
            "Parameterized query without user input markers should NOT be Critical. Got: {:?}",
            raw_sql
                .iter()
                .map(|f| (&f.title, &f.severity))
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_safe_orm_rendering_no_findings() {
        // Standard Django ORM queries and template rendering should be clean.
        let findings = scan(vec![(
            "views.py",
            "from django.shortcuts import render\nfrom .models import Article\n\ndef article_list(request):\n articles = Article.objects.filter(published=True).order_by('-date')\n return render(request, 'articles/list.html', {'articles': articles})\n\ndef article_detail(request, pk):\n article = Article.objects.get(pk=pk)\n return render(request, 'articles/detail.html', {'article': article})\n",
        )]);
        assert!(
            findings.is_empty(),
            "Safe ORM queries and template rendering should produce no findings, but got: {:?}",
            titles(&findings)
        );
    }

    #[test]
    fn test_hardcoded_secret_key_in_settings() {
        let findings = scan(vec![(
            "settings.py",
            "SECRET_KEY = 'django-insecure-abc123xyz789def456ghi'\nDEBUG = False\nALLOWED_HOSTS = ['example.com']\n",
        )]);
        assert!(
            findings.iter().any(|f| f.title.contains("SECRET_KEY")),
            "Should detect hardcoded SECRET_KEY in production settings. Titles: {:?}",
            titles(&findings)
        );
        assert!(
            findings
                .iter()
                .any(|f| f.title.contains("SECRET_KEY") && f.severity == Severity::Critical),
            "Hardcoded SECRET_KEY should be Critical severity"
        );
    }
}