1use std::collections::{HashMap, HashSet};
42use std::fs::{self, File, OpenOptions};
43use std::io::{self, Write};
44use std::path::{Path, PathBuf};
45
46use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
47
/// Four-byte signature that opens every snapshot file.
const MAGIC: &[u8; 4] = b"PGSS";
/// On-disk format revision, written little-endian directly after [`MAGIC`].
const FORMAT_VERSION: u16 = 2;
/// Header size in bytes: 4 bytes of magic followed by the 2-byte format version.
const HEADER_LEN: usize = 4 + 2;

/// Type tag of a record that stores a key, its value and its version.
const REC_PUT: u8 = 0x01;
/// Type tag of a record that removes a key (written for both deletes and purges).
const REC_DELETE: u8 = 0x02;
/// Type tag of a record that stores the watch cursor reached at a checkpoint.
const REC_CURSOR: u8 = 0x03;
59
60const MIN_CURSOR_RECORD: usize = 4 + 1 + 1; #[derive(Debug, thiserror::Error)]
69pub enum SnapshotError {
70 #[error("snapshot I/O error: {0}")]
71 Io(#[from] io::Error),
72 #[error("invalid snapshot format: {0}")]
73 InvalidFormat(String),
74 #[error("snapshot corrupted (CRC mismatch)")]
75 Corrupted,
76}
77
78#[derive(Debug)]
80pub struct Snapshot {
81 pub cursor: WatchCursor,
83 pub entries: HashMap<String, KvEntry>,
85}
86
87impl Snapshot {
88 pub fn stale_keys<'a, I>(&'a self, current_keys: I) -> Vec<&'a str>
94 where
95 I: IntoIterator<Item = &'a str>,
96 {
97 let current: HashSet<&str> = current_keys.into_iter().collect();
98 self.entries
99 .keys()
100 .filter(|k| !current.contains(k.as_str()))
101 .map(|k| k.as_str())
102 .collect()
103 }
104}
105
106pub struct SnapshotWriter {
117 path: PathBuf,
118 writer: Option<io::BufWriter<File>>,
124 bytes_since_compact: u64,
125 compact_threshold: u64,
126}
127
128impl SnapshotWriter {
129 pub fn open(path: &Path, compact_threshold: u64) -> Result<Self, SnapshotError> {
135 let file = OpenOptions::new().create(true).append(true).open(path)?;
142 let existing_len = file.metadata()?.len();
143
144 let mut writer = io::BufWriter::new(file);
145
146 let bytes_since_compact = if existing_len >= HEADER_LEN as u64 {
151 existing_len - HEADER_LEN as u64
152 } else {
153 writer.write_all(MAGIC)?;
154 writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
155 writer.flush()?;
156 0
157 };
158
159 Ok(Self {
160 path: path.to_path_buf(),
161 writer: Some(writer),
162 bytes_since_compact,
163 compact_threshold,
164 })
165 }
166
167 fn writer(&mut self) -> Result<&mut io::BufWriter<File>, SnapshotError> {
171 self.writer.as_mut().ok_or_else(|| {
172 SnapshotError::Io(io::Error::other(
173 "snapshot writer poisoned: a prior compact() failed to reopen the log for append",
174 ))
175 })
176 }
177
178 #[must_use = "I/O errors mean the write was lost"]
182 pub fn write_update(&mut self, update: &KvUpdate) -> Result<(), SnapshotError> {
183 let w = self.writer()?;
184 let bytes = match update {
185 KvUpdate::Put(entry) => write_put_record(w, &entry.key, &entry.value, &entry.version)?,
186 KvUpdate::Delete { key, version } | KvUpdate::Purge { key, version } => {
187 write_delete_record(w, key, version)?
188 }
189 };
190 self.bytes_since_compact += bytes as u64;
191 Ok(())
192 }
193
194 #[must_use = "returns true when compaction is needed"]
208 pub fn checkpoint(&mut self, cursor: &WatchCursor) -> Result<bool, SnapshotError> {
209 let w = self.writer()?;
210 let bytes = write_cursor_record(w, cursor)?;
211 w.flush()?;
212 self.bytes_since_compact += bytes as u64;
213 Ok(self.bytes_since_compact > self.compact_threshold)
214 }
215
216 #[must_use = "I/O errors mean the flush failed"]
218 pub fn flush(&mut self) -> Result<(), SnapshotError> {
219 self.writer()?.flush()?;
220 Ok(())
221 }
222
223 #[must_use = "compaction errors leave the log uncompacted"]
228 pub fn compact(&mut self) -> Result<(), SnapshotError> {
229 self.writer()?.flush()?;
236 let data = fs::read(&self.path)?;
237 let (entries, cursor, _already_compact) = replay_log(&data)?;
238 compact_to_file(&self.path, &entries, &cursor)?;
239
240 self.writer = None;
246 let file = OpenOptions::new().append(true).open(&self.path)?;
247 self.writer = Some(io::BufWriter::new(file));
248 self.bytes_since_compact = 0;
249
250 Ok(())
251 }
252}
253
254pub fn load(path: &Path) -> Result<Option<Snapshot>, SnapshotError> {
261 let data = match fs::read(path) {
262 Ok(d) => d,
263 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
264 Err(e) => return Err(SnapshotError::Io(e)),
265 };
266
267 let (entries, cursor, already_compact) = replay_log(&data)?;
268
269 if entries.is_empty() && cursor.is_none() {
270 return Ok(None);
271 }
272
273 if !already_compact {
279 compact_to_file(path, &entries, &cursor)?;
280 }
281
282 Ok(Some(Snapshot { cursor, entries }))
283}
284
285fn replay_log(data: &[u8]) -> Result<(HashMap<String, KvEntry>, WatchCursor, bool), SnapshotError> {
297 if data.len() < HEADER_LEN {
298 return Err(SnapshotError::InvalidFormat("file too short".into()));
299 }
300 if &data[0..4] != MAGIC {
301 return Err(SnapshotError::InvalidFormat("bad magic".into()));
302 }
303 let version = u16::from_le_bytes([data[4], data[5]]);
304 if version != FORMAT_VERSION {
305 return Err(SnapshotError::InvalidFormat(format!(
306 "unsupported version {version}"
307 )));
308 }
309
310 let estimated = (data.len() - HEADER_LEN) / 30;
313 let mut live: HashMap<&str, (&[u8], VersionToken)> =
314 HashMap::with_capacity(estimated.min(4096));
315 let mut cursor = WatchCursor::none();
316 let mut pos = HEADER_LEN;
317
318 let mut redundant = false;
322 let mut clean_eof = true;
323
324 while pos < data.len() {
325 match parse_record(&data[pos..]) {
326 Ok((record, consumed)) => {
327 match record {
328 Record::Put {
329 key,
330 value,
331 version,
332 } => {
333 if live.insert(key, (value, version)).is_some() {
334 redundant = true;
335 }
336 }
337 Record::Delete { key } => {
338 live.remove(key);
339 redundant = true;
340 }
341 Record::Cursor(c) => {
342 if !cursor.is_none() {
350 redundant = true;
351 }
352 cursor = c;
353 }
354 }
355 pos += consumed;
356 }
357 Err(RecordError::Truncated) => {
358 clean_eof = false;
368 break;
369 }
370 Err(RecordError::CrcMismatch { consumed }) => {
371 if pos + consumed >= data.len() || data.len() - (pos + consumed) < MIN_CURSOR_RECORD
374 {
375 clean_eof = false;
376 break;
377 }
378 return Err(SnapshotError::Corrupted);
379 }
380 Err(RecordError::Invalid(msg)) => {
381 return Err(SnapshotError::InvalidFormat(msg));
382 }
383 }
384 }
385
386 let mut entries: HashMap<String, KvEntry> = HashMap::with_capacity(live.len());
389 for (key, (value, version)) in live {
390 let key = key.to_string();
391 entries.insert(
392 key.clone(),
393 KvEntry {
394 key,
395 value: value.to_vec(),
396 version,
397 },
398 );
399 }
400
401 let already_compact = !redundant && clean_eof;
402 Ok((entries, cursor, already_compact))
403}
404
405enum Record<'a> {
406 Put {
407 key: &'a str,
408 value: &'a [u8],
409 version: VersionToken,
410 },
411 Delete {
412 key: &'a str,
413 },
414 Cursor(WatchCursor),
415}
416
/// Reasons a single record could not be decoded.
enum RecordError {
    /// The buffer ends before the record does.
    Truncated,
    /// The record is fully present but its CRC does not match; `consumed` is
    /// the length the record claims to occupy.
    CrcMismatch { consumed: usize },
    /// The record is structurally invalid; the string describes why.
    Invalid(String),
}
422
423fn parse_record(data: &[u8]) -> Result<(Record<'_>, usize), RecordError> {
424 if data.len() < 5 {
426 return Err(RecordError::Truncated);
427 }
428
429 let stored_crc = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
430
431 match data[4] {
432 REC_PUT => parse_put(data, stored_crc),
433 REC_DELETE => parse_delete(data, stored_crc),
434 REC_CURSOR => parse_cursor(data, stored_crc),
435 other => Err(RecordError::Invalid(format!(
436 "unknown record type: {other:#x}"
437 ))),
438 }
439}
440
441fn parse_put(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
442 if data.len() < 7 {
444 return Err(RecordError::Truncated);
445 }
446 let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
447 let vl_off = 7 + key_len;
448
449 if data.len() < vl_off + 4 {
450 return Err(RecordError::Truncated);
451 }
452 let value_len = u32::from_le_bytes([
453 data[vl_off],
454 data[vl_off + 1],
455 data[vl_off + 2],
456 data[vl_off + 3],
457 ]) as usize;
458
459 let ver_len_off = vl_off + 4 + value_len;
461 if data.len() < ver_len_off + 1 {
462 return Err(RecordError::Truncated);
463 }
464 let ver_len = data[ver_len_off] as usize;
465 if ver_len > 10 {
469 return Err(RecordError::Invalid(format!(
470 "version length {ver_len} exceeds max version token size (10)"
471 )));
472 }
473
474 let total = ver_len_off + 1 + ver_len;
475 if data.len() < total {
476 return Err(RecordError::Truncated);
477 }
478
479 let computed = crc32fast::hash(&data[4..total]);
480 if computed != stored_crc {
481 return Err(RecordError::CrcMismatch { consumed: total });
482 }
483
484 let key = std::str::from_utf8(&data[7..7 + key_len])
485 .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
486 let value = &data[vl_off + 4..vl_off + 4 + value_len];
487 let version = VersionToken::from_raw(&data[ver_len_off + 1..total]).ok_or_else(|| {
490 RecordError::Invalid(format!(
491 "version length {ver_len} exceeds max version token size (10)"
492 ))
493 })?;
494
495 Ok((
496 Record::Put {
497 key,
498 value,
499 version,
500 },
501 total,
502 ))
503}
504
505fn parse_delete(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
506 if data.len() < 7 {
507 return Err(RecordError::Truncated);
508 }
509 let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
510 let ver_len_off = 7 + key_len;
511 if data.len() < ver_len_off + 1 {
512 return Err(RecordError::Truncated);
513 }
514 let ver_len = data[ver_len_off] as usize;
515 if ver_len > 10 {
517 return Err(RecordError::Invalid(format!(
518 "version length {ver_len} exceeds max version token size (10)"
519 )));
520 }
521 let total = ver_len_off + 1 + ver_len;
522
523 if data.len() < total {
524 return Err(RecordError::Truncated);
525 }
526
527 let computed = crc32fast::hash(&data[4..total]);
528 if computed != stored_crc {
529 return Err(RecordError::CrcMismatch { consumed: total });
530 }
531
532 let key = std::str::from_utf8(&data[7..7 + key_len])
533 .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
534 Ok((Record::Delete { key }, total))
538}
539
540fn parse_cursor(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
541 if data.len() < 6 {
542 return Err(RecordError::Truncated);
543 }
544 let cursor_len = data[5] as usize;
545 if cursor_len > 10 {
549 return Err(RecordError::Invalid(format!(
550 "cursor length {cursor_len} exceeds max version token size (10)"
551 )));
552 }
553 let total = 6 + cursor_len;
554
555 if data.len() < total {
556 return Err(RecordError::Truncated);
557 }
558
559 let computed = crc32fast::hash(&data[4..total]);
560 if computed != stored_crc {
561 return Err(RecordError::CrcMismatch { consumed: total });
562 }
563
564 let version = VersionToken::from_raw(&data[6..total]).ok_or_else(|| {
567 RecordError::Invalid(format!(
568 "cursor length {cursor_len} exceeds max version token size (10)"
569 ))
570 })?;
571
572 Ok((Record::Cursor(WatchCursor::from_version(version)), total))
573}
574
575fn write_put_record(
580 w: &mut impl Write,
581 key: &str,
582 value: &[u8],
583 version: &VersionToken,
584) -> Result<usize, SnapshotError> {
585 let kb = key.as_bytes();
586 let vb = version.as_bytes();
587
588 let key_len = u16::try_from(kb.len()).map_err(|_| {
593 SnapshotError::InvalidFormat(format!(
594 "key too long: {} bytes (max {})",
595 kb.len(),
596 u16::MAX
597 ))
598 })?;
599 let value_len = u32::try_from(value.len()).map_err(|_| {
600 SnapshotError::InvalidFormat(format!(
601 "value too long: {} bytes (max {})",
602 value.len(),
603 u32::MAX
604 ))
605 })?;
606 let ver_len = u8::try_from(vb.len()).map_err(|_| {
612 SnapshotError::InvalidFormat(format!(
613 "version too long: {} bytes (max {})",
614 vb.len(),
615 u8::MAX
616 ))
617 })?;
618
619 let mut h = crc32fast::Hasher::new();
620 h.update(&[REC_PUT]);
621 h.update(&key_len.to_le_bytes());
622 h.update(kb);
623 h.update(&value_len.to_le_bytes());
624 h.update(value);
625 h.update(&[ver_len]);
626 h.update(vb);
627 let crc = h.finalize();
628
629 w.write_all(&crc.to_le_bytes())?;
630 w.write_all(&[REC_PUT])?;
631 w.write_all(&key_len.to_le_bytes())?;
632 w.write_all(kb)?;
633 w.write_all(&value_len.to_le_bytes())?;
634 w.write_all(value)?;
635 w.write_all(&[ver_len])?;
636 w.write_all(vb)?;
637
638 Ok(4 + 1 + 2 + kb.len() + 4 + value.len() + 1 + vb.len())
639}
640
641fn write_delete_record(
642 w: &mut impl Write,
643 key: &str,
644 version: &VersionToken,
645) -> Result<usize, SnapshotError> {
646 let kb = key.as_bytes();
647 let vb = version.as_bytes();
648
649 let key_len = u16::try_from(kb.len()).map_err(|_| {
650 SnapshotError::InvalidFormat(format!(
651 "key too long: {} bytes (max {})",
652 kb.len(),
653 u16::MAX
654 ))
655 })?;
656 let ver_len = u8::try_from(vb.len()).map_err(|_| {
659 SnapshotError::InvalidFormat(format!(
660 "version too long: {} bytes (max {})",
661 vb.len(),
662 u8::MAX
663 ))
664 })?;
665
666 let mut h = crc32fast::Hasher::new();
667 h.update(&[REC_DELETE]);
668 h.update(&key_len.to_le_bytes());
669 h.update(kb);
670 h.update(&[ver_len]);
671 h.update(vb);
672 let crc = h.finalize();
673
674 w.write_all(&crc.to_le_bytes())?;
675 w.write_all(&[REC_DELETE])?;
676 w.write_all(&key_len.to_le_bytes())?;
677 w.write_all(kb)?;
678 w.write_all(&[ver_len])?;
679 w.write_all(vb)?;
680
681 Ok(4 + 1 + 2 + kb.len() + 1 + vb.len())
682}
683
684fn write_cursor_record(w: &mut impl Write, cursor: &WatchCursor) -> Result<usize, SnapshotError> {
685 let cb = cursor.version().as_bytes();
686 let cb_len = u8::try_from(cb.len()).map_err(|_| {
692 SnapshotError::InvalidFormat(format!(
693 "cursor too long: {} bytes (max {})",
694 cb.len(),
695 u8::MAX
696 ))
697 })?;
698
699 let mut h = crc32fast::Hasher::new();
700 h.update(&[REC_CURSOR]);
701 h.update(&[cb_len]);
702 h.update(cb);
703 let crc = h.finalize();
704
705 w.write_all(&crc.to_le_bytes())?;
706 w.write_all(&[REC_CURSOR])?;
707 w.write_all(&[cb_len])?;
708 w.write_all(cb)?;
709
710 Ok(4 + 1 + 1 + cb.len())
711}
712
713fn compact_to_file(
718 path: &Path,
719 entries: &HashMap<String, KvEntry>,
720 cursor: &WatchCursor,
721) -> Result<(), SnapshotError> {
722 let dir = path.parent().ok_or_else(|| {
728 SnapshotError::InvalidFormat(format!("snapshot path has no parent: {}", path.display()))
729 })?;
730 let mut sorted: Vec<&KvEntry> = entries.values().collect();
741 sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
742
743 let estimated: usize = HEADER_LEN
750 + sorted
751 .iter()
752 .map(|e| 4 + 1 + 2 + e.key.len() + 4 + e.value.len() + 1 + e.version.as_bytes().len())
753 .sum::<usize>()
754 + if cursor.is_none() {
755 0
756 } else {
757 4 + 1 + 1 + cursor.version().as_bytes().len()
758 };
759 let capacity = estimated.clamp(8 * 1024, 1024 * 1024);
760 let mut buf = io::BufWriter::with_capacity(capacity, tempfile::NamedTempFile::new_in(dir)?);
761
762 buf.write_all(MAGIC)?;
763 buf.write_all(&FORMAT_VERSION.to_le_bytes())?;
764
765 for entry in sorted {
766 write_put_record(&mut buf, &entry.key, &entry.value, &entry.version)?;
767 }
768
769 if !cursor.is_none() {
770 write_cursor_record(&mut buf, cursor)?;
771 }
772
773 buf.flush()?;
774 let tmp = buf
775 .into_inner()
776 .map_err(|e| SnapshotError::Io(e.into_error()))?;
777
778 tmp.as_file().sync_all()?;
779 tmp.persist(path).map_err(|e| SnapshotError::Io(e.error))?;
780
781 Ok(())
782}
783
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a temporary directory and the path of a snapshot file inside it.
    /// The directory guard must be kept alive for as long as the path is used.
    fn scratch() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");
        (dir, path)
    }

    /// Opens a writer whose compaction threshold can never be exceeded.
    fn open_unbounded(path: &Path) -> SnapshotWriter {
        SnapshotWriter::open(path, u64::MAX).unwrap()
    }

    /// Loads a snapshot that is expected to exist.
    fn reload(path: &Path) -> Snapshot {
        load(path).unwrap().unwrap()
    }

    /// Removes the last `n` bytes of the file to simulate a torn write.
    fn chop_tail(path: &Path, n: usize) {
        let mut data = fs::read(path).unwrap();
        data.truncate(data.len() - n);
        fs::write(path, &data).unwrap();
    }

    fn entry(key: &str, value: &[u8], rev: u64) -> KvEntry {
        KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        }
    }

    fn cursor(rev: u64) -> WatchCursor {
        WatchCursor::from_u64(rev)
    }

    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(entry(key, value, rev))
    }

    fn delete(key: &str, rev: u64) -> KvUpdate {
        KvUpdate::Delete {
            key: key.to_string(),
            version: VersionToken::from_u64(rev),
        }
    }

    #[test]
    fn round_trip() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("node.us-east-1", b"val1", 1)).unwrap();
        w.write_update(&put("node.eu-west-1", b"val2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = reload(&path);
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));

        assert_eq!(snap.entries["node.us-east-1"].value, b"val1");
        assert_eq!(snap.entries["node.eu-west-1"].value, b"val2");
    }

    #[test]
    fn multiple_batches() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = reload(&path);
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    #[test]
    fn delete_removes_entry() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        w.write_update(&delete("a", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = reload(&path);
        assert_eq!(snap.entries.len(), 1);
        assert!(snap.entries.contains_key("b"));
    }

    #[test]
    fn purge_removes_entry() {
        let (_dir, path) = scratch();

        let purge = KvUpdate::Purge {
            key: "a".to_string(),
            version: VersionToken::from_u64(2),
        };

        let mut w = open_unbounded(&path);
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&purge).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = reload(&path);
        assert!(!snap.entries.contains_key("a"));
    }

    #[test]
    fn missing_file_returns_none() {
        let dir = TempDir::new().unwrap();
        assert!(load(&dir.path().join("nope.snap")).unwrap().is_none());
    }

    #[test]
    fn corrupted_mid_file() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        let batches: [(&str, &[u8], u64); 3] = [
            ("a", b"aaaa-long-value-here", 1),
            ("b", b"bbbb-long-value-here", 2),
            ("c", b"cccc-long-value-here", 3),
        ];
        for (key, value, rev) in batches {
            w.write_update(&put(key, value, rev)).unwrap();
            w.checkpoint(&cursor(rev)).unwrap();
        }
        drop(w);

        // Flip a byte well before the end so valid records follow the damage.
        let mut data = fs::read(&path).unwrap();
        let target = HEADER_LEN + 40;
        assert!(
            target < data.len() - 60,
            "need enough room after corruption for valid records"
        );
        data[target] ^= 0xFF;
        fs::write(&path, &data).unwrap();

        match load(&path) {
            Err(SnapshotError::Corrupted) => {}
            other => panic!("expected Corrupted, got {other:?}"),
        }
    }

    #[test]
    fn truncated_final_record_recovered() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        chop_tail(&path, 3);

        let snap = reload(&path);
        assert!(snap.entries.contains_key("a"));
    }

    #[test]
    fn truncated_tail_repaired_then_appendable() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        chop_tail(&path, 3);

        // Loading repairs the torn tail.
        let snap = reload(&path);
        assert!(snap.entries.contains_key("a"));
        assert!(snap.entries.contains_key("b"));

        // Records appended afterwards must be readable.
        let mut w = open_unbounded(&path);
        w.write_update(&put("c", b"v3", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = reload(&path);
        assert_eq!(snap.entries.len(), 3);
        assert!(snap.entries.contains_key("c"));
        assert_eq!(snap.cursor.as_u64(), Some(3));
    }

    #[test]
    fn repeated_cursor_records_trigger_compaction() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("a", b"v1", 1)).unwrap();
        for i in 1..=10u64 {
            w.checkpoint(&cursor(i)).unwrap();
        }
        drop(w);

        let size_before = fs::metadata(&path).unwrap().len();
        let snap = reload(&path);
        let size_after = fs::metadata(&path).unwrap().len();

        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.cursor.as_u64(), Some(10));
        assert!(
            size_after < size_before,
            "stale cursor records should be compacted away: {size_before} -> {size_after}"
        );

        // A second load must leave the compacted file untouched.
        let after_first = fs::read(&path).unwrap();
        reload(&path);
        let after_second = fs::read(&path).unwrap();
        assert_eq!(after_first, after_second);
    }

    #[test]
    fn already_compact_file_reloads_unchanged() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        reload(&path);
        let after_first = fs::read(&path).unwrap();

        let snap = reload(&path);
        let after_second = fs::read(&path).unwrap();

        assert_eq!(after_first, after_second);
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    #[test]
    fn bad_magic() {
        let (_dir, path) = scratch();
        fs::write(&path, b"XXXX\x01\x00").unwrap();

        match load(&path) {
            Err(SnapshotError::InvalidFormat(msg)) => assert!(msg.contains("magic")),
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    #[test]
    fn wrong_version_rejected() {
        let (_dir, path) = scratch();

        let mut data = Vec::new();
        data.extend_from_slice(MAGIC);
        data.extend_from_slice(&(FORMAT_VERSION + 1).to_le_bytes());
        fs::write(&path, &data).unwrap();

        match load(&path) {
            Err(SnapshotError::InvalidFormat(msg)) => {
                assert!(
                    msg.contains("version"),
                    "message should mention version: {msg}"
                )
            }
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    #[test]
    fn empty_log_returns_none() {
        let (_dir, path) = scratch();

        let mut f = File::create(&path).unwrap();
        f.write_all(MAGIC).unwrap();
        f.write_all(&FORMAT_VERSION.to_le_bytes()).unwrap();
        drop(f);

        assert!(load(&path).unwrap().is_none());
    }

    #[test]
    fn compaction_on_load_shrinks_file() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        for i in 0..10u64 {
            w.write_update(&put("same-key", format!("v{i}").as_bytes(), i))
                .unwrap();
            w.checkpoint(&cursor(i)).unwrap();
        }
        drop(w);

        let size_before = fs::metadata(&path).unwrap().len();
        let snap = reload(&path);
        let size_after = fs::metadata(&path).unwrap().len();

        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["same-key"].value, b"v9");
        assert!(
            size_after < size_before,
            "compaction should shrink: {size_before} -> {size_after}"
        );
    }

    #[test]
    fn compact_when_threshold_exceeded() {
        let (_dir, path) = scratch();

        let mut w = SnapshotWriter::open(&path, 100).unwrap();
        for i in 0..20u64 {
            w.write_update(&put("key", format!("value-{i}").as_bytes(), i))
                .unwrap();
            let due = w.checkpoint(&cursor(i)).unwrap();
            if due {
                w.compact().unwrap();
            }
        }
        drop(w);

        let snap = reload(&path);
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["key"].value, b"value-19");
    }

    #[test]
    fn reopen_appends() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("a", b"v1", 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        let mut w = open_unbounded(&path);
        w.write_update(&put("b", b"v2", 2)).unwrap();
        w.checkpoint(&cursor(2)).unwrap();
        drop(w);

        let snap = reload(&path);
        assert_eq!(snap.entries.len(), 2);
        assert_eq!(snap.cursor.as_u64(), Some(2));
    }

    #[test]
    fn large_values() {
        let (_dir, path) = scratch();

        let big = vec![0xABu8; 100_000];
        let mut w = open_unbounded(&path);
        w.write_update(&put("big", &big, 1)).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        let snap = reload(&path);
        assert_eq!(snap.entries.len(), 1);
        assert_eq!(snap.entries["big"].value.len(), 100_000);
        assert!(snap.entries["big"].value.iter().all(|&b| b == 0xAB));
    }

    #[test]
    fn cursor_only_snapshot_returns_some_with_empty_entries() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.checkpoint(&cursor(42)).unwrap();
        drop(w);

        let snap = load(&path)
            .unwrap()
            .expect("cursor-only snapshot should return Some");
        assert!(snap.entries.is_empty(), "no entries written, none expected");
        assert_eq!(
            snap.cursor.as_u64(),
            Some(42),
            "cursor must survive round-trip"
        );
    }

    #[test]
    fn stale_keys_detects_removed_entries() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("node.a", b"v1", 1)).unwrap();
        w.write_update(&put("node.b", b"v2", 2)).unwrap();
        w.write_update(&put("node.c", b"v3", 3)).unwrap();
        w.checkpoint(&cursor(3)).unwrap();
        drop(w);

        let snap = reload(&path);

        // One key missing from the current set.
        let mut stale = snap.stale_keys(["node.a", "node.c"]);
        stale.sort();
        assert_eq!(stale, vec!["node.b"]);

        // Every key still current.
        let stale = snap.stale_keys(["node.a", "node.b", "node.c"]);
        assert!(stale.is_empty());

        // Nothing current: every stored key is stale.
        let mut stale: Vec<&str> = snap.stale_keys(std::iter::empty::<&str>());
        stale.sort();
        assert_eq!(stale, vec!["node.a", "node.b", "node.c"]);
    }

    #[test]
    fn non_u64_version_token_round_trips() {
        let (_dir, path) = scratch();

        let stamp = [9u8, 8, 7, 6, 5, 4, 3, 2, 1, 0];
        let token = VersionToken::from_fdb_versionstamp(&stamp);
        assert!(token.as_u64().is_none(), "10-byte token is not a u64");

        let update = KvUpdate::Put(KvEntry {
            key: "fdb.key".to_string(),
            value: b"v".to_vec(),
            version: token.clone(),
        });

        let mut w = open_unbounded(&path);
        w.write_update(&update).unwrap();
        w.checkpoint(&cursor(1)).unwrap();
        drop(w);

        let snap = reload(&path);
        assert_eq!(
            snap.entries["fdb.key"].version.as_bytes(),
            &stamp,
            "versionstamp must survive the snapshot round-trip byte-for-byte"
        );
    }

    #[test]
    fn compact_preserves_uncheckpointed_writes() {
        let (_dir, path) = scratch();

        let mut w = open_unbounded(&path);
        w.write_update(&put("node.a", b"survives", 1)).unwrap();
        // No checkpoint: the record is still only in the writer's buffer.
        w.compact().unwrap();
        drop(w);

        let snap = reload(&path);
        assert!(
            snap.entries.contains_key("node.a"),
            "compact() must not drop buffered-but-uncheckpointed writes"
        );
        assert_eq!(snap.entries["node.a"].value, b"survives");
    }
}