capo-agent 0.6.0

Coding-agent library built on motosan-agent-loop. Composable, embeddable.
Documentation
//! Declarative permission policy loaded from `permissions.toml`.
//!
//! Precedence: hard-blocked paths always win, then persistent allowlist,
//! then session cache (handled separately), then prompt.

use std::path::Path;

use globset::{Glob, GlobSetBuilder};
use regex::Regex;
use serde::Deserialize;

#[derive(Debug, Default, Deserialize)]
pub struct Policy {
    #[serde(default)]
    pub bash: BashPolicy,
    #[serde(default)]
    pub write: PathPolicy,
    #[serde(default)]
    pub edit: Option<PathPolicy>,
    #[serde(default)]
    pub mcp: std::collections::HashMap<String, McpPolicy>,
}

#[derive(Debug, Default, Deserialize)]
pub struct BashPolicy {
    #[serde(default)]
    pub exact: Vec<String>,
    #[serde(default)]
    pub prefix: Vec<String>,
    #[serde(default)]
    pub regex: Vec<String>,
}

#[derive(Debug, Default, Deserialize)]
pub struct PathPolicy {
    #[serde(default)]
    pub allowed_paths: Vec<String>,
    #[serde(default)]
    pub blocked_paths: Vec<String>,
}

#[derive(Debug, Default, Deserialize)]
pub struct McpPolicy {
    #[serde(default)]
    pub auto_allow: Vec<String>,
}

impl Policy {
    pub fn load_or_default(path: &Path) -> Result<Self, PolicyError> {
        if !path.exists() {
            return Ok(Self::default());
        }
        let raw = std::fs::read_to_string(path).map_err(PolicyError::Io)?;
        let parsed: Policy = toml::from_str(&raw).map_err(PolicyError::Toml)?;
        parsed.validate_globs()?;
        Ok(parsed)
    }

    fn validate_globs(&self) -> Result<(), PolicyError> {
        validate_path_policy_globs("write", &self.write)?;
        if let Some(edit) = &self.edit {
            validate_path_policy_globs("edit", edit)?;
        }
        Ok(())
    }

    pub fn bash_is_allowed(&self, command: &str) -> bool {
        let trimmed = command.trim();
        if self.bash.exact.iter().any(|e| e == trimmed) {
            return true;
        }
        if self.bash.prefix.iter().any(|p| trimmed.starts_with(p)) {
            return true;
        }
        for r in &self.bash.regex {
            if let Ok(re) = Regex::new(r) {
                if re.is_match(trimmed) {
                    return true;
                }
            }
        }
        false
    }

    pub fn write_is_allowed(&self, path: &Path, project_root: &Path) -> bool {
        path_policy_is_allowed(&self.write, path, project_root)
    }

    pub fn write_is_blocked(&self, path: &Path, project_root: &Path) -> bool {
        path_policy_is_blocked(&self.write, path, project_root)
    }

    pub fn edit_is_allowed(&self, path: &Path, project_root: &Path) -> bool {
        path_policy_is_allowed(
            self.edit.as_ref().unwrap_or(&self.write),
            path,
            project_root,
        )
    }

    pub fn edit_is_blocked(&self, path: &Path, project_root: &Path) -> bool {
        path_policy_is_blocked(
            self.edit.as_ref().unwrap_or(&self.write),
            path,
            project_root,
        )
    }

    pub fn mcp_auto_allow(&self, server: &str, tool: &str) -> bool {
        self.mcp
            .get(server)
            .map(|p| p.auto_allow.iter().any(|t| t == tool))
            .unwrap_or(false)
    }
}

fn path_policy_is_allowed(policy: &PathPolicy, path: &Path, project_root: &Path) -> bool {
    path_matches(&policy.allowed_paths, path, project_root)
        && !path_matches(&policy.blocked_paths, path, project_root)
}

fn path_policy_is_blocked(policy: &PathPolicy, path: &Path, project_root: &Path) -> bool {
    path_matches(&policy.blocked_paths, path, project_root)
}

fn validate_path_policy_globs(scope: &'static str, policy: &PathPolicy) -> Result<(), PolicyError> {
    for pattern in policy.allowed_paths.iter().chain(&policy.blocked_paths) {
        Glob::new(pattern).map_err(|source| PolicyError::Glob {
            scope,
            pattern: pattern.clone(),
            source,
        })?;
    }
    Ok(())
}

fn path_matches(patterns: &[String], path: &Path, project_root: &Path) -> bool {
    if patterns.is_empty() {
        return false;
    }
    let mut builder = GlobSetBuilder::new();
    for p in patterns {
        if let Ok(glob) = Glob::new(p) {
            builder.add(glob);
        }
    }
    let Ok(set) = builder.build() else {
        return false;
    };
    let rel = path.strip_prefix(project_root).unwrap_or(path);
    set.is_match(rel) || set.is_match(path)
}

#[derive(Debug, thiserror::Error)]
pub enum PolicyError {
    #[error("io: {0}")]
    Io(#[from] std::io::Error),
    #[error("toml: {0}")]
    Toml(#[from] toml::de::Error),
    #[error("invalid {scope} path glob {pattern:?}: {source}")]
    Glob {
        scope: &'static str,
        pattern: String,
        #[source]
        source: globset::Error,
    },
}

#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::*;

    fn fixture() -> Policy {
        toml::from_str(
            r#"
[bash]
exact = ["git status", "cargo fmt"]
prefix = ["cargo test "]
regex = ["^ls( .*)?$"]

[write]
allowed_paths = ["src/**"]
blocked_paths = [".env*"]

[mcp.github]
auto_allow = ["list_issues"]
"#,
        )
        .unwrap()
    }

    #[test]
    fn bash_exact_matches() {
        assert!(fixture().bash_is_allowed("git status"));
        assert!(!fixture().bash_is_allowed("git push"));
    }

    #[test]
    fn bash_prefix_matches_only_when_prefix() {
        assert!(fixture().bash_is_allowed("cargo test --all"));
        assert!(!fixture().bash_is_allowed("cargo tests"));
    }

    #[test]
    fn bash_regex_matches() {
        assert!(fixture().bash_is_allowed("ls"));
        assert!(fixture().bash_is_allowed("ls -la"));
        assert!(!fixture().bash_is_allowed("list"));
    }

    #[test]
    fn mcp_auto_allow_is_exact_per_tool() {
        let p = fixture();
        assert!(p.mcp_auto_allow("github", "list_issues"));
        assert!(!p.mcp_auto_allow("github", "create_issue"));
        assert!(!p.mcp_auto_allow("notion", "list_issues"));
    }

    #[test]
    fn write_allowed_checks_blocked_first() {
        let p = fixture();
        let root = std::path::Path::new("/proj");
        assert!(p.write_is_allowed(&root.join("src/main.rs"), root));
        assert!(!p.write_is_allowed(&root.join(".env.local"), root));
    }

    #[test]
    fn edit_policy_overrides_write_policy_when_present() {
        let p: Policy = toml::from_str(
            r#"
[write]
allowed_paths = ["src/**"]

[edit]
allowed_paths = ["docs/**"]
blocked_paths = ["docs/secrets/**"]
"#,
        )
        .unwrap();
        let root = std::path::Path::new("/proj");

        assert!(p.write_is_allowed(&root.join("src/main.rs"), root));
        assert!(!p.edit_is_allowed(&root.join("src/main.rs"), root));
        assert!(p.edit_is_allowed(&root.join("docs/readme.md"), root));
        assert!(p.edit_is_blocked(&root.join("docs/secrets/key.md"), root));
    }

    #[test]
    fn load_rejects_invalid_path_globs() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("permissions.toml");
        std::fs::write(
            &path,
            r#"
[write]
blocked_paths = ["["]
"#,
        )
        .unwrap();

        let err = Policy::load_or_default(&path).unwrap_err();
        assert!(matches!(err, PolicyError::Glob { scope: "write", .. }));
    }
}