#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(rustdoc::broken_intra_doc_links)]
//! `vcs-watch` — filesystem-watch a git/jj repository and emit typed state-change
//! events.
//!
//! A [`RepoWatcher`] watches a repository's `.git`/`.jj` state directory (and,
//! optionally, the working tree), **debounces** the burst of writes a VCS
//! operation makes, **re-queries** the repo state through
//! [`vcs-core`](vcs_core)'s batched [`snapshot`](vcs_core::Repo::snapshot), and
//! **diffs** it against the previous state to yield typed [`RepoEvent`]s. Each
//! settled change arrives as a [`RepoChange`] carrying both the new
//! [`RepoSnapshot`] (to render a prompt/status line) and the deltas (to react).
//! It's the foundation for prompts, status bars, TUIs, and repo daemons.
//!
//! Re-query-and-diff — rather than interpreting raw filesystem events — is what
//! makes it robust: git's ref temp-file renames, `index.lock` churn, and reflog
//! noise all just coalesce into one "re-check the settled state" instead of being
//! (mis)read as events. Noise that doesn't move observable state emits nothing,
//! and every emission carries the true current state, so a stray event can't
//! desync the consumer.
//!
//! # The surface
//!
//! - **[`RepoWatcher`]** — a live watch over one repository. Start it with
//!   [`RepoWatcher::watch`] (defaults) or the [`Builder`]; drop it to stop the OS
//!   watch and the background task.
//! - **[`Builder`]** ([`RepoWatcher::builder`]) — set the watch scope and timing,
//!   then [`build`](Builder::build): [`working_tree`](Builder::working_tree) to
//!   also watch the tree recursively, [`debounce`](Builder::debounce) (the quiet
//!   window), [`max_wait`](Builder::max_wait) (the re-query ceiling under a
//!   continuous stream), [`requery_timeout`](Builder::requery_timeout) (the
//!   per-re-query deadline). The [`DEFAULT_REQUERY_TIMEOUT`] et al. name the
//!   defaults.
//! - **[`RepoEvent`]** — one typed delta, derived by diffing two snapshots:
//!   [`HeadMoved`](RepoEvent::HeadMoved),
//!   [`BranchSwitched`](RepoEvent::BranchSwitched),
//!   [`BranchCreated`](RepoEvent::BranchCreated) /
//!   [`BranchDeleted`](RepoEvent::BranchDeleted),
//!   [`WorkingCopyChanged`](RepoEvent::WorkingCopyChanged), and the
//!   upstream/ahead-behind/operation/conflict variants (`#[non_exhaustive]`).
//! - **[`RepoChange`]** — a settled change: the fresh [`RepoSnapshot`] (render a
//!   status line off it) plus the non-empty `events` vec (react to it).
//! - **Consumption** — pull changes with [`recv`](RepoWatcher::recv)
//!   (`Option<RepoChange>`; `None` once the background task has ended), or,
//!   under the **`stream`** feature, poll the watcher as a
//!   `futures_core::Stream`. Both pull from the same channel and advance
//!   [`current`](RepoWatcher::current), the last-pulled snapshot.
//! - **[`WatcherStats`]** ([`stats`](RepoWatcher::stats)) — lock-free health
//!   counters (re-queries run, changes emitted, skips, and the last skip's
//!   [`WatcherErrorKind`]). Climbing [`skipped`](WatcherStats::skipped) with flat
//!   [`changes`](WatcherStats::changes) means a wedged repo — poll it from a
//!   health check rather than inferring health from event silence.
//!
//! # Recipes
//!
//! Watch with the defaults and react to each settled change:
//!
//! ```no_run
//! use vcs_core::Repo;
//! use vcs_watch::RepoWatcher;
//! # async fn run() -> vcs_watch::Result<()> {
//! let repo = Repo::open(".")?;
//! let mut watcher = RepoWatcher::watch(repo).await?;
//! while let Some(change) = watcher.recv().await {
//!     for event in &change.events {
//!         println!("{event:?}");
//!     }
//!     // `change.snapshot` is the fresh full state — render a status line off it.
//! }
//! # Ok(()) }
//! ```
//!
//! Under the **`stream`** feature the watcher *is* a `futures_core::Stream`,
//! so it drops into stream combinators and `tokio::select!` directly (needs
//! `futures`/`tokio-stream`'s `StreamExt` in scope):
//!
//! ```ignore
//! use futures::StreamExt;
//! use vcs_core::Repo;
//! use vcs_watch::RepoWatcher;
//! # async fn run() -> vcs_watch::Result<()> {
//! let repo = Repo::open(".")?;
//! let mut watcher = RepoWatcher::watch(repo).await?;
//! while let Some(change) = watcher.next().await {
//!     println!("{} event(s)", change.events.len());
//! }
//! # Ok(()) }
//! ```
//!
//! **Runtime:** unlike the rest of the toolkit (which hides tokio behind
//! `processkit`), `vcs-watch` uses **tokio at runtime** — the watch task and the
//! debounce timer run on the caller's tokio runtime, so build/await it from
//! within one.
//!
//! # Testing
//!
//! The debounce → ceiling → re-query pipeline is a free function over injected
//! seams, so it is exercised hermetically on a **paused clock** (no real
//! filesystem or sleeps); a consumer's own watch code tests the same way it tests
//! any [`vcs-core`](vcs_core) consumer — build the [`Repo`](vcs_core::Repo) over a
//! fake runner (processkit's `ScriptedRunner`) so the re-query returns canned
//! state. See
//! [vcs-testkit's guide](https://docs.rs/vcs-testkit/latest/vcs_testkit/guide/testing/).
//!
//! # In-depth guide
//!
//! Beyond this page, this crate ships a full how-to guide — rendered on docs.rs
//! from `docs/`. See the [`guide`] module.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
use std::time::Duration;

use notify::{RecursiveMode, Watcher};
use tokio::sync::mpsc;
use vcs_core::{BackendKind, VcsRepo};

mod error;
mod event;

pub use error::{Error, Result};
pub use event::{RepoChange, RepoEvent};
// Re-export the snapshot types a consumer reads off a `RepoChange`, so depending
// on `vcs-watch` alone suffices.
pub use vcs_core::{OperationState, RepoSnapshot};

/// Default quiet window: a re-query fires once the watched dir has been silent
/// for this long after the last event.
const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(250);
/// Default ceiling: even under a continuous stream of events, re-query at least
/// this often (so a long bulk operation still reports progress).
const DEFAULT_MAX_WAIT: Duration = Duration::from_secs(1);
/// Default deadline on a single re-query (`snapshot` + branch list): a wedged
/// command (e.g. a held `index.lock` with no client timeout configured) is
/// killed and skipped instead of stalling the watch loop forever.
pub const DEFAULT_REQUERY_TIMEOUT: Duration = Duration::from_secs(30);
/// Bounded output channel: a slow consumer applies backpressure (the loop pauses
/// re-querying), and pending filesystem signals coalesce into one catch-up query.
const OUTPUT_CAPACITY: usize = 64;
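
/*
A minimal, std-only sketch of what a bounded output channel buys (an
illustration, not this crate's code): once the buffer is full the producer
cannot queue more, which is the backpressure described above. The sketch
assumes `std::sync::mpsc::sync_channel` as a stand-in and a hypothetical helper
named `fill_until_full`; the crate itself uses a `tokio::sync::mpsc` channel,
whose `send().await` suspends the loop instead of returning an error.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

/// Push items until the bounded channel refuses one; returns how many were
/// accepted, plus the receiver so the caller can drain what was buffered.
/// (`fill_until_full` is a hypothetical name used only for this sketch.)
fn fill_until_full(capacity: usize) -> (usize, Receiver<u32>) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for item in 0u32.. {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // Buffer full: a blocking `send` would wait here until the
            // consumer catches up.
            Err(TrySendError::Full(_)) => break,
            // Receiver gone: nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rx)
}
```

Calling `fill_until_full(2)` accepts two items and then stops; nothing is
dropped, the producer simply cannot run ahead of the consumer.
*/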

/// The timing/capacity knobs the background loop runs under — bundled so the
/// loop signature stays small and the hermetic tests can vary them (notably
/// `output_capacity`, which the backpressure test shrinks to 1).
struct LoopConfig {
    debounce: Duration,
    max_wait: Duration,
    /// `None` disables the per-re-query deadline.
    requery_timeout: Option<Duration>,
    output_capacity: usize,
}

/// Builder for a [`RepoWatcher`] — set the watch scope and debounce timing, then
/// [`build`](Builder::build).
pub struct Builder {
    repo: Box<dyn VcsRepo>,
    working_tree: bool,
    debounce: Duration,
    max_wait: Duration,
    requery_timeout: Option<Duration>,
}

impl Builder {
    /// Also watch the **working tree** recursively, so a bare unstaged edit
    /// (`vim file`) fires [`WorkingCopyChanged`](RepoEvent::WorkingCopyChanged)
    /// immediately. Off by default (only the `.git`/`.jj` state dir is watched,
    /// which catches an unstaged edit once it touches the index / a jj snapshot).
    ///
    /// Note: `notify` is `.gitignore`-unaware, so this also watches ignored and
    /// build directories — heavier on a large tree.
    pub fn working_tree(mut self, yes: bool) -> Self {
        self.working_tree = yes;
        self
    }

    /// The quiet window: re-query once the watched dir has been silent this long
    /// after the last event (default 250 ms). Coalesces an operation's write
    /// burst into one re-check.
    pub fn debounce(mut self, window: Duration) -> Self {
        self.debounce = window;
        self
    }

    /// The ceiling on how long a continuous event stream defers the re-query
    /// (default 1 s) — a long bulk operation still reports at this cadence.
    pub fn max_wait(mut self, ceiling: Duration) -> Self {
        self.max_wait = ceiling;
        self
    }

    /// Deadline on a single re-query (the `snapshot` + branch-list pair), default
    /// [`DEFAULT_REQUERY_TIMEOUT`] (30 s); `None` disables it. Orthogonal to
    /// [`max_wait`](Self::max_wait): that bounds how long signals may *defer* a
    /// re-query, this bounds how long one re-query may *run*. On overrun the
    /// spawned commands are killed (kill-on-drop) and the re-query is skipped as
    /// transient — the next filesystem event re-checks.
    ///
    /// Note: on a very large repository a *cold-cache* `git status` (first run
    /// after a `gc`, or on a slow disk) can legitimately exceed the 30 s default
    /// — raise it (or pass `None`) there; a watcher whose every re-query is
    /// being killed shows up as climbing [`WatcherStats::skipped`] with flat
    /// `changes`.
    pub fn requery_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.requery_timeout = timeout;
        self
    }

    /// Start watching. Captures the baseline state, registers the filesystem
    /// watch, and spawns the background re-query task on the current tokio
    /// runtime.
    pub async fn build(self) -> Result<RepoWatcher> {
        let root = self.repo.root().to_path_buf();
        // The dirs whose writes mean "re-check": the `.git`/`.jj` state dir, plus
        // — for a linked git worktree — the *shared* git dir it points at via
        // `commondir` (where `refs/heads/*` and `packed-refs` actually live, so
        // branch create/delete is seen). See `state_dirs`.
        let state_dirs = state_dirs(self.repo.kind(), &root)?;

        // Bridge: notify's callback thread pushes a unit signal per event into an
        // unbounded channel (non-blocking, thread-safe); the debounce loop drains
        // it. Build the watcher and register paths *before* the baseline snapshot,
        // so a change racing the baseline is queued, not lost.
        let (raw_tx, raw_rx) = mpsc::unbounded_channel::<()>();
        let mut watcher = notify::recommended_watcher(move |_res| {
            // Content is irrelevant — we re-query state, so any event (or watch
            // error) just means "re-check". Send fails only after the loop ends.
            let _ = raw_tx.send(());
        })?;
        if self.working_tree {
            watcher.watch(&root, RecursiveMode::Recursive)?;
            // A worktree gitlink puts the real (private and shared) git dirs
            // outside `root`; cover any not already under the recursive root watch.
            for dir in &state_dirs {
                if !dir.starts_with(&root) {
                    watcher.watch(dir, RecursiveMode::Recursive)?;
                }
            }
        } else {
            for dir in &state_dirs {
                watcher.watch(dir, RecursiveMode::Recursive)?;
            }
        }

        let snapshot = self.repo.snapshot().await?;
        let branches = self.repo.local_branches().await?;
        let baseline = snapshot.clone();
        let prev = event::WatchState::from_snapshot(&snapshot, branches);

        let config = LoopConfig {
            debounce: self.debounce,
            max_wait: self.max_wait,
            requery_timeout: self.requery_timeout,
            output_capacity: OUTPUT_CAPACITY,
        };
        let stats = Arc::new(StatsInner::default());
        let (out_tx, out_rx) = mpsc::channel::<RepoChange>(config.output_capacity);
        let task = tokio::spawn(watch_loop(
            self.repo,
            raw_rx,
            out_tx,
            prev,
            config,
            Arc::clone(&stats),
        ));

        Ok(RepoWatcher {
            rx: out_rx,
            current: baseline,
            stats,
            _watcher: watcher,
            task,
        })
    }
}

// --- Watcher health counters --------------------------------------------------

/// What the last skipped re-query failed on (see [`WatcherStats::last_error`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum WatcherErrorKind {
    /// The snapshot re-query returned an error (e.g. a transiently held lock).
    Snapshot,
    /// The branch-list re-query returned an error.
    Branches,
    /// The re-query exceeded [`Builder::requery_timeout`] and was killed.
    Timeout,
}

/// A cheap point-in-time copy of the watcher's health counters — see
/// [`RepoWatcher::stats`]. Lets a long-running consumer notice a watcher that is
/// silently skipping re-queries (e.g. a permanently wedged repository) instead
/// of inferring health from event silence.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct WatcherStats {
    /// Re-query attempts started (settled bursts that reached the query step).
    pub requeries: u64,
    /// Re-queries that emitted a [`RepoChange`] (the rest found no difference).
    pub changes: u64,
    /// Re-queries skipped — transient query failures plus deadline overruns.
    pub skipped: u64,
    /// What the most recent skip failed on; `None` when nothing was ever skipped.
    pub last_error: Option<WatcherErrorKind>,
}

/// Lock-free counter cell shared between the loop and `stats()` readers. Relaxed
/// ordering is enough: the counters are independent monotonic telemetry, not a
/// synchronization protocol.
#[derive(Default)]
struct StatsInner {
    requeries: AtomicU64,
    changes: AtomicU64,
    skipped: AtomicU64,
    /// 0 = none, else `WatcherErrorKind as u8 + 1`.
    last_error: AtomicU8,
}

impl StatsInner {
    fn note_requery(&self) {
        self.requeries.fetch_add(1, Ordering::Relaxed);
    }

    fn note_change(&self) {
        self.changes.fetch_add(1, Ordering::Relaxed);
    }

    fn note_skip(&self, kind: WatcherErrorKind) {
        self.skipped.fetch_add(1, Ordering::Relaxed);
        let code = match kind {
            WatcherErrorKind::Snapshot => 1,
            WatcherErrorKind::Branches => 2,
            WatcherErrorKind::Timeout => 3,
        };
        self.last_error.store(code, Ordering::Relaxed);
    }

    fn snapshot(&self) -> WatcherStats {
        let last_error = match self.last_error.load(Ordering::Relaxed) {
            1 => Some(WatcherErrorKind::Snapshot),
            2 => Some(WatcherErrorKind::Branches),
            3 => Some(WatcherErrorKind::Timeout),
            _ => None,
        };
        WatcherStats {
            requeries: self.requeries.load(Ordering::Relaxed),
            changes: self.changes.load(Ordering::Relaxed),
            skipped: self.skipped.load(Ordering::Relaxed),
            last_error,
        }
    }
}

/// A live watch over a repository, yielding [`RepoChange`]s as the repo's state
/// changes. Dropping it stops the filesystem watch and the background task.
pub struct RepoWatcher {
    rx: mpsc::Receiver<RepoChange>,
    current: RepoSnapshot,
    stats: Arc<StatsInner>,
    // Held to keep the OS watch alive; dropping it ends the watch (and the loop).
    _watcher: notify::RecommendedWatcher,
    task: tokio::task::JoinHandle<()>,
}

impl RepoWatcher {
    /// A builder over `repo` (any [`VcsRepo`] — e.g. a [`vcs_core::Repo`]).
    pub fn builder(repo: impl VcsRepo + 'static) -> Builder {
        Builder {
            repo: Box::new(repo),
            working_tree: false,
            debounce: DEFAULT_DEBOUNCE,
            max_wait: DEFAULT_MAX_WAIT,
            requery_timeout: Some(DEFAULT_REQUERY_TIMEOUT),
        }
    }

    /// Start watching `repo` with the defaults (state dir only, 250 ms debounce).
    pub async fn watch(repo: impl VcsRepo + 'static) -> Result<RepoWatcher> {
        Self::builder(repo).build().await
    }

    /// Await the next settled change. Returns `None` once the watcher is dropped
    /// or its background task ends.
    pub async fn recv(&mut self) -> Option<RepoChange> {
        let change = self.rx.recv().await?;
        self.current = change.snapshot.clone();
        Some(change)
    }

    /// The most recent known snapshot — the baseline captured at
    /// [`build`](Builder::build), then the snapshot from each [`recv`](Self::recv).
    /// It advances **only when you pull a change** (via [`recv`](Self::recv) or,
    /// under the `stream` feature, by polling the stream), so it is as fresh as
    /// your last pull, not a live view.
    pub fn current(&self) -> &RepoSnapshot {
        &self.current
    }

    /// The watcher's health counters (re-queries run / changes emitted / skips,
    /// and what the last skip failed on). Cheap relaxed-atomic reads — poll it
    /// from a health check or log it periodically; a climbing
    /// [`skipped`](WatcherStats::skipped) with flat
    /// [`changes`](WatcherStats::changes) means the repository is wedged.
    pub fn stats(&self) -> WatcherStats {
        self.stats.snapshot()
    }
}
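
/*
A consumer-side sketch of the health check suggested above: compare two
successive counter samples and flag the "skips climbing, changes flat" pattern.
Everything here is an assumption made for illustration: `StatsSample` is a
hypothetical plain-data stand-in for the counters read from `stats()`, and
`Health` / `classify` are hypothetical names. A real check would likely require
the pattern to persist across several samples before alerting, since a single
transient skip is expected now and then.

```rust
/// Counter values copied from one stats reading (hypothetical stand-in type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct StatsSample {
    requeries: u64,
    changes: u64,
    skipped: u64,
}

#[derive(Debug, PartialEq, Eq)]
enum Health {
    /// No re-query ran between the two samples.
    Quiet,
    /// Re-queries ran and were not all skipped without effect.
    Healthy,
    /// Skips climbed while no change was emitted.
    Suspect,
}

/// Classify the interval between two samples by their counter deltas.
fn classify(prev: StatsSample, now: StatsSample) -> Health {
    let requeries = now.requeries.saturating_sub(prev.requeries);
    let changes = now.changes.saturating_sub(prev.changes);
    let skipped = now.skipped.saturating_sub(prev.skipped);
    if requeries == 0 {
        Health::Quiet
    } else if skipped > 0 && changes == 0 {
        Health::Suspect
    } else {
        Health::Healthy
    }
}
```

Re-queries that ran and found no difference raise `requeries` only, so they
classify as `Healthy` rather than `Suspect`.
*/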

/// Yields each settled [`RepoChange`] as a stream item (the `stream` feature).
/// Equivalent to looping [`recv`](RepoWatcher::recv) — both pull from the same
/// underlying channel (an item is delivered to whichever is polled first, never
/// duplicated) and both advance [`current`](RepoWatcher::current).
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl futures_core::Stream for RepoWatcher {
    type Item = RepoChange;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<RepoChange>> {
        // All fields are Unpin, so the watcher is Unpin and get_mut is sound.
        let this = self.get_mut();
        match this.rx.poll_recv(cx) {
            std::task::Poll::Ready(Some(change)) => {
                this.current = change.snapshot.clone();
                std::task::Poll::Ready(Some(change))
            }
            other => other,
        }
    }
}

impl Drop for RepoWatcher {
    fn drop(&mut self) {
        // The dropped `_watcher` already closes the signal channel (ending the
        // loop); abort is belt-and-braces for prompt teardown.
        self.task.abort();
    }
}

/// The background loop: coalesce a burst of filesystem signals, re-query the
/// settled state, diff against the previous, and emit a [`RepoChange`] when
/// anything changed.
///
/// A free function over plain channels + a [`VcsRepo`] (not a method) on
/// purpose: the hermetic pipeline tests below drive it directly — a fake signal
/// channel in, a `ScriptedRunner`-backed `Repo`, a paused tokio clock — pinning
/// the debounce/ceiling/skip semantics without any real filesystem or process.
async fn watch_loop(
    repo: Box<dyn VcsRepo>,
    mut raw_rx: mpsc::UnboundedReceiver<()>,
    out_tx: mpsc::Sender<RepoChange>,
    mut prev: event::WatchState,
    config: LoopConfig,
    stats: Arc<StatsInner>,
) {
    loop {
        // Block until the first signal (or exit when the watcher is dropped).
        if raw_rx.recv().await.is_none() {
            return;
        }
        // Coalesce the burst: reset a `debounce` quiet-timer on every new signal,
        // but never wait past `max_wait` total. The dedicated `sleep_until` arm
        // makes the ceiling exact (it fires even when no further signal arrives);
        // the in-arm deadline check guards against a signal stream so dense that
        // the `biased` select never polls the timer arms.
        drain(&mut raw_rx);
        let deadline = tokio::time::Instant::now() + config.max_wait;
        loop {
            tokio::select! {
                biased;
                sig = raw_rx.recv() => {
                    if sig.is_none() {
                        return; // watcher dropped mid-burst
                    }
                    // Collapse the queued backlog: under a notify storm each
                    // queued unit signal would otherwise cost a select iteration
                    // that re-creates BOTH timer futures — a burst is one
                    // "still busy" observation, not N.
                    drain(&mut raw_rx);
                    if tokio::time::Instant::now() >= deadline {
                        break; // ceiling reached — re-query now
                    }
                    // else: another event — loop resets the quiet timer
                }
                _ = tokio::time::sleep_until(deadline) => break, // ceiling
                _ = tokio::time::sleep(config.debounce) => break, // settled
            }
        }

        // Re-query the settled state, bounded by the configured deadline — a
        // wedged command (a held `index.lock` on a client with no timeout) must
        // not stall the watch forever. Dropping the overrun future kills the
        // spawned process tree (processkit's kill-on-drop group), so a timed-out
        // query leaves no orphan. Failures and overruns are *transient skips*:
        // counted, traced, and re-checked on the next filesystem event.
        stats.note_requery();
        let requery = async {
            let snapshot = repo
                .snapshot()
                .await
                .map_err(|e| (WatcherErrorKind::Snapshot, e))?;
            let branches = repo
                .local_branches()
                .await
                .map_err(|e| (WatcherErrorKind::Branches, e))?;
            Ok::<_, (WatcherErrorKind, vcs_core::Error)>((snapshot, branches))
        };
        let outcome = match config.requery_timeout {
            Some(limit) => match tokio::time::timeout(limit, requery).await {
                Ok(result) => result,
                Err(_elapsed) => {
                    stats.note_skip(WatcherErrorKind::Timeout);
                    #[cfg(feature = "tracing")]
                    tracing::debug!(
                        timeout = ?limit,
                        "vcs-watch: re-query exceeded its deadline; killed and skipped"
                    );
                    continue;
                }
            },
            None => requery.await,
        };
        let (snapshot, branches) = match outcome {
            Ok(pair) => pair,
            Err((kind, _e)) => {
                stats.note_skip(kind);
                #[cfg(feature = "tracing")]
                tracing::debug!(error = %_e, "vcs-watch: re-query failed; skipping");
                continue;
            }
        };

        let next = event::WatchState::from_snapshot(&snapshot, branches);
        let events = event::diff(&prev, &next);
        prev = next;
        if events.is_empty() {
            continue;
        }
        if out_tx.send(RepoChange { snapshot, events }).await.is_err() {
            return; // receiver dropped — stop
        }
        stats.note_change();
    }
}

/// Drop every already-queued unit signal — the burst is one observation. Leaves
/// channel-closed detection to the caller's next `recv` (a drained-empty and a
/// closed channel both just stop yielding here).
fn drain(raw_rx: &mut mpsc::UnboundedReceiver<()>) {
    while raw_rx.try_recv().is_ok() {}
}
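
/*
A synchronous, std-only analogue of the coalescing step in `watch_loop`, as a
rough sketch rather than the crate's implementation (which uses
`tokio::select!` over async timers). It shows the same two rules: every new
signal restarts the quiet window, and the total wait never exceeds the ceiling.
The names `Settle` and `wait_for_settle` are hypothetical, and the function
models only the phase entered after a first signal has been seen.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Why the wait ended (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
enum Settle {
    /// No signal arrived for a full `debounce` window.
    Quiet,
    /// Signals kept arriving until `max_wait` elapsed.
    Ceiling,
    /// Every sender was dropped.
    Closed,
}

fn wait_for_settle(rx: &mpsc::Receiver<()>, debounce: Duration, max_wait: Duration) -> Settle {
    let deadline = Instant::now() + max_wait;
    loop {
        let now = Instant::now();
        if now >= deadline {
            return Settle::Ceiling;
        }
        // Wait one quiet window, but never past the ceiling.
        let wait = debounce.min(deadline.saturating_duration_since(now));
        match rx.recv_timeout(wait) {
            Ok(()) => {
                // Collapse the queued backlog: a burst is one observation.
                while rx.try_recv().is_ok() {}
                // Looping again restarts the quiet window.
            }
            Err(RecvTimeoutError::Timeout) => {
                return if Instant::now() >= deadline {
                    Settle::Ceiling
                } else {
                    Settle::Quiet
                };
            }
            Err(RecvTimeoutError::Disconnected) => return Settle::Closed,
        }
    }
}
```

A caller would re-query on `Quiet` or `Ceiling` and stop on `Closed`, which
mirrors the `break` / `return` split in the async loop.
*/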

/// The directories to watch for a backend, deduplicated. Normally one — the
/// `.git`/`.jj` state dir (see [`state_dir`]) — but a **linked git worktree** has
/// two: its private gitdir (HEAD/index/logs) *and* the shared git dir it points
/// at via `commondir` (`refs/heads/*` and `packed-refs`, where branch
/// create/delete actually lands). Watching only the private dir would miss every
/// `BranchCreated`/`BranchDeleted` on a worktree, since the shared dir is an
/// *ancestor* of the private gitdir, not nested under it (see [`common_dir`]).
///
/// Overlapping watches are harmless — the re-query+debounce coalesces duplicate
/// signals — but we drop a second dir whose normalized path equals the first, so
/// `notify` isn't asked to watch the same path twice.
fn state_dirs(kind: BackendKind, root: &Path) -> Result<Vec<PathBuf>> {
    let state_dir = state_dir(kind, root)?;
    let mut dirs = vec![state_dir.clone()];
    if let Some(shared) = common_dir(&state_dir)
        && normalize(&shared) != normalize(&state_dir)
    {
        dirs.push(shared);
    }
    Ok(dirs)
}

/// The directory to watch for a backend: `.jj` for jj, `.git` for git. A
/// worktree's `.git` is a gitlink *file* (`gitdir: <path>`); resolve it to the
/// real git directory. Best-effort — falls back to the `.git` path itself.
fn state_dir(kind: BackendKind, root: &Path) -> Result<PathBuf> {
    match kind {
        BackendKind::Jj => Ok(root.join(".jj")),
        BackendKind::Git => {
            let dot_git = root.join(".git");
            if dot_git.is_file() {
                let content = std::fs::read_to_string(&dot_git)?;
                if let Some(rest) = content.trim().strip_prefix("gitdir:") {
                    let p = PathBuf::from(rest.trim());
                    return Ok(if p.is_absolute() { p } else { root.join(p) });
                }
            }
            Ok(dot_git)
        }
        // `BackendKind` is `#[non_exhaustive]`; for an unknown future backend
        // watch the repo root itself — coarser, but it can't miss the state dir.
        _ => Ok(root.to_path_buf()),
    }
}

/// The **shared** git directory for a linked worktree, or `None` for a plain
/// repo. A linked worktree's resolved gitdir holds a `commondir` file whose
/// content is a path (typically relative, e.g. `../..`) to the shared `.git` —
/// where `refs/heads/*` and `packed-refs` live. We join it to the gitdir and
/// resolve `..` (lexically, matching the no-canonicalize style of [`state_dir`],
/// so the registered path stays plain rather than a Windows `\\?\` verbatim one).
/// A plain repo has no `commondir` file, so this is `None` and behaviour is
/// unchanged.
fn common_dir(state_dir: &Path) -> Option<PathBuf> {
    let commondir = state_dir.join("commondir");
    let content = std::fs::read_to_string(&commondir).ok()?;
    let rel = content.trim();
    if rel.is_empty() {
        return None;
    }
    let p = PathBuf::from(rel);
    let joined = if p.is_absolute() {
        p
    } else {
        state_dir.join(p)
    };
    Some(lexically_normalized(&joined))
}

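/// Resolve `.`/`..` components without touching the filesystem, keeping the path
/// in its original (non-verbatim) form — `commondir`'s `../..` plus a Windows
/// gitdir would otherwise leave literal `..` segments in the watched path.
fn lexically_normalized(p: &Path) -> PathBuf {
    use std::path::Component;
    let mut out = PathBuf::new();
    for comp in p.components() {
        match comp {
            Component::ParentDir => {
                // Classify the tail first so the borrow ends before `out` is
                // mutated. Popping unconditionally would also swallow an
                // earlier unresolved `..` (`../..` would collapse to nothing).
                let (ends_in_name, ends_in_root) = match out.components().next_back() {
                    Some(Component::Normal(_)) => (true, false),
                    Some(Component::RootDir) => (false, true),
                    _ => (false, false),
                };
                if ends_in_name {
                    // Pop a real segment.
                    out.pop();
                } else if !ends_in_root {
                    // Keep a leading `..` that can't be resolved; at a root,
                    // `..` is a no-op.
                    out.push(comp);
                }
            }
            Component::CurDir => {}
            other => out.push(other),
        }
    }
    out
}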

/// Canonicalize for comparison and strip the Windows verbatim prefix (`\\?\…`,
/// which `canonicalize` adds), so two spellings of the same dir dedup. Mirrors
/// `vcs-core`'s path-compare normalization; falls back to the input when the path
/// can't be canonicalized (then equal paths still compare equal byte-for-byte).
fn normalize(p: &Path) -> PathBuf {
    let canonical = p.canonicalize().unwrap_or_else(|_| p.to_path_buf());
    #[cfg(windows)]
    {
        let s = canonical.to_string_lossy();
        if let Some(rest) = s.strip_prefix(r"\\?\")
            && !rest.starts_with("UNC\\")
        {
            return PathBuf::from(rest.to_string());
        }
    }
    canonical
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// A unique, self-cleaning temp dir (no temp-dir crate needed for these
    /// hermetic helper tests — pid + counter keeps parallel tests from colliding).
    /// `pub(crate)`: the pipeline tests below reuse it for the scripted repo's
    /// on-disk git dir (the snapshot's MERGE_HEAD probe reads the filesystem).
    pub(crate) struct Scratch(pub(crate) PathBuf);
    impl Scratch {
        pub(crate) fn new() -> Self {
            let p = std::env::temp_dir().join(format!(
                "vcs-watch-commondir-{}-{}",
                std::process::id(),
                COUNTER.fetch_add(1, Ordering::Relaxed)
            ));
            std::fs::create_dir_all(&p).expect("create scratch dir");
            Scratch(p)
        }
    }
    impl Drop for Scratch {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // A plain (non-worktree) git dir has no `commondir` file → no shared dir, so
    // behaviour is exactly today's single-dir watch.
    #[test]
    fn no_commondir_file_yields_none() {
        let scratch = Scratch::new();
        let git_dir = scratch.0.join(".git");
        std::fs::create_dir_all(&git_dir).expect("mkdir .git");
        assert_eq!(common_dir(&git_dir), None);
    }

    // A linked-worktree layout: the private gitdir holds `commondir` = `../..`
    // (git's actual content), which must resolve to the shared `.git` two levels up.
    #[test]
    fn relative_commondir_resolves_to_shared_git_dir() {
        let scratch = Scratch::new();
        let shared = scratch.0.join(".git");
        let private = shared.join("worktrees").join("wt");
        std::fs::create_dir_all(&private).expect("mkdir private gitdir");
        // git writes `../..` (relative to the private dir) here.
        std::fs::write(private.join("commondir"), "../..\n").expect("write commondir");

        let resolved = common_dir(&private).expect("Some(shared dir)");
        // `<shared>/worktrees/wt` + `../..` == `<shared>` (lexically, no `..` left).
        assert_eq!(resolved, lexically_normalized(&shared));
        assert!(
            !resolved.to_string_lossy().contains(".."),
            "the `..` segments must be resolved, got {}",
            resolved.display()
        );
    }

    // An absolute `commondir` (git permits it) is taken as-is.
    #[test]
    fn absolute_commondir_is_used_verbatim() {
        let scratch = Scratch::new();
        let shared = scratch.0.join("shared-git");
        let private = scratch.0.join("private");
        std::fs::create_dir_all(&private).expect("mkdir private");
        std::fs::write(private.join("commondir"), format!("{}\n", shared.display()))
            .expect("write commondir");

        assert_eq!(common_dir(&private), Some(lexically_normalized(&shared)));
    }

    // `state_dirs` returns both the private and shared dirs for a worktree, and
    // the shared dir is not the private one (so two distinct watches register).
    #[test]
    fn state_dirs_includes_private_and_shared_for_worktree() {
        let scratch = Scratch::new();
        let root = scratch.0.join("wt-worktree");
        let shared = scratch.0.join(".git");
        let private = shared.join("worktrees").join("wt");
        std::fs::create_dir_all(&private).expect("mkdir private gitdir");
        std::fs::create_dir_all(&root).expect("mkdir worktree root");
        std::fs::write(private.join("commondir"), "../..\n").expect("write commondir");
        // The worktree's `.git` gitlink file points at the private dir.
        std::fs::write(
            root.join(".git"),
            format!("gitdir: {}\n", private.display()),
        )
        .expect("write gitlink");

        let dirs = state_dirs(BackendKind::Git, &root).expect("state_dirs");
        assert_eq!(dirs.len(), 2, "private + shared, got {dirs:?}");
        assert_eq!(normalize(&dirs[0]), normalize(&private));
        assert_eq!(normalize(&dirs[1]), normalize(&shared));
    }

    // When `commondir` resolves back to the state dir itself (degenerate), the
    // duplicate is dropped — we never register the same path twice.
    #[test]
    fn self_referential_commondir_is_deduped() {
        let scratch = Scratch::new();
        let git_dir = scratch.0.join(".git");
        std::fs::create_dir_all(&git_dir).expect("mkdir .git");
        // `.` resolves to the dir itself.
        std::fs::write(git_dir.join("commondir"), ".\n").expect("write commondir");
        // The gitlink points the worktree root at this very dir.
        let root = scratch.0.join("root");
        std::fs::create_dir_all(&root).expect("mkdir root");
        std::fs::write(
            root.join(".git"),
            format!("gitdir: {}\n", git_dir.display()),
        )
        .expect("write gitlink");

        let dirs = state_dirs(BackendKind::Git, &root).expect("state_dirs");
        assert_eq!(dirs.len(), 1, "self-reference deduped, got {dirs:?}");
    }
}

/// Hermetic tests of the debounce → ceiling → re-query → diff pipeline itself:
/// `watch_loop` is driven directly with a fake signal channel, a
/// `ScriptedRunner`-backed `Repo`, and a **paused tokio clock** — no real
/// filesystem watch, no real process, no real sleeps. These pin the *loop's*
/// timing contract; the notify→signal bridge stays covered by the `#[ignore]`
/// integration tests (fake time says nothing about real OS event batching).
#[cfg(test)]
mod pipeline_tests {
    use super::tests::Scratch;
    use super::*;
    use processkit::{ProcessRunner, Reply, ScriptedRunner};
    use vcs_core::Repo;
    use vcs_core::vcs_git::Git;

    /// Porcelain-v2 (NUL-separated) status output for a repo at `head`, clean.
    fn v2(head: &str) -> String {
        format!("# branch.oid {head}\0# branch.head main\0")
    }

    /// The exact command set one snapshot+branches re-query issues, scripted:
    /// `status --porcelain=v2`, the `rev-parse --git-dir` probe (must point at a
    /// real dir — the op-state probe reads `MERGE_HEAD` off the filesystem), and
    /// `branch --no-column`.
    fn scripted(gitdir: &Path, head: &str) -> ScriptedRunner {
        ScriptedRunner::new()
            .on(["status"], Reply::ok(v2(head)))
            .on(["rev-parse"], Reply::ok(format!("{}\n", gitdir.display())))
            .on(["branch"], Reply::ok("* main\n"))
    }

    fn scripted_repo(gitdir: &Path, head: &str) -> Box<dyn VcsRepo> {
        Box::new(Repo::from_git(
            "/r",
            "/r",
            Git::with_runner(scripted(gitdir, head)),
        ))
    }

    /// The baseline `prev` state the loop diffs against, taken through the same
    /// snapshot path `Builder::build` uses.
    async fn baseline(gitdir: &Path, head: &str) -> event::WatchState {
        let repo = scripted_repo(gitdir, head);
        let snap = repo.snapshot().await.expect("baseline snapshot");
        let branches = repo.local_branches().await.expect("baseline branches");
        event::WatchState::from_snapshot(&snap, branches)
    }

    fn defaults() -> LoopConfig {
        LoopConfig {
            debounce: Duration::from_millis(250),
            max_wait: Duration::from_secs(1),
            requery_timeout: Some(Duration::from_secs(30)),
            output_capacity: 64,
        }
    }

    struct Harness {
        sig: mpsc::UnboundedSender<()>,
        out: mpsc::Receiver<RepoChange>,
        stats: Arc<StatsInner>,
        task: tokio::task::JoinHandle<()>,
    }

    fn spawn_loop(repo: Box<dyn VcsRepo>, prev: event::WatchState, config: LoopConfig) -> Harness {
        let (sig, raw_rx) = mpsc::unbounded_channel();
        let (out_tx, out) = mpsc::channel(config.output_capacity);
        let stats = Arc::new(StatsInner::default());
        let task = tokio::spawn(watch_loop(
            repo,
            raw_rx,
            out_tx,
            prev,
            config,
            Arc::clone(&stats),
        ));
        Harness {
            sig,
            out,
            stats,
            task,
        }
    }

    /// Let the loop task run to a quiescent point without advancing time —
    /// paused-clock auto-advance only triggers when every task idles on a timer,
    /// so a bounded yield burst (never a spin-until loop) is the safe way to let
    /// an already-runnable re-query complete.
    async fn settle() {
        for _ in 0..32 {
            tokio::task::yield_now().await;
        }
    }

    // A burst of sub-debounce signals coalesces into exactly one re-query and
    // one emitted change.
    #[tokio::test(start_paused = true)]
    async fn debounce_coalesces_burst() {
        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "aaa").await;
        let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());

        for _ in 0..5 {
            h.sig.send(()).expect("send");
            tokio::time::advance(Duration::from_millis(10)).await;
        }
        let change = h.out.recv().await.expect("one coalesced change");
        assert!(
            change
                .events
                .iter()
                .any(|e| matches!(e, RepoEvent::HeadMoved { .. })),
            "expected HeadMoved, got {:?}",
            change.events
        );

        // Long quiet: nothing else arrives, and exactly one re-query ran.
        tokio::time::advance(Duration::from_secs(5)).await;
        settle().await;
        assert!(
            h.out.try_recv().is_err(),
            "burst must coalesce to one change"
        );
        let stats = h.stats.snapshot();
        assert_eq!((stats.requeries, stats.changes), (1, 1));
    }

    // Signals that keep arriving faster than the quiet window, indefinitely: the
    // `max_wait` ceiling still forces a re-query at its cadence (the dedicated
    // `sleep_until` arm — not just "on the next signal after the deadline").
    #[tokio::test(start_paused = true)]
    async fn max_wait_caps_continuous_signals() {
        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "aaa").await;
        let h_config = defaults();
        let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, h_config);

        // A pump that fires a signal every 100 ms — always inside the 250 ms
        // quiet window, so only the ceiling can break the burst.
        let pump_sig = h.sig.clone();
        let pump = tokio::spawn(async move {
            loop {
                if pump_sig.send(()).is_err() {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        });

        let change = tokio::time::timeout(Duration::from_secs(2), h.out.recv())
            .await
            .expect("the ceiling must fire within max_wait")
            .expect("change");
        assert!(
            change
                .events
                .iter()
                .any(|e| matches!(e, RepoEvent::HeadMoved { .. })),
            "got {:?}",
            change.events
        );
        pump.abort();
    }

    // The base case: one signal, a quiet gap, one re-query.
    #[tokio::test(start_paused = true)]
    async fn quiet_gap_triggers_requery() {
        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "aaa").await;
        let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());

        h.sig.send(()).expect("send");
        let change = h.out.recv().await.expect("change after the quiet gap");
        assert!(
            change
                .events
                .iter()
                .any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
        );
    }

    // A re-query that finds the same state emits nothing — but it *ran* (the
    // stats distinguish "no change" from "never re-queried").
    #[tokio::test(start_paused = true)]
    async fn no_change_yields_no_emission() {
        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "aaa").await;
        // Same head as the baseline → empty diff.
        let mut h = spawn_loop(scripted_repo(&scratch.0, "aaa"), prev, defaults());

        h.sig.send(()).expect("send");
        settle().await; // let the loop register its quiet timer first
        tokio::time::advance(Duration::from_millis(300)).await; // past debounce
        settle().await; // let the re-query run

        let stats = h.stats.snapshot();
        assert_eq!((stats.requeries, stats.changes, stats.skipped), (1, 0, 0));
        assert!(
            h.out.try_recv().is_err(),
            "no events for an unchanged state"
        );
    }

    /// Fails the first `status` call (a transiently held lock), then behaves —
    /// `ScriptedRunner` rules are stateless, so the two-phase behaviour needs a
    /// tiny stateful runner delegating to throwaway scripted ones.
    struct FlakyStatus {
        fails_left: AtomicU64,
        gitdir: PathBuf,
        head: &'static str,
    }

    #[async_trait::async_trait]
    impl ProcessRunner for FlakyStatus {
        async fn output(
            &self,
            command: &processkit::Command,
        ) -> processkit::Result<processkit::ProcessResult<String>> {
            let is_status = command.arguments().first().map(|a| a == "status") == Some(true);
            if is_status && self.fails_left.load(Ordering::Relaxed) > 0 {
                self.fails_left.fetch_sub(1, Ordering::Relaxed);
                return Err(processkit::Error::Exit {
                    program: "git".into(),
                    code: 128,
                    stdout: String::new(),
                    stderr: "fatal: Unable to create '.git/index.lock'".into(),
                });
            }
            scripted(&self.gitdir, self.head).output(command).await
        }
    }

    // A transient re-query failure is skipped (counted, no emission); the next
    // signal re-checks and recovers.
    #[tokio::test(start_paused = true)]
    async fn transient_failure_skips_then_recovers() {
        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "aaa").await;
        let repo = Box::new(Repo::from_git(
            "/r",
            "/r",
            Git::with_runner(FlakyStatus {
                fails_left: AtomicU64::new(1),
                gitdir: scratch.0.clone(),
                head: "bbb",
            }),
        ));
        let mut h = spawn_loop(repo, prev, defaults());

        // First attempt: the snapshot fails → skip, nothing emitted.
        h.sig.send(()).expect("send");
        settle().await; // loop registers the quiet timer
        tokio::time::advance(Duration::from_millis(300)).await;
        settle().await; // the (failing) re-query runs
        let stats = h.stats.snapshot();
        assert_eq!((stats.requeries, stats.skipped, stats.changes), (1, 1, 0));
        assert_eq!(stats.last_error, Some(WatcherErrorKind::Snapshot));
        assert!(h.out.try_recv().is_err());

        // Second signal: the lock "cleared" — the re-query recovers and emits.
        h.sig.send(()).expect("send");
        let change = h.out.recv().await.expect("recovered change");
        assert!(
            change
                .events
                .iter()
                .any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
        );
        let stats = h.stats.snapshot();
        assert_eq!((stats.requeries, stats.changes), (2, 1));
    }

    /// Delays every reply by `delay` (virtual time — `tokio::time::sleep`, NOT a
    /// thread sleep, so the paused clock controls it). `ScriptedRunner` replies
    /// instantly, so this is the only way to exercise the `requery_timeout`
    /// wrapper — a scripted `Reply::timeout()` resolves immediately and would
    /// test the *error* path, not the deadline.
    struct Sleepy {
        delay: Duration,
        gitdir: PathBuf,
        head: &'static str,
    }

    #[async_trait::async_trait]
    impl ProcessRunner for Sleepy {
        async fn output(
            &self,
            command: &processkit::Command,
        ) -> processkit::Result<processkit::ProcessResult<String>> {
            tokio::time::sleep(self.delay).await;
            scripted(&self.gitdir, self.head).output(command).await
        }
    }

    // A re-query exceeding the configured deadline is killed and skipped as
    // transient; the loop survives (a later attempt runs and is also bounded).
    #[tokio::test(start_paused = true)]
    async fn requery_timeout_skips_as_transient() {
        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "aaa").await;
        let repo = Box::new(Repo::from_git(
            "/r",
            "/r",
            Git::with_runner(Sleepy {
                delay: Duration::from_secs(10),
                gitdir: scratch.0.clone(),
                head: "bbb",
            }),
        ));
        let config = LoopConfig {
            requery_timeout: Some(Duration::from_secs(5)),
            ..defaults()
        };
        let mut h = spawn_loop(repo, prev, config);

        h.sig.send(()).expect("send");
        settle().await; // loop registers the quiet timer
        tokio::time::advance(Duration::from_millis(300)).await; // debounce
        settle().await; // re-query starts; Sleepy + the deadline register timers
        tokio::time::advance(Duration::from_secs(6)).await; // past the deadline
        settle().await;
        let stats = h.stats.snapshot();
        assert_eq!((stats.requeries, stats.skipped, stats.changes), (1, 1, 0));
        assert_eq!(stats.last_error, Some(WatcherErrorKind::Timeout));
        assert!(h.out.try_recv().is_err());

        // The loop is alive: a second attempt runs (and times out the same way).
        h.sig.send(()).expect("send");
        settle().await;
        tokio::time::advance(Duration::from_millis(300)).await;
        settle().await;
        tokio::time::advance(Duration::from_secs(6)).await;
        settle().await;
        assert_eq!(h.stats.snapshot().requeries, 2);
    }

    // Closing the signal channel mid-debounce ends the loop promptly and closes
    // the output channel.
    #[tokio::test(start_paused = true)]
    async fn drop_teardown_mid_debounce() {
        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "aaa").await;
        let Harness {
            sig,
            mut out,
            stats: _,
            task,
        } = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());

        sig.send(()).expect("send");
        tokio::time::advance(Duration::from_millis(100)).await; // mid-debounce
        drop(sig);

        tokio::time::timeout(Duration::from_secs(1), task)
            .await
            .expect("loop ends promptly")
            .expect("loop task joins cleanly");
        assert!(out.recv().await.is_none(), "output closes with the loop");
    }

    /// Reports a different head on every `status` call, so every re-query
    /// produces a `HeadMoved` — the emission generator the backpressure test
    /// needs to fill the bounded output channel.
    struct VaryingHead {
        statuses: AtomicU64,
        gitdir: PathBuf,
    }

    #[async_trait::async_trait]
    impl ProcessRunner for VaryingHead {
        async fn output(
            &self,
            command: &processkit::Command,
        ) -> processkit::Result<processkit::ProcessResult<String>> {
            let is_status = command.arguments().first().map(|a| a == "status") == Some(true);
            let n = if is_status {
                self.statuses.fetch_add(1, Ordering::Relaxed)
            } else {
                self.statuses.load(Ordering::Relaxed)
            };
            scripted(&self.gitdir, &format!("h{n}"))
                .output(command)
                .await
        }
    }

    // A full output channel parks the loop at `send` (backpressure) instead of
    // dropping or buffering unboundedly; draining one item unparks it.
    #[tokio::test(start_paused = true)]
    async fn backpressure_parks_loop() {
        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "base").await;
        let repo = Box::new(Repo::from_git(
            "/r",
            "/r",
            Git::with_runner(VaryingHead {
                statuses: AtomicU64::new(0),
                gitdir: scratch.0.clone(),
            }),
        ));
        let config = LoopConfig {
            output_capacity: 1,
            ..defaults()
        };
        let mut h = spawn_loop(repo, prev, config);

        // First change fills the capacity-1 channel.
        h.sig.send(()).expect("send");
        settle().await; // loop registers the quiet timer
        tokio::time::advance(Duration::from_millis(300)).await;
        settle().await; // re-query runs; emission 1 fills the channel
        // Second re-query produces another change; the send parks (channel full):
        // the re-query ran but the emission hasn't landed.
        h.sig.send(()).expect("send");
        settle().await;
        tokio::time::advance(Duration::from_millis(300)).await;
        settle().await;
        let stats = h.stats.snapshot();
        assert_eq!(
            (stats.requeries, stats.changes),
            (2, 1),
            "second emission must be parked on the full channel"
        );

        // Draining unparks the loop; both changes arrive in order.
        let first = h.out.recv().await.expect("first change");
        assert!(
            first
                .events
                .iter()
                .any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
        );
        let second = h.out.recv().await.expect("second change");
        assert!(
            second
                .events
                .iter()
                .any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
        );
        settle().await;
        assert_eq!(h.stats.snapshot().changes, 2);
    }

    // The `stream` feature: `StreamExt::next` on the REAL `RepoWatcher` yields
    // what `recv` would and advances `current()` identically. The watcher is
    // assembled directly (same crate) around the loop harness's channel, with an
    // idle notify watcher standing in for the OS watch.
    #[cfg(feature = "stream")]
    #[tokio::test(start_paused = true)]
    async fn stream_yields_changes_and_advances_current() {
        use tokio_stream::StreamExt;

        let scratch = Scratch::new();
        let prev = baseline(&scratch.0, "aaa").await;
        let h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());

        let baseline_snap = scripted_repo(&scratch.0, "aaa")
            .snapshot()
            .await
            .expect("baseline snapshot");
        let mut watcher = RepoWatcher {
            rx: h.out,
            current: baseline_snap,
            stats: h.stats,
            _watcher: notify::recommended_watcher(|_res| {}).expect("idle watcher"),
            task: h.task,
        };
        assert_eq!(watcher.current().head.as_deref(), Some("aaa"));

        h.sig.send(()).expect("send");
        let change = watcher.next().await.expect("stream item");
        assert!(
            change
                .events
                .iter()
                .any(|e| matches!(e, RepoEvent::HeadMoved { .. })),
            "got {:?}",
            change.events
        );
        // Polling through the Stream advanced `current()` exactly like `recv`.
        assert_eq!(watcher.current().head.as_deref(), Some("bbb"));
    }
}

// Long-form how-to guides, rendered from this crate's docs/*.md on docs.rs.
#[doc = include_str!("../docs/watch.md")]
#[allow(rustdoc::broken_intra_doc_links)]
pub mod guide {}