use aa_security::policy::{
Capability as CanonCapability, CapabilitySet as CanonCapabilitySet, NetworkPolicy as CanonNetworkPolicy,
PolicyDocument as CanonPolicyDocument, ToolRule as CanonToolRule,
};
use crate::policy::document::PolicyDocument;
fn to_canon_capability(cap: &aa_core::Capability) -> CanonCapability {
match cap {
aa_core::Capability::FileRead => CanonCapability::FileRead,
aa_core::Capability::FileWrite => CanonCapability::FileWrite,
aa_core::Capability::NetworkOutbound => CanonCapability::NetworkOutbound,
aa_core::Capability::NetworkInbound => CanonCapability::NetworkInbound,
aa_core::Capability::TerminalExec => CanonCapability::TerminalExec,
aa_core::Capability::McpTool(n) => CanonCapability::McpTool(n.clone()),
aa_core::Capability::Model(n) => CanonCapability::Model(n.clone()),
aa_core::Capability::AgentSpawn => CanonCapability::AgentSpawn,
}
}
impl PolicyDocument {
    /// Projects this document onto the canonical `aa_security` policy model.
    ///
    /// What is carried over:
    /// - `name`: cloned unchanged.
    /// - `network`: when present, a canonical network policy holding a copy of
    ///   its `allowlist`.
    /// - `capabilities`: when present, every `allow` / `deny` entry converted
    ///   through `to_canon_capability`.
    /// - `tools`: one `CanonToolRule` per entry (`name`, `allow`,
    ///   `requires_approval_if`), sorted ascending by tool name.
    ///
    /// `syscall_allowlist` is always `None` in the result. Any other field of
    /// this document is not part of the projection.
    pub fn to_canonical(&self) -> CanonPolicyDocument {
        // Build one rule per configured tool, then order the rules by name so
        // the result does not depend on the iteration order of `self.tools`.
        let mut tool_rules: Vec<CanonToolRule> = Vec::new();
        for (tool_name, policy) in self.tools.iter() {
            tool_rules.push(CanonToolRule {
                name: tool_name.clone(),
                allow: policy.allow,
                requires_approval_if: policy.requires_approval_if.clone(),
            });
        }
        tool_rules.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));

        // An absent network section stays absent.
        let network_policy = self.network.as_ref().map(|declared| CanonNetworkPolicy {
            allowlist: declared.allowlist.clone(),
        });

        // An absent capability section stays absent; otherwise translate both
        // the allow and deny entries one by one.
        let capability_set = self.capabilities.as_ref().map(|declared| {
            let mut projected = CanonCapabilitySet::default();
            for granted in &declared.allow {
                projected.allow.insert(to_canon_capability(granted));
            }
            for refused in &declared.deny {
                projected.deny.insert(to_canon_capability(refused));
            }
            projected
        });

        CanonPolicyDocument {
            name: self.name.clone(),
            network: network_policy,
            capabilities: capability_set,
            tools: tool_rules,
            syscall_allowlist: None,
        }
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use aa_core::CapabilitySet;

    use super::*;
    use crate::policy::document::ToolPolicy;
    use crate::policy::scope::PolicyScope;

    /// Baseline fixture: a globally scoped document named "t" with no tools
    /// and every optional section unset. Tests override only the fields they
    /// exercise.
    fn base_doc() -> PolicyDocument {
        PolicyDocument {
            name: Some("t".to_string()),
            policy_version: None,
            version: None,
            scope: PolicyScope::Global,
            network: None,
            schedule: None,
            budget: None,
            data: None,
            approval_timeout_secs: 300,
            approval_policy: None,
            tools: HashMap::new(),
            capabilities: None,
        }
    }

    /// Allow and deny entries each end up in the matching canonical set.
    #[test]
    fn projects_capabilities() {
        let mut declared = CapabilitySet::default();
        declared.deny.insert(aa_core::Capability::FileWrite);
        declared.allow.insert(aa_core::Capability::FileRead);
        let doc = PolicyDocument {
            capabilities: Some(declared),
            ..base_doc()
        };

        let projected = doc.to_canonical().capabilities.unwrap();

        assert!(projected.deny.contains(&CanonCapability::FileWrite));
        assert!(projected.allow.contains(&CanonCapability::FileRead));
    }

    /// The network allowlist is carried over and tool rules come out ordered
    /// by name, with `requires_approval_if` preserved.
    #[test]
    fn projects_network_and_sorted_tools() {
        let mut tools = HashMap::new();
        tools.insert(
            "zebra".to_string(),
            ToolPolicy {
                allow: true,
                limit_per_hour: None,
                requires_approval_if: None,
            },
        );
        tools.insert(
            "alpha".to_string(),
            ToolPolicy {
                allow: false,
                limit_per_hour: None,
                requires_approval_if: Some("path starts_with \"/etc\"".to_string()),
            },
        );
        let doc = PolicyDocument {
            network: Some(crate::policy::document::NetworkPolicy {
                allowlist: vec!["api.openai.com".to_string()],
            }),
            tools,
            ..base_doc()
        };

        let canon = doc.to_canonical();

        assert_eq!(canon.egress_allowlist(), ["api.openai.com"]);
        assert_eq!(canon.tools[0].name, "alpha");
        assert_eq!(canon.tools[1].name, "zebra");
        assert_eq!(
            canon.tools[0].requires_approval_if.as_deref(),
            Some("path starts_with \"/etc\"")
        );
    }

    /// End-to-end check: the canonical projection of a document that denies
    /// `FileWrite` is fed to `lower_to_ebpf`, and the resulting rules are
    /// expected to list `/etc` among their deny paths.
    #[test]
    fn canonical_lowers_to_ebpf_rules() {
        let mut declared = CapabilitySet::default();
        declared.deny.insert(aa_core::Capability::FileWrite);
        let doc = PolicyDocument {
            capabilities: Some(declared),
            ..base_doc()
        };

        let canon = doc.to_canonical();
        let rules = aa_security::policy::lower_to_ebpf(&canon);

        assert!(rules.deny_paths().any(|p| p == "/etc"));
    }
}