notify_debouncer_mini/lib.rs
1//! Debouncer for [notify](https://crates.io/crates/notify). Filters incoming events and emits only one event per timeframe per file.
2//!
3//! # Installation
4//!
5//! ```toml
6//! [dependencies]
7//! notify-debouncer-mini = "0.6.0"
8//! ```
9//! In case you want to select specific features of notify,
10//! specify notify as dependency explicitly in your dependencies.
11//! Otherwise you can just use the re-export of notify from debouncer-mini.
12//! ```toml
13//! notify-debouncer-mini = "0.6.0"
14//! notify = { version = "..", features = [".."] }
15//! ```
16//!
17//! # Examples
18//! See also the full configuration example [here](https://github.com/notify-rs/notify/blob/main/examples/debouncer_mini_custom.rs).
19//!
20//! ```rust,no_run
21//! # use std::path::Path;
22//! # use std::time::Duration;
23//! use notify_debouncer_mini::{notify::*,new_debouncer,DebounceEventResult};
24//!
25//! # fn main() {
26//! // Select recommended watcher for debouncer.
27//! // Using a callback here, could also be a channel.
28//! let mut debouncer = new_debouncer(Duration::from_secs(2), |res: DebounceEventResult| {
29//! match res {
30//! Ok(events) => events.iter().for_each(|e|println!("Event {:?} for {:?}",e.kind,e.path)),
31//! Err(e) => println!("Error {:?}",e),
32//! }
33//! }).unwrap();
34//!
35//! // Add a path to be watched. All files and directories at that path and
36//! // below will be monitored for changes.
37//! debouncer.watcher().watch(Path::new("."), WatchMode::recursive()).unwrap();
38//!
39//! // note that dropping the debouncer (as will happen here) also ends the debouncer
40//! // thus this demo would need an endless loop to keep running
41//! # }
42//! ```
43//!
44//! # Features
45//!
46//! The following crate features can be turned on or off in your cargo dependency config:
47//!
48//! - `serde` passed down to notify-types, off by default
49//! - `crossbeam-channel` passed down to notify, off by default
50//! - `flume` passed down to notify, off by default
51//! - `macos_fsevent` passed down to notify, off by default
52//! - `macos_kqueue` passed down to notify, off by default
53//! - `serialization-compat-6` passed down to notify, off by default
54//!
55//! # Caveats
56//!
57//! As all file events are sourced from notify, the [known problems](https://docs.rs/notify/latest/notify/#known-problems) section applies here too.
58use std::{
59 collections::HashMap,
60 path::PathBuf,
61 sync::mpsc::{RecvTimeoutError, Sender},
62 time::{Duration, Instant},
63};
64
65pub use notify;
66pub use notify_types::debouncer_mini::{DebouncedEvent, DebouncedEventKind};
67
68use notify::{Error, Event, RecommendedWatcher, Watcher};
69
/// The set of requirements for watcher debounce event handling functions.
///
/// Implementors must be `Send + 'static`. In this file the trait is implemented for
/// `FnMut(DebounceEventResult)` closures and for channel senders carrying a
/// [`DebounceEventResult`].
///
/// # Example implementation
///
/// ```rust,no_run
/// # use notify::{Event, Result, EventHandler};
/// # use notify_debouncer_mini::{DebounceEventHandler,DebounceEventResult};
///
/// /// Prints received events
/// struct EventPrinter;
///
/// impl DebounceEventHandler for EventPrinter {
///     fn handle_event(&mut self, event: DebounceEventResult) {
///         match event {
///             Ok(events) => {
///                 for event in events {
///                     println!("Event {:?} for path {:?}",event.kind,event.path);
///                 }
///             },
///             // errors are immediately reported
///             Err(error) => println!("Got error {:?}",error),
///         }
///     }
/// }
/// ```
pub trait DebounceEventHandler: Send + 'static {
    /// Handles an event.
    ///
    /// `event` is either a batch of debounced events or a single error reported by the watcher.
    fn handle_event(&mut self, event: DebounceEventResult);
}
99
/// Config for debouncer-mini
///
/// ```rust
/// # use std::time::Duration;
/// use notify_debouncer_mini::Config;
/// let backend_config = notify::Config::default();
///
/// let config = Config::default().with_timeout(Duration::from_secs(1)).with_batch_mode(true)
///     .with_notify_config(backend_config);
/// ```
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Config {
    /// Debounce timeout all events are compared against (500 ms in the `Default` impl).
    timeout: Duration,
    /// Whether pending events are batched onto a shared deadline (`true` in the `Default` impl).
    batch_mode: bool,
    /// Configuration handed to the notify watcher backend when it is created.
    notify_config: notify::Config,
}
115
116impl Default for Config {
117 fn default() -> Self {
118 Self {
119 timeout: Duration::from_millis(500),
120 batch_mode: true,
121 notify_config: notify::Config::default(),
122 }
123 }
124}
125
126impl Config {
127 /// Set timeout
128 ///
129 /// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
130 #[must_use]
131 pub fn with_timeout(mut self, timeout: Duration) -> Self {
132 self.timeout = timeout;
133 self
134 }
135 /// Set batch mode
136 ///
137 /// When `batch_mode` is enabled, events may be delayed (at most 2x the specified timeout) and delivered with others.
138 /// If disabled, all events are delivered immediately when their debounce timeout is reached.
139 #[must_use]
140 pub fn with_batch_mode(mut self, batch_mode: bool) -> Self {
141 self.batch_mode = batch_mode;
142 self
143 }
144 /// Set [`notify::Config`] for the backend
145 #[must_use]
146 pub fn with_notify_config(mut self, notify_config: notify::Config) -> Self {
147 self.notify_config = notify_config;
148 self
149 }
150}
151
152impl<F> DebounceEventHandler for F
153where
154 F: FnMut(DebounceEventResult) + Send + 'static,
155{
156 fn handle_event(&mut self, event: DebounceEventResult) {
157 (self)(event);
158 }
159}
160
/// Forwards every result into a crossbeam channel (only with the `crossbeam-channel` feature).
#[cfg(feature = "crossbeam-channel")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
    /// Sends the result on the channel; a failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: DebounceEventResult) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send debounce event result");
        }
    }
}
170
/// Forwards every result into a flume channel (only with the `flume` feature).
#[cfg(feature = "flume")]
impl DebounceEventHandler for flume::Sender<DebounceEventResult> {
    /// Sends the result on the channel; a failed send is logged and otherwise ignored.
    fn handle_event(&mut self, event: DebounceEventResult) {
        if let Err(e) = self.send(event) {
            tracing::error!(?e, "failed to send debounce event result");
        }
    }
}
180
181impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
182 fn handle_event(&mut self, event: DebounceEventResult) {
183 let result = self.send(event);
184 if let Err(e) = result {
185 tracing::error!(?e, "failed to send debounce event result");
186 }
187 }
188}
189
/// Deduplicate event data entry
///
/// Tracks when a path was first recorded and when it was last touched.
#[derive(Debug)]
struct EventData {
    /// Insertion Time
    insert: Instant,
    /// Last Update
    update: Instant,
}

impl EventData {
    /// Creates an entry whose insertion time and last update are both `time`.
    #[inline(always)]
    fn new_any(time: Instant) -> Self {
        let (insert, update) = (time, time);
        Self { insert, update }
    }
}
208
/// A result of debounced events.
/// Comes with either a vec of events or an immediate error.
///
/// This is the value passed to [`DebounceEventHandler::handle_event`].
pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Error>;
212
/// Message type of the internal channel feeding the debouncer loop.
enum InnerEvent {
    /// A raw event or error forwarded from the notify watcher callback.
    NotifyEvent(Result<Event, Error>),
    /// Asks the debouncer loop to stop; sent when the `Debouncer` guard is dropped.
    Shutdown,
}
217
/// State of the debouncer loop: the pending per-path entries and the next wake-up deadline.
struct DebounceDataInner {
    /// Path -> Event data
    event_map: HashMap<PathBuf, EventData>,
    /// timeout used to compare all events against, config
    timeout: Duration,
    /// Whether to time events exactly, or batch multiple together.
    /// This reduces the amount of updates but possibly waiting longer than necessary for some events
    batch_mode: bool,
    /// next debounce deadline
    debounce_deadline: Option<Instant>,
}
229
230impl DebounceDataInner {
231 pub fn new(timeout: Duration, batch_mode: bool) -> Self {
232 Self {
233 timeout,
234 debounce_deadline: None,
235 event_map: HashMap::default(),
236 batch_mode,
237 }
238 }
239
240 /// Returns a duration to wait for the next tick
241 #[inline]
242 pub fn next_tick(&self) -> Option<Duration> {
243 self.debounce_deadline
244 .map(|deadline| deadline.saturating_duration_since(Instant::now()))
245 }
246
247 /// Retrieve a vec of debounced events, removing them if not continuous
248 ///
249 /// Updates the internal tracker for the next tick
250 pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
251 let mut events_expired = Vec::with_capacity(self.event_map.len());
252 let mut data_back = HashMap::with_capacity(self.event_map.len());
253 // TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
254 // reset deadline
255 self.debounce_deadline = None;
256 for (path, event) in self.event_map.drain() {
257 if event.update.elapsed() >= self.timeout {
258 tracing::trace!("debounced event: {:?}", DebouncedEventKind::Any);
259 events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::Any));
260 } else if event.insert.elapsed() >= self.timeout {
261 tracing::trace!("debounced event: {:?}", DebouncedEventKind::AnyContinuous);
262 // set a new deadline, otherwise an 'AnyContinuous' will never resolve to a final 'Any' event
263 Self::check_deadline(
264 self.batch_mode,
265 self.timeout,
266 &mut self.debounce_deadline,
267 &event,
268 );
269 data_back.insert(path.clone(), event);
270 events_expired.push(DebouncedEvent::new(path, DebouncedEventKind::AnyContinuous));
271 } else {
272 // event is neither old enough for continuous event, nor is it expired for an Any event
273 Self::check_deadline(
274 self.batch_mode,
275 self.timeout,
276 &mut self.debounce_deadline,
277 &event,
278 );
279 data_back.insert(path, event);
280 }
281 }
282 self.event_map = data_back;
283 events_expired
284 }
285
286 /// Updates the deadline if none is set or when batch mode is disabled and the current deadline would miss the next event.
287 /// The new deadline is calculated based on the last event update time and the debounce timeout.
288 ///
289 /// can't sub-function this due to `event_map.drain()` holding `&mut self`
290 fn check_deadline(
291 batch_mode: bool,
292 timeout: Duration,
293 debounce_deadline: &mut Option<Instant>,
294 event: &EventData,
295 ) {
296 let deadline_candidate = event.update + timeout;
297 match debounce_deadline {
298 Some(current_deadline) => {
299 // shorten deadline to not delay the event
300 // with batch mode simply wait for the incoming deadline and delay the event until then
301 if !batch_mode && *current_deadline > deadline_candidate {
302 *debounce_deadline = Some(deadline_candidate);
303 }
304 }
305 None => *debounce_deadline = Some(deadline_candidate),
306 }
307 }
308
309 /// Add new event to debouncer cache
310 #[inline(always)]
311 fn add_event(&mut self, event: Event) {
312 tracing::trace!("raw event: {event:?}");
313 let time = Instant::now();
314 if self.debounce_deadline.is_none() {
315 self.debounce_deadline = Some(time + self.timeout);
316 }
317 for path in event.paths {
318 if let Some(v) = self.event_map.get_mut(&path) {
319 v.update = time;
320 } else {
321 self.event_map.insert(path, EventData::new_any(time));
322 }
323 }
324 }
325}
326
/// Debouncer guard, stops the debouncer on drop
#[derive(Debug)]
pub struct Debouncer<T: Watcher> {
    /// The notify watcher backend, exposed through `Debouncer::watcher`.
    watcher: T,
    /// Sender into the debouncer loop's channel; used on drop to send the shutdown message.
    stop_channel: Sender<InnerEvent>,
}
333
impl<T: Watcher> Debouncer<T> {
    /// Access to the internally used notify Watcher backend
    ///
    /// The backend is returned as a `dyn Watcher` trait object, e.g. to add paths to watch.
    pub fn watcher(&mut self) -> &mut dyn Watcher {
        &mut self.watcher
    }
}
340
341impl<T: Watcher> Drop for Debouncer<T> {
342 fn drop(&mut self) {
343 // send error just means that it is stopped, can't do much else
344 let result = self.stop_channel.send(InnerEvent::Shutdown);
345 if let Err(e) = result {
346 tracing::error!(?e, "failed to send shutdown event");
347 }
348 }
349}
350
351/// Creates a new debounced watcher with custom configuration.
352#[expect(clippy::needless_pass_by_value)]
353#[tracing::instrument(level = "debug", skip(event_handler))]
354pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher>(
355 config: Config,
356 mut event_handler: F,
357) -> Result<Debouncer<T>, Error> {
358 let (tx, rx) = std::sync::mpsc::channel();
359
360 std::thread::Builder::new()
361 .name("notify-rs debouncer loop".to_string())
362 .spawn(move || {
363 let mut data = DebounceDataInner::new(config.timeout, config.batch_mode);
364 let mut run = true;
365 while run {
366 match data.next_tick() {
367 Some(timeout) => {
368 // wait for wakeup
369 match rx.recv_timeout(timeout) {
370 Ok(InnerEvent::NotifyEvent(event_result)) => match event_result {
371 Ok(event) => data.add_event(event),
372 Err(err) => event_handler.handle_event(Err(err)),
373 },
374 Err(RecvTimeoutError::Timeout) => {
375 let send_data = data.debounced_events();
376 if !send_data.is_empty() {
377 event_handler.handle_event(Ok(send_data));
378 }
379 }
380 Ok(InnerEvent::Shutdown) | Err(RecvTimeoutError::Disconnected) => {
381 run = false;
382 }
383 }
384 }
385 None => match rx.recv() {
386 // no timeout, wait for event
387 Ok(InnerEvent::NotifyEvent(e)) => match e {
388 Ok(event) => data.add_event(event),
389 Err(err) => event_handler.handle_event(Err(err)),
390 },
391 Ok(InnerEvent::Shutdown) | Err(_) => run = false,
392 },
393 }
394 }
395 })?;
396
397 let tx_c = tx.clone();
398 let watcher = T::new(
399 move |e: Result<Event, Error>| {
400 // send failure can't be handled, would need a working channel to signal that
401 // also probably means that we're in the process of shutting down
402 let result = tx_c.send(InnerEvent::NotifyEvent(e));
403 if let Err(e) = result {
404 tracing::error!(?e, "failed to send notify event");
405 }
406 },
407 config.notify_config,
408 )?;
409
410 let guard = Debouncer {
411 watcher,
412 stop_channel: tx,
413 };
414
415 Ok(guard)
416}
417
418/// Short function to create a new debounced watcher with the recommended debouncer.
419///
420/// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
421pub fn new_debouncer<F: DebounceEventHandler>(
422 timeout: Duration,
423 event_handler: F,
424) -> Result<Debouncer<RecommendedWatcher>, Error> {
425 let config = Config::default().with_timeout(timeout);
426 new_debouncer_opt::<F, RecommendedWatcher>(config, event_handler)
427}
428
#[cfg(test)]
mod tests {
    use super::*;
    use notify::WatchMode;
    use std::fs;
    use tempfile::tempdir;

    /// Watches a temporary directory, writes a file into it and expects a debounced `Any`
    /// event for that file (by its plain or canonicalized path) within ten seconds.
    #[expect(clippy::print_stdout)]
    #[test]
    fn integration() -> Result<(), Box<dyn std::error::Error>> {
        let dir = tempdir()?;

        // set up the watcher
        let (tx, rx) = std::sync::mpsc::channel();
        let mut debouncer = new_debouncer(Duration::from_secs(1), tx)?;
        debouncer
            .watcher()
            .watch(dir.path(), WatchMode::recursive())?;

        // create a new file
        let file_path = dir.path().join("file.txt");
        fs::write(&file_path, b"Lorem ipsum")?;

        println!("waiting for event at {}", file_path.display());

        // wait for up to 10 seconds for the create event, ignore all other events
        let expected = DebouncedEvent::new(file_path.clone(), DebouncedEventKind::Any);
        let deadline = Instant::now() + Duration::from_secs(10);
        while Instant::now() < deadline {
            let remaining = deadline - Instant::now();
            let events = rx
                .recv_timeout(remaining)
                .expect("did not receive expected event")
                .expect("received an error");

            for event in events {
                // the canonicalized path is only resolved when the plain path did not match
                let is_expected = event == expected
                    || event
                        == DebouncedEvent::new(file_path.canonicalize()?, DebouncedEventKind::Any);
                if is_expected {
                    return Ok(());
                }

                println!("unexpected event: {event:?}");
            }
        }

        panic!("did not receive expected event");
    }
}
476}