1pub mod git_state;
35pub mod source_tree;
36
37pub use git_state::{GitChangeClass, GitStateWatcher, LastIndexedGitState};
38pub use source_tree::{ChangeSet, SourceTreeWatcher};
39
40use anyhow::{Context, Result};
41use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
42use std::path::{Path, PathBuf};
43use std::sync::mpsc::{Receiver, TryRecvError, channel};
44use std::time::Duration;
45
/// A single file-system change reported by [`FileWatcher`].
///
/// Every variant carries the path the underlying watcher event referred to.
/// The type is hashable so changes can be collected into sets or used as
/// map keys when callers need to de-duplicate or group them.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FileChange {
    /// A file appeared at this path.
    Created(PathBuf),
    /// The file at this path was modified.
    Modified(PathBuf),
    /// The entry at this path was removed.
    Deleted(PathBuf),
}
56
57pub struct FileWatcher {
64 _watcher: RecommendedWatcher,
66 receiver: Receiver<Result<Event, notify::Error>>,
68 root_path: PathBuf,
70}
71
72impl FileWatcher {
73 pub fn new(root_path: &Path) -> Result<Self> {
85 let (tx, rx) = channel();
86
87 let mut watcher = notify::recommended_watcher(move |res| {
88 let _ = tx.send(res);
90 })
91 .context("Failed to create file system watcher")?;
92
93 watcher
95 .watch(root_path, RecursiveMode::Recursive)
96 .with_context(|| format!("Failed to watch directory: {}", root_path.display()))?;
97
98 log::info!("File watcher started for: {}", root_path.display());
99
100 Ok(Self {
101 _watcher: watcher,
102 receiver: rx,
103 root_path: root_path.to_path_buf(),
104 })
105 }
106
107 #[must_use]
116 pub fn poll_changes(&self) -> Vec<FileChange> {
117 let mut changes = Vec::new();
118
119 loop {
121 match self.receiver.try_recv() {
122 Ok(Ok(event)) => {
123 changes.extend(Self::process_event(event));
124 }
125 Ok(Err(e)) => {
126 log::warn!("File watcher error: {e}");
127 }
128 Err(TryRecvError::Empty) => {
129 break;
131 }
132 Err(TryRecvError::Disconnected) => {
133 log::error!("File watcher channel disconnected");
134 break;
135 }
136 }
137 }
138
139 changes
140 }
141
142 pub fn wait_for_change(&self) -> Result<Vec<FileChange>> {
155 let event = self
157 .receiver
158 .recv()
159 .context("File watcher channel disconnected")?
160 .context("File watcher error")?;
161
162 let mut changes = Self::process_event(event);
163
164 changes.extend(self.poll_changes());
166
167 Ok(changes)
168 }
169
170 pub fn wait_with_debounce(&self, debounce_duration: Duration) -> Result<Vec<FileChange>> {
190 let mut changes = self.wait_for_change()?;
192
193 changes.extend(self.wait_until(debounce_duration));
195
196 Ok(Self::deduplicate_changes(changes))
198 }
199
200 #[must_use]
210 pub fn wait_until(&self, duration: Duration) -> Vec<FileChange> {
211 let deadline = std::time::Instant::now() + duration;
212 let mut changes = Vec::new();
213
214 while std::time::Instant::now() < deadline {
215 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
217 let poll_interval = Duration::from_millis(10).min(remaining);
218
219 match self.receiver.recv_timeout(poll_interval) {
220 Ok(Ok(event)) => {
221 changes.extend(Self::process_event(event));
222 }
223 Ok(Err(e)) => {
224 log::warn!("File watcher error: {e}");
225 }
226 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
227 }
229 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
230 log::error!("File watcher channel disconnected");
231 break;
232 }
233 }
234 }
235
236 changes
237 }
238
239 #[must_use]
241 pub fn root_path(&self) -> &Path {
242 &self.root_path
243 }
244
245 fn process_event(event: Event) -> Vec<FileChange> {
250 let mut changes = Vec::new();
251
252 match event.kind {
253 EventKind::Create(_) => {
254 for path in event.paths {
255 if path.is_file() {
257 log::debug!("File created: {}", path.display());
258 changes.push(FileChange::Created(path));
259 } else {
260 log::trace!("Ignoring directory creation: {}", path.display());
261 }
262 }
263 }
264 EventKind::Modify(_) => {
265 for path in event.paths {
266 if path.is_file() {
268 log::debug!("File modified: {}", path.display());
269 changes.push(FileChange::Modified(path));
270 } else {
271 log::trace!("Ignoring directory modification: {}", path.display());
272 }
273 }
274 }
275 EventKind::Remove(_) => {
276 for path in event.paths {
277 log::debug!("File deleted: {}", path.display());
281 changes.push(FileChange::Deleted(path));
282 }
283 }
284 _ => {
285 }
287 }
288
289 changes
290 }
291
292 fn deduplicate_changes(changes: Vec<FileChange>) -> Vec<FileChange> {
296 use std::collections::HashMap;
297
298 let mut map: HashMap<PathBuf, FileChange> = HashMap::new();
299
300 for change in changes {
301 let path = match &change {
302 FileChange::Created(p) | FileChange::Modified(p) | FileChange::Deleted(p) => {
303 p.clone()
304 }
305 };
306
307 map.insert(path, change);
308 }
309
310 map.into_values().collect()
311 }
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::TempDir;

    /// Longest time a test waits for an expected event: 3 s on macOS, 2 s
    /// elsewhere, doubled when the `CI` environment variable is set.
    fn event_timeout() -> Duration {
        let seconds = if cfg!(target_os = "macos") { 3 } else { 2 };
        let base = Duration::from_secs(seconds);

        match std::env::var("CI") {
            Ok(_) => base * 2,
            Err(_) => base,
        }
    }

    /// Re-evaluates `predicate` every 50 ms until it holds or `timeout` has
    /// elapsed. The predicate is always evaluated at least once.
    fn wait_for<F>(timeout: Duration, mut predicate: F) -> bool
    where
        F: FnMut() -> bool,
    {
        let started = Instant::now();
        while !predicate() {
            if started.elapsed() >= timeout {
                return false;
            }
            thread::sleep(Duration::from_millis(50));
        }
        true
    }

    /// Polls `watcher` until it reports exactly `expected` or the event
    /// timeout runs out.
    fn eventually_reports(watcher: &FileWatcher, expected: &FileChange) -> bool {
        wait_for(event_timeout(), || watcher.poll_changes().contains(expected))
    }

    #[test]
    fn test_watcher_creation() {
        let workspace = TempDir::new().unwrap();
        let outcome = FileWatcher::new(workspace.path());
        assert!(outcome.is_ok());
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_creation() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        let target = workspace.path().join("test.txt");
        fs::write(&target, "test content").unwrap();

        let detected = eventually_reports(&watcher, &FileChange::Created(target));

        assert!(detected, "Expected FileWatcher to detect file creation");
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_modification() {
        let workspace = TempDir::new().unwrap();
        let target = workspace.path().join("test.txt");

        // The file exists before the watcher starts, so only the second write
        // can be observed.
        fs::write(&target, "initial content").unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();
        thread::sleep(Duration::from_millis(50));

        fs::write(&target, "modified content").unwrap();

        let detected = eventually_reports(&watcher, &FileChange::Modified(target));

        assert!(detected, "Expected FileWatcher to detect file modification");
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_deletion() {
        let workspace = TempDir::new().unwrap();
        let target = workspace.path().join("test.txt");

        fs::write(&target, "test content").unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();
        thread::sleep(Duration::from_millis(50));

        fs::remove_file(&target).unwrap();

        let detected = eventually_reports(&watcher, &FileChange::Deleted(target));

        assert!(detected, "Expected FileWatcher to detect file deletion");
    }

    #[test]
    fn test_watcher_poll_returns_empty_when_no_changes() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        assert!(watcher.poll_changes().is_empty());
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_ignores_directories() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        let sub_dir = workspace.path().join("subdir");
        fs::create_dir(&sub_dir).unwrap();
        thread::sleep(Duration::from_millis(100));

        let changes = watcher.poll_changes();
        assert!(
            changes.is_empty(),
            "Watcher should not report directory creation events, found: {changes:?}"
        );

        // A file inside the new directory must still be picked up.
        let nested = sub_dir.join("test.txt");
        fs::write(&nested, "test").unwrap();

        let detected = eventually_reports(&watcher, &FileChange::Created(nested));

        assert!(
            detected,
            "Expected watcher to detect file creation in subdirectory"
        );
    }

    #[test]
    fn test_deduplicate_changes() {
        let modified_first = || FileChange::Modified(PathBuf::from("file1.txt"));
        let created_second = || FileChange::Created(PathBuf::from("file2.txt"));

        let deduped = FileWatcher::deduplicate_changes(vec![
            modified_first(),
            modified_first(),
            created_second(),
            modified_first(),
        ]);

        assert_eq!(deduped.len(), 2);

        let occurrences =
            |wanted: FileChange| deduped.iter().filter(|seen| **seen == wanted).count();

        assert_eq!(occurrences(modified_first()), 1);
        assert_eq!(occurrences(created_second()), 1);
    }
}