1mod cache;
65mod time;
66
67#[cfg(test)]
68mod testing;
69
70#[cfg(not(target_family = "wasm"))]
71mod file_id_map;
72
73use std::{
74 cmp::Reverse,
75 collections::{BinaryHeap, HashMap, VecDeque},
76 path::{Path, PathBuf},
77 sync::{
78 Arc, Mutex,
79 atomic::{AtomicBool, Ordering},
80 },
81 time::{Duration, Instant},
82};
83
84use time::now;
85
86pub use cache::{FileIdCache, NoCache, RecommendedCache};
87
88#[cfg(not(target_family = "wasm"))]
89pub use file_id_map::FileIdMap;
90
91pub use file_id;
92pub use notify;
93pub use notify_types::debouncer_full::DebouncedEvent;
94
95use file_id::FileId;
96use notify::{
97 Error, ErrorKind, Event, EventKind, PathsMut, RecommendedWatcher, RecursiveMode, TargetMode,
98 WatchMode, Watcher, WatcherKind,
99 event::{EventAttributes, ModifyKind, RemoveKind, RenameMode},
100};
101
/// Receiver for the output of the debouncer.
///
/// The handler is moved into the debouncer's worker thread (hence `Send + 'static`) and is
/// called from there with either a batch of debounced events or a batch of errors.
pub trait DebounceEventHandler: Send + 'static {
    /// Handles one result: `Ok` carries debounced events, `Err` carries collected errors.
    fn handle_event(&mut self, event: DebounceEventResult);
}
126
127impl<F> DebounceEventHandler for F
128where
129 F: FnMut(DebounceEventResult) + Send + 'static,
130{
131 fn handle_event(&mut self, event: DebounceEventResult) {
132 (self)(event);
133 }
134}
135
#[cfg(feature = "crossbeam-channel")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
    /// Forwards the result into the channel; a failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: DebounceEventResult) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send debounce event result");
        }
    }
}
145
#[cfg(feature = "flume")]
impl DebounceEventHandler for flume::Sender<DebounceEventResult> {
    /// Forwards the result into the channel; a failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: DebounceEventResult) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send debounce event result");
        }
    }
}
155
156impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
157 fn handle_event(&mut self, event: DebounceEventResult) {
158 let result = self.send(event);
159 if let Err(e) = result {
160 tracing::error!(?e, "failed to send debounce event result");
161 }
162 }
163}
164
/// The value handed to a [`DebounceEventHandler`]: either a batch of debounced events or the
/// errors that were collected by the debouncer.
pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Vec<Error>>;
168
/// Debouncer state behind a mutex, shared between the watcher callback and the worker thread.
type DebounceData<T> = Arc<Mutex<DebounceDataInner<T>>>;
170
/// Pending events of a single path (see `DebounceDataInner::queues`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct Queue {
    /// Events waiting to be emitted. New events are normally appended at the back and
    /// expired events are taken from the front.
    events: VecDeque<DebouncedEvent>,
}
179
180impl Queue {
181 fn was_created(&self) -> bool {
182 self.events.front().is_some_and(|event| {
183 matches!(
184 event.kind,
185 EventKind::Create(_) | EventKind::Modify(ModifyKind::Name(RenameMode::To))
186 )
187 })
188 }
189
190 fn was_removed(&self) -> bool {
191 self.events.front().is_some_and(|event| {
192 matches!(
193 event.kind,
194 EventKind::Remove(_) | EventKind::Modify(ModifyKind::Name(RenameMode::From))
195 )
196 })
197 }
198}
199
/// The debouncer's mutable state: queued events, watched roots, the file id cache and
/// collected errors.
#[derive(Debug)]
pub(crate) struct DebounceDataInner<T> {
    /// Pending events, keyed by the path they belong to.
    queues: HashMap<PathBuf, Queue>,
    /// Watched root paths together with the mode they were registered with.
    roots: Vec<(PathBuf, WatchMode)>,
    /// File id cache used to match the two halves of a rename.
    cache: T,
    /// The most recent rename-from event and the file id its path had in the cache, kept
    /// until the next rename-to event is processed.
    rename_event: Option<(DebouncedEvent, Option<FileId>)>,
    /// The most recent event that requested a rescan, if it has not been emitted yet.
    rescan_event: Option<DebouncedEvent>,
    /// Errors collected since they were last taken with `errors()`.
    errors: Vec<Error>,
    /// Minimum age an event must reach before it is emitted.
    timeout: Duration,
}
210
impl<T: FileIdCache> DebounceDataInner<T> {
    /// Creates empty state that uses `cache` for file ids and `timeout` as the minimum age an
    /// event must reach before it is emitted.
    pub(crate) fn new(cache: T, timeout: Duration) -> Self {
        Self {
            queues: HashMap::new(),
            roots: Vec::new(),
            cache,
            rename_event: None,
            rescan_event: None,
            errors: Vec::new(),
            timeout,
        }
    }

    /// Removes and returns all queued events that are at least `timeout` old, ordered by
    /// `sort_events`. Events that are still too young remain queued.
    pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
        let now = now();
        let mut events_expired = Vec::with_capacity(self.queues.len());
        let mut queues_remaining = HashMap::with_capacity(self.queues.len());

        // A pending rescan event is emitted once it is old enough; otherwise it is put back.
        if let Some(event) = self.rescan_event.take() {
            if now.saturating_duration_since(event.time) >= self.timeout {
                tracing::trace!("debounce candidate rescan event: {event:?}");
                events_expired.push(event);
            } else {
                self.rescan_event = Some(event);
            }
        }

        for (path, mut queue) in self.queues.drain() {
            // For this path only: position in `events_expired` of the last expired event of
            // each kind.
            let mut kind_index = HashMap::new();

            while let Some(event) = queue.events.pop_front() {
                // An earlier event of the same kind was already taken from this queue:
                // drop that earlier one so only the most recent of each kind survives.
                if let Some(idx) = kind_index.get(&event.kind).copied() {
                    tracing::trace!("removed candidate event: {event:?}");
                    events_expired.remove(idx);

                    // Everything stored after the removed slot moved down by one.
                    for i in kind_index.values_mut() {
                        if *i > idx {
                            *i -= 1;
                        }
                    }
                }

                if now.saturating_duration_since(event.time) >= self.timeout {
                    tracing::trace!("debounce candidate event: {event:?}");
                    kind_index.insert(event.kind, events_expired.len());

                    events_expired.push(event);
                } else {
                    // Not old enough yet: put it back and stop processing this queue.
                    queue.events.push_front(event);
                    break;
                }
            }

            // Only queues that still hold events are kept.
            if !queue.events.is_empty() {
                queues_remaining.insert(path, queue);
            }
        }

        self.queues = queues_remaining;

        sort_events(events_expired)
    }

    /// Takes all collected errors, leaving the internal list empty.
    pub fn errors(&mut self) -> Vec<Error> {
        std::mem::take(&mut self.errors)
    }

    /// Records an error reported by the watcher.
    pub fn add_error(&mut self, error: Error) {
        tracing::trace!("raw error: {error:?}");

        self.errors.push(error);
    }

    /// Feeds one raw watcher event into the debouncer state.
    ///
    /// Rescan requests refresh the cache and replace the pending rescan event. Events without
    /// any path are skipped. All other events are dispatched on their kind, using the first
    /// path of the event as the key.
    pub fn add_event(&mut self, event: Event) {
        tracing::trace!("raw event: {event:?}");

        if event.need_rescan() {
            self.cache.rescan(&self.roots);
            self.rescan_event = Some(DebouncedEvent { event, time: now() });
            return;
        }

        let Some(path) = event.paths.first() else {
            tracing::info!("skipping event with no paths: {event:?}");
            return;
        };

        match &event.kind {
            EventKind::Create(_) => {
                let watch_mode = self.watch_mode(path);

                self.cache.add_path(path, watch_mode);

                self.push_event(event, now());
            }
            EventKind::Modify(ModifyKind::Name(rename_mode)) => {
                #[expect(clippy::match_same_arms)]
                match rename_mode {
                    RenameMode::Any => {
                        // The direction is unknown: if the path exists on disk, treat the
                        // event as a rename-to, otherwise as a rename-from.
                        if event.paths[0].exists() {
                            self.handle_rename_to(event);
                        } else {
                            self.handle_rename_from(event);
                        }
                    }
                    RenameMode::To => {
                        self.handle_rename_to(event);
                    }
                    RenameMode::From => {
                        self.handle_rename_from(event);
                    }
                    RenameMode::Both => {
                        // Nothing is queued for this mode.
                    }
                    RenameMode::Other => {
                        // Nothing is queued for this mode.
                    }
                }
            }
            EventKind::Remove(_) => {
                self.push_remove_event(event, now());
            }
            EventKind::Other => {
                // Nothing is queued for this kind.
            }
            _ => {
                // Make sure the path has a cached file id before queueing the event.
                if self.cache.cached_file_id(path).is_none() {
                    let watch_mode = self.watch_mode(path);

                    self.cache.add_path(path, watch_mode);
                }

                self.push_event(event, now());
            }
        }
    }

    /// Returns the watch mode of the first root that `path` starts with, or a non-recursive,
    /// path-tracking mode if no root matches.
    fn watch_mode(&self, path: &Path) -> WatchMode {
        self.roots
            .iter()
            .find_map(|(root, watch_mode)| {
                if path.starts_with(root) {
                    Some(*watch_mode)
                } else {
                    None
                }
            })
            .unwrap_or(WatchMode {
                recursive_mode: RecursiveMode::NonRecursive,
                target_mode: TargetMode::TrackPath,
            })
    }

    /// Handles the first half of a rename: remembers the event and the file id cached for its
    /// path, removes the path from the cache and queues the event.
    fn handle_rename_from(&mut self, event: Event) {
        let time = now();
        let path = &event.paths[0];

        // The file id has to be read before the path is removed from the cache below.
        let file_id = self.cache.cached_file_id(path).map(|id| *id.as_ref());
        self.rename_event = Some((DebouncedEvent::new(event.clone(), time), file_id));

        self.cache.remove_path(path);

        self.push_event(event, time);
    }

    /// Handles the second half of a rename.
    ///
    /// The new path is added to the cache first. If the remembered rename-from event matches
    /// this event (same tracker, or same file id), both are merged via `push_rename_event`;
    /// otherwise this event is queued as is. The remembered rename-from event is cleared in
    /// either case.
    fn handle_rename_to(&mut self, event: Event) {
        let watch_mode = self.watch_mode(&event.paths[0]);

        self.cache.add_path(&event.paths[0], watch_mode);

        // True only if both events carry a tracker and the trackers are equal.
        let trackers_match = self
            .rename_event
            .as_ref()
            .and_then(|(e, _)| e.tracker())
            .and_then(|from_tracker| {
                event
                    .attrs
                    .tracker()
                    .map(|to_tracker| from_tracker == to_tracker)
            })
            .unwrap_or_default();

        // True only if a file id was remembered for the old path and the cache holds an equal
        // file id for the new path.
        let file_ids_match = self
            .rename_event
            .as_ref()
            .and_then(|(_, id)| id.as_ref())
            .and_then(|from_file_id| {
                self.cache
                    .cached_file_id(&event.paths[0])
                    .map(|to_file_id| from_file_id == to_file_id.as_ref())
            })
            .unwrap_or_default();

        if trackers_match || file_ids_match {
            // Either match implies `rename_event` is `Some`, so the `unwrap` cannot fail.
            let (mut rename_event, _) = self.rename_event.take().unwrap();
            let path = rename_event.paths.remove(0);
            let time = rename_event.time;
            self.push_rename_event(path, event, time);
        } else {
            self.push_event(event, now());
        }

        self.rename_event = None;
    }

    /// Merges a matched rename into the queues.
    ///
    /// `path` is the source path of the rename, `event` is the rename-to event (its first path
    /// is the destination) and `time` is the time of the rename-from event. The source queue
    /// is moved to the destination path and, where needed, a rename event with both paths is
    /// put at its front.
    fn push_rename_event(&mut self, path: PathBuf, event: Event, time: Instant) {
        self.cache.remove_path(&path);

        let mut source_queue = self.queues.remove(&path).unwrap_or_default();

        // Drop the newest event of the source queue — presumably the rename-from event that
        // `handle_rename_from` queued for this rename.
        source_queue.events.pop_back();

        // If the source queue already contains a rename event with both paths, reuse its
        // original path and time and remember its position; otherwise fall back to the
        // arguments of this call.
        let (remove_index, original_path, original_time) = source_queue
            .events
            .iter()
            .enumerate()
            .find_map(|(index, e)| {
                if matches!(
                    e.kind,
                    EventKind::Modify(ModifyKind::Name(RenameMode::Both))
                ) {
                    Some((Some(index), e.paths[0].clone(), e.time))
                } else {
                    None
                }
            })
            .unwrap_or((None, path, time));

        if let Some(remove_index) = remove_index {
            source_queue.events.remove(remove_index);
        }

        // A leading remove / rename-from event stays with its own path: it is split off into
        // a separate single-event queue.
        if source_queue.was_removed() {
            let event = source_queue.events.pop_front().unwrap();

            self.queues.insert(
                event.paths[0].clone(),
                Queue {
                    events: [event].into(),
                },
            );
        }

        // All remaining events now refer to the destination path.
        for e in &mut source_queue.events {
            e.paths = vec![event.paths[0].clone()];
        }

        // Unless the queue starts with a create / rename-to event, put a rename event with
        // both paths at the front.
        if !source_queue.was_created() {
            source_queue.events.push_front(DebouncedEvent {
                event: Event {
                    kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)),
                    paths: vec![original_path, event.paths[0].clone()],
                    attrs: event.attrs,
                },
                time: original_time,
            });
        }

        if let Some(target_queue) = self.queues.get_mut(&event.paths[0]) {
            // The destination already has queued events. Unless it starts with a create /
            // rename-to event, a remove event for the destination is put in front; it is
            // tagged "override" when the destination queue does not start with a removal.
            if !target_queue.was_created() {
                let mut remove_event = DebouncedEvent {
                    event: Event {
                        kind: EventKind::Remove(RemoveKind::Any),
                        paths: vec![event.paths[0].clone()],
                        attrs: EventAttributes::default(),
                    },
                    time: original_time,
                };
                if !target_queue.was_removed() {
                    remove_event.event = remove_event.event.set_info("override");
                }
                source_queue.events.push_front(remove_event);
            }
            // The previous events of the destination are replaced.
            *target_queue = source_queue;
        } else {
            self.queues.insert(event.paths[0].clone(), source_queue);
        }
    }

    /// Handles a remove event for the first path of `event`.
    fn push_remove_event(&mut self, event: Event, time: Instant) {
        let path = &event.paths[0];

        // Drop the queues of all paths below the removed path, but keep the queue of the path
        // itself.
        self.queues.retain(|p, _| !p.starts_with(path) || p == path);

        self.cache.remove_path(path);

        match self.queues.get_mut(path) {
            // The queue starts with a create / rename-to event: the whole queue is dropped
            // and the remove event is not queued.
            Some(queue) if queue.was_created() => {
                self.queues.remove(path);
            }
            // Otherwise the remove event replaces everything queued for the path.
            Some(queue) => {
                queue.events = [DebouncedEvent::new(event, time)].into();
            }
            None => {
                self.push_event(event, time);
            }
        }
    }

    /// Appends `event` to the queue of its first path, creating the queue if necessary.
    ///
    /// In an existing queue, create events and any/data/metadata/other modify events are
    /// skipped when the queue starts with a create / rename-to event.
    fn push_event(&mut self, event: Event, time: Instant) {
        let path = &event.paths[0];

        if let Some(queue) = self.queues.get_mut(path) {
            if match event.kind {
                EventKind::Modify(
                    ModifyKind::Any
                    | ModifyKind::Data(_)
                    | ModifyKind::Metadata(_)
                    | ModifyKind::Other,
                )
                | EventKind::Create(_) => !queue.was_created(),
                _ => true,
            } {
                queue.events.push_back(DebouncedEvent::new(event, time));
            }
        } else {
            self.queues.insert(
                path.clone(),
                Queue {
                    events: [DebouncedEvent::new(event, time)].into(),
                },
            );
        }
    }
}
555
/// A watcher wrapper that owns the debouncing worker thread and the state shared with it.
#[derive(Debug)]
pub struct Debouncer<T: Watcher, C: FileIdCache> {
    /// The underlying watcher that produces the raw events.
    watcher: T,
    /// Handle of the worker thread; taken by `stop` in order to join the thread.
    debouncer_thread: Option<std::thread::JoinHandle<()>>,
    /// State shared with the watcher callback and the worker thread.
    data: DebounceData<C>,
    /// Flag that tells the worker thread to exit.
    stop: Arc<AtomicBool>,
}
564
565impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
566 pub fn stop(mut self) {
569 self.set_stop();
570 if let Some(t) = self.debouncer_thread.take() {
571 let result = t.join();
572 if let Err(e) = result {
573 tracing::error!(?e, "failed to join debouncer thread");
574 }
575 }
576 }
577
578 pub fn stop_nonblocking(self) {
580 self.set_stop();
581 }
582
583 fn set_stop(&self) {
584 self.stop.store(true, Ordering::Relaxed);
585 }
586
587 #[deprecated = "`Debouncer` provides all methods from `Watcher` itself now. Remove `.watcher()` and use those methods directly."]
588 pub fn watcher(&mut self) {}
589
590 #[deprecated = "`Debouncer` now manages root paths automatically. Remove all calls to `add_root` and `remove_root`."]
591 pub fn cache(&mut self) {}
592
593 fn add_root(&self, path: impl Into<PathBuf>, watch_mode: WatchMode) {
594 let path = path.into();
595
596 let mut data = self.data.lock().unwrap();
597
598 if data.roots.iter().any(|(p, _)| p == &path) {
600 return;
601 }
602
603 data.roots.push((path.clone(), watch_mode));
604
605 data.cache.add_path(&path, watch_mode);
606 }
607
608 fn remove_root(&self, path: impl AsRef<Path>) {
609 let mut data = self.data.lock().unwrap();
610
611 data.roots.retain(|(root, _)| !root.starts_with(&path));
612
613 data.cache.remove_path(path.as_ref());
614 }
615
616 pub fn watch(&mut self, path: impl AsRef<Path>, watch_mode: WatchMode) -> notify::Result<()> {
617 self.watcher.watch(path.as_ref(), watch_mode)?;
618 self.add_root(path.as_ref(), watch_mode);
619 Ok(())
620 }
621
622 pub fn unwatch(&mut self, path: impl AsRef<Path>) -> notify::Result<()> {
623 self.watcher.unwatch(path.as_ref())?;
624 self.remove_root(path);
625 Ok(())
626 }
627
628 pub fn paths_mut<'me>(&'me mut self) -> Box<dyn PathsMut + 'me> {
629 self.watcher.paths_mut()
630 }
631
632 pub fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
633 self.watcher.configure(option)
634 }
635
636 #[must_use]
637 pub fn kind() -> WatcherKind
638 where
639 Self: Sized,
640 {
641 T::kind()
642 }
643}
644
/// Dropping a `Debouncer` raises the stop flag for the worker thread but does not join it.
impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
    fn drop(&mut self) {
        self.set_stop();
    }
}
650
651#[tracing::instrument(level = "debug", skip(event_handler, file_id_cache))]
660pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
661 timeout: Duration,
662 tick_rate: Option<Duration>,
663 mut event_handler: F,
664 file_id_cache: C,
665 config: notify::Config,
666) -> Result<Debouncer<T, C>, Error> {
667 let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
668 let stop = Arc::new(AtomicBool::new(false));
669
670 let tick_div = 4;
671 let tick = match tick_rate {
672 Some(v) => {
673 if v > timeout {
674 return Err(Error::new(ErrorKind::Generic(format!(
675 "Invalid tick_rate, tick rate {v:?} > {timeout:?} timeout!"
676 ))));
677 }
678 v
679 }
680 None => timeout.checked_div(tick_div).ok_or_else(|| {
681 Error::new(ErrorKind::Generic(format!(
682 "Failed to calculate tick as {timeout:?}/{tick_div}!"
683 )))
684 })?,
685 };
686
687 let data_c = Arc::clone(&data);
688 let stop_c = Arc::clone(&stop);
689 let thread = std::thread::Builder::new()
690 .name("notify-rs debouncer loop".to_string())
691 .spawn(move || {
692 loop {
693 if stop_c.load(Ordering::Acquire) {
694 break;
695 }
696 std::thread::sleep(tick);
697 let send_data;
698 let errors;
699 {
700 let mut lock = data_c.lock().unwrap();
701 send_data = lock.debounced_events();
702 errors = lock.errors();
703 }
704 if !send_data.is_empty() {
705 event_handler.handle_event(Ok(send_data));
706 }
707 if !errors.is_empty() {
708 event_handler.handle_event(Err(errors));
709 }
710 }
711 })?;
712
713 let data_c = Arc::clone(&data);
714 let watcher = T::new(
715 move |e: Result<Event, Error>| {
716 let mut lock = data_c.lock().unwrap();
717
718 match e {
719 Ok(e) => lock.add_event(e),
720 Err(e) => lock.add_error(e),
722 }
723 },
724 config,
725 )?;
726
727 let guard = Debouncer {
728 watcher,
729 debouncer_thread: Some(thread),
730 data,
731 stop,
732 };
733
734 Ok(guard)
735}
736
737pub fn new_debouncer<F: DebounceEventHandler>(
743 timeout: Duration,
744 tick_rate: Option<Duration>,
745 event_handler: F,
746) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, Error> {
747 new_debouncer_opt::<F, RecommendedWatcher, RecommendedCache>(
748 timeout,
749 tick_rate,
750 event_handler,
751 RecommendedCache::new(),
752 notify::Config::default(),
753 )
754}
755
756#[tracing::instrument(level = "trace", ret)]
757fn sort_events(events: Vec<DebouncedEvent>) -> Vec<DebouncedEvent> {
758 let mut sorted = Vec::with_capacity(events.len());
759
760 let mut events_by_path: HashMap<_, VecDeque<_>> =
762 events.into_iter().fold(HashMap::new(), |mut acc, event| {
763 acc.entry(event.paths.last().cloned().unwrap_or_default())
764 .or_default()
765 .push_back(event);
766 acc
767 });
768
769 let mut min_time_heap = events_by_path
772 .iter()
773 .map(|(path, events)| Reverse((events[0].time, path.clone())))
774 .collect::<BinaryHeap<_>>();
775
776 while let Some(Reverse((min_time, path))) = min_time_heap.pop() {
777 let events = events_by_path.get_mut(&path).unwrap();
780
781 let mut push_next = false;
782
783 while events.front().is_some_and(|event| event.time <= min_time) {
784 let event = events.pop_front().unwrap();
786 sorted.push(event);
787 push_next = true;
788 }
789
790 if push_next && let Some(event) = events.front() {
791 min_time_heap.push(Reverse((event.time, path)));
792 }
793 }
794
795 sorted
796}
797
#[cfg(test)]
mod tests {
    use std::{fs, path::Path};

    use super::*;

    use pretty_assertions::assert_eq;
    use rstest::rstest;
    use tempfile::tempdir;
    use testing::TestCase;
    use time::MockTime;

    /// Data-driven test of the debouncer state machine.
    ///
    /// Each value names a `./test_cases/<name>.hjson` file that describes an initial state,
    /// the raw events and errors to feed in, and the expected state and debounced events.
    #[rstest]
    fn state(
        #[values(
            "add_create_event",
            "add_create_event_after_remove_event",
            "add_create_dir_event_twice",
            "add_event_with_no_paths_is_ok",
            "add_modify_any_event_after_create_event",
            "add_modify_content_event_after_create_event",
            "add_rename_from_event",
            "add_rename_from_event_after_create_event",
            "add_rename_from_event_after_modify_event",
            "add_rename_from_event_after_create_and_modify_event",
            "add_rename_from_event_after_rename_from_event",
            "add_rename_to_event",
            "add_rename_to_dir_event",
            "add_rename_from_and_to_event",
            "add_rename_from_and_to_event_after_create",
            "add_rename_from_and_to_event_after_rename",
            "add_rename_from_and_to_event_after_modify_content",
            "add_rename_from_and_to_event_override_created",
            "add_rename_from_and_to_event_override_modified",
            "add_rename_from_and_to_event_override_removed",
            "add_rename_from_and_to_event_with_file_ids",
            "add_rename_from_and_to_event_with_different_file_ids",
            "add_rename_from_and_to_event_with_different_tracker",
            "add_rename_both_event",
            "add_remove_event",
            "add_remove_event_after_create_event",
            "add_remove_event_after_modify_event",
            "add_remove_event_after_create_and_modify_event",
            "add_remove_parent_event_after_remove_child_event",
            "add_errors",
            "debounce_modify_events",
            "emit_continuous_modify_content_events",
            "emit_events_in_chronological_order",
            "emit_events_with_a_prepended_rename_event",
            "emit_close_events_only_once",
            "emit_modify_event_after_close_event",
            "emit_needs_rescan_event",
            "read_file_id_without_create_event",
            "sort_events_chronologically",
            "sort_events_with_reordering"
        )]
        file_name: &str,
    ) {
        let file_content =
            fs::read_to_string(Path::new(&format!("./test_cases/{file_name}.hjson"))).unwrap();
        let mut test_case = deser_hjson::from_str::<TestCase>(&file_content).unwrap();

        // Pin the mock clock so that all times in the test case are relative to `time`.
        let time = now();
        MockTime::set_time(time);

        let mut state = test_case.state.into_debounce_data_inner(time);
        state.roots = vec![(PathBuf::from("/"), WatchMode::recursive())];

        let mut prev_event_time = Duration::default();

        // Feed the events in, advancing the mock clock to each event's time first.
        for event in test_case.events {
            let event_time = Duration::from_millis(event.time);
            let event = event.into_debounced_event(time, None);
            MockTime::advance(event_time - prev_event_time);
            prev_event_time = event_time;
            state.add_event(event.event);
        }

        for error in test_case.errors {
            let error = error.into_notify_error();
            state.add_error(error);
        }

        // The expected errors and events are taken out before the rest of the expectation is
        // converted into a state to compare against.
        let expected_errors = std::mem::take(&mut test_case.expected.errors);
        let expected_events = std::mem::take(&mut test_case.expected.events);
        let expected_state = test_case.expected.into_debounce_data_inner(time);
        assert_eq!(
            state.queues, expected_state.queues,
            "queues not as expected"
        );
        assert_eq!(
            state.rename_event, expected_state.rename_event,
            "rename event not as expected"
        );
        assert_eq!(
            state.rescan_event, expected_state.rescan_event,
            "rescan event not as expected"
        );
        assert_eq!(
            state.cache.paths, expected_state.cache.paths,
            "cache not as expected"
        );

        // Errors are compared through their `Debug` output.
        assert_eq!(
            state
                .errors
                .iter()
                .map(|e| format!("{e:?}"))
                .collect::<Vec<_>>(),
            expected_errors
                .iter()
                .map(|e| format!("{:?}", e.clone().into_notify_error()))
                .collect::<Vec<_>>(),
            "errors not as expected"
        );

        // Each expected delay is checked against the same starting point, so the clock and
        // the queues are restored before every check.
        let backup_time = now();
        let backup_queues = state.queues.clone();

        for (delay, events) in expected_events {
            MockTime::set_time(backup_time);
            state.queues = backup_queues.clone();

            match delay.as_str() {
                "none" => {}
                "short" => MockTime::advance(Duration::from_millis(10)),
                "long" => MockTime::advance(Duration::from_millis(100)),
                _ => {
                    // Any other delay that parses as a number is a time in milliseconds
                    // relative to the start of the test case.
                    if let Ok(ts) = delay.parse::<u64>() {
                        MockTime::set_time(time + Duration::from_millis(ts));
                    }
                }
            }

            let events = events
                .into_iter()
                .map(|event| event.into_debounced_event(time, None))
                .collect::<Vec<_>>();

            assert_eq!(
                state.debounced_events(),
                events,
                "debounced events after a `{delay}` delay"
            );
        }
    }

    /// End-to-end test: watches a temporary directory, writes a file into it and waits up to
    /// ten seconds for a debounced event that refers to exactly that file.
    #[expect(clippy::print_stdout)]
    #[test]
    fn integration() -> Result<(), Box<dyn std::error::Error>> {
        let dir = tempdir()?;

        let (tx, rx) = std::sync::mpsc::channel();
        let mut debouncer = new_debouncer(Duration::from_millis(10), None, tx)?;
        debouncer.watch(dir.path(), WatchMode::recursive())?;

        let file_path = dir.path().join("file.txt");
        fs::write(&file_path, b"Lorem ipsum")?;

        println!("waiting for event at {}", file_path.display());

        let deadline = Instant::now() + Duration::from_secs(10);
        while deadline > Instant::now() {
            let events = rx
                .recv_timeout(deadline - Instant::now())
                .expect("did not receive expected event")
                .expect("received an error");

            for event in events {
                // The event may carry the path as given or in canonicalized form.
                if event.event.paths == vec![file_path.clone()]
                    || event.event.paths == vec![file_path.canonicalize()?]
                {
                    return Ok(());
                }

                println!("unexpected event: {event:?}");
            }
        }

        panic!("did not receive expected event");
    }
}