use super::*;
#[test]
fn test_fstring_sql_detection() {
let detector = SQLInjectionDetector::new();
assert_eq!(
detector.check_line_for_patterns(
r#"cursor.execute(f"SELECT * FROM users WHERE id={user_id}")"#
),
Some(("f-string", false))
);
assert!(detector
.check_line_for_patterns(r#"cursor.execute("SELECT * FROM users")"#)
.is_none());
}
#[test]
fn test_concat_sql_detection() {
let detector = SQLInjectionDetector::new();
assert_eq!(
detector.check_line_for_patterns(
r#"cursor.execute("SELECT * FROM users WHERE id=" + user_id)"#
),
Some(("concatenation", false))
);
}
#[test]
fn test_format_sql_detection() {
let detector = SQLInjectionDetector::new();
assert_eq!(
detector.check_line_for_patterns(
r#"cursor.execute("SELECT * FROM users WHERE id={}".format(user_id))"#
),
Some(("format", false))
);
}
#[test]
fn test_percent_sql_detection() {
let detector = SQLInjectionDetector::new();
assert_eq!(
detector.check_line_for_patterns(
r#"cursor.execute("SELECT * FROM users WHERE id=%s" % user_id)"#
),
Some(("percent_format", false))
);
}
#[test]
fn test_sql_context_detection() {
let detector = SQLInjectionDetector::new();
assert!(detector.is_sql_context("cursor.execute(query)"));
assert!(detector.is_sql_context("conn.execute(sql)"));
assert!(detector.is_sql_context("db.query(statement)"));
assert!(detector.is_sql_context("User.objects.raw(sql)"));
assert!(!detector.is_sql_context("print(message)"));
}
#[test]
fn test_js_template_sql_detection() {
let detector = SQLInjectionDetector::new();
assert_eq!(
detector.check_line_for_patterns(r#"db.query(`SELECT * FROM users WHERE id = ${userId}`)"#),
Some(("js_template", false))
);
assert_eq!(
detector.check_line_for_patterns(
r#"pool.execute(`INSERT INTO logs (msg) VALUES ('${message}')`)"#
),
Some(("js_template", false))
);
assert!(detector
.check_line_for_patterns(r#"db.query(`SELECT * FROM users`)"#)
.is_none());
}
#[test]
fn test_go_sprintf_sql_detection() {
let detector = SQLInjectionDetector::new();
assert_eq!(
detector.check_line_for_patterns(
r#"query := fmt.Sprintf("SELECT * FROM users WHERE id = %s", id)"#
),
Some(("go_sprintf", false))
);
assert_eq!(
detector.check_line_for_patterns(
r#"sql := fmt.Sprintf("DELETE FROM users WHERE id = %v", userId)"#
),
Some(("go_sprintf", false))
);
assert!(detector
.check_line_for_patterns(r#"msg := fmt.Sprintf("Hello %s", name)"#)
.is_none());
}
#[test]
fn test_js_sql_context_detection() {
let detector = SQLInjectionDetector::new();
assert!(detector.is_sql_context("pool.query(sql)"));
assert!(detector.is_sql_context("client.execute(query)"));
assert!(detector.is_sql_context("mysql.query(statement)"));
assert!(detector.is_sql_context("const result = await pg.query(sql)"));
}
#[test]
fn test_go_sql_context_detection() {
let detector = SQLInjectionDetector::new();
assert!(detector.is_sql_context("db.QueryRow(query)"));
assert!(detector.is_sql_context("db.Exec(sql)"));
assert!(detector.is_sql_context("db.Query(statement)"));
assert!(detector.is_sql_context(r#"query := fmt.Sprintf("SELECT * FROM users")"#));
}
#[test]
fn test_parameterized_placeholders_detection() {
let detector = SQLInjectionDetector::new();
assert!(detector.has_parameterized_placeholders("SELECT * FROM users WHERE id = @userId"));
assert!(detector.has_parameterized_placeholders("SELECT * FROM users WHERE id = $1"));
assert!(detector.has_parameterized_placeholders("SELECT * FROM users WHERE id = :id"));
assert!(detector.has_parameterized_placeholders("SELECT * FROM users WHERE id = ?"));
assert!(!detector.has_parameterized_placeholders("What? No placeholders here"));
}
#[test]
fn test_parameterized_query_co_occurrence_reduces_severity() {
let detector = SQLInjectionDetector::new();
let line = r#"db.query(`SELECT COUNT(*) as count FROM vehicles ${where} AND make = @make`)"#;
if let Some((pattern_type, is_likely_fp)) = detector.check_line_for_patterns(line) {
assert_eq!(pattern_type, "js_template");
assert!(
is_likely_fp,
"Should be marked as likely false positive due to @make placeholder"
);
} else {
panic!("Should detect js_template pattern");
}
}
#[test]
fn test_placeholder_generation_pattern_skipped() {
let detector = SQLInjectionDetector::new();
assert!(detector.check_line_for_patterns(
r#"const placeholders = ids.map(() => '?').join(','); db.query(`SELECT * FROM vehicles WHERE id IN (${placeholders})`)"#
).is_none(), "Should skip placeholder generation pattern");
assert!(
detector
.check_line_for_patterns(
r#"db.query(`SELECT * FROM items WHERE id IN (${ids.map(() => '?').join(',')})`)"#
)
.is_none(),
"Should skip inline placeholder generation"
);
assert!(detector.check_line_for_patterns(
r#"const qs = Array(10).fill('?').join(','); stmt = `SELECT * FROM t WHERE id IN (${qs})`"#
).is_none(), "Should skip Array.fill placeholder generation");
}
#[test]
fn test_sql_structure_variable_detection() {
let detector = SQLInjectionDetector::new();
assert!(detector.is_sql_structure_variable(r#"`SELECT * FROM users ${where}`"#));
assert!(detector.is_sql_structure_variable(r#"`SELECT * FROM users ORDER BY ${orderBy}`"#));
assert!(detector.is_sql_structure_variable(r#"`SELECT ${columns} FROM users`"#));
assert!(detector.is_sql_structure_variable(r#"`SELECT * FROM ${tableName}`"#));
assert!(detector.is_sql_structure_variable(r#"`SELECT * FROM users ${conditions}`"#));
assert!(
!detector.is_sql_structure_variable(r#"`SELECT * FROM users WHERE name = ${userName}`"#)
);
assert!(!detector.is_sql_structure_variable(r#"`SELECT * FROM users WHERE id = ${userId}`"#));
}
#[test]
fn test_sql_structure_variable_reduces_severity() {
let detector = SQLInjectionDetector::new();
let line = r#"db.query(`SELECT COUNT(*) as count FROM vehicles ${where}`)"#;
if let Some((pattern_type, is_likely_fp)) = detector.check_line_for_patterns(line) {
assert_eq!(pattern_type, "js_template");
assert!(
is_likely_fp,
"Should be marked as likely false positive due to where structure var"
);
} else {
panic!("Should detect js_template pattern");
}
let line2 = r#"db.query(`SELECT * FROM users WHERE name = '${userName}'`)"#;
if let Some((pattern_type, is_likely_fp)) = detector.check_line_for_patterns(line2) {
assert_eq!(pattern_type, "js_template");
assert!(
!is_likely_fp,
"Should NOT be marked as likely false positive"
);
} else {
panic!("Should detect js_template pattern");
}
}
#[test]
fn test_real_world_false_positive_case_1() {
let detector = SQLInjectionDetector::new();
let line = r#"db.query(`SELECT COUNT(*) as count FROM vehicles ${where}`, params)"#;
if let Some((pattern_type, is_likely_fp)) = detector.check_line_for_patterns(line) {
assert_eq!(pattern_type, "js_template");
assert!(
is_likely_fp,
"WHERE clause interpolation should be marked as likely FP"
);
} else {
panic!("Should detect pattern");
}
}
#[test]
fn test_real_world_false_positive_case_2() {
let detector = SQLInjectionDetector::new();
let line = r#"const placeholders = ids.map(() => '?').join(',');
db.query(`SELECT * FROM vehicles WHERE id IN (${placeholders})`, ...ids)"#;
assert!(
detector.check_line_for_patterns(line).is_none(),
"Placeholder generation for IN clause should be skipped"
);
}
#[test]
fn test_legitimate_sql_injection_still_detected() {
let detector = SQLInjectionDetector::new();
let line = r#"db.query(`SELECT * FROM users WHERE name = '${userInput}'`)"#;
if let Some((pattern_type, is_likely_fp)) = detector.check_line_for_patterns(line) {
assert_eq!(pattern_type, "js_template");
assert!(
!is_likely_fp,
"Real SQL injection should NOT be marked as likely FP"
);
} else {
panic!("Should detect SQL injection");
}
}
#[test]
fn test_better_sqlite3_patterns() {
let detector = SQLInjectionDetector::new();
let line1 = r#"const stmt = db.prepare('SELECT * FROM users WHERE id = ?'); stmt.get(userId);"#;
let line2 = r#"db.prepare('SELECT * FROM users WHERE id = @id').all({ id: userId });"#;
assert!(detector.check_line_for_patterns(line1).is_none());
assert!(detector.check_line_for_patterns(line2).is_none());
}
#[test]
fn test_no_finding_for_quote_name_sanitized() {
let detector = SQLInjectionDetector::new();
assert!(detector.is_sanitized_value(
r#"cursor.execute("SELECT * FROM %s" % connection.ops.quote_name(table_name))"#
));
}
#[test]
fn test_excludes_db_backend_paths() {
let detector = SQLInjectionDetector::new();
assert!(detector.should_exclude(std::path::Path::new(
"django/db/backends/postgresql/introspection.py"
)));
assert!(detector.should_exclude(std::path::Path::new("django/db/models/sql/compiler.py")));
assert!(detector.should_exclude(std::path::Path::new("django/core/cache/backends/db.py")));
assert!(!detector.should_exclude(std::path::Path::new("myapp/views.py")));
}
#[cfg(test)]
mod dual_branch_integration_tests {
use super::*;
use crate::config::DualBranchConfig;
use crate::graph::builder::GraphBuilder;
use std::collections::HashMap;
fn run_dual_branch(file: &str, content: &str) -> Vec<Finding> {
let store = GraphBuilder::new().freeze();
let detector = SQLInjectionDetector::with_repository_path(PathBuf::from("/mock/repo"));
let mut detectors = HashMap::new();
detectors.insert("sql-injection".to_string(), true);
let cfg = DualBranchConfig {
enabled: true,
detectors,
};
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(file, content)],
)
.with_dual_branch(cfg);
detector.detect(&ctx).expect("detection should succeed")
}
#[test]
fn flag_off_sql_injection_emits_single_branch_unchanged() {
let store = GraphBuilder::new().freeze();
let detector = SQLInjectionDetector::with_repository_path(PathBuf::from("/mock/repo"));
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"vuln.py",
"from flask import request\n\
def get_user():\n\
\x20 return cursor.execute(f\"SELECT * FROM u WHERE id = {request.form['id']}\")\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
for f in &findings {
assert!(
f.alternative_branch.is_none(),
"no alternative_branch when flag off: {:?}",
f.title
);
assert!(
f.prediction_reasons.iter().all(|r| r.weight == 0.0),
"no weight-bearing predictor reasons when flag off; got: {:?}",
f.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect::<Vec<_>>()
);
}
}
#[test]
fn flag_on_case_a_parameterized_execute_collapses_benign() {
let findings = run_dual_branch(
"case_a.py",
"from flask import request\n\
@app.route('/u', methods=['POST'])\n\
def get_user():\n\
\x20 return cursor.execute(\"SELECT * FROM u WHERE id = %s\", (request.form['id'],))\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for Case A");
assert_eq!(
f.severity,
Severity::Info,
"Case A: parameterized execute should collapse to Info; got {:?}",
f.severity
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::RealBug);
}
#[test]
fn flag_on_case_b_fstring_execute_collapses_realbug_critical() {
let findings = run_dual_branch(
"case_b.py",
"from flask import request\n\
def get_user():\n\
\x20 return cursor.execute(f\"SELECT * FROM u WHERE id = {request.form['id']}\")\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for Case B");
assert_eq!(
f.severity,
Severity::Critical,
"Case B: f-string execute should collapse to Critical; got {:?}",
f.severity
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::Benign);
}
#[test]
fn flag_on_case_c_type_cast_laundered_format_unsafe_v0_limitation() {
let findings = run_dual_branch(
"case_c.py",
"from flask import request\n\
def get_user():\n\
\x20 return cursor.execute(\"SELECT * WHERE id = {}\".format(int(request.form['id'])))\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for Case C");
assert_eq!(
f.severity,
Severity::Critical,
"Case C (D5.3 limitation): .format() should still collapse to Critical \
even though int() was applied — annotation is the escape hatch; \
got {:?}",
f.severity
);
}
#[test]
fn flag_on_case_d_django_orm_filter_collapses_benign() {
let findings = run_dual_branch(
"case_d.py",
"from flask import request\n\
def list_users():\n\
\x20 return User.objects.filter(id=request.GET['id'])\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for Case D");
assert_eq!(
f.severity,
Severity::Info,
"Case D: Django ORM .filter() should collapse to Info; got {:?}",
f.severity
);
}
#[test]
fn flag_on_case_e_django_raw_concat_collapses_realbug_critical() {
let findings = run_dual_branch(
"case_e.py",
"from flask import request\n\
def list_users():\n\
\x20 return User.objects.raw(\"SELECT * FROM u WHERE id = \" + request.GET['id'])\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for Case E");
assert_eq!(
f.severity,
Severity::Critical,
"Case E (D1.c HEADLINE): .raw() with concat should collapse to Critical; got {:?}",
f.severity
);
let weight_reasons: Vec<_> = f
.prediction_reasons
.iter()
.filter(|r| r.weight != 0.0)
.collect();
assert!(
!weight_reasons.is_empty(),
"Case E must have at least one weight-bearing predictor reason"
);
}
#[test]
fn flag_on_case_f_static_literal_insert_collapses_benign() {
let findings = run_dual_branch(
"case_f.py",
"def init_db():\n\
\x20 return cursor.execute(\"INSERT INTO log VALUES ('static')\")\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for Case F");
assert_eq!(
f.severity,
Severity::Info,
"Case F: static literal should collapse to Info; got {:?}",
f.severity
);
}
#[test]
fn flag_on_case_g_opaque_variable_tiebreak() {
let findings = run_dual_branch(
"case_g.py",
"from flask import request\n\
def get_user():\n\
\x20 q = build_query(request.form)\n\
\x20 return cursor.execute(q)\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for Case G");
let weight_reasons: Vec<_> = f
.prediction_reasons
.iter()
.filter(|r| r.weight != 0.0)
.collect();
assert!(
!weight_reasons.is_empty(),
"Case G must produce weighted-scoring reasons (Ambiguous bucket)"
);
}
#[test]
fn flag_on_case_h_sqlalchemy_text_with_binds_collapses_benign() {
let findings = run_dual_branch(
"case_h.py",
"from flask import request\n\
def f():\n\
\x20 return db.execute(text(\"SELECT :id\"), {\"id\": request.form['id']})\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for Case H");
assert_eq!(
f.severity,
Severity::Info,
"Case H: SQLAlchemy text+binds should collapse to Info; got {:?}",
f.severity
);
}
#[test]
fn flag_on_sql_safe_annotation_collapses_benign() {
let findings = run_dual_branch(
"ann_safe.py",
"def f(q):\n\
\x20 return cursor.execute(q) # repotoire: sql-safe[whitelisted-table]\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for sql-safe annotation");
assert_eq!(
f.severity,
Severity::Info,
"sql-safe annotation should collapse to Info; got {:?}",
f.severity
);
}
#[test]
fn flag_on_sql_vulnerable_annotation_collapses_realbug() {
let findings = run_dual_branch(
"ann_vuln.py",
"def f():\n\
\x20 return cursor.execute(\"SELECT 1\") # repotoire: sql-vulnerable[helper-built]\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have dual-branch finding for sql-vulnerable annotation");
assert_eq!(
f.severity,
Severity::Critical,
"sql-vulnerable annotation should collapse to Critical; got {:?}",
f.severity
);
}
#[test]
fn flag_on_non_python_unchanged_per_d5_scope() {
let findings = run_dual_branch(
"vuln.js",
"const userId = req.params.id;\n\
db.query(`SELECT * FROM users WHERE id = ${userId}`);\n",
);
for f in &findings {
assert!(
!f.is_dual_branch(),
"JS file must not emit dual-branch findings in v0 (D5.1); got: {:?}",
f.title
);
}
}
}
#[cfg(test)]
mod dual_branch_real_world_tests {
use super::*;
use crate::config::DualBranchConfig;
use crate::graph::builder::GraphBuilder;
use std::collections::HashMap;
fn run(file: &str, content: &str) -> Vec<Finding> {
let store = GraphBuilder::new().freeze();
let detector = SQLInjectionDetector::with_repository_path(PathBuf::from("/mock/repo"));
let mut detectors = HashMap::new();
detectors.insert("sql-injection".to_string(), true);
let cfg = DualBranchConfig {
enabled: true,
detectors,
};
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(file, content)],
)
.with_dual_branch(cfg);
detector.detect(&ctx).expect("detection should succeed")
}
#[test]
fn real_world_django_view_with_orm_filter() {
let findings = run(
"users/views.py",
"from rest_framework.views import APIView\n\
from rest_framework.response import Response\n\
from .models import User\n\
\n\
class UserListView(APIView):\n\
\x20 def get(self, request):\n\
\x20 active = request.query_params.get('active', 'true')\n\
\x20 role = request.query_params.get('role')\n\
\x20 qs = User.objects.filter(active=(active == 'true'))\n\
\x20 if role:\n\
\x20 qs = qs.filter(role=role)\n\
\x20 return Response(list(qs.values()))\n",
);
let dual_findings: Vec<_> = findings.iter().filter(|f| f.is_dual_branch()).collect();
assert!(
!dual_findings.is_empty(),
"Django ORM view should produce at least one dual-branch finding; got {} findings",
findings.len()
);
for f in dual_findings {
assert_eq!(
f.severity,
Severity::Info,
"Django ORM .filter() should collapse to Info; got {:?} for {:?}",
f.severity,
f.title
);
}
}
#[test]
fn real_world_flask_route_with_fstring_injection() {
let findings = run(
"app/routes.py",
"from flask import Flask, request, jsonify\n\
import sqlite3\n\
\n\
app = Flask(__name__)\n\
\n\
@app.route('/search')\n\
def search():\n\
\x20 q = request.args.get('q', '')\n\
\x20 conn = sqlite3.connect('app.db')\n\
\x20 cursor = conn.cursor()\n\
\x20 cursor.execute(f\"SELECT * FROM products WHERE name LIKE '%{q}%'\")\n\
\x20 return jsonify(cursor.fetchall())\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("Flask route with f-string SQL must produce dual-branch finding");
assert_eq!(
f.severity,
Severity::Critical,
"Flask f-string SQL injection should be Critical; got {:?}",
f.severity
);
}
#[test]
fn real_world_django_raw_concat_full_view() {
let findings = run(
"reports/views.py",
"from django.shortcuts import render\n\
from django.views.decorators.http import require_GET\n\
from .models import Order\n\
\n\
@require_GET\n\
def search_orders(request):\n\
\x20 customer = request.GET.get('customer', '')\n\
\x20 # Developer thought .raw() was \"safer than building strings\"\n\
\x20 results = Order.objects.raw(\n\
\x20 \"SELECT * FROM reports_order WHERE customer LIKE '%\" + customer + \"%'\"\n\
\x20 )\n\
\x20 return render(request, 'orders/list.html', {'orders': results})\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("Django .raw() with concat must produce dual-branch finding");
assert_eq!(
f.severity,
Severity::Critical,
"Django .raw() with concat is the Phase 2j headline — must be Critical; got {:?}",
f.severity
);
assert!(
f.title.contains("raw") || f.description.contains(".raw"),
"title/description should reference .raw(); got title={:?}",
f.title
);
}
#[test]
fn real_world_sqlalchemy_text_with_bound_params_full_module() {
let findings = run(
"services/order_lookup.py",
"from sqlalchemy import text\n\
from sqlalchemy.orm import Session\n\
\n\
def find_order_by_external_ref(session: Session, customer_id: int, external_ref: str):\n\
\x20 return session.execute(\n\
\x20 text(\n\
\x20 \"SELECT id, status FROM orders \"\n\
\x20 \"WHERE customer_id = :cust AND external_ref = :ref\"\n\
\x20 ),\n\
\x20 {\"cust\": customer_id, \"ref\": external_ref},\n\
\x20 ).first()\n",
);
let dual: Vec<_> = findings.iter().filter(|f| f.is_dual_branch()).collect();
assert!(
!dual.is_empty(),
"SQLAlchemy text+binds must produce a dual-branch finding"
);
for f in dual {
assert_eq!(
f.severity,
Severity::Info,
"SQLAlchemy text+binds should collapse to Info; got {:?} for {:?}",
f.severity,
f.title
);
}
}
}
#[cfg(test)]
mod blocking_tier_tests {
use super::*;
use crate::detectors::security::taint::{TaintCategory, TaintPath};
use crate::graph::builder::GraphBuilder;
use crate::models::{Evidence, Tier};
fn make_sql_taint_path(
source_file: &str,
source_line: u32,
sink_file: &str,
sink_line: u32,
is_sanitized: bool,
) -> TaintPath {
TaintPath {
source_function: "handle_request".to_string(),
source_file: source_file.to_string(),
source_line,
sink_function: "cursor.execute".to_string(),
sink_file: sink_file.to_string(),
sink_line,
category: TaintCategory::SqlInjection,
call_chain: vec![],
is_sanitized,
sanitizer: None,
confidence: 0.95,
sink_callee_text: "cursor.execute".to_string(),
sanitizers_on_path: vec![],
}
}
#[test]
fn taint_to_dangerous_sink_is_blocking() {
let store = GraphBuilder::new().freeze();
let detector = SQLInjectionDetector::with_repository_path(PathBuf::from("/mock/repo"));
let content = "def h(request):\n\
\x20 db.cursor().execute(\"SELECT * FROM users WHERE id = \" + request.args['id'])\n";
let taint = make_sql_taint_path("vuln.py", 1, "vuln.py", 2, false);
detector.set_precomputed_taint(vec![], vec![taint]);
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("vuln.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
let blocking: Vec<_> = findings
.iter()
.filter(|f| f.tier == Tier::Blocking)
.collect();
assert!(
!blocking.is_empty(),
"Expected at least one Blocking finding; all findings: {:?}",
findings
.iter()
.map(|f| (&f.tier, &f.title))
.collect::<Vec<_>>()
);
let f = blocking[0];
assert_eq!(f.tier, Tier::Blocking, "finding tier must be Blocking");
assert!(
matches!(
f.evidence,
Some(Evidence::TaintPath { ref sink_kind, .. }) if sink_kind == "sql_exec"
),
"evidence must be TaintPath with sink_kind=\"sql_exec\"; got {:?}",
f.evidence
);
assert!(f.deterministic, "blocking finding must be deterministic");
assert!(
f.confidence.unwrap_or(0.0) >= 0.90,
"blocking finding must have confidence >= 0.90; got {:?}",
f.confidence
);
assert!(
matches!(f.severity, Severity::Critical | Severity::High),
"blocking finding must be Critical or High; got {:?}",
f.severity
);
}
#[test]
fn parameterized_query_is_advisory() {
let store = GraphBuilder::new().freeze();
let detector = SQLInjectionDetector::with_repository_path(PathBuf::from("/mock/repo"));
let content = "def h(request):\n\
\x20 cursor.execute(\"SELECT * FROM users WHERE id = %s\", (request.args['id'],))\n";
let taint = make_sql_taint_path("safe.py", 1, "safe.py", 2, true);
detector.set_precomputed_taint(vec![], vec![taint]);
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("safe.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
for f in &findings {
assert_ne!(
f.tier,
Tier::Blocking,
"sanitized path must not be Blocking; finding: {:?}",
f.title
);
if matches!(f.tier, Tier::Blocking) {
assert!(
f.evidence.is_none(),
"non-blocking finding must have no TaintPath evidence; got {:?}",
f.evidence
);
}
}
}
#[test]
fn line_heuristic_match_is_advisory() {
let store = GraphBuilder::new().freeze();
let detector = SQLInjectionDetector::with_repository_path(PathBuf::from("/mock/repo"));
let content = "cursor.execute(\"SELECT * FROM users WHERE id=\" + user_id)\n";
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("heuristic.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Line heuristic should produce at least one finding"
);
for f in &findings {
assert_ne!(
f.tier,
Tier::Blocking,
"line heuristic finding must not be Blocking; got tier={:?} for {:?}",
f.tier,
f.title
);
assert!(
f.evidence.is_none(),
"line heuristic finding must have no Evidence; got {:?}",
f.evidence
);
}
}
}