1use crate::{
7 Config, Error, EventHandler, PathsMut, Receiver, Result, Sender, WatchMode, Watcher,
8 poll::data::WatchData, unbounded,
9};
10use std::{
11 path::{Path, PathBuf},
12 sync::mpsc,
13 thread,
14 time::Duration,
15};
16
17pub type ScanEvent = crate::Result<PathBuf>;
19
20pub trait ScanEventHandler: Send + 'static {
25 fn handle_event(&mut self, event: ScanEvent);
27}
28
29impl<F> ScanEventHandler for F
30where
31 F: FnMut(ScanEvent) + Send + 'static,
32{
33 fn handle_event(&mut self, event: ScanEvent) {
34 (self)(event);
35 }
36}
37
/// Scan handler that forwards every event into a crossbeam channel.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    /// Sends the event; a failed send is logged, never propagated.
    fn handle_event(&mut self, event: ScanEvent) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
47
/// Scan handler that forwards every event into a flume channel.
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    /// Sends the event; a failed send is logged, never propagated.
    fn handle_event(&mut self, event: ScanEvent) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
57
58impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
59 fn handle_event(&mut self, event: ScanEvent) {
60 let result = self.send(event);
61 if let Err(e) = result {
62 tracing::error!(?e, "failed to send scan event result");
63 }
64 }
65}
66
67impl ScanEventHandler for () {
68 fn handle_event(&mut self, _event: ScanEvent) {}
69}
70
71use data::DataBuilder;
72mod data {
73 use crate::{
74 Error, EventHandler, Result, WatchMode,
75 consolidating_path_trie::ConsolidatingPathTrie,
76 event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
77 };
78 use std::{
79 cell::RefCell,
80 collections::{HashMap, hash_map::RandomState},
81 fmt::{self, Debug},
82 fs::{File, FileType, Metadata},
83 hash::{BuildHasher, Hasher},
84 io::{self, Read},
85 path::{Path, PathBuf},
86 time::Instant,
87 };
88 use walkdir::WalkDir;
89
90 use super::ScanEventHandler;
91
/// Converts a `SystemTime` into whole seconds relative to the Unix epoch.
///
/// Times before the epoch yield a negative count. Sub-second precision is
/// truncated towards zero in both directions.
#[expect(clippy::cast_possible_wrap)]
fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
    time.duration_since(std::time::SystemTime::UNIX_EPOCH)
        .map_or_else(
            // `time` lies before the epoch: the error carries the distance to it.
            |before_epoch| -(before_epoch.duration().as_secs() as i64),
            |since_epoch| since_epoch.as_secs() as i64,
        )
}
99
100 pub(super) struct DataBuilder {
102 emitter: EventEmitter,
103 scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
104
105 build_hasher: Option<RandomState>,
108
109 now: Instant,
111 }
112
113 impl DataBuilder {
114 pub(super) fn new<F, G>(
115 event_handler: F,
116 compare_content: bool,
117 scan_emitter: Option<G>,
118 ) -> Self
119 where
120 F: EventHandler,
121 G: ScanEventHandler,
122 {
123 let scan_emitter = match scan_emitter {
124 None => None,
125 Some(v) => {
126 let intermediate: Box<RefCell<dyn ScanEventHandler>> =
128 Box::new(RefCell::new(v));
129 Some(intermediate)
130 }
131 };
132 Self {
133 emitter: EventEmitter::new(event_handler),
134 scan_emitter,
135 build_hasher: compare_content.then(RandomState::default),
136 now: Instant::now(),
137 }
138 }
139
140 pub(super) fn update_timestamp(&mut self) {
142 self.now = Instant::now();
143 }
144
145 fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
147 PathData::new(self, meta_path)
148 }
149 }
150
151 impl Debug for DataBuilder {
152 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153 f.debug_struct("DataBuilder")
154 .field("build_hasher", &self.build_hasher)
155 .field("now", &self.now)
156 .finish_non_exhaustive()
157 }
158 }
159
160 #[derive(Debug)]
161 struct WatchHandlers {
162 current: HashMap<PathBuf, bool>,
163 next: HashMap<PathBuf, bool>,
164 is_stale: bool,
165 }
166
167 impl WatchHandlers {
168 fn new() -> Self {
169 Self {
170 current: HashMap::new(),
171 next: HashMap::new(),
172 is_stale: false,
173 }
174 }
175
176 fn recalculate(&mut self, watches: &HashMap<PathBuf, WatchMode>) {
178 self.next.clear();
179 self.is_stale = true;
180
181 let mut trie = ConsolidatingPathTrie::new(false);
182 for (path, mode) in watches {
183 if mode.recursive_mode == crate::RecursiveMode::Recursive {
184 trie.insert(path);
185 }
186 }
187 for (path, mode) in watches {
189 if mode.recursive_mode != crate::RecursiveMode::Recursive {
190 self.next.insert(path.clone(), false);
191 }
192 }
193 for path in trie.values() {
195 self.next.insert(path, true);
196 }
197 }
198
199 fn use_handlers(
200 &mut self,
201 ) -> (
202 &HashMap<PathBuf, bool>,
203 Option<HashMap<PathBuf, bool>>,
204 ) {
205 if self.is_stale {
206 let old_next = std::mem::take(&mut self.next);
207 let old_current = std::mem::replace(&mut self.current, old_next);
208 self.is_stale = false;
209 return (&self.current, Some(old_current));
210 }
211 (&self.current, None)
212 }
213 }
214
215 #[derive(Debug)]
216 pub(super) struct WatchData {
217 follow_symlinks: bool,
219
220 watches: HashMap<PathBuf, WatchMode>,
222 watch_handlers: WatchHandlers,
223 all_path_data: HashMap<PathBuf, PathData>,
224 }
225
226 impl WatchData {
227 pub fn new(follow_symlinks: bool) -> Self {
229 Self {
230 follow_symlinks,
231 watches: HashMap::new(),
232 watch_handlers: WatchHandlers::new(),
233 all_path_data: HashMap::new(),
234 }
235 }
236
237 pub fn add_watch(&mut self, path: PathBuf, mode: WatchMode) -> Result<()> {
238 if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() {
239 return Err(crate::Error::path_not_found().add_path(path));
240 }
241
242 self.watches.insert(path, mode);
243 self.watch_handlers.recalculate(&self.watches);
244 Ok(())
245 }
246
247 pub fn add_watch_multiple(&mut self, paths: Vec<(PathBuf, WatchMode)>) -> Result<()> {
248 for (path, mode) in paths {
249 if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() {
250 return Err(crate::Error::path_not_found().add_path(path));
251 }
252
253 self.watches.insert(path, mode);
254 }
255 self.watch_handlers.recalculate(&self.watches);
256 Ok(())
257 }
258
259 pub fn remove_watch(&mut self, path: &Path) -> Result<()> {
260 self.watches.remove(path).ok_or(Error::watch_not_found())?;
261 self.watch_handlers.recalculate(&self.watches);
262 Ok(())
263 }
264
265 pub(super) fn rescan(&mut self, data_builder: &DataBuilder) {
271 let (watch_handlers, old_watch_handlers) = self.watch_handlers.use_handlers();
272
273 for (path, new_path_data) in
275 Self::scan_all_path_data(data_builder, watch_handlers, self.follow_symlinks)
276 {
277 let old_path_data = self
278 .all_path_data
279 .insert(path.clone(), new_path_data.clone());
280
281 let is_initial = old_watch_handlers
282 .as_ref()
283 .is_some_and(|old_watch_handlers| {
284 !old_watch_handlers.contains_key(&path)
285 && !path.ancestors().skip(1).any(|ancestor| {
286 old_watch_handlers
287 .get(ancestor)
288 .is_some_and(|is_recursive| *is_recursive)
289 })
290 });
291 if is_initial {
292 if let Some(ref emitter) = data_builder.scan_emitter {
294 emitter.borrow_mut().handle_event(Ok(path.clone()));
295 }
296 } else {
297 let event = PathData::compare_to_event(
299 path,
300 old_path_data.as_ref(),
301 Some(&new_path_data),
302 );
303 if let Some(event) = event {
304 data_builder.emitter.emit_ok(event);
305 }
306 }
307 }
308
309 let mut disappeared_paths = Vec::new();
311 for (path, path_data) in &self.all_path_data {
312 if path_data.last_check < data_builder.now {
313 disappeared_paths.push(path.clone());
314 }
315 }
316
317 for path in disappeared_paths {
319 let old_path_data = self.all_path_data.remove(&path);
320
321 let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
323 if let Some(event) = event {
324 data_builder.emitter.emit_ok(event);
325 }
326 }
327 }
328
329 fn scan_all_path_data(
335 data_builder: &DataBuilder,
336 watch_handlers: &HashMap<PathBuf, bool>,
337 follow_symlinks: bool,
338 ) -> impl Iterator<Item = (PathBuf, PathData)> {
339 tracing::trace!("rescanning");
340
341 watch_handlers.iter().flat_map(move |(path, is_recursive)| {
342 tracing::trace!(?path, is_recursive, "scanning watch handler");
343
344 WalkDir::new(path)
349 .follow_links(follow_symlinks)
350 .max_depth(if *is_recursive { usize::MAX } else { 1 })
351 .into_iter()
352 .filter_map(|entry_res| match entry_res {
353 Ok(entry) => Some(entry),
354 Err(err) => {
355 tracing::warn!("walkdir error scanning {err:?}");
356
357 if let Some(io_error) = err.io_error() {
358 if io_error.kind() == io::ErrorKind::NotFound {
359 return None;
360 }
361 let new_io_error = io::Error::new(io_error.kind(), err.to_string());
363 data_builder.emitter.emit_io_err(new_io_error, err.path());
364 } else {
365 let crate_err =
366 Error::new(crate::ErrorKind::Generic(err.to_string()));
367 data_builder.emitter.emit(Err(crate_err));
368 }
369 None
370 }
371 })
372 .filter_map(move |entry| match entry.metadata() {
373 Ok(metadata) => {
374 let path = entry.into_path();
375 let meta_path = MetaPath::from_parts_unchecked(path, metadata);
376 let data_path = data_builder.build_path_data(&meta_path);
377
378 Some((meta_path.into_path(), data_path))
379 }
380 Err(err) => {
381 if let Some(io_error) = err.io_error()
382 && io_error.kind() == io::ErrorKind::NotFound
383 {
384 return None;
385 }
386
387 let path = entry.into_path();
389 data_builder.emitter.emit_io_err(err, Some(path));
390
391 None
392 }
393 })
394 })
395 }
396 }
397
398 #[derive(Debug, Clone)]
402 struct PathData {
403 mtime: i64,
405
406 file_type: FileType,
407
408 hash: Option<u64>,
411
412 last_check: Instant,
414 }
415
416 impl PathData {
417 fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
419 let metadata = meta_path.metadata();
420
421 PathData {
422 mtime: metadata.modified().map_or(0, system_time_to_seconds),
423 file_type: metadata.file_type(),
424 hash: data_builder
425 .build_hasher
426 .as_ref()
427 .filter(|_| metadata.is_file())
428 .and_then(|build_hasher| {
429 Self::get_content_hash(build_hasher, meta_path.path()).ok()
430 }),
431
432 last_check: data_builder.now,
433 }
434 }
435
436 fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
438 let mut hasher = build_hasher.build_hasher();
439 let mut file = File::open(path)?;
440 let mut buf = [0; 512];
441
442 loop {
443 let n = match file.read(&mut buf) {
444 Ok(0) => break,
445 Ok(len) => len,
446 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
447 Err(e) => return Err(e),
448 };
449
450 hasher.write(&buf[..n]);
451 }
452
453 Ok(hasher.finish())
454 }
455
456 fn get_create_kind(&self) -> CreateKind {
458 #[expect(clippy::filetype_is_file)]
459 if self.file_type.is_dir() {
460 CreateKind::Folder
461 } else if self.file_type.is_file() {
462 CreateKind::File
463 } else {
464 CreateKind::Any
465 }
466 }
467
468 fn get_remove_kind(&self) -> RemoveKind {
470 #[expect(clippy::filetype_is_file)]
471 if self.file_type.is_dir() {
472 RemoveKind::Folder
473 } else if self.file_type.is_file() {
474 RemoveKind::File
475 } else {
476 RemoveKind::Any
477 }
478 }
479
480 fn compare_to_event<P>(
482 path: P,
483 old: Option<&PathData>,
484 new: Option<&PathData>,
485 ) -> Option<Event>
486 where
487 P: Into<PathBuf>,
488 {
489 match (old, new) {
490 (Some(old), Some(new)) => {
491 if new.mtime > old.mtime {
492 Some(EventKind::Modify(ModifyKind::Metadata(
493 MetadataKind::WriteTime,
494 )))
495 } else if new.hash != old.hash {
496 Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
497 } else {
498 None
499 }
500 }
501 (None, Some(new)) => Some(EventKind::Create(new.get_create_kind())),
502 (Some(old), None) => Some(EventKind::Remove(old.get_remove_kind())),
503 (None, None) => None,
504 }
505 .map(|event_kind| Event::new(event_kind).add_path(path.into()))
506 }
507 }
508
509 #[derive(Debug)]
515 pub(super) struct MetaPath {
516 path: PathBuf,
517 metadata: Metadata,
518 }
519
520 impl MetaPath {
521 fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
527 Self { path, metadata }
528 }
529
530 fn path(&self) -> &Path {
531 &self.path
532 }
533
534 fn metadata(&self) -> &Metadata {
535 &self.metadata
536 }
537
538 fn into_path(self) -> PathBuf {
539 self.path
540 }
541 }
542
543 struct EventEmitter(
545 Box<RefCell<dyn EventHandler>>,
548 );
549
550 impl EventEmitter {
551 fn new<F: EventHandler>(event_handler: F) -> Self {
552 Self(Box::new(RefCell::new(event_handler)))
553 }
554
555 fn emit(&self, event: crate::Result<Event>) {
557 self.0.borrow_mut().handle_event(event);
558 }
559
560 fn emit_ok(&self, event: Event) {
562 self.emit(Ok(event));
563 }
564
565 fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
567 where
568 E: Into<io::Error>,
569 P: Into<PathBuf>,
570 {
571 let e = crate::Error::io(err.into());
572 if let Some(path) = path {
573 self.emit(Err(e.add_path(path.into())));
574 } else {
575 self.emit(Err(e));
576 }
577 }
578 }
579}
580
581enum EventLoopMsg {
582 AddWatch(PathBuf, WatchMode, Sender<Result<()>>),
583 AddWatchMultiple(Vec<(PathBuf, WatchMode)>, Sender<Result<()>>),
584 RemoveWatch(PathBuf, Sender<Result<()>>),
585 #[cfg(test)]
586 WaitNextScan(Sender<Result<()>>),
587 Poll,
589 Shutdown,
590}
591
592struct PollPathsMut<'a> {
593 inner: &'a mut PollWatcher,
594 add_paths: Vec<(PathBuf, WatchMode)>,
595}
596impl<'a> PollPathsMut<'a> {
597 fn new(watcher: &'a mut PollWatcher) -> Self {
598 Self {
599 inner: watcher,
600 add_paths: Vec::new(),
601 }
602 }
603}
604impl PathsMut for PollPathsMut<'_> {
605 #[tracing::instrument(level = "debug", skip(self))]
606 fn add(&mut self, path: &Path, watch_mode: WatchMode) -> Result<()> {
607 self.add_paths.push((path.to_owned(), watch_mode));
608 Ok(())
609 }
610
611 #[tracing::instrument(level = "debug", skip(self))]
612 fn remove(&mut self, path: &Path) -> Result<()> {
613 self.inner.unwatch_inner(path)
614 }
615
616 #[tracing::instrument(level = "debug", skip(self))]
617 fn commit(self: Box<Self>) -> Result<()> {
618 let paths = self.add_paths;
619 self.inner.watch_multiple_inner(paths)
620 }
621}
622
623#[derive(Debug)]
630pub struct PollWatcher {
631 delay: Option<Duration>,
632 follow_symlinks: bool,
633
634 event_loop_tx: Sender<EventLoopMsg>,
635}
636
637impl PollWatcher {
638 pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
640 Ok(Self::with_opt::<_, ()>(event_handler, config, None))
641 }
642
643 pub fn poll(&self) -> crate::Result<()> {
645 self.event_loop_tx
646 .send(EventLoopMsg::Poll)
647 .map_err(|_| Error::generic("failed to send poll message"))?;
648 Ok(())
649 }
650
651 #[cfg(test)]
652 pub(crate) fn wait_next_scan(&self) -> crate::Result<()> {
653 let (tx, rx) = unbounded();
654 self.event_loop_tx
655 .send(EventLoopMsg::WaitNextScan(tx))
656 .map_err(|_| Error::generic("failed to send WaitNextScan message"))?;
657 rx.recv().unwrap()
658 }
659
660 #[cfg(test)]
662 pub(crate) fn poll_sender(&self) -> Sender<()> {
663 let inner_tx = self.event_loop_tx.clone();
664 let (tx, rx) = unbounded();
665 thread::Builder::new()
666 .name("notify-rs poll loop".to_string())
667 .spawn(move || {
668 for () in &rx {
669 if let Err(err) = inner_tx.send(EventLoopMsg::Poll) {
670 tracing::error!(?err, "failed to send poll message");
671 }
672 }
673 })
674 .unwrap();
675 tx
676 }
677
678 pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
682 event_handler: F,
683 config: Config,
684 scan_callback: G,
685 ) -> crate::Result<PollWatcher> {
686 Ok(Self::with_opt(event_handler, config, Some(scan_callback)))
687 }
688
689 fn with_opt<F: EventHandler, G: ScanEventHandler>(
691 event_handler: F,
692 config: Config,
693 scan_callback: Option<G>,
694 ) -> PollWatcher {
695 let (tx, rx) = unbounded();
696
697 let poll_watcher = PollWatcher {
698 delay: config.poll_interval(),
699 follow_symlinks: config.follow_symlinks(),
700
701 event_loop_tx: tx,
702 };
703
704 let data_builder =
705 DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
706 poll_watcher.run(rx, data_builder);
707
708 poll_watcher
709 }
710
711 fn run(&self, rx: Receiver<EventLoopMsg>, mut data_builder: DataBuilder) {
712 let delay = self.delay;
713 let follow_symlinks = self.follow_symlinks;
714
715 let result = thread::Builder::new()
716 .name("notify-rs poll loop".to_string())
717 .spawn(move || {
718 let mut watch_data = WatchData::new(follow_symlinks);
719
720 loop {
721 data_builder.update_timestamp();
722 watch_data.rescan(&data_builder);
723
724 let result = if let Some(delay) = delay {
726 rx.recv_timeout(delay).or_else(|e| match e {
727 mpsc::RecvTimeoutError::Timeout => Ok(EventLoopMsg::Poll),
728 mpsc::RecvTimeoutError::Disconnected => Err(mpsc::RecvError),
729 })
730 } else {
731 rx.recv()
732 };
733 match result {
734 Ok(EventLoopMsg::AddWatch(path, mode, resp_tx)) => {
735 let result = resp_tx.send(watch_data.add_watch(path, mode));
736 if let Err(e) = result {
737 tracing::error!(?e, "failed to send AddWatch response");
738 }
739 }
740 Ok(EventLoopMsg::AddWatchMultiple(paths, resp_tx)) => {
741 let result = resp_tx.send(watch_data.add_watch_multiple(paths));
742 if let Err(e) = result {
743 tracing::error!(?e, "failed to send AddWatchMultiple response");
744 }
745 }
746 Ok(EventLoopMsg::RemoveWatch(path, resp_tx)) => {
747 let result = resp_tx.send(watch_data.remove_watch(&path));
748 if let Err(e) = result {
749 tracing::error!(?e, "failed to send RemoveWatch response");
750 }
751 }
752 Ok(EventLoopMsg::Poll) => {
753 }
755 #[cfg(test)]
756 Ok(EventLoopMsg::WaitNextScan(resp_tx)) => {
757 let result = resp_tx.send(Ok(()));
758 if let Err(e) = result {
759 tracing::error!(?e, "failed to send WaitNextScan response");
760 }
761 }
762 Ok(EventLoopMsg::Shutdown) => {
763 break;
764 }
765 Err(e) => {
766 tracing::error!(?e, "failed to receive poll message");
767 }
768 }
769 }
770 });
771 if let Err(e) = result {
772 tracing::error!(?e, "failed to start poll watcher thread");
773 }
774 }
775
776 fn watch_inner(&self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
778 let (tx, rx) = unbounded();
779 self.event_loop_tx
780 .send(EventLoopMsg::AddWatch(path.to_path_buf(), watch_mode, tx))?;
781 rx.recv().unwrap()
782 }
783
784 fn watch_multiple_inner(&self, paths: Vec<(PathBuf, WatchMode)>) -> crate::Result<()> {
785 let (tx, rx) = unbounded();
786 self.event_loop_tx
787 .send(EventLoopMsg::AddWatchMultiple(paths, tx))?;
788 rx.recv().unwrap()
789 }
790
791 fn unwatch_inner(&self, path: &Path) -> crate::Result<()> {
795 let (tx, rx) = unbounded();
796 self.event_loop_tx
797 .send(EventLoopMsg::RemoveWatch(path.to_path_buf(), tx))?;
798 rx.recv().unwrap()
799 }
800}
801
802impl Watcher for PollWatcher {
803 #[tracing::instrument(level = "debug", skip(event_handler))]
805 fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
806 Self::new(event_handler, config)
807 }
808
809 #[tracing::instrument(level = "debug", skip(self))]
810 fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
811 self.watch_inner(path, watch_mode)
812 }
813
814 #[tracing::instrument(level = "debug", skip(self))]
815 fn paths_mut<'me>(&'me mut self) -> Box<dyn PathsMut + 'me> {
816 Box::new(PollPathsMut::new(self))
817 }
818
819 #[tracing::instrument(level = "debug", skip(self))]
820 fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
821 self.unwatch_inner(path)
822 }
823
824 fn kind() -> crate::WatcherKind {
825 crate::WatcherKind::PollWatcher
826 }
827}
828
829impl Drop for PollWatcher {
830 fn drop(&mut self) {
831 let result = self.event_loop_tx.send(EventLoopMsg::Shutdown);
832 if let Err(e) = result {
833 tracing::error!(?e, "failed to send shutdown message to poll watcher thread");
834 }
835 }
836}
837
838#[cfg(test)]
839mod tests {
840 #[cfg(target_family = "wasm")]
841 use std::thread::sleep;
842 #[cfg(target_family = "wasm")]
843 use std::time::Duration;
844
845 use super::PollWatcher;
846 use crate::{Error, ErrorKind, RecursiveMode, TargetMode, WatchMode, Watcher, test::*};
847
848 fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
849 poll_watcher_channel()
850 }
851
#[test]
fn poll_watcher_is_send_and_sync() {
    // Compile-time check only: this fails to build if the bounds do not hold.
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }
    assert_send_sync::<PollWatcher>();
}
857
#[test]
fn create_file() {
    let dir = testdir();
    let (mut poll, events) = watcher();
    poll.watch_recursively(&dir);
    // Let the initial scan finish so the new file is seen as a creation.
    poll.watcher.wait_next_scan().expect("wait next scan");

    let entry = dir.path().join("entry");
    std::fs::File::create_new(&entry).expect("Unable to create");

    events.sleep_until_parent_contains(&entry);
    events.sleep_until_exists(&entry);

    let want = [expected(&entry).create_file()];
    events.wait_ordered_exact(want);
}
873
#[test]
fn create_self_file() {
    let dir = testdir();
    let (mut poll, events) = watcher();

    // Watch a path that does not exist yet, then create it.
    let entry = dir.path().join("entry");

    poll.watch_nonrecursively(&entry);
    poll.watcher.wait_next_scan().expect("wait next scan");

    std::fs::File::create_new(&entry).expect("create");

    events.sleep_until_exists(&entry);
    let want = [expected(&entry).create_file()];
    events.wait_ordered_exact(want);
}
889
#[test]
fn create_self_file_no_track() {
    let dir = testdir();
    let (mut poll, _) = watcher();

    let missing = dir.path().join("entry");

    // A `NoTrack` watch on a missing path must be rejected up front.
    let mode = WatchMode {
        recursive_mode: RecursiveMode::NonRecursive,
        target_mode: TargetMode::NoTrack,
    };
    let outcome = poll.watcher.watch(&missing, mode);

    assert!(matches!(
        outcome,
        Err(Error {
            kind: ErrorKind::PathNotFound,
            ..
        })
    ));
}
912
#[test]
fn create_self_file_nested() {
    let dir = testdir();
    let (mut poll, events) = watcher();

    // Neither the file nor its parent directory exists when the watch is added.
    let nested = dir.path().join("entry/nested");

    poll.watch_nonrecursively(&nested);
    poll.watcher.wait_next_scan().expect("wait next scan");

    let parent = nested.parent().unwrap();
    std::fs::create_dir_all(parent).expect("create");
    std::fs::File::create_new(&nested).expect("create");

    let want = [expected(&nested).create_file()];
    events.wait_ordered_exact(want);
}
928
#[test]
fn create_dir() {
    let dir = testdir();
    let (mut poll, events) = watcher();
    poll.watch_recursively(&dir);
    poll.watcher.wait_next_scan().expect("wait next scan");

    let folder = dir.path().join("entry");
    std::fs::create_dir(&folder).expect("Unable to create");

    events.sleep_until_parent_contains(&folder);
    events.sleep_until_exists(&folder);

    let want = [expected(&folder).create_folder()];
    events.wait_ordered_exact(want);
}
944
#[test]
fn modify_file() {
    let dir = testdir();
    let (mut poll, events) = watcher();
    let entry = dir.path().join("entry");
    std::fs::File::create_new(&entry).expect("Unable to create");

    events.sleep_until_parent_contains(&entry);

    // The file exists before the watch starts, so only the write is reported.
    poll.watch_recursively(&dir);
    poll.watcher.wait_next_scan().expect("wait next scan");
    std::fs::write(&entry, b"123").expect("Unable to write");

    let written = events
        .sleep_until(|| std::fs::read_to_string(&entry).is_ok_and(|content| content == "123"));
    assert!(written, "the file wasn't modified");

    let want = [expected(&entry).modify_data_any()];
    events.wait_ordered_exact(want);
}
964
#[test]
fn rename_file() {
    let tmpdir = testdir();
    let (mut watcher, rx) = watcher();
    let path = tmpdir.path().join("entry");
    let new_path = tmpdir.path().join("new_entry");
    std::fs::File::create_new(&path).expect("Unable to create");

    rx.sleep_until_parent_contains(&path);

    watcher.watch_recursively(&tmpdir);

    watcher.watcher.wait_next_scan().expect("wait next scan");
    // The failure message now names the operation actually performed.
    std::fs::rename(&path, &new_path).expect("Unable to rename");

    rx.sleep_while_exists(&path);
    rx.sleep_until_exists(&new_path);

    rx.sleep_while_parent_contains(&path);
    rx.sleep_until_parent_contains(&new_path);

    // A poll watcher cannot correlate the two sides of a rename, so it shows
    // up as an independent removal and creation in either order.
    rx.wait_unordered_exact([
        expected(&path).remove_file(),
        expected(&new_path).create_file(),
    ]);
}
991
#[test]
fn rename_self_file() {
    let dir = testdir();
    let (mut poll, events) = watcher();

    let watched = dir.path().join("entry");
    std::fs::File::create_new(&watched).expect("create");

    poll.watch_nonrecursively(&watched);
    poll.watcher.wait_next_scan().expect("wait next scan");
    let renamed = dir.path().join("renamed");

    // Moving the watched file away is seen as its removal only.
    std::fs::rename(&watched, &renamed).expect("rename");

    events.sleep_while_exists(&watched);
    events.sleep_until_exists(&renamed);

    events
        .wait_unordered_exact([expected(&watched).remove_file()])
        .ensure_no_tail();

    // Moving it back is seen as a fresh creation of the watched path.
    std::fs::rename(&renamed, &watched).expect("rename2");
    poll.watcher.wait_next_scan().expect("wait next scan");

    events.sleep_while_exists(&renamed);
    events.sleep_until_exists(&watched);

    events
        .wait_unordered_exact([expected(&watched).create_file()])
        .ensure_no_tail();
}
1021
#[test]
fn rename_self_file_no_track() {
    let dir = testdir();
    let (mut poll, events) = watcher();

    let watched = dir.path().join("entry");
    std::fs::File::create_new(&watched).expect("create");

    // Both registrations below use the same non-recursive `NoTrack` mode.
    let no_track = || WatchMode {
        recursive_mode: RecursiveMode::NonRecursive,
        target_mode: TargetMode::NoTrack,
    };

    poll.watch(&watched, no_track());
    poll.watcher.wait_next_scan().expect("wait next scan");

    let renamed = dir.path().join("renamed");

    std::fs::rename(&watched, &renamed).expect("rename");

    events.sleep_while_exists(&watched);
    events.sleep_until_exists(&renamed);

    #[cfg(target_family = "wasm")]
    sleep(Duration::from_millis(100));

    events
        .wait_unordered_exact([
            expected(&watched).modify_data_any().optional(),
            expected(&watched).remove_file(),
        ])
        .ensure_no_tail();

    // The original path is gone now, so re-adding the watch must fail.
    let outcome = poll.watcher.watch(&watched, no_track());
    assert!(matches!(
        outcome,
        Err(Error {
            kind: ErrorKind::PathNotFound,
            ..
        })
    ));
}
1070
#[test]
fn delete_file() {
    let dir = testdir();
    let (mut poll, events) = watcher();
    let entry = dir.path().join("entry");
    std::fs::File::create_new(&entry).expect("Unable to create");

    events.sleep_until_parent_contains(&entry);

    poll.watch_recursively(&dir);
    poll.watcher.wait_next_scan().expect("wait next scan");

    std::fs::remove_file(&entry).expect("Unable to remove");

    events.sleep_while_exists(&entry);
    events.sleep_while_parent_contains(&entry);

    // A data-change event may or may not precede the removal.
    let want = [
        expected(&entry).modify_data_any().optional(),
        expected(&entry).remove_file(),
    ];
    events.wait_ordered_exact(want);
}
1093
#[test]
fn delete_self_file() {
    let dir = testdir();
    let (mut poll, events) = watcher();
    let watched = dir.path().join("entry");
    std::fs::File::create_new(&watched).expect("Unable to create");

    poll.watch_nonrecursively(&watched);
    poll.watcher.wait_next_scan().expect("wait next scan");

    std::fs::remove_file(&watched).expect("Unable to remove");

    events.sleep_while_exists(&watched);
    let removal = [
        expected(&watched).modify_data_any().optional(),
        expected(&watched).remove_file(),
    ];
    events.wait_ordered_exact(removal);

    // The watch keeps tracking the path, so recreating it is reported.
    std::fs::write(&watched, "").expect("write");

    events.sleep_until_exists(&watched);
    let recreation = [expected(&watched).create_file()];
    events.wait_ordered_exact(recreation);
}
1117
#[test]
fn delete_self_file_no_track() {
    let dir = testdir();
    let (mut poll, events) = watcher();
    let watched = dir.path().join("entry");
    std::fs::File::create_new(&watched).expect("Unable to create");

    let mode = WatchMode {
        recursive_mode: RecursiveMode::NonRecursive,
        target_mode: TargetMode::NoTrack,
    };
    poll.watch(&watched, mode);
    poll.watcher.wait_next_scan().expect("wait next scan");

    std::fs::remove_file(&watched).expect("Unable to remove");

    events.sleep_while_exists(&watched);
    let removal = [
        expected(&watched).modify_data_any().optional(),
        expected(&watched).remove_file(),
    ];
    events.wait_ordered_exact(removal);

    #[cfg(target_family = "wasm")]
    sleep(Duration::from_millis(100));

    // With `NoTrack`, recreating the removed path must not produce events.
    std::fs::write(&watched, "").expect("write");

    events.ensure_empty_with_wait();
}
1149
#[test]
fn create_write_overwrite() {
    let dir = testdir();
    let (mut poll, events) = watcher();
    let target = dir.path().join("overwritten_file");
    let source = dir.path().join("overwriting_file");
    std::fs::write(&target, "123").expect("write1");

    events.sleep_until_parent_contains(&target);
    events.sleep_until_exists(&target);

    poll.watch_nonrecursively(&dir);
    poll.watcher.wait_next_scan().expect("wait next scan");

    // Replace the target by renaming a freshly written file over it.
    std::fs::File::create(&source).expect("create");
    std::fs::write(&source, "321").expect("write2");
    std::fs::rename(&source, &target).expect("rename");

    events.sleep_while_exists(&source);
    events.sleep_while_parent_contains(&source);

    let replaced =
        events.sleep_until(|| std::fs::read_to_string(&target).is_ok_and(|cnt| cnt == "321"));
    assert!(replaced, "file {target:?} was not replaced");

    events.wait_unordered([expected(&target).modify_data_any()]);
}
1180}