Skip to main content

zeph_index/
watcher.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! File-system watcher for incremental re-indexing on save.
5//!
6//! [`IndexWatcher`] wraps `notify-debouncer-mini` and feeds file change events
7//! through an async channel into a background Tokio task that calls
8//! [`crate::indexer::CodeIndexer::reindex_file`].
9//!
10//! ## Debouncing
11//!
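//! Events pass through two debounce stages. The `notify-debouncer-mini` layer
//! coalesces OS-level inotify/kqueue/FSEvents bursts with a 1-second window.
//! A second 500 ms Tokio-side debounce batches any remaining rapid events into a
//! single reindex pass, further reducing redundant work on bursty saves. Each new
//! event pushes that deadline back, but never more than 5 seconds past the first
//! event of the batch, so sustained writes cannot postpone reindexing indefinitely.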
16//!
17//! ## Gitignore filtering
18//!
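//! The watcher reads the `.gitignore` in the project root once, at startup, and skips
//! files matched by it (or lying under a matched directory). Nested `.gitignore` files
//! are not consulted, and later edits to `.gitignore` only take effect after a restart.
//! This prevents spurious reindex calls for, e.g., build artifacts in `target/` or
//! temporary files in `.local/` when those directories are listed in `.gitignore`.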
22//!
23//! ## TUI integration
24//!
25//! When `status_tx` is supplied, the watcher sends a short `"Re-indexing <file>..."`
26//! message before each reindex and an empty string when it completes, so the TUI
27//! status bar shows live feedback.
28
29use std::collections::HashSet;
30use std::path::{Path, PathBuf};
31use std::sync::Arc;
32use std::time::Duration;
33
34use ignore::gitignore::{Gitignore, GitignoreBuilder};
35use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
36use tokio::sync::mpsc;
37
38use crate::error::Result;
39use crate::indexer::CodeIndexer;
40use crate::languages::is_indexable;
41
42/// Build a gitignore matcher for `root` by loading `.gitignore` from the root directory.
43/// Returns an empty (pass-all) matcher on any error so the watcher degrades gracefully.
44fn build_gitignore(root: &Path) -> Gitignore {
45    let mut builder = GitignoreBuilder::new(root);
46    let _ = builder.add(root.join(".gitignore"));
47    builder.build().unwrap_or_else(|_| Gitignore::empty())
48}
49
50/// Returns `true` if `path` should be skipped because it (or one of its ancestors)
51/// is matched by the project's `.gitignore`.
52fn is_gitignored(gitignore: &Gitignore, root: &Path, path: &Path) -> bool {
53    // matched_path_or_any_parents requires a path relative to the gitignore root.
54    let Ok(relative) = path.strip_prefix(root) else {
55        // Path outside root — let is_indexable decide.
56        return false;
57    };
58    gitignore
59        .matched_path_or_any_parents(relative, false)
60        .is_ignore()
61}
62
63/// A running file-system watcher that triggers incremental re-indexing on file saves.
64///
65/// Created by [`IndexWatcher::start`]. Dropping the `IndexWatcher` aborts the
66/// background Tokio task and the underlying `notify` watcher, stopping all
67/// file-system monitoring.
68///
69/// # Examples
70///
71/// ```no_run
72/// use std::sync::Arc;
73/// use std::path::Path;
74/// use zeph_index::watcher::IndexWatcher;
75/// # async fn example() -> zeph_index::Result<()> {
76/// # let indexer: Arc<zeph_index::indexer::CodeIndexer> = panic!("placeholder");
77///
78/// // Start watching — the returned handle keeps the watcher alive.
79/// let _watcher = IndexWatcher::start(Path::new("."), indexer, None)?;
80/// # Ok(())
81/// # }
82/// ```
83pub struct IndexWatcher {
84    _handle: tokio::task::JoinHandle<()>,
85}
86
87impl IndexWatcher {
88    /// Start the file-system watcher.
89    ///
90    /// When `status_tx` is `Some`, a short status message is sent to it whenever a file
91    /// reindex begins, and an empty string is sent when it completes (clearing the TUI
92    /// status bar). Pass `None` in non-TUI modes where no status indicator is needed.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the filesystem watcher cannot be initialized.
97    pub fn start(
98        root: &Path,
99        indexer: Arc<CodeIndexer>,
100        status_tx: Option<tokio::sync::mpsc::UnboundedSender<String>>,
101    ) -> Result<Self> {
102        const DEBOUNCE: Duration = Duration::from_millis(500);
103        // Under sustained FS writes the deadline resets on every event. MAX_DEBOUNCE
104        // caps the total wait so reindexing is never starved indefinitely.
105        const MAX_DEBOUNCE: Duration = Duration::from_secs(5);
106
107        let (notify_tx, mut notify_rx) = mpsc::channel::<PathBuf>(64);
108
109        let mut debouncer = new_debouncer(
110            Duration::from_secs(1),
111            move |events: std::result::Result<
112                Vec<notify_debouncer_mini::DebouncedEvent>,
113                notify::Error,
114            >| {
115                let events = match events {
116                    Ok(events) => events,
117                    Err(e) => {
118                        tracing::warn!("index watcher error: {e}");
119                        return;
120                    }
121                };
122
123                let paths: HashSet<PathBuf> = events
124                    .into_iter()
125                    .filter(|e| e.kind == DebouncedEventKind::Any && is_indexable(&e.path))
126                    .map(|e| e.path)
127                    .collect();
128
129                for path in paths {
130                    let _ = notify_tx.blocking_send(path);
131                }
132            },
133        )?;
134
135        debouncer
136            .watcher()
137            .watch(root, notify::RecursiveMode::Recursive)?;
138
139        let root = root.to_path_buf();
140        let gitignore = build_gitignore(&root);
141
142        let handle = tokio::spawn(async move {
143            let _debouncer = debouncer;
144            let mut pending: HashSet<PathBuf> = HashSet::new();
145            let mut deadline = tokio::time::Instant::now() + DEBOUNCE;
146            let mut batch_start: Option<tokio::time::Instant> = None;
147
148            loop {
149                tokio::select! {
150                    msg = notify_rx.recv() => {
151                        let Some(path) = msg else { break };
152                        if is_gitignored(&gitignore, &root, &path) {
153                            tracing::trace!(path = %path.display(), "skipping gitignored path");
154                            continue;
155                        }
156                        let now = tokio::time::Instant::now();
157                        let start = *batch_start.get_or_insert(now);
158                        pending.insert(path);
159                        // Cap deadline so sustained writes cannot starve reindexing indefinitely.
160                        deadline = (start + MAX_DEBOUNCE).min(now + DEBOUNCE);
161                    }
162                    () = tokio::time::sleep_until(deadline), if !pending.is_empty() => {
163                        let paths: Vec<PathBuf> = pending.drain().collect();
164                        batch_start = None;
165                        tracing::trace!("debounce fired, reindexing {} paths", paths.len());
166                        for path in paths {
167                            if let Some(ref tx) = status_tx {
168                                let name = path.file_name().map_or_else(
169                                    || path.display().to_string(),
170                                    |n| n.to_string_lossy().into_owned(),
171                                );
172                                let _ = tx.send(format!("Re-indexing {name}..."));
173                            }
174                            if let Err(e) = indexer.reindex_file(&root, &path).await {
175                                tracing::warn!(path = %path.display(), "reindex failed: {e:#}");
176                            }
177                            if let Some(ref tx) = status_tx {
178                                let _ = tx.send(String::new());
179                            }
180                        }
181                    }
182                }
183            }
184        });
185
186        Ok(Self { _handle: handle })
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    use zeph_llm::any::AnyProvider;
    use zeph_llm::ollama::OllamaProvider;
    use zeph_memory::QdrantOps;

    /// In-memory SQLite pool; nothing is persisted between tests.
    async fn create_test_pool() -> zeph_db::DbPool {
        let url = "sqlite::memory:";
        zeph_db::sqlx::SqlitePool::connect(url).await.unwrap()
    }

    /// Indexer wired to placeholder Qdrant/Ollama endpoints.
    ///
    /// These tests never write a file into the watched directory, so presumably no reindex
    /// (and no network call) is triggered — NOTE(review): assumes `QdrantOps::new` does not
    /// connect eagerly; confirm.
    async fn create_test_indexer() -> Arc<CodeIndexer> {
        let qdrant = QdrantOps::new("http://localhost:6334", None).unwrap();
        let pool = create_test_pool().await;
        let store = crate::store::CodeStore::with_ops(qdrant, pool);
        let embedder = OllamaProvider::new("http://127.0.0.1:1", "test".into(), "embed".into());
        let config = crate::indexer::IndexerConfig::default();
        Arc::new(CodeIndexer::new(
            store,
            Arc::new(AnyProvider::Ollama(embedder)),
            config,
        ))
    }

    #[tokio::test]
    async fn start_with_valid_directory() {
        let dir = tempfile::tempdir().unwrap();
        let indexer = create_test_indexer().await;
        assert!(IndexWatcher::start(dir.path(), indexer, None).is_ok());
    }

    #[tokio::test]
    async fn start_with_nonexistent_directory_fails() {
        let missing = Path::new("/nonexistent/path/xyz");
        let indexer = create_test_indexer().await;
        assert!(IndexWatcher::start(missing, indexer, None).is_err());
    }

    #[test]
    fn gitignore_filters_target_directory() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        std::fs::write(root.join(".gitignore"), "target/\n.local/\n").unwrap();
        let gitignore = build_gitignore(root);

        // Anything beneath an ignored directory is filtered...
        for ignored in ["target/debug/build", ".local/testing/debug/dump.json"] {
            assert!(is_gitignored(&gitignore, root, &root.join(ignored)));
        }
        // ...while ordinary source files pass through.
        for tracked in ["src/main.rs", "crates/zeph-core/src/lib.rs"] {
            assert!(!is_gitignored(&gitignore, root, &root.join(tracked)));
        }
    }

    #[test]
    fn gitignore_passes_all_when_no_gitignore_file() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        // Deliberately no .gitignore: the matcher must let everything through.
        let gitignore = build_gitignore(root);
        for candidate in ["src/lib.rs", "target/debug/bin"] {
            assert!(!is_gitignored(&gitignore, root, &root.join(candidate)));
        }
    }

    #[test]
    fn gitignore_ignores_path_outside_root() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        std::fs::write(root.join(".gitignore"), "target/\n").unwrap();
        let gitignore = build_gitignore(root);
        // `strip_prefix` fails for a path outside `root`, so it is never reported as
        // ignored even though it contains a `target` component.
        let outside = Path::new("/tmp/other/target/foo");
        assert!(!is_gitignored(&gitignore, root, outside));
    }
}