use super::text_edit::resolve_text_edit;
use super::{FinalizeEditParams, ResolvedEdit};
use crate::server::helpers::{
check_occ, check_sandbox_access, io_error_data, pathfinder_to_error_data,
};
use crate::server::types::EditResponse;
use pathfinder_common::error::PathfinderError;
use pathfinder_common::indent::dedent_then_reindent;
use pathfinder_common::normalize::{normalize_for_body_replace, normalize_for_full_replace};
use pathfinder_common::types::{SemanticPath, VersionHash};
use rmcp::handler::server::wrapper::Json;
use rmcp::model::ErrorData;
use std::path::{Path, PathBuf};
use tracing::instrument;
impl crate::server::PathfinderServer {
/// Reads the target file and runs the version check against the caller's
/// `base_version`.
///
/// The file at `absolute_path` is read in full, hashed with
/// [`VersionHash::compute`], and the resulting hash is handed to `check_occ`
/// together with `base_version` and `filepath_str`.
///
/// # Returns
/// The raw file bytes and the hash computed from them.
///
/// # Errors
/// * an I/O error payload (`"failed to read file: …"`) when the read fails;
/// * whatever error `check_occ` reports for the supplied `base_version`.
pub(crate) async fn validate_batch_occ(
    &self,
    absolute_path: &Path,
    base_version: &str,
    filepath_str: &str,
) -> Result<(Vec<u8>, VersionHash), ErrorData> {
    let source = tokio::fs::read(absolute_path)
        .await
        .map_err(|e| io_error_data(format!("failed to read file: {e}")))?;
    let current_hash = VersionHash::compute(&source);
    check_occ(base_version, &current_hash, PathBuf::from(filepath_str))?;
    Ok((source, current_hash))
}
pub(crate) async fn resolve_single_batch_edit(
&self,
edit: &crate::server::types::BatchEdit,
edit_index: usize,
source: &[u8],
file_path: &Path,
) -> Result<ResolvedEdit, ErrorData> {
if let Some(ref old_text) = edit.old_text {
let Some(context_line) = edit.context_line else {
let err = PathfinderError::InvalidTarget {
semantic_path: format!("edit[{edit_index}]"),
reason: "`context_line` is required when `old_text` is set".to_owned(),
edit_index: Some(edit_index),
valid_edit_types: None,
};
return Err(pathfinder_to_error_data(&err));
};
let replacement = edit.replacement_text.as_deref().unwrap_or("");
let free = resolve_text_edit(
source,
old_text.as_str(),
context_line,
replacement,
edit.normalize_whitespace,
file_path,
)
.map_err(|e| pathfinder_to_error_data(&e))?;
return Ok(ResolvedEdit {
start_byte: free.start_byte,
end_byte: free.end_byte,
replacement: free.replacement,
});
}
let Some(semantic_path) = SemanticPath::parse(&edit.semantic_path) else {
let err = PathfinderError::InvalidSemanticPath {
input: edit.semantic_path.clone(),
issue: "Semantic path is malformed or missing '::' separator.".to_owned(),
};
return Err(pathfinder_to_error_data(&err));
};
self.resolve_semantic_batch_edit(&semantic_path, edit, edit_index, source)
.await
}
/// Builds the edit that swaps the body of the symbol at `semantic_path` for
/// `new_code`.
///
/// The body range comes from `surgeon.resolve_body_range`. `new_code` is
/// normalized and re-indented to the body's indent column before use.
///
/// * If the range starts with `{` and ends with `}` in `source`, only the
///   text between the braces is replaced. A blank `new_code` empties the
///   braces; otherwise the new body is placed on its own lines and the
///   closing brace is re-indented with `indent_column` spaces.
/// * Otherwise the whole range is replaced, extended backwards over any
///   spaces/tabs directly before it, and the new body starts on a new line.
///
/// # Errors
/// Propagates the (converted) error from `resolve_body_range`.
pub(crate) async fn resolve_batch_replace_body(
    &self,
    semantic_path: &SemanticPath,
    new_code: &str,
    source: &[u8],
) -> Result<ResolvedEdit, ErrorData> {
    let (body_range, ..) = self
        .surgeon
        .resolve_body_range(self.workspace_root.path(), semantic_path)
        .await
        .map_err(crate::server::helpers::treesitter_error_to_error_data)?;

    let body_start = body_range.start_byte;
    let body_end = body_range.end_byte;
    let reindented = dedent_then_reindent(
        &normalize_for_body_replace(new_code),
        body_range.body_indent_column,
    );

    let wrapped_in_braces = body_end > body_start
        && source.get(body_start) == Some(&b'{')
        && source.get(body_end.saturating_sub(1)) == Some(&b'}');

    if !wrapped_in_braces {
        // Swallow the horizontal whitespace that sits right before the body
        // so the replacement's leading newline follows the header directly.
        let blanks_before = source[..body_start]
            .iter()
            .rev()
            .take_while(|&&byte| byte == b' ' || byte == b'\t')
            .count();
        return Ok(ResolvedEdit {
            start_byte: body_start - blanks_before,
            end_byte: body_end,
            replacement: format!("\n{reindented}").into_bytes(),
        });
    }

    // Brace-delimited body: keep both braces, replace only what is inside.
    let replacement = if reindented.trim().is_empty() {
        Vec::new()
    } else {
        // The empty string padded to `indent_column` yields the closing indent.
        format!(
            "\n{reindented}\n{:width$}",
            "",
            width = body_range.indent_column
        )
        .into_bytes()
    };
    Ok(ResolvedEdit {
        start_byte: body_start + 1,
        end_byte: body_end.saturating_sub(1),
        replacement,
    })
}
/// Builds the edit that replaces an entire target with `new_code`.
///
/// * A bare-file path replaces the whole of `source` with `new_code`
///   verbatim (no normalization or re-indentation).
/// * Any other path replaces the range reported by
///   `surgeon.resolve_full_range` with `new_code`, normalized and
///   re-indented to that range's `indent_column`.
///
/// # Errors
/// Propagates the (converted) error from `resolve_full_range`.
pub(crate) async fn resolve_batch_replace_full(
    &self,
    semantic_path: &SemanticPath,
    new_code: &str,
    source: &[u8],
) -> Result<ResolvedEdit, ErrorData> {
    let (start_byte, end_byte, replacement) = if semantic_path.is_bare_file() {
        (0, source.len(), Vec::from(new_code.as_bytes()))
    } else {
        let (target, ..) = self
            .surgeon
            .resolve_full_range(self.workspace_root.path(), semantic_path)
            .await
            .map_err(crate::server::helpers::treesitter_error_to_error_data)?;
        let reindented = dedent_then_reindent(
            &normalize_for_full_replace(new_code),
            target.indent_column,
        );
        (target.start_byte, target.end_byte, reindented.into_bytes())
    };

    Ok(ResolvedEdit {
        start_byte,
        end_byte,
        replacement,
    })
}
/// Builds a zero-width edit that inserts `new_code` in front of the target.
///
/// For a bare-file path the insertion point is byte 0 with no indentation;
/// otherwise it is the start of the range from
/// `surgeon.resolve_symbol_range`, using that range's `indent_column`.
///
/// The inserted text is `new_code` (normalized, re-indented), terminated by
/// a newline if it lacks one, followed by enough extra newlines that the
/// text after the insertion point is preceded by two newline bytes.
///
/// # Errors
/// Propagates the (converted) error from `resolve_symbol_range`.
pub(crate) async fn resolve_batch_insert_before(
    &self,
    semantic_path: &SemanticPath,
    new_code: &str,
    source: &[u8],
) -> Result<ResolvedEdit, ErrorData> {
    let (insert_byte, indent_column) = if semantic_path.is_bare_file() {
        (0, 0)
    } else {
        let (anchor, ..) = self
            .surgeon
            .resolve_symbol_range(self.workspace_root.path(), semantic_path)
            .await
            .map_err(crate::server::helpers::treesitter_error_to_error_data)?;
        (anchor.start_byte, anchor.indent_column)
    };

    let mut text = dedent_then_reindent(&normalize_for_full_replace(new_code), indent_column);
    if !text.ends_with('\n') {
        text.push('\n');
    }

    // Top up the newlines already present right after the insertion point.
    let gap = match &source[insert_byte..] {
        [b'\n', b'\n', ..] => "",
        [b'\n', ..] => "\n",
        _ => "\n\n",
    };
    text.push_str(gap);

    Ok(ResolvedEdit {
        start_byte: insert_byte,
        end_byte: insert_byte,
        replacement: text.into_bytes(),
    })
}
/// Builds a zero-width edit that inserts `new_code` right after the target.
///
/// For a bare-file path the insertion point is the end of `source` with no
/// indentation; otherwise it is the end of the range from
/// `surgeon.resolve_symbol_range`, using that range's `indent_column`.
///
/// The inserted text is preceded by enough newlines that two newline bytes
/// separate it from the text before the insertion point, and it is
/// terminated by a newline if `new_code` does not already end with one.
///
/// # Errors
/// Propagates the (converted) error from `resolve_symbol_range`.
pub(crate) async fn resolve_batch_insert_after(
    &self,
    semantic_path: &SemanticPath,
    new_code: &str,
    source: &[u8],
) -> Result<ResolvedEdit, ErrorData> {
    let (insert_byte, indent_column) = if semantic_path.is_bare_file() {
        (source.len(), 0)
    } else {
        let (anchor, ..) = self
            .surgeon
            .resolve_symbol_range(self.workspace_root.path(), semantic_path)
            .await
            .map_err(crate::server::helpers::treesitter_error_to_error_data)?;
        (anchor.end_byte, anchor.indent_column)
    };

    let reindented = dedent_then_reindent(&normalize_for_full_replace(new_code), indent_column);

    // Top up the newlines already present right before the insertion point.
    let gap = match &source[..insert_byte] {
        [.., b'\n', b'\n'] => "",
        [.., b'\n'] => "\n",
        _ => "\n\n",
    };

    let mut text = String::from(gap);
    text.push_str(&reindented);
    if !reindented.ends_with('\n') {
        text.push('\n');
    }

    Ok(ResolvedEdit {
        start_byte: insert_byte,
        end_byte: insert_byte,
        replacement: text.into_bytes(),
    })
}
/// Builds the edit that deletes the symbol at `semantic_path` and tidies the
/// whitespace around the hole.
///
/// The range from `surgeon.resolve_full_range` is widened backwards over all
/// ASCII whitespace before it and forwards over the ASCII whitespace after
/// it. The widened span is replaced by a single newline when it touches the
/// start or end of the file, and by two newlines otherwise.
///
/// The forward widening stops at the beginning of the next non-blank line,
/// so the indentation of whatever follows the deleted symbol (a sibling
/// method, a closing brace, …) is left intact. Swallowing that indentation
/// would move the following item to column 0.
///
/// # Errors
/// Propagates the (converted) error from `resolve_full_range`.
pub(crate) async fn resolve_batch_delete(
    &self,
    semantic_path: &SemanticPath,
    source: &[u8],
) -> Result<ResolvedEdit, ErrorData> {
    let (full_range, ..) = self
        .surgeon
        .resolve_full_range(self.workspace_root.path(), semantic_path)
        .await
        .map_err(crate::server::helpers::treesitter_error_to_error_data)?;

    // Walk back to the end of the preceding non-whitespace content.
    let mut b_end = full_range.start_byte;
    while b_end > 0 && source[b_end - 1].is_ascii_whitespace() {
        b_end -= 1;
    }

    // Walk forward to the next non-whitespace byte, remembering where the
    // line containing it begins.
    let mut a_start = full_range.end_byte;
    let mut next_line_start = None;
    while a_start < source.len() && source[a_start].is_ascii_whitespace() {
        if source[a_start] == b'\n' {
            next_line_start = Some(a_start + 1);
        }
        a_start += 1;
    }

    let touches_file_edge = b_end == 0 || a_start == source.len();
    let sep: &[u8] = if touches_file_edge { b"\n" } else { b"\n\n" };

    // When content follows on a later line, stop at that line's first byte
    // so its leading indentation survives the deletion.
    let end_byte = if a_start < source.len() {
        next_line_start.unwrap_or(a_start)
    } else {
        a_start
    };

    Ok(ResolvedEdit {
        start_byte: b_end,
        end_byte,
        replacement: sep.to_vec(),
    })
}
/// Dispatches a semantically-targeted batch edit to the resolver matching
/// `edit.edit_type`.
///
/// Recognised values are `replace_body`, `replace_full`, `insert_before`,
/// `insert_after` and `delete`. `edit.new_code` defaults to the empty string
/// when absent; `delete` ignores it.
///
/// # Errors
/// * `InvalidTarget` (carrying the list of valid edit types) when
///   `edit_type` is empty or not one of the recognised values;
/// * any error produced by the selected resolver.
pub(crate) async fn resolve_semantic_batch_edit(
    &self,
    semantic_path: &SemanticPath,
    edit: &crate::server::types::BatchEdit,
    edit_index: usize,
    source: &[u8],
) -> Result<ResolvedEdit, ErrorData> {
    let new_code = edit.new_code.as_deref().unwrap_or_default();
    match edit.edit_type.as_str() {
        "replace_body" => {
            self.resolve_batch_replace_body(semantic_path, new_code, source)
                .await
        }
        "replace_full" => {
            self.resolve_batch_replace_full(semantic_path, new_code, source)
                .await
        }
        "insert_before" => {
            self.resolve_batch_insert_before(semantic_path, new_code, source)
                .await
        }
        "insert_after" => {
            self.resolve_batch_insert_after(semantic_path, new_code, source)
                .await
        }
        "delete" => self.resolve_batch_delete(semantic_path, source).await,
        _ => {
            // Only call the value "(empty)" when it really is empty; an
            // unrecognised non-empty value gets its own message.
            let reason = if edit.edit_type.is_empty() {
                "edit_type is required for semantic targeting. Got: '' (empty).".to_owned()
            } else {
                format!(
                    "Unknown edit_type '{}' for semantic targeting.",
                    edit.edit_type
                )
            };
            let err = PathfinderError::InvalidTarget {
                semantic_path: edit.semantic_path.clone(),
                reason,
                edit_index: Some(edit_index),
                valid_edit_types: Some(vec![
                    "replace_body".to_string(),
                    "replace_full".to_string(),
                    "insert_before".to_string(),
                    "insert_after".to_string(),
                    "delete".to_string(),
                ]),
            };
            Err(pathfinder_to_error_data(&err))
        }
    }
}
/// Applies a set of resolved edits to `source` and returns the new bytes.
///
/// Each entry is `(edit_index, label, edit)`; the index and label are only
/// used in error messages.
///
/// Edits are applied from the highest start offset to the lowest, so that
/// applying one edit never shifts the byte offsets of the ones still to be
/// applied. Edits sharing a start offset are ordered by descending end
/// offset, which makes the result independent of the order in which the
/// caller listed a zero-width insertion and a replacement that begin at the
/// same byte. Edits with identical ranges keep their input order.
///
/// # Errors
/// Returns `InvalidTarget` when
/// * an edit's range is inverted or reaches past the end of `source`
///   (`Vec::splice` would panic on such a range), or
/// * two edits overlap.
pub(crate) fn apply_sorted_edits(
    source: &[u8],
    mut resolved_edits: Vec<(usize, String, ResolvedEdit)>,
) -> Result<Vec<u8>, ErrorData> {
    // Reject ranges that cannot be spliced before touching anything.
    for (idx, path, edit) in &resolved_edits {
        if edit.start_byte > edit.end_byte || edit.end_byte > source.len() {
            let err = PathfinderError::InvalidTarget {
                semantic_path: path.clone(),
                reason: format!(
                    "edit {idx} resolved to byte range {}..{} which is invalid for a file of {} bytes",
                    edit.start_byte,
                    edit.end_byte,
                    source.len()
                ),
                edit_index: Some(*idx),
                valid_edit_types: None,
            };
            return Err(pathfinder_to_error_data(&err));
        }
    }

    // Stable sort: back-to-front by start, wider range first on ties.
    resolved_edits.sort_by_key(|(_, _, e)| {
        (
            std::cmp::Reverse(e.start_byte),
            std::cmp::Reverse(e.end_byte),
        )
    });

    // With this ordering, checking neighbours is enough to find any overlap.
    for pair in resolved_edits.windows(2) {
        let (prev_idx, _, prev) = &pair[0];
        let (curr_idx, curr_path, curr) = &pair[1];
        if curr.end_byte > prev.start_byte {
            let err = PathfinderError::InvalidTarget {
                semantic_path: curr_path.clone(),
                reason: format!(
                    "overlapping edits in replace_batch: edit {curr_idx} overlaps with edit {prev_idx}"
                ),
                edit_index: Some(*curr_idx),
                valid_edit_types: None,
            };
            return Err(pathfinder_to_error_data(&err));
        }
    }

    let mut new_bytes = source.to_vec();
    for (_, _, edit) in resolved_edits {
        new_bytes.splice(edit.start_byte..edit.end_byte, edit.replacement);
    }
    Ok(new_bytes)
}
/// Implementation of the `replace_batch` tool: applies several edits to one
/// file in a single operation.
///
/// Steps, in order:
/// 1. check sandbox access for `params.filepath`;
/// 2. read the file and validate `params.base_version` via
///    `validate_batch_occ`;
/// 3. resolve every entry of `params.edits` against the bytes just read,
///    stopping at the first entry that fails to resolve;
/// 4. combine the resolved edits with `apply_sorted_edits`;
/// 5. hand the new content to `finalize_edit`, whose result is returned.
///
/// If `params.filepath` cannot be parsed as a `SemanticPath`, a warning is
/// logged and a bare-file path (no symbol chain) is used for finalization.
///
/// # Errors
/// Returns the first error raised by any of the steps above.
#[instrument(skip(self, params), fields(filepath = %params.filepath))]
pub(crate) async fn replace_batch_impl(
    &self,
    params: crate::server::types::ReplaceBatchParams,
) -> Result<Json<EditResponse>, ErrorData> {
    let start = std::time::Instant::now();
    tracing::info!(
        tool = "replace_batch",
        filepath = %params.filepath,
        "replace_batch: start"
    );

    let file_path = Path::new(&params.filepath);
    check_sandbox_access(&self.sandbox, file_path, "replace_batch", &params.filepath)?;
    let absolute_path = self.workspace_root.resolve(file_path);
    let (source, current_hash) = self
        .validate_batch_occ(&absolute_path, &params.base_version, &params.filepath)
        .await?;

    // Every edit is resolved against the same snapshot of the file.
    let mut resolved_edits = Vec::new();
    for (edit_index, edit) in params.edits.iter().enumerate() {
        let resolved = self
            .resolve_single_batch_edit(edit, edit_index, &source, file_path)
            .await?;
        // Label used only in overlap / range error messages.
        let path_or_text = if !edit.semantic_path.is_empty() {
            edit.semantic_path.clone()
        } else if let Some(old_text) = &edit.old_text {
            format!("text match: '{old_text}'")
        } else {
            "unknown".to_string()
        };
        resolved_edits.push((edit_index, path_or_text, resolved));
    }

    let new_bytes = Self::apply_sorted_edits(&source, resolved_edits)?;
    let resolve_ms = start.elapsed().as_millis();

    let semantic_path = if let Some(p) = SemanticPath::parse(&params.filepath) {
        p
    } else {
        tracing::warn!(
            filepath = %params.filepath,
            "replace_batch: SemanticPath::parse failed, treating as bare file"
        );
        SemanticPath {
            file_path: file_path.to_path_buf(),
            symbol_chain: None,
        }
    };

    self.finalize_edit(FinalizeEditParams {
        tool_name: "replace_batch",
        semantic_path: &semantic_path,
        raw_semantic_path_str: &params.filepath,
        source: &source,
        original_hash: &current_hash,
        new_content: new_bytes,
        ignore_validation_failures: params.ignore_validation_failures,
        start_time: start,
        resolve_ms,
    })
    .await
}
}