use super::predict::{
algorithm_list_contains_none, extract_jwt_safe_reason, extract_jwt_vulnerable_source,
is_asymmetric_algorithm, matches_public_key_name, Evidence, JwtApi,
};
use crate::detectors::security::ast_helpers::{enclosing_python_function, node_text};
use std::collections::{HashMap, HashSet};
use tree_sitter::Node;
pub(super) struct PythonJwtSite<'a> {
pub call_node: Node<'a>,
pub api: JwtApi,
}
pub(super) fn collect_python_jwt_sites<'a>(
module_root: Node<'a>,
source: &'a [u8],
) -> Vec<PythonJwtSite<'a>> {
let imports = collect_jwt_imports(module_root, source);
if imports.is_empty() {
return Vec::new();
}
let aliases = collect_jwt_aliases(module_root, source);
let mut sites = Vec::new();
let mut stack: Vec<Node<'_>> = vec![module_root];
while let Some(node) = stack.pop() {
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
stack.push(child);
}
if node.kind() != "call" {
continue;
}
let Some(func) = node.child_by_field_name("function") else {
continue;
};
let func_text = node_text(func, source).unwrap_or("");
if !is_jwt_callee(func_text) {
continue;
}
let api = classify_jwt_api(func_text, &imports, &aliases);
if !api.is_python() {
continue;
}
sites.push(PythonJwtSite {
call_node: node,
api,
});
}
sites
}
fn is_jwt_callee(func_text: &str) -> bool {
let tail = func_text.rsplit('.').next().unwrap_or(func_text);
matches!(tail, "decode" | "verify")
}
pub(super) fn extract_python_evidence<'a>(
call_node: Node<'a>,
module_root: Node<'a>,
source: &'a [u8],
lines: &[&str],
file_path: Option<String>,
) -> Evidence {
let mut ev = Evidence {
file_path,
..Default::default()
};
let imports = collect_jwt_imports(module_root, source);
ev.import_strong_key_lib = imports
.iter()
.any(|m| m.starts_with("cryptography") || m == "Crypto" || m.starts_with("Crypto."));
if let Some(fn_node) = enclosing_python_function(call_node) {
if let Some(name_node) = fn_node.child_by_field_name("name") {
if let Some(name) = node_text(name_node, source) {
ev.enclosing_function = Some(name.to_string());
}
}
}
ev.enclosing_class = enclosing_python_class_name(call_node, source);
let aliases = collect_jwt_aliases(module_root, source);
let func_text = call_node
.child_by_field_name("function")
.and_then(|f| node_text(f, source))
.unwrap_or("");
ev.api = Some(classify_jwt_api(func_text, &imports, &aliases));
if let Some(args) = call_node.child_by_field_name("arguments") {
let kwargs = collect_keyword_arguments(args, source);
if let Some(algos_val) = kwargs.get("algorithms") {
ev.explicit_algorithms_kwarg = true;
if algorithm_list_contains_none(algos_val) {
ev.algorithm_none_in_slot = true;
}
for elem in split_list_elements(algos_val) {
if is_asymmetric_algorithm(&elem) {
ev.asymmetric_algorithm_in_allowlist = true;
break;
}
}
}
if let Some(algo_val) = kwargs.get("algorithm") {
let stripped = algo_val.trim().trim_matches(['\'', '"']).to_lowercase();
if stripped == "none" {
ev.algorithm_singular_none = true;
ev.algorithm_none_in_slot = true;
}
}
if let Some(verify_val) = kwargs.get("verify") {
let stripped = verify_val.trim();
if stripped == "False" {
ev.explicit_verify_false = true;
} else if stripped == "True" {
ev.explicit_verify_true = true;
}
}
if let Some(options_val) = kwargs.get("options") {
let lower = options_val.to_lowercase();
if lower.contains("verify_signature") {
if let Some(key_pos) = lower.find("verify_signature") {
let after = &lower[key_pos..];
let window: String = after.chars().take(40).collect();
if window.contains(": false") || window.contains(":false") {
ev.explicit_verify_false = true;
} else if window.contains(": true") || window.contains(":true") {
ev.explicit_verify_true = true;
}
}
}
}
let uses_hs256 = uses_hs256(&kwargs);
if uses_hs256 {
if let Some(key_arg) = nth_positional_arg(args, 1) {
let key_text = node_text(key_arg, source).unwrap_or("");
if matches_public_key_name(key_text) {
ev.hs256_with_public_key = true;
}
if key_arg.kind() == "string" {
let raw = key_text.trim_matches(['\'', '"', 'b', 'r']);
let inner = raw.trim_matches(['\'', '"']);
if !inner.is_empty() && inner.chars().count() <= 16 {
ev.hs256_with_short_secret = true;
}
}
}
}
}
let line_idx = call_node.start_position().row;
if let Some(line) = lines.get(line_idx) {
ev.jwt_safe_annotation = extract_jwt_safe_reason(line);
ev.jwt_vulnerable_annotation = extract_jwt_vulnerable_source(line);
}
ev
}
fn uses_hs256(kwargs: &HashMap<String, String>) -> bool {
if let Some(val) = kwargs.get("algorithms") {
let lower = val.to_lowercase();
if lower.contains("hs256") || lower.contains("hs384") || lower.contains("hs512") {
return true;
}
}
if let Some(val) = kwargs.get("algorithm") {
let lower = val.to_lowercase();
if lower.contains("hs256") || lower.contains("hs384") || lower.contains("hs512") {
return true;
}
}
false
}
fn collect_keyword_arguments(args_node: Node<'_>, source: &[u8]) -> HashMap<String, String> {
let mut map = HashMap::new();
let mut cursor = args_node.walk();
for child in args_node.children(&mut cursor) {
if !child.is_named() {
continue;
}
if child.kind() != "keyword_argument" {
continue;
}
let name = child
.child_by_field_name("name")
.and_then(|n| node_text(n, source))
.map(str::to_string);
let value = child
.child_by_field_name("value")
.and_then(|n| node_text(n, source))
.map(str::to_string);
if let (Some(n), Some(v)) = (name, value) {
map.insert(n, v);
}
}
map
}
fn nth_positional_arg<'a>(args_node: Node<'a>, n: usize) -> Option<Node<'a>> {
let mut cursor = args_node.walk();
let mut idx = 0;
for child in args_node.children(&mut cursor) {
if !child.is_named() {
continue;
}
if child.kind() == "keyword_argument" {
continue;
}
if idx == n {
return Some(child);
}
idx += 1;
}
None
}
fn split_list_elements(list_text: &str) -> Vec<String> {
let trimmed = list_text.trim().trim_matches(['[', ']', '(', ')']);
trimmed
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
}
fn collect_jwt_imports<'a>(root: Node<'a>, source: &'a [u8]) -> HashSet<String> {
let mut set = HashSet::new();
let mut cursor = root.walk();
for top in root.children(&mut cursor) {
match top.kind() {
"import_statement" => {
let mut nc = top.walk();
for child in top.children(&mut nc) {
if !child.is_named() {
continue;
}
let module_name = match child.kind() {
"dotted_name" => node_text(child, source).map(str::to_string),
"aliased_import" => child
.child_by_field_name("name")
.and_then(|n| node_text(n, source))
.map(str::to_string),
_ => None,
};
if let Some(name) = module_name {
if is_jwt_module(&name) {
set.insert(name);
}
}
}
}
"import_from_statement" => {
if let Some(m) = top.child_by_field_name("module_name") {
if let Some(name) = node_text(m, source) {
if is_jwt_module(name) {
set.insert(name.to_string());
}
}
}
}
_ => {}
}
}
set
}
fn is_jwt_module(name: &str) -> bool {
const JWT_MODULES: &[&str] = &[
"jwt",
"jose",
"jose.jwt",
"python_jose",
"python_jose.jwt",
"authlib",
"authlib.jose",
"cryptography",
"Crypto",
];
JWT_MODULES
.iter()
.any(|m| name == *m || name.starts_with(&format!("{m}.")))
}
fn collect_jwt_aliases<'a>(root: Node<'a>, source: &'a [u8]) -> HashMap<String, String> {
let mut map = HashMap::new();
let mut cursor = root.walk();
for top in root.children(&mut cursor) {
match top.kind() {
"import_statement" => {
let mut nc = top.walk();
for child in top.children(&mut nc) {
if !child.is_named() {
continue;
}
if child.kind() == "aliased_import" {
let module = child
.child_by_field_name("name")
.and_then(|n| node_text(n, source));
let alias = child
.child_by_field_name("alias")
.and_then(|n| node_text(n, source));
if let (Some(m), Some(a)) = (module, alias) {
if is_jwt_module(m) {
map.insert(a.to_string(), m.to_string());
}
}
}
}
}
"import_from_statement" => {
let module = top
.child_by_field_name("module_name")
.and_then(|n| node_text(n, source));
let Some(module) = module else { continue };
if !is_jwt_module(module) {
continue;
}
let module_name_id = top.child_by_field_name("module_name").map(|n| n.id());
let mut nc = top.walk();
for child in top.children(&mut nc) {
if !child.is_named() || Some(child.id()) == module_name_id {
continue;
}
match child.kind() {
"dotted_name" => {
if let Some(name) = node_text(child, source) {
map.insert(name.to_string(), module.to_string());
}
}
"aliased_import" => {
let alias = child
.child_by_field_name("alias")
.and_then(|n| node_text(n, source));
if let Some(a) = alias {
map.insert(a.to_string(), module.to_string());
}
}
_ => {}
}
}
}
_ => {}
}
}
map
}
fn classify_jwt_api(
func_text: &str,
imports: &HashSet<String>,
aliases: &HashMap<String, String>,
) -> JwtApi {
for seg in chain_identifiers(func_text) {
if let Some(module) = aliases.get(seg) {
if let Some(api) = jwt_api_from_module(module) {
return api;
}
}
if let Some(api) = jwt_api_from_module(seg) {
return api;
}
}
let jwt_libs: Vec<&str> = ["jwt", "jose", "python_jose", "authlib"]
.into_iter()
.filter(|lib| {
imports
.iter()
.any(|m| m == lib || m.starts_with(&format!("{lib}.")))
})
.collect();
if jwt_libs.len() == 1 {
return match jwt_libs[0] {
"jwt" => JwtApi::PyJwt,
"jose" | "python_jose" => JwtApi::PythonJose,
"authlib" => JwtApi::JwtVerifier,
_ => JwtApi::Unknown,
};
}
JwtApi::Unknown
}
fn jwt_api_from_module(module: &str) -> Option<JwtApi> {
if module == "jwt" || module.starts_with("jwt.") {
return Some(JwtApi::PyJwt);
}
if module == "jose"
|| module.starts_with("jose.")
|| module == "python_jose"
|| module.starts_with("python_jose.")
{
return Some(JwtApi::PythonJose);
}
if module == "authlib" || module.starts_with("authlib.") {
return Some(JwtApi::JwtVerifier);
}
None
}
fn chain_identifiers(text: &str) -> Vec<&str> {
text.split('.')
.map(|seg| match seg.find('(') {
Some(i) => &seg[..i],
None => seg,
})
.filter(|s| !s.is_empty())
.collect()
}
fn enclosing_python_class_name<'a>(node: Node<'a>, source: &'a [u8]) -> Option<String> {
let mut cur = node.parent()?;
loop {
if cur.kind() == "class_definition" {
let name = cur.child_by_field_name("name")?;
return node_text(name, source).map(str::to_string);
}
if cur.kind() == "module" {
return None;
}
cur = cur.parent()?;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::parsers::lightweight::Language;
fn first_call_with_attr<'tree>(
tree: &'tree tree_sitter::Tree,
source: &[u8],
attr_name: &str,
) -> tree_sitter::Node<'tree> {
fn walk<'a>(
node: tree_sitter::Node<'a>,
source: &[u8],
attr_name: &str,
) -> Option<tree_sitter::Node<'a>> {
if node.kind() == "call" {
if let Some(func) = node.child_by_field_name("function") {
let text = node_text(func, source).unwrap_or("");
let last = text.rsplit('.').next().unwrap_or("");
if last == attr_name {
return Some(node);
}
}
}
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if let Some(found) = walk(child, source, attr_name) {
return Some(found);
}
}
None
}
walk(tree.root_node(), source, attr_name)
.unwrap_or_else(|| panic!("no call to {} found in source", attr_name))
}
fn extract(src: &str, attr: &str) -> Evidence {
let tree = parse_root_ext(src, Language::Python, "py").expect("parse python");
let root = tree.root_node();
let call = first_call_with_attr(&tree, src.as_bytes(), attr);
let lines: Vec<&str> = src.lines().collect();
extract_python_evidence(call, root, src.as_bytes(), &lines, None)
}
#[test]
fn detects_pyjwt_import() {
let src = "import jwt\njwt.decode(token, key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert_eq!(ev.api, Some(JwtApi::PyJwt));
}
#[test]
fn detects_jose_import() {
let src = "from jose import jwt\njwt.decode(token, key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert_eq!(ev.api, Some(JwtApi::PythonJose));
}
#[test]
fn detects_authlib_import() {
let src = "from authlib.jose import jwt\njwt.decode(token, key)\n";
let ev = extract(src, "decode");
assert_eq!(ev.api, Some(JwtApi::JwtVerifier));
}
#[test]
fn detects_cryptography_import() {
let src = "\
from cryptography.hazmat.primitives import serialization\n\
import jwt\n\
def f(token):\n\
\x20 key = serialization.load_pem_public_key(open('k.pem').read())\n\
\x20 return jwt.decode(token, key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert!(ev.import_strong_key_lib);
}
#[test]
fn aliased_jwt_import_classifies_correctly() {
let src = "import jwt as pyjwt\npyjwt.decode(token, key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert_eq!(ev.api, Some(JwtApi::PyJwt));
}
#[test]
fn detects_explicit_algorithms_kwarg_with_rs256() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert!(ev.explicit_algorithms_kwarg);
assert!(ev.asymmetric_algorithm_in_allowlist);
assert!(!ev.algorithm_none_in_slot);
}
#[test]
fn detects_explicit_algorithms_kwarg_with_hs256() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['HS256'])\n";
let ev = extract(src, "decode");
assert!(ev.explicit_algorithms_kwarg);
assert!(!ev.asymmetric_algorithm_in_allowlist);
}
#[test]
fn no_algorithms_kwarg_omitted_signal_on_naked_decode() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key)\n";
let ev = extract(src, "decode");
assert!(!ev.explicit_algorithms_kwarg);
assert!(!ev.algorithm_none_in_slot);
}
#[test]
fn detects_singular_algorithm_none_kwarg() {
let src = "\
import jwt\n\
def f(token):\n\
\x20 return jwt.decode(token, 'secret', algorithm='none')\n";
let ev = extract(src, "decode");
assert!(ev.algorithm_singular_none);
assert!(ev.algorithm_none_in_slot, "must trigger collapse path");
}
#[test]
fn detects_none_in_algorithms_list() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['none', 'HS256'])\n";
let ev = extract(src, "decode");
assert!(ev.explicit_algorithms_kwarg);
assert!(ev.algorithm_none_in_slot, "D1 amendment trigger");
}
#[test]
fn detects_none_case_insensitive_in_list() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['NONE'])\n";
let ev = extract(src, "decode");
assert!(ev.algorithm_none_in_slot);
}
#[test]
fn no_none_signal_for_safe_allowlist() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['RS256', 'ES256'])\n";
let ev = extract(src, "decode");
assert!(!ev.algorithm_none_in_slot);
assert!(!ev.algorithm_singular_none);
}
#[test]
fn detects_verify_false_kwarg() {
let src = "\
import jwt\n\
def f(token):\n\
\x20 return jwt.decode(token, 'k', algorithms=['HS256'], verify=False)\n";
let ev = extract(src, "decode");
assert!(ev.explicit_verify_false);
assert!(!ev.explicit_verify_true);
}
#[test]
fn detects_verify_true_kwarg() {
let src = "\
import jwt\n\
def f(token):\n\
\x20 return jwt.decode(token, 'k', algorithms=['HS256'], verify=True)\n";
let ev = extract(src, "decode");
assert!(ev.explicit_verify_true);
assert!(!ev.explicit_verify_false);
}
#[test]
fn detects_options_verify_signature_false() {
let src = "\
import jwt\n\
def f(token):\n\
\x20 return jwt.decode(token, 'k', options={'verify_signature': False})\n";
let ev = extract(src, "decode");
assert!(ev.explicit_verify_false);
}
#[test]
fn detects_options_verify_signature_true() {
let src = "\
import jwt\n\
def f(token):\n\
\x20 return jwt.decode(token, 'k', algorithms=['RS256'], options={'verify_signature': True})\n";
let ev = extract(src, "decode");
assert!(ev.explicit_verify_true);
}
#[test]
fn detects_hs256_with_public_key_name() {
let src = "\
import jwt\n\
def f(token, public_key):\n\
\x20 return jwt.decode(token, public_key, algorithms=['HS256'])\n";
let ev = extract(src, "decode");
assert!(ev.hs256_with_public_key);
}
#[test]
fn detects_hs256_with_pub_key_short_name() {
let src = "\
import jwt\n\
def f(token, pub_key):\n\
\x20 return jwt.decode(token, pub_key, algorithms=['HS256'])\n";
let ev = extract(src, "decode");
assert!(ev.hs256_with_public_key);
}
#[test]
fn no_hs256_pubkey_when_using_rs256() {
let src = "\
import jwt\n\
def f(token, public_key):\n\
\x20 return jwt.decode(token, public_key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert!(!ev.hs256_with_public_key);
}
#[test]
fn detects_hs256_with_short_secret_literal() {
let src = "\
import jwt\n\
def f(token):\n\
\x20 return jwt.decode(token, 'secret', algorithms=['HS256'])\n";
let ev = extract(src, "decode");
assert!(ev.hs256_with_short_secret);
}
#[test]
fn no_short_secret_for_long_string() {
let src = "\
import jwt\n\
def f(token):\n\
\x20 return jwt.decode(token, 'this_is_a_very_long_secret_key_definitely', algorithms=['HS256'])\n";
let ev = extract(src, "decode");
assert!(!ev.hs256_with_short_secret);
}
#[test]
fn detects_enclosing_function() {
let src = "\
import jwt\n\
def login_handler(token, key):\n\
\x20 jwt.decode(token, key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert_eq!(ev.enclosing_function, Some("login_handler".to_string()));
}
#[test]
fn detects_enclosing_class() {
let src = "\
import jwt\n\
class TokenService:\n\
\x20 def verify(self, token, key):\n\
\x20 jwt.decode(token, key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert_eq!(ev.enclosing_class, Some("TokenService".to_string()));
}
#[test]
fn detects_jwt_safe_annotation() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key) # repotoire: jwt-safe[verified-at-edge]\n";
let ev = extract(src, "decode");
assert_eq!(ev.jwt_safe_annotation, Some("verified-at-edge".to_string()));
assert_eq!(ev.jwt_vulnerable_annotation, None);
}
#[test]
fn detects_jwt_vulnerable_annotation() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['RS256']) # repotoire: jwt-vulnerable[alg-from-header]\n";
let ev = extract(src, "decode");
assert_eq!(
ev.jwt_vulnerable_annotation,
Some("alg-from-header".to_string())
);
assert_eq!(ev.jwt_safe_annotation, None);
}
#[test]
fn ignores_unrelated_annotation_kinds() {
let src = "\
import jwt\n\
def f(token, key):\n\
\x20 return jwt.decode(token, key) # repotoire: ssrf-safe[ok]\n";
let ev = extract(src, "decode");
assert_eq!(ev.jwt_safe_annotation, None);
assert_eq!(ev.jwt_vulnerable_annotation, None);
}
#[test]
fn file_path_propagates_to_evidence() {
let src = "import jwt\njwt.decode(token, key, algorithms=['RS256'])\n";
let tree = parse_root_ext(src, Language::Python, "py").expect("parse python");
let root = tree.root_node();
let call = first_call_with_attr(&tree, src.as_bytes(), "decode");
let lines: Vec<&str> = src.lines().collect();
let ev = extract_python_evidence(
call,
root,
src.as_bytes(),
&lines,
Some("/app/auth/handlers.py".to_string()),
);
assert_eq!(ev.file_path, Some("/app/auth/handlers.py".to_string()));
}
#[test]
fn is_jwt_module_matches_exact_and_submodules() {
assert!(is_jwt_module("jwt"));
assert!(is_jwt_module("jwt.algorithms"));
assert!(is_jwt_module("jose"));
assert!(is_jwt_module("jose.jwt"));
assert!(is_jwt_module("python_jose"));
assert!(is_jwt_module("authlib"));
assert!(is_jwt_module("authlib.jose"));
assert!(is_jwt_module("cryptography"));
assert!(is_jwt_module("Crypto"));
assert!(!is_jwt_module("os"));
assert!(!is_jwt_module("subprocess"));
assert!(!is_jwt_module("jwts"));
}
#[test]
fn is_jwt_callee_matches_decode_verify() {
assert!(is_jwt_callee("jwt.decode"));
assert!(is_jwt_callee("jwt.verify"));
assert!(is_jwt_callee("jose.jwt.decode"));
assert!(is_jwt_callee("decode"));
assert!(is_jwt_callee("base64.decode")); assert!(!is_jwt_callee("jwt.encode"));
assert!(!is_jwt_callee("json.loads"));
}
#[test]
fn collect_sites_excludes_base64_decode_without_jwt_import() {
let src = "\
import base64\n\
def f(data):\n\
\x20 return base64.decode(data)\n";
let tree = parse_root_ext(src, Language::Python, "py").expect("parse python");
let sites = collect_python_jwt_sites(tree.root_node(), src.as_bytes());
assert!(sites.is_empty(), "base64.decode must not be a JWT site");
}
#[test]
fn collect_sites_finds_jwt_decode() {
let src = "import jwt\njwt.decode(t, k, algorithms=['RS256'])\n";
let tree = parse_root_ext(src, Language::Python, "py").expect("parse python");
let sites = collect_python_jwt_sites(tree.root_node(), src.as_bytes());
assert_eq!(sites.len(), 1);
assert_eq!(sites[0].api, JwtApi::PyJwt);
}
#[test]
fn case_a_extraction_explicit_rs256_in_login_handler() {
let src = "\
import jwt\n\
def login_handler(request):\n\
\x20 token = request.cookies['jwt']\n\
\x20 return jwt.decode(token, key, algorithms=['RS256'])\n";
let ev = extract(src, "decode");
assert_eq!(ev.api, Some(JwtApi::PyJwt));
assert!(ev.explicit_algorithms_kwarg);
assert!(ev.asymmetric_algorithm_in_allowlist);
assert!(!ev.algorithm_none_in_slot);
assert_eq!(ev.enclosing_function, Some("login_handler".to_string()));
}
#[test]
fn case_b_extraction_naked_decode_in_login_handler() {
let src = "\
import jwt\n\
def login_handler(request):\n\
\x20 token = request.cookies['jwt']\n\
\x20 return jwt.decode(token, key)\n";
let ev = extract(src, "decode");
assert!(!ev.explicit_algorithms_kwarg);
assert_eq!(ev.enclosing_function, Some("login_handler".to_string()));
}
#[test]
fn case_c_extraction_algorithm_none_singular() {
let src = "\
import jwt\n\
def decode_token(token):\n\
\x20 return jwt.decode(token, 'supersecret', algorithm='none')\n";
let ev = extract(src, "decode");
assert!(ev.algorithm_singular_none);
assert!(ev.algorithm_none_in_slot);
}
#[test]
fn case_d_extraction_algorithms_list_with_none() {
let src = "\
import jwt\n\
def login_handler(token, key):\n\
\x20 return jwt.decode(token, key, algorithms=['none', 'HS256'])\n";
let ev = extract(src, "decode");
assert!(ev.explicit_algorithms_kwarg);
assert!(ev.algorithm_none_in_slot);
}
#[test]
fn case_e_extraction_hs256_with_public_key() {
let src = "\
import jwt\n\
def f(token, public_key):\n\
\x20 return jwt.decode(token, public_key, algorithms=['HS256'])\n";
let ev = extract(src, "decode");
assert!(ev.explicit_algorithms_kwarg);
assert!(ev.hs256_with_public_key);
}
#[test]
fn jwt_safe_annotation_records_alongside_other_signals() {
let src = "\
import jwt\n\
def login_handler(token, key):\n\
\x20 return jwt.decode(token, key, algorithm='none') # repotoire: jwt-safe[verified-at-edge]\n";
let ev = extract(src, "decode");
assert_eq!(ev.jwt_safe_annotation, Some("verified-at-edge".to_string()));
assert!(ev.algorithm_none_in_slot);
assert_eq!(ev.enclosing_function, Some("login_handler".to_string()));
}
}