mod annotation;
mod evidence;
mod predict;
use crate::detectors::base::{Detector, DetectorConfig};
use crate::graph::GraphQueryExt;
use crate::models::{deterministic_finding_id, Evidence, Finding, Severity, SourceSpan, Tier};
use anyhow::Result;
use regex::Regex;
use std::path::{Path, PathBuf};
use std::sync::LazyLock;
use tracing::info;
static NONE_ALG: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r#"(?i)(algorithm\s*[=:]\s*["']?none["']?|alg["']?\s*:\s*["']?none)"#)
.expect("valid regex")
});
static HS256_ALG: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r#"(?i)(algorithm\s*[=:]\s*["']?HS256["']?|alg["']?\s*:\s*["']?HS256)"#)
.expect("valid regex")
});
static JWT_API_CALL: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r"(?i)(\bjwt\s*\.\s*(?:decode|encode|verify|sign)\b|\bjose\s*\.|\bpyjwt\s*\.|\bauthlib\s*\.\s*jose\b|\bverify_jwt\s*\(|\bverifyToken\s*\(|\bJWTVerifier\b)",
)
.expect("valid regex")
});
static ALG_PARAM: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"(?i)(algorithms?\s*[=:]\s*\[|verify\s*=\s*False|options.*verify)")
.expect("valid regex")
});
static BLOCKING_ALG_NONE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r#"(?i)(?:algorithm\s*[=:]\s*["']none["']|alg["']?\s*:\s*["']none["']|algorithms\s*[=:]\s*\[["']none["']\])"#,
)
.expect("valid regex")
});
static BLOCKING_EMPTY_SECRET: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r#"(?i)\bjwt\s*\.\s*(?:sign|verify|decode)\s*\([^)]*,\s*(?:''|""|null|None)\s*[,)#]"#,
)
.expect("valid regex")
});
static BLOCKING_HARDCODED_SECRET: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r#"(?i)\bjwt\s*\.\s*(?:sign|verify|decode)\s*\([^)]*,\s*["'][^"']+["']"#)
.expect("valid regex")
});
static HS_FAMILY: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"(?i)\bHS(?:256|384|512)\b").expect("valid regex"));
pub struct JwtWeakDetector {
#[allow(dead_code)] repository_path: PathBuf,
max_findings: usize,
}
impl JwtWeakDetector {
crate::detectors::detector_new!(50);
fn classify_blocking_rule(line: &str) -> Option<&'static str> {
if BLOCKING_ALG_NONE.is_match(line) {
return Some("jwt_alg_none");
}
if BLOCKING_EMPTY_SECRET.is_match(line) {
return Some("jwt_empty_secret");
}
if BLOCKING_HARDCODED_SECRET.is_match(line) && HS_FAMILY.is_match(line) {
return Some("jwt_hardcoded_secret");
}
None
}
fn scan_python_file_dual_branch(&self, path: &Path, content: &str) -> Vec<Finding> {
if content.contains('\0') {
return Vec::new();
}
let Some(tree) = crate::detectors::ast_fingerprint::parse_root_ext(
content,
crate::parsers::lightweight::Language::Python,
"py",
) else {
return Vec::new();
};
let root = tree.root_node();
let source = content.as_bytes();
let lines: Vec<&str> = content.lines().collect();
let mut findings = Vec::new();
for site in evidence::collect_python_jwt_sites(root, source) {
let line_idx = site.call_node.start_position().row;
if let Some(line) = lines.get(line_idx) {
let prev = if line_idx > 0 {
Some(lines[line_idx - 1])
} else {
None
};
if crate::detectors::is_line_suppressed(line, prev) {
continue;
}
}
let snippet = lines.get(line_idx).map(|s| s.trim()).unwrap_or("");
let line_num = (line_idx + 1) as u32;
findings.push(self.build_dual_branch_jwt_finding(
path,
line_num,
site.api,
snippet,
site.call_node,
root,
source,
&lines,
));
}
findings
}
#[allow(clippy::too_many_arguments)]
fn build_dual_branch_jwt_finding(
&self,
path: &Path,
line_num: u32,
api: predict::JwtApi,
snippet: &str,
call_node: tree_sitter::Node<'_>,
module_root: tree_sitter::Node<'_>,
source: &[u8],
lines: &[&str],
) -> Finding {
let api_label = api.callee_label();
let path_str = path.to_string_lossy().to_string();
let ev = evidence::extract_python_evidence(
call_node,
module_root,
source,
lines,
Some(path_str),
);
let prediction = predict::predict(&ev);
let predicted_label = prediction.predicted;
let predicted_severity = prediction.predicted_severity;
let predicted_title = match predicted_label {
crate::dual_branch::BranchLabel::RealBug => {
format!("Potential JWT weakness via {api_label}")
}
crate::dual_branch::BranchLabel::Benign => {
format!("Hardened JWT decode via {api_label} (informational)")
}
};
let predicted_description = format!(
"**JWT Weakness (dual-branch, CWE-327)**\n\n\
**API**: `{}`\n\n\
**Location**: {}:{}\n\n\
**Code**:\n```python\n{}\n```\n\n\
{}",
api_label,
path.display(),
line_num,
snippet,
match predicted_label {
crate::dual_branch::BranchLabel::RealBug => format!(
"The `{api_label}` call site shows evidence of unsafe \
JWT verification — e.g. `algorithm='none'`, an \
omitted `algorithms=` allowlist, `verify=False`, or \
HS256 used with a public-key-named argument. The \
predictor leans RealBug for this call site (see \
`prediction_reasons`); the alternative Benign \
interpretation is carried in `alternative_branch`."
),
crate::dual_branch::BranchLabel::Benign => format!(
"The `{api_label}` call site uses an explicit \
`algorithms=` allowlist (typically with an \
asymmetric algorithm), does not pass `verify=False`, \
and does not show other JWT-weakness signals. The \
predictor leans Benign (see `prediction_reasons`); \
the original RealBug interpretation is preserved in \
`alternative_branch`."
),
},
);
let predicted_fix = match predicted_label {
crate::dual_branch::BranchLabel::RealBug => Some(
"Always pass an explicit `algorithms=[...]` allowlist of \
asymmetric algorithms (RS256, ES256, EdDSA) to \
`jwt.decode` / `jose.jwt.decode` / `authlib.jose`. \
Never use `algorithm='none'` and never set \
`verify=False` outside test code. If HS256 is \
unavoidable, use a strong randomly-generated 32+ byte \
secret and ensure it is not also published as a public \
key.\n\n\
```python\n\
# Safe (asymmetric, explicit allowlist):\n\
jwt.decode(token, public_key, algorithms=['RS256'])\n\
```\n\n\
If this is a false positive (the call site IS safe via \
a path the v0 predictor doesn't see), annotate the call \
site with `# repotoire: jwt-safe[<reason>]` to collapse \
the finding to Info."
.to_string(),
),
crate::dual_branch::BranchLabel::Benign => Some(
"If this decode IS in an attacker-reachable auth flow \
(the alternative branch), verify that:\n\
• The `algorithms=` list never contains `'none'`.\n\
• The key argument matches the algorithm family \
(HS* with a long secret, RS*/ES*/EdDSA with the \
matching public key).\n\
• `verify_signature` is never `False` in `options=`.\n\n\
If the predictor is correct that this call is \
hardened, no action is needed."
.to_string(),
),
};
let mut finding = Finding {
id: String::new(),
detector: "JwtWeakDetector".to_string(),
severity: predicted_severity,
title: predicted_title,
description: predicted_description,
affected_files: vec![path.to_path_buf()],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: predicted_fix,
estimated_effort: Some("30 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-327".to_string()),
why_it_matters: Some(
"JWT vulnerabilities can allow attackers to forge authentication tokens, \
impersonate users, escalate privileges, or bypass authorization entirely."
.to_string(),
),
..Default::default()
};
finding = finding.with_alternative_branch(prediction.alternative_branch);
for reason in prediction.reasons {
finding = finding.with_prediction_reason(reason);
}
for resolution in prediction.resolutions {
finding = finding.with_resolution_signal(resolution);
}
finding
}
fn analyze_vulnerability(line: &str, context: &str) -> JwtVulnerability {
let lower = line.to_lowercase();
let ctx_lower = context.to_lowercase();
if lower.contains("none") && (lower.contains("algorithm") || lower.contains("alg")) {
return JwtVulnerability::NoneAlgorithm;
}
if lower.contains("verify") && (lower.contains("false") || lower.contains("skip")) {
return JwtVulnerability::VerificationDisabled;
}
if ctx_lower.contains("header") && ctx_lower.contains("alg") {
return JwtVulnerability::AlgorithmConfusion;
}
if lower.contains("hs256") {
if ctx_lower.contains("public") || ctx_lower.contains("rsa") {
return JwtVulnerability::KeyConfusion;
}
return JwtVulnerability::WeakSymmetric;
}
JwtVulnerability::Other
}
fn is_auth_flow(func_name: &str, file_path: &str) -> bool {
let name_lower = func_name.to_lowercase();
let path_lower = file_path.to_lowercase();
name_lower.contains("auth")
|| name_lower.contains("login")
|| name_lower.contains("verify")
|| name_lower.contains("token")
|| name_lower.contains("session")
|| name_lower.contains("middleware")
|| path_lower.contains("auth")
|| path_lower.contains("security")
|| path_lower.contains("middleware")
}
}
#[derive(Debug, Clone)]
enum JwtVulnerability {
NoneAlgorithm, VerificationDisabled, AlgorithmConfusion, KeyConfusion, WeakSymmetric, Other,
}
impl JwtVulnerability {
fn severity(&self) -> Severity {
match self {
JwtVulnerability::NoneAlgorithm => Severity::Critical,
JwtVulnerability::VerificationDisabled => Severity::Critical,
JwtVulnerability::AlgorithmConfusion => Severity::Critical,
JwtVulnerability::KeyConfusion => Severity::Critical,
JwtVulnerability::WeakSymmetric => Severity::Medium,
JwtVulnerability::Other => Severity::Low,
}
}
fn title(&self) -> &'static str {
match self {
JwtVulnerability::NoneAlgorithm => "JWT algorithm 'none' allows unsigned tokens",
JwtVulnerability::VerificationDisabled => "JWT verification is disabled",
JwtVulnerability::AlgorithmConfusion => "JWT algorithm confusion vulnerability",
JwtVulnerability::KeyConfusion => "JWT key confusion (HS256 with RSA key)",
JwtVulnerability::WeakSymmetric => "JWT using symmetric algorithm (HS256)",
JwtVulnerability::Other => "Potential JWT security issue",
}
}
fn description(&self) -> &'static str {
match self {
JwtVulnerability::NoneAlgorithm => {
"Using algorithm 'none' means tokens aren't signed. \
Any attacker can forge valid tokens."
}
JwtVulnerability::VerificationDisabled => {
"Signature verification is disabled. \
Tokens are accepted without validation."
}
JwtVulnerability::AlgorithmConfusion => {
"The algorithm is read from the token header instead of being enforced. \
Attackers can switch algorithms to bypass verification."
}
JwtVulnerability::KeyConfusion => {
"Using HS256 (symmetric) with an RSA public key allows attackers \
to sign tokens with the public key."
}
JwtVulnerability::WeakSymmetric => {
"HS256 uses a shared secret. If the secret is weak or leaked, \
attackers can forge tokens. Consider RS256/ES256."
}
JwtVulnerability::Other => "Potential JWT security concern detected.",
}
}
fn fix(&self) -> &'static str {
match self {
JwtVulnerability::NoneAlgorithm => {
"Never allow 'none' algorithm in production:\n\n\
```python\n\
# Python (PyJWT)\n\
jwt.decode(token, key, algorithms=['RS256']) # Explicit whitelist\n\
```\n\n\
```javascript\n\
// Node.js\n\
jwt.verify(token, publicKey, { algorithms: ['RS256'] });\n\
```"
}
JwtVulnerability::VerificationDisabled => {
"Always verify JWT signatures:\n\n\
```python\n\
# Never do this:\n\
# jwt.decode(token, options={'verify_signature': False})\n\
\n\
# Always verify:\n\
jwt.decode(token, key, algorithms=['RS256'])\n\
```"
}
JwtVulnerability::AlgorithmConfusion => {
"Always specify allowed algorithms explicitly:\n\n\
```python\n\
# Don't trust the token's 'alg' header\n\
jwt.decode(token, key, algorithms=['RS256']) # Whitelist\n\
```"
}
JwtVulnerability::KeyConfusion => {
"Use asymmetric algorithms (RS256/ES256) with proper key pairs:\n\n\
```python\n\
# Sign with private key\n\
jwt.encode(payload, private_key, algorithm='RS256')\n\
\n\
# Verify with public key\n\
jwt.decode(token, public_key, algorithms=['RS256'])\n\
```"
}
JwtVulnerability::WeakSymmetric => {
"Consider using asymmetric algorithms:\n\n\
```python\n\
# RS256 (RSA) or ES256 (ECDSA) recommended\n\
jwt.encode(payload, private_key, algorithm='RS256')\n\
\n\
# If using HS256, ensure secret is:\n\
# - At least 256 bits (32 bytes)\n\
# - Cryptographically random\n\
# - Never hardcoded\n\
```"
}
JwtVulnerability::Other => "Review JWT implementation for security best practices.",
}
}
}
impl Detector for JwtWeakDetector {
fn name(&self) -> &'static str {
"jwt-weak"
}
fn description(&self) -> &'static str {
"Detects weak JWT algorithms and configurations"
}
fn bypass_postprocessor(&self) -> bool {
true
}
fn file_extensions(&self) -> &'static [&'static str] {
&["py", "js", "ts", "jsx", "tsx", "rb", "java", "go"]
}
fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
crate::detectors::detector_context::ContentFlags::HAS_CRYPTO
}
fn detect(
&self,
ctx: &crate::detectors::analysis_context::AnalysisContext,
) -> Result<Vec<Finding>> {
let graph = ctx.graph;
let files = &ctx.as_file_provider();
let mut findings = vec![];
let flag_on = ctx.dual_branch.is_enabled_for("jwt-weak");
for path in files.files_with_extensions(&["py", "js", "ts", "java", "go", "rb", "php"]) {
if findings.len() >= self.max_findings {
break;
}
let path_str = path.to_string_lossy().to_string();
let in_test_context = crate::detectors::base::is_test_path(&path_str);
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
if flag_on && ext == "py" {
if let Some(content) = files.content(path) {
let dual = self.scan_python_file_dual_branch(path, &content);
for finding in dual {
findings.push(finding);
if findings.len() >= self.max_findings {
break;
}
}
}
continue;
}
if let Some(content) = files.content(path) {
let lines: Vec<&str> = content.lines().collect();
for (i, line) in lines.iter().enumerate() {
let prev_line = if i > 0 { Some(lines[i - 1]) } else { None };
if crate::detectors::is_line_suppressed(line, prev_line) {
continue;
}
let trimmed = line.trim();
if trimmed.starts_with("//") || trimmed.starts_with("#") {
continue;
}
let prescreen_blocking_rule = if !in_test_context {
Self::classify_blocking_rule(line)
} else {
None
};
let has_blocking_secret = prescreen_blocking_rule
.map(|r| matches!(r, "jwt_empty_secret" | "jwt_hardcoded_secret"))
.unwrap_or(false);
let has_none = NONE_ALG.is_match(line);
let has_hs256 = HS256_ALG.is_match(line);
let has_verify_issue = ALG_PARAM.is_match(line)
&& (line.to_lowercase().contains("false") || line.contains("none"));
if !has_none && !has_hs256 && !has_verify_issue && !has_blocking_secret {
continue;
}
let start = i.saturating_sub(5);
let end = (i + 5).min(lines.len());
let context = lines[start..end].join(" ");
if !has_none && !has_hs256 && !has_blocking_secret {
let has_jwt_context =
JWT_API_CALL.is_match(line) || JWT_API_CALL.is_match(&context);
if !has_jwt_context {
continue;
}
}
let line_num = (i + 1) as u32;
let vuln = Self::analyze_vulnerability(line, &context);
let vuln = if matches!(vuln, JwtVulnerability::Other) && has_blocking_secret {
match prescreen_blocking_rule {
Some("jwt_empty_secret") => JwtVulnerability::VerificationDisabled,
_ => JwtVulnerability::WeakSymmetric,
}
} else {
vuln
};
if matches!(vuln, JwtVulnerability::Other) {
continue;
}
let containing_func = graph.find_function_at(&path_str, line_num);
let containing_info = containing_func.as_ref().map(|f| {
let callers = graph
.get_callers(f.qn(crate::graph::interner::global_interner()))
.len();
(
f.node_name(crate::graph::interner::global_interner())
.to_string(),
callers,
)
});
let is_auth = containing_info
.as_ref()
.map(|(name, _)| Self::is_auth_flow(name, &path_str))
.unwrap_or(false);
let mut notes = Vec::new();
if let Some((func_name, callers)) = &containing_info {
notes.push(format!(
"📦 In function: `{}` ({} callers)",
func_name, callers
));
}
if is_auth {
notes.push("🔐 Part of authentication flow".to_string());
}
let context_notes = if notes.is_empty() {
String::new()
} else {
format!("\n\n**Context:**\n{}", notes.join("\n"))
};
let (tier, deterministic, confidence, evidence) =
if let Some(rule) = prescreen_blocking_rule {
let span = SourceSpan {
file: path.to_path_buf(),
line_start: line_num,
line_end: line_num,
snippet: Some(trimmed.chars().take(200).collect()),
};
(
Tier::Blocking,
true,
Some(1.0_f64),
Some(Evidence::ConfigFact {
span,
rule: rule.to_string(),
}),
)
} else {
(Tier::Advisory, false, None, None)
};
findings.push(Finding {
id: String::new(),
detector: "JwtWeakDetector".to_string(),
severity: vuln.severity(),
title: vuln.title().to_string(),
description: format!("{}{}", vuln.description(), context_notes),
affected_files: vec![path.to_path_buf()],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: Some(vuln.fix().to_string()),
estimated_effort: Some("30 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-327".to_string()),
why_it_matters: Some(
"JWT vulnerabilities can allow attackers to forge authentication tokens, \
impersonate users, escalate privileges, or bypass authorization entirely.".to_string()
),
tier,
deterministic,
confidence,
evidence,
..Default::default()
});
}
}
}
info!(
"JwtWeakDetector found {} findings (graph-aware)",
findings.len()
);
Ok(findings)
}
}
impl crate::detectors::RegisteredDetector for JwtWeakDetector {
fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
std::sync::Arc::new(Self::new(init.repo_path))
}
fn max_tier() -> crate::models::Tier {
crate::models::Tier::Blocking
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::builder::GraphBuilder;
#[test]
fn test_detects_none_algorithm() {
let store = GraphBuilder::new().freeze();
let detector = JwtWeakDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("auth.py", "import jwt\n\ndef decode_token(token):\n payload = jwt.decode(token, algorithm=\"none\")\n return payload\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(!findings.is_empty(), "Should detect JWT algorithm='none'");
assert!(
findings.iter().any(|f| f.title.contains("none")),
"Finding should mention 'none' algorithm. Titles: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
assert!(
findings.iter().any(|f| f.severity == Severity::Critical),
"JWT none algorithm should be Critical severity"
);
}
#[test]
fn test_no_finding_for_secure_jwt() {
let store = GraphBuilder::new().freeze();
let detector = JwtWeakDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(&store, vec![
("auth.py", "import jwt\n\ndef decode_token(token, public_key):\n payload = jwt.decode(token, public_key, algorithms=[\"RS256\"])\n return payload\n"),
]);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"Secure JWT with RS256 should produce no findings, but got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn tls_verify_false_is_not_a_jwt_finding() {
let store = GraphBuilder::new().freeze();
let detector = JwtWeakDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"tls_call.py",
"import requests\n\
\n\
def fetch(url):\n\
\x20\x20\x20\x20resp = requests.get(url, verify=False)\n\
\x20\x20\x20\x20return resp.json()\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"TLS `verify=False` on a non-JWT call must not be flagged by JwtWeakDetector. Got: {:?}",
findings
.iter()
.map(|f| format!("L{:?}: {}", f.line_start, f.title))
.collect::<Vec<_>>()
);
}
#[test]
fn jwt_decode_verify_false_still_flagged() {
let store = GraphBuilder::new().freeze();
let detector = JwtWeakDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"auth.py",
"import jwt\n\
\n\
def decode_token(token):\n\
\x20\x20\x20\x20return jwt.decode(token, verify=False)\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"jwt.decode(..., verify=False) is the canonical JWT-verification-disabled bug and must still be flagged"
);
}
#[test]
fn multiline_jwt_decode_verify_false_still_flagged() {
let store = GraphBuilder::new().freeze();
let detector = JwtWeakDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"auth.py",
"import jwt\n\
\n\
def decode_token(token):\n\
\x20\x20\x20\x20return jwt.decode(\n\
\x20\x20\x20\x20\x20\x20\x20\x20token,\n\
\x20\x20\x20\x20\x20\x20\x20\x20verify=False,\n\
\x20\x20\x20\x20)\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"Multi-line jwt.decode with verify=False must still be flagged (context window gate)"
);
}
fn run_dual_branch(file: &str, content: &str) -> Vec<Finding> {
use crate::config::DualBranchConfig;
use std::collections::HashMap;
let store = GraphBuilder::new().freeze();
let detector = JwtWeakDetector::new("/mock/repo");
let mut detectors = HashMap::new();
detectors.insert("jwt-weak".to_string(), true);
let cfg = DualBranchConfig {
enabled: true,
detectors,
};
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(file, content)],
)
.with_dual_branch(cfg);
detector.detect(&ctx).expect("detection should succeed")
}
#[test]
fn flag_off_jwt_emits_single_branch_unchanged() {
let store = GraphBuilder::new().freeze();
let detector = JwtWeakDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(
"auth.py",
"import jwt\n\
def login_handler(token):\n\
\x20 return jwt.decode(token, algorithm='none')\n",
)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(!findings.is_empty(), "must still fire single-branch");
for f in &findings {
assert!(
f.alternative_branch.is_none(),
"no alternative_branch when flag off: {:?}",
f.title
);
assert!(
f.prediction_reasons.iter().all(|r| r.weight == 0.0),
"no weight-bearing predictor reasons when flag off; \
graph-enrichment weight-0 reasons are allowed. reasons: {:?}",
f.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect::<Vec<_>>()
);
}
}
#[test]
fn case_a_flag_on_explicit_rs256_in_auth_handler_classifies_benign() {
let findings = run_dual_branch(
"auth.py",
"import jwt\n\
def login_handler(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['RS256'])\n",
);
assert!(!findings.is_empty(), "must surface even when Benign");
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have a dual-branch finding");
assert_eq!(
f.severity,
Severity::Info,
"Case A predicted Benign → Info, got {:?}",
f.severity
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::RealBug);
}
#[test]
fn case_b_flag_on_naked_decode_in_handler_classifies_realbug() {
let findings = run_dual_branch(
"auth.py",
"import jwt\n\
def login_handler(token, key):\n\
\x20 return jwt.decode(token, key)\n",
);
assert!(!findings.is_empty());
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have a dual-branch finding");
assert!(
matches!(f.severity, Severity::High | Severity::Critical),
"Case B predicted RealBug; got {:?}",
f.severity
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::Benign);
}
#[test]
fn case_c_flag_on_algorithm_none_singular_collapses_to_critical() {
let findings = run_dual_branch(
"decode.py",
"import jwt\n\
def decode_token(token):\n\
\x20 return jwt.decode(token, 'supersecret', algorithm='none')\n",
);
assert!(!findings.is_empty());
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have a dual-branch finding");
assert_eq!(
f.severity,
Severity::Critical,
"D1 amendment: 'none' collapses to Critical regardless of other signals"
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::Benign);
}
#[test]
fn case_d_algorithms_list_with_none_realbug() {
let findings = run_dual_branch(
"auth.py",
"import jwt\n\
def login_handler(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['none', 'HS256'], verify=True)\n",
);
assert!(!findings.is_empty());
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have a dual-branch finding");
assert_eq!(
f.severity,
Severity::Critical,
"D1 amendment: 'none' in algorithms list collapses to Critical"
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::Benign);
}
#[test]
fn case_e_hs256_with_public_key_classifies_realbug() {
let findings = run_dual_branch(
"verify.py",
"import jwt\n\
def verify(token, public_key):\n\
\x20 return jwt.decode(token, public_key, algorithms=['HS256'])\n",
);
assert!(!findings.is_empty());
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have a dual-branch finding");
assert!(
matches!(
f.severity,
Severity::Medium | Severity::High | Severity::Critical
),
"Case E RealBug, got {:?}",
f.severity
);
}
#[test]
fn flag_on_jwt_safe_annotation_collapses_to_info() {
let findings = run_dual_branch(
"auth.py",
"import jwt\n\
def login_handler(token):\n\
\x20 return jwt.decode(token, key, algorithm='none') # repotoire: jwt-safe[verified-at-edge]\n",
);
assert!(!findings.is_empty(), "annotation still surfaces a finding");
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("must have a dual-branch finding");
assert_eq!(
f.severity,
Severity::Info,
"jwt-safe annotation collapses to Info even over 'none' collapse"
);
}
#[test]
fn flag_on_non_python_unchanged() {
let findings = run_dual_branch(
"auth.js",
"const jwt = require('jsonwebtoken');\n\
function login(token) {\n\
\x20 return jwt.verify(token, 'k', { algorithm: 'none' });\n\
}\n",
);
for f in &findings {
assert!(
f.alternative_branch.is_none(),
"JS JWT must not have alternative_branch in v0; \
got finding: {:?}",
f.title
);
}
}
#[test]
fn real_cve_2015_2951_algorithm_none_pattern() {
let findings = run_dual_branch(
"real_cve_2015_2951.py",
"import jwt\n\
def get_user_from_token(token):\n\
\x20 payload = jwt.decode(token, algorithm='none')\n\
\x20 return payload\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("dual-branch finding expected for algorithm='none'");
assert_eq!(
f.severity,
Severity::Critical,
"CVE-2015-2951 shape must classify Critical via D1 \
amendment collapse; got {:?}, reasons={:?}",
f.severity,
f.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect::<Vec<_>>()
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(
alt.label,
crate::dual_branch::BranchLabel::Benign,
"alternative carries the Benign interpretation for user \
inspection (e.g. if the code path is dead)"
);
}
#[test]
fn real_pyjwt_recommended_rs256_pattern() {
let findings = run_dual_branch(
"real_pyjwt_rs256.py",
"import jwt\n\
PUBLIC_KEY = open('/etc/auth/jwt_public.pem').read()\n\
def authenticate(request):\n\
\x20 token = request.headers['Authorization'].split()[1]\n\
\x20 payload = jwt.decode(token, PUBLIC_KEY, algorithms=['RS256'])\n\
\x20 return payload\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("dual-branch finding expected for jwt.decode site");
assert_eq!(
f.severity,
Severity::Info,
"PyJWT-recommended RS256 shape must classify Benign/Info; \
got {:?}, reasons={:?}",
f.severity,
f.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect::<Vec<_>>()
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::RealBug);
}
#[test]
fn real_jwt_key_confusion_cve_2016_10555_pattern() {
let findings = run_dual_branch(
"real_key_confusion.py",
"import jwt\n\
def verify_token(token, public_key):\n\
\x20 return jwt.decode(token, public_key, algorithms=['HS256'])\n",
);
let f = findings
.iter()
.find(|f| f.is_dual_branch())
.expect("dual-branch finding expected");
assert!(
matches!(
f.severity,
Severity::Medium | Severity::High | Severity::Critical
),
"CVE-2016-10555 shape (HS256 + RSA public key) must \
classify RealBug; got {:?}, reasons={:?}",
f.severity,
f.prediction_reasons
.iter()
.map(|r| (&r.kind, r.weight))
.collect::<Vec<_>>()
);
let alt = f.alternative_branch.as_ref().unwrap();
assert_eq!(alt.label, crate::dual_branch::BranchLabel::Benign);
}
fn run_legacy(file: &str, content: &str) -> Vec<Finding> {
let store = GraphBuilder::new().freeze();
let detector = JwtWeakDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(file, content)],
);
detector.detect(&ctx).expect("detection should succeed")
}
#[test]
fn exact_antipattern_is_blocking() {
let findings = run_legacy(
"auth.js",
"const jwt = require('jsonwebtoken');\n\
function sign(payload, key) {\n\
\x20 return jwt.sign(payload, key, { algorithm: 'none' });\n\
}\n",
);
let f = findings
.iter()
.find(|f| f.tier == Tier::Blocking)
.expect("jwt.sign with algorithm:'none' must produce a Blocking finding");
assert!(
matches!(&f.evidence, Some(Evidence::ConfigFact { rule, .. }) if rule == "jwt_alg_none"),
"rule must be jwt_alg_none, got evidence: {:?}",
f.evidence
);
assert!(f.deterministic, "must be deterministic");
assert_eq!(
f.confidence.unwrap(),
1.0,
"confidence must be 1.0 for Blocking"
);
let findings = run_legacy(
"auth.js",
"const jwt = require('jsonwebtoken');\n\
function verify(token) {\n\
\x20 return jwt.verify(token, '');\n\
}\n",
);
let f = findings
.iter()
.find(|f| f.tier == Tier::Blocking)
.expect("jwt.verify with empty secret must produce a Blocking finding");
assert!(
matches!(&f.evidence, Some(Evidence::ConfigFact { rule, .. }) if rule == "jwt_empty_secret"),
"rule must be jwt_empty_secret, got evidence: {:?}",
f.evidence
);
assert!(f.deterministic);
assert_eq!(f.confidence.unwrap(), 1.0);
let findings = run_legacy(
"auth.js",
"const jwt = require('jsonwebtoken');\n\
function sign(payload) {\n\
\x20 return jwt.sign(payload, 'hardcoded-secret-123', { algorithm: 'HS256' });\n\
}\n",
);
let f = findings
.iter()
.find(|f| f.tier == Tier::Blocking)
.expect("jwt.sign with hardcoded secret + HS256 must produce a Blocking finding");
assert!(
matches!(&f.evidence, Some(Evidence::ConfigFact { rule, .. }) if rule == "jwt_hardcoded_secret"),
"rule must be jwt_hardcoded_secret, got evidence: {:?}",
f.evidence
);
assert!(f.deterministic);
assert_eq!(f.confidence.unwrap(), 1.0);
}
#[test]
fn near_miss_is_advisory_or_absent() {
let findings = run_legacy(
"auth.js",
"const jwt = require('jsonwebtoken');\n\
function sign(p, key) {\n\
\x20 return jwt.sign(p, key, { algorithm: cfg.alg });\n\
}\n",
);
for f in &findings {
assert_ne!(
f.tier,
Tier::Blocking,
"dynamic algorithm should not be Blocking; got: {:?}",
f
);
}
let findings = run_legacy(
"auth.js",
"const jwt = require('jsonwebtoken');\n\
function verify(token) {\n\
\x20 return jwt.verify(token, process.env.JWT_SECRET);\n\
}\n",
);
assert!(
findings
.iter()
.all(|f| f.tier != Tier::Blocking && f.evidence.is_none()),
"jwt.verify with an env-var secret must not be Blocking or carry blocking evidence; got: {:?}",
findings.iter().map(|f| (&f.title, f.tier, &f.evidence)).collect::<Vec<_>>()
);
}
#[test]
fn commented_out_is_not_flagged() {
let findings = run_legacy(
"auth.js",
"const jwt = require('jsonwebtoken');\n\
// jwt.sign(payload, key, { algorithm: 'none' });\n\
function safe(p, k) { return jwt.sign(p, k, { algorithm: 'RS256' }); }\n",
);
assert!(
findings.iter().all(|f| f.tier != Tier::Blocking),
"commented-out algorithm:'none' must not be Blocking. findings: {:?}",
findings
.iter()
.map(|f| (&f.title, f.tier))
.collect::<Vec<_>>()
);
}
#[test]
fn in_test_fixture_is_advisory() {
let findings = run_legacy(
"tests/x.spec.js",
"const jwt = require('jsonwebtoken');\n\
it('rejects none algorithm', () => {\n\
\x20 const tok = jwt.sign({}, 'k', { algorithm: 'none' });\n\
});\n",
);
let blocking = findings.iter().any(|f| f.tier == Tier::Blocking);
assert!(
!blocking,
"algorithm:'none' in a test fixture must stay Advisory, not Blocking"
);
assert!(
!findings.is_empty(),
"test-fixture JWT anti-patterns should still produce Advisory findings"
);
for f in &findings {
assert_eq!(
f.tier,
Tier::Advisory,
"all findings from a test fixture must be Advisory"
);
}
}
}