Skip to main content

slipstream/
applied.rs

//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
//!   applied cursor. Callers use it to persist the cursor for the next restart.
//!
//! ## Panics
//!
//! `apply` runs inline on the watch task. If it panics, the panic propagates out
//! of [`watch_applied`] and aborts the watch — that is the caller's contract,
//! the same as a panic in any other supplied closure.
52
53use std::path::PathBuf;
54use std::sync::Arc;
55use std::time::Duration;
56
57use tokio::sync::mpsc;
58use tokio::sync::{oneshot, watch};
59use tracing::warn;
60
61use crate::artifact::ExportManifest;
62use crate::kv::{KvError, KvReader, KvUpdate, KvWatcher, WatchCursor};
63use crate::snapshot::{SnapshotError, SnapshotStore};
64
65/// A request, sent into a running [`watch_applied`] loop, to export the fold it
66/// owns (see [`SnapshotStore::export_to`]).
67///
68/// `watch_applied` takes its snapshot store **by value**, so a consumer that
69/// wants periodic artifacts of a live fold cannot call `export_to` itself. It
70/// instead passes an `mpsc::Receiver<ExportRequest>` to [`watch_applied`] and
71/// sends requests through the paired sender. The loop handles a request
72/// between batch flushes — after flushing any pending batch — so the artifact's
73/// embedded cursor is exactly the applied cursor at the moment of export.
74///
75/// The export result (or error) comes back on `reply`; an export failure is
76/// reported there and the watch keeps running (the snapshot is a cache — a
77/// failed artifact is the requester's problem, not the fold's).
78pub struct ExportRequest {
79    /// Where the artifact directory will be created. Must not exist (or be an
80    /// empty directory); same filesystem as the fold for cheap hardlinks.
81    pub dest_dir: PathBuf,
82    /// Receives the sealed manifest on success. A dropped receiver is ignored.
83    pub reply: oneshot::Sender<Result<ExportManifest, SnapshotError>>,
84}
85
/// What to watch: every key, every key under a prefix, or the union of several
/// prefixes.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`,
/// `Prefixes` to `watch_prefixes` / `watch_prefixes_from` (one multi-filter
/// consumer for the whole union, never one consumer per prefix).
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
    /// Watch keys beginning with ANY of these prefixes, on a single consumer.
    Prefixes(Vec<String>),
}

impl WatchScope {
    /// The scope as a list of key prefixes, for callers that enumerate scope
    /// contents (live listings, fold ranges).
    ///
    /// - `All` yields a single empty prefix (every key starts with `""`).
    /// - `Prefix(p)` yields just `p`.
    /// - `Prefixes(ps)` yields `ps` unchanged — duplicates and overlapping
    ///   prefixes are passed through as-is, and an empty list stays empty.
    fn prefixes(&self) -> Vec<String> {
        match self {
            WatchScope::All => vec![String::new()],
            WatchScope::Prefix(prefix) => vec![prefix.clone()],
            WatchScope::Prefixes(prefixes) => prefixes.clone(),
        }
    }
}
114
115/// Internal: a cursor-expired resync handoff from the watch task to the main
116/// loop. Carries the bucket's live key listing for the watch scope; the main
117/// loop diffs it against the fold and applies synthetic deletes for keys that
118/// vanished during the gap, then acks so the watch task can start the fallback
119/// watch — the ack ordering guarantees every synthetic delete is applied before
120/// the first re-list put arrives.
121struct ResyncRequest {
122    live_keys: Vec<String>,
123    ack: oneshot::Sender<()>,
124}
125
126/// Internal: what the watch task needs to initiate a resync — the reader that
127/// lists live keys and the channel into the main loop that owns the fold.
128type ResyncHandle = (Arc<dyn KvReader>, mpsc::Sender<ResyncRequest>);
129
/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` updates have accumulated.
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update); `max` caps memory and latency when updates
/// arrive faster than the window.
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of parsed updates in a batch before forcing a flush.
    /// Also used to pre-size the batch buffer, where a nonsensical `0` is
    /// treated as `1`.
    pub max: usize,
    /// Capacity of the internal watch-task → main-loop channel. When the main
    /// loop falls behind (slow `apply`, blocking store flush), a full channel
    /// backpressures the watch task — that is the design — but during initial
    /// state-sync hydration of a large bucket the channel can fill faster than
    /// the window flushes, making *this* the effective batch boundary rather
    /// than [`max`](Self::max). Tune it together with `max` for high-fanout
    /// hydration; clamped to a minimum of 1.
    pub channel_capacity: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place — and the 256-deep channel the
    /// loop always allocated, now tunable.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
            channel_capacity: 256,
        }
    }
}
165
166/// Drive a watch with cursor-after-apply semantics.
167///
168/// Subscribes per `scope` (resuming from `resume` when it carries a position),
169/// batches updates per `config`, applies each batch via `apply`, and only then
170/// advances the cursor / folds the batch into `store` / calls `on_applied`.
171/// Returns the final applied cursor when the watch ends (shutdown signalled, or
172/// the underlying stream closed).
173///
174/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
175/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
176/// its own impl) — or `None` to run without persistence. On each flush, *after*
177/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
178/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
179/// persisted cursor is always the post-apply cursor and never names a revision
180/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
181/// crash leaves the store consistent and resume re-folds only the tail.
182///
183/// # Cursor expiry and stale-key resync
184///
185/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
186/// falls back to a full-scope watch (`watch_all` / `watch_prefix` /
187/// `watch_prefixes`), whose state-sync re-list re-delivers the current value of
188/// every in-scope key as puts. The re-list alone cannot cover keys that were
189/// **deleted during the gap** and whose delete markers the backend has since
190/// evicted — they simply don't appear, leaving the fold (and the caller's
191/// domain state) holding them forever.
192///
193/// When both `reader` and `store` are provided, the expiry path closes that
194/// hole: before the fallback watch starts, the bucket's live keys are listed
195/// via `reader`, diffed against the fold's in-scope keys, and a synthetic
196/// [`KvUpdate::Delete`] (with an unknown [`VersionToken`](crate::VersionToken)) is run through
197/// `parse`/`apply`/`store` for each key that vanished. The synthetic deletes
198/// are strictly ordered before the first re-list put, so a key deleted and
199/// re-created during the gap converges correctly. Without a `reader` (or
200/// without a `store` to diff against) the fallback is re-list-only and a
201/// warning marks the possible stale keys.
202///
203/// A resync that was armed but FAILS (live-key listing or fold diff error) is
204/// fatal to the watch — degrading to re-list-only would silently leave
205/// deleted keys in the fold (`tests/model.rs` proves that divergence
206/// reachable), so the error surfaces and the caller's restart retries the
207/// resume → expiry → resync path from scratch.
208///
209/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
210/// rationale.
211///
212/// # Type parameters
213/// - `U`: the caller's domain update type, produced by `parse` and consumed by
214///   `apply`.
215// This combinator takes each of its dependencies as a parameter so every
216// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
217// type and is monomorphized at the call site. Folding them into a builder struct
218// would either box the closures or force a single generic bundle, losing that.
219#[allow(clippy::too_many_arguments)]
220// The flush macro resets `batch_high`/`batch_deadline` for the next loop
221// iteration. At the two flush sites that return immediately afterward (shutdown,
222// channel-close) those resets are dead stores — correct, but flagged. The allow
223// must sit on the function: a statement-scoped `#[allow]` inside the macro body
224// trips the experimental attributes-on-expressions gate (E0658) on stable.
225#[allow(unused_assignments)]
226pub async fn watch_applied<U, S, P, A, O>(
227    watcher: Arc<dyn KvWatcher>,
228    scope: WatchScope,
229    resume: Option<WatchCursor>,
230    // `Some(reader)` arms the cursor-expired stale-key resync (see the function
231    // docs); `None` keeps the re-list-only fallback. Only consulted on expiry —
232    // the hot path never touches it.
233    reader: Option<Arc<dyn KvReader>>,
234    mut store: Option<S>,
235    // `Some(rx)` arms an export-request arm in the select loop: each
236    // [`ExportRequest`] is handled between flushes (pending batch flushed
237    // first), so the exported artifact's cursor is the applied cursor (or,
238    // across a transiently failed store flush, the store's own lagging but
239    // self-consistent cursor — never a cursor past unfolded data).
240    // `None` (or dropping the paired sender) leaves the loop's behavior
241    // unchanged.
242    mut exports: Option<mpsc::Receiver<ExportRequest>>,
243    config: BatchConfig,
244    mut parse: P,
245    mut apply: A,
246    mut on_applied: O,
247    mut shutdown: watch::Receiver<bool>,
248) -> Result<WatchCursor, KvError>
249where
250    U: Send,
251    // `Send + 'static`: each flush moves `store` onto a blocking task to run its
252    // (potentially blocking) `apply`, then takes it back — the same offload the
253    // append log's compaction always used.
254    S: SnapshotStore + Send + 'static,
255    P: FnMut(&KvUpdate) -> Option<U> + Send,
256    A: FnMut(Vec<U>) + Send,
257    O: FnMut(WatchCursor) + Send,
258{
259    // The cursor we'll return. Initialized from the resume position so that a
260    // watch which receives nothing new still reports the position it resumed
261    // from as "applied" (it is — everything up to it was applied before the last
262    // run persisted it).
263    let mut applied = match &resume {
264        Some(c) => c.clone(),
265        None => WatchCursor::none(),
266    };
267
268    // The scope's prefixes, for the resync diff against the fold. Cloned out
269    // before `scope` moves into the watch task.
270    let scope_prefixes = scope.prefixes();
271
272    // Cursor-expired resync channel, armed only when there is a reader to list
273    // live keys AND a store to diff them against. The watch task sends the live
274    // listing here and waits for the ack before starting the fallback watch, so
275    // synthetic deletes always precede the re-list.
276    let (resync_pair, mut resyncs): (Option<ResyncHandle>, Option<mpsc::Receiver<ResyncRequest>>) =
277        match reader {
278            Some(reader) if store.is_some() => {
279                let (rs_tx, rs_rx) = mpsc::channel(1);
280                (Some((reader, rs_tx)), Some(rs_rx))
281            }
282            _ => (None, None),
283        };
284
285    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
286    // only ever sees a clean ordered stream of updates on `rx`.
287    let (tx, mut rx) = mpsc::channel::<KvUpdate>(config.channel_capacity.max(1));
288    let handle = {
289        let watcher = Arc::clone(&watcher);
290        tokio::spawn(
291            async move { run_watch(watcher.as_ref(), &scope, resume, resync_pair, tx).await },
292        )
293    };
294
295    // Batch state.
296    //
297    // `batch_high` tracks the version of the most recently *received* update
298    // since the last flush — including updates `parse` rejected. NATS delivers
299    // in revision order, so the last received is the highest, and advancing the
300    // cursor to it after a single atomic `apply` is correct: having seen the max
301    // means we've seen everything below it, and a rejected entry is still
302    // "nothing to apply", hence covered. Reset to `none()` after every flush.
303    // Pre-size to the flush bound so no batch ever re-climbs the reallocation
304    // ladder; `max(1)` only guards a nonsensical `max = 0` config.
305    let batch_cap = config.max.max(1);
306    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
307    // Raw received updates for the durable `store`, in revision order. Only
308    // populated when a `store` is present; the store folds the *raw* updates
309    // (including ones `parse` rejected — they are still part of the bucket's
310    // state), whereas the parsed `batch` above is the consumer's domain view.
311    let mut raw_batch: Vec<KvUpdate> = Vec::new();
312    let mut batch_high = WatchCursor::none();
313    // Consecutive store-apply failures. A transient failure re-queues its raw
314    // batch (cursor authority: the store's cursor and contents advance
315    // together, always); a persistent streak fail-stops before the re-queued
316    // backlog grows without bound.
317    const MAX_STORE_APPLY_FAILURES: u32 = 16;
318    let mut store_fail_streak: u32 = 0;
319    // `Some` once a batch has opened and the window timer is armed; `None`
320    // between flushes. Only the armed/idle distinction is read in the loop —
321    // the absolute instant lives in the pinned `sleep` future below.
322    let mut batch_deadline: Option<tokio::time::Instant> = None;
323
324    // Flush the current batch, in order: run the domain `apply` (if non-empty) to
325    // completion, advance the cursor, fold the raw batch + cursor durably into
326    // `store`, then fire `on_applied`. The store fold runs on a blocking task
327    // (its `apply` may block on I/O), moving the store in and taking it back — the
328    // same offload the append log's compaction always used. A TRANSIENT store
329    // error re-queues the raw batch for cumulative commit on the next flush
330    // (the watch continues; the store's cursor never advances past data it
331    // dropped) and a persistent failure streak is fatal; a panicked
332    // blocking task drops the store irrecoverably, which breaks the
333    // resume-after-restart guarantee, so it is surfaced as fatal.
334    macro_rules! flush {
335        () => {{
336            // Nothing received since the last flush → nothing to do at all.
337            // (`raw_batch` can be non-empty with no cursor advance only via the
338            // resync path's synthetic deletes, which carry no revision.)
339            if !batch.is_empty() || !raw_batch.is_empty() || !batch_high.is_none() {
340                if !batch.is_empty() {
341                    // INVARIANT: apply() runs and RETURNS before any cursor
342                    // advance below. Move the batch out so a panicking apply
343                    // can't leave half-consumed state behind.
344                    //
345                    // `replace` (not `take`) leaves a pre-sized Vec behind so each
346                    // batch after the first doesn't re-climb the reallocation
347                    // ladder (4→8→…→cap).
348                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
349                }
350                let advanced = !batch_high.is_none();
351                if advanced {
352                    applied = batch_high.clone();
353                }
354                if !raw_batch.is_empty()
355                    && let Some(mut st) = store.take()
356                {
357                    let raw = std::mem::take(&mut raw_batch);
358                    // Fold at the post-advance cursor. A synthetic-deletes-only
359                    // batch leaves the cursor where it was (the deletes are a
360                    // state correction, not log entries), which is safe: an
361                    // unchanged — possibly expired — cursor only ever re-runs
362                    // this same resync on the next restart.
363                    let cur = applied.clone();
364                    // Hand the store AND the raw batch back on a clean return:
365                    // a *failed* apply (Ok(Err)) RE-QUEUES the batch so the
366                    // next flush commits it cumulatively — the store's cursor
367                    // and contents always advance together. Dropping the
368                    // failed batch instead lets the NEXT successful flush
369                    // advance the cursor over a hole that survives every
370                    // restart (reproduced by
371                    // `transient_store_failure_never_leaves_a_cursor_gap`).
372                    // Only a *panicked* task (Err) loses the store: fatal.
373                    match tokio::task::spawn_blocking(move || {
374                        let res = st.apply(&raw, &cur);
375                        (st, raw, res)
376                    })
377                    .await
378                    {
379                        Ok((st, _raw, Ok(()))) => {
380                            store = Some(st);
381                            store_fail_streak = 0;
382                        }
383                        Ok((st, raw, Err(e))) => {
384                            store_fail_streak += 1;
385                            if store_fail_streak >= MAX_STORE_APPLY_FAILURES {
386                                // A persistently failing store would otherwise
387                                // grow the re-queued batch without bound while
388                                // the fold silently stales. Fail-stop: the
389                                // restart refolds the tail from the store's
390                                // last good cursor.
391                                warn!(error = %e, streak = store_fail_streak,
392                                    "snapshot store apply failing persistently; aborting watch");
393                                handle.abort();
394                                return Err(KvError::WatchError(format!(
395                                    "snapshot store apply failed {store_fail_streak} consecutive times: {e}"
396                                )));
397                            }
398                            warn!(error = %e, streak = store_fail_streak,
399                                "snapshot store apply failed; batch re-queued for the next flush");
400                            store = Some(st);
401                            // Prepend: the failed range precedes anything
402                            // received since (stream order is preserved for
403                            // the eventual cumulative commit).
404                            let newer = std::mem::replace(&mut raw_batch, raw);
405                            raw_batch.extend(newer);
406                        }
407                        Err(e) => {
408                            warn!(error = %e, "snapshot store task panicked; aborting watch");
409                            handle.abort();
410                            return Err(KvError::WatchError(format!(
411                                "snapshot store task panicked: {e}"
412                            )));
413                        }
414                    }
415                }
416                if advanced {
417                    on_applied(applied.clone());
418                    batch_high = WatchCursor::none();
419                }
420            }
421            batch_deadline = None;
422        }};
423    }
424
425    // A single timer future, reset in place each time a batch opens. The old
426    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
427    // re-created on every loop iteration — one Arc-backed timer-wheel entry
428    // allocated, registered, and immediately dropped per received update.
429    // Pinning one future and `reset`-ing it reuses that single allocation; the
430    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
431    // its initial already-elapsed deadline is never observed.
432    let sleep = tokio::time::sleep(Duration::ZERO);
433    tokio::pin!(sleep);
434
435    loop {
436        tokio::select! {
437            biased;
438
439            // Shutdown wins: flush whatever is batched (so the cursor reflects
440            // it), abandon any updates still in flight on the channel — they
441            // weren't applied, the cursor doesn't claim them, and they'll be
442            // re-delivered on the next resume — and return the applied cursor.
443            res = shutdown.changed() => {
444                if res.is_err() || *shutdown.borrow() {
445                    flush!();
446                    handle.abort();
447                    // Observe the task's terminal state. An abort surfaces as a
448                    // cancelled JoinError, which we ignore; a genuine panic that
449                    // raced ahead of the abort is logged rather than silently lost.
450                    if let Err(join) = handle.await
451                        && !join.is_cancelled()
452                    {
453                        warn!(error = %join, "watch task panicked at shutdown");
454                    }
455                    return Ok(applied);
456                }
457            }
458
459            // Batch window elapsed.
460            () = &mut sleep, if batch_deadline.is_some() => {
461                flush!();
462            }
463
464            // Cursor-expired resync. Placed before `rx.recv()` (biased) so the
465            // synthetic deletes are folded before any update the fallback watch
466            // delivers — though the ack protocol already guarantees the fallback
467            // hasn't started while this arm runs. Diff the fold's in-scope keys
468            // against the bucket's live listing; anything the fold holds that
469            // the bucket no longer does vanished during the gap (its delete
470            // marker evicted with the cursor), so synthesize the delete the
471            // re-list can't deliver.
472            req = async { resyncs.as_mut().expect("arm guarded by is_some").recv().await },
473                if resyncs.is_some() => {
474                match req {
475                    Some(ResyncRequest { live_keys, ack }) => {
476                        // Flush first so the diff runs against a fold that
477                        // reflects everything received so far.
478                        flush!();
479                        let live: std::collections::HashSet<&str> =
480                            live_keys.iter().map(String::as_str).collect();
481                        let mut stale: Vec<String> = Vec::new();
482                        if let Some(st) = &store {
483                            for prefix in &scope_prefixes {
484                                // Stream the fold's keys rather than `range()`,
485                                // which buffers every in-scope entry — values
486                                // included — into one Vec. On an on-disk backend
487                                // holding a fold larger than RAM (the case those
488                                // backends exist for), an All-scope resync would
489                                // materialize the entire fold on the repair
490                                // path. Only the keys matter for the diff.
491                                if let Err(e) = st.for_each_in_range(prefix, |entry| {
492                                    if !live.contains(entry.key.as_str()) {
493                                        stale.push(entry.key);
494                                    }
495                                    Ok(())
496                                }) {
497                                    // FATAL, not a degrade: an incomplete
498                                    // diff silently leaves deleted keys in
499                                    // the fold forever (tests/model.rs
500                                    // proves the divergence reachable
501                                    // under degrade semantics). Fail the
502                                    // watch; the restart re-runs the
503                                    // resume → expiry → resync from
504                                    // scratch.
505                                    warn!(error = %e, prefix = %prefix,
506                                        "resync fold scan failed; aborting watch rather than diverging");
507                                    handle.abort();
508                                    return Err(KvError::WatchError(format!(
509                                        "cursor-expired resync failed listing fold prefix {prefix:?}: {e}"
510                                    )));
511                                }
512                            }
513                        }
514                        // Overlapping prefixes can list a key twice.
515                        stale.sort_unstable();
516                        stale.dedup();
517                        if !stale.is_empty() {
518                            warn!(stale = stale.len(), "cursor-expired resync: deleting keys that vanished during the gap");
519                        }
520                        for key in stale {
521                            // Synthetic: carries no revision (unknown version)
522                            // and so never advances the cursor.
523                            let u = KvUpdate::Delete {
524                                key,
525                                version: crate::kv::VersionToken::unknown(),
526                            };
527                            if store.is_some() {
528                                raw_batch.push(u.clone());
529                            }
530                            if let Some(parsed) = parse(&u) {
531                                batch.push(parsed);
532                            }
533                        }
534                        flush!();
535                        // Ack AFTER the deletes are applied: the watch task is
536                        // holding the fallback watch until it hears back, which
537                        // is what orders deletes before the re-list
538                        // (tests/model_resync_order.rs proves the barrier
539                        // load-bearing). If the flush's STORE apply failed
540                        // transiently, the deletes sit re-queued at the FRONT
541                        // of the raw batch — still strictly before any
542                        // re-list put in the eventual cumulative commit, and
543                        // the domain apply saw them before this ack either
544                        // way.
545                        let _ = ack.send(());
546                    }
547                    None => resyncs = None,
548                }
549            }
550
551            // Export request. Placed after shutdown/window (they stay prompt)
552            // and before `rx.recv()` so a firehose of updates cannot starve an
553            // export indefinitely. The pending batch is flushed first, so the
554            // exported cursor is exactly the applied cursor — except when
555            // that flush's store apply transiently failed (batch re-queued):
556            // the export then captures the store's OWN lagging cursor, which
557            // is still self-consistent with its contents (cursor authority,
558            // tests/model_applied.rs); the artifact never includes unfolded
559            // data, and a bootstrap from it simply replays the short gap.
560            // The export itself runs on a blocking task with the store moved
561            // in and taken back — the same offload the flush path uses.
562            req = async { exports.as_mut().expect("arm guarded by is_some").recv().await },
563                if exports.is_some() => {
564                match req {
565                    Some(ExportRequest { dest_dir, reply }) => {
566                        flush!();
567                        match store.take() {
568                            Some(mut st) => {
569                                match tokio::task::spawn_blocking(move || {
570                                    let res = st.export_to(&dest_dir);
571                                    (st, res)
572                                })
573                                .await
574                                {
575                                    // Hand the store back on any clean return; an
576                                    // export failure goes to the requester only —
577                                    // the watch keeps running (the snapshot is a
578                                    // cache). A panicked task lost the store,
579                                    // which breaks the resume guarantee: fatal,
580                                    // same as the flush path's apply panic.
581                                    Ok((st, res)) => {
582                                        store = Some(st);
583                                        let _ = reply.send(res);
584                                    }
585                                    Err(e) => {
586                                        warn!(error = %e, "snapshot export task panicked; aborting watch");
587                                        handle.abort();
588                                        return Err(KvError::WatchError(format!(
589                                            "snapshot export task panicked: {e}"
590                                        )));
591                                    }
592                                }
593                            }
594                            None => {
595                                let _ = reply.send(Err(SnapshotError::Backend(
596                                    "watch_applied runs without a snapshot store; nothing to export"
597                                        .into(),
598                                )));
599                            }
600                        }
601                    }
602                    // Sender dropped: disarm the arm for the rest of the run.
603                    None => exports = None,
604                }
605            }
606
607            update = rx.recv() => {
608                match update {
609                    Some(u) => {
610                        // Cursor authority: every received update bumps the
611                        // pending high-water, regardless of whether `parse`
612                        // keeps it — but only when it carries a real position.
613                        // An unknown version (e.g. an unparseable ACK subject
614                        // on the hand-built multi-prefix consumer path) must
615                        // neither mint a fake cursor nor clobber the real high
616                        // from earlier in the batch; skipping it under-advances
617                        // at worst, and re-delivery on resume is idempotent.
618                        if !u.version().is_unknown() {
619                            batch_high = WatchCursor::from_version(u.version().clone());
620                        }
621
622                        // Buffer the raw update for the durable store fold (which
623                        // commits the whole batch + cursor atomically on flush).
624                        // Done before `parse` consumes `u` by reference, and only
625                        // when a store is present so the no-persistence path keeps
626                        // its zero-copy cost.
627                        if store.is_some() {
628                            raw_batch.push(u.clone());
629                        }
630
631                        if let Some(parsed) = parse(&u) {
632                            batch.push(parsed);
633                        }
634
635                        // Arm the window on the first received update of a batch
636                        // — even a parse-rejected one, so the cursor advances
637                        // within `window` even through a run of irrelevant keys.
638                        // Reset the pinned timer to the new deadline rather than
639                        // allocating a fresh `Sleep`.
640                        if batch_deadline.is_none() {
641                            let deadline = tokio::time::Instant::now() + config.window;
642                            sleep.as_mut().reset(deadline);
643                            batch_deadline = Some(deadline);
644                        }
645
646                        // Flush on a full parsed batch, or — when persisting — a
647                        // full raw batch, so a window packed with parse-rejected
648                        // updates can't grow `raw_batch` without bound before the
649                        // window elapses.
650                        if batch.len() >= config.max || raw_batch.len() >= config.max {
651                            flush!();
652                        }
653                    }
654                    None => {
655                        // Stream closed. Flush the remainder, then surface the
656                        // watch task's terminal result: a clean end returns the
657                        // applied cursor, an error propagates.
658                        flush!();
659                        return match handle.await {
660                            Ok(Ok(())) => Ok(applied),
661                            Ok(Err(e)) => Err(e),
662                            Err(join) => Err(KvError::WatchError(format!(
663                                "watch task panicked: {join}"
664                            ))),
665                        };
666                    }
667                }
668            }
669        }
670    }
671}
672
673/// Run the underlying watch for `scope`, resuming from `resume` when it carries
674/// a position, with the [`KvError::CursorExpired`] → resync + full-watch
675/// fallback.
676async fn run_watch(
677    watcher: &dyn KvWatcher,
678    scope: &WatchScope,
679    resume: Option<WatchCursor>,
680    resync: Option<ResyncHandle>,
681    tx: mpsc::Sender<KvUpdate>,
682) -> Result<(), KvError> {
683    // Resume only when the cursor carries a real position; an absent or `none()`
684    // cursor falls through to a full watch. Binding `cursor` here makes "we have a
685    // resume position" structural — there is no separate bool whose truth a later
686    // edit could let drift from the `Some`.
687    let resume_cursor = resume.filter(|c| !c.is_none());
688
689    match scope {
690        WatchScope::All => {
691            if let Some(cursor) = resume_cursor {
692                match watcher.watch_all_from(&cursor, tx.clone()).await {
693                    Err(KvError::CursorExpired) => {
694                        warn!(
695                            "watch cursor expired; resyncing, then falling back to full watch_all"
696                        );
697                        resync_stale_keys(scope, &resync).await?;
698                        watcher.watch_all(tx).await
699                    }
700                    other => other,
701                }
702            } else {
703                watcher.watch_all(tx).await
704            }
705        }
706        WatchScope::Prefix(prefix) => {
707            if let Some(cursor) = resume_cursor {
708                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
709                    Err(KvError::CursorExpired) => {
710                        warn!(
711                            "watch cursor expired; resyncing, then falling back to full watch_prefix"
712                        );
713                        resync_stale_keys(scope, &resync).await?;
714                        watcher.watch_prefix(prefix, tx).await
715                    }
716                    other => other,
717                }
718            } else {
719                watcher.watch_prefix(prefix, tx).await
720            }
721        }
722        WatchScope::Prefixes(prefixes) => {
723            let refs: Vec<&str> = prefixes.iter().map(String::as_str).collect();
724            if let Some(cursor) = resume_cursor {
725                match watcher
726                    .watch_prefixes_from(&refs, &cursor, tx.clone())
727                    .await
728                {
729                    Err(KvError::CursorExpired) => {
730                        warn!(
731                            "watch cursor expired; resyncing, then falling back to full watch_prefixes"
732                        );
733                        resync_stale_keys(scope, &resync).await?;
734                        watcher.watch_prefixes(&refs, tx).await
735                    }
736                    other => other,
737                }
738            } else {
739                watcher.watch_prefixes(&refs, tx).await
740            }
741        }
742    }
743}
744
745/// Cursor-expired stale-key resync, run BEFORE the fallback watch is
746/// established: list the scope's live keys, hand them to the main loop (which
747/// diffs them against the fold and applies synthetic deletes), and wait for the
748/// ack. That ordering — deletes applied, then fallback watch armed — is what
749/// makes a delete-then-recreate during the gap converge: the synthetic delete
750/// always lands before the re-list put.
751///
752/// With no reader/store wired (`resync` is `None`) the caller explicitly opted
753/// out: warn and fall back re-list-only (keys deleted during the gap stay in
754/// the fold — `tests/model.rs` pins this divergence as reachable).
755///
756/// A FAILED listing, by contrast, is **fatal** — it fails the watch rather
757/// than degrading. The resync is load-bearing for the "stale, never corrupt"
758/// convergence guarantee: a silently degraded resync leaves the fold holding
759/// keys the bucket deleted, with one warn line as the only witness
760/// (`tests/model.rs` proves this divergence reachable under degrade
761/// semantics). Failing the watch turns the violated guarantee into a visible
762/// error; the caller's restart re-resumes, hits `CursorExpired` again, and
763/// retries the resync from scratch.
764async fn resync_stale_keys(
765    scope: &WatchScope,
766    resync: &Option<ResyncHandle>,
767) -> Result<(), KvError> {
768    let Some((reader, resync_tx)) = resync else {
769        warn!(
770            "no reader wired for cursor-expired resync; keys deleted during the gap may persist in the fold"
771        );
772        return Ok(());
773    };
774    let mut live_keys = Vec::new();
775    for prefix in scope.prefixes() {
776        match reader.keys(&prefix).await {
777            Ok(keys) => live_keys.extend(keys),
778            Err(e) => {
779                return Err(KvError::WatchError(format!(
780                    "cursor-expired resync failed listing live keys under {prefix:?}: {e}; \
781                     failing the watch rather than silently keeping stale keys"
782                )));
783            }
784        }
785    }
786    let (ack_tx, ack_rx) = oneshot::channel();
787    if resync_tx
788        .send(ResyncRequest {
789            live_keys,
790            ack: ack_tx,
791        })
792        .await
793        .is_ok()
794    {
795        // A dropped ack (main loop shutting down) just means the fallback watch
796        // is about to die with it; nothing to recover.
797        let _ = ack_rx.await;
798    }
799    Ok(())
800}
801
802#[cfg(test)]
803mod tests {
804    use super::*;
805    use crate::kv::{KvEntry, VersionToken};
806    use crate::snapshot::AppendLogSnapshot;
807    use async_trait::async_trait;
808    use std::sync::Mutex;
809    use std::sync::atomic::{AtomicU64, Ordering};
810    use tokio::sync::mpsc::Sender;
811
812    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
813        KvUpdate::Put(KvEntry {
814            key: key.to_string(),
815            value: value.to_vec(),
816            version: VersionToken::from_u64(rev),
817        })
818    }
819
820    /// A scripted watcher. Delivers a pre-set list of updates through the
821    /// channel, then either holds the channel open (so window/max/shutdown
822    /// flushes can be exercised without the stream ending) or returns cleanly
823    /// (so channel-close flushing can be exercised).
824    struct MockWatcher {
825        full: Mutex<Option<Vec<KvUpdate>>>,
826        from: Mutex<Option<Vec<KvUpdate>>>,
827        from_expires: bool,
828        hold: bool,
829    }
830
831    impl MockWatcher {
832        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
833            Self {
834                full: Mutex::new(Some(updates)),
835                from: Mutex::new(None),
836                from_expires: false,
837                hold,
838            }
839        }
840
841        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
842            let updates = which.lock().unwrap().take().unwrap_or_default();
843            for u in updates {
844                if tx.send(u).await.is_err() {
845                    return;
846                }
847            }
848            if self.hold {
849                // Keep `tx` alive (channel open) until this task is aborted.
850                std::future::pending::<()>().await;
851            }
852        }
853    }
854
855    #[async_trait]
856    impl KvWatcher for MockWatcher {
857        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
858            self.deliver(&self.full, tx).await;
859            Ok(())
860        }
861
862        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
863            self.deliver(&self.full, tx).await;
864            Ok(())
865        }
866
867        async fn watch_prefixes(
868            &self,
869            _prefixes: &[&str],
870            tx: Sender<KvUpdate>,
871        ) -> Result<(), KvError> {
872            // This mock scripts the applied-watch resumption tests, not prefix
873            // filtering; it delivers the same `full` script as `watch_prefix`.
874            // The real multi-filter scoping is proved in the NATS integration test.
875            self.deliver(&self.full, tx).await;
876            Ok(())
877        }
878
879        async fn watch_all_from(
880            &self,
881            _cursor: &WatchCursor,
882            tx: Sender<KvUpdate>,
883        ) -> Result<(), KvError> {
884            if self.from_expires {
885                return Err(KvError::CursorExpired);
886            }
887            self.deliver(&self.from, tx).await;
888            Ok(())
889        }
890
891        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
892        // are exercised against the same `from` script. Without this the trait's
893        // default impl would delegate to watch_prefix and silently deliver the
894        // full set instead of the delta.
895        async fn watch_prefix_from(
896            &self,
897            _prefix: &str,
898            _cursor: &WatchCursor,
899            tx: Sender<KvUpdate>,
900        ) -> Result<(), KvError> {
901            if self.from_expires {
902                return Err(KvError::CursorExpired);
903            }
904            self.deliver(&self.from, tx).await;
905            Ok(())
906        }
907
908        // Same mirroring for the multi-prefix resume arm.
909        async fn watch_prefixes_from(
910            &self,
911            _prefixes: &[&str],
912            _cursor: &WatchCursor,
913            tx: Sender<KvUpdate>,
914        ) -> Result<(), KvError> {
915            if self.from_expires {
916                return Err(KvError::CursorExpired);
917            }
918            self.deliver(&self.from, tx).await;
919            Ok(())
920        }
921    }
922
923    /// A reader whose `keys()` serves a scripted live listing — the only call
924    /// the cursor-expired resync makes. Filters by prefix like a real backend
925    /// so prefix-scoped resyncs are exercised faithfully.
926    struct MockReader {
927        live: Vec<String>,
928    }
929
930    #[async_trait]
931    impl KvReader for MockReader {
932        async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
933            unreachable!("resync only lists keys")
934        }
935
936        async fn entry(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
937            unreachable!("resync only lists keys")
938        }
939
940        async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
941            Ok(self
942                .live
943                .iter()
944                .filter(|k| k.starts_with(prefix))
945                .cloned()
946                .collect())
947        }
948
949        async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
950            unreachable!("resync only lists keys")
951        }
952    }
953
954    /// A watcher whose entry points all fail. Used to prove the watch task's
955    /// terminal error is surfaced out of `watch_applied` rather than swallowed
956    /// as a clean `Ok(applied)` when the channel closes.
957    struct ErrorWatcher;
958
959    #[async_trait]
960    impl KvWatcher for ErrorWatcher {
961        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
962            Err(KvError::WatchError("injected watch failure".into()))
963        }
964
965        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
966            Err(KvError::WatchError("injected watch failure".into()))
967        }
968
969        async fn watch_prefixes(
970            &self,
971            _prefixes: &[&str],
972            _tx: Sender<KvUpdate>,
973        ) -> Result<(), KvError> {
974            Err(KvError::WatchError("injected watch failure".into()))
975        }
976    }
977
978    // A no-op parse that keeps every Put as the value bytes; drops deletes.
979    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
980        match u {
981            KvUpdate::Put(e) => Some(e.value.clone()),
982            _ => None,
983        }
984    }
985
986    /// The stream closes (hold = false) with a pending batch; the remainder is
987    /// flushed before returning, the returned cursor is the last revision, and
988    /// `on_applied` ran exactly once after `apply`.
989    #[tokio::test]
990    async fn flush_on_channel_close() {
991        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
992        let watcher = Arc::new(MockWatcher::new(updates, false));
993
994        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
995        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
996
997        let ab = Arc::clone(&applied_batches);
998        let oc = Arc::clone(&on_applied_cursors);
999        let (_sd_tx, sd_rx) = watch::channel(false);
1000
1001        let cursor = watch_applied(
1002            watcher,
1003            WatchScope::All,
1004            None,
1005            None, // reader (no resync in this test)
1006            None::<AppendLogSnapshot>,
1007            None,
1008            BatchConfig::default(),
1009            parse_put,
1010            move |batch| ab.lock().unwrap().push(batch),
1011            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
1012            sd_rx,
1013        )
1014        .await
1015        .unwrap();
1016
1017        assert_eq!(cursor.as_u64(), Some(3));
1018        let batches = applied_batches.lock().unwrap();
1019        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
1020        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
1021        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
1022    }
1023
1024    /// Fewer than `max` updates, then the channel idles: the window timer must
1025    /// flush them and advance the cursor.
1026    #[tokio::test(start_paused = true)]
1027    async fn flush_on_window() {
1028        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1029        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1030
1031        let applied = Arc::new(AtomicU64::new(0));
1032        let count = Arc::new(AtomicU64::new(0));
1033        let a = Arc::clone(&applied);
1034        let c = Arc::clone(&count);
1035        let (sd_tx, sd_rx) = watch::channel(false);
1036
1037        let task = tokio::spawn(watch_applied(
1038            watcher,
1039            WatchScope::All,
1040            None,
1041            None, // reader (no resync in this test)
1042            None::<AppendLogSnapshot>,
1043            None,
1044            BatchConfig::default(),
1045            parse_put,
1046            move |batch: Vec<Vec<u8>>| {
1047                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
1048            },
1049            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1050            sd_rx,
1051        ));
1052
1053        // Let the window (10ms) elapse under virtual time.
1054        tokio::time::sleep(Duration::from_millis(50)).await;
1055        assert_eq!(
1056            count.load(Ordering::SeqCst),
1057            2,
1058            "window should have flushed"
1059        );
1060        assert_eq!(applied.load(Ordering::SeqCst), 2);
1061
1062        sd_tx.send(true).unwrap();
1063        let cursor = task.await.unwrap().unwrap();
1064        assert_eq!(cursor.as_u64(), Some(2));
1065    }
1066
1067    /// Exactly `max` updates fills a batch and flushes immediately — before the
1068    /// window would have elapsed.
1069    #[tokio::test(start_paused = true)]
1070    async fn flush_on_max() {
1071        let max = 4;
1072        let updates: Vec<_> = (1..=max as u64)
1073            .map(|i| put(&format!("k{i}"), b"v", i))
1074            .collect();
1075        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1076
1077        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
1078        let f = Arc::clone(&flushes);
1079        let (sd_tx, sd_rx) = watch::channel(false);
1080
1081        let task = tokio::spawn(watch_applied(
1082            watcher,
1083            WatchScope::All,
1084            None,
1085            None, // reader (no resync in this test)
1086            None::<AppendLogSnapshot>,
1087            None,
1088            BatchConfig {
1089                window: Duration::from_secs(3600), // effectively never
1090                max,
1091                ..BatchConfig::default()
1092            },
1093            parse_put,
1094            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
1095            move |_| {},
1096            sd_rx,
1097        ));
1098
1099        // Yield enough for the mock to push all `max` updates; the window is an
1100        // hour, so any flush is purely the max trigger.
1101        tokio::time::sleep(Duration::from_millis(1)).await;
1102        assert_eq!(
1103            *flushes.lock().unwrap(),
1104            vec![max],
1105            "a full batch should flush on max, not wait for the window"
1106        );
1107
1108        sd_tx.send(true).unwrap();
1109        task.await.unwrap().unwrap();
1110    }
1111
1112    /// A pending batch plus a shutdown signal: the batch is flushed and the
1113    /// applied cursor returned.
1114    #[tokio::test(start_paused = true)]
1115    async fn flush_on_shutdown() {
1116        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1117        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1118
1119        let applied = Arc::new(AtomicU64::new(0));
1120        let a = Arc::clone(&applied);
1121        let (sd_tx, sd_rx) = watch::channel(false);
1122
1123        let task = tokio::spawn(watch_applied(
1124            watcher,
1125            WatchScope::All,
1126            None,
1127            None, // reader (no resync in this test)
1128            None::<AppendLogSnapshot>,
1129            None,
1130            BatchConfig {
1131                window: Duration::from_secs(3600), // window won't fire
1132                max: 100,
1133                ..BatchConfig::default()
1134            },
1135            parse_put,
1136            move |_batch: Vec<Vec<u8>>| {},
1137            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1138            sd_rx,
1139        ));
1140
1141        // Give the mock time to deliver both updates into the pending batch.
1142        tokio::time::sleep(Duration::from_millis(1)).await;
1143        sd_tx.send(true).unwrap();
1144
1145        let cursor = task.await.unwrap().unwrap();
1146        assert_eq!(
1147            cursor.as_u64(),
1148            Some(2),
1149            "shutdown flushes the pending batch"
1150        );
1151        assert_eq!(applied.load(Ordering::SeqCst), 2);
1152    }
1153
1154    /// The cursor must not advance until `apply` has returned. We prove it by
1155    /// having `apply` read the cursor that `on_applied` last published: when the
1156    /// second batch is applied, the visible cursor must still be the *first*
1157    /// batch's — never the second's, which only becomes visible after this
1158    /// `apply` returns.
1159    #[tokio::test(start_paused = true)]
1160    async fn cursor_advances_only_after_apply() {
1161        // Two batches of `max` updates each.
1162        let max = 2usize;
1163        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
1164        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1165
1166        // Cursor as last published by on_applied; starts at 0 (nothing applied).
1167        let published = Arc::new(AtomicU64::new(0));
1168        // What `apply` observed as the published cursor at the moment it ran.
1169        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));
1170
1171        let pub_for_apply = Arc::clone(&published);
1172        let seen = Arc::clone(&seen_at_apply);
1173        let pub_for_on = Arc::clone(&published);
1174        let (sd_tx, sd_rx) = watch::channel(false);
1175
1176        let task = tokio::spawn(watch_applied(
1177            watcher,
1178            WatchScope::All,
1179            None,
1180            None, // reader (no resync in this test)
1181            None::<AppendLogSnapshot>,
1182            None,
1183            BatchConfig {
1184                window: Duration::from_secs(3600),
1185                max,
1186                ..BatchConfig::default()
1187            },
1188            parse_put,
1189            move |_batch: Vec<Vec<u8>>| {
1190                // The cursor visible here is whatever the PREVIOUS flush
1191                // published — never this batch's, because we haven't returned.
1192                seen.lock()
1193                    .unwrap()
1194                    .push(pub_for_apply.load(Ordering::SeqCst));
1195            },
1196            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1197            sd_rx,
1198        ));
1199
1200        tokio::time::sleep(Duration::from_millis(1)).await;
1201        sd_tx.send(true).unwrap();
1202        task.await.unwrap().unwrap();
1203
1204        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
1205        // batch's cursor), NOT 4. The cursor only reached 4 after the second
1206        // apply returned.
1207        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
1208        assert_eq!(published.load(Ordering::SeqCst), 4);
1209    }
1210
1211    /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
1212    /// domain work, but they were still received — so the cursor must advance
1213    /// over them.
1214    #[tokio::test]
1215    async fn corrupt_parse_entries_advance_cursor() {
1216        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
1217        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1218
1219        let apply_calls = Arc::new(AtomicU64::new(0));
1220        let on_applied_max = Arc::new(AtomicU64::new(0));
1221        let ac = Arc::clone(&apply_calls);
1222        let om = Arc::clone(&on_applied_max);
1223        let (_sd_tx, sd_rx) = watch::channel(false);
1224
1225        let cursor = watch_applied(
1226            watcher,
1227            WatchScope::All,
1228            None,
1229            None, // reader (no resync in this test)
1230            None::<AppendLogSnapshot>,
1231            None,
1232            BatchConfig::default(),
1233            // Reject everything — simulates corrupt/irrelevant entries.
1234            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
1235            move |batch: Vec<Vec<u8>>| {
1236                ac.fetch_add(1, Ordering::SeqCst);
1237                assert!(batch.is_empty());
1238            },
1239            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1240            sd_rx,
1241        )
1242        .await
1243        .unwrap();
1244
1245        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
1246        assert_eq!(
1247            apply_calls.load(Ordering::SeqCst),
1248            0,
1249            "an all-rejected batch applies nothing"
1250        );
1251        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1252    }
1253
1254    /// An update carrying the UNKNOWN version (an unparseable ACK subject on
1255    /// the hand-built multi-prefix consumer path) must neither mint a cursor
1256    /// position nor clobber the real high-water from earlier in the batch.
1257    /// Pre-guard behavior: `kv_message_to_update` fabricated revision 0 for
1258    /// such updates and the unconditional `batch_high = ...` adopted it,
1259    /// regressing the persisted cursor to 0. The update itself is still
1260    /// applied — only the cursor ignores it.
1261    #[tokio::test]
1262    async fn unknown_version_update_does_not_move_or_clobber_cursor() {
1263        let unknown_put = KvUpdate::Put(KvEntry {
1264            key: "u".to_string(),
1265            value: b"x".to_vec(),
1266            version: VersionToken::unknown(),
1267        });
1268        let updates = vec![put("a", b"1", 5), unknown_put];
1269        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1270
1271        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1272        let ab = Arc::clone(&applied_batches);
1273        let (_sd_tx, sd_rx) = watch::channel(false);
1274
1275        let cursor = watch_applied(
1276            watcher,
1277            WatchScope::All,
1278            None,
1279            None, // reader (no resync in this test)
1280            None::<AppendLogSnapshot>,
1281            None,
1282            BatchConfig::default(),
1283            parse_put,
1284            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1285            move |_| {},
1286            sd_rx,
1287        )
1288        .await
1289        .unwrap();
1290
1291        assert_eq!(
1292            cursor.as_u64(),
1293            Some(5),
1294            "the unknown-version update must not clobber the real batch high"
1295        );
1296        assert_eq!(
1297            *applied_batches.lock().unwrap(),
1298            vec![b"1".to_vec(), b"x".to_vec()],
1299            "the unknown-version update is still applied"
1300        );
1301    }
1302
1303    /// A resume whose cursor has expired falls back to the full watch and still
1304    /// applies the delivered updates.
1305    #[tokio::test]
1306    async fn cursor_expired_falls_back_to_full_watch() {
1307        let mock = MockWatcher {
1308            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
1309            from: Mutex::new(Some(vec![])),
1310            from_expires: true,
1311            hold: false,
1312        };
1313        let watcher = Arc::new(mock);
1314
1315        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1316        let ab = Arc::clone(&applied_batches);
1317        let (_sd_tx, sd_rx) = watch::channel(false);
1318
1319        let cursor = watch_applied(
1320            watcher,
1321            WatchScope::All,
1322            Some(WatchCursor::from_u64(5)), // resume position that "expired"
1323            None,                           // reader (no resync in this test)
1324            None::<AppendLogSnapshot>,
1325            None,
1326            BatchConfig::default(),
1327            parse_put,
1328            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1329            move |_| {},
1330            sd_rx,
1331        )
1332        .await
1333        .unwrap();
1334
1335        assert_eq!(cursor.as_u64(), Some(11));
1336        assert_eq!(
1337            *applied_batches.lock().unwrap(),
1338            vec![b"1".to_vec(), b"2".to_vec()],
1339            "fallback full watch's updates were applied"
1340        );
1341    }
1342
1343    /// Cursor-expired resync: with a reader + store wired, a key the fold holds
1344    /// that the live listing no longer does gets a synthetic delete — applied
1345    /// strictly BEFORE the fallback re-list — and the persisted fold converges
1346    /// to the live state. The synthetic delete (unknown version) must not move
1347    /// the cursor; the re-list put must.
1348    #[tokio::test]
1349    async fn cursor_expired_resync_deletes_stale_keys() {
1350        let dir = tempfile::TempDir::new().unwrap();
1351        let path = dir.path().join("resync.snap");
1352        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1353        // The fold from the previous run: node.a and node.b at cursor 2.
1354        store
1355            .apply(
1356                &[put("node.a", b"1", 1), put("node.b", b"2", 2)],
1357                &WatchCursor::from_u64(2),
1358            )
1359            .unwrap();
1360
1361        // During the gap node.b was deleted (marker since evicted) and node.a
1362        // updated; the resume cursor (2) has expired. The fallback re-list
1363        // therefore carries only the surviving key.
1364        let mock = MockWatcher {
1365            full: Mutex::new(Some(vec![put("node.a", b"1b", 10)])),
1366            from: Mutex::new(Some(vec![])),
1367            from_expires: true,
1368            hold: false,
1369        };
1370        let reader = MockReader {
1371            live: vec!["node.a".to_string()],
1372        };
1373
1374        // Record everything `parse` sees, in order, deletes included.
1375        let seen = Arc::new(Mutex::new(Vec::<(String, bool)>::new()));
1376        let s = Arc::clone(&seen);
1377        let (_sd_tx, sd_rx) = watch::channel(false);
1378
1379        let cursor = watch_applied(
1380            Arc::new(mock),
1381            WatchScope::All,
1382            Some(WatchCursor::from_u64(2)),
1383            Some(Arc::new(reader) as Arc<dyn KvReader>),
1384            Some(store),
1385            None,
1386            BatchConfig::default(),
1387            move |u: &KvUpdate| {
1388                s.lock()
1389                    .unwrap()
1390                    .push((u.key().to_string(), matches!(u, KvUpdate::Delete { .. })));
1391                Some(())
1392            },
1393            |_batch: Vec<()>| {},
1394            |_| {},
1395            sd_rx,
1396        )
1397        .await
1398        .unwrap();
1399
1400        // The re-list put advanced the cursor; the synthetic delete did not.
1401        assert_eq!(cursor.as_u64(), Some(10));
1402        // The synthetic delete strictly precedes the re-list put.
1403        assert_eq!(
1404            *seen.lock().unwrap(),
1405            vec![("node.b".to_string(), true), ("node.a".to_string(), false)],
1406            "synthetic delete must be applied before the fallback re-list"
1407        );
1408
1409        // The persisted fold converged: stale key gone, live key updated.
1410        let snap = crate::snapshot::load(&path).unwrap().unwrap();
1411        assert_eq!(snap.cursor.as_u64(), Some(10));
1412        assert_eq!(snap.entries.len(), 1);
1413        assert_eq!(snap.entries["node.a"].value, b"1b");
1414    }
1415
1416    /// A prefix-scoped resync diffs only in-scope keys: an out-of-scope key the
1417    /// fold holds survives, the in-scope stale key is deleted, and a flush
1418    /// containing only synthetic deletes leaves the cursor untouched.
1419    #[tokio::test]
1420    async fn cursor_expired_resync_respects_scope() {
1421        let dir = tempfile::TempDir::new().unwrap();
1422        let path = dir.path().join("resync-scope.snap");
1423        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1424        store
1425            .apply(
1426                &[put("node.b", b"2", 1), put("other.z", b"9", 2)],
1427                &WatchCursor::from_u64(2),
1428            )
1429            .unwrap();
1430
1431        // Expired resume; the bucket no longer has ANY node.* keys; the
1432        // fallback re-list is empty.
1433        let mock = MockWatcher {
1434            full: Mutex::new(Some(vec![])),
1435            from: Mutex::new(Some(vec![])),
1436            from_expires: true,
1437            hold: false,
1438        };
1439        let reader = MockReader { live: vec![] };
1440        let (_sd_tx, sd_rx) = watch::channel(false);
1441
1442        let cursor = watch_applied(
1443            Arc::new(mock),
1444            WatchScope::Prefix("node.".to_string()),
1445            Some(WatchCursor::from_u64(2)),
1446            Some(Arc::new(reader) as Arc<dyn KvReader>),
1447            Some(store),
1448            None,
1449            BatchConfig::default(),
1450            |_u: &KvUpdate| Some(()),
1451            |_batch: Vec<()>| {},
1452            |_| {},
1453            sd_rx,
1454        )
1455        .await
1456        .unwrap();
1457
1458        // Deletes-only flush: cursor stays at the resume position.
1459        assert_eq!(cursor.as_u64(), Some(2));
1460
1461        let snap = crate::snapshot::load(&path).unwrap().unwrap();
1462        assert_eq!(snap.cursor.as_u64(), Some(2));
1463        assert!(
1464            !snap.entries.contains_key("node.b"),
1465            "in-scope stale key must be resync-deleted"
1466        );
1467        assert_eq!(
1468            snap.entries["other.z"].value, b"9",
1469            "out-of-scope key must survive a prefix-scoped resync"
1470        );
1471    }
1472
1473    /// `WatchScope::Prefixes` dispatches to `watch_prefixes` (no resume) and to
1474    /// `watch_prefixes_from` with the expiry → full-watch fallback (resume).
1475    #[tokio::test]
1476    async fn prefixes_scope_dispatches_full_watch() {
1477        let updates = vec![put("a.x", b"1", 1), put("b.y", b"2", 2)];
1478        let watcher = Arc::new(MockWatcher::new(updates, false));
1479        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1480        let ab = Arc::clone(&applied_batches);
1481        let (_sd_tx, sd_rx) = watch::channel(false);
1482
1483        let cursor = watch_applied(
1484            watcher,
1485            WatchScope::Prefixes(vec!["a.".to_string(), "b.".to_string()]),
1486            None,
1487            None, // reader (no resync in this test)
1488            None::<AppendLogSnapshot>,
1489            None,
1490            BatchConfig::default(),
1491            parse_put,
1492            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1493            move |_| {},
1494            sd_rx,
1495        )
1496        .await
1497        .unwrap();
1498
1499        assert_eq!(cursor.as_u64(), Some(2));
1500        assert_eq!(
1501            *applied_batches.lock().unwrap(),
1502            vec![b"1".to_vec(), b"2".to_vec()]
1503        );
1504    }
1505
1506    /// `WatchScope::Prefixes` resume whose cursor has expired falls back to the
1507    /// full multi-prefix watch and applies its updates.
1508    #[tokio::test]
1509    async fn prefixes_scope_expired_resume_falls_back() {
1510        let mock = MockWatcher {
1511            full: Mutex::new(Some(vec![put("a.x", b"1", 7)])),
1512            from: Mutex::new(Some(vec![])),
1513            from_expires: true,
1514            hold: false,
1515        };
1516        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1517        let ab = Arc::clone(&applied_batches);
1518        let (_sd_tx, sd_rx) = watch::channel(false);
1519
1520        let cursor = watch_applied(
1521            Arc::new(mock),
1522            WatchScope::Prefixes(vec!["a.".to_string()]),
1523            Some(WatchCursor::from_u64(3)),
1524            None, // reader (no resync in this test)
1525            None::<AppendLogSnapshot>,
1526            None,
1527            BatchConfig::default(),
1528            parse_put,
1529            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1530            move |_| {},
1531            sd_rx,
1532        )
1533        .await
1534        .unwrap();
1535
1536        assert_eq!(cursor.as_u64(), Some(7));
1537        assert_eq!(*applied_batches.lock().unwrap(), vec![b"1".to_vec()]);
1538    }
1539
1540    /// End-to-end with a real snapshot file: after the run, the persisted
1541    /// snapshot's cursor equals the applied cursor and its entries match the
1542    /// applied state — proving the checkpoint is written at the post-apply
1543    /// cursor, never ahead of it.
1544    #[tokio::test]
1545    async fn snapshot_checkpoint_matches_applied_cursor() {
1546        let dir = tempfile::TempDir::new().unwrap();
1547        let path = dir.path().join("applied.snap");
1548        let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1549
1550        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1551        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1552        let (_sd_tx, sd_rx) = watch::channel(false);
1553
1554        let cursor = watch_applied(
1555            watcher,
1556            WatchScope::All,
1557            None,
1558            None, // reader (no resync in this test)
1559            Some(store),
1560            None,
1561            BatchConfig::default(),
1562            parse_put,
1563            move |_batch: Vec<Vec<u8>>| {},
1564            move |_| {},
1565            sd_rx,
1566        )
1567        .await
1568        .unwrap();
1569
1570        assert_eq!(cursor.as_u64(), Some(2));
1571
1572        let snap = crate::snapshot::load(&path).unwrap().unwrap();
1573        assert_eq!(
1574            snap.cursor.as_u64(),
1575            cursor.as_u64(),
1576            "snapshot checkpoint cursor must equal the applied cursor"
1577        );
1578        assert_eq!(snap.entries.len(), 2);
1579        assert_eq!(snap.entries["node.a"].value, b"1");
1580        assert_eq!(snap.entries["node.b"].value, b"2");
1581    }
1582
1583    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
1584    /// delta (the `from` script, NOT the full set) is applied. Proves the
1585    /// resume branch delivers only post-cursor updates and advances to their
1586    /// max revision.
1587    #[tokio::test]
1588    async fn resume_from_cursor_delivers_only_delta() {
1589        let mock = MockWatcher {
1590            // `full` would be delivered only if the resume path were (wrongly)
1591            // bypassed; a non-empty distinguishing value makes that visible.
1592            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
1593            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1594            from_expires: false,
1595            hold: false,
1596        };
1597        let watcher = Arc::new(mock);
1598
1599        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1600        let ab = Arc::clone(&applied_batches);
1601        let (_sd_tx, sd_rx) = watch::channel(false);
1602
1603        let cursor = watch_applied(
1604            watcher,
1605            WatchScope::All,
1606            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1607            None,                           // reader (no resync in this test)
1608            None::<AppendLogSnapshot>,
1609            None,
1610            BatchConfig::default(),
1611            parse_put,
1612            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1613            move |_| {},
1614            sd_rx,
1615        )
1616        .await
1617        .unwrap();
1618
1619        assert_eq!(
1620            cursor.as_u64(),
1621            Some(11),
1622            "cursor advances to the delta max"
1623        );
1624        assert_eq!(
1625            *applied_batches.lock().unwrap(),
1626            vec![b"3".to_vec(), b"4".to_vec()],
1627            "only the post-cursor delta is applied, never the full set"
1628        );
1629    }
1630
1631    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
1632    /// applies the delivered updates. Every other test uses `WatchScope::All`;
1633    /// this covers the prefix dispatch arm.
1634    #[tokio::test]
1635    async fn prefix_scope_applies_delivered_updates() {
1636        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1637        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1638
1639        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1640        let ab = Arc::clone(&applied_batches);
1641        let (_sd_tx, sd_rx) = watch::channel(false);
1642
1643        let cursor = watch_applied(
1644            watcher,
1645            WatchScope::Prefix("node.".to_string()),
1646            None,
1647            None, // reader (no resync in this test)
1648            None::<AppendLogSnapshot>,
1649            None,
1650            BatchConfig::default(),
1651            parse_put,
1652            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1653            move |_| {},
1654            sd_rx,
1655        )
1656        .await
1657        .unwrap();
1658
1659        assert_eq!(cursor.as_u64(), Some(2));
1660        assert_eq!(
1661            *applied_batches.lock().unwrap(),
1662            vec![b"1".to_vec(), b"2".to_vec()]
1663        );
1664    }
1665
1666    /// `WatchScope::Prefix` happy-path resume: a non-expired cursor takes the
1667    /// `watch_prefix_from` path and only the delta is applied — the prefix
1668    /// twin of `resume_from_cursor_delivers_only_delta`.
1669    #[tokio::test]
1670    async fn prefix_resume_from_cursor_delivers_only_delta() {
1671        let mock = MockWatcher {
1672            // `full` would be delivered only if the resume path were (wrongly)
1673            // bypassed; a distinguishing value makes that visible.
1674            full: Mutex::new(Some(vec![put("node.x", b"FULL", 1)])),
1675            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1676            from_expires: false,
1677            hold: false,
1678        };
1679        let watcher = Arc::new(mock);
1680
1681        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1682        let ab = Arc::clone(&applied_batches);
1683        let (_sd_tx, sd_rx) = watch::channel(false);
1684
1685        let cursor = watch_applied(
1686            watcher,
1687            WatchScope::Prefix("node.".to_string()),
1688            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1689            None,                           // reader (no resync in this test)
1690            None::<AppendLogSnapshot>,
1691            None,
1692            BatchConfig::default(),
1693            parse_put,
1694            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1695            move |_| {},
1696            sd_rx,
1697        )
1698        .await
1699        .unwrap();
1700
1701        assert_eq!(
1702            cursor.as_u64(),
1703            Some(11),
1704            "cursor advances to the delta max"
1705        );
1706        assert_eq!(
1707            *applied_batches.lock().unwrap(),
1708            vec![b"3".to_vec(), b"4".to_vec()],
1709            "only the post-cursor delta is applied via watch_prefix_from"
1710        );
1711    }
1712
1713    /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
1714    /// full `watch_prefix` and still applies the delivered updates — the prefix
1715    /// twin of `cursor_expired_falls_back_to_full_watch`.
1716    #[tokio::test]
1717    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
1718        let mock = MockWatcher {
1719            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
1720            from: Mutex::new(Some(vec![])),
1721            from_expires: true,
1722            hold: false,
1723        };
1724        let watcher = Arc::new(mock);
1725
1726        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1727        let ab = Arc::clone(&applied_batches);
1728        let (_sd_tx, sd_rx) = watch::channel(false);
1729
1730        let cursor = watch_applied(
1731            watcher,
1732            WatchScope::Prefix("node.".to_string()),
1733            Some(WatchCursor::from_u64(5)), // resume position that "expired"
1734            None,                           // reader (no resync in this test)
1735            None::<AppendLogSnapshot>,
1736            None,
1737            BatchConfig::default(),
1738            parse_put,
1739            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1740            move |_| {},
1741            sd_rx,
1742        )
1743        .await
1744        .unwrap();
1745
1746        assert_eq!(cursor.as_u64(), Some(11));
1747        assert_eq!(
1748            *applied_batches.lock().unwrap(),
1749            vec![b"1".to_vec(), b"2".to_vec()],
1750            "prefix fallback full watch's updates were applied"
1751        );
1752    }
1753
1754    /// The watch task's terminal error must propagate out of `watch_applied`
1755    /// rather than being swallowed as `Ok(applied)` when the channel closes.
1756    #[tokio::test]
1757    async fn watch_task_error_propagates() {
1758        let watcher = Arc::new(ErrorWatcher);
1759        let (_sd_tx, sd_rx) = watch::channel(false);
1760
1761        let result = watch_applied(
1762            watcher,
1763            WatchScope::All,
1764            None,
1765            None, // reader (no resync in this test)
1766            None::<AppendLogSnapshot>,
1767            None,
1768            BatchConfig::default(),
1769            parse_put,
1770            move |_batch: Vec<Vec<u8>>| {},
1771            move |_| {},
1772            sd_rx,
1773        )
1774        .await;
1775
1776        match result {
1777            Err(KvError::WatchError(msg)) => {
1778                assert!(msg.contains("injected"), "error carries the cause: {msg}");
1779            }
1780            other => panic!("expected WatchError, got {other:?}"),
1781        }
1782    }
1783
1784    /// A batch where `parse` accepts some updates and rejects others: the cursor
1785    /// must still advance to the highest *received* revision (covering the
1786    /// rejected entry in the middle), while `apply` sees only the accepted ones.
1787    #[tokio::test]
1788    async fn mixed_parse_advances_cursor_over_rejected_entries() {
1789        let updates = vec![
1790            put("keep.a", b"1", 5),
1791            put("skip.b", b"2", 6), // rejected by parse
1792            put("keep.c", b"3", 7),
1793        ];
1794        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1795
1796        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1797        let on_applied_max = Arc::new(AtomicU64::new(0));
1798        let ab = Arc::clone(&applied_batches);
1799        let om = Arc::clone(&on_applied_max);
1800        let (_sd_tx, sd_rx) = watch::channel(false);
1801
1802        let cursor = watch_applied(
1803            watcher,
1804            WatchScope::All,
1805            None,
1806            None, // reader (no resync in this test)
1807            None::<AppendLogSnapshot>,
1808            None,
1809            BatchConfig::default(),
1810            // Keep only keys under "keep."; reject everything else.
1811            |u: &KvUpdate| -> Option<Vec<u8>> {
1812                match u {
1813                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
1814                    _ => None,
1815                }
1816            },
1817            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1818            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1819            sd_rx,
1820        )
1821        .await
1822        .unwrap();
1823
1824        assert_eq!(
1825            cursor.as_u64(),
1826            Some(7),
1827            "cursor covers the rejected middle entry (rev 6)"
1828        );
1829        assert_eq!(
1830            *applied_batches.lock().unwrap(),
1831            vec![b"1".to_vec(), b"3".to_vec()],
1832            "apply sees only the accepted entries"
1833        );
1834        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1835    }
1836
1837    /// Shutdown before any update arrives: nothing was received, so the cursor
1838    /// stays at the resume position (here `none()`), `apply` never runs, and
1839    /// `on_applied` never fires.
1840    #[tokio::test(start_paused = true)]
1841    async fn shutdown_with_no_pending_batch() {
1842        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open
1843
1844        let apply_calls = Arc::new(AtomicU64::new(0));
1845        let on_applied_calls = Arc::new(AtomicU64::new(0));
1846        let ac = Arc::clone(&apply_calls);
1847        let oc = Arc::clone(&on_applied_calls);
1848        let (sd_tx, sd_rx) = watch::channel(false);
1849
1850        let task = tokio::spawn(watch_applied(
1851            watcher,
1852            WatchScope::All,
1853            None,
1854            None, // reader (no resync in this test)
1855            None::<AppendLogSnapshot>,
1856            None,
1857            BatchConfig::default(),
1858            parse_put,
1859            move |_batch: Vec<Vec<u8>>| {
1860                ac.fetch_add(1, Ordering::SeqCst);
1861            },
1862            move |_| {
1863                oc.fetch_add(1, Ordering::SeqCst);
1864            },
1865            sd_rx,
1866        ));
1867
1868        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
1869        tokio::time::sleep(Duration::from_millis(1)).await;
1870        sd_tx.send(true).unwrap();
1871
1872        let cursor = task.await.unwrap().unwrap();
1873        assert_eq!(
1874            cursor.as_u64(),
1875            None,
1876            "no updates received → cursor unmoved"
1877        );
1878        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
1879        assert_eq!(
1880            on_applied_calls.load(Ordering::SeqCst),
1881            0,
1882            "on_applied never fires"
1883        );
1884    }
1885
1886    /// An [`ExportRequest`] flushes the pending batch first, so the artifact's
1887    /// cursor is exactly the applied cursor — and the artifact is importable
1888    /// with the batched entries in it.
1889    #[tokio::test(start_paused = true)]
1890    async fn export_request_flushes_pending_batch_first() {
1891        let dir = tempfile::TempDir::new().unwrap();
1892        let store_path = dir.path().join("fold.snap");
1893        let artifact = dir.path().join("artifact");
1894        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1895
1896        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1897        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1898        let (sd_tx, sd_rx) = watch::channel(false);
1899        let (ex_tx, ex_rx) = mpsc::channel(1);
1900
1901        let task = tokio::spawn(watch_applied(
1902            watcher,
1903            WatchScope::All,
1904            None,
1905            None, // reader (no resync in this test)
1906            Some(store),
1907            Some(ex_rx),
1908            BatchConfig {
1909                window: Duration::from_secs(3600), // window never fires
1910                max: 100,
1911                ..BatchConfig::default()
1912            },
1913            parse_put,
1914            move |_batch: Vec<Vec<u8>>| {},
1915            move |_| {},
1916            sd_rx,
1917        ));
1918
1919        // Let both updates land in the (unflushed) pending batch, then export.
1920        tokio::time::sleep(Duration::from_millis(1)).await;
1921        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1922        ex_tx
1923            .send(ExportRequest {
1924                dest_dir: artifact.clone(),
1925                reply: reply_tx,
1926            })
1927            .await
1928            .unwrap();
1929
1930        let manifest = reply_rx.await.unwrap().expect("export succeeds");
1931        assert_eq!(
1932            manifest.cursor.as_u64(),
1933            Some(2),
1934            "pending batch flushed before export: artifact cursor is the applied cursor"
1935        );
1936
1937        // The artifact is importable and holds both batched entries.
1938        let (cursor, imported) =
1939            AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
1940                .unwrap();
1941        assert_eq!(cursor.as_u64(), Some(2));
1942        assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
1943        assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
1944
1945        sd_tx.send(true).unwrap();
1946        task.await.unwrap().unwrap();
1947    }
1948
1949    /// An [`ExportRequest`] that arrives with NOTHING pending (the window
1950    /// already flushed everything) still produces a valid artifact whose
1951    /// cursor is the applied cursor. The flush-before-export step must be a
1952    /// clean no-op, not an error or a cursor regression.
1953    #[tokio::test(start_paused = true)]
1954    async fn export_with_empty_pending_batch_succeeds() {
1955        let dir = tempfile::TempDir::new().unwrap();
1956        let store_path = dir.path().join("fold.snap");
1957        let artifact = dir.path().join("artifact");
1958        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1959
1960        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1961        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1962        let (sd_tx, sd_rx) = watch::channel(false);
1963        let (ex_tx, ex_rx) = mpsc::channel(1);
1964
1965        let task = tokio::spawn(watch_applied(
1966            watcher,
1967            WatchScope::All,
1968            None,
1969            None, // reader (no resync in this test)
1970            Some(store),
1971            Some(ex_rx),
1972            BatchConfig::default(), // 10 ms window
1973            parse_put,
1974            move |_batch: Vec<Vec<u8>>| {},
1975            move |_| {},
1976            sd_rx,
1977        ));
1978
1979        // Let the window flush both updates, so the export request finds an
1980        // EMPTY pending batch.
1981        tokio::time::sleep(Duration::from_millis(50)).await;
1982
1983        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1984        ex_tx
1985            .send(ExportRequest {
1986                dest_dir: artifact.clone(),
1987                reply: reply_tx,
1988            })
1989            .await
1990            .unwrap();
1991        let manifest = reply_rx
1992            .await
1993            .unwrap()
1994            .expect("export succeeds with nothing pending");
1995        assert_eq!(
1996            manifest.cursor.as_u64(),
1997            Some(2),
1998            "artifact cursor is the applied cursor, unchanged by the no-op flush"
1999        );
2000
2001        // The artifact is importable and holds the already-flushed entries.
2002        let (cursor, imported) =
2003            AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
2004                .unwrap();
2005        assert_eq!(cursor.as_u64(), Some(2));
2006        assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
2007        assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
2008
2009        sd_tx.send(true).unwrap();
2010        task.await.unwrap().unwrap();
2011    }
2012
2013    /// An export request against a store-less watch replies with an error and
2014    /// the watch keeps running.
2015    #[tokio::test(start_paused = true)]
2016    async fn export_without_store_replies_error() {
2017        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
2018        let (sd_tx, sd_rx) = watch::channel(false);
2019        let (ex_tx, ex_rx) = mpsc::channel(1);
2020
2021        let task = tokio::spawn(watch_applied(
2022            watcher,
2023            WatchScope::All,
2024            None,
2025            None, // reader (no resync in this test)
2026            None::<AppendLogSnapshot>,
2027            Some(ex_rx),
2028            BatchConfig::default(),
2029            parse_put,
2030            move |_batch: Vec<Vec<u8>>| {},
2031            move |_| {},
2032            sd_rx,
2033        ));
2034
2035        tokio::time::sleep(Duration::from_millis(1)).await;
2036        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
2037        ex_tx
2038            .send(ExportRequest {
2039                dest_dir: std::env::temp_dir().join("never-created"),
2040                reply: reply_tx,
2041            })
2042            .await
2043            .unwrap();
2044        assert!(
2045            reply_rx.await.unwrap().is_err(),
2046            "no store → export errors via the reply"
2047        );
2048
2049        // The watch is still alive and returns its applied cursor on shutdown.
2050        sd_tx.send(true).unwrap();
2051        let cursor = task.await.unwrap().unwrap();
2052        assert_eq!(cursor.as_u64(), Some(1));
2053    }
2054
2055    /// An export failure (unavailable destination) is reported on the reply and
2056    /// the watch keeps applying later updates.
2057    #[tokio::test(start_paused = true)]
2058    async fn export_error_does_not_kill_watch() {
2059        let dir = tempfile::TempDir::new().unwrap();
2060        let store_path = dir.path().join("fold.snap");
2061        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
2062
2063        // Occupied destination → export fails.
2064        let occupied = dir.path().join("occupied");
2065        std::fs::create_dir(&occupied).unwrap();
2066        std::fs::write(occupied.join("stray"), b"x").unwrap();
2067
2068        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
2069        let (sd_tx, sd_rx) = watch::channel(false);
2070        let (ex_tx, ex_rx) = mpsc::channel(1);
2071
2072        let applied = Arc::new(AtomicU64::new(0));
2073        let a = Arc::clone(&applied);
2074
2075        let task = tokio::spawn(watch_applied(
2076            watcher,
2077            WatchScope::All,
2078            None,
2079            None, // reader (no resync in this test)
2080            Some(store),
2081            Some(ex_rx),
2082            BatchConfig::default(),
2083            parse_put,
2084            move |_batch: Vec<Vec<u8>>| {},
2085            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2086            sd_rx,
2087        ));
2088
2089        tokio::time::sleep(Duration::from_millis(1)).await;
2090        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
2091        ex_tx
2092            .send(ExportRequest {
2093                dest_dir: occupied,
2094                reply: reply_tx,
2095            })
2096            .await
2097            .unwrap();
2098        match reply_rx.await.unwrap() {
2099            Err(crate::snapshot::SnapshotError::ArtifactInvalid(_)) => {}
2100            other => panic!("expected ArtifactInvalid, got {other:?}"),
2101        }
2102
2103        // Watch still folds: a clean shutdown returns the applied cursor.
2104        sd_tx.send(true).unwrap();
2105        let cursor = task.await.unwrap().unwrap();
2106        assert_eq!(cursor.as_u64(), Some(1), "watch survived the failed export");
2107        assert_eq!(applied.load(Ordering::SeqCst), 1);
2108    }
2109
2110    /// Dropping the export sender disarms the arm; the loop keeps batching and
2111    /// flushing normally.
2112    #[tokio::test(start_paused = true)]
2113    async fn export_sender_dropped_disarms_channel() {
2114        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
2115        let (sd_tx, sd_rx) = watch::channel(false);
2116        let (ex_tx, ex_rx) = mpsc::channel::<ExportRequest>(1);
2117
2118        let applied = Arc::new(AtomicU64::new(0));
2119        let a = Arc::clone(&applied);
2120
2121        let task = tokio::spawn(watch_applied(
2122            watcher,
2123            WatchScope::All,
2124            None,
2125            None, // reader (no resync in this test)
2126            None::<AppendLogSnapshot>,
2127            Some(ex_rx),
2128            BatchConfig::default(),
2129            parse_put,
2130            move |_batch: Vec<Vec<u8>>| {},
2131            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2132            sd_rx,
2133        ));
2134
2135        drop(ex_tx); // disarm
2136        tokio::time::sleep(Duration::from_millis(50)).await;
2137        assert_eq!(
2138            applied.load(Ordering::SeqCst),
2139            1,
2140            "loop keeps flushing after the export sender is gone"
2141        );
2142
2143        sd_tx.send(true).unwrap();
2144        task.await.unwrap().unwrap();
2145    }
2146
2147    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
2148    /// compaction actually fires (every other snapshot test pins the threshold
2149    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
2150    /// snapshot must still load cleanly with the right cursor and entries.
2151    #[tokio::test]
2152    async fn snapshot_compaction_fires_and_stays_consistent() {
2153        let dir = tempfile::TempDir::new().unwrap();
2154        let path = dir.path().join("applied.snap");
2155        // threshold 0 → every checkpoint reports "needs compact", forcing the
2156        // store's inline-compaction branch on each flush (run off the hot path via
2157        // spawn_blocking inside watch_applied).
2158        let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();
2159
2160        // Re-put the same key across flushes so compaction has duplicates to
2161        // dedup; small max forces multiple flushes (hence multiple compactions).
2162        let updates = vec![
2163            put("node.a", b"1", 1),
2164            put("node.a", b"2", 2),
2165            put("node.b", b"3", 3),
2166            put("node.a", b"4", 4),
2167        ];
2168        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
2169        let (_sd_tx, sd_rx) = watch::channel(false);
2170
2171        let cursor = watch_applied(
2172            watcher,
2173            WatchScope::All,
2174            None,
2175            None, // reader (no resync in this test)
2176            Some(store),
2177            None,
2178            BatchConfig {
2179                window: Duration::from_secs(3600),
2180                max: 1, // one update per flush → a compaction per update
2181                ..BatchConfig::default()
2182            },
2183            parse_put,
2184            move |_batch: Vec<Vec<u8>>| {},
2185            move |_| {},
2186            sd_rx,
2187        )
2188        .await
2189        .unwrap();
2190
2191        assert_eq!(cursor.as_u64(), Some(4));
2192
2193        let snap = crate::snapshot::load(&path).unwrap().unwrap();
2194        assert_eq!(
2195            snap.cursor.as_u64(),
2196            cursor.as_u64(),
2197            "compacted snapshot's cursor still equals the applied cursor"
2198        );
2199        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
2200        assert_eq!(
2201            snap.entries["node.a"].value, b"4",
2202            "last write per key survives compaction"
2203        );
2204        assert_eq!(snap.entries["node.b"].value, b"3");
2205    }
2206    /// A SnapshotStore whose FIRST apply fails (transient store error: disk
2207    /// pressure, lock timeout), then behaves normally — the trigger for the
2208    /// lost-raw-batch hazard in the flush path.
2209    struct FailOnceStore {
2210        inner: AppendLogSnapshot,
2211        failed: std::sync::atomic::AtomicBool,
2212    }
2213
2214    impl crate::snapshot::SnapshotStore for FailOnceStore {
2215        fn load(
2216            _path: &std::path::Path,
2217        ) -> Result<(WatchCursor, Self), crate::snapshot::SnapshotError> {
2218            unreachable!("test store is constructed directly")
2219        }
2220        fn apply(
2221            &mut self,
2222            batch: &[KvUpdate],
2223            cursor: &WatchCursor,
2224        ) -> Result<(), crate::snapshot::SnapshotError> {
2225            if !self.failed.swap(true, Ordering::SeqCst) {
2226                return Err(crate::snapshot::SnapshotError::Backend(
2227                    "injected transient store failure".into(),
2228                ));
2229            }
2230            self.inner.apply(batch, cursor)
2231        }
2232        fn get(&self, key: &str) -> Result<Option<KvEntry>, crate::snapshot::SnapshotError> {
2233            self.inner.get(key)
2234        }
2235        fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, crate::snapshot::SnapshotError> {
2236            self.inner.range(prefix)
2237        }
2238        fn cursor(&self) -> WatchCursor {
2239            self.inner.cursor()
2240        }
2241        fn export_to(
2242            &mut self,
2243            dest_dir: &std::path::Path,
2244        ) -> Result<crate::artifact::ExportManifest, crate::snapshot::SnapshotError> {
2245            self.inner.export_to(dest_dir)
2246        }
2247    }
2248
2249    /// CURSOR AUTHORITY under a transient store failure: a failed store apply
2250    /// must NOT cause later successful applies to advance the persisted
2251    /// cursor past data that never landed. The failed batch is re-queued and
2252    /// committed cumulatively with the next flush, so the store's cursor
2253    /// never lies about its contents — a restart resuming from it sees
2254    /// exactly the missing tail, not a silent hole.
2255    ///
2256    /// (Pre-fix behavior, found while writing the watch_applied model: the
2257    /// failed batch's raw updates were dropped on the warn-and-continue
2258    /// path, and the NEXT successful flush committed only newer updates
2259    /// under the newest cursor — a permanent, restart-surviving gap in the
2260    /// fold.)
2261    #[tokio::test(start_paused = true)]
2262    async fn transient_store_failure_never_leaves_a_cursor_gap() {
2263        let dir = tempfile::TempDir::new().unwrap();
2264        let path = dir.path().join("fold.snap");
2265        let (_r, inner) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2266        let store = FailOnceStore {
2267            inner,
2268            failed: std::sync::atomic::AtomicBool::new(false),
2269        };
2270
2271        // max: 1 -> one flush per update: flush #1 (a@1) hits the injected
2272        // failure, flush #2 (b@2) succeeds.
2273        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
2274        let watcher = Arc::new(MockWatcher::new(updates, true));
2275        let (sd_tx, sd_rx) = watch::channel(false);
2276
2277        let task = tokio::spawn(watch_applied(
2278            watcher,
2279            WatchScope::All,
2280            None,
2281            None,
2282            Some(store),
2283            None,
2284            BatchConfig {
2285                window: Duration::from_millis(1),
2286                max: 1,
2287                ..BatchConfig::default()
2288            },
2289            parse_put,
2290            |_batch: Vec<Vec<u8>>| {},
2291            |_| {},
2292            sd_rx,
2293        ));
2294
2295        tokio::time::sleep(Duration::from_millis(50)).await;
2296        sd_tx.send(true).unwrap();
2297        let cursor = task.await.unwrap().unwrap();
2298        assert_eq!(cursor.as_u64(), Some(2));
2299
2300        // The store on disk must be SELF-CONSISTENT: whatever its cursor
2301        // claims, the data at or below it is present. With the re-queue fix
2302        // the cumulative commit lands both keys at cursor 2.
2303        let (persisted, reopened) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2304        assert_eq!(persisted.as_u64(), Some(2), "cursor reached the head");
2305        assert_eq!(
2306            reopened.get("node.a").unwrap().map(|e| e.value),
2307            Some(b"1".to_vec()),
2308            "the transiently-failed batch was re-queued, not silently dropped \
2309             behind an advancing cursor"
2310        );
2311        assert_eq!(
2312            reopened.get("node.b").unwrap().map(|e| e.value),
2313            Some(b"2".to_vec())
2314        );
2315    }
2316    /// A reader whose live-key listing always fails — the resync's I/O
2317    /// failure mode.
2318    struct FailingReader;
2319
2320    #[async_trait]
2321    impl KvReader for FailingReader {
2322        async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
2323            unreachable!("resync only lists keys")
2324        }
2325        async fn entry(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
2326            unreachable!("resync only lists keys")
2327        }
2328        async fn keys(&self, _prefix: &str) -> Result<Vec<String>, KvError> {
2329            Err(KvError::OperationFailed("injected listing failure".into()))
2330        }
2331        async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
2332            unreachable!("resync only lists keys")
2333        }
2334    }
2335
2336    /// REGRESSION PIN (code-level twin of tests/model.rs's Degrade
2337    /// configuration): a resync whose live-key listing fails must FAIL THE
2338    /// WATCH, not degrade to re-list-only with a warning — the degrade
2339    /// semantics provably break the convergence theorem (silent stale keys).
2340    /// Reverting `resync_stale_keys` to warn-and-continue fails this test.
2341    #[tokio::test]
2342    async fn resync_listing_failure_is_fatal_not_degraded() {
2343        let dir = tempfile::TempDir::new().unwrap();
2344        let path = dir.path().join("resync-fatal.snap");
2345        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2346        store
2347            .apply(&[put("node.a", b"1", 1)], &WatchCursor::from_u64(1))
2348            .unwrap();
2349
2350        // Resume cursor expired -> resync path -> reader listing fails.
2351        let mock = MockWatcher {
2352            full: Mutex::new(Some(vec![])),
2353            from: Mutex::new(Some(vec![])),
2354            from_expires: true,
2355            hold: false,
2356        };
2357        let (_sd_tx, sd_rx) = watch::channel(false);
2358        let err = watch_applied(
2359            Arc::new(mock),
2360            WatchScope::All,
2361            Some(WatchCursor::from_u64(1)),
2362            Some(Arc::new(FailingReader) as Arc<dyn KvReader>),
2363            Some(store),
2364            None,
2365            BatchConfig::default(),
2366            parse_put,
2367            |_batch: Vec<Vec<u8>>| {},
2368            |_| {},
2369            sd_rx,
2370        )
2371        .await
2372        .expect_err("a failed resync listing must fail the watch");
2373        assert!(
2374            err.to_string().contains("resync failed listing live keys"),
2375            "{err}"
2376        );
2377    }
2378}