Skip to main content

aft/
watcher_filter.rs

1use std::collections::BTreeSet;
2use std::path::{Component, Path, PathBuf};
3use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
4use std::sync::{mpsc, Arc, RwLock};
5use std::thread::{self, JoinHandle};
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{Receiver, SendTimeoutError, Sender};
9use ignore::gitignore::Gitignore;
10
11pub type SharedGitignore = Arc<RwLock<Option<Arc<Gitignore>>>>;
12
/// How long pending raw paths are collected before they are flushed as one batch.
pub const WATCHER_FLUSH_WINDOW: Duration = Duration::from_millis(250);
/// Pending-path count at which a batch is flushed immediately, without waiting
/// for the flush window to elapse.
pub const WATCHER_MAX_BATCH_PATHS: usize = 1024;
/// Capacity of the bounded channel created by `watcher_dispatch_channel`.
pub const WATCHER_DISPATCH_CHANNEL_CAPACITY: usize = 1024;
/// Upper bound on how long the filter loop blocks on the raw event channel
/// before it re-checks whether the project root still exists.
const ROOT_DELETED_CHECK_INTERVAL: Duration = Duration::from_millis(250);
/// Sleep between polls of the matcher generation while waiting for a rebuild.
const GITIGNORE_REBUILD_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Timeout of a single dispatch send attempt; the shutdown flag is consulted
/// between attempts.
const DISPATCH_SEND_POLL_INTERVAL: Duration = Duration::from_millis(50);
19
/// Filesystem locations the watcher filter needs to classify changed paths.
#[derive(Debug, Clone)]
pub struct WatcherFilterConfig {
    /// Root directory of the watched project.
    pub project_root: PathBuf,
    /// Git common directory, if one was supplied. When absent,
    /// `<project_root>/.git` is assumed.
    pub git_common_dir: Option<PathBuf>,
}

impl WatcherFilterConfig {
    /// Bundles the project root and the optional git common directory.
    pub fn new(project_root: PathBuf, git_common_dir: Option<PathBuf>) -> Self {
        Self {
            project_root,
            git_common_dir,
        }
    }

    /// Returns the path of git's `info/exclude` file for this configuration.
    ///
    /// The file is looked up under `git_common_dir` when it is set, otherwise
    /// under `<project_root>/.git`. The path is computed only; nothing is
    /// checked on disk.
    fn git_info_exclude_path(&self) -> PathBuf {
        let git_dir = match &self.git_common_dir {
            Some(common_dir) => common_dir.clone(),
            None => self.project_root.join(".git"),
        };
        git_dir.join("info").join("exclude")
    }
}
42
/// Messages the watcher filter thread sends to its consumer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherDispatchEvent {
    /// A batch of changed paths that survived filtering.
    Paths(Vec<PathBuf>),
    /// A file that can alter the ignore rules changed. `path` is the first
    /// such file found in the flushed batch.
    IgnoreRulesChanged { path: PathBuf },
    /// The project root no longer exists.
    RootDeleted,
    /// The watcher failed; the string carries the human-readable reason.
    Error(String),
}
50
/// Owner-side handle for a spawned watcher thread.
///
/// Dropping the handle only raises the shutdown flag; it does not wait for the
/// thread. Use [`WatcherThreadHandle::shutdown_and_join`] to block until the
/// thread has exited.
pub struct WatcherThreadHandle {
    /// Flag that is set to `true` to ask the thread to stop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the thread; `None` once it has been taken for joining.
    join: Option<JoinHandle<()>>,
}

impl WatcherThreadHandle {
    /// Wraps an already-spawned thread together with its shutdown flag.
    pub fn new(shutdown: Arc<AtomicBool>, join: JoinHandle<()>) -> Self {
        Self {
            shutdown,
            join: Some(join),
        }
    }

    /// Raises the shutdown flag without waiting for the thread.
    pub fn request_shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }

    /// Reports whether the thread has finished running.
    ///
    /// A handle whose join handle was already taken counts as finished.
    pub fn is_finished(&self) -> bool {
        match &self.join {
            Some(worker) => worker.is_finished(),
            None => true,
        }
    }

    /// Raises the shutdown flag and blocks until the thread exits.
    ///
    /// A panic inside the thread is swallowed: the join result is discarded.
    pub fn shutdown_and_join(mut self) {
        self.request_shutdown();
        let Some(worker) = self.join.take() else {
            return;
        };
        let _ = worker.join();
    }
}

impl Drop for WatcherThreadHandle {
    /// Asks the thread to stop; deliberately does not join it.
    fn drop(&mut self) {
        self.request_shutdown();
    }
}
85
/// Count of live [`ActiveWatcherThreadGuard`] values in this process.
static ACTIVE_WATCHER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Scope guard that keeps `ACTIVE_WATCHER_THREADS` in step with its own
/// lifetime: the counter goes up on creation and back down on drop.
struct ActiveWatcherThreadGuard;

impl ActiveWatcherThreadGuard {
    /// Increments the live-thread counter and returns the guard.
    fn new() -> Self {
        let _previous = ACTIVE_WATCHER_THREADS.fetch_add(1, Ordering::SeqCst);
        ActiveWatcherThreadGuard
    }
}

impl Drop for ActiveWatcherThreadGuard {
    /// Decrements the live-thread counter.
    fn drop(&mut self) {
        let _previous = ACTIVE_WATCHER_THREADS.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Returns the current value of the live-thread counter (test helper).
pub fn active_watcher_thread_count_for_test() -> usize {
    let live = ACTIVE_WATCHER_THREADS.load(Ordering::SeqCst);
    live
}
106
107pub fn watcher_dispatch_channel() -> (Sender<WatcherDispatchEvent>, Receiver<WatcherDispatchEvent>)
108{
109    crossbeam_channel::bounded(WATCHER_DISPATCH_CHANNEL_CAPACITY)
110}
111
112/// Decide whether a `notify::Event` represents a real content change worth
113/// invalidating cached state for.
114pub fn watcher_event_invalidates(kind: &notify::EventKind) -> bool {
115    use notify::event::{MetadataKind, ModifyKind};
116    use notify::EventKind;
117    match kind {
118        EventKind::Create(_) | EventKind::Remove(_) => true,
119        EventKind::Modify(ModifyKind::Metadata(meta)) => !matches!(
120            meta,
121            MetadataKind::AccessTime
122                | MetadataKind::Permissions
123                | MetadataKind::Ownership
124                | MetadataKind::Extended
125        ),
126        EventKind::Modify(_) => true,
127        _ => false,
128    }
129}
130
/// Returns `true` when any normal component of `path` is one of the
/// infrastructure directory names that are never reported (`.git`,
/// `.opencode`, `.alfonso`, `.gsd`, `node_modules`, `target`).
///
/// Names that are not valid UTF-8 never match.
///
/// NOTE(review): every component is inspected, including ancestors of the
/// project root, so a project located beneath a directory with one of these
/// names has all of its paths skipped.
pub fn watcher_path_is_infra_skip(path: &Path) -> bool {
    const INFRA_DIR_NAMES: [&str; 6] = [
        ".git",
        ".opencode",
        ".alfonso",
        ".gsd",
        "node_modules",
        "target",
    ];

    path.components().any(|component| match component {
        Component::Normal(name) => INFRA_DIR_NAMES.iter().any(|infra| name == *infra),
        _ => false,
    })
}
139
/// Returns `true` when the final component of `path` is named `.gitignore`
/// or `.aftignore`.
fn watcher_path_is_ignore_file(path: &Path) -> bool {
    let leaf = path.file_name().and_then(std::ffi::OsStr::to_str);
    matches!(leaf, Some(".gitignore" | ".aftignore"))
}
145
/// Returns `true` when `path` refers to `target`.
///
/// The two are first compared as written. If they differ, `target` (and only
/// `target`) is canonicalized on disk and compared again; a `target` that
/// cannot be canonicalized is treated as a mismatch.
fn watcher_same_path(path: &Path, target: &Path) -> bool {
    if path == target {
        return true;
    }

    match std::fs::canonicalize(target) {
        Ok(resolved) => path == resolved,
        Err(_) => false,
    }
}
155
156fn watcher_path_is_git_info_exclude(config: &WatcherFilterConfig, path: &Path) -> bool {
157    watcher_same_path(path, &config.git_info_exclude_path())
158}
159
160fn watcher_path_is_global_gitignore(path: &Path) -> bool {
161    ignore::gitignore::gitconfig_excludes_path()
162        .as_deref()
163        .is_some_and(|global_ignore| watcher_same_path(path, global_ignore))
164}
165
166fn watcher_path_can_change_corpus_ignore(config: &WatcherFilterConfig, path: &Path) -> bool {
167    if watcher_path_is_global_gitignore(path) {
168        return true;
169    }
170    if watcher_path_is_git_info_exclude(config, path) {
171        return true;
172    }
173    if !path.starts_with(&config.project_root) {
174        return false;
175    }
176
177    watcher_path_is_ignore_file(path) && !watcher_path_is_infra_skip(path)
178}
179
/// Canonicalizes `path` as far as the filesystem allows.
///
/// If the path itself resolves, its canonical form is returned. Otherwise
/// (for example, the file was just deleted) the parent directory is
/// canonicalized and the original file name is re-attached. If that fails too,
/// or the path has no parent or file name, `path` is returned unchanged.
pub fn canonicalize_watcher_path(path: PathBuf) -> PathBuf {
    match std::fs::canonicalize(&path) {
        Ok(resolved) => resolved,
        Err(_) => {
            let via_parent = path.parent().zip(path.file_name()).and_then(|(dir, leaf)| {
                std::fs::canonicalize(dir)
                    .ok()
                    .map(|resolved_dir| resolved_dir.join(leaf))
            });
            via_parent.unwrap_or(path)
        }
    }
}
194
195fn watcher_path_is_ignored_by_matcher(matcher: &SharedGitignore, path: &Path) -> bool {
196    if watcher_path_is_infra_skip(path) {
197        return true;
198    }
199
200    let guard = matcher
201        .read()
202        .unwrap_or_else(|poisoned| poisoned.into_inner());
203    if let Some(matcher) = guard.as_ref() {
204        if path.starts_with(matcher.path()) {
205            let is_dir = path.is_dir();
206            return matcher
207                .matched_path_or_any_parents(path, is_dir)
208                .is_ignore();
209        }
210    }
211
212    false
213}
214
/// Result of filtering one batch of raw watcher paths.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FilteredWatcherPaths {
    /// Paths that survived every filter, in sorted order.
    pub changed: BTreeSet<PathBuf>,
    /// Whether the batch contained a file that can alter the ignore rules.
    pub ignore_file_changed: bool,
}
220
221fn filter_canonical_paths(
222    config: &WatcherFilterConfig,
223    matcher: &SharedGitignore,
224    raw_paths: BTreeSet<PathBuf>,
225) -> FilteredWatcherPaths {
226    let ignore_file_changed = raw_paths
227        .iter()
228        .any(|path| watcher_path_can_change_corpus_ignore(config, path));
229
230    let changed = raw_paths
231        .into_iter()
232        .filter(|path| {
233            if watcher_path_is_infra_skip(path) {
234                return false;
235            }
236
237            if watcher_path_is_global_gitignore(path)
238                || watcher_path_is_git_info_exclude(config, path)
239            {
240                return false;
241            }
242
243            if watcher_path_is_ignored_by_matcher(matcher, path) {
244                return false;
245            }
246            true
247        })
248        .collect();
249
250    FilteredWatcherPaths {
251        changed,
252        ignore_file_changed,
253    }
254}
255
256pub fn filter_watcher_raw_paths_for_test<I>(
257    config: &WatcherFilterConfig,
258    matcher: &SharedGitignore,
259    raw_paths: I,
260) -> FilteredWatcherPaths
261where
262    I: IntoIterator<Item = PathBuf>,
263{
264    let raw_paths = raw_paths
265        .into_iter()
266        .map(canonicalize_watcher_path)
267        .collect::<BTreeSet<_>>();
268    filter_canonical_paths(config, matcher, raw_paths)
269}
270
271pub fn run_watcher_thread<W, E, F>(
272    config: WatcherFilterConfig,
273    extra_watch_paths: Vec<PathBuf>,
274    matcher: SharedGitignore,
275    matcher_generation: Arc<AtomicU64>,
276    dispatch_tx: Sender<WatcherDispatchEvent>,
277    shutdown: Arc<AtomicBool>,
278    attach: F,
279) where
280    W: Send + 'static,
281    E: std::fmt::Display,
282    F: FnOnce(PathBuf, Vec<PathBuf>, mpsc::Sender<notify::Result<notify::Event>>) -> Result<W, E>,
283{
284    let _active_guard = ActiveWatcherThreadGuard::new();
285    let (raw_tx, raw_rx) = mpsc::channel();
286    let root_path = config.project_root.clone();
287    match attach(root_path.clone(), extra_watch_paths, raw_tx) {
288        Ok(_watcher) => {
289            if shutdown.load(Ordering::SeqCst) {
290                return;
291            }
292            crate::slog_info!("watcher started: {}", root_path.display());
293            let mut filter = WatcherFilterThread::new(
294                config,
295                matcher,
296                matcher_generation,
297                dispatch_tx,
298                shutdown,
299            );
300            filter.run(raw_rx);
301        }
302        Err(error) => {
303            if !shutdown.load(Ordering::SeqCst) {
304                log::debug!(
305                    "watcher init failed: {} — callers will work with stale data",
306                    error
307                );
308                let _ = dispatch_tx.send(WatcherDispatchEvent::Error(format!(
309                    "watcher init failed: {error}"
310                )));
311            }
312        }
313    }
314}
315
316struct WatcherFilterThread {
317    config: WatcherFilterConfig,
318    matcher: SharedGitignore,
319    matcher_generation: Arc<AtomicU64>,
320    dispatch_tx: Sender<WatcherDispatchEvent>,
321    shutdown: Arc<AtomicBool>,
322    raw_paths: BTreeSet<PathBuf>,
323    flush_deadline: Option<Instant>,
324}
325
326impl WatcherFilterThread {
327    fn new(
328        config: WatcherFilterConfig,
329        matcher: SharedGitignore,
330        matcher_generation: Arc<AtomicU64>,
331        dispatch_tx: Sender<WatcherDispatchEvent>,
332        shutdown: Arc<AtomicBool>,
333    ) -> Self {
334        Self {
335            config,
336            matcher,
337            matcher_generation,
338            dispatch_tx,
339            shutdown,
340            raw_paths: BTreeSet::new(),
341            flush_deadline: None,
342        }
343    }
344
345    fn run(&mut self, raw_rx: mpsc::Receiver<notify::Result<notify::Event>>) {
346        loop {
347            if self.shutdown.load(Ordering::SeqCst) {
348                self.flush_pending();
349                return;
350            }
351            if self.project_root_was_deleted() {
352                self.raw_paths.clear();
353                let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
354                return;
355            }
356            if self.flush_deadline_reached() {
357                if !self.flush_pending() {
358                    return;
359                }
360                continue;
361            }
362
363            match raw_rx.recv_timeout(self.next_recv_timeout()) {
364                Ok(Ok(event)) => {
365                    if watcher_event_invalidates(&event.kind) && !self.push_raw_paths(event.paths) {
366                        return;
367                    }
368                }
369                Ok(Err(error)) => {
370                    let _ = self.send_dispatch(WatcherDispatchEvent::Error(error.to_string()));
371                    return;
372                }
373                Err(mpsc::RecvTimeoutError::Timeout) => {
374                    if !self.flush_pending() {
375                        return;
376                    }
377                }
378                Err(mpsc::RecvTimeoutError::Disconnected) => {
379                    if !self.shutdown.load(Ordering::SeqCst) {
380                        let _ = self.send_dispatch(WatcherDispatchEvent::Error(
381                            "watcher channel disconnected".to_string(),
382                        ));
383                    }
384                    return;
385                }
386            }
387        }
388    }
389
390    fn project_root_was_deleted(&self) -> bool {
391        !self.config.project_root.exists()
392    }
393
394    fn push_raw_paths(&mut self, paths: Vec<PathBuf>) -> bool {
395        for path in paths {
396            self.raw_paths.insert(canonicalize_watcher_path(path));
397        }
398        if !self.raw_paths.is_empty() && self.flush_deadline.is_none() {
399            self.flush_deadline = Some(Instant::now() + WATCHER_FLUSH_WINDOW);
400        }
401        if self.raw_paths.len() >= WATCHER_MAX_BATCH_PATHS {
402            return self.flush_pending();
403        }
404        true
405    }
406
407    fn next_recv_timeout(&self) -> Duration {
408        let root_check = ROOT_DELETED_CHECK_INTERVAL;
409        match self.flush_deadline {
410            Some(deadline) => deadline
411                .saturating_duration_since(Instant::now())
412                .min(root_check),
413            None => root_check,
414        }
415    }
416
417    fn flush_deadline_reached(&self) -> bool {
418        self.flush_deadline
419            .is_some_and(|deadline| Instant::now() >= deadline)
420    }
421
422    fn flush_pending(&mut self) -> bool {
423        if self.raw_paths.is_empty() {
424            self.flush_deadline = None;
425            return true;
426        }
427
428        let raw_paths = std::mem::take(&mut self.raw_paths);
429        self.flush_deadline = None;
430        let ignore_path = raw_paths
431            .iter()
432            .find(|path| watcher_path_can_change_corpus_ignore(&self.config, path))
433            .cloned();
434        let ignore_file_changed = ignore_path.is_some();
435        if let Some(path) = ignore_path {
436            let observed_generation = self.matcher_generation.load(Ordering::SeqCst);
437            if !self.send_dispatch(WatcherDispatchEvent::IgnoreRulesChanged { path }) {
438                return false;
439            }
440            if !self.wait_for_gitignore_rebuild(observed_generation) {
441                return false;
442            }
443        }
444
445        let filtered = filter_canonical_paths(&self.config, &self.matcher, raw_paths);
446        debug_assert_eq!(filtered.ignore_file_changed, ignore_file_changed);
447        if filtered.changed.is_empty() {
448            return true;
449        }
450        self.send_dispatch(WatcherDispatchEvent::Paths(
451            filtered.changed.into_iter().collect(),
452        ))
453    }
454
455    fn wait_for_gitignore_rebuild(&self, observed_generation: u64) -> bool {
456        while !self.shutdown.load(Ordering::SeqCst)
457            && self.matcher_generation.load(Ordering::SeqCst) == observed_generation
458        {
459            if self.project_root_was_deleted() {
460                let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
461                return false;
462            }
463            thread::sleep(GITIGNORE_REBUILD_POLL_INTERVAL);
464        }
465        !self.shutdown.load(Ordering::SeqCst)
466    }
467
468    fn send_dispatch(&self, event: WatcherDispatchEvent) -> bool {
469        let mut event = event;
470        loop {
471            match self
472                .dispatch_tx
473                .send_timeout(event, DISPATCH_SEND_POLL_INTERVAL)
474            {
475                Ok(()) => return true,
476                Err(SendTimeoutError::Timeout(returned)) => {
477                    if self.shutdown.load(Ordering::SeqCst) {
478                        return false;
479                    }
480                    event = returned;
481                }
482                Err(SendTimeoutError::Disconnected(_)) => return false,
483            }
484        }
485    }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use ignore::gitignore::GitignoreBuilder;
    use notify::event::{AccessKind, AccessMode, CreateKind, DataChange, MetadataKind, ModifyKind};
    use notify::EventKind;
    use tempfile::TempDir;

    /// Builds a shared matcher from `<root>/.gitignore`, or a `None` matcher
    /// when that file is missing or contributes no ignore rules.
    fn shared_matcher(root: &Path) -> SharedGitignore {
        let canonical_root = match std::fs::canonicalize(root) {
            Ok(resolved) => resolved,
            Err(_) => root.to_path_buf(),
        };
        let mut builder = GitignoreBuilder::new(&canonical_root);
        let ignore_file = canonical_root.join(".gitignore");
        if ignore_file.exists() {
            if let Some(error) = builder.add(&ignore_file) {
                panic!("gitignore parse error: {error}");
            }
        }
        let built = builder.build().unwrap();
        let active = if built.num_ignores() > 0 {
            Some(Arc::new(built))
        } else {
            None
        };
        Arc::new(RwLock::new(active))
    }

    /// Content-affecting kinds invalidate; access and metadata noise do not.
    #[test]
    fn event_kind_filter_accepts_content_changes_only() {
        let invalidating = [
            EventKind::Create(CreateKind::File),
            EventKind::Modify(ModifyKind::Data(DataChange::Content)),
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)),
        ];
        for kind in &invalidating {
            assert!(watcher_event_invalidates(kind));
        }

        let ignored = [
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::AccessTime)),
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::Permissions)),
            EventKind::Access(AccessKind::Open(AccessMode::Read)),
            EventKind::Other,
        ];
        for kind in &ignored {
            assert!(!watcher_event_invalidates(kind));
        }
    }

    /// Paths matched by the project's `.gitignore` are dropped; others are kept.
    #[test]
    fn filters_gitignored_paths_with_shared_matcher() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let ignored_file = root.join("ignored/file.ts");
        let kept_file = root.join("kept.ts");
        std::fs::write(root.join(".gitignore"), "ignored/\n").unwrap();
        std::fs::create_dir_all(root.join("ignored")).unwrap();
        std::fs::write(&ignored_file, "ignored").unwrap();
        std::fs::write(&kept_file, "kept").unwrap();
        let matcher = shared_matcher(&root);
        let config = WatcherFilterConfig::new(root.clone(), None);

        let filtered = filter_watcher_raw_paths_for_test(
            &config,
            &matcher,
            [ignored_file.clone(), kept_file.clone()],
        );

        assert!(!filtered.changed.contains(&ignored_file));
        assert!(filtered.changed.contains(&kept_file));
    }

    /// `.git/info/exclude` flags an ignore-rule change but is never reported
    /// as a changed path itself.
    #[test]
    fn ignore_rule_paths_are_control_only_for_external_excludes() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let info_dir = root.join(".git").join("info");
        std::fs::create_dir_all(&info_dir).unwrap();
        let exclude = info_dir.join("exclude");
        std::fs::write(&exclude, "ignored/\n").unwrap();
        let matcher: SharedGitignore = Arc::new(RwLock::new(None));
        let config = WatcherFilterConfig::new(root, None);

        let filtered = filter_watcher_raw_paths_for_test(&config, &matcher, [exclude]);

        assert!(filtered.ignore_file_changed);
        assert!(filtered.changed.is_empty());
    }

    /// Deleting the project root makes the loop emit `RootDeleted` and exit.
    #[test]
    fn root_deleted_sends_control_and_exits() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let matcher: SharedGitignore = Arc::new(RwLock::new(None));
        let generation = Arc::new(AtomicU64::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));
        let (dispatch_tx, dispatch_rx) = watcher_dispatch_channel();
        let (raw_tx, raw_rx) = mpsc::channel();
        let mut filter = WatcherFilterThread::new(
            WatcherFilterConfig::new(root.clone(), None),
            matcher,
            generation,
            dispatch_tx,
            Arc::clone(&shutdown),
        );
        let worker = thread::spawn(move || filter.run(raw_rx));
        // Keep the sender alive so the loop does not see a disconnect first.
        let _raw_tx = raw_tx;
        std::fs::remove_dir_all(&root).unwrap();

        let received = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("root deleted event");
        assert_eq!(received, WatcherDispatchEvent::RootDeleted);
        shutdown.store(true, Ordering::SeqCst);
        worker.join().unwrap();
    }
}
599}