mod annotation;
mod evidence;
mod predict;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::detectors::ast_walk::AstWalkCtx;
use crate::detectors::base::{Detector, DetectorConfig};
use crate::detectors::fast_search::*;
use crate::detectors::security::ast_helpers::{
collect_named_args, node_text, receiver_chain_label as receiver_chain_label_shared,
unwrap_callee,
};
use crate::detectors::security::scan_inputs::{ScanAstInputs, ScanInputs};
use crate::graph::GraphQueryExt;
use crate::models::{Finding, Severity};
use crate::parsers::lightweight::Language;
use anyhow::Result;
use regex::Regex;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::LazyLock;
use tracing::{debug, info};
/// File extensions this detector asks the file provider for (see `detect` and
/// `Detector::file_extensions`).
const SUPPORTED_EXTS: &[&str] = &[
"py", "js", "ts", "jsx", "tsx", "rb", "java", "go", "php", "c", "cc", "cpp", "h", "hpp", "cs",
];
/// Subset of `SUPPORTED_EXTS` that `detect` routes to the tree-sitter scanner
/// (`scan_file_ast`); every other extension goes through `scan_file_line`.
const AST_EXTS: &[&str] = &["py", "js", "ts", "jsx", "tsx"];
/// Classifies a cryptographic algorithm name as broken, safe, or unknown.
///
/// Matching is case-insensitive and ignores spaces, underscores and hyphens,
/// so `"SHA-1"`, `"sha_1"` and `"sha1"` are all treated alike.
///
/// Returns [`AlgoKind::Broken`] carrying `name` exactly as it was passed in
/// (original spelling), [`AlgoKind::Safe`] for the known-good primitives, and
/// [`AlgoKind::Unknown`] for anything not in either list.
fn classify_algo_name(name: &str) -> AlgoKind {
    let normalized = name.to_lowercase().replace([' ', '_', '-'], "");
    match normalized.as_str() {
        "md5"
        | "md4"
        | "md2"
        | "sha"
        | "sha1"
        | "sha0"
        | "ripemd"
        | "ripemd128"
        | "ripemd160"
        | "des"
        | "rc4"
        | "rc2"
        | "arc4"
        | "blowfish"
        | "3des"
        | "tripledes"
        // Library-specific spellings of ciphers already listed above:
        // PyCryptodome modules `DES3` / `ARC2`, Java names `DESede` / `ARCFOUR`.
        | "des3"
        | "desede"
        | "arc2"
        | "arcfour" => AlgoKind::Broken(name.to_string()),
        "sha256"
        | "sha384"
        | "sha512"
        | "sha224"
        | "sha3"
        | "sha3256"
        | "sha3384"
        | "sha3512"
        | "blake2"
        | "blake2b"
        | "blake2s"
        | "blake3"
        | "aes"
        | "aes128"
        | "aes192"
        | "aes256"
        | "chacha20"
        | "chacha20poly1305"
        | "xchacha20poly1305" => AlgoKind::Safe,
        _ => AlgoKind::Unknown,
    }
}
/// Splits a cipher specification such as `"aes-128-ecb"` into
/// `(algorithm, mode)`, both lower-cased.
///
/// Modes are probed in the fixed order of `MODES`, and for each mode the
/// separators `-`, `_`, `/` are tried in that order; the last occurrence of
/// `<sep><mode>` in the string wins. The algorithm part is the leading run of
/// alphabetic characters before that occurrence (`"aes-128"` becomes `"aes"`);
/// when there is no such run the whole prefix is returned unchanged.
///
/// Returns `None` when no known mode appears after a separator.
fn split_algo_mode(name: &str) -> Option<(String, String)> {
    const MODES: &[&str] = &["ecb", "cbc", "ctr", "ofb", "cfb", "gcm", "ccm", "xts"];
    const SEPARATORS: [char; 3] = ['-', '_', '/'];

    let spec = name.to_lowercase();
    MODES.iter().find_map(|&mode| {
        SEPARATORS.iter().find_map(|&sep| {
            let at = spec.rfind(&format!("{sep}{mode}"))?;
            let prefix = &spec[..at];
            let leading_letters: String =
                prefix.chars().take_while(|c| c.is_alphabetic()).collect();
            let family = if leading_letters.is_empty() {
                prefix.to_string()
            } else {
                leading_letters
            };
            Some((family, mode.to_string()))
        })
    })
}
/// Result of classifying an algorithm name (see `classify_algo_name`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum AlgoKind {
// Known-broken primitive; carries the name as the caller spelled it.
Broken(String),
// Primitive on the known-good list.
Safe,
// Name is on neither list.
Unknown,
}
/// What kind of weak-crypto usage was recognised at a call site.
#[derive(Debug, Clone, PartialEq, Eq)]
enum WeakCryptoCallKind {
    /// A broken algorithm named by an identifier or attribute
    /// (for example `hashlib.md5`); carries the algorithm name.
    BrokenAlgoIdentifier(String),
    /// A broken algorithm named by a string literal argument
    /// (for example `createHash('md5')`); carries the algorithm string.
    BrokenAlgoStringLiteral(String),
    /// A cipher/mode combination reported as weak (for example AES with ECB).
    WeakModeCombo { algo: String, mode: String },
    /// The algorithm argument is not a string literal, so it cannot be
    /// classified statically.
    UnknownAlgoFromVariable,
}
impl WeakCryptoCallKind {
fn algo_label(&self) -> String {
match self {
WeakCryptoCallKind::BrokenAlgoIdentifier(n)
| WeakCryptoCallKind::BrokenAlgoStringLiteral(n) => n.clone(),
WeakCryptoCallKind::WeakModeCombo { algo, mode } => format!("{algo}/{mode}"),
WeakCryptoCallKind::UnknownAlgoFromVariable => "<dynamic>".to_string(),
}
}
}
/// Maps a call kind to its base severity: `Low` when the algorithm is chosen
/// at runtime, `High` for every statically identified weak primitive or mode.
fn severity_for(kind: &WeakCryptoCallKind) -> Severity {
    match kind {
        WeakCryptoCallKind::UnknownAlgoFromVariable => Severity::Low,
        WeakCryptoCallKind::BrokenAlgoIdentifier(_)
        | WeakCryptoCallKind::BrokenAlgoStringLiteral(_)
        | WeakCryptoCallKind::WeakModeCombo { .. } => Severity::High,
    }
}
/// Decides whether a site is reported through the dual-branch finding builder
/// instead of the standard one (see `applies_to`).
struct DualBranchPolicy {
// Populated in `detect` from `ctx.dual_branch.is_enabled_for("insecure-crypto")`.
flag_on: bool,
}
impl DualBranchPolicy {
    /// Returns `true` only when the flag is on, the language is Python, and
    /// `kind` names one of `md5`, `sha1`, `sha` or `md4` (case-insensitive)
    /// as a broken identifier or string literal.
    fn applies_to(&self, lang: Language, kind: &WeakCryptoCallKind) -> bool {
        let context_ok = self.flag_on && matches!(lang, Language::Python);
        if !context_ok {
            return false;
        }
        if let WeakCryptoCallKind::BrokenAlgoIdentifier(label)
        | WeakCryptoCallKind::BrokenAlgoStringLiteral(label) = kind
        {
            let folded = label.to_lowercase();
            ["md5", "sha1", "sha", "md4"].contains(&folded.as_str())
        } else {
            false
        }
    }
}
/// Detector that reports use of weak or broken cryptographic primitives.
pub struct InsecureCryptoDetector {
// Repository root; handed to `detector_relative_path` when reporting files.
repository_path: PathBuf,
// Finding cap checked by the per-file scanners and by `detect`.
max_findings: usize,
}
impl InsecureCryptoDetector {
crate::detectors::detector_new!(50);
/// Delegates to `crate::detectors::detector_relative_path` with this
/// detector's repository root and `path`.
fn relative_path(&self, path: &Path) -> PathBuf {
crate::detectors::detector_relative_path(&self.repository_path, path)
}
/// Scans one file through its syntax tree and returns the findings for it.
///
/// Uses the cached tree from `inputs` when present, otherwise parses the
/// content with `parse_root_ext`. Returns an empty vector when the content
/// contains a NUL byte, is longer than 500_000 bytes, or cannot be parsed.
/// At most `self.max_findings` findings are produced per call.
///
/// `dual_branch_policy` selects, per site, between the dual-branch Python
/// finding builder and the standard `build_finding`.
fn scan_file_ast(
&self,
inputs: &ScanAstInputs<'_>,
dual_branch_policy: &DualBranchPolicy,
) -> Vec<Finding> {
let path = inputs.path();
let content = inputs.content();
let ext = inputs.ext();
let lang = inputs.lang;
let cached_tree = inputs.cached_tree;
let mut findings = vec![];
// Skip content with NUL bytes and very large files.
if content.contains('\0') || content.len() > 500_000 {
return findings;
}
// `owned` is declared before the match so a freshly parsed tree outlives
// the `root` node borrowed from it.
let owned;
let root = match cached_tree {
Some(tree) => tree.root_node(),
None => match parse_root_ext(content, lang, ext) {
Some(t) => {
owned = t;
owned.root_node()
}
None => return findings,
},
};
let bytes = content.as_bytes();
let lines: Vec<&str> = content.lines().collect();
// Import alias tables are only collected for Python; other languages get
// empty maps.
let alias_map = if matches!(lang, Language::Python) {
collect_python_from_imports(root, bytes)
} else {
HashMap::new()
};
let module_aliases = if matches!(lang, Language::Python) {
collect_python_module_aliases(root, bytes)
} else {
HashMap::new()
};
let mut sites: Vec<CryptoSite> = Vec::new();
let ctx = AstWalkCtx {
lang,
source: bytes,
};
let aliases = PythonAliases::new(&alias_map, &module_aliases);
collect_crypto_sites(&ctx, root, &aliases, &mut sites);
for site in sites {
if findings.len() >= self.max_findings {
break;
}
// Row of the call's start position, used as a 0-based index into `lines`.
let line_idx = site.call_node.start_position().row;
if let Some(line) = lines.get(line_idx) {
let prev = if line_idx > 0 {
Some(lines[line_idx - 1])
} else {
None
};
// The suppression check sees the call's line and the line before it.
if crate::detectors::is_line_suppressed(line, prev) {
continue;
}
}
let snippet = lines.get(line_idx).map(|s| s.trim()).unwrap_or("");
let line_num = (line_idx + 1) as u32;
let finding = if dual_branch_policy.applies_to(lang, &site.kind) {
self.build_dual_branch_python_finding(
path,
line_num,
&site.kind,
snippet,
site.call_node,
bytes,
&lines,
)
} else {
self.build_finding(path, line_num, &site.kind, snippet, ext)
};
findings.push(finding);
}
findings
}
/// Scans one file line by line with the regex matchers in `match_line_crypto`.
///
/// Files longer than 500_000 bytes yield no findings. Lines that are
/// suppressed (per `is_line_suppressed`, given the line and the one before
/// it) or whose trimmed text starts with `#`, `//` or `*` are skipped.
/// Produces at most `self.max_findings` findings per call.
fn scan_file_line(&self, inputs: &ScanInputs<'_>) -> Vec<Finding> {
    let mut hits = Vec::new();
    let text = inputs.content;
    if text.len() > 500_000 {
        return hits;
    }
    let rows: Vec<&str> = text.lines().collect();
    for (idx, row) in rows.iter().copied().enumerate() {
        if hits.len() >= self.max_findings {
            break;
        }
        let above = idx.checked_sub(1).map(|p| rows[p]);
        if crate::detectors::is_line_suppressed(row, above) {
            continue;
        }
        let body = row.trim_start();
        let looks_like_comment =
            body.starts_with('#') || body.starts_with("//") || body.starts_with('*');
        if looks_like_comment {
            continue;
        }
        let Some(kind) = match_line_crypto(row, inputs.ext) else {
            continue;
        };
        let line_num = (idx + 1) as u32;
        hits.push(self.build_finding(inputs.path, line_num, &kind, row.trim(), inputs.ext));
    }
    hits
}
fn build_finding(
&self,
path: &Path,
line_num: u32,
kind: &WeakCryptoCallKind,
snippet: &str,
_ext: &str,
) -> Finding {
let (title, suggested_fix, cwe) = match kind {
WeakCryptoCallKind::BrokenAlgoIdentifier(name)
| WeakCryptoCallKind::BrokenAlgoStringLiteral(name) => {
let upper = name.to_uppercase();
if matches!(
upper.as_str(),
"MD5" | "SHA1" | "SHA-1" | "MD4" | "MD2" | "SHA"
) {
(
format!("Weak hash algorithm ({upper})"),
"Use SHA-256 / SHA-3 / BLAKE3 instead. For password hashing use Argon2 or scrypt.".to_string(),
"CWE-328",
)
} else {
(
format!("Weak cipher algorithm ({upper})"),
"Use AES-GCM or ChaCha20-Poly1305 instead.".to_string(),
"CWE-327",
)
}
}
WeakCryptoCallKind::WeakModeCombo { algo, mode } => (
format!(
"Weak cipher mode ({}/{})",
algo.to_uppercase(),
mode.to_uppercase()
),
"ECB mode leaks plaintext patterns. Use AES-GCM or ChaCha20-Poly1305.".to_string(),
"CWE-327",
),
WeakCryptoCallKind::UnknownAlgoFromVariable => (
"Cryptographic algorithm chosen at runtime".to_string(),
"Validate the algorithm against an allowlist (SHA-256+, AES-GCM, ChaCha20-Poly1305).".to_string(),
"CWE-327",
),
};
let description = format!(
"**Insecure cryptographic primitive**\n\n\
**Algorithm**: `{}`\n\n\
**Location**: {}:{}\n\n\
**Code**:\n```\n{}\n```\n\n\
Cryptographically broken primitives (MD5, SHA-1, DES, RC4, ECB mode) can be \
exploited via collision attacks, key recovery, or plaintext-pattern leakage. \
Replace them with modern primitives.",
kind.algo_label(),
path.display(),
line_num,
snippet,
);
Finding {
id: String::new(),
detector: "InsecureCryptoDetector".to_string(),
severity: severity_for(kind),
title,
description,
affected_files: vec![self.relative_path(path)],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: Some(suggested_fix),
estimated_effort: Some("30 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some(cwe.to_string()),
why_it_matters: Some(
"Broken primitives let attackers forge signatures, recover keys, or leak \
plaintext patterns."
.to_string(),
),
..Default::default()
}
}
/// Builds a dual-branch finding for a Python weak-hash call site.
///
/// Evidence is extracted from the call node via
/// `evidence::extract_python_evidence` and handed to `predict::predict`; the
/// returned prediction supplies the severity, the `RealBug` / `Benign` label
/// that selects title, description and suggested fix, plus the alternative
/// branch, reasons and resolution signals attached to the finding.
///
/// Panics (`unreachable!`) when `kind` is not a broken identifier or string
/// literal; callers gate on `DualBranchPolicy::applies_to` first.
fn build_dual_branch_python_finding(
&self,
path: &Path,
line_num: u32,
kind: &WeakCryptoCallKind,
snippet: &str,
call_node: tree_sitter::Node<'_>,
source: &[u8],
lines: &[&str],
) -> Finding {
// Upper-cased algorithm name used in titles and messages.
let algo_label = match kind {
WeakCryptoCallKind::BrokenAlgoIdentifier(n)
| WeakCryptoCallKind::BrokenAlgoStringLiteral(n) => n.to_uppercase(),
_ => unreachable!(
"build_dual_branch_python_finding called for non-hash kind: {kind:?} \
— DualBranchPolicy::applies_to should have filtered this"
),
};
let evidence = evidence::extract_python_evidence(call_node, source, lines);
let prediction = predict::predict(&evidence, &algo_label);
let predicted_label = prediction.predicted;
let predicted_severity = prediction.predicted_severity;
let predicted_title = match predicted_label {
crate::dual_branch::BranchLabel::RealBug => {
format!("Weak hash algorithm ({algo_label})")
}
crate::dual_branch::BranchLabel::Benign => {
format!("Non-security use of {algo_label} (informational)")
}
};
// Shared header (algorithm, location, code) followed by a label-specific
// closing paragraph.
let predicted_description = format!(
"**Insecure cryptographic primitive (dual-branch)**\n\n\
**Algorithm**: `{}`\n\n\
**Location**: {}:{}\n\n\
**Code**:\n```\n{}\n```\n\n\
{}",
kind.algo_label(),
path.display(),
line_num,
snippet,
match predicted_label {
crate::dual_branch::BranchLabel::RealBug => format!(
"{algo_label} is cryptographically broken. The predictor leans \
RealBug for this call site (see `prediction_reasons`)."
),
crate::dual_branch::BranchLabel::Benign => format!(
"{algo_label} appears to be used for a non-security purpose at \
this call site. The predictor leans Benign (see \
`prediction_reasons`); the High-severity interpretation is \
carried in `alternative_branch`."
),
},
);
let predicted_fix = match predicted_label {
crate::dual_branch::BranchLabel::RealBug => Some(format!(
"Replace `{algo_label}` with SHA-256, SHA-3, or BLAKE3. For password \
hashing use Argon2 or scrypt."
)),
crate::dual_branch::BranchLabel::Benign => Some(
"If this is intentional non-security use, add `usedforsecurity=False` \
(Python 3.9+) or annotate `# repotoire: protocol-required[<RFC>]` \
to collapse the finding to Info definitively."
.to_string(),
),
};
let mut finding = Finding {
id: String::new(),
detector: "InsecureCryptoDetector".to_string(),
severity: predicted_severity,
title: predicted_title,
description: predicted_description,
affected_files: vec![self.relative_path(path)],
line_start: Some(line_num),
line_end: Some(line_num),
suggested_fix: predicted_fix,
estimated_effort: Some("30 minutes".to_string()),
category: Some("security".to_string()),
cwe_id: Some("CWE-328".to_string()),
why_it_matters: Some(
"Broken primitives let attackers forge signatures, recover keys, or \
leak plaintext patterns — but not every call site is security-relevant. \
The predictor's job is to distinguish."
.to_string(),
),
..Default::default()
};
// Attach the remaining prediction outputs through the builder-style
// `with_*` methods, each of which consumes and returns the finding.
finding = finding.with_alternative_branch(prediction.alternative_branch);
for reason in prediction.reasons {
finding = finding.with_prediction_reason(reason);
}
for resolution in prediction.resolutions {
finding = finding.with_resolution_signal(resolution);
}
finding
}
}
impl Detector for InsecureCryptoDetector {
    /// Stable detector identifier.
    fn name(&self) -> &'static str {
        "insecure-crypto"
    }

    /// One-line description of what the detector reports.
    fn description(&self) -> &'static str {
        "Detects weak cryptographic algorithms"
    }

    /// Returns `false`.
    fn requires_graph(&self) -> bool {
        false
    }

    /// Extensions this detector scans (`SUPPORTED_EXTS`).
    fn file_extensions(&self) -> &'static [&'static str] {
        SUPPORTED_EXTS
    }

    /// Returns the `HAS_CRYPTO` content flag.
    fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
        crate::detectors::detector_context::ContentFlags::HAS_CRYPTO
    }

    /// Runs the detector over every supported file in `ctx`.
    ///
    /// Test paths, localisation paths and script directories are skipped, as
    /// are files without any crypto keyword. Extensions in `AST_EXTS` go
    /// through `scan_file_ast`, everything else through `scan_file_line`.
    /// The combined result never exceeds `self.max_findings`.
    ///
    /// After scanning, a `High` finding whose enclosing function (looked up
    /// in the graph) has a route/handler-like name is raised to `Critical`.
    fn detect(
        &self,
        ctx: &crate::detectors::analysis_context::AnalysisContext,
    ) -> Result<Vec<Finding>> {
        let graph = ctx.graph;
        let files = &ctx.as_file_provider();
        debug!("Starting insecure-crypto detection (AST-first)");
        let dual_branch_policy = DualBranchPolicy {
            flag_on: ctx.dual_branch.is_enabled_for("insecure-crypto"),
        };
        let mut findings: Vec<Finding> = Vec::new();
        for path in files.files_with_extensions(SUPPORTED_EXTS) {
            if findings.len() >= self.max_findings {
                break;
            }
            let path_str = path.to_string_lossy().to_lowercase();
            if crate::detectors::base::is_test_path(&path_str) {
                continue;
            }
            if path_str.contains("/lang/")
                || path_str.contains("/locale")
                || path_str.contains("/i18n/")
                || path_str.contains("/translations/")
                || path_str.contains("_lang")
                || path_str.contains(".lang.")
            {
                continue;
            }
            if path_str.contains("scripts/") || path_str.contains("/script/") {
                continue;
            }
            let content = match files.content(path) {
                Some(c) => c,
                None => continue,
            };
            // Cheap substring prefilter before any parsing or regex work.
            if !contains_any(CRYPTO_KEYWORD_FINDERS, &content) {
                continue;
            }
            let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
            let scan = ScanInputs::new(path, &content, ext);
            let new_findings = if AST_EXTS.contains(&ext) {
                let cached = files.tree(path);
                let lang = Language::from_path(path);
                let ast_inputs = ScanAstInputs::new(scan, lang, cached.as_deref());
                self.scan_file_ast(&ast_inputs, &dual_branch_policy)
            } else {
                self.scan_file_line(&scan)
            };
            // Each per-file scan caps itself independently, so clamp here to
            // keep the overall total within `max_findings`.
            let remaining = self.max_findings.saturating_sub(findings.len());
            findings.extend(new_findings.into_iter().take(remaining));
        }
        static HANDLER_VERB_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"^(get|post|put|delete|patch|head|options)[A-Z]").expect("valid regex")
        });
        for finding in &mut findings {
            if !matches!(finding.severity, Severity::High | Severity::Medium) {
                continue;
            }
            if let (Some(file_path), Some(line)) =
                (finding.affected_files.first(), finding.line_start)
            {
                let path_str = file_path.to_string_lossy().to_string();
                let i = graph.interner();
                if let Some(func) = graph.find_function_at(&path_str, line) {
                    let raw_name = func.node_name(i);
                    let name_lower = raw_name.to_lowercase();
                    let is_route = name_lower.contains("handler")
                        || name_lower.contains("route")
                        || name_lower.contains("endpoint")
                        || name_lower.contains("view")
                        || name_lower.contains("controller")
                        || name_lower.contains("middleware")
                        || name_lower.contains("request")
                        || name_lower.contains("response")
                        || HANDLER_VERB_RE.is_match(raw_name);
                    if is_route && finding.severity == Severity::High {
                        finding.severity = Severity::Critical;
                    }
                }
            }
        }
        info!(
            "InsecureCryptoDetector found {} potential vulnerabilities",
            findings.len()
        );
        Ok(findings)
    }

    /// Returns `true`.
    fn bypass_postprocessor(&self) -> bool {
        true
    }
}
impl crate::detectors::RegisteredDetector for InsecureCryptoDetector {
fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
std::sync::Arc::new(Self::new(init.repo_path))
}
}
/// Substring finders used by `detect` as a prefilter: a file is only scanned
/// when `contains_any` reports a match for at least one of them.
static CRYPTO_KEYWORD_FINDERS: &[&LazyLock<memchr::memmem::Finder<'static>>] = &[
&FIND_HASHLIB,
&FIND_CRYPTO_CIPHER,
&FIND_HAZMAT,
&FIND_CREATE_HASH,
&FIND_CREATE_CIPHER,
&FIND_CREATE_DECIPHER,
&FIND_CREATE_SIGN,
&FIND_REQUIRE_CRYPTO,
&FIND_REQUIRE_CRYPTO_DQ,
&FIND_MD5_UPPER,
&FIND_SHA1_UPPER,
&FIND_MD4_UPPER,
&FIND_DES_UPPER,
&FIND_RC4_UPPER,
&FIND_RC2_UPPER,
&FIND_BLOWFISH,
&FIND_ECB,
&FIND_MD5_LOWER,
&FIND_SHA1_LOWER,
&FIND_MESSAGE_DIGEST,
&FIND_CIPHER_GETINSTANCE,
];
/// A call node recognised as weak-crypto usage, paired with its classification.
struct CryptoSite<'a> {
// The call / call_expression node that matched.
call_node: tree_sitter::Node<'a>,
// What was recognised at that call.
kind: WeakCryptoCallKind,
}
/// Walks the subtree rooted at `node` in pre-order and appends every node
/// that `match_crypto_site` recognises to `out`.
///
/// The walk uses an explicit stack instead of recursion so that very deeply
/// nested syntax trees (possible in scanned, untrusted sources) cannot
/// exhaust the call stack. Visit order is the same as a recursive pre-order
/// walk: a node first, then its children left to right.
fn collect_crypto_sites<'a>(
    ctx: &AstWalkCtx<'a>,
    node: tree_sitter::Node<'a>,
    aliases: &PythonAliases<'_>,
    out: &mut Vec<CryptoSite<'a>>,
) {
    let mut pending = vec![node];
    while let Some(current) = pending.pop() {
        if let Some(site) = match_crypto_site(current, ctx.source, ctx.lang, aliases) {
            out.push(site);
        }
        let mut cursor = current.walk();
        let first_child_slot = pending.len();
        pending.extend(current.children(&mut cursor));
        // Reverse just the newly added children so the leftmost child is
        // popped (visited) first.
        pending[first_child_slot..].reverse();
    }
}
/// Dispatches a node to the language-specific matcher: Python `call` nodes
/// and JavaScript/TypeScript `call_expression` nodes are inspected, every
/// other node/language pairing yields `None`.
fn match_crypto_site<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    lang: Language,
    aliases: &PythonAliases<'_>,
) -> Option<CryptoSite<'a>> {
    let node_kind = node.kind();
    match lang {
        Language::Python if node_kind == "call" => {
            match_python_crypto_call(node, source, aliases)
        }
        Language::JavaScript | Language::TypeScript if node_kind == "call_expression" => {
            match_js_crypto_call(node, source)
        }
        _ => None,
    }
}
use super::python_imports::{
collect_python_from_imports, collect_python_module_aliases, PythonAliases,
};
/// Lower-case `hashlib` constructor names that `match_python_crypto_call`
/// reports as broken hashes.
const PY_BROKEN_HASH_ATTRS: &[&str] = &["md5", "sha1", "md4", "md2", "sha"];
/// Recognises weak-crypto usage in a Python `call` node.
///
/// Handles attribute calls (`hashlib.md5(...)`, `hashlib.new("...")`,
/// `<Something>.new(...)`, `hashes.MD5()`) and bare identifier calls whose
/// name was imported from `hashlib`. Returns `None` for anything else, or when
/// a required child node / node text is missing.
fn match_python_crypto_call<'a>(
node: tree_sitter::Node<'a>,
source: &'a [u8],
aliases: &PythonAliases<'_>,
) -> Option<CryptoSite<'a>> {
let func = node.child_by_field_name("function")?;
let func = unwrap_callee(func);
let args = node.child_by_field_name("arguments")?;
let arg_nodes = collect_named_args(args);
let kind = match func.kind() {
"attribute" => {
let obj = func.child_by_field_name("object")?;
let attr = func.child_by_field_name("attribute")?;
let attr_text = node_text(attr, source)?.to_string();
let raw_label = receiver_chain_label(obj, source);
let obj_text = node_text(obj, source).unwrap_or("");
// Resolve module aliases: look up the receiver's full text first, then
// its chain label, and fall back to the raw label when neither is aliased.
let obj_label = aliases
.modules
.get(obj_text)
.or_else(|| aliases.modules.get(raw_label.as_str()))
.cloned()
.unwrap_or(raw_label);
// hashlib.<broken hash>(...)
if obj_label == "hashlib"
&& PY_BROKEN_HASH_ATTRS.contains(&attr_text.to_lowercase().as_str())
{
WeakCryptoCallKind::BrokenAlgoIdentifier(attr_text)
}
// hashlib.new(<algo>): classify a literal first argument; a first
// argument that is not a static string is reported as runtime-chosen.
// NOTE(review): if `collect_named_args` also yields keyword arguments,
// `hashlib.new(name="sha256")` would take the `None` arm below and be
// reported as dynamic — confirm against `collect_named_args`.
else if obj_label == "hashlib" && attr_text == "new" {
let first_arg = arg_nodes.first().copied();
match first_arg.and_then(|n| python_string_literal_value(n, source)) {
Some(s) => match classify_algo_name(&s) {
AlgoKind::Broken(n) => WeakCryptoCallKind::BrokenAlgoStringLiteral(n),
AlgoKind::Safe => return None,
AlgoKind::Unknown => return None,
},
None => match first_arg {
Some(n) if !is_static_string(n) => {
WeakCryptoCallKind::UnknownAlgoFromVariable
}
_ => return None,
},
}
}
// <Receiver>.new(...): classify by the last dotted segment of the
// receiver text (e.g. `DES` in `Crypto.Cipher.DES`).
else if attr_text == "new" {
let recv_full = node_text(obj, source).unwrap_or("");
let last = recv_full.rsplit('.').next().unwrap_or(recv_full);
let last_lower = last.to_lowercase();
match classify_algo_name(last) {
AlgoKind::Broken(_) => {
WeakCryptoCallKind::BrokenAlgoIdentifier(last.to_string())
}
// AES is only reported when the second positional argument's text
// ends with MODE_ECB (case-insensitive).
_ if last_lower == "aes" => {
if let Some(mode_arg) = positional_arg(&arg_nodes, 1) {
let mode_text = node_text(mode_arg, source).unwrap_or("");
if mode_text.to_uppercase().ends_with("MODE_ECB") {
WeakCryptoCallKind::WeakModeCombo {
algo: "aes".to_string(),
mode: "ecb".to_string(),
}
} else {
return None;
}
} else {
return None;
}
}
_ => return None,
}
}
// hashes.MD5 / hashes.SHA1 / hashes.MD4 (attribute compared case-insensitively).
else if matches!(attr_text.to_uppercase().as_str(), "MD5" | "SHA1" | "MD4")
&& obj_label == "hashes"
{
WeakCryptoCallKind::BrokenAlgoIdentifier(attr_text)
} else {
return None;
}
}
"identifier" => {
// Bare call such as `md5(...)`: only reported when the import table maps
// the name to `hashlib` and the name is one of the broken hash attrs.
let name = node_text(func, source)?.to_string();
if let Some(module) = aliases.imports.get(&name) {
if module == "hashlib"
&& PY_BROKEN_HASH_ATTRS.contains(&name.to_lowercase().as_str())
{
WeakCryptoCallKind::BrokenAlgoIdentifier(name)
} else {
return None;
}
} else {
return None;
}
}
_ => return None,
};
Some(CryptoSite {
call_node: node,
kind,
})
}
/// Returns the `idx`-th (0-based) argument node that is not a
/// `keyword_argument`, or `None` when there are too few positional arguments.
fn positional_arg<'a>(args: &[tree_sitter::Node<'a>], idx: usize) -> Option<tree_sitter::Node<'a>> {
    args.iter()
        .copied()
        .filter(|arg| arg.kind() != "keyword_argument")
        .nth(idx)
}
/// Tells whether the node's kind is `string` or `concatenated_string`.
fn is_static_string(node: tree_sitter::Node<'_>) -> bool {
    let node_kind = node.kind();
    node_kind == "string" || node_kind == "concatenated_string"
}
/// Extracts the text between the quotes of a Python `string` node.
///
/// Any leading run of ASCII letters (string prefixes) is skipped, then the
/// delimiters `"""`, `'''`, `"` and `'` are tried in that order; the first one
/// that both opens and closes the remaining text wins. Returns `None` for
/// non-`string` nodes, when the node text is unavailable, or when no
/// delimiter pair matches.
fn python_string_literal_value(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<String> {
    const DELIMITERS: [&str; 4] = ["\"\"\"", "'''", "\"", "'"];

    if node.kind() != "string" {
        return None;
    }
    let raw = node_text(node, source)?;
    let quoted = raw.trim_start_matches(|c: char| c.is_ascii_alphabetic());
    DELIMITERS
        .iter()
        .find_map(|&delim| quoted.strip_prefix(delim)?.strip_suffix(delim))
        .map(str::to_string)
}
/// Recognises weak-crypto usage in a JavaScript/TypeScript `call_expression`.
///
/// Only member calls on a `crypto` receiver are considered (receiver label
/// `crypto` in any ASCII case, or `node:crypto`). The hash/HMAC/sign/verify
/// factories classify their first argument as a plain algorithm name; the
/// cipher/decipher factories additionally allow an `algo-mode` split.
/// Returns `None` for any other call shape or when the argument is not weak.
fn match_js_crypto_call<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
) -> Option<CryptoSite<'a>> {
    let func = node.child_by_field_name("function")?;
    let args = node.child_by_field_name("arguments")?;
    let arg_nodes = collect_named_args(args);
    let func = unwrap_callee(func);
    if func.kind() != "member_expression" {
        return None;
    }
    let obj = func.child_by_field_name("object")?;
    let prop = func.child_by_field_name("property")?;
    let prop_text = node_text(prop, source)?;
    let recv = receiver_chain_label(obj, source);
    // The previous check compared against `"Crypto".to_lowercase()`, which is
    // just `"crypto"` again; compare case-insensitively so a capitalised
    // binding such as `Crypto` is matched as evidently intended.
    let is_crypto_recv = recv.eq_ignore_ascii_case("crypto") || recv == "node:crypto";
    if !is_crypto_recv {
        return None;
    }
    let allow_mode = match prop_text {
        "createHash" | "createHmac" | "createSign" | "createVerify" => false,
        "createCipheriv" | "createDecipheriv" | "createCipher" | "createDecipher" => true,
        _ => return None,
    };
    let kind = classify_string_algo_arg(&arg_nodes, source, allow_mode)?;
    Some(CryptoSite {
        call_node: node,
        kind,
    })
}
fn classify_string_algo_arg(
args: &[tree_sitter::Node<'_>],
source: &[u8],
allow_mode: bool,
) -> Option<WeakCryptoCallKind> {
let first = args.first().copied()?;
let first = unwrap_arg(first);
if let Some(s) = js_string_literal_value(first, source) {
if allow_mode {
if let Some((algo, mode)) = split_algo_mode(&s) {
let algo_class = classify_algo_name(&algo);
if mode.eq_ignore_ascii_case("ecb") || matches!(algo_class, AlgoKind::Broken(_)) {
return Some(WeakCryptoCallKind::WeakModeCombo { algo, mode });
}
if matches!(algo_class, AlgoKind::Safe) {
return None;
}
}
}
let lower = s.to_lowercase();
for token in lower.split(['-', '_', '/']) {
match classify_algo_name(token) {
AlgoKind::Broken(_) => {
return Some(WeakCryptoCallKind::BrokenAlgoStringLiteral(s));
}
AlgoKind::Safe => {
return None;
}
AlgoKind::Unknown => continue,
}
}
match classify_algo_name(&s) {
AlgoKind::Broken(_) => Some(WeakCryptoCallKind::BrokenAlgoStringLiteral(s)),
_ => None,
}
} else {
Some(WeakCryptoCallKind::UnknownAlgoFromVariable)
}
}
/// Peels wrapper expressions (parentheses, `await`, `as`, type assertions,
/// non-null `!`, `satisfies`) off an argument node by repeatedly descending
/// into the first named child. Stops at the first node of any other kind, or
/// at a wrapper that has no named child.
fn unwrap_arg(mut node: tree_sitter::Node<'_>) -> tree_sitter::Node<'_> {
    while matches!(
        node.kind(),
        "parenthesized_expression"
            | "await_expression"
            | "as_expression"
            | "type_assertion_expression"
            | "non_null_expression"
            | "satisfies_expression"
    ) {
        let Some(inner) = node.named_child(0) else {
            break;
        };
        node = inner;
    }
    node
}
/// File-local wrapper around the shared `receiver_chain_label` helper that
/// passes `call_expression_module_label` as its optional callback.
fn receiver_chain_label(node: tree_sitter::Node<'_>, source: &[u8]) -> String {
receiver_chain_label_shared(node, source, Some(&call_expression_module_label))
}
/// Maps a `require(...)` / `import(...)` call expression to the module label
/// `"crypto"` when its first argument is the string literal `crypto` or
/// `node:crypto`. Returns `None` for any other callee or module.
///
/// Debug builds assert that `node` is a `call_expression`.
fn call_expression_module_label(
    node: tree_sitter::Node<'_>,
    source: &[u8],
) -> Option<&'static str> {
    debug_assert_eq!(node.kind(), "call_expression");
    let callee = node.child_by_field_name("function")?;
    let callee_text = node_text(callee, source)?;
    let kind_ok = matches!(callee.kind(), "identifier" | "import");
    let name_ok = matches!(callee_text, "require" | "import");
    if !(kind_ok && name_ok) {
        return None;
    }
    let arguments = node.child_by_field_name("arguments")?;
    let named = collect_named_args(arguments);
    let specifier = js_string_literal_value(*named.first()?, source)?;
    if specifier == "crypto" || specifier == "node:crypto" {
        Some("crypto")
    } else {
        None
    }
}
/// Extracts the contents of a JavaScript `string` node.
///
/// When the node has `string_fragment` children with readable text, their
/// texts are concatenated and returned. Otherwise the node's own text is used
/// with a matching pair of double or single quotes stripped. Returns `None`
/// for non-`string` nodes or when neither approach yields a value.
fn js_string_literal_value(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<String> {
    if node.kind() != "string" {
        return None;
    }
    let mut cursor = node.walk();
    let fragments: Vec<&str> = node
        .children(&mut cursor)
        .filter(|child| child.kind() == "string_fragment")
        .filter_map(|child| node_text(child, source))
        .collect();
    if !fragments.is_empty() {
        return Some(fragments.concat());
    }
    let raw = node_text(node, source)?;
    let double_quoted = raw.strip_prefix('"').and_then(|rest| rest.strip_suffix('"'));
    let unquoted = match double_quoted {
        Some(inner) => inner,
        None => raw.strip_prefix('\'')?.strip_suffix('\'')?,
    };
    Some(unquoted.to_string())
}
/// Regex-based matcher for a single source line, used for languages without
/// AST support in this detector.
///
/// The patterns are tried in a fixed order and the first hit wins. `_ext` is
/// not consulted: every pattern is applied regardless of the file's language.
/// Returns `None` when no pattern reports a weak primitive.
fn match_line_crypto(line: &str, _ext: &str) -> Option<WeakCryptoCallKind> {
// `MessageDigest.getInstance("<algo>")`
static MESSAGE_DIGEST_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r#"MessageDigest\.getInstance\s*\(\s*["']([^"']+)["']"#).expect("valid regex")
});
// `Cipher.getInstance("<algo>/<mode>/...")`
static CIPHER_GETINSTANCE_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r#"Cipher\.getInstance\s*\(\s*["']([^"']+)["']"#).expect("valid regex")
});
// `mcrypt_encrypt(MCRYPT_<ALGO>` / `mcrypt_decrypt(MCRYPT_<ALGO>`
static MCRYPT_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"mcrypt_(?:encrypt|decrypt)\s*\(\s*MCRYPT_(\w+)").expect("valid regex")
});
// `openssl_encrypt(<data>, "<algo>-...-<mode>"` / `openssl_decrypt(...)`
static OPENSSL_ENCRYPT_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r#"openssl_(?:encrypt|decrypt)\s*\([^,]*,\s*["']([^"']+)["']"#)
.expect("valid regex")
});
// `des.NewCipher(` / `des.NewTripleDESCipher(`
static GO_DES_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"\bdes\.NewCipher\s*\(|\bdes\.NewTripleDESCipher\s*\(").expect("valid regex")
});
// `rc4.NewCipher(`
static GO_RC4_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"\brc4\.NewCipher\s*\(").expect("valid regex"));
// `md5.New(` / `md5.Sum(` / `sha1.New(` / `sha1.Sum(`
static GO_WEAK_HASH_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"\b(md5|sha1)\s*\.\s*(?:New|Sum)\s*\(").expect("valid regex"));
// `Digest::MD5`, `Digest::SHA1`, `Digest::MD4`, `Digest::MD2`
static RUBY_DIGEST_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"\bDigest::(MD5|SHA1|MD4|MD2)\b").expect("valid regex"));
// Direct calls such as `MD5(`, `SHA1(`, `DES_set_key(`, `RC4(`
static C_DIRECT_HASH_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"\b(MD5|SHA1|MD4|MD2|DES_set_key|DES_ecb_encrypt|RC4)\s*\(")
.expect("valid regex")
});
// A MessageDigest hit that is not broken falls through to the later patterns.
if let Some(cap) = MESSAGE_DIGEST_RE.captures(line) {
let algo = &cap[1];
if let AlgoKind::Broken(name) = classify_algo_name(algo) {
return Some(WeakCryptoCallKind::BrokenAlgoStringLiteral(name));
}
}
if let Some(cap) = CIPHER_GETINSTANCE_RE.captures(line) {
// Transformation string split on '/': first part is the algorithm,
// second (if any) the mode.
let spec = &cap[1];
let parts: Vec<&str> = spec.split('/').collect();
let algo = parts.first().copied().unwrap_or("");
let mode = parts.get(1).copied().unwrap_or("");
match classify_algo_name(algo) {
AlgoKind::Broken(n) => {
return Some(WeakCryptoCallKind::BrokenAlgoStringLiteral(n));
}
_ => {
if mode.eq_ignore_ascii_case("ecb") {
return Some(WeakCryptoCallKind::WeakModeCombo {
algo: algo.to_string(),
mode: mode.to_string(),
});
}
}
}
}
if let Some(cap) = MCRYPT_RE.captures(line) {
let algo = &cap[1];
if let AlgoKind::Broken(name) = classify_algo_name(algo) {
return Some(WeakCryptoCallKind::BrokenAlgoStringLiteral(name));
}
}
if let Some(cap) = OPENSSL_ENCRYPT_RE.captures(line) {
// Cipher string split on '-': first part is the algorithm, last part the mode.
let spec = &cap[1];
let parts: Vec<&str> = spec.split('-').collect();
let algo = parts.first().copied().unwrap_or("");
let mode = parts.last().copied().unwrap_or("");
if let AlgoKind::Broken(name) = classify_algo_name(algo) {
return Some(WeakCryptoCallKind::BrokenAlgoStringLiteral(name));
}
if mode.eq_ignore_ascii_case("ecb") {
return Some(WeakCryptoCallKind::WeakModeCombo {
algo: algo.to_string(),
mode: mode.to_string(),
});
}
}
if GO_DES_RE.is_match(line) {
return Some(WeakCryptoCallKind::BrokenAlgoIdentifier("DES".to_string()));
}
if GO_RC4_RE.is_match(line) {
return Some(WeakCryptoCallKind::BrokenAlgoIdentifier("RC4".to_string()));
}
if let Some(cap) = GO_WEAK_HASH_RE.captures(line) {
return Some(WeakCryptoCallKind::BrokenAlgoIdentifier(
cap[1].to_uppercase(),
));
}
if let Some(cap) = RUBY_DIGEST_RE.captures(line) {
return Some(WeakCryptoCallKind::BrokenAlgoIdentifier(cap[1].to_string()));
}
if let Some(cap) = C_DIRECT_HASH_RE.captures(line) {
// Collapse the DES_* function names to "DES"; other captures are reported as-is.
let name = cap[1].to_string();
let canonical = if name.starts_with("DES_") {
"DES".to_string()
} else if name.starts_with("RC4") {
"RC4".to_string()
} else {
name
};
return Some(WeakCryptoCallKind::BrokenAlgoIdentifier(canonical));
}
None
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::builder::GraphBuilder;
fn run(file: &str, content: &str) -> Vec<Finding> {
let store = GraphBuilder::new().freeze();
let detector = InsecureCryptoDetector::new("/mock/repo");
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![(file, content)],
);
detector.detect(&ctx).expect("detection should succeed")
}
#[test]
fn test_detects_md5_usage() {
let findings = run(
"crypto_util.py",
"import hashlib\n\ndef compute_hash(data):\n return hashlib.md5(data).hexdigest()\n",
);
assert!(!findings.is_empty(), "Should detect hashlib.md5 usage");
}
#[test]
fn test_no_finding_for_sha256() {
let findings = run(
"crypto_util.py",
"import hashlib\n\ndef compute_hash(data):\n return hashlib.sha256(data).hexdigest()\n",
);
assert!(
findings.is_empty(),
"sha256 must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_no_finding_for_class_definition() {
let findings = run(
"text.py",
"from django.db.models import Transform\n\nclass MD5(Transform):\n function = 'MD5'\n lookup_name = 'md5'\n",
);
assert!(
findings.is_empty(),
"class MD5(...) is a class def, not a call. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_java_des_cipher() {
let findings = run(
"CryptoUtil.java",
"import javax.crypto.*;\n\npublic class CryptoUtil {\n public byte[] encrypt(byte[] data) throws Exception {\n Cipher cipher = Cipher.getInstance(\"DES\");\n return cipher.doFinal(data);\n }\n}\n",
);
assert!(
!findings.is_empty(),
"Cipher.getInstance(\"DES\") must fire"
);
}
#[test]
fn test_detects_go_md5_new_and_sum() {
let findings = run(
"hash.go",
r#"
package main
import "crypto/md5"
func hash(payload []byte) {
h := md5.New()
_ = h.Sum(payload)
_ = md5.Sum(payload)
}
"#,
);
assert!(findings.iter().any(|f| f.line_start == Some(5)));
assert!(findings.iter().any(|f| f.line_start == Some(7)));
}
#[test]
fn test_detects_go_sha1_sum() {
let findings = run(
"hash.go",
r#"
package main
import "crypto/sha1"
func hash(payload []byte) {
_ = sha1.Sum(payload)
}
"#,
);
assert!(findings.iter().any(|f| f.line_start == Some(5)));
}
#[test]
fn test_detects_hashlib_md5_python() {
let findings = run(
"h.py",
"import hashlib\ndef hash_pwd(p):\n return hashlib.md5(p).hexdigest()\n",
);
assert!(
findings
.iter()
.any(|f| matches!(f.severity, Severity::High | Severity::Critical)),
"hashlib.md5 should fire High. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, f.severity))
.collect::<Vec<_>>()
);
}
#[test]
fn test_detects_hashlib_sha1_python() {
let findings = run(
"h.py",
"import hashlib\ndef hash_pwd(p):\n return hashlib.sha1(p).hexdigest()\n",
);
assert!(
findings
.iter()
.any(|f| matches!(f.severity, Severity::High | Severity::Critical)),
"hashlib.sha1 should fire High. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, f.severity))
.collect::<Vec<_>>()
);
}
#[test]
fn test_detects_crypto_createhash_md5_js() {
let findings = run(
"h.js",
"const crypto = require('crypto');\nfunction f(d) { return crypto.createHash('md5').update(d).digest(); }\n",
);
assert!(
findings
.iter()
.any(|f| matches!(f.severity, Severity::High | Severity::Critical)),
"crypto.createHash('md5') should fire High. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, f.severity))
.collect::<Vec<_>>()
);
}
#[test]
fn test_skips_hashlib_sha256_python() {
let findings = run(
"h.py",
"import hashlib\ndef ok(p):\n return hashlib.sha256(p).hexdigest()\n",
);
assert!(
findings.is_empty(),
"hashlib.sha256 must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_skips_md5_in_comment() {
let findings = run(
"h.py",
"import hashlib\ndef ok(p):\n # Use hashlib.md5 if needed\n return hashlib.sha256(p).hexdigest()\n",
);
assert!(
findings.is_empty(),
"md5 in comment must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_b1_require_crypto_createhash_md5_js() {
let findings = run(
"go.js",
"function go(data) { return require('crypto').createHash('md5').update(data).digest(); }\n",
);
assert!(
findings
.iter()
.any(|f| matches!(f.severity, Severity::High | Severity::Critical)),
"B1: require('crypto').createHash('md5') must fire. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, f.severity))
.collect::<Vec<_>>()
);
}
/// A user-defined method that merely happens to be named `md5`, called on an
/// instance of a local class, must not produce any finding.
#[test]
fn test_skips_md5_as_method_name_python() {
    let source = "class C:\n def md5(self, data):\n return data\n\nc = C()\nresult = c.md5(b'x')\n";
    let reported = run("c.py", source);
    let titles: Vec<_> = reported.iter().map(|finding| &finding.title).collect();
    assert!(
        reported.is_empty(),
        "Method name md5 must not be confused with hashlib.md5. Got: {:?}",
        titles
    );
}
/// `crypto.createCipheriv('aes-128-ecb', ...)` in JavaScript must produce at
/// least one finding (no particular severity is required here).
#[test]
fn test_aes_ecb_mode_combo_detected_js() {
    let source = "const crypto = require('crypto');\nfunction f(k, iv) { return crypto.createCipheriv('aes-128-ecb', k, iv); }\n";
    let reported = run("c.js", source);
    let any_reported = !reported.is_empty();
    assert!(
        any_reported,
        "AES/ECB combo must fire. Got: {:?}",
        reported
            .iter()
            .map(|finding| (&finding.title, finding.severity))
            .collect::<Vec<_>>()
    );
}
/// The word `md5` appearing only inside a JavaScript string literal must not
/// produce any finding.
#[test]
fn test_skips_md5_in_string_literal() {
    let source = "function doc() { let msg = \"compute md5 hash\"; return msg; }\n";
    let reported = run("s.js", source);
    let titles: Vec<_> = reported.iter().map(|finding| &finding.title).collect();
    assert!(
        reported.is_empty(),
        "md5 inside a string literal must not fire. Got: {:?}",
        titles
    );
}
/// After `from hashlib import md5`, a bare `md5(...)` call must produce a
/// High or Critical finding.
#[test]
fn test_python_bare_md5_after_from_import() {
    let source = "from hashlib import md5\ndef hash_pwd(p):\n return md5(p).hexdigest()\n";
    let reported = run("h.py", source);
    let fired = reported
        .iter()
        .any(|finding| finding.severity == Severity::High || finding.severity == Severity::Critical);
    assert!(
        fired,
        "Bare md5(...) after from hashlib import md5 must fire. Got: {:?}",
        reported
            .iter()
            .map(|finding| (&finding.title, finding.severity))
            .collect::<Vec<_>>()
    );
}
/// `DES.new(key, DES.MODE_ECB)` imported from `Crypto.Cipher` must produce a
/// High or Critical finding.
#[test]
fn test_pycrypto_des_new_detected() {
    let source = "from Crypto.Cipher import DES\ndef encrypt(data, key):\n cipher = DES.new(key, DES.MODE_ECB)\n return cipher.encrypt(data)\n";
    let reported = run("p.py", source);
    let fired = reported
        .iter()
        .any(|finding| finding.severity == Severity::High || finding.severity == Severity::Critical);
    assert!(
        fired,
        "PyCrypto DES.new must fire High. Got: {:?}",
        reported
            .iter()
            .map(|finding| (&finding.title, finding.severity))
            .collect::<Vec<_>>()
    );
}
/// When the algorithm passed to `crypto.createHash` is a variable rather than
/// a literal, at least one finding rated Low must be produced.
#[test]
fn test_unknown_algo_from_variable_low_severity() {
    let source = "const crypto = require('crypto');\nfunction f(userChoice, d) { return crypto.createHash(userChoice).update(d).digest(); }\n";
    let reported = run("v.js", source);
    let has_low = reported
        .iter()
        .any(|finding| finding.severity == Severity::Low);
    assert!(
        has_low,
        "Variable-algo createHash must produce a Low finding. Got: {:?}",
        reported
            .iter()
            .map(|finding| (&finding.title, finding.severity))
            .collect::<Vec<_>>()
    );
}
/// With `import hashlib as hl`, a call through the alias (`hl.md5(...)`) must
/// produce a High or Critical finding.
#[test]
fn test_python_aliased_module_hashlib_md5_detected() {
    let source = "import hashlib as hl\ndef hash_pwd(p):\n return hl.md5(p).hexdigest()\n";
    let reported = run("h.py", source);
    let fired = reported
        .iter()
        .any(|finding| finding.severity == Severity::High || finding.severity == Severity::Critical);
    assert!(
        fired,
        "Aliased `hl.md5(...)` after `import hashlib as hl` must fire. Got: {:?}",
        reported
            .iter()
            .map(|finding| (&finding.title, finding.severity))
            .collect::<Vec<_>>()
    );
}
/// Runs `InsecureCryptoDetector` over a single mock file with dual-branch
/// mode enabled for the `insecure-crypto` detector.
///
/// `file` and `content` become the only entry handed to
/// `AnalysisContext::test_with_mock_files`, backed by an empty frozen graph.
///
/// # Panics
///
/// Panics with "detection should succeed" if `detect` returns an error.
fn run_dual_branch(file: &str, content: &str) -> Vec<Finding> {
    use crate::config::DualBranchConfig;
    use std::collections::HashMap;

    // Opt exactly one detector into dual-branch output.
    let dual_branch = DualBranchConfig {
        enabled: true,
        detectors: HashMap::from([("insecure-crypto".to_string(), true)]),
    };
    let graph = GraphBuilder::new().freeze();
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &graph,
        vec![(file, content)],
    )
    .with_dual_branch(dual_branch);
    InsecureCryptoDetector::new("/mock/repo")
        .detect(&ctx)
        .expect("detection should succeed")
}
/// With dual-branch mode off (plain `run`), `hashlib.sha1` must still produce
/// findings, and every finding must have no alternative branch, only
/// zero-weight prediction reasons, and severity High.
#[test]
fn flag_off_emits_single_branch_unchanged() {
    let reported = run("h.py", "import hashlib\nh = hashlib.sha1(password)\n");
    assert!(!reported.is_empty(), "must still fire single-branch");
    for finding in reported.iter() {
        assert!(
            finding.alternative_branch.is_none(),
            "no alternative_branch when flag off: {:?}",
            finding.title
        );
        // Reasons may be present, but none may carry a non-zero weight.
        let only_zero_weights = finding
            .prediction_reasons
            .iter()
            .all(|reason| reason.weight == 0.0);
        assert!(
            only_zero_weights,
            "no predictor-emitted (weight ≠ 0) reasons when flag off; \
             only weight-0 graph-enrichment reasons are allowed. \
             reasons: {:?}",
            finding
                .prediction_reasons
                .iter()
                .map(|reason| (&reason.kind, reason.weight))
                .collect::<Vec<_>>()
        );
        assert_eq!(
            finding.severity,
            Severity::High,
            "severity unchanged when flag off"
        );
    }
}
/// With dual-branch mode on, `hashlib.sha1` in Python must produce exactly one
/// finding that carries an alternative branch and at least one prediction
/// reason.
#[test]
fn flag_on_python_weak_hash_emits_dual_branch() {
    let reported = run_dual_branch("h.py", "import hashlib\nh = hashlib.sha1(password)\n");
    assert_eq!(reported.len(), 1, "exactly one finding");
    let only = &reported[0];
    let has_alternative = only.alternative_branch.is_some();
    assert!(
        has_alternative,
        "alternative_branch must be populated when flag on. title={:?}",
        only.title
    );
    let has_reasons = !only.prediction_reasons.is_empty();
    assert!(has_reasons, "at least one prediction reason");
}
/// Prediction matrix, RealBug/RealBug cell: `hashlib.md5(password)` inside
/// `hash_password` must produce exactly one finding whose primary branch is
/// High with a "Weak hash" title, and whose alternative branch is labelled
/// Benign with severity Info.
///
/// Every assertion carries a message, and the alternative branch is taken
/// with `expect`, so a failure names the violated expectation rather than
/// panicking on a bare `unwrap`.
#[test]
fn matrix_predicted_realbug_actual_realbug() {
    let findings = run_dual_branch(
        "auth.py",
        "import hashlib\n\ndef hash_password(password):\n return hashlib.md5(password).hexdigest()\n",
    );
    assert_eq!(
        findings.len(),
        1,
        "expected exactly one finding. Got: {:?}",
        findings
            .iter()
            .map(|f| (&f.title, f.severity))
            .collect::<Vec<_>>()
    );
    let f = &findings[0];
    assert_eq!(
        f.severity,
        Severity::High,
        "primary branch must be High. title={:?}",
        f.title
    );
    assert!(
        f.title.contains("Weak hash"),
        "RealBug title; got {:?}",
        f.title
    );
    let alt = f
        .alternative_branch
        .as_ref()
        .expect("dual-branch finding must carry an alternative_branch");
    assert_eq!(
        alt.label,
        crate::dual_branch::BranchLabel::Benign,
        "alternative branch must be labelled Benign"
    );
    assert_eq!(
        alt.severity,
        Severity::Info,
        "alternative (Benign) branch must be Info"
    );
}
/// Prediction matrix, Benign/Benign cell (worked example): a truncated
/// `hashlib.sha1(os.urandom(8))` inside `DigestAuth._get_client_nonce` must
/// produce exactly one finding predicted Benign (severity Info), with a
/// RealBug/High alternative branch and at least two prediction reasons.
///
/// The alternative branch is taken with `expect`, and its assertions carry
/// messages, so a failure names the violated expectation rather than
/// panicking on a bare `unwrap`.
#[test]
fn matrix_predicted_benign_actual_benign_worked_example() {
    let findings = run_dual_branch(
        "httpx_auth.py",
        "import os, hashlib\n\
         \n\
         class DigestAuth:\n\
         \x20 def _get_client_nonce(self):\n\
         \x20 return hashlib.sha1(os.urandom(8)).hexdigest()[:8]\n",
    );
    assert_eq!(findings.len(), 1, "must produce exactly one finding");
    let f = &findings[0];
    assert_eq!(
        f.severity,
        Severity::Info,
        "predicted Benign in DigestAuth context. title={:?}, reasons={:?}",
        f.title,
        f.prediction_reasons
            .iter()
            .map(|r| (&r.kind, r.weight))
            .collect::<Vec<_>>()
    );
    let alt = f
        .alternative_branch
        .as_ref()
        .expect("dual-branch finding must carry an alternative_branch");
    assert_eq!(
        alt.label,
        crate::dual_branch::BranchLabel::RealBug,
        "alternative branch must be labelled RealBug"
    );
    assert_eq!(
        alt.severity,
        Severity::High,
        "alternative (RealBug) branch must be High"
    );
    assert!(
        f.prediction_reasons.len() >= 2,
        "expected ≥2 reasons (enclosing class, truncation, urandom); \
         got {} reasons: {:?}",
        f.prediction_reasons.len(),
        f.prediction_reasons
            .iter()
            .map(|r| (&r.kind, r.weight))
            .collect::<Vec<_>>()
    );
}
/// Synthetic matrix case: `hashlib.md5(password)` inside
/// `DigestAuthBug.hash_user` must produce exactly one finding, and that
/// finding must be rated High.
#[test]
fn matrix_predicted_benign_actual_realbug_synthetic() {
    let reported = run_dual_branch(
        "buggy_auth.py",
        "import hashlib\n\
         \n\
         class DigestAuthBug:\n\
         \x20 def hash_user(self, password):\n\
         \x20 return hashlib.md5(password).hexdigest()\n",
    );
    assert_eq!(reported.len(), 1);
    let only = &reported[0];
    // Collected only to enrich the failure message.
    let weighted_reasons: Vec<_> = only
        .prediction_reasons
        .iter()
        .map(|reason| (&reason.kind, reason.weight))
        .collect();
    assert_eq!(
        only.severity,
        Severity::High,
        "0.0 sum tiebreaks to RealBug (conservative default). \
         title={:?}, reasons={:?}",
        only.title,
        weighted_reasons
    );
}
/// Synthetic matrix case: `hashlib.sha1(s + b'salt')` inside a plain
/// `cache_key` function must produce exactly one finding rated High.
#[test]
fn matrix_predicted_realbug_actual_benign_synthetic() {
    let reported = run_dual_branch(
        "cache.py",
        "import hashlib\n\
         \n\
         def cache_key(s):\n\
         \x20 return hashlib.sha1(s + b'salt').digest()\n",
    );
    assert_eq!(reported.len(), 1);
    let only = &reported[0];
    assert_eq!(
        only.severity,
        Severity::High,
        "RealBug with no signals (conservative default). title={:?}",
        only.title
    );
}
/// `hashlib.md5(password, usedforsecurity=False)` must yield exactly one Info
/// finding carrying a single `KeywordArgument` resolution signal with name
/// `usedforsecurity` and value `False`.
#[test]
fn usedforsecurity_false_collapses_via_detect() {
    let reported = run_dual_branch(
        "h.py",
        "import hashlib\n\
         \n\
         def hash_password(password):\n\
         \x20 return hashlib.md5(password, usedforsecurity=False).hexdigest()\n",
    );
    assert_eq!(reported.len(), 1);
    let only = &reported[0];
    assert_eq!(
        only.severity,
        Severity::Info,
        "usedforsecurity=False must collapse to Info even with sensitive arg/fn name"
    );
    let signals = &only.resolution_signals;
    assert_eq!(signals.len(), 1, "exactly one resolution signal");
    match &signals[0].kind {
        crate::dual_branch::ResolutionKind::KeywordArgument {
            name: keyword,
            value: literal,
        } => {
            assert_eq!(keyword, "usedforsecurity");
            assert_eq!(literal, "False");
        }
        other => panic!("unexpected resolution kind: {other:?}"),
    }
}
/// A trailing `# repotoire: protocol-required[RFC7616]` comment on a
/// `hashlib.sha1` call must yield exactly one Info finding carrying a single
/// `SourceAnnotation` resolution signal whose syntax mentions `RFC7616`.
#[test]
fn protocol_required_annotation_collapses_via_detect() {
    let reported = run_dual_branch(
        "h.py",
        "import hashlib\n\
         \n\
         def hash_for_protocol(password):\n\
         \x20 return hashlib.sha1(password).hexdigest() # repotoire: protocol-required[RFC7616]\n",
    );
    assert_eq!(reported.len(), 1);
    let only = &reported[0];
    assert_eq!(only.severity, Severity::Info);
    let signals = &only.resolution_signals;
    assert_eq!(signals.len(), 1);
    match &signals[0].kind {
        crate::dual_branch::ResolutionKind::SourceAnnotation { syntax: annotation } => {
            assert!(annotation.contains("RFC7616"));
        }
        other => panic!("unexpected resolution kind: {other:?}"),
    }
}
/// With dual-branch mode on, a JavaScript `createHash('md5')` call must still
/// produce findings, and none of them may carry an alternative branch or any
/// prediction reasons.
#[test]
fn dual_branch_does_not_affect_non_python_languages() {
    let reported = run_dual_branch(
        "h.js",
        "const crypto = require('crypto');\n\
         const h = crypto.createHash('md5').update(data).digest('hex');\n",
    );
    let any_reported = !reported.is_empty();
    assert!(any_reported, "JS path must still fire single-branch");
    for finding in reported.iter() {
        assert!(
            finding.alternative_branch.is_none(),
            "JS path must not be dual-branched in Phase 2a: {:?}",
            finding.title
        );
        let reason_free = finding.prediction_reasons.is_empty();
        assert!(
            reason_free,
            "JS path must not carry prediction reasons in Phase 2a"
        );
    }
}
/// With dual-branch mode on, any finding whose title mentions "mode" or "ecb"
/// (case-insensitive) for `AES.new(key, AES.MODE_ECB)` must not carry an
/// alternative branch.
///
/// NOTE(review): nothing here requires such a finding to exist, so the test
/// passes vacuously when no title matches. Confirm whether that is intended.
#[test]
fn dual_branch_does_not_affect_ecb_mode_findings() {
    let reported = run_dual_branch(
        "ecb.py",
        "from Crypto.Cipher import AES, DES\n\
         \n\
         def encrypt(key, data):\n\
         \x20 c = AES.new(key, AES.MODE_ECB)\n\
         \x20 return c.encrypt(data)\n",
    );
    for finding in reported.iter() {
        // Lowercase once and test both keywords against the same copy.
        let lowered = finding.title.to_lowercase();
        let is_mode_finding = lowered.contains("mode") || lowered.contains("ecb");
        if !is_mode_finding {
            continue;
        }
        assert!(
            finding.alternative_branch.is_none(),
            "ECB-mode finding must not be dual-branched in Phase 2a: {:?}",
            finding.title
        );
    }
}
}