slipstream/applied.rs
1//! Cursor-after-apply watch combinator.
2//!
3//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
4//! a short window (or a max count), hands each batch to a caller-supplied
5//! `apply` closure, and **only then** advances the resume cursor, checkpoints
6//! the snapshot, and fires `on_applied`. It encodes one discipline that every
7//! hand-rolled watch loop in the wider system gets subtly wrong:
8//!
9//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
10//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
11//! > it. The cursor never advances on *receipt* of an update, only after it has
12//! > durably taken effect.
13//!
14//! ## Why receipt is the wrong signal
15//!
16//! The tempting shortcut is to bump the cursor as each update arrives off the
17//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
18//! crash between those two steps the persisted cursor claims "caught up to rev
19//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
20//! starts *past* rev N and silently skips it — a correctness hole in the exact
21//! "resume after any restart" guarantee this crate advertises.
22//!
23//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
24//! the fix: a function placed below the endpoints (here, the channel receive)
25//! can only be a performance hint; the *endpoint* — the application of the
26//! update — is the only place the "it happened" guarantee can actually be
27//! established. So the cursor is written from `apply()`'s completion, not from
28//! the transport's delivery.
29//!
30//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
31//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
32//! it has *reconciled* and re-arms the watch from there, never from the index it
33//! merely *saw*.
34//!
35//! ## What the caller supplies
36//!
37//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
38//! `None` (corrupt bytes, irrelevant key) is fine — the update is still
39//! *received*, so it still counts toward the cursor; there is simply nothing to
40//! apply for it.
41//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
42//! logic; for the tunnel router it swaps the route table, for the edge origin
43//! watcher it rebuilds the hashrings.
44//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
45//! applied cursor. Callers use it to persist the cursor for the next restart.
46//!
47//! ## Panics
48//!
//! `apply` runs inline on the task that polls [`watch_applied`] — not on the
//! internally spawned watch task that feeds it. If it panics, the panic
//! propagates out of [`watch_applied`] and aborts the watch — that is the
//! caller's contract, the same as a panic in any other supplied closure.
52
53use std::path::PathBuf;
54use std::sync::Arc;
55use std::time::Duration;
56
57use tokio::sync::mpsc;
58use tokio::sync::{oneshot, watch};
59use tracing::warn;
60
61use crate::artifact::ExportManifest;
62use crate::kv::{KvError, KvReader, KvUpdate, KvWatcher, WatchCursor};
63use crate::snapshot::{SnapshotError, SnapshotStore};
64
/// A request, sent into a running [`watch_applied`] loop, to export the fold it
/// owns (see [`SnapshotStore::export_to`]).
///
/// `watch_applied` takes its snapshot store **by value**, so a consumer that
/// wants periodic artifacts of a live fold cannot call `export_to` itself. It
/// instead passes an `mpsc::Receiver<ExportRequest>` to [`watch_applied`] and
/// sends requests through the paired sender. The loop handles a request
/// between batch flushes — after flushing any pending batch — so the artifact's
/// embedded cursor is exactly the applied cursor at the moment of export.
///
/// The export result (or error) comes back on `reply`; an export failure is
/// reported there and the watch keeps running (the snapshot is a cache — a
/// failed artifact is the requester's problem, not the fold's).
///
/// Two outcomes are worth knowing as a requester:
/// - when the loop runs without a snapshot store, `reply` receives a
///   [`SnapshotError::Backend`] error saying there is nothing to export;
/// - when the export (or the flush preceding it) is fatal to the watch — e.g.
///   the blocking export task panicked — `reply` is dropped without a value,
///   so the paired receiver resolves with a receive error instead.
pub struct ExportRequest {
    /// Where the artifact directory will be created. Must not exist (or be an
    /// empty directory); same filesystem as the fold for cheap hardlinks.
    /// Passed by reference to `export_to` on a blocking task.
    pub dest_dir: PathBuf,
    /// Receives the sealed manifest on success, or the store's error on a
    /// failed export. A dropped receiver is ignored (the send result is
    /// discarded).
    pub reply: oneshot::Sender<Result<ExportManifest, SnapshotError>>,
}
85
/// The set of keys a watch covers: the whole bucket, a single prefix, or the
/// union of several prefixes.
///
/// Each variant lines up with a pair of [`KvWatcher`] entry points — `All`
/// with `watch_all` / `watch_all_from`, `Prefix` with `watch_prefix` /
/// `watch_prefix_from`, and `Prefixes` with `watch_prefixes` /
/// `watch_prefixes_from` (a single multi-filter consumer serves the whole
/// union; there is never one consumer per prefix).
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Every key in the bucket.
    All,
    /// Only keys that start with the given prefix.
    Prefix(String),
    /// Keys that start with ANY of the given prefixes, served by one consumer.
    Prefixes(Vec<String>),
}

impl WatchScope {
    /// Flattens the scope into a list of key prefixes, for code that has to
    /// enumerate what the scope contains (live listings, fold ranges).
    ///
    /// `All` is expressed as the single empty prefix; `Prefixes` is returned
    /// in its declared order and may therefore be empty or overlapping.
    fn prefixes(&self) -> Vec<String> {
        match self {
            Self::All => vec![String::new()],
            Self::Prefix(single) => vec![single.to_owned()],
            Self::Prefixes(many) => many.to_vec(),
        }
    }
}
114
/// Internal: a cursor-expired resync handoff from the watch task to the main
/// loop. Carries the bucket's live key listing for the watch scope; the main
/// loop diffs it against the fold and applies synthetic deletes for keys that
/// vanished during the gap, then acks so the watch task can start the fallback
/// watch — the ack ordering guarantees every synthetic delete is applied before
/// the first re-list put arrives.
struct ResyncRequest {
    /// Keys currently present in the bucket, gathered per scope prefix by the
    /// watch task. Overlapping prefixes may list a key more than once; the
    /// main loop collects these into a set before diffing.
    live_keys: Vec<String>,
    /// Fired by the main loop once the synthetic deletes have been flushed.
    /// The send result is ignored there, so a watch task that has already
    /// gone away does not disturb the loop.
    ack: oneshot::Sender<()>,
}
125
/// Internal: what the watch task needs to initiate a resync — the reader that
/// lists live keys and the channel into the main loop that owns the fold.
///
/// [`watch_applied`] only builds one when it was given both a reader and a
/// store; otherwise the watch task receives `None` and the cursor-expired
/// fallback is re-list-only.
type ResyncHandle = (Arc<dyn KvReader>, mpsc::Sender<ResyncRequest>);
129
/// How [`watch_applied`] groups incoming updates into batches.
///
/// A batch is flushed as soon as **either** limit is reached: `window` has
/// passed since the batch opened, or `max` updates have piled up. The window
/// spreads the cost of one `apply` call over many updates (e.g. a single
/// route-table clone per flush rather than per update); `max` bounds memory
/// and latency when updates outpace the window.
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Longest time a batch may stay open before it is flushed.
    pub window: Duration,
    /// Number of parsed updates that forces a flush. When a snapshot store is
    /// attached, the raw (received) update count is held to the same bound, so
    /// a run of parse-rejected updates cannot grow the raw buffer unchecked.
    pub max: usize,
    /// Capacity of the internal watch-task → main-loop channel. A full channel
    /// backpressures the watch task whenever the main loop lags (slow `apply`,
    /// blocking store flush) — that is intentional — but while a large bucket
    /// hydrates during initial state-sync the channel can fill faster than the
    /// window flushes, which makes *this* the effective batch boundary instead
    /// of [`max`](Self::max). Tune the two together for high-fanout hydration;
    /// values below 1 are clamped to 1.
    pub channel_capacity: usize,
}

impl Default for BatchConfig {
    /// 10 ms window / 100 updates — the values every hand-rolled caller had
    /// already converged on, now defined once — together with the 256-slot
    /// channel the loop used to allocate unconditionally, now tunable.
    fn default() -> Self {
        let window = Duration::from_millis(10);
        let max = 100;
        let channel_capacity = 256;
        Self {
            window,
            max,
            channel_capacity,
        }
    }
}
165
166/// Drive a watch with cursor-after-apply semantics.
167///
168/// Subscribes per `scope` (resuming from `resume` when it carries a position),
169/// batches updates per `config`, applies each batch via `apply`, and only then
170/// advances the cursor / folds the batch into `store` / calls `on_applied`.
171/// Returns the final applied cursor when the watch ends (shutdown signalled, or
172/// the underlying stream closed).
173///
174/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
175/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
176/// its own impl) — or `None` to run without persistence. On each flush, *after*
177/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
178/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
179/// persisted cursor is always the post-apply cursor and never names a revision
180/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
181/// crash leaves the store consistent and resume re-folds only the tail.
182///
183/// # Cursor expiry and stale-key resync
184///
185/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
186/// falls back to a full-scope watch (`watch_all` / `watch_prefix` /
187/// `watch_prefixes`), whose state-sync re-list re-delivers the current value of
188/// every in-scope key as puts. The re-list alone cannot cover keys that were
189/// **deleted during the gap** and whose delete markers the backend has since
190/// evicted — they simply don't appear, leaving the fold (and the caller's
191/// domain state) holding them forever.
192///
193/// When both `reader` and `store` are provided, the expiry path closes that
194/// hole: before the fallback watch starts, the bucket's live keys are listed
195/// via `reader`, diffed against the fold's in-scope keys, and a synthetic
196/// [`KvUpdate::Delete`] (with an unknown [`VersionToken`](crate::VersionToken)) is run through
197/// `parse`/`apply`/`store` for each key that vanished. The synthetic deletes
198/// are strictly ordered before the first re-list put, so a key deleted and
199/// re-created during the gap converges correctly. Without a `reader` (or
200/// without a `store` to diff against) the fallback is re-list-only and a
201/// warning marks the possible stale keys.
202///
203/// A resync that was armed but FAILS (live-key listing or fold diff error) is
204/// fatal to the watch — degrading to re-list-only would silently leave
205/// deleted keys in the fold (`tests/model.rs` proves that divergence
206/// reachable), so the error surfaces and the caller's restart retries the
207/// resume → expiry → resync path from scratch.
208///
209/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
210/// rationale.
211///
212/// # Type parameters
213/// - `U`: the caller's domain update type, produced by `parse` and consumed by
214/// `apply`.
215// This combinator takes each of its dependencies as a parameter so every
216// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
217// type and is monomorphized at the call site. Folding them into a builder struct
218// would either box the closures or force a single generic bundle, losing that.
219#[allow(clippy::too_many_arguments)]
220// The flush macro resets `batch_high`/`batch_deadline` for the next loop
221// iteration. At the two flush sites that return immediately afterward (shutdown,
222// channel-close) those resets are dead stores — correct, but flagged. The allow
223// must sit on the function: a statement-scoped `#[allow]` inside the macro body
224// trips the experimental attributes-on-expressions gate (E0658) on stable.
225#[allow(unused_assignments)]
226pub async fn watch_applied<U, S, P, A, O>(
227 watcher: Arc<dyn KvWatcher>,
228 scope: WatchScope,
229 resume: Option<WatchCursor>,
230 // `Some(reader)` arms the cursor-expired stale-key resync (see the function
231 // docs); `None` keeps the re-list-only fallback. Only consulted on expiry —
232 // the hot path never touches it.
233 reader: Option<Arc<dyn KvReader>>,
234 mut store: Option<S>,
235 // `Some(rx)` arms an export-request arm in the select loop: each
236 // [`ExportRequest`] is handled between flushes (pending batch flushed
237 // first), so the exported artifact's cursor is the applied cursor (or,
238 // across a transiently failed store flush, the store's own lagging but
239 // self-consistent cursor — never a cursor past unfolded data).
240 // `None` (or dropping the paired sender) leaves the loop's behavior
241 // unchanged.
242 mut exports: Option<mpsc::Receiver<ExportRequest>>,
243 config: BatchConfig,
244 mut parse: P,
245 mut apply: A,
246 mut on_applied: O,
247 mut shutdown: watch::Receiver<bool>,
248) -> Result<WatchCursor, KvError>
249where
250 U: Send,
251 // `Send + 'static`: each flush moves `store` onto a blocking task to run its
252 // (potentially blocking) `apply`, then takes it back — the same offload the
253 // append log's compaction always used.
254 S: SnapshotStore + Send + 'static,
255 P: FnMut(&KvUpdate) -> Option<U> + Send,
256 A: FnMut(Vec<U>) + Send,
257 O: FnMut(WatchCursor) + Send,
258{
259 // The cursor we'll return. Initialized from the resume position so that a
260 // watch which receives nothing new still reports the position it resumed
261 // from as "applied" (it is — everything up to it was applied before the last
262 // run persisted it).
263 let mut applied = match &resume {
264 Some(c) => c.clone(),
265 None => WatchCursor::none(),
266 };
267
268 // The scope's prefixes, for the resync diff against the fold. Cloned out
269 // before `scope` moves into the watch task.
270 let scope_prefixes = scope.prefixes();
271
272 // Cursor-expired resync channel, armed only when there is a reader to list
273 // live keys AND a store to diff them against. The watch task sends the live
274 // listing here and waits for the ack before starting the fallback watch, so
275 // synthetic deletes always precede the re-list.
276 let (resync_pair, mut resyncs): (Option<ResyncHandle>, Option<mpsc::Receiver<ResyncRequest>>) =
277 match reader {
278 Some(reader) if store.is_some() => {
279 let (rs_tx, rs_rx) = mpsc::channel(1);
280 (Some((reader, rs_tx)), Some(rs_rx))
281 }
282 _ => (None, None),
283 };
284
285 // Spawn the watch task. It owns the cursor-expired fallback so the main loop
286 // only ever sees a clean ordered stream of updates on `rx`.
287 let (tx, mut rx) = mpsc::channel::<KvUpdate>(config.channel_capacity.max(1));
288 let handle = {
289 let watcher = Arc::clone(&watcher);
290 tokio::spawn(
291 async move { run_watch(watcher.as_ref(), &scope, resume, resync_pair, tx).await },
292 )
293 };
294
295 // Batch state.
296 //
297 // `batch_high` tracks the version of the most recently *received* update
298 // since the last flush — including updates `parse` rejected. NATS delivers
299 // in revision order, so the last received is the highest, and advancing the
300 // cursor to it after a single atomic `apply` is correct: having seen the max
301 // means we've seen everything below it, and a rejected entry is still
302 // "nothing to apply", hence covered. Reset to `none()` after every flush.
303 // Pre-size to the flush bound so no batch ever re-climbs the reallocation
304 // ladder; `max(1)` only guards a nonsensical `max = 0` config.
305 let batch_cap = config.max.max(1);
306 let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
307 // Raw received updates for the durable `store`, in revision order. Only
308 // populated when a `store` is present; the store folds the *raw* updates
309 // (including ones `parse` rejected — they are still part of the bucket's
310 // state), whereas the parsed `batch` above is the consumer's domain view.
311 let mut raw_batch: Vec<KvUpdate> = Vec::new();
312 let mut batch_high = WatchCursor::none();
313 // Consecutive store-apply failures. A transient failure re-queues its raw
314 // batch (cursor authority: the store's cursor and contents advance
315 // together, always); a persistent streak fail-stops before the re-queued
316 // backlog grows without bound.
317 const MAX_STORE_APPLY_FAILURES: u32 = 16;
318 let mut store_fail_streak: u32 = 0;
319 // `Some` once a batch has opened and the window timer is armed; `None`
320 // between flushes. Only the armed/idle distinction is read in the loop —
321 // the absolute instant lives in the pinned `sleep` future below.
322 let mut batch_deadline: Option<tokio::time::Instant> = None;
323
324 // Flush the current batch, in order: run the domain `apply` (if non-empty) to
325 // completion, advance the cursor, fold the raw batch + cursor durably into
326 // `store`, then fire `on_applied`. The store fold runs on a blocking task
327 // (its `apply` may block on I/O), moving the store in and taking it back — the
328 // same offload the append log's compaction always used. A TRANSIENT store
329 // error re-queues the raw batch for cumulative commit on the next flush
330 // (the watch continues; the store's cursor never advances past data it
331 // dropped) and a persistent failure streak is fatal; a panicked
332 // blocking task drops the store irrecoverably, which breaks the
333 // resume-after-restart guarantee, so it is surfaced as fatal.
334 macro_rules! flush {
335 () => {{
336 // Nothing received since the last flush → nothing to do at all.
337 // (`raw_batch` can be non-empty with no cursor advance only via the
338 // resync path's synthetic deletes, which carry no revision.)
339 if !batch.is_empty() || !raw_batch.is_empty() || !batch_high.is_none() {
340 if !batch.is_empty() {
341 // INVARIANT: apply() runs and RETURNS before any cursor
342 // advance below. Move the batch out so a panicking apply
343 // can't leave half-consumed state behind.
344 //
345 // `replace` (not `take`) leaves a pre-sized Vec behind so each
346 // batch after the first doesn't re-climb the reallocation
347 // ladder (4→8→…→cap).
348 apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
349 }
350 let advanced = !batch_high.is_none();
351 if advanced {
352 applied = batch_high.clone();
353 }
354 if !raw_batch.is_empty()
355 && let Some(mut st) = store.take()
356 {
357 let raw = std::mem::take(&mut raw_batch);
358 // Fold at the post-advance cursor. A synthetic-deletes-only
359 // batch leaves the cursor where it was (the deletes are a
360 // state correction, not log entries), which is safe: an
361 // unchanged — possibly expired — cursor only ever re-runs
362 // this same resync on the next restart.
363 let cur = applied.clone();
364 // Hand the store AND the raw batch back on a clean return:
365 // a *failed* apply (Ok(Err)) RE-QUEUES the batch so the
366 // next flush commits it cumulatively — the store's cursor
367 // and contents always advance together. Dropping the
368 // failed batch instead lets the NEXT successful flush
369 // advance the cursor over a hole that survives every
370 // restart (reproduced by
371 // `transient_store_failure_never_leaves_a_cursor_gap`).
372 // Only a *panicked* task (Err) loses the store: fatal.
373 match tokio::task::spawn_blocking(move || {
374 let res = st.apply(&raw, &cur);
375 (st, raw, res)
376 })
377 .await
378 {
379 Ok((st, _raw, Ok(()))) => {
380 store = Some(st);
381 store_fail_streak = 0;
382 }
383 Ok((st, raw, Err(e))) => {
384 store_fail_streak += 1;
385 if store_fail_streak >= MAX_STORE_APPLY_FAILURES {
386 // A persistently failing store would otherwise
387 // grow the re-queued batch without bound while
388 // the fold silently stales. Fail-stop: the
389 // restart refolds the tail from the store's
390 // last good cursor.
391 warn!(error = %e, streak = store_fail_streak,
392 "snapshot store apply failing persistently; aborting watch");
393 handle.abort();
394 return Err(KvError::WatchError(format!(
395 "snapshot store apply failed {store_fail_streak} consecutive times: {e}"
396 )));
397 }
398 warn!(error = %e, streak = store_fail_streak,
399 "snapshot store apply failed; batch re-queued for the next flush");
400 store = Some(st);
401 // Prepend: the failed range precedes anything
402 // received since (stream order is preserved for
403 // the eventual cumulative commit).
404 let newer = std::mem::replace(&mut raw_batch, raw);
405 raw_batch.extend(newer);
406 }
407 Err(e) => {
408 warn!(error = %e, "snapshot store task panicked; aborting watch");
409 handle.abort();
410 return Err(KvError::WatchError(format!(
411 "snapshot store task panicked: {e}"
412 )));
413 }
414 }
415 }
416 if advanced {
417 on_applied(applied.clone());
418 batch_high = WatchCursor::none();
419 }
420 }
421 batch_deadline = None;
422 }};
423 }
424
425 // A single timer future, reset in place each time a batch opens. The old
426 // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
427 // re-created on every loop iteration — one Arc-backed timer-wheel entry
428 // allocated, registered, and immediately dropped per received update.
429 // Pinning one future and `reset`-ing it reuses that single allocation; the
430 // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
431 // its initial already-elapsed deadline is never observed.
432 let sleep = tokio::time::sleep(Duration::ZERO);
433 tokio::pin!(sleep);
434
435 loop {
436 tokio::select! {
437 biased;
438
439 // Shutdown wins: flush whatever is batched (so the cursor reflects
440 // it), abandon any updates still in flight on the channel — they
441 // weren't applied, the cursor doesn't claim them, and they'll be
442 // re-delivered on the next resume — and return the applied cursor.
443 res = shutdown.changed() => {
444 if res.is_err() || *shutdown.borrow() {
445 flush!();
446 handle.abort();
447 // Observe the task's terminal state. An abort surfaces as a
448 // cancelled JoinError, which we ignore; a genuine panic that
449 // raced ahead of the abort is logged rather than silently lost.
450 if let Err(join) = handle.await
451 && !join.is_cancelled()
452 {
453 warn!(error = %join, "watch task panicked at shutdown");
454 }
455 return Ok(applied);
456 }
457 }
458
459 // Batch window elapsed.
460 () = &mut sleep, if batch_deadline.is_some() => {
461 flush!();
462 }
463
464 // Cursor-expired resync. Placed before `rx.recv()` (biased) so the
465 // synthetic deletes are folded before any update the fallback watch
466 // delivers — though the ack protocol already guarantees the fallback
467 // hasn't started while this arm runs. Diff the fold's in-scope keys
468 // against the bucket's live listing; anything the fold holds that
469 // the bucket no longer does vanished during the gap (its delete
470 // marker evicted with the cursor), so synthesize the delete the
471 // re-list can't deliver.
472 req = async { resyncs.as_mut().expect("arm guarded by is_some").recv().await },
473 if resyncs.is_some() => {
474 match req {
475 Some(ResyncRequest { live_keys, ack }) => {
476 // Flush first so the diff runs against a fold that
477 // reflects everything received so far.
478 flush!();
479 let live: std::collections::HashSet<&str> =
480 live_keys.iter().map(String::as_str).collect();
481 let mut stale: Vec<String> = Vec::new();
482 if let Some(st) = &store {
483 for prefix in &scope_prefixes {
484 // Stream the fold's keys rather than `range()`,
485 // which buffers every in-scope entry — values
486 // included — into one Vec. On an on-disk backend
487 // holding a fold larger than RAM (the case those
488 // backends exist for), an All-scope resync would
489 // materialize the entire fold on the repair
490 // path. Only the keys matter for the diff.
491 if let Err(e) = st.for_each_in_range(prefix, |entry| {
492 if !live.contains(entry.key.as_str()) {
493 stale.push(entry.key);
494 }
495 Ok(())
496 }) {
497 // FATAL, not a degrade: an incomplete
498 // diff silently leaves deleted keys in
499 // the fold forever (tests/model.rs
500 // proves the divergence reachable
501 // under degrade semantics). Fail the
502 // watch; the restart re-runs the
503 // resume → expiry → resync from
504 // scratch.
505 warn!(error = %e, prefix = %prefix,
506 "resync fold scan failed; aborting watch rather than diverging");
507 handle.abort();
508 return Err(KvError::WatchError(format!(
509 "cursor-expired resync failed listing fold prefix {prefix:?}: {e}"
510 )));
511 }
512 }
513 }
514 // Overlapping prefixes can list a key twice.
515 stale.sort_unstable();
516 stale.dedup();
517 if !stale.is_empty() {
518 warn!(stale = stale.len(), "cursor-expired resync: deleting keys that vanished during the gap");
519 }
520 for key in stale {
521 // Synthetic: carries no revision (unknown version)
522 // and so never advances the cursor.
523 let u = KvUpdate::Delete {
524 key,
525 version: crate::kv::VersionToken::unknown(),
526 };
527 if store.is_some() {
528 raw_batch.push(u.clone());
529 }
530 if let Some(parsed) = parse(&u) {
531 batch.push(parsed);
532 }
533 }
534 flush!();
535 // Ack AFTER the deletes are applied: the watch task is
536 // holding the fallback watch until it hears back, which
537 // is what orders deletes before the re-list
538 // (tests/model_resync_order.rs proves the barrier
539 // load-bearing). If the flush's STORE apply failed
540 // transiently, the deletes sit re-queued at the FRONT
541 // of the raw batch — still strictly before any
542 // re-list put in the eventual cumulative commit, and
543 // the domain apply saw them before this ack either
544 // way.
545 let _ = ack.send(());
546 }
547 None => resyncs = None,
548 }
549 }
550
551 // Export request. Placed after shutdown/window (they stay prompt)
552 // and before `rx.recv()` so a firehose of updates cannot starve an
553 // export indefinitely. The pending batch is flushed first, so the
554 // exported cursor is exactly the applied cursor — except when
555 // that flush's store apply transiently failed (batch re-queued):
556 // the export then captures the store's OWN lagging cursor, which
557 // is still self-consistent with its contents (cursor authority,
558 // tests/model_applied.rs); the artifact never includes unfolded
559 // data, and a bootstrap from it simply replays the short gap.
560 // The export itself runs on a blocking task with the store moved
561 // in and taken back — the same offload the flush path uses.
562 req = async { exports.as_mut().expect("arm guarded by is_some").recv().await },
563 if exports.is_some() => {
564 match req {
565 Some(ExportRequest { dest_dir, reply }) => {
566 flush!();
567 match store.take() {
568 Some(mut st) => {
569 match tokio::task::spawn_blocking(move || {
570 let res = st.export_to(&dest_dir);
571 (st, res)
572 })
573 .await
574 {
575 // Hand the store back on any clean return; an
576 // export failure goes to the requester only —
577 // the watch keeps running (the snapshot is a
578 // cache). A panicked task lost the store,
579 // which breaks the resume guarantee: fatal,
580 // same as the flush path's apply panic.
581 Ok((st, res)) => {
582 store = Some(st);
583 let _ = reply.send(res);
584 }
585 Err(e) => {
586 warn!(error = %e, "snapshot export task panicked; aborting watch");
587 handle.abort();
588 return Err(KvError::WatchError(format!(
589 "snapshot export task panicked: {e}"
590 )));
591 }
592 }
593 }
594 None => {
595 let _ = reply.send(Err(SnapshotError::Backend(
596 "watch_applied runs without a snapshot store; nothing to export"
597 .into(),
598 )));
599 }
600 }
601 }
602 // Sender dropped: disarm the arm for the rest of the run.
603 None => exports = None,
604 }
605 }
606
607 update = rx.recv() => {
608 match update {
609 Some(u) => {
610 // Cursor authority: every received update bumps the
611 // pending high-water, regardless of whether `parse`
612 // keeps it — but only when it carries a real position.
613 // An unknown version (e.g. an unparseable ACK subject
614 // on the hand-built multi-prefix consumer path) must
615 // neither mint a fake cursor nor clobber the real high
616 // from earlier in the batch; skipping it under-advances
617 // at worst, and re-delivery on resume is idempotent.
618 if !u.version().is_unknown() {
619 batch_high = WatchCursor::from_version(u.version().clone());
620 }
621
622 // Buffer the raw update for the durable store fold (which
623 // commits the whole batch + cursor atomically on flush).
624 // Done before `parse` consumes `u` by reference, and only
625 // when a store is present so the no-persistence path keeps
626 // its zero-copy cost.
627 if store.is_some() {
628 raw_batch.push(u.clone());
629 }
630
631 if let Some(parsed) = parse(&u) {
632 batch.push(parsed);
633 }
634
635 // Arm the window on the first received update of a batch
636 // — even a parse-rejected one, so the cursor advances
637 // within `window` even through a run of irrelevant keys.
638 // Reset the pinned timer to the new deadline rather than
639 // allocating a fresh `Sleep`.
640 if batch_deadline.is_none() {
641 let deadline = tokio::time::Instant::now() + config.window;
642 sleep.as_mut().reset(deadline);
643 batch_deadline = Some(deadline);
644 }
645
646 // Flush on a full parsed batch, or — when persisting — a
647 // full raw batch, so a window packed with parse-rejected
648 // updates can't grow `raw_batch` without bound before the
649 // window elapses.
650 if batch.len() >= config.max || raw_batch.len() >= config.max {
651 flush!();
652 }
653 }
654 None => {
655 // Stream closed. Flush the remainder, then surface the
656 // watch task's terminal result: a clean end returns the
657 // applied cursor, an error propagates.
658 flush!();
659 return match handle.await {
660 Ok(Ok(())) => Ok(applied),
661 Ok(Err(e)) => Err(e),
662 Err(join) => Err(KvError::WatchError(format!(
663 "watch task panicked: {join}"
664 ))),
665 };
666 }
667 }
668 }
669 }
670 }
671}
672
673/// Run the underlying watch for `scope`, resuming from `resume` when it carries
674/// a position, with the [`KvError::CursorExpired`] → resync + full-watch
675/// fallback.
676async fn run_watch(
677 watcher: &dyn KvWatcher,
678 scope: &WatchScope,
679 resume: Option<WatchCursor>,
680 resync: Option<ResyncHandle>,
681 tx: mpsc::Sender<KvUpdate>,
682) -> Result<(), KvError> {
683 // Resume only when the cursor carries a real position; an absent or `none()`
684 // cursor falls through to a full watch. Binding `cursor` here makes "we have a
685 // resume position" structural — there is no separate bool whose truth a later
686 // edit could let drift from the `Some`.
687 let resume_cursor = resume.filter(|c| !c.is_none());
688
689 match scope {
690 WatchScope::All => {
691 if let Some(cursor) = resume_cursor {
692 match watcher.watch_all_from(&cursor, tx.clone()).await {
693 Err(KvError::CursorExpired) => {
694 warn!(
695 "watch cursor expired; resyncing, then falling back to full watch_all"
696 );
697 resync_stale_keys(scope, &resync).await?;
698 watcher.watch_all(tx).await
699 }
700 other => other,
701 }
702 } else {
703 watcher.watch_all(tx).await
704 }
705 }
706 WatchScope::Prefix(prefix) => {
707 if let Some(cursor) = resume_cursor {
708 match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
709 Err(KvError::CursorExpired) => {
710 warn!(
711 "watch cursor expired; resyncing, then falling back to full watch_prefix"
712 );
713 resync_stale_keys(scope, &resync).await?;
714 watcher.watch_prefix(prefix, tx).await
715 }
716 other => other,
717 }
718 } else {
719 watcher.watch_prefix(prefix, tx).await
720 }
721 }
722 WatchScope::Prefixes(prefixes) => {
723 let refs: Vec<&str> = prefixes.iter().map(String::as_str).collect();
724 if let Some(cursor) = resume_cursor {
725 match watcher
726 .watch_prefixes_from(&refs, &cursor, tx.clone())
727 .await
728 {
729 Err(KvError::CursorExpired) => {
730 warn!(
731 "watch cursor expired; resyncing, then falling back to full watch_prefixes"
732 );
733 resync_stale_keys(scope, &resync).await?;
734 watcher.watch_prefixes(&refs, tx).await
735 }
736 other => other,
737 }
738 } else {
739 watcher.watch_prefixes(&refs, tx).await
740 }
741 }
742 }
743}
744
745/// Cursor-expired stale-key resync, run BEFORE the fallback watch is
746/// established: list the scope's live keys, hand them to the main loop (which
747/// diffs them against the fold and applies synthetic deletes), and wait for the
748/// ack. That ordering — deletes applied, then fallback watch armed — is what
749/// makes a delete-then-recreate during the gap converge: the synthetic delete
750/// always lands before the re-list put.
751///
752/// With no reader/store wired (`resync` is `None`) the caller explicitly opted
753/// out: warn and fall back re-list-only (keys deleted during the gap stay in
754/// the fold — `tests/model.rs` pins this divergence as reachable).
755///
756/// A FAILED listing, by contrast, is **fatal** — it fails the watch rather
757/// than degrading. The resync is load-bearing for the "stale, never corrupt"
758/// convergence guarantee: a silently degraded resync leaves the fold holding
759/// keys the bucket deleted, with one warn line as the only witness
760/// (`tests/model.rs` proves this divergence reachable under degrade
761/// semantics). Failing the watch turns the violated guarantee into a visible
762/// error; the caller's restart re-resumes, hits `CursorExpired` again, and
763/// retries the resync from scratch.
764async fn resync_stale_keys(
765 scope: &WatchScope,
766 resync: &Option<ResyncHandle>,
767) -> Result<(), KvError> {
768 let Some((reader, resync_tx)) = resync else {
769 warn!(
770 "no reader wired for cursor-expired resync; keys deleted during the gap may persist in the fold"
771 );
772 return Ok(());
773 };
774 let mut live_keys = Vec::new();
775 for prefix in scope.prefixes() {
776 match reader.keys(&prefix).await {
777 Ok(keys) => live_keys.extend(keys),
778 Err(e) => {
779 return Err(KvError::WatchError(format!(
780 "cursor-expired resync failed listing live keys under {prefix:?}: {e}; \
781 failing the watch rather than silently keeping stale keys"
782 )));
783 }
784 }
785 }
786 let (ack_tx, ack_rx) = oneshot::channel();
787 if resync_tx
788 .send(ResyncRequest {
789 live_keys,
790 ack: ack_tx,
791 })
792 .await
793 .is_ok()
794 {
795 // A dropped ack (main loop shutting down) just means the fallback watch
796 // is about to die with it; nothing to recover.
797 let _ = ack_rx.await;
798 }
799 Ok(())
800}
801
802#[cfg(test)]
803mod tests {
804 use super::*;
805 use crate::kv::{KvEntry, VersionToken};
806 use crate::snapshot::AppendLogSnapshot;
807 use async_trait::async_trait;
808 use std::sync::Mutex;
809 use std::sync::atomic::{AtomicU64, Ordering};
810 use tokio::sync::mpsc::Sender;
811
812 fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
813 KvUpdate::Put(KvEntry {
814 key: key.to_string(),
815 value: value.to_vec(),
816 version: VersionToken::from_u64(rev),
817 })
818 }
819
    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        /// Script replayed by the non-resuming entry points (`watch_all`,
        /// `watch_prefix`, `watch_prefixes`). Taken on first delivery, so a
        /// second call delivers nothing.
        full: Mutex<Option<Vec<KvUpdate>>>,
        /// Script replayed by the resuming `*_from` entry points; taken on
        /// first delivery like `full`.
        from: Mutex<Option<Vec<KvUpdate>>>,
        /// When `true`, every `*_from` entry point returns
        /// `KvError::CursorExpired` without delivering anything.
        from_expires: bool,
        /// When `true`, delivery never returns after the script is sent, which
        /// keeps the sender (and so the channel) alive.
        hold: bool,
    }
830
831 impl MockWatcher {
832 fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
833 Self {
834 full: Mutex::new(Some(updates)),
835 from: Mutex::new(None),
836 from_expires: false,
837 hold,
838 }
839 }
840
841 async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
842 let updates = which.lock().unwrap().take().unwrap_or_default();
843 for u in updates {
844 if tx.send(u).await.is_err() {
845 return;
846 }
847 }
848 if self.hold {
849 // Keep `tx` alive (channel open) until this task is aborted.
850 std::future::pending::<()>().await;
851 }
852 }
853 }
854
855 #[async_trait]
856 impl KvWatcher for MockWatcher {
857 async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
858 self.deliver(&self.full, tx).await;
859 Ok(())
860 }
861
862 async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
863 self.deliver(&self.full, tx).await;
864 Ok(())
865 }
866
867 async fn watch_prefixes(
868 &self,
869 _prefixes: &[&str],
870 tx: Sender<KvUpdate>,
871 ) -> Result<(), KvError> {
872 // This mock scripts the applied-watch resumption tests, not prefix
873 // filtering; it delivers the same `full` script as `watch_prefix`.
874 // The real multi-filter scoping is proved in the NATS integration test.
875 self.deliver(&self.full, tx).await;
876 Ok(())
877 }
878
879 async fn watch_all_from(
880 &self,
881 _cursor: &WatchCursor,
882 tx: Sender<KvUpdate>,
883 ) -> Result<(), KvError> {
884 if self.from_expires {
885 return Err(KvError::CursorExpired);
886 }
887 self.deliver(&self.from, tx).await;
888 Ok(())
889 }
890
891 // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
892 // are exercised against the same `from` script. Without this the trait's
893 // default impl would delegate to watch_prefix and silently deliver the
894 // full set instead of the delta.
895 async fn watch_prefix_from(
896 &self,
897 _prefix: &str,
898 _cursor: &WatchCursor,
899 tx: Sender<KvUpdate>,
900 ) -> Result<(), KvError> {
901 if self.from_expires {
902 return Err(KvError::CursorExpired);
903 }
904 self.deliver(&self.from, tx).await;
905 Ok(())
906 }
907
908 // Same mirroring for the multi-prefix resume arm.
909 async fn watch_prefixes_from(
910 &self,
911 _prefixes: &[&str],
912 _cursor: &WatchCursor,
913 tx: Sender<KvUpdate>,
914 ) -> Result<(), KvError> {
915 if self.from_expires {
916 return Err(KvError::CursorExpired);
917 }
918 self.deliver(&self.from, tx).await;
919 Ok(())
920 }
921 }
922
923 /// A reader whose `keys()` serves a scripted live listing — the only call
924 /// the cursor-expired resync makes. Filters by prefix like a real backend
925 /// so prefix-scoped resyncs are exercised faithfully.
926 struct MockReader {
927 live: Vec<String>,
928 }
929
930 #[async_trait]
931 impl KvReader for MockReader {
932 async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
933 unreachable!("resync only lists keys")
934 }
935
936 async fn entry(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
937 unreachable!("resync only lists keys")
938 }
939
940 async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
941 Ok(self
942 .live
943 .iter()
944 .filter(|k| k.starts_with(prefix))
945 .cloned()
946 .collect())
947 }
948
949 async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
950 unreachable!("resync only lists keys")
951 }
952 }
953
954 /// A watcher whose entry points all fail. Used to prove the watch task's
955 /// terminal error is surfaced out of `watch_applied` rather than swallowed
956 /// as a clean `Ok(applied)` when the channel closes.
957 struct ErrorWatcher;
958
959 #[async_trait]
960 impl KvWatcher for ErrorWatcher {
961 async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
962 Err(KvError::WatchError("injected watch failure".into()))
963 }
964
965 async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
966 Err(KvError::WatchError("injected watch failure".into()))
967 }
968
969 async fn watch_prefixes(
970 &self,
971 _prefixes: &[&str],
972 _tx: Sender<KvUpdate>,
973 ) -> Result<(), KvError> {
974 Err(KvError::WatchError("injected watch failure".into()))
975 }
976 }
977
978 // A no-op parse that keeps every Put as the value bytes; drops deletes.
979 fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
980 match u {
981 KvUpdate::Put(e) => Some(e.value.clone()),
982 _ => None,
983 }
984 }
985
986 /// The stream closes (hold = false) with a pending batch; the remainder is
987 /// flushed before returning, the returned cursor is the last revision, and
988 /// `on_applied` ran exactly once after `apply`.
989 #[tokio::test]
990 async fn flush_on_channel_close() {
991 let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
992 let watcher = Arc::new(MockWatcher::new(updates, false));
993
994 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
995 let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
996
997 let ab = Arc::clone(&applied_batches);
998 let oc = Arc::clone(&on_applied_cursors);
999 let (_sd_tx, sd_rx) = watch::channel(false);
1000
1001 let cursor = watch_applied(
1002 watcher,
1003 WatchScope::All,
1004 None,
1005 None, // reader (no resync in this test)
1006 None::<AppendLogSnapshot>,
1007 None,
1008 BatchConfig::default(),
1009 parse_put,
1010 move |batch| ab.lock().unwrap().push(batch),
1011 move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
1012 sd_rx,
1013 )
1014 .await
1015 .unwrap();
1016
1017 assert_eq!(cursor.as_u64(), Some(3));
1018 let batches = applied_batches.lock().unwrap();
1019 let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
1020 assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
1021 assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
1022 }
1023
1024 /// Fewer than `max` updates, then the channel idles: the window timer must
1025 /// flush them and advance the cursor.
1026 #[tokio::test(start_paused = true)]
1027 async fn flush_on_window() {
1028 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1029 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1030
1031 let applied = Arc::new(AtomicU64::new(0));
1032 let count = Arc::new(AtomicU64::new(0));
1033 let a = Arc::clone(&applied);
1034 let c = Arc::clone(&count);
1035 let (sd_tx, sd_rx) = watch::channel(false);
1036
1037 let task = tokio::spawn(watch_applied(
1038 watcher,
1039 WatchScope::All,
1040 None,
1041 None, // reader (no resync in this test)
1042 None::<AppendLogSnapshot>,
1043 None,
1044 BatchConfig::default(),
1045 parse_put,
1046 move |batch: Vec<Vec<u8>>| {
1047 c.fetch_add(batch.len() as u64, Ordering::SeqCst);
1048 },
1049 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1050 sd_rx,
1051 ));
1052
1053 // Let the window (10ms) elapse under virtual time.
1054 tokio::time::sleep(Duration::from_millis(50)).await;
1055 assert_eq!(
1056 count.load(Ordering::SeqCst),
1057 2,
1058 "window should have flushed"
1059 );
1060 assert_eq!(applied.load(Ordering::SeqCst), 2);
1061
1062 sd_tx.send(true).unwrap();
1063 let cursor = task.await.unwrap().unwrap();
1064 assert_eq!(cursor.as_u64(), Some(2));
1065 }
1066
1067 /// Exactly `max` updates fills a batch and flushes immediately — before the
1068 /// window would have elapsed.
1069 #[tokio::test(start_paused = true)]
1070 async fn flush_on_max() {
1071 let max = 4;
1072 let updates: Vec<_> = (1..=max as u64)
1073 .map(|i| put(&format!("k{i}"), b"v", i))
1074 .collect();
1075 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1076
1077 let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
1078 let f = Arc::clone(&flushes);
1079 let (sd_tx, sd_rx) = watch::channel(false);
1080
1081 let task = tokio::spawn(watch_applied(
1082 watcher,
1083 WatchScope::All,
1084 None,
1085 None, // reader (no resync in this test)
1086 None::<AppendLogSnapshot>,
1087 None,
1088 BatchConfig {
1089 window: Duration::from_secs(3600), // effectively never
1090 max,
1091 ..BatchConfig::default()
1092 },
1093 parse_put,
1094 move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
1095 move |_| {},
1096 sd_rx,
1097 ));
1098
1099 // Yield enough for the mock to push all `max` updates; the window is an
1100 // hour, so any flush is purely the max trigger.
1101 tokio::time::sleep(Duration::from_millis(1)).await;
1102 assert_eq!(
1103 *flushes.lock().unwrap(),
1104 vec![max],
1105 "a full batch should flush on max, not wait for the window"
1106 );
1107
1108 sd_tx.send(true).unwrap();
1109 task.await.unwrap().unwrap();
1110 }
1111
1112 /// A pending batch plus a shutdown signal: the batch is flushed and the
1113 /// applied cursor returned.
1114 #[tokio::test(start_paused = true)]
1115 async fn flush_on_shutdown() {
1116 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1117 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1118
1119 let applied = Arc::new(AtomicU64::new(0));
1120 let a = Arc::clone(&applied);
1121 let (sd_tx, sd_rx) = watch::channel(false);
1122
1123 let task = tokio::spawn(watch_applied(
1124 watcher,
1125 WatchScope::All,
1126 None,
1127 None, // reader (no resync in this test)
1128 None::<AppendLogSnapshot>,
1129 None,
1130 BatchConfig {
1131 window: Duration::from_secs(3600), // window won't fire
1132 max: 100,
1133 ..BatchConfig::default()
1134 },
1135 parse_put,
1136 move |_batch: Vec<Vec<u8>>| {},
1137 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1138 sd_rx,
1139 ));
1140
1141 // Give the mock time to deliver both updates into the pending batch.
1142 tokio::time::sleep(Duration::from_millis(1)).await;
1143 sd_tx.send(true).unwrap();
1144
1145 let cursor = task.await.unwrap().unwrap();
1146 assert_eq!(
1147 cursor.as_u64(),
1148 Some(2),
1149 "shutdown flushes the pending batch"
1150 );
1151 assert_eq!(applied.load(Ordering::SeqCst), 2);
1152 }
1153
1154 /// The cursor must not advance until `apply` has returned. We prove it by
1155 /// having `apply` read the cursor that `on_applied` last published: when the
1156 /// second batch is applied, the visible cursor must still be the *first*
1157 /// batch's — never the second's, which only becomes visible after this
1158 /// `apply` returns.
1159 #[tokio::test(start_paused = true)]
1160 async fn cursor_advances_only_after_apply() {
1161 // Two batches of `max` updates each.
1162 let max = 2usize;
1163 let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
1164 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1165
1166 // Cursor as last published by on_applied; starts at 0 (nothing applied).
1167 let published = Arc::new(AtomicU64::new(0));
1168 // What `apply` observed as the published cursor at the moment it ran.
1169 let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));
1170
1171 let pub_for_apply = Arc::clone(&published);
1172 let seen = Arc::clone(&seen_at_apply);
1173 let pub_for_on = Arc::clone(&published);
1174 let (sd_tx, sd_rx) = watch::channel(false);
1175
1176 let task = tokio::spawn(watch_applied(
1177 watcher,
1178 WatchScope::All,
1179 None,
1180 None, // reader (no resync in this test)
1181 None::<AppendLogSnapshot>,
1182 None,
1183 BatchConfig {
1184 window: Duration::from_secs(3600),
1185 max,
1186 ..BatchConfig::default()
1187 },
1188 parse_put,
1189 move |_batch: Vec<Vec<u8>>| {
1190 // The cursor visible here is whatever the PREVIOUS flush
1191 // published — never this batch's, because we haven't returned.
1192 seen.lock()
1193 .unwrap()
1194 .push(pub_for_apply.load(Ordering::SeqCst));
1195 },
1196 move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1197 sd_rx,
1198 ));
1199
1200 tokio::time::sleep(Duration::from_millis(1)).await;
1201 sd_tx.send(true).unwrap();
1202 task.await.unwrap().unwrap();
1203
1204 // First apply saw 0 (nothing applied yet); second apply saw 2 (first
1205 // batch's cursor), NOT 4. The cursor only reached 4 after the second
1206 // apply returned.
1207 assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
1208 assert_eq!(published.load(Ordering::SeqCst), 4);
1209 }
1210
1211 /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
1212 /// domain work, but they were still received — so the cursor must advance
1213 /// over them.
1214 #[tokio::test]
1215 async fn corrupt_parse_entries_advance_cursor() {
1216 let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
1217 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1218
1219 let apply_calls = Arc::new(AtomicU64::new(0));
1220 let on_applied_max = Arc::new(AtomicU64::new(0));
1221 let ac = Arc::clone(&apply_calls);
1222 let om = Arc::clone(&on_applied_max);
1223 let (_sd_tx, sd_rx) = watch::channel(false);
1224
1225 let cursor = watch_applied(
1226 watcher,
1227 WatchScope::All,
1228 None,
1229 None, // reader (no resync in this test)
1230 None::<AppendLogSnapshot>,
1231 None,
1232 BatchConfig::default(),
1233 // Reject everything — simulates corrupt/irrelevant entries.
1234 |_u: &KvUpdate| -> Option<Vec<u8>> { None },
1235 move |batch: Vec<Vec<u8>>| {
1236 ac.fetch_add(1, Ordering::SeqCst);
1237 assert!(batch.is_empty());
1238 },
1239 move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1240 sd_rx,
1241 )
1242 .await
1243 .unwrap();
1244
1245 assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
1246 assert_eq!(
1247 apply_calls.load(Ordering::SeqCst),
1248 0,
1249 "an all-rejected batch applies nothing"
1250 );
1251 assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1252 }
1253
1254 /// An update carrying the UNKNOWN version (an unparseable ACK subject on
1255 /// the hand-built multi-prefix consumer path) must neither mint a cursor
1256 /// position nor clobber the real high-water from earlier in the batch.
1257 /// Pre-guard behavior: `kv_message_to_update` fabricated revision 0 for
1258 /// such updates and the unconditional `batch_high = ...` adopted it,
1259 /// regressing the persisted cursor to 0. The update itself is still
1260 /// applied — only the cursor ignores it.
1261 #[tokio::test]
1262 async fn unknown_version_update_does_not_move_or_clobber_cursor() {
1263 let unknown_put = KvUpdate::Put(KvEntry {
1264 key: "u".to_string(),
1265 value: b"x".to_vec(),
1266 version: VersionToken::unknown(),
1267 });
1268 let updates = vec![put("a", b"1", 5), unknown_put];
1269 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1270
1271 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1272 let ab = Arc::clone(&applied_batches);
1273 let (_sd_tx, sd_rx) = watch::channel(false);
1274
1275 let cursor = watch_applied(
1276 watcher,
1277 WatchScope::All,
1278 None,
1279 None, // reader (no resync in this test)
1280 None::<AppendLogSnapshot>,
1281 None,
1282 BatchConfig::default(),
1283 parse_put,
1284 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1285 move |_| {},
1286 sd_rx,
1287 )
1288 .await
1289 .unwrap();
1290
1291 assert_eq!(
1292 cursor.as_u64(),
1293 Some(5),
1294 "the unknown-version update must not clobber the real batch high"
1295 );
1296 assert_eq!(
1297 *applied_batches.lock().unwrap(),
1298 vec![b"1".to_vec(), b"x".to_vec()],
1299 "the unknown-version update is still applied"
1300 );
1301 }
1302
1303 /// A resume whose cursor has expired falls back to the full watch and still
1304 /// applies the delivered updates.
1305 #[tokio::test]
1306 async fn cursor_expired_falls_back_to_full_watch() {
1307 let mock = MockWatcher {
1308 full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
1309 from: Mutex::new(Some(vec![])),
1310 from_expires: true,
1311 hold: false,
1312 };
1313 let watcher = Arc::new(mock);
1314
1315 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1316 let ab = Arc::clone(&applied_batches);
1317 let (_sd_tx, sd_rx) = watch::channel(false);
1318
1319 let cursor = watch_applied(
1320 watcher,
1321 WatchScope::All,
1322 Some(WatchCursor::from_u64(5)), // resume position that "expired"
1323 None, // reader (no resync in this test)
1324 None::<AppendLogSnapshot>,
1325 None,
1326 BatchConfig::default(),
1327 parse_put,
1328 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1329 move |_| {},
1330 sd_rx,
1331 )
1332 .await
1333 .unwrap();
1334
1335 assert_eq!(cursor.as_u64(), Some(11));
1336 assert_eq!(
1337 *applied_batches.lock().unwrap(),
1338 vec![b"1".to_vec(), b"2".to_vec()],
1339 "fallback full watch's updates were applied"
1340 );
1341 }
1342
1343 /// Cursor-expired resync: with a reader + store wired, a key the fold holds
1344 /// that the live listing no longer does gets a synthetic delete — applied
1345 /// strictly BEFORE the fallback re-list — and the persisted fold converges
1346 /// to the live state. The synthetic delete (unknown version) must not move
1347 /// the cursor; the re-list put must.
1348 #[tokio::test]
1349 async fn cursor_expired_resync_deletes_stale_keys() {
1350 let dir = tempfile::TempDir::new().unwrap();
1351 let path = dir.path().join("resync.snap");
1352 let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1353 // The fold from the previous run: node.a and node.b at cursor 2.
1354 store
1355 .apply(
1356 &[put("node.a", b"1", 1), put("node.b", b"2", 2)],
1357 &WatchCursor::from_u64(2),
1358 )
1359 .unwrap();
1360
1361 // During the gap node.b was deleted (marker since evicted) and node.a
1362 // updated; the resume cursor (2) has expired. The fallback re-list
1363 // therefore carries only the surviving key.
1364 let mock = MockWatcher {
1365 full: Mutex::new(Some(vec![put("node.a", b"1b", 10)])),
1366 from: Mutex::new(Some(vec![])),
1367 from_expires: true,
1368 hold: false,
1369 };
1370 let reader = MockReader {
1371 live: vec!["node.a".to_string()],
1372 };
1373
1374 // Record everything `parse` sees, in order, deletes included.
1375 let seen = Arc::new(Mutex::new(Vec::<(String, bool)>::new()));
1376 let s = Arc::clone(&seen);
1377 let (_sd_tx, sd_rx) = watch::channel(false);
1378
1379 let cursor = watch_applied(
1380 Arc::new(mock),
1381 WatchScope::All,
1382 Some(WatchCursor::from_u64(2)),
1383 Some(Arc::new(reader) as Arc<dyn KvReader>),
1384 Some(store),
1385 None,
1386 BatchConfig::default(),
1387 move |u: &KvUpdate| {
1388 s.lock()
1389 .unwrap()
1390 .push((u.key().to_string(), matches!(u, KvUpdate::Delete { .. })));
1391 Some(())
1392 },
1393 |_batch: Vec<()>| {},
1394 |_| {},
1395 sd_rx,
1396 )
1397 .await
1398 .unwrap();
1399
1400 // The re-list put advanced the cursor; the synthetic delete did not.
1401 assert_eq!(cursor.as_u64(), Some(10));
1402 // The synthetic delete strictly precedes the re-list put.
1403 assert_eq!(
1404 *seen.lock().unwrap(),
1405 vec![("node.b".to_string(), true), ("node.a".to_string(), false)],
1406 "synthetic delete must be applied before the fallback re-list"
1407 );
1408
1409 // The persisted fold converged: stale key gone, live key updated.
1410 let snap = crate::snapshot::load(&path).unwrap().unwrap();
1411 assert_eq!(snap.cursor.as_u64(), Some(10));
1412 assert_eq!(snap.entries.len(), 1);
1413 assert_eq!(snap.entries["node.a"].value, b"1b");
1414 }
1415
1416 /// A prefix-scoped resync diffs only in-scope keys: an out-of-scope key the
1417 /// fold holds survives, the in-scope stale key is deleted, and a flush
1418 /// containing only synthetic deletes leaves the cursor untouched.
1419 #[tokio::test]
1420 async fn cursor_expired_resync_respects_scope() {
1421 let dir = tempfile::TempDir::new().unwrap();
1422 let path = dir.path().join("resync-scope.snap");
1423 let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1424 store
1425 .apply(
1426 &[put("node.b", b"2", 1), put("other.z", b"9", 2)],
1427 &WatchCursor::from_u64(2),
1428 )
1429 .unwrap();
1430
1431 // Expired resume; the bucket no longer has ANY node.* keys; the
1432 // fallback re-list is empty.
1433 let mock = MockWatcher {
1434 full: Mutex::new(Some(vec![])),
1435 from: Mutex::new(Some(vec![])),
1436 from_expires: true,
1437 hold: false,
1438 };
1439 let reader = MockReader { live: vec![] };
1440 let (_sd_tx, sd_rx) = watch::channel(false);
1441
1442 let cursor = watch_applied(
1443 Arc::new(mock),
1444 WatchScope::Prefix("node.".to_string()),
1445 Some(WatchCursor::from_u64(2)),
1446 Some(Arc::new(reader) as Arc<dyn KvReader>),
1447 Some(store),
1448 None,
1449 BatchConfig::default(),
1450 |_u: &KvUpdate| Some(()),
1451 |_batch: Vec<()>| {},
1452 |_| {},
1453 sd_rx,
1454 )
1455 .await
1456 .unwrap();
1457
1458 // Deletes-only flush: cursor stays at the resume position.
1459 assert_eq!(cursor.as_u64(), Some(2));
1460
1461 let snap = crate::snapshot::load(&path).unwrap().unwrap();
1462 assert_eq!(snap.cursor.as_u64(), Some(2));
1463 assert!(
1464 !snap.entries.contains_key("node.b"),
1465 "in-scope stale key must be resync-deleted"
1466 );
1467 assert_eq!(
1468 snap.entries["other.z"].value, b"9",
1469 "out-of-scope key must survive a prefix-scoped resync"
1470 );
1471 }
1472
1473 /// `WatchScope::Prefixes` dispatches to `watch_prefixes` (no resume) and to
1474 /// `watch_prefixes_from` with the expiry → full-watch fallback (resume).
1475 #[tokio::test]
1476 async fn prefixes_scope_dispatches_full_watch() {
1477 let updates = vec![put("a.x", b"1", 1), put("b.y", b"2", 2)];
1478 let watcher = Arc::new(MockWatcher::new(updates, false));
1479 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1480 let ab = Arc::clone(&applied_batches);
1481 let (_sd_tx, sd_rx) = watch::channel(false);
1482
1483 let cursor = watch_applied(
1484 watcher,
1485 WatchScope::Prefixes(vec!["a.".to_string(), "b.".to_string()]),
1486 None,
1487 None, // reader (no resync in this test)
1488 None::<AppendLogSnapshot>,
1489 None,
1490 BatchConfig::default(),
1491 parse_put,
1492 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1493 move |_| {},
1494 sd_rx,
1495 )
1496 .await
1497 .unwrap();
1498
1499 assert_eq!(cursor.as_u64(), Some(2));
1500 assert_eq!(
1501 *applied_batches.lock().unwrap(),
1502 vec![b"1".to_vec(), b"2".to_vec()]
1503 );
1504 }
1505
1506 /// `WatchScope::Prefixes` resume whose cursor has expired falls back to the
1507 /// full multi-prefix watch and applies its updates.
1508 #[tokio::test]
1509 async fn prefixes_scope_expired_resume_falls_back() {
1510 let mock = MockWatcher {
1511 full: Mutex::new(Some(vec![put("a.x", b"1", 7)])),
1512 from: Mutex::new(Some(vec![])),
1513 from_expires: true,
1514 hold: false,
1515 };
1516 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1517 let ab = Arc::clone(&applied_batches);
1518 let (_sd_tx, sd_rx) = watch::channel(false);
1519
1520 let cursor = watch_applied(
1521 Arc::new(mock),
1522 WatchScope::Prefixes(vec!["a.".to_string()]),
1523 Some(WatchCursor::from_u64(3)),
1524 None, // reader (no resync in this test)
1525 None::<AppendLogSnapshot>,
1526 None,
1527 BatchConfig::default(),
1528 parse_put,
1529 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1530 move |_| {},
1531 sd_rx,
1532 )
1533 .await
1534 .unwrap();
1535
1536 assert_eq!(cursor.as_u64(), Some(7));
1537 assert_eq!(*applied_batches.lock().unwrap(), vec![b"1".to_vec()]);
1538 }
1539
1540 /// End-to-end with a real snapshot file: after the run, the persisted
1541 /// snapshot's cursor equals the applied cursor and its entries match the
1542 /// applied state — proving the checkpoint is written at the post-apply
1543 /// cursor, never ahead of it.
1544 #[tokio::test]
1545 async fn snapshot_checkpoint_matches_applied_cursor() {
1546 let dir = tempfile::TempDir::new().unwrap();
1547 let path = dir.path().join("applied.snap");
1548 let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1549
1550 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1551 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1552 let (_sd_tx, sd_rx) = watch::channel(false);
1553
1554 let cursor = watch_applied(
1555 watcher,
1556 WatchScope::All,
1557 None,
1558 None, // reader (no resync in this test)
1559 Some(store),
1560 None,
1561 BatchConfig::default(),
1562 parse_put,
1563 move |_batch: Vec<Vec<u8>>| {},
1564 move |_| {},
1565 sd_rx,
1566 )
1567 .await
1568 .unwrap();
1569
1570 assert_eq!(cursor.as_u64(), Some(2));
1571
1572 let snap = crate::snapshot::load(&path).unwrap().unwrap();
1573 assert_eq!(
1574 snap.cursor.as_u64(),
1575 cursor.as_u64(),
1576 "snapshot checkpoint cursor must equal the applied cursor"
1577 );
1578 assert_eq!(snap.entries.len(), 2);
1579 assert_eq!(snap.entries["node.a"].value, b"1");
1580 assert_eq!(snap.entries["node.b"].value, b"2");
1581 }
1582
1583 /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
1584 /// delta (the `from` script, NOT the full set) is applied. Proves the
1585 /// resume branch delivers only post-cursor updates and advances to their
1586 /// max revision.
1587 #[tokio::test]
1588 async fn resume_from_cursor_delivers_only_delta() {
1589 let mock = MockWatcher {
1590 // `full` would be delivered only if the resume path were (wrongly)
1591 // bypassed; a non-empty distinguishing value makes that visible.
1592 full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
1593 from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1594 from_expires: false,
1595 hold: false,
1596 };
1597 let watcher = Arc::new(mock);
1598
1599 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1600 let ab = Arc::clone(&applied_batches);
1601 let (_sd_tx, sd_rx) = watch::channel(false);
1602
1603 let cursor = watch_applied(
1604 watcher,
1605 WatchScope::All,
1606 Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1607 None, // reader (no resync in this test)
1608 None::<AppendLogSnapshot>,
1609 None,
1610 BatchConfig::default(),
1611 parse_put,
1612 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1613 move |_| {},
1614 sd_rx,
1615 )
1616 .await
1617 .unwrap();
1618
1619 assert_eq!(
1620 cursor.as_u64(),
1621 Some(11),
1622 "cursor advances to the delta max"
1623 );
1624 assert_eq!(
1625 *applied_batches.lock().unwrap(),
1626 vec![b"3".to_vec(), b"4".to_vec()],
1627 "only the post-cursor delta is applied, never the full set"
1628 );
1629 }
1630
1631 /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
1632 /// applies the delivered updates. Every other test uses `WatchScope::All`;
1633 /// this covers the prefix dispatch arm.
1634 #[tokio::test]
1635 async fn prefix_scope_applies_delivered_updates() {
1636 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1637 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1638
1639 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1640 let ab = Arc::clone(&applied_batches);
1641 let (_sd_tx, sd_rx) = watch::channel(false);
1642
1643 let cursor = watch_applied(
1644 watcher,
1645 WatchScope::Prefix("node.".to_string()),
1646 None,
1647 None, // reader (no resync in this test)
1648 None::<AppendLogSnapshot>,
1649 None,
1650 BatchConfig::default(),
1651 parse_put,
1652 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1653 move |_| {},
1654 sd_rx,
1655 )
1656 .await
1657 .unwrap();
1658
1659 assert_eq!(cursor.as_u64(), Some(2));
1660 assert_eq!(
1661 *applied_batches.lock().unwrap(),
1662 vec![b"1".to_vec(), b"2".to_vec()]
1663 );
1664 }
1665
1666 /// `WatchScope::Prefix` happy-path resume: a non-expired cursor takes the
1667 /// `watch_prefix_from` path and only the delta is applied — the prefix
1668 /// twin of `resume_from_cursor_delivers_only_delta`.
1669 #[tokio::test]
1670 async fn prefix_resume_from_cursor_delivers_only_delta() {
1671 let mock = MockWatcher {
1672 // `full` would be delivered only if the resume path were (wrongly)
1673 // bypassed; a distinguishing value makes that visible.
1674 full: Mutex::new(Some(vec![put("node.x", b"FULL", 1)])),
1675 from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1676 from_expires: false,
1677 hold: false,
1678 };
1679 let watcher = Arc::new(mock);
1680
1681 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1682 let ab = Arc::clone(&applied_batches);
1683 let (_sd_tx, sd_rx) = watch::channel(false);
1684
1685 let cursor = watch_applied(
1686 watcher,
1687 WatchScope::Prefix("node.".to_string()),
1688 Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1689 None, // reader (no resync in this test)
1690 None::<AppendLogSnapshot>,
1691 None,
1692 BatchConfig::default(),
1693 parse_put,
1694 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1695 move |_| {},
1696 sd_rx,
1697 )
1698 .await
1699 .unwrap();
1700
1701 assert_eq!(
1702 cursor.as_u64(),
1703 Some(11),
1704 "cursor advances to the delta max"
1705 );
1706 assert_eq!(
1707 *applied_batches.lock().unwrap(),
1708 vec![b"3".to_vec(), b"4".to_vec()],
1709 "only the post-cursor delta is applied via watch_prefix_from"
1710 );
1711 }
1712
1713 /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
1714 /// full `watch_prefix` and still applies the delivered updates — the prefix
1715 /// twin of `cursor_expired_falls_back_to_full_watch`.
1716 #[tokio::test]
1717 async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
1718 let mock = MockWatcher {
1719 full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
1720 from: Mutex::new(Some(vec![])),
1721 from_expires: true,
1722 hold: false,
1723 };
1724 let watcher = Arc::new(mock);
1725
1726 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1727 let ab = Arc::clone(&applied_batches);
1728 let (_sd_tx, sd_rx) = watch::channel(false);
1729
1730 let cursor = watch_applied(
1731 watcher,
1732 WatchScope::Prefix("node.".to_string()),
1733 Some(WatchCursor::from_u64(5)), // resume position that "expired"
1734 None, // reader (no resync in this test)
1735 None::<AppendLogSnapshot>,
1736 None,
1737 BatchConfig::default(),
1738 parse_put,
1739 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1740 move |_| {},
1741 sd_rx,
1742 )
1743 .await
1744 .unwrap();
1745
1746 assert_eq!(cursor.as_u64(), Some(11));
1747 assert_eq!(
1748 *applied_batches.lock().unwrap(),
1749 vec![b"1".to_vec(), b"2".to_vec()],
1750 "prefix fallback full watch's updates were applied"
1751 );
1752 }
1753
1754 /// The watch task's terminal error must propagate out of `watch_applied`
1755 /// rather than being swallowed as `Ok(applied)` when the channel closes.
1756 #[tokio::test]
1757 async fn watch_task_error_propagates() {
1758 let watcher = Arc::new(ErrorWatcher);
1759 let (_sd_tx, sd_rx) = watch::channel(false);
1760
1761 let result = watch_applied(
1762 watcher,
1763 WatchScope::All,
1764 None,
1765 None, // reader (no resync in this test)
1766 None::<AppendLogSnapshot>,
1767 None,
1768 BatchConfig::default(),
1769 parse_put,
1770 move |_batch: Vec<Vec<u8>>| {},
1771 move |_| {},
1772 sd_rx,
1773 )
1774 .await;
1775
1776 match result {
1777 Err(KvError::WatchError(msg)) => {
1778 assert!(msg.contains("injected"), "error carries the cause: {msg}");
1779 }
1780 other => panic!("expected WatchError, got {other:?}"),
1781 }
1782 }
1783
1784 /// A batch where `parse` accepts some updates and rejects others: the cursor
1785 /// must still advance to the highest *received* revision (covering the
1786 /// rejected entry in the middle), while `apply` sees only the accepted ones.
1787 #[tokio::test]
1788 async fn mixed_parse_advances_cursor_over_rejected_entries() {
1789 let updates = vec![
1790 put("keep.a", b"1", 5),
1791 put("skip.b", b"2", 6), // rejected by parse
1792 put("keep.c", b"3", 7),
1793 ];
1794 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1795
1796 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1797 let on_applied_max = Arc::new(AtomicU64::new(0));
1798 let ab = Arc::clone(&applied_batches);
1799 let om = Arc::clone(&on_applied_max);
1800 let (_sd_tx, sd_rx) = watch::channel(false);
1801
1802 let cursor = watch_applied(
1803 watcher,
1804 WatchScope::All,
1805 None,
1806 None, // reader (no resync in this test)
1807 None::<AppendLogSnapshot>,
1808 None,
1809 BatchConfig::default(),
1810 // Keep only keys under "keep."; reject everything else.
1811 |u: &KvUpdate| -> Option<Vec<u8>> {
1812 match u {
1813 KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
1814 _ => None,
1815 }
1816 },
1817 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1818 move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1819 sd_rx,
1820 )
1821 .await
1822 .unwrap();
1823
1824 assert_eq!(
1825 cursor.as_u64(),
1826 Some(7),
1827 "cursor covers the rejected middle entry (rev 6)"
1828 );
1829 assert_eq!(
1830 *applied_batches.lock().unwrap(),
1831 vec![b"1".to_vec(), b"3".to_vec()],
1832 "apply sees only the accepted entries"
1833 );
1834 assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1835 }
1836
1837 /// Shutdown before any update arrives: nothing was received, so the cursor
1838 /// stays at the resume position (here `none()`), `apply` never runs, and
1839 /// `on_applied` never fires.
1840 #[tokio::test(start_paused = true)]
1841 async fn shutdown_with_no_pending_batch() {
1842 let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open
1843
1844 let apply_calls = Arc::new(AtomicU64::new(0));
1845 let on_applied_calls = Arc::new(AtomicU64::new(0));
1846 let ac = Arc::clone(&apply_calls);
1847 let oc = Arc::clone(&on_applied_calls);
1848 let (sd_tx, sd_rx) = watch::channel(false);
1849
1850 let task = tokio::spawn(watch_applied(
1851 watcher,
1852 WatchScope::All,
1853 None,
1854 None, // reader (no resync in this test)
1855 None::<AppendLogSnapshot>,
1856 None,
1857 BatchConfig::default(),
1858 parse_put,
1859 move |_batch: Vec<Vec<u8>>| {
1860 ac.fetch_add(1, Ordering::SeqCst);
1861 },
1862 move |_| {
1863 oc.fetch_add(1, Ordering::SeqCst);
1864 },
1865 sd_rx,
1866 ));
1867
1868 // Let the watcher attach and idle (it has nothing to deliver), then shut down.
1869 tokio::time::sleep(Duration::from_millis(1)).await;
1870 sd_tx.send(true).unwrap();
1871
1872 let cursor = task.await.unwrap().unwrap();
1873 assert_eq!(
1874 cursor.as_u64(),
1875 None,
1876 "no updates received → cursor unmoved"
1877 );
1878 assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
1879 assert_eq!(
1880 on_applied_calls.load(Ordering::SeqCst),
1881 0,
1882 "on_applied never fires"
1883 );
1884 }
1885
1886 /// An [`ExportRequest`] flushes the pending batch first, so the artifact's
1887 /// cursor is exactly the applied cursor — and the artifact is importable
1888 /// with the batched entries in it.
1889 #[tokio::test(start_paused = true)]
1890 async fn export_request_flushes_pending_batch_first() {
1891 let dir = tempfile::TempDir::new().unwrap();
1892 let store_path = dir.path().join("fold.snap");
1893 let artifact = dir.path().join("artifact");
1894 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1895
1896 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1897 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1898 let (sd_tx, sd_rx) = watch::channel(false);
1899 let (ex_tx, ex_rx) = mpsc::channel(1);
1900
1901 let task = tokio::spawn(watch_applied(
1902 watcher,
1903 WatchScope::All,
1904 None,
1905 None, // reader (no resync in this test)
1906 Some(store),
1907 Some(ex_rx),
1908 BatchConfig {
1909 window: Duration::from_secs(3600), // window never fires
1910 max: 100,
1911 ..BatchConfig::default()
1912 },
1913 parse_put,
1914 move |_batch: Vec<Vec<u8>>| {},
1915 move |_| {},
1916 sd_rx,
1917 ));
1918
1919 // Let both updates land in the (unflushed) pending batch, then export.
1920 tokio::time::sleep(Duration::from_millis(1)).await;
1921 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1922 ex_tx
1923 .send(ExportRequest {
1924 dest_dir: artifact.clone(),
1925 reply: reply_tx,
1926 })
1927 .await
1928 .unwrap();
1929
1930 let manifest = reply_rx.await.unwrap().expect("export succeeds");
1931 assert_eq!(
1932 manifest.cursor.as_u64(),
1933 Some(2),
1934 "pending batch flushed before export: artifact cursor is the applied cursor"
1935 );
1936
1937 // The artifact is importable and holds both batched entries.
1938 let (cursor, imported) =
1939 AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
1940 .unwrap();
1941 assert_eq!(cursor.as_u64(), Some(2));
1942 assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
1943 assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
1944
1945 sd_tx.send(true).unwrap();
1946 task.await.unwrap().unwrap();
1947 }
1948
1949 /// An [`ExportRequest`] that arrives with NOTHING pending (the window
1950 /// already flushed everything) still produces a valid artifact whose
1951 /// cursor is the applied cursor. The flush-before-export step must be a
1952 /// clean no-op, not an error or a cursor regression.
1953 #[tokio::test(start_paused = true)]
1954 async fn export_with_empty_pending_batch_succeeds() {
1955 let dir = tempfile::TempDir::new().unwrap();
1956 let store_path = dir.path().join("fold.snap");
1957 let artifact = dir.path().join("artifact");
1958 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1959
1960 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1961 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1962 let (sd_tx, sd_rx) = watch::channel(false);
1963 let (ex_tx, ex_rx) = mpsc::channel(1);
1964
1965 let task = tokio::spawn(watch_applied(
1966 watcher,
1967 WatchScope::All,
1968 None,
1969 None, // reader (no resync in this test)
1970 Some(store),
1971 Some(ex_rx),
1972 BatchConfig::default(), // 10 ms window
1973 parse_put,
1974 move |_batch: Vec<Vec<u8>>| {},
1975 move |_| {},
1976 sd_rx,
1977 ));
1978
1979 // Let the window flush both updates, so the export request finds an
1980 // EMPTY pending batch.
1981 tokio::time::sleep(Duration::from_millis(50)).await;
1982
1983 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1984 ex_tx
1985 .send(ExportRequest {
1986 dest_dir: artifact.clone(),
1987 reply: reply_tx,
1988 })
1989 .await
1990 .unwrap();
1991 let manifest = reply_rx
1992 .await
1993 .unwrap()
1994 .expect("export succeeds with nothing pending");
1995 assert_eq!(
1996 manifest.cursor.as_u64(),
1997 Some(2),
1998 "artifact cursor is the applied cursor, unchanged by the no-op flush"
1999 );
2000
2001 // The artifact is importable and holds the already-flushed entries.
2002 let (cursor, imported) =
2003 AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
2004 .unwrap();
2005 assert_eq!(cursor.as_u64(), Some(2));
2006 assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
2007 assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
2008
2009 sd_tx.send(true).unwrap();
2010 task.await.unwrap().unwrap();
2011 }
2012
2013 /// An export request against a store-less watch replies with an error and
2014 /// the watch keeps running.
2015 #[tokio::test(start_paused = true)]
2016 async fn export_without_store_replies_error() {
2017 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
2018 let (sd_tx, sd_rx) = watch::channel(false);
2019 let (ex_tx, ex_rx) = mpsc::channel(1);
2020
2021 let task = tokio::spawn(watch_applied(
2022 watcher,
2023 WatchScope::All,
2024 None,
2025 None, // reader (no resync in this test)
2026 None::<AppendLogSnapshot>,
2027 Some(ex_rx),
2028 BatchConfig::default(),
2029 parse_put,
2030 move |_batch: Vec<Vec<u8>>| {},
2031 move |_| {},
2032 sd_rx,
2033 ));
2034
2035 tokio::time::sleep(Duration::from_millis(1)).await;
2036 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
2037 ex_tx
2038 .send(ExportRequest {
2039 dest_dir: std::env::temp_dir().join("never-created"),
2040 reply: reply_tx,
2041 })
2042 .await
2043 .unwrap();
2044 assert!(
2045 reply_rx.await.unwrap().is_err(),
2046 "no store → export errors via the reply"
2047 );
2048
2049 // The watch is still alive and returns its applied cursor on shutdown.
2050 sd_tx.send(true).unwrap();
2051 let cursor = task.await.unwrap().unwrap();
2052 assert_eq!(cursor.as_u64(), Some(1));
2053 }
2054
2055 /// An export failure (unavailable destination) is reported on the reply and
2056 /// the watch keeps applying later updates.
2057 #[tokio::test(start_paused = true)]
2058 async fn export_error_does_not_kill_watch() {
2059 let dir = tempfile::TempDir::new().unwrap();
2060 let store_path = dir.path().join("fold.snap");
2061 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
2062
2063 // Occupied destination → export fails.
2064 let occupied = dir.path().join("occupied");
2065 std::fs::create_dir(&occupied).unwrap();
2066 std::fs::write(occupied.join("stray"), b"x").unwrap();
2067
2068 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
2069 let (sd_tx, sd_rx) = watch::channel(false);
2070 let (ex_tx, ex_rx) = mpsc::channel(1);
2071
2072 let applied = Arc::new(AtomicU64::new(0));
2073 let a = Arc::clone(&applied);
2074
2075 let task = tokio::spawn(watch_applied(
2076 watcher,
2077 WatchScope::All,
2078 None,
2079 None, // reader (no resync in this test)
2080 Some(store),
2081 Some(ex_rx),
2082 BatchConfig::default(),
2083 parse_put,
2084 move |_batch: Vec<Vec<u8>>| {},
2085 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2086 sd_rx,
2087 ));
2088
2089 tokio::time::sleep(Duration::from_millis(1)).await;
2090 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
2091 ex_tx
2092 .send(ExportRequest {
2093 dest_dir: occupied,
2094 reply: reply_tx,
2095 })
2096 .await
2097 .unwrap();
2098 match reply_rx.await.unwrap() {
2099 Err(crate::snapshot::SnapshotError::ArtifactInvalid(_)) => {}
2100 other => panic!("expected ArtifactInvalid, got {other:?}"),
2101 }
2102
2103 // Watch still folds: a clean shutdown returns the applied cursor.
2104 sd_tx.send(true).unwrap();
2105 let cursor = task.await.unwrap().unwrap();
2106 assert_eq!(cursor.as_u64(), Some(1), "watch survived the failed export");
2107 assert_eq!(applied.load(Ordering::SeqCst), 1);
2108 }
2109
2110 /// Dropping the export sender disarms the arm; the loop keeps batching and
2111 /// flushing normally.
2112 #[tokio::test(start_paused = true)]
2113 async fn export_sender_dropped_disarms_channel() {
2114 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
2115 let (sd_tx, sd_rx) = watch::channel(false);
2116 let (ex_tx, ex_rx) = mpsc::channel::<ExportRequest>(1);
2117
2118 let applied = Arc::new(AtomicU64::new(0));
2119 let a = Arc::clone(&applied);
2120
2121 let task = tokio::spawn(watch_applied(
2122 watcher,
2123 WatchScope::All,
2124 None,
2125 None, // reader (no resync in this test)
2126 None::<AppendLogSnapshot>,
2127 Some(ex_rx),
2128 BatchConfig::default(),
2129 parse_put,
2130 move |_batch: Vec<Vec<u8>>| {},
2131 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2132 sd_rx,
2133 ));
2134
2135 drop(ex_tx); // disarm
2136 tokio::time::sleep(Duration::from_millis(50)).await;
2137 assert_eq!(
2138 applied.load(Ordering::SeqCst),
2139 1,
2140 "loop keeps flushing after the export sender is gone"
2141 );
2142
2143 sd_tx.send(true).unwrap();
2144 task.await.unwrap().unwrap();
2145 }
2146
2147 /// With a low `compact_threshold`, the flush path's `spawn_blocking`
2148 /// compaction actually fires (every other snapshot test pins the threshold
2149 /// at `u64::MAX`, leaving that branch dead). After a compacting run the
2150 /// snapshot must still load cleanly with the right cursor and entries.
2151 #[tokio::test]
2152 async fn snapshot_compaction_fires_and_stays_consistent() {
2153 let dir = tempfile::TempDir::new().unwrap();
2154 let path = dir.path().join("applied.snap");
2155 // threshold 0 → every checkpoint reports "needs compact", forcing the
2156 // store's inline-compaction branch on each flush (run off the hot path via
2157 // spawn_blocking inside watch_applied).
2158 let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();
2159
2160 // Re-put the same key across flushes so compaction has duplicates to
2161 // dedup; small max forces multiple flushes (hence multiple compactions).
2162 let updates = vec![
2163 put("node.a", b"1", 1),
2164 put("node.a", b"2", 2),
2165 put("node.b", b"3", 3),
2166 put("node.a", b"4", 4),
2167 ];
2168 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
2169 let (_sd_tx, sd_rx) = watch::channel(false);
2170
2171 let cursor = watch_applied(
2172 watcher,
2173 WatchScope::All,
2174 None,
2175 None, // reader (no resync in this test)
2176 Some(store),
2177 None,
2178 BatchConfig {
2179 window: Duration::from_secs(3600),
2180 max: 1, // one update per flush → a compaction per update
2181 ..BatchConfig::default()
2182 },
2183 parse_put,
2184 move |_batch: Vec<Vec<u8>>| {},
2185 move |_| {},
2186 sd_rx,
2187 )
2188 .await
2189 .unwrap();
2190
2191 assert_eq!(cursor.as_u64(), Some(4));
2192
2193 let snap = crate::snapshot::load(&path).unwrap().unwrap();
2194 assert_eq!(
2195 snap.cursor.as_u64(),
2196 cursor.as_u64(),
2197 "compacted snapshot's cursor still equals the applied cursor"
2198 );
2199 assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
2200 assert_eq!(
2201 snap.entries["node.a"].value, b"4",
2202 "last write per key survives compaction"
2203 );
2204 assert_eq!(snap.entries["node.b"].value, b"3");
2205 }
2206 /// A SnapshotStore whose FIRST apply fails (transient store error: disk
2207 /// pressure, lock timeout), then behaves normally — the trigger for the
2208 /// lost-raw-batch hazard in the flush path.
2209 struct FailOnceStore {
2210 inner: AppendLogSnapshot,
2211 failed: std::sync::atomic::AtomicBool,
2212 }
2213
2214 impl crate::snapshot::SnapshotStore for FailOnceStore {
2215 fn load(
2216 _path: &std::path::Path,
2217 ) -> Result<(WatchCursor, Self), crate::snapshot::SnapshotError> {
2218 unreachable!("test store is constructed directly")
2219 }
2220 fn apply(
2221 &mut self,
2222 batch: &[KvUpdate],
2223 cursor: &WatchCursor,
2224 ) -> Result<(), crate::snapshot::SnapshotError> {
2225 if !self.failed.swap(true, Ordering::SeqCst) {
2226 return Err(crate::snapshot::SnapshotError::Backend(
2227 "injected transient store failure".into(),
2228 ));
2229 }
2230 self.inner.apply(batch, cursor)
2231 }
2232 fn get(&self, key: &str) -> Result<Option<KvEntry>, crate::snapshot::SnapshotError> {
2233 self.inner.get(key)
2234 }
2235 fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, crate::snapshot::SnapshotError> {
2236 self.inner.range(prefix)
2237 }
2238 fn cursor(&self) -> WatchCursor {
2239 self.inner.cursor()
2240 }
2241 fn export_to(
2242 &mut self,
2243 dest_dir: &std::path::Path,
2244 ) -> Result<crate::artifact::ExportManifest, crate::snapshot::SnapshotError> {
2245 self.inner.export_to(dest_dir)
2246 }
2247 }
2248
2249 /// CURSOR AUTHORITY under a transient store failure: a failed store apply
2250 /// must NOT cause later successful applies to advance the persisted
2251 /// cursor past data that never landed. The failed batch is re-queued and
2252 /// committed cumulatively with the next flush, so the store's cursor
2253 /// never lies about its contents — a restart resuming from it sees
2254 /// exactly the missing tail, not a silent hole.
2255 ///
2256 /// (Pre-fix behavior, found while writing the watch_applied model: the
2257 /// failed batch's raw updates were dropped on the warn-and-continue
2258 /// path, and the NEXT successful flush committed only newer updates
2259 /// under the newest cursor — a permanent, restart-surviving gap in the
2260 /// fold.)
2261 #[tokio::test(start_paused = true)]
2262 async fn transient_store_failure_never_leaves_a_cursor_gap() {
2263 let dir = tempfile::TempDir::new().unwrap();
2264 let path = dir.path().join("fold.snap");
2265 let (_r, inner) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2266 let store = FailOnceStore {
2267 inner,
2268 failed: std::sync::atomic::AtomicBool::new(false),
2269 };
2270
2271 // max: 1 -> one flush per update: flush #1 (a@1) hits the injected
2272 // failure, flush #2 (b@2) succeeds.
2273 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
2274 let watcher = Arc::new(MockWatcher::new(updates, true));
2275 let (sd_tx, sd_rx) = watch::channel(false);
2276
2277 let task = tokio::spawn(watch_applied(
2278 watcher,
2279 WatchScope::All,
2280 None,
2281 None,
2282 Some(store),
2283 None,
2284 BatchConfig {
2285 window: Duration::from_millis(1),
2286 max: 1,
2287 ..BatchConfig::default()
2288 },
2289 parse_put,
2290 |_batch: Vec<Vec<u8>>| {},
2291 |_| {},
2292 sd_rx,
2293 ));
2294
2295 tokio::time::sleep(Duration::from_millis(50)).await;
2296 sd_tx.send(true).unwrap();
2297 let cursor = task.await.unwrap().unwrap();
2298 assert_eq!(cursor.as_u64(), Some(2));
2299
2300 // The store on disk must be SELF-CONSISTENT: whatever its cursor
2301 // claims, the data at or below it is present. With the re-queue fix
2302 // the cumulative commit lands both keys at cursor 2.
2303 let (persisted, reopened) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2304 assert_eq!(persisted.as_u64(), Some(2), "cursor reached the head");
2305 assert_eq!(
2306 reopened.get("node.a").unwrap().map(|e| e.value),
2307 Some(b"1".to_vec()),
2308 "the transiently-failed batch was re-queued, not silently dropped \
2309 behind an advancing cursor"
2310 );
2311 assert_eq!(
2312 reopened.get("node.b").unwrap().map(|e| e.value),
2313 Some(b"2".to_vec())
2314 );
2315 }
2316 /// A reader whose live-key listing always fails — the resync's I/O
2317 /// failure mode.
2318 struct FailingReader;
2319
2320 #[async_trait]
2321 impl KvReader for FailingReader {
2322 async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
2323 unreachable!("resync only lists keys")
2324 }
2325 async fn entry(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
2326 unreachable!("resync only lists keys")
2327 }
2328 async fn keys(&self, _prefix: &str) -> Result<Vec<String>, KvError> {
2329 Err(KvError::OperationFailed("injected listing failure".into()))
2330 }
2331 async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
2332 unreachable!("resync only lists keys")
2333 }
2334 }
2335
2336 /// REGRESSION PIN (code-level twin of tests/model.rs's Degrade
2337 /// configuration): a resync whose live-key listing fails must FAIL THE
2338 /// WATCH, not degrade to re-list-only with a warning — the degrade
2339 /// semantics provably break the convergence theorem (silent stale keys).
2340 /// Reverting `resync_stale_keys` to warn-and-continue fails this test.
2341 #[tokio::test]
2342 async fn resync_listing_failure_is_fatal_not_degraded() {
2343 let dir = tempfile::TempDir::new().unwrap();
2344 let path = dir.path().join("resync-fatal.snap");
2345 let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
2346 store
2347 .apply(&[put("node.a", b"1", 1)], &WatchCursor::from_u64(1))
2348 .unwrap();
2349
2350 // Resume cursor expired -> resync path -> reader listing fails.
2351 let mock = MockWatcher {
2352 full: Mutex::new(Some(vec![])),
2353 from: Mutex::new(Some(vec![])),
2354 from_expires: true,
2355 hold: false,
2356 };
2357 let (_sd_tx, sd_rx) = watch::channel(false);
2358 let err = watch_applied(
2359 Arc::new(mock),
2360 WatchScope::All,
2361 Some(WatchCursor::from_u64(1)),
2362 Some(Arc::new(FailingReader) as Arc<dyn KvReader>),
2363 Some(store),
2364 None,
2365 BatchConfig::default(),
2366 parse_put,
2367 |_batch: Vec<Vec<u8>>| {},
2368 |_| {},
2369 sd_rx,
2370 )
2371 .await
2372 .expect_err("a failed resync listing must fail the watch");
2373 assert!(
2374 err.to_string().contains("resync failed listing live keys"),
2375 "{err}"
2376 );
2377 }
2378}