1mod cache;
65mod time;
66
67#[cfg(test)]
68mod testing;
69
70use std::{
71 cmp::Reverse,
72 collections::{BinaryHeap, HashMap, VecDeque},
73 path::{Path, PathBuf},
74 sync::{
75 atomic::{AtomicBool, Ordering},
76 Arc, Mutex,
77 },
78 time::{Duration, Instant},
79};
80
81use time::now;
82
83pub use cache::{FileIdCache, FileIdMap, NoCache, RecommendedCache};
84
85pub use file_id;
86pub use notify;
87pub use notify_types::debouncer_full::DebouncedEvent;
88
89use file_id::FileId;
90use notify::{
91 event::{ModifyKind, RemoveKind, RenameMode},
92 Error, ErrorKind, Event, EventKind, PathsMut, RecommendedWatcher, RecursiveMode, Watcher,
93 WatcherKind,
94};
95
/// The set of requirements for debouncer event handling functions.
///
/// In this file the handler is invoked from the debouncer loop thread with either a
/// non-empty batch of debounced events (`Ok`) or a non-empty batch of errors (`Err`).
pub trait DebounceEventHandler: Send + 'static {
    /// Handles one batch of debounced events or one batch of errors.
    fn handle_event(&mut self, event: DebounceEventResult);
}
120
121impl<F> DebounceEventHandler for F
122where
123 F: FnMut(DebounceEventResult) + Send + 'static,
124{
125 fn handle_event(&mut self, event: DebounceEventResult) {
126 (self)(event);
127 }
128}
129
/// Lets a `crossbeam_channel` sender act as the event handler.
#[cfg(feature = "crossbeam-channel")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
    /// Sends the batch over the channel; a send failure is deliberately ignored.
    fn handle_event(&mut self, event: DebounceEventResult) {
        self.send(event).ok();
    }
}
136
/// Lets a `flume` sender act as the event handler.
#[cfg(feature = "flume")]
impl DebounceEventHandler for flume::Sender<DebounceEventResult> {
    /// Sends the batch over the channel; a send failure is deliberately ignored.
    fn handle_event(&mut self, event: DebounceEventResult) {
        self.send(event).ok();
    }
}
143
144impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
145 fn handle_event(&mut self, event: DebounceEventResult) {
146 let _ = self.send(event);
147 }
148}
149
/// A result of debounced events.
/// `Ok` carries a list of debounced events, `Err` carries a list of errors.
pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Vec<Error>>;

/// Debouncer state shared between threads behind an `Arc<Mutex<..>>`.
type DebounceData<T> = Arc<Mutex<DebounceDataInner<T>>>;
155
156#[derive(Debug, Clone, Default, PartialEq, Eq)]
157struct Queue {
158 events: VecDeque<DebouncedEvent>,
163}
164
165impl Queue {
166 fn was_created(&self) -> bool {
167 self.events.front().is_some_and(|event| {
168 matches!(
169 event.kind,
170 EventKind::Create(_) | EventKind::Modify(ModifyKind::Name(RenameMode::To))
171 )
172 })
173 }
174
175 fn was_removed(&self) -> bool {
176 self.events.front().is_some_and(|event| {
177 matches!(
178 event.kind,
179 EventKind::Remove(_) | EventKind::Modify(ModifyKind::Name(RenameMode::From))
180 )
181 })
182 }
183}
184
/// Mutable state of the debouncer: queued events, watched roots, the file id cache and
/// collected errors.
#[derive(Debug)]
pub(crate) struct DebounceDataInner<T> {
    /// Pending events, one queue per path.
    queues: HashMap<PathBuf, Queue>,
    /// Watched root paths together with the recursive mode they were registered with.
    roots: Vec<(PathBuf, RecursiveMode)>,
    /// File id cache; updated as paths are created, renamed and removed.
    cache: T,
    /// The most recent rename-`From` event and the file id that was cached for its path (if
    /// any), kept so a following rename-`To` event can be matched against it.
    rename_event: Option<(DebouncedEvent, Option<FileId>)>,
    /// The latest rescan event, held back until the timeout has elapsed.
    rescan_event: Option<DebouncedEvent>,
    /// Errors collected since the last call to `errors`.
    errors: Vec<Error>,
    /// How old an event must be before `debounced_events` emits it.
    timeout: Duration,
}
195
196impl<T: FileIdCache> DebounceDataInner<T> {
197 pub(crate) fn new(cache: T, timeout: Duration) -> Self {
198 Self {
199 queues: HashMap::new(),
200 roots: Vec::new(),
201 cache,
202 rename_event: None,
203 rescan_event: None,
204 errors: Vec::new(),
205 timeout,
206 }
207 }
208
209 pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
211 let now = now();
212 let mut events_expired = Vec::with_capacity(self.queues.len());
213 let mut queues_remaining = HashMap::with_capacity(self.queues.len());
214
215 if let Some(event) = self.rescan_event.take() {
216 if now.saturating_duration_since(event.time) >= self.timeout {
217 log::trace!("debounced event: {event:?}");
218 events_expired.push(event);
219 } else {
220 self.rescan_event = Some(event);
221 }
222 }
223
224 for (path, mut queue) in self.queues.drain() {
227 let mut kind_index = HashMap::new();
228
229 while let Some(event) = queue.events.pop_front() {
230 if let Some(idx) = kind_index.get(&event.kind).copied() {
232 events_expired.remove(idx);
233
234 kind_index.values_mut().for_each(|i| {
235 if *i > idx {
236 *i -= 1
237 }
238 })
239 }
240
241 if now.saturating_duration_since(event.time) >= self.timeout {
242 kind_index.insert(event.kind, events_expired.len());
243
244 events_expired.push(event);
245 } else {
246 queue.events.push_front(event);
247 break;
248 }
249 }
250
251 if !queue.events.is_empty() {
252 queues_remaining.insert(path, queue);
253 }
254 }
255
256 self.queues = queues_remaining;
257
258 sort_events(events_expired)
259 }
260
261 pub fn errors(&mut self) -> Vec<Error> {
263 std::mem::take(&mut self.errors)
264 }
265
/// Records an error; it stays stored until `errors` drains the list.
pub fn add_error(&mut self, error: Error) {
    log::trace!("raw error: {error:?}");

    self.errors.push(error);
}
272
273 pub fn add_event(&mut self, event: Event) {
275 log::trace!("raw event: {event:?}");
276
277 if event.need_rescan() {
278 self.cache.rescan(&self.roots);
279 self.rescan_event = Some(DebouncedEvent { event, time: now() });
280 return;
281 }
282
283 let path = match event.paths.first() {
284 Some(path) => path,
285 None => {
286 log::info!("skipping event with no paths: {event:?}");
287 return;
288 }
289 };
290
291 match &event.kind {
292 EventKind::Create(_) => {
293 let recursive_mode = self.recursive_mode(path);
294
295 self.cache.add_path(path, recursive_mode);
296
297 self.push_event(event, now());
298 }
299 EventKind::Modify(ModifyKind::Name(rename_mode)) => {
300 match rename_mode {
301 RenameMode::Any => {
302 if event.paths[0].exists() {
303 self.handle_rename_to(event);
304 } else {
305 self.handle_rename_from(event);
306 }
307 }
308 RenameMode::To => {
309 self.handle_rename_to(event);
310 }
311 RenameMode::From => {
312 self.handle_rename_from(event);
313 }
314 RenameMode::Both => {
315 }
317 RenameMode::Other => {
318 }
320 }
321 }
322 EventKind::Remove(_) => {
323 self.push_remove_event(event, now());
324 }
325 EventKind::Other => {
326 }
328 _ => {
329 if self.cache.cached_file_id(path).is_none() {
330 let recursive_mode = self.recursive_mode(path);
331
332 self.cache.add_path(path, recursive_mode);
333 }
334
335 self.push_event(event, now());
336 }
337 }
338 }
339
340 fn recursive_mode(&mut self, path: &Path) -> RecursiveMode {
341 self.roots
342 .iter()
343 .find_map(|(root, recursive_mode)| {
344 if path.starts_with(root) {
345 Some(*recursive_mode)
346 } else {
347 None
348 }
349 })
350 .unwrap_or(RecursiveMode::NonRecursive)
351 }
352
353 fn handle_rename_from(&mut self, event: Event) {
354 let time = now();
355 let path = &event.paths[0];
356
357 let file_id = self.cache.cached_file_id(path).map(|id| *id.as_ref());
359 self.rename_event = Some((DebouncedEvent::new(event.clone(), time), file_id));
360
361 self.cache.remove_path(path);
362
363 self.push_event(event, time);
364 }
365
366 fn handle_rename_to(&mut self, event: Event) {
367 let recursive_mode = self.recursive_mode(&event.paths[0]);
368
369 self.cache.add_path(&event.paths[0], recursive_mode);
370
371 let trackers_match = self
372 .rename_event
373 .as_ref()
374 .and_then(|(e, _)| e.tracker())
375 .and_then(|from_tracker| {
376 event
377 .attrs
378 .tracker()
379 .map(|to_tracker| from_tracker == to_tracker)
380 })
381 .unwrap_or_default();
382
383 let file_ids_match = self
384 .rename_event
385 .as_ref()
386 .and_then(|(_, id)| id.as_ref())
387 .and_then(|from_file_id| {
388 self.cache
389 .cached_file_id(&event.paths[0])
390 .map(|to_file_id| from_file_id == to_file_id.as_ref())
391 })
392 .unwrap_or_default();
393
394 if trackers_match || file_ids_match {
395 let (mut rename_event, _) = self.rename_event.take().unwrap(); let path = rename_event.paths.remove(0);
398 let time = rename_event.time;
399 self.push_rename_event(path, event, time);
400 } else {
401 self.push_event(event, now());
403 }
404
405 self.rename_event = None;
406 }
407
408 fn push_rename_event(&mut self, path: PathBuf, event: Event, time: Instant) {
409 self.cache.remove_path(&path);
410
411 let mut source_queue = self.queues.remove(&path).unwrap_or_default();
412
413 source_queue.events.pop_back();
415
416 let (remove_index, original_path, original_time) = source_queue
418 .events
419 .iter()
420 .enumerate()
421 .find_map(|(index, e)| {
422 if matches!(
423 e.kind,
424 EventKind::Modify(ModifyKind::Name(RenameMode::Both))
425 ) {
426 Some((Some(index), e.paths[0].clone(), e.time))
427 } else {
428 None
429 }
430 })
431 .unwrap_or((None, path, time));
432
433 if let Some(remove_index) = remove_index {
434 source_queue.events.remove(remove_index);
435 }
436
437 if source_queue.was_removed() {
439 let event = source_queue.events.pop_front().unwrap();
440
441 self.queues.insert(
442 event.paths[0].clone(),
443 Queue {
444 events: [event].into(),
445 },
446 );
447 }
448
449 for e in &mut source_queue.events {
451 e.paths = vec![event.paths[0].clone()];
452 }
453
454 if !source_queue.was_created() {
456 source_queue.events.push_front(DebouncedEvent {
457 event: Event {
458 kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)),
459 paths: vec![original_path, event.paths[0].clone()],
460 attrs: event.attrs,
461 },
462 time: original_time,
463 });
464 }
465
466 if let Some(target_queue) = self.queues.get_mut(&event.paths[0]) {
467 if !target_queue.was_created() {
468 let mut remove_event = DebouncedEvent {
469 event: Event {
470 kind: EventKind::Remove(RemoveKind::Any),
471 paths: vec![event.paths[0].clone()],
472 attrs: Default::default(),
473 },
474 time: original_time,
475 };
476 if !target_queue.was_removed() {
477 remove_event.event = remove_event.event.set_info("override");
478 }
479 source_queue.events.push_front(remove_event);
480 }
481 *target_queue = source_queue;
482 } else {
483 self.queues.insert(event.paths[0].clone(), source_queue);
484 }
485 }
486
487 fn push_remove_event(&mut self, event: Event, time: Instant) {
488 let path = &event.paths[0];
489
490 self.queues.retain(|p, _| !p.starts_with(path) || p == path);
492
493 self.cache.remove_path(path);
495
496 match self.queues.get_mut(path) {
497 Some(queue) if queue.was_created() => {
498 self.queues.remove(path);
499 }
500 Some(queue) => {
501 queue.events = [DebouncedEvent::new(event, time)].into();
502 }
503 None => {
504 self.push_event(event, time);
505 }
506 }
507 }
508
509 fn push_event(&mut self, event: Event, time: Instant) {
510 let path = &event.paths[0];
511
512 if let Some(queue) = self.queues.get_mut(path) {
513 if match event.kind {
516 EventKind::Modify(
517 ModifyKind::Any
518 | ModifyKind::Data(_)
519 | ModifyKind::Metadata(_)
520 | ModifyKind::Other,
521 )
522 | EventKind::Create(_) => !queue.was_created(),
523 _ => true,
524 } {
525 queue.events.push_back(DebouncedEvent::new(event, time));
526 }
527 } else {
528 self.queues.insert(
529 path.to_path_buf(),
530 Queue {
531 events: [DebouncedEvent::new(event, time)].into(),
532 },
533 );
534 }
535 }
536}
537
538#[derive(Debug)]
540pub struct Debouncer<T: Watcher, C: FileIdCache> {
541 watcher: T,
542 debouncer_thread: Option<std::thread::JoinHandle<()>>,
543 data: DebounceData<C>,
544 stop: Arc<AtomicBool>,
545}
546
547impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
548 pub fn stop(mut self) {
551 self.set_stop();
552 if let Some(t) = self.debouncer_thread.take() {
553 let _ = t.join();
554 }
555 }
556
557 pub fn stop_nonblocking(self) {
559 self.set_stop();
560 }
561
562 fn set_stop(&self) {
563 self.stop.store(true, Ordering::Relaxed);
564 }
565
566 #[deprecated = "`Debouncer` provides all methods from `Watcher` itself now. Remove `.watcher()` and use those methods directly."]
567 pub fn watcher(&mut self) {}
568
569 #[deprecated = "`Debouncer` now manages root paths automatically. Remove all calls to `add_root` and `remove_root`."]
570 pub fn cache(&mut self) {}
571
572 fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
573 let path = path.into();
574
575 let mut data = self.data.lock().unwrap();
576
577 if data.roots.iter().any(|(p, _)| p == &path) {
579 return;
580 }
581
582 data.roots.push((path.clone(), recursive_mode));
583
584 data.cache.add_path(&path, recursive_mode);
585 }
586
587 fn remove_root(&mut self, path: impl AsRef<Path>) {
588 let mut data = self.data.lock().unwrap();
589
590 data.roots.retain(|(root, _)| !root.starts_with(&path));
591
592 data.cache.remove_path(path.as_ref());
593 }
594
595 pub fn watch(
596 &mut self,
597 path: impl AsRef<Path>,
598 recursive_mode: RecursiveMode,
599 ) -> notify::Result<()> {
600 self.watcher.watch(path.as_ref(), recursive_mode)?;
601 self.add_root(path.as_ref(), recursive_mode);
602 Ok(())
603 }
604
605 pub fn unwatch(&mut self, path: impl AsRef<Path>) -> notify::Result<()> {
606 self.watcher.unwatch(path.as_ref())?;
607 self.remove_root(path);
608 Ok(())
609 }
610
611 pub fn paths_mut<'me>(&'me mut self) -> Box<dyn PathsMut + 'me> {
612 self.watcher.paths_mut()
613 }
614
615 pub fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
616 self.watcher.configure(option)
617 }
618
619 pub fn kind() -> WatcherKind
620 where
621 Self: Sized,
622 {
623 T::kind()
624 }
625}
626
627impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
628 fn drop(&mut self) {
629 self.set_stop();
630 }
631}
632
633pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
639 timeout: Duration,
640 tick_rate: Option<Duration>,
641 mut event_handler: F,
642 file_id_cache: C,
643 config: notify::Config,
644) -> Result<Debouncer<T, C>, Error> {
645 let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
646 let stop = Arc::new(AtomicBool::new(false));
647
648 let tick_div = 4;
649 let tick = match tick_rate {
650 Some(v) => {
651 if v > timeout {
652 return Err(Error::new(ErrorKind::Generic(format!(
653 "Invalid tick_rate, tick rate {v:?} > {timeout:?} timeout!"
654 ))));
655 }
656 v
657 }
658 None => timeout.checked_div(tick_div).ok_or_else(|| {
659 Error::new(ErrorKind::Generic(format!(
660 "Failed to calculate tick as {timeout:?}/{tick_div}!"
661 )))
662 })?,
663 };
664
665 let data_c = data.clone();
666 let stop_c = stop.clone();
667 let thread = std::thread::Builder::new()
668 .name("notify-rs debouncer loop".to_string())
669 .spawn(move || loop {
670 if stop_c.load(Ordering::Acquire) {
671 break;
672 }
673 std::thread::sleep(tick);
674 let send_data;
675 let errors;
676 {
677 let mut lock = data_c.lock().unwrap();
678 send_data = lock.debounced_events();
679 errors = lock.errors();
680 }
681 if !send_data.is_empty() {
682 event_handler.handle_event(Ok(send_data));
683 }
684 if !errors.is_empty() {
685 event_handler.handle_event(Err(errors));
686 }
687 })?;
688
689 let data_c = data.clone();
690 let watcher = T::new(
691 move |e: Result<Event, Error>| {
692 let mut lock = data_c.lock().unwrap();
693
694 match e {
695 Ok(e) => lock.add_event(e),
696 Err(e) => lock.add_error(e),
698 }
699 },
700 config,
701 )?;
702
703 let guard = Debouncer {
704 watcher,
705 debouncer_thread: Some(thread),
706 data,
707 stop,
708 };
709
710 Ok(guard)
711}
712
713pub fn new_debouncer<F: DebounceEventHandler>(
719 timeout: Duration,
720 tick_rate: Option<Duration>,
721 event_handler: F,
722) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, Error> {
723 new_debouncer_opt::<F, RecommendedWatcher, RecommendedCache>(
724 timeout,
725 tick_rate,
726 event_handler,
727 RecommendedCache::new(),
728 notify::Config::default(),
729 )
730}
731
732fn sort_events(events: Vec<DebouncedEvent>) -> Vec<DebouncedEvent> {
733 let mut sorted = Vec::with_capacity(events.len());
734
735 let mut events_by_path: HashMap<_, VecDeque<_>> =
737 events.into_iter().fold(HashMap::new(), |mut acc, event| {
738 acc.entry(event.paths.last().cloned().unwrap_or_default())
739 .or_default()
740 .push_back(event);
741 acc
742 });
743
744 let mut min_time_heap = events_by_path
747 .iter()
748 .map(|(path, events)| Reverse((events[0].time, path.clone())))
749 .collect::<BinaryHeap<_>>();
750
751 while let Some(Reverse((min_time, path))) = min_time_heap.pop() {
752 let events = events_by_path.get_mut(&path).unwrap();
755
756 let mut push_next = false;
757
758 while events.front().is_some_and(|event| event.time <= min_time) {
759 let event = events.pop_front().unwrap();
761 sorted.push(event);
762 push_next = true;
763 }
764
765 if push_next {
766 if let Some(event) = events.front() {
767 min_time_heap.push(Reverse((event.time, path)));
768 }
769 }
770 }
771
772 sorted
773}
774
775#[cfg(test)]
776mod tests {
777 use std::{fs, path::Path};
778
779 use super::*;
780
781 use pretty_assertions::assert_eq;
782 use rstest::rstest;
783 use tempfile::tempdir;
784 use testing::TestCase;
785 use time::MockTime;
786
787 #[rstest]
788 fn state(
789 #[values(
790 "add_create_event",
791 "add_create_event_after_remove_event",
792 "add_create_dir_event_twice",
793 "add_event_with_no_paths_is_ok",
794 "add_modify_any_event_after_create_event",
795 "add_modify_content_event_after_create_event",
796 "add_rename_from_event",
797 "add_rename_from_event_after_create_event",
798 "add_rename_from_event_after_modify_event",
799 "add_rename_from_event_after_create_and_modify_event",
800 "add_rename_from_event_after_rename_from_event",
801 "add_rename_to_event",
802 "add_rename_to_dir_event",
803 "add_rename_from_and_to_event",
804 "add_rename_from_and_to_event_after_create",
805 "add_rename_from_and_to_event_after_rename",
806 "add_rename_from_and_to_event_after_modify_content",
807 "add_rename_from_and_to_event_override_created",
808 "add_rename_from_and_to_event_override_modified",
809 "add_rename_from_and_to_event_override_removed",
810 "add_rename_from_and_to_event_with_file_ids",
811 "add_rename_from_and_to_event_with_different_file_ids",
812 "add_rename_from_and_to_event_with_different_tracker",
813 "add_rename_both_event",
814 "add_remove_event",
815 "add_remove_event_after_create_event",
816 "add_remove_event_after_modify_event",
817 "add_remove_event_after_create_and_modify_event",
818 "add_remove_parent_event_after_remove_child_event",
819 "add_errors",
820 "debounce_modify_events",
821 "emit_continuous_modify_content_events",
822 "emit_events_in_chronological_order",
823 "emit_events_with_a_prepended_rename_event",
824 "emit_close_events_only_once",
825 "emit_modify_event_after_close_event",
826 "emit_needs_rescan_event",
827 "read_file_id_without_create_event",
828 "sort_events_chronologically",
829 "sort_events_with_reordering"
830 )]
831 file_name: &str,
832 ) {
833 let file_content =
834 fs::read_to_string(Path::new(&format!("./test_cases/{file_name}.hjson"))).unwrap();
835 let mut test_case = deser_hjson::from_str::<TestCase>(&file_content).unwrap();
836
837 let time = now();
838 MockTime::set_time(time);
839
840 let mut state = test_case.state.into_debounce_data_inner(time);
841 state.roots = vec![(PathBuf::from("/"), RecursiveMode::Recursive)];
842
843 let mut prev_event_time = Duration::default();
844
845 for event in test_case.events {
846 let event_time = Duration::from_millis(event.time);
847 let event = event.into_debounced_event(time, None);
848 MockTime::advance(event_time - prev_event_time);
849 prev_event_time = event_time;
850 state.add_event(event.event);
851 }
852
853 for error in test_case.errors {
854 let error = error.into_notify_error();
855 state.add_error(error);
856 }
857
858 let expected_errors = std::mem::take(&mut test_case.expected.errors);
859 let expected_events = std::mem::take(&mut test_case.expected.events);
860 let expected_state = test_case.expected.into_debounce_data_inner(time);
861 assert_eq!(
862 state.queues, expected_state.queues,
863 "queues not as expected"
864 );
865 assert_eq!(
866 state.rename_event, expected_state.rename_event,
867 "rename event not as expected"
868 );
869 assert_eq!(
870 state.rescan_event, expected_state.rescan_event,
871 "rescan event not as expected"
872 );
873 assert_eq!(
874 state.cache.paths, expected_state.cache.paths,
875 "cache not as expected"
876 );
877
878 assert_eq!(
879 state
880 .errors
881 .iter()
882 .map(|e| format!("{e:?}"))
883 .collect::<Vec<_>>(),
884 expected_errors
885 .iter()
886 .map(|e| format!("{:?}", e.clone().into_notify_error()))
887 .collect::<Vec<_>>(),
888 "errors not as expected"
889 );
890
891 let backup_time = now();
892 let backup_queues = state.queues.clone();
893
894 for (delay, events) in expected_events {
895 MockTime::set_time(backup_time);
896 state.queues = backup_queues.clone();
897
898 match delay.as_str() {
899 "none" => {}
900 "short" => MockTime::advance(Duration::from_millis(10)),
901 "long" => MockTime::advance(Duration::from_millis(100)),
902 _ => {
903 if let Ok(ts) = delay.parse::<u64>() {
904 MockTime::set_time(time + Duration::from_millis(ts));
905 }
906 }
907 }
908
909 let events = events
910 .into_iter()
911 .map(|event| event.into_debounced_event(time, None))
912 .collect::<Vec<_>>();
913
914 assert_eq!(
915 state.debounced_events(),
916 events,
917 "debounced events after a `{delay}` delay"
918 );
919 }
920 }
921
/// End-to-end test: watches a temporary directory, writes a file into it and waits up
/// to ten seconds for a debounced event that refers to exactly that file.
#[test]
fn integration() -> Result<(), Box<dyn std::error::Error>> {
    let dir = tempdir()?;

    // Set up the debouncer with a channel as its event handler.
    let (tx, rx) = std::sync::mpsc::channel();
    let mut debouncer = new_debouncer(Duration::from_millis(10), None, tx)?;
    debouncer.watch(dir.path(), RecursiveMode::Recursive)?;

    // Trigger an event by creating a file inside the watched directory.
    let file_path = dir.path().join("file.txt");
    fs::write(&file_path, b"Lorem ipsum")?;

    println!("waiting for event at {}", file_path.display());

    let deadline = Instant::now() + Duration::from_secs(10);
    while deadline > Instant::now() {
        let batch = rx
            .recv_timeout(deadline - Instant::now())
            .expect("did not receive expected event")
            .expect("received an error");

        for event in batch {
            // Accept the path as written or in its canonical form.
            let paths = &event.event.paths;
            if *paths == [file_path.clone()] || *paths == [file_path.canonicalize()?] {
                return Ok(());
            }

            println!("unexpected event: {event:?}");
        }
    }

    panic!("did not receive expected event");
}
958}