Skip to main content

notify/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{
7    Config, Error, EventHandler, PathsMut, Receiver, Result, Sender, WatchMode, Watcher,
8    poll::data::WatchData, unbounded,
9};
10use std::{
11    path::{Path, PathBuf},
12    sync::mpsc,
13    thread,
14    time::Duration,
15};
16
/// Event sent to registered [`ScanEventHandler`]s on initial directory scans.
///
/// `Ok(path)` reports a path that was discovered by the scan.
pub type ScanEvent = crate::Result<PathBuf>;
19
/// Handler trait for receivers of [`ScanEvent`].
///
/// Very much the same as [`EventHandler`], but the handled value is a
/// [`ScanEvent`], i.e. a `Result` wrapping a path rather than an event.
///
/// See the full example for more information.
pub trait ScanEventHandler: Send + 'static {
    /// Handles an event.
    fn handle_event(&mut self, event: ScanEvent);
}
28
29impl<F> ScanEventHandler for F
30where
31    F: FnMut(ScanEvent) + Send + 'static,
32{
33    fn handle_event(&mut self, event: ScanEvent) {
34        (self)(event);
35    }
36}
37
/// Forwards every scan event into a `crossbeam_channel` sender.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // A failed send is only logged; the event is dropped.
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
47
/// Forwards every scan event into a `flume` sender.
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // A failed send is only logged; the event is dropped.
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
57
58impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
59    fn handle_event(&mut self, event: ScanEvent) {
60        let result = self.send(event);
61        if let Err(e) = result {
62            tracing::error!(?e, "failed to send scan event result");
63        }
64    }
65}
66
/// No-op handler: every scan event is discarded.
///
/// Used as the handler type when no scan callback is supplied
/// (see `PollWatcher::new`, which passes `None` with `G = ()`).
impl ScanEventHandler for () {
    fn handle_event(&mut self, _event: ScanEvent) {}
}
70
71use data::DataBuilder;
72mod data {
73    use crate::{
74        Error, EventHandler, Result, WatchMode,
75        consolidating_path_trie::ConsolidatingPathTrie,
76        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
77    };
78    use std::{
79        cell::RefCell,
80        collections::{HashMap, hash_map::RandomState},
81        fmt::{self, Debug},
82        fs::{File, FileType, Metadata},
83        hash::{BuildHasher, Hasher},
84        io::{self, Read},
85        path::{Path, PathBuf},
86        time::Instant,
87    };
88    use walkdir::WalkDir;
89
90    use super::ScanEventHandler;
91
92    fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
93        #[expect(clippy::cast_possible_wrap)]
94        match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
95            Ok(d) => d.as_secs() as i64,
96            Err(e) => -(e.duration().as_secs() as i64),
97        }
98    }
99
    /// Builder for [`WatchData`] & [`PathData`].
    ///
    /// Also owns the event sinks used while scanning: the regular event emitter
    /// and the optional handler for initial-scan events.
    pub(super) struct DataBuilder {
        // Sink for regular events and errors produced while scanning.
        emitter: EventEmitter,
        // Optional sink for initial-scan events; `None` when no scan handler was given.
        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
        // in future.
        // `Some` only when content comparison was requested (see `DataBuilder::new`).
        build_hasher: Option<RandomState>,

        // current timestamp for building Data.
        now: Instant,
    }
112
113    impl DataBuilder {
114        pub(super) fn new<F, G>(
115            event_handler: F,
116            compare_content: bool,
117            scan_emitter: Option<G>,
118        ) -> Self
119        where
120            F: EventHandler,
121            G: ScanEventHandler,
122        {
123            let scan_emitter = match scan_emitter {
124                None => None,
125                Some(v) => {
126                    // workaround for a weird type resolution bug when directly going to dyn Trait
127                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
128                        Box::new(RefCell::new(v));
129                    Some(intermediate)
130                }
131            };
132            Self {
133                emitter: EventEmitter::new(event_handler),
134                scan_emitter,
135                build_hasher: compare_content.then(RandomState::default),
136                now: Instant::now(),
137            }
138        }
139
140        /// Update internal timestamp.
141        pub(super) fn update_timestamp(&mut self) {
142            self.now = Instant::now();
143        }
144
145        /// Create [`PathData`].
146        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
147            PathData::new(self, meta_path)
148        }
149    }
150
151    impl Debug for DataBuilder {
152        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153            f.debug_struct("DataBuilder")
154                .field("build_hasher", &self.build_hasher)
155                .field("now", &self.now)
156                .finish_non_exhaustive()
157        }
158    }
159
    /// Double-buffered set of scan roots derived from the registered watches.
    ///
    /// Each map goes from a root path to a flag telling whether it is scanned
    /// recursively.
    #[derive(Debug)]
    struct WatchHandlers {
        // Roots handed out by `use_handlers`.
        current: HashMap<PathBuf, /* recursive */ bool>,
        // Roots computed by the latest `recalculate`, not yet promoted.
        next: HashMap<PathBuf, /* recursive */ bool>,
        // `true` when `next` must replace `current` on the next `use_handlers`.
        is_stale: bool,
    }
166
167    impl WatchHandlers {
168        fn new() -> Self {
169            Self {
170                current: HashMap::new(),
171                next: HashMap::new(),
172                is_stale: false,
173            }
174        }
175
176        /// Recalculate from `watches`.
177        fn recalculate(&mut self, watches: &HashMap<PathBuf, WatchMode>) {
178            self.next.clear();
179            self.is_stale = true;
180
181            let mut trie = ConsolidatingPathTrie::new(false);
182            for (path, mode) in watches {
183                if mode.recursive_mode == crate::RecursiveMode::Recursive {
184                    trie.insert(path);
185                }
186            }
187            // insert non-recursive watches that are not covered by recursive watches
188            for (path, mode) in watches {
189                if mode.recursive_mode != crate::RecursiveMode::Recursive {
190                    self.next.insert(path.clone(), false);
191                }
192            }
193            // insert recursive watches
194            for path in trie.values() {
195                self.next.insert(path, true);
196            }
197        }
198
199        fn use_handlers(
200            &mut self,
201        ) -> (
202            &HashMap<PathBuf, /* recursive */ bool>,
203            Option<HashMap<PathBuf, /* recursive */ bool>>,
204        ) {
205            if self.is_stale {
206                let old_next = std::mem::take(&mut self.next);
207                let old_current = std::mem::replace(&mut self.current, old_next);
208                self.is_stale = false;
209                return (&self.current, Some(old_current));
210            }
211            (&self.current, None)
212        }
213    }
214
    /// State of the poll watcher: registered watches plus the per-path data
    /// recorded by the most recent scans.
    #[derive(Debug)]
    pub(super) struct WatchData {
        // config part, won't change.
        // Passed to `WalkDir::follow_links` when scanning.
        follow_symlinks: bool,

        // current status part.
        // Watches exactly as registered by the user.
        watches: HashMap<PathBuf, WatchMode>,
        // Scan roots derived from `watches`.
        watch_handlers: WatchHandlers,
        // Last recorded data for every path seen by a scan.
        all_path_data: HashMap<PathBuf, PathData>,
    }
225
226    impl WatchData {
227        /// Create a new `WatchData`.
228        pub fn new(follow_symlinks: bool) -> Self {
229            Self {
230                follow_symlinks,
231                watches: HashMap::new(),
232                watch_handlers: WatchHandlers::new(),
233                all_path_data: HashMap::new(),
234            }
235        }
236
237        pub fn add_watch(&mut self, path: PathBuf, mode: WatchMode) -> Result<()> {
238            if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() {
239                return Err(crate::Error::path_not_found().add_path(path));
240            }
241
242            self.watches.insert(path, mode);
243            self.watch_handlers.recalculate(&self.watches);
244            Ok(())
245        }
246
247        pub fn add_watch_multiple(&mut self, paths: Vec<(PathBuf, WatchMode)>) -> Result<()> {
248            for (path, mode) in paths {
249                if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() {
250                    return Err(crate::Error::path_not_found().add_path(path));
251                }
252
253                self.watches.insert(path, mode);
254            }
255            self.watch_handlers.recalculate(&self.watches);
256            Ok(())
257        }
258
259        pub fn remove_watch(&mut self, path: &Path) -> Result<()> {
260            self.watches.remove(path).ok_or(Error::watch_not_found())?;
261            self.watch_handlers.recalculate(&self.watches);
262            Ok(())
263        }
264
        /// Rescan filesystem and update this `WatchData`.
        ///
        /// Walks every current scan root, records fresh data for each path found
        /// and then drops the recorded paths that were not seen by this scan.
        ///
        /// # Side effect
        ///
        /// This function may emit event by `data_builder.emitter`, and initial
        /// scan events by `data_builder.scan_emitter`.
        pub(super) fn rescan(&mut self, data_builder: &DataBuilder) {
            // `old_watch_handlers` is `Some` only when the scan roots were
            // recalculated since the previous call.
            let (watch_handlers, old_watch_handlers) = self.watch_handlers.use_handlers();

            // scan current filesystem.
            for (path, new_path_data) in
                Self::scan_all_path_data(data_builder, watch_handlers, self.follow_symlinks)
            {
                let old_path_data = self
                    .all_path_data
                    .insert(path.clone(), new_path_data.clone());

                // A path counts as "initial" when the roots just changed and the
                // previous roots neither contained the path itself nor a
                // recursive root among its ancestors.
                let is_initial = old_watch_handlers
                    .as_ref()
                    .is_some_and(|old_watch_handlers| {
                        !old_watch_handlers.contains_key(&path)
                            && !path.ancestors().skip(1).any(|ancestor| {
                                old_watch_handlers
                                    .get(ancestor)
                                    .is_some_and(|is_recursive| *is_recursive)
                            })
                    });
                if is_initial {
                    // emit initial scans
                    if let Some(ref emitter) = data_builder.scan_emitter {
                        emitter.borrow_mut().handle_event(Ok(path.clone()));
                    }
                } else {
                    // emit event
                    let event = PathData::compare_to_event(
                        path,
                        old_path_data.as_ref(),
                        Some(&new_path_data),
                    );
                    if let Some(event) = event {
                        data_builder.emitter.emit_ok(event);
                    }
                }
            }

            // scan for disappeared paths.
            // Every path seen above was stamped with `data_builder.now`, so an
            // older `last_check` means this scan did not find the path.
            let mut disappeared_paths = Vec::new();
            for (path, path_data) in &self.all_path_data {
                if path_data.last_check < data_builder.now {
                    disappeared_paths.push(path.clone());
                }
            }

            // remove disappeared paths
            for path in disappeared_paths {
                let old_path_data = self.all_path_data.remove(&path);

                // emit event
                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
                if let Some(event) = event {
                    data_builder.emitter.emit_ok(event);
                }
            }
        }
328
        /// Get all `PathData` by given configuration.
        ///
        /// Returns a lazy iterator over `(path, data)` for every entry found
        /// under every root in `watch_handlers`. Entries that fail with a
        /// "not found" IO error are skipped silently.
        ///
        /// # Side Effect
        ///
        /// This function may emit some IO Error events by `data_builder.emitter`.
        fn scan_all_path_data(
            data_builder: &DataBuilder,
            watch_handlers: &HashMap<PathBuf, /* recursive */ bool>,
            follow_symlinks: bool,
        ) -> impl Iterator<Item = (PathBuf, PathData)> {
            tracing::trace!("rescanning");

            watch_handlers.iter().flat_map(move |(path, is_recursive)| {
                tracing::trace!(?path, is_recursive, "scanning watch handler");

                // WalkDir return only one entry if root is a file (not a folder),
                // so we can use single logic to do the both file & dir's jobs.
                //
                // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
                WalkDir::new(path)
                    .follow_links(follow_symlinks)
                    // Non-recursive roots are limited to the root and its direct children.
                    .max_depth(if *is_recursive { usize::MAX } else { 1 })
                    .into_iter()
                    // First stage: drop walk errors, reporting all but "not found".
                    .filter_map(|entry_res| match entry_res {
                        Ok(entry) => Some(entry),
                        Err(err) => {
                            tracing::warn!("walkdir error scanning {err:?}");

                            if let Some(io_error) = err.io_error() {
                                if io_error.kind() == io::ErrorKind::NotFound {
                                    return None;
                                }
                                // clone an io::Error, so we have to create a new one.
                                let new_io_error = io::Error::new(io_error.kind(), err.to_string());
                                data_builder.emitter.emit_io_err(new_io_error, err.path());
                            } else {
                                // Walk errors without an underlying IO error are
                                // reported as generic errors.
                                let crate_err =
                                    Error::new(crate::ErrorKind::Generic(err.to_string()));
                                data_builder.emitter.emit(Err(crate_err));
                            }
                            None
                        }
                    })
                    // Second stage: read metadata and build the per-path data.
                    .filter_map(move |entry| match entry.metadata() {
                        Ok(metadata) => {
                            let path = entry.into_path();
                            let meta_path = MetaPath::from_parts_unchecked(path, metadata);
                            let data_path = data_builder.build_path_data(&meta_path);

                            Some((meta_path.into_path(), data_path))
                        }
                        Err(err) => {
                            if let Some(io_error) = err.io_error()
                                && io_error.kind() == io::ErrorKind::NotFound
                            {
                                return None;
                            }

                            // emit event.
                            let path = entry.into_path();
                            data_builder.emitter.emit_io_err(err, Some(path));

                            None
                        }
                    })
            })
        }
396    }
397
    /// Stored data for a single path location.
    ///
    /// See [`WatchData`] for more detail.
    #[derive(Debug, Clone)]
    struct PathData {
        /// File updated time, in whole seconds relative to the Unix epoch
        /// (`0` when the modification time could not be read).
        mtime: i64,

        /// File type taken from the path's metadata.
        file_type: FileType,

        /// Content's hash value, only available if user request compare file
        /// contents and read successful.
        hash: Option<u64>,

        /// Checked time.
        last_check: Instant,
    }
415
416    impl PathData {
417        /// Create a new `PathData`.
418        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
419            let metadata = meta_path.metadata();
420
421            PathData {
422                mtime: metadata.modified().map_or(0, system_time_to_seconds),
423                file_type: metadata.file_type(),
424                hash: data_builder
425                    .build_hasher
426                    .as_ref()
427                    .filter(|_| metadata.is_file())
428                    .and_then(|build_hasher| {
429                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
430                    }),
431
432                last_check: data_builder.now,
433            }
434        }
435
436        /// Get hash value for the data content in given file `path`.
437        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
438            let mut hasher = build_hasher.build_hasher();
439            let mut file = File::open(path)?;
440            let mut buf = [0; 512];
441
442            loop {
443                let n = match file.read(&mut buf) {
444                    Ok(0) => break,
445                    Ok(len) => len,
446                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
447                    Err(e) => return Err(e),
448                };
449
450                hasher.write(&buf[..n]);
451            }
452
453            Ok(hasher.finish())
454        }
455
456        /// Get `CreateKind` by file type.
457        fn get_create_kind(&self) -> CreateKind {
458            #[expect(clippy::filetype_is_file)]
459            if self.file_type.is_dir() {
460                CreateKind::Folder
461            } else if self.file_type.is_file() {
462                CreateKind::File
463            } else {
464                CreateKind::Any
465            }
466        }
467
468        /// Get `RemoveKind` by file type.
469        fn get_remove_kind(&self) -> RemoveKind {
470            #[expect(clippy::filetype_is_file)]
471            if self.file_type.is_dir() {
472                RemoveKind::Folder
473            } else if self.file_type.is_file() {
474                RemoveKind::File
475            } else {
476                RemoveKind::Any
477            }
478        }
479
480        /// Get [`Event`] by compare two optional [`PathData`].
481        fn compare_to_event<P>(
482            path: P,
483            old: Option<&PathData>,
484            new: Option<&PathData>,
485        ) -> Option<Event>
486        where
487            P: Into<PathBuf>,
488        {
489            match (old, new) {
490                (Some(old), Some(new)) => {
491                    if new.mtime > old.mtime {
492                        Some(EventKind::Modify(ModifyKind::Metadata(
493                            MetadataKind::WriteTime,
494                        )))
495                    } else if new.hash != old.hash {
496                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
497                    } else {
498                        None
499                    }
500                }
501                (None, Some(new)) => Some(EventKind::Create(new.get_create_kind())),
502                (Some(old), None) => Some(EventKind::Remove(old.get_remove_kind())),
503                (None, None) => None,
504            }
505            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
506        }
507    }
508
    /// Compose path and its metadata.
    ///
    /// This data structure is designed to make sure a path and its metadata are
    /// passed around together in a consistent way, and may avoid some duplicated
    /// `fs::metadata()` function calls in some situations.
    #[derive(Debug)]
    pub(super) struct MetaPath {
        // The path the metadata belongs to.
        path: PathBuf,
        // Metadata supplied together with `path` at construction time.
        metadata: Metadata,
    }
519
520    impl MetaPath {
521        /// Create `MetaPath` by given parts.
522        ///
523        /// # Invariant
524        ///
525        /// User must make sure the input `metadata` are associated with `path`.
526        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
527            Self { path, metadata }
528        }
529
530        fn path(&self) -> &Path {
531            &self.path
532        }
533
534        fn metadata(&self) -> &Metadata {
535            &self.metadata
536        }
537
538        fn into_path(self) -> PathBuf {
539            self.path
540        }
541    }
542
    /// Thin wrapper around the outer event handler, for ease of use.
    struct EventEmitter(
        // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
        // Use `Box` to make sure EventEmitter is Sized.
        Box<RefCell<dyn EventHandler>>,
    );
549
550    impl EventEmitter {
551        fn new<F: EventHandler>(event_handler: F) -> Self {
552            Self(Box::new(RefCell::new(event_handler)))
553        }
554
555        /// Emit single event.
556        fn emit(&self, event: crate::Result<Event>) {
557            self.0.borrow_mut().handle_event(event);
558        }
559
560        /// Emit event.
561        fn emit_ok(&self, event: Event) {
562            self.emit(Ok(event));
563        }
564
565        /// Emit io error event.
566        fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
567        where
568            E: Into<io::Error>,
569            P: Into<PathBuf>,
570        {
571            let e = crate::Error::io(err.into());
572            if let Some(path) = path {
573                self.emit(Err(e.add_path(path.into())));
574            } else {
575                self.emit(Err(e));
576            }
577        }
578    }
579}
580
/// Messages understood by the poll loop thread.
///
/// Variants carrying a `Sender<Result<()>>` get their outcome reported back
/// through that sender.
enum EventLoopMsg {
    /// Register one watch.
    AddWatch(PathBuf, WatchMode, Sender<Result<()>>),
    /// Register several watches in one step.
    AddWatchMultiple(Vec<(PathBuf, WatchMode)>, Sender<Result<()>>),
    /// Unregister one watch.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Test helper: answered with `Ok(())` once the loop dequeues it.
    #[cfg(test)]
    WaitNextScan(Sender<Result<()>>),
    /// currently used only for manual polling
    Poll,
    /// Ask the loop to terminate.
    Shutdown,
}
591
/// Batch editor returned by `PollWatcher::paths_mut`.
///
/// Additions are collected in `add_paths` and only sent to the watcher on commit.
struct PollPathsMut<'a> {
    // The watcher being edited.
    inner: &'a mut PollWatcher,
    // Watches queued by `add`, in call order.
    add_paths: Vec<(PathBuf, WatchMode)>,
}
596impl<'a> PollPathsMut<'a> {
597    fn new(watcher: &'a mut PollWatcher) -> Self {
598        Self {
599            inner: watcher,
600            add_paths: Vec::new(),
601        }
602    }
603}
604impl PathsMut for PollPathsMut<'_> {
605    #[tracing::instrument(level = "debug", skip(self))]
606    fn add(&mut self, path: &Path, watch_mode: WatchMode) -> Result<()> {
607        self.add_paths.push((path.to_owned(), watch_mode));
608        Ok(())
609    }
610
611    #[tracing::instrument(level = "debug", skip(self))]
612    fn remove(&mut self, path: &Path) -> Result<()> {
613        self.inner.unwatch_inner(path)
614    }
615
616    #[tracing::instrument(level = "debug", skip(self))]
617    fn commit(self: Box<Self>) -> Result<()> {
618        let paths = self.add_paths;
619        self.inner.watch_multiple_inner(paths)
620    }
621}
622
/// Polling based `Watcher` implementation.
///
/// By default scans through all files and checks for changed entries based on their change date.
/// Can also be changed to perform file content change checks.
///
/// See [Config] for more details.
#[derive(Debug)]
pub struct PollWatcher {
    // Time to wait between scans; with `None` the loop only wakes up on messages.
    delay: Option<Duration>,
    // Whether directory walks follow symbolic links.
    follow_symlinks: bool,

    // Sending half of the channel to the poll loop thread.
    event_loop_tx: Sender<EventLoopMsg>,
}
636
637impl PollWatcher {
638    /// Create a new [`PollWatcher`], configured as needed.
639    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
640        Ok(Self::with_opt::<_, ()>(event_handler, config, None))
641    }
642
643    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
644    pub fn poll(&self) -> crate::Result<()> {
645        self.event_loop_tx
646            .send(EventLoopMsg::Poll)
647            .map_err(|_| Error::generic("failed to send poll message"))?;
648        Ok(())
649    }
650
651    #[cfg(test)]
652    pub(crate) fn wait_next_scan(&self) -> crate::Result<()> {
653        let (tx, rx) = unbounded();
654        self.event_loop_tx
655            .send(EventLoopMsg::WaitNextScan(tx))
656            .map_err(|_| Error::generic("failed to send WaitNextScan message"))?;
657        rx.recv().unwrap()
658    }
659
660    /// Returns a sender to initiate changes detection.
661    #[cfg(test)]
662    pub(crate) fn poll_sender(&self) -> Sender<()> {
663        let inner_tx = self.event_loop_tx.clone();
664        let (tx, rx) = unbounded();
665        thread::Builder::new()
666            .name("notify-rs poll loop".to_string())
667            .spawn(move || {
668                for () in &rx {
669                    if let Err(err) = inner_tx.send(EventLoopMsg::Poll) {
670                        tracing::error!(?err, "failed to send poll message");
671                    }
672                }
673            })
674            .unwrap();
675        tx
676    }
677
678    /// Create a new [`PollWatcher`] with an scan event handler.
679    ///
680    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
681    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
682        event_handler: F,
683        config: Config,
684        scan_callback: G,
685    ) -> crate::Result<PollWatcher> {
686        Ok(Self::with_opt(event_handler, config, Some(scan_callback)))
687    }
688
689    /// create a new [`PollWatcher`] with all options.
690    fn with_opt<F: EventHandler, G: ScanEventHandler>(
691        event_handler: F,
692        config: Config,
693        scan_callback: Option<G>,
694    ) -> PollWatcher {
695        let (tx, rx) = unbounded();
696
697        let poll_watcher = PollWatcher {
698            delay: config.poll_interval(),
699            follow_symlinks: config.follow_symlinks(),
700
701            event_loop_tx: tx,
702        };
703
704        let data_builder =
705            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
706        poll_watcher.run(rx, data_builder);
707
708        poll_watcher
709    }
710
711    fn run(&self, rx: Receiver<EventLoopMsg>, mut data_builder: DataBuilder) {
712        let delay = self.delay;
713        let follow_symlinks = self.follow_symlinks;
714
715        let result = thread::Builder::new()
716            .name("notify-rs poll loop".to_string())
717            .spawn(move || {
718                let mut watch_data = WatchData::new(follow_symlinks);
719
720                loop {
721                    data_builder.update_timestamp();
722                    watch_data.rescan(&data_builder);
723
724                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
725                    let result = if let Some(delay) = delay {
726                        rx.recv_timeout(delay).or_else(|e| match e {
727                            mpsc::RecvTimeoutError::Timeout => Ok(EventLoopMsg::Poll),
728                            mpsc::RecvTimeoutError::Disconnected => Err(mpsc::RecvError),
729                        })
730                    } else {
731                        rx.recv()
732                    };
733                    match result {
734                        Ok(EventLoopMsg::AddWatch(path, mode, resp_tx)) => {
735                            let result = resp_tx.send(watch_data.add_watch(path, mode));
736                            if let Err(e) = result {
737                                tracing::error!(?e, "failed to send AddWatch response");
738                            }
739                        }
740                        Ok(EventLoopMsg::AddWatchMultiple(paths, resp_tx)) => {
741                            let result = resp_tx.send(watch_data.add_watch_multiple(paths));
742                            if let Err(e) = result {
743                                tracing::error!(?e, "failed to send AddWatchMultiple response");
744                            }
745                        }
746                        Ok(EventLoopMsg::RemoveWatch(path, resp_tx)) => {
747                            let result = resp_tx.send(watch_data.remove_watch(&path));
748                            if let Err(e) = result {
749                                tracing::error!(?e, "failed to send RemoveWatch response");
750                            }
751                        }
752                        Ok(EventLoopMsg::Poll) => {
753                            // continue the loop
754                        }
755                        #[cfg(test)]
756                        Ok(EventLoopMsg::WaitNextScan(resp_tx)) => {
757                            let result = resp_tx.send(Ok(()));
758                            if let Err(e) = result {
759                                tracing::error!(?e, "failed to send WaitNextScan response");
760                            }
761                        }
762                        Ok(EventLoopMsg::Shutdown) => {
763                            break;
764                        }
765                        Err(e) => {
766                            tracing::error!(?e, "failed to receive poll message");
767                        }
768                    }
769                }
770            });
771        if let Err(e) = result {
772            tracing::error!(?e, "failed to start poll watcher thread");
773        }
774    }
775
776    /// Watch a path location.
777    fn watch_inner(&self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
778        let (tx, rx) = unbounded();
779        self.event_loop_tx
780            .send(EventLoopMsg::AddWatch(path.to_path_buf(), watch_mode, tx))?;
781        rx.recv().unwrap()
782    }
783
784    fn watch_multiple_inner(&self, paths: Vec<(PathBuf, WatchMode)>) -> crate::Result<()> {
785        let (tx, rx) = unbounded();
786        self.event_loop_tx
787            .send(EventLoopMsg::AddWatchMultiple(paths, tx))?;
788        rx.recv().unwrap()
789    }
790
791    /// Unwatch a path.
792    ///
793    /// Return `Err(_)` if given path has't be monitored.
794    fn unwatch_inner(&self, path: &Path) -> crate::Result<()> {
795        let (tx, rx) = unbounded();
796        self.event_loop_tx
797            .send(EventLoopMsg::RemoveWatch(path.to_path_buf(), tx))?;
798        rx.recv().unwrap()
799    }
800}
801
802impl Watcher for PollWatcher {
803    /// Create a new [`PollWatcher`].
804    #[tracing::instrument(level = "debug", skip(event_handler))]
805    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
806        Self::new(event_handler, config)
807    }
808
809    #[tracing::instrument(level = "debug", skip(self))]
810    fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
811        self.watch_inner(path, watch_mode)
812    }
813
814    #[tracing::instrument(level = "debug", skip(self))]
815    fn paths_mut<'me>(&'me mut self) -> Box<dyn PathsMut + 'me> {
816        Box::new(PollPathsMut::new(self))
817    }
818
819    #[tracing::instrument(level = "debug", skip(self))]
820    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
821        self.unwatch_inner(path)
822    }
823
824    fn kind() -> crate::WatcherKind {
825        crate::WatcherKind::PollWatcher
826    }
827}
828
829impl Drop for PollWatcher {
830    fn drop(&mut self) {
831        let result = self.event_loop_tx.send(EventLoopMsg::Shutdown);
832        if let Err(e) = result {
833            tracing::error!(?e, "failed to send shutdown message to poll watcher thread");
834        }
835    }
836}
837
838#[cfg(test)]
839mod tests {
840    #[cfg(target_family = "wasm")]
841    use std::thread::sleep;
842    #[cfg(target_family = "wasm")]
843    use std::time::Duration;
844
845    use super::PollWatcher;
846    use crate::{Error, ErrorKind, RecursiveMode, TargetMode, WatchMode, Watcher, test::*};
847
    /// Build the poll-watcher test harness together with its event receiver.
    fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
        poll_watcher_channel()
    }
851
    /// Compile-time check: `PollWatcher` must be both `Send` and `Sync`.
    #[test]
    fn poll_watcher_is_send_and_sync() {
        // Only instantiable when `T` satisfies both bounds.
        fn check<T: Send + Sync>() {}
        check::<PollWatcher>();
    }
857
858    #[test]
859    fn create_file() {
860        let tmpdir = testdir();
861        let (mut watcher, rx) = watcher();
862        watcher.watch_recursively(&tmpdir);
863        watcher.watcher.wait_next_scan().expect("wait next scan");
864
865        let path = tmpdir.path().join("entry");
866        std::fs::File::create_new(&path).expect("Unable to create");
867
868        rx.sleep_until_parent_contains(&path);
869        rx.sleep_until_exists(&path);
870
871        rx.wait_ordered_exact([expected(&path).create_file()]);
872    }
873
874    #[test]
875    fn create_self_file() {
876        let tmpdir = testdir();
877        let (mut watcher, rx) = watcher();
878
879        let path = tmpdir.path().join("entry");
880
881        watcher.watch_nonrecursively(&path);
882        watcher.watcher.wait_next_scan().expect("wait next scan");
883
884        std::fs::File::create_new(&path).expect("create");
885
886        rx.sleep_until_exists(&path);
887        rx.wait_ordered_exact([expected(&path).create_file()]);
888    }
889
890    #[test]
891    fn create_self_file_no_track() {
892        let tmpdir = testdir();
893        let (mut watcher, _) = watcher();
894
895        let path = tmpdir.path().join("entry");
896
897        let result = watcher.watcher.watch(
898            &path,
899            WatchMode {
900                recursive_mode: RecursiveMode::NonRecursive,
901                target_mode: TargetMode::NoTrack,
902            },
903        );
904        assert!(matches!(
905            result,
906            Err(Error {
907                paths: _,
908                kind: ErrorKind::PathNotFound
909            })
910        ));
911    }
912
913    #[test]
914    fn create_self_file_nested() {
915        let tmpdir = testdir();
916        let (mut watcher, rx) = watcher();
917
918        let path = tmpdir.path().join("entry/nested");
919
920        watcher.watch_nonrecursively(&path);
921        watcher.watcher.wait_next_scan().expect("wait next scan");
922
923        std::fs::create_dir_all(path.parent().unwrap()).expect("create");
924        std::fs::File::create_new(&path).expect("create");
925
926        rx.wait_ordered_exact([expected(&path).create_file()]);
927    }
928
929    #[test]
930    fn create_dir() {
931        let tmpdir = testdir();
932        let (mut watcher, rx) = watcher();
933        watcher.watch_recursively(&tmpdir);
934        watcher.watcher.wait_next_scan().expect("wait next scan");
935
936        let path = tmpdir.path().join("entry");
937        std::fs::create_dir(&path).expect("Unable to create");
938
939        rx.sleep_until_parent_contains(&path);
940        rx.sleep_until_exists(&path);
941
942        rx.wait_ordered_exact([expected(&path).create_folder()]);
943    }
944
945    #[test]
946    fn modify_file() {
947        let tmpdir = testdir();
948        let (mut watcher, rx) = watcher();
949        let path = tmpdir.path().join("entry");
950        std::fs::File::create_new(&path).expect("Unable to create");
951
952        rx.sleep_until_parent_contains(&path);
953
954        watcher.watch_recursively(&tmpdir);
955        watcher.watcher.wait_next_scan().expect("wait next scan");
956        std::fs::write(&path, b"123").expect("Unable to write");
957
958        assert!(
959            rx.sleep_until(|| std::fs::read_to_string(&path).is_ok_and(|content| content == "123")),
960            "the file wasn't modified"
961        );
962        rx.wait_ordered_exact([expected(&path).modify_data_any()]);
963    }
964
965    #[test]
966    fn rename_file() {
967        let tmpdir = testdir();
968        let (mut watcher, rx) = watcher();
969        let path = tmpdir.path().join("entry");
970        let new_path = tmpdir.path().join("new_entry");
971        std::fs::File::create_new(&path).expect("Unable to create");
972
973        rx.sleep_until_parent_contains(&path);
974
975        watcher.watch_recursively(&tmpdir);
976
977        watcher.watcher.wait_next_scan().expect("wait next scan");
978        std::fs::rename(&path, &new_path).expect("Unable to remove");
979
980        rx.sleep_while_exists(&path);
981        rx.sleep_until_exists(&new_path);
982
983        rx.sleep_while_parent_contains(&path);
984        rx.sleep_until_parent_contains(&new_path);
985
986        rx.wait_unordered_exact([
987            expected(&path).remove_file(),
988            expected(&new_path).create_file(),
989        ]);
990    }
991
992    #[test]
993    fn rename_self_file() {
994        let tmpdir = testdir();
995        let (mut watcher, rx) = watcher();
996
997        let path = tmpdir.path().join("entry");
998        std::fs::File::create_new(&path).expect("create");
999
1000        watcher.watch_nonrecursively(&path);
1001        watcher.watcher.wait_next_scan().expect("wait next scan");
1002        let new_path = tmpdir.path().join("renamed");
1003
1004        std::fs::rename(&path, &new_path).expect("rename");
1005
1006        rx.sleep_while_exists(&path);
1007        rx.sleep_until_exists(&new_path);
1008
1009        rx.wait_unordered_exact([expected(&path).remove_file()])
1010            .ensure_no_tail();
1011
1012        std::fs::rename(&new_path, &path).expect("rename2");
1013        watcher.watcher.wait_next_scan().expect("wait next scan");
1014
1015        rx.sleep_while_exists(&new_path);
1016        rx.sleep_until_exists(&path);
1017
1018        rx.wait_unordered_exact([expected(&path).create_file()])
1019            .ensure_no_tail();
1020    }
1021
1022    #[test]
1023    fn rename_self_file_no_track() {
1024        let tmpdir = testdir();
1025        let (mut watcher, rx) = watcher();
1026
1027        let path = tmpdir.path().join("entry");
1028        std::fs::File::create_new(&path).expect("create");
1029
1030        watcher.watch(
1031            &path,
1032            WatchMode {
1033                recursive_mode: RecursiveMode::NonRecursive,
1034                target_mode: TargetMode::NoTrack,
1035            },
1036        );
1037        watcher.watcher.wait_next_scan().expect("wait next scan");
1038
1039        let new_path = tmpdir.path().join("renamed");
1040
1041        std::fs::rename(&path, &new_path).expect("rename");
1042
1043        rx.sleep_while_exists(&path);
1044        rx.sleep_until_exists(&new_path);
1045
1046        #[cfg(target_family = "wasm")]
1047        sleep(Duration::from_millis(100));
1048
1049        rx.wait_unordered_exact([
1050            expected(&path).modify_data_any().optional(),
1051            expected(&path).remove_file(),
1052        ])
1053        .ensure_no_tail();
1054
1055        let result = watcher.watcher.watch(
1056            &path,
1057            WatchMode {
1058                recursive_mode: RecursiveMode::NonRecursive,
1059                target_mode: TargetMode::NoTrack,
1060            },
1061        );
1062        assert!(matches!(
1063            result,
1064            Err(Error {
1065                paths: _,
1066                kind: ErrorKind::PathNotFound
1067            })
1068        ));
1069    }
1070
1071    #[test]
1072    fn delete_file() {
1073        let tmpdir = testdir();
1074        let (mut watcher, rx) = watcher();
1075        let path = tmpdir.path().join("entry");
1076        std::fs::File::create_new(&path).expect("Unable to create");
1077
1078        rx.sleep_until_parent_contains(&path);
1079
1080        watcher.watch_recursively(&tmpdir);
1081        watcher.watcher.wait_next_scan().expect("wait next scan");
1082
1083        std::fs::remove_file(&path).expect("Unable to remove");
1084
1085        rx.sleep_while_exists(&path);
1086        rx.sleep_while_parent_contains(&path);
1087
1088        rx.wait_ordered_exact([
1089            expected(&path).modify_data_any().optional(),
1090            expected(&path).remove_file(),
1091        ]);
1092    }
1093
1094    #[test]
1095    fn delete_self_file() {
1096        let tmpdir = testdir();
1097        let (mut watcher, rx) = watcher();
1098        let path = tmpdir.path().join("entry");
1099        std::fs::File::create_new(&path).expect("Unable to create");
1100
1101        watcher.watch_nonrecursively(&path);
1102        watcher.watcher.wait_next_scan().expect("wait next scan");
1103
1104        std::fs::remove_file(&path).expect("Unable to remove");
1105
1106        rx.sleep_while_exists(&path);
1107        rx.wait_ordered_exact([
1108            expected(&path).modify_data_any().optional(),
1109            expected(&path).remove_file(),
1110        ]);
1111
1112        std::fs::write(&path, "").expect("write");
1113
1114        rx.sleep_until_exists(&path);
1115        rx.wait_ordered_exact([expected(&path).create_file()]);
1116    }
1117
1118    #[test]
1119    fn delete_self_file_no_track() {
1120        let tmpdir = testdir();
1121        let (mut watcher, rx) = watcher();
1122        let path = tmpdir.path().join("entry");
1123        std::fs::File::create_new(&path).expect("Unable to create");
1124
1125        watcher.watch(
1126            &path,
1127            WatchMode {
1128                recursive_mode: RecursiveMode::NonRecursive,
1129                target_mode: TargetMode::NoTrack,
1130            },
1131        );
1132        watcher.watcher.wait_next_scan().expect("wait next scan");
1133
1134        std::fs::remove_file(&path).expect("Unable to remove");
1135
1136        rx.sleep_while_exists(&path);
1137        rx.wait_ordered_exact([
1138            expected(&path).modify_data_any().optional(),
1139            expected(&path).remove_file(),
1140        ]);
1141
1142        #[cfg(target_family = "wasm")]
1143        sleep(Duration::from_millis(100));
1144
1145        std::fs::write(&path, "").expect("write");
1146
1147        rx.ensure_empty_with_wait();
1148    }
1149
1150    #[test]
1151    fn create_write_overwrite() {
1152        let tmpdir = testdir();
1153        let (mut watcher, rx) = watcher();
1154        let overwritten_file = tmpdir.path().join("overwritten_file");
1155        let overwriting_file = tmpdir.path().join("overwriting_file");
1156        std::fs::write(&overwritten_file, "123").expect("write1");
1157
1158        rx.sleep_until_parent_contains(&overwritten_file);
1159        rx.sleep_until_exists(&overwritten_file);
1160
1161        watcher.watch_nonrecursively(&tmpdir);
1162        watcher.watcher.wait_next_scan().expect("wait next scan");
1163
1164        std::fs::File::create(&overwriting_file).expect("create");
1165        std::fs::write(&overwriting_file, "321").expect("write2");
1166        std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");
1167
1168        rx.sleep_while_exists(&overwriting_file);
1169        rx.sleep_while_parent_contains(&overwriting_file);
1170
1171        assert!(
1172            rx.sleep_until(
1173                || std::fs::read_to_string(&overwritten_file).is_ok_and(|cnt| cnt == "321")
1174            ),
1175            "file {overwritten_file:?} was not replaced"
1176        );
1177
1178        rx.wait_unordered([expected(&overwritten_file).modify_data_any()]);
1179    }
1180}