slipstream/applied.rs
//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
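/*!

The following is a toy model of that failure. It uses only `std`, and its names (`CursorOn`, `run_until_crash`, `resume`) are hypothetical and not part of this crate. The model has a three-revision log, a crash immediately after revision 2 is *received*, and a resume from whatever cursor was persisted. To keep it short, each update is applied as a batch of one.

```rust
use std::collections::BTreeMap;

/// Which event advances the persisted cursor in this toy model.
#[derive(Clone, Copy, PartialEq, Debug)]
enum CursorOn {
    Receipt,
    Apply,
}

/// A log of `(revision, key, value)` entries in revision order.
type Log = [(u64, &'static str, u32)];

/// Processes `log` until revision `crash_at` has been received but not yet
/// applied, then "crashes". Returns the cursor persisted at that moment.
fn run_until_crash(
    log: &Log,
    state: &mut BTreeMap<&'static str, u32>,
    policy: CursorOn,
    crash_at: u64,
) -> u64 {
    let mut persisted = 0;
    for &(rev, key, value) in log {
        if policy == CursorOn::Receipt {
            persisted = rev; // cursor bumped as soon as the update arrives
        }
        if rev == crash_at {
            return persisted; // received, never applied
        }
        state.insert(key, value); // "apply"
        if policy == CursorOn::Apply {
            persisted = rev; // cursor bumped only after apply returned
        }
    }
    persisted
}

/// Resumes from `cursor` by replaying every revision strictly after it.
fn resume(log: &Log, state: &mut BTreeMap<&'static str, u32>, cursor: u64) {
    for &(_, key, value) in log.iter().filter(|e| e.0 > cursor) {
        state.insert(key, value);
    }
}
```

Under `CursorOn::Receipt` the crash persists cursor 2. The resume therefore replays only revision 3, and key `b` is never written. Under `CursorOn::Apply` the persisted cursor is 1, so the resume replays revisions 2 and 3.
*/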
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
//!   applied cursor. Callers use it to persist the cursor for the next restart.
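/*!

Here is the same contract in miniature, as a synchronous, in-memory sketch. The function `drain_applied` is hypothetical and is not this crate's API. The sketch simplifies on purpose:

- updates are `(revision, raw text)` pairs;
- there is no timer;
- a batch is cut every `max` *received* updates rather than every `max` parsed ones.

```rust
/// Hypothetical synchronous stand-in for the combinator's parse, apply and
/// on_applied contract. Returns the final applied cursor.
fn drain_applied<U>(
    updates: &[(u64, &str)],
    max: usize,
    mut parse: impl FnMut(&str) -> Option<U>,
    mut apply: impl FnMut(Vec<U>),
    mut on_applied: impl FnMut(u64),
) -> u64 {
    let mut applied = 0;
    for chunk in updates.chunks(max.max(1)) {
        // Rejected entries simply contribute nothing to the batch.
        let batch: Vec<U> = chunk.iter().filter_map(|&(_, raw)| parse(raw)).collect();
        if !batch.is_empty() {
            apply(batch); // must return before the cursor moves
        }
        // The cursor covers every received revision in the chunk, parsed or not.
        applied = chunk[chunk.len() - 1].0;
        on_applied(applied);
    }
    applied
}
```

Suppose you feed it `[(5, "1"), (6, "corrupt"), (7, "3")]` with `max = 2` and `parse = |raw| raw.parse::<u32>().ok()`. It applies `[1]` and then `[3]`, and `on_applied` sees 6 and then 7. Revision 6 is covered even though nothing was applied for it.
*/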
//!
//! ## Panics
//!
//! `parse`, `apply`, and `on_applied` run inline on the task that polls
//! [`watch_applied`], not on the spawned watch task. If `apply` panics, the
//! panic propagates out of [`watch_applied`] and aborts the watch — that is the
//! caller's contract, the same as a panic in any other supplied closure.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::warn;

use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::SnapshotWriter;

/// What to watch: every key, or every key under a prefix.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`.
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
}

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened (on its first received update), or
/// `max` parsed updates have accumulated. The window amortizes the cost of
/// `apply` (e.g. one route-table clone per flush instead of one per update);
/// `max` caps memory and latency when updates arrive faster than the window.
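/**

The "either bound" rule can be written as a pure predicate. `should_flush` is a hypothetical helper for illustration only. [`watch_applied`] enforces the same rule with a pinned timer and a length check, and does not call anything like it. Times here are offsets from an arbitrary epoch, so the rule can be exercised without a clock.

```rust
use std::time::Duration;

/// Hypothetical: should an open batch flush at `now`?
/// `opened_at` is `None` when no batch is open.
fn should_flush(
    opened_at: Option<Duration>,
    now: Duration,
    len: usize,
    window: Duration,
    max: usize,
) -> bool {
    match opened_at {
        None => false, // nothing batched, nothing to flush
        // Whichever bound is reached first triggers the flush.
        Some(t0) => len >= max || now.saturating_sub(t0) >= window,
    }
}
```

Take the default 10 ms / 100 policy with a batch opened at t = 0. If the batch already holds 100 updates, it flushes at t = 3 ms. If it holds only one, it flushes at t = 10 ms.
*/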
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of parsed updates in a batch before forcing a flush.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / checkpoints `snapshot` / calls `on_applied`. Returns
/// the final applied cursor when the watch ends (shutdown signalled, or the
/// underlying stream closed).
///
/// Raw [`KvUpdate`]s are streamed to `snapshot` as they arrive, but the
/// *checkpoint* cursor written on each flush is the post-apply cursor — so a
/// loaded snapshot's cursor is always consistent with the state it carries
/// (the cursor never names a revision whose `apply` had not returned before the
/// checkpoint).
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix`). Callers see
/// the full re-list as a stream of puts, exactly as the hand-rolled loops did.
///
/// # Errors
///
/// Returns `Err` when the stream closes because the underlying watch failed or
/// the watch task panicked, or when a snapshot compaction task panics. On
/// shutdown the result is `Ok` unless compaction panics during the final flush.
/// Snapshot write, checkpoint, and compaction failures are logged and the watch
/// continues.
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline`/`snapshot` for the next
// loop iteration. At the two flush sites that return immediately afterward
// (shutdown, channel-close) those resets are dead stores — correct, but flagged.
#[allow(unused_assignments)]
pub async fn watch_applied<U, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    mut snapshot: Option<SnapshotWriter>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is — everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(async move { run_watch(watcher.as_ref(), &scope, resume, tx).await })
    };

    // Batch state.
    //
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush — including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    //
    // The batch is pre-sized for `max` parsed updates, clamped to 1..=64 so a
    // very large `max` never reserves more than 64 slots up front.
    let batch_cap = config.max.clamp(1, 64);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    let mut batch_high = WatchCursor::none();
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop —
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch: apply (if non-empty), then advance the cursor,
    // checkpoint the snapshot at that cursor, and fire `on_applied` — in that
    // order. Evaluates to whether the snapshot grew past its compaction
    // threshold. It is a macro rather than a closure so it can mutate the
    // locals above directly; a closure holding `&mut` borrows of them would
    // conflict with the select arms that also use them. It awaits nothing
    // itself; compaction (which does block) is left to `flush_and_compact!`.
    macro_rules! flush_inner {
        () => {{
            let mut needs_compact = false;
            // Nothing received since the last flush → nothing to do at all.
            if !batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind:
                    // `take` swaps in a zero-capacity `Vec::new()`, so every
                    // batch after the first re-climbs the reallocation ladder
                    // (4→8→…→cap). Handing back a `with_capacity` Vec keeps the
                    // amortized allocation to one per batch.
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                if !batch_high.is_none() {
                    applied = batch_high.clone();
                    if let Some(sw) = snapshot.as_mut() {
                        match sw.checkpoint(&applied) {
                            Ok(true) => needs_compact = true,
                            Ok(false) => {}
                            Err(e) => warn!(error = %e, "snapshot checkpoint failed"),
                        }
                    }
                    on_applied(applied.clone());
                }
                batch_high = WatchCursor::none();
            }
            batch_deadline = None;
            needs_compact
        }};
    }

    // Flush, then, if the snapshot log grew past its threshold, compact it.
    // Compaction reads and rewrites the whole snapshot file, so it must not run
    // on an async worker thread, where it would stall every other task
    // scheduled there. `spawn_blocking` moves it to the blocking pool; the
    // watch loop awaits it and reclaims the writer afterward.
    macro_rules! flush_and_compact {
        () => {{
            if flush_inner!()
                && let Some(mut sw) = snapshot.take()
            {
                // The closure hands the writer back unconditionally, so a
                // *failed* compaction (`Ok(Err)`) still returns it: checkpoints
                // continue, only this compaction was skipped. A *panicked*
                // blocking task (`Err`) drops the writer on the blocking thread
                // and we can't recover it; rather than silently run the rest of
                // the watch without persistence — which breaks the
                // resume-after-restart guarantee — we surface it as a fatal error.
                match tokio::task::spawn_blocking(move || {
                    let res = sw.compact();
                    (sw, res)
                })
                .await
                {
                    Ok((sw, Ok(()))) => snapshot = Some(sw),
                    Ok((sw, Err(e))) => {
                        warn!(error = %e, "snapshot compaction failed; continuing without compacting");
                        snapshot = Some(sw);
                    }
                    Err(e) => {
                        warn!(error = %e, "snapshot compaction task panicked; aborting watch");
                        handle.abort();
                        return Err(KvError::WatchError(format!(
                            "snapshot compaction task panicked: {e}"
                        )));
                    }
                }
            }
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration — one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel — they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume — and return the applied cursor.
            // A dropped shutdown sender (`changed()` returning `Err`) is treated
            // as a shutdown request too.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush_and_compact!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush_and_compact!();
            }

            update = rx.recv() => {
                match update {
                    Some(u) => {
                        // Cursor authority: every received update bumps the
                        // pending high-water, regardless of whether `parse`
                        // keeps it.
                        batch_high = WatchCursor::from_version(u.version().clone());

                        // Stream the raw update to the snapshot log as it
                        // arrives. The durable checkpoint is written later at
                        // the applied cursor, so a crash here just means the log
                        // holds data ahead of its cursor — re-applied on resume,
                        // never skipped.
                        if let Some(sw) = snapshot.as_mut()
                            && let Err(e) = sw.write_update(&u)
                        {
                            warn!(error = %e, "snapshot write failed");
                        }

                        if let Some(parsed) = parse(&u) {
                            batch.push(parsed);
                        }

                        // Arm the window on the first received update of a batch
                        // — even a parse-rejected one, so the cursor advances
                        // within `window` even through a run of irrelevant keys.
                        // Reset the pinned timer to the new deadline rather than
                        // allocating a fresh `Sleep`.
                        if batch_deadline.is_none() {
                            let deadline = tokio::time::Instant::now() + config.window;
                            sleep.as_mut().reset(deadline);
                            batch_deadline = Some(deadline);
                        }

                        if batch.len() >= config.max {
                            flush_and_compact!();
                        }
                    }
                    None => {
                        // Stream closed. Flush the remainder, then surface the
                        // watch task's terminal result: a clean end returns the
                        // applied cursor, an error propagates.
                        flush_and_compact!();
                        return match handle.await {
                            Ok(Ok(())) => Ok(applied),
                            Ok(Err(e)) => Err(e),
                            Err(join) => Err(KvError::WatchError(format!(
                                "watch task panicked: {join}"
                            ))),
                        };
                    }
                }
            }
        }
    }
}

/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position, with the [`KvError::CursorExpired`] → full-watch fallback.
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Folding that check into the
    // `resume_cursor` option makes "we have a resume position" structural — there
    // is no separate bool whose truth a later edit could let drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    // In both arms the resume attempt gets a clone of `tx`, so the original
    // sender is still available for the full-watch fallback.
    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): signal a "resync" to the caller so it can
                        // diff the full re-list against prior state and emit
                        // synthetic deletes for keys that vanished during the
                        // gap (see Snapshot::stale_keys). For v1 the full
                        // re-list is replayed as a stream of puts, matching the
                        // hand-rolled loops this combinator replaces.
                        warn!("watch cursor expired, falling back to full watch_all");
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): see the watch_all arm above.
                        warn!("watch cursor expired, falling back to full watch_prefix");
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::{KvEntry, VersionToken};
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio::sync::mpsc::Sender;

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        })
    }

    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        /// Script for `watch_all` / `watch_prefix` (the full re-list); consumed
        /// on first use.
        full: Mutex<Option<Vec<KvUpdate>>>,
        /// Script for `watch_all_from` / `watch_prefix_from` (the post-cursor
        /// delta); consumed on first use.
        from: Mutex<Option<Vec<KvUpdate>>>,
        /// When set, the `*_from` entry points fail with `CursorExpired`.
        from_expires: bool,
        /// Keep the channel open after the script has been delivered.
        hold: bool,
    }

    impl MockWatcher {
        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
            Self {
                full: Mutex::new(Some(updates)),
                from: Mutex::new(None),
                from_expires: false,
                hold,
            }
        }

        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
            let updates = which.lock().unwrap().take().unwrap_or_default();
            for u in updates {
                if tx.send(u).await.is_err() {
                    return;
                }
            }
            if self.hold {
                // Keep `tx` alive (channel open) until this task is aborted.
                std::future::pending::<()>().await;
            }
        }
    }

    #[async_trait]
    impl KvWatcher for MockWatcher {
        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_all_from(
            &self,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
        // are exercised against the same `from` script. Without this the trait's
        // default impl would delegate to watch_prefix and silently deliver the
        // full set instead of the delta.
        async fn watch_prefix_from(
            &self,
            _prefix: &str,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }
    }

    /// A watcher whose entry points all fail. Used to prove the watch task's
    /// terminal error is surfaced out of `watch_applied` rather than swallowed
    /// as a clean `Ok(applied)` when the channel closes.
    struct ErrorWatcher;

    #[async_trait]
    impl KvWatcher for ErrorWatcher {
        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }
    }

    // A trivial parse: keeps every Put's value bytes and rejects every other
    // update (e.g. deletes).
    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
        match u {
            KvUpdate::Put(e) => Some(e.value.clone()),
            _ => None,
        }
    }

    /// The stream closes (hold = false) with a pending batch; the remainder is
    /// flushed before returning, the returned cursor is the last revision, and
    /// the final `on_applied` call reports that cursor once `apply` has consumed
    /// every update.
    #[tokio::test]
    async fn flush_on_channel_close() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
        let watcher = Arc::new(MockWatcher::new(updates, false));

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));

        let ab = Arc::clone(&applied_batches);
        let oc = Arc::clone(&on_applied_cursors);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch| ab.lock().unwrap().push(batch),
            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(3));
        let batches = applied_batches.lock().unwrap();
        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
    }

    /// Fewer than `max` updates, then the channel idles: the window timer must
    /// flush them and advance the cursor.
    #[tokio::test(start_paused = true)]
    async fn flush_on_window() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let count = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let c = Arc::clone(&count);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| {
                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
            },
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Let the window (10ms) elapse under virtual time.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            count.load(Ordering::SeqCst),
            2,
            "window should have flushed"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);

        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
    }

    /// Exactly `max` updates fill a batch and flush immediately — before the
    /// window would have elapsed.
    #[tokio::test(start_paused = true)]
    async fn flush_on_max() {
        let max = 4;
        let updates: Vec<_> = (1..=max as u64)
            .map(|i| put(&format!("k{i}"), b"v", i))
            .collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
        let f = Arc::clone(&flushes);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig {
                window: Duration::from_secs(3600), // effectively never
                max,
            },
            parse_put,
            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
            move |_| {},
            sd_rx,
        ));

        // Yield enough for the mock to push all `max` updates; the window is an
        // hour, so any flush is purely the max trigger.
        tokio::time::sleep(Duration::from_millis(1)).await;
        assert_eq!(
            *flushes.lock().unwrap(),
            vec![max],
            "a full batch should flush on max, not wait for the window"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// A pending batch plus a shutdown signal: the batch is flushed and the
    /// applied cursor returned.
    #[tokio::test(start_paused = true)]
    async fn flush_on_shutdown() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig {
                window: Duration::from_secs(3600), // window won't fire
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Give the mock time to deliver both updates into the pending batch.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            Some(2),
            "shutdown flushes the pending batch"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);
    }

    /// The cursor must not advance until `apply` has returned. We prove it by
    /// having `apply` read the cursor that `on_applied` last published: when the
    /// second batch is applied, the visible cursor must still be the *first*
    /// batch's — never the second's, which only becomes visible after this
    /// `apply` returns.
    #[tokio::test(start_paused = true)]
    async fn cursor_advances_only_after_apply() {
        // Two batches of `max` updates each.
        let max = 2usize;
        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        // Cursor as last published by on_applied; starts at 0 (nothing applied).
        let published = Arc::new(AtomicU64::new(0));
        // What `apply` observed as the published cursor at the moment it ran.
        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));

        let pub_for_apply = Arc::clone(&published);
        let seen = Arc::clone(&seen_at_apply);
        let pub_for_on = Arc::clone(&published);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig {
                window: Duration::from_secs(3600),
                max,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                // The cursor visible here is whatever the PREVIOUS flush
                // published — never this batch's, because we haven't returned.
                seen.lock()
                    .unwrap()
                    .push(pub_for_apply.load(Ordering::SeqCst));
            },
            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();

        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
        // batch's cursor), NOT 4. The cursor only reached 4 after the second
        // apply returned.
        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
        assert_eq!(published.load(Ordering::SeqCst), 4);
    }

    /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
    /// domain work, but they were still received — so the cursor must advance
    /// over them.
    #[tokio::test]
    async fn corrupt_parse_entries_advance_cursor() {
        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            // Reject everything — simulates corrupt/irrelevant entries.
            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
            move |batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
                assert!(batch.is_empty());
            },
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
        assert_eq!(
            apply_calls.load(Ordering::SeqCst),
            0,
            "an all-rejected batch applies nothing"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// A resume whose cursor has expired falls back to the full watch and still
    /// applies the delivered updates.
    #[tokio::test]
    async fn cursor_expired_falls_back_to_full_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "fallback full watch's updates were applied"
        );
    }

    /// End-to-end with a real snapshot file: after the run, the persisted
    /// snapshot's cursor equals the applied cursor and its entries match the
    /// applied state — proving the checkpoint is written at the post-apply
    /// cursor, never ahead of it.
    #[tokio::test]
    async fn snapshot_checkpoint_matches_applied_cursor() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        let writer = SnapshotWriter::open(&path, u64::MAX).unwrap();

        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(writer),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "snapshot checkpoint cursor must equal the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.entries["node.a"].value, b"1");
        assert_eq!(snap.entries["node.b"].value, b"2");
    }

    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
    /// delta (the `from` script, NOT the full set) is applied. Proves the
    /// resume branch delivers only post-cursor updates and advances to their
    /// max revision.
    #[tokio::test]
    async fn resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a non-empty distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied, never the full set"
        );
    }

    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
    /// applies the delivered updates. Every other test uses `WatchScope::All`;
    /// this covers the prefix dispatch arm.
    #[tokio::test]
    async fn prefix_scope_applies_delivered_updates() {
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }
939
940 /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
941 /// full `watch_prefix` and still applies the delivered updates — the prefix
942 /// twin of `cursor_expired_falls_back_to_full_watch`.
943 #[tokio::test]
944 async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
945 let mock = MockWatcher {
946 full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
947 from: Mutex::new(Some(vec![])),
948 from_expires: true,
949 hold: false,
950 };
951 let watcher = Arc::new(mock);
952
953 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
954 let ab = Arc::clone(&applied_batches);
955 let (_sd_tx, sd_rx) = watch::channel(false);
956
957 let cursor = watch_applied(
958 watcher,
959 WatchScope::Prefix("node.".to_string()),
960 Some(WatchCursor::from_u64(5)), // resume position that "expired"
961 None,
962 BatchConfig::default(),
963 parse_put,
964 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
965 move |_| {},
966 sd_rx,
967 )
968 .await
969 .unwrap();
970
971 assert_eq!(cursor.as_u64(), Some(11));
972 assert_eq!(
973 *applied_batches.lock().unwrap(),
974 vec![b"1".to_vec(), b"2".to_vec()],
975 "prefix fallback full watch's updates were applied"
976 );
977 }
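
    // Hypothetical model, not part of this crate: the four dispatch cases that
    // the resume and prefix tests above pin down, written as a pure function.
    // `dispatch_model`, `Scope`, `Call`, and `plan` are illustrative names only.
    // The real combinator calls `KvWatcher` methods and learns that a cursor has
    // expired from the `*_from` call itself; the `cursor_expired` flag stands in
    // for that signal here.
    mod dispatch_model {
        /// Mirrors the two `WatchScope` variants exercised in these tests.
        pub enum Scope {
            All,
            Prefix(String),
        }

        /// Which watch the combinator would open (illustrative only).
        #[derive(Debug, PartialEq)]
        pub enum Call {
            Full,
            FullFrom(u64),
            Prefix(String),
            PrefixFrom(String, u64),
        }

        /// A live resume cursor takes the `*_from` path; no cursor, or an
        /// expired one, falls back to the full watch for the same scope.
        pub fn plan(scope: &Scope, resume: Option<u64>, cursor_expired: bool) -> Call {
            match (scope, resume) {
                (Scope::All, Some(rev)) if !cursor_expired => Call::FullFrom(rev),
                (Scope::All, _) => Call::Full,
                (Scope::Prefix(p), Some(rev)) if !cursor_expired => Call::PrefixFrom(p.clone(), rev),
                (Scope::Prefix(p), _) => Call::Prefix(p.clone()),
            }
        }

        #[test]
        fn plan_covers_the_four_cases() {
            let prefix = Scope::Prefix("node.".to_string());
            assert_eq!(plan(&Scope::All, Some(9), false), Call::FullFrom(9));
            assert_eq!(plan(&prefix, None, false), Call::Prefix("node.".to_string()));
            assert_eq!(plan(&prefix, Some(5), true), Call::Prefix("node.".to_string()));
            assert_eq!(plan(&Scope::All, Some(5), true), Call::Full);
        }
    }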

    /// The watch task's terminal error must propagate out of `watch_applied`
    /// rather than being swallowed as `Ok(applied)` when the channel closes.
    #[tokio::test]
    async fn watch_task_error_propagates() {
        let watcher = Arc::new(ErrorWatcher);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let result = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await;

        match result {
            Err(KvError::WatchError(msg)) => {
                assert!(msg.contains("injected"), "error carries the cause: {msg}");
            }
            other => panic!("expected WatchError, got {other:?}"),
        }
    }

    /// A batch where `parse` accepts some updates and rejects others: the cursor
    /// must still advance to the highest *received* revision (covering the
    /// rejected entry in the middle), while `apply` sees only the accepted ones.
    #[tokio::test]
    async fn mixed_parse_advances_cursor_over_rejected_entries() {
        let updates = vec![
            put("keep.a", b"1", 5),
            put("skip.b", b"2", 6), // rejected by parse
            put("keep.c", b"3", 7),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ab = Arc::clone(&applied_batches);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            // Keep only keys under "keep."; reject everything else.
            |u: &KvUpdate| -> Option<Vec<u8>> {
                match u {
                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                    _ => None,
                }
            },
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(7),
            "cursor covers the rejected middle entry (rev 6)"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"3".to_vec()],
            "apply sees only the accepted entries"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }
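
    // Hypothetical model, not this crate's code: the flush step that
    // `mixed_parse_advances_cursor_over_rejected_entries` and
    // `shutdown_with_no_pending_batch` pin down, as a pure function. It assumes a
    // received update is a (revision, key, value) triple; `flush_model`,
    // `Update`, and `flush` are illustrative names. The ordering is the point:
    // `apply` runs first, and only then is the cursor reported and returned.
    mod flush_model {
        /// (revision, key, value) of one received update.
        pub type Update = (u64, &'static str, &'static str);

        pub fn flush<T>(
            cursor: Option<u64>,
            batch: &[Update],
            parse: impl Fn(&Update) -> Option<T>,
            mut apply: impl FnMut(Vec<T>),
            mut on_applied: impl FnMut(u64),
        ) -> Option<u64> {
            // Nothing received: the cursor stays at the resume position and
            // neither callback fires.
            let Some(high) = batch.iter().map(|u| u.0).max() else {
                return cursor;
            };
            // Rejected entries are withheld from `apply`, but their revisions
            // still count toward `high`, so the cursor covers them.
            let accepted: Vec<T> = batch.iter().filter_map(|u| parse(u)).collect();
            // Assumption: `apply` runs even if every entry in the batch was rejected.
            apply(accepted);
            // Report only after `apply` has returned; never move backwards.
            let next = cursor.map_or(high, |c| c.max(high));
            on_applied(next);
            Some(next)
        }

        #[test]
        fn cursor_covers_rejected_and_ignores_empty_batches() {
            let batch: [Update; 3] = [(5, "keep.a", "1"), (6, "skip.b", "2"), (7, "keep.c", "3")];
            let mut applied: Vec<&str> = Vec::new();
            let mut reported = None;
            let cur = flush(
                None,
                &batch,
                |u| if u.1.starts_with("keep.") { Some(u.2) } else { None },
                |vals| applied.extend(vals),
                |c| reported = Some(c),
            );
            assert_eq!(cur, Some(7));
            assert_eq!(applied, vec!["1", "3"]);
            assert_eq!(reported, Some(7));

            let idle = flush(
                Some(9),
                &[],
                |u| Some(u.2),
                |_| panic!("apply must not run"),
                |_| panic!("on_applied must not run"),
            );
            assert_eq!(idle, Some(9));
        }
    }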

    /// Shutdown before any update arrives: nothing was received, so the cursor
    /// stays at the resume position (here `none()`), `apply` never runs, and
    /// `on_applied` never fires.
    #[tokio::test(start_paused = true)]
    async fn shutdown_with_no_pending_batch() {
        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_calls = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let oc = Arc::clone(&on_applied_calls);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
            },
            move |_| {
                oc.fetch_add(1, Ordering::SeqCst);
            },
            sd_rx,
        ));

        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            None,
            "no updates received → cursor unmoved"
        );
        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
        assert_eq!(
            on_applied_calls.load(Ordering::SeqCst),
            0,
            "on_applied never fires"
        );
    }

    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
    /// compaction actually fires (every other snapshot test pins the threshold
    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
    /// snapshot must still load cleanly with the right cursor and entries.
    #[tokio::test]
    async fn snapshot_compaction_fires_and_stays_consistent() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        // threshold 0 → every checkpoint reports "needs compact", forcing the
        // spawn_blocking compaction branch on each flush.
        let writer = SnapshotWriter::open(&path, 0).unwrap();

        // Re-put the same key across flushes so compaction has duplicates to
        // dedup; small max forces multiple flushes (hence multiple compactions).
        let updates = vec![
            put("node.a", b"1", 1),
            put("node.a", b"2", 2),
            put("node.b", b"3", 3),
            put("node.a", b"4", 4),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(writer),
            BatchConfig {
                window: Duration::from_secs(3600),
                max: 1, // one update per flush → a compaction per update
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(4));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "compacted snapshot's cursor still equals the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
        assert_eq!(
            snap.entries["node.a"].value, b"4",
            "last write per key survives compaction"
        );
        assert_eq!(snap.entries["node.b"].value, b"3");
    }
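
    // Hypothetical model, not the snapshot module's code: what
    // `snapshot_compaction_fires_and_stays_consistent` requires compaction to
    // preserve. It assumes the checkpoint log is a revision-ordered list of
    // (revision, key, value) puts; `compaction_model` and `compact` are
    // illustrative names. Replaying the log into a map keeps the last write per
    // key, and the cursor is the highest revision seen, so deduplication changes
    // neither the surviving values nor the cursor.
    mod compaction_model {
        use std::collections::BTreeMap;

        pub fn compact(
            log: &[(u64, &'static str, &'static str)],
        ) -> (Option<u64>, BTreeMap<&'static str, &'static str>) {
            let mut entries = BTreeMap::new();
            for &(_, key, value) in log {
                // A later put for the same key overwrites the earlier one.
                entries.insert(key, value);
            }
            let cursor = log.iter().map(|e| e.0).max();
            (cursor, entries)
        }

        #[test]
        fn last_write_per_key_survives() {
            let log = [(1, "node.a", "1"), (2, "node.a", "2"), (3, "node.b", "3"), (4, "node.a", "4")];
            let (cursor, entries) = compact(&log);
            assert_eq!(cursor, Some(4));
            assert_eq!(entries.len(), 2);
            assert_eq!(entries["node.a"], "4");
            assert_eq!(entries["node.b"], "3");
        }
    }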
}