use std::path::{Component, Path, PathBuf};
use crate::audit::{log_audit_event, AuditCategory, AuditSeverity};
use crate::error::{Result, ZeptoError};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SafePath {
path: PathBuf,
}
impl SafePath {
pub fn as_path(&self) -> &Path {
&self.path
}
pub fn into_path_buf(self) -> PathBuf {
self.path
}
}
impl AsRef<Path> for SafePath {
fn as_ref(&self) -> &Path {
&self.path
}
}
pub fn validate_path_in_workspace(path: &str, workspace: &str) -> Result<SafePath> {
if contains_traversal_pattern(path) {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"path_traversal",
&format!("Path contains suspicious traversal pattern: {}", path),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Path contains suspicious traversal pattern: {}",
path
)));
}
let workspace_path = Path::new(workspace);
let target_path = Path::new(path);
let resolved_path = if target_path.is_absolute() {
target_path.to_path_buf()
} else {
workspace_path.join(target_path)
};
let normalized_path = normalize_path(&resolved_path);
let canonical_workspace = workspace_path
.canonicalize()
.unwrap_or_else(|_| normalize_path(workspace_path));
check_symlink_escape(&normalized_path, &canonical_workspace)?;
if !normalized_path.starts_with(&canonical_workspace) {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"path_escape",
&format!(
"Path escapes workspace: {} is not within {}",
path, workspace
),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Path escapes workspace: {} is not within {}",
path, workspace
)));
}
Ok(SafePath {
path: normalized_path,
})
}
fn check_symlink_escape(path: &Path, canonical_workspace: &Path) -> Result<()> {
let relative = match path.strip_prefix(canonical_workspace) {
Ok(rel) => rel,
Err(_) => {
return Ok(());
}
};
let mut current = canonical_workspace.to_path_buf();
for component in relative.components() {
current.push(component);
match std::fs::symlink_metadata(¤t) {
Ok(meta) => {
if meta.file_type().is_symlink() {
match current.canonicalize() {
Ok(canonical) => {
if !canonical.starts_with(canonical_workspace) {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"symlink_escape",
&format!(
"Symlink escape: '{}' resolves to '{}' outside workspace",
current.display(),
canonical.display()
),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Symlink escape detected: '{}' resolves to '{}' which is outside workspace",
current.display(),
canonical.display()
)));
}
}
Err(_) => {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"dangling_symlink",
&format!(
"Dangling symlink: '{}' cannot be resolved",
current.display()
),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Dangling symlink detected: '{}' target does not exist and cannot be validated",
current.display()
)));
}
}
} else if meta.is_dir() {
if let Ok(canonical) = current.canonicalize() {
if !canonical.starts_with(canonical_workspace) {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"symlink_escape",
&format!(
"Symlink escape: '{}' resolves to '{}' outside workspace",
current.display(),
canonical.display()
),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Symlink escape detected: '{}' resolves to '{}' which is outside workspace",
current.display(),
canonical.display()
)));
}
}
}
}
Err(_) => {
}
}
}
Ok(())
}
pub fn revalidate_path(path: &Path, workspace: &str) -> Result<()> {
let workspace_path = Path::new(workspace);
let canonical_workspace = workspace_path
.canonicalize()
.unwrap_or_else(|_| normalize_path(workspace_path));
check_symlink_escape(path, &canonical_workspace)?;
if let Ok(canonical) = path.canonicalize() {
if !canonical.starts_with(&canonical_workspace) {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"toctou_escape",
&format!(
"Path moved outside workspace between validation and use: '{}' -> '{}'",
path.display(),
canonical.display()
),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Path escaped workspace between validation and use: '{}' resolves to '{}'",
path.display(),
canonical.display()
)));
}
}
Ok(())
}
pub fn check_hardlink_write(path: &Path) -> Result<()> {
use std::os::unix::fs::MetadataExt;
match std::fs::metadata(path) {
Ok(meta) => {
if meta.nlink() > 1 {
log_audit_event(
AuditCategory::PathSecurity,
AuditSeverity::Critical,
"hardlink_escape",
&format!(
"File has {} hard links, may alias external inode: '{}'",
meta.nlink(),
path.display()
),
true,
);
return Err(ZeptoError::SecurityViolation(format!(
"Write blocked: '{}' has {} hard links and may alias content outside workspace",
path.display(),
meta.nlink()
)));
}
Ok(())
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
Ok(())
}
Err(e) => Err(ZeptoError::Tool(format!(
"Failed to check file metadata for '{}': {}",
path.display(),
e
))),
}
}
fn normalize_path(path: &Path) -> PathBuf {
let mut normalized = PathBuf::new();
for component in path.components() {
match component {
Component::ParentDir => {
normalized.pop();
}
Component::CurDir => {
}
_ => {
normalized.push(component);
}
}
}
normalized.canonicalize().unwrap_or(normalized)
}
fn contains_traversal_pattern(path: &str) -> bool {
let patterns = [
"..", "%2e%2e", "%252e%252e", "..%2f", "%2f..", "..\\", "\\..\\", ];
let lower_path = path.to_lowercase();
patterns.iter().any(|p| lower_path.contains(p))
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::os::unix::fs::symlink;
use tempfile::tempdir;
#[test]
fn test_valid_relative_path() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("src")).unwrap();
fs::write(temp.path().join("src/main.rs"), "fn main() {}").unwrap();
let result = validate_path_in_workspace("src/main.rs", workspace);
assert!(result.is_ok());
}
#[test]
fn test_valid_absolute_path_in_workspace() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::write(temp.path().join("file.txt"), "content").unwrap();
let absolute_path = temp.path().join("file.txt");
let result = validate_path_in_workspace(absolute_path.to_str().unwrap(), workspace);
assert!(result.is_ok());
}
#[test]
fn test_traversal_with_double_dots() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("../../../etc/passwd", workspace);
assert!(result.is_err());
if let Err(ZeptoError::SecurityViolation(msg)) = result {
assert!(msg.contains("traversal pattern") || msg.contains("escapes workspace"));
} else {
panic!("Expected SecurityViolation error");
}
}
#[test]
fn test_traversal_with_encoded_dots() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("%2e%2e/etc/passwd", workspace);
assert!(result.is_err());
}
#[test]
fn test_traversal_with_mixed_encoding() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("..%2f../etc/passwd", workspace);
assert!(result.is_err());
}
#[test]
fn test_absolute_path_outside_workspace() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("/etc/passwd", workspace);
assert!(result.is_err());
if let Err(ZeptoError::SecurityViolation(msg)) = result {
assert!(msg.contains("escapes workspace"));
} else {
panic!("Expected SecurityViolation error");
}
}
#[test]
fn test_nested_traversal() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("a/b/c")).unwrap();
let result = validate_path_in_workspace("a/b/c/../../../../etc/passwd", workspace);
assert!(result.is_err());
}
#[test]
fn test_current_directory_reference() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::write(temp.path().join("file.txt"), "content").unwrap();
let result = validate_path_in_workspace("./file.txt", workspace);
assert!(result.is_ok());
}
#[test]
fn test_complex_valid_path() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("src/lib")).unwrap();
fs::write(temp.path().join("src/lib/mod.rs"), "// module").unwrap();
let result = validate_path_in_workspace("src/./lib/mod.rs", workspace);
assert!(result.is_ok());
}
#[test]
fn test_safe_path_conversion() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::write(temp.path().join("test.txt"), "content").unwrap();
let safe_path = validate_path_in_workspace("test.txt", workspace).unwrap();
assert!(safe_path.as_path().ends_with("test.txt"));
let path_buf = safe_path.clone().into_path_buf();
assert!(path_buf.ends_with("test.txt"));
let path_ref: &Path = safe_path.as_ref();
assert!(path_ref.ends_with("test.txt"));
}
#[test]
fn test_windows_style_traversal() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("..\\..\\etc\\passwd", workspace);
assert!(result.is_err());
}
#[test]
fn test_empty_path() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let result = validate_path_in_workspace("", workspace);
assert!(result.is_ok());
}
#[test]
fn test_normalize_path_basic() {
let path = Path::new("/a/b/../c/./d");
let normalized = normalize_path(path);
let components: Vec<_> = normalized.components().collect();
assert!(components
.iter()
.any(|c| matches!(c, Component::Normal(s) if s.to_str() == Some("a"))));
assert!(components
.iter()
.any(|c| matches!(c, Component::Normal(s) if s.to_str() == Some("c"))));
assert!(components
.iter()
.any(|c| matches!(c, Component::Normal(s) if s.to_str() == Some("d"))));
}
#[test]
fn test_symlink_escape_to_outside() {
let temp = tempdir().unwrap();
let outside = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let symlink_path = temp.path().join("escape_link");
symlink(outside.path(), &symlink_path).unwrap();
let result = validate_path_in_workspace("escape_link/secret.txt", workspace);
assert!(result.is_err());
if let Err(ZeptoError::SecurityViolation(msg)) = result {
assert!(
msg.contains("Symlink escape") || msg.contains("escapes workspace"),
"Expected symlink escape error, got: {}",
msg
);
} else {
panic!("Expected SecurityViolation error");
}
}
#[test]
fn test_symlink_within_workspace_allowed() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("real_dir")).unwrap();
fs::write(temp.path().join("real_dir/file.txt"), "content").unwrap();
let symlink_path = temp.path().join("link_to_real");
symlink(temp.path().join("real_dir"), &symlink_path).unwrap();
let result = validate_path_in_workspace("link_to_real/file.txt", workspace);
assert!(result.is_ok());
}
#[test]
fn test_nested_symlink_escape() {
let temp = tempdir().unwrap();
let outside = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
fs::create_dir_all(temp.path().join("a")).unwrap();
symlink(outside.path(), temp.path().join("a/b")).unwrap();
let result = validate_path_in_workspace("a/b/secret.txt", workspace);
assert!(result.is_err());
}
#[test]
fn test_symlink_to_parent_blocked() {
let temp = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let symlink_path = temp.path().join("parent_link");
if let Some(parent) = temp.path().parent() {
symlink(parent, &symlink_path).unwrap();
let result = validate_path_in_workspace("parent_link/etc/passwd", workspace);
assert!(result.is_err());
}
}
#[test]
fn test_new_file_in_symlinked_dir_blocked() {
let temp = tempdir().unwrap();
let outside = tempdir().unwrap();
let workspace = temp.path().to_str().unwrap();
let symlink_path = temp.path().join("linked_dir");
symlink(outside.path(), &symlink_path).unwrap();
let result = validate_path_in_workspace("linked_dir/new_file.txt", workspace);
assert!(
result.is_err(),
"Should block writing new files through symlinks to outside"
);
}
#[test]
fn test_dangling_symlink_rejected() {
let temp = tempdir().unwrap();
let canonical = temp.path().canonicalize().unwrap();
let workspace = canonical.to_str().unwrap();
let nonexistent_target = canonical.join("does_not_exist_subdir");
let symlink_path = canonical.join("dangling_link");
symlink(&nonexistent_target, &symlink_path).unwrap();
let result = validate_path_in_workspace("dangling_link/file.txt", workspace);
assert!(
result.is_err(),
"Should reject dangling symlinks whose target can't be verified"
);
if let Err(ZeptoError::SecurityViolation(msg)) = result {
assert!(
msg.contains("Dangling symlink") || msg.contains("cannot be validated"),
"Expected dangling symlink error, got: {}",
msg
);
}
}
#[test]
fn test_dangling_symlink_to_outside_workspace() {
let temp = tempdir().unwrap();
let canonical = temp.path().canonicalize().unwrap();
let workspace = canonical.to_str().unwrap();
let symlink_path = canonical.join("future_escape");
symlink("/tmp/attacker_controlled_dir_nonexistent", &symlink_path).unwrap();
let result = validate_path_in_workspace("future_escape/secret.txt", workspace);
assert!(
result.is_err(),
"Should reject dangling symlink pointing outside workspace"
);
}
#[test]
fn test_nested_dangling_symlink() {
let temp = tempdir().unwrap();
let canonical = temp.path().canonicalize().unwrap();
let workspace = canonical.to_str().unwrap();
fs::create_dir_all(canonical.join("a")).unwrap();
let nonexistent_target = canonical.join("no_such_dir");
symlink(&nonexistent_target, canonical.join("a/dangling")).unwrap();
let result = validate_path_in_workspace("a/dangling/file.txt", workspace);
assert!(result.is_err(), "Should reject nested dangling symlinks");
}
#[test]
fn test_dangling_symlink_direct_access() {
let temp = tempdir().unwrap();
let canonical = temp.path().canonicalize().unwrap();
let workspace = canonical.to_str().unwrap();
let nonexistent_target = canonical.join("ghost");
let symlink_path = canonical.join("broken_link");
symlink(&nonexistent_target, &symlink_path).unwrap();
let result = validate_path_in_workspace("broken_link", workspace);
assert!(
result.is_err(),
"Should reject direct access to dangling symlink"
);
}
#[test]
fn test_revalidate_path_valid_file() {
let temp = tempdir().unwrap();
let canonical = temp.path().canonicalize().unwrap();
let workspace = canonical.to_str().unwrap();
let file = canonical.join("safe.txt");
fs::write(&file, "content").unwrap();
let result = revalidate_path(&file, workspace);
assert!(result.is_ok());
}
#[test]
fn test_revalidate_path_nonexistent_file() {
let temp = tempdir().unwrap();
let canonical = temp.path().canonicalize().unwrap();
let workspace = canonical.to_str().unwrap();
let file = canonical.join("new_file.txt");
let result = revalidate_path(&file, workspace);
assert!(result.is_ok());
}
#[test]
fn test_revalidate_path_symlink_escape() {
let temp = tempdir().unwrap();
let outside = tempdir().unwrap();
let canonical = temp.path().canonicalize().unwrap();
let workspace = canonical.to_str().unwrap();
let escape = canonical.join("escape");
symlink(outside.path(), &escape).unwrap();
let target = escape.join("secret.txt");
let result = revalidate_path(&target, workspace);
assert!(
result.is_err(),
"Should detect symlink escape on revalidation"
);
}
#[test]
fn test_hardlink_write_single_link() {
let temp = tempdir().unwrap();
let file = temp.path().join("single.txt");
fs::write(&file, "content").unwrap();
let result = check_hardlink_write(&file);
assert!(result.is_ok());
}
#[test]
fn test_hardlink_write_multiple_links() {
let temp = tempdir().unwrap();
let original = temp.path().join("original.txt");
fs::write(&original, "content").unwrap();
let link = temp.path().join("hardlink.txt");
fs::hard_link(&original, &link).unwrap();
let result = check_hardlink_write(&link);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("hard links"),
"Expected hardlink error, got: {}",
err
);
let result = check_hardlink_write(&original);
assert!(result.is_err());
}
#[test]
fn test_hardlink_write_nonexistent_file() {
let temp = tempdir().unwrap();
let nonexistent = temp.path().join("does_not_exist.txt");
let result = check_hardlink_write(&nonexistent);
assert!(result.is_ok());
}
}