//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
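//!
//! A toy model of the difference, assuming a log of integer revisions, a run
//! that crashes after *receiving* one batch but before applying it, and a
//! resumed run that replays every revision strictly above the persisted
//! cursor. `Rule` and `lost_revisions` are illustrative names, not part of
//! this crate:
//!
//! ```rust
//! // Which cursor-advance rule the (hypothetical) watch loop follows.
//! #[derive(Clone, Copy)]
//! enum Rule {
//!     // Bump the persisted cursor as soon as a batch is received.
//!     OnReceipt,
//!     // Bump the persisted cursor only after the batch's apply has returned.
//!     AfterApply,
//! }
//!
//! // Returns the revisions that neither the crashed run nor the resumed run
//! // ever applied. `crash_at` is the index of the batch that is received but
//! // never applied.
//! fn lost_revisions(log: &[u64], batch: usize, crash_at: usize, rule: Rule) -> Vec<u64> {
//!     let mut applied: Vec<u64> = Vec::new();
//!     let mut persisted = 0u64;
//!
//!     for (i, chunk) in log.chunks(batch).enumerate() {
//!         let high = *chunk.last().unwrap();
//!         if let Rule::OnReceipt = rule {
//!             persisted = high; // cursor claims the batch before apply runs
//!         }
//!         if i == crash_at {
//!             break; // crash: received, never applied
//!         }
//!         applied.extend_from_slice(chunk); // the caller's apply()
//!         if let Rule::AfterApply = rule {
//!             persisted = high; // cursor written from apply's completion
//!         }
//!     }
//!
//!     // Resume: the watch restarts strictly after the persisted cursor.
//!     applied.extend(log.iter().copied().filter(|&rev| rev > persisted));
//!     log.iter().copied().filter(|rev| !applied.contains(rev)).collect()
//! }
//! ```
//!
//! With `log = [1, 2, 3, 4, 5, 6]`, batches of 2 and a crash on the second
//! batch, `Rule::OnReceipt` loses revisions 3 and 4, while `Rule::AfterApply`
//! loses nothing because the resumed run picks them up.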
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic; for the tunnel router it swaps the route table, for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per flush, *after* `apply` returns, with the new
//!   applied cursor. Callers use it to persist the cursor for the next restart.
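//!
//! A sketch of the three roles, assuming a simplified stand-in for
//! [`KvUpdate`] (`Update`, with a plain `u64` revision) and a hypothetical
//! `flush` helper that reproduces only the ordering described above: parse
//! every received update, apply the parsed batch, then report the highest
//! *received* revision. `swap_routes` is an illustrative route-table consumer.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Hypothetical stand-in for `KvUpdate`.
//! struct Update {
//!     key: String,
//!     value: Vec<u8>,
//!     rev: u64,
//! }
//!
//! // `parse`: only `route/*` keys with UTF-8 values become domain updates;
//! // everything else maps to `None` but is still "received".
//! fn parse(u: &Update) -> Option<(String, String)> {
//!     let name = u.key.strip_prefix("route/")?;
//!     let target = String::from_utf8(u.value.clone()).ok()?;
//!     Some((name.to_string(), target))
//! }
//!
//! // One flush: `apply` runs to completion before `on_applied` sees the cursor.
//! fn flush(
//!     received: &[Update],
//!     mut apply: impl FnMut(Vec<(String, String)>),
//!     mut on_applied: impl FnMut(u64),
//! ) {
//!     let parsed: Vec<_> = received.iter().filter_map(parse).collect();
//!     if !parsed.is_empty() {
//!         apply(parsed);
//!     }
//!     // Rejected updates still count: the cursor is the last *received* revision.
//!     if let Some(last) = received.last() {
//!         on_applied(last.rev);
//!     }
//! }
//!
//! // `apply` for a route table: build the next table, then swap it in whole.
//! fn swap_routes(routes: &mut HashMap<String, String>, batch: Vec<(String, String)>) {
//!     let mut next = routes.clone();
//!     next.extend(batch);
//!     *routes = next;
//! }
//! ```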
//!
//! ## Panics
//!
//! `apply` runs inline on the watch task. If it panics, the panic propagates out
//! of [`watch_applied`] and aborts the watch — that is the caller's contract,
//! the same as a panic in any other supplied closure.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::warn;

use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::SnapshotStore;

/// What to watch: every key, or every key under a prefix.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`.
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
}

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` updates have accumulated.
/// The window amortizes the cost of `apply` (e.g. one route-table clone per
/// flush instead of one per update); `max` caps memory and latency when updates
/// arrive faster than the window.
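///
/// The flush rule can be modelled without a runtime. A sketch, assuming
/// integer milliseconds in place of `Instant` and a hypothetical `Batcher`
/// type (not part of this crate): a batch opens on its first update and is
/// flushed on `max` or on window expiry, whichever comes first.
///
/// ```rust
/// struct Batcher {
///     window_ms: u64,
///     max: usize,
///     // Time the open batch received its first update; `None` while idle.
///     opened_at: Option<u64>,
///     pending: Vec<u64>,
/// }
///
/// impl Batcher {
///     fn new(window_ms: u64, max: usize) -> Self {
///         Self { window_ms, max, opened_at: None, pending: Vec::new() }
///     }
///
///     // Buffers `rev` received at `now_ms`; flushes at once if `max` is hit.
///     fn push(&mut self, rev: u64, now_ms: u64) -> Option<Vec<u64>> {
///         if self.opened_at.is_none() {
///             self.opened_at = Some(now_ms); // first update arms the window
///         }
///         self.pending.push(rev);
///         if self.pending.len() >= self.max { self.take() } else { None }
///     }
///
///     // Flushes the open batch if its window has elapsed by `now_ms`.
///     fn tick(&mut self, now_ms: u64) -> Option<Vec<u64>> {
///         match self.opened_at {
///             Some(t) if now_ms >= t + self.window_ms => self.take(),
///             _ => None,
///         }
///     }
///
///     fn take(&mut self) -> Option<Vec<u64>> {
///         self.opened_at = None; // back to idle: the window is disarmed
///         Some(std::mem::take(&mut self.pending))
///     }
/// }
/// ```
///
/// With `window_ms = 10` and `max = 3`, three updates arriving within 2 ms
/// flush on `max`; a lone update at 5 ms waits for the window and flushes at
/// 15 ms.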
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of updates in a batch before forcing a flush. Counts
    /// parsed updates and, when a snapshot `store` is attached, raw received
    /// updates too, so a run of parse-rejected updates also forces a flush.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / folds the batch into `store` / calls `on_applied`.
/// Returns the final applied cursor when the watch ends (shutdown signalled, or
/// the underlying stream closed).
///
/// `store` is any [`SnapshotStore`] backend the consumer chose (the in-RAM
/// [`AppendLogSnapshot`](crate::AppendLogSnapshot) default, an on-disk backend, or
/// its own impl) — or `None` to run without persistence. On each flush, *after*
/// `apply` returns, the whole batch of raw [`KvUpdate`]s is handed to
/// `store.apply(batch, applied_cursor)` on a blocking task, so the store's
/// persisted cursor is always the post-apply cursor and never names a revision
/// whose `apply` had not returned. The store fold is atomic (data + cursor), so a
/// crash leaves the store consistent and resume re-folds only the tail.
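///
/// The move-in/take-back offload can be sketched with `std::thread` standing in
/// for `tokio::task::spawn_blocking` (both surface a panic in the closure as an
/// `Err` from the join). `MemStore` and `fold` are hypothetical; only the
/// three-way outcome mirrors the real flush: a clean return hands the store
/// back, a store error is logged and the store kept, and a panic loses the
/// store and is fatal.
///
/// ```rust
/// use std::thread;
///
/// // Hypothetical store: records `(cursor, batch_len)` per fold; `fail`
/// // simulates an I/O error.
/// struct MemStore {
///     folds: Vec<(u64, usize)>,
///     fail: bool,
/// }
///
/// impl MemStore {
///     fn apply(&mut self, raw: &[u64], cursor: u64) -> Result<(), String> {
///         if self.fail {
///             return Err("disk full".to_string());
///         }
///         self.folds.push((cursor, raw.len()));
///         Ok(())
///     }
/// }
///
/// // Called only after the domain `apply` has returned, so `cursor` is
/// // already the post-apply cursor.
/// fn fold(store: MemStore, raw: Vec<u64>, cursor: u64) -> Result<MemStore, String> {
///     let worker = thread::spawn(move || {
///         let mut st = store;
///         let res = st.apply(&raw, cursor);
///         (st, res) // hand the store back alongside the result
///     });
///     match worker.join() {
///         Ok((st, Ok(()))) => Ok(st),
///         Ok((st, Err(e))) => {
///             eprintln!("snapshot store apply failed; continuing: {e}");
///             Ok(st) // the snapshot is a cache: keep watching
///         }
///         Err(_) => Err("snapshot store task panicked".to_string()),
///     }
/// }
/// ```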
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix`). Callers see
/// the full re-list as a stream of puts, exactly as the hand-rolled loops did.
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
/// - `S`: the [`SnapshotStore`] backend. To run without persistence, pass
///   `None::<T>` with some concrete store type `T` (the tests use
///   `None::<AppendLogSnapshot>`), since `S` cannot be inferred from a bare `None`.
/// - `P`, `A`, `O`: the `parse`, `apply` and `on_applied` closures respectively.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline` for the next loop
// iteration. At the two flush sites that return immediately afterward (shutdown,
// channel-close) those resets are dead stores — correct, but flagged.
#[allow(unused_assignments)]
pub async fn watch_applied<U, S, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    mut store: Option<S>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    // `Send + 'static`: each flush moves `store` onto a blocking task to run its
    // (potentially blocking) `apply`, then takes it back — the same offload the
    // append log's compaction always used.
    S: SnapshotStore + Send + 'static,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is — everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(async move { run_watch(watcher.as_ref(), &scope, resume, tx).await })
    };

    // Batch state.
    //
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush — including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    let batch_cap = config.max.clamp(1, 64);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    // Raw received updates for the durable `store`, in revision order. Only
    // populated when a `store` is present; the store folds the *raw* updates
    // (including ones `parse` rejected — they are still part of the bucket's
    // state), whereas the parsed `batch` above is the consumer's domain view.
    let mut raw_batch: Vec<KvUpdate> = Vec::new();
    let mut batch_high = WatchCursor::none();
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop —
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch, in order: run the domain `apply` (if non-empty) to
    // completion, advance the cursor, fold the raw batch + cursor durably into
    // `store`, then fire `on_applied`. The store fold runs on a blocking task
    // (its `apply` may block on I/O), moving the store in and taking it back — the
    // same offload the append log's compaction always used. A store error is
    // logged and the watch continues (the snapshot is a cache); a panicked
    // blocking task drops the store irrecoverably, which breaks the
    // resume-after-restart guarantee, so it is surfaced as fatal.
    macro_rules! flush {
        () => {{
            // Nothing received since the last flush → nothing to do at all.
            if !batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind so each
                    // batch after the first doesn't re-climb the reallocation
                    // ladder (4→8→…→cap).
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                if !batch_high.is_none() {
                    applied = batch_high.clone();
                    if let Some(mut st) = store.take() {
                        let raw = std::mem::take(&mut raw_batch);
                        let cur = applied.clone();
                        // Hand the store back unconditionally on a clean return so
                        // a *failed* apply (Ok(Err)) keeps the watch running; only
                        // a *panicked* task (Err) loses the store and is fatal.
                        match tokio::task::spawn_blocking(move || {
                            let res = st.apply(&raw, &cur);
                            (st, res)
                        })
                        .await
                        {
                            Ok((st, Ok(()))) => store = Some(st),
                            Ok((st, Err(e))) => {
                                warn!(error = %e, "snapshot store apply failed; continuing");
                                store = Some(st);
                            }
                            Err(e) => {
                                warn!(error = %e, "snapshot store task panicked; aborting watch");
                                handle.abort();
                                return Err(KvError::WatchError(format!(
                                    "snapshot store task panicked: {e}"
                                )));
                            }
                        }
                    }
                    on_applied(applied.clone());
                    batch_high = WatchCursor::none();
                }
            }
            batch_deadline = None;
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration — one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel — they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume — and return the applied cursor.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush!();
            }

            update = rx.recv() => {
                match update {
                    Some(u) => {
                        // Cursor authority: every received update bumps the
                        // pending high-water, regardless of whether `parse`
                        // keeps it.
                        batch_high = WatchCursor::from_version(u.version().clone());

                        // Buffer the raw update for the durable store fold (which
                        // commits the whole batch + cursor atomically on flush).
                        // Only done when a store is present, so the no-persistence
                        // path never pays for the clone.
                        if store.is_some() {
                            raw_batch.push(u.clone());
                        }

                        if let Some(parsed) = parse(&u) {
                            batch.push(parsed);
                        }

                        // Arm the window on the first received update of a batch
                        // — even a parse-rejected one, so the cursor advances
                        // within `window` even through a run of irrelevant keys.
                        // Reset the pinned timer to the new deadline rather than
                        // allocating a fresh `Sleep`.
                        if batch_deadline.is_none() {
                            let deadline = tokio::time::Instant::now() + config.window;
                            sleep.as_mut().reset(deadline);
                            batch_deadline = Some(deadline);
                        }

                        // Flush on a full parsed batch, or — when persisting — a
                        // full raw batch, so a window packed with parse-rejected
                        // updates can't grow `raw_batch` without bound before the
                        // window elapses.
                        if batch.len() >= config.max || raw_batch.len() >= config.max {
                            flush!();
                        }
                    }
                    None => {
                        // Stream closed. Flush the remainder, then surface the
                        // watch task's terminal result: a clean end returns the
                        // applied cursor, an error propagates.
                        flush!();
                        return match handle.await {
                            Ok(Ok(())) => Ok(applied),
                            Ok(Err(e)) => Err(e),
                            Err(join) => Err(KvError::WatchError(format!(
                                "watch task panicked: {join}"
                            ))),
                        };
                    }
                }
            }
        }
    }
}

/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position, with the [`KvError::CursorExpired`] → full-watch fallback.
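///
/// The fallback shape, sketched against a hypothetical synchronous `Source`
/// trait (the real [`KvWatcher`] is async and streams into an `mpsc` channel).
/// Revision `0` stands in for `WatchCursor::none()`, and the `Log` test double
/// models a history whose oldest revisions have been compacted away:
///
/// ```rust
/// #[derive(Debug, PartialEq)]
/// enum WatchErr {
///     CursorExpired,
///     Other(String),
/// }
///
/// trait Source {
///     fn watch_from(&self, rev: u64, out: &mut Vec<u64>) -> Result<(), WatchErr>;
///     fn watch_full(&self, out: &mut Vec<u64>) -> Result<(), WatchErr>;
/// }
///
/// // Resume from a real position; on `CursorExpired` fall back to a full
/// // watch. Any other result (success or error) is returned unchanged.
/// fn run(src: &dyn Source, resume: Option<u64>, out: &mut Vec<u64>) -> Result<(), WatchErr> {
///     match resume.filter(|&rev| rev != 0) {
///         Some(rev) => match src.watch_from(rev, out) {
///             Err(WatchErr::CursorExpired) => src.watch_full(out),
///             other => other,
///         },
///         None => src.watch_full(out),
///     }
/// }
///
/// // Test double: `revs` holds only the revisions still retained.
/// struct Log {
///     revs: Vec<u64>,
/// }
///
/// impl Source for Log {
///     fn watch_from(&self, rev: u64, out: &mut Vec<u64>) -> Result<(), WatchErr> {
///         // The delta is exact only if every revision after `rev` is retained.
///         let oldest = self.revs.first().copied().unwrap_or(0);
///         if rev + 1 < oldest {
///             return Err(WatchErr::CursorExpired);
///         }
///         out.extend(self.revs.iter().copied().filter(|&r| r > rev));
///         Ok(())
///     }
///
///     fn watch_full(&self, out: &mut Vec<u64>) -> Result<(), WatchErr> {
///         out.extend(self.revs.iter().copied());
///         Ok(())
///     }
/// }
/// ```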
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Filtering into `resume_cursor` here and
    // binding `cursor` with `if let` below makes "we have a resume position"
    // structural: there is no separate bool whose truth a later edit could let
    // drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): signal a "resync" to the caller so it can
                        // diff the full re-list against prior state and emit
                        // synthetic deletes for keys that vanished during the
                        // gap (see Snapshot::stale_keys). For v1 the full
                        // re-list is replayed as a stream of puts, matching the
                        // hand-rolled loops this combinator replaces.
                        warn!("watch cursor expired, falling back to full watch_all");
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): see the watch_all arm above.
                        warn!("watch cursor expired, falling back to full watch_prefix");
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::{KvEntry, VersionToken};
    use crate::snapshot::AppendLogSnapshot;
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio::sync::mpsc::Sender;

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        })
    }

    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        full: Mutex<Option<Vec<KvUpdate>>>,
        from: Mutex<Option<Vec<KvUpdate>>>,
        from_expires: bool,
        hold: bool,
    }

    impl MockWatcher {
        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
            Self {
                full: Mutex::new(Some(updates)),
                from: Mutex::new(None),
                from_expires: false,
                hold,
            }
        }

        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
            let updates = which.lock().unwrap().take().unwrap_or_default();
            for u in updates {
                if tx.send(u).await.is_err() {
                    return;
                }
            }
            if self.hold {
                // Keep `tx` alive (channel open) until this task is aborted.
                std::future::pending::<()>().await;
            }
        }
    }

    #[async_trait]
    impl KvWatcher for MockWatcher {
        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefixes(
            &self,
            _prefixes: &[&str],
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            // This mock scripts the applied-watch resumption tests, not prefix
            // filtering; it delivers the same `full` script as `watch_prefix`.
            // The real multi-filter scoping is proved in the NATS integration test.
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_all_from(
            &self,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
        // are exercised against the same `from` script. Without this the trait's
        // default impl would delegate to watch_prefix and silently deliver the
        // full set instead of the delta.
        async fn watch_prefix_from(
            &self,
            _prefix: &str,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }
    }

    /// A watcher whose entry points all fail. Used to prove the watch task's
    /// terminal error is surfaced out of `watch_applied` rather than swallowed
    /// as a clean `Ok(applied)` when the channel closes.
    struct ErrorWatcher;

    #[async_trait]
    impl KvWatcher for ErrorWatcher {
        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefixes(
            &self,
            _prefixes: &[&str],
            _tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }
    }

    // A minimal parse: keeps each Put's value bytes and drops every other
    // variant (e.g. deletes).
535    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
536        match u {
537            KvUpdate::Put(e) => Some(e.value.clone()),
538            _ => None,
539        }
540    }
541
542    /// The stream closes (hold = false) with a pending batch; the remainder is
543    /// flushed before returning, the returned cursor is the last revision, and
544    /// `on_applied` ran exactly once after `apply`.
545    #[tokio::test]
546    async fn flush_on_channel_close() {
547        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
548        let watcher = Arc::new(MockWatcher::new(updates, false));
549
550        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
551        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
552
553        let ab = Arc::clone(&applied_batches);
554        let oc = Arc::clone(&on_applied_cursors);
555        let (_sd_tx, sd_rx) = watch::channel(false);
556
557        let cursor = watch_applied(
558            watcher,
559            WatchScope::All,
560            None,
561            None::<AppendLogSnapshot>,
562            BatchConfig::default(),
563            parse_put,
564            move |batch| ab.lock().unwrap().push(batch),
565            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
566            sd_rx,
567        )
568        .await
569        .unwrap();
570
571        assert_eq!(cursor.as_u64(), Some(3));
572        let batches = applied_batches.lock().unwrap();
573        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
574        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
575        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
576    }
577
578    /// Fewer than `max` updates, then the channel idles: the window timer must
579    /// flush them and advance the cursor.
580    #[tokio::test(start_paused = true)]
581    async fn flush_on_window() {
582        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
583        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
584
585        let applied = Arc::new(AtomicU64::new(0));
586        let count = Arc::new(AtomicU64::new(0));
587        let a = Arc::clone(&applied);
588        let c = Arc::clone(&count);
589        let (sd_tx, sd_rx) = watch::channel(false);
590
591        let task = tokio::spawn(watch_applied(
592            watcher,
593            WatchScope::All,
594            None,
595            None::<AppendLogSnapshot>,
596            BatchConfig::default(),
597            parse_put,
598            move |batch: Vec<Vec<u8>>| {
599                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
600            },
601            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
602            sd_rx,
603        ));
604
605        // Let the window (10ms) elapse under virtual time.
606        tokio::time::sleep(Duration::from_millis(50)).await;
607        assert_eq!(
608            count.load(Ordering::SeqCst),
609            2,
610            "window should have flushed"
611        );
612        assert_eq!(applied.load(Ordering::SeqCst), 2);
613
614        sd_tx.send(true).unwrap();
615        let cursor = task.await.unwrap().unwrap();
616        assert_eq!(cursor.as_u64(), Some(2));
617    }
618
619    /// Exactly `max` updates fills a batch and flushes immediately — before the
620    /// window would have elapsed.
621    #[tokio::test(start_paused = true)]
622    async fn flush_on_max() {
623        let max = 4;
624        let updates: Vec<_> = (1..=max as u64)
625            .map(|i| put(&format!("k{i}"), b"v", i))
626            .collect();
627        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
628
629        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
630        let f = Arc::clone(&flushes);
631        let (sd_tx, sd_rx) = watch::channel(false);
632
633        let task = tokio::spawn(watch_applied(
634            watcher,
635            WatchScope::All,
636            None,
637            None::<AppendLogSnapshot>,
638            BatchConfig {
639                window: Duration::from_secs(3600), // effectively never
640                max,
641            },
642            parse_put,
643            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
644            move |_| {},
645            sd_rx,
646        ));
647
648        // Yield enough for the mock to push all `max` updates; the window is an
649        // hour, so any flush is purely the max trigger.
650        tokio::time::sleep(Duration::from_millis(1)).await;
651        assert_eq!(
652            *flushes.lock().unwrap(),
653            vec![max],
654            "a full batch should flush on max, not wait for the window"
655        );
656
657        sd_tx.send(true).unwrap();
658        task.await.unwrap().unwrap();
659    }
660
661    /// A pending batch plus a shutdown signal: the batch is flushed and the
662    /// applied cursor returned.
663    #[tokio::test(start_paused = true)]
664    async fn flush_on_shutdown() {
665        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
666        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
667
668        let applied = Arc::new(AtomicU64::new(0));
669        let a = Arc::clone(&applied);
670        let (sd_tx, sd_rx) = watch::channel(false);
671
672        let task = tokio::spawn(watch_applied(
673            watcher,
674            WatchScope::All,
675            None,
676            None::<AppendLogSnapshot>,
677            BatchConfig {
678                window: Duration::from_secs(3600), // window won't fire
679                max: 100,
680            },
681            parse_put,
682            move |_batch: Vec<Vec<u8>>| {},
683            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
684            sd_rx,
685        ));
686
687        // Give the mock time to deliver both updates into the pending batch.
688        tokio::time::sleep(Duration::from_millis(1)).await;
689        sd_tx.send(true).unwrap();
690
691        let cursor = task.await.unwrap().unwrap();
692        assert_eq!(
693            cursor.as_u64(),
694            Some(2),
695            "shutdown flushes the pending batch"
696        );
697        assert_eq!(applied.load(Ordering::SeqCst), 2);
698    }

    /// The cursor must not advance until `apply` has returned. We prove it by
    /// having `apply` read the cursor that `on_applied` last published: when the
    /// second batch is applied, the visible cursor must still be the *first*
    /// batch's, never the second's, which only becomes visible after this
    /// `apply` returns.
    #[tokio::test(start_paused = true)]
    async fn cursor_advances_only_after_apply() {
        // Two batches of `max` updates each.
        let max = 2usize;
        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        // Cursor as last published by on_applied; starts at 0 (nothing applied).
        let published = Arc::new(AtomicU64::new(0));
        // What `apply` observed as the published cursor at the moment it ran.
        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));

        let pub_for_apply = Arc::clone(&published);
        let seen = Arc::clone(&seen_at_apply);
        let pub_for_on = Arc::clone(&published);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig {
                window: Duration::from_secs(3600),
                max,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                // The cursor visible here is whatever the PREVIOUS flush
                // published, never this batch's, because we haven't returned.
                seen.lock()
                    .unwrap()
                    .push(pub_for_apply.load(Ordering::SeqCst));
            },
            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();

        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
        // batch's cursor), NOT 4. The cursor only reached 4 after the second
        // apply returned.
        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
        assert_eq!(published.load(Ordering::SeqCst), 4);
    }

    /// Updates whose `parse` returns `None` (corrupt or irrelevant) carry no
    /// domain work: there is nothing for `apply` to do, so they count as
    /// applied once received and the cursor must advance over them.
    #[tokio::test]
    async fn corrupt_parse_entries_advance_cursor() {
        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            // Reject everything: simulates corrupt/irrelevant entries.
            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
            move |batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
                assert!(batch.is_empty());
            },
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
        assert_eq!(
            apply_calls.load(Ordering::SeqCst),
            0,
            "an all-rejected batch applies nothing"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// A resume whose cursor has expired falls back to the full watch and still
    /// applies the delivered updates.
    #[tokio::test]
    async fn cursor_expired_falls_back_to_full_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "fallback full watch's updates were applied"
        );
    }

    /// End-to-end with a real snapshot file: after the run, the persisted
    /// snapshot's cursor equals the applied cursor and its entries match the
    /// applied state, showing that the final checkpoint is written at the
    /// post-apply cursor rather than ahead of it.
    #[tokio::test]
    async fn snapshot_checkpoint_matches_applied_cursor() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();

        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(store),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "snapshot checkpoint cursor must equal the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.entries["node.a"].value, b"1");
        assert_eq!(snap.entries["node.b"].value, b"2");
    }

    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
    /// delta (the `from` script, NOT the full set) is applied. Proves the
    /// resume branch delivers only post-cursor updates and advances to their
    /// max revision.
    #[tokio::test]
    async fn resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a non-empty distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(9)), // resume past rev 9: not expired
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied, never the full set"
        );
    }

    /// `WatchScope::Prefix` with no resume dispatches to `watch_prefix` and
    /// applies the delivered updates. Most tests use `WatchScope::All`; this
    /// one covers the no-resume prefix dispatch arm.
    #[tokio::test]
    async fn prefix_scope_applies_delivered_updates() {
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }

    /// A `WatchScope::Prefix` resume whose cursor has expired falls back to the
    /// full `watch_prefix` and still applies the delivered updates: the prefix
    /// twin of `cursor_expired_falls_back_to_full_watch`.
    #[tokio::test]
    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "prefix fallback full watch's updates were applied"
        );
    }

    /// The watch task's terminal error must propagate out of `watch_applied`
    /// rather than being swallowed as `Ok(applied)` when the channel closes.
    #[tokio::test]
    async fn watch_task_error_propagates() {
        let watcher = Arc::new(ErrorWatcher);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let result = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await;

        match result {
            Err(KvError::WatchError(msg)) => {
                assert!(msg.contains("injected"), "error carries the cause: {msg}");
            }
            other => panic!("expected WatchError, got {other:?}"),
        }
    }

    /// A batch where `parse` accepts some updates and rejects others: the cursor
    /// must still advance to the highest *received* revision (covering the
    /// rejected entry in the middle), while `apply` sees only the accepted ones.
    #[tokio::test]
    async fn mixed_parse_advances_cursor_over_rejected_entries() {
        let updates = vec![
            put("keep.a", b"1", 5),
            put("skip.b", b"2", 6), // rejected by parse
            put("keep.c", b"3", 7),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ab = Arc::clone(&applied_batches);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            // Keep only keys under "keep."; reject everything else.
            |u: &KvUpdate| -> Option<Vec<u8>> {
                match u {
                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                    _ => None,
                }
            },
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(7),
            "cursor covers the rejected middle entry (rev 6)"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"3".to_vec()],
            "apply sees only the accepted entries"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }
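
    // A minimal, std-only sketch of the per-batch flush rule that the parse
    // tests above (and, for the empty case, `shutdown_with_no_pending_batch`)
    // pin down. It is a hypothetical model, NOT this crate's implementation:
    // `flush_model`, `Update` and `flush_batch` are illustrative names, and the
    // real `watch_applied` also checkpoints the snapshot between `apply` and
    // `on_applied`, which this sketch omits. Under those assumptions:
    //   * `apply` runs only if at least one update in the batch parsed;
    //   * the new cursor covers every *received* revision, parsed or rejected;
    //   * the cursor is published only after `apply` has returned;
    //   * an empty batch changes nothing and fires no callbacks.
    mod flush_model {
        /// Hypothetical received update: its revision plus the raw value.
        pub struct Update {
            pub rev: u64,
            pub value: Vec<u8>,
        }

        /// Hypothetical flush: returns the (possibly advanced) cursor.
        pub fn flush_batch(
            cursor: Option<u64>,
            batch: Vec<Update>,
            parse: impl Fn(&Update) -> Option<Vec<u8>>,
            mut apply: impl FnMut(Vec<Vec<u8>>),
            mut on_applied: impl FnMut(u64),
        ) -> Option<u64> {
            // Highest revision *received*, including updates `parse` rejects.
            let Some(max_rev) = batch.iter().map(|u| u.rev).max() else {
                return cursor; // nothing received: cursor unmoved, no callbacks
            };
            let parsed: Vec<Vec<u8>> = batch.iter().filter_map(|u| parse(u)).collect();
            if !parsed.is_empty() {
                // Domain work first; the cursor is untouched until this returns.
                apply(parsed);
            }
            // Only now advance (never backwards) and publish the cursor.
            let next = cursor.map_or(max_rev, |c| c.max(max_rev));
            on_applied(next);
            Some(next)
        }

        #[test]
        fn mixed_batch_applies_accepted_and_covers_rejected() {
            let batch = vec![
                Update { rev: 5, value: b"1".to_vec() },
                Update { rev: 6, value: b"2".to_vec() }, // rejected below
                Update { rev: 7, value: b"3".to_vec() },
            ];
            let mut applied: Vec<Vec<u8>> = Vec::new();
            let mut published: Option<u64> = None;
            let cursor = flush_batch(
                None,
                batch,
                |u| (u.rev != 6).then(|| u.value.clone()),
                |b| applied.extend(b),
                |c| published = Some(c),
            );
            assert_eq!(cursor, Some(7));
            assert_eq!(applied, vec![b"1".to_vec(), b"3".to_vec()]);
            assert_eq!(published, Some(7));
        }

        #[test]
        fn all_rejected_batch_skips_apply_but_advances() {
            let batch = vec![
                Update { rev: 5, value: Vec::new() },
                Update { rev: 7, value: Vec::new() },
            ];
            let mut apply_calls = 0;
            let cursor = flush_batch(Some(4), batch, |_| None, |_| apply_calls += 1, |_| {});
            assert_eq!(cursor, Some(7));
            assert_eq!(apply_calls, 0);
        }
    }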

    /// Shutdown before any update arrives: nothing was received, so the cursor
    /// stays at the resume position (here empty, since no resume cursor was
    /// passed), `apply` never runs, and `on_applied` never fires.
    #[tokio::test(start_paused = true)]
    async fn shutdown_with_no_pending_batch() {
        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_calls = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let oc = Arc::clone(&on_applied_calls);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None::<AppendLogSnapshot>,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
            },
            move |_| {
                oc.fetch_add(1, Ordering::SeqCst);
            },
            sd_rx,
        ));

        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            None,
            "no updates received → cursor unmoved"
        );
        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
        assert_eq!(
            on_applied_calls.load(Ordering::SeqCst),
            0,
            "on_applied never fires"
        );
    }

    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
    /// compaction actually fires (every other snapshot test pins the threshold
    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
    /// snapshot must still load cleanly with the right cursor and entries.
    #[tokio::test]
    async fn snapshot_compaction_fires_and_stays_consistent() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        // threshold 0 → every checkpoint reports "needs compact", forcing the
        // store's inline-compaction branch on each flush (run off the hot path via
        // spawn_blocking inside watch_applied).
        let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();

        // Re-put the same key across flushes so compaction has duplicates to
        // dedup; small max forces multiple flushes (hence multiple compactions).
        let updates = vec![
            put("node.a", b"1", 1),
            put("node.a", b"2", 2),
            put("node.b", b"3", 3),
            put("node.a", b"4", 4),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(store),
            BatchConfig {
                window: Duration::from_secs(3600),
                max: 1, // one update per flush → a compaction per update
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(4));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "compacted snapshot's cursor still equals the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
        assert_eq!(
            snap.entries["node.a"].value, b"4",
            "last write per key survives compaction"
        );
        assert_eq!(snap.entries["node.b"].value, b"3");
    }
}