slipstream/applied.rs
1//! Cursor-after-apply watch combinator.
2//!
3//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
4//! a short window (or a max count), hands each batch to a caller-supplied
5//! `apply` closure, and **only then** advances the resume cursor, checkpoints
6//! the snapshot, and fires `on_applied`. It encodes one discipline that every
7//! hand-rolled watch loop in the wider system gets subtly wrong:
8//!
9//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
10//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
11//! > it. The cursor never advances on *receipt* of an update, only after it has
12//! > durably taken effect.
13//!
14//! ## Why receipt is the wrong signal
15//!
16//! The tempting shortcut is to bump the cursor as each update arrives off the
17//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
18//! crash between those two steps the persisted cursor claims "caught up to rev
19//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
20//! starts *past* rev N and silently skips it — a correctness hole in the exact
21//! "resume after any restart" guarantee this crate advertises.
22//!
23//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
24//! the fix: a function placed below the endpoints (here, the channel receive)
25//! can only be a performance hint; the *endpoint* — the application of the
26//! update — is the only place the "it happened" guarantee can actually be
27//! established. So the cursor is written from `apply()`'s completion, not from
28//! the transport's delivery.
29//!
30//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
31//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
32//! it has *reconciled* and re-arms the watch from there, never from the index it
33//! merely *saw*.
34//!
35//! ## What the caller supplies
36//!
37//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
38//! `None` (corrupt bytes, irrelevant key) is fine — the update is still
39//! *received*, so it still counts toward the cursor; there is simply nothing to
40//! apply for it.
41//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
42//! logic; for the tunnel router it swaps the route table, for the edge origin
43//! watcher it rebuilds the hashrings.
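//! - `on_applied`: fires once per flush that advanced the cursor, *after* `apply`
//!   returns (and after the batch has been handed to the snapshot store, when one
//!   is attached), with the new applied cursor. Callers use it to persist the
//!   cursor for the next restart.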
46//!
47//! ## Panics
48//!
49//! `apply` runs inline on the watch task. If it panics, the panic propagates out
50//! of [`watch_applied`] and aborts the watch — that is the caller's contract,
51//! the same as a panic in any other supplied closure.
52
53use std::path::PathBuf;
54use std::sync::Arc;
55use std::time::Duration;
56
57use tokio::sync::mpsc;
58use tokio::sync::{oneshot, watch};
59use tracing::warn;
60
61use crate::artifact::ExportManifest;
62use crate::kv::{KvError, KvReader, KvUpdate, KvWatcher, WatchCursor};
63use crate::snapshot::{SnapshotError, SnapshotStore};
64
/// A request, sent into a running [`watch_applied`] loop, to export the fold it
/// owns (see [`SnapshotStore::export_to`]).
///
/// `watch_applied` takes its snapshot store **by value**, so a consumer that
/// wants periodic artifacts of a live fold cannot call `export_to` itself. It
/// instead passes an `mpsc::Receiver<ExportRequest>` to [`watch_applied`] and
/// sends requests through the paired sender. The loop handles a request
/// between batch flushes — after flushing any pending batch — so the artifact's
/// embedded cursor is exactly the applied cursor at the moment of export.
///
/// The export result (or error) comes back on `reply`; an export failure is
/// reported there and the watch keeps running (the snapshot is a cache — a
/// failed artifact is the requester's problem, not the fold's).
///
/// Two outcomes are worth knowing about as a requester:
/// - when the loop runs without a snapshot store, `reply` receives a
///   [`SnapshotError::Backend`] saying there is nothing to export;
/// - when the blocking export task panics, the loop returns an error and
///   `reply` is dropped without a value having been sent.
pub struct ExportRequest {
    /// Where the artifact directory will be created. Must not exist (or be an
    /// empty directory); same filesystem as the fold for cheap hardlinks.
    pub dest_dir: PathBuf,
    /// Receives the sealed manifest on success. A dropped receiver is ignored.
    pub reply: oneshot::Sender<Result<ExportManifest, SnapshotError>>,
}
85
/// The set of keys a watch covers: the whole bucket, one prefix, or the union
/// of several prefixes.
///
/// Each variant corresponds to a pair of [`KvWatcher`] entry points — `All` to
/// `watch_all` / `watch_all_from`, `Prefix` to `watch_prefix` /
/// `watch_prefix_from`, and `Prefixes` to `watch_prefixes` /
/// `watch_prefixes_from` (a single multi-filter consumer for the whole union,
/// never one consumer per prefix).
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Every key in the bucket.
    All,
    /// Only keys that start with this prefix.
    Prefix(String),
    /// Keys that start with ANY of these prefixes, served by one consumer.
    Prefixes(Vec<String>),
}

impl WatchScope {
    /// Flatten the scope into owned key prefixes, for callers that enumerate
    /// what the scope contains (live listings, fold ranges).
    ///
    /// `All` is represented by the single empty prefix; `Prefixes` yields its
    /// list unchanged (so an empty list yields an empty result).
    fn prefixes(&self) -> Vec<String> {
        let borrowed: &[String] = match self {
            // The empty prefix matches every key.
            WatchScope::All => return vec![String::new()],
            WatchScope::Prefix(single) => std::slice::from_ref(single),
            WatchScope::Prefixes(many) => many.as_slice(),
        };
        borrowed.to_vec()
    }
}
114
/// Internal: a cursor-expired resync handoff from the watch task to the main
/// loop. Carries the bucket's live key listing for the watch scope; the main
/// loop diffs it against the fold and applies synthetic deletes for keys that
/// vanished during the gap, then acks so the watch task can start the fallback
/// watch — the ack ordering guarantees every synthetic delete is applied before
/// the first re-list put arrives.
struct ResyncRequest {
    /// Keys currently present in the bucket, gathered by listing each of the
    /// scope's prefixes through the reader. May contain duplicates when
    /// prefixes overlap; the main loop only uses it for membership checks.
    live_keys: Vec<String>,
    /// Fired by the main loop once the synthetic deletes have been flushed.
    /// The watch task awaits the paired receiver before starting the fallback
    /// watch.
    ack: oneshot::Sender<()>,
}
125
/// Internal: what the watch task needs to initiate a resync — the reader that
/// lists live keys and the channel into the main loop that owns the fold.
///
/// `watch_applied` only builds one of these when it was given both a reader
/// and a snapshot store; otherwise the watch task receives `None` and falls
/// back re-list-only.
type ResyncHandle = (Arc<dyn KvReader>, mpsc::Sender<ResyncRequest>);
129
/// Batching policy for [`watch_applied`].
///
/// A batch is flushed as soon as **either** limit is reached: `window` has
/// elapsed since the batch opened, or `max` updates have piled up. The window
/// amortizes the cost of `apply` (e.g. one route-table clone per flush rather
/// than one per update); `max` bounds memory and latency when updates arrive
/// faster than the window drains them.
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Longest time a batch may stay open before it is flushed.
    pub window: Duration,
    /// Number of buffered updates that forces a flush. The loop compares this
    /// against the parsed batch and, when a snapshot store is attached, against
    /// the raw received batch as well — so a run of parse-rejected updates
    /// still flushes at this bound.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the values the hand-rolled callers had converged
    /// on, gathered into one place.
    fn default() -> Self {
        let window = Duration::from_millis(10);
        let max = 100;
        Self { window, max }
    }
}
155
156/// Drive a watch with cursor-after-apply semantics.
157///
158/// Subscribes per `scope` (resuming from `resume` when it carries a position),
159/// batches updates per `config`, applies each batch via `apply`, and only then
160/// advances the cursor / folds the batch into `store` / calls `on_applied`.
161/// Returns the final applied cursor when the watch ends (shutdown signalled, or
162/// the underlying stream closed).
163///
164/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
165/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
166/// its own impl) — or `None` to run without persistence. On each flush, *after*
167/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
168/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
169/// persisted cursor is always the post-apply cursor and never names a revision
170/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
171/// crash leaves the store consistent and resume re-folds only the tail.
172///
173/// # Cursor expiry and stale-key resync
174///
175/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
176/// falls back to a full-scope watch (`watch_all` / `watch_prefix` /
177/// `watch_prefixes`), whose state-sync re-list re-delivers the current value of
178/// every in-scope key as puts. The re-list alone cannot cover keys that were
179/// **deleted during the gap** and whose delete markers the backend has since
180/// evicted — they simply don't appear, leaving the fold (and the caller's
181/// domain state) holding them forever.
182///
183/// When both `reader` and `store` are provided, the expiry path closes that
184/// hole: before the fallback watch starts, the bucket's live keys are listed
185/// via `reader`, diffed against the fold's in-scope keys, and a synthetic
186/// [`KvUpdate::Delete`] (with an unknown [`VersionToken`](crate::VersionToken)) is run through
187/// `parse`/`apply`/`store` for each key that vanished. The synthetic deletes
188/// are strictly ordered before the first re-list put, so a key deleted and
189/// re-created during the gap converges correctly. Without a `reader` (or
190/// without a `store` to diff against) the fallback is re-list-only and a
191/// warning marks the possible stale keys.
192///
193/// A resync that was armed but FAILS (live-key listing or fold diff error) is
194/// fatal to the watch — degrading to re-list-only would silently leave
195/// deleted keys in the fold (`tests/model.rs` proves that divergence
196/// reachable), so the error surfaces and the caller's restart retries the
197/// resume → expiry → resync path from scratch.
198///
199/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
200/// rationale.
201///
202/// # Type parameters
203/// - `U`: the caller's domain update type, produced by `parse` and consumed by
204/// `apply`.
205// This combinator takes each of its dependencies as a parameter so every
206// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
207// type and is monomorphized at the call site. Folding them into a builder struct
208// would either box the closures or force a single generic bundle, losing that.
209#[allow(clippy::too_many_arguments)]
210// The flush macro resets `batch_high`/`batch_deadline` for the next loop
211// iteration. At the two flush sites that return immediately afterward (shutdown,
212// channel-close) those resets are dead stores — correct, but flagged. The allow
213// must sit on the function: a statement-scoped `#[allow]` inside the macro body
214// trips the experimental attributes-on-expressions gate (E0658) on stable.
215#[allow(unused_assignments)]
216pub async fn watch_applied<U, S, P, A, O>(
217 watcher: Arc<dyn KvWatcher>,
218 scope: WatchScope,
219 resume: Option<WatchCursor>,
220 // `Some(reader)` arms the cursor-expired stale-key resync (see the function
221 // docs); `None` keeps the re-list-only fallback. Only consulted on expiry —
222 // the hot path never touches it.
223 reader: Option<Arc<dyn KvReader>>,
224 mut store: Option<S>,
225 // `Some(rx)` arms an export-request arm in the select loop: each
226 // [`ExportRequest`] is handled between flushes (pending batch flushed
227 // first), so the exported artifact's cursor is exactly the applied cursor.
228 // `None` (or dropping the paired sender) leaves the loop's behavior
229 // unchanged.
230 mut exports: Option<mpsc::Receiver<ExportRequest>>,
231 config: BatchConfig,
232 mut parse: P,
233 mut apply: A,
234 mut on_applied: O,
235 mut shutdown: watch::Receiver<bool>,
236) -> Result<WatchCursor, KvError>
237where
238 U: Send,
239 // `Send + 'static`: each flush moves `store` onto a blocking task to run its
240 // (potentially blocking) `apply`, then takes it back — the same offload the
241 // append log's compaction always used.
242 S: SnapshotStore + Send + 'static,
243 P: FnMut(&KvUpdate) -> Option<U> + Send,
244 A: FnMut(Vec<U>) + Send,
245 O: FnMut(WatchCursor) + Send,
246{
247 // The cursor we'll return. Initialized from the resume position so that a
248 // watch which receives nothing new still reports the position it resumed
249 // from as "applied" (it is — everything up to it was applied before the last
250 // run persisted it).
251 let mut applied = match &resume {
252 Some(c) => c.clone(),
253 None => WatchCursor::none(),
254 };
255
256 // The scope's prefixes, for the resync diff against the fold. Cloned out
257 // before `scope` moves into the watch task.
258 let scope_prefixes = scope.prefixes();
259
260 // Cursor-expired resync channel, armed only when there is a reader to list
261 // live keys AND a store to diff them against. The watch task sends the live
262 // listing here and waits for the ack before starting the fallback watch, so
263 // synthetic deletes always precede the re-list.
264 let (resync_pair, mut resyncs): (Option<ResyncHandle>, Option<mpsc::Receiver<ResyncRequest>>) =
265 match reader {
266 Some(reader) if store.is_some() => {
267 let (rs_tx, rs_rx) = mpsc::channel(1);
268 (Some((reader, rs_tx)), Some(rs_rx))
269 }
270 _ => (None, None),
271 };
272
273 // Spawn the watch task. It owns the cursor-expired fallback so the main loop
274 // only ever sees a clean ordered stream of updates on `rx`.
275 let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
276 let handle = {
277 let watcher = Arc::clone(&watcher);
278 tokio::spawn(
279 async move { run_watch(watcher.as_ref(), &scope, resume, resync_pair, tx).await },
280 )
281 };
282
283 // Batch state.
284 //
285 // `batch_high` tracks the version of the most recently *received* update
286 // since the last flush — including updates `parse` rejected. NATS delivers
287 // in revision order, so the last received is the highest, and advancing the
288 // cursor to it after a single atomic `apply` is correct: having seen the max
289 // means we've seen everything below it, and a rejected entry is still
290 // "nothing to apply", hence covered. Reset to `none()` after every flush.
291 // Pre-size to the flush bound so no batch ever re-climbs the reallocation
292 // ladder; `max(1)` only guards a nonsensical `max = 0` config.
293 let batch_cap = config.max.max(1);
294 let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
295 // Raw received updates for the durable `store`, in revision order. Only
296 // populated when a `store` is present; the store folds the *raw* updates
297 // (including ones `parse` rejected — they are still part of the bucket's
298 // state), whereas the parsed `batch` above is the consumer's domain view.
299 let mut raw_batch: Vec<KvUpdate> = Vec::new();
300 let mut batch_high = WatchCursor::none();
301 // `Some` once a batch has opened and the window timer is armed; `None`
302 // between flushes. Only the armed/idle distinction is read in the loop —
303 // the absolute instant lives in the pinned `sleep` future below.
304 let mut batch_deadline: Option<tokio::time::Instant> = None;
305
306 // Flush the current batch, in order: run the domain `apply` (if non-empty) to
307 // completion, advance the cursor, fold the raw batch + cursor durably into
308 // `store`, then fire `on_applied`. The store fold runs on a blocking task
309 // (its `apply` may block on I/O), moving the store in and taking it back — the
310 // same offload the append log's compaction always used. A store error is
311 // logged and the watch continues (the snapshot is a cache); a panicked
312 // blocking task drops the store irrecoverably, which breaks the
313 // resume-after-restart guarantee, so it is surfaced as fatal.
314 macro_rules! flush {
315 () => {{
316 // Nothing received since the last flush → nothing to do at all.
317 // (`raw_batch` can be non-empty with no cursor advance only via the
318 // resync path's synthetic deletes, which carry no revision.)
319 if !batch.is_empty() || !raw_batch.is_empty() || !batch_high.is_none() {
320 if !batch.is_empty() {
321 // INVARIANT: apply() runs and RETURNS before any cursor
322 // advance below. Move the batch out so a panicking apply
323 // can't leave half-consumed state behind.
324 //
325 // `replace` (not `take`) leaves a pre-sized Vec behind so each
326 // batch after the first doesn't re-climb the reallocation
327 // ladder (4→8→…→cap).
328 apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
329 }
330 let advanced = !batch_high.is_none();
331 if advanced {
332 applied = batch_high.clone();
333 }
334 if !raw_batch.is_empty()
335 && let Some(mut st) = store.take()
336 {
337 let raw = std::mem::take(&mut raw_batch);
338 // Fold at the post-advance cursor. A synthetic-deletes-only
339 // batch leaves the cursor where it was (the deletes are a
340 // state correction, not log entries), which is safe: an
341 // unchanged — possibly expired — cursor only ever re-runs
342 // this same resync on the next restart.
343 let cur = applied.clone();
344 // Hand the store back unconditionally on a clean return so
345 // a *failed* apply (Ok(Err)) keeps the watch running; only
346 // a *panicked* task (Err) loses the store and is fatal.
347 match tokio::task::spawn_blocking(move || {
348 let res = st.apply(&raw, &cur);
349 (st, res)
350 })
351 .await
352 {
353 Ok((st, Ok(()))) => store = Some(st),
354 Ok((st, Err(e))) => {
355 warn!(error = %e, "snapshot store apply failed; continuing");
356 store = Some(st);
357 }
358 Err(e) => {
359 warn!(error = %e, "snapshot store task panicked; aborting watch");
360 handle.abort();
361 return Err(KvError::WatchError(format!(
362 "snapshot store task panicked: {e}"
363 )));
364 }
365 }
366 }
367 if advanced {
368 on_applied(applied.clone());
369 batch_high = WatchCursor::none();
370 }
371 }
372 batch_deadline = None;
373 }};
374 }
375
376 // A single timer future, reset in place each time a batch opens. The old
377 // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
378 // re-created on every loop iteration — one Arc-backed timer-wheel entry
379 // allocated, registered, and immediately dropped per received update.
380 // Pinning one future and `reset`-ing it reuses that single allocation; the
381 // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
382 // its initial already-elapsed deadline is never observed.
383 let sleep = tokio::time::sleep(Duration::ZERO);
384 tokio::pin!(sleep);
385
386 loop {
387 tokio::select! {
388 biased;
389
390 // Shutdown wins: flush whatever is batched (so the cursor reflects
391 // it), abandon any updates still in flight on the channel — they
392 // weren't applied, the cursor doesn't claim them, and they'll be
393 // re-delivered on the next resume — and return the applied cursor.
394 res = shutdown.changed() => {
395 if res.is_err() || *shutdown.borrow() {
396 flush!();
397 handle.abort();
398 // Observe the task's terminal state. An abort surfaces as a
399 // cancelled JoinError, which we ignore; a genuine panic that
400 // raced ahead of the abort is logged rather than silently lost.
401 if let Err(join) = handle.await
402 && !join.is_cancelled()
403 {
404 warn!(error = %join, "watch task panicked at shutdown");
405 }
406 return Ok(applied);
407 }
408 }
409
410 // Batch window elapsed.
411 () = &mut sleep, if batch_deadline.is_some() => {
412 flush!();
413 }
414
415 // Cursor-expired resync. Placed before `rx.recv()` (biased) so the
416 // synthetic deletes are folded before any update the fallback watch
417 // delivers — though the ack protocol already guarantees the fallback
418 // hasn't started while this arm runs. Diff the fold's in-scope keys
419 // against the bucket's live listing; anything the fold holds that
420 // the bucket no longer does vanished during the gap (its delete
421 // marker evicted with the cursor), so synthesize the delete the
422 // re-list can't deliver.
423 req = async { resyncs.as_mut().expect("arm guarded by is_some").recv().await },
424 if resyncs.is_some() => {
425 match req {
426 Some(ResyncRequest { live_keys, ack }) => {
427 // Flush first so the diff runs against a fold that
428 // reflects everything received so far.
429 flush!();
430 let live: std::collections::HashSet<&str> =
431 live_keys.iter().map(String::as_str).collect();
432 let mut stale: Vec<String> = Vec::new();
433 if let Some(st) = &store {
434 for prefix in &scope_prefixes {
435 match st.range(prefix) {
436 Ok(entries) => stale.extend(
437 entries
438 .into_iter()
439 .filter(|e| !live.contains(e.key.as_str()))
440 .map(|e| e.key),
441 ),
442 Err(e) => {
443 // FATAL, not a degrade: an incomplete
444 // diff silently leaves deleted keys in
445 // the fold forever (tests/model.rs
446 // proves the divergence reachable
447 // under degrade semantics). Fail the
448 // watch; the restart re-runs the
449 // resume → expiry → resync from
450 // scratch.
451 warn!(error = %e, prefix = %prefix,
452 "resync fold range failed; aborting watch rather than diverging");
453 handle.abort();
454 return Err(KvError::WatchError(format!(
455 "cursor-expired resync failed listing fold prefix {prefix:?}: {e}"
456 )));
457 }
458 }
459 }
460 }
461 // Overlapping prefixes can list a key twice.
462 stale.sort_unstable();
463 stale.dedup();
464 if !stale.is_empty() {
465 warn!(stale = stale.len(), "cursor-expired resync: deleting keys that vanished during the gap");
466 }
467 for key in stale {
468 // Synthetic: carries no revision (unknown version)
469 // and so never advances the cursor.
470 let u = KvUpdate::Delete {
471 key,
472 version: crate::kv::VersionToken::unknown(),
473 };
474 if store.is_some() {
475 raw_batch.push(u.clone());
476 }
477 if let Some(parsed) = parse(&u) {
478 batch.push(parsed);
479 }
480 }
481 flush!();
482 // Ack AFTER the deletes are applied: the watch task is
483 // holding the fallback watch until it hears back, which
484 // is what orders deletes before the re-list.
485 let _ = ack.send(());
486 }
487 None => resyncs = None,
488 }
489 }
490
491 // Export request. Placed after shutdown/window (they stay prompt)
492 // and before `rx.recv()` so a firehose of updates cannot starve an
493 // export indefinitely. The pending batch is flushed first, so the
494 // exported cursor is exactly the applied cursor; the export itself
495 // runs on a blocking task with the store moved in and taken back —
496 // the same offload the flush path uses.
497 req = async { exports.as_mut().expect("arm guarded by is_some").recv().await },
498 if exports.is_some() => {
499 match req {
500 Some(ExportRequest { dest_dir, reply }) => {
501 flush!();
502 match store.take() {
503 Some(mut st) => {
504 match tokio::task::spawn_blocking(move || {
505 let res = st.export_to(&dest_dir);
506 (st, res)
507 })
508 .await
509 {
510 // Hand the store back on any clean return; an
511 // export failure goes to the requester only —
512 // the watch keeps running (the snapshot is a
513 // cache). A panicked task lost the store,
514 // which breaks the resume guarantee: fatal,
515 // same as the flush path's apply panic.
516 Ok((st, res)) => {
517 store = Some(st);
518 let _ = reply.send(res);
519 }
520 Err(e) => {
521 warn!(error = %e, "snapshot export task panicked; aborting watch");
522 handle.abort();
523 return Err(KvError::WatchError(format!(
524 "snapshot export task panicked: {e}"
525 )));
526 }
527 }
528 }
529 None => {
530 let _ = reply.send(Err(SnapshotError::Backend(
531 "watch_applied runs without a snapshot store; nothing to export"
532 .into(),
533 )));
534 }
535 }
536 }
537 // Sender dropped: disarm the arm for the rest of the run.
538 None => exports = None,
539 }
540 }
541
542 update = rx.recv() => {
543 match update {
544 Some(u) => {
545 // Cursor authority: every received update bumps the
546 // pending high-water, regardless of whether `parse`
547 // keeps it.
548 batch_high = WatchCursor::from_version(u.version().clone());
549
550 // Buffer the raw update for the durable store fold (which
551 // commits the whole batch + cursor atomically on flush).
552 // Done before `parse` consumes `u` by reference, and only
553 // when a store is present so the no-persistence path keeps
554 // its zero-copy cost.
555 if store.is_some() {
556 raw_batch.push(u.clone());
557 }
558
559 if let Some(parsed) = parse(&u) {
560 batch.push(parsed);
561 }
562
563 // Arm the window on the first received update of a batch
564 // — even a parse-rejected one, so the cursor advances
565 // within `window` even through a run of irrelevant keys.
566 // Reset the pinned timer to the new deadline rather than
567 // allocating a fresh `Sleep`.
568 if batch_deadline.is_none() {
569 let deadline = tokio::time::Instant::now() + config.window;
570 sleep.as_mut().reset(deadline);
571 batch_deadline = Some(deadline);
572 }
573
574 // Flush on a full parsed batch, or — when persisting — a
575 // full raw batch, so a window packed with parse-rejected
576 // updates can't grow `raw_batch` without bound before the
577 // window elapses.
578 if batch.len() >= config.max || raw_batch.len() >= config.max {
579 flush!();
580 }
581 }
582 None => {
583 // Stream closed. Flush the remainder, then surface the
584 // watch task's terminal result: a clean end returns the
585 // applied cursor, an error propagates.
586 flush!();
587 return match handle.await {
588 Ok(Ok(())) => Ok(applied),
589 Ok(Err(e)) => Err(e),
590 Err(join) => Err(KvError::WatchError(format!(
591 "watch task panicked: {join}"
592 ))),
593 };
594 }
595 }
596 }
597 }
598 }
599}
600
601/// Run the underlying watch for `scope`, resuming from `resume` when it carries
602/// a position, with the [`KvError::CursorExpired`] → resync + full-watch
603/// fallback.
604async fn run_watch(
605 watcher: &dyn KvWatcher,
606 scope: &WatchScope,
607 resume: Option<WatchCursor>,
608 resync: Option<ResyncHandle>,
609 tx: mpsc::Sender<KvUpdate>,
610) -> Result<(), KvError> {
611 // Resume only when the cursor carries a real position; an absent or `none()`
612 // cursor falls through to a full watch. Binding `cursor` here makes "we have a
613 // resume position" structural — there is no separate bool whose truth a later
614 // edit could let drift from the `Some`.
615 let resume_cursor = resume.filter(|c| !c.is_none());
616
617 match scope {
618 WatchScope::All => {
619 if let Some(cursor) = resume_cursor {
620 match watcher.watch_all_from(&cursor, tx.clone()).await {
621 Err(KvError::CursorExpired) => {
622 warn!(
623 "watch cursor expired; resyncing, then falling back to full watch_all"
624 );
625 resync_stale_keys(scope, &resync).await?;
626 watcher.watch_all(tx).await
627 }
628 other => other,
629 }
630 } else {
631 watcher.watch_all(tx).await
632 }
633 }
634 WatchScope::Prefix(prefix) => {
635 if let Some(cursor) = resume_cursor {
636 match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
637 Err(KvError::CursorExpired) => {
638 warn!(
639 "watch cursor expired; resyncing, then falling back to full watch_prefix"
640 );
641 resync_stale_keys(scope, &resync).await?;
642 watcher.watch_prefix(prefix, tx).await
643 }
644 other => other,
645 }
646 } else {
647 watcher.watch_prefix(prefix, tx).await
648 }
649 }
650 WatchScope::Prefixes(prefixes) => {
651 let refs: Vec<&str> = prefixes.iter().map(String::as_str).collect();
652 if let Some(cursor) = resume_cursor {
653 match watcher
654 .watch_prefixes_from(&refs, &cursor, tx.clone())
655 .await
656 {
657 Err(KvError::CursorExpired) => {
658 warn!(
659 "watch cursor expired; resyncing, then falling back to full watch_prefixes"
660 );
661 resync_stale_keys(scope, &resync).await?;
662 watcher.watch_prefixes(&refs, tx).await
663 }
664 other => other,
665 }
666 } else {
667 watcher.watch_prefixes(&refs, tx).await
668 }
669 }
670 }
671}
672
673/// Cursor-expired stale-key resync, run BEFORE the fallback watch is
674/// established: list the scope's live keys, hand them to the main loop (which
675/// diffs them against the fold and applies synthetic deletes), and wait for the
676/// ack. That ordering — deletes applied, then fallback watch armed — is what
677/// makes a delete-then-recreate during the gap converge: the synthetic delete
678/// always lands before the re-list put.
679///
680/// With no reader/store wired (`resync` is `None`) the caller explicitly opted
681/// out: warn and fall back re-list-only (keys deleted during the gap stay in
682/// the fold — `tests/model.rs` pins this divergence as reachable).
683///
684/// A FAILED listing, by contrast, is **fatal** — it fails the watch rather
685/// than degrading. The resync is load-bearing for the "stale, never corrupt"
686/// convergence guarantee: a silently degraded resync leaves the fold holding
687/// keys the bucket deleted, with one warn line as the only witness
688/// (`tests/model.rs` proves this divergence reachable under degrade
689/// semantics). Failing the watch turns the violated guarantee into a visible
690/// error; the caller's restart re-resumes, hits `CursorExpired` again, and
691/// retries the resync from scratch.
692async fn resync_stale_keys(
693 scope: &WatchScope,
694 resync: &Option<ResyncHandle>,
695) -> Result<(), KvError> {
696 let Some((reader, resync_tx)) = resync else {
697 warn!(
698 "no reader wired for cursor-expired resync; keys deleted during the gap may persist in the fold"
699 );
700 return Ok(());
701 };
702 let mut live_keys = Vec::new();
703 for prefix in scope.prefixes() {
704 match reader.keys(&prefix).await {
705 Ok(keys) => live_keys.extend(keys),
706 Err(e) => {
707 return Err(KvError::WatchError(format!(
708 "cursor-expired resync failed listing live keys under {prefix:?}: {e}; \
709 failing the watch rather than silently keeping stale keys"
710 )));
711 }
712 }
713 }
714 let (ack_tx, ack_rx) = oneshot::channel();
715 if resync_tx
716 .send(ResyncRequest {
717 live_keys,
718 ack: ack_tx,
719 })
720 .await
721 .is_ok()
722 {
723 // A dropped ack (main loop shutting down) just means the fallback watch
724 // is about to die with it; nothing to recover.
725 let _ = ack_rx.await;
726 }
727 Ok(())
728}
729
730#[cfg(test)]
731mod tests {
732 use super::*;
733 use crate::kv::{KvEntry, VersionToken};
734 use crate::snapshot::AppendLogSnapshot;
735 use async_trait::async_trait;
736 use std::sync::Mutex;
737 use std::sync::atomic::{AtomicU64, Ordering};
738 use tokio::sync::mpsc::Sender;
739
740 fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
741 KvUpdate::Put(KvEntry {
742 key: key.to_string(),
743 value: value.to_vec(),
744 version: VersionToken::from_u64(rev),
745 })
746 }
747
    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        /// Script served by the full watches (`watch_all`, `watch_prefix`,
        /// `watch_prefixes`). Taken on first delivery; later calls deliver
        /// nothing.
        full: Mutex<Option<Vec<KvUpdate>>>,
        /// Script served by the resume (`*_from`) watches. Taken on first
        /// delivery, like `full`.
        from: Mutex<Option<Vec<KvUpdate>>>,
        /// When set, every `*_from` watch returns `KvError::CursorExpired`
        /// instead of delivering `from`.
        from_expires: bool,
        /// When set, a delivery that sent its whole script never returns, so
        /// the sender stays alive until the task is aborted.
        hold: bool,
    }
758
759 impl MockWatcher {
760 fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
761 Self {
762 full: Mutex::new(Some(updates)),
763 from: Mutex::new(None),
764 from_expires: false,
765 hold,
766 }
767 }
768
769 async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
770 let updates = which.lock().unwrap().take().unwrap_or_default();
771 for u in updates {
772 if tx.send(u).await.is_err() {
773 return;
774 }
775 }
776 if self.hold {
777 // Keep `tx` alive (channel open) until this task is aborted.
778 std::future::pending::<()>().await;
779 }
780 }
781 }
782
783 #[async_trait]
784 impl KvWatcher for MockWatcher {
785 async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
786 self.deliver(&self.full, tx).await;
787 Ok(())
788 }
789
790 async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
791 self.deliver(&self.full, tx).await;
792 Ok(())
793 }
794
795 async fn watch_prefixes(
796 &self,
797 _prefixes: &[&str],
798 tx: Sender<KvUpdate>,
799 ) -> Result<(), KvError> {
800 // This mock scripts the applied-watch resumption tests, not prefix
801 // filtering; it delivers the same `full` script as `watch_prefix`.
802 // The real multi-filter scoping is proved in the NATS integration test.
803 self.deliver(&self.full, tx).await;
804 Ok(())
805 }
806
807 async fn watch_all_from(
808 &self,
809 _cursor: &WatchCursor,
810 tx: Sender<KvUpdate>,
811 ) -> Result<(), KvError> {
812 if self.from_expires {
813 return Err(KvError::CursorExpired);
814 }
815 self.deliver(&self.from, tx).await;
816 Ok(())
817 }
818
819 // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
820 // are exercised against the same `from` script. Without this the trait's
821 // default impl would delegate to watch_prefix and silently deliver the
822 // full set instead of the delta.
823 async fn watch_prefix_from(
824 &self,
825 _prefix: &str,
826 _cursor: &WatchCursor,
827 tx: Sender<KvUpdate>,
828 ) -> Result<(), KvError> {
829 if self.from_expires {
830 return Err(KvError::CursorExpired);
831 }
832 self.deliver(&self.from, tx).await;
833 Ok(())
834 }
835
836 // Same mirroring for the multi-prefix resume arm.
837 async fn watch_prefixes_from(
838 &self,
839 _prefixes: &[&str],
840 _cursor: &WatchCursor,
841 tx: Sender<KvUpdate>,
842 ) -> Result<(), KvError> {
843 if self.from_expires {
844 return Err(KvError::CursorExpired);
845 }
846 self.deliver(&self.from, tx).await;
847 Ok(())
848 }
849 }
850
/// A reader whose `keys()` serves a scripted live listing — the only call
/// the cursor-expired resync makes. Filters by prefix like a real backend
/// so prefix-scoped resyncs are exercised faithfully.
struct MockReader {
    // The keys this reader reports as currently live; `keys()` returns the
    // subset that starts with the requested prefix.
    live: Vec<String>,
}
857
858 #[async_trait]
859 impl KvReader for MockReader {
860 async fn get(&self, _key: &str) -> Result<Option<KvEntry>, KvError> {
861 unreachable!("resync only lists keys")
862 }
863
864 async fn keys(&self, prefix: &str) -> Result<Vec<String>, KvError> {
865 Ok(self
866 .live
867 .iter()
868 .filter(|k| k.starts_with(prefix))
869 .cloned()
870 .collect())
871 }
872
873 async fn scan(&self, _prefix: &str) -> Result<Vec<KvEntry>, KvError> {
874 unreachable!("resync only lists keys")
875 }
876 }
877
878 /// A watcher whose entry points all fail. Used to prove the watch task's
879 /// terminal error is surfaced out of `watch_applied` rather than swallowed
880 /// as a clean `Ok(applied)` when the channel closes.
881 struct ErrorWatcher;
882
883 #[async_trait]
884 impl KvWatcher for ErrorWatcher {
885 async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
886 Err(KvError::WatchError("injected watch failure".into()))
887 }
888
889 async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
890 Err(KvError::WatchError("injected watch failure".into()))
891 }
892
893 async fn watch_prefixes(
894 &self,
895 _prefixes: &[&str],
896 _tx: Sender<KvUpdate>,
897 ) -> Result<(), KvError> {
898 Err(KvError::WatchError("injected watch failure".into()))
899 }
900 }
901
902 // A no-op parse that keeps every Put as the value bytes; drops deletes.
903 fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
904 match u {
905 KvUpdate::Put(e) => Some(e.value.clone()),
906 _ => None,
907 }
908 }
909
910 /// The stream closes (hold = false) with a pending batch; the remainder is
911 /// flushed before returning, the returned cursor is the last revision, and
912 /// `on_applied` ran exactly once after `apply`.
913 #[tokio::test]
914 async fn flush_on_channel_close() {
915 let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
916 let watcher = Arc::new(MockWatcher::new(updates, false));
917
918 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
919 let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
920
921 let ab = Arc::clone(&applied_batches);
922 let oc = Arc::clone(&on_applied_cursors);
923 let (_sd_tx, sd_rx) = watch::channel(false);
924
925 let cursor = watch_applied(
926 watcher,
927 WatchScope::All,
928 None,
929 None, // reader (no resync in this test)
930 None::<AppendLogSnapshot>,
931 None,
932 BatchConfig::default(),
933 parse_put,
934 move |batch| ab.lock().unwrap().push(batch),
935 move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
936 sd_rx,
937 )
938 .await
939 .unwrap();
940
941 assert_eq!(cursor.as_u64(), Some(3));
942 let batches = applied_batches.lock().unwrap();
943 let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
944 assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
945 assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
946 }
947
948 /// Fewer than `max` updates, then the channel idles: the window timer must
949 /// flush them and advance the cursor.
950 #[tokio::test(start_paused = true)]
951 async fn flush_on_window() {
952 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
953 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
954
955 let applied = Arc::new(AtomicU64::new(0));
956 let count = Arc::new(AtomicU64::new(0));
957 let a = Arc::clone(&applied);
958 let c = Arc::clone(&count);
959 let (sd_tx, sd_rx) = watch::channel(false);
960
961 let task = tokio::spawn(watch_applied(
962 watcher,
963 WatchScope::All,
964 None,
965 None, // reader (no resync in this test)
966 None::<AppendLogSnapshot>,
967 None,
968 BatchConfig::default(),
969 parse_put,
970 move |batch: Vec<Vec<u8>>| {
971 c.fetch_add(batch.len() as u64, Ordering::SeqCst);
972 },
973 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
974 sd_rx,
975 ));
976
977 // Let the window (10ms) elapse under virtual time.
978 tokio::time::sleep(Duration::from_millis(50)).await;
979 assert_eq!(
980 count.load(Ordering::SeqCst),
981 2,
982 "window should have flushed"
983 );
984 assert_eq!(applied.load(Ordering::SeqCst), 2);
985
986 sd_tx.send(true).unwrap();
987 let cursor = task.await.unwrap().unwrap();
988 assert_eq!(cursor.as_u64(), Some(2));
989 }
990
991 /// Exactly `max` updates fills a batch and flushes immediately — before the
992 /// window would have elapsed.
993 #[tokio::test(start_paused = true)]
994 async fn flush_on_max() {
995 let max = 4;
996 let updates: Vec<_> = (1..=max as u64)
997 .map(|i| put(&format!("k{i}"), b"v", i))
998 .collect();
999 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1000
1001 let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
1002 let f = Arc::clone(&flushes);
1003 let (sd_tx, sd_rx) = watch::channel(false);
1004
1005 let task = tokio::spawn(watch_applied(
1006 watcher,
1007 WatchScope::All,
1008 None,
1009 None, // reader (no resync in this test)
1010 None::<AppendLogSnapshot>,
1011 None,
1012 BatchConfig {
1013 window: Duration::from_secs(3600), // effectively never
1014 max,
1015 },
1016 parse_put,
1017 move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
1018 move |_| {},
1019 sd_rx,
1020 ));
1021
1022 // Yield enough for the mock to push all `max` updates; the window is an
1023 // hour, so any flush is purely the max trigger.
1024 tokio::time::sleep(Duration::from_millis(1)).await;
1025 assert_eq!(
1026 *flushes.lock().unwrap(),
1027 vec![max],
1028 "a full batch should flush on max, not wait for the window"
1029 );
1030
1031 sd_tx.send(true).unwrap();
1032 task.await.unwrap().unwrap();
1033 }
1034
1035 /// A pending batch plus a shutdown signal: the batch is flushed and the
1036 /// applied cursor returned.
1037 #[tokio::test(start_paused = true)]
1038 async fn flush_on_shutdown() {
1039 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1040 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1041
1042 let applied = Arc::new(AtomicU64::new(0));
1043 let a = Arc::clone(&applied);
1044 let (sd_tx, sd_rx) = watch::channel(false);
1045
1046 let task = tokio::spawn(watch_applied(
1047 watcher,
1048 WatchScope::All,
1049 None,
1050 None, // reader (no resync in this test)
1051 None::<AppendLogSnapshot>,
1052 None,
1053 BatchConfig {
1054 window: Duration::from_secs(3600), // window won't fire
1055 max: 100,
1056 },
1057 parse_put,
1058 move |_batch: Vec<Vec<u8>>| {},
1059 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1060 sd_rx,
1061 ));
1062
1063 // Give the mock time to deliver both updates into the pending batch.
1064 tokio::time::sleep(Duration::from_millis(1)).await;
1065 sd_tx.send(true).unwrap();
1066
1067 let cursor = task.await.unwrap().unwrap();
1068 assert_eq!(
1069 cursor.as_u64(),
1070 Some(2),
1071 "shutdown flushes the pending batch"
1072 );
1073 assert_eq!(applied.load(Ordering::SeqCst), 2);
1074 }
1075
1076 /// The cursor must not advance until `apply` has returned. We prove it by
1077 /// having `apply` read the cursor that `on_applied` last published: when the
1078 /// second batch is applied, the visible cursor must still be the *first*
1079 /// batch's — never the second's, which only becomes visible after this
1080 /// `apply` returns.
1081 #[tokio::test(start_paused = true)]
1082 async fn cursor_advances_only_after_apply() {
1083 // Two batches of `max` updates each.
1084 let max = 2usize;
1085 let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
1086 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1087
1088 // Cursor as last published by on_applied; starts at 0 (nothing applied).
1089 let published = Arc::new(AtomicU64::new(0));
1090 // What `apply` observed as the published cursor at the moment it ran.
1091 let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));
1092
1093 let pub_for_apply = Arc::clone(&published);
1094 let seen = Arc::clone(&seen_at_apply);
1095 let pub_for_on = Arc::clone(&published);
1096 let (sd_tx, sd_rx) = watch::channel(false);
1097
1098 let task = tokio::spawn(watch_applied(
1099 watcher,
1100 WatchScope::All,
1101 None,
1102 None, // reader (no resync in this test)
1103 None::<AppendLogSnapshot>,
1104 None,
1105 BatchConfig {
1106 window: Duration::from_secs(3600),
1107 max,
1108 },
1109 parse_put,
1110 move |_batch: Vec<Vec<u8>>| {
1111 // The cursor visible here is whatever the PREVIOUS flush
1112 // published — never this batch's, because we haven't returned.
1113 seen.lock()
1114 .unwrap()
1115 .push(pub_for_apply.load(Ordering::SeqCst));
1116 },
1117 move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1118 sd_rx,
1119 ));
1120
1121 tokio::time::sleep(Duration::from_millis(1)).await;
1122 sd_tx.send(true).unwrap();
1123 task.await.unwrap().unwrap();
1124
1125 // First apply saw 0 (nothing applied yet); second apply saw 2 (first
1126 // batch's cursor), NOT 4. The cursor only reached 4 after the second
1127 // apply returned.
1128 assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
1129 assert_eq!(published.load(Ordering::SeqCst), 4);
1130 }
1131
1132 /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
1133 /// domain work, but they were still received — so the cursor must advance
1134 /// over them.
1135 #[tokio::test]
1136 async fn corrupt_parse_entries_advance_cursor() {
1137 let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
1138 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1139
1140 let apply_calls = Arc::new(AtomicU64::new(0));
1141 let on_applied_max = Arc::new(AtomicU64::new(0));
1142 let ac = Arc::clone(&apply_calls);
1143 let om = Arc::clone(&on_applied_max);
1144 let (_sd_tx, sd_rx) = watch::channel(false);
1145
1146 let cursor = watch_applied(
1147 watcher,
1148 WatchScope::All,
1149 None,
1150 None, // reader (no resync in this test)
1151 None::<AppendLogSnapshot>,
1152 None,
1153 BatchConfig::default(),
1154 // Reject everything — simulates corrupt/irrelevant entries.
1155 |_u: &KvUpdate| -> Option<Vec<u8>> { None },
1156 move |batch: Vec<Vec<u8>>| {
1157 ac.fetch_add(1, Ordering::SeqCst);
1158 assert!(batch.is_empty());
1159 },
1160 move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1161 sd_rx,
1162 )
1163 .await
1164 .unwrap();
1165
1166 assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
1167 assert_eq!(
1168 apply_calls.load(Ordering::SeqCst),
1169 0,
1170 "an all-rejected batch applies nothing"
1171 );
1172 assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1173 }
1174
1175 /// A resume whose cursor has expired falls back to the full watch and still
1176 /// applies the delivered updates.
1177 #[tokio::test]
1178 async fn cursor_expired_falls_back_to_full_watch() {
1179 let mock = MockWatcher {
1180 full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
1181 from: Mutex::new(Some(vec![])),
1182 from_expires: true,
1183 hold: false,
1184 };
1185 let watcher = Arc::new(mock);
1186
1187 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1188 let ab = Arc::clone(&applied_batches);
1189 let (_sd_tx, sd_rx) = watch::channel(false);
1190
1191 let cursor = watch_applied(
1192 watcher,
1193 WatchScope::All,
1194 Some(WatchCursor::from_u64(5)), // resume position that "expired"
1195 None, // reader (no resync in this test)
1196 None::<AppendLogSnapshot>,
1197 None,
1198 BatchConfig::default(),
1199 parse_put,
1200 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1201 move |_| {},
1202 sd_rx,
1203 )
1204 .await
1205 .unwrap();
1206
1207 assert_eq!(cursor.as_u64(), Some(11));
1208 assert_eq!(
1209 *applied_batches.lock().unwrap(),
1210 vec![b"1".to_vec(), b"2".to_vec()],
1211 "fallback full watch's updates were applied"
1212 );
1213 }
1214
/// Cursor-expired resync: with a reader + store wired, a key the fold holds
/// that the live listing no longer does gets a synthetic delete — applied
/// strictly BEFORE the fallback re-list — and the persisted fold converges
/// to the live state. The synthetic delete (unknown version) must not move
/// the cursor; the re-list put must.
#[tokio::test]
async fn cursor_expired_resync_deletes_stale_keys() {
    // The snapshot lives in a temp dir that is removed when `dir` drops.
    let dir = tempfile::TempDir::new().unwrap();
    let path = dir.path().join("resync.snap");
    // NOTE(review): the meaning of the `u64::MAX` argument to `open` is not
    // visible here — confirm against `AppendLogSnapshot::open`.
    let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
    // The fold from the previous run: node.a and node.b at cursor 2.
    store
        .apply(
            &[put("node.a", b"1", 1), put("node.b", b"2", 2)],
            &WatchCursor::from_u64(2),
        )
        .unwrap();

    // During the gap node.b was deleted (marker since evicted) and node.a
    // updated; the resume cursor (2) has expired. The fallback re-list
    // therefore carries only the surviving key.
    let mock = MockWatcher {
        full: Mutex::new(Some(vec![put("node.a", b"1b", 10)])),
        from: Mutex::new(Some(vec![])),
        from_expires: true,
        hold: false,
    };
    // The live listing agrees with the re-list: only node.a still exists.
    let reader = MockReader {
        live: vec!["node.a".to_string()],
    };

    // Record everything `parse` sees, in order, deletes included.
    // Each record is (key, is_delete).
    let seen = Arc::new(Mutex::new(Vec::<(String, bool)>::new()));
    let s = Arc::clone(&seen);
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        Arc::new(mock),
        WatchScope::All,
        Some(WatchCursor::from_u64(2)),
        Some(Arc::new(reader) as Arc<dyn KvReader>),
        Some(store),
        None,
        BatchConfig::default(),
        // Accept every update (`Some(())`) so nothing is filtered out; the
        // closure's only job is to log what passed through it.
        move |u: &KvUpdate| {
            s.lock()
                .unwrap()
                .push((u.key().to_string(), matches!(u, KvUpdate::Delete { .. })));
            Some(())
        },
        |_batch: Vec<()>| {},
        |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    // The re-list put advanced the cursor; the synthetic delete did not.
    assert_eq!(cursor.as_u64(), Some(10));
    // The synthetic delete strictly precedes the re-list put.
    assert_eq!(
        *seen.lock().unwrap(),
        vec![("node.b".to_string(), true), ("node.a".to_string(), false)],
        "synthetic delete must be applied before the fallback re-list"
    );

    // The persisted fold converged: stale key gone, live key updated.
    // Re-read from disk rather than trusting any in-memory state.
    let snap = crate::snapshot::load(&path).unwrap().unwrap();
    assert_eq!(snap.cursor.as_u64(), Some(10));
    assert_eq!(snap.entries.len(), 1);
    assert_eq!(snap.entries["node.a"].value, b"1b");
}
1287
/// A prefix-scoped resync diffs only in-scope keys: an out-of-scope key the
/// fold holds survives, the in-scope stale key is deleted, and a flush
/// containing only synthetic deletes leaves the cursor untouched.
#[tokio::test]
async fn cursor_expired_resync_respects_scope() {
    // The snapshot lives in a temp dir that is removed when `dir` drops.
    let dir = tempfile::TempDir::new().unwrap();
    let path = dir.path().join("resync-scope.snap");
    let (_r, mut store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
    // Seed the fold with one key inside the watched prefix (`node.b`) and one
    // outside it (`other.z`), checkpointed at cursor 2.
    store
        .apply(
            &[put("node.b", b"2", 1), put("other.z", b"9", 2)],
            &WatchCursor::from_u64(2),
        )
        .unwrap();

    // Expired resume; the bucket no longer has ANY node.* keys; the
    // fallback re-list is empty.
    let mock = MockWatcher {
        full: Mutex::new(Some(vec![])),
        from: Mutex::new(Some(vec![])),
        from_expires: true,
        hold: false,
    };
    // An empty live listing: every in-scope key the fold holds is stale.
    let reader = MockReader { live: vec![] };
    let (_sd_tx, sd_rx) = watch::channel(false);

    let cursor = watch_applied(
        Arc::new(mock),
        WatchScope::Prefix("node.".to_string()),
        Some(WatchCursor::from_u64(2)),
        Some(Arc::new(reader) as Arc<dyn KvReader>),
        Some(store),
        None,
        BatchConfig::default(),
        // Accept every update; apply and on_applied are no-ops because the
        // assertions below inspect only the cursor and the persisted fold.
        |_u: &KvUpdate| Some(()),
        |_batch: Vec<()>| {},
        |_| {},
        sd_rx,
    )
    .await
    .unwrap();

    // Deletes-only flush: cursor stays at the resume position.
    assert_eq!(cursor.as_u64(), Some(2));

    // Re-read the fold from disk and check both sides of the scope boundary.
    let snap = crate::snapshot::load(&path).unwrap().unwrap();
    assert_eq!(snap.cursor.as_u64(), Some(2));
    assert!(
        !snap.entries.contains_key("node.b"),
        "in-scope stale key must be resync-deleted"
    );
    assert_eq!(
        snap.entries["other.z"].value, b"9",
        "out-of-scope key must survive a prefix-scoped resync"
    );
}
1344
1345 /// `WatchScope::Prefixes` dispatches to `watch_prefixes` (no resume) and to
1346 /// `watch_prefixes_from` with the expiry → full-watch fallback (resume).
1347 #[tokio::test]
1348 async fn prefixes_scope_dispatches_full_watch() {
1349 let updates = vec![put("a.x", b"1", 1), put("b.y", b"2", 2)];
1350 let watcher = Arc::new(MockWatcher::new(updates, false));
1351 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1352 let ab = Arc::clone(&applied_batches);
1353 let (_sd_tx, sd_rx) = watch::channel(false);
1354
1355 let cursor = watch_applied(
1356 watcher,
1357 WatchScope::Prefixes(vec!["a.".to_string(), "b.".to_string()]),
1358 None,
1359 None, // reader (no resync in this test)
1360 None::<AppendLogSnapshot>,
1361 None,
1362 BatchConfig::default(),
1363 parse_put,
1364 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1365 move |_| {},
1366 sd_rx,
1367 )
1368 .await
1369 .unwrap();
1370
1371 assert_eq!(cursor.as_u64(), Some(2));
1372 assert_eq!(
1373 *applied_batches.lock().unwrap(),
1374 vec![b"1".to_vec(), b"2".to_vec()]
1375 );
1376 }
1377
1378 /// `WatchScope::Prefixes` resume whose cursor has expired falls back to the
1379 /// full multi-prefix watch and applies its updates.
1380 #[tokio::test]
1381 async fn prefixes_scope_expired_resume_falls_back() {
1382 let mock = MockWatcher {
1383 full: Mutex::new(Some(vec![put("a.x", b"1", 7)])),
1384 from: Mutex::new(Some(vec![])),
1385 from_expires: true,
1386 hold: false,
1387 };
1388 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1389 let ab = Arc::clone(&applied_batches);
1390 let (_sd_tx, sd_rx) = watch::channel(false);
1391
1392 let cursor = watch_applied(
1393 Arc::new(mock),
1394 WatchScope::Prefixes(vec!["a.".to_string()]),
1395 Some(WatchCursor::from_u64(3)),
1396 None, // reader (no resync in this test)
1397 None::<AppendLogSnapshot>,
1398 None,
1399 BatchConfig::default(),
1400 parse_put,
1401 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1402 move |_| {},
1403 sd_rx,
1404 )
1405 .await
1406 .unwrap();
1407
1408 assert_eq!(cursor.as_u64(), Some(7));
1409 assert_eq!(*applied_batches.lock().unwrap(), vec![b"1".to_vec()]);
1410 }
1411
1412 /// End-to-end with a real snapshot file: after the run, the persisted
1413 /// snapshot's cursor equals the applied cursor and its entries match the
1414 /// applied state — proving the checkpoint is written at the post-apply
1415 /// cursor, never ahead of it.
1416 #[tokio::test]
1417 async fn snapshot_checkpoint_matches_applied_cursor() {
1418 let dir = tempfile::TempDir::new().unwrap();
1419 let path = dir.path().join("applied.snap");
1420 let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
1421
1422 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1423 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1424 let (_sd_tx, sd_rx) = watch::channel(false);
1425
1426 let cursor = watch_applied(
1427 watcher,
1428 WatchScope::All,
1429 None,
1430 None, // reader (no resync in this test)
1431 Some(store),
1432 None,
1433 BatchConfig::default(),
1434 parse_put,
1435 move |_batch: Vec<Vec<u8>>| {},
1436 move |_| {},
1437 sd_rx,
1438 )
1439 .await
1440 .unwrap();
1441
1442 assert_eq!(cursor.as_u64(), Some(2));
1443
1444 let snap = crate::snapshot::load(&path).unwrap().unwrap();
1445 assert_eq!(
1446 snap.cursor.as_u64(),
1447 cursor.as_u64(),
1448 "snapshot checkpoint cursor must equal the applied cursor"
1449 );
1450 assert_eq!(snap.entries.len(), 2);
1451 assert_eq!(snap.entries["node.a"].value, b"1");
1452 assert_eq!(snap.entries["node.b"].value, b"2");
1453 }
1454
1455 /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
1456 /// delta (the `from` script, NOT the full set) is applied. Proves the
1457 /// resume branch delivers only post-cursor updates and advances to their
1458 /// max revision.
1459 #[tokio::test]
1460 async fn resume_from_cursor_delivers_only_delta() {
1461 let mock = MockWatcher {
1462 // `full` would be delivered only if the resume path were (wrongly)
1463 // bypassed; a non-empty distinguishing value makes that visible.
1464 full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
1465 from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1466 from_expires: false,
1467 hold: false,
1468 };
1469 let watcher = Arc::new(mock);
1470
1471 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1472 let ab = Arc::clone(&applied_batches);
1473 let (_sd_tx, sd_rx) = watch::channel(false);
1474
1475 let cursor = watch_applied(
1476 watcher,
1477 WatchScope::All,
1478 Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1479 None, // reader (no resync in this test)
1480 None::<AppendLogSnapshot>,
1481 None,
1482 BatchConfig::default(),
1483 parse_put,
1484 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1485 move |_| {},
1486 sd_rx,
1487 )
1488 .await
1489 .unwrap();
1490
1491 assert_eq!(
1492 cursor.as_u64(),
1493 Some(11),
1494 "cursor advances to the delta max"
1495 );
1496 assert_eq!(
1497 *applied_batches.lock().unwrap(),
1498 vec![b"3".to_vec(), b"4".to_vec()],
1499 "only the post-cursor delta is applied, never the full set"
1500 );
1501 }
1502
1503 /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
1504 /// applies the delivered updates. Every other test uses `WatchScope::All`;
1505 /// this covers the prefix dispatch arm.
1506 #[tokio::test]
1507 async fn prefix_scope_applies_delivered_updates() {
1508 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
1509 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1510
1511 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1512 let ab = Arc::clone(&applied_batches);
1513 let (_sd_tx, sd_rx) = watch::channel(false);
1514
1515 let cursor = watch_applied(
1516 watcher,
1517 WatchScope::Prefix("node.".to_string()),
1518 None,
1519 None, // reader (no resync in this test)
1520 None::<AppendLogSnapshot>,
1521 None,
1522 BatchConfig::default(),
1523 parse_put,
1524 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1525 move |_| {},
1526 sd_rx,
1527 )
1528 .await
1529 .unwrap();
1530
1531 assert_eq!(cursor.as_u64(), Some(2));
1532 assert_eq!(
1533 *applied_batches.lock().unwrap(),
1534 vec![b"1".to_vec(), b"2".to_vec()]
1535 );
1536 }
1537
1538 /// `WatchScope::Prefix` happy-path resume: a non-expired cursor takes the
1539 /// `watch_prefix_from` path and only the delta is applied — the prefix
1540 /// twin of `resume_from_cursor_delivers_only_delta`.
1541 #[tokio::test]
1542 async fn prefix_resume_from_cursor_delivers_only_delta() {
1543 let mock = MockWatcher {
1544 // `full` would be delivered only if the resume path were (wrongly)
1545 // bypassed; a distinguishing value makes that visible.
1546 full: Mutex::new(Some(vec![put("node.x", b"FULL", 1)])),
1547 from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
1548 from_expires: false,
1549 hold: false,
1550 };
1551 let watcher = Arc::new(mock);
1552
1553 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1554 let ab = Arc::clone(&applied_batches);
1555 let (_sd_tx, sd_rx) = watch::channel(false);
1556
1557 let cursor = watch_applied(
1558 watcher,
1559 WatchScope::Prefix("node.".to_string()),
1560 Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
1561 None, // reader (no resync in this test)
1562 None::<AppendLogSnapshot>,
1563 None,
1564 BatchConfig::default(),
1565 parse_put,
1566 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1567 move |_| {},
1568 sd_rx,
1569 )
1570 .await
1571 .unwrap();
1572
1573 assert_eq!(
1574 cursor.as_u64(),
1575 Some(11),
1576 "cursor advances to the delta max"
1577 );
1578 assert_eq!(
1579 *applied_batches.lock().unwrap(),
1580 vec![b"3".to_vec(), b"4".to_vec()],
1581 "only the post-cursor delta is applied via watch_prefix_from"
1582 );
1583 }
1584
1585 /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
1586 /// full `watch_prefix` and still applies the delivered updates — the prefix
1587 /// twin of `cursor_expired_falls_back_to_full_watch`.
1588 #[tokio::test]
1589 async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
1590 let mock = MockWatcher {
1591 full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
1592 from: Mutex::new(Some(vec![])),
1593 from_expires: true,
1594 hold: false,
1595 };
1596 let watcher = Arc::new(mock);
1597
1598 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1599 let ab = Arc::clone(&applied_batches);
1600 let (_sd_tx, sd_rx) = watch::channel(false);
1601
1602 let cursor = watch_applied(
1603 watcher,
1604 WatchScope::Prefix("node.".to_string()),
1605 Some(WatchCursor::from_u64(5)), // resume position that "expired"
1606 None, // reader (no resync in this test)
1607 None::<AppendLogSnapshot>,
1608 None,
1609 BatchConfig::default(),
1610 parse_put,
1611 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1612 move |_| {},
1613 sd_rx,
1614 )
1615 .await
1616 .unwrap();
1617
1618 assert_eq!(cursor.as_u64(), Some(11));
1619 assert_eq!(
1620 *applied_batches.lock().unwrap(),
1621 vec![b"1".to_vec(), b"2".to_vec()],
1622 "prefix fallback full watch's updates were applied"
1623 );
1624 }
1625
1626 /// The watch task's terminal error must propagate out of `watch_applied`
1627 /// rather than being swallowed as `Ok(applied)` when the channel closes.
1628 #[tokio::test]
1629 async fn watch_task_error_propagates() {
1630 let watcher = Arc::new(ErrorWatcher);
1631 let (_sd_tx, sd_rx) = watch::channel(false);
1632
1633 let result = watch_applied(
1634 watcher,
1635 WatchScope::All,
1636 None,
1637 None, // reader (no resync in this test)
1638 None::<AppendLogSnapshot>,
1639 None,
1640 BatchConfig::default(),
1641 parse_put,
1642 move |_batch: Vec<Vec<u8>>| {},
1643 move |_| {},
1644 sd_rx,
1645 )
1646 .await;
1647
1648 match result {
1649 Err(KvError::WatchError(msg)) => {
1650 assert!(msg.contains("injected"), "error carries the cause: {msg}");
1651 }
1652 other => panic!("expected WatchError, got {other:?}"),
1653 }
1654 }
1655
1656 /// A batch where `parse` accepts some updates and rejects others: the cursor
1657 /// must still advance to the highest *received* revision (covering the
1658 /// rejected entry in the middle), while `apply` sees only the accepted ones.
1659 #[tokio::test]
1660 async fn mixed_parse_advances_cursor_over_rejected_entries() {
1661 let updates = vec![
1662 put("keep.a", b"1", 5),
1663 put("skip.b", b"2", 6), // rejected by parse
1664 put("keep.c", b"3", 7),
1665 ];
1666 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
1667
1668 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
1669 let on_applied_max = Arc::new(AtomicU64::new(0));
1670 let ab = Arc::clone(&applied_batches);
1671 let om = Arc::clone(&on_applied_max);
1672 let (_sd_tx, sd_rx) = watch::channel(false);
1673
1674 let cursor = watch_applied(
1675 watcher,
1676 WatchScope::All,
1677 None,
1678 None, // reader (no resync in this test)
1679 None::<AppendLogSnapshot>,
1680 None,
1681 BatchConfig::default(),
1682 // Keep only keys under "keep."; reject everything else.
1683 |u: &KvUpdate| -> Option<Vec<u8>> {
1684 match u {
1685 KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
1686 _ => None,
1687 }
1688 },
1689 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
1690 move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1691 sd_rx,
1692 )
1693 .await
1694 .unwrap();
1695
1696 assert_eq!(
1697 cursor.as_u64(),
1698 Some(7),
1699 "cursor covers the rejected middle entry (rev 6)"
1700 );
1701 assert_eq!(
1702 *applied_batches.lock().unwrap(),
1703 vec![b"1".to_vec(), b"3".to_vec()],
1704 "apply sees only the accepted entries"
1705 );
1706 assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
1707 }
1708
1709 /// Shutdown before any update arrives: nothing was received, so the cursor
1710 /// stays at the resume position (here `none()`), `apply` never runs, and
1711 /// `on_applied` never fires.
1712 #[tokio::test(start_paused = true)]
1713 async fn shutdown_with_no_pending_batch() {
1714 let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open
1715
1716 let apply_calls = Arc::new(AtomicU64::new(0));
1717 let on_applied_calls = Arc::new(AtomicU64::new(0));
1718 let ac = Arc::clone(&apply_calls);
1719 let oc = Arc::clone(&on_applied_calls);
1720 let (sd_tx, sd_rx) = watch::channel(false);
1721
1722 let task = tokio::spawn(watch_applied(
1723 watcher,
1724 WatchScope::All,
1725 None,
1726 None, // reader (no resync in this test)
1727 None::<AppendLogSnapshot>,
1728 None,
1729 BatchConfig::default(),
1730 parse_put,
1731 move |_batch: Vec<Vec<u8>>| {
1732 ac.fetch_add(1, Ordering::SeqCst);
1733 },
1734 move |_| {
1735 oc.fetch_add(1, Ordering::SeqCst);
1736 },
1737 sd_rx,
1738 ));
1739
1740 // Let the watcher attach and idle (it has nothing to deliver), then shut down.
1741 tokio::time::sleep(Duration::from_millis(1)).await;
1742 sd_tx.send(true).unwrap();
1743
1744 let cursor = task.await.unwrap().unwrap();
1745 assert_eq!(
1746 cursor.as_u64(),
1747 None,
1748 "no updates received → cursor unmoved"
1749 );
1750 assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
1751 assert_eq!(
1752 on_applied_calls.load(Ordering::SeqCst),
1753 0,
1754 "on_applied never fires"
1755 );
1756 }
1757
/// An [`ExportRequest`] flushes the pending batch first, so the artifact's
/// cursor is exactly the applied cursor — and the artifact is importable
/// with the batched entries in it.
#[tokio::test(start_paused = true)]
async fn export_request_flushes_pending_batch_first() {
    // One temp dir holds the live fold, the export artifact, and later the
    // re-imported snapshot; it is removed when `dir` drops.
    let dir = tempfile::TempDir::new().unwrap();
    let store_path = dir.path().join("fold.snap");
    let artifact = dir.path().join("artifact");
    let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();

    let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
    let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
    let (sd_tx, sd_rx) = watch::channel(false);
    // Channel over which the test hands export requests to the watch loop.
    let (ex_tx, ex_rx) = mpsc::channel(1);

    let task = tokio::spawn(watch_applied(
        watcher,
        WatchScope::All,
        None,
        None, // reader (no resync in this test)
        Some(store),
        Some(ex_rx),
        BatchConfig {
            window: Duration::from_secs(3600), // window never fires
            max: 100,
        },
        parse_put,
        move |_batch: Vec<Vec<u8>>| {},
        move |_| {},
        sd_rx,
    ));

    // Let both updates land in the (unflushed) pending batch, then export.
    // Neither the hour-long window nor `max: 100` can have flushed them, so
    // any flush observed below was triggered by the export request itself.
    tokio::time::sleep(Duration::from_millis(1)).await;
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    ex_tx
        .send(ExportRequest {
            dest_dir: artifact.clone(),
            reply: reply_tx,
        })
        .await
        .unwrap();

    // The export's outcome comes back on the oneshot reply channel.
    let manifest = reply_rx.await.unwrap().expect("export succeeds");
    assert_eq!(
        manifest.cursor.as_u64(),
        Some(2),
        "pending batch flushed before export: artifact cursor is the applied cursor"
    );

    // The artifact is importable and holds both batched entries.
    let (cursor, imported) =
        AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
            .unwrap();
    assert_eq!(cursor.as_u64(), Some(2));
    assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
    assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");

    // The watcher holds the channel open, so the loop only ends on shutdown.
    sd_tx.send(true).unwrap();
    task.await.unwrap().unwrap();
}
1819
1820 /// An [`ExportRequest`] that arrives with NOTHING pending (the window
1821 /// already flushed everything) still produces a valid artifact whose
1822 /// cursor is the applied cursor. The flush-before-export step must be a
1823 /// clean no-op, not an error or a cursor regression.
1824 #[tokio::test(start_paused = true)]
1825 async fn export_with_empty_pending_batch_succeeds() {
1826 let dir = tempfile::TempDir::new().unwrap();
1827 let store_path = dir.path().join("fold.snap");
1828 let artifact = dir.path().join("artifact");
1829 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1830
1831 let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
1832 let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
1833 let (sd_tx, sd_rx) = watch::channel(false);
1834 let (ex_tx, ex_rx) = mpsc::channel(1);
1835
1836 let task = tokio::spawn(watch_applied(
1837 watcher,
1838 WatchScope::All,
1839 None,
1840 None, // reader (no resync in this test)
1841 Some(store),
1842 Some(ex_rx),
1843 BatchConfig::default(), // 10 ms window
1844 parse_put,
1845 move |_batch: Vec<Vec<u8>>| {},
1846 move |_| {},
1847 sd_rx,
1848 ));
1849
1850 // Let the window flush both updates, so the export request finds an
1851 // EMPTY pending batch.
1852 tokio::time::sleep(Duration::from_millis(50)).await;
1853
1854 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1855 ex_tx
1856 .send(ExportRequest {
1857 dest_dir: artifact.clone(),
1858 reply: reply_tx,
1859 })
1860 .await
1861 .unwrap();
1862 let manifest = reply_rx
1863 .await
1864 .unwrap()
1865 .expect("export succeeds with nothing pending");
1866 assert_eq!(
1867 manifest.cursor.as_u64(),
1868 Some(2),
1869 "artifact cursor is the applied cursor, unchanged by the no-op flush"
1870 );
1871
1872 // The artifact is importable and holds the already-flushed entries.
1873 let (cursor, imported) =
1874 AppendLogSnapshot::import(&artifact, &dir.path().join("imported.snap"), u64::MAX)
1875 .unwrap();
1876 assert_eq!(cursor.as_u64(), Some(2));
1877 assert_eq!(imported.get("a").unwrap().unwrap().value, b"1");
1878 assert_eq!(imported.get("b").unwrap().unwrap().value, b"2");
1879
1880 sd_tx.send(true).unwrap();
1881 task.await.unwrap().unwrap();
1882 }
1883
1884 /// An export request against a store-less watch replies with an error and
1885 /// the watch keeps running.
1886 #[tokio::test(start_paused = true)]
1887 async fn export_without_store_replies_error() {
1888 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1889 let (sd_tx, sd_rx) = watch::channel(false);
1890 let (ex_tx, ex_rx) = mpsc::channel(1);
1891
1892 let task = tokio::spawn(watch_applied(
1893 watcher,
1894 WatchScope::All,
1895 None,
1896 None, // reader (no resync in this test)
1897 None::<AppendLogSnapshot>,
1898 Some(ex_rx),
1899 BatchConfig::default(),
1900 parse_put,
1901 move |_batch: Vec<Vec<u8>>| {},
1902 move |_| {},
1903 sd_rx,
1904 ));
1905
1906 tokio::time::sleep(Duration::from_millis(1)).await;
1907 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1908 ex_tx
1909 .send(ExportRequest {
1910 dest_dir: std::env::temp_dir().join("never-created"),
1911 reply: reply_tx,
1912 })
1913 .await
1914 .unwrap();
1915 assert!(
1916 reply_rx.await.unwrap().is_err(),
1917 "no store → export errors via the reply"
1918 );
1919
1920 // The watch is still alive and returns its applied cursor on shutdown.
1921 sd_tx.send(true).unwrap();
1922 let cursor = task.await.unwrap().unwrap();
1923 assert_eq!(cursor.as_u64(), Some(1));
1924 }
1925
1926 /// An export failure (unavailable destination) is reported on the reply and
1927 /// the watch keeps applying later updates.
1928 #[tokio::test(start_paused = true)]
1929 async fn export_error_does_not_kill_watch() {
1930 let dir = tempfile::TempDir::new().unwrap();
1931 let store_path = dir.path().join("fold.snap");
1932 let (_r, store) = AppendLogSnapshot::open(&store_path, u64::MAX).unwrap();
1933
1934 // Occupied destination → export fails.
1935 let occupied = dir.path().join("occupied");
1936 std::fs::create_dir(&occupied).unwrap();
1937 std::fs::write(occupied.join("stray"), b"x").unwrap();
1938
1939 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1940 let (sd_tx, sd_rx) = watch::channel(false);
1941 let (ex_tx, ex_rx) = mpsc::channel(1);
1942
1943 let applied = Arc::new(AtomicU64::new(0));
1944 let a = Arc::clone(&applied);
1945
1946 let task = tokio::spawn(watch_applied(
1947 watcher,
1948 WatchScope::All,
1949 None,
1950 None, // reader (no resync in this test)
1951 Some(store),
1952 Some(ex_rx),
1953 BatchConfig::default(),
1954 parse_put,
1955 move |_batch: Vec<Vec<u8>>| {},
1956 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
1957 sd_rx,
1958 ));
1959
1960 tokio::time::sleep(Duration::from_millis(1)).await;
1961 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
1962 ex_tx
1963 .send(ExportRequest {
1964 dest_dir: occupied,
1965 reply: reply_tx,
1966 })
1967 .await
1968 .unwrap();
1969 match reply_rx.await.unwrap() {
1970 Err(crate::snapshot::SnapshotError::ArtifactInvalid(_)) => {}
1971 other => panic!("expected ArtifactInvalid, got {other:?}"),
1972 }
1973
1974 // Watch still folds: a clean shutdown returns the applied cursor.
1975 sd_tx.send(true).unwrap();
1976 let cursor = task.await.unwrap().unwrap();
1977 assert_eq!(cursor.as_u64(), Some(1), "watch survived the failed export");
1978 assert_eq!(applied.load(Ordering::SeqCst), 1);
1979 }
1980
1981 /// Dropping the export sender disarms the arm; the loop keeps batching and
1982 /// flushing normally.
1983 #[tokio::test(start_paused = true)]
1984 async fn export_sender_dropped_disarms_channel() {
1985 let watcher = Arc::new(MockWatcher::new(vec![put("a", b"1", 1)], true));
1986 let (sd_tx, sd_rx) = watch::channel(false);
1987 let (ex_tx, ex_rx) = mpsc::channel::<ExportRequest>(1);
1988
1989 let applied = Arc::new(AtomicU64::new(0));
1990 let a = Arc::clone(&applied);
1991
1992 let task = tokio::spawn(watch_applied(
1993 watcher,
1994 WatchScope::All,
1995 None,
1996 None, // reader (no resync in this test)
1997 None::<AppendLogSnapshot>,
1998 Some(ex_rx),
1999 BatchConfig::default(),
2000 parse_put,
2001 move |_batch: Vec<Vec<u8>>| {},
2002 move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
2003 sd_rx,
2004 ));
2005
2006 drop(ex_tx); // disarm
2007 tokio::time::sleep(Duration::from_millis(50)).await;
2008 assert_eq!(
2009 applied.load(Ordering::SeqCst),
2010 1,
2011 "loop keeps flushing after the export sender is gone"
2012 );
2013
2014 sd_tx.send(true).unwrap();
2015 task.await.unwrap().unwrap();
2016 }
2017
2018 /// With a low `compact_threshold`, the flush path's `spawn_blocking`
2019 /// compaction actually fires (every other snapshot test pins the threshold
2020 /// at `u64::MAX`, leaving that branch dead). After a compacting run the
2021 /// snapshot must still load cleanly with the right cursor and entries.
2022 #[tokio::test]
2023 async fn snapshot_compaction_fires_and_stays_consistent() {
2024 let dir = tempfile::TempDir::new().unwrap();
2025 let path = dir.path().join("applied.snap");
2026 // threshold 0 → every checkpoint reports "needs compact", forcing the
2027 // store's inline-compaction branch on each flush (run off the hot path via
2028 // spawn_blocking inside watch_applied).
2029 let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();
2030
2031 // Re-put the same key across flushes so compaction has duplicates to
2032 // dedup; small max forces multiple flushes (hence multiple compactions).
2033 let updates = vec![
2034 put("node.a", b"1", 1),
2035 put("node.a", b"2", 2),
2036 put("node.b", b"3", 3),
2037 put("node.a", b"4", 4),
2038 ];
2039 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
2040 let (_sd_tx, sd_rx) = watch::channel(false);
2041
2042 let cursor = watch_applied(
2043 watcher,
2044 WatchScope::All,
2045 None,
2046 None, // reader (no resync in this test)
2047 Some(store),
2048 None,
2049 BatchConfig {
2050 window: Duration::from_secs(3600),
2051 max: 1, // one update per flush → a compaction per update
2052 },
2053 parse_put,
2054 move |_batch: Vec<Vec<u8>>| {},
2055 move |_| {},
2056 sd_rx,
2057 )
2058 .await
2059 .unwrap();
2060
2061 assert_eq!(cursor.as_u64(), Some(4));
2062
2063 let snap = crate::snapshot::load(&path).unwrap().unwrap();
2064 assert_eq!(
2065 snap.cursor.as_u64(),
2066 cursor.as_u64(),
2067 "compacted snapshot's cursor still equals the applied cursor"
2068 );
2069 assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
2070 assert_eq!(
2071 snap.entries["node.a"].value, b"4",
2072 "last write per key survives compaction"
2073 );
2074 assert_eq!(snap.entries["node.b"].value, b"3");
2075 }
2076}