Skip to main content

codelens_engine/
watcher.rs

1use crate::import_graph::GraphCache;
2use crate::symbols::SymbolIndex;
3use crate::vfs;
4use anyhow::Result;
5use notify::RecommendedWatcher;
6use notify_debouncer_mini::{DebouncedEventKind, Debouncer, new_debouncer};
7use std::path::{Path, PathBuf};
8use std::sync::{
9    Arc,
10    atomic::{AtomicBool, AtomicU64, Ordering},
11};
12use std::time::Duration;
13use tracing::{debug, warn};
14
/// File watcher that automatically re-indexes changed files.
///
/// All counters are shared (`Arc`) with the debouncer callback so that
/// `stats()` can observe progress from another thread.
pub struct FileWatcher {
    // Held only to keep the underlying notify watcher alive; dropping it
    // tears the watcher down.
    _debouncer: Debouncer<RecommendedWatcher>,
    // Flag checked by the debouncer callback; `stop()` clears it so pending
    // batches are discarded instead of indexed.
    running: Arc<AtomicBool>,
    // Total debounced events received (including ones filtered out later).
    events_processed: Arc<AtomicU64>,
    // Total files re-indexed or removed from the index.
    files_reindexed: Arc<AtomicU64>,
    // Batches abandoned after exhausting lock-contention retries.
    lock_contention_batches: Arc<AtomicU64>,
}
23
/// Point-in-time snapshot of [`FileWatcher`] counters, serializable for
/// status/diagnostics endpoints.
#[derive(Debug, Clone, serde::Serialize)]
pub struct WatcherStats {
    /// Whether the watcher is still accepting event batches.
    pub running: bool,
    /// Total debounced events seen by the callback.
    pub events_processed: u64,
    /// Total files re-indexed (including removals).
    pub files_reindexed: u64,
    /// Batches dropped after lock-contention retries were exhausted.
    pub lock_contention_batches: u64,
    /// Number of files that failed to index (available when symbol index is queried).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index_failures: Option<usize>,
}
34
impl FileWatcher {
    /// Start watching the project root for file changes.
    /// Changed files are automatically re-indexed via `SymbolIndex::index_files`
    /// and the `GraphCache` is invalidated.
    ///
    /// Events are debounced for 300 ms, so a burst of writes to one file
    /// collapses into a single batch. The callback below runs on the
    /// debouncer's own thread, not the caller's.
    ///
    /// # Errors
    /// Returns an error if the debouncer cannot be created or the recursive
    /// watch on `root` cannot be established.
    pub fn start(
        root: &Path,
        symbol_index: Arc<SymbolIndex>,
        graph_cache: Arc<GraphCache>,
    ) -> Result<Self> {
        let running = Arc::new(AtomicBool::new(true));
        let events_processed = Arc::new(AtomicU64::new(0));
        let files_reindexed = Arc::new(AtomicU64::new(0));
        let lock_contention_batches = Arc::new(AtomicU64::new(0));

        // Clones move into the debouncer callback; the originals are stored
        // on `Self` so `stats()`/`stop()` can observe and control it.
        let running_clone = running.clone();
        let events_clone = events_processed.clone();
        let files_clone = files_reindexed.clone();
        let contention_clone = lock_contention_batches.clone();

        let mut debouncer = new_debouncer(
            Duration::from_millis(300),
            move |res: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
                // `stop()` clears this flag; once cleared, batches are
                // silently discarded rather than indexed.
                if !running_clone.load(Ordering::Relaxed) {
                    return;
                }
                let events = match res {
                    Ok(events) => events,
                    Err(e) => {
                        warn!(error = %e, "file watcher error");
                        return;
                    }
                };

                // Classify raw watcher events into changed/removed
                let mut raw_changed: Vec<PathBuf> = Vec::new();
                let mut raw_removed: Vec<PathBuf> = Vec::new();

                for event in &events {
                    let path = &event.path;
                    match event.kind {
                        DebouncedEventKind::Any => {
                            // The debouncer doesn't distinguish create/modify
                            // from delete, so probe the filesystem: a path
                            // that is no longer a file is treated as removed.
                            if path.is_file() {
                                raw_changed.push(path.clone());
                            } else {
                                raw_removed.push(path.clone());
                            }
                        }
                        DebouncedEventKind::AnyContinuous => {} // ongoing writes — skip
                        _ => {}
                    }
                }

                // Counts every debounced event, including ones filtered out
                // by the normalization below.
                events_clone.fetch_add(events.len() as u64, Ordering::Relaxed);

                // Normalize through VFS layer (filters, deduplicates, detects renames)
                let file_events = vfs::normalize_events(&raw_changed, &raw_removed);
                let (changed, removed, renamed) = vfs::partition_events(&file_events);

                debug!(
                    changed = changed.len(),
                    removed = removed.len(),
                    renamed = renamed.len(),
                    total_events = events.len(),
                    "watcher batch processed"
                );

                if changed.is_empty() && removed.is_empty() {
                    return;
                }

                let mut reindexed = 0u64;
                if !changed.is_empty() {
                    match index_files_with_retry(&symbol_index, &changed) {
                        Ok(n) => {
                            reindexed += n as u64;
                            // Clear failures for successfully indexed files
                            let db = symbol_index.db();
                            for file in &changed {
                                let rel = file.to_string_lossy();
                                let _ = db.clear_index_failure(&rel);
                            }
                        }
                        Err(e) => {
                            // Lock contention survived all retries: count the
                            // dropped batch and bail; a later event for these
                            // files will trigger another attempt.
                            if is_lock_contention_error(&e) {
                                contention_clone.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    error = %e,
                                    count = changed.len(),
                                    "index_files batch skipped after lock contention retries"
                                );
                                return;
                            }
                            warn!(error = %e, count = changed.len(), "index_files batch failed");
                            // Record failure for each file in the batch
                            let db = symbol_index.db();
                            for file in &changed {
                                let rel = file.to_string_lossy();
                                let _ = db.record_index_failure(
                                    &rel,
                                    "index_batch_error",
                                    &e.to_string(),
                                );
                            }
                        }
                    }
                }
                if !removed.is_empty() {
                    match symbol_index.remove_files(&removed) {
                        Ok(n) => reindexed += n as u64,
                        Err(e) => warn!(error = %e, "remove_files failed"),
                    }
                }

                if reindexed > 0 {
                    graph_cache.invalidate();
                    // Invalidate FTS index so next search triggers lazy rebuild
                    let _ = symbol_index.db().invalidate_fts();
                    files_clone.fetch_add(reindexed, Ordering::Relaxed);
                    debug!(reindexed, "graph cache + FTS invalidated");
                }
            },
        )?;

        // Watch the project root recursively
        debouncer
            .watcher()
            .watch(root, notify::RecursiveMode::Recursive)?;

        Ok(Self {
            _debouncer: debouncer,
            running,
            events_processed,
            files_reindexed,
            lock_contention_batches,
        })
    }

    /// Snapshot of the watcher counters. `index_failures` is always `None`
    /// here; callers with access to the symbol index may fill it in.
    pub fn stats(&self) -> WatcherStats {
        WatcherStats {
            running: self.running.load(Ordering::Relaxed),
            events_processed: self.events_processed.load(Ordering::Relaxed),
            files_reindexed: self.files_reindexed.load(Ordering::Relaxed),
            lock_contention_batches: self.lock_contention_batches.load(Ordering::Relaxed),
            index_failures: None,
        }
    }

    /// Request shutdown: the debouncer callback starts discarding batches.
    /// The underlying notify watcher keeps running until `self` is dropped.
    pub fn stop(&self) {
        self.running.store(false, Ordering::Relaxed);
    }
}
186
187fn index_files_with_retry(symbol_index: &SymbolIndex, changed: &[PathBuf]) -> Result<usize> {
188    const RETRY_DELAYS_MS: [u64; 2] = [100, 250];
189
190    match symbol_index.index_files(changed) {
191        Ok(count) => Ok(count),
192        Err(error) if is_lock_contention_error(&error) => {
193            for delay_ms in RETRY_DELAYS_MS {
194                std::thread::sleep(Duration::from_millis(delay_ms));
195                match symbol_index.index_files(changed) {
196                    Ok(count) => return Ok(count),
197                    Err(retry_error) if is_lock_contention_error(&retry_error) => continue,
198                    Err(retry_error) => return Err(retry_error),
199                }
200            }
201            Err(error)
202        }
203        Err(error) => Err(error),
204    }
205}
206
207fn is_lock_contention_error(error: &anyhow::Error) -> bool {
208    error.chain().any(|cause| {
209        cause
210            .downcast_ref::<rusqlite::Error>()
211            .is_some_and(|sqlite_error| {
212                matches!(
213                    sqlite_error,
214                    rusqlite::Error::SqliteFailure(code, _)
215                        if matches!(
216                            code.code,
217                            rusqlite::ErrorCode::DatabaseBusy
218                                | rusqlite::ErrorCode::DatabaseLocked
219                        )
220                )
221            })
222    }) || error.to_string().contains("database is locked")
223}
224
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::is_lock_contention_error;

    /// A typed `SqliteFailure` carrying `DatabaseLocked` must be classified
    /// as lock contention.
    #[test]
    fn detects_sqlite_lock_contention_errors() {
        let locked = rusqlite::Error::SqliteFailure(
            rusqlite::ffi::Error {
                code: rusqlite::ErrorCode::DatabaseLocked,
                extended_code: rusqlite::ffi::SQLITE_LOCKED,
            },
            Some("database is locked".to_owned()),
        );
        assert!(is_lock_contention_error(&anyhow::Error::new(locked)));
    }

    /// Arbitrary errors without a lock signature are not contention.
    #[test]
    fn ignores_non_lock_errors() {
        let other = anyhow::anyhow!("some other indexing failure");
        assert!(!is_lock_contention_error(&other));
    }
}
248
249impl Drop for FileWatcher {
250    fn drop(&mut self) {
251        self.stop();
252    }
253}