notify/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{Config, Error, EventHandler, Receiver, Sender, WatchMode, Watcher, unbounded};
7use std::{
8    collections::HashMap,
9    path::{Path, PathBuf},
10    sync::{
11        Arc, Mutex,
12        atomic::{AtomicBool, Ordering},
13        mpsc,
14    },
15    thread,
16    time::Duration,
17};
18
19/// Event sent for registered handlers on initial directory scans
20pub type ScanEvent = crate::Result<PathBuf>;
21
22/// Handler trait for receivers of [`ScanEvent`].
23/// Very much the same as [`EventHandler`], but including the Result.
24///
25/// See the full example for more information.
26pub trait ScanEventHandler: Send + 'static {
27    /// Handles an event.
28    fn handle_event(&mut self, event: ScanEvent);
29}
30
31impl<F> ScanEventHandler for F
32where
33    F: FnMut(ScanEvent) + Send + 'static,
34{
35    fn handle_event(&mut self, event: ScanEvent) {
36        (self)(event);
37    }
38}
39
40#[cfg(feature = "crossbeam-channel")]
41impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
42    fn handle_event(&mut self, event: ScanEvent) {
43        let result = self.send(event);
44        if let Err(e) = result {
45            tracing::error!(?e, "failed to send scan event result");
46        }
47    }
48}
49
50#[cfg(feature = "flume")]
51impl ScanEventHandler for flume::Sender<ScanEvent> {
52    fn handle_event(&mut self, event: ScanEvent) {
53        let result = self.send(event);
54        if let Err(e) = result {
55            tracing::error!(?e, "failed to send scan event result");
56        }
57    }
58}
59
60impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
61    fn handle_event(&mut self, event: ScanEvent) {
62        let result = self.send(event);
63        if let Err(e) = result {
64            tracing::error!(?e, "failed to send scan event result");
65        }
66    }
67}
68
69impl ScanEventHandler for () {
70    fn handle_event(&mut self, _event: ScanEvent) {}
71}
72
73use data::{DataBuilder, WatchData};
74mod data {
75    use crate::{
76        EventHandler,
77        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
78    };
79    use std::{
80        cell::RefCell,
81        collections::{HashMap, hash_map::RandomState},
82        fmt::{self, Debug},
83        fs::{self, File, Metadata},
84        hash::{BuildHasher, Hasher},
85        io::{self, Read},
86        path::{Path, PathBuf},
87        time::Instant,
88    };
89    use walkdir::WalkDir;
90
91    use super::ScanEventHandler;
92
93    fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
94        match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
95            Ok(d) => d.as_secs() as i64,
96            Err(e) => -(e.duration().as_secs() as i64),
97        }
98    }
99
100    /// Builder for [`WatchData`] & [`PathData`].
101    pub(super) struct DataBuilder {
102        emitter: EventEmitter,
103        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
104
105        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
106        // in future.
107        build_hasher: Option<RandomState>,
108
109        // current timestamp for building Data.
110        now: Instant,
111    }
112
113    impl DataBuilder {
114        pub(super) fn new<F, G>(
115            event_handler: F,
116            compare_content: bool,
117            scan_emitter: Option<G>,
118        ) -> Self
119        where
120            F: EventHandler,
121            G: ScanEventHandler,
122        {
123            let scan_emitter = match scan_emitter {
124                None => None,
125                Some(v) => {
126                    // workaround for a weird type resolution bug when directly going to dyn Trait
127                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
128                        Box::new(RefCell::new(v));
129                    Some(intermediate)
130                }
131            };
132            Self {
133                emitter: EventEmitter::new(event_handler),
134                scan_emitter,
135                build_hasher: compare_content.then(RandomState::default),
136                now: Instant::now(),
137            }
138        }
139
140        /// Update internal timestamp.
141        pub(super) fn update_timestamp(&mut self) {
142            self.now = Instant::now();
143        }
144
145        /// Create [`WatchData`].
146        ///
147        /// This function will return `Err(_)` if can not retrieve metadata from
148        /// the path location. (e.g., not found).
149        pub(super) fn build_watch_data(
150            &self,
151            root: PathBuf,
152            is_recursive: bool,
153            follow_symlinks: bool,
154        ) -> Option<WatchData> {
155            WatchData::new(self, root, is_recursive, follow_symlinks)
156        }
157
158        /// Create [`PathData`].
159        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
160            PathData::new(self, meta_path)
161        }
162    }
163
164    impl Debug for DataBuilder {
165        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
166            f.debug_struct("DataBuilder")
167                .field("build_hasher", &self.build_hasher)
168                .field("now", &self.now)
169                .finish()
170        }
171    }
172
173    #[derive(Debug)]
174    pub(super) struct WatchData {
175        // config part, won't change.
176        root: PathBuf,
177        is_recursive: bool,
178        follow_symlinks: bool,
179
180        // current status part.
181        all_path_data: HashMap<PathBuf, PathData>,
182    }
183
184    impl WatchData {
185        /// Scan filesystem and create a new `WatchData`.
186        ///
187        /// # Side effect
188        ///
189        /// This function may send event by `data_builder.emitter`.
190        fn new(
191            data_builder: &DataBuilder,
192            root: PathBuf,
193            is_recursive: bool,
194            follow_symlinks: bool,
195        ) -> Option<Self> {
196            // If metadata read error at `root` path, it will emit
197            // a error event and stop to create the whole `WatchData`.
198            //
199            // QUESTION: inconsistent?
200            //
201            // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
202            // if `root` path hit an io error, then watcher will reject to
203            // create this new watch.
204            //
205            // This may inconsistent with *POLLING* a watch. When watcher
206            // continue polling, io error at root path will not delete
207            // a existing watch. polling still working.
208            //
209            // So, consider a config file may not exists at first time but may
210            // create after a while, developer cannot watch it.
211            //
212            // FIXME: Can we always allow to watch a path, even file not
213            // found at this path?
214            if let Err(e) = fs::metadata(&root) {
215                data_builder.emitter.emit_io_err(e, Some(&root));
216                return None;
217            }
218
219            let all_path_data = Self::scan_all_path_data(
220                data_builder,
221                root.clone(),
222                is_recursive,
223                follow_symlinks,
224                true,
225            )
226            .collect();
227
228            Some(Self {
229                root,
230                is_recursive,
231                follow_symlinks,
232                all_path_data,
233            })
234        }
235
236        /// Rescan filesystem and update this `WatchData`.
237        ///
238        /// # Side effect
239        ///
240        /// This function may emit event by `data_builder.emitter`.
241        pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
242            // scan current filesystem.
243            for (path, new_path_data) in Self::scan_all_path_data(
244                data_builder,
245                self.root.clone(),
246                self.is_recursive,
247                self.follow_symlinks,
248                false,
249            ) {
250                let old_path_data = self
251                    .all_path_data
252                    .insert(path.clone(), new_path_data.clone());
253
254                // emit event
255                let event =
256                    PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
257                if let Some(event) = event {
258                    data_builder.emitter.emit_ok(event);
259                }
260            }
261
262            // scan for disappeared paths.
263            let mut disappeared_paths = Vec::new();
264            for (path, path_data) in self.all_path_data.iter() {
265                if path_data.last_check < data_builder.now {
266                    disappeared_paths.push(path.clone());
267                }
268            }
269
270            // remove disappeared paths
271            for path in disappeared_paths {
272                let old_path_data = self.all_path_data.remove(&path);
273
274                // emit event
275                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
276                if let Some(event) = event {
277                    data_builder.emitter.emit_ok(event);
278                }
279            }
280        }
281
282        /// Get all `PathData` by given configuration.
283        ///
284        /// # Side Effect
285        ///
286        /// This function may emit some IO Error events by `data_builder.emitter`.
287        fn scan_all_path_data(
288            data_builder: &'_ DataBuilder,
289            root: PathBuf,
290            is_recursive: bool,
291            follow_symlinks: bool,
292            // whether this is an initial scan, used only for events
293            is_initial: bool,
294        ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
295            tracing::trace!("rescanning {root:?}");
296            // WalkDir return only one entry if root is a file (not a folder),
297            // so we can use single logic to do the both file & dir's jobs.
298            //
299            // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
300            WalkDir::new(root)
301                .follow_links(follow_symlinks)
302                .max_depth(Self::dir_scan_depth(is_recursive))
303                .into_iter()
304                .filter_map(|entry_res| match entry_res {
305                    Ok(entry) => Some(entry),
306                    Err(err) => {
307                        tracing::warn!("walkdir error scanning {err:?}");
308
309                        if let Some(io_error) = err.io_error() {
310                            // clone an io::Error, so we have to create a new one.
311                            let new_io_error = io::Error::new(io_error.kind(), err.to_string());
312                            data_builder.emitter.emit_io_err(new_io_error, err.path());
313                        } else {
314                            let crate_err =
315                                crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
316                            data_builder.emitter.emit(Err(crate_err));
317                        }
318                        None
319                    }
320                })
321                .filter_map(move |entry| match entry.metadata() {
322                    Ok(metadata) => {
323                        let path = entry.into_path();
324                        if is_initial {
325                            // emit initial scans
326                            if let Some(ref emitter) = data_builder.scan_emitter {
327                                emitter.borrow_mut().handle_event(Ok(path.clone()));
328                            }
329                        }
330                        let meta_path = MetaPath::from_parts_unchecked(path, metadata);
331                        let data_path = data_builder.build_path_data(&meta_path);
332
333                        Some((meta_path.into_path(), data_path))
334                    }
335                    Err(e) => {
336                        // emit event.
337                        let path = entry.into_path();
338                        data_builder.emitter.emit_io_err(e, Some(path));
339
340                        None
341                    }
342                })
343        }
344
345        fn dir_scan_depth(is_recursive: bool) -> usize {
346            if is_recursive { usize::MAX } else { 1 }
347        }
348    }
349
350    /// Stored data for a one path locations.
351    ///
352    /// See [`WatchData`] for more detail.
353    #[derive(Debug, Clone)]
354    struct PathData {
355        /// File updated time.
356        mtime: i64,
357
358        /// Content's hash value, only available if user request compare file
359        /// contents and read successful.
360        hash: Option<u64>,
361
362        /// Checked time.
363        last_check: Instant,
364    }
365
366    impl PathData {
367        /// Create a new `PathData`.
368        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
369            let metadata = meta_path.metadata();
370
371            PathData {
372                mtime: metadata.modified().map_or(0, system_time_to_seconds),
373                hash: data_builder
374                    .build_hasher
375                    .as_ref()
376                    .filter(|_| metadata.is_file())
377                    .and_then(|build_hasher| {
378                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
379                    }),
380
381                last_check: data_builder.now,
382            }
383        }
384
385        /// Get hash value for the data content in given file `path`.
386        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
387            let mut hasher = build_hasher.build_hasher();
388            let mut file = File::open(path)?;
389            let mut buf = [0; 512];
390
391            loop {
392                let n = match file.read(&mut buf) {
393                    Ok(0) => break,
394                    Ok(len) => len,
395                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
396                    Err(e) => return Err(e),
397                };
398
399                hasher.write(&buf[..n]);
400            }
401
402            Ok(hasher.finish())
403        }
404
405        /// Get [`Event`] by compare two optional [`PathData`].
406        fn compare_to_event<P>(
407            path: P,
408            old: Option<&PathData>,
409            new: Option<&PathData>,
410        ) -> Option<Event>
411        where
412            P: Into<PathBuf>,
413        {
414            match (old, new) {
415                (Some(old), Some(new)) => {
416                    if new.mtime > old.mtime {
417                        Some(EventKind::Modify(ModifyKind::Metadata(
418                            MetadataKind::WriteTime,
419                        )))
420                    } else if new.hash != old.hash {
421                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
422                    } else {
423                        None
424                    }
425                }
426                (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
427                (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
428                (None, None) => None,
429            }
430            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
431        }
432    }
433
434    /// Compose path and its metadata.
435    ///
436    /// This data structure designed for make sure path and its metadata can be
437    /// transferred in consistent way, and may avoid some duplicated
438    /// `fs::metadata()` function call in some situations.
439    #[derive(Debug)]
440    pub(super) struct MetaPath {
441        path: PathBuf,
442        metadata: Metadata,
443    }
444
445    impl MetaPath {
446        /// Create `MetaPath` by given parts.
447        ///
448        /// # Invariant
449        ///
450        /// User must make sure the input `metadata` are associated with `path`.
451        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
452            Self { path, metadata }
453        }
454
455        fn path(&self) -> &Path {
456            &self.path
457        }
458
459        fn metadata(&self) -> &Metadata {
460            &self.metadata
461        }
462
463        fn into_path(self) -> PathBuf {
464            self.path
465        }
466    }
467
468    /// Thin wrapper for outer event handler, for easy to use.
469    struct EventEmitter(
470        // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
471        // Use `Box` to make sure EventEmitter is Sized.
472        Box<RefCell<dyn EventHandler>>,
473    );
474
475    impl EventEmitter {
476        fn new<F: EventHandler>(event_handler: F) -> Self {
477            Self(Box::new(RefCell::new(event_handler)))
478        }
479
480        /// Emit single event.
481        fn emit(&self, event: crate::Result<Event>) {
482            self.0.borrow_mut().handle_event(event);
483        }
484
485        /// Emit event.
486        fn emit_ok(&self, event: Event) {
487            self.emit(Ok(event))
488        }
489
490        /// Emit io error event.
491        fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
492        where
493            E: Into<io::Error>,
494            P: Into<PathBuf>,
495        {
496            let e = crate::Error::io(err.into());
497            if let Some(path) = path {
498                self.emit(Err(e.add_path(path.into())));
499            } else {
500                self.emit(Err(e));
501            }
502        }
503    }
504}
505
506/// Polling based `Watcher` implementation.
507///
508/// By default scans through all files and checks for changed entries based on their change date.
509/// Can also be changed to perform file content change checks.
510///
511/// See [Config] for more details.
512#[derive(Debug)]
513pub struct PollWatcher {
514    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
515    data_builder: Arc<Mutex<DataBuilder>>,
516    want_to_stop: Arc<AtomicBool>,
517    /// channel to the poll loop
518    /// currently used only for manual polling
519    message_channel: Sender<()>,
520    delay: Option<Duration>,
521    follow_sylinks: bool,
522}
523
524impl PollWatcher {
525    /// Create a new [`PollWatcher`], configured as needed.
526    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
527        Self::with_opt::<_, ()>(event_handler, config, None)
528    }
529
530    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
531    pub fn poll(&self) -> crate::Result<()> {
532        self.message_channel
533            .send(())
534            .map_err(|_| Error::generic("failed to send poll message"))?;
535        Ok(())
536    }
537
538    /// Returns a sender to initiate changes detection.
539    #[cfg(test)]
540    pub(crate) fn poll_sender(&self) -> Sender<()> {
541        self.message_channel.clone()
542    }
543
544    /// Create a new [`PollWatcher`] with an scan event handler.
545    ///
546    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
547    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
548        event_handler: F,
549        config: Config,
550        scan_callback: G,
551    ) -> crate::Result<PollWatcher> {
552        Self::with_opt(event_handler, config, Some(scan_callback))
553    }
554
555    /// create a new [`PollWatcher`] with all options.
556    fn with_opt<F: EventHandler, G: ScanEventHandler>(
557        event_handler: F,
558        config: Config,
559        scan_callback: Option<G>,
560    ) -> crate::Result<PollWatcher> {
561        let data_builder =
562            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
563
564        let (tx, rx) = unbounded();
565
566        let poll_watcher = PollWatcher {
567            watches: Default::default(),
568            data_builder: Arc::new(Mutex::new(data_builder)),
569            want_to_stop: Arc::new(AtomicBool::new(false)),
570            delay: config.poll_interval(),
571            follow_sylinks: config.follow_symlinks(),
572            message_channel: tx,
573        };
574
575        poll_watcher.run(rx);
576
577        Ok(poll_watcher)
578    }
579
580    fn run(&self, rx: Receiver<()>) {
581        let watches = Arc::clone(&self.watches);
582        let data_builder = Arc::clone(&self.data_builder);
583        let want_to_stop = Arc::clone(&self.want_to_stop);
584        let delay = self.delay;
585
586        let result = thread::Builder::new()
587            .name("notify-rs poll loop".to_string())
588            .spawn(move || {
589                loop {
590                    if want_to_stop.load(Ordering::SeqCst) {
591                        break;
592                    }
593
594                    // HINT: Make sure always lock in the same order to avoid deadlock.
595                    //
596                    // FIXME: inconsistent: some place mutex poison cause panic,
597                    // some place just ignore.
598                    if let (Ok(mut watches), Ok(mut data_builder)) =
599                        (watches.lock(), data_builder.lock())
600                    {
601                        data_builder.update_timestamp();
602
603                        let vals = watches.values_mut();
604                        for watch_data in vals {
605                            watch_data.rescan(&mut data_builder);
606                        }
607                    }
608                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
609                    let result = if let Some(delay) = delay {
610                        rx.recv_timeout(delay).or_else(|e| match e {
611                            mpsc::RecvTimeoutError::Timeout => Ok(()),
612                            mpsc::RecvTimeoutError::Disconnected => Err(mpsc::RecvError),
613                        })
614                    } else {
615                        rx.recv()
616                    };
617                    if let Err(e) = result {
618                        tracing::error!(?e, "failed to receive poll message");
619                    }
620                }
621            });
622        if let Err(e) = result {
623            tracing::error!(?e, "failed to start poll watcher thread");
624        }
625    }
626
627    /// Watch a path location.
628    ///
629    /// QUESTION: this function never return an Error, is it as intend?
630    /// Please also consider the IO Error event problem.
631    fn watch_inner(&mut self, path: &Path, watch_mode: WatchMode) {
632        // HINT: Make sure always lock in the same order to avoid deadlock.
633        //
634        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
635        if let (Ok(mut watches), Ok(mut data_builder)) =
636            (self.watches.lock(), self.data_builder.lock())
637        {
638            data_builder.update_timestamp();
639
640            let watch_data = data_builder.build_watch_data(
641                path.to_path_buf(),
642                watch_mode.recursive_mode.is_recursive(),
643                self.follow_sylinks,
644            );
645
646            // if create watch_data successful, add it to watching list.
647            if let Some(watch_data) = watch_data {
648                watches.insert(path.to_path_buf(), watch_data);
649            }
650        }
651    }
652
653    /// Unwatch a path.
654    ///
655    /// Return `Err(_)` if given path has't be monitored.
656    fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
657        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
658        self.watches
659            .lock()
660            .unwrap()
661            .remove(path)
662            .map(|_| ())
663            .ok_or_else(crate::Error::watch_not_found)
664    }
665}
666
667impl Watcher for PollWatcher {
668    /// Create a new [`PollWatcher`].
669    #[tracing::instrument(level = "debug", skip(event_handler))]
670    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
671        Self::new(event_handler, config)
672    }
673
674    #[tracing::instrument(level = "debug", skip(self))]
675    fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
676        self.watch_inner(path, watch_mode);
677
678        Ok(())
679    }
680
681    #[tracing::instrument(level = "debug", skip(self))]
682    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
683        self.unwatch_inner(path)
684    }
685
686    fn kind() -> crate::WatcherKind {
687        crate::WatcherKind::PollWatcher
688    }
689}
690
691impl Drop for PollWatcher {
692    fn drop(&mut self) {
693        self.want_to_stop.store(true, Ordering::Relaxed);
694    }
695}
696
697#[cfg(test)]
698mod tests {
699    use super::PollWatcher;
700    use crate::{Error, ErrorKind, RecursiveMode, TargetMode, WatchMode, Watcher, test::*};
701
702    fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
703        poll_watcher_channel()
704    }
705
706    #[test]
707    fn poll_watcher_is_send_and_sync() {
708        fn check<T: Send + Sync>() {}
709        check::<PollWatcher>();
710    }
711
712    #[test]
713    fn create_file() {
714        let tmpdir = testdir();
715        let (mut watcher, mut rx) = watcher();
716        watcher.watch_recursively(&tmpdir);
717
718        let path = tmpdir.path().join("entry");
719        std::fs::File::create_new(&path).expect("Unable to create");
720
721        rx.sleep_until_exists(&path);
722        rx.wait_ordered_exact([expected(&path).create_any()]);
723    }
724
725    #[test]
726    #[ignore = "not implemented"]
727    fn create_self_file() {
728        let tmpdir = testdir();
729        let (mut watcher, mut rx) = watcher();
730
731        let path = tmpdir.path().join("entry");
732
733        watcher.watch_nonrecursively(&path);
734
735        std::fs::File::create_new(&path).expect("create");
736
737        rx.sleep_until_exists(&path);
738        rx.wait_ordered_exact([expected(&path).create_any()]);
739    }
740
741    #[test]
742    #[ignore = "not implemented"]
743    fn create_self_file_no_track() {
744        let tmpdir = testdir();
745        let (mut watcher, _) = watcher();
746
747        let path = tmpdir.path().join("entry");
748
749        let result = watcher.watcher.watch(
750            &path,
751            WatchMode {
752                recursive_mode: RecursiveMode::NonRecursive,
753                target_mode: TargetMode::NoTrack,
754            },
755        );
756        assert!(matches!(
757            result,
758            Err(Error {
759                paths: _,
760                kind: ErrorKind::PathNotFound
761            })
762        ));
763    }
764
765    #[test]
766    #[ignore = "TODO: not implemented"]
767    fn create_self_file_nested() {
768        let tmpdir = testdir();
769        let (mut watcher, mut rx) = watcher();
770
771        let path = tmpdir.path().join("entry/nested");
772
773        watcher.watch_nonrecursively(&path);
774
775        std::fs::create_dir_all(path.parent().unwrap()).expect("create");
776        std::fs::File::create_new(&path).expect("create");
777
778        rx.wait_ordered_exact([expected(&path).create_file()]);
779    }
780
781    #[test]
782    fn create_dir() {
783        let tmpdir = testdir();
784        let (mut watcher, mut rx) = watcher();
785        watcher.watch_recursively(&tmpdir);
786
787        let path = tmpdir.path().join("entry");
788        std::fs::create_dir(&path).expect("Unable to create");
789
790        rx.sleep_until_exists(&path);
791        rx.wait_ordered_exact([expected(&path).create_any()]);
792    }
793
794    #[test]
795    fn modify_file() {
796        let tmpdir = testdir();
797        let (mut watcher, mut rx) = watcher();
798        let path = tmpdir.path().join("entry");
799        std::fs::File::create_new(&path).expect("Unable to create");
800
801        watcher.watch_recursively(&tmpdir);
802        std::fs::write(&path, b"123").expect("Unable to write");
803
804        assert!(
805            rx.sleep_until(|| std::fs::read_to_string(&path).is_ok_and(|content| content == "123")),
806            "the file wasn't modified"
807        );
808        rx.wait_ordered_exact([expected(&path).modify_data_any()]);
809    }
810
811    #[test]
812    fn rename_file() {
813        let tmpdir = testdir();
814        let (mut watcher, mut rx) = watcher();
815        let path = tmpdir.path().join("entry");
816        let new_path = tmpdir.path().join("new_entry");
817        std::fs::File::create_new(&path).expect("Unable to create");
818
819        watcher.watch_recursively(&tmpdir);
820        std::fs::rename(&path, &new_path).expect("Unable to remove");
821
822        rx.sleep_while_exists(&path);
823        rx.sleep_until_exists(&new_path);
824
825        rx.wait_unordered_exact([
826            expected(&path).remove_any(),
827            expected(&new_path).create_any(),
828        ]);
829    }
830
831    #[test]
832    #[ignore = "TODO: not implemented"]
833    fn rename_self_file() {
834        let tmpdir = testdir();
835        let (mut watcher, mut rx) = watcher();
836
837        let path = tmpdir.path().join("entry");
838        std::fs::File::create_new(&path).expect("create");
839
840        watcher.watch_nonrecursively(&path);
841        let new_path = tmpdir.path().join("renamed");
842
843        std::fs::rename(&path, &new_path).expect("rename");
844
845        rx.sleep_while_exists(&path);
846        rx.sleep_until_exists(&new_path);
847
848        rx.wait_unordered_exact([expected(&path).remove_any()])
849            .ensure_no_tail();
850
851        std::fs::rename(&new_path, &path).expect("rename2");
852
853        rx.sleep_until_exists(&new_path);
854        rx.sleep_while_exists(&path);
855
856        rx.wait_unordered_exact([expected(&path).create_any()])
857            .ensure_no_tail();
858    }
859
860    #[test]
861    #[ignore = "TODO: not implemented"]
862    fn rename_self_file_no_track() {
863        let tmpdir = testdir();
864        let (mut watcher, mut rx) = watcher();
865
866        let path = tmpdir.path().join("entry");
867        std::fs::File::create_new(&path).expect("create");
868
869        watcher.watch(
870            &path,
871            WatchMode {
872                recursive_mode: RecursiveMode::NonRecursive,
873                target_mode: TargetMode::NoTrack,
874            },
875        );
876
877        let new_path = tmpdir.path().join("renamed");
878
879        std::fs::rename(&path, &new_path).expect("rename");
880
881        rx.sleep_while_exists(&path);
882        rx.sleep_until_exists(&new_path);
883
884        rx.wait_unordered_exact([expected(&path).remove_any()])
885            .ensure_no_tail();
886
887        let result = watcher.watcher.watch(
888            &path,
889            WatchMode {
890                recursive_mode: RecursiveMode::NonRecursive,
891                target_mode: TargetMode::NoTrack,
892            },
893        );
894        assert!(matches!(
895            result,
896            Err(Error {
897                paths: _,
898                kind: ErrorKind::PathNotFound
899            })
900        ));
901    }
902
903    #[test]
904    fn delete_file() {
905        let tmpdir = testdir();
906        let (mut watcher, mut rx) = watcher();
907        let path = tmpdir.path().join("entry");
908        std::fs::File::create_new(&path).expect("Unable to create");
909
910        watcher.watch_recursively(&tmpdir);
911        std::fs::remove_file(&path).expect("Unable to remove");
912
913        rx.sleep_while_exists(&path);
914        rx.wait_ordered_exact([expected(&path).remove_any()]);
915    }
916
917    #[test]
918    #[ignore = "TODO: not implemented"]
919    fn delete_self_file() {
920        let tmpdir = testdir();
921        let (mut watcher, mut rx) = watcher();
922        let path = tmpdir.path().join("entry");
923        std::fs::File::create_new(&path).expect("Unable to create");
924
925        watcher.watch_nonrecursively(&path);
926
927        std::fs::remove_file(&path).expect("Unable to remove");
928
929        rx.sleep_while_exists(&path);
930        rx.wait_ordered_exact([expected(&path).remove_any()]);
931
932        std::fs::write(&path, "").expect("write");
933
934        rx.sleep_until_exists(&path);
935        rx.wait_ordered_exact([expected(&path).create_file()]);
936    }
937
938    #[test]
939    #[ignore = "TODO: not implemented"]
940    fn delete_self_file_no_track() {
941        let tmpdir = testdir();
942        let (mut watcher, mut rx) = watcher();
943        let path = tmpdir.path().join("entry");
944        std::fs::File::create_new(&path).expect("Unable to create");
945
946        watcher.watch(
947            &path,
948            WatchMode {
949                recursive_mode: RecursiveMode::NonRecursive,
950                target_mode: TargetMode::NoTrack,
951            },
952        );
953
954        std::fs::remove_file(&path).expect("Unable to remove");
955
956        rx.sleep_while_exists(&path);
957        rx.wait_ordered_exact([expected(&path).remove_any()]);
958
959        std::fs::write(&path, "").expect("write");
960
961        rx.ensure_empty_with_wait();
962    }
963
964    #[test]
965    fn create_write_overwrite() {
966        let tmpdir = testdir();
967        let (mut watcher, mut rx) = watcher();
968        let overwritten_file = tmpdir.path().join("overwritten_file");
969        let overwriting_file = tmpdir.path().join("overwriting_file");
970        std::fs::write(&overwritten_file, "123").expect("write1");
971
972        watcher.watch_nonrecursively(&tmpdir);
973
974        std::fs::File::create(&overwriting_file).expect("create");
975        std::fs::write(&overwriting_file, "321").expect("write2");
976        std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");
977
978        rx.sleep_while_exists(&overwriting_file);
979        assert!(
980            rx.sleep_until(
981                || std::fs::read_to_string(&overwritten_file).is_ok_and(|cnt| cnt == "321")
982            ),
983            "file {overwritten_file:?} was not replaced"
984        );
985
986        rx.wait_unordered([expected(&overwritten_file).modify_data_any()]);
987    }
988}