1use crate::{
7 Config, Error, EventHandler, PathsMut, Receiver, Result, Sender, WatchMode, Watcher,
8 poll::data::WatchData, unbounded,
9};
10use std::{
11 path::{Path, PathBuf},
12 sync::mpsc,
13 thread,
14 time::Duration,
15};
16
/// Item delivered to a [`ScanEventHandler`]: `Ok` carries a scanned path, `Err` a crate error.
pub type ScanEvent = crate::Result<PathBuf>;
19
/// Receiver of [`ScanEvent`]s.
///
/// Implementors must be `Send + 'static`; this file hands the handler to the polling thread.
pub trait ScanEventHandler: Send + 'static {
    /// Handles one scan event.
    fn handle_event(&mut self, event: ScanEvent);
}
28
29impl<F> ScanEventHandler for F
30where
31 F: FnMut(ScanEvent) + Send + 'static,
32{
33 fn handle_event(&mut self, event: ScanEvent) {
34 (self)(event);
35 }
36}
37
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    /// Forwards the event into the channel; a failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: ScanEvent) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
47
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
    /// Forwards the event into the channel; a failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: ScanEvent) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send scan event result");
        }
    }
}
57
58impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
59 fn handle_event(&mut self, event: ScanEvent) {
60 let result = self.send(event);
61 if let Err(e) = result {
62 tracing::error!(?e, "failed to send scan event result");
63 }
64 }
65}
66
/// No-op handler: every scan event is discarded.
impl ScanEventHandler for () {
    fn handle_event(&mut self, _event: ScanEvent) {}
}
70
71use data::DataBuilder;
72mod data {
73 use crate::{
74 Error, EventHandler, Result, WatchMode,
75 consolidating_path_trie::ConsolidatingPathTrie,
76 event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
77 };
78 use std::{
79 cell::RefCell,
80 collections::{HashMap, hash_map::RandomState},
81 fmt::{self, Debug},
82 fs::{File, FileType, Metadata},
83 hash::{BuildHasher, Hasher},
84 io::{self, Read},
85 path::{Path, PathBuf},
86 time::Instant,
87 };
88 use walkdir::WalkDir;
89
90 use super::ScanEventHandler;
91
92 fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
93 #[expect(clippy::cast_possible_wrap)]
94 match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
95 Ok(d) => d.as_secs() as i64,
96 Err(e) => -(e.duration().as_secs() as i64),
97 }
98 }
99
100 pub(super) struct DataBuilder {
102 emitter: EventEmitter,
103 scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
104
105 build_hasher: Option<RandomState>,
108
109 now: Instant,
111 }
112
113 impl DataBuilder {
114 pub(super) fn new<F, G>(
115 event_handler: F,
116 compare_content: bool,
117 scan_emitter: Option<G>,
118 ) -> Self
119 where
120 F: EventHandler,
121 G: ScanEventHandler,
122 {
123 let scan_emitter = match scan_emitter {
124 None => None,
125 Some(v) => {
126 let intermediate: Box<RefCell<dyn ScanEventHandler>> =
128 Box::new(RefCell::new(v));
129 Some(intermediate)
130 }
131 };
132 Self {
133 emitter: EventEmitter::new(event_handler),
134 scan_emitter,
135 build_hasher: compare_content.then(RandomState::default),
136 now: Instant::now(),
137 }
138 }
139
140 pub(super) fn update_timestamp(&mut self) {
142 self.now = Instant::now();
143 }
144
145 fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
147 PathData::new(self, meta_path)
148 }
149 }
150
151 impl Debug for DataBuilder {
152 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153 f.debug_struct("DataBuilder")
154 .field("build_hasher", &self.build_hasher)
155 .field("now", &self.now)
156 .finish_non_exhaustive()
157 }
158 }
159
160 #[derive(Debug)]
161 struct WatchHandlers {
162 current: HashMap<PathBuf, bool>,
163 next: HashMap<PathBuf, bool>,
164 is_stale: bool,
165 }
166
167 impl WatchHandlers {
168 fn new() -> Self {
169 Self {
170 current: HashMap::new(),
171 next: HashMap::new(),
172 is_stale: false,
173 }
174 }
175
176 fn recalculate(&mut self, watches: &HashMap<PathBuf, WatchMode>) {
178 self.next.clear();
179 self.is_stale = true;
180
181 let mut trie = ConsolidatingPathTrie::new(false);
182 for (path, mode) in watches {
183 if mode.recursive_mode == crate::RecursiveMode::Recursive {
184 trie.insert(path);
185 }
186 }
187 for (path, mode) in watches {
189 if mode.recursive_mode != crate::RecursiveMode::Recursive {
190 self.next.insert(path.clone(), false);
191 }
192 }
193 for path in trie.values() {
195 self.next.insert(path, true);
196 }
197 }
198
199 fn use_handlers(
200 &mut self,
201 ) -> (
202 &HashMap<PathBuf, bool>,
203 Option<HashMap<PathBuf, bool>>,
204 ) {
205 if self.is_stale {
206 let old_next = std::mem::take(&mut self.next);
207 let old_current = std::mem::replace(&mut self.current, old_next);
208 self.is_stale = false;
209 return (&self.current, Some(old_current));
210 }
211 (&self.current, None)
212 }
213 }
214
215 #[derive(Debug)]
216 pub(super) struct WatchData {
217 follow_symlinks: bool,
219
220 watches: HashMap<PathBuf, WatchMode>,
222 watch_handlers: WatchHandlers,
223 all_path_data: HashMap<PathBuf, PathData>,
224 }
225
226 impl WatchData {
227 pub fn new(follow_symlinks: bool) -> Self {
229 Self {
230 follow_symlinks,
231 watches: HashMap::new(),
232 watch_handlers: WatchHandlers::new(),
233 all_path_data: HashMap::new(),
234 }
235 }
236
237 pub fn add_watch(&mut self, path: PathBuf, mode: WatchMode) -> Result<()> {
238 if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() {
239 return Err(crate::Error::path_not_found().add_path(path));
240 }
241
242 self.watches.insert(path, mode);
243 self.watch_handlers.recalculate(&self.watches);
244 Ok(())
245 }
246
247 pub fn add_watch_multiple(&mut self, paths: Vec<(PathBuf, WatchMode)>) -> Result<()> {
248 for (path, mode) in paths {
249 if mode.target_mode == crate::TargetMode::NoTrack && !path.exists() {
250 return Err(crate::Error::path_not_found().add_path(path));
251 }
252
253 self.watches.insert(path, mode);
254 }
255 self.watch_handlers.recalculate(&self.watches);
256 Ok(())
257 }
258
259 pub fn remove_watch(&mut self, path: &Path) -> Result<()> {
260 self.watches.remove(path).ok_or(Error::watch_not_found())?;
261 self.watch_handlers.recalculate(&self.watches);
262 Ok(())
263 }
264
265 pub(super) fn rescan(&mut self, data_builder: &DataBuilder) {
271 let (watch_handlers, old_watch_handlers) = self.watch_handlers.use_handlers();
272
273 for (path, new_path_data) in
275 Self::scan_all_path_data(data_builder, watch_handlers, self.follow_symlinks)
276 {
277 let old_path_data = self
278 .all_path_data
279 .insert(path.clone(), new_path_data.clone());
280
281 let is_initial = old_watch_handlers
282 .as_ref()
283 .is_some_and(|old_watch_handlers| {
284 !old_watch_handlers.contains_key(&path)
285 && !path.ancestors().skip(1).any(|ancestor| {
286 old_watch_handlers
287 .get(ancestor)
288 .is_some_and(|is_recursive| *is_recursive)
289 })
290 });
291 if is_initial {
292 if let Some(ref emitter) = data_builder.scan_emitter {
294 emitter.borrow_mut().handle_event(Ok(path.clone()));
295 }
296 } else {
297 let event = PathData::compare_to_event(
299 path,
300 old_path_data.as_ref(),
301 Some(&new_path_data),
302 );
303 if let Some(event) = event {
304 data_builder.emitter.emit_ok(event);
305 }
306 }
307 }
308
309 let mut disappeared_paths = Vec::new();
311 for (path, path_data) in &self.all_path_data {
312 if path_data.last_check < data_builder.now {
313 disappeared_paths.push(path.clone());
314 }
315 }
316
317 for path in disappeared_paths {
319 let old_path_data = self.all_path_data.remove(&path);
320
321 let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
323 if let Some(event) = event {
324 data_builder.emitter.emit_ok(event);
325 }
326 }
327 }
328
329 fn scan_all_path_data(
335 data_builder: &DataBuilder,
336 watch_handlers: &HashMap<PathBuf, bool>,
337 follow_symlinks: bool,
338 ) -> impl Iterator<Item = (PathBuf, PathData)> {
339 tracing::trace!("rescanning");
340
341 watch_handlers.iter().flat_map(move |(path, is_recursive)| {
342 tracing::trace!(?path, is_recursive, "scanning watch handler");
343
344 WalkDir::new(path)
349 .follow_links(follow_symlinks)
350 .max_depth(if *is_recursive { usize::MAX } else { 1 })
351 .into_iter()
352 .filter_map(|entry_res| match entry_res {
353 Ok(entry) => Some(entry),
354 Err(err) => {
355 tracing::warn!("walkdir error scanning {err:?}");
356
357 if let Some(io_error) = err.io_error() {
358 if io_error.kind() == io::ErrorKind::NotFound {
359 return None;
360 }
361 let new_io_error = io::Error::new(io_error.kind(), err.to_string());
363 data_builder.emitter.emit_io_err(new_io_error, err.path());
364 } else {
365 let crate_err =
366 Error::new(crate::ErrorKind::Generic(err.to_string()));
367 data_builder.emitter.emit(Err(crate_err));
368 }
369 None
370 }
371 })
372 .filter_map(move |entry| match entry.metadata() {
373 Ok(metadata) => {
374 let path = entry.into_path();
375 let meta_path = MetaPath::from_parts_unchecked(path, metadata);
376 let data_path = data_builder.build_path_data(&meta_path);
377
378 Some((meta_path.into_path(), data_path))
379 }
380 Err(err) => {
381 if let Some(io_error) = err.io_error()
382 && io_error.kind() == io::ErrorKind::NotFound
383 {
384 return None;
385 }
386
387 let path = entry.into_path();
389 data_builder.emitter.emit_io_err(err, Some(path));
390
391 None
392 }
393 })
394 })
395 }
396 }
397
398 #[derive(Debug, Clone)]
402 struct PathData {
403 mtime: i64,
405
406 file_type: FileType,
407
408 hash: Option<u64>,
411
412 last_check: Instant,
414 }
415
416 impl PathData {
417 fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
419 let metadata = meta_path.metadata();
420
421 PathData {
422 mtime: metadata.modified().map_or(0, system_time_to_seconds),
423 file_type: metadata.file_type(),
424 hash: data_builder
425 .build_hasher
426 .as_ref()
427 .filter(|_| metadata.is_file())
428 .and_then(|build_hasher| {
429 Self::get_content_hash(build_hasher, meta_path.path()).ok()
430 }),
431
432 last_check: data_builder.now,
433 }
434 }
435
436 fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
438 let mut hasher = build_hasher.build_hasher();
439 let mut file = File::open(path)?;
440 let mut buf = [0; 512];
441
442 loop {
443 let n = match file.read(&mut buf) {
444 Ok(0) => break,
445 Ok(len) => len,
446 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
447 Err(e) => return Err(e),
448 };
449
450 hasher.write(&buf[..n]);
451 }
452
453 Ok(hasher.finish())
454 }
455
456 fn get_create_kind(&self) -> CreateKind {
458 #[expect(clippy::filetype_is_file)]
459 if self.file_type.is_dir() {
460 CreateKind::Folder
461 } else if self.file_type.is_file() {
462 CreateKind::File
463 } else {
464 CreateKind::Any
465 }
466 }
467
468 fn get_remove_kind(&self) -> RemoveKind {
470 #[expect(clippy::filetype_is_file)]
471 if self.file_type.is_dir() {
472 RemoveKind::Folder
473 } else if self.file_type.is_file() {
474 RemoveKind::File
475 } else {
476 RemoveKind::Any
477 }
478 }
479
480 fn compare_to_event<P>(
482 path: P,
483 old: Option<&PathData>,
484 new: Option<&PathData>,
485 ) -> Option<Event>
486 where
487 P: Into<PathBuf>,
488 {
489 match (old, new) {
490 (Some(old), Some(new)) => {
491 if new.mtime > old.mtime {
492 Some(EventKind::Modify(ModifyKind::Metadata(
493 MetadataKind::WriteTime,
494 )))
495 } else if new.hash != old.hash {
496 Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
497 } else {
498 None
499 }
500 }
501 (None, Some(new)) => Some(EventKind::Create(new.get_create_kind())),
502 (Some(old), None) => Some(EventKind::Remove(old.get_remove_kind())),
503 (None, None) => None,
504 }
505 .map(|event_kind| Event::new(event_kind).add_path(path.into()))
506 }
507 }
508
509 #[derive(Debug)]
515 pub(super) struct MetaPath {
516 path: PathBuf,
517 metadata: Metadata,
518 }
519
520 impl MetaPath {
521 fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
527 Self { path, metadata }
528 }
529
530 fn path(&self) -> &Path {
531 &self.path
532 }
533
534 fn metadata(&self) -> &Metadata {
535 &self.metadata
536 }
537
538 fn into_path(self) -> PathBuf {
539 self.path
540 }
541 }
542
543 struct EventEmitter(
545 Box<RefCell<dyn EventHandler>>,
548 );
549
550 impl EventEmitter {
551 fn new<F: EventHandler>(event_handler: F) -> Self {
552 Self(Box::new(RefCell::new(event_handler)))
553 }
554
555 fn emit(&self, event: crate::Result<Event>) {
557 self.0.borrow_mut().handle_event(event);
558 }
559
560 fn emit_ok(&self, event: Event) {
562 self.emit(Ok(event));
563 }
564
565 fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
567 where
568 E: Into<io::Error>,
569 P: Into<PathBuf>,
570 {
571 let e = crate::Error::io(err.into());
572 if let Some(path) = path {
573 self.emit(Err(e.add_path(path.into())));
574 } else {
575 self.emit(Err(e));
576 }
577 }
578 }
579}
580
/// Commands understood by the polling thread.
///
/// Variants carrying a `Sender<Result<()>>` get their outcome reported through it.
enum EventLoopMsg {
    /// Register a watch on one path.
    AddWatch(PathBuf, WatchMode, Sender<Result<()>>),
    /// Register watches on several paths in one step.
    AddWatchMultiple(Vec<(PathBuf, WatchMode)>, Sender<Result<()>>),
    /// Remove the watch on a path.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Test helper: answered with `Ok(())` when the event loop handles the message.
    #[cfg(test)]
    WaitNextScan(Sender<Result<()>>),
    /// Wake the loop so that it proceeds to the next scan.
    Poll,
    /// Terminate the event loop.
    Shutdown,
}
591
592struct PollPathsMut<'a> {
593 inner: &'a mut PollWatcher,
594 add_paths: Vec<(PathBuf, WatchMode)>,
595}
596impl<'a> PollPathsMut<'a> {
597 fn new(watcher: &'a mut PollWatcher) -> Self {
598 Self {
599 inner: watcher,
600 add_paths: Vec::new(),
601 }
602 }
603}
604impl PathsMut for PollPathsMut<'_> {
605 #[tracing::instrument(level = "debug", skip(self))]
606 fn add(&mut self, path: &Path, watch_mode: WatchMode) -> Result<()> {
607 self.add_paths.push((path.to_owned(), watch_mode));
608 Ok(())
609 }
610
611 #[tracing::instrument(level = "debug", skip(self))]
612 fn remove(&mut self, path: &Path) -> Result<()> {
613 self.inner.unwatch_inner(path)
614 }
615
616 #[tracing::instrument(level = "debug", skip(self))]
617 fn commit(self: Box<Self>) -> Result<()> {
618 let paths = self.add_paths;
619 self.inner.watch_multiple_inner(paths)
620 }
621}
622
/// Watcher implementation that detects changes by repeatedly scanning the watched paths.
#[derive(Debug)]
pub struct PollWatcher {
    /// Interval between automatic scans; with `None` the event loop scans only after
    /// receiving a message.
    delay: Option<Duration>,
    /// Whether scans follow symbolic links.
    follow_symlinks: bool,

    /// Channel for sending commands to the polling thread.
    event_loop_tx: Sender<EventLoopMsg>,
}
637impl PollWatcher {
638 pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
640 Ok(Self::with_opt::<_, ()>(event_handler, config, None))
641 }
642
643 pub fn poll(&self) -> crate::Result<()> {
645 self.event_loop_tx
646 .send(EventLoopMsg::Poll)
647 .map_err(|_| Error::generic("failed to send poll message"))?;
648 Ok(())
649 }
650
651 #[cfg(test)]
652 pub(crate) fn wait_next_scan(&self) -> crate::Result<()> {
653 let (tx, rx) = unbounded();
654 self.event_loop_tx
655 .send(EventLoopMsg::WaitNextScan(tx))
656 .map_err(|_| Error::generic("failed to send WaitNextScan message"))?;
657 rx.recv().unwrap()
658 }
659
660 #[cfg(test)]
662 pub(crate) fn poll_sender(&self) -> Sender<()> {
663 let inner_tx = self.event_loop_tx.clone();
664 let (tx, rx) = unbounded();
665 thread::Builder::new()
666 .name("notify-rs poll loop".to_string())
667 .spawn(move || {
668 for () in &rx {
669 if let Err(err) = inner_tx.send(EventLoopMsg::Poll) {
670 tracing::error!(?err, "failed to send poll message");
671 }
672 }
673 })
674 .unwrap();
675 tx
676 }
677
678 pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
682 event_handler: F,
683 config: Config,
684 scan_callback: G,
685 ) -> crate::Result<PollWatcher> {
686 Ok(Self::with_opt(event_handler, config, Some(scan_callback)))
687 }
688
689 fn with_opt<F: EventHandler, G: ScanEventHandler>(
691 event_handler: F,
692 config: Config,
693 scan_callback: Option<G>,
694 ) -> PollWatcher {
695 let (tx, rx) = unbounded();
696
697 let poll_watcher = PollWatcher {
698 delay: config.poll_interval(),
699 follow_symlinks: config.follow_symlinks(),
700
701 event_loop_tx: tx,
702 };
703
704 let data_builder =
705 DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
706 poll_watcher.run(rx, data_builder);
707
708 poll_watcher
709 }
710
711 fn run(&self, rx: Receiver<EventLoopMsg>, mut data_builder: DataBuilder) {
712 let delay = self.delay;
713 let follow_symlinks = self.follow_symlinks;
714
715 let result = thread::Builder::new()
716 .name("notify-rs poll loop".to_string())
717 .spawn(move || {
718 let mut watch_data = WatchData::new(follow_symlinks);
719
720 loop {
721 data_builder.update_timestamp();
722 watch_data.rescan(&data_builder);
723
724 let result = if let Some(delay) = delay {
726 rx.recv_timeout(delay).or_else(|e| match e {
727 mpsc::RecvTimeoutError::Timeout => Ok(EventLoopMsg::Poll),
728 mpsc::RecvTimeoutError::Disconnected => Err(mpsc::RecvError),
729 })
730 } else {
731 rx.recv()
732 };
733 match result {
734 Ok(EventLoopMsg::AddWatch(path, mode, resp_tx)) => {
735 let result = resp_tx.send(watch_data.add_watch(path, mode));
736 if let Err(e) = result {
737 tracing::error!(?e, "failed to send AddWatch response");
738 }
739 }
740 Ok(EventLoopMsg::AddWatchMultiple(paths, resp_tx)) => {
741 let result = resp_tx.send(watch_data.add_watch_multiple(paths));
742 if let Err(e) = result {
743 tracing::error!(?e, "failed to send AddWatchMultiple response");
744 }
745 }
746 Ok(EventLoopMsg::RemoveWatch(path, resp_tx)) => {
747 let result = resp_tx.send(watch_data.remove_watch(&path));
748 if let Err(e) = result {
749 tracing::error!(?e, "failed to send RemoveWatch response");
750 }
751 }
752 Ok(EventLoopMsg::Poll) => {
753 }
755 #[cfg(test)]
756 Ok(EventLoopMsg::WaitNextScan(resp_tx)) => {
757 let result = resp_tx.send(Ok(()));
758 if let Err(e) = result {
759 tracing::error!(?e, "failed to send WaitNextScan response");
760 }
761 }
762 Ok(EventLoopMsg::Shutdown) => {
763 break;
764 }
765 Err(e) => {
766 tracing::error!(?e, "failed to receive poll message");
767 }
768 }
769 }
770 });
771 if let Err(e) = result {
772 tracing::error!(?e, "failed to start poll watcher thread");
773 }
774 }
775
776 fn watch_inner(&self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
778 let (tx, rx) = unbounded();
779 self.event_loop_tx
780 .send(EventLoopMsg::AddWatch(path.to_path_buf(), watch_mode, tx))?;
781 rx.recv().unwrap()
782 }
783
784 fn watch_multiple_inner(&self, paths: Vec<(PathBuf, WatchMode)>) -> crate::Result<()> {
785 let (tx, rx) = unbounded();
786 self.event_loop_tx
787 .send(EventLoopMsg::AddWatchMultiple(paths, tx))?;
788 rx.recv().unwrap()
789 }
790
791 fn unwatch_inner(&self, path: &Path) -> crate::Result<()> {
795 let (tx, rx) = unbounded();
796 self.event_loop_tx
797 .send(EventLoopMsg::RemoveWatch(path.to_path_buf(), tx))?;
798 rx.recv().unwrap()
799 }
800}
801
802impl Watcher for PollWatcher {
803 #[tracing::instrument(level = "debug", skip(event_handler))]
805 fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
806 Self::new(event_handler, config)
807 }
808
809 #[tracing::instrument(level = "debug", skip(self))]
810 fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> crate::Result<()> {
811 self.watch_inner(path, watch_mode)
812 }
813
814 #[tracing::instrument(level = "debug", skip(self))]
815 fn paths_mut<'me>(&'me mut self) -> Box<dyn PathsMut + 'me> {
816 Box::new(PollPathsMut::new(self))
817 }
818
819 #[tracing::instrument(level = "debug", skip(self))]
820 fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
821 self.unwatch_inner(path)
822 }
823
824 fn kind() -> crate::WatcherKind {
825 crate::WatcherKind::PollWatcher
826 }
827}
828
829impl Drop for PollWatcher {
830 fn drop(&mut self) {
831 let result = self.event_loop_tx.send(EventLoopMsg::Shutdown);
832 if let Err(e) = result {
833 tracing::error!(?e, "failed to send shutdown message to poll watcher thread");
834 }
835 }
836}
837
/// Integration tests for [`PollWatcher`], built on the crate's shared test helpers.
#[cfg(test)]
mod tests {
    use super::PollWatcher;
    use crate::{Error, ErrorKind, RecursiveMode, TargetMode, WatchMode, Watcher, test::*};

    /// Creates a poll watcher together with the receiver for its events.
    fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
        poll_watcher_channel()
    }

    /// Compile-time check that `PollWatcher` is `Send + Sync`.
    #[test]
    fn poll_watcher_is_send_and_sync() {
        fn check<T: Send + Sync>() {}
        check::<PollWatcher>();
    }

    /// A file created inside a recursively watched directory yields a file-create event.
    #[test]
    fn create_file() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        watcher.watch_recursively(&tmpdir);
        watcher.watcher.wait_next_scan().expect("wait next scan");

        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        rx.sleep_until_exists(&path);
        rx.wait_ordered_exact([expected(&path).create_file()]);
    }

    /// Watching a not yet existing file path reports its creation.
    #[test]
    fn create_self_file() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();

        let path = tmpdir.path().join("entry");

        watcher.watch_nonrecursively(&path);
        watcher.watcher.wait_next_scan().expect("wait next scan");

        std::fs::File::create_new(&path).expect("create");

        rx.sleep_until_exists(&path);
        rx.wait_ordered_exact([expected(&path).create_file()]);
    }

    /// A `NoTrack` watch on a missing path is rejected with `PathNotFound`.
    #[test]
    fn create_self_file_no_track() {
        let tmpdir = testdir();
        let (mut watcher, _) = watcher();

        let path = tmpdir.path().join("entry");

        let result = watcher.watcher.watch(
            &path,
            WatchMode {
                recursive_mode: RecursiveMode::NonRecursive,
                target_mode: TargetMode::NoTrack,
            },
        );
        assert!(matches!(
            result,
            Err(Error {
                paths: _,
                kind: ErrorKind::PathNotFound
            })
        ));
    }

    /// Watching a missing path whose parent directory is missing too reports the creation.
    #[test]
    fn create_self_file_nested() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();

        let path = tmpdir.path().join("entry/nested");

        watcher.watch_nonrecursively(&path);
        watcher.watcher.wait_next_scan().expect("wait next scan");

        std::fs::create_dir_all(path.parent().unwrap()).expect("create");
        std::fs::File::create_new(&path).expect("create");

        rx.wait_ordered_exact([expected(&path).create_file()]);
    }

    /// A directory created inside a recursively watched directory yields a folder-create event.
    #[test]
    fn create_dir() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        watcher.watch_recursively(&tmpdir);
        watcher.watcher.wait_next_scan().expect("wait next scan");

        let path = tmpdir.path().join("entry");
        std::fs::create_dir(&path).expect("Unable to create");

        rx.sleep_until_exists(&path);
        rx.wait_ordered_exact([expected(&path).create_folder()]);
    }

    /// Writing to an existing watched file yields a data-modify event.
    #[test]
    fn modify_file() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        watcher.watch_recursively(&tmpdir);
        watcher.watcher.wait_next_scan().expect("wait next scan");
        std::fs::write(&path, b"123").expect("Unable to write");

        assert!(
            rx.sleep_until(|| std::fs::read_to_string(&path).is_ok_and(|content| content == "123")),
            "the file wasn't modified"
        );
        rx.wait_ordered_exact([expected(&path).modify_data_any()]);
    }

    /// Renaming inside a watched directory shows up as remove of the old name plus create of
    /// the new name, in any order.
    #[test]
    fn rename_file() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        let path = tmpdir.path().join("entry");
        let new_path = tmpdir.path().join("new_entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        watcher.watch_recursively(&tmpdir);
        watcher.watcher.wait_next_scan().expect("wait next scan");
        std::fs::rename(&path, &new_path).expect("Unable to remove");

        rx.sleep_while_exists(&path);
        rx.sleep_until_exists(&new_path);

        rx.wait_unordered_exact([
            expected(&path).remove_file(),
            expected(&new_path).create_file(),
        ]);
    }

    /// Renaming a directly watched file away reports a remove; renaming it back reports a
    /// create.
    #[test]
    fn rename_self_file() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();

        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("create");

        watcher.watch_nonrecursively(&path);
        watcher.watcher.wait_next_scan().expect("wait next scan");
        let new_path = tmpdir.path().join("renamed");

        std::fs::rename(&path, &new_path).expect("rename");

        rx.sleep_while_exists(&path);
        rx.sleep_until_exists(&new_path);

        rx.wait_unordered_exact([expected(&path).remove_file()])
            .ensure_no_tail();

        std::fs::rename(&new_path, &path).expect("rename2");

        rx.sleep_while_exists(&new_path);
        rx.sleep_until_exists(&path);

        rx.wait_unordered_exact([expected(&path).create_file()])
            .ensure_no_tail();
    }

    /// Renaming a `NoTrack`-watched file away reports a remove, and re-watching the now
    /// missing path fails with `PathNotFound`.
    #[test]
    fn rename_self_file_no_track() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();

        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("create");

        watcher.watch(
            &path,
            WatchMode {
                recursive_mode: RecursiveMode::NonRecursive,
                target_mode: TargetMode::NoTrack,
            },
        );
        watcher.watcher.wait_next_scan().expect("wait next scan");

        let new_path = tmpdir.path().join("renamed");

        std::fs::rename(&path, &new_path).expect("rename");

        rx.sleep_while_exists(&path);
        rx.sleep_until_exists(&new_path);

        rx.wait_unordered_exact([expected(&path).remove_file()])
            .ensure_no_tail();

        let result = watcher.watcher.watch(
            &path,
            WatchMode {
                recursive_mode: RecursiveMode::NonRecursive,
                target_mode: TargetMode::NoTrack,
            },
        );
        assert!(matches!(
            result,
            Err(Error {
                paths: _,
                kind: ErrorKind::PathNotFound
            })
        ));
    }

    /// Deleting a file in a watched directory yields a remove event, optionally preceded by a
    /// data-modify event.
    #[test]
    fn delete_file() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        watcher.watch_recursively(&tmpdir);
        watcher.watcher.wait_next_scan().expect("wait next scan");
        std::fs::remove_file(&path).expect("Unable to remove");

        rx.sleep_while_exists(&path);
        rx.wait_ordered_exact([
            expected(&path).modify_data_any().optional(),
            expected(&path).remove_file(),
        ]);
    }

    /// Deleting a directly watched file reports a remove; recreating it reports a create.
    #[test]
    fn delete_self_file() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        watcher.watch_nonrecursively(&path);
        watcher.watcher.wait_next_scan().expect("wait next scan");

        std::fs::remove_file(&path).expect("Unable to remove");

        rx.sleep_while_exists(&path);
        rx.wait_ordered_exact([
            expected(&path).modify_data_any().optional(),
            expected(&path).remove_file(),
        ]);

        std::fs::write(&path, "").expect("write");

        rx.sleep_until_exists(&path);
        rx.wait_ordered_exact([expected(&path).create_file()]);
    }

    /// Deleting a `NoTrack`-watched file reports a remove; the test then expects no further
    /// events after the file is recreated.
    #[test]
    fn delete_self_file_no_track() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        let path = tmpdir.path().join("entry");
        std::fs::File::create_new(&path).expect("Unable to create");

        watcher.watch(
            &path,
            WatchMode {
                recursive_mode: RecursiveMode::NonRecursive,
                target_mode: TargetMode::NoTrack,
            },
        );
        watcher.watcher.wait_next_scan().expect("wait next scan");

        std::fs::remove_file(&path).expect("Unable to remove");

        rx.sleep_while_exists(&path);
        rx.wait_ordered_exact([
            expected(&path).modify_data_any().optional(),
            expected(&path).remove_file(),
        ]);

        std::fs::write(&path, "").expect("write");

        rx.ensure_empty_with_wait();
    }

    /// Replacing a watched file by renaming another file over it yields a data-modify event
    /// for the replaced path.
    #[test]
    fn create_write_overwrite() {
        let tmpdir = testdir();
        let (mut watcher, rx) = watcher();
        let overwritten_file = tmpdir.path().join("overwritten_file");
        let overwriting_file = tmpdir.path().join("overwriting_file");
        std::fs::write(&overwritten_file, "123").expect("write1");

        watcher.watch_nonrecursively(&tmpdir);
        watcher.watcher.wait_next_scan().expect("wait next scan");

        std::fs::File::create(&overwriting_file).expect("create");
        std::fs::write(&overwriting_file, "321").expect("write2");
        std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");

        rx.sleep_while_exists(&overwriting_file);
        assert!(
            rx.sleep_until(
                || std::fs::read_to_string(&overwritten_file).is_ok_and(|cnt| cnt == "321")
            ),
            "file {overwritten_file:?} was not replaced"
        );

        rx.wait_unordered([expected(&overwritten_file).modify_data_any()]);
    }
}
1143}