//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
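//!
/*!
The crash window described above can be made concrete with a small,
self-contained sketch. It is an illustration under stated assumptions, not
this crate's code: revisions are plain `u64`s, "persisting" the cursor just
means returning it, and `CursorPolicy`, `replay_with_crash` and
`skipped_on_resume` are hypothetical helpers.

```rust
// Hypothetical sketch: receipt-driven vs apply-driven cursors under a crash.
#[derive(Clone, Copy, PartialEq, Debug)]
enum CursorPolicy {
    /// Bump the cursor as soon as an update is received (the shortcut).
    OnReceipt,
    /// Bump the cursor only once the batch holding the update is applied.
    AfterApply,
}

/// Receive `log` in batches of `batch` updates, crashing right after the
/// `crash_after`-th receipt, before the open batch is applied. Returns the
/// cursor that survived the crash and the revisions that were applied.
fn replay_with_crash(
    log: &[u64],
    batch: usize,
    crash_after: usize,
    policy: CursorPolicy,
) -> (u64, Vec<u64>) {
    let mut cursor = 0;
    let mut applied = Vec::new();
    let mut pending = Vec::new();
    for (i, &rev) in log.iter().enumerate() {
        pending.push(rev);
        if policy == CursorPolicy::OnReceipt {
            cursor = rev; // receipt treated as "it happened"
        }
        if i + 1 == crash_after {
            return (cursor, applied); // `pending` is lost with the process
        }
        if pending.len() == batch {
            applied.append(&mut pending); // "apply" returns here
            if policy == CursorPolicy::AfterApply {
                cursor = rev; // the batch's last revision is now applied
            }
        }
    }
    (cursor, applied)
}

/// Revisions a resume from `cursor` would never see again: at or below the
/// cursor, yet never applied.
fn skipped_on_resume(log: &[u64], cursor: u64, applied: &[u64]) -> Vec<u64> {
    log.iter()
        .copied()
        .filter(|r| *r <= cursor && !applied.contains(r))
        .collect()
}
```

With `log = [1, 2, 3, 4, 5]`, batches of 2 and a crash after the third
receipt, the receipt policy survives with cursor 3 while only revisions 1 and
2 were applied, so a resume skips revision 3. The after-apply policy survives
with cursor 2, and the resume re-delivers revision 3.
*/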
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
//!   applied cursor. Callers use it to persist the cursor for the next restart.
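//!
/*!
A minimal, synchronous sketch of that contract follows. It assumes integer
revisions and string payloads; `Raw` and `run_flushes` are hypothetical
stand-ins for [`KvUpdate`] and the real loop, which is async and also flushes
on a time window. It shows the two properties the list above relies on: a
rejected update still moves the cursor, and `on_applied` only fires after
`apply` has returned.

```rust
// Hypothetical sketch: count-based batching only, no window, no store.
/// A received update: a revision plus its raw payload.
struct Raw {
    rev: u64,
    bytes: &'static str,
}

/// Feed `updates` through the caller's closures in batches of `max`.
fn run_flushes<U>(
    updates: &[Raw],
    max: usize,
    mut parse: impl FnMut(&Raw) -> Option<U>,
    mut apply: impl FnMut(Vec<U>),
    mut on_applied: impl FnMut(u64),
) {
    for chunk in updates.chunks(max.max(1)) {
        // Rejected updates are dropped here but still count toward the cursor.
        let parsed: Vec<U> = chunk.iter().filter_map(|u| parse(u)).collect();
        if !parsed.is_empty() {
            apply(parsed); // returns before the cursor moves
        }
        if let Some(last) = chunk.last() {
            on_applied(last.rev); // the highest received revision in the batch
        }
    }
}
```

Fed `a=1`, `not-a-pair` and `garbage` at revisions 1 to 3 with `max = 2`, and
a `parse` that accepts only `key=value` pairs, the events are one `apply` of
the single parsed value, then cursor 2, then cursor 3 with no `apply` at all.
*/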
//!
//! ## Panics
//!
//! `apply` runs inline on the watch task. If it panics, the panic propagates out
//! of [`watch_applied`] and aborts the watch — that is the caller's contract,
//! the same as a panic in any other supplied closure.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::{oneshot, watch};
use tracing::warn;

use crate::artifact::ExportManifest;
use crate::kv::{KvError, KvReader, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::{SnapshotError, SnapshotStore};

/// A request, sent into a running [`watch_applied`] loop, to export the fold it
/// owns (see [`SnapshotStore::export_to`]).
///
/// `watch_applied` takes its snapshot store **by value**, so a consumer that
/// wants periodic artifacts of a live fold cannot call `export_to` itself. It
/// instead passes an `mpsc::Receiver<ExportRequest>` to [`watch_applied`] and
/// sends requests through the paired sender. The loop handles a request
/// between batch flushes — after flushing any pending batch — so the artifact's
/// embedded cursor is exactly the applied cursor at the moment of export.
///
/// The export result (or error) comes back on `reply`; an export failure is
/// reported there and the watch keeps running (the snapshot is a cache — a
/// failed artifact is the requester's problem, not the fold's).
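///
/**
The ordering guarantee can be sketched with std types only. This is a
hypothetical model, not this crate's API: `Fold`, `Msg` and `run_owner` are
illustrative names, revisions are `u64`s, and a `std::sync::mpsc` reply
channel stands in for the `oneshot` one. The owner flushes its pending batch
before answering an export, so the cursor in the reply equals the applied
cursor at that moment.

```rust
// Hypothetical sketch of the flush-then-export ordering.
use std::sync::mpsc;

/// Stand-in for the fold the loop owns by value.
struct Fold {
    cursor: u64,
}

enum Msg {
    /// A received update at this revision, buffered until the next flush.
    Update(u64),
    /// An export request; the reply carries the artifact's embedded cursor.
    Export(mpsc::Sender<u64>),
}

/// "Apply" the pending batch, then advance the fold's cursor to its end.
fn flush(fold: &mut Fold, pending: &mut Vec<u64>) {
    if let Some(&last) = pending.last() {
        fold.cursor = last;
    }
    pending.clear();
}

/// Owner loop: flush every `max` updates, and always before an export.
fn run_owner(rx: mpsc::Receiver<Msg>, max: usize) -> u64 {
    let mut fold = Fold { cursor: 0 };
    let mut pending = Vec::new();
    for msg in rx {
        match msg {
            Msg::Update(rev) => {
                pending.push(rev);
                if pending.len() >= max {
                    flush(&mut fold, &mut pending);
                }
            }
            Msg::Export(reply) => {
                flush(&mut fold, &mut pending);
                // A requester that went away is ignored, like a dropped `reply`.
                let _ = reply.send(fold.cursor);
            }
        }
    }
    flush(&mut fold, &mut pending);
    fold.cursor
}
```

With `max = 4`, updates 1 to 5 followed by an export, the reply is 5 rather
than 4: the half-full batch holding revision 5 is flushed first.
*/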
pub struct ExportRequest {
    /// Where the artifact directory will be created. Must not exist (or be an
    /// empty directory); same filesystem as the fold for cheap hardlinks.
    pub dest_dir: PathBuf,
    /// Receives the sealed manifest on success. A dropped receiver is ignored.
    pub reply: oneshot::Sender<Result<ExportManifest, SnapshotError>>,
}

/// What to watch: every key, every key under a prefix, or the union of several
/// prefixes.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`,
/// `Prefixes` to `watch_prefixes` / `watch_prefixes_from` (one multi-filter
/// consumer for the whole union, never one consumer per prefix).
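///
/**
For illustration, here is the mapping that the private `prefixes()` helper
below performs, together with a hypothetical `matches` helper that spells out
the union semantics: a key is in scope if it starts with any prefix, and `All`
is the empty prefix. `Scope` is a local copy made for the sketch, and the key
names are made up.

```rust
// Hypothetical sketch: a local copy of the scope shape plus a `matches` helper.
enum Scope {
    All,
    Prefix(String),
    Prefixes(Vec<String>),
}

impl Scope {
    /// Same mapping as `WatchScope::prefixes`: `All` is the empty prefix.
    fn prefixes(&self) -> Vec<String> {
        match self {
            Scope::All => vec![String::new()],
            Scope::Prefix(p) => vec![p.clone()],
            Scope::Prefixes(ps) => ps.clone(),
        }
    }

    /// In scope if the key starts with any prefix; "" matches every key.
    fn matches(&self, key: &str) -> bool {
        self.prefixes().iter().any(|p| key.starts_with(p.as_str()))
    }
}
```
*/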
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
    /// Watch keys beginning with ANY of these prefixes, on a single consumer.
    Prefixes(Vec<String>),
}

impl WatchScope {
    /// The scope as a list of key prefixes (`All` = the empty prefix), for
    /// callers that enumerate scope contents (live listings, fold ranges).
    fn prefixes(&self) -> Vec<String> {
        match self {
            WatchScope::All => vec![String::new()],
            WatchScope::Prefix(p) => vec![p.clone()],
            WatchScope::Prefixes(ps) => ps.clone(),
        }
    }
}

/// Internal: a cursor-expired resync handoff from the watch task to the main
/// loop. Carries the bucket's live key listing for the watch scope; the main
/// loop diffs it against the fold and applies synthetic deletes for keys that
/// vanished during the gap, then acks so the watch task can start the fallback
/// watch — the ack ordering guarantees every synthetic delete is applied before
/// the first re-list put arrives.
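///
/**
The handshake can be sketched with plain threads. This is a hypothetical
model under stated assumptions: a `BTreeSet` of keys stands in for the fold,
`std::sync::mpsc` channels and a blocking `recv` stand in for the tokio
channels and the `oneshot` ack, and `Resync` and `resync_then_relist` are
illustrative names. The watch side cannot emit a single re-list put until the
main side has applied every synthetic delete and acked.

```rust
// Hypothetical sketch of the deletes-before-re-list ack handshake.
use std::collections::BTreeSet;
use std::sync::mpsc;
use std::thread;

/// Mirror of the request: the live listing plus an ack channel.
struct Resync {
    live_keys: Vec<String>,
    ack: mpsc::Sender<()>,
}

/// Run one expired-cursor resync against `fold`; returns the fold operations
/// in the order they were applied.
fn resync_then_relist(mut fold: BTreeSet<String>, live: Vec<String>) -> Vec<String> {
    let (resync_tx, resync_rx) = mpsc::channel::<Resync>();
    let (update_tx, update_rx) = mpsc::channel::<String>();

    // Watch side: hand over the listing, wait for the ack, and only then start
    // the fallback watch, whose re-list re-delivers every live key as a put.
    let watch = thread::spawn(move || {
        let (ack_tx, ack_rx) = mpsc::channel();
        resync_tx
            .send(Resync { live_keys: live.clone(), ack: ack_tx })
            .unwrap();
        ack_rx.recv().unwrap();
        for key in live {
            update_tx.send(key).unwrap();
        }
    });

    // Main side: delete what the fold holds but the bucket no longer does, then ack.
    let mut log = Vec::new();
    let req = resync_rx.recv().unwrap();
    let stale: Vec<String> = fold
        .iter()
        .filter(|k| !req.live_keys.contains(*k))
        .cloned()
        .collect();
    for key in stale {
        fold.remove(&key);
        log.push(format!("delete {key}"));
    }
    req.ack.send(()).unwrap();
    for key in update_rx {
        log.push(format!("put {key}"));
        fold.insert(key);
    }
    watch.join().unwrap();
    log
}
```
*/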
struct ResyncRequest {
    live_keys: Vec<String>,
    ack: oneshot::Sender<()>,
}

/// Internal: what the watch task needs to initiate a resync — the reader that
/// lists live keys and the channel into the main loop that owns the fold.
type ResyncHandle = (Arc<dyn KvReader>, mpsc::Sender<ResyncRequest>);

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` updates have accumulated.
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update); `max` caps memory and latency when updates
/// arrive faster than `max` per `window`.
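///
/**
The flush rule itself is small enough to state as a function. A minimal
sketch, assuming the batch's open time is tracked as an `Option<Instant>`;
`Bounds` and `should_flush` are hypothetical names, not part of this module.

```rust
// Hypothetical sketch of the "either bound, whichever comes first" rule.
use std::time::{Duration, Instant};

#[derive(Clone, Copy)]
struct Bounds {
    window: Duration,
    max: usize,
}

/// `opened_at` is `None` while no batch is open; `len` is the batch size.
fn should_flush(b: Bounds, opened_at: Option<Instant>, now: Instant, len: usize) -> bool {
    match opened_at {
        None => false, // nothing open, nothing to flush
        Some(t) => len >= b.max || now.duration_since(t) >= b.window,
    }
}
```

Because the deadline is fixed when the batch opens rather than pushed back by
each new update, a steady trickle of updates cannot postpone a flush past
`window`.
*/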
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of updates in a batch before forcing a flush. Counted
    /// over parsed updates and, when a snapshot store is present, also over raw
    /// received updates (including ones `parse` rejected).
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / folds the batch into `store` / calls `on_applied`.
/// Returns the final applied cursor when the watch ends cleanly (shutdown
/// signalled, or the underlying stream closed). Returns an error if the
/// underlying watch fails, or if the watch task or a snapshot-store task
/// panics.
///
/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
/// its own impl) — or `None` to run without persistence. On each flush, *after*
/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
/// persisted cursor is always the post-apply cursor and never names a revision
/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
/// crash leaves the store consistent and resume re-folds only the tail.
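///
/**
What "atomic (data + cursor)" asks of a backend can be sketched with an
in-memory store. This is a hypothetical model, not the [`SnapshotStore`]
trait: `MemStore` and `Update` are illustrative, the empty-key rejection is an
arbitrary stand-in for any backend failure, and cloning the whole map is just
the simplest way to stage changes (a disk backend might use a transaction or
an atomic rename instead). A failed batch leaves both the data and the cursor
untouched.

```rust
// Hypothetical sketch of an all-or-nothing fold of a batch plus its cursor.
use std::collections::BTreeMap;

enum Update {
    Put(String, String),
    Delete(String),
}

#[derive(Default)]
struct MemStore {
    data: BTreeMap<String, String>,
    cursor: u64,
}

impl MemStore {
    /// Stage the batch on a copy; publish data and cursor together only if
    /// every update succeeded.
    fn apply(&mut self, batch: &[Update], cursor: u64) -> Result<(), String> {
        let mut next = self.data.clone();
        for u in batch {
            match u {
                Update::Put(k, _) if k.is_empty() => return Err("empty key".to_string()),
                Update::Put(k, v) => {
                    next.insert(k.clone(), v.clone());
                }
                Update::Delete(k) => {
                    next.remove(k);
                }
            }
        }
        self.data = next;
        self.cursor = cursor;
        Ok(())
    }
}
```
*/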
///
/// # Cursor expiry and stale-key resync
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix` /
/// `watch_prefixes`), whose state-sync re-list re-delivers the current value of
/// every in-scope key as puts. The re-list alone cannot cover keys that were
/// **deleted during the gap** and whose delete markers the backend has since
/// evicted — they simply don't appear, leaving the fold (and the caller's
/// domain state) holding them forever.
///
/// When both `reader` and `store` are provided, the expiry path closes that
/// hole: before the fallback watch starts, the bucket's live keys are listed
/// via `reader`, diffed against the fold's in-scope keys, and a synthetic
/// [`KvUpdate::Delete`] (with an unknown [`VersionToken`](crate::VersionToken)) is run through
/// `parse`/`apply`/`store` for each key that vanished. The synthetic deletes
/// are strictly ordered before the first re-list put, so a key deleted and
/// re-created during the gap converges correctly. Without a `reader` (or
/// without a `store` to diff against) the fallback is re-list-only and a
/// warning marks the possible stale keys.
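///
/**
The diff at the heart of that resync can be sketched in isolation. A minimal
model, assuming the fold is a `BTreeMap` scanned by prefix (standing in for
the store's range listing) and the live listing is a `Vec<String>`;
`stale_keys` is a hypothetical name. Keys outside every scope prefix are never
candidates, and overlapping prefixes are handled by sorting and deduplicating.

```rust
// Hypothetical sketch of the fold-vs-live diff for stale keys.
use std::collections::{BTreeMap, HashSet};

/// Keys the fold holds under any scope prefix that the live listing lacks.
fn stale_keys(
    fold: &BTreeMap<String, String>,
    prefixes: &[&str],
    live_keys: &[String],
) -> Vec<String> {
    let live: HashSet<&str> = live_keys.iter().map(String::as_str).collect();
    let mut stale = Vec::new();
    for prefix in prefixes {
        stale.extend(
            fold.keys()
                .filter(|k| k.starts_with(*prefix) && !live.contains(k.as_str()))
                .cloned(),
        );
    }
    // Overlapping prefixes (say "a/" and "a/b/") can yield a key twice.
    stale.sort_unstable();
    stale.dedup();
    stale
}
```
*/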
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline` for the next loop
// iteration. At the two flush sites that return immediately afterward (shutdown,
// channel-close) those resets are dead stores — correct, but flagged. The allow
// must sit on the function: a statement-scoped `#[allow]` inside the macro body
// trips the experimental attributes-on-expressions gate (E0658) on stable.
#[allow(unused_assignments)]
pub async fn watch_applied<U, S, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    // `Some(reader)` arms the cursor-expired stale-key resync (see the function
    // docs); `None` keeps the re-list-only fallback. Only consulted on expiry —
    // the hot path never touches it.
    reader: Option<Arc<dyn KvReader>>,
    mut store: Option<S>,
    // `Some(rx)` enables an export-request arm in the select loop: each
    // [`ExportRequest`] is handled between flushes (pending batch flushed
    // first), so the exported artifact's cursor is exactly the applied cursor.
    // `None` (or dropping the paired sender) leaves the loop's behavior
    // unchanged.
    mut exports: Option<mpsc::Receiver<ExportRequest>>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    // `Send + 'static`: each flush moves `store` onto a blocking task to run its
    // (potentially blocking) `apply`, then takes it back — the same offload the
    // append log's compaction always used.
    S: SnapshotStore + Send + 'static,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is — everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // The scope's prefixes, for the resync diff against the fold. Cloned out
    // before `scope` moves into the watch task.
    let scope_prefixes = scope.prefixes();

    // Cursor-expired resync channel, armed only when there is a reader to list
    // live keys AND a store to diff them against. The watch task sends the live
    // listing here and waits for the ack before starting the fallback watch, so
    // synthetic deletes always precede the re-list.
    let (resync_pair, mut resyncs): (Option<ResyncHandle>, Option<mpsc::Receiver<ResyncRequest>>) =
        match reader {
            Some(reader) if store.is_some() => {
                let (rs_tx, rs_rx) = mpsc::channel(1);
                (Some((reader, rs_tx)), Some(rs_rx))
            }
            _ => (None, None),
        };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(
            async move { run_watch(watcher.as_ref(), &scope, resume, resync_pair, tx).await },
        )
    };

    // Batch state.
    //
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush — including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    let mut batch_high = WatchCursor::none();
    // Pre-size to the flush bound so no batch ever re-climbs the reallocation
    // ladder; `max(1)` only guards a nonsensical `max = 0` config.
    let batch_cap = config.max.max(1);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    // Raw received updates for the durable `store`, in revision order. Only
    // populated when a `store` is present; the store folds the *raw* updates
    // (including ones `parse` rejected — they are still part of the bucket's
    // state), whereas the parsed `batch` above is the consumer's domain view.
    let mut raw_batch: Vec<KvUpdate> = Vec::new();
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop —
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch, in order: run the domain `apply` (if non-empty) to
    // completion, advance the cursor, fold the raw batch + cursor durably into
    // `store`, then fire `on_applied`. The store fold runs on a blocking task
    // (its `apply` may block on I/O), moving the store in and taking it back — the
    // same offload the append log's compaction always used. A store error is
    // logged and the watch continues (the snapshot is a cache); a panicked
    // blocking task drops the store irrecoverably, which breaks the
    // resume-after-restart guarantee, so it is surfaced as fatal.
    macro_rules! flush {
        () => {{
            // Nothing received since the last flush → nothing to do at all.
            // (`raw_batch` can be non-empty with no cursor advance only via the
            // resync path's synthetic deletes, which carry no revision.)
            if !batch.is_empty() || !raw_batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind so each
                    // batch after the first doesn't re-climb the reallocation
                    // ladder (4→8→…→cap).
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                let advanced = !batch_high.is_none();
                if advanced {
                    applied = batch_high.clone();
                }
                if !raw_batch.is_empty()
                    && let Some(mut st) = store.take()
                {
                    let raw = std::mem::take(&mut raw_batch);
                    // Fold at the post-advance cursor. A synthetic-deletes-only
                    // batch leaves the cursor where it was (the deletes are a
                    // state correction, not log entries), which is safe: an
                    // unchanged — possibly expired — cursor only ever re-runs
                    // this same resync on the next restart.
                    let cur = applied.clone();
                    // Hand the store back unconditionally on a clean return so
                    // a *failed* apply (Ok(Err)) keeps the watch running; only
                    // a *panicked* task (Err) loses the store and is fatal.
                    match tokio::task::spawn_blocking(move || {
                        let res = st.apply(&raw, &cur);
                        (st, res)
                    })
                    .await
                    {
                        Ok((st, Ok(()))) => store = Some(st),
                        Ok((st, Err(e))) => {
                            warn!(error = %e, "snapshot store apply failed; continuing");
                            store = Some(st);
                        }
                        Err(e) => {
                            warn!(error = %e, "snapshot store task panicked; aborting watch");
                            handle.abort();
                            return Err(KvError::WatchError(format!(
                                "snapshot store task panicked: {e}"
                            )));
                        }
                    }
                }
                if advanced {
                    on_applied(applied.clone());
                    batch_high = WatchCursor::none();
                }
            }
            batch_deadline = None;
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration — one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel — they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume — and return the applied cursor.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush!();
            }

            // Cursor-expired resync. Placed before `rx.recv()` (biased) so the
            // synthetic deletes are folded before any update the fallback watch
            // delivers — though the ack protocol already guarantees the fallback
            // hasn't started while this arm runs. Diff the fold's in-scope keys
            // against the bucket's live listing; anything the fold holds that
            // the bucket no longer does vanished during the gap (its delete
            // marker evicted with the cursor), so synthesize the delete the
            // re-list can't deliver.
            req = async { resyncs.as_mut().expect("arm guarded by is_some").recv().await },
                if resyncs.is_some() => {
                match req {
                    Some(ResyncRequest { live_keys, ack }) => {
                        // Flush first so the diff runs against a fold that
                        // reflects everything received so far.
                        flush!();
                        let live: std::collections::HashSet<&str> =
                            live_keys.iter().map(String::as_str).collect();
                        let mut stale: Vec<String> = Vec::new();
                        if let Some(st) = &store {
                            for prefix in &scope_prefixes {
                                match st.range(prefix) {
                                    Ok(entries) => stale.extend(
                                        entries
                                            .into_iter()
                                            .filter(|e| !live.contains(e.key.as_str()))
                                            .map(|e| e.key),
                                    ),
                                    Err(e) => {
                                        warn!(error = %e, prefix = %prefix,
                                            "resync fold range failed; stale keys under this prefix may persist");
                                    }
                                }
                            }
                        }
                        // Overlapping prefixes can list a key twice.
                        stale.sort_unstable();
                        stale.dedup();
                        if !stale.is_empty() {
                            warn!(stale = stale.len(), "cursor-expired resync: deleting keys that vanished during the gap");
                        }
                        for key in stale {
                            // Synthetic: carries no revision (unknown version)
                            // and so never advances the cursor.
                            let u = KvUpdate::Delete {
                                key,
                                version: crate::kv::VersionToken::unknown(),
                            };
                            if store.is_some() {
                                raw_batch.push(u.clone());
                            }
                            if let Some(parsed) = parse(&u) {
                                batch.push(parsed);
                            }
                        }
                        flush!();
                        // Ack AFTER the deletes are applied: the watch task is
                        // holding the fallback watch until it hears back, which
                        // is what orders deletes before the re-list.
                        let _ = ack.send(());
                    }
                    None => resyncs = None,
                }
            }

            // Export request. Placed after shutdown/window (they stay prompt)
            // and before `rx.recv()` so a firehose of updates cannot starve an
            // export indefinitely. The pending batch is flushed first, so the
            // exported cursor is exactly the applied cursor; the export itself
            // runs on a blocking task with the store moved in and taken back —
            // the same offload the flush path uses.
            req = async { exports.as_mut().expect("arm guarded by is_some").recv().await },
                if exports.is_some() => {
                match req {
                    Some(ExportRequest { dest_dir, reply }) => {
                        flush!();
                        match store.take() {
                            Some(mut st) => {
                                match tokio::task::spawn_blocking(move || {
                                    let res = st.export_to(&dest_dir);
                                    (st, res)
                                })
                                .await
                                {
                                    // Hand the store back on any clean return; an
                                    // export failure goes to the requester only —
                                    // the watch keeps running (the snapshot is a
                                    // cache). A panicked task lost the store,
                                    // which breaks the resume guarantee: fatal,
                                    // same as the flush path's apply panic.
                                    Ok((st, res)) => {
                                        store = Some(st);
                                        let _ = reply.send(res);
                                    }
                                    Err(e) => {
                                        warn!(error = %e, "snapshot export task panicked; aborting watch");
                                        handle.abort();
                                        return Err(KvError::WatchError(format!(
                                            "snapshot export task panicked: {e}"
                                        )));
                                    }
                                }
                            }
                            None => {
                                let _ = reply.send(Err(SnapshotError::Backend(
                                    "watch_applied runs without a snapshot store; nothing to export"
                                        .into(),
                                )));
                            }
                        }
                    }
                    // Sender dropped: disarm the arm for the rest of the run.
                    None => exports = None,
                }
            }

            update = rx.recv() => {
                match update {
                    Some(u) => {
                        // Cursor authority: every received update bumps the
                        // pending high-water, regardless of whether `parse`
                        // keeps it.
                        batch_high = WatchCursor::from_version(u.version().clone());

                        // Buffer a copy of the raw update for the durable store
                        // fold (which commits the whole batch + cursor atomically
                        // on flush). `parse` only borrows `u`, so ordering against
                        // it doesn't matter; the clone happens only when a store
                        // is present, so the no-persistence path pays no copy.
                        if store.is_some() {
                            raw_batch.push(u.clone());
                        }

                        if let Some(parsed) = parse(&u) {
                            batch.push(parsed);
                        }

                        // Arm the window on the first received update of a batch
                        // — even a parse-rejected one, so the cursor advances
                        // within `window` even through a run of irrelevant keys.
                        // Reset the pinned timer to the new deadline rather than
                        // allocating a fresh `Sleep`.
                        if batch_deadline.is_none() {
                            let deadline = tokio::time::Instant::now() + config.window;
                            sleep.as_mut().reset(deadline);
                            batch_deadline = Some(deadline);
                        }

                        // Flush on a full parsed batch, or — when persisting — a
                        // full raw batch, so a window packed with parse-rejected
                        // updates can't grow `raw_batch` without bound before the
                        // window elapses.
                        if batch.len() >= config.max || raw_batch.len() >= config.max {
                            flush!();
                        }
                    }
                    None => {
                        // Stream closed. Flush the remainder, then surface the
                        // watch task's terminal result: a clean end returns the
                        // applied cursor, an error propagates.
                        flush!();
                        return match handle.await {
                            Ok(Ok(())) => Ok(applied),
                            Ok(Err(e)) => Err(e),
                            Err(join) => Err(KvError::WatchError(format!(
                                "watch task panicked: {join}"
                            ))),
                        };
                    }
                }
            }
        }
    }
}

/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position, with the [`KvError::CursorExpired`] → resync + full-watch
/// fallback.
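///
/**
The fallback's control flow can be sketched synchronously, with closures
standing in for the `*_from` call, the resync, and the full-scope watch. This
is an assumption-laden model: `WatchErr` and `resume_or_fallback` are
hypothetical, and `0` plays the role of an empty (`none()`) cursor.

```rust
// Hypothetical sketch: resume, and on cursor expiry resync before a full watch.
#[derive(Debug, PartialEq)]
enum WatchErr {
    CursorExpired,
    Other(String),
}

fn resume_or_fallback(
    cursor: Option<u64>,
    mut watch_from: impl FnMut(u64) -> Result<(), WatchErr>,
    mut resync: impl FnMut(),
    mut watch_full: impl FnMut() -> Result<(), WatchErr>,
) -> Result<(), WatchErr> {
    // An absent or empty cursor goes straight to the full watch.
    match cursor.filter(|c| *c != 0) {
        Some(c) => match watch_from(c) {
            // Only expiry triggers the fallback, and the resync runs first.
            Err(WatchErr::CursorExpired) => {
                resync();
                watch_full()
            }
            // Success or any other error is returned unchanged.
            other => other,
        },
        None => watch_full(),
    }
}
```
*/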
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    resync: Option<ResyncHandle>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Filtering into `resume_cursor` here,
    // and binding `cursor` from it below, makes "we have a resume position"
    // structural: there is no separate bool whose truth a later edit could let
    // drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_all"
                        );
                        resync_stale_keys(scope, &resync).await;
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_prefix"
                        );
                        resync_stale_keys(scope, &resync).await;
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
        WatchScope::Prefixes(prefixes) => {
            let refs: Vec<&str> = prefixes.iter().map(String::as_str).collect();
            if let Some(cursor) = resume_cursor {
                match watcher
                    .watch_prefixes_from(&refs, &cursor, tx.clone())
                    .await
                {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_prefixes"
                        );
                        resync_stale_keys(scope, &resync).await;
                        watcher.watch_prefixes(&refs, tx).await
                    }
                    other => other,
647                }
648            } else {
649                watcher.watch_prefixes(&refs, tx).await
650            }
651        }
652    }
653}
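
/*
A std-only sketch of the dispatch each `run_watch` arm performs. It assumes
closures in place of the `KvWatcher` methods, a bare `u64` in place of
`WatchCursor` (with `0` standing in for `none()`), and a hypothetical `WatchErr`
enum in place of `KvError`; `resume_or_full` and `is_none` are illustrative
names. The `filter` turns an empty cursor into `None`, so only a real position
reaches the resume call, and only `CursorExpired` triggers the resync-then-full
fallback. Every other outcome is returned as-is.

```rust
#[derive(Debug, PartialEq)]
enum WatchErr {
    CursorExpired,
    Other(String),
}

/// In this sketch a zero cursor carries no position (the `none()` case).
fn is_none(cursor: &u64) -> bool {
    *cursor == 0
}

/// Hypothetical model of one `run_watch` arm; the `&'static str` results only
/// name which path ran.
fn resume_or_full<F, G, R>(
    resume: Option<u64>,
    watch_from: F,
    watch_full: G,
    resync: R,
) -> Result<&'static str, WatchErr>
where
    F: FnOnce(u64) -> Result<&'static str, WatchErr>,
    G: FnOnce() -> Result<&'static str, WatchErr>,
    R: FnOnce(),
{
    match resume.filter(|c| !is_none(c)) {
        Some(cursor) => match watch_from(cursor) {
            // Resume position too old: clear stale keys first, then re-list.
            Err(WatchErr::CursorExpired) => {
                resync();
                watch_full()
            }
            // Success and every other error pass through untouched.
            other => other,
        },
        // No usable position: plain full watch, nothing to resync.
        None => watch_full(),
    }
}
```
*/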

/// Cursor-expired stale-key resync, run BEFORE the fallback watch is
/// established: list the scope's live keys, hand them to the main loop (which
/// diffs them against the fold and applies synthetic deletes), and wait for the
/// ack. That ordering — deletes applied, then fallback watch armed — is what
/// makes a delete-then-recreate during the gap converge: the synthetic delete
/// always lands before the re-list put.
///
/// Best-effort: with no reader/store wired (`resync` is `None`), or a failed
/// listing, this degrades to the warn-and-relist-only fallback — keys deleted
/// during the gap stay in the fold until their next update.
async fn resync_stale_keys(scope: &WatchScope, resync: &Option<ResyncHandle>) {
    let Some((reader, resync_tx)) = resync else {
        warn!(
            "no reader wired for cursor-expired resync; keys deleted during the gap may persist in the fold"
        );
        return;
    };
    let mut live_keys = Vec::new();
    for prefix in scope.prefixes() {
        match reader.keys(&prefix).await {
            Ok(keys) => live_keys.extend(keys),
            Err(e) => {
                warn!(error = %e, prefix = %prefix,
                    "resync live-key listing failed; keys deleted during the gap may persist in the fold");
                return;
            }
        }
    }
    let (ack_tx, ack_rx) = oneshot::channel();
    if resync_tx
        .send(ResyncRequest {
            live_keys,
            ack: ack_tx,
        })
        .await
        .is_ok()
    {
        // A dropped ack (main loop shutting down) just means the fallback watch
        // is about to die with it; nothing to recover.
        let _ = ack_rx.await;
    }
}
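
/*
The diff itself happens in the main loop, outside this function. A minimal
sketch of what that comparison has to compute, assuming the fold is a
`BTreeMap` from key to value and the scope is a list of prefixes; treating an
empty prefix as "all keys" is an assumption of this sketch, and `stale_keys` is
a hypothetical name. Every in-scope fold key missing from the live listing gets
a synthetic delete; out-of-scope keys are never candidates.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical diff: in-scope fold keys that the live listing no longer has.
/// An empty prefix matches every key.
fn stale_keys(fold: &BTreeMap<String, Vec<u8>>, scope: &[&str], live: &[String]) -> Vec<String> {
    let live_set: BTreeSet<&str> = live.iter().map(String::as_str).collect();
    fold.keys()
        // Only keys inside the watched scope are candidates...
        .filter(|k| scope.iter().any(|p| k.starts_with(*p)))
        // ...and of those, only keys the server no longer lists are stale.
        .filter(|k| !live_set.contains(k.as_str()))
        .cloned()
        .collect()
}
```

The scope filter is what keeps `other.z` alive in a `node.`-scoped resync: a
key the listing never covered cannot be judged deleted by it.
*/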

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::{KvEntry, VersionToken};
    use crate::snapshot::AppendLogSnapshot;
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio::sync::mpsc::Sender;

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        })
    }

    /// A scripted watcher. The plain watches deliver the `full` script and the
    /// `*_from` resume variants deliver the `from` script (or fail with
    /// `CursorExpired` when `from_expires` is set). After delivering, it either
    /// holds the channel open (so window/max/shutdown flushes can be exercised
    /// without the stream ending) or returns cleanly (so channel-close flushing
    /// can be exercised).
    struct MockWatcher {
        full: Mutex<Option<Vec<KvUpdate>>>,
        from: Mutex<Option<Vec<KvUpdate>>>,
        from_expires: bool,
        hold: bool,
    }

    impl MockWatcher {
        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
            Self {
                full: Mutex::new(Some(updates)),
                from: Mutex::new(None),
                from_expires: false,
                hold,
            }
        }

        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
            let updates = which.lock().unwrap().take().unwrap_or_default();
            for u in updates {
                if tx.send(u).await.is_err() {
                    return;
                }
            }
            if self.hold {
                // Keep `tx` alive (channel open) until this task is aborted.
                std::future::pending::<()>().await;
            }
        }
    }
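
    /*
Why `hold` matters: the receiving loop can only tell "stream closed" from
"stream idle" by whether any sender is still alive. A std-only sketch of that
distinction, assuming `std::sync::mpsc` in place of tokio's channel and a
hypothetical `drain` helper: while the sender is held, draining ends in
`Timeout` (the situation the window/max/shutdown tests need); once it is
dropped, draining ends in `Disconnected` (the situation the close-path tests
need).

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical helper: receive until the channel goes quiet or closes, and
/// report which of the two ended the drain.
fn drain(rx: &mpsc::Receiver<u64>) -> (Vec<u64>, RecvTimeoutError) {
    let mut got = Vec::new();
    loop {
        match rx.recv_timeout(Duration::from_millis(20)) {
            Ok(rev) => got.push(rev),
            // `Timeout`: a sender is still alive, the stream is merely idle.
            // `Disconnected`: every sender is gone, the stream has ended.
            Err(end) => return (got, end),
        }
    }
}
```
    */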

    #[async_trait]
    impl KvWatcher for MockWatcher {
        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefixes(
            &self,
            _prefixes: &[&str],
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            // This mock scripts the applied-watch resumption tests, not prefix
            // filtering; it delivers the same `full` script as `watch_prefix`.
            // The real multi-filter scoping is proved in the NATS integration test.
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_all_from(
            &self,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
        // are exercised against the same `from` script. Without this the trait's
        // default impl would delegate to watch_prefix and silently deliver the
        // full set instead of the delta.
        async fn watch_prefix_from(
            &self,
            _prefix: &str,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Same mirroring for the multi-prefix resume arm.
        async fn watch_prefixes_from(
            &self,
            _prefixes: &[&str],
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }
    }

    /// A reader whose `keys()` serves a scripted live listing — the only call
    /// the cursor-expired resync makes. Filters by prefix like a real backend
    /// so prefix-scoped resyncs are exercised faithfully.
    struct MockReader {
        live: Vec<String>,
    }

    #[async_trait]
    impl KvReader for MockReader {
        async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
            unreachable!("resync only lists keys")
        }

        async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
            Ok(self
                .live
                .iter()
                .filter(|k| k.starts_with(prefix))
                .cloned()
                .collect())
        }

        async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
            unreachable!("resync only lists keys")
        }
    }

    /// A watcher whose entry points all fail. Used to prove the watch task's
    /// terminal error is surfaced out of `watch_applied` rather than swallowed
    /// as a clean `Ok(applied)` when the channel closes.
    struct ErrorWatcher;

    #[async_trait]
    impl KvWatcher for ErrorWatcher {
        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefixes(
            &self,
            _prefixes: &[&str],
            _tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }
    }

    // A pass-through parse: keeps each Put's value bytes and drops deletes.
    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
        match u {
            KvUpdate::Put(e) => Some(e.value.clone()),
            _ => None,
        }
    }

    /// The stream closes (hold = false) with a pending batch: the remainder is
    /// flushed before returning, the returned cursor is the last revision, every
    /// update reached `apply` in order, and the last cursor `on_applied`
    /// reported is that same revision.
    #[tokio::test]
    async fn flush_on_channel_close() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
        let watcher = Arc::new(MockWatcher::new(updates, false));

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));

        let ab = Arc::clone(&applied_batches);
        let oc = Arc::clone(&on_applied_cursors);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch| ab.lock().unwrap().push(batch),
            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(3));
        let batches = applied_batches.lock().unwrap();
        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
    }

    /// Fewer than `max` updates, then the channel idles: the window timer must
    /// flush them and advance the cursor.
    #[tokio::test(start_paused = true)]
    async fn flush_on_window() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let count = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let c = Arc::clone(&count);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| {
                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
            },
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Let the window (10ms) elapse under virtual time.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            count.load(Ordering::SeqCst),
            2,
            "window should have flushed"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);

        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
    }

    /// Exactly `max` updates fill a batch, which flushes immediately, before
    /// the window would have elapsed.
    #[tokio::test(start_paused = true)]
    async fn flush_on_max() {
        let max = 4;
        let updates: Vec<_> = (1..=max as u64)
            .map(|i| put(&format!("k{i}"), b"v", i))
            .collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
        let f = Arc::clone(&flushes);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig {
                window: Duration::from_secs(3600), // effectively never
                max,
            },
            parse_put,
            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
            move |_| {},
            sd_rx,
        ));

        // Yield enough for the mock to push all `max` updates; the window is an
        // hour, so any flush is purely the max trigger.
        tokio::time::sleep(Duration::from_millis(1)).await;
        assert_eq!(
            *flushes.lock().unwrap(),
            vec![max],
            "a full batch should flush on max, not wait for the window"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }
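
    /*
The two triggers these tests pin down (max count and idle window) can be
modelled without a runtime. A minimal sketch, assuming arrivals carry explicit
millisecond timestamps in place of the virtual clock that `start_paused`
provides, and assuming the window is measured from the first update in the
pending batch. `plan_flushes` is a hypothetical helper that only decides which
revisions land in which batch, not when `apply` runs or how the cursor moves.

```rust
/// Hypothetical model of the batching triggers: `arrivals` is a list of
/// `(arrival_ms, revision)` pairs in arrival order.
fn plan_flushes(arrivals: &[(u64, u64)], window_ms: u64, max: usize) -> Vec<Vec<u64>> {
    let mut flushes = Vec::new();
    let mut pending: Vec<u64> = Vec::new();
    let mut opened_at = 0;
    for &(t, rev) in arrivals {
        // Window trigger: the pending batch has been open for a full window.
        if !pending.is_empty() && t >= opened_at + window_ms {
            flushes.push(std::mem::take(&mut pending));
        }
        if pending.is_empty() {
            opened_at = t; // the first update of a new batch starts the window
        }
        pending.push(rev);
        // Max trigger: a full batch flushes immediately, window or not.
        if pending.len() >= max {
            flushes.push(std::mem::take(&mut pending));
        }
    }
    // Stream close or shutdown: whatever is still pending is flushed too.
    if !pending.is_empty() {
        flushes.push(pending);
    }
    flushes
}
```
    */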

    /// A pending batch plus a shutdown signal: the batch is flushed and the
    /// applied cursor returned.
    #[tokio::test(start_paused = true)]
    async fn flush_on_shutdown() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig {
                window: Duration::from_secs(3600), // window won't fire
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Give the mock time to deliver both updates into the pending batch.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            Some(2),
            "shutdown flushes the pending batch"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);
    }

    /// The cursor must not advance until `apply` has returned. We prove it by
    /// having `apply` read the cursor that `on_applied` last published: when the
    /// second batch is applied, the visible cursor must still be the *first*
    /// batch's — never the second's, which only becomes visible after this
    /// `apply` returns.
    #[tokio::test(start_paused = true)]
    async fn cursor_advances_only_after_apply() {
        // Two batches of `max` updates each.
        let max = 2usize;
        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        // Cursor as last published by on_applied; starts at 0 (nothing applied).
        let published = Arc::new(AtomicU64::new(0));
        // What `apply` observed as the published cursor at the moment it ran.
        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));

        let pub_for_apply = Arc::clone(&published);
        let seen = Arc::clone(&seen_at_apply);
        let pub_for_on = Arc::clone(&published);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig {
                window: Duration::from_secs(3600),
                max,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                // The cursor visible here is whatever the PREVIOUS flush
                // published — never this batch's, because we haven't returned.
                seen.lock()
                    .unwrap()
                    .push(pub_for_apply.load(Ordering::SeqCst));
            },
            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();

        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
        // batch's cursor), NOT 4. The cursor only reached 4 after the second
        // apply returned.
        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
        assert_eq!(published.load(Ordering::SeqCst), 4);
    }
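
    /*
The ordering this test checks comes down to two statements in a fixed order. A
synchronous sketch, assuming batches of bare `u64` revisions and a hypothetical
`drive` helper in place of the real loop: `apply` sees the cursor published by
the previous flush, and the cursor moves to the batch's highest revision only
after `apply` returns.

```rust
/// Hypothetical synchronous model of a flush sequence: each batch is a list of
/// revisions, and `apply` receives the batch plus the cursor published so far.
fn drive<A: FnMut(&[u64], u64)>(batches: &[Vec<u64>], mut apply: A) -> u64 {
    let mut published: u64 = 0; // nothing applied yet
    for batch in batches {
        // 1. Domain work first; it can only see the previous flush's cursor.
        apply(batch.as_slice(), published);
        // 2. Only after `apply` returns does the cursor cover this batch.
        if let Some(&hi) = batch.iter().max() {
            published = published.max(hi);
        }
    }
    published
}
```

Swapping steps 1 and 2 is the receipt-driven bug from the module docs: `apply`
would then observe `[2, 4]` instead of `[0, 2]`.
    */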

    /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
    /// domain work: there is nothing to apply, so they count as applied once
    /// their flush completes, and the cursor must still advance over them.
    #[tokio::test]
    async fn corrupt_parse_entries_advance_cursor() {
        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            // Reject everything — simulates corrupt/irrelevant entries.
            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
            move |batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
                assert!(batch.is_empty());
            },
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
        assert_eq!(
            apply_calls.load(Ordering::SeqCst),
            0,
            "an all-rejected batch applies nothing"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }
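
    /*
A sketch of the flush rule this test relies on, assuming each received update
is a `(revision, parsed)` pair where `parsed` is `None` when `parse` rejected it;
`flush_once` is a hypothetical name. The domain batch keeps only parsed values
and `apply` is skipped when it is empty, but the cursor is computed over every
received revision.

```rust
/// Hypothetical single flush: returns the new cursor and whether `apply` ran.
fn flush_once<A: FnMut(Vec<&'static str>)>(
    raw: &[(u64, Option<&'static str>)],
    cursor: u64,
    mut apply: A,
) -> (u64, bool) {
    // Domain work: only what `parse` accepted.
    let parsed: Vec<&'static str> = raw.iter().filter_map(|(_, p)| *p).collect();
    let applied = !parsed.is_empty();
    if applied {
        apply(parsed);
    }
    // Cursor: every received revision, rejected or not, once the flush is done.
    let next = raw.iter().map(|(rev, _)| *rev).fold(cursor, |a, b| a.max(b));
    (next, applied)
}
```
    */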

    /// A resume whose cursor has expired falls back to the full watch and still
    /// applies the delivered updates.
    #[tokio::test]
    async fn cursor_expired_falls_back_to_full_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None,                           // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "fallback full watch's updates were applied"
        );
    }

    /// Cursor-expired resync: with a reader + store wired, a key the fold holds
    /// that the live listing no longer does gets a synthetic delete — applied
    /// strictly BEFORE the fallback re-list — and the persisted fold converges
    /// to the live state. The synthetic delete (unknown version) must not move
    /// the cursor; the re-list put must.
    #[tokio::test]
    async fn cursor_expired_resync_deletes_stale_keys() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("resync.snap");
        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
        // The fold from the previous run: node.a and node.b at cursor 2.
        store
            .apply(
                &[put("node.a", b"1", 1), put("node.b", b"2", 2)],
                &WatchCursor::from_u64(2),
            )
            .unwrap();

        // During the gap node.b was deleted (marker since evicted) and node.a
        // updated; the resume cursor (2) has expired. The fallback re-list
        // therefore carries only the surviving key.
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1b", 10)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let reader = MockReader {
            live: vec!["node.a".to_string()],
        };

        // Record everything `parse` sees, in order, deletes included.
        let seen = Arc::new(Mutex::new(Vec::<(String, bool)>::new()));
        let s = Arc::clone(&seen);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            Arc::new(mock),
            WatchScope::All,
            Some(WatchCursor::from_u64(2)),
            Some(Arc::new(reader) as Arc<dyn KvReader>),
            Some(store),
            None,
            BatchConfig::default(),
            move |u: &KvUpdate| {
                s.lock()
                    .unwrap()
                    .push((u.key().to_string(), matches!(u, KvUpdate::Delete { .. })));
                Some(())
            },
            |_batch: Vec<()>| {},
            |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        // The re-list put advanced the cursor; the synthetic delete did not.
        assert_eq!(cursor.as_u64(), Some(10));
        // The synthetic delete strictly precedes the re-list put.
        assert_eq!(
            *seen.lock().unwrap(),
            vec![("node.b".to_string(), true), ("node.a".to_string(), false)],
            "synthetic delete must be applied before the fallback re-list"
        );

        // The persisted fold converged: stale key gone, live key updated.
        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(snap.cursor.as_u64(), Some(10));
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["node.a"].value, b"1b");
    }
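
    /*
The cursor rule behind both resync tests: a synthetic delete is fabricated
locally and carries no server revision, so it says nothing about how far the
log has been applied. A sketch assuming a hypothetical `Update` enum in which
only real puts carry a revision; `advance` folds over known revisions only, so
a deletes-only flush leaves the cursor where it was.

```rust
/// Hypothetical update shape: only updates that came from the server carry a
/// revision; a synthetic delete has none.
enum Update {
    Put { key: String, rev: u64 },
    SyntheticDelete { key: String },
}

impl Update {
    fn key(&self) -> &str {
        match self {
            Update::Put { key, .. } | Update::SyntheticDelete { key } => key.as_str(),
        }
    }

    fn revision(&self) -> Option<u64> {
        match self {
            Update::Put { rev, .. } => Some(*rev),
            Update::SyntheticDelete { .. } => None,
        }
    }
}

/// Advance `cursor` over the revisions a flushed batch actually carries.
fn advance(cursor: u64, batch: &[Update]) -> u64 {
    batch.iter().filter_map(Update::revision).fold(cursor, |a, b| a.max(b))
}
```
    */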

    /// A prefix-scoped resync diffs only in-scope keys: an out-of-scope key the
    /// fold holds survives, the in-scope stale key is deleted, and a flush
    /// containing only synthetic deletes leaves the cursor untouched.
    #[tokio::test]
    async fn cursor_expired_resync_respects_scope() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("resync-scope.snap");
        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
        store
            .apply(
                &[put("node.b", b"2", 1), put("other.z", b"9", 2)],
                &WatchCursor::from_u64(2),
            )
            .unwrap();

        // Expired resume; the bucket no longer has ANY node.* keys; the
        // fallback re-list is empty.
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let reader = MockReader { live: vec![] };
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            Arc::new(mock),
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(2)),
            Some(Arc::new(reader) as Arc<dyn KvReader>),
            Some(store),
            None,
            BatchConfig::default(),
            |_u: &KvUpdate| Some(()),
            |_batch: Vec<()>| {},
            |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        // Deletes-only flush: cursor stays at the resume position.
        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(snap.cursor.as_u64(), Some(2));
        assert!(
            !snap.entries.contains_key("node.b"),
            "in-scope stale key must be resync-deleted"
        );
        assert_eq!(
            snap.entries["other.z"].value, b"9",
            "out-of-scope key must survive a prefix-scoped resync"
        );
    }

    /// With no resume cursor, `WatchScope::Prefixes` dispatches to the full
    /// `watch_prefixes` watch and applies its updates. The expired-resume path
    /// (`watch_prefixes_from` with the full-watch fallback) is covered by
    /// `prefixes_scope_expired_resume_falls_back`.
    #[tokio::test]
    async fn prefixes_scope_dispatches_full_watch() {
        let updates = vec![put("a.x", b"1", 1), put("b.y", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false));
        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefixes(vec!["a.".to_string(), "b.".to_string()]),
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }

1346    /// `WatchScope::Prefixes` resume whose cursor has expired falls back to the
1347    /// full multi-prefix watch and applies its updates.
1348    #[tokio::test]
1349    async fn prefixes_scope_expired_resume_falls_back() {
1350        let mock = MockWatcher {
1351            full: Mutex::new(Some(vec![put("a.x", b"1", 7)])),
1352            from: Mutex::new(Some(vec![])),
1353            from_expires: true,
1354            hold: false,
1355        };
1356        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1357        let ab = Arc::clone(&applied_batches);
1358        let (_sd_tx, sd_rx) = watch::channel(false);
1359
1360        let cursor = watch_applied(
1361            Arc::new(mock),
1362            WatchScope::Prefixes(vec!["a.".to_string()]),
1363            Some(WatchCursor::from_u64(3)),
1364            None, // reader (no resync in this test)
1365            None::<AppendLogSnapshot>,
1366            None,
1367            BatchConfig::default(),
1368            parse_put,
1369            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1370            move |_| {},
1371            sd_rx,
1372        )
1373        .await
1374        .unwrap();
1375
1376        assert_eq!(cursor.as_u64(), Some(7));
1377        assert_eq!(*applied_batches.lock().unwrap(), vec![b"1".to_vec()]);
1378    }
1379
    /// End-to-end with a real snapshot file: after the run, the persisted
    /// snapshot's cursor equals the applied cursor and its entries match the
    /// applied state — proving the checkpoint is written at the post-apply
    /// cursor, never ahead of it.
    #[tokio::test]
    async fn snapshot_checkpoint_matches_applied_cursor() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();

        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "snapshot checkpoint cursor must equal the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.entries["node.a"].value, b"1");
        assert_eq!(snap.entries["node.b"].value, b"2");
    }

    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
    /// delta (the `from` script, NOT the full set) is applied. Proves the
    /// resume branch delivers only post-cursor updates and advances to their
    /// max revision.
    #[tokio::test]
    async fn resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a non-empty distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
            None,                           // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied, never the full set"
        );
    }

    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
    /// applies the delivered updates. Every other test uses `WatchScope::All`;
    /// this covers the prefix dispatch arm.
    #[tokio::test]
    async fn prefix_scope_applies_delivered_updates() {
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }

    /// `WatchScope::Prefix` happy-path resume: a non-expired cursor takes the
    /// `watch_prefix_from` path and only the delta is applied — the prefix
    /// twin of `resume_from_cursor_delivers_only_delta`.
    #[tokio::test]
    async fn prefix_resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("node.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
            None,                           // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied via watch_prefix_from"
        );
    }

    /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
    /// full `watch_prefix` and still applies the delivered updates — the prefix
    /// twin of `cursor_expired_falls_back_to_full_watch`.
    #[tokio::test]
    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None,                           // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "prefix fallback full watch's updates were applied"
        );
    }

    /// The watch task's terminal error must propagate out of `watch_applied`
    /// rather than being swallowed as `Ok(applied)` when the channel closes.
    #[tokio::test]
    async fn watch_task_error_propagates() {
        let watcher = Arc::new(ErrorWatcher);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let result = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await;

        match result {
            Err(KvError::WatchError(msg)) => {
                assert!(msg.contains("injected"), "error carries the cause: {msg}");
            }
            other => panic!("expected WatchError, got {other:?}"),
        }
    }

    /// A batch where `parse` accepts some updates and rejects others: the cursor
    /// must still advance to the highest *received* revision (covering the
    /// rejected entry in the middle), while `apply` sees only the accepted ones.
    #[tokio::test]
    async fn mixed_parse_advances_cursor_over_rejected_entries() {
        let updates = vec![
            put("keep.a", b"1", 5),
            put("skip.b", b"2", 6), // rejected by parse
            put("keep.c", b"3", 7),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ab = Arc::clone(&applied_batches);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            // Keep only keys under "keep."; reject everything else.
            |u: &KvUpdate| -> Option<Vec<u8>> {
                match u {
                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                    _ => None,
                }
            },
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(7),
            "cursor covers the rejected middle entry (rev 6)"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"3".to_vec()],
            "apply sees only the accepted entries"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

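    // Illustrative sketch, not part of this crate's API: a minimal,
    // self-contained model of the flush step that
    // `mixed_parse_advances_cursor_over_rejected_entries` exercises. The
    // `Update` type and `flush` function are hypothetical stand-ins for a
    // `KvUpdate` batch and `watch_applied`'s flush path. `apply` runs first,
    // on the accepted payloads only. The cursor moves only after `apply`
    // returns, and it moves to the highest revision *received* in the batch,
    // so a parse-rejected entry is covered and is not re-delivered on resume.
    mod sketch_cursor_after_apply {
        /// Hypothetical received update: a revision plus a key/value payload.
        pub struct Update {
            pub rev: u64,
            pub key: String,
            pub value: Vec<u8>,
        }

        /// Flushes one batch and returns the new cursor. An empty batch means
        /// nothing was received, so `apply` is not called and the cursor is unmoved.
        pub fn flush<P, A>(
            current: Option<u64>,
            batch: &[Update],
            parse: P,
            mut apply: A,
        ) -> Option<u64>
        where
            P: Fn(&Update) -> Option<Vec<u8>>,
            A: FnMut(Vec<Vec<u8>>),
        {
            // High-water mark over everything received, accepted or rejected.
            let received_max = match batch.iter().map(|u| u.rev).max() {
                Some(rev) => rev,
                None => return current,
            };
            // Only payloads that `parse` accepts reach `apply`.
            let accepted: Vec<Vec<u8>> = batch.iter().filter_map(|u| parse(u)).collect();
            apply(accepted);
            // Written only after `apply` has returned; never moves backwards.
            Some(current.map_or(received_max, |c| c.max(received_max)))
        }

        #[test]
        fn rejected_middle_entry_is_covered() {
            let batch = vec![
                Update { rev: 5, key: "keep.a".into(), value: b"1".to_vec() },
                Update { rev: 6, key: "skip.b".into(), value: b"2".to_vec() },
                Update { rev: 7, key: "keep.c".into(), value: b"3".to_vec() },
            ];
            let mut applied: Vec<Vec<u8>> = Vec::new();
            let cursor = flush(
                None,
                &batch,
                |u| u.key.starts_with("keep.").then(|| u.value.clone()),
                |b| applied.extend(b),
            );
            assert_eq!(cursor, Some(7));
            assert_eq!(applied, vec![b"1".to_vec(), b"3".to_vec()]);
        }
    }
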
    /// Shutdown before any update arrives: nothing was received, so the cursor
    /// stays at the resume position (here `none()`), `apply` never runs, and
    /// `on_applied` never fires.
    #[tokio::test(start_paused = true)]
    async fn shutdown_with_no_pending_batch() {
        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_calls = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let oc = Arc::clone(&on_applied_calls);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
            },
            move |_| {
                oc.fetch_add(1, Ordering::SeqCst);
            },
            sd_rx,
        ));

        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            None,
            "no updates received → cursor unmoved"
        );
        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
        assert_eq!(
            on_applied_calls.load(Ordering::SeqCst),
            0,
            "on_applied never fires"
        );
    }

    /// An [`ExportRequest`] flushes the pending batch first, so the artifact's
    /// cursor is exactly the applied cursor — and the artifact is importable
    /// with the batched entries in it.
    #[tokio::test(start_paused = true)]
    async fn export_request_flushes_pending_batch_first() {
        let dir = tempfile::TempDir::new().unwrap();
        let store_path = dir.path().join("fold.snap");
        let artifact = dir.path().join("artifact");
        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();

        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel(1);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            Some(ex_rx),
            BatchConfig {
                window: Duration::from_secs(3600), // window never fires
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        ));

        // Let both updates land in the (unflushed) pending batch, then export.
        tokio::time::sleep(Duration::from_millis(1)).await;
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        ex_tx
            .send(ExportRequest {
                dest_dir: artifact.clone(),
                reply: reply_tx,
            })
            .await
            .unwrap();

        let manifest = reply_rx.await.unwrap().expect("export succeeds");
        assert_eq!(
            manifest.cursor.as_u64(),
            Some(2),
            "pending batch flushed before export: artifact cursor is the applied cursor"
        );

        // The artifact is importable and holds both batched entries.
        let (cursor, imported) =
            AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
                .unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
        assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// An [`ExportRequest`] that arrives with NOTHING pending (the window
    /// already flushed everything) still produces a valid artifact whose
    /// cursor is the applied cursor. The flush-before-export step must be a
    /// clean no-op, not an error or a cursor regression.
    #[tokio::test(start_paused = true)]
    async fn export_with_empty_pending_batch_succeeds() {
        let dir = tempfile::TempDir::new().unwrap();
        let store_path = dir.path().join("fold.snap");
        let artifact = dir.path().join("artifact");
        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();

        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel(1);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            Some(ex_rx),
            BatchConfig::default(), // 10 ms window
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        ));

        // Let the window flush both updates, so the export request finds an
        // EMPTY pending batch.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        ex_tx
            .send(ExportRequest {
                dest_dir: artifact.clone(),
                reply: reply_tx,
            })
            .await
            .unwrap();
        let manifest = reply_rx
            .await
            .unwrap()
            .expect("export succeeds with nothing pending");
        assert_eq!(
            manifest.cursor.as_u64(),
            Some(2),
            "artifact cursor is the applied cursor, unchanged by the no-op flush"
        );

        // The artifact is importable and holds the already-flushed entries.
        let (cursor, imported) =
            AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
                .unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
        assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

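    // Illustrative sketch, not part of this crate's API: a minimal in-memory
    // model of the flush-before-export rule that the two export tests above
    // check. `Fold`, `flush`, and `export` are hypothetical; the real path goes
    // through `ExportRequest` and `AppendLogSnapshot`. Because `export` always
    // flushes first, the exported cursor is exactly the applied cursor. With
    // nothing pending, the flush is a no-op rather than an error or a cursor
    // regression.
    mod sketch_flush_before_export {
        use std::collections::BTreeMap;

        /// Hypothetical fold: `pending` holds received-but-unapplied updates;
        /// `state` and `cursor` reflect applied updates only.
        #[derive(Default)]
        pub struct Fold {
            pub pending: Vec<(u64, String, Vec<u8>)>,
            pub state: BTreeMap<String, Vec<u8>>,
            pub cursor: Option<u64>,
        }

        impl Fold {
            /// Applies every pending update, then advances the cursor to the
            /// highest applied revision. An empty `pending` leaves both untouched.
            pub fn flush(&mut self) {
                let mut max: Option<u64> = None;
                for (rev, key, value) in self.pending.drain(..) {
                    self.state.insert(key, value);
                    max = max.max(Some(rev));
                }
                // `None < Some(_)`, so an empty flush cannot move the cursor.
                self.cursor = self.cursor.max(max);
            }

            /// Flushes first, then hands out the applied cursor and state.
            pub fn export(&mut self) -> (Option<u64>, BTreeMap<String, Vec<u8>>) {
                self.flush();
                (self.cursor, self.state.clone())
            }
        }

        #[test]
        fn export_flushes_then_is_a_no_op() {
            let mut fold = Fold::default();
            fold.pending.push((1, "a".into(), b"1".to_vec()));
            fold.pending.push((2, "b".into(), b"2".to_vec()));
            let (cursor, state) = fold.export();
            assert_eq!(cursor, Some(2));
            assert_eq!(state.len(), 2);
            // Nothing pending the second time: same cursor, no error.
            let (again, _) = fold.export();
            assert_eq!(again, Some(2));
        }
    }
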
    /// An export request against a store-less watch replies with an error and
    /// the watch keeps running.
    #[tokio::test(start_paused = true)]
    async fn export_without_store_replies_error() {
        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel(1);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            Some(ex_rx),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        ex_tx
            .send(ExportRequest {
                dest_dir: std::env::temp_dir().join("never-created"),
                reply: reply_tx,
            })
            .await
            .unwrap();
        assert!(
            reply_rx.await.unwrap().is_err(),
            "no store → export errors via the reply"
        );

        // The watch is still alive and returns its applied cursor on shutdown.
        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(1));
    }

    /// An export failure (occupied destination) is reported on the reply, and
    /// the watch survives it: a clean shutdown afterwards still returns the
    /// applied cursor.
    #[tokio::test(start_paused = true)]
    async fn export_error_does_not_kill_watch() {
        let dir = tempfile::TempDir::new().unwrap();
        let store_path = dir.path().join("fold.snap");
        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();

        // Occupied destination → export fails.
        let occupied = dir.path().join("occupied");
        std::fs::create_dir(&occupied).unwrap();
        std::fs::write(occupied.join("stray"), b"x").unwrap();

        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel(1);

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            Some(ex_rx),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        ex_tx
            .send(ExportRequest {
                dest_dir: occupied,
                reply: reply_tx,
            })
            .await
            .unwrap();
        match reply_rx.await.unwrap() {
            Err(crate::snapshot::SnapshotError::ArtifactInvalid(_)) => {}
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }

        // Watch still folds: a clean shutdown returns the applied cursor.
        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(1), "watch survived the failed export");
        assert_eq!(applied.load(Ordering::SeqCst), 1);
    }

    /// Dropping the export sender disarms the export arm; the loop keeps
    /// batching and flushing normally.
    #[tokio::test(start_paused = true)]
    async fn export_sender_dropped_disarms_channel() {
        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel::<ExportRequest>(1);

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            Some(ex_rx),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        drop(ex_tx); // disarm
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            applied.load(Ordering::SeqCst),
            1,
            "loop keeps flushing after the export sender is gone"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
    /// compaction actually fires (every other snapshot test pins the threshold
    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
    /// snapshot must still load cleanly with the right cursor and entries.
    #[tokio::test]
    async fn snapshot_compaction_fires_and_stays_consistent() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        // threshold 0 → every checkpoint reports "needs compact", forcing the
        // store's inline-compaction branch on each flush (run off the hot path via
        // spawn_blocking inside watch_applied).
        let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();

        // Re-put the same key across flushes so compaction has duplicates to
        // dedup; small max forces multiple flushes (hence multiple compactions).
        let updates = vec![
            put("node.a", b"1", 1),
            put("node.a", b"2", 2),
            put("node.b", b"3", 3),
            put("node.a", b"4", 4),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            None,
            BatchConfig {
                window: Duration::from_secs(3600),
                max: 1, // one update per flush → a compaction per update
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(4));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "compacted snapshot's cursor still equals the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
        assert_eq!(
            snap.entries["node.a"].value, b"4",
            "last write per key survives compaction"
        );
        assert_eq!(snap.entries["node.b"].value, b"3");
    }
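
    // Illustrative sketch, not part of this crate's API: the last-write-per-key
    // fold that `snapshot_compaction_fires_and_stays_consistent` expects
    // compaction to preserve. `compact` is hypothetical. It works on an
    // in-memory log of `(rev, key, value)` records that is assumed to be in
    // revision order, with values shown as `&str` for brevity; the real store
    // compacts an on-disk append log.
    mod sketch_compaction {
        use std::collections::BTreeMap;

        /// Folds the log into one entry per key (a later record overwrites an
        /// earlier one) and returns the highest revision seen as the cursor.
        pub fn compact(log: &[(u64, &str, &str)]) -> (Option<u64>, BTreeMap<String, String>) {
            let mut entries = BTreeMap::new();
            let mut cursor: Option<u64> = None;
            for &(rev, key, value) in log {
                entries.insert(key.to_string(), value.to_string());
                cursor = cursor.max(Some(rev));
            }
            (cursor, entries)
        }

        #[test]
        fn last_write_per_key_survives() {
            let log: [(u64, &str, &str); 4] = [
                (1, "node.a", "1"),
                (2, "node.a", "2"),
                (3, "node.b", "3"),
                (4, "node.a", "4"),
            ];
            let (cursor, entries) = compact(&log);
            assert_eq!(cursor, Some(4));
            assert_eq!(entries.len(), 2, "duplicates of node.a deduped");
            assert_eq!(entries["node.a"], "4");
            assert_eq!(entries["node.b"], "3");
        }
    }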
}