Skip to main content

codelens_engine/
watcher.rs

1use crate::import_graph::GraphCache;
2use crate::symbols::SymbolIndex;
3use crate::vfs;
4use anyhow::Result;
5use notify::RecommendedWatcher;
6use notify_debouncer_mini::{DebouncedEventKind, Debouncer, new_debouncer};
7use std::path::{Path, PathBuf};
8use std::sync::{
9    Arc,
10    atomic::{AtomicBool, AtomicU64, Ordering},
11};
12use std::time::Duration;
13use tracing::{debug, warn};
14
/// File watcher that automatically re-indexes changed files.
///
/// Created by `FileWatcher::start`. The counters are shared with the debouncer
/// callback and can be read as a snapshot through `FileWatcher::stats`.
pub struct FileWatcher {
    /// Owns the debounced watcher. Never read; held so it lives as long as `self`.
    _debouncer: Debouncer<RecommendedWatcher>,
    /// Set to `false` by `stop()`; the callback returns immediately once it is cleared.
    running: Arc<AtomicBool>,
    /// Total number of debounced events delivered to the callback, counted
    /// before any filtering or normalization.
    events_processed: Arc<AtomicU64>,
    /// Running sum of the counts reported by successful index/remove calls.
    files_reindexed: Arc<AtomicU64>,
    /// Number of batches whose changed files were skipped because indexing
    /// kept failing with lock contention.
    lock_contention_batches: Arc<AtomicU64>,
}
23
/// Point-in-time snapshot of a `FileWatcher`'s counters.
#[derive(Debug, Clone, serde::Serialize)]
pub struct WatcherStats {
    /// Whether the watcher callback is still processing events.
    pub running: bool,
    /// Total debounced events delivered to the callback so far.
    pub events_processed: u64,
    /// Total files reported as re-indexed or removed so far.
    pub files_reindexed: u64,
    /// Batches skipped because of persistent lock contention.
    pub lock_contention_batches: u64,
    /// Number of files that failed to index (available when symbol index is queried).
    /// `FileWatcher::stats` leaves this as `None`; the field is omitted from
    /// serialized output while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index_failures: Option<usize>,
}
34
35impl FileWatcher {
36    /// Start watching the project root for file changes.
37    /// Changed files are automatically re-indexed via `SymbolIndex::index_files`
38    /// and the `GraphCache` is invalidated.
39    pub fn start(
40        root: &Path,
41        symbol_index: Arc<SymbolIndex>,
42        graph_cache: Arc<GraphCache>,
43    ) -> Result<Self> {
44        let running = Arc::new(AtomicBool::new(true));
45        let events_processed = Arc::new(AtomicU64::new(0));
46        let files_reindexed = Arc::new(AtomicU64::new(0));
47        let lock_contention_batches = Arc::new(AtomicU64::new(0));
48
49        let running_clone = running.clone();
50        let events_clone = events_processed.clone();
51        let files_clone = files_reindexed.clone();
52        let contention_clone = lock_contention_batches.clone();
53        // Owned copy for the move-closure: exclusion is evaluated relative to
54        // the watched root so dot-directory ancestors don't drop events (#358).
55        let watch_root = root.to_path_buf();
56
57        let mut debouncer = new_debouncer(
58            Duration::from_millis(300),
59            move |res: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
60                if !running_clone.load(Ordering::Relaxed) {
61                    return;
62                }
63                let events = match res {
64                    Ok(events) => events,
65                    Err(e) => {
66                        warn!(error = %e, "file watcher error");
67                        return;
68                    }
69                };
70
71                // Classify raw watcher events into changed/removed
72                let mut raw_changed: Vec<PathBuf> = Vec::new();
73                let mut raw_removed: Vec<PathBuf> = Vec::new();
74
75                for event in &events {
76                    let path = &event.path;
77                    match event.kind {
78                        DebouncedEventKind::Any => {
79                            if path.is_file() {
80                                raw_changed.push(path.clone());
81                            } else {
82                                raw_removed.push(path.clone());
83                            }
84                        }
85                        DebouncedEventKind::AnyContinuous => {} // ongoing writes — skip
86                        _ => {}
87                    }
88                }
89
90                events_clone.fetch_add(events.len() as u64, Ordering::Relaxed);
91
92                // Normalize through VFS layer (filters, deduplicates, detects renames)
93                let file_events = vfs::normalize_events(&watch_root, &raw_changed, &raw_removed);
94                let (changed, removed, renamed) = vfs::partition_events(&file_events);
95
96                debug!(
97                    changed = changed.len(),
98                    removed = removed.len(),
99                    renamed = renamed.len(),
100                    total_events = events.len(),
101                    "watcher batch processed"
102                );
103
104                if changed.is_empty() && removed.is_empty() {
105                    return;
106                }
107
108                let mut reindexed = 0u64;
109                if !changed.is_empty() {
110                    match index_files_with_retry(&symbol_index, &changed) {
111                        Ok(n) => {
112                            reindexed += n as u64;
113                            // Clear failures for successfully indexed files
114                            let db = symbol_index.db();
115                            for file in &changed {
116                                let rel = file.to_string_lossy();
117                                let _ = db.clear_index_failure(&rel);
118                            }
119                        }
120                        Err(e) => {
121                            if is_lock_contention_error(&e) {
122                                contention_clone.fetch_add(1, Ordering::Relaxed);
123                                debug!(
124                                    error = %e,
125                                    count = changed.len(),
126                                    "index_files batch skipped after lock contention retries"
127                                );
128                                return;
129                            }
130                            warn!(error = %e, count = changed.len(), "index_files batch failed");
131                            // Record failure for each file in the batch
132                            let db = symbol_index.db();
133                            for file in &changed {
134                                let rel = file.to_string_lossy();
135                                let _ = db.record_index_failure(
136                                    &rel,
137                                    "index_batch_error",
138                                    &e.to_string(),
139                                );
140                            }
141                        }
142                    }
143                }
144                if !removed.is_empty() {
145                    match symbol_index.remove_files(&removed) {
146                        Ok(n) => reindexed += n as u64,
147                        Err(e) => warn!(error = %e, "remove_files failed"),
148                    }
149                }
150
151                if reindexed > 0 {
152                    graph_cache.invalidate();
153                    // Invalidate FTS index so next search triggers lazy rebuild
154                    let _ = symbol_index.db().invalidate_fts();
155                    files_clone.fetch_add(reindexed, Ordering::Relaxed);
156                    debug!(reindexed, "graph cache + FTS invalidated");
157                }
158            },
159        )?;
160
161        // Watch the project root recursively
162        debouncer
163            .watcher()
164            .watch(root, notify::RecursiveMode::Recursive)?;
165
166        Ok(Self {
167            _debouncer: debouncer,
168            running,
169            events_processed,
170            files_reindexed,
171            lock_contention_batches,
172        })
173    }
174
175    pub fn stats(&self) -> WatcherStats {
176        WatcherStats {
177            running: self.running.load(Ordering::Relaxed),
178            events_processed: self.events_processed.load(Ordering::Relaxed),
179            files_reindexed: self.files_reindexed.load(Ordering::Relaxed),
180            lock_contention_batches: self.lock_contention_batches.load(Ordering::Relaxed),
181            index_failures: None,
182        }
183    }
184
185    pub fn stop(&self) {
186        self.running.store(false, Ordering::Relaxed);
187    }
188}
189
190fn index_files_with_retry(symbol_index: &SymbolIndex, changed: &[PathBuf]) -> Result<usize> {
191    const RETRY_DELAYS_MS: [u64; 2] = [100, 250];
192
193    match symbol_index.index_files(changed) {
194        Ok(count) => Ok(count),
195        Err(error) if is_lock_contention_error(&error) => {
196            for delay_ms in RETRY_DELAYS_MS {
197                std::thread::sleep(Duration::from_millis(delay_ms));
198                match symbol_index.index_files(changed) {
199                    Ok(count) => return Ok(count),
200                    Err(retry_error) if is_lock_contention_error(&retry_error) => continue,
201                    Err(retry_error) => return Err(retry_error),
202                }
203            }
204            Err(error)
205        }
206        Err(error) => Err(error),
207    }
208}
209
210fn is_lock_contention_error(error: &anyhow::Error) -> bool {
211    error.chain().any(|cause| {
212        cause
213            .downcast_ref::<rusqlite::Error>()
214            .is_some_and(|sqlite_error| {
215                matches!(
216                    sqlite_error,
217                    rusqlite::Error::SqliteFailure(code, _)
218                        if matches!(
219                            code.code,
220                            rusqlite::ErrorCode::DatabaseBusy
221                                | rusqlite::ErrorCode::DatabaseLocked
222                        )
223                )
224            })
225    }) || error.to_string().contains("database is locked")
226}
227
#[cfg(test)]
// `impl Drop for FileWatcher` is declared after this module, hence the allow.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::is_lock_contention_error;

    /// A typed `DatabaseLocked` SQLite failure is classified as lock contention.
    #[test]
    fn detects_sqlite_lock_contention_errors() {
        let ffi_error = rusqlite::ffi::Error {
            code: rusqlite::ErrorCode::DatabaseLocked,
            extended_code: rusqlite::ffi::SQLITE_LOCKED,
        };
        let message = Some(String::from("database is locked"));
        let locked = anyhow::Error::new(rusqlite::Error::SqliteFailure(ffi_error, message));

        assert!(is_lock_contention_error(&locked));
    }

    /// An unrelated error message is not mistaken for lock contention.
    #[test]
    fn ignores_non_lock_errors() {
        let unrelated = anyhow::anyhow!("some other indexing failure");

        assert!(!is_lock_contention_error(&unrelated));
    }
}
251
252impl Drop for FileWatcher {
253    fn drop(&mut self) {
254        self.stop();
255    }
256}