use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::warn;
use crate::kv::{KvError, KvUpdate, KvWatcher, WatchCursor};
use crate::snapshot::SnapshotStore;
#[derive(Debug, Clone)]
pub enum WatchScope {
All,
Prefix(String),
}
#[derive(Debug, Clone, Copy)]
pub struct BatchConfig {
pub window: Duration,
pub max: usize,
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
window: Duration::from_millis(10),
max: 100,
}
}
}
#[allow(clippy::too_many_arguments)]
#[allow(unused_assignments)]
pub async fn watch_applied<U, S, P, A, O>(
watcher: Arc<dyn KvWatcher>,
scope: WatchScope,
resume: Option<WatchCursor>,
mut store: Option<S>,
config: BatchConfig,
mut parse: P,
mut apply: A,
mut on_applied: O,
mut shutdown: watch::Receiver<bool>,
) -> Result<WatchCursor, KvError>
where
U: Send,
S: SnapshotStore + Send + 'static,
P: FnMut(&KvUpdate) -> Option<U> + Send,
A: FnMut(Vec<U>) + Send,
O: FnMut(WatchCursor) + Send,
{
let mut applied = match &resume {
Some(c) => c.clone(),
None => WatchCursor::none(),
};
let (tx, mut rx) = mpsc::channel::<KvUpdate>(256);
let handle = {
let watcher = Arc::clone(&watcher);
tokio::spawn(async move { run_watch(watcher.as_ref(), &scope, resume, tx).await })
};
let batch_cap = config.max.clamp(1, 64);
let mut batch: Vec<U> = Vec::with_capacity(batch_cap);
let mut raw_batch: Vec<KvUpdate> = Vec::new();
let mut batch_high = WatchCursor::none();
let mut batch_deadline: Option<tokio::time::Instant> = None;
macro_rules! flush {
() => {{
if !batch.is_empty() || !batch_high.is_none() {
if !batch.is_empty() {
apply(std::mem::replace(&mut batch, Vec::with_capacity(batch_cap)));
}
if !batch_high.is_none() {
applied = batch_high.clone();
if let Some(mut st) = store.take() {
let raw = std::mem::take(&mut raw_batch);
let cur = applied.clone();
match tokio::task::spawn_blocking(move || {
let res = st.apply(&raw, &cur);
(st, res)
})
.await
{
Ok((st, Ok(()))) => store = Some(st),
Ok((st, Err(e))) => {
warn!(error = %e, "snapshot store apply failed; continuing");
store = Some(st);
}
Err(e) => {
warn!(error = %e, "snapshot store task panicked; aborting watch");
handle.abort();
return Err(KvError::WatchError(format!(
"snapshot store task panicked: {e}"
)));
}
}
}
on_applied(applied.clone());
batch_high = WatchCursor::none();
}
}
batch_deadline = None;
}};
}
let sleep = tokio::time::sleep(Duration::ZERO);
tokio::pin!(sleep);
loop {
tokio::select! {
biased;
res = shutdown.changed() => {
if res.is_err() || *shutdown.borrow() {
flush!();
handle.abort();
if let Err(join) = handle.await
&& !join.is_cancelled()
{
warn!(error = %join, "watch task panicked at shutdown");
}
return Ok(applied);
}
}
() = &mut sleep, if batch_deadline.is_some() => {
flush!();
}
update = rx.recv() => {
match update {
Some(u) => {
batch_high = WatchCursor::from_version(u.version().clone());
if store.is_some() {
raw_batch.push(u.clone());
}
if let Some(parsed) = parse(&u) {
batch.push(parsed);
}
if batch_deadline.is_none() {
let deadline = tokio::time::Instant::now() + config.window;
sleep.as_mut().reset(deadline);
batch_deadline = Some(deadline);
}
if batch.len() >= config.max || raw_batch.len() >= config.max {
flush!();
}
}
None => {
flush!();
return match handle.await {
Ok(Ok(())) => Ok(applied),
Ok(Err(e)) => Err(e),
Err(join) => Err(KvError::WatchError(format!(
"watch task panicked: {join}"
))),
};
}
}
}
}
}
}
async fn run_watch(
watcher: &dyn KvWatcher,
scope: &WatchScope,
resume: Option<WatchCursor>,
tx: mpsc::Sender<KvUpdate>,
) -> Result<(), KvError> {
let resume_cursor = resume.filter(|c| !c.is_none());
match scope {
WatchScope::All => {
if let Some(cursor) = resume_cursor {
match watcher.watch_all_from(&cursor, tx.clone()).await {
Err(KvError::CursorExpired) => {
warn!("watch cursor expired, falling back to full watch_all");
watcher.watch_all(tx).await
}
other => other,
}
} else {
watcher.watch_all(tx).await
}
}
WatchScope::Prefix(prefix) => {
if let Some(cursor) = resume_cursor {
match watcher.watch_prefix_from(prefix, &cursor, tx.clone()).await {
Err(KvError::CursorExpired) => {
warn!("watch cursor expired, falling back to full watch_prefix");
watcher.watch_prefix(prefix, tx).await
}
other => other,
}
} else {
watcher.watch_prefix(prefix, tx).await
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kv::{KvEntry, VersionToken};
use crate::snapshot::AppendLogSnapshot;
use async_trait::async_trait;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc::Sender;
fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
KvUpdate::Put(KvEntry {
key: key.to_string(),
value: value.to_vec(),
version: VersionToken::from_u64(rev),
})
}
struct MockWatcher {
full: Mutex<Option<Vec<KvUpdate>>>,
from: Mutex<Option<Vec<KvUpdate>>>,
from_expires: bool,
hold: bool,
}
impl MockWatcher {
fn new(updates: Vec<KvUpdate>, hold: bool) -> Self {
Self {
full: Mutex::new(Some(updates)),
from: Mutex::new(None),
from_expires: false,
hold,
}
}
async fn deliver(&self, which: &Mutex<Option<Vec<KvUpdate>>>, tx: Sender<KvUpdate>) {
let updates = which.lock().unwrap().take().unwrap_or_default();
for u in updates {
if tx.send(u).await.is_err() {
return;
}
}
if self.hold {
std::future::pending::<()>().await;
}
}
}
#[async_trait]
impl KvWatcher for MockWatcher {
async fn watch_all(&self, tx: Sender<KvUpdate>) -> Result<(), KvError> {
self.deliver(&self.full, tx).await;
Ok(())
}
async fn watch_prefix(&self, _prefix: &str, tx: Sender<KvUpdate>) -> Result<(), KvError> {
self.deliver(&self.full, tx).await;
Ok(())
}
async fn watch_all_from(
&self,
_cursor: &WatchCursor,
tx: Sender<KvUpdate>,
) -> Result<(), KvError> {
if self.from_expires {
return Err(KvError::CursorExpired);
}
self.deliver(&self.from, tx).await;
Ok(())
}
async fn watch_prefix_from(
&self,
_prefix: &str,
_cursor: &WatchCursor,
tx: Sender<KvUpdate>,
) -> Result<(), KvError> {
if self.from_expires {
return Err(KvError::CursorExpired);
}
self.deliver(&self.from, tx).await;
Ok(())
}
}
struct ErrorWatcher;
#[async_trait]
impl KvWatcher for ErrorWatcher {
async fn watch_all(&self, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
Err(KvError::WatchError("injected watch failure".into()))
}
async fn watch_prefix(&self, _prefix: &str, _tx: Sender<KvUpdate>) -> Result<(), KvError> {
Err(KvError::WatchError("injected watch failure".into()))
}
}
fn parse_put(u: &KvUpdate) -> Option<Vec<u8>> {
match u {
KvUpdate::Put(e) => Some(e.value.clone()),
_ => None,
}
}
#[tokio::test]
async fn flush_on_channel_close() {
let updates = vec![put("a", b"1", 1), put("b", b"2", 2), put("c", b"3", 3)];
let watcher = Arc::new(MockWatcher::new(updates, false));
let applied_batches = Arc::new(Mutex::new(Vec::<Vec<Vec<u8>>>::new()));
let on_applied_cursors = Arc::new(Mutex::new(Vec::<u64>::new()));
let ab = Arc::clone(&applied_batches);
let oc = Arc::clone(&on_applied_cursors);
let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig::default(),
parse_put,
move |batch| ab.lock().unwrap().push(batch),
move |c| oc.lock().unwrap().push(c.as_u64().unwrap()),
sd_rx,
)
.await
.unwrap();
assert_eq!(cursor.as_u64(), Some(3));
let batches = applied_batches.lock().unwrap();
let flat: Vec<Vec<u8>> = batches.iter().flatten().cloned().collect();
assert_eq!(flat, vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec()]);
assert_eq!(*on_applied_cursors.lock().unwrap().last().unwrap(), 3);
}
#[tokio::test(start_paused = true)]
async fn flush_on_window() {
let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
let watcher = Arc::new(MockWatcher::new(updates, true));
let applied = Arc::new(AtomicU64::new(0));
let count = Arc::new(AtomicU64::new(0));
let a = Arc::clone(&applied);
let c = Arc::clone(&count);
let (sd_tx, sd_rx) = watch::channel(false);
let task = tokio::spawn(watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig::default(),
parse_put,
move |batch: Vec<Vec<u8>>| {
c.fetch_add(batch.len() as u64, Ordering::SeqCst);
},
move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
sd_rx,
));
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(
count.load(Ordering::SeqCst),
2,
"window should have flushed"
);
assert_eq!(applied.load(Ordering::SeqCst), 2);
sd_tx.send(true).unwrap();
let cursor = task.await.unwrap().unwrap();
assert_eq!(cursor.as_u64(), Some(2));
}
#[tokio::test(start_paused = true)]
async fn flush_on_max() {
let max = 4;
let updates: Vec<_> = (1..=max as u64)
.map(|i| put(&format!("k{i}"), b"v", i))
.collect();
let watcher = Arc::new(MockWatcher::new(updates, true));
let flushes = Arc::new(Mutex::new(Vec::<usize>::new()));
let f = Arc::clone(&flushes);
let (sd_tx, sd_rx) = watch::channel(false);
let task = tokio::spawn(watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig {
window: Duration::from_secs(3600), max,
},
parse_put,
move |batch: Vec<Vec<u8>>| f.lock().unwrap().push(batch.len()),
move |_| {},
sd_rx,
));
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(
*flushes.lock().unwrap(),
vec![max],
"a full batch should flush on max, not wait for the window"
);
sd_tx.send(true).unwrap();
task.await.unwrap().unwrap();
}
#[tokio::test(start_paused = true)]
async fn flush_on_shutdown() {
let updates = vec![put("a", b"1", 1), put("b", b"2", 2)];
let watcher = Arc::new(MockWatcher::new(updates, true));
let applied = Arc::new(AtomicU64::new(0));
let a = Arc::clone(&applied);
let (sd_tx, sd_rx) = watch::channel(false);
let task = tokio::spawn(watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig {
window: Duration::from_secs(3600), max: 100,
},
parse_put,
move |_batch: Vec<Vec<u8>>| {},
move |cur| a.store(cur.as_u64().unwrap(), Ordering::SeqCst),
sd_rx,
));
tokio::time::sleep(Duration::from_millis(1)).await;
sd_tx.send(true).unwrap();
let cursor = task.await.unwrap().unwrap();
assert_eq!(
cursor.as_u64(),
Some(2),
"shutdown flushes the pending batch"
);
assert_eq!(applied.load(Ordering::SeqCst), 2);
}
#[tokio::test(start_paused = true)]
async fn cursor_advances_only_after_apply() {
let max = 2usize;
let updates: Vec<_> = (1..=4u64).map(|i| put(&format!("k{i}"), b"v", i)).collect();
let watcher = Arc::new(MockWatcher::new(updates, true));
let published = Arc::new(AtomicU64::new(0));
let seen_at_apply = Arc::new(Mutex::new(Vec::<u64>::new()));
let pub_for_apply = Arc::clone(&published);
let seen = Arc::clone(&seen_at_apply);
let pub_for_on = Arc::clone(&published);
let (sd_tx, sd_rx) = watch::channel(false);
let task = tokio::spawn(watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig {
window: Duration::from_secs(3600),
max,
},
parse_put,
move |_batch: Vec<Vec<u8>>| {
seen.lock()
.unwrap()
.push(pub_for_apply.load(Ordering::SeqCst));
},
move |cur| pub_for_on.store(cur.as_u64().unwrap(), Ordering::SeqCst),
sd_rx,
));
tokio::time::sleep(Duration::from_millis(1)).await;
sd_tx.send(true).unwrap();
task.await.unwrap().unwrap();
assert_eq!(*seen_at_apply.lock().unwrap(), vec![0, 2]);
assert_eq!(published.load(Ordering::SeqCst), 4);
}
#[tokio::test]
async fn corrupt_parse_entries_advance_cursor() {
let updates = vec![put("a", b"1", 5), put("b", b"2", 6), put("c", b"3", 7)];
let watcher = Arc::new(MockWatcher::new(updates, false));
let apply_calls = Arc::new(AtomicU64::new(0));
let on_applied_max = Arc::new(AtomicU64::new(0));
let ac = Arc::clone(&apply_calls);
let om = Arc::clone(&on_applied_max);
let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig::default(),
|_u: &KvUpdate| -> Option<Vec<u8>> { None },
move |batch: Vec<Vec<u8>>| {
ac.fetch_add(1, Ordering::SeqCst);
assert!(batch.is_empty());
},
move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
sd_rx,
)
.await
.unwrap();
assert_eq!(cursor.as_u64(), Some(7), "cursor covers rejected updates");
assert_eq!(
apply_calls.load(Ordering::SeqCst),
0,
"an all-rejected batch applies nothing"
);
assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
}
#[tokio::test]
async fn cursor_expired_falls_back_to_full_watch() {
let mock = MockWatcher {
full: Mutex::new(Some(vec![put("a", b"1", 10), put("b", b"2", 11)])),
from: Mutex::new(Some(vec![])),
from_expires: true,
hold: false,
};
let watcher = Arc::new(mock);
let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
let ab = Arc::clone(&applied_batches);
let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::All,
Some(WatchCursor::from_u64(5)), None::<AppendLogSnapshot>,
BatchConfig::default(),
parse_put,
move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
move |_| {},
sd_rx,
)
.await
.unwrap();
assert_eq!(cursor.as_u64(), Some(11));
assert_eq!(
*applied_batches.lock().unwrap(),
vec![b"1".to_vec(), b"2".to_vec()],
"fallback full watch's updates were applied"
);
}
#[tokio::test]
async fn snapshot_checkpoint_matches_applied_cursor() {
let dir = tempfile::TempDir::new().unwrap();
let path = dir.path().join("applied.snap");
let (_resume, store) = AppendLogSnapshot::open(&path, u64::MAX).unwrap();
let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
let watcher = Arc::new(MockWatcher::new(updates, false)); let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::All,
None,
Some(store),
BatchConfig::default(),
parse_put,
move |_batch: Vec<Vec<u8>>| {},
move |_| {},
sd_rx,
)
.await
.unwrap();
assert_eq!(cursor.as_u64(), Some(2));
let snap = crate::snapshot::load(&path).unwrap().unwrap();
assert_eq!(
snap.cursor.as_u64(),
cursor.as_u64(),
"snapshot checkpoint cursor must equal the applied cursor"
);
assert_eq!(snap.entries.len(), 2);
assert_eq!(snap.entries["node.a"].value, b"1");
assert_eq!(snap.entries["node.b"].value, b"2");
}
#[tokio::test]
async fn resume_from_cursor_delivers_only_delta() {
let mock = MockWatcher {
full: Mutex::new(Some(vec![put("full.x", b"FULL", 1)])),
from: Mutex::new(Some(vec![put("node.c", b"3", 10), put("node.d", b"4", 11)])),
from_expires: false,
hold: false,
};
let watcher = Arc::new(mock);
let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
let ab = Arc::clone(&applied_batches);
let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::All,
Some(WatchCursor::from_u64(9)), None::<AppendLogSnapshot>,
BatchConfig::default(),
parse_put,
move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
move |_| {},
sd_rx,
)
.await
.unwrap();
assert_eq!(
cursor.as_u64(),
Some(11),
"cursor advances to the delta max"
);
assert_eq!(
*applied_batches.lock().unwrap(),
vec![b"3".to_vec(), b"4".to_vec()],
"only the post-cursor delta is applied, never the full set"
);
}
#[tokio::test]
async fn prefix_scope_applies_delivered_updates() {
let updates = vec![put("node.a", b"1", 1), put("node.b", b"2", 2)];
let watcher = Arc::new(MockWatcher::new(updates, false));
let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
let ab = Arc::clone(&applied_batches);
let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::Prefix("node.".to_string()),
None,
None::<AppendLogSnapshot>,
BatchConfig::default(),
parse_put,
move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
move |_| {},
sd_rx,
)
.await
.unwrap();
assert_eq!(cursor.as_u64(), Some(2));
assert_eq!(
*applied_batches.lock().unwrap(),
vec![b"1".to_vec(), b"2".to_vec()]
);
}
#[tokio::test]
async fn prefix_cursor_expired_falls_back_to_full_prefix_watch() {
let mock = MockWatcher {
full: Mutex::new(Some(vec![put("node.a", b"1", 10), put("node.b", b"2", 11)])),
from: Mutex::new(Some(vec![])),
from_expires: true,
hold: false,
};
let watcher = Arc::new(mock);
let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
let ab = Arc::clone(&applied_batches);
let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::Prefix("node.".to_string()),
Some(WatchCursor::from_u64(5)), None::<AppendLogSnapshot>,
BatchConfig::default(),
parse_put,
move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
move |_| {},
sd_rx,
)
.await
.unwrap();
assert_eq!(cursor.as_u64(), Some(11));
assert_eq!(
*applied_batches.lock().unwrap(),
vec![b"1".to_vec(), b"2".to_vec()],
"prefix fallback full watch's updates were applied"
);
}
#[tokio::test]
async fn watch_task_error_propagates() {
let watcher = Arc::new(ErrorWatcher);
let (_sd_tx, sd_rx) = watch::channel(false);
let result = watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig::default(),
parse_put,
move |_batch: Vec<Vec<u8>>| {},
move |_| {},
sd_rx,
)
.await;
match result {
Err(KvError::WatchError(msg)) => {
assert!(msg.contains("injected"), "error carries the cause: {msg}");
}
other => panic!("expected WatchError, got {other:?}"),
}
}
#[tokio::test]
async fn mixed_parse_advances_cursor_over_rejected_entries() {
let updates = vec![
put("keep.a", b"1", 5),
put("skip.b", b"2", 6), put("keep.c", b"3", 7),
];
let watcher = Arc::new(MockWatcher::new(updates, false));
let applied_batches = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
let on_applied_max = Arc::new(AtomicU64::new(0));
let ab = Arc::clone(&applied_batches);
let om = Arc::clone(&on_applied_max);
let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig::default(),
|u: &KvUpdate| -> Option<Vec<u8>> {
match u {
KvUpdate::Put(e) if e.key.starts_with("keep.") => Some(e.value.clone()),
_ => None,
}
},
move |batch: Vec<Vec<u8>>| ab.lock().unwrap().extend(batch),
move |cur| om.store(cur.as_u64().unwrap(), Ordering::SeqCst),
sd_rx,
)
.await
.unwrap();
assert_eq!(
cursor.as_u64(),
Some(7),
"cursor covers the rejected middle entry (rev 6)"
);
assert_eq!(
*applied_batches.lock().unwrap(),
vec![b"1".to_vec(), b"3".to_vec()],
"apply sees only the accepted entries"
);
assert_eq!(on_applied_max.load(Ordering::SeqCst), 7);
}
#[tokio::test(start_paused = true)]
async fn shutdown_with_no_pending_batch() {
let watcher = Arc::new(MockWatcher::new(vec![], true));
let apply_calls = Arc::new(AtomicU64::new(0));
let on_applied_calls = Arc::new(AtomicU64::new(0));
let ac = Arc::clone(&apply_calls);
let oc = Arc::clone(&on_applied_calls);
let (sd_tx, sd_rx) = watch::channel(false);
let task = tokio::spawn(watch_applied(
watcher,
WatchScope::All,
None,
None::<AppendLogSnapshot>,
BatchConfig::default(),
parse_put,
move |_batch: Vec<Vec<u8>>| {
ac.fetch_add(1, Ordering::SeqCst);
},
move |_| {
oc.fetch_add(1, Ordering::SeqCst);
},
sd_rx,
));
tokio::time::sleep(Duration::from_millis(1)).await;
sd_tx.send(true).unwrap();
let cursor = task.await.unwrap().unwrap();
assert_eq!(
cursor.as_u64(),
None,
"no updates received → cursor unmoved"
);
assert_eq!(apply_calls.load(Ordering::SeqCst), 0, "apply never runs");
assert_eq!(
on_applied_calls.load(Ordering::SeqCst),
0,
"on_applied never fires"
);
}
#[tokio::test]
async fn snapshot_compaction_fires_and_stays_consistent() {
let dir = tempfile::TempDir::new().unwrap();
let path = dir.path().join("applied.snap");
let (_resume, store) = AppendLogSnapshot::open(&path, 0).unwrap();
let updates = vec![
put("node.a", b"1", 1),
put("node.a", b"2", 2),
put("node.b", b"3", 3),
put("node.a", b"4", 4),
];
let watcher = Arc::new(MockWatcher::new(updates, false)); let (_sd_tx, sd_rx) = watch::channel(false);
let cursor = watch_applied(
watcher,
WatchScope::All,
None,
Some(store),
BatchConfig {
window: Duration::from_secs(3600),
max: 1, },
parse_put,
move |_batch: Vec<Vec<u8>>| {},
move |_| {},
sd_rx,
)
.await
.unwrap();
assert_eq!(cursor.as_u64(), Some(4));
let snap = crate::snapshot::load(&path).unwrap().unwrap();
assert_eq!(
snap.cursor.as_u64(),
cursor.as_u64(),
"compacted snapshot's cursor still equals the applied cursor"
);
assert_eq!(snap.entries.len(), 2, "duplicates of node.a deduped");
assert_eq!(
snap.entries["node.a"].value, b"4",
"last write per key survives compaction"
);
assert_eq!(snap.entries["node.b"].value, b"3");
}
}