use super::annotation::parse_python_comment;
use crate::dual_branch::{
AlternativeBranch, BranchLabel, PredictionReason, PredictionReasonKind, ResolutionKind,
ResolutionSignal,
};
use crate::models::Severity;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum HttpApi {
Advocate,
Requests,
Urllib,
Httpx,
Aiohttp,
Unknown,
}
impl HttpApi {
pub(super) fn callee_label(self) -> &'static str {
match self {
HttpApi::Advocate => "advocate",
HttpApi::Requests => "requests",
HttpApi::Urllib => "urllib",
HttpApi::Httpx => "httpx",
HttpApi::Aiohttp => "aiohttp",
HttpApi::Unknown => "HTTP client",
}
}
pub(super) fn is_python(self) -> bool {
matches!(
self,
HttpApi::Advocate
| HttpApi::Requests
| HttpApi::Urllib
| HttpApi::Httpx
| HttpApi::Aiohttp
)
}
}
pub(super) const W_IMPORT_ADVOCATE: f32 = 0.30;
pub(super) const W_IMPORT_DEFUSEDURL: f32 = 0.30;
pub(super) const W_IMPORT_VALIDATORS: f32 = 0.10;
pub(super) const W_ALLOWLIST_CALL: f32 = 0.40;
pub(super) const W_SCHEME_HOSTNAME_ALLOWLIST: f32 = 0.30;
pub(super) const W_PRIVATE_IP_GUARD: f32 = 0.30;
pub(super) const W_ENCLOSING_TEST_FUNCTION: f32 = 0.15;
pub(super) const W_USER_INPUT_FLOW: f32 = -0.50;
pub(super) const W_ENCLOSING_HANDLER: f32 = -0.30;
pub(super) const W_URL_FSTRING_CONCAT: f32 = -0.20;
pub(super) const W_API_ADVOCATE_CALL: f32 = 1.0;
const USER_INPUT_SUBSTRINGS: &[&str] = &[
"req.body",
"req.query",
"req.params",
"req.form",
"request.body",
"request.query",
"request.params",
"request.json",
"request.args",
"request.form",
"request.values",
"ctx.params",
"ctx.query",
"ctx.request",
];
const TEST_FUNCTION_SUBSTRINGS: &[&str] = &["test_", "_test", "fixture", "setup", "teardown"];
const HANDLER_FUNCTION_SUBSTRINGS: &[&str] = &[
"handler",
"route",
"endpoint",
"view",
"controller",
"middleware",
"request",
"response",
"proxy",
"fetch",
"download",
"webhook",
"callback",
"import",
];
const ALLOWLIST_CALL_SUBSTRINGS: &[&str] = &[
"is_safe_url",
"validate_url",
"check_url",
"verify_url",
"is_allowed_url",
"ensure_safe_url",
"validators.url",
"url_allowed",
"is_url_allowed",
];
const SCHEME_HOSTNAME_ALLOWLIST_SUBSTRINGS: &[&str] = &[
".scheme in",
".hostname in",
".host in",
".netloc in",
"ALLOWED_HOSTS",
"ALLOWED_URLS",
"ALLOWLIST",
"allowed_hosts",
"allowed_urls",
"allowlist",
];
const PRIVATE_IP_GUARD_SUBSTRINGS: &[&str] = &[
"ip_address(",
"ip_network(",
"is_private",
"is_loopback",
"is_link_local",
"is_reserved",
"is_multicast",
"is_unspecified",
];
#[derive(Debug, Clone, Default, PartialEq)]
pub(super) struct Evidence {
pub api: Option<HttpApi>,
pub enclosing_function: Option<String>,
pub enclosing_class: Option<String>,
pub import_advocate: bool,
pub import_defusedurl: bool,
pub import_validators: bool,
pub has_allowlist_call: bool,
pub has_scheme_hostname_allowlist: bool,
pub has_private_ip_guard: bool,
pub has_user_input_flow: bool,
pub url_fstring_or_concat: bool,
pub ssrf_safe_annotation: Option<String>,
pub ssrf_vulnerable_annotation: Option<String>,
}
impl Evidence {
#[cfg(test)]
pub(super) fn empty() -> Self {
Self::default()
}
}
#[derive(Debug, Clone)]
pub(super) struct Prediction {
pub predicted: BranchLabel,
pub alternative_branch: AlternativeBranch,
pub predicted_severity: Severity,
pub reasons: Vec<PredictionReason>,
pub resolutions: Vec<ResolutionSignal>,
}
pub(super) fn predict(evidence: &Evidence) -> Prediction {
let api = evidence.api.unwrap_or(HttpApi::Unknown);
let api_label = api.callee_label();
if let Some(reason) = &evidence.ssrf_safe_annotation {
return collapse(
BranchLabel::Benign,
api,
evidence.has_user_input_flow,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: ssrf-safe[{reason}]"),
},
description: format!(
"`ssrf-safe[{reason}]` annotation declares this HTTP \
call as safe (caller-side allowlist, CDN-gateway \
validation, etc.); the finding collapses to Info."
),
example: Some(format!(
"{api_label}(...) # repotoire: ssrf-safe[{reason}]"
)),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("ssrf-safe[{reason}] annotation"),
},
weight: 1.0,
note: format!(
"Annotated as caller-validated ({reason}); not an \
SSRF risk."
),
},
);
}
if let Some(source) = &evidence.ssrf_vulnerable_annotation {
return collapse(
BranchLabel::RealBug,
api,
evidence.has_user_input_flow,
ResolutionSignal {
kind: ResolutionKind::SourceAnnotation {
syntax: format!("# repotoire: ssrf-vulnerable[{source}]"),
},
description: format!(
"`ssrf-vulnerable[{source}]` annotation declares this \
HTTP call as exposed to attacker-controlled URLs; \
the finding stays at the existing severity."
),
example: Some(format!(
"{api_label}(...) # repotoire: ssrf-vulnerable[{source}]"
)),
collapses_to: BranchLabel::RealBug,
},
PredictionReason {
kind: PredictionReasonKind::Custom {
description: format!("ssrf-vulnerable[{source}] annotation"),
},
weight: -1.0,
note: format!("Annotated as SSRF-exposed (source: {source})."),
},
);
}
if matches!(api, HttpApi::Advocate) {
return collapse(
BranchLabel::Benign,
api,
evidence.has_user_input_flow,
ResolutionSignal {
kind: ResolutionKind::StructuralPattern {
description: "call on advocate API (safe-by-construction)".to_string(),
},
description: "`advocate.*` HTTP clients are safe-by-construction against SSRF: \
the Advocate wrapper enforces the IP allowlist / private-IP block \
at the request transport layer. The URL the user supplies is \
irrelevant once Advocate is in the path."
.to_string(),
example: Some(format!("{api_label}(...)")),
collapses_to: BranchLabel::Benign,
},
PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "HTTP call uses advocate (safe-by-construction)".to_string(),
},
weight: W_API_ADVOCATE_CALL,
note: "The call site is on an `advocate.*` API. Advocate \
blocks private IPs / metadata endpoints / loopback \
at the transport layer, so user-controlled URLs \
cannot mount an SSRF attack. Strongest Benign \
signal in the v0 model."
.to_string(),
},
);
}
let mut sum: f32 = 0.0;
let mut reasons: Vec<PredictionReason> = Vec::new();
if evidence.import_advocate {
sum += W_IMPORT_ADVOCATE;
reasons.push(PredictionReason {
kind: PredictionReasonKind::ImportPresence {
module: "advocate".to_string(),
},
weight: W_IMPORT_ADVOCATE,
note: "`advocate` is the de-facto SSRF-safe HTTP client for \
Python. The import is weak signal (file-scoped, not \
call-scoped per v0); a naked `requests.get` in the \
same file can still flip the verdict to RealBug."
.to_string(),
});
}
if evidence.import_defusedurl {
sum += W_IMPORT_DEFUSEDURL;
reasons.push(PredictionReason {
kind: PredictionReasonKind::ImportPresence {
module: "defusedurl".to_string(),
},
weight: W_IMPORT_DEFUSEDURL,
note: "`defusedurl` / `safe_url_check` provides URL validation \
for SSRF mitigation. Weak signal on its own; pairs \
with the validator-call signals."
.to_string(),
});
}
if evidence.import_validators {
sum += W_IMPORT_VALIDATORS;
reasons.push(PredictionReason {
kind: PredictionReasonKind::ImportPresence {
module: "validators".to_string(),
},
weight: W_IMPORT_VALIDATORS,
note: "`validators` exposes `validators.url(...)` — a syntactic \
URL validator. Weakest of the import signals; pairs \
with an actual call to `validators.url(...)` upstream."
.to_string(),
});
}
if evidence.has_allowlist_call {
sum += W_ALLOWLIST_CALL;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "URL passes through allowlist validator before request".to_string(),
},
weight: W_ALLOWLIST_CALL,
note: "A developer-authored allowlist callable \
(`is_safe_url`, `validate_url`, `validators.url`, etc.) \
appears in the 10-line lookback window. v0 trusts \
*presence* not *correctness* — if the allowlist's \
body is misconfigured this signal still fires."
.to_string(),
});
}
if evidence.has_scheme_hostname_allowlist {
sum += W_SCHEME_HOSTNAME_ALLOWLIST;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "scheme/hostname allowlist check before request".to_string(),
},
weight: W_SCHEME_HOSTNAME_ALLOWLIST,
note: "A scheme or hostname allowlist check \
(`parsed.scheme in {...}`, `parsed.hostname in \
ALLOWED_HOSTS`) appears in the 10-line lookback \
window. Same trust-presence-not-correctness caveat."
.to_string(),
});
}
if evidence.has_private_ip_guard {
sum += W_PRIVATE_IP_GUARD;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "private-IP guard via ipaddress before request".to_string(),
},
weight: W_PRIVATE_IP_GUARD,
note: "An `ipaddress.ip_address(host).is_private` (or \
`.is_loopback`, `.is_link_local`, etc.) guard appears \
in the 10-line lookback window. Closes the metadata-\
endpoint / internal-host leg specifically."
.to_string(),
});
}
if evidence.has_user_input_flow {
sum += W_USER_INPUT_FLOW;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "user input flows to URL within 10 lines".to_string(),
},
weight: W_USER_INPUT_FLOW,
note: "Request body / query / params / form data flows into \
the URL argument within a 10-line lookback window. \
Attacker controls the URL being requested."
.to_string(),
});
}
if evidence.url_fstring_or_concat {
sum += W_URL_FSTRING_CONCAT;
reasons.push(PredictionReason {
kind: PredictionReasonKind::StructuralPattern {
description: "URL constructed from f-string/concat with user input".to_string(),
},
weight: W_URL_FSTRING_CONCAT,
note: "The URL argument is built via f-string or string \
concatenation. The developer cannot rely on \
`urlparse` to normalize the scheme/host because the \
URL is assembled raw."
.to_string(),
});
}
if let Some(fn_name) = &evidence.enclosing_function {
if matches_test_function(fn_name) {
sum += W_ENCLOSING_TEST_FUNCTION;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "function".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_TEST_FUNCTION,
note: format!(
"Enclosing function `{fn_name}` looks like a \
test/fixture; test code rarely the actionable \
security target."
),
});
} else if matches_handler_function(fn_name) {
sum += W_ENCLOSING_HANDLER;
reasons.push(PredictionReason {
kind: PredictionReasonKind::EnclosingScope {
scope_kind: "request_handler".to_string(),
name: fn_name.clone(),
},
weight: W_ENCLOSING_HANDLER,
note: format!(
"Enclosing function `{fn_name}` looks like a request \
handler (`handler`/`route`/`endpoint`/`view`/\
`controller`/`proxy`/`fetch`/`webhook`/`callback`); \
higher prior on attacker-reachable code."
),
});
}
}
let predicted = if sum > 0.0 {
BranchLabel::Benign
} else {
BranchLabel::RealBug
};
build_prediction(
predicted,
api,
evidence.has_user_input_flow,
reasons,
Vec::new(),
)
}
pub(super) fn matches_user_input(text: &str) -> bool {
let lower = text.to_lowercase();
USER_INPUT_SUBSTRINGS.iter().any(|sub| lower.contains(sub))
}
pub(super) fn matches_allowlist_call(text: &str) -> bool {
let lower = text.to_lowercase();
ALLOWLIST_CALL_SUBSTRINGS.iter().any(|sub| {
let needle = sub.to_lowercase();
let mut search_start = 0usize;
while let Some(rel) = lower[search_start..].find(&needle) {
let idx = search_start + rel;
let after = &lower[idx + needle.len()..];
if after.trim_start().starts_with('(') {
return true;
}
search_start = idx + needle.len();
}
false
})
}
pub(super) fn matches_scheme_hostname_allowlist(text: &str) -> bool {
let lower = text.to_lowercase();
SCHEME_HOSTNAME_ALLOWLIST_SUBSTRINGS
.iter()
.any(|sub| lower.contains(&sub.to_lowercase()))
}
pub(super) fn matches_private_ip_guard(text: &str) -> bool {
let lower = text.to_lowercase();
PRIVATE_IP_GUARD_SUBSTRINGS
.iter()
.any(|sub| lower.contains(sub))
}
fn matches_test_function(name: &str) -> bool {
let lower = name.to_lowercase();
TEST_FUNCTION_SUBSTRINGS
.iter()
.any(|sub| lower.contains(sub))
}
fn matches_handler_function(name: &str) -> bool {
let lower = name.to_lowercase();
HANDLER_FUNCTION_SUBSTRINGS
.iter()
.any(|sub| lower.contains(sub))
}
fn collapse(
label: BranchLabel,
api: HttpApi,
has_user_input: bool,
resolution: ResolutionSignal,
reason: PredictionReason,
) -> Prediction {
build_prediction(label, api, has_user_input, vec![reason], vec![resolution])
}
fn build_prediction(
predicted: BranchLabel,
api: HttpApi,
has_user_input: bool,
reasons: Vec<PredictionReason>,
resolutions: Vec<ResolutionSignal>,
) -> Prediction {
let api_label = api.callee_label();
let predicted_severity = severity_for_branch(predicted, has_user_input);
let alternative_label = predicted.opposite();
let alternative_severity = severity_for_branch(alternative_label, has_user_input);
let alternative_branch = AlternativeBranch {
label: alternative_label,
severity: alternative_severity,
title: title_for_branch(alternative_label, api_label),
description: description_for_branch(alternative_label, api_label),
suggested_fix: suggested_fix_for_branch(alternative_label, api_label),
};
Prediction {
predicted,
alternative_branch,
predicted_severity,
reasons,
resolutions,
}
}
fn severity_for_branch(label: BranchLabel, has_user_input: bool) -> Severity {
match label {
BranchLabel::RealBug => {
if has_user_input {
Severity::Critical
} else {
Severity::High
}
}
BranchLabel::Benign => Severity::Info,
}
}
fn title_for_branch(label: BranchLabel, api_label: &str) -> String {
match label {
BranchLabel::RealBug => format!("Potential SSRF vulnerability in {api_label} call"),
BranchLabel::Benign => {
format!("HTTP call via {api_label} appears safely gated (informational)")
}
}
}
fn description_for_branch(label: BranchLabel, api_label: &str) -> String {
match label {
BranchLabel::RealBug => format!(
"The `{api_label}` HTTP call appears to be operating without \
a safe-URL gate. SSRF vulnerabilities allow attackers to \
reach internal services (`http://internal-admin/`), exfil \
cloud metadata endpoints (`http://169.254.169.254/`), \
port-scan internal networks, and bypass IP-based access \
controls."
),
BranchLabel::Benign => format!(
"The `{api_label}` HTTP call appears to be either safe-by-\
construction (Advocate) or explicitly gated (allowlist \
validator / private-IP guard / scheme allowlist). The \
call site is carried as Info; the RealBug interpretation \
is preserved in `alternative_branch` in case the predictor \
is wrong."
),
}
}
fn suggested_fix_for_branch(label: BranchLabel, _api_label: &str) -> Option<String> {
match label {
BranchLabel::RealBug => Some(
"Use Advocate (the SSRF-safe HTTP client for Python) or add \
an explicit allowlist check before the request. Example: \
`from advocate import Session; Session().get(url)`. \
Alternative: `parsed = urlparse(url); assert parsed.hostname \
in ALLOWED_HOSTS; requests.get(url)`. Or: \
`assert not ipaddress.ip_address(host).is_private; requests.get(url)`."
.to_string(),
),
BranchLabel::Benign => Some(
"If this is intentional safe usage, annotate \
`# repotoire: ssrf-safe[<reason>]` to collapse the finding \
to Info definitively."
.to_string(),
),
}
}
pub(super) fn extract_ssrf_safe_reason(line: &str) -> Option<String> {
let ann = parse_python_comment(line)?;
if ann.kind != "ssrf-safe" {
return None;
}
if ann.args.is_empty() {
Some("unspecified".to_string())
} else {
Some(ann.args[0].clone())
}
}
pub(super) fn extract_ssrf_vulnerable_source(line: &str) -> Option<String> {
let ann = parse_python_comment(line)?;
if ann.kind != "ssrf-vulnerable" {
return None;
}
if ann.args.is_empty() {
Some("unspecified".to_string())
} else {
Some(ann.args[0].clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn requests_get_with_user_input_in_handler_predicts_realbug_critical() {
let evidence = Evidence {
api: Some(HttpApi::Requests),
has_user_input_flow: true,
enclosing_function: Some("proxy_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
assert_eq!(p.alternative_branch.label, BranchLabel::Benign);
assert_eq!(p.alternative_branch.severity, Severity::Info);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!(
(total - (W_USER_INPUT_FLOW + W_ENCLOSING_HANDLER)).abs() < 1e-6,
"expected -0.80, got {total}"
);
}
#[test]
fn advocate_canonical_usage_predicts_benign() {
let evidence = Evidence {
api: Some(HttpApi::Advocate),
import_advocate: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert!(
p.reasons.iter().any(|r| r.weight == W_API_ADVOCATE_CALL),
"must emit the advocate-call reason"
);
}
#[test]
fn advocate_call_with_user_input_stays_benign() {
let evidence = Evidence {
api: Some(HttpApi::Advocate),
import_advocate: true,
has_user_input_flow: true,
enclosing_function: Some("proxy_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(
p.predicted,
BranchLabel::Benign,
"advocate call stays Benign even with user input + handler"
);
assert_eq!(p.predicted_severity, Severity::Info);
}
#[test]
fn allowlist_call_with_user_input_in_handler_stays_realbug() {
let evidence = Evidence {
api: Some(HttpApi::Requests),
has_allowlist_call: true,
has_user_input_flow: true,
enclosing_function: Some("proxy_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!(
(total - (W_ALLOWLIST_CALL + W_USER_INPUT_FLOW + W_ENCLOSING_HANDLER)).abs() < 1e-6,
"expected -0.40, got {total}"
);
}
#[test]
fn allowlist_plus_private_ip_guard_predicts_benign() {
let evidence = Evidence {
api: Some(HttpApi::Requests),
has_allowlist_call: true,
has_private_ip_guard: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!(
(total - (W_ALLOWLIST_CALL + W_PRIVATE_IP_GUARD)).abs() < 1e-6,
"expected +0.70, got {total}"
);
}
#[test]
fn unused_advocate_import_with_naked_requests_predicts_benign() {
let evidence = Evidence {
api: Some(HttpApi::Requests),
import_advocate: true,
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
}
#[test]
fn unused_advocate_in_handler_with_user_input_correctly_flips_realbug() {
let evidence = Evidence {
api: Some(HttpApi::Requests),
import_advocate: true,
has_user_input_flow: true,
enclosing_function: Some("proxy_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total - -0.50).abs() < 1e-6, "expected -0.50, got {total}");
}
#[test]
fn fstring_with_user_input_predicts_realbug_critical() {
let evidence = Evidence {
api: Some(HttpApi::Requests),
has_user_input_flow: true,
url_fstring_or_concat: true,
enclosing_function: Some("proxy_handler".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::Critical);
let total: f32 = p.reasons.iter().map(|r| r.weight).sum();
assert!((total - -1.00).abs() < 1e-6, "expected -1.00, got {total}");
}
#[test]
fn ssrf_safe_annotation_collapses_to_benign() {
let evidence = Evidence {
api: Some(HttpApi::Requests),
has_user_input_flow: true,
enclosing_function: Some("proxy_handler".to_string()),
ssrf_safe_annotation: Some("validated-by-cdn".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::Benign);
assert_eq!(p.predicted_severity, Severity::Info);
assert_eq!(p.resolutions.len(), 1);
assert!(matches!(
p.resolutions[0].kind,
ResolutionKind::SourceAnnotation { .. }
));
}
#[test]
fn ssrf_vulnerable_annotation_collapses_to_realbug() {
let evidence = Evidence {
api: Some(HttpApi::Advocate),
import_advocate: true,
ssrf_vulnerable_annotation: Some("audited-untrusted".to_string()),
..Default::default()
};
let p = predict(&evidence);
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
}
#[test]
fn empty_evidence_tiebreaks_realbug() {
let p = predict(&Evidence::empty());
assert_eq!(p.predicted, BranchLabel::RealBug);
assert_eq!(p.predicted_severity, Severity::High);
}
#[test]
#[allow(clippy::assertions_on_constants)]
fn realbug_signal_weights_are_negative() {
assert!(W_USER_INPUT_FLOW < 0.0);
assert!(W_ENCLOSING_HANDLER < 0.0);
assert!(W_URL_FSTRING_CONCAT < 0.0);
}
#[test]
#[allow(clippy::assertions_on_constants)]
fn benign_signal_weights_are_positive() {
assert!(W_IMPORT_ADVOCATE > 0.0);
assert!(W_IMPORT_DEFUSEDURL > 0.0);
assert!(W_IMPORT_VALIDATORS > 0.0);
assert!(W_ALLOWLIST_CALL > 0.0);
assert!(W_SCHEME_HOSTNAME_ALLOWLIST > 0.0);
assert!(W_PRIVATE_IP_GUARD > 0.0);
assert!(W_ENCLOSING_TEST_FUNCTION > 0.0);
assert!(W_API_ADVOCATE_CALL > 0.0);
}
#[test]
#[allow(clippy::assertions_on_constants)]
fn advocate_weight_is_overcome_by_user_input_and_handler() {
let unused_import_baseline = W_IMPORT_ADVOCATE;
let max_negatives = W_USER_INPUT_FLOW + W_ENCLOSING_HANDLER;
assert!(unused_import_baseline > 0.0);
assert!(unused_import_baseline + max_negatives < 0.0);
}
#[test]
fn user_input_lexicon() {
assert!(matches_user_input("url = req.body.url"));
assert!(matches_user_input("u = request.json['url']"));
assert!(matches_user_input("target = request.args.get('u')"));
assert!(matches_user_input("p = ctx.params['target']"));
assert!(!matches_user_input("u = os.environ['API_URL']"));
}
#[test]
fn allowlist_call_lexicon() {
assert!(matches_allowlist_call("if is_safe_url(url):"));
assert!(matches_allowlist_call("validators.url(url, public=False)"));
assert!(matches_allowlist_call("validate_url(url)"));
assert!(!matches_allowlist_call(
"# call is_safe_url before requesting"
));
assert!(!matches_allowlist_call("is_safe_url_flag = True"));
}
#[test]
fn scheme_hostname_allowlist_lexicon() {
assert!(matches_scheme_hostname_allowlist(
"if parsed.hostname in ALLOWED_HOSTS:"
));
assert!(matches_scheme_hostname_allowlist(
"if parsed.scheme in {'http', 'https'}:"
));
assert!(matches_scheme_hostname_allowlist(
"allowed_hosts = ['x.com']"
));
assert!(!matches_scheme_hostname_allowlist(
"hostname = parsed.hostname"
));
}
#[test]
fn private_ip_guard_lexicon() {
assert!(matches_private_ip_guard(
"if ipaddress.ip_address(host).is_private:"
));
assert!(matches_private_ip_guard(
"if ipaddress.ip_address(h).is_loopback:"
));
assert!(matches_private_ip_guard("ip = ip_address(host)"));
assert!(!matches_private_ip_guard("host = 'example.com'"));
}
#[test]
fn handler_lexicon() {
assert!(matches_handler_function("proxy_handler"));
assert!(matches_handler_function("fetch_route"));
assert!(matches_handler_function("import_data"));
assert!(matches_handler_function("download_endpoint"));
assert!(matches_handler_function("webhook_callback"));
assert!(!matches_handler_function("calculate_total"));
}
#[test]
fn test_function_lexicon() {
assert!(matches_test_function("test_ssrf_handler"));
assert!(matches_test_function("ssrf_test"));
assert!(matches_test_function("setup_fixture"));
assert!(!matches_test_function("fetch_data"));
}
#[test]
fn extract_ssrf_safe_with_reason() {
assert_eq!(
extract_ssrf_safe_reason("requests.get(url) # repotoire: ssrf-safe[validated-by-cdn]"),
Some("validated-by-cdn".to_string())
);
}
#[test]
fn extract_ssrf_safe_without_reason() {
assert_eq!(
extract_ssrf_safe_reason("requests.get(url) # repotoire: ssrf-safe"),
Some("unspecified".to_string())
);
}
#[test]
fn extract_ssrf_vulnerable_with_source() {
assert_eq!(
extract_ssrf_vulnerable_source(
"advocate_session.get(url) # repotoire: ssrf-vulnerable[audited]"
),
Some("audited".to_string())
);
}
#[test]
fn extract_ssrf_safe_ignores_other_kinds() {
assert_eq!(
extract_ssrf_safe_reason("subprocess.run(...) # repotoire: command-static[ok]"),
None
);
assert_eq!(
extract_ssrf_safe_reason("ET.parse(blob) # repotoire: xxe-safe[ok]"),
None
);
}
#[test]
fn extract_ssrf_vulnerable_ignores_other_kinds() {
assert_eq!(
extract_ssrf_vulnerable_source(
"subprocess.run(...) # repotoire: command-user-controlled[GET]"
),
None
);
assert_eq!(
extract_ssrf_vulnerable_source("ET.parse(blob) # repotoire: xxe-vulnerable[audited]"),
None
);
}
#[test]
fn http_api_is_python_includes_recognized_libs() {
assert!(HttpApi::Advocate.is_python());
assert!(HttpApi::Requests.is_python());
assert!(HttpApi::Urllib.is_python());
assert!(HttpApi::Httpx.is_python());
assert!(HttpApi::Aiohttp.is_python());
assert!(!HttpApi::Unknown.is_python());
}
#[test]
fn http_api_callee_label_is_stable() {
assert_eq!(HttpApi::Advocate.callee_label(), "advocate");
assert_eq!(HttpApi::Requests.callee_label(), "requests");
assert_eq!(HttpApi::Urllib.callee_label(), "urllib");
assert_eq!(HttpApi::Httpx.callee_label(), "httpx");
assert_eq!(HttpApi::Aiohttp.callee_label(), "aiohttp");
assert_eq!(HttpApi::Unknown.callee_label(), "HTTP client");
}
}