1mod cache;
65mod time;
66
67#[cfg(test)]
68mod testing;
69
70use std::{
71 cmp::Reverse,
72 collections::{BinaryHeap, HashMap, VecDeque},
73 path::{Path, PathBuf},
74 sync::{
75 atomic::{AtomicBool, Ordering},
76 Arc, Mutex,
77 },
78 time::{Duration, Instant},
79};
80
81use time::now;
82
83pub use cache::{FileIdCache, FileIdMap, NoCache, RecommendedCache};
84
85pub use file_id;
86pub use notify;
87pub use notify_types::debouncer_full::DebouncedEvent;
88
89use file_id::FileId;
90use notify::{
91 event::{ModifyKind, RemoveKind, RenameMode},
92 Error, ErrorKind, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, WatcherKind,
93};
94
95pub trait DebounceEventHandler: Send + 'static {
116 fn handle_event(&mut self, event: DebounceEventResult);
118}
119
120impl<F> DebounceEventHandler for F
121where
122 F: FnMut(DebounceEventResult) + Send + 'static,
123{
124 fn handle_event(&mut self, event: DebounceEventResult) {
125 (self)(event);
126 }
127}
128
/// Forwards debounced results into a `crossbeam_channel` sender.
#[cfg(feature = "crossbeam-channel")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
    /// Sends the result on the channel; a failed send is ignored on purpose.
    fn handle_event(&mut self, event: DebounceEventResult) {
        drop(self.send(event));
    }
}
135
/// Forwards debounced results into a `flume` sender.
#[cfg(feature = "flume")]
impl DebounceEventHandler for flume::Sender<DebounceEventResult> {
    /// Sends the result on the channel; a failed send is ignored on purpose.
    fn handle_event(&mut self, event: DebounceEventResult) {
        drop(self.send(event));
    }
}
142
143impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
144 fn handle_event(&mut self, event: DebounceEventResult) {
145 let _ = self.send(event);
146 }
147}
148
149pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Vec<Error>>;
152
153type DebounceData<T> = Arc<Mutex<DebounceDataInner<T>>>;
154
155#[derive(Debug, Clone, Default, PartialEq, Eq)]
156struct Queue {
157 events: VecDeque<DebouncedEvent>,
162}
163
164impl Queue {
165 fn was_created(&self) -> bool {
166 self.events.front().is_some_and(|event| {
167 matches!(
168 event.kind,
169 EventKind::Create(_) | EventKind::Modify(ModifyKind::Name(RenameMode::To))
170 )
171 })
172 }
173
174 fn was_removed(&self) -> bool {
175 self.events.front().is_some_and(|event| {
176 matches!(
177 event.kind,
178 EventKind::Remove(_) | EventKind::Modify(ModifyKind::Name(RenameMode::From))
179 )
180 })
181 }
182}
183
184#[derive(Debug)]
185pub(crate) struct DebounceDataInner<T> {
186 queues: HashMap<PathBuf, Queue>,
187 roots: Vec<(PathBuf, RecursiveMode)>,
188 cache: T,
189 rename_event: Option<(DebouncedEvent, Option<FileId>)>,
190 rescan_event: Option<DebouncedEvent>,
191 errors: Vec<Error>,
192 timeout: Duration,
193}
194
195impl<T: FileIdCache> DebounceDataInner<T> {
196 pub(crate) fn new(cache: T, timeout: Duration) -> Self {
197 Self {
198 queues: HashMap::new(),
199 roots: Vec::new(),
200 cache,
201 rename_event: None,
202 rescan_event: None,
203 errors: Vec::new(),
204 timeout,
205 }
206 }
207
208 pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
210 let now = now();
211 let mut events_expired = Vec::with_capacity(self.queues.len());
212 let mut queues_remaining = HashMap::with_capacity(self.queues.len());
213
214 if let Some(event) = self.rescan_event.take() {
215 if now.saturating_duration_since(event.time) >= self.timeout {
216 log::trace!("debounced event: {event:?}");
217 events_expired.push(event);
218 } else {
219 self.rescan_event = Some(event);
220 }
221 }
222
223 for (path, mut queue) in self.queues.drain() {
226 let mut kind_index = HashMap::new();
227
228 while let Some(event) = queue.events.pop_front() {
229 if now.saturating_duration_since(event.time) >= self.timeout {
230 if let Some(idx) = kind_index.get(&event.kind).copied() {
232 events_expired.remove(idx);
233
234 kind_index.values_mut().for_each(|i| {
235 if *i > idx {
236 *i -= 1
237 }
238 })
239 }
240
241 kind_index.insert(event.kind, events_expired.len());
242
243 events_expired.push(event);
244 } else {
245 queue.events.push_front(event);
246 break;
247 }
248 }
249
250 if !queue.events.is_empty() {
251 queues_remaining.insert(path, queue);
252 }
253 }
254
255 self.queues = queues_remaining;
256
257 sort_events(events_expired)
258 }
259
260 pub fn errors(&mut self) -> Vec<Error> {
262 std::mem::take(&mut self.errors)
263 }
264
265 pub fn add_error(&mut self, error: Error) {
267 log::trace!("raw error: {error:?}");
268
269 self.errors.push(error);
270 }
271
272 pub fn add_event(&mut self, event: Event) {
274 log::trace!("raw event: {event:?}");
275
276 if event.need_rescan() {
277 self.cache.rescan(&self.roots);
278 self.rescan_event = Some(DebouncedEvent { event, time: now() });
279 return;
280 }
281
282 let path = match event.paths.first() {
283 Some(path) => path,
284 None => {
285 log::info!("skipping event with no paths: {event:?}");
286 return;
287 }
288 };
289
290 match &event.kind {
291 EventKind::Create(_) => {
292 let recursive_mode = self.recursive_mode(path);
293
294 self.cache.add_path(path, recursive_mode);
295
296 self.push_event(event, now());
297 }
298 EventKind::Modify(ModifyKind::Name(rename_mode)) => {
299 match rename_mode {
300 RenameMode::Any => {
301 if event.paths[0].exists() {
302 self.handle_rename_to(event);
303 } else {
304 self.handle_rename_from(event);
305 }
306 }
307 RenameMode::To => {
308 self.handle_rename_to(event);
309 }
310 RenameMode::From => {
311 self.handle_rename_from(event);
312 }
313 RenameMode::Both => {
314 }
316 RenameMode::Other => {
317 }
319 }
320 }
321 EventKind::Remove(_) => {
322 self.push_remove_event(event, now());
323 }
324 EventKind::Other => {
325 }
327 _ => {
328 if self.cache.cached_file_id(path).is_none() {
329 let recursive_mode = self.recursive_mode(path);
330
331 self.cache.add_path(path, recursive_mode);
332 }
333
334 self.push_event(event, now());
335 }
336 }
337 }
338
339 fn recursive_mode(&mut self, path: &Path) -> RecursiveMode {
340 self.roots
341 .iter()
342 .find_map(|(root, recursive_mode)| {
343 if path.starts_with(root) {
344 Some(*recursive_mode)
345 } else {
346 None
347 }
348 })
349 .unwrap_or(RecursiveMode::NonRecursive)
350 }
351
352 fn handle_rename_from(&mut self, event: Event) {
353 let time = now();
354 let path = &event.paths[0];
355
356 let file_id = self.cache.cached_file_id(path).map(|id| *id.as_ref());
358 self.rename_event = Some((DebouncedEvent::new(event.clone(), time), file_id));
359
360 self.cache.remove_path(path);
361
362 self.push_event(event, time);
363 }
364
365 fn handle_rename_to(&mut self, event: Event) {
366 let recursive_mode = self.recursive_mode(&event.paths[0]);
367
368 self.cache.add_path(&event.paths[0], recursive_mode);
369
370 let trackers_match = self
371 .rename_event
372 .as_ref()
373 .and_then(|(e, _)| e.tracker())
374 .and_then(|from_tracker| {
375 event
376 .attrs
377 .tracker()
378 .map(|to_tracker| from_tracker == to_tracker)
379 })
380 .unwrap_or_default();
381
382 let file_ids_match = self
383 .rename_event
384 .as_ref()
385 .and_then(|(_, id)| id.as_ref())
386 .and_then(|from_file_id| {
387 self.cache
388 .cached_file_id(&event.paths[0])
389 .map(|to_file_id| from_file_id == to_file_id.as_ref())
390 })
391 .unwrap_or_default();
392
393 if trackers_match || file_ids_match {
394 let (mut rename_event, _) = self.rename_event.take().unwrap(); let path = rename_event.paths.remove(0);
397 let time = rename_event.time;
398 self.push_rename_event(path, event, time);
399 } else {
400 self.push_event(event, now());
402 }
403
404 self.rename_event = None;
405 }
406
407 fn push_rename_event(&mut self, path: PathBuf, event: Event, time: Instant) {
408 self.cache.remove_path(&path);
409
410 let mut source_queue = self.queues.remove(&path).unwrap_or_default();
411
412 source_queue.events.pop_back();
414
415 let (remove_index, original_path, original_time) = source_queue
417 .events
418 .iter()
419 .enumerate()
420 .find_map(|(index, e)| {
421 if matches!(
422 e.kind,
423 EventKind::Modify(ModifyKind::Name(RenameMode::Both))
424 ) {
425 Some((Some(index), e.paths[0].clone(), e.time))
426 } else {
427 None
428 }
429 })
430 .unwrap_or((None, path, time));
431
432 if let Some(remove_index) = remove_index {
433 source_queue.events.remove(remove_index);
434 }
435
436 if source_queue.was_removed() {
438 let event = source_queue.events.pop_front().unwrap();
439
440 self.queues.insert(
441 event.paths[0].clone(),
442 Queue {
443 events: [event].into(),
444 },
445 );
446 }
447
448 for e in &mut source_queue.events {
450 e.paths = vec![event.paths[0].clone()];
451 }
452
453 if !source_queue.was_created() {
455 source_queue.events.push_front(DebouncedEvent {
456 event: Event {
457 kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)),
458 paths: vec![original_path, event.paths[0].clone()],
459 attrs: event.attrs,
460 },
461 time: original_time,
462 });
463 }
464
465 if let Some(target_queue) = self.queues.get_mut(&event.paths[0]) {
466 if !target_queue.was_created() {
467 let mut remove_event = DebouncedEvent {
468 event: Event {
469 kind: EventKind::Remove(RemoveKind::Any),
470 paths: vec![event.paths[0].clone()],
471 attrs: Default::default(),
472 },
473 time: original_time,
474 };
475 if !target_queue.was_removed() {
476 remove_event.event = remove_event.event.set_info("override");
477 }
478 source_queue.events.push_front(remove_event);
479 }
480 *target_queue = source_queue;
481 } else {
482 self.queues.insert(event.paths[0].clone(), source_queue);
483 }
484 }
485
486 fn push_remove_event(&mut self, event: Event, time: Instant) {
487 let path = &event.paths[0];
488
489 self.queues.retain(|p, _| !p.starts_with(path) || p == path);
491
492 self.cache.remove_path(path);
494
495 match self.queues.get_mut(path) {
496 Some(queue) if queue.was_created() => {
497 self.queues.remove(path);
498 }
499 Some(queue) => {
500 queue.events = [DebouncedEvent::new(event, time)].into();
501 }
502 None => {
503 self.push_event(event, time);
504 }
505 }
506 }
507
508 fn push_event(&mut self, event: Event, time: Instant) {
509 let path = &event.paths[0];
510
511 if let Some(queue) = self.queues.get_mut(path) {
512 if match event.kind {
515 EventKind::Modify(
516 ModifyKind::Any
517 | ModifyKind::Data(_)
518 | ModifyKind::Metadata(_)
519 | ModifyKind::Other,
520 )
521 | EventKind::Create(_) => !queue.was_created(),
522 _ => true,
523 } {
524 queue.events.push_back(DebouncedEvent::new(event, time));
525 }
526 } else {
527 self.queues.insert(
528 path.to_path_buf(),
529 Queue {
530 events: [DebouncedEvent::new(event, time)].into(),
531 },
532 );
533 }
534 }
535}
536
537#[derive(Debug)]
539pub struct Debouncer<T: Watcher, C: FileIdCache> {
540 watcher: T,
541 debouncer_thread: Option<std::thread::JoinHandle<()>>,
542 data: DebounceData<C>,
543 stop: Arc<AtomicBool>,
544}
545
546impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
547 pub fn stop(mut self) {
550 self.set_stop();
551 if let Some(t) = self.debouncer_thread.take() {
552 let _ = t.join();
553 }
554 }
555
556 pub fn stop_nonblocking(self) {
558 self.set_stop();
559 }
560
561 fn set_stop(&self) {
562 self.stop.store(true, Ordering::Relaxed);
563 }
564
565 #[deprecated = "`Debouncer` provides all methods from `Watcher` itself now. Remove `.watcher()` and use those methods directly."]
566 pub fn watcher(&mut self) {}
567
568 #[deprecated = "`Debouncer` now manages root paths automatically. Remove all calls to `add_root` and `remove_root`."]
569 pub fn cache(&mut self) {}
570
571 fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
572 let path = path.into();
573
574 let mut data = self.data.lock().unwrap();
575
576 if data.roots.iter().any(|(p, _)| p == &path) {
578 return;
579 }
580
581 data.roots.push((path.clone(), recursive_mode));
582
583 data.cache.add_path(&path, recursive_mode);
584 }
585
586 fn remove_root(&mut self, path: impl AsRef<Path>) {
587 let mut data = self.data.lock().unwrap();
588
589 data.roots.retain(|(root, _)| !root.starts_with(&path));
590
591 data.cache.remove_path(path.as_ref());
592 }
593
594 pub fn watch(
595 &mut self,
596 path: impl AsRef<Path>,
597 recursive_mode: RecursiveMode,
598 ) -> notify::Result<()> {
599 self.watcher.watch(path.as_ref(), recursive_mode)?;
600 self.add_root(path.as_ref(), recursive_mode);
601 Ok(())
602 }
603
604 pub fn unwatch(&mut self, path: impl AsRef<Path>) -> notify::Result<()> {
605 self.watcher.unwatch(path.as_ref())?;
606 self.remove_root(path);
607 Ok(())
608 }
609
610 pub fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
611 self.watcher.configure(option)
612 }
613
614 pub fn kind() -> WatcherKind
615 where
616 Self: Sized,
617 {
618 T::kind()
619 }
620}
621
622impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
623 fn drop(&mut self) {
624 self.set_stop();
625 }
626}
627
628pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
634 timeout: Duration,
635 tick_rate: Option<Duration>,
636 mut event_handler: F,
637 file_id_cache: C,
638 config: notify::Config,
639) -> Result<Debouncer<T, C>, Error> {
640 let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
641 let stop = Arc::new(AtomicBool::new(false));
642
643 let tick_div = 4;
644 let tick = match tick_rate {
645 Some(v) => {
646 if v > timeout {
647 return Err(Error::new(ErrorKind::Generic(format!(
648 "Invalid tick_rate, tick rate {v:?} > {timeout:?} timeout!"
649 ))));
650 }
651 v
652 }
653 None => timeout.checked_div(tick_div).ok_or_else(|| {
654 Error::new(ErrorKind::Generic(format!(
655 "Failed to calculate tick as {timeout:?}/{tick_div}!"
656 )))
657 })?,
658 };
659
660 let data_c = data.clone();
661 let stop_c = stop.clone();
662 let thread = std::thread::Builder::new()
663 .name("notify-rs debouncer loop".to_string())
664 .spawn(move || loop {
665 if stop_c.load(Ordering::Acquire) {
666 break;
667 }
668 std::thread::sleep(tick);
669 let send_data;
670 let errors;
671 {
672 let mut lock = data_c.lock().unwrap();
673 send_data = lock.debounced_events();
674 errors = lock.errors();
675 }
676 if !send_data.is_empty() {
677 event_handler.handle_event(Ok(send_data));
678 }
679 if !errors.is_empty() {
680 event_handler.handle_event(Err(errors));
681 }
682 })?;
683
684 let data_c = data.clone();
685 let watcher = T::new(
686 move |e: Result<Event, Error>| {
687 let mut lock = data_c.lock().unwrap();
688
689 match e {
690 Ok(e) => lock.add_event(e),
691 Err(e) => lock.add_error(e),
693 }
694 },
695 config,
696 )?;
697
698 let guard = Debouncer {
699 watcher,
700 debouncer_thread: Some(thread),
701 data,
702 stop,
703 };
704
705 Ok(guard)
706}
707
708pub fn new_debouncer<F: DebounceEventHandler>(
714 timeout: Duration,
715 tick_rate: Option<Duration>,
716 event_handler: F,
717) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, Error> {
718 new_debouncer_opt::<F, RecommendedWatcher, RecommendedCache>(
719 timeout,
720 tick_rate,
721 event_handler,
722 RecommendedCache::new(),
723 notify::Config::default(),
724 )
725}
726
727fn sort_events(events: Vec<DebouncedEvent>) -> Vec<DebouncedEvent> {
728 let mut sorted = Vec::with_capacity(events.len());
729
730 let mut events_by_path: HashMap<_, VecDeque<_>> =
732 events.into_iter().fold(HashMap::new(), |mut acc, event| {
733 acc.entry(event.paths.last().cloned().unwrap_or_default())
734 .or_default()
735 .push_back(event);
736 acc
737 });
738
739 let mut min_time_heap = events_by_path
742 .iter()
743 .map(|(path, events)| Reverse((events[0].time, path.clone())))
744 .collect::<BinaryHeap<_>>();
745
746 while let Some(Reverse((min_time, path))) = min_time_heap.pop() {
747 let events = events_by_path.get_mut(&path).unwrap();
750
751 let mut push_next = false;
752
753 while events.front().is_some_and(|event| event.time <= min_time) {
754 let event = events.pop_front().unwrap();
756 sorted.push(event);
757 push_next = true;
758 }
759
760 if push_next {
761 if let Some(event) = events.front() {
762 min_time_heap.push(Reverse((event.time, path)));
763 }
764 }
765 }
766
767 sorted
768}
769
#[cfg(test)]
mod tests {
    use std::{fs, path::Path};

    use super::*;

    use pretty_assertions::assert_eq;
    use rstest::rstest;
    use tempfile::tempdir;
    use testing::TestCase;
    use time::MockTime;

    /// Runs one `.hjson` test case: feeds its events and errors into a fresh
    /// state, compares the resulting state with the expectation and then checks
    /// the debounced output for every listed delay.
    #[rstest]
    fn state(
        #[values(
            "add_create_event",
            "add_create_event_after_remove_event",
            "add_create_dir_event_twice",
            "add_event_with_no_paths_is_ok",
            "add_modify_any_event_after_create_event",
            "add_modify_content_event_after_create_event",
            "add_rename_from_event",
            "add_rename_from_event_after_create_event",
            "add_rename_from_event_after_modify_event",
            "add_rename_from_event_after_create_and_modify_event",
            "add_rename_from_event_after_rename_from_event",
            "add_rename_to_event",
            "add_rename_to_dir_event",
            "add_rename_from_and_to_event",
            "add_rename_from_and_to_event_after_create",
            "add_rename_from_and_to_event_after_rename",
            "add_rename_from_and_to_event_after_modify_content",
            "add_rename_from_and_to_event_override_created",
            "add_rename_from_and_to_event_override_modified",
            "add_rename_from_and_to_event_override_removed",
            "add_rename_from_and_to_event_with_file_ids",
            "add_rename_from_and_to_event_with_different_file_ids",
            "add_rename_from_and_to_event_with_different_tracker",
            "add_rename_both_event",
            "add_remove_event",
            "add_remove_event_after_create_event",
            "add_remove_event_after_modify_event",
            "add_remove_event_after_create_and_modify_event",
            "add_remove_parent_event_after_remove_child_event",
            "add_errors",
            "emit_continuous_modify_content_events",
            "emit_events_in_chronological_order",
            "emit_events_with_a_prepended_rename_event",
            "emit_close_events_only_once",
            "emit_modify_event_after_close_event",
            "emit_needs_rescan_event",
            "read_file_id_without_create_event",
            "sort_events_chronologically",
            "sort_events_with_reordering"
        )]
        file_name: &str,
    ) {
        let case_path = format!("./test_cases/{file_name}.hjson");
        let file_content = fs::read_to_string(Path::new(&case_path)).unwrap();
        let mut test_case = deser_hjson::from_str::<TestCase>(&file_content).unwrap();

        // All timestamps of the test case are offsets from this base time.
        let time = now();
        MockTime::set_time(time);

        let mut state = test_case.state.into_debounce_data_inner(time);
        state.roots = vec![(PathBuf::from("/"), RecursiveMode::Recursive)];

        // Replay the events, moving the mock clock to each event's timestamp.
        let mut prev_event_time = Duration::default();
        for raw_event in test_case.events {
            let event_time = Duration::from_millis(raw_event.time);
            let debounced = raw_event.into_debounced_event(time, None);
            MockTime::advance(event_time - prev_event_time);
            prev_event_time = event_time;
            state.add_event(debounced.event);
        }

        for raw_error in test_case.errors {
            state.add_error(raw_error.into_notify_error());
        }

        let expected_errors = std::mem::take(&mut test_case.expected.errors);
        let expected_events = std::mem::take(&mut test_case.expected.events);
        let expected_state = test_case.expected.into_debounce_data_inner(time);

        assert_eq!(
            state.queues, expected_state.queues,
            "queues not as expected"
        );
        assert_eq!(
            state.rename_event, expected_state.rename_event,
            "rename event not as expected"
        );
        assert_eq!(
            state.rescan_event, expected_state.rescan_event,
            "rescan event not as expected"
        );
        assert_eq!(
            state.cache.paths, expected_state.cache.paths,
            "cache not as expected"
        );

        // Errors are compared through their debug representation.
        let actual_error_text: Vec<_> = state.errors.iter().map(|e| format!("{e:?}")).collect();
        let expected_error_text: Vec<_> = expected_errors
            .iter()
            .map(|e| format!("{:?}", e.clone().into_notify_error()))
            .collect();
        assert_eq!(
            actual_error_text, expected_error_text,
            "errors not as expected"
        );

        // Every delay is checked against the same snapshot of clock and queues.
        let backup_time = now();
        let backup_queues = state.queues.clone();

        for (delay, events) in expected_events {
            MockTime::set_time(backup_time);
            state.queues = backup_queues.clone();

            match delay.as_str() {
                "none" => {}
                "short" => MockTime::advance(Duration::from_millis(10)),
                "long" => MockTime::advance(Duration::from_millis(100)),
                other => {
                    // A numeric delay is an absolute offset from the base time.
                    if let Ok(ts) = other.parse::<u64>() {
                        MockTime::set_time(time + Duration::from_millis(ts));
                    }
                }
            }

            let events: Vec<_> = events
                .into_iter()
                .map(|expected| expected.into_debounced_event(time, None))
                .collect();

            assert_eq!(
                state.debounced_events(),
                events,
                "debounced events after a `{delay}` delay"
            );
        }
    }

    /// Watches a temporary directory with a real debouncer and waits for an
    /// event that refers to a freshly written file.
    #[test]
    fn integration() -> Result<(), Box<dyn std::error::Error>> {
        let dir = tempdir()?;

        let (tx, rx) = std::sync::mpsc::channel();
        let mut debouncer = new_debouncer(Duration::from_millis(10), None, tx)?;
        debouncer.watch(dir.path(), RecursiveMode::Recursive)?;

        let file_path = dir.path().join("file.txt");
        fs::write(&file_path, b"Lorem ipsum")?;

        println!("waiting for event at {}", file_path.display());

        let deadline = Instant::now() + Duration::from_secs(10);
        while Instant::now() < deadline {
            let events = rx
                .recv_timeout(deadline - Instant::now())
                .expect("did not receive expected event")
                .expect("received an error");

            for event in events {
                // The reported path may be either the plain or the canonical one.
                let is_expected = event.event.paths == vec![file_path.clone()]
                    || event.event.paths == vec![file_path.canonicalize()?];
                if is_expected {
                    return Ok(());
                }

                println!("unexpected event: {event:?}");
            }
        }

        panic!("did not receive expected event");
    }
}