testx/watcher/
file_watcher.rs1use std::path::{Path, PathBuf};
2use std::sync::mpsc::{self, Receiver, TryRecvError};
3use std::time::Duration;
4
5use notify::{RecursiveMode, Watcher};
6
7use crate::config::WatchConfig;
8use crate::watcher::debouncer::Debouncer;
9use crate::watcher::glob::{GlobPattern, should_ignore};
10
11#[allow(dead_code)]
14enum WatcherBackend {
15 Native(notify::RecommendedWatcher),
16 Poll(notify::PollWatcher),
17}
18
19pub struct FileWatcher {
21 rx: Receiver<PathBuf>,
23 debouncer: Debouncer,
25 ignore_patterns: Vec<GlobPattern>,
27 root: PathBuf,
29 _backend: WatcherBackend,
31}
32
33impl FileWatcher {
34 pub fn new(root: &Path, config: &WatchConfig) -> std::io::Result<Self> {
41 let ignore_patterns: Vec<GlobPattern> =
42 config.ignore.iter().map(|p| GlobPattern::new(p)).collect();
43
44 let debouncer = Debouncer::new(config.debounce_ms);
45 let (tx, rx) = mpsc::channel();
46
47 let handler = move |res: Result<notify::Event, notify::Error>| {
49 if let Ok(event) = res {
50 for path in event.paths {
51 let _ = tx.send(path);
52 }
53 }
54 };
55
56 let backend = if let Some(poll_ms) = config.poll_ms {
57 let poll_config =
58 notify::Config::default().with_poll_interval(Duration::from_millis(poll_ms));
59 let mut w =
60 notify::PollWatcher::new(handler, poll_config).map_err(std::io::Error::other)?;
61 w.watch(root, RecursiveMode::Recursive)
62 .map_err(std::io::Error::other)?;
63 WatcherBackend::Poll(w)
64 } else {
65 let mut w = notify::recommended_watcher(handler).map_err(std::io::Error::other)?;
66 w.watch(root, RecursiveMode::Recursive)
67 .map_err(std::io::Error::other)?;
68 WatcherBackend::Native(w)
69 };
70
71 Ok(Self {
72 rx,
73 debouncer,
74 ignore_patterns,
75 root: root.to_path_buf(),
76 _backend: backend,
77 })
78 }
79
80 pub fn wait_for_changes(&mut self) -> Vec<PathBuf> {
83 loop {
84 if self.drain_pending() {
86 if self.debouncer.has_pending() {
88 return self.debouncer.flush();
89 }
90 return Vec::new();
91 }
92
93 if self.debouncer.should_flush() {
95 return self.debouncer.flush();
96 }
97
98 if self.debouncer.has_pending() {
100 let remaining = self.debouncer.time_remaining();
101 if remaining > Duration::ZERO {
102 std::thread::sleep(remaining.min(Duration::from_millis(50)));
103 continue;
104 }
105 return self.debouncer.flush();
106 }
107
108 match self.rx.recv_timeout(Duration::from_millis(200)) {
110 Ok(path) => {
111 if !self.should_ignore(&path) {
112 self.debouncer.add(path);
113 }
114 }
115 Err(mpsc::RecvTimeoutError::Timeout) => continue,
116 Err(mpsc::RecvTimeoutError::Disconnected) => return Vec::new(),
117 }
118 }
119 }
120
121 pub fn poll_changes(&mut self, timeout: Duration) -> Vec<PathBuf> {
126 let deadline = std::time::Instant::now() + timeout;
127
128 loop {
129 if self.drain_pending() {
131 if self.debouncer.has_pending() {
132 return self.debouncer.flush();
133 }
134 return Vec::new();
135 }
136
137 if self.debouncer.should_flush() {
139 return self.debouncer.flush();
140 }
141
142 if self.debouncer.has_pending() {
143 let remaining = self.debouncer.time_remaining();
144 if remaining > Duration::ZERO {
145 std::thread::sleep(remaining.min(Duration::from_millis(50)));
146 continue;
147 }
148 return self.debouncer.flush();
149 }
150
151 let now = std::time::Instant::now();
153 if now >= deadline {
154 return Vec::new();
155 }
156
157 let wait = (deadline - now).min(Duration::from_millis(100));
158 match self.rx.recv_timeout(wait) {
159 Ok(path) => {
160 if !self.should_ignore(&path) {
161 self.debouncer.add(path);
162 }
163 }
164 Err(mpsc::RecvTimeoutError::Timeout) => {}
165 Err(mpsc::RecvTimeoutError::Disconnected) => return Vec::new(),
166 }
167 }
168 }
169
170 fn drain_pending(&mut self) -> bool {
173 loop {
174 match self.rx.try_recv() {
175 Ok(path) => {
176 if !self.should_ignore(&path) {
177 self.debouncer.add(path);
178 }
179 }
180 Err(TryRecvError::Empty) => return false,
181 Err(TryRecvError::Disconnected) => return true,
182 }
183 }
184 }
185
186 fn should_ignore(&self, path: &Path) -> bool {
188 let relative = path
189 .strip_prefix(&self.root)
190 .unwrap_or(path)
191 .to_string_lossy();
192
193 should_ignore(&relative, &self.ignore_patterns)
194 }
195
196 pub fn root(&self) -> &Path {
198 &self.root
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205
206 #[test]
207 fn file_watcher_construction() {
208 let dir = tempfile::tempdir().unwrap();
209 let config = WatchConfig::default();
210 let watcher = FileWatcher::new(dir.path(), &config);
211 assert!(watcher.is_ok());
212 let watcher = watcher.unwrap();
213 assert_eq!(watcher.root(), dir.path());
214 }
215
216 #[test]
217 fn file_watcher_poll_fallback() {
218 let dir = tempfile::tempdir().unwrap();
219 let config = WatchConfig {
220 poll_ms: Some(200),
221 ..WatchConfig::default()
222 };
223 let watcher = FileWatcher::new(dir.path(), &config);
224 assert!(watcher.is_ok());
225 }
226
227 #[test]
228 fn file_watcher_ignore_patterns() {
229 let dir = tempfile::tempdir().unwrap();
230 let config = WatchConfig {
231 ignore: vec!["*.log".to_string(), "target/**".to_string()],
232 ..WatchConfig::default()
233 };
234 let watcher = FileWatcher::new(dir.path(), &config).unwrap();
235 assert!(watcher.should_ignore(&dir.path().join("build.log")));
236 assert!(!watcher.should_ignore(&dir.path().join("main.rs")));
237 }
238
239 #[test]
240 fn file_watcher_nonexistent_dir() {
241 let config = WatchConfig::default();
242 let result = FileWatcher::new(Path::new("/nonexistent/dir"), &config);
243 assert!(result.is_err());
244 }
245}