use super::annotation::parse_python_comment;
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
/// Structural classification of a pymongo query call site.
///
/// `predict` short-circuits ("collapses") on the first three variants and
/// falls back to weighted-signal scoring for `Ambiguous` and `Unknown`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum NosqlApi {
/// Typed-value query; `predict` collapses it to `BranchLabel::Benign`.
TypedValueQuery,
/// Dangerous-operator query; `predict` collapses it to `BranchLabel::RealBug`.
OperatorInjection,
/// `**`-expansion query; `predict` collapses it to `BranchLabel::RealBug`.
DictExpansion,
/// No collapse applies; `predict` decides from the weighted signals.
Ambiguous,
/// Fallback `predict` uses when `Evidence::api` is `None`; scored like `Ambiguous`.
Unknown,
}
impl NosqlApi {
    /// Fixed, human-readable label for this classification; it is interpolated
    /// into finding titles, descriptions and annotation examples.
    pub(super) fn callee_label(self) -> &'static str {
        match self {
            Self::Unknown => "pymongo query",
            Self::Ambiguous => "ambiguous-pymongo-query",
            Self::DictExpansion => "dict-expansion-pymongo-query",
            Self::OperatorInjection => "operator-injection-pymongo-query",
            Self::TypedValueQuery => "typed-value-pymongo-query",
        }
    }

    /// `true` for every variant except `Unknown` (test-only helper).
    #[cfg(test)]
    pub(super) fn is_recognized(self) -> bool {
        self != Self::Unknown
    }

    /// `true` only for `TypedValueQuery`.
    pub(super) fn collapses_typed_query(self) -> bool {
        self == Self::TypedValueQuery
    }

    /// `true` only for `OperatorInjection`.
    pub(super) fn collapses_operator(self) -> bool {
        self == Self::OperatorInjection
    }

    /// `true` only for `DictExpansion`.
    pub(super) fn collapses_dict_expansion(self) -> bool {
        self == Self::DictExpansion
    }
}
/// Family of user-input expression found near a call site, as produced by
/// `classify_user_input_source`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(super) enum UserInputSource {
/// Line matched one of `TYPED_STRING_USER_INPUT_SUBSTRINGS`
/// (e.g. `request.form`); `predict` scores it with a positive weight.
TypedString,
/// Line matched one of `UNSTRUCTURED_JSON_USER_INPUT_SUBSTRINGS`
/// (e.g. `request.json`); `predict` scores it with a negative weight.
UnstructuredJson,
/// No user-input substring matched; contributes nothing to the score.
#[default]
None,
}
// Signal weights used by `predict`. In the weighted path the weights of all
// present signals are summed: a strictly positive sum predicts Benign, any
// other sum predicts RealBug. Positive weights therefore push toward Benign
// and negative weights toward RealBug.

// Weights reported on the single reason attached to an API-shape collapse.
// They are not added to a running sum; the collapse passes its own forced
// score to `collapse`.
pub(super) const W_API_TYPED_QUERY_COLLAPSE: f32 = 1.0;
pub(super) const W_API_OPERATOR_COLLAPSE: f32 = -1.0;
pub(super) const W_API_DICT_EXPANSION_COLLAPSE: f32 = -1.0;
// Applied when `Evidence::user_input_source` is `TypedString`.
pub(super) const W_USER_INPUT_TYPED_STRING_NEARBY: f32 = 0.20;
// Applied when `Evidence::user_input_source` is `UnstructuredJson`.
pub(super) const W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY: f32 = -0.30;
// Applied when `Evidence::has_dollar_regex_with_user_input` is set.
pub(super) const W_HAS_DOLLAR_REGEX_WITH_USER_INPUT: f32 = -0.30;
// Applied when `Evidence::has_developer_written_operator` is set.
pub(super) const W_DEVELOPER_WRITTEN_OPERATOR: f32 = 0.10;
// Applied when `Evidence::type_cast_nearby` is set.
pub(super) const W_OBJECTID_OR_TYPE_CAST_NEARBY: f32 = 0.40;
// Applied when `Evidence::enclosing_route_handler` is set.
pub(super) const W_ENCLOSING_ROUTE_HANDLER: f32 = -0.20;
// Applied when the enclosing function name passes `matches_test_function`.
pub(super) const W_ENCLOSING_TEST_FUNCTION: f32 = 0.15;
// Applied when `Evidence::trust_boundary_name` is set.
pub(super) const W_TRUST_BOUNDARY_NAME: f32 = 0.10;
// Substrings that make `classify_user_input_source` report `TypedString`.
// They are compared against the lowercased line, so entries must stay
// lowercase.
pub(super) const TYPED_STRING_USER_INPUT_SUBSTRINGS: &[&str] = &[
"request.form",
"request.args",
"request.values",
"request.cookies",
"request.headers",
"request.path_params",
];
// Substrings that make `classify_user_input_source` report
// `UnstructuredJson`. This table is checked before the typed-string table and
// is also compared against the lowercased line.
pub(super) const UNSTRUCTURED_JSON_USER_INPUT_SUBSTRINGS: &[&str] = &[
"request.json",
"request.get_json",
"request.body",
"request.data",
"flask.request.json",
"self.request.body",
];
// Decorator prefixes recognized by `matches_route_handler_decorator`, which
// tests whether the whitespace-trimmed line *starts with* one of them
// (case-sensitive).
pub(super) const ROUTE_HANDLER_DECORATOR_SUBSTRINGS: &[&str] = &[
"@app.route",
"@app.get",
"@app.post",
"@app.put",
"@app.delete",
"@router.get",
"@router.post",
"@router.put",
"@router.delete",
"@view",
"@api_view",
"@require_http_methods",
"@csrf_exempt",
"@login_required",
"@blueprint.route",
];
// Function-name fragments recognized by `matches_route_handler_name`
// (substring match on the lowercased name).
pub(super) const ROUTE_HANDLER_NAME_SUBSTRINGS: &[&str] =
&["_handler", "_endpoint", "_view", "_route"];
// Function-name fragments recognized by `matches_trust_boundary_name`
// (substring match on the lowercased name).
pub(super) const TRUST_BOUNDARY_NAME_SUBSTRINGS: &[&str] =
&["_trusted", "_admin", "_internal", "_validated", "_signed"];
// Function-name fragments recognized by `matches_test_function`
// (substring match on the lowercased name).
const TEST_FUNCTION_SUBSTRINGS: &[&str] = &["test_", "_test", "fixture", "setup", "teardown"];
// Call patterns recognized by `line_contains_type_cast`; matched
// case-sensitively against the raw line.
pub(super) const TYPE_CAST_SUBSTRINGS: &[&str] = &[
"ObjectId(",
"bson.ObjectId(",
"int(",
"float(",
"bool(",
"UUID(",
"uuid.UUID(",
".parse_obj(",
".model_validate(",
"schema.load(",
];
// Operator names recognized by `line_contains_dangerous_operator`
// (case-sensitive substring match).
pub(super) const DANGEROUS_OPERATORS: &[&str] = &["$where", "$function", "$expr", "$accumulator"];
// Operator names recognized by `line_contains_developer_operator`
// (case-sensitive substring match).
pub(super) const DEVELOPER_OPERATORS: &[&str] = &[
"$ne", "$gt", "$gte", "$lt", "$lte", "$in", "$nin", "$exists",
];
/// Facts gathered about one pymongo call site; the sole input to `predict`.
///
/// `Default` yields "no evidence": every flag `false`, every option `None`,
/// and `user_input_source` set to `UserInputSource::None`.
#[derive(Debug, Clone, Default, PartialEq)]
pub(super) struct Evidence {
/// Structural classification of the call; `predict` treats `None` as
/// `NosqlApi::Unknown`.
pub api: Option<NosqlApi>,
/// Not read by `predict` (labels there come from `NosqlApi::callee_label`).
/// NOTE(review): presumably the callee text seen at the call site — confirm
/// against the code that fills this struct.
pub callee_label: Option<String>,
/// Name of the enclosing function; used for scope reasons and for the
/// test-function heuristic.
pub enclosing_function: Option<String>,
/// Not read by `predict`. NOTE(review): presumably the enclosing class name — confirm.
pub enclosing_class: Option<String>,
/// Not read by `predict`. NOTE(review): presumably the path of the analyzed file — confirm.
pub file_path: Option<String>,
/// Family of user input detected for the call site.
pub user_input_source: UserInputSource,
/// When set, `predict` adds `W_ENCLOSING_ROUTE_HANDLER`.
pub enclosing_route_handler: bool,
/// When set, `predict` adds `W_TRUST_BOUNDARY_NAME`.
pub trust_boundary_name: bool,
/// When set, `predict` adds `W_HAS_DOLLAR_REGEX_WITH_USER_INPUT`.
pub has_dollar_regex_with_user_input: bool,
/// When set, `predict` adds `W_DEVELOPER_WRITTEN_OPERATOR`.
pub has_developer_written_operator: bool,
/// When set, `predict` adds `W_OBJECTID_OR_TYPE_CAST_NEARBY`.
pub type_cast_nearby: bool,
/// Reason from a `nosql-safe[...]` annotation; when present `predict`
/// returns Benign before looking at anything else.
pub nosql_safe_annotation: Option<String>,
/// Source from a `nosql-vulnerable[...]` annotation; when present (and no
/// safe annotation is) `predict` returns RealBug.
pub nosql_vulnerable_annotation: Option<String>,
}
impl Evidence {
    /// Test-only constructor returning an `Evidence` with every field at its
    /// `Default` value.
    #[cfg(test)]
    pub(super) fn empty() -> Self {
        Default::default()
    }
}
/// Result of `predict`: the chosen branch plus the preserved opposite branch.
#[derive(Debug, Clone)]
pub(super) struct Prediction {
/// Branch the predictor selected.
pub predicted: BranchLabel,
/// The opposite branch (`predicted.opposite()`), with its own severity,
/// title, description and suggested fix.
pub alternative_branch: AlternativeBranch,
/// Severity of the selected branch, from `severity_for_branch`.
pub predicted_severity: Severity,
/// Signals that contributed to the decision.
pub reasons: Vec<PredictionReason>,
/// Resolution signals; one entry for collapsed predictions, empty for
/// weighted-sum predictions.
pub resolutions: Vec<ResolutionSignal>,
}
pub(super) fn predict(evidence: &Evidence) -> Prediction {
let api = evidence.api.unwrap_or(NosqlApi::Unknown);
let api_label = api.callee_label();
if let Some(reason) = &evidence.nosql_safe_annotation {
return collapse(
BranchLabel::Benign,
api,
0.0,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: nosql-safe[{reason}]"),
},
description: format!(
"`nosql-safe[{reason}]` annotation declares this \
pymongo query as safe (pydantic-validated, cross-\
statement type cast, audited internal source, etc.); \
the finding collapses to Info."
),
example: Some(format!(
"{api_label}(...) # repotoire: nosql-safe[{reason}]"
)),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("nosql-safe[{reason}] annotation"),
},
weight: 1.0,
note: format!(
"Annotated as safely-constructed ({reason}); not a NoSQL injection risk."
),
},
);
}
if let Some(source) = &evidence.nosql_vulnerable_annotation {
return collapse(
BranchLabel::RealBug,
api,
-1.0,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: nosql-vulnerable[{source}]"),
},
description: format!(
"`nosql-vulnerable[{source}]` annotation declares this \
pymongo query as exposed (third-party shim, helper-\
assembled-query the predictor can't trace, audited-\
untrusted, etc.); the finding stays at the existing \
severity."
),
example: Some(format!(
"{api_label}(...) # repotoire: nosql-vulnerable[{source}]"
)),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("nosql-vulnerable[{source}] annotation"),
},
weight: -1.0,
note: format!("Annotated as nosql-exposed (source: {source})."),
},
);
}
if api.collapses_typed_query() {
return collapse(
BranchLabel::Benign,
api,
0.0,
ResolutionSignal {
kind: ResolutionKind::StructuralPattern {
description:
"Typed-value pymongo query (no dangerous operators, all user-input values cast)"
.to_string(),
},
description: "The query passes user input as a structured \
typed value: every user-derived value is wrapped in \
a type-narrowing cast (`str(...)`, `ObjectId(...)`, \
`int(...)`, pydantic-validated model), the dict literal \
has no dangerous server-side operators (`$where`, \
`$function`, `$expr`), and there is no `**`-expansion \
of raw user input. pymongo serializes Python `str` \
values to BSON String — there is no operator-\
interpretation path. The query is safe by structural \
construction."
.to_string(),
example: Some(
"users.find_one({\"username\": str(request.form['user'])}) # safe"
.to_string(),
),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "Typed-value pymongo query".to_string(),
},
weight: W_API_TYPED_QUERY_COLLAPSE,
note: "The call site is a structurally-typed pymongo \
query: no dangerous operators, no `**`-expansion, \
and all user-input values are cast (str / ObjectId \
/ int / pydantic). Phase 2i D1.a amendment: \
trifecta Step 1.5 collapse — Benign direction. \
This is the headline FP-reduction case for Phase 2i."
.to_string(),
},
);
}
if api.collapses_operator() {
return collapse(
BranchLabel::RealBug,
api,
-1.0,
ResolutionSignal {
kind: ResolutionKind::StructuralPattern {
description:
"Dangerous server-side operator ($where/$function/$expr) with user input"
.to_string(),
},
description: "The query dict literal contains a dangerous \
server-side MongoDB operator (`$where`, `$function`, \
`$expr`, `$accumulator`) whose value derives from \
user input. `$where` executes JavaScript on the \
database server; `$function` and `$accumulator` \
allow aggregation-pipeline JavaScript execution; \
`$expr` with a user-controlled dict enables \
aggregation-expression injection. The textbook \
CWE-943 RCE shape regardless of language."
.to_string(),
example: Some(
"users.find_one({\"$where\": f\"this.name=='{request.form['name']}'\"})"
.to_string(),
),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "Dangerous server-side operator with user input".to_string(),
},
weight: W_API_OPERATOR_COLLAPSE,
note: "The query embeds a dangerous server-side operator \
(`$where` / `$function` / `$expr` / `$accumulator`) \
with a user-input expression value. Phase 2i D1.b \
amendment: trifecta Step 1.5 collapse — RealBug \
direction via dangerous-operator structural pattern. \
Server-side JavaScript / aggregation-expression \
execution."
.to_string(),
},
);
}
if api.collapses_dict_expansion() {
return collapse(
BranchLabel::RealBug,
api,
-1.0,
ResolutionSignal {
kind: ResolutionKind::StructuralPattern {
description: "Dict-expansion of raw user input into pymongo query".to_string(),
},
description: "The query body is a `**`-expansion of raw \
user input (`request.json`, `request.get_json()`, \
`request.body`). The attacker controls every key \
and value in the resulting query dict — they can \
supply `{\"$ne\": null}` to bypass equality checks, \
`{\"$where\": \"...\"}` to inject JavaScript, etc. \
The textbook NoSQL auth-bypass vector."
.to_string(),
example: Some(
"users.find_one({**request.get_json()}) # auth bypass via $ne".to_string(),
),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "Dict-expansion of raw user input".to_string(),
},
weight: W_API_DICT_EXPANSION_COLLAPSE,
note: "The query body is `**request.json` / `**req.body` / \
similar — the attacker controls every key. Phase 2i \
D1.c amendment: trifecta Step 1.5 collapse — RealBug \
direction via dict-expansion structural pattern. \
This is the canonical NoSQL auth-bypass shape."
.to_string(),
},
);
}
let mut sum: f32 = 0.0;
let mut reasons: Vec<PredictionReason> = Vec::new();
match evidence.user_input_source {
UserInputSource::TypedString => {
sum += W_USER_INPUT_TYPED_STRING_NEARBY;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description:
"user input from typed-string source (request.form / request.args)"
.to_string(),
},
weight: W_USER_INPUT_TYPED_STRING_NEARBY,
note: "User input from a typed-string source \
(`request.form`, `request.args`, `request.values`). \
Python str values flowing into pymongo become BSON \
String — no operator interpretation possible. \
Structural-safety signal (D5.2 honest-review \
finding: this is the source family where Python is \
genuinely safer than JS)."
.to_string(),
});
}
UserInputSource::UnstructuredJson => {
sum += W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description:
"user input from unstructured-JSON source (request.json / request.body)"
.to_string(),
},
weight: W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY,
note: "User input from an unstructured-JSON source \
(`request.json`, `request.get_json()`, \
`request.body`). The attacker can send a dict \
value (e.g. `{\"$ne\": null}`) and pymongo will \
faithfully serialize it as a MongoDB operator \
expression. **Python is NOT structurally safer \
than JS for this source family.** D5.2 honest-\
review finding."
.to_string(),
});
}
UserInputSource::None => {}
}
if evidence.has_dollar_regex_with_user_input {
sum += W_HAS_DOLLAR_REGEX_WITH_USER_INPUT;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "$regex field with user-supplied pattern".to_string(),
},
weight: W_HAS_DOLLAR_REGEX_WITH_USER_INPUT,
note: "Query contains a `$regex` field whose pattern is \
user-derived. Allowing user-supplied regex enables \
ReDoS via crafted exponential-backtracking patterns."
.to_string(),
});
}
if evidence.has_developer_written_operator {
sum += W_DEVELOPER_WRITTEN_OPERATOR;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "developer-written $ne/$gt/$lt operator with literal value"
.to_string(),
},
weight: W_DEVELOPER_WRITTEN_OPERATOR,
note: "Query contains `$ne`/`$gt`/`$lt`/`$in` with a literal \
value (not user-derived). The developer is the author \
of the operator semantics — normal MongoDB query \
construction, NOT operator injection. **This is the \
headline FP-reduction signal for Phase 2i: the legacy \
detector flags this idiom; the predictor doesn't.**"
.to_string(),
});
}
if evidence.type_cast_nearby {
sum += W_OBJECTID_OR_TYPE_CAST_NEARBY;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "type-narrowing cast nearby (ObjectId / int / pydantic)".to_string(),
},
weight: W_OBJECTID_OR_TYPE_CAST_NEARBY,
note: "`ObjectId(...)`, `int(...)`, `float(...)`, or pydantic \
model instantiation appears within ±5 lines of the \
call. Strong type-narrowing signal — even when not \
part of the D1.a collapse pattern, the presence of \
these casts indicates the developer is type-asserting \
user input before query construction."
.to_string(),
});
}
if evidence.enclosing_route_handler {
sum += W_ENCLOSING_ROUTE_HANDLER;
if let Some(fn_name) = &evidence.enclosing_function {
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "route_handler".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_ROUTE_HANDLER,
note: "Enclosing function is a route handler (decorator \
or naming convention); higher prior on attacker- \
reachable query code. Lighter weight than 2e–2h \
because Python pymongo is structurally safer."
.to_string(),
});
} else {
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "enclosing route handler context".to_string(),
},
weight: W_ENCLOSING_ROUTE_HANDLER,
note: "Call site is in a route-handler context.".to_string(),
});
}
}
if evidence.trust_boundary_name {
sum += W_TRUST_BOUNDARY_NAME;
if let Some(fn_name) = &evidence.enclosing_function {
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "trust_boundary".to_string(),
name: fn_name.clone(),
},
weight: W_TRUST_BOUNDARY_NAME,
note: "Enclosing function name contains a trust-boundary \
keyword (_trusted/_admin/_internal/_validated/_signed) \
— developer-authored signal that data has been verified."
.to_string(),
});
}
}
if let Some(fn_name) = &evidence.enclosing_function {
if matches_test_function(fn_name) {
sum += W_ENCLOSING_TEST_FUNCTION;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "function".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_TEST_FUNCTION,
note: format!(
"Enclosing function `{fn_name}` looks like a \
test/fixture; test code rarely the actionable \
security target."
),
});
}
}
let predicted = if sum > 0.0 {
BranchLabel::Benign
} else {
BranchLabel::RealBug
};
build_prediction(predicted, api, sum, reasons, Vec::new())
}
/// Reports whether `name`, compared case-insensitively, contains any of the
/// `TEST_FUNCTION_SUBSTRINGS` fragments.
fn matches_test_function(name: &str) -> bool {
    let folded = name.to_lowercase();
    for fragment in TEST_FUNCTION_SUBSTRINGS {
        if folded.contains(*fragment) {
            return true;
        }
    }
    false
}
/// Reports whether `name`, compared case-insensitively, contains any of the
/// `ROUTE_HANDLER_NAME_SUBSTRINGS` fragments.
pub(super) fn matches_route_handler_name(name: &str) -> bool {
    let folded = name.to_lowercase();
    for fragment in ROUTE_HANDLER_NAME_SUBSTRINGS {
        if folded.contains(*fragment) {
            return true;
        }
    }
    false
}
/// Reports whether `name`, compared case-insensitively, contains any of the
/// `TRUST_BOUNDARY_NAME_SUBSTRINGS` fragments.
pub(super) fn matches_trust_boundary_name(name: &str) -> bool {
    let folded = name.to_lowercase();
    for fragment in TRUST_BOUNDARY_NAME_SUBSTRINGS {
        if folded.contains(*fragment) {
            return true;
        }
    }
    false
}
/// Reports whether `line`, after trimming surrounding whitespace, begins with
/// one of the `ROUTE_HANDLER_DECORATOR_SUBSTRINGS` prefixes (case-sensitive).
pub(super) fn matches_route_handler_decorator(line: &str) -> bool {
    let candidate = line.trim();
    for prefix in ROUTE_HANDLER_DECORATOR_SUBSTRINGS {
        if candidate.starts_with(*prefix) {
            return true;
        }
    }
    false
}
/// Classifies the user-input family mentioned on `line`.
///
/// The line is lowercased before matching. Unstructured-JSON substrings are
/// checked first, so a line mentioning both families is reported as
/// `UnstructuredJson`. Returns `UserInputSource::None` when nothing matches.
pub(super) fn classify_user_input_source(line: &str) -> UserInputSource {
    let haystack = line.to_lowercase();
    let mentions_any =
        |needles: &[&str]| needles.iter().any(|needle| haystack.contains(*needle));

    if mentions_any(UNSTRUCTURED_JSON_USER_INPUT_SUBSTRINGS) {
        UserInputSource::UnstructuredJson
    } else if mentions_any(TYPED_STRING_USER_INPUT_SUBSTRINGS) {
        UserInputSource::TypedString
    } else {
        UserInputSource::None
    }
}
/// Reports whether `line` contains one of the `TYPE_CAST_SUBSTRINGS` call
/// patterns (case-sensitive).
///
/// Bare lowercase builtin calls (needles with no `.` that start with a
/// lowercase ASCII letter, i.e. `int(`, `float(`, `bool(`) must begin at an
/// identifier boundary. Without that check `print(x)` or `hint(x)` would be
/// mistaken for an `int(` cast and wrongly add the benign type-cast weight.
/// All other needles (`ObjectId(`, `.model_validate(`, `schema.load(`, ...)
/// keep plain substring matching, so wrappers such as `PyObjectId(` or
/// `user_schema.load(` still count.
pub(super) fn line_contains_type_cast(line: &str) -> bool {
    TYPE_CAST_SUBSTRINGS.iter().copied().any(|needle| {
        let is_bare_builtin =
            !needle.contains('.') && needle.starts_with(|c: char| c.is_ascii_lowercase());
        if !is_bare_builtin {
            return line.contains(needle);
        }
        // Check every occurrence: `print(int(x))` has a rejected hit inside
        // `print(` followed by a genuine `int(` call.
        line.match_indices(needle).any(|(start, _)| {
            let previous = line[..start].chars().next_back();
            !matches!(previous, Some(c) if c.is_alphanumeric() || c == '_')
        })
    })
}
/// Reports whether `line` contains any operator name from
/// `DANGEROUS_OPERATORS` (case-sensitive substring match).
pub(super) fn line_contains_dangerous_operator(line: &str) -> bool {
    for operator in DANGEROUS_OPERATORS {
        if line.contains(*operator) {
            return true;
        }
    }
    false
}
/// Reports whether `line` contains any operator name from
/// `DEVELOPER_OPERATORS` (case-sensitive substring match).
pub(super) fn line_contains_developer_operator(line: &str) -> bool {
    for operator in DEVELOPER_OPERATORS {
        if line.contains(*operator) {
            return true;
        }
    }
    false
}
/// Builds a short-circuit prediction: `label` is taken as given, `forced_sum`
/// stands in for the weighted score, and the result carries exactly one
/// reason and one resolution.
fn collapse(
    label: BranchLabel,
    api: NosqlApi,
    forced_sum: f32,
    resolution: ResolutionSignal,
    reason: PredictionReason,
) -> Prediction {
    let single_reason = vec![reason];
    let single_resolution = vec![resolution];
    build_prediction(label, api, forced_sum, single_reason, single_resolution)
}
/// Assembles a `Prediction` for the `predicted` branch.
///
/// The opposite branch is always materialized as `alternative_branch`; its
/// severity is derived from the same `sum`, and its title, description and
/// suggested fix are rendered with the label of `api`.
fn build_prediction(
    predicted: BranchLabel,
    api: NosqlApi,
    sum: f32,
    reasons: Vec<PredictionReason>,
    resolutions: Vec<ResolutionSignal>,
) -> Prediction {
    let label_text = api.callee_label();
    let other = predicted.opposite();

    Prediction {
        predicted,
        alternative_branch: AlternativeBranch {
            label: other,
            severity: severity_for_branch(other, sum),
            title: title_for_branch(other, label_text),
            description: description_for_branch(other, label_text),
            suggested_fix: suggested_fix_for_branch(other, label_text),
        },
        predicted_severity: severity_for_branch(predicted, sum),
        reasons,
        resolutions,
    }
}
/// Maps a branch and score to a severity.
///
/// Benign is always `Info`. RealBug is `Critical` for `sum <= -0.7`, `High`
/// for `sum <= -0.4`, and `Medium` otherwise.
fn severity_for_branch(label: BranchLabel, sum: f32) -> Severity {
    match label {
        BranchLabel::Benign => Severity::Info,
        BranchLabel::RealBug if sum <= -0.7 => Severity::Critical,
        BranchLabel::RealBug if sum <= -0.4 => Severity::High,
        BranchLabel::RealBug => Severity::Medium,
    }
}
/// Renders the finding title for `label`, embedding `api_label`.
fn title_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => format!("Safe pymongo query ({api_label}) — informational"),
        BranchLabel::RealBug => format!("Potential NoSQL injection via {api_label}"),
    }
}
/// Renders the long-form finding description for `label`, embedding
/// `api_label` as the name of the call.
fn description_for_branch(label: BranchLabel, api_label: &str) -> String {
    match label {
        BranchLabel::Benign => format!(
            "The `{api_label}` call appears to construct a typed-value \
             MongoDB query (no dangerous operators, no `**`-expansion, \
             user-input values cast to BSON-safe types via `str`, \
             `ObjectId`, pydantic). The call is carried as Info; the \
             RealBug interpretation is preserved in `alternative_branch` \
             in case the predictor is wrong."
        ),
        BranchLabel::RealBug => format!(
            "The `{api_label}` call appears to construct a MongoDB query \
             with attacker-reachable input flowing into a dangerous \
             operator (`$where`/`$function`/`$expr`/`$accumulator`) or via \
             dict-expansion of raw user input. NoSQL injection allows \
             attackers to bypass authentication, extract data via \
             `$regex` probing, execute arbitrary JavaScript on the \
             database server (via `$where`/`$function`), or trigger ReDoS."
        ),
    }
}
/// Returns remediation guidance for `label`.
///
/// Both branches currently yield `Some(..)`; the API label parameter is
/// accepted but unused.
fn suggested_fix_for_branch(label: BranchLabel, _api_label: &str) -> Option<String> {
    let guidance: &str = match label {
        BranchLabel::Benign => {
            "If this is intentional safe usage, annotate \
             `# repotoire: nosql-safe[<reason>]` to collapse the finding \
             to Info definitively. If the alternative branch is correct \
             (the query IS exposed to attacker-controlled operators via a \
             path the predictor missed), audit the call's input source \
             classification and consider tightening the type cast on \
             user-derived values."
        }
        BranchLabel::RealBug => {
            "Sanitize the query construction:\n\n\
             ```python\n\
             # Instead of:\n\
             users.find_one({\"$where\": f\"this.name=='{req.form['n']}'\"})\n\
             users.find_one({**request.get_json()})\n\
             \n\
             # Use typed-value queries:\n\
             users.find_one({\"name\": str(request.form['n'])})\n\
             users.find_one({\"_id\": ObjectId(request.form['id'])})\n\
             \n\
             # Or pydantic-validate the payload first:\n\
             class Query(BaseModel):\n\
             \x20 name: str\n\
             q = Query.model_validate(request.get_json())\n\
             users.find_one({\"name\": q.name})\n\
             ```\n\n\
             If the call is intentionally constructing a complex query \
             that the predictor cannot trace (cross-statement assembly, \
             helper-built filter, etc.), annotate the call site with \
             `# repotoire: nosql-safe[<reason>]` to collapse the finding \
             to Info."
        }
    };
    Some(guidance.to_string())
}
/// Extracts the reason from a `nosql-safe` annotation on `line`.
///
/// Returns `None` when the line has no parseable annotation or the annotation
/// is of another kind. An annotation without arguments yields
/// `"unspecified"`; otherwise the first argument is returned.
pub(super) fn extract_nosql_safe_reason(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line).filter(|ann| ann.kind == "nosql-safe")?;
    let reason = if annotation.args.is_empty() {
        "unspecified".to_string()
    } else {
        annotation.args[0].clone()
    };
    Some(reason)
}
/// Extracts the source from a `nosql-vulnerable` annotation on `line`.
///
/// Returns `None` when the line has no parseable annotation or the annotation
/// is of another kind. An annotation without arguments yields
/// `"unspecified"`; otherwise the first argument is returned.
pub(super) fn extract_nosql_vulnerable_source(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line).filter(|ann| ann.kind == "nosql-vulnerable")?;
    let source = if annotation.args.is_empty() {
        "unspecified".to_string()
    } else {
        annotation.args[0].clone()
    };
    Some(source)
}
/// Unit tests for the predictor: end-to-end `predict` scenarios, weight-sign
/// invariants, and the line/name classification helpers.
#[cfg(test)]
mod tests {
use super::*;
// --- End-to-end `predict` scenarios (cases A-F) ---
// Unstructured JSON (-0.30) plus route handler (-0.20) sums to -0.50: RealBug / High.
#[test]
fn case_a_request_json_in_handler_predicts_realbug_high() {
let evidence = Evidence {
api: Some(NosqlApi::Ambiguous),
user_input_source: UserInputSource::UnstructuredJson,
enclosing_route_handler: true,
enclosing_function: Some("login_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
}
// A typed-value collapse ignores the weighted signals and yields one reason and one resolution.
#[test]
fn case_b_typed_query_collapse_dominates_handler_signal() {
let evidence = Evidence {
api: Some(NosqlApi::TypedValueQuery),
user_input_source: UserInputSource::TypedString,
enclosing_route_handler: true,
enclosing_function: Some("login".to_string()),
type_cast_nearby: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert_eq!(p.reasons.len(), 1);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::StructuralPattern { .. }
));
assert_eq!(p.resolutions[0].collapses_to, BranchLabel::Benign);
}
// The dangerous-operator collapse forces a score of -1.0: RealBug / Critical.
#[test]
fn case_c_where_operator_collapse_dominates() {
let evidence = Evidence {
api: Some(NosqlApi::OperatorInjection),
user_input_source: UserInputSource::TypedString,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
assert_eq!(p.resolutions.len(), 1);
assert_eq!(p.resolutions[0].collapses_to, BranchLabel::RealBug);
}
// A developer-written operator alone (+0.10) is enough for Benign.
#[test]
fn case_d_developer_written_operator_predicts_benign() {
let evidence = Evidence {
api: Some(NosqlApi::Ambiguous),
has_developer_written_operator: true,
enclosing_function: Some("list_users".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
}
// The dict-expansion collapse forces a score of -1.0: RealBug / Critical.
#[test]
fn case_e_dict_expansion_collapse_dominates() {
let evidence = Evidence {
api: Some(NosqlApi::DictExpansion),
user_input_source: UserInputSource::UnstructuredJson,
enclosing_route_handler: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
assert_eq!(p.resolutions.len(), 1);
assert_eq!(p.resolutions[0].collapses_to, BranchLabel::RealBug);
}
#[test]
fn case_f_objectid_cast_typed_query_predicts_benign() {
let evidence = Evidence {
api: Some(NosqlApi::TypedValueQuery),
user_input_source: UserInputSource::TypedString,
type_cast_nearby: true,
enclosing_route_handler: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
}
// --- Collapse precedence over weighted signals ---
#[test]
fn typed_query_collapse_dominates_handler_and_user_input() {
let evidence = Evidence {
api: Some(NosqlApi::TypedValueQuery),
user_input_source: UserInputSource::UnstructuredJson,
enclosing_route_handler: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
#[test]
fn operator_collapse_dominates_typed_string_source() {
let evidence = Evidence {
api: Some(NosqlApi::OperatorInjection),
user_input_source: UserInputSource::TypedString, type_cast_nearby: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
}
// --- Weighted scoring for `Ambiguous` call sites ---
#[test]
fn ambiguous_with_unstructured_json_and_handler_predicts_realbug_high() {
let evidence = Evidence {
api: Some(NosqlApi::Ambiguous),
user_input_source: UserInputSource::UnstructuredJson,
enclosing_route_handler: true,
enclosing_function: Some("user_endpoint".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
}
#[test]
fn ambiguous_with_typed_string_predicts_benign() {
let evidence = Evidence {
api: Some(NosqlApi::Ambiguous),
user_input_source: UserInputSource::TypedString,
enclosing_function: Some("get_user".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
#[test]
fn ambiguous_test_function_predicts_benign() {
let evidence = Evidence {
api: Some(NosqlApi::Ambiguous),
enclosing_function: Some("test_user_query".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
// `$regex` alone (-0.30) stays above the -0.4 High threshold: RealBug / Medium.
#[test]
fn ambiguous_dollar_regex_with_user_input_predicts_realbug() {
let evidence = Evidence {
api: Some(NosqlApi::Ambiguous),
has_dollar_regex_with_user_input: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Medium);
}
#[test]
fn ambiguous_type_cast_nearby_predicts_benign() {
let evidence = Evidence {
api: Some(NosqlApi::Ambiguous),
type_cast_nearby: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
// --- Source annotations override the API-shape collapses ---
#[test]
fn nosql_safe_annotation_collapses_to_benign() {
let evidence = Evidence {
api: Some(NosqlApi::OperatorInjection), nosql_safe_annotation: Some("hmac-verified".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::SourceAnnotation { .. }
));
}
#[test]
fn nosql_vulnerable_annotation_collapses_to_realbug() {
let evidence = Evidence {
api: Some(NosqlApi::TypedValueQuery), nosql_vulnerable_annotation: Some("helper-assembled-query".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
}
// A score of exactly 0.0 is not "> 0.0", so empty evidence lands on RealBug / Medium.
#[test]
fn empty_evidence_tiebreaks_realbug_medium() {
let p = predict(&Evidence::empty());
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Medium);
}
// --- Weight-sign invariants ---
#[test]
#[allow(clippy::assertions_on_constants)]
fn realbug_signal_weights_are_negative() {
assert!(W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY < 0.0);
assert!(W_HAS_DOLLAR_REGEX_WITH_USER_INPUT < 0.0);
assert!(W_ENCLOSING_ROUTE_HANDLER < 0.0);
assert!(W_API_OPERATOR_COLLAPSE < 0.0);
assert!(W_API_DICT_EXPANSION_COLLAPSE < 0.0);
}
#[test]
#[allow(clippy::assertions_on_constants)]
fn benign_signal_weights_are_positive() {
assert!(W_USER_INPUT_TYPED_STRING_NEARBY > 0.0);
assert!(W_DEVELOPER_WRITTEN_OPERATOR > 0.0);
assert!(W_OBJECTID_OR_TYPE_CAST_NEARBY > 0.0);
assert!(W_TRUST_BOUNDARY_NAME > 0.0);
assert!(W_ENCLOSING_TEST_FUNCTION > 0.0);
assert!(W_API_TYPED_QUERY_COLLAPSE > 0.0);
}
#[test]
#[allow(clippy::assertions_on_constants)]
fn d5_2_typed_string_vs_unstructured_json_asymmetry() {
assert!(
W_USER_INPUT_TYPED_STRING_NEARBY > 0.0,
"TypedString (request.form/args) must be positive (Python is structurally safer)"
);
assert!(
W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY < 0.0,
"UnstructuredJson (request.json/body) must be negative (Python is NOT safer here)"
);
assert!(
W_USER_INPUT_UNSTRUCTURED_JSON_NEARBY.abs() > W_USER_INPUT_TYPED_STRING_NEARBY.abs(),
"UnstructuredJson magnitude must exceed TypedString magnitude (D5.2 design)"
);
}
// --- `NosqlApi` predicates ---
// Each collapse predicate is true for exactly one variant.
#[test]
fn nosql_api_collapses_predicates() {
assert!(NosqlApi::TypedValueQuery.collapses_typed_query());
assert!(!NosqlApi::TypedValueQuery.collapses_operator());
assert!(!NosqlApi::TypedValueQuery.collapses_dict_expansion());
assert!(NosqlApi::OperatorInjection.collapses_operator());
assert!(!NosqlApi::OperatorInjection.collapses_typed_query());
assert!(!NosqlApi::OperatorInjection.collapses_dict_expansion());
assert!(NosqlApi::DictExpansion.collapses_dict_expansion());
assert!(!NosqlApi::DictExpansion.collapses_typed_query());
assert!(!NosqlApi::DictExpansion.collapses_operator());
assert!(!NosqlApi::Ambiguous.collapses_typed_query());
assert!(!NosqlApi::Ambiguous.collapses_operator());
assert!(!NosqlApi::Ambiguous.collapses_dict_expansion());
assert!(!NosqlApi::Unknown.collapses_typed_query());
assert!(!NosqlApi::Unknown.collapses_operator());
assert!(!NosqlApi::Unknown.collapses_dict_expansion());
}
#[test]
fn nosql_api_is_recognized() {
assert!(NosqlApi::TypedValueQuery.is_recognized());
assert!(NosqlApi::OperatorInjection.is_recognized());
assert!(NosqlApi::DictExpansion.is_recognized());
assert!(NosqlApi::Ambiguous.is_recognized());
assert!(!NosqlApi::Unknown.is_recognized());
}
// --- `classify_user_input_source` ---
#[test]
fn classify_request_form_as_typed_string() {
assert_eq!(
classify_user_input_source("u = request.form['user']"),
UserInputSource::TypedString
);
assert_eq!(
classify_user_input_source("ids = request.args.getlist('id')"),
UserInputSource::TypedString
);
}
#[test]
fn classify_request_json_as_unstructured_json() {
assert_eq!(
classify_user_input_source("p = request.json"),
UserInputSource::UnstructuredJson
);
assert_eq!(
classify_user_input_source("body = request.get_json()"),
UserInputSource::UnstructuredJson
);
assert_eq!(
classify_user_input_source("b = request.body"),
UserInputSource::UnstructuredJson
);
}
// The unstructured-JSON table is consulted first, so it wins on mixed lines.
#[test]
fn classify_unstructured_json_priority_when_both_present() {
let src = "x = request.form['a'] or request.json";
assert_eq!(
classify_user_input_source(src),
UserInputSource::UnstructuredJson
);
}
#[test]
fn classify_no_user_input() {
assert_eq!(
classify_user_input_source("x = compute()"),
UserInputSource::None
);
}
// --- Line-level pattern helpers ---
#[test]
fn line_contains_type_cast_recognizes_objectid() {
assert!(line_contains_type_cast("_id = ObjectId(x)"));
assert!(line_contains_type_cast(
"from bson import ObjectId; q = ObjectId(s)"
));
}
#[test]
fn line_contains_type_cast_recognizes_pydantic() {
assert!(line_contains_type_cast(
"q = QuerySchema.model_validate(payload)"
));
assert!(line_contains_type_cast(
"q = QuerySchema.parse_obj(payload)"
));
}
#[test]
fn line_contains_type_cast_no_match() {
assert!(!line_contains_type_cast("x = compute(payload)"));
}
#[test]
fn dangerous_operator_recognition() {
assert!(line_contains_dangerous_operator("{\"$where\": \"...\"}"));
assert!(line_contains_dangerous_operator("{\"$function\": ...}"));
assert!(line_contains_dangerous_operator("{\"$expr\": ...}"));
assert!(line_contains_dangerous_operator("{\"$accumulator\": ...}"));
assert!(!line_contains_dangerous_operator("{\"$ne\": null}"));
}
#[test]
fn developer_operator_recognition() {
assert!(line_contains_developer_operator("{\"$ne\": null}"));
assert!(line_contains_developer_operator("{\"$gt\": 0}"));
assert!(line_contains_developer_operator("{\"$in\": [1, 2]}"));
assert!(!line_contains_developer_operator("{\"$where\": \"...\"}"));
}
// --- Decorator and function-name heuristics ---
#[test]
fn route_handler_decorator_matches() {
assert!(matches_route_handler_decorator("@app.route('/foo')"));
assert!(matches_route_handler_decorator(" @app.post('/x')"));
assert!(matches_route_handler_decorator("@router.get('/v1')"));
assert!(matches_route_handler_decorator("@blueprint.route('/x')"));
assert!(!matches_route_handler_decorator("@dataclass"));
}
#[test]
fn route_handler_name_matches() {
assert!(matches_route_handler_name("login_handler"));
assert!(matches_route_handler_name("user_endpoint"));
assert!(matches_route_handler_name("posts_view"));
assert!(!matches_route_handler_name("compute_total"));
}
#[test]
fn trust_boundary_name_matches() {
assert!(matches_trust_boundary_name("query_trusted"));
assert!(matches_trust_boundary_name("load_admin_query"));
assert!(matches_trust_boundary_name("post_validated_filter"));
assert!(matches_trust_boundary_name("read_internal_state"));
assert!(!matches_trust_boundary_name("plain_query"));
}
// --- Annotation extraction ---
#[test]
fn extract_nosql_safe_with_reason() {
assert_eq!(
extract_nosql_safe_reason(
"users.find_one({...}) # repotoire: nosql-safe[pydantic-validated]"
),
Some("pydantic-validated".to_string())
);
}
// An annotation without arguments falls back to "unspecified".
#[test]
fn extract_nosql_safe_without_reason() {
assert_eq!(
extract_nosql_safe_reason("users.find({}) # repotoire: nosql-safe"),
Some("unspecified".to_string())
);
}
#[test]
fn extract_nosql_vulnerable_with_source() {
assert_eq!(
extract_nosql_vulnerable_source(
"users.find(q) # repotoire: nosql-vulnerable[helper-assembled]"
),
Some("helper-assembled".to_string())
);
}
// Annotations of other kinds are ignored by both extractors.
#[test]
fn extract_nosql_ignores_other_kinds() {
assert_eq!(
extract_nosql_safe_reason("x # repotoire: deserialize-safe[ok]"),
None
);
assert_eq!(
extract_nosql_vulnerable_source("x # repotoire: jwt-vulnerable[ok]"),
None
);
}
}