use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use ignore::gitignore::Gitignore;
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::mpsc::{Receiver, Sender, channel};
use tokio::task::JoinHandle;
use tokio::time::{Instant, sleep_until};
use crate::trvignore;
pub const DEBOUNCE_WINDOW: Duration = Duration::from_millis(200);
pub const DEBOUNCE_HARD_CAP: Duration = Duration::from_secs(1);
const RAW_CHANNEL_CAP: usize = 64;
const EVENT_CHANNEL_CAP: usize = 16;
#[derive(Debug)]
pub enum LiveEvent {
Rescan,
WatcherError(String),
}
#[must_use]
pub fn is_vcs_metadata_path(path: &Path) -> bool {
const VCS_DIRS: &[&str] = &[".git", ".jj", ".hg"];
path.components().any(|component| {
if let Some(name) = component.as_os_str().to_str() {
VCS_DIRS.contains(&name)
} else {
false
}
})
}
#[must_use]
pub fn is_ignored_by_trv(matcher: &Gitignore, path: &Path) -> bool {
trvignore::is_ignored(matcher, path)
}
#[must_use]
#[deprecated(note = "use is_vcs_metadata_path (+ is_ignored_by_trv for user ignores)")]
pub fn is_ignored_path(path: &Path) -> bool {
is_vcs_metadata_path(path)
}
pub struct LiveWatcherHandle {
_watcher: RecommendedWatcher,
task: JoinHandle<()>,
}
impl LiveWatcherHandle {
pub fn stop(self) {
drop(self);
}
}
impl Drop for LiveWatcherHandle {
fn drop(&mut self) {
self.task.abort();
}
}
pub fn spawn_live_watcher(
root: PathBuf,
) -> Result<(LiveWatcherHandle, Receiver<LiveEvent>), String> {
let (event_tx, event_rx) = channel::<LiveEvent>(EVENT_CHANNEL_CAP);
let (raw_tx, raw_rx) = channel::<()>(RAW_CHANNEL_CAP);
let matcher: Option<Arc<Gitignore>> = trvignore::load_matcher(&root).map(Arc::new);
let matcher_cb = matcher.clone();
let err_tx = event_tx.clone();
let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| match res {
Ok(event) => {
if should_forward_event(&event, matcher_cb.as_deref()) {
let _ = raw_tx.try_send(());
}
}
Err(e) => {
let _ = err_tx.try_send(LiveEvent::WatcherError(format!(
"filesystem watcher error: {e}"
)));
}
})
.map_err(|e| format!("failed to create filesystem watcher: {e}"))?;
watcher
.watch(&root, RecursiveMode::Recursive)
.map_err(|e| format!("failed to watch {}: {e}", root.display()))?;
let task = tokio::spawn(debounce_pump(raw_rx, event_tx));
Ok((
LiveWatcherHandle {
_watcher: watcher,
task,
},
event_rx,
))
}
fn should_forward_event(event: &Event, matcher: Option<&Gitignore>) -> bool {
let relevant = matches!(
event.kind,
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
);
if !relevant {
return false;
}
event.paths.iter().any(|p| {
if is_vcs_metadata_path(p) {
return false;
}
if let Some(m) = matcher
&& is_ignored_by_trv(m, p)
{
return false;
}
true
})
}
pub(crate) async fn debounce_pump(mut raw_rx: Receiver<()>, event_tx: Sender<LiveEvent>) {
loop {
if raw_rx.recv().await.is_none() {
return;
}
let batch_start = Instant::now();
let hard_deadline = batch_start + DEBOUNCE_HARD_CAP;
let mut soft_deadline = batch_start + DEBOUNCE_WINDOW;
loop {
let deadline = soft_deadline.min(hard_deadline);
tokio::select! {
maybe = raw_rx.recv() => {
match maybe {
Some(()) => {
soft_deadline = (Instant::now() + DEBOUNCE_WINDOW).min(hard_deadline);
}
None => {
let _ = event_tx.try_send(LiveEvent::Rescan);
return;
}
}
}
() = sleep_until(deadline) => {
break;
}
}
}
let _ = event_tx.try_send(LiveEvent::Rescan);
}
}
#[cfg(test)]
mod tests {
use super::*;
use ignore::gitignore::GitignoreBuilder;
use std::path::PathBuf;
#[test]
fn flags_paths_under_dot_git_as_vcs_metadata() {
assert!(is_vcs_metadata_path(&PathBuf::from(".git/HEAD")));
assert!(is_vcs_metadata_path(&PathBuf::from(
"/repo/.git/refs/heads/main"
)));
}
#[test]
fn flags_paths_under_jj_and_hg_as_vcs_metadata() {
assert!(is_vcs_metadata_path(&PathBuf::from(".jj/repo/store")));
assert!(is_vcs_metadata_path(&PathBuf::from(
".hg/store/00changelog.i"
)));
}
#[test]
fn does_not_flag_source_paths_as_vcs_metadata() {
assert!(!is_vcs_metadata_path(&PathBuf::from("src/main.rs")));
assert!(!is_vcs_metadata_path(&PathBuf::from(
"crates/travelagent-core/src/lib.rs"
)));
}
#[test]
fn target_is_no_longer_hardcoded_as_vcs_metadata() {
assert!(!is_vcs_metadata_path(&PathBuf::from("target")));
assert!(!is_vcs_metadata_path(&PathBuf::from("src/target")));
assert!(!is_vcs_metadata_path(&PathBuf::from("target/debug/app")));
}
#[test]
fn honors_gitignore_rule_for_target_directory() {
let dir = tempfile::tempdir().expect("tmpdir");
std::fs::write(dir.path().join(".gitignore"), "target/\n").expect("write");
let matcher = trvignore::load_matcher(dir.path()).expect("matcher");
assert!(is_ignored_by_trv(
&matcher,
&dir.path().join("target/debug/foo"),
));
assert!(is_ignored_by_trv(&matcher, &dir.path().join("target")));
assert!(!is_ignored_by_trv(
&matcher,
&dir.path().join("src/main.rs"),
));
}
#[test]
fn honors_trvignore_unignore_over_gitignore() {
let dir = tempfile::tempdir().expect("tmpdir");
std::fs::write(dir.path().join(".gitignore"), "*.lock\n").expect("write");
std::fs::write(dir.path().join(".trvignore"), "!Cargo.lock\n").expect("write");
let matcher = trvignore::load_matcher(dir.path()).expect("matcher");
assert!(!is_ignored_by_trv(&matcher, &dir.path().join("Cargo.lock"),));
assert!(is_ignored_by_trv(&matcher, &dir.path().join("yarn.lock")));
}
fn mk_event(kind: EventKind, paths: Vec<PathBuf>) -> Event {
Event {
kind,
paths,
attrs: notify::event::EventAttributes::new(),
}
}
#[test]
fn should_forward_event_drops_vcs_metadata_even_without_matcher() {
let e = mk_event(
EventKind::Modify(notify::event::ModifyKind::Any),
vec![PathBuf::from(".git/index")],
);
assert!(!should_forward_event(&e, None));
}
#[test]
fn should_forward_event_forwards_when_no_matcher_and_not_vcs() {
let e = mk_event(
EventKind::Modify(notify::event::ModifyKind::Any),
vec![PathBuf::from("src/main.rs")],
);
assert!(should_forward_event(&e, None));
}
#[test]
fn should_forward_event_drops_matches_against_gitignore() {
let dir = tempfile::tempdir().expect("tmpdir");
let root = dir.path();
let mut builder = GitignoreBuilder::new(root);
builder.add_line(None, "target/").expect("add_line");
let matcher = builder.build().expect("build");
let ignored = mk_event(
EventKind::Modify(notify::event::ModifyKind::Any),
vec![root.join("target/debug/foo")],
);
assert!(!should_forward_event(&ignored, Some(&matcher)));
let kept = mk_event(
EventKind::Modify(notify::event::ModifyKind::Any),
vec![root.join("src/main.rs")],
);
assert!(should_forward_event(&kept, Some(&matcher)));
}
#[tokio::test(start_paused = true)]
async fn emits_single_rescan_after_debounce_window() {
let (raw_tx, raw_rx) = channel::<()>(RAW_CHANNEL_CAP);
let (event_tx, mut event_rx) = channel::<LiveEvent>(EVENT_CHANNEL_CAP);
let _task = tokio::spawn(debounce_pump(raw_rx, event_tx));
raw_tx.send(()).await.unwrap();
tokio::time::advance(DEBOUNCE_WINDOW + Duration::from_millis(10)).await;
let evt = event_rx.recv().await.expect("rescan delivered");
assert!(matches!(evt, LiveEvent::Rescan));
tokio::time::advance(DEBOUNCE_HARD_CAP * 3).await;
assert!(event_rx.try_recv().is_err(), "no extra rescan");
}
#[tokio::test(start_paused = true)]
async fn coalesces_burst_into_one_rescan() {
let (raw_tx, raw_rx) = channel::<()>(RAW_CHANNEL_CAP);
let (event_tx, mut event_rx) = channel::<LiveEvent>(EVENT_CHANNEL_CAP);
let _task = tokio::spawn(debounce_pump(raw_rx, event_tx));
for _ in 0..10 {
raw_tx.send(()).await.unwrap();
tokio::time::advance(Duration::from_millis(20)).await;
}
tokio::time::advance(DEBOUNCE_WINDOW + Duration::from_millis(50)).await;
let evt = event_rx.recv().await.expect("first rescan");
assert!(matches!(evt, LiveEvent::Rescan));
assert!(event_rx.try_recv().is_err(), "only one rescan");
}
#[tokio::test(start_paused = true)]
async fn hard_cap_forces_emission_during_long_write() {
let (raw_tx, raw_rx) = channel::<()>(RAW_CHANNEL_CAP);
let (event_tx, mut event_rx) = channel::<LiveEvent>(EVENT_CHANNEL_CAP);
let _task = tokio::spawn(debounce_pump(raw_rx, event_tx));
let total = DEBOUNCE_HARD_CAP + DEBOUNCE_HARD_CAP / 2;
let step = Duration::from_millis(50);
let steps = total.as_millis() / step.as_millis();
for _ in 0..steps {
raw_tx.send(()).await.unwrap();
tokio::time::advance(step).await;
}
let evt = event_rx.recv().await.expect("hard-cap rescan");
assert!(matches!(evt, LiveEvent::Rescan));
}
#[tokio::test(start_paused = true)]
async fn bounded_raw_channel_drops_overflow_without_blocking() {
let (raw_tx, raw_rx) = channel::<()>(RAW_CHANNEL_CAP);
let (event_tx, mut event_rx) = channel::<LiveEvent>(EVENT_CHANNEL_CAP);
let _task = tokio::spawn(debounce_pump(raw_rx, event_tx));
let mut accepted = 0;
for _ in 0..(RAW_CHANNEL_CAP * 10) {
if raw_tx.try_send(()).is_ok() {
accepted += 1;
}
}
assert!(
accepted >= 1,
"at least the first send landed ({accepted} accepted)"
);
assert!(
accepted <= RAW_CHANNEL_CAP + 1,
"sender respects the bound ({accepted} <= {} + slack)",
RAW_CHANNEL_CAP
);
tokio::time::advance(DEBOUNCE_WINDOW + Duration::from_millis(50)).await;
let evt = event_rx.recv().await.expect("rescan delivered");
assert!(matches!(evt, LiveEvent::Rescan));
}
#[tokio::test(start_paused = true)]
async fn two_spaced_batches_produce_two_rescans() {
let (raw_tx, raw_rx) = channel::<()>(RAW_CHANNEL_CAP);
let (event_tx, mut event_rx) = channel::<LiveEvent>(EVENT_CHANNEL_CAP);
let _task = tokio::spawn(debounce_pump(raw_rx, event_tx));
raw_tx.send(()).await.unwrap();
tokio::time::advance(DEBOUNCE_WINDOW + Duration::from_millis(50)).await;
let _ = event_rx.recv().await.expect("first rescan");
raw_tx.send(()).await.unwrap();
tokio::time::advance(DEBOUNCE_WINDOW + Duration::from_millis(50)).await;
let _ = event_rx.recv().await.expect("second rescan");
}
}