codelens_engine/
watcher.rs1use crate::import_graph::GraphCache;
2use crate::symbols::SymbolIndex;
3use crate::vfs;
4use anyhow::Result;
5use notify::RecommendedWatcher;
6use notify_debouncer_mini::{DebouncedEventKind, Debouncer, new_debouncer};
7use std::path::{Path, PathBuf};
8use std::sync::{
9 Arc,
10 atomic::{AtomicBool, AtomicU64, Ordering},
11};
12use std::time::Duration;
13use tracing::{debug, warn};
14
15pub struct FileWatcher {
17 _debouncer: Debouncer<RecommendedWatcher>,
18 running: Arc<AtomicBool>,
19 events_processed: Arc<AtomicU64>,
20 files_reindexed: Arc<AtomicU64>,
21 lock_contention_batches: Arc<AtomicU64>,
22}
23
24#[derive(Debug, Clone, serde::Serialize)]
25pub struct WatcherStats {
26 pub running: bool,
27 pub events_processed: u64,
28 pub files_reindexed: u64,
29 pub lock_contention_batches: u64,
30 #[serde(skip_serializing_if = "Option::is_none")]
32 pub index_failures: Option<usize>,
33}
34
35impl FileWatcher {
36 pub fn start(
40 root: &Path,
41 symbol_index: Arc<SymbolIndex>,
42 graph_cache: Arc<GraphCache>,
43 ) -> Result<Self> {
44 let running = Arc::new(AtomicBool::new(true));
45 let events_processed = Arc::new(AtomicU64::new(0));
46 let files_reindexed = Arc::new(AtomicU64::new(0));
47 let lock_contention_batches = Arc::new(AtomicU64::new(0));
48
49 let running_clone = running.clone();
50 let events_clone = events_processed.clone();
51 let files_clone = files_reindexed.clone();
52 let contention_clone = lock_contention_batches.clone();
53 let watch_root = root.to_path_buf();
56
57 let mut debouncer = new_debouncer(
58 Duration::from_millis(300),
59 move |res: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
60 if !running_clone.load(Ordering::Relaxed) {
61 return;
62 }
63 let events = match res {
64 Ok(events) => events,
65 Err(e) => {
66 warn!(error = %e, "file watcher error");
67 return;
68 }
69 };
70
71 let mut raw_changed: Vec<PathBuf> = Vec::new();
73 let mut raw_removed: Vec<PathBuf> = Vec::new();
74
75 for event in &events {
76 let path = &event.path;
77 match event.kind {
78 DebouncedEventKind::Any => {
79 if path.is_file() {
80 raw_changed.push(path.clone());
81 } else {
82 raw_removed.push(path.clone());
83 }
84 }
85 DebouncedEventKind::AnyContinuous => {} _ => {}
87 }
88 }
89
90 events_clone.fetch_add(events.len() as u64, Ordering::Relaxed);
91
92 let file_events = vfs::normalize_events(&watch_root, &raw_changed, &raw_removed);
94 let (changed, removed, renamed) = vfs::partition_events(&file_events);
95
96 debug!(
97 changed = changed.len(),
98 removed = removed.len(),
99 renamed = renamed.len(),
100 total_events = events.len(),
101 "watcher batch processed"
102 );
103
104 if changed.is_empty() && removed.is_empty() {
105 return;
106 }
107
108 let mut reindexed = 0u64;
109 if !changed.is_empty() {
110 match index_files_with_retry(&symbol_index, &changed) {
111 Ok(n) => {
112 reindexed += n as u64;
113 let db = symbol_index.db();
115 for file in &changed {
116 let rel = file.to_string_lossy();
117 let _ = db.clear_index_failure(&rel);
118 }
119 }
120 Err(e) => {
121 if is_lock_contention_error(&e) {
122 contention_clone.fetch_add(1, Ordering::Relaxed);
123 debug!(
124 error = %e,
125 count = changed.len(),
126 "index_files batch skipped after lock contention retries"
127 );
128 return;
129 }
130 warn!(error = %e, count = changed.len(), "index_files batch failed");
131 let db = symbol_index.db();
133 for file in &changed {
134 let rel = file.to_string_lossy();
135 let _ = db.record_index_failure(
136 &rel,
137 "index_batch_error",
138 &e.to_string(),
139 );
140 }
141 }
142 }
143 }
144 if !removed.is_empty() {
145 match symbol_index.remove_files(&removed) {
146 Ok(n) => reindexed += n as u64,
147 Err(e) => warn!(error = %e, "remove_files failed"),
148 }
149 }
150
151 if reindexed > 0 {
152 graph_cache.invalidate();
153 let _ = symbol_index.db().invalidate_fts();
155 files_clone.fetch_add(reindexed, Ordering::Relaxed);
156 debug!(reindexed, "graph cache + FTS invalidated");
157 }
158 },
159 )?;
160
161 debouncer
163 .watcher()
164 .watch(root, notify::RecursiveMode::Recursive)?;
165
166 Ok(Self {
167 _debouncer: debouncer,
168 running,
169 events_processed,
170 files_reindexed,
171 lock_contention_batches,
172 })
173 }
174
175 pub fn stats(&self) -> WatcherStats {
176 WatcherStats {
177 running: self.running.load(Ordering::Relaxed),
178 events_processed: self.events_processed.load(Ordering::Relaxed),
179 files_reindexed: self.files_reindexed.load(Ordering::Relaxed),
180 lock_contention_batches: self.lock_contention_batches.load(Ordering::Relaxed),
181 index_failures: None,
182 }
183 }
184
185 pub fn stop(&self) {
186 self.running.store(false, Ordering::Relaxed);
187 }
188}
189
190fn index_files_with_retry(symbol_index: &SymbolIndex, changed: &[PathBuf]) -> Result<usize> {
191 const RETRY_DELAYS_MS: [u64; 2] = [100, 250];
192
193 match symbol_index.index_files(changed) {
194 Ok(count) => Ok(count),
195 Err(error) if is_lock_contention_error(&error) => {
196 for delay_ms in RETRY_DELAYS_MS {
197 std::thread::sleep(Duration::from_millis(delay_ms));
198 match symbol_index.index_files(changed) {
199 Ok(count) => return Ok(count),
200 Err(retry_error) if is_lock_contention_error(&retry_error) => continue,
201 Err(retry_error) => return Err(retry_error),
202 }
203 }
204 Err(error)
205 }
206 Err(error) => Err(error),
207 }
208}
209
210fn is_lock_contention_error(error: &anyhow::Error) -> bool {
211 error.chain().any(|cause| {
212 cause
213 .downcast_ref::<rusqlite::Error>()
214 .is_some_and(|sqlite_error| {
215 matches!(
216 sqlite_error,
217 rusqlite::Error::SqliteFailure(code, _)
218 if matches!(
219 code.code,
220 rusqlite::ErrorCode::DatabaseBusy
221 | rusqlite::ErrorCode::DatabaseLocked
222 )
223 )
224 })
225 }) || error.to_string().contains("database is locked")
226}
227
#[cfg(test)]
// The `Drop` impl for `FileWatcher` is declared after this module.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::is_lock_contention_error;

    /// A typed `DatabaseLocked` failure is classified as lock contention.
    #[test]
    fn detects_sqlite_lock_contention_errors() {
        let ffi_error = rusqlite::ffi::Error {
            code: rusqlite::ErrorCode::DatabaseLocked,
            extended_code: rusqlite::ffi::SQLITE_LOCKED,
        };
        let message = Some(String::from("database is locked"));
        let wrapped = anyhow::Error::new(rusqlite::Error::SqliteFailure(ffi_error, message));

        assert!(is_lock_contention_error(&wrapped));
    }

    /// An unrelated ad-hoc error is not classified as lock contention.
    #[test]
    fn ignores_non_lock_errors() {
        let unrelated = anyhow::anyhow!("some other indexing failure");

        assert!(!is_lock_contention_error(&unrelated));
    }
}
251
252impl Drop for FileWatcher {
253 fn drop(&mut self) {
254 self.stop();
255 }
256}