Skip to main content

aft/
watcher_filter.rs

1use std::collections::BTreeSet;
2use std::path::{Component, Path, PathBuf};
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::{mpsc, Arc, RwLock};
5use std::thread::{self, JoinHandle};
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{Receiver, SendTimeoutError, Sender};
9use ignore::gitignore::Gitignore;
10
11pub type SharedGitignore = Arc<RwLock<Option<Arc<Gitignore>>>>;
12
/// Debounce window: once the first raw path of a batch is queued, the batch is
/// flushed no later than this long afterwards.
pub const WATCHER_FLUSH_WINDOW: Duration = Duration::from_millis(250);
/// Pending-set size at which a batch is flushed immediately instead of waiting
/// for the debounce window to close.
pub const WATCHER_MAX_BATCH_PATHS: usize = 1024;
/// Capacity of the bounded channel returned by `watcher_dispatch_channel`.
pub const WATCHER_DISPATCH_CHANNEL_CAPACITY: usize = 1024;
/// Upper bound on how long the filter loop waits for a raw event before it
/// re-checks that the project root still exists.
const ROOT_DELETED_CHECK_INTERVAL: Duration = Duration::from_millis(250);
/// Sleep between polls of the matcher generation counter while waiting for an
/// ignore-rule rebuild.
const GITIGNORE_REBUILD_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Per-attempt timeout of a dispatch send; the shutdown flag is re-checked
/// after every attempt that times out.
const DISPATCH_SEND_POLL_INTERVAL: Duration = Duration::from_millis(50);
19
/// Locations the watcher filter needs in order to classify a path.
#[derive(Debug, Clone)]
pub struct WatcherFilterConfig {
    /// Root of the watched project; in-tree ignore files are only honoured
    /// below this directory.
    pub project_root: PathBuf,
    /// Git directory that holds `info/exclude`. When `None`,
    /// `<project_root>/.git` is used instead.
    pub git_common_dir: Option<PathBuf>,
}

impl WatcherFilterConfig {
    /// Bundles the project root with an optional explicit git directory.
    pub fn new(project_root: PathBuf, git_common_dir: Option<PathBuf>) -> Self {
        Self {
            project_root,
            git_common_dir,
        }
    }

    /// Path of the repository-local exclude file: `<git dir>/info/exclude`,
    /// where the git dir is `git_common_dir` or, failing that,
    /// `<project_root>/.git`.
    fn git_info_exclude_path(&self) -> PathBuf {
        let mut exclude = match self.git_common_dir.as_deref() {
            Some(common_dir) => common_dir.to_path_buf(),
            None => self.project_root.join(".git"),
        };
        exclude.push("info");
        exclude.push("exclude");
        exclude
    }
}
42
/// Messages the watcher thread places on the dispatch channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherDispatchEvent {
    /// One debounced batch of changed paths that survived filtering.
    Paths(Vec<PathBuf>),
    /// The backend flagged an event as needing a rescan; any paths pending at
    /// that moment were discarded rather than reported.
    RescanRequired,
    /// A path in the batch can change the ignore rules. Sent before that
    /// batch's `Paths`; `path` is the first such path in the batch.
    IgnoreRulesChanged { path: PathBuf },
    /// The project root no longer exists; the watcher thread stops after this.
    RootDeleted,
    /// Watcher start-up failure, a backend error, or a disconnected raw event
    /// channel, rendered as text.
    Error(String),
}
51
/// Owner-side handle for a running watcher thread.
pub struct WatcherThreadHandle {
    /// Flag the thread polls; setting it asks the thread to stop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the thread; `None` once it has been taken for joining.
    join: Option<JoinHandle<()>>,
}

impl WatcherThreadHandle {
    /// Wraps an already-spawned thread together with its shutdown flag.
    pub fn new(shutdown: Arc<AtomicBool>, join: JoinHandle<()>) -> Self {
        Self {
            shutdown,
            join: Some(join),
        }
    }

    /// Raises the shutdown flag without waiting for the thread to exit.
    pub fn request_shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }

    /// Reports whether the thread has exited. A handle whose join handle has
    /// already been taken counts as finished.
    pub fn is_finished(&self) -> bool {
        match &self.join {
            Some(worker) => worker.is_finished(),
            None => true,
        }
    }

    /// Raises the shutdown flag and blocks until the thread has exited. A
    /// panic inside the thread is swallowed.
    pub fn shutdown_and_join(mut self) {
        self.request_shutdown();
        let Some(worker) = self.join.take() else {
            return;
        };
        let _ = worker.join();
    }
}

impl Drop for WatcherThreadHandle {
    /// Dropping the handle only requests shutdown; it never blocks on a join.
    fn drop(&mut self) {
        self.request_shutdown();
    }
}
86
87pub fn watcher_dispatch_channel() -> (Sender<WatcherDispatchEvent>, Receiver<WatcherDispatchEvent>)
88{
89    crossbeam_channel::bounded(WATCHER_DISPATCH_CHANNEL_CAPACITY)
90}
91
92/// Decide whether a `notify::Event` represents a real content change worth
93/// invalidating cached state for.
94pub fn watcher_event_invalidates(kind: &notify::EventKind) -> bool {
95    use notify::event::{MetadataKind, ModifyKind};
96    use notify::EventKind;
97    match kind {
98        EventKind::Create(_) | EventKind::Remove(_) => true,
99        EventKind::Modify(ModifyKind::Metadata(meta)) => !matches!(
100            meta,
101            MetadataKind::AccessTime
102                | MetadataKind::Permissions
103                | MetadataKind::Ownership
104                | MetadataKind::Extended
105        ),
106        EventKind::Modify(_) => true,
107        _ => false,
108    }
109}
110
/// Returns `true` when any component of `path` is one of the infrastructure
/// directory names the watcher never reports: `.git`, `.opencode`, `.alfonso`,
/// `.gsd`, `node_modules` or `target`.
///
/// Only normal path components are inspected, and a component that is not
/// valid UTF-8 never matches.
pub fn watcher_path_is_infra_skip(path: &Path) -> bool {
    const INFRA_DIRS: [&str; 6] = [
        ".git",
        ".opencode",
        ".alfonso",
        ".gsd",
        "node_modules",
        "target",
    ];

    path.components().any(|component| match component {
        Component::Normal(segment) => segment
            .to_str()
            .is_some_and(|segment| INFRA_DIRS.contains(&segment)),
        _ => false,
    })
}
119
/// Cheap pre-filter for high-churn ignored directories, applied to raw event
/// paths before they are canonicalized.
///
/// A build can write a very large number of files under `target/` (or
/// `node_modules/` for JS installs). Canonicalizing each of those paths only
/// to discard it in the full filter would cost a filesystem call per event.
/// This check walks the path components without touching the filesystem, so
/// such events can be rejected up front.
///
/// `.git` is intentionally absent from the list: a change to
/// `.git/info/exclude` alters the ignore set, and dropping `.git` here would
/// keep it from ever reaching the ignore-relevance check in the full filter.
fn watcher_path_is_high_churn_infra(path: &Path) -> bool {
    const HIGH_CHURN_DIRS: [&str; 5] = [
        ".opencode",
        ".alfonso",
        ".gsd",
        "node_modules",
        "target",
    ];

    path.components().any(|component| match component {
        Component::Normal(segment) => segment
            .to_str()
            .is_some_and(|segment| HIGH_CHURN_DIRS.contains(&segment)),
        _ => false,
    })
}
141
/// Returns `true` when the final component of `path` is exactly `.gitignore`
/// or `.aftignore`.
fn watcher_path_is_ignore_file(path: &Path) -> bool {
    matches!(
        path.file_name().and_then(std::ffi::OsStr::to_str),
        Some(".gitignore" | ".aftignore")
    )
}
147
/// Compares `path` with `target`, first as written and then against the
/// canonical form of `target`.
///
/// Only `target` is canonicalized; `path` is used as given. If `target`
/// cannot be canonicalized, the literal comparison is the only one made.
fn watcher_same_path(path: &Path, target: &Path) -> bool {
    if path == target {
        return true;
    }

    match std::fs::canonicalize(target) {
        Ok(resolved_target) => path == resolved_target,
        Err(_) => false,
    }
}
157
158fn watcher_path_is_git_info_exclude(config: &WatcherFilterConfig, path: &Path) -> bool {
159    watcher_same_path(path, &config.git_info_exclude_path())
160}
161
162fn watcher_path_is_global_gitignore(path: &Path) -> bool {
163    ignore::gitignore::gitconfig_excludes_path()
164        .as_deref()
165        .is_some_and(|global_ignore| watcher_same_path(path, global_ignore))
166}
167
168fn watcher_path_can_change_corpus_ignore(config: &WatcherFilterConfig, path: &Path) -> bool {
169    if watcher_path_is_global_gitignore(path) {
170        return true;
171    }
172    if watcher_path_is_git_info_exclude(config, path) {
173        return true;
174    }
175    if !path.starts_with(&config.project_root) {
176        return false;
177    }
178
179    watcher_path_is_ignore_file(path) && !watcher_path_is_infra_skip(path)
180}
181
/// Best-effort canonicalization of a watcher event path.
///
/// If the path exists, its canonical form is returned. Otherwise the nearest
/// ancestor that still exists is canonicalized and the missing remainder of
/// the path is appended to it unchanged. This keeps removal events normalized
/// even when a whole directory tree has vanished, not only when the immediate
/// parent survives. When no ancestor can be canonicalized, the path is
/// returned as given.
///
/// The fallback costs one failed canonicalization per missing ancestor level.
pub fn canonicalize_watcher_path(path: PathBuf) -> PathBuf {
    if let Ok(canonical) = std::fs::canonicalize(&path) {
        return canonical;
    }

    // `ancestors()` starts with the path itself, which has just failed.
    let rebuilt = path.ancestors().skip(1).find_map(|ancestor| {
        let canonical_ancestor = std::fs::canonicalize(ancestor).ok()?;
        let missing_tail = path.strip_prefix(ancestor).ok()?;
        Some(canonical_ancestor.join(missing_tail))
    });

    rebuilt.unwrap_or(path)
}
196
197fn watcher_path_is_ignored_by_matcher(matcher: &SharedGitignore, path: &Path) -> bool {
198    if watcher_path_is_infra_skip(path) {
199        return true;
200    }
201
202    let guard = matcher
203        .read()
204        .unwrap_or_else(|poisoned| poisoned.into_inner());
205    if let Some(matcher) = guard.as_ref() {
206        if path.starts_with(matcher.path()) {
207            let is_dir = path.is_dir();
208            return matcher
209                .matched_path_or_any_parents(path, is_dir)
210                .is_ignore();
211        }
212    }
213
214    false
215}
216
/// Outcome of filtering one batch of raw watcher paths.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FilteredWatcherPaths {
    /// Paths that survived every filter, in sorted order.
    pub changed: BTreeSet<PathBuf>,
    /// `true` when at least one raw path in the batch could change the ignore
    /// rules, whether or not that path also appears in `changed`.
    pub ignore_file_changed: bool,
}
222
223fn filter_canonical_paths(
224    config: &WatcherFilterConfig,
225    matcher: &SharedGitignore,
226    raw_paths: BTreeSet<PathBuf>,
227) -> FilteredWatcherPaths {
228    let ignore_file_changed = raw_paths
229        .iter()
230        .any(|path| watcher_path_can_change_corpus_ignore(config, path));
231
232    let changed = raw_paths
233        .into_iter()
234        .filter(|path| {
235            if watcher_path_is_infra_skip(path) {
236                return false;
237            }
238
239            if watcher_path_is_global_gitignore(path)
240                || watcher_path_is_git_info_exclude(config, path)
241            {
242                return false;
243            }
244
245            if watcher_path_is_ignored_by_matcher(matcher, path) {
246                return false;
247            }
248            true
249        })
250        .collect();
251
252    FilteredWatcherPaths {
253        changed,
254        ignore_file_changed,
255    }
256}
257
258pub fn filter_watcher_raw_paths_for_test<I>(
259    config: &WatcherFilterConfig,
260    matcher: &SharedGitignore,
261    raw_paths: I,
262) -> FilteredWatcherPaths
263where
264    I: IntoIterator<Item = PathBuf>,
265{
266    let raw_paths = raw_paths
267        .into_iter()
268        .map(canonicalize_watcher_path)
269        .collect::<BTreeSet<_>>();
270    filter_canonical_paths(config, matcher, raw_paths)
271}
272
273pub fn run_watcher_thread<W, E, F>(
274    config: WatcherFilterConfig,
275    extra_watch_paths: Vec<PathBuf>,
276    matcher: SharedGitignore,
277    matcher_generation: Arc<AtomicU64>,
278    dispatch_tx: Sender<WatcherDispatchEvent>,
279    shutdown: Arc<AtomicBool>,
280    attach: F,
281) where
282    W: Send + 'static,
283    E: std::fmt::Display,
284    F: FnOnce(PathBuf, Vec<PathBuf>, mpsc::Sender<notify::Result<notify::Event>>) -> Result<W, E>,
285{
286    let (raw_tx, raw_rx) = mpsc::channel();
287    let root_path = config.project_root.clone();
288    match attach(root_path.clone(), extra_watch_paths, raw_tx) {
289        Ok(_watcher) => {
290            if shutdown.load(Ordering::SeqCst) {
291                return;
292            }
293            crate::slog_info!("watcher started: {}", root_path.display());
294            let mut filter = WatcherFilterThread::new(
295                config,
296                matcher,
297                matcher_generation,
298                dispatch_tx,
299                shutdown,
300            );
301            filter.run(raw_rx);
302        }
303        Err(error) => {
304            if !shutdown.load(Ordering::SeqCst) {
305                log::debug!(
306                    "watcher init failed: {} — callers will work with stale data",
307                    error
308                );
309                let _ = dispatch_tx.send(WatcherDispatchEvent::Error(format!(
310                    "watcher init failed: {error}"
311                )));
312            }
313        }
314    }
315}
316
317struct WatcherFilterThread {
318    config: WatcherFilterConfig,
319    matcher: SharedGitignore,
320    matcher_generation: Arc<AtomicU64>,
321    dispatch_tx: Sender<WatcherDispatchEvent>,
322    shutdown: Arc<AtomicBool>,
323    raw_paths: BTreeSet<PathBuf>,
324    flush_deadline: Option<Instant>,
325}
326
327impl WatcherFilterThread {
328    fn new(
329        config: WatcherFilterConfig,
330        matcher: SharedGitignore,
331        matcher_generation: Arc<AtomicU64>,
332        dispatch_tx: Sender<WatcherDispatchEvent>,
333        shutdown: Arc<AtomicBool>,
334    ) -> Self {
335        Self {
336            config,
337            matcher,
338            matcher_generation,
339            dispatch_tx,
340            shutdown,
341            raw_paths: BTreeSet::new(),
342            flush_deadline: None,
343        }
344    }
345
346    fn run(&mut self, raw_rx: mpsc::Receiver<notify::Result<notify::Event>>) {
347        loop {
348            if self.shutdown.load(Ordering::SeqCst) {
349                self.flush_pending();
350                return;
351            }
352            if self.project_root_was_deleted() {
353                self.raw_paths.clear();
354                let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
355                return;
356            }
357            if self.flush_deadline_reached() {
358                if !self.flush_pending() {
359                    return;
360                }
361                continue;
362            }
363
364            match raw_rx.recv_timeout(self.next_recv_timeout()) {
365                Ok(Ok(event)) => {
366                    if event.need_rescan() {
367                        self.raw_paths.clear();
368                        self.flush_deadline = None;
369                        if !self.send_dispatch(WatcherDispatchEvent::RescanRequired) {
370                            return;
371                        }
372                        continue;
373                    }
374                    if watcher_event_invalidates(&event.kind) && !self.push_raw_paths(event.paths) {
375                        return;
376                    }
377                }
378                Ok(Err(error)) => {
379                    let _ = self.send_dispatch(WatcherDispatchEvent::Error(error.to_string()));
380                    return;
381                }
382                Err(mpsc::RecvTimeoutError::Timeout) => {
383                    if !self.flush_pending() {
384                        return;
385                    }
386                }
387                Err(mpsc::RecvTimeoutError::Disconnected) => {
388                    if !self.shutdown.load(Ordering::SeqCst) {
389                        let _ = self.send_dispatch(WatcherDispatchEvent::Error(
390                            "watcher channel disconnected".to_string(),
391                        ));
392                    }
393                    return;
394                }
395            }
396        }
397    }
398
399    fn project_root_was_deleted(&self) -> bool {
400        !self.config.project_root.exists()
401    }
402
403    fn push_raw_paths(&mut self, paths: Vec<PathBuf>) -> bool {
404        for path in paths {
405            // Drop high-churn ignored dirs (target/, node_modules/, agent infra)
406            // on the RAW path before canonicalizing. `canonicalize_watcher_path`
407            // is a realpath syscall; a build floods FSEvents with hundreds of
408            // thousands of target/ paths, and paying a syscall per path only to
409            // discard them later pegged this single watcher thread. The full
410            // filter still drops these (watcher_path_is_infra_skip), so this is a
411            // pure perf short-circuit with no behavior change.
412            if watcher_path_is_high_churn_infra(&path) {
413                continue;
414            }
415            // Canonicalize at intake so the set keys (and downstream consumers)
416            // see normalized paths — this is what collapses macOS /var ->
417            // /private/var aliasing and matches the callgraph/semantic/search
418            // cache keys. Same-file repeats within the window still dedup here;
419            // the high-churn flood is already dropped above, before this syscall.
420            self.raw_paths.insert(canonicalize_watcher_path(path));
421        }
422        if !self.raw_paths.is_empty() && self.flush_deadline.is_none() {
423            self.flush_deadline = Some(Instant::now() + WATCHER_FLUSH_WINDOW);
424        }
425        if self.raw_paths.len() >= WATCHER_MAX_BATCH_PATHS {
426            return self.flush_pending();
427        }
428        true
429    }
430
431    fn next_recv_timeout(&self) -> Duration {
432        let root_check = ROOT_DELETED_CHECK_INTERVAL;
433        match self.flush_deadline {
434            Some(deadline) => deadline
435                .saturating_duration_since(Instant::now())
436                .min(root_check),
437            None => root_check,
438        }
439    }
440
441    fn flush_deadline_reached(&self) -> bool {
442        self.flush_deadline
443            .is_some_and(|deadline| Instant::now() >= deadline)
444    }
445
446    fn flush_pending(&mut self) -> bool {
447        if self.raw_paths.is_empty() {
448            self.flush_deadline = None;
449            return true;
450        }
451
452        let raw_paths = std::mem::take(&mut self.raw_paths);
453        self.flush_deadline = None;
454        let ignore_path = raw_paths
455            .iter()
456            .find(|path| watcher_path_can_change_corpus_ignore(&self.config, path))
457            .cloned();
458        let ignore_file_changed = ignore_path.is_some();
459        if let Some(path) = ignore_path {
460            let observed_generation = self.matcher_generation.load(Ordering::SeqCst);
461            if !self.send_dispatch(WatcherDispatchEvent::IgnoreRulesChanged { path }) {
462                return false;
463            }
464            if !self.wait_for_gitignore_rebuild(observed_generation) {
465                return false;
466            }
467        }
468
469        let filtered = filter_canonical_paths(&self.config, &self.matcher, raw_paths);
470        debug_assert_eq!(filtered.ignore_file_changed, ignore_file_changed);
471        if filtered.changed.is_empty() {
472            return true;
473        }
474        self.send_dispatch(WatcherDispatchEvent::Paths(
475            filtered.changed.into_iter().collect(),
476        ))
477    }
478
479    fn wait_for_gitignore_rebuild(&self, observed_generation: u64) -> bool {
480        while !self.shutdown.load(Ordering::SeqCst)
481            && self.matcher_generation.load(Ordering::SeqCst) == observed_generation
482        {
483            if self.project_root_was_deleted() {
484                let _ = self.send_dispatch(WatcherDispatchEvent::RootDeleted);
485                return false;
486            }
487            thread::sleep(GITIGNORE_REBUILD_POLL_INTERVAL);
488        }
489        !self.shutdown.load(Ordering::SeqCst)
490    }
491
492    fn send_dispatch(&self, event: WatcherDispatchEvent) -> bool {
493        let mut event = event;
494        loop {
495            match self
496                .dispatch_tx
497                .send_timeout(event, DISPATCH_SEND_POLL_INTERVAL)
498            {
499                Ok(()) => return true,
500                Err(SendTimeoutError::Timeout(returned)) => {
501                    if self.shutdown.load(Ordering::SeqCst) {
502                        return false;
503                    }
504                    event = returned;
505                }
506                Err(SendTimeoutError::Disconnected(_)) => return false,
507            }
508        }
509    }
510}
511
#[cfg(test)]
mod tests {
    use super::*;
    use ignore::gitignore::GitignoreBuilder;
    use notify::event::{
        AccessKind, AccessMode, CreateKind, DataChange, Flag, MetadataKind, ModifyKind,
    };
    use notify::EventKind;
    use tempfile::TempDir;

    /// Builds a shared matcher from `<root>/.gitignore`. The slot is left
    /// empty when the resulting matcher holds no ignore rules.
    fn shared_matcher(root: &Path) -> SharedGitignore {
        let root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf());
        let gitignore_file = root.join(".gitignore");
        let mut builder = GitignoreBuilder::new(&root);
        if gitignore_file.exists() {
            if let Some(error) = builder.add(&gitignore_file) {
                panic!("gitignore parse error: {error}");
            }
        }
        let built = builder.build().unwrap();
        let slot = if built.num_ignores() > 0 {
            Some(Arc::new(built))
        } else {
            None
        };
        Arc::new(RwLock::new(slot))
    }

    /// The ends of a filter thread that a test drives directly.
    struct FilterHarness {
        raw_tx: mpsc::Sender<notify::Result<notify::Event>>,
        dispatch_rx: Receiver<WatcherDispatchEvent>,
        shutdown: Arc<AtomicBool>,
        worker: JoinHandle<()>,
    }

    /// Spawns a filter loop for `root` with no matcher installed and the
    /// default git directory.
    fn spawn_filter(root: PathBuf) -> FilterHarness {
        let matcher = Arc::new(RwLock::new(None));
        let generation = Arc::new(AtomicU64::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));
        let (dispatch_tx, dispatch_rx) = watcher_dispatch_channel();
        let (raw_tx, raw_rx) = mpsc::channel();
        let config = WatcherFilterConfig::new(root, None);
        let mut filter = WatcherFilterThread::new(
            config,
            matcher,
            generation,
            dispatch_tx,
            Arc::clone(&shutdown),
        );
        let worker = thread::spawn(move || filter.run(raw_rx));
        FilterHarness {
            raw_tx,
            dispatch_rx,
            shutdown,
            worker,
        }
    }

    #[test]
    fn event_kind_filter_accepts_content_changes_only() {
        let invalidating = [
            EventKind::Create(CreateKind::File),
            EventKind::Modify(ModifyKind::Data(DataChange::Content)),
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)),
        ];
        for kind in &invalidating {
            assert!(watcher_event_invalidates(kind));
        }

        let ignored = [
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::AccessTime)),
            EventKind::Modify(ModifyKind::Metadata(MetadataKind::Permissions)),
            EventKind::Access(AccessKind::Open(AccessMode::Read)),
            EventKind::Other,
        ];
        for kind in &ignored {
            assert!(!watcher_event_invalidates(kind));
        }
    }

    #[test]
    fn high_churn_infra_skip_drops_build_dirs_but_keeps_git_and_source() {
        // Build output, dependency trees and agent state are rejected on the
        // raw path, before any canonicalization.
        let dropped_early = [
            "/proj/target/debug/deps/foo.o",
            "/proj/node_modules/.bin/x",
            "/proj/.alfonso/notes/x",
        ];
        for raw in dropped_early {
            assert!(watcher_path_is_high_churn_infra(Path::new(raw)));
        }

        // `.git` must still reach the ignore-relevance check (info/exclude),
        // and source files always pass through to the full filter.
        let passed_on = ["/proj/.git/info/exclude", "/proj/src/main.rs"];
        for raw in passed_on {
            assert!(!watcher_path_is_high_churn_infra(Path::new(raw)));
        }

        // The full filter is the one that drops `.git`.
        assert!(watcher_path_is_infra_skip(Path::new("/proj/.git/index")));
    }

    #[test]
    fn rescan_event_dispatches_control_and_supersedes_pending_paths() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let pending = root.join("pending.rs");
        std::fs::write(&pending, "fn main() {}\n").unwrap();
        let FilterHarness {
            raw_tx,
            dispatch_rx,
            shutdown,
            worker,
        } = spawn_filter(root);

        // Queue one granular change, then a rescan before the window closes.
        let mut granular = notify::Event::new(EventKind::Create(CreateKind::File));
        granular.paths.push(pending);
        raw_tx.send(Ok(granular)).unwrap();
        raw_tx
            .send(Ok(
                notify::Event::new(EventKind::Other).set_flag(Flag::Rescan)
            ))
            .unwrap();

        let first = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("rescan event");
        assert_eq!(first, WatcherDispatchEvent::RescanRequired);
        assert!(
            dispatch_rx
                .recv_timeout(WATCHER_FLUSH_WINDOW + Duration::from_millis(100))
                .is_err(),
            "pending granular paths should be cleared by a rescan signal"
        );

        shutdown.store(true, Ordering::SeqCst);
        drop(raw_tx);
        worker.join().unwrap();
    }

    #[test]
    fn filters_gitignored_paths_with_shared_matcher() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        std::fs::write(root.join(".gitignore"), "ignored/\n").unwrap();
        std::fs::create_dir_all(root.join("ignored")).unwrap();
        let ignored_file = root.join("ignored/file.ts");
        let kept_file = root.join("kept.ts");
        std::fs::write(&ignored_file, "ignored").unwrap();
        std::fs::write(&kept_file, "kept").unwrap();
        let matcher = shared_matcher(&root);
        let config = WatcherFilterConfig::new(root.clone(), None);

        let filtered = filter_watcher_raw_paths_for_test(
            &config,
            &matcher,
            [ignored_file.clone(), kept_file.clone()],
        );

        assert!(!filtered.changed.contains(&ignored_file));
        assert!(filtered.changed.contains(&kept_file));
    }

    #[test]
    fn ignore_rule_paths_are_control_only_for_external_excludes() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let info_dir = root.join(".git").join("info");
        std::fs::create_dir_all(&info_dir).unwrap();
        let exclude_file = info_dir.join("exclude");
        std::fs::write(&exclude_file, "ignored/\n").unwrap();
        let matcher = Arc::new(RwLock::new(None));
        let config = WatcherFilterConfig::new(root, None);

        let filtered = filter_watcher_raw_paths_for_test(&config, &matcher, [exclude_file]);

        // The exclude file flips the control flag but is never reported as a
        // changed path.
        assert!(filtered.ignore_file_changed);
        assert!(filtered.changed.is_empty());
    }

    #[test]
    fn root_deleted_sends_control_and_exits() {
        let tmp = TempDir::new().unwrap();
        let root = std::fs::canonicalize(tmp.path()).unwrap();
        let FilterHarness {
            raw_tx,
            dispatch_rx,
            shutdown,
            worker,
        } = spawn_filter(root.clone());
        // Keep the raw sender alive so the loop sees the deleted root rather
        // than a disconnected channel.
        let _raw_tx = raw_tx;
        std::fs::remove_dir_all(&root).unwrap();

        let event = dispatch_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("root deleted event");
        assert_eq!(event, WatcherDispatchEvent::RootDeleted);
        shutdown.store(true, Ordering::SeqCst);
        worker.join().unwrap();
    }
}