notify/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher, unbounded};
7use std::{
8    collections::HashMap,
9    path::{Path, PathBuf},
10    sync::{
11        Arc, Mutex,
12        atomic::{AtomicBool, Ordering},
13    },
14    thread,
15    time::Duration,
16};
17
18/// Event sent for registered handlers on initial directory scans
19pub type ScanEvent = crate::Result<PathBuf>;
20
21/// Handler trait for receivers of [`ScanEvent`].
22/// Very much the same as [`EventHandler`], but including the Result.
23///
24/// See the full example for more information.
25pub trait ScanEventHandler: Send + 'static {
26    /// Handles an event.
27    fn handle_event(&mut self, event: ScanEvent);
28}
29
30impl<F> ScanEventHandler for F
31where
32    F: FnMut(ScanEvent) + Send + 'static,
33{
34    fn handle_event(&mut self, event: ScanEvent) {
35        (self)(event);
36    }
37}
38
/// Forwards scan events into a `crossbeam_channel` sender.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // Best effort: a failed send is deliberately discarded.
        self.send(event).ok();
    }
}
45
/// Forwards scan events into a `flume` sender.
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // Best effort: a failed send is deliberately discarded.
        self.send(event).ok();
    }
}
52
53impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
54    fn handle_event(&mut self, event: ScanEvent) {
55        let _ = self.send(event);
56    }
57}
58
59impl ScanEventHandler for () {
60    fn handle_event(&mut self, _event: ScanEvent) {}
61}
62
63use data::{DataBuilder, WatchData};
64mod data {
65    use crate::{
66        EventHandler,
67        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
68    };
69    use std::{
70        cell::RefCell,
71        collections::{HashMap, hash_map::RandomState},
72        fmt::{self, Debug},
73        fs::{self, File, Metadata},
74        hash::{BuildHasher, Hasher},
75        io::{self, Read},
76        path::{Path, PathBuf},
77        time::Instant,
78    };
79    use walkdir::WalkDir;
80
81    use super::ScanEventHandler;
82
83    fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
84        match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
85            Ok(d) => d.as_secs() as i64,
86            Err(e) => -(e.duration().as_secs() as i64),
87        }
88    }
89
90    /// Builder for [`WatchData`] & [`PathData`].
91    pub(super) struct DataBuilder {
92        emitter: EventEmitter,
93        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
94
95        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
96        // in future.
97        build_hasher: Option<RandomState>,
98
99        // current timestamp for building Data.
100        now: Instant,
101    }
102
103    impl DataBuilder {
104        pub(super) fn new<F, G>(
105            event_handler: F,
106            compare_content: bool,
107            scan_emitter: Option<G>,
108        ) -> Self
109        where
110            F: EventHandler,
111            G: ScanEventHandler,
112        {
113            let scan_emitter = match scan_emitter {
114                None => None,
115                Some(v) => {
116                    // workaround for a weird type resolution bug when directly going to dyn Trait
117                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
118                        Box::new(RefCell::new(v));
119                    Some(intermediate)
120                }
121            };
122            Self {
123                emitter: EventEmitter::new(event_handler),
124                scan_emitter,
125                build_hasher: compare_content.then(RandomState::default),
126                now: Instant::now(),
127            }
128        }
129
130        /// Update internal timestamp.
131        pub(super) fn update_timestamp(&mut self) {
132            self.now = Instant::now();
133        }
134
135        /// Create [`WatchData`].
136        ///
137        /// This function will return `Err(_)` if can not retrieve metadata from
138        /// the path location. (e.g., not found).
139        pub(super) fn build_watch_data(
140            &self,
141            root: PathBuf,
142            is_recursive: bool,
143            follow_symlinks: bool,
144        ) -> Option<WatchData> {
145            WatchData::new(self, root, is_recursive, follow_symlinks)
146        }
147
148        /// Create [`PathData`].
149        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
150            PathData::new(self, meta_path)
151        }
152    }
153
154    impl Debug for DataBuilder {
155        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
156            f.debug_struct("DataBuilder")
157                .field("build_hasher", &self.build_hasher)
158                .field("now", &self.now)
159                .finish()
160        }
161    }
162
163    #[derive(Debug)]
164    pub(super) struct WatchData {
165        // config part, won't change.
166        root: PathBuf,
167        is_recursive: bool,
168        follow_symlinks: bool,
169
170        // current status part.
171        all_path_data: HashMap<PathBuf, PathData>,
172    }
173
174    impl WatchData {
175        /// Scan filesystem and create a new `WatchData`.
176        ///
177        /// # Side effect
178        ///
179        /// This function may send event by `data_builder.emitter`.
180        fn new(
181            data_builder: &DataBuilder,
182            root: PathBuf,
183            is_recursive: bool,
184            follow_symlinks: bool,
185        ) -> Option<Self> {
186            // If metadata read error at `root` path, it will emit
187            // a error event and stop to create the whole `WatchData`.
188            //
189            // QUESTION: inconsistent?
190            //
191            // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
192            // if `root` path hit an io error, then watcher will reject to
193            // create this new watch.
194            //
195            // This may inconsistent with *POLLING* a watch. When watcher
196            // continue polling, io error at root path will not delete
197            // a existing watch. polling still working.
198            //
199            // So, consider a config file may not exists at first time but may
200            // create after a while, developer cannot watch it.
201            //
202            // FIXME: Can we always allow to watch a path, even file not
203            // found at this path?
204            if let Err(e) = fs::metadata(&root) {
205                data_builder.emitter.emit_io_err(e, Some(&root));
206                return None;
207            }
208
209            let all_path_data = Self::scan_all_path_data(
210                data_builder,
211                root.clone(),
212                is_recursive,
213                follow_symlinks,
214                true,
215            )
216            .collect();
217
218            Some(Self {
219                root,
220                is_recursive,
221                follow_symlinks,
222                all_path_data,
223            })
224        }
225
226        /// Rescan filesystem and update this `WatchData`.
227        ///
228        /// # Side effect
229        ///
230        /// This function may emit event by `data_builder.emitter`.
231        pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
232            // scan current filesystem.
233            for (path, new_path_data) in Self::scan_all_path_data(
234                data_builder,
235                self.root.clone(),
236                self.is_recursive,
237                self.follow_symlinks,
238                false,
239            ) {
240                let old_path_data = self
241                    .all_path_data
242                    .insert(path.clone(), new_path_data.clone());
243
244                // emit event
245                let event =
246                    PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
247                if let Some(event) = event {
248                    data_builder.emitter.emit_ok(event);
249                }
250            }
251
252            // scan for disappeared paths.
253            let mut disappeared_paths = Vec::new();
254            for (path, path_data) in self.all_path_data.iter() {
255                if path_data.last_check < data_builder.now {
256                    disappeared_paths.push(path.clone());
257                }
258            }
259
260            // remove disappeared paths
261            for path in disappeared_paths {
262                let old_path_data = self.all_path_data.remove(&path);
263
264                // emit event
265                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
266                if let Some(event) = event {
267                    data_builder.emitter.emit_ok(event);
268                }
269            }
270        }
271
272        /// Get all `PathData` by given configuration.
273        ///
274        /// # Side Effect
275        ///
276        /// This function may emit some IO Error events by `data_builder.emitter`.
277        fn scan_all_path_data(
278            data_builder: &'_ DataBuilder,
279            root: PathBuf,
280            is_recursive: bool,
281            follow_symlinks: bool,
282            // whether this is an initial scan, used only for events
283            is_initial: bool,
284        ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
285            log::trace!("rescanning {root:?}");
286            // WalkDir return only one entry if root is a file (not a folder),
287            // so we can use single logic to do the both file & dir's jobs.
288            //
289            // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
290            WalkDir::new(root)
291                .follow_links(follow_symlinks)
292                .max_depth(Self::dir_scan_depth(is_recursive))
293                .into_iter()
294                .filter_map(|entry_res| match entry_res {
295                    Ok(entry) => Some(entry),
296                    Err(err) => {
297                        log::warn!("walkdir error scanning {err:?}");
298
299                        if let Some(io_error) = err.io_error() {
300                            // clone an io::Error, so we have to create a new one.
301                            let new_io_error = io::Error::new(io_error.kind(), err.to_string());
302                            data_builder.emitter.emit_io_err(new_io_error, err.path());
303                        } else {
304                            let crate_err =
305                                crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
306                            data_builder.emitter.emit(Err(crate_err));
307                        }
308                        None
309                    }
310                })
311                .filter_map(move |entry| match entry.metadata() {
312                    Ok(metadata) => {
313                        let path = entry.into_path();
314                        if is_initial {
315                            // emit initial scans
316                            if let Some(ref emitter) = data_builder.scan_emitter {
317                                emitter.borrow_mut().handle_event(Ok(path.clone()));
318                            }
319                        }
320                        let meta_path = MetaPath::from_parts_unchecked(path, metadata);
321                        let data_path = data_builder.build_path_data(&meta_path);
322
323                        Some((meta_path.into_path(), data_path))
324                    }
325                    Err(e) => {
326                        // emit event.
327                        let path = entry.into_path();
328                        data_builder.emitter.emit_io_err(e, Some(path));
329
330                        None
331                    }
332                })
333        }
334
335        fn dir_scan_depth(is_recursive: bool) -> usize {
336            if is_recursive { usize::MAX } else { 1 }
337        }
338    }
339
340    /// Stored data for a one path locations.
341    ///
342    /// See [`WatchData`] for more detail.
343    #[derive(Debug, Clone)]
344    struct PathData {
345        /// File updated time.
346        mtime: i64,
347
348        /// Content's hash value, only available if user request compare file
349        /// contents and read successful.
350        hash: Option<u64>,
351
352        /// Checked time.
353        last_check: Instant,
354    }
355
356    impl PathData {
357        /// Create a new `PathData`.
358        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
359            let metadata = meta_path.metadata();
360
361            PathData {
362                mtime: metadata.modified().map_or(0, system_time_to_seconds),
363                hash: data_builder
364                    .build_hasher
365                    .as_ref()
366                    .filter(|_| metadata.is_file())
367                    .and_then(|build_hasher| {
368                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
369                    }),
370
371                last_check: data_builder.now,
372            }
373        }
374
375        /// Get hash value for the data content in given file `path`.
376        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
377            let mut hasher = build_hasher.build_hasher();
378            let mut file = File::open(path)?;
379            let mut buf = [0; 512];
380
381            loop {
382                let n = match file.read(&mut buf) {
383                    Ok(0) => break,
384                    Ok(len) => len,
385                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
386                    Err(e) => return Err(e),
387                };
388
389                hasher.write(&buf[..n]);
390            }
391
392            Ok(hasher.finish())
393        }
394
395        /// Get [`Event`] by compare two optional [`PathData`].
396        fn compare_to_event<P>(
397            path: P,
398            old: Option<&PathData>,
399            new: Option<&PathData>,
400        ) -> Option<Event>
401        where
402            P: Into<PathBuf>,
403        {
404            match (old, new) {
405                (Some(old), Some(new)) => {
406                    if new.mtime > old.mtime {
407                        Some(EventKind::Modify(ModifyKind::Metadata(
408                            MetadataKind::WriteTime,
409                        )))
410                    } else if new.hash != old.hash {
411                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
412                    } else {
413                        None
414                    }
415                }
416                (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
417                (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
418                (None, None) => None,
419            }
420            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
421        }
422    }
423
424    /// Compose path and its metadata.
425    ///
426    /// This data structure designed for make sure path and its metadata can be
427    /// transferred in consistent way, and may avoid some duplicated
428    /// `fs::metadata()` function call in some situations.
429    #[derive(Debug)]
430    pub(super) struct MetaPath {
431        path: PathBuf,
432        metadata: Metadata,
433    }
434
435    impl MetaPath {
436        /// Create `MetaPath` by given parts.
437        ///
438        /// # Invariant
439        ///
440        /// User must make sure the input `metadata` are associated with `path`.
441        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
442            Self { path, metadata }
443        }
444
445        fn path(&self) -> &Path {
446            &self.path
447        }
448
449        fn metadata(&self) -> &Metadata {
450            &self.metadata
451        }
452
453        fn into_path(self) -> PathBuf {
454            self.path
455        }
456    }
457
458    /// Thin wrapper for outer event handler, for easy to use.
459    struct EventEmitter(
460        // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
461        // Use `Box` to make sure EventEmitter is Sized.
462        Box<RefCell<dyn EventHandler>>,
463    );
464
465    impl EventEmitter {
466        fn new<F: EventHandler>(event_handler: F) -> Self {
467            Self(Box::new(RefCell::new(event_handler)))
468        }
469
470        /// Emit single event.
471        fn emit(&self, event: crate::Result<Event>) {
472            self.0.borrow_mut().handle_event(event);
473        }
474
475        /// Emit event.
476        fn emit_ok(&self, event: Event) {
477            self.emit(Ok(event))
478        }
479
480        /// Emit io error event.
481        fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
482        where
483            E: Into<io::Error>,
484            P: Into<PathBuf>,
485        {
486            let e = crate::Error::io(err.into());
487            if let Some(path) = path {
488                self.emit(Err(e.add_path(path.into())));
489            } else {
490                self.emit(Err(e));
491            }
492        }
493    }
494}
495
496/// Polling based `Watcher` implementation.
497///
498/// By default scans through all files and checks for changed entries based on their change date.
499/// Can also be changed to perform file content change checks.
500///
501/// See [Config] for more details.
502#[derive(Debug)]
503pub struct PollWatcher {
504    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
505    data_builder: Arc<Mutex<DataBuilder>>,
506    want_to_stop: Arc<AtomicBool>,
507    /// channel to the poll loop
508    /// currently used only for manual polling
509    message_channel: Sender<()>,
510    delay: Option<Duration>,
511    follow_sylinks: bool,
512}
513
514impl PollWatcher {
515    /// Create a new [`PollWatcher`], configured as needed.
516    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
517        Self::with_opt::<_, ()>(event_handler, config, None)
518    }
519
520    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
521    pub fn poll(&self) -> crate::Result<()> {
522        self.message_channel
523            .send(())
524            .map_err(|_| Error::generic("failed to send poll message"))?;
525        Ok(())
526    }
527
528    /// Returns a sender to initiate changes detection.
529    #[cfg(test)]
530    pub(crate) fn poll_sender(&self) -> Sender<()> {
531        self.message_channel.clone()
532    }
533
534    /// Create a new [`PollWatcher`] with an scan event handler.
535    ///
536    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
537    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
538        event_handler: F,
539        config: Config,
540        scan_callback: G,
541    ) -> crate::Result<PollWatcher> {
542        Self::with_opt(event_handler, config, Some(scan_callback))
543    }
544
545    /// create a new [`PollWatcher`] with all options.
546    fn with_opt<F: EventHandler, G: ScanEventHandler>(
547        event_handler: F,
548        config: Config,
549        scan_callback: Option<G>,
550    ) -> crate::Result<PollWatcher> {
551        let data_builder =
552            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
553
554        let (tx, rx) = unbounded();
555
556        let poll_watcher = PollWatcher {
557            watches: Default::default(),
558            data_builder: Arc::new(Mutex::new(data_builder)),
559            want_to_stop: Arc::new(AtomicBool::new(false)),
560            delay: config.poll_interval(),
561            follow_sylinks: config.follow_symlinks(),
562            message_channel: tx,
563        };
564
565        poll_watcher.run(rx);
566
567        Ok(poll_watcher)
568    }
569
570    fn run(&self, rx: Receiver<()>) {
571        let watches = Arc::clone(&self.watches);
572        let data_builder = Arc::clone(&self.data_builder);
573        let want_to_stop = Arc::clone(&self.want_to_stop);
574        let delay = self.delay;
575
576        let _ = thread::Builder::new()
577            .name("notify-rs poll loop".to_string())
578            .spawn(move || {
579                loop {
580                    if want_to_stop.load(Ordering::SeqCst) {
581                        break;
582                    }
583
584                    // HINT: Make sure always lock in the same order to avoid deadlock.
585                    //
586                    // FIXME: inconsistent: some place mutex poison cause panic,
587                    // some place just ignore.
588                    if let (Ok(mut watches), Ok(mut data_builder)) =
589                        (watches.lock(), data_builder.lock())
590                    {
591                        data_builder.update_timestamp();
592
593                        let vals = watches.values_mut();
594                        for watch_data in vals {
595                            watch_data.rescan(&mut data_builder);
596                        }
597                    }
598                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
599                    if let Some(delay) = delay {
600                        let _ = rx.recv_timeout(delay);
601                    } else {
602                        let _ = rx.recv();
603                    }
604                }
605            });
606    }
607
608    /// Watch a path location.
609    ///
610    /// QUESTION: this function never return an Error, is it as intend?
611    /// Please also consider the IO Error event problem.
612    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
613        // HINT: Make sure always lock in the same order to avoid deadlock.
614        //
615        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
616        if let (Ok(mut watches), Ok(mut data_builder)) =
617            (self.watches.lock(), self.data_builder.lock())
618        {
619            data_builder.update_timestamp();
620
621            let watch_data = data_builder.build_watch_data(
622                path.to_path_buf(),
623                recursive_mode.is_recursive(),
624                self.follow_sylinks,
625            );
626
627            // if create watch_data successful, add it to watching list.
628            if let Some(watch_data) = watch_data {
629                watches.insert(path.to_path_buf(), watch_data);
630            }
631        }
632    }
633
634    /// Unwatch a path.
635    ///
636    /// Return `Err(_)` if given path has't be monitored.
637    fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
638        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
639        self.watches
640            .lock()
641            .unwrap()
642            .remove(path)
643            .map(|_| ())
644            .ok_or_else(crate::Error::watch_not_found)
645    }
646}
647
648impl Watcher for PollWatcher {
649    /// Create a new [`PollWatcher`].
650    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
651        Self::new(event_handler, config)
652    }
653
654    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
655        self.watch_inner(path, recursive_mode);
656
657        Ok(())
658    }
659
660    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
661        self.unwatch_inner(path)
662    }
663
664    fn kind() -> crate::WatcherKind {
665        crate::WatcherKind::PollWatcher
666    }
667}
668
669impl Drop for PollWatcher {
670    fn drop(&mut self) {
671        self.want_to_stop.store(true, Ordering::Relaxed);
672    }
673}
674
#[cfg(test)]
mod tests {
    use super::PollWatcher;
    use crate::test::*;

    /// Build the poll watcher and event receiver pair used by every test below.
    fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
        poll_watcher_channel()
    }

    /// `PollWatcher` must be usable across threads.
    #[test]
    fn poll_watcher_is_send_and_sync() {
        fn check<T: Send + Sync>() {}
        check::<PollWatcher>();
    }

    /// Creating a file inside a watched directory yields a create event.
    #[test]
    fn create_file() {
        let tmpdir = testdir();
        let (mut watcher, mut rx) = watcher();
        watcher.watch_recursively(&tmpdir);

        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        rx.sleep_until_exists(&path);
        rx.wait_ordered_exact([expected(&path).create_any()]);
    }

    /// Creating a directory inside a watched directory yields a create event.
    #[test]
    fn create_dir() {
        let tmpdir = testdir();
        let (mut watcher, mut rx) = watcher();
        watcher.watch_recursively(&tmpdir);

        let path = tmpdir.path().join("entry");
        std::fs::create_dir(&path).expect("Unable to create");

        rx.sleep_until_exists(&path);
        rx.wait_ordered_exact([expected(&path).create_any()]);
    }

    /// Writing to an already known file yields a data-modification event.
    #[test]
    fn modify_file() {
        let tmpdir = testdir();
        let (mut watcher, mut rx) = watcher();
        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        watcher.watch_recursively(&tmpdir);
        std::fs::write(&path, b"123").expect("Unable to write");

        assert!(
            rx.sleep_until(|| std::fs::read_to_string(&path).is_ok_and(|content| content == "123")),
            "the file wasn't modified"
        );
        rx.wait_ordered_exact([expected(&path).modify_data_any()]);
    }

    /// Removing a known file yields a remove event.
    #[test]
    fn remove_file() {
        let tmpdir = testdir();
        let (mut watcher, mut rx) = watcher();
        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        watcher.watch_recursively(&tmpdir);
        std::fs::remove_file(&path).expect("Unable to remove");

        rx.sleep_while_exists(&path);
        rx.wait_ordered_exact([expected(&path).remove_any()]);
    }

    /// A rename shows up as a remove of the old path plus a create of the new path.
    #[test]
    fn rename_file() {
        let tmpdir = testdir();
        let (mut watcher, mut rx) = watcher();
        let path = tmpdir.path().join("entry");
        let new_path = tmpdir.path().join("new_entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        watcher.watch_recursively(&tmpdir);
        std::fs::rename(&path, &new_path).expect("Unable to rename");

        rx.sleep_while_exists(&path);
        rx.sleep_until_exists(&new_path);

        rx.wait_unordered_exact([
            expected(&path).remove_any(),
            expected(&new_path).create_any(),
        ]);
    }

    /// Replacing a known file by renaming another file over it yields a
    /// data-modification event for the replaced path.
    #[test]
    fn create_write_overwrite() {
        let tmpdir = testdir();
        let (mut watcher, mut rx) = watcher();
        let overwritten_file = tmpdir.path().join("overwritten_file");
        let overwriting_file = tmpdir.path().join("overwriting_file");
        std::fs::write(&overwritten_file, "123").expect("write1");

        watcher.watch_nonrecursively(&tmpdir);

        std::fs::File::create(&overwriting_file).expect("create");
        std::fs::write(&overwriting_file, "321").expect("write2");
        std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");

        rx.sleep_while_exists(&overwriting_file);
        assert!(
            rx.sleep_until(
                || std::fs::read_to_string(&overwritten_file).is_ok_and(|cnt| cnt == "321")
            ),
            "file {overwritten_file:?} was not replaced"
        );

        rx.wait_unordered([expected(&overwritten_file).modify_data_any()]);
    }
}