unistore_watcher/
watcher.rs1use crate::config::WatcherConfig;
8use crate::deps::*;
9use crate::error::WatcherError;
10use crate::event::FileEvent;
11use std::sync::atomic::{AtomicBool, Ordering};
12use tracing::{error, info, warn};
13
14pub struct FileWatcher {
16 watcher: RecommendedWatcher,
18 config: WatcherConfig,
20 watched_paths: Arc<RwLock<HashSet<PathBuf>>>,
22 event_tx: broadcast::Sender<FileEvent>,
24 closed: Arc<AtomicBool>,
26}
27
28impl FileWatcher {
29 pub fn new(config: WatcherConfig) -> Result<Self, WatcherError> {
31 let (event_tx, _) = broadcast::channel(config.buffer_size);
32 let event_tx_clone = event_tx.clone();
33
34 let watcher = notify::recommended_watcher(move |res: Result<NotifyEvent, notify::Error>| {
36 match res {
37 Ok(event) => {
38 if let Some(file_event) = FileEvent::from_notify(event) {
39 let _ = event_tx_clone.send(file_event);
41 }
42 }
43 Err(e) => {
44 error!(error = ?e, "监控错误");
45 }
46 }
47 })
48 .map_err(WatcherError::NotifyError)?;
49
50 info!("文件监控器已创建");
51
52 Ok(Self {
53 watcher,
54 config,
55 watched_paths: Arc::new(RwLock::new(HashSet::new())),
56 event_tx,
57 closed: Arc::new(AtomicBool::new(false)),
58 })
59 }
60
61 pub fn with_defaults() -> Result<Self, WatcherError> {
63 Self::new(WatcherConfig::default())
64 }
65
66 pub fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), WatcherError> {
68 let path = path.as_ref().to_path_buf();
69
70 if self.closed.load(Ordering::SeqCst) {
71 return Err(WatcherError::WatcherClosed);
72 }
73
74 if !path.exists() {
76 return Err(WatcherError::PathNotFound(path));
77 }
78
79 {
81 let watched = self.watched_paths.read();
82 if watched.contains(&path) {
83 return Err(WatcherError::AlreadyWatched(path));
84 }
85 }
86
87 let mode = if self.config.recursive {
89 RecursiveMode::Recursive
90 } else {
91 RecursiveMode::NonRecursive
92 };
93
94 self.watcher.watch(&path, mode)?;
96
97 self.watched_paths.write().insert(path.clone());
99
100 info!(path = ?path, recursive = self.config.recursive, "添加监控路径");
101
102 Ok(())
103 }
104
105 pub fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), WatcherError> {
107 let path = path.as_ref().to_path_buf();
108
109 if self.closed.load(Ordering::SeqCst) {
110 return Err(WatcherError::WatcherClosed);
111 }
112
113 {
115 let watched = self.watched_paths.read();
116 if !watched.contains(&path) {
117 return Err(WatcherError::NotWatched(path));
118 }
119 }
120
121 self.watcher.unwatch(&path)?;
123
124 self.watched_paths.write().remove(&path);
126
127 info!(path = ?path, "移除监控路径");
128
129 Ok(())
130 }
131
132 pub fn subscribe(&self) -> broadcast::Receiver<FileEvent> {
134 self.event_tx.subscribe()
135 }
136
137 pub fn watched_paths(&self) -> Vec<PathBuf> {
139 self.watched_paths.read().iter().cloned().collect()
140 }
141
142 pub fn is_watching<P: AsRef<Path>>(&self, path: P) -> bool {
144 self.watched_paths.read().contains(path.as_ref())
145 }
146
147 pub fn watch_count(&self) -> usize {
149 self.watched_paths.read().len()
150 }
151
152 pub fn is_closed(&self) -> bool {
154 self.closed.load(Ordering::SeqCst)
155 }
156
157 pub fn close(&mut self) {
159 if self.closed.swap(true, Ordering::SeqCst) {
160 return; }
162
163 let paths: Vec<PathBuf> = self.watched_paths.read().iter().cloned().collect();
165 for path in paths {
166 if let Err(e) = self.watcher.unwatch(&path) {
167 warn!(path = ?path, error = ?e, "移除监控失败");
168 }
169 }
170
171 self.watched_paths.write().clear();
172 info!("文件监控器已关闭");
173 }
174
175 pub fn config(&self) -> &WatcherConfig {
177 &self.config
178 }
179}
180
181impl Drop for FileWatcher {
182 fn drop(&mut self) {
183 self.close();
184 }
185}
186
187pub async fn watch_path<P, F>(path: P, mut handler: F) -> Result<(), WatcherError>
189where
190 P: AsRef<Path>,
191 F: FnMut(FileEvent) + Send + 'static,
192{
193 let mut watcher = FileWatcher::with_defaults()?;
194 watcher.watch(path)?;
195
196 let mut rx = watcher.subscribe();
197
198 while let Ok(event) = rx.recv().await {
199 handler(event);
200 }
201
202 Ok(())
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a watcher with the default configuration, panicking on failure.
    fn default_watcher() -> FileWatcher {
        FileWatcher::with_defaults().unwrap()
    }

    /// Construction with the default configuration succeeds.
    #[test]
    fn test_watcher_creation() {
        assert!(FileWatcher::with_defaults().is_ok());
    }

    /// Watching a missing path is rejected with `PathNotFound`.
    #[test]
    fn test_watch_nonexistent_path() {
        let mut watcher = default_watcher();
        let outcome = watcher.watch("/nonexistent/path/that/does/not/exist");
        assert!(matches!(outcome, Err(WatcherError::PathNotFound(_))));
    }

    /// A path can be registered and then removed, with bookkeeping following.
    #[test]
    fn test_watch_and_unwatch() {
        let dir = tempdir().unwrap();
        let target = dir.path();
        let mut watcher = default_watcher();

        watcher.watch(target).unwrap();
        assert!(watcher.is_watching(target));
        assert_eq!(watcher.watch_count(), 1);

        watcher.unwatch(target).unwrap();
        assert!(!watcher.is_watching(target));
        assert_eq!(watcher.watch_count(), 0);
    }

    /// Registering the same path twice yields `AlreadyWatched`.
    #[test]
    fn test_double_watch() {
        let dir = tempdir().unwrap();
        let target = dir.path();
        let mut watcher = default_watcher();

        watcher.watch(target).unwrap();
        let second = watcher.watch(target);
        assert!(matches!(second, Err(WatcherError::AlreadyWatched(_))));
    }

    /// Closing marks the watcher closed and clears its registered paths.
    #[test]
    fn test_close_watcher() {
        let dir = tempdir().unwrap();
        let mut watcher = default_watcher();

        watcher.watch(dir.path()).unwrap();
        assert!(!watcher.is_closed());

        watcher.close();
        assert!(watcher.is_closed());
        assert_eq!(watcher.watch_count(), 0);
    }
}