use rmcp::model::ErrorData;
use tracing::warn;
use crate::error_meta;
pub(crate) fn validate_path(
path: &str,
require_exists: bool,
) -> Result<std::path::PathBuf, ErrorData> {
let cwd = std::env::current_dir().map_err(|_| {
ErrorData::new(
rmcp::model::ErrorCode::INVALID_PARAMS,
"path is outside the allowed root".to_string(),
Some(error_meta(
"validation",
false,
"ensure the working directory is accessible",
)),
)
})?;
let allowed_root = std::fs::canonicalize(&cwd).map_err(|_| {
ErrorData::new(
rmcp::model::ErrorCode::INVALID_PARAMS,
"path is outside the allowed root".to_string(),
Some(error_meta(
"validation",
false,
"ensure the working directory is accessible",
)),
)
})?;
let canonical_path = if require_exists {
std::fs::canonicalize(path).map_err(|e| {
let msg = match e.kind() {
std::io::ErrorKind::NotFound => format!("path not found: {path}"),
std::io::ErrorKind::PermissionDenied => format!("permission denied: {path}"),
_ => "path is outside the allowed root".to_string(),
};
ErrorData::new(
rmcp::model::ErrorCode::INVALID_PARAMS,
msg,
Some(error_meta(
"validation",
false,
"provide a valid path within the working directory",
)),
)
})?
} else {
let p = std::path::Path::new(path);
let mut ancestor = p.to_path_buf();
let mut suffix_components: Vec<std::ffi::OsString> = Vec::new();
loop {
if ancestor.exists() {
break;
}
if let Some(parent) = ancestor.parent()
&& let Some(file_name) = ancestor.file_name()
{
suffix_components.push(file_name.to_owned());
ancestor = parent.to_path_buf();
} else {
ancestor = allowed_root.clone();
break;
}
}
let suffix: std::path::PathBuf = suffix_components.into_iter().rev().collect();
let canonical_base = std::fs::canonicalize(&ancestor).unwrap_or_else(|e| {
warn!(
path = %ancestor.display(),
error = %e,
"canonicalize of existing ancestor failed (race condition); falling back to allowed_root"
);
allowed_root.clone()
});
canonical_base.join(&suffix)
};
if !canonical_path.starts_with(&allowed_root) {
return Err(ErrorData::new(
rmcp::model::ErrorCode::INVALID_PARAMS,
"path is outside the allowed root".to_string(),
Some(error_meta(
"validation",
false,
"provide a path within the current working directory",
)),
));
}
Ok(canonical_path)
}
pub(crate) fn io_error_to_path_error(
err: &std::io::Error,
path_context: &str,
suggested_action: &'static str,
) -> ErrorData {
let msg = match err.kind() {
std::io::ErrorKind::NotFound => format!("{path_context} not found"),
std::io::ErrorKind::PermissionDenied => format!("permission denied: {path_context}"),
_ => format!("{path_context} is invalid"),
};
let mut meta = error_meta("validation", false, suggested_action);
if let Some(obj) = meta.as_object_mut() {
obj.insert(
"ioErrorKind".to_string(),
serde_json::json!(format!("{:?}", err.kind())),
);
obj.insert(
"ioErrorSource".to_string(),
serde_json::json!(err.to_string()),
);
}
ErrorData::new(rmcp::model::ErrorCode::INVALID_PARAMS, msg, Some(meta))
}
pub(crate) fn validate_path_in_dir(
path: &str,
require_exists: bool,
working_dir: &std::path::Path,
) -> Result<std::path::PathBuf, ErrorData> {
let canonical_working_dir = std::fs::canonicalize(working_dir).map_err(|e| {
io_error_to_path_error(&e, "working_dir", "provide a valid working directory")
})?;
if !std::fs::metadata(&canonical_working_dir)
.map(|m| m.is_dir())
.unwrap_or(false)
{
return Err(ErrorData::new(
rmcp::model::ErrorCode::INVALID_PARAMS,
"working_dir must be a directory".to_string(),
Some(error_meta(
"validation",
false,
"provide a valid directory path",
)),
));
}
let canonical_path = if require_exists {
let target_path = canonical_working_dir.join(path);
std::fs::canonicalize(&target_path).map_err(|e| {
io_error_to_path_error(
&e,
path,
"provide a valid path within the working directory",
)
})?
} else {
let p = std::path::Path::new(path);
let mut ancestor = p.to_path_buf();
let mut suffix_components: Vec<std::ffi::OsString> = Vec::new();
loop {
let full_path = canonical_working_dir.join(&ancestor);
if full_path.exists() {
break;
}
if let Some(parent) = ancestor.parent()
&& let Some(file_name) = ancestor.file_name()
{
suffix_components.push(file_name.to_owned());
ancestor = parent.to_path_buf();
} else {
ancestor = std::path::PathBuf::new();
break;
}
}
let suffix: std::path::PathBuf = suffix_components.into_iter().rev().collect();
let canonical_base = canonical_working_dir.join(&ancestor);
let canonical_base =
std::fs::canonicalize(&canonical_base).unwrap_or(canonical_working_dir.clone());
canonical_base.join(&suffix)
};
if !canonical_path.starts_with(&canonical_working_dir) {
return Err(ErrorData::new(
rmcp::model::ErrorCode::INVALID_PARAMS,
"path is outside the working directory".to_string(),
Some(error_meta(
"validation",
false,
"provide a path within the working directory",
)),
));
}
Ok(canonical_path)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn validate_path_no_trailing_slash() {
let input = "subdir/new_file.txt";
let result = validate_path(input, false);
if let Ok(resolved) = result {
let path_str = resolved.to_string_lossy();
assert!(
!path_str.ends_with('/'),
"resolved path must not end with trailing slash: {path_str}"
);
assert_eq!(
resolved.extension(),
Some(std::ffi::OsStr::new("txt")),
"file extension should be txt, path has trailing separator"
);
}
}
}