slipstream/applied.rs
//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied*: the caller's `apply()` has returned for
//! > it, or `parse` rejected it and there was nothing to apply. The cursor never
//! > advances on *receipt* of an update, only after it has durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it: a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* (the application of the
//! update) is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
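/*!

A minimal, self-contained model of the crash window described above. It is a
sketch under stated assumptions, not this crate's implementation: `Policy`,
`run_until_crash`, and `skipped_on_resume` are hypothetical names, revisions
are plain `u64`s starting at 1, and "apply" is modeled as moving a batch into
a `Vec`.

```rust
/// Which event is allowed to advance the persisted cursor (hypothetical).
#[derive(Clone, Copy)]
enum Policy {
    OnReceipt,
    AfterApply,
}

/// Feeds revisions `1..=last` into batches of `batch` updates and "crashes"
/// once `received_before_crash` updates have been received. Returns the
/// persisted cursor and the revisions that were actually applied.
fn run_until_crash(
    policy: Policy,
    last: u64,
    batch: usize,
    received_before_crash: usize,
) -> (u64, Vec<u64>) {
    let mut persisted = 0u64;
    let mut applied: Vec<u64> = Vec::new();
    let mut pending: Vec<u64> = Vec::new();
    for rev in (1..=last).take(received_before_crash) {
        pending.push(rev);
        if let Policy::OnReceipt = policy {
            // The shortcut: the cursor claims the update as soon as it arrives.
            persisted = rev;
        }
        if pending.len() == batch {
            // Stand-in for `apply()` running to completion.
            applied.append(&mut pending);
            if let Policy::AfterApply = policy {
                // The discipline: the cursor moves only once the batch is applied.
                persisted = *applied.last().unwrap();
            }
        }
    }
    // Crash: whatever is still in `pending` is lost with the process.
    (persisted, applied)
}

/// Revisions at or below the persisted cursor that were never applied.
/// A resume from `persisted` starts past them, so they are skipped for good.
fn skipped_on_resume(persisted: u64, applied: &[u64]) -> Vec<u64> {
    (1..=persisted).filter(|r| !applied.contains(r)).collect()
}
```

With a batch size of 3 and a crash after five receipts, the receipt-driven
policy persists cursor 5 although revisions 4 and 5 were never applied, so a
resume skips both. The apply-driven policy persists 3, and a resume from there
sees 4 and 5 again.
*/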
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine: the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
//!   applied cursor. Callers use it to persist the cursor for the next restart.
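/*!

How the three closures relate within one flush can be modeled synchronously.
This is only a sketch of the contract listed above, not the async
implementation in this module: `RawUpdate`, `flush_once`, and `run_script` are
hypothetical, and a plain `u64` stands in for the cursor type.

```rust
/// Hypothetical stand-in for a raw update: a revision plus raw bytes.
struct RawUpdate {
    rev: u64,
    bytes: Vec<u8>,
}

/// Synchronous model of one flush: parse each received update, apply the
/// survivors as a single batch, then report the highest received revision.
fn flush_once<U>(
    received: &[RawUpdate],
    mut parse: impl FnMut(&RawUpdate) -> Option<U>,
    mut apply: impl FnMut(Vec<U>),
    mut on_applied: impl FnMut(u64),
) {
    let Some(high) = received.last().map(|u| u.rev) else {
        return; // nothing received, so there is nothing to report
    };
    let batch: Vec<U> = received.iter().filter_map(|u| parse(u)).collect();
    if !batch.is_empty() {
        apply(batch); // returns before the cursor is reported
    }
    on_applied(high); // also covers updates that `parse` rejected
}

/// Runs the model over a fixed script; returns (applied values, reported cursors).
fn run_script() -> (Vec<String>, Vec<u64>) {
    let received = vec![
        RawUpdate { rev: 7, bytes: b"alpha".to_vec() },
        RawUpdate { rev: 8, bytes: vec![0xff, 0xfe] }, // not UTF-8, so `parse` rejects it
        RawUpdate { rev: 9, bytes: b"beta".to_vec() },
    ];
    let mut applied = Vec::new();
    let mut cursors = Vec::new();
    flush_once(
        &received,
        |u| String::from_utf8(u.bytes.clone()).ok(),
        |batch| applied.extend(batch),
        |cursor| cursors.push(cursor),
    );
    (applied, cursors)
}
```

In this script revision 8 fails to parse, so `apply` sees only `"alpha"` and
`"beta"`, while the single reported cursor is 9: the rejected update still
counts toward the cursor.
*/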
//!
//! ## Panics
//!
//! `apply` runs inline on the watch task. If it panics, the panic propagates out
//! of [`watch_applied`] and aborts the watch. That is the caller's contract,
//! the same as a panic in any other supplied closure.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::warn;

use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::SnapshotStore;

/// What to watch: every key, or every key under a prefix.
///
/// Mirrors the [`KvWatcher`] surface: `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`.
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
}

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` updates have accumulated.
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update); `max` caps memory and latency when updates
/// arrive faster than the window.
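/**

The either-bound rule can be pictured as a small gate that tracks when the
current batch opened and how many updates it holds. This is a sketch only:
`BatchGate` and its methods are hypothetical and are not part of this module,
which drives the same decision from its `select!` loop.

```rust
use std::time::{Duration, Instant};

/// Hypothetical either-bound gate: time window or count, whichever comes first.
struct BatchGate {
    window: Duration,
    max: usize,
    opened_at: Option<Instant>,
    len: usize,
}

impl BatchGate {
    fn new(window: Duration, max: usize) -> Self {
        Self { window, max, opened_at: None, len: 0 }
    }

    /// Records one update received at `now`. The first update of a batch opens
    /// the window. Returns true when the count bound forces a flush.
    fn push(&mut self, now: Instant) -> bool {
        if self.opened_at.is_none() {
            self.opened_at = Some(now);
        }
        self.len += 1;
        self.len >= self.max
    }

    /// True when a batch is open and its window has elapsed at `now`.
    fn window_elapsed(&self, now: Instant) -> bool {
        self.opened_at
            .is_some_and(|opened| now.duration_since(opened) >= self.window)
    }

    /// Closes the batch and returns how many updates it held, so the next
    /// update opens a fresh window.
    fn flush(&mut self) -> usize {
        self.opened_at = None;
        std::mem::take(&mut self.len)
    }
}
```

An idle gate never reports an elapsed window, which mirrors the guard that
keeps the timer from firing between batches.
*/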
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of parsed updates in a batch before forcing a flush. When
    /// a store is present, the same bound also applies to the raw received
    /// updates buffered for it.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates: the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / folds the batch into `store` / calls `on_applied`.
/// Returns the final applied cursor when the watch ends (shutdown signalled, or
/// the underlying stream closed).
///
/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
/// its own impl), or `None` to run without persistence. On each flush, *after*
/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
/// persisted cursor is always the post-apply cursor and never names a revision
/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
/// crash leaves the store consistent and resume re-folds only the tail.
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix`). Callers see
/// the full re-list as a stream of puts, exactly as the hand-rolled loops did.
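/**

The fallback shape can be sketched synchronously. `WatchErr` and
`resume_or_full` are hypothetical stand-ins used only for illustration; the
real path is async and goes through the [`KvWatcher`] methods named above.

```rust
/// Hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
enum WatchErr {
    CursorExpired,
    Other(String),
}

/// Takes the resume path when a position exists and falls back to a full watch
/// only on `CursorExpired`. Any other outcome, success or error, passes through.
fn resume_or_full<T>(
    cursor: Option<u64>,
    resume: impl FnOnce(u64) -> Result<T, WatchErr>,
    full: impl FnOnce() -> Result<T, WatchErr>,
) -> Result<T, WatchErr> {
    match cursor {
        Some(position) => match resume(position) {
            // Only an expired cursor triggers the full re-list.
            Err(WatchErr::CursorExpired) => full(),
            other => other,
        },
        // No position to resume from: start with the full watch.
        None => full(),
    }
}
```

Matching on the one recoverable error and returning everything else unchanged
keeps unrelated failures from being hidden behind a silent full re-list.
*/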
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline` for the next loop
// iteration. At the two flush sites that return immediately afterward (shutdown,
// channel-close) those resets are dead stores: correct, but flagged.
#[allow(unused_assignments)]
pub async fn watch_applied<U, S, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    mut store: Option<S>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    // `Send + 'static`: each flush moves `store` onto a blocking task to run its
    // (potentially blocking) `apply`, then takes it back, the same offload the
    // append log's compaction always used.
    S: SnapshotStore + Send + 'static,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is: everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(async move { run_watch(watcher.as_ref(), &scope, resume, tx).await })
    };

    // Batch state.
    //
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush, including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    //
    // `batch_cap` is only the pre-allocation size for the parsed batch: `max`
    // bounded to 1..=64 slots, so a large `max` does not reserve memory up front.
    let batch_cap = config.max.clamp(1, 64);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    // Raw received updates for the durable `store`, in revision order. Only
    // populated when a `store` is present; the store folds the *raw* updates
    // (including ones `parse` rejected, which are still part of the bucket's
    // state), whereas the parsed `batch` above is the consumer's domain view.
    let mut raw_batch: Vec<KvUpdate> = Vec::new();
    let mut batch_high = WatchCursor::none();
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop;
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch, in order: run the domain `apply` (if non-empty) to
    // completion, advance the cursor, fold the raw batch + cursor durably into
    // `store`, then fire `on_applied`. The store fold runs on a blocking task
    // (its `apply` may block on I/O), moving the store in and taking it back, the
    // same offload the append log's compaction always used. A store error is
    // logged and the watch continues (the snapshot is a cache); a panicked
    // blocking task drops the store irrecoverably, which breaks the
    // resume-after-restart guarantee, so it is surfaced as fatal.
    macro_rules! flush {
        () => {{
            // Nothing received since the last flush → nothing to do at all.
            if !batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind so each
                    // batch after the first doesn't re-climb the reallocation
                    // ladder (4→8→…→cap).
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                if !batch_high.is_none() {
                    applied = batch_high.clone();
                    if let Some(mut st) = store.take() {
                        let raw = std::mem::take(&mut raw_batch);
                        let cur = applied.clone();
                        // Hand the store back unconditionally on a clean return so
                        // a *failed* apply (Ok(Err)) keeps the watch running; only
                        // a *panicked* task (Err) loses the store and is fatal.
                        match tokio::task::spawn_blocking(move || {
                            let res = st.apply(&raw, &cur);
                            (st, res)
                        })
                        .await
                        {
                            Ok((st, Ok(()))) => store = Some(st),
                            Ok((st, Err(e))) => {
                                warn!(error = %e, "snapshot store apply failed; continuing");
                                store = Some(st);
                            }
                            Err(e) => {
                                warn!(error = %e, "snapshot store task panicked; aborting watch");
                                handle.abort();
                                return Err(KvError::WatchError(format!(
                                    "snapshot store task panicked: {e}"
                                )));
                            }
                        }
                    }
                    on_applied(applied.clone());
                    batch_high = WatchCursor::none();
                }
            }
            batch_deadline = None;
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration: one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel (they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume), and return the applied cursor.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush!();
            }

            update = rx.recv() => {
                match update {
                    Some(u) => {
                        // Cursor authority: every received update bumps the
                        // pending high-water, regardless of whether `parse`
                        // keeps it.
                        batch_high = WatchCursor::from_version(u.version().clone());

                        // Buffer the raw update for the durable store fold (which
                        // commits the whole batch + cursor atomically on flush).
                        // `parse` only borrows `u`, so the clone exists solely to
                        // give the store an owned copy; it is skipped when no
                        // store is present, so the no-persistence path never
                        // pays for it.
                        if store.is_some() {
                            raw_batch.push(u.clone());
                        }

                        if let Some(parsed) = parse(&u) {
                            batch.push(parsed);
                        }

                        // Arm the window on the first received update of a batch
                        // (even a parse-rejected one), so the cursor advances
                        // within `window` even through a run of irrelevant keys.
                        // Reset the pinned timer to the new deadline rather than
                        // allocating a fresh `Sleep`.
                        if batch_deadline.is_none() {
                            let deadline = tokio::time::Instant::now() + config.window;
                            sleep.as_mut().reset(deadline);
                            batch_deadline = Some(deadline);
                        }

                        // Flush on a full parsed batch, or, when persisting, a
                        // full raw batch, so a window packed with parse-rejected
                        // updates can't grow `raw_batch` without bound before the
                        // window elapses.
                        if batch.len() >= config.max || raw_batch.len() >= config.max {
                            flush!();
                        }
                    }
                    None => {
                        // Stream closed. Flush the remainder, then surface the
                        // watch task's terminal result: a clean end returns the
                        // applied cursor, an error propagates.
                        flush!();
                        return match handle.await {
                            Ok(Ok(())) => Ok(applied),
                            Ok(Err(e)) => Err(e),
                            Err(join) => Err(KvError::WatchError(format!(
                                "watch task panicked: {join}"
                            ))),
                        };
                    }
                }
            }
        }
    }
}

/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position, with the [`KvError::CursorExpired`] → full-watch fallback.
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Filtering into `resume_cursor` here
    // makes "we have a resume position" structural: there is no separate bool
    // whose truth a later edit could let drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): signal a "resync" to the caller so it can
                        // diff the full re-list against prior state and emit
                        // synthetic deletes for keys that vanished during the
                        // gap (see Snapshot::stale_keys). For v1 the full
                        // re-list is replayed as a stream of puts, matching the
                        // hand-rolled loops this combinator replaces.
                        warn!("watch cursor expired, falling back to full watch_all");
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): see the watch_all arm above.
                        warn!("watch cursor expired, falling back to full watch_prefix");
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::{KvEntry, VersionToken};
    use crate::snapshot::AppendLogSnapshot;
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio::sync::mpsc::Sender;

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        })
    }

    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        full: Mutex<Option<Vec<KvUpdate>>>,
        from: Mutex<Option<Vec<KvUpdate>>>,
        from_expires: bool,
        hold: bool,
    }

    impl MockWatcher {
        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
            Self {
                full: Mutex::new(Some(updates)),
                from: Mutex::new(None),
                from_expires: false,
                hold,
            }
        }

        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
            let updates = which.lock().unwrap().take().unwrap_or_default();
            for u in updates {
                if tx.send(u).await.is_err() {
                    return;
                }
            }
            if self.hold {
                // Keep `tx` alive (channel open) until this task is aborted.
                std::future::pending::<()>().await;
            }
        }
    }

    #[async_trait]
    impl KvWatcher for MockWatcher {
        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefixes(
            &self,
            _prefixes: &[&str],
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            // This mock scripts the applied-watch resumption tests, not prefix
            // filtering; it delivers the same `full` script as `watch_prefix`.
            // The real multi-filter scoping is proved in the NATS integration test.
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_all_from(
            &self,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
        // are exercised against the same `from` script. Without this the trait's
        // default impl would delegate to watch_prefix and silently deliver the
        // full set instead of the delta.
        async fn watch_prefix_from(
            &self,
            _prefix: &str,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }
    }

    /// A watcher whose entry points all fail. Used to prove the watch task's
    /// terminal error is surfaced out of `watch_applied` rather than swallowed
    /// as a clean `Ok(applied)` when the channel closes.
    struct ErrorWatcher;

    #[async_trait]
    impl KvWatcher for ErrorWatcher {
        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefixes(
            &self,
            _prefixes: &[&str],
            _tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }
    }

    // A minimal parse: keeps every Put as its value bytes and drops everything
    // else (deletes included).
    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
        match u {
            KvUpdate::Put(e) => Some(e.value.clone()),
            _ => None,
        }
    }

    /// The stream closes (hold = false) with a pending batch; the remainder is
    /// flushed before returning, the returned cursor is the last revision, and
    /// `on_applied` ran exactly once after `apply`.
    #[tokio::test]
    async fn flush_on_channel_close() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
        let watcher = Arc::new(MockWatcher::new(updates, false));

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));

        let ab = Arc::clone(&applied_batches);
        let oc = Arc::clone(&on_applied_cursors);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch| ab.lock().unwrap().push(batch),
            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(3));
        let batches = applied_batches.lock().unwrap();
        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
    }

    /// Fewer than `max` updates, then the channel idles: the window timer must
    /// flush them and advance the cursor.
    #[tokio::test(start_paused = true)]
    async fn flush_on_window() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let count = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let c = Arc::clone(&count);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| {
                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
            },
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Let the window (10ms) elapse under virtual time.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            count.load(Ordering::SeqCst),
            2,
            "window should have flushed"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);

        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
    }

    /// Exactly `max` updates fills a batch and flushes immediately, before the
    /// window would have elapsed.
    #[tokio::test(start_paused = true)]
    async fn flush_on_max() {
        let max = 4;
        let updates: Vec<_> = (1..=max as u64)
            .map(|i| put(&format!("k{i}"), b"v", i))
            .collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
        let f = Arc::clone(&flushes);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600), // effectively never
                max,
            },
            parse_put,
            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
            move |_| {},
            sd_rx,
        ));

        // Yield enough for the mock to push all `max` updates; the window is an
        // hour, so any flush is purely the max trigger.
        tokio::time::sleep(Duration::from_millis(1)).await;
        assert_eq!(
            *flushes.lock().unwrap(),
            vec![max],
            "a full batch should flush on max, not wait for the window"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// A pending batch plus a shutdown signal: the batch is flushed and the
    /// applied cursor returned.
    #[tokio::test(start_paused = true)]
    async fn flush_on_shutdown() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600), // window won't fire
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Give the mock time to deliver both updates into the pending batch.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            Some(2),
            "shutdown flushes the pending batch"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);
    }

    /// The cursor must not advance until `apply` has returned. We prove it by
    /// having `apply` read the cursor that `on_applied` last published: when the
    /// second batch is applied, the visible cursor must still be the *first*
    /// batch's, never the second's, which only becomes visible after this
    /// `apply` returns.
    #[tokio::test(start_paused = true)]
    async fn cursor_advances_only_after_apply() {
        // Two batches of `max` updates each.
        let max = 2usize;
        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        // Cursor as last published by on_applied; starts at 0 (nothing applied).
        let published = Arc::new(AtomicU64::new(0));
        // What `apply` observed as the published cursor at the moment it ran.
        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));

        let pub_for_apply = Arc::clone(&published);
        let seen = Arc::clone(&seen_at_apply);
        let pub_for_on = Arc::clone(&published);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600),
                max,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                // The cursor visible here is whatever the PREVIOUS flush
                // published, never this batch's, because we haven't returned.
                seen.lock()
                    .unwrap()
                    .push(pub_for_apply.load(Ordering::SeqCst));
            },
            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();

        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
        // batch's cursor), NOT 4. The cursor only reached 4 after the second
        // apply returned.
        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
        assert_eq!(published.load(Ordering::SeqCst), 4);
    }

    /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
    /// domain work, but they were still received, so the cursor must advance
    /// over them.
    #[tokio::test]
    async fn corrupt_parse_entries_advance_cursor() {
        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            // Reject everything: simulates corrupt/irrelevant entries.
            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
            move |batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
                assert!(batch.is_empty());
            },
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
        assert_eq!(
            apply_calls.load(Ordering::SeqCst),
            0,
            "an all-rejected batch applies nothing"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// A resume whose cursor has expired falls back to the full watch and still
    /// applies the delivered updates.
    #[tokio::test]
    async fn cursor_expired_falls_back_to_full_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "fallback full watch's updates were applied"
        );
    }

    /// End-to-end with a real snapshot file: after the run, the persisted
834 /// snapshot's cursor equals the applied cursor and its entries match the
835 /// applied state — proving the checkpoint is written at the post-apply
836 /// cursor, never ahead of it.
837 #[tokio::test]
838 async fn snapshot_checkpoint_matches_applied_cursor() {
839 let dir = tempfile::TempDir::new().unwrap();
840 let path = dir.path().join("applied.snap");
841 let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
842
843 let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
844 let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
845 let (_sd_tx, sd_rx) = watch::channel(false);
846
847 let cursor = watch_applied(
848 watcher,
849 WatchScope::All,
850 None,
851 Some(store),
852 BatchConfig::default(),
853 parse_put,
854 move |_batch: Vec<Vec<u8>>| {},
855 move |_| {},
856 sd_rx,
857 )
858 .await
859 .unwrap();
860
861 assert_eq!(cursor.as_u64(), Some(2));
862
863 let snap = crate::snapshot::load(&path).unwrap().unwrap();
864 assert_eq!(
865 snap.cursor.as_u64(),
866 cursor.as_u64(),
867 "snapshot checkpoint cursor must equal the applied cursor"
868 );
869 assert_eq!(snap.entries.len(), 2);
870 assert_eq!(snap.entries["node.a"].value, b"1");
871 assert_eq!(snap.entries["node.b"].value, b"2");
872 }
873
    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
    /// delta (the `from` script, NOT the full set) is applied. Proves the
    /// resume branch delivers only post-cursor updates and advances to their
    /// max revision.
    #[tokio::test]
    async fn resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a non-empty distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(9)), // resume past rev 9; not expired
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied, never the full set"
        );
    }

    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
    /// applies the delivered updates. Every other test uses `WatchScope::All`;
    /// this covers the prefix dispatch arm.
    #[tokio::test]
    async fn prefix_scope_applies_delivered_updates() {
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }

    /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
    /// full `watch_prefix` and still applies the delivered updates. This is the
    /// prefix twin of `cursor_expired_falls_back_to_full_watch`.
    #[tokio::test]
    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "prefix fallback full watch's updates were applied"
        );
    }

    /// The watch task's terminal error must propagate out of `watch_applied`
    /// rather than being swallowed as `Ok(applied)` when the channel closes.
    #[tokio::test]
    async fn watch_task_error_propagates() {
        let watcher = Arc::new(ErrorWatcher);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let result = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await;

        match result {
            Err(KvError::WatchError(msg)) => {
                assert!(msg.contains("injected"), "error carries the cause: {msg}");
            }
            other => panic!("expected WatchError, got {other:?}"),
        }
    }

    /// A batch where `parse` accepts some updates and rejects others: the cursor
    /// must still advance to the highest *received* revision (covering the
    /// rejected entry in the middle), while `apply` sees only the accepted ones.
    #[tokio::test]
    async fn mixed_parse_advances_cursor_over_rejected_entries() {
        let updates = vec![
            put("keep.a", b"1", 5),
            put("skip.b", b"2", 6), // rejected by parse
            put("keep.c", b"3", 7),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ab = Arc::clone(&applied_batches);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            // Keep only keys under "keep."; reject everything else.
            |u: &KvUpdate| -> Option<Vec<u8>> {
                match u {
                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                    _ => None,
                }
            },
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(7),
            "cursor covers the rejected middle entry (rev 6)"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"3".to_vec()],
            "apply sees only the accepted entries"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// Shutdown before any update arrives: nothing was received, so the cursor
    /// stays at the resume position (here `none()`), `apply` never runs, and
    /// `on_applied` never fires.
    #[tokio::test(start_paused = true)]
    async fn shutdown_with_no_pending_batch() {
        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_calls = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let oc = Arc::clone(&on_applied_calls);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
            },
            move |_| {
                oc.fetch_add(1, Ordering::SeqCst);
            },
            sd_rx,
        ));

        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            None,
            "no updates received → cursor unmoved"
        );
        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
        assert_eq!(
            on_applied_calls.load(Ordering::SeqCst),
            0,
            "on_applied never fires"
        );
    }

    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
    /// compaction actually fires (every other snapshot test pins the threshold
    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
    /// snapshot must still load cleanly with the right cursor and entries.
    #[tokio::test]
    async fn snapshot_compaction_fires_and_stays_consistent() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        // threshold 0 → every checkpoint reports "needs compact", forcing the
        // store's inline-compaction branch on each flush (run off the hot path via
        // spawn_blocking inside watch_applied).
        let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();

        // Re-put the same key across flushes so compaction has duplicates to
        // deduplicate; `max: 1` below forces one flush per update (hence
        // multiple compactions).
        let updates = vec![
            put("node.a", b"1", 1),
            put("node.a", b"2", 2),
            put("node.b", b"3", 3),
            put("node.a", b"4", 4),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(store),
            BatchConfig {
                window: Duration::from_secs(3600),
                max: 1, // one update per flush → a compaction per update
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(4));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "compacted snapshot's cursor still equals the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
        assert_eq!(
            snap.entries["node.a"].value, b"4",
            "last write per key survives compaction"
        );
        assert_eq!(snap.entries["node.b"].value, b"3");
    }
}