slipstream/applied.rs
1//! Cursor-after-apply watch combinator.
2//!
3//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
4//! a short window (or a max count), hands each batch to a caller-supplied
5//! `apply` closure, and **only then** advances the resume cursor, checkpoints
6//! the snapshot, and fires `on_applied`. It encodes one discipline that every
7//! hand-rolled watch loop in the wider system gets subtly wrong:
8//!
9//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
10//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
11//! > it. The cursor never advances on *receipt* of an update, only after it has
12//! > durably taken effect.
13//!
14//! ## Why receipt is the wrong signal
15//!
16//! The tempting shortcut is to bump the cursor as each update arrives off the
17//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
18//! crash between those two steps the persisted cursor claims "caught up to rev
19//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
20//! starts *past* rev N and silently skips it — a correctness hole in the exact
21//! "resume after any restart" guarantee this crate advertises.
22//!
23//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
24//! the fix: a function placed below the endpoints (here, the channel receive)
25//! can only be a performance hint; the *endpoint* — the application of the
26//! update — is the only place the "it happened" guarantee can actually be
27//! established. So the cursor is written from `apply()`'s completion, not from
28//! the transport's delivery.
29//!
30//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
31//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
32//! it has *reconciled* and re-arms the watch from there, never from the index it
33//! merely *saw*.
34//!
35//! ## What the caller supplies
36//!
37//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
38//! `None` (corrupt bytes, irrelevant key) is fine — the update is still
39//! *received*, so it still counts toward the cursor; there is simply nothing to
40//! apply for it.
41//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
42//! logic; for the tunnel router it swaps the route table, for the edge origin
43//! watcher it rebuilds the hashrings.
//! - `on_applied`: fires after each flush that advances the cursor, *after*
//!   `apply` returns, with the new applied cursor. Callers use it to persist
//!   the cursor for the next restart.
46//!
47//! ## Panics
48//!
49//! `apply` runs inline on the watch task. If it panics, the panic propagates out
50//! of [`watch_applied`] and aborts the watch — that is the caller's contract,
51//! the same as a panic in any other supplied closure.
52
53use std::path::PathBuf;
54use std::sync::Arc;
55use std::time::Duration;
56
57use tokio::sync::mpsc;
58use tokio::sync::{oneshot, watch};
59use tracing::warn;
60
61use crate::artifact::ExportManifest;
62use crate::kv::{KvError, KvReader, KvUpdate, KvWatcher, WatchCursor};
63use crate::snapshot::{SnapshotError, SnapshotStore};
64
65/// A request, sent into a running [`watch_applied`] loop, to export the fold it
66/// owns (see [`SnapshotStore::export_to`]).
67///
68/// `watch_applied` takes its snapshot store **by value**, so a consumer that
69/// wants periodic artifacts of a live fold cannot call `export_to` itself. It
70/// instead passes an `mpsc::Receiver<ExportRequest>` to [`watch_applied`] and
71/// sends requests through the paired sender. The loop handles a request
72/// between batch flushes — after flushing any pending batch — so the artifact's
73/// embedded cursor is exactly the applied cursor at the moment of export.
74///
75/// The export result (or error) comes back on `reply`; an export failure is
76/// reported there and the watch keeps running (the snapshot is a cache — a
77/// failed artifact is the requester's problem, not the fold's).
78pub struct ExportRequest {
79 /// Where the artifact directory will be created. Must not exist (or be an
80 /// empty directory); same filesystem as the fold for cheap hardlinks.
81 pub dest_dir: PathBuf,
82 /// Receives the sealed manifest on success. A dropped receiver is ignored.
83 pub reply: oneshot::Sender<Result<ExportManifest, SnapshotError>>,
84}
85
86/// What to watch: every key, every key under a prefix, or the union of several
87/// prefixes.
88///
89/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
90/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`,
91/// `Prefixes` to `watch_prefixes` / `watch_prefixes_from` (one multi-filter
92/// consumer for the whole union, never one consumer per prefix).
93#[derive(Debug, Clone)]
94pub enum WatchScope {
95 /// Watch all keys in the bucket.
96 All,
97 /// Watch only keys beginning with this prefix.
98 Prefix(String),
99 /// Watch keys beginning with ANY of these prefixes, on a single consumer.
100 Prefixes(Vec<String>),
101}
102
103impl WatchScope {
104 /// The scope as a list of key prefixes (`All` = the empty prefix), for
105 /// callers that enumerate scope contents (live listings, fold ranges).
106 fn prefixes(&self) -> Vec<String> {
107 match self {
108 WatchScope::All => vec![String::new()],
109 WatchScope::Prefix(p) => vec![p.clone()],
110 WatchScope::Prefixes(ps) => ps.clone(),
111 }
112 }
113}
114
115/// Internal: a cursor-expired resync handoff from the watch task to the main
116/// loop. Carries the bucket's live key listing for the watch scope; the main
117/// loop diffs it against the fold and applies synthetic deletes for keys that
118/// vanished during the gap, then acks so the watch task can start the fallback
119/// watch — the ack ordering guarantees every synthetic delete is applied before
120/// the first re-list put arrives.
121struct ResyncRequest {
122 live_keys: Vec<String>,
123 ack: oneshot::Sender<()>,
124}
125
126/// Internal: what the watch task needs to initiate a resync — the reader that
127/// lists live keys and the channel into the main loop that owns the fold.
128type ResyncHandle = (Arc<dyn KvReader>, mpsc::Sender<ResyncRequest>);
129
130/// Batching policy for [`watch_applied`].
131///
132/// A flush fires when **either** bound is hit, whichever comes first: `window`
133/// time has elapsed since the batch opened, or `max` updates have accumulated.
134/// The window amortizes the cost of `apply` (e.g. one route-table clone per
135/// flush instead of one per update); `max` caps memory and latency when updates
136/// arrive faster than the window.
137#[derive(Debug, Clone, Copy)]
138pub struct BatchConfig {
139 /// Maximum time a batch stays open before being flushed.
140 pub window: Duration,
141 /// Maximum number of parsed updates in a batch before forcing a flush.
142 pub max: usize,
143}
144
145impl Default for BatchConfig {
146 /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
147 /// already used, lifted into one place.
148 fn default() -> Self {
149 Self {
150 window: Duration::from_millis(10),
151 max: 100,
152 }
153 }
154}
155
156/// Drive a watch with cursor-after-apply semantics.
157///
158/// Subscribes per `scope` (resuming from `resume` when it carries a position),
159/// batches updates per `config`, applies each batch via `apply`, and only then
160/// advances the cursor / folds the batch into `store` / calls `on_applied`.
161/// Returns the final applied cursor when the watch ends (shutdown signalled, or
162/// the underlying stream closed).
163///
164/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
165/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
166/// its own impl) — or `None` to run without persistence. On each flush, *after*
167/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
168/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
169/// persisted cursor is always the post-apply cursor and never names a revision
170/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
171/// crash leaves the store consistent and resume re-folds only the tail.
172///
173/// # Cursor expiry and stale-key resync
174///
175/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
176/// falls back to a full-scope watch (`watch_all` / `watch_prefix` /
177/// `watch_prefixes`), whose state-sync re-list re-delivers the current value of
178/// every in-scope key as puts. The re-list alone cannot cover keys that were
179/// **deleted during the gap** and whose delete markers the backend has since
180/// evicted — they simply don't appear, leaving the fold (and the caller's
181/// domain state) holding them forever.
182///
183/// When both `reader` and `store` are provided, the expiry path closes that
184/// hole: before the fallback watch starts, the bucket's live keys are listed
185/// via `reader`, diffed against the fold's in-scope keys, and a synthetic
186/// [`KvUpdate::Delete`] (with an unknown [`VersionToken`](crate::VersionToken)) is run through
187/// `parse`/`apply`/`store` for each key that vanished. The synthetic deletes
188/// are strictly ordered before the first re-list put, so a key deleted and
189/// re-created during the gap converges correctly. Without a `reader` (or
190/// without a `store` to diff against) the fallback is re-list-only and a
191/// warning marks the possible stale keys.
192///
193/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
194/// rationale.
195///
196/// # Type parameters
197/// - `U`: the caller's domain update type, produced by `parse` and consumed by
198/// `apply`.
199// This combinator takes each of its dependencies as a parameter so every
200// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
201// type and is monomorphized at the call site. Folding them into a builder struct
202// would either box the closures or force a single generic bundle, losing that.
203#[allow(clippy::too_many_arguments)]
204// The flush macro resets `batch_high`/`batch_deadline` for the next loop
205// iteration. At the two flush sites that return immediately afterward (shutdown,
206// channel-close) those resets are dead stores — correct, but flagged. The allow
207// must sit on the function: a statement-scoped `#[allow]` inside the macro body
208// trips the experimental attributes-on-expressions gate (E0658) on stable.
209#[allow(unused_assignments)]
210pub async fn watch_applied<U, S, P, A, O>(
211 watcher: Arc<dyn KvWatcher>,
212 scope: WatchScope,
213 resume: Option<WatchCursor>,
214 // `Some(reader)` arms the cursor-expired stale-key resync (see the function
215 // docs); `None` keeps the re-list-only fallback. Only consulted on expiry —
216 // the hot path never touches it.
217 reader: Option<Arc<dyn KvReader>>,
218 mut store: Option<S>,
219 // `Some(rx)` arms an export-request arm in the select loop: each
220 // [`ExportRequest`] is handled between flushes (pending batch flushed
221 // first), so the exported artifact's cursor is exactly the applied cursor.
222 // `None` (or dropping the paired sender) leaves the loop's behavior
223 // unchanged.
224 mut exports: Option<mpsc::Receiver<ExportRequest>>,
225 config: BatchConfig,
226 mut parse: P,
227 mut apply: A,
228 mut on_applied: O,
229 mut shutdown: watch::Receiver<bool>,
230) -> Result<WatchCursor, KvError>
231where
232 U: Send,
233 // `Send + 'static`: each flush moves `store` onto a blocking task to run its
234 // (potentially blocking) `apply`, then takes it back — the same offload the
235 // append log's compaction always used.
236 S: SnapshotStore + Send + 'static,
237 P: FnMut(&KvUpdate) -> Option<U> + Send,
238 A: FnMut(Vec<U>) + Send,
239 O: FnMut(WatchCursor) + Send,
240{
241 // The cursor we'll return. Initialized from the resume position so that a
242 // watch which receives nothing new still reports the position it resumed
243 // from as "applied" (it is — everything up to it was applied before the last
244 // run persisted it).
245 let mut applied = match &resume {
246 Some(c) => c.clone(),
247 None => WatchCursor::none(),
248 };
249
250 // The scope's prefixes, for the resync diff against the fold. Cloned out
251 // before `scope` moves into the watch task.
252 let scope_prefixes = scope.prefixes();
253
254 // Cursor-expired resync channel, armed only when there is a reader to list
255 // live keys AND a store to diff them against. The watch task sends the live
256 // listing here and waits for the ack before starting the fallback watch, so
257 // synthetic deletes always precede the re-list.
258 let (resync_pair, mut resyncs): (Option<ResyncHandle>, Option<mpsc::Receiver<ResyncRequest>>) =
259 match reader {
260 Some(reader) if store.is_some() => {
261 let (rs_tx, rs_rx) = mpsc::channel(1);
262 (Some((reader, rs_tx)), Some(rs_rx))
263 }
264 _ => (None, None),
265 };
266
267 // Spawn the watch task. It owns the cursor-expired fallback so the main loop
268 // only ever sees a clean ordered stream of updates on `rx`.
269 let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
270 let handle = {
271 let watcher = Arc::clone(&watcher);
272 tokio::spawn(
273 async move { run_watch(watcher.as_ref(), &scope, resume, resync_pair, tx).await },
274 )
275 };
276
277 // Batch state.
278 //
279 // `batch_high` tracks the version of the most recently *received* update
280 // since the last flush — including updates `parse` rejected. NATS delivers
281 // in revision order, so the last received is the highest, and advancing the
282 // cursor to it after a single atomic `apply` is correct: having seen the max
283 // means we've seen everything below it, and a rejected entry is still
284 // "nothing to apply", hence covered. Reset to `none()` after every flush.
285 // Pre-size to the flush bound so no batch ever re-climbs the reallocation
286 // ladder; `max(1)` only guards a nonsensical `max = 0` config.
287 let batch_cap = config.max.max(1);
288 let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
289 // Raw received updates for the durable `store`, in revision order. Only
290 // populated when a `store` is present; the store folds the *raw* updates
291 // (including ones `parse` rejected — they are still part of the bucket's
292 // state), whereas the parsed `batch` above is the consumer's domain view.
293 let mut raw_batch: Vec<KvUpdate> = Vec::new();
294 let mut batch_high = WatchCursor::none();
295 // `Some` once a batch has opened and the window timer is armed; `None`
296 // between flushes. Only the armed/idle distinction is read in the loop —
297 // the absolute instant lives in the pinned `sleep` future below.
298 let mut batch_deadline: Option<tokio::time::Instant> = None;
299
300 // Flush the current batch, in order: run the domain `apply` (if non-empty) to
301 // completion, advance the cursor, fold the raw batch + cursor durably into
302 // `store`, then fire `on_applied`. The store fold runs on a blocking task
303 // (its `apply` may block on I/O), moving the store in and taking it back — the
304 // same offload the append log's compaction always used. A store error is
305 // logged and the watch continues (the snapshot is a cache); a panicked
306 // blocking task drops the store irrecoverably, which breaks the
307 // resume-after-restart guarantee, so it is surfaced as fatal.
308 macro_rules! flush {
309 () => {{
310 // Nothing received since the last flush → nothing to do at all.
311 // (`raw_batch` can be non-empty with no cursor advance only via the
312 // resync path's synthetic deletes, which carry no revision.)
313 if !batch.is_empty() || !raw_batch.is_empty() || !batch_high.is_none() {
314 if !batch.is_empty() {
315 // INVARIANT: apply() runs and RETURNS before any cursor
316 // advance below. Move the batch out so a panicking apply
317 // can't leave half-consumed state behind.
318 //
319 // `replace` (not `take`) leaves a pre-sized Vec behind so each
320 // batch after the first doesn't re-climb the reallocation
321 // ladder (4→8→…→cap).
322 apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
323 }
324 let advanced = !batch_high.is_none();
325 if advanced {
326 applied = batch_high.clone();
327 }
328 if !raw_batch.is_empty()
329 && let Some(mut st) = store.take()
330 {
331 let raw = std::mem::take(&mut raw_batch);
332 // Fold at the post-advance cursor. A synthetic-deletes-only
333 // batch leaves the cursor where it was (the deletes are a
334 // state correction, not log entries), which is safe: an
335 // unchanged — possibly expired — cursor only ever re-runs
336 // this same resync on the next restart.
337 let cur = applied.clone();
338 // Hand the store back unconditionally on a clean return so
339 // a *failed* apply (Ok(Err)) keeps the watch running; only
340 // a *panicked* task (Err) loses the store and is fatal.
341 match tokio::task::spawn_blocking(move || {
342 let res = st.apply(&raw, &cur);
343 (st, res)
344 })
345 .await
346 {
347 Ok((st, Ok(()))) => store = Some(st),
348 Ok((st, Err(e))) => {
349 warn!(error = %e, "snapshot store apply failed; continuing");
350 store = Some(st);
351 }
352 Err(e) => {
353 warn!(error = %e, "snapshot store task panicked; aborting watch");
354 handle.abort();
355 return Err(KvError::WatchError(format!(
356 "snapshot store task panicked: {e}"
357 )));
358 }
359 }
360 }
361 if advanced {
362 on_applied(applied.clone());
363 batch_high = WatchCursor::none();
364 }
365 }
366 batch_deadline = None;
367 }};
368 }
369
370 // A single timer future, reset in place each time a batch opens. The old
371 // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
372 // re-created on every loop iteration — one Arc-backed timer-wheel entry
373 // allocated, registered, and immediately dropped per received update.
374 // Pinning one future and `reset`-ing it reuses that single allocation; the
375 // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
376 // its initial already-elapsed deadline is never observed.
377 let sleep = tokio::time::sleep(Duration::ZERO);
378 tokio::pin!(sleep);
379
380 loop {
381 tokio::select! {
382 biased;
383
384 // Shutdown wins: flush whatever is batched (so the cursor reflects
385 // it), abandon any updates still in flight on the channel — they
386 // weren't applied, the cursor doesn't claim them, and they'll be
387 // re-delivered on the next resume — and return the applied cursor.
388 res = shutdown.changed() => {
389 if res.is_err() || *shutdown.borrow() {
390 flush!();
391 handle.abort();
392 // Observe the task's terminal state. An abort surfaces as a
393 // cancelled JoinError, which we ignore; a genuine panic that
394 // raced ahead of the abort is logged rather than silently lost.
395 if let Err(join) = handle.await
396 && !join.is_cancelled()
397 {
398 warn!(error = %join, "watch task panicked at shutdown");
399 }
400 return Ok(applied);
401 }
402 }
403
404 // Batch window elapsed.
405 () = &mut sleep, if batch_deadline.is_some() => {
406 flush!();
407 }
408
409 // Cursor-expired resync. Placed before `rx.recv()` (biased) so the
410 // synthetic deletes are folded before any update the fallback watch
411 // delivers — though the ack protocol already guarantees the fallback
412 // hasn't started while this arm runs. Diff the fold's in-scope keys
413 // against the bucket's live listing; anything the fold holds that
414 // the bucket no longer does vanished during the gap (its delete
415 // marker evicted with the cursor), so synthesize the delete the
416 // re-list can't deliver.
417 req = async { resyncs.as_mut().expect("arm guarded by is_some").recv().await },
418 if resyncs.is_some() => {
419 match req {
420 Some(ResyncRequest { live_keys, ack }) => {
421 // Flush first so the diff runs against a fold that
422 // reflects everything received so far.
423 flush!();
424 let live: std::collections::HashSet<&str> =
425 live_keys.iter().map(String::as_str).collect();
426 let mut stale: Vec<String> = Vec::new();
427 if let Some(st) = &store {
428 for prefix in &scope_prefixes {
429 match st.range(prefix) {
430 Ok(entries) => stale.extend(
431 entries
432 .into_iter()
433 .filter(|e| !live.contains(e.key.as_str()))
434 .map(|e| e.key),
435 ),
436 Err(e) => {
437 warn!(error = %e, prefix = %prefix,
438 "resync fold range failed; stale keys under this prefix may persist");
439 }
440 }
441 }
442 }
443 // Overlapping prefixes can list a key twice.
444 stale.sort_unstable();
445 stale.dedup();
446 if !stale.is_empty() {
447 warn!(stale = stale.len(), "cursor-expired resync: deleting keys that vanished during the gap");
448 }
449 for key in stale {
450 // Synthetic: carries no revision (unknown version)
451 // and so never advances the cursor.
452 let u = KvUpdate::Delete {
453 key,
454 version: crate::kv::VersionToken::unknown(),
455 };
456 if store.is_some() {
457 raw_batch.push(u.clone());
458 }
459 if let Some(parsed) = parse(&u) {
460 batch.push(parsed);
461 }
462 }
463 flush!();
464 // Ack AFTER the deletes are applied: the watch task is
465 // holding the fallback watch until it hears back, which
466 // is what orders deletes before the re-list.
467 let _ = ack.send(());
468 }
469 None => resyncs = None,
470 }
471 }
472
473 // Export request. Placed after shutdown/window (they stay prompt)
474 // and before `rx.recv()` so a firehose of updates cannot starve an
475 // export indefinitely. The pending batch is flushed first, so the
476 // exported cursor is exactly the applied cursor; the export itself
477 // runs on a blocking task with the store moved in and taken back —
478 // the same offload the flush path uses.
479 req = async { exports.as_mut().expect("arm guarded by is_some").recv().await },
480 if exports.is_some() => {
481 match req {
482 Some(ExportRequest { dest_dir, reply }) => {
483 flush!();
484 match store.take() {
485 Some(mut st) => {
486 match tokio::task::spawn_blocking(move || {
487 let res = st.export_to(&dest_dir);
488 (st, res)
489 })
490 .await
491 {
492 // Hand the store back on any clean return; an
493 // export failure goes to the requester only —
494 // the watch keeps running (the snapshot is a
495 // cache). A panicked task lost the store,
496 // which breaks the resume guarantee: fatal,
497 // same as the flush path's apply panic.
498 Ok((st, res)) => {
499 store = Some(st);
500 let _ = reply.send(res);
501 }
502 Err(e) => {
503 warn!(error = %e, "snapshot export task panicked; aborting watch");
504 handle.abort();
505 return Err(KvError::WatchError(format!(
506 "snapshot export task panicked: {e}"
507 )));
508 }
509 }
510 }
511 None => {
512 let _ = reply.send(Err(SnapshotError::Backend(
513 "watch_applied runs without a snapshot store; nothing to export"
514 .into(),
515 )));
516 }
517 }
518 }
519 // Sender dropped: disarm the arm for the rest of the run.
520 None => exports = None,
521 }
522 }
523
524 update = rx.recv() => {
525 match update {
526 Some(u) => {
527 // Cursor authority: every received update bumps the
528 // pending high-water, regardless of whether `parse`
529 // keeps it.
530 batch_high = WatchCursor::from_version(u.version().clone());
531
532 // Buffer the raw update for the durable store fold (which
533 // commits the whole batch + cursor atomically on flush).
534 // Done before `parse` consumes `u` by reference, and only
535 // when a store is present so the no-persistence path keeps
536 // its zero-copy cost.
537 if store.is_some() {
538 raw_batch.push(u.clone());
539 }
540
541 if let Some(parsed) = parse(&u) {
542 batch.push(parsed);
543 }
544
545 // Arm the window on the first received update of a batch
546 // — even a parse-rejected one, so the cursor advances
547 // within `window` even through a run of irrelevant keys.
548 // Reset the pinned timer to the new deadline rather than
549 // allocating a fresh `Sleep`.
550 if batch_deadline.is_none() {
551 let deadline = tokio::time::Instant::now() + config.window;
552 sleep.as_mut().reset(deadline);
553 batch_deadline = Some(deadline);
554 }
555
556 // Flush on a full parsed batch, or — when persisting — a
557 // full raw batch, so a window packed with parse-rejected
558 // updates can't grow `raw_batch` without bound before the
559 // window elapses.
560 if batch.len() >= config.max || raw_batch.len() >= config.max {
561 flush!();
562 }
563 }
564 None => {
565 // Stream closed. Flush the remainder, then surface the
566 // watch task's terminal result: a clean end returns the
567 // applied cursor, an error propagates.
568 flush!();
569 return match handle.await {
570 Ok(Ok(())) => Ok(applied),
571 Ok(Err(e)) => Err(e),
572 Err(join) => Err(KvError::WatchError(format!(
573 "watch task panicked: {join}"
574 ))),
575 };
576 }
577 }
578 }
579 }
580 }
581}
582
583/// Run the underlying watch for `scope`, resuming from `resume` when it carries
584/// a position, with the [`KvError::CursorExpired`] → resync + full-watch
585/// fallback.
586async fn run_watch(
587 watcher: &dyn KvWatcher,
588 scope: &WatchScope,
589 resume: Option<WatchCursor>,
590 resync: Option<ResyncHandle>,
591 tx: mpsc::Sender<KvUpdate>,
592) -> Result<(), KvError> {
593 // Resume only when the cursor carries a real position; an absent or `none()`
594 // cursor falls through to a full watch. Binding `cursor` here makes "we have a
595 // resume position" structural — there is no separate bool whose truth a later
596 // edit could let drift from the `Some`.
597 let resume_cursor = resume.filter(|c| !c.is_none());
598
599 match scope {
600 WatchScope::All => {
601 if let Some(cursor) = resume_cursor {
602 match watcher.watch_all_from(&cursor, tx.clone()).await {
603 Err(KvError::CursorExpired) => {
604 warn!(
605 "watch cursor expired; resyncing, then falling back to full watch_all"
606 );
607 resync_stale_keys(scope, &resync).await;
608 watcher.watch_all(tx).await
609 }
610 other => other,
611 }
612 } else {
613 watcher.watch_all(tx).await
614 }
615 }
616 WatchScope::Prefix(prefix) => {
617 if let Some(cursor) = resume_cursor {
618 match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
619 Err(KvError::CursorExpired) => {
620 warn!(
621 "watch cursor expired; resyncing, then falling back to full watch_prefix"
622 );
623 resync_stale_keys(scope, &resync).await;
624 watcher.watch_prefix(prefix, tx).await
625 }
626 other => other,
627 }
628 } else {
629 watcher.watch_prefix(prefix, tx).await
630 }
631 }
632 WatchScope::Prefixes(prefixes) => {
633 let refs: Vec<&str> = prefixes.iter().map(String::as_str).collect();
634 if let Some(cursor) = resume_cursor {
635 match watcher
636 .watch_prefixes_from(&refs, &cursor, tx.clone())
637 .await
638 {
639 Err(KvError::CursorExpired) => {
640 warn!(
641 "watch cursor expired; resyncing, then falling back to full watch_prefixes"
642 );
643 resync_stale_keys(scope, &resync).await;
644 watcher.watch_prefixes(&refs, tx).await
645 }
646 other => other,
647 }
648 } else {
649 watcher.watch_prefixes(&refs, tx).await
650 }
651 }
652 }
653}
654
655/// Cursor-expired stale-key resync, run BEFORE the fallback watch is
656/// established: list the scope's live keys, hand them to the main loop (which
657/// diffs them against the fold and applies synthetic deletes), and wait for the
658/// ack. That ordering — deletes applied, then fallback watch armed — is what
659/// makes a delete-then-recreate during the gap converge: the synthetic delete
660/// always lands before the re-list put.
661///
662/// Best-effort: with no reader/store wired (`resync` is `None`), or a failed
663/// listing, this degrades to the warn-and-relist-only fallback — keys deleted
664/// during the gap stay in the fold until their next update.
665async fn resync_stale_keys(scope: &WatchScope, resync: &Option<ResyncHandle>) {
666 let Some((reader, resync_tx)) = resync else {
667 warn!(
668 "no reader wired for cursor-expired resync; keys deleted during the gap may persist in the fold"
669 );
670 return;
671 };
672 let mut live_keys = Vec::new();
673 for prefix in scope.prefixes() {
674 match reader.keys(&prefix).await {
675 Ok(keys) => live_keys.extend(keys),
676 Err(e) => {
677 warn!(error = %e, prefix = %prefix,
678 "resync live-key listing failed; keys deleted during the gap may persist in the fold");
679 return;
680 }
681 }
682 }
683 let (ack_tx, ack_rx) = oneshot::channel();
684 if resync_tx
685 .send(ResyncRequest {
686 live_keys,
687 ack: ack_tx,
688 })
689 .await
690 .is_ok()
691 {
692 // A dropped ack (main loop shutting down) just means the fallback watch
693 // is about to die with it; nothing to recover.
694 let _ = ack_rx.await;
695 }
696}
697
698#[cfg(test)]
699mod tests {
700 use super::*;
701 use crate::kv::{KvEntry, VersionToken};
702 use crate::snapshot::AppendLogSnapshot;
703 use async_trait::async_trait;
704 use std::sync::Mutex;
705 use std::sync::atomic::{AtomicU64, Ordering};
706 use tokio::sync::mpsc::Sender;
707
708 fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
709 KvUpdate::Put(KvEntry {
710 key: key.to_string(),
711 value: value.to_vec(),
712 version: VersionToken::from_u64(rev),
713 })
714 }
715
716 /// A scripted watcher. Delivers a pre-set list of updates through the
717 /// channel, then either holds the channel open (so window/max/shutdown
718 /// flushes can be exercised without the stream ending) or returns cleanly
719 /// (so channel-close flushing can be exercised).
720 struct MockWatcher {
721 full: Mutex<Option<Vec<KvUpdate>>>,
722 from: Mutex<Option<Vec<KvUpdate>>>,
723 from_expires: bool,
724 hold: bool,
725 }
726
727 impl MockWatcher {
728 fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
729 Self {
730 full: Mutex::new(Some(updates)),
731 from: Mutex::new(None),
732 from_expires: false,
733 hold,
734 }
735 }
736
737 async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
738 let updates = which.lock().unwrap().take().unwrap_or_default();
739 for u in updates {
740 if tx.send(u).await.is_err() {
741 return;
742 }
743 }
744 if self.hold {
745 // Keep `tx` alive (channel open) until this task is aborted.
746 std::future::pending::<()>().await;
747 }
748 }
749 }
750
751 #[async_trait]
752 impl KvWatcher for MockWatcher {
753 async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
754 self.deliver(&self.full, tx).await;
755 Ok(())
756 }
757
758 async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
759 self.deliver(&self.full, tx).await;
760 Ok(())
761 }
762
763 async fn watch_prefixes(
764 &self,
765 _prefixes: &[&str],
766 tx: Sender<KvUpdate>,
767 ) -> Result<(), KvError> {
768 // This mock scripts the applied-watch resumption tests, not prefix
769 // filtering; it delivers the same `full` script as `watch_prefix`.
770 // The real multi-filter scoping is proved in the NATS integration test.
771 self.deliver(&self.full, tx).await;
772 Ok(())
773 }
774
775 async fn watch_all_from(
776 &self,
777 _cursor: &WatchCursor,
778 tx: Sender<KvUpdate>,
779 ) -> Result<(), KvError> {
780 if self.from_expires {
781 return Err(KvError::CursorExpired);
782 }
783 self.deliver(&self.from, tx).await;
784 Ok(())
785 }
786
787 // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
788 // are exercised against the same `from` script. Without this the trait's
789 // default impl would delegate to watch_prefix and silently deliver the
790 // full set instead of the delta.
791 async fn watch_prefix_from(
792 &self,
793 _prefix: &str,
794 _cursor: &WatchCursor,
795 tx: Sender<KvUpdate>,
796 ) -> Result<(), KvError> {
797 if self.from_expires {
798 return Err(KvError::CursorExpired);
799 }
800 self.deliver(&self.from, tx).await;
801 Ok(())
802 }
803
804 // Same mirroring for the multi-prefix resume arm.
805 async fn watch_prefixes_from(
806 &self,
807 _prefixes: &[&str],
808 _cursor: &WatchCursor,
809 tx: Sender<KvUpdate>,
810 ) -> Result<(), KvError> {
811 if self.from_expires {
812 return Err(KvError::CursorExpired);
813 }
814 self.deliver(&self.from, tx).await;
815 Ok(())
816 }
817 }
818
/// A reader whose `keys()` serves a scripted live listing — the only call
/// the cursor-expired resync makes (`get`/`scan` are `unreachable!`). It
/// filters by prefix like a real backend, so prefix-scoped resyncs are
/// exercised faithfully.
struct MockReader {
    // Every key the simulated bucket currently holds; `keys(prefix)` returns
    // the subset that starts with `prefix`, in this order.
    live: Vec<String>,
}
825
826 #[async_trait]
827 impl KvReader for MockReader {
828 async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
829 unreachable!("resync only lists keys")
830 }
831
832 async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
833 Ok(self
834 .live
835 .iter()
836 .filter(|k| k.starts_with(prefix))
837 .cloned()
838 .collect())
839 }
840
841 async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
842 unreachable!("resync only lists keys")
843 }
844 }
845
/// A watcher whose entry points all fail. It is used to prove that the watch
/// task's terminal error is surfaced out of `watch_applied`, rather than
/// swallowed and reported as a clean `Ok(applied)` when the channel closes.
///
/// Stateless: every call returns the same injected `KvError::WatchError`.
struct ErrorWatcher;
850
851 #[async_trait]
852 impl KvWatcher for ErrorWatcher {
853 async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
854 Err(KvError::WatchError("injected watch failure".into()))
855 }
856
857 async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
858 Err(KvError::WatchError("injected watch failure".into()))
859 }
860
861 async fn watch_prefixes(
862 &self,
863 _prefixes: &[&str],
864 _tx: Sender<KvUpdate>,
865 ) -> Result<(), KvError> {
866 Err(KvError::WatchError("injected watch failure".into()))
867 }
868 }
869
870 // A no-op parse that keeps every Put as the value bytes; drops deletes.
871 fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
872 match u {
873 KvUpdate::Put(e) => Some(e.value.clone()),
874 _ => None,
875 }
876 }
877
878 /// The stream closes (hold = false) with a pending batch; the remainder is
879 /// flushed before returning, the returned cursor is the last revision, and
880 /// `on_applied` ran exactly once after `apply`.
881 #[tokio::test]
882 async fn flush_on_channel_close() {
883 let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
884 let watcher = Arc::new(MockWatcher::new(updates, false));
885
886 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
887 let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
888
889 let ab = Arc::clone(&applied_batches);
890 let oc = Arc::clone(&on_applied_cursors);
891 let (_sd_tx, sd_rx) = watch::channel(false);
892
893 let cursor = watch_applied(
894 watcher,
895 WatchScope::All,
896 None,
897 None, // reader (no resync in this test)
898 None::<AppendLogSnapshot>,
899 None,
900 BatchConfig::default(),
901 parse_put,
902 move |batch| ab.lock().unwrap().push(batch),
903 move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
904 sd_rx,
905 )
906 .await
907 .unwrap();
908
909 assert_eq!(cursor.as_u64(), Some(3));
910 let batches = applied_batches.lock().unwrap();
911 let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
912 assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
913 assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
914 }
915
/// Fewer than `max` updates, then the channel idles: the window timer must
/// flush them and advance the cursor.
///
/// Runs on a paused clock so the default window elapses in virtual time. The
/// watcher holds the channel open, so channel close cannot be what triggers
/// the flush.
#[tokio::test(start_paused = true)]
async fn flush_on_window() {
    let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
    let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

    // `applied`: last cursor published via `on_applied`.
    // `count`: total values handed to `apply` across all batches.
    let applied = Arc::new(AtomicU64::new(0));
    let count = Arc::new(AtomicU64::new(0));
    let a = Arc::clone(&applied);
    let c = Arc::clone(&count);
    let (sd_tx, sd_rx) = watch::channel(false);

    let task = tokio::spawn(watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |batch: Vec<Vec<u8>>| {
            c.fetch_add(batch.len() as u64, Ordering::SeqCst);
        },
        move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
        sd_rx,
    ));

    // Let the window (10ms) elapse under virtual time.
    tokio::time::sleep(Duration::from_millis(50)).await;
    assert_eq!(
        count.load(Ordering::SeqCst),
        2,
        "window should have flushed"
    );
    assert_eq!(applied.load(Ordering::SeqCst), 2);

    // Shutdown ends the held-open watch; the returned cursor must match the
    // one the window flush already published.
    sd_tx.send(true).unwrap();
    let cursor = task.await.unwrap().unwrap();
    assert_eq!(cursor.as_u64(), Some(2));
}
958
/// Exactly `max` updates fill a batch, which flushes immediately — before the
/// window would have elapsed.
#[tokio::test(start_paused = true)]
async fn flush_on_max() {
    let max = 4;
    let updates: Vec<_> = (1..=max as u64)
        .map(|i| put(&format!("k{i}"), b"v", i))
        .collect();
    let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

    // Size of each batch handed to `apply`, in flush order.
    let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
    let f = Arc::clone(&flushes);
    let (sd_tx, sd_rx) = watch::channel(false);

    let task = tokio::spawn(watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig {
            window: Duration::from_secs(3600), // effectively never
            max,
        },
        parse_put,
        move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
        move |_| {},
        sd_rx,
    ));

    // Yield enough for the mock to push all `max` updates; the window is an
    // hour, so any flush is purely the max trigger.
    tokio::time::sleep(Duration::from_millis(1)).await;
    assert_eq!(
        *flushes.lock().unwrap(),
        vec![max],
        "a full batch should flush on max, not wait for the window"
    );

    // Nothing is pending any more; shutdown just ends the held-open watch.
    sd_tx.send(true).unwrap();
    task.await.unwrap().unwrap();
}
1002
/// A pending batch plus a shutdown signal: the batch is flushed and the
/// applied cursor returned.
///
/// Neither the window (one hour) nor `max` (100) can fire, so shutdown is
/// the only possible flush trigger.
#[tokio::test(start_paused = true)]
async fn flush_on_shutdown() {
    let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
    let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

    // Last cursor published via `on_applied`.
    let applied = Arc::new(AtomicU64::new(0));
    let a = Arc::clone(&applied);
    let (sd_tx, sd_rx) = watch::channel(false);

    let task = tokio::spawn(watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig {
            window: Duration::from_secs(3600), // window won't fire
            max: 100,
        },
        parse_put,
        move |_batch: Vec<Vec<u8>>| {},
        move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
        sd_rx,
    ));

    // Give the mock time to deliver both updates into the pending batch.
    tokio::time::sleep(Duration::from_millis(1)).await;
    sd_tx.send(true).unwrap();

    let cursor = task.await.unwrap().unwrap();
    assert_eq!(
        cursor.as_u64(),
        Some(2),
        "shutdown flushes the pending batch"
    );
    // The shutdown flush also published the cursor, not just returned it.
    assert_eq!(applied.load(Ordering::SeqCst), 2);
}
1043
/// The cursor must not advance until `apply` has returned.
///
/// `apply` reads the cursor that `on_applied` last published. When the
/// second batch is applied, the visible cursor must still be the *first*
/// batch's. It is never the second's, which only becomes visible after this
/// `apply` returns.
#[tokio::test(start_paused = true)]
async fn cursor_advances_only_after_apply() {
    // Two batches of `max` updates each.
    let max = 2usize;
    let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
    let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

    // Cursor as last published by on_applied; starts at 0 (nothing applied).
    let published = Arc::new(AtomicU64::new(0));
    // What `apply` observed as the published cursor at the moment it ran.
    let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));

    let pub_for_apply = Arc::clone(&published);
    let seen = Arc::clone(&seen_at_apply);
    let pub_for_on = Arc::clone(&published);
    let (sd_tx, sd_rx) = watch::channel(false);

    let task = tokio::spawn(watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig {
            // The window never fires, so batches are split purely by `max`.
            window: Duration::from_secs(3600),
            max,
        },
        parse_put,
        move |_batch: Vec<Vec<u8>>| {
            // The cursor visible here is whatever the PREVIOUS flush
            // published — never this batch's, because we haven't returned.
            seen.lock()
                .unwrap()
                .push(pub_for_apply.load(Ordering::SeqCst));
        },
        move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
        sd_rx,
    ));

    tokio::time::sleep(Duration::from_millis(1)).await;
    sd_tx.send(true).unwrap();
    task.await.unwrap().unwrap();

    // First apply saw 0 (nothing applied yet); second apply saw 2 (first
    // batch's cursor), NOT 4. The cursor only reached 4 after the second
    // apply returned.
    assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
    assert_eq!(published.load(Ordering::SeqCst), 4);
}
1099
/// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
/// domain work, but they were still received — so the cursor must advance
/// over them.
///
/// A flush where every update was rejected must skip `apply` entirely. It
/// must still publish the advanced cursor via `on_applied`.
#[tokio::test]
async fn corrupt_parse_entries_advance_cursor() {
    let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
    let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

    let apply_calls = Arc::new(AtomicU64::new(0));
    let on_applied_max = Arc::new(AtomicU64::new(0));
    let ac = Arc::clone(&apply_calls);
    let om = Arc::clone(&on_applied_max);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        // Reject everything — simulates corrupt/irrelevant entries.
        |_u: &KvUpdate| -> Option<Vec<u8>> { None },
        move |batch: Vec<Vec<u8>>| {
            ac.fetch_add(1, Ordering::SeqCst);
            // Backstop only: with every update rejected, any batch reaching
            // here would be empty. The `apply_calls == 0` assertion below is
            // what fails if `apply` runs at all.
            assert!(batch.is_empty());
        },
        move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
    assert_eq!(
        apply_calls.load(Ordering::SeqCst),
        0,
        "an all-rejected batch applies nothing"
    );
    assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
}
1142
/// A resume whose cursor has expired falls back to the full watch and still
/// applies the delivered updates.
///
/// No reader or store is wired, so no resync diff runs; only the fallback
/// re-list is observed.
#[tokio::test]
async fn cursor_expired_falls_back_to_full_watch() {
    // Every `*_from` call fails with CursorExpired; the `from` script is
    // never delivered.
    let mock = MockWatcher {
        full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
        from: Mutex::new(Some(vec![])),
        from_expires: true,
        hold: false,
    };
    let watcher = Arc::new(mock);

    // Flattened values across all applied batches, in apply order.
    let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let ab = Arc::clone(&applied_batches);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::All,
        Some(WatchCursor::from_u64(5)), // resume position that "expired"
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
        move |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(cursor.as_u64(), Some(11));
    assert_eq!(
        *applied_batches.lock().unwrap(),
        vec![b"1".to_vec(), b"2".to_vec()],
        "fallback full watch's updates were applied"
    );
}
1182
/// Cursor-expired resync, with a reader and a store wired.
///
/// A key the fold holds but the live listing no longer has gets a synthetic
/// delete. That delete is applied strictly BEFORE the fallback re-list, and
/// the persisted fold converges to the live state.
///
/// The synthetic delete has an unknown version, so it must not move the
/// cursor; the re-list put must.
#[tokio::test]
async fn cursor_expired_resync_deletes_stale_keys() {
    let dir = tempfile::TempDir::new().unwrap();
    let path = dir.path().join("resync.snap");
    let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
    // The fold from the previous run: node.a and node.b at cursor 2.
    store
        .apply(
            &[put("node.a", b"1", 1), put("node.b", b"2", 2)],
            &WatchCursor::from_u64(2),
        )
        .unwrap();

    // During the gap node.b was deleted (marker since evicted) and node.a
    // updated; the resume cursor (2) has expired. The fallback re-list
    // therefore carries only the surviving key.
    let mock = MockWatcher {
        full: Mutex::new(Some(vec![put("node.a", b"1b", 10)])),
        from: Mutex::new(Some(vec![])),
        from_expires: true,
        hold: false,
    };
    // The live listing the resync diffs the fold against.
    let reader = MockReader {
        live: vec!["node.a".to_string()],
    };

    // Record everything `parse` sees, in order, deletes included.
    let seen = Arc::new(Mutex::new(Vec::<(String, bool)>::new()));
    let s = Arc::clone(&seen);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        Arc::new(mock),
        WatchScope::All,
        Some(WatchCursor::from_u64(2)),
        Some(Arc::new(reader) as Arc<dyn KvReader>),
        Some(store),
        None,
        BatchConfig::default(),
        move |u: &KvUpdate| {
            s.lock()
                .unwrap()
                .push((u.key().to_string(), matches!(u, KvUpdate::Delete { .. })));
            Some(())
        },
        |_batch: Vec<()>| {},
        |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    // The re-list put advanced the cursor; the synthetic delete did not.
    assert_eq!(cursor.as_u64(), Some(10));
    // The synthetic delete strictly precedes the re-list put.
    assert_eq!(
        *seen.lock().unwrap(),
        vec![("node.b".to_string(), true), ("node.a".to_string(), false)],
        "synthetic delete must be applied before the fallback re-list"
    );

    // The persisted fold converged: stale key gone, live key updated.
    let snap = crate::snapshot::load(&path).unwrap().unwrap();
    assert_eq!(snap.cursor.as_u64(), Some(10));
    assert_eq!(snap.entries.len(), 1);
    assert_eq!(snap.entries["node.a"].value, b"1b");
}
1255
/// A prefix-scoped resync diffs only in-scope keys.
///
/// An out-of-scope key the fold holds survives, while the in-scope stale key
/// is deleted. A flush containing only synthetic deletes leaves the cursor
/// untouched.
#[tokio::test]
async fn cursor_expired_resync_respects_scope() {
    let dir = tempfile::TempDir::new().unwrap();
    let path = dir.path().join("resync-scope.snap");
    let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
    // Previous fold: one in-scope key (node.b) and one out-of-scope key
    // (other.z), at cursor 2.
    store
        .apply(
            &[put("node.b", b"2", 1), put("other.z", b"9", 2)],
            &WatchCursor::from_u64(2),
        )
        .unwrap();

    // Expired resume; the bucket no longer has ANY node.* keys; the
    // fallback re-list is empty.
    let mock = MockWatcher {
        full: Mutex::new(Some(vec![])),
        from: Mutex::new(Some(vec![])),
        from_expires: true,
        hold: false,
    };
    // Empty live listing. If the resync ignored the scope, other.z would be
    // deleted too — the final assertion catches that.
    let reader = MockReader { live: vec![] };
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        Arc::new(mock),
        WatchScope::Prefix("node.".to_string()),
        Some(WatchCursor::from_u64(2)),
        Some(Arc::new(reader) as Arc<dyn KvReader>),
        Some(store),
        None,
        BatchConfig::default(),
        |_u: &KvUpdate| Some(()),
        |_batch: Vec<()>| {},
        |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    // Deletes-only flush: cursor stays at the resume position.
    assert_eq!(cursor.as_u64(), Some(2));

    let snap = crate::snapshot::load(&path).unwrap().unwrap();
    assert_eq!(snap.cursor.as_u64(), Some(2));
    assert!(
        !snap.entries.contains_key("node.b"),
        "in-scope stale key must be resync-deleted"
    );
    assert_eq!(
        snap.entries["other.z"].value, b"9",
        "out-of-scope key must survive a prefix-scoped resync"
    );
}
1312
/// `WatchScope::Prefixes` with no resume cursor takes the non-resume path
/// and applies every delivered update.
///
/// This covers only the no-resume arm. The expired-resume arm is covered by
/// `prefixes_scope_expired_resume_falls_back`.
///
/// `MockWatcher` serves the same `full` script from `watch_all`,
/// `watch_prefix` and `watch_prefixes`. So this test shows the scope is
/// accepted end-to-end; it cannot by itself tell which non-resume entry
/// point was called.
#[tokio::test]
async fn prefixes_scope_dispatches_full_watch() {
    let updates = vec![put("a.x", b"1", 1), put("b.y", b"2", 2)];
    let watcher = Arc::new(MockWatcher::new(updates, false));
    // Flattened values across all applied batches, in apply order.
    let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let ab = Arc::clone(&applied_batches);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::Prefixes(vec!["a.".to_string(), "b.".to_string()]),
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
        move |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(cursor.as_u64(), Some(2));
    assert_eq!(
        *applied_batches.lock().unwrap(),
        vec![b"1".to_vec(), b"2".to_vec()]
    );
}
1345
/// `WatchScope::Prefixes` resume whose cursor has expired falls back to the
/// full multi-prefix watch and applies its updates.
///
/// `from_expires` makes `watch_prefixes_from` fail with `CursorExpired`, so
/// the applied value can only come from the `full` script.
#[tokio::test]
async fn prefixes_scope_expired_resume_falls_back() {
    let mock = MockWatcher {
        full: Mutex::new(Some(vec![put("a.x", b"1", 7)])),
        from: Mutex::new(Some(vec![])),
        from_expires: true,
        hold: false,
    };
    // Flattened values across all applied batches, in apply order.
    let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let ab = Arc::clone(&applied_batches);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        Arc::new(mock),
        WatchScope::Prefixes(vec!["a.".to_string()]),
        Some(WatchCursor::from_u64(3)),
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
        move |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(cursor.as_u64(), Some(7));
    assert_eq!(*applied_batches.lock().unwrap(), vec![b"1".to_vec()]);
}
1379
/// End-to-end with a real snapshot file.
///
/// After the run, the persisted snapshot's cursor equals the applied cursor
/// and its entries match the applied state. This proves the checkpoint is
/// written at the post-apply cursor, never ahead of it.
#[tokio::test]
async fn snapshot_checkpoint_matches_applied_cursor() {
    let dir = tempfile::TempDir::new().unwrap();
    let path = dir.path().join("applied.snap");
    // Fresh store; the returned resume position is unused here.
    let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();

    let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
    let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        Some(store),
        None,
        BatchConfig::default(),
        parse_put,
        move |_batch: Vec<Vec<u8>>| {},
        move |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(cursor.as_u64(), Some(2));

    // Reload from disk rather than trusting in-memory state.
    let snap = crate::snapshot::load(&path).unwrap().unwrap();
    assert_eq!(
        snap.cursor.as_u64(),
        cursor.as_u64(),
        "snapshot checkpoint cursor must equal the applied cursor"
    );
    assert_eq!(snap.entries.len(), 2);
    assert_eq!(snap.entries["node.a"].value, b"1");
    assert_eq!(snap.entries["node.b"].value, b"2");
}
1422
/// Happy-path resume: a non-expired cursor takes the `*_from` path, and the
/// delta (the `from` script, NOT the full set) is applied.
///
/// Proves the resume branch delivers only post-cursor updates and advances
/// to their max revision.
#[tokio::test]
async fn resume_from_cursor_delivers_only_delta() {
    let mock = MockWatcher {
        // `full` would be delivered only if the resume path were (wrongly)
        // bypassed; a non-empty distinguishing value makes that visible.
        full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
        from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
        from_expires: false,
        hold: false,
    };
    let watcher = Arc::new(mock);

    // Flattened values across all applied batches, in apply order.
    let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let ab = Arc::clone(&applied_batches);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::All,
        Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
        move |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(
        cursor.as_u64(),
        Some(11),
        "cursor advances to the delta max"
    );
    assert_eq!(
        *applied_batches.lock().unwrap(),
        vec![b"3".to_vec(), b"4".to_vec()],
        "only the post-cursor delta is applied, never the full set"
    );
}
1470
/// `WatchScope::Prefix` with no resume cursor takes the non-resume path and
/// applies the delivered updates.
///
/// Most other tests use `WatchScope::All`; this one exercises the prefix
/// scope. `MockWatcher` serves the same `full` script from every non-resume
/// entry point, so this shows the scope works end-to-end. It cannot by
/// itself prove that `watch_prefix` (rather than `watch_all`) was called.
#[tokio::test]
async fn prefix_scope_applies_delivered_updates() {
    let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
    let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

    // Flattened values across all applied batches, in apply order.
    let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let ab = Arc::clone(&applied_batches);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::Prefix("node.".to_string()),
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
        move |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(cursor.as_u64(), Some(2));
    assert_eq!(
        *applied_batches.lock().unwrap(),
        vec![b"1".to_vec(), b"2".to_vec()]
    );
}
1505
/// `WatchScope::Prefix` happy-path resume: a non-expired cursor takes the
/// resume path, and only the delta is applied. This is the prefix twin of
/// `resume_from_cursor_delivers_only_delta`.
///
/// `MockWatcher` overrides `watch_prefix_from` to serve the `from` script,
/// so a wrongly taken non-resume path would show up as the `FULL` value.
#[tokio::test]
async fn prefix_resume_from_cursor_delivers_only_delta() {
    let mock = MockWatcher {
        // `full` would be delivered only if the resume path were (wrongly)
        // bypassed; a distinguishing value makes that visible.
        full: Mutex::new(Some(vec![put("node.x", b"FULL", 1)])),
        from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
        from_expires: false,
        hold: false,
    };
    let watcher = Arc::new(mock);

    // Flattened values across all applied batches, in apply order.
    let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let ab = Arc::clone(&applied_batches);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::Prefix("node.".to_string()),
        Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
        move |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(
        cursor.as_u64(),
        Some(11),
        "cursor advances to the delta max"
    );
    assert_eq!(
        *applied_batches.lock().unwrap(),
        vec![b"3".to_vec(), b"4".to_vec()],
        "only the post-cursor delta is applied via watch_prefix_from"
    );
}
1552
/// `WatchScope::Prefix` resume whose cursor has expired falls back to the
/// full `watch_prefix` and still applies the delivered updates. This is the
/// prefix twin of `cursor_expired_falls_back_to_full_watch`.
#[tokio::test]
async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
    // `watch_prefix_from` fails with CursorExpired; only `full` can be
    // delivered.
    let mock = MockWatcher {
        full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
        from: Mutex::new(Some(vec![])),
        from_expires: true,
        hold: false,
    };
    let watcher = Arc::new(mock);

    // Flattened values across all applied batches, in apply order.
    let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let ab = Arc::clone(&applied_batches);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::Prefix("node.".to_string()),
        Some(WatchCursor::from_u64(5)), // resume position that "expired"
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
        move |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(cursor.as_u64(), Some(11));
    assert_eq!(
        *applied_batches.lock().unwrap(),
        vec![b"1".to_vec(), b"2".to_vec()],
        "prefix fallback full watch's updates were applied"
    );
}
1593
1594 /// The watch task's terminal error must propagate out of `watch_applied`
1595 /// rather than being swallowed as `Ok(applied)` when the channel closes.
1596 #[tokio::test]
1597 async fn watch_task_error_propagates() {
1598 let watcher = Arc::new(ErrorWatcher);
1599 let (_sd_tx, sd_rx) = watch::channel(false);
1600
1601 let result = watch_applied(
1602 watcher,
1603 WatchScope::All,
1604 None,
1605 None, // reader (no resync in this test)
1606 None::<AppendLogSnapshot>,
1607 None,
1608 BatchConfig::default(),
1609 parse_put,
1610 move |_batch: Vec<Vec<u8>>| {},
1611 move |_| {},
1612 sd_rx,
1613 )
1614 .await;
1615
1616 match result {
1617 Err(KvError::WatchError(msg)) => {
1618 assert!(msg.contains("injected"), "error carries the cause: {msg}");
1619 }
1620 other => panic!("expected WatchError, got {other:?}"),
1621 }
1622 }
1623
/// A batch where `parse` accepts some updates and rejects others.
///
/// The cursor must still advance to the highest *received* revision, which
/// covers the rejected entry in the middle. `apply` sees only the accepted
/// ones.
#[tokio::test]
async fn mixed_parse_advances_cursor_over_rejected_entries() {
    let updates = vec![
        put("keep.a", b"1", 5),
        put("skip.b", b"2", 6), // rejected by parse
        put("keep.c", b"3", 7),
    ];
    let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

    // Flattened accepted values in apply order; last published cursor.
    let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let on_applied_max = Arc::new(AtomicU64::new(0));
    let ab = Arc::clone(&applied_batches);
    let om = Arc::clone(&on_applied_max);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        // Keep only keys under "keep."; reject everything else.
        |u: &KvUpdate| -> Option<Vec<u8>> {
            match u {
                KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                _ => None,
            }
        },
        move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
        move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
        sd_rx,
    )
    .await
    .unwrap();

    assert_eq!(
        cursor.as_u64(),
        Some(7),
        "cursor covers the rejected middle entry (rev 6)"
    );
    assert_eq!(
        *applied_batches.lock().unwrap(),
        vec![b"1".to_vec(), b"3".to_vec()],
        "apply sees only the accepted entries"
    );
    assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
}
1676
/// Shutdown before any update arrives.
///
/// Nothing was received, so the cursor stays at the resume position (here
/// `none()`). `apply` never runs and `on_applied` never fires.
#[tokio::test(start_paused = true)]
async fn shutdown_with_no_pending_batch() {
    let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

    // Call counters for the two callbacks; both must stay at zero.
    let apply_calls = Arc::new(AtomicU64::new(0));
    let on_applied_calls = Arc::new(AtomicU64::new(0));
    let ac = Arc::clone(&apply_calls);
    let oc = Arc::clone(&on_applied_calls);
    let (sd_tx, sd_rx) = watch::channel(false);

    let task = tokio::spawn(watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        None::<AppendLogSnapshot>,
        None,
        BatchConfig::default(),
        parse_put,
        move |_batch: Vec<Vec<u8>>| {
            ac.fetch_add(1, Ordering::SeqCst);
        },
        move |_| {
            oc.fetch_add(1, Ordering::SeqCst);
        },
        sd_rx,
    ));

    // Let the watcher attach and idle (it has nothing to deliver), then shut down.
    tokio::time::sleep(Duration::from_millis(1)).await;
    sd_tx.send(true).unwrap();

    let cursor = task.await.unwrap().unwrap();
    assert_eq!(
        cursor.as_u64(),
        None,
        "no updates received → cursor unmoved"
    );
    assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
    assert_eq!(
        on_applied_calls.load(Ordering::SeqCst),
        0,
        "on_applied never fires"
    );
}
1725
/// An [`ExportRequest`] flushes the pending batch first.
///
/// As a result, the artifact's cursor is exactly the applied cursor, and the
/// artifact is importable with the batched entries in it.
#[tokio::test(start_paused = true)]
async fn export_request_flushes_pending_batch_first() {
    let dir = tempfile::TempDir::new().unwrap();
    let store_path = dir.path().join("fold.snap");
    let artifact = dir.path().join("artifact");
    let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();

    let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
    let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
    let (sd_tx, sd_rx) = watch::channel(false);
    // Export requests reach the watch loop over this channel.
    let (ex_tx, ex_rx) = mpsc::channel(1);

    let task = tokio::spawn(watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        Some(store),
        Some(ex_rx),
        BatchConfig {
            window: Duration::from_secs(3600), // window never fires
            max: 100,
        },
        parse_put,
        move |_batch: Vec<Vec<u8>>| {},
        move |_| {},
        sd_rx,
    ));

    // Let both updates land in the (unflushed) pending batch, then export.
    tokio::time::sleep(Duration::from_millis(1)).await;
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    ex_tx
        .send(ExportRequest {
            dest_dir: artifact.clone(),
            reply: reply_tx,
        })
        .await
        .unwrap();

    let manifest = reply_rx.await.unwrap().expect("export succeeds");
    assert_eq!(
        manifest.cursor.as_u64(),
        Some(2),
        "pending batch flushed before export: artifact cursor is the applied cursor"
    );

    // The artifact is importable and holds both batched entries.
    let (cursor, imported) =
        AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
            .unwrap();
    assert_eq!(cursor.as_u64(), Some(2));
    assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
    assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");

    sd_tx.send(true).unwrap();
    task.await.unwrap().unwrap();
}
1787
1788 /// An [`ExportRequest`] that arrives with NOTHING pending (the window
1789 /// already flushed everything) still produces a valid artifact whose
1790 /// cursor is the applied cursor. The flush-before-export step must be a
1791 /// clean no-op, not an error or a cursor regression.
1792 #[tokio::test(start_paused = true)]
1793 async fn export_with_empty_pending_batch_succeeds() {
1794 let dir = tempfile::TempDir::new().unwrap();
1795 let store_path = dir.path().join("fold.snap");
1796 let artifact = dir.path().join("artifact");
1797 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1798
1799 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1800 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1801 let (sd_tx, sd_rx) = watch::channel(false);
1802 let (ex_tx, ex_rx) = mpsc::channel(1);
1803
1804 let task = tokio::spawn(watch_applied(
1805 watcher,
1806 WatchScope::All,
1807 None,
1808 None, // reader (no resync in this test)
1809 Some(store),
1810 Some(ex_rx),
1811 BatchConfig::default(), // 10 ms window
1812 parse_put,
1813 move |_batch: Vec<Vec<u8>>| {},
1814 move |_| {},
1815 sd_rx,
1816 ));
1817
1818 // Let the window flush both updates, so the export request finds an
1819 // EMPTY pending batch.
1820 tokio::time::sleep(Duration::from_millis(50)).await;
1821
1822 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1823 ex_tx
1824 .send(ExportRequest {
1825 dest_dir: artifact.clone(),
1826 reply: reply_tx,
1827 })
1828 .await
1829 .unwrap();
1830 let manifest = reply_rx
1831 .await
1832 .unwrap()
1833 .expect("export succeeds with nothing pending");
1834 assert_eq!(
1835 manifest.cursor.as_u64(),
1836 Some(2),
1837 "artifact cursor is the applied cursor, unchanged by the no-op flush"
1838 );
1839
1840 // The artifact is importable and holds the already-flushed entries.
1841 let (cursor, imported) =
1842 AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
1843 .unwrap();
1844 assert_eq!(cursor.as_u64(), Some(2));
1845 assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
1846 assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
1847
1848 sd_tx.send(true).unwrap();
1849 task.await.unwrap().unwrap();
1850 }
1851
1852 /// An export request against a store-less watch replies with an error and
1853 /// the watch keeps running.
1854 #[tokio::test(start_paused = true)]
1855 async fn export_without_store_replies_error() {
1856 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1857 let (sd_tx, sd_rx) = watch::channel(false);
1858 let (ex_tx, ex_rx) = mpsc::channel(1);
1859
1860 let task = tokio::spawn(watch_applied(
1861 watcher,
1862 WatchScope::All,
1863 None,
1864 None, // reader (no resync in this test)
1865 None::<AppendLogSnapshot>,
1866 Some(ex_rx),
1867 BatchConfig::default(),
1868 parse_put,
1869 move |_batch: Vec<Vec<u8>>| {},
1870 move |_| {},
1871 sd_rx,
1872 ));
1873
1874 tokio::time::sleep(Duration::from_millis(1)).await;
1875 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1876 ex_tx
1877 .send(ExportRequest {
1878 dest_dir: std::env::temp_dir().join("never-created"),
1879 reply: reply_tx,
1880 })
1881 .await
1882 .unwrap();
1883 assert!(
1884 reply_rx.await.unwrap().is_err(),
1885 "no store → export errors via the reply"
1886 );
1887
1888 // The watch is still alive and returns its applied cursor on shutdown.
1889 sd_tx.send(true).unwrap();
1890 let cursor = task.await.unwrap().unwrap();
1891 assert_eq!(cursor.as_u64(), Some(1));
1892 }
1893
1894 /// An export failure (unavailable destination) is reported on the reply and
1895 /// the watch keeps applying later updates.
1896 #[tokio::test(start_paused = true)]
1897 async fn export_error_does_not_kill_watch() {
1898 let dir = tempfile::TempDir::new().unwrap();
1899 let store_path = dir.path().join("fold.snap");
1900 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1901
1902 // Occupied destination → export fails.
1903 let occupied = dir.path().join("occupied");
1904 std::fs::create_dir(&occupied).unwrap();
1905 std::fs::write(occupied.join("stray"), b"x").unwrap();
1906
1907 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1908 let (sd_tx, sd_rx) = watch::channel(false);
1909 let (ex_tx, ex_rx) = mpsc::channel(1);
1910
1911 let applied = Arc::new(AtomicU64::new(0));
1912 let a = Arc::clone(&applied);
1913
1914 let task = tokio::spawn(watch_applied(
1915 watcher,
1916 WatchScope::All,
1917 None,
1918 None, // reader (no resync in this test)
1919 Some(store),
1920 Some(ex_rx),
1921 BatchConfig::default(),
1922 parse_put,
1923 move |_batch: Vec<Vec<u8>>| {},
1924 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1925 sd_rx,
1926 ));
1927
1928 tokio::time::sleep(Duration::from_millis(1)).await;
1929 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1930 ex_tx
1931 .send(ExportRequest {
1932 dest_dir: occupied,
1933 reply: reply_tx,
1934 })
1935 .await
1936 .unwrap();
1937 match reply_rx.await.unwrap() {
1938 Err(crate::snapshot::SnapshotError::ArtifactInvalid(_)) => {}
1939 other => panic!("expected ArtifactInvalid, got {other:?}"),
1940 }
1941
1942 // Watch still folds: a clean shutdown returns the applied cursor.
1943 sd_tx.send(true).unwrap();
1944 let cursor = task.await.unwrap().unwrap();
1945 assert_eq!(cursor.as_u64(), Some(1), "watch survived the failed export");
1946 assert_eq!(applied.load(Ordering::SeqCst), 1);
1947 }
1948
1949 /// Dropping the export sender disarms the arm; the loop keeps batching and
1950 /// flushing normally.
1951 #[tokio::test(start_paused = true)]
1952 async fn export_sender_dropped_disarms_channel() {
1953 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1954 let (sd_tx, sd_rx) = watch::channel(false);
1955 let (ex_tx, ex_rx) = mpsc::channel::<ExportRequest>(1);
1956
1957 let applied = Arc::new(AtomicU64::new(0));
1958 let a = Arc::clone(&applied);
1959
1960 let task = tokio::spawn(watch_applied(
1961 watcher,
1962 WatchScope::All,
1963 None,
1964 None, // reader (no resync in this test)
1965 None::<AppendLogSnapshot>,
1966 Some(ex_rx),
1967 BatchConfig::default(),
1968 parse_put,
1969 move |_batch: Vec<Vec<u8>>| {},
1970 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1971 sd_rx,
1972 ));
1973
1974 drop(ex_tx); // disarm
1975 tokio::time::sleep(Duration::from_millis(50)).await;
1976 assert_eq!(
1977 applied.load(Ordering::SeqCst),
1978 1,
1979 "loop keeps flushing after the export sender is gone"
1980 );
1981
1982 sd_tx.send(true).unwrap();
1983 task.await.unwrap().unwrap();
1984 }
1985
1986 /// With a low `compact_threshold`, the flush path's `spawn_blocking`
1987 /// compaction actually fires (every other snapshot test pins the threshold
1988 /// at `u64::MAX`, leaving that branch dead). After a compacting run the
1989 /// snapshot must still load cleanly with the right cursor and entries.
1990 #[tokio::test]
1991 async fn snapshot_compaction_fires_and_stays_consistent() {
1992 let dir = tempfile::TempDir::new().unwrap();
1993 let path = dir.path().join("applied.snap");
1994 // threshold 0 → every checkpoint reports "needs compact", forcing the
1995 // store's inline-compaction branch on each flush (run off the hot path via
1996 // spawn_blocking inside watch_applied).
1997 let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();
1998
1999 // Re-put the same key across flushes so compaction has duplicates to
2000 // dedup; small max forces multiple flushes (hence multiple compactions).
2001 let updates = vec![
2002 put("node.a", b"1", 1),
2003 put("node.a", b"2", 2),
2004 put("node.b", b"3", 3),
2005 put("node.a", b"4", 4),
2006 ];
2007 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
2008 let (_sd_tx, sd_rx) = watch::channel(false);
2009
2010 let cursor = watch_applied(
2011 watcher,
2012 WatchScope::All,
2013 None,
2014 None, // reader (no resync in this test)
2015 Some(store),
2016 None,
2017 BatchConfig {
2018 window: Duration::from_secs(3600),
2019 max: 1, // one update per flush → a compaction per update
2020 },
2021 parse_put,
2022 move |_batch: Vec<Vec<u8>>| {},
2023 move |_| {},
2024 sd_rx,
2025 )
2026 .await
2027 .unwrap();
2028
2029 assert_eq!(cursor.as_u64(), Some(4));
2030
2031 let snap = crate::snapshot::load(&path).unwrap().unwrap();
2032 assert_eq!(
2033 snap.cursor.as_u64(),
2034 cursor.as_u64(),
2035 "compacted snapshot's cursor still equals the applied cursor"
2036 );
2037 assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
2038 assert_eq!(
2039 snap.entries["node.a"].value, b"4",
2040 "last write per key survives compaction"
2041 );
2042 assert_eq!(snap.entries["node.b"].value, b"3");
2043 }
2044}