use super::annotation::parse_python_comment;
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
/// JWT library family that a flagged call site was attributed to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum JwtApi {
    /// Rendered as `jwt` in finding text.
    PyJwt,
    /// Rendered as `python-jose` in finding text.
    PythonJose,
    /// Rendered as `JWTVerifier` in finding text.
    JwtVerifier,
    /// Fallback used when the evidence carries no API at all.
    Unknown,
}

impl JwtApi {
    /// Short callee name that gets interpolated into titles, descriptions
    /// and example snippets.
    pub(super) fn callee_label(self) -> &'static str {
        match self {
            Self::Unknown => "JWT client",
            Self::JwtVerifier => "JWTVerifier",
            Self::PythonJose => "python-jose",
            Self::PyJwt => "jwt",
        }
    }

    /// `true` for every recognised library, `false` only for [`JwtApi::Unknown`].
    pub(super) fn is_python(self) -> bool {
        matches!(self, Self::JwtVerifier | Self::PythonJose | Self::PyJwt)
    }
}
// Signal weights for the additive predictor in `predict`.
//
// Each matching signal adds its weight to a running sum. A strictly positive
// sum predicts `Benign`; zero or negative predicts `RealBug`, whose severity
// is then derived from how negative the sum is (see `severity_for_branch`).

/// Benign: an explicit `algorithms=` kwarg is present.
pub(super) const W_EXPLICIT_ALGORITHMS_KWARG: f32 = 0.50;
/// Benign: the allowlist contains at least one asymmetric algorithm.
pub(super) const W_ASYMMETRIC_ALGORITHM: f32 = 0.10;
/// Benign: signature verification is explicitly switched on.
pub(super) const W_EXPLICIT_VERIFY_TRUE: f32 = 0.10;
/// Benign (soft): a strong-key library import was seen.
pub(super) const W_IMPORT_STRONG_KEY_LIB: f32 = 0.05;
/// Benign: the enclosing function name looks like a test or fixture.
pub(super) const W_ENCLOSING_TEST_FUNCTION: f32 = 0.15;
/// RealBug: `algorithms=` is missing on a recognised library call.
pub(super) const W_ALGORITHMS_KWARG_OMITTED: f32 = -0.40;
/// RealBug: signature verification is explicitly switched off.
pub(super) const W_EXPLICIT_VERIFY_FALSE: f32 = -0.70;
/// RealBug: HS256 is paired with a key whose name looks like a public key.
pub(super) const W_HS256_WITH_PUBLIC_KEY: f32 = -0.50;
/// RealBug: HS256 is paired with a short hardcoded secret.
pub(super) const W_HS256_WITH_SHORT_SECRET: f32 = -0.20;
/// RealBug: the enclosing function name or file path looks like an auth flow.
pub(super) const W_ENCLOSING_AUTH_FLOW: f32 = -0.20;
/// Weight recorded on the single reason emitted by the `'none'`-algorithm
/// collapse. That path bypasses the additive sum entirely.
pub(super) const W_ALGORITHM_NONE_COLLAPSE: f32 = -1.0;
/// Lowercase algorithm names treated as asymmetric by
/// `is_asymmetric_algorithm`; matched as substrings of the lowercased input.
const ASYMMETRIC_ALGORITHM_NAMES: &[&str] = &[
"rs256", "rs384", "rs512", "es256", "es384", "es512", "ps256", "ps384", "ps512", "eddsa", "ed25519", "ed448",
];
/// Lowercase markers that make an enclosing function name count as test or
/// fixture code (see `matches_test_function`).
const TEST_FUNCTION_SUBSTRINGS: &[&str] = &["test_", "_test", "fixture", "setup", "teardown"];
/// Lowercase substrings that make an enclosing function name count as part
/// of an auth flow (see `matches_auth_flow_function`).
const AUTH_FLOW_FUNCTION_SUBSTRINGS: &[&str] =
&["auth", "login", "token", "verify", "session", "middleware"];
/// Lowercase substrings that make a key argument's name look like a public
/// key (see `matches_public_key_name`).
const PUBLIC_KEY_NAME_SUBSTRINGS: &[&str] = &[
"pub_key",
"public_key",
"publickey",
".pem",
"rsa_pub",
"ecdsa_pub",
"verify_key",
"verifying_key",
];
/// Facts collected about one JWT call site; the sole input of [`predict`].
///
/// All fields default to "not observed" (`None` / `false`).
#[derive(Debug, Clone, Default, PartialEq)]
pub(super) struct Evidence {
    /// Library the call was attributed to; `None` is treated as `JwtApi::Unknown`.
    pub api: Option<JwtApi>,
    /// Name of the enclosing function, checked against the test and
    /// auth-flow lexicons.
    pub enclosing_function: Option<String>,
    /// Name of the enclosing class. NOTE(review): not read by `predict`;
    /// presumably consumed elsewhere — confirm.
    pub enclosing_class: Option<String>,
    /// Path of the file holding the call, checked against the auth-flow
    /// path lexicon.
    pub file_path: Option<String>,
    /// A strong-key library import was seen (soft Benign signal).
    pub import_strong_key_lib: bool,
    /// The call passes an explicit `algorithms=` kwarg.
    pub explicit_algorithms_kwarg: bool,
    /// `'none'` appears in the algorithm slot; collapses straight to RealBug.
    pub algorithm_none_in_slot: bool,
    /// The allowlist contains at least one asymmetric algorithm.
    pub asymmetric_algorithm_in_allowlist: bool,
    /// Signature verification is explicitly enabled.
    pub explicit_verify_true: bool,
    /// Signature verification is explicitly disabled.
    pub explicit_verify_false: bool,
    /// When set, the "`algorithms=` omitted" penalty is not applied.
    pub algorithm_singular_none: bool,
    /// HS256 is used with a key whose name looks like a public key.
    pub hs256_with_public_key: bool,
    /// HS256 is used with a short hardcoded secret.
    pub hs256_with_short_secret: bool,
    /// Reason text of a `jwt-safe[...]` annotation; collapses to Benign.
    pub jwt_safe_annotation: Option<String>,
    /// Source text of a `jwt-vulnerable[...]` annotation; collapses to RealBug.
    pub jwt_vulnerable_annotation: Option<String>,
}

impl Evidence {
    /// Test-only shorthand for evidence with nothing observed.
    #[cfg(test)]
    pub(super) fn empty() -> Self {
        Default::default()
    }
}
/// Outcome of `predict`: the chosen branch plus the preserved opposite
/// interpretation and the explanation trail.
#[derive(Debug, Clone)]
pub(super) struct Prediction {
/// Branch the predictor settled on.
pub predicted: BranchLabel,
/// Fully rendered opposite branch (label, severity, title, description,
/// suggested fix), kept in case the prediction is wrong.
pub alternative_branch: AlternativeBranch,
/// Severity assigned to the predicted branch.
pub predicted_severity: Severity,
/// Signals that contributed to the prediction, each with its weight.
pub reasons: Vec<PredictionReason>,
/// Decisive resolution signals; filled only by the collapse paths and
/// empty for purely additive predictions.
pub resolutions: Vec<ResolutionSignal>,
}
pub(super) fn predict(evidence: &Evidence) -> Prediction {
let api = evidence.api.unwrap_or(JwtApi::Unknown);
let api_label = api.callee_label();
if let Some(reason) = &evidence.jwt_safe_annotation {
return collapse(
BranchLabel::Benign,
api,
0.0, ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: jwt-safe[{reason}]"),
},
description: format!(
"`jwt-safe[{reason}]` annotation declares this JWT \
decode call as safe (verified at edge, signed by \
internal service, etc.); the finding collapses to \
Info."
),
example: Some(format!(
"{api_label}.decode(...) # repotoire: jwt-safe[{reason}]"
)),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("jwt-safe[{reason}] annotation"),
},
weight: 1.0,
note: format!(
"Annotated as verified-elsewhere ({reason}); not a \
JWT security risk."
),
},
);
}
if let Some(source) = &evidence.jwt_vulnerable_annotation {
return collapse(
BranchLabel::RealBug,
api,
-1.0, ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: jwt-vulnerable[{source}]"),
},
description: format!(
"`jwt-vulnerable[{source}]` annotation declares this \
JWT decode as exposed (attacker-controlled algorithm, \
audited-untrusted, etc.); the finding stays at the \
existing severity."
),
example: Some(format!(
"{api_label}.decode(...) # repotoire: jwt-vulnerable[{source}]"
)),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("jwt-vulnerable[{source}] annotation"),
},
weight: -1.0,
note: format!("Annotated as JWT-exposed (source: {source})."),
},
);
}
if evidence.algorithm_none_in_slot {
return collapse(
BranchLabel::RealBug,
api,
-1.0, ResolutionSignal {
kind: ResolutionKind::StructuralPattern {
description:
"'none' in JWT algorithm slot (unconditional unsigned-token acceptance)"
.to_string(),
},
description: "The JWT call accepts `'none'` as an \
algorithm — either via `algorithm='none'` (singular) \
or as an entry in the `algorithms=[...]` list. This \
is unconditional acceptance of unsigned tokens. No \
other defensive coding compensates because the `none` \
algorithm path bypasses signature verification by \
design."
.to_string(),
example: Some(format!(
"{api_label}.decode(token, key, algorithms=['RS256']) # not ['none', ...]"
)),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description:
"'none' in JWT algorithm slot (unconditional unsigned-token acceptance)"
.to_string(),
},
weight: W_ALGORITHM_NONE_COLLAPSE,
note: "The call site has `'none'` in the algorithm slot. \
This is the textbook CVE-2015-2951 / CVE-2016-10555 \
failure mode: unsigned tokens are accepted, \
regardless of any other verification coding present. \
First RealBug-direction collapse in the dual-branch \
series (Phase 2g D1 amendment)."
.to_string(),
},
);
}
let mut sum: f32 = 0.0;
let mut reasons: Vec<PredictionReason> = Vec::new();
if evidence.explicit_algorithms_kwarg {
sum += W_EXPLICIT_ALGORITHMS_KWARG;
reasons.push(PredictionReason {
kind: PredictionReasonKind::KeywordArgument {
name: "algorithms".to_string(),
value: "<list>".to_string(),
},
weight: W_EXPLICIT_ALGORITHMS_KWARG,
note: "Explicit `algorithms=` kwarg is provided. The CVE-2015-\
2951 / CVE-2016-10555 family is about omitting this \
kwarg and letting the JWT header pick the algorithm. \
Decisive Benign signal on its own short of the `'none'` \
collapse path."
.to_string(),
});
}
if evidence.asymmetric_algorithm_in_allowlist {
sum += W_ASYMMETRIC_ALGORITHM;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "asymmetric algorithm (RS*/ES*/PS*/EdDSA) in algorithms=[...]"
.to_string(),
},
weight: W_ASYMMETRIC_ALGORITHM,
note: "The `algorithms=` allowlist contains at least one \
asymmetric algorithm (RS256, ES256, EdDSA, etc.) — \
stronger than an HS-only allowlist because the JWT \
key-confusion attack class is closed at the algorithm \
layer."
.to_string(),
});
}
if evidence.explicit_verify_true {
sum += W_EXPLICIT_VERIFY_TRUE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::KeywordArgument {
name: "verify".to_string(),
value: "True".to_string(),
},
weight: W_EXPLICIT_VERIFY_TRUE,
note: "Explicit `verify=True` or `options={'verify_signature': \
True}`. Defensive coding — the developer wrote the \
kwarg even though `True` is the library's default."
.to_string(),
});
}
if evidence.import_strong_key_lib {
sum += W_IMPORT_STRONG_KEY_LIB;
reasons.push(PredictionReason {
kind: PredictionReasonKind::ImportPresence {
module: "cryptography.hazmat.primitives.serialization".to_string(),
},
weight: W_IMPORT_STRONG_KEY_LIB,
note: "Strong-key constant library imported near the JWT call \
site. Soft positive signal — the code path appears to \
load proper keys."
.to_string(),
});
}
if !evidence.explicit_algorithms_kwarg
&& !evidence.algorithm_singular_none
&& matches!(
api,
JwtApi::PyJwt | JwtApi::PythonJose | JwtApi::JwtVerifier
)
{
sum += W_ALGORITHMS_KWARG_OMITTED;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "algorithms= kwarg omitted on JWT decode".to_string(),
},
weight: W_ALGORITHMS_KWARG_OMITTED,
note: "The `algorithms=` kwarg is not provided. PyJWT >= 2.0 \
raises if this is omitted, but older PyJWT and \
python-jose accept the algorithm from the token \
header — the algorithm-confusion gateway."
.to_string(),
});
}
if evidence.explicit_verify_false {
sum += W_EXPLICIT_VERIFY_FALSE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::KeywordArgument {
name: "verify".to_string(),
value: "False".to_string(),
},
weight: W_EXPLICIT_VERIFY_FALSE,
note: "Signature verification is explicitly disabled via \
`verify=False` or `options={'verify_signature': \
False}`. Tokens are accepted without signature \
validation."
.to_string(),
});
}
if evidence.hs256_with_public_key {
sum += W_HS256_WITH_PUBLIC_KEY;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "HS256 with public-key argument (JWT key confusion)".to_string(),
},
weight: W_HS256_WITH_PUBLIC_KEY,
note: "The JWT call uses HS256 (HMAC) but the key argument's \
name (`pub_key`, `public_key`, `*.pem`) suggests it \
is an RSA public key. Attackers can sign tokens with \
the public key as if it were an HMAC secret — the \
classic JWT key-confusion attack."
.to_string(),
});
}
if evidence.hs256_with_short_secret {
sum += W_HS256_WITH_SHORT_SECRET;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "HS256 with short hardcoded secret".to_string(),
},
weight: W_HS256_WITH_SHORT_SECRET,
note: "The JWT call uses HS256 with a short hardcoded string \
secret (≤ 16 chars). Weak secrets are vulnerable to \
offline brute-force attacks."
.to_string(),
});
}
if let Some(fn_name) = &evidence.enclosing_function {
let in_auth_path = evidence
.file_path
.as_deref()
.map(path_matches_auth_flow)
.unwrap_or(false);
if matches_test_function(fn_name) {
sum += W_ENCLOSING_TEST_FUNCTION;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "function".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_TEST_FUNCTION,
note: format!(
"Enclosing function `{fn_name}` looks like a \
test/fixture; test code rarely the actionable \
security target."
),
});
} else if matches_auth_flow_function(fn_name) || in_auth_path {
sum += W_ENCLOSING_AUTH_FLOW;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "auth_flow".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_AUTH_FLOW,
note: format!(
"Enclosing function `{fn_name}` (or file path) looks \
like an auth flow (`auth`/`login`/`token`/`verify`/\
`session`/`middleware`); higher prior on attacker-\
reachable JWT verification code."
),
});
}
} else if let Some(path) = evidence.file_path.as_deref() {
if path_matches_auth_flow(path) {
sum += W_ENCLOSING_AUTH_FLOW;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "auth_flow".to_string(),
name: path.to_string(),
},
weight: W_ENCLOSING_AUTH_FLOW,
note: format!(
"File path `{path}` is in an auth-flow directory \
(auth/security/middleware); higher prior on \
attacker-reachable JWT code."
),
});
}
}
let predicted = if sum > 0.0 {
BranchLabel::Benign
} else {
BranchLabel::RealBug
};
build_prediction(predicted, api, sum, reasons, Vec::new())
}
/// Reports whether `name` mentions an asymmetric JWT algorithm.
///
/// Surrounding whitespace and quote characters are stripped and the text is
/// lowercased before it is searched for any entry of
/// `ASYMMETRIC_ALGORITHM_NAMES` as a substring.
pub(super) fn is_asymmetric_algorithm(name: &str) -> bool {
    let is_quote = |c: char| c == '\'' || c == '"';
    let normalized = name.trim().trim_matches(is_quote).to_lowercase();
    // Substring containment already covers the exact-match case.
    ASYMMETRIC_ALGORITHM_NAMES
        .iter()
        .any(|known| normalized.contains(known))
}
/// Reports whether the source text of an algorithm list contains a quoted
/// `none` entry (single or double quotes, any letter case).
pub(super) fn algorithm_list_contains_none(list_text: &str) -> bool {
    let normalized = list_text.to_lowercase();
    // The quotes are part of the needle so that e.g. `'nonexistent'` is not a hit.
    ["'none'", "\"none\""]
        .iter()
        .any(|quoted| normalized.contains(quoted))
}
/// Reports whether `name` (case-insensitively) contains any marker from
/// `PUBLIC_KEY_NAME_SUBSTRINGS`, i.e. looks like it refers to a public key.
pub(super) fn matches_public_key_name(name: &str) -> bool {
    let haystack = name.to_lowercase();
    for marker in PUBLIC_KEY_NAME_SUBSTRINGS {
        if haystack.contains(marker) {
            return true;
        }
    }
    false
}
/// Reports whether an enclosing function name looks like test or fixture code.
///
/// The name is lowercased and compared against `TEST_FUNCTION_SUBSTRINGS`.
/// Markers that end in `_` (such as `test_`) are word prefixes: they only
/// count when they start the name or follow a non-alphanumeric character.
/// Without that boundary, names like `get_latest_token` or `contest_entry`
/// would be classified as tests, earning a Benign-direction weight and
/// suppressing the auth-flow check. All other markers keep plain substring
/// semantics.
fn matches_test_function(name: &str) -> bool {
    let lower = name.to_lowercase();
    TEST_FUNCTION_SUBSTRINGS.iter().any(|&marker| {
        if marker.ends_with('_') {
            lower.match_indices(marker).any(|(start, _)| {
                // `start` comes from `match_indices`, so it is a char boundary.
                lower[..start]
                    .chars()
                    .next_back()
                    .map_or(true, |prev| !prev.is_alphanumeric())
            })
        } else {
            lower.contains(marker)
        }
    })
}
/// Reports whether a function name (case-insensitively) contains any of the
/// `AUTH_FLOW_FUNCTION_SUBSTRINGS` markers.
pub(super) fn matches_auth_flow_function(name: &str) -> bool {
    let candidate = name.to_lowercase();
    AUTH_FLOW_FUNCTION_SUBSTRINGS
        .iter()
        .copied()
        .any(|keyword| candidate.contains(keyword))
}
/// Reports whether a file path (case-insensitively) contains `auth`,
/// `security` or `middleware` anywhere in it.
pub(super) fn path_matches_auth_flow(path: &str) -> bool {
    let normalized = path.to_lowercase();
    ["auth", "security", "middleware"]
        .iter()
        .any(|marker| normalized.contains(marker))
}
/// Builds the prediction for a decisive signal: exactly one reason and one
/// resolution, with `forced_sum` standing in for the additive score when
/// severities are derived.
fn collapse(
    label: BranchLabel,
    api: JwtApi,
    forced_sum: f32,
    resolution: ResolutionSignal,
    reason: PredictionReason,
) -> Prediction {
    let reasons = vec![reason];
    let resolutions = vec![resolution];
    build_prediction(label, api, forced_sum, reasons, resolutions)
}
/// Assembles a [`Prediction`] for `predicted` and renders the opposite branch
/// alongside it.
///
/// Both the predicted and the alternative severity are derived from the same
/// `sum`; title, description and suggested fix of the alternative branch are
/// rendered with the API's callee label.
fn build_prediction(
    predicted: BranchLabel,
    api: JwtApi,
    sum: f32,
    reasons: Vec<PredictionReason>,
    resolutions: Vec<ResolutionSignal>,
) -> Prediction {
    let callee = api.callee_label();
    let predicted_severity = severity_for_branch(predicted, sum);

    // The road not taken, kept in full so a consumer can flip the verdict.
    let other = predicted.opposite();
    let alternative_branch = AlternativeBranch {
        label: other,
        severity: severity_for_branch(other, sum),
        title: title_for_branch(other, callee),
        description: description_for_branch(other, callee),
        suggested_fix: suggested_fix_for_branch(other, callee),
    };

    Prediction {
        predicted,
        alternative_branch,
        predicted_severity,
        reasons,
        resolutions,
    }
}
/// Maps a branch and its score to a severity.
///
/// `Benign` is always `Info`. `RealBug` is `Critical` at `sum <= -0.7`,
/// `High` at `sum <= -0.4`, and `Medium` otherwise.
fn severity_for_branch(label: BranchLabel, sum: f32) -> Severity {
    match label {
        BranchLabel::Benign => Severity::Info,
        BranchLabel::RealBug if sum <= -0.7 => Severity::Critical,
        BranchLabel::RealBug if sum <= -0.4 => Severity::High,
        BranchLabel::RealBug => Severity::Medium,
    }
}
/// Renders the one-line finding title for `label`, naming the callee.
fn title_for_branch(label: BranchLabel, api_label: &str) -> String {
    let title = match label {
        BranchLabel::Benign => {
            format!("JWT decode via {api_label} appears correctly verified (informational)")
        }
        BranchLabel::RealBug => format!("Potential JWT vulnerability in {api_label} call"),
    };
    title
}
/// Renders the long-form finding description for `label`, naming the callee.
fn description_for_branch(label: BranchLabel, api_label: &str) -> String {
    let text = match label {
        BranchLabel::Benign => format!(
            "The `{api_label}` JWT call appears to enforce an explicit \
             algorithm allowlist (`algorithms=[...]`) and signature \
             verification. The call site is carried as Info; the \
             RealBug interpretation is preserved in `alternative_branch` \
             in case the predictor is wrong."
        ),
        BranchLabel::RealBug => format!(
            "The `{api_label}` JWT call appears to be missing an explicit \
             `algorithms=` allowlist, OR has `verify=False`, OR uses a \
             dangerous symmetric/asymmetric key pairing. JWT \
             vulnerabilities allow attackers to forge authentication \
             tokens, impersonate users, escalate privileges, or bypass \
             authorization entirely."
        ),
    };
    text
}
/// Returns the remediation advice for `label`.
///
/// Both branches currently have advice, so the result is always `Some`. The
/// callee label is accepted for signature symmetry with the other renderers
/// but is not interpolated.
fn suggested_fix_for_branch(label: BranchLabel, _api_label: &str) -> Option<String> {
    let advice: &str = match label {
        BranchLabel::RealBug => {
            "Always provide an explicit algorithm allowlist and never \
             include `'none'`:\n\n\
             ```python\n\
             jwt.decode(token, key, algorithms=['RS256']) # explicit, non-'none'\n\
             ```\n\n\
             If using HS256, ensure the key argument is NOT a public \
             key (the JWT key-confusion attack); prefer asymmetric \
             algorithms with proper key pairs. If this is intentional \
             safe usage (the JWT is verified upstream by an edge \
             proxy, etc.), annotate `# repotoire: jwt-safe[<reason>]` \
             to collapse the finding to Info definitively."
        }
        BranchLabel::Benign => {
            "If this is intentional safe usage, annotate \
             `# repotoire: jwt-safe[<reason>]` to collapse the finding \
             to Info definitively. If the alternative branch is \
             correct (the JWT IS exposed to attacker-controlled \
             algorithm choice), audit the algorithm allowlist for \
             `'none'`, audit `verify=` for `False`, and verify the key \
             argument is appropriate for the algorithm."
        }
    };
    Some(advice.to_string())
}
/// Extracts the reason from a `jwt-safe` annotation on `line`.
///
/// Returns `None` when the line has no parsable annotation or the annotation
/// is of another kind. An annotation without arguments yields
/// `"unspecified"`; otherwise the first argument is returned.
pub(super) fn extract_jwt_safe_reason(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line)?;
    if annotation.kind != "jwt-safe" {
        return None;
    }
    let reason = match annotation.args.first() {
        Some(first) => first.clone(),
        None => "unspecified".to_string(),
    };
    Some(reason)
}
/// Extracts the source from a `jwt-vulnerable` annotation on `line`.
///
/// Returns `None` when the line has no parsable annotation or the annotation
/// is of another kind. An annotation without arguments yields
/// `"unspecified"`; otherwise the first argument is returned.
pub(super) fn extract_jwt_vulnerable_source(line: &str) -> Option<String> {
    let annotation = parse_python_comment(line)?;
    if annotation.kind != "jwt-vulnerable" {
        return None;
    }
    let source = match annotation.args.first() {
        Some(first) => first.clone(),
        None => "unspecified".to_string(),
    };
    Some(source)
}
#[cfg(test)]
mod tests {
// Unit tests for the JWT dual-branch predictor.
//
// The "case_*" tests pin end-to-end predictions for representative evidence
// combinations; the remaining tests cover the collapse paths, the sign of
// every weight, the lexicon helpers and the annotation extractors.
// Expected totals are spelled out from the `W_*` constants in each comment.
use super::*;
// +0.50 (algorithms) +0.10 (asymmetric) -0.20 (auth-flow fn) = +0.40 -> Benign.
#[test]
fn case_a_explicit_rs256_in_auth_handler_predicts_benign() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
asymmetric_algorithm_in_allowlist: true,
enclosing_function: Some("login_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total - 0.40).abs() < 1e-6, "expected +0.40, got {total}");
}
// -0.40 (algorithms omitted) -0.20 (auth-flow fn) = -0.60 -> RealBug / High.
#[test]
fn case_b_naked_decode_in_handler_predicts_realbug_high() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
enclosing_function: Some("login_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total + 0.60).abs() < 1e-6, "expected -0.60, got {total}");
}
// `'none'` in the slot collapses: forced sum -1.0 -> Critical, with exactly
// one reason and one structural-pattern resolution.
#[test]
fn case_c_algorithm_none_singular_collapses_to_realbug_critical() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
algorithm_none_in_slot: true,
algorithm_singular_none: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
assert_eq!(p.reasons.len(), 1);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::StructuralPattern { .. }
));
}
// Same collapse when `'none'` sits inside an explicit `algorithms=` list;
// the additive signals are not evaluated at all.
#[test]
fn case_d_algorithm_none_in_list_collapses_to_realbug_critical() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
algorithm_none_in_slot: true,
enclosing_function: Some("login_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
assert_eq!(p.reasons.len(), 1, "collapse emits exactly one reason");
}
// +0.50 (algorithms) -0.50 (HS256 + public key) = 0.00; a tie is not
// strictly positive, so it resolves to RealBug.
#[test]
fn case_e_hs256_with_public_key_predicts_realbug() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
hs256_with_public_key: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!(total.abs() < 1e-6, "expected 0.00, got {total}");
}
// Every Benign signal is set, yet the `'none'` collapse still wins.
#[test]
fn algorithm_none_collapse_dominates_other_benign_signals() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
asymmetric_algorithm_in_allowlist: true,
explicit_verify_true: true,
import_strong_key_lib: true,
algorithm_none_in_slot: true,
enclosing_function: Some("test_jwt_decode".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
}
// +0.50 (algorithms) -0.70 (verify=False) = -0.20 -> RealBug.
// NOTE(review): the name says "high_or_critical", but severity is not
// asserted and -0.20 maps to Medium in `severity_for_branch`.
#[test]
fn explicit_verify_false_predicts_realbug_high_or_critical() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
explicit_verify_false: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total + 0.20).abs() < 1e-6, "expected -0.20, got {total}");
}
// -0.40 (algorithms omitted) -0.70 (verify=False) -0.20 (auth-flow fn)
// = -1.30 -> RealBug / Critical.
// NOTE(review): the name says "high" while the assertion expects Critical.
#[test]
fn verify_false_with_auth_flow_predicts_realbug_high() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_verify_false: true,
enclosing_function: Some("authenticate_user".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
}
// +0.50 (algorithms) +0.10 (asymmetric) = +0.60 -> Benign.
#[test]
fn asymmetric_algorithm_compounds_benign() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
asymmetric_algorithm_in_allowlist: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total - 0.60).abs() < 1e-6);
}
// +0.50 (algorithms) +0.10 (verify=True) = +0.60 -> Benign.
#[test]
fn verify_true_compounds_benign() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
explicit_verify_true: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
}
// The `jwt-safe` annotation is checked before the `'none'` collapse, so it
// wins even when `algorithm_none_in_slot` is set.
#[test]
fn jwt_safe_annotation_collapses_to_benign() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
algorithm_none_in_slot: true,
jwt_safe_annotation: Some("verified-at-edge".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::SourceAnnotation { .. }
));
}
// The `jwt-vulnerable` annotation overrides otherwise Benign evidence.
#[test]
fn jwt_vulnerable_annotation_collapses_to_realbug() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
asymmetric_algorithm_in_allowlist: true,
jwt_vulnerable_annotation: Some("alg-from-header".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
}
// No API means `JwtApi::Unknown`, so the omitted-kwarg penalty does not
// apply; the sum stays 0.00 and the tie resolves to RealBug / Medium.
#[test]
fn empty_evidence_tiebreaks_realbug() {
let p = predict(&Evidence::empty());
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Medium);
}
// Guards the sign convention: RealBug-direction weights are negative.
#[test]
#[allow(clippy::assertions_on_constants)]
fn realbug_signal_weights_are_negative() {
assert!(W_ALGORITHMS_KWARG_OMITTED < 0.0);
assert!(W_EXPLICIT_VERIFY_FALSE < 0.0);
assert!(W_HS256_WITH_PUBLIC_KEY < 0.0);
assert!(W_HS256_WITH_SHORT_SECRET < 0.0);
assert!(W_ENCLOSING_AUTH_FLOW < 0.0);
assert!(W_ALGORITHM_NONE_COLLAPSE < 0.0);
}
// Guards the sign convention: Benign-direction weights are positive.
#[test]
#[allow(clippy::assertions_on_constants)]
fn benign_signal_weights_are_positive() {
assert!(W_EXPLICIT_ALGORITHMS_KWARG > 0.0);
assert!(W_ASYMMETRIC_ALGORITHM > 0.0);
assert!(W_EXPLICIT_VERIFY_TRUE > 0.0);
assert!(W_IMPORT_STRONG_KEY_LIB > 0.0);
assert!(W_ENCLOSING_TEST_FUNCTION > 0.0);
}
// Pins why the `'none'` collapse exists: with additive scoring alone,
// Case D would total +0.50 -0.20 = +0.30 and be predicted Benign.
// NOTE(review): the name says "predicts_medium", but the assertion and its
// message describe a Benign prediction.
#[test]
#[allow(clippy::assertions_on_constants)]
fn d1_amendment_required_case_d_predicts_medium_under_additive_only() {
let additive_sum = W_EXPLICIT_ALGORITHMS_KWARG + W_ENCLOSING_AUTH_FLOW;
assert!(
additive_sum > 0.0,
"Under additive-only, Case D would predict Benign — \
unconditional unsigned-token acceptance classified as safe. \
This is the bug the D1 amendment fixes. Pin: {additive_sum}"
);
}
// `validate` matches neither name lexicon, but the path contains `auth`:
// +0.50 +0.10 -0.20 = +0.40.
#[test]
fn auth_flow_path_contributes_negative_weight() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
asymmetric_algorithm_in_allowlist: true,
file_path: Some("/app/auth/jwt_handlers.py".to_string()),
enclosing_function: Some("validate".to_string()),
..Default::default()
};
let p = predict(&evidence);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total - 0.40).abs() < 1e-6, "expected +0.40, got {total}");
}
// +0.50 (algorithms) -0.50 (HS256 + public key) -0.20 (auth-flow fn) = -0.20.
// NOTE(review): the name says "high", but severity is not asserted and
// -0.20 maps to Medium in `severity_for_branch`.
#[test]
fn hs256_pubkey_in_auth_flow_predicts_realbug_high() {
let evidence = Evidence {
api: Some(JwtApi::PyJwt),
explicit_algorithms_kwarg: true,
hs256_with_public_key: true,
enclosing_function: Some("authenticate".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total + 0.20).abs() < 1e-6, "expected -0.20, got {total}");
}
// Quote stripping and case-insensitivity of the asymmetric lexicon.
#[test]
fn asymmetric_algorithm_lexicon() {
assert!(is_asymmetric_algorithm("RS256"));
assert!(is_asymmetric_algorithm("'RS256'"));
assert!(is_asymmetric_algorithm("\"ES256\""));
assert!(is_asymmetric_algorithm("EdDSA"));
assert!(is_asymmetric_algorithm("PS512"));
assert!(!is_asymmetric_algorithm("HS256"));
assert!(!is_asymmetric_algorithm("none"));
}
// Only a quoted `none` counts; `'nonexistent'` must not match.
#[test]
fn algorithm_list_contains_none_detector() {
assert!(algorithm_list_contains_none("['none', 'HS256']"));
assert!(algorithm_list_contains_none("[\"none\"]"));
assert!(algorithm_list_contains_none("['HS256', 'NONE']"));
assert!(!algorithm_list_contains_none("['HS256', 'RS256']"));
assert!(!algorithm_list_contains_none("['nonexistent']"));
}
#[test]
fn public_key_name_lexicon() {
assert!(matches_public_key_name("pub_key"));
assert!(matches_public_key_name("public_key"));
assert!(matches_public_key_name("PublicKey"));
assert!(matches_public_key_name("/etc/keys/rsa.pem"));
assert!(matches_public_key_name("verify_key"));
assert!(!matches_public_key_name("secret"));
assert!(!matches_public_key_name("private_key"));
}
#[test]
fn auth_flow_function_lexicon() {
assert!(matches_auth_flow_function("authenticate_user"));
assert!(matches_auth_flow_function("login_handler"));
assert!(matches_auth_flow_function("decode_token"));
assert!(matches_auth_flow_function("verify_jwt"));
assert!(matches_auth_flow_function("get_session"));
assert!(matches_auth_flow_function("auth_middleware"));
assert!(!matches_auth_flow_function("calculate_total"));
}
#[test]
fn auth_flow_path_lexicon() {
assert!(path_matches_auth_flow("/app/auth/jwt.py"));
assert!(path_matches_auth_flow("src/security/decode.py"));
assert!(path_matches_auth_flow("middleware/jwt_check.py"));
assert!(!path_matches_auth_flow("src/payments/checkout.py"));
}
#[test]
fn test_function_lexicon() {
assert!(matches_test_function("test_jwt_decode"));
assert!(matches_test_function("jwt_test"));
assert!(matches_test_function("setup_fixture"));
assert!(!matches_test_function("decode_token"));
}
// Annotation extractors: the bracketed argument is returned verbatim.
#[test]
fn extract_jwt_safe_with_reason() {
assert_eq!(
extract_jwt_safe_reason(
"jwt.decode(token, key) # repotoire: jwt-safe[verified-at-edge]"
),
Some("verified-at-edge".to_string())
);
}
// A bare annotation without brackets falls back to "unspecified".
#[test]
fn extract_jwt_safe_without_reason() {
assert_eq!(
extract_jwt_safe_reason("jwt.decode(token, key) # repotoire: jwt-safe"),
Some("unspecified".to_string())
);
}
#[test]
fn extract_jwt_vulnerable_with_source() {
assert_eq!(
extract_jwt_vulnerable_source(
"jwt.decode(token, key) # repotoire: jwt-vulnerable[alg-from-header]"
),
Some("alg-from-header".to_string())
);
}
// Annotations that belong to other detectors must be ignored.
#[test]
fn extract_jwt_safe_ignores_other_kinds() {
assert_eq!(
extract_jwt_safe_reason("subprocess.run(...) # repotoire: command-static[ok]"),
None
);
assert_eq!(
extract_jwt_safe_reason("ET.parse(blob) # repotoire: xxe-safe[ok]"),
None
);
assert_eq!(
extract_jwt_safe_reason("requests.get(url) # repotoire: ssrf-safe[ok]"),
None
);
}
#[test]
fn extract_jwt_vulnerable_ignores_other_kinds() {
assert_eq!(
extract_jwt_vulnerable_source(
"requests.get(url) # repotoire: ssrf-vulnerable[audited]"
),
None
);
}
// `is_python` is true for every recognised library and false for Unknown.
#[test]
fn jwt_api_is_python_includes_recognized_libs() {
assert!(JwtApi::PyJwt.is_python());
assert!(JwtApi::PythonJose.is_python());
assert!(JwtApi::JwtVerifier.is_python());
assert!(!JwtApi::Unknown.is_python());
}
// Callee labels end up in user-facing text, so they are pinned here.
#[test]
fn jwt_api_callee_label_is_stable() {
assert_eq!(JwtApi::PyJwt.callee_label(), "jwt");
assert_eq!(JwtApi::PythonJose.callee_label(), "python-jose");
assert_eq!(JwtApi::JwtVerifier.callee_label(), "JWTVerifier");
assert_eq!(JwtApi::Unknown.callee_label(), "JWT client");
}
}