notify_debouncer_mini/
lib.rs

1//! Debouncer for [notify](https://crates.io/crates/notify). Filters incoming events and emits only one event per timeframe per file.
2//!
3//! # Installation
4//!
5//! ```toml
6//! [dependencies]
7//! notify-debouncer-mini = "0.6.0"
8//! ```
//! If you want to select specific features of notify,
//! add notify as an explicit dependency next to this crate.
//! Otherwise you can simply use the re-export of notify from debouncer-mini.
12//! ```toml
13//! notify-debouncer-mini = "0.6.0"
14//! notify = { version = "..", features = [".."] }
15//! ```
16//!
17//! # Examples
18//! See also the full configuration example [here](https://github.com/notify-rs/notify/blob/main/examples/debouncer_mini_custom.rs).
19//!
20//! ```rust,no_run
21//! # use std::path::Path;
22//! # use std::time::Duration;
23//! use notify_debouncer_mini::{notify::*,new_debouncer,DebounceEventResult};
24//!
25//! # fn main() {
26//!   // Select recommended watcher for debouncer.
27//!   // Using a callback here, could also be a channel.
28//!   let mut debouncer = new_debouncer(Duration::from_secs(2), |res: DebounceEventResult| {
29//!       match res {
30//!           Ok(events) => events.iter().for_each(|e|println!("Event {:?} for {:?}",e.kind,e.path)),
31//!           Err(e) => println!("Error {:?}",e),
32//!       }
33//!   }).unwrap();
34//!
35//!   // Add a path to be watched. All files and directories at that path and
36//!   // below will be monitored for changes.
37//!   debouncer.watcher().watch(Path::new("."), RecursiveMode::Recursive).unwrap();
38//!
39//!   // note that dropping the debouncer (as will happen here) also ends the debouncer
40//!   // thus this demo would need an endless loop to keep running
41//! # }
42//! ```
43//!
44//! # Features
45//!
46//! The following crate features can be turned on or off in your cargo dependency config:
47//!
48//! - `serde` passed down to notify-types, off by default
49//! - `crossbeam-channel` passed down to notify, off by default
50//! - `flume` passed down to notify, off by default
51//! - `macos_fsevent` passed down to notify, off by default
52//! - `macos_kqueue` passed down to notify, off by default
53//! - `serialization-compat-6` passed down to notify, off by default
54//!
55//! # Caveats
56//!
57//! As all file events are sourced from notify, the [known problems](https://docs.rs/notify/latest/notify/#known-problems) section applies here too.
58use std::{
59    collections::HashMap,
60    path::PathBuf,
61    sync::mpsc::{RecvTimeoutError, Sender},
62    time::{Duration, Instant},
63};
64
65pub use notify;
66pub use notify_types::debouncer_mini::{DebouncedEvent, DebouncedEventKind};
67
68use notify::{Error, Event, RecommendedWatcher, Watcher};
69
70/// The set of requirements for watcher debounce event handling functions.
71///
72/// # Example implementation
73///
74/// ```rust,no_run
75/// # use notify::{Event, Result, EventHandler};
76/// # use notify_debouncer_mini::{DebounceEventHandler,DebounceEventResult};
77///
78/// /// Prints received events
79/// struct EventPrinter;
80///
81/// impl DebounceEventHandler for EventPrinter {
82///     fn handle_event(&mut self, event: DebounceEventResult) {
83///         match event {
84///             Ok(events) => {
85///                 for event in events {
86///                     println!("Event {:?} for path {:?}",event.kind,event.path);
87///                 }
88///             },
89///             // errors are immediately reported
90///             Err(error) => println!("Got error {:?}",error),
91///         }
92///     }
93/// }
94/// ```
95pub trait DebounceEventHandler: Send + 'static {
96    /// Handles an event.
97    fn handle_event(&mut self, event: DebounceEventResult);
98}
99
100/// Config for debouncer-mini
101/// ```rust
102/// # use std::time::Duration;
103/// use notify_debouncer_mini::Config;
104/// let backend_config = notify::Config::default();
105///
106/// let config = Config::default().with_timeout(Duration::from_secs(1)).with_batch_mode(true)
107///     .with_notify_config(backend_config);
108/// ```
109#[derive(Clone, Debug, Eq, Hash, PartialEq)]
110pub struct Config {
111    timeout: Duration,
112    batch_mode: bool,
113    notify_config: notify::Config,
114}
115
116impl Default for Config {
117    fn default() -> Self {
118        Self {
119            timeout: Duration::from_millis(500),
120            batch_mode: true,
121            notify_config: notify::Config::default(),
122        }
123    }
124}
125
126impl Config {
127    /// Set timeout
128    ///
129    /// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
130    pub fn with_timeout(mut self, timeout: Duration) -> Self {
131        self.timeout = timeout;
132        self
133    }
134    /// Set batch mode
135    ///
136    /// When `batch_mode` is enabled, events may be delayed (at most 2x the specified timeout) and delivered with others.
137    /// If disabled, all events are delivered immediately when their debounce timeout is reached.
138    pub fn with_batch_mode(mut self, batch_mode: bool) -> Self {
139        self.batch_mode = batch_mode;
140        self
141    }
142    /// Set [`notify::Config`] for the backend
143    pub fn with_notify_config(mut self, notify_config: notify::Config) -> Self {
144        self.notify_config = notify_config;
145        self
146    }
147}
148
149impl<F> DebounceEventHandler for F
150where
151    F: FnMut(DebounceEventResult) + Send + 'static,
152{
153    fn handle_event(&mut self, event: DebounceEventResult) {
154        (self)(event);
155    }
156}
157
/// Forwards every result into a crossbeam channel.
#[cfg(feature = "crossbeam-channel")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
    fn handle_event(&mut self, event: DebounceEventResult) {
        // a failed send is deliberately ignored
        self.send(event).ok();
    }
}
164
/// Forwards every result into a flume channel.
#[cfg(feature = "flume")]
impl DebounceEventHandler for flume::Sender<DebounceEventResult> {
    fn handle_event(&mut self, event: DebounceEventResult) {
        // a failed send is deliberately ignored
        self.send(event).ok();
    }
}
171
172impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
173    fn handle_event(&mut self, event: DebounceEventResult) {
174        let _ = self.send(event);
175    }
176}
177
/// Per-path bookkeeping used to deduplicate events.
#[derive(Debug)]
struct EventData {
    /// When the path was first added to the debouncer cache
    insert: Instant,
    /// When the most recent event for the path was recorded
    update: Instant,
}
186
187impl EventData {
188    #[inline(always)]
189    fn new_any(time: Instant) -> Self {
190        Self {
191            insert: time,
192            update: time,
193        }
194    }
195}
196
197/// A result of debounced events.
198/// Comes with either a vec of events or an immediate error.
199pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Error>;
200
201enum InnerEvent {
202    NotifyEvent(Result<Event, Error>),
203    Shutdown,
204}
205
206struct DebounceDataInner {
207    /// Path -> Event data
208    event_map: HashMap<PathBuf, EventData>,
209    /// timeout used to compare all events against, config
210    timeout: Duration,
211    /// Whether to time events exactly, or batch multiple together.
212    /// This reduces the amount of updates but possibly waiting longer than necessary for some events
213    batch_mode: bool,
214    /// next debounce deadline
215    debounce_deadline: Option<Instant>,
216}
217
218impl DebounceDataInner {
219    pub fn new(timeout: Duration, batch_mode: bool) -> Self {
220        Self {
221            timeout,
222            debounce_deadline: None,
223            event_map: Default::default(),
224            batch_mode,
225        }
226    }
227
228    /// Returns a duration to wait for the next tick
229    #[inline]
230    pub fn next_tick(&self) -> Option<Duration> {
231        self.debounce_deadline
232            .map(|deadline| deadline.saturating_duration_since(Instant::now()))
233    }
234
235    /// Retrieve a vec of debounced events, removing them if not continuous
236    ///
237    /// Updates the internal tracker for the next tick
238    pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
239        let mut events_expired = Vec::with_capacity(self.event_map.len());
240        let mut data_back = HashMap::with_capacity(self.event_map.len());
241        // TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
242        // reset deadline
243        self.debounce_deadline = None;
244        for (path, event) in self.event_map.drain() {
245            if event.update.elapsed() >= self.timeout {
246                log::trace!("debounced event: {:?}", DebouncedEventKind::Any);
247                events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::Any));
248            } else if event.insert.elapsed() >= self.timeout {
249                log::trace!("debounced event: {:?}", DebouncedEventKind::AnyContinuous);
250                // set a new deadline, otherwise an 'AnyContinuous' will never resolve to a final 'Any' event
251                Self::check_deadline(
252                    self.batch_mode,
253                    self.timeout,
254                    &mut self.debounce_deadline,
255                    &event,
256                );
257                data_back.insert(path.clone(), event);
258                events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::AnyContinuous));
259            } else {
260                // event is neither old enough for continuous event, nor is it expired for an Any event
261                Self::check_deadline(
262                    self.batch_mode,
263                    self.timeout,
264                    &mut self.debounce_deadline,
265                    &event,
266                );
267                data_back.insert(path, event);
268            }
269        }
270        self.event_map = data_back;
271        events_expired
272    }
273
274    /// Updates the deadline if none is set or when batch mode is disabled and the current deadline would miss the next event.
275    /// The new deadline is calculated based on the last event update time and the debounce timeout.
276    ///
277    /// can't sub-function this due to `event_map.drain()` holding `&mut self`
278    fn check_deadline(
279        batch_mode: bool,
280        timeout: Duration,
281        debounce_deadline: &mut Option<Instant>,
282        event: &EventData,
283    ) {
284        let deadline_candidate = event.update + timeout;
285        match debounce_deadline {
286            Some(current_deadline) => {
287                // shorten deadline to not delay the event
288                // with batch mode simply wait for the incoming deadline and delay the event until then
289                if !batch_mode && *current_deadline > deadline_candidate {
290                    *debounce_deadline = Some(deadline_candidate);
291                }
292            }
293            None => *debounce_deadline = Some(deadline_candidate),
294        }
295    }
296
297    /// Add new event to debouncer cache
298    #[inline(always)]
299    fn add_event(&mut self, event: Event) {
300        log::trace!("raw event: {event:?}");
301        let time = Instant::now();
302        if self.debounce_deadline.is_none() {
303            self.debounce_deadline = Some(time + self.timeout);
304        }
305        for path in event.paths.into_iter() {
306            if let Some(v) = self.event_map.get_mut(&path) {
307                v.update = time;
308            } else {
309                self.event_map.insert(path, EventData::new_any(time));
310            }
311        }
312    }
313}
314
315/// Debouncer guard, stops the debouncer on drop
316#[derive(Debug)]
317pub struct Debouncer<T: Watcher> {
318    watcher: T,
319    stop_channel: Sender<InnerEvent>,
320}
321
322impl<T: Watcher> Debouncer<T> {
323    /// Access to the internally used notify Watcher backend
324    pub fn watcher(&mut self) -> &mut dyn Watcher {
325        &mut self.watcher
326    }
327}
328
329impl<T: Watcher> Drop for Debouncer<T> {
330    fn drop(&mut self) {
331        // send error just means that it is stopped, can't do much else
332        let _ = self.stop_channel.send(InnerEvent::Shutdown);
333    }
334}
335
336/// Creates a new debounced watcher with custom configuration.
337pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher>(
338    config: Config,
339    mut event_handler: F,
340) -> Result<Debouncer<T>, Error> {
341    let (tx, rx) = std::sync::mpsc::channel();
342
343    std::thread::Builder::new()
344        .name("notify-rs debouncer loop".to_string())
345        .spawn(move || {
346            let mut data = DebounceDataInner::new(config.timeout, config.batch_mode);
347            let mut run = true;
348            while run {
349                match data.next_tick() {
350                    Some(timeout) => {
351                        // wait for wakeup
352                        match rx.recv_timeout(timeout) {
353                            Ok(InnerEvent::NotifyEvent(event_result)) => match event_result {
354                                Ok(event) => data.add_event(event),
355                                Err(err) => event_handler.handle_event(Err(err)),
356                            },
357                            Err(RecvTimeoutError::Timeout) => {
358                                let send_data = data.debounced_events();
359                                if !send_data.is_empty() {
360                                    event_handler.handle_event(Ok(send_data));
361                                }
362                            }
363                            Ok(InnerEvent::Shutdown) | Err(RecvTimeoutError::Disconnected) => {
364                                run = false
365                            }
366                        }
367                    }
368                    None => match rx.recv() {
369                        // no timeout, wait for event
370                        Ok(InnerEvent::NotifyEvent(e)) => match e {
371                            Ok(event) => data.add_event(event),
372                            Err(err) => event_handler.handle_event(Err(err)),
373                        },
374                        Ok(InnerEvent::Shutdown) => run = false,
375                        Err(_) => run = false,
376                    },
377                }
378            }
379        })?;
380
381    let tx_c = tx.clone();
382    let watcher = T::new(
383        move |e: Result<Event, Error>| {
384            // send failure can't be handled, would need a working channel to signal that
385            // also probably means that we're in the process of shutting down
386            let _ = tx_c.send(InnerEvent::NotifyEvent(e));
387        },
388        config.notify_config,
389    )?;
390
391    let guard = Debouncer {
392        watcher,
393        stop_channel: tx,
394    };
395
396    Ok(guard)
397}
398
399/// Short function to create a new debounced watcher with the recommended debouncer.
400///
401/// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
402pub fn new_debouncer<F: DebounceEventHandler>(
403    timeout: Duration,
404    event_handler: F,
405) -> Result<Debouncer<RecommendedWatcher>, Error> {
406    let config = Config::default().with_timeout(timeout);
407    new_debouncer_opt::<F, RecommendedWatcher>(config, event_handler)
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use notify::RecursiveMode;
    use std::fs;
    use tempfile::tempdir;

    /// Watches a temporary directory, creates a file in it and expects an
    /// `Any` event for that file within 10 seconds.
    #[test]
    fn integration() -> Result<(), Box<dyn std::error::Error>> {
        let dir = tempdir()?;

        // set up the watcher
        let (tx, rx) = std::sync::mpsc::channel();
        let mut debouncer = new_debouncer(Duration::from_secs(1), tx)?;
        debouncer
            .watcher()
            .watch(dir.path(), RecursiveMode::Recursive)?;

        // create a new file
        let file_path = dir.path().join("file.txt");
        fs::write(&file_path, b"Lorem ipsum")?;

        println!("waiting for event at {}", file_path.display());

        // the backend may report the path as given or in canonical form, accept both
        let expected = [
            DebouncedEvent::new(file_path.clone(), DebouncedEventKind::Any),
            DebouncedEvent::new(file_path.canonicalize()?, DebouncedEventKind::Any),
        ];

        // wait for up to 10 seconds for the create event, ignore all other events
        let deadline = Instant::now() + Duration::from_secs(10);
        loop {
            // saturating, so a deadline that has just passed yields zero instead of
            // relying on the behavior of `Instant - Instant`
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                break;
            }

            let events = rx
                .recv_timeout(remaining)
                .expect("did not receive expected event")
                .expect("received an error");

            for event in events {
                if expected.contains(&event) {
                    return Ok(());
                }

                println!("unexpected event: {event:?}");
            }
        }

        panic!("did not receive expected event");
    }
}