slipstream/applied.rs
//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
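//!
//! A minimal, self-contained model of that hole, offered as a hedged sketch: a
//! toy loop over revisions `1..=n` with a crash injected after revision
//! `crash_at` is received but before it is applied. `CursorOn` and
//! `run_with_crash` are hypothetical names for illustration, not part of this
//! crate's API.
//!
//! ```rust
//! // Hypothetical: which step of the toy loop writes the persisted cursor.
//! #[derive(Clone, Copy)]
//! enum CursorOn {
//!     Receipt,
//!     Apply,
//! }
//!
//! // Hypothetical: run until the injected crash, then "restart" and resume
//! // strictly after the persisted cursor. Returns every revision that was
//! // ever applied, in order.
//! fn run_with_crash(n: u64, crash_at: u64, policy: CursorOn) -> Vec<u64> {
//!     let mut applied = Vec::new();
//!     let mut persisted = 0u64;
//!
//!     // First run.
//!     for rev in 1..=n {
//!         if matches!(policy, CursorOn::Receipt) {
//!             persisted = rev; // cursor bumped as soon as `rev` is received
//!         }
//!         if rev == crash_at {
//!             break; // crash: `rev` was received but never applied
//!         }
//!         applied.push(rev); // stands in for the caller's `apply`
//!         if matches!(policy, CursorOn::Apply) {
//!             persisted = rev; // cursor bumped only after apply returned
//!         }
//!     }
//!
//!     // Restart: resume from the revision after the persisted cursor.
//!     applied.extend((persisted + 1)..=n);
//!     applied
//! }
//! ```
//!
//! With `n = 5` and a crash at revision 3, the receipt policy applies
//! `[1, 2, 4, 5]` (revision 3 is silently skipped), while the apply policy
//! re-delivers revision 3 and applies `[1, 2, 3, 4, 5]`.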
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings. It is not called for a flush in which
//!   `parse` rejected every update.
//! - `on_applied`: fires once for every flush that received at least one update,
//!   *after* `apply` returns (and after the snapshot store, if any, has folded
//!   the batch), with the new applied cursor. Callers use it to persist the
//!   cursor for the next restart.
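//!
//! A synchronous sketch of how the three closures interact in one flush. It
//! assumes updates are modeled as hypothetical `(revision, raw)` pairs that
//! arrive in ascending revision order; the real combinator is async and takes
//! [`KvUpdate`]s, and `flush_model` is an illustrative name, not this crate's
//! API:
//!
//! ```rust
//! // Hypothetical model of one flush. Returns the new applied cursor, or
//! // `None` if nothing was received.
//! fn flush_model<U>(
//!     received: &[(u64, &str)],
//!     mut parse: impl FnMut(&str) -> Option<U>,
//!     mut apply: impl FnMut(Vec<U>),
//!     mut on_applied: impl FnMut(u64),
//! ) -> Option<u64> {
//!     // Ascending order is assumed, so the last received revision is the
//!     // high-water mark; it counts updates that `parse` rejects.
//!     let high = received.last().map(|&(rev, _)| rev)?;
//!     let batch: Vec<U> = received.iter().filter_map(|&(_, raw)| parse(raw)).collect();
//!     if !batch.is_empty() {
//!         apply(batch); // returns before the cursor is published
//!     }
//!     on_applied(high);
//!     Some(high)
//! }
//! ```
//!
//! With `parse` set to `|s: &str| s.parse::<u32>().ok()`, the input
//! `[(5, "1"), (6, "junk"), (7, "3")]` applies `[1, 3]` and reports cursor 7:
//! the rejected revision 6 contributes nothing to `apply`, but the cursor still
//! moves past it.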
//!
//! ## Panics
//!
//! `apply` runs inline on the task that awaits [`watch_applied`], not on the
//! spawned task that drives the underlying watch. If it panics, the panic
//! propagates out of [`watch_applied`] and ends the watch; that is the caller's
//! contract, the same as a panic in any other supplied closure.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::warn;

use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::SnapshotStore;

/// What to watch: every key, or every key under a prefix.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`.
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
}

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` updates have accumulated.
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update); `max` caps memory and latency when updates
/// arrive faster than the window.
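///
/// The flush rule can be sketched as a pure predicate. This is a simplified,
/// hypothetical model (`should_flush` is not part of this crate): it uses one
/// count of updates in the batch, whereas [`watch_applied`] compares the parsed
/// batch and, when a snapshot store is present, the raw batch against `max`.
///
/// ```rust
/// use std::time::Duration;
///
/// // Hypothetical: flush once the batch has been open for `window`, or once it
/// // holds `max` updates, whichever happens first. An empty batch never
/// // flushes, because the window is only armed when an update arrives.
/// fn should_flush(open_for: Duration, len: usize, window: Duration, max: usize) -> bool {
///     len > 0 && (open_for >= window || len >= max)
/// }
/// ```
///
/// With the default 10 ms / 100 policy, 5 updates open for 3 ms stay batched,
/// the same 5 updates flush once 10 ms have passed, and 100 updates flush
/// immediately.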
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of updates in a batch before forcing a flush: parsed
    /// updates, or raw received updates when a snapshot store is present.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / folds the batch into `store` / calls `on_applied`.
/// Returns the final applied cursor when the watch ends cleanly: shutdown was
/// signalled (or the shutdown sender was dropped), or the underlying stream
/// closed without error. If the stream instead ends because the watch task
/// failed or panicked, or a snapshot-store task panics, the failure is returned
/// as `Err`.
///
/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
/// its own impl) — or `None` to run without persistence. On each flush, *after*
/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
/// persisted cursor is always the post-apply cursor and never names a revision
/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
/// crash leaves the store consistent and resume re-folds only the tail.
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix`). Callers see
/// the full re-list as a stream of puts, exactly as the hand-rolled loops did.
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
/// - `S`: the [`SnapshotStore`] backend behind `store`.
/// - `P`, `A`, `O`: the concrete types of the `parse`, `apply`, and
///   `on_applied` closures.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline` for the next loop
// iteration. At the two flush sites that return immediately afterward (shutdown,
// channel-close) those resets are dead stores — correct, but flagged.
#[allow(unused_assignments)]
pub async fn watch_applied<U, S, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    mut store: Option<S>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    // `Send + 'static`: each flush moves `store` onto a blocking task to run its
    // (potentially blocking) `apply`, then takes it back — the same offload the
    // append log's compaction always used.
    S: SnapshotStore + Send + 'static,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is — everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(async move { run_watch(watcher.as_ref(), &scope, resume, tx).await })
    };

    // Batch state.
    //
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush — including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    let batch_cap = config.max.clamp(1, 64);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    // Raw received updates for the durable `store`, in revision order. Only
    // populated when a `store` is present; the store folds the *raw* updates
    // (including ones `parse` rejected — they are still part of the bucket's
    // state), whereas the parsed `batch` above is the consumer's domain view.
    let mut raw_batch: Vec<KvUpdate> = Vec::new();
    let mut batch_high = WatchCursor::none();
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop —
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch, in order: run the domain `apply` (if non-empty) to
    // completion, advance the cursor, fold the raw batch + cursor durably into
    // `store`, then fire `on_applied`. The store fold runs on a blocking task
    // (its `apply` may block on I/O), moving the store in and taking it back — the
    // same offload the append log's compaction always used. A store error is
    // logged and the watch continues (the snapshot is a cache); a panicked
    // blocking task drops the store irrecoverably, which breaks the
    // resume-after-restart guarantee, so it is surfaced as fatal.
    macro_rules! flush {
        () => {{
            // Nothing received since the last flush → nothing to do at all.
            if !batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind so each
                    // batch after the first doesn't re-climb the reallocation
                    // ladder (4→8→…→cap).
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                if !batch_high.is_none() {
                    applied = batch_high.clone();
                    if let Some(mut st) = store.take() {
                        let raw = std::mem::take(&mut raw_batch);
                        let cur = applied.clone();
                        // Hand the store back unconditionally on a clean return so
                        // a *failed* apply (Ok(Err)) keeps the watch running; only
                        // a *panicked* task (Err) loses the store and is fatal.
                        match tokio::task::spawn_blocking(move || {
                            let res = st.apply(&raw, &cur);
                            (st, res)
                        })
                        .await
                        {
                            Ok((st, Ok(()))) => store = Some(st),
                            Ok((st, Err(e))) => {
                                warn!(error = %e, "snapshot store apply failed; continuing");
                                store = Some(st);
                            }
                            Err(e) => {
                                warn!(error = %e, "snapshot store task panicked; aborting watch");
                                handle.abort();
                                return Err(KvError::WatchError(format!(
                                    "snapshot store task panicked: {e}"
                                )));
                            }
                        }
                    }
                    on_applied(applied.clone());
                    batch_high = WatchCursor::none();
                }
            }
            batch_deadline = None;
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration — one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel — they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume — and return the applied cursor.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush!();
            }

            update = rx.recv() => {
                match update {
                    Some(u) => {
                        // Cursor authority: every received update bumps the
                        // pending high-water, regardless of whether `parse`
                        // keeps it.
                        batch_high = WatchCursor::from_version(u.version().clone());

                        if let Some(parsed) = parse(&u) {
                            batch.push(parsed);
                        }

                        // Buffer the raw update for the durable store fold (which
                        // commits the whole batch + cursor atomically on flush).
                        // `parse` only borrowed `u`, so it can be moved in here
                        // without a clone; nothing is buffered when no store is
                        // present, so the no-persistence path pays nothing extra.
                        if store.is_some() {
                            raw_batch.push(u);
                        }

                        // Arm the window on the first received update of a batch
                        // — even a parse-rejected one, so the cursor advances
                        // within `window` even through a run of irrelevant keys.
                        // Reset the pinned timer to the new deadline rather than
                        // allocating a fresh `Sleep`.
                        if batch_deadline.is_none() {
                            let deadline = tokio::time::Instant::now() + config.window;
                            sleep.as_mut().reset(deadline);
                            batch_deadline = Some(deadline);
                        }

                        // Flush on a full parsed batch, or — when persisting — a
                        // full raw batch, so a window packed with parse-rejected
                        // updates can't grow `raw_batch` without bound before the
                        // window elapses.
                        if batch.len() >= config.max || raw_batch.len() >= config.max {
                            flush!();
                        }
                    }
                    None => {
                        // Stream closed. Flush the remainder, then surface the
                        // watch task's terminal result: a clean end returns the
                        // applied cursor, an error propagates.
                        flush!();
                        return match handle.await {
                            Ok(Ok(())) => Ok(applied),
                            Ok(Err(e)) => Err(e),
                            Err(join) => Err(KvError::WatchError(format!(
                                "watch task panicked: {join}"
                            ))),
                        };
                    }
                }
            }
        }
    }
}

/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position, with the [`KvError::CursorExpired`] → full-watch fallback.
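///
/// A hedged synchronous sketch of this dispatch, built on hypothetical
/// stand-ins: `WatchErr` plays [`KvError`], a bare `u64` plays [`WatchCursor`]
/// with `0` standing in for `WatchCursor::none()`, and the two closures play the
/// `*_from` and full watch calls. Only an expired cursor triggers the fallback;
/// any other outcome of the resume path is passed through unchanged.
///
/// ```rust
/// // Hypothetical stand-in for `KvError`.
/// #[derive(Debug, PartialEq)]
/// enum WatchErr {
///     CursorExpired,
///     Other(&'static str),
/// }
///
/// fn resume_or_full(
///     resume: Option<u64>,
///     watch_from: impl FnOnce(u64) -> Result<&'static str, WatchErr>,
///     watch_full: impl FnOnce() -> Result<&'static str, WatchErr>,
/// ) -> Result<&'static str, WatchErr> {
///     // An absent or zero cursor carries no position: go straight to a full watch.
///     match resume.filter(|&c| c != 0) {
///         Some(c) => match watch_from(c) {
///             Err(WatchErr::CursorExpired) => watch_full(),
///             other => other,
///         },
///         None => watch_full(),
///     }
/// }
/// ```
///
/// Resuming from `Some(9)` against an expiring resume path yields the full
/// watch's result, while a different error from that path is returned as-is;
/// `None` and `Some(0)` skip the resume path entirely.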
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Filtering here, and binding `cursor`
    // only inside the `if let Some(cursor)` arms below, makes "we have a resume
    // position" structural: there is no separate bool whose truth a later edit
    // could let drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): signal a "resync" to the caller so it can
                        // diff the full re-list against prior state and emit
                        // synthetic deletes for keys that vanished during the
                        // gap (see Snapshot::stale_keys). For v1 the full
                        // re-list is replayed as a stream of puts, matching the
                        // hand-rolled loops this combinator replaces.
                        warn!("watch cursor expired, falling back to full watch_all");
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): see the watch_all arm above.
                        warn!("watch cursor expired, falling back to full watch_prefix");
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::{KvEntry, VersionToken};
    use crate::snapshot::AppendLogSnapshot;
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio::sync::mpsc::Sender;

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        })
    }

    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        full: Mutex<Option<Vec<KvUpdate>>>,
        from: Mutex<Option<Vec<KvUpdate>>>,
        from_expires: bool,
        hold: bool,
    }

    impl MockWatcher {
        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
            Self {
                full: Mutex::new(Some(updates)),
                from: Mutex::new(None),
                from_expires: false,
                hold,
            }
        }

        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
            let updates = which.lock().unwrap().take().unwrap_or_default();
            for u in updates {
                if tx.send(u).await.is_err() {
                    return;
                }
            }
            if self.hold {
                // Keep `tx` alive (channel open) until this task is aborted.
                std::future::pending::<()>().await;
            }
        }
    }

    #[async_trait]
    impl KvWatcher for MockWatcher {
        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_all_from(
            &self,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
        // are exercised against the same `from` script. Without this the trait's
        // default impl would delegate to watch_prefix and silently deliver the
        // full set instead of the delta.
        async fn watch_prefix_from(
            &self,
            _prefix: &str,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }
    }

    /// A watcher whose entry points all fail. Used to prove the watch task's
    /// terminal error is surfaced out of `watch_applied` rather than swallowed
    /// as a clean `Ok(applied)` when the channel closes.
    struct ErrorWatcher;

    #[async_trait]
    impl KvWatcher for ErrorWatcher {
        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }
    }

    // A trivial parse: keeps each Put's value bytes and drops deletes.
    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
        match u {
            KvUpdate::Put(e) => Some(e.value.clone()),
            _ => None,
        }
    }

    /// The stream closes (hold = false) with a pending batch: the remainder is
    /// flushed before returning, every update is applied in order, the returned
    /// cursor is the last revision, and the final `on_applied` call reports that
    /// same cursor.
    #[tokio::test]
    async fn flush_on_channel_close() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
        let watcher = Arc::new(MockWatcher::new(updates, false));

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));

        let ab = Arc::clone(&applied_batches);
        let oc = Arc::clone(&on_applied_cursors);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch| ab.lock().unwrap().push(batch),
            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(3));
        let batches = applied_batches.lock().unwrap();
        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
    }

    /// Fewer than `max` updates, then the channel idles: the window timer must
    /// flush them and advance the cursor.
    #[tokio::test(start_paused = true)]
    async fn flush_on_window() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let count = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let c = Arc::clone(&count);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| {
                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
            },
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Let the window (10ms) elapse under virtual time.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            count.load(Ordering::SeqCst),
            2,
            "window should have flushed"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);

        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
    }

    /// Receiving exactly `max` updates fills a batch and flushes it immediately,
    /// before the window would have elapsed.
    #[tokio::test(start_paused = true)]
    async fn flush_on_max() {
        let max = 4;
        let updates: Vec<_> = (1..=max as u64)
            .map(|i| put(&format!("k{i}"), b"v", i))
            .collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
        let f = Arc::clone(&flushes);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600), // effectively never
                max,
            },
            parse_put,
            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
            move |_| {},
            sd_rx,
        ));

        // Yield enough for the mock to push all `max` updates; the window is an
        // hour, so any flush is purely the max trigger.
        tokio::time::sleep(Duration::from_millis(1)).await;
        assert_eq!(
            *flushes.lock().unwrap(),
            vec![max],
            "a full batch should flush on max, not wait for the window"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// A pending batch plus a shutdown signal: the batch is flushed and the
    /// applied cursor returned.
    #[tokio::test(start_paused = true)]
    async fn flush_on_shutdown() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600), // window won't fire
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Give the mock time to deliver both updates into the pending batch.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            Some(2),
            "shutdown flushes the pending batch"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);
    }

    /// The cursor must not advance until `apply` has returned. We prove it by
    /// having `apply` read the cursor that `on_applied` last published: when the
    /// second batch is applied, the visible cursor must still be the *first*
    /// batch's — never the second's, which only becomes visible after this
    /// `apply` returns.
    #[tokio::test(start_paused = true)]
    async fn cursor_advances_only_after_apply() {
        // Two batches of `max` updates each.
        let max = 2usize;
        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        // Cursor as last published by on_applied; starts at 0 (nothing applied).
        let published = Arc::new(AtomicU64::new(0));
        // What `apply` observed as the published cursor at the moment it ran.
        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));

        let pub_for_apply = Arc::clone(&published);
        let seen = Arc::clone(&seen_at_apply);
        let pub_for_on = Arc::clone(&published);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600),
                max,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                // The cursor visible here is whatever the PREVIOUS flush
                // published — never this batch's, because we haven't returned.
                seen.lock()
                    .unwrap()
                    .push(pub_for_apply.load(Ordering::SeqCst));
            },
            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();

        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
        // batch's cursor), NOT 4. The cursor only reached 4 after the second
        // apply returned.
        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
        assert_eq!(published.load(Ordering::SeqCst), 4);
    }

    /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
    /// domain work, but they were still received — so the cursor must advance
    /// over them.
    #[tokio::test]
    async fn corrupt_parse_entries_advance_cursor() {
        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            // Reject everything — simulates corrupt/irrelevant entries.
            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
            move |batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
                assert!(batch.is_empty());
            },
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
        assert_eq!(
            apply_calls.load(Ordering::SeqCst),
            0,
            "an all-rejected batch applies nothing"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// A resume whose cursor has expired falls back to the full watch and still
    /// applies the delivered updates.
    #[tokio::test]
    async fn cursor_expired_falls_back_to_full_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "fallback full watch's updates were applied"
        );
    }

    /// End-to-end with a real snapshot file: after the run, the persisted
    /// snapshot's cursor equals the applied cursor and its entries match the
    /// applied state, i.e. the final checkpoint sits exactly at the post-apply
    /// cursor rather than ahead of it.
    #[tokio::test]
    async fn snapshot_checkpoint_matches_applied_cursor() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();

        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(store),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "snapshot checkpoint cursor must equal the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.entries["node.a"].value, b"1");
        assert_eq!(snap.entries["node.b"].value, b"2");
    }

    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
    /// delta (the `from` script, NOT the full set) is applied. Proves the
    /// resume branch delivers only post-cursor updates and advances to their
    /// max revision.
    #[tokio::test]
859 async fn resume_from_cursor_delivers_only_delta() {
860 let mock = MockWatcher {
861 // `full` would be delivered only if the resume path were (wrongly)
862 // bypassed; a non-empty distinguishing value makes that visible.
863 full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
864 from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
865 from_expires: false,
866 hold: false,
867 };
868 let watcher = Arc::new(mock);
869
870 let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
871 let ab = Arc::clone(&applied_batches);
872 let (_sd_tx, sd_rx) = watch::channel(false);
873
874 let cursor = watch_applied(
875 watcher,
876 WatchScope::All,
877 Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
878 None::<AppendLogSnapshot>,
879 BatchConfig::default(),
880 parse_put,
881 move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
882 move |_| {},
883 sd_rx,
884 )
885 .await
886 .unwrap();
887
888 assert_eq!(
889 cursor.as_u64(),
890 Some(11),
891 "cursor advances to the delta max"
892 );
893 assert_eq!(
894 *applied_batches.lock().unwrap(),
895 vec![b"3".to_vec(), b"4".to_vec()],
896 "only the post-cursor delta is applied, never the full set"
897 );
898 }
899
    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
    /// applies the delivered updates. Most tests use `WatchScope::All`; this
    /// covers the non-resume prefix dispatch arm.
    #[tokio::test]
    async fn prefix_scope_applies_delivered_updates() {
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after delivering

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }

    /// A `WatchScope::Prefix` resume whose cursor has expired falls back to the
    /// full `watch_prefix` and still applies the delivered updates. This is the
    /// prefix twin of `cursor_expired_falls_back_to_full_watch`.
    #[tokio::test]
    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "prefix fallback full watch's updates were applied"
        );
    }

    /// The watch task's terminal error must propagate out of `watch_applied`
    /// rather than being swallowed as `Ok(applied)` when the channel closes.
    #[tokio::test]
    async fn watch_task_error_propagates() {
        let watcher = Arc::new(ErrorWatcher);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let result = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await;

        match result {
            Err(KvError::WatchError(msg)) => {
                assert!(msg.contains("injected"), "error carries the cause: {msg}");
            }
            other => panic!("expected WatchError, got {other:?}"),
        }
    }

    /// A batch where `parse` accepts some updates and rejects others: the cursor
    /// must still advance to the highest *received* revision (covering the
    /// rejected entry in the middle), while `apply` sees only the accepted ones.
    #[tokio::test]
    async fn mixed_parse_advances_cursor_over_rejected_entries() {
        let updates = vec![
            put("keep.a", b"1", 5),
            put("skip.b", b"2", 6), // rejected by parse
            put("keep.c", b"3", 7),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after delivering

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ab = Arc::clone(&applied_batches);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            // Keep only keys under "keep."; reject everything else.
            |u: &KvUpdate| -> Option<Vec<u8>> {
                match u {
                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                    _ => None,
                }
            },
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(7),
            "cursor covers the rejected middle entry (rev 6)"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"3".to_vec()],
            "apply sees only the accepted entries"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// Shutdown before any update arrives: nothing was received, so the cursor
    /// stays at the resume position (here `none()`), `apply` never runs, and
    /// `on_applied` never fires.
    #[tokio::test(start_paused = true)]
    async fn shutdown_with_no_pending_batch() {
        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_calls = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let oc = Arc::clone(&on_applied_calls);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
            },
            move |_| {
                oc.fetch_add(1, Ordering::SeqCst);
            },
            sd_rx,
        ));

        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            None,
            "no updates received → cursor unmoved"
        );
        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
        assert_eq!(
            on_applied_calls.load(Ordering::SeqCst),
            0,
            "on_applied never fires"
        );
    }

    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
    /// compaction actually fires (every other snapshot test pins the threshold
    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
    /// snapshot must still load cleanly with the right cursor and entries.
    #[tokio::test]
    async fn snapshot_compaction_fires_and_stays_consistent() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        // Threshold 0: every checkpoint reports "needs compact", forcing the
        // store's compaction branch on every flush. watch_applied runs that
        // compaction off the async hot path via spawn_blocking.
        let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();

        // Re-put the same key across flushes so compaction has duplicates to
        // dedup; `max: 1` below forces a flush per update (hence multiple
        // compactions).
        let updates = vec![
            put("node.a", b"1", 1),
            put("node.a", b"2", 2),
            put("node.b", b"3", 3),
            put("node.a", b"4", 4),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after delivering
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(store),
            BatchConfig {
                window: Duration::from_secs(3600),
                max: 1, // one update per flush → a compaction per update
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(4));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "compacted snapshot's cursor still equals the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
        assert_eq!(
            snap.entries["node.a"].value, b"4",
            "last write per key survives compaction"
        );
        assert_eq!(snap.entries["node.b"].value, b"3");
    }
}