#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(rustdoc::broken_intra_doc_links)]
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
use std::time::Duration;
use notify::{RecursiveMode, Watcher};
use tokio::sync::mpsc;
use vcs_core::{BackendKind, VcsRepo};
mod error;
mod event;
pub use error::{Error, Result};
pub use event::{RepoChange, RepoEvent};
pub use vcs_core::{OperationState, RepoSnapshot};
const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(250);
const DEFAULT_MAX_WAIT: Duration = Duration::from_secs(1);
pub const DEFAULT_REQUERY_TIMEOUT: Duration = Duration::from_secs(30);
const OUTPUT_CAPACITY: usize = 64;
struct LoopConfig {
debounce: Duration,
max_wait: Duration,
requery_timeout: Option<Duration>,
output_capacity: usize,
}
pub struct Builder {
repo: Box<dyn VcsRepo>,
working_tree: bool,
debounce: Duration,
max_wait: Duration,
requery_timeout: Option<Duration>,
}
impl Builder {
pub fn working_tree(mut self, yes: bool) -> Self {
self.working_tree = yes;
self
}
pub fn debounce(mut self, window: Duration) -> Self {
self.debounce = window;
self
}
pub fn max_wait(mut self, ceiling: Duration) -> Self {
self.max_wait = ceiling;
self
}
pub fn requery_timeout(mut self, timeout: Option<Duration>) -> Self {
self.requery_timeout = timeout;
self
}
pub async fn build(self) -> Result<RepoWatcher> {
let root = self.repo.root().to_path_buf();
let state_dirs = state_dirs(self.repo.kind(), &root)?;
let (raw_tx, raw_rx) = mpsc::unbounded_channel::<()>();
let mut watcher = notify::recommended_watcher(move |_res| {
let _ = raw_tx.send(());
})?;
if self.working_tree {
watcher.watch(&root, RecursiveMode::Recursive)?;
for dir in &state_dirs {
if !dir.starts_with(&root) {
watcher.watch(dir, RecursiveMode::Recursive)?;
}
}
} else {
for dir in &state_dirs {
watcher.watch(dir, RecursiveMode::Recursive)?;
}
}
let snapshot = self.repo.snapshot().await?;
let branches = self.repo.local_branches().await?;
let baseline = snapshot.clone();
let prev = event::WatchState::from_snapshot(&snapshot, branches);
let config = LoopConfig {
debounce: self.debounce,
max_wait: self.max_wait,
requery_timeout: self.requery_timeout,
output_capacity: OUTPUT_CAPACITY,
};
let stats = Arc::new(StatsInner::default());
let (out_tx, out_rx) = mpsc::channel::<RepoChange>(config.output_capacity);
let task = tokio::spawn(watch_loop(
self.repo,
raw_rx,
out_tx,
prev,
config,
Arc::clone(&stats),
));
Ok(RepoWatcher {
rx: out_rx,
current: baseline,
stats,
_watcher: watcher,
task,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum WatcherErrorKind {
Snapshot,
Branches,
Timeout,
}
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct WatcherStats {
pub requeries: u64,
pub changes: u64,
pub skipped: u64,
pub last_error: Option<WatcherErrorKind>,
}
#[derive(Default)]
struct StatsInner {
requeries: AtomicU64,
changes: AtomicU64,
skipped: AtomicU64,
last_error: AtomicU8,
}
impl StatsInner {
fn note_requery(&self) {
self.requeries.fetch_add(1, Ordering::Relaxed);
}
fn note_change(&self) {
self.changes.fetch_add(1, Ordering::Relaxed);
}
fn note_skip(&self, kind: WatcherErrorKind) {
self.skipped.fetch_add(1, Ordering::Relaxed);
let code = match kind {
WatcherErrorKind::Snapshot => 1,
WatcherErrorKind::Branches => 2,
WatcherErrorKind::Timeout => 3,
};
self.last_error.store(code, Ordering::Relaxed);
}
fn snapshot(&self) -> WatcherStats {
let last_error = match self.last_error.load(Ordering::Relaxed) {
1 => Some(WatcherErrorKind::Snapshot),
2 => Some(WatcherErrorKind::Branches),
3 => Some(WatcherErrorKind::Timeout),
_ => None,
};
WatcherStats {
requeries: self.requeries.load(Ordering::Relaxed),
changes: self.changes.load(Ordering::Relaxed),
skipped: self.skipped.load(Ordering::Relaxed),
last_error,
}
}
}
pub struct RepoWatcher {
rx: mpsc::Receiver<RepoChange>,
current: RepoSnapshot,
stats: Arc<StatsInner>,
_watcher: notify::RecommendedWatcher,
task: tokio::task::JoinHandle<()>,
}
impl RepoWatcher {
pub fn builder(repo: impl VcsRepo + 'static) -> Builder {
Builder {
repo: Box::new(repo),
working_tree: false,
debounce: DEFAULT_DEBOUNCE,
max_wait: DEFAULT_MAX_WAIT,
requery_timeout: Some(DEFAULT_REQUERY_TIMEOUT),
}
}
pub async fn watch(repo: impl VcsRepo + 'static) -> Result<RepoWatcher> {
Self::builder(repo).build().await
}
pub async fn recv(&mut self) -> Option<RepoChange> {
let change = self.rx.recv().await?;
self.current = change.snapshot.clone();
Some(change)
}
pub fn current(&self) -> &RepoSnapshot {
&self.current
}
pub fn stats(&self) -> WatcherStats {
self.stats.snapshot()
}
}
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl futures_core::Stream for RepoWatcher {
type Item = RepoChange;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<RepoChange>> {
let this = self.get_mut();
match this.rx.poll_recv(cx) {
std::task::Poll::Ready(Some(change)) => {
this.current = change.snapshot.clone();
std::task::Poll::Ready(Some(change))
}
other => other,
}
}
}
impl Drop for RepoWatcher {
fn drop(&mut self) {
self.task.abort();
}
}
async fn watch_loop(
repo: Box<dyn VcsRepo>,
mut raw_rx: mpsc::UnboundedReceiver<()>,
out_tx: mpsc::Sender<RepoChange>,
mut prev: event::WatchState,
config: LoopConfig,
stats: Arc<StatsInner>,
) {
loop {
if raw_rx.recv().await.is_none() {
return;
}
drain(&mut raw_rx);
let deadline = tokio::time::Instant::now() + config.max_wait;
loop {
tokio::select! {
biased;
sig = raw_rx.recv() => {
if sig.is_none() {
return; }
drain(&mut raw_rx);
if tokio::time::Instant::now() >= deadline {
break; }
}
_ = tokio::time::sleep_until(deadline) => break, _ = tokio::time::sleep(config.debounce) => break, }
}
stats.note_requery();
let requery = async {
let snapshot = repo
.snapshot()
.await
.map_err(|e| (WatcherErrorKind::Snapshot, e))?;
let branches = repo
.local_branches()
.await
.map_err(|e| (WatcherErrorKind::Branches, e))?;
Ok::<_, (WatcherErrorKind, vcs_core::Error)>((snapshot, branches))
};
let outcome = match config.requery_timeout {
Some(limit) => match tokio::time::timeout(limit, requery).await {
Ok(result) => result,
Err(_elapsed) => {
stats.note_skip(WatcherErrorKind::Timeout);
#[cfg(feature = "tracing")]
tracing::debug!(
timeout = ?limit,
"vcs-watch: re-query exceeded its deadline; killed and skipped"
);
continue;
}
},
None => requery.await,
};
let (snapshot, branches) = match outcome {
Ok(pair) => pair,
Err((kind, _e)) => {
stats.note_skip(kind);
#[cfg(feature = "tracing")]
tracing::debug!(error = %_e, "vcs-watch: re-query failed; skipping");
continue;
}
};
let next = event::WatchState::from_snapshot(&snapshot, branches);
let events = event::diff(&prev, &next);
prev = next;
if events.is_empty() {
continue;
}
if out_tx.send(RepoChange { snapshot, events }).await.is_err() {
return; }
stats.note_change();
}
}
fn drain(raw_rx: &mut mpsc::UnboundedReceiver<()>) {
while raw_rx.try_recv().is_ok() {}
}
fn state_dirs(kind: BackendKind, root: &Path) -> Result<Vec<PathBuf>> {
let state_dir = state_dir(kind, root)?;
let mut dirs = vec![state_dir.clone()];
if let Some(shared) = common_dir(&state_dir)
&& normalize(&shared) != normalize(&state_dir)
{
dirs.push(shared);
}
Ok(dirs)
}
fn state_dir(kind: BackendKind, root: &Path) -> Result<PathBuf> {
match kind {
BackendKind::Jj => Ok(root.join(".jj")),
BackendKind::Git => {
let dot_git = root.join(".git");
if dot_git.is_file() {
let content = std::fs::read_to_string(&dot_git)?;
if let Some(rest) = content.trim().strip_prefix("gitdir:") {
let p = PathBuf::from(rest.trim());
return Ok(if p.is_absolute() { p } else { root.join(p) });
}
}
Ok(dot_git)
}
_ => Ok(root.to_path_buf()),
}
}
fn common_dir(state_dir: &Path) -> Option<PathBuf> {
let commondir = state_dir.join("commondir");
let content = std::fs::read_to_string(&commondir).ok()?;
let rel = content.trim();
if rel.is_empty() {
return None;
}
let p = PathBuf::from(rel);
let joined = if p.is_absolute() {
p
} else {
state_dir.join(p)
};
Some(lexically_normalized(&joined))
}
fn lexically_normalized(p: &Path) -> PathBuf {
use std::path::Component;
let mut out = PathBuf::new();
for comp in p.components() {
match comp {
Component::ParentDir => {
if !out.pop() {
out.push(comp);
}
}
Component::CurDir => {}
other => out.push(other),
}
}
out
}
fn normalize(p: &Path) -> PathBuf {
let canonical = p.canonicalize().unwrap_or_else(|_| p.to_path_buf());
#[cfg(windows)]
{
let s = canonical.to_string_lossy();
if let Some(rest) = s.strip_prefix(r"\\?\")
&& !rest.starts_with("UNC\\")
{
return PathBuf::from(rest.to_string());
}
}
canonical
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
pub(crate) struct Scratch(pub(crate) PathBuf);
impl Scratch {
pub(crate) fn new() -> Self {
let p = std::env::temp_dir().join(format!(
"vcs-watch-commondir-{}-{}",
std::process::id(),
COUNTER.fetch_add(1, Ordering::Relaxed)
));
std::fs::create_dir_all(&p).expect("create scratch dir");
Scratch(p)
}
}
impl Drop for Scratch {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.0);
}
}
#[test]
fn no_commondir_file_yields_none() {
let scratch = Scratch::new();
let git_dir = scratch.0.join(".git");
std::fs::create_dir_all(&git_dir).expect("mkdir .git");
assert_eq!(common_dir(&git_dir), None);
}
#[test]
fn relative_commondir_resolves_to_shared_git_dir() {
let scratch = Scratch::new();
let shared = scratch.0.join(".git");
let private = shared.join("worktrees").join("wt");
std::fs::create_dir_all(&private).expect("mkdir private gitdir");
std::fs::write(private.join("commondir"), "../..\n").expect("write commondir");
let resolved = common_dir(&private).expect("Some(shared dir)");
assert_eq!(resolved, lexically_normalized(&shared));
assert!(
!resolved.to_string_lossy().contains(".."),
"the `..` segments must be resolved, got {}",
resolved.display()
);
}
#[test]
fn absolute_commondir_is_used_verbatim() {
let scratch = Scratch::new();
let shared = scratch.0.join("shared-git");
let private = scratch.0.join("private");
std::fs::create_dir_all(&private).expect("mkdir private");
std::fs::write(private.join("commondir"), format!("{}\n", shared.display()))
.expect("write commondir");
assert_eq!(common_dir(&private), Some(lexically_normalized(&shared)));
}
#[test]
fn state_dirs_includes_private_and_shared_for_worktree() {
let scratch = Scratch::new();
let root = scratch.0.join("wt-worktree");
let shared = scratch.0.join(".git");
let private = shared.join("worktrees").join("wt");
std::fs::create_dir_all(&private).expect("mkdir private gitdir");
std::fs::create_dir_all(&root).expect("mkdir worktree root");
std::fs::write(private.join("commondir"), "../..\n").expect("write commondir");
std::fs::write(
root.join(".git"),
format!("gitdir: {}\n", private.display()),
)
.expect("write gitlink");
let dirs = state_dirs(BackendKind::Git, &root).expect("state_dirs");
assert_eq!(dirs.len(), 2, "private + shared, got {dirs:?}");
assert_eq!(normalize(&dirs[0]), normalize(&private));
assert_eq!(normalize(&dirs[1]), normalize(&shared));
}
#[test]
fn self_referential_commondir_is_deduped() {
let scratch = Scratch::new();
let git_dir = scratch.0.join(".git");
std::fs::create_dir_all(&git_dir).expect("mkdir .git");
std::fs::write(git_dir.join("commondir"), ".\n").expect("write commondir");
let root = scratch.0.join("root");
std::fs::create_dir_all(&root).expect("mkdir root");
std::fs::write(
root.join(".git"),
format!("gitdir: {}\n", git_dir.display()),
)
.expect("write gitlink");
let dirs = state_dirs(BackendKind::Git, &root).expect("state_dirs");
assert_eq!(dirs.len(), 1, "self-reference deduped, got {dirs:?}");
}
}
#[cfg(test)]
mod pipeline_tests {
use super::tests::Scratch;
use super::*;
use processkit::{ProcessRunner, Reply, ScriptedRunner};
use vcs_core::Repo;
use vcs_core::vcs_git::Git;
fn v2(head: &str) -> String {
format!("# branch.oid {head}\0# branch.head main\0")
}
fn scripted(gitdir: &Path, head: &str) -> ScriptedRunner {
ScriptedRunner::new()
.on(["status"], Reply::ok(v2(head)))
.on(["rev-parse"], Reply::ok(format!("{}\n", gitdir.display())))
.on(["branch"], Reply::ok("* main\n"))
}
fn scripted_repo(gitdir: &Path, head: &str) -> Box<dyn VcsRepo> {
Box::new(Repo::from_git(
"/r",
"/r",
Git::with_runner(scripted(gitdir, head)),
))
}
async fn baseline(gitdir: &Path, head: &str) -> event::WatchState {
let repo = scripted_repo(gitdir, head);
let snap = repo.snapshot().await.expect("baseline snapshot");
let branches = repo.local_branches().await.expect("baseline branches");
event::WatchState::from_snapshot(&snap, branches)
}
fn defaults() -> LoopConfig {
LoopConfig {
debounce: Duration::from_millis(250),
max_wait: Duration::from_secs(1),
requery_timeout: Some(Duration::from_secs(30)),
output_capacity: 64,
}
}
struct Harness {
sig: mpsc::UnboundedSender<()>,
out: mpsc::Receiver<RepoChange>,
stats: Arc<StatsInner>,
task: tokio::task::JoinHandle<()>,
}
fn spawn_loop(repo: Box<dyn VcsRepo>, prev: event::WatchState, config: LoopConfig) -> Harness {
let (sig, raw_rx) = mpsc::unbounded_channel();
let (out_tx, out) = mpsc::channel(config.output_capacity);
let stats = Arc::new(StatsInner::default());
let task = tokio::spawn(watch_loop(
repo,
raw_rx,
out_tx,
prev,
config,
Arc::clone(&stats),
));
Harness {
sig,
out,
stats,
task,
}
}
async fn settle() {
for _ in 0..32 {
tokio::task::yield_now().await;
}
}
#[tokio::test(start_paused = true)]
async fn debounce_coalesces_burst() {
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "aaa").await;
let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());
for _ in 0..5 {
h.sig.send(()).expect("send");
tokio::time::advance(Duration::from_millis(10)).await;
}
let change = h.out.recv().await.expect("one coalesced change");
assert!(
change
.events
.iter()
.any(|e| matches!(e, RepoEvent::HeadMoved { .. })),
"expected HeadMoved, got {:?}",
change.events
);
tokio::time::advance(Duration::from_secs(5)).await;
settle().await;
assert!(
h.out.try_recv().is_err(),
"burst must coalesce to one change"
);
let stats = h.stats.snapshot();
assert_eq!((stats.requeries, stats.changes), (1, 1));
}
#[tokio::test(start_paused = true)]
async fn max_wait_caps_continuous_signals() {
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "aaa").await;
let h_config = defaults();
let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, h_config);
let pump_sig = h.sig.clone();
let pump = tokio::spawn(async move {
loop {
if pump_sig.send(()).is_err() {
return;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
let change = tokio::time::timeout(Duration::from_secs(2), h.out.recv())
.await
.expect("the ceiling must fire within max_wait")
.expect("change");
assert!(
change
.events
.iter()
.any(|e| matches!(e, RepoEvent::HeadMoved { .. })),
"got {:?}",
change.events
);
pump.abort();
}
#[tokio::test(start_paused = true)]
async fn quiet_gap_triggers_requery() {
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "aaa").await;
let mut h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());
h.sig.send(()).expect("send");
let change = h.out.recv().await.expect("change after the quiet gap");
assert!(
change
.events
.iter()
.any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
);
}
#[tokio::test(start_paused = true)]
async fn no_change_yields_no_emission() {
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "aaa").await;
let mut h = spawn_loop(scripted_repo(&scratch.0, "aaa"), prev, defaults());
h.sig.send(()).expect("send");
settle().await; tokio::time::advance(Duration::from_millis(300)).await; settle().await;
let stats = h.stats.snapshot();
assert_eq!((stats.requeries, stats.changes, stats.skipped), (1, 0, 0));
assert!(
h.out.try_recv().is_err(),
"no events for an unchanged state"
);
}
struct FlakyStatus {
fails_left: AtomicU64,
gitdir: PathBuf,
head: &'static str,
}
#[async_trait::async_trait]
impl ProcessRunner for FlakyStatus {
async fn output(
&self,
command: &processkit::Command,
) -> processkit::Result<processkit::ProcessResult<String>> {
let is_status = command.arguments().first().map(|a| a == "status") == Some(true);
if is_status && self.fails_left.load(Ordering::Relaxed) > 0 {
self.fails_left.fetch_sub(1, Ordering::Relaxed);
return Err(processkit::Error::Exit {
program: "git".into(),
code: 128,
stdout: String::new(),
stderr: "fatal: Unable to create '.git/index.lock'".into(),
});
}
scripted(&self.gitdir, self.head).output(command).await
}
}
#[tokio::test(start_paused = true)]
async fn transient_failure_skips_then_recovers() {
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "aaa").await;
let repo = Box::new(Repo::from_git(
"/r",
"/r",
Git::with_runner(FlakyStatus {
fails_left: AtomicU64::new(1),
gitdir: scratch.0.clone(),
head: "bbb",
}),
));
let mut h = spawn_loop(repo, prev, defaults());
h.sig.send(()).expect("send");
settle().await; tokio::time::advance(Duration::from_millis(300)).await;
settle().await; let stats = h.stats.snapshot();
assert_eq!((stats.requeries, stats.skipped, stats.changes), (1, 1, 0));
assert_eq!(stats.last_error, Some(WatcherErrorKind::Snapshot));
assert!(h.out.try_recv().is_err());
h.sig.send(()).expect("send");
let change = h.out.recv().await.expect("recovered change");
assert!(
change
.events
.iter()
.any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
);
let stats = h.stats.snapshot();
assert_eq!((stats.requeries, stats.changes), (2, 1));
}
struct Sleepy {
delay: Duration,
gitdir: PathBuf,
head: &'static str,
}
#[async_trait::async_trait]
impl ProcessRunner for Sleepy {
async fn output(
&self,
command: &processkit::Command,
) -> processkit::Result<processkit::ProcessResult<String>> {
tokio::time::sleep(self.delay).await;
scripted(&self.gitdir, self.head).output(command).await
}
}
#[tokio::test(start_paused = true)]
async fn requery_timeout_skips_as_transient() {
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "aaa").await;
let repo = Box::new(Repo::from_git(
"/r",
"/r",
Git::with_runner(Sleepy {
delay: Duration::from_secs(10),
gitdir: scratch.0.clone(),
head: "bbb",
}),
));
let config = LoopConfig {
requery_timeout: Some(Duration::from_secs(5)),
..defaults()
};
let mut h = spawn_loop(repo, prev, config);
h.sig.send(()).expect("send");
settle().await; tokio::time::advance(Duration::from_millis(300)).await; settle().await; tokio::time::advance(Duration::from_secs(6)).await; settle().await;
let stats = h.stats.snapshot();
assert_eq!((stats.requeries, stats.skipped, stats.changes), (1, 1, 0));
assert_eq!(stats.last_error, Some(WatcherErrorKind::Timeout));
assert!(h.out.try_recv().is_err());
h.sig.send(()).expect("send");
settle().await;
tokio::time::advance(Duration::from_millis(300)).await;
settle().await;
tokio::time::advance(Duration::from_secs(6)).await;
settle().await;
assert_eq!(h.stats.snapshot().requeries, 2);
}
#[tokio::test(start_paused = true)]
async fn drop_teardown_mid_debounce() {
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "aaa").await;
let Harness {
sig,
mut out,
stats: _,
task,
} = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());
sig.send(()).expect("send");
tokio::time::advance(Duration::from_millis(100)).await; drop(sig);
tokio::time::timeout(Duration::from_secs(1), task)
.await
.expect("loop ends promptly")
.expect("loop task joins cleanly");
assert!(out.recv().await.is_none(), "output closes with the loop");
}
struct VaryingHead {
statuses: AtomicU64,
gitdir: PathBuf,
}
#[async_trait::async_trait]
impl ProcessRunner for VaryingHead {
async fn output(
&self,
command: &processkit::Command,
) -> processkit::Result<processkit::ProcessResult<String>> {
let is_status = command.arguments().first().map(|a| a == "status") == Some(true);
let n = if is_status {
self.statuses.fetch_add(1, Ordering::Relaxed)
} else {
self.statuses.load(Ordering::Relaxed)
};
scripted(&self.gitdir, &format!("h{n}"))
.output(command)
.await
}
}
#[tokio::test(start_paused = true)]
async fn backpressure_parks_loop() {
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "base").await;
let repo = Box::new(Repo::from_git(
"/r",
"/r",
Git::with_runner(VaryingHead {
statuses: AtomicU64::new(0),
gitdir: scratch.0.clone(),
}),
));
let config = LoopConfig {
output_capacity: 1,
..defaults()
};
let mut h = spawn_loop(repo, prev, config);
h.sig.send(()).expect("send");
settle().await; tokio::time::advance(Duration::from_millis(300)).await;
settle().await; h.sig.send(()).expect("send");
settle().await;
tokio::time::advance(Duration::from_millis(300)).await;
settle().await;
let stats = h.stats.snapshot();
assert_eq!(
(stats.requeries, stats.changes),
(2, 1),
"second emission must be parked on the full channel"
);
let first = h.out.recv().await.expect("first change");
assert!(
first
.events
.iter()
.any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
);
let second = h.out.recv().await.expect("second change");
assert!(
second
.events
.iter()
.any(|e| matches!(e, RepoEvent::HeadMoved { .. }))
);
settle().await;
assert_eq!(h.stats.snapshot().changes, 2);
}
#[cfg(feature = "stream")]
#[tokio::test(start_paused = true)]
async fn stream_yields_changes_and_advances_current() {
use tokio_stream::StreamExt;
let scratch = Scratch::new();
let prev = baseline(&scratch.0, "aaa").await;
let h = spawn_loop(scripted_repo(&scratch.0, "bbb"), prev, defaults());
let baseline_snap = scripted_repo(&scratch.0, "aaa")
.snapshot()
.await
.expect("baseline snapshot");
let mut watcher = RepoWatcher {
rx: h.out,
current: baseline_snap,
stats: h.stats,
_watcher: notify::recommended_watcher(|_res| {}).expect("idle watcher"),
task: h.task,
};
assert_eq!(watcher.current().head.as_deref(), Some("aaa"));
h.sig.send(()).expect("send");
let change = watcher.next().await.expect("stream item");
assert!(
change
.events
.iter()
.any(|e| matches!(e, RepoEvent::HeadMoved { .. })),
"got {:?}",
change.events
);
assert_eq!(watcher.current().head.as_deref(), Some("bbb"));
}
}
#[doc = include_str!("../docs/watch.md")]
#[allow(rustdoc::broken_intra_doc_links)]
pub mod guide {}