use std::path::{Component, Path, PathBuf};
use crate::audit::{log_audit_event, AuditCategory, AuditSeverity};
use crate::error::{Result, ZeptoError};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SafePath {
path: PathBuf,
}
impl SafePath {
pub fn as_path(&self) -> &Path {
&self.path
}
pub fn into_path_buf(self) -> PathBuf {
self.path
}
}
impl AsRef<Path> for SafePath {
fn as_ref(&self) -> &Path {
&self.path
}
}
pub fn validate_path_in_workspace(path: &str, workspace: &str) -> Result<SafePath> {
if contains_traversal_pattern(path) {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"path_traversal",
&format!("Path contains suspicious traversal pattern: {}", path),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Path contains suspicious traversal pattern: {}",
path
)));
}
let workspace_path = Path::new(workspace);
let target_path = Path::new(path);
let resolved_path = if target_path.is_absolute() {
target_path.to_path_buf()
} else {
workspace_path.join(target_path)
};
let normalized_path = normalize_path(&resolved_path);
let canonical_workspace = workspace_path
.canonicalize()
.unwrap_or_else(|_| normalize_path(workspace_path));
check_symlink_escape(&normalized_path, &canonical_workspace)?;
if !normalized_path.starts_with(&canonical_workspace) {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"path_escape",
&format!(
"Path escapes workspace: {} is not within {}",
path, workspace
),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Path escapes workspace: {} is not within {}",
path, workspace
)));
}
Ok(SafePath {
path: normalized_path,
})
}
fn check_symlink_escape(path: &Path, canonical_workspace: &Path) -> Result<()> {
let relative = match path.strip_prefix(canonical_workspace) {
Ok(rel) => rel,
Err(_) => {
return Ok(());
}
};
let mut current = canonical_workspace.to_path_buf();
for component in relative.components() {
current.push(component);
if current.exists() {
if let Ok(canonical) = current.canonicalize() {
if !canonical.starts_with(canonical_workspace) {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"symlink_escape",
&format!(
"Symlink escape: '{}' resolves to '{}' outside workspace",
current.display(),
canonical.display()
),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Symlink escape detected: '{}' resolves to '{}' which is outside workspace",
current.display(),
canonical.display()
)));
}
}
}
}
Ok(())
}
fn normalize_path(path: &Path) -> PathBuf {
let mut normalized = PathBuf::new();
for component in path.components() {
match component {
Component::ParentDir => {
normalized.pop();
}
Component::CurDir => {
}
_ => {
normalized.push(component);
}
}
}
normalized.canonicalize().unwrap_or(normalized)
}
fn contains_traversal_pattern(path: &str) -> bool {
let patterns = [
"..", "%2e%2e", "%252e%252e", "..%2f", "%2f..", "..\\", "\\..\\", ];
let lower_path = path.to_lowercase();
patterns.iter().any(|p| lower_path.contains(p))
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::os::unix::fs::symlink;
use tempfile::tempdir;
#[test]
fn test_valid_relative_path() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("src")).unwrap();
fs::write(temp.path().join("src/main.rs"), "fn main() {}").unwrap();
let result = validate_path_in_workspace("src/main.rs", workspace);
assert!(result.is_ok());
}
#[test]
fn test_valid_absolute_path_in_workspace() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::write(temp.path().join("file.txt"), "content").unwrap();
let absolute_path = temp.path().join("file.txt");
let result = validate_path_in_workspace(absolute_path.to_str().unwrap(), workspace);
assert!(result.is_ok());
}
#[test]
fn test_traversal_with_double_dots() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("../../../etc/passwd", workspace);
assert!(result.is_err());
if let Err(ZeptoError::SecurityViolation(msg)) = result {
assert!(msg.contains("traversal pattern") || msg.contains("escapes workspace"));
} else {
panic!("Expected SecurityViolation error");
}
}
#[test]
fn test_traversal_with_encoded_dots() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("%2e%2e/etc/passwd", workspace);
assert!(result.is_err());
}
#[test]
fn test_traversal_with_mixed_encoding() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("..%2f../etc/passwd", workspace);
assert!(result.is_err());
}
#[test]
fn test_absolute_path_outside_workspace() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("/etc/passwd", workspace);
assert!(result.is_err());
if let Err(ZeptoError::SecurityViolation(msg)) = result {
assert!(msg.contains("escapes workspace"));
} else {
panic!("Expected SecurityViolation error");
}
}
#[test]
fn test_nested_traversal() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("a/b/c")).unwrap();
let result = validate_path_in_workspace("a/b/c/../../../../etc/passwd", workspace);
assert!(result.is_err());
}
#[test]
fn test_current_directory_reference() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::write(temp.path().join("file.txt"), "content").unwrap();
let result = validate_path_in_workspace("./file.txt", workspace);
assert!(result.is_ok());
}
#[test]
fn test_complex_valid_path() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("src/lib")).unwrap();
fs::write(temp.path().join("src/lib/mod.rs"), "// module").unwrap();
let result = validate_path_in_workspace("src/./lib/mod.rs", workspace);
assert!(result.is_ok());
}
#[test]
fn test_safe_path_conversion() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::write(temp.path().join("test.txt"), "content").unwrap();
let safe_path = validate_path_in_workspace("test.txt", workspace).unwrap();
assert!(safe_path.as_path().ends_with("test.txt"));
let path_buf = safe_path.clone().into_path_buf();
assert!(path_buf.ends_with("test.txt"));
let path_ref: &Path = safe_path.as_ref();
assert!(path_ref.ends_with("test.txt"));
}
#[test]
fn test_windows_style_traversal() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("..\\..\\etc\\passwd", workspace);
assert!(result.is_err());
}
#[test]
fn test_empty_path() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("", workspace);
assert!(result.is_ok());
}
#[test]
fn test_normalize_path_basic() {
let path = Path::new("/a/b/../c/./d");
let normalized = normalize_path(path);
let components: Vec<_> = normalized.components().collect();
assert!(components
.iter()
.any(|c| matches!(c, Component::Normal(s) if s.to_str() == Some("a"))));
assert!(components
.iter()
.any(|c| matches!(c, Component::Normal(s) if s.to_str() == Some("c"))));
assert!(components
.iter()
.any(|c| matches!(c, Component::Normal(s) if s.to_str() == Some("d"))));
}
#[test]
fn test_symlink_escape_to_outside() {
let temp = tempdir().unwrap();
let outside = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let symlink_path = temp.path().join("escape_link");
symlink(outside.path(), &symlink_path).unwrap();
let result = validate_path_in_workspace("escape_link/secret.txt", workspace);
assert!(result.is_err());
if let Err(ZeptoError::SecurityViolation(msg)) = result {
assert!(
msg.contains("Symlink escape") || msg.contains("escapes workspace"),
"Expected symlink escape error, got: {}",
msg
);
} else {
panic!("Expected SecurityViolation error");
}
}
#[test]
fn test_symlink_within_workspace_allowed() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("real_dir")).unwrap();
fs::write(temp.path().join("real_dir/file.txt"), "content").unwrap();
let symlink_path = temp.path().join("link_to_real");
symlink(temp.path().join("real_dir"), &symlink_path).unwrap();
let result = validate_path_in_workspace("link_to_real/file.txt", workspace);
assert!(result.is_ok());
}
#[test]
fn test_nested_symlink_escape() {
let temp = tempdir().unwrap();
let outside = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("a")).unwrap();
symlink(outside.path(), temp.path().join("a/b")).unwrap();
let result = validate_path_in_workspace("a/b/secret.txt", workspace);
assert!(result.is_err());
}
#[test]
fn test_symlink_to_parent_blocked() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let symlink_path = temp.path().join("parent_link");
if let Some(parent) = temp.path().parent() {
symlink(parent, &symlink_path).unwrap();
let result = validate_path_in_workspace("parent_link/etc/passwd", workspace);
assert!(result.is_err());
}
}
#[test]
fn test_new_file_in_symlinked_dir_blocked() {
let temp = tempdir().unwrap();
let outside = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let symlink_path = temp.path().join("linked_dir");
symlink(outside.path(), &symlink_path).unwrap();
let result = validate_path_in_workspace("linked_dir/new_file.txt", workspace);
assert!(
result.is_err(),
"Should block writing new files through symlinks to outside"
);
}
}