notify_debouncer_mini/lib.rs
1//! Debouncer for [notify](https://crates.io/crates/notify). Filters incoming events and emits only one event per timeframe per file.
2//!
3//! # Installation
4//!
5//! ```toml
6//! [dependencies]
7//! notify-debouncer-mini = "0.6.0"
8//! ```
9//! In case you want to select specific features of notify,
10//! specify notify as dependency explicitly in your dependencies.
11//! Otherwise you can just use the re-export of notify from debouncer-mini.
12//! ```toml
13//! notify-debouncer-mini = "0.6.0"
14//! notify = { version = "..", features = [".."] }
15//! ```
16//!
17//! # Examples
18//! See also the full configuration example [here](https://github.com/notify-rs/notify/blob/main/examples/debouncer_mini_custom.rs).
19//!
20//! ```rust,no_run
21//! # use std::path::Path;
22//! # use std::time::Duration;
23//! use notify_debouncer_mini::{notify::*,new_debouncer,DebounceEventResult};
24//!
25//! # fn main() {
26//! // Select recommended watcher for debouncer.
27//! // Using a callback here, could also be a channel.
28//! let mut debouncer = new_debouncer(Duration::from_secs(2), |res: DebounceEventResult| {
29//! match res {
30//! Ok(events) => events.iter().for_each(|e|println!("Event {:?} for {:?}",e.kind,e.path)),
31//! Err(e) => println!("Error {:?}",e),
32//! }
33//! }).unwrap();
34//!
35//! // Add a path to be watched. All files and directories at that path and
36//! // below will be monitored for changes.
37//! debouncer.watcher().watch(Path::new("."), RecursiveMode::Recursive).unwrap();
38//!
39//! // note that dropping the debouncer (as will happen here) also ends the debouncer
40//! // thus this demo would need an endless loop to keep running
41//! # }
42//! ```
43//!
44//! # Features
45//!
46//! The following crate features can be turned on or off in your cargo dependency config:
47//!
48//! - `serde` passed down to notify-types, off by default
49//! - `crossbeam-channel` passed down to notify, off by default
50//! - `macos_fsevent` passed down to notify, off by default
51//! - `macos_kqueue` passed down to notify, off by default
52//! - `serialization-compat-6` passed down to notify, off by default
53//!
54//! # Caveats
55//!
56//! As all file events are sourced from notify, the [known problems](https://docs.rs/notify/latest/notify/#known-problems) section applies here too.
57use std::{
58 collections::HashMap,
59 path::PathBuf,
60 sync::mpsc::{RecvTimeoutError, Sender},
61 time::{Duration, Instant},
62};
63
64pub use notify;
65pub use notify_types::debouncer_mini::{DebouncedEvent, DebouncedEventKind};
66
67use notify::{Error, Event, RecommendedWatcher, Watcher};
68
69/// The set of requirements for watcher debounce event handling functions.
70///
71/// # Example implementation
72///
73/// ```rust,no_run
74/// # use notify::{Event, Result, EventHandler};
75/// # use notify_debouncer_mini::{DebounceEventHandler,DebounceEventResult};
76///
77/// /// Prints received events
78/// struct EventPrinter;
79///
80/// impl DebounceEventHandler for EventPrinter {
81/// fn handle_event(&mut self, event: DebounceEventResult) {
82/// match event {
83/// Ok(events) => {
84/// for event in events {
85/// println!("Event {:?} for path {:?}",event.kind,event.path);
86/// }
87/// },
88/// // errors are immediately reported
89/// Err(error) => println!("Got error {:?}",error),
90/// }
91/// }
92/// }
93/// ```
94pub trait DebounceEventHandler: Send + 'static {
95 /// Handles an event.
96 fn handle_event(&mut self, event: DebounceEventResult);
97}
98
99/// Config for debouncer-mini
100/// ```rust
101/// # use std::time::Duration;
102/// use notify_debouncer_mini::Config;
103/// let backend_config = notify::Config::default();
104///
105/// let config = Config::default().with_timeout(Duration::from_secs(1)).with_batch_mode(true)
106/// .with_notify_config(backend_config);
107/// ```
108#[derive(Clone, Debug, Eq, Hash, PartialEq)]
109pub struct Config {
110 timeout: Duration,
111 batch_mode: bool,
112 notify_config: notify::Config,
113}
114
115impl Default for Config {
116 fn default() -> Self {
117 Self {
118 timeout: Duration::from_millis(500),
119 batch_mode: true,
120 notify_config: notify::Config::default(),
121 }
122 }
123}
124
125impl Config {
126 /// Set timeout
127 ///
128 /// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
129 pub fn with_timeout(mut self, timeout: Duration) -> Self {
130 self.timeout = timeout;
131 self
132 }
133 /// Set batch mode
134 ///
135 /// When `batch_mode` is enabled, events may be delayed (at most 2x the specified timeout) and delivered with others.
136 /// If disabled, all events are delivered immediately when their debounce timeout is reached.
137 pub fn with_batch_mode(mut self, batch_mode: bool) -> Self {
138 self.batch_mode = batch_mode;
139 self
140 }
141 /// Set [`notify::Config`] for the backend
142 pub fn with_notify_config(mut self, notify_config: notify::Config) -> Self {
143 self.notify_config = notify_config;
144 self
145 }
146}
147
148impl<F> DebounceEventHandler for F
149where
150 F: FnMut(DebounceEventResult) + Send + 'static,
151{
152 fn handle_event(&mut self, event: DebounceEventResult) {
153 (self)(event);
154 }
155}
156
157#[cfg(feature = "crossbeam-channel")]
158impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
159 fn handle_event(&mut self, event: DebounceEventResult) {
160 let _ = self.send(event);
161 }
162}
163
164impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
165 fn handle_event(&mut self, event: DebounceEventResult) {
166 let _ = self.send(event);
167 }
168}
169
170/// Deduplicate event data entry
171#[derive(Debug)]
172struct EventData {
173 /// Insertion Time
174 insert: Instant,
175 /// Last Update
176 update: Instant,
177}
178
179impl EventData {
180 #[inline(always)]
181 fn new_any(time: Instant) -> Self {
182 Self {
183 insert: time,
184 update: time,
185 }
186 }
187}
188
189/// A result of debounced events.
190/// Comes with either a vec of events or an immediate error.
191pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Error>;
192
193enum InnerEvent {
194 NotifyEvent(Result<Event, Error>),
195 Shutdown,
196}
197
198struct DebounceDataInner {
199 /// Path -> Event data
200 event_map: HashMap<PathBuf, EventData>,
201 /// timeout used to compare all events against, config
202 timeout: Duration,
203 /// Whether to time events exactly, or batch multiple together.
204 /// This reduces the amount of updates but possibly waiting longer than necessary for some events
205 batch_mode: bool,
206 /// next debounce deadline
207 debounce_deadline: Option<Instant>,
208}
209
210impl DebounceDataInner {
211 pub fn new(timeout: Duration, batch_mode: bool) -> Self {
212 Self {
213 timeout,
214 debounce_deadline: None,
215 event_map: Default::default(),
216 batch_mode,
217 }
218 }
219
220 /// Returns a duration to wait for the next tick
221 #[inline]
222 pub fn next_tick(&self) -> Option<Duration> {
223 self.debounce_deadline
224 .map(|deadline| deadline.saturating_duration_since(Instant::now()))
225 }
226
227 /// Retrieve a vec of debounced events, removing them if not continuous
228 ///
229 /// Updates the internal tracker for the next tick
230 pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
231 let mut events_expired = Vec::with_capacity(self.event_map.len());
232 let mut data_back = HashMap::with_capacity(self.event_map.len());
233 // TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
234 // reset deadline
235 self.debounce_deadline = None;
236 for (path, event) in self.event_map.drain() {
237 if event.update.elapsed() >= self.timeout {
238 log::trace!("debounced event: {:?}", DebouncedEventKind::Any);
239 events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::Any));
240 } else if event.insert.elapsed() >= self.timeout {
241 log::trace!("debounced event: {:?}", DebouncedEventKind::AnyContinuous);
242 // set a new deadline, otherwise an 'AnyContinuous' will never resolve to a final 'Any' event
243 Self::check_deadline(
244 self.batch_mode,
245 self.timeout,
246 &mut self.debounce_deadline,
247 &event,
248 );
249 data_back.insert(path.clone(), event);
250 events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::AnyContinuous));
251 } else {
252 // event is neither old enough for continuous event, nor is it expired for an Any event
253 Self::check_deadline(
254 self.batch_mode,
255 self.timeout,
256 &mut self.debounce_deadline,
257 &event,
258 );
259 data_back.insert(path, event);
260 }
261 }
262 self.event_map = data_back;
263 events_expired
264 }
265
266 /// Updates the deadline if none is set or when batch mode is disabled and the current deadline would miss the next event.
267 /// The new deadline is calculated based on the last event update time and the debounce timeout.
268 ///
269 /// can't sub-function this due to `event_map.drain()` holding `&mut self`
270 fn check_deadline(
271 batch_mode: bool,
272 timeout: Duration,
273 debounce_deadline: &mut Option<Instant>,
274 event: &EventData,
275 ) {
276 let deadline_candidate = event.update + timeout;
277 match debounce_deadline {
278 Some(current_deadline) => {
279 // shorten deadline to not delay the event
280 // with batch mode simply wait for the incoming deadline and delay the event until then
281 if !batch_mode && *current_deadline > deadline_candidate {
282 *debounce_deadline = Some(deadline_candidate);
283 }
284 }
285 None => *debounce_deadline = Some(deadline_candidate),
286 }
287 }
288
289 /// Add new event to debouncer cache
290 #[inline(always)]
291 fn add_event(&mut self, event: Event) {
292 log::trace!("raw event: {event:?}");
293 let time = Instant::now();
294 if self.debounce_deadline.is_none() {
295 self.debounce_deadline = Some(time + self.timeout);
296 }
297 for path in event.paths.into_iter() {
298 if let Some(v) = self.event_map.get_mut(&path) {
299 v.update = time;
300 } else {
301 self.event_map.insert(path, EventData::new_any(time));
302 }
303 }
304 }
305}
306
307/// Debouncer guard, stops the debouncer on drop
308#[derive(Debug)]
309pub struct Debouncer<T: Watcher> {
310 watcher: T,
311 stop_channel: Sender<InnerEvent>,
312}
313
314impl<T: Watcher> Debouncer<T> {
315 /// Access to the internally used notify Watcher backend
316 pub fn watcher(&mut self) -> &mut dyn Watcher {
317 &mut self.watcher
318 }
319}
320
321impl<T: Watcher> Drop for Debouncer<T> {
322 fn drop(&mut self) {
323 // send error just means that it is stopped, can't do much else
324 let _ = self.stop_channel.send(InnerEvent::Shutdown);
325 }
326}
327
328/// Creates a new debounced watcher with custom configuration.
329pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher>(
330 config: Config,
331 mut event_handler: F,
332) -> Result<Debouncer<T>, Error> {
333 let (tx, rx) = std::sync::mpsc::channel();
334
335 std::thread::Builder::new()
336 .name("notify-rs debouncer loop".to_string())
337 .spawn(move || {
338 let mut data = DebounceDataInner::new(config.timeout, config.batch_mode);
339 let mut run = true;
340 while run {
341 match data.next_tick() {
342 Some(timeout) => {
343 // wait for wakeup
344 match rx.recv_timeout(timeout) {
345 Ok(InnerEvent::NotifyEvent(event_result)) => match event_result {
346 Ok(event) => data.add_event(event),
347 Err(err) => event_handler.handle_event(Err(err)),
348 },
349 Err(RecvTimeoutError::Timeout) => {
350 let send_data = data.debounced_events();
351 if !send_data.is_empty() {
352 event_handler.handle_event(Ok(send_data));
353 }
354 }
355 Ok(InnerEvent::Shutdown) | Err(RecvTimeoutError::Disconnected) => {
356 run = false
357 }
358 }
359 }
360 None => match rx.recv() {
361 // no timeout, wait for event
362 Ok(InnerEvent::NotifyEvent(e)) => match e {
363 Ok(event) => data.add_event(event),
364 Err(err) => event_handler.handle_event(Err(err)),
365 },
366 Ok(InnerEvent::Shutdown) => run = false,
367 Err(_) => run = false,
368 },
369 }
370 }
371 })?;
372
373 let tx_c = tx.clone();
374 let watcher = T::new(
375 move |e: Result<Event, Error>| {
376 // send failure can't be handled, would need a working channel to signal that
377 // also probably means that we're in the process of shutting down
378 let _ = tx_c.send(InnerEvent::NotifyEvent(e));
379 },
380 config.notify_config,
381 )?;
382
383 let guard = Debouncer {
384 watcher,
385 stop_channel: tx,
386 };
387
388 Ok(guard)
389}
390
391/// Short function to create a new debounced watcher with the recommended debouncer.
392///
393/// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
394pub fn new_debouncer<F: DebounceEventHandler>(
395 timeout: Duration,
396 event_handler: F,
397) -> Result<Debouncer<RecommendedWatcher>, Error> {
398 let config = Config::default().with_timeout(timeout);
399 new_debouncer_opt::<F, RecommendedWatcher>(config, event_handler)
400}
401
402#[cfg(test)]
403mod tests {
404 use super::*;
405 use notify::RecursiveMode;
406 use std::fs;
407 use tempfile::tempdir;
408
409 #[test]
410 fn integration() -> Result<(), Box<dyn std::error::Error>> {
411 let dir = tempdir()?;
412
413 let (tx, rx) = std::sync::mpsc::channel();
414
415 let mut debouncer = new_debouncer(Duration::from_secs(1), tx)?;
416
417 debouncer
418 .watcher()
419 .watch(dir.path(), RecursiveMode::Recursive)?;
420
421 let file_path = dir.path().join("file.txt");
422 fs::write(&file_path, b"Lorem ipsum")?;
423
424 let events = rx
425 .recv_timeout(Duration::from_secs(10))
426 .expect("no events received")
427 .expect("received an error");
428
429 assert_eq!(
430 events,
431 vec![DebouncedEvent::new(file_path, DebouncedEventKind::Any)]
432 );
433
434 Ok(())
435 }
436}