async_watcher/
lib.rs

1use crate::error::Error;
2use std::{
3    collections::HashMap,
4    path::PathBuf,
5    sync::{
6        atomic::{AtomicBool, Ordering},
7        Arc, Mutex,
8    },
9    time::{Duration, Instant},
10};
11
12pub use notify;
13use notify::{Error as NotifyError, Event, RecommendedWatcher, Watcher};
14
15pub mod error;
16
17/// Deduplicate event data
18struct EventData {
19    /// Insertion Time
20    insert: Instant,
21    /// Last Update
22    update: Instant,
23}
24
25impl EventData {
26    fn new_any() -> Self {
27        let time = Instant::now();
28        Self {
29            insert: time,
30            update: time,
31        }
32    }
33}
34
35/// The types of events that can be debounced.
36#[derive(Clone, Debug, Eq, Hash, PartialEq)]
37#[non_exhaustive]
38pub enum DebouncedEventKind {
39    /// No precise events
40    Any,
41    /// Event where debounce timed out (for example continuous writes)
42    AnyContinuous,
43}
44
45/// A debounced event.
46///
47/// Does not emit any specific event type on purpose, only distinguishes between an any event and a continuous any event.
48#[derive(Clone, Debug, Eq, Hash, PartialEq)]
49#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
50pub struct DebouncedEvent {
51    /// Event path
52    pub path: PathBuf,
53    /// Event kind
54    pub kind: DebouncedEventKind,
55    /// Base event from notify
56    pub event: Event,
57}
58
59impl DebouncedEvent {
60    fn new(path: PathBuf, kind: DebouncedEventKind, event: Event) -> Self {
61        Self { path, kind, event }
62    }
63}
64
65impl AsRef<Event> for DebouncedEvent {
66    fn as_ref(&self) -> &Event {
67        &self.event
68    }
69}
70
71type DebounceData = Arc<Mutex<DebounceDataInner>>;
72
73#[derive(Default)]
74struct DebounceDataInner {
75    d: HashMap<PathBuf, (Event, EventData)>,
76    timeout: Duration,
77    e: Vec<NotifyError>,
78}
79
80impl DebounceDataInner {
81    /// Retrieve a vec of debounced events, removing them if they are not continuous
82    pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
83        let mut events_expired = Vec::with_capacity(self.d.len());
84        let mut data_back = HashMap::with_capacity(self.d.len());
85
86        // TODO: drain_filter https://github.com/rust-lang/rust/issues/59618
87        for (k, (e, v)) in self.d.drain() {
88            if v.update.elapsed() >= self.timeout {
89                events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::Any, e));
90            } else if v.insert.elapsed() >= self.timeout {
91                data_back.insert(k.clone(), (e.clone(), v));
92                events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::AnyContinuous, e));
93            } else {
94                data_back.insert(k, (e, v));
95            }
96        }
97
98        self.d = data_back;
99        events_expired
100    }
101
102    /// Takes all currently stored errors
103    pub fn errors(&mut self) -> Vec<NotifyError> {
104        std::mem::take(&mut self.e)
105    }
106
107    /// Add an error entry to re-send later on
108    pub fn add_error(&mut self, e: NotifyError) {
109        self.e.push(e);
110    }
111
112    /// Add new event to debouncer cache
113    pub fn add_event(&mut self, e: Event) {
114        for path in e.paths.clone().into_iter() {
115            if let Some((_, v)) = self.d.get_mut(&path) {
116                v.update = Instant::now();
117            } else {
118                self.d.insert(path, (e.clone(), EventData::new_any()));
119            }
120        }
121    }
122}
123
124#[async_trait::async_trait]
125pub trait AsyncDebounceEventHandler {
126    async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>);
127}
128
129#[async_trait::async_trait]
130impl<F> AsyncDebounceEventHandler for F
131where
132    F: FnMut(Result<Vec<DebouncedEvent>, Vec<NotifyError>>) + Send + 'static,
133{
134    async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>) {
135        self(event)
136    }
137}
138
139#[async_trait::async_trait]
140impl AsyncDebounceEventHandler
141    for tokio::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>
142{
143    async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>) {
144        let _ = self.send(event).await;
145    }
146}
147
148pub struct AsyncDebouncer<T: Watcher> {
149    stop: Arc<AtomicBool>,
150    watcher: T,
151    debouncer_task: Option<tokio::task::JoinHandle<()>>,
152}
153
154impl<T: Watcher> AsyncDebouncer<T> {
155    pub async fn stop(mut self) {
156        self.set_stop();
157        if let Some(t) = self.debouncer_task.take() {
158            let _ = t.await;
159        }
160    }
161
162    fn set_stop(&self) {
163        self.stop.store(true, Ordering::Relaxed);
164    }
165
166    pub fn watcher(&mut self) -> &mut dyn Watcher {
167        &mut self.watcher
168    }
169}
170
171impl<T: Watcher> Drop for AsyncDebouncer<T> {
172    fn drop(&mut self) {
173        // don't block on drop
174        self.set_stop();
175    }
176}
177
178impl<T: Watcher> AsyncDebouncer<T> {
179    /// Creates a new debounced watcher with custom configuration.
180    /// The timeout specifies the amount of time that must elapse before an event is emitted, or a
181    /// continuous event is sent (if there still are events incoming for the specific path).
182    /// If tick_rate is set to None, then a tick rate will be selected that is less than the provided timeout.
183    pub async fn new_with_opts<F: AsyncDebounceEventHandler + Send + 'static>(
184        timeout: Duration,
185        tick_rate: Option<Duration>,
186        mut event_handler: F,
187        config: notify::Config,
188    ) -> Result<Self, Error> {
189        let data = DebounceData::default();
190
191        let stop = Arc::new(AtomicBool::new(false));
192
193        let tick_div = 4;
194        let tick = match tick_rate {
195            Some(v) => {
196                if v > timeout {
197                    return Err(Error::InvalidTickRate(v, timeout));
198                }
199                v
200            }
201            None => timeout
202                .checked_div(tick_div)
203                .ok_or_else(|| Error::FailedToCalculateTick(timeout, tick_div))?,
204        };
205
206        {
207            let mut data_w = data.lock().unwrap();
208            data_w.timeout = timeout;
209        }
210
211        let data_c = data.clone();
212        let stop_c = stop.clone();
213        let debouncer_task = tokio::spawn(async move {
214            loop {
215                if stop_c.load(Ordering::Acquire) {
216                    break;
217                }
218                tokio::time::sleep(tick).await;
219                let send_data;
220                let errors: Vec<NotifyError>;
221                {
222                    let mut lock = data_c.lock().expect("can't lock debouncer data");
223                    send_data = lock.debounced_events();
224                    errors = lock.errors();
225                }
226                if !send_data.is_empty() {
227                    event_handler.handle_event(Ok(send_data)).await;
228                }
229                if !errors.is_empty() {
230                    event_handler.handle_event(Err(errors)).await;
231                }
232            }
233        });
234
235        let watcher = T::new(
236            move |e: Result<Event, NotifyError>| {
237                let mut lock = data.lock().expect("can't lock debouncer data");
238
239                match e {
240                    Ok(e) => lock.add_event(e),
241                    // errors are stored and sent later on
242                    Err(e) => lock.add_error(e),
243                }
244            },
245            config,
246        )?;
247
248        let guard = AsyncDebouncer {
249            watcher,
250            debouncer_task: Some(debouncer_task),
251            stop,
252        };
253
254        Ok(guard)
255    }
256
257    /// Creates a new debounced watcher with custom configuration.
258    /// The timeout specifies the amount of time that must elapse before an event is emitted, or a
259    /// continuous event is sent (if there still are events incoming for the specific path).
260    /// If tick_rate is set to None, then a tick rate will be selected that is less than the provided timeout.
261    /// A handle to the debouncer is returned alongside a channel that can be used to receive events.
262    pub async fn new_with_channel_and_opts<F: AsyncDebounceEventHandler + Send + 'static>(
263        timeout: Duration,
264        tick_rate: Option<Duration>,
265        config: notify::Config,
266    ) -> Result<
267        (
268            Self,
269            tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
270        ),
271        Error,
272    > {
273        let (tx, rx) = tokio::sync::mpsc::channel(1);
274        let debouncer = Self::new_with_opts(timeout, tick_rate, tx, config).await?;
275        Ok((debouncer, rx))
276    }
277}
278
279impl AsyncDebouncer<RecommendedWatcher> {
280    /// Creates a new debounced watcher with the recommended watcher implementation.
281    /// The timeout specifies the amount of time that must elapse before an event is emitted, or a
282    /// continuous event is sent (if there still are events incoming for the specific path).
283    /// If tick_rate is set to None, then a tick rate will be selected that is less than the provided timeout.
284    pub async fn new<F: AsyncDebounceEventHandler + Send + 'static>(
285        timeout: Duration,
286        tick_rate: Option<Duration>,
287        event_handler: F,
288    ) -> Result<Self, Error> {
289        AsyncDebouncer::new_with_opts(timeout, tick_rate, event_handler, notify::Config::default())
290            .await
291    }
292
293    /// Creates a new debounced watcher with the recommended watcher implementation.
294    /// The timeout specifies the amount of time that must elapse before an event is emitted, or a
295    /// continuous event is sent (if there still are events incoming for the specific path).
296    /// If tick_rate is set to None, then a tick rate will be selected that is less than the provided timeout.
297    /// A handle to the debouncer is returned alongside a channel that can be used to receive events.
298    pub async fn new_with_channel(
299        timeout: Duration,
300        tick_rate: Option<Duration>,
301    ) -> Result<
302        (
303            Self,
304            tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
305        ),
306        Error,
307    > {
308        AsyncDebouncer::new_with_channel_and_opts::<
309            tokio::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
310        >(timeout, tick_rate, notify::Config::default())
311        .await
312    }
313}