pub mod git_state;
pub mod source_tree;
pub use git_state::{GitChangeClass, GitStateWatcher, LastIndexedGitState};
pub use source_tree::{ChangeSet, SourceTreeWatcher};
use anyhow::{Context, Result};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, TryRecvError, channel};
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileChange {
Created(PathBuf),
Modified(PathBuf),
Deleted(PathBuf),
}
pub struct FileWatcher {
_watcher: RecommendedWatcher,
receiver: Receiver<Result<Event, notify::Error>>,
root_path: PathBuf,
}
impl FileWatcher {
pub fn new(root_path: &Path) -> Result<Self> {
let (tx, rx) = channel();
let mut watcher = notify::recommended_watcher(move |res| {
let _ = tx.send(res);
})
.context("Failed to create file system watcher")?;
watcher
.watch(root_path, RecursiveMode::Recursive)
.with_context(|| format!("Failed to watch directory: {}", root_path.display()))?;
log::info!("File watcher started for: {}", root_path.display());
Ok(Self {
_watcher: watcher,
receiver: rx,
root_path: root_path.to_path_buf(),
})
}
#[must_use]
pub fn poll_changes(&self) -> Vec<FileChange> {
let mut changes = Vec::new();
loop {
match self.receiver.try_recv() {
Ok(Ok(event)) => {
changes.extend(Self::process_event(event));
}
Ok(Err(e)) => {
log::warn!("File watcher error: {e}");
}
Err(TryRecvError::Empty) => {
break;
}
Err(TryRecvError::Disconnected) => {
log::error!("File watcher channel disconnected");
break;
}
}
}
changes
}
pub fn wait_for_change(&self) -> Result<Vec<FileChange>> {
let event = self
.receiver
.recv()
.context("File watcher channel disconnected")?
.context("File watcher error")?;
let mut changes = Self::process_event(event);
changes.extend(self.poll_changes());
Ok(changes)
}
pub fn wait_with_debounce(&self, debounce_duration: Duration) -> Result<Vec<FileChange>> {
let mut changes = self.wait_for_change()?;
changes.extend(self.wait_until(debounce_duration));
Ok(Self::deduplicate_changes(changes))
}
#[must_use]
pub fn wait_until(&self, duration: Duration) -> Vec<FileChange> {
let deadline = std::time::Instant::now() + duration;
let mut changes = Vec::new();
while std::time::Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
let poll_interval = Duration::from_millis(10).min(remaining);
match self.receiver.recv_timeout(poll_interval) {
Ok(Ok(event)) => {
changes.extend(Self::process_event(event));
}
Ok(Err(e)) => {
log::warn!("File watcher error: {e}");
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
log::error!("File watcher channel disconnected");
break;
}
}
}
changes
}
#[must_use]
pub fn root_path(&self) -> &Path {
&self.root_path
}
fn process_event(event: Event) -> Vec<FileChange> {
let mut changes = Vec::new();
match event.kind {
EventKind::Create(_) => {
for path in event.paths {
if path.is_file() {
log::debug!("File created: {}", path.display());
changes.push(FileChange::Created(path));
} else {
log::trace!("Ignoring directory creation: {}", path.display());
}
}
}
EventKind::Modify(_) => {
for path in event.paths {
if path.is_file() {
log::debug!("File modified: {}", path.display());
changes.push(FileChange::Modified(path));
} else {
log::trace!("Ignoring directory modification: {}", path.display());
}
}
}
EventKind::Remove(_) => {
for path in event.paths {
log::debug!("File deleted: {}", path.display());
changes.push(FileChange::Deleted(path));
}
}
_ => {
}
}
changes
}
fn deduplicate_changes(changes: Vec<FileChange>) -> Vec<FileChange> {
use std::collections::HashMap;
let mut map: HashMap<PathBuf, FileChange> = HashMap::new();
for change in changes {
let path = match &change {
FileChange::Created(p) | FileChange::Modified(p) | FileChange::Deleted(p) => {
p.clone()
}
};
map.insert(path, change);
}
map.into_values().collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::thread;
use std::time::{Duration, Instant};
use tempfile::TempDir;
fn event_timeout() -> Duration {
let base = if cfg!(target_os = "macos") {
Duration::from_secs(3)
} else {
Duration::from_secs(2) };
if std::env::var("CI").is_ok() {
base * 2
} else {
base
}
}
fn wait_for<F>(timeout: Duration, mut predicate: F) -> bool
where
F: FnMut() -> bool,
{
let deadline = Instant::now() + timeout;
loop {
if predicate() {
return true;
}
if Instant::now() >= deadline {
return false;
}
thread::sleep(Duration::from_millis(50));
}
}
#[test]
fn test_watcher_creation() {
let tmp_watch_workspace = TempDir::new().unwrap();
let watcher = FileWatcher::new(tmp_watch_workspace.path());
assert!(watcher.is_ok());
}
#[test]
#[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
fn test_watcher_detects_file_creation() {
let tmp_watch_workspace = TempDir::new().unwrap();
let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
let file_path = tmp_watch_workspace.path().join("test.txt");
fs::write(&file_path, "test content").unwrap();
let detected = wait_for(event_timeout(), || {
let changes = watcher.poll_changes();
changes
.iter()
.any(|c| matches!(c, FileChange::Created(p) if p == &file_path))
});
assert!(detected, "Expected FileWatcher to detect file creation");
}
#[test]
#[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
fn test_watcher_detects_file_modification() {
let tmp_watch_workspace = TempDir::new().unwrap();
let file_path = tmp_watch_workspace.path().join("test.txt");
fs::write(&file_path, "initial content").unwrap();
let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
thread::sleep(Duration::from_millis(50));
fs::write(&file_path, "modified content").unwrap();
let detected = wait_for(event_timeout(), || {
let changes = watcher.poll_changes();
changes
.iter()
.any(|c| matches!(c, FileChange::Modified(p) if p == &file_path))
});
assert!(detected, "Expected FileWatcher to detect file modification");
}
#[test]
#[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
fn test_watcher_detects_file_deletion() {
let tmp_watch_workspace = TempDir::new().unwrap();
let file_path = tmp_watch_workspace.path().join("test.txt");
fs::write(&file_path, "test content").unwrap();
let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
thread::sleep(Duration::from_millis(50));
fs::remove_file(&file_path).unwrap();
let detected = wait_for(event_timeout(), || {
let changes = watcher.poll_changes();
changes
.iter()
.any(|c| matches!(c, FileChange::Deleted(p) if p == &file_path))
});
assert!(detected, "Expected FileWatcher to detect file deletion");
}
#[test]
fn test_watcher_poll_returns_empty_when_no_changes() {
let tmp_watch_workspace = TempDir::new().unwrap();
let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
let changes = watcher.poll_changes();
assert!(changes.is_empty());
}
#[test]
#[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
fn test_watcher_ignores_directories() {
let tmp_watch_workspace = TempDir::new().unwrap();
let watcher = FileWatcher::new(tmp_watch_workspace.path()).unwrap();
let sub_dir = tmp_watch_workspace.path().join("subdir");
fs::create_dir(&sub_dir).unwrap();
thread::sleep(Duration::from_millis(100));
let changes = watcher.poll_changes();
assert!(
changes.is_empty(),
"Watcher should not report directory creation events, found: {changes:?}"
);
let file_path = sub_dir.join("test.txt");
fs::write(&file_path, "test").unwrap();
let detected = wait_for(event_timeout(), || {
let changes = watcher.poll_changes();
changes
.iter()
.any(|c| matches!(c, FileChange::Created(p) if p == &file_path))
});
assert!(
detected,
"Expected watcher to detect file creation in subdirectory"
);
}
#[test]
fn test_deduplicate_changes() {
let changes = vec![
FileChange::Modified(PathBuf::from("file1.txt")),
FileChange::Modified(PathBuf::from("file1.txt")), FileChange::Created(PathBuf::from("file2.txt")),
FileChange::Modified(PathBuf::from("file1.txt")), ];
let deduped = FileWatcher::deduplicate_changes(changes);
assert_eq!(deduped.len(), 2);
assert_eq!(
deduped
.iter()
.filter(|c| matches!(c, FileChange::Modified(p) if p == Path::new("file1.txt")))
.count(),
1
);
assert_eq!(
deduped
.iter()
.filter(|c| matches!(c, FileChange::Created(p) if p == Path::new("file2.txt")))
.count(),
1
);
}
}