use super::annotation::parse_python_comment;
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
/// Classification of a deserialize callee, as consumed by `predict`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum DeserializeApi {
/// `collapses_safe` is true for this variant; `predict` short-circuits to
/// `BranchLabel::Benign` without consulting the additive signals.
Safe,
/// `collapses_unsafe` is true for this variant; `predict` short-circuits to
/// `BranchLabel::RealBug` without consulting the additive signals.
Unsafe,
/// Recognized, but neither collapse applies, so `predict` falls through to
/// additive scoring. `classify_deserialize_callee` returns this for `yaml.load`.
Ambiguous,
/// Callee not recognized (`is_recognized` is false). Also the value `predict`
/// substitutes when `Evidence::api` is `None`.
Unknown,
}
impl DeserializeApi {
    /// Fixed display label for this classification; interpolated into the
    /// titles, descriptions and examples produced elsewhere in this module.
    pub(super) fn callee_label(self) -> &'static str {
        match self {
            Self::Unknown => "deserialize client",
            Self::Ambiguous => "ambiguous-deserializer",
            Self::Unsafe => "unsafe-deserializer",
            Self::Safe => "safe-deserializer",
        }
    }

    /// True for every variant except `Unknown`.
    pub(super) fn is_recognized(self) -> bool {
        self != Self::Unknown
    }

    /// True only for `Safe`: the API alone is enough to conclude Benign.
    pub(super) fn collapses_safe(self) -> bool {
        self == Self::Safe
    }

    /// True only for `Unsafe`: the API alone is enough to conclude RealBug.
    pub(super) fn collapses_unsafe(self) -> bool {
        self == Self::Unsafe
    }
}
// Signal weights used by `predict`. In the additive path a total strictly
// greater than zero yields `BranchLabel::Benign`; anything else yields
// `BranchLabel::RealBug`, so negative weights push toward RealBug and
// positive weights push toward Benign.

/// Weight reported on the single reason emitted by the Safe-API collapse.
pub(super) const W_API_SAFE_COLLAPSE: f32 = 1.0;
/// Weight reported on the single reason emitted by the Unsafe-API collapse.
pub(super) const W_API_UNSAFE_COLLAPSE: f32 = -1.0;
/// Applied when `Evidence::user_input_nearby` is set.
pub(super) const W_USER_INPUT_TO_DESERIALIZE: f32 = -0.50;
/// Applied when `Evidence::enclosing_route_handler` is set.
pub(super) const W_ENCLOSING_ROUTE_HANDLER: f32 = -0.30;
/// Applied when `Evidence::enclosing_upload_like` is set.
pub(super) const W_ENCLOSING_UPLOAD_LIKE: f32 = -0.10;
/// Applied when `Evidence::local_file_source` is set.
pub(super) const W_LOCAL_FILE_SOURCE: f32 = 0.10;
/// Applied when the enclosing function name satisfies `matches_test_function`.
pub(super) const W_ENCLOSING_TEST_FUNCTION: f32 = 0.15;
/// Applied when `Evidence::trust_boundary_name` is set.
pub(super) const W_TRUST_BOUNDARY_NAME: f32 = 0.10;
/// Substrings that `line_contains_user_input` searches for in a lowercased
/// source line. Entries must stay lowercase, because the line is lowercased
/// before the comparison.
pub(super) const USER_INPUT_NAME_SUBSTRINGS: &[&str] = &[
"request.data",
"request.body",
"request.json",
"request.form",
"request.files",
"request.get_json",
"request.values",
"request.args",
"flask.request",
"django.request",
"self.request",
"input(",
"sys.argv",
];
/// Decorator prefixes recognized by `matches_route_handler_decorator`.
/// Despite the `_SUBSTRINGS` name, that function tests each entry with
/// `starts_with` against the trimmed line, and the comparison is
/// case-sensitive.
pub(super) const ROUTE_HANDLER_DECORATOR_SUBSTRINGS: &[&str] = &[
"@app.route",
"@app.get",
"@app.post",
"@app.put",
"@app.delete",
"@router.get",
"@router.post",
"@router.put",
"@router.delete",
"@view",
"@api_view",
"@require_http_methods",
"@csrf_exempt",
"@login_required",
"@blueprint.route",
];
/// Substrings that `matches_route_handler_name` looks for in a lowercased
/// function name. Entries must stay lowercase.
pub(super) const ROUTE_HANDLER_NAME_SUBSTRINGS: &[&str] =
&["_handler", "_endpoint", "_view", "_route"];
/// Substrings that `matches_upload_like_name` looks for in a lowercased
/// function name. Entries must stay lowercase.
pub(super) const UPLOAD_LIKE_NAME_SUBSTRINGS: &[&str] = &[
"upload", "import_", "_import", "load_", "_load", "restore", "ingest",
];
/// Substrings that `matches_trust_boundary_name` looks for in a lowercased
/// function name. Entries must stay lowercase.
pub(super) const TRUST_BOUNDARY_NAME_SUBSTRINGS: &[&str] =
&["_trusted", "_admin", "_internal", "_signed"];
/// Substrings that `matches_test_function` looks for in a lowercased function
/// name. Entries must stay lowercase.
const TEST_FUNCTION_SUBSTRINGS: &[&str] = &["test_", "_test", "fixture", "setup", "teardown"];
/// Signals collected about a single deserialize call site; the input to
/// `predict`. `Default` yields the "no evidence" value (all flags false, all
/// options `None`).
#[derive(Debug, Clone, Default, PartialEq)]
pub(super) struct Evidence {
/// Callee classification; `predict` treats `None` as `DeserializeApi::Unknown`.
pub api: Option<DeserializeApi>,
/// Not read by `predict` in this file.
pub callee_label: Option<String>,
/// Name of the enclosing function, if known. `predict` uses it to label
/// scope-based reasons and to apply the test-function signal.
pub enclosing_function: Option<String>,
/// Not read by `predict` in this file.
pub enclosing_class: Option<String>,
/// Not read by `predict` in this file.
pub file_path: Option<String>,
/// Adds `W_USER_INPUT_TO_DESERIALIZE` in the additive path.
pub user_input_nearby: bool,
/// Adds `W_ENCLOSING_ROUTE_HANDLER` in the additive path.
pub enclosing_route_handler: bool,
/// Adds `W_ENCLOSING_UPLOAD_LIKE` in the additive path.
pub enclosing_upload_like: bool,
/// Adds `W_LOCAL_FILE_SOURCE` in the additive path.
pub local_file_source: bool,
/// Adds `W_TRUST_BOUNDARY_NAME` in the additive path.
pub trust_boundary_name: bool,
/// When `Some`, `predict` returns a Benign collapse before anything else is
/// considered; the string is echoed into the resolution and reason text.
pub deserialize_safe_annotation: Option<String>,
/// When `Some` (and no safe annotation is present), `predict` returns a
/// RealBug collapse; the string is echoed into the resolution and reason text.
pub deserialize_vulnerable_annotation: Option<String>,
}
impl Evidence {
    /// Test-only constructor for evidence with no signals set; identical to
    /// the `Default` value.
    #[cfg(test)]
    pub(super) fn empty() -> Self {
        Evidence::default()
    }
}
/// Result of `predict`: the chosen branch plus the preserved opposite branch.
#[derive(Debug, Clone)]
pub(super) struct Prediction {
/// The branch the predictor selected.
pub predicted: BranchLabel,
/// The opposite branch (`predicted.opposite()`), with its own severity,
/// title, description and suggested fix, as assembled by `build_prediction`.
pub alternative_branch: AlternativeBranch,
/// Severity for `predicted`, computed by `severity_for_branch`.
pub predicted_severity: Severity,
/// Signals that contributed to the prediction. Collapse paths carry exactly
/// one reason.
pub reasons: Vec<PredictionReason>,
/// Collapse paths carry exactly one resolution; the additive path carries none.
pub resolutions: Vec<ResolutionSignal>,
}
/// Predicts whether a deserialize call site is a real bug or benign.
///
/// Decision order (the first match wins):
/// 1. `deserialize_safe_annotation` present: collapse to Benign.
/// 2. `deserialize_vulnerable_annotation` present: collapse to RealBug.
/// 3. API classified `Safe`: collapse to Benign.
/// 4. API classified `Unsafe`: collapse to RealBug.
/// 5. Otherwise sum the weights of the contextual signals; a total strictly
///    greater than zero predicts Benign, anything else (including exactly
///    zero, i.e. no evidence) predicts RealBug.
///
/// Every collapse returns exactly one reason and one resolution. In the
/// additive path every weight added to the total is accompanied by a
/// `PredictionReason`, so the reported reasons always add up to the score
/// that selected the label and severity.
pub(super) fn predict(evidence: &Evidence) -> Prediction {
    let api = evidence.api.unwrap_or(DeserializeApi::Unknown);
    let api_label = api.callee_label();

    // Explicit annotations take priority over the API classification.
    if let Some(reason) = &evidence.deserialize_safe_annotation {
        return collapse(
            BranchLabel::Benign,
            api,
            0.0,
            ResolutionSignal {
                kind: ResolutionKind::SourceAnnotation {
                    syntax: format!("# repotoire: deserialize-safe[{reason}]"),
                },
                description: format!(
                    "`deserialize-safe[{reason}]` annotation declares this \
                     deserialize call as safe (HMAC-signed payload, \
                     restricted-unpickler wrapper, internal trusted source, \
                     etc.); the finding collapses to Info."
                ),
                example: Some(format!(
                    "{api_label}(...) # repotoire: deserialize-safe[{reason}]"
                )),
                collapses_to: BranchLabel::Benign,
            },
            PredictionReason {
                kind: PredictionReasonKind::Custom {
                    description: format!("deserialize-safe[{reason}] annotation"),
                },
                weight: 1.0,
                note: format!(
                    "Annotated as safely-wrapped ({reason}); not a \
                     deserialization risk."
                ),
            },
        );
    }

    if let Some(source) = &evidence.deserialize_vulnerable_annotation {
        return collapse(
            BranchLabel::RealBug,
            api,
            // -1.0 is below the Critical threshold in `severity_for_branch`.
            -1.0,
            ResolutionSignal {
                kind: ResolutionKind::SourceAnnotation {
                    syntax: format!("# repotoire: deserialize-vulnerable[{source}]"),
                },
                description: format!(
                    "`deserialize-vulnerable[{source}]` annotation declares \
                     this deserialize as exposed (third-party crate without \
                     verification, audited-untrusted, etc.); the finding \
                     stays at the existing severity."
                ),
                example: Some(format!(
                    "{api_label}(...) # repotoire: deserialize-vulnerable[{source}]"
                )),
                collapses_to: BranchLabel::RealBug,
            },
            PredictionReason {
                kind: PredictionReasonKind::Custom {
                    description: format!("deserialize-vulnerable[{source}] annotation"),
                },
                weight: -1.0,
                note: format!("Annotated as deserialize-exposed (source: {source})."),
            },
        );
    }

    // API-level collapses: the callee alone decides, contextual signals are ignored.
    if api.collapses_safe() {
        return collapse(
            BranchLabel::Benign,
            api,
            0.0,
            ResolutionSignal {
                kind: ResolutionKind::StructuralPattern {
                    description:
                        "Safe-by-construction deserialize API (json.loads / yaml.safe_load / ...)"
                            .to_string(),
                },
                description: "The call uses a Safe-by-construction \
                              deserialization API (`json.loads`, `yaml.safe_load`, \
                              `yaml.load(..., Loader=SafeLoader)`, or ruamel.yaml's \
                              `YAML(typ='safe').load`). These APIs do not have \
                              code-execution affordances. The input source is \
                              irrelevant to the safety verdict: a Safe API on \
                              attacker-controlled input is still Safe."
                    .to_string(),
                example: Some(
                    "yaml.safe_load(request.data) # safe regardless of source".to_string(),
                ),
                collapses_to: BranchLabel::Benign,
            },
            PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "Safe-by-construction deserialize API".to_string(),
                },
                weight: W_API_SAFE_COLLAPSE,
                note: "The call site uses an API that cannot execute code \
                       by design (json.loads / yaml.safe_load / yaml.load \
                       with explicit SafeLoader / ruamel YAML(typ='safe')). \
                       Phase 2h D1.a amendment: bidirectional Step 1.5 \
                       collapse — Benign direction."
                    .to_string(),
            },
        );
    }

    if api.collapses_unsafe() {
        return collapse(
            BranchLabel::RealBug,
            api,
            -1.0,
            ResolutionSignal {
                kind: ResolutionKind::StructuralPattern {
                    description:
                        "Unsafe-by-construction deserialize API (pickle.loads / marshal.loads / yaml.load without SafeLoader)"
                            .to_string(),
                },
                description: "The call uses an Unsafe-by-construction \
                              deserialization API (`pickle.loads`, `marshal.loads`, \
                              `yaml.load` without explicit `Loader=SafeLoader`, \
                              `cPickle.loads`). These APIs have code-execution \
                              affordances as their design contract: pickle is \
                              Turing-complete, marshal honors crafted code objects, \
                              default yaml.load instantiates Python objects via \
                              `!!python/object/apply` tags. No defensive coding \
                              compensates for using these APIs on attacker- \
                              controlled (or even attacker-writable local-file) \
                              input."
                    .to_string(),
                example: Some(
                    "# Replace pickle.loads with json.loads (or annotate if \
                     truly verified):\npickle.loads(data) # repotoire: deserialize-safe[hmac-verified]"
                        .to_string(),
                ),
                collapses_to: BranchLabel::RealBug,
            },
            PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "Unsafe-by-construction deserialize API".to_string(),
                },
                weight: W_API_UNSAFE_COLLAPSE,
                // NOTE(review): confirm the CVE id cited in this note; CVE-2019-20907
                // appears to refer to a tarfile issue rather than a deserialization
                // one. The text is left unchanged here because it is user-visible.
                note: "The call site uses an API that grants arbitrary \
                       code execution by design (pickle.loads / pickle.load \
                       / marshal.loads / yaml.load without Loader / \
                       cPickle.loads). Phase 2h D1.b amendment: \
                       bidirectional Step 1.5 collapse — RealBug direction. \
                       This is the textbook CVE-2019-20907 family failure \
                       mode."
                    .to_string(),
            },
        );
    }

    // Additive path (Ambiguous / Unknown APIs): accumulate signal weights.
    let mut sum: f32 = 0.0;
    let mut reasons: Vec<PredictionReason> = Vec::new();

    if evidence.user_input_nearby {
        sum += W_USER_INPUT_TO_DESERIALIZE;
        reasons.push(PredictionReason {
            kind: PredictionReasonKind::StructuralPattern {
                description: "user input flows to deserialize call".to_string(),
            },
            weight: W_USER_INPUT_TO_DESERIALIZE,
            note: "User-controlled input (request.data, request.body, \
                   request.json, etc.) flows to the deserialize call. \
                   For Ambiguous APIs, this raises the prior on a real \
                   bug; for Safe APIs the Step 1.5 collapse fires first \
                   and this signal is suppressed."
                .to_string(),
        });
    }

    if evidence.enclosing_route_handler {
        sum += W_ENCLOSING_ROUTE_HANDLER;
        if let Some(fn_name) = &evidence.enclosing_function {
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::EnclosingScope {
                    scope_kind: "route_handler".to_string(),
                    name: fn_name.clone(),
                },
                weight: W_ENCLOSING_ROUTE_HANDLER,
                note: "Enclosing function is a route handler (decorator \
                       or naming convention); higher prior on attacker- \
                       reachable deserialize code."
                    .to_string(),
            });
        } else {
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "enclosing route handler context".to_string(),
                },
                weight: W_ENCLOSING_ROUTE_HANDLER,
                note: "Call site is in a route-handler context.".to_string(),
            });
        }
    }

    if evidence.enclosing_upload_like {
        sum += W_ENCLOSING_UPLOAD_LIKE;
        if let Some(fn_name) = &evidence.enclosing_function {
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::EnclosingScope {
                    scope_kind: "upload_handler".to_string(),
                    name: fn_name.clone(),
                },
                weight: W_ENCLOSING_UPLOAD_LIKE,
                note: "Enclosing function name suggests load/import/\
                       upload/restore — data likely crossing trust \
                       boundary."
                    .to_string(),
            });
        } else {
            // Without a function name there is no scope to cite, but the weight
            // was still applied, so record a reason to keep the explanation in
            // step with the score (same fallback shape as the route-handler signal).
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "enclosing upload-like context".to_string(),
                },
                weight: W_ENCLOSING_UPLOAD_LIKE,
                note: "Call site is in an upload/import/load-like context.".to_string(),
            });
        }
    }

    if evidence.local_file_source {
        sum += W_LOCAL_FILE_SOURCE;
        reasons.push(PredictionReason {
            kind: PredictionReasonKind::StructuralPattern {
                description: "local-disk file source".to_string(),
            },
            weight: W_LOCAL_FILE_SOURCE,
            note: "The deserialize call's source is a local-disk file. \
                   Local config files are usually trusted; soft positive. \
                   Note: pickle.loads-on-local-file is already covered \
                   by the Unsafe-API D1.b collapse for the cases that \
                   matter; this signal applies only to Ambiguous APIs."
                .to_string(),
        });
    }

    if evidence.trust_boundary_name {
        sum += W_TRUST_BOUNDARY_NAME;
        if let Some(fn_name) = &evidence.enclosing_function {
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::EnclosingScope {
                    scope_kind: "trust_boundary".to_string(),
                    name: fn_name.clone(),
                },
                weight: W_TRUST_BOUNDARY_NAME,
                note: "Enclosing function name contains a trust-boundary \
                       keyword (_trusted/_admin/_internal/_signed) — \
                       developer-authored signal that data has been \
                       verified."
                    .to_string(),
            });
        } else {
            // Same reasoning as the upload-like fallback: never shift the score
            // without recording why.
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::StructuralPattern {
                    description: "enclosing trust-boundary context".to_string(),
                },
                weight: W_TRUST_BOUNDARY_NAME,
                note: "Call site is in a trust-boundary-named context.".to_string(),
            });
        }
    }

    if let Some(fn_name) = &evidence.enclosing_function {
        if matches_test_function(fn_name) {
            sum += W_ENCLOSING_TEST_FUNCTION;
            reasons.push(PredictionReason {
                kind: PredictionReasonKind::EnclosingScope {
                    scope_kind: "function".to_string(),
                    name: fn_name.clone(),
                },
                weight: W_ENCLOSING_TEST_FUNCTION,
                note: format!(
                    "Enclosing function `{fn_name}` looks like a \
                     test/fixture; test code rarely the actionable \
                     security target."
                ),
            });
        }
    }

    // A total of exactly zero (e.g. no evidence at all) deliberately resolves
    // to RealBug: only a strictly positive score is treated as Benign.
    let predicted = if sum > 0.0 {
        BranchLabel::Benign
    } else {
        BranchLabel::RealBug
    };
    build_prediction(predicted, api, sum, reasons, Vec::new())
}
/// Reports whether `name`, lowercased, contains any entry of
/// `TEST_FUNCTION_SUBSTRINGS`.
fn matches_test_function(name: &str) -> bool {
    let lowered = name.to_lowercase();
    for needle in TEST_FUNCTION_SUBSTRINGS {
        if lowered.contains(*needle) {
            return true;
        }
    }
    false
}
/// Reports whether `name`, lowercased, contains any entry of
/// `ROUTE_HANDLER_NAME_SUBSTRINGS`.
pub(super) fn matches_route_handler_name(name: &str) -> bool {
    let lowered = name.to_lowercase();
    ROUTE_HANDLER_NAME_SUBSTRINGS
        .iter()
        .copied()
        .any(|fragment| lowered.contains(fragment))
}
/// Reports whether `name`, lowercased, contains any entry of
/// `UPLOAD_LIKE_NAME_SUBSTRINGS`.
pub(super) fn matches_upload_like_name(name: &str) -> bool {
    let lowered = name.to_lowercase();
    for fragment in UPLOAD_LIKE_NAME_SUBSTRINGS.iter().copied() {
        if lowered.contains(fragment) {
            return true;
        }
    }
    false
}
/// Reports whether `name`, lowercased, contains any entry of
/// `TRUST_BOUNDARY_NAME_SUBSTRINGS`.
pub(super) fn matches_trust_boundary_name(name: &str) -> bool {
    let lowered = name.to_lowercase();
    let has_keyword = |keyword: &&str| lowered.contains(*keyword);
    TRUST_BOUNDARY_NAME_SUBSTRINGS.iter().any(has_keyword)
}
/// Reports whether `line`, after trimming surrounding whitespace, begins with
/// any entry of `ROUTE_HANDLER_DECORATOR_SUBSTRINGS`. The comparison is
/// case-sensitive.
pub(super) fn matches_route_handler_decorator(line: &str) -> bool {
    let candidate = line.trim();
    for prefix in ROUTE_HANDLER_DECORATOR_SUBSTRINGS {
        if candidate.strip_prefix(*prefix).is_some() {
            return true;
        }
    }
    false
}
/// Reports whether `line`, lowercased, contains any entry of
/// `USER_INPUT_NAME_SUBSTRINGS`.
pub(super) fn line_contains_user_input(line: &str) -> bool {
    let haystack = line.to_lowercase();
    for marker in USER_INPUT_NAME_SUBSTRINGS.iter().copied() {
        if haystack.contains(marker) {
            return true;
        }
    }
    false
}
/// Builds the prediction for a short-circuit ("collapse") decision: the given
/// label and score, with `reason` as the only reason and `resolution` as the
/// only resolution.
fn collapse(
    label: BranchLabel,
    api: DeserializeApi,
    forced_sum: f32,
    resolution: ResolutionSignal,
    reason: PredictionReason,
) -> Prediction {
    let sole_reason = vec![reason];
    let sole_resolution = vec![resolution];
    build_prediction(label, api, forced_sum, sole_reason, sole_resolution)
}
/// Assembles a `Prediction` for `predicted`, together with the opposite
/// branch (label, severity, title, description, suggested fix). Both
/// severities are derived from the same `sum`.
fn build_prediction(
    predicted: BranchLabel,
    api: DeserializeApi,
    sum: f32,
    reasons: Vec<PredictionReason>,
    resolutions: Vec<ResolutionSignal>,
) -> Prediction {
    let callee = api.callee_label();
    let predicted_severity = severity_for_branch(predicted, sum);

    // The alternative branch is always the opposite of the predicted one.
    let other = predicted.opposite();

    Prediction {
        predicted,
        alternative_branch: AlternativeBranch {
            label: other,
            severity: severity_for_branch(other, sum),
            title: title_for_branch(other, callee),
            description: description_for_branch(other, callee),
            suggested_fix: suggested_fix_for_branch(other, callee),
        },
        predicted_severity,
        reasons,
        resolutions,
    }
}
/// Maps a branch label and score to a severity.
///
/// Benign is always `Info`. RealBug is `Critical` at `sum <= -0.7`, `High`
/// at `sum <= -0.4`, and `Medium` otherwise.
fn severity_for_branch(label: BranchLabel, sum: f32) -> Severity {
    match label {
        BranchLabel::Benign => Severity::Info,
        BranchLabel::RealBug if sum <= -0.7 => Severity::Critical,
        BranchLabel::RealBug if sum <= -0.4 => Severity::High,
        BranchLabel::RealBug => Severity::Medium,
    }
}
/// Produces the finding title for `label`, naming the callee via `api_label`.
fn title_for_branch(label: BranchLabel, api_label: &str) -> String {
    let title = match label {
        BranchLabel::Benign => {
            format!("Safe deserialization via {api_label} (informational)")
        }
        BranchLabel::RealBug => format!("Potential insecure deserialization via {api_label}"),
    };
    title
}
/// Produces the long-form finding description for `label`, naming the callee
/// via `api_label`.
fn description_for_branch(label: BranchLabel, api_label: &str) -> String {
    if let BranchLabel::Benign = label {
        return format!(
            "The `{api_label}` call appears to use a safe-by-construction \
             deserialization API (`json.loads`, `yaml.safe_load`, \
             `yaml.load(..., Loader=SafeLoader)`). The call is carried \
             as Info; the RealBug interpretation is preserved in \
             `alternative_branch` in case the predictor is wrong."
        );
    }
    format!(
        "The `{api_label}` call appears to deserialize attacker-\
         reachable data with an unsafe API (`pickle.loads`, \
         `marshal.loads`, `yaml.load` without `SafeLoader`, ...). \
         Insecure deserialization allows attackers to execute \
         arbitrary code, bypass authentication, or access sensitive \
         data."
    )
}
/// Produces remediation guidance for `label`. Always returns `Some`; the
/// callee label is accepted for signature parity with the other branch-text
/// helpers but is not used.
fn suggested_fix_for_branch(label: BranchLabel, _api_label: &str) -> Option<String> {
    let guidance: &str = match label {
        BranchLabel::Benign => {
            "If this is intentional safe usage, annotate \
             `# repotoire: deserialize-safe[<reason>]` to collapse the \
             finding to Info definitively. If the alternative branch is \
             correct (the deserialize IS exposed to attacker-controlled \
             data via a path the predictor missed), audit the call's \
             source for user input and prefer a safe-by-construction \
             alternative (`json.loads`, `yaml.safe_load`)."
        }
        BranchLabel::RealBug => {
            "Replace the unsafe deserialize call with a safe-by-construction \
             alternative:\n\n\
             ```python\n\
             # Instead of pickle.loads(data):\n\
             data = json.loads(text)\n\
             \n\
             # Instead of yaml.load(stream):\n\
             data = yaml.safe_load(stream)\n\
             ```\n\n\
             If the unsafe API is unavoidable (e.g. pickle for an \
             internal binary format), verify the payload with an HMAC \
             check before deserializing, OR subclass `pickle.Unpickler` \
             and override `find_class` to restrict allowed classes. \
             If this is intentional safe usage that the v0 predictor \
             cannot trace (cross-statement HMAC unwrap, etc.), annotate \
             `# repotoire: deserialize-safe[<reason>]` to collapse the \
             finding to Info definitively."
        }
    };
    Some(guidance.to_string())
}
/// Extracts the reason from a `deserialize-safe` annotation on `line`.
///
/// Returns `None` when `parse_python_comment` finds no annotation or the
/// annotation is of a different kind. Returns the first argument when one is
/// present, otherwise the placeholder `"unspecified"`.
pub(super) fn extract_deserialize_safe_reason(line: &str) -> Option<String> {
    parse_python_comment(line)
        .filter(|ann| ann.kind == "deserialize-safe")
        .map(|ann| match ann.args.first() {
            Some(reason) => reason.clone(),
            None => "unspecified".to_string(),
        })
}
/// Extracts the source from a `deserialize-vulnerable` annotation on `line`.
///
/// Returns `None` when `parse_python_comment` finds no annotation or the
/// annotation is of a different kind. Returns the first argument when one is
/// present, otherwise the placeholder `"unspecified"`.
pub(super) fn extract_deserialize_vulnerable_source(line: &str) -> Option<String> {
    let ann = parse_python_comment(line)?;
    if ann.kind == "deserialize-vulnerable" {
        let source = ann
            .args
            .first()
            .cloned()
            .unwrap_or_else(|| "unspecified".to_string());
        Some(source)
    } else {
        None
    }
}
/// Classifies a callee expression such as `"yaml.safe_load"`.
///
/// The callee is normalized by removing all whitespace and lowercasing. A
/// table entry matches when the normalized callee equals it or ends with
/// `.` followed by the entry (so a qualifying module prefix is tolerated).
/// The safe table is consulted first, then the unsafe table, then
/// `yaml.load` (Ambiguous); anything else is `Unknown`.
pub(super) fn classify_deserialize_callee(callee: &str) -> DeserializeApi {
    const SAFE_CALLEES: &[&str] = &[
        "yaml.safe_load",
        "yaml.safe_load_all",
        "json.loads",
        "json.load",
        "simplejson.loads",
        "simplejson.load",
        "orjson.loads",
        "ujson.loads",
        "rapidjson.loads",
        "msgpack.unpackb",
        "msgpack.unpack",
    ];
    const UNSAFE_CALLEES: &[&str] = &[
        "pickle.loads",
        "pickle.load",
        "cpickle.loads",
        "cpickle.load",
        "_pickle.loads",
        "_pickle.load",
        "marshal.loads",
        "marshal.load",
        "shelve.open",
        "dill.loads",
        "dill.load",
    ];

    /// True when `path` is exactly `known`, or is `known` preceded by a
    /// dot-terminated qualifier.
    fn names_callee(path: &str, known: &str) -> bool {
        match path.strip_suffix(known) {
            Some(qualifier) => qualifier.is_empty() || qualifier.ends_with('.'),
            None => false,
        }
    }

    // Whitespace is removed before lowercasing, in that order.
    let normalized = callee.split_whitespace().collect::<String>().to_lowercase();
    let listed_in = |table: &[&str]| table.iter().any(|known| names_callee(&normalized, known));

    if listed_in(SAFE_CALLEES) {
        DeserializeApi::Safe
    } else if listed_in(UNSAFE_CALLEES) {
        DeserializeApi::Unsafe
    } else if names_callee(&normalized, "yaml.load") {
        DeserializeApi::Ambiguous
    } else {
        DeserializeApi::Unknown
    }
}
/// Reports whether a `Loader=` argument names a safe YAML loader.
///
/// The value is trimmed and lowercased, and only the segment after the last
/// `.` is compared; `SafeLoader` and `CSafeLoader` qualify.
pub(super) fn yaml_loader_is_safe(loader_value: &str) -> bool {
    let normalized = loader_value.trim().to_lowercase();
    let class_name = match normalized.rfind('.') {
        Some(dot) => &normalized[dot + 1..],
        None => normalized.as_str(),
    };
    class_name == "safeloader" || class_name == "csafeloader"
}
/// Reports whether a `Loader=` argument names a YAML loader this module
/// treats as unsafe.
///
/// The value is trimmed and lowercased, and only the segment after the last
/// `.` is compared; `Loader`, `FullLoader` and `UnsafeLoader` qualify.
pub(super) fn yaml_loader_is_unsafe(loader_value: &str) -> bool {
    let normalized = loader_value.trim().to_lowercase();
    let class_name = match normalized.rsplit_once('.') {
        Some((_, tail)) => tail,
        None => normalized.as_str(),
    };
    ["loader", "fullloader", "unsafeloader"].contains(&class_name)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Evidence for `api` inside the function `function`; every other field
    /// keeps its default.
    fn evidence_in(api: DeserializeApi, function: &str) -> Evidence {
        Evidence {
            api: Some(api),
            enclosing_function: Some(function.to_string()),
            ..Default::default()
        }
    }

    // ---- predict: collapse paths -------------------------------------------

    #[test]
    fn case_a_safe_api_yaml_safe_load_collapses_to_benign() {
        let evidence = Evidence {
            user_input_nearby: true,
            enclosing_route_handler: true,
            ..evidence_in(DeserializeApi::Safe, "update_config")
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
        assert_eq!(p.reasons.len(), 1);
        assert_eq!(p.resolutions.len(), 1);
        let resolution = &p.resolutions[0];
        assert!(matches!(
            resolution.kind,
            ResolutionKind::StructuralPattern { .. }
        ));
        assert_eq!(resolution.collapses_to, BranchLabel::Benign);
    }

    #[test]
    fn case_b_unsafe_api_pickle_loads_collapses_to_realbug_critical() {
        let evidence = Evidence {
            local_file_source: true,
            ..evidence_in(DeserializeApi::Unsafe, "load_cache")
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::Critical);
        assert_eq!(p.reasons.len(), 1);
        assert_eq!(p.resolutions.len(), 1);
        let resolution = &p.resolutions[0];
        assert!(matches!(
            resolution.kind,
            ResolutionKind::StructuralPattern { .. }
        ));
        assert_eq!(resolution.collapses_to, BranchLabel::RealBug);
    }

    #[test]
    fn case_c_safe_api_json_loads_collapses_to_benign() {
        let evidence = Evidence {
            user_input_nearby: true,
            enclosing_route_handler: true,
            ..evidence_in(DeserializeApi::Safe, "api_handler")
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
    }

    #[test]
    fn safe_api_collapse_dominates_user_input_and_handler_signals() {
        let evidence = Evidence {
            user_input_nearby: true,
            enclosing_route_handler: true,
            enclosing_upload_like: true,
            ..evidence_in(DeserializeApi::Safe, "upload_handler")
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
    }

    #[test]
    fn case_e_unsafe_api_collapse_dominates_local_file_signal() {
        let evidence = Evidence {
            local_file_source: true,
            trust_boundary_name: true,
            ..evidence_in(DeserializeApi::Unsafe, "load_admin_signed")
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::Critical);
    }

    // ---- predict: additive path --------------------------------------------

    #[test]
    fn ambiguous_api_with_user_input_and_handler_predicts_realbug_high() {
        let evidence = Evidence {
            user_input_nearby: true,
            enclosing_route_handler: true,
            ..evidence_in(DeserializeApi::Ambiguous, "config_handler")
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::RealBug);
        // -0.50 + -0.30 = -0.80, which is at or below the -0.7 Critical threshold.
        assert_eq!(p.predicted_severity, Severity::Critical);
        let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
        assert!((total + 0.80).abs() < 1e-6, "expected -0.80, got {total}");
    }

    #[test]
    fn ambiguous_api_test_function_predicts_benign() {
        let p = predict(&evidence_in(DeserializeApi::Ambiguous, "test_yaml_parse"));
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
    }

    #[test]
    fn ambiguous_api_local_file_predicts_benign_via_soft_positive() {
        let evidence = Evidence {
            local_file_source: true,
            ..evidence_in(DeserializeApi::Ambiguous, "read_config")
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::Benign);
    }

    // ---- predict: annotations ----------------------------------------------

    #[test]
    fn deserialize_safe_annotation_collapses_to_benign() {
        let evidence = Evidence {
            api: Some(DeserializeApi::Unsafe),
            deserialize_safe_annotation: Some("hmac-verified".to_string()),
            ..Default::default()
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::Benign);
        assert_eq!(p.predicted_severity, Severity::Info);
        assert_eq!(p.resolutions.len(), 1);
        assert!(matches!(
            p.resolutions[0].kind,
            ResolutionKind::SourceAnnotation { .. }
        ));
    }

    #[test]
    fn deserialize_vulnerable_annotation_collapses_to_realbug() {
        let evidence = Evidence {
            api: Some(DeserializeApi::Safe),
            deserialize_vulnerable_annotation: Some("third-party-no-validation".to_string()),
            ..Default::default()
        };
        let p = predict(&evidence);
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::Critical);
    }

    #[test]
    fn empty_evidence_tiebreaks_realbug() {
        let p = predict(&Evidence::empty());
        assert_eq!(p.predicted, BranchLabel::RealBug);
        assert_eq!(p.predicted_severity, Severity::Medium);
    }

    // ---- weight pins -------------------------------------------------------

    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn realbug_signal_weights_are_negative() {
        assert!(W_USER_INPUT_TO_DESERIALIZE < 0.0);
        assert!(W_ENCLOSING_ROUTE_HANDLER < 0.0);
        assert!(W_ENCLOSING_UPLOAD_LIKE < 0.0);
        assert!(W_API_UNSAFE_COLLAPSE < 0.0);
    }

    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn benign_signal_weights_are_positive() {
        assert!(W_LOCAL_FILE_SOURCE > 0.0);
        assert!(W_ENCLOSING_TEST_FUNCTION > 0.0);
        assert!(W_TRUST_BOUNDARY_NAME > 0.0);
        assert!(W_API_SAFE_COLLAPSE > 0.0);
    }

    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn d1a_amendment_required_case_a_without_collapse_predicts_realbug_critical() {
        let additive_sum = W_USER_INPUT_TO_DESERIALIZE + W_ENCLOSING_ROUTE_HANDLER;
        assert!(
            additive_sum <= -0.7,
            "Under additive-only, Case A would predict RealBug Critical \
             for a safe-by-construction API. This is the bug D1.a fixes. \
             Pin: {additive_sum}"
        );
    }

    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn d1b_amendment_required_case_b_without_collapse_predicts_benign() {
        let additive_sum = W_LOCAL_FILE_SOURCE;
        assert!(
            additive_sum > 0.0,
            "Under additive-only, Case B would predict Benign Info \
             for pickle.loads on a local file. This is the bug D1.b \
             fixes. Pin: {additive_sum}"
        );
    }

    // ---- DeserializeApi ----------------------------------------------------

    #[test]
    fn deserialize_api_collapses_predicates() {
        // (api, collapses_safe, collapses_unsafe)
        let table = [
            (DeserializeApi::Safe, true, false),
            (DeserializeApi::Unsafe, false, true),
            (DeserializeApi::Ambiguous, false, false),
            (DeserializeApi::Unknown, false, false),
        ];
        for (api, safe, unsafe_) in table {
            assert_eq!(api.collapses_safe(), safe, "{api:?}.collapses_safe()");
            assert_eq!(api.collapses_unsafe(), unsafe_, "{api:?}.collapses_unsafe()");
        }
    }

    #[test]
    fn deserialize_api_is_recognized() {
        let recognized = [
            DeserializeApi::Safe,
            DeserializeApi::Unsafe,
            DeserializeApi::Ambiguous,
        ];
        for api in recognized {
            assert!(api.is_recognized(), "{api:?} should be recognized");
        }
        assert!(!DeserializeApi::Unknown.is_recognized());
    }

    #[test]
    fn deserialize_api_callee_label_is_stable() {
        let table = [
            (DeserializeApi::Safe, "safe-deserializer"),
            (DeserializeApi::Unsafe, "unsafe-deserializer"),
            (DeserializeApi::Ambiguous, "ambiguous-deserializer"),
            (DeserializeApi::Unknown, "deserialize client"),
        ];
        for (api, label) in table {
            assert_eq!(api.callee_label(), label);
        }
    }

    // ---- classify_deserialize_callee ---------------------------------------

    #[test]
    fn classify_safe_callees() {
        let callees = [
            "yaml.safe_load",
            "yaml.safe_load_all",
            "json.loads",
            "json.load",
            "simplejson.loads",
            "orjson.loads",
        ];
        for callee in callees {
            assert_eq!(
                classify_deserialize_callee(callee),
                DeserializeApi::Safe,
                "{callee}"
            );
        }
    }

    #[test]
    fn classify_unsafe_callees() {
        let callees = [
            "pickle.loads",
            "pickle.load",
            "cPickle.loads",
            "marshal.loads",
            "dill.loads",
        ];
        for callee in callees {
            assert_eq!(
                classify_deserialize_callee(callee),
                DeserializeApi::Unsafe,
                "{callee}"
            );
        }
    }

    #[test]
    fn classify_yaml_load_as_ambiguous() {
        assert_eq!(
            classify_deserialize_callee("yaml.load"),
            DeserializeApi::Ambiguous
        );
    }

    #[test]
    fn classify_unknown_callee() {
        for callee in ["json.dumps", "foo.bar"] {
            assert_eq!(
                classify_deserialize_callee(callee),
                DeserializeApi::Unknown,
                "{callee}"
            );
        }
    }

    // ---- YAML loader classification ----------------------------------------

    #[test]
    fn yaml_safe_loader_recognized() {
        for loader in ["yaml.SafeLoader", "SafeLoader", "yaml.CSafeLoader", "CSafeLoader"] {
            assert!(yaml_loader_is_safe(loader), "{loader}");
        }
    }

    #[test]
    fn yaml_unsafe_loader_recognized() {
        for loader in ["yaml.Loader", "Loader", "yaml.FullLoader", "yaml.UnsafeLoader"] {
            assert!(yaml_loader_is_unsafe(loader), "{loader}");
        }
    }

    #[test]
    fn yaml_loader_classifications_disjoint() {
        assert!(!yaml_loader_is_safe("yaml.Loader"));
        assert!(!yaml_loader_is_unsafe("yaml.SafeLoader"));
    }

    // ---- name / line matchers ----------------------------------------------

    #[test]
    fn route_handler_decorator_matches() {
        let decorators = [
            "@app.route('/foo')",
            " @app.post('/x')",
            "@router.get('/v1')",
            "@blueprint.route('/x')",
        ];
        for line in decorators {
            assert!(matches_route_handler_decorator(line), "{line}");
        }
        assert!(!matches_route_handler_decorator("@dataclass"));
    }

    #[test]
    fn route_handler_name_matches() {
        for name in ["login_handler", "api_endpoint", "user_view"] {
            assert!(matches_route_handler_name(name), "{name}");
        }
        assert!(!matches_route_handler_name("compute_total"));
    }

    #[test]
    fn upload_like_name_matches() {
        let names = [
            "upload_file",
            "import_data",
            "load_config",
            "restore_session",
            "ingest_payload",
        ];
        for name in names {
            assert!(matches_upload_like_name(name), "{name}");
        }
        assert!(!matches_upload_like_name("compute"));
    }

    #[test]
    fn trust_boundary_name_matches() {
        let names = [
            "parse_trusted",
            "load_admin_data",
            "read_internal_state",
            "decode_signed_blob",
        ];
        for name in names {
            assert!(matches_trust_boundary_name(name), "{name}");
        }
        assert!(!matches_trust_boundary_name("parse_input"));
    }

    #[test]
    fn line_contains_user_input_matches() {
        let lines = [
            "data = request.data",
            " return request.json",
            "body = request.body",
        ];
        for line in lines {
            assert!(line_contains_user_input(line), "{line}");
        }
        assert!(!line_contains_user_input("compute(local_var)"));
    }

    // ---- annotation extraction ---------------------------------------------

    #[test]
    fn extract_deserialize_safe_with_reason() {
        let line = "pickle.loads(data) # repotoire: deserialize-safe[hmac-verified]";
        assert_eq!(
            extract_deserialize_safe_reason(line),
            Some("hmac-verified".to_string())
        );
    }

    #[test]
    fn extract_deserialize_safe_without_reason() {
        let line = "yaml.load(blob) # repotoire: deserialize-safe";
        assert_eq!(
            extract_deserialize_safe_reason(line),
            Some("unspecified".to_string())
        );
    }

    #[test]
    fn extract_deserialize_vulnerable_with_source() {
        let line = "cattrs.structure(blob, cls) # repotoire: deserialize-vulnerable[third-party]";
        assert_eq!(
            extract_deserialize_vulnerable_source(line),
            Some("third-party".to_string())
        );
    }

    #[test]
    fn extract_deserialize_safe_ignores_other_kinds() {
        let unrelated = [
            "subprocess.run(...) # repotoire: command-static[ok]",
            "jwt.decode(...) # repotoire: jwt-safe[ok]",
            "ET.parse(blob) # repotoire: xxe-safe[ok]",
        ];
        for line in unrelated {
            assert_eq!(extract_deserialize_safe_reason(line), None, "{line}");
        }
    }

    #[test]
    fn extract_deserialize_vulnerable_ignores_other_kinds() {
        let line = "jwt.decode(...) # repotoire: jwt-vulnerable[alg-from-header]";
        assert_eq!(extract_deserialize_vulnerable_source(line), None);
    }
}