1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::GlobBuilder;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33use crate::loader::calculate_pattern_specificity;
34use crate::loader::resolve_file_type;
35use crate::utils::bytes_to_path;
36
/// Poll interval in milliseconds used when `WatchOptions::poll_interval` is `None`,
/// and the value `WatchOptions::default()` fills in.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;
/// How long (ms) `DatabaseWatcher::wait` blocks for a first batch before returning an empty list.
/// (The name reads like a typo for "INTERVAL"; kept as-is because other code refers to it.)
const WAIT_INTERNAL_MS: u64 = 100;
/// Quiet period (ms) after which `DatabaseWatcher::wait` stops absorbing events into the batch.
const WAIT_DEBOUNCE_MS: u64 = 300;
/// Delay (ms) between the two reads performed by `read_stable_contents`.
const STABILITY_CHECK_MS: u64 = 10;
41
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
43struct ChangedFile {
44 id: FileId,
45 path: PathBuf,
46}
47
48#[derive(Debug, Clone)]
50pub struct WatchOptions {
51 pub poll_interval: Option<Duration>,
52 pub additional_excludes: Vec<Exclusion<'static>>,
53}
54
55impl Default for WatchOptions {
56 #[inline]
57 fn default() -> Self {
58 Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
59 }
60}
61
/// Keeps a [`Database`] in sync with the filesystem by watching its configured roots.
///
/// Created with `DatabaseWatcher::new`; `watch` starts watching and `wait` applies pending
/// changes to the database. Dropping the watcher calls `stop`.
pub struct DatabaseWatcher<'config> {
    /// The database that `wait` adds files to, updates, and deletes files from.
    database: Database<'config>,
    /// The active notify watcher; `Some` exactly while watching (see `is_watching`).
    watcher: Option<RecommendedWatcher>,
    /// Roots registered with `watcher`, remembered so `stop` can unwatch them.
    watched_paths: Vec<PathBuf>,
    /// Receiving end of the channel the notify event handler sends filtered batches into.
    receiver: Option<Receiver<Vec<ChangedFile>>>,
    /// `(canonical base directory, pattern specificity)` for each configured host path pattern.
    host_base_paths: Vec<(PathBuf, usize)>,
    /// Same shape as `host_base_paths`, for the configured patch patterns.
    patch_base_paths: Vec<(PathBuf, usize)>,
    /// Same shape as `host_base_paths`, for the configured include patterns.
    include_base_paths: Vec<(PathBuf, usize)>,
}
77
78impl<'config> DatabaseWatcher<'config> {
79 #[inline]
80 #[must_use]
81 pub fn new(database: Database<'config>) -> Self {
82 Self {
83 database,
84 watcher: None,
85 watched_paths: Vec::new(),
86 receiver: None,
87 host_base_paths: Vec::new(),
88 patch_base_paths: Vec::new(),
89 include_base_paths: Vec::new(),
90 }
91 }
92
93 #[inline]
102 pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
103 self.stop();
104
105 let config = &self.database.configuration;
106
107 let (tx, rx) = mpsc::channel();
108
109 let mut all_exclusions = vec![
110 Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
111 Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
112 Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
113 Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
114 ];
115 all_exclusions.extend(config.excludes.iter().cloned());
116 all_exclusions.extend(options.additional_excludes);
117
118 let glob_settings = &config.glob;
119 let mut glob_builder = GlobSetBuilder::new();
120 for ex in &all_exclusions {
121 if let Exclusion::Pattern(pat) = ex {
122 let glob = GlobBuilder::new(pat)
123 .case_insensitive(glob_settings.case_insensitive)
124 .literal_separator(glob_settings.literal_separator)
125 .backslash_escape(glob_settings.backslash_escape)
126 .empty_alternates(glob_settings.empty_alternates)
127 .build()?;
128 glob_builder.add(glob);
129 }
130 }
131
132 let glob_excludes = glob_builder.build()?;
133
134 let path_excludes: HashSet<PathBuf> = all_exclusions
135 .iter()
136 .filter_map(|ex| match ex {
137 Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
138 Exclusion::Pattern(_) => None,
139 })
140 .collect();
141
142 let extensions: HashSet<Vec<u8>> = config.extensions.iter().map(|c| c.to_vec()).collect();
143 let workspace = config.workspace.as_ref().to_path_buf();
144
145 let mut unique_watch_paths = HashSet::new();
150
151 let mut host_base_paths = Vec::new();
152 for path in &config.paths {
153 let specificity = calculate_pattern_specificity(path.as_ref());
156 let watch_path = Self::extract_watch_path(path.as_ref());
157 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
158
159 let canonical = absolute_path.canonicalize().unwrap_or_else(|_| absolute_path.clone());
160 host_base_paths.push((canonical, specificity));
161 unique_watch_paths.insert(absolute_path);
162 }
163
164 let mut include_base_paths = Vec::new();
165 for path in &config.includes {
166 let specificity = calculate_pattern_specificity(path.as_ref());
167 let watch_path = Self::extract_watch_path(path.as_ref());
168 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
169 let canonical = absolute_path.canonicalize().unwrap_or_else(|_| absolute_path.clone());
170 include_base_paths.push((canonical, specificity));
171 unique_watch_paths.insert(absolute_path);
172 }
173
174 let mut patch_base_paths = Vec::new();
175 for path in &config.patches {
176 let specificity = calculate_pattern_specificity(path.as_ref());
177 let watch_path = Self::extract_watch_path(path.as_ref());
178 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
179 let canonical = absolute_path.canonicalize().unwrap_or_else(|_| absolute_path.clone());
180 patch_base_paths.push((canonical, specificity));
181 unique_watch_paths.insert(absolute_path);
182 }
183
184 let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths
185 .iter()
186 .filter(|wp| glob_excludes.is_match(wp.as_path()) || path_excludes.contains(wp.as_path()))
187 .cloned()
188 .collect();
189
190 let mut watcher = RecommendedWatcher::new(
191 move |res: Result<Event, notify::Error>| {
192 if let Ok(event) = res
193 && let Some(changed) = Self::handle_event(
194 event,
195 &workspace,
196 &glob_excludes,
197 &path_excludes,
198 &extensions,
199 &explicit_watch_paths,
200 )
201 {
202 let _ = tx.send(changed);
203 }
204 },
205 Config::default()
206 .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
207 )
208 .map_err(DatabaseError::WatcherInit)?;
209
210 let mut watched_paths = Vec::new();
211 for path in unique_watch_paths {
212 watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
213 watched_paths.push(path.clone());
214 tracing::debug!("Watching path: {}", path.display());
215 }
216
217 tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
218
219 self.watcher = Some(watcher);
220 self.watched_paths = watched_paths;
221 self.receiver = Some(rx);
222 self.host_base_paths = host_base_paths;
223 self.patch_base_paths = patch_base_paths;
224 self.include_base_paths = include_base_paths;
225
226 Ok(())
227 }
228
229 #[inline]
231 pub fn stop(&mut self) {
232 if let Some(mut watcher) = self.watcher.take() {
233 for path in &self.watched_paths {
234 let _ = watcher.unwatch(path);
235 tracing::debug!("Stopped watching: {}", path.display());
236 }
237 }
238 self.watched_paths.clear();
239 self.receiver = None;
240 self.host_base_paths.clear();
241 self.patch_base_paths.clear();
242 self.include_base_paths.clear();
243 }
244
245 #[inline]
247 #[must_use]
248 pub fn is_watching(&self) -> bool {
249 self.watcher.is_some()
250 }
251
252 fn extract_watch_path(pattern: &[u8]) -> PathBuf {
263 let is_glob =
264 pattern.contains(&b'*') || pattern.contains(&b'?') || pattern.contains(&b'[') || pattern.contains(&b'{');
265
266 if !is_glob {
267 return bytes_to_path(pattern).into_owned();
268 }
269
270 let first_glob_pos =
271 pattern.iter().position(|&b| matches!(b, b'*' | b'?' | b'[' | b'{')).unwrap_or(pattern.len());
272
273 let base = &pattern[..first_glob_pos];
274
275 let mut end = base.len();
276 while end > 0 && matches!(base[end - 1], b'/' | b'\\') {
277 end -= 1;
278 }
279 let base = &base[..end];
280
281 if base.is_empty() { PathBuf::from(".") } else { bytes_to_path(base).into_owned() }
282 }
283
284 fn handle_event(
285 event: Event,
286 workspace: &Path,
287 glob_excludes: &GlobSet,
288 path_excludes: &HashSet<PathBuf>,
289 extensions: &HashSet<Vec<u8>>,
290 explicit_watch_paths: &[PathBuf],
291 ) -> Option<Vec<ChangedFile>> {
292 tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
293
294 if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
295 event.kind
296 {
297 tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
298
299 return None;
300 }
301
302 let mut changed_files = Vec::new();
303
304 for path in event.paths {
305 if let Some(ext) = path.extension() {
307 if !extensions.contains(ext.as_encoded_bytes()) {
308 continue;
309 }
310 } else {
311 continue;
312 }
313
314 let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
315 if !is_explicitly_watched {
316 if glob_excludes.is_match(&path) {
318 tracing::debug!("Skipping path excluded by pattern: {}", path.display());
319 continue;
320 }
321
322 if path_excludes.contains(&path) {
324 tracing::debug!("Skipping excluded path: {}", path.display());
325 continue;
326 }
327
328 let mut should_skip = false;
330 for ancestor in path.ancestors().skip(1) {
331 if path_excludes.contains(ancestor) {
332 tracing::debug!("Skipping path under excluded directory: {}", path.display());
333 should_skip = true;
334 break;
335 }
336 }
337
338 if should_skip {
339 continue;
340 }
341 }
342
343 #[cfg(windows)]
345 let logical_name = path
346 .strip_prefix(workspace)
347 .unwrap_or(&path)
348 .as_os_str()
349 .as_encoded_bytes()
350 .iter()
351 .map(|i| if *i == b'\\' { b'/' } else { *i })
352 .collect::<Vec<_>>();
353 #[cfg(not(windows))]
354 let logical_name = path.strip_prefix(workspace).unwrap_or(&path).as_os_str().as_encoded_bytes().to_owned();
355
356 let file_id = FileId::new(&logical_name);
357
358 changed_files.push(ChangedFile { id: file_id, path: path.clone() });
359 }
360
361 if changed_files.is_empty() { None } else { Some(changed_files) }
362 }
363
364 #[inline]
375 pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
376 let Some(receiver) = &self.receiver else {
377 return Err(DatabaseError::WatcherNotActive);
378 };
379
380 let config = &self.database.configuration;
381 let workspace = config.workspace.as_ref().to_path_buf();
382
383 match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
384 Ok(changed_files) => {
385 let mut all_changed = changed_files;
386 loop {
387 match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
388 Ok(more) => all_changed.extend(more),
389 Err(RecvTimeoutError::Timeout) => break,
390 Err(RecvTimeoutError::Disconnected) => {
391 self.stop();
392 return Err(DatabaseError::WatcherNotActive);
393 }
394 }
395 }
396
397 let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
398 for changed in all_changed {
399 latest_changes.insert(changed.id, changed);
400 }
401 let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
402 let mut changed_ids = Vec::new();
403
404 for changed_file in &all_changed {
405 changed_ids.push(changed_file.id);
406
407 let Ok(file) = self.database.get(&changed_file.id) else {
408 if changed_file.path.exists() {
409 let new_file_type = classify_added_file(
410 &changed_file.path,
411 &self.host_base_paths,
412 &self.patch_base_paths,
413 &self.include_base_paths,
414 );
415 match File::read(&workspace, &changed_file.path, new_file_type) {
416 Ok(file) => {
417 self.database.add(file);
418 tracing::debug!("Added new file to database: {}", changed_file.path.display());
419 }
420 Err(e) => {
421 tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
422 }
423 }
424 }
425
426 continue;
427 };
428
429 if !changed_file.path.exists() {
430 self.database.delete(changed_file.id);
431 tracing::trace!("Deleted file from database: {}", String::from_utf8_lossy(&file.name));
432 continue;
433 }
434
435 match Self::read_stable_contents(&changed_file.path) {
436 Ok(contents) => {
437 if self.database.update(changed_file.id, Cow::Owned(contents)) {
438 tracing::trace!("Updated file in database: {}", String::from_utf8_lossy(&file.name));
439 } else {
440 tracing::warn!(
441 "Failed to update file in database (ID not found): {}",
442 String::from_utf8_lossy(&file.name)
443 );
444 }
445 }
446 Err(e) => {
447 tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
448 }
449 }
450 }
451
452 Ok(changed_ids)
453 }
454 Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
455 Err(RecvTimeoutError::Disconnected) => {
456 self.stop();
457 Err(DatabaseError::WatcherNotActive)
458 }
459 }
460 }
461
462 fn read_stable_contents(path: &Path) -> std::io::Result<Vec<u8>> {
468 let contents = std::fs::read(path)?;
469
470 std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));
471
472 if path.exists()
473 && let Ok(reread) = std::fs::read(path)
474 && reread != contents
475 {
476 tracing::debug!("File content changed during stability check: {}", path.display());
477
478 return Ok(reread);
479 }
480
481 Ok(contents)
482 }
483
484 #[inline]
486 #[must_use]
487 pub fn database(&self) -> &Database<'config> {
488 &self.database
489 }
490
491 #[inline]
493 #[must_use]
494 pub fn read_only_database(&self) -> ReadDatabase {
495 self.database.read_only()
496 }
497
498 #[inline]
500 pub fn database_mut(&mut self) -> &mut Database<'config> {
501 &mut self.database
502 }
503
504 #[inline]
513 pub fn with_database_mut<F, R>(&mut self, f: F) -> R
514 where
515 F: for<'borrow> FnOnce(&'borrow mut Database<'config>) -> R,
516 {
517 f(&mut self.database)
518 }
519
520 #[inline]
522 #[must_use]
523 pub fn into_database(self) -> Database<'config> {
524 let mut md = ManuallyDrop::new(self);
525 md.stop();
526 unsafe { std::ptr::read(&raw const md.database) }
529 }
530}
531
532impl Drop for DatabaseWatcher<'_> {
533 #[inline]
534 fn drop(&mut self) {
535 self.stop();
536 }
537}
538
539fn classify_added_file(
548 path: &Path,
549 host_bases: &[(PathBuf, usize)],
550 patch_bases: &[(PathBuf, usize)],
551 include_bases: &[(PathBuf, usize)],
552) -> FileType {
553 let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
554 let max_spec = |bases: &[(PathBuf, usize)]| {
555 bases.iter().filter(|(b, _)| canonical.starts_with(b.as_path())).map(|(_, s)| *s).max()
556 };
557
558 resolve_file_type(max_spec(host_bases), max_spec(include_bases), max_spec(patch_bases))
559}
560
#[cfg(test)]
mod classify_added_file_tests {
    use super::*;

    /// Pairs each path with the specificity `calculate_pattern_specificity` gives that same string.
    fn bases(items: &[&str]) -> Vec<(PathBuf, usize)> {
        let mut out = Vec::with_capacity(items.len());
        for &item in items {
            out.push((PathBuf::from(item), calculate_pattern_specificity(item.as_bytes())));
        }
        out
    }

    /// Classifies `path` against host, patch and include bases given as plain strings.
    fn classify(path: &str, host: &[&str], patch: &[&str], include: &[&str]) -> FileType {
        classify_added_file(Path::new(path), &bases(host), &bases(patch), &bases(include))
    }

    #[test]
    fn defaults_to_host_when_no_base_matches() {
        assert_eq!(classify("/ws/orphan/foo.php", &["/ws/src"], &[], &["/ws/stubs"]), FileType::Host);
    }

    #[test]
    fn vendored_when_only_include_matches() {
        assert_eq!(classify("/ws/stubs/foo.php", &["/ws/src"], &[], &["/ws/stubs"]), FileType::Vendored);
    }

    #[test]
    fn host_when_only_host_matches() {
        assert_eq!(classify("/ws/src/foo.php", &["/ws/src"], &[], &["/ws/stubs"]), FileType::Host);
    }

    #[test]
    fn matches_loader_at_equal_specificity_for_same_dir_under_both() {
        // Same directory configured as host and include: include wins the tie.
        assert_eq!(classify("/ws/shared/foo.php", &["/ws/shared"], &[], &["/ws/shared"]), FileType::Vendored);
    }

    #[test]
    fn include_wins_when_strictly_more_specific() {
        assert_eq!(
            classify("/ws/src/vendor/stub.php", &["/ws/src"], &[], &["/ws/src/vendor"]),
            FileType::Vendored
        );
    }

    #[test]
    fn exact_host_file_beats_directory_include_via_loader_score() {
        assert_eq!(classify("/ws/src/foo.php", &["/ws/src/foo.php"], &[], &["/ws/src"]), FileType::Host);
    }

    #[test]
    fn patch_when_only_patch_matches() {
        assert_eq!(
            classify("/ws/patches/foo.php", &["/ws/src"], &["/ws/patches"], &["/ws/stubs"]),
            FileType::Patch
        );
    }

    #[test]
    fn patch_wins_over_host_when_strictly_more_specific() {
        assert_eq!(classify("/ws/src/patches/foo.php", &["/ws/src"], &["/ws/src/patches"], &[]), FileType::Patch);
    }

    #[test]
    fn host_wins_over_patch_at_equal_specificity() {
        assert_eq!(classify("/ws/shared/foo.php", &["/ws/shared"], &["/ws/shared"], &[]), FileType::Host);
    }

    #[test]
    fn patch_wins_over_include_when_both_match_without_host() {
        assert_eq!(classify("/ws/overlap/foo.php", &[], &["/ws/overlap"], &["/ws/overlap"]), FileType::Patch);
    }
}