use parking_lot::Mutex;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::time::Instant;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum AutonomyLevel {
ReadOnly,
#[default]
Supervised,
Full,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandRiskLevel {
Low,
Medium,
High,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToolOperation {
Read,
Act,
}
#[derive(Debug)]
pub struct ActionTracker {
actions: Mutex<Vec<Instant>>,
}
impl ActionTracker {
pub fn new() -> Self {
Self {
actions: Mutex::new(Vec::new()),
}
}
pub fn record(&self) -> usize {
let mut actions = self.actions.lock();
let cutoff = Instant::now()
.checked_sub(std::time::Duration::from_secs(3600))
.unwrap_or_else(Instant::now);
actions.retain(|t| *t > cutoff);
actions.push(Instant::now());
actions.len()
}
pub fn count(&self) -> usize {
let mut actions = self.actions.lock();
let cutoff = Instant::now()
.checked_sub(std::time::Duration::from_secs(3600))
.unwrap_or_else(Instant::now);
actions.retain(|t| *t > cutoff);
actions.len()
}
}
impl Clone for ActionTracker {
fn clone(&self) -> Self {
let actions = self.actions.lock();
Self {
actions: Mutex::new(actions.clone()),
}
}
}
#[derive(Debug, Clone)]
pub struct SecurityPolicy {
pub autonomy: AutonomyLevel,
pub workspace_dir: PathBuf,
pub workspace_only: bool,
pub allowed_commands: Vec<String>,
pub forbidden_paths: Vec<String>,
pub allowed_roots: Vec<PathBuf>,
pub max_actions_per_hour: u32,
pub max_cost_per_day_cents: u32,
pub require_approval_for_medium_risk: bool,
pub block_high_risk_commands: bool,
pub shell_env_passthrough: Vec<String>,
pub tracker: ActionTracker,
}
impl Default for SecurityPolicy {
fn default() -> Self {
Self {
autonomy: AutonomyLevel::Supervised,
workspace_dir: PathBuf::from("."),
workspace_only: true,
allowed_commands: vec![
"git".into(),
"npm".into(),
"cargo".into(),
"ls".into(),
"cat".into(),
"grep".into(),
"find".into(),
"echo".into(),
"pwd".into(),
"wc".into(),
"head".into(),
"tail".into(),
"date".into(),
],
forbidden_paths: vec![
"/etc".into(),
"/root".into(),
"/home".into(),
"/usr".into(),
"/bin".into(),
"/sbin".into(),
"/lib".into(),
"/opt".into(),
"/boot".into(),
"/dev".into(),
"/proc".into(),
"/sys".into(),
"/var".into(),
"/tmp".into(),
"~/.ssh".into(),
"~/.gnupg".into(),
"~/.aws".into(),
"~/.config".into(),
],
allowed_roots: Vec::new(),
max_actions_per_hour: 20,
max_cost_per_day_cents: 500,
require_approval_for_medium_risk: true,
block_high_risk_commands: true,
shell_env_passthrough: vec![],
tracker: ActionTracker::new(),
}
}
}
fn home_dir() -> Option<PathBuf> {
std::env::var_os("HOME").map(PathBuf::from)
}
fn expand_user_path(path: &str) -> PathBuf {
if path == "~" {
if let Some(home) = home_dir() {
return home;
}
}
if let Some(stripped) = path.strip_prefix("~/") {
if let Some(home) = home_dir() {
return home.join(stripped);
}
}
PathBuf::from(path)
}
fn skip_env_assignments(s: &str) -> &str {
let mut rest = s;
loop {
let Some(word) = rest.split_whitespace().next() else {
return rest;
};
if word.contains('=')
&& word
.chars()
.next()
.is_some_and(|c| c.is_ascii_alphabetic() || c == '_')
{
rest = rest[word.len()..].trim_start();
} else {
return rest;
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QuoteState {
None,
Single,
Double,
}
fn split_unquoted_segments(command: &str) -> Vec<String> {
let mut segments = Vec::new();
let mut current = String::new();
let mut quote = QuoteState::None;
let mut escaped = false;
let mut chars = command.chars().peekable();
let push_segment = |segments: &mut Vec<String>, current: &mut String| {
let trimmed = current.trim();
if !trimmed.is_empty() {
segments.push(trimmed.to_string());
}
current.clear();
};
while let Some(ch) = chars.next() {
match quote {
QuoteState::Single => {
if ch == '\'' {
quote = QuoteState::None;
}
current.push(ch);
}
QuoteState::Double => {
if escaped {
escaped = false;
current.push(ch);
continue;
}
if ch == '\\' {
escaped = true;
current.push(ch);
continue;
}
if ch == '"' {
quote = QuoteState::None;
}
current.push(ch);
}
QuoteState::None => {
if escaped {
escaped = false;
current.push(ch);
continue;
}
if ch == '\\' {
escaped = true;
current.push(ch);
continue;
}
match ch {
'\'' => {
quote = QuoteState::Single;
current.push(ch);
}
'"' => {
quote = QuoteState::Double;
current.push(ch);
}
';' | '\n' => push_segment(&mut segments, &mut current),
'|' => {
if chars.next_if_eq(&'|').is_some() {
}
push_segment(&mut segments, &mut current);
}
'&' => {
if chars.next_if_eq(&'&').is_some() {
push_segment(&mut segments, &mut current);
} else {
current.push(ch);
}
}
_ => current.push(ch),
}
}
}
}
let trimmed = current.trim();
if !trimmed.is_empty() {
segments.push(trimmed.to_string());
}
segments
}
fn contains_unquoted_single_ampersand(command: &str) -> bool {
let mut quote = QuoteState::None;
let mut escaped = false;
let mut chars = command.chars().peekable();
while let Some(ch) = chars.next() {
match quote {
QuoteState::Single => {
if ch == '\'' {
quote = QuoteState::None;
}
}
QuoteState::Double => {
if escaped {
escaped = false;
continue;
}
if ch == '\\' {
escaped = true;
continue;
}
if ch == '"' {
quote = QuoteState::None;
}
}
QuoteState::None => {
if escaped {
escaped = false;
continue;
}
if ch == '\\' {
escaped = true;
continue;
}
match ch {
'\'' => quote = QuoteState::Single,
'"' => quote = QuoteState::Double,
'&' => {
if chars.next_if_eq(&'&').is_none() {
return true;
}
}
_ => {}
}
}
}
}
false
}
fn contains_unquoted_char(command: &str, target: char) -> bool {
let mut quote = QuoteState::None;
let mut escaped = false;
for ch in command.chars() {
match quote {
QuoteState::Single => {
if ch == '\'' {
quote = QuoteState::None;
}
}
QuoteState::Double => {
if escaped {
escaped = false;
continue;
}
if ch == '\\' {
escaped = true;
continue;
}
if ch == '"' {
quote = QuoteState::None;
}
}
QuoteState::None => {
if escaped {
escaped = false;
continue;
}
if ch == '\\' {
escaped = true;
continue;
}
match ch {
'\'' => quote = QuoteState::Single,
'"' => quote = QuoteState::Double,
_ if ch == target => return true,
_ => {}
}
}
}
}
false
}
fn contains_unquoted_shell_variable_expansion(command: &str) -> bool {
let mut quote = QuoteState::None;
let mut escaped = false;
let chars: Vec<char> = command.chars().collect();
for i in 0..chars.len() {
let ch = chars[i];
match quote {
QuoteState::Single => {
if ch == '\'' {
quote = QuoteState::None;
}
continue;
}
QuoteState::Double => {
if escaped {
escaped = false;
continue;
}
if ch == '\\' {
escaped = true;
continue;
}
if ch == '"' {
quote = QuoteState::None;
continue;
}
}
QuoteState::None => {
if escaped {
escaped = false;
continue;
}
if ch == '\\' {
escaped = true;
continue;
}
if ch == '\'' {
quote = QuoteState::Single;
continue;
}
if ch == '"' {
quote = QuoteState::Double;
continue;
}
}
}
if ch != '$' {
continue;
}
let Some(next) = chars.get(i + 1).copied() else {
continue;
};
if next.is_ascii_alphanumeric()
|| matches!(
next,
'_' | '{' | '(' | '#' | '?' | '!' | '$' | '*' | '@' | '-'
)
{
return true;
}
}
false
}
fn strip_wrapping_quotes(token: &str) -> &str {
token.trim_matches(|c| c == '"' || c == '\'')
}
fn looks_like_path(candidate: &str) -> bool {
candidate.starts_with('/')
|| candidate.starts_with("./")
|| candidate.starts_with("../")
|| candidate.starts_with('~')
|| candidate == "."
|| candidate == ".."
|| candidate.contains('/')
}
fn attached_short_option_value(token: &str) -> Option<&str> {
let body = token.strip_prefix('-')?;
if body.starts_with('-') || body.len() < 2 {
return None;
}
let value = body[1..].trim_start_matches('=').trim();
if value.is_empty() {
None
} else {
Some(value)
}
}
fn redirection_target(token: &str) -> Option<&str> {
let marker_idx = token.find(['<', '>'])?;
let mut rest = &token[marker_idx + 1..];
rest = rest.trim_start_matches(['<', '>']);
rest = rest.trim_start_matches('&');
rest = rest.trim_start_matches(|c: char| c.is_ascii_digit());
let trimmed = rest.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed)
}
}
fn is_allowlist_entry_match(allowed: &str, executable: &str, executable_base: &str) -> bool {
let allowed = strip_wrapping_quotes(allowed).trim();
if allowed.is_empty() {
return false;
}
if allowed == "*" {
return true;
}
if looks_like_path(allowed) {
let allowed_path = expand_user_path(allowed);
let executable_path = expand_user_path(executable);
return executable_path == allowed_path;
}
allowed == executable_base
}
impl SecurityPolicy {
pub fn command_risk_level(&self, command: &str) -> CommandRiskLevel {
let mut saw_medium = false;
for segment in split_unquoted_segments(command) {
let cmd_part = skip_env_assignments(&segment);
let mut words = cmd_part.split_whitespace();
let Some(base_raw) = words.next() else {
continue;
};
let base = base_raw
.rsplit('/')
.next()
.unwrap_or("")
.to_ascii_lowercase();
let args: Vec<String> = words.map(|w| w.to_ascii_lowercase()).collect();
let joined_segment = cmd_part.to_ascii_lowercase();
if matches!(
base.as_str(),
"rm" | "mkfs"
| "dd"
| "shutdown"
| "reboot"
| "halt"
| "poweroff"
| "sudo"
| "su"
| "chown"
| "chmod"
| "useradd"
| "userdel"
| "usermod"
| "passwd"
| "mount"
| "umount"
| "iptables"
| "ufw"
| "firewall-cmd"
| "curl"
| "wget"
| "nc"
| "ncat"
| "netcat"
| "scp"
| "ssh"
| "ftp"
| "telnet"
) {
return CommandRiskLevel::High;
}
if joined_segment.contains("rm -rf /")
|| joined_segment.contains("rm -fr /")
|| joined_segment.contains(":(){:|:&};:")
{
return CommandRiskLevel::High;
}
let medium = match base.as_str() {
"git" => args.first().is_some_and(|verb| {
matches!(
verb.as_str(),
"commit"
| "push"
| "reset"
| "clean"
| "rebase"
| "merge"
| "cherry-pick"
| "revert"
| "branch"
| "checkout"
| "switch"
| "tag"
)
}),
"npm" | "pnpm" | "yarn" => args.first().is_some_and(|verb| {
matches!(
verb.as_str(),
"install" | "add" | "remove" | "uninstall" | "update" | "publish"
)
}),
"cargo" => args.first().is_some_and(|verb| {
matches!(
verb.as_str(),
"add" | "remove" | "install" | "clean" | "publish"
)
}),
"touch" | "mkdir" | "mv" | "cp" | "ln" => true,
_ => false,
};
saw_medium |= medium;
}
if saw_medium {
CommandRiskLevel::Medium
} else {
CommandRiskLevel::Low
}
}
pub fn validate_command_execution(
&self,
command: &str,
approved: bool,
) -> Result<CommandRiskLevel, String> {
if !self.is_command_allowed(command) {
return Err(format!("Command not allowed by security policy: {command}"));
}
let risk = self.command_risk_level(command);
if risk == CommandRiskLevel::High {
if self.block_high_risk_commands {
return Err("Command blocked: high-risk command is disallowed by policy".into());
}
if self.autonomy == AutonomyLevel::Supervised && !approved {
return Err(
"Command requires explicit approval (approved=true): high-risk operation"
.into(),
);
}
}
if risk == CommandRiskLevel::Medium
&& self.autonomy == AutonomyLevel::Supervised
&& self.require_approval_for_medium_risk
&& !approved
{
return Err(
"Command requires explicit approval (approved=true): medium-risk operation".into(),
);
}
Ok(risk)
}
pub fn is_command_allowed(&self, command: &str) -> bool {
if self.autonomy == AutonomyLevel::ReadOnly {
return false;
}
if command.contains('`')
|| contains_unquoted_shell_variable_expansion(command)
|| command.contains("<(")
|| command.contains(">(")
{
return false;
}
if contains_unquoted_char(command, '>') || contains_unquoted_char(command, '<') {
return false;
}
if command
.split_whitespace()
.any(|w| w == "tee" || w.ends_with("/tee"))
{
return false;
}
if contains_unquoted_single_ampersand(command) {
return false;
}
let segments = split_unquoted_segments(command);
for segment in &segments {
let cmd_part = skip_env_assignments(segment);
let mut words = cmd_part.split_whitespace();
let executable = strip_wrapping_quotes(words.next().unwrap_or("")).trim();
let base_cmd = executable.rsplit('/').next().unwrap_or("");
if base_cmd.is_empty() {
continue;
}
if !self
.allowed_commands
.iter()
.any(|allowed| is_allowlist_entry_match(allowed, executable, base_cmd))
{
return false;
}
let args: Vec<String> = words.map(|w| w.to_ascii_lowercase()).collect();
if !self.is_args_safe(base_cmd, &args) {
return false;
}
}
let has_cmd = segments.iter().any(|s| {
let s = skip_env_assignments(s.trim());
s.split_whitespace().next().is_some_and(|w| !w.is_empty())
});
has_cmd
}
fn is_args_safe(&self, base: &str, args: &[String]) -> bool {
let base = base.to_ascii_lowercase();
match base.as_str() {
"find" => {
!args.iter().any(|arg| arg == "-exec" || arg == "-ok")
}
"git" => {
!args.iter().any(|arg| {
arg == "config"
|| arg.starts_with("config.")
|| arg == "alias"
|| arg.starts_with("alias.")
|| arg == "-c"
})
}
_ => true,
}
}
pub fn forbidden_path_argument(&self, command: &str) -> Option<String> {
let forbidden_candidate = |raw: &str| {
let candidate = strip_wrapping_quotes(raw).trim();
if candidate.is_empty() || candidate.contains("://") {
return None;
}
if looks_like_path(candidate) && !self.is_path_allowed(candidate) {
Some(candidate.to_string())
} else {
None
}
};
for segment in split_unquoted_segments(command) {
let cmd_part = skip_env_assignments(&segment);
let mut words = cmd_part.split_whitespace();
let Some(executable) = words.next() else {
continue;
};
if let Some(target) = redirection_target(strip_wrapping_quotes(executable)) {
if let Some(blocked) = forbidden_candidate(target) {
return Some(blocked);
}
}
for token in words {
let candidate = strip_wrapping_quotes(token).trim();
if candidate.is_empty() || candidate.contains("://") {
continue;
}
if let Some(target) = redirection_target(candidate) {
if let Some(blocked) = forbidden_candidate(target) {
return Some(blocked);
}
}
if candidate.starts_with('-') {
if let Some((_, value)) = candidate.split_once('=') {
if let Some(blocked) = forbidden_candidate(value) {
return Some(blocked);
}
}
if let Some(value) = attached_short_option_value(candidate) {
if let Some(blocked) = forbidden_candidate(value) {
return Some(blocked);
}
}
continue;
}
if let Some(blocked) = forbidden_candidate(candidate) {
return Some(blocked);
}
}
}
None
}
pub fn is_path_allowed(&self, path: &str) -> bool {
if path.contains('\0') {
return false;
}
if Path::new(path)
.components()
.any(|c| matches!(c, std::path::Component::ParentDir))
{
return false;
}
let lower = path.to_lowercase();
if lower.contains("..%2f") || lower.contains("%2f..") {
return false;
}
if path.starts_with('~') && path != "~" && !path.starts_with("~/") {
return false;
}
let expanded_path = expand_user_path(path);
if self.workspace_only && expanded_path.is_absolute() {
return false;
}
for forbidden in &self.forbidden_paths {
let forbidden_path = expand_user_path(forbidden);
if expanded_path.starts_with(forbidden_path) {
return false;
}
}
true
}
pub fn is_resolved_path_allowed(&self, resolved: &Path) -> bool {
let workspace_root = self
.workspace_dir
.canonicalize()
.unwrap_or_else(|_| self.workspace_dir.clone());
if resolved.starts_with(&workspace_root) {
return true;
}
for root in &self.allowed_roots {
let canonical = root.canonicalize().unwrap_or_else(|_| root.clone());
if resolved.starts_with(&canonical) {
return true;
}
}
for forbidden in &self.forbidden_paths {
let forbidden_path = expand_user_path(forbidden);
if resolved.starts_with(&forbidden_path) {
return false;
}
}
if !self.workspace_only {
return true;
}
false
}
pub fn resolved_path_violation_message(&self, resolved: &Path) -> String {
let guidance = if self.allowed_roots.is_empty() {
"Add the directory to [autonomy].allowed_roots (for example: allowed_roots = [\"/absolute/path\"]), or move the file into the workspace."
} else {
"Add a matching parent directory to [autonomy].allowed_roots, or move the file into the workspace."
};
format!(
"Resolved path escapes workspace allowlist: {}. {}",
resolved.display(),
guidance
)
}
pub fn can_act(&self) -> bool {
self.autonomy != AutonomyLevel::ReadOnly
}
pub fn enforce_tool_operation(
&self,
operation: ToolOperation,
operation_name: &str,
) -> Result<(), String> {
match operation {
ToolOperation::Read => Ok(()),
ToolOperation::Act => {
if !self.can_act() {
return Err(format!(
"Security policy: read-only mode, cannot perform '{operation_name}'"
));
}
if !self.record_action() {
return Err("Rate limit exceeded: action budget exhausted".to_string());
}
Ok(())
}
}
}
pub fn record_action(&self) -> bool {
let count = self.tracker.record();
count <= self.max_actions_per_hour as usize
}
pub fn is_rate_limited(&self) -> bool {
self.tracker.count() >= self.max_actions_per_hour as usize
}
pub fn from_config(
autonomy_config: &crate::config::AutonomyConfig,
workspace_dir: &Path,
) -> Self {
Self {
autonomy: autonomy_config.level,
workspace_dir: workspace_dir.to_path_buf(),
workspace_only: autonomy_config.workspace_only,
allowed_commands: autonomy_config.allowed_commands.clone(),
forbidden_paths: autonomy_config.forbidden_paths.clone(),
allowed_roots: autonomy_config
.allowed_roots
.iter()
.map(|root| {
let expanded = expand_user_path(root);
if expanded.is_absolute() {
expanded
} else {
workspace_dir.join(expanded)
}
})
.collect(),
max_actions_per_hour: autonomy_config.max_actions_per_hour,
max_cost_per_day_cents: autonomy_config.max_cost_per_day_cents,
require_approval_for_medium_risk: autonomy_config.require_approval_for_medium_risk,
block_high_risk_commands: autonomy_config.block_high_risk_commands,
shell_env_passthrough: autonomy_config.shell_env_passthrough.clone(),
tracker: ActionTracker::new(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn default_policy() -> SecurityPolicy {
SecurityPolicy::default()
}
fn readonly_policy() -> SecurityPolicy {
SecurityPolicy {
autonomy: AutonomyLevel::ReadOnly,
..SecurityPolicy::default()
}
}
fn full_policy() -> SecurityPolicy {
SecurityPolicy {
autonomy: AutonomyLevel::Full,
..SecurityPolicy::default()
}
}
#[test]
fn autonomy_default_is_supervised() {
assert_eq!(AutonomyLevel::default(), AutonomyLevel::Supervised);
}
#[test]
fn autonomy_serde_roundtrip() {
let json = serde_json::to_string(&AutonomyLevel::Full).unwrap();
assert_eq!(json, "\"full\"");
let parsed: AutonomyLevel = serde_json::from_str("\"readonly\"").unwrap();
assert_eq!(parsed, AutonomyLevel::ReadOnly);
let parsed2: AutonomyLevel = serde_json::from_str("\"supervised\"").unwrap();
assert_eq!(parsed2, AutonomyLevel::Supervised);
}
#[test]
fn can_act_readonly_false() {
assert!(!readonly_policy().can_act());
}
#[test]
fn can_act_supervised_true() {
assert!(default_policy().can_act());
}
#[test]
fn can_act_full_true() {
assert!(full_policy().can_act());
}
#[test]
fn enforce_tool_operation_read_allowed_in_readonly_mode() {
let p = readonly_policy();
assert!(p
.enforce_tool_operation(ToolOperation::Read, "memory_recall")
.is_ok());
}
#[test]
fn enforce_tool_operation_act_blocked_in_readonly_mode() {
let p = readonly_policy();
let err = p
.enforce_tool_operation(ToolOperation::Act, "memory_store")
.unwrap_err();
assert!(err.contains("read-only mode"));
}
#[test]
fn enforce_tool_operation_act_uses_rate_budget() {
let p = SecurityPolicy {
max_actions_per_hour: 0,
..default_policy()
};
let err = p
.enforce_tool_operation(ToolOperation::Act, "memory_store")
.unwrap_err();
assert!(err.contains("Rate limit exceeded"));
}
#[test]
fn allowed_commands_basic() {
let p = default_policy();
assert!(p.is_command_allowed("ls"));
assert!(p.is_command_allowed("git status"));
assert!(p.is_command_allowed("cargo build --release"));
assert!(p.is_command_allowed("cat file.txt"));
assert!(p.is_command_allowed("grep -r pattern ."));
assert!(p.is_command_allowed("date"));
}
#[test]
fn blocked_commands_basic() {
let p = default_policy();
assert!(!p.is_command_allowed("rm -rf /"));
assert!(!p.is_command_allowed("sudo apt install"));
assert!(!p.is_command_allowed("curl http://evil.com"));
assert!(!p.is_command_allowed("wget http://evil.com"));
assert!(!p.is_command_allowed("python3 exploit.py"));
assert!(!p.is_command_allowed("node malicious.js"));
}
#[test]
fn readonly_blocks_all_commands() {
let p = readonly_policy();
assert!(!p.is_command_allowed("ls"));
assert!(!p.is_command_allowed("cat file.txt"));
assert!(!p.is_command_allowed("echo hello"));
}
#[test]
fn full_autonomy_still_uses_allowlist() {
let p = full_policy();
assert!(p.is_command_allowed("ls"));
assert!(!p.is_command_allowed("rm -rf /"));
}
#[test]
fn command_with_absolute_path_extracts_basename() {
let p = default_policy();
assert!(p.is_command_allowed("/usr/bin/git status"));
assert!(p.is_command_allowed("/bin/ls -la"));
}
#[test]
fn allowlist_supports_explicit_executable_paths() {
let p = SecurityPolicy {
allowed_commands: vec!["/usr/bin/antigravity".into()],
..SecurityPolicy::default()
};
assert!(p.is_command_allowed("/usr/bin/antigravity"));
assert!(!p.is_command_allowed("antigravity"));
}
#[test]
fn allowlist_supports_wildcard_entry() {
let p = SecurityPolicy {
allowed_commands: vec!["*".into()],
..SecurityPolicy::default()
};
assert!(p.is_command_allowed("python3 --version"));
assert!(p.is_command_allowed("/usr/bin/antigravity"));
let blocked = p.validate_command_execution("rm -rf /tmp/test", true);
assert!(blocked.is_err());
assert!(blocked.unwrap_err().contains("high-risk"));
}
#[test]
fn empty_command_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed(""));
assert!(!p.is_command_allowed(" "));
}
#[test]
fn command_with_pipes_validates_all_segments() {
let p = default_policy();
assert!(p.is_command_allowed("ls | grep foo"));
assert!(p.is_command_allowed("cat file.txt | wc -l"));
assert!(!p.is_command_allowed("ls | curl http://evil.com"));
assert!(!p.is_command_allowed("echo hello | python3 -"));
}
#[test]
fn custom_allowlist() {
let p = SecurityPolicy {
allowed_commands: vec!["docker".into(), "kubectl".into()],
..SecurityPolicy::default()
};
assert!(p.is_command_allowed("docker ps"));
assert!(p.is_command_allowed("kubectl get pods"));
assert!(!p.is_command_allowed("ls"));
assert!(!p.is_command_allowed("git status"));
}
#[test]
fn empty_allowlist_blocks_everything() {
let p = SecurityPolicy {
allowed_commands: vec![],
..SecurityPolicy::default()
};
assert!(!p.is_command_allowed("ls"));
assert!(!p.is_command_allowed("echo hello"));
}
#[test]
fn command_risk_low_for_read_commands() {
let p = default_policy();
assert_eq!(p.command_risk_level("git status"), CommandRiskLevel::Low);
assert_eq!(p.command_risk_level("ls -la"), CommandRiskLevel::Low);
}
#[test]
fn command_risk_medium_for_mutating_commands() {
let p = SecurityPolicy {
allowed_commands: vec!["git".into(), "touch".into()],
..SecurityPolicy::default()
};
assert_eq!(
p.command_risk_level("git reset --hard HEAD~1"),
CommandRiskLevel::Medium
);
assert_eq!(
p.command_risk_level("touch file.txt"),
CommandRiskLevel::Medium
);
}
#[test]
fn command_risk_high_for_dangerous_commands() {
let p = SecurityPolicy {
allowed_commands: vec!["rm".into()],
..SecurityPolicy::default()
};
assert_eq!(
p.command_risk_level("rm -rf /tmp/test"),
CommandRiskLevel::High
);
}
#[test]
fn validate_command_requires_approval_for_medium_risk() {
let p = SecurityPolicy {
autonomy: AutonomyLevel::Supervised,
require_approval_for_medium_risk: true,
allowed_commands: vec!["touch".into()],
..SecurityPolicy::default()
};
let denied = p.validate_command_execution("touch test.txt", false);
assert!(denied.is_err());
assert!(denied.unwrap_err().contains("requires explicit approval"),);
let allowed = p.validate_command_execution("touch test.txt", true);
assert_eq!(allowed.unwrap(), CommandRiskLevel::Medium);
}
#[test]
fn validate_command_blocks_high_risk_by_default() {
let p = SecurityPolicy {
autonomy: AutonomyLevel::Supervised,
allowed_commands: vec!["rm".into()],
..SecurityPolicy::default()
};
let result = p.validate_command_execution("rm -rf /tmp/test", true);
assert!(result.is_err());
assert!(result.unwrap_err().contains("high-risk"));
}
#[test]
fn validate_command_full_mode_skips_medium_risk_approval_gate() {
let p = SecurityPolicy {
autonomy: AutonomyLevel::Full,
require_approval_for_medium_risk: true,
allowed_commands: vec!["touch".into()],
..SecurityPolicy::default()
};
let result = p.validate_command_execution("touch test.txt", false);
assert_eq!(result.unwrap(), CommandRiskLevel::Medium);
}
#[test]
fn validate_command_rejects_background_chain_bypass() {
let p = default_policy();
let result = p.validate_command_execution("ls & python3 -c 'print(1)'", false);
assert!(result.is_err());
assert!(result.unwrap_err().contains("not allowed"));
}
#[test]
fn relative_paths_allowed() {
let p = default_policy();
assert!(p.is_path_allowed("file.txt"));
assert!(p.is_path_allowed("src/main.rs"));
assert!(p.is_path_allowed("deep/nested/dir/file.txt"));
}
#[test]
fn path_traversal_blocked() {
let p = default_policy();
assert!(!p.is_path_allowed("../etc/passwd"));
assert!(!p.is_path_allowed("../../root/.ssh/id_rsa"));
assert!(!p.is_path_allowed("foo/../../../etc/shadow"));
assert!(!p.is_path_allowed(".."));
}
#[test]
fn absolute_paths_blocked_when_workspace_only() {
let p = default_policy();
assert!(!p.is_path_allowed("/etc/passwd"));
assert!(!p.is_path_allowed("/root/.ssh/id_rsa"));
assert!(!p.is_path_allowed("/tmp/file.txt"));
}
#[test]
fn absolute_paths_allowed_when_not_workspace_only() {
let p = SecurityPolicy {
workspace_only: false,
forbidden_paths: vec![],
..SecurityPolicy::default()
};
assert!(p.is_path_allowed("/tmp/file.txt"));
}
#[test]
fn forbidden_paths_blocked() {
let p = SecurityPolicy {
workspace_only: false,
..SecurityPolicy::default()
};
assert!(!p.is_path_allowed("/etc/passwd"));
assert!(!p.is_path_allowed("/root/.bashrc"));
assert!(!p.is_path_allowed("~/.ssh/id_rsa"));
assert!(!p.is_path_allowed("~/.gnupg/pubring.kbx"));
}
#[test]
fn empty_path_allowed() {
let p = default_policy();
assert!(p.is_path_allowed(""));
}
#[test]
fn dotfile_in_workspace_allowed() {
let p = default_policy();
assert!(p.is_path_allowed(".gitignore"));
assert!(p.is_path_allowed(".env"));
}
#[test]
fn from_config_maps_all_fields() {
let autonomy_config = crate::config::AutonomyConfig {
level: AutonomyLevel::Full,
workspace_only: false,
allowed_commands: vec!["docker".into()],
forbidden_paths: vec!["/secret".into()],
max_actions_per_hour: 100,
max_cost_per_day_cents: 1000,
require_approval_for_medium_risk: false,
block_high_risk_commands: false,
shell_env_passthrough: vec!["DATABASE_URL".into()],
..crate::config::AutonomyConfig::default()
};
let workspace = PathBuf::from("/tmp/test-workspace");
let policy = SecurityPolicy::from_config(&autonomy_config, &workspace);
assert_eq!(policy.autonomy, AutonomyLevel::Full);
assert!(!policy.workspace_only);
assert_eq!(policy.allowed_commands, vec!["docker"]);
assert_eq!(policy.forbidden_paths, vec!["/secret"]);
assert_eq!(policy.max_actions_per_hour, 100);
assert_eq!(policy.max_cost_per_day_cents, 1000);
assert!(!policy.require_approval_for_medium_risk);
assert!(!policy.block_high_risk_commands);
assert_eq!(policy.shell_env_passthrough, vec!["DATABASE_URL"]);
assert_eq!(policy.workspace_dir, PathBuf::from("/tmp/test-workspace"));
}
#[test]
fn from_config_normalizes_allowed_roots() {
let autonomy_config = crate::config::AutonomyConfig {
allowed_roots: vec!["~/Desktop".into(), "shared-data".into()],
..crate::config::AutonomyConfig::default()
};
let workspace = PathBuf::from("/tmp/test-workspace");
let policy = SecurityPolicy::from_config(&autonomy_config, &workspace);
let expected_home_root = if let Some(home) = std::env::var_os("HOME") {
PathBuf::from(home).join("Desktop")
} else {
PathBuf::from("~/Desktop")
};
assert_eq!(policy.allowed_roots[0], expected_home_root);
assert_eq!(policy.allowed_roots[1], workspace.join("shared-data"));
}
#[test]
fn resolved_path_violation_message_includes_allowed_roots_guidance() {
let p = default_policy();
let msg = p.resolved_path_violation_message(Path::new("/tmp/outside.txt"));
assert!(msg.contains("escapes workspace"));
assert!(msg.contains("allowed_roots"));
}
#[test]
fn default_policy_has_sane_values() {
let p = SecurityPolicy::default();
assert_eq!(p.autonomy, AutonomyLevel::Supervised);
assert!(p.workspace_only);
assert!(!p.allowed_commands.is_empty());
assert!(!p.forbidden_paths.is_empty());
assert!(p.max_actions_per_hour > 0);
assert!(p.max_cost_per_day_cents > 0);
assert!(p.require_approval_for_medium_risk);
assert!(p.block_high_risk_commands);
assert!(p.shell_env_passthrough.is_empty());
}
#[test]
fn action_tracker_starts_at_zero() {
let tracker = ActionTracker::new();
assert_eq!(tracker.count(), 0);
}
#[test]
fn action_tracker_records_actions() {
let tracker = ActionTracker::new();
assert_eq!(tracker.record(), 1);
assert_eq!(tracker.record(), 2);
assert_eq!(tracker.record(), 3);
assert_eq!(tracker.count(), 3);
}
#[test]
fn record_action_allows_within_limit() {
let p = SecurityPolicy {
max_actions_per_hour: 5,
..SecurityPolicy::default()
};
for _ in 0..5 {
assert!(p.record_action(), "should allow actions within limit");
}
}
#[test]
fn record_action_blocks_over_limit() {
let p = SecurityPolicy {
max_actions_per_hour: 3,
..SecurityPolicy::default()
};
assert!(p.record_action()); assert!(p.record_action()); assert!(p.record_action()); assert!(!p.record_action()); }
#[test]
fn is_rate_limited_reflects_count() {
let p = SecurityPolicy {
max_actions_per_hour: 2,
..SecurityPolicy::default()
};
assert!(!p.is_rate_limited());
p.record_action();
assert!(!p.is_rate_limited());
p.record_action();
assert!(p.is_rate_limited());
}
#[test]
fn action_tracker_clone_is_independent() {
let tracker = ActionTracker::new();
tracker.record();
tracker.record();
let cloned = tracker.clone();
assert_eq!(cloned.count(), 2);
tracker.record();
assert_eq!(tracker.count(), 3);
assert_eq!(cloned.count(), 2); }
#[test]
fn command_injection_semicolon_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("ls; rm -rf /"));
}
#[test]
fn command_injection_semicolon_no_space() {
let p = default_policy();
assert!(!p.is_command_allowed("ls;rm -rf /"));
}
#[test]
fn quoted_semicolons_do_not_split_sqlite_command() {
let p = SecurityPolicy {
allowed_commands: vec!["sqlite3".into()],
..SecurityPolicy::default()
};
assert!(p.is_command_allowed(
"sqlite3 /tmp/test.db \"CREATE TABLE t(id INT); INSERT INTO t VALUES(1); SELECT * FROM t;\""
));
assert_eq!(
p.command_risk_level(
"sqlite3 /tmp/test.db \"CREATE TABLE t(id INT); INSERT INTO t VALUES(1); SELECT * FROM t;\""
),
CommandRiskLevel::Low
);
}
#[test]
fn unquoted_semicolon_after_quoted_sql_still_splits_commands() {
let p = SecurityPolicy {
allowed_commands: vec!["sqlite3".into()],
..SecurityPolicy::default()
};
assert!(!p.is_command_allowed("sqlite3 /tmp/test.db \"SELECT 1;\"; rm -rf /"));
}
#[test]
fn command_injection_backtick_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("echo `whoami`"));
assert!(!p.is_command_allowed("echo `rm -rf /`"));
}
#[test]
fn command_injection_dollar_paren_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("echo $(cat /etc/passwd)"));
assert!(!p.is_command_allowed("echo $(rm -rf /)"));
}
#[test]
fn command_injection_dollar_paren_literal_inside_single_quotes_allowed() {
let p = default_policy();
assert!(p.is_command_allowed("echo '$(cat /etc/passwd)'"));
}
#[test]
fn command_injection_dollar_brace_literal_inside_single_quotes_allowed() {
let p = default_policy();
assert!(p.is_command_allowed("echo '${HOME}'"));
}
#[test]
fn command_injection_dollar_brace_unquoted_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("echo ${HOME}"));
}
#[test]
fn command_with_env_var_prefix() {
let p = default_policy();
assert!(!p.is_command_allowed("FOO=bar rm -rf /"));
}
#[test]
fn command_newline_injection_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("ls\nrm -rf /"));
assert!(p.is_command_allowed("ls\necho hello"));
}
#[test]
fn command_injection_and_chain_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("ls && rm -rf /"));
assert!(!p.is_command_allowed("echo ok && curl http://evil.com"));
assert!(p.is_command_allowed("ls && echo done"));
}
#[test]
fn command_injection_or_chain_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("ls || rm -rf /"));
assert!(p.is_command_allowed("ls || echo fallback"));
}
#[test]
fn command_injection_background_chain_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("ls & rm -rf /"));
assert!(!p.is_command_allowed("ls&rm -rf /"));
assert!(!p.is_command_allowed("echo ok & python3 -c 'print(1)'"));
}
#[test]
fn command_injection_redirect_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("echo secret > /etc/crontab"));
assert!(!p.is_command_allowed("ls >> /tmp/exfil.txt"));
assert!(!p.is_command_allowed("cat </etc/passwd"));
assert!(!p.is_command_allowed("cat</etc/passwd"));
}
#[test]
fn quoted_ampersand_and_redirect_literals_are_not_treated_as_operators() {
let p = default_policy();
assert!(p.is_command_allowed("echo \"A&B\""));
assert!(p.is_command_allowed("echo \"A>B\""));
assert!(p.is_command_allowed("echo \"A<B\""));
}
#[test]
fn command_argument_injection_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("find . -exec rm -rf {} +"));
assert!(!p.is_command_allowed("find / -ok cat {} \\;"));
assert!(!p.is_command_allowed("git config core.editor \"rm -rf /\""));
assert!(!p.is_command_allowed("git alias.st status"));
assert!(!p.is_command_allowed("git -c core.editor=calc.exe commit"));
assert!(p.is_command_allowed("find . -name '*.txt'"));
assert!(p.is_command_allowed("git status"));
assert!(p.is_command_allowed("git add ."));
}
#[test]
fn command_injection_dollar_brace_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("echo ${IFS}cat${IFS}/etc/passwd"));
}
#[test]
fn command_injection_plain_dollar_var_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("cat $HOME/.ssh/id_rsa"));
assert!(!p.is_command_allowed("cat $SECRET_FILE"));
}
#[test]
fn command_injection_tee_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("echo secret | tee /etc/crontab"));
assert!(!p.is_command_allowed("ls | /usr/bin/tee outfile"));
assert!(!p.is_command_allowed("tee file.txt"));
}
#[test]
fn command_injection_process_substitution_blocked() {
let p = default_policy();
assert!(!p.is_command_allowed("cat <(echo pwned)"));
assert!(!p.is_command_allowed("ls >(cat /etc/passwd)"));
}
#[test]
fn command_env_var_prefix_with_allowed_cmd() {
let p = default_policy();
assert!(p.is_command_allowed("FOO=bar ls"));
assert!(p.is_command_allowed("LANG=C grep pattern file"));
assert!(!p.is_command_allowed("FOO=bar rm -rf /"));
}
#[test]
fn forbidden_path_argument_detects_absolute_path() {
let p = default_policy();
assert_eq!(
p.forbidden_path_argument("cat /etc/passwd"),
Some("/etc/passwd".into())
);
}
#[test]
fn forbidden_path_argument_detects_parent_dir_reference() {
let p = default_policy();
assert_eq!(
p.forbidden_path_argument("cat ../secret.txt"),
Some("../secret.txt".into())
);
assert_eq!(
p.forbidden_path_argument("find .. -name '*.rs'"),
Some("..".into())
);
}
#[test]
fn forbidden_path_argument_allows_workspace_relative_paths() {
let p = default_policy();
assert_eq!(p.forbidden_path_argument("cat src/main.rs"), None);
assert_eq!(p.forbidden_path_argument("grep -r todo ./src"), None);
}
#[test]
fn forbidden_path_argument_detects_option_assignment_paths() {
let p = default_policy();
assert_eq!(
p.forbidden_path_argument("grep --file=/etc/passwd root ./src"),
Some("/etc/passwd".into())
);
assert_eq!(
p.forbidden_path_argument("cat --input=../secret.txt"),
Some("../secret.txt".into())
);
}
#[test]
fn forbidden_path_argument_allows_safe_option_assignment_paths() {
let p = default_policy();
assert_eq!(
p.forbidden_path_argument("grep --file=./patterns.txt root ./src"),
None
);
}
#[test]
fn forbidden_path_argument_detects_short_option_attached_paths() {
let p = default_policy();
assert_eq!(
p.forbidden_path_argument("grep -f/etc/passwd root ./src"),
Some("/etc/passwd".into())
);
assert_eq!(
p.forbidden_path_argument("git -C../outside status"),
Some("../outside".into())
);
}
#[test]
fn forbidden_path_argument_allows_safe_short_option_attached_paths() {
let p = default_policy();
assert_eq!(
p.forbidden_path_argument("grep -f./patterns.txt root ./src"),
None
);
assert_eq!(p.forbidden_path_argument("git -C./repo status"), None);
}
#[test]
fn forbidden_path_argument_detects_tilde_user_paths() {
let p = default_policy();
assert_eq!(
p.forbidden_path_argument("cat ~root/.ssh/id_rsa"),
Some("~root/.ssh/id_rsa".into())
);
assert_eq!(
p.forbidden_path_argument("ls ~nobody"),
Some("~nobody".into())
);
}
#[test]
fn forbidden_path_argument_detects_input_redirection_paths() {
let p = default_policy();
assert_eq!(
p.forbidden_path_argument("cat </etc/passwd"),
Some("/etc/passwd".into())
);
assert_eq!(
p.forbidden_path_argument("cat</etc/passwd"),
Some("/etc/passwd".into())
);
}
#[test]
fn path_traversal_encoded_dots() {
let p = default_policy();
assert!(!p.is_path_allowed("foo/..%2f..%2fetc/passwd"));
}
#[test]
fn path_traversal_double_dot_in_filename() {
let p = default_policy();
assert!(p.is_path_allowed("my..file.txt"));
assert!(!p.is_path_allowed("../etc/passwd"));
assert!(!p.is_path_allowed("foo/../etc/passwd"));
}
#[test]
fn path_with_null_byte_blocked() {
let p = default_policy();
assert!(!p.is_path_allowed("file\0.txt"));
}
#[test]
fn path_symlink_style_absolute() {
let p = default_policy();
assert!(!p.is_path_allowed("/proc/self/root/etc/passwd"));
}
#[test]
fn path_home_tilde_ssh() {
let p = SecurityPolicy {
workspace_only: false,
..SecurityPolicy::default()
};
assert!(!p.is_path_allowed("~/.ssh/id_rsa"));
assert!(!p.is_path_allowed("~/.gnupg/secring.gpg"));
assert!(!p.is_path_allowed("~root/.ssh/id_rsa"));
assert!(!p.is_path_allowed("~nobody"));
}
#[test]
fn path_var_run_blocked() {
let p = SecurityPolicy {
workspace_only: false,
..SecurityPolicy::default()
};
assert!(!p.is_path_allowed("/var/run/docker.sock"));
}
#[test]
fn rate_limit_exactly_at_boundary() {
let p = SecurityPolicy {
max_actions_per_hour: 1,
..SecurityPolicy::default()
};
assert!(p.record_action()); assert!(!p.record_action()); assert!(!p.record_action()); }
#[test]
fn rate_limit_zero_blocks_everything() {
let p = SecurityPolicy {
max_actions_per_hour: 0,
..SecurityPolicy::default()
};
assert!(!p.record_action());
}
#[test]
fn rate_limit_high_allows_many() {
let p = SecurityPolicy {
max_actions_per_hour: 10000,
..SecurityPolicy::default()
};
for _ in 0..100 {
assert!(p.record_action());
}
}
#[test]
fn readonly_blocks_even_safe_commands() {
let p = SecurityPolicy {
autonomy: AutonomyLevel::ReadOnly,
allowed_commands: vec!["ls".into(), "cat".into()],
..SecurityPolicy::default()
};
assert!(!p.is_command_allowed("ls"));
assert!(!p.is_command_allowed("cat"));
assert!(!p.can_act());
}
#[test]
fn supervised_allows_listed_commands() {
let p = SecurityPolicy {
autonomy: AutonomyLevel::Supervised,
allowed_commands: vec!["git".into()],
..SecurityPolicy::default()
};
assert!(p.is_command_allowed("git status"));
assert!(!p.is_command_allowed("docker ps"));
}
#[test]
fn full_autonomy_still_respects_forbidden_paths() {
let p = SecurityPolicy {
autonomy: AutonomyLevel::Full,
workspace_only: false,
..SecurityPolicy::default()
};
assert!(!p.is_path_allowed("/etc/shadow"));
assert!(!p.is_path_allowed("/root/.bashrc"));
}
#[test]
fn workspace_only_false_allows_resolved_outside_workspace() {
let workspace = std::env::temp_dir().join("zeroclaw_test_ws_only_false");
let _ = std::fs::create_dir_all(&workspace);
let canonical_workspace = workspace
.canonicalize()
.unwrap_or_else(|_| workspace.clone());
let p = SecurityPolicy {
workspace_dir: canonical_workspace.clone(),
workspace_only: false,
forbidden_paths: vec!["/etc".into(), "/var".into()],
..SecurityPolicy::default()
};
let outside = std::env::var_os("HOME")
.map(std::path::PathBuf::from)
.unwrap_or_else(|| PathBuf::from("/home"))
.join("zeroclaw_outside_ws");
assert!(
p.is_resolved_path_allowed(&outside),
"workspace_only=false must allow resolved paths outside workspace"
);
assert!(
!p.is_resolved_path_allowed(Path::new("/etc/passwd")),
"forbidden paths must be blocked even when workspace_only=false"
);
assert!(
!p.is_resolved_path_allowed(Path::new("/var/run/docker.sock")),
"forbidden /var must be blocked even when workspace_only=false"
);
let _ = std::fs::remove_dir_all(&workspace);
}
#[test]
fn workspace_only_true_blocks_resolved_outside_workspace() {
let workspace = std::env::temp_dir().join("zeroclaw_test_ws_only_true");
let _ = std::fs::create_dir_all(&workspace);
let canonical_workspace = workspace
.canonicalize()
.unwrap_or_else(|_| workspace.clone());
let p = SecurityPolicy {
workspace_dir: canonical_workspace.clone(),
workspace_only: true,
..SecurityPolicy::default()
};
let inside = canonical_workspace.join("subdir");
assert!(
p.is_resolved_path_allowed(&inside),
"path inside workspace must be allowed"
);
let outside = std::env::temp_dir()
.canonicalize()
.unwrap_or_else(|_| std::env::temp_dir())
.join("zeroclaw_outside_ws_true");
assert!(
!p.is_resolved_path_allowed(&outside),
"workspace_only=true must block resolved paths outside workspace"
);
let _ = std::fs::remove_dir_all(&workspace);
}
#[test]
fn from_config_creates_fresh_tracker() {
let autonomy_config = crate::config::AutonomyConfig {
level: AutonomyLevel::Full,
workspace_only: false,
allowed_commands: vec![],
forbidden_paths: vec![],
max_actions_per_hour: 10,
max_cost_per_day_cents: 100,
require_approval_for_medium_risk: true,
block_high_risk_commands: true,
..crate::config::AutonomyConfig::default()
};
let workspace = PathBuf::from("/tmp/test");
let policy = SecurityPolicy::from_config(&autonomy_config, &workspace);
assert_eq!(policy.tracker.count(), 0);
assert!(!policy.is_rate_limited());
}
#[test]
fn checklist_root_path_blocked() {
let p = default_policy();
assert!(!p.is_path_allowed("/"));
assert!(!p.is_path_allowed("/anything"));
}
#[test]
fn checklist_all_system_dirs_blocked() {
let p = SecurityPolicy {
workspace_only: false,
..SecurityPolicy::default()
};
for dir in [
"/etc", "/root", "/home", "/usr", "/bin", "/sbin", "/lib", "/opt", "/boot", "/dev",
"/proc", "/sys", "/var", "/tmp",
] {
assert!(
!p.is_path_allowed(dir),
"System dir should be blocked: {dir}"
);
assert!(
!p.is_path_allowed(&format!("{dir}/subpath")),
"Subpath of system dir should be blocked: {dir}/subpath"
);
}
}
#[test]
fn checklist_sensitive_dotfiles_blocked() {
let p = SecurityPolicy {
workspace_only: false,
..SecurityPolicy::default()
};
for path in [
"~/.ssh/id_rsa",
"~/.gnupg/secring.gpg",
"~/.aws/credentials",
"~/.config/secrets",
] {
assert!(
!p.is_path_allowed(path),
"Sensitive dotfile should be blocked: {path}"
);
}
}
#[test]
fn checklist_null_byte_injection_blocked() {
let p = default_policy();
assert!(!p.is_path_allowed("safe\0/../../../etc/passwd"));
assert!(!p.is_path_allowed("\0"));
assert!(!p.is_path_allowed("file\0"));
}
#[test]
fn checklist_workspace_only_blocks_all_absolute() {
let p = SecurityPolicy {
workspace_only: true,
..SecurityPolicy::default()
};
assert!(!p.is_path_allowed("/any/absolute/path"));
assert!(p.is_path_allowed("relative/path.txt"));
}
#[test]
fn checklist_resolved_path_must_be_in_workspace() {
let p = SecurityPolicy {
workspace_dir: PathBuf::from("/home/user/project"),
..SecurityPolicy::default()
};
assert!(p.is_resolved_path_allowed(Path::new("/home/user/project/src/main.rs")));
assert!(!p.is_resolved_path_allowed(Path::new("/etc/passwd")));
assert!(!p.is_resolved_path_allowed(Path::new("/home/user/other_project/file")));
assert!(!p.is_resolved_path_allowed(Path::new("/")));
}
#[test]
fn checklist_default_policy_is_workspace_only() {
let p = SecurityPolicy::default();
assert!(
p.workspace_only,
"Default policy must be workspace_only=true"
);
}
#[test]
fn checklist_default_forbidden_paths_comprehensive() {
let p = SecurityPolicy::default();
for dir in ["/etc", "/root", "/proc", "/sys", "/dev", "/var", "/tmp"] {
assert!(
p.forbidden_paths.iter().any(|f| f == dir),
"Default forbidden_paths must include {dir}"
);
}
for dot in ["~/.ssh", "~/.gnupg", "~/.aws"] {
assert!(
p.forbidden_paths.iter().any(|f| f == dot),
"Default forbidden_paths must include {dot}"
);
}
}
#[test]
fn resolved_path_blocks_outside_workspace() {
let workspace = std::env::temp_dir().join("zeroclaw_test_resolved_path");
let _ = std::fs::create_dir_all(&workspace);
let canonical_workspace = workspace
.canonicalize()
.unwrap_or_else(|_| workspace.clone());
let policy = SecurityPolicy {
workspace_dir: canonical_workspace.clone(),
..SecurityPolicy::default()
};
let inside = canonical_workspace.join("subdir").join("file.txt");
assert!(
policy.is_resolved_path_allowed(&inside),
"path inside workspace should be allowed"
);
let canonical_temp = std::env::temp_dir()
.canonicalize()
.unwrap_or_else(|_| std::env::temp_dir());
let outside = canonical_temp.join("outside_workspace_zeroclaw");
assert!(
!policy.is_resolved_path_allowed(&outside),
"path outside workspace must be blocked"
);
let _ = std::fs::remove_dir_all(&workspace);
}
#[test]
fn resolved_path_blocks_root_escape() {
let policy = SecurityPolicy {
workspace_dir: PathBuf::from("/home/zeroclaw_user/project"),
..SecurityPolicy::default()
};
assert!(
!policy.is_resolved_path_allowed(Path::new("/etc/passwd")),
"resolved path to /etc/passwd must be blocked"
);
assert!(
!policy.is_resolved_path_allowed(Path::new("/root/.bashrc")),
"resolved path to /root/.bashrc must be blocked"
);
}
#[cfg(unix)]
#[test]
fn resolved_path_blocks_symlink_escape() {
use std::os::unix::fs::symlink;
let root = std::env::temp_dir().join("zeroclaw_test_symlink_escape");
let workspace = root.join("workspace");
let outside = root.join("outside_target");
let _ = std::fs::remove_dir_all(&root);
std::fs::create_dir_all(&workspace).unwrap();
std::fs::create_dir_all(&outside).unwrap();
let link_path = workspace.join("escape_link");
symlink(&outside, &link_path).unwrap();
let policy = SecurityPolicy {
workspace_dir: workspace.clone(),
..SecurityPolicy::default()
};
let resolved = link_path.canonicalize().unwrap();
assert!(
!policy.is_resolved_path_allowed(&resolved),
"symlink-resolved path outside workspace must be blocked"
);
let _ = std::fs::remove_dir_all(&root);
}
#[cfg(unix)]
#[test]
fn allowed_roots_permits_paths_outside_workspace() {
use std::os::unix::fs::symlink;
let root = std::env::temp_dir().join("zeroclaw_test_allowed_roots");
let workspace = root.join("workspace");
let extra = root.join("extra_root");
let extra_file = extra.join("data.txt");
let _ = std::fs::remove_dir_all(&root);
std::fs::create_dir_all(&workspace).unwrap();
std::fs::create_dir_all(&extra).unwrap();
std::fs::write(&extra_file, "test").unwrap();
let link_path = workspace.join("link_to_extra");
symlink(&extra, &link_path).unwrap();
let resolved = link_path.join("data.txt").canonicalize().unwrap();
let policy_without = SecurityPolicy {
workspace_dir: workspace.clone(),
allowed_roots: vec![],
..SecurityPolicy::default()
};
assert!(
!policy_without.is_resolved_path_allowed(&resolved),
"without allowed_roots, symlink target must be blocked"
);
let policy_with = SecurityPolicy {
workspace_dir: workspace.clone(),
allowed_roots: vec![extra.clone()],
..SecurityPolicy::default()
};
assert!(
policy_with.is_resolved_path_allowed(&resolved),
"with allowed_roots containing the target, symlink must be allowed"
);
let unrelated = root.join("unrelated");
std::fs::create_dir_all(&unrelated).unwrap();
assert!(
!policy_with.is_resolved_path_allowed(&unrelated.canonicalize().unwrap()),
"paths outside workspace and allowed_roots must still be blocked"
);
let _ = std::fs::remove_dir_all(&root);
}
#[test]
fn is_path_allowed_blocks_null_bytes() {
let policy = default_policy();
assert!(
!policy.is_path_allowed("file\0.txt"),
"paths with null bytes must be blocked"
);
}
#[test]
fn is_path_allowed_blocks_url_encoded_traversal() {
let policy = default_policy();
assert!(
!policy.is_path_allowed("..%2fetc%2fpasswd"),
"URL-encoded path traversal must be blocked"
);
assert!(
!policy.is_path_allowed("subdir%2f..%2f..%2fetc"),
"URL-encoded parent dir traversal must be blocked"
);
}
}