1use std::collections::BTreeSet;
2use std::path::{Component, Path, PathBuf};
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::{mpsc, Arc, RwLock};
5use std::thread::{self, JoinHandle};
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{Receiver, SendTimeoutError, Sender};
9use ignore::gitignore::Gitignore;
10
/// Shared, replaceable gitignore matcher.
///
/// The outer `Arc<RwLock<..>>` lets several owners read and swap the matcher;
/// the `Option` is `None` while no matcher is installed, in which case only the
/// built-in infrastructure-directory skip applies to a path.
pub type SharedGitignore = Arc<RwLock<Option<Arc<Gitignore>>>>;
12
/// How long pending paths are collected after the first one arrives before the
/// batch is flushed.
pub const WATCHER_FLUSH_WINDOW: Duration = Duration::from_millis(250);
/// Pending-path count at which a batch is flushed immediately, without waiting
/// for the flush window.
pub const WATCHER_MAX_BATCH_PATHS: usize = 1024;
/// Capacity of the bounded channel created by `watcher_dispatch_channel`.
pub const WATCHER_DISPATCH_CHANNEL_CAPACITY: usize = 1024;
/// Upper bound on a single blocking receive in the filter loop, so the
/// project-root existence check is re-run at least this often.
const ROOT_DELETED_CHECK_INTERVAL: Duration = Duration::from_millis(250);
/// Sleep between polls of the matcher generation while waiting for a rebuild.
const GITIGNORE_REBUILD_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Timeout of one dispatch send attempt; the shutdown flag is re-checked
/// between attempts.
const DISPATCH_SEND_POLL_INTERVAL: Duration = Duration::from_millis(50);
19
/// Locations the watcher filter needs in order to classify raw paths.
#[derive(Debug, Clone)]
pub struct WatcherFilterConfig {
    /// Root directory of the watched project.
    pub project_root: PathBuf,
    /// Git common directory when known; `<project_root>/.git` is used otherwise.
    pub git_common_dir: Option<PathBuf>,
}

impl WatcherFilterConfig {
    /// Bundles the project root with an optional git common directory.
    pub fn new(project_root: PathBuf, git_common_dir: Option<PathBuf>) -> Self {
        Self {
            project_root,
            git_common_dir,
        }
    }

    /// Returns `<git dir>/info/exclude`, where the git dir is the configured
    /// common directory or, failing that, `.git` under the project root.
    fn git_info_exclude_path(&self) -> PathBuf {
        let git_dir = match &self.git_common_dir {
            Some(common_dir) => common_dir.clone(),
            None => self.project_root.join(".git"),
        };
        git_dir.join("info").join("exclude")
    }
}
42
/// Message sent from the watcher filter thread to whoever consumes the
/// dispatch channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherDispatchEvent {
    /// A batch of changed paths that survived filtering.
    Paths(Vec<PathBuf>),
    /// The backend flagged an event as needing a rescan; paths still pending
    /// at that moment are discarded rather than dispatched.
    RescanRequired,
    /// A path that can change ignore rules was part of the batch. `path` is
    /// the first such path found; this is sent before the batch's `Paths`.
    IgnoreRulesChanged { path: PathBuf },
    /// The project root no longer exists; the filter thread stops after
    /// sending this.
    RootDeleted,
    /// Watcher initialisation failed, the backend reported an error, or the
    /// raw event channel disconnected.
    Error(String),
}
51
/// Owner-side handle for a watcher thread: a shared shutdown flag plus the
/// thread's join handle.
pub struct WatcherThreadHandle {
    shutdown: Arc<AtomicBool>,
    join: Option<JoinHandle<()>>,
}

impl WatcherThreadHandle {
    /// Wraps an already-spawned thread together with the flag it observes.
    pub fn new(shutdown: Arc<AtomicBool>, join: JoinHandle<()>) -> Self {
        let join = Some(join);
        Self { shutdown, join }
    }

    /// Raises the shutdown flag without waiting for the thread.
    pub fn request_shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }

    /// Returns `true` when the thread has finished or its handle was already taken.
    pub fn is_finished(&self) -> bool {
        match &self.join {
            Some(worker) => worker.is_finished(),
            None => true,
        }
    }

    /// Raises the shutdown flag and blocks until the thread exits.
    pub fn shutdown_and_join(mut self) {
        self.request_shutdown();
        let Some(worker) = self.join.take() else {
            return;
        };
        // The join result (including a worker panic) is intentionally discarded.
        let _ = worker.join();
    }
}

impl Drop for WatcherThreadHandle {
    /// Dropping only signals shutdown; it never joins the thread.
    fn drop(&mut self) {
        self.request_shutdown();
    }
}
86
87pub fn watcher_dispatch_channel() -> (Sender<WatcherDispatchEvent>, Receiver<WatcherDispatchEvent>)
88{
89 crossbeam_channel::bounded(WATCHER_DISPATCH_CHANNEL_CAPACITY)
90}
91
92pub fn watcher_event_invalidates(kind: ¬ify::EventKind) -> bool {
95 use notify::event::{MetadataKind, ModifyKind};
96 use notify::EventKind;
97 match kind {
98 EventKind::Create(_) | EventKind::Remove(_) => true,
99 EventKind::Modify(ModifyKind::Metadata(meta)) => !matches!(
100 meta,
101 MetadataKind::AccessTime
102 | MetadataKind::Permissions
103 | MetadataKind::Ownership
104 | MetadataKind::Extended
105 ),
106 EventKind::Modify(_) => true,
107 _ => false,
108 }
109}
110
/// Returns `true` when any normal component of `path` is one of the
/// infrastructure directory names that are always skipped.
pub fn watcher_path_is_infra_skip(path: &Path) -> bool {
    const INFRA_DIR_NAMES: [&str; 6] = [
        ".git",
        ".opencode",
        ".alfonso",
        ".gsd",
        "node_modules",
        "target",
    ];

    path.components().any(|component| match component {
        // Names that are not valid UTF-8 can never equal one of the literals.
        Component::Normal(name) => name
            .to_str()
            .is_some_and(|name| INFRA_DIR_NAMES.contains(&name)),
        _ => false,
    })
}
119
/// Returns `true` when the final component of `path` is `.gitignore` or `.aftignore`.
fn watcher_path_is_ignore_file(path: &Path) -> bool {
    let file_name = path.file_name().and_then(|name| name.to_str());
    matches!(file_name, Some(".gitignore" | ".aftignore"))
}
125
/// Returns `true` when `path` equals `target` as written, or equals the
/// canonical form of `target`. Only `target` is canonicalized; a `target` that
/// cannot be canonicalized matches by literal equality only.
fn watcher_same_path(path: &Path, target: &Path) -> bool {
    path == target
        || std::fs::canonicalize(target).is_ok_and(|resolved| resolved.as_path() == path)
}
135
136fn watcher_path_is_git_info_exclude(config: &WatcherFilterConfig, path: &Path) -> bool {
137 watcher_same_path(path, &config.git_info_exclude_path())
138}
139
140fn watcher_path_is_global_gitignore(path: &Path) -> bool {
141 ignore::gitignore::gitconfig_excludes_path()
142 .as_deref()
143 .is_some_and(|global_ignore| watcher_same_path(path, global_ignore))
144}
145
146fn watcher_path_can_change_corpus_ignore(config: &WatcherFilterConfig, path: &Path) -> bool {
147 if watcher_path_is_global_gitignore(path) {
148 return true;
149 }
150 if watcher_path_is_git_info_exclude(config, path) {
151 return true;
152 }
153 if !path.starts_with(&config.project_root) {
154 return false;
155 }
156
157 watcher_path_is_ignore_file(path) && !watcher_path_is_infra_skip(path)
158}
159
/// Canonicalizes `path` on a best-effort basis.
///
/// When the path itself cannot be canonicalized, the parent directory is
/// canonicalized instead and the final component re-attached. If that fails
/// too, or the path has no parent or file name, `path` is returned unchanged.
pub fn canonicalize_watcher_path(path: PathBuf) -> PathBuf {
    if let Ok(resolved) = std::fs::canonicalize(&path) {
        return resolved;
    }

    let via_parent = path
        .parent()
        .zip(path.file_name())
        .and_then(|(parent, leaf)| {
            std::fs::canonicalize(parent)
                .ok()
                .map(|resolved_parent| resolved_parent.join(leaf))
        });
    via_parent.unwrap_or(path)
}
174
175fn watcher_path_is_ignored_by_matcher(matcher: &SharedGitignore, path: &Path) -> bool {
176 if watcher_path_is_infra_skip(path) {
177 return true;
178 }
179
180 let guard = matcher
181 .read()
182 .unwrap_or_else(|poisoned| poisoned.into_inner());
183 if let Some(matcher) = guard.as_ref() {
184 if path.starts_with(matcher.path()) {
185 let is_dir = path.is_dir();
186 return matcher
187 .matched_path_or_any_parents(path, is_dir)
188 .is_ignore();
189 }
190 }
191
192 false
193}
194
/// Result of filtering one batch of raw watcher paths.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FilteredWatcherPaths {
    /// Paths that survived the infrastructure, external-excludes and gitignore filters.
    pub changed: BTreeSet<PathBuf>,
    /// Whether any path in the raw batch could change the ignore rules.
    pub ignore_file_changed: bool,
}
200
201fn filter_canonical_paths(
202 config: &WatcherFilterConfig,
203 matcher: &SharedGitignore,
204 raw_paths: BTreeSet<PathBuf>,
205) -> FilteredWatcherPaths {
206 let ignore_file_changed = raw_paths
207 .iter()
208 .any(|path| watcher_path_can_change_corpus_ignore(config, path));
209
210 let changed = raw_paths
211 .into_iter()
212 .filter(|path| {
213 if watcher_path_is_infra_skip(path) {
214 return false;
215 }
216
217 if watcher_path_is_global_gitignore(path)
218 || watcher_path_is_git_info_exclude(config, path)
219 {
220 return false;
221 }
222
223 if watcher_path_is_ignored_by_matcher(matcher, path) {
224 return false;
225 }
226 true
227 })
228 .collect();
229
230 FilteredWatcherPaths {
231 changed,
232 ignore_file_changed,
233 }
234}
235
236pub fn filter_watcher_raw_paths_for_test<I>(
237 config: &WatcherFilterConfig,
238 matcher: &SharedGitignore,
239 raw_paths: I,
240) -> FilteredWatcherPaths
241where
242 I: IntoIterator<Item = PathBuf>,
243{
244 let raw_paths = raw_paths
245 .into_iter()
246 .map(canonicalize_watcher_path)
247 .collect::<BTreeSet<_>>();
248 filter_canonical_paths(config, matcher, raw_paths)
249}
250
251pub fn run_watcher_thread<W, E, F>(
252 config: WatcherFilterConfig,
253 extra_watch_paths: Vec<PathBuf>,
254 matcher: SharedGitignore,
255 matcher_generation: Arc<AtomicU64>,
256 dispatch_tx: Sender<WatcherDispatchEvent>,
257 shutdown: Arc<AtomicBool>,
258 attach: F,
259) where
260 W: Send + 'static,
261 E: std::fmt::Display,
262 F: FnOnce(PathBuf, Vec<PathBuf>, mpsc::Sender<notify::Result<notify::Event>>) -> Result<W, E>,
263{
264 let (raw_tx, raw_rx) = mpsc::channel();
265 let root_path = config.project_root.clone();
266 match attach(root_path.clone(), extra_watch_paths, raw_tx) {
267 Ok(_watcher) => {
268 if shutdown.load(Ordering::SeqCst) {
269 return;
270 }
271 crate::slog_info!("watcher started: {}", root_path.display());
272 let mut filter = WatcherFilterThread::new(
273 config,
274 matcher,
275 matcher_generation,
276 dispatch_tx,
277 shutdown,
278 );
279 filter.run(raw_rx);
280 }
281 Err(error) => {
282 if !shutdown.load(Ordering::SeqCst) {
283 log::debug!(
284 "watcher init failed: {} — callers will work with stale data",
285 error
286 );
287 let _ = dispatch_tx.send(WatcherDispatchEvent::Error(format!(
288 "watcher init failed: {error}"
289 )));
290 }
291 }
292 }
293}
294
295struct WatcherFilterThread {
296 config: WatcherFilterConfig,
297 matcher: SharedGitignore,
298 matcher_generation: Arc<AtomicU64>,
299 dispatch_tx: Sender<WatcherDispatchEvent>,
300 shutdown: Arc<AtomicBool>,
301 raw_paths: BTreeSet<PathBuf>,
302 flush_deadline: Option<Instant>,
303}
304
305impl WatcherFilterThread {
306 fn new(
307 config: WatcherFilterConfig,
308 matcher: SharedGitignore,
309 matcher_generation: Arc<AtomicU64>,
310 dispatch_tx: Sender<WatcherDispatchEvent>,
311 shutdown: Arc<AtomicBool>,
312 ) -> Self {
313 Self {
314 config,
315 matcher,
316 matcher_generation,
317 dispatch_tx,
318 shutdown,
319 raw_paths: BTreeSet::new(),
320 flush_deadline: None,
321 }
322 }
323
324 fn run(&mut self, raw_rx: mpsc::Receiver<notify::Result<notify::Event>>) {
325 loop {
326 if self.shutdown.load(Ordering::SeqCst) {
327 self.flush_pending();
328 return;
329 }
330 if self.project_root_was_deleted() {
331 self.raw_paths.clear();
332 let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
333 return;
334 }
335 if self.flush_deadline_reached() {
336 if !self.flush_pending() {
337 return;
338 }
339 continue;
340 }
341
342 match raw_rx.recv_timeout(self.next_recv_timeout()) {
343 Ok(Ok(event)) => {
344 if event.need_rescan() {
345 self.raw_paths.clear();
346 self.flush_deadline = None;
347 if !self.send_dispatch(WatcherDispatchEvent::RescanRequired) {
348 return;
349 }
350 continue;
351 }
352 if watcher_event_invalidates(&event.kind) && !self.push_raw_paths(event.paths) {
353 return;
354 }
355 }
356 Ok(Err(error)) => {
357 let _ = self.send_dispatch(WatcherDispatchEvent::Error(error.to_string()));
358 return;
359 }
360 Err(mpsc::RecvTimeoutError::Timeout) => {
361 if !self.flush_pending() {
362 return;
363 }
364 }
365 Err(mpsc::RecvTimeoutError::Disconnected) => {
366 if !self.shutdown.load(Ordering::SeqCst) {
367 let _ = self.send_dispatch(WatcherDispatchEvent::Error(
368 "watcher channel disconnected".to_string(),
369 ));
370 }
371 return;
372 }
373 }
374 }
375 }
376
377 fn project_root_was_deleted(&self) -> bool {
378 !self.config.project_root.exists()
379 }
380
381 fn push_raw_paths(&mut self, paths: Vec<PathBuf>) -> bool {
382 for path in paths {
383 self.raw_paths.insert(canonicalize_watcher_path(path));
384 }
385 if !self.raw_paths.is_empty() && self.flush_deadline.is_none() {
386 self.flush_deadline = Some(Instant::now() + WATCHER_FLUSH_WINDOW);
387 }
388 if self.raw_paths.len() >= WATCHER_MAX_BATCH_PATHS {
389 return self.flush_pending();
390 }
391 true
392 }
393
394 fn next_recv_timeout(&self) -> Duration {
395 let root_check = ROOT_DELETED_CHECK_INTERVAL;
396 match self.flush_deadline {
397 Some(deadline) => deadline
398 .saturating_duration_since(Instant::now())
399 .min(root_check),
400 None => root_check,
401 }
402 }
403
404 fn flush_deadline_reached(&self) -> bool {
405 self.flush_deadline
406 .is_some_and(|deadline| Instant::now() >= deadline)
407 }
408
409 fn flush_pending(&mut self) -> bool {
410 if self.raw_paths.is_empty() {
411 self.flush_deadline = None;
412 return true;
413 }
414
415 let raw_paths = std::mem::take(&mut self.raw_paths);
416 self.flush_deadline = None;
417 let ignore_path = raw_paths
418 .iter()
419 .find(|path| watcher_path_can_change_corpus_ignore(&self.config, path))
420 .cloned();
421 let ignore_file_changed = ignore_path.is_some();
422 if let Some(path) = ignore_path {
423 let observed_generation = self.matcher_generation.load(Ordering::SeqCst);
424 if !self.send_dispatch(WatcherDispatchEvent::IgnoreRulesChanged { path }) {
425 return false;
426 }
427 if !self.wait_for_gitignore_rebuild(observed_generation) {
428 return false;
429 }
430 }
431
432 let filtered = filter_canonical_paths(&self.config, &self.matcher, raw_paths);
433 debug_assert_eq!(filtered.ignore_file_changed, ignore_file_changed);
434 if filtered.changed.is_empty() {
435 return true;
436 }
437 self.send_dispatch(WatcherDispatchEvent::Paths(
438 filtered.changed.into_iter().collect(),
439 ))
440 }
441
442 fn wait_for_gitignore_rebuild(&self, observed_generation: u64) -> bool {
443 while !self.shutdown.load(Ordering::SeqCst)
444 && self.matcher_generation.load(Ordering::SeqCst) == observed_generation
445 {
446 if self.project_root_was_deleted() {
447 let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
448 return false;
449 }
450 thread::sleep(GITIGNORE_REBUILD_POLL_INTERVAL);
451 }
452 !self.shutdown.load(Ordering::SeqCst)
453 }
454
455 fn send_dispatch(&self, event: WatcherDispatchEvent) -> bool {
456 let mut event = event;
457 loop {
458 match self
459 .dispatch_tx
460 .send_timeout(event, DISPATCH_SEND_POLL_INTERVAL)
461 {
462 Ok(()) => return true,
463 Err(SendTimeoutError::Timeout(returned)) => {
464 if self.shutdown.load(Ordering::SeqCst) {
465 return false;
466 }
467 event = returned;
468 }
469 Err(SendTimeoutError::Disconnected(_)) => return false,
470 }
471 }
472 }
473}
474
#[cfg(test)]
mod tests {
    use super::*;
    use ignore::gitignore::GitignoreBuilder;
    use notify::event::{
        AccessKind, AccessMode, CreateKind, DataChange, Flag, MetadataKind, ModifyKind,
    };
    use notify::EventKind;
    use tempfile::TempDir;

    /// Handles a test needs to drive a filter loop running on its own thread.
    struct FilterHarness {
        raw_tx: mpsc::Sender<notify::Result<notify::Event>>,
        dispatch_rx: Receiver<WatcherDispatchEvent>,
        shutdown: Arc<AtomicBool>,
        worker: JoinHandle<()>,
    }

    /// Spawns a filter loop for `root` with no matcher installed.
    fn spawn_filter(root: PathBuf) -> FilterHarness {
        let matcher: SharedGitignore = Arc::new(RwLock::new(None));
        let generation = Arc::new(AtomicU64::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));
        let (dispatch_tx, dispatch_rx) = watcher_dispatch_channel();
        let (raw_tx, raw_rx) = mpsc::channel();
        let mut filter = WatcherFilterThread::new(
            WatcherFilterConfig::new(root, None),
            matcher,
            generation,
            dispatch_tx,
            Arc::clone(&shutdown),
        );
        let worker = thread::spawn(move || filter.run(raw_rx));
        FilterHarness {
            raw_tx,
            dispatch_rx,
            shutdown,
            worker,
        }
    }

    /// Builds a shared matcher from `<root>/.gitignore`; yields `None` inside
    /// the lock when the file is absent or contributes no ignore rules.
    fn shared_matcher(root: &Path) -> SharedGitignore {
        let root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf());
        let mut builder = GitignoreBuilder::new(&root);
        let ignore_file = root.join(".gitignore");
        if ignore_file.exists() {
            if let Some(error) = builder.add(&ignore_file) {
                panic!("gitignore parse error: {error}");
            }
        }
        let built = builder.build().unwrap();
        let matcher = if built.num_ignores() > 0 {
            Some(Arc::new(built))
        } else {
            None
        };
        Arc::new(RwLock::new(matcher))
    }

    #[test]
    fn event_kind_filter_accepts_content_changes_only() {
        let invalidating = [
            EventKind::Create(CreateKind::File),
            EventKind::Modify(ModifyKind::Data(DataChange::Content)),
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)),
        ];
        for kind in &invalidating {
            assert!(watcher_event_invalidates(kind));
        }

        let skipped = [
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::AccessTime)),
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::Permissions)),
            EventKind::Access(AccessKind::Open(AccessMode::Read)),
            EventKind::Other,
        ];
        for kind in &skipped {
            assert!(!watcher_event_invalidates(kind));
        }
    }

    #[test]
    fn rescan_event_dispatches_control_and_supersedes_pending_paths() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let pending = root.join("pending.rs");
        std::fs::write(&pending, "fn main() {}\n").unwrap();
        let FilterHarness {
            raw_tx,
            dispatch_rx,
            shutdown,
            worker,
        } = spawn_filter(root);

        // Queue a granular change, then immediately follow it with a rescan flag.
        let mut granular = notify::Event::new(EventKind::Create(CreateKind::File));
        granular.paths.push(pending);
        raw_tx.send(Ok(granular)).unwrap();
        let rescan = notify::Event::new(EventKind::Other).set_flag(Flag::Rescan);
        raw_tx.send(Ok(rescan)).unwrap();

        let first = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("rescan event");
        assert_eq!(first, WatcherDispatchEvent::RescanRequired);

        let follow_up = dispatch_rx.recv_timeout(WATCHER_FLUSH_WINDOW + Duration::from_millis(100));
        assert!(
            follow_up.is_err(),
            "pending granular paths should be cleared by a rescan signal"
        );

        shutdown.store(true, Ordering::SeqCst);
        drop(raw_tx);
        worker.join().unwrap();
    }

    #[test]
    fn filters_gitignored_paths_with_shared_matcher() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let ignored_file = root.join("ignored/file.ts");
        let kept_file = root.join("kept.ts");
        std::fs::write(root.join(".gitignore"), "ignored/\n").unwrap();
        std::fs::create_dir_all(root.join("ignored")).unwrap();
        std::fs::write(&ignored_file, "ignored").unwrap();
        std::fs::write(&kept_file, "kept").unwrap();
        let matcher = shared_matcher(&root);
        let config = WatcherFilterConfig::new(root.clone(), None);

        let filtered = filter_watcher_raw_paths_for_test(
            &config,
            &matcher,
            [ignored_file.clone(), kept_file.clone()],
        );

        assert!(!filtered.changed.contains(&ignored_file));
        assert!(filtered.changed.contains(&kept_file));
    }

    #[test]
    fn ignore_rule_paths_are_control_only_for_external_excludes() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let git_info = root.join(".git").join("info");
        std::fs::create_dir_all(&git_info).unwrap();
        let exclude = git_info.join("exclude");
        std::fs::write(&exclude, "ignored/\n").unwrap();
        let matcher: SharedGitignore = Arc::new(RwLock::new(None));
        let config = WatcherFilterConfig::new(root, None);

        let filtered = filter_watcher_raw_paths_for_test(&config, &matcher, [exclude]);

        assert!(filtered.ignore_file_changed);
        assert!(filtered.changed.is_empty());
    }

    #[test]
    fn root_deleted_sends_control_and_exits() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        // `_raw_tx` stays bound so the raw channel does not report a
        // disconnect before the root deletion is noticed.
        let FilterHarness {
            raw_tx: _raw_tx,
            dispatch_rx,
            shutdown,
            worker,
        } = spawn_filter(root.clone());
        std::fs::remove_dir_all(&root).unwrap();

        let event = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("root deleted event");
        assert_eq!(event, WatcherDispatchEvent::RootDeleted);
        shutdown.store(true, Ordering::SeqCst);
        worker.join().unwrap();
    }
}