notify_debouncer_mini/
lib.rs

1//! Debouncer for [notify](https://crates.io/crates/notify). Filters incoming events and emits only one event per timeframe per file.
2//!
3//! # Installation
4//!
5//! ```toml
6//! [dependencies]
7//! notify-debouncer-mini = "0.6.0"
8//! ```
//! If you want to select specific features of notify,
//! declare notify explicitly as a dependency alongside notify-debouncer-mini.
//! Otherwise you can just use the re-export of notify from debouncer-mini.
12//! ```toml
13//! notify-debouncer-mini = "0.6.0"
14//! notify = { version = "..", features = [".."] }
15//! ```
16//!
17//! # Examples
18//! See also the full configuration example [here](https://github.com/notify-rs/notify/blob/main/examples/debouncer_mini_custom.rs).
19//!
20//! ```rust,no_run
21//! # use std::path::Path;
22//! # use std::time::Duration;
23//! use notify_debouncer_mini::{notify::*,new_debouncer,DebounceEventResult};
24//!
25//! # fn main() {
26//!   // Select recommended watcher for debouncer.
27//!   // Using a callback here, could also be a channel.
28//!   let mut debouncer = new_debouncer(Duration::from_secs(2), |res: DebounceEventResult| {
29//!       match res {
30//!           Ok(events) => events.iter().for_each(|e|println!("Event {:?} for {:?}",e.kind,e.path)),
31//!           Err(e) => println!("Error {:?}",e),
32//!       }
33//!   }).unwrap();
34//!
35//!   // Add a path to be watched. All files and directories at that path and
36//!   // below will be monitored for changes.
37//!   debouncer.watcher().watch(Path::new("."), RecursiveMode::Recursive).unwrap();
38//!
39//!   // note that dropping the debouncer (as will happen here) also ends the debouncer
40//!   // thus this demo would need an endless loop to keep running
41//! # }
42//! ```
43//!
44//! # Features
45//!
46//! The following crate features can be turned on or off in your cargo dependency config:
47//!
48//! - `serde` passed down to notify-types, off by default
49//! - `crossbeam-channel` passed down to notify, off by default
50//! - `macos_fsevent` passed down to notify, off by default
51//! - `macos_kqueue` passed down to notify, off by default
52//! - `serialization-compat-6` passed down to notify, off by default
53//!
54//! # Caveats
55//!
56//! As all file events are sourced from notify, the [known problems](https://docs.rs/notify/latest/notify/#known-problems) section applies here too.
57use std::{
58    collections::HashMap,
59    path::PathBuf,
60    sync::mpsc::{RecvTimeoutError, Sender},
61    time::{Duration, Instant},
62};
63
64pub use notify;
65pub use notify_types::debouncer_mini::{DebouncedEvent, DebouncedEventKind};
66
67use notify::{Error, Event, RecommendedWatcher, Watcher};
68
69/// The set of requirements for watcher debounce event handling functions.
70///
71/// # Example implementation
72///
73/// ```rust,no_run
74/// # use notify::{Event, Result, EventHandler};
75/// # use notify_debouncer_mini::{DebounceEventHandler,DebounceEventResult};
76///
77/// /// Prints received events
78/// struct EventPrinter;
79///
80/// impl DebounceEventHandler for EventPrinter {
81///     fn handle_event(&mut self, event: DebounceEventResult) {
82///         match event {
83///             Ok(events) => {
84///                 for event in events {
85///                     println!("Event {:?} for path {:?}",event.kind,event.path);
86///                 }
87///             },
88///             // errors are immediately reported
89///             Err(error) => println!("Got error {:?}",error),
90///         }
91///     }
92/// }
93/// ```
94pub trait DebounceEventHandler: Send + 'static {
95    /// Handles an event.
96    fn handle_event(&mut self, event: DebounceEventResult);
97}
98
99/// Config for debouncer-mini
100/// ```rust
101/// # use std::time::Duration;
102/// use notify_debouncer_mini::Config;
103/// let backend_config = notify::Config::default();
104///
105/// let config = Config::default().with_timeout(Duration::from_secs(1)).with_batch_mode(true)
106///     .with_notify_config(backend_config);
107/// ```
108#[derive(Clone, Debug, Eq, Hash, PartialEq)]
109pub struct Config {
110    timeout: Duration,
111    batch_mode: bool,
112    notify_config: notify::Config,
113}
114
115impl Default for Config {
116    fn default() -> Self {
117        Self {
118            timeout: Duration::from_millis(500),
119            batch_mode: true,
120            notify_config: notify::Config::default(),
121        }
122    }
123}
124
125impl Config {
126    /// Set timeout
127    ///
128    /// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
129    pub fn with_timeout(mut self, timeout: Duration) -> Self {
130        self.timeout = timeout;
131        self
132    }
133    /// Set batch mode
134    ///
135    /// When `batch_mode` is enabled, events may be delayed (at most 2x the specified timeout) and delivered with others.
136    /// If disabled, all events are delivered immediately when their debounce timeout is reached.
137    pub fn with_batch_mode(mut self, batch_mode: bool) -> Self {
138        self.batch_mode = batch_mode;
139        self
140    }
141    /// Set [`notify::Config`] for the backend
142    pub fn with_notify_config(mut self, notify_config: notify::Config) -> Self {
143        self.notify_config = notify_config;
144        self
145    }
146}
147
148impl<F> DebounceEventHandler for F
149where
150    F: FnMut(DebounceEventResult) + Send + 'static,
151{
152    fn handle_event(&mut self, event: DebounceEventResult) {
153        (self)(event);
154    }
155}
156
/// Lets a crossbeam channel sender act as the handler (only with the `crossbeam-channel` feature).
#[cfg(feature = "crossbeam-channel")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
    /// Pushes the result into the channel.
    fn handle_event(&mut self, event: DebounceEventResult) {
        // best effort: a send error is deliberately ignored, there is nobody to report it to
        let _ = self.send(event);
    }
}
163
164impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
165    fn handle_event(&mut self, event: DebounceEventResult) {
166        let _ = self.send(event);
167    }
168}
169
/// Per-path bookkeeping used to deduplicate events
#[derive(Debug)]
struct EventData {
    /// When the path was first recorded
    insert: Instant,
    /// When the path was last touched by an event
    update: Instant,
}

impl EventData {
    /// Creates an entry whose insertion and last-update time are both `time`.
    #[inline(always)]
    fn new_any(time: Instant) -> Self {
        let (insert, update) = (time, time);
        Self { insert, update }
    }
}
188
189/// A result of debounced events.
190/// Comes with either a vec of events or an immediate error.
191pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Error>;
192
193enum InnerEvent {
194    NotifyEvent(Result<Event, Error>),
195    Shutdown,
196}
197
198struct DebounceDataInner {
199    /// Path -> Event data
200    event_map: HashMap<PathBuf, EventData>,
201    /// timeout used to compare all events against, config
202    timeout: Duration,
203    /// Whether to time events exactly, or batch multiple together.
204    /// This reduces the amount of updates but possibly waiting longer than necessary for some events
205    batch_mode: bool,
206    /// next debounce deadline
207    debounce_deadline: Option<Instant>,
208}
209
210impl DebounceDataInner {
211    pub fn new(timeout: Duration, batch_mode: bool) -> Self {
212        Self {
213            timeout,
214            debounce_deadline: None,
215            event_map: Default::default(),
216            batch_mode,
217        }
218    }
219
220    /// Returns a duration to wait for the next tick
221    #[inline]
222    pub fn next_tick(&self) -> Option<Duration> {
223        self.debounce_deadline
224            .map(|deadline| deadline.saturating_duration_since(Instant::now()))
225    }
226
227    /// Retrieve a vec of debounced events, removing them if not continuous
228    ///
229    /// Updates the internal tracker for the next tick
230    pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
231        let mut events_expired = Vec::with_capacity(self.event_map.len());
232        let mut data_back = HashMap::with_capacity(self.event_map.len());
233        // TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
234        // reset deadline
235        self.debounce_deadline = None;
236        for (path, event) in self.event_map.drain() {
237            if event.update.elapsed() >= self.timeout {
238                log::trace!("debounced event: {:?}", DebouncedEventKind::Any);
239                events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::Any));
240            } else if event.insert.elapsed() >= self.timeout {
241                log::trace!("debounced event: {:?}", DebouncedEventKind::AnyContinuous);
242                // set a new deadline, otherwise an 'AnyContinuous' will never resolve to a final 'Any' event
243                Self::check_deadline(
244                    self.batch_mode,
245                    self.timeout,
246                    &mut self.debounce_deadline,
247                    &event,
248                );
249                data_back.insert(path.clone(), event);
250                events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::AnyContinuous));
251            } else {
252                // event is neither old enough for continuous event, nor is it expired for an Any event
253                Self::check_deadline(
254                    self.batch_mode,
255                    self.timeout,
256                    &mut self.debounce_deadline,
257                    &event,
258                );
259                data_back.insert(path, event);
260            }
261        }
262        self.event_map = data_back;
263        events_expired
264    }
265
266    /// Updates the deadline if none is set or when batch mode is disabled and the current deadline would miss the next event.
267    /// The new deadline is calculated based on the last event update time and the debounce timeout.
268    ///
269    /// can't sub-function this due to `event_map.drain()` holding `&mut self`
270    fn check_deadline(
271        batch_mode: bool,
272        timeout: Duration,
273        debounce_deadline: &mut Option<Instant>,
274        event: &EventData,
275    ) {
276        let deadline_candidate = event.update + timeout;
277        match debounce_deadline {
278            Some(current_deadline) => {
279                // shorten deadline to not delay the event
280                // with batch mode simply wait for the incoming deadline and delay the event until then
281                if !batch_mode && *current_deadline > deadline_candidate {
282                    *debounce_deadline = Some(deadline_candidate);
283                }
284            }
285            None => *debounce_deadline = Some(deadline_candidate),
286        }
287    }
288
289    /// Add new event to debouncer cache
290    #[inline(always)]
291    fn add_event(&mut self, event: Event) {
292        log::trace!("raw event: {event:?}");
293        let time = Instant::now();
294        if self.debounce_deadline.is_none() {
295            self.debounce_deadline = Some(time + self.timeout);
296        }
297        for path in event.paths.into_iter() {
298            if let Some(v) = self.event_map.get_mut(&path) {
299                v.update = time;
300            } else {
301                self.event_map.insert(path, EventData::new_any(time));
302            }
303        }
304    }
305}
306
307/// Debouncer guard, stops the debouncer on drop
308#[derive(Debug)]
309pub struct Debouncer<T: Watcher> {
310    watcher: T,
311    stop_channel: Sender<InnerEvent>,
312}
313
314impl<T: Watcher> Debouncer<T> {
315    /// Access to the internally used notify Watcher backend
316    pub fn watcher(&mut self) -> &mut dyn Watcher {
317        &mut self.watcher
318    }
319}
320
321impl<T: Watcher> Drop for Debouncer<T> {
322    fn drop(&mut self) {
323        // send error just means that it is stopped, can't do much else
324        let _ = self.stop_channel.send(InnerEvent::Shutdown);
325    }
326}
327
328/// Creates a new debounced watcher with custom configuration.
329pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher>(
330    config: Config,
331    mut event_handler: F,
332) -> Result<Debouncer<T>, Error> {
333    let (tx, rx) = std::sync::mpsc::channel();
334
335    std::thread::Builder::new()
336        .name("notify-rs debouncer loop".to_string())
337        .spawn(move || {
338            let mut data = DebounceDataInner::new(config.timeout, config.batch_mode);
339            let mut run = true;
340            while run {
341                match data.next_tick() {
342                    Some(timeout) => {
343                        // wait for wakeup
344                        match rx.recv_timeout(timeout) {
345                            Ok(InnerEvent::NotifyEvent(event_result)) => match event_result {
346                                Ok(event) => data.add_event(event),
347                                Err(err) => event_handler.handle_event(Err(err)),
348                            },
349                            Err(RecvTimeoutError::Timeout) => {
350                                let send_data = data.debounced_events();
351                                if !send_data.is_empty() {
352                                    event_handler.handle_event(Ok(send_data));
353                                }
354                            }
355                            Ok(InnerEvent::Shutdown) | Err(RecvTimeoutError::Disconnected) => {
356                                run = false
357                            }
358                        }
359                    }
360                    None => match rx.recv() {
361                        // no timeout, wait for event
362                        Ok(InnerEvent::NotifyEvent(e)) => match e {
363                            Ok(event) => data.add_event(event),
364                            Err(err) => event_handler.handle_event(Err(err)),
365                        },
366                        Ok(InnerEvent::Shutdown) => run = false,
367                        Err(_) => run = false,
368                    },
369                }
370            }
371        })?;
372
373    let tx_c = tx.clone();
374    let watcher = T::new(
375        move |e: Result<Event, Error>| {
376            // send failure can't be handled, would need a working channel to signal that
377            // also probably means that we're in the process of shutting down
378            let _ = tx_c.send(InnerEvent::NotifyEvent(e));
379        },
380        config.notify_config,
381    )?;
382
383    let guard = Debouncer {
384        watcher,
385        stop_channel: tx,
386    };
387
388    Ok(guard)
389}
390
391/// Short function to create a new debounced watcher with the recommended debouncer.
392///
393/// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
394pub fn new_debouncer<F: DebounceEventHandler>(
395    timeout: Duration,
396    event_handler: F,
397) -> Result<Debouncer<RecommendedWatcher>, Error> {
398    let config = Config::default().with_timeout(timeout);
399    new_debouncer_opt::<F, RecommendedWatcher>(config, event_handler)
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use notify::RecursiveMode;
    use std::fs;
    use tempfile::tempdir;

    /// Writes a file into a watched temporary directory and expects
    /// exactly one debounced `Any` event for that file.
    #[test]
    fn integration() -> Result<(), Box<dyn std::error::Error>> {
        let dir = tempdir()?;
        let (tx, rx) = std::sync::mpsc::channel();

        let mut debouncer = new_debouncer(Duration::from_secs(1), tx)?;
        let watcher = debouncer.watcher();
        watcher.watch(dir.path(), RecursiveMode::Recursive)?;

        let file_path = dir.path().join("file.txt");
        fs::write(&file_path, b"Lorem ipsum")?;

        let received = rx
            .recv_timeout(Duration::from_secs(10))
            .expect("no events received");
        let events = received.expect("received an error");

        let expected = vec![DebouncedEvent::new(file_path, DebouncedEventKind::Any)];
        assert_eq!(events, expected);

        Ok(())
    }
}