1use std::{
2 sync::mpsc::{self, Receiver},
3 time::Duration,
4};
5
6use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
7
8use crate::{
9 error::{Result, WithWatchError},
10 snapshot::WatchInput,
11};
12
13#[derive(Debug, Default, Clone, Copy)]
14pub struct CollectedEvents {
15 pub event_count: usize,
16 pub path_count: usize,
17 pub error_count: usize,
18}
19
20pub struct WatchLoop {
21 _watcher: RecommendedWatcher,
22 rx: Receiver<notify::Result<Event>>,
23}
24
25impl WatchLoop {
26 pub fn new(inputs: &[WatchInput]) -> Result<Self> {
27 let (tx, rx) = mpsc::channel();
28 let mut watcher = notify::recommended_watcher(move |event| {
29 let _ = tx.send(event);
30 })
31 .map_err(WithWatchError::WatcherCreate)?;
32
33 let mut watched_anchors = Vec::new();
34 for input in inputs {
35 let anchor = input.watch_anchor().to_path_buf();
36 if watched_anchors.contains(&anchor) {
37 continue;
38 }
39 watcher
40 .watch(&anchor, RecursiveMode::Recursive)
41 .map_err(|source| WithWatchError::WatchPath {
42 path: anchor.clone(),
43 source,
44 })?;
45 watched_anchors.push(anchor);
46 }
47
48 Ok(Self {
49 _watcher: watcher,
50 rx,
51 })
52 }
53
54 pub fn collect_events(
55 &mut self,
56 timeout: Duration,
57 debounce_window: Duration,
58 ) -> Option<CollectedEvents> {
59 let first = self.rx.recv_timeout(timeout).ok()?;
60 let mut collected = CollectedEvents::default();
61 accumulate_event(first, &mut collected);
62
63 while let Ok(event) = self.rx.recv_timeout(debounce_window) {
64 accumulate_event(event, &mut collected);
65 }
66
67 Some(collected)
68 }
69}
70
71fn accumulate_event(event: notify::Result<Event>, collected: &mut CollectedEvents) {
72 collected.event_count += 1;
73 match event {
74 Ok(event) => {
75 collected.path_count += event.paths.len();
76 }
77 Err(_) => {
78 collected.error_count += 1;
79 }
80 }
81}