use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use ignore::gitignore::{Gitignore, GitignoreBuilder};
use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
use tokio::sync::mpsc;
use crate::error::Result;
use crate::indexer::CodeIndexer;
use crate::languages::is_indexable;
/// Builds an ignore matcher for `root` from `<root>/.gitignore`.
///
/// This is best-effort: any problem reported while adding the file is
/// discarded, and if the builder itself fails an empty matcher (one that
/// ignores nothing) is returned instead.
fn build_gitignore(root: &Path) -> Gitignore {
    let rules_file = root.join(".gitignore");
    let mut rules = GitignoreBuilder::new(root);
    // The outcome of `add` is intentionally dropped; whatever rules were
    // accepted (possibly none) are used.
    let _ = rules.add(rules_file);
    match rules.build() {
        Ok(matcher) => matcher,
        Err(_) => Gitignore::empty(),
    }
}
/// Reports whether `path` is excluded by `gitignore`.
///
/// `path` is first made relative to `root`; a path that does not live under
/// `root` is never reported as ignored. The relative path is then matched
/// against the rules together with its parent directories, with the leaf
/// itself treated as a file (`is_dir = false`).
fn is_gitignored(gitignore: &Gitignore, root: &Path, path: &Path) -> bool {
    match path.strip_prefix(root) {
        Ok(inside_root) => gitignore
            .matched_path_or_any_parents(inside_root, false)
            .is_ignore(),
        // Not under `root`: the rules do not apply.
        Err(_) => false,
    }
}
/// Handle to the background file-watching task created by [`IndexWatcher::start`].
///
/// NOTE(review): per Tokio's documentation, dropping a `JoinHandle` detaches
/// its task instead of cancelling it, and no `Drop` impl is visible in this
/// file — so dropping an `IndexWatcher` presumably leaves the watcher task
/// running. Confirm this is the intended lifetime.
pub struct IndexWatcher {
// Join handle of the spawned Tokio task; it is stored but never read.
_handle: tokio::task::JoinHandle<()>,
}
impl IndexWatcher {
/// Starts watching `root` recursively and spawns a Tokio task that re-indexes
/// changed files.
///
/// File-system events are debounced twice: first by the `notify` debouncer
/// (1 second), then by a batching loop in the spawned task that waits for
/// `DEBOUNCE` of quiet time after the latest event, but never longer than
/// `MAX_DEBOUNCE` after the first event of the batch.
///
/// # Parameters
///
/// * `root` - directory to watch; also the base for `.gitignore` matching and
///   the root passed to `CodeIndexer::reindex_file`.
/// * `indexer` - indexer whose `reindex_file` is called for every changed path.
/// * `status_tx` - optional channel that receives `"Re-indexing {name}..."`
///   before each file and an empty string after it.
///
/// # Errors
///
/// Returns an error if the debouncer cannot be created or if `root` cannot be
/// registered with the underlying watcher.
///
/// # Notes
///
/// Calls `tokio::spawn`, so it has to be invoked from within a Tokio runtime
/// context.
pub fn start(
root: &Path,
indexer: Arc<CodeIndexer>,
status_tx: Option<tokio::sync::mpsc::UnboundedSender<String>>,
) -> Result<Self> {
// Quiet period that must elapse after the most recent event before a batch fires.
const DEBOUNCE: Duration = Duration::from_millis(500);
// Upper bound on how long a batch may be delayed, counted from its first event.
const MAX_DEBOUNCE: Duration = Duration::from_secs(5);
// Bounded channel (capacity 64) carrying changed paths from the debouncer
// callback to the async task below.
let (notify_tx, mut notify_rx) = mpsc::channel::<PathBuf>(64);
let mut debouncer = new_debouncer(
Duration::from_secs(1),
move |events: std::result::Result<
Vec<notify_debouncer_mini::DebouncedEvent>,
notify::Error,
>| {
let events = match events {
Ok(events) => events,
Err(e) => {
// Watcher errors are logged and the callback simply returns.
tracing::warn!("index watcher error: {e}");
return;
}
};
// Keep only `Any`-kind events on indexable paths; the set removes duplicates
// within this callback invocation.
let paths: HashSet<PathBuf> = events
.into_iter()
.filter(|e| e.kind == DebouncedEventKind::Any && is_indexable(&e.path))
.map(|e| e.path)
.collect();
for path in paths {
// `blocking_send` waits while the channel is full; a send error (receiver
// gone) is ignored. Presumably this callback runs on the debouncer's own
// thread rather than on the async runtime — confirm.
let _ = notify_tx.blocking_send(path);
}
},
)?;
debouncer
.watcher()
.watch(root, notify::RecursiveMode::Recursive)?;
let root = root.to_path_buf();
// The ignore rules are built once here; this code does not rebuild them if
// `.gitignore` changes later.
let gitignore = build_gitignore(&root);
let handle = tokio::spawn(async move {
// Move the debouncer into the task so it lives as long as the task does.
let _debouncer = debouncer;
// Paths waiting to be re-indexed in the current batch.
let mut pending: HashSet<PathBuf> = HashSet::new();
// Placeholder value: the timer branch is disabled while `pending` is empty,
// and `deadline` is recomputed whenever a path is added.
let mut deadline = tokio::time::Instant::now() + DEBOUNCE;
// Time of the first event in the current batch, `None` between batches.
let mut batch_start: Option<tokio::time::Instant> = None;
loop {
tokio::select! {
msg = notify_rx.recv() => {
// `None` means the sending side is gone: stop the task.
let Some(path) = msg else { break };
if is_gitignored(&gitignore, &root, &path) {
tracing::trace!(path = %path.display(), "skipping gitignored path");
continue;
}
let now = tokio::time::Instant::now();
let start = *batch_start.get_or_insert(now);
pending.insert(path);
// Fire `DEBOUNCE` after this event, but no later than `MAX_DEBOUNCE`
// after the batch started.
deadline = (start + MAX_DEBOUNCE).min(now + DEBOUNCE);
}
// The `if` guard disables the timer while there is nothing to flush.
() = tokio::time::sleep_until(deadline), if !pending.is_empty() => {
let paths: Vec<PathBuf> = pending.drain().collect();
// Reset so the next event opens a new batch.
batch_start = None;
tracing::trace!("debounce fired, reindexing {} paths", paths.len());
for path in paths {
if let Some(ref tx) = status_tx {
// Use the file name for the status text, falling back to the full
// path when there is no file-name component.
let name = path.file_name().map_or_else(
|| path.display().to_string(),
|n| n.to_string_lossy().into_owned(),
);
let _ = tx.send(format!("Re-indexing {name}..."));
}
// A failed re-index is logged and does not stop the remaining paths.
if let Err(e) = indexer.reindex_file(&root, &path).await {
tracing::warn!(path = %path.display(), "reindex failed: {e:#}");
}
if let Some(ref tx) = status_tx {
// Empty string sent after each file — presumably clears the status
// display; confirm with the consumer of `status_tx`.
let _ = tx.send(String::new());
}
}
}
}
}
});
Ok(Self { _handle: handle })
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use zeph_llm::any::AnyProvider;
    use zeph_llm::ollama::OllamaProvider;
    use zeph_memory::QdrantOps;

    /// Opens an in-memory SQLite pool for the test indexer.
    async fn create_test_pool() -> zeph_db::DbPool {
        let connected = zeph_db::sqlx::SqlitePool::connect("sqlite::memory:").await;
        connected.unwrap()
    }

    /// Assembles a `CodeIndexer` from a Qdrant-backed store and an Ollama
    /// provider with the default indexer configuration.
    ///
    /// The endpoints are placeholders; presumably nothing connects to them at
    /// construction time (the start-up tests only build the indexer).
    async fn create_test_indexer() -> Arc<CodeIndexer> {
        let ops = QdrantOps::new("http://localhost:6334").unwrap();
        let pool = create_test_pool().await;
        let store = crate::store::CodeStore::with_ops(ops, pool);
        let ollama = OllamaProvider::new("http://127.0.0.1:1", "test".into(), "embed".into());
        let provider = Arc::new(AnyProvider::Ollama(ollama));
        let config = crate::indexer::IndexerConfig::default();
        Arc::new(CodeIndexer::new(store, provider, config))
    }

    /// Watching an existing directory succeeds.
    #[tokio::test]
    async fn start_with_valid_directory() {
        let dir = tempfile::tempdir().unwrap();
        let indexer = create_test_indexer().await;
        let started = IndexWatcher::start(dir.path(), indexer, None);
        assert!(started.is_ok());
    }

    /// Watching a directory that does not exist is reported as an error.
    #[tokio::test]
    async fn start_with_nonexistent_directory_fails() {
        let missing = Path::new("/nonexistent/path/xyz");
        let indexer = create_test_indexer().await;
        let started = IndexWatcher::start(missing, indexer, None);
        assert!(started.is_err());
    }

    /// Directory rules in `.gitignore` exclude everything beneath them and
    /// leave unrelated paths alone.
    #[test]
    fn gitignore_filters_target_directory() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        std::fs::write(root.join(".gitignore"), "target/\n.local/\n").unwrap();
        let gitignore = build_gitignore(root);
        let ignored = |rel: &str| is_gitignored(&gitignore, root, &root.join(rel));

        assert!(ignored("target/debug/build"));
        assert!(ignored(".local/testing/debug/dump.json"));
        assert!(!ignored("src/main.rs"));
        assert!(!ignored("crates/zeph-core/src/lib.rs"));
    }

    /// Without a `.gitignore` file nothing is ignored.
    #[test]
    fn gitignore_passes_all_when_no_gitignore_file() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let gitignore = build_gitignore(root);
        let ignored = |rel: &str| is_gitignored(&gitignore, root, &root.join(rel));

        assert!(!ignored("src/lib.rs"));
        assert!(!ignored("target/debug/bin"));
    }

    /// A path outside the watched root is never reported as ignored, even if
    /// it contains a component that matches a rule.
    #[test]
    fn gitignore_ignores_path_outside_root() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        std::fs::write(root.join(".gitignore"), "target/\n").unwrap();
        let gitignore = build_gitignore(root);
        let outside = Path::new("/tmp/other/target/foo");

        assert!(!is_gitignored(&gitignore, root, outside));
    }
}