slipstream/
applied.rs

//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
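/*!
As a minimal sketch of that failure mode, assuming a toy log of bare revision
numbers, the model below contrasts the two cursor policies across a crash.
`Cursor`, `Policy`, and `run_once` are hypothetical names used only for
illustration; they are not part of this crate, and the real loop is
considerably more involved.

```rust
/// Hypothetical stand-in for a persisted resume cursor: the highest revision
/// the consumer claims to have handled.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Cursor(pub u64);

/// Which event moves the cursor forward.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Policy {
    /// Bump the cursor as each update is received (the tempting shortcut).
    OnReceipt,
    /// Bump the cursor only after the buffered batch has been applied.
    AfterApply,
}

/// Simulate one run over `revs` (ascending), resuming after `start`.
/// The run buffers every new revision, then either crashes before applying
/// the buffer or applies it. Returns the persisted cursor and the revisions
/// that were actually applied in this run.
pub fn run_once(
    policy: Policy,
    start: Cursor,
    revs: &[u64],
    crash_before_apply: bool,
) -> (Cursor, Vec<u64>) {
    let mut persisted = start;
    let mut buffer = Vec::new();
    // Resume: only revisions past the persisted cursor are delivered.
    for &rev in revs.iter().filter(|&&r| r > start.0) {
        buffer.push(rev);
        if policy == Policy::OnReceipt {
            persisted = Cursor(rev);
        }
    }
    if crash_before_apply {
        // Nothing was applied, but the cursor may already have moved.
        return (persisted, Vec::new());
    }
    // "Apply" the batch, then advance the cursor to its last revision.
    let applied = buffer;
    if let Some(&last) = applied.last() {
        persisted = Cursor(last);
    }
    (persisted, applied)
}
```

With `Policy::OnReceipt`, a crash before apply persists the cursor at the last
received revision, so the resumed run filters out every buffered revision and
applies nothing. With `Policy::AfterApply` the cursor stays put across the same
crash and the resumed run applies the full tail.
*/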
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
//!   applied cursor. Callers use it to persist the cursor for the next restart.
//!
//! ## Panics
//!
//! `apply` runs inline on the watch task. If it panics, the panic propagates out
//! of [`watch_applied`] and aborts the watch — that is the caller's contract,
//! the same as a panic in any other supplied closure.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::{oneshot, watch};
use tracing::warn;

use crate::artifact::ExportManifest;
use crate::kv::{KvError, KvReader, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::{SnapshotError, SnapshotStore};

/// A request, sent into a running [`watch_applied`] loop, to export the fold it
/// owns (see [`SnapshotStore::export_to`]).
///
/// `watch_applied` takes its snapshot store **by value**, so a consumer that
/// wants periodic artifacts of a live fold cannot call `export_to` itself. It
/// instead passes an `mpsc::Receiver<ExportRequest>` to [`watch_applied`] and
/// sends requests through the paired sender. The loop handles a request
/// between batch flushes — after flushing any pending batch — so the artifact's
/// embedded cursor is exactly the applied cursor at the moment of export.
///
/// The export result (or error) comes back on `reply`; an export failure is
/// reported there and the watch keeps running (the snapshot is a cache — a
/// failed artifact is the requester's problem, not the fold's).
pub struct ExportRequest {
    /// Where the artifact directory will be created. Must not exist (or be an
    /// empty directory); same filesystem as the fold for cheap hardlinks.
    pub dest_dir: PathBuf,
    /// Receives the sealed manifest on success. A dropped receiver is ignored.
    pub reply: oneshot::Sender<Result<ExportManifest, SnapshotError>>,
}

/// What to watch: every key, every key under a prefix, or the union of several
/// prefixes.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`,
/// `Prefixes` to `watch_prefixes` / `watch_prefixes_from` (one multi-filter
/// consumer for the whole union, never one consumer per prefix).
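/**
One way to picture how the three variants select keys, assuming plain
string-prefix matching. The actual filter semantics belong to the `KvWatcher`
backend and may differ; `Scope` and `in_scope` are hypothetical names kept
local so the sketch is self-contained.

```rust
/// Hypothetical stand-in for the scope enum.
#[derive(Debug, Clone)]
pub enum Scope {
    All,
    Prefix(String),
    Prefixes(Vec<String>),
}

/// Whether `key` falls inside `scope`, assuming plain prefix matching.
pub fn in_scope(scope: &Scope, key: &str) -> bool {
    match scope {
        // `All` behaves like the empty prefix: every key matches.
        Scope::All => true,
        Scope::Prefix(p) => key.starts_with(p.as_str()),
        // A union: a match on any one prefix is enough.
        Scope::Prefixes(ps) => ps.iter().any(|p| key.starts_with(p.as_str())),
    }
}
```

Under that assumption a `Prefixes` scope is a single membership test over the
whole union, which is the shape a single multi-filter consumer implies.
*/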
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
    /// Watch keys beginning with ANY of these prefixes, on a single consumer.
    Prefixes(Vec<String>),
}

impl WatchScope {
    /// The scope as a list of key prefixes (`All` = the empty prefix), for
    /// callers that enumerate scope contents (live listings, fold ranges).
    fn prefixes(&self) -> Vec<String> {
        match self {
            WatchScope::All => vec![String::new()],
            WatchScope::Prefix(p) => vec![p.clone()],
            WatchScope::Prefixes(ps) => ps.clone(),
        }
    }
}

/// Internal: a cursor-expired resync handoff from the watch task to the main
/// loop. Carries the bucket's live key listing for the watch scope; the main
/// loop diffs it against the fold and applies synthetic deletes for keys that
/// vanished during the gap, then acks so the watch task can start the fallback
/// watch — the ack ordering guarantees every synthetic delete is applied before
/// the first re-list put arrives.
struct ResyncRequest {
    live_keys: Vec<String>,
    ack: oneshot::Sender<()>,
}

/// Internal: what the watch task needs to initiate a resync — the reader that
/// lists live keys and the channel into the main loop that owns the fold.
type ResyncHandle = (Arc<dyn KvReader>, mpsc::Sender<ResyncRequest>);

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` updates have accumulated.
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update); `max` caps memory and latency when updates
/// arrive faster than the window.
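/**
The either-bound rule can be sketched as a pure predicate. This is an
illustration only: `Bounds` and `should_flush` are hypothetical names, and the
real loop drives the window with a timer rather than by polling elapsed time.

```rust
use std::time::Duration;

/// Hypothetical mirror of the two batching bounds.
#[derive(Debug, Clone, Copy)]
pub struct Bounds {
    pub window: Duration,
    pub max: usize,
}

/// True when an open batch that has existed for `open_for` and holds `len`
/// updates should be flushed: hitting either bound suffices.
/// `max(1)` treats a nonsensical `max = 0` as "flush on every update".
pub fn should_flush(bounds: Bounds, open_for: Duration, len: usize) -> bool {
    open_for >= bounds.window || len >= bounds.max.max(1)
}
```

With the 10 ms / 100 updates defaults, a trickle of updates flushes on the
window and a burst flushes on the count.
*/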
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of parsed updates in a batch before forcing a flush.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / folds the batch into `store` / calls `on_applied`.
/// Returns the final applied cursor when the watch ends (shutdown signalled, or
/// the underlying stream closed).
///
/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
/// its own impl) — or `None` to run without persistence. On each flush, *after*
/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
/// persisted cursor is always the post-apply cursor and never names a revision
/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
/// crash leaves the store consistent and resume re-folds only the tail.
///
/// # Cursor expiry and stale-key resync
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix` /
/// `watch_prefixes`), whose state-sync re-list re-delivers the current value of
/// every in-scope key as puts. The re-list alone cannot cover keys that were
/// **deleted during the gap** and whose delete markers the backend has since
/// evicted — they simply don't appear, leaving the fold (and the caller's
/// domain state) holding them forever.
///
/// When both `reader` and `store` are provided, the expiry path closes that
/// hole: before the fallback watch starts, the bucket's live keys are listed
/// via `reader`, diffed against the fold's in-scope keys, and a synthetic
/// [`KvUpdate::Delete`] (with an unknown [`VersionToken`](crate::VersionToken)) is run through
/// `parse`/`apply`/`store` for each key that vanished. The synthetic deletes
/// are strictly ordered before the first re-list put, so a key deleted and
/// re-created during the gap converges correctly. Without a `reader` (or
/// without a `store` to diff against) the fallback is re-list-only and a
/// warning marks the possible stale keys.
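/**
The diff step itself reduces to a set difference. A minimal sketch, assuming
both sides are already available as plain key lists (`stale_keys` is a
hypothetical helper, not a function of this crate):

```rust
use std::collections::HashSet;

/// Keys the fold still holds that the bucket no longer lists: the candidates
/// for synthetic deletes. `fold_keys` may contain duplicates (overlapping
/// prefixes can list a key twice); the result is sorted and deduplicated.
pub fn stale_keys(fold_keys: &[String], live_keys: &[String]) -> Vec<String> {
    let live: HashSet<&str> = live_keys.iter().map(String::as_str).collect();
    let mut stale: Vec<String> = fold_keys
        .iter()
        .filter(|k| !live.contains(k.as_str()))
        .cloned()
        .collect();
    stale.sort_unstable();
    stale.dedup();
    stale
}
```

Keys that exist only on the live side are deliberately ignored here: the
fallback watch's re-list delivers those as ordinary puts.
*/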
///
/// A resync that was armed but FAILS (live-key listing or fold diff error) is
/// fatal to the watch — degrading to re-list-only would silently leave
/// deleted keys in the fold (`tests/model.rs` proves that divergence
/// reachable), so the error surfaces and the caller's restart retries the
/// resume → expiry → resync path from scratch.
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline` for the next loop
// iteration. At the two flush sites that return immediately afterward (shutdown,
// channel-close) those resets are dead stores — correct, but flagged. The allow
// must sit on the function: a statement-scoped `#[allow]` inside the macro body
// trips the experimental attributes-on-expressions gate (E0658) on stable.
#[allow(unused_assignments)]
pub async fn watch_applied<U, S, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    // `Some(reader)` arms the cursor-expired stale-key resync (see the function
    // docs); `None` keeps the re-list-only fallback. Only consulted on expiry —
    // the hot path never touches it.
    reader: Option<Arc<dyn KvReader>>,
    mut store: Option<S>,
    // `Some(rx)` arms an export-request arm in the select loop: each
    // [`ExportRequest`] is handled between flushes (pending batch flushed
    // first), so the exported artifact's cursor is the applied cursor (or,
    // across a transiently failed store flush, the store's own lagging but
    // self-consistent cursor — never a cursor past unfolded data).
    // `None` (or dropping the paired sender) leaves the loop's behavior
    // unchanged.
    mut exports: Option<mpsc::Receiver<ExportRequest>>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    // `Send + 'static`: each flush moves `store` onto a blocking task to run its
    // (potentially blocking) `apply`, then takes it back — the same offload the
    // append log's compaction always used.
    S: SnapshotStore + Send + 'static,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is — everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // The scope's prefixes, for the resync diff against the fold. Cloned out
    // before `scope` moves into the watch task.
    let scope_prefixes = scope.prefixes();

    // Cursor-expired resync channel, armed only when there is a reader to list
    // live keys AND a store to diff them against. The watch task sends the live
    // listing here and waits for the ack before starting the fallback watch, so
    // synthetic deletes always precede the re-list.
    let (resync_pair, mut resyncs): (Option<ResyncHandle>, Option<mpsc::Receiver<ResyncRequest>>) =
        match reader {
            Some(reader) if store.is_some() => {
                let (rs_tx, rs_rx) = mpsc::channel(1);
                (Some((reader, rs_tx)), Some(rs_rx))
            }
            _ => (None, None),
        };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(
            async move { run_watch(watcher.as_ref(), &scope, resume, resync_pair, tx).await },
        )
    };

    // Batch state.
    //
    // Pre-size to the flush bound so no batch ever re-climbs the reallocation
    // ladder; `max(1)` only guards a nonsensical `max = 0` config.
    let batch_cap = config.max.max(1);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    // Raw received updates for the durable `store`, in revision order. Only
    // populated when a `store` is present; the store folds the *raw* updates
    // (including ones `parse` rejected — they are still part of the bucket's
    // state), whereas the parsed `batch` above is the consumer's domain view.
    let mut raw_batch: Vec<KvUpdate> = Vec::new();
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush — including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    let mut batch_high = WatchCursor::none();
    // Consecutive store-apply failures. A transient failure re-queues its raw
    // batch (cursor authority: the store's cursor and contents advance
    // together, always); a persistent streak fail-stops before the re-queued
    // backlog grows without bound.
    const MAX_STORE_APPLY_FAILURES: u32 = 16;
    let mut store_fail_streak: u32 = 0;
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop —
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch, in order: run the domain `apply` (if non-empty) to
    // completion, advance the cursor, fold the raw batch + cursor durably into
    // `store`, then fire `on_applied`. The store fold runs on a blocking task
    // (its `apply` may block on I/O), moving the store in and taking it back — the
    // same offload the append log's compaction always used. A TRANSIENT store
    // error re-queues the raw batch for cumulative commit on the next flush
    // (the watch continues; the store's cursor never advances past data it
    // dropped) and a persistent failure streak is fatal; a panicked
    // blocking task drops the store irrecoverably, which breaks the
    // resume-after-restart guarantee, so it is surfaced as fatal.
    macro_rules! flush {
        () => {{
            // Nothing received since the last flush → nothing to do at all.
            // (`raw_batch` can be non-empty with no cursor advance in two
            // cases: the resync path's synthetic deletes, which carry no
            // revision, and a batch re-queued after a transient store failure.)
            if !batch.is_empty() || !raw_batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind so each
                    // batch after the first doesn't re-climb the reallocation
                    // ladder (4→8→…→cap).
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                let advanced = !batch_high.is_none();
                if advanced {
                    applied = batch_high.clone();
                }
                if !raw_batch.is_empty()
                    && let Some(mut st) = store.take()
                {
                    let raw = std::mem::take(&mut raw_batch);
                    // Fold at the post-advance cursor. A synthetic-deletes-only
                    // batch leaves the cursor where it was (the deletes are a
                    // state correction, not log entries), which is safe: an
                    // unchanged — possibly expired — cursor only ever re-runs
                    // this same resync on the next restart.
                    let cur = applied.clone();
                    // Hand the store AND the raw batch back on a clean return:
                    // a *failed* apply (Ok(Err)) RE-QUEUES the batch so the
                    // next flush commits it cumulatively — the store's cursor
                    // and contents always advance together. Dropping the
                    // failed batch instead lets the NEXT successful flush
                    // advance the cursor over a hole that survives every
                    // restart (reproduced by
                    // `transient_store_failure_never_leaves_a_cursor_gap`).
                    // Only a *panicked* task (Err) loses the store: fatal.
                    match tokio::task::spawn_blocking(move || {
                        let res = st.apply(&raw, &cur);
                        (st, raw, res)
                    })
                    .await
                    {
                        Ok((st, _raw, Ok(()))) => {
                            store = Some(st);
                            store_fail_streak = 0;
                        }
                        Ok((st, raw, Err(e))) => {
                            store_fail_streak += 1;
                            if store_fail_streak >= MAX_STORE_APPLY_FAILURES {
                                // A persistently failing store would otherwise
                                // grow the re-queued batch without bound while
                                // the fold silently stales. Fail-stop: the
                                // restart refolds the tail from the store's
                                // last good cursor.
                                warn!(error = %e, streak = store_fail_streak,
                                    "snapshot store apply failing persistently; aborting watch");
                                handle.abort();
                                return Err(KvError::WatchError(format!(
                                    "snapshot store apply failed {store_fail_streak} consecutive times: {e}"
                                )));
                            }
                            warn!(error = %e, streak = store_fail_streak,
                                "snapshot store apply failed; batch re-queued for the next flush");
                            store = Some(st);
                            // Prepend: the failed range precedes anything
                            // received since (stream order is preserved for
                            // the eventual cumulative commit).
                            let newer = std::mem::replace(&mut raw_batch, raw);
                            raw_batch.extend(newer);
                        }
                        Err(e) => {
                            warn!(error = %e, "snapshot store task panicked; aborting watch");
                            handle.abort();
                            return Err(KvError::WatchError(format!(
                                "snapshot store task panicked: {e}"
                            )));
                        }
                    }
                }
                if advanced {
                    on_applied(applied.clone());
                    batch_high = WatchCursor::none();
                }
            }
            batch_deadline = None;
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration — one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel — they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume — and return the applied cursor.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush!();
            }

            // Cursor-expired resync. Placed before `rx.recv()` (biased) so the
            // synthetic deletes are folded before any update the fallback watch
            // delivers — though the ack protocol already guarantees the fallback
            // hasn't started while this arm runs. Diff the fold's in-scope keys
            // against the bucket's live listing; anything the fold holds that
            // the bucket no longer does vanished during the gap (its delete
            // marker evicted with the cursor), so synthesize the delete the
            // re-list can't deliver.
            req = async { resyncs.as_mut().expect("arm guarded by is_some").recv().await },
                if resyncs.is_some() => {
                match req {
                    Some(ResyncRequest { live_keys, ack }) => {
                        // Flush first so the diff runs against a fold that
                        // reflects everything received so far.
                        flush!();
                        let live: std::collections::HashSet<&str> =
                            live_keys.iter().map(String::as_str).collect();
                        let mut stale: Vec<String> = Vec::new();
                        if let Some(st) = &store {
                            for prefix in &scope_prefixes {
                                match st.range(prefix) {
                                    Ok(entries) => stale.extend(
                                        entries
                                            .into_iter()
                                            .filter(|e| !live.contains(e.key.as_str()))
                                            .map(|e| e.key),
                                    ),
                                    Err(e) => {
                                        // FATAL, not a degrade: an incomplete
                                        // diff silently leaves deleted keys in
                                        // the fold forever (tests/model.rs
                                        // proves the divergence reachable
                                        // under degrade semantics). Fail the
                                        // watch; the restart re-runs the
                                        // resume → expiry → resync from
                                        // scratch.
                                        warn!(error = %e, prefix = %prefix,
                                            "resync fold range failed; aborting watch rather than diverging");
                                        handle.abort();
                                        return Err(KvError::WatchError(format!(
                                            "cursor-expired resync failed listing fold prefix {prefix:?}: {e}"
                                        )));
                                    }
                                }
                            }
                        }
                        // Overlapping prefixes can list a key twice.
                        stale.sort_unstable();
                        stale.dedup();
                        if !stale.is_empty() {
                            warn!(stale = stale.len(), "cursor-expired resync: deleting keys that vanished during the gap");
                        }
                        for key in stale {
                            // Synthetic: carries no revision (unknown version)
                            // and so never advances the cursor.
                            let u = KvUpdate::Delete {
                                key,
                                version: crate::kv::VersionToken::unknown(),
                            };
                            if store.is_some() {
                                raw_batch.push(u.clone());
                            }
                            if let Some(parsed) = parse(&u) {
                                batch.push(parsed);
                            }
                        }
                        flush!();
                        // Ack AFTER the deletes are applied: the watch task is
                        // holding the fallback watch until it hears back, which
                        // is what orders deletes before the re-list
                        // (tests/model_resync_order.rs proves the barrier
                        // load-bearing). If the flush's STORE apply failed
                        // transiently, the deletes sit re-queued at the FRONT
                        // of the raw batch — still strictly before any
                        // re-list put in the eventual cumulative commit, and
                        // the domain apply saw them before this ack either
                        // way.
                        let _ = ack.send(());
                    }
                    None => resyncs = None,
                }
            }

            // Export request. Placed after shutdown/window (they stay prompt)
            // and before `rx.recv()` so a firehose of updates cannot starve an
            // export indefinitely. The pending batch is flushed first, so the
            // exported cursor is exactly the applied cursor — except when
            // that flush's store apply transiently failed (batch re-queued):
            // the export then captures the store's OWN lagging cursor, which
            // is still self-consistent with its contents (cursor authority,
            // tests/model_applied.rs); the artifact never includes unfolded
            // data, and a bootstrap from it simply replays the short gap.
            // The export itself runs on a blocking task with the store moved
            // in and taken back — the same offload the flush path uses.
            req = async { exports.as_mut().expect("arm guarded by is_some").recv().await },
                if exports.is_some() => {
                match req {
                    Some(ExportRequest { dest_dir, reply }) => {
                        flush!();
                        match store.take() {
                            Some(mut st) => {
                                match tokio::task::spawn_blocking(move || {
                                    let res = st.export_to(&dest_dir);
                                    (st, res)
                                })
                                .await
                                {
                                    // Hand the store back on any clean return; an
                                    // export failure goes to the requester only —
                                    // the watch keeps running (the snapshot is a
                                    // cache). A panicked task lost the store,
                                    // which breaks the resume guarantee: fatal,
                                    // same as the flush path's apply panic.
                                    Ok((st, res)) => {
                                        store = Some(st);
                                        let _ = reply.send(res);
                                    }
                                    Err(e) => {
                                        warn!(error = %e, "snapshot export task panicked; aborting watch");
                                        handle.abort();
                                        return Err(KvError::WatchError(format!(
                                            "snapshot export task panicked: {e}"
                                        )));
                                    }
                                }
                            }
                            None => {
                                let _ = reply.send(Err(SnapshotError::Backend(
                                    "watch_applied runs without a snapshot store; nothing to export"
                                        .into(),
                                )));
                            }
                        }
                    }
                    // Sender dropped: disarm the arm for the rest of the run.
                    None => exports = None,
                }
            }

593            update = rx.recv() => {
594                match update {
595                    Some(u) => {
596                        // Cursor authority: every received update bumps the
597                        // pending high-water, regardless of whether `parse`
598                        // keeps it.
599                        batch_high = WatchCursor::from_version(u.version().clone());
600
                        // Buffer the raw update for the durable store fold (which
                        // commits the whole batch + cursor atomically on flush).
                        // Cloned before `parse` borrows `u`, and only when a store
                        // is present, so the no-persistence path avoids the copy
                        // entirely.
606                        if store.is_some() {
607                            raw_batch.push(u.clone());
608                        }
609
610                        if let Some(parsed) = parse(&u) {
611                            batch.push(parsed);
612                        }
613
614                        // Arm the window on the first received update of a batch
615                        // — even a parse-rejected one, so the cursor advances
616                        // within `window` even through a run of irrelevant keys.
617                        // Reset the pinned timer to the new deadline rather than
618                        // allocating a fresh `Sleep`.
619                        if batch_deadline.is_none() {
620                            let deadline = tokio::time::Instant::now() + config.window;
621                            sleep.as_mut().reset(deadline);
622                            batch_deadline = Some(deadline);
623                        }
624
625                        // Flush on a full parsed batch, or — when persisting — a
626                        // full raw batch, so a window packed with parse-rejected
627                        // updates can't grow `raw_batch` without bound before the
628                        // window elapses.
629                        if batch.len() >= config.max || raw_batch.len() >= config.max {
630                            flush!();
631                        }
632                    }
633                    None => {
634                        // Stream closed. Flush the remainder, then surface the
635                        // watch task's terminal result: a clean end returns the
636                        // applied cursor, an error propagates.
637                        flush!();
638                        return match handle.await {
639                            Ok(Ok(())) => Ok(applied),
640                            Ok(Err(e)) => Err(e),
641                            Err(join) => Err(KvError::WatchError(format!(
642                                "watch task panicked: {join}"
643                            ))),
644                        };
645                    }
646                }
647            }
648        }
649    }
650}
651
/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position. If the resume attempt fails with [`KvError::CursorExpired`],
/// resync stale keys and then fall back to a full watch.
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    resync: Option<ResyncHandle>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Filtering into `resume_cursor` here
    // makes "we have a resume position" structural: there is no separate bool
    // whose truth a later edit could let drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_all"
                        );
                        resync_stale_keys(scope, &resync).await?;
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_prefix"
                        );
                        resync_stale_keys(scope, &resync).await?;
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
        WatchScope::Prefixes(prefixes) => {
            let refs: Vec<&str> = prefixes.iter().map(String::as_str).collect();
            if let Some(cursor) = resume_cursor {
                match watcher
                    .watch_prefixes_from(&refs, &cursor, tx.clone())
                    .await
                {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_prefixes"
                        );
                        resync_stale_keys(scope, &resync).await?;
                        watcher.watch_prefixes(&refs, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefixes(&refs, tx).await
            }
        }
    }
}
723
/// Cursor-expired stale-key resync, run BEFORE the fallback watch is
/// established: list the scope's live keys, hand them to the main loop (which
/// diffs them against the fold and applies synthetic deletes), and wait for the
/// ack. That ordering (deletes applied, then fallback watch armed) is what
/// makes a delete-then-recreate during the gap converge: the synthetic delete
/// always lands before the re-list put.
///
/// With no reader/store wired (`resync` is `None`) the caller explicitly opted
/// out: warn and fall back to a re-list-only watch. Keys deleted during the gap
/// then stay in the fold; `tests/model.rs` pins this divergence as reachable.
///
/// A FAILED listing, by contrast, is **fatal**: it fails the watch rather
/// than degrading. The resync is load-bearing for the "stale, never corrupt"
/// convergence guarantee: a silently degraded resync leaves the fold holding
/// keys the bucket deleted, with one warn line as the only witness
/// (`tests/model.rs` proves this divergence reachable under degrade
/// semantics). Failing the watch turns the violated guarantee into a visible
/// error; the caller's restart re-resumes, hits `CursorExpired` again, and
/// retries the resync from scratch.
async fn resync_stale_keys(
    scope: &WatchScope,
    resync: &Option<ResyncHandle>,
) -> Result<(), KvError> {
    let Some((reader, resync_tx)) = resync else {
        warn!(
            "no reader wired for cursor-expired resync; keys deleted during the gap may persist in the fold"
        );
        return Ok(());
    };
    let mut live_keys = Vec::new();
    for prefix in scope.prefixes() {
        match reader.keys(&prefix).await {
            Ok(keys) => live_keys.extend(keys),
            Err(e) => {
                return Err(KvError::WatchError(format!(
                    "cursor-expired resync failed listing live keys under {prefix:?}: {e}; \
                     failing the watch rather than silently keeping stale keys"
                )));
            }
        }
    }
    let (ack_tx, ack_rx) = oneshot::channel();
    if resync_tx
        .send(ResyncRequest {
            live_keys,
            ack: ack_tx,
        })
        .await
        .is_ok()
    {
        // A dropped ack (main loop shutting down) just means the fallback watch
        // is about to die with it; nothing to recover.
        let _ = ack_rx.await;
    }
    Ok(())
}
780
781#[cfg(test)]
782mod tests {
783    use super::*;
784    use crate::kv::{KvEntry, VersionToken};
785    use crate::snapshot::AppendLogSnapshot;
786    use async_trait::async_trait;
787    use std::sync::Mutex;
788    use std::sync::atomic::{AtomicU64, Ordering};
789    use tokio::sync::mpsc::Sender;
790
791    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
792        KvUpdate::Put(KvEntry {
793            key: key.to_string(),
794            value: value.to_vec(),
795            version: VersionToken::from_u64(rev),
796        })
797    }
798
799    /// A scripted watcher. Delivers a pre-set list of updates through the
800    /// channel, then either holds the channel open (so window/max/shutdown
801    /// flushes can be exercised without the stream ending) or returns cleanly
802    /// (so channel-close flushing can be exercised).
803    struct MockWatcher {
804        full: Mutex<Option<Vec<KvUpdate>>>,
805        from: Mutex<Option<Vec<KvUpdate>>>,
806        from_expires: bool,
807        hold: bool,
808    }
809
810    impl MockWatcher {
811        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
812            Self {
813                full: Mutex::new(Some(updates)),
814                from: Mutex::new(None),
815                from_expires: false,
816                hold,
817            }
818        }
819
820        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
821            let updates = which.lock().unwrap().take().unwrap_or_default();
822            for u in updates {
823                if tx.send(u).await.is_err() {
824                    return;
825                }
826            }
827            if self.hold {
828                // Keep `tx` alive (channel open) until this task is aborted.
829                std::future::pending::<()>().await;
830            }
831        }
832    }
833
834    #[async_trait]
835    impl KvWatcher for MockWatcher {
836        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
837            self.deliver(&self.full, tx).await;
838            Ok(())
839        }
840
841        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
842            self.deliver(&self.full, tx).await;
843            Ok(())
844        }
845
846        async fn watch_prefixes(
847            &self,
848            _prefixes: &[&str],
849            tx: Sender<KvUpdate>,
850        ) -> Result<(), KvError> {
851            // This mock scripts the applied-watch resumption tests, not prefix
852            // filtering; it delivers the same `full` script as `watch_prefix`.
853            // The real multi-filter scoping is proved in the NATS integration test.
854            self.deliver(&self.full, tx).await;
855            Ok(())
856        }
857
858        async fn watch_all_from(
859            &self,
860            _cursor: &WatchCursor,
861            tx: Sender<KvUpdate>,
862        ) -> Result<(), KvError> {
863            if self.from_expires {
864                return Err(KvError::CursorExpired);
865            }
866            self.deliver(&self.from, tx).await;
867            Ok(())
868        }
869
870        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
871        // are exercised against the same `from` script. Without this the trait's
872        // default impl would delegate to watch_prefix and silently deliver the
873        // full set instead of the delta.
874        async fn watch_prefix_from(
875            &self,
876            _prefix: &str,
877            _cursor: &WatchCursor,
878            tx: Sender<KvUpdate>,
879        ) -> Result<(), KvError> {
880            if self.from_expires {
881                return Err(KvError::CursorExpired);
882            }
883            self.deliver(&self.from, tx).await;
884            Ok(())
885        }
886
887        // Same mirroring for the multi-prefix resume arm.
888        async fn watch_prefixes_from(
889            &self,
890            _prefixes: &[&str],
891            _cursor: &WatchCursor,
892            tx: Sender<KvUpdate>,
893        ) -> Result<(), KvError> {
894            if self.from_expires {
895                return Err(KvError::CursorExpired);
896            }
897            self.deliver(&self.from, tx).await;
898            Ok(())
899        }
900    }
901
902    /// A reader whose `keys()` serves a scripted live listing — the only call
903    /// the cursor-expired resync makes. Filters by prefix like a real backend
904    /// so prefix-scoped resyncs are exercised faithfully.
905    struct MockReader {
906        live: Vec<String>,
907    }
908
909    #[async_trait]
910    impl KvReader for MockReader {
911        async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
912            unreachable!("resync only lists keys")
913        }
914
915        async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
916            Ok(self
917                .live
918                .iter()
919                .filter(|k| k.starts_with(prefix))
920                .cloned()
921                .collect())
922        }
923
924        async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
925            unreachable!("resync only lists keys")
926        }
927    }
928
929    /// A watcher whose entry points all fail. Used to prove the watch task's
930    /// terminal error is surfaced out of `watch_applied` rather than swallowed
931    /// as a clean `Ok(applied)` when the channel closes.
932    struct ErrorWatcher;
933
934    #[async_trait]
935    impl KvWatcher for ErrorWatcher {
936        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
937            Err(KvError::WatchError("injected watch failure".into()))
938        }
939
940        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
941            Err(KvError::WatchError("injected watch failure".into()))
942        }
943
944        async fn watch_prefixes(
945            &self,
946            _prefixes: &[&str],
947            _tx: Sender<KvUpdate>,
948        ) -> Result<(), KvError> {
949            Err(KvError::WatchError("injected watch failure".into()))
950        }
951    }
952
    // A trivial parse: keeps every Put's value bytes and drops everything else.
954    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
955        match u {
956            KvUpdate::Put(e) => Some(e.value.clone()),
957            _ => None,
958        }
959    }
960
    /// The stream closes (hold = false) with a pending batch; the remainder is
    /// flushed before returning, the returned cursor is the last revision, and
    /// the last cursor `on_applied` reported is that same revision.
964    #[tokio::test]
965    async fn flush_on_channel_close() {
966        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
967        let watcher = Arc::new(MockWatcher::new(updates, false));
968
969        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
970        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
971
972        let ab = Arc::clone(&applied_batches);
973        let oc = Arc::clone(&on_applied_cursors);
974        let (_sd_tx, sd_rx) = watch::channel(false);
975
976        let cursor = watch_applied(
977            watcher,
978            WatchScope::All,
979            None,
980            None, // reader (no resync in this test)
981            None::<AppendLogSnapshot>,
982            None,
983            BatchConfig::default(),
984            parse_put,
985            move |batch| ab.lock().unwrap().push(batch),
986            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
987            sd_rx,
988        )
989        .await
990        .unwrap();
991
992        assert_eq!(cursor.as_u64(), Some(3));
993        let batches = applied_batches.lock().unwrap();
994        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
995        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
996        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
997    }
998
999    /// Fewer than `max` updates, then the channel idles: the window timer must
1000    /// flush them and advance the cursor.
1001    #[tokio::test(start_paused = true)]
1002    async fn flush_on_window() {
1003        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1004        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1005
1006        let applied = Arc::new(AtomicU64::new(0));
1007        let count = Arc::new(AtomicU64::new(0));
1008        let a = Arc::clone(&applied);
1009        let c = Arc::clone(&count);
1010        let (sd_tx, sd_rx) = watch::channel(false);
1011
1012        let task = tokio::spawn(watch_applied(
1013            watcher,
1014            WatchScope::All,
1015            None,
1016            None, // reader (no resync in this test)
1017            None::<AppendLogSnapshot>,
1018            None,
1019            BatchConfig::default(),
1020            parse_put,
1021            move |batch: Vec<Vec<u8>>| {
1022                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
1023            },
1024            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1025            sd_rx,
1026        ));
1027
1028        // Let the window (10ms) elapse under virtual time.
1029        tokio::time::sleep(Duration::from_millis(50)).await;
1030        assert_eq!(
1031            count.load(Ordering::SeqCst),
1032            2,
1033            "window should have flushed"
1034        );
1035        assert_eq!(applied.load(Ordering::SeqCst), 2);
1036
1037        sd_tx.send(true).unwrap();
1038        let cursor = task.await.unwrap().unwrap();
1039        assert_eq!(cursor.as_u64(), Some(2));
1040    }
1041
    /// Exactly `max` updates fill a batch and flush immediately, before the
    /// window would have elapsed.
1044    #[tokio::test(start_paused = true)]
1045    async fn flush_on_max() {
1046        let max = 4;
1047        let updates: Vec<_> = (1..=max as u64)
1048            .map(|i| put(&format!("k{i}"), b"v", i))
1049            .collect();
1050        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1051
1052        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
1053        let f = Arc::clone(&flushes);
1054        let (sd_tx, sd_rx) = watch::channel(false);
1055
1056        let task = tokio::spawn(watch_applied(
1057            watcher,
1058            WatchScope::All,
1059            None,
1060            None, // reader (no resync in this test)
1061            None::<AppendLogSnapshot>,
1062            None,
1063            BatchConfig {
1064                window: Duration::from_secs(3600), // effectively never
1065                max,
1066            },
1067            parse_put,
1068            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
1069            move |_| {},
1070            sd_rx,
1071        ));
1072
1073        // Yield enough for the mock to push all `max` updates; the window is an
1074        // hour, so any flush is purely the max trigger.
1075        tokio::time::sleep(Duration::from_millis(1)).await;
1076        assert_eq!(
1077            *flushes.lock().unwrap(),
1078            vec![max],
1079            "a full batch should flush on max, not wait for the window"
1080        );
1081
1082        sd_tx.send(true).unwrap();
1083        task.await.unwrap().unwrap();
1084    }
1085
1086    /// A pending batch plus a shutdown signal: the batch is flushed and the
1087    /// applied cursor returned.
1088    #[tokio::test(start_paused = true)]
1089    async fn flush_on_shutdown() {
1090        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1091        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1092
1093        let applied = Arc::new(AtomicU64::new(0));
1094        let a = Arc::clone(&applied);
1095        let (sd_tx, sd_rx) = watch::channel(false);
1096
1097        let task = tokio::spawn(watch_applied(
1098            watcher,
1099            WatchScope::All,
1100            None,
1101            None, // reader (no resync in this test)
1102            None::<AppendLogSnapshot>,
1103            None,
1104            BatchConfig {
1105                window: Duration::from_secs(3600), // window won't fire
1106                max: 100,
1107            },
1108            parse_put,
1109            move |_batch: Vec<Vec<u8>>| {},
1110            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1111            sd_rx,
1112        ));
1113
1114        // Give the mock time to deliver both updates into the pending batch.
1115        tokio::time::sleep(Duration::from_millis(1)).await;
1116        sd_tx.send(true).unwrap();
1117
1118        let cursor = task.await.unwrap().unwrap();
1119        assert_eq!(
1120            cursor.as_u64(),
1121            Some(2),
1122            "shutdown flushes the pending batch"
1123        );
1124        assert_eq!(applied.load(Ordering::SeqCst), 2);
1125    }
1126
1127    /// The cursor must not advance until `apply` has returned. We prove it by
1128    /// having `apply` read the cursor that `on_applied` last published: when the
1129    /// second batch is applied, the visible cursor must still be the *first*
1130    /// batch's — never the second's, which only becomes visible after this
1131    /// `apply` returns.
1132    #[tokio::test(start_paused = true)]
1133    async fn cursor_advances_only_after_apply() {
1134        // Two batches of `max` updates each.
1135        let max = 2usize;
1136        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
1137        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1138
1139        // Cursor as last published by on_applied; starts at 0 (nothing applied).
1140        let published = Arc::new(AtomicU64::new(0));
1141        // What `apply` observed as the published cursor at the moment it ran.
1142        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));
1143
1144        let pub_for_apply = Arc::clone(&published);
1145        let seen = Arc::clone(&seen_at_apply);
1146        let pub_for_on = Arc::clone(&published);
1147        let (sd_tx, sd_rx) = watch::channel(false);
1148
1149        let task = tokio::spawn(watch_applied(
1150            watcher,
1151            WatchScope::All,
1152            None,
1153            None, // reader (no resync in this test)
1154            None::<AppendLogSnapshot>,
1155            None,
1156            BatchConfig {
1157                window: Duration::from_secs(3600),
1158                max,
1159            },
1160            parse_put,
1161            move |_batch: Vec<Vec<u8>>| {
1162                // The cursor visible here is whatever the PREVIOUS flush
1163                // published — never this batch's, because we haven't returned.
1164                seen.lock()
1165                    .unwrap()
1166                    .push(pub_for_apply.load(Ordering::SeqCst));
1167            },
1168            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1169            sd_rx,
1170        ));
1171
1172        tokio::time::sleep(Duration::from_millis(1)).await;
1173        sd_tx.send(true).unwrap();
1174        task.await.unwrap().unwrap();
1175
1176        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
1177        // batch's cursor), NOT 4. The cursor only reached 4 after the second
1178        // apply returned.
1179        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
1180        assert_eq!(published.load(Ordering::SeqCst), 4);
1181    }
1182
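    // Illustrative sketch only, not this crate's implementation: a std-only
    // toy model of the discipline the test above pins. `SketchCursor` and its
    // fields are hypothetical names. Receipt bumps only the pending
    // high-water; the applied cursor moves after `apply` has returned.
    struct SketchCursor {
        pending: Option<u64>, // highest revision received, not yet applied
        applied: Option<u64>, // highest revision `apply` has returned for
        batch: Vec<u64>,      // received revisions awaiting the next flush
    }

    impl SketchCursor {
        fn receive(&mut self, rev: u64) {
            self.pending = Some(rev);
            self.batch.push(rev);
        }

        // `apply` is handed the batch plus the cursor visible while it runs.
        fn flush(&mut self, apply: impl FnOnce(Vec<u64>, Option<u64>)) {
            if let Some(high) = self.pending.take() {
                apply(std::mem::take(&mut self.batch), self.applied);
                self.applied = Some(high);
            }
        }
    }

    #[test]
    fn sketch_cursor_moves_only_after_apply() {
        let mut c = SketchCursor {
            pending: None,
            applied: None,
            batch: Vec::new(),
        };
        c.receive(1);
        c.receive(2);
        assert_eq!(c.applied, None, "receipt alone must not move the cursor");
        c.flush(|batch, visible| {
            assert_eq!(batch, vec![1, 2]);
            assert_eq!(visible, None, "apply still sees the previous cursor");
        });
        assert_eq!(c.applied, Some(2));
    }
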
    /// Updates for which `parse` returns `None` (corrupt / irrelevant) carry no
    /// domain work, but they were still received, so the cursor must advance
    /// over them.
1186    #[tokio::test]
1187    async fn corrupt_parse_entries_advance_cursor() {
1188        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
1189        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1190
1191        let apply_calls = Arc::new(AtomicU64::new(0));
1192        let on_applied_max = Arc::new(AtomicU64::new(0));
1193        let ac = Arc::clone(&apply_calls);
1194        let om = Arc::clone(&on_applied_max);
1195        let (_sd_tx, sd_rx) = watch::channel(false);
1196
1197        let cursor = watch_applied(
1198            watcher,
1199            WatchScope::All,
1200            None,
1201            None, // reader (no resync in this test)
1202            None::<AppendLogSnapshot>,
1203            None,
1204            BatchConfig::default(),
1205            // Reject everything — simulates corrupt/irrelevant entries.
1206            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
1207            move |batch: Vec<Vec<u8>>| {
1208                ac.fetch_add(1, Ordering::SeqCst);
1209                assert!(batch.is_empty());
1210            },
1211            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1212            sd_rx,
1213        )
1214        .await
1215        .unwrap();
1216
1217        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
1218        assert_eq!(
1219            apply_calls.load(Ordering::SeqCst),
1220            0,
1221            "an all-rejected batch applies nothing"
1222        );
1223        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1224    }
1225
1226    /// A resume whose cursor has expired falls back to the full watch and still
1227    /// applies the delivered updates.
1228    #[tokio::test]
1229    async fn cursor_expired_falls_back_to_full_watch() {
1230        let mock = MockWatcher {
1231            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
1232            from: Mutex::new(Some(vec![])),
1233            from_expires: true,
1234            hold: false,
1235        };
1236        let watcher = Arc::new(mock);
1237
1238        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1239        let ab = Arc::clone(&applied_batches);
1240        let (_sd_tx, sd_rx) = watch::channel(false);
1241
1242        let cursor = watch_applied(
1243            watcher,
1244            WatchScope::All,
1245            Some(WatchCursor::from_u64(5)), // resume position that "expired"
1246            None,                           // reader (no resync in this test)
1247            None::<AppendLogSnapshot>,
1248            None,
1249            BatchConfig::default(),
1250            parse_put,
1251            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1252            move |_| {},
1253            sd_rx,
1254        )
1255        .await
1256        .unwrap();
1257
1258        assert_eq!(cursor.as_u64(), Some(11));
1259        assert_eq!(
1260            *applied_batches.lock().unwrap(),
1261            vec![b"1".to_vec(), b"2".to_vec()],
1262            "fallback full watch's updates were applied"
1263        );
1264    }
1265
1266    /// Cursor-expired resync: with a reader + store wired, a key the fold holds
1267    /// that the live listing no longer does gets a synthetic delete — applied
1268    /// strictly BEFORE the fallback re-list — and the persisted fold converges
1269    /// to the live state. The synthetic delete (unknown version) must not move
1270    /// the cursor; the re-list put must.
1271    #[tokio::test]
1272    async fn cursor_expired_resync_deletes_stale_keys() {
1273        let dir = tempfile::TempDir::new().unwrap();
1274        let path = dir.path().join("resync.snap");
1275        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1276        // The fold from the previous run: node.a and node.b at cursor 2.
1277        store
1278            .apply(
1279                &[put("node.a", b"1", 1), put("node.b", b"2", 2)],
1280                &WatchCursor::from_u64(2),
1281            )
1282            .unwrap();
1283
1284        // During the gap node.b was deleted (marker since evicted) and node.a
1285        // updated; the resume cursor (2) has expired. The fallback re-list
1286        // therefore carries only the surviving key.
1287        let mock = MockWatcher {
1288            full: Mutex::new(Some(vec![put("node.a", b"1b", 10)])),
1289            from: Mutex::new(Some(vec![])),
1290            from_expires: true,
1291            hold: false,
1292        };
1293        let reader = MockReader {
1294            live: vec!["node.a".to_string()],
1295        };
1296
1297        // Record everything `parse` sees, in order, deletes included.
1298        let seen = Arc::new(Mutex::new(Vec::<(String, bool)>::new()));
1299        let s = Arc::clone(&seen);
1300        let (_sd_tx, sd_rx) = watch::channel(false);
1301
1302        let cursor = watch_applied(
1303            Arc::new(mock),
1304            WatchScope::All,
1305            Some(WatchCursor::from_u64(2)),
1306            Some(Arc::new(reader) as Arc<dyn KvReader>),
1307            Some(store),
1308            None,
1309            BatchConfig::default(),
1310            move |u: &KvUpdate| {
1311                s.lock()
1312                    .unwrap()
1313                    .push((u.key().to_string(), matches!(u, KvUpdate::Delete { .. })));
1314                Some(())
1315            },
1316            |_batch: Vec<()>| {},
1317            |_| {},
1318            sd_rx,
1319        )
1320        .await
1321        .unwrap();
1322
1323        // The re-list put advanced the cursor; the synthetic delete did not.
1324        assert_eq!(cursor.as_u64(), Some(10));
1325        // The synthetic delete strictly precedes the re-list put.
1326        assert_eq!(
1327            *seen.lock().unwrap(),
1328            vec![("node.b".to_string(), true), ("node.a".to_string(), false)],
1329            "synthetic delete must be applied before the fallback re-list"
1330        );
1331
1332        // The persisted fold converged: stale key gone, live key updated.
1333        let snap = crate::snapshot::load(&path).unwrap().unwrap();
1334        assert_eq!(snap.cursor.as_u64(), Some(10));
1335        assert_eq!(snap.entries.len(), 1);
1336        assert_eq!(snap.entries["node.a"].value, b"1b");
1337    }
1338
1339    /// A prefix-scoped resync diffs only in-scope keys: an out-of-scope key the
1340    /// fold holds survives, the in-scope stale key is deleted, and a flush
1341    /// containing only synthetic deletes leaves the cursor untouched.
1342    #[tokio::test]
1343    async fn cursor_expired_resync_respects_scope() {
1344        let dir = tempfile::TempDir::new().unwrap();
1345        let path = dir.path().join("resync-scope.snap");
1346        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1347        store
1348            .apply(
1349                &[put("node.b", b"2", 1), put("other.z", b"9", 2)],
1350                &WatchCursor::from_u64(2),
1351            )
1352            .unwrap();
1353
1354        // Expired resume; the bucket no longer has ANY node.* keys; the
1355        // fallback re-list is empty.
1356        let mock = MockWatcher {
1357            full: Mutex::new(Some(vec![])),
1358            from: Mutex::new(Some(vec![])),
1359            from_expires: true,
1360            hold: false,
1361        };
1362        let reader = MockReader { live: vec![] };
1363        let (_sd_tx, sd_rx) = watch::channel(false);
1364
1365        let cursor = watch_applied(
1366            Arc::new(mock),
1367            WatchScope::Prefix("node.".to_string()),
1368            Some(WatchCursor::from_u64(2)),
1369            Some(Arc::new(reader) as Arc<dyn KvReader>),
1370            Some(store),
1371            None,
1372            BatchConfig::default(),
1373            |_u: &KvUpdate| Some(()),
1374            |_batch: Vec<()>| {},
1375            |_| {},
1376            sd_rx,
1377        )
1378        .await
1379        .unwrap();
1380
1381        // Deletes-only flush: cursor stays at the resume position.
1382        assert_eq!(cursor.as_u64(), Some(2));
1383
1384        let snap = crate::snapshot::load(&path).unwrap().unwrap();
1385        assert_eq!(snap.cursor.as_u64(), Some(2));
1386        assert!(
1387            !snap.entries.contains_key("node.b"),
1388            "in-scope stale key must be resync-deleted"
1389        );
1390        assert_eq!(
1391            snap.entries["other.z"].value, b"9",
1392            "out-of-scope key must survive a prefix-scoped resync"
1393        );
1394    }
1395
1396    /// `WatchScope::Prefixes` with no resume dispatches to `watch_prefixes`. The
1397    /// `watch_prefixes_from` resume arm and its expiry fallback are tested below.
1398    #[tokio::test]
1399    async fn prefixes_scope_dispatches_full_watch() {
1400        let updates = vec![put("a.x", b"1", 1), put("b.y", b"2", 2)];
1401        let watcher = Arc::new(MockWatcher::new(updates, false));
1402        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1403        let ab = Arc::clone(&applied_batches);
1404        let (_sd_tx, sd_rx) = watch::channel(false);
1405
1406        let cursor = watch_applied(
1407            watcher,
1408            WatchScope::Prefixes(vec!["a.".to_string(), "b.".to_string()]),
1409            None,
1410            None, // reader (no resync in this test)
1411            None::<AppendLogSnapshot>,
1412            None,
1413            BatchConfig::default(),
1414            parse_put,
1415            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1416            move |_| {},
1417            sd_rx,
1418        )
1419        .await
1420        .unwrap();
1421
1422        assert_eq!(cursor.as_u64(), Some(2));
1423        assert_eq!(
1424            *applied_batches.lock().unwrap(),
1425            vec![b"1".to_vec(), b"2".to_vec()]
1426        );
1427    }
1428
1429    /// `WatchScope::Prefixes` resume whose cursor has expired falls back to the
1430    /// full multi-prefix watch and applies its updates.
1431    #[tokio::test]
1432    async fn prefixes_scope_expired_resume_falls_back() {
1433        let mock = MockWatcher {
1434            full: Mutex::new(Some(vec![put("a.x", b"1", 7)])),
1435            from: Mutex::new(Some(vec![])),
1436            from_expires: true,
1437            hold: false,
1438        };
1439        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1440        let ab = Arc::clone(&applied_batches);
1441        let (_sd_tx, sd_rx) = watch::channel(false);
1442
1443        let cursor = watch_applied(
1444            Arc::new(mock),
1445            WatchScope::Prefixes(vec!["a.".to_string()]),
1446            Some(WatchCursor::from_u64(3)),
1447            None, // reader (no resync in this test)
1448            None::<AppendLogSnapshot>,
1449            None,
1450            BatchConfig::default(),
1451            parse_put,
1452            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1453            move |_| {},
1454            sd_rx,
1455        )
1456        .await
1457        .unwrap();
1458
1459        assert_eq!(cursor.as_u64(), Some(7));
1460        assert_eq!(*applied_batches.lock().unwrap(), vec![b"1".to_vec()]);
1461    }
1462
1463    /// End-to-end with a real snapshot file: after the run, the persisted
1464    /// snapshot's cursor equals the applied cursor and its entries match the
1465    /// applied state — proving the checkpoint is written at the post-apply
1466    /// cursor, never ahead of it.
1467    #[tokio::test]
1468    async fn snapshot_checkpoint_matches_applied_cursor() {
1469        let dir = tempfile::TempDir::new().unwrap();
1470        let path = dir.path().join("applied.snap");
1471        let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1472
1473        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1474        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after delivery
1475        let (_sd_tx, sd_rx) = watch::channel(false);
1476
1477        let cursor = watch_applied(
1478            watcher,
1479            WatchScope::All,
1480            None,
1481            None, // reader (no resync in this test)
1482            Some(store),
1483            None,
1484            BatchConfig::default(),
1485            parse_put,
1486            move |_batch: Vec<Vec<u8>>| {},
1487            move |_| {},
1488            sd_rx,
1489        )
1490        .await
1491        .unwrap();
1492
1493        assert_eq!(cursor.as_u64(), Some(2));
1494
1495        let snap = crate::snapshot::load(&path).unwrap().unwrap();
1496        assert_eq!(
1497            snap.cursor.as_u64(),
1498            cursor.as_u64(),
1499            "snapshot checkpoint cursor must equal the applied cursor"
1500        );
1501        assert_eq!(snap.entries.len(), 2);
1502        assert_eq!(snap.entries["node.a"].value, b"1");
1503        assert_eq!(snap.entries["node.b"].value, b"2");
1504    }
1505
1506    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
1507    /// delta (the `from` script, NOT the full set) is applied. Proves the
1508    /// resume branch delivers only post-cursor updates and advances to their
1509    /// max revision.
1510    #[tokio::test]
1511    async fn resume_from_cursor_delivers_only_delta() {
1512        let mock = MockWatcher {
1513            // `full` would be delivered only if the resume path were (wrongly)
1514            // bypassed; a non-empty distinguishing value makes that visible.
1515            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
1516            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1517            from_expires: false,
1518            hold: false,
1519        };
1520        let watcher = Arc::new(mock);
1521
1522        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1523        let ab = Arc::clone(&applied_batches);
1524        let (_sd_tx, sd_rx) = watch::channel(false);
1525
1526        let cursor = watch_applied(
1527            watcher,
1528            WatchScope::All,
1529            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1530            None,                           // reader (no resync in this test)
1531            None::<AppendLogSnapshot>,
1532            None,
1533            BatchConfig::default(),
1534            parse_put,
1535            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1536            move |_| {},
1537            sd_rx,
1538        )
1539        .await
1540        .unwrap();
1541
1542        assert_eq!(
1543            cursor.as_u64(),
1544            Some(11),
1545            "cursor advances to the delta max"
1546        );
1547        assert_eq!(
1548            *applied_batches.lock().unwrap(),
1549            vec![b"3".to_vec(), b"4".to_vec()],
1550            "only the post-cursor delta is applied, never the full set"
1551        );
1552    }
1553
1554    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
1555    /// applies the delivered updates. Most tests above use `WatchScope::All`;
1556    /// this covers the no-resume single-prefix dispatch arm.
1557    #[tokio::test]
1558    async fn prefix_scope_applies_delivered_updates() {
1559        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1560        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after delivery
1561
1562        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1563        let ab = Arc::clone(&applied_batches);
1564        let (_sd_tx, sd_rx) = watch::channel(false);
1565
1566        let cursor = watch_applied(
1567            watcher,
1568            WatchScope::Prefix("node.".to_string()),
1569            None,
1570            None, // reader (no resync in this test)
1571            None::<AppendLogSnapshot>,
1572            None,
1573            BatchConfig::default(),
1574            parse_put,
1575            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1576            move |_| {},
1577            sd_rx,
1578        )
1579        .await
1580        .unwrap();
1581
1582        assert_eq!(cursor.as_u64(), Some(2));
1583        assert_eq!(
1584            *applied_batches.lock().unwrap(),
1585            vec![b"1".to_vec(), b"2".to_vec()]
1586        );
1587    }
1588
1589    /// `WatchScope::Prefix` happy-path resume: a non-expired cursor takes the
1590    /// `watch_prefix_from` path and only the delta is applied — the prefix
1591    /// twin of `resume_from_cursor_delivers_only_delta`.
1592    #[tokio::test]
1593    async fn prefix_resume_from_cursor_delivers_only_delta() {
1594        let mock = MockWatcher {
1595            // `full` would be delivered only if the resume path were (wrongly)
1596            // bypassed; a distinguishing value makes that visible.
1597            full: Mutex::new(Some(vec![put("node.x", b"FULL", 1)])),
1598            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1599            from_expires: false,
1600            hold: false,
1601        };
1602        let watcher = Arc::new(mock);
1603
1604        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1605        let ab = Arc::clone(&applied_batches);
1606        let (_sd_tx, sd_rx) = watch::channel(false);
1607
1608        let cursor = watch_applied(
1609            watcher,
1610            WatchScope::Prefix("node.".to_string()),
1611            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1612            None,                           // reader (no resync in this test)
1613            None::<AppendLogSnapshot>,
1614            None,
1615            BatchConfig::default(),
1616            parse_put,
1617            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1618            move |_| {},
1619            sd_rx,
1620        )
1621        .await
1622        .unwrap();
1623
1624        assert_eq!(
1625            cursor.as_u64(),
1626            Some(11),
1627            "cursor advances to the delta max"
1628        );
1629        assert_eq!(
1630            *applied_batches.lock().unwrap(),
1631            vec![b"3".to_vec(), b"4".to_vec()],
1632            "only the post-cursor delta is applied via watch_prefix_from"
1633        );
1634    }
1635
1636    /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
1637    /// full `watch_prefix` and still applies the delivered updates — the prefix
1638    /// twin of `cursor_expired_falls_back_to_full_watch`.
1639    #[tokio::test]
1640    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
1641        let mock = MockWatcher {
1642            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
1643            from: Mutex::new(Some(vec![])),
1644            from_expires: true,
1645            hold: false,
1646        };
1647        let watcher = Arc::new(mock);
1648
1649        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1650        let ab = Arc::clone(&applied_batches);
1651        let (_sd_tx, sd_rx) = watch::channel(false);
1652
1653        let cursor = watch_applied(
1654            watcher,
1655            WatchScope::Prefix("node.".to_string()),
1656            Some(WatchCursor::from_u64(5)), // resume position that "expired"
1657            None,                           // reader (no resync in this test)
1658            None::<AppendLogSnapshot>,
1659            None,
1660            BatchConfig::default(),
1661            parse_put,
1662            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1663            move |_| {},
1664            sd_rx,
1665        )
1666        .await
1667        .unwrap();
1668
1669        assert_eq!(cursor.as_u64(), Some(11));
1670        assert_eq!(
1671            *applied_batches.lock().unwrap(),
1672            vec![b"1".to_vec(), b"2".to_vec()],
1673            "prefix fallback full watch's updates were applied"
1674        );
1675    }
1676
1677    /// The watch task's terminal error must propagate out of `watch_applied`
1678    /// rather than being swallowed as `Ok(applied)` when the channel closes.
1679    #[tokio::test]
1680    async fn watch_task_error_propagates() {
1681        let watcher = Arc::new(ErrorWatcher);
1682        let (_sd_tx, sd_rx) = watch::channel(false);
1683
1684        let result = watch_applied(
1685            watcher,
1686            WatchScope::All,
1687            None,
1688            None, // reader (no resync in this test)
1689            None::<AppendLogSnapshot>,
1690            None,
1691            BatchConfig::default(),
1692            parse_put,
1693            move |_batch: Vec<Vec<u8>>| {},
1694            move |_| {},
1695            sd_rx,
1696        )
1697        .await;
1698
1699        match result {
1700            Err(KvError::WatchError(msg)) => {
1701                assert!(msg.contains("injected"), "error carries the cause: {msg}");
1702            }
1703            other => panic!("expected WatchError, got {other:?}"),
1704        }
1705    }
1706
1707    /// A batch where `parse` accepts some updates and rejects others: on flush,
1708    /// the cursor must still advance to the batch's highest revision (covering
1709    /// the rejected middle entry), while `apply` sees only the accepted ones.
1710    #[tokio::test]
1711    async fn mixed_parse_advances_cursor_over_rejected_entries() {
1712        let updates = vec![
1713            put("keep.a", b"1", 5),
1714            put("skip.b", b"2", 6), // rejected by parse
1715            put("keep.c", b"3", 7),
1716        ];
1717        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after delivery
1718
1719        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1720        let on_applied_max = Arc::new(AtomicU64::new(0));
1721        let ab = Arc::clone(&applied_batches);
1722        let om = Arc::clone(&on_applied_max);
1723        let (_sd_tx, sd_rx) = watch::channel(false);
1724
1725        let cursor = watch_applied(
1726            watcher,
1727            WatchScope::All,
1728            None,
1729            None, // reader (no resync in this test)
1730            None::<AppendLogSnapshot>,
1731            None,
1732            BatchConfig::default(),
1733            // Keep only keys under "keep."; reject everything else.
1734            |u: &KvUpdate| -> Option<Vec<u8>> {
1735                match u {
1736                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
1737                    _ => None,
1738                }
1739            },
1740            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1741            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1742            sd_rx,
1743        )
1744        .await
1745        .unwrap();
1746
1747        assert_eq!(
1748            cursor.as_u64(),
1749            Some(7),
1750            "cursor covers the rejected middle entry (rev 6)"
1751        );
1752        assert_eq!(
1753            *applied_batches.lock().unwrap(),
1754            vec![b"1".to_vec(), b"3".to_vec()],
1755            "apply sees only the accepted entries"
1756        );
1757        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1758    }
1759
1760    /// Shutdown before any update arrives: nothing was received, so the cursor
1761    /// stays at the resume position (here `none()`), `apply` never runs, and
1762    /// `on_applied` never fires.
1763    #[tokio::test(start_paused = true)]
1764    async fn shutdown_with_no_pending_batch() {
1765        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open
1766
1767        let apply_calls = Arc::new(AtomicU64::new(0));
1768        let on_applied_calls = Arc::new(AtomicU64::new(0));
1769        let ac = Arc::clone(&apply_calls);
1770        let oc = Arc::clone(&on_applied_calls);
1771        let (sd_tx, sd_rx) = watch::channel(false);
1772
1773        let task = tokio::spawn(watch_applied(
1774            watcher,
1775            WatchScope::All,
1776            None,
1777            None, // reader (no resync in this test)
1778            None::<AppendLogSnapshot>,
1779            None,
1780            BatchConfig::default(),
1781            parse_put,
1782            move |_batch: Vec<Vec<u8>>| {
1783                ac.fetch_add(1, Ordering::SeqCst);
1784            },
1785            move |_| {
1786                oc.fetch_add(1, Ordering::SeqCst);
1787            },
1788            sd_rx,
1789        ));
1790
1791        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
1792        tokio::time::sleep(Duration::from_millis(1)).await;
1793        sd_tx.send(true).unwrap();
1794
1795        let cursor = task.await.unwrap().unwrap();
1796        assert_eq!(
1797            cursor.as_u64(),
1798            None,
1799            "no updates received → cursor unmoved"
1800        );
1801        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
1802        assert_eq!(
1803            on_applied_calls.load(Ordering::SeqCst),
1804            0,
1805            "on_applied never fires"
1806        );
1807    }
1808
1809    /// An [`ExportRequest`] flushes the pending batch first, so the artifact's
1810    /// cursor is exactly the applied cursor — and the artifact is importable
1811    /// with the batched entries in it.
1812    #[tokio::test(start_paused = true)]
1813    async fn export_request_flushes_pending_batch_first() {
1814        let dir = tempfile::TempDir::new().unwrap();
1815        let store_path = dir.path().join("fold.snap");
1816        let artifact = dir.path().join("artifact");
1817        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1818
1819        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1820        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1821        let (sd_tx, sd_rx) = watch::channel(false);
1822        let (ex_tx, ex_rx) = mpsc::channel(1);
1823
1824        let task = tokio::spawn(watch_applied(
1825            watcher,
1826            WatchScope::All,
1827            None,
1828            None, // reader (no resync in this test)
1829            Some(store),
1830            Some(ex_rx),
1831            BatchConfig {
1832                window: Duration::from_secs(3600), // window never fires
1833                max: 100,
1834            },
1835            parse_put,
1836            move |_batch: Vec<Vec<u8>>| {},
1837            move |_| {},
1838            sd_rx,
1839        ));
1840
1841        // Let both updates land in the (unflushed) pending batch, then export.
1842        tokio::time::sleep(Duration::from_millis(1)).await;
1843        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1844        ex_tx
1845            .send(ExportRequest {
1846                dest_dir: artifact.clone(),
1847                reply: reply_tx,
1848            })
1849            .await
1850            .unwrap();
1851
1852        let manifest = reply_rx.await.unwrap().expect("export succeeds");
1853        assert_eq!(
1854            manifest.cursor.as_u64(),
1855            Some(2),
1856            "pending batch flushed before export: artifact cursor is the applied cursor"
1857        );
1858
1859        // The artifact is importable and holds both batched entries.
1860        let (cursor, imported) =
1861            AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
1862                .unwrap();
1863        assert_eq!(cursor.as_u64(), Some(2));
1864        assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
1865        assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
1866
1867        sd_tx.send(true).unwrap();
1868        task.await.unwrap().unwrap();
1869    }
1870
1871    /// An [`ExportRequest`] that arrives with NOTHING pending (the window
1872    /// already flushed everything) still produces a valid artifact whose
1873    /// cursor is the applied cursor. The flush-before-export step must be a
1874    /// clean no-op, not an error or a cursor regression.
1875    #[tokio::test(start_paused = true)]
1876    async fn export_with_empty_pending_batch_succeeds() {
1877        let dir = tempfile::TempDir::new().unwrap();
1878        let store_path = dir.path().join("fold.snap");
1879        let artifact = dir.path().join("artifact");
1880        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1881
1882        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1883        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1884        let (sd_tx, sd_rx) = watch::channel(false);
1885        let (ex_tx, ex_rx) = mpsc::channel(1);
1886
1887        let task = tokio::spawn(watch_applied(
1888            watcher,
1889            WatchScope::All,
1890            None,
1891            None, // reader (no resync in this test)
1892            Some(store),
1893            Some(ex_rx),
1894            BatchConfig::default(), // 10 ms window
1895            parse_put,
1896            move |_batch: Vec<Vec<u8>>| {},
1897            move |_| {},
1898            sd_rx,
1899        ));
1900
1901        // Let the window flush both updates, so the export request finds an
1902        // EMPTY pending batch.
1903        tokio::time::sleep(Duration::from_millis(50)).await;
1904
1905        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1906        ex_tx
1907            .send(ExportRequest {
1908                dest_dir: artifact.clone(),
1909                reply: reply_tx,
1910            })
1911            .await
1912            .unwrap();
1913        let manifest = reply_rx
1914            .await
1915            .unwrap()
1916            .expect("export succeeds with nothing pending");
1917        assert_eq!(
1918            manifest.cursor.as_u64(),
1919            Some(2),
1920            "artifact cursor is the applied cursor, unchanged by the no-op flush"
1921        );
1922
1923        // The artifact is importable and holds the already-flushed entries.
1924        let (cursor, imported) =
1925            AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
1926                .unwrap();
1927        assert_eq!(cursor.as_u64(), Some(2));
1928        assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
1929        assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
1930
1931        sd_tx.send(true).unwrap();
1932        task.await.unwrap().unwrap();
1933    }
1934
1935    /// An export request against a store-less watch replies with an error and
1936    /// the watch keeps running.
1937    #[tokio::test(start_paused = true)]
1938    async fn export_without_store_replies_error() {
1939        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1940        let (sd_tx, sd_rx) = watch::channel(false);
1941        let (ex_tx, ex_rx) = mpsc::channel(1);
1942
1943        let task = tokio::spawn(watch_applied(
1944            watcher,
1945            WatchScope::All,
1946            None,
1947            None, // reader (no resync in this test)
1948            None::<AppendLogSnapshot>,
1949            Some(ex_rx),
1950            BatchConfig::default(),
1951            parse_put,
1952            move |_batch: Vec<Vec<u8>>| {},
1953            move |_| {},
1954            sd_rx,
1955        ));
1956
1957        tokio::time::sleep(Duration::from_millis(1)).await;
1958        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1959        ex_tx
1960            .send(ExportRequest {
1961                dest_dir: std::env::temp_dir().join("never-created"),
1962                reply: reply_tx,
1963            })
1964            .await
1965            .unwrap();
1966        assert!(
1967            reply_rx.await.unwrap().is_err(),
1968            "no store → export errors via the reply"
1969        );
1970
1971        // The watch is still alive and returns its applied cursor on shutdown.
1972        sd_tx.send(true).unwrap();
1973        let cursor = task.await.unwrap().unwrap();
1974        assert_eq!(cursor.as_u64(), Some(1));
1975    }
1976
1977    /// An export failure (occupied, non-empty destination) is reported on the
1978    /// reply and the watch survives, returning its applied cursor on shutdown.
1979    #[tokio::test(start_paused = true)]
1980    async fn export_error_does_not_kill_watch() {
1981        let dir = tempfile::TempDir::new().unwrap();
1982        let store_path = dir.path().join("fold.snap");
1983        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1984
1985        // Occupied destination → export fails.
1986        let occupied = dir.path().join("occupied");
1987        std::fs::create_dir(&occupied).unwrap();
1988        std::fs::write(occupied.join("stray"), b"x").unwrap();
1989
1990        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1991        let (sd_tx, sd_rx) = watch::channel(false);
1992        let (ex_tx, ex_rx) = mpsc::channel(1);
1993
1994        let applied = Arc::new(AtomicU64::new(0));
1995        let a = Arc::clone(&applied);
1996
1997        let task = tokio::spawn(watch_applied(
1998            watcher,
1999            WatchScope::All,
2000            None,
2001            None, // reader (no resync in this test)
2002            Some(store),
2003            Some(ex_rx),
2004            BatchConfig::default(),
2005            parse_put,
2006            move |_batch: Vec<Vec<u8>>| {},
2007            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2008            sd_rx,
2009        ));
2010
2011        tokio::time::sleep(Duration::from_millis(1)).await;
2012        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
2013        ex_tx
2014            .send(ExportRequest {
2015                dest_dir: occupied,
2016                reply: reply_tx,
2017            })
2018            .await
2019            .unwrap();
2020        match reply_rx.await.unwrap() {
2021            Err(crate::snapshot::SnapshotError::ArtifactInvalid(_)) => {}
2022            other => panic!("expected ArtifactInvalid, got {other:?}"),
2023        }
2024
2025        // Watch still folds: a clean shutdown returns the applied cursor.
2026        sd_tx.send(true).unwrap();
2027        let cursor = task.await.unwrap().unwrap();
2028        assert_eq!(cursor.as_u64(), Some(1), "watch survived the failed export");
2029        assert_eq!(applied.load(Ordering::SeqCst), 1);
2030    }
2031
2032    /// Dropping the export sender disarms the export arm of the loop; the loop
2033    /// keeps batching and flushing normally.
2034    #[tokio::test(start_paused = true)]
2035    async fn export_sender_dropped_disarms_channel() {
2036        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
2037        let (sd_tx, sd_rx) = watch::channel(false);
2038        let (ex_tx, ex_rx) = mpsc::channel::<ExportRequest>(1);
2039
2040        let applied = Arc::new(AtomicU64::new(0));
2041        let a = Arc::clone(&applied);
2042
2043        let task = tokio::spawn(watch_applied(
2044            watcher,
2045            WatchScope::All,
2046            None,
2047            None, // reader (no resync in this test)
2048            None::<AppendLogSnapshot>,
2049            Some(ex_rx),
2050            BatchConfig::default(),
2051            parse_put,
2052            move |_batch: Vec<Vec<u8>>| {},
2053            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2054            sd_rx,
2055        ));
2056
2057        drop(ex_tx); // disarm
2058        tokio::time::sleep(Duration::from_millis(50)).await;
2059        assert_eq!(
2060            applied.load(Ordering::SeqCst),
2061            1,
2062            "loop keeps flushing after the export sender is gone"
2063        );
2064
2065        sd_tx.send(true).unwrap();
2066        task.await.unwrap().unwrap();
2067    }
2068
2069    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
2070    /// compaction actually fires (every other snapshot test pins the threshold
2071    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
2072    /// snapshot must still load cleanly with the right cursor and entries.
2073    #[tokio::test]
2074    async fn snapshot_compaction_fires_and_stays_consistent() {
2075        let dir = tempfile::TempDir::new().unwrap();
2076        let path = dir.path().join("applied.snap");
2077        // threshold 0 → every checkpoint reports "needs compact", forcing the
2078        // compaction branch on each flush (run off the hot path via
2079        // spawn_blocking inside watch_applied).
2080        let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();
2081
2082        // Re-put the same key across flushes so compaction has duplicates to
2083        // dedup; small max forces multiple flushes (hence multiple compactions).
2084        let updates = vec![
2085            put("node.a", b"1", 1),
2086            put("node.a", b"2", 2),
2087            put("node.b", b"3", 3),
2088            put("node.a", b"4", 4),
2089        ];
2090        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after delivery
2091        let (_sd_tx, sd_rx) = watch::channel(false);
2092
2093        let cursor = watch_applied(
2094            watcher,
2095            WatchScope::All,
2096            None,
2097            None, // reader (no resync in this test)
2098            Some(store),
2099            None,
2100            BatchConfig {
2101                window: Duration::from_secs(3600),
2102                max: 1, // one update per flush → a compaction per update
2103            },
2104            parse_put,
2105            move |_batch: Vec<Vec<u8>>| {},
2106            move |_| {},
2107            sd_rx,
2108        )
2109        .await
2110        .unwrap();
2111
2112        assert_eq!(cursor.as_u64(), Some(4));
2113
2114        let snap = crate::snapshot::load(&path).unwrap().unwrap();
2115        assert_eq!(
2116            snap.cursor.as_u64(),
2117            cursor.as_u64(),
2118            "compacted snapshot's cursor still equals the applied cursor"
2119        );
2120        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
2121        assert_eq!(
2122            snap.entries["node.a"].value, b"4",
2123            "last write per key survives compaction"
2124        );
2125        assert_eq!(snap.entries["node.b"].value, b"3");
2126    }
    /// A SnapshotStore whose FIRST apply fails (transient store error: disk
    /// pressure, lock timeout), then behaves normally — the trigger for the
    /// lost-raw-batch hazard in the flush path.
    struct FailOnceStore {
        inner: AppendLogSnapshot,
        failed: std::sync::atomic::AtomicBool,
    }

    impl crate::snapshot::SnapshotStore for FailOnceStore {
        fn load(
            _path: &std::path::Path,
        ) -> Result<(WatchCursor, Self), crate::snapshot::SnapshotError> {
            unreachable!("test store is constructed directly")
        }
        fn apply(
            &mut self,
            batch: &[KvUpdate],
            cursor: &WatchCursor,
        ) -> Result<(), crate::snapshot::SnapshotError> {
            if !self.failed.swap(true, Ordering::SeqCst) {
                return Err(crate::snapshot::SnapshotError::Backend(
                    "injected transient store failure".into(),
                ));
            }
            self.inner.apply(batch, cursor)
        }
        fn get(&self, key: &str) -> Result<Option<KvEntry>, crate::snapshot::SnapshotError> {
            self.inner.get(key)
        }
        fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, crate::snapshot::SnapshotError> {
            self.inner.range(prefix)
        }
        fn cursor(&self) -> WatchCursor {
            self.inner.cursor()
        }
        fn export_to(
            &mut self,
            dest_dir: &std::path::Path,
        ) -> Result<crate::artifact::ExportManifest, crate::snapshot::SnapshotError> {
            self.inner.export_to(dest_dir)
        }
    }

    /// CURSOR AUTHORITY under a transient store failure: a failed store apply
    /// must NOT cause later successful applies to advance the persisted
    /// cursor past data that never landed. The failed batch is re-queued and
    /// committed cumulatively with the next flush, so the store's cursor
    /// never lies about its contents — a restart resuming from it sees
    /// exactly the missing tail, not a silent hole.
    ///
    /// (Pre-fix behavior, found while writing the watch_applied model: the
    /// failed batch's raw updates were dropped on the warn-and-continue
    /// path, and the NEXT successful flush committed only newer updates
    /// under the newest cursor — a permanent, restart-surviving gap in the
    /// fold.)
    #[tokio::test(start_paused = true)]
    async fn transient_store_failure_never_leaves_a_cursor_gap() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("fold.snap");
        let (_r, inner) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
        let store = FailOnceStore {
            inner,
            failed: std::sync::atomic::AtomicBool::new(false),
        };

        // max: 1 -> one flush per update: flush #1 (a@1) hits the injected
        // failure, flush #2 (b@2) succeeds.
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true));
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            Some(store),
            None,
            BatchConfig {
                window: Duration::from_millis(1),
                max: 1,
            },
            parse_put,
            |_batch: Vec<Vec<u8>>| {},
            |_| {},
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(50)).await;
        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(2));

        // The store on disk must be SELF-CONSISTENT: whatever its cursor
        // claims, the data at or below it is present. With the re-queue fix
        // the cumulative commit lands both keys at cursor 2.
        let (persisted, reopened) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
        assert_eq!(persisted.as_u64(), Some(2), "cursor reached the head");
        assert_eq!(
            reopened.get("node.a").unwrap().map(|e| e.value),
            Some(b"1".to_vec()),
            "the transiently-failed batch was re-queued, not silently dropped \
             behind an advancing cursor"
        );
        assert_eq!(
            reopened.get("node.b").unwrap().map(|e| e.value),
            Some(b"2".to_vec())
        );
    }
    /// A reader whose live-key listing always fails — the resync's I/O
    /// failure mode.
    struct FailingReader;

    #[async_trait]
    impl KvReader for FailingReader {
        async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
            unreachable!("resync only lists keys")
        }
        async fn keys(&self, _prefix: &str) -> Result<Vec<String>, KvError> {
            Err(KvError::OperationFailed("injected listing failure".into()))
        }
        async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
            unreachable!("resync only lists keys")
        }
    }

    /// REGRESSION PIN (code-level twin of tests/model.rs's Degrade
    /// configuration): a resync whose live-key listing fails must FAIL THE
    /// WATCH, not degrade to re-list-only with a warning — the degrade
    /// semantics provably break the convergence theorem (silent stale keys).
    /// Reverting `resync_stale_keys` to warn-and-continue fails this test.
    #[tokio::test]
    async fn resync_listing_failure_is_fatal_not_degraded() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("resync-fatal.snap");
        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
        store
            .apply(&[put("node.a", b"1", 1)], &WatchCursor::from_u64(1))
            .unwrap();

        // Resume cursor expired -> resync path -> reader listing fails.
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let (_sd_tx, sd_rx) = watch::channel(false);
        let err = watch_applied(
            Arc::new(mock),
            WatchScope::All,
            Some(WatchCursor::from_u64(1)),
            Some(Arc::new(FailingReader) as Arc<dyn KvReader>),
            Some(store),
            None,
            BatchConfig::default(),
            parse_put,
            |_batch: Vec<Vec<u8>>| {},
            |_| {},
            sd_rx,
        )
        .await
        .expect_err("a failed resync listing must fail the watch");
        assert!(
            err.to_string().contains("resync failed listing live keys"),
            "{err}"
        );
    }
}