1use crate::{Config, Error, EventHandler, Receiver, Sender, WatchMode, Watcher, unbounded};
7use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 sync::{
11 Arc, Mutex,
12 atomic::{AtomicBool, Ordering},
13 mpsc,
14 },
15 thread,
16 time::Duration,
17};
18
19pub type ScanEvent = crate::Result<PathBuf>;
21
22pub trait ScanEventHandler: Send + 'static {
27 fn handle_event(&mut self, event: ScanEvent);
29}
30
31impl<F> ScanEventHandler for F
32where
33 F: FnMut(ScanEvent) + Send + 'static,
34{
35 fn handle_event(&mut self, event: ScanEvent) {
36 (self)(event);
37 }
38}
39
/// Forwards scan events into a crossbeam channel.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    /// Sends the event; a failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: ScanEvent) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
49
/// Forwards scan events into a flume channel.
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    /// Sends the event; a failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: ScanEvent) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
59
60impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
61 fn handle_event(&mut self, event: ScanEvent) {
62 let result = self.send(event);
63 if let Err(e) = result {
64 tracing::error!(?e, "failed to send scan event result");
65 }
66 }
67}
68
69impl ScanEventHandler for () {
70 fn handle_event(&mut self, _event: ScanEvent) {}
71}
72
73use data::{DataBuilder, WatchData};
74mod data {
75 use crate::{
76 EventHandler,
77 event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
78 };
79 use std::{
80 cell::RefCell,
81 collections::{HashMap, hash_map::RandomState},
82 fmt::{self, Debug},
83 fs::{self, File, Metadata},
84 hash::{BuildHasher, Hasher},
85 io::{self, Read},
86 path::{Path, PathBuf},
87 time::Instant,
88 };
89 use walkdir::WalkDir;
90
91 use super::ScanEventHandler;
92
/// Converts a `SystemTime` into whole seconds relative to the Unix epoch.
///
/// Times before the epoch yield a negative value. Sub-second precision is
/// dropped in both directions (truncation toward zero).
fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
    let epoch = std::time::SystemTime::UNIX_EPOCH;
    time.duration_since(epoch).map_or_else(
        // `time` lies before the epoch: the error carries the distance.
        |before| -(before.duration().as_secs() as i64),
        |since| since.as_secs() as i64,
    )
}
99
100 pub(super) struct DataBuilder {
102 emitter: EventEmitter,
103 scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
104
105 build_hasher: Option<RandomState>,
108
109 now: Instant,
111 }
112
113 impl DataBuilder {
114 pub(super) fn new<F, G>(
115 event_handler: F,
116 compare_content: bool,
117 scan_emitter: Option<G>,
118 ) -> Self
119 where
120 F: EventHandler,
121 G: ScanEventHandler,
122 {
123 let scan_emitter = match scan_emitter {
124 None => None,
125 Some(v) => {
126 let intermediate: Box<RefCell<dyn ScanEventHandler>> =
128 Box::new(RefCell::new(v));
129 Some(intermediate)
130 }
131 };
132 Self {
133 emitter: EventEmitter::new(event_handler),
134 scan_emitter,
135 build_hasher: compare_content.then(RandomState::default),
136 now: Instant::now(),
137 }
138 }
139
140 pub(super) fn update_timestamp(&mut self) {
142 self.now = Instant::now();
143 }
144
145 pub(super) fn build_watch_data(
150 &self,
151 root: PathBuf,
152 is_recursive: bool,
153 follow_symlinks: bool,
154 ) -> Option<WatchData> {
155 WatchData::new(self, root, is_recursive, follow_symlinks)
156 }
157
158 fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
160 PathData::new(self, meta_path)
161 }
162 }
163
164 impl Debug for DataBuilder {
165 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
166 f.debug_struct("DataBuilder")
167 .field("build_hasher", &self.build_hasher)
168 .field("now", &self.now)
169 .finish()
170 }
171 }
172
173 #[derive(Debug)]
174 pub(super) struct WatchData {
175 root: PathBuf,
177 is_recursive: bool,
178 follow_symlinks: bool,
179
180 all_path_data: HashMap<PathBuf, PathData>,
182 }
183
184 impl WatchData {
185 fn new(
191 data_builder: &DataBuilder,
192 root: PathBuf,
193 is_recursive: bool,
194 follow_symlinks: bool,
195 ) -> Option<Self> {
196 if let Err(e) = fs::metadata(&root) {
215 data_builder.emitter.emit_io_err(e, Some(&root));
216 return None;
217 }
218
219 let all_path_data = Self::scan_all_path_data(
220 data_builder,
221 root.clone(),
222 is_recursive,
223 follow_symlinks,
224 true,
225 )
226 .collect();
227
228 Some(Self {
229 root,
230 is_recursive,
231 follow_symlinks,
232 all_path_data,
233 })
234 }
235
236 pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
242 for (path, new_path_data) in Self::scan_all_path_data(
244 data_builder,
245 self.root.clone(),
246 self.is_recursive,
247 self.follow_symlinks,
248 false,
249 ) {
250 let old_path_data = self
251 .all_path_data
252 .insert(path.clone(), new_path_data.clone());
253
254 let event =
256 PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
257 if let Some(event) = event {
258 data_builder.emitter.emit_ok(event);
259 }
260 }
261
262 let mut disappeared_paths = Vec::new();
264 for (path, path_data) in self.all_path_data.iter() {
265 if path_data.last_check < data_builder.now {
266 disappeared_paths.push(path.clone());
267 }
268 }
269
270 for path in disappeared_paths {
272 let old_path_data = self.all_path_data.remove(&path);
273
274 let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
276 if let Some(event) = event {
277 data_builder.emitter.emit_ok(event);
278 }
279 }
280 }
281
282 fn scan_all_path_data(
288 data_builder: &'_ DataBuilder,
289 root: PathBuf,
290 is_recursive: bool,
291 follow_symlinks: bool,
292 is_initial: bool,
294 ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
295 tracing::trace!("rescanning {root:?}");
296 WalkDir::new(root)
301 .follow_links(follow_symlinks)
302 .max_depth(Self::dir_scan_depth(is_recursive))
303 .into_iter()
304 .filter_map(|entry_res| match entry_res {
305 Ok(entry) => Some(entry),
306 Err(err) => {
307 tracing::warn!("walkdir error scanning {err:?}");
308
309 if let Some(io_error) = err.io_error() {
310 let new_io_error = io::Error::new(io_error.kind(), err.to_string());
312 data_builder.emitter.emit_io_err(new_io_error, err.path());
313 } else {
314 let crate_err =
315 crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
316 data_builder.emitter.emit(Err(crate_err));
317 }
318 None
319 }
320 })
321 .filter_map(move |entry| match entry.metadata() {
322 Ok(metadata) => {
323 let path = entry.into_path();
324 if is_initial {
325 if let Some(ref emitter) = data_builder.scan_emitter {
327 emitter.borrow_mut().handle_event(Ok(path.clone()));
328 }
329 }
330 let meta_path = MetaPath::from_parts_unchecked(path, metadata);
331 let data_path = data_builder.build_path_data(&meta_path);
332
333 Some((meta_path.into_path(), data_path))
334 }
335 Err(e) => {
336 let path = entry.into_path();
338 data_builder.emitter.emit_io_err(e, Some(path));
339
340 None
341 }
342 })
343 }
344
345 fn dir_scan_depth(is_recursive: bool) -> usize {
346 if is_recursive { usize::MAX } else { 1 }
347 }
348 }
349
350 #[derive(Debug, Clone)]
354 struct PathData {
355 mtime: i64,
357
358 hash: Option<u64>,
361
362 last_check: Instant,
364 }
365
366 impl PathData {
367 fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
369 let metadata = meta_path.metadata();
370
371 PathData {
372 mtime: metadata.modified().map_or(0, system_time_to_seconds),
373 hash: data_builder
374 .build_hasher
375 .as_ref()
376 .filter(|_| metadata.is_file())
377 .and_then(|build_hasher| {
378 Self::get_content_hash(build_hasher, meta_path.path()).ok()
379 }),
380
381 last_check: data_builder.now,
382 }
383 }
384
385 fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
387 let mut hasher = build_hasher.build_hasher();
388 let mut file = File::open(path)?;
389 let mut buf = [0; 512];
390
391 loop {
392 let n = match file.read(&mut buf) {
393 Ok(0) => break,
394 Ok(len) => len,
395 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
396 Err(e) => return Err(e),
397 };
398
399 hasher.write(&buf[..n]);
400 }
401
402 Ok(hasher.finish())
403 }
404
405 fn compare_to_event<P>(
407 path: P,
408 old: Option<&PathData>,
409 new: Option<&PathData>,
410 ) -> Option<Event>
411 where
412 P: Into<PathBuf>,
413 {
414 match (old, new) {
415 (Some(old), Some(new)) => {
416 if new.mtime > old.mtime {
417 Some(EventKind::Modify(ModifyKind::Metadata(
418 MetadataKind::WriteTime,
419 )))
420 } else if new.hash != old.hash {
421 Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
422 } else {
423 None
424 }
425 }
426 (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
427 (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
428 (None, None) => None,
429 }
430 .map(|event_kind| Event::new(event_kind).add_path(path.into()))
431 }
432 }
433
434 #[derive(Debug)]
440 pub(super) struct MetaPath {
441 path: PathBuf,
442 metadata: Metadata,
443 }
444
445 impl MetaPath {
446 fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
452 Self { path, metadata }
453 }
454
455 fn path(&self) -> &Path {
456 &self.path
457 }
458
459 fn metadata(&self) -> &Metadata {
460 &self.metadata
461 }
462
463 fn into_path(self) -> PathBuf {
464 self.path
465 }
466 }
467
468 struct EventEmitter(
470 Box<RefCell<dyn EventHandler>>,
473 );
474
475 impl EventEmitter {
476 fn new<F: EventHandler>(event_handler: F) -> Self {
477 Self(Box::new(RefCell::new(event_handler)))
478 }
479
480 fn emit(&self, event: crate::Result<Event>) {
482 self.0.borrow_mut().handle_event(event);
483 }
484
485 fn emit_ok(&self, event: Event) {
487 self.emit(Ok(event))
488 }
489
490 fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
492 where
493 E: Into<io::Error>,
494 P: Into<PathBuf>,
495 {
496 let e = crate::Error::io(err.into());
497 if let Some(path) = path {
498 self.emit(Err(e.add_path(path.into())));
499 } else {
500 self.emit(Err(e));
501 }
502 }
503 }
504}
505
506#[derive(Debug)]
513pub struct PollWatcher {
514 watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
515 data_builder: Arc<Mutex<DataBuilder>>,
516 want_to_stop: Arc<AtomicBool>,
517 message_channel: Sender<()>,
520 delay: Option<Duration>,
521 follow_sylinks: bool,
522}
523
524impl PollWatcher {
525 pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
527 Self::with_opt::<_, ()>(event_handler, config, None)
528 }
529
530 pub fn poll(&self) -> crate::Result<()> {
532 self.message_channel
533 .send(())
534 .map_err(|_| Error::generic("failed to send poll message"))?;
535 Ok(())
536 }
537
538 #[cfg(test)]
540 pub(crate) fn poll_sender(&self) -> Sender<()> {
541 self.message_channel.clone()
542 }
543
544 pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
548 event_handler: F,
549 config: Config,
550 scan_callback: G,
551 ) -> crate::Result<PollWatcher> {
552 Self::with_opt(event_handler, config, Some(scan_callback))
553 }
554
555 fn with_opt<F: EventHandler, G: ScanEventHandler>(
557 event_handler: F,
558 config: Config,
559 scan_callback: Option<G>,
560 ) -> crate::Result<PollWatcher> {
561 let data_builder =
562 DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
563
564 let (tx, rx) = unbounded();
565
566 let poll_watcher = PollWatcher {
567 watches: Default::default(),
568 data_builder: Arc::new(Mutex::new(data_builder)),
569 want_to_stop: Arc::new(AtomicBool::new(false)),
570 delay: config.poll_interval(),
571 follow_sylinks: config.follow_symlinks(),
572 message_channel: tx,
573 };
574
575 poll_watcher.run(rx);
576
577 Ok(poll_watcher)
578 }
579
580 fn run(&self, rx: Receiver<()>) {
581 let watches = Arc::clone(&self.watches);
582 let data_builder = Arc::clone(&self.data_builder);
583 let want_to_stop = Arc::clone(&self.want_to_stop);
584 let delay = self.delay;
585
586 let result = thread::Builder::new()
587 .name("notify-rs poll loop".to_string())
588 .spawn(move || {
589 loop {
590 if want_to_stop.load(Ordering::SeqCst) {
591 break;
592 }
593
594 if let (Ok(mut watches), Ok(mut data_builder)) =
599 (watches.lock(), data_builder.lock())
600 {
601 data_builder.update_timestamp();
602
603 let vals = watches.values_mut();
604 for watch_data in vals {
605 watch_data.rescan(&mut data_builder);
606 }
607 }
608 let result = if let Some(delay) = delay {
610 rx.recv_timeout(delay).or_else(|e| match e {
611 mpsc::RecvTimeoutError::Timeout => Ok(()),
612 mpsc::RecvTimeoutError::Disconnected => Err(mpsc::RecvError),
613 })
614 } else {
615 rx.recv()
616 };
617 if let Err(e) = result {
618 tracing::error!(?e, "failed to receive poll message");
619 }
620 }
621 });
622 if let Err(e) = result {
623 tracing::error!(?e, "failed to start poll watcher thread");
624 }
625 }
626
627 fn watch_inner(&mut self, path: &Path, watch_mode: WatchMode) {
632 if let (Ok(mut watches), Ok(mut data_builder)) =
636 (self.watches.lock(), self.data_builder.lock())
637 {
638 data_builder.update_timestamp();
639
640 let watch_data = data_builder.build_watch_data(
641 path.to_path_buf(),
642 watch_mode.recursive_mode.is_recursive(),
643 self.follow_sylinks,
644 );
645
646 if let Some(watch_data) = watch_data {
648 watches.insert(path.to_path_buf(), watch_data);
649 }
650 }
651 }
652
653 fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
657 self.watches
659 .lock()
660 .unwrap()
661 .remove(path)
662 .map(|_| ())
663 .ok_or_else(crate::Error::watch_not_found)
664 }
665}
666
667impl Watcher for PollWatcher {
668 #[tracing::instrument(level = "debug", skip(event_handler))]
670 fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
671 Self::new(event_handler, config)
672 }
673
674 #[tracing::instrument(level = "debug", skip(self))]
675 fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
676 self.watch_inner(path, watch_mode);
677
678 Ok(())
679 }
680
681 #[tracing::instrument(level = "debug", skip(self))]
682 fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
683 self.unwatch_inner(path)
684 }
685
686 fn kind() -> crate::WatcherKind {
687 crate::WatcherKind::PollWatcher
688 }
689}
690
691impl Drop for PollWatcher {
692 fn drop(&mut self) {
693 self.want_to_stop.store(true, Ordering::Relaxed);
694 }
695}
696
697#[cfg(test)]
698mod tests {
699 use super::PollWatcher;
700 use crate::{Error, ErrorKind, RecursiveMode, TargetMode, WatchMode, Watcher, test::*};
701
702 fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
703 poll_watcher_channel()
704 }
705
/// Compile-time check that `PollWatcher` can be shared across threads.
#[test]
fn poll_watcher_is_send_and_sync() {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<PollWatcher>();
}
711
/// A file created inside a recursively watched directory yields a create
/// event.
#[test]
fn create_file() {
    let dir = testdir();
    let (mut poll_watcher, mut events) = watcher();
    poll_watcher.watch_recursively(&dir);

    let file = dir.path().join("entry");
    std::fs::File::create_new(&file).expect("Unable to create");

    events.sleep_until_exists(&file);
    events.wait_ordered_exact([expected(&file).create_any()]);
}
724
/// Watching a not-yet-existing file and then creating it should yield a
/// create event (currently ignored).
#[test]
#[ignore = "not implemented"]
fn create_self_file() {
    let dir = testdir();
    let (mut poll_watcher, mut events) = watcher();

    let file = dir.path().join("entry");

    poll_watcher.watch_nonrecursively(&file);

    std::fs::File::create_new(&file).expect("create");

    events.sleep_until_exists(&file);
    events.wait_ordered_exact([expected(&file).create_any()]);
}
740
/// With `TargetMode::NoTrack`, watching a missing path should fail with
/// `PathNotFound` (currently ignored).
#[test]
#[ignore = "not implemented"]
fn create_self_file_no_track() {
    let dir = testdir();
    let (mut test_watcher, _) = watcher();

    let missing = dir.path().join("entry");

    let mode = WatchMode {
        recursive_mode: RecursiveMode::NonRecursive,
        target_mode: TargetMode::NoTrack,
    };
    let outcome = test_watcher.watcher.watch(&missing, mode);

    assert!(matches!(
        outcome,
        Err(Error {
            paths: _,
            kind: ErrorKind::PathNotFound
        })
    ));
}
764
/// Watching a missing file below a missing directory and then creating both
/// should yield a create event for the file (currently ignored).
#[test]
#[ignore = "TODO: not implemented"]
fn create_self_file_nested() {
    let dir = testdir();
    let (mut poll_watcher, mut events) = watcher();

    let nested = dir.path().join("entry/nested");

    poll_watcher.watch_nonrecursively(&nested);

    let parent = nested.parent().unwrap();
    std::fs::create_dir_all(parent).expect("create");
    std::fs::File::create_new(&nested).expect("create");

    events.wait_ordered_exact([expected(&nested).create_file()]);
}
780
/// A directory created inside a recursively watched directory yields a
/// create event.
#[test]
fn create_dir() {
    let dir = testdir();
    let (mut poll_watcher, mut events) = watcher();
    poll_watcher.watch_recursively(&dir);

    let subdir = dir.path().join("entry");
    std::fs::create_dir(&subdir).expect("Unable to create");

    events.sleep_until_exists(&subdir);
    events.wait_ordered_exact([expected(&subdir).create_any()]);
}
793
/// Writing to an existing watched file yields a data-modification event.
#[test]
fn modify_file() {
    let dir = testdir();
    let (mut poll_watcher, mut events) = watcher();
    let file = dir.path().join("entry");
    std::fs::File::create_new(&file).expect("Unable to create");

    poll_watcher.watch_recursively(&dir);
    std::fs::write(&file, b"123").expect("Unable to write");

    // Make sure the write is visible before inspecting the events.
    let written = events.sleep_until(|| {
        std::fs::read_to_string(&file).is_ok_and(|content| content == "123")
    });
    assert!(written, "the file wasn't modified");

    events.wait_ordered_exact([expected(&file).modify_data_any()]);
}
810
/// Renaming a watched file is reported as a removal of the old path plus a
/// creation of the new one, in any order.
#[test]
fn rename_file() {
    let tmpdir = testdir();
    let (mut watcher, mut rx) = watcher();
    let path = tmpdir.path().join("entry");
    let new_path = tmpdir.path().join("new_entry");
    std::fs::File::create_new(&path).expect("Unable to create");

    watcher.watch_recursively(&tmpdir);
    // The failure message now names the operation that is actually performed.
    std::fs::rename(&path, &new_path).expect("Unable to rename");

    rx.sleep_while_exists(&path);
    rx.sleep_until_exists(&new_path);

    rx.wait_unordered_exact([
        expected(&path).remove_any(),
        expected(&new_path).create_any(),
    ]);
}
830
/// Renaming the watched file away should report its removal, and renaming it
/// back should report its creation (currently ignored).
#[test]
#[ignore = "TODO: not implemented"]
fn rename_self_file() {
    let tmpdir = testdir();
    let (mut watcher, mut rx) = watcher();

    let path = tmpdir.path().join("entry");
    std::fs::File::create_new(&path).expect("create");

    watcher.watch_nonrecursively(&path);
    let new_path = tmpdir.path().join("renamed");

    std::fs::rename(&path, &new_path).expect("rename");

    rx.sleep_while_exists(&path);
    rx.sleep_until_exists(&new_path);

    rx.wait_unordered_exact([expected(&path).remove_any()])
        .ensure_no_tail();

    std::fs::rename(&new_path, &path).expect("rename2");

    // After the second rename `new_path` is gone and `path` exists again, so
    // the waits must mirror the ones used after the first rename.
    rx.sleep_while_exists(&new_path);
    rx.sleep_until_exists(&path);

    rx.wait_unordered_exact([expected(&path).create_any()])
        .ensure_no_tail();
}
859
/// With `TargetMode::NoTrack`, renaming the watched file away should report
/// its removal, and re-watching the vanished path should fail with
/// `PathNotFound` (currently ignored).
#[test]
#[ignore = "TODO: not implemented"]
fn rename_self_file_no_track() {
    let dir = testdir();
    let (mut test_watcher, mut events) = watcher();

    let original = dir.path().join("entry");
    std::fs::File::create_new(&original).expect("create");

    // Builds a fresh copy of the watch mode used by both `watch` calls.
    let no_track = || WatchMode {
        recursive_mode: RecursiveMode::NonRecursive,
        target_mode: TargetMode::NoTrack,
    };

    test_watcher.watch(&original, no_track());

    let renamed = dir.path().join("renamed");

    std::fs::rename(&original, &renamed).expect("rename");

    events.sleep_while_exists(&original);
    events.sleep_until_exists(&renamed);

    events
        .wait_unordered_exact([expected(&original).remove_any()])
        .ensure_no_tail();

    let outcome = test_watcher.watcher.watch(&original, no_track());
    assert!(matches!(
        outcome,
        Err(Error {
            paths: _,
            kind: ErrorKind::PathNotFound
        })
    ));
}
902
/// Deleting a file inside a recursively watched directory yields a remove
/// event.
#[test]
fn delete_file() {
    let dir = testdir();
    let (mut poll_watcher, mut events) = watcher();
    let file = dir.path().join("entry");
    std::fs::File::create_new(&file).expect("Unable to create");

    poll_watcher.watch_recursively(&dir);
    std::fs::remove_file(&file).expect("Unable to remove");

    events.sleep_while_exists(&file);
    events.wait_ordered_exact([expected(&file).remove_any()]);
}
916
/// Deleting the watched file should report its removal, and re-creating it
/// should report a file creation (currently ignored).
#[test]
#[ignore = "TODO: not implemented"]
fn delete_self_file() {
    let dir = testdir();
    let (mut poll_watcher, mut events) = watcher();
    let file = dir.path().join("entry");
    std::fs::File::create_new(&file).expect("Unable to create");

    poll_watcher.watch_nonrecursively(&file);

    std::fs::remove_file(&file).expect("Unable to remove");

    events.sleep_while_exists(&file);
    events.wait_ordered_exact([expected(&file).remove_any()]);

    std::fs::write(&file, "").expect("write");

    events.sleep_until_exists(&file);
    events.wait_ordered_exact([expected(&file).create_file()]);
}
937
/// With `TargetMode::NoTrack`, deleting the watched file should report its
/// removal, and re-creating it should report nothing (currently ignored).
#[test]
#[ignore = "TODO: not implemented"]
fn delete_self_file_no_track() {
    let dir = testdir();
    let (mut test_watcher, mut events) = watcher();
    let file = dir.path().join("entry");
    std::fs::File::create_new(&file).expect("Unable to create");

    let mode = WatchMode {
        recursive_mode: RecursiveMode::NonRecursive,
        target_mode: TargetMode::NoTrack,
    };
    test_watcher.watch(&file, mode);

    std::fs::remove_file(&file).expect("Unable to remove");

    events.sleep_while_exists(&file);
    events.wait_ordered_exact([expected(&file).remove_any()]);

    std::fs::write(&file, "").expect("write");

    events.ensure_empty_with_wait();
}
963
/// Replacing a watched file by renaming another file over it is reported as
/// a data modification of the replaced path.
#[test]
fn create_write_overwrite() {
    let dir = testdir();
    let (mut poll_watcher, mut events) = watcher();
    let overwritten_file = dir.path().join("overwritten_file");
    let overwriting_file = dir.path().join("overwriting_file");
    std::fs::write(&overwritten_file, "123").expect("write1");

    poll_watcher.watch_nonrecursively(&dir);

    std::fs::File::create(&overwriting_file).expect("create");
    std::fs::write(&overwriting_file, "321").expect("write2");
    std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");

    events.sleep_while_exists(&overwriting_file);

    // Make sure the replacement is visible before inspecting the events.
    let replaced = events.sleep_until(|| {
        std::fs::read_to_string(&overwritten_file).is_ok_and(|cnt| cnt == "321")
    });
    assert!(replaced, "file {overwritten_file:?} was not replaced");

    events.wait_unordered([expected(&overwritten_file).modify_data_any()]);
}
988}