use anyhow::{Context, Result};
use std::fs;
use std::path::{Path, PathBuf};
use tracing::{debug, info, warn};
use crate::db::traits::StoreChunks;
use crate::db::traits::StoreCore;
use crate::db::SqliteStore;
use crate::indexer::SymbolChunk;
use std::sync::Arc;
use super::detector::ChangeType;
use super::edge_updater::EdgeUpdater;
use super::hash::ContentHash;
use super::path_utils::normalize_to_relpath;
use super::task::UpdateTask;
/// Upper bound (10 MiB) on file size the indexer will read; larger files are
/// skipped with a warning rather than indexed.
const MAX_FILE_SIZE_BYTES: u64 = 10 * 1024 * 1024;
/// Applies a single incremental index update (new / modified / deleted file)
/// to the SQLite store for one repository worktree at one commit.
pub struct IncrementalProcessor {
// Shared store handle; a clone is also held by `edge_updater`.
store: Arc<SqliteStore>,
// Recomputes cross-file edges after a file's chunks change.
edge_updater: EdgeUpdater,
// Absolute repository root used to normalize paths to repo-relative form.
repo_root: PathBuf,
repo_id: i64,
worktree_id: i64,
commit_id: i64,
}
impl IncrementalProcessor {
pub fn new(
store: Arc<SqliteStore>,
repo_root: PathBuf,
repo_id: i64,
worktree_id: i64,
commit_id: i64,
) -> Self {
Self {
edge_updater: EdgeUpdater::new(store.clone()),
store,
repo_root,
repo_id,
worktree_id,
commit_id,
}
}
pub async fn process(&self, task: UpdateTask) -> Result<()> {
let path_display = task.path.display().to_string();
debug!(
path = %path_display,
change_type = ?task.change_type,
priority = ?task.priority,
"Processing update task"
);
match &task.change_type {
ChangeType::New(hash) => {
self.index_new_file(&task.path, hash)
.await
.with_context(|| format!("Failed to index new file: {}", path_display))?;
info!(path = %path_display, "Indexed new file");
}
ChangeType::Modified { old: _, new } => {
self.update_file(&task.path, new)
.await
.with_context(|| format!("Failed to update modified file: {}", path_display))?;
info!(path = %path_display, "Updated modified file");
}
ChangeType::Deleted(_) => {
self.remove_file(&task.path)
.await
.with_context(|| format!("Failed to remove deleted file: {}", path_display))?;
info!(path = %path_display, "Removed deleted file");
}
ChangeType::None => {
debug!(path = %path_display, "No change detected, skipping");
return Ok(());
}
}
Ok(())
}
async fn index_new_file(&self, path: &Path, hash: &ContentHash) -> Result<()> {
let metadata = fs::metadata(path)
.with_context(|| format!("Failed to get file metadata: {}", path.display()))?;
if metadata.len() > MAX_FILE_SIZE_BYTES {
warn!(
path = %path.display(),
size_mb = metadata.len() / (1024 * 1024),
limit_mb = MAX_FILE_SIZE_BYTES / (1024 * 1024),
"File too large to index, skipping"
);
return Ok(()); }
let symlink_metadata = fs::symlink_metadata(path)
.with_context(|| format!("Failed to get symlink metadata: {}", path.display()))?;
if symlink_metadata.file_type().is_symlink() {
warn!(
path = %path.display(),
"Indexing symlink - resolved path may be outside repository"
);
}
let content = fs::read_to_string(path)
.with_context(|| format!("Failed to read file: {}", path.display()))?;
let language = detect_language_from_path(path);
let relpath = normalize_to_relpath(path, &self.repo_root)
.with_context(|| format!("Failed to normalize path: {}", path.display()))?;
let relpath_str = relpath
.to_str()
.ok_or_else(|| anyhow::anyhow!("Invalid UTF-8 in path: {}", relpath.display()))?;
let file_record = crate::db::FileRecord {
repo_id: self.repo_id,
worktree_id: self.worktree_id,
commit_id: self.commit_id,
relpath: relpath_str.to_string(),
language: language.map(|s| s.to_string()),
content_hash: hash.to_hex().to_string(),
size_bytes: metadata.len() as i32,
last_modified: Some(chrono::Utc::now()),
};
let file_id = self
.store
.upsert_file(&file_record)
.await
.with_context(|| format!("Failed to upsert file record: {}", path.display()))?;
let lang_str = language.unwrap_or("unknown");
let symbol_chunks = parse_file_chunks(&content, lang_str)
.with_context(|| format!("Failed to parse file: {}", path.display()))?;
let mut chunk_ids = Vec::new();
for chunk in &symbol_chunks {
let preview = content
.lines()
.skip((chunk.start_line - 1) as usize)
.take((chunk.end_line - chunk.start_line + 1) as usize)
.collect::<Vec<_>>()
.join("\n");
let ts_doc_text = build_ts_doc(
chunk.symbol_name.as_deref(),
chunk.signature.as_deref(),
chunk.docstring.as_deref(),
&preview,
);
let chunk_content = &preview;
let blob_sha = super::hash::FileHasher::hash_bytes(chunk_content.as_bytes())
.to_hex()
.to_string();
let chunk_record = crate::db::ChunkRecord {
file_id,
blob_sha,
symbol_name: chunk.symbol_name.clone(),
kind: chunk.kind.clone(),
signature: chunk.signature.clone(),
docstring: chunk.docstring.clone(),
start_line: chunk.start_line,
end_line: chunk.end_line,
preview,
ts_doc_text,
recency_score: 1.0, churn_score: 0.0, metadata: chunk.metadata.clone(),
worktree_id: self.worktree_id,
};
let chunk_id = self
.store
.insert_chunk(&chunk_record)
.await
.with_context(|| format!("Failed to insert chunk for file: {}", path.display()))?;
chunk_ids.push(chunk_id);
}
self.edge_updater
.update_edges(file_id)
.await
.with_context(|| format!("Failed to update edges for file: {}", path.display()))?;
debug!(
path = %path.display(),
file_id = file_id,
chunks = chunk_ids.len(),
"Indexed new file"
);
Ok(())
}
async fn update_file(&self, path: &Path, _new_hash: &ContentHash) -> Result<()> {
let metadata = fs::metadata(path)
.with_context(|| format!("Failed to get file metadata: {}", path.display()))?;
if metadata.len() > MAX_FILE_SIZE_BYTES {
warn!(
path = %path.display(),
size_mb = metadata.len() / (1024 * 1024),
limit_mb = MAX_FILE_SIZE_BYTES / (1024 * 1024),
"File too large to index, skipping"
);
return Ok(()); }
let symlink_metadata = fs::symlink_metadata(path)
.with_context(|| format!("Failed to get symlink metadata: {}", path.display()))?;
if symlink_metadata.file_type().is_symlink() {
warn!(
path = %path.display(),
"Indexing symlink - resolved path may be outside repository"
);
}
let content = fs::read_to_string(path)
.with_context(|| format!("Failed to read file: {}", path.display()))?;
let language = detect_language_from_path(path);
let relpath = normalize_to_relpath(path, &self.repo_root)
.with_context(|| format!("Failed to normalize path: {}", path.display()))?;
let relpath_str = relpath
.to_str()
.ok_or_else(|| anyhow::anyhow!("Invalid UTF-8 in path: {}", relpath.display()))?;
let file_id = self
.store
.get_file_id_by_relpath(relpath_str, self.worktree_id)
.await
.with_context(|| format!("Failed to look up file: {}", path.display()))?;
let file_id = match file_id {
Some(id) => id,
None => {
debug!(path = %path.display(), "File not found in DB, treating as new");
return self.index_new_file(path, _new_hash).await;
}
};
let chunks_deleted = self
.store
.delete_chunks_by_file(file_id)
.await
.with_context(|| format!("Failed to delete old chunks for file: {}", path.display()))?;
debug!(path = %path.display(), chunks_deleted = chunks_deleted, "Deleted old chunks");
let lang_str = language.unwrap_or("unknown");
let symbol_chunks = parse_file_chunks(&content, lang_str)
.with_context(|| format!("Failed to parse file: {}", path.display()))?;
let mut chunk_ids = Vec::new();
for chunk in &symbol_chunks {
let preview = content
.lines()
.skip((chunk.start_line - 1) as usize)
.take((chunk.end_line - chunk.start_line + 1) as usize)
.collect::<Vec<_>>()
.join("\n");
let ts_doc_text = build_ts_doc(
chunk.symbol_name.as_deref(),
chunk.signature.as_deref(),
chunk.docstring.as_deref(),
&preview,
);
let chunk_content = &preview;
let blob_sha = super::hash::FileHasher::hash_bytes(chunk_content.as_bytes())
.to_hex()
.to_string();
let chunk_record = crate::db::ChunkRecord {
file_id,
blob_sha,
symbol_name: chunk.symbol_name.clone(),
kind: chunk.kind.clone(),
signature: chunk.signature.clone(),
docstring: chunk.docstring.clone(),
start_line: chunk.start_line,
end_line: chunk.end_line,
preview,
ts_doc_text,
recency_score: 1.0, churn_score: 0.5, metadata: chunk.metadata.clone(),
worktree_id: self.worktree_id,
};
let chunk_id = self
.store
.insert_chunk(&chunk_record)
.await
.with_context(|| format!("Failed to insert chunk for file: {}", path.display()))?;
chunk_ids.push(chunk_id);
}
self.edge_updater
.update_edges(file_id)
.await
.with_context(|| format!("Failed to update edges for file: {}", path.display()))?;
debug!(
path = %path.display(),
file_id = file_id,
chunks = chunk_ids.len(),
"Updated file"
);
Ok(())
}
async fn remove_file(&self, path: &Path) -> Result<()> {
let relpath = normalize_to_relpath(path, &self.repo_root)
.with_context(|| format!("Failed to normalize path: {}", path.display()))?;
let relpath_str = relpath
.to_str()
.ok_or_else(|| anyhow::anyhow!("Invalid UTF-8 in path: {}", relpath.display()))?;
let file_id = self
.store
.get_file_id_by_relpath(relpath_str, self.worktree_id)
.await
.with_context(|| format!("Failed to look up file: {}", path.display()))?;
let file_id = match file_id {
Some(id) => id,
None => {
debug!(path = %path.display(), "File not found in DB, nothing to delete");
return Ok(());
}
};
let chunks_deleted = self
.store
.delete_chunks_by_file(file_id)
.await
.with_context(|| format!("Failed to delete chunks for file: {}", path.display()))?;
self.store
.delete_file(file_id)
.await
.with_context(|| format!("Failed to delete file record: {}", path.display()))?;
debug!(
path = %path.display(),
file_id = file_id,
chunks_deleted = chunks_deleted,
"Removed file"
);
Ok(())
}
}
/// Maps a file extension to the indexer's language tag.
///
/// Matching is ASCII case-insensitive (`main.RS` → `Some("rs")`) so files
/// from case-preserving filesystems are still recognized. Missing or unknown
/// extensions yield `None`.
fn detect_language_from_path(path: &Path) -> Option<&'static str> {
    // `?` short-circuits on no extension or non-UTF-8; lowercase normalizes
    // so the match arms only need the canonical spelling.
    let ext = path.extension()?.to_str()?.to_ascii_lowercase();
    match ext.as_str() {
        "ts" => Some("ts"),
        "tsx" => Some("tsx"),
        "js" => Some("js"),
        "jsx" => Some("jsx"),
        "rs" => Some("rs"),
        "md" => Some("md"),
        "mdx" => Some("mdx"),
        "json" => Some("json"),
        "yaml" | "yml" => Some("yaml"),
        "toml" => Some("toml"),
        _ => None,
    }
}
/// Splits `content` into symbol chunks via the project parser.
///
/// When the parser yields nothing (unsupported language, empty file, or no
/// recognizable symbols), the whole file is wrapped in a single synthetic
/// "module" chunk so every indexed file has at least one chunk.
fn parse_file_chunks(content: &str, language: &str) -> Result<Vec<SymbolChunk>> {
    use crate::indexer::parser;
    let chunks = parser::extract_chunks(content, language);
    if chunks.is_empty() {
        Ok(vec![SymbolChunk {
            symbol_name: None,
            kind: "module".to_string(),
            signature: None,
            docstring: None,
            start_line: 1,
            // Clamp to 1 so an empty file does not yield the inverted range
            // start_line=1, end_line=0.
            end_line: (content.lines().count() as i32).max(1),
            metadata: None,
        }])
    } else {
        Ok(chunks)
    }
}
/// Builds the searchable text-search document for a chunk.
///
/// Joins whichever of `symbol_name`, `signature`, and `docstring` are
/// present, followed always by `preview`, separated by `" \n "`.
fn build_ts_doc(
    symbol_name: Option<&str>,
    signature: Option<&str>,
    docstring: Option<&str>,
    preview: &str,
) -> String {
    // Flatten drops the absent optional fields; the preview is always last.
    let doc_parts: Vec<String> = [symbol_name, signature, docstring]
        .into_iter()
        .flatten()
        .map(str::to_owned)
        .chain(std::iter::once(preview.to_owned()))
        .collect();
    doc_parts.join(" \n ")
}
#[cfg(test)]
mod tests {
use super::*;
// Known extensions map to their language tag; unknown extensions give None.
#[test]
fn test_detect_language_from_path() {
assert_eq!(
detect_language_from_path(Path::new("src/main.rs")),
Some("rs")
);
assert_eq!(
detect_language_from_path(Path::new("src/lib.ts")),
Some("ts")
);
assert_eq!(
detect_language_from_path(Path::new("README.md")),
Some("md")
);
assert_eq!(
detect_language_from_path(Path::new("config.yaml")),
Some("yaml")
);
assert_eq!(detect_language_from_path(Path::new("unknown.xyz")), None);
}
// All four parts present: each must appear in the joined document.
#[test]
fn test_build_ts_doc() {
let doc = build_ts_doc(
Some("myFunction"),
Some("fn myFunction(x: i32) -> i32"),
Some("Does something cool"),
"let x = 42;",
);
assert!(doc.contains("myFunction"));
assert!(doc.contains("fn myFunction"));
assert!(doc.contains("Does something cool"));
assert!(doc.contains("let x = 42;"));
}
// With no optional parts, the document is exactly the preview (no separators).
#[test]
fn test_build_ts_doc_minimal() {
let doc = build_ts_doc(None, None, None, "some code");
assert_eq!(doc, "some code");
}
// Empty/unparseable content falls back to a single synthetic "module" chunk.
#[test]
fn test_parse_file_chunks_creates_module_for_empty() {
let chunks = parse_file_chunks("", "unknown").unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].kind, "module");
}
}