slipstream/
applied.rs

//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
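//!
//! The skipped-revision failure can be reproduced in a few lines. This is a
//! minimal sketch, not code from this crate: `crashed_run` and `clean_run` are
//! hypothetical helpers, revisions are plain `u64`s, and a "crash" is modeled
//! by returning before the buffered batch is applied.
//!
//! ```rust
//! /// One run of a toy watch loop over `log` (revisions in order), resuming
//! /// after `cursor`. It receives up to `crash_after` updates and then
//! /// "crashes" with the batch still unapplied. Returns the cursor that had
//! /// been persisted by the time of the crash.
//! fn crashed_run(log: &[u64], cursor: u64, crash_after: usize, advance_on_receipt: bool) -> u64 {
//!     let mut persisted = cursor;
//!     let mut batch = Vec::new();
//!     for &rev in log.iter().filter(|&&r| r > cursor).take(crash_after) {
//!         batch.push(rev);
//!         if advance_on_receipt {
//!             persisted = rev; // the tempting shortcut: cursor moves on receipt
//!         }
//!     }
//!     drop(batch); // crash: the buffered batch is lost without being applied
//!     persisted
//! }
//!
//! /// A clean run after restart: returns the revisions it applies, which are
//! /// exactly those past the persisted cursor.
//! fn clean_run(log: &[u64], cursor: u64) -> Vec<u64> {
//!     log.iter().copied().filter(|&r| r > cursor).collect()
//! }
//!
//! fn main() {
//!     let log = [1, 2, 3, 4];
//!
//!     // Cursor advanced on receipt: the restart resumes past revs 1..=3,
//!     // which were never applied.
//!     let cursor = crashed_run(&log, 0, 3, true);
//!     assert_eq!(clean_run(&log, cursor), vec![4]);
//!
//!     // Cursor advanced only after apply: the same crash costs a replay,
//!     // not a skipped update.
//!     let cursor = crashed_run(&log, 0, 3, false);
//!     assert_eq!(clean_run(&log, cursor), vec![1, 2, 3, 4]);
//! }
//! ```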
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
//!   applied cursor. Callers use it to persist the cursor for the next restart.
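//!
//! How the three closures interact within one flush can be sketched
//! synchronously. This is an illustration under simplifying assumptions, not
//! the real signature: `flush_once` and `demo` are hypothetical, updates are
//! `(revision, raw text)` pairs, and the cursor is a bare `u64`.
//!
//! ```rust
//! /// One flush over `updates`: parse each update, apply the parsed batch,
//! /// and only then report the highest *received* revision.
//! fn flush_once<U>(
//!     updates: &[(u64, &str)],
//!     mut parse: impl FnMut(&str) -> Option<U>,
//!     mut apply: impl FnMut(Vec<U>),
//!     mut on_applied: impl FnMut(u64),
//! ) {
//!     let mut batch = Vec::new();
//!     let mut high = None;
//!     for &(rev, raw) in updates {
//!         high = Some(rev); // counts toward the cursor even if parse rejects it
//!         if let Some(parsed) = parse(raw) {
//!             batch.push(parsed);
//!         }
//!     }
//!     if !batch.is_empty() {
//!         apply(batch); // must return before the cursor is reported
//!     }
//!     if let Some(rev) = high {
//!         on_applied(rev);
//!     }
//! }
//!
//! /// Runs one flush whose last update is unparseable and returns the
//! /// resulting domain state and persisted cursor.
//! fn demo() -> (Vec<u32>, u64) {
//!     let mut routes: Vec<u32> = Vec::new();
//!     let mut persisted = 0u64;
//!     flush_once(
//!         &[(7, "10"), (8, "30"), (9, "garbage")],
//!         |raw| raw.parse::<u32>().ok(),
//!         |batch| routes.extend(batch),
//!         |rev| persisted = rev,
//!     );
//!     (routes, persisted)
//! }
//!
//! fn main() {
//!     // Rev 9 had nothing to apply, yet the cursor still covers it.
//!     assert_eq!(demo(), (vec![10, 30], 9));
//! }
//! ```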
//!
//! ## Panics
//!
//! `apply` runs inline on the watch task. If it panics, the panic propagates out
//! of [`watch_applied`] and aborts the watch — that is the caller's contract,
//! the same as a panic in any other supplied closure.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::{oneshot, watch};
use tracing::warn;

use crate::artifact::ExportManifest;
use crate::kv::{KvError, KvReader, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::{SnapshotError, SnapshotStore};

/// A request, sent into a running [`watch_applied`] loop, to export the fold it
/// owns (see [`SnapshotStore::export_to`]).
///
/// `watch_applied` takes its snapshot store **by value**, so a consumer that
/// wants periodic artifacts of a live fold cannot call `export_to` itself. It
/// instead passes an `mpsc::Receiver<ExportRequest>` to [`watch_applied`] and
/// sends requests through the paired sender. The loop handles a request
/// between batch flushes — after flushing any pending batch — so the artifact's
/// embedded cursor is exactly the applied cursor at the moment of export.
///
/// The export result (or error) comes back on `reply`; an export failure is
/// reported there and the watch keeps running (the snapshot is a cache — a
/// failed artifact is the requester's problem, not the fold's).
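///
/// The flush-then-reply ordering can be sketched with the standard library
/// alone. This is a toy model, not this crate's API: `ToyExport`, `ToyLoop`,
/// and `exported_cursor` are hypothetical, `std::sync::mpsc` stands in for the
/// tokio `oneshot` reply channel, and the reply carries only a `u64` cursor.
///
/// ```rust
/// use std::sync::mpsc;
///
/// /// Stand-in for an export request: only the reply channel is modeled.
/// struct ToyExport {
///     reply: mpsc::Sender<u64>,
/// }
///
/// /// Toy loop state: the applied cursor plus the highest received revision
/// /// that has not been flushed yet.
/// struct ToyLoop {
///     applied: u64,
///     pending_high: Option<u64>,
/// }
///
/// impl ToyLoop {
///     /// Flush: the real loop runs `apply` here, before the cursor moves.
///     fn flush(&mut self) {
///         if let Some(rev) = self.pending_high.take() {
///             self.applied = rev;
///         }
///     }
///
///     /// Handle an export between flushes: flush first, then reply.
///     fn handle_export(&mut self, req: ToyExport) {
///         self.flush();
///         // A dropped receiver is ignored.
///         let _ = req.reply.send(self.applied);
///     }
/// }
///
/// /// Sends one export request into a loop that may hold a pending batch and
/// /// returns the cursor the artifact would embed.
/// fn exported_cursor(applied: u64, pending_high: Option<u64>) -> u64 {
///     let mut lp = ToyLoop { applied, pending_high };
///     let (tx, rx) = mpsc::channel();
///     lp.handle_export(ToyExport { reply: tx });
///     rx.recv().expect("reply was sent")
/// }
///
/// fn main() {
///     // A pending batch up to rev 8 is flushed before the export replies.
///     assert_eq!(exported_cursor(5, Some(8)), 8);
///     // With nothing pending, the export sees the current applied cursor.
///     assert_eq!(exported_cursor(5, None), 5);
/// }
/// ```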
pub struct ExportRequest {
    /// Where the artifact directory will be created. Must not exist (or be an
    /// empty directory); same filesystem as the fold for cheap hardlinks.
    pub dest_dir: PathBuf,
    /// Receives the sealed manifest on success. A dropped receiver is ignored.
    pub reply: oneshot::Sender<Result<ExportManifest, SnapshotError>>,
}

/// What to watch: every key, every key under a prefix, or the union of several
/// prefixes.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`,
/// `Prefixes` to `watch_prefixes` / `watch_prefixes_from` (one multi-filter
/// consumer for the whole union, never one consumer per prefix).
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
    /// Watch keys beginning with ANY of these prefixes, on a single consumer.
    Prefixes(Vec<String>),
}

impl WatchScope {
    /// The scope as a list of key prefixes (`All` = the empty prefix), for
    /// callers that enumerate scope contents (live listings, fold ranges).
    fn prefixes(&self) -> Vec<String> {
        match self {
            WatchScope::All => vec![String::new()],
            WatchScope::Prefix(p) => vec![p.clone()],
            WatchScope::Prefixes(ps) => ps.clone(),
        }
    }
}

/// Internal: a cursor-expired resync handoff from the watch task to the main
/// loop. Carries the bucket's live key listing for the watch scope; the main
/// loop diffs it against the fold and applies synthetic deletes for keys that
/// vanished during the gap, then acks so the watch task can start the fallback
/// watch — the ack ordering guarantees every synthetic delete is applied before
/// the first re-list put arrives.
struct ResyncRequest {
    live_keys: Vec<String>,
    ack: oneshot::Sender<()>,
}

/// Internal: what the watch task needs to initiate a resync — the reader that
/// lists live keys and the channel into the main loop that owns the fold.
type ResyncHandle = (Arc<dyn KvReader>, mpsc::Sender<ResyncRequest>);

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` updates have accumulated.
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update); `max` caps memory and latency when updates
/// arrive faster than the window.
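///
/// The either-bound rule, written as a plain predicate. This is a sketch for
/// illustration: `Policy` and `should_flush` are hypothetical, and the real
/// loop reacts to a timer firing rather than polling elapsed time.
///
/// ```rust
/// use std::time::Duration;
///
/// /// Hypothetical stand-in mirroring the two fields of the batching policy.
/// struct Policy {
///     window: Duration,
///     max: usize,
/// }
///
/// /// True when a batch that has been open for `open_for` and holds `len`
/// /// updates should be flushed: either bound alone is enough.
/// fn should_flush(p: &Policy, open_for: Duration, len: usize) -> bool {
///     open_for >= p.window || len >= p.max
/// }
///
/// fn main() {
///     let p = Policy { window: Duration::from_millis(10), max: 100 };
///     // Window elapsed with a single update buffered.
///     assert!(should_flush(&p, Duration::from_millis(10), 1));
///     // Count bound hit well inside the window.
///     assert!(should_flush(&p, Duration::from_millis(1), 100));
///     // Neither bound hit: the batch stays open.
///     assert!(!should_flush(&p, Duration::from_millis(1), 99));
/// }
/// ```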
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of parsed updates in a batch (or raw updates, when a
    /// `store` is present) before forcing a flush.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / folds the batch into `store` / calls `on_applied`.
/// Returns the final applied cursor when the watch ends (shutdown signalled, or
/// the underlying stream closed).
///
/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
/// its own impl) — or `None` to run without persistence. On each flush, *after*
/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
/// persisted cursor is always the post-apply cursor and never names a revision
/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
/// crash leaves the store consistent and resume re-folds only the tail.
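///
/// The "move the store onto a blocking task, then take it back" shape can be
/// sketched with the standard library. This is an analogy, not this crate's
/// code: `std::thread::spawn` stands in for `tokio::task::spawn_blocking`, and
/// `Store` and `fold_offloaded` are hypothetical.
///
/// ```rust
/// use std::thread;
///
/// /// Hypothetical store that records only the last folded cursor.
/// struct Store {
///     cursor: u64,
/// }
///
/// impl Store {
///     /// Folds `batch` at `cursor`; rejects an empty batch to model a
///     /// recoverable store error.
///     fn apply(&mut self, batch: &[u64], cursor: u64) -> Result<(), String> {
///         if batch.is_empty() {
///             return Err("empty batch".into());
///         }
///         self.cursor = cursor;
///         Ok(())
///     }
/// }
///
/// /// Moves the store onto a worker thread, runs the fold, and hands the store
/// /// back alongside the fold result. `None` means the worker panicked and the
/// /// store is gone, which the caller should treat as fatal.
/// fn fold_offloaded(
///     store: Store,
///     batch: Vec<u64>,
///     cursor: u64,
/// ) -> Option<(Store, Result<(), String>)> {
///     thread::spawn(move || {
///         let mut st = store;
///         let res = st.apply(&batch, cursor);
///         (st, res)
///     })
///     .join()
///     .ok()
/// }
///
/// fn main() {
///     let store = Store { cursor: 0 };
///     let (store, res) = fold_offloaded(store, vec![1, 2, 3], 3).expect("no panic");
///     assert!(res.is_ok());
///     assert_eq!(store.cursor, 3);
///
///     // A failed fold still returns the store, so the caller can keep going.
///     let (store, res) = fold_offloaded(store, vec![], 9).expect("no panic");
///     assert!(res.is_err());
///     assert_eq!(store.cursor, 3);
/// }
/// ```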
///
/// # Cursor expiry and stale-key resync
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix` /
/// `watch_prefixes`), whose state-sync re-list re-delivers the current value of
/// every in-scope key as puts. The re-list alone cannot cover keys that were
/// **deleted during the gap** and whose delete markers the backend has since
/// evicted — they simply don't appear, leaving the fold (and the caller's
/// domain state) holding them forever.
///
/// When both `reader` and `store` are provided, the expiry path closes that
/// hole: before the fallback watch starts, the bucket's live keys are listed
/// via `reader`, diffed against the fold's in-scope keys, and a synthetic
/// [`KvUpdate::Delete`] (with an unknown [`VersionToken`](crate::VersionToken)) is run through
/// `parse`/`apply`/`store` for each key that vanished. The synthetic deletes
/// are strictly ordered before the first re-list put, so a key deleted and
/// re-created during the gap converges correctly. Without a `reader` (or
/// without a `store` to diff against) the fallback is re-list-only and a
/// warning marks the possible stale keys.
///
/// A resync that was armed but FAILS (live-key listing or fold diff error) is
/// fatal to the watch — degrading to re-list-only would silently leave
/// deleted keys in the fold (`tests/model.rs` proves that divergence
/// reachable), so the error surfaces and the caller's restart retries the
/// resume → expiry → resync path from scratch.
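///
/// The diff at the heart of the resync is a set difference. A minimal sketch,
/// assuming keys are plain strings and the fold listing is already in hand;
/// `stale_keys` is a hypothetical helper, not a function of this crate:
///
/// ```rust
/// use std::collections::HashSet;
///
/// /// Keys the fold still holds that the bucket no longer lists. `fold_keys`
/// /// may contain duplicates when the scope's prefixes overlap, so the result
/// /// is sorted and deduplicated.
/// fn stale_keys(fold_keys: &[&str], live_keys: &[&str]) -> Vec<String> {
///     let live: HashSet<&str> = live_keys.iter().copied().collect();
///     let mut stale: Vec<String> = fold_keys
///         .iter()
///         .filter(|k| !live.contains(**k))
///         .map(|k| (*k).to_string())
///         .collect();
///     stale.sort_unstable();
///     stale.dedup();
///     stale
/// }
///
/// fn main() {
///     // "a/2" was deleted during the gap; it shows up twice in the fold
///     // listing because two overlapping prefixes both matched it.
///     let fold = ["a/1", "a/2", "a/2", "b/1"];
///     let live = ["a/1", "b/1", "c/9"];
///     assert_eq!(stale_keys(&fold, &live), vec!["a/2".to_string()]);
/// }
/// ```
///
/// Each stale key then becomes one synthetic delete; keys present only in the
/// live listing (`c/9` here) need no special handling because the re-list
/// delivers them as puts.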
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline` for the next loop
// iteration. At the two flush sites that return immediately afterward (shutdown,
// channel-close) those resets are dead stores — correct, but flagged. The allow
// must sit on the function: a statement-scoped `#[allow]` inside the macro body
// trips the experimental attributes-on-expressions gate (E0658) on stable.
#[allow(unused_assignments)]
pub async fn watch_applied<U, S, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    // `Some(reader)` arms the cursor-expired stale-key resync (see the function
    // docs); `None` keeps the re-list-only fallback. Only consulted on expiry —
    // the hot path never touches it.
    reader: Option<Arc<dyn KvReader>>,
    mut store: Option<S>,
    // `Some(rx)` arms an export-request arm in the select loop: each
    // [`ExportRequest`] is handled between flushes (pending batch flushed
    // first), so the exported artifact's cursor is exactly the applied cursor.
    // `None` (or dropping the paired sender) leaves the loop's behavior
    // unchanged.
    mut exports: Option<mpsc::Receiver<ExportRequest>>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    // `Send + 'static`: each flush moves `store` onto a blocking task to run its
    // (potentially blocking) `apply`, then takes it back — the same offload the
    // append log's compaction always used.
    S: SnapshotStore + Send + 'static,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is — everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // The scope's prefixes, for the resync diff against the fold. Cloned out
    // before `scope` moves into the watch task.
    let scope_prefixes = scope.prefixes();

    // Cursor-expired resync channel, armed only when there is a reader to list
    // live keys AND a store to diff them against. The watch task sends the live
    // listing here and waits for the ack before starting the fallback watch, so
    // synthetic deletes always precede the re-list.
    let (resync_pair, mut resyncs): (Option<ResyncHandle>, Option<mpsc::Receiver<ResyncRequest>>) =
        match reader {
            Some(reader) if store.is_some() => {
                let (rs_tx, rs_rx) = mpsc::channel(1);
                (Some((reader, rs_tx)), Some(rs_rx))
            }
            _ => (None, None),
        };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(
            async move { run_watch(watcher.as_ref(), &scope, resume, resync_pair, tx).await },
        )
    };

    // Batch state.
    //
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush — including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    //
    // Pre-size to the flush bound so no batch ever re-climbs the reallocation
    // ladder; `max(1)` only guards a nonsensical `max = 0` config.
    let batch_cap = config.max.max(1);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    // Raw received updates for the durable `store`, in revision order. Only
    // populated when a `store` is present; the store folds the *raw* updates
    // (including ones `parse` rejected — they are still part of the bucket's
    // state), whereas the parsed `batch` above is the consumer's domain view.
    let mut raw_batch: Vec<KvUpdate> = Vec::new();
    let mut batch_high = WatchCursor::none();
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop —
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch, in order: run the domain `apply` (if non-empty) to
    // completion, advance the cursor, fold the raw batch + cursor durably into
    // `store`, then fire `on_applied`. The store fold runs on a blocking task
    // (its `apply` may block on I/O), moving the store in and taking it back — the
    // same offload the append log's compaction always used. A store error is
    // logged and the watch continues (the snapshot is a cache); a panicked
    // blocking task drops the store irrecoverably, which breaks the
    // resume-after-restart guarantee, so it is surfaced as fatal.
    macro_rules! flush {
        () => {{
            // Nothing received since the last flush → nothing to do at all.
            // (`raw_batch` can be non-empty with no cursor advance only via the
            // resync path's synthetic deletes, which carry no revision.)
            if !batch.is_empty() || !raw_batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind so each
                    // batch after the first doesn't re-climb the reallocation
                    // ladder (4→8→…→cap).
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                let advanced = !batch_high.is_none();
                if advanced {
                    applied = batch_high.clone();
                }
                if !raw_batch.is_empty()
                    && let Some(mut st) = store.take()
                {
                    let raw = std::mem::take(&mut raw_batch);
                    // Fold at the post-advance cursor. A synthetic-deletes-only
                    // batch leaves the cursor where it was (the deletes are a
                    // state correction, not log entries), which is safe: an
                    // unchanged — possibly expired — cursor only ever re-runs
                    // this same resync on the next restart.
                    let cur = applied.clone();
                    // Hand the store back unconditionally on a clean return so
                    // a *failed* apply (Ok(Err)) keeps the watch running; only
                    // a *panicked* task (Err) loses the store and is fatal.
                    match tokio::task::spawn_blocking(move || {
                        let res = st.apply(&raw, &cur);
                        (st, res)
                    })
                    .await
                    {
                        Ok((st, Ok(()))) => store = Some(st),
                        Ok((st, Err(e))) => {
                            warn!(error = %e, "snapshot store apply failed; continuing");
                            store = Some(st);
                        }
                        Err(e) => {
                            warn!(error = %e, "snapshot store task panicked; aborting watch");
                            handle.abort();
                            return Err(KvError::WatchError(format!(
                                "snapshot store task panicked: {e}"
                            )));
                        }
                    }
                }
                if advanced {
                    on_applied(applied.clone());
                    batch_high = WatchCursor::none();
                }
            }
            batch_deadline = None;
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration — one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel — they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume — and return the applied cursor.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush!();
            }

            // Cursor-expired resync. Placed before `rx.recv()` (biased) so the
            // synthetic deletes are folded before any update the fallback watch
            // delivers — though the ack protocol already guarantees the fallback
            // hasn't started while this arm runs. Diff the fold's in-scope keys
            // against the bucket's live listing; anything the fold holds that
            // the bucket no longer does vanished during the gap (its delete
            // marker evicted with the cursor), so synthesize the delete the
            // re-list can't deliver.
            req = async { resyncs.as_mut().expect("arm guarded by is_some").recv().await },
                if resyncs.is_some() => {
                match req {
                    Some(ResyncRequest { live_keys, ack }) => {
                        // Flush first so the diff runs against a fold that
                        // reflects everything received so far.
                        flush!();
                        let live: std::collections::HashSet<&str> =
                            live_keys.iter().map(String::as_str).collect();
                        let mut stale: Vec<String> = Vec::new();
                        if let Some(st) = &store {
                            for prefix in &scope_prefixes {
                                match st.range(prefix) {
                                    Ok(entries) => stale.extend(
                                        entries
                                            .into_iter()
                                            .filter(|e| !live.contains(e.key.as_str()))
                                            .map(|e| e.key),
                                    ),
                                    Err(e) => {
                                        // FATAL, not a degrade: an incomplete
                                        // diff silently leaves deleted keys in
                                        // the fold forever (tests/model.rs
                                        // proves the divergence reachable
                                        // under degrade semantics). Fail the
                                        // watch; the restart re-runs the
                                        // resume → expiry → resync from
                                        // scratch.
                                        warn!(error = %e, prefix = %prefix,
                                            "resync fold range failed; aborting watch rather than diverging");
                                        handle.abort();
                                        return Err(KvError::WatchError(format!(
                                            "cursor-expired resync failed listing fold prefix {prefix:?}: {e}"
                                        )));
                                    }
                                }
                            }
                        }
                        // Overlapping prefixes can list a key twice.
                        stale.sort_unstable();
                        stale.dedup();
                        if !stale.is_empty() {
                            warn!(stale = stale.len(), "cursor-expired resync: deleting keys that vanished during the gap");
                        }
                        for key in stale {
                            // Synthetic: carries no revision (unknown version)
                            // and so never advances the cursor.
                            let u = KvUpdate::Delete {
                                key,
                                version: crate::kv::VersionToken::unknown(),
                            };
                            if store.is_some() {
                                raw_batch.push(u.clone());
                            }
                            if let Some(parsed) = parse(&u) {
                                batch.push(parsed);
                            }
                        }
                        flush!();
                        // Ack AFTER the deletes are applied: the watch task is
                        // holding the fallback watch until it hears back, which
                        // is what orders deletes before the re-list.
                        let _ = ack.send(());
                    }
                    None => resyncs = None,
                }
            }
490
491            // Export request. Placed after shutdown/window (they stay prompt)
492            // and before `rx.recv()` so a firehose of updates cannot starve an
493            // export indefinitely. The pending batch is flushed first, so the
494            // exported cursor is exactly the applied cursor; the export itself
495            // runs on a blocking task with the store moved in and taken back —
496            // the same offload the flush path uses.
497            req = async { exports.as_mut().expect("arm guarded by is_some").recv().await },
498                if exports.is_some() => {
499                match req {
500                    Some(ExportRequest { dest_dir, reply }) => {
501                        flush!();
502                        match store.take() {
503                            Some(mut st) => {
504                                match tokio::task::spawn_blocking(move || {
505                                    let res = st.export_to(&dest_dir);
506                                    (st, res)
507                                })
508                                .await
509                                {
510                                    // Hand the store back on any clean return; an
511                                    // export failure goes to the requester only —
512                                    // the watch keeps running (the snapshot is a
513                                    // cache). A panicked task lost the store,
514                                    // which breaks the resume guarantee: fatal,
515                                    // same as the flush path's apply panic.
516                                    Ok((st, res)) => {
517                                        store = Some(st);
518                                        let _ = reply.send(res);
519                                    }
520                                    Err(e) => {
521                                        warn!(error = %e, "snapshot export task panicked; aborting watch");
522                                        handle.abort();
523                                        return Err(KvError::WatchError(format!(
524                                            "snapshot export task panicked: {e}"
525                                        )));
526                                    }
527                                }
528                            }
529                            None => {
530                                let _ = reply.send(Err(SnapshotError::Backend(
531                                    "watch_applied runs without a snapshot store; nothing to export"
532                                        .into(),
533                                )));
534                            }
535                        }
536                    }
537                    // Sender dropped: disarm the arm for the rest of the run.
538                    None => exports = None,
539                }
540            }
541
542            update = rx.recv() => {
543                match update {
544                    Some(u) => {
545                        // Cursor authority: every received update bumps the
546                        // pending high-water, regardless of whether `parse`
547                        // keeps it.
548                        batch_high = WatchCursor::from_version(u.version().clone());
549
550                        // Buffer the raw update for the durable store fold (which
551                        // commits the whole batch + cursor atomically on flush).
552                        // Done before `parse` consumes `u` by reference, and only
                        // when a store is present so the no-persistence path keeps
                        // its zero-copy cost.
                        if store.is_some() {
                            raw_batch.push(u.clone());
                        }

                        if let Some(parsed) = parse(&u) {
                            batch.push(parsed);
                        }

                        // Arm the window on the first received update of a batch,
                        // even a parse-rejected one, so the cursor advances
                        // within `window` even through a run of irrelevant keys.
                        // Reset the pinned timer to the new deadline rather than
                        // allocating a fresh `Sleep`.
                        if batch_deadline.is_none() {
                            let deadline = tokio::time::Instant::now() + config.window;
                            sleep.as_mut().reset(deadline);
                            batch_deadline = Some(deadline);
                        }

                        // Flush on a full parsed batch or, when persisting, a
                        // full raw batch, so a window packed with parse-rejected
                        // updates can't grow `raw_batch` without bound before the
                        // window elapses.
                        if batch.len() >= config.max || raw_batch.len() >= config.max {
                            flush!();
                        }
                    }
                    None => {
                        // Stream closed. Flush the remainder, then surface the
                        // watch task's terminal result: a clean end returns the
                        // applied cursor, an error propagates.
                        flush!();
                        return match handle.await {
                            Ok(Ok(())) => Ok(applied),
                            Ok(Err(e)) => Err(e),
                            Err(join) => Err(KvError::WatchError(format!(
                                "watch task panicked: {join}"
                            ))),
                        };
                    }
                }
            }
        }
    }
}
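/*
Illustrative sketch only, not part of this crate: the flush discipline above can
be modelled synchronously. `Update`, `Batcher`, and its fields are hypothetical
names. The real loop is async and also flushes on a window timer; as far as this
file shows, it counts raw updates toward `max` only when a store is wired. This
sketch counts every received update and flushes on `max` or on an explicit
`flush()`. What it tries to show is the ordering: the cursor moves only after
`apply` has returned, and it still moves over parse-rejected updates.

```rust
use std::mem;

/// Hypothetical stand-in for a watch update.
#[derive(Debug, Clone, PartialEq)]
pub struct Update {
    pub rev: u64,
    /// `None` models an update that `parse` rejected.
    pub payload: Option<String>,
}

/// Buffers updates; `applied` advances only after `apply` has returned.
pub struct Batcher<F: FnMut(Vec<String>)> {
    max: usize,
    received: usize,
    batch: Vec<String>,
    pending_high: Option<u64>,
    applied: Option<u64>,
    apply: F,
}

impl<F: FnMut(Vec<String>)> Batcher<F> {
    pub fn new(max: usize, apply: F) -> Self {
        Self {
            max,
            received: 0,
            batch: Vec::new(),
            pending_high: None,
            applied: None,
            apply,
        }
    }

    /// Record receipt. This never touches `applied` directly.
    pub fn push(&mut self, u: Update) {
        self.received += 1;
        self.pending_high = Some(self.pending_high.map_or(u.rev, |h| h.max(u.rev)));
        if let Some(p) = u.payload {
            self.batch.push(p);
        }
        if self.received >= self.max {
            self.flush();
        }
    }

    /// Apply the pending batch, then (and only then) advance the cursor.
    pub fn flush(&mut self) {
        let Some(high) = self.pending_high.take() else {
            return; // nothing received since the last flush
        };
        self.received = 0;
        let batch = mem::take(&mut self.batch);
        if !batch.is_empty() {
            (self.apply)(batch);
        }
        // `apply` has returned (or there was nothing to apply): safe to advance.
        self.applied = Some(high);
    }

    pub fn applied(&self) -> Option<u64> {
        self.applied
    }
}
```

With `max = 2`, pushing revisions 1 and 2 leaves `applied()` at `None` until the
second push triggers the flush, after which it reads `Some(2)`.
*/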

/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position, with the [`KvError::CursorExpired`] → resync + full-watch
/// fallback.
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    resync: Option<ResyncHandle>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Binding `resume_cursor` here makes
    // "we have a resume position" structural: there is no separate bool whose
    // truth a later edit could let drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_all"
                        );
                        resync_stale_keys(scope, &resync).await?;
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_prefix"
                        );
                        resync_stale_keys(scope, &resync).await?;
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
        WatchScope::Prefixes(prefixes) => {
            let refs: Vec<&str> = prefixes.iter().map(String::as_str).collect();
            if let Some(cursor) = resume_cursor {
                match watcher
                    .watch_prefixes_from(&refs, &cursor, tx.clone())
                    .await
                {
                    Err(KvError::CursorExpired) => {
                        warn!(
                            "watch cursor expired; resyncing, then falling back to full watch_prefixes"
                        );
                        resync_stale_keys(scope, &resync).await?;
                        watcher.watch_prefixes(&refs, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefixes(&refs, tx).await
            }
        }
    }
}
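/*
Illustrative sketch only, not part of this crate: each scope arm above has the
same shape, which can be reduced to three callbacks. `WatchErr` and
`watch_with_fallback` are hypothetical names, and the sketch is synchronous
where the real code is async. It is meant to show the control flow: resume if a
cursor exists, and on `CursorExpired` run the resync first, then the full watch,
with a failed resync short-circuiting before the full watch is armed.

```rust
/// Hypothetical error type standing in for the crate's watch errors.
#[derive(Debug, PartialEq)]
pub enum WatchErr {
    CursorExpired,
    Other(String),
}

/// Resume from `resume` when present; on expiry, resync and then fall back to
/// the full watch. Any other result of the resumed watch is passed through.
pub fn watch_with_fallback<R, S, F>(
    resume: Option<u64>,
    watch_from: R,
    resync: S,
    watch_full: F,
) -> Result<(), WatchErr>
where
    R: FnOnce(u64) -> Result<(), WatchErr>,
    S: FnOnce() -> Result<(), WatchErr>,
    F: FnOnce() -> Result<(), WatchErr>,
{
    match resume {
        Some(cursor) => match watch_from(cursor) {
            Err(WatchErr::CursorExpired) => {
                // A failed resync is fatal: `?` returns before the fallback runs.
                resync()?;
                watch_full()
            }
            other => other,
        },
        None => watch_full(),
    }
}
```

Passing closures that record their invocation makes the ordering observable: an
expired resume calls `watch_from`, then `resync`, then `watch_full`.
*/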

/// Cursor-expired stale-key resync, run BEFORE the fallback watch is
/// established: list the scope's live keys, hand them to the main loop (which
/// diffs them against the fold and applies synthetic deletes), and wait for the
/// ack. That ordering (deletes applied, then fallback watch armed) is what
/// makes a delete-then-recreate during the gap converge: the synthetic delete
/// always lands before the re-list put.
///
/// With no reader/store wired (`resync` is `None`) the caller explicitly opted
/// out: warn and fall back re-list-only (keys deleted during the gap stay in
/// the fold; `tests/model.rs` pins this divergence as reachable).
///
/// A FAILED listing, by contrast, is **fatal**: it fails the watch rather
/// than degrading. The resync is load-bearing for the "stale, never corrupt"
/// convergence guarantee: a silently degraded resync leaves the fold holding
/// keys the bucket deleted, with one warn line as the only witness
/// (`tests/model.rs` proves this divergence reachable under degrade
/// semantics). Failing the watch turns the violated guarantee into a visible
/// error; the caller's restart re-resumes, hits `CursorExpired` again, and
/// retries the resync from scratch.
async fn resync_stale_keys(
    scope: &WatchScope,
    resync: &Option<ResyncHandle>,
) -> Result<(), KvError> {
    let Some((reader, resync_tx)) = resync else {
        warn!(
            "no reader wired for cursor-expired resync; keys deleted during the gap may persist in the fold"
        );
        return Ok(());
    };
    let mut live_keys = Vec::new();
    for prefix in scope.prefixes() {
        match reader.keys(&prefix).await {
            Ok(keys) => live_keys.extend(keys),
            Err(e) => {
                return Err(KvError::WatchError(format!(
                    "cursor-expired resync failed listing live keys under {prefix:?}: {e}; \
                     failing the watch rather than silently keeping stale keys"
                )));
            }
        }
    }
    let (ack_tx, ack_rx) = oneshot::channel();
    if resync_tx
        .send(ResyncRequest {
            live_keys,
            ack: ack_tx,
        })
        .await
        .is_ok()
    {
        // A dropped ack (main loop shutting down) just means the fallback watch
        // is about to die with it; nothing to recover.
        let _ = ack_rx.await;
    }
    Ok(())
}
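/*
Illustrative sketch only, not part of this crate: the doc comment above says the
main loop diffs the live listing against the fold and applies synthetic deletes.
That diff is not shown in this part of the file, so the function below is an
assumption about its shape, not the crate's implementation. `stale_keys` is a
hypothetical name, the fold is modelled as a plain `BTreeMap`, and the whole
bucket is modelled by the empty prefix `""`.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Keys the fold holds, inside one of `prefixes`, that the live listing lacks.
/// Each returned key would receive a synthetic delete before the fallback
/// watch is armed. Keys outside every prefix are never reported.
pub fn stale_keys(
    fold: &BTreeMap<String, Vec<u8>>,
    live_keys: &[String],
    prefixes: &[&str],
) -> Vec<String> {
    let live: BTreeSet<&str> = live_keys.iter().map(String::as_str).collect();
    fold.keys()
        // Only in-scope keys are candidates; `""` matches every key.
        .filter(|k| prefixes.iter().any(|p| k.starts_with(*p)))
        // Stale means: still in the fold, absent from the live listing.
        .filter(|k| !live.contains(k.as_str()))
        .cloned()
        .collect()
}
```

For a fold holding `node.b` and `other.z`, an empty live listing, and the scope
`["node."]`, only `node.b` is reported; `other.z` is out of scope and survives.
*/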

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::{KvEntry, VersionToken};
    use crate::snapshot::AppendLogSnapshot;
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio::sync::mpsc::Sender;

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        })
    }

    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        full: Mutex<Option<Vec<KvUpdate>>>,
        from: Mutex<Option<Vec<KvUpdate>>>,
        from_expires: bool,
        hold: bool,
    }

    impl MockWatcher {
        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
            Self {
                full: Mutex::new(Some(updates)),
                from: Mutex::new(None),
                from_expires: false,
                hold,
            }
        }

        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
            let updates = which.lock().unwrap().take().unwrap_or_default();
            for u in updates {
                if tx.send(u).await.is_err() {
                    return;
                }
            }
            if self.hold {
                // Keep `tx` alive (channel open) until this task is aborted.
                std::future::pending::<()>().await;
            }
        }
    }

    #[async_trait]
    impl KvWatcher for MockWatcher {
        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefixes(
            &self,
            _prefixes: &[&str],
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            // This mock scripts the applied-watch resumption tests, not prefix
            // filtering; it delivers the same `full` script as `watch_prefix`.
            // The real multi-filter scoping is proved in the NATS integration test.
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_all_from(
            &self,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
        // are exercised against the same `from` script. Without this the trait's
        // default impl would delegate to watch_prefix and silently deliver the
        // full set instead of the delta.
        async fn watch_prefix_from(
            &self,
            _prefix: &str,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Same mirroring for the multi-prefix resume arm.
        async fn watch_prefixes_from(
            &self,
            _prefixes: &[&str],
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }
    }

    /// A reader whose `keys()` serves a scripted live listing, the only call
    /// the cursor-expired resync makes. Filters by prefix like a real backend
    /// so prefix-scoped resyncs are exercised faithfully.
    struct MockReader {
        live: Vec<String>,
    }

    #[async_trait]
    impl KvReader for MockReader {
        async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
            unreachable!("resync only lists keys")
        }

        async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
            Ok(self
                .live
                .iter()
                .filter(|k| k.starts_with(prefix))
                .cloned()
                .collect())
        }

        async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
            unreachable!("resync only lists keys")
        }
    }

    /// A watcher whose entry points all fail. Used to prove the watch task's
    /// terminal error is surfaced out of `watch_applied` rather than swallowed
    /// as a clean `Ok(applied)` when the channel closes.
    struct ErrorWatcher;

    #[async_trait]
    impl KvWatcher for ErrorWatcher {
        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefixes(
            &self,
            _prefixes: &[&str],
            _tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }
    }

    // A minimal parse: maps every Put to its value bytes and drops deletes.
    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
        match u {
            KvUpdate::Put(e) => Some(e.value.clone()),
            _ => None,
        }
    }

    /// The stream closes (hold = false) with a pending batch; the remainder is
    /// flushed before returning, the returned cursor is the last revision, and
    /// the final cursor `on_applied` published matches it.
    #[tokio::test]
    async fn flush_on_channel_close() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
        let watcher = Arc::new(MockWatcher::new(updates, false));

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));

        let ab = Arc::clone(&applied_batches);
        let oc = Arc::clone(&on_applied_cursors);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch| ab.lock().unwrap().push(batch),
            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(3));
        let batches = applied_batches.lock().unwrap();
        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
    }

    /// Fewer than `max` updates, then the channel idles: the window timer must
    /// flush them and advance the cursor.
    #[tokio::test(start_paused = true)]
    async fn flush_on_window() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let count = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let c = Arc::clone(&count);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| {
                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
            },
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Let the window (10ms) elapse under virtual time.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            count.load(Ordering::SeqCst),
            2,
            "window should have flushed"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);

        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
    }

    /// Exactly `max` updates fill a batch and flush immediately, before the
    /// window would have elapsed.
    #[tokio::test(start_paused = true)]
    async fn flush_on_max() {
        let max = 4;
        let updates: Vec<_> = (1..=max as u64)
            .map(|i| put(&format!("k{i}"), b"v", i))
            .collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
        let f = Arc::clone(&flushes);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig {
                window: Duration::from_secs(3600), // effectively never
                max,
            },
            parse_put,
            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
            move |_| {},
            sd_rx,
        ));

        // Yield enough for the mock to push all `max` updates; the window is an
        // hour, so any flush is purely the max trigger.
        tokio::time::sleep(Duration::from_millis(1)).await;
        assert_eq!(
            *flushes.lock().unwrap(),
            vec![max],
            "a full batch should flush on max, not wait for the window"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// A pending batch plus a shutdown signal: the batch is flushed and the
    /// applied cursor returned.
    #[tokio::test(start_paused = true)]
    async fn flush_on_shutdown() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig {
                window: Duration::from_secs(3600), // window won't fire
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Give the mock time to deliver both updates into the pending batch.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            Some(2),
            "shutdown flushes the pending batch"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);
    }

    /// The cursor must not advance until `apply` has returned. We prove it by
    /// having `apply` read the cursor that `on_applied` last published: when the
    /// second batch is applied, the visible cursor must still be the *first*
    /// batch's, never the second's, which only becomes visible after this
    /// `apply` returns.
    #[tokio::test(start_paused = true)]
    async fn cursor_advances_only_after_apply() {
        // Two batches of `max` updates each.
        let max = 2usize;
        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        // Cursor as last published by on_applied; starts at 0 (nothing applied).
        let published = Arc::new(AtomicU64::new(0));
        // What `apply` observed as the published cursor at the moment it ran.
        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));

        let pub_for_apply = Arc::clone(&published);
        let seen = Arc::clone(&seen_at_apply);
        let pub_for_on = Arc::clone(&published);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig {
                window: Duration::from_secs(3600),
                max,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                // The cursor visible here is whatever the PREVIOUS flush
                // published, never this batch's, because we haven't returned.
                seen.lock()
                    .unwrap()
                    .push(pub_for_apply.load(Ordering::SeqCst));
            },
            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();

        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
        // batch's cursor), NOT 4. The cursor only reached 4 after the second
        // apply returned.
        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
        assert_eq!(published.load(Ordering::SeqCst), 4);
    }

    /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
    /// domain work, but they were still received, so the cursor must advance
    /// over them.
    #[tokio::test]
    async fn corrupt_parse_entries_advance_cursor() {
        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            // Reject everything: simulates corrupt/irrelevant entries.
            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
            move |batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
                assert!(batch.is_empty());
            },
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
        assert_eq!(
            apply_calls.load(Ordering::SeqCst),
            0,
            "an all-rejected batch applies nothing"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// A resume whose cursor has expired falls back to the full watch and still
    /// applies the delivered updates.
    #[tokio::test]
    async fn cursor_expired_falls_back_to_full_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None,                           // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "fallback full watch's updates were applied"
        );
    }

    /// Cursor-expired resync: with a reader + store wired, a key the fold holds
    /// that the live listing no longer does gets a synthetic delete, applied
    /// strictly BEFORE the fallback re-list, and the persisted fold converges
    /// to the live state. The synthetic delete (unknown version) must not move
    /// the cursor; the re-list put must.
    #[tokio::test]
    async fn cursor_expired_resync_deletes_stale_keys() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("resync.snap");
        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
        // The fold from the previous run: node.a and node.b at cursor 2.
        store
            .apply(
                &[put("node.a", b"1", 1), put("node.b", b"2", 2)],
                &WatchCursor::from_u64(2),
            )
            .unwrap();

        // During the gap node.b was deleted (marker since evicted) and node.a
        // updated; the resume cursor (2) has expired. The fallback re-list
        // therefore carries only the surviving key.
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1b", 10)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let reader = MockReader {
            live: vec!["node.a".to_string()],
        };

        // Record everything `parse` sees, in order, deletes included.
        let seen = Arc::new(Mutex::new(Vec::<(String, bool)>::new()));
        let s = Arc::clone(&seen);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            Arc::new(mock),
            WatchScope::All,
            Some(WatchCursor::from_u64(2)),
            Some(Arc::new(reader) as Arc<dyn KvReader>),
            Some(store),
            None,
            BatchConfig::default(),
            move |u: &KvUpdate| {
                s.lock()
                    .unwrap()
                    .push((u.key().to_string(), matches!(u, KvUpdate::Delete { .. })));
                Some(())
            },
            |_batch: Vec<()>| {},
            |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        // The re-list put advanced the cursor; the synthetic delete did not.
        assert_eq!(cursor.as_u64(), Some(10));
        // The synthetic delete strictly precedes the re-list put.
        assert_eq!(
            *seen.lock().unwrap(),
            vec![("node.b".to_string(), true), ("node.a".to_string(), false)],
            "synthetic delete must be applied before the fallback re-list"
        );

        // The persisted fold converged: stale key gone, live key updated.
        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(snap.cursor.as_u64(), Some(10));
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["node.a"].value, b"1b");
    }

    /// A prefix-scoped resync diffs only in-scope keys: an out-of-scope key the
    /// fold holds survives, the in-scope stale key is deleted, and a flush
    /// containing only synthetic deletes leaves the cursor untouched.
    #[tokio::test]
    async fn cursor_expired_resync_respects_scope() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("resync-scope.snap");
        let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
        store
            .apply(
                &[put("node.b", b"2", 1), put("other.z", b"9", 2)],
                &WatchCursor::from_u64(2),
            )
            .unwrap();

        // Expired resume; the bucket no longer has ANY node.* keys; the
        // fallback re-list is empty.
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let reader = MockReader { live: vec![] };
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            Arc::new(mock),
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(2)),
            Some(Arc::new(reader) as Arc<dyn KvReader>),
            Some(store),
            None,
            BatchConfig::default(),
            |_u: &KvUpdate| Some(()),
            |_batch: Vec<()>| {},
            |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        // Deletes-only flush: cursor stays at the resume position.
        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(snap.cursor.as_u64(), Some(2));
        assert!(
            !snap.entries.contains_key("node.b"),
            "in-scope stale key must be resync-deleted"
        );
        assert_eq!(
            snap.entries["other.z"].value, b"9",
            "out-of-scope key must survive a prefix-scoped resync"
        );
    }

    /// `WatchScope::Prefixes` with no resume cursor dispatches to
    /// `watch_prefixes`. The `watch_prefixes_from` resume path and its expiry
    /// fallback are covered by `prefixes_scope_expired_resume_falls_back`.
    #[tokio::test]
    async fn prefixes_scope_dispatches_full_watch() {
        let updates = vec![put("a.x", b"1", 1), put("b.y", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false));
        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefixes(vec!["a.".to_string(), "b.".to_string()]),
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }
1377
1378    /// `WatchScope::Prefixes` resume whose cursor has expired falls back to the
1379    /// full multi-prefix watch and applies its updates.
    #[tokio::test]
    async fn prefixes_scope_expired_resume_falls_back() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a.x", b"1", 7)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            Arc::new(mock),
            WatchScope::Prefixes(vec!["a.".to_string()]),
            Some(WatchCursor::from_u64(3)),
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7));
        assert_eq!(*applied_batches.lock().unwrap(), vec![b"1".to_vec()]);
    }

    /// End-to-end with a real snapshot file: after the run, the persisted
    /// snapshot's cursor equals the applied cursor and its entries match the
    /// applied state — proving the checkpoint is written at the post-apply
    /// cursor, never ahead of it.
    #[tokio::test]
    async fn snapshot_checkpoint_matches_applied_cursor() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();

        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "snapshot checkpoint cursor must equal the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.entries["node.a"].value, b"1");
        assert_eq!(snap.entries["node.b"].value, b"2");
    }

    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
    /// delta (the `from` script, NOT the full set) is applied. Proves the
    /// resume branch delivers only post-cursor updates and advances to their
    /// max revision.
    #[tokio::test]
    async fn resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a non-empty distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
            None,                           // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied, never the full set"
        );
    }

    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
    /// applies the delivered updates. Most other tests use `WatchScope::All`;
    /// this covers the single-prefix dispatch arm without a resume cursor.
    #[tokio::test]
    async fn prefix_scope_applies_delivered_updates() {
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }

    /// `WatchScope::Prefix` happy-path resume: a non-expired cursor takes the
    /// `watch_prefix_from` path and only the delta is applied — the prefix
    /// twin of `resume_from_cursor_delivers_only_delta`.
    #[tokio::test]
    async fn prefix_resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("node.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
            None,                           // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied via watch_prefix_from"
        );
    }

    /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
    /// full `watch_prefix` and still applies the delivered updates — the prefix
    /// twin of `cursor_expired_falls_back_to_full_watch`.
    #[tokio::test]
    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None,                           // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "prefix fallback full watch's updates were applied"
        );
    }

    /// The watch task's terminal error must propagate out of `watch_applied`
    /// rather than being swallowed as `Ok(applied)` when the channel closes.
    #[tokio::test]
    async fn watch_task_error_propagates() {
        let watcher = Arc::new(ErrorWatcher);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let result = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await;

        match result {
            Err(KvError::WatchError(msg)) => {
                assert!(msg.contains("injected"), "error carries the cause: {msg}");
            }
            other => panic!("expected WatchError, got {other:?}"),
        }
    }

    /// A batch where `parse` accepts some updates and rejects others: the cursor
    /// must still advance to the highest *received* revision (covering the
    /// rejected entry in the middle), while `apply` sees only the accepted ones.
    #[tokio::test]
    async fn mixed_parse_advances_cursor_over_rejected_entries() {
        let updates = vec![
            put("keep.a", b"1", 5),
            put("skip.b", b"2", 6), // rejected by parse
            put("keep.c", b"3", 7),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ab = Arc::clone(&applied_batches);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            // Keep only keys under "keep."; reject everything else.
            |u: &KvUpdate| -> Option<Vec<u8>> {
                match u {
                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                    _ => None,
                }
            },
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(7),
            "cursor covers the rejected middle entry (rev 6)"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"3".to_vec()],
            "apply sees only the accepted entries"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// Shutdown before any update arrives: nothing was received, so the cursor
    /// stays at the resume position (here `none()`), `apply` never runs, and
    /// `on_applied` never fires.
    #[tokio::test(start_paused = true)]
    async fn shutdown_with_no_pending_batch() {
        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_calls = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let oc = Arc::clone(&on_applied_calls);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
            },
            move |_| {
                oc.fetch_add(1, Ordering::SeqCst);
            },
            sd_rx,
        ));

        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            None,
            "no updates received → cursor unmoved"
        );
        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
        assert_eq!(
            on_applied_calls.load(Ordering::SeqCst),
            0,
            "on_applied never fires"
        );
    }

    /// An [`ExportRequest`] flushes the pending batch first, so the artifact's
    /// cursor is exactly the applied cursor — and the artifact is importable
    /// with the batched entries in it.
    #[tokio::test(start_paused = true)]
    async fn export_request_flushes_pending_batch_first() {
        let dir = tempfile::TempDir::new().unwrap();
        let store_path = dir.path().join("fold.snap");
        let artifact = dir.path().join("artifact");
        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();

        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel(1);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            Some(ex_rx),
            BatchConfig {
                window: Duration::from_secs(3600), // window never fires
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        ));

        // Let both updates land in the (unflushed) pending batch, then export.
        tokio::time::sleep(Duration::from_millis(1)).await;
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        ex_tx
            .send(ExportRequest {
                dest_dir: artifact.clone(),
                reply: reply_tx,
            })
            .await
            .unwrap();

        let manifest = reply_rx.await.unwrap().expect("export succeeds");
        assert_eq!(
            manifest.cursor.as_u64(),
            Some(2),
            "pending batch flushed before export: artifact cursor is the applied cursor"
        );

        // The artifact is importable and holds both batched entries.
        let (cursor, imported) =
            AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
                .unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
        assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// An [`ExportRequest`] that arrives with NOTHING pending (the window
    /// already flushed everything) still produces a valid artifact whose
    /// cursor is the applied cursor. The flush-before-export step must be a
    /// clean no-op, not an error or a cursor regression.
    #[tokio::test(start_paused = true)]
    async fn export_with_empty_pending_batch_succeeds() {
        let dir = tempfile::TempDir::new().unwrap();
        let store_path = dir.path().join("fold.snap");
        let artifact = dir.path().join("artifact");
        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();

        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel(1);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            Some(ex_rx),
            BatchConfig::default(), // 10 ms window
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        ));

        // Let the window flush both updates, so the export request finds an
        // EMPTY pending batch.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        ex_tx
            .send(ExportRequest {
                dest_dir: artifact.clone(),
                reply: reply_tx,
            })
            .await
            .unwrap();
        let manifest = reply_rx
            .await
            .unwrap()
            .expect("export succeeds with nothing pending");
        assert_eq!(
            manifest.cursor.as_u64(),
            Some(2),
            "artifact cursor is the applied cursor, unchanged by the no-op flush"
        );

        // The artifact is importable and holds the already-flushed entries.
        let (cursor, imported) =
            AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
                .unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
        assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// An export request against a store-less watch replies with an error and
    /// the watch keeps running.
    #[tokio::test(start_paused = true)]
    async fn export_without_store_replies_error() {
        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel(1);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            Some(ex_rx),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        ex_tx
            .send(ExportRequest {
                dest_dir: std::env::temp_dir().join("never-created"),
                reply: reply_tx,
            })
            .await
            .unwrap();
        assert!(
            reply_rx.await.unwrap().is_err(),
            "no store → export errors via the reply"
        );

        // The watch is still alive and returns its applied cursor on shutdown.
        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(1));
    }

    /// An export failure (occupied destination) is reported on the reply and
    /// the watch keeps applying later updates.
    #[tokio::test(start_paused = true)]
    async fn export_error_does_not_kill_watch() {
        let dir = tempfile::TempDir::new().unwrap();
        let store_path = dir.path().join("fold.snap");
        let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();

        // Occupied destination → export fails.
        let occupied = dir.path().join("occupied");
        std::fs::create_dir(&occupied).unwrap();
        std::fs::write(occupied.join("stray"), b"x").unwrap();

        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel(1);

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            Some(ex_rx),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        ex_tx
            .send(ExportRequest {
                dest_dir: occupied,
                reply: reply_tx,
            })
            .await
            .unwrap();
        match reply_rx.await.unwrap() {
            Err(crate::snapshot::SnapshotError::ArtifactInvalid(_)) => {}
            other => panic!("expected ArtifactInvalid, got {other:?}"),
        }

        // Watch still folds: a clean shutdown returns the applied cursor.
        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(1), "watch survived the failed export");
        assert_eq!(applied.load(Ordering::SeqCst), 1);
    }

    /// Dropping the export sender disarms the arm; the loop keeps batching and
    /// flushing normally.
    #[tokio::test(start_paused = true)]
    async fn export_sender_dropped_disarms_channel() {
        let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
        let (sd_tx, sd_rx) = watch::channel(false);
        let (ex_tx, ex_rx) = mpsc::channel::<ExportRequest>(1);

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            None::<AppendLogSnapshot>,
            Some(ex_rx),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        drop(ex_tx); // disarm
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            applied.load(Ordering::SeqCst),
            1,
            "loop keeps flushing after the export sender is gone"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
    /// compaction actually fires (every other snapshot test pins the threshold
    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
    /// snapshot must still load cleanly with the right cursor and entries.
    #[tokio::test]
    async fn snapshot_compaction_fires_and_stays_consistent() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        // threshold 0 → every checkpoint reports "needs compact", forcing the
        // store's inline-compaction branch on each flush (run off the hot path via
        // spawn_blocking inside watch_applied).
        let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();

        // Re-put the same key across flushes so compaction has duplicates to
        // dedup; small max forces multiple flushes (hence multiple compactions).
        let updates = vec![
            put("node.a", b"1", 1),
            put("node.a", b"2", 2),
            put("node.b", b"3", 3),
            put("node.a", b"4", 4),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None, // reader (no resync in this test)
            Some(store),
            None,
            BatchConfig {
                window: Duration::from_secs(3600),
                max: 1, // one update per flush → a compaction per update
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(4));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "compacted snapshot's cursor still equals the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
        assert_eq!(
            snap.entries["node.a"].value, b"4",
            "last write per key survives compaction"
        );
        assert_eq!(snap.entries["node.b"].value, b"3");
    }
}