1use std::collections::{HashMap, HashSet};
42use std::fs::{self, File, OpenOptions};
43use std::io::{self, Write};
44use std::path::{Path, PathBuf};
45
46use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
47
/// Magic bytes found at the very start of every snapshot file.
const MAGIC: &[u8; 4] = b"PGSS";
/// On-disk format revision, stored little-endian directly after the magic.
const FORMAT_VERSION: u16 = 2;
/// Length of the file header: the magic bytes followed by the `u16` format version.
const HEADER_LEN: usize = MAGIC.len() + std::mem::size_of::<u16>();

/// Record type tag (the byte after the record CRC) for a key/value put.
const REC_PUT: u8 = 1;
/// Record type tag for a key removal (written for both deletes and purges).
const REC_DELETE: u8 = 2;
/// Record type tag for a watch-cursor checkpoint.
const REC_CURSOR: u8 = 3;
59
60const MIN_CURSOR_RECORD: usize = 4 + 1 + 1; #[derive(Debug, thiserror::Error)]
69pub enum SnapshotError {
70 #[error("snapshot I/O error: {0}")]
71 Io(#[from] io::Error),
72 #[error("invalid snapshot format: {0}")]
73 InvalidFormat(String),
74 #[error("snapshot corrupted (CRC mismatch)")]
75 Corrupted,
76 #[error("snapshot backend error: {0}")]
81 Backend(String),
82}
83
84#[derive(Debug)]
86pub struct Snapshot {
87 pub cursor: WatchCursor,
89 pub entries: HashMap<String, KvEntry>,
91}
92
93impl Snapshot {
94 pub fn stale_keys<'a, I>(&'a self, current_keys: I) -> Vec<&'a str>
100 where
101 I: IntoIterator<Item = &'a str>,
102 {
103 let current: HashSet<&str> = current_keys.into_iter().collect();
104 self.entries
105 .keys()
106 .filter(|k| !current.contains(k.as_str()))
107 .map(|k| k.as_str())
108 .collect()
109 }
110}
111
/// Appends update and cursor records to a snapshot log and tracks when the log
/// has grown enough to be worth compacting.
pub struct SnapshotWriter {
    /// Location of the log file; `compact()` re-reads and reopens it.
    path: PathBuf,
    /// Buffered append handle. `None` only after a `compact()` that replaced the
    /// file but failed to reopen it, in which case every later call errors.
    writer: Option<io::BufWriter<File>>,
    /// Record bytes counted since the last compaction (at open: file length minus header).
    bytes_since_compact: u64,
    /// `checkpoint()` reports that compaction is due once `bytes_since_compact` exceeds this.
    compact_threshold: u64,
}
133
134impl SnapshotWriter {
135 pub fn open(path: &Path, compact_threshold: u64) -> Result<Self, SnapshotError> {
141 let file = OpenOptions::new().create(true).append(true).open(path)?;
148 let existing_len = file.metadata()?.len();
149
150 let mut writer = io::BufWriter::new(file);
151
152 let bytes_since_compact = if existing_len >= HEADER_LEN as u64 {
157 existing_len - HEADER_LEN as u64
158 } else {
159 writer.write_all(MAGIC)?;
160 writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
161 writer.flush()?;
162 0
163 };
164
165 Ok(Self {
166 path: path.to_path_buf(),
167 writer: Some(writer),
168 bytes_since_compact,
169 compact_threshold,
170 })
171 }
172
173 fn writer(&mut self) -> Result<&mut io::BufWriter<File>, SnapshotError> {
177 self.writer.as_mut().ok_or_else(|| {
178 SnapshotError::Io(io::Error::other(
179 "snapshot writer poisoned: a prior compact() failed to reopen the log for append",
180 ))
181 })
182 }
183
184 #[must_use = "I/O errors mean the write was lost"]
188 pub fn write_update(&mut self, update: &KvUpdate) -> Result<(), SnapshotError> {
189 let w = self.writer()?;
190 let bytes = match update {
191 KvUpdate::Put(entry) => write_put_record(w, &entry.key, &entry.value, &entry.version)?,
192 KvUpdate::Delete { key, version } | KvUpdate::Purge { key, version } => {
193 write_delete_record(w, key, version)?
194 }
195 };
196 self.bytes_since_compact += bytes as u64;
197 Ok(())
198 }
199
200 #[must_use = "returns true when compaction is needed"]
214 pub fn checkpoint(&mut self, cursor: &WatchCursor) -> Result<bool, SnapshotError> {
215 let w = self.writer()?;
216 let bytes = write_cursor_record(w, cursor)?;
217 w.flush()?;
218 self.bytes_since_compact += bytes as u64;
219 Ok(self.bytes_since_compact > self.compact_threshold)
220 }
221
222 #[must_use = "I/O errors mean the flush failed"]
224 pub fn flush(&mut self) -> Result<(), SnapshotError> {
225 self.writer()?.flush()?;
226 Ok(())
227 }
228
229 #[must_use = "compaction errors leave the log uncompacted"]
234 pub fn compact(&mut self) -> Result<(), SnapshotError> {
235 self.writer()?.flush()?;
242 let data = fs::read(&self.path)?;
243 let (entries, cursor, _already_compact) = replay_log(&data)?;
244 compact_to_file(&self.path, &entries, &cursor)?;
245
246 self.writer = None;
252 let file = OpenOptions::new().append(true).open(&self.path)?;
253 self.writer = Some(io::BufWriter::new(file));
254 self.bytes_since_compact = 0;
255
256 Ok(())
257 }
258}
259
260pub fn load(path: &Path) -> Result<Option<Snapshot>, SnapshotError> {
267 let data = match fs::read(path) {
268 Ok(d) => d,
269 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
270 Err(e) => return Err(SnapshotError::Io(e)),
271 };
272
273 let (entries, cursor, already_compact) = replay_log(&data)?;
274
275 if entries.is_empty() && cursor.is_none() {
276 return Ok(None);
277 }
278
279 if !already_compact {
285 compact_to_file(path, &entries, &cursor)?;
286 }
287
288 Ok(Some(Snapshot { cursor, entries }))
289}
290
291pub trait SnapshotStore: Sized + Send {
327 fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError>;
337
338 fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError>;
346
347 fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError>;
349
350 fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError>;
357
358 fn for_each_in_range(
372 &self,
373 prefix: &str,
374 mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
375 ) -> Result<(), SnapshotError> {
376 for entry in self.range(prefix)? {
377 f(entry)?;
378 }
379 Ok(())
380 }
381}
382
/// Compaction threshold (10 MiB of appended record bytes) used when a store is
/// opened through `SnapshotStore::load`.
pub const DEFAULT_COMPACT_THRESHOLD: u64 = 10 << 20;
391
392pub struct AppendLogSnapshot {
402 writer: SnapshotWriter,
403 entries: HashMap<String, KvEntry>,
404 cursor: WatchCursor,
405}
406
407impl AppendLogSnapshot {
408 pub fn open(path: &Path, compact_threshold: u64) -> Result<(WatchCursor, Self), SnapshotError> {
414 let (cursor, entries) = match load(path)? {
415 Some(snap) => (snap.cursor, snap.entries),
416 None => (WatchCursor::none(), HashMap::new()),
417 };
418 let writer = SnapshotWriter::open(path, compact_threshold)?;
419 Ok((
420 cursor.clone(),
421 Self {
422 writer,
423 entries,
424 cursor,
425 },
426 ))
427 }
428}
429
430impl SnapshotStore for AppendLogSnapshot {
431 fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> {
432 Self::open(path, DEFAULT_COMPACT_THRESHOLD)
433 }
434
435 fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> {
436 for update in batch {
441 self.writer.write_update(update)?;
442 match update {
443 KvUpdate::Put(entry) => {
444 self.entries.insert(entry.key.clone(), entry.clone());
445 }
446 KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. } => {
447 self.entries.remove(key);
448 }
449 }
450 }
451 if self.writer.checkpoint(cursor)? {
456 self.writer.compact()?;
457 }
458 self.cursor = cursor.clone();
459 Ok(())
460 }
461
462 fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
463 Ok(self.entries.get(key).cloned())
464 }
465
466 fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
467 let mut out: Vec<KvEntry> = self
468 .entries
469 .values()
470 .filter(|e| e.key.starts_with(prefix))
471 .cloned()
472 .collect();
473 out.sort_unstable_by(|a, b| a.key.cmp(&b.key));
474 Ok(out)
475 }
476}
477
478fn replay_log(data: &[u8]) -> Result<(HashMap<String, KvEntry>, WatchCursor, bool), SnapshotError> {
490 if data.len() < HEADER_LEN {
491 return Err(SnapshotError::InvalidFormat("file too short".into()));
492 }
493 if &data[0..4] != MAGIC {
494 return Err(SnapshotError::InvalidFormat("bad magic".into()));
495 }
496 let version = u16::from_le_bytes([data[4], data[5]]);
497 if version != FORMAT_VERSION {
498 return Err(SnapshotError::InvalidFormat(format!(
499 "unsupported version {version}"
500 )));
501 }
502
503 let estimated = (data.len() - HEADER_LEN) / 30;
506 let mut live: HashMap<&str, (&[u8], VersionToken)> =
507 HashMap::with_capacity(estimated.min(4096));
508 let mut cursor = WatchCursor::none();
509 let mut pos = HEADER_LEN;
510
511 let mut redundant = false;
515 let mut clean_eof = true;
516
517 while pos < data.len() {
518 match parse_record(&data[pos..]) {
519 Ok((record, consumed)) => {
520 match record {
521 Record::Put {
522 key,
523 value,
524 version,
525 } => {
526 if live.insert(key, (value, version)).is_some() {
527 redundant = true;
528 }
529 }
530 Record::Delete { key } => {
531 live.remove(key);
532 redundant = true;
533 }
534 Record::Cursor(c) => {
535 if !cursor.is_none() {
543 redundant = true;
544 }
545 cursor = c;
546 }
547 }
548 pos += consumed;
549 }
550 Err(RecordError::Truncated) => {
551 clean_eof = false;
561 break;
562 }
563 Err(RecordError::CrcMismatch { consumed }) => {
564 if pos + consumed >= data.len() || data.len() - (pos + consumed) < MIN_CURSOR_RECORD
567 {
568 clean_eof = false;
569 break;
570 }
571 return Err(SnapshotError::Corrupted);
572 }
573 Err(RecordError::Invalid(msg)) => {
574 return Err(SnapshotError::InvalidFormat(msg));
575 }
576 }
577 }
578
579 let mut entries: HashMap<String, KvEntry> = HashMap::with_capacity(live.len());
582 for (key, (value, version)) in live {
583 let key = key.to_string();
584 entries.insert(
585 key.clone(),
586 KvEntry {
587 key,
588 value: value.to_vec(),
589 version,
590 },
591 );
592 }
593
594 let already_compact = !redundant && clean_eof;
595 Ok((entries, cursor, already_compact))
596}
597
598enum Record<'a> {
599 Put {
600 key: &'a str,
601 value: &'a [u8],
602 version: VersionToken,
603 },
604 Delete {
605 key: &'a str,
606 },
607 Cursor(WatchCursor),
608}
609
/// Reasons a single record could not be decoded.
///
/// Derives `Debug` so values can be used in `{:?}` formatting and in
/// `unwrap`/`expect`/assertion failure messages.
#[derive(Debug)]
enum RecordError {
    /// The buffer ends before the record does.
    Truncated,
    /// The record is fully present but its checksum does not match;
    /// `consumed` is the record's total encoded length.
    CrcMismatch { consumed: usize },
    /// The record is structurally invalid; the message explains why.
    Invalid(String),
}
615
616fn parse_record(data: &[u8]) -> Result<(Record<'_>, usize), RecordError> {
617 if data.len() < 5 {
619 return Err(RecordError::Truncated);
620 }
621
622 let stored_crc = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
623
624 match data[4] {
625 REC_PUT => parse_put(data, stored_crc),
626 REC_DELETE => parse_delete(data, stored_crc),
627 REC_CURSOR => parse_cursor(data, stored_crc),
628 other => Err(RecordError::Invalid(format!(
629 "unknown record type: {other:#x}"
630 ))),
631 }
632}
633
634fn parse_put(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
635 if data.len() < 7 {
637 return Err(RecordError::Truncated);
638 }
639 let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
640 let vl_off = 7 + key_len;
641
642 if data.len() < vl_off + 4 {
643 return Err(RecordError::Truncated);
644 }
645 let value_len = u32::from_le_bytes([
646 data[vl_off],
647 data[vl_off + 1],
648 data[vl_off + 2],
649 data[vl_off + 3],
650 ]) as usize;
651
652 let ver_len_off = vl_off + 4 + value_len;
654 if data.len() < ver_len_off + 1 {
655 return Err(RecordError::Truncated);
656 }
657 let ver_len = data[ver_len_off] as usize;
658 if ver_len > 10 {
662 return Err(RecordError::Invalid(format!(
663 "version length {ver_len} exceeds max version token size (10)"
664 )));
665 }
666
667 let total = ver_len_off + 1 + ver_len;
668 if data.len() < total {
669 return Err(RecordError::Truncated);
670 }
671
672 let computed = crc32fast::hash(&data[4..total]);
673 if computed != stored_crc {
674 return Err(RecordError::CrcMismatch { consumed: total });
675 }
676
677 let key = std::str::from_utf8(&data[7..7 + key_len])
678 .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
679 let value = &data[vl_off + 4..vl_off + 4 + value_len];
680 let version = VersionToken::from_raw(&data[ver_len_off + 1..total]).ok_or_else(|| {
683 RecordError::Invalid(format!(
684 "version length {ver_len} exceeds max version token size (10)"
685 ))
686 })?;
687
688 Ok((
689 Record::Put {
690 key,
691 value,
692 version,
693 },
694 total,
695 ))
696}
697
698fn parse_delete(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
699 if data.len() < 7 {
700 return Err(RecordError::Truncated);
701 }
702 let key_len = u16::from_le_bytes([data[5], data[6]]) as usize;
703 let ver_len_off = 7 + key_len;
704 if data.len() < ver_len_off + 1 {
705 return Err(RecordError::Truncated);
706 }
707 let ver_len = data[ver_len_off] as usize;
708 if ver_len > 10 {
710 return Err(RecordError::Invalid(format!(
711 "version length {ver_len} exceeds max version token size (10)"
712 )));
713 }
714 let total = ver_len_off + 1 + ver_len;
715
716 if data.len() < total {
717 return Err(RecordError::Truncated);
718 }
719
720 let computed = crc32fast::hash(&data[4..total]);
721 if computed != stored_crc {
722 return Err(RecordError::CrcMismatch { consumed: total });
723 }
724
725 let key = std::str::from_utf8(&data[7..7 + key_len])
726 .map_err(|e| RecordError::Invalid(format!("invalid UTF-8 key: {e}")))?;
727 Ok((Record::Delete { key }, total))
731}
732
733fn parse_cursor(data: &[u8], stored_crc: u32) -> Result<(Record<'_>, usize), RecordError> {
734 if data.len() < 6 {
735 return Err(RecordError::Truncated);
736 }
737 let cursor_len = data[5] as usize;
738 if cursor_len > 10 {
742 return Err(RecordError::Invalid(format!(
743 "cursor length {cursor_len} exceeds max version token size (10)"
744 )));
745 }
746 let total = 6 + cursor_len;
747
748 if data.len() < total {
749 return Err(RecordError::Truncated);
750 }
751
752 let computed = crc32fast::hash(&data[4..total]);
753 if computed != stored_crc {
754 return Err(RecordError::CrcMismatch { consumed: total });
755 }
756
757 let version = VersionToken::from_raw(&data[6..total]).ok_or_else(|| {
760 RecordError::Invalid(format!(
761 "cursor length {cursor_len} exceeds max version token size (10)"
762 ))
763 })?;
764
765 Ok((Record::Cursor(WatchCursor::from_version(version)), total))
766}
767
768fn write_put_record(
773 w: &mut impl Write,
774 key: &str,
775 value: &[u8],
776 version: &VersionToken,
777) -> Result<usize, SnapshotError> {
778 let kb = key.as_bytes();
779 let vb = version.as_bytes();
780
781 let key_len = u16::try_from(kb.len()).map_err(|_| {
786 SnapshotError::InvalidFormat(format!(
787 "key too long: {} bytes (max {})",
788 kb.len(),
789 u16::MAX
790 ))
791 })?;
792 let value_len = u32::try_from(value.len()).map_err(|_| {
793 SnapshotError::InvalidFormat(format!(
794 "value too long: {} bytes (max {})",
795 value.len(),
796 u32::MAX
797 ))
798 })?;
799 let ver_len = u8::try_from(vb.len()).map_err(|_| {
805 SnapshotError::InvalidFormat(format!(
806 "version too long: {} bytes (max {})",
807 vb.len(),
808 u8::MAX
809 ))
810 })?;
811
812 let mut h = crc32fast::Hasher::new();
813 h.update(&[REC_PUT]);
814 h.update(&key_len.to_le_bytes());
815 h.update(kb);
816 h.update(&value_len.to_le_bytes());
817 h.update(value);
818 h.update(&[ver_len]);
819 h.update(vb);
820 let crc = h.finalize();
821
822 w.write_all(&crc.to_le_bytes())?;
823 w.write_all(&[REC_PUT])?;
824 w.write_all(&key_len.to_le_bytes())?;
825 w.write_all(kb)?;
826 w.write_all(&value_len.to_le_bytes())?;
827 w.write_all(value)?;
828 w.write_all(&[ver_len])?;
829 w.write_all(vb)?;
830
831 Ok(4 + 1 + 2 + kb.len() + 4 + value.len() + 1 + vb.len())
832}
833
834fn write_delete_record(
835 w: &mut impl Write,
836 key: &str,
837 version: &VersionToken,
838) -> Result<usize, SnapshotError> {
839 let kb = key.as_bytes();
840 let vb = version.as_bytes();
841
842 let key_len = u16::try_from(kb.len()).map_err(|_| {
843 SnapshotError::InvalidFormat(format!(
844 "key too long: {} bytes (max {})",
845 kb.len(),
846 u16::MAX
847 ))
848 })?;
849 let ver_len = u8::try_from(vb.len()).map_err(|_| {
852 SnapshotError::InvalidFormat(format!(
853 "version too long: {} bytes (max {})",
854 vb.len(),
855 u8::MAX
856 ))
857 })?;
858
859 let mut h = crc32fast::Hasher::new();
860 h.update(&[REC_DELETE]);
861 h.update(&key_len.to_le_bytes());
862 h.update(kb);
863 h.update(&[ver_len]);
864 h.update(vb);
865 let crc = h.finalize();
866
867 w.write_all(&crc.to_le_bytes())?;
868 w.write_all(&[REC_DELETE])?;
869 w.write_all(&key_len.to_le_bytes())?;
870 w.write_all(kb)?;
871 w.write_all(&[ver_len])?;
872 w.write_all(vb)?;
873
874 Ok(4 + 1 + 2 + kb.len() + 1 + vb.len())
875}
876
877fn write_cursor_record(w: &mut impl Write, cursor: &WatchCursor) -> Result<usize, SnapshotError> {
878 let cb = cursor.version().as_bytes();
879 let cb_len = u8::try_from(cb.len()).map_err(|_| {
885 SnapshotError::InvalidFormat(format!(
886 "cursor too long: {} bytes (max {})",
887 cb.len(),
888 u8::MAX
889 ))
890 })?;
891
892 let mut h = crc32fast::Hasher::new();
893 h.update(&[REC_CURSOR]);
894 h.update(&[cb_len]);
895 h.update(cb);
896 let crc = h.finalize();
897
898 w.write_all(&crc.to_le_bytes())?;
899 w.write_all(&[REC_CURSOR])?;
900 w.write_all(&[cb_len])?;
901 w.write_all(cb)?;
902
903 Ok(4 + 1 + 1 + cb.len())
904}
905
906fn compact_to_file(
911 path: &Path,
912 entries: &HashMap<String, KvEntry>,
913 cursor: &WatchCursor,
914) -> Result<(), SnapshotError> {
915 let dir = path.parent().ok_or_else(|| {
921 SnapshotError::InvalidFormat(format!("snapshot path has no parent: {}", path.display()))
922 })?;
923 let mut sorted: Vec<&KvEntry> = entries.values().collect();
934 sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
935
936 let estimated: usize = HEADER_LEN
943 + sorted
944 .iter()
945 .map(|e| 4 + 1 + 2 + e.key.len() + 4 + e.value.len() + 1 + e.version.as_bytes().len())
946 .sum::<usize>()
947 + if cursor.is_none() {
948 0
949 } else {
950 4 + 1 + 1 + cursor.version().as_bytes().len()
951 };
952 let capacity = estimated.clamp(8 * 1024, 1024 * 1024);
953 let mut buf = io::BufWriter::with_capacity(capacity, tempfile::NamedTempFile::new_in(dir)?);
954
955 buf.write_all(MAGIC)?;
956 buf.write_all(&FORMAT_VERSION.to_le_bytes())?;
957
958 for entry in sorted {
959 write_put_record(&mut buf, &entry.key, &entry.value, &entry.version)?;
960 }
961
962 if !cursor.is_none() {
963 write_cursor_record(&mut buf, cursor)?;
964 }
965
966 buf.flush()?;
967 let tmp = buf
968 .into_inner()
969 .map_err(|e| SnapshotError::Io(e.into_error()))?;
970
971 tmp.as_file().sync_all()?;
972 tmp.persist(path).map_err(|e| SnapshotError::Io(e.error))?;
973
974 Ok(())
975}
976
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a scratch directory and the snapshot path inside it. The
    /// returned `TempDir` must be kept alive for as long as the path is used.
    fn scratch() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.snap");
        (dir, path)
    }

    /// Opens a writer whose compaction threshold is never reached.
    fn unbounded_writer(path: &Path) -> SnapshotWriter {
        SnapshotWriter::open(path, u64::MAX).unwrap()
    }

    /// Loads a snapshot that is expected to exist.
    fn must_load(path: &Path) -> Snapshot {
        load(path).unwrap().unwrap()
    }

    /// Builds an entry with a u64-backed version.
    fn entry(key: &str, value: &[u8], rev: u64) -> KvEntry {
        KvEntry {
            key: key.to_string(),
            value: value.to_vec(),
            version: VersionToken::from_u64(rev),
        }
    }

    /// Builds a cursor at revision `rev`.
    fn cursor(rev: u64) -> WatchCursor {
        WatchCursor::from_u64(rev)
    }

    /// Builds a put update.
    fn put(key: &str, value: &[u8], rev: u64) -> KvUpdate {
        KvUpdate::Put(entry(key, value, rev))
    }

    /// Builds a delete update.
    fn delete(key: &str, rev: u64) -> KvUpdate {
        KvUpdate::Delete {
            key: key.to_string(),
            version: VersionToken::from_u64(rev),
        }
    }

    /// Written puts and the checkpointed cursor come back on load.
    #[test]
    fn round_trip() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer
            .write_update(&put("node.us-east-1", b"val1", 1))
            .unwrap();
        writer
            .write_update(&put("node.eu-west-1", b"val2", 2))
            .unwrap();
        writer.checkpoint(&cursor(2)).unwrap();
        drop(writer);

        let loaded = must_load(&path);
        assert_eq!(loaded.entries.len(), 2);
        assert_eq!(loaded.cursor.as_u64(), Some(2));

        assert_eq!(loaded.entries["node.us-east-1"].value, b"val1");
        assert_eq!(loaded.entries["node.eu-west-1"].value, b"val2");
    }

    /// Several checkpointed batches accumulate; the last cursor wins.
    #[test]
    fn multiple_batches() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("a", b"v1", 1)).unwrap();
        writer.checkpoint(&cursor(1)).unwrap();
        writer.write_update(&put("b", b"v2", 2)).unwrap();
        writer.checkpoint(&cursor(2)).unwrap();
        drop(writer);

        let loaded = must_load(&path);
        assert_eq!(loaded.entries.len(), 2);
        assert_eq!(loaded.cursor.as_u64(), Some(2));
    }

    /// A delete record removes the key on replay.
    #[test]
    fn delete_removes_entry() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("a", b"v1", 1)).unwrap();
        writer.write_update(&put("b", b"v2", 2)).unwrap();
        writer.checkpoint(&cursor(2)).unwrap();
        writer.write_update(&delete("a", 3)).unwrap();
        writer.checkpoint(&cursor(3)).unwrap();
        drop(writer);

        let loaded = must_load(&path);
        assert_eq!(loaded.entries.len(), 1);
        assert!(loaded.entries.contains_key("b"));
    }

    /// A purge is replayed exactly like a delete.
    #[test]
    fn purge_removes_entry() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("a", b"v1", 1)).unwrap();
        writer.checkpoint(&cursor(1)).unwrap();
        let purge = KvUpdate::Purge {
            key: "a".to_string(),
            version: VersionToken::from_u64(2),
        };
        writer.write_update(&purge).unwrap();
        writer.checkpoint(&cursor(2)).unwrap();
        drop(writer);

        let loaded = must_load(&path);
        assert!(!loaded.entries.contains_key("a"));
    }

    /// Loading a path that does not exist yields `None`, not an error.
    #[test]
    fn missing_file_returns_none() {
        let dir = TempDir::new().unwrap();
        let absent = dir.path().join("nope.snap");
        assert!(load(&absent).unwrap().is_none());
    }

    /// A flipped byte well before the end of the log is reported as corruption.
    #[test]
    fn corrupted_mid_file() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer
            .write_update(&put("a", b"aaaa-long-value-here", 1))
            .unwrap();
        writer.checkpoint(&cursor(1)).unwrap();
        writer
            .write_update(&put("b", b"bbbb-long-value-here", 2))
            .unwrap();
        writer.checkpoint(&cursor(2)).unwrap();
        writer
            .write_update(&put("c", b"cccc-long-value-here", 3))
            .unwrap();
        writer.checkpoint(&cursor(3)).unwrap();
        drop(writer);

        let mut bytes = fs::read(&path).unwrap();
        let target = HEADER_LEN + 40;
        assert!(
            target < bytes.len() - 60,
            "need enough room after corruption for valid records"
        );
        bytes[target] ^= 0xFF;
        fs::write(&path, &bytes).unwrap();

        match load(&path) {
            Err(SnapshotError::Corrupted) => {}
            other => panic!("expected Corrupted, got {other:?}"),
        }
    }

    /// A record cut short at the end of the file does not prevent loading.
    #[test]
    fn truncated_final_record_recovered() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("a", b"v1", 1)).unwrap();
        writer.checkpoint(&cursor(1)).unwrap();
        writer.write_update(&put("b", b"v2", 2)).unwrap();
        writer.checkpoint(&cursor(2)).unwrap();
        drop(writer);

        let mut bytes = fs::read(&path).unwrap();
        bytes.truncate(bytes.len() - 3);
        fs::write(&path, &bytes).unwrap();

        let loaded = must_load(&path);
        assert!(loaded.entries.contains_key("a"));
    }

    /// After a torn tail has been loaded, new records can be appended and read back.
    #[test]
    fn truncated_tail_repaired_then_appendable() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("a", b"v1", 1)).unwrap();
        writer.write_update(&put("b", b"v2", 2)).unwrap();
        writer.checkpoint(&cursor(2)).unwrap();
        drop(writer);

        let mut bytes = fs::read(&path).unwrap();
        bytes.truncate(bytes.len() - 3);
        fs::write(&path, &bytes).unwrap();

        let repaired = must_load(&path);
        assert!(repaired.entries.contains_key("a"));
        assert!(repaired.entries.contains_key("b"));

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("c", b"v3", 3)).unwrap();
        writer.checkpoint(&cursor(3)).unwrap();
        drop(writer);

        let reloaded = must_load(&path);
        assert_eq!(reloaded.entries.len(), 3);
        assert!(reloaded.entries.contains_key("c"));
        assert_eq!(reloaded.cursor.as_u64(), Some(3));
    }

    /// Superseded cursor records alone are enough to trigger compaction on
    /// load, and a second load leaves the compacted file untouched.
    #[test]
    fn repeated_cursor_records_trigger_compaction() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("a", b"v1", 1)).unwrap();
        for rev in 1..=10u64 {
            writer.checkpoint(&cursor(rev)).unwrap();
        }
        drop(writer);

        let size_before = fs::metadata(&path).unwrap().len();
        let loaded = must_load(&path);
        let size_after = fs::metadata(&path).unwrap().len();

        assert_eq!(loaded.entries.len(), 1);
        assert_eq!(loaded.cursor.as_u64(), Some(10));
        assert!(
            size_after < size_before,
            "stale cursor records should be compacted away: {size_before} -> {size_after}"
        );

        let after_first = fs::read(&path).unwrap();
        must_load(&path);
        let after_second = fs::read(&path).unwrap();
        assert_eq!(after_first, after_second);
    }

    /// Loading an already-compact file does not change its bytes.
    #[test]
    fn already_compact_file_reloads_unchanged() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("a", b"v1", 1)).unwrap();
        writer.write_update(&put("b", b"v2", 2)).unwrap();
        writer.checkpoint(&cursor(2)).unwrap();
        drop(writer);

        must_load(&path);
        let after_first = fs::read(&path).unwrap();

        let loaded = must_load(&path);
        let after_second = fs::read(&path).unwrap();

        assert_eq!(after_first, after_second);
        assert_eq!(loaded.entries.len(), 2);
        assert_eq!(loaded.cursor.as_u64(), Some(2));
    }

    /// A file that does not start with the magic bytes is rejected.
    #[test]
    fn bad_magic() {
        let (_dir, path) = scratch();
        fs::write(&path, b"XXXX\x01\x00").unwrap();

        match load(&path) {
            Err(SnapshotError::InvalidFormat(msg)) => assert!(msg.contains("magic")),
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    /// A header carrying an unknown format version is rejected.
    #[test]
    fn wrong_version_rejected() {
        let (_dir, path) = scratch();
        let header = [MAGIC.as_slice(), &(FORMAT_VERSION + 1).to_le_bytes()].concat();
        fs::write(&path, &header).unwrap();

        match load(&path) {
            Err(SnapshotError::InvalidFormat(msg)) => {
                assert!(
                    msg.contains("version"),
                    "message should mention version: {msg}"
                )
            }
            other => panic!("expected InvalidFormat, got {other:?}"),
        }
    }

    /// A file holding only a valid header loads as `None`.
    #[test]
    fn empty_log_returns_none() {
        let (_dir, path) = scratch();

        let mut file = File::create(&path).unwrap();
        file.write_all(MAGIC).unwrap();
        file.write_all(&FORMAT_VERSION.to_le_bytes()).unwrap();
        drop(file);

        assert!(load(&path).unwrap().is_none());
    }

    /// Overwrites of a single key are collapsed when the log is loaded.
    #[test]
    fn compaction_on_load_shrinks_file() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        for rev in 0..10u64 {
            let value = format!("v{rev}");
            writer
                .write_update(&put("same-key", value.as_bytes(), rev))
                .unwrap();
            writer.checkpoint(&cursor(rev)).unwrap();
        }
        drop(writer);

        let size_before = fs::metadata(&path).unwrap().len();
        let loaded = must_load(&path);
        let size_after = fs::metadata(&path).unwrap().len();

        assert_eq!(loaded.entries.len(), 1);
        assert_eq!(loaded.entries["same-key"].value, b"v9");
        assert!(
            size_after < size_before,
            "compaction should shrink: {size_before} -> {size_after}"
        );
    }

    /// Compacting whenever `checkpoint` asks for it keeps the latest value.
    #[test]
    fn compact_when_threshold_exceeded() {
        let (_dir, path) = scratch();

        let mut writer = SnapshotWriter::open(&path, 100).unwrap();
        for rev in 0..20u64 {
            let value = format!("value-{rev}");
            writer
                .write_update(&put("key", value.as_bytes(), rev))
                .unwrap();
            if writer.checkpoint(&cursor(rev)).unwrap() {
                writer.compact().unwrap();
            }
        }
        drop(writer);

        let loaded = must_load(&path);
        assert_eq!(loaded.entries.len(), 1);
        assert_eq!(loaded.entries["key"].value, b"value-19");
    }

    /// Reopening an existing log appends to it instead of starting over.
    #[test]
    fn reopen_appends() {
        let (_dir, path) = scratch();

        let mut first = unbounded_writer(&path);
        first.write_update(&put("a", b"v1", 1)).unwrap();
        first.checkpoint(&cursor(1)).unwrap();
        drop(first);

        let mut second = unbounded_writer(&path);
        second.write_update(&put("b", b"v2", 2)).unwrap();
        second.checkpoint(&cursor(2)).unwrap();
        drop(second);

        let loaded = must_load(&path);
        assert_eq!(loaded.entries.len(), 2);
        assert_eq!(loaded.cursor.as_u64(), Some(2));
    }

    /// A value far larger than the default write buffer round-trips intact.
    #[test]
    fn large_values() {
        let (_dir, path) = scratch();

        let big = vec![0xABu8; 100_000];
        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("big", &big, 1)).unwrap();
        writer.checkpoint(&cursor(1)).unwrap();
        drop(writer);

        let loaded = must_load(&path);
        assert_eq!(loaded.entries.len(), 1);
        assert_eq!(loaded.entries["big"].value.len(), 100_000);
        assert!(loaded.entries["big"].value.iter().all(|&b| b == 0xAB));
    }

    /// A log holding only a cursor still loads as `Some`.
    #[test]
    fn cursor_only_snapshot_returns_some_with_empty_entries() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.checkpoint(&cursor(42)).unwrap();
        drop(writer);

        let loaded = load(&path)
            .unwrap()
            .expect("cursor-only snapshot should return Some");
        assert!(
            loaded.entries.is_empty(),
            "no entries written, none expected"
        );
        assert_eq!(
            loaded.cursor.as_u64(),
            Some(42),
            "cursor must survive round-trip"
        );
    }

    /// `stale_keys` reports exactly the snapshot keys missing from the current set.
    #[test]
    fn stale_keys_detects_removed_entries() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("node.a", b"v1", 1)).unwrap();
        writer.write_update(&put("node.b", b"v2", 2)).unwrap();
        writer.write_update(&put("node.c", b"v3", 3)).unwrap();
        writer.checkpoint(&cursor(3)).unwrap();
        drop(writer);

        let loaded = must_load(&path);

        let mut stale = loaded.stale_keys(["node.a", "node.c"]);
        stale.sort();
        assert_eq!(stale, vec!["node.b"]);

        let stale = loaded.stale_keys(["node.a", "node.b", "node.c"]);
        assert!(stale.is_empty());

        let mut stale: Vec<&str> = loaded.stale_keys(std::iter::empty::<&str>());
        stale.sort();
        assert_eq!(stale, vec!["node.a", "node.b", "node.c"]);
    }

    /// A 10-byte version token that is not a u64 survives byte-for-byte.
    #[test]
    fn non_u64_version_token_round_trips() {
        let (_dir, path) = scratch();

        let stamp = [9u8, 8, 7, 6, 5, 4, 3, 2, 1, 0];
        let token = VersionToken::from_fdb_versionstamp(&stamp);
        assert!(token.as_u64().is_none(), "10-byte token is not a u64");

        let mut writer = unbounded_writer(&path);
        let update = KvUpdate::Put(KvEntry {
            key: "fdb.key".to_string(),
            value: b"v".to_vec(),
            version: token.clone(),
        });
        writer.write_update(&update).unwrap();
        writer.checkpoint(&cursor(1)).unwrap();
        drop(writer);

        let loaded = must_load(&path);
        assert_eq!(
            loaded.entries["fdb.key"].version.as_bytes(),
            &stamp,
            "versionstamp must survive the snapshot round-trip byte-for-byte"
        );
    }

    /// `compact()` flushes buffered writes before rewriting, so they are kept.
    #[test]
    fn compact_preserves_uncheckpointed_writes() {
        let (_dir, path) = scratch();

        let mut writer = unbounded_writer(&path);
        writer.write_update(&put("node.a", b"survives", 1)).unwrap();
        writer.compact().unwrap();
        drop(writer);

        let loaded = must_load(&path);
        assert!(
            loaded.entries.contains_key("node.a"),
            "compact() must not drop buffered-but-uncheckpointed writes"
        );
        assert_eq!(loaded.entries["node.a"].value, b"survives");
    }
}