#![allow(clippy::result_large_err)]
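//! On-disk snapshot repositories.
//!
//! A [`SnapshotRepository`] stores snapshots of a single database replica,
//! one directory per snapshotted transaction offset. Each snapshot holds a
//! root file, the BSATN-encoded [`Snapshot`] prefixed by its blake3 hash,
//! alongside a content-addressed object store (a [`DirTrie`]) of pages and
//! large blobs. Objects unchanged since the previous snapshot are hardlinked
//! rather than rewritten, and older snapshots can be zstd-compressed in place.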

use spacetimedb_durability::TxOffset;
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressCount, CompressReader, CompressType};
use spacetimedb_fs_utils::{
    dir_trie::{o_excl, o_rdonly, CountCreated, DirTrie},
    lockfile::{Lockfile, LockfileError},
};
use spacetimedb_lib::Identity;
use spacetimedb_paths::server::{SnapshotDirPath, SnapshotFilePath, SnapshotsPath};
use spacetimedb_paths::FromPathUnchecked;
use spacetimedb_primitives::TableId;
use spacetimedb_sats::{bsatn, de::Deserialize, ser::Serialize};
use spacetimedb_table::{
    blob_store::{BlobHash, BlobStore, HashMapBlobStore},
    page::Page,
    page_pool::PagePool,
    table::Table,
};
use std::{
    collections::{BTreeMap, HashMap},
    ffi::OsStr,
    fmt,
    io::{BufWriter, Read, Write},
    ops::{Add, AddAssign},
    path::PathBuf,
};
use tokio::task::spawn_blocking;

pub mod remote;
use remote::verify_snapshot;

#[derive(Debug, Copy, Clone)]
pub enum ObjectType {
    Blob(BlobHash),
    Page(blake3::Hash),
    Snapshot,
}

impl std::fmt::Display for ObjectType {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match *self {
            ObjectType::Blob(hash) => write!(f, "blob {hash:x?}"),
            ObjectType::Page(hash) => write!(f, "page {hash:x?}"),
            ObjectType::Snapshot => write!(f, "snapshot"),
        }
    }
}

#[derive(thiserror::Error, Debug)]
pub enum SnapshotError {
    #[error("Cannot open SnapshotRepo {0}: not an accessible directory")]
    Open(PathBuf),
    #[error("Failed to write {ty} to {dest_repo:?}, attempting to hardlink from {source_repo:?}: {cause}")]
    WriteObject {
        ty: ObjectType,
        dest_repo: PathBuf,
        source_repo: Option<PathBuf>,
        #[source]
        cause: std::io::Error,
    },
    #[error("Failed to read {ty} from {source_repo:?}: {cause}")]
    ReadObject {
        ty: ObjectType,
        source_repo: PathBuf,
        #[source]
        cause: std::io::Error,
    },
    #[error("Encountered corrupted {ty} in {source_repo:?}: expected hash {expected:x?}, but computed {computed:x?}")]
    HashMismatch {
        ty: ObjectType,
        expected: [u8; 32],
        computed: [u8; 32],
        source_repo: PathBuf,
    },
    #[error("Failed to BSATN serialize {ty}: {cause}")]
    Serialize {
        ty: ObjectType,
        #[source]
        cause: bsatn::ser::BsatnError,
    },
    #[error("Failed to BSATN deserialize {ty} from {source_repo:?}: {cause}")]
    Deserialize {
        ty: ObjectType,
        source_repo: PathBuf,
        cause: bsatn::DecodeError,
    },
    #[error("Refusing to reconstruct incomplete snapshot {tx_offset}: lockfile {lockfile:?} exists")]
    Incomplete { tx_offset: TxOffset, lockfile: PathBuf },
    #[error("Refusing to reconstruct snapshot {tx_offset} with bad magic number {magic:x?}")]
    BadMagic { tx_offset: TxOffset, magic: [u8; 4] },
    #[error("Refusing to reconstruct snapshot {tx_offset} with unsupported version {version}")]
    BadVersion { tx_offset: TxOffset, version: u8 },
    #[error("Cannot open snapshot repository in non-directory {root:?}")]
    NotDirectory { root: SnapshotsPath },
    #[error(transparent)]
    Lockfile(#[from] LockfileError),
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

pub const MAGIC: [u8; 4] = *b"txyz";

pub const CURRENT_SNAPSHOT_VERSION: u8 = 0;

pub const CURRENT_MODULE_ABI_VERSION: [u16; 2] = [7, 0];

pub const SNAPSHOT_DIR_EXT: &str = "snapshot_dir";

pub const SNAPSHOT_FILE_EXT: &str = "snapshot_bsatn";

pub const INVALID_SNAPSHOT_DIR_EXT: &str = "invalid_snapshot";
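
// Snapshot root file layout (see `Snapshot::read_from_file` and
// `SnapshotRepository::create_snapshot`):
//
//   [ blake3 hash of payload : 32 bytes ][ payload : BSATN-encoded `Snapshot` ]
//
// The whole file may later be rewritten as a zstd stream by
// `SnapshotRepository::compress_snapshot`; `CompressReader` transparently
// detects which encoding is in use.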

#[derive(Clone, Serialize, Deserialize)]
struct BlobEntry {
    hash: BlobHash,
    uses: u32,
}

#[derive(Clone, Serialize, Deserialize)]
struct TableEntry {
    table_id: TableId,
    pages: Vec<blake3::Hash>,
}

#[derive(Clone, Serialize, Deserialize)]
pub struct Snapshot {
    magic: [u8; 4],

    version: u8,

    pub database_identity: Identity,
    pub replica_id: u64,

    module_abi_version: [u16; 2],

    pub tx_offset: TxOffset,

    blobs: Vec<BlobEntry>,

    tables: Vec<TableEntry>,
}

impl Snapshot {
    fn write_blob(
        &mut self,
        object_repo: &DirTrie,
        hash: &BlobHash,
        uses: usize,
        blob: &[u8],
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<(), SnapshotError> {
        object_repo
            .hardlink_or_write(prev_snapshot, &hash.data, || blob, counter)
            .map_err(|cause| SnapshotError::WriteObject {
                ty: ObjectType::Blob(*hash),
                dest_repo: object_repo.root().to_path_buf(),
                source_repo: prev_snapshot.map(|source_repo| source_repo.root().to_path_buf()),
                cause,
            })?;
        self.blobs.push(BlobEntry {
            hash: *hash,
            uses: uses as u32,
        });
        Ok(())
    }

    fn write_all_blobs(
        &mut self,
        object_repo: &DirTrie,
        blobs: &dyn BlobStore,
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<(), SnapshotError> {
        for (hash, uses, blob) in blobs.iter_blobs() {
            self.write_blob(object_repo, hash, uses, blob, prev_snapshot, counter)?;
        }
        Ok(())
    }

    fn write_page(
        object_repo: &DirTrie,
        page: &Page,
        hash: blake3::Hash,
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<blake3::Hash, SnapshotError> {
        debug_assert!(page.unmodified_hash().copied() == Some(hash));

        object_repo
            .hardlink_or_write(prev_snapshot, hash.as_bytes(), || bsatn::to_vec(page).unwrap(), counter)
            .map_err(|cause| SnapshotError::WriteObject {
                ty: ObjectType::Page(hash),
                dest_repo: object_repo.root().to_path_buf(),
                source_repo: prev_snapshot.map(|source_repo| source_repo.root().to_path_buf()),
                cause,
            })?;

        Ok(hash)
    }

    fn write_table(
        &mut self,
        object_repo: &DirTrie,
        table: &mut Table,
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<(), SnapshotError> {
        let pages = table
            .iter_pages_with_hashes()
            .map(|(hash, page)| Self::write_page(object_repo, page, hash, prev_snapshot, counter))
            .collect::<Result<Vec<blake3::Hash>, SnapshotError>>()?;

        self.tables.push(TableEntry {
            table_id: table.schema.table_id,
            pages,
        });
        Ok(())
    }

    fn write_all_tables<'db>(
        &mut self,
        object_repo: &DirTrie,
        tables: impl Iterator<Item = &'db mut Table>,
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<(), SnapshotError> {
        for table in tables {
            self.write_table(object_repo, table, prev_snapshot, counter)?;
        }
        Ok(())
    }
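
    /// Read a [`Snapshot`] from the root file at `path`, transparently
    /// handling zstd compression, and verify the leading blake3 hash against
    /// the BSATN payload. Returns the snapshot together with the
    /// [`CompressType`] that was found.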
    pub fn read_from_file(path: &SnapshotFilePath) -> Result<(Self, CompressType), SnapshotError> {
        let err_read_object = |cause| SnapshotError::ReadObject {
            ty: ObjectType::Snapshot,
            source_repo: path.0.clone(),
            cause,
        };
        let snapshot_file = path.open_file(&o_rdonly()).map_err(err_read_object)?;
        let mut snapshot_file = CompressReader::new(snapshot_file)?;

        let mut hash = [0; blake3::OUT_LEN];
        snapshot_file.read_exact(&mut hash).map_err(err_read_object)?;
        let hash = blake3::Hash::from_bytes(hash);

        let mut snapshot_bsatn = vec![];
        snapshot_file
            .read_to_end(&mut snapshot_bsatn)
            .map_err(err_read_object)?;
        let computed_hash = blake3::hash(&snapshot_bsatn);

        if hash != computed_hash {
            return Err(SnapshotError::HashMismatch {
                ty: ObjectType::Snapshot,
                expected: *hash.as_bytes(),
                computed: *computed_hash.as_bytes(),
                source_repo: path.0.clone(),
            });
        }

        let snapshot = bsatn::from_slice::<Snapshot>(&snapshot_bsatn).map_err(|cause| SnapshotError::Deserialize {
            ty: ObjectType::Snapshot,
            source_repo: path.0.clone(),
            cause,
        })?;

        Ok((snapshot, snapshot_file.compress_type()))
    }
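
    /// Rebuild the in-memory blob store of this snapshot from `object_repo`,
    /// verifying each blob against its recorded [`BlobHash`].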
    fn reconstruct_blob_store(&self, object_repo: &DirTrie) -> Result<HashMapBlobStore, SnapshotError> {
        let mut blob_store = HashMapBlobStore::default();

        for BlobEntry { hash, uses } in &self.blobs {
            let buf = object_repo
                .read_entry(&hash.data)
                .map_err(|cause| SnapshotError::ReadObject {
                    ty: ObjectType::Blob(*hash),
                    source_repo: object_repo.root().to_path_buf(),
                    cause,
                })?;

            let computed_hash = BlobHash::hash_from_bytes(&buf);

            if *hash != computed_hash {
                return Err(SnapshotError::HashMismatch {
                    ty: ObjectType::Blob(*hash),
                    expected: hash.data,
                    computed: computed_hash.data,
                    source_repo: object_repo.root().to_path_buf(),
                });
            }

            blob_store.insert_with_uses(hash, *uses as usize, buf.into_boxed_slice());
        }

        Ok(blob_store)
    }

    fn reconstruct_one_table_pages(
        object_repo: &DirTrie,
        pages: &[blake3::Hash],
        page_pool: &PagePool,
    ) -> Result<Vec<Box<Page>>, SnapshotError> {
        pages
            .iter()
            .map(|hash| {
                let buf = object_repo
                    .read_entry(hash.as_bytes())
                    .map_err(|cause| SnapshotError::ReadObject {
                        ty: ObjectType::Page(*hash),
                        source_repo: object_repo.root().to_path_buf(),
                        cause,
                    })?;

                let page = page_pool.take_deserialize_from(&buf);
                let page = page.map_err(|cause| SnapshotError::Deserialize {
                    ty: ObjectType::Page(*hash),
                    source_repo: object_repo.root().to_path_buf(),
                    cause,
                })?;

                let computed_hash = page.content_hash();

                if *hash != computed_hash {
                    return Err(SnapshotError::HashMismatch {
                        ty: ObjectType::Page(*hash),
                        expected: *hash.as_bytes(),
                        computed: *computed_hash.as_bytes(),
                        source_repo: object_repo.root().to_path_buf(),
                    });
                }

                Ok::<Box<Page>, SnapshotError>(page)
            })
            .collect()
    }

    fn reconstruct_one_table(
        object_repo: &DirTrie,
        TableEntry { table_id, pages }: &TableEntry,
        page_pool: &PagePool,
    ) -> Result<(TableId, Vec<Box<Page>>), SnapshotError> {
        Ok((
            *table_id,
            Self::reconstruct_one_table_pages(object_repo, pages, page_pool)?,
        ))
    }

    fn reconstruct_tables(
        &self,
        object_repo: &DirTrie,
        page_pool: &PagePool,
    ) -> Result<BTreeMap<TableId, Vec<Box<Page>>>, SnapshotError> {
        self.tables
            .iter()
            .map(|tbl| Self::reconstruct_one_table(object_repo, tbl, page_pool))
            .collect()
    }

    pub fn total_objects(&self) -> usize {
        self.blobs.len() + self.tables.iter().map(|table| table.pages.len()).sum::<usize>()
    }

    pub fn objects(&self) -> impl Iterator<Item = blake3::Hash> + '_ {
        self.blobs
            .iter()
            .map(|b| blake3::Hash::from_bytes(b.hash.data))
            .chain(self.tables.iter().flat_map(|t| t.pages.iter().copied()))
    }

    pub fn files<'a>(&'a self, src_repo: &'a DirTrie) -> impl Iterator<Item = (blake3::Hash, PathBuf)> + 'a {
        self.objects().map(move |hash| {
            let path = src_repo.file_path(hash.as_bytes());
            (hash, path)
        })
    }
}

#[derive(Clone, Default)]
pub struct SnapshotSize {
    pub snapshot: CompressCount,
    pub file_size: u64,
    pub object_size: u64,
    pub object_count: u64,
    pub total_size: u64,
}

impl Add for SnapshotSize {
    type Output = Self;

    fn add(self, rhs: Self) -> Self::Output {
        Self {
            snapshot: CompressCount {
                none: self.snapshot.none + rhs.snapshot.none,
                zstd: self.snapshot.zstd + rhs.snapshot.zstd,
            },
            file_size: self.file_size + rhs.file_size,
            object_size: self.object_size + rhs.object_size,
            object_count: self.object_count + rhs.object_count,
            total_size: self.total_size + rhs.total_size,
        }
    }
}

impl AddAssign for SnapshotSize {
    fn add_assign(&mut self, rhs: Self) {
        *self = self.clone() + rhs;
    }
}

impl fmt::Debug for SnapshotSize {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SnapshotSize")
            .field("snapshot ", &self.snapshot)
            .field("object_count ", &self.object_count)
            .field("file_size ", &format_args!("{:>8} bytes", self.file_size))
            .field("object_size ", &format_args!("{:>8} bytes", self.object_size))
            .field("total_size ", &format_args!("{:>8} bytes", self.total_size))
            .finish()
    }
}

#[derive(Clone)]
pub struct SnapshotRepository {
    root: SnapshotsPath,

    database_identity: Identity,

    replica_id: u64,
}

impl SnapshotRepository {
    pub fn database_identity(&self) -> Identity {
        self.database_identity
    }
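
    /// Capture a snapshot of `tables` and `blobs` at `tx_offset`, hardlinking
    /// any objects already present in the previous snapshot's object repo.
    ///
    /// A minimal usage sketch (illustrative only; assumes `repo` is an open
    /// [`SnapshotRepository`] for a database with no state yet):
    ///
    /// ```ignore
    /// use spacetimedb_table::{blob_store::HashMapBlobStore, table::Table};
    ///
    /// let blobs = HashMapBlobStore::default();
    /// // Even an empty state yields a valid snapshot root file.
    /// let dir = repo.create_snapshot(std::iter::empty::<&mut Table>(), &blobs, 0)?;
    /// println!("snapshot written to {dir:?}");
    /// ```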
    pub fn create_snapshot<'db>(
        &self,
        tables: impl Iterator<Item = &'db mut Table>,
        blobs: &'db dyn BlobStore,
        tx_offset: TxOffset,
    ) -> Result<SnapshotDirPath, SnapshotError> {
        self.invalidate_newer_snapshots(tx_offset.saturating_sub(1))?;

        let prev_snapshot = self.latest_snapshot()?.map(|offset| self.snapshot_dir_path(offset));

        let prev_snapshot = if let Some(prev_snapshot) = prev_snapshot {
            assert!(
                prev_snapshot.0.is_dir(),
                "prev_snapshot {prev_snapshot:?} is not a directory"
            );
            let object_repo = Self::object_repo(&prev_snapshot)?;
            Some(object_repo)
        } else {
            None
        };

        let mut counter = CountCreated::default();

        let snapshot_dir = self.snapshot_dir_path(tx_offset);

        let _lock = Lockfile::for_file(&snapshot_dir)?;

        snapshot_dir.create()?;

        let object_repo = Self::object_repo(&snapshot_dir)?;

        let mut snapshot = self.empty_snapshot(tx_offset);

        snapshot.write_all_blobs(&object_repo, blobs, prev_snapshot.as_ref(), &mut counter)?;
        snapshot.write_all_tables(&object_repo, tables, prev_snapshot.as_ref(), &mut counter)?;

        let snapshot_bsatn = bsatn::to_vec(&snapshot).map_err(|cause| SnapshotError::Serialize {
            ty: ObjectType::Snapshot,
            cause,
        })?;
        let hash = blake3::hash(&snapshot_bsatn);

        {
            let mut snapshot_file = BufWriter::new(snapshot_dir.snapshot_file(tx_offset).open_file(&o_excl())?);
            snapshot_file.write_all(hash.as_bytes())?;
            snapshot_file.write_all(&snapshot_bsatn)?;
            snapshot_file.flush()?;
        }

        log::info!(
            "[{}] SNAPSHOT {:0>20}: Hardlinked {} objects and wrote {} objects",
            self.database_identity,
            tx_offset,
            counter.objects_hardlinked,
            counter.objects_written,
        );
        Ok(snapshot_dir)
    }

    fn empty_snapshot(&self, tx_offset: TxOffset) -> Snapshot {
        Snapshot {
            magic: MAGIC,
            version: CURRENT_SNAPSHOT_VERSION,
            database_identity: self.database_identity,
            replica_id: self.replica_id,
            module_abi_version: CURRENT_MODULE_ABI_VERSION,
            tx_offset,
            blobs: vec![],
            tables: vec![],
        }
    }

    pub fn snapshot_dir_path(&self, tx_offset: TxOffset) -> SnapshotDirPath {
        self.root.snapshot_dir(tx_offset)
    }

    pub fn object_repo(snapshot_dir: &SnapshotDirPath) -> Result<DirTrie, std::io::Error> {
        DirTrie::open(snapshot_dir.objects().0)
    }
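
    /// Read the snapshot at `tx_offset` back into memory, verifying the hash
    /// of every blob and page, with pages allocated from `page_pool`.
    ///
    /// Sketch (illustrative; assumes `repo` and a [`PagePool`] are in scope):
    ///
    /// ```ignore
    /// let snap = repo.read_snapshot(tx_offset, &page_pool)?;
    /// assert_eq!(snap.tx_offset, tx_offset);
    /// ```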
    pub fn read_snapshot(
        &self,
        tx_offset: TxOffset,
        page_pool: &PagePool,
    ) -> Result<ReconstructedSnapshot, SnapshotError> {
        let snapshot_dir = self.snapshot_dir_path(tx_offset);
        let lockfile = Lockfile::lock_path(&snapshot_dir);
        if lockfile.try_exists()? {
            return Err(SnapshotError::Incomplete { tx_offset, lockfile });
        }

        let snapshot_file_path = snapshot_dir.snapshot_file(tx_offset);
        let (snapshot, compress_type) = Snapshot::read_from_file(&snapshot_file_path)?;

        if snapshot.magic != MAGIC {
            return Err(SnapshotError::BadMagic {
                tx_offset,
                magic: snapshot.magic,
            });
        }

        if snapshot.version != CURRENT_SNAPSHOT_VERSION {
            return Err(SnapshotError::BadVersion {
                tx_offset,
                version: snapshot.version,
            });
        }

        let object_repo = Self::object_repo(&snapshot_dir)?;

        let blob_store = snapshot.reconstruct_blob_store(&object_repo)?;

        let tables = snapshot.reconstruct_tables(&object_repo, page_pool)?;

        Ok(ReconstructedSnapshot {
            database_identity: snapshot.database_identity,
            replica_id: snapshot.replica_id,
            tx_offset: snapshot.tx_offset,
            module_abi_version: snapshot.module_abi_version,
            blob_store,
            tables,
            compress_type,
        })
    }
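
    /// Verify the integrity of the snapshot at `tx_offset`: read and check
    /// the root file on a blocking task, then hand the parsed [`Snapshot`] to
    /// [`remote::verify_snapshot`] to check the objects it references.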
    pub async fn verify_snapshot(&self, tx_offset: TxOffset) -> Result<Snapshot, SnapshotError> {
        let snapshot_dir = self.snapshot_dir_path(tx_offset);
        let snapshot = spawn_blocking({
            let snapshot_dir = snapshot_dir.clone();
            move || {
                let lockfile = Lockfile::lock_path(&snapshot_dir);
                if lockfile.try_exists()? {
                    return Err(SnapshotError::Incomplete { tx_offset, lockfile });
                }

                let snapshot_file_path = snapshot_dir.snapshot_file(tx_offset);
                let (snapshot, _compress_type) = Snapshot::read_from_file(&snapshot_file_path)?;

                if snapshot.magic != MAGIC {
                    return Err(SnapshotError::BadMagic {
                        tx_offset,
                        magic: snapshot.magic,
                    });
                }
                Ok(snapshot)
            }
        })
        .await
        .unwrap()?;
        let object_repo = Self::object_repo(&snapshot_dir)?;
        verify_snapshot(object_repo, self.root.clone(), snapshot.clone())
            .await
            .map(drop)?;
        Ok(snapshot)
    }

    pub fn open(root: SnapshotsPath, database_identity: Identity, replica_id: u64) -> Result<Self, SnapshotError> {
        if !root.is_dir() {
            return Err(SnapshotError::NotDirectory { root });
        }
        Ok(Self {
            root,
            database_identity,
            replica_id,
        })
    }
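
    /// Return the largest snapshot offset that is less than *or equal to*
    /// `upper_bound`, if any. Note that despite the name, the bound is
    /// inclusive.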
    pub fn latest_snapshot_older_than(&self, upper_bound: TxOffset) -> Result<Option<TxOffset>, SnapshotError> {
        Ok(self
            .all_snapshots()?
            .filter(|tx_offset| *tx_offset <= upper_bound)
            .max())
    }
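
    /// List the offsets of all apparently complete snapshots in this
    /// repository: directories with the right extension, no live lockfile,
    /// and an existing root file. No particular order is guaranteed.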
    pub fn all_snapshots(&self) -> Result<impl Iterator<Item = TxOffset>, SnapshotError> {
        Ok(self
            .root
            .read_dir()?
            .filter_map(Result::ok)
            .map(|dirent| dirent.path())
            .filter(|path| path.extension() == Some(OsStr::new(SNAPSHOT_DIR_EXT)))
            .filter(|path| !Lockfile::lock_path(path).exists())
            .filter_map(|path| {
                let offset = TxOffset::from_str_radix(path.file_stem()?.to_str()?, 10).ok()?;
                let snapshot_file = SnapshotDirPath::from_path_unchecked(path).snapshot_file(offset);
                if !snapshot_file.0.exists() {
                    None
                } else {
                    Some(offset)
                }
            }))
    }

    pub fn latest_snapshot(&self) -> Result<Option<TxOffset>, SnapshotError> {
        self.latest_snapshot_older_than(TxOffset::MAX)
    }
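
    /// Mark every snapshot with an offset strictly greater than `upper_bound`
    /// as invalid by renaming its directory, so that [`Self::all_snapshots`]
    /// no longer reports it. Called by [`Self::create_snapshot`] before a new
    /// snapshot is written.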
    pub fn invalidate_newer_snapshots(&self, upper_bound: TxOffset) -> Result<(), SnapshotError> {
        let newer_snapshots = self
            .all_snapshots()?
            .filter(|tx_offset| *tx_offset > upper_bound)
            .collect::<Vec<TxOffset>>();

        for newer_snapshot in newer_snapshots {
            let path = self.snapshot_dir_path(newer_snapshot);
            log::info!("Invalidating snapshot newer than {upper_bound}: renaming {path:?}");
            path.rename_invalid()?;
        }
        Ok(())
    }
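
    /// Compress the snapshot `current` in place with zstd, unless it is
    /// compressed already.
    ///
    /// When `previous` is given, an object file whose hash also occurs in the
    /// previous snapshot is not recompressed; if the previous copy is already
    /// compressed, the current file is replaced by a hardlink to it, so the
    /// two snapshots keep sharing storage.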
    pub fn compress_snapshot(
        previous: Option<&(TxOffset, SnapshotDirPath)>,
        current: &(TxOffset, SnapshotDirPath),
    ) -> Result<CompressType, SnapshotError> {
        let (tx_offset, snapshot_dir) = current;
        let tx_offset = *tx_offset;
        let snapshot_file = snapshot_dir.snapshot_file(tx_offset);
        let (snapshot, compress_type) = Snapshot::read_from_file(&snapshot_file)?;

        if compress_type != CompressType::None {
            log::info!(
                "Snapshot {snapshot_dir:?} of replica {} is already compressed: {compress_type:?}",
                snapshot.replica_id
            );
            return Ok(compress_type);
        }

        let old = if let Some((tx_offset, snapshot_dir)) = previous {
            let snapshot_file = snapshot_dir.snapshot_file(*tx_offset);
            let (snapshot, _) = Snapshot::read_from_file(&snapshot_file)?;
            let dir = SnapshotRepository::object_repo(snapshot_dir)?;
            snapshot.files(&dir).collect()
        } else {
            HashMap::new()
        };

        fn compress(
            old: &HashMap<blake3::Hash, PathBuf>,
            src: &PathBuf,
            hash: Option<blake3::Hash>,
        ) -> Result<(), SnapshotError> {
            let read = CompressReader::new(o_rdonly().open(src)?)?;
            if read.compress_type() != CompressType::None {
                return Ok(());
            }

            if let Some(hash) = hash {
                if let Some(old_path) = old.get(&hash) {
                    let old_file = CompressReader::new(o_rdonly().open(old_path)?)?;
                    if old_file.compress_type() != CompressType::None {
                        std::fs::hard_link(old_path, src.with_extension("_tmp"))?;
                        std::fs::rename(src.with_extension("_tmp"), src)?;
                        return Ok(());
                    }
                }
            }

            let dst = src.with_extension("_tmp");
            let mut write = BufWriter::new(o_excl().open(&dst)?);
            compress_with_zstd(read, &mut write, None)?;
            std::fs::rename(dst, src)?;
            Ok(())
        }

        let _lock = Lockfile::for_file(snapshot_dir)?;

        log::info!(
            "Compressing snapshot {snapshot_dir:?} of replica {}",
            snapshot.replica_id
        );

        let dir = SnapshotRepository::object_repo(snapshot_dir)?;
        for (hash, path) in snapshot.files(&dir) {
            compress(&old, &path, Some(hash)).inspect_err(|err| {
                log::error!("Failed to compress object file {path:?}: {err}");
            })?;
        }

        compress(&old, &snapshot_file.0, None).inspect_err(|err| {
            log::error!("Failed to compress snapshot file {snapshot_file:?}: {err}");
        })?;

        log::info!(
            "Compressed snapshot {snapshot_dir:?} of replica {} with {:?}",
            snapshot.replica_id,
            CompressType::Zstd
        );
        Ok(CompressType::Zstd)
    }
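
    /// Compress, oldest first, every snapshot with an offset strictly below
    /// `upper_bound`, tallying results per [`CompressType`]. Failures on
    /// individual snapshots are logged and tallied as uncompressed.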
    pub fn compress_older_snapshots(&self, upper_bound: TxOffset) -> Result<CompressCount, SnapshotError> {
        let mut snapshots: Vec<_> = self
            .all_snapshots()?
            .filter_map(|tx_offset| {
                if tx_offset < upper_bound {
                    let path = self.snapshot_dir_path(tx_offset);
                    Some((tx_offset, path))
                } else {
                    None
                }
            })
            .collect();
        snapshots.sort_by(|(a_offset, _), (b_offset, _)| a_offset.cmp(b_offset));
        let mut count = CompressCount::default();
        let mut previous = None;
        for current in snapshots.iter() {
            match Self::compress_snapshot(previous, current)
                .inspect_err(|err| {
                    log::error!("Failed to compress snapshot {:?}: {err}", current.1);
                })
                .unwrap_or(CompressType::None)
            {
                CompressType::None => count.none += 1,
                CompressType::Zstd => count.zstd += 1,
            }
            previous = Some(current);
        }

        Ok(count)
    }
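
    /// Sum the on-disk footprint of all snapshots in this repository.
    /// Objects hardlinked into several snapshots are counted once per
    /// snapshot referencing them, not once overall.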
    pub fn size_on_disk(&self) -> Result<SnapshotSize, SnapshotError> {
        let mut size = SnapshotSize::default();

        for snapshot in self.all_snapshots()? {
            size += self.size_on_disk_snapshot(snapshot)?;
        }
        Ok(size)
    }

    pub fn size_on_disk_snapshot(&self, offset: TxOffset) -> Result<SnapshotSize, SnapshotError> {
        let mut size = SnapshotSize::default();

        let snapshot_dir = self.snapshot_dir_path(offset);
        let snapshot_file = snapshot_dir.snapshot_file(offset);
        let snapshot_file_size = snapshot_file.metadata()?.len();

        let (snapshot, compress_type) = Snapshot::read_from_file(&snapshot_file)?;

        size.snapshot = match compress_type {
            CompressType::None => CompressCount { none: 1, zstd: 0 },
            CompressType::Zstd => CompressCount { none: 0, zstd: 1 },
        };

        size.file_size += snapshot_file_size;
        size.total_size += snapshot_file_size;
        let repo = Self::object_repo(&snapshot_dir)?;
        for (_, f) in snapshot.files(&repo) {
            let file_size = f.metadata()?.len();
            size.object_size += file_size;
            size.total_size += file_size;
            size.object_count += 1;
        }

        Ok(size)
    }
}

pub struct ReconstructedSnapshot {
    pub database_identity: Identity,
    pub replica_id: u64,
    pub tx_offset: TxOffset,
    pub module_abi_version: [u16; 2],

    pub blob_store: HashMapBlobStore,

    pub tables: BTreeMap<TableId, Vec<Box<Page>>>,

    pub compress_type: CompressType,
}

#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;

    use tempfile::tempdir;

    use super::*;

    #[test]
    fn listing_ignores_if_snapshot_file_is_missing() -> anyhow::Result<()> {
        let tmp = tempdir()?;

        let root = SnapshotsPath::from_path_unchecked(tmp.path());
        let repo = SnapshotRepository::open(root, Identity::ZERO, 42)?;
        for i in 0..10 {
            repo.snapshot_dir_path(i).create()?;
        }
        repo.snapshot_dir_path(5)
            .snapshot_file(5)
            .open_file(OpenOptions::new().write(true).create_new(true))
            .map(drop)?;

        assert_eq!(vec![5], repo.all_snapshots()?.collect::<Vec<_>>());

        Ok(())
    }

    #[test]
    fn listing_ignores_if_lockfile_exists() -> anyhow::Result<()> {
        let tmp = tempdir()?;

        let root = SnapshotsPath::from_path_unchecked(tmp.path());
        let repo = SnapshotRepository::open(root, Identity::ZERO, 42)?;
        for i in 0..10 {
            let snapshot_dir = repo.snapshot_dir_path(i);
            snapshot_dir.create()?;
            snapshot_dir
                .snapshot_file(i)
                .open_file(OpenOptions::new().write(true).create_new(true))
                .map(drop)?;
        }
        let _lock = Lockfile::for_file(repo.snapshot_dir_path(5))?;

        let mut snapshots = repo.all_snapshots()?.collect::<Vec<_>>();
        snapshots.sort();
        assert_eq!(vec![0, 1, 2, 3, 4, 6, 7, 8, 9], snapshots);

        Ok(())
    }
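
    // Exercises the inclusive upper bound of `latest_snapshot_older_than`,
    // reusing the empty-file fixtures of the tests above.
    #[test]
    fn latest_snapshot_older_than_is_inclusive() -> anyhow::Result<()> {
        let tmp = tempdir()?;

        let root = SnapshotsPath::from_path_unchecked(tmp.path());
        let repo = SnapshotRepository::open(root, Identity::ZERO, 42)?;
        for i in 0..10 {
            let snapshot_dir = repo.snapshot_dir_path(i);
            snapshot_dir.create()?;
            snapshot_dir
                .snapshot_file(i)
                .open_file(OpenOptions::new().write(true).create_new(true))
                .map(drop)?;
        }

        // The bound itself is a candidate, so this is Some(5), not Some(4).
        assert_eq!(Some(5), repo.latest_snapshot_older_than(5)?);
        // And the latest snapshot overall is the maximum offset present.
        assert_eq!(Some(9), repo.latest_snapshot()?);

        Ok(())
    }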
}