1use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8    collections::HashMap,
9    path::{Path, PathBuf},
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc, Mutex,
13    },
14    thread,
15    time::Duration,
16};
17
/// Item delivered to a [`ScanEventHandler`]: the path of a scanned entry, or an error.
///
/// The poll watcher in this file only ever produces `Ok(path)` values, one for each entry
/// found during the first scan of a newly watched path.
pub type ScanEvent = crate::Result<PathBuf>;
20
/// Receiver of [`ScanEvent`]s.
///
/// Implementations must be `Send + 'static` because the handler is stored inside state that is
/// shared with the background poll thread.
///
/// This file implements the trait for `FnMut(ScanEvent)` closures, for channel senders and for
/// `()` (which discards every event).
pub trait ScanEventHandler: Send + 'static {
    /// Handles a single scan event.
    fn handle_event(&mut self, event: ScanEvent);
}
29
30impl<F> ScanEventHandler for F
31where
32    F: FnMut(ScanEvent) + Send + 'static,
33{
34    fn handle_event(&mut self, event: ScanEvent) {
35        (self)(event);
36    }
37}
38
/// Lets a `crossbeam_channel` sender act as a scan handler (feature `crossbeam-channel`).
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    /// Pushes `event` into the channel on a best-effort basis.
    fn handle_event(&mut self, event: ScanEvent) {
        // A failed send is deliberately ignored: there is nobody left to report it to.
        drop(self.send(event));
    }
}
45
/// Lets a `flume` sender act as a scan handler (feature `flume`).
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    /// Pushes `event` into the channel on a best-effort basis.
    fn handle_event(&mut self, event: ScanEvent) {
        // A failed send is deliberately ignored: there is nobody left to report it to.
        drop(self.send(event));
    }
}
52
53impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
54    fn handle_event(&mut self, event: ScanEvent) {
55        let _ = self.send(event);
56    }
57}
58
59impl ScanEventHandler for () {
60    fn handle_event(&mut self, _event: ScanEvent) {}
61}
62
63use data::{DataBuilder, WatchData};
64mod data {
65    use crate::{
66        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
67        EventHandler,
68    };
69    use std::{
70        cell::RefCell,
71        collections::{hash_map::RandomState, HashMap},
72        fmt::{self, Debug},
73        fs::{self, File, Metadata},
74        hash::{BuildHasher, Hasher},
75        io::{self, Read},
76        path::{Path, PathBuf},
77        time::Instant,
78    };
79    use walkdir::WalkDir;
80
81    use super::ScanEventHandler;
82
83    fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
84        match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
85            Ok(d) => d.as_secs() as i64,
86            Err(e) => -(e.duration().as_secs() as i64),
87        }
88    }
89
    /// Shared state needed to build [`WatchData`] and [`PathData`] values.
    pub(super) struct DataBuilder {
        /// Sink for the change events and errors produced while scanning.
        emitter: EventEmitter,
        /// Optional callback that receives every path found during the initial scan of a watch.
        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

        /// Hasher factory used for file contents; `Some` only when content comparison was
        /// requested at construction time.
        build_hasher: Option<RandomState>,

        /// Timestamp of the current scan round; copied into each `PathData` as `last_check`.
        now: Instant,
    }
102
103    impl DataBuilder {
104        pub(super) fn new<F, G>(
105            event_handler: F,
106            compare_content: bool,
107            scan_emitter: Option<G>,
108        ) -> Self
109        where
110            F: EventHandler,
111            G: ScanEventHandler,
112        {
113            let scan_emitter = match scan_emitter {
114                None => None,
115                Some(v) => {
116                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
118                        Box::new(RefCell::new(v));
119                    Some(intermediate)
120                }
121            };
122            Self {
123                emitter: EventEmitter::new(event_handler),
124                scan_emitter,
125                build_hasher: compare_content.then(RandomState::default),
126                now: Instant::now(),
127            }
128        }
129
130        pub(super) fn update_timestamp(&mut self) {
132            self.now = Instant::now();
133        }
134
135        pub(super) fn build_watch_data(
140            &self,
141            root: PathBuf,
142            is_recursive: bool,
143            follow_symlinks: bool,
144        ) -> Option<WatchData> {
145            WatchData::new(self, root, is_recursive, follow_symlinks)
146        }
147
148        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
150            PathData::new(self, meta_path)
151        }
152    }
153
154    impl Debug for DataBuilder {
155        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
156            f.debug_struct("DataBuilder")
157                .field("build_hasher", &self.build_hasher)
158                .field("now", &self.now)
159                .finish()
160        }
161    }
162
    /// Snapshot of everything found under one watched root, plus the options used to scan it.
    #[derive(Debug)]
    pub(super) struct WatchData {
        /// Path the watch was registered for; every scan starts here.
        root: PathBuf,
        /// Whether scans descend into nested directories (see `dir_scan_depth`).
        is_recursive: bool,
        /// Whether the directory walk follows symbolic links.
        follow_symlinks: bool,

        /// Last known state of each path seen by a scan, keyed by that path.
        all_path_data: HashMap<PathBuf, PathData>,
    }
173
174    impl WatchData {
175        fn new(
181            data_builder: &DataBuilder,
182            root: PathBuf,
183            is_recursive: bool,
184            follow_symlinks: bool,
185        ) -> Option<Self> {
186            if let Err(e) = fs::metadata(&root) {
205                data_builder.emitter.emit_io_err(e, Some(&root));
206                return None;
207            }
208
209            let all_path_data = Self::scan_all_path_data(
210                data_builder,
211                root.clone(),
212                is_recursive,
213                follow_symlinks,
214                true,
215            )
216            .collect();
217
218            Some(Self {
219                root,
220                is_recursive,
221                follow_symlinks,
222                all_path_data,
223            })
224        }
225
226        pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
232            for (path, new_path_data) in Self::scan_all_path_data(
234                data_builder,
235                self.root.clone(),
236                self.is_recursive,
237                self.follow_symlinks,
238                false,
239            ) {
240                let old_path_data = self
241                    .all_path_data
242                    .insert(path.clone(), new_path_data.clone());
243
244                let event =
246                    PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
247                if let Some(event) = event {
248                    data_builder.emitter.emit_ok(event);
249                }
250            }
251
252            let mut disappeared_paths = Vec::new();
254            for (path, path_data) in self.all_path_data.iter() {
255                if path_data.last_check < data_builder.now {
256                    disappeared_paths.push(path.clone());
257                }
258            }
259
260            for path in disappeared_paths {
262                let old_path_data = self.all_path_data.remove(&path);
263
264                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
266                if let Some(event) = event {
267                    data_builder.emitter.emit_ok(event);
268                }
269            }
270        }
271
272        fn scan_all_path_data(
278            data_builder: &'_ DataBuilder,
279            root: PathBuf,
280            is_recursive: bool,
281            follow_symlinks: bool,
282            is_initial: bool,
284        ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
285            log::trace!("rescanning {root:?}");
286            WalkDir::new(root)
291                .follow_links(follow_symlinks)
292                .max_depth(Self::dir_scan_depth(is_recursive))
293                .into_iter()
294                .filter_map(|entry_res| match entry_res {
295                    Ok(entry) => Some(entry),
296                    Err(err) => {
297                        log::warn!("walkdir error scanning {err:?}");
298                        if let Some(io_error) = err.io_error() {
299                            let new_io_error = io::Error::new(io_error.kind(), err.to_string());
301                            data_builder.emitter.emit_io_err(new_io_error, err.path());
302                        } else {
303                            let crate_err =
304                                crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
305                            data_builder.emitter.emit(Err(crate_err));
306                        }
307                        None
308                    }
309                })
310                .filter_map(move |entry| match entry.metadata() {
311                    Ok(metadata) => {
312                        let path = entry.into_path();
313                        if is_initial {
314                            if let Some(ref emitter) = data_builder.scan_emitter {
316                                emitter.borrow_mut().handle_event(Ok(path.clone()));
317                            }
318                        }
319                        let meta_path = MetaPath::from_parts_unchecked(path, metadata);
320                        let data_path = data_builder.build_path_data(&meta_path);
321
322                        Some((meta_path.into_path(), data_path))
323                    }
324                    Err(e) => {
325                        let path = entry.into_path();
327                        data_builder.emitter.emit_io_err(e, Some(path));
328
329                        None
330                    }
331                })
332        }
333
334        fn dir_scan_depth(is_recursive: bool) -> usize {
335            if is_recursive {
336                usize::MAX
337            } else {
338                1
339            }
340        }
341    }
342
    /// State recorded for a single path at the time of its most recent scan.
    #[derive(Debug, Clone)]
    struct PathData {
        /// Modification time in whole seconds relative to the Unix epoch; `0` when the
        /// modification time could not be read.
        mtime: i64,

        /// Hash of the file contents. `None` when content comparison is disabled, when the
        /// entry is not a regular file, or when the file could not be read.
        hash: Option<u64>,

        /// Builder timestamp of the scan round that produced this value.
        last_check: Instant,
    }
358
359    impl PathData {
360        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
362            let metadata = meta_path.metadata();
363
364            PathData {
365                mtime: metadata.modified().map_or(0, system_time_to_seconds),
366                hash: data_builder
367                    .build_hasher
368                    .as_ref()
369                    .filter(|_| metadata.is_file())
370                    .and_then(|build_hasher| {
371                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
372                    }),
373
374                last_check: data_builder.now,
375            }
376        }
377
378        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
380            let mut hasher = build_hasher.build_hasher();
381            let mut file = File::open(path)?;
382            let mut buf = [0; 512];
383
384            loop {
385                let n = match file.read(&mut buf) {
386                    Ok(0) => break,
387                    Ok(len) => len,
388                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
389                    Err(e) => return Err(e),
390                };
391
392                hasher.write(&buf[..n]);
393            }
394
395            Ok(hasher.finish())
396        }
397
398        fn compare_to_event<P>(
400            path: P,
401            old: Option<&PathData>,
402            new: Option<&PathData>,
403        ) -> Option<Event>
404        where
405            P: Into<PathBuf>,
406        {
407            match (old, new) {
408                (Some(old), Some(new)) => {
409                    if new.mtime > old.mtime {
410                        Some(EventKind::Modify(ModifyKind::Metadata(
411                            MetadataKind::WriteTime,
412                        )))
413                    } else if new.hash != old.hash {
414                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
415                    } else {
416                        None
417                    }
418                }
419                (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
420                (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
421                (None, None) => None,
422            }
423            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
424        }
425    }
426
    /// A path bundled with metadata that was read for it.
    ///
    /// The only constructor visible here is `from_parts_unchecked`, which does not verify that
    /// the two parts belong together.
    #[derive(Debug)]
    pub(super) struct MetaPath {
        /// Location of the entry.
        path: PathBuf,
        /// Metadata supplied alongside `path` at construction time.
        metadata: Metadata,
    }
437
438    impl MetaPath {
439        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
445            Self { path, metadata }
446        }
447
448        fn path(&self) -> &Path {
449            &self.path
450        }
451
452        fn metadata(&self) -> &Metadata {
453            &self.metadata
454        }
455
456        fn into_path(self) -> PathBuf {
457            self.path
458        }
459    }
460
    /// Wrapper around the user-supplied [`EventHandler`] through which all events and errors
    /// are delivered.
    struct EventEmitter(
        // The handler is type-erased behind `dyn`, and the `RefCell` allows `emit` to call the
        // handler's `&mut self` method while only holding `&self`.
        Box<RefCell<dyn EventHandler>>,
    );
467
468    impl EventEmitter {
469        fn new<F: EventHandler>(event_handler: F) -> Self {
470            Self(Box::new(RefCell::new(event_handler)))
471        }
472
473        fn emit(&self, event: crate::Result<Event>) {
475            self.0.borrow_mut().handle_event(event);
476        }
477
478        fn emit_ok(&self, event: Event) {
480            self.emit(Ok(event))
481        }
482
483        fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
485        where
486            E: Into<io::Error>,
487            P: Into<PathBuf>,
488        {
489            let e = crate::Error::io(err.into());
490            if let Some(path) = path {
491                self.emit(Err(e.add_path(path.into())));
492            } else {
493                self.emit(Err(e));
494            }
495        }
496    }
497}
498
/// Watcher that detects changes by periodically re-scanning the watched paths on a background
/// thread and comparing the result with the previous scan.
#[derive(Debug)]
pub struct PollWatcher {
    /// Scan state per watched root, keyed by the path passed to `watch`.
    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
    /// Builder shared with the poll thread; holds the event handlers and the scan timestamp.
    data_builder: Arc<Mutex<DataBuilder>>,
    /// Set on drop; the poll loop checks it at the start of every iteration and exits when set.
    want_to_stop: Arc<AtomicBool>,
    /// Sending `()` wakes the poll loop early (see `poll`).
    message_channel: Sender<()>,
    /// Wait between scan rounds. With `None` the loop only runs again when a message arrives.
    delay: Option<Duration>,
    /// Whether scans follow symbolic links.
    // NOTE(review): the name is misspelled ("sylinks"); it is kept because the `impl` block
    // refers to the field by this name.
    follow_sylinks: bool,
}
516
517impl PollWatcher {
518    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
520        Self::with_opt::<_, ()>(event_handler, config, None)
521    }
522
523    pub fn poll(&self) -> crate::Result<()> {
525        self.message_channel
526            .send(())
527            .map_err(|_| Error::generic("failed to send poll message"))?;
528        Ok(())
529    }
530
531    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
535        event_handler: F,
536        config: Config,
537        scan_callback: G,
538    ) -> crate::Result<PollWatcher> {
539        Self::with_opt(event_handler, config, Some(scan_callback))
540    }
541
542    fn with_opt<F: EventHandler, G: ScanEventHandler>(
544        event_handler: F,
545        config: Config,
546        scan_callback: Option<G>,
547    ) -> crate::Result<PollWatcher> {
548        let data_builder =
549            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
550
551        let (tx, rx) = unbounded();
552
553        let poll_watcher = PollWatcher {
554            watches: Default::default(),
555            data_builder: Arc::new(Mutex::new(data_builder)),
556            want_to_stop: Arc::new(AtomicBool::new(false)),
557            delay: config.poll_interval(),
558            follow_sylinks: config.follow_symlinks(),
559            message_channel: tx,
560        };
561
562        poll_watcher.run(rx);
563
564        Ok(poll_watcher)
565    }
566
567    fn run(&self, rx: Receiver<()>) {
568        let watches = Arc::clone(&self.watches);
569        let data_builder = Arc::clone(&self.data_builder);
570        let want_to_stop = Arc::clone(&self.want_to_stop);
571        let delay = self.delay;
572
573        let _ = thread::Builder::new()
574            .name("notify-rs poll loop".to_string())
575            .spawn(move || {
576                loop {
577                    if want_to_stop.load(Ordering::SeqCst) {
578                        break;
579                    }
580
581                    if let (Ok(mut watches), Ok(mut data_builder)) =
586                        (watches.lock(), data_builder.lock())
587                    {
588                        data_builder.update_timestamp();
589
590                        let vals = watches.values_mut();
591                        for watch_data in vals {
592                            watch_data.rescan(&mut data_builder);
593                        }
594                    }
595                    if let Some(delay) = delay {
597                        let _ = rx.recv_timeout(delay);
598                    } else {
599                        let _ = rx.recv();
600                    }
601                }
602            });
603    }
604
605    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
610        if let (Ok(mut watches), Ok(mut data_builder)) =
614            (self.watches.lock(), self.data_builder.lock())
615        {
616            data_builder.update_timestamp();
617
618            let watch_data = data_builder.build_watch_data(
619                path.to_path_buf(),
620                recursive_mode.is_recursive(),
621                self.follow_sylinks,
622            );
623
624            if let Some(watch_data) = watch_data {
626                watches.insert(path.to_path_buf(), watch_data);
627            }
628        }
629    }
630
631    fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
635        self.watches
637            .lock()
638            .unwrap()
639            .remove(path)
640            .map(|_| ())
641            .ok_or_else(crate::Error::watch_not_found)
642    }
643}
644
645impl Watcher for PollWatcher {
646    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
648        Self::new(event_handler, config)
649    }
650
651    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
652        self.watch_inner(path, recursive_mode);
653
654        Ok(())
655    }
656
657    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
658        self.unwatch_inner(path)
659    }
660
661    fn kind() -> crate::WatcherKind {
662        crate::WatcherKind::PollWatcher
663    }
664}
665
666impl Drop for PollWatcher {
667    fn drop(&mut self) {
668        self.want_to_stop.store(true, Ordering::Relaxed);
669    }
670}
671
#[test]
fn poll_watcher_is_send_and_sync() {
    // Compile-time check only: instantiating the helper fails to build unless `PollWatcher`
    // satisfies both bounds.
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<PollWatcher>();
}
676}