Skip to main content

notify_debouncer_mini/
lib.rs

1//! Debouncer for [notify](https://crates.io/crates/notify). Filters incoming events and emits only one event per timeframe per file.
2//!
3//! # Installation
4//!
5//! ```toml
6//! [dependencies]
7//! notify-debouncer-mini = "0.6.0"
8//! ```
9//! In case you want to select specific features of notify,
10//! specify notify as dependency explicitly in your dependencies.
11//! Otherwise you can just use the re-export of notify from debouncer-mini.
12//! ```toml
13//! notify-debouncer-mini = "0.6.0"
14//! notify = { version = "..", features = [".."] }
15//! ```
16//!
17//! # Examples
18//! See also the full configuration example [here](https://github.com/notify-rs/notify/blob/main/examples/debouncer_mini_custom.rs).
19//!
20//! ```rust,no_run
21//! # use std::path::Path;
22//! # use std::time::Duration;
23//! use notify_debouncer_mini::{notify::*,new_debouncer,DebounceEventResult};
24//!
25//! # fn main() {
26//!   // Select recommended watcher for debouncer.
27//!   // Using a callback here, could also be a channel.
28//!   let mut debouncer = new_debouncer(Duration::from_secs(2), |res: DebounceEventResult| {
29//!       match res {
30//!           Ok(events) => events.iter().for_each(|e|println!("Event {:?} for {:?}",e.kind,e.path)),
31//!           Err(e) => println!("Error {:?}",e),
32//!       }
33//!   }).unwrap();
34//!
35//!   // Add a path to be watched. All files and directories at that path and
36//!   // below will be monitored for changes.
37//!   debouncer.watcher().watch(Path::new("."), WatchMode::recursive()).unwrap();
38//!
39//!   // note that dropping the debouncer (as will happen here) also ends the debouncer
40//!   // thus this demo would need an endless loop to keep running
41//! # }
42//! ```
43//!
44//! # Features
45//!
46//! The following crate features can be turned on or off in your cargo dependency config:
47//!
48//! - `serde` passed down to notify-types, off by default
49//! - `crossbeam-channel` passed down to notify, off by default
50//! - `flume` passed down to notify, off by default
51//! - `macos_fsevent` passed down to notify, off by default
52//! - `macos_kqueue` passed down to notify, off by default
53//! - `serialization-compat-6` passed down to notify, off by default
54//!
55//! # Caveats
56//!
57//! As all file events are sourced from notify, the [known problems](https://docs.rs/notify/latest/notify/#known-problems) section applies here too.
58use std::{
59    collections::HashMap,
60    path::PathBuf,
61    sync::mpsc::{RecvTimeoutError, Sender},
62    time::{Duration, Instant},
63};
64
65pub use notify;
66pub use notify_types::debouncer_mini::{DebouncedEvent, DebouncedEventKind};
67
68use notify::{Error, Event, RecommendedWatcher, Watcher};
69
70/// The set of requirements for watcher debounce event handling functions.
71///
72/// # Example implementation
73///
74/// ```rust,no_run
75/// # use notify::{Event, Result, EventHandler};
76/// # use notify_debouncer_mini::{DebounceEventHandler,DebounceEventResult};
77///
78/// /// Prints received events
79/// struct EventPrinter;
80///
81/// impl DebounceEventHandler for EventPrinter {
82///     fn handle_event(&mut self, event: DebounceEventResult) {
83///         match event {
84///             Ok(events) => {
85///                 for event in events {
86///                     println!("Event {:?} for path {:?}",event.kind,event.path);
87///                 }
88///             },
89///             // errors are immediately reported
90///             Err(error) => println!("Got error {:?}",error),
91///         }
92///     }
93/// }
94/// ```
95pub trait DebounceEventHandler: Send + 'static {
96    /// Handles an event.
97    fn handle_event(&mut self, event: DebounceEventResult);
98}
99
100/// Config for debouncer-mini
101/// ```rust
102/// # use std::time::Duration;
103/// use notify_debouncer_mini::Config;
104/// let backend_config = notify::Config::default();
105///
106/// let config = Config::default().with_timeout(Duration::from_secs(1)).with_batch_mode(true)
107///     .with_notify_config(backend_config);
108/// ```
109#[derive(Clone, Debug, Eq, Hash, PartialEq)]
110pub struct Config {
111    timeout: Duration,
112    batch_mode: bool,
113    notify_config: notify::Config,
114}
115
116impl Default for Config {
117    fn default() -> Self {
118        Self {
119            timeout: Duration::from_millis(500),
120            batch_mode: true,
121            notify_config: notify::Config::default(),
122        }
123    }
124}
125
126impl Config {
127    /// Set timeout
128    ///
129    /// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
130    #[must_use]
131    pub fn with_timeout(mut self, timeout: Duration) -> Self {
132        self.timeout = timeout;
133        self
134    }
135    /// Set batch mode
136    ///
137    /// When `batch_mode` is enabled, events may be delayed (at most 2x the specified timeout) and delivered with others.
138    /// If disabled, all events are delivered immediately when their debounce timeout is reached.
139    #[must_use]
140    pub fn with_batch_mode(mut self, batch_mode: bool) -> Self {
141        self.batch_mode = batch_mode;
142        self
143    }
144    /// Set [`notify::Config`] for the backend
145    #[must_use]
146    pub fn with_notify_config(mut self, notify_config: notify::Config) -> Self {
147        self.notify_config = notify_config;
148        self
149    }
150}
151
152impl<F> DebounceEventHandler for F
153where
154    F: FnMut(DebounceEventResult) + Send + 'static,
155{
156    fn handle_event(&mut self, event: DebounceEventResult) {
157        (self)(event);
158    }
159}
160
161#[cfg(feature = "crossbeam-channel")]
162impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
163    fn handle_event(&mut self, event: DebounceEventResult) {
164        let result = self.send(event);
165        if let Err(e) = result {
166            tracing::error!(?e, "failed to send debounce event result");
167        }
168    }
169}
170
171#[cfg(feature = "flume")]
172impl DebounceEventHandler for flume::Sender<DebounceEventResult> {
173    fn handle_event(&mut self, event: DebounceEventResult) {
174        let result = self.send(event);
175        if let Err(e) = result {
176            tracing::error!(?e, "failed to send debounce event result");
177        }
178    }
179}
180
181impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
182    fn handle_event(&mut self, event: DebounceEventResult) {
183        let result = self.send(event);
184        if let Err(e) = result {
185            tracing::error!(?e, "failed to send debounce event result");
186        }
187    }
188}
189
190/// Deduplicate event data entry
191#[derive(Debug)]
192struct EventData {
193    /// Insertion Time
194    insert: Instant,
195    /// Last Update
196    update: Instant,
197}
198
199impl EventData {
200    #[inline(always)]
201    fn new_any(time: Instant) -> Self {
202        Self {
203            insert: time,
204            update: time,
205        }
206    }
207}
208
209/// A result of debounced events.
210/// Comes with either a vec of events or an immediate error.
211pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Error>;
212
213enum InnerEvent {
214    NotifyEvent(Result<Event, Error>),
215    Shutdown,
216}
217
218struct DebounceDataInner {
219    /// Path -> Event data
220    event_map: HashMap<PathBuf, EventData>,
221    /// timeout used to compare all events against, config
222    timeout: Duration,
223    /// Whether to time events exactly, or batch multiple together.
224    /// This reduces the amount of updates but possibly waiting longer than necessary for some events
225    batch_mode: bool,
226    /// next debounce deadline
227    debounce_deadline: Option<Instant>,
228}
229
230impl DebounceDataInner {
231    pub fn new(timeout: Duration, batch_mode: bool) -> Self {
232        Self {
233            timeout,
234            debounce_deadline: None,
235            event_map: HashMap::default(),
236            batch_mode,
237        }
238    }
239
240    /// Returns a duration to wait for the next tick
241    #[inline]
242    pub fn next_tick(&self) -> Option<Duration> {
243        self.debounce_deadline
244            .map(|deadline| deadline.saturating_duration_since(Instant::now()))
245    }
246
247    /// Retrieve a vec of debounced events, removing them if not continuous
248    ///
249    /// Updates the internal tracker for the next tick
250    pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
251        let mut events_expired = Vec::with_capacity(self.event_map.len());
252        let mut data_back = HashMap::with_capacity(self.event_map.len());
253        // TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
254        // reset deadline
255        self.debounce_deadline = None;
256        for (path, event) in self.event_map.drain() {
257            if event.update.elapsed() >= self.timeout {
258                tracing::trace!("debounced event: {:?}", DebouncedEventKind::Any);
259                events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::Any));
260            } else if event.insert.elapsed() >= self.timeout {
261                tracing::trace!("debounced event: {:?}", DebouncedEventKind::AnyContinuous);
262                // set a new deadline, otherwise an 'AnyContinuous' will never resolve to a final 'Any' event
263                Self::check_deadline(
264                    self.batch_mode,
265                    self.timeout,
266                    &mut self.debounce_deadline,
267                    &event,
268                );
269                data_back.insert(path.clone(), event);
270                events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::AnyContinuous));
271            } else {
272                // event is neither old enough for continuous event, nor is it expired for an Any event
273                Self::check_deadline(
274                    self.batch_mode,
275                    self.timeout,
276                    &mut self.debounce_deadline,
277                    &event,
278                );
279                data_back.insert(path, event);
280            }
281        }
282        self.event_map = data_back;
283        events_expired
284    }
285
286    /// Updates the deadline if none is set or when batch mode is disabled and the current deadline would miss the next event.
287    /// The new deadline is calculated based on the last event update time and the debounce timeout.
288    ///
289    /// can't sub-function this due to `event_map.drain()` holding `&mut self`
290    fn check_deadline(
291        batch_mode: bool,
292        timeout: Duration,
293        debounce_deadline: &mut Option<Instant>,
294        event: &EventData,
295    ) {
296        let deadline_candidate = event.update + timeout;
297        match debounce_deadline {
298            Some(current_deadline) => {
299                // shorten deadline to not delay the event
300                // with batch mode simply wait for the incoming deadline and delay the event until then
301                if !batch_mode && *current_deadline > deadline_candidate {
302                    *debounce_deadline = Some(deadline_candidate);
303                }
304            }
305            None => *debounce_deadline = Some(deadline_candidate),
306        }
307    }
308
309    /// Add new event to debouncer cache
310    #[inline(always)]
311    fn add_event(&mut self, event: Event) {
312        tracing::trace!("raw event: {event:?}");
313        let time = Instant::now();
314        if self.debounce_deadline.is_none() {
315            self.debounce_deadline = Some(time + self.timeout);
316        }
317        for path in event.paths {
318            if let Some(v) = self.event_map.get_mut(&path) {
319                v.update = time;
320            } else {
321                self.event_map.insert(path, EventData::new_any(time));
322            }
323        }
324    }
325}
326
327/// Debouncer guard, stops the debouncer on drop
328#[derive(Debug)]
329pub struct Debouncer<T: Watcher> {
330    watcher: T,
331    stop_channel: Sender<InnerEvent>,
332}
333
334impl<T: Watcher> Debouncer<T> {
335    /// Access to the internally used notify Watcher backend
336    pub fn watcher(&mut self) -> &mut dyn Watcher {
337        &mut self.watcher
338    }
339}
340
341impl<T: Watcher> Drop for Debouncer<T> {
342    fn drop(&mut self) {
343        // send error just means that it is stopped, can't do much else
344        let result = self.stop_channel.send(InnerEvent::Shutdown);
345        if let Err(e) = result {
346            tracing::error!(?e, "failed to send shutdown event");
347        }
348    }
349}
350
351/// Creates a new debounced watcher with custom configuration.
352#[expect(clippy::needless_pass_by_value)]
353#[tracing::instrument(level = "debug", skip(event_handler))]
354pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher>(
355    config: Config,
356    mut event_handler: F,
357) -> Result<Debouncer<T>, Error> {
358    let (tx, rx) = std::sync::mpsc::channel();
359
360    std::thread::Builder::new()
361        .name("notify-rs debouncer loop".to_string())
362        .spawn(move || {
363            let mut data = DebounceDataInner::new(config.timeout, config.batch_mode);
364            let mut run = true;
365            while run {
366                match data.next_tick() {
367                    Some(timeout) => {
368                        // wait for wakeup
369                        match rx.recv_timeout(timeout) {
370                            Ok(InnerEvent::NotifyEvent(event_result)) => match event_result {
371                                Ok(event) => data.add_event(event),
372                                Err(err) => event_handler.handle_event(Err(err)),
373                            },
374                            Err(RecvTimeoutError::Timeout) => {
375                                let send_data = data.debounced_events();
376                                if !send_data.is_empty() {
377                                    event_handler.handle_event(Ok(send_data));
378                                }
379                            }
380                            Ok(InnerEvent::Shutdown) | Err(RecvTimeoutError::Disconnected) => {
381                                run = false;
382                            }
383                        }
384                    }
385                    None => match rx.recv() {
386                        // no timeout, wait for event
387                        Ok(InnerEvent::NotifyEvent(e)) => match e {
388                            Ok(event) => data.add_event(event),
389                            Err(err) => event_handler.handle_event(Err(err)),
390                        },
391                        Ok(InnerEvent::Shutdown) | Err(_) => run = false,
392                    },
393                }
394            }
395        })?;
396
397    let tx_c = tx.clone();
398    let watcher = T::new(
399        move |e: Result<Event, Error>| {
400            // send failure can't be handled, would need a working channel to signal that
401            // also probably means that we're in the process of shutting down
402            let result = tx_c.send(InnerEvent::NotifyEvent(e));
403            if let Err(e) = result {
404                tracing::error!(?e, "failed to send notify event");
405            }
406        },
407        config.notify_config,
408    )?;
409
410    let guard = Debouncer {
411        watcher,
412        stop_channel: tx,
413    };
414
415    Ok(guard)
416}
417
418/// Short function to create a new debounced watcher with the recommended debouncer.
419///
420/// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
421pub fn new_debouncer<F: DebounceEventHandler>(
422    timeout: Duration,
423    event_handler: F,
424) -> Result<Debouncer<RecommendedWatcher>, Error> {
425    let config = Config::default().with_timeout(timeout);
426    new_debouncer_opt::<F, RecommendedWatcher>(config, event_handler)
427}
428
429#[cfg(test)]
430mod tests {
431    use super::*;
432    use notify::WatchMode;
433    use std::fs;
434    use tempfile::tempdir;
435
436    #[expect(clippy::print_stdout)]
437    #[test]
438    fn integration() -> Result<(), Box<dyn std::error::Error>> {
439        let dir = tempdir()?;
440
441        // set up the watcher
442        let (tx, rx) = std::sync::mpsc::channel();
443        let mut debouncer = new_debouncer(Duration::from_secs(1), tx)?;
444        debouncer
445            .watcher()
446            .watch(dir.path(), WatchMode::recursive())?;
447
448        // create a new file
449        let file_path = dir.path().join("file.txt");
450        fs::write(&file_path, b"Lorem ipsum")?;
451
452        println!("waiting for event at {}", file_path.display());
453
454        // wait for up to 10 seconds for the create event, ignore all other events
455        let deadline = Instant::now() + Duration::from_secs(10);
456        while deadline > Instant::now() {
457            let events = rx
458                .recv_timeout(deadline - Instant::now())
459                .expect("did not receive expected event")
460                .expect("received an error");
461
462            for event in events {
463                if event == DebouncedEvent::new(file_path.clone(), DebouncedEventKind::Any)
464                    || event
465                        == DebouncedEvent::new(file_path.canonicalize()?, DebouncedEventKind::Any)
466                {
467                    return Ok(());
468                }
469
470                println!("unexpected event: {event:?}");
471            }
472        }
473
474        panic!("did not receive expected event");
475    }
476}