Skip to main content

aft/
watcher_filter.rs

1use std::collections::BTreeSet;
2use std::path::{Component, Path, PathBuf};
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::{mpsc, Arc, RwLock};
5use std::thread::{self, JoinHandle};
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{Receiver, SendTimeoutError, Sender};
9use ignore::gitignore::Gitignore;
10
11pub type SharedGitignore = Arc<RwLock<Option<Arc<Gitignore>>>>;
12
13pub const WATCHER_FLUSH_WINDOW: Duration = Duration::from_millis(250);
14pub const WATCHER_MAX_BATCH_PATHS: usize = 1024;
15pub const WATCHER_DISPATCH_CHANNEL_CAPACITY: usize = 1024;
16const ROOT_DELETED_CHECK_INTERVAL: Duration = Duration::from_millis(250);
17const GITIGNORE_REBUILD_POLL_INTERVAL: Duration = Duration::from_millis(10);
18const DISPATCH_SEND_POLL_INTERVAL: Duration = Duration::from_millis(50);
19
/// Locations the watcher filter needs in order to classify changed paths.
#[derive(Clone, Debug)]
pub struct WatcherFilterConfig {
    /// Root directory of the watched project.
    pub project_root: PathBuf,
    /// Git common directory when known; `<project_root>/.git` is assumed otherwise.
    pub git_common_dir: Option<PathBuf>,
}

impl WatcherFilterConfig {
    /// Bundles the project root with an optional git common directory.
    pub fn new(project_root: PathBuf, git_common_dir: Option<PathBuf>) -> Self {
        Self {
            project_root,
            git_common_dir,
        }
    }

    /// Returns `<git dir>/info/exclude`, where `<git dir>` is the configured
    /// common directory or, when none was given, `<project_root>/.git`.
    fn git_info_exclude_path(&self) -> PathBuf {
        let git_dir = match &self.git_common_dir {
            Some(common_dir) => common_dir.clone(),
            None => self.project_root.join(".git"),
        };
        git_dir.join("info").join("exclude")
    }
}
42
/// Messages the watcher filter thread sends to its consumer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WatcherDispatchEvent {
    /// A flushed batch of changed paths that survived filtering.
    Paths(Vec<PathBuf>),
    /// The backend flagged an event as needing a rescan; pending paths were dropped.
    RescanRequired,
    /// A path in the batch can change the ignore rules; `path` is the first such path.
    IgnoreRulesChanged { path: PathBuf },
    /// The project root no longer exists.
    RootDeleted,
    /// Watcher start-up failed, the backend reported an error, or the raw
    /// event channel disconnected.
    Error(String),
}
51
/// Owner-side handle for a spawned watcher thread: a shared shutdown flag plus
/// the thread's join handle.
pub struct WatcherThreadHandle {
    shutdown: Arc<AtomicBool>,
    /// `None` once the thread has been joined.
    join: Option<JoinHandle<()>>,
}

impl WatcherThreadHandle {
    /// Wraps an already-spawned thread together with the flag it observes.
    pub fn new(shutdown: Arc<AtomicBool>, join: JoinHandle<()>) -> Self {
        let join = Some(join);
        Self { shutdown, join }
    }

    /// Raises the shutdown flag without waiting for the thread to exit.
    pub fn request_shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }

    /// Reports whether the thread has finished; a handle whose thread was
    /// already joined counts as finished.
    pub fn is_finished(&self) -> bool {
        match &self.join {
            Some(thread) => thread.is_finished(),
            None => true,
        }
    }

    /// Raises the shutdown flag and blocks until the thread exits.
    pub fn shutdown_and_join(mut self) {
        self.request_shutdown();
        let Some(thread) = self.join.take() else {
            return;
        };
        // The join result (a panic payload, if any) is deliberately discarded.
        let _ = thread.join();
    }
}

impl Drop for WatcherThreadHandle {
    /// Dropping only signals shutdown; it never blocks on the thread.
    fn drop(&mut self) {
        self.request_shutdown();
    }
}
86
87pub fn watcher_dispatch_channel() -> (Sender<WatcherDispatchEvent>, Receiver<WatcherDispatchEvent>)
88{
89    crossbeam_channel::bounded(WATCHER_DISPATCH_CHANNEL_CAPACITY)
90}
91
92/// Decide whether a `notify::Event` represents a real content change worth
93/// invalidating cached state for.
94pub fn watcher_event_invalidates(kind: &notify::EventKind) -> bool {
95    use notify::event::{MetadataKind, ModifyKind};
96    use notify::EventKind;
97    match kind {
98        EventKind::Create(_) | EventKind::Remove(_) => true,
99        EventKind::Modify(ModifyKind::Metadata(meta)) => !matches!(
100            meta,
101            MetadataKind::AccessTime
102                | MetadataKind::Permissions
103                | MetadataKind::Ownership
104                | MetadataKind::Extended
105        ),
106        EventKind::Modify(_) => true,
107        _ => false,
108    }
109}
110
/// Reports whether any component of `path` is one of the infrastructure names
/// the watcher always skips (`.git`, `.opencode`, `.alfonso`, `.gsd`,
/// `node_modules`, `target`).
///
/// The match is on whole component names, at any depth.
pub fn watcher_path_is_infra_skip(path: &Path) -> bool {
    const INFRA_NAMES: [&str; 6] = [
        ".git",
        ".opencode",
        ".alfonso",
        ".gsd",
        "node_modules",
        "target",
    ];

    path.components().any(|component| match component {
        Component::Normal(name) => INFRA_NAMES.iter().any(|infra| name == *infra),
        _ => false,
    })
}
119
/// Reports whether the final component of `path` is `.gitignore` or `.aftignore`.
fn watcher_path_is_ignore_file(path: &Path) -> bool {
    match path.file_name() {
        Some(name) => name == ".gitignore" || name == ".aftignore",
        None => false,
    }
}
125
/// Reports whether `path` names `target`, either literally or after resolving
/// `target` through the filesystem.
///
/// Only `target` is canonicalized; `path` is compared as given. A `target`
/// that cannot be canonicalized simply does not match.
fn watcher_same_path(path: &Path, target: &Path) -> bool {
    if path == target {
        return true;
    }

    match std::fs::canonicalize(target) {
        Ok(resolved) => path == resolved,
        Err(_) => false,
    }
}
135
136fn watcher_path_is_git_info_exclude(config: &WatcherFilterConfig, path: &Path) -> bool {
137    watcher_same_path(path, &config.git_info_exclude_path())
138}
139
140fn watcher_path_is_global_gitignore(path: &Path) -> bool {
141    ignore::gitignore::gitconfig_excludes_path()
142        .as_deref()
143        .is_some_and(|global_ignore| watcher_same_path(path, global_ignore))
144}
145
146fn watcher_path_can_change_corpus_ignore(config: &WatcherFilterConfig, path: &Path) -> bool {
147    if watcher_path_is_global_gitignore(path) {
148        return true;
149    }
150    if watcher_path_is_git_info_exclude(config, path) {
151        return true;
152    }
153    if !path.starts_with(&config.project_root) {
154        return false;
155    }
156
157    watcher_path_is_ignore_file(path) && !watcher_path_is_infra_skip(path)
158}
159
/// Best-effort canonicalization of a path reported by the watcher.
///
/// * If `path` itself can be canonicalized, that result is returned.
/// * Otherwise (for example because the file was just deleted) the parent
///   directory is canonicalized and the original file name appended.
/// * If that also fails, or `path` has no parent or file name, `path` is
///   returned unchanged.
pub fn canonicalize_watcher_path(path: PathBuf) -> PathBuf {
    if let Ok(canonical) = std::fs::canonicalize(&path) {
        return canonical;
    }

    if let (Some(parent), Some(file_name)) = (path.parent(), path.file_name()) {
        // A bare relative name such as `foo.txt` has the empty path as its
        // parent, which `canonicalize` rejects. Resolve it against the current
        // directory so missing files are canonicalized like existing ones.
        let parent = if parent.as_os_str().is_empty() {
            Path::new(".")
        } else {
            parent
        };
        if let Ok(canonical_parent) = std::fs::canonicalize(parent) {
            return canonical_parent.join(file_name);
        }
    }

    path
}
174
175fn watcher_path_is_ignored_by_matcher(matcher: &SharedGitignore, path: &Path) -> bool {
176    if watcher_path_is_infra_skip(path) {
177        return true;
178    }
179
180    let guard = matcher
181        .read()
182        .unwrap_or_else(|poisoned| poisoned.into_inner());
183    if let Some(matcher) = guard.as_ref() {
184        if path.starts_with(matcher.path()) {
185            let is_dir = path.is_dir();
186            return matcher
187                .matched_path_or_any_parents(path, is_dir)
188                .is_ignore();
189        }
190    }
191
192    false
193}
194
/// Outcome of filtering one batch of raw watcher paths.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FilteredWatcherPaths {
    /// Paths that survived filtering, deduplicated and ordered.
    pub changed: BTreeSet<PathBuf>,
    /// Whether any raw path in the batch could change the ignore rules.
    pub ignore_file_changed: bool,
}
200
201fn filter_canonical_paths(
202    config: &WatcherFilterConfig,
203    matcher: &SharedGitignore,
204    raw_paths: BTreeSet<PathBuf>,
205) -> FilteredWatcherPaths {
206    let ignore_file_changed = raw_paths
207        .iter()
208        .any(|path| watcher_path_can_change_corpus_ignore(config, path));
209
210    let changed = raw_paths
211        .into_iter()
212        .filter(|path| {
213            if watcher_path_is_infra_skip(path) {
214                return false;
215            }
216
217            if watcher_path_is_global_gitignore(path)
218                || watcher_path_is_git_info_exclude(config, path)
219            {
220                return false;
221            }
222
223            if watcher_path_is_ignored_by_matcher(matcher, path) {
224                return false;
225            }
226            true
227        })
228        .collect();
229
230    FilteredWatcherPaths {
231        changed,
232        ignore_file_changed,
233    }
234}
235
236pub fn filter_watcher_raw_paths_for_test<I>(
237    config: &WatcherFilterConfig,
238    matcher: &SharedGitignore,
239    raw_paths: I,
240) -> FilteredWatcherPaths
241where
242    I: IntoIterator<Item = PathBuf>,
243{
244    let raw_paths = raw_paths
245        .into_iter()
246        .map(canonicalize_watcher_path)
247        .collect::<BTreeSet<_>>();
248    filter_canonical_paths(config, matcher, raw_paths)
249}
250
251pub fn run_watcher_thread<W, E, F>(
252    config: WatcherFilterConfig,
253    extra_watch_paths: Vec<PathBuf>,
254    matcher: SharedGitignore,
255    matcher_generation: Arc<AtomicU64>,
256    dispatch_tx: Sender<WatcherDispatchEvent>,
257    shutdown: Arc<AtomicBool>,
258    attach: F,
259) where
260    W: Send + 'static,
261    E: std::fmt::Display,
262    F: FnOnce(PathBuf, Vec<PathBuf>, mpsc::Sender<notify::Result<notify::Event>>) -> Result<W, E>,
263{
264    let (raw_tx, raw_rx) = mpsc::channel();
265    let root_path = config.project_root.clone();
266    match attach(root_path.clone(), extra_watch_paths, raw_tx) {
267        Ok(_watcher) => {
268            if shutdown.load(Ordering::SeqCst) {
269                return;
270            }
271            crate::slog_info!("watcher started: {}", root_path.display());
272            let mut filter = WatcherFilterThread::new(
273                config,
274                matcher,
275                matcher_generation,
276                dispatch_tx,
277                shutdown,
278            );
279            filter.run(raw_rx);
280        }
281        Err(error) => {
282            if !shutdown.load(Ordering::SeqCst) {
283                log::debug!(
284                    "watcher init failed: {} — callers will work with stale data",
285                    error
286                );
287                let _ = dispatch_tx.send(WatcherDispatchEvent::Error(format!(
288                    "watcher init failed: {error}"
289                )));
290            }
291        }
292    }
293}
294
295struct WatcherFilterThread {
296    config: WatcherFilterConfig,
297    matcher: SharedGitignore,
298    matcher_generation: Arc<AtomicU64>,
299    dispatch_tx: Sender<WatcherDispatchEvent>,
300    shutdown: Arc<AtomicBool>,
301    raw_paths: BTreeSet<PathBuf>,
302    flush_deadline: Option<Instant>,
303}
304
305impl WatcherFilterThread {
306    fn new(
307        config: WatcherFilterConfig,
308        matcher: SharedGitignore,
309        matcher_generation: Arc<AtomicU64>,
310        dispatch_tx: Sender<WatcherDispatchEvent>,
311        shutdown: Arc<AtomicBool>,
312    ) -> Self {
313        Self {
314            config,
315            matcher,
316            matcher_generation,
317            dispatch_tx,
318            shutdown,
319            raw_paths: BTreeSet::new(),
320            flush_deadline: None,
321        }
322    }
323
324    fn run(&mut self, raw_rx: mpsc::Receiver<notify::Result<notify::Event>>) {
325        loop {
326            if self.shutdown.load(Ordering::SeqCst) {
327                self.flush_pending();
328                return;
329            }
330            if self.project_root_was_deleted() {
331                self.raw_paths.clear();
332                let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
333                return;
334            }
335            if self.flush_deadline_reached() {
336                if !self.flush_pending() {
337                    return;
338                }
339                continue;
340            }
341
342            match raw_rx.recv_timeout(self.next_recv_timeout()) {
343                Ok(Ok(event)) => {
344                    if event.need_rescan() {
345                        self.raw_paths.clear();
346                        self.flush_deadline = None;
347                        if !self.send_dispatch(WatcherDispatchEvent::RescanRequired) {
348                            return;
349                        }
350                        continue;
351                    }
352                    if watcher_event_invalidates(&event.kind) && !self.push_raw_paths(event.paths) {
353                        return;
354                    }
355                }
356                Ok(Err(error)) => {
357                    let _ = self.send_dispatch(WatcherDispatchEvent::Error(error.to_string()));
358                    return;
359                }
360                Err(mpsc::RecvTimeoutError::Timeout) => {
361                    if !self.flush_pending() {
362                        return;
363                    }
364                }
365                Err(mpsc::RecvTimeoutError::Disconnected) => {
366                    if !self.shutdown.load(Ordering::SeqCst) {
367                        let _ = self.send_dispatch(WatcherDispatchEvent::Error(
368                            "watcher channel disconnected".to_string(),
369                        ));
370                    }
371                    return;
372                }
373            }
374        }
375    }
376
377    fn project_root_was_deleted(&self) -> bool {
378        !self.config.project_root.exists()
379    }
380
381    fn push_raw_paths(&mut self, paths: Vec<PathBuf>) -> bool {
382        for path in paths {
383            self.raw_paths.insert(canonicalize_watcher_path(path));
384        }
385        if !self.raw_paths.is_empty() && self.flush_deadline.is_none() {
386            self.flush_deadline = Some(Instant::now() + WATCHER_FLUSH_WINDOW);
387        }
388        if self.raw_paths.len() >= WATCHER_MAX_BATCH_PATHS {
389            return self.flush_pending();
390        }
391        true
392    }
393
394    fn next_recv_timeout(&self) -> Duration {
395        let root_check = ROOT_DELETED_CHECK_INTERVAL;
396        match self.flush_deadline {
397            Some(deadline) => deadline
398                .saturating_duration_since(Instant::now())
399                .min(root_check),
400            None => root_check,
401        }
402    }
403
404    fn flush_deadline_reached(&self) -> bool {
405        self.flush_deadline
406            .is_some_and(|deadline| Instant::now() >= deadline)
407    }
408
409    fn flush_pending(&mut self) -> bool {
410        if self.raw_paths.is_empty() {
411            self.flush_deadline = None;
412            return true;
413        }
414
415        let raw_paths = std::mem::take(&mut self.raw_paths);
416        self.flush_deadline = None;
417        let ignore_path = raw_paths
418            .iter()
419            .find(|path| watcher_path_can_change_corpus_ignore(&self.config, path))
420            .cloned();
421        let ignore_file_changed = ignore_path.is_some();
422        if let Some(path) = ignore_path {
423            let observed_generation = self.matcher_generation.load(Ordering::SeqCst);
424            if !self.send_dispatch(WatcherDispatchEvent::IgnoreRulesChanged { path }) {
425                return false;
426            }
427            if !self.wait_for_gitignore_rebuild(observed_generation) {
428                return false;
429            }
430        }
431
432        let filtered = filter_canonical_paths(&self.config, &self.matcher, raw_paths);
433        debug_assert_eq!(filtered.ignore_file_changed, ignore_file_changed);
434        if filtered.changed.is_empty() {
435            return true;
436        }
437        self.send_dispatch(WatcherDispatchEvent::Paths(
438            filtered.changed.into_iter().collect(),
439        ))
440    }
441
442    fn wait_for_gitignore_rebuild(&self, observed_generation: u64) -> bool {
443        while !self.shutdown.load(Ordering::SeqCst)
444            && self.matcher_generation.load(Ordering::SeqCst) == observed_generation
445        {
446            if self.project_root_was_deleted() {
447                let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
448                return false;
449            }
450            thread::sleep(GITIGNORE_REBUILD_POLL_INTERVAL);
451        }
452        !self.shutdown.load(Ordering::SeqCst)
453    }
454
455    fn send_dispatch(&self, event: WatcherDispatchEvent) -> bool {
456        let mut event = event;
457        loop {
458            match self
459                .dispatch_tx
460                .send_timeout(event, DISPATCH_SEND_POLL_INTERVAL)
461            {
462                Ok(()) => return true,
463                Err(SendTimeoutError::Timeout(returned)) => {
464                    if self.shutdown.load(Ordering::SeqCst) {
465                        return false;
466                    }
467                    event = returned;
468                }
469                Err(SendTimeoutError::Disconnected(_)) => return false,
470            }
471        }
472    }
473}
474
#[cfg(test)]
mod tests {
    use super::*;
    use ignore::gitignore::GitignoreBuilder;
    use notify::event::{
        AccessKind, AccessMode, CreateKind, DataChange, Flag, MetadataKind, ModifyKind,
    };
    use notify::EventKind;
    use tempfile::TempDir;

    /// A running filter thread plus the channel ends a test needs to drive it.
    struct FilterHarness {
        raw_tx: mpsc::Sender<notify::Result<notify::Event>>,
        dispatch_rx: Receiver<WatcherDispatchEvent>,
        shutdown: Arc<AtomicBool>,
        handle: JoinHandle<()>,
    }

    /// Starts a filter thread for `root` with no matcher installed and the
    /// default `.git` location.
    fn spawn_filter(root: PathBuf) -> FilterHarness {
        let matcher: SharedGitignore = Arc::new(RwLock::new(None));
        let shutdown = Arc::new(AtomicBool::new(false));
        let (dispatch_tx, dispatch_rx) = watcher_dispatch_channel();
        let (raw_tx, raw_rx) = mpsc::channel();
        let mut filter = WatcherFilterThread::new(
            WatcherFilterConfig::new(root, None),
            matcher,
            Arc::new(AtomicU64::new(0)),
            dispatch_tx,
            Arc::clone(&shutdown),
        );
        let handle = thread::spawn(move || filter.run(raw_rx));
        FilterHarness {
            raw_tx,
            dispatch_rx,
            shutdown,
            handle,
        }
    }

    /// Builds a shared matcher from `<root>/.gitignore`; the slot stays empty
    /// when that file is missing or contributes no ignore rules.
    fn shared_matcher(root: &Path) -> SharedGitignore {
        let root = match std::fs::canonicalize(root) {
            Ok(canonical) => canonical,
            Err(_) => root.to_path_buf(),
        };
        let mut builder = GitignoreBuilder::new(&root);
        let ignore_file = root.join(".gitignore");
        if ignore_file.exists() {
            if let Some(error) = builder.add(&ignore_file) {
                panic!("gitignore parse error: {error}");
            }
        }
        let built = builder.build().unwrap();
        let slot = if built.num_ignores() > 0 {
            Some(Arc::new(built))
        } else {
            None
        };
        Arc::new(RwLock::new(slot))
    }

    #[test]
    fn event_kind_filter_accepts_content_changes_only() {
        let invalidating = [
            EventKind::Create(CreateKind::File),
            EventKind::Modify(ModifyKind::Data(DataChange::Content)),
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)),
        ];
        for kind in &invalidating {
            assert!(watcher_event_invalidates(kind));
        }

        let ignored = [
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::AccessTime)),
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::Permissions)),
            EventKind::Access(AccessKind::Open(AccessMode::Read)),
            EventKind::Other,
        ];
        for kind in &ignored {
            assert!(!watcher_event_invalidates(kind));
        }
    }

    #[test]
    fn rescan_event_dispatches_control_and_supersedes_pending_paths() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let pending = root.join("pending.rs");
        std::fs::write(&pending, "fn main() {}\n").unwrap();
        let FilterHarness {
            raw_tx,
            dispatch_rx,
            shutdown,
            handle,
        } = spawn_filter(root);

        // Queue a granular change, immediately followed by a rescan request.
        let mut granular = notify::Event::new(EventKind::Create(CreateKind::File));
        granular.paths.push(pending);
        raw_tx.send(Ok(granular)).unwrap();
        let rescan = notify::Event::new(EventKind::Other).set_flag(Flag::Rescan);
        raw_tx.send(Ok(rescan)).unwrap();

        let event = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("rescan event");
        assert_eq!(event, WatcherDispatchEvent::RescanRequired);

        let quiet_period = WATCHER_FLUSH_WINDOW + Duration::from_millis(100);
        assert!(
            dispatch_rx.recv_timeout(quiet_period).is_err(),
            "pending granular paths should be cleared by a rescan signal"
        );

        shutdown.store(true, Ordering::SeqCst);
        drop(raw_tx);
        handle.join().unwrap();
    }

    #[test]
    fn filters_gitignored_paths_with_shared_matcher() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let ignored_file = root.join("ignored/file.ts");
        let kept_file = root.join("kept.ts");
        std::fs::write(root.join(".gitignore"), "ignored/\n").unwrap();
        std::fs::create_dir_all(root.join("ignored")).unwrap();
        std::fs::write(&ignored_file, "ignored").unwrap();
        std::fs::write(&kept_file, "kept").unwrap();
        let matcher = shared_matcher(&root);
        let config = WatcherFilterConfig::new(root.clone(), None);

        let filtered = filter_watcher_raw_paths_for_test(
            &config,
            &matcher,
            [ignored_file.clone(), kept_file.clone()],
        );

        assert!(!filtered.changed.contains(&ignored_file));
        assert!(filtered.changed.contains(&kept_file));
    }

    #[test]
    fn ignore_rule_paths_are_control_only_for_external_excludes() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let info_dir = root.join(".git").join("info");
        std::fs::create_dir_all(&info_dir).unwrap();
        let exclude = info_dir.join("exclude");
        std::fs::write(&exclude, "ignored/\n").unwrap();
        let matcher = Arc::new(RwLock::new(None));
        let config = WatcherFilterConfig::new(root, None);

        let filtered = filter_watcher_raw_paths_for_test(&config, &matcher, [exclude]);

        assert!(filtered.ignore_file_changed);
        assert!(filtered.changed.is_empty());
    }

    #[test]
    fn root_deleted_sends_control_and_exits() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        // Keep the raw sender alive so the loop sees a deleted root rather
        // than a disconnected channel.
        let FilterHarness {
            raw_tx: _raw_tx,
            dispatch_rx,
            shutdown,
            handle,
        } = spawn_filter(root.clone());
        std::fs::remove_dir_all(&root).unwrap();

        let event = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("root deleted event");
        assert_eq!(event, WatcherDispatchEvent::RootDeleted);
        shutdown.store(true, Ordering::SeqCst);
        handle.join().unwrap();
    }
}