use anyhow::Result;
use canon_embed::EmbeddingEngine;
use canon_store::GraphStore;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::{error, info, warn};
pub fn start_watcher(
watch_dir: PathBuf,
graph: Arc<Mutex<GraphStore>>,
embedder: Arc<EmbeddingEngine>,
data_dir: PathBuf,
) -> Result<tokio::task::JoinHandle<()>> {
let (tx, rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
let mut watcher = RecommendedWatcher::new(
move |res| {
let _ = tx.send(res);
},
Config::default(),
)?;
watcher.watch(&watch_dir, RecursiveMode::Recursive)?;
info!("File watcher started for: {:?}", watch_dir);
let handle = tokio::task::spawn_blocking(move || {
let _watcher = watcher;
for event in rx {
match event {
Ok(event) => {
handle_event(&event, &graph, &embedder, &data_dir, &watch_dir);
}
Err(e) => {
warn!("File watcher error: {}", e);
}
}
}
});
Ok(handle)
}
/// Applies one file-system event to the index.
///
/// Every path carried by the event is filtered through `should_skip` and
/// then handled according to the event kind:
///
/// * `Create` / `Modify` on a regular file: the file is (re)indexed.
/// * `Create` / `Modify` on a path that no longer exists: the path is removed
///   from the index. `notify` reports renames as `Modify` events, so the old
///   name of a renamed file arrives here rather than as a `Remove`; without
///   this branch its document would stay in the index.
/// * `Remove`: the path is removed from the index.
/// * Any other kind is ignored.
///
/// Failures are logged and never propagated, so one bad path cannot stop the
/// watcher loop.
///
/// NOTE(review): a renamed or removed *directory* is only handled for the
/// directory path itself; documents stored under it are not touched here.
fn handle_event(
    event: &Event,
    graph: &Arc<Mutex<GraphStore>>,
    embedder: &Arc<EmbeddingEngine>,
    data_dir: &Path,
    watch_dir: &Path,
) {
    for path in &event.paths {
        if should_skip(path, data_dir, watch_dir) {
            continue;
        }

        match event.kind {
            EventKind::Create(_) | EventKind::Modify(_) => {
                if path.is_file() {
                    if let Err(e) = index_file(path, graph, embedder) {
                        error!("Failed to index {}: {}", path.display(), e);
                    }
                } else if is_missing(path) {
                    // The path was renamed away or deleted before we got to
                    // it; drop whatever is indexed under the old name.
                    if let Err(e) = remove_file(path, graph) {
                        error!("Failed to remove {}: {}", path.display(), e);
                    }
                }
            }
            EventKind::Remove(_) => {
                if let Err(e) = remove_file(path, graph) {
                    error!("Failed to remove {}: {}", path.display(), e);
                }
            }
            _ => {}
        }
    }

    /// Returns `true` only when the path is definitely absent. Other I/O
    /// errors (e.g. permission denied) are not treated as "missing", so a
    /// transient failure never evicts a document from the index.
    fn is_missing(path: &Path) -> bool {
        matches!(
            std::fs::metadata(path),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound
        )
    }
}
/// Decides whether the watcher should ignore `path`.
///
/// A path is skipped when any of the following holds:
///
/// * it lies inside `data_dir`;
/// * below `watch_dir`, any component (directory or file name) starts with a
///   dot - this covers `.git`, `.DS_Store` and other dotfiles;
/// * below `watch_dir`, it lies inside a `node_modules`, `target` or
///   `__pycache__` directory;
/// * its extension (compared case-insensitively) is one of the listed
///   image, font, archive, binary, office, media, database or lock-file
///   extensions.
///
/// Only the part of the path below `watch_dir` is inspected for hidden and
/// ignored directories, so a watch root that itself lives under e.g.
/// `~/.notes` or `/build/target` is still indexed. Paths that are not under
/// `watch_dir` are inspected in full. Matching is done per path component,
/// which makes it independent of the platform's path separator.
fn should_skip(path: &Path, data_dir: &Path, watch_dir: &Path) -> bool {
    use std::path::Component;

    /// Directory names whose contents are never indexed.
    const IGNORED_DIRS: [&str; 3] = ["node_modules", "target", "__pycache__"];

    if path.starts_with(data_dir) {
        return true;
    }

    // Ancestors of the watch root say nothing about the files inside it.
    let relative = path.strip_prefix(watch_dir).unwrap_or(path);

    // `Component::Normal` never yields "." or "..", so a leading dot always
    // means a hidden file or directory.
    let has_hidden_component = relative.components().any(|component| {
        matches!(component, Component::Normal(name) if name.to_string_lossy().starts_with('.'))
    });
    if has_hidden_component {
        return true;
    }

    // Only directory components count here: a plain file that happens to be
    // called `target` is still indexed.
    let inside_ignored_dir = relative
        .parent()
        .into_iter()
        .flat_map(|dir| dir.components())
        .any(|component| {
            matches!(
                component,
                Component::Normal(name) if IGNORED_DIRS.iter().any(|&dir| name == dir)
            )
        });
    if inside_ignored_dir {
        return true;
    }

    if let Some(ext) = path.extension() {
        let ext = ext.to_string_lossy().to_lowercase();
        if matches!(
            ext.as_str(),
            "png" | "jpg" | "jpeg" | "gif" | "bmp" | "ico" | "svg"
                | "woff" | "woff2" | "ttf" | "eot"
                | "zip" | "tar" | "gz" | "bz2" | "xz" | "7z"
                | "exe" | "dll" | "so" | "dylib"
                | "pdf" | "doc" | "docx" | "xls" | "xlsx"
                | "mp3" | "mp4" | "avi" | "mov" | "wav"
                | "db" | "sqlite" | "usearch"
                | "lock"
        ) {
            return true;
        }
    }

    false
}
/// Indexes (or re-indexes) a single file into the graph store.
///
/// Steps:
/// 1. Read the file via `crate::parser::parse_file`, falling back to a plain
///    UTF-8 read when the parser fails. Blank content is ignored.
/// 2. Under a short lock, return early when the stored document for this
///    path already has the same hash.
/// 3. Chunk and embed the content *without* holding the lock, so other users
///    of the graph are not blocked while the embedder runs.
/// 4. Under the lock, replace any stored document for the path and insert
///    the new document, its chunks, embeddings and connecting edges.
///
/// Nothing in the store is modified until chunking and embedding have both
/// succeeded. This keeps the previously indexed version intact on failure
/// and avoids leaving a chunk-less document whose matching hash would make
/// every later attempt return early.
///
/// # Errors
/// Returns an error if the file cannot be read, chunking or embedding fails,
/// the embedder returns a different number of vectors than chunks, the graph
/// mutex is poisoned, or a store write fails.
///
/// NOTE(review): the writes in step 4 are individual store calls; a failure
/// midway can still leave a partially written document. Wrapping them in a
/// store-level transaction would need an API not visible from this file.
fn index_file(
    file_path: &Path,
    graph: &Arc<Mutex<GraphStore>>,
    embedder: &Arc<EmbeddingEngine>,
) -> Result<()> {
    let content = match crate::parser::parse_file(file_path) {
        Ok(text) => text,
        Err(_) => std::fs::read_to_string(file_path)?,
    };
    if content.trim().is_empty() {
        return Ok(());
    }

    // Modification time in whole seconds since the Unix epoch; 0 when unknown.
    let mtime = std::fs::metadata(file_path)
        .ok()
        .and_then(|m| m.modified().ok())
        .map(|t| {
            t.duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs() as i64
        })
        .unwrap_or(0);

    let mut doc = canon_core::Document::new(file_path.to_path_buf(), content.as_bytes(), mtime);

    // Cheap pre-check. The guard is a temporary and is released at the end of
    // this statement, before any expensive work starts.
    let stored = graph
        .lock()
        .map_err(|_| anyhow::anyhow!("Graph store mutex poisoned"))?
        .get_document_by_path(file_path);
    if let Ok(Some(existing)) = stored {
        if existing.hash == doc.hash {
            return Ok(());
        }
    }

    // Expensive, store-independent work happens outside the lock.
    let chunker = crate::parser::Chunker::new(crate::parser::ChunkConfig::default());
    let chunks = chunker
        .chunk(doc.id, &content)
        .map_err(|e| anyhow::anyhow!("Chunking failed: {}", e))?;

    let chunk_hashes: Vec<[u8; 32]> = chunks.iter().map(|c| c.text_hash).collect();
    let hier_hash = canon_core::Document::compute_hierarchical_hash(&chunk_hashes);
    doc.set_hierarchical_hash(hier_hash);

    let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
    let vectors = embedder
        .embed_batch(&texts)
        .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
    // `zip` below would silently drop chunks on a short result.
    if vectors.len() != chunks.len() {
        return Err(anyhow::anyhow!(
            "Embedding failed: expected {} vectors, got {}",
            chunks.len(),
            vectors.len()
        ));
    }

    let mut store = graph
        .lock()
        .map_err(|_| anyhow::anyhow!("Graph store mutex poisoned"))?;

    // Look the path up again: the store may have changed while unlocked.
    if let Ok(Some(existing)) = store.get_document_by_path(file_path) {
        if existing.hash == doc.hash {
            return Ok(());
        }
        store.delete_document(existing.id)?;
    }

    store.insert_document(&doc)?;
    for (chunk, vector) in chunks.iter().zip(vectors.iter()) {
        store.insert_chunk(chunk)?;
        let emb = canon_core::Embedding::new(chunk.id, vector, embedder.model_hash(), 0);
        store.insert_embedding(&emb)?;
        store.add_edge(&canon_core::Edge::doc_to_chunk(doc.id, chunk.id))?;
        store.add_edge(&canon_core::Edge::chunk_to_embedding(chunk.id, emb.id))?;
    }

    info!("Watcher indexed: {} ({} chunks)", file_path.display(), chunks.len());
    Ok(())
}
fn remove_file(file_path: &Path, graph: &Arc<Mutex<GraphStore>>) -> Result<()> {
let mut graph = graph.lock().unwrap();
if let Ok(Some(doc)) = graph.get_document_by_path(file_path) {
graph.delete_document(doc.id)?;
info!("Watcher removed: {}", file_path.display());
}
Ok(())
}