use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use notify::{EventKind, RecursiveMode};
use notify_debouncer_full::{DebounceEventResult, new_debouncer};
use thiserror::Error;
use tracing::{debug, info, warn};
use crate::config::Config;
use crate::scanner::{ScanError, ScanReport};
use crate::store::Store;
#[derive(Debug, Error)]
pub enum WatchError {
#[error("notify error: {0}")]
Notify(#[from] notify::Error),
#[error("scan error: {0}")]
Scan(#[from] ScanError),
#[error("io: {0}")]
Io(#[from] std::io::Error),
}
pub type BatchCallback = Box<dyn FnMut(WatchBatch<'_>) + Send>;
pub struct WatchBatch<'a> {
pub kind: BatchKind,
pub report: &'a ScanReport,
}
#[derive(Debug, Clone, Copy)]
pub enum BatchKind {
InitialScan,
Incremental {
paths: usize,
},
}
pub fn watch_paths(
root: &Path,
config: &Config,
mut shutdown: tokio::sync::oneshot::Receiver<()>,
mut on_change: impl FnMut(Vec<PathBuf>, BatchKind),
) -> Result<(), WatchError> {
let (tx, rx) = std::sync::mpsc::channel::<DebounceEventResult>();
let debounce = Duration::from_millis(config.watch.debounce_ms);
let mut debouncer = new_debouncer(debounce, None, move |res| {
let _ = tx.send(res);
})?;
debouncer.watch(root, RecursiveMode::Recursive)?;
let basemind_subpath = root.join(crate::config::BASEMIND_DIR);
loop {
match rx.recv_timeout(Duration::from_millis(200)) {
Ok(Ok(events)) => {
let mut touched: Vec<PathBuf> = Vec::new();
for ev in events {
if !is_relevant(&ev.event.kind) {
continue;
}
for p in &ev.event.paths {
if p.starts_with(&basemind_subpath) {
continue;
}
touched.push(p.clone());
}
}
touched.sort();
touched.dedup();
if touched.is_empty() {
continue;
}
debug!(n = touched.len(), "debounced batch");
let n = touched.len();
on_change(touched, BatchKind::Incremental { paths: n });
}
Ok(Err(errors)) => {
for e in errors {
warn!(error = %e, "watch error");
}
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
if shutdown.try_recv().is_ok() {
info!("shutdown requested; exiting watcher");
return Ok(());
}
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
info!("debouncer channel closed; exiting watcher");
return Ok(());
}
}
}
}
pub fn watch(
root: &Path,
store: Arc<Mutex<Store>>,
config: Arc<Config>,
shutdown: tokio::sync::oneshot::Receiver<()>,
mut on_batch: BatchCallback,
) -> Result<(), WatchError> {
info!(root = %root.display(), "initial scan");
{
let mut guard = store.lock().expect("store poisoned");
let report = crate::scanner::scan(
root,
&mut guard,
&config,
crate::scanner::ScanSource::WorkingTree,
)?;
on_batch(WatchBatch {
kind: BatchKind::InitialScan,
report: &report,
});
}
info!("initial scan complete; entering watch mode");
watch_paths(root, &config, shutdown, |touched, kind| {
let mut guard = store.lock().expect("store poisoned");
match crate::scanner::scan_paths(root, &mut guard, &config, &touched) {
Ok(report) => {
on_batch(WatchBatch {
kind,
report: &report,
});
}
Err(e) => warn!(error = %e, "scan_paths failed"),
}
})
}
fn is_relevant(kind: &EventKind) -> bool {
matches!(
kind,
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::mpsc;
use std::time::Duration;
#[test]
fn should_emit_changed_path_when_file_is_modified() {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().canonicalize().expect("canonicalize tempdir");
let mut config = crate::config::default_for_root(&root);
config.watch.debounce_ms = 50;
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let (path_tx, path_rx) = mpsc::channel::<Vec<PathBuf>>();
let root_for_thread = root.clone();
let handle = std::thread::spawn(move || {
watch_paths(&root_for_thread, &config, shutdown_rx, |paths, kind| {
assert!(matches!(kind, BatchKind::Incremental { .. }));
let _ = path_tx.send(paths);
})
});
std::thread::sleep(Duration::from_millis(500));
let target = root.join("hello.rs");
std::fs::write(&target, b"fn main() {}\n").expect("write file");
let received = path_rx
.recv_timeout(Duration::from_secs(10))
.expect("changed path within window");
assert!(
received.iter().any(|p| p.ends_with("hello.rs")),
"expected hello.rs in {received:?}"
);
let _ = shutdown_tx.send(());
let _ = handle.join();
}
#[test]
fn should_ignore_changes_under_basemind_dir() {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().canonicalize().expect("canonicalize tempdir");
std::fs::create_dir_all(root.join(crate::config::BASEMIND_DIR)).expect("mkdir .basemind");
let mut config = crate::config::default_for_root(&root);
config.watch.debounce_ms = 50;
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let (path_tx, path_rx) = mpsc::channel::<Vec<PathBuf>>();
let root_for_thread = root.clone();
let handle = std::thread::spawn(move || {
watch_paths(&root_for_thread, &config, shutdown_rx, |paths, _kind| {
let _ = path_tx.send(paths);
})
});
std::thread::sleep(Duration::from_millis(200));
std::fs::write(
root.join(crate::config::BASEMIND_DIR).join("noise.txt"),
b"ignored\n",
)
.expect("write basemind file");
let result = path_rx.recv_timeout(Duration::from_millis(800));
assert!(result.is_err(), "expected no emission, got {result:?}");
let _ = shutdown_tx.send(());
let _ = handle.join();
}
}