Skip to main content

capo_agent/permissions/
policy.rs

1//! Declarative permission policy loaded from `permissions.toml`.
2//!
3//! Precedence: hard-blocked paths always win, then persistent allowlist,
4//! then session cache (handled separately), then prompt.
5
6use std::path::Path;
7
8use globset::{Glob, GlobSetBuilder};
9use regex::Regex;
10use serde::Deserialize;
11
12#[derive(Debug, Default, Deserialize)]
13pub struct Policy {
14    #[serde(default)]
15    pub bash: BashPolicy,
16    #[serde(default)]
17    pub write: PathPolicy,
18    #[serde(default)]
19    pub edit: Option<PathPolicy>,
20    #[serde(default)]
21    pub mcp: std::collections::HashMap<String, McpPolicy>,
22}
23
24#[derive(Debug, Default, Deserialize)]
25pub struct BashPolicy {
26    #[serde(default)]
27    pub exact: Vec<String>,
28    #[serde(default)]
29    pub prefix: Vec<String>,
30    #[serde(default)]
31    pub regex: Vec<String>,
32}
33
34#[derive(Debug, Default, Deserialize)]
35pub struct PathPolicy {
36    #[serde(default)]
37    pub allowed_paths: Vec<String>,
38    #[serde(default)]
39    pub blocked_paths: Vec<String>,
40}
41
42#[derive(Debug, Default, Deserialize)]
43pub struct McpPolicy {
44    #[serde(default)]
45    pub auto_allow: Vec<String>,
46}
47
48impl Policy {
49    pub fn load_or_default(path: &Path) -> Result<Self, PolicyError> {
50        if !path.exists() {
51            return Ok(Self::default());
52        }
53        let raw = std::fs::read_to_string(path).map_err(PolicyError::Io)?;
54        let parsed: Policy = toml::from_str(&raw).map_err(PolicyError::Toml)?;
55        parsed.validate_globs()?;
56        Ok(parsed)
57    }
58
59    fn validate_globs(&self) -> Result<(), PolicyError> {
60        validate_path_policy_globs("write", &self.write)?;
61        if let Some(edit) = &self.edit {
62            validate_path_policy_globs("edit", edit)?;
63        }
64        Ok(())
65    }
66
67    pub fn bash_is_allowed(&self, command: &str) -> bool {
68        let trimmed = command.trim();
69        if self.bash.exact.iter().any(|e| e == trimmed) {
70            return true;
71        }
72        if self.bash.prefix.iter().any(|p| trimmed.starts_with(p)) {
73            return true;
74        }
75        for r in &self.bash.regex {
76            if let Ok(re) = Regex::new(r) {
77                if re.is_match(trimmed) {
78                    return true;
79                }
80            }
81        }
82        false
83    }
84
85    pub fn write_is_allowed(&self, path: &Path, project_root: &Path) -> bool {
86        path_policy_is_allowed(&self.write, path, project_root)
87    }
88
89    pub fn write_is_blocked(&self, path: &Path, project_root: &Path) -> bool {
90        path_policy_is_blocked(&self.write, path, project_root)
91    }
92
93    pub fn edit_is_allowed(&self, path: &Path, project_root: &Path) -> bool {
94        path_policy_is_allowed(
95            self.edit.as_ref().unwrap_or(&self.write),
96            path,
97            project_root,
98        )
99    }
100
101    pub fn edit_is_blocked(&self, path: &Path, project_root: &Path) -> bool {
102        path_policy_is_blocked(
103            self.edit.as_ref().unwrap_or(&self.write),
104            path,
105            project_root,
106        )
107    }
108
109    pub fn mcp_auto_allow(&self, server: &str, tool: &str) -> bool {
110        self.mcp
111            .get(server)
112            .map(|p| p.auto_allow.iter().any(|t| t == tool))
113            .unwrap_or(false)
114    }
115}
116
117fn path_policy_is_allowed(policy: &PathPolicy, path: &Path, project_root: &Path) -> bool {
118    path_matches(&policy.allowed_paths, path, project_root)
119        && !path_matches(&policy.blocked_paths, path, project_root)
120}
121
122fn path_policy_is_blocked(policy: &PathPolicy, path: &Path, project_root: &Path) -> bool {
123    path_matches(&policy.blocked_paths, path, project_root)
124}
125
126fn validate_path_policy_globs(scope: &'static str, policy: &PathPolicy) -> Result<(), PolicyError> {
127    for pattern in policy.allowed_paths.iter().chain(&policy.blocked_paths) {
128        Glob::new(pattern).map_err(|source| PolicyError::Glob {
129            scope,
130            pattern: pattern.clone(),
131            source,
132        })?;
133    }
134    Ok(())
135}
136
137fn path_matches(patterns: &[String], path: &Path, project_root: &Path) -> bool {
138    if patterns.is_empty() {
139        return false;
140    }
141    let mut builder = GlobSetBuilder::new();
142    for p in patterns {
143        if let Ok(glob) = Glob::new(p) {
144            builder.add(glob);
145        }
146    }
147    let Ok(set) = builder.build() else {
148        return false;
149    };
150    let rel = path.strip_prefix(project_root).unwrap_or(path);
151    set.is_match(rel) || set.is_match(path)
152}
153
/// Errors produced while loading and validating a [`Policy`].
#[derive(Debug, thiserror::Error)]
pub enum PolicyError {
    /// The policy file could not be read.
    #[error("io: {0}")]
    Io(#[from] std::io::Error),
    /// The policy file could not be deserialized from TOML.
    #[error("toml: {0}")]
    Toml(#[from] toml::de::Error),
    /// A path pattern in the policy is not a valid glob.
    #[error("invalid {scope} path glob {pattern:?}: {source}")]
    Glob {
        /// Name of the policy table the pattern came from (`"write"` or `"edit"`).
        scope: &'static str,
        /// The offending pattern, exactly as written in the policy.
        pattern: String,
        /// The underlying glob compilation error.
        #[source]
        source: globset::Error,
    },
}
168
#[cfg(test)]
mod tests {
    // Tests unwrap freely: a panic is exactly the failure signal we want here.
    #![allow(clippy::unwrap_used)]

    use super::*;

    /// Policy source shared by most tests: a few bash rules, one write policy
    /// and a single MCP server.
    const FIXTURE_TOML: &str = r#"
[bash]
exact = ["git status", "cargo fmt"]
prefix = ["cargo test "]
regex = ["^ls( .*)?$"]

[write]
allowed_paths = ["src/**"]
blocked_paths = [".env*"]

[mcp.github]
auto_allow = ["list_issues"]
"#;

    /// Parses [`FIXTURE_TOML`] into a fresh `Policy`.
    fn fixture() -> Policy {
        toml::from_str(FIXTURE_TOML).unwrap()
    }

    #[test]
    fn bash_exact_matches() {
        let policy = fixture();
        assert!(policy.bash_is_allowed("git status"));
        assert!(!policy.bash_is_allowed("git push"));
    }

    #[test]
    fn bash_prefix_matches_only_when_prefix() {
        let policy = fixture();
        assert!(policy.bash_is_allowed("cargo test --all"));
        assert!(!policy.bash_is_allowed("cargo tests"));
    }

    #[test]
    fn bash_regex_matches() {
        let policy = fixture();
        assert!(policy.bash_is_allowed("ls"));
        assert!(policy.bash_is_allowed("ls -la"));
        assert!(!policy.bash_is_allowed("list"));
    }

    #[test]
    fn mcp_auto_allow_is_exact_per_tool() {
        let policy = fixture();
        assert!(policy.mcp_auto_allow("github", "list_issues"));
        assert!(!policy.mcp_auto_allow("github", "create_issue"));
        assert!(!policy.mcp_auto_allow("notion", "list_issues"));
    }

    #[test]
    fn write_allowed_checks_blocked_first() {
        let policy = fixture();
        let project = std::path::Path::new("/proj");
        let source_file = project.join("src/main.rs");
        let env_file = project.join(".env.local");

        assert!(policy.write_is_allowed(&source_file, project));
        assert!(!policy.write_is_allowed(&env_file, project));
    }

    #[test]
    fn edit_policy_overrides_write_policy_when_present() {
        let policy: Policy = toml::from_str(
            r#"
[write]
allowed_paths = ["src/**"]

[edit]
allowed_paths = ["docs/**"]
blocked_paths = ["docs/secrets/**"]
"#,
        )
        .unwrap();
        let project = std::path::Path::new("/proj");
        let source_file = project.join("src/main.rs");

        assert!(policy.write_is_allowed(&source_file, project));
        assert!(!policy.edit_is_allowed(&source_file, project));
        assert!(policy.edit_is_allowed(&project.join("docs/readme.md"), project));
        assert!(policy.edit_is_blocked(&project.join("docs/secrets/key.md"), project));
    }

    #[test]
    fn load_rejects_invalid_path_globs() {
        let scratch = tempfile::tempdir().unwrap();
        let policy_file = scratch.path().join("permissions.toml");
        std::fs::write(
            &policy_file,
            r#"
[write]
blocked_paths = ["["]
"#,
        )
        .unwrap();

        let load_error = Policy::load_or_default(&policy_file).unwrap_err();
        assert!(matches!(
            load_error,
            PolicyError::Glob { scope: "write", .. }
        ));
    }
}