// spacetimedb_snapshot/lib.rs

1//! This crate implements capturing and restoring snapshots in SpacetimeDB.
2//!
3//! A snapshot is an on-disk view of the committed state of a database at a particular transaction offset.
4//! Snapshots exist as an optimization over replaying the commitlog;
5//! when restoring to the most recent transaction, rather than replaying the commitlog from 0,
6//! we can reload the most recent snapshot, then replay only the suffix of the commitlog.
7//!
8//! This crate is responsible for:
9//! - The on-disk format of snapshots.
10//! - A [`SnapshotRepository`] which contains multiple snapshots of a DB and can create and retrieve them.
11//! - Creating a snapshot given a view of a DB's committed state in [`SnapshotRepository::create_snapshot`].
12//! - Reading an on-disk snapshot into memory as a [`ReconstructedSnapshot`] in [`SnapshotRepository::read_snapshot`].
13//!   The [`ReconstructedSnapshot`] can then be installed into a datastore.
14//! - Locating the most-recent snapshot of a DB, or the most recent snapshot not newer than a given tx offset,
15//!   in [`SnapshotRepository::latest_snapshot`] and [`SnapshotRepository::latest_snapshot_older_than`].
16//!
17//! This crate *is not* responsible for:
18//! - Determining when to capture snapshots.
19//! - Deciding which snapshot to restore from after a restart.
20//! - Replaying the suffix of the commitlog after restoring a snapshot.
21//! - Transforming a [`ReconstructedSnapshot`] into a live Spacetime datastore.
22// TODO(docs): consider making the snapshot proposal public and either linking or pasting it here.
23
24#![allow(clippy::result_large_err)]
25
26use spacetimedb_durability::TxOffset;
27use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressCount, CompressReader, CompressType};
28use spacetimedb_fs_utils::{
29    dir_trie::{o_excl, o_rdonly, CountCreated, DirTrie},
30    lockfile::{Lockfile, LockfileError},
31};
32use spacetimedb_lib::Identity;
33use spacetimedb_paths::server::{SnapshotDirPath, SnapshotFilePath, SnapshotsPath};
34use spacetimedb_paths::FromPathUnchecked;
35use spacetimedb_primitives::TableId;
36use spacetimedb_sats::{bsatn, de::Deserialize, ser::Serialize};
37use spacetimedb_table::{
38    blob_store::{BlobHash, BlobStore, HashMapBlobStore},
39    page::Page,
40    page_pool::PagePool,
41    table::Table,
42};
43use std::{
44    collections::BTreeMap,
45    collections::HashMap,
46    ffi::OsStr,
47    fmt,
48    io::{BufWriter, Read, Write},
49    ops::{Add, AddAssign},
50    path::PathBuf,
51};
52use tokio::task::spawn_blocking;
53
54pub mod remote;
55use remote::verify_snapshot;
56
#[derive(Debug, Copy, Clone)]
/// An object which may be associated with an error during snapshotting.
pub enum ObjectType {
    /// A large blob from the blob store, identified by its [`BlobHash`].
    Blob(BlobHash),
    /// A table page, identified by its BLAKE3 content hash.
    Page(blake3::Hash),
    /// The snapshot file itself (the hash-prefixed, BSATN-encoded [`Snapshot`]).
    Snapshot,
}
64
65impl std::fmt::Display for ObjectType {
66    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
67        match *self {
68            ObjectType::Blob(hash) => write!(f, "blob {hash:x?}"),
69            ObjectType::Page(hash) => write!(f, "page {hash:x?}"),
70            ObjectType::Snapshot => write!(f, "snapshot"),
71        }
72    }
73}
74
75#[derive(thiserror::Error, Debug)]
76pub enum SnapshotError {
77    #[error("Cannot open SnapshotRepo {0}: not an accessible directory")]
78    Open(PathBuf),
79    #[error("Failed to write {ty} to {dest_repo:?}, attempting to hardlink link from {source_repo:?}: {cause}")]
80    WriteObject {
81        ty: ObjectType,
82        dest_repo: PathBuf,
83        source_repo: Option<PathBuf>,
84        #[source]
85        cause: std::io::Error,
86    },
87    #[error("Failed to read {ty} from {source_repo:?}: {cause}")]
88    ReadObject {
89        ty: ObjectType,
90        source_repo: PathBuf,
91        #[source]
92        cause: std::io::Error,
93    },
94    #[error("Encountered corrupted {ty} in {source_repo:?}: expected hash {expected:x?}, but computed {computed:x?}")]
95    HashMismatch {
96        ty: ObjectType,
97        expected: [u8; 32],
98        computed: [u8; 32],
99        source_repo: PathBuf,
100    },
101    #[error("Failed to BSATN serialize {ty}: {cause}")]
102    Serialize {
103        ty: ObjectType,
104        #[source]
105        cause: bsatn::ser::BsatnError,
106    },
107    #[error("Failed to BSATN deserialize {ty} from {source_repo:?}: {cause}")]
108    Deserialize {
109        ty: ObjectType,
110        source_repo: PathBuf,
111        cause: bsatn::DecodeError,
112    },
113    #[error("Refusing to reconstruct incomplete snapshot {tx_offset}: lockfile {lockfile:?} exists")]
114    Incomplete { tx_offset: TxOffset, lockfile: PathBuf },
115    #[error("Refusing to reconstruct snapshot {tx_offset} with bad magic number {magic:x?}")]
116    BadMagic { tx_offset: TxOffset, magic: [u8; 4] },
117    #[error("Refusing to reconstruct snapshot {tx_offset} with unsupported version {version}")]
118    BadVersion { tx_offset: TxOffset, version: u8 },
119    #[error("Cannot open snapshot repository in non-directory {root:?}")]
120    NotDirectory { root: SnapshotsPath },
121    #[error(transparent)]
122    Lockfile(#[from] LockfileError),
123    #[error(transparent)]
124    Io(#[from] std::io::Error),
125}
126
/// Magic number for snapshot files: a point in spacetime.
///
/// Chosen because the commitlog magic number is a spacetime interval,
/// so a snapshot should be a point to which an interval can be applied.
pub const MAGIC: [u8; 4] = *b"txyz";

/// Snapshot format version number.
///
/// [`SnapshotRepository::read_snapshot`] refuses files recorded with any other version.
pub const CURRENT_SNAPSHOT_VERSION: u8 = 0;

/// ABI version of the module from which this snapshot was created, as [MAJOR, MINOR].
pub const CURRENT_MODULE_ABI_VERSION: [u16; 2] = [7, 0];

/// File extension of snapshot directories.
pub const SNAPSHOT_DIR_EXT: &str = "snapshot_dir";

/// File extension of snapshot files, which contain BSATN-encoded [`Snapshot`]s preceded by [`blake3::Hash`]es.
pub const SNAPSHOT_FILE_EXT: &str = "snapshot_bsatn";

/// File extension of snapshots which have been marked invalid by [`SnapshotRepository::invalidate_newer_snapshots`].
pub const INVALID_SNAPSHOT_DIR_EXT: &str = "invalid_snapshot";
147
#[derive(Clone, Serialize, Deserialize)]
/// The hash and refcount of a single blob in the blob store.
struct BlobEntry {
    // Content hash which doubles as the blob's key in the on-disk object repository.
    hash: BlobHash,
    // Reference count to restore when reconstructing the blob store.
    uses: u32,
}
154
#[derive(Clone, Serialize, Deserialize)]
/// A snapshot of a single table, containing the hashes of all its resident pages.
struct TableEntry {
    table_id: TableId,
    // Content hashes of the table's pages, in page order;
    // each hash keys a page object in the snapshot's object repository.
    pages: Vec<blake3::Hash>,
}
161
#[derive(Clone, Serialize, Deserialize)]
/// In-memory representation of a snapshot file:
/// metadata plus the hashes of every object (blob and page) the snapshot refers to.
/// The object contents themselves live in the snapshot directory's object repository.
pub struct Snapshot {
    /// A magic number: must be equal to [`MAGIC`].
    magic: [u8; 4],

    /// The snapshot version number. Must be equal to [`CURRENT_SNAPSHOT_VERSION`].
    version: u8,

    /// The identity of the snapshotted database.
    pub database_identity: Identity,
    /// The instance ID of the snapshotted database.
    pub replica_id: u64,

    /// ABI version of the module from which this snapshot was created, as [MAJOR, MINOR].
    ///
    /// As of this proposal, [7, 0].
    module_abi_version: [u16; 2],

    /// The transaction offset of the state this snapshot reflects.
    pub tx_offset: TxOffset,

    /// The hashes and reference counts of all objects in the blob store.
    blobs: Vec<BlobEntry>,

    /// For each table, its table ID followed by the hashes of all resident pages.
    ///
    /// It's necessary to store the table ID rather than relying on order
    /// because table IDs may be sparse (and will be, once we reserve a bunch of system table ids).
    tables: Vec<TableEntry>,
}
192
impl Snapshot {
    /// Insert a single large blob from a [`BlobStore`]
    /// into the in-memory snapshot `self`
    /// and the on-disk object repository `object_repo`.
    ///
    /// If the `prev_snapshot` is supplied, this method will attempt to hardlink the blob's on-disk object
    /// from that previous snapshot into `object_repo` rather than creating a fresh object.
    fn write_blob(
        &mut self,
        object_repo: &DirTrie,
        hash: &BlobHash,
        uses: usize,
        blob: &[u8],
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<(), SnapshotError> {
        object_repo
            .hardlink_or_write(prev_snapshot, &hash.data, || blob, counter)
            .map_err(|cause| SnapshotError::WriteObject {
                ty: ObjectType::Blob(*hash),
                dest_repo: object_repo.root().to_path_buf(),
                source_repo: prev_snapshot.map(|dest_repo| dest_repo.root().to_path_buf()),
                cause,
            })?;
        // Record the hash and refcount so reconstruction can restore the blob store exactly.
        self.blobs.push(BlobEntry {
            hash: *hash,
            uses: uses as u32,
        });
        Ok(())
    }

    /// Populate the in-memory snapshot `self`,
    /// and the on-disk object repository `object_repo`,
    /// with all large blobs from `blobs`.
    fn write_all_blobs(
        &mut self,
        object_repo: &DirTrie,
        blobs: &dyn BlobStore,
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<(), SnapshotError> {
        for (hash, uses, blob) in blobs.iter_blobs() {
            self.write_blob(object_repo, hash, uses, blob, prev_snapshot, counter)?;
        }
        Ok(())
    }

    /// Write a single `page` into the on-disk object repository `object_repo`.
    ///
    /// `hash` must be the content hash of `page`, and must be stored in `page.unmodified_hash()`.
    ///
    /// Returns the `hash` for convenient use with [`Iterator::map`] in [`Self::write_table`].
    ///
    /// If the `prev_snapshot` is supplied, this function will attempt to hardlink the page's on-disk object
    /// from that previous snapshot into `object_repo` rather than creating a fresh object.
    fn write_page(
        object_repo: &DirTrie,
        page: &Page,
        hash: blake3::Hash,
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<blake3::Hash, SnapshotError> {
        // Documented precondition: the caller must pass the page's cached content hash.
        debug_assert!(page.unmodified_hash().copied() == Some(hash));

        object_repo
            .hardlink_or_write(prev_snapshot, hash.as_bytes(), || bsatn::to_vec(page).unwrap(), counter)
            .map_err(|cause| SnapshotError::WriteObject {
                ty: ObjectType::Page(hash),
                dest_repo: object_repo.root().to_path_buf(),
                source_repo: prev_snapshot.map(|source_repo| source_repo.root().to_path_buf()),
                cause,
            })?;

        Ok(hash)
    }

    /// Populate the in-memory snapshot `self`,
    /// and the on-disk object repository `object_repo`,
    /// with all pages from `table`.
    fn write_table(
        &mut self,
        object_repo: &DirTrie,
        table: &mut Table,
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<(), SnapshotError> {
        // Short-circuits on the first page that fails to write.
        let pages = table
            .iter_pages_with_hashes()
            .map(|(hash, page)| Self::write_page(object_repo, page, hash, prev_snapshot, counter))
            .collect::<Result<Vec<blake3::Hash>, SnapshotError>>()?;

        self.tables.push(TableEntry {
            table_id: table.schema.table_id,
            pages,
        });
        Ok(())
    }

    /// Populate the in-memory snapshot `self`,
    /// and the on-disk object repository `object_repo`,
    /// with all pages from all tables in `tables`.
    fn write_all_tables<'db>(
        &mut self,
        object_repo: &DirTrie,
        tables: impl Iterator<Item = &'db mut Table>,
        prev_snapshot: Option<&DirTrie>,
        counter: &mut CountCreated,
    ) -> Result<(), SnapshotError> {
        for table in tables {
            self.write_table(object_repo, table, prev_snapshot, counter)?;
        }
        Ok(())
    }

    /// Read a [`Snapshot`] from the file at `path`, verify its hash, and return it
    /// together with the detected compression type.
    ///
    /// **NOTE**: It detects if the file was compressed or not.
    ///
    /// Fails if:
    /// - `path` does not refer to a readable file.
    /// - Detecting whether the file is compressed fails.
    /// - The file at `path` is corrupted,
    ///   as detected by comparing the hash of its bytes to a hash recorded in the file.
    pub fn read_from_file(path: &SnapshotFilePath) -> Result<(Self, CompressType), SnapshotError> {
        let err_read_object = |cause| SnapshotError::ReadObject {
            ty: ObjectType::Snapshot,
            source_repo: path.0.clone(),
            cause,
        };
        let snapshot_file = path.open_file(&o_rdonly()).map_err(err_read_object)?;
        // `CompressReader` sniffs the compression type and transparently decompresses.
        let mut snapshot_file = CompressReader::new(snapshot_file)?;

        // The snapshot file is prefixed with the hash of the `Snapshot`'s BSATN.
        // Read that hash.
        let mut hash = [0; blake3::OUT_LEN];
        snapshot_file.read_exact(&mut hash).map_err(err_read_object)?;
        let hash = blake3::Hash::from_bytes(hash);

        // Read the `Snapshot`'s BSATN and compute its hash.
        let mut snapshot_bsatn = vec![];
        snapshot_file
            .read_to_end(&mut snapshot_bsatn)
            .map_err(err_read_object)?;
        let computed_hash = blake3::hash(&snapshot_bsatn);

        // Compare the saved and computed hashes, and fail if they do not match.
        if hash != computed_hash {
            return Err(SnapshotError::HashMismatch {
                ty: ObjectType::Snapshot,
                expected: *hash.as_bytes(),
                computed: *computed_hash.as_bytes(),
                source_repo: path.0.clone(),
            });
        }

        let snapshot = bsatn::from_slice::<Snapshot>(&snapshot_bsatn).map_err(|cause| SnapshotError::Deserialize {
            ty: ObjectType::Snapshot,
            source_repo: path.0.clone(),
            cause,
        })?;

        Ok((snapshot, snapshot_file.compress_type()))
    }

    /// Construct a [`HashMapBlobStore`] containing all the blobs referenced in `self`,
    /// reading their data from files in the `object_repo`.
    ///
    /// Fails if any of the object files is missing or corrupted,
    /// as detected by comparing the hash of its bytes to the hash recorded in `self`.
    fn reconstruct_blob_store(&self, object_repo: &DirTrie) -> Result<HashMapBlobStore, SnapshotError> {
        let mut blob_store = HashMapBlobStore::default();

        for BlobEntry { hash, uses } in &self.blobs {
            // Read the bytes of the blob object.
            let buf = object_repo
                .read_entry(&hash.data)
                .map_err(|cause| SnapshotError::ReadObject {
                    ty: ObjectType::Blob(*hash),
                    source_repo: object_repo.root().to_path_buf(),
                    cause,
                })?;

            // Compute the blob's hash.
            let computed_hash = BlobHash::hash_from_bytes(&buf);

            // Compare the computed hash to the one recorded in the `Snapshot`,
            // and fail if they do not match.
            if *hash != computed_hash {
                return Err(SnapshotError::HashMismatch {
                    ty: ObjectType::Blob(*hash),
                    expected: hash.data,
                    computed: computed_hash.data,
                    source_repo: object_repo.root().to_path_buf(),
                });
            }

            // Restore the original refcount recorded at snapshot time.
            blob_store.insert_with_uses(hash, *uses as usize, buf.into_boxed_slice());
        }

        Ok(blob_store)
    }

    /// Read all the pages referenced by `pages` from the `object_repo`.
    ///
    /// Fails if any of the pages files is missing or corrupted,
    /// as detected by comparing the hash of its bytes to the hash listed in `pages`.
    fn reconstruct_one_table_pages(
        object_repo: &DirTrie,
        pages: &[blake3::Hash],
        page_pool: &PagePool,
    ) -> Result<Vec<Box<Page>>, SnapshotError> {
        pages
            .iter()
            .map(|hash| {
                // Read the BSATN bytes of the on-disk page object.
                let buf = object_repo
                    .read_entry(hash.as_bytes())
                    .map_err(|cause| SnapshotError::ReadObject {
                        ty: ObjectType::Page(*hash),
                        source_repo: object_repo.root().to_path_buf(),
                        cause,
                    })?;

                // Deserialize the bytes into a `Page`, reusing pooled memory where possible.
                let page = page_pool.take_deserialize_from(&buf);
                let page = page.map_err(|cause| SnapshotError::Deserialize {
                    ty: ObjectType::Page(*hash),
                    source_repo: object_repo.root().to_path_buf(),
                    cause,
                })?;

                // Compute the hash of the page.
                let computed_hash = page.content_hash();

                // Compare the computed hash to the one recorded in the `Snapshot`,
                // and fail if they do not match.
                if *hash != computed_hash {
                    return Err(SnapshotError::HashMismatch {
                        ty: ObjectType::Page(*hash),
                        expected: *hash.as_bytes(),
                        computed: *computed_hash.as_bytes(),
                        source_repo: object_repo.root().to_path_buf(),
                    });
                }

                Ok::<Box<Page>, SnapshotError>(page)
            })
            .collect()
    }

    /// Reconstruct a single table's pages from the `object_repo`,
    /// returning them alongside the table's [`TableId`].
    fn reconstruct_one_table(
        object_repo: &DirTrie,
        TableEntry { table_id, pages }: &TableEntry,
        page_pool: &PagePool,
    ) -> Result<(TableId, Vec<Box<Page>>), SnapshotError> {
        Ok((
            *table_id,
            Self::reconstruct_one_table_pages(object_repo, pages, page_pool)?,
        ))
    }

    /// Reconstruct all the table data from `self`,
    /// reading pages from files in the `object_repo`.
    ///
    /// This method cannot construct [`Table`] objects
    /// because doing so requires knowledge of the system tables' schemas
    /// to compute the schemas of the user-defined tables.
    ///
    /// Fails if any object file referenced in `self` (as a page or large blob)
    /// is missing or corrupted,
    /// as detected by comparing the hash of its bytes to the hash recorded in `self`.
    fn reconstruct_tables(
        &self,
        object_repo: &DirTrie,
        page_pool: &PagePool,
    ) -> Result<BTreeMap<TableId, Vec<Box<Page>>>, SnapshotError> {
        self.tables
            .iter()
            .map(|tbl| Self::reconstruct_one_table(object_repo, tbl, page_pool))
            .collect()
    }

    /// The number of objects in this snapshot, both blobs and pages.
    pub fn total_objects(&self) -> usize {
        self.blobs.len() + self.tables.iter().map(|table| table.pages.len()).sum::<usize>()
    }

    /// Obtain an iterator over the [`blake3::Hash`]es of all objects
    /// this snapshot is referring to.
    pub fn objects(&self) -> impl Iterator<Item = blake3::Hash> + '_ {
        self.blobs
            .iter()
            .map(|b| blake3::Hash::from_bytes(b.hash.data))
            .chain(self.tables.iter().flat_map(|t| t.pages.iter().copied()))
    }

    /// Obtain an iterator over the hash and on-disk path of every object
    /// this snapshot refers to, resolved against `src_repo`.
    pub fn files<'a>(&'a self, src_repo: &'a DirTrie) -> impl Iterator<Item = (blake3::Hash, PathBuf)> + 'a {
        self.objects().map(move |hash| {
            let path = src_repo.file_path(hash.as_bytes());
            (hash, path)
        })
    }
}
497
/// Collect the size of the snapshot and the number of objects in it.
#[derive(Clone, Default)]
pub struct SnapshotSize {
    /// How many snapshots are in the snapshot directory, and what `CompressType` they are.
    pub snapshot: CompressCount,
    /// The size of the snapshot file in `bytes`.
    pub file_size: u64,
    /// The size of the snapshot's objects in `bytes`.
    pub object_size: u64,
    /// The number of objects in the snapshot.
    pub object_count: u64,
    /// Total size of the snapshot in `bytes`, `file_size + object_size`.
    pub total_size: u64,
}
512
513impl Add for SnapshotSize {
514    type Output = Self;
515
516    fn add(self, rhs: Self) -> Self::Output {
517        Self {
518            snapshot: CompressCount {
519                none: self.snapshot.none + rhs.snapshot.none,
520                zstd: self.snapshot.zstd + rhs.snapshot.zstd,
521            },
522            file_size: self.file_size + rhs.file_size,
523            object_size: self.object_size + rhs.object_size,
524            object_count: self.object_count + rhs.object_count,
525            total_size: self.total_size + rhs.total_size,
526        }
527    }
528}
529
530impl AddAssign for SnapshotSize {
531    fn add_assign(&mut self, rhs: Self) {
532        *self = self.clone() + rhs;
533    }
534}
535
impl fmt::Debug for SnapshotSize {
    /// Hand-written `Debug` so the numeric fields line up in a readable column.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The trailing spaces in the field names pad them to equal width,
        // and `{:>8}` right-aligns the byte counts, so `{:#?}` output tabulates.
        f.debug_struct("SnapshotSize")
            .field("snapshot       ", &self.snapshot)
            .field("object_count   ", &self.object_count)
            .field("file_size      ", &format_args!("{:>8} bytes", self.file_size))
            .field("object_size    ", &format_args!("{:>8} bytes", self.object_size))
            .field("total_size     ", &format_args!("{:>8} bytes", self.total_size))
            .finish()
    }
}
547
/// A repository of snapshots of a particular database instance.
#[derive(Clone)]
pub struct SnapshotRepository {
    /// The directory which contains all the snapshots.
    root: SnapshotsPath,

    /// The database identity of the database instance for which this repository stores snapshots.
    database_identity: Identity,

    /// The database instance ID of the database instance for which this repository stores snapshots.
    replica_id: u64,
    // TODO(deduplication): track the most recent successful snapshot
    // (possibly in a file)
    // and hardlink its objects into the next snapshot for deduplication.
}
563
564impl SnapshotRepository {
    /// Returns the [`Identity`] of the database this [`SnapshotRepository`] is configured to snapshot.
    pub fn database_identity(&self) -> Identity {
        // Returned by value; `Identity` must be `Copy` for this to compile from `&self`.
        self.database_identity
    }
569
    /// Capture a snapshot of the state of the database at `tx_offset`,
    /// where `tables` is the committed state of all the tables in the database,
    /// and `blobs` is the committed state's blob store.
    ///
    /// Returns the path of the newly-created snapshot directory.
    ///
    /// **NOTE**: The current snapshot is uncompressed to avoid the potential slowdown.
    pub fn create_snapshot<'db>(
        &self,
        // NOTE(review): `tables` is `&mut` presumably so page content hashes can be
        // computed and cached during `iter_pages_with_hashes` — confirm in `Table`.
        tables: impl Iterator<Item = &'db mut Table>,
        blobs: &'db dyn BlobStore,
        tx_offset: TxOffset,
    ) -> Result<SnapshotDirPath, SnapshotError> {
        // Invalidate equal to or newer than `tx_offset`.
        //
        // This is because snapshots don't currently track the epoch in which
        // they were created:
        //
        // Say, for example, a snapshot was created at offset 10, then a leader
        // failover causes the commitlog to be reset to offset 9. The next
        // transaction (also offset 10) will trigger snapshot creation, but we'd
        // mistake the existing snapshot (now invalid) as the previous snapshot.
        self.invalidate_newer_snapshots(tx_offset.saturating_sub(1))?;

        // If a previous snapshot exists in this snapshot repo,
        // get a handle on its object repo in order to hardlink shared objects into the new snapshot.
        let prev_snapshot = self.latest_snapshot()?.map(|offset| self.snapshot_dir_path(offset));

        let prev_snapshot = if let Some(prev_snapshot) = prev_snapshot {
            assert!(
                prev_snapshot.0.is_dir(),
                "prev_snapshot {prev_snapshot:?} is not a directory"
            );
            let object_repo = Self::object_repo(&prev_snapshot)?;
            Some(object_repo)
        } else {
            None
        };

        // Tracks how many objects were hardlinked vs freshly written, for the log line below.
        let mut counter = CountCreated::default();

        let snapshot_dir = self.snapshot_dir_path(tx_offset);

        // Before performing any observable operations,
        // acquire a lockfile on the snapshot you want to create.
        // Because we could be compressing the snapshot.
        let _lock = Lockfile::for_file(&snapshot_dir)?;

        // Create the snapshot directory.
        snapshot_dir.create()?;

        // Create a new `DirTrie` to hold all the content-addressed objects in the snapshot.
        let object_repo = Self::object_repo(&snapshot_dir)?;

        // Build the in-memory `Snapshot` object.
        let mut snapshot = self.empty_snapshot(tx_offset);

        // Populate both the in-memory `Snapshot` object and the on-disk object repository
        // with all the blobs and pages.
        snapshot.write_all_blobs(&object_repo, blobs, prev_snapshot.as_ref(), &mut counter)?;
        snapshot.write_all_tables(&object_repo, tables, prev_snapshot.as_ref(), &mut counter)?;

        // Serialize and hash the in-memory `Snapshot` object.
        let snapshot_bsatn = bsatn::to_vec(&snapshot).map_err(|cause| SnapshotError::Serialize {
            ty: ObjectType::Snapshot,
            cause,
        })?;
        let hash = blake3::hash(&snapshot_bsatn);

        // Create the snapshot file, containing first the hash, then the `Snapshot`.
        // `o_excl` fails if the file already exists; the scope flushes and closes it.
        {
            let mut snapshot_file = BufWriter::new(snapshot_dir.snapshot_file(tx_offset).open_file(&o_excl())?);
            snapshot_file.write_all(hash.as_bytes())?;
            snapshot_file.write_all(&snapshot_bsatn)?;
            snapshot_file.flush()?;
        }

        log::info!(
            "[{}] SNAPSHOT {:0>20}: Hardlinked {} objects and wrote {} objects",
            self.database_identity,
            tx_offset,
            counter.objects_hardlinked,
            counter.objects_written,
        );
        // Success! return the directory of the newly-created snapshot.
        // The lockfile will be dropped here.
        Ok(snapshot_dir)
    }
658
    /// Build a fresh [`Snapshot`] for `tx_offset` with no blobs and no tables,
    /// stamped with this repository's identity and replica ID
    /// and the current snapshot format / module ABI versions.
    fn empty_snapshot(&self, tx_offset: TxOffset) -> Snapshot {
        Snapshot {
            magic: MAGIC,
            version: CURRENT_SNAPSHOT_VERSION,
            database_identity: self.database_identity,
            replica_id: self.replica_id,
            module_abi_version: CURRENT_MODULE_ABI_VERSION,
            tx_offset,
            blobs: vec![],
            tables: vec![],
        }
    }
671
    /// Get the path to the directory which would contain the snapshot of transaction `tx_offset`.
    ///
    /// The directory may not exist if no snapshot has been taken of `tx_offset`.
    ///
    /// The directory may exist but be locked or incomplete
    /// if a file with the same name and the extension `.lock` exists.
    /// In this case, callers should treat the snapshot as if it did not exist.
    ///
    /// Use [`Self::all_snapshots`] to get `tx_offsets` which will return valid extant paths.
    /// [`Self::all_snapshots`] will never return a `tx_offset` for a locked or incomplete snapshot.
    /// [`Self::all_snapshots`] does not validate the contents of snapshots,
    /// so it may return a `tx_offset` whose snapshot is corrupted.
    ///
    /// Any mutations to any files contained in the returned directory
    /// will likely corrupt the snapshot,
    /// causing attempts to reconstruct it to fail.
    pub fn snapshot_dir_path(&self, tx_offset: TxOffset) -> SnapshotDirPath {
        self.root.snapshot_dir(tx_offset)
    }
691
    /// Given `snapshot_dir` as the result of [`Self::snapshot_dir_path`],
    /// get the [`DirTrie`] which contains serialized objects (pages and large blobs)
    /// referenced by the [`Snapshot`] contained in the [`Self::snapshot_file_path`].
    ///
    /// Consequences are unspecified if this method is called from outside this crate
    /// on a non-existent, locked or incomplete `snapshot_dir`.
    ///
    /// Any mutations to the returned [`DirTrie`] or its contents
    /// will likely render the snapshot corrupted,
    /// causing future attempts to reconstruct it to fail.
    pub fn object_repo(snapshot_dir: &SnapshotDirPath) -> Result<DirTrie, std::io::Error> {
        // The object repository lives in the `objects` subdirectory of the snapshot dir.
        DirTrie::open(snapshot_dir.objects().0)
    }
705
706    /// Read a snapshot contained in self referring to `tx_offset`,
707    /// verify its hashes,
708    /// and parse it into an in-memory structure [`ReconstructedSnapshot`]
709    /// which can be used to build a `CommittedState`.
710    ///
711    /// This method cannot construct [`Table`] objects
712    /// because doing so requires knowledge of the system tables' schemas
713    /// to compute the schemas of the user-defined tables.
714    ///
715    /// Fails if:
716    /// - No snapshot exists in `self` for `tx_offset`.
717    /// - The snapshot is incomplete, as detected by its lockfile still existing.
718    /// - Any object file (page or large blob) referenced by the snapshot file
719    ///   is missing or corrupted,
720    ///   as detected by comparing the hash of its bytes to the hash recorded in the snapshot file.
721    /// - The snapshot file's magic number does not match [`MAGIC`].
722    /// - The snapshot file's version does not match [`CURRENT_SNAPSHOT_VERSION`].
723    ///
724    /// The following conditions are not detected or considered as errors:
725    /// - The snapshot file's database identity or instance ID do not match those in `self`.
726    /// - The snapshot file's module ABI version does not match [`CURRENT_MODULE_ABI_VERSION`].
727    /// - The snapshot file's recorded transaction offset does not match `tx_offset`.
728    ///
729    /// This means that callers must inspect the returned [`ReconstructedSnapshot`]
730    /// and verify that they can handle its contained database identity, instance ID, module ABI version and transaction offset.
731    pub fn read_snapshot(
732        &self,
733        tx_offset: TxOffset,
734        page_pool: &PagePool,
735    ) -> Result<ReconstructedSnapshot, SnapshotError> {
736        let snapshot_dir = self.snapshot_dir_path(tx_offset);
737        let lockfile = Lockfile::lock_path(&snapshot_dir);
738        if lockfile.try_exists()? {
739            return Err(SnapshotError::Incomplete { tx_offset, lockfile });
740        }
741
742        let snapshot_file_path = snapshot_dir.snapshot_file(tx_offset);
743        let (snapshot, compress_type) = Snapshot::read_from_file(&snapshot_file_path)?;
744
745        if snapshot.magic != MAGIC {
746            return Err(SnapshotError::BadMagic {
747                tx_offset,
748                magic: snapshot.magic,
749            });
750        }
751
752        if snapshot.version != CURRENT_SNAPSHOT_VERSION {
753            return Err(SnapshotError::BadVersion {
754                tx_offset,
755                version: snapshot.version,
756            });
757        }
758
759        let snapshot_dir = self.snapshot_dir_path(tx_offset);
760        let object_repo = Self::object_repo(&snapshot_dir)?;
761
762        let blob_store = snapshot.reconstruct_blob_store(&object_repo)?;
763
764        let tables = snapshot.reconstruct_tables(&object_repo, page_pool)?;
765
766        Ok(ReconstructedSnapshot {
767            database_identity: snapshot.database_identity,
768            replica_id: snapshot.replica_id,
769            tx_offset: snapshot.tx_offset,
770            module_abi_version: snapshot.module_abi_version,
771            blob_store,
772            tables,
773            compress_type,
774        })
775    }
776
777    /// Read the [`Snapshot`] metadata at `tx_offset` and verify the integrity
778    /// of all objects it refers to.
779    ///
780    /// Fails if:
781    ///
782    /// - No snapshot exists in `self` for `tx_offset`
783    /// - The snapshot is incomplete, as detected by its lockfile still existing.
784    /// - The snapshot file's magic number does not match [`MAGIC`].
785    /// - Any object file (page or large blob) referenced by the snapshot file
786    ///   is missing or corrupted.
787    ///
788    /// The following conditions are not detected or considered as errors:
789    ///
790    /// - The snapshot file's version does not match [`CURRENT_SNAPSHOT_VERSION`].
791    /// - The snapshot file's database identity or instance ID do not match
792    ///   those in `self`.
793    /// - The snapshot file's module ABI version does not match
794    ///   [`CURRENT_MODULE_ABI_VERSION`].
795    /// - The snapshot file's recorded transaction offset does not match
796    ///   `tx_offset`.
797    ///
798    /// Callers may want to inspect the returned [`Snapshot`] and ensure its
799    /// contents match their expectations.
800    pub async fn verify_snapshot(&self, tx_offset: TxOffset) -> Result<Snapshot, SnapshotError> {
801        let snapshot_dir = self.snapshot_dir_path(tx_offset);
802        let snapshot = spawn_blocking({
803            let snapshot_dir = snapshot_dir.clone();
804            move || {
805                let lockfile = Lockfile::lock_path(&snapshot_dir);
806                if lockfile.try_exists()? {
807                    return Err(SnapshotError::Incomplete { tx_offset, lockfile });
808                }
809
810                let snapshot_file_path = snapshot_dir.snapshot_file(tx_offset);
811                let (snapshot, _compress_type) = Snapshot::read_from_file(&snapshot_file_path)?;
812
813                if snapshot.magic != MAGIC {
814                    return Err(SnapshotError::BadMagic {
815                        tx_offset,
816                        magic: snapshot.magic,
817                    });
818                }
819                Ok(snapshot)
820            }
821        })
822        .await
823        .unwrap()?;
824        let object_repo = Self::object_repo(&snapshot_dir)?;
825        verify_snapshot(object_repo, self.root.clone(), snapshot.clone())
826            .await
827            .map(drop)?;
828        Ok(snapshot)
829    }
830
831    /// Open a repository at `root`, failing if the `root` doesn't exist or isn't a directory.
832    ///
833    /// Calls [`Path::is_dir`] and requires that the result is `true`.
834    /// See that method for more detailed preconditions on this function.
835    pub fn open(root: SnapshotsPath, database_identity: Identity, replica_id: u64) -> Result<Self, SnapshotError> {
836        if !root.is_dir() {
837            return Err(SnapshotError::NotDirectory { root });
838        }
839        Ok(Self {
840            root,
841            database_identity,
842            replica_id,
843        })
844    }
845
846    /// Return the `TxOffset` of the highest-offset complete snapshot in the repository
847    /// lower than or equal to `upper_bound`.
848    ///
849    /// When searching for a snapshot to restore,
850    /// we will pass the [`spacetimedb_durability::Durability::durable_tx_offset`]
851    /// as the `upper_bound` to ensure we don't lose TXes.
852    ///
853    /// Does not verify that the snapshot of the returned `TxOffset` is valid and uncorrupted,
854    /// so a subsequent [`Self::read_snapshot`] may fail.
855    pub fn latest_snapshot_older_than(&self, upper_bound: TxOffset) -> Result<Option<TxOffset>, SnapshotError> {
856        Ok(self
857            .all_snapshots()?
858            // Ignore `tx_offset`s greater than the current upper bound.
859            .filter(|tx_offset| *tx_offset <= upper_bound)
860            // Select the largest TxOffset.
861            .max())
862    }
863
864    pub fn all_snapshots(&self) -> Result<impl Iterator<Item = TxOffset>, SnapshotError> {
865        Ok(self
866            .root
867            // Item = Result<DirEntry>
868            .read_dir()?
869            // Item = DirEntry
870            .filter_map(Result::ok)
871            // Item = PathBuf
872            .map(|dirent| dirent.path())
873            // Ignore entries not shaped like snapshot directories.
874            .filter(|path| path.extension() == Some(OsStr::new(SNAPSHOT_DIR_EXT)))
875            // Ignore entries whose lockfile still exists.
876            .filter(|path| !Lockfile::lock_path(path).exists())
877            // Parse each entry's TxOffset from the file name; ignore unparsable.
878            // Also ignore if the snapshot file doesn't exists.
879            // This can happen on incomplete transfers, or if something went
880            // wrong during creation.
881            // Item = TxOffset
882            .filter_map(|path| {
883                let offset = TxOffset::from_str_radix(path.file_stem()?.to_str()?, 10).ok()?;
884                let snapshot_file = SnapshotDirPath::from_path_unchecked(path).snapshot_file(offset);
885                if !snapshot_file.0.exists() {
886                    None
887                } else {
888                    Some(offset)
889                }
890            }))
891    }
892
893    /// Return the `TxOffset` of the highest-offset complete snapshot in the repository.
894    ///
895    /// Does not verify that the snapshot of the returned `TxOffset` is valid and uncorrupted,
896    /// so a subsequent [`Self::read_snapshot`] may fail.
897    pub fn latest_snapshot(&self) -> Result<Option<TxOffset>, SnapshotError> {
898        self.latest_snapshot_older_than(TxOffset::MAX)
899    }
900
901    /// Rename any snapshot newer than `upper_bound` with [`INVALID_SNAPSHOT_DIR_EXT`].
902    ///
903    /// When rebuilding a database, we will call this method
904    /// with the [`spacetimedb_durability::Durability::durable_tx_offset`] as the `upper_bound`
905    /// in order to prevent us from retaining snapshots which will be superseded by the new diverging history.
906    ///
907    /// It is also called when creating a new snapshot via [`Self::create_snapshot`]
908    /// in order to prevent a diverging snapshot from being used as its own parent.
909    ///
910    /// Does not invalidate snapshots which are locked.
911    ///
912    /// This may overwrite previously-invalidated snapshots.
913    ///
914    /// If this method returns an error, some snapshots may have been invalidated, but not all will have been.
915    pub fn invalidate_newer_snapshots(&self, upper_bound: TxOffset) -> Result<(), SnapshotError> {
916        let newer_snapshots = self
917            .all_snapshots()?
918            .filter(|tx_offset| *tx_offset > upper_bound)
919            // Collect to a vec to avoid iterator invalidation,
920            // as the subsequent `for` loop will mutate the directory.
921            .collect::<Vec<TxOffset>>();
922
923        for newer_snapshot in newer_snapshots {
924            let path = self.snapshot_dir_path(newer_snapshot);
925            log::info!("Renaming snapshot newer than {upper_bound} from {path:?} to {path:?}");
926            path.rename_invalid()?;
927        }
928        Ok(())
929    }
930
931    /// Compress the snapshot (if not already compressed)
932    /// of the replica with the given `tx_offset`, and return the [`CompressType`] type..
933    pub fn compress_snapshot(
934        previous: Option<&(TxOffset, SnapshotDirPath)>,
935        current: &(TxOffset, SnapshotDirPath),
936    ) -> Result<CompressType, SnapshotError> {
937        let (tx_offset, snapshot_dir) = current;
938        let tx_offset = *tx_offset;
939        let snapshot_file = snapshot_dir.snapshot_file(tx_offset);
940        let (snapshot, compress_type) = Snapshot::read_from_file(&snapshot_file)?;
941
942        if compress_type != CompressType::None {
943            log::info!(
944                "Snapshot {snapshot_dir:?} of replica {} is already compressed: {compress_type:?}",
945                snapshot.replica_id
946            );
947            return Ok(compress_type);
948        }
949
950        let old = if let Some((tx_offset, snapshot_dir)) = previous {
951            let snapshot_file = snapshot_dir.snapshot_file(*tx_offset);
952            let (snapshot, _) = Snapshot::read_from_file(&snapshot_file)?;
953            let dir = SnapshotRepository::object_repo(snapshot_dir)?;
954            snapshot.files(&dir).collect()
955        } else {
956            HashMap::new()
957        };
958
959        // Replace the original file with the compressed one.
960        fn compress(
961            old: &HashMap<blake3::Hash, PathBuf>,
962            src: &PathBuf,
963            hash: Option<blake3::Hash>,
964        ) -> Result<(), SnapshotError> {
965            let read = CompressReader::new(o_rdonly().open(src)?)?;
966            if read.compress_type() != CompressType::None {
967                return Ok(()); // Already compressed
968            }
969            if let Some(hash) = hash {
970                if let Some(old_path) = old.get(&hash) {
971                    let old_file = CompressReader::new(o_rdonly().open(old_path)?)?;
972                    if old_file.compress_type() != CompressType::None {
973                        std::fs::hard_link(old_path, src.with_extension("_tmp"))?;
974                        std::fs::rename(src.with_extension("_tmp"), src)?;
975                        return Ok(());
976                    }
977                }
978            }
979
980            let dst = src.with_extension("_tmp");
981            let mut write = BufWriter::new(o_excl().open(&dst)?);
982            // The default frame size compress better.
983            compress_with_zstd(read, &mut write, None)?;
984            std::fs::rename(dst, src)?;
985            Ok(())
986        }
987
988        let _lock = Lockfile::for_file(snapshot_dir)?;
989
990        log::info!(
991            "Compressing snapshot {snapshot_dir:?} of replica {}",
992            snapshot.replica_id
993        );
994
995        let dir = SnapshotRepository::object_repo(snapshot_dir)?;
996        for (hash, path) in snapshot.files(&dir) {
997            compress(&old, &path, Some(hash)).inspect_err(|err| {
998                log::error!("Failed to compress object file {path:?}: {err}");
999            })?;
1000        }
1001
1002        // Compress the snapshot file last, so it marks it compressed.
1003        compress(&old, &snapshot_file.0, None).inspect_err(|err| {
1004            log::error!("Failed to compress snapshot file {snapshot_file:?}: {err}");
1005        })?;
1006
1007        log::info!(
1008            "Compressed snapshot {snapshot_dir:?} of replica {}: {compress_type:?}",
1009            snapshot.replica_id
1010        );
1011        Ok(CompressType::Zstd)
1012    }
1013
1014    /// Compress the snapshots older than the given [`TxOffset`].
1015    ///
1016    /// *NOTE*: Compression errors are logged but not returned.
1017    pub fn compress_older_snapshots(&self, upper_bound: TxOffset) -> Result<CompressCount, SnapshotError> {
1018        // TODO: The more snapshots we have, the more time it takes to compress, we need a way to limit this.
1019        let mut snapshots: Vec<_> = self
1020            .all_snapshots()?
1021            // Ignore `tx_offset`s greater than the current upper bound.
1022            .filter_map(|tx_offset| {
1023                if tx_offset < upper_bound {
1024                    let path = self.snapshot_dir_path(tx_offset);
1025                    Some((tx_offset, path))
1026                } else {
1027                    None
1028                }
1029            })
1030            .collect();
1031        snapshots.sort_by(|(a_offset, _), (b_offset, _)| a_offset.cmp(b_offset));
1032        let mut count = CompressCount::default();
1033        let mut previous = None;
1034        for current in snapshots.iter() {
1035            match Self::compress_snapshot(previous, current)
1036                .inspect_err(|err| {
1037                    log::error!("Failed to compress snapshot {:?}: {err}", current.1);
1038                })
1039                .unwrap_or(CompressType::None)
1040            {
1041                CompressType::None => count.none += 1,
1042                CompressType::Zstd => count.zstd += 1,
1043            }
1044            previous = Some(current);
1045        }
1046
1047        Ok(count)
1048    }
1049
1050    /// Calculate the size of the snapshot repository in bytes.
1051    pub fn size_on_disk(&self) -> Result<SnapshotSize, SnapshotError> {
1052        let mut size = SnapshotSize::default();
1053
1054        for snapshot in self.all_snapshots()? {
1055            size += self.size_on_disk_snapshot(snapshot)?;
1056        }
1057        Ok(size)
1058    }
1059
1060    pub fn size_on_disk_snapshot(&self, offset: TxOffset) -> Result<SnapshotSize, SnapshotError> {
1061        let mut size = SnapshotSize::default();
1062
1063        let snapshot_dir = self.snapshot_dir_path(offset);
1064        let snapshot_file = snapshot_dir.snapshot_file(offset);
1065        let snapshot_file_size = snapshot_file.metadata()?.len();
1066
1067        let (snapshot, compress_type) = Snapshot::read_from_file(&snapshot_file)?;
1068
1069        size.snapshot = match compress_type {
1070            CompressType::None => CompressCount { none: 1, zstd: 0 },
1071            CompressType::Zstd => CompressCount { none: 0, zstd: 1 },
1072        };
1073
1074        size.file_size += snapshot_file_size;
1075        size.total_size += snapshot_file_size;
1076        let repo = Self::object_repo(&snapshot_dir)?;
1077        for (_, f) in snapshot.files(&repo) {
1078            let file_size = f.metadata()?.len();
1079            size.object_size += file_size;
1080            size.total_size += file_size;
1081            size.object_count += 1;
1082        }
1083
1084        Ok(size)
1085    }
1086}
1087
/// The in-memory result of reading a snapshot from disk via [`SnapshotRepository::read_snapshot`].
///
/// Contains the raw pages and blobs of the committed state;
/// callers are responsible for turning this into a live datastore
/// and for validating the identity, replica ID, ABI version and offset below.
pub struct ReconstructedSnapshot {
    /// The identity of the snapshotted database.
    pub database_identity: Identity,
    /// The instance ID (replica ID) of the snapshotted database.
    pub replica_id: u64,
    /// The transaction offset of the state this snapshot reflects.
    pub tx_offset: TxOffset,
    /// ABI version of the module from which this snapshot was created, as [MAJOR, MINOR].
    pub module_abi_version: [u16; 2],

    /// The blob store of the snapshotted state.
    pub blob_store: HashMapBlobStore,

    /// All the tables from the snapshotted state, sans schema information and indexes.
    ///
    /// This includes the system tables,
    /// so the schema of user-defined tables can be recovered
    /// given knowledge of the schema of `st_table` and `st_column`.
    pub tables: BTreeMap<TableId, Vec<Box<Page>>>,
    /// The compression applied to the on-disk snapshot file this was read from,
    /// or [`CompressType::None`] if it was uncompressed.
    pub compress_type: CompressType,
}
1110
#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;

    use tempfile::tempdir;

    use super::*;

    /// Create an empty snapshot file for `offset` inside its (already-created) directory.
    fn touch_snapshot_file(repo: &SnapshotRepository, offset: TxOffset) -> anyhow::Result<()> {
        repo.snapshot_dir_path(offset)
            .snapshot_file(offset)
            .open_file(OpenOptions::new().write(true).create_new(true))
            .map(drop)?;
        Ok(())
    }

    #[test]
    fn listing_ignores_if_snapshot_file_is_missing() -> anyhow::Result<()> {
        let tmp = tempdir()?;
        let root = SnapshotsPath::from_path_unchecked(tmp.path());
        let repo = SnapshotRepository::open(root, Identity::ZERO, 42)?;

        // Ten snapshot directories, but only offset 5 gets an actual snapshot file.
        for offset in 0..10 {
            repo.snapshot_dir_path(offset).create()?;
        }
        touch_snapshot_file(&repo, 5)?;

        let listed = repo.all_snapshots()?.collect::<Vec<_>>();
        assert_eq!(listed, vec![5]);

        Ok(())
    }

    #[test]
    fn listing_ignores_if_lockfile_exists() -> anyhow::Result<()> {
        let tmp = tempdir()?;
        let root = SnapshotsPath::from_path_unchecked(tmp.path());
        let repo = SnapshotRepository::open(root, Identity::ZERO, 42)?;

        // Ten fully-populated snapshot directories.
        for offset in 0..10 {
            repo.snapshot_dir_path(offset).create()?;
            touch_snapshot_file(&repo, offset)?;
        }
        // Holding the lock marks snapshot 5 as in-progress; listing must skip it.
        let _lock = Lockfile::for_file(repo.snapshot_dir_path(5))?;

        let mut listed = repo.all_snapshots()?.collect::<Vec<_>>();
        listed.sort();
        assert_eq!(listed, vec![0, 1, 2, 3, 4, 6, 7, 8, 9]);

        Ok(())
    }
}