rmcp-memex 0.3.6

RAG/memory MCP server with LanceDB vector storage
Documentation
//! Path sanitization utilities.
//!
//! Provides secure path handling to prevent path traversal attacks.
//! All user-provided paths must be validated through this module.

use anyhow::{Result, anyhow};
use std::path::{Path, PathBuf};

// Path validation happens dynamically based on home directory.
// Allowed locations: home dir, /Users (macOS), /tmp, /var/folders.

/// Expand tilde and environment variables in a path.
fn expand_path(path: &str) -> String {
    shellexpand::tilde(path.trim()).to_string()
}

/// Canonicalize a path, returning error if it doesn't exist.
fn canonicalize_existing(path: &Path) -> Result<PathBuf> {
    path.canonicalize()
        .map_err(|e| anyhow!("Cannot canonicalize path '{}': {}", path.display(), e))
}

/// Check if a path contains traversal sequences.
fn contains_traversal(path: &str) -> bool {
    let path_lower = path.to_lowercase();
    path_lower.contains("..")
        || path_lower.contains("./")
        || path.contains('\0')
        || path.contains('\n')
        || path.contains('\r')
}

/// Get the user's home directory.
fn home_dir() -> Result<PathBuf> {
    std::env::var("HOME")
        .map(PathBuf::from)
        .map_err(|_| anyhow!("Cannot determine home directory from $HOME"))
}

/// Validate that a path is under an allowed base directory.
fn is_under_allowed_base(path: &Path) -> Result<bool> {
    let home = home_dir()?;

    // Check if path is under home directory
    if path.starts_with(&home) {
        return Ok(true);
    }

    // For macOS, also allow /Users/<username>
    #[cfg(target_os = "macos")]
    if path.starts_with("/Users") {
        // Validate it's a real user path, not traversal
        let components: Vec<_> = path.components().collect();
        if components.len() >= 3 {
            // /Users/username/... is fine
            return Ok(true);
        }
    }

    // Temporary directories are also allowed (for tests)
    // Note: On macOS, /var and /tmp are symlinks to /private/var and /private/tmp
    if path.starts_with("/tmp")
        || path.starts_with("/var/folders")
        || path.starts_with("/private/tmp")
        || path.starts_with("/private/var/folders")
    {
        return Ok(true);
    }

    Ok(false)
}

/// Sanitize and validate a user-provided path.
///
/// This function:
/// 1. Expands tilde (~) to home directory
/// 2. Checks for path traversal sequences
/// 3. Canonicalizes the path (requires it to exist)
/// 4. Validates the path is under an allowed base directory
///
/// Returns the sanitized, canonicalized path.
pub fn sanitize_existing_path(path: &str) -> Result<PathBuf> {
    // Check for traversal before expansion
    if contains_traversal(path) {
        return Err(anyhow!(
            "Path contains invalid traversal sequence: {}",
            path
        ));
    }

    let expanded = expand_path(path);

    // Check again after expansion
    if contains_traversal(&expanded) {
        return Err(anyhow!(
            "Expanded path contains invalid sequence: {}",
            expanded
        ));
    }

    // This IS the sanitization function. Traversal is checked above,
    // and the path is canonicalized and validated below.
    // nosemgrep: rust.actix.path-traversal.tainted-path.tainted-path
    let path_buf = PathBuf::from(&expanded);

    // Canonicalize to resolve any remaining symlinks
    let canonical = canonicalize_existing(&path_buf)?;

    // Validate it's under an allowed base
    if !is_under_allowed_base(&canonical)? {
        return Err(anyhow!(
            "Path '{}' is not under an allowed directory",
            canonical.display()
        ));
    }

    Ok(canonical)
}

/// Sanitize a path that may not exist yet (for creation).
///
/// This is more permissive - it validates the parent directory exists
/// and the path is under an allowed base.
pub fn sanitize_new_path(path: &str) -> Result<PathBuf> {
    // Check for traversal before expansion
    if contains_traversal(path) {
        return Err(anyhow!(
            "Path contains invalid traversal sequence: {}",
            path
        ));
    }

    let expanded = expand_path(path);

    // Check again after expansion
    if contains_traversal(&expanded) {
        return Err(anyhow!(
            "Expanded path contains invalid sequence: {}",
            expanded
        ));
    }

    // This IS the sanitization function. Traversal is checked above,
    // and parent directory is validated below.
    // nosemgrep: rust.actix.path-traversal.tainted-path.tainted-path
    let path_buf = PathBuf::from(&expanded);

    // For new paths, validate the parent exists and is allowed
    if let Some(parent) = path_buf.parent() {
        if parent.exists() {
            let canonical_parent = canonicalize_existing(parent)?;
            if !is_under_allowed_base(&canonical_parent)? {
                return Err(anyhow!(
                    "Parent directory '{}' is not under an allowed directory",
                    canonical_parent.display()
                ));
            }
        } else if let Some(grandparent) = parent.parent()
            && grandparent.exists()
        {
            // Parent doesn't exist - check grandparent
            let canonical_gp = canonicalize_existing(grandparent)?;
            if !is_under_allowed_base(&canonical_gp)? {
                return Err(anyhow!(
                    "Path '{}' would be created outside allowed directories",
                    path_buf.display()
                ));
            }
        }
    }

    Ok(path_buf)
}

/// Validate a path is safe for reading (must exist, be under allowed base).
pub fn validate_read_path(path: &Path) -> Result<PathBuf> {
    if !path.exists() {
        return Err(anyhow!("Path does not exist: {}", path.display()));
    }

    let canonical = canonicalize_existing(path)?;

    if !is_under_allowed_base(&canonical)? {
        return Err(anyhow!(
            "Cannot read from path outside allowed directories: {}",
            canonical.display()
        ));
    }

    Ok(canonical)
}

/// Validate a path is safe for writing.
pub fn validate_write_path(path: &Path) -> Result<PathBuf> {
    // Check the path string for traversal
    let path_str = path.to_string_lossy();
    if contains_traversal(&path_str) {
        return Err(anyhow!("Path contains invalid traversal sequence"));
    }

    if path.exists() {
        // Existing path - canonicalize and validate
        let canonical = canonicalize_existing(path)?;
        if !is_under_allowed_base(&canonical)? {
            return Err(anyhow!(
                "Cannot write to path outside allowed directories: {}",
                canonical.display()
            ));
        }
        Ok(canonical)
    } else {
        // New path - validate parent
        sanitize_new_path(&path_str)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    #[test]
    fn test_traversal_detection() {
        assert!(contains_traversal("../etc/passwd"));
        assert!(contains_traversal("foo/../bar"));
        assert!(contains_traversal("./hidden"));
        assert!(contains_traversal("path\0with\0nulls"));
        assert!(!contains_traversal("/normal/path"));
        assert!(!contains_traversal("~/Documents"));
    }

    #[test]
    fn test_sanitize_existing_path() {
        // Create a temp directory for testing
        let tmp = tempdir().unwrap();
        let test_file = tmp.path().join("test.txt");
        fs::write(&test_file, "test").unwrap();

        // Valid path should work
        let result = sanitize_existing_path(test_file.to_str().unwrap());
        assert!(
            result.is_ok(),
            "Failed for path: {:?}, error: {:?}",
            test_file,
            result
        );

        // Path with traversal should fail
        let traversal = format!("{}/../../../etc/passwd", tmp.path().display());
        let result = sanitize_existing_path(&traversal);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_read_path() {
        let tmp = tempdir().unwrap();
        let test_file = tmp.path().join("readable.txt");
        fs::write(&test_file, "content").unwrap();

        let result = validate_read_path(&test_file);
        assert!(result.is_ok());

        // Non-existent path should fail
        let missing = tmp.path().join("missing.txt");
        let result = validate_read_path(&missing);
        assert!(result.is_err());
    }

    #[test]
    fn test_validate_write_path() {
        let tmp = tempdir().unwrap();

        // New file in valid directory
        let new_file = tmp.path().join("new.txt");
        let result = validate_write_path(&new_file);
        assert!(result.is_ok());

        // Existing file
        let existing = tmp.path().join("existing.txt");
        fs::write(&existing, "data").unwrap();
        let result = validate_write_path(&existing);
        assert!(result.is_ok());
    }
}