notify/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{Config, Error, EventHandler, Receiver, Sender, WatchMode, Watcher, unbounded};
7use std::{
8    collections::HashMap,
9    path::{Path, PathBuf},
10    sync::{
11        Arc, Mutex,
12        atomic::{AtomicBool, Ordering},
13        mpsc,
14    },
15    thread,
16    time::Duration,
17};
18
19/// Event sent for registered handlers on initial directory scans
20pub type ScanEvent = crate::Result<PathBuf>;
21
22/// Handler trait for receivers of [`ScanEvent`].
23/// Very much the same as [`EventHandler`], but including the Result.
24///
25/// See the full example for more information.
26pub trait ScanEventHandler: Send + 'static {
27    /// Handles an event.
28    fn handle_event(&mut self, event: ScanEvent);
29}
30
31impl<F> ScanEventHandler for F
32where
33    F: FnMut(ScanEvent) + Send + 'static,
34{
35    fn handle_event(&mut self, event: ScanEvent) {
36        (self)(event);
37    }
38}
39
40#[cfg(feature = "crossbeam-channel")]
41impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
42    fn handle_event(&mut self, event: ScanEvent) {
43        let result = self.send(event);
44        if let Err(e) = result {
45            tracing::error!(?e, "failed to send scan event result");
46        }
47    }
48}
49
50#[cfg(feature = "flume")]
51impl ScanEventHandler for flume::Sender<ScanEvent> {
52    fn handle_event(&mut self, event: ScanEvent) {
53        let result = self.send(event);
54        if let Err(e) = result {
55            tracing::error!(?e, "failed to send scan event result");
56        }
57    }
58}
59
60impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
61    fn handle_event(&mut self, event: ScanEvent) {
62        let result = self.send(event);
63        if let Err(e) = result {
64            tracing::error!(?e, "failed to send scan event result");
65        }
66    }
67}
68
69impl ScanEventHandler for () {
70    fn handle_event(&mut self, _event: ScanEvent) {}
71}
72
73use data::{DataBuilder, WatchData};
74mod data {
75    use crate::{
76        EventHandler,
77        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
78    };
79    use std::{
80        cell::RefCell,
81        collections::{HashMap, hash_map::RandomState},
82        fmt::{self, Debug},
83        fs::{self, File, Metadata},
84        hash::{BuildHasher, Hasher},
85        io::{self, Read},
86        path::{Path, PathBuf},
87        time::Instant,
88    };
89    use walkdir::WalkDir;
90
91    use super::ScanEventHandler;
92
93    fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
94        #[expect(clippy::cast_possible_wrap)]
95        match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
96            Ok(d) => d.as_secs() as i64,
97            Err(e) => -(e.duration().as_secs() as i64),
98        }
99    }
100
101    /// Builder for [`WatchData`] & [`PathData`].
102    pub(super) struct DataBuilder {
103        emitter: EventEmitter,
104        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
105
106        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
107        // in future.
108        build_hasher: Option<RandomState>,
109
110        // current timestamp for building Data.
111        now: Instant,
112    }
113
114    impl DataBuilder {
115        pub(super) fn new<F, G>(
116            event_handler: F,
117            compare_content: bool,
118            scan_emitter: Option<G>,
119        ) -> Self
120        where
121            F: EventHandler,
122            G: ScanEventHandler,
123        {
124            let scan_emitter = match scan_emitter {
125                None => None,
126                Some(v) => {
127                    // workaround for a weird type resolution bug when directly going to dyn Trait
128                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
129                        Box::new(RefCell::new(v));
130                    Some(intermediate)
131                }
132            };
133            Self {
134                emitter: EventEmitter::new(event_handler),
135                scan_emitter,
136                build_hasher: compare_content.then(RandomState::default),
137                now: Instant::now(),
138            }
139        }
140
141        /// Update internal timestamp.
142        pub(super) fn update_timestamp(&mut self) {
143            self.now = Instant::now();
144        }
145
146        /// Create [`WatchData`].
147        ///
148        /// This function will return `Err(_)` if can not retrieve metadata from
149        /// the path location. (e.g., not found).
150        pub(super) fn build_watch_data(
151            &self,
152            root: PathBuf,
153            is_recursive: bool,
154            follow_symlinks: bool,
155        ) -> Option<WatchData> {
156            WatchData::new(self, root, is_recursive, follow_symlinks)
157        }
158
159        /// Create [`PathData`].
160        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
161            PathData::new(self, meta_path)
162        }
163    }
164
165    impl Debug for DataBuilder {
166        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167            f.debug_struct("DataBuilder")
168                .field("build_hasher", &self.build_hasher)
169                .field("now", &self.now)
170                .finish_non_exhaustive()
171        }
172    }
173
174    #[derive(Debug)]
175    pub(super) struct WatchData {
176        // config part, won't change.
177        root: PathBuf,
178        is_recursive: bool,
179        follow_symlinks: bool,
180
181        // current status part.
182        all_path_data: HashMap<PathBuf, PathData>,
183    }
184
185    impl WatchData {
186        /// Scan filesystem and create a new `WatchData`.
187        ///
188        /// # Side effect
189        ///
190        /// This function may send event by `data_builder.emitter`.
191        fn new(
192            data_builder: &DataBuilder,
193            root: PathBuf,
194            is_recursive: bool,
195            follow_symlinks: bool,
196        ) -> Option<Self> {
197            // If metadata read error at `root` path, it will emit
198            // a error event and stop to create the whole `WatchData`.
199            //
200            // QUESTION: inconsistent?
201            //
202            // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
203            // if `root` path hit an io error, then watcher will reject to
204            // create this new watch.
205            //
206            // This may inconsistent with *POLLING* a watch. When watcher
207            // continue polling, io error at root path will not delete
208            // a existing watch. polling still working.
209            //
210            // So, consider a config file may not exists at first time but may
211            // create after a while, developer cannot watch it.
212            //
213            // FIXME: Can we always allow to watch a path, even file not
214            // found at this path?
215            if let Err(e) = fs::metadata(&root) {
216                data_builder.emitter.emit_io_err(e, Some(&root));
217                return None;
218            }
219
220            let all_path_data = Self::scan_all_path_data(
221                data_builder,
222                root.clone(),
223                is_recursive,
224                follow_symlinks,
225                true,
226            )
227            .collect();
228
229            Some(Self {
230                root,
231                is_recursive,
232                follow_symlinks,
233                all_path_data,
234            })
235        }
236
237        /// Rescan filesystem and update this `WatchData`.
238        ///
239        /// # Side effect
240        ///
241        /// This function may emit event by `data_builder.emitter`.
242        pub(super) fn rescan(&mut self, data_builder: &DataBuilder) {
243            // scan current filesystem.
244            for (path, new_path_data) in Self::scan_all_path_data(
245                data_builder,
246                self.root.clone(),
247                self.is_recursive,
248                self.follow_symlinks,
249                false,
250            ) {
251                let old_path_data = self
252                    .all_path_data
253                    .insert(path.clone(), new_path_data.clone());
254
255                // emit event
256                let event =
257                    PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
258                if let Some(event) = event {
259                    data_builder.emitter.emit_ok(event);
260                }
261            }
262
263            // scan for disappeared paths.
264            let mut disappeared_paths = Vec::new();
265            for (path, path_data) in &self.all_path_data {
266                if path_data.last_check < data_builder.now {
267                    disappeared_paths.push(path.clone());
268                }
269            }
270
271            // remove disappeared paths
272            for path in disappeared_paths {
273                let old_path_data = self.all_path_data.remove(&path);
274
275                // emit event
276                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
277                if let Some(event) = event {
278                    data_builder.emitter.emit_ok(event);
279                }
280            }
281        }
282
283        /// Get all `PathData` by given configuration.
284        ///
285        /// # Side Effect
286        ///
287        /// This function may emit some IO Error events by `data_builder.emitter`.
288        fn scan_all_path_data(
289            data_builder: &'_ DataBuilder,
290            root: PathBuf,
291            is_recursive: bool,
292            follow_symlinks: bool,
293            // whether this is an initial scan, used only for events
294            is_initial: bool,
295        ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
296            tracing::trace!("rescanning {root:?}");
297            // WalkDir return only one entry if root is a file (not a folder),
298            // so we can use single logic to do the both file & dir's jobs.
299            //
300            // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
301            WalkDir::new(root)
302                .follow_links(follow_symlinks)
303                .max_depth(Self::dir_scan_depth(is_recursive))
304                .into_iter()
305                .filter_map(|entry_res| match entry_res {
306                    Ok(entry) => Some(entry),
307                    Err(err) => {
308                        tracing::warn!("walkdir error scanning {err:?}");
309
310                        if let Some(io_error) = err.io_error() {
311                            // clone an io::Error, so we have to create a new one.
312                            let new_io_error = io::Error::new(io_error.kind(), err.to_string());
313                            data_builder.emitter.emit_io_err(new_io_error, err.path());
314                        } else {
315                            let crate_err =
316                                crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
317                            data_builder.emitter.emit(Err(crate_err));
318                        }
319                        None
320                    }
321                })
322                .filter_map(move |entry| match entry.metadata() {
323                    Ok(metadata) => {
324                        let path = entry.into_path();
325                        if is_initial {
326                            // emit initial scans
327                            if let Some(ref emitter) = data_builder.scan_emitter {
328                                emitter.borrow_mut().handle_event(Ok(path.clone()));
329                            }
330                        }
331                        let meta_path = MetaPath::from_parts_unchecked(path, metadata);
332                        let data_path = data_builder.build_path_data(&meta_path);
333
334                        Some((meta_path.into_path(), data_path))
335                    }
336                    Err(e) => {
337                        // emit event.
338                        let path = entry.into_path();
339                        data_builder.emitter.emit_io_err(e, Some(path));
340
341                        None
342                    }
343                })
344        }
345
346        fn dir_scan_depth(is_recursive: bool) -> usize {
347            if is_recursive { usize::MAX } else { 1 }
348        }
349    }
350
351    /// Stored data for a one path locations.
352    ///
353    /// See [`WatchData`] for more detail.
354    #[derive(Debug, Clone)]
355    struct PathData {
356        /// File updated time.
357        mtime: i64,
358
359        /// Content's hash value, only available if user request compare file
360        /// contents and read successful.
361        hash: Option<u64>,
362
363        /// Checked time.
364        last_check: Instant,
365    }
366
367    impl PathData {
368        /// Create a new `PathData`.
369        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
370            let metadata = meta_path.metadata();
371
372            PathData {
373                mtime: metadata.modified().map_or(0, system_time_to_seconds),
374                hash: data_builder
375                    .build_hasher
376                    .as_ref()
377                    .filter(|_| metadata.is_file())
378                    .and_then(|build_hasher| {
379                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
380                    }),
381
382                last_check: data_builder.now,
383            }
384        }
385
386        /// Get hash value for the data content in given file `path`.
387        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
388            let mut hasher = build_hasher.build_hasher();
389            let mut file = File::open(path)?;
390            let mut buf = [0; 512];
391
392            loop {
393                let n = match file.read(&mut buf) {
394                    Ok(0) => break,
395                    Ok(len) => len,
396                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
397                    Err(e) => return Err(e),
398                };
399
400                hasher.write(&buf[..n]);
401            }
402
403            Ok(hasher.finish())
404        }
405
406        /// Get [`Event`] by compare two optional [`PathData`].
407        fn compare_to_event<P>(
408            path: P,
409            old: Option<&PathData>,
410            new: Option<&PathData>,
411        ) -> Option<Event>
412        where
413            P: Into<PathBuf>,
414        {
415            match (old, new) {
416                (Some(old), Some(new)) => {
417                    if new.mtime > old.mtime {
418                        Some(EventKind::Modify(ModifyKind::Metadata(
419                            MetadataKind::WriteTime,
420                        )))
421                    } else if new.hash != old.hash {
422                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
423                    } else {
424                        None
425                    }
426                }
427                (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
428                (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
429                (None, None) => None,
430            }
431            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
432        }
433    }
434
435    /// Compose path and its metadata.
436    ///
437    /// This data structure designed for make sure path and its metadata can be
438    /// transferred in consistent way, and may avoid some duplicated
439    /// `fs::metadata()` function call in some situations.
440    #[derive(Debug)]
441    pub(super) struct MetaPath {
442        path: PathBuf,
443        metadata: Metadata,
444    }
445
446    impl MetaPath {
447        /// Create `MetaPath` by given parts.
448        ///
449        /// # Invariant
450        ///
451        /// User must make sure the input `metadata` are associated with `path`.
452        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
453            Self { path, metadata }
454        }
455
456        fn path(&self) -> &Path {
457            &self.path
458        }
459
460        fn metadata(&self) -> &Metadata {
461            &self.metadata
462        }
463
464        fn into_path(self) -> PathBuf {
465            self.path
466        }
467    }
468
469    /// Thin wrapper for outer event handler, for easy to use.
470    struct EventEmitter(
471        // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
472        // Use `Box` to make sure EventEmitter is Sized.
473        Box<RefCell<dyn EventHandler>>,
474    );
475
476    impl EventEmitter {
477        fn new<F: EventHandler>(event_handler: F) -> Self {
478            Self(Box::new(RefCell::new(event_handler)))
479        }
480
481        /// Emit single event.
482        fn emit(&self, event: crate::Result<Event>) {
483            self.0.borrow_mut().handle_event(event);
484        }
485
486        /// Emit event.
487        fn emit_ok(&self, event: Event) {
488            self.emit(Ok(event));
489        }
490
491        /// Emit io error event.
492        fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
493        where
494            E: Into<io::Error>,
495            P: Into<PathBuf>,
496        {
497            let e = crate::Error::io(err.into());
498            if let Some(path) = path {
499                self.emit(Err(e.add_path(path.into())));
500            } else {
501                self.emit(Err(e));
502            }
503        }
504    }
505}
506
507/// Polling based `Watcher` implementation.
508///
509/// By default scans through all files and checks for changed entries based on their change date.
510/// Can also be changed to perform file content change checks.
511///
512/// See [Config] for more details.
513#[derive(Debug)]
514pub struct PollWatcher {
515    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
516    data_builder: Arc<Mutex<DataBuilder>>,
517    want_to_stop: Arc<AtomicBool>,
518    /// channel to the poll loop
519    /// currently used only for manual polling
520    message_channel: Sender<()>,
521    delay: Option<Duration>,
522    follow_sylinks: bool,
523}
524
525impl PollWatcher {
526    /// Create a new [`PollWatcher`], configured as needed.
527    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
528        Ok(Self::with_opt::<_, ()>(event_handler, config, None))
529    }
530
531    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
532    pub fn poll(&self) -> crate::Result<()> {
533        self.message_channel
534            .send(())
535            .map_err(|_| Error::generic("failed to send poll message"))?;
536        Ok(())
537    }
538
539    /// Returns a sender to initiate changes detection.
540    #[cfg(test)]
541    pub(crate) fn poll_sender(&self) -> Sender<()> {
542        self.message_channel.clone()
543    }
544
545    /// Create a new [`PollWatcher`] with an scan event handler.
546    ///
547    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
548    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
549        event_handler: F,
550        config: Config,
551        scan_callback: G,
552    ) -> crate::Result<PollWatcher> {
553        Ok(Self::with_opt(event_handler, config, Some(scan_callback)))
554    }
555
556    /// create a new [`PollWatcher`] with all options.
557    fn with_opt<F: EventHandler, G: ScanEventHandler>(
558        event_handler: F,
559        config: Config,
560        scan_callback: Option<G>,
561    ) -> PollWatcher {
562        let data_builder =
563            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
564
565        let (tx, rx) = unbounded();
566
567        let poll_watcher = PollWatcher {
568            watches: Arc::default(),
569            data_builder: Arc::new(Mutex::new(data_builder)),
570            want_to_stop: Arc::new(AtomicBool::new(false)),
571            delay: config.poll_interval(),
572            follow_sylinks: config.follow_symlinks(),
573            message_channel: tx,
574        };
575
576        poll_watcher.run(rx);
577
578        poll_watcher
579    }
580
581    fn run(&self, rx: Receiver<()>) {
582        let watches = Arc::clone(&self.watches);
583        let data_builder = Arc::clone(&self.data_builder);
584        let want_to_stop = Arc::clone(&self.want_to_stop);
585        let delay = self.delay;
586
587        let result = thread::Builder::new()
588            .name("notify-rs poll loop".to_string())
589            .spawn(move || {
590                loop {
591                    if want_to_stop.load(Ordering::SeqCst) {
592                        break;
593                    }
594
595                    // HINT: Make sure always lock in the same order to avoid deadlock.
596                    //
597                    // FIXME: inconsistent: some place mutex poison cause panic,
598                    // some place just ignore.
599                    if let (Ok(mut watches), Ok(mut data_builder)) =
600                        (watches.lock(), data_builder.lock())
601                    {
602                        data_builder.update_timestamp();
603
604                        let vals = watches.values_mut();
605                        for watch_data in vals {
606                            watch_data.rescan(&data_builder);
607                        }
608                    }
609                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
610                    let result = if let Some(delay) = delay {
611                        rx.recv_timeout(delay).or_else(|e| match e {
612                            mpsc::RecvTimeoutError::Timeout => Ok(()),
613                            mpsc::RecvTimeoutError::Disconnected => Err(mpsc::RecvError),
614                        })
615                    } else {
616                        rx.recv()
617                    };
618                    if let Err(e) = result {
619                        tracing::error!(?e, "failed to receive poll message");
620                    }
621                }
622            });
623        if let Err(e) = result {
624            tracing::error!(?e, "failed to start poll watcher thread");
625        }
626    }
627
628    /// Watch a path location.
629    ///
630    /// QUESTION: this function never return an Error, is it as intend?
631    /// Please also consider the IO Error event problem.
632    fn watch_inner(&self, path: &Path, watch_mode: WatchMode) {
633        // HINT: Make sure always lock in the same order to avoid deadlock.
634        //
635        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
636        if let (Ok(mut watches), Ok(mut data_builder)) =
637            (self.watches.lock(), self.data_builder.lock())
638        {
639            data_builder.update_timestamp();
640
641            let watch_data = data_builder.build_watch_data(
642                path.to_path_buf(),
643                watch_mode.recursive_mode.is_recursive(),
644                self.follow_sylinks,
645            );
646
647            // if create watch_data successful, add it to watching list.
648            if let Some(watch_data) = watch_data {
649                watches.insert(path.to_path_buf(), watch_data);
650            }
651        }
652    }
653
654    /// Unwatch a path.
655    ///
656    /// Return `Err(_)` if given path has't be monitored.
657    fn unwatch_inner(&self, path: &Path) -> crate::Result<()> {
658        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
659        self.watches
660            .lock()
661            .unwrap()
662            .remove(path)
663            .map(|_| ())
664            .ok_or_else(crate::Error::watch_not_found)
665    }
666}
667
668impl Watcher for PollWatcher {
669    /// Create a new [`PollWatcher`].
670    #[tracing::instrument(level = "debug", skip(event_handler))]
671    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
672        Self::new(event_handler, config)
673    }
674
675    #[tracing::instrument(level = "debug", skip(self))]
676    fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
677        self.watch_inner(path, watch_mode);
678
679        Ok(())
680    }
681
682    #[tracing::instrument(level = "debug", skip(self))]
683    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
684        self.unwatch_inner(path)
685    }
686
687    fn kind() -> crate::WatcherKind {
688        crate::WatcherKind::PollWatcher
689    }
690}
691
692impl Drop for PollWatcher {
693    fn drop(&mut self) {
694        self.want_to_stop.store(true, Ordering::Relaxed);
695    }
696}
697
698#[cfg(test)]
699mod tests {
700    use super::PollWatcher;
701    use crate::{Error, ErrorKind, RecursiveMode, TargetMode, WatchMode, Watcher, test::*};
702
703    fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
704        poll_watcher_channel()
705    }
706
707    #[test]
708    fn poll_watcher_is_send_and_sync() {
709        fn check<T: Send + Sync>() {}
710        check::<PollWatcher>();
711    }
712
713    #[test]
714    fn create_file() {
715        let tmpdir = testdir();
716        let (mut watcher, rx) = watcher();
717        watcher.watch_recursively(&tmpdir);
718
719        let path = tmpdir.path().join("entry");
720        std::fs::File::create_new(&path).expect("Unable to create");
721
722        rx.sleep_until_exists(&path);
723        rx.wait_ordered_exact([expected(&path).create_any()]);
724    }
725
726    #[test]
727    #[ignore = "not implemented"]
728    fn create_self_file() {
729        let tmpdir = testdir();
730        let (mut watcher, rx) = watcher();
731
732        let path = tmpdir.path().join("entry");
733
734        watcher.watch_nonrecursively(&path);
735
736        std::fs::File::create_new(&path).expect("create");
737
738        rx.sleep_until_exists(&path);
739        rx.wait_ordered_exact([expected(&path).create_any()]);
740    }
741
742    #[test]
743    #[ignore = "not implemented"]
744    fn create_self_file_no_track() {
745        let tmpdir = testdir();
746        let (mut watcher, _) = watcher();
747
748        let path = tmpdir.path().join("entry");
749
750        let result = watcher.watcher.watch(
751            &path,
752            WatchMode {
753                recursive_mode: RecursiveMode::NonRecursive,
754                target_mode: TargetMode::NoTrack,
755            },
756        );
757        assert!(matches!(
758            result,
759            Err(Error {
760                paths: _,
761                kind: ErrorKind::PathNotFound
762            })
763        ));
764    }
765
766    #[test]
767    #[ignore = "TODO: not implemented"]
768    fn create_self_file_nested() {
769        let tmpdir = testdir();
770        let (mut watcher, rx) = watcher();
771
772        let path = tmpdir.path().join("entry/nested");
773
774        watcher.watch_nonrecursively(&path);
775
776        std::fs::create_dir_all(path.parent().unwrap()).expect("create");
777        std::fs::File::create_new(&path).expect("create");
778
779        rx.wait_ordered_exact([expected(&path).create_file()]);
780    }
781
782    #[test]
783    fn create_dir() {
784        let tmpdir = testdir();
785        let (mut watcher, rx) = watcher();
786        watcher.watch_recursively(&tmpdir);
787
788        let path = tmpdir.path().join("entry");
789        std::fs::create_dir(&path).expect("Unable to create");
790
791        rx.sleep_until_exists(&path);
792        rx.wait_ordered_exact([expected(&path).create_any()]);
793    }
794
795    #[test]
796    fn modify_file() {
797        let tmpdir = testdir();
798        let (mut watcher, rx) = watcher();
799        let path = tmpdir.path().join("entry");
800        std::fs::File::create_new(&path).expect("Unable to create");
801
802        watcher.watch_recursively(&tmpdir);
803        std::fs::write(&path, b"123").expect("Unable to write");
804
805        assert!(
806            rx.sleep_until(|| std::fs::read_to_string(&path).is_ok_and(|content| content == "123")),
807            "the file wasn't modified"
808        );
809        rx.wait_ordered_exact([expected(&path).modify_data_any()]);
810    }
811
812    #[test]
813    fn rename_file() {
814        let tmpdir = testdir();
815        let (mut watcher, rx) = watcher();
816        let path = tmpdir.path().join("entry");
817        let new_path = tmpdir.path().join("new_entry");
818        std::fs::File::create_new(&path).expect("Unable to create");
819
820        watcher.watch_recursively(&tmpdir);
821        std::fs::rename(&path, &new_path).expect("Unable to remove");
822
823        rx.sleep_while_exists(&path);
824        rx.sleep_until_exists(&new_path);
825
826        rx.wait_unordered_exact([
827            expected(&path).remove_any(),
828            expected(&new_path).create_any(),
829        ]);
830    }
831
832    #[test]
833    #[ignore = "TODO: not implemented"]
834    fn rename_self_file() {
835        let tmpdir = testdir();
836        let (mut watcher, rx) = watcher();
837
838        let path = tmpdir.path().join("entry");
839        std::fs::File::create_new(&path).expect("create");
840
841        watcher.watch_nonrecursively(&path);
842        let new_path = tmpdir.path().join("renamed");
843
844        std::fs::rename(&path, &new_path).expect("rename");
845
846        rx.sleep_while_exists(&path);
847        rx.sleep_until_exists(&new_path);
848
849        rx.wait_unordered_exact([expected(&path).remove_any()])
850            .ensure_no_tail();
851
852        std::fs::rename(&new_path, &path).expect("rename2");
853
854        rx.sleep_until_exists(&new_path);
855        rx.sleep_while_exists(&path);
856
857        rx.wait_unordered_exact([expected(&path).create_any()])
858            .ensure_no_tail();
859    }
860
861    #[test]
862    #[ignore = "TODO: not implemented"]
863    fn rename_self_file_no_track() {
864        let tmpdir = testdir();
865        let (mut watcher, rx) = watcher();
866
867        let path = tmpdir.path().join("entry");
868        std::fs::File::create_new(&path).expect("create");
869
870        watcher.watch(
871            &path,
872            WatchMode {
873                recursive_mode: RecursiveMode::NonRecursive,
874                target_mode: TargetMode::NoTrack,
875            },
876        );
877
878        let new_path = tmpdir.path().join("renamed");
879
880        std::fs::rename(&path, &new_path).expect("rename");
881
882        rx.sleep_while_exists(&path);
883        rx.sleep_until_exists(&new_path);
884
885        rx.wait_unordered_exact([expected(&path).remove_any()])
886            .ensure_no_tail();
887
888        let result = watcher.watcher.watch(
889            &path,
890            WatchMode {
891                recursive_mode: RecursiveMode::NonRecursive,
892                target_mode: TargetMode::NoTrack,
893            },
894        );
895        assert!(matches!(
896            result,
897            Err(Error {
898                paths: _,
899                kind: ErrorKind::PathNotFound
900            })
901        ));
902    }
903
904    #[test]
905    fn delete_file() {
906        let tmpdir = testdir();
907        let (mut watcher, rx) = watcher();
908        let path = tmpdir.path().join("entry");
909        std::fs::File::create_new(&path).expect("Unable to create");
910
911        watcher.watch_recursively(&tmpdir);
912        std::fs::remove_file(&path).expect("Unable to remove");
913
914        rx.sleep_while_exists(&path);
915        rx.wait_ordered_exact([expected(&path).remove_any()]);
916    }
917
918    #[test]
919    #[ignore = "TODO: not implemented"]
920    fn delete_self_file() {
921        let tmpdir = testdir();
922        let (mut watcher, rx) = watcher();
923        let path = tmpdir.path().join("entry");
924        std::fs::File::create_new(&path).expect("Unable to create");
925
926        watcher.watch_nonrecursively(&path);
927
928        std::fs::remove_file(&path).expect("Unable to remove");
929
930        rx.sleep_while_exists(&path);
931        rx.wait_ordered_exact([expected(&path).remove_any()]);
932
933        std::fs::write(&path, "").expect("write");
934
935        rx.sleep_until_exists(&path);
936        rx.wait_ordered_exact([expected(&path).create_file()]);
937    }
938
939    #[test]
940    #[ignore = "TODO: not implemented"]
941    fn delete_self_file_no_track() {
942        let tmpdir = testdir();
943        let (mut watcher, rx) = watcher();
944        let path = tmpdir.path().join("entry");
945        std::fs::File::create_new(&path).expect("Unable to create");
946
947        watcher.watch(
948            &path,
949            WatchMode {
950                recursive_mode: RecursiveMode::NonRecursive,
951                target_mode: TargetMode::NoTrack,
952            },
953        );
954
955        std::fs::remove_file(&path).expect("Unable to remove");
956
957        rx.sleep_while_exists(&path);
958        rx.wait_ordered_exact([expected(&path).remove_any()]);
959
960        std::fs::write(&path, "").expect("write");
961
962        rx.ensure_empty_with_wait();
963    }
964
965    #[test]
966    fn create_write_overwrite() {
967        let tmpdir = testdir();
968        let (mut watcher, rx) = watcher();
969        let overwritten_file = tmpdir.path().join("overwritten_file");
970        let overwriting_file = tmpdir.path().join("overwriting_file");
971        std::fs::write(&overwritten_file, "123").expect("write1");
972
973        watcher.watch_nonrecursively(&tmpdir);
974
975        std::fs::File::create(&overwriting_file).expect("create");
976        std::fs::write(&overwriting_file, "321").expect("write2");
977        std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");
978
979        rx.sleep_while_exists(&overwriting_file);
980        assert!(
981            rx.sleep_until(
982                || std::fs::read_to_string(&overwritten_file).is_ok_and(|cnt| cnt == "321")
983            ),
984            "file {overwritten_file:?} was not replaced"
985        );
986
987        rx.wait_unordered([expected(&overwritten_file).modify_data_any()]);
988    }
989}