1use std::collections::HashSet;
30use std::path::{Path, PathBuf};
31use std::sync::Arc;
32use std::time::Duration;
33
34use ignore::gitignore::{Gitignore, GitignoreBuilder};
35use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
36use tokio::sync::mpsc;
37
38use crate::error::Result;
39use crate::indexer::CodeIndexer;
40use crate::languages::is_indexable;
41
42fn build_gitignore(root: &Path) -> Gitignore {
45 let mut builder = GitignoreBuilder::new(root);
46 let _ = builder.add(root.join(".gitignore"));
47 builder.build().unwrap_or_else(|_| Gitignore::empty())
48}
49
50fn is_gitignored(gitignore: &Gitignore, root: &Path, path: &Path) -> bool {
53 let Ok(relative) = path.strip_prefix(root) else {
55 return false;
57 };
58 gitignore
59 .matched_path_or_any_parents(relative, false)
60 .is_ignore()
61}
62
63pub struct IndexWatcher {
84 _handle: tokio::task::JoinHandle<()>,
85}
86
87impl IndexWatcher {
88 pub fn start(
98 root: &Path,
99 indexer: Arc<CodeIndexer>,
100 status_tx: Option<tokio::sync::mpsc::UnboundedSender<String>>,
101 ) -> Result<Self> {
102 const DEBOUNCE: Duration = Duration::from_millis(500);
103 const MAX_DEBOUNCE: Duration = Duration::from_secs(5);
106
107 let (notify_tx, mut notify_rx) = mpsc::channel::<PathBuf>(64);
108
109 let mut debouncer = new_debouncer(
110 Duration::from_secs(1),
111 move |events: std::result::Result<
112 Vec<notify_debouncer_mini::DebouncedEvent>,
113 notify::Error,
114 >| {
115 let events = match events {
116 Ok(events) => events,
117 Err(e) => {
118 tracing::warn!("index watcher error: {e}");
119 return;
120 }
121 };
122
123 let paths: HashSet<PathBuf> = events
124 .into_iter()
125 .filter(|e| e.kind == DebouncedEventKind::Any && is_indexable(&e.path))
126 .map(|e| e.path)
127 .collect();
128
129 for path in paths {
130 let _ = notify_tx.blocking_send(path);
131 }
132 },
133 )?;
134
135 debouncer
136 .watcher()
137 .watch(root, notify::RecursiveMode::Recursive)?;
138
139 let root = root.to_path_buf();
140 let gitignore = build_gitignore(&root);
141
142 let handle = tokio::spawn(async move {
143 let _debouncer = debouncer;
144 let mut pending: HashSet<PathBuf> = HashSet::new();
145 let mut deadline = tokio::time::Instant::now() + DEBOUNCE;
146 let mut batch_start: Option<tokio::time::Instant> = None;
147
148 loop {
149 tokio::select! {
150 msg = notify_rx.recv() => {
151 let Some(path) = msg else { break };
152 if is_gitignored(&gitignore, &root, &path) {
153 tracing::trace!(path = %path.display(), "skipping gitignored path");
154 continue;
155 }
156 let now = tokio::time::Instant::now();
157 let start = *batch_start.get_or_insert(now);
158 pending.insert(path);
159 deadline = (start + MAX_DEBOUNCE).min(now + DEBOUNCE);
161 }
162 () = tokio::time::sleep_until(deadline), if !pending.is_empty() => {
163 let paths: Vec<PathBuf> = pending.drain().collect();
164 batch_start = None;
165 tracing::trace!("debounce fired, reindexing {} paths", paths.len());
166 for path in paths {
167 if let Some(ref tx) = status_tx {
168 let name = path.file_name().map_or_else(
169 || path.display().to_string(),
170 |n| n.to_string_lossy().into_owned(),
171 );
172 let _ = tx.send(format!("Re-indexing {name}..."));
173 }
174 if let Err(e) = indexer.reindex_file(&root, &path).await {
175 tracing::warn!(path = %path.display(), "reindex failed: {e:#}");
176 }
177 if let Some(ref tx) = status_tx {
178 let _ = tx.send(String::new());
179 }
180 }
181 }
182 }
183 }
184 });
185
186 Ok(Self { _handle: handle })
187 }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    use zeph_llm::any::AnyProvider;
    use zeph_llm::ollama::OllamaProvider;
    use zeph_memory::QdrantOps;

    /// Opens an in-memory SQLite pool for the test store.
    async fn create_test_pool() -> zeph_db::DbPool {
        let connecting = zeph_db::sqlx::SqlitePool::connect("sqlite::memory:");
        connecting.await.unwrap()
    }

    /// Assembles a `CodeIndexer` from a Qdrant client, an in-memory pool and an
    /// Ollama provider using fixed local addresses.
    // NOTE(review): presumably none of these endpoints is contacted during
    // construction — confirm against the client constructors.
    async fn create_test_indexer() -> Arc<CodeIndexer> {
        let qdrant = QdrantOps::new("http://localhost:6334", None).unwrap();
        let pool = create_test_pool().await;
        let store = crate::store::CodeStore::with_ops(qdrant, pool);

        let ollama = OllamaProvider::new("http://127.0.0.1:1", "test".into(), "embed".into());
        let provider = Arc::new(AnyProvider::Ollama(ollama));
        let config = crate::indexer::IndexerConfig::default();

        Arc::new(CodeIndexer::new(store, provider, config))
    }

    #[tokio::test]
    async fn start_with_valid_directory() {
        let dir = tempfile::tempdir().unwrap();
        let indexer = create_test_indexer().await;

        let outcome = IndexWatcher::start(dir.path(), indexer, None);

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn start_with_nonexistent_directory_fails() {
        let missing = Path::new("/nonexistent/path/xyz");
        let indexer = create_test_indexer().await;

        let outcome = IndexWatcher::start(missing, indexer, None);

        assert!(outcome.is_err());
    }

    #[test]
    fn gitignore_filters_target_directory() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        std::fs::write(root.join(".gitignore"), "target/\n.local/\n").unwrap();

        let gitignore = build_gitignore(root);

        // Anything below an ignored directory must be filtered out.
        for ignored in ["target/debug/build", ".local/testing/debug/dump.json"] {
            assert!(
                is_gitignored(&gitignore, root, &root.join(ignored)),
                "expected {ignored} to be ignored"
            );
        }

        // Regular source files must pass through.
        for kept in ["src/main.rs", "crates/zeph-core/src/lib.rs"] {
            assert!(
                !is_gitignored(&gitignore, root, &root.join(kept)),
                "expected {kept} to be kept"
            );
        }
    }

    #[test]
    fn gitignore_passes_all_when_no_gitignore_file() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();

        let gitignore = build_gitignore(root);

        // Without a `.gitignore` nothing is ignored, not even `target/`.
        for kept in ["src/lib.rs", "target/debug/bin"] {
            assert!(
                !is_gitignored(&gitignore, root, &root.join(kept)),
                "expected {kept} to be kept"
            );
        }
    }

    #[test]
    fn gitignore_ignores_path_outside_root() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        std::fs::write(root.join(".gitignore"), "target/\n").unwrap();

        let gitignore = build_gitignore(root);
        let outside = Path::new("/tmp/other/target/foo");

        // A path that is not under `root` is never reported as ignored.
        assert!(!is_gitignored(&gitignore, root, outside));
    }
}