1use std::collections::BTreeSet;
2use std::path::{Component, Path, PathBuf};
3use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
4use std::sync::{mpsc, Arc, RwLock};
5use std::thread::{self, JoinHandle};
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{Receiver, SendTimeoutError, Sender};
9use ignore::gitignore::Gitignore;
10
/// Shared, swappable gitignore matcher slot.
///
/// While the slot holds `None`, `watcher_path_is_ignored_by_matcher` only
/// applies the infrastructure-directory skip.
pub type SharedGitignore = Arc<RwLock<Option<Arc<Gitignore>>>>;

/// Batching window: the first raw path queued into an empty batch schedules a
/// flush this far in the future.
pub const WATCHER_FLUSH_WINDOW: Duration = Duration::from_millis(250);
/// A pending batch is flushed immediately once it holds at least this many paths.
pub const WATCHER_MAX_BATCH_PATHS: usize = 1024;
/// Capacity of the bounded channel created by `watcher_dispatch_channel`.
pub const WATCHER_DISPATCH_CHANNEL_CAPACITY: usize = 1024;
/// Longest time the filter loop blocks on the raw-event channel before it
/// re-checks that the project root still exists.
const ROOT_DELETED_CHECK_INTERVAL: Duration = Duration::from_millis(250);
/// Sleep between polls of the matcher generation counter while waiting for an
/// ignore-rule rebuild.
const GITIGNORE_REBUILD_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Timeout of a single dispatch send attempt; the shutdown flag is re-checked
/// after every timed-out attempt.
const DISPATCH_SEND_POLL_INTERVAL: Duration = Duration::from_millis(50);
19
/// Locations the watcher filter needs in order to recognise ignore-rule files.
#[derive(Debug, Clone)]
pub struct WatcherFilterConfig {
    /// Root directory of the watched project.
    pub project_root: PathBuf,
    /// Git common directory, when one was supplied; `<project_root>/.git` is
    /// assumed otherwise.
    pub git_common_dir: Option<PathBuf>,
}

impl WatcherFilterConfig {
    /// Builds a config from the project root and an optional git common directory.
    pub fn new(project_root: PathBuf, git_common_dir: Option<PathBuf>) -> Self {
        WatcherFilterConfig {
            project_root,
            git_common_dir,
        }
    }

    /// Returns `<git dir>/info/exclude`, where `<git dir>` is `git_common_dir`
    /// when set and `<project_root>/.git` otherwise.
    fn git_info_exclude_path(&self) -> PathBuf {
        let mut exclude = match self.git_common_dir.as_deref() {
            Some(common_dir) => common_dir.to_path_buf(),
            None => self.project_root.join(".git"),
        };
        exclude.push("info");
        exclude.push("exclude");
        exclude
    }
}
42
/// Messages the watcher thread sends over the dispatch channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherDispatchEvent {
    /// A flushed batch of changed paths that survived filtering.
    Paths(Vec<PathBuf>),
    /// A path that can change the ignore rules was seen in the batch; it is
    /// sent before the batch is filtered.
    IgnoreRulesChanged { path: PathBuf },
    /// The project root no longer exists; the filter loop exits after sending this.
    RootDeleted,
    /// Watcher initialisation failed, the backend reported an error, or the
    /// raw-event channel disconnected.
    Error(String),
}
50
/// Owner-side handle for a spawned watcher thread: the shared stop flag plus
/// the thread's join handle.
pub struct WatcherThreadHandle {
    shutdown: Arc<AtomicBool>,
    join: Option<JoinHandle<()>>,
}

impl WatcherThreadHandle {
    /// Wraps an already-spawned thread together with the flag that stops it.
    pub fn new(shutdown: Arc<AtomicBool>, join: JoinHandle<()>) -> Self {
        let join = Some(join);
        Self { shutdown, join }
    }

    /// Raises the stop flag without waiting for the thread.
    pub fn request_shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }

    /// Reports whether the thread has finished; `true` when no join handle is held.
    pub fn is_finished(&self) -> bool {
        match &self.join {
            Some(worker) => worker.is_finished(),
            None => true,
        }
    }

    /// Raises the stop flag and blocks until the thread has exited.
    pub fn shutdown_and_join(mut self) {
        self.request_shutdown();
        let Some(worker) = self.join.take() else {
            return;
        };
        // The join result (including a worker panic) is intentionally discarded.
        let _ = worker.join();
    }
}

impl Drop for WatcherThreadHandle {
    /// Dropping only raises the stop flag; it never joins the thread.
    fn drop(&mut self) {
        self.request_shutdown();
    }
}
85
/// Number of `ActiveWatcherThreadGuard` values currently alive.
static ACTIVE_WATCHER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Scope guard that counts itself in `ACTIVE_WATCHER_THREADS` for as long as it lives.
struct ActiveWatcherThreadGuard;

impl ActiveWatcherThreadGuard {
    /// Increments the live counter and returns the guard that will undo it.
    fn new() -> Self {
        let _previous = ACTIVE_WATCHER_THREADS.fetch_add(1, Ordering::SeqCst);
        ActiveWatcherThreadGuard
    }
}

impl Drop for ActiveWatcherThreadGuard {
    /// Decrements the live counter.
    fn drop(&mut self) {
        let _previous = ACTIVE_WATCHER_THREADS.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Current value of the live-guard counter.
pub fn active_watcher_thread_count_for_test() -> usize {
    ACTIVE_WATCHER_THREADS.load(Ordering::SeqCst)
}
106
107pub fn watcher_dispatch_channel() -> (Sender<WatcherDispatchEvent>, Receiver<WatcherDispatchEvent>)
108{
109 crossbeam_channel::bounded(WATCHER_DISPATCH_CHANNEL_CAPACITY)
110}
111
112pub fn watcher_event_invalidates(kind: ¬ify::EventKind) -> bool {
115 use notify::event::{MetadataKind, ModifyKind};
116 use notify::EventKind;
117 match kind {
118 EventKind::Create(_) | EventKind::Remove(_) => true,
119 EventKind::Modify(ModifyKind::Metadata(meta)) => !matches!(
120 meta,
121 MetadataKind::AccessTime
122 | MetadataKind::Permissions
123 | MetadataKind::Ownership
124 | MetadataKind::Extended
125 ),
126 EventKind::Modify(_) => true,
127 _ => false,
128 }
129}
130
/// Returns `true` when any normal component of `path` is one of the
/// infrastructure directory names that are always skipped.
///
/// Components that are not valid UTF-8 never match.
pub fn watcher_path_is_infra_skip(path: &Path) -> bool {
    const INFRA_DIR_NAMES: [&str; 6] = [
        ".git",
        ".opencode",
        ".alfonso",
        ".gsd",
        "node_modules",
        "target",
    ];

    path.components()
        .filter_map(|component| match component {
            Component::Normal(name) => name.to_str(),
            _ => None,
        })
        .any(|name| INFRA_DIR_NAMES.contains(&name))
}
139
/// Returns `true` when the final component of `path` is `.gitignore` or `.aftignore`.
fn watcher_path_is_ignore_file(path: &Path) -> bool {
    let file_name = path.file_name().and_then(|name| name.to_str());
    matches!(file_name, Some(".gitignore" | ".aftignore"))
}
145
/// Returns `true` when `path` names the same file as `target`.
///
/// `path` is compared as given. `target` is tried in three forms, in order:
/// 1. verbatim;
/// 2. fully canonicalised, which only works while `target` exists;
/// 3. canonicalised parent directory plus the original file name.
///
/// The third form covers a `target` that was just deleted or not yet created.
/// `canonicalize_watcher_path` normalises event paths the same way, so a
/// removal event for a rule file reached through a symlinked or non-canonical
/// directory is still recognised.
fn watcher_same_path(path: &Path, target: &Path) -> bool {
    if path == target {
        return true;
    }

    if let Ok(canonical_target) = std::fs::canonicalize(target) {
        return path == canonical_target;
    }

    // `target` itself cannot be resolved (most likely it does not exist):
    // resolve its directory instead and re-attach the file name.
    match (target.parent(), target.file_name()) {
        (Some(parent), Some(file_name)) => std::fs::canonicalize(parent)
            .map(|canonical_parent| path == canonical_parent.join(file_name))
            .unwrap_or(false),
        _ => false,
    }
}
155
156fn watcher_path_is_git_info_exclude(config: &WatcherFilterConfig, path: &Path) -> bool {
157 watcher_same_path(path, &config.git_info_exclude_path())
158}
159
160fn watcher_path_is_global_gitignore(path: &Path) -> bool {
161 ignore::gitignore::gitconfig_excludes_path()
162 .as_deref()
163 .is_some_and(|global_ignore| watcher_same_path(path, global_ignore))
164}
165
166fn watcher_path_can_change_corpus_ignore(config: &WatcherFilterConfig, path: &Path) -> bool {
167 if watcher_path_is_global_gitignore(path) {
168 return true;
169 }
170 if watcher_path_is_git_info_exclude(config, path) {
171 return true;
172 }
173 if !path.starts_with(&config.project_root) {
174 return false;
175 }
176
177 watcher_path_is_ignore_file(path) && !watcher_path_is_infra_skip(path)
178}
179
/// Best-effort canonicalisation of a path reported by the watcher.
///
/// Tries the path itself first. If that fails, the parent directory is
/// canonicalised and the original file name re-attached. If that fails too, or
/// the path has no parent or file name, the input is returned unchanged.
pub fn canonicalize_watcher_path(path: PathBuf) -> PathBuf {
    match std::fs::canonicalize(&path) {
        Ok(resolved) => resolved,
        Err(_) => {
            let rebuilt = path
                .parent()
                .zip(path.file_name())
                .and_then(|(directory, leaf)| {
                    std::fs::canonicalize(directory)
                        .ok()
                        .map(|resolved_directory| resolved_directory.join(leaf))
                });
            rebuilt.unwrap_or(path)
        }
    }
}
194
195fn watcher_path_is_ignored_by_matcher(matcher: &SharedGitignore, path: &Path) -> bool {
196 if watcher_path_is_infra_skip(path) {
197 return true;
198 }
199
200 let guard = matcher
201 .read()
202 .unwrap_or_else(|poisoned| poisoned.into_inner());
203 if let Some(matcher) = guard.as_ref() {
204 if path.starts_with(matcher.path()) {
205 let is_dir = path.is_dir();
206 return matcher
207 .matched_path_or_any_parents(path, is_dir)
208 .is_ignore();
209 }
210 }
211
212 false
213}
214
/// Result of filtering one batch of canonical watcher paths.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FilteredWatcherPaths {
    /// Paths that survived the infra, external-rule-file and matcher filters.
    pub changed: BTreeSet<PathBuf>,
    /// Whether any path in the batch can change the ignore rules.
    pub ignore_file_changed: bool,
}
220
221fn filter_canonical_paths(
222 config: &WatcherFilterConfig,
223 matcher: &SharedGitignore,
224 raw_paths: BTreeSet<PathBuf>,
225) -> FilteredWatcherPaths {
226 let ignore_file_changed = raw_paths
227 .iter()
228 .any(|path| watcher_path_can_change_corpus_ignore(config, path));
229
230 let changed = raw_paths
231 .into_iter()
232 .filter(|path| {
233 if watcher_path_is_infra_skip(path) {
234 return false;
235 }
236
237 if watcher_path_is_global_gitignore(path)
238 || watcher_path_is_git_info_exclude(config, path)
239 {
240 return false;
241 }
242
243 if watcher_path_is_ignored_by_matcher(matcher, path) {
244 return false;
245 }
246 true
247 })
248 .collect();
249
250 FilteredWatcherPaths {
251 changed,
252 ignore_file_changed,
253 }
254}
255
256pub fn filter_watcher_raw_paths_for_test<I>(
257 config: &WatcherFilterConfig,
258 matcher: &SharedGitignore,
259 raw_paths: I,
260) -> FilteredWatcherPaths
261where
262 I: IntoIterator<Item = PathBuf>,
263{
264 let raw_paths = raw_paths
265 .into_iter()
266 .map(canonicalize_watcher_path)
267 .collect::<BTreeSet<_>>();
268 filter_canonical_paths(config, matcher, raw_paths)
269}
270
271pub fn run_watcher_thread<W, E, F>(
272 config: WatcherFilterConfig,
273 extra_watch_paths: Vec<PathBuf>,
274 matcher: SharedGitignore,
275 matcher_generation: Arc<AtomicU64>,
276 dispatch_tx: Sender<WatcherDispatchEvent>,
277 shutdown: Arc<AtomicBool>,
278 attach: F,
279) where
280 W: Send + 'static,
281 E: std::fmt::Display,
282 F: FnOnce(PathBuf, Vec<PathBuf>, mpsc::Sender<notify::Result<notify::Event>>) -> Result<W, E>,
283{
284 let _active_guard = ActiveWatcherThreadGuard::new();
285 let (raw_tx, raw_rx) = mpsc::channel();
286 let root_path = config.project_root.clone();
287 match attach(root_path.clone(), extra_watch_paths, raw_tx) {
288 Ok(_watcher) => {
289 if shutdown.load(Ordering::SeqCst) {
290 return;
291 }
292 crate::slog_info!("watcher started: {}", root_path.display());
293 let mut filter = WatcherFilterThread::new(
294 config,
295 matcher,
296 matcher_generation,
297 dispatch_tx,
298 shutdown,
299 );
300 filter.run(raw_rx);
301 }
302 Err(error) => {
303 if !shutdown.load(Ordering::SeqCst) {
304 log::debug!(
305 "watcher init failed: {} — callers will work with stale data",
306 error
307 );
308 let _ = dispatch_tx.send(WatcherDispatchEvent::Error(format!(
309 "watcher init failed: {error}"
310 )));
311 }
312 }
313 }
314}
315
/// State of the filter loop that batches raw watcher events and forwards the
/// filtered result over the dispatch channel.
struct WatcherFilterThread {
    /// Project root and git directory used to recognise ignore-rule files.
    config: WatcherFilterConfig,
    /// Shared matcher consulted when a batch is filtered.
    matcher: SharedGitignore,
    /// Counter polled after an `IgnoreRulesChanged` dispatch; the loop waits
    /// for it to move away from the value observed before sending.
    matcher_generation: Arc<AtomicU64>,
    /// Outgoing channel for dispatch events.
    dispatch_tx: Sender<WatcherDispatchEvent>,
    /// Stop flag checked by the loop and by every blocking wait.
    shutdown: Arc<AtomicBool>,
    /// Canonicalised paths accumulated since the last flush.
    raw_paths: BTreeSet<PathBuf>,
    /// When the pending batch is due; `None` while no flush is scheduled.
    flush_deadline: Option<Instant>,
}
325
326impl WatcherFilterThread {
327 fn new(
328 config: WatcherFilterConfig,
329 matcher: SharedGitignore,
330 matcher_generation: Arc<AtomicU64>,
331 dispatch_tx: Sender<WatcherDispatchEvent>,
332 shutdown: Arc<AtomicBool>,
333 ) -> Self {
334 Self {
335 config,
336 matcher,
337 matcher_generation,
338 dispatch_tx,
339 shutdown,
340 raw_paths: BTreeSet::new(),
341 flush_deadline: None,
342 }
343 }
344
345 fn run(&mut self, raw_rx: mpsc::Receiver<notify::Result<notify::Event>>) {
346 loop {
347 if self.shutdown.load(Ordering::SeqCst) {
348 self.flush_pending();
349 return;
350 }
351 if self.project_root_was_deleted() {
352 self.raw_paths.clear();
353 let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
354 return;
355 }
356 if self.flush_deadline_reached() {
357 if !self.flush_pending() {
358 return;
359 }
360 continue;
361 }
362
363 match raw_rx.recv_timeout(self.next_recv_timeout()) {
364 Ok(Ok(event)) => {
365 if watcher_event_invalidates(&event.kind) && !self.push_raw_paths(event.paths) {
366 return;
367 }
368 }
369 Ok(Err(error)) => {
370 let _ = self.send_dispatch(WatcherDispatchEvent::Error(error.to_string()));
371 return;
372 }
373 Err(mpsc::RecvTimeoutError::Timeout) => {
374 if !self.flush_pending() {
375 return;
376 }
377 }
378 Err(mpsc::RecvTimeoutError::Disconnected) => {
379 if !self.shutdown.load(Ordering::SeqCst) {
380 let _ = self.send_dispatch(WatcherDispatchEvent::Error(
381 "watcher channel disconnected".to_string(),
382 ));
383 }
384 return;
385 }
386 }
387 }
388 }
389
390 fn project_root_was_deleted(&self) -> bool {
391 !self.config.project_root.exists()
392 }
393
394 fn push_raw_paths(&mut self, paths: Vec<PathBuf>) -> bool {
395 for path in paths {
396 self.raw_paths.insert(canonicalize_watcher_path(path));
397 }
398 if !self.raw_paths.is_empty() && self.flush_deadline.is_none() {
399 self.flush_deadline = Some(Instant::now() + WATCHER_FLUSH_WINDOW);
400 }
401 if self.raw_paths.len() >= WATCHER_MAX_BATCH_PATHS {
402 return self.flush_pending();
403 }
404 true
405 }
406
407 fn next_recv_timeout(&self) -> Duration {
408 let root_check = ROOT_DELETED_CHECK_INTERVAL;
409 match self.flush_deadline {
410 Some(deadline) => deadline
411 .saturating_duration_since(Instant::now())
412 .min(root_check),
413 None => root_check,
414 }
415 }
416
417 fn flush_deadline_reached(&self) -> bool {
418 self.flush_deadline
419 .is_some_and(|deadline| Instant::now() >= deadline)
420 }
421
422 fn flush_pending(&mut self) -> bool {
423 if self.raw_paths.is_empty() {
424 self.flush_deadline = None;
425 return true;
426 }
427
428 let raw_paths = std::mem::take(&mut self.raw_paths);
429 self.flush_deadline = None;
430 let ignore_path = raw_paths
431 .iter()
432 .find(|path| watcher_path_can_change_corpus_ignore(&self.config, path))
433 .cloned();
434 let ignore_file_changed = ignore_path.is_some();
435 if let Some(path) = ignore_path {
436 let observed_generation = self.matcher_generation.load(Ordering::SeqCst);
437 if !self.send_dispatch(WatcherDispatchEvent::IgnoreRulesChanged { path }) {
438 return false;
439 }
440 if !self.wait_for_gitignore_rebuild(observed_generation) {
441 return false;
442 }
443 }
444
445 let filtered = filter_canonical_paths(&self.config, &self.matcher, raw_paths);
446 debug_assert_eq!(filtered.ignore_file_changed, ignore_file_changed);
447 if filtered.changed.is_empty() {
448 return true;
449 }
450 self.send_dispatch(WatcherDispatchEvent::Paths(
451 filtered.changed.into_iter().collect(),
452 ))
453 }
454
455 fn wait_for_gitignore_rebuild(&self, observed_generation: u64) -> bool {
456 while !self.shutdown.load(Ordering::SeqCst)
457 && self.matcher_generation.load(Ordering::SeqCst) == observed_generation
458 {
459 if self.project_root_was_deleted() {
460 let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
461 return false;
462 }
463 thread::sleep(GITIGNORE_REBUILD_POLL_INTERVAL);
464 }
465 !self.shutdown.load(Ordering::SeqCst)
466 }
467
468 fn send_dispatch(&self, event: WatcherDispatchEvent) -> bool {
469 let mut event = event;
470 loop {
471 match self
472 .dispatch_tx
473 .send_timeout(event, DISPATCH_SEND_POLL_INTERVAL)
474 {
475 Ok(()) => return true,
476 Err(SendTimeoutError::Timeout(returned)) => {
477 if self.shutdown.load(Ordering::SeqCst) {
478 return false;
479 }
480 event = returned;
481 }
482 Err(SendTimeoutError::Disconnected(_)) => return false,
483 }
484 }
485 }
486}
487
/// Unit tests for the event-kind filter, path filtering and root-deletion handling.
#[cfg(test)]
mod tests {
    use super::*;
    use ignore::gitignore::GitignoreBuilder;
    use notify::event::{AccessKind, AccessMode, CreateKind, DataChange, MetadataKind, ModifyKind};
    use notify::EventKind;
    use tempfile::TempDir;

    /// Builds a shared matcher from `<root>/.gitignore`, leaving the slot
    /// `None` when the resulting matcher contains no ignore rules.
    fn shared_matcher(root: &Path) -> SharedGitignore {
        let root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf());
        let mut builder = GitignoreBuilder::new(&root);
        let ignore = root.join(".gitignore");
        if ignore.exists() {
            if let Some(error) = builder.add(&ignore) {
                panic!("gitignore parse error: {error}");
            }
        }
        let matcher = builder.build().unwrap();
        let matcher = (matcher.num_ignores() > 0).then(|| Arc::new(matcher));
        Arc::new(RwLock::new(matcher))
    }

    /// Create, data and write-time changes invalidate; access-time,
    /// permission, access and `Other` events do not.
    #[test]
    fn event_kind_filter_accepts_content_changes_only() {
        assert!(watcher_event_invalidates(&EventKind::Create(
            CreateKind::File
        )));
        assert!(watcher_event_invalidates(&EventKind::Modify(
            ModifyKind::Data(DataChange::Content)
        )));
        assert!(watcher_event_invalidates(&EventKind::Modify(
            ModifyKind::Metadata(MetadataKind::WriteTime)
        )));
        assert!(!watcher_event_invalidates(&EventKind::Modify(
            ModifyKind::Metadata(MetadataKind::AccessTime)
        )));
        assert!(!watcher_event_invalidates(&EventKind::Modify(
            ModifyKind::Metadata(MetadataKind::Permissions)
        )));
        assert!(!watcher_event_invalidates(&EventKind::Access(
            AccessKind::Open(AccessMode::Read)
        )));
        assert!(!watcher_event_invalidates(&EventKind::Other));
    }

    /// A file under a gitignored directory is dropped; a sibling file is kept.
    #[test]
    fn filters_gitignored_paths_with_shared_matcher() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        std::fs::write(root.join(".gitignore"), "ignored/\n").unwrap();
        std::fs::create_dir_all(root.join("ignored")).unwrap();
        std::fs::write(root.join("ignored/file.ts"), "ignored").unwrap();
        std::fs::write(root.join("kept.ts"), "kept").unwrap();
        let matcher = shared_matcher(&root);
        let config = WatcherFilterConfig::new(root.clone(), None);

        let filtered = filter_watcher_raw_paths_for_test(
            &config,
            &matcher,
            [root.join("ignored/file.ts"), root.join("kept.ts")],
        );

        assert!(!filtered.changed.contains(&root.join("ignored/file.ts")));
        assert!(filtered.changed.contains(&root.join("kept.ts")));
    }

    /// `.git/info/exclude` sets the ignore-changed flag but is never reported
    /// as a changed path.
    #[test]
    fn ignore_rule_paths_are_control_only_for_external_excludes() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let git_info = root.join(".git").join("info");
        std::fs::create_dir_all(&git_info).unwrap();
        let exclude = git_info.join("exclude");
        std::fs::write(&exclude, "ignored/\n").unwrap();
        let matcher = Arc::new(RwLock::new(None));
        let config = WatcherFilterConfig::new(root, None);

        let filtered = filter_watcher_raw_paths_for_test(&config, &matcher, [exclude]);

        assert!(filtered.ignore_file_changed);
        assert!(filtered.changed.is_empty());
    }

    /// Deleting the project root makes the filter loop send `RootDeleted` and exit.
    #[test]
    fn root_deleted_sends_control_and_exits() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let matcher = Arc::new(RwLock::new(None));
        let generation = Arc::new(AtomicU64::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));
        let (dispatch_tx, dispatch_rx) = watcher_dispatch_channel();
        let (raw_tx, raw_rx) = mpsc::channel();
        let config = WatcherFilterConfig::new(root.clone(), None);
        let mut filter = WatcherFilterThread::new(
            config,
            matcher,
            generation,
            dispatch_tx,
            Arc::clone(&shutdown),
        );
        let handle = thread::spawn(move || filter.run(raw_rx));
        // Keep the raw sender alive so the loop does not see a disconnect.
        let _raw_tx = raw_tx;
        std::fs::remove_dir_all(&root).unwrap();

        let event = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("root deleted event");
        assert_eq!(event, WatcherDispatchEvent::RootDeleted);
        shutdown.store(true, Ordering::SeqCst);
        handle.join().unwrap();
    }
}