nym_async_file_watcher/
lib.rs1use futures::StreamExt;
5use futures::channel::mpsc;
6use notify::event::{DataChange, MetadataKind, ModifyKind};
7use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::time::Duration;
11use tokio::time::Instant;
12
13pub use notify::{Error as NotifyError, Result as NotifyResult};
14
/// Sending half of an unbounded channel carrying `notify` [`Event`]s.
///
/// This is the type of handle an [`AsyncFileWatcher`] is given at construction and
/// onto which it pushes every event that it decides to propagate.
pub type FileWatcherEventSender = mpsc::UnboundedSender<Event>;
/// Receiving half of an unbounded channel carrying `notify` [`Event`]s; the
/// counterpart type to [`FileWatcherEventSender`].
pub type FileWatcherEventReceiver = mpsc::UnboundedReceiver<Event>;
17
/// Asynchronous wrapper around a `notify` [`RecommendedWatcher`] that forwards
/// selected filesystem events for a single path onto a user-provided channel.
///
/// Events are optionally restricted to a set of [`EventKind`]s and are throttled
/// per kind: an event is not forwarded if one of the same kind was already
/// forwarded less than `tick_duration` ago.
pub struct AsyncFileWatcher {
    /// Path handed to the underlying watcher when watching starts and stops.
    path: PathBuf,
    /// Underlying `notify` watcher; its callback pushes raw results into the
    /// channel read through `inner_rx`.
    watcher: RecommendedWatcher,
    /// Flag set when watching starts and cleared when it stops.
    is_watching: bool,
    /// Event kinds that may be propagated; `None` lets every kind through.
    filters: Option<Vec<EventKind>>,
    /// For each event kind, the instant at which an event of that kind was last
    /// propagated. Only propagated events are recorded here.
    last_received: HashMap<EventKind, Instant>,
    /// Minimum time that has to pass before another event of the same kind is
    /// propagated again.
    tick_duration: Duration,

    /// Receives the raw results emitted by the watcher callback.
    inner_rx: mpsc::UnboundedReceiver<NotifyResult<Event>>,
    /// Channel onto which events that pass filtering and throttling are sent.
    event_sender: FileWatcherEventSender,
}
30
31impl AsyncFileWatcher {
32 pub fn new_file_changes_watcher<P: AsRef<Path>>(
33 path: P,
34 event_sender: FileWatcherEventSender,
35 ) -> NotifyResult<Self> {
36 Self::new(
37 path,
38 event_sender,
39 Some(vec![
40 EventKind::Modify(ModifyKind::Data(DataChange::Content)),
41 EventKind::Modify(ModifyKind::Data(DataChange::Any)),
42 EventKind::Modify(ModifyKind::Metadata(MetadataKind::Any)),
43 ]),
44 None,
45 )
46 }
47
48 pub fn new<P: AsRef<Path>>(
49 path: P,
50 event_sender: FileWatcherEventSender,
51 filters: Option<Vec<EventKind>>,
52 tick_duration: Option<Duration>,
53 ) -> NotifyResult<Self> {
54 let watcher_config = Config::default();
55 let (inner_tx, inner_rx) = mpsc::unbounded();
56 let watcher = RecommendedWatcher::new(
57 move |res| {
58 if let Err(_err) = inner_tx.unbounded_send(res) {
59 log::error!(
61 "failed to send watched file event - the received must have been dropped!"
62 );
63 }
64 },
65 watcher_config,
66 )?;
67
68 Ok(AsyncFileWatcher {
69 path: path.as_ref().to_path_buf(),
70 watcher,
71 is_watching: false,
72 filters,
73 last_received: HashMap::new(),
74 tick_duration: tick_duration.unwrap_or(Duration::from_secs(5)),
75 inner_rx,
76 event_sender,
77 })
78 }
79
80 pub fn with_filters(mut self, filters: Option<Vec<EventKind>>) -> Self {
81 self.filters = filters;
82 self
83 }
84
85 pub fn with_filter(mut self, filter: EventKind) -> Self {
86 match &mut self.filters {
87 None => {
88 self.filters = Some(vec![filter]);
89 }
90 Some(filters) => filters.push(filter),
91 }
92 self
93 }
94
95 fn should_propagate(&self, event: &Event, now: Instant) -> bool {
96 if let Some(previous) = self.last_received.get(&event.kind)
100 && now.duration_since(*previous) < self.tick_duration
101 {
102 return false;
103 }
104
105 let Some(filters) = &self.filters else {
106 return true;
107 };
108
109 for filter in filters {
110 if &event.kind == filter {
111 return true;
112 }
113 }
114 false
115 }
116
117 fn start_watching(&mut self) -> NotifyResult<()> {
118 self.is_watching = true;
119 self.watcher.watch(&self.path, RecursiveMode::NonRecursive)
120 }
121
122 fn stop_watching(&mut self) -> NotifyResult<()> {
123 self.is_watching = false;
124 self.watcher.unwatch(&self.path)
125 }
126
127 pub async fn watch(&mut self) -> NotifyResult<()> {
128 self.start_watching()?;
129
130 while let Some(event) = self.inner_rx.next().await {
131 match event {
132 Ok(event) => {
133 let now = Instant::now();
134 if self.should_propagate(&event, now) {
135 self.last_received.insert(event.kind, now);
136 if let Err(_err) = self.event_sender.unbounded_send(event) {
137 log::error!("the file watcher receiver has been dropped!");
138 }
139 } else {
140 log::debug!("will not propagate information about {event:?}");
141 }
142 }
143 Err(err) => {
144 log::error!(
147 "encountered an error while watching {:?}: {err}",
148 self.path.as_path()
149 );
150 }
151 }
152 }
153
154 self.stop_watching()
155 }
156
157 pub fn is_watching(&self) -> bool {
158 self.is_watching
159 }
160}