use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::time::Duration;
use notify_debouncer_full::notify::{self, RecursiveMode};
use notify_debouncer_full::{new_debouncer, DebouncedEvent, Debouncer, RecommendedCache};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Directory names that are excluded from watching and syncing.
///
/// `path_is_ignored` drops any changed path that has one of these names as a
/// component, and `is_watchable_top_level` refuses to watch a top-level
/// directory whose name appears here.
pub const IGNORED_DIRS: &[&str] = &[
".tokensave",
".git",
"node_modules",
"target",
".build",
"__pycache__",
".next",
"dist",
"build",
".cache",
];
/// Returns `true` when any component of `path` is named in [`IGNORED_DIRS`].
///
/// Every component is inspected, so for an absolute path this includes the
/// directories above the project root as well. Components that are not valid
/// UTF-8 never match.
fn path_is_ignored(path: &Path) -> bool {
    for component in path.components() {
        // A non-UTF-8 name cannot equal any of the (UTF-8) ignored names.
        let Some(name) = component.as_os_str().to_str() else {
            continue;
        };
        if IGNORED_DIRS.contains(&name) {
            return true;
        }
    }
    false
}
/// Decides whether a top-level directory called `name` should get its own
/// recursive watch.
///
/// Names listed in [`IGNORED_DIRS`] are never watchable. Dot-directories are
/// skipped as well, with the single exception of `.tokensave-keepwatch`.
fn is_watchable_top_level(name: &str) -> bool {
    let explicitly_ignored = IGNORED_DIRS.contains(&name);
    let hidden = name.starts_with('.');
    let hidden_but_allowed = name == ".tokensave-keepwatch";
    !explicitly_ignored && (!hidden || hidden_but_allowed)
}
/// Watches a project directory for file changes and feeds batches of changed
/// paths to the sync loop in `run_with_callback`.
pub struct ProjectWatcher {
/// Root directory of the watched project; exposed through `project_root()`.
project_root: PathBuf,
/// Receiving end of the channel that the debouncer's event handler sends
/// batches of changed paths into.
rx: mpsc::Receiver<Vec<PathBuf>>,
/// The debouncer that owns the filesystem watches. It is never read in this
/// file; it is stored so that it lives as long as the `ProjectWatcher`.
_debouncer: Debouncer<notify::RecommendedWatcher, RecommendedCache>,
}
impl ProjectWatcher {
pub fn new(project_root: PathBuf, debounce: Duration) -> Option<Self> {
let (tx, rx) = mpsc::channel::<Vec<PathBuf>>(64);
let tx_for_handler = tx.clone();
let mut debouncer = new_debouncer(
debounce,
None,
move |res: notify_debouncer_full::DebounceEventResult| {
let events = match res {
Ok(events) => events,
Err(errors) => {
for e in errors {
log_msg(&format!("watcher error: {e}"));
}
return;
}
};
let paths = extract_changed_paths(&events);
if paths.is_empty() {
return;
}
let _ = tx_for_handler.try_send(paths);
},
)
.ok()?;
debouncer
.watch(&project_root, RecursiveMode::NonRecursive)
.ok()?;
let mut watched = 0usize;
let mut ignored = 0usize;
if let Ok(entries) = std::fs::read_dir(&project_root) {
for entry in entries.flatten() {
let path = entry.path();
let name = entry.file_name().to_string_lossy().to_string();
let Ok(ft) = entry.file_type() else { continue };
if !ft.is_dir() {
continue;
}
if !is_watchable_top_level(&name) {
ignored += 1;
continue;
}
match debouncer.watch(&path, RecursiveMode::Recursive) {
Ok(()) => {
watched += 1;
}
Err(e) => {
log_msg(&format!(
"watcher: failed to watch {}: {e}",
path.display()
));
}
}
}
}
if watched == 0 && ignored == 0 {
log_msg(&format!(
"watcher: no subdirectories to watch under {}; relying on root-only watch",
project_root.display()
));
}
Some(Self {
project_root,
rx,
_debouncer: debouncer,
})
}
pub fn project_root(&self) -> &Path {
&self.project_root
}
pub async fn run_with_callback<F, Fut>(mut self, cancel: CancellationToken, on_sync: F)
where
F: Fn() -> Fut + Send + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
loop {
tokio::select! {
() = cancel.cancelled() => {
while let Ok(paths) = self.rx.try_recv() {
sync_project_paths(&self.project_root, &paths).await;
on_sync().await;
}
break;
}
Some(paths) = self.rx.recv() => {
sync_project_paths(&self.project_root, &paths).await;
on_sync().await;
}
}
}
}
}
/// Collects the distinct, non-ignored paths touched by `events`.
///
/// Only create, modify and remove events are considered. Paths rejected by
/// `path_is_ignored` are dropped, and each remaining path appears once, in
/// the order it was first seen.
fn extract_changed_paths(events: &[DebouncedEvent]) -> Vec<PathBuf> {
    let mut already_seen: HashSet<PathBuf> = HashSet::new();
    events
        .iter()
        .filter(|ev| {
            matches!(
                ev.kind,
                notify::EventKind::Create(_)
                    | notify::EventKind::Modify(_)
                    | notify::EventKind::Remove(_)
            )
        })
        .flat_map(|ev| ev.paths.iter())
        .filter(|candidate| !path_is_ignored(candidate))
        // `insert` returns false for a path that was already recorded.
        .filter(|candidate| already_seen.insert((*candidate).clone()))
        .cloned()
        .collect()
}
/// Syncs the given changed `paths` of the project at `project_root`.
///
/// The work runs in a separately spawned task, so a panic inside the sync
/// surfaces here as a join error. That error is logged and not propagated.
pub async fn sync_project_paths(project_root: &Path, paths: &[PathBuf]) {
    // The spawned task must own its inputs.
    let owned_root = project_root.to_path_buf();
    let owned_paths = paths.to_vec();
    let handle = tokio::task::spawn(async move {
        sync_project_paths_inner(&owned_root, &owned_paths).await;
    });

    let Err(join_err) = handle.await else {
        return;
    };

    let msg = if join_err.is_panic() {
        let payload = join_err.into_panic();
        // Panic payloads are tried as `String` first, then as `&str`.
        match (
            payload.downcast_ref::<String>(),
            payload.downcast_ref::<&str>(),
        ) {
            (Some(text), _) => text.clone(),
            (None, Some(text)) => (*text).to_string(),
            (None, None) => "unknown panic".to_string(),
        }
    } else {
        format!("task error: {join_err}")
    };
    log_msg(&format!(
        "sync panicked for {}: {msg}",
        project_root.display()
    ));
}
/// Converts `paths` to project-relative strings and syncs them.
///
/// Each path is made relative to `project_root`, or to its canonicalized
/// form if the first attempt fails. Paths under neither, and paths equal to
/// the root, are dropped. The sorted, de-duplicated candidates are handed to
/// `sync_if_stale_silent`. On success the timing is logged and, when the
/// global database can be opened, the saved-token count is upserted for this
/// project. Failures are logged and not returned.
async fn sync_project_paths_inner(project_root: &Path, paths: &[PathBuf]) {
    let canonical_root = match std::fs::canonicalize(project_root) {
        Ok(resolved) => resolved,
        Err(_) => project_root.to_path_buf(),
    };

    let mut relative: Vec<String> = Vec::new();
    for abs in paths {
        let stripped = match abs.strip_prefix(project_root) {
            Ok(rel) => rel,
            Err(_) => match abs.strip_prefix(&canonical_root) {
                Ok(rel) => rel,
                Err(_) => continue,
            },
        };
        let text = stripped.to_string_lossy().into_owned();
        if !text.is_empty() {
            relative.push(text);
        }
    }
    relative.sort();
    relative.dedup();
    if relative.is_empty() {
        return;
    }

    let start = std::time::Instant::now();
    let cg = match crate::tokensave::TokenSave::open(project_root).await {
        Ok(opened) => opened,
        Err(_) => {
            log_msg(&format!("failed to open {}", project_root.display()));
            return;
        }
    };

    if let Err(e) = cg.sync_if_stale_silent(&relative).await {
        log_msg(&format!("sync failed for {}: {e}", project_root.display()));
        return;
    }

    let ms = start.elapsed().as_millis();
    log_msg(&format!(
        "sync_if_stale_silent {} — {} candidates ({}ms)",
        project_root.display(),
        relative.len(),
        ms
    ));

    let Some(gdb) = crate::global_db::GlobalDb::open().await else {
        return;
    };
    let tokens = cg.get_tokens_saved().await.unwrap_or(0);
    gdb.upsert(project_root, tokens).await;
}
/// Writes `msg` to stderr, prefixed with the current Unix time in whole
/// seconds. If the system clock is before the Unix epoch the prefix is `0`.
fn log_msg(msg: &str) {
    let now = std::time::SystemTime::now();
    let secs = match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    eprintln!("[{secs}] {msg}");
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Paths containing an ignored directory component are rejected; ordinary
    /// source paths are kept. Mismatches are collected so a failure shows the
    /// offending inputs.
    #[test]
    fn path_is_ignored_matches_components() {
        let should_be_ignored = [
            "/proj/target/debug/foo",
            "/proj/node_modules/x/y",
            "/proj/.git/refs/heads/main",
        ];
        let should_be_kept = ["/proj/src/lib.rs", "/proj/tests/foo.rs"];

        let wrongly_kept: Vec<&str> = should_be_ignored
            .iter()
            .copied()
            .filter(|raw| !path_is_ignored(Path::new(raw)))
            .collect();
        let wrongly_ignored: Vec<&str> = should_be_kept
            .iter()
            .copied()
            .filter(|raw| path_is_ignored(Path::new(raw)))
            .collect();

        assert_eq!(wrongly_kept, Vec::<&str>::new());
        assert_eq!(wrongly_ignored, Vec::<&str>::new());
    }

    /// Ignored names and dot-directories are not watchable; regular source
    /// directories are.
    #[test]
    fn is_watchable_top_level_skips_ignored_and_dotdirs() {
        let should_be_skipped = ["target", "node_modules", ".git", ".vscode"];
        let should_be_watched = ["src", "tests"];

        let wrongly_watched: Vec<&str> = should_be_skipped
            .iter()
            .copied()
            .filter(|name| is_watchable_top_level(name))
            .collect();
        let wrongly_skipped: Vec<&str> = should_be_watched
            .iter()
            .copied()
            .filter(|name| !is_watchable_top_level(name))
            .collect();

        assert_eq!(wrongly_watched, Vec::<&str>::new());
        assert_eq!(wrongly_skipped, Vec::<&str>::new());
    }
}