1use crate::error::Error;
2use std::{
3 collections::HashMap,
4 path::PathBuf,
5 sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc, Mutex,
8 },
9 time::{Duration, Instant},
10};
11
12pub use notify;
13use notify::{Error as NotifyError, Event, RecommendedWatcher, Watcher};
14
15pub mod error;
16
17struct EventData {
19 insert: Instant,
21 update: Instant,
23}
24
25impl EventData {
26 fn new_any() -> Self {
27 let time = Instant::now();
28 Self {
29 insert: time,
30 update: time,
31 }
32 }
33}
34
35#[derive(Clone, Debug, Eq, Hash, PartialEq)]
37#[non_exhaustive]
38pub enum DebouncedEventKind {
39 Any,
41 AnyContinuous,
43}
44
45#[derive(Clone, Debug, Eq, Hash, PartialEq)]
49#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
50pub struct DebouncedEvent {
51 pub path: PathBuf,
53 pub kind: DebouncedEventKind,
55 pub event: Event,
57}
58
59impl DebouncedEvent {
60 fn new(path: PathBuf, kind: DebouncedEventKind, event: Event) -> Self {
61 Self { path, kind, event }
62 }
63}
64
65impl AsRef<Event> for DebouncedEvent {
66 fn as_ref(&self) -> &Event {
67 &self.event
68 }
69}
70
71type DebounceData = Arc<Mutex<DebounceDataInner>>;
72
73#[derive(Default)]
74struct DebounceDataInner {
75 d: HashMap<PathBuf, (Event, EventData)>,
76 timeout: Duration,
77 e: Vec<NotifyError>,
78}
79
80impl DebounceDataInner {
81 pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
83 let mut events_expired = Vec::with_capacity(self.d.len());
84 let mut data_back = HashMap::with_capacity(self.d.len());
85
86 for (k, (e, v)) in self.d.drain() {
88 if v.update.elapsed() >= self.timeout {
89 events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::Any, e));
90 } else if v.insert.elapsed() >= self.timeout {
91 data_back.insert(k.clone(), (e.clone(), v));
92 events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::AnyContinuous, e));
93 } else {
94 data_back.insert(k, (e, v));
95 }
96 }
97
98 self.d = data_back;
99 events_expired
100 }
101
102 pub fn errors(&mut self) -> Vec<NotifyError> {
104 std::mem::take(&mut self.e)
105 }
106
107 pub fn add_error(&mut self, e: NotifyError) {
109 self.e.push(e);
110 }
111
112 pub fn add_event(&mut self, e: Event) {
114 for path in e.paths.clone().into_iter() {
115 if let Some((_, v)) = self.d.get_mut(&path) {
116 v.update = Instant::now();
117 } else {
118 self.d.insert(path, (e.clone(), EventData::new_any()));
119 }
120 }
121 }
122}
123
124#[async_trait::async_trait]
125pub trait AsyncDebounceEventHandler {
126 async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>);
127}
128
129#[async_trait::async_trait]
130impl<F> AsyncDebounceEventHandler for F
131where
132 F: FnMut(Result<Vec<DebouncedEvent>, Vec<NotifyError>>) + Send + 'static,
133{
134 async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>) {
135 self(event)
136 }
137}
138
139#[async_trait::async_trait]
140impl AsyncDebounceEventHandler
141 for tokio::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>
142{
143 async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>) {
144 let _ = self.send(event).await;
145 }
146}
147
148pub struct AsyncDebouncer<T: Watcher> {
149 stop: Arc<AtomicBool>,
150 watcher: T,
151 debouncer_task: Option<tokio::task::JoinHandle<()>>,
152}
153
154impl<T: Watcher> AsyncDebouncer<T> {
155 pub async fn stop(mut self) {
156 self.set_stop();
157 if let Some(t) = self.debouncer_task.take() {
158 let _ = t.await;
159 }
160 }
161
162 fn set_stop(&self) {
163 self.stop.store(true, Ordering::Relaxed);
164 }
165
166 pub fn watcher(&mut self) -> &mut dyn Watcher {
167 &mut self.watcher
168 }
169}
170
171impl<T: Watcher> Drop for AsyncDebouncer<T> {
172 fn drop(&mut self) {
173 self.set_stop();
175 }
176}
177
178impl<T: Watcher> AsyncDebouncer<T> {
179 pub async fn new_with_opts<F: AsyncDebounceEventHandler + Send + 'static>(
184 timeout: Duration,
185 tick_rate: Option<Duration>,
186 mut event_handler: F,
187 config: notify::Config,
188 ) -> Result<Self, Error> {
189 let data = DebounceData::default();
190
191 let stop = Arc::new(AtomicBool::new(false));
192
193 let tick_div = 4;
194 let tick = match tick_rate {
195 Some(v) => {
196 if v > timeout {
197 return Err(Error::InvalidTickRate(v, timeout));
198 }
199 v
200 }
201 None => timeout
202 .checked_div(tick_div)
203 .ok_or_else(|| Error::FailedToCalculateTick(timeout, tick_div))?,
204 };
205
206 {
207 let mut data_w = data.lock().unwrap();
208 data_w.timeout = timeout;
209 }
210
211 let data_c = data.clone();
212 let stop_c = stop.clone();
213 let debouncer_task = tokio::spawn(async move {
214 loop {
215 if stop_c.load(Ordering::Acquire) {
216 break;
217 }
218 tokio::time::sleep(tick).await;
219 let send_data;
220 let errors: Vec<NotifyError>;
221 {
222 let mut lock = data_c.lock().expect("can't lock debouncer data");
223 send_data = lock.debounced_events();
224 errors = lock.errors();
225 }
226 if !send_data.is_empty() {
227 event_handler.handle_event(Ok(send_data)).await;
228 }
229 if !errors.is_empty() {
230 event_handler.handle_event(Err(errors)).await;
231 }
232 }
233 });
234
235 let watcher = T::new(
236 move |e: Result<Event, NotifyError>| {
237 let mut lock = data.lock().expect("can't lock debouncer data");
238
239 match e {
240 Ok(e) => lock.add_event(e),
241 Err(e) => lock.add_error(e),
243 }
244 },
245 config,
246 )?;
247
248 let guard = AsyncDebouncer {
249 watcher,
250 debouncer_task: Some(debouncer_task),
251 stop,
252 };
253
254 Ok(guard)
255 }
256
257 pub async fn new_with_channel_and_opts<F: AsyncDebounceEventHandler + Send + 'static>(
263 timeout: Duration,
264 tick_rate: Option<Duration>,
265 config: notify::Config,
266 ) -> Result<
267 (
268 Self,
269 tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
270 ),
271 Error,
272 > {
273 let (tx, rx) = tokio::sync::mpsc::channel(1);
274 let debouncer = Self::new_with_opts(timeout, tick_rate, tx, config).await?;
275 Ok((debouncer, rx))
276 }
277}
278
279impl AsyncDebouncer<RecommendedWatcher> {
280 pub async fn new<F: AsyncDebounceEventHandler + Send + 'static>(
285 timeout: Duration,
286 tick_rate: Option<Duration>,
287 event_handler: F,
288 ) -> Result<Self, Error> {
289 AsyncDebouncer::new_with_opts(timeout, tick_rate, event_handler, notify::Config::default())
290 .await
291 }
292
293 pub async fn new_with_channel(
299 timeout: Duration,
300 tick_rate: Option<Duration>,
301 ) -> Result<
302 (
303 Self,
304 tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
305 ),
306 Error,
307 > {
308 AsyncDebouncer::new_with_channel_and_opts::<
309 tokio::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
310 >(timeout, tick_rate, notify::Config::default())
311 .await
312 }
313}