Skip to main content

notify_debouncer_full/
lib.rs

1//! A debouncer for [notify] that is optimized for ease of use.
2//!
3//! * Only emits a single `Rename` event if the rename `From` and `To` events can be matched
4//! * Merges multiple `Rename` events
5//! * Takes `Rename` events into account and updates paths for events that occurred before the rename event but haven't been emitted yet
6//! * Optionally keeps track of the file system IDs of all files and stitches rename events together (macOS FS Events, Windows)
7//! * Emits only one `Remove` event when deleting a directory (inotify)
8//! * Doesn't emit duplicate create events
9//! * Doesn't emit `Modify` events after a `Create` event
10//!
11//! # Installation
12//!
13//! ```toml
14//! [dependencies]
15//! notify-debouncer-full = "0.5.0"
16//! ```
17//!
18//! In case you want to select specific features of notify,
19//! specify notify as dependency explicitly in your dependencies.
20//! Otherwise you can just use the re-export of notify from debouncer-full.
21//!
22//! ```toml
23//! notify-debouncer-full = "0.5.0"
24//! notify = { version = "..", features = [".."] }
25//! ```
26//!
27//! # Examples
28//!
29//! ```rust,no_run
30//! # use std::path::Path;
31//! # use std::time::Duration;
32//! use notify_debouncer_full::{notify::*, new_debouncer, DebounceEventResult};
33//!
34//! // Select recommended watcher for debouncer.
35//! // Using a callback here, could also be a channel.
36//! let mut debouncer = new_debouncer(Duration::from_secs(2), None, |result: DebounceEventResult| {
37//!     match result {
38//!         Ok(events) => events.iter().for_each(|event| println!("{event:?}")),
39//!         Err(errors) => errors.iter().for_each(|error| println!("{error:?}")),
40//!     }
41//! }).unwrap();
42//!
43//! // Add a path to be watched. All files and directories at that path and
44//! // below will be monitored for changes.
45//! debouncer.watch(".", WatchMode::recursive()).unwrap();
46//! ```
47//!
48//! # Features
49//!
50//! The following crate features can be turned on or off in your cargo dependency config:
51//!
52//! - `serde` passed down to notify-types, off by default
53//! - `web-time` passed down to notify-types, off by default
54//! - `crossbeam-channel` passed down to notify, off by default
55//! - `flume` passed down to notify, off by default
56//! - `macos_fsevent` passed down to notify, off by default
57//! - `macos_kqueue` passed down to notify, off by default
58//! - `serialization-compat-6` passed down to notify, off by default
59//!
60//! # Caveats
61//!
62//! As all file events are sourced from notify, the [known problems](https://docs.rs/notify/latest/notify/#known-problems) section applies here too.
63
64mod cache;
65mod time;
66
67#[cfg(test)]
68mod testing;
69
70#[cfg(not(target_family = "wasm"))]
71mod file_id_map;
72
73use std::{
74    cmp::Reverse,
75    collections::{BinaryHeap, HashMap, VecDeque},
76    path::{Path, PathBuf},
77    sync::{
78        Arc, Mutex,
79        atomic::{AtomicBool, Ordering},
80    },
81    time::{Duration, Instant},
82};
83
84use time::now;
85
86pub use cache::{FileIdCache, NoCache, RecommendedCache};
87
88#[cfg(not(target_family = "wasm"))]
89pub use file_id_map::FileIdMap;
90
91pub use file_id;
92pub use notify;
93pub use notify_types::debouncer_full::DebouncedEvent;
94
95use file_id::FileId;
96use notify::{
97    Error, ErrorKind, Event, EventKind, PathsMut, RecommendedWatcher, RecursiveMode, TargetMode,
98    WatchMode, Watcher, WatcherKind,
99    event::{EventAttributes, ModifyKind, RemoveKind, RenameMode},
100};
101
/// The set of requirements for watcher debounce event handling functions.
///
/// A handler receives every batch the debouncer emits as a [`DebounceEventResult`]:
/// either a vec of debounced events or a vec of errors. The `Send + 'static` bounds
/// are required because the handler is moved into the debouncer thread.
///
/// # Example implementation
///
/// ```rust,no_run
/// # use notify::{Event, Result, EventHandler};
/// # use notify_debouncer_full::{DebounceEventHandler, DebounceEventResult};
///
/// /// Prints received events
/// struct EventPrinter;
///
/// impl DebounceEventHandler for EventPrinter {
///     fn handle_event(&mut self, result: DebounceEventResult) {
///         match result {
///             Ok(events) => events.iter().for_each(|event| println!("{event:?}")),
///             Err(errors) => errors.iter().for_each(|error| println!("{error:?}")),
///         }
///     }
/// }
/// ```
pub trait DebounceEventHandler: Send + 'static {
    /// Handles one batch of debounced events or one batch of errors.
    fn handle_event(&mut self, event: DebounceEventResult);
}
126
127impl<F> DebounceEventHandler for F
128where
129    F: FnMut(DebounceEventResult) + Send + 'static,
130{
131    fn handle_event(&mut self, event: DebounceEventResult) {
132        (self)(event);
133    }
134}
135
#[cfg(feature = "crossbeam-channel")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
    /// Forwards the batch over the channel. A failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: DebounceEventResult) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send debounce event result");
        }
    }
}
145
#[cfg(feature = "flume")]
impl DebounceEventHandler for flume::Sender<DebounceEventResult> {
    /// Forwards the batch over the channel. A failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: DebounceEventResult) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send debounce event result");
        }
    }
}
155
156impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
157    fn handle_event(&mut self, event: DebounceEventResult) {
158        let result = self.send(event);
159        if let Err(e) = result {
160            tracing::error!(?e, "failed to send debounce event result");
161        }
162    }
163}
164
/// A result of debounced events.
/// Comes with either a vec of events or vec of errors.
pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Vec<Error>>;

/// The debouncer state, reference-counted and guarded by a mutex so it can be shared.
type DebounceData<T> = Arc<Mutex<DebounceDataInner<T>>>;
170
/// The events that are waiting to be emitted for a single path.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct Queue {
    /// Events must be stored in the following order:
    /// 1. `remove` or `move out` event
    /// 2. `rename` event
    /// 3. Other events
    events: VecDeque<DebouncedEvent>,
}
179
180impl Queue {
181    fn was_created(&self) -> bool {
182        self.events.front().is_some_and(|event| {
183            matches!(
184                event.kind,
185                EventKind::Create(_) | EventKind::Modify(ModifyKind::Name(RenameMode::To))
186            )
187        })
188    }
189
190    fn was_removed(&self) -> bool {
191        self.events.front().is_some_and(|event| {
192            matches!(
193                event.kind,
194                EventKind::Remove(_) | EventKind::Modify(ModifyKind::Name(RenameMode::From))
195            )
196        })
197    }
198}
199
/// The debouncer state: per-path event queues plus pending rename, rescan and error data.
#[derive(Debug)]
pub(crate) struct DebounceDataInner<T> {
    /// Pending events, keyed by path.
    queues: HashMap<PathBuf, Queue>,
    /// Watched root paths and the watch mode each one was registered with.
    roots: Vec<(PathBuf, WatchMode)>,
    /// File ID cache, updated as events add and remove paths.
    cache: T,
    /// The latest rename `From` event together with the cached file ID of its path,
    /// kept so that a following rename `To` event can be matched against it.
    rename_event: Option<(DebouncedEvent, Option<FileId>)>,
    /// The latest rescan event, emitted once it is at least `timeout` old.
    rescan_event: Option<DebouncedEvent>,
    /// Errors collected until they are taken with `errors`.
    errors: Vec<Error>,
    /// Minimum age an event must reach before it is emitted.
    timeout: Duration,
}
210
211impl<T: FileIdCache> DebounceDataInner<T> {
212    pub(crate) fn new(cache: T, timeout: Duration) -> Self {
213        Self {
214            queues: HashMap::new(),
215            roots: Vec::new(),
216            cache,
217            rename_event: None,
218            rescan_event: None,
219            errors: Vec::new(),
220            timeout,
221        }
222    }
223
224    /// Retrieve a vec of debounced events, removing them if not continuous
225    pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
226        let now = now();
227        let mut events_expired = Vec::with_capacity(self.queues.len());
228        let mut queues_remaining = HashMap::with_capacity(self.queues.len());
229
230        if let Some(event) = self.rescan_event.take() {
231            if now.saturating_duration_since(event.time) >= self.timeout {
232                tracing::trace!("debounce candidate rescan event: {event:?}");
233                events_expired.push(event);
234            } else {
235                self.rescan_event = Some(event);
236            }
237        }
238
239        // drain the entire queue, then process the expired events and re-add the rest
240        // TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
241        for (path, mut queue) in self.queues.drain() {
242            let mut kind_index = HashMap::new();
243
244            while let Some(event) = queue.events.pop_front() {
245                // remove previous event of the same kind
246                if let Some(idx) = kind_index.get(&event.kind).copied() {
247                    tracing::trace!("removed candidate event: {event:?}");
248                    events_expired.remove(idx);
249
250                    for i in kind_index.values_mut() {
251                        if *i > idx {
252                            *i -= 1;
253                        }
254                    }
255                }
256
257                if now.saturating_duration_since(event.time) >= self.timeout {
258                    tracing::trace!("debounce candidate event: {event:?}");
259                    kind_index.insert(event.kind, events_expired.len());
260
261                    events_expired.push(event);
262                } else {
263                    queue.events.push_front(event);
264                    break;
265                }
266            }
267
268            if !queue.events.is_empty() {
269                queues_remaining.insert(path, queue);
270            }
271        }
272
273        self.queues = queues_remaining;
274
275        sort_events(events_expired)
276    }
277
278    /// Returns all currently stored errors
279    pub fn errors(&mut self) -> Vec<Error> {
280        std::mem::take(&mut self.errors)
281    }
282
283    /// Add an error entry to re-send later on
284    pub fn add_error(&mut self, error: Error) {
285        tracing::trace!("raw error: {error:?}");
286
287        self.errors.push(error);
288    }
289
290    /// Add new event to debouncer cache
291    pub fn add_event(&mut self, event: Event) {
292        tracing::trace!("raw event: {event:?}");
293
294        if event.need_rescan() {
295            self.cache.rescan(&self.roots);
296            self.rescan_event = Some(DebouncedEvent { event, time: now() });
297            return;
298        }
299
300        let Some(path) = event.paths.first() else {
301            tracing::info!("skipping event with no paths: {event:?}");
302            return;
303        };
304
305        match &event.kind {
306            EventKind::Create(_) => {
307                let watch_mode = self.watch_mode(path);
308
309                self.cache.add_path(path, watch_mode);
310
311                self.push_event(event, now());
312            }
313            EventKind::Modify(ModifyKind::Name(rename_mode)) => {
314                #[expect(clippy::match_same_arms)]
315                match rename_mode {
316                    RenameMode::Any => {
317                        if event.paths[0].exists() {
318                            self.handle_rename_to(event);
319                        } else {
320                            self.handle_rename_from(event);
321                        }
322                    }
323                    RenameMode::To => {
324                        self.handle_rename_to(event);
325                    }
326                    RenameMode::From => {
327                        self.handle_rename_from(event);
328                    }
329                    RenameMode::Both => {
330                        // ignore and handle `To` and `From` events instead
331                    }
332                    RenameMode::Other => {
333                        // unused
334                    }
335                }
336            }
337            EventKind::Remove(_) => {
338                self.push_remove_event(event, now());
339            }
340            EventKind::Other => {
341                // ignore meta events
342            }
343            _ => {
344                if self.cache.cached_file_id(path).is_none() {
345                    let watch_mode = self.watch_mode(path);
346
347                    self.cache.add_path(path, watch_mode);
348                }
349
350                self.push_event(event, now());
351            }
352        }
353    }
354
355    fn watch_mode(&self, path: &Path) -> WatchMode {
356        self.roots
357            .iter()
358            .find_map(|(root, watch_mode)| {
359                if path.starts_with(root) {
360                    Some(*watch_mode)
361                } else {
362                    None
363                }
364            })
365            .unwrap_or(WatchMode {
366                recursive_mode: RecursiveMode::NonRecursive,
367                target_mode: TargetMode::TrackPath, // TODO: correct default?
368            })
369    }
370
371    fn handle_rename_from(&mut self, event: Event) {
372        let time = now();
373        let path = &event.paths[0];
374
375        // store event
376        let file_id = self.cache.cached_file_id(path).map(|id| *id.as_ref());
377        self.rename_event = Some((DebouncedEvent::new(event.clone(), time), file_id));
378
379        self.cache.remove_path(path);
380
381        self.push_event(event, time);
382    }
383
384    fn handle_rename_to(&mut self, event: Event) {
385        let watch_mode = self.watch_mode(&event.paths[0]);
386
387        self.cache.add_path(&event.paths[0], watch_mode);
388
389        let trackers_match = self
390            .rename_event
391            .as_ref()
392            .and_then(|(e, _)| e.tracker())
393            .and_then(|from_tracker| {
394                event
395                    .attrs
396                    .tracker()
397                    .map(|to_tracker| from_tracker == to_tracker)
398            })
399            .unwrap_or_default();
400
401        let file_ids_match = self
402            .rename_event
403            .as_ref()
404            .and_then(|(_, id)| id.as_ref())
405            .and_then(|from_file_id| {
406                self.cache
407                    .cached_file_id(&event.paths[0])
408                    .map(|to_file_id| from_file_id == to_file_id.as_ref())
409            })
410            .unwrap_or_default();
411
412        if trackers_match || file_ids_match {
413            // connect rename
414            let (mut rename_event, _) = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
415            let path = rename_event.paths.remove(0);
416            let time = rename_event.time;
417            self.push_rename_event(path, event, time);
418        } else {
419            // move in
420            self.push_event(event, now());
421        }
422
423        self.rename_event = None;
424    }
425
    /// Joins a matched rename: moves the queue of `path` (the rename source) to
    /// `event.paths[0]` (the rename target) and records one `RenameMode::Both` event.
    ///
    /// `time` is the time of the rename `From` event. It becomes the time of the joined
    /// event unless the source queue already holds a `RenameMode::Both` event, in which
    /// case that event's first path and time are reused.
    fn push_rename_event(&mut self, path: PathBuf, event: Event, time: Instant) {
        self.cache.remove_path(&path);

        let mut source_queue = self.queues.remove(&path).unwrap_or_default();

        // remove rename `from` event
        source_queue.events.pop_back();

        // remove existing rename event
        let (remove_index, original_path, original_time) = source_queue
            .events
            .iter()
            .enumerate()
            .find_map(|(index, e)| {
                if matches!(
                    e.kind,
                    EventKind::Modify(ModifyKind::Name(RenameMode::Both))
                ) {
                    Some((Some(index), e.paths[0].clone(), e.time))
                } else {
                    None
                }
            })
            .unwrap_or((None, path, time));

        if let Some(remove_index) = remove_index {
            source_queue.events.remove(remove_index);
        }

        // split off remove or move out event and add it back to the events map
        if source_queue.was_removed() {
            let event = source_queue.events.pop_front().unwrap();

            self.queues.insert(
                event.paths[0].clone(),
                Queue {
                    events: [event].into(),
                },
            );
        }

        // update paths
        for e in &mut source_queue.events {
            e.paths = vec![event.paths[0].clone()];
        }

        // insert rename event at the front, unless the file was just created
        if !source_queue.was_created() {
            source_queue.events.push_front(DebouncedEvent {
                event: Event {
                    kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)),
                    paths: vec![original_path, event.paths[0].clone()],
                    attrs: event.attrs,
                },
                time: original_time,
            });
        }

        // a queue that already exists at the target path is replaced by the source queue
        if let Some(target_queue) = self.queues.get_mut(&event.paths[0]) {
            // unless the target was just created, put a remove event for it in front
            if !target_queue.was_created() {
                let mut remove_event = DebouncedEvent {
                    event: Event {
                        kind: EventKind::Remove(RemoveKind::Any),
                        paths: vec![event.paths[0].clone()],
                        attrs: EventAttributes::default(),
                    },
                    time: original_time,
                };
                // mark the remove event as an override if the target queue did not
                // already start with a remove or move out event
                if !target_queue.was_removed() {
                    remove_event.event = remove_event.event.set_info("override");
                }
                source_queue.events.push_front(remove_event);
            }
            *target_queue = source_queue;
        } else {
            self.queues.insert(event.paths[0].clone(), source_queue);
        }
    }
504
505    fn push_remove_event(&mut self, event: Event, time: Instant) {
506        let path = &event.paths[0];
507
508        // remove child queues
509        self.queues.retain(|p, _| !p.starts_with(path) || p == path);
510
511        // remove cached file ids
512        self.cache.remove_path(path);
513
514        match self.queues.get_mut(path) {
515            Some(queue) if queue.was_created() => {
516                self.queues.remove(path);
517            }
518            Some(queue) => {
519                queue.events = [DebouncedEvent::new(event, time)].into();
520            }
521            None => {
522                self.push_event(event, time);
523            }
524        }
525    }
526
527    fn push_event(&mut self, event: Event, time: Instant) {
528        let path = &event.paths[0];
529
530        if let Some(queue) = self.queues.get_mut(path) {
531            // Skip duplicate create events and modifications right after creation.
532            // This code relies on backends never emitting a `Modify` event with kind other than `Name` for a rename event.
533            if match event.kind {
534                EventKind::Modify(
535                    ModifyKind::Any
536                    | ModifyKind::Data(_)
537                    | ModifyKind::Metadata(_)
538                    | ModifyKind::Other,
539                )
540                | EventKind::Create(_) => !queue.was_created(),
541                _ => true,
542            } {
543                queue.events.push_back(DebouncedEvent::new(event, time));
544            }
545        } else {
546            self.queues.insert(
547                path.clone(),
548                Queue {
549                    events: [DebouncedEvent::new(event, time)].into(),
550                },
551            );
552        }
553    }
554}
555
/// Debouncer guard, stops the debouncer on drop.
#[derive(Debug)]
pub struct Debouncer<T: Watcher, C: FileIdCache> {
    /// The wrapped watcher that produces the raw events.
    watcher: T,
    /// Handle of the debouncer thread; taken and joined by `stop`.
    debouncer_thread: Option<std::thread::JoinHandle<()>>,
    /// State shared with the watcher callback and the debouncer thread.
    data: DebounceData<C>,
    /// Flag that tells the debouncer thread to leave its loop.
    stop: Arc<AtomicBool>,
}
564
565impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
566    /// Stop the debouncer, waits for the event thread to finish.
567    /// May block for the duration of one `tick_rate`.
568    pub fn stop(mut self) {
569        self.set_stop();
570        if let Some(t) = self.debouncer_thread.take() {
571            let result = t.join();
572            if let Err(e) = result {
573                tracing::error!(?e, "failed to join debouncer thread");
574            }
575        }
576    }
577
578    /// Stop the debouncer, does not wait for the event thread to finish.
579    pub fn stop_nonblocking(self) {
580        self.set_stop();
581    }
582
583    fn set_stop(&self) {
584        self.stop.store(true, Ordering::Relaxed);
585    }
586
587    #[deprecated = "`Debouncer` provides all methods from `Watcher` itself now. Remove `.watcher()` and use those methods directly."]
588    pub fn watcher(&mut self) {}
589
590    #[deprecated = "`Debouncer` now manages root paths automatically. Remove all calls to `add_root` and `remove_root`."]
591    pub fn cache(&mut self) {}
592
593    fn add_root(&self, path: impl Into<PathBuf>, watch_mode: WatchMode) {
594        let path = path.into();
595
596        let mut data = self.data.lock().unwrap();
597
598        // skip, if the root has already been added
599        if data.roots.iter().any(|(p, _)| p == &path) {
600            return;
601        }
602
603        data.roots.push((path.clone(), watch_mode));
604
605        data.cache.add_path(&path, watch_mode);
606    }
607
608    fn remove_root(&self, path: impl AsRef<Path>) {
609        let mut data = self.data.lock().unwrap();
610
611        data.roots.retain(|(root, _)| !root.starts_with(&path));
612
613        data.cache.remove_path(path.as_ref());
614    }
615
616    pub fn watch(&mut self, path: impl AsRef<Path>, watch_mode: WatchMode) -> notify::Result<()> {
617        self.watcher.watch(path.as_ref(), watch_mode)?;
618        self.add_root(path.as_ref(), watch_mode);
619        Ok(())
620    }
621
622    pub fn unwatch(&mut self, path: impl AsRef<Path>) -> notify::Result<()> {
623        self.watcher.unwatch(path.as_ref())?;
624        self.remove_root(path);
625        Ok(())
626    }
627
628    pub fn paths_mut<'me>(&'me mut self) -> Box<dyn PathsMut + 'me> {
629        self.watcher.paths_mut()
630    }
631
632    pub fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
633        self.watcher.configure(option)
634    }
635
636    #[must_use]
637    pub fn kind() -> WatcherKind
638    where
639        Self: Sized,
640    {
641        T::kind()
642    }
643}
644
645impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
646    fn drop(&mut self) {
647        self.set_stop();
648    }
649}
650
651/// Creates a new debounced watcher with custom configuration.
652///
653/// Timeout is the amount of time after which a debounced event is emitted.
654///
655/// If `tick_rate` is `None`, notify will select a tick rate that is 1/4 of the provided timeout.
656///
657/// # Panics
658/// Panics if the internal mutex is poisoned.
659#[tracing::instrument(level = "debug", skip(event_handler, file_id_cache))]
660pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
661    timeout: Duration,
662    tick_rate: Option<Duration>,
663    mut event_handler: F,
664    file_id_cache: C,
665    config: notify::Config,
666) -> Result<Debouncer<T, C>, Error> {
667    let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
668    let stop = Arc::new(AtomicBool::new(false));
669
670    let tick_div = 4;
671    let tick = match tick_rate {
672        Some(v) => {
673            if v > timeout {
674                return Err(Error::new(ErrorKind::Generic(format!(
675                    "Invalid tick_rate, tick rate {v:?} > {timeout:?} timeout!"
676                ))));
677            }
678            v
679        }
680        None => timeout.checked_div(tick_div).ok_or_else(|| {
681            Error::new(ErrorKind::Generic(format!(
682                "Failed to calculate tick as {timeout:?}/{tick_div}!"
683            )))
684        })?,
685    };
686
687    let data_c = Arc::clone(&data);
688    let stop_c = Arc::clone(&stop);
689    let thread = std::thread::Builder::new()
690        .name("notify-rs debouncer loop".to_string())
691        .spawn(move || {
692            loop {
693                if stop_c.load(Ordering::Acquire) {
694                    break;
695                }
696                std::thread::sleep(tick);
697                let send_data;
698                let errors;
699                {
700                    let mut lock = data_c.lock().unwrap();
701                    send_data = lock.debounced_events();
702                    errors = lock.errors();
703                }
704                if !send_data.is_empty() {
705                    event_handler.handle_event(Ok(send_data));
706                }
707                if !errors.is_empty() {
708                    event_handler.handle_event(Err(errors));
709                }
710            }
711        })?;
712
713    let data_c = Arc::clone(&data);
714    let watcher = T::new(
715        move |e: Result<Event, Error>| {
716            let mut lock = data_c.lock().unwrap();
717
718            match e {
719                Ok(e) => lock.add_event(e),
720                // can't have multiple TX, so we need to pipe that through our debouncer
721                Err(e) => lock.add_error(e),
722            }
723        },
724        config,
725    )?;
726
727    let guard = Debouncer {
728        watcher,
729        debouncer_thread: Some(thread),
730        data,
731        stop,
732    };
733
734    Ok(guard)
735}
736
737/// Short function to create a new debounced watcher with the recommended debouncer and the built-in file ID cache.
738///
739/// Timeout is the amount of time after which a debounced event is emitted.
740///
741/// If `tick_rate` is `None`, notify will select a tick rate that is 1/4 of the provided timeout.
742pub fn new_debouncer<F: DebounceEventHandler>(
743    timeout: Duration,
744    tick_rate: Option<Duration>,
745    event_handler: F,
746) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, Error> {
747    new_debouncer_opt::<F, RecommendedWatcher, RecommendedCache>(
748        timeout,
749        tick_rate,
750        event_handler,
751        RecommendedCache::new(),
752        notify::Config::default(),
753    )
754}
755
756#[tracing::instrument(level = "trace", ret)]
757fn sort_events(events: Vec<DebouncedEvent>) -> Vec<DebouncedEvent> {
758    let mut sorted = Vec::with_capacity(events.len());
759
760    // group events by path
761    let mut events_by_path: HashMap<_, VecDeque<_>> =
762        events.into_iter().fold(HashMap::new(), |mut acc, event| {
763            acc.entry(event.paths.last().cloned().unwrap_or_default())
764                .or_default()
765                .push_back(event);
766            acc
767        });
768
769    // push events for different paths in chronological order and keep the order of events with the same path
770
771    let mut min_time_heap = events_by_path
772        .iter()
773        .map(|(path, events)| Reverse((events[0].time, path.clone())))
774        .collect::<BinaryHeap<_>>();
775
776    while let Some(Reverse((min_time, path))) = min_time_heap.pop() {
777        // unwrap is safe because only paths from `events_by_path` are added to `min_time_heap`
778        // and they are never removed from `events_by_path`.
779        let events = events_by_path.get_mut(&path).unwrap();
780
781        let mut push_next = false;
782
783        while events.front().is_some_and(|event| event.time <= min_time) {
784            // unwrap is safe because `pop_front` mus return some in order to enter the loop
785            let event = events.pop_front().unwrap();
786            sorted.push(event);
787            push_next = true;
788        }
789
790        if push_next && let Some(event) = events.front() {
791            min_time_heap.push(Reverse((event.time, path)));
792        }
793    }
794
795    sorted
796}
797
798#[cfg(test)]
799mod tests {
800    use std::{fs, path::Path};
801
802    use super::*;
803
804    use pretty_assertions::assert_eq;
805    use rstest::rstest;
806    use tempfile::tempdir;
807    use testing::TestCase;
808    use time::MockTime;
809
    // Data-driven test: every value names a `./test_cases/<name>.hjson` file that holds
    // an initial state, the events and errors to add, and the expected outcome.
    #[rstest]
    fn state(
        #[values(
            "add_create_event",
            "add_create_event_after_remove_event",
            "add_create_dir_event_twice",
            "add_event_with_no_paths_is_ok",
            "add_modify_any_event_after_create_event",
            "add_modify_content_event_after_create_event",
            "add_rename_from_event",
            "add_rename_from_event_after_create_event",
            "add_rename_from_event_after_modify_event",
            "add_rename_from_event_after_create_and_modify_event",
            "add_rename_from_event_after_rename_from_event",
            "add_rename_to_event",
            "add_rename_to_dir_event",
            "add_rename_from_and_to_event",
            "add_rename_from_and_to_event_after_create",
            "add_rename_from_and_to_event_after_rename",
            "add_rename_from_and_to_event_after_modify_content",
            "add_rename_from_and_to_event_override_created",
            "add_rename_from_and_to_event_override_modified",
            "add_rename_from_and_to_event_override_removed",
            "add_rename_from_and_to_event_with_file_ids",
            "add_rename_from_and_to_event_with_different_file_ids",
            "add_rename_from_and_to_event_with_different_tracker",
            "add_rename_both_event",
            "add_remove_event",
            "add_remove_event_after_create_event",
            "add_remove_event_after_modify_event",
            "add_remove_event_after_create_and_modify_event",
            "add_remove_parent_event_after_remove_child_event",
            "add_errors",
            "debounce_modify_events",
            "emit_continuous_modify_content_events",
            "emit_events_in_chronological_order",
            "emit_events_with_a_prepended_rename_event",
            "emit_close_events_only_once",
            "emit_modify_event_after_close_event",
            "emit_needs_rescan_event",
            "read_file_id_without_create_event",
            "sort_events_chronologically",
            "sort_events_with_reordering"
        )]
        file_name: &str,
    ) {
        let file_content =
            fs::read_to_string(Path::new(&format!("./test_cases/{file_name}.hjson"))).unwrap();
        let mut test_case = deser_hjson::from_str::<TestCase>(&file_content).unwrap();

        // pin the mocked clock so that all times are relative to `time`
        let time = now();
        MockTime::set_time(time);

        let mut state = test_case.state.into_debounce_data_inner(time);
        state.roots = vec![(PathBuf::from("/"), WatchMode::recursive())];

        let mut prev_event_time = Duration::default();

        // feed the events, advancing the mocked clock to each event's time first
        for event in test_case.events {
            let event_time = Duration::from_millis(event.time);
            let event = event.into_debounced_event(time, None);
            MockTime::advance(event_time - prev_event_time);
            prev_event_time = event_time;
            state.add_event(event.event);
        }

        for error in test_case.errors {
            let error = error.into_notify_error();
            state.add_error(error);
        }

        // take the expected errors and events out before `expected` is converted below
        let expected_errors = std::mem::take(&mut test_case.expected.errors);
        let expected_events = std::mem::take(&mut test_case.expected.events);
        let expected_state = test_case.expected.into_debounce_data_inner(time);
        assert_eq!(
            state.queues, expected_state.queues,
            "queues not as expected"
        );
        assert_eq!(
            state.rename_event, expected_state.rename_event,
            "rename event not as expected"
        );
        assert_eq!(
            state.rescan_event, expected_state.rescan_event,
            "rescan event not as expected"
        );
        assert_eq!(
            state.cache.paths, expected_state.cache.paths,
            "cache not as expected"
        );

        // errors are compared through their `Debug` output
        assert_eq!(
            state
                .errors
                .iter()
                .map(|e| format!("{e:?}"))
                .collect::<Vec<_>>(),
            expected_errors
                .iter()
                .map(|e| format!("{:?}", e.clone().into_notify_error()))
                .collect::<Vec<_>>(),
            "errors not as expected"
        );

        // every delay is checked against the same starting point: the clock and the
        // queues are restored before each run
        let backup_time = now();
        let backup_queues = state.queues.clone();

        for (delay, events) in expected_events {
            MockTime::set_time(backup_time);
            state.queues = backup_queues.clone();

            match delay.as_str() {
                "none" => {}
                "short" => MockTime::advance(Duration::from_millis(10)),
                "long" => MockTime::advance(Duration::from_millis(100)),
                _ => {
                    // a numeric delay is a timestamp in milliseconds relative to `time`
                    if let Ok(ts) = delay.parse::<u64>() {
                        MockTime::set_time(time + Duration::from_millis(ts));
                    }
                }
            }

            let events = events
                .into_iter()
                .map(|event| event.into_debounced_event(time, None))
                .collect::<Vec<_>>();

            assert_eq!(
                state.debounced_events(),
                events,
                "debounced events after a `{delay}` delay"
            );
        }
    }
944
945    #[expect(clippy::print_stdout)]
946    #[test]
947    fn integration() -> Result<(), Box<dyn std::error::Error>> {
948        let dir = tempdir()?;
949
950        // set up the watcher
951        let (tx, rx) = std::sync::mpsc::channel();
952        let mut debouncer = new_debouncer(Duration::from_millis(10), None, tx)?;
953        debouncer.watch(dir.path(), WatchMode::recursive())?;
954
955        // create a new file
956        let file_path = dir.path().join("file.txt");
957        fs::write(&file_path, b"Lorem ipsum")?;
958
959        println!("waiting for event at {}", file_path.display());
960
961        // wait for up to 10 seconds for the create event, ignore all other events
962        let deadline = Instant::now() + Duration::from_secs(10);
963        while deadline > Instant::now() {
964            let events = rx
965                .recv_timeout(deadline - Instant::now())
966                .expect("did not receive expected event")
967                .expect("received an error");
968
969            for event in events {
970                if event.event.paths == vec![file_path.clone()]
971                    || event.event.paths == vec![file_path.canonicalize()?]
972                {
973                    return Ok(());
974                }
975
976                println!("unexpected event: {event:?}");
977            }
978        }
979
980        panic!("did not receive expected event");
981    }
982}