use super::annotation::parse_python_comment;
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
// Signed evidence weights summed by `predict`. A strictly positive total
// predicts `BranchLabel::Benign`; a zero or negative total predicts
// `BranchLabel::RealBug`.

/// Added when the enclosing class name matches `AUTH_PROTOCOL_CLASS_SUBSTRINGS`.
const W_AUTH_PROTOCOL_CLASS: f32 = 0.40;
/// Added (negative) when the first-argument identifier matches
/// `SENSITIVE_IDENT_SUBSTRINGS`; exactly cancels `W_AUTH_PROTOCOL_CLASS`.
const W_FIRST_ARG_SENSITIVE: f32 = -0.40;
/// Added when a first-argument identifier is present but does not match the
/// sensitive lexicon.
const W_FIRST_ARG_NON_SENSITIVE: f32 = 0.20;
/// Added when `Evidence::result_truncated` is set.
const W_RESULT_TRUNCATED: f32 = 0.15;
/// Added when `Evidence::input_includes_urandom` is set.
const W_INPUT_INCLUDES_URANDOM: f32 = 0.07;
/// Lowercase substrings that mark an identifier as sensitive.
///
/// `matches_sensitive_ident` lowercases the identifier and reports a match if
/// it contains any of these entries, so matching is case-insensitive and
/// substring-based (for example, `auth` also matches inside longer words).
const SENSITIVE_IDENT_SUBSTRINGS: &[&str] = &[
"password",
"passwd",
"secret",
"token",
"credential",
"apikey",
"api_key",
"private_key",
"privatekey",
"auth",
"session",
];
/// Lowercase substrings that mark a class name as an authentication-protocol
/// implementation.
///
/// `matches_auth_protocol_class` lowercases the class name and reports a match
/// if it contains any of these entries.
const AUTH_PROTOCOL_CLASS_SUBSTRINGS: &[&str] = &[
"digestauth",
"ntlmauth",
"krb5",
"oauth1signature", ];
/// Evidence about a single hash call site, consumed by `predict`.
///
/// The `Default` value (all `None` / `false`) carries no evidence, and
/// `predict` then returns `BranchLabel::RealBug`.
#[derive(Debug, Clone, Default, PartialEq)]
pub(super) struct Evidence {
/// Enclosing class name; `predict` tests it against the auth-protocol
/// class substrings.
pub enclosing_class: Option<String>,
/// Enclosing function name. Not read by `predict` in this file.
pub enclosing_function: Option<String>,
/// Identifier of the first argument; `predict` tests it against the
/// sensitive-identifier lexicon and scores it either way when present.
pub first_arg_ident: Option<String>,
/// When true, `predict` adds the "result truncated to [:N]" signal.
pub result_truncated: bool,
/// When true, `predict` adds the "input includes os.urandom" signal.
pub input_includes_urandom: bool,
/// When true, `predict` short-circuits to a benign prediction explained by
/// the `usedforsecurity=False` keyword argument.
pub usedforsecurity_false: bool,
/// When `Some`, `predict` short-circuits to a benign prediction; the string
/// is interpolated as the label inside `protocol-required[...]`.
pub protocol_required_annotation: Option<String>,
}
impl Evidence {
    /// Test-only constructor: an `Evidence` with every field at its default
    /// (`None` / `false`), intended as the base for struct-update syntax.
    #[cfg(test)]
    pub(super) fn empty() -> Self {
        Default::default()
    }
}
/// Result of `predict`: the chosen branch plus the opposite branch carried as
/// an alternative.
#[derive(Debug, Clone)]
pub(super) struct Prediction {
/// The branch the predictor chose.
pub predicted: BranchLabel,
/// The opposite branch, with its own severity, title, description and
/// suggested fix (filled in by `build_prediction`).
pub alternative_branch: AlternativeBranch,
/// Severity derived from `predicted` via `severity_for_branch`.
pub predicted_severity: Severity,
/// The signals that contributed to the prediction, each with its weight.
pub reasons: Vec<PredictionReason>,
/// Explicit resolution signals; populated only on the short-circuit paths
/// of `predict` and empty for heuristic predictions.
pub resolutions: Vec<ResolutionSignal>,
}
pub(super) fn predict(
evidence: &Evidence,
algo_label: &str, ) -> Prediction {
if evidence.usedforsecurity_false {
return collapse_to_benign(
algo_label,
ResolutionSignal {
kind: ResolutionKind::KeywordArgument {
name: "usedforsecurity".to_string(),
value: "False".to_string(),
},
description: format!(
"`usedforsecurity=False` declares this {algo_label} call as \
non-security (Python 3.9+); the finding collapses to Info."
),
example: Some(format!(
"hashlib.{}(data, usedforsecurity=False)",
algo_label.to_lowercase()
)),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::KeywordArgument {
name: "usedforsecurity".to_string(),
value: "False".to_string(),
},
weight: 1.0,
note: format!(
"Caller explicitly opted out of security semantics for this \
{algo_label} call."
),
},
);
}
if let Some(rfc) = &evidence.protocol_required_annotation {
return collapse_to_benign(
algo_label,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: protocol-required[{rfc}]"),
},
description: format!(
"`protocol-required[{rfc}]` annotation declares this {algo_label} \
call as required by an external protocol; the finding collapses \
to Info."
),
example: Some(format!(
"hashlib.{}(data) # repotoire: protocol-required[{rfc}]",
algo_label.to_lowercase()
)),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("protocol-required[{rfc}] annotation"),
},
weight: 1.0,
note: format!(
"Annotated as required by {rfc}; not a discretionary algorithm choice."
),
},
);
}
let mut sum: f32 = 0.0;
let mut reasons: Vec<PredictionReason> = Vec::new();
if let Some(class_name) = &evidence.enclosing_class {
if matches_auth_protocol_class(class_name) {
sum += W_AUTH_PROTOCOL_CLASS;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "class".to_string(),
name: class_name.clone(),
},
weight: W_AUTH_PROTOCOL_CLASS,
note: format!(
"Enclosing class `{class_name}` matches an authentication-protocol \
pattern; {algo_label} use is likely protocol-required."
),
});
}
}
if let Some(arg_name) = &evidence.first_arg_ident {
if matches_sensitive_ident(arg_name) {
sum += W_FIRST_ARG_SENSITIVE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::FirstArgIdentifier {
name: arg_name.clone(),
},
weight: W_FIRST_ARG_SENSITIVE,
note: format!(
"First argument is `{arg_name}`, which matches a sensitive-data \
lexicon; this looks like security-relevant hashing."
),
});
} else {
sum += W_FIRST_ARG_NON_SENSITIVE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::FirstArgIdentifier {
name: arg_name.clone(),
},
weight: W_FIRST_ARG_NON_SENSITIVE,
note: format!(
"First argument is `{arg_name}`, which does not look like sensitive \
data; suggests non-security use."
),
});
}
}
if evidence.result_truncated {
sum += W_RESULT_TRUNCATED;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "result truncated to [:N]".to_string(),
},
weight: W_RESULT_TRUNCATED,
note: "Truncating the digest discards bits; suggests use as a short \
identifier rather than a cryptographic hash."
.to_string(),
});
}
if evidence.input_includes_urandom {
sum += W_INPUT_INCLUDES_URANDOM;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "input includes os.urandom".to_string(),
},
weight: W_INPUT_INCLUDES_URANDOM,
note: "Hashing entropy from `os.urandom` is consistent with nonce or \
key-id derivation, not data integrity."
.to_string(),
});
}
let predicted = if sum > 0.0 {
BranchLabel::Benign
} else {
BranchLabel::RealBug
};
build_prediction(predicted, algo_label, reasons, Vec::new())
}
/// Returns true when `name`, lowercased, contains any entry of
/// `AUTH_PROTOCOL_CLASS_SUBSTRINGS`.
fn matches_auth_protocol_class(name: &str) -> bool {
    let folded = name.to_lowercase();
    for needle in AUTH_PROTOCOL_CLASS_SUBSTRINGS {
        if folded.contains(*needle) {
            return true;
        }
    }
    false
}
/// Returns true when `name`, lowercased, contains any entry of
/// `SENSITIVE_IDENT_SUBSTRINGS`.
fn matches_sensitive_ident(name: &str) -> bool {
    let folded = name.to_lowercase();
    for needle in SENSITIVE_IDENT_SUBSTRINGS {
        if folded.contains(*needle) {
            return true;
        }
    }
    false
}
/// Builds a benign prediction justified by exactly one reason and one
/// resolution signal.
fn collapse_to_benign(
    algo_label: &str,
    resolution: ResolutionSignal,
    reason: PredictionReason,
) -> Prediction {
    let reasons = vec![reason];
    let resolutions = vec![resolution];
    build_prediction(BranchLabel::Benign, algo_label, reasons, resolutions)
}
/// Assembles a `Prediction` for `predicted`, attaching the opposite branch as
/// the alternative with its own severity, title, description and fix text.
fn build_prediction(
    predicted: BranchLabel,
    algo_label: &str,
    reasons: Vec<PredictionReason>,
    resolutions: Vec<ResolutionSignal>,
) -> Prediction {
    // Everything on the alternative branch is derived from the flipped label.
    let flipped = predicted.opposite();
    Prediction {
        predicted,
        alternative_branch: AlternativeBranch {
            label: flipped,
            severity: severity_for_branch(flipped),
            title: title_for_branch(flipped, algo_label),
            description: description_for_branch(flipped, algo_label),
            suggested_fix: suggested_fix_for_branch(flipped, algo_label),
        },
        predicted_severity: severity_for_branch(predicted),
        reasons,
        resolutions,
    }
}
/// Maps a branch to its severity: benign findings are `Info`, real bugs are
/// `High`.
fn severity_for_branch(label: BranchLabel) -> Severity {
    match label {
        BranchLabel::Benign => Severity::Info,
        BranchLabel::RealBug => Severity::High,
    }
}
/// Returns the finding title for `label`, naming the algorithm.
fn title_for_branch(label: BranchLabel, algo_label: &str) -> String {
    match label {
        BranchLabel::Benign => format!("Non-security use of {algo_label} (informational)"),
        BranchLabel::RealBug => format!("Weak hash algorithm ({algo_label})"),
    }
}
/// Returns the long-form finding description for `label`, naming the
/// algorithm.
fn description_for_branch(label: BranchLabel, algo_label: &str) -> String {
    match label {
        BranchLabel::Benign => format!(
            "{algo_label} appears to be used for a non-security purpose (cache key, \
             nonce, identifier, or protocol-required computation). The algorithm's \
             cryptographic weakness is irrelevant in this context, but the call is \
             carried as an alternative interpretation in case the predictor is wrong."
        ),
        BranchLabel::RealBug => format!(
            "{algo_label} is cryptographically broken. If this call is protecting \
             security-sensitive data (passwords, signatures, integrity), it must be \
             replaced with SHA-256+ or a password-hashing function (Argon2/scrypt)."
        ),
    }
}
/// Returns the remediation text for `label`. Both branches currently yield
/// `Some`; the `Option` return type is kept for callers.
fn suggested_fix_for_branch(label: BranchLabel, algo_label: &str) -> Option<String> {
    let advice = match label {
        BranchLabel::Benign => String::from(
            "If this is intentional non-security use, add `usedforsecurity=False` \
             (Python 3.9+) or annotate `# repotoire: protocol-required[<RFC>]` to \
             collapse the finding to Info.",
        ),
        BranchLabel::RealBug => format!(
            "Replace `{algo_label}` with SHA-256, SHA-3, or BLAKE3. For password \
             hashing use Argon2 or scrypt."
        ),
    };
    Some(advice)
}
/// Extracts the protocol label from a `protocol-required` annotation on `line`.
///
/// Returns `None` when `parse_python_comment` finds no annotation or the
/// annotation is of a different kind. Otherwise returns the first annotation
/// argument, or `"unknown"` when there is no argument or it is blank.
///
/// Treating a blank argument (e.g. `protocol-required[]`) as missing keeps an
/// empty label from reaching user-facing text, however the comment parser
/// represents empty brackets.
pub(super) fn extract_protocol_required_rfc(line: &str) -> Option<String> {
    let ann = parse_python_comment(line)?;
    if ann.kind != "protocol-required" {
        return None;
    }
    let rfc = ann
        .args
        .first()
        .filter(|arg| !arg.trim().is_empty())
        .cloned()
        .unwrap_or_else(|| "unknown".to_string());
    Some(rfc)
}
// Unit tests for the weak-hash predictor: the short-circuit opt-outs, the
// weighted heuristic path and its tiebreak, the two lexicon matchers, the
// alternative-branch wiring, and annotation extraction.
#[cfg(test)]
mod tests {
use super::*;
// All four heuristic signals fire: 0.40 (auth class) + 0.20 (non-sensitive
// first arg) + 0.15 (truncated) + 0.07 (urandom) = 0.82.
#[test]
fn worked_example_sums_to_design_doc_82_percent() {
let ev = Evidence {
enclosing_class: Some("DigestAuth".to_string()),
enclosing_function: Some("_get_client_nonce".to_string()),
first_arg_ident: Some("s".to_string()),
result_truncated: true,
input_includes_urandom: true,
usedforsecurity_false: false,
protocol_required_annotation: None,
};
let pred = predict(&ev, "SHA1");
assert_eq!(pred.predicted, BranchLabel::Benign);
assert_eq!(pred.predicted_severity, Severity::Info);
let sum: f32 = pred.reasons.iter().map(|r| r.weight).sum();
assert!(
(sum - 0.82).abs() < 0.001,
"expected sum ~= 0.82 (design doc), got {sum}"
);
assert_eq!(pred.reasons.len(), 4);
}
// `usedforsecurity=False` short-circuits before the heuristics run, so the
// sensitive first argument is ignored and only one reason is reported.
#[test]
fn usedforsecurity_false_collapses_to_benign() {
let ev = Evidence {
usedforsecurity_false: true,
first_arg_ident: Some("password".to_string()),
..Evidence::empty()
};
let pred = predict(&ev, "MD5");
assert_eq!(pred.predicted, BranchLabel::Benign);
assert_eq!(pred.predicted_severity, Severity::Info);
assert_eq!(pred.reasons.len(), 1);
assert_eq!(pred.reasons[0].weight, 1.0);
assert_eq!(pred.resolutions.len(), 1);
match &pred.resolutions[0].kind {
ResolutionKind::KeywordArgument { name, value } => {
assert_eq!(name, "usedforsecurity");
assert_eq!(value, "False");
}
other => panic!("unexpected resolution kind: {other:?}"),
}
}
// A protocol-required annotation also short-circuits, and the resolution
// echoes the annotation syntax including the RFC label.
#[test]
fn protocol_required_annotation_collapses_to_benign() {
let ev = Evidence {
protocol_required_annotation: Some("RFC7616".to_string()),
first_arg_ident: Some("password".to_string()),
..Evidence::empty()
};
let pred = predict(&ev, "SHA1");
assert_eq!(pred.predicted, BranchLabel::Benign);
assert_eq!(pred.resolutions.len(), 1);
match &pred.resolutions[0].kind {
ResolutionKind::SourceAnnotation { syntax } => {
assert!(syntax.contains("RFC7616"));
assert!(syntax.starts_with("# repotoire:"));
}
other => panic!("unexpected resolution kind: {other:?}"),
}
}
// Same evidence as the worked example; only the predicted label is checked.
#[test]
fn matrix_predicted_benign_actual_benign() {
let ev = Evidence {
enclosing_class: Some("DigestAuth".to_string()),
enclosing_function: Some("_get_client_nonce".to_string()),
first_arg_ident: Some("s".to_string()),
result_truncated: true,
input_includes_urandom: true,
..Evidence::empty()
};
let pred = predict(&ev, "SHA1");
assert_eq!(pred.predicted, BranchLabel::Benign);
}
// Auth-protocol class (+0.40) and sensitive first argument (-0.40) cancel to
// zero, and a non-positive total predicts RealBug.
// NOTE(review): the test name says "predicted_benign" but the assertion
// expects RealBug; confirm the intended naming.
#[test]
fn matrix_predicted_benign_actual_realbug_synthetic() {
let ev = Evidence {
enclosing_class: Some("DigestAuthBug".to_string()),
first_arg_ident: Some("password".to_string()),
..Evidence::empty()
};
let pred = predict(&ev, "MD5");
assert_eq!(
pred.predicted,
BranchLabel::RealBug,
"auth-class + sensitive-arg should tie at 0 and tiebreak RealBug"
);
}
// Only `enclosing_function` is set, which `predict` does not read, so no
// signal fires and the zero total falls to RealBug.
#[test]
fn matrix_predicted_realbug_actual_benign_synthetic() {
let ev = Evidence {
enclosing_class: None,
enclosing_function: Some("cache_key".to_string()),
first_arg_ident: None, ..Evidence::empty()
};
let pred = predict(&ev, "SHA1");
assert_eq!(
pred.predicted,
BranchLabel::RealBug,
"no signals at all should tiebreak RealBug"
);
}
// A sensitive first argument alone gives a negative total: RealBug / High.
#[test]
fn matrix_predicted_realbug_actual_realbug() {
let ev = Evidence {
enclosing_function: Some("hash_password".to_string()),
first_arg_ident: Some("password".to_string()),
..Evidence::empty()
};
let pred = predict(&ev, "MD5");
assert_eq!(pred.predicted, BranchLabel::RealBug);
assert_eq!(pred.predicted_severity, Severity::High);
}
// The sensitive lexicon matches by case-insensitive substring.
#[test]
fn sensitive_lexicon_substring_matches() {
assert!(matches_sensitive_ident("password"));
assert!(matches_sensitive_ident("user_password"));
assert!(matches_sensitive_ident("USER_PASSWORD")); assert!(matches_sensitive_ident("api_key"));
assert!(matches_sensitive_ident("session_token"));
assert!(!matches_sensitive_ident("s"));
assert!(!matches_sensitive_ident("data"));
assert!(!matches_sensitive_ident("nonce"));
}
// The auth-protocol class lexicon matches by case-insensitive substring.
#[test]
fn auth_protocol_class_matches() {
assert!(matches_auth_protocol_class("DigestAuth"));
assert!(matches_auth_protocol_class("HTTPDigestAuth"));
assert!(matches_auth_protocol_class("digestauth")); assert!(!matches_auth_protocol_class("MyHasher"));
assert!(!matches_auth_protocol_class("HmacBuilder"));
}
// A benign prediction carries the RealBug branch as its alternative.
#[test]
fn predicted_benign_carries_realbug_alternative() {
let ev = Evidence {
enclosing_class: Some("DigestAuth".to_string()),
first_arg_ident: Some("s".to_string()),
result_truncated: true,
input_includes_urandom: true,
..Evidence::empty()
};
let pred = predict(&ev, "SHA1");
assert_eq!(pred.predicted, BranchLabel::Benign);
assert_eq!(pred.alternative_branch.label, BranchLabel::RealBug);
assert_eq!(pred.alternative_branch.severity, Severity::High);
assert!(pred.alternative_branch.title.contains("Weak hash"));
}
// A RealBug prediction carries the Benign branch as its alternative.
#[test]
fn predicted_realbug_carries_benign_alternative() {
let ev = Evidence {
first_arg_ident: Some("password".to_string()),
..Evidence::empty()
};
let pred = predict(&ev, "MD5");
assert_eq!(pred.predicted, BranchLabel::RealBug);
assert_eq!(pred.alternative_branch.label, BranchLabel::Benign);
assert_eq!(pred.alternative_branch.severity, Severity::Info);
}
// With no evidence the total is zero, so the prediction is RealBug with no
// reasons.
#[test]
fn empty_evidence_predicts_realbug() {
let pred = predict(&Evidence::empty(), "MD5");
assert_eq!(
pred.predicted,
BranchLabel::RealBug,
"with no evidence we lean conservative"
);
assert!(pred.reasons.is_empty());
}
// The bracketed argument of a protocol-required annotation is returned.
#[test]
fn extracts_protocol_required_rfc() {
assert_eq!(
extract_protocol_required_rfc("h.sha1(s) # repotoire: protocol-required[RFC7616]"),
Some("RFC7616".to_string()),
);
}
// Missing or empty brackets fall back to the "unknown" label.
#[test]
fn extract_protocol_required_defaults_when_no_rfc() {
assert_eq!(
extract_protocol_required_rfc("h.sha1(s) # repotoire: protocol-required"),
Some("unknown".to_string()),
);
assert_eq!(
extract_protocol_required_rfc("h.sha1(s) # repotoire: protocol-required[]"),
Some("unknown".to_string()),
);
}
// Other annotation kinds and unrelated comments yield `None`.
#[test]
fn extract_protocol_required_ignores_other_kinds() {
assert_eq!(
extract_protocol_required_rfc("h.sha1(s) # repotoire: low-entropy[md5]"),
None,
);
assert_eq!(extract_protocol_required_rfc("h.sha1(s) # noqa"), None,);
}
}