1use std::collections::BTreeSet;
2use std::path::{Component, Path, PathBuf};
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::{mpsc, Arc, RwLock};
5use std::thread::{self, JoinHandle};
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{Receiver, SendTimeoutError, Sender};
9use ignore::gitignore::Gitignore;
10
/// Shared gitignore matcher. `None` means no matcher is installed, in which
/// case only the hard-coded infrastructure-directory skip list filters paths.
/// NOTE(review): presumably the consumer swaps in a rebuilt matcher after an
/// `IgnoreRulesChanged` dispatch — confirm against the caller.
pub type SharedGitignore = Arc<RwLock<Option<Arc<Gitignore>>>>;

/// How long raw paths are batched, starting from the first pending path,
/// before the batch is flushed.
pub const WATCHER_FLUSH_WINDOW: Duration = Duration::from_millis(250);
/// Number of pending paths that forces an immediate flush.
pub const WATCHER_MAX_BATCH_PATHS: usize = 1024;
/// Capacity of the bounded channel returned by `watcher_dispatch_channel`.
pub const WATCHER_DISPATCH_CHANNEL_CAPACITY: usize = 1024;
// Upper bound on how long the filter loop blocks on the raw event channel, so
// project-root deletion and shutdown are re-checked regularly.
const ROOT_DELETED_CHECK_INTERVAL: Duration = Duration::from_millis(250);
// Sleep between polls of the matcher generation counter while waiting for a
// gitignore rebuild.
const GITIGNORE_REBUILD_POLL_INTERVAL: Duration = Duration::from_millis(10);
// Per-attempt timeout for sends on the dispatch channel; shutdown is re-checked
// between attempts.
const DISPATCH_SEND_POLL_INTERVAL: Duration = Duration::from_millis(50);
19
/// Paths the watcher filter needs in order to classify raw filesystem events.
#[derive(Debug, Clone)]
pub struct WatcherFilterConfig {
    /// Root directory of the watched project.
    pub project_root: PathBuf,
    /// Git common directory, when known. When `None`, `<project_root>/.git`
    /// is used to locate `info/exclude`.
    pub git_common_dir: Option<PathBuf>,
}

impl WatcherFilterConfig {
    /// Builds a config from a project root and an optional git common dir.
    pub fn new(project_root: PathBuf, git_common_dir: Option<PathBuf>) -> Self {
        Self {
            git_common_dir,
            project_root,
        }
    }

    /// Location of the repository-local exclude file, `<git dir>/info/exclude`.
    fn git_info_exclude_path(&self) -> PathBuf {
        // Borrow the configured directory instead of cloning it first.
        let info_dir = match &self.git_common_dir {
            Some(common) => common.join("info"),
            None => self.project_root.join(".git").join("info"),
        };
        info_dir.join("exclude")
    }
}
42
/// Messages sent from the watcher thread to its consumer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherDispatchEvent {
    /// A debounced batch of canonicalized changed paths (in sorted order) that
    /// survived filtering.
    Paths(Vec<PathBuf>),
    /// notify flagged that a rescan is needed. Pending granular paths were
    /// discarded before this was sent.
    RescanRequired,
    /// A file that can affect ignore rules changed. After sending this, the
    /// watcher waits for the matcher generation counter to advance before it
    /// filters the rest of the batch.
    IgnoreRulesChanged { path: PathBuf },
    /// The project root no longer exists; the watcher thread exits after
    /// sending this.
    RootDeleted,
    /// A watcher error (init failure, notify error, or disconnected raw
    /// channel); the watcher thread exits after sending this.
    Error(String),
}
51
/// Owning handle for a spawned watcher thread.
///
/// Dropping the handle only *requests* shutdown; it does not join the thread.
/// Call [`WatcherThreadHandle::shutdown_and_join`] to wait for the thread to exit.
pub struct WatcherThreadHandle {
    shutdown: Arc<AtomicBool>,
    join: Option<JoinHandle<()>>,
}

impl WatcherThreadHandle {
    /// Wraps a running thread together with the shutdown flag it polls.
    pub fn new(shutdown: Arc<AtomicBool>, join: JoinHandle<()>) -> Self {
        let join = Some(join);
        Self { shutdown, join }
    }

    /// Sets the shared shutdown flag without waiting for the thread.
    pub fn request_shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }

    /// Reports whether the thread has exited. A handle that was already
    /// joined counts as finished.
    pub fn is_finished(&self) -> bool {
        match &self.join {
            Some(handle) => handle.is_finished(),
            None => true,
        }
    }

    /// Requests shutdown and blocks until the thread exits. A panic inside the
    /// watcher thread is discarded.
    pub fn shutdown_and_join(mut self) {
        self.request_shutdown();
        let Some(handle) = self.join.take() else {
            return;
        };
        let _ = handle.join();
    }
}

impl Drop for WatcherThreadHandle {
    fn drop(&mut self) {
        // Signal only; blocking on a join inside `drop` is left to callers.
        self.request_shutdown();
    }
}
86
87pub fn watcher_dispatch_channel() -> (Sender<WatcherDispatchEvent>, Receiver<WatcherDispatchEvent>)
88{
89 crossbeam_channel::bounded(WATCHER_DISPATCH_CHANNEL_CAPACITY)
90}
91
92pub fn watcher_event_invalidates(kind: ¬ify::EventKind) -> bool {
95 use notify::event::{MetadataKind, ModifyKind};
96 use notify::EventKind;
97 match kind {
98 EventKind::Create(_) | EventKind::Remove(_) => true,
99 EventKind::Modify(ModifyKind::Metadata(meta)) => !matches!(
100 meta,
101 MetadataKind::AccessTime
102 | MetadataKind::Permissions
103 | MetadataKind::Ownership
104 | MetadataKind::Extended
105 ),
106 EventKind::Modify(_) => true,
107 _ => false,
108 }
109}
110
/// Returns `true` if any normal component of `path` names a directory whose
/// contents the watcher never reports: `.git`, the `.opencode`, `.alfonso` and
/// `.gsd` tool directories, `node_modules`, or `target`.
///
/// Every component is inspected, including ancestors of the project root.
/// Non-UTF-8 components never match.
pub fn watcher_path_is_infra_skip(path: &Path) -> bool {
    const SKIPPED_DIR_NAMES: [&str; 6] = [
        ".git",
        ".opencode",
        ".alfonso",
        ".gsd",
        "node_modules",
        "target",
    ];
    path.components().any(|component| {
        let Component::Normal(segment) = component else {
            return false;
        };
        SKIPPED_DIR_NAMES.contains(&segment.to_str().unwrap_or(""))
    })
}
119
/// Returns `true` for paths under build or tool-state directories
/// (`.opencode`, `.alfonso`, `.gsd`, `node_modules`, `target`).
///
/// Unlike `watcher_path_is_infra_skip`, `.git` is not listed, so git metadata
/// such as `.git/info/exclude` still reaches the ignore-rule detection.
fn watcher_path_is_high_churn_infra(path: &Path) -> bool {
    path.components()
        .filter_map(|component| match component {
            Component::Normal(name) => Some(name.to_str().unwrap_or("")),
            _ => None,
        })
        .any(|name| {
            matches!(
                name,
                ".opencode" | ".alfonso" | ".gsd" | "node_modules" | "target"
            )
        })
}
141
/// Returns `true` if the final component of `path` is `.gitignore` or
/// `.aftignore`. Returns `false` for paths without a file name (such as `/` or
/// paths ending in `..`).
fn watcher_path_is_ignore_file(path: &Path) -> bool {
    path.file_name()
        .is_some_and(|name| name == ".gitignore" || name == ".aftignore")
}
147
/// Returns `true` when `path` refers to `target`.
///
/// `path` is expected to be a raw watcher path that was already normalized by
/// [`canonicalize_watcher_path`]. `target` is normalized the same way:
/// - If it exists, it is fully canonicalized.
/// - If it no longer exists, its parent directory is canonicalized and the
///   file name is re-attached.
///
/// The fallback matters for removal events. There the target file is gone,
/// so a plain `canonicalize(target)` fails even though the raw path was
/// normalized through its canonical parent.
fn watcher_same_path(path: &Path, target: &Path) -> bool {
    if path == target {
        return true;
    }
    if let Ok(canonical) = std::fs::canonicalize(target) {
        return path == canonical;
    }
    // Target missing (e.g. just deleted): compare via its canonical parent.
    match (target.parent(), target.file_name()) {
        (Some(parent), Some(name)) => std::fs::canonicalize(parent)
            .is_ok_and(|canonical_parent| path == canonical_parent.join(name)),
        _ => false,
    }
}
157
158fn watcher_path_is_git_info_exclude(config: &WatcherFilterConfig, path: &Path) -> bool {
159 watcher_same_path(path, &config.git_info_exclude_path())
160}
161
162fn watcher_path_is_global_gitignore(path: &Path) -> bool {
163 ignore::gitignore::gitconfig_excludes_path()
164 .as_deref()
165 .is_some_and(|global_ignore| watcher_same_path(path, global_ignore))
166}
167
168fn watcher_path_can_change_corpus_ignore(config: &WatcherFilterConfig, path: &Path) -> bool {
169 if watcher_path_is_global_gitignore(path) {
170 return true;
171 }
172 if watcher_path_is_git_info_exclude(config, path) {
173 return true;
174 }
175 if !path.starts_with(&config.project_root) {
176 return false;
177 }
178
179 watcher_path_is_ignore_file(path) && !watcher_path_is_infra_skip(path)
180}
181
/// Normalizes a raw watcher path so it can be compared with other paths.
///
/// - An existing path is fully canonicalized.
/// - A path that cannot be canonicalized (e.g. a file that was just deleted)
///   gets its parent directory canonicalized and its file name re-attached.
/// - If neither works, the input is returned unchanged.
pub fn canonicalize_watcher_path(path: PathBuf) -> PathBuf {
    match std::fs::canonicalize(&path) {
        Ok(resolved) => resolved,
        Err(_) => {
            let via_parent = match (path.parent(), path.file_name()) {
                (Some(dir), Some(name)) => std::fs::canonicalize(dir)
                    .ok()
                    .map(|resolved_dir| resolved_dir.join(name)),
                _ => None,
            };
            via_parent.unwrap_or(path)
        }
    }
}
196
197fn watcher_path_is_ignored_by_matcher(matcher: &SharedGitignore, path: &Path) -> bool {
198 if watcher_path_is_infra_skip(path) {
199 return true;
200 }
201
202 let guard = matcher
203 .read()
204 .unwrap_or_else(|poisoned| poisoned.into_inner());
205 if let Some(matcher) = guard.as_ref() {
206 if path.starts_with(matcher.path()) {
207 let is_dir = path.is_dir();
208 return matcher
209 .matched_path_or_any_parents(path, is_dir)
210 .is_ignore();
211 }
212 }
213
214 false
215}
216
/// Result of filtering one batch of canonicalized raw watcher paths.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FilteredWatcherPaths {
    /// Paths that survived the infra-directory, external-rules-file and
    /// gitignore filters.
    pub changed: BTreeSet<PathBuf>,
    /// Whether any path in the batch, evaluated before filtering, can change
    /// ignore rules.
    pub ignore_file_changed: bool,
}
222
223fn filter_canonical_paths(
224 config: &WatcherFilterConfig,
225 matcher: &SharedGitignore,
226 raw_paths: BTreeSet<PathBuf>,
227) -> FilteredWatcherPaths {
228 let ignore_file_changed = raw_paths
229 .iter()
230 .any(|path| watcher_path_can_change_corpus_ignore(config, path));
231
232 let changed = raw_paths
233 .into_iter()
234 .filter(|path| {
235 if watcher_path_is_infra_skip(path) {
236 return false;
237 }
238
239 if watcher_path_is_global_gitignore(path)
240 || watcher_path_is_git_info_exclude(config, path)
241 {
242 return false;
243 }
244
245 if watcher_path_is_ignored_by_matcher(matcher, path) {
246 return false;
247 }
248 true
249 })
250 .collect();
251
252 FilteredWatcherPaths {
253 changed,
254 ignore_file_changed,
255 }
256}
257
258pub fn filter_watcher_raw_paths_for_test<I>(
259 config: &WatcherFilterConfig,
260 matcher: &SharedGitignore,
261 raw_paths: I,
262) -> FilteredWatcherPaths
263where
264 I: IntoIterator<Item = PathBuf>,
265{
266 let raw_paths = raw_paths
267 .into_iter()
268 .map(canonicalize_watcher_path)
269 .collect::<BTreeSet<_>>();
270 filter_canonical_paths(config, matcher, raw_paths)
271}
272
273pub fn run_watcher_thread<W, E, F>(
274 config: WatcherFilterConfig,
275 extra_watch_paths: Vec<PathBuf>,
276 matcher: SharedGitignore,
277 matcher_generation: Arc<AtomicU64>,
278 dispatch_tx: Sender<WatcherDispatchEvent>,
279 shutdown: Arc<AtomicBool>,
280 attach: F,
281) where
282 W: Send + 'static,
283 E: std::fmt::Display,
284 F: FnOnce(PathBuf, Vec<PathBuf>, mpsc::Sender<notify::Result<notify::Event>>) -> Result<W, E>,
285{
286 let (raw_tx, raw_rx) = mpsc::channel();
287 let root_path = config.project_root.clone();
288 match attach(root_path.clone(), extra_watch_paths, raw_tx) {
289 Ok(_watcher) => {
290 if shutdown.load(Ordering::SeqCst) {
291 return;
292 }
293 crate::slog_info!("watcher started: {}", root_path.display());
294 let mut filter = WatcherFilterThread::new(
295 config,
296 matcher,
297 matcher_generation,
298 dispatch_tx,
299 shutdown,
300 );
301 filter.run(raw_rx);
302 }
303 Err(error) => {
304 if !shutdown.load(Ordering::SeqCst) {
305 log::debug!(
306 "watcher init failed: {} — callers will work with stale data",
307 error
308 );
309 let _ = dispatch_tx.send(WatcherDispatchEvent::Error(format!(
310 "watcher init failed: {error}"
311 )));
312 }
313 }
314 }
315}
316
/// State for the loop that batches raw notify events, filters them and sends
/// the results as [`WatcherDispatchEvent`]s.
struct WatcherFilterThread {
    config: WatcherFilterConfig,
    // Shared matcher consulted when a batch is filtered.
    matcher: SharedGitignore,
    // Awaited after dispatching `IgnoreRulesChanged`: the loop blocks until this
    // value changes. NOTE(review): presumably bumped by the consumer once the
    // matcher is rebuilt — confirm.
    matcher_generation: Arc<AtomicU64>,
    dispatch_tx: Sender<WatcherDispatchEvent>,
    // Cooperative shutdown flag, polled by the loop, by blocked sends and by
    // the rebuild wait.
    shutdown: Arc<AtomicBool>,
    // Canonicalized paths collected since the last flush.
    raw_paths: BTreeSet<PathBuf>,
    // When the current batch must be flushed; `None` while no paths are pending.
    flush_deadline: Option<Instant>,
}
326
327impl WatcherFilterThread {
328 fn new(
329 config: WatcherFilterConfig,
330 matcher: SharedGitignore,
331 matcher_generation: Arc<AtomicU64>,
332 dispatch_tx: Sender<WatcherDispatchEvent>,
333 shutdown: Arc<AtomicBool>,
334 ) -> Self {
335 Self {
336 config,
337 matcher,
338 matcher_generation,
339 dispatch_tx,
340 shutdown,
341 raw_paths: BTreeSet::new(),
342 flush_deadline: None,
343 }
344 }
345
346 fn run(&mut self, raw_rx: mpsc::Receiver<notify::Result<notify::Event>>) {
347 loop {
348 if self.shutdown.load(Ordering::SeqCst) {
349 self.flush_pending();
350 return;
351 }
352 if self.project_root_was_deleted() {
353 self.raw_paths.clear();
354 let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
355 return;
356 }
357 if self.flush_deadline_reached() {
358 if !self.flush_pending() {
359 return;
360 }
361 continue;
362 }
363
364 match raw_rx.recv_timeout(self.next_recv_timeout()) {
365 Ok(Ok(event)) => {
366 if event.need_rescan() {
367 self.raw_paths.clear();
368 self.flush_deadline = None;
369 if !self.send_dispatch(WatcherDispatchEvent::RescanRequired) {
370 return;
371 }
372 continue;
373 }
374 if watcher_event_invalidates(&event.kind) && !self.push_raw_paths(event.paths) {
375 return;
376 }
377 }
378 Ok(Err(error)) => {
379 let _ = self.send_dispatch(WatcherDispatchEvent::Error(error.to_string()));
380 return;
381 }
382 Err(mpsc::RecvTimeoutError::Timeout) => {
383 if !self.flush_pending() {
384 return;
385 }
386 }
387 Err(mpsc::RecvTimeoutError::Disconnected) => {
388 if !self.shutdown.load(Ordering::SeqCst) {
389 let _ = self.send_dispatch(WatcherDispatchEvent::Error(
390 "watcher channel disconnected".to_string(),
391 ));
392 }
393 return;
394 }
395 }
396 }
397 }
398
399 fn project_root_was_deleted(&self) -> bool {
400 !self.config.project_root.exists()
401 }
402
403 fn push_raw_paths(&mut self, paths: Vec<PathBuf>) -> bool {
404 for path in paths {
405 if watcher_path_is_high_churn_infra(&path) {
413 continue;
414 }
415 self.raw_paths.insert(canonicalize_watcher_path(path));
421 }
422 if !self.raw_paths.is_empty() && self.flush_deadline.is_none() {
423 self.flush_deadline = Some(Instant::now() + WATCHER_FLUSH_WINDOW);
424 }
425 if self.raw_paths.len() >= WATCHER_MAX_BATCH_PATHS {
426 return self.flush_pending();
427 }
428 true
429 }
430
431 fn next_recv_timeout(&self) -> Duration {
432 let root_check = ROOT_DELETED_CHECK_INTERVAL;
433 match self.flush_deadline {
434 Some(deadline) => deadline
435 .saturating_duration_since(Instant::now())
436 .min(root_check),
437 None => root_check,
438 }
439 }
440
441 fn flush_deadline_reached(&self) -> bool {
442 self.flush_deadline
443 .is_some_and(|deadline| Instant::now() >= deadline)
444 }
445
446 fn flush_pending(&mut self) -> bool {
447 if self.raw_paths.is_empty() {
448 self.flush_deadline = None;
449 return true;
450 }
451
452 let raw_paths = std::mem::take(&mut self.raw_paths);
453 self.flush_deadline = None;
454 let ignore_path = raw_paths
455 .iter()
456 .find(|path| watcher_path_can_change_corpus_ignore(&self.config, path))
457 .cloned();
458 let ignore_file_changed = ignore_path.is_some();
459 if let Some(path) = ignore_path {
460 let observed_generation = self.matcher_generation.load(Ordering::SeqCst);
461 if !self.send_dispatch(WatcherDispatchEvent::IgnoreRulesChanged { path }) {
462 return false;
463 }
464 if !self.wait_for_gitignore_rebuild(observed_generation) {
465 return false;
466 }
467 }
468
469 let filtered = filter_canonical_paths(&self.config, &self.matcher, raw_paths);
470 debug_assert_eq!(filtered.ignore_file_changed, ignore_file_changed);
471 if filtered.changed.is_empty() {
472 return true;
473 }
474 self.send_dispatch(WatcherDispatchEvent::Paths(
475 filtered.changed.into_iter().collect(),
476 ))
477 }
478
479 fn wait_for_gitignore_rebuild(&self, observed_generation: u64) -> bool {
480 while !self.shutdown.load(Ordering::SeqCst)
481 && self.matcher_generation.load(Ordering::SeqCst) == observed_generation
482 {
483 if self.project_root_was_deleted() {
484 let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
485 return false;
486 }
487 thread::sleep(GITIGNORE_REBUILD_POLL_INTERVAL);
488 }
489 !self.shutdown.load(Ordering::SeqCst)
490 }
491
492 fn send_dispatch(&self, event: WatcherDispatchEvent) -> bool {
493 let mut event = event;
494 loop {
495 match self
496 .dispatch_tx
497 .send_timeout(event, DISPATCH_SEND_POLL_INTERVAL)
498 {
499 Ok(()) => return true,
500 Err(SendTimeoutError::Timeout(returned)) => {
501 if self.shutdown.load(Ordering::SeqCst) {
502 return false;
503 }
504 event = returned;
505 }
506 Err(SendTimeoutError::Disconnected(_)) => return false,
507 }
508 }
509 }
510}
511
// Unit tests for event-kind filtering, path filtering and the filter loop's
// control messages (rescan, root deletion).
#[cfg(test)]
mod tests {
    use super::*;
    use ignore::gitignore::GitignoreBuilder;
    use notify::event::{
        AccessKind, AccessMode, CreateKind, DataChange, Flag, MetadataKind, ModifyKind,
    };
    use notify::EventKind;
    use tempfile::TempDir;

    // Builds a shared matcher from `<root>/.gitignore`, if present. The root is
    // canonicalized so it lines up with canonicalized watcher paths. The
    // matcher stays `None` when no ignore rules were loaded.
    fn shared_matcher(root: &Path) -> SharedGitignore {
        let root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf());
        let mut builder = GitignoreBuilder::new(&root);
        let ignore = root.join(".gitignore");
        if ignore.exists() {
            if let Some(error) = builder.add(&ignore) {
                panic!("gitignore parse error: {error}");
            }
        }
        let matcher = builder.build().unwrap();
        let matcher = (matcher.num_ignores() > 0).then(|| Arc::new(matcher));
        Arc::new(RwLock::new(matcher))
    }

    // Content and write-time changes invalidate; access-time, permission,
    // read-access and `Other` events do not.
    #[test]
    fn event_kind_filter_accepts_content_changes_only() {
        assert!(watcher_event_invalidates(&EventKind::Create(
            CreateKind::File
        )));
        assert!(watcher_event_invalidates(&EventKind::Modify(
            ModifyKind::Data(DataChange::Content)
        )));
        assert!(watcher_event_invalidates(&EventKind::Modify(
            ModifyKind::Metadata(MetadataKind::WriteTime)
        )));
        assert!(!watcher_event_invalidates(&EventKind::Modify(
            ModifyKind::Metadata(MetadataKind::AccessTime)
        )));
        assert!(!watcher_event_invalidates(&EventKind::Modify(
            ModifyKind::Metadata(MetadataKind::Permissions)
        )));
        assert!(!watcher_event_invalidates(&EventKind::Access(
            AccessKind::Open(AccessMode::Read)
        )));
        assert!(!watcher_event_invalidates(&EventKind::Other));
    }

    // The early high-churn skip must not drop `.git` paths, which the later
    // filter still needs to detect `info/exclude` changes.
    #[test]
    fn high_churn_infra_skip_drops_build_dirs_but_keeps_git_and_source() {
        assert!(watcher_path_is_high_churn_infra(Path::new(
            "/proj/target/debug/deps/foo.o"
        )));
        assert!(watcher_path_is_high_churn_infra(Path::new(
            "/proj/node_modules/.bin/x"
        )));
        assert!(watcher_path_is_high_churn_infra(Path::new(
            "/proj/.alfonso/notes/x"
        )));
        assert!(!watcher_path_is_high_churn_infra(Path::new(
            "/proj/.git/info/exclude"
        )));
        assert!(!watcher_path_is_high_churn_infra(Path::new(
            "/proj/src/main.rs"
        )));
        assert!(watcher_path_is_infra_skip(Path::new("/proj/.git/index")));
    }

    // A rescan-flagged event must be dispatched as `RescanRequired` and must
    // discard the granular path queued just before it. No `Paths` batch may
    // follow within the flush window.
    #[test]
    fn rescan_event_dispatches_control_and_supersedes_pending_paths() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let pending = root.join("pending.rs");
        std::fs::write(&pending, "fn main() {}\n").unwrap();
        let matcher = Arc::new(RwLock::new(None));
        let generation = Arc::new(AtomicU64::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));
        let (dispatch_tx, dispatch_rx) = watcher_dispatch_channel();
        let (raw_tx, raw_rx) = mpsc::channel();
        let config = WatcherFilterConfig::new(root, None);
        let mut filter = WatcherFilterThread::new(
            config,
            matcher,
            generation,
            dispatch_tx,
            Arc::clone(&shutdown),
        );
        let handle = thread::spawn(move || filter.run(raw_rx));

        let mut granular = notify::Event::new(EventKind::Create(CreateKind::File));
        granular.paths.push(pending);
        raw_tx.send(Ok(granular)).unwrap();
        raw_tx
            .send(Ok(
                notify::Event::new(EventKind::Other).set_flag(Flag::Rescan)
            ))
            .unwrap();

        let event = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("rescan event");
        assert_eq!(event, WatcherDispatchEvent::RescanRequired);
        assert!(
            dispatch_rx
                .recv_timeout(WATCHER_FLUSH_WINDOW + Duration::from_millis(100))
                .is_err(),
            "pending granular paths should be cleared by a rescan signal"
        );

        shutdown.store(true, Ordering::SeqCst);
        drop(raw_tx);
        handle.join().unwrap();
    }

    // Paths matched by the shared gitignore matcher are dropped; other paths
    // are kept.
    #[test]
    fn filters_gitignored_paths_with_shared_matcher() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        std::fs::write(root.join(".gitignore"), "ignored/\n").unwrap();
        std::fs::create_dir_all(root.join("ignored")).unwrap();
        std::fs::write(root.join("ignored/file.ts"), "ignored").unwrap();
        std::fs::write(root.join("kept.ts"), "kept").unwrap();
        let matcher = shared_matcher(&root);
        let config = WatcherFilterConfig::new(root.clone(), None);

        let filtered = filter_watcher_raw_paths_for_test(
            &config,
            &matcher,
            [root.join("ignored/file.ts"), root.join("kept.ts")],
        );

        assert!(!filtered.changed.contains(&root.join("ignored/file.ts")));
        assert!(filtered.changed.contains(&root.join("kept.ts")));
    }

    // `.git/info/exclude` raises the ignore-rules flag but is never reported
    // as a changed corpus path.
    #[test]
    fn ignore_rule_paths_are_control_only_for_external_excludes() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let git_info = root.join(".git").join("info");
        std::fs::create_dir_all(&git_info).unwrap();
        let exclude = git_info.join("exclude");
        std::fs::write(&exclude, "ignored/\n").unwrap();
        let matcher = Arc::new(RwLock::new(None));
        let config = WatcherFilterConfig::new(root, None);

        let filtered = filter_watcher_raw_paths_for_test(&config, &matcher, [exclude]);

        assert!(filtered.ignore_file_changed);
        assert!(filtered.changed.is_empty());
    }

    // Deleting the project root while the loop is idle produces `RootDeleted`
    // and the thread exits. `_raw_tx` is kept alive so the exit is not caused
    // by a disconnect.
    #[test]
    fn root_deleted_sends_control_and_exits() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let matcher = Arc::new(RwLock::new(None));
        let generation = Arc::new(AtomicU64::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));
        let (dispatch_tx, dispatch_rx) = watcher_dispatch_channel();
        let (raw_tx, raw_rx) = mpsc::channel();
        let config = WatcherFilterConfig::new(root.clone(), None);
        let mut filter = WatcherFilterThread::new(
            config,
            matcher,
            generation,
            dispatch_tx,
            Arc::clone(&shutdown),
        );
        let handle = thread::spawn(move || filter.run(raw_rx));
        let _raw_tx = raw_tx;
        std::fs::remove_dir_all(&root).unwrap();

        let event = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("root deleted event");
        assert_eq!(event, WatcherDispatchEvent::RootDeleted);
        shutdown.store(true, Ordering::SeqCst);
        handle.join().unwrap();
    }
}
697}