1use anyhow::{Context, Result};
35use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
36use std::path::{Path, PathBuf};
37use std::sync::mpsc::{Receiver, TryRecvError, channel};
38use std::time::Duration;
39
40#[derive(Debug, Clone, PartialEq, Eq)]
42pub enum FileChange {
43 Created(PathBuf),
45 Modified(PathBuf),
47 Deleted(PathBuf),
49}
50
51pub struct FileWatcher {
58 _watcher: RecommendedWatcher,
60 receiver: Receiver<Result<Event, notify::Error>>,
62 root_path: PathBuf,
64}
65
66impl FileWatcher {
67 pub fn new(root_path: &Path) -> Result<Self> {
79 let (tx, rx) = channel();
80
81 let mut watcher = notify::recommended_watcher(move |res| {
82 let _ = tx.send(res);
84 })
85 .context("Failed to create file system watcher")?;
86
87 watcher
89 .watch(root_path, RecursiveMode::Recursive)
90 .with_context(|| format!("Failed to watch directory: {}", root_path.display()))?;
91
92 log::info!("File watcher started for: {}", root_path.display());
93
94 Ok(Self {
95 _watcher: watcher,
96 receiver: rx,
97 root_path: root_path.to_path_buf(),
98 })
99 }
100
101 #[must_use]
110 pub fn poll_changes(&self) -> Vec<FileChange> {
111 let mut changes = Vec::new();
112
113 loop {
115 match self.receiver.try_recv() {
116 Ok(Ok(event)) => {
117 changes.extend(Self::process_event(event));
118 }
119 Ok(Err(e)) => {
120 log::warn!("File watcher error: {e}");
121 }
122 Err(TryRecvError::Empty) => {
123 break;
125 }
126 Err(TryRecvError::Disconnected) => {
127 log::error!("File watcher channel disconnected");
128 break;
129 }
130 }
131 }
132
133 changes
134 }
135
136 pub fn wait_for_change(&self) -> Result<Vec<FileChange>> {
149 let event = self
151 .receiver
152 .recv()
153 .context("File watcher channel disconnected")?
154 .context("File watcher error")?;
155
156 let mut changes = Self::process_event(event);
157
158 changes.extend(self.poll_changes());
160
161 Ok(changes)
162 }
163
164 pub fn wait_with_debounce(&self, debounce_duration: Duration) -> Result<Vec<FileChange>> {
184 let mut changes = self.wait_for_change()?;
186
187 changes.extend(self.wait_until(debounce_duration));
189
190 Ok(Self::deduplicate_changes(changes))
192 }
193
194 #[must_use]
204 pub fn wait_until(&self, duration: Duration) -> Vec<FileChange> {
205 let deadline = std::time::Instant::now() + duration;
206 let mut changes = Vec::new();
207
208 while std::time::Instant::now() < deadline {
209 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
211 let poll_interval = Duration::from_millis(10).min(remaining);
212
213 match self.receiver.recv_timeout(poll_interval) {
214 Ok(Ok(event)) => {
215 changes.extend(Self::process_event(event));
216 }
217 Ok(Err(e)) => {
218 log::warn!("File watcher error: {e}");
219 }
220 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
221 }
223 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
224 log::error!("File watcher channel disconnected");
225 break;
226 }
227 }
228 }
229
230 changes
231 }
232
233 #[must_use]
235 pub fn root_path(&self) -> &Path {
236 &self.root_path
237 }
238
239 fn process_event(event: Event) -> Vec<FileChange> {
244 let mut changes = Vec::new();
245
246 match event.kind {
247 EventKind::Create(_) => {
248 for path in event.paths {
249 if path.is_file() {
251 log::debug!("File created: {}", path.display());
252 changes.push(FileChange::Created(path));
253 } else {
254 log::trace!("Ignoring directory creation: {}", path.display());
255 }
256 }
257 }
258 EventKind::Modify(_) => {
259 for path in event.paths {
260 if path.is_file() {
262 log::debug!("File modified: {}", path.display());
263 changes.push(FileChange::Modified(path));
264 } else {
265 log::trace!("Ignoring directory modification: {}", path.display());
266 }
267 }
268 }
269 EventKind::Remove(_) => {
270 for path in event.paths {
271 log::debug!("File deleted: {}", path.display());
275 changes.push(FileChange::Deleted(path));
276 }
277 }
278 _ => {
279 }
281 }
282
283 changes
284 }
285
286 fn deduplicate_changes(changes: Vec<FileChange>) -> Vec<FileChange> {
290 use std::collections::HashMap;
291
292 let mut map: HashMap<PathBuf, FileChange> = HashMap::new();
293
294 for change in changes {
295 let path = match &change {
296 FileChange::Created(p) | FileChange::Modified(p) | FileChange::Deleted(p) => {
297 p.clone()
298 }
299 };
300
301 map.insert(path, change);
302 }
303
304 map.into_values().collect()
305 }
306}
307
308#[cfg(test)]
309mod tests {
310 use super::*;
311 use std::fs;
312 use std::thread;
313 use std::time::{Duration, Instant};
314 use tempfile::TempDir;
315
316 fn event_timeout() -> Duration {
317 let base = if cfg!(target_os = "macos") {
319 Duration::from_secs(3)
320 } else {
321 Duration::from_secs(2) };
323
324 if std::env::var("CI").is_ok() {
326 base * 2
327 } else {
328 base
329 }
330 }
331
332 fn wait_for<F>(timeout: Duration, mut predicate: F) -> bool
333 where
334 F: FnMut() -> bool,
335 {
336 let deadline = Instant::now() + timeout;
337 loop {
338 if predicate() {
339 return true;
340 }
341 if Instant::now() >= deadline {
342 return false;
343 }
344 thread::sleep(Duration::from_millis(50));
345 }
346 }
347
348 #[test]
349 fn test_watcher_creation() {
350 let tmp_watch_workspace = TempDir::new().unwrap();
351 let watcher = FileWatcher::new(tmp_watch_workspace.path());
352 assert!(watcher.is_ok());
353 }
354
355 #[test]
356 #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
357 fn test_watcher_detects_file_creation() {
358 let tmp_watch_workspace = TempDir::new().unwrap();
359 let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
360
361 let file_path = tmp_watch_workspace.path().join("test.txt");
363 fs::write(&file_path, "test content").unwrap();
364
365 let detected = wait_for(event_timeout(), || {
366 let changes = watcher.poll_changes();
367 changes
368 .iter()
369 .any(|c| matches!(c, FileChange::Created(p) if p == &file_path))
370 });
371
372 assert!(detected, "Expected FileWatcher to detect file creation");
373 }
374
375 #[test]
376 #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
377 fn test_watcher_detects_file_modification() {
378 let tmp_watch_workspace = TempDir::new().unwrap();
379 let file_path = tmp_watch_workspace.path().join("test.txt");
380
381 fs::write(&file_path, "initial content").unwrap();
383
384 let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
385
386 thread::sleep(Duration::from_millis(50));
388
389 fs::write(&file_path, "modified content").unwrap();
391
392 let detected = wait_for(event_timeout(), || {
393 let changes = watcher.poll_changes();
394 changes
395 .iter()
396 .any(|c| matches!(c, FileChange::Modified(p) if p == &file_path))
397 });
398
399 assert!(detected, "Expected FileWatcher to detect file modification");
400 }
401
402 #[test]
403 #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
404 fn test_watcher_detects_file_deletion() {
405 let tmp_watch_workspace = TempDir::new().unwrap();
406 let file_path = tmp_watch_workspace.path().join("test.txt");
407
408 fs::write(&file_path, "test content").unwrap();
410
411 let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
412
413 thread::sleep(Duration::from_millis(50));
415
416 fs::remove_file(&file_path).unwrap();
418
419 let detected = wait_for(event_timeout(), || {
420 let changes = watcher.poll_changes();
421 changes
422 .iter()
423 .any(|c| matches!(c, FileChange::Deleted(p) if p == &file_path))
424 });
425
426 assert!(detected, "Expected FileWatcher to detect file deletion");
427 }
428
429 #[test]
430 fn test_watcher_poll_returns_empty_when_no_changes() {
431 let tmp_watch_workspace = TempDir::new().unwrap();
432 let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
433
434 let changes = watcher.poll_changes();
436
437 assert!(changes.is_empty());
439 }
440
441 #[test]
442 #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
443 fn test_watcher_ignores_directories() {
444 let tmp_watch_workspace = TempDir::new().unwrap();
445 let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
446
447 let sub_dir = tmp_watch_workspace.path().join("subdir");
449 fs::create_dir(&sub_dir).unwrap();
450
451 thread::sleep(Duration::from_millis(100));
453
454 let changes = watcher.poll_changes();
456
457 assert!(
460 changes.is_empty(),
461 "Watcher should not report directory creation events, found: {changes:?}"
462 );
463
464 let file_path = sub_dir.join("test.txt");
466 fs::write(&file_path, "test").unwrap();
467
468 let detected = wait_for(event_timeout(), || {
469 let changes = watcher.poll_changes();
470 changes
471 .iter()
472 .any(|c| matches!(c, FileChange::Created(p) if p == &file_path))
473 });
474
475 assert!(
476 detected,
477 "Expected watcher to detect file creation in subdirectory"
478 );
479 }
480
481 #[test]
482 fn test_deduplicate_changes() {
483 let changes = vec![
484 FileChange::Modified(PathBuf::from("file1.txt")),
485 FileChange::Modified(PathBuf::from("file1.txt")), FileChange::Created(PathBuf::from("file2.txt")),
487 FileChange::Modified(PathBuf::from("file1.txt")), ];
489
490 let deduped = FileWatcher::deduplicate_changes(changes);
491
492 assert_eq!(deduped.len(), 2);
494
495 assert_eq!(
497 deduped
498 .iter()
499 .filter(|c| matches!(c, FileChange::Modified(p) if p == Path::new("file1.txt")))
500 .count(),
501 1
502 );
503
504 assert_eq!(
506 deduped
507 .iter()
508 .filter(|c| matches!(c, FileChange::Created(p) if p == Path::new("file2.txt")))
509 .count(),
510 1
511 );
512 }
513}