//! Cursor-after-apply watch combinator.
//!
//! [`watch_applied`] drives a [`KvWatcher`], batches incoming [`KvUpdate`]s over
//! a short window (or a max count), hands each batch to a caller-supplied
//! `apply` closure, and **only then** advances the resume cursor, checkpoints
//! the snapshot, and fires `on_applied`. It encodes one discipline that every
//! hand-rolled watch loop in the wider system gets subtly wrong:
//!
//! > **INVARIANT.** A persisted/reported cursor `C` implies every update with
//! > revision ≤ `C` has been *applied* — the caller's `apply()` has returned for
//! > it. The cursor never advances on *receipt* of an update, only after it has
//! > durably taken effect.
//!
//! ## Why receipt is the wrong signal
//!
//! The tempting shortcut is to bump the cursor as each update arrives off the
//! channel (`high_water = rev` on `rx.recv()`), then apply the batch later. On a
//! crash between those two steps the persisted cursor claims "caught up to rev
//! N" while rev N is still sitting in an unapplied buffer. On resume the watch
//! starts *past* rev N and silently skips it — a correctness hole in the exact
//! "resume after any restart" guarantee this crate advertises.
//!
//! Saltzer, Reed & Clark's *End-to-End Arguments in System Design* (1984) names
//! the fix: a function placed below the endpoints (here, the channel receive)
//! can only be a performance hint; the *endpoint* — the application of the
//! update — is the only place the "it happened" guarantee can actually be
//! established. So the cursor is written from `apply()`'s completion, not from
//! the transport's delivery.
//!
//! The cursor-as-monotonic-index-into-a-log shape itself follows HashiCorp
//! Consul's anti-entropy / blocking-query lineage: a client holds the last index
//! it has *reconciled* and re-arms the watch from there, never from the index it
//! merely *saw*.
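//!
//! The failure mode shows up in a toy model. The sketch below is not this
//! crate's API: `run_with_crash` is a hypothetical helper that treats
//! revisions as plain `u64`s and applies `1..=crash_after_apply`. It then
//! "crashes" with the cursor persisted at either the last *received* or the
//! last *applied* revision, and resumes after that cursor.
//!
//! ```rust
//! /// Hypothetical model: revisions `1..=total` are delivered in order. The
//! /// first run receives up to `crash_after_recv` but applies only up to
//! /// `crash_after_apply` before crashing; the second run resumes after the
//! /// persisted cursor. Returns every revision applied across both runs.
//! fn run_with_crash(
//!     total: u64,
//!     crash_after_recv: u64,
//!     crash_after_apply: u64,
//!     cursor_on_receipt: bool,
//! ) -> Vec<u64> {
//!     // First run: only these revisions made it through `apply`.
//!     let mut applied: Vec<u64> = (1..=crash_after_apply).collect();
//!     // The cursor that was persisted when the process died.
//!     let cursor = if cursor_on_receipt {
//!         crash_after_recv // bumped as each update came off the channel
//!     } else {
//!         crash_after_apply // bumped only after `apply` returned
//!     };
//!     // Second run: the watch re-delivers everything after the cursor.
//!     applied.extend(cursor + 1..=total);
//!     applied
//! }
//! ```
//!
//! Take five revisions, with four received and two applied at the crash. The
//! receipt-driven cursor yields `[1, 2, 5]`, so revisions 3 and 4 are never
//! applied. The apply-driven cursor yields `[1, 2, 3, 4, 5]`.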
//!
//! ## What the caller supplies
//!
//! - `parse`: maps a raw [`KvUpdate`] to an optional domain value `U`. Returning
//!   `None` (corrupt bytes, irrelevant key) is fine — the update is still
//!   *received*, so it still counts toward the cursor; there is simply nothing to
//!   apply for it.
//! - `apply`: consumes a `Vec<U>` in revision order. This is the only domain
//!   logic: for the tunnel router it swaps the route table; for the edge origin
//!   watcher it rebuilds the hashrings.
//! - `on_applied`: fires once per non-empty flush (one with at least one
//!   received update, even if `parse` rejected all of them), *after* `apply`
//!   returns, with the new applied cursor. Callers use it to persist the cursor
//!   for the next restart.
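//!
//! The division of labor between the three closures can be modeled
//! synchronously for a single flush. This sketch makes simplifying assumptions:
//! revisions are plain `u64`, raw payloads are `&str`, and there is no snapshot
//! and no timer. `flush` is a hypothetical free function, not the internal
//! flush macro of [`watch_applied`]:
//!
//! ```rust
//! /// Hypothetical one-flush model: `updates` are `(revision, raw)` pairs in
//! /// revision order. Returns the cursor reported to `on_applied`, if any.
//! fn flush<U>(
//!     updates: &[(u64, &str)],
//!     mut parse: impl FnMut(&str) -> Option<U>,
//!     mut apply: impl FnMut(Vec<U>),
//!     mut on_applied: impl FnMut(u64),
//! ) -> Option<u64> {
//!     let mut batch = Vec::new();
//!     let mut high = None;
//!     for &(rev, raw) in updates {
//!         // Every received update moves the pending high-water mark,
//!         // whether or not `parse` keeps it.
//!         high = Some(rev);
//!         if let Some(u) = parse(raw) {
//!             batch.push(u);
//!         }
//!     }
//!     // `apply` runs (and returns) first; only then is the cursor reported.
//!     if !batch.is_empty() {
//!         apply(batch);
//!     }
//!     if let Some(c) = high {
//!         on_applied(c);
//!     }
//!     high
//! }
//! ```
//!
//! Feed it `[(5, "1"), (6, "x"), (7, "y")]` with a parser that accepts only
//! integers. It applies `[1]` and reports cursor 7: revisions 6 and 7 are
//! covered by the cursor even though nothing was applied for them.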
//!
//! ## Panics
//!
//! `apply` runs inline on the task that polls [`watch_applied`], not on the
//! spawned watch task. If it panics, the panic propagates out of
//! [`watch_applied`] and aborts the watch — that is the caller's contract,
//! the same as a panic in any other supplied closure.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::warn;

use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::SnapshotWriter;

/// What to watch: every key, or every key under a prefix.
///
/// Mirrors the [`KvWatcher`] surface — `All` maps to `watch_all` /
/// `watch_all_from`, `Prefix` to `watch_prefix` / `watch_prefix_from`.
#[derive(Debug, Clone)]
pub enum WatchScope {
    /// Watch all keys in the bucket.
    All,
    /// Watch only keys beginning with this prefix.
    Prefix(String),
}

/// Batching policy for [`watch_applied`].
///
/// A flush fires when **either** bound is hit, whichever comes first: `window`
/// time has elapsed since the batch opened, or `max` parsed updates have
/// accumulated. The window amortizes the cost of `apply` (e.g. one route-table
/// clone per flush instead of one per update); `max` caps memory and latency
/// when many updates arrive within a single window.
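///
/// The "whichever comes first" rule can be written as a pure predicate. This
/// is an illustrative sketch, not code from this module. The real loop arms a
/// single timer and checks the batch length on each receive rather than
/// polling. `Policy` and `should_flush` are hypothetical names; `Policy`
/// mirrors this struct's two fields.
///
/// ```rust
/// use std::time::{Duration, Instant};
///
/// struct Policy {
///     window: Duration,
///     max: usize,
/// }
///
/// /// `opened` is when the current batch received its first update (`None`
/// /// while idle); `parsed` counts buffered updates that `parse` accepted.
/// fn should_flush(p: &Policy, opened: Option<Instant>, now: Instant, parsed: usize) -> bool {
///     match opened {
///         // Idle: no batch is open, so there is nothing to flush.
///         None => false,
///         // Open batch: flush on whichever bound is reached first.
///         Some(t) => parsed >= p.max || now.duration_since(t) >= p.window,
///     }
/// }
/// ```
///
/// Under the default 10 ms / 100 policy, a batch holding 3 parsed updates
/// flushes once 10 ms have passed since it opened. A batch that reaches 100
/// parsed updates flushes immediately.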
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
    /// Maximum time a batch stays open before being flushed.
    pub window: Duration,
    /// Maximum number of parsed updates in a batch before forcing a flush.
    pub max: usize,
}

impl Default for BatchConfig {
    /// 10 ms / 100 updates — the de-facto default every hand-rolled caller
    /// already used, lifted into one place.
    fn default() -> Self {
        Self {
            window: Duration::from_millis(10),
            max: 100,
        }
    }
}

/// Drive a watch with cursor-after-apply semantics.
///
/// Subscribes per `scope` (resuming from `resume` when it carries a position),
/// batches updates per `config`, applies each batch via `apply`, and only then
/// advances the cursor / checkpoints `snapshot` / calls `on_applied`. Returns
/// the final applied cursor when the watch ends (shutdown signalled, or the
/// underlying stream closed).
///
/// Raw [`KvUpdate`]s are streamed to `snapshot` as they arrive, but the
/// *checkpoint* cursor written on each flush is the post-apply cursor — so a
/// loaded snapshot's cursor is always consistent with the state it carries
/// (the cursor never names a revision whose `apply` had not returned before the
/// checkpoint).
///
/// On [`KvError::CursorExpired`] from the `*_from` resume path, this logs and
/// falls back to a full-scope watch (`watch_all` / `watch_prefix`). Callers see
/// the full re-list as a stream of puts, exactly as the hand-rolled loops did.
///
/// See `ARCHITECTURE.md` ("Applied-Cursor Watch") for the invariant and its
/// rationale.
///
/// # Type parameters
/// - `U`: the caller's domain update type, produced by `parse` and consumed by
///   `apply`.
// This combinator takes each of its dependencies as a parameter so every
// caller-supplied closure (`parse`/`apply`/`on_applied`) keeps its own distinct
// type and is monomorphized at the call site. Folding them into a builder struct
// would either box the closures or force a single generic bundle, losing that.
#[allow(clippy::too_many_arguments)]
// The flush macro resets `batch_high`/`batch_deadline`/`snapshot` for the next
// loop iteration. At the two flush sites that return immediately afterward
// (shutdown, channel-close) those resets are dead stores — correct, but flagged.
#[allow(unused_assignments)]
pub async fn watch_applied<U, P, A, O>(
    watcher: Arc<dyn KvWatcher>,
    scope: WatchScope,
    resume: Option<WatchCursor>,
    mut snapshot: Option<SnapshotWriter>,
    config: BatchConfig,
    mut parse: P,
    mut apply: A,
    mut on_applied: O,
    mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
    U: Send,
    P: FnMut(&KvUpdate) -> Option<U> + Send,
    A: FnMut(Vec<U>) + Send,
    O: FnMut(WatchCursor) + Send,
{
    // The cursor we'll return. Initialized from the resume position so that a
    // watch which receives nothing new still reports the position it resumed
    // from as "applied" (it is — everything up to it was applied before the last
    // run persisted it).
    let mut applied = match &resume {
        Some(c) => c.clone(),
        None => WatchCursor::none(),
    };

    // Spawn the watch task. It owns the cursor-expired fallback so the main loop
    // only ever sees a clean ordered stream of updates on `rx`.
    let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
    let handle = {
        let watcher = Arc::clone(&watcher);
        tokio::spawn(async move { run_watch(watcher.as_ref(), &scope, resume, tx).await })
    };

    // Batch state.
    //
    // `batch_high` tracks the version of the most recently *received* update
    // since the last flush — including updates `parse` rejected. NATS delivers
    // in revision order, so the last received is the highest, and advancing the
    // cursor to it after a single atomic `apply` is correct: having seen the max
    // means we've seen everything below it, and a rejected entry is still
    // "nothing to apply", hence covered. Reset to `none()` after every flush.
    let batch_cap = config.max.clamp(1, 64);
    let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
    let mut batch_high = WatchCursor::none();
    // `Some` once a batch has opened and the window timer is armed; `None`
    // between flushes. Only the armed/idle distinction is read in the loop —
    // the absolute instant lives in the pinned `sleep` future below.
    let mut batch_deadline: Option<tokio::time::Instant> = None;

    // Flush the current batch: apply (if non-empty), then advance the cursor,
    // checkpoint the snapshot at that cursor, and fire `on_applied` — in that
    // order. Returns whether the snapshot grew past its compaction threshold.
    // Defined as a macro (not a closure) so it can mutate the locals above in
    // place without holding borrows of them across the loop. It `.await`s
    // nothing itself; compaction, which blocks, is handled by the caller via
    // `flush_and_compact!`.
    macro_rules! flush_inner {
        () => {{
            let mut needs_compact = false;
            // Nothing received since the last flush → nothing to do at all.
            if !batch.is_empty() || !batch_high.is_none() {
                if !batch.is_empty() {
                    // INVARIANT: apply() runs and RETURNS before any cursor
                    // advance below. Move the batch out so a panicking apply
                    // can't leave half-consumed state behind.
                    //
                    // `replace` (not `take`) leaves a pre-sized Vec behind:
                    // `take` swaps in a zero-capacity `Vec::new()`, so every
                    // batch after the first re-climbs the reallocation ladder
                    // (4→8→…→cap). Handing back a `with_capacity(batch_cap)` Vec
                    // keeps it to one allocation per batch (plus a regrowth
                    // only when `max` exceeds the 64-entry cap).
                    apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
                }
                if !batch_high.is_none() {
                    applied = batch_high.clone();
                    if let Some(sw) = snapshot.as_mut() {
                        match sw.checkpoint(&applied) {
                            Ok(true) => needs_compact = true,
                            Ok(false) => {}
                            Err(e) => warn!(error = %e, "snapshot checkpoint failed"),
                        }
                    }
                    on_applied(applied.clone());
                }
                batch_high = WatchCursor::none();
            }
            batch_deadline = None;
            needs_compact
        }};
    }

    // Flush, then run compaction off the hot path if the log grew too large.
    // Compaction reads and rewrites the whole snapshot file, so it must not run
    // on the async reactor thread — `spawn_blocking` moves it to the blocking
    // pool and we reclaim the writer afterward.
    macro_rules! flush_and_compact {
        () => {{
            if flush_inner!()
                && let Some(mut sw) = snapshot.take()
            {
                // The closure returns the writer unconditionally, so a *failed*
                // compaction (Ok(Err)) still hands the writer back: checkpoints
                // continue and only this compaction is skipped. A *panicked*
                // blocking task (Err) drops the writer on the blocking thread and
                // we can't recover it; rather than silently run the rest of the
                // watch without persistence — which breaks the resume-after-restart
                // guarantee — we surface it as a fatal error.
                match tokio::task::spawn_blocking(move || {
                    let res = sw.compact();
                    (sw, res)
                })
                .await
                {
                    Ok((sw, Ok(()))) => snapshot = Some(sw),
                    Ok((sw, Err(e))) => {
                        warn!(error = %e, "snapshot compaction failed; continuing without compacting");
                        snapshot = Some(sw);
                    }
                    Err(e) => {
                        warn!(error = %e, "snapshot compaction task panicked; aborting watch");
                        handle.abort();
                        return Err(KvError::WatchError(format!(
                            "snapshot compaction task panicked: {e}"
                        )));
                    }
                }
            }
        }};
    }

    // A single timer future, reset in place each time a batch opens. The old
    // `tokio::time::sleep(timeout)` lived inside the select arm, so it was
    // re-created on every loop iteration — one Arc-backed timer-wheel entry
    // allocated, registered, and immediately dropped per received update.
    // Pinning one future and `reset`-ing it reuses that single allocation; the
    // `if batch_deadline.is_some()` guard keeps it from firing while idle, so
    // its initial already-elapsed deadline is never observed.
    let sleep = tokio::time::sleep(Duration::ZERO);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            biased;

            // Shutdown wins: flush whatever is batched (so the cursor reflects
            // it), abandon any updates still in flight on the channel — they
            // weren't applied, the cursor doesn't claim them, and they'll be
            // re-delivered on the next resume — and return the applied cursor.
            res = shutdown.changed() => {
                if res.is_err() || *shutdown.borrow() {
                    flush_and_compact!();
                    handle.abort();
                    // Observe the task's terminal state. An abort surfaces as a
                    // cancelled JoinError, which we ignore; a genuine panic that
                    // raced ahead of the abort is logged rather than silently lost.
                    if let Err(join) = handle.await
                        && !join.is_cancelled()
                    {
                        warn!(error = %join, "watch task panicked at shutdown");
                    }
                    return Ok(applied);
                }
            }

            // Batch window elapsed.
            () = &mut sleep, if batch_deadline.is_some() => {
                flush_and_compact!();
            }

            update = rx.recv() => {
                match update {
                    Some(u) => {
                        // Cursor authority: every received update bumps the
                        // pending high-water, regardless of whether `parse`
                        // keeps it.
                        batch_high = WatchCursor::from_version(u.version().clone());

                        // Stream the raw update to the snapshot log as it
                        // arrives. The durable checkpoint is written later at
                        // the applied cursor, so a crash here just means the log
                        // holds data ahead of its cursor — re-applied on resume,
                        // never skipped.
                        if let Some(sw) = snapshot.as_mut()
                            && let Err(e) = sw.write_update(&u)
                        {
                            warn!(error = %e, "snapshot write failed");
                        }

                        if let Some(parsed) = parse(&u) {
                            batch.push(parsed);
                        }

                        // Arm the window on the first received update of a batch
                        // — even a parse-rejected one, so the cursor advances
                        // within `window` even through a run of irrelevant keys.
                        // Reset the pinned timer to the new deadline rather than
                        // allocating a fresh `Sleep`.
                        if batch_deadline.is_none() {
                            let deadline = tokio::time::Instant::now() + config.window;
                            sleep.as_mut().reset(deadline);
                            batch_deadline = Some(deadline);
                        }

                        if batch.len() >= config.max {
                            flush_and_compact!();
                        }
                    }
                    None => {
                        // Stream closed. Flush the remainder, then surface the
                        // watch task's terminal result: a clean end returns the
                        // applied cursor, an error propagates.
                        flush_and_compact!();
                        return match handle.await {
                            Ok(Ok(())) => Ok(applied),
                            Ok(Err(e)) => Err(e),
                            Err(join) => Err(KvError::WatchError(format!(
                                "watch task panicked: {join}"
                            ))),
                        };
                    }
                }
            }
        }
    }
}

/// Run the underlying watch for `scope`, resuming from `resume` when it carries
/// a position, with the [`KvError::CursorExpired`] → full-watch fallback.
async fn run_watch(
    watcher: &dyn KvWatcher,
    scope: &WatchScope,
    resume: Option<WatchCursor>,
    tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
    // Resume only when the cursor carries a real position; an absent or `none()`
    // cursor falls through to a full watch. Folding that check into the
    // `Option` (and destructuring it as `cursor` below) makes "we have a resume
    // position" structural: there is no separate bool whose truth a later edit
    // could let drift from the `Some`.
    let resume_cursor = resume.filter(|c| !c.is_none());

    match scope {
        WatchScope::All => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_all_from(&cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): signal a "resync" to the caller so it can
                        // diff the full re-list against prior state and emit
                        // synthetic deletes for keys that vanished during the
                        // gap (see Snapshot::stale_keys). For v1 the full
                        // re-list is replayed as a stream of puts, matching the
                        // hand-rolled loops this combinator replaces.
                        warn!("watch cursor expired, falling back to full watch_all");
                        watcher.watch_all(tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_all(tx).await
            }
        }
        WatchScope::Prefix(prefix) => {
            if let Some(cursor) = resume_cursor {
                match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
                    Err(KvError::CursorExpired) => {
                        // TODO(v2): see the watch_all arm above.
                        warn!("watch cursor expired, falling back to full watch_prefix");
                        watcher.watch_prefix(prefix, tx).await
                    }
                    other => other,
                }
            } else {
                watcher.watch_prefix(prefix, tx).await
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::{KvEntry, VersionToken};
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio::sync::mpsc::Sender;

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        })
    }

    /// A scripted watcher. Delivers a pre-set list of updates through the
    /// channel, then either holds the channel open (so window/max/shutdown
    /// flushes can be exercised without the stream ending) or returns cleanly
    /// (so channel-close flushing can be exercised).
    struct MockWatcher {
        full: Mutex<Option<Vec<KvUpdate>>>,
        from: Mutex<Option<Vec<KvUpdate>>>,
        from_expires: bool,
        hold: bool,
    }

    impl MockWatcher {
        fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
            Self {
                full: Mutex::new(Some(updates)),
                from: Mutex::new(None),
                from_expires: false,
                hold,
            }
        }

        async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
            let updates = which.lock().unwrap().take().unwrap_or_default();
            for u in updates {
                if tx.send(u).await.is_err() {
                    return;
                }
            }
            if self.hold {
                // Keep `tx` alive (channel open) until this task is aborted.
                std::future::pending::<()>().await;
            }
        }
    }

    #[async_trait]
    impl KvWatcher for MockWatcher {
        async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
            self.deliver(&self.full, tx).await;
            Ok(())
        }

        async fn watch_all_from(
            &self,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }

        // Mirror watch_all_from so the prefix resume / expiry arms of run_watch
        // are exercised against the same `from` script. Without this the trait's
        // default impl would delegate to watch_prefix and silently deliver the
        // full set instead of the delta.
        async fn watch_prefix_from(
            &self,
            _prefix: &str,
            _cursor: &WatchCursor,
            tx: Sender<KvUpdate>,
        ) -> Result<(), KvError> {
            if self.from_expires {
                return Err(KvError::CursorExpired);
            }
            self.deliver(&self.from, tx).await;
            Ok(())
        }
    }

    /// A watcher whose entry points all fail. Used to prove the watch task's
    /// terminal error is surfaced out of `watch_applied` rather than swallowed
    /// as a clean `Ok(applied)` when the channel closes.
    struct ErrorWatcher;

    #[async_trait]
    impl KvWatcher for ErrorWatcher {
        async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }

        async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
            Err(KvError::WatchError("injected watch failure".into()))
        }
    }

    // A pass-through parse: keeps each Put's value bytes and drops deletes.
    fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
        match u {
            KvUpdate::Put(e) => Some(e.value.clone()),
            _ => None,
        }
    }

    /// The stream closes (hold = false) with a pending batch; the remainder is
    /// flushed before returning, the returned cursor is the last revision, and
    /// the last cursor reported to `on_applied` is that same revision.
    #[tokio::test]
    async fn flush_on_channel_close() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
        let watcher = Arc::new(MockWatcher::new(updates, false));

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
        let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));

        let ab = Arc::clone(&applied_batches);
        let oc = Arc::clone(&on_applied_cursors);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch| ab.lock().unwrap().push(batch),
            move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(3));
        let batches = applied_batches.lock().unwrap();
        let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
        assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
        assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
    }

    /// Fewer than `max` updates, then the channel idles: the window timer must
    /// flush them and advance the cursor.
    #[tokio::test(start_paused = true)]
    async fn flush_on_window() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let count = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let c = Arc::clone(&count);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| {
                c.fetch_add(batch.len() as u64, Ordering::SeqCst);
            },
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Let the window (10ms) elapse under virtual time.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            count.load(Ordering::SeqCst),
            2,
            "window should have flushed"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);

        sd_tx.send(true).unwrap();
        let cursor = task.await.unwrap().unwrap();
        assert_eq!(cursor.as_u64(), Some(2));
    }

    /// Exactly `max` updates fill a batch and flush it immediately, before the
    /// window would have elapsed.
    #[tokio::test(start_paused = true)]
    async fn flush_on_max() {
        let max = 4;
        let updates: Vec<_> = (1..=max as u64)
            .map(|i| put(&format!("k{i}"), b"v", i))
            .collect();
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
        let f = Arc::clone(&flushes);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig {
                window: Duration::from_secs(3600), // effectively never
                max,
            },
            parse_put,
            move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
            move |_| {},
            sd_rx,
        ));

        // Yield enough for the mock to push all `max` updates; the window is an
        // hour, so any flush is purely the max trigger.
        tokio::time::sleep(Duration::from_millis(1)).await;
        assert_eq!(
            *flushes.lock().unwrap(),
            vec![max],
            "a full batch should flush on max, not wait for the window"
        );

        sd_tx.send(true).unwrap();
        task.await.unwrap().unwrap();
    }

    /// A pending batch plus a shutdown signal: the batch is flushed and the
    /// applied cursor returned.
    #[tokio::test(start_paused = true)]
    async fn flush_on_shutdown() {
        let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open

        let applied = Arc::new(AtomicU64::new(0));
        let a = Arc::clone(&applied);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig {
                window: Duration::from_secs(3600), // window won't fire
                max: 100,
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        ));

        // Give the mock time to deliver both updates into the pending batch.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            Some(2),
            "shutdown flushes the pending batch"
        );
        assert_eq!(applied.load(Ordering::SeqCst), 2);
    }

687    /// The cursor must not advance until `apply` has returned. We prove it by
688    /// having `apply` read the cursor that `on_applied` last published: when the
689    /// second batch is applied, the visible cursor must still be the *first*
690    /// batch's — never the second's, which only becomes visible after this
691    /// `apply` returns.
692    #[tokio::test(start_paused = true)]
693    async fn cursor_advances_only_after_apply() {
694        // Two batches of `max` updates each.
695        let max = 2usize;
696        let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
697        let watcher = Arc::new(MockWatcher::new(updates, true)); // hold open
698
699        // Cursor as last published by on_applied; starts at 0 (nothing applied).
700        let published = Arc::new(AtomicU64::new(0));
701        // What `apply` observed as the published cursor at the moment it ran.
702        let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));
703
704        let pub_for_apply = Arc::clone(&published);
705        let seen = Arc::clone(&seen_at_apply);
706        let pub_for_on = Arc::clone(&published);
707        let (sd_tx, sd_rx) = watch::channel(false);
708
709        let task = tokio::spawn(watch_applied(
710            watcher,
711            WatchScope::All,
712            None,
713            None,
714            BatchConfig {
715                window: Duration::from_secs(3600),
716                max,
717            },
718            parse_put,
719            move |_batch: Vec<Vec<u8>>| {
720                // The cursor visible here is whatever the PREVIOUS flush
721                // published — never this batch's, because we haven't returned.
722                seen.lock()
723                    .unwrap()
724                    .push(pub_for_apply.load(Ordering::SeqCst));
725            },
726            move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
727            sd_rx,
728        ));
729
730        tokio::time::sleep(Duration::from_millis(1)).await;
731        sd_tx.send(true).unwrap();
732        task.await.unwrap().unwrap();
733
734        // First apply saw 0 (nothing applied yet); second apply saw 2 (first
735        // batch's cursor), NOT 4. The cursor only reached 4 after the second
736        // apply returned.
737        assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
738        assert_eq!(published.load(Ordering::SeqCst), 4);
739    }
740
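    // Hypothetical sketch, not part of `watch_applied` or this crate's API: a
    // std-only model of the crash the module docs describe. `receipt_cursor` is
    // what a loop that bumps its high-water mark on `rx.recv()` would persist;
    // `applied_cursor` only moves after the batch has been handed to `apply`.
    // "Crashing" between receipt and apply shows why only the latter is safe to
    // resume from. The names `Loop` and `redelivered` are illustrative, and plain
    // revisions stand in for `KvUpdate`s.
    mod sketch_receipt_vs_apply_cursor {
        /// Minimal watch-loop state.
        #[derive(Default)]
        pub struct Loop {
            /// Received but not yet applied.
            pub pending: Vec<u64>,
            /// Cursor a receipt-driven loop would persist (unsafe).
            pub receipt_cursor: u64,
            /// Cursor an apply-driven loop would persist (safe).
            pub applied_cursor: u64,
        }

        impl Loop {
            /// Receipt: buffer the revision. Only the unsafe cursor moves here.
            pub fn receive(&mut self, rev: u64) {
                self.pending.push(rev);
                self.receipt_cursor = self.receipt_cursor.max(rev);
            }

            /// Flush: hand the batch to `apply`, and only after it returns
            /// advance the safe cursor to the batch's max revision.
            pub fn flush(&mut self, mut apply: impl FnMut(&[u64])) {
                let Some(&max) = self.pending.iter().max() else {
                    return;
                };
                apply(&self.pending[..]);
                self.pending.clear();
                self.applied_cursor = self.applied_cursor.max(max);
            }
        }

        /// Revisions a watch resumed from `cursor` would deliver again.
        pub fn redelivered(log: &[u64], cursor: u64) -> Vec<u64> {
            log.iter().copied().filter(|&rev| rev > cursor).collect()
        }

        #[test]
        fn crash_between_receipt_and_apply() {
            let mut l = Loop::default();
            l.receive(1);
            l.receive(2);
            l.flush(|_batch| {}); // first batch applied
            l.receive(3);
            l.receive(4);
            // "Crash" here: revs 3 and 4 were received but never applied.
            let log = [1, 2, 3, 4];
            // The receipt cursor is 4, so a resume would silently skip 3 and 4.
            assert!(redelivered(&log, l.receipt_cursor).is_empty());
            // The applied cursor is 2, so a resume redelivers 3 and 4.
            assert_eq!(redelivered(&log, l.applied_cursor), vec![3, 4]);
        }
    }
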
    /// Updates whose `parse` returns `None` (corrupt or irrelevant) carry no
    /// domain work, but they were still received and "applying" them is a
    /// no-op. So once their batch flushes, the cursor must advance over them.
    #[tokio::test]
    async fn corrupt_parse_entries_advance_cursor() {
        let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            // Reject everything: simulates corrupt/irrelevant entries.
            |_u: &KvUpdate| -> Option<Vec<u8>> { None },
            move |batch: Vec<Vec<u8>>| {
                // Should not run for an all-rejected batch; if it does, the
                // batch must at least be empty.
                ac.fetch_add(1, Ordering::SeqCst);
                assert!(batch.is_empty());
            },
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
        assert_eq!(
            apply_calls.load(Ordering::SeqCst),
            0,
            "an all-rejected batch applies nothing"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

    /// A resume whose cursor has expired falls back to the full watch and still
    /// applies the delivered updates.
    #[tokio::test]
    async fn cursor_expired_falls_back_to_full_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "fallback full watch's updates were applied"
        );
    }

    /// End-to-end with a real snapshot file: after the run, the persisted
    /// snapshot's cursor equals the applied cursor and its entries match the
    /// applied state, proving the checkpoint is written at the post-apply
    /// cursor, never ahead of it.
    #[tokio::test]
    async fn snapshot_checkpoint_matches_applied_cursor() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        let writer = SnapshotWriter::open(&path, u64::MAX).unwrap();

        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(writer),
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "snapshot checkpoint cursor must equal the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.entries["node.a"].value, b"1");
        assert_eq!(snap.entries["node.b"].value, b"2");
    }

    /// Happy-path resume: a non-expired cursor takes the `*_from` path and the
    /// delta (the `from` script, NOT the full set) is applied. Proves the
    /// resume branch delivers only post-cursor updates and advances to their
    /// max revision.
    #[tokio::test]
    async fn resume_from_cursor_delivers_only_delta() {
        let mock = MockWatcher {
            // `full` would be delivered only if the resume path were (wrongly)
            // bypassed; a non-empty distinguishing value makes that visible.
            full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
            from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
            from_expires: false,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            Some(WatchCursor::from_u64(9)), // resume past rev 9 (not expired)
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(11),
            "cursor advances to the delta max"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"3".to_vec(), b"4".to_vec()],
            "only the post-cursor delta is applied, never the full set"
        );
    }

    /// `WatchScope::Prefix` with no resume cursor dispatches to `watch_prefix`
    /// and applies the delivered updates. Most tests here use `WatchScope::All`;
    /// this one covers the plain prefix dispatch arm (the expired-resume prefix
    /// arm has its own test).
    #[tokio::test]
    async fn prefix_scope_applies_delivered_updates() {
        let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(2));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()]
        );
    }

    /// A `WatchScope::Prefix` resume whose cursor has expired falls back to the
    /// full `watch_prefix` and still applies the delivered updates: the prefix
    /// twin of `cursor_expired_falls_back_to_full_watch`.
    #[tokio::test]
    async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
        let mock = MockWatcher {
            full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
            from: Mutex::new(Some(vec![])),
            from_expires: true,
            hold: false,
        };
        let watcher = Arc::new(mock);

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let ab = Arc::clone(&applied_batches);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::Prefix("node.".to_string()),
            Some(WatchCursor::from_u64(5)), // resume position that "expired"
            None,
            BatchConfig::default(),
            parse_put,
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(11));
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"2".to_vec()],
            "prefix fallback full watch's updates were applied"
        );
    }

    /// The watch task's terminal error must propagate out of `watch_applied`
    /// rather than being swallowed as `Ok(applied)` when the channel closes.
    #[tokio::test]
    async fn watch_task_error_propagates() {
        let watcher = Arc::new(ErrorWatcher);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let result = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await;

        match result {
            Err(KvError::WatchError(msg)) => {
                assert!(msg.contains("injected"), "error carries the cause: {msg}");
            }
            other => panic!("expected WatchError, got {other:?}"),
        }
    }

    /// A batch where `parse` accepts some updates and rejects others: the cursor
    /// must still advance to the highest *received* revision (covering the
    /// rejected entry in the middle), while `apply` sees only the accepted ones.
    #[tokio::test]
    async fn mixed_parse_advances_cursor_over_rejected_entries() {
        let updates = vec![
            put("keep.a", b"1", 5),
            put("skip.b", b"2", 6), // rejected by parse
            put("keep.c", b"3", 7),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after

        let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
        let on_applied_max = Arc::new(AtomicU64::new(0));
        let ab = Arc::clone(&applied_batches);
        let om = Arc::clone(&on_applied_max);
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            // Keep only keys under "keep."; reject everything else.
            |u: &KvUpdate| -> Option<Vec<u8>> {
                match u {
                    KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
                    _ => None,
                }
            },
            move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
            move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(
            cursor.as_u64(),
            Some(7),
            "cursor covers the rejected middle entry (rev 6)"
        );
        assert_eq!(
            *applied_batches.lock().unwrap(),
            vec![b"1".to_vec(), b"3".to_vec()],
            "apply sees only the accepted entries"
        );
        assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
    }

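    // Hypothetical sketch, not `watch_applied`'s real batching code: a std-only
    // model of the rule the two parse tests above check. `parse` decides what
    // `apply` will see, but the cursor a flush publishes is the max revision of
    // everything *received*, accepted or rejected. Updates are modelled as
    // `(revision, key)` pairs for brevity; `flush_batch` is an illustrative name.
    mod sketch_cursor_over_rejected {
        /// Returns the payloads `apply` would receive and the cursor the flush
        /// would publish (`None` only for an empty batch).
        pub fn flush_batch<T>(
            batch: &[(u64, &str)],
            parse: impl Fn(&str) -> Option<T>,
        ) -> (Vec<T>, Option<u64>) {
            let accepted: Vec<T> = batch.iter().filter_map(|&(_, key)| parse(key)).collect();
            let cursor = batch.iter().map(|&(rev, _)| rev).max();
            (accepted, cursor)
        }

        #[test]
        fn rejected_entries_still_move_the_cursor() {
            let batch = [(5, "keep.a"), (6, "skip.b"), (7, "keep.c")];
            let keep = |k: &str| k.starts_with("keep.").then(|| k.to_string());
            let (accepted, cursor) = flush_batch(&batch, keep);
            assert_eq!(accepted, vec!["keep.a", "keep.c"]);
            assert_eq!(cursor, Some(7));

            // All rejected: nothing to apply, but the cursor still covers rev 7.
            let (nothing, cursor) = flush_batch(&batch, |_| None::<String>);
            assert!(nothing.is_empty());
            assert_eq!(cursor, Some(7));
        }
    }
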
    /// Shutdown before any update arrives: nothing was received, so the cursor
    /// stays at the resume position (here `none()`), `apply` never runs, and
    /// `on_applied` never fires.
    #[tokio::test(start_paused = true)]
    async fn shutdown_with_no_pending_batch() {
        let watcher = Arc::new(MockWatcher::new(vec![], true)); // deliver nothing, hold open

        let apply_calls = Arc::new(AtomicU64::new(0));
        let on_applied_calls = Arc::new(AtomicU64::new(0));
        let ac = Arc::clone(&apply_calls);
        let oc = Arc::clone(&on_applied_calls);
        let (sd_tx, sd_rx) = watch::channel(false);

        let task = tokio::spawn(watch_applied(
            watcher,
            WatchScope::All,
            None,
            None,
            BatchConfig::default(),
            parse_put,
            move |_batch: Vec<Vec<u8>>| {
                ac.fetch_add(1, Ordering::SeqCst);
            },
            move |_| {
                oc.fetch_add(1, Ordering::SeqCst);
            },
            sd_rx,
        ));

        // Let the watcher attach and idle (it has nothing to deliver), then shut down.
        tokio::time::sleep(Duration::from_millis(1)).await;
        sd_tx.send(true).unwrap();

        let cursor = task.await.unwrap().unwrap();
        assert_eq!(
            cursor.as_u64(),
            None,
            "no updates received → cursor unmoved"
        );
        assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
        assert_eq!(
            on_applied_calls.load(Ordering::SeqCst),
            0,
            "on_applied never fires"
        );
    }

    /// With a low `compact_threshold`, the flush path's `spawn_blocking`
    /// compaction actually fires (every other snapshot test pins the threshold
    /// at `u64::MAX`, leaving that branch dead). After a compacting run the
    /// snapshot must still load cleanly with the right cursor and entries.
    #[tokio::test]
    async fn snapshot_compaction_fires_and_stays_consistent() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("applied.snap");
        // threshold 0 → every checkpoint reports "needs compact", forcing the
        // spawn_blocking compaction branch on each flush.
        let writer = SnapshotWriter::open(&path, 0).unwrap();

        // Re-put the same key across flushes so compaction has duplicates to
        // dedup; small max forces multiple flushes (hence multiple compactions).
        let updates = vec![
            put("node.a", b"1", 1),
            put("node.a", b"2", 2),
            put("node.b", b"3", 3),
            put("node.a", b"4", 4),
        ];
        let watcher = Arc::new(MockWatcher::new(updates, false)); // close after
        let (_sd_tx, sd_rx) = watch::channel(false);

        let cursor = watch_applied(
            watcher,
            WatchScope::All,
            None,
            Some(writer),
            BatchConfig {
                window: Duration::from_secs(3600),
                max: 1, // one update per flush → a compaction per update
            },
            parse_put,
            move |_batch: Vec<Vec<u8>>| {},
            move |_| {},
            sd_rx,
        )
        .await
        .unwrap();

        assert_eq!(cursor.as_u64(), Some(4));

        let snap = crate::snapshot::load(&path).unwrap().unwrap();
        assert_eq!(
            snap.cursor.as_u64(),
            cursor.as_u64(),
            "compacted snapshot's cursor still equals the applied cursor"
        );
        assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
        assert_eq!(
            snap.entries["node.a"].value, b"4",
            "last write per key survives compaction"
        );
        assert_eq!(snap.entries["node.b"].value, b"3");
    }
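
    // Hypothetical sketch, not the crate's `SnapshotWriter` compaction: a
    // std-only model of the property `snapshot_compaction_fires_and_stays_consistent`
    // checks. Folding a `(key, value, revision)` log keeps only the newest write
    // per key, and the compacted cursor is still the log's max revision. Values
    // are `&str` rather than bytes for brevity; `compact` is an illustrative name.
    mod sketch_last_write_wins {
        use std::collections::BTreeMap;

        /// Compacts the log to one entry per key plus the max revision seen.
        pub fn compact(
            log: &[(&str, &str, u64)],
        ) -> (BTreeMap<String, String>, Option<u64>) {
            let mut latest: BTreeMap<String, (u64, String)> = BTreeMap::new();
            for &(key, value, rev) in log {
                // Keep this write unless a newer revision for the key is already held.
                let newer_exists = latest.get(key).map_or(false, |&(r, _)| r > rev);
                if !newer_exists {
                    latest.insert(key.to_string(), (rev, value.to_string()));
                }
            }
            let cursor = log.iter().map(|&(_, _, rev)| rev).max();
            let entries: BTreeMap<String, String> =
                latest.into_iter().map(|(k, (_, v))| (k, v)).collect();
            (entries, cursor)
        }

        #[test]
        fn last_write_per_key_survives() {
            let log = [
                ("node.a", "1", 1),
                ("node.a", "2", 2),
                ("node.b", "3", 3),
                ("node.a", "4", 4),
            ];
            let (entries, cursor) = compact(&log);
            assert_eq!(entries.len(), 2);
            assert_eq!(entries["node.a"], "4");
            assert_eq!(entries["node.b"], "3");
            assert_eq!(cursor, Some(4));
        }
    }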
}