notify/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{
7    Config, Error, EventHandler, PathsMut, Receiver, Result, Sender, WatchMode, Watcher,
8    poll::data::WatchData, unbounded,
9};
10use std::{
11    path::{Path, PathBuf},
12    sync::mpsc,
13    thread,
14    time::Duration,
15};
16
/// Event sent to the registered [`ScanEventHandler`] for every path found during an
/// initial directory scan.
///
/// `Ok` carries the path that was scanned.
pub type ScanEvent = crate::Result<PathBuf>;
19
/// Handler trait for receivers of [`ScanEvent`].
///
/// Very much the same as [`EventHandler`], but it receives scanned paths wrapped in a
/// `Result` instead of file system events.
///
/// This module implements it for `FnMut(ScanEvent)` callables, for channel senders of
/// [`ScanEvent`] and for `()`, which ignores every event.
pub trait ScanEventHandler: Send + 'static {
    /// Handles an event.
    fn handle_event(&mut self, event: ScanEvent);
}
28
29impl<F> ScanEventHandler for F
30where
31    F: FnMut(ScanEvent) + Send + 'static,
32{
33    fn handle_event(&mut self, event: ScanEvent) {
34        (self)(event);
35    }
36}
37
/// Forwards every scan event into a `crossbeam_channel` sender.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // Delivery failures are logged rather than propagated.
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
47
/// Forwards every scan event into a `flume` sender.
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // Delivery failures are logged rather than propagated.
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
57
58impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
59    fn handle_event(&mut self, event: ScanEvent) {
60        let result = self.send(event);
61        if let Err(e) = result {
62            tracing::error!(?e, "failed to send scan event result");
63        }
64    }
65}
66
/// No-op handler: `()` discards every scan event it is given.
impl ScanEventHandler for () {
    fn handle_event(&mut self, _event: ScanEvent) {}
}
70
71use data::DataBuilder;
72mod data {
73    use crate::{
74        Error, EventHandler, Result, WatchMode,
75        consolidating_path_trie::ConsolidatingPathTrie,
76        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
77    };
78    use std::{
79        cell::RefCell,
80        collections::{HashMap, hash_map::RandomState},
81        fmt::{self, Debug},
82        fs::{File, FileType, Metadata},
83        hash::{BuildHasher, Hasher},
84        io::{self, Read},
85        path::{Path, PathBuf},
86        time::Instant,
87    };
88    use walkdir::WalkDir;
89
90    use super::ScanEventHandler;
91
92    fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
93        #[expect(clippy::cast_possible_wrap)]
94        match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
95            Ok(d) => d.as_secs() as i64,
96            Err(e) => -(e.duration().as_secs() as i64),
97        }
98    }
99
    /// Builder for [`WatchData`] & [`PathData`].
    ///
    /// Bundles what every scan needs: the event sinks, the optional content hasher and the
    /// timestamp that is stamped on freshly built [`PathData`].
    pub(super) struct DataBuilder {
        // Sink for regular events and errors.
        emitter: EventEmitter,
        // Optional sink for initial-scan events.
        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

        // Hasher factory for file contents; `None` when content comparison is disabled.
        //
        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
        // in future.
        build_hasher: Option<RandomState>,

        // current timestamp for building Data.
        now: Instant,
    }
112
113    impl DataBuilder {
114        pub(super) fn new<F, G>(
115            event_handler: F,
116            compare_content: bool,
117            scan_emitter: Option<G>,
118        ) -> Self
119        where
120            F: EventHandler,
121            G: ScanEventHandler,
122        {
123            let scan_emitter = match scan_emitter {
124                None => None,
125                Some(v) => {
126                    // workaround for a weird type resolution bug when directly going to dyn Trait
127                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
128                        Box::new(RefCell::new(v));
129                    Some(intermediate)
130                }
131            };
132            Self {
133                emitter: EventEmitter::new(event_handler),
134                scan_emitter,
135                build_hasher: compare_content.then(RandomState::default),
136                now: Instant::now(),
137            }
138        }
139
140        /// Update internal timestamp.
141        pub(super) fn update_timestamp(&mut self) {
142            self.now = Instant::now();
143        }
144
145        /// Create [`PathData`].
146        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
147            PathData::new(self, meta_path)
148        }
149    }
150
151    impl Debug for DataBuilder {
152        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153            f.debug_struct("DataBuilder")
154                .field("build_hasher", &self.build_hasher)
155                .field("now", &self.now)
156                .finish_non_exhaustive()
157        }
158    }
159
    /// Double-buffered set of scan roots derived from the registered watches.
    ///
    /// Each map goes from a root path to a flag telling whether that root is scanned
    /// recursively.
    #[derive(Debug)]
    struct WatchHandlers {
        // Roots used by scans.
        current: HashMap<PathBuf, /* recursive */ bool>,
        // Roots computed by the latest `recalculate`, not yet promoted to `current`.
        next: HashMap<PathBuf, /* recursive */ bool>,
        // `true` while `next` is waiting to replace `current`.
        is_stale: bool,
    }
166
167    impl WatchHandlers {
168        fn new() -> Self {
169            Self {
170                current: HashMap::new(),
171                next: HashMap::new(),
172                is_stale: false,
173            }
174        }
175
176        /// Recalculate from `watches`.
177        fn recalculate(&mut self, watches: &HashMap<PathBuf, WatchMode>) {
178            self.next.clear();
179            self.is_stale = true;
180
181            let mut trie = ConsolidatingPathTrie::new(false);
182            for (path, mode) in watches {
183                if mode.recursive_mode == crate::RecursiveMode::Recursive {
184                    trie.insert(path);
185                }
186            }
187            // insert non-recursive watches that are not covered by recursive watches
188            for (path, mode) in watches {
189                if mode.recursive_mode != crate::RecursiveMode::Recursive {
190                    self.next.insert(path.clone(), false);
191                }
192            }
193            // insert recursive watches
194            for path in trie.values() {
195                self.next.insert(path, true);
196            }
197        }
198
199        fn use_handlers(
200            &mut self,
201        ) -> (
202            &HashMap<PathBuf, /* recursive */ bool>,
203            Option<HashMap<PathBuf, /* recursive */ bool>>,
204        ) {
205            if self.is_stale {
206                let old_next = std::mem::take(&mut self.next);
207                let old_current = std::mem::replace(&mut self.current, old_next);
208                self.is_stale = false;
209                return (&self.current, Some(old_current));
210            }
211            (&self.current, None)
212        }
213    }
214
    /// All state owned by the poll loop: the registered watches, the scan roots derived
    /// from them and the last observed data of every scanned path.
    #[derive(Debug)]
    pub(super) struct WatchData {
        // config part, won't change.
        // Whether directory walks follow symbolic links.
        follow_symlinks: bool,

        // current status part.
        // Watches exactly as registered by the user.
        watches: HashMap<PathBuf, WatchMode>,
        // Scan roots derived from `watches`.
        watch_handlers: WatchHandlers,
        // Data recorded for every path seen by the most recent scans.
        all_path_data: HashMap<PathBuf, PathData>,
    }
225
226    impl WatchData {
227        /// Create a new `WatchData`.
228        pub fn new(follow_symlinks: bool) -> Self {
229            Self {
230                follow_symlinks,
231                watches: HashMap::new(),
232                watch_handlers: WatchHandlers::new(),
233                all_path_data: HashMap::new(),
234            }
235        }
236
237        pub fn add_watch(&mut self, path: PathBuf, mode: WatchMode) -> Result<()> {
238            if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() {
239                return Err(crate::Error::path_not_found().add_path(path));
240            }
241
242            self.watches.insert(path, mode);
243            self.watch_handlers.recalculate(&self.watches);
244            Ok(())
245        }
246
247        pub fn add_watch_multiple(&mut self, paths: Vec<(PathBuf, WatchMode)>) -> Result<()> {
248            for (path, mode) in paths {
249                if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() {
250                    return Err(crate::Error::path_not_found().add_path(path));
251                }
252
253                self.watches.insert(path, mode);
254            }
255            self.watch_handlers.recalculate(&self.watches);
256            Ok(())
257        }
258
259        pub fn remove_watch(&mut self, path: &Path) -> Result<()> {
260            self.watches.remove(path).ok_or(Error::watch_not_found())?;
261            self.watch_handlers.recalculate(&self.watches);
262            Ok(())
263        }
264
265        /// Rescan filesystem and update this `WatchData`.
266        ///
267        /// # Side effect
268        ///
269        /// This function may emit event by `data_builder.emitter`.
270        pub(super) fn rescan(&mut self, data_builder: &DataBuilder) {
271            let (watch_handlers, old_watch_handlers) = self.watch_handlers.use_handlers();
272
273            // scan current filesystem.
274            for (path, new_path_data) in
275                Self::scan_all_path_data(data_builder, watch_handlers, self.follow_symlinks)
276            {
277                let old_path_data = self
278                    .all_path_data
279                    .insert(path.clone(), new_path_data.clone());
280
281                let is_initial = old_watch_handlers
282                    .as_ref()
283                    .is_some_and(|old_watch_handlers| {
284                        !old_watch_handlers.contains_key(&path)
285                            && !path.ancestors().skip(1).any(|ancestor| {
286                                old_watch_handlers
287                                    .get(ancestor)
288                                    .is_some_and(|is_recursive| *is_recursive)
289                            })
290                    });
291                if is_initial {
292                    // emit initial scans
293                    if let Some(ref emitter) = data_builder.scan_emitter {
294                        emitter.borrow_mut().handle_event(Ok(path.clone()));
295                    }
296                } else {
297                    // emit event
298                    let event = PathData::compare_to_event(
299                        path,
300                        old_path_data.as_ref(),
301                        Some(&new_path_data),
302                    );
303                    if let Some(event) = event {
304                        data_builder.emitter.emit_ok(event);
305                    }
306                }
307            }
308
309            // scan for disappeared paths.
310            let mut disappeared_paths = Vec::new();
311            for (path, path_data) in &self.all_path_data {
312                if path_data.last_check < data_builder.now {
313                    disappeared_paths.push(path.clone());
314                }
315            }
316
317            // remove disappeared paths
318            for path in disappeared_paths {
319                let old_path_data = self.all_path_data.remove(&path);
320
321                // emit event
322                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
323                if let Some(event) = event {
324                    data_builder.emitter.emit_ok(event);
325                }
326            }
327        }
328
        /// Get all `PathData` by given configuration.
        ///
        /// Walks every root in `watch_handlers` (with unlimited depth for recursive roots,
        /// one level deep otherwise) and yields each visited path together with freshly
        /// built [`PathData`]. The returned iterator is lazy: the file system is walked
        /// while it is consumed.
        ///
        /// # Side Effect
        ///
        /// This function may emit some IO Error events by `data_builder.emitter`.
        /// "Not found" errors are skipped without emitting anything.
        fn scan_all_path_data(
            data_builder: &DataBuilder,
            watch_handlers: &HashMap<PathBuf, /* recursive */ bool>,
            follow_symlinks: bool,
        ) -> impl Iterator<Item = (PathBuf, PathData)> {
            tracing::trace!("rescanning");

            watch_handlers.iter().flat_map(move |(path, is_recursive)| {
                tracing::trace!(?path, is_recursive, "scanning watch handler");

                // WalkDir return only one entry if root is a file (not a folder),
                // so we can use single logic to do the both file & dir's jobs.
                //
                // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
                WalkDir::new(path)
                    .follow_links(follow_symlinks)
                    .max_depth(if *is_recursive { usize::MAX } else { 1 })
                    .into_iter()
                    // First stage: keep successfully read entries, report walk errors.
                    .filter_map(|entry_res| match entry_res {
                        Ok(entry) => Some(entry),
                        Err(err) => {
                            tracing::warn!("walkdir error scanning {err:?}");

                            if let Some(io_error) = err.io_error() {
                                // A missing path is not reported as an error.
                                if io_error.kind() == io::ErrorKind::NotFound {
                                    return None;
                                }
                                // We only hold a reference to the `io::Error` here, so build
                                // a new one with the same kind and the walk error's message.
                                let new_io_error = io::Error::new(io_error.kind(), err.to_string());
                                data_builder.emitter.emit_io_err(new_io_error, err.path());
                            } else {
                                // Walk errors without an underlying IO error are reported
                                // as generic errors.
                                let crate_err =
                                    Error::new(crate::ErrorKind::Generic(err.to_string()));
                                data_builder.emitter.emit(Err(crate_err));
                            }
                            None
                        }
                    })
                    // Second stage: read the metadata and build the `PathData`.
                    .filter_map(move |entry| match entry.metadata() {
                        Ok(metadata) => {
                            let path = entry.into_path();
                            let meta_path = MetaPath::from_parts_unchecked(path, metadata);
                            let data_path = data_builder.build_path_data(&meta_path);

                            Some((meta_path.into_path(), data_path))
                        }
                        Err(err) => {
                            // The entry vanished between listing and reading its metadata.
                            if let Some(io_error) = err.io_error()
                                && io_error.kind() == io::ErrorKind::NotFound
                            {
                                return None;
                            }

                            // emit event.
                            let path = entry.into_path();
                            data_builder.emitter.emit_io_err(err, Some(path));

                            None
                        }
                    })
            })
        }
396    }
397
    /// Stored data for a single path location.
    ///
    /// See [`WatchData`] for more detail.
    #[derive(Debug, Clone)]
    struct PathData {
        /// File updated time, in whole seconds relative to the Unix epoch
        /// (`0` when the modification time could not be read).
        mtime: i64,

        /// Type of the file system entry (file, directory, ...).
        file_type: FileType,

        /// Content's hash value, only available if user request compare file
        /// contents and read successful.
        hash: Option<u64>,

        /// Checked time: the builder timestamp of the scan that produced this data.
        last_check: Instant,
    }
415
416    impl PathData {
417        /// Create a new `PathData`.
418        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
419            let metadata = meta_path.metadata();
420
421            PathData {
422                mtime: metadata.modified().map_or(0, system_time_to_seconds),
423                file_type: metadata.file_type(),
424                hash: data_builder
425                    .build_hasher
426                    .as_ref()
427                    .filter(|_| metadata.is_file())
428                    .and_then(|build_hasher| {
429                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
430                    }),
431
432                last_check: data_builder.now,
433            }
434        }
435
436        /// Get hash value for the data content in given file `path`.
437        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
438            let mut hasher = build_hasher.build_hasher();
439            let mut file = File::open(path)?;
440            let mut buf = [0; 512];
441
442            loop {
443                let n = match file.read(&mut buf) {
444                    Ok(0) => break,
445                    Ok(len) => len,
446                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
447                    Err(e) => return Err(e),
448                };
449
450                hasher.write(&buf[..n]);
451            }
452
453            Ok(hasher.finish())
454        }
455
456        /// Get `CreateKind` by file type.
457        fn get_create_kind(&self) -> CreateKind {
458            #[expect(clippy::filetype_is_file)]
459            if self.file_type.is_dir() {
460                CreateKind::Folder
461            } else if self.file_type.is_file() {
462                CreateKind::File
463            } else {
464                CreateKind::Any
465            }
466        }
467
468        /// Get `RemoveKind` by file type.
469        fn get_remove_kind(&self) -> RemoveKind {
470            #[expect(clippy::filetype_is_file)]
471            if self.file_type.is_dir() {
472                RemoveKind::Folder
473            } else if self.file_type.is_file() {
474                RemoveKind::File
475            } else {
476                RemoveKind::Any
477            }
478        }
479
480        /// Get [`Event`] by compare two optional [`PathData`].
481        fn compare_to_event<P>(
482            path: P,
483            old: Option<&PathData>,
484            new: Option<&PathData>,
485        ) -> Option<Event>
486        where
487            P: Into<PathBuf>,
488        {
489            match (old, new) {
490                (Some(old), Some(new)) => {
491                    if new.mtime > old.mtime {
492                        Some(EventKind::Modify(ModifyKind::Metadata(
493                            MetadataKind::WriteTime,
494                        )))
495                    } else if new.hash != old.hash {
496                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
497                    } else {
498                        None
499                    }
500                }
501                (None, Some(new)) => Some(EventKind::Create(new.get_create_kind())),
502                (Some(old), None) => Some(EventKind::Remove(old.get_remove_kind())),
503                (None, None) => None,
504            }
505            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
506        }
507    }
508
    /// Compose path and its metadata.
    ///
    /// This data structure is designed to make sure a path and its metadata are passed
    /// around together in a consistent way, which may avoid some duplicated
    /// `fs::metadata()` function calls in some situations.
    #[derive(Debug)]
    pub(super) struct MetaPath {
        // Location of the entry.
        path: PathBuf,
        // Metadata that the creator asserts belongs to `path`.
        metadata: Metadata,
    }
519
520    impl MetaPath {
521        /// Create `MetaPath` by given parts.
522        ///
523        /// # Invariant
524        ///
525        /// User must make sure the input `metadata` are associated with `path`.
526        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
527            Self { path, metadata }
528        }
529
530        fn path(&self) -> &Path {
531            &self.path
532        }
533
534        fn metadata(&self) -> &Metadata {
535            &self.metadata
536        }
537
538        fn into_path(self) -> PathBuf {
539            self.path
540        }
541    }
542
    /// Thin wrapper around the user supplied event handler, to make emitting events easy.
    struct EventEmitter(
        // Use `RefCell` so that `emit()` only needs a shared borrow of self (&self).
        // Use `Box` so that `EventEmitter` is Sized although the handler is a trait object.
        Box<RefCell<dyn EventHandler>>,
    );
549
550    impl EventEmitter {
551        fn new<F: EventHandler>(event_handler: F) -> Self {
552            Self(Box::new(RefCell::new(event_handler)))
553        }
554
555        /// Emit single event.
556        fn emit(&self, event: crate::Result<Event>) {
557            self.0.borrow_mut().handle_event(event);
558        }
559
560        /// Emit event.
561        fn emit_ok(&self, event: Event) {
562            self.emit(Ok(event));
563        }
564
565        /// Emit io error event.
566        fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
567        where
568            E: Into<io::Error>,
569            P: Into<PathBuf>,
570        {
571            let e = crate::Error::io(err.into());
572            if let Some(path) = path {
573                self.emit(Err(e.add_path(path.into())));
574            } else {
575                self.emit(Err(e));
576            }
577        }
578    }
579}
580
/// Messages understood by the background poll loop.
///
/// The variants that carry a `Sender<Result<()>>` are answered through that sender.
enum EventLoopMsg {
    /// Register a single watch.
    AddWatch(PathBuf, WatchMode, Sender<Result<()>>),
    /// Register several watches at once.
    AddWatchMultiple(Vec<(PathBuf, WatchMode)>, Sender<Result<()>>),
    /// Remove the watch registered for the path.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Test-only request that the loop acknowledges as soon as it receives it.
    #[cfg(test)]
    WaitNextScan(Sender<Result<()>>),
    /// Request a rescan. Sent for manual polling; a receive timeout in the poll loop is
    /// mapped to this variant as well.
    Poll,
    /// Stop the poll loop.
    Shutdown,
}
591
/// Batch of watch changes for a [`PollWatcher`].
///
/// Added paths are collected here and sent to the watcher in one go on commit.
struct PollPathsMut<'a> {
    // Watcher the batch is applied to.
    inner: &'a mut PollWatcher,
    // Paths queued by `add`, not yet sent to the watcher.
    add_paths: Vec<(PathBuf, WatchMode)>,
}
596impl<'a> PollPathsMut<'a> {
597    fn new(watcher: &'a mut PollWatcher) -> Self {
598        Self {
599            inner: watcher,
600            add_paths: Vec::new(),
601        }
602    }
603}
604impl PathsMut for PollPathsMut<'_> {
605    #[tracing::instrument(level = "debug", skip(self))]
606    fn add(&mut self, path: &Path, watch_mode: WatchMode) -> Result<()> {
607        self.add_paths.push((path.to_owned(), watch_mode));
608        Ok(())
609    }
610
611    #[tracing::instrument(level = "debug", skip(self))]
612    fn remove(&mut self, path: &Path) -> Result<()> {
613        self.inner.unwatch_inner(path)
614    }
615
616    #[tracing::instrument(level = "debug", skip(self))]
617    fn commit(self: Box<Self>) -> Result<()> {
618        let paths = self.add_paths;
619        self.inner.watch_multiple_inner(paths)
620    }
621}
622
/// Polling based `Watcher` implementation.
///
/// By default scans through all files and checks for changed entries based on their change date.
/// Can also be changed to perform file content change checks.
///
/// The scanning happens on a background thread that is controlled through messages.
///
/// See [Config] for more details.
#[derive(Debug)]
pub struct PollWatcher {
    // Interval between automatic rescans; with `None` the loop only rescans after
    // receiving a message.
    delay: Option<Duration>,
    // Whether directory walks follow symbolic links.
    follow_symlinks: bool,

    // Sending half of the channel to the background poll loop.
    event_loop_tx: Sender<EventLoopMsg>,
}
636
637impl PollWatcher {
638    /// Create a new [`PollWatcher`], configured as needed.
639    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
640        Ok(Self::with_opt::<_, ()>(event_handler, config, None))
641    }
642
643    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
644    pub fn poll(&self) -> crate::Result<()> {
645        self.event_loop_tx
646            .send(EventLoopMsg::Poll)
647            .map_err(|_| Error::generic("failed to send poll message"))?;
648        Ok(())
649    }
650
651    #[cfg(test)]
652    pub(crate) fn wait_next_scan(&self) -> crate::Result<()> {
653        let (tx, rx) = unbounded();
654        self.event_loop_tx
655            .send(EventLoopMsg::WaitNextScan(tx))
656            .map_err(|_| Error::generic("failed to send WaitNextScan message"))?;
657        rx.recv().unwrap()
658    }
659
660    /// Returns a sender to initiate changes detection.
661    #[cfg(test)]
662    pub(crate) fn poll_sender(&self) -> Sender<()> {
663        let inner_tx = self.event_loop_tx.clone();
664        let (tx, rx) = unbounded();
665        thread::Builder::new()
666            .name("notify-rs poll loop".to_string())
667            .spawn(move || {
668                for () in &rx {
669                    if let Err(err) = inner_tx.send(EventLoopMsg::Poll) {
670                        tracing::error!(?err, "failed to send poll message");
671                    }
672                }
673            })
674            .unwrap();
675        tx
676    }
677
678    /// Create a new [`PollWatcher`] with an scan event handler.
679    ///
680    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
681    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
682        event_handler: F,
683        config: Config,
684        scan_callback: G,
685    ) -> crate::Result<PollWatcher> {
686        Ok(Self::with_opt(event_handler, config, Some(scan_callback)))
687    }
688
689    /// create a new [`PollWatcher`] with all options.
690    fn with_opt<F: EventHandler, G: ScanEventHandler>(
691        event_handler: F,
692        config: Config,
693        scan_callback: Option<G>,
694    ) -> PollWatcher {
695        let (tx, rx) = unbounded();
696
697        let poll_watcher = PollWatcher {
698            delay: config.poll_interval(),
699            follow_symlinks: config.follow_symlinks(),
700
701            event_loop_tx: tx,
702        };
703
704        let data_builder =
705            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
706        poll_watcher.run(rx, data_builder);
707
708        poll_watcher
709    }
710
711    fn run(&self, rx: Receiver<EventLoopMsg>, mut data_builder: DataBuilder) {
712        let delay = self.delay;
713        let follow_symlinks = self.follow_symlinks;
714
715        let result = thread::Builder::new()
716            .name("notify-rs poll loop".to_string())
717            .spawn(move || {
718                let mut watch_data = WatchData::new(follow_symlinks);
719
720                loop {
721                    data_builder.update_timestamp();
722                    watch_data.rescan(&data_builder);
723
724                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
725                    let result = if let Some(delay) = delay {
726                        rx.recv_timeout(delay).or_else(|e| match e {
727                            mpsc::RecvTimeoutError::Timeout => Ok(EventLoopMsg::Poll),
728                            mpsc::RecvTimeoutError::Disconnected => Err(mpsc::RecvError),
729                        })
730                    } else {
731                        rx.recv()
732                    };
733                    match result {
734                        Ok(EventLoopMsg::AddWatch(path, mode, resp_tx)) => {
735                            let result = resp_tx.send(watch_data.add_watch(path, mode));
736                            if let Err(e) = result {
737                                tracing::error!(?e, "failed to send AddWatch response");
738                            }
739                        }
740                        Ok(EventLoopMsg::AddWatchMultiple(paths, resp_tx)) => {
741                            let result = resp_tx.send(watch_data.add_watch_multiple(paths));
742                            if let Err(e) = result {
743                                tracing::error!(?e, "failed to send AddWatchMultiple response");
744                            }
745                        }
746                        Ok(EventLoopMsg::RemoveWatch(path, resp_tx)) => {
747                            let result = resp_tx.send(watch_data.remove_watch(&path));
748                            if let Err(e) = result {
749                                tracing::error!(?e, "failed to send RemoveWatch response");
750                            }
751                        }
752                        Ok(EventLoopMsg::Poll) => {
753                            // continue the loop
754                        }
755                        #[cfg(test)]
756                        Ok(EventLoopMsg::WaitNextScan(resp_tx)) => {
757                            let result = resp_tx.send(Ok(()));
758                            if let Err(e) = result {
759                                tracing::error!(?e, "failed to send WaitNextScan response");
760                            }
761                        }
762                        Ok(EventLoopMsg::Shutdown) => {
763                            break;
764                        }
765                        Err(e) => {
766                            tracing::error!(?e, "failed to receive poll message");
767                        }
768                    }
769                }
770            });
771        if let Err(e) = result {
772            tracing::error!(?e, "failed to start poll watcher thread");
773        }
774    }
775
776    /// Watch a path location.
777    fn watch_inner(&self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
778        let (tx, rx) = unbounded();
779        self.event_loop_tx
780            .send(EventLoopMsg::AddWatch(path.to_path_buf(), watch_mode, tx))?;
781        rx.recv().unwrap()
782    }
783
784    fn watch_multiple_inner(&self, paths: Vec<(PathBuf, WatchMode)>) -> crate::Result<()> {
785        let (tx, rx) = unbounded();
786        self.event_loop_tx
787            .send(EventLoopMsg::AddWatchMultiple(paths, tx))?;
788        rx.recv().unwrap()
789    }
790
791    /// Unwatch a path.
792    ///
793    /// Return `Err(_)` if given path has't be monitored.
794    fn unwatch_inner(&self, path: &Path) -> crate::Result<()> {
795        let (tx, rx) = unbounded();
796        self.event_loop_tx
797            .send(EventLoopMsg::RemoveWatch(path.to_path_buf(), tx))?;
798        rx.recv().unwrap()
799    }
800}
801
802impl Watcher for PollWatcher {
803    /// Create a new [`PollWatcher`].
804    #[tracing::instrument(level = "debug", skip(event_handler))]
805    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
806        Self::new(event_handler, config)
807    }
808
809    #[tracing::instrument(level = "debug", skip(self))]
810    fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
811        self.watch_inner(path, watch_mode)
812    }
813
814    #[tracing::instrument(level = "debug", skip(self))]
815    fn paths_mut<'me>(&'me mut self) -> Box<dyn PathsMut + 'me> {
816        Box::new(PollPathsMut::new(self))
817    }
818
819    #[tracing::instrument(level = "debug", skip(self))]
820    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
821        self.unwatch_inner(path)
822    }
823
824    fn kind() -> crate::WatcherKind {
825        crate::WatcherKind::PollWatcher
826    }
827}
828
829impl Drop for PollWatcher {
830    fn drop(&mut self) {
831        let result = self.event_loop_tx.send(EventLoopMsg::Shutdown);
832        if let Err(e) = result {
833            tracing::error!(?e, "failed to send shutdown message to poll watcher thread");
834        }
835    }
836}
837
838#[cfg(test)]
839mod tests {
840    use super::PollWatcher;
841    use crate::{Error, ErrorKind, RecursiveMode, TargetMode, WatchMode, Watcher, test::*};
842
    /// Build the watcher/receiver pair used by the tests in this module.
    // NOTE(review): `poll_watcher_channel` comes from `crate::test`; presumably it wires
    // the watcher's events into the returned receiver. Confirm there.
    fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
        poll_watcher_channel()
    }
846
    /// Compile-time check that `PollWatcher` can be moved to and shared between threads.
    #[test]
    fn poll_watcher_is_send_and_sync() {
        // Instantiating `check` only compiles if the bounds hold.
        fn check<T: Send + Sync>() {}
        check::<PollWatcher>();
    }
852
    /// A file created inside a recursively watched directory is reported as a file creation.
    #[test]
    fn create_file() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        watcher.watch_recursively(&tmpdir);
        // Let the poll loop rescan after the watch request before touching the directory.
        watcher.watcher.wait_next_scan().expect("wait next scan");

        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        // NOTE(review): presumably waits until an event for `path` has arrived; confirm
        // in the test helpers.
        rx.sleep_until_exists(&path);
        rx.wait_ordered_exact([expected(&path).create_file()]);
    }
866
867    #[test]
868    fn create_self_file() {
869        let tmpdir = testdir();
870        let (mut watcher, rx) = watcher();
871
872        let path = tmpdir.path().join("entry");
873
874        watcher.watch_nonrecursively(&path);
875        watcher.watcher.wait_next_scan().expect("wait next scan");
876
877        std::fs::File::create_new(&path).expect("create");
878
879        rx.sleep_until_exists(&path);
880        rx.wait_ordered_exact([expected(&path).create_file()]);
881    }
882
883    #[test]
884    fn create_self_file_no_track() {
885        let tmpdir = testdir();
886        let (mut watcher, _) = watcher();
887
888        let path = tmpdir.path().join("entry");
889
890        let result = watcher.watcher.watch(
891            &path,
892            WatchMode {
893                recursive_mode: RecursiveMode::NonRecursive,
894                target_mode: TargetMode::NoTrack,
895            },
896        );
897        assert!(matches!(
898            result,
899            Err(Error {
900                paths: _,
901                kind: ErrorKind::PathNotFound
902            })
903        ));
904    }
905
906    #[test]
907    fn create_self_file_nested() {
908        let tmpdir = testdir();
909        let (mut watcher, rx) = watcher();
910
911        let path = tmpdir.path().join("entry/nested");
912
913        watcher.watch_nonrecursively(&path);
914        watcher.watcher.wait_next_scan().expect("wait next scan");
915
916        std::fs::create_dir_all(path.parent().unwrap()).expect("create");
917        std::fs::File::create_new(&path).expect("create");
918
919        rx.wait_ordered_exact([expected(&path).create_file()]);
920    }
921
922    #[test]
923    fn create_dir() {
924        let tmpdir = testdir();
925        let (mut watcher, rx) = watcher();
926        watcher.watch_recursively(&tmpdir);
927        watcher.watcher.wait_next_scan().expect("wait next scan");
928
929        let path = tmpdir.path().join("entry");
930        std::fs::create_dir(&path).expect("Unable to create");
931
932        rx.sleep_until_exists(&path);
933        rx.wait_ordered_exact([expected(&path).create_folder()]);
934    }
935
936    #[test]
937    fn modify_file() {
938        let tmpdir = testdir();
939        let (mut watcher, rx) = watcher();
940        let path = tmpdir.path().join("entry");
941        std::fs::File::create_new(&path).expect("Unable to create");
942
943        watcher.watch_recursively(&tmpdir);
944        watcher.watcher.wait_next_scan().expect("wait next scan");
945        std::fs::write(&path, b"123").expect("Unable to write");
946
947        assert!(
948            rx.sleep_until(|| std::fs::read_to_string(&path).is_ok_and(|content| content == "123")),
949            "the file wasn't modified"
950        );
951        rx.wait_ordered_exact([expected(&path).modify_data_any()]);
952    }
953
954    #[test]
955    fn rename_file() {
956        let tmpdir = testdir();
957        let (mut watcher, rx) = watcher();
958        let path = tmpdir.path().join("entry");
959        let new_path = tmpdir.path().join("new_entry");
960        std::fs::File::create_new(&path).expect("Unable to create");
961
962        watcher.watch_recursively(&tmpdir);
963        watcher.watcher.wait_next_scan().expect("wait next scan");
964        std::fs::rename(&path, &new_path).expect("Unable to remove");
965
966        rx.sleep_while_exists(&path);
967        rx.sleep_until_exists(&new_path);
968
969        rx.wait_unordered_exact([
970            expected(&path).remove_file(),
971            expected(&new_path).create_file(),
972        ]);
973    }
974
975    #[test]
976    fn rename_self_file() {
977        let tmpdir = testdir();
978        let (mut watcher, rx) = watcher();
979
980        let path = tmpdir.path().join("entry");
981        std::fs::File::create_new(&path).expect("create");
982
983        watcher.watch_nonrecursively(&path);
984        watcher.watcher.wait_next_scan().expect("wait next scan");
985        let new_path = tmpdir.path().join("renamed");
986
987        std::fs::rename(&path, &new_path).expect("rename");
988
989        rx.sleep_while_exists(&path);
990        rx.sleep_until_exists(&new_path);
991
992        rx.wait_unordered_exact([expected(&path).remove_file()])
993            .ensure_no_tail();
994
995        std::fs::rename(&new_path, &path).expect("rename2");
996
997        rx.sleep_while_exists(&new_path);
998        rx.sleep_until_exists(&path);
999
1000        rx.wait_unordered_exact([expected(&path).create_file()])
1001            .ensure_no_tail();
1002    }
1003
1004    #[test]
1005    fn rename_self_file_no_track() {
1006        let tmpdir = testdir();
1007        let (mut watcher, rx) = watcher();
1008
1009        let path = tmpdir.path().join("entry");
1010        std::fs::File::create_new(&path).expect("create");
1011
1012        watcher.watch(
1013            &path,
1014            WatchMode {
1015                recursive_mode: RecursiveMode::NonRecursive,
1016                target_mode: TargetMode::NoTrack,
1017            },
1018        );
1019        watcher.watcher.wait_next_scan().expect("wait next scan");
1020
1021        let new_path = tmpdir.path().join("renamed");
1022
1023        std::fs::rename(&path, &new_path).expect("rename");
1024
1025        rx.sleep_while_exists(&path);
1026        rx.sleep_until_exists(&new_path);
1027
1028        rx.wait_unordered_exact([expected(&path).remove_file()])
1029            .ensure_no_tail();
1030
1031        let result = watcher.watcher.watch(
1032            &path,
1033            WatchMode {
1034                recursive_mode: RecursiveMode::NonRecursive,
1035                target_mode: TargetMode::NoTrack,
1036            },
1037        );
1038        assert!(matches!(
1039            result,
1040            Err(Error {
1041                paths: _,
1042                kind: ErrorKind::PathNotFound
1043            })
1044        ));
1045    }
1046
1047    #[test]
1048    fn delete_file() {
1049        let tmpdir = testdir();
1050        let (mut watcher, rx) = watcher();
1051        let path = tmpdir.path().join("entry");
1052        std::fs::File::create_new(&path).expect("Unable to create");
1053
1054        watcher.watch_recursively(&tmpdir);
1055        watcher.watcher.wait_next_scan().expect("wait next scan");
1056        std::fs::remove_file(&path).expect("Unable to remove");
1057
1058        rx.sleep_while_exists(&path);
1059        rx.wait_ordered_exact([
1060            expected(&path).modify_data_any().optional(),
1061            expected(&path).remove_file(),
1062        ]);
1063    }
1064
1065    #[test]
1066    fn delete_self_file() {
1067        let tmpdir = testdir();
1068        let (mut watcher, rx) = watcher();
1069        let path = tmpdir.path().join("entry");
1070        std::fs::File::create_new(&path).expect("Unable to create");
1071
1072        watcher.watch_nonrecursively(&path);
1073        watcher.watcher.wait_next_scan().expect("wait next scan");
1074
1075        std::fs::remove_file(&path).expect("Unable to remove");
1076
1077        rx.sleep_while_exists(&path);
1078        rx.wait_ordered_exact([
1079            expected(&path).modify_data_any().optional(),
1080            expected(&path).remove_file(),
1081        ]);
1082
1083        std::fs::write(&path, "").expect("write");
1084
1085        rx.sleep_until_exists(&path);
1086        rx.wait_ordered_exact([expected(&path).create_file()]);
1087    }
1088
1089    #[test]
1090    fn delete_self_file_no_track() {
1091        let tmpdir = testdir();
1092        let (mut watcher, rx) = watcher();
1093        let path = tmpdir.path().join("entry");
1094        std::fs::File::create_new(&path).expect("Unable to create");
1095
1096        watcher.watch(
1097            &path,
1098            WatchMode {
1099                recursive_mode: RecursiveMode::NonRecursive,
1100                target_mode: TargetMode::NoTrack,
1101            },
1102        );
1103        watcher.watcher.wait_next_scan().expect("wait next scan");
1104
1105        std::fs::remove_file(&path).expect("Unable to remove");
1106
1107        rx.sleep_while_exists(&path);
1108        rx.wait_ordered_exact([
1109            expected(&path).modify_data_any().optional(),
1110            expected(&path).remove_file(),
1111        ]);
1112
1113        std::fs::write(&path, "").expect("write");
1114
1115        rx.ensure_empty_with_wait();
1116    }
1117
1118    #[test]
1119    fn create_write_overwrite() {
1120        let tmpdir = testdir();
1121        let (mut watcher, rx) = watcher();
1122        let overwritten_file = tmpdir.path().join("overwritten_file");
1123        let overwriting_file = tmpdir.path().join("overwriting_file");
1124        std::fs::write(&overwritten_file, "123").expect("write1");
1125
1126        watcher.watch_nonrecursively(&tmpdir);
1127        watcher.watcher.wait_next_scan().expect("wait next scan");
1128
1129        std::fs::File::create(&overwriting_file).expect("create");
1130        std::fs::write(&overwriting_file, "321").expect("write2");
1131        std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");
1132
1133        rx.sleep_while_exists(&overwriting_file);
1134        assert!(
1135            rx.sleep_until(
1136                || std::fs::read_to_string(&overwritten_file).is_ok_and(|cnt| cnt == "321")
1137            ),
1138            "file {overwritten_file:?} was not replaced"
1139        );
1140
1141        rx.wait_unordered([expected(&overwritten_file).modify_data_any()]);
1142    }
1143}