async_watcher/
lib.rs

1use crate::error::Error;
2use std::{
3    collections::HashMap,
4    path::PathBuf,
5    sync::{
6        atomic::{AtomicBool, Ordering},
7        Arc, Mutex,
8    },
9    time::{Duration, Instant},
10};
11
12pub use notify;
13use notify::{Error as NotifyError, Event, RecommendedWatcher, Watcher};
14
15pub mod error;
16
/// Timing bookkeeping kept for every path that has a pending, not yet emitted event.
struct EventData {
    /// Moment the entry was created.
    insert: Instant,
    /// Moment the entry was last refreshed.
    update: Instant,
}

impl EventData {
    /// Builds an entry whose creation and last-update timestamps are the same
    /// single reading of the clock.
    fn new_any() -> Self {
        let now = Instant::now();
        EventData {
            insert: now,
            update: now,
        }
    }
}
34
/// Kinds of events the debouncer can emit.
///
/// The enum is `#[non_exhaustive]`, so code in other crates has to keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum DebouncedEventKind {
    /// Generic event without any finer-grained classification.
    Any,
    /// Generic event reported because the debounce timeout ran out while events
    /// were still arriving (for example continuous writes).
    AnyContinuous,
}
44
45/// A debounced event.
46///
47/// Does not emit any specific event type on purpose, only distinguishes between an any event and a continuous any event.
48#[derive(Clone, Debug, Eq, Hash, PartialEq)]
49pub struct DebouncedEvent {
50    /// Event path
51    pub path: PathBuf,
52    /// Event kind
53    pub kind: DebouncedEventKind,
54    /// Base event from notify
55    pub event: Event,
56}
57
58impl DebouncedEvent {
59    fn new(path: PathBuf, kind: DebouncedEventKind, event: Event) -> Self {
60        Self { path, kind, event }
61    }
62}
63
64impl AsRef<Event> for DebouncedEvent {
65    fn as_ref(&self) -> &Event {
66        &self.event
67    }
68}
69
70type DebounceData = Arc<Mutex<DebounceDataInner>>;
71
72#[derive(Default)]
73struct DebounceDataInner {
74    d: HashMap<PathBuf, (Event, EventData)>,
75    timeout: Duration,
76    e: Vec<NotifyError>,
77}
78
79impl DebounceDataInner {
80    /// Retrieve a vec of debounced events, removing them if they are not continuous
81    pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
82        let mut events_expired = Vec::with_capacity(self.d.len());
83        let mut data_back = HashMap::with_capacity(self.d.len());
84
85        // TODO: drain_filter https://github.com/rust-lang/rust/issues/59618
86        for (k, (e, v)) in self.d.drain() {
87            if v.update.elapsed() >= self.timeout {
88                events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::Any, e));
89            } else if v.insert.elapsed() >= self.timeout {
90                data_back.insert(k.clone(), (e.clone(), v));
91                events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::AnyContinuous, e));
92            } else {
93                data_back.insert(k, (e, v));
94            }
95        }
96
97        self.d = data_back;
98        events_expired
99    }
100
101    /// Takes all currently stored errors
102    pub fn errors(&mut self) -> Vec<NotifyError> {
103        std::mem::take(&mut self.e)
104    }
105
106    /// Add an error entry to re-send later on
107    pub fn add_error(&mut self, e: NotifyError) {
108        self.e.push(e);
109    }
110
111    /// Add new event to debouncer cache
112    pub fn add_event(&mut self, e: Event) {
113        for path in e.paths.clone().into_iter() {
114            if let Some((_, v)) = self.d.get_mut(&path) {
115                v.update = Instant::now();
116            } else {
117                self.d.insert(path, (e.clone(), EventData::new_any()));
118            }
119        }
120    }
121}
122
123#[async_trait::async_trait]
124pub trait AsyncDebounceEventHandler {
125    async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>);
126}
127
128#[async_trait::async_trait]
129impl<F> AsyncDebounceEventHandler for F
130where
131    F: FnMut(Result<Vec<DebouncedEvent>, Vec<NotifyError>>) + Send + 'static,
132{
133    async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>) {
134        self(event)
135    }
136}
137
138#[async_trait::async_trait]
139impl AsyncDebounceEventHandler
140    for tokio::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>
141{
142    async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>) {
143        let _ = self.send(event).await;
144    }
145}
146
147pub struct AsyncDebouncer<T: Watcher> {
148    stop: Arc<AtomicBool>,
149    watcher: T,
150    debouncer_task: Option<tokio::task::JoinHandle<()>>,
151}
152
153impl<T: Watcher> AsyncDebouncer<T> {
154    pub async fn stop(mut self) {
155        self.set_stop();
156        if let Some(t) = self.debouncer_task.take() {
157            let _ = t.await;
158        }
159    }
160
161    fn set_stop(&self) {
162        self.stop.store(true, Ordering::Relaxed);
163    }
164
165    pub fn watcher(&mut self) -> &mut dyn Watcher {
166        &mut self.watcher
167    }
168}
169
170impl<T: Watcher> Drop for AsyncDebouncer<T> {
171    fn drop(&mut self) {
172        // don't block on drop
173        self.set_stop();
174    }
175}
176
177impl<T: Watcher> AsyncDebouncer<T> {
178    /// Creates a new debounced watcher with custom configuration.
179    /// The timeout specifies the amount of time that must elapse before an event is emitted, or a
180    /// continuous event is sent (if there still are events incoming for the specific path).
181    /// If tick_rate is set to None, then a tick rate will be selected that is less than the provided timeout.
182    pub async fn new_with_opts<F: AsyncDebounceEventHandler + Send + 'static>(
183        timeout: Duration,
184        tick_rate: Option<Duration>,
185        mut event_handler: F,
186        config: notify::Config,
187    ) -> Result<Self, Error> {
188        let data = DebounceData::default();
189
190        let stop = Arc::new(AtomicBool::new(false));
191
192        let tick_div = 4;
193        let tick = match tick_rate {
194            Some(v) => {
195                if v > timeout {
196                    return Err(Error::InvalidTickRate(v, timeout));
197                }
198                v
199            }
200            None => timeout
201                .checked_div(tick_div)
202                .ok_or(Error::FailedToCalculateTick(timeout, tick_div))?,
203        };
204
205        {
206            let mut data_w = data.lock().unwrap();
207            data_w.timeout = timeout;
208        }
209
210        let data_c = data.clone();
211        let stop_c = stop.clone();
212        let debouncer_task = tokio::spawn(async move {
213            loop {
214                if stop_c.load(Ordering::Acquire) {
215                    break;
216                }
217                tokio::time::sleep(tick).await;
218                let send_data;
219                let errors: Vec<NotifyError>;
220                {
221                    let mut lock = data_c.lock().expect("can't lock debouncer data");
222                    send_data = lock.debounced_events();
223                    errors = lock.errors();
224                }
225                if !send_data.is_empty() {
226                    event_handler.handle_event(Ok(send_data)).await;
227                }
228                if !errors.is_empty() {
229                    event_handler.handle_event(Err(errors)).await;
230                }
231            }
232        });
233
234        let watcher = T::new(
235            move |e: Result<Event, NotifyError>| {
236                let mut lock = data.lock().expect("can't lock debouncer data");
237
238                match e {
239                    Ok(e) => lock.add_event(e),
240                    // errors are stored and sent later on
241                    Err(e) => lock.add_error(e),
242                }
243            },
244            config,
245        )?;
246
247        let guard = AsyncDebouncer {
248            watcher,
249            debouncer_task: Some(debouncer_task),
250            stop,
251        };
252
253        Ok(guard)
254    }
255
256    /// Creates a new debounced watcher with custom configuration.
257    /// The timeout specifies the amount of time that must elapse before an event is emitted, or a
258    /// continuous event is sent (if there still are events incoming for the specific path).
259    /// If tick_rate is set to None, then a tick rate will be selected that is less than the provided timeout.
260    /// A handle to the debouncer is returned alongside a channel that can be used to receive events.
261    pub async fn new_with_channel_and_opts(
262        timeout: Duration,
263        tick_rate: Option<Duration>,
264        config: notify::Config,
265    ) -> Result<
266        (
267            Self,
268            tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
269        ),
270        Error,
271    > {
272        let (tx, rx) = tokio::sync::mpsc::channel(1);
273        let debouncer = Self::new_with_opts(timeout, tick_rate, tx, config).await?;
274        Ok((debouncer, rx))
275    }
276}
277
278impl AsyncDebouncer<RecommendedWatcher> {
279    /// Creates a new debounced watcher with the recommended watcher implementation.
280    /// The timeout specifies the amount of time that must elapse before an event is emitted, or a
281    /// continuous event is sent (if there still are events incoming for the specific path).
282    /// If tick_rate is set to None, then a tick rate will be selected that is less than the provided timeout.
283    pub async fn new<F: AsyncDebounceEventHandler + Send + 'static>(
284        timeout: Duration,
285        tick_rate: Option<Duration>,
286        event_handler: F,
287    ) -> Result<Self, Error> {
288        AsyncDebouncer::new_with_opts(timeout, tick_rate, event_handler, notify::Config::default())
289            .await
290    }
291
292    /// Creates a new debounced watcher with the recommended watcher implementation.
293    /// The timeout specifies the amount of time that must elapse before an event is emitted, or a
294    /// continuous event is sent (if there still are events incoming for the specific path).
295    /// If tick_rate is set to None, then a tick rate will be selected that is less than the provided timeout.
296    /// A handle to the debouncer is returned alongside a channel that can be used to receive events.
297    pub async fn new_with_channel(
298        timeout: Duration,
299        tick_rate: Option<Duration>,
300    ) -> Result<
301        (
302            Self,
303            tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
304        ),
305        Error,
306    > {
307        AsyncDebouncer::new_with_channel_and_opts(timeout, tick_rate, notify::Config::default())
308            .await
309    }
310}