use super::config::AuthAnalysisRules;
use super::model::{
AnalysisUnit, AnalysisUnitKind, AuthCheck, AuthCheckKind, AuthorizationModel, OperationKind,
SensitiveOperation, ValueRef, ValueSourceKind,
};
use crate::patterns::Severity;
/// A single finding emitted by one of the authorization checks in this module.
#[derive(Debug, Clone)]
pub struct AuthFinding {
/// Rule identifier, as returned by `AuthAnalysisRules::rule_id`.
pub rule_id: String,
/// Severity chosen by the check that produced the finding.
pub severity: Severity,
/// Span copied from the flagged route handler or operation. `run_checks`
/// sorts and de-duplicates findings on `(span, rule_id)`.
// NOTE(review): presumably a (start, end) offset pair — confirm against the model.
pub span: (usize, usize),
/// Human-readable description of the problem.
pub message: String,
}
/// Runs every authorization check against `model` and returns the combined
/// findings.
///
/// The result is ordered by span and then by rule id; findings that share both
/// a span and a rule id are collapsed into one.
pub fn run_checks(model: &AuthorizationModel, rules: &AuthAnalysisRules) -> Vec<AuthFinding> {
    let lang = model.lang.as_str();
    let web_signal = model.lang_web_framework_signal;

    let mut findings = check_admin_routes(model, rules);
    findings.extend(check_ownership_gaps(model, rules, web_signal, lang));
    findings.extend(check_partial_batch_authorization(
        model, rules, web_signal, lang,
    ));
    findings.extend(check_stale_authorization(model, rules, web_signal, lang));
    findings.extend(check_token_override_without_validation(
        model, rules, web_signal, lang,
    ));

    // Lexicographic order on (span, rule_id) puts duplicates next to each other
    // so the adjacent-only `dedup_by` below can remove them.
    findings.sort_by(|lhs, rhs| {
        (lhs.span, lhs.rule_id.as_str()).cmp(&(rhs.span, rhs.rule_id.as_str()))
    });
    findings.dedup_by(|later, earlier| {
        later.span == earlier.span && later.rule_id == earlier.rule_id
    });
    findings
}
/// Flags routes that look admin-sensitive but whose middleware only enforces
/// login-level access.
///
/// A route counts as admin-sensitive when `rules.requires_admin_path` accepts
/// its path or when its handler unit satisfies `route_is_admin_sensitive`.
/// Routes whose `unit_idx` does not resolve to a unit are skipped. A route
/// with no recognised login or admin guard at all is not reported here.
fn check_admin_routes(model: &AuthorizationModel, rules: &AuthAnalysisRules) -> Vec<AuthFinding> {
    model
        .routes
        .iter()
        .filter_map(|route| {
            let unit = model.units.get(route.unit_idx)?;
            let admin_sensitive =
                rules.requires_admin_path(&route.path) || route_is_admin_sensitive(unit);
            if !admin_sensitive {
                return None;
            }

            let guards = &route.middleware_calls;
            let admin_guarded = guards
                .iter()
                .any(|mw| rules.is_admin_guard(&mw.name, &mw.args));
            let login_guarded = guards.iter().any(|mw| {
                rules.is_login_guard(&mw.name) || rules.is_admin_guard(&mw.name, &mw.args)
            });
            // Only report the "login but not admin" case.
            if admin_guarded || !login_guarded {
                return None;
            }

            Some(AuthFinding {
                rule_id: rules.rule_id("admin_route_missing_admin_check"),
                severity: Severity::High,
                span: route.handler_span,
                message: format!(
                    "route `{}` appears admin-sensitive but its middleware only enforces login-level access",
                    route.path
                ),
            })
        })
        .collect()
}
/// Flags read and mutation operations that act on an id-like, non-actor
/// subject without a preceding ownership or membership check.
///
/// Units without user-input evidence are ignored. Within a unit, an operation
/// is skipped when its sink class is known and not auth-relevant, when it is a
/// read inside an auth-helper unit, when it has no relevant target subjects,
/// or when it is a delegated read that already carries actor context.
fn check_ownership_gaps(
    model: &AuthorizationModel,
    rules: &AuthAnalysisRules,
    web_signal: Option<bool>,
    lang: &str,
) -> Vec<AuthFinding> {
    let mut findings = Vec::new();
    let candidate_units = model
        .units
        .iter()
        .filter(|unit| unit_has_user_input_evidence(unit, web_signal, lang));

    for unit in candidate_units {
        for op in &unit.operations {
            // Only reads and mutations can be reported; this also excludes
            // token lookups.
            let reportable_kind =
                op.kind == OperationKind::Read || op.kind == OperationKind::Mutation;
            if !reportable_kind {
                continue;
            }
            let sink_irrelevant = op.sink_class.is_some_and(|class| !class.is_auth_relevant());
            if sink_irrelevant {
                continue;
            }
            if op.kind == OperationKind::Read && unit_is_auth_helper(unit) {
                continue;
            }

            let targets: Vec<&ValueRef> = op
                .subjects
                .iter()
                .filter(|candidate| is_relevant_target_subject(candidate, unit))
                .collect();
            if targets.is_empty()
                || is_delegated_read_with_actor_context(unit, op, &targets)
                || has_prior_subject_auth(unit, op, &targets)
            {
                continue;
            }

            findings.push(AuthFinding {
                rule_id: rules.rule_id("missing_ownership_check"),
                severity: Severity::High,
                span: op.span,
                message: format!(
                    "operation `{}` uses scoped identifier input without a preceding ownership or membership check",
                    op.callee
                ),
            });
        }
    }
    findings
}
/// Flags batch operations where only a single indexed element of the batch
/// was authorized before the whole collection is used.
///
/// A finding is produced when some check at or before the operation's line
/// has an `ArrayIndex` subject whose base is one of the operation's batch
/// collections, and `has_prior_collection_auth` finds no check covering the
/// collection itself.
fn check_partial_batch_authorization(
    model: &AuthorizationModel,
    rules: &AuthAnalysisRules,
    web_signal: Option<bool>,
    lang: &str,
) -> Vec<AuthFinding> {
    let mut findings = Vec::new();
    for unit in &model.units {
        if !unit_has_user_input_evidence(unit, web_signal, lang) {
            continue;
        }
        for op in &unit.operations {
            if op.sink_class.is_some_and(|class| !class.is_auth_relevant()) {
                continue;
            }
            let batch: Vec<&ValueRef> = op
                .subjects
                .iter()
                .filter(|candidate| is_batch_collection(candidate))
                .collect();
            if batch.is_empty() {
                continue;
            }

            // True when a check subject is `<batch>[i]` for one of our batches.
            let indexes_into_batch = |check_subject: &ValueRef| {
                check_subject.source_kind == ValueSourceKind::ArrayIndex
                    && check_subject
                        .base
                        .as_ref()
                        .is_some_and(|base| batch.iter().any(|member| member.name == *base))
            };
            let element_checked = unit.auth_checks.iter().any(|check| {
                check.line <= op.line
                    && check
                        .subjects
                        .iter()
                        .any(|check_subject| indexes_into_batch(check_subject))
            });
            if !element_checked || has_prior_collection_auth(unit, op, &batch) {
                continue;
            }

            findings.push(AuthFinding {
                rule_id: rules.rule_id("partial_batch_authorization"),
                severity: Severity::High,
                span: op.span,
                message: format!(
                    "batch operation `{}` authorizes only a single indexed element before acting on the full collection",
                    op.callee
                ),
            });
        }
    }
    findings
}
/// Flags mutations that use session-carried id-like state without any
/// ownership, membership, admin or "other" check at or before the mutation.
///
/// Only mutations whose sink class is absent or auth-relevant are considered,
/// and only in units with user-input evidence.
fn check_stale_authorization(
    model: &AuthorizationModel,
    rules: &AuthAnalysisRules,
    web_signal: Option<bool>,
    lang: &str,
) -> Vec<AuthFinding> {
    let mut findings = Vec::new();
    for unit in &model.units {
        if !unit_has_user_input_evidence(unit, web_signal, lang) {
            continue;
        }
        for op in &unit.operations {
            if op.kind != OperationKind::Mutation {
                continue;
            }
            if !op.sink_class.is_none_or(|class| class.is_auth_relevant()) {
                continue;
            }
            if !op.subjects.iter().any(is_stale_session_subject) {
                continue;
            }

            let freshly_authorized = unit
                .auth_checks
                .iter()
                .filter(|check| check.line <= op.line)
                .any(|check| {
                    matches!(
                        check.kind,
                        AuthCheckKind::Ownership
                            | AuthCheckKind::Membership
                            | AuthCheckKind::AdminGuard
                            | AuthCheckKind::Other
                    )
                });
            if freshly_authorized {
                continue;
            }

            findings.push(AuthFinding {
                rule_id: rules.rule_id("stale_authorization"),
                severity: Severity::Medium,
                span: op.span,
                message: format!(
                    "mutation `{}` relies on session-carried state without a fresh authorization check",
                    op.callee
                ),
            });
        }
    }
    findings
}
/// Flags token-acceptance flows whose final write is not backed by expiry and
/// recipient validation, or lets other data override token-bound values.
///
/// For each unit with user-input evidence, the first `TokenLookup` operation
/// and the last mutation on or after its line are located; units lacking
/// either are skipped. At most one finding per unit is emitted, listing every
/// problem detected.
fn check_token_override_without_validation(
    model: &AuthorizationModel,
    rules: &AuthAnalysisRules,
    web_signal: Option<bool>,
    lang: &str,
) -> Vec<AuthFinding> {
    let mut findings = Vec::new();
    let candidate_units = model
        .units
        .iter()
        .filter(|unit| unit_has_user_input_evidence(unit, web_signal, lang));

    for unit in candidate_units {
        let Some(token_lookup) = unit
            .operations
            .iter()
            .find(|op| op.kind == OperationKind::TokenLookup)
        else {
            continue;
        };
        let Some(final_write) = unit
            .operations
            .iter()
            .rfind(|op| op.kind == OperationKind::Mutation && op.line >= token_lookup.line)
        else {
            continue;
        };

        // An `||` or a standalone `or` word in the write's text, combined with
        // both token-derived and non-token subjects, is treated as an override.
        let text = final_write.text.as_str();
        let has_or_operator = text.contains("||")
            || text
                .split(|ch: char| !ch.is_ascii_alphanumeric() && ch != '_')
                .any(|word| word.eq_ignore_ascii_case("or"));
        let uses_token_field = final_write
            .subjects
            .iter()
            .any(|subject| subject.source_kind == ValueSourceKind::TokenField);
        let uses_other_source = final_write
            .subjects
            .iter()
            .any(|subject| subject.source_kind != ValueSourceKind::TokenField);
        let override_pattern = has_or_operator && uses_token_field && uses_other_source;

        let has_expiry_check = unit
            .auth_checks
            .iter()
            .any(|check| check.kind == AuthCheckKind::TokenExpiry)
            || unit
                .condition_texts
                .iter()
                .any(|condition| rules.has_expiry_field(condition));
        let has_recipient_check = unit
            .auth_checks
            .iter()
            .any(|check| check.kind == AuthCheckKind::TokenRecipient)
            || unit
                .condition_texts
                .iter()
                .any(|condition| rules.has_recipient_field(condition));

        let problems: Vec<&str> = [
            (
                override_pattern,
                "request data overrides token-bound values",
            ),
            (!has_expiry_check, "token expiration is not validated"),
            (
                !has_recipient_check,
                "token recipient identity is not validated",
            ),
        ]
        .into_iter()
        .filter_map(|(present, description)| present.then_some(description))
        .collect();
        if problems.is_empty() {
            continue;
        }

        findings.push(AuthFinding {
            rule_id: rules.rule_id("token_override_without_validation"),
            severity: Severity::High,
            span: final_write.span,
            message: format!(
                "token acceptance flow writes through `{}` without validating that {}",
                final_write.callee,
                problems.join(", ")
            ),
        });
    }
    findings
}
/// Returns `true` when any call site in `unit` has a name containing `admin`,
/// `impersonat` or `role` (ASCII case-insensitive).
fn route_is_admin_sensitive(unit: &AnalysisUnit) -> bool {
    const ADMIN_MARKERS: [&str; 3] = ["admin", "impersonat", "role"];
    unit.call_sites.iter().any(|call| {
        let callee = call.name.to_ascii_lowercase();
        ADMIN_MARKERS
            .iter()
            .any(|marker| callee.contains(*marker))
    })
}
/// Returns `true` when `op` is authorized for at least one of `subjects`.
///
/// That holds when the row-fetch exemption applies, or when a check at or
/// before the operation's line covers one of the subjects. Login guards and
/// token expiry/recipient checks are ignored.
fn has_prior_subject_auth(
    unit: &AnalysisUnit,
    op: &SensitiveOperation,
    subjects: &[&ValueRef],
) -> bool {
    if has_row_fetch_exemption(unit, op) {
        return true;
    }
    unit.auth_checks
        .iter()
        .filter(|check| check.line <= op.line)
        .filter(|check| {
            !matches!(
                check.kind,
                AuthCheckKind::LoginGuard
                    | AuthCheckKind::TokenExpiry
                    | AuthCheckKind::TokenRecipient
            )
        })
        .any(|check| {
            subjects
                .iter()
                .any(|subject| auth_check_covers_subject(check, subject, unit))
        })
}
/// Returns `true` when `op` is the call that populates a row variable and some
/// check in the unit names that row variable.
///
/// A row variable counts as populated by `op` when its entry in
/// `unit.row_population_data` records the same line as `op.line`. Every row
/// variable recorded on that line is considered, so the answer does not depend
/// on the map's iteration order when several rows share a line.
///
/// A check names a row variable when the chain root of one of its subjects
/// equals that variable. Login guards and token expiry/recipient checks never
/// qualify. The check's own line is not compared against the operation's line.
fn has_row_fetch_exemption(unit: &AnalysisUnit, op: &SensitiveOperation) -> bool {
    let row_vars: Vec<&str> = unit
        .row_population_data
        .iter()
        .filter_map(|(var, (line, _))| (*line == op.line).then_some(var.as_str()))
        .collect();
    if row_vars.is_empty() {
        return false;
    }

    unit.auth_checks.iter().any(|check| {
        let subject_agnostic = matches!(
            check.kind,
            AuthCheckKind::LoginGuard | AuthCheckKind::TokenExpiry | AuthCheckKind::TokenRecipient
        );
        !subject_agnostic
            && check
                .subjects
                .iter()
                .any(|subj| row_vars.contains(&chain_root(subj)))
    })
}
/// Returns the first dot-separated segment of the subject's base, or of its
/// name when it has no base.
fn chain_root(subj: &ValueRef) -> &str {
    let path = match subj.base.as_deref() {
        Some(base) => base,
        None => subj.name.as_str(),
    };
    match path.split_once('.') {
        Some((head, _)) => head,
        None => path,
    }
}
fn has_prior_collection_auth(
unit: &AnalysisUnit,
op: &SensitiveOperation,
subjects: &[&ValueRef],
) -> bool {
let relevant_checks = unit.auth_checks.iter().filter(|check| {
check.line <= op.line
&& !matches!(
check.kind,
AuthCheckKind::LoginGuard
| AuthCheckKind::TokenExpiry
| AuthCheckKind::TokenRecipient
)
});
relevant_checks.into_iter().any(|check| {
subjects.iter().any(|subject| {
check.subjects.iter().any(|check_subject| {
check_subject.source_kind != ValueSourceKind::ArrayIndex
&& canonical_subject_name(check_subject) == subject.name
})
})
})
}
/// Decides whether `check` can be treated as authorizing `subject` within
/// `unit`.
///
/// A route-level check covers every subject. Otherwise the check covers the
/// subject when any of its own subjects matches through one of these paths:
///
/// * the canonical names are equal, the two share the same related base, or
///   one side's related base equals the other side's canonical name;
/// * the check subject's canonical name or related base is a variable on the
///   subject's row-binding chain;
/// * the check subject's chain root is a row variable whose population
///   arguments include the subject, or the subject's alias-chain entry;
/// * a variable on the subject's row-binding chain and the check subject's
///   canonical name are both in `unit.authorized_sql_vars`.
fn auth_check_covers_subject(check: &AuthCheck, subject: &ValueRef, unit: &AnalysisUnit) -> bool {
if check.is_route_level {
return true;
}
let subject_key = canonical_subject_name(subject);
let subject_related_base = related_subject_base(subject);
// Variables reachable from the subject's name by following `row_field_vars`.
let subject_row_chain = row_binding_chain(unit, &subject.name);
let subject_anchor_authorized = subject_row_chain
.iter()
.any(|name| unit.authorized_sql_vars.contains(name));
// The alias-chain entry is only looked up for plain subjects (no base, no field).
let subject_alias_chain: Option<&str> = if subject.base.is_none() && subject.field.is_none() {
unit.var_alias_chain.get(&subject.name).map(|s| s.as_str())
} else {
None
};
// Row variables whose recorded population arguments mention the subject,
// either by canonical name or through its alias-chain entry.
let subject_populates: Vec<&str> = unit
.row_population_data
.iter()
.filter_map(|(row_var, (_line, args))| {
let matches_arg = args.iter().any(|arg| {
if canonical_subject_name(arg) == subject_key {
return true;
}
if let Some(chain) = subject_alias_chain
&& arg.name == chain
{
return true;
}
false
});
if matches_arg {
Some(row_var.as_str())
} else {
None
}
})
.collect();
check.subjects.iter().any(|check_subject| {
let check_key = canonical_subject_name(check_subject);
let check_related_base = related_subject_base(check_subject);
// Direct match: same name, same related base, or one side's base is the
// other side's name.
if check_key == subject_key
|| (subject_related_base.is_some() && subject_related_base == check_related_base)
|| (subject_related_base.as_ref() == Some(&check_key))
|| (check_related_base.as_ref() == Some(&subject_key))
{
return true;
}
// The check names a variable on the subject's row-binding chain.
for row in &subject_row_chain {
if check_key == *row || check_related_base.as_deref() == Some(row.as_str()) {
return true;
}
}
// The check is rooted at a row variable populated from the subject.
for row in &subject_populates {
if chain_root(check_subject) == *row {
return true;
}
}
// Both sides are recorded in `authorized_sql_vars`.
if subject_anchor_authorized && unit.authorized_sql_vars.contains(&check_key) {
return true;
}
false
})
}
/// Follows `unit.row_field_vars` starting at `start` and returns every
/// variable visited, `start` included.
///
/// The walk stops at the first variable with no mapping, when a variable
/// repeats, or after 16 entries. An empty `start` yields an empty chain.
fn row_binding_chain(unit: &AnalysisUnit, start: &str) -> Vec<String> {
    const MAX_LINKS: usize = 16;

    let mut links: Vec<String> = Vec::new();
    if start.is_empty() {
        return links;
    }

    let mut visited: std::collections::HashSet<&str> = std::collections::HashSet::new();
    let mut current = start;
    for _ in 0..MAX_LINKS {
        // A repeated variable means the mapping loops back on itself.
        if !visited.insert(current) {
            break;
        }
        links.push(current.to_string());
        match unit.row_field_vars.get(current) {
            Some(next) => current = next.as_str(),
            None => break,
        }
    }
    links
}
/// Returns the name used to compare subjects: the base for `ArrayIndex`
/// subjects that have one, otherwise the subject's own name.
fn canonical_subject_name(subject: &ValueRef) -> String {
    if subject.source_kind == ValueSourceKind::ArrayIndex {
        if let Some(base) = &subject.base {
            return base.clone();
        }
    }
    subject.name.clone()
}
/// Returns the subject's base, unless it is missing or rooted at `req`,
/// `request`, `ctx` or `session` (ASCII case-insensitive).
///
/// A base is "rooted at" one of those names when it equals the name or starts
/// with the name followed by a dot.
fn related_subject_base(subject: &ValueRef) -> Option<String> {
    const CONTEXT_ROOTS: [&str; 4] = ["req", "request", "ctx", "session"];

    let base = subject.base.as_deref()?;
    let lower = base.to_ascii_lowercase();
    let rooted_in_context = CONTEXT_ROOTS.iter().any(|root| {
        lower
            .strip_prefix(*root)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('.'))
    });
    if rooted_in_context {
        return None;
    }
    Some(base.to_string())
}
/// Returns `true` for id-like subjects that are not the acting user's own
/// identity, not const-bound, not typed-bounded and not the id of a
/// caller-scope entity parameter.
fn is_relevant_target_subject(subject: &ValueRef, unit: &AnalysisUnit) -> bool {
    if !is_id_like(subject) {
        return false;
    }
    let excluded = is_actor_context_subject(subject, unit)
        || is_const_bound_subject(subject, unit)
        || is_typed_bounded_subject(subject, unit)
        || is_caller_scope_entity_subject(subject, unit);
    !excluded
}
/// Returns `true` when the subject is the `id` or `pk` field of a unit
/// parameter whose name is a caller-scope entity (see
/// `is_caller_scope_entity_name`).
///
/// Only the first dot-separated segment of the base is compared against the
/// unit's parameters. Subjects lacking a base or a field never match.
fn is_caller_scope_entity_subject(subject: &ValueRef, unit: &AnalysisUnit) -> bool {
    let (Some(field), Some(base)) = (subject.field.as_deref(), subject.base.as_deref()) else {
        return false;
    };
    let is_key_field = field.eq_ignore_ascii_case("id") || field.eq_ignore_ascii_case("pk");
    if !is_key_field {
        return false;
    }
    let root = match base.split_once('.') {
        Some((head, _)) => head,
        None => base,
    };
    is_caller_scope_entity_name(root) && unit.params.iter().any(|param| param == root)
}
/// Returns `true` when `name` is one of the recognised scope-entity names,
/// compared ASCII case-insensitively.
fn is_caller_scope_entity_name(name: &str) -> bool {
    const SCOPE_ENTITIES: [&str; 12] = [
        "organization",
        "org",
        "project",
        "team",
        "workspace",
        "tenant",
        "account",
        "community",
        "group",
        "repository",
        "repo",
        "company",
    ];
    SCOPE_ENTITIES
        .iter()
        .any(|entity| name.eq_ignore_ascii_case(entity))
}
/// Returns `true` when the subject is a plain variable (no base, no field)
/// listed in `unit.const_bound_vars`.
fn is_const_bound_subject(subject: &ValueRef, unit: &AnalysisUnit) -> bool {
    let is_plain = subject.base.is_none() && subject.field.is_none();
    is_plain && unit.const_bound_vars.contains(&subject.name)
}
/// Returns `true` when the subject is recorded as typed-bounded in `unit`.
///
/// Plain subjects (no base, no field) are looked up by name in
/// `typed_bounded_vars`. Subjects with both a base and a field are looked up
/// in `typed_bounded_dto_fields`, keyed by the first dot-separated segment of
/// the base. Any other shape is never typed-bounded.
fn is_typed_bounded_subject(subject: &ValueRef, unit: &AnalysisUnit) -> bool {
    match (subject.base.as_deref(), subject.field.as_deref()) {
        (None, None) => unit.typed_bounded_vars.contains(&subject.name),
        (Some(base), Some(field)) => {
            let root = match base.split_once('.') {
                Some((head, _)) => head,
                None => base,
            };
            match unit.typed_bounded_dto_fields.get(root) {
                Some(fields) => fields.iter().any(|f| f == field),
                None => false,
            }
        }
        _ => false,
    }
}
/// Returns `true` when the subject refers to the acting user rather than to a
/// target resource.
///
/// This holds when the subject
/// * is a self-scoped session subject,
/// * is a self-actor id field (see `is_self_actor_id_field`) on a base listed
///   in `self_scoped_session_bases`, or on a base whose first segment is in
///   `self_actor_vars`,
/// * has a name listed in `self_actor_id_vars`, or
/// * has an identity key that is one of a fixed set of owner/author-style
///   names.
fn is_actor_context_subject(subject: &ValueRef, unit: &AnalysisUnit) -> bool {
    if is_self_scoped_session_subject(subject) {
        return true;
    }

    if let Some(base) = subject.base.as_deref() {
        let root = match base.split_once('.') {
            Some((head, _)) => head,
            None => base,
        };
        let base_is_actor =
            unit.self_scoped_session_bases.contains(base) || unit.self_actor_vars.contains(root);
        let field_is_self_id = subject.field.as_deref().is_some_and(is_self_actor_id_field);
        if base_is_actor && field_is_self_id {
            return true;
        }
    }

    if unit.self_actor_id_vars.contains(&subject.name) {
        return true;
    }

    match subject_identity_key(subject) {
        Some(key) => matches!(
            key.as_str(),
            "ownerid"
                | "authorid"
                | "actorid"
                | "currentuserid"
                | "uploaderid"
                | "createdby"
                | "updatedby"
        ),
        None => false,
    }
}
/// Returns `true` when `field` is one of the field names treated as
/// identifying the actor itself, compared ASCII case-insensitively.
fn is_self_actor_id_field(field: &str) -> bool {
    const SELF_ID_FIELDS: [&str; 7] = [
        "id", "user_id", "userid", "uid", "email", "username", "handle",
    ];
    SELF_ID_FIELDS
        .iter()
        .any(|candidate| field.eq_ignore_ascii_case(candidate))
}
/// Builds a normalised key for the subject: ASCII alphanumerics only,
/// lower-cased.
///
/// For `ArrayIndex` subjects the key is taken from the base, falling back to
/// the name. For all other subjects it is taken from the field, then the
/// base, then the name. Returns `None` when nothing remains after
/// normalisation.
fn subject_identity_key(subject: &ValueRef) -> Option<String> {
    let source: &str = if subject.source_kind == ValueSourceKind::ArrayIndex {
        subject.base.as_deref().unwrap_or(&subject.name)
    } else {
        subject
            .field
            .as_deref()
            .or(subject.base.as_deref())
            .unwrap_or(&subject.name)
    };

    let mut key = String::with_capacity(source.len());
    for ch in source.chars() {
        if ch.is_ascii_alphanumeric() {
            key.push(ch.to_ascii_lowercase());
        }
    }
    (!key.is_empty()).then_some(key)
}
fn is_self_scoped_session_subject(subject: &ValueRef) -> bool {
subject.source_kind == ValueSourceKind::Session
&& subject
.base
.as_deref()
.is_some_and(is_self_scoped_session_base)
}
/// Returns `true` when `base` is exactly (case-sensitively) one of the
/// recognised expressions for the current user.
fn is_self_scoped_session_base(base: &str) -> bool {
    const SELF_SCOPED_BASES: [&str; 14] = [
        "req.session.user",
        "request.session.user",
        "session.user",
        "req.session.currentUser",
        "request.session.currentUser",
        "session.currentUser",
        "req.user",
        "request.user",
        "req.currentUser",
        "request.currentUser",
        "ctx.session.user",
        "ctx.session.currentUser",
        "ctx.state.user",
        "ctx.state.currentUser",
    ];
    SELF_SCOPED_BASES.contains(&base)
}
/// Returns `true` for id-like `Session`-sourced subjects that are not
/// self-scoped session subjects.
fn is_stale_session_subject(subject: &ValueRef) -> bool {
    if subject.source_kind != ValueSourceKind::Session {
        return false;
    }
    if !is_id_like(subject) {
        return false;
    }
    !is_self_scoped_session_subject(subject)
}
/// Returns `true` when the unit's name looks like an authorization helper.
///
/// The name is reduced to lower-case ASCII alphanumerics. It must then start
/// with one of a fixed set of verb prefixes and contain one of a fixed set of
/// authorization-related words. Unnamed units never qualify.
fn unit_is_auth_helper(unit: &AnalysisUnit) -> bool {
    const VERB_PREFIXES: [&str; 7] = [
        "has",
        "check",
        "require",
        "verify",
        "authorize",
        "can",
        "is",
    ];
    const AUTH_WORDS: [&str; 5] = [
        "membership",
        "ownership",
        "access",
        "permission",
        "authoriz",
    ];

    let Some(name) = unit.name.as_deref() else {
        return false;
    };
    let mut normalized = String::with_capacity(name.len());
    normalized.extend(
        name.chars()
            .filter(char::is_ascii_alphanumeric)
            .map(|ch| ch.to_ascii_lowercase()),
    );

    let has_verb_prefix = VERB_PREFIXES
        .iter()
        .any(|prefix| normalized.starts_with(*prefix));
    let mentions_auth = AUTH_WORDS.iter().any(|word| normalized.contains(*word));
    has_verb_prefix && mentions_auth
}
/// Returns `true` when a route handler's read is handed to a callee whose
/// name contains `service` together with the acting user.
///
/// All of the following must hold: the unit is a route handler, the operation
/// is a read, one of the operation's subjects is a self-scoped session
/// subject, and at least one relevant subject comes from request params, body
/// or query.
fn is_delegated_read_with_actor_context(
    unit: &AnalysisUnit,
    op: &SensitiveOperation,
    relevant_subjects: &[&ValueRef],
) -> bool {
    if unit.kind != AnalysisUnitKind::RouteHandler || op.kind != OperationKind::Read {
        return false;
    }
    if !op.callee.to_ascii_lowercase().contains("service") {
        return false;
    }
    let passes_actor = op.subjects.iter().any(is_self_scoped_session_subject);
    let has_request_target = relevant_subjects.iter().any(|target| {
        matches!(
            target.source_kind,
            ValueSourceKind::RequestParam
                | ValueSourceKind::RequestBody
                | ValueSourceKind::RequestQuery
        )
    });
    passes_actor && has_request_target
}
/// Returns `true` when the subject's most specific label looks like an
/// identifier.
///
/// The label is the field if present, otherwise the base, otherwise the name.
fn is_id_like(subject: &ValueRef) -> bool {
    let label: &str = match (subject.field.as_deref(), subject.base.as_deref()) {
        (Some(field), _) => field,
        (None, Some(base)) => base,
        (None, None) => subject.name.as_str(),
    };
    is_id_like_name(label)
}
/// Returns `true` when `name` looks like an identifier, compared ASCII
/// case-insensitively.
///
/// A name matches when it is `id`, ends with `id`, `_id` or `ids`, or
/// contains `workspaceid`, `projectid` or `noteid`. Because any `id` suffix
/// matches, ordinary words such as `grid` or `valid` also match.
fn is_id_like_name(name: &str) -> bool {
    const ID_SUFFIXES: [&str; 3] = ["id", "_id", "ids"];
    const ID_INFIXES: [&str; 3] = ["workspaceid", "projectid", "noteid"];

    let lower = name.to_ascii_lowercase();
    if lower == "id" {
        return true;
    }
    ID_SUFFIXES.iter().any(|suffix| lower.ends_with(*suffix))
        || ID_INFIXES.iter().any(|infix| lower.contains(*infix))
}
/// Returns `true` when there is evidence that `unit` handles external input.
///
/// Route handlers always qualify. For any other unit, a `web_signal` of
/// `Some(false)` rules it out. Otherwise the unit qualifies when it has
/// context inputs or a parameter whose name looks like external input for
/// `lang`.
fn unit_has_user_input_evidence(unit: &AnalysisUnit, web_signal: Option<bool>, lang: &str) -> bool {
    if unit.kind == AnalysisUnitKind::RouteHandler {
        return true;
    }
    match web_signal {
        Some(false) => false,
        _ => {
            !unit.context_inputs.is_empty()
                || unit
                    .params
                    .iter()
                    .any(|param| is_external_input_param_name_for_lang(param, lang))
        }
    }
}
/// Test-only shorthand for `is_external_input_param_name_for_lang` with an
/// empty language string, so the Go-specific allowlist is not used.
#[cfg(test)]
fn is_external_input_param_name(name: &str) -> bool {
is_external_input_param_name_for_lang(name, "")
}
/// Returns `true` when a parameter called `name` is treated as carrying
/// external input in a unit written in `lang`.
///
/// Names starting with `mock_` or `mocked_` (case-sensitive) never qualify.
/// Id-like names and names ending in `token` (ASCII case-insensitive) always
/// qualify. Every other name is compared, lower-cased, against an allowlist:
/// only `req` and `request` when `lang` is `"go"`, and a broader list for any
/// other language.
fn is_external_input_param_name_for_lang(name: &str, lang: &str) -> bool {
    const GO_INPUT_NAMES: [&str; 2] = ["req", "request"];
    const GENERIC_INPUT_NAMES: [&str; 11] = [
        "req", "request", "ctx", "context", "info", "path", "payload", "body", "dto", "form",
        "query",
    ];

    let is_mock = name.starts_with("mock_") || name.starts_with("mocked_");
    if is_mock {
        return false;
    }
    if is_id_like_name(name) {
        return true;
    }

    let lower = name.to_ascii_lowercase();
    // Covers `token`, `*_token` and camel-case names such as `refreshToken`.
    if lower.ends_with("token") {
        return true;
    }

    let allowlist: &[&str] = if lang == "go" {
        &GO_INPUT_NAMES
    } else {
        &GENERIC_INPUT_NAMES
    };
    allowlist.contains(&lower.as_str())
}
/// Returns `true` for `Identifier`-sourced subjects whose name ends in `ids`
/// (ASCII case-insensitive).
fn is_batch_collection(subject: &ValueRef) -> bool {
    if subject.source_kind != ValueSourceKind::Identifier {
        return false;
    }
    subject.name.to_ascii_lowercase().ends_with("ids")
}
#[cfg(test)]
mod tests {
use super::{
auth_check_covers_subject, is_actor_context_subject, is_caller_scope_entity_name,
is_caller_scope_entity_subject, is_external_input_param_name, is_relevant_target_subject,
unit_has_user_input_evidence,
};
use crate::auth_analysis::model::{AnalysisUnit, AnalysisUnitKind, ValueRef, ValueSourceKind};
use std::collections::{HashMap, HashSet};
// Builds a `Function`-kind unit named `handle` with every collection empty.
// Tests mutate the fields they care about.
fn empty_unit() -> AnalysisUnit {
AnalysisUnit {
kind: AnalysisUnitKind::Function,
name: Some("handle".into()),
span: (0, 0),
params: Vec::new(),
context_inputs: Vec::new(),
call_sites: Vec::new(),
auth_checks: Vec::new(),
operations: Vec::new(),
value_refs: Vec::new(),
condition_texts: Vec::new(),
line: 1,
row_field_vars: HashMap::new(),
var_alias_chain: HashMap::new(),
row_population_data: HashMap::new(),
self_actor_vars: HashSet::new(),
self_actor_id_vars: HashSet::new(),
authorized_sql_vars: HashSet::new(),
const_bound_vars: HashSet::new(),
typed_bounded_vars: HashSet::new(),
typed_bounded_dto_fields: HashMap::new(),
self_scoped_session_bases: HashSet::new(),
}
}
// Builds a `MemberField` subject named `<base>.<field>` with the given base
// and field.
fn member(base: &str, field: &str) -> ValueRef {
ValueRef {
source_kind: ValueSourceKind::MemberField,
name: format!("{base}.{field}"),
base: Some(base.to_string()),
field: Some(field.to_string()),
index: None,
span: (0, 0),
}
}
// With `user` registered as a self-actor variable, only its self-id fields
// (`id`, `user_id`, `uid`) count as actor context. Other id fields on `user`
// and the `id` of an unrelated base do not.
#[test]
fn self_actor_var_widens_actor_context_for_self_id_fields() {
let mut unit = empty_unit();
unit.self_actor_vars.insert("user".into());
assert!(is_actor_context_subject(&member("user", "id"), &unit));
assert!(is_actor_context_subject(&member("user", "user_id"), &unit));
assert!(is_actor_context_subject(&member("user", "uid"), &unit));
assert!(!is_actor_context_subject(
&member("user", "group_id"),
&unit
));
assert!(!is_actor_context_subject(
&member("user", "workspace_id"),
&unit
));
assert!(!is_actor_context_subject(&member("target", "id"), &unit));
}
// The actor's own id is not a relevant target subject, while a foreign-key
// style field on the same actor variable still is.
#[test]
fn self_actor_var_suppresses_relevant_subject_for_self_id() {
let mut unit = empty_unit();
unit.self_actor_vars.insert("user".into());
assert!(!is_relevant_target_subject(&member("user", "id"), &unit));
assert!(is_relevant_target_subject(
&member("user", "group_id"),
&unit
));
}
// Builds an `Identifier` subject with the given name and no base or field.
fn plain(name: &str) -> ValueRef {
ValueRef {
source_kind: ValueSourceKind::Identifier,
name: name.to_string(),
base: None,
field: None,
index: None,
span: (0, 0),
}
}
// A plain variable listed in `self_actor_id_vars` is actor context; other
// plain id variables are not.
#[test]
fn self_actor_id_vars_widens_actor_context_for_plain_subjects() {
let mut unit = empty_unit();
unit.self_actor_id_vars.insert("uid".into());
assert!(is_actor_context_subject(&plain("uid"), &unit));
assert!(!is_actor_context_subject(&plain("trip_id"), &unit));
assert!(!is_actor_context_subject(&plain("doc_id"), &unit));
}
// `email`, `username` and `handle` on a self-actor variable count as actor
// context; the same field on an unrelated base does not.
#[test]
fn self_actor_id_field_set_includes_email_username_handle() {
let mut unit = empty_unit();
unit.self_actor_vars.insert("user".into());
assert!(is_actor_context_subject(&member("user", "email"), &unit));
assert!(is_actor_context_subject(&member("user", "username"), &unit));
assert!(is_actor_context_subject(&member("user", "handle"), &unit));
assert!(!is_actor_context_subject(&member("target", "email"), &unit));
}
// Const-bound suppression applies only to plain variables: a const-bound `id`
// is not relevant, but a member access on a const-bound base still is.
#[test]
fn const_bound_plain_subjects_are_not_relevant() {
let mut unit = empty_unit();
unit.const_bound_vars.insert("id".into());
assert!(!is_relevant_target_subject(&plain("id"), &unit));
let unit2 = empty_unit();
assert!(is_relevant_target_subject(&plain("id"), &unit2));
unit.const_bound_vars.insert("req".into());
assert!(is_relevant_target_subject(&member("req", "id"), &unit));
}
// The `id`/`pk` of a unit parameter named after a scope entity is recognised
// as a caller-scope entity subject and is therefore not a relevant target.
#[test]
fn caller_scope_entity_subject_recognises_unit_param_id() {
    let mut org_unit = empty_unit();
    org_unit.params.push("organization".into());
    for key_field in ["id", "pk"] {
        assert!(is_caller_scope_entity_subject(
            &member("organization", key_field),
            &org_unit
        ));
    }
    assert!(!is_relevant_target_subject(
        &member("organization", "id"),
        &org_unit
    ));

    // Each of these is checked against a fresh unit that has only that one
    // parameter.
    for entity in ["project", "team", "workspace", "repo"] {
        let mut unit = empty_unit();
        unit.params.push(entity.into());
        assert!(is_caller_scope_entity_subject(
            &member(entity, "id"),
            &unit
        ));
    }
}
// Negative cases: the entity is not a unit parameter, the field is not
// `id`/`pk`, the parameter name is not a scope entity, or the subject is a
// plain variable.
#[test]
fn caller_scope_entity_subject_does_not_overreach() {
let unit = empty_unit();
assert!(!is_caller_scope_entity_subject(
&member("organization", "id"),
&unit
));
let mut unit = empty_unit();
unit.params.push("organization".into());
assert!(!is_caller_scope_entity_subject(
&member("organization", "name"),
&unit
));
assert!(!is_caller_scope_entity_subject(
&member("organization", "slug"),
&unit
));
let mut unit_u = empty_unit();
unit_u.params.push("user".into());
assert!(!is_caller_scope_entity_subject(
&member("user", "id"),
&unit_u
));
let mut unit_m = empty_unit();
unit_m.params.push("member".into());
assert!(!is_caller_scope_entity_subject(
&member("member", "id"),
&unit_m
));
let mut unit_b = empty_unit();
unit_b.params.push("organization".into());
assert!(!is_caller_scope_entity_subject(
&plain("organization"),
&unit_b
));
}
// Pins the scope-entity vocabulary, including case-insensitive matches, and
// checks that actor/request-style names are excluded.
#[test]
fn caller_scope_entity_name_vocabulary() {
for name in [
"organization",
"Organization",
"ORG",
"project",
"team",
"workspace",
"tenant",
"account",
"community",
"group",
"repository",
"repo",
"company",
] {
assert!(
is_caller_scope_entity_name(name),
"expected {name} to be recognised as scope entity"
);
}
for name in ["user", "member", "actor", "request", "self", "ctx"] {
assert!(
!is_caller_scope_entity_name(name),
"expected {name} NOT to be recognised as scope entity"
);
}
}
// A plain variable in `typed_bounded_vars` is not relevant. Registering a
// base name there does not suppress member accesses on that base.
#[test]
fn typed_bounded_plain_subjects_are_not_relevant() {
let mut unit = empty_unit();
unit.typed_bounded_vars.insert("user_id".into());
assert!(!is_relevant_target_subject(&plain("user_id"), &unit));
let unit2 = empty_unit();
assert!(is_relevant_target_subject(&plain("user_id"), &unit2));
unit.typed_bounded_vars.insert("req".into());
assert!(is_relevant_target_subject(&member("req", "user_id"), &unit));
}
// With no web-framework signal and an empty language, a unit has user-input
// evidence only through an id-like, token-like or allowlisted parameter name,
// or by being a route handler.
#[test]
fn unit_user_input_evidence_recognises_external_inputs() {
let mut unit = empty_unit();
assert!(!unit_has_user_input_evidence(&unit, None, ""));
unit.params.push("apps".into());
unit.params.push("schema_editor".into());
assert!(!unit_has_user_input_evidence(&unit, None, ""));
let mut unit = empty_unit();
unit.params.push("config".into());
unit.params.push("items".into());
assert!(!unit_has_user_input_evidence(&unit, None, ""));
unit.params.push("doc_id".into());
assert!(unit_has_user_input_evidence(&unit, None, ""));
let mut unit = empty_unit();
unit.params.push("token".into());
unit.params.push("currentUser".into());
unit.params.push("roleOverride".into());
assert!(unit_has_user_input_evidence(&unit, None, ""));
let mut unit = empty_unit();
unit.params.push("request".into());
assert!(unit_has_user_input_evidence(&unit, None, ""));
let mut unit = empty_unit();
unit.params.push("path".into());
assert!(unit_has_user_input_evidence(&unit, None, ""));
let mut unit = empty_unit();
unit.kind = AnalysisUnitKind::RouteHandler;
assert!(unit_has_user_input_evidence(&unit, None, ""));
}
// A web-framework signal of `Some(false)` disables both the parameter-name
// and the context-input heuristics, but never overrides a route handler.
#[test]
fn web_framework_signal_gates_user_input_heuristics() {
let mut unit = empty_unit();
unit.params.push("session_id".into());
assert!(unit_has_user_input_evidence(&unit, None, ""));
assert!(unit_has_user_input_evidence(&unit, Some(true), ""));
assert!(!unit_has_user_input_evidence(&unit, Some(false), ""));
unit.kind = AnalysisUnitKind::RouteHandler;
assert!(unit_has_user_input_evidence(&unit, Some(false), ""));
let mut unit = empty_unit();
unit.context_inputs.push(ValueRef {
source_kind: ValueSourceKind::Session,
name: "session.update".into(),
base: Some("session".into()),
field: Some("update".into()),
index: None,
span: (0, 0),
});
assert!(unit_has_user_input_evidence(&unit, None, ""));
assert!(unit_has_user_input_evidence(&unit, Some(true), ""));
assert!(!unit_has_user_input_evidence(&unit, Some(false), ""));
}
// Pins which parameter names are classified as external input when no
// language is given, including the `mock_`/`mocked_` exclusions.
#[test]
fn external_input_param_name_classification() {
    let accepted = [
        "id",
        "doc_id",
        "groupId",
        "voucher_code_ids",
        "token",
        "access_token",
        "refreshToken",
        "request",
        "req",
        "ctx",
        "path",
        "payload",
        "dto",
        "query",
    ];
    for name in accepted {
        assert!(
            is_external_input_param_name(name),
            "expected {name} to be classified as external input"
        );
    }

    let rejected = [
        "apps",
        "schema_editor",
        "config",
        "items",
        "promotion",
        "update_rule_variants",
        "manager",
        "c",
        "mock_project_id",
        "mock_session",
        "mock_user_id",
        "mocked_request",
        "mocked_token",
    ];
    for name in rejected {
        assert!(
            !is_external_input_param_name(name),
            "expected {name} NOT to be classified as external input"
        );
    }
}
// For Go only `req`/`request` (plus id-like and token-like names) count as
// external input; the broader allowlist still applies to other languages.
#[test]
fn external_input_param_name_for_go_narrows_allowlist() {
    use super::is_external_input_param_name_for_lang as f;

    for name in ["user_id", "repoID", "access_token", "req", "request"] {
        assert!(f(name, "go"), "expected {name} to be accepted for go");
    }
    for name in [
        "ctx", "context", "info", "body", "path", "payload", "dto", "form", "query",
    ] {
        assert!(!f(name, "go"), "expected {name} to be rejected for go");
    }
    for (name, lang) in [
        ("ctx", "javascript"),
        ("body", "typescript"),
        ("path", "rust"),
        ("payload", "python"),
    ] {
        assert!(f(name, lang), "expected {name} to be accepted for {lang}");
    }
}
// The operation on the row's population line (10) is exempt because a
// membership check names `community.id`, even though that check sits on a
// later line (20). An operation on a different line is not exempt.
#[test]
fn row_fetch_exemption_covers_fetch_when_check_names_row() {
use super::has_row_fetch_exemption;
use crate::auth_analysis::model::{
AuthCheck, AuthCheckKind, OperationKind, SensitiveOperation,
};
let mut unit = empty_unit();
unit.row_population_data.insert(
"community".to_string(),
(10, vec![member("data", "community_id")]),
);
unit.auth_checks.push(AuthCheck {
kind: AuthCheckKind::Membership,
callee: "check_community_user_action".into(),
subjects: vec![member("community", "id")],
span: (0, 0),
line: 20,
args: Vec::new(),
condition_text: None,
is_route_level: false,
});
let fetch_op = SensitiveOperation {
kind: OperationKind::Read,
sink_class: None,
callee: "Community.read".into(),
subjects: vec![member("data", "community_id")],
span: (0, 0),
line: 10,
text: String::new(),
};
assert!(has_row_fetch_exemption(&unit, &fetch_op));
let mid_op = SensitiveOperation {
kind: OperationKind::Mutation,
sink_class: None,
callee: "delete_post".into(),
subjects: vec![member("data", "post_id")],
span: (0, 0),
line: 15,
text: String::new(),
};
assert!(!has_row_fetch_exemption(&unit, &mid_op));
}
// Without any auth check in the unit, the row-populating fetch is not exempt.
#[test]
fn row_fetch_exemption_skips_when_no_check_names_row() {
use super::has_row_fetch_exemption;
use crate::auth_analysis::model::{OperationKind, SensitiveOperation};
let mut unit = empty_unit();
unit.row_population_data.insert(
"community".to_string(),
(10, vec![member("data", "community_id")]),
);
let fetch_op = SensitiveOperation {
kind: OperationKind::Read,
sink_class: None,
callee: "Community.read".into(),
subjects: vec![member("data", "community_id")],
span: (0, 0),
line: 10,
text: String::new(),
};
assert!(!has_row_fetch_exemption(&unit, &fetch_op));
}
// A `LoginGuard` check that names the row does not grant the exemption.
#[test]
fn row_fetch_exemption_ignores_login_token_checks() {
use super::has_row_fetch_exemption;
use crate::auth_analysis::model::{
AuthCheck, AuthCheckKind, OperationKind, SensitiveOperation,
};
let mut unit = empty_unit();
unit.row_population_data.insert(
"community".to_string(),
(10, vec![member("data", "community_id")]),
);
unit.auth_checks.push(AuthCheck {
kind: AuthCheckKind::LoginGuard,
callee: "require_login".into(),
subjects: vec![member("community", "id")],
span: (0, 0),
line: 20,
args: Vec::new(),
condition_text: None,
is_route_level: false,
});
let fetch_op = SensitiveOperation {
kind: OperationKind::Read,
sink_class: None,
callee: "Community.read".into(),
subjects: vec![member("data", "community_id")],
span: (0, 0),
line: 10,
text: String::new(),
};
assert!(!has_row_fetch_exemption(&unit, &fetch_op));
}
// A check on `community.id` covers `data.community_id` because the
// `community` row was populated from that argument. An unrelated argument
// (`data.post_id`) is not covered.
#[test]
fn auth_check_covers_subject_via_row_population_reverse_walk() {
use crate::auth_analysis::model::{AuthCheck, AuthCheckKind};
let mut unit = empty_unit();
unit.row_population_data.insert(
"community".to_string(),
(10, vec![member("data", "community_id")]),
);
let check = AuthCheck {
kind: AuthCheckKind::Membership,
callee: "check_community_user_action".into(),
subjects: vec![member("community", "id")],
span: (0, 0),
line: 20,
args: Vec::new(),
condition_text: None,
is_route_level: false,
};
assert!(auth_check_covers_subject(
&check,
&member("data", "community_id"),
&unit
));
assert!(!auth_check_covers_subject(
&check,
&member("data", "post_id"),
&unit
));
}
// Same reverse walk as the member-field case, but the population argument is
// a plain variable (`community_id`).
#[test]
fn auth_check_covers_subject_via_row_population_reverse_walk_plain_arg() {
use crate::auth_analysis::model::{AuthCheck, AuthCheckKind};
let mut unit = empty_unit();
unit.row_population_data
.insert("community".to_string(), (10, vec![plain("community_id")]));
let check = AuthCheck {
kind: AuthCheckKind::Membership,
callee: "check_community_mod_action".into(),
subjects: vec![member("community", "id")],
span: (0, 0),
line: 20,
args: Vec::new(),
condition_text: None,
is_route_level: false,
};
assert!(auth_check_covers_subject(
&check,
&plain("community_id"),
&unit
));
assert!(!auth_check_covers_subject(&check, &plain("post_id"), &unit));
}
// The row was populated from `req.community_id`, and `community_id` is
// recorded as an alias of it. Both the plain alias and the original member
// access are covered; an unrelated plain variable is not.
#[test]
fn auth_check_covers_subject_via_row_population_alias_chain() {
use crate::auth_analysis::model::{AuthCheck, AuthCheckKind};
let mut unit = empty_unit();
unit.row_population_data.insert(
"community".to_string(),
(10, vec![member("req", "community_id")]),
);
unit.var_alias_chain
.insert("community_id".to_string(), "req.community_id".to_string());
let check = AuthCheck {
kind: AuthCheckKind::Membership,
callee: "check_community_user_action".into(),
subjects: vec![member("community", "id")],
span: (0, 0),
line: 20,
args: Vec::new(),
condition_text: None,
is_route_level: false,
};
assert!(auth_check_covers_subject(
&check,
&plain("community_id"),
&unit
));
assert!(auth_check_covers_subject(
&check,
&member("req", "community_id"),
&unit
));
assert!(!auth_check_covers_subject(&check, &plain("post_id"), &unit));
}
// A route-level check with no subjects covers every subject. The same check
// with `is_route_level: false` covers nothing, because it has no subjects to
// match against.
#[test]
fn auth_check_covers_subject_route_level_short_circuits() {
use crate::auth_analysis::model::{AuthCheck, AuthCheckKind};
let unit = empty_unit();
let route_check = AuthCheck {
kind: AuthCheckKind::Other,
callee: "requires_access_dag".into(),
subjects: Vec::new(), span: (0, 0),
line: 0,
args: Vec::new(),
condition_text: None,
is_route_level: true,
};
assert!(auth_check_covers_subject(
&route_check,
&plain("dag_id"),
&unit
));
assert!(auth_check_covers_subject(
&route_check,
&member("req", "dag_run_id"),
&unit
));
assert!(auth_check_covers_subject(
&route_check,
&plain("dag"),
&unit
));
let in_body_check = AuthCheck {
kind: AuthCheckKind::Other,
callee: "requires_access_dag".into(),
subjects: Vec::new(),
span: (0, 0),
line: 0,
args: Vec::new(),
condition_text: None,
is_route_level: false,
};
assert!(!auth_check_covers_subject(
&in_body_check,
&plain("dag_id"),
&unit
));
}
}