1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::GlobBuilder;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33use crate::utils::bytes_to_path;
34
/// Poll interval, in milliseconds, applied when `WatchOptions::poll_interval` is `None`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;
/// How long, in milliseconds, `DatabaseWatcher::wait` blocks for a first batch of changes.
const WAIT_INTERNAL_MS: u64 = 100;
/// Quiet period, in milliseconds, that must elapse without new batches before
/// `DatabaseWatcher::wait` stops collecting and processes what it has.
const WAIT_DEBOUNCE_MS: u64 = 300;
/// Pause, in milliseconds, between the two reads used to check that a file's
/// contents have stopped changing.
const STABILITY_CHECK_MS: u64 = 10;
39
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41struct ChangedFile {
42 id: FileId,
43 path: PathBuf,
44}
45
46#[derive(Debug, Clone)]
48pub struct WatchOptions {
49 pub poll_interval: Option<Duration>,
50 pub additional_excludes: Vec<Exclusion<'static>>,
51}
52
53impl Default for WatchOptions {
54 #[inline]
55 fn default() -> Self {
56 Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
57 }
58}
59
60pub struct DatabaseWatcher<'config> {
62 database: Database<'config>,
63 watcher: Option<RecommendedWatcher>,
64 watched_paths: Vec<PathBuf>,
65 receiver: Option<Receiver<Vec<ChangedFile>>>,
66}
67
68impl<'config> DatabaseWatcher<'config> {
69 #[inline]
70 #[must_use]
71 pub fn new(database: Database<'config>) -> Self {
72 Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
73 }
74
75 #[inline]
84 pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
85 self.stop();
86
87 let config = &self.database.configuration;
88
89 let (tx, rx) = mpsc::channel();
90
91 let mut all_exclusions = vec![
92 Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
93 Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
94 Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
95 Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
96 ];
97 all_exclusions.extend(config.excludes.iter().cloned());
98 all_exclusions.extend(options.additional_excludes);
99
100 let glob_settings = &config.glob;
101 let mut glob_builder = GlobSetBuilder::new();
102 for ex in &all_exclusions {
103 if let Exclusion::Pattern(pat) = ex {
104 let glob = GlobBuilder::new(pat)
105 .case_insensitive(glob_settings.case_insensitive)
106 .literal_separator(glob_settings.literal_separator)
107 .backslash_escape(glob_settings.backslash_escape)
108 .empty_alternates(glob_settings.empty_alternates)
109 .build()?;
110 glob_builder.add(glob);
111 }
112 }
113
114 let glob_excludes = glob_builder.build()?;
115
116 let path_excludes: HashSet<PathBuf> = all_exclusions
117 .iter()
118 .filter_map(|ex| match ex {
119 Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
120 Exclusion::Pattern(_) => None,
121 })
122 .collect();
123
124 let extensions: HashSet<Vec<u8>> = config.extensions.iter().map(|c| c.to_vec()).collect();
125 let workspace = config.workspace.as_ref().to_path_buf();
126
127 let mut unique_watch_paths = HashSet::new();
132
133 for path in &config.paths {
134 let watch_path = Self::extract_watch_path(path.as_ref());
135 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
136
137 unique_watch_paths.insert(absolute_path);
138 }
139
140 for path in &config.includes {
141 let watch_path = Self::extract_watch_path(path.as_ref());
142 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
143
144 unique_watch_paths.insert(absolute_path);
145 }
146
147 let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths
148 .iter()
149 .filter(|wp| glob_excludes.is_match(wp.as_path()) || path_excludes.contains(wp.as_path()))
150 .cloned()
151 .collect();
152
153 let mut watcher = RecommendedWatcher::new(
154 move |res: Result<Event, notify::Error>| {
155 if let Ok(event) = res
156 && let Some(changed) = Self::handle_event(
157 event,
158 &workspace,
159 &glob_excludes,
160 &path_excludes,
161 &extensions,
162 &explicit_watch_paths,
163 )
164 {
165 let _ = tx.send(changed);
166 }
167 },
168 Config::default()
169 .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
170 )
171 .map_err(DatabaseError::WatcherInit)?;
172
173 let mut watched_paths = Vec::new();
174 for path in unique_watch_paths {
175 watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
176 watched_paths.push(path.clone());
177 tracing::debug!("Watching path: {}", path.display());
178 }
179
180 tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
181
182 self.watcher = Some(watcher);
183 self.watched_paths = watched_paths;
184 self.receiver = Some(rx);
185
186 Ok(())
187 }
188
189 #[inline]
191 pub fn stop(&mut self) {
192 if let Some(mut watcher) = self.watcher.take() {
193 for path in &self.watched_paths {
194 let _ = watcher.unwatch(path);
195 tracing::debug!("Stopped watching: {}", path.display());
196 }
197 }
198 self.watched_paths.clear();
199 self.receiver = None;
200 }
201
202 #[inline]
204 #[must_use]
205 pub fn is_watching(&self) -> bool {
206 self.watcher.is_some()
207 }
208
209 fn extract_watch_path(pattern: &[u8]) -> PathBuf {
220 let is_glob =
221 pattern.contains(&b'*') || pattern.contains(&b'?') || pattern.contains(&b'[') || pattern.contains(&b'{');
222
223 if !is_glob {
224 return bytes_to_path(pattern).into_owned();
225 }
226
227 let first_glob_pos =
228 pattern.iter().position(|&b| matches!(b, b'*' | b'?' | b'[' | b'{')).unwrap_or(pattern.len());
229
230 let base = &pattern[..first_glob_pos];
231
232 let mut end = base.len();
233 while end > 0 && matches!(base[end - 1], b'/' | b'\\') {
234 end -= 1;
235 }
236 let base = &base[..end];
237
238 if base.is_empty() { PathBuf::from(".") } else { bytes_to_path(base).into_owned() }
239 }
240
241 fn handle_event(
242 event: Event,
243 workspace: &Path,
244 glob_excludes: &GlobSet,
245 path_excludes: &HashSet<PathBuf>,
246 extensions: &HashSet<Vec<u8>>,
247 explicit_watch_paths: &[PathBuf],
248 ) -> Option<Vec<ChangedFile>> {
249 tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
250
251 if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
252 event.kind
253 {
254 tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
255
256 return None;
257 }
258
259 let mut changed_files = Vec::new();
260
261 for path in event.paths {
262 if let Some(ext) = path.extension() {
264 if !extensions.contains(ext.as_encoded_bytes()) {
265 continue;
266 }
267 } else {
268 continue;
269 }
270
271 let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
272 if !is_explicitly_watched {
273 if glob_excludes.is_match(&path) {
275 tracing::debug!("Skipping path excluded by pattern: {}", path.display());
276 continue;
277 }
278
279 if path_excludes.contains(&path) {
281 tracing::debug!("Skipping excluded path: {}", path.display());
282 continue;
283 }
284
285 let mut should_skip = false;
287 for ancestor in path.ancestors().skip(1) {
288 if path_excludes.contains(ancestor) {
289 tracing::debug!("Skipping path under excluded directory: {}", path.display());
290 should_skip = true;
291 break;
292 }
293 }
294
295 if should_skip {
296 continue;
297 }
298 }
299
300 #[cfg(windows)]
302 let logical_name = path
303 .strip_prefix(workspace)
304 .unwrap_or(&path)
305 .as_os_str()
306 .as_encoded_bytes()
307 .iter()
308 .map(|i| if *i == b'\\' { b'/' } else { *i })
309 .collect::<Vec<_>>();
310 #[cfg(not(windows))]
311 let logical_name = path.strip_prefix(workspace).unwrap_or(&path).as_os_str().as_encoded_bytes().to_owned();
312
313 let file_id = FileId::new(&logical_name);
314
315 changed_files.push(ChangedFile { id: file_id, path: path.clone() });
316 }
317
318 if changed_files.is_empty() { None } else { Some(changed_files) }
319 }
320
321 #[inline]
332 pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
333 let Some(receiver) = &self.receiver else {
334 return Err(DatabaseError::WatcherNotActive);
335 };
336
337 let config = &self.database.configuration;
338 let workspace = config.workspace.as_ref().to_path_buf();
339
340 match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
341 Ok(changed_files) => {
342 let mut all_changed = changed_files;
343 loop {
344 match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
345 Ok(more) => all_changed.extend(more),
346 Err(RecvTimeoutError::Timeout) => break,
347 Err(RecvTimeoutError::Disconnected) => {
348 self.stop();
349 return Err(DatabaseError::WatcherNotActive);
350 }
351 }
352 }
353
354 let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
355 for changed in all_changed {
356 latest_changes.insert(changed.id, changed);
357 }
358 let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
359 let mut changed_ids = Vec::new();
360
361 for changed_file in &all_changed {
362 changed_ids.push(changed_file.id);
363
364 let Ok(file) = self.database.get(&changed_file.id) else {
365 if changed_file.path.exists() {
366 match File::read(&workspace, &changed_file.path, FileType::Host) {
367 Ok(file) => {
368 self.database.add(file);
369 tracing::debug!("Added new file to database: {}", changed_file.path.display());
370 }
371 Err(e) => {
372 tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
373 }
374 }
375 }
376
377 continue;
378 };
379
380 if !changed_file.path.exists() {
381 self.database.delete(changed_file.id);
382 tracing::trace!("Deleted file from database: {}", String::from_utf8_lossy(&file.name));
383 continue;
384 }
385
386 match Self::read_stable_contents(&changed_file.path) {
387 Ok(contents) => {
388 if self.database.update(changed_file.id, Cow::Owned(contents)) {
389 tracing::trace!("Updated file in database: {}", String::from_utf8_lossy(&file.name));
390 } else {
391 tracing::warn!(
392 "Failed to update file in database (ID not found): {}",
393 String::from_utf8_lossy(&file.name)
394 );
395 }
396 }
397 Err(e) => {
398 tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
399 }
400 }
401 }
402
403 Ok(changed_ids)
404 }
405 Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
406 Err(RecvTimeoutError::Disconnected) => {
407 self.stop();
408 Err(DatabaseError::WatcherNotActive)
409 }
410 }
411 }
412
413 fn read_stable_contents(path: &Path) -> std::io::Result<Vec<u8>> {
419 let contents = std::fs::read(path)?;
420
421 std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));
422
423 if path.exists()
424 && let Ok(reread) = std::fs::read(path)
425 && reread != contents
426 {
427 tracing::debug!("File content changed during stability check: {}", path.display());
428
429 return Ok(reread);
430 }
431
432 Ok(contents)
433 }
434
435 #[inline]
437 #[must_use]
438 pub fn database(&self) -> &Database<'config> {
439 &self.database
440 }
441
442 #[inline]
444 #[must_use]
445 pub fn read_only_database(&self) -> ReadDatabase {
446 self.database.read_only()
447 }
448
449 #[inline]
451 pub fn database_mut(&mut self) -> &mut Database<'config> {
452 &mut self.database
453 }
454
455 #[inline]
464 pub fn with_database_mut<F, R>(&mut self, f: F) -> R
465 where
466 F: for<'borrow> FnOnce(&'borrow mut Database<'config>) -> R,
467 {
468 f(&mut self.database)
469 }
470
471 #[inline]
473 #[must_use]
474 pub fn into_database(self) -> Database<'config> {
475 let mut md = ManuallyDrop::new(self);
476 md.stop();
477 unsafe { std::ptr::read(&raw const md.database) }
480 }
481}
482
483impl Drop for DatabaseWatcher<'_> {
484 #[inline]
485 fn drop(&mut self) {
486 self.stop();
487 }
488}