notify_fork/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8    collections::HashMap,
9    path::{Path, PathBuf},
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc, Mutex,
13    },
14    thread,
15    time::Duration,
16};
17
18/// Event send for registered handler on initial directory scans
19pub type ScanEvent = crate::Result<PathBuf>;
20
21/// Handler trait for receivers of ScanEvent.
22/// Very much the same as [EventHandler], but including the Result.
23///
24/// See the full example for more information.
25pub trait ScanEventHandler: Send + 'static {
26    /// Handles an event.
27    fn handle_event(&mut self, event: ScanEvent);
28}
29
30impl<F> ScanEventHandler for F
31where
32    F: FnMut(ScanEvent) + Send + 'static,
33{
34    fn handle_event(&mut self, event: ScanEvent) {
35        (self)(event);
36    }
37}
38
/// Forward scan events into a crossbeam channel.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // A disconnected receiver is not the watcher's problem; discard the send result.
        let _: Result<(), _> = self.send(event);
    }
}
45
46impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
47    fn handle_event(&mut self, event: ScanEvent) {
48        let _ = self.send(event);
49    }
50}
51
52impl ScanEventHandler for () {
53    fn handle_event(&mut self, _event: ScanEvent) {}
54}
55
56use data::{DataBuilder, WatchData};
57mod data {
58    use crate::{
59        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
60        EventHandler,
61    };
62    use filetime::FileTime;
63    use std::{
64        cell::RefCell,
65        collections::{hash_map::RandomState, HashMap},
66        fmt::{self, Debug},
67        fs::{self, File, Metadata},
68        hash::{BuildHasher, Hasher},
69        io::{self, Read},
70        path::{Path, PathBuf},
71        time::Instant,
72    };
73    use walkdir::WalkDir;
74
75    use super::ScanEventHandler;
76
77    /// Builder for [`WatchData`] & [`PathData`].
78    pub(super) struct DataBuilder {
79        emitter: EventEmitter,
80        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
81
82        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
83        // in future.
84        build_hasher: Option<RandomState>,
85
86        // current timestamp for building Data.
87        now: Instant,
88    }
89
90    impl DataBuilder {
91        pub(super) fn new<F, G>(
92            event_handler: F,
93            compare_content: bool,
94            scan_emitter: Option<G>,
95        ) -> Self
96        where
97            F: EventHandler,
98            G: ScanEventHandler,
99        {
100            let scan_emitter = match scan_emitter {
101                None => None,
102                Some(v) => {
103                    // workaround for a weird type resolution bug when directly going to dyn Trait
104                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
105                        Box::new(RefCell::new(v));
106                    Some(intermediate)
107                }
108            };
109            Self {
110                emitter: EventEmitter::new(event_handler),
111                scan_emitter,
112                build_hasher: compare_content.then(RandomState::default),
113                now: Instant::now(),
114            }
115        }
116
117        /// Update internal timestamp.
118        pub(super) fn update_timestamp(&mut self) {
119            self.now = Instant::now();
120        }
121
122        /// Create [`WatchData`].
123        ///
124        /// This function will return `Err(_)` if can not retrieve metadata from
125        /// the path location. (e.g., not found).
126        pub(super) fn build_watch_data(
127            &self,
128            root: PathBuf,
129            is_recursive: bool,
130        ) -> Option<WatchData> {
131            WatchData::new(self, root, is_recursive)
132        }
133
134        /// Create [`PathData`].
135        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
136            PathData::new(self, meta_path)
137        }
138    }
139
140    impl Debug for DataBuilder {
141        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142            f.debug_struct("DataBuilder")
143                .field("build_hasher", &self.build_hasher)
144                .field("now", &self.now)
145                .finish()
146        }
147    }
148
149    #[derive(Debug)]
150    pub(super) struct WatchData {
151        // config part, won't change.
152        root: PathBuf,
153        is_recursive: bool,
154
155        // current status part.
156        all_path_data: HashMap<PathBuf, PathData>,
157    }
158
159    impl WatchData {
160        /// Scan filesystem and create a new `WatchData`.
161        ///
162        /// # Side effect
163        ///
164        /// This function may send event by `data_builder.emitter`.
165        fn new(data_builder: &DataBuilder, root: PathBuf, is_recursive: bool) -> Option<Self> {
166            // If metadata read error at `root` path, it will emit
167            // a error event and stop to create the whole `WatchData`.
168            //
169            // QUESTION: inconsistent?
170            //
171            // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
172            // if `root` path hit an io error, then watcher will reject to
173            // create this new watch.
174            //
175            // This may inconsistent with *POLLING* a watch. When watcher
176            // continue polling, io error at root path will not delete
177            // a existing watch. polling still working.
178            //
179            // So, consider a config file may not exists at first time but may
180            // create after a while, developer cannot watch it.
181            //
182            // FIXME: Can we always allow to watch a path, even file not
183            // found at this path?
184            if let Err(e) = fs::metadata(&root) {
185                data_builder.emitter.emit_io_err(e, &root);
186                return None;
187            }
188
189            let all_path_data =
190                Self::scan_all_path_data(data_builder, root.clone(), is_recursive, true).collect();
191
192            Some(Self {
193                root,
194                is_recursive,
195                all_path_data,
196            })
197        }
198
199        /// Rescan filesystem and update this `WatchData`.
200        ///
201        /// # Side effect
202        ///
203        /// This function may emit event by `data_builder.emitter`.
204        pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
205            // scan current filesystem.
206            for (path, new_path_data) in
207                Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive, false)
208            {
209                let old_path_data = self
210                    .all_path_data
211                    .insert(path.clone(), new_path_data.clone());
212
213                // emit event
214                let event =
215                    PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
216                if let Some(event) = event {
217                    data_builder.emitter.emit_ok(event);
218                }
219            }
220
221            // scan for disappeared paths.
222            let mut disappeared_paths = Vec::new();
223            for (path, path_data) in self.all_path_data.iter() {
224                if path_data.last_check < data_builder.now {
225                    disappeared_paths.push(path.clone());
226                }
227            }
228
229            // remove disappeared paths
230            for path in disappeared_paths {
231                let old_path_data = self.all_path_data.remove(&path);
232
233                // emit event
234                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
235                if let Some(event) = event {
236                    data_builder.emitter.emit_ok(event);
237                }
238            }
239        }
240
241        /// Get all `PathData` by given configuration.
242        ///
243        /// # Side Effect
244        ///
245        /// This function may emit some IO Error events by `data_builder.emitter`.
246        fn scan_all_path_data(
247            data_builder: &'_ DataBuilder,
248            root: PathBuf,
249            is_recursive: bool,
250            // whether this is an initial scan, used only for events
251            is_initial: bool,
252        ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
253            log::trace!("rescanning {root:?}");
254            // WalkDir return only one entry if root is a file (not a folder),
255            // so we can use single logic to do the both file & dir's jobs.
256            //
257            // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
258            WalkDir::new(root)
259                .follow_links(true)
260                .max_depth(Self::dir_scan_depth(is_recursive))
261                .into_iter()
262                //
263                // QUESTION: should we ignore IO Error?
264                //
265                // current implementation ignore some IO error, e.g.,
266                //
267                // - `.filter_map(|entry| entry.ok())`
268                // - all read error when hashing
269                //
270                // but the code also interest with `fs::metadata()` error and
271                // propagate to event handler. It may not consistent.
272                //
273                // FIXME: Should we emit all IO error events? Or ignore them all?
274                .filter_map(|entry_res| match entry_res {
275                    Ok(entry) => Some(entry),
276                    Err(err) => {
277                        log::warn!("walkdir error scanning {err:?}");
278                        let crate_err =
279                            crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
280                        data_builder.emitter.emit(Err(crate_err));
281                        None
282                    }
283                })
284                .filter_map(move |entry| match entry.metadata() {
285                    Ok(metadata) => {
286                        let path = entry.into_path();
287                        if is_initial {
288                            // emit initial scans
289                            if let Some(ref emitter) = data_builder.scan_emitter {
290                                emitter.borrow_mut().handle_event(Ok(path.clone()));
291                            }
292                        }
293                        let meta_path = MetaPath::from_parts_unchecked(path, metadata);
294                        let data_path = data_builder.build_path_data(&meta_path);
295
296                        Some((meta_path.into_path(), data_path))
297                    }
298                    Err(e) => {
299                        // emit event.
300                        let path = entry.into_path();
301                        data_builder.emitter.emit_io_err(e, path);
302
303                        None
304                    }
305                })
306        }
307
308        fn dir_scan_depth(is_recursive: bool) -> usize {
309            if is_recursive {
310                usize::MAX
311            } else {
312                1
313            }
314        }
315    }
316
317    /// Stored data for a one path locations.
318    ///
319    /// See [`WatchData`] for more detail.
320    #[derive(Debug, Clone)]
321    struct PathData {
322        /// File updated time.
323        mtime: i64,
324
325        /// Content's hash value, only available if user request compare file
326        /// contents and read successful.
327        hash: Option<u64>,
328
329        /// Checked time.
330        last_check: Instant,
331    }
332
333    impl PathData {
334        /// Create a new `PathData`.
335        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
336            let metadata = meta_path.metadata();
337
338            PathData {
339                mtime: FileTime::from_last_modification_time(metadata).seconds(),
340                hash: data_builder
341                    .build_hasher
342                    .as_ref()
343                    .filter(|_| metadata.is_file())
344                    .and_then(|build_hasher| {
345                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
346                    }),
347
348                last_check: data_builder.now,
349            }
350        }
351
352        /// Get hash value for the data content in given file `path`.
353        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
354            let mut hasher = build_hasher.build_hasher();
355            let mut file = File::open(path)?;
356            let mut buf = [0; 512];
357
358            loop {
359                let n = match file.read(&mut buf) {
360                    Ok(0) => break,
361                    Ok(len) => len,
362                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
363                    Err(e) => return Err(e),
364                };
365
366                hasher.write(&buf[..n]);
367            }
368
369            Ok(hasher.finish())
370        }
371
372        /// Get [`Event`] by compare two optional [`PathData`].
373        fn compare_to_event<P>(
374            path: P,
375            old: Option<&PathData>,
376            new: Option<&PathData>,
377        ) -> Option<Event>
378        where
379            P: Into<PathBuf>,
380        {
381            match (old, new) {
382                (Some(old), Some(new)) => {
383                    if new.mtime > old.mtime {
384                        Some(EventKind::Modify(ModifyKind::Metadata(
385                            MetadataKind::WriteTime,
386                        )))
387                    } else if new.hash != old.hash {
388                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
389                    } else {
390                        None
391                    }
392                }
393                (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
394                (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
395                (None, None) => None,
396            }
397            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
398        }
399    }
400
401    /// Compose path and its metadata.
402    ///
403    /// This data structure designed for make sure path and its metadata can be
404    /// transferred in consistent way, and may avoid some duplicated
405    /// `fs::metadata()` function call in some situations.
406    #[derive(Debug)]
407    pub(super) struct MetaPath {
408        path: PathBuf,
409        metadata: Metadata,
410    }
411
412    impl MetaPath {
413        /// Create `MetaPath` by given parts.
414        ///
415        /// # Invariant
416        ///
417        /// User must make sure the input `metadata` are associated with `path`.
418        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
419            Self { path, metadata }
420        }
421
422        fn path(&self) -> &Path {
423            &self.path
424        }
425
426        fn metadata(&self) -> &Metadata {
427            &self.metadata
428        }
429
430        fn into_path(self) -> PathBuf {
431            self.path
432        }
433    }
434
435    /// Thin wrapper for outer event handler, for easy to use.
436    struct EventEmitter(
437        // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
438        // Use `Box` to make sure EventEmitter is Sized.
439        Box<RefCell<dyn EventHandler>>,
440    );
441
442    impl EventEmitter {
443        fn new<F: EventHandler>(event_handler: F) -> Self {
444            Self(Box::new(RefCell::new(event_handler)))
445        }
446
447        /// Emit single event.
448        fn emit(&self, event: crate::Result<Event>) {
449            self.0.borrow_mut().handle_event(event);
450        }
451
452        /// Emit event.
453        fn emit_ok(&self, event: Event) {
454            self.emit(Ok(event))
455        }
456
457        /// Emit io error event.
458        fn emit_io_err<E, P>(&self, err: E, path: P)
459        where
460            E: Into<io::Error>,
461            P: Into<PathBuf>,
462        {
463            self.emit(Err(crate::Error::io(err.into()).add_path(path.into())))
464        }
465    }
466}
467
468/// Polling based `Watcher` implementation.
469///
470/// By default scans through all files and checks for changed entries based on their change date.
471/// Can also be changed to perform file content change checks.
472///
473/// See [Config] for more details.
474#[derive(Debug)]
475pub struct PollWatcher {
476    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
477    data_builder: Arc<Mutex<DataBuilder>>,
478    want_to_stop: Arc<AtomicBool>,
479    /// channel to the poll loop
480    /// currently used only for manual polling
481    message_channel: Sender<()>,
482    delay: Option<Duration>,
483}
484
485impl PollWatcher {
486    /// Create a new [PollWatcher], configured as needed.
487    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
488        Self::with_opt::<_, ()>(event_handler, config, None)
489    }
490
491    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
492    pub fn poll(&self) -> crate::Result<()> {
493        self.message_channel
494            .send(())
495            .map_err(|_| Error::generic("failed to send poll message"))?;
496        Ok(())
497    }
498
499    /// Create a new [PollWatcher] with an scan event handler.
500    ///
501    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
502    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
503        event_handler: F,
504        config: Config,
505        scan_callback: G,
506    ) -> crate::Result<PollWatcher> {
507        Self::with_opt(event_handler, config, Some(scan_callback))
508    }
509
510    /// create a new PollWatcher with all options
511    fn with_opt<F: EventHandler, G: ScanEventHandler>(
512        event_handler: F,
513        config: Config,
514        scan_callback: Option<G>,
515    ) -> crate::Result<PollWatcher> {
516        let data_builder =
517            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
518
519        let (tx, rx) = unbounded();
520
521        let poll_watcher = PollWatcher {
522            watches: Default::default(),
523            data_builder: Arc::new(Mutex::new(data_builder)),
524            want_to_stop: Arc::new(AtomicBool::new(false)),
525            delay: config.poll_interval(),
526            message_channel: tx,
527        };
528
529        poll_watcher.run(rx);
530
531        Ok(poll_watcher)
532    }
533
534    fn run(&self, rx: Receiver<()>) {
535        let watches = Arc::clone(&self.watches);
536        let data_builder = Arc::clone(&self.data_builder);
537        let want_to_stop = Arc::clone(&self.want_to_stop);
538        let delay = self.delay;
539
540        let _ = thread::Builder::new()
541            .name("notify-rs poll loop".to_string())
542            .spawn(move || {
543                loop {
544                    if want_to_stop.load(Ordering::SeqCst) {
545                        break;
546                    }
547
548                    // HINT: Make sure always lock in the same order to avoid deadlock.
549                    //
550                    // FIXME: inconsistent: some place mutex poison cause panic,
551                    // some place just ignore.
552                    if let (Ok(mut watches), Ok(mut data_builder)) =
553                        (watches.lock(), data_builder.lock())
554                    {
555                        data_builder.update_timestamp();
556
557                        let vals = watches.values_mut();
558                        for watch_data in vals {
559                            watch_data.rescan(&mut data_builder);
560                        }
561                    }
562                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
563                    if let Some(delay) = delay {
564                        let _ = rx.recv_timeout(delay);
565                    } else {
566                        let _ = rx.recv();
567                    }
568                }
569            });
570    }
571
572    /// Watch a path location.
573    ///
574    /// QUESTION: this function never return an Error, is it as intend?
575    /// Please also consider the IO Error event problem.
576    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
577        // HINT: Make sure always lock in the same order to avoid deadlock.
578        //
579        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
580        if let (Ok(mut watches), Ok(mut data_builder)) =
581            (self.watches.lock(), self.data_builder.lock())
582        {
583            data_builder.update_timestamp();
584
585            let watch_data =
586                data_builder.build_watch_data(path.to_path_buf(), recursive_mode.is_recursive());
587
588            // if create watch_data successful, add it to watching list.
589            if let Some(watch_data) = watch_data {
590                watches.insert(path.to_path_buf(), watch_data);
591            }
592        }
593    }
594
595    /// Unwatch a path.
596    ///
597    /// Return `Err(_)` if given path has't be monitored.
598    fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
599        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
600        self.watches
601            .lock()
602            .unwrap()
603            .remove(path)
604            .map(|_| ())
605            .ok_or_else(crate::Error::watch_not_found)
606    }
607}
608
609impl Watcher for PollWatcher {
610    /// Create a new [PollWatcher].
611    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
612        Self::new(event_handler, config)
613    }
614
615    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
616        self.watch_inner(path, recursive_mode);
617
618        Ok(())
619    }
620
621    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
622        self.unwatch_inner(path)
623    }
624
625    fn kind() -> crate::WatcherKind {
626        crate::WatcherKind::PollWatcher
627    }
628}
629
630impl Drop for PollWatcher {
631    fn drop(&mut self) {
632        self.want_to_stop.store(true, Ordering::Relaxed);
633    }
634}
635
#[test]
fn poll_watcher_is_send_and_sync() {
    // Compile-time check: this stops building if PollWatcher loses Send or Sync.
    fn assert_thread_safe<T: Send + Sync>() {}
    assert_thread_safe::<PollWatcher>();
}