use crate::findings::{
ArtifactKind, EvidenceKind, Finding, MatchTarget, RecommendedAction, Severity, ThreatCategory,
};
/// Minimum number of declared permission rules at which
/// `over_provisioning_finding` reports an artifact as over-provisioned.
pub(crate) const BROAD_PERMISSION_THRESHOLD: usize = 3;
/// Reports an artifact that declares a broad set of permissions.
///
/// Only the *number* of entries in `permission_rules` is considered; the
/// contents of the tuples are not inspected here.
///
/// Returns `Some` finding with rule id `SCOPE_OVERPROVISIONING` (medium
/// severity, approval required) when at least [`BROAD_PERMISSION_THRESHOLD`]
/// rules are present, and `None` otherwise. `artifact_path` is recorded both
/// as the artifact location and as the referenced file the match is
/// attributed to.
pub(crate) fn over_provisioning_finding(
    permission_rules: &[(&str, &str, &str)],
    artifact_path: &str,
    artifact_kind: ArtifactKind,
) -> Option<Finding> {
    // Below the threshold there is nothing to report.
    if permission_rules.len() < BROAD_PERMISSION_THRESHOLD {
        return None;
    }

    let artifact_location = Some(artifact_path.to_string());
    let match_target = MatchTarget::ReferencedFile {
        path: artifact_path.to_string(),
    };

    let finding = Finding::builder("SCOPE_OVERPROVISIONING", ThreatCategory::ScopeCreep)
        .severity(Severity::Medium)
        .action(RecommendedAction::RequireApproval)
        .evidence_kind(EvidenceKind::Context)
        .artifact(artifact_kind, artifact_location)
        .matched_on(match_target)
        .match_value("broad declared permissions")
        .reason("Artifact declares broad permissions or scopes relative to its apparent task")
        .build();

    Some(finding)
}
/// Reports an artifact whose stated intent is narrow but which requests a
/// high-impact permission.
///
/// The intent is classified by `infer_declared_intent(content)`. A finding
/// with rule id `CAPABILITY_PERMISSION_MISMATCH` is produced only when that
/// classification is `"narrow"` with a non-zero strength **and** at least one
/// entry in `permission_rules` carries one of the rule ids
/// `DECLARED_PERMISSION_BROWSER_FULL`, `DECLARED_PERMISSION_FILE_WRITE`, or
/// `DECLARED_PERMISSION_SHELL_EXEC` (only the first tuple element is read).
///
/// Returns `None` in every other case. `artifact_path` is recorded both as
/// the artifact location and as the referenced file the match is attributed to.
pub(crate) fn capability_permission_mismatch_finding(
    permission_rules: &[(&str, &str, &str)],
    content: &str,
    artifact_path: &str,
    artifact_kind: ArtifactKind,
) -> Option<Finding> {
    // Rule ids that count as a high-impact capability request.
    const HIGH_IMPACT_RULE_IDS: [&str; 3] = [
        "DECLARED_PERMISSION_BROWSER_FULL",
        "DECLARED_PERMISSION_FILE_WRITE",
        "DECLARED_PERMISSION_SHELL_EXEC",
    ];

    let (intent_kind, intent_strength) = infer_declared_intent(content);
    let intent_is_narrow = intent_kind == "narrow" && intent_strength > 0;

    // A single high-impact rule is enough; the other tuple fields are ignored.
    let requests_high_impact = permission_rules
        .iter()
        .any(|(rule_id, _, _)| HIGH_IMPACT_RULE_IDS.contains(rule_id));

    if !(intent_is_narrow && requests_high_impact) {
        return None;
    }

    let artifact_location = Some(artifact_path.to_string());
    let match_target = MatchTarget::ReferencedFile {
        path: artifact_path.to_string(),
    };

    let finding = Finding::builder("CAPABILITY_PERMISSION_MISMATCH", ThreatCategory::ScopeCreep)
        .severity(Severity::Medium)
        .action(RecommendedAction::RequireApproval)
        .evidence_kind(EvidenceKind::Intent)
        .artifact(artifact_kind, artifact_location)
        .matched_on(match_target)
        .match_value("narrow intent with broad capability request")
        .reason(
            "Artifact intent appears narrower than the capabilities or permissions it requests",
        )
        .build();

    Some(finding)
}
/// Extracts the lines of `content` that surround permission declarations.
///
/// A line is an *anchor* when, compared ASCII-case-insensitively, it contains
/// `permission` or `capabilit`, or when (ignoring leading whitespace) it starts
/// with a `- ` or `* ` bullet. Each anchor contributes a window of one line
/// before it, the anchor itself, and two lines after it, clipped to the bounds
/// of the text.
///
/// Every source line is emitted at most once and in document order, even when
/// windows overlap. Emitted lines are each terminated with `\n`.
///
/// If no line anchors, the whole of `content` is returned unchanged.
pub(crate) fn permission_context(content: &str) -> String {
    const LINES_BEFORE: usize = 1;
    const LINES_AFTER: usize = 2;

    let is_anchor = |line: &str| {
        let lowered = line.to_ascii_lowercase();
        let unindented = line.trim_start();
        lowered.contains("permission")
            || lowered.contains("capabilit")
            || unindented.starts_with("- ")
            || unindented.starts_with("* ")
    };

    let all_lines: Vec<&str> = content.lines().collect();
    let mut excerpt = String::new();
    // Index of the first line that has not been written to `excerpt` by an
    // earlier window. Anchors are visited in ascending order and each window is
    // contiguous, so any part of a window below this index was already emitted.
    let mut next_unemitted = 0;

    for (anchor, line) in all_lines.iter().enumerate() {
        if !is_anchor(line) {
            continue;
        }

        let window_end = (anchor + 1 + LINES_AFTER).min(all_lines.len());
        let window_start = anchor.saturating_sub(LINES_BEFORE).max(next_unemitted);

        for kept in &all_lines[window_start..window_end] {
            excerpt.push_str(kept);
            excerpt.push('\n');
        }
        next_unemitted = window_end;
    }

    if excerpt.is_empty() {
        return content.to_string();
    }
    excerpt
}
/// Extracts the lines of `content` that surround statements of intent.
///
/// A line is an *anchor* when, compared ASCII-case-insensitively, it contains
/// any of `intent`, `goal`, `purpose`, `summary`, or `workflow`. Each anchor
/// contributes itself plus up to three following lines, clipped to the end of
/// the text.
///
/// Every source line is emitted at most once and in document order, even when
/// the windows of nearby anchors overlap, so the excerpt never contains
/// repeated copies of the same line. Emitted lines are each terminated with
/// `\n`.
///
/// If no line anchors, the whole of `content` is returned unchanged.
pub(crate) fn intent_context(content: &str) -> String {
    const ANCHOR_TERMS: [&str; 5] = ["intent", "goal", "purpose", "summary", "workflow"];
    // The anchor line itself plus the three lines after it.
    const WINDOW_LEN: usize = 4;

    let lines: Vec<&str> = content.lines().collect();
    let mut buffer = String::new();
    // Index of the first line not yet written by an earlier window. Anchors are
    // visited in ascending order and each window is contiguous, so any part of
    // a window below this index has already been emitted and is skipped.
    let mut next_unemitted = 0;

    for (index, line) in lines.iter().enumerate() {
        let lower = line.to_ascii_lowercase();
        if !ANCHOR_TERMS.iter().any(|term| lower.contains(*term)) {
            continue;
        }

        let end = (index + WINDOW_LEN).min(lines.len());
        let start = index.max(next_unemitted);
        for snippet in &lines[start..end] {
            buffer.push_str(snippet);
            buffer.push('\n');
        }
        next_unemitted = end;
    }

    if buffer.is_empty() {
        content.to_string()
    } else {
        buffer
    }
}
/// Classifies the intent an artifact declares for itself as narrow or broad.
///
/// The text examined is the lowercased result of `intent_context(content)`.
/// Two fixed vocabularies are checked by substring: *narrow* terms (read-only,
/// inspection-style verbs) and *broad* terms (mutating or privileged verbs).
/// Each term counts once if it occurs anywhere in that text.
///
/// Returns a `(kind, strength)` pair:
/// * `("narrow", n)` when strictly more narrow terms (`n`) than broad terms matched;
/// * `("broad", b)` when at least one broad term matched (`b`) and narrow terms
///   did not outnumber them — ties go to broad;
/// * `("unknown", 0)` when no term from either vocabulary matched.
pub(crate) fn infer_declared_intent(content: &str) -> (&'static str, usize) {
    const NARROW_TERMS: [&str; 8] = [
        "read-only",
        "summarize",
        "list",
        "inspect",
        "audit",
        "review",
        "search",
        "lookup",
    ];
    const BROAD_TERMS: [&str; 8] = [
        "modify",
        "delete",
        "write",
        "execute",
        "deploy",
        "install",
        "full access",
        "admin",
    ];

    let haystack = intent_context(content).to_ascii_lowercase();
    // Number of vocabulary entries that occur at least once in the haystack.
    let count_hits = |vocabulary: &[&str]| {
        vocabulary
            .iter()
            .filter(|term| haystack.contains(**term))
            .count()
    };

    match (count_hits(&NARROW_TERMS), count_hits(&BROAD_TERMS)) {
        // Strictly more narrow hits than broad hits implies at least one narrow hit.
        (narrow, broad) if narrow > broad => ("narrow", narrow),
        // Narrow did not win and nothing broad matched, so nothing matched at all.
        (_, 0) => ("unknown", 0),
        (_, broad) => ("broad", broad),
    }
}
/// Derives the permission rules an artifact explicitly declares.
///
/// The text examined is the lowercased result of `permission_context(content)`.
/// Each rule category is detected by plain substring search for a fixed set of
/// phrases; shell execution and OAuth scopes are delegated to
/// `has_shell_exec_signal` and `has_oauth_scope_signal`.
///
/// Returns one `(rule_id, match_value, reason)` tuple per detected category,
/// always in this order: browser full, file write, shell exec, network access,
/// secrets access, OAuth scopes. Each category appears at most once; the vector
/// is empty when nothing is detected.
pub(crate) fn explicit_declared_permission_rules(
    content: &str,
) -> Vec<(&'static str, &'static str, &'static str)> {
    const BROWSER_FULL_PHRASES: &[&str] = &[
        "browser: full",
        "full autonomous browser",
        "allow-all browser",
        "click any element",
    ];
    const FILE_WRITE_PHRASES: &[&str] = &[
        "write file",
        "write files",
        "modify files",
        "delete file",
        "delete files",
        "remove file",
        "remove files",
        "wipe file",
        "wipe files",
        "erase file",
        "erase files",
        "unlink file",
        "unlink files",
    ];
    const NETWORK_PHRASES: &[&str] = &[
        "network",
        "external api",
        "webhook",
        "internet",
        "outbound request",
    ];
    const SECRET_PHRASES: &[&str] = &["token", "secret", "password", "credential", "cookie"];

    let context = permission_context(content).to_ascii_lowercase();
    let mentions_any =
        |phrases: &[&str]| phrases.iter().any(|phrase| context.contains(*phrase));

    // One entry per category, listed in the order rules are reported.
    let candidates: [(bool, (&'static str, &'static str, &'static str)); 6] = [
        (
            mentions_any(BROWSER_FULL_PHRASES),
            (
                "DECLARED_PERMISSION_BROWSER_FULL",
                "browser full",
                "Artifact declares broad browser automation permissions",
            ),
        ),
        (
            mentions_any(FILE_WRITE_PHRASES),
            (
                "DECLARED_PERMISSION_FILE_WRITE",
                "file write",
                "Artifact declares file modification or deletion capability",
            ),
        ),
        (
            has_shell_exec_signal(&context),
            (
                "DECLARED_PERMISSION_SHELL_EXEC",
                "shell exec",
                "Artifact declares shell or command execution capability",
            ),
        ),
        (
            mentions_any(NETWORK_PHRASES),
            (
                "DECLARED_PERMISSION_NETWORK_ACCESS",
                "network access",
                "Artifact declares outbound network access",
            ),
        ),
        (
            mentions_any(SECRET_PHRASES),
            (
                "DECLARED_PERMISSION_SECRETS_ACCESS",
                "secrets access",
                "Artifact declares access to secrets, tokens, or credentials",
            ),
        ),
        (
            has_oauth_scope_signal(&context),
            (
                "DECLARED_PERMISSION_OAUTH_SCOPES",
                "oauth scopes",
                "Artifact declares OAuth scopes or broad SaaS permissions",
            ),
        ),
    ];

    candidates
        .iter()
        .filter(|(declared, _)| *declared)
        .map(|(_, rule)| *rule)
        .collect()
}
/// Tells whether `context` contains a phrase that declares shell or command
/// execution.
///
/// Matching is a case-sensitive substring search against lowercase multi-word
/// phrases (plus `stdio`), so callers pass already-lowercased text. The bare
/// word `shell` is deliberately not a phrase on its own: words that merely
/// contain it (such as "nutshell") do not match.
fn has_shell_exec_signal(context: &str) -> bool {
    // Phrases that name a shell explicitly.
    const SHELL_PHRASES: [&str; 10] = [
        "shell exec",
        "shell access",
        "shell command",
        "shell script",
        "run shell",
        "spawn shell",
        "system shell",
        "open shell",
        "opens a shell",
        "shell: true",
    ];
    // Phrases that describe command execution without naming a shell.
    const COMMAND_PHRASES: [&str; 4] = [
        "terminal command",
        "run command",
        "execute command",
        "stdio",
    ];

    let mentions = |needle: &&str| context.contains(*needle);
    SHELL_PHRASES.iter().any(mentions) || COMMAND_PHRASES.iter().any(mentions)
}
/// Tells whether `context` contains a marker of OAuth scopes or SaaS-style
/// permissions.
///
/// Matching is a case-sensitive substring search for `oauth`, `calendar`,
/// `drive`, `slack`, or `read/write`, so callers pass already-lowercased text.
/// The word `scope` alone is not a marker.
fn has_oauth_scope_signal(context: &str) -> bool {
    const SCOPE_MARKERS: [&str; 5] = ["oauth", "calendar", "drive", "slack", "read/write"];

    SCOPE_MARKERS
        .iter()
        .any(|marker| context.contains(*marker))
}
// Unit tests for the permission-context extraction and the phrase-based
// permission signal helpers defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// A single line that satisfies several anchor conditions at once (bullet,
// "permission", "capabilit") must still be emitted only once.
#[test]
fn permission_context_does_not_duplicate_window_for_multi_match_line() {
let content = "header\n- permissions: capabilities: full\nbody1\nbody2\nfooter\n";
let ctx = permission_context(content);
let occurrences = ctx.matches("- permissions: capabilities: full").count();
assert_eq!(
occurrences, 1,
"Multi-condition line '{}' must appear exactly once in the context buffer; \
got {} occurrences. Buffer was:\n{}",
"- permissions: capabilities: full", occurrences, ctx
);
}
// Two separate anchor lines must both show up in the extracted context.
#[test]
fn permission_context_emits_distinct_windows_for_different_anchors() {
let content = "permission line one\nbody\ncapability line two\nbody\n";
let ctx = permission_context(content);
assert!(ctx.contains("permission line one"));
assert!(ctx.contains("capability line two"));
}
// With no anchor line at all, the input is returned unchanged.
#[test]
fn permission_context_falls_back_to_full_content_when_no_anchor() {
let content = "no anchors here\njust prose\n";
let ctx = permission_context(content);
assert_eq!(ctx, content);
}
// Adjacent anchors have overlapping windows; a line covered by both windows
// must be emitted only once.
#[test]
fn permission_context_does_not_double_count_overlapping_window_lines() {
let content = "- permissions: A\n- capabilities: B\nshared line\nmore\n";
let ctx = permission_context(content);
let occurrences = ctx.matches("shared line").count();
assert_eq!(
occurrences, 1,
"Overlapping window line must appear exactly once; buffer:\n{ctx}"
);
}
// Words that merely contain "shell" (seashells, eggshell, nutshell) or a bare
// "shell" without an execution phrase must not count as shell execution.
#[test]
fn shell_exec_signal_does_not_fire_on_prose_lookalikes() {
let benign = [
"permissions: read seashells from the corpus",
"capabilities include eggshell calcium analysis",
"in a nutshell, this tool inspects logs",
"permission to summarise without a shell",
"* permissions: eggshell-thin error margins",
];
for sample in benign {
// The helper matches lowercase phrases, so samples are lowercased first.
assert!(
!has_shell_exec_signal(&sample.to_ascii_lowercase()),
"must NOT classify benign prose as shell-exec: {sample:?}"
);
}
}
// Each sample contains one of the recognised shell/command execution phrases.
#[test]
fn shell_exec_signal_fires_on_genuine_declarations() {
let positive = [
"- permissions: shell exec",
"capabilities: shell access on the host",
"- run shell commands as the agent",
"permissions: shell: true",
"capabilities: terminal command execution",
"- run command on the operator's machine",
"executes commands via stdio",
];
for sample in positive {
assert!(
has_shell_exec_signal(&sample.to_ascii_lowercase()),
"must classify genuine declaration as shell-exec: {sample:?}"
);
}
}
// The everyday word "scope" on its own must not be treated as an OAuth scope
// declaration.
#[test]
fn oauth_scope_signal_does_not_fire_on_prose_lookalikes() {
let benign = [
"this tool's scope is limited to read-only inspection",
"tasks that fall out of scope go to the human",
"the scope of work here is well-defined",
"permissions: in scope for review",
];
for sample in benign {
assert!(
!has_oauth_scope_signal(&sample.to_ascii_lowercase()),
"must NOT classify benign prose as oauth-scopes: {sample:?}"
);
}
}
// Each sample contains one of the recognised markers: oauth, calendar, drive,
// slack, or read/write.
#[test]
fn oauth_scope_signal_fires_on_genuine_declarations() {
let positive = [
"- permissions: oauth tokens",
"capabilities: oauth scope spreadsheets.readonly",
"permissions: calendar.read",
"capabilities: drive.file write",
"permissions: slack messages.write",
"capabilities: read/write to the user's vault",
];
for sample in positive {
assert!(
has_oauth_scope_signal(&sample.to_ascii_lowercase()),
"must classify genuine declaration as oauth-scopes: {sample:?}"
);
}
}
// End-to-end check through `explicit_declared_permission_rules`: benign prose
// that mentions "scope", "nutshell" and "shell" must yield neither the
// shell-exec nor the OAuth-scopes rule.
#[test]
fn explicit_declared_permission_rules_does_not_misclassify_benign_prose() {
let benign = "\
# Capabilities
- This tool's scope is limited to read-only inspection.
- In a nutshell, no shell or browser access required.
- Permissions: nothing more than reading the file index.
";
let rules = explicit_declared_permission_rules(benign);
// Only the rule ids (first tuple element) matter for these assertions.
let ids: Vec<&str> = rules.iter().map(|(id, _, _)| *id).collect();
assert!(
!ids.contains(&"DECLARED_PERMISSION_SHELL_EXEC"),
"benign prose MUST NOT trigger SHELL_EXEC; got rules={ids:?}"
);
assert!(
!ids.contains(&"DECLARED_PERMISSION_OAUTH_SCOPES"),
"benign prose MUST NOT trigger OAUTH_SCOPES; got rules={ids:?}"
);
}
// Explicit "delete file(s)" / "write files" / "modify files" declarations must
// produce the file-write rule.
#[test]
fn file_write_signal_fires_on_genuine_file_deletion_declarations() {
let positive = [
"permissions: delete file from disk",
"capabilities: delete files in the workspace",
"- permissions: write files and delete file entries",
"- capabilities: modify files; delete files when required",
];
for sample in positive {
let rules = explicit_declared_permission_rules(sample);
let ids: Vec<&str> = rules.iter().map(|(id, _, _)| *id).collect();
assert!(
ids.contains(&"DECLARED_PERMISSION_FILE_WRITE"),
"must classify genuine file-deletion declaration as FileWrite: {sample:?} \
(got rules={ids:?})"
);
}
}
// "delete" followed by something other than "file" (work items, workflow,
// workspace, ...) must not produce the file-write rule.
#[test]
fn file_write_signal_does_not_fire_on_delete_work_prose() {
let benign = [
"capabilities: delete work items from the Jira backlog",
"permissions: delete workflow steps owned by the user",
"- this skill helps delete work orders in the queue",
"capabilities: delete workspace metadata via the API",
];
for sample in benign {
let rules = explicit_declared_permission_rules(sample);
let ids: Vec<&str> = rules.iter().map(|(id, _, _)| *id).collect();
assert!(
!ids.contains(&"DECLARED_PERMISSION_FILE_WRITE"),
"benign 'delete work …' prose MUST NOT trigger FileWrite: {sample:?} \
(got rules={ids:?})"
);
}
}
}