1use crate::error::Error;
2use std::{
3 collections::HashMap,
4 path::PathBuf,
5 sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc, Mutex,
8 },
9 time::{Duration, Instant},
10};
11
12pub use notify;
13use notify::{Error as NotifyError, Event, RecommendedWatcher, Watcher};
14
15pub mod error;
16
17struct EventData {
19 insert: Instant,
21 update: Instant,
23}
24
25impl EventData {
26 fn new_any() -> Self {
27 let time = Instant::now();
28 Self {
29 insert: time,
30 update: time,
31 }
32 }
33}
34
35#[derive(Clone, Debug, Eq, Hash, PartialEq)]
37#[non_exhaustive]
38pub enum DebouncedEventKind {
39 Any,
41 AnyContinuous,
43}
44
45#[derive(Clone, Debug, Eq, Hash, PartialEq)]
49pub struct DebouncedEvent {
50 pub path: PathBuf,
52 pub kind: DebouncedEventKind,
54 pub event: Event,
56}
57
58impl DebouncedEvent {
59 fn new(path: PathBuf, kind: DebouncedEventKind, event: Event) -> Self {
60 Self { path, kind, event }
61 }
62}
63
64impl AsRef<Event> for DebouncedEvent {
65 fn as_ref(&self) -> &Event {
66 &self.event
67 }
68}
69
70type DebounceData = Arc<Mutex<DebounceDataInner>>;
71
72#[derive(Default)]
73struct DebounceDataInner {
74 d: HashMap<PathBuf, (Event, EventData)>,
75 timeout: Duration,
76 e: Vec<NotifyError>,
77}
78
79impl DebounceDataInner {
80 pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
82 let mut events_expired = Vec::with_capacity(self.d.len());
83 let mut data_back = HashMap::with_capacity(self.d.len());
84
85 for (k, (e, v)) in self.d.drain() {
87 if v.update.elapsed() >= self.timeout {
88 events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::Any, e));
89 } else if v.insert.elapsed() >= self.timeout {
90 data_back.insert(k.clone(), (e.clone(), v));
91 events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::AnyContinuous, e));
92 } else {
93 data_back.insert(k, (e, v));
94 }
95 }
96
97 self.d = data_back;
98 events_expired
99 }
100
101 pub fn errors(&mut self) -> Vec<NotifyError> {
103 std::mem::take(&mut self.e)
104 }
105
106 pub fn add_error(&mut self, e: NotifyError) {
108 self.e.push(e);
109 }
110
111 pub fn add_event(&mut self, e: Event) {
113 for path in e.paths.clone().into_iter() {
114 if let Some((_, v)) = self.d.get_mut(&path) {
115 v.update = Instant::now();
116 } else {
117 self.d.insert(path, (e.clone(), EventData::new_any()));
118 }
119 }
120 }
121}
122
123#[async_trait::async_trait]
124pub trait AsyncDebounceEventHandler {
125 async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>);
126}
127
128#[async_trait::async_trait]
129impl<F> AsyncDebounceEventHandler for F
130where
131 F: FnMut(Result<Vec<DebouncedEvent>, Vec<NotifyError>>) + Send + 'static,
132{
133 async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>) {
134 self(event)
135 }
136}
137
138#[async_trait::async_trait]
139impl AsyncDebounceEventHandler
140 for tokio::sync::mpsc::Sender<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>
141{
142 async fn handle_event(&mut self, event: Result<Vec<DebouncedEvent>, Vec<NotifyError>>) {
143 let _ = self.send(event).await;
144 }
145}
146
147pub struct AsyncDebouncer<T: Watcher> {
148 stop: Arc<AtomicBool>,
149 watcher: T,
150 debouncer_task: Option<tokio::task::JoinHandle<()>>,
151}
152
153impl<T: Watcher> AsyncDebouncer<T> {
154 pub async fn stop(mut self) {
155 self.set_stop();
156 if let Some(t) = self.debouncer_task.take() {
157 let _ = t.await;
158 }
159 }
160
161 fn set_stop(&self) {
162 self.stop.store(true, Ordering::Relaxed);
163 }
164
165 pub fn watcher(&mut self) -> &mut dyn Watcher {
166 &mut self.watcher
167 }
168}
169
170impl<T: Watcher> Drop for AsyncDebouncer<T> {
171 fn drop(&mut self) {
172 self.set_stop();
174 }
175}
176
177impl<T: Watcher> AsyncDebouncer<T> {
178 pub async fn new_with_opts<F: AsyncDebounceEventHandler + Send + 'static>(
183 timeout: Duration,
184 tick_rate: Option<Duration>,
185 mut event_handler: F,
186 config: notify::Config,
187 ) -> Result<Self, Error> {
188 let data = DebounceData::default();
189
190 let stop = Arc::new(AtomicBool::new(false));
191
192 let tick_div = 4;
193 let tick = match tick_rate {
194 Some(v) => {
195 if v > timeout {
196 return Err(Error::InvalidTickRate(v, timeout));
197 }
198 v
199 }
200 None => timeout
201 .checked_div(tick_div)
202 .ok_or(Error::FailedToCalculateTick(timeout, tick_div))?,
203 };
204
205 {
206 let mut data_w = data.lock().unwrap();
207 data_w.timeout = timeout;
208 }
209
210 let data_c = data.clone();
211 let stop_c = stop.clone();
212 let debouncer_task = tokio::spawn(async move {
213 loop {
214 if stop_c.load(Ordering::Acquire) {
215 break;
216 }
217 tokio::time::sleep(tick).await;
218 let send_data;
219 let errors: Vec<NotifyError>;
220 {
221 let mut lock = data_c.lock().expect("can't lock debouncer data");
222 send_data = lock.debounced_events();
223 errors = lock.errors();
224 }
225 if !send_data.is_empty() {
226 event_handler.handle_event(Ok(send_data)).await;
227 }
228 if !errors.is_empty() {
229 event_handler.handle_event(Err(errors)).await;
230 }
231 }
232 });
233
234 let watcher = T::new(
235 move |e: Result<Event, NotifyError>| {
236 let mut lock = data.lock().expect("can't lock debouncer data");
237
238 match e {
239 Ok(e) => lock.add_event(e),
240 Err(e) => lock.add_error(e),
242 }
243 },
244 config,
245 )?;
246
247 let guard = AsyncDebouncer {
248 watcher,
249 debouncer_task: Some(debouncer_task),
250 stop,
251 };
252
253 Ok(guard)
254 }
255
256 pub async fn new_with_channel_and_opts(
262 timeout: Duration,
263 tick_rate: Option<Duration>,
264 config: notify::Config,
265 ) -> Result<
266 (
267 Self,
268 tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
269 ),
270 Error,
271 > {
272 let (tx, rx) = tokio::sync::mpsc::channel(1);
273 let debouncer = Self::new_with_opts(timeout, tick_rate, tx, config).await?;
274 Ok((debouncer, rx))
275 }
276}
277
278impl AsyncDebouncer<RecommendedWatcher> {
279 pub async fn new<F: AsyncDebounceEventHandler + Send + 'static>(
284 timeout: Duration,
285 tick_rate: Option<Duration>,
286 event_handler: F,
287 ) -> Result<Self, Error> {
288 AsyncDebouncer::new_with_opts(timeout, tick_rate, event_handler, notify::Config::default())
289 .await
290 }
291
292 pub async fn new_with_channel(
298 timeout: Duration,
299 tick_rate: Option<Duration>,
300 ) -> Result<
301 (
302 Self,
303 tokio::sync::mpsc::Receiver<Result<Vec<DebouncedEvent>, Vec<NotifyError>>>,
304 ),
305 Error,
306 > {
307 AsyncDebouncer::new_with_channel_and_opts(timeout, tick_rate, notify::Config::default())
308 .await
309 }
310}