use super::annotation::parse_python_comment;
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
/// Classification of a SQL call site, carried in [`Evidence::api`].
///
/// `predict` short-circuits ("collapses") on `Safe`, `Unsafe` and `UnsafeRaw`;
/// `Ambiguous` and `Unknown` fall through to the weighted-signal scoring.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum SqlApi {
/// Labelled `parameterized-sql-call`; collapses to Benign.
Safe,
/// Labelled `string-formatted-sql-call`; collapses to RealBug.
Unsafe,
/// Labelled `orm-raw-escape-hatch-call`; collapses to RealBug.
UnsafeRaw,
/// Labelled `ambiguous-sql-call`; never collapses.
Ambiguous,
/// Labelled `sql-call`; also what `predict` substitutes when no API was recorded.
Unknown,
}
impl SqlApi {
    /// Fixed, human-readable label for this classification.
    ///
    /// The label is interpolated into finding titles, descriptions and
    /// annotation examples.
    pub(super) fn callee_label(self) -> &'static str {
        match self {
            Self::Safe => "parameterized-sql-call",
            Self::Unsafe => "string-formatted-sql-call",
            Self::UnsafeRaw => "orm-raw-escape-hatch-call",
            Self::Ambiguous => "ambiguous-sql-call",
            Self::Unknown => "sql-call",
        }
    }

    /// `true` for every variant except [`SqlApi::Unknown`]. Test-only helper.
    #[cfg(test)]
    pub(super) fn is_recognized(self) -> bool {
        self != Self::Unknown
    }

    /// `true` only for [`SqlApi::Safe`].
    pub(super) fn collapses_safe(self) -> bool {
        self == Self::Safe
    }

    /// `true` only for [`SqlApi::Unsafe`].
    pub(super) fn collapses_unsafe(self) -> bool {
        self == Self::Unsafe
    }

    /// `true` only for [`SqlApi::UnsafeRaw`].
    pub(super) fn collapses_unsafe_raw(self) -> bool {
        self == Self::UnsafeRaw
    }
}
/// Kind of user-controlled input detected near a call site.
///
/// Produced by `classify_user_input_source`; the default is `None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(super) enum UserInputSource {
/// A `TYPED_STRING_USER_INPUT_SUBSTRINGS` entry matched (e.g. `request.form`).
TypedString,
/// An `UNSTRUCTURED_JSON_USER_INPUT_SUBSTRINGS` entry matched (e.g. `request.json`).
UnstructuredJson,
/// No known user-input accessor matched.
#[default]
None,
}
// Signal weights used by `predict`. Positive weights push towards Benign,
// negative weights towards RealBug; the weighted path predicts Benign only
// when the sum is strictly greater than 0.0.

/// Reason weight recorded when `SqlApi::Safe` collapses to Benign.
pub(super) const W_API_SAFE_COLLAPSE: f32 = 1.0;
/// Reason weight recorded when `SqlApi::Unsafe` collapses to RealBug.
pub(super) const W_API_UNSAFE_COLLAPSE: f32 = -1.0;
/// Reason weight recorded when `SqlApi::UnsafeRaw` collapses to RealBug.
pub(super) const W_API_UNSAFE_RAW_COLLAPSE: f32 = -1.0;
/// Applied when the user-input source is `UserInputSource::TypedString`.
pub(super) const W_USER_INPUT_TYPED_STRING_NEARBY: f32 = -0.30;
/// Applied when the user-input source is `UserInputSource::UnstructuredJson`.
/// Deliberately equal to the typed-string weight (asserted in the tests).
pub(super) const W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY: f32 = -0.30;
/// Applied when `Evidence::static_sql_string_literal` is set.
pub(super) const W_STATIC_SQL_STRING_LITERAL: f32 = 0.40;
/// Applied when `Evidence::sql_keyword_no_formatting` is set.
pub(super) const W_SQL_KEYWORD_NO_FORMATTING: f32 = 0.10;
/// Applied when `Evidence::enclosing_route_handler` is set.
pub(super) const W_ENCLOSING_ROUTE_HANDLER: f32 = -0.20;
/// Applied when the enclosing function name looks like a test or fixture.
pub(super) const W_ENCLOSING_TEST_FUNCTION: f32 = 0.15;
/// Applied when `Evidence::trust_boundary_name` is set.
pub(super) const W_TRUST_BOUNDARY_NAME: f32 = 0.10;
/// Substrings that `classify_user_input_source` maps to
/// `UserInputSource::TypedString`. Both the line and each entry are
/// lower-cased before the `contains` check, so matching is case-insensitive.
pub(super) const TYPED_STRING_USER_INPUT_SUBSTRINGS: &[&str] = &[
"request.form",
"request.args",
"request.values",
"request.cookies",
"request.headers",
"request.path_params",
"request.GET",
"request.POST",
];
/// Substrings that `classify_user_input_source` maps to
/// `UserInputSource::UnstructuredJson`. This list is checked before the
/// typed-string list, so it wins when a line matches both. Matching is a
/// case-insensitive substring test, so the longer `flask.` / `self.` entries
/// are already covered by their shorter counterparts.
pub(super) const UNSTRUCTURED_JSON_USER_INPUT_SUBSTRINGS: &[&str] = &[
"request.json",
"request.get_json",
"request.body",
"request.data",
"flask.request.json",
"self.request.body",
];
/// Prefixes tested by `matches_route_handler_decorator` using `starts_with`
/// on the trimmed line (case-sensitive).
pub(super) const ROUTE_HANDLER_DECORATOR_SUBSTRINGS: &[&str] = &[
"@app.route",
"@app.get",
"@app.post",
"@app.put",
"@app.delete",
"@router.get",
"@router.post",
"@router.put",
"@router.delete",
"@view",
"@api_view",
"@require_http_methods",
"@csrf_exempt",
"@login_required",
"@blueprint.route",
];
/// Substrings tested by `matches_route_handler_name` against the lower-cased
/// function name.
pub(super) const ROUTE_HANDLER_NAME_SUBSTRINGS: &[&str] =
&["_handler", "_endpoint", "_view", "_route"];
/// Substrings tested by `matches_trust_boundary_name` against the lower-cased
/// function name.
pub(super) const TRUST_BOUNDARY_NAME_SUBSTRINGS: &[&str] =
&["_trusted", "_admin", "_internal", "_validated", "_signed"];
/// Substrings tested by `matches_test_function` against the lower-cased
/// function name.
const TEST_FUNCTION_SUBSTRINGS: &[&str] = &["test_", "_test", "fixture", "setup", "teardown"];
/// Upper-case keywords searched for by `line_contains_sql_keyword`. The check
/// is a plain substring test on the upper-cased line (no word boundaries).
pub(super) const SQL_KEYWORDS: &[&str] = &[
"SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", "TRUNCATE", "EXEC",
"EXECUTE",
];
/// Method names accepted by `is_safe_django_orm_method` (exact match).
pub(super) const SAFE_DJANGO_ORM_METHODS: &[&str] = &[
"filter", "get", "exclude", "create", "update", "delete", "all", "none", "first", "last",
"count", "exists",
];
/// Method names accepted by `is_unsafe_raw_django_orm_method` (exact match).
pub(super) const UNSAFE_RAW_DJANGO_ORM_METHODS: &[&str] = &["raw", "extra"];
/// Method names accepted by `is_sql_sink_method` (exact match).
pub(super) const SQL_SINK_METHODS: &[&str] = &[
"execute",
"executemany",
"executescript",
"scalar",
"scalars",
"from_statement",
"mogrify",
"run_sql",
"execute_sql",
"query",
];
/// Function names accepted by `is_sqlalchemy_text_function` (exact match).
pub(super) const SQLALCHEMY_TEXT_FUNCTION_NAMES: &[&str] = &["text", "literal_column"];
/// Signals collected about one SQL call site; the sole input to `predict`.
#[derive(Debug, Clone, Default, PartialEq)]
pub(super) struct Evidence {
/// Call-site classification; `predict` treats `None` as `SqlApi::Unknown`.
pub api: Option<SqlApi>,
/// Not read by `predict` in this file.
pub callee_label: Option<String>,
/// Name of the enclosing function; used to name scope reasons and for the
/// test-function heuristic.
pub enclosing_function: Option<String>,
/// Not read by `predict` in this file.
pub enclosing_class: Option<String>,
/// Not read by `predict` in this file.
pub file_path: Option<String>,
/// Kind of user input detected; both non-`None` kinds weigh towards RealBug.
pub user_input_source: UserInputSource,
/// When set, `W_ENCLOSING_ROUTE_HANDLER` is added to the score.
pub enclosing_route_handler: bool,
/// When set, `W_TRUST_BOUNDARY_NAME` is added to the score.
pub trust_boundary_name: bool,
/// When set, `W_STATIC_SQL_STRING_LITERAL` is added to the score.
pub static_sql_string_literal: bool,
/// When set, `W_SQL_KEYWORD_NO_FORMATTING` is added to the score.
pub sql_keyword_no_formatting: bool,
/// Reason from a `sql-safe[...]` annotation; checked first and forces Benign.
pub sql_safe_annotation: Option<String>,
/// Source from a `sql-vulnerable[...]` annotation; checked second and forces RealBug.
pub sql_vulnerable_annotation: Option<String>,
}
impl Evidence {
    /// Test-only constructor: an `Evidence` with every field at its default.
    #[cfg(test)]
    pub(super) fn empty() -> Self {
        Default::default()
    }
}
/// Result of `predict`: the chosen branch plus the preserved opposite branch.
#[derive(Debug, Clone)]
pub(super) struct Prediction {
/// The branch the predictor selected.
pub predicted: BranchLabel,
/// The opposite branch, with its own severity, title, description and fix text.
pub alternative_branch: AlternativeBranch,
/// Severity computed for `predicted` from the score.
pub predicted_severity: Severity,
/// Signals that contributed to the decision, each with its weight.
pub reasons: Vec<PredictionReason>,
/// One entry on the collapse paths (annotation or API class); empty on the
/// weighted-scoring path.
pub resolutions: Vec<ResolutionSignal>,
}
/// Predicts whether a SQL call site is a real injection bug or benign usage.
///
/// Evaluation order (the first matching rule wins):
/// 1. a `sql-safe[...]` annotation collapses to Benign;
/// 2. a `sql-vulnerable[...]` annotation collapses to RealBug;
/// 3. `SqlApi::Safe` collapses to Benign;
/// 4. `SqlApi::Unsafe` and `SqlApi::UnsafeRaw` collapse to RealBug;
/// 5. otherwise the individual signal weights are summed and the result is
///    Benign only when the sum is strictly positive (a zero sum is RealBug).
///
/// Collapse paths return exactly one reason and one resolution; the weighted
/// path returns one reason per contributing signal and no resolutions.
pub(super) fn predict(evidence: &Evidence) -> Prediction {
    let api = evidence.api.unwrap_or(SqlApi::Unknown);
    let api_label = api.callee_label();

    // Explicit annotations override every structural signal; `sql-safe` is
    // checked before `sql-vulnerable`.
    if let Some(reason) = &evidence.sql_safe_annotation {
        return collapse(
            BranchLabel::Benign,
            api,
            0.0,
            ResolutionSignal {
                kind: ResolutionKind::SourceAnnotation {
                    syntax: format!("# repotoire: sql-safe[{reason}]"),
                },
                description: format!(
                    "`sql-safe[{reason}]` annotation declares this \
                     SQL call site as safe (whitelisted table name, \
                     audited internal source, cross-statement validated, \
                     etc.); the finding collapses to Info."
                ),
                example: Some(format!("{api_label}(...) # repotoire: sql-safe[{reason}]")),
                collapses_to: BranchLabel::Benign,
            },
            PredictionReason {
                kind: PredictionReasonKind::Custom {
                    description: format!("sql-safe[{reason}] annotation"),
                },
                weight: 1.0,
                note: format!(
                    "Annotated as safely-constructed ({reason}); not a SQL injection risk."
                ),
            },
        );
    }
    if let Some(source) = &evidence.sql_vulnerable_annotation {
        return collapse(
            BranchLabel::RealBug,
            api,
            -1.0,
            ResolutionSignal {
                kind: ResolutionKind::SourceAnnotation {
                    syntax: format!("# repotoire: sql-vulnerable[{source}]"),
                },
                description: format!(
                    "`sql-vulnerable[{source}]` annotation declares this \
                     SQL call site as exposed (third-party shim, dynamic \
                     query builder the predictor can't trace, audited-\
                     untrusted, etc.); the finding stays at the existing \
                     severity."
                ),
                example: Some(format!(
                    "{api_label}(...) # repotoire: sql-vulnerable[{source}]"
                )),
                collapses_to: BranchLabel::RealBug,
            },
            PredictionReason {
                kind: PredictionReasonKind::Custom {
                    description: format!("sql-vulnerable[{source}] annotation"),
                },
                weight: -1.0,
                note: format!("Annotated as sql-exposed (source: {source})."),
            },
        );
    }

    // Structural collapses driven purely by the call-site classification.
    if api.collapses_safe() {
        return collapse(
            BranchLabel::Benign,
            api,
            0.0,
            ResolutionSignal {
                kind: ResolutionKind::StructuralPattern {
                    description: "Parameterized SQL call / ORM filter expression".to_string(),
                },
                description: "The SQL call site uses a parameterized \
                              API: a string-literal query argument paired with a \
                              bound-values argument (`cursor.execute(\"SELECT ... \
                              %s\", (val,))`), an ORM keyword-filter expression \
                              (`Model.objects.filter(id=val)`), or a SQLAlchemy \
                              `text()` + bound-params dict pair \
                              (`db.execute(text(\"SELECT :id\"), {\"id\": val})`). \
                              The driver / ORM separates the SQL grammar from the \
                              bound values — user input cannot become part of the \
                              SQL string. The call is safe by structural \
                              construction."
                    .to_string(),
                example: Some(
                    "cursor.execute(\"SELECT * FROM users WHERE id = %s\", (user_id,)) # safe"
                        .to_string(),
                ),
                collapses_to: BranchLabel::Benign,
            },
            PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "Parameterized SQL / ORM filter".to_string(),
                },
                weight: W_API_SAFE_COLLAPSE,
                note: "The call site is a parameterized SQL or ORM \
                       keyword-filter expression: the driver / ORM binds \
                       values separately from the SQL grammar. Phase 2j \
                       D1.a amendment: trifecta Step 1.5 collapse — \
                       Benign direction. This is the AST-dominant Safe \
                       classification."
                    .to_string(),
            },
        );
    }
    if api.collapses_unsafe() {
        return collapse(
            BranchLabel::RealBug,
            api,
            // Forced sum and reason weight come from the same constant.
            W_API_UNSAFE_COLLAPSE,
            ResolutionSignal {
                kind: ResolutionKind::StructuralPattern {
                    description: "String-formatted SQL (f-string / concat / format / %)"
                        .to_string(),
                },
                description: "The SQL call's first argument is an \
                              f-string, a string concatenation expression, a \
                              `.format()` method call, or a `%`-operator \
                              expression that interpolates user input into the \
                              SQL grammar. The interpolated value becomes part \
                              of the SQL string at runtime — there is no \
                              parameterization. The textbook CWE-89 SQL \
                              injection shape."
                    .to_string(),
                example: Some(
                    "cursor.execute(f\"SELECT * FROM users WHERE id = {request.form['id']}\")"
                        .to_string(),
                ),
                collapses_to: BranchLabel::RealBug,
            },
            PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "String-formatted SQL argument".to_string(),
                },
                weight: W_API_UNSAFE_COLLAPSE,
                note: "The SQL call's first argument uses f-string / \
                       `+`-concat / `.format()` / `%`-operator with \
                       SQL keywords. Phase 2j D1.b amendment: trifecta \
                       Step 1.5 collapse — RealBug direction via \
                       string-formatted-SQL structural pattern. \
                       Textbook CWE-89."
                    .to_string(),
            },
        );
    }
    if api.collapses_unsafe_raw() {
        return collapse(
            BranchLabel::RealBug,
            api,
            // Forced sum and reason weight come from the same constant.
            W_API_UNSAFE_RAW_COLLAPSE,
            ResolutionSignal {
                kind: ResolutionKind::StructuralPattern {
                    description: "ORM .raw() escape hatch with formatted SQL".to_string(),
                },
                description: "The call site is `<Model>.objects.raw(<formatted \
                              SQL>)` — the Django ORM raw-SQL escape hatch \
                              invoked with f-string / concat / `.format()` user \
                              input. Django developers commonly think `.raw()` \
                              is safe because \"it's on the ORM,\" but `.raw()` \
                              takes a raw SQL string and applies no \
                              parameterization beyond what the developer wires \
                              in. Combined with string interpolation, `.raw()` \
                              is exactly the textbook SQL-injection shape — \
                              distinct from `.filter()` / `.get()` which use \
                              keyword-bound values."
                    .to_string(),
                example: Some(
                    "User.objects.raw(\"SELECT * FROM users WHERE id = \" + request.GET['id'])"
                        .to_string(),
                ),
                collapses_to: BranchLabel::RealBug,
            },
            PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "Django .raw() escape hatch with formatted SQL".to_string(),
                },
                weight: W_API_UNSAFE_RAW_COLLAPSE,
                note: "The call uses `<Model>.objects.raw(...)` with \
                       formatted SQL (f-string / concat / format / %). \
                       Phase 2j D1.c amendment: trifecta Step 1.5 \
                       collapse — RealBug direction via Django ORM \
                       raw-SQL escape hatch. This is the Phase 2j \
                       headline distinction: `.raw()` is NOT the same \
                       safety contract as `.filter()`."
                    .to_string(),
            },
        );
    }

    // Weighted path (Ambiguous / Unknown): every contribution to `sum` is
    // paired with exactly one entry in `reasons`.
    let mut sum: f32 = 0.0;
    let mut reasons: Vec<PredictionReason> = Vec::new();
    match evidence.user_input_source {
        UserInputSource::TypedString => {
            sum += W_USER_INPUT_TYPED_STRING_NEARBY;
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description:
                        "user input from typed-string source (request.form / request.args)"
                            .to_string(),
                },
                weight: W_USER_INPUT_TYPED_STRING_NEARBY,
                note: "User input from a typed-string source \
                       (`request.form`, `request.args`, `request.GET`). \
                       Unlike Phase 2i (where pymongo's BSON \
                       serialization made TypedString safer), for SQL \
                       the source's Python type is not load-bearing — \
                       f-string interpolation flattens both TypedString \
                       and UnstructuredJson into the same SQL string. \
                       Both source families weigh negatively. (D5.2 \
                       honest-review finding.)"
                    .to_string(),
            });
        }
        UserInputSource::UnstructuredJson => {
            sum += W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY;
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description:
                        "user input from unstructured-JSON source (request.json / request.body)"
                            .to_string(),
                },
                weight: W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY,
                note: "User input from an unstructured-JSON source \
                       (`request.json`, `request.get_json()`, \
                       `request.body`). For SQL, same magnitude as \
                       TypedString because string interpolation \
                       flattens both. D5.2 honest-review finding."
                    .to_string(),
            });
        }
        UserInputSource::None => {}
    }
    if evidence.static_sql_string_literal {
        sum += W_STATIC_SQL_STRING_LITERAL;
        reasons.push(PredictionReason {
            kind: PredictionReasonKind::StructuralPattern {
                description: "static SQL string literal (no interpolation)".to_string(),
            },
            weight: W_STATIC_SQL_STRING_LITERAL,
            // `%` needs no escaping in a plain Rust string literal.
            note: "The call's first argument is a string literal \
                   containing SQL keywords with no f-string / concat / \
                   format / % formatting markers. Strong Benign signal: \
                   the developer wrote a literal SQL string, often \
                   paired with parameterized placeholders."
                .to_string(),
        });
    }
    if evidence.sql_keyword_no_formatting {
        sum += W_SQL_KEYWORD_NO_FORMATTING;
        reasons.push(PredictionReason {
            kind: PredictionReasonKind::StructuralPattern {
                description: "SQL keyword present, no formatting markers in argument".to_string(),
            },
            weight: W_SQL_KEYWORD_NO_FORMATTING,
            note: "The line contains a SQL keyword \
                   (SELECT/INSERT/UPDATE/DELETE) but the call's first \
                   argument shows no formatting markers. Soft Benign — \
                   typical literal-SQL-with-parameters pattern."
                .to_string(),
        });
    }
    if evidence.enclosing_route_handler {
        sum += W_ENCLOSING_ROUTE_HANDLER;
        if let Some(fn_name) = &evidence.enclosing_function {
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::EnclosingScope {
                    scope_kind: "route_handler".to_string(),
                    name: fn_name.clone(),
                },
                weight: W_ENCLOSING_ROUTE_HANDLER,
                note: "Enclosing function is a route handler (decorator \
                       or naming convention); higher prior on attacker-\
                       reachable SQL code. Lighter weight than 2e–2h \
                       because the AST classification is doing most of \
                       the work — the handler prior is a thin tiebreaker."
                    .to_string(),
            });
        } else {
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "enclosing route handler context".to_string(),
                },
                weight: W_ENCLOSING_ROUTE_HANDLER,
                note: "Call site is in a route-handler context.".to_string(),
            });
        }
    }
    if evidence.trust_boundary_name {
        sum += W_TRUST_BOUNDARY_NAME;
        if let Some(fn_name) = &evidence.enclosing_function {
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::EnclosingScope {
                    scope_kind: "trust_boundary".to_string(),
                    name: fn_name.clone(),
                },
                weight: W_TRUST_BOUNDARY_NAME,
                note: "Enclosing function name contains a trust-boundary \
                       keyword (_trusted/_admin/_internal/_validated/_signed) \
                       — developer-authored signal that data has been verified."
                    .to_string(),
            });
        } else {
            // Without a function name the weight was previously applied
            // silently; record a reason so the score stays explainable.
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "enclosing trust-boundary context".to_string(),
                },
                weight: W_TRUST_BOUNDARY_NAME,
                note: "Call site is in a trust-boundary context.".to_string(),
            });
        }
    }
    if let Some(fn_name) = &evidence.enclosing_function {
        if matches_test_function(fn_name) {
            sum += W_ENCLOSING_TEST_FUNCTION;
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::EnclosingScope {
                    scope_kind: "function".to_string(),
                    name: fn_name.clone(),
                },
                weight: W_ENCLOSING_TEST_FUNCTION,
                note: format!(
                    "Enclosing function `{fn_name}` looks like a \
                     test/fixture; test code rarely the actionable \
                     security target."
                ),
            });
        }
    }

    // A zero sum (no evidence either way) deliberately resolves to RealBug.
    let predicted = if sum > 0.0 {
        BranchLabel::Benign
    } else {
        BranchLabel::RealBug
    };
    build_prediction(predicted, api, sum, reasons, Vec::new())
}
/// Returns `true` when the lower-cased `name` contains any entry of
/// `TEST_FUNCTION_SUBSTRINGS`.
fn matches_test_function(name: &str) -> bool {
    let lowered = name.to_lowercase();
    for needle in TEST_FUNCTION_SUBSTRINGS {
        if lowered.contains(*needle) {
            return true;
        }
    }
    false
}
/// Returns `true` when the lower-cased `name` contains any entry of
/// `ROUTE_HANDLER_NAME_SUBSTRINGS`.
pub(super) fn matches_route_handler_name(name: &str) -> bool {
    let lowered = name.to_lowercase();
    for needle in ROUTE_HANDLER_NAME_SUBSTRINGS {
        if lowered.contains(*needle) {
            return true;
        }
    }
    false
}
/// Returns `true` when the lower-cased `name` contains any entry of
/// `TRUST_BOUNDARY_NAME_SUBSTRINGS`.
pub(super) fn matches_trust_boundary_name(name: &str) -> bool {
    let lowered = name.to_lowercase();
    for needle in TRUST_BOUNDARY_NAME_SUBSTRINGS {
        if lowered.contains(*needle) {
            return true;
        }
    }
    false
}
/// Returns `true` when `line`, with surrounding whitespace trimmed, starts
/// with any entry of `ROUTE_HANDLER_DECORATOR_SUBSTRINGS` (case-sensitive).
pub(super) fn matches_route_handler_decorator(line: &str) -> bool {
    let candidate = line.trim();
    for prefix in ROUTE_HANDLER_DECORATOR_SUBSTRINGS {
        if candidate.starts_with(*prefix) {
            return true;
        }
    }
    false
}
/// Classifies which kind of user-input accessor, if any, appears in `line`.
///
/// Matching is a case-insensitive substring test. The unstructured-JSON list
/// is consulted first, so it wins when both kinds are present.
pub(super) fn classify_user_input_source(line: &str) -> UserInputSource {
    let haystack = line.to_lowercase();
    let mentions_any = |needles: &[&str]| {
        needles
            .iter()
            .any(|needle| haystack.contains(&needle.to_lowercase()))
    };

    if mentions_any(UNSTRUCTURED_JSON_USER_INPUT_SUBSTRINGS) {
        UserInputSource::UnstructuredJson
    } else if mentions_any(TYPED_STRING_USER_INPUT_SUBSTRINGS) {
        UserInputSource::TypedString
    } else {
        UserInputSource::None
    }
}
/// Returns `true` when the upper-cased `line` contains any entry of
/// `SQL_KEYWORDS` as a plain substring (no word-boundary check).
pub(super) fn line_contains_sql_keyword(line: &str) -> bool {
    let shouted = line.to_uppercase();
    for keyword in SQL_KEYWORDS {
        if shouted.contains(*keyword) {
            return true;
        }
    }
    false
}
/// Returns `true` when `method` exactly equals an entry of `SAFE_DJANGO_ORM_METHODS`.
pub(super) fn is_safe_django_orm_method(method: &str) -> bool {
    SAFE_DJANGO_ORM_METHODS
        .iter()
        .any(|candidate| *candidate == method)
}
/// Returns `true` when `method` exactly equals an entry of
/// `UNSAFE_RAW_DJANGO_ORM_METHODS`.
pub(super) fn is_unsafe_raw_django_orm_method(method: &str) -> bool {
    UNSAFE_RAW_DJANGO_ORM_METHODS
        .iter()
        .any(|candidate| *candidate == method)
}
/// Returns `true` when `method` exactly equals an entry of `SQL_SINK_METHODS`.
pub(super) fn is_sql_sink_method(method: &str) -> bool {
    SQL_SINK_METHODS
        .iter()
        .any(|candidate| *candidate == method)
}
/// Returns `true` when `name` exactly equals an entry of
/// `SQLALCHEMY_TEXT_FUNCTION_NAMES`.
pub(super) fn is_sqlalchemy_text_function(name: &str) -> bool {
    SQLALCHEMY_TEXT_FUNCTION_NAMES
        .iter()
        .any(|candidate| *candidate == name)
}
/// Builds a short-circuit prediction carrying exactly one reason and one
/// resolution, scored with the caller-supplied `forced_sum`.
fn collapse(
    label: BranchLabel,
    api: SqlApi,
    forced_sum: f32,
    resolution: ResolutionSignal,
    reason: PredictionReason,
) -> Prediction {
    let only_reason = vec![reason];
    let only_resolution = vec![resolution];
    build_prediction(label, api, forced_sum, only_reason, only_resolution)
}
/// Assembles a [`Prediction`] for `predicted`, deriving the alternative
/// branch from the opposite label.
///
/// Both branches are scored against the same `sum`; the alternative's title,
/// description and suggested fix use the API's callee label.
fn build_prediction(
    predicted: BranchLabel,
    api: SqlApi,
    sum: f32,
    reasons: Vec<PredictionReason>,
    resolutions: Vec<ResolutionSignal>,
) -> Prediction {
    let label_text = api.callee_label();
    let other = predicted.opposite();
    Prediction {
        predicted,
        alternative_branch: AlternativeBranch {
            label: other,
            severity: severity_for_branch(other, sum),
            title: title_for_branch(other, label_text),
            description: description_for_branch(other, label_text),
            suggested_fix: suggested_fix_for_branch(other, label_text),
        },
        predicted_severity: severity_for_branch(predicted, sum),
        reasons,
        resolutions,
    }
}
/// Maps a branch label and score to a severity.
///
/// Benign is always `Info`. RealBug is `Critical` at or below -0.7, `High`
/// at or below -0.4, and `Medium` otherwise.
fn severity_for_branch(label: BranchLabel, sum: f32) -> Severity {
    match label {
        BranchLabel::Benign => Severity::Info,
        BranchLabel::RealBug if sum <= -0.7 => Severity::Critical,
        BranchLabel::RealBug if sum <= -0.4 => Severity::High,
        BranchLabel::RealBug => Severity::Medium,
    }
}
/// One-line finding title for `label`, embedding the API label.
fn title_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => format!("Safe SQL call ({api_label}) — informational"),
        BranchLabel::RealBug => format!("Potential SQL injection via {api_label}"),
    }
}
/// Long-form finding description for `label`, embedding the API label.
///
/// The text is user-facing. Each trailing `\` joins the next source line
/// after stripping its leading whitespace, so every arm yields a single
/// paragraph.
fn description_for_branch(label: BranchLabel, api_label: &str) -> String {
match label {
BranchLabel::RealBug => format!(
"The `{api_label}` call appears to construct a SQL query \
via string interpolation (f-string / concat / format / %), \
or invokes the Django `.raw()` escape hatch with user-\
reachable input. SQL injection allows attackers to access, \
modify, or delete database records and in some cases \
execute operating-system commands."
),
BranchLabel::Benign => format!(
"The `{api_label}` call appears to use a parameterized SQL \
API (string-literal + bound values), an ORM keyword-filter \
expression (`Model.objects.filter(...)`), or a SQLAlchemy \
`text() + bound-params dict` pair. The call is carried as \
Info; the RealBug interpretation is preserved in \
`alternative_branch` in case the predictor is wrong."
),
}
}
/// Remediation text for `label`; both arms return `Some`.
///
/// `_api_label` is accepted for signature parity with the sibling
/// `*_for_branch` helpers but is not used. The RealBug text is Markdown with
/// a fenced Python example; the explicit `\n` escapes are its only line
/// breaks.
fn suggested_fix_for_branch(label: BranchLabel, _api_label: &str) -> Option<String> {
match label {
BranchLabel::RealBug => Some(
"Sanitize the SQL construction:\n\n\
```python\n\
# Instead of:\n\
cursor.execute(f\"SELECT * FROM users WHERE id = {request.form['id']}\")\n\
User.objects.raw(\"SELECT * FROM users WHERE id = \" + request.GET['id'])\n\
\n\
# Use parameterized queries:\n\
cursor.execute(\"SELECT * FROM users WHERE id = %s\", (request.form['id'],))\n\
cursor.execute(\"SELECT * FROM users WHERE id = ?\", [request.form['id']])\n\
\n\
# Or the ORM:\n\
User.objects.filter(id=request.GET['id'])\n\
\n\
# Or SQLAlchemy text() + bound params:\n\
db.execute(text(\"SELECT * FROM users WHERE id = :id\"), {\"id\": user_id})\n\
```\n\n\
If the call is intentionally constructing a complex query \
that the predictor cannot trace (cross-statement assembly, \
dynamic query builder, etc.), annotate the call site with \
`# repotoire: sql-safe[<reason>]` to collapse the finding \
to Info."
.to_string(),
),
BranchLabel::Benign => Some(
"If this is intentional safe usage, annotate \
`# repotoire: sql-safe[<reason>]` to collapse the finding \
to Info definitively. If the alternative branch is correct \
(the call IS exposed to attacker-controlled SQL via a \
path the predictor missed), audit the call's input source \
classification and consider switching to a parameterized \
API or ORM filter expression."
.to_string(),
),
}
}
/// Extracts the reason from a `sql-safe` annotation comment on `line`.
///
/// Returns `None` when no annotation is parsed or its kind is not
/// `sql-safe`. When the annotation has no arguments the reason is
/// `"unspecified"`; otherwise it is the first argument.
pub(super) fn extract_sql_safe_reason(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line)?;
    if annotation.kind != "sql-safe" {
        return None;
    }
    let reason = match annotation.args.first() {
        Some(first) => first.clone(),
        None => "unspecified".to_string(),
    };
    Some(reason)
}
/// Extracts the source from a `sql-vulnerable` annotation comment on `line`.
///
/// Returns `None` when no annotation is parsed or its kind is not
/// `sql-vulnerable`. When the annotation has no arguments the source is
/// `"unspecified"`; otherwise it is the first argument.
pub(super) fn extract_sql_vulnerable_source(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line)?;
    if annotation.kind != "sql-vulnerable" {
        return None;
    }
    let source = match annotation.args.first() {
        Some(first) => first.clone(),
        None => "unspecified".to_string(),
    };
    Some(source)
}
// Unit tests for the predictor. Sections: API-class collapses (cases a-h),
// collapse precedence over soft signals, the weighted path for Ambiguous
// call sites, annotation overrides, weight-sign invariants, and the
// matcher / extractor helpers.
#[cfg(test)]
mod tests {
use super::*;
// --- API-class collapses: the classification alone decides the outcome ---
#[test]
fn case_a_parameterized_execute_predicts_benign_info() {
let evidence = Evidence {
api: Some(SqlApi::Safe),
user_input_source: UserInputSource::TypedString,
enclosing_route_handler: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::StructuralPattern { .. }
));
assert_eq!(p.resolutions[0].collapses_to, BranchLabel::Benign);
}
#[test]
fn case_b_fstring_predicts_realbug_critical() {
let evidence = Evidence {
api: Some(SqlApi::Unsafe),
user_input_source: UserInputSource::TypedString,
enclosing_route_handler: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
assert_eq!(p.resolutions.len(), 1);
assert_eq!(p.resolutions[0].collapses_to, BranchLabel::RealBug);
}
#[test]
fn case_c_type_cast_laundered_format_overfires_realbug() {
let evidence = Evidence {
api: Some(SqlApi::Unsafe),
user_input_source: UserInputSource::TypedString,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
}
#[test]
fn case_d_django_orm_filter_predicts_benign_info() {
let evidence = Evidence {
api: Some(SqlApi::Safe),
user_input_source: UserInputSource::TypedString,
enclosing_route_handler: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
}
#[test]
fn case_e_django_raw_with_concat_predicts_realbug_critical() {
let evidence = Evidence {
api: Some(SqlApi::UnsafeRaw),
user_input_source: UserInputSource::TypedString,
enclosing_route_handler: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::StructuralPattern { .. }
));
}
#[test]
fn case_f_static_literal_sql_predicts_benign_info() {
let evidence = Evidence {
api: Some(SqlApi::Safe),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
}
// Typed-string input (-0.30) plus route handler (-0.20) crosses the High
// threshold (-0.4) without reaching Critical (-0.7).
#[test]
fn case_g_opaque_var_with_user_input_handler_predicts_realbug_high() {
let evidence = Evidence {
api: Some(SqlApi::Ambiguous),
user_input_source: UserInputSource::TypedString,
enclosing_route_handler: true,
enclosing_function: Some("get_user_endpoint".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
}
#[test]
fn case_g_prime_opaque_var_no_user_input_predicts_realbug_medium() {
let evidence = Evidence {
api: Some(SqlApi::Ambiguous),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Medium);
}
#[test]
fn case_h_sqlalchemy_text_with_binds_predicts_benign_info() {
let evidence = Evidence {
api: Some(SqlApi::Safe),
user_input_source: UserInputSource::TypedString,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
}
// --- Collapses take precedence over the soft signals ---
#[test]
fn safe_collapse_dominates_handler_and_user_input() {
let evidence = Evidence {
api: Some(SqlApi::Safe),
user_input_source: UserInputSource::UnstructuredJson,
enclosing_route_handler: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
#[test]
fn unsafe_collapse_dominates_test_function() {
let evidence = Evidence {
api: Some(SqlApi::Unsafe),
enclosing_function: Some("test_query_builder".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
}
#[test]
fn unsafe_raw_collapse_dominates_trust_boundary() {
let evidence = Evidence {
api: Some(SqlApi::UnsafeRaw),
trust_boundary_name: true,
enclosing_function: Some("admin_query_trusted".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
}
// --- Weighted path: Ambiguous call sites are decided by the signal sum ---
#[test]
fn ambiguous_with_unstructured_json_and_handler_predicts_realbug_high() {
let evidence = Evidence {
api: Some(SqlApi::Ambiguous),
user_input_source: UserInputSource::UnstructuredJson,
enclosing_route_handler: true,
enclosing_function: Some("api_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
}
#[test]
fn ambiguous_with_typed_string_only_predicts_realbug_medium() {
let evidence = Evidence {
api: Some(SqlApi::Ambiguous),
user_input_source: UserInputSource::TypedString,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Medium);
}
#[test]
fn ambiguous_test_function_predicts_benign() {
let evidence = Evidence {
api: Some(SqlApi::Ambiguous),
enclosing_function: Some("test_user_query".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
#[test]
fn ambiguous_static_sql_literal_predicts_benign() {
let evidence = Evidence {
api: Some(SqlApi::Ambiguous),
static_sql_string_literal: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
#[test]
fn ambiguous_sql_keyword_no_formatting_predicts_benign() {
let evidence = Evidence {
api: Some(SqlApi::Ambiguous),
sql_keyword_no_formatting: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
#[test]
fn ambiguous_trust_boundary_name_predicts_benign() {
let evidence = Evidence {
api: Some(SqlApi::Ambiguous),
trust_boundary_name: true,
enclosing_function: Some("admin_query_trusted".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
// --- Annotations override even a contradicting API classification ---
#[test]
fn sql_safe_annotation_collapses_to_benign() {
let evidence = Evidence {
api: Some(SqlApi::Unsafe), sql_safe_annotation: Some("type-cast-laundered".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::SourceAnnotation { .. }
));
}
#[test]
fn sql_vulnerable_annotation_collapses_to_realbug() {
let evidence = Evidence {
api: Some(SqlApi::Safe), sql_vulnerable_annotation: Some("dynamic-query-builder".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
}
// A zero sum is not strictly positive, so no evidence resolves to RealBug.
#[test]
fn empty_evidence_tiebreaks_realbug_medium() {
let p = predict(&Evidence::empty());
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Medium);
}
// --- Weight-sign invariants ---
#[test]
#[allow(clippy::assertions_on_constants)]
fn realbug_signal_weights_are_negative() {
assert!(W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY < 0.0);
assert!(W_USER_INPUT_TYPED_STRING_NEARBY < 0.0);
assert!(W_ENCLOSING_ROUTE_HANDLER < 0.0);
assert!(W_API_UNSAFE_COLLAPSE < 0.0);
assert!(W_API_UNSAFE_RAW_COLLAPSE < 0.0);
}
#[test]
#[allow(clippy::assertions_on_constants)]
fn benign_signal_weights_are_positive() {
assert!(W_STATIC_SQL_STRING_LITERAL > 0.0);
assert!(W_SQL_KEYWORD_NO_FORMATTING > 0.0);
assert!(W_TRUST_BOUNDARY_NAME > 0.0);
assert!(W_ENCLOSING_TEST_FUNCTION > 0.0);
assert!(W_API_SAFE_COLLAPSE > 0.0);
}
#[test]
#[allow(clippy::assertions_on_constants)]
fn d5_2_typed_string_and_unstructured_json_same_magnitude_for_sql() {
assert!(
W_USER_INPUT_TYPED_STRING_NEARBY < 0.0,
"TypedString must be negative for SQL (interpolation flattens type)"
);
assert!(
W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY < 0.0,
"UnstructuredJson must be negative for SQL"
);
assert_eq!(
W_USER_INPUT_TYPED_STRING_NEARBY.abs(),
W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY.abs(),
"For SQL, both source families have the same magnitude (D5.2)"
);
}
// --- SqlApi predicates ---
#[test]
fn sql_api_collapses_predicates() {
assert!(SqlApi::Safe.collapses_safe());
assert!(!SqlApi::Safe.collapses_unsafe());
assert!(!SqlApi::Safe.collapses_unsafe_raw());
assert!(SqlApi::Unsafe.collapses_unsafe());
assert!(!SqlApi::Unsafe.collapses_safe());
assert!(!SqlApi::Unsafe.collapses_unsafe_raw());
assert!(SqlApi::UnsafeRaw.collapses_unsafe_raw());
assert!(!SqlApi::UnsafeRaw.collapses_safe());
assert!(!SqlApi::UnsafeRaw.collapses_unsafe());
assert!(!SqlApi::Ambiguous.collapses_safe());
assert!(!SqlApi::Ambiguous.collapses_unsafe());
assert!(!SqlApi::Ambiguous.collapses_unsafe_raw());
assert!(!SqlApi::Unknown.collapses_safe());
assert!(!SqlApi::Unknown.collapses_unsafe());
assert!(!SqlApi::Unknown.collapses_unsafe_raw());
}
#[test]
fn sql_api_is_recognized() {
assert!(SqlApi::Safe.is_recognized());
assert!(SqlApi::Unsafe.is_recognized());
assert!(SqlApi::UnsafeRaw.is_recognized());
assert!(SqlApi::Ambiguous.is_recognized());
assert!(!SqlApi::Unknown.is_recognized());
}
// --- User-input classification ---
#[test]
fn classify_request_form_as_typed_string() {
assert_eq!(
classify_user_input_source("u = request.form['user']"),
UserInputSource::TypedString
);
assert_eq!(
classify_user_input_source("uid = request.GET['id']"),
UserInputSource::TypedString
);
}
#[test]
fn classify_request_json_as_unstructured_json() {
assert_eq!(
classify_user_input_source("p = request.json"),
UserInputSource::UnstructuredJson
);
assert_eq!(
classify_user_input_source("body = request.get_json()"),
UserInputSource::UnstructuredJson
);
}
#[test]
fn classify_unstructured_json_priority_when_both_present() {
let src = "x = request.form['a'] or request.json";
assert_eq!(
classify_user_input_source(src),
UserInputSource::UnstructuredJson
);
}
#[test]
fn classify_no_user_input() {
assert_eq!(
classify_user_input_source("x = compute()"),
UserInputSource::None
);
}
// --- Keyword and method-name matchers ---
#[test]
fn line_contains_sql_keyword_recognizes_select() {
assert!(line_contains_sql_keyword("SELECT * FROM users"));
assert!(line_contains_sql_keyword("select * from users"));
assert!(line_contains_sql_keyword("INSERT INTO log VALUES (1)"));
assert!(line_contains_sql_keyword(
"cursor.execute(\"DELETE FROM x\")"
));
}
#[test]
fn line_contains_sql_keyword_no_match() {
assert!(!line_contains_sql_keyword("x = compute(payload)"));
assert!(!line_contains_sql_keyword("foo = bar"));
}
#[test]
fn safe_django_orm_method_recognition() {
assert!(is_safe_django_orm_method("filter"));
assert!(is_safe_django_orm_method("get"));
assert!(is_safe_django_orm_method("create"));
assert!(is_safe_django_orm_method("update"));
assert!(!is_safe_django_orm_method("raw"));
assert!(!is_safe_django_orm_method("extra"));
}
#[test]
fn unsafe_raw_django_orm_method_recognition() {
assert!(is_unsafe_raw_django_orm_method("raw"));
assert!(is_unsafe_raw_django_orm_method("extra"));
assert!(!is_unsafe_raw_django_orm_method("filter"));
assert!(!is_unsafe_raw_django_orm_method("get"));
}
#[test]
fn sql_sink_method_recognition() {
assert!(is_sql_sink_method("execute"));
assert!(is_sql_sink_method("executemany"));
assert!(is_sql_sink_method("executescript"));
assert!(!is_sql_sink_method("filter"));
assert!(!is_sql_sink_method("compute"));
}
#[test]
fn sqlalchemy_text_function_recognition() {
assert!(is_sqlalchemy_text_function("text"));
assert!(is_sqlalchemy_text_function("literal_column"));
assert!(!is_sqlalchemy_text_function("execute"));
}
#[test]
fn route_handler_decorator_matches() {
assert!(matches_route_handler_decorator("@app.route('/foo')"));
assert!(matches_route_handler_decorator(" @app.post('/x')"));
assert!(matches_route_handler_decorator("@router.get('/v1')"));
assert!(!matches_route_handler_decorator("@dataclass"));
}
#[test]
fn route_handler_name_matches() {
assert!(matches_route_handler_name("login_handler"));
assert!(matches_route_handler_name("user_endpoint"));
assert!(!matches_route_handler_name("compute_total"));
}
#[test]
fn trust_boundary_name_matches() {
assert!(matches_trust_boundary_name("query_trusted"));
assert!(matches_trust_boundary_name("load_admin_query"));
assert!(matches_trust_boundary_name("post_validated_filter"));
assert!(matches_trust_boundary_name("read_internal_state"));
assert!(!matches_trust_boundary_name("plain_query"));
}
// --- Annotation extraction ---
#[test]
fn extract_sql_safe_with_reason() {
assert_eq!(
extract_sql_safe_reason(
"cursor.execute(q) # repotoire: sql-safe[whitelisted-table-name]"
),
Some("whitelisted-table-name".to_string())
);
}
#[test]
fn extract_sql_safe_without_reason() {
assert_eq!(
extract_sql_safe_reason("cursor.execute(q) # repotoire: sql-safe"),
Some("unspecified".to_string())
);
}
#[test]
fn extract_sql_vulnerable_with_source() {
assert_eq!(
extract_sql_vulnerable_source(
"cursor.execute(q) # repotoire: sql-vulnerable[dynamic-query-builder]"
),
Some("dynamic-query-builder".to_string())
);
}
#[test]
fn extract_sql_ignores_other_kinds() {
assert_eq!(
extract_sql_safe_reason("x # repotoire: nosql-safe[ok]"),
None
);
assert_eq!(
extract_sql_vulnerable_source("x # repotoire: deserialize-vulnerable[ok]"),
None
);
}
}