use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::mpsc,
time::{Duration, Instant},
};
use notify::{Event, EventKind, RecursiveMode, Watcher};
use tracing::{debug, error, warn};
use super::config::FileWatcherConfig;
use crate::error::{EngramError, Result};
/// The kind of change reported for a watched path.
///
/// Values are produced from raw `notify` event kinds by
/// `classify_event_kind` and rendered in lowercase by the `Display` impl.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ChangeKind {
    /// The path was created.
    Created,
    /// The path was modified.
    Modified,
    /// The path was removed.
    Deleted,
}
/// Renders the change kind as a lowercase word (`created`, `modified`,
/// `deleted`).
impl std::fmt::Display for ChangeKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it with a single unpadded write.
        let label = match self {
            ChangeKind::Created => "created",
            ChangeKind::Modified => "modified",
            ChangeKind::Deleted => "deleted",
        };
        f.write_str(label)
    }
}
/// A debounced filesystem change handed to the watcher callback.
#[derive(Clone, Debug)]
pub struct FileEvent {
    /// Path the change was reported for.
    pub path: PathBuf,
    /// What happened to the path.
    pub kind: ChangeKind,
    /// Timestamp text; the watcher fills it with an RFC 3339 UTC time taken
    /// when the debounced event is fired.
    pub timestamp: String,
}
impl FileEvent {
    /// Formats the event as a one-line description of the form
    /// `File <kind>: <path> at <timestamp>`.
    pub fn to_memory_content(&self) -> String {
        let Self {
            path,
            kind,
            timestamp,
        } = self;
        format!("File {}: {} at {}", kind, path.display(), timestamp)
    }
}
/// A change that has been observed for a path but not yet delivered to the
/// callback because its debounce window is still open.
#[derive(Debug)]
struct PendingEvent {
/// Change kind that will be reported when the event fires.
kind: ChangeKind,
/// Instant at or after which the event may be delivered. It is moved
/// forward to "now + debounce" each time another raw event arrives for the
/// same path.
earliest_fire_at: Instant,
}
/// Debouncing filesystem watcher that turns raw `notify` events into
/// [`FileEvent`]s and hands them to a callback.
///
/// Build one with `new`, then call `run` to process events until a stop
/// signal arrives.
pub struct FsWatcher<F>
where
F: Fn(FileEvent) + Send + 'static,
{
/// Watcher settings (enabled flag, paths, extension filter, debounce
/// interval, ignore patterns).
config: FileWatcherConfig,
/// Invoked once for every debounced event.
callback: F,
/// Receiving end of the stop channel; `new` returns the matching sender.
stop_rx: mpsc::Receiver<()>,
/// Raw results forwarded by the `notify` event handler.
event_rx: mpsc::Receiver<notify::Result<Event>>,
/// Never read; presumably held so the underlying `notify` watcher is not
/// dropped while this value is alive.
_watcher: Box<dyn Watcher + Send>,
}
impl<F> FsWatcher<F>
where
F: Fn(FileEvent) + Send + 'static,
{
pub fn new(config: FileWatcherConfig, callback: F) -> Result<(Self, mpsc::SyncSender<()>)> {
let (event_tx, event_rx) = mpsc::channel::<notify::Result<Event>>();
let mut watcher = notify::recommended_watcher(move |res| {
let _ = event_tx.send(res);
})
.map_err(|e| EngramError::Config(format!("Cannot create filesystem watcher: {e}")))?;
for path in &config.paths {
if !path.exists() {
warn!(path = ?path, "Watched path does not exist; skipping");
continue;
}
watcher
.watch(path, RecursiveMode::Recursive)
.map_err(|e| EngramError::Config(format!("Cannot watch path {:?}: {e}", path)))?;
debug!(path = ?path, "Watching path");
}
let (stop_tx, stop_rx) = mpsc::sync_channel::<()>(1);
let fs_watcher = Self {
config,
callback,
stop_rx,
event_rx,
_watcher: Box::new(watcher),
};
Ok((fs_watcher, stop_tx))
}
pub fn run(self) {
if !self.config.enabled {
debug!("File watcher is disabled; exiting immediately");
return;
}
let debounce = Duration::from_millis(self.config.debounce_ms);
let mut pending: HashMap<PathBuf, PendingEvent> = HashMap::new();
loop {
let recv_timeout = Self::next_fire_delay(&pending, debounce)
.unwrap_or_else(|| Duration::from_millis(50));
match self.event_rx.recv_timeout(recv_timeout) {
Ok(Ok(event)) => {
self.handle_raw_event(event, debounce, &mut pending);
}
Ok(Err(e)) => {
error!(error = %e, "Notify watcher error");
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
debug!("Event channel disconnected; shutting down");
break;
}
}
self.flush_pending(&mut pending);
match self.stop_rx.try_recv() {
Ok(()) | Err(mpsc::TryRecvError::Disconnected) => {
debug!("Stop signal received; shutting down file watcher");
break;
}
Err(mpsc::TryRecvError::Empty) => {}
}
}
}
fn handle_raw_event(
&self,
event: Event,
debounce: Duration,
pending: &mut HashMap<PathBuf, PendingEvent>,
) {
let kind = match classify_event_kind(&event.kind) {
Some(k) => k,
None => return,
};
for path in &event.paths {
if !self.should_watch(path) {
continue;
}
let fire_at = Instant::now() + debounce;
pending
.entry(path.clone())
.and_modify(|p| {
if kind_priority(&kind) > kind_priority(&p.kind) {
p.kind = kind.clone();
}
p.earliest_fire_at = fire_at;
})
.or_insert(PendingEvent {
kind: kind.clone(),
earliest_fire_at: fire_at,
});
}
}
fn flush_pending(&self, pending: &mut HashMap<PathBuf, PendingEvent>) {
let now = Instant::now();
let ready: Vec<PathBuf> = pending
.iter()
.filter(|(_, p)| now >= p.earliest_fire_at)
.map(|(path, _)| path.clone())
.collect();
for path in ready {
if let Some(p) = pending.remove(&path) {
let event = FileEvent {
path,
kind: p.kind,
timestamp: chrono::Utc::now().to_rfc3339(),
};
debug!(path = ?event.path, kind = ?event.kind, "Firing debounced file event");
(self.callback)(event);
}
}
}
fn next_fire_delay(
pending: &HashMap<PathBuf, PendingEvent>,
debounce: Duration,
) -> Option<Duration> {
pending
.values()
.map(|p| p.earliest_fire_at)
.min()
.map(|earliest| {
let now = Instant::now();
if earliest > now {
(earliest - now).min(debounce)
} else {
Duration::ZERO
}
})
}
pub(crate) fn should_watch(&self, path: &Path) -> bool {
if !self.config.extensions.is_empty() {
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
if !self.config.extensions.iter().any(|e| e == ext) {
return false;
}
}
let path_str = path.to_string_lossy();
for pattern in &self.config.ignore_patterns {
if path_str.contains(pattern.as_str()) {
return false;
}
}
true
}
}
/// Maps a raw `notify` event kind to the [`ChangeKind`] reported to callers.
///
/// Create, modify and remove events map to `Created`, `Modified` and
/// `Deleted`; every other kind yields `None`.
pub(crate) fn classify_event_kind(kind: &EventKind) -> Option<ChangeKind> {
    let change = match kind {
        EventKind::Create(_) => ChangeKind::Created,
        EventKind::Modify(_) => ChangeKind::Modified,
        EventKind::Remove(_) => ChangeKind::Deleted,
        _ => return None,
    };
    Some(change)
}
/// Ranks change kinds for merging events on the same path; a higher value
/// takes precedence (`Modified` < `Created` < `Deleted`).
fn kind_priority(kind: &ChangeKind) -> u8 {
    match *kind {
        ChangeKind::Modified => 1,
        ChangeKind::Created => 2,
        ChangeKind::Deleted => 3,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use notify::{
        event::{CreateKind, ModifyKind, RemoveKind},
        Config as NotifyConfig, NullWatcher,
    };

    /// Watcher instantiation used to reach the associated helper functions.
    type PlainWatcher = FsWatcher<fn(FileEvent)>;

    /// Builds an `FsWatcher` around a `NullWatcher` and a no-op callback.
    ///
    /// The channel senders are dropped when this function returns, so the
    /// result is only meant for `should_watch` checks and for exercising a
    /// disabled `run`.
    fn make_test_watcher(
        config: FileWatcherConfig,
    ) -> FsWatcher<impl Fn(FileEvent) + Send + 'static> {
        let (_unused_event_tx, event_rx) = mpsc::channel::<notify::Result<Event>>();
        let (_unused_stop_tx, stop_rx) = mpsc::sync_channel::<()>(1);
        let backend = NullWatcher::new(|_: notify::Result<Event>| {}, NotifyConfig::default())
            .expect("NullWatcher should always succeed");
        FsWatcher {
            config,
            callback: |_: FileEvent| {},
            stop_rx,
            event_rx,
            _watcher: Box::new(backend),
        }
    }

    /// Enabled config with no paths, a 50 ms debounce and the given filters.
    fn config_with(extensions: Vec<&str>, ignore: Vec<&str>) -> FileWatcherConfig {
        FileWatcherConfig {
            enabled: true,
            paths: Vec::new(),
            extensions: extensions.into_iter().map(str::to_owned).collect(),
            debounce_ms: 50,
            ignore_patterns: ignore.into_iter().map(str::to_owned).collect(),
        }
    }

    #[test]
    fn test_classify_create_event() {
        let classified = classify_event_kind(&EventKind::Create(CreateKind::File));
        assert_eq!(classified, Some(ChangeKind::Created));
    }

    #[test]
    fn test_classify_modify_event() {
        let classified = classify_event_kind(&EventKind::Modify(ModifyKind::Any));
        assert_eq!(classified, Some(ChangeKind::Modified));
    }

    #[test]
    fn test_classify_remove_event() {
        let classified = classify_event_kind(&EventKind::Remove(RemoveKind::File));
        assert_eq!(classified, Some(ChangeKind::Deleted));
    }

    #[test]
    fn test_classify_access_event_returns_none() {
        let access = EventKind::Access(notify::event::AccessKind::Any);
        assert_eq!(classify_event_kind(&access), None);
    }

    #[test]
    fn test_classify_other_event_returns_none() {
        assert_eq!(classify_event_kind(&EventKind::Other), None);
    }

    #[test]
    fn test_extension_filter_passes_matching_extension() {
        let watcher = make_test_watcher(config_with(vec!["rs", "md"], vec![]));
        let accepts = |p: &str| watcher.should_watch(Path::new(p));
        assert!(accepts("/home/user/notes/README.md"));
        assert!(accepts("/project/src/main.rs"));
    }

    #[test]
    fn test_extension_filter_rejects_non_matching_extension() {
        let watcher = make_test_watcher(config_with(vec!["rs", "md"], vec![]));
        let accepts = |p: &str| watcher.should_watch(Path::new(p));
        assert!(!accepts("/project/image.png"));
        assert!(!accepts("/project/data.json"));
    }

    #[test]
    fn test_empty_extension_list_passes_all() {
        let watcher = make_test_watcher(config_with(vec![], vec![]));
        let accepts = |p: &str| watcher.should_watch(Path::new(p));
        assert!(accepts("/anything/file.xyz"));
        assert!(accepts("/no-extension"));
    }

    #[test]
    fn test_ignore_pattern_rejects_matching_path() {
        let watcher = make_test_watcher(config_with(vec![], vec![".git", "node_modules"]));
        let accepts = |p: &str| watcher.should_watch(Path::new(p));
        assert!(!accepts("/project/.git/config"));
        assert!(!accepts("/project/node_modules/lodash/index.js"));
    }

    #[test]
    fn test_ignore_pattern_passes_non_matching_path() {
        let watcher = make_test_watcher(config_with(vec![], vec![".git"]));
        assert!(watcher.should_watch(Path::new("/project/src/main.rs")));
    }

    #[test]
    fn test_extension_and_ignore_combined() {
        let watcher = make_test_watcher(config_with(vec!["rs"], vec!["target"]));
        let accepts = |p: &str| watcher.should_watch(Path::new(p));
        // Right extension, not ignored.
        assert!(accepts("/project/src/lib.rs"));
        // Right extension, but under an ignored directory.
        assert!(!accepts("/project/target/debug/build/foo.rs"));
        // Wrong extension.
        assert!(!accepts("/project/src/style.css"));
    }

    #[test]
    fn test_file_event_to_memory_content() {
        let content = FileEvent {
            path: PathBuf::from("/home/user/notes/README.md"),
            kind: ChangeKind::Modified,
            timestamp: "2026-03-09T00:00:00Z".to_string(),
        }
        .to_memory_content();

        for needle in ["modified", "README.md", "2026-03-09T00:00:00Z"] {
            assert!(content.contains(needle), "content: {content}");
        }
    }

    #[test]
    fn test_change_kind_display() {
        let expected = [
            (ChangeKind::Created, "created"),
            (ChangeKind::Modified, "modified"),
            (ChangeKind::Deleted, "deleted"),
        ];
        for (kind, label) in expected {
            assert_eq!(kind.to_string(), label);
        }
    }

    #[test]
    fn test_next_fire_delay_empty_returns_none() {
        let nothing_pending: HashMap<PathBuf, PendingEvent> = HashMap::new();
        let delay = PlainWatcher::next_fire_delay(&nothing_pending, Duration::from_millis(500));
        assert!(delay.is_none());
    }

    #[test]
    fn test_next_fire_delay_with_entry_returns_some_bounded_by_debounce() {
        let debounce = Duration::from_millis(500);
        let entry = PendingEvent {
            kind: ChangeKind::Modified,
            earliest_fire_at: Instant::now() + Duration::from_millis(200),
        };
        let pending: HashMap<PathBuf, PendingEvent> =
            std::iter::once((PathBuf::from("/tmp/file.txt"), entry)).collect();

        let delay = PlainWatcher::next_fire_delay(&pending, debounce).expect("should be Some");
        assert!(
            delay <= debounce,
            "delay {delay:?} should be <= debounce {debounce:?}"
        );
    }

    #[test]
    fn test_disabled_watcher_run_returns_immediately() {
        let disabled = FileWatcherConfig {
            enabled: false,
            ..FileWatcherConfig::default()
        };
        let watcher = make_test_watcher(disabled);

        // `run` must return on its own; joining would hang otherwise.
        std::thread::spawn(move || watcher.run())
            .join()
            .expect("disabled watcher thread should not panic");
    }
}