unistore_watcher/
watcher.rs

1//! 【文件监控器】- 核心监控实现
2//!
3//! 职责:
4//! - 管理文件系统监控
5//! - 处理事件分发
6
7use crate::config::WatcherConfig;
8use crate::deps::*;
9use crate::error::WatcherError;
10use crate::event::FileEvent;
11use std::sync::atomic::{AtomicBool, Ordering};
12use tracing::{error, info, warn};
13
14/// 文件监控器
15pub struct FileWatcher {
16    /// 底层 notify 监控器
17    watcher: RecommendedWatcher,
18    /// 配置
19    config: WatcherConfig,
20    /// 已监控的路径
21    watched_paths: Arc<RwLock<HashSet<PathBuf>>>,
22    /// 事件广播发送器
23    event_tx: broadcast::Sender<FileEvent>,
24    /// 是否已关闭
25    closed: Arc<AtomicBool>,
26}
27
28impl FileWatcher {
29    /// 创建新的文件监控器
30    pub fn new(config: WatcherConfig) -> Result<Self, WatcherError> {
31        let (event_tx, _) = broadcast::channel(config.buffer_size);
32        let event_tx_clone = event_tx.clone();
33
34        // 创建 notify 监控器
35        let watcher = notify::recommended_watcher(move |res: Result<NotifyEvent, notify::Error>| {
36            match res {
37                Ok(event) => {
38                    if let Some(file_event) = FileEvent::from_notify(event) {
39                        // 忽略发送失败(没有订阅者)
40                        let _ = event_tx_clone.send(file_event);
41                    }
42                }
43                Err(e) => {
44                    error!(error = ?e, "监控错误");
45                }
46            }
47        })
48        .map_err(WatcherError::NotifyError)?;
49
50        info!("文件监控器已创建");
51
52        Ok(Self {
53            watcher,
54            config,
55            watched_paths: Arc::new(RwLock::new(HashSet::new())),
56            event_tx,
57            closed: Arc::new(AtomicBool::new(false)),
58        })
59    }
60
61    /// 使用默认配置创建
62    pub fn with_defaults() -> Result<Self, WatcherError> {
63        Self::new(WatcherConfig::default())
64    }
65
66    /// 添加监控路径
67    pub fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), WatcherError> {
68        let path = path.as_ref().to_path_buf();
69
70        if self.closed.load(Ordering::SeqCst) {
71            return Err(WatcherError::WatcherClosed);
72        }
73
74        // 检查路径是否存在
75        if !path.exists() {
76            return Err(WatcherError::PathNotFound(path));
77        }
78
79        // 检查是否已监控
80        {
81            let watched = self.watched_paths.read();
82            if watched.contains(&path) {
83                return Err(WatcherError::AlreadyWatched(path));
84            }
85        }
86
87        // 确定递归模式
88        let mode = if self.config.recursive {
89            RecursiveMode::Recursive
90        } else {
91            RecursiveMode::NonRecursive
92        };
93
94        // 添加监控
95        self.watcher.watch(&path, mode)?;
96
97        // 记录路径
98        self.watched_paths.write().insert(path.clone());
99
100        info!(path = ?path, recursive = self.config.recursive, "添加监控路径");
101
102        Ok(())
103    }
104
105    /// 移除监控路径
106    pub fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), WatcherError> {
107        let path = path.as_ref().to_path_buf();
108
109        if self.closed.load(Ordering::SeqCst) {
110            return Err(WatcherError::WatcherClosed);
111        }
112
113        // 检查是否已监控
114        {
115            let watched = self.watched_paths.read();
116            if !watched.contains(&path) {
117                return Err(WatcherError::NotWatched(path));
118            }
119        }
120
121        // 移除监控
122        self.watcher.unwatch(&path)?;
123
124        // 移除记录
125        self.watched_paths.write().remove(&path);
126
127        info!(path = ?path, "移除监控路径");
128
129        Ok(())
130    }
131
132    /// 获取事件接收器
133    pub fn subscribe(&self) -> broadcast::Receiver<FileEvent> {
134        self.event_tx.subscribe()
135    }
136
137    /// 获取已监控的路径列表
138    pub fn watched_paths(&self) -> Vec<PathBuf> {
139        self.watched_paths.read().iter().cloned().collect()
140    }
141
142    /// 检查路径是否被监控
143    pub fn is_watching<P: AsRef<Path>>(&self, path: P) -> bool {
144        self.watched_paths.read().contains(path.as_ref())
145    }
146
147    /// 获取监控路径数量
148    pub fn watch_count(&self) -> usize {
149        self.watched_paths.read().len()
150    }
151
152    /// 检查监控器是否已关闭
153    pub fn is_closed(&self) -> bool {
154        self.closed.load(Ordering::SeqCst)
155    }
156
157    /// 关闭监控器
158    pub fn close(&mut self) {
159        if self.closed.swap(true, Ordering::SeqCst) {
160            return; // 已关闭
161        }
162
163        // 移除所有监控
164        let paths: Vec<PathBuf> = self.watched_paths.read().iter().cloned().collect();
165        for path in paths {
166            if let Err(e) = self.watcher.unwatch(&path) {
167                warn!(path = ?path, error = ?e, "移除监控失败");
168            }
169        }
170
171        self.watched_paths.write().clear();
172        info!("文件监控器已关闭");
173    }
174
175    /// 获取配置引用
176    pub fn config(&self) -> &WatcherConfig {
177        &self.config
178    }
179}
180
181impl Drop for FileWatcher {
182    fn drop(&mut self) {
183        self.close();
184    }
185}
186
187/// 简化的监控 API
188pub async fn watch_path<P, F>(path: P, mut handler: F) -> Result<(), WatcherError>
189where
190    P: AsRef<Path>,
191    F: FnMut(FileEvent) + Send + 'static,
192{
193    let mut watcher = FileWatcher::with_defaults()?;
194    watcher.watch(path)?;
195
196    let mut rx = watcher.subscribe();
197
198    while let Ok(event) = rx.recv().await {
199        handler(event);
200    }
201
202    Ok(())
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208    use tempfile::tempdir;
209
210    #[test]
211    fn test_watcher_creation() {
212        let watcher = FileWatcher::with_defaults();
213        assert!(watcher.is_ok());
214    }
215
216    #[test]
217    fn test_watch_nonexistent_path() {
218        let mut watcher = FileWatcher::with_defaults().unwrap();
219        let result = watcher.watch("/nonexistent/path/that/does/not/exist");
220        assert!(result.is_err());
221        assert!(matches!(result.unwrap_err(), WatcherError::PathNotFound(_)));
222    }
223
224    #[test]
225    fn test_watch_and_unwatch() {
226        let dir = tempdir().unwrap();
227        let mut watcher = FileWatcher::with_defaults().unwrap();
228
229        // 添加监控
230        watcher.watch(dir.path()).unwrap();
231        assert!(watcher.is_watching(dir.path()));
232        assert_eq!(watcher.watch_count(), 1);
233
234        // 移除监控
235        watcher.unwatch(dir.path()).unwrap();
236        assert!(!watcher.is_watching(dir.path()));
237        assert_eq!(watcher.watch_count(), 0);
238    }
239
240    #[test]
241    fn test_double_watch() {
242        let dir = tempdir().unwrap();
243        let mut watcher = FileWatcher::with_defaults().unwrap();
244
245        watcher.watch(dir.path()).unwrap();
246        let result = watcher.watch(dir.path());
247        assert!(result.is_err());
248        assert!(matches!(result.unwrap_err(), WatcherError::AlreadyWatched(_)));
249    }
250
251    #[test]
252    fn test_close_watcher() {
253        let dir = tempdir().unwrap();
254        let mut watcher = FileWatcher::with_defaults().unwrap();
255
256        watcher.watch(dir.path()).unwrap();
257        assert!(!watcher.is_closed());
258
259        watcher.close();
260        assert!(watcher.is_closed());
261        assert_eq!(watcher.watch_count(), 0);
262    }
263}