vcs_watch/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![deny(rustdoc::broken_intra_doc_links)]
3//! `vcs-watch` — filesystem-watch a git/jj repository and emit typed state-change
4//! events.
5//!
6//! A [`RepoWatcher`] watches a repository's `.git`/`.jj` state directory (and,
7//! optionally, the working tree), **debounces** the burst of writes a VCS
8//! operation makes, **re-queries** the repo state through
9//! [`vcs-core`](vcs_core)'s batched [`snapshot`](vcs_core::Repo::snapshot), and
10//! **diffs** it against the previous state to yield typed [`RepoEvent`]s. Each
11//! settled change arrives as a [`RepoChange`] carrying both the new
12//! [`RepoSnapshot`] (to render a prompt/status line) and the deltas (to react).
13//! It's the foundation for prompts, status bars, TUIs, and repo daemons.
14//!
15//! Re-query-and-diff — rather than interpreting raw filesystem events — is what
16//! makes it robust: git's ref temp-file renames, `index.lock` churn, and reflog
17//! noise all just coalesce into one "re-check the settled state" instead of being
18//! (mis)read as events. Noise that doesn't move observable state emits nothing,
19//! and every emission carries the true current state, so a stray event can't
20//! desync the consumer.
21//!
22//! # The surface
23//!
24//! - **[`RepoWatcher`]** — a live watch over one repository. Start it with
25//! [`RepoWatcher::watch`] (defaults) or the [`Builder`]; drop it to stop the OS
26//! watch and the background task.
27//! - **[`Builder`]** ([`RepoWatcher::builder`]) — set the watch scope and timing,
28//! then [`build`](Builder::build): [`working_tree`](Builder::working_tree) to
29//! also watch the tree recursively, [`debounce`](Builder::debounce) (the quiet
30//! window), [`max_wait`](Builder::max_wait) (the re-query ceiling under a
31//! continuous stream), [`requery_timeout`](Builder::requery_timeout) (the
//! per-re-query deadline). [`DEFAULT_REQUERY_TIMEOUT`] names the re-query
//! default; the debounce and ceiling default to 250 ms and 1 s.
34//! - **[`RepoEvent`]** — one typed delta, derived by diffing two snapshots:
35//! [`HeadMoved`](RepoEvent::HeadMoved),
36//! [`BranchSwitched`](RepoEvent::BranchSwitched),
37//! [`BranchCreated`](RepoEvent::BranchCreated) /
38//! [`BranchDeleted`](RepoEvent::BranchDeleted),
39//! [`WorkingCopyChanged`](RepoEvent::WorkingCopyChanged), and the
40//! upstream/ahead-behind/operation/conflict variants (`#[non_exhaustive]`).
41//! - **[`RepoChange`]** — a settled change: the fresh [`RepoSnapshot`] (render a
42//! status line off it) plus the non-empty `events` vec (react to it).
43//! - **Consumption** — pull changes with [`recv`](RepoWatcher::recv)
//! (`Option<RepoChange>`; `None` once the background task has ended), or,
//! under the **`stream`** feature, poll the watcher as a
//! `futures_core::Stream`. Both pull from the
46//! same channel and advance [`current`](RepoWatcher::current), the last-pulled
47//! snapshot.
48//! - **[`WatcherStats`]** ([`stats`](RepoWatcher::stats)) — lock-free health
49//! counters (re-queries run, changes emitted, skips, and the last skip's
50//! [`WatcherErrorKind`]). Climbing [`skipped`](WatcherStats::skipped) with flat
51//! [`changes`](WatcherStats::changes) means a wedged repo — poll it from a
52//! health check rather than inferring health from event silence.
53//!
54//! # Recipes
55//!
56//! Watch with the defaults and react to each settled change:
57//!
58//! ```no_run
59//! use vcs_core::Repo;
60//! use vcs_watch::RepoWatcher;
61//! # async fn run() -> vcs_watch::Result<()> {
62//! let repo = Repo::open(".")?;
63//! let mut watcher = RepoWatcher::watch(repo).await?;
64//! while let Some(change) = watcher.recv().await {
65//! for event in &change.events {
66//! println!("{event:?}");
67//! }
68//! // `change.snapshot` is the fresh full state — render a status line off it.
69//! }
70//! # Ok(()) }
71//! ```
72//!
73//! Under the **`stream`** feature the watcher *is* a `futures_core::Stream`,
74//! so it drops into stream combinators and `tokio::select!` directly (needs
75//! `futures`/`tokio-stream`'s `StreamExt` in scope):
76//!
77//! ```ignore
78//! use futures::StreamExt;
79//! use vcs_core::Repo;
80//! use vcs_watch::RepoWatcher;
81//! # async fn run() -> vcs_watch::Result<()> {
82//! let repo = Repo::open(".")?;
83//! let mut watcher = RepoWatcher::watch(repo).await?;
84//! while let Some(change) = watcher.next().await {
85//! println!("{} event(s)", change.events.len());
86//! }
87//! # Ok(()) }
88//! ```
89//!
90//! **Runtime:** unlike the rest of the toolkit (which hides tokio behind
91//! `processkit`), `vcs-watch` uses **tokio at runtime** — the watch task and the
92//! debounce timer run on the caller's tokio runtime, so build/await it from
93//! within one.
94//!
95//! # Testing
96//!
97//! The debounce → ceiling → re-query pipeline is a free function over injected
98//! seams, so it is exercised hermetically on a **paused clock** (no real
99//! filesystem or sleeps); a consumer's own watch code tests the same way it tests
100//! any [`vcs-core`](vcs_core) consumer — build the [`Repo`](vcs_core::Repo) over a
101//! fake runner (processkit's `ScriptedRunner`) so the re-query returns canned
102//! state. See
103//! [vcs-testkit's guide](https://docs.rs/vcs-testkit/latest/vcs_testkit/guide/testing/).
104//!
105//! # In-depth guide
106//!
107//! Beyond this page, this crate ships a full how-to guide — rendered on docs.rs
108//! from `docs/`. See the [`guide`] module.
109
110use std::path::{Path, PathBuf};
111use std::sync::Arc;
112use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
113use std::time::Duration;
114
115use notify::{RecursiveMode, Watcher};
116use tokio::sync::mpsc;
117use vcs_core::{BackendKind, VcsRepo};
118
119mod error;
120mod event;
121
122pub use error::{Error, Result};
123pub use event::{RepoChange, RepoEvent};
124// Re-export the snapshot types a consumer reads off a `RepoChange`, so depending
125// on `vcs-watch` alone suffices.
126pub use vcs_core::{OperationState, RepoSnapshot};
127
/// Default debounce (quiet) window: the re-query runs once this much time has
/// passed without a new filesystem event on the watched directories.
const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(250);
/// Default ceiling on deferral: a never-quiet event stream still triggers a
/// re-query at least this often, so a long bulk operation keeps reporting.
const DEFAULT_MAX_WAIT: Duration = Duration::from_secs(1);
/// Default deadline for one re-query (the `snapshot` + branch-list pair). A
/// command that wedges — e.g. on a held `index.lock` with no client timeout
/// configured — is killed and the re-query skipped, rather than stalling the
/// watch loop forever.
pub const DEFAULT_REQUERY_TIMEOUT: Duration = Duration::from_secs(30);
/// Capacity of the bounded output channel. A slow consumer exerts backpressure
/// (the loop stops re-querying while the channel is full), and the filesystem
/// signals that pile up meanwhile coalesce into a single catch-up query.
const OUTPUT_CAPACITY: usize = 64;
141
/// Timing and capacity settings for the background loop, grouped into one value
/// so `watch_loop`'s signature stays short and the hermetic tests can tune each
/// knob independently (the backpressure test, notably, shrinks
/// `output_capacity` to 1).
struct LoopConfig {
    /// Quiet window: how long the signal channel must stay silent before a
    /// burst counts as settled.
    debounce: Duration,
    /// Upper bound on how long a continuous signal stream may defer a re-query.
    max_wait: Duration,
    /// Deadline for a single re-query; `None` disables it.
    requery_timeout: Option<Duration>,
    /// Capacity of the bounded output channel.
    output_capacity: usize,
}
152
153/// Builder for a [`RepoWatcher`] — set the watch scope and debounce timing, then
154/// [`build`](Builder::build).
155pub struct Builder {
156 repo: Box<dyn VcsRepo>,
157 working_tree: bool,
158 debounce: Duration,
159 max_wait: Duration,
160 requery_timeout: Option<Duration>,
161}
162
163impl Builder {
164 /// Also watch the **working tree** recursively, so a bare unstaged edit
165 /// (`vim file`) fires [`WorkingCopyChanged`](RepoEvent::WorkingCopyChanged)
166 /// immediately. Off by default (only the `.git`/`.jj` state dir is watched,
167 /// which catches an unstaged edit once it touches the index / a jj snapshot).
168 ///
169 /// Note: `notify` is `.gitignore`-unaware, so this also watches ignored and
170 /// build directories — heavier on a large tree.
171 pub fn working_tree(mut self, yes: bool) -> Self {
172 self.working_tree = yes;
173 self
174 }
175
176 /// The quiet window: re-query once the watched dir has been silent this long
177 /// after the last event (default 250 ms). Coalesces an operation's write
178 /// burst into one re-check.
179 pub fn debounce(mut self, window: Duration) -> Self {
180 self.debounce = window;
181 self
182 }
183
184 /// The ceiling on how long a continuous event stream defers the re-query
185 /// (default 1 s) — a long bulk operation still reports at this cadence.
186 pub fn max_wait(mut self, ceiling: Duration) -> Self {
187 self.max_wait = ceiling;
188 self
189 }
190
191 /// Deadline on a single re-query (the `snapshot` + branch-list pair), default
192 /// [`DEFAULT_REQUERY_TIMEOUT`] (30 s); `None` disables it. Orthogonal to
193 /// [`max_wait`](Self::max_wait): that bounds how long signals may *defer* a
194 /// re-query, this bounds how long one re-query may *run*. On overrun the
195 /// spawned commands are killed (kill-on-drop) and the re-query is skipped as
196 /// transient — the next filesystem event re-checks.
197 ///
198 /// Note: on a very large repository a *cold-cache* `git status` (first run
199 /// after a `gc`, or on a slow disk) can legitimately exceed the 30 s default
200 /// — raise it (or pass `None`) there; a watcher whose every re-query is
201 /// being killed shows up as climbing [`WatcherStats::skipped`] with flat
202 /// `changes`.
203 pub fn requery_timeout(mut self, timeout: Option<Duration>) -> Self {
204 self.requery_timeout = timeout;
205 self
206 }
207
208 /// Start watching. Captures the baseline state, registers the filesystem
209 /// watch, and spawns the background re-query task on the current tokio
210 /// runtime.
211 pub async fn build(self) -> Result<RepoWatcher> {
212 let root = self.repo.root().to_path_buf();
213 // The dirs whose writes mean "re-check": the `.git`/`.jj` state dir, plus
214 // — for a linked git worktree — the *shared* git dir it points at via
215 // `commondir` (where `refs/heads/*` and `packed-refs` actually live, so
216 // branch create/delete is seen). See `state_dirs`.
217 let state_dirs = state_dirs(self.repo.kind(), &root)?;
218
219 // Bridge: notify's callback thread pushes a unit signal per event into an
220 // unbounded channel (non-blocking, thread-safe); the debounce loop drains
221 // it. Build the watcher and register paths *before* the baseline snapshot,
222 // so a change racing the baseline is queued, not lost.
223 let (raw_tx, raw_rx) = mpsc::unbounded_channel::<()>();
224 let mut watcher = notify::recommended_watcher(move |_res| {
225 // Content is irrelevant — we re-query state, so any event (or watch
226 // error) just means "re-check". Send fails only after the loop ends.
227 let _ = raw_tx.send(());
228 })?;
229 if self.working_tree {
230 watcher.watch(&root, RecursiveMode::Recursive)?;
231 // A worktree gitlink puts the real (private and shared) git dirs
232 // outside `root`; cover any not already under the recursive root watch.
233 for dir in &state_dirs {
234 if !dir.starts_with(&root) {
235 watcher.watch(dir, RecursiveMode::Recursive)?;
236 }
237 }
238 } else {
239 for dir in &state_dirs {
240 watcher.watch(dir, RecursiveMode::Recursive)?;
241 }
242 }
243
244 let snapshot = self.repo.snapshot().await?;
245 let branches = self.repo.local_branches().await?;
246 let baseline = snapshot.clone();
247 let prev = event::WatchState::from_snapshot(&snapshot, branches);
248
249 let config = LoopConfig {
250 debounce: self.debounce,
251 max_wait: self.max_wait,
252 requery_timeout: self.requery_timeout,
253 output_capacity: OUTPUT_CAPACITY,
254 };
255 let stats = Arc::new(StatsInner::default());
256 let (out_tx, out_rx) = mpsc::channel::<RepoChange>(config.output_capacity);
257 let task = tokio::spawn(watch_loop(
258 self.repo,
259 raw_rx,
260 out_tx,
261 prev,
262 config,
263 Arc::clone(&stats),
264 ));
265
266 Ok(RepoWatcher {
267 rx: out_rx,
268 current: baseline,
269 stats,
270 _watcher: watcher,
271 task,
272 })
273 }
274}
275
276// --- Watcher health counters --------------------------------------------------
277
/// The reason the most recent re-query was skipped, as reported by
/// [`WatcherStats::last_error`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum WatcherErrorKind {
    /// The `snapshot` query itself failed (for instance on a transiently held
    /// lock).
    Snapshot,
    /// The snapshot succeeded but listing the local branches failed.
    Branches,
    /// The re-query ran past [`Builder::requery_timeout`] and was killed.
    Timeout,
}
289
290/// A cheap point-in-time copy of the watcher's health counters — see
291/// [`RepoWatcher::stats`]. Lets a long-running consumer notice a watcher that is
292/// silently skipping re-queries (e.g. a permanently wedged repository) instead
293/// of inferring health from event silence.
294#[derive(Debug, Clone, Copy)]
295#[non_exhaustive]
296pub struct WatcherStats {
297 /// Re-query attempts started (settled bursts that reached the query step).
298 pub requeries: u64,
299 /// Re-queries that emitted a [`RepoChange`] (the rest found no difference).
300 pub changes: u64,
301 /// Re-queries skipped — transient query failures plus deadline overruns.
302 pub skipped: u64,
303 /// What the most recent skip failed on; `None` when nothing was ever skipped.
304 pub last_error: Option<WatcherErrorKind>,
305}
306
/// The lock-free counters shared by the background loop (writer) and
/// `stats()` callers (readers). Relaxed ordering suffices because the counters
/// are independent, monotonic telemetry and do not form a synchronization
/// protocol.
#[derive(Default)]
struct StatsInner {
    /// Re-query attempts started.
    requeries: AtomicU64,
    /// Re-queries that emitted a change.
    changes: AtomicU64,
    /// Re-queries that were skipped.
    skipped: AtomicU64,
    /// Encoded cause of the last skip: 0 = none, else
    /// `WatcherErrorKind as u8 + 1`.
    last_error: AtomicU8,
}
318
319impl StatsInner {
320 fn note_requery(&self) {
321 self.requeries.fetch_add(1, Ordering::Relaxed);
322 }
323
324 fn note_change(&self) {
325 self.changes.fetch_add(1, Ordering::Relaxed);
326 }
327
328 fn note_skip(&self, kind: WatcherErrorKind) {
329 self.skipped.fetch_add(1, Ordering::Relaxed);
330 let code = match kind {
331 WatcherErrorKind::Snapshot => 1,
332 WatcherErrorKind::Branches => 2,
333 WatcherErrorKind::Timeout => 3,
334 };
335 self.last_error.store(code, Ordering::Relaxed);
336 }
337
338 fn snapshot(&self) -> WatcherStats {
339 let last_error = match self.last_error.load(Ordering::Relaxed) {
340 1 => Some(WatcherErrorKind::Snapshot),
341 2 => Some(WatcherErrorKind::Branches),
342 3 => Some(WatcherErrorKind::Timeout),
343 _ => None,
344 };
345 WatcherStats {
346 requeries: self.requeries.load(Ordering::Relaxed),
347 changes: self.changes.load(Ordering::Relaxed),
348 skipped: self.skipped.load(Ordering::Relaxed),
349 last_error,
350 }
351 }
352}
353
354/// A live watch over a repository, yielding [`RepoChange`]s as the repo's state
355/// changes. Dropping it stops the filesystem watch and the background task.
356pub struct RepoWatcher {
357 rx: mpsc::Receiver<RepoChange>,
358 current: RepoSnapshot,
359 stats: Arc<StatsInner>,
360 // Held to keep the OS watch alive; dropping it ends the watch (and the loop).
361 _watcher: notify::RecommendedWatcher,
362 task: tokio::task::JoinHandle<()>,
363}
364
365impl RepoWatcher {
366 /// A builder over `repo` (any [`VcsRepo`] — e.g. a [`vcs_core::Repo`]).
367 pub fn builder(repo: impl VcsRepo + 'static) -> Builder {
368 Builder {
369 repo: Box::new(repo),
370 working_tree: false,
371 debounce: DEFAULT_DEBOUNCE,
372 max_wait: DEFAULT_MAX_WAIT,
373 requery_timeout: Some(DEFAULT_REQUERY_TIMEOUT),
374 }
375 }
376
377 /// Start watching `repo` with the defaults (state dir only, 250 ms debounce).
378 pub async fn watch(repo: impl VcsRepo + 'static) -> Result<RepoWatcher> {
379 Self::builder(repo).build().await
380 }
381
382 /// Await the next settled change. Returns `None` once the watcher is dropped
383 /// or its background task ends.
384 pub async fn recv(&mut self) -> Option<RepoChange> {
385 let change = self.rx.recv().await?;
386 self.current = change.snapshot.clone();
387 Some(change)
388 }
389
390 /// The most recent known snapshot — the baseline captured at
391 /// [`build`](Builder::build), then the snapshot from each [`recv`](Self::recv).
392 /// It advances **only when you call [`recv`](Self::recv)**, so it is as fresh
393 /// as your last `recv`, not a live view.
394 pub fn current(&self) -> &RepoSnapshot {
395 &self.current
396 }
397
398 /// The watcher's health counters (re-queries run / changes emitted / skips,
399 /// and what the last skip failed on). Cheap relaxed-atomic reads — poll it
400 /// from a health check or log it periodically; a climbing
401 /// [`skipped`](WatcherStats::skipped) with flat
402 /// [`requeries`](WatcherStats::requeries) means the repository is wedged.
403 pub fn stats(&self) -> WatcherStats {
404 self.stats.snapshot()
405 }
406}
407
/// Stream view of the watcher (the `stream` feature): every settled
/// [`RepoChange`] is one stream item. This is the same as calling
/// [`recv`](RepoWatcher::recv) in a loop — both read the same underlying
/// channel, so an item goes to whichever is polled first and is never
/// duplicated, and both advance [`current`](RepoWatcher::current).
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl futures_core::Stream for RepoWatcher {
    type Item = RepoChange;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<RepoChange>> {
        // Every field is Unpin, hence so is the watcher, hence `get_mut` is
        // sound.
        let watcher = self.get_mut();
        let polled = watcher.rx.poll_recv(cx);
        // Only a delivered item moves `current`; pending and end-of-stream
        // pass through untouched.
        if let std::task::Poll::Ready(Some(change)) = &polled {
            watcher.current = change.snapshot.clone();
        }
        polled
    }
}
432
433impl Drop for RepoWatcher {
434 fn drop(&mut self) {
435 // The dropped `_watcher` already closes the signal channel (ending the
436 // loop); abort is belt-and-braces for prompt teardown.
437 self.task.abort();
438 }
439}
440
/// The background loop: coalesce a burst of filesystem signals, re-query the
/// settled state, diff against the previous, and emit a [`RepoChange`] when
/// anything changed.
///
/// A free function over plain channels + a [`VcsRepo`] (not a method) on
/// purpose: the hermetic pipeline tests below drive it directly — a fake signal
/// channel in, a `ScriptedRunner`-backed `Repo`, a paused tokio clock — pinning
/// the debounce/ceiling/skip semantics without any real filesystem or process.
///
/// Parameters:
/// - `repo` — queried for `snapshot` and `local_branches` on every re-query.
/// - `raw_rx` — unit "something happened" signals; its closing ends the loop.
/// - `out_tx` — bounded channel the emitted changes are sent into.
/// - `prev` — the state the first diff is taken against; replaced after every
///   successful re-query.
/// - `config` — debounce window, ceiling, and optional re-query deadline.
/// - `stats` — health counters updated as the loop runs.
///
/// Returns when the signal channel closes or when the receiving end of
/// `out_tx` has been dropped.
async fn watch_loop(
    repo: Box<dyn VcsRepo>,
    mut raw_rx: mpsc::UnboundedReceiver<()>,
    out_tx: mpsc::Sender<RepoChange>,
    mut prev: event::WatchState,
    config: LoopConfig,
    stats: Arc<StatsInner>,
) {
    loop {
        // Block until the first signal (or exit when the watcher is dropped).
        if raw_rx.recv().await.is_none() {
            return;
        }
        // Coalesce the burst: reset a `debounce` quiet-timer on every new signal,
        // but never wait past `max_wait` total. The dedicated `sleep_until` arm
        // makes the ceiling exact (it fires even when no further signal arrives);
        // the in-arm deadline check guards against a signal stream so dense that
        // the `biased` select never polls the timer arms.
        drain(&mut raw_rx);
        // The ceiling is measured from the first signal of the burst.
        let deadline = tokio::time::Instant::now() + config.max_wait;
        loop {
            tokio::select! {
                // `biased` polls the arms in written order: signals first, then
                // the ceiling, then the quiet timer.
                biased;
                sig = raw_rx.recv() => {
                    if sig.is_none() {
                        return; // watcher dropped mid-burst
                    }
                    // Collapse the queued backlog: under a notify storm each
                    // queued unit signal would otherwise cost a select iteration
                    // that re-creates BOTH timer futures — a burst is one
                    // "still busy" observation, not N.
                    drain(&mut raw_rx);
                    if tokio::time::Instant::now() >= deadline {
                        break; // ceiling reached — re-query now
                    }
                    // else: another event — loop resets the quiet timer
                }
                _ = tokio::time::sleep_until(deadline) => break, // ceiling
                _ = tokio::time::sleep(config.debounce) => break, // settled
            }
        }

        // Re-query the settled state, bounded by the configured deadline — a
        // wedged command (a held `index.lock` on a client with no timeout) must
        // not stall the watch forever. Dropping the overrun future kills the
        // spawned process tree (processkit's kill-on-drop group), so a timed-out
        // query leaves no orphan. Failures and overruns are *transient skips*:
        // counted, traced, and re-checked on the next filesystem event.
        //
        // The attempt is counted before it runs, so every skip below is also a
        // counted re-query.
        stats.note_requery();
        // Each failure is tagged with the step it came from so the skip can be
        // attributed in the stats.
        let requery = async {
            let snapshot = repo
                .snapshot()
                .await
                .map_err(|e| (WatcherErrorKind::Snapshot, e))?;
            let branches = repo
                .local_branches()
                .await
                .map_err(|e| (WatcherErrorKind::Branches, e))?;
            Ok::<_, (WatcherErrorKind, vcs_core::Error)>((snapshot, branches))
        };
        let outcome = match config.requery_timeout {
            Some(limit) => match tokio::time::timeout(limit, requery).await {
                Ok(result) => result,
                Err(_elapsed) => {
                    stats.note_skip(WatcherErrorKind::Timeout);
                    #[cfg(feature = "tracing")]
                    tracing::debug!(
                        timeout = ?limit,
                        "vcs-watch: re-query exceeded its deadline; killed and skipped"
                    );
                    // Back to waiting for the next signal; `prev` is untouched.
                    continue;
                }
            },
            // No deadline configured: wait for the query however long it takes.
            None => requery.await,
        };
        let (snapshot, branches) = match outcome {
            Ok(pair) => pair,
            Err((kind, _e)) => {
                stats.note_skip(kind);
                #[cfg(feature = "tracing")]
                tracing::debug!(error = %_e, "vcs-watch: re-query failed; skipping");
                // A failed query also leaves `prev` untouched.
                continue;
            }
        };

        let next = event::WatchState::from_snapshot(&snapshot, branches);
        let events = event::diff(&prev, &next);
        // `prev` advances on every successful re-query, including the ones
        // whose diff turns out empty.
        prev = next;
        if events.is_empty() {
            continue;
        }
        // Bounded send: while the channel is full the loop waits here, and
        // signals arriving meanwhile queue up in `raw_rx`.
        if out_tx.send(RepoChange { snapshot, events }).await.is_err() {
            return; // receiver dropped — stop
        }
        // Counted only once the channel has accepted the change.
        stats.note_change();
    }
}
546
547/// Drop every already-queued unit signal — the burst is one observation. Leaves
548/// channel-closed detection to the caller's next `recv` (a drained-empty and a
549/// closed channel both just stop yielding here).
550fn drain(raw_rx: &mut mpsc::UnboundedReceiver<()>) {
551 while raw_rx.try_recv().is_ok() {}
552}
553
554/// The directories to watch for a backend, deduplicated. Normally one — the
555/// `.git`/`.jj` state dir (see [`state_dir`]) — but a **linked git worktree** has
556/// two: its private gitdir (HEAD/index/logs) *and* the shared git dir it points
557/// at via `commondir` (`refs/heads/*` and `packed-refs`, where branch
558/// create/delete actually lands). Watching only the private dir would miss every
559/// `BranchCreated`/`BranchDeleted` on a worktree, since the shared dir is a
560/// *sibling*, not nested under it (see [`common_dir`]).
561///
562/// Overlapping watches are harmless — the re-query+debounce coalesces duplicate
563/// signals — but we drop a second dir whose normalized path equals the first, so
564/// `notify` isn't asked to watch the same path twice.
565fn state_dirs(kind: BackendKind, root: &Path) -> Result<Vec<PathBuf>> {
566 let state_dir = state_dir(kind, root)?;
567 let mut dirs = vec![state_dir.clone()];
568 if let Some(shared) = common_dir(&state_dir)
569 && normalize(&shared) != normalize(&state_dir)
570 {
571 dirs.push(shared);
572 }
573 Ok(dirs)
574}
575
576/// The directory to watch for a backend: `.jj` for jj, `.git` for git. A
577/// worktree's `.git` is a gitlink *file* (`gitdir: <path>`); resolve it to the
578/// real git directory. Best-effort — falls back to the `.git` path itself.
579fn state_dir(kind: BackendKind, root: &Path) -> Result<PathBuf> {
580 match kind {
581 BackendKind::Jj => Ok(root.join(".jj")),
582 BackendKind::Git => {
583 let dot_git = root.join(".git");
584 if dot_git.is_file() {
585 let content = std::fs::read_to_string(&dot_git)?;
586 if let Some(rest) = content.trim().strip_prefix("gitdir:") {
587 let p = PathBuf::from(rest.trim());
588 return Ok(if p.is_absolute() { p } else { root.join(p) });
589 }
590 }
591 Ok(dot_git)
592 }
593 // `BackendKind` is `#[non_exhaustive]`; for an unknown future backend
594 // watch the repo root itself — coarser, but it can't miss the state dir.
595 _ => Ok(root.to_path_buf()),
596 }
597}
598
599/// The **shared** git directory for a linked worktree, or `None` for a plain
600/// repo. A linked worktree's resolved gitdir holds a `commondir` file whose
601/// content is a path (typically relative, e.g. `../..`) to the shared `.git` —
602/// where `refs/heads/*` and `packed-refs` live. We join it to the gitdir and
603/// resolve `..` (lexically, matching the no-canonicalize style of [`state_dir`],
604/// so the registered path stays plain rather than a Windows `\\?\` verbatim one).
605/// A plain repo has no `commondir` file, so this is `None` and behaviour is
606/// unchanged.
607fn common_dir(state_dir: &Path) -> Option<PathBuf> {
608 let commondir = state_dir.join("commondir");
609 let content = std::fs::read_to_string(&commondir).ok()?;
610 let rel = content.trim();
611 if rel.is_empty() {
612 return None;
613 }
614 let p = PathBuf::from(rel);
615 let joined = if p.is_absolute() {
616 p
617 } else {
618 state_dir.join(p)
619 };
620 Some(lexically_normalized(&joined))
621}
622
/// Resolve `.`/`..` components without touching the filesystem, keeping the path
/// in its original (non-verbatim) form — `commondir`'s `../..` plus a Windows
/// gitdir would otherwise leave literal `..` segments in the watched path.
///
/// Rules, applied component by component:
/// - `.` is dropped.
/// - `..` removes the preceding normal segment when there is one.
/// - `..` with nothing left to remove is kept, and further `..` stack up behind
///   it (`../../b` stays `../../b`).
/// - `..` directly under the root is dropped (`/..` is `/`).
fn lexically_normalized(p: &Path) -> PathBuf {
    use std::path::Component;
    let mut out = PathBuf::new();
    for comp in p.components() {
        match comp {
            Component::CurDir => {}
            Component::ParentDir => {
                // `PathBuf::pop` happily pops a trailing `..` (its parent is the
                // empty path), which would let a second leading `..` cancel the
                // first. Check for that before popping.
                let unresolved = out.ends_with(Component::ParentDir);
                // Pop a real segment. If nothing could be popped, keep the `..`
                // unless `out` is rooted, where it would be a no-op anyway.
                if unresolved || (!out.pop() && !out.has_root()) {
                    out.push(comp);
                }
            }
            other => out.push(other),
        }
    }
    out
}
643
/// Comparison form of a path: canonicalized, with the Windows verbatim prefix
/// (`\\?\…`, which `canonicalize` adds) stripped, so two spellings of the same
/// directory dedup. Mirrors `vcs-core`'s path-compare normalization. A path
/// that can't be canonicalized is returned as given, so equal inputs still
/// compare equal byte-for-byte.
fn normalize(p: &Path) -> PathBuf {
    let canonical = match p.canonicalize() {
        Ok(real) => real,
        Err(_) => p.to_path_buf(),
    };
    #[cfg(windows)]
    {
        let text = canonical.to_string_lossy();
        // Verbatim UNC paths (`\\?\UNC\…`) are left exactly as they are.
        match text.strip_prefix(r"\\?\") {
            Some(plain) if !plain.starts_with("UNC\\") => {
                return PathBuf::from(plain.to_string());
            }
            _ => {}
        }
    }
    canonical
}
661
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// A uniquely named temp dir that removes itself on drop. The name is built
    /// from the process id plus a counter, which keeps parallel tests apart
    /// without pulling in a temp-dir crate.
    /// `pub(crate)`: the pipeline tests reuse it for the scripted repo's on-disk
    /// git dir (the snapshot's MERGE_HEAD probe reads the filesystem).
    pub(crate) struct Scratch(pub(crate) PathBuf);
    impl Scratch {
        pub(crate) fn new() -> Self {
            let serial = COUNTER.fetch_add(1, Ordering::Relaxed);
            let name = format!("vcs-watch-commondir-{}-{}", std::process::id(), serial);
            let dir = std::env::temp_dir().join(name);
            std::fs::create_dir_all(&dir).expect("create scratch dir");
            Scratch(dir)
        }
    }
    impl Drop for Scratch {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover temp dir is not a test failure.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // No `commondir` file (a plain, non-worktree git dir) means no shared dir,
    // i.e. the single-dir watch.
    #[test]
    fn no_commondir_file_yields_none() {
        let scratch = Scratch::new();
        let plain_git = scratch.0.join(".git");
        std::fs::create_dir_all(&plain_git).expect("mkdir .git");
        assert!(common_dir(&plain_git).is_none());
    }

    // Linked-worktree layout: the private gitdir's `commondir` holds `../..`
    // (what git really writes), which has to resolve to the shared `.git`.
    #[test]
    fn relative_commondir_resolves_to_shared_git_dir() {
        let scratch = Scratch::new();
        let shared_git = scratch.0.join(".git");
        let private_git = shared_git.join("worktrees").join("wt");
        std::fs::create_dir_all(&private_git).expect("mkdir private gitdir");
        // Relative to the private dir, exactly as git writes it.
        std::fs::write(private_git.join("commondir"), "../..\n").expect("write commondir");

        let got = common_dir(&private_git).expect("Some(shared dir)");
        // `<shared>/worktrees/wt` + `../..` == `<shared>`, with no `..` left over.
        assert_eq!(got, lexically_normalized(&shared_git));
        let rendered = got.to_string_lossy();
        assert!(
            !rendered.contains(".."),
            "the `..` segments must be resolved, got {}",
            got.display()
        );
    }

    // git also permits an absolute `commondir`; it is used as written.
    #[test]
    fn absolute_commondir_is_used_verbatim() {
        let scratch = Scratch::new();
        let private_git = scratch.0.join("private");
        let shared_git = scratch.0.join("shared-git");
        std::fs::create_dir_all(&private_git).expect("mkdir private");
        let body = format!("{}\n", shared_git.display());
        std::fs::write(private_git.join("commondir"), body).expect("write commondir");

        let expected = Some(lexically_normalized(&shared_git));
        assert_eq!(common_dir(&private_git), expected);
    }

    // For a worktree, `state_dirs` yields the private dir and then the shared
    // dir, and the two differ (so two distinct watches get registered).
    #[test]
    fn state_dirs_includes_private_and_shared_for_worktree() {
        let scratch = Scratch::new();
        let shared_git = scratch.0.join(".git");
        let private_git = shared_git.join("worktrees").join("wt");
        let worktree = scratch.0.join("wt-worktree");
        std::fs::create_dir_all(&private_git).expect("mkdir private gitdir");
        std::fs::create_dir_all(&worktree).expect("mkdir worktree root");
        std::fs::write(private_git.join("commondir"), "../..\n").expect("write commondir");
        // The worktree's `.git` is a gitlink file naming the private dir.
        let gitlink = format!("gitdir: {}\n", private_git.display());
        std::fs::write(worktree.join(".git"), gitlink).expect("write gitlink");

        let dirs = state_dirs(BackendKind::Git, &worktree).expect("state_dirs");
        assert_eq!(dirs.len(), 2, "private + shared, got {dirs:?}");
        assert_eq!(normalize(&dirs[0]), normalize(&private_git));
        assert_eq!(normalize(&dirs[1]), normalize(&shared_git));
    }

    // Degenerate case: `commondir` points back at the state dir itself. The
    // duplicate is dropped so the same path is never registered twice.
    #[test]
    fn self_referential_commondir_is_deduped() {
        let scratch = Scratch::new();
        let git_dir = scratch.0.join(".git");
        std::fs::create_dir_all(&git_dir).expect("mkdir .git");
        // `.` names the directory that holds the file.
        std::fs::write(git_dir.join("commondir"), ".\n").expect("write commondir");
        // A worktree root whose gitlink names that very directory.
        let worktree = scratch.0.join("root");
        std::fs::create_dir_all(&worktree).expect("mkdir root");
        let gitlink = format!("gitdir: {}\n", git_dir.display());
        std::fs::write(worktree.join(".git"), gitlink).expect("write gitlink");

        let dirs = state_dirs(BackendKind::Git, &worktree).expect("state_dirs");
        assert_eq!(dirs.len(), 1, "self-reference deduped, got {dirs:?}");
    }
}
781
782/// Hermetic tests of the debounce → ceiling → re-query → diff pipeline itself:
783/// `watch_loop` is driven directly with a fake signal channel, a
784/// `ScriptedRunner`-backed `Repo`, and a **paused tokio clock** — no real
785/// filesystem watch, no real process, no real sleeps. These pin the *loop's*
786/// timing contract; the notify→signal bridge stays covered by the `#[ignore]`
787/// integration tests (fake time says nothing about real OS event batching).
788#[cfg(test)]
789mod pipeline_tests {
790 use super::tests::Scratch;
791 use super::*;
792 use processkit::{ProcessRunner, Reply, ScriptedRunner};
793 use vcs_core::Repo;
794 use vcs_core::vcs_git::Git;
795
/// Builds the NUL-separated porcelain-v2 status header for a clean repo whose
/// `branch.oid` is `head` and whose branch is `main`.
fn v2(head: &str) -> String {
    let mut status = String::from("# branch.oid ");
    status.push_str(head);
    status.push_str("\0# branch.head main\0");
    status
}
800
801 /// The exact command set one snapshot+branches re-query issues, scripted:
802 /// `status --porcelain=v2`, the `rev-parse --git-dir` probe (must point at a
803 /// real dir — the op-state probe reads `MERGE_HEAD` off the filesystem), and
804 /// `branch --no-column`.
805 fn scripted(gitdir: &Path, head: &str) -> ScriptedRunner {
806 ScriptedRunner::new()
807 .on(["status"], Reply::ok(v2(head)))
808 .on(["rev-parse"], Reply::ok(format!("{}\n", gitdir.display())))
809 .on(["branch"], Reply::ok("* main\n"))
810 }
811
812 fn scripted_repo(gitdir: &Path, head: &str) -> Box<dyn VcsRepo> {
813 Box::new(Repo::from_git(
814 "/r",
815 "/r",
816 Git::with_runner(scripted(gitdir, head)),
817 ))
818 }
819
820 /// The baseline `prev` state the loop diffs against, taken through the same
821 /// snapshot path `Builder::build` uses.
822 async fn baseline(gitdir: &Path, head: &str) -> event::WatchState {
823 let repo = scripted_repo(gitdir, head);
824 let snap = repo.snapshot().await.expect("baseline snapshot");
825 let branches = repo.local_branches().await.expect("baseline branches");
826 event::WatchState::from_snapshot(&snap, branches)
827 }
828
829 fn defaults() -> LoopConfig {
830 LoopConfig {
831 debounce: Duration::from_millis(250),
832 max_wait: Duration::from_secs(1),
833 requery_timeout: Some(Duration::from_secs(30)),
834 output_capacity: 64,
835 }
836 }
837
838 struct Harness {
839 sig: mpsc::UnboundedSender<()>,
840 out: mpsc::Receiver<RepoChange>,
841 stats: Arc<StatsInner>,
842 task: tokio::task::JoinHandle<()>,
843 }
844
845 fn spawn_loop(repo: Box<dyn VcsRepo>, prev: event::WatchState, config: LoopConfig) -> Harness {
846 let (sig, raw_rx) = mpsc::unbounded_channel();
847 let (out_tx, out) = mpsc::channel(config.output_capacity);
848 let stats = Arc::new(StatsInner::default());
849 let task = tokio::spawn(watch_loop(
850 repo,
851 raw_rx,
852 out_tx,
853 prev,
854 config,
855 Arc::clone(&stats),
856 ));
857 Harness {
858 sig,
859 out,
860 stats,
861 task,
862 }
863 }
864
865 /// Let the loop task run to a quiescent point without advancing time —
866 /// paused-clock auto-advance only triggers when every task idles on a timer,
867 /// so a bounded yield burst (never a spin-until loop) is the safe way to let
868 /// an already-runnable re-query complete.
869 async fn settle() {
870 for _ in 0..32 {
871 tokio::task::yield_now().await;
872 }
873 }
874
875 // A burst of sub-debounce signals coalesces into exactly one re-query and
876 // one emitted change.
877 #[tokio::test(start_paused = true)]
878 async fn debounce_coalesces_burst() {
879 let scratch = Scratch::new();
880 let prev = baseline(&scratch.0, "aaa").await;
881 let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());
882
883 for _ in 0..5 {
884 h.sig.send(()).expect("send");
885 tokio::time::advance(Duration::from_millis(10)).await;
886 }
887 let change = h.out.recv().await.expect("one coalesced change");
888 assert!(
889 change
890 .events
891 .iter()
892 .any(|e| matches!(e, RepoEvent::HeadMoved { .. })),
893 "expected HeadMoved, got {:?}",
894 change.events
895 );
896
897 // Long quiet: nothing else arrives, and exactly one re-query ran.
898 tokio::time::advance(Duration::from_secs(5)).await;
899 settle().await;
900 assert!(
901 h.out.try_recv().is_err(),
902 "burst must coalesce to one change"
903 );
904 let stats = h.stats.snapshot();
905 assert_eq!((stats.requeries, stats.changes), (1, 1));
906 }
907
908 // Signals arriving faster than the quiet window forever: the `max_wait`
909 // ceiling still forces a re-query at its cadence (the dedicated
910 // `sleep_until` arm — not just "on the next signal after the deadline").
911 #[tokio::test(start_paused = true)]
912 async fn max_wait_caps_continuous_signals() {
913 let scratch = Scratch::new();
914 let prev = baseline(&scratch.0, "aaa").await;
915 let h_config = defaults();
916 let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, h_config);
917
918 // A pump that fires a signal every 100 ms — always inside the 250 ms
919 // quiet window, so only the ceiling can break the burst.
920 let pump_sig = h.sig.clone();
921 let pump = tokio::spawn(async move {
922 loop {
923 if pump_sig.send(()).is_err() {
924 return;
925 }
926 tokio::time::sleep(Duration::from_millis(100)).await;
927 }
928 });
929
930 let change = tokio::time::timeout(Duration::from_secs(2), h.out.recv())
931 .await
932 .expect("the ceiling must fire within max_wait")
933 .expect("change");
934 assert!(
935 change
936 .events
937 .iter()
938 .any(|e| matches!(e, RepoEvent::HeadMoved { .. })),
939 "got {:?}",
940 change.events
941 );
942 pump.abort();
943 }
944
945 // The base case: one signal, a quiet gap, one re-query.
946 #[tokio::test(start_paused = true)]
947 async fn quiet_gap_triggers_requery() {
948 let scratch = Scratch::new();
949 let prev = baseline(&scratch.0, "aaa").await;
950 let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());
951
952 h.sig.send(()).expect("send");
953 let change = h.out.recv().await.expect("change after the quiet gap");
954 assert!(
955 change
956 .events
957 .iter()
958 .any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
959 );
960 }
961
962 // A re-query that finds the same state emits nothing — but it *ran* (the
963 // stats distinguish "no change" from "never re-queried").
964 #[tokio::test(start_paused = true)]
965 async fn no_change_yields_no_emission() {
966 let scratch = Scratch::new();
967 let prev = baseline(&scratch.0, "aaa").await;
968 // Same head as the baseline → empty diff.
969 let mut h = spawn_loop(scripted_repo(&scratch.0, "aaa"), prev, defaults());
970
971 h.sig.send(()).expect("send");
972 settle().await; // let the loop register its quiet timer first
973 tokio::time::advance(Duration::from_millis(300)).await; // past debounce
974 settle().await; // let the re-query run
975
976 let stats = h.stats.snapshot();
977 assert_eq!((stats.requeries, stats.changes, stats.skipped), (1, 0, 0));
978 assert!(
979 h.out.try_recv().is_err(),
980 "no events for an unchanged state"
981 );
982 }
983
984 /// Fails the first `status` call (a transiently held lock), then behaves —
985 /// `ScriptedRunner` rules are stateless, so the two-phase behaviour needs a
986 /// tiny stateful runner delegating to throwaway scripted ones.
987 struct FlakyStatus {
988 fails_left: AtomicU64,
989 gitdir: PathBuf,
990 head: &'static str,
991 }
992
993 #[async_trait::async_trait]
994 impl ProcessRunner for FlakyStatus {
995 async fn output(
996 &self,
997 command: &processkit::Command,
998 ) -> processkit::Result<processkit::ProcessResult<String>> {
999 let is_status = command.arguments().first().map(|a| a == "status") == Some(true);
1000 if is_status && self.fails_left.load(Ordering::Relaxed) > 0 {
1001 self.fails_left.fetch_sub(1, Ordering::Relaxed);
1002 return Err(processkit::Error::Exit {
1003 program: "git".into(),
1004 code: 128,
1005 stdout: String::new(),
1006 stderr: "fatal: Unable to create '.git/index.lock'".into(),
1007 });
1008 }
1009 scripted(&self.gitdir, self.head).output(command).await
1010 }
1011 }
1012
1013 // A transient re-query failure is skipped (counted, no emission); the next
1014 // signal re-checks and recovers.
1015 #[tokio::test(start_paused = true)]
1016 async fn transient_failure_skips_then_recovers() {
1017 let scratch = Scratch::new();
1018 let prev = baseline(&scratch.0, "aaa").await;
1019 let repo = Box::new(Repo::from_git(
1020 "/r",
1021 "/r",
1022 Git::with_runner(FlakyStatus {
1023 fails_left: AtomicU64::new(1),
1024 gitdir: scratch.0.clone(),
1025 head: "bbb",
1026 }),
1027 ));
1028 let mut h = spawn_loop(repo, prev, defaults());
1029
1030 // First attempt: the snapshot fails → skip, nothing emitted.
1031 h.sig.send(()).expect("send");
1032 settle().await; // loop registers the quiet timer
1033 tokio::time::advance(Duration::from_millis(300)).await;
1034 settle().await; // the (failing) re-query runs
1035 let stats = h.stats.snapshot();
1036 assert_eq!((stats.requeries, stats.skipped, stats.changes), (1, 1, 0));
1037 assert_eq!(stats.last_error, Some(WatcherErrorKind::Snapshot));
1038 assert!(h.out.try_recv().is_err());
1039
1040 // Second signal: the lock "cleared" — the re-query recovers and emits.
1041 h.sig.send(()).expect("send");
1042 let change = h.out.recv().await.expect("recovered change");
1043 assert!(
1044 change
1045 .events
1046 .iter()
1047 .any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
1048 );
1049 let stats = h.stats.snapshot();
1050 assert_eq!((stats.requeries, stats.changes), (2, 1));
1051 }
1052
1053 /// Delays every reply by `delay` (virtual time — `tokio::time::sleep`, NOT a
1054 /// thread sleep, so the paused clock controls it). `ScriptedRunner` replies
1055 /// instantly, so this is the only way to exercise the `requery_timeout`
1056 /// wrapper — a scripted `Reply::timeout()` resolves immediately and would
1057 /// test the *error* path, not the deadline.
1058 struct Sleepy {
1059 delay: Duration,
1060 gitdir: PathBuf,
1061 head: &'static str,
1062 }
1063
1064 #[async_trait::async_trait]
1065 impl ProcessRunner for Sleepy {
1066 async fn output(
1067 &self,
1068 command: &processkit::Command,
1069 ) -> processkit::Result<processkit::ProcessResult<String>> {
1070 tokio::time::sleep(self.delay).await;
1071 scripted(&self.gitdir, self.head).output(command).await
1072 }
1073 }
1074
1075 // A re-query exceeding the configured deadline is killed and skipped as
1076 // transient; the loop survives (a later attempt runs and is also bounded).
1077 #[tokio::test(start_paused = true)]
1078 async fn requery_timeout_skips_as_transient() {
1079 let scratch = Scratch::new();
1080 let prev = baseline(&scratch.0, "aaa").await;
1081 let repo = Box::new(Repo::from_git(
1082 "/r",
1083 "/r",
1084 Git::with_runner(Sleepy {
1085 delay: Duration::from_secs(10),
1086 gitdir: scratch.0.clone(),
1087 head: "bbb",
1088 }),
1089 ));
1090 let config = LoopConfig {
1091 requery_timeout: Some(Duration::from_secs(5)),
1092 ..defaults()
1093 };
1094 let mut h = spawn_loop(repo, prev, config);
1095
1096 h.sig.send(()).expect("send");
1097 settle().await; // loop registers the quiet timer
1098 tokio::time::advance(Duration::from_millis(300)).await; // debounce
1099 settle().await; // re-query starts; Sleepy + the deadline register timers
1100 tokio::time::advance(Duration::from_secs(6)).await; // past the deadline
1101 settle().await;
1102 let stats = h.stats.snapshot();
1103 assert_eq!((stats.requeries, stats.skipped, stats.changes), (1, 1, 0));
1104 assert_eq!(stats.last_error, Some(WatcherErrorKind::Timeout));
1105 assert!(h.out.try_recv().is_err());
1106
1107 // The loop is alive: a second attempt runs (and times out the same way).
1108 h.sig.send(()).expect("send");
1109 settle().await;
1110 tokio::time::advance(Duration::from_millis(300)).await;
1111 settle().await;
1112 tokio::time::advance(Duration::from_secs(6)).await;
1113 settle().await;
1114 assert_eq!(h.stats.snapshot().requeries, 2);
1115 }
1116
1117 // Closing the signal channel mid-debounce ends the loop promptly and closes
1118 // the output channel.
1119 #[tokio::test(start_paused = true)]
1120 async fn drop_teardown_mid_debounce() {
1121 let scratch = Scratch::new();
1122 let prev = baseline(&scratch.0, "aaa").await;
1123 let Harness {
1124 sig,
1125 mut out,
1126 stats: _,
1127 task,
1128 } = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());
1129
1130 sig.send(()).expect("send");
1131 tokio::time::advance(Duration::from_millis(100)).await; // mid-debounce
1132 drop(sig);
1133
1134 tokio::time::timeout(Duration::from_secs(1), task)
1135 .await
1136 .expect("loop ends promptly")
1137 .expect("loop task joins cleanly");
1138 assert!(out.recv().await.is_none(), "output closes with the loop");
1139 }
1140
1141 /// Reports a different head on every `status` call, so every re-query
1142 /// produces a `HeadMoved` — the emission generator the backpressure test
1143 /// needs to fill the bounded output channel.
1144 struct VaryingHead {
1145 statuses: AtomicU64,
1146 gitdir: PathBuf,
1147 }
1148
1149 #[async_trait::async_trait]
1150 impl ProcessRunner for VaryingHead {
1151 async fn output(
1152 &self,
1153 command: &processkit::Command,
1154 ) -> processkit::Result<processkit::ProcessResult<String>> {
1155 let is_status = command.arguments().first().map(|a| a == "status") == Some(true);
1156 let n = if is_status {
1157 self.statuses.fetch_add(1, Ordering::Relaxed)
1158 } else {
1159 self.statuses.load(Ordering::Relaxed)
1160 };
1161 scripted(&self.gitdir, &format!("h{n}"))
1162 .output(command)
1163 .await
1164 }
1165 }
1166
1167 // A full output channel parks the loop at `send` (backpressure) instead of
1168 // dropping or buffering unboundedly; draining one item unparks it.
1169 #[tokio::test(start_paused = true)]
1170 async fn backpressure_parks_loop() {
1171 let scratch = Scratch::new();
1172 let prev = baseline(&scratch.0, "base").await;
1173 let repo = Box::new(Repo::from_git(
1174 "/r",
1175 "/r",
1176 Git::with_runner(VaryingHead {
1177 statuses: AtomicU64::new(0),
1178 gitdir: scratch.0.clone(),
1179 }),
1180 ));
1181 let config = LoopConfig {
1182 output_capacity: 1,
1183 ..defaults()
1184 };
1185 let mut h = spawn_loop(repo, prev, config);
1186
1187 // First change fills the capacity-1 channel.
1188 h.sig.send(()).expect("send");
1189 settle().await; // loop registers the quiet timer
1190 tokio::time::advance(Duration::from_millis(300)).await;
1191 settle().await; // re-query runs; emission 1 fills the channel
1192 // Second re-query produces another change; the send parks (channel full):
1193 // the re-query ran but the emission hasn't landed.
1194 h.sig.send(()).expect("send");
1195 settle().await;
1196 tokio::time::advance(Duration::from_millis(300)).await;
1197 settle().await;
1198 let stats = h.stats.snapshot();
1199 assert_eq!(
1200 (stats.requeries, stats.changes),
1201 (2, 1),
1202 "second emission must be parked on the full channel"
1203 );
1204
1205 // Draining unparks the loop; both changes arrive in order.
1206 let first = h.out.recv().await.expect("first change");
1207 assert!(
1208 first
1209 .events
1210 .iter()
1211 .any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
1212 );
1213 let second = h.out.recv().await.expect("second change");
1214 assert!(
1215 second
1216 .events
1217 .iter()
1218 .any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
1219 );
1220 settle().await;
1221 assert_eq!(h.stats.snapshot().changes, 2);
1222 }
1223
// `stream` feature only: `StreamExt::next` on the REAL `RepoWatcher` must yield
// what `recv` would and advance `current()` identically. The watcher is put
// together by hand (same crate) around the loop harness's channel, with an
// idle notify watcher standing in for the OS watch.
#[cfg(feature = "stream")]
#[tokio::test(start_paused = true)]
async fn stream_yields_changes_and_advances_current() {
    use tokio_stream::StreamExt;

    let scratch = Scratch::new();
    let prev = baseline(&scratch.0, "aaa").await;
    let h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());

    // The watcher starts out at the "aaa" snapshot the loop diffs against.
    let current = scripted_repo(&scratch.0, "aaa")
        .snapshot()
        .await
        .expect("baseline snapshot");
    let mut watcher = RepoWatcher {
        rx: h.out,
        current,
        stats: h.stats,
        _watcher: notify::recommended_watcher(|_res| {}).expect("idle watcher"),
        task: h.task,
    };
    assert_eq!(watcher.current().head.as_deref(), Some("aaa"));

    h.sig.send(()).expect("send");
    let change = watcher.next().await.expect("stream item");
    let head_moved = change
        .events
        .iter()
        .any(|event| matches!(event, RepoEvent::HeadMoved { .. }));
    assert!(head_moved, "got {:?}", change.events);

    // Polling through the Stream moved `current()` forward exactly like `recv`.
    assert_eq!(watcher.current().head.as_deref(), Some("bbb"));
}
1263}
1264
1265// Long-form how-to guides, rendered from this crate's docs/*.md on docs.rs.
1266#[doc = include_str!("../docs/watch.md")]
1267#[allow(rustdoc::broken_intra_doc_links)]
1268pub mod guide {}