codelens_engine/
watcher.rs1use crate::import_graph::GraphCache;
2use crate::symbols::SymbolIndex;
3use crate::vfs;
4use anyhow::Result;
5use notify::RecommendedWatcher;
6use notify_debouncer_mini::{DebouncedEventKind, Debouncer, new_debouncer};
7use std::path::{Path, PathBuf};
8use std::sync::{
9 Arc,
10 atomic::{AtomicBool, AtomicU64, Ordering},
11};
12use std::time::Duration;
13use tracing::{debug, warn};
14
15pub struct FileWatcher {
17 _debouncer: Debouncer<RecommendedWatcher>,
18 running: Arc<AtomicBool>,
19 events_processed: Arc<AtomicU64>,
20 files_reindexed: Arc<AtomicU64>,
21 lock_contention_batches: Arc<AtomicU64>,
22}
23
24#[derive(Debug, Clone, serde::Serialize)]
25pub struct WatcherStats {
26 pub running: bool,
27 pub events_processed: u64,
28 pub files_reindexed: u64,
29 pub lock_contention_batches: u64,
30 #[serde(skip_serializing_if = "Option::is_none")]
32 pub index_failures: Option<usize>,
33}
34
35impl FileWatcher {
36 pub fn start(
40 root: &Path,
41 symbol_index: Arc<SymbolIndex>,
42 graph_cache: Arc<GraphCache>,
43 ) -> Result<Self> {
44 let running = Arc::new(AtomicBool::new(true));
45 let events_processed = Arc::new(AtomicU64::new(0));
46 let files_reindexed = Arc::new(AtomicU64::new(0));
47 let lock_contention_batches = Arc::new(AtomicU64::new(0));
48
49 let running_clone = running.clone();
50 let events_clone = events_processed.clone();
51 let files_clone = files_reindexed.clone();
52 let contention_clone = lock_contention_batches.clone();
53
54 let mut debouncer = new_debouncer(
55 Duration::from_millis(300),
56 move |res: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
57 if !running_clone.load(Ordering::Relaxed) {
58 return;
59 }
60 let events = match res {
61 Ok(events) => events,
62 Err(e) => {
63 warn!(error = %e, "file watcher error");
64 return;
65 }
66 };
67
68 let mut raw_changed: Vec<PathBuf> = Vec::new();
70 let mut raw_removed: Vec<PathBuf> = Vec::new();
71
72 for event in &events {
73 let path = &event.path;
74 match event.kind {
75 DebouncedEventKind::Any => {
76 if path.is_file() {
77 raw_changed.push(path.clone());
78 } else {
79 raw_removed.push(path.clone());
80 }
81 }
82 DebouncedEventKind::AnyContinuous => {} _ => {}
84 }
85 }
86
87 events_clone.fetch_add(events.len() as u64, Ordering::Relaxed);
88
89 let file_events = vfs::normalize_events(&raw_changed, &raw_removed);
91 let (changed, removed, renamed) = vfs::partition_events(&file_events);
92
93 debug!(
94 changed = changed.len(),
95 removed = removed.len(),
96 renamed = renamed.len(),
97 total_events = events.len(),
98 "watcher batch processed"
99 );
100
101 if changed.is_empty() && removed.is_empty() {
102 return;
103 }
104
105 let mut reindexed = 0u64;
106 if !changed.is_empty() {
107 match index_files_with_retry(&symbol_index, &changed) {
108 Ok(n) => {
109 reindexed += n as u64;
110 let db = symbol_index.db();
112 for file in &changed {
113 let rel = file.to_string_lossy();
114 let _ = db.clear_index_failure(&rel);
115 }
116 }
117 Err(e) => {
118 if is_lock_contention_error(&e) {
119 contention_clone.fetch_add(1, Ordering::Relaxed);
120 debug!(
121 error = %e,
122 count = changed.len(),
123 "index_files batch skipped after lock contention retries"
124 );
125 return;
126 }
127 warn!(error = %e, count = changed.len(), "index_files batch failed");
128 let db = symbol_index.db();
130 for file in &changed {
131 let rel = file.to_string_lossy();
132 let _ = db.record_index_failure(
133 &rel,
134 "index_batch_error",
135 &e.to_string(),
136 );
137 }
138 }
139 }
140 }
141 if !removed.is_empty() {
142 match symbol_index.remove_files(&removed) {
143 Ok(n) => reindexed += n as u64,
144 Err(e) => warn!(error = %e, "remove_files failed"),
145 }
146 }
147
148 if reindexed > 0 {
149 graph_cache.invalidate();
150 let _ = symbol_index.db().invalidate_fts();
152 files_clone.fetch_add(reindexed, Ordering::Relaxed);
153 debug!(reindexed, "graph cache + FTS invalidated");
154 }
155 },
156 )?;
157
158 debouncer
160 .watcher()
161 .watch(root, notify::RecursiveMode::Recursive)?;
162
163 Ok(Self {
164 _debouncer: debouncer,
165 running,
166 events_processed,
167 files_reindexed,
168 lock_contention_batches,
169 })
170 }
171
172 pub fn stats(&self) -> WatcherStats {
173 WatcherStats {
174 running: self.running.load(Ordering::Relaxed),
175 events_processed: self.events_processed.load(Ordering::Relaxed),
176 files_reindexed: self.files_reindexed.load(Ordering::Relaxed),
177 lock_contention_batches: self.lock_contention_batches.load(Ordering::Relaxed),
178 index_failures: None,
179 }
180 }
181
182 pub fn stop(&self) {
183 self.running.store(false, Ordering::Relaxed);
184 }
185}
186
187fn index_files_with_retry(symbol_index: &SymbolIndex, changed: &[PathBuf]) -> Result<usize> {
188 const RETRY_DELAYS_MS: [u64; 2] = [100, 250];
189
190 match symbol_index.index_files(changed) {
191 Ok(count) => Ok(count),
192 Err(error) if is_lock_contention_error(&error) => {
193 for delay_ms in RETRY_DELAYS_MS {
194 std::thread::sleep(Duration::from_millis(delay_ms));
195 match symbol_index.index_files(changed) {
196 Ok(count) => return Ok(count),
197 Err(retry_error) if is_lock_contention_error(&retry_error) => continue,
198 Err(retry_error) => return Err(retry_error),
199 }
200 }
201 Err(error)
202 }
203 Err(error) => Err(error),
204 }
205}
206
207fn is_lock_contention_error(error: &anyhow::Error) -> bool {
208 error.chain().any(|cause| {
209 cause
210 .downcast_ref::<rusqlite::Error>()
211 .is_some_and(|sqlite_error| {
212 matches!(
213 sqlite_error,
214 rusqlite::Error::SqliteFailure(code, _)
215 if matches!(
216 code.code,
217 rusqlite::ErrorCode::DatabaseBusy
218 | rusqlite::ErrorCode::DatabaseLocked
219 )
220 )
221 })
222 }) || error.to_string().contains("database is locked")
223}
224
#[cfg(test)]
// The `Drop` impl for `FileWatcher` is declared after this module.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::is_lock_contention_error;

    /// A typed `SQLITE_LOCKED` failure must be classified as lock contention.
    #[test]
    fn detects_sqlite_lock_contention_errors() {
        let ffi_error = rusqlite::ffi::Error {
            code: rusqlite::ErrorCode::DatabaseLocked,
            extended_code: rusqlite::ffi::SQLITE_LOCKED,
        };
        let sqlite_failure =
            rusqlite::Error::SqliteFailure(ffi_error, Some("database is locked".to_owned()));
        let wrapped = anyhow::Error::new(sqlite_failure);

        assert!(is_lock_contention_error(&wrapped));
    }

    /// An unrelated ad-hoc error must not be mistaken for lock contention.
    #[test]
    fn ignores_non_lock_errors() {
        let unrelated = anyhow::anyhow!("some other indexing failure");

        assert!(!is_lock_contention_error(&unrelated));
    }
}
248
249impl Drop for FileWatcher {
250 fn drop(&mut self) {
251 self.stop();
252 }
253}