slipstream/applied.rs
1//! Cursor-after-apply watch combinator.
2//!
3//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
4//! a short window (or a max count), hands each batch to a caller-supplied
5//! `apply` closure, and **only then** advances the resume cursor, checkpoints
6//! the snapshot, and fires `on_applied`. It encodes one discipline that every
7//! hand-rolled watch loop in the wider system gets subtly wrong:
8//!
9//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
10//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
11//! > it. The cursor never advances on *receipt* of an update, only after it has
12//! > durably taken effect.
13//!
14//! ## Why receipt is the wrong signal
15//!
16//! The tempting shortcut is to bump the cursor as each update arrives off the
17//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
18//! crash between those two steps the persisted cursor claims "caught up to rev
19//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
20//! starts *past* rev N and silently skips it — a correctness hole in the exact
21//! "resume after any restart" guarantee this crate advertises.
22//!
23//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
24//! the fix: a function placed below the endpoints (here, the channel receive)
25//! can only be a performance hint; the *endpoint* — the application of the
26//! update — is the only place the "it happened" guarantee can actually be
27//! established. So the cursor is written from `apply()`'s completion, not from
28//! the transport's delivery.
29//!
30//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
31//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
32//! it has *reconciled* and re-arms the watch from there, never from the index it
33//! merely *saw*.
34//!
35//! ## What the caller supplies
36//!
37//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
38//! `None` (corrupt bytes, irrelevant key) is fine — the update is still
39//! *received*, so it still counts toward the cursor; there is simply nothing to
40//! apply for it.
41//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
42//! logic; for the tunnel router it swaps the route table, for the edge origin
43//! watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush that advances the cursor, *after*
//!   `apply` returns (and after the snapshot store fold, when a store is
//!   configured), with the new applied cursor. Callers use it to persist the
//!   cursor for the next restart.
46//!
47//! ## Panics
48//!
//! `parse`, `apply`, and `on_applied` run inline in the [`watch_applied`]
//! future itself, not on a separate task. If one of them panics, the panic
//! propagates out of [`watch_applied`] and ends the watch — that is the
//! caller's contract, the same as a panic in any other supplied closure.
52
53use std::path::PathBuf;
54use std::sync::Arc;
55use std::time::Duration;
56
57use tokio::sync::mpsc;
58use tokio::sync::{oneshot, watch};
59use tracing::warn;
60
61use crate::artifact::ExportManifest;
62use crate::kv::{KvError, KvReader, KvUpdate, KvWatcher, WatchCursor};
63use crate::snapshot::{SnapshotError, SnapshotStore};
64
65/// A request, sent into a running [`watch_applied`] loop, to export the fold it
66/// owns (see [`SnapshotStore::export_to`]).
67///
68/// `watch_applied` takes its snapshot store **by value**, so a consumer that
69/// wants periodic artifacts of a live fold cannot call `export_to` itself. It
70/// instead passes an `mpsc::Receiver<ExportRequest>` to [`watch_applied`] and
71/// sends requests through the paired sender. The loop handles a request
72/// between batch flushes — after flushing any pending batch — so the artifact's
73/// embedded cursor is exactly the applied cursor at the moment of export.
74///
75/// The export result (or error) comes back on `reply`; an export failure is
76/// reported there and the watch keeps running (the snapshot is a cache — a
77/// failed artifact is the requester's problem, not the fold's).
78pub struct ExportRequest {
79 /// Where the artifact directory will be created. Must not exist (or be an
80 /// empty directory); same filesystem as the fold for cheap hardlinks.
81 pub dest_dir: PathBuf,
82 /// Receives the sealed manifest on success. A dropped receiver is ignored.
83 pub reply: oneshot::Sender<Result<ExportManifest, SnapshotError>>,
84}
85
/// What to watch: every key, every key under a prefix, or the union of several
/// prefixes.
///
/// Mirrors the [`KvWatcher`] surface:
/// - `All` maps to `watch_all` / `watch_all_from`.
/// - `Prefix` maps to `watch_prefix` / `watch_prefix_from`.
/// - `Prefixes` maps to `watch_prefixes` / `watch_prefixes_from` (one
///   multi-filter consumer for the whole union, never one consumer per prefix).
///
/// Scopes compare structurally (`PartialEq` / `Eq` / `Hash`). `Prefixes` is
/// order-sensitive. `All` never equals `Prefix(String::new())`, even though
/// [`WatchScope::prefixes`] reports the same empty prefix for both.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
    /// Watch keys beginning with ANY of these prefixes, on a single consumer.
    Prefixes(Vec<String>),
}

impl WatchScope {
    /// The scope as a list of key prefixes (`All` = the empty prefix), for
    /// callers that enumerate scope contents (live listings, fold ranges).
    ///
    /// `Prefixes` is returned exactly as given: same order, duplicates and
    /// overlaps kept. Callers that list per prefix must tolerate a key showing
    /// up more than once.
    fn prefixes(&self) -> Vec<String> {
        match self {
            WatchScope::All => vec![String::new()],
            WatchScope::Prefix(p) => vec![p.clone()],
            WatchScope::Prefixes(ps) => ps.clone(),
        }
    }
}
114
115/// Internal: a cursor-expired resync handoff from the watch task to the main
116/// loop. Carries the bucket's live key listing for the watch scope; the main
117/// loop diffs it against the fold and applies synthetic deletes for keys that
118/// vanished during the gap, then acks so the watch task can start the fallback
119/// watch — the ack ordering guarantees every synthetic delete is applied before
120/// the first re-list put arrives.
121struct ResyncRequest {
122 live_keys: Vec<String>,
123 ack: oneshot::Sender<()>,
124}
125
126/// Internal: what the watch task needs to initiate a resync — the reader that
127/// lists live keys and the channel into the main loop that owns the fold.
128type ResyncHandle = (Arc<dyn KvReader>, mpsc::Sender<ResyncRequest>);
129
/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first:
/// - `window` time has elapsed since the batch opened, or
/// - `max` updates have accumulated.
///
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update). `max` caps memory and latency when updates
/// arrive faster than the window.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed. The window opens
    /// on the first update received after a flush, including one that `parse`
    /// rejects, so the cursor keeps advancing through runs of irrelevant keys.
    pub window: Duration,
    /// Maximum batch size before a flush is forced. The limit applies to the
    /// parsed updates. When a snapshot store is configured, it also applies to
    /// the raw received updates (parse-rejected ones included), so a window full
    /// of rejected updates still flushes. A `max` of 0 flushes after every
    /// received update.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}
155
156/// Drive a watch with cursor-after-apply semantics.
157///
158/// Subscribes per `scope` (resuming from `resume` when it carries a position),
159/// batches updates per `config`, applies each batch via `apply`, and only then
160/// advances the cursor / folds the batch into `store` / calls `on_applied`.
161/// Returns the final applied cursor when the watch ends (shutdown signalled, or
162/// the underlying stream closed).
163///
164/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
165/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
166/// its own impl) — or `None` to run without persistence. On each flush, *after*
167/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
168/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
169/// persisted cursor is always the post-apply cursor and never names a revision
170/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
171/// crash leaves the store consistent and resume re-folds only the tail.
172///
173/// # Cursor expiry and stale-key resync
174///
175/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
176/// falls back to a full-scope watch (`watch_all` / `watch_prefix` /
177/// `watch_prefixes`), whose state-sync re-list re-delivers the current value of
178/// every in-scope key as puts. The re-list alone cannot cover keys that were
179/// **deleted during the gap** and whose delete markers the backend has since
180/// evicted — they simply don't appear, leaving the fold (and the caller's
181/// domain state) holding them forever.
182///
183/// When both `reader` and `store` are provided, the expiry path closes that
184/// hole: before the fallback watch starts, the bucket's live keys are listed
185/// via `reader`, diffed against the fold's in-scope keys, and a synthetic
186/// [`KvUpdate::Delete`] (with an unknown [`VersionToken`](crate::VersionToken)) is run through
187/// `parse`/`apply`/`store` for each key that vanished. The synthetic deletes
188/// are strictly ordered before the first re-list put, so a key deleted and
189/// re-created during the gap converges correctly. Without a `reader` (or
190/// without a `store` to diff against) the fallback is re-list-only and a
191/// warning marks the possible stale keys.
192///
193/// A resync that was armed but FAILS (live-key listing or fold diff error) is
194/// fatal to the watch — degrading to re-list-only would silently leave
195/// deleted keys in the fold (`tests/model.rs` proves that divergence
196/// reachable), so the error surfaces and the caller's restart retries the
197/// resume → expiry → resync path from scratch.
198///
199/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
200/// rationale.
201///
202/// # Type parameters
203/// - `U`: the caller's domain update type, produced by `parse` and consumed by
204/// `apply`.
205// This combinator takes each of its dependencies as a parameter so every
206// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
207// type and is monomorphized at the call site. Folding them into a builder struct
208// would either box the closures or force a single generic bundle, losing that.
209#[allow(clippy::too_many_arguments)]
210// The flush macro resets `batch_high`/`batch_deadline` for the next loop
211// iteration. At the two flush sites that return immediately afterward (shutdown,
212// channel-close) those resets are dead stores — correct, but flagged. The allow
213// must sit on the function: a statement-scoped `#[allow]` inside the macro body
214// trips the experimental attributes-on-expressions gate (E0658) on stable.
215#[allow(unused_assignments)]
216pub async fn watch_applied<U, S, P, A, O>(
217 watcher: Arc<dyn KvWatcher>,
218 scope: WatchScope,
219 resume: Option<WatchCursor>,
220 // `Some(reader)` arms the cursor-expired stale-key resync (see the function
221 // docs); `None` keeps the re-list-only fallback. Only consulted on expiry —
222 // the hot path never touches it.
223 reader: Option<Arc<dyn KvReader>>,
224 mut store: Option<S>,
225 // `Some(rx)` arms an export-request arm in the select loop: each
226 // [`ExportRequest`] is handled between flushes (pending batch flushed
227 // first), so the exported artifact's cursor is the applied cursor (or,
228 // across a transiently failed store flush, the store's own lagging but
229 // self-consistent cursor — never a cursor past unfolded data).
230 // `None` (or dropping the paired sender) leaves the loop's behavior
231 // unchanged.
232 mut exports: Option<mpsc::Receiver<ExportRequest>>,
233 config: BatchConfig,
234 mut parse: P,
235 mut apply: A,
236 mut on_applied: O,
237 mut shutdown: watch::Receiver<bool>,
238) -> Result<WatchCursor, KvError>
239where
240 U: Send,
241 // `Send + 'static`: each flush moves `store` onto a blocking task to run its
242 // (potentially blocking) `apply`, then takes it back — the same offload the
243 // append log's compaction always used.
244 S: SnapshotStore + Send + 'static,
245 P: FnMut(&KvUpdate) -> Option<U> + Send,
246 A: FnMut(Vec<U>) + Send,
247 O: FnMut(WatchCursor) + Send,
248{
249 // The cursor we'll return. Initialized from the resume position so that a
250 // watch which receives nothing new still reports the position it resumed
251 // from as "applied" (it is — everything up to it was applied before the last
252 // run persisted it).
253 let mut applied = match &resume {
254 Some(c) => c.clone(),
255 None => WatchCursor::none(),
256 };
257
258 // The scope's prefixes, for the resync diff against the fold. Cloned out
259 // before `scope` moves into the watch task.
260 let scope_prefixes = scope.prefixes();
261
262 // Cursor-expired resync channel, armed only when there is a reader to list
263 // live keys AND a store to diff them against. The watch task sends the live
264 // listing here and waits for the ack before starting the fallback watch, so
265 // synthetic deletes always precede the re-list.
266 let (resync_pair, mut resyncs): (Option<ResyncHandle>, Option<mpsc::Receiver<ResyncRequest>>) =
267 match reader {
268 Some(reader) if store.is_some() => {
269 let (rs_tx, rs_rx) = mpsc::channel(1);
270 (Some((reader, rs_tx)), Some(rs_rx))
271 }
272 _ => (None, None),
273 };
274
275 // Spawn the watch task. It owns the cursor-expired fallback so the main loop
276 // only ever sees a clean ordered stream of updates on `rx`.
277 let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
278 let handle = {
279 let watcher = Arc::clone(&watcher);
280 tokio::spawn(
281 async move { run_watch(watcher.as_ref(), &scope, resume, resync_pair, tx).await },
282 )
283 };
284
285 // Batch state.
286 //
287 // `batch_high` tracks the version of the most recently *received* update
288 // since the last flush — including updates `parse` rejected. NATS delivers
289 // in revision order, so the last received is the highest, and advancing the
290 // cursor to it after a single atomic `apply` is correct: having seen the max
291 // means we've seen everything below it, and a rejected entry is still
292 // "nothing to apply", hence covered. Reset to `none()` after every flush.
293 // Pre-size to the flush bound so no batch ever re-climbs the reallocation
294 // ladder; `max(1)` only guards a nonsensical `max = 0` config.
295 let batch_cap = config.max.max(1);
296 let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
297 // Raw received updates for the durable `store`, in revision order. Only
298 // populated when a `store` is present; the store folds the *raw* updates
299 // (including ones `parse` rejected — they are still part of the bucket's
300 // state), whereas the parsed `batch` above is the consumer's domain view.
301 let mut raw_batch: Vec<KvUpdate> = Vec::new();
302 let mut batch_high = WatchCursor::none();
303 // Consecutive store-apply failures. A transient failure re-queues its raw
304 // batch (cursor authority: the store's cursor and contents advance
305 // together, always); a persistent streak fail-stops before the re-queued
306 // backlog grows without bound.
307 const MAX_STORE_APPLY_FAILURES: u32 = 16;
308 let mut store_fail_streak: u32 = 0;
309 // `Some` once a batch has opened and the window timer is armed; `None`
310 // between flushes. Only the armed/idle distinction is read in the loop —
311 // the absolute instant lives in the pinned `sleep` future below.
312 let mut batch_deadline: Option<tokio::time::Instant> = None;
313
314 // Flush the current batch, in order: run the domain `apply` (if non-empty) to
315 // completion, advance the cursor, fold the raw batch + cursor durably into
316 // `store`, then fire `on_applied`. The store fold runs on a blocking task
317 // (its `apply` may block on I/O), moving the store in and taking it back — the
318 // same offload the append log's compaction always used. A TRANSIENT store
319 // error re-queues the raw batch for cumulative commit on the next flush
320 // (the watch continues; the store's cursor never advances past data it
321 // dropped) and a persistent failure streak is fatal; a panicked
322 // blocking task drops the store irrecoverably, which breaks the
323 // resume-after-restart guarantee, so it is surfaced as fatal.
324 macro_rules! flush {
325 () => {{
326 // Nothing received since the last flush → nothing to do at all.
327 // (`raw_batch` can be non-empty with no cursor advance only via the
328 // resync path's synthetic deletes, which carry no revision.)
329 if !batch.is_empty() || !raw_batch.is_empty() || !batch_high.is_none() {
330 if !batch.is_empty() {
331 // INVARIANT: apply() runs and RETURNS before any cursor
332 // advance below. Move the batch out so a panicking apply
333 // can't leave half-consumed state behind.
334 //
335 // `replace` (not `take`) leaves a pre-sized Vec behind so each
336 // batch after the first doesn't re-climb the reallocation
337 // ladder (4→8→…→cap).
338 apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
339 }
340 let advanced = !batch_high.is_none();
341 if advanced {
342 applied = batch_high.clone();
343 }
344 if !raw_batch.is_empty()
345 && let Some(mut st) = store.take()
346 {
347 let raw = std::mem::take(&mut raw_batch);
348 // Fold at the post-advance cursor. A synthetic-deletes-only
349 // batch leaves the cursor where it was (the deletes are a
350 // state correction, not log entries), which is safe: an
351 // unchanged — possibly expired — cursor only ever re-runs
352 // this same resync on the next restart.
353 let cur = applied.clone();
354 // Hand the store AND the raw batch back on a clean return:
355 // a *failed* apply (Ok(Err)) RE-QUEUES the batch so the
356 // next flush commits it cumulatively — the store's cursor
357 // and contents always advance together. Dropping the
358 // failed batch instead lets the NEXT successful flush
359 // advance the cursor over a hole that survives every
360 // restart (reproduced by
361 // `transient_store_failure_never_leaves_a_cursor_gap`).
362 // Only a *panicked* task (Err) loses the store: fatal.
363 match tokio::task::spawn_blocking(move || {
364 let res = st.apply(&raw, &cur);
365 (st, raw, res)
366 })
367 .await
368 {
369 Ok((st, _raw, Ok(()))) => {
370 store = Some(st);
371 store_fail_streak = 0;
372 }
373 Ok((st, raw, Err(e))) => {
374 store_fail_streak += 1;
375 if store_fail_streak >= MAX_STORE_APPLY_FAILURES {
376 // A persistently failing store would otherwise
377 // grow the re-queued batch without bound while
378 // the fold silently stales. Fail-stop: the
379 // restart refolds the tail from the store's
380 // last good cursor.
381 warn!(error = %e, streak = store_fail_streak,
382 "snapshot store apply failing persistently; aborting watch");
383 handle.abort();
384 return Err(KvError::WatchError(format!(
385 "snapshot store apply failed {store_fail_streak} consecutive times: {e}"
386 )));
387 }
388 warn!(error = %e, streak = store_fail_streak,
389 "snapshot store apply failed; batch re-queued for the next flush");
390 store = Some(st);
391 // Prepend: the failed range precedes anything
392 // received since (stream order is preserved for
393 // the eventual cumulative commit).
394 let newer = std::mem::replace(&mut raw_batch, raw);
395 raw_batch.extend(newer);
396 }
397 Err(e) => {
398 warn!(error = %e, "snapshot store task panicked; aborting watch");
399 handle.abort();
400 return Err(KvError::WatchError(format!(
401 "snapshot store task panicked: {e}"
402 )));
403 }
404 }
405 }
406 if advanced {
407 on_applied(applied.clone());
408 batch_high = WatchCursor::none();
409 }
410 }
411 batch_deadline = None;
412 }};
413 }
414
415 // A single timer future, reset in place each time a batch opens. The old
416 // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
417 // re-created on every loop iteration — one Arc-backed timer-wheel entry
418 // allocated, registered, and immediately dropped per received update.
419 // Pinning one future and `reset`-ing it reuses that single allocation; the
420 // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
421 // its initial already-elapsed deadline is never observed.
422 let sleep = tokio::time::sleep(Duration::ZERO);
423 tokio::pin!(sleep);
424
425 loop {
426 tokio::select! {
427 biased;
428
429 // Shutdown wins: flush whatever is batched (so the cursor reflects
430 // it), abandon any updates still in flight on the channel — they
431 // weren't applied, the cursor doesn't claim them, and they'll be
432 // re-delivered on the next resume — and return the applied cursor.
433 res = shutdown.changed() => {
434 if res.is_err() || *shutdown.borrow() {
435 flush!();
436 handle.abort();
437 // Observe the task's terminal state. An abort surfaces as a
438 // cancelled JoinError, which we ignore; a genuine panic that
439 // raced ahead of the abort is logged rather than silently lost.
440 if let Err(join) = handle.await
441 && !join.is_cancelled()
442 {
443 warn!(error = %join, "watch task panicked at shutdown");
444 }
445 return Ok(applied);
446 }
447 }
448
449 // Batch window elapsed.
450 () = &mut sleep, if batch_deadline.is_some() => {
451 flush!();
452 }
453
454 // Cursor-expired resync. Placed before `rx.recv()` (biased) so the
455 // synthetic deletes are folded before any update the fallback watch
456 // delivers — though the ack protocol already guarantees the fallback
457 // hasn't started while this arm runs. Diff the fold's in-scope keys
458 // against the bucket's live listing; anything the fold holds that
459 // the bucket no longer does vanished during the gap (its delete
460 // marker evicted with the cursor), so synthesize the delete the
461 // re-list can't deliver.
462 req = async { resyncs.as_mut().expect("arm guarded by is_some").recv().await },
463 if resyncs.is_some() => {
464 match req {
465 Some(ResyncRequest { live_keys, ack }) => {
466 // Flush first so the diff runs against a fold that
467 // reflects everything received so far.
468 flush!();
469 let live: std::collections::HashSet<&str> =
470 live_keys.iter().map(String::as_str).collect();
471 let mut stale: Vec<String> = Vec::new();
472 if let Some(st) = &store {
473 for prefix in &scope_prefixes {
474 match st.range(prefix) {
475 Ok(entries) => stale.extend(
476 entries
477 .into_iter()
478 .filter(|e| !live.contains(e.key.as_str()))
479 .map(|e| e.key),
480 ),
481 Err(e) => {
482 // FATAL, not a degrade: an incomplete
483 // diff silently leaves deleted keys in
484 // the fold forever (tests/model.rs
485 // proves the divergence reachable
486 // under degrade semantics). Fail the
487 // watch; the restart re-runs the
488 // resume → expiry → resync from
489 // scratch.
490 warn!(error = %e, prefix = %prefix,
491 "resync fold range failed; aborting watch rather than diverging");
492 handle.abort();
493 return Err(KvError::WatchError(format!(
494 "cursor-expired resync failed listing fold prefix {prefix:?}: {e}"
495 )));
496 }
497 }
498 }
499 }
500 // Overlapping prefixes can list a key twice.
501 stale.sort_unstable();
502 stale.dedup();
503 if !stale.is_empty() {
504 warn!(stale = stale.len(), "cursor-expired resync: deleting keys that vanished during the gap");
505 }
506 for key in stale {
507 // Synthetic: carries no revision (unknown version)
508 // and so never advances the cursor.
509 let u = KvUpdate::Delete {
510 key,
511 version: crate::kv::VersionToken::unknown(),
512 };
513 if store.is_some() {
514 raw_batch.push(u.clone());
515 }
516 if let Some(parsed) = parse(&u) {
517 batch.push(parsed);
518 }
519 }
520 flush!();
521 // Ack AFTER the deletes are applied: the watch task is
522 // holding the fallback watch until it hears back, which
523 // is what orders deletes before the re-list
524 // (tests/model_resync_order.rs proves the barrier
525 // load-bearing). If the flush's STORE apply failed
526 // transiently, the deletes sit re-queued at the FRONT
527 // of the raw batch — still strictly before any
528 // re-list put in the eventual cumulative commit, and
529 // the domain apply saw them before this ack either
530 // way.
531 let _ = ack.send(());
532 }
533 None => resyncs = None,
534 }
535 }
536
537 // Export request. Placed after shutdown/window (they stay prompt)
538 // and before `rx.recv()` so a firehose of updates cannot starve an
539 // export indefinitely. The pending batch is flushed first, so the
540 // exported cursor is exactly the applied cursor — except when
541 // that flush's store apply transiently failed (batch re-queued):
542 // the export then captures the store's OWN lagging cursor, which
543 // is still self-consistent with its contents (cursor authority,
544 // tests/model_applied.rs); the artifact never includes unfolded
545 // data, and a bootstrap from it simply replays the short gap.
546 // The export itself runs on a blocking task with the store moved
547 // in and taken back — the same offload the flush path uses.
548 req = async { exports.as_mut().expect("arm guarded by is_some").recv().await },
549 if exports.is_some() => {
550 match req {
551 Some(ExportRequest { dest_dir, reply }) => {
552 flush!();
553 match store.take() {
554 Some(mut st) => {
555 match tokio::task::spawn_blocking(move || {
556 let res = st.export_to(&dest_dir);
557 (st, res)
558 })
559 .await
560 {
561 // Hand the store back on any clean return; an
562 // export failure goes to the requester only —
563 // the watch keeps running (the snapshot is a
564 // cache). A panicked task lost the store,
565 // which breaks the resume guarantee: fatal,
566 // same as the flush path's apply panic.
567 Ok((st, res)) => {
568 store = Some(st);
569 let _ = reply.send(res);
570 }
571 Err(e) => {
572 warn!(error = %e, "snapshot export task panicked; aborting watch");
573 handle.abort();
574 return Err(KvError::WatchError(format!(
575 "snapshot export task panicked: {e}"
576 )));
577 }
578 }
579 }
580 None => {
581 let _ = reply.send(Err(SnapshotError::Backend(
582 "watch_applied runs without a snapshot store; nothing to export"
583 .into(),
584 )));
585 }
586 }
587 }
588 // Sender dropped: disarm the arm for the rest of the run.
589 None => exports = None,
590 }
591 }
592
593 update = rx.recv() => {
594 match update {
595 Some(u) => {
596 // Cursor authority: every received update bumps the
597 // pending high-water, regardless of whether `parse`
598 // keeps it.
599 batch_high = WatchCursor::from_version(u.version().clone());
600
601 // Buffer the raw update for the durable store fold (which
602 // commits the whole batch + cursor atomically on flush).
603 // Done before `parse` consumes `u` by reference, and only
604 // when a store is present so the no-persistence path keeps
605 // its zero-copy cost.
606 if store.is_some() {
607 raw_batch.push(u.clone());
608 }
609
610 if let Some(parsed) = parse(&u) {
611 batch.push(parsed);
612 }
613
614 // Arm the window on the first received update of a batch
615 // — even a parse-rejected one, so the cursor advances
616 // within `window` even through a run of irrelevant keys.
617 // Reset the pinned timer to the new deadline rather than
618 // allocating a fresh `Sleep`.
619 if batch_deadline.is_none() {
620 let deadline = tokio::time::Instant::now() + config.window;
621 sleep.as_mut().reset(deadline);
622 batch_deadline = Some(deadline);
623 }
624
625 // Flush on a full parsed batch, or — when persisting — a
626 // full raw batch, so a window packed with parse-rejected
627 // updates can't grow `raw_batch` without bound before the
628 // window elapses.
629 if batch.len() >= config.max || raw_batch.len() >= config.max {
630 flush!();
631 }
632 }
633 None => {
634 // Stream closed. Flush the remainder, then surface the
635 // watch task's terminal result: a clean end returns the
636 // applied cursor, an error propagates.
637 flush!();
638 return match handle.await {
639 Ok(Ok(())) => Ok(applied),
640 Ok(Err(e)) => Err(e),
641 Err(join) => Err(KvError::WatchError(format!(
642 "watch task panicked: {join}"
643 ))),
644 };
645 }
646 }
647 }
648 }
649 }
650}
651
652/// Run the underlying watch for `scope`, resuming from `resume` when it carries
653/// a position, with the [`KvError::CursorExpired`] → resync + full-watch
654/// fallback.
655async fn run_watch(
656 watcher: &dyn KvWatcher,
657 scope: &WatchScope,
658 resume: Option<WatchCursor>,
659 resync: Option<ResyncHandle>,
660 tx: mpsc::Sender<KvUpdate>,
661) -> Result<(), KvError> {
662 // Resume only when the cursor carries a real position; an absent or `none()`
663 // cursor falls through to a full watch. Binding `cursor` here makes "we have a
664 // resume position" structural — there is no separate bool whose truth a later
665 // edit could let drift from the `Some`.
666 let resume_cursor = resume.filter(|c| !c.is_none());
667
668 match scope {
669 WatchScope::All => {
670 if let Some(cursor) = resume_cursor {
671 match watcher.watch_all_from(&cursor, tx.clone()).await {
672 Err(KvError::CursorExpired) => {
673 warn!(
674 "watch cursor expired; resyncing, then falling back to full watch_all"
675 );
676 resync_stale_keys(scope, &resync).await?;
677 watcher.watch_all(tx).await
678 }
679 other => other,
680 }
681 } else {
682 watcher.watch_all(tx).await
683 }
684 }
685 WatchScope::Prefix(prefix) => {
686 if let Some(cursor) = resume_cursor {
687 match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
688 Err(KvError::CursorExpired) => {
689 warn!(
690 "watch cursor expired; resyncing, then falling back to full watch_prefix"
691 );
692 resync_stale_keys(scope, &resync).await?;
693 watcher.watch_prefix(prefix, tx).await
694 }
695 other => other,
696 }
697 } else {
698 watcher.watch_prefix(prefix, tx).await
699 }
700 }
701 WatchScope::Prefixes(prefixes) => {
702 let refs: Vec<&str> = prefixes.iter().map(String::as_str).collect();
703 if let Some(cursor) = resume_cursor {
704 match watcher
705 .watch_prefixes_from(&refs, &cursor, tx.clone())
706 .await
707 {
708 Err(KvError::CursorExpired) => {
709 warn!(
710 "watch cursor expired; resyncing, then falling back to full watch_prefixes"
711 );
712 resync_stale_keys(scope, &resync).await?;
713 watcher.watch_prefixes(&refs, tx).await
714 }
715 other => other,
716 }
717 } else {
718 watcher.watch_prefixes(&refs, tx).await
719 }
720 }
721 }
722}
723
724/// Cursor-expired stale-key resync, run BEFORE the fallback watch is
725/// established: list the scope's live keys, hand them to the main loop (which
726/// diffs them against the fold and applies synthetic deletes), and wait for the
727/// ack. That ordering — deletes applied, then fallback watch armed — is what
728/// makes a delete-then-recreate during the gap converge: the synthetic delete
729/// always lands before the re-list put.
730///
731/// With no reader/store wired (`resync` is `None`) the caller explicitly opted
732/// out: warn and fall back re-list-only (keys deleted during the gap stay in
733/// the fold — `tests/model.rs` pins this divergence as reachable).
734///
735/// A FAILED listing, by contrast, is **fatal** — it fails the watch rather
736/// than degrading. The resync is load-bearing for the "stale, never corrupt"
737/// convergence guarantee: a silently degraded resync leaves the fold holding
738/// keys the bucket deleted, with one warn line as the only witness
739/// (`tests/model.rs` proves this divergence reachable under degrade
740/// semantics). Failing the watch turns the violated guarantee into a visible
741/// error; the caller's restart re-resumes, hits `CursorExpired` again, and
742/// retries the resync from scratch.
743async fn resync_stale_keys(
744 scope: &WatchScope,
745 resync: &Option<ResyncHandle>,
746) -> Result<(), KvError> {
747 let Some((reader, resync_tx)) = resync else {
748 warn!(
749 "no reader wired for cursor-expired resync; keys deleted during the gap may persist in the fold"
750 );
751 return Ok(());
752 };
753 let mut live_keys = Vec::new();
754 for prefix in scope.prefixes() {
755 match reader.keys(&prefix).await {
756 Ok(keys) => live_keys.extend(keys),
757 Err(e) => {
758 return Err(KvError::WatchError(format!(
759 "cursor-expired resync failed listing live keys under {prefix:?}: {e}; \
760 failing the watch rather than silently keeping stale keys"
761 )));
762 }
763 }
764 }
765 let (ack_tx, ack_rx) = oneshot::channel();
766 if resync_tx
767 .send(ResyncRequest {
768 live_keys,
769 ack: ack_tx,
770 })
771 .await
772 .is_ok()
773 {
774 // A dropped ack (main loop shutting down) just means the fallback watch
775 // is about to die with it; nothing to recover.
776 let _ = ack_rx.await;
777 }
778 Ok(())
779}
780
781#[cfg(test)]
782mod tests {
783 use super::*;
784 use crate::kv::{KvEntry, VersionToken};
785 use crate::snapshot::AppendLogSnapshot;
786 use async_trait::async_trait;
787 use std::sync::Mutex;
788 use std::sync::atomic::{AtomicU64, Ordering};
789 use tokio::sync::mpsc::Sender;
790
791 fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
792 KvUpdate::Put(KvEntry {
793 key: key.to_string(),
794 value: value.to_vec(),
795 version: VersionToken::from_u64(rev),
796 })
797 }
798
799 /// A scripted watcher. Delivers a pre-set list of updates through the
800 /// channel, then either holds the channel open (so window/max/shutdown
801 /// flushes can be exercised without the stream ending) or returns cleanly
802 /// (so channel-close flushing can be exercised).
803 struct MockWatcher {
804 full: Mutex<Option<Vec<KvUpdate>>>,
805 from: Mutex<Option<Vec<KvUpdate>>>,
806 from_expires: bool,
807 hold: bool,
808 }
809
810 impl MockWatcher {
811 fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
812 Self {
813 full: Mutex::new(Some(updates)),
814 from: Mutex::new(None),
815 from_expires: false,
816 hold,
817 }
818 }
819
820 async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
821 let updates = which.lock().unwrap().take().unwrap_or_default();
822 for u in updates {
823 if tx.send(u).await.is_err() {
824 return;
825 }
826 }
827 if self.hold {
828 // Keep `tx` alive (channel open) until this task is aborted.
829 std::future::pending::<()>().await;
830 }
831 }
832 }
833
834 #[async_trait]
835 impl KvWatcher for MockWatcher {
836 async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
837 self.deliver(&self.full, tx).await;
838 Ok(())
839 }
840
841 async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
842 self.deliver(&self.full, tx).await;
843 Ok(())
844 }
845
846 async fn watch_prefixes(
847 &self,
848 _prefixes: &[&str],
849 tx: Sender<KvUpdate>,
850 ) -> Result<(), KvError> {
851 // This mock scripts the applied-watch resumption tests, not prefix
852 // filtering; it delivers the same `full` script as `watch_prefix`.
853 // The real multi-filter scoping is proved in the NATS integration test.
854 self.deliver(&self.full, tx).await;
855 Ok(())
856 }
857
858 async fn watch_all_from(
859 &self,
860 _cursor: &WatchCursor,
861 tx: Sender<KvUpdate>,
862 ) -> Result<(), KvError> {
863 if self.from_expires {
864 return Err(KvError::CursorExpired);
865 }
866 self.deliver(&self.from, tx).await;
867 Ok(())
868 }
869
870 // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
871 // are exercised against the same `from` script. Without this the trait's
872 // default impl would delegate to watch_prefix and silently deliver the
873 // full set instead of the delta.
874 async fn watch_prefix_from(
875 &self,
876 _prefix: &str,
877 _cursor: &WatchCursor,
878 tx: Sender<KvUpdate>,
879 ) -> Result<(), KvError> {
880 if self.from_expires {
881 return Err(KvError::CursorExpired);
882 }
883 self.deliver(&self.from, tx).await;
884 Ok(())
885 }
886
887 // Same mirroring for the multi-prefix resume arm.
888 async fn watch_prefixes_from(
889 &self,
890 _prefixes: &[&str],
891 _cursor: &WatchCursor,
892 tx: Sender<KvUpdate>,
893 ) -> Result<(), KvError> {
894 if self.from_expires {
895 return Err(KvError::CursorExpired);
896 }
897 self.deliver(&self.from, tx).await;
898 Ok(())
899 }
900 }
901
902 /// A reader whose `keys()` serves a scripted live listing — the only call
903 /// the cursor-expired resync makes. Filters by prefix like a real backend
904 /// so prefix-scoped resyncs are exercised faithfully.
905 struct MockReader {
906 live: Vec<String>,
907 }
908
909 #[async_trait]
910 impl KvReader for MockReader {
911 async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
912 unreachable!("resync only lists keys")
913 }
914
915 async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
916 Ok(self
917 .live
918 .iter()
919 .filter(|k| k.starts_with(prefix))
920 .cloned()
921 .collect())
922 }
923
924 async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
925 unreachable!("resync only lists keys")
926 }
927 }
928
929 /// A watcher whose entry points all fail. Used to prove the watch task's
930 /// terminal error is surfaced out of `watch_applied` rather than swallowed
931 /// as a clean `Ok(applied)` when the channel closes.
932 struct ErrorWatcher;
933
934 #[async_trait]
935 impl KvWatcher for ErrorWatcher {
936 async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
937 Err(KvError::WatchError("injected watch failure".into()))
938 }
939
940 async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
941 Err(KvError::WatchError("injected watch failure".into()))
942 }
943
944 async fn watch_prefixes(
945 &self,
946 _prefixes: &[&str],
947 _tx: Sender<KvUpdate>,
948 ) -> Result<(), KvError> {
949 Err(KvError::WatchError("injected watch failure".into()))
950 }
951 }
952
953 // A no-op parse that keeps every Put as the value bytes; drops deletes.
954 fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
955 match u {
956 KvUpdate::Put(e) => Some(e.value.clone()),
957 _ => None,
958 }
959 }
960
961 /// The stream closes (hold = false) with a pending batch; the remainder is
962 /// flushed before returning, the returned cursor is the last revision, and
963 /// `on_applied` ran exactly once after `apply`.
964 #[tokio::test]
965 async fn flush_on_channel_close() {
966 let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
967 let watcher = Arc::new(MockWatcher::new(updates, false));
968
969 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
970 let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
971
972 let ab = Arc::clone(&applied_batches);
973 let oc = Arc::clone(&on_applied_cursors);
974 let (_sd_tx, sd_rx) = watch::channel(false);
975
976 let cursor = watch_applied(
977 watcher,
978 WatchScope::All,
979 None,
980 None, // reader (no resync in this test)
981 None::<AppendLogSnapshot>,
982 None,
983 BatchConfig::default(),
984 parse_put,
985 move |batch| ab.lock().unwrap().push(batch),
986 move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
987 sd_rx,
988 )
989 .await
990 .unwrap();
991
992 assert_eq!(cursor.as_u64(), Some(3));
993 let batches = applied_batches.lock().unwrap();
994 let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
995 assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
996 assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
997 }
998
999 /// Fewer than `max` updates, then the channel idles: the window timer must
1000 /// flush them and advance the cursor.
1001 #[tokio::test(start_paused = true)]
1002 async fn flush_on_window() {
1003 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1004 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1005
1006 let applied = Arc::new(AtomicU64::new(0));
1007 let count = Arc::new(AtomicU64::new(0));
1008 let a = Arc::clone(&applied);
1009 let c = Arc::clone(&count);
1010 let (sd_tx, sd_rx) = watch::channel(false);
1011
1012 let task = tokio::spawn(watch_applied(
1013 watcher,
1014 WatchScope::All,
1015 None,
1016 None, // reader (no resync in this test)
1017 None::<AppendLogSnapshot>,
1018 None,
1019 BatchConfig::default(),
1020 parse_put,
1021 move |batch: Vec<Vec<u8>>| {
1022 c.fetch_add(batch.len() as u64, Ordering::SeqCst);
1023 },
1024 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1025 sd_rx,
1026 ));
1027
1028 // Let the window (10ms) elapse under virtual time.
1029 tokio::time::sleep(Duration::from_millis(50)).await;
1030 assert_eq!(
1031 count.load(Ordering::SeqCst),
1032 2,
1033 "window should have flushed"
1034 );
1035 assert_eq!(applied.load(Ordering::SeqCst), 2);
1036
1037 sd_tx.send(true).unwrap();
1038 let cursor = task.await.unwrap().unwrap();
1039 assert_eq!(cursor.as_u64(), Some(2));
1040 }
1041
1042 /// Exactly `max` updates fills a batch and flushes immediately — before the
1043 /// window would have elapsed.
1044 #[tokio::test(start_paused = true)]
1045 async fn flush_on_max() {
1046 let max = 4;
1047 let updates: Vec<_> = (1..=max as u64)
1048 .map(|i| put(&format!("k{i}"), b"v", i))
1049 .collect();
1050 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1051
1052 let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
1053 let f = Arc::clone(&flushes);
1054 let (sd_tx, sd_rx) = watch::channel(false);
1055
1056 let task = tokio::spawn(watch_applied(
1057 watcher,
1058 WatchScope::All,
1059 None,
1060 None, // reader (no resync in this test)
1061 None::<AppendLogSnapshot>,
1062 None,
1063 BatchConfig {
1064 window: Duration::from_secs(3600), // effectively never
1065 max,
1066 },
1067 parse_put,
1068 move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
1069 move |_| {},
1070 sd_rx,
1071 ));
1072
1073 // Yield enough for the mock to push all `max` updates; the window is an
1074 // hour, so any flush is purely the max trigger.
1075 tokio::time::sleep(Duration::from_millis(1)).await;
1076 assert_eq!(
1077 *flushes.lock().unwrap(),
1078 vec![max],
1079 "a full batch should flush on max, not wait for the window"
1080 );
1081
1082 sd_tx.send(true).unwrap();
1083 task.await.unwrap().unwrap();
1084 }
1085
1086 /// A pending batch plus a shutdown signal: the batch is flushed and the
1087 /// applied cursor returned.
1088 #[tokio::test(start_paused = true)]
1089 async fn flush_on_shutdown() {
1090 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1091 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1092
1093 let applied = Arc::new(AtomicU64::new(0));
1094 let a = Arc::clone(&applied);
1095 let (sd_tx, sd_rx) = watch::channel(false);
1096
1097 let task = tokio::spawn(watch_applied(
1098 watcher,
1099 WatchScope::All,
1100 None,
1101 None, // reader (no resync in this test)
1102 None::<AppendLogSnapshot>,
1103 None,
1104 BatchConfig {
1105 window: Duration::from_secs(3600), // window won't fire
1106 max: 100,
1107 },
1108 parse_put,
1109 move |_batch: Vec<Vec<u8>>| {},
1110 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1111 sd_rx,
1112 ));
1113
1114 // Give the mock time to deliver both updates into the pending batch.
1115 tokio::time::sleep(Duration::from_millis(1)).await;
1116 sd_tx.send(true).unwrap();
1117
1118 let cursor = task.await.unwrap().unwrap();
1119 assert_eq!(
1120 cursor.as_u64(),
1121 Some(2),
1122 "shutdown flushes the pending batch"
1123 );
1124 assert_eq!(applied.load(Ordering::SeqCst), 2);
1125 }
1126
1127 /// The cursor must not advance until `apply` has returned. We prove it by
1128 /// having `apply` read the cursor that `on_applied` last published: when the
1129 /// second batch is applied, the visible cursor must still be the *first*
1130 /// batch's — never the second's, which only becomes visible after this
1131 /// `apply` returns.
1132 #[tokio::test(start_paused = true)]
1133 async fn cursor_advances_only_after_apply() {
1134 // Two batches of `max` updates each.
1135 let max = 2usize;
1136 let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
1137 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1138
1139 // Cursor as last published by on_applied; starts at 0 (nothing applied).
1140 let published = Arc::new(AtomicU64::new(0));
1141 // What `apply` observed as the published cursor at the moment it ran.
1142 let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));
1143
1144 let pub_for_apply = Arc::clone(&published);
1145 let seen = Arc::clone(&seen_at_apply);
1146 let pub_for_on = Arc::clone(&published);
1147 let (sd_tx, sd_rx) = watch::channel(false);
1148
1149 let task = tokio::spawn(watch_applied(
1150 watcher,
1151 WatchScope::All,
1152 None,
1153 None, // reader (no resync in this test)
1154 None::<AppendLogSnapshot>,
1155 None,
1156 BatchConfig {
1157 window: Duration::from_secs(3600),
1158 max,
1159 },
1160 parse_put,
1161 move |_batch: Vec<Vec<u8>>| {
1162 // The cursor visible here is whatever the PREVIOUS flush
1163 // published — never this batch's, because we haven't returned.
1164 seen.lock()
1165 .unwrap()
1166 .push(pub_for_apply.load(Ordering::SeqCst));
1167 },
1168 move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1169 sd_rx,
1170 ));
1171
1172 tokio::time::sleep(Duration::from_millis(1)).await;
1173 sd_tx.send(true).unwrap();
1174 task.await.unwrap().unwrap();
1175
1176 // First apply saw 0 (nothing applied yet); second apply saw 2 (first
1177 // batch's cursor), NOT 4. The cursor only reached 4 after the second
1178 // apply returned.
1179 assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
1180 assert_eq!(published.load(Ordering::SeqCst), 4);
1181 }
1182
1183 /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
1184 /// domain work, but they were still received — so the cursor must advance
1185 /// over them.
1186 #[tokio::test]
1187 async fn corrupt_parse_entries_advance_cursor() {
1188 let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
1189 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1190
1191 let apply_calls = Arc::new(AtomicU64::new(0));
1192 let on_applied_max = Arc::new(AtomicU64::new(0));
1193 let ac = Arc::clone(&apply_calls);
1194 let om = Arc::clone(&on_applied_max);
1195 let (_sd_tx, sd_rx) = watch::channel(false);
1196
1197 let cursor = watch_applied(
1198 watcher,
1199 WatchScope::All,
1200 None,
1201 None, // reader (no resync in this test)
1202 None::<AppendLogSnapshot>,
1203 None,
1204 BatchConfig::default(),
1205 // Reject everything — simulates corrupt/irrelevant entries.
1206 |_u: &KvUpdate| -> Option<Vec<u8>> { None },
1207 move |batch: Vec<Vec<u8>>| {
1208 ac.fetch_add(1, Ordering::SeqCst);
1209 assert!(batch.is_empty());
1210 },
1211 move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1212 sd_rx,
1213 )
1214 .await
1215 .unwrap();
1216
1217 assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
1218 assert_eq!(
1219 apply_calls.load(Ordering::SeqCst),
1220 0,
1221 "an all-rejected batch applies nothing"
1222 );
1223 assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1224 }
1225
1226 /// A resume whose cursor has expired falls back to the full watch and still
1227 /// applies the delivered updates.
1228 #[tokio::test]
1229 async fn cursor_expired_falls_back_to_full_watch() {
1230 let mock = MockWatcher {
1231 full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
1232 from: Mutex::new(Some(vec![])),
1233 from_expires: true,
1234 hold: false,
1235 };
1236 let watcher = Arc::new(mock);
1237
1238 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1239 let ab = Arc::clone(&applied_batches);
1240 let (_sd_tx, sd_rx) = watch::channel(false);
1241
1242 let cursor = watch_applied(
1243 watcher,
1244 WatchScope::All,
1245 Some(WatchCursor::from_u64(5)), // resume position that "expired"
1246 None, // reader (no resync in this test)
1247 None::<AppendLogSnapshot>,
1248 None,
1249 BatchConfig::default(),
1250 parse_put,
1251 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1252 move |_| {},
1253 sd_rx,
1254 )
1255 .await
1256 .unwrap();
1257
1258 assert_eq!(cursor.as_u64(), Some(11));
1259 assert_eq!(
1260 *applied_batches.lock().unwrap(),
1261 vec![b"1".to_vec(), b"2".to_vec()],
1262 "fallback full watch's updates were applied"
1263 );
1264 }
1265
1266 /// Cursor-expired resync: with a reader + store wired, a key the fold holds
1267 /// that the live listing no longer does gets a synthetic delete — applied
1268 /// strictly BEFORE the fallback re-list — and the persisted fold converges
1269 /// to the live state. The synthetic delete (unknown version) must not move
1270 /// the cursor; the re-list put must.
1271 #[tokio::test]
1272 async fn cursor_expired_resync_deletes_stale_keys() {
1273 let dir = tempfile::TempDir::new().unwrap();
1274 let path = dir.path().join("resync.snap");
1275 let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1276 // The fold from the previous run: node.a and node.b at cursor 2.
1277 store
1278 .apply(
1279 &[put("node.a", b"1", 1), put("node.b", b"2", 2)],
1280 &WatchCursor::from_u64(2),
1281 )
1282 .unwrap();
1283
1284 // During the gap node.b was deleted (marker since evicted) and node.a
1285 // updated; the resume cursor (2) has expired. The fallback re-list
1286 // therefore carries only the surviving key.
1287 let mock = MockWatcher {
1288 full: Mutex::new(Some(vec![put("node.a", b"1b", 10)])),
1289 from: Mutex::new(Some(vec![])),
1290 from_expires: true,
1291 hold: false,
1292 };
1293 let reader = MockReader {
1294 live: vec!["node.a".to_string()],
1295 };
1296
1297 // Record everything `parse` sees, in order, deletes included.
1298 let seen = Arc::new(Mutex::new(Vec::<(String, bool)>::new()));
1299 let s = Arc::clone(&seen);
1300 let (_sd_tx, sd_rx) = watch::channel(false);
1301
1302 let cursor = watch_applied(
1303 Arc::new(mock),
1304 WatchScope::All,
1305 Some(WatchCursor::from_u64(2)),
1306 Some(Arc::new(reader) as Arc<dyn KvReader>),
1307 Some(store),
1308 None,
1309 BatchConfig::default(),
1310 move |u: &KvUpdate| {
1311 s.lock()
1312 .unwrap()
1313 .push((u.key().to_string(), matches!(u, KvUpdate::Delete { .. })));
1314 Some(())
1315 },
1316 |_batch: Vec<()>| {},
1317 |_| {},
1318 sd_rx,
1319 )
1320 .await
1321 .unwrap();
1322
1323 // The re-list put advanced the cursor; the synthetic delete did not.
1324 assert_eq!(cursor.as_u64(), Some(10));
1325 // The synthetic delete strictly precedes the re-list put.
1326 assert_eq!(
1327 *seen.lock().unwrap(),
1328 vec![("node.b".to_string(), true), ("node.a".to_string(), false)],
1329 "synthetic delete must be applied before the fallback re-list"
1330 );
1331
1332 // The persisted fold converged: stale key gone, live key updated.
1333 let snap = crate::snapshot::load(&path).unwrap().unwrap();
1334 assert_eq!(snap.cursor.as_u64(), Some(10));
1335 assert_eq!(snap.entries.len(), 1);
1336 assert_eq!(snap.entries["node.a"].value, b"1b");
1337 }
1338
1339 /// A prefix-scoped resync diffs only in-scope keys: an out-of-scope key the
1340 /// fold holds survives, the in-scope stale key is deleted, and a flush
1341 /// containing only synthetic deletes leaves the cursor untouched.
1342 #[tokio::test]
1343 async fn cursor_expired_resync_respects_scope() {
1344 let dir = tempfile::TempDir::new().unwrap();
1345 let path = dir.path().join("resync-scope.snap");
1346 let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1347 store
1348 .apply(
1349 &[put("node.b", b"2", 1), put("other.z", b"9", 2)],
1350 &WatchCursor::from_u64(2),
1351 )
1352 .unwrap();
1353
1354 // Expired resume; the bucket no longer has ANY node.* keys; the
1355 // fallback re-list is empty.
1356 let mock = MockWatcher {
1357 full: Mutex::new(Some(vec![])),
1358 from: Mutex::new(Some(vec![])),
1359 from_expires: true,
1360 hold: false,
1361 };
1362 let reader = MockReader { live: vec![] };
1363 let (_sd_tx, sd_rx) = watch::channel(false);
1364
1365 let cursor = watch_applied(
1366 Arc::new(mock),
1367 WatchScope::Prefix("node.".to_string()),
1368 Some(WatchCursor::from_u64(2)),
1369 Some(Arc::new(reader) as Arc<dyn KvReader>),
1370 Some(store),
1371 None,
1372 BatchConfig::default(),
1373 |_u: &KvUpdate| Some(()),
1374 |_batch: Vec<()>| {},
1375 |_| {},
1376 sd_rx,
1377 )
1378 .await
1379 .unwrap();
1380
1381 // Deletes-only flush: cursor stays at the resume position.
1382 assert_eq!(cursor.as_u64(), Some(2));
1383
1384 let snap = crate::snapshot::load(&path).unwrap().unwrap();
1385 assert_eq!(snap.cursor.as_u64(), Some(2));
1386 assert!(
1387 !snap.entries.contains_key("node.b"),
1388 "in-scope stale key must be resync-deleted"
1389 );
1390 assert_eq!(
1391 snap.entries["other.z"].value, b"9",
1392 "out-of-scope key must survive a prefix-scoped resync"
1393 );
1394 }
1395
1396 /// `WatchScope::Prefixes` dispatches to `watch_prefixes` (no resume) and to
1397 /// `watch_prefixes_from` with the expiry → full-watch fallback (resume).
1398 #[tokio::test]
1399 async fn prefixes_scope_dispatches_full_watch() {
1400 let updates = vec![put("a.x", b"1", 1), put("b.y", b"2", 2)];
1401 let watcher = Arc::new(MockWatcher::new(updates, false));
1402 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1403 let ab = Arc::clone(&applied_batches);
1404 let (_sd_tx, sd_rx) = watch::channel(false);
1405
1406 let cursor = watch_applied(
1407 watcher,
1408 WatchScope::Prefixes(vec!["a.".to_string(), "b.".to_string()]),
1409 None,
1410 None, // reader (no resync in this test)
1411 None::<AppendLogSnapshot>,
1412 None,
1413 BatchConfig::default(),
1414 parse_put,
1415 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1416 move |_| {},
1417 sd_rx,
1418 )
1419 .await
1420 .unwrap();
1421
1422 assert_eq!(cursor.as_u64(), Some(2));
1423 assert_eq!(
1424 *applied_batches.lock().unwrap(),
1425 vec![b"1".to_vec(), b"2".to_vec()]
1426 );
1427 }
1428
1429 /// `WatchScope::Prefixes` resume whose cursor has expired falls back to the
1430 /// full multi-prefix watch and applies its updates.
1431 #[tokio::test]
1432 async fn prefixes_scope_expired_resume_falls_back() {
1433 let mock = MockWatcher {
1434 full: Mutex::new(Some(vec![put("a.x", b"1", 7)])),
1435 from: Mutex::new(Some(vec![])),
1436 from_expires: true,
1437 hold: false,
1438 };
1439 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1440 let ab = Arc::clone(&applied_batches);
1441 let (_sd_tx, sd_rx) = watch::channel(false);
1442
1443 let cursor = watch_applied(
1444 Arc::new(mock),
1445 WatchScope::Prefixes(vec!["a.".to_string()]),
1446 Some(WatchCursor::from_u64(3)),
1447 None, // reader (no resync in this test)
1448 None::<AppendLogSnapshot>,
1449 None,
1450 BatchConfig::default(),
1451 parse_put,
1452 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1453 move |_| {},
1454 sd_rx,
1455 )
1456 .await
1457 .unwrap();
1458
1459 assert_eq!(cursor.as_u64(), Some(7));
1460 assert_eq!(*applied_batches.lock().unwrap(), vec![b"1".to_vec()]);
1461 }
1462
1463 /// End-to-end with a real snapshot file: after the run, the persisted
1464 /// snapshot's cursor equals the applied cursor and its entries match the
1465 /// applied state — proving the checkpoint is written at the post-apply
1466 /// cursor, never ahead of it.
1467 #[tokio::test]
1468 async fn snapshot_checkpoint_matches_applied_cursor() {
1469 let dir = tempfile::TempDir::new().unwrap();
1470 let path = dir.path().join("applied.snap");
1471 let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1472
1473 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1474 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1475 let (_sd_tx, sd_rx) = watch::channel(false);
1476
1477 let cursor = watch_applied(
1478 watcher,
1479 WatchScope::All,
1480 None,
1481 None, // reader (no resync in this test)
1482 Some(store),
1483 None,
1484 BatchConfig::default(),
1485 parse_put,
1486 move |_batch: Vec<Vec<u8>>| {},
1487 move |_| {},
1488 sd_rx,
1489 )
1490 .await
1491 .unwrap();
1492
1493 assert_eq!(cursor.as_u64(), Some(2));
1494
1495 let snap = crate::snapshot::load(&path).unwrap().unwrap();
1496 assert_eq!(
1497 snap.cursor.as_u64(),
1498 cursor.as_u64(),
1499 "snapshot checkpoint cursor must equal the applied cursor"
1500 );
1501 assert_eq!(snap.entries.len(), 2);
1502 assert_eq!(snap.entries["node.a"].value, b"1");
1503 assert_eq!(snap.entries["node.b"].value, b"2");
1504 }
1505
1506 /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
1507 /// delta (the `from` script, NOT the full set) is applied. Proves the
1508 /// resume branch delivers only post-cursor updates and advances to their
1509 /// max revision.
1510 #[tokio::test]
1511 async fn resume_from_cursor_delivers_only_delta() {
1512 let mock = MockWatcher {
1513 // `full` would be delivered only if the resume path were (wrongly)
1514 // bypassed; a non-empty distinguishing value makes that visible.
1515 full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
1516 from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1517 from_expires: false,
1518 hold: false,
1519 };
1520 let watcher = Arc::new(mock);
1521
1522 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1523 let ab = Arc::clone(&applied_batches);
1524 let (_sd_tx, sd_rx) = watch::channel(false);
1525
1526 let cursor = watch_applied(
1527 watcher,
1528 WatchScope::All,
1529 Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1530 None, // reader (no resync in this test)
1531 None::<AppendLogSnapshot>,
1532 None,
1533 BatchConfig::default(),
1534 parse_put,
1535 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1536 move |_| {},
1537 sd_rx,
1538 )
1539 .await
1540 .unwrap();
1541
1542 assert_eq!(
1543 cursor.as_u64(),
1544 Some(11),
1545 "cursor advances to the delta max"
1546 );
1547 assert_eq!(
1548 *applied_batches.lock().unwrap(),
1549 vec![b"3".to_vec(), b"4".to_vec()],
1550 "only the post-cursor delta is applied, never the full set"
1551 );
1552 }
1553
1554 /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
1555 /// applies the delivered updates. Every other test uses `WatchScope::All`;
1556 /// this covers the prefix dispatch arm.
1557 #[tokio::test]
1558 async fn prefix_scope_applies_delivered_updates() {
1559 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1560 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1561
1562 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1563 let ab = Arc::clone(&applied_batches);
1564 let (_sd_tx, sd_rx) = watch::channel(false);
1565
1566 let cursor = watch_applied(
1567 watcher,
1568 WatchScope::Prefix("node.".to_string()),
1569 None,
1570 None, // reader (no resync in this test)
1571 None::<AppendLogSnapshot>,
1572 None,
1573 BatchConfig::default(),
1574 parse_put,
1575 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1576 move |_| {},
1577 sd_rx,
1578 )
1579 .await
1580 .unwrap();
1581
1582 assert_eq!(cursor.as_u64(), Some(2));
1583 assert_eq!(
1584 *applied_batches.lock().unwrap(),
1585 vec![b"1".to_vec(), b"2".to_vec()]
1586 );
1587 }
1588
1589 /// `WatchScope::Prefix` happy-path resume: a non-expired cursor takes the
1590 /// `watch_prefix_from` path and only the delta is applied — the prefix
1591 /// twin of `resume_from_cursor_delivers_only_delta`.
1592 #[tokio::test]
1593 async fn prefix_resume_from_cursor_delivers_only_delta() {
1594 let mock = MockWatcher {
1595 // `full` would be delivered only if the resume path were (wrongly)
1596 // bypassed; a distinguishing value makes that visible.
1597 full: Mutex::new(Some(vec![put("node.x", b"FULL", 1)])),
1598 from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1599 from_expires: false,
1600 hold: false,
1601 };
1602 let watcher = Arc::new(mock);
1603
1604 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1605 let ab = Arc::clone(&applied_batches);
1606 let (_sd_tx, sd_rx) = watch::channel(false);
1607
1608 let cursor = watch_applied(
1609 watcher,
1610 WatchScope::Prefix("node.".to_string()),
1611 Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1612 None, // reader (no resync in this test)
1613 None::<AppendLogSnapshot>,
1614 None,
1615 BatchConfig::default(),
1616 parse_put,
1617 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1618 move |_| {},
1619 sd_rx,
1620 )
1621 .await
1622 .unwrap();
1623
1624 assert_eq!(
1625 cursor.as_u64(),
1626 Some(11),
1627 "cursor advances to the delta max"
1628 );
1629 assert_eq!(
1630 *applied_batches.lock().unwrap(),
1631 vec![b"3".to_vec(), b"4".to_vec()],
1632 "only the post-cursor delta is applied via watch_prefix_from"
1633 );
1634 }
1635
1636 /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
1637 /// full `watch_prefix` and still applies the delivered updates — the prefix
1638 /// twin of `cursor_expired_falls_back_to_full_watch`.
1639 #[tokio::test]
1640 async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
1641 let mock = MockWatcher {
1642 full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
1643 from: Mutex::new(Some(vec![])),
1644 from_expires: true,
1645 hold: false,
1646 };
1647 let watcher = Arc::new(mock);
1648
1649 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1650 let ab = Arc::clone(&applied_batches);
1651 let (_sd_tx, sd_rx) = watch::channel(false);
1652
1653 let cursor = watch_applied(
1654 watcher,
1655 WatchScope::Prefix("node.".to_string()),
1656 Some(WatchCursor::from_u64(5)), // resume position that "expired"
1657 None, // reader (no resync in this test)
1658 None::<AppendLogSnapshot>,
1659 None,
1660 BatchConfig::default(),
1661 parse_put,
1662 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1663 move |_| {},
1664 sd_rx,
1665 )
1666 .await
1667 .unwrap();
1668
1669 assert_eq!(cursor.as_u64(), Some(11));
1670 assert_eq!(
1671 *applied_batches.lock().unwrap(),
1672 vec![b"1".to_vec(), b"2".to_vec()],
1673 "prefix fallback full watch's updates were applied"
1674 );
1675 }
1676
1677 /// The watch task's terminal error must propagate out of `watch_applied`
1678 /// rather than being swallowed as `Ok(applied)` when the channel closes.
1679 #[tokio::test]
1680 async fn watch_task_error_propagates() {
1681 let watcher = Arc::new(ErrorWatcher);
1682 let (_sd_tx, sd_rx) = watch::channel(false);
1683
1684 let result = watch_applied(
1685 watcher,
1686 WatchScope::All,
1687 None,
1688 None, // reader (no resync in this test)
1689 None::<AppendLogSnapshot>,
1690 None,
1691 BatchConfig::default(),
1692 parse_put,
1693 move |_batch: Vec<Vec<u8>>| {},
1694 move |_| {},
1695 sd_rx,
1696 )
1697 .await;
1698
1699 match result {
1700 Err(KvError::WatchError(msg)) => {
1701 assert!(msg.contains("injected"), "error carries the cause: {msg}");
1702 }
1703 other => panic!("expected WatchError, got {other:?}"),
1704 }
1705 }
1706
1707 /// A batch where `parse` accepts some updates and rejects others: the cursor
1708 /// must still advance to the highest *received* revision (covering the
1709 /// rejected entry in the middle), while `apply` sees only the accepted ones.
1710 #[tokio::test]
1711 async fn mixed_parse_advances_cursor_over_rejected_entries() {
1712 let updates = vec![
1713 put("keep.a", b"1", 5),
1714 put("skip.b", b"2", 6), // rejected by parse
1715 put("keep.c", b"3", 7),
1716 ];
1717 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1718
1719 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1720 let on_applied_max = Arc::new(AtomicU64::new(0));
1721 let ab = Arc::clone(&applied_batches);
1722 let om = Arc::clone(&on_applied_max);
1723 let (_sd_tx, sd_rx) = watch::channel(false);
1724
1725 let cursor = watch_applied(
1726 watcher,
1727 WatchScope::All,
1728 None,
1729 None, // reader (no resync in this test)
1730 None::<AppendLogSnapshot>,
1731 None,
1732 BatchConfig::default(),
1733 // Keep only keys under "keep."; reject everything else.
1734 |u: &KvUpdate| -> Option<Vec<u8>> {
1735 match u {
1736 KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
1737 _ => None,
1738 }
1739 },
1740 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1741 move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1742 sd_rx,
1743 )
1744 .await
1745 .unwrap();
1746
1747 assert_eq!(
1748 cursor.as_u64(),
1749 Some(7),
1750 "cursor covers the rejected middle entry (rev 6)"
1751 );
1752 assert_eq!(
1753 *applied_batches.lock().unwrap(),
1754 vec![b"1".to_vec(), b"3".to_vec()],
1755 "apply sees only the accepted entries"
1756 );
1757 assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1758 }
1759
1760 /// Shutdown before any update arrives: nothing was received, so the cursor
1761 /// stays at the resume position (here `none()`), `apply` never runs, and
1762 /// `on_applied` never fires.
1763 #[tokio::test(start_paused = true)]
1764 async fn shutdown_with_no_pending_batch() {
1765 let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open
1766
1767 let apply_calls = Arc::new(AtomicU64::new(0));
1768 let on_applied_calls = Arc::new(AtomicU64::new(0));
1769 let ac = Arc::clone(&apply_calls);
1770 let oc = Arc::clone(&on_applied_calls);
1771 let (sd_tx, sd_rx) = watch::channel(false);
1772
1773 let task = tokio::spawn(watch_applied(
1774 watcher,
1775 WatchScope::All,
1776 None,
1777 None, // reader (no resync in this test)
1778 None::<AppendLogSnapshot>,
1779 None,
1780 BatchConfig::default(),
1781 parse_put,
1782 move |_batch: Vec<Vec<u8>>| {
1783 ac.fetch_add(1, Ordering::SeqCst);
1784 },
1785 move |_| {
1786 oc.fetch_add(1, Ordering::SeqCst);
1787 },
1788 sd_rx,
1789 ));
1790
1791 // Let the watcher attach and idle (it has nothing to deliver), then shut down.
1792 tokio::time::sleep(Duration::from_millis(1)).await;
1793 sd_tx.send(true).unwrap();
1794
1795 let cursor = task.await.unwrap().unwrap();
1796 assert_eq!(
1797 cursor.as_u64(),
1798 None,
1799 "no updates received → cursor unmoved"
1800 );
1801 assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
1802 assert_eq!(
1803 on_applied_calls.load(Ordering::SeqCst),
1804 0,
1805 "on_applied never fires"
1806 );
1807 }
1808
1809 /// An [`ExportRequest`] flushes the pending batch first, so the artifact's
1810 /// cursor is exactly the applied cursor — and the artifact is importable
1811 /// with the batched entries in it.
1812 #[tokio::test(start_paused = true)]
1813 async fn export_request_flushes_pending_batch_first() {
1814 let dir = tempfile::TempDir::new().unwrap();
1815 let store_path = dir.path().join("fold.snap");
1816 let artifact = dir.path().join("artifact");
1817 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1818
1819 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1820 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1821 let (sd_tx, sd_rx) = watch::channel(false);
1822 let (ex_tx, ex_rx) = mpsc::channel(1);
1823
1824 let task = tokio::spawn(watch_applied(
1825 watcher,
1826 WatchScope::All,
1827 None,
1828 None, // reader (no resync in this test)
1829 Some(store),
1830 Some(ex_rx),
1831 BatchConfig {
1832 window: Duration::from_secs(3600), // window never fires
1833 max: 100,
1834 },
1835 parse_put,
1836 move |_batch: Vec<Vec<u8>>| {},
1837 move |_| {},
1838 sd_rx,
1839 ));
1840
1841 // Let both updates land in the (unflushed) pending batch, then export.
1842 tokio::time::sleep(Duration::from_millis(1)).await;
1843 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1844 ex_tx
1845 .send(ExportRequest {
1846 dest_dir: artifact.clone(),
1847 reply: reply_tx,
1848 })
1849 .await
1850 .unwrap();
1851
1852 let manifest = reply_rx.await.unwrap().expect("export succeeds");
1853 assert_eq!(
1854 manifest.cursor.as_u64(),
1855 Some(2),
1856 "pending batch flushed before export: artifact cursor is the applied cursor"
1857 );
1858
1859 // The artifact is importable and holds both batched entries.
1860 let (cursor, imported) =
1861 AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
1862 .unwrap();
1863 assert_eq!(cursor.as_u64(), Some(2));
1864 assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
1865 assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
1866
1867 sd_tx.send(true).unwrap();
1868 task.await.unwrap().unwrap();
1869 }
1870
1871 /// An [`ExportRequest`] that arrives with NOTHING pending (the window
1872 /// already flushed everything) still produces a valid artifact whose
1873 /// cursor is the applied cursor. The flush-before-export step must be a
1874 /// clean no-op, not an error or a cursor regression.
1875 #[tokio::test(start_paused = true)]
1876 async fn export_with_empty_pending_batch_succeeds() {
1877 let dir = tempfile::TempDir::new().unwrap();
1878 let store_path = dir.path().join("fold.snap");
1879 let artifact = dir.path().join("artifact");
1880 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1881
1882 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1883 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1884 let (sd_tx, sd_rx) = watch::channel(false);
1885 let (ex_tx, ex_rx) = mpsc::channel(1);
1886
1887 let task = tokio::spawn(watch_applied(
1888 watcher,
1889 WatchScope::All,
1890 None,
1891 None, // reader (no resync in this test)
1892 Some(store),
1893 Some(ex_rx),
1894 BatchConfig::default(), // 10 ms window
1895 parse_put,
1896 move |_batch: Vec<Vec<u8>>| {},
1897 move |_| {},
1898 sd_rx,
1899 ));
1900
1901 // Let the window flush both updates, so the export request finds an
1902 // EMPTY pending batch.
1903 tokio::time::sleep(Duration::from_millis(50)).await;
1904
1905 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1906 ex_tx
1907 .send(ExportRequest {
1908 dest_dir: artifact.clone(),
1909 reply: reply_tx,
1910 })
1911 .await
1912 .unwrap();
1913 let manifest = reply_rx
1914 .await
1915 .unwrap()
1916 .expect("export succeeds with nothing pending");
1917 assert_eq!(
1918 manifest.cursor.as_u64(),
1919 Some(2),
1920 "artifact cursor is the applied cursor, unchanged by the no-op flush"
1921 );
1922
1923 // The artifact is importable and holds the already-flushed entries.
1924 let (cursor, imported) =
1925 AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
1926 .unwrap();
1927 assert_eq!(cursor.as_u64(), Some(2));
1928 assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
1929 assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
1930
1931 sd_tx.send(true).unwrap();
1932 task.await.unwrap().unwrap();
1933 }
1934
1935 /// An export request against a store-less watch replies with an error and
1936 /// the watch keeps running.
1937 #[tokio::test(start_paused = true)]
1938 async fn export_without_store_replies_error() {
1939 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1940 let (sd_tx, sd_rx) = watch::channel(false);
1941 let (ex_tx, ex_rx) = mpsc::channel(1);
1942
1943 let task = tokio::spawn(watch_applied(
1944 watcher,
1945 WatchScope::All,
1946 None,
1947 None, // reader (no resync in this test)
1948 None::<AppendLogSnapshot>,
1949 Some(ex_rx),
1950 BatchConfig::default(),
1951 parse_put,
1952 move |_batch: Vec<Vec<u8>>| {},
1953 move |_| {},
1954 sd_rx,
1955 ));
1956
1957 tokio::time::sleep(Duration::from_millis(1)).await;
1958 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1959 ex_tx
1960 .send(ExportRequest {
1961 dest_dir: std::env::temp_dir().join("never-created"),
1962 reply: reply_tx,
1963 })
1964 .await
1965 .unwrap();
1966 assert!(
1967 reply_rx.await.unwrap().is_err(),
1968 "no store → export errors via the reply"
1969 );
1970
1971 // The watch is still alive and returns its applied cursor on shutdown.
1972 sd_tx.send(true).unwrap();
1973 let cursor = task.await.unwrap().unwrap();
1974 assert_eq!(cursor.as_u64(), Some(1));
1975 }
1976
1977 /// An export failure (unavailable destination) is reported on the reply and
1978 /// the watch keeps applying later updates.
1979 #[tokio::test(start_paused = true)]
1980 async fn export_error_does_not_kill_watch() {
1981 let dir = tempfile::TempDir::new().unwrap();
1982 let store_path = dir.path().join("fold.snap");
1983 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1984
1985 // Occupied destination → export fails.
1986 let occupied = dir.path().join("occupied");
1987 std::fs::create_dir(&occupied).unwrap();
1988 std::fs::write(occupied.join("stray"), b"x").unwrap();
1989
1990 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1991 let (sd_tx, sd_rx) = watch::channel(false);
1992 let (ex_tx, ex_rx) = mpsc::channel(1);
1993
1994 let applied = Arc::new(AtomicU64::new(0));
1995 let a = Arc::clone(&applied);
1996
1997 let task = tokio::spawn(watch_applied(
1998 watcher,
1999 WatchScope::All,
2000 None,
2001 None, // reader (no resync in this test)
2002 Some(store),
2003 Some(ex_rx),
2004 BatchConfig::default(),
2005 parse_put,
2006 move |_batch: Vec<Vec<u8>>| {},
2007 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2008 sd_rx,
2009 ));
2010
2011 tokio::time::sleep(Duration::from_millis(1)).await;
2012 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
2013 ex_tx
2014 .send(ExportRequest {
2015 dest_dir: occupied,
2016 reply: reply_tx,
2017 })
2018 .await
2019 .unwrap();
2020 match reply_rx.await.unwrap() {
2021 Err(crate::snapshot::SnapshotError::ArtifactInvalid(_)) => {}
2022 other => panic!("expected ArtifactInvalid, got {other:?}"),
2023 }
2024
2025 // Watch still folds: a clean shutdown returns the applied cursor.
2026 sd_tx.send(true).unwrap();
2027 let cursor = task.await.unwrap().unwrap();
2028 assert_eq!(cursor.as_u64(), Some(1), "watch survived the failed export");
2029 assert_eq!(applied.load(Ordering::SeqCst), 1);
2030 }
2031
2032 /// Dropping the export sender disarms the arm; the loop keeps batching and
2033 /// flushing normally.
2034 #[tokio::test(start_paused = true)]
2035 async fn export_sender_dropped_disarms_channel() {
2036 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
2037 let (sd_tx, sd_rx) = watch::channel(false);
2038 let (ex_tx, ex_rx) = mpsc::channel::<ExportRequest>(1);
2039
2040 let applied = Arc::new(AtomicU64::new(0));
2041 let a = Arc::clone(&applied);
2042
2043 let task = tokio::spawn(watch_applied(
2044 watcher,
2045 WatchScope::All,
2046 None,
2047 None, // reader (no resync in this test)
2048 None::<AppendLogSnapshot>,
2049 Some(ex_rx),
2050 BatchConfig::default(),
2051 parse_put,
2052 move |_batch: Vec<Vec<u8>>| {},
2053 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2054 sd_rx,
2055 ));
2056
2057 drop(ex_tx); // disarm
2058 tokio::time::sleep(Duration::from_millis(50)).await;
2059 assert_eq!(
2060 applied.load(Ordering::SeqCst),
2061 1,
2062 "loop keeps flushing after the export sender is gone"
2063 );
2064
2065 sd_tx.send(true).unwrap();
2066 task.await.unwrap().unwrap();
2067 }
2068
2069 /// With a low `compact_threshold`, the flush path's `spawn_blocking`
2070 /// compaction actually fires (every other snapshot test pins the threshold
2071 /// at `u64::MAX`, leaving that branch dead). After a compacting run the
2072 /// snapshot must still load cleanly with the right cursor and entries.
2073 #[tokio::test]
2074 async fn snapshot_compaction_fires_and_stays_consistent() {
2075 let dir = tempfile::TempDir::new().unwrap();
2076 let path = dir.path().join("applied.snap");
2077 // threshold 0 → every checkpoint reports "needs compact", forcing the
2078 // store's inline-compaction branch on each flush (run off the hot path via
2079 // spawn_blocking inside watch_applied).
2080 let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();
2081
2082 // Re-put the same key across flushes so compaction has duplicates to
2083 // dedup; small max forces multiple flushes (hence multiple compactions).
2084 let updates = vec![
2085 put("node.a", b"1", 1),
2086 put("node.a", b"2", 2),
2087 put("node.b", b"3", 3),
2088 put("node.a", b"4", 4),
2089 ];
2090 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
2091 let (_sd_tx, sd_rx) = watch::channel(false);
2092
2093 let cursor = watch_applied(
2094 watcher,
2095 WatchScope::All,
2096 None,
2097 None, // reader (no resync in this test)
2098 Some(store),
2099 None,
2100 BatchConfig {
2101 window: Duration::from_secs(3600),
2102 max: 1, // one update per flush → a compaction per update
2103 },
2104 parse_put,
2105 move |_batch: Vec<Vec<u8>>| {},
2106 move |_| {},
2107 sd_rx,
2108 )
2109 .await
2110 .unwrap();
2111
2112 assert_eq!(cursor.as_u64(), Some(4));
2113
2114 let snap = crate::snapshot::load(&path).unwrap().unwrap();
2115 assert_eq!(
2116 snap.cursor.as_u64(),
2117 cursor.as_u64(),
2118 "compacted snapshot's cursor still equals the applied cursor"
2119 );
2120 assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
2121 assert_eq!(
2122 snap.entries["node.a"].value, b"4",
2123 "last write per key survives compaction"
2124 );
2125 assert_eq!(snap.entries["node.b"].value, b"3");
2126 }
2127 /// A SnapshotStore whose FIRST apply fails (transient store error: disk
2128 /// pressure, lock timeout), then behaves normally — the trigger for the
2129 /// lost-raw-batch hazard in the flush path.
2130 struct FailOnceStore {
2131 inner: AppendLogSnapshot,
2132 failed: std::sync::atomic::AtomicBool,
2133 }
2134
2135 impl crate::snapshot::SnapshotStore for FailOnceStore {
2136 fn load(
2137 _path: &std::path::Path,
2138 ) -> Result<(WatchCursor, Self), crate::snapshot::SnapshotError> {
2139 unreachable!("test store is constructed directly")
2140 }
2141 fn apply(
2142 &mut self,
2143 batch: &[KvUpdate],
2144 cursor: &WatchCursor,
2145 ) -> Result<(), crate::snapshot::SnapshotError> {
2146 if !self.failed.swap(true, Ordering::SeqCst) {
2147 return Err(crate::snapshot::SnapshotError::Backend(
2148 "injected transient store failure".into(),
2149 ));
2150 }
2151 self.inner.apply(batch, cursor)
2152 }
2153 fn get(&self, key: &str) -> Result<Option<KvEntry>, crate::snapshot::SnapshotError> {
2154 self.inner.get(key)
2155 }
2156 fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, crate::snapshot::SnapshotError> {
2157 self.inner.range(prefix)
2158 }
2159 fn cursor(&self) -> WatchCursor {
2160 self.inner.cursor()
2161 }
2162 fn export_to(
2163 &mut self,
2164 dest_dir: &std::path::Path,
2165 ) -> Result<crate::artifact::ExportManifest, crate::snapshot::SnapshotError> {
2166 self.inner.export_to(dest_dir)
2167 }
2168 }
2169
2170 /// CURSOR AUTHORITY under a transient store failure: a failed store apply
2171 /// must NOT cause later successful applies to advance the persisted
2172 /// cursor past data that never landed. The failed batch is re-queued and
2173 /// committed cumulatively with the next flush, so the store's cursor
2174 /// never lies about its contents — a restart resuming from it sees
2175 /// exactly the missing tail, not a silent hole.
2176 ///
2177 /// (Pre-fix behavior, found while writing the watch_applied model: the
2178 /// failed batch's raw updates were dropped on the warn-and-continue
2179 /// path, and the NEXT successful flush committed only newer updates
2180 /// under the newest cursor — a permanent, restart-surviving gap in the
2181 /// fold.)
2182 #[tokio::test(start_paused = true)]
2183 async fn transient_store_failure_never_leaves_a_cursor_gap() {
2184 let dir = tempfile::TempDir::new().unwrap();
2185 let path = dir.path().join("fold.snap");
2186 let (_r, inner) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2187 let store = FailOnceStore {
2188 inner,
2189 failed: std::sync::atomic::AtomicBool::new(false),
2190 };
2191
2192 // max: 1 -> one flush per update: flush #1 (a@1) hits the injected
2193 // failure, flush #2 (b@2) succeeds.
2194 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
2195 let watcher = Arc::new(MockWatcher::new(updates, true));
2196 let (sd_tx, sd_rx) = watch::channel(false);
2197
2198 let task = tokio::spawn(watch_applied(
2199 watcher,
2200 WatchScope::All,
2201 None,
2202 None,
2203 Some(store),
2204 None,
2205 BatchConfig {
2206 window: Duration::from_millis(1),
2207 max: 1,
2208 },
2209 parse_put,
2210 |_batch: Vec<Vec<u8>>| {},
2211 |_| {},
2212 sd_rx,
2213 ));
2214
2215 tokio::time::sleep(Duration::from_millis(50)).await;
2216 sd_tx.send(true).unwrap();
2217 let cursor = task.await.unwrap().unwrap();
2218 assert_eq!(cursor.as_u64(), Some(2));
2219
2220 // The store on disk must be SELF-CONSISTENT: whatever its cursor
2221 // claims, the data at or below it is present. With the re-queue fix
2222 // the cumulative commit lands both keys at cursor 2.
2223 let (persisted, reopened) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2224 assert_eq!(persisted.as_u64(), Some(2), "cursor reached the head");
2225 assert_eq!(
2226 reopened.get("node.a").unwrap().map(|e| e.value),
2227 Some(b"1".to_vec()),
2228 "the transiently-failed batch was re-queued, not silently dropped \
2229 behind an advancing cursor"
2230 );
2231 assert_eq!(
2232 reopened.get("node.b").unwrap().map(|e| e.value),
2233 Some(b"2".to_vec())
2234 );
2235 }
2236 /// A reader whose live-key listing always fails — the resync's I/O
2237 /// failure mode.
2238 struct FailingReader;
2239
2240 #[async_trait]
2241 impl KvReader for FailingReader {
2242 async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
2243 unreachable!("resync only lists keys")
2244 }
2245 async fn keys(&self, _prefix: &str) -> Result<Vec<String>, KvError> {
2246 Err(KvError::OperationFailed("injected listing failure".into()))
2247 }
2248 async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
2249 unreachable!("resync only lists keys")
2250 }
2251 }
2252
2253 /// REGRESSION PIN (code-level twin of tests/model.rs's Degrade
2254 /// configuration): a resync whose live-key listing fails must FAIL THE
2255 /// WATCH, not degrade to re-list-only with a warning — the degrade
2256 /// semantics provably break the convergence theorem (silent stale keys).
2257 /// Reverting `resync_stale_keys` to warn-and-continue fails this test.
2258 #[tokio::test]
2259 async fn resync_listing_failure_is_fatal_not_degraded() {
2260 let dir = tempfile::TempDir::new().unwrap();
2261 let path = dir.path().join("resync-fatal.snap");
2262 let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2263 store
2264 .apply(&[put("node.a", b"1", 1)], &WatchCursor::from_u64(1))
2265 .unwrap();
2266
2267 // Resume cursor expired -> resync path -> reader listing fails.
2268 let mock = MockWatcher {
2269 full: Mutex::new(Some(vec![])),
2270 from: Mutex::new(Some(vec![])),
2271 from_expires: true,
2272 hold: false,
2273 };
2274 let (_sd_tx, sd_rx) = watch::channel(false);
2275 let err = watch_applied(
2276 Arc::new(mock),
2277 WatchScope::All,
2278 Some(WatchCursor::from_u64(1)),
2279 Some(Arc::new(FailingReader) as Arc<dyn KvReader>),
2280 Some(store),
2281 None,
2282 BatchConfig::default(),
2283 parse_put,
2284 |_batch: Vec<Vec<u8>>| {},
2285 |_| {},
2286 sd_rx,
2287 )
2288 .await
2289 .expect_err("a failed resync listing must fail the watch");
2290 assert!(
2291 err.to_string().contains("resync failed listing live keys"),
2292 "{err}"
2293 );
2294 }
2295}