use anyhow::{Result, anyhow};
use std::path::{Path, PathBuf};
use tracing::debug;
pub struct PathValidator {
roots: Vec<PathBuf>,
protected_configs: Vec<PathBuf>,
}
impl PathValidator {
pub fn new(roots: Vec<PathBuf>) -> Self {
let protected_configs = Self::discover_config_paths(&roots);
debug!(
"PathValidator initialized with {} root(s), {} protected config(s)",
roots.len(),
protected_configs.len()
);
Self {
roots,
protected_configs,
}
}
#[must_use]
pub fn roots(&self) -> &[PathBuf] {
&self.roots
}
pub fn update_roots(&mut self, roots: Vec<PathBuf>) {
self.protected_configs = Self::discover_config_paths(&roots);
debug!(
"PathValidator updated: {} root(s), {} protected config(s)",
roots.len(),
self.protected_configs.len()
);
self.roots = roots;
}
pub fn validate_read(&self, path: &Path) -> Result<PathBuf> {
let canonical = path
.canonicalize()
.map_err(|e| anyhow!("Path does not exist: {}: {e}", path.display()))?;
if !self.is_within_roots(&canonical) {
return Err(anyhow!(
"Path is outside workspace roots: {}",
path.display()
));
}
Ok(canonical)
}
pub fn validate_write(&self, path: &Path) -> Result<PathBuf> {
let canonical = if path.exists() {
let canonical = path
.canonicalize()
.map_err(|e| anyhow!("Cannot resolve path: {}: {e}", path.display()))?;
if !self.is_within_roots(&canonical) {
return Err(anyhow!(
"Path is outside workspace roots: {}",
path.display()
));
}
canonical
} else {
let parent = path
.parent()
.ok_or_else(|| anyhow!("Cannot determine parent directory: {}", path.display()))?;
let existing_ancestor = Self::find_existing_ancestor(parent)?;
let canonical_ancestor = existing_ancestor.canonicalize().map_err(|e| {
anyhow!(
"Cannot resolve ancestor path: {}: {e}",
existing_ancestor.display()
)
})?;
if !self.is_within_roots(&canonical_ancestor) {
return Err(anyhow!(
"Path is outside workspace roots: {}",
path.display()
));
}
let remaining = path.strip_prefix(&existing_ancestor).unwrap_or_else(|_| {
path.file_name().map_or_else(|| Path::new(""), Path::new)
});
canonical_ancestor.join(remaining)
};
if self.is_config_file(&canonical) {
return Err(anyhow!(
"Cannot modify Catenary configuration file: {}",
path.display()
));
}
Ok(canonical)
}
fn is_within_roots(&self, canonical: &Path) -> bool {
self.roots.iter().any(|root| canonical.starts_with(root))
}
fn is_config_file(&self, canonical: &Path) -> bool {
self.protected_configs
.iter()
.any(|config| canonical == config)
}
fn discover_config_paths(roots: &[PathBuf]) -> Vec<PathBuf> {
let mut paths = Vec::new();
if let Some(config_dir) = dirs::config_dir() {
let user_config = config_dir.join("catenary").join("config.toml");
if let Ok(canonical) = user_config.canonicalize() {
paths.push(canonical);
}
}
for root in roots {
let mut current = Some(root.as_path());
while let Some(dir) = current {
let config_path = dir.join(".catenary.toml");
if let Ok(canonical) = config_path.canonicalize() {
if !paths.contains(&canonical) {
paths.push(canonical);
}
break;
}
current = dir.parent();
}
}
paths
}
fn find_existing_ancestor(path: &Path) -> Result<PathBuf> {
let mut current = path;
loop {
if current.exists() {
return Ok(current.to_path_buf());
}
current = current
.parent()
.ok_or_else(|| anyhow!("No existing ancestor found for: {}", path.display()))?;
}
}
}
#[cfg(test)]
#[allow(
clippy::expect_used,
reason = "tests use expect for readable assertions"
)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
fn setup_workspace() -> Result<(TempDir, PathValidator)> {
let dir = TempDir::new().map_err(|e| anyhow!("{e}"))?;
let root = dir.path().canonicalize()?;
fs::write(root.join("test.rs"), "fn main() {}")?;
fs::create_dir_all(root.join("src"))?;
fs::write(root.join("src/lib.rs"), "// lib")?;
let validator = PathValidator::new(vec![root]);
Ok((dir, validator))
}
#[test]
fn test_read_within_root_succeeds() -> Result<()> {
let (dir, validator) = setup_workspace()?;
let result = validator.validate_read(&dir.path().join("test.rs"));
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_read_subdirectory_succeeds() -> Result<()> {
let (dir, validator) = setup_workspace()?;
let result = validator.validate_read(&dir.path().join("src/lib.rs"));
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_read_outside_root_fails() -> Result<()> {
let (_dir, validator) = setup_workspace()?;
let result = validator.validate_read(Path::new("/etc/hostname"));
assert!(result.is_err());
let err = result.expect_err("expected error").to_string();
assert!(
err.contains("outside workspace roots"),
"Error should mention workspace roots: {err}"
);
Ok(())
}
#[test]
fn test_read_nonexistent_fails() -> Result<()> {
let (dir, validator) = setup_workspace()?;
let result = validator.validate_read(&dir.path().join("nonexistent.rs"));
assert!(result.is_err());
let err = result.expect_err("expected error").to_string();
assert!(
err.contains("does not exist"),
"Error should mention file not existing: {err}"
);
Ok(())
}
#[test]
fn test_read_path_traversal_outside_root_fails() -> Result<()> {
let (_dir, validator) = setup_workspace()?;
let result = validator.validate_read(Path::new("/tmp/../etc/hostname"));
assert!(result.is_err());
Ok(())
}
#[test]
fn test_write_within_root_succeeds() -> Result<()> {
let (dir, validator) = setup_workspace()?;
let result = validator.validate_write(&dir.path().join("test.rs"));
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_write_outside_root_fails() -> Result<()> {
let (_dir, validator) = setup_workspace()?;
let result = validator.validate_write(Path::new("/tmp/outside.rs"));
assert!(result.is_err());
Ok(())
}
#[test]
fn test_write_new_file_within_root_succeeds() -> Result<()> {
let (dir, validator) = setup_workspace()?;
let result = validator.validate_write(&dir.path().join("new_file.rs"));
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_write_new_file_in_new_subdir_within_root() -> Result<()> {
let (dir, validator) = setup_workspace()?;
let result = validator.validate_write(&dir.path().join("new_dir/new_file.rs"));
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_write_config_file_rejected() -> Result<()> {
let (dir, _) = setup_workspace()?;
let config_path = dir.path().join(".catenary.toml");
fs::write(&config_path, "idle_timeout = 300")?;
let root = dir.path().canonicalize()?;
let validator = PathValidator::new(vec![root]);
let result = validator.validate_write(&config_path);
assert!(result.is_err());
let err = result.expect_err("expected error").to_string();
assert!(
err.contains("configuration file"),
"Error should mention config file: {err}"
);
Ok(())
}
#[test]
fn test_read_config_file_allowed() -> Result<()> {
let (dir, _) = setup_workspace()?;
let config_path = dir.path().join(".catenary.toml");
fs::write(&config_path, "idle_timeout = 300")?;
let root = dir.path().canonicalize()?;
let validator = PathValidator::new(vec![root]);
let result = validator.validate_read(&config_path);
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_multiple_roots() -> Result<()> {
let dir1 = TempDir::new()?;
let dir2 = TempDir::new()?;
let root1 = dir1.path().canonicalize()?;
let root2 = dir2.path().canonicalize()?;
fs::write(root1.join("a.rs"), "// a")?;
fs::write(root2.join("b.rs"), "// b")?;
let validator = PathValidator::new(vec![root1, root2]);
assert!(validator.validate_read(&dir1.path().join("a.rs")).is_ok());
assert!(validator.validate_read(&dir2.path().join("b.rs")).is_ok());
Ok(())
}
#[test]
fn test_update_roots() -> Result<()> {
let dir1 = TempDir::new()?;
let dir2 = TempDir::new()?;
let root1 = dir1.path().canonicalize()?;
let root2 = dir2.path().canonicalize()?;
fs::write(root1.join("a.rs"), "// a")?;
fs::write(root2.join("b.rs"), "// b")?;
let mut validator = PathValidator::new(vec![root1]);
assert!(validator.validate_read(&dir2.path().join("b.rs")).is_err());
validator.update_roots(vec![dir1.path().canonicalize()?, root2]);
assert!(validator.validate_read(&dir2.path().join("b.rs")).is_ok());
Ok(())
}
#[cfg(unix)]
#[test]
fn test_symlink_within_root_succeeds() -> Result<()> {
use std::os::unix::fs as unix_fs;
let (dir, validator) = setup_workspace()?;
let root = dir.path().canonicalize()?;
let link_path = root.join("link.rs");
unix_fs::symlink(root.join("test.rs"), &link_path)?;
let result = validator.validate_read(&link_path);
assert!(result.is_ok());
Ok(())
}
#[cfg(unix)]
#[test]
fn test_symlink_outside_root_fails() -> Result<()> {
use std::os::unix::fs as unix_fs;
let (dir, validator) = setup_workspace()?;
let root = dir.path().canonicalize()?;
let outside_dir = TempDir::new()?;
let outside_file = outside_dir.path().join("secret.txt");
fs::write(&outside_file, "secret")?;
let link_path = root.join("sneaky_link.txt");
unix_fs::symlink(&outside_file, &link_path)?;
let result = validator.validate_read(&link_path);
assert!(result.is_err());
Ok(())
}
}