//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
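/*!
The crash window described in this section can be modelled in a few lines.
This is a minimal sketch, not code from this crate: `cursor_on_receipt`,
`cursor_after_apply`, and `skipped_on_resume` are hypothetical helpers, and
plain `u64` revisions stand in for real cursor positions.

```rust
/// Cursor a restart would resume from if it is bumped as each update is
/// received, before any `apply` has run (the tempting shortcut).
fn cursor_on_receipt(buffered: &[u64]) -> u64 {
    buffered.iter().copied().max().unwrap_or(0)
}

/// Cursor a restart would resume from if it only advances over revisions
/// whose `apply` returned before the crash.
fn cursor_after_apply(buffered: &[u64], applied_before_crash: usize) -> u64 {
    buffered
        .iter()
        .take(applied_before_crash)
        .copied()
        .max()
        .unwrap_or(0)
}

/// Revisions that were never applied yet sit at or below `cursor`, so a
/// resume from `cursor` would silently skip them.
fn skipped_on_resume(buffered: &[u64], applied_before_crash: usize, cursor: u64) -> Vec<u64> {
    buffered
        .iter()
        .skip(applied_before_crash)
        .copied()
        .filter(|rev| *rev <= cursor)
        .collect()
}
```

With revisions `[1, 2, 3]` buffered and only the first applied before a crash,
the receipt-based cursor is 3 and a resume skips revisions 2 and 3; the
apply-based cursor is 1 and nothing is skipped.
*/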
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
//!   applied cursor. Callers use it to persist the cursor for the next restart.
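/*!
How the three closures interact during a single flush can be sketched
synchronously. This is an illustration under stated assumptions, not the
crate's implementation: `RawUpdate` and `flush_once` are hypothetical
stand-ins, revisions are plain `u64`s, and the store fold and batching timer
are left out.

```rust
/// Hypothetical stand-in for a raw update: (revision, payload).
type RawUpdate = (u64, String);

/// Run one flush over `received` (assumed to be in revision order).
/// Returns the new applied cursor, or `None` if nothing was received.
fn flush_once<U>(
    received: &[RawUpdate],
    mut parse: impl FnMut(&RawUpdate) -> Option<U>,
    mut apply: impl FnMut(Vec<U>),
    mut on_applied: impl FnMut(u64),
) -> Option<u64> {
    // Highest received revision, counting updates that `parse` rejects.
    let high = received.last().map(|(rev, _)| *rev)?;

    // Domain view of the batch: only the updates `parse` keeps.
    let batch: Vec<U> = received.iter().filter_map(|u| parse(u)).collect();

    // `apply` runs to completion before the cursor is reported.
    if !batch.is_empty() {
        apply(batch);
    }

    // Only now is the cursor published.
    on_applied(high);
    Some(high)
}
```

Feeding it revisions 5, 6, 7 with a `parse` that rejects all three still
returns `Some(7)` and calls `on_applied(7)` once, while `apply` is never
invoked.
*/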
//!
//! ## Panics
//!
//! `apply` runs inline on the watch task. If it panics, the panic propagates out
//! of [`watch_applied`] and aborts the watch — that is the caller's contract,
//! the same as a panic in any other supplied closure.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::warn;

use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::SnapshotStore;

/// What to watch: every key, or every key under a prefix.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`.
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
}

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` updates have accumulated.
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update); `max` caps memory and latency when updates
/// arrive faster than the window.
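/**
The either-bound rule can be written as a single predicate. This is a
simplified sketch: `should_flush` is a hypothetical helper, not part of this
crate, and it assumes the caller tracks when the batch opened and how many
updates it holds.

```rust
use std::time::{Duration, Instant};

/// True once the batch has been open for at least `window`, or holds at
/// least `max` updates, whichever happens first.
fn should_flush(
    opened_at: Instant,
    now: Instant,
    len: usize,
    window: Duration,
    max: usize,
) -> bool {
    now.duration_since(opened_at) >= window || len >= max
}
```

With the default 10 ms / 100 policy, a batch of 3 updates flushes once 10 ms
have passed, and a batch that reaches 100 updates flushes immediately.
*/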
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of parsed updates in a batch before forcing a flush.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / folds the batch into `store` / calls `on_applied`.
/// Returns the final applied cursor when the watch ends (shutdown signalled, or
/// the underlying stream closed).
///
/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
/// its own impl) — or `None` to run without persistence. On each flush, *after*
/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
/// persisted cursor is always the post-apply cursor and never names a revision
/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
/// crash leaves the store consistent and resume re-folds only the tail.
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix`). Callers see
/// the full re-list as a stream of puts, exactly as the hand-rolled loops did.
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline` for the next loop
// iteration. At the two flush sites that return immediately afterward (shutdown,
// channel-close) those resets are dead stores — correct, but flagged.
#[allow(unused_assignments)]
pub async fn watch_applied<U, S, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    mut store: Option<S>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    // `Send + 'static`: each flush moves `store` onto a blocking task to run its
    // (potentially blocking) `apply`, then takes it back — the same offload the
    // append log's compaction always used.
    S: SnapshotStore + Send + 'static,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is — everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(async move { run_watch(watcher.as_ref(), &scope, resume, tx).await })
    };

    // Batch state.
    //
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush — including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    let batch_cap = config.max.clamp(1, 64);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    // Raw received updates for the durable `store`, in revision order. Only
    // populated when a `store` is present; the store folds the *raw* updates
    // (including ones `parse` rejected — they are still part of the bucket's
    // state), whereas the parsed `batch` above is the consumer's domain view.
    let mut raw_batch: Vec<KvUpdate> = Vec::new();
    let mut batch_high = WatchCursor::none();
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop —
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch, in order: run the domain `apply` (if non-empty) to
    // completion, advance the cursor, fold the raw batch + cursor durably into
    // `store`, then fire `on_applied`. The store fold runs on a blocking task
    // (its `apply` may block on I/O), moving the store in and taking it back — the
    // same offload the append log's compaction always used. A store error is
    // logged and the watch continues (the snapshot is a cache); a panicked
    // blocking task drops the store irrecoverably, which breaks the
    // resume-after-restart guarantee, so it is surfaced as fatal.
    macro_rules! flush {
        () => {{
            // Nothing received since the last flush → nothing to do at all.
            if !batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind so each
                    // batch after the first doesn't re-climb the reallocation
                    // ladder (4→8→…→cap).
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                if !batch_high.is_none() {
                    applied = batch_high.clone();
                    if let Some(mut st) = store.take() {
                        let raw = std::mem::take(&mut raw_batch);
                        let cur = applied.clone();
                        // Hand the store back unconditionally on a clean return so
                        // a *failed* apply (Ok(Err)) keeps the watch running; only
                        // a *panicked* task (Err) loses the store and is fatal.
                        match tokio::task::spawn_blocking(move || {
                            let res = st.apply(&raw, &cur);
                            (st, res)
                        })
                        .await
                        {
                            Ok((st, Ok(()))) => store = Some(st),
                            Ok((st, Err(e))) => {
                                warn!(error = %e, "snapshot store apply failed; continuing");
                                store = Some(st);
                            }
                            Err(e) => {
                                warn!(error = %e, "snapshot store task panicked; aborting watch");
                                handle.abort();
                                return Err(KvError::WatchError(format!(
                                    "snapshot store task panicked: {e}"
                                )));
                            }
                        }
                    }
                    on_applied(applied.clone());
                    batch_high = WatchCursor::none();
                }
            }
            batch_deadline = None;
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration — one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel — they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume — and return the applied cursor.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush!();
            }

            update = rx.recv() => {
                match update {
                    Some(u) => {
                        // Cursor authority: every received update bumps the
                        // pending high-water, regardless of whether `parse`
                        // keeps it.
                        batch_high = WatchCursor::from_version(u.version().clone());

                        // Buffer the raw update for the durable store fold (which
                        // commits the whole batch + cursor atomically on flush).
                        // Done before `parse` consumes `u` by reference, and only
                        // when a store is present so the no-persistence path keeps
                        // its zero-copy cost.
                        if store.is_some() {
                            raw_batch.push(u.clone());
                        }

                        if let Some(parsed) = parse(&u) {
                            batch.push(parsed);
                        }

                        // Arm the window on the first received update of a batch
                        // — even a parse-rejected one, so the cursor advances
                        // within `window` even through a run of irrelevant keys.
                        // Reset the pinned timer to the new deadline rather than
                        // allocating a fresh `Sleep`.
                        if batch_deadline.is_none() {
                            let deadline = tokio::time::Instant::now() + config.window;
                            sleep.as_mut().reset(deadline);
                            batch_deadline = Some(deadline);
                        }

                        // Flush on a full parsed batch, or — when persisting — a
                        // full raw batch, so a window packed with parse-rejected
                        // updates can't grow `raw_batch` without bound before the
                        // window elapses.
                        if batch.len() >= config.max || raw_batch.len() >= config.max {
                            flush!();
                        }
                    }
                    None => {
                        // Stream closed. Flush the remainder, then surface the
                        // watch task's terminal result: a clean end returns the
                        // applied cursor, an error propagates.
                        flush!();
                        return match handle.await {
                            Ok(Ok(())) => Ok(applied),
                            Ok(Err(e)) => Err(e),
                            Err(join) => Err(KvError::WatchError(format!(
                                "watch task panicked: {join}"
                            ))),
                        };
                    }
                }
            }
        }
    }
}

/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position, with the [`KvError::CursorExpired`] → full-watch fallback.
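/**
The resume-then-fall-back shape can be shown in isolation. This is a
synchronous sketch with hypothetical names (`WatchErr`, `watch_with_fallback`),
using closures in place of the real watcher methods; it is not this crate's
API.

```rust
/// Hypothetical error type standing in for the watcher's error enum.
#[derive(Debug, PartialEq)]
enum WatchErr {
    CursorExpired,
    Other(String),
}

/// Try the resume path when a cursor is present; if the backend reports the
/// cursor as expired, fall back to a full watch. Any other outcome of the
/// resume path, success or error, is passed through unchanged.
fn watch_with_fallback(
    resume: Option<u64>,
    mut watch_from: impl FnMut(u64) -> Result<(), WatchErr>,
    mut watch_full: impl FnMut() -> Result<(), WatchErr>,
) -> Result<(), WatchErr> {
    match resume {
        Some(cursor) => match watch_from(cursor) {
            Err(WatchErr::CursorExpired) => watch_full(),
            other => other,
        },
        None => watch_full(),
    }
}
```

Only `CursorExpired` triggers the fallback, so a genuine failure on the resume
path still reaches the caller instead of being masked by a full re-list.
*/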
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Binding `resume_cursor` here makes
    // "we have a resume position" structural: there is no separate bool whose
    // truth a later edit could let drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): signal a "resync" to the caller so it can
                        // diff the full re-list against prior state and emit
                        // synthetic deletes for keys that vanished during the
                        // gap (see Snapshot::stale_keys). For v1 the full
                        // re-list is replayed as a stream of puts, matching the
                        // hand-rolled loops this combinator replaces.
                        warn!("watch cursor expired, falling back to full watch_all");
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): see the watch_all arm above.
                        warn!("watch cursor expired, falling back to full watch_prefix");
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::{KvEntry, VersionToken};
    use crate::snapshot::AppendLogSnapshot;
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio::sync::mpsc::Sender;

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        })
    }

    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        full: Mutex<Option<Vec<KvUpdate>>>,
        from: Mutex<Option<Vec<KvUpdate>>>,
        from_expires: bool,
        hold: bool,
    }

    impl MockWatcher {
        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
            Self {
                full: Mutex::new(Some(updates)),
                from: Mutex::new(None),
                from_expires: false,
                hold,
            }
        }

        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
            let updates = which.lock().unwrap().take().unwrap_or_default();
            for u in updates {
                if tx.send(u).await.is_err() {
                    return;
                }
            }
            if self.hold {
                // Keep `tx` alive (channel open) until this task is aborted.
                std::future::pending::<()>().await;
            }
        }
    }

    #[async_trait]
    impl KvWatcher for MockWatcher {
        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_all_from(
            &self,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
        // are exercised against the same `from` script. Without this the trait's
        // default impl would delegate to watch_prefix and silently deliver the
        // full set instead of the delta.
        async fn watch_prefix_from(
            &self,
            _prefix: &str,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }
    }

    /// A watcher whose entry points all fail. Used to prove the watch task's
    /// terminal error is surfaced out of `watch_applied` rather than swallowed
    /// as a clean `Ok(applied)` when the channel closes.
    struct ErrorWatcher;

    #[async_trait]
    impl KvWatcher for ErrorWatcher {
        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }
    }

    // A trivial parse: keeps each Put as its raw value bytes and drops
    // everything else (e.g. deletes).
    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
        match u {
            KvUpdate::Put(e) => Some(e.value.clone()),
            _ => None,
        }
    }

    /// The stream closes (hold = false) with a pending batch; the remainder is
    /// flushed before returning, the returned cursor is the last revision, and
    /// `on_applied` ran exactly once after `apply`.
    #[tokio::test]
    async fn flush_on_channel_close() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
        let watcher = Arc::new(MockWatcher::new(updates, false));

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));

        let ab = Arc::clone(&applied_batches);
        let oc = Arc::clone(&on_applied_cursors);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch| ab.lock().unwrap().push(batch),
            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(3));
        let batches = applied_batches.lock().unwrap();
        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
    }

    /// Fewer than `max` updates, then the channel idles: the window timer must
    /// flush them and advance the cursor.
    #[tokio::test(start_paused = true)]
    async fn flush_on_window() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let count = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let c = Arc::clone(&count);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| {
                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
            },
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Let the window (10ms) elapse under virtual time.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            count.load(Ordering::SeqCst),
            2,
            "window should have flushed"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);

        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
    }

    /// Exactly `max` updates fills a batch and flushes immediately — before the
    /// window would have elapsed.
    #[tokio::test(start_paused = true)]
    async fn flush_on_max() {
        let max = 4;
        let updates: Vec<_> = (1..=max as u64)
            .map(|i| put(&format!("k{i}"), b"v", i))
            .collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
        let f = Arc::clone(&flushes);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600), // effectively never
                max,
            },
            parse_put,
            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
            move |_| {},
            sd_rx,
        ));

        // Yield enough for the mock to push all `max` updates; the window is an
        // hour, so any flush is purely the max trigger.
        tokio::time::sleep(Duration::from_millis(1)).await;
        assert_eq!(
            *flushes.lock().unwrap(),
            vec![max],
            "a full batch should flush on max, not wait for the window"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// A pending batch plus a shutdown signal: the batch is flushed and the
    /// applied cursor returned.
    #[tokio::test(start_paused = true)]
    async fn flush_on_shutdown() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600), // window won't fire
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Give the mock time to deliver both updates into the pending batch.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            Some(2),
            "shutdown flushes the pending batch"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);
    }

    /// The cursor must not advance until `apply` has returned. We prove it by
    /// having `apply` read the cursor that `on_applied` last published: when the
    /// second batch is applied, the visible cursor must still be the *first*
    /// batch's — never the second's, which only becomes visible after this
    /// `apply` returns.
    #[tokio::test(start_paused = true)]
    async fn cursor_advances_only_after_apply() {
        // Two batches of `max` updates each.
        let max = 2usize;
        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        // Cursor as last published by on_applied; starts at 0 (nothing applied).
        let published = Arc::new(AtomicU64::new(0));
        // What `apply` observed as the published cursor at the moment it ran.
        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));

        let pub_for_apply = Arc::clone(&published);
        let seen = Arc::clone(&seen_at_apply);
        let pub_for_on = Arc::clone(&published);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600),
                max,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                // The cursor visible here is whatever the PREVIOUS flush
                // published — never this batch's, because we haven't returned.
                seen.lock()
                    .unwrap()
                    .push(pub_for_apply.load(Ordering::SeqCst));
            },
            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();

        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
        // batch's cursor), NOT 4. The cursor only reached 4 after the second
        // apply returned.
        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
        assert_eq!(published.load(Ordering::SeqCst), 4);
    }

    /// Updates whose `parse` returns `None` (corrupt / irrelevant) carry no
    /// domain work, but they were still received — so the cursor must advance
    /// over them.
    #[tokio::test]
    async fn corrupt_parse_entries_advance_cursor() {
        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            // Reject everything — simulates corrupt/irrelevant entries.
            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
            move |batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
                assert!(batch.is_empty());
            },
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
        assert_eq!(
            apply_calls.load(Ordering::SeqCst),
            0,
            "an all-rejected batch applies nothing"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// A resume whose cursor has expired falls back to the full watch and still
    /// applies the delivered updates.
    #[tokio::test]
    async fn cursor_expired_falls_back_to_full_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "fallback full watch's updates were applied"
        );
    }

    /// End-to-end with a real snapshot file: after the run, the persisted
    /// snapshot's cursor equals the applied cursor and its entries match the
    /// applied state — proving the checkpoint is written at the post-apply
    /// cursor, never ahead of it.
    #[tokio::test]
    async fn snapshot_checkpoint_matches_applied_cursor() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();

        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(store),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "snapshot checkpoint cursor must equal the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.entries["node.a"].value, b"1");
        assert_eq!(snap.entries["node.b"].value, b"2");
    }

    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
    /// delta (the `from` script, NOT the full set) is applied. Proves the
    /// resume branch delivers only post-cursor updates and advances to their
    /// max revision.
    #[tokio::test]
    async fn resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a non-empty distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(9)), // resume past rev 9 — not expired
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied, never the full set"
        );
    }

    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
    /// applies the delivered updates. Every other test uses `WatchScope::All`;
    /// this covers the prefix dispatch arm.
    #[tokio::test]
    async fn prefix_scope_applies_delivered_updates() {
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }

    /// `WatchScope::Prefix` resume whose cursor has expired falls back to the
    /// full `watch_prefix` and still applies the delivered updates — the prefix
    /// twin of `cursor_expired_falls_back_to_full_watch`.
    #[tokio::test]
    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "prefix fallback full watch's updates were applied"
        );
    }

    /// The watch task's terminal error must propagate out of `watch_applied`
    /// rather than being swallowed as `Ok(applied)` when the channel closes.
    #[tokio::test]
    async fn watch_task_error_propagates() {
        let watcher = Arc::new(ErrorWatcher);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let result = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await;

        match result {
            Err(KvError::WatchError(msg)) => {
                assert!(msg.contains("injected"), "error carries the cause: {msg}");
            }
            other => panic!("expected WatchError, got {other:?}"),
        }
    }

    /// A batch where `parse` accepts some updates and rejects others: the cursor
    /// must still advance to the highest *received* revision (covering the
    /// rejected entry in the middle), while `apply` sees only the accepted ones.
    #[tokio::test]
    async fn mixed_parse_advances_cursor_over_rejected_entries() {
        let updates = vec![
            put("keep.a", b"1", 5),
            put("skip.b", b"2", 6), // rejected by parse
            put("keep.c", b"3", 7),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ab = Arc::clone(&applied_batches);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            // Keep only keys under "keep."; reject everything else.
            |u: &KvUpdate| -> Option<Vec<u8>> {
                match u {
                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                    _ => None,
                }
            },
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(7),
            "cursor covers the rejected middle entry (rev 6)"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"3".to_vec()],
            "apply sees only the accepted entries"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// Shutdown before any update arrives: nothing was received, so the cursor
    /// stays at the resume position (here `none()`), `apply` never runs, and
    /// `on_applied` never fires.
    #[tokio::test(start_paused = true)]
    async fn shutdown_with_no_pending_batch() {
        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_calls = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let oc = Arc::clone(&on_applied_calls);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
            },
            move |_| {
                oc.fetch_add(1, Ordering::SeqCst);
            },
            sd_rx,
        ));

        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            None,
            "no updates received → cursor unmoved"
        );
        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
        assert_eq!(
            on_applied_calls.load(Ordering::SeqCst),
            0,
            "on_applied never fires"
        );
    }

    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
    /// compaction actually fires (every other snapshot test pins the threshold
    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
    /// snapshot must still load cleanly with the right cursor and entries.
    #[tokio::test]
    async fn snapshot_compaction_fires_and_stays_consistent() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        // threshold 0 → every checkpoint reports "needs compact", forcing the
        // store's inline-compaction branch on each flush (run off the hot path via
        // spawn_blocking inside watch_applied).
        let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();

        // Re-put the same key across flushes so compaction has duplicates to
        // dedup; small max forces multiple flushes (hence multiple compactions).
        let updates = vec![
            put("node.a", b"1", 1),
            put("node.a", b"2", 2),
            put("node.b", b"3", 3),
            put("node.a", b"4", 4),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(store),
            BatchConfig {
                window: Duration::from_secs(3600),
                max: 1, // one update per flush → a compaction per update
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(4));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "compacted snapshot's cursor still equals the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
        assert_eq!(
            snap.entries["node.a"].value, b"4",
            "last write per key survives compaction"
        );
        assert_eq!(snap.entries["node.b"].value, b"3");
    }
}