capo_agent/permissions/
policy.rs1use std::path::Path;
7
8use globset::{Glob, GlobSetBuilder};
9use regex::Regex;
10use serde::Deserialize;
11
12#[derive(Debug, Default, Deserialize)]
13pub struct Policy {
14 #[serde(default)]
15 pub bash: BashPolicy,
16 #[serde(default)]
17 pub write: PathPolicy,
18 #[serde(default)]
19 pub edit: Option<PathPolicy>,
20 #[serde(default)]
21 pub mcp: std::collections::HashMap<String, McpPolicy>,
22}
23
24#[derive(Debug, Default, Deserialize)]
25pub struct BashPolicy {
26 #[serde(default)]
27 pub exact: Vec<String>,
28 #[serde(default)]
29 pub prefix: Vec<String>,
30 #[serde(default)]
31 pub regex: Vec<String>,
32}
33
34#[derive(Debug, Default, Deserialize)]
35pub struct PathPolicy {
36 #[serde(default)]
37 pub allowed_paths: Vec<String>,
38 #[serde(default)]
39 pub blocked_paths: Vec<String>,
40}
41
42#[derive(Debug, Default, Deserialize)]
43pub struct McpPolicy {
44 #[serde(default)]
45 pub auto_allow: Vec<String>,
46}
47
48impl Policy {
49 pub fn load_or_default(path: &Path) -> Result<Self, PolicyError> {
50 if !path.exists() {
51 return Ok(Self::default());
52 }
53 let raw = std::fs::read_to_string(path).map_err(PolicyError::Io)?;
54 let parsed: Policy = toml::from_str(&raw).map_err(PolicyError::Toml)?;
55 parsed.validate_globs()?;
56 Ok(parsed)
57 }
58
59 fn validate_globs(&self) -> Result<(), PolicyError> {
60 validate_path_policy_globs("write", &self.write)?;
61 if let Some(edit) = &self.edit {
62 validate_path_policy_globs("edit", edit)?;
63 }
64 Ok(())
65 }
66
67 pub fn bash_is_allowed(&self, command: &str) -> bool {
68 let trimmed = command.trim();
69 if self.bash.exact.iter().any(|e| e == trimmed) {
70 return true;
71 }
72 if self.bash.prefix.iter().any(|p| trimmed.starts_with(p)) {
73 return true;
74 }
75 for r in &self.bash.regex {
76 if let Ok(re) = Regex::new(r) {
77 if re.is_match(trimmed) {
78 return true;
79 }
80 }
81 }
82 false
83 }
84
85 pub fn write_is_allowed(&self, path: &Path, project_root: &Path) -> bool {
86 path_policy_is_allowed(&self.write, path, project_root)
87 }
88
89 pub fn write_is_blocked(&self, path: &Path, project_root: &Path) -> bool {
90 path_policy_is_blocked(&self.write, path, project_root)
91 }
92
93 pub fn edit_is_allowed(&self, path: &Path, project_root: &Path) -> bool {
94 path_policy_is_allowed(
95 self.edit.as_ref().unwrap_or(&self.write),
96 path,
97 project_root,
98 )
99 }
100
101 pub fn edit_is_blocked(&self, path: &Path, project_root: &Path) -> bool {
102 path_policy_is_blocked(
103 self.edit.as_ref().unwrap_or(&self.write),
104 path,
105 project_root,
106 )
107 }
108
109 pub fn mcp_auto_allow(&self, server: &str, tool: &str) -> bool {
110 self.mcp
111 .get(server)
112 .map(|p| p.auto_allow.iter().any(|t| t == tool))
113 .unwrap_or(false)
114 }
115}
116
117fn path_policy_is_allowed(policy: &PathPolicy, path: &Path, project_root: &Path) -> bool {
118 path_matches(&policy.allowed_paths, path, project_root)
119 && !path_matches(&policy.blocked_paths, path, project_root)
120}
121
122fn path_policy_is_blocked(policy: &PathPolicy, path: &Path, project_root: &Path) -> bool {
123 path_matches(&policy.blocked_paths, path, project_root)
124}
125
126fn validate_path_policy_globs(scope: &'static str, policy: &PathPolicy) -> Result<(), PolicyError> {
127 for pattern in policy.allowed_paths.iter().chain(&policy.blocked_paths) {
128 Glob::new(pattern).map_err(|source| PolicyError::Glob {
129 scope,
130 pattern: pattern.clone(),
131 source,
132 })?;
133 }
134 Ok(())
135}
136
137fn path_matches(patterns: &[String], path: &Path, project_root: &Path) -> bool {
138 if patterns.is_empty() {
139 return false;
140 }
141 let mut builder = GlobSetBuilder::new();
142 for p in patterns {
143 if let Ok(glob) = Glob::new(p) {
144 builder.add(glob);
145 }
146 }
147 let Ok(set) = builder.build() else {
148 return false;
149 };
150 let rel = path.strip_prefix(project_root).unwrap_or(path);
151 set.is_match(rel) || set.is_match(path)
152}
153
154#[derive(Debug, thiserror::Error)]
155pub enum PolicyError {
156 #[error("io: {0}")]
157 Io(#[from] std::io::Error),
158 #[error("toml: {0}")]
159 Toml(#[from] toml::de::Error),
160 #[error("invalid {scope} path glob {pattern:?}: {source}")]
161 Glob {
162 scope: &'static str,
163 pattern: String,
164 #[source]
165 source: globset::Error,
166 },
167}
168
#[cfg(test)]
mod tests {
    // Panicking on a broken fixture is the desired failure mode in tests.
    #![allow(clippy::unwrap_used)]

    use super::*;

    /// Representative policy shared by most tests.
    const FIXTURE_TOML: &str = r#"
[bash]
exact = ["git status", "cargo fmt"]
prefix = ["cargo test "]
regex = ["^ls( .*)?$"]

[write]
allowed_paths = ["src/**"]
blocked_paths = [".env*"]

[mcp.github]
auto_allow = ["list_issues"]
"#;

    /// Parses [`FIXTURE_TOML`] directly, bypassing `Policy::load_or_default`.
    fn fixture() -> Policy {
        toml::from_str(FIXTURE_TOML).unwrap()
    }

    #[test]
    fn bash_exact_matches() {
        let policy = fixture();
        assert!(policy.bash_is_allowed("git status"));
        assert!(!policy.bash_is_allowed("git push"));
    }

    #[test]
    fn bash_prefix_matches_only_when_prefix() {
        let policy = fixture();
        assert!(policy.bash_is_allowed("cargo test --all"));
        assert!(!policy.bash_is_allowed("cargo tests"));
    }

    #[test]
    fn bash_regex_matches() {
        let policy = fixture();
        for (command, expected) in [("ls", true), ("ls -la", true), ("list", false)] {
            assert_eq!(
                policy.bash_is_allowed(command),
                expected,
                "command: {command:?}"
            );
        }
    }

    #[test]
    fn mcp_auto_allow_is_exact_per_tool() {
        let policy = fixture();
        assert!(policy.mcp_auto_allow("github", "list_issues"));
        assert!(!policy.mcp_auto_allow("github", "create_issue"));
        assert!(!policy.mcp_auto_allow("notion", "list_issues"));
    }

    #[test]
    fn write_allowed_checks_blocked_first() {
        let policy = fixture();
        let root = std::path::Path::new("/proj");

        let source_file = root.join("src/main.rs");
        let env_file = root.join(".env.local");

        assert!(policy.write_is_allowed(&source_file, root));
        assert!(!policy.write_is_allowed(&env_file, root));
    }

    #[test]
    fn edit_policy_overrides_write_policy_when_present() {
        let policy: Policy = toml::from_str(
            r#"
[write]
allowed_paths = ["src/**"]

[edit]
allowed_paths = ["docs/**"]
blocked_paths = ["docs/secrets/**"]
"#,
        )
        .unwrap();
        let root = std::path::Path::new("/proj");

        let source_file = root.join("src/main.rs");
        let readme = root.join("docs/readme.md");
        let secret = root.join("docs/secrets/key.md");

        assert!(policy.write_is_allowed(&source_file, root));
        assert!(!policy.edit_is_allowed(&source_file, root));
        assert!(policy.edit_is_allowed(&readme, root));
        assert!(policy.edit_is_blocked(&secret, root));
    }

    #[test]
    fn load_rejects_invalid_path_globs() {
        let dir = tempfile::tempdir().unwrap();
        let policy_file = dir.path().join("permissions.toml");
        let contents = r#"
[write]
blocked_paths = ["["]
"#;
        std::fs::write(&policy_file, contents).unwrap();

        let result = Policy::load_or_default(&policy_file);

        assert!(matches!(
            result.unwrap_err(),
            PolicyError::Glob { scope: "write", .. }
        ));
    }
}