solana_runtime/snapshot_utils.rs

1use {
2    crate::{
3        bank::{BankFieldsToDeserialize, BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4        serde_snapshot::{
5            self, AccountsDbFields, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize,
6            SerializableAccountStorageEntry, SnapshotAccountsDbFields, SnapshotBankFields,
7            SnapshotStreams,
8        },
9        snapshot_archive_info::{
10            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
11            SnapshotArchiveInfoGetter,
12        },
13        snapshot_bank_utils,
14        snapshot_config::SnapshotConfig,
15        snapshot_hash::SnapshotHash,
16        snapshot_package::{SnapshotKind, SnapshotPackage},
17        snapshot_utils::snapshot_storage_rebuilder::{
18            get_slot_and_append_vec_id, SnapshotStorageRebuilder,
19        },
20    },
21    bzip2::read::BzDecoder,
22    crossbeam_channel::{Receiver, Sender},
23    flate2::read::GzDecoder,
24    log::*,
25    regex::Regex,
26    solana_accounts_db::{
27        account_storage::AccountStorageMap,
28        account_storage_reader::AccountStorageReader,
29        accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
30        accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
31        accounts_hash::{AccountsDeltaHash, AccountsHash},
32        epoch_accounts_hash::EpochAccountsHash,
33        hardened_unpack::{self, ParallelSelector, UnpackError},
34        shared_buffer_reader::{SharedBuffer, SharedBufferReader},
35        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
36    },
37    solana_clock::{Epoch, Slot},
38    solana_hash::Hash,
39    solana_measure::{measure::Measure, measure_time, measure_us},
40    std::{
41        cmp::Ordering,
42        collections::{HashMap, HashSet},
43        fmt, fs,
44        io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45        mem,
46        num::NonZeroUsize,
47        ops::RangeInclusive,
48        path::{Path, PathBuf},
49        process::ExitStatus,
50        str::FromStr,
51        sync::{Arc, LazyLock},
52        thread::{Builder, JoinHandle},
53    },
54    tar::{self, Archive},
55    tempfile::TempDir,
56    thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60    hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61    solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
68pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
69pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
70pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
71pub const SNAPSHOT_STORAGES_FLUSHED_FILENAME: &str = "storages_flushed";
72pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
73pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
74pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
75pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
76const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // bytes
77const VERSION_STRING_V1_2_0: &str = "1.2.0";
78pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
79pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
80// The following `unwrap()`s are
81// - Safe because the values are fixed, known non-zero constants
82// - Evaluated in const context, so a zero value would fail compilation rather than
83//   panic at runtime
84pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
85    NonZeroUsize::new(2).unwrap();
86pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
87    NonZeroUsize::new(4).unwrap();
88pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
89pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
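
// A minimal sketch of how these filename regexes can be applied (the hash below is a made-up,
// illustrative value, not a real snapshot hash):
//
//     let re = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
//     let caps = re
//         .captures("snapshot-12345-8yzXGNyAhfY5621V9QEL5o42nSLgMrfQ3ZdWUkMpLqJq.tar.zst")
//         .unwrap();
//     assert_eq!(&caps["slot"], "12345");
//     assert_eq!(&caps["ext"], "tar.zst");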
90
91#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
92pub enum SnapshotVersion {
93    #[default]
94    V1_2_0,
95}
96
97impl fmt::Display for SnapshotVersion {
98    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99        f.write_str(From::from(*self))
100    }
101}
102
103impl From<SnapshotVersion> for &'static str {
104    fn from(snapshot_version: SnapshotVersion) -> &'static str {
105        match snapshot_version {
106            SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
107        }
108    }
109}
110
111impl FromStr for SnapshotVersion {
112    type Err = &'static str;
113
114    fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
115        // Remove leading 'v' or 'V' from slice
116        let version_string = if version_string
117            .get(..1)
118            .is_some_and(|s| s.eq_ignore_ascii_case("v"))
119        {
120            &version_string[1..]
121        } else {
122            version_string
123        };
124        match version_string {
125            VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
126            _ => Err("unsupported snapshot version"),
127        }
128    }
129}
130
131impl SnapshotVersion {
132    pub fn as_str(self) -> &'static str {
133        <&str as From<Self>>::from(self)
134    }
135}
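
// Small illustrative checks for the conversions above: `from_str` accepts an optional leading
// 'v'/'V', and `as_str`/`Display` round-trip through VERSION_STRING_V1_2_0.
//
//     assert_eq!(SnapshotVersion::from_str("1.2.0"), Ok(SnapshotVersion::V1_2_0));
//     assert_eq!(SnapshotVersion::from_str("V1.2.0"), Ok(SnapshotVersion::V1_2_0));
//     assert!(SnapshotVersion::from_str("2.0.0").is_err());
//     assert_eq!(SnapshotVersion::V1_2_0.as_str(), "1.2.0");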
136
137/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
138/// the kind of the snapshot.
139#[derive(PartialEq, Eq, Debug)]
140pub struct BankSnapshotInfo {
141    /// Slot of the bank
142    pub slot: Slot,
143    /// Snapshot kind
144    pub snapshot_kind: BankSnapshotKind,
145    /// Path to the bank snapshot directory
146    pub snapshot_dir: PathBuf,
147    /// Snapshot version
148    pub snapshot_version: SnapshotVersion,
149}
150
151impl PartialOrd for BankSnapshotInfo {
152    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
153        Some(self.cmp(other))
154    }
155}
156
157// Order BankSnapshotInfo by slot (ascending), which in practice sorts them chronologically
158impl Ord for BankSnapshotInfo {
159    fn cmp(&self, other: &Self) -> Ordering {
160        self.slot.cmp(&other.slot)
161    }
162}
163
164impl BankSnapshotInfo {
165    pub fn new_from_dir(
166        bank_snapshots_dir: impl AsRef<Path>,
167        slot: Slot,
168    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
169        // check this directory to see if there is a BankSnapshotPre and/or
170        // BankSnapshotPost file
171        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
172
173        if !bank_snapshot_dir.is_dir() {
174            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
175                bank_snapshot_dir,
176            ));
177        }
178
179        // Among the file checks, the completion-flag check should be done first to avoid the later
180        // I/O errors.
181
182        // There is a time window between the slot directory being created and its content being completely
183        // written.  Check the completion flag to avoid using the highest found slot directory with missing content.
184        if !is_bank_snapshot_complete(&bank_snapshot_dir) {
185            return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
186        }
187
188        let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
189        if !status_cache_file.is_file() {
190            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
191                status_cache_file,
192            ));
193        }
194
195        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
196        let version_str = snapshot_version_from_file(&version_path).or(Err(
197            SnapshotNewFromDirError::MissingVersionFile(version_path),
198        ))?;
199        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
200            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
201
202        let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
203        let bank_snapshot_pre_path =
204            bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
205
206        // NOTE: It is important that checking for "Pre" happens before "Post".
207        //
208        // Consider the scenario where AccountsHashVerifier is actively processing an
209        // AccountsPackage for a snapshot/slot; if AHV is in the middle of reserializing the
210        // bank snapshot file (writing the new "Post" file), and then the process dies,
211        // there will be an incomplete "Post" file on disk.  We do not want only the existence of
212        // this "Post" file to be sufficient for deciding the snapshot kind as "Post".  More so,
213        // "Post" *requires* the *absence* of a "Pre" file.
214        let snapshot_kind = if bank_snapshot_pre_path.is_file() {
215            BankSnapshotKind::Pre
216        } else if bank_snapshot_post_path.is_file() {
217            BankSnapshotKind::Post
218        } else {
219            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
220                bank_snapshot_dir,
221            ));
222        };
223
224        Ok(BankSnapshotInfo {
225            slot,
226            snapshot_kind,
227            snapshot_dir: bank_snapshot_dir,
228            snapshot_version,
229        })
230    }
231
232    pub fn snapshot_path(&self) -> PathBuf {
233        let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
234
235        let ext = match self.snapshot_kind {
236            BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
237            BankSnapshotKind::Post => "",
238        };
239        bank_snapshot_path.set_extension(ext);
240
241        bank_snapshot_path
242    }
243}
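
// Illustrative sketch of `snapshot_path()`, assuming `get_snapshot_file_name(slot)` (defined
// elsewhere in this module) renders the slot as a plain string such as "100":
//
//     let info = BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, 100)?;
//     let path = info.snapshot_path();
//     // BankSnapshotKind::Post => <bank_snapshots_dir>/100/100
//     // BankSnapshotKind::Pre  => <bank_snapshots_dir>/100/100.pre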
244
245/// Bank snapshots traditionally had their accounts hash calculated prior to serialization.  Since
246/// the hash calculation takes a long time, an optimization has been put in place to offload the accounts
247/// hash calculation.  The bank serialization format has not changed, so we need another way to
248/// identify if a bank snapshot contains the calculated accounts hash or not.
249///
250/// When a bank snapshot is first taken, it does not have the calculated accounts hash.  It is said
251/// that this bank snapshot is "pre" accounts hash.  Later, when the accounts hash is calculated,
252/// the bank snapshot is re-serialized, and is now "post" accounts hash.
253#[derive(Debug, Copy, Clone, Eq, PartialEq)]
254pub enum BankSnapshotKind {
255    /// This bank snapshot has *not* yet had its accounts hash calculated
256    Pre,
257    /// This bank snapshot *has* had its accounts hash calculated
258    Post,
259}
260
261/// When constructing a bank from a snapshot, traditionally the snapshot came from a snapshot archive.  Now,
262/// the snapshot can come from a snapshot directory or from a snapshot archive.  This is the flag to
263/// indicate which.
264#[derive(Clone, Copy, Debug, Eq, PartialEq)]
265pub enum SnapshotFrom {
266    /// Build from the snapshot archive
267    Archive,
268    /// Build directly from the bank snapshot directory
269    Dir,
270}
271
272/// Helper type when rebuilding from snapshots.  Designed to handle when rebuilding from just a
273/// full snapshot, or from both a full snapshot and an incremental snapshot.
274#[derive(Debug)]
275pub struct SnapshotRootPaths {
276    pub full_snapshot_root_file_path: PathBuf,
277    pub incremental_snapshot_root_file_path: Option<PathBuf>,
278}
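
// A minimal construction sketch (paths are hypothetical); the incremental path is `Some` only
// when an incremental snapshot is being loaded on top of the full snapshot:
//
//     let snapshot_root_paths = SnapshotRootPaths {
//         full_snapshot_root_file_path: PathBuf::from("/snapshots/100/100"),
//         incremental_snapshot_root_file_path: Some(PathBuf::from("/snapshots/150/150")),
//     };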
279
280/// Helper type to bundle up the results from `unarchive_snapshot()`
281#[derive(Debug)]
282pub struct UnarchivedSnapshot {
283    #[allow(dead_code)]
284    unpack_dir: TempDir,
285    pub storage: AccountStorageMap,
286    pub bank_fields: BankFieldsToDeserialize,
287    pub accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
288    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
289    pub measure_untar: Measure,
290}
291
292/// Helper type to bundle up the results from `verify_and_unarchive_snapshots()`.
293#[derive(Debug)]
294pub struct UnarchivedSnapshots {
295    pub full_storage: AccountStorageMap,
296    pub incremental_storage: Option<AccountStorageMap>,
297    pub bank_fields: SnapshotBankFields,
298    pub accounts_db_fields: SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
299    pub full_unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
300    pub incremental_unpacked_snapshots_dir_and_version: Option<UnpackedSnapshotsDirAndVersion>,
301    pub full_measure_untar: Measure,
302    pub incremental_measure_untar: Option<Measure>,
303    pub next_append_vec_id: AtomicAccountsFileId,
304}
305
306/// Guard type that keeps the unpack directories of snapshots alive.
307/// Once dropped, the unpack directories are removed.
308#[allow(dead_code)]
309#[derive(Debug)]
310pub struct UnarchivedSnapshotsGuard {
311    full_unpack_dir: TempDir,
312    incremental_unpack_dir: Option<TempDir>,
313}
314/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
315#[derive(Debug)]
316pub struct UnpackedSnapshotsDirAndVersion {
317    pub unpacked_snapshots_dir: PathBuf,
318    pub snapshot_version: SnapshotVersion,
319}
320
321/// Helper type for passing around account storage map and next append vec id
322/// for reconstructing accounts from a snapshot
323pub(crate) struct StorageAndNextAccountsFileId {
324    pub storage: AccountStorageMap,
325    pub next_append_vec_id: AtomicAccountsFileId,
326}
327
328#[derive(Error, Debug)]
329#[allow(clippy::large_enum_variant)]
330pub enum SnapshotError {
331    #[error("I/O error: {0}")]
332    Io(#[from] IoError),
333
334    #[error("AccountsFile error: {0}")]
335    AccountsFileError(#[from] AccountsFileError),
336
337    #[error("serialization error: {0}")]
338    Serialize(#[from] bincode::Error),
339
340    #[error("crossbeam send error: {0}")]
341    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),
342
343    #[error("archive generation failure {0}")]
344    ArchiveGenerationFailure(ExitStatus),
345
346    #[error("Unpack error: {0}")]
347    UnpackError(#[from] UnpackError),
348
349    #[error("source({1}) - I/O error: {0}")]
350    IoWithSource(IoError, &'static str),
351
352    #[error("could not get file name from path '{0}'")]
353    PathToFileNameError(PathBuf),
354
355    #[error("could not get str from file name '{0}'")]
356    FileNameToStrError(PathBuf),
357
358    #[error("could not parse snapshot archive's file name '{0}'")]
359    ParseSnapshotArchiveFileNameError(String),
360
361    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
362    MismatchedBaseSlot(Slot, Slot),
363
364    #[error("no snapshot archives to load from '{0}'")]
365    NoSnapshotArchives(PathBuf),
366
367    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
368    MismatchedSlot(Slot, Slot),
369
370    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
371    MismatchedHash(SnapshotHash, SnapshotHash),
372
373    #[error("snapshot slot deltas are invalid: {0}")]
374    VerifySlotDeltas(#[from] VerifySlotDeltasError),
375
376    #[error("snapshot epoch stakes are invalid: {0}")]
377    VerifyEpochStakes(#[from] VerifyEpochStakesError),
378
379    #[error("bank_snapshot_info new_from_dir failed: {0}")]
380    NewFromDir(#[from] SnapshotNewFromDirError),
381
382    #[error("invalid snapshot dir path '{0}'")]
383    InvalidSnapshotDirPath(PathBuf),
384
385    #[error("invalid AppendVec path '{0}'")]
386    InvalidAppendVecPath(PathBuf),
387
388    #[error("invalid account path '{0}'")]
389    InvalidAccountPath(PathBuf),
390
391    #[error("no valid snapshot dir found under '{0}'")]
392    NoSnapshotSlotDir(PathBuf),
393
394    #[error("snapshot dir account paths mismatching")]
395    AccountPathsMismatch,
396
397    #[error("failed to add bank snapshot for slot {1}: {0}")]
398    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),
399
400    #[error("failed to archive snapshot package: {0}")]
401    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
402
403    #[error("failed to rebuild snapshot storages: {0}")]
404    RebuildStorages(String),
405}
406
407#[derive(Error, Debug)]
408pub enum SnapshotNewFromDirError {
409    #[error("invalid bank snapshot directory '{0}'")]
410    InvalidBankSnapshotDir(PathBuf),
411
412    #[error("missing status cache file '{0}'")]
413    MissingStatusCacheFile(PathBuf),
414
415    #[error("missing version file '{0}'")]
416    MissingVersionFile(PathBuf),
417
418    #[error("invalid snapshot version '{0}'")]
419    InvalidVersion(String),
420
421    #[error("snapshot directory incomplete '{0}'")]
422    IncompleteDir(PathBuf),
423
424    #[error("missing snapshot file '{0}'")]
425    MissingSnapshotFile(PathBuf),
426}
427
428pub type Result<T> = std::result::Result<T, SnapshotError>;
429
430/// Errors that can happen in `verify_slot_deltas()`
431#[derive(Error, Debug, PartialEq, Eq)]
432pub enum VerifySlotDeltasError {
433    #[error("too many entries: {0} (max: {1})")]
434    TooManyEntries(usize, usize),
435
436    #[error("slot {0} is not a root")]
437    SlotIsNotRoot(Slot),
438
439    #[error("slot {0} is greater than bank slot {1}")]
440    SlotGreaterThanMaxRoot(Slot, Slot),
441
442    #[error("slot {0} has multiple entries")]
443    SlotHasMultipleEntries(Slot),
444
445    #[error("slot {0} was not found in slot history")]
446    SlotNotFoundInHistory(Slot),
447
448    #[error("slot {0} was in history but missing from slot deltas")]
449    SlotNotFoundInDeltas(Slot),
450
451    #[error("slot history is bad and cannot be used to verify slot deltas")]
452    BadSlotHistory,
453}
454
455/// Errors that can happen in `verify_epoch_stakes()`
456#[derive(Error, Debug, PartialEq, Eq)]
457pub enum VerifyEpochStakesError {
458    #[error("epoch {0} is greater than the max {1}")]
459    EpochGreaterThanMax(Epoch, Epoch),
460
461    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
462    StakesNotFound(Epoch, RangeInclusive<Epoch>),
463}
464
465/// Errors that can happen in `add_bank_snapshot()`
466#[derive(Error, Debug)]
467pub enum AddBankSnapshotError {
468    #[error("bank snapshot dir already exists '{0}'")]
469    SnapshotDirAlreadyExists(PathBuf),
470
471    #[error("failed to create snapshot dir '{1}': {0}")]
472    CreateSnapshotDir(#[source] IoError, PathBuf),
473
474    #[error("failed to flush storage '{1}': {0}")]
475    FlushStorage(#[source] AccountsFileError, PathBuf),
476
477    #[error("failed to mark snapshot storages as 'flushed': {0}")]
478    MarkStoragesFlushed(#[source] IoError),
479
480    #[error("failed to hard link storages: {0}")]
481    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),
482
483    #[error("failed to serialize bank: {0}")]
484    SerializeBank(#[source] Box<SnapshotError>),
485
486    #[error("failed to serialize status cache: {0}")]
487    SerializeStatusCache(#[source] Box<SnapshotError>),
488
489    #[error("failed to write snapshot version file '{1}': {0}")]
490    WriteSnapshotVersionFile(#[source] IoError, PathBuf),
491
492    #[error("failed to mark snapshot as 'complete': {0}")]
493    MarkSnapshotComplete(#[source] IoError),
494}
495
496/// Errors that can happen in `archive_snapshot_package()`
497#[derive(Error, Debug)]
498pub enum ArchiveSnapshotPackageError {
499    #[error("failed to create archive path '{1}': {0}")]
500    CreateArchiveDir(#[source] IoError, PathBuf),
501
502    #[error("failed to create staging dir inside '{1}': {0}")]
503    CreateStagingDir(#[source] IoError, PathBuf),
504
505    #[error("failed to create snapshot staging dir '{1}': {0}")]
506    CreateSnapshotStagingDir(#[source] IoError, PathBuf),
507
508    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
509    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),
510
511    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
512    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),
513
514    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
515    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),
516
517    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
518    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),
519
520    #[error("failed to create archive file '{1}': {0}")]
521    CreateArchiveFile(#[source] IoError, PathBuf),
522
523    #[error("failed to archive version file: {0}")]
524    ArchiveVersionFile(#[source] IoError),
525
526    #[error("failed to archive snapshots dir: {0}")]
527    ArchiveSnapshotsDir(#[source] IoError),
528
529    #[error("failed to archive account storage file '{1}': {0}")]
530    ArchiveAccountStorageFile(#[source] IoError, PathBuf),
531
532    #[error("failed to archive snapshot: {0}")]
533    FinishArchive(#[source] IoError),
534
535    #[error("failed to create encoder: {0}")]
536    CreateEncoder(#[source] IoError),
537
538    #[error("failed to encode archive: {0}")]
539    FinishEncoder(#[source] IoError),
540
541    #[error("failed to query archive metadata '{1}': {0}")]
542    QueryArchiveMetadata(#[source] IoError, PathBuf),
543
544    #[error("failed to move archive from '{1}' to '{2}': {0}")]
545    MoveArchive(#[source] IoError, PathBuf, PathBuf),
546
547    #[error("failed to create account storage reader '{1}': {0}")]
548    AccountStorageReaderError(#[source] IoError, PathBuf),
549}
550
551/// Errors that can happen in `hard_link_storages_to_snapshot()`
552#[derive(Error, Debug)]
553pub enum HardLinkStoragesToSnapshotError {
554    #[error("failed to create accounts hard links dir '{1}': {0}")]
555    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),
556
557    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
558    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),
559
560    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
561    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
562}
563
564/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
565#[derive(Error, Debug)]
566pub enum GetSnapshotAccountsHardLinkDirError {
567    #[error("invalid account storage path '{0}'")]
568    GetAccountPath(PathBuf),
569
570    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
571    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),
572
573    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
574    SymlinkSnapshotHardLinkDir {
575        source: IoError,
576        original: PathBuf,
577        link: PathBuf,
578    },
579}
580
581/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
582/// from <account_path>/run, taken at the time of snapshot <slot>.  They are referenced by symlinks in the
583/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We have observed that sometimes the bank snapshot dir
584/// is deleted while the account snapshot directories are left behind, possibly by manual operations
585/// or by legacy code that did not use the symlinks to clean up the account snapshot hardlink directories.
586/// This function removes any account snapshot directories that are no longer referenced by the bank
587/// snapshot dirs, to keep snapshot operations consistent.
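///
/// A minimal calling sketch (not compiled; the directory values are hypothetical):
///
/// ```ignore
/// let bank_snapshots_dir = PathBuf::from("/ledger/snapshots");
/// let account_snapshot_paths = vec![PathBuf::from("/ledger/accounts/snapshot")];
/// clean_orphaned_account_snapshot_dirs(&bank_snapshots_dir, &account_snapshot_paths)?;
/// ```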
588pub fn clean_orphaned_account_snapshot_dirs(
589    bank_snapshots_dir: impl AsRef<Path>,
590    account_snapshot_paths: &[PathBuf],
591) -> IoResult<()> {
592    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
593    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
594    let mut account_snapshot_dirs_referenced = HashSet::new();
595    let snapshots = get_bank_snapshots(bank_snapshots_dir);
596    for snapshot in snapshots {
597        let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
598        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
599        let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
600            // The bank snapshot may not have a hard links dir with the storages.
601            // This is fine, and happens for bank snapshots we do *not* fastboot from.
602            // In this case, log it and go to the next bank snapshot.
603            debug!(
604                "failed to read account hardlinks dir '{}'",
605                account_hardlinks_dir.display(),
606            );
607            continue;
608        };
609        for entry in read_dir {
610            let path = entry?.path();
611            let target = fs::read_link(&path).map_err(|err| {
612                IoError::other(format!(
613                    "failed to read symlink '{}': {err}",
614                    path.display(),
615                ))
616            })?;
617            account_snapshot_dirs_referenced.insert(target);
618        }
619    }
620
621    // loop through the account snapshot hardlink directories; delete any that are not in the account_snapshot_dirs_referenced set
622    for account_snapshot_path in account_snapshot_paths {
623        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
624            IoError::other(format!(
625                "failed to read account snapshot dir '{}': {err}",
626                account_snapshot_path.display(),
627            ))
628        })?;
629        for entry in read_dir {
630            let path = entry?.path();
631            if !account_snapshot_dirs_referenced.contains(&path) {
632                info!(
633                    "Removing orphaned account snapshot hardlink directory '{}'...",
634                    path.display()
635                );
636                move_and_async_delete_path(&path);
637            }
638        }
639    }
640
641    Ok(())
642}
643
644/// Purges incomplete bank snapshots
645pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
646    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
647        // If we cannot read the bank snapshots dir, then there's nothing to do
648        return;
649    };
650
651    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
652
653    let incomplete_dirs: Vec<_> = read_dir_iter
654        .filter_map(|entry| entry.ok())
655        .map(|entry| entry.path())
656        .filter(|path| path.is_dir())
657        .filter(is_incomplete)
658        .collect();
659
660    // attempt to purge all the incomplete directories; do not exit early
661    for incomplete_dir in incomplete_dirs {
662        let result = purge_bank_snapshot(&incomplete_dir);
663        match result {
664            Ok(_) => info!(
665                "Purged incomplete snapshot dir: {}",
666                incomplete_dir.display()
667            ),
668            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
669        }
670    }
671}
672
673/// Is the bank snapshot complete?
674fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
675    let state_complete_path = bank_snapshot_dir
676        .as_ref()
677        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
678    state_complete_path.is_file()
679}
680
681/// Marks the bank snapshot as complete
682fn write_snapshot_state_complete_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<()> {
683    let state_complete_path = bank_snapshot_dir
684        .as_ref()
685        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
686    fs::File::create(&state_complete_path).map_err(|err| {
687        IoError::other(format!(
688            "failed to create file '{}': {err}",
689            state_complete_path.display(),
690        ))
691    })?;
692    Ok(())
693}
694
695/// Writes the full snapshot slot file into the bank snapshot dir
696pub fn write_full_snapshot_slot_file(
697    bank_snapshot_dir: impl AsRef<Path>,
698    full_snapshot_slot: Slot,
699) -> IoResult<()> {
700    let full_snapshot_slot_path = bank_snapshot_dir
701        .as_ref()
702        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
703    fs::write(
704        &full_snapshot_slot_path,
705        Slot::to_le_bytes(full_snapshot_slot),
706    )
707    .map_err(|err| {
708        IoError::other(format!(
709            "failed to write full snapshot slot file '{}': {err}",
710            full_snapshot_slot_path.display(),
711        ))
712    })
713}
714
715/// Reads the full snapshot slot file from the bank snapshot dir
716pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
717    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
718    let full_snapshot_slot_path = bank_snapshot_dir
719        .as_ref()
720        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
721    let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
722    if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
723        let error_message = format!(
724            "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
725            full_snapshot_slot_path.display(),
726            full_snapshot_slot_file_metadata.len(),
727            SLOT_SIZE,
728        );
729        return Err(IoError::other(error_message));
730    }
731    let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
732    let mut buffer = [0; SLOT_SIZE];
733    full_snapshot_slot_file.read_exact(&mut buffer)?;
734    let slot = Slot::from_le_bytes(buffer);
735    Ok(slot)
736}
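
// Round-trip sketch for the two functions above: the file is exactly 8 bytes, the slot encoded
// little-endian (the directory value is whatever bank snapshot dir the caller owns):
//
//     write_full_snapshot_slot_file(&bank_snapshot_dir, 12345)?;
//     assert_eq!(read_full_snapshot_slot_file(&bank_snapshot_dir)?, 12345);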
737
738/// Writes the 'snapshot storages have been flushed' file to the bank snapshot dir
739pub fn write_storages_flushed_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<()> {
740    let flushed_storages_path = bank_snapshot_dir
741        .as_ref()
742        .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
743    fs::File::create(&flushed_storages_path).map_err(|err| {
744        IoError::other(format!(
745            "failed to create file '{}': {err}",
746            flushed_storages_path.display(),
747        ))
748    })?;
749    Ok(())
750}
751
752/// Were the snapshot storages flushed in this bank snapshot?
753fn are_bank_snapshot_storages_flushed(bank_snapshot_dir: impl AsRef<Path>) -> bool {
754    let flushed_storages = bank_snapshot_dir
755        .as_ref()
756        .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
757    flushed_storages.is_file()
758}
759
760/// Gets the highest, loadable, bank snapshot
761///
762/// The highest bank snapshot is the one with the highest slot.
763/// To be loadable, the bank snapshot must be a BankSnapshotKind::Post.
764/// And if we're generating snapshots (e.g. running a normal validator), then
765/// the full snapshot file's slot must match the highest full snapshot archive's.
766/// Lastly, the account storages must have been flushed to be loadable.
767pub fn get_highest_loadable_bank_snapshot(
768    snapshot_config: &SnapshotConfig,
769) -> Option<BankSnapshotInfo> {
770    let highest_bank_snapshot =
771        get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
772
773    // If we're *not* generating snapshots, e.g. running ledger-tool, then we *can* load
774    // this bank snapshot, and we do not need to check for anything else.
775    if !snapshot_config.should_generate_snapshots() {
776        return Some(highest_bank_snapshot);
777    }
778
779    // Otherwise, the bank snapshot's full snapshot slot *must* be the same as
780    // the highest full snapshot archive's slot.
781    let highest_full_snapshot_archive_slot =
782        get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
783    let full_snapshot_file_slot =
784        read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
785    let are_storages_flushed =
786        are_bank_snapshot_storages_flushed(&highest_bank_snapshot.snapshot_dir);
787    (are_storages_flushed && (full_snapshot_file_slot == highest_full_snapshot_archive_slot))
788        .then_some(highest_bank_snapshot)
789}
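
// Usage sketch (e.g. when deciding whether fastboot is possible): `None` means the caller must
// fall back to loading from snapshot archives instead.
//
//     if let Some(bank_snapshot) = get_highest_loadable_bank_snapshot(&snapshot_config) {
//         info!("fastbooting from the bank snapshot at slot {}", bank_snapshot.slot);
//     }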
790
791/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
792/// directories won't be cleaned up.  Call this function to clean them up.
793pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
794    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
795        for entry in entries.flatten() {
796            if entry
797                .file_name()
798                .to_str()
799                .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
800                .unwrap_or(false)
801            {
802                let path = entry.path();
803                let result = if path.is_dir() {
804                    fs::remove_dir_all(&path)
805                } else {
806                    fs::remove_file(&path)
807                };
808                if let Err(err) = result {
809                    warn!(
810                        "Failed to remove temporary snapshot archive '{}': {err}",
811                        path.display(),
812                    );
813                }
814            }
815        }
816    }
817}
818
819/// Serializes and archives a snapshot package
820pub fn serialize_and_archive_snapshot_package(
821    snapshot_package: SnapshotPackage,
822    snapshot_config: &SnapshotConfig,
823    should_flush_and_hard_link_storages: bool,
824) -> Result<SnapshotArchiveInfo> {
825    let SnapshotPackage {
826        snapshot_kind,
827        slot: snapshot_slot,
828        block_height,
829        hash: snapshot_hash,
830        mut snapshot_storages,
831        status_cache_slot_deltas,
832        bank_fields_to_serialize,
833        bank_hash_stats,
834        accounts_delta_hash,
835        accounts_hash,
836        epoch_accounts_hash,
837        bank_incremental_snapshot_persistence,
838        write_version,
839        enqueued: _,
840    } = snapshot_package;
841
842    let bank_snapshot_info = serialize_snapshot(
843        &snapshot_config.bank_snapshots_dir,
844        snapshot_config.snapshot_version,
845        snapshot_storages.as_slice(),
846        status_cache_slot_deltas.as_slice(),
847        bank_fields_to_serialize,
848        bank_hash_stats,
849        accounts_delta_hash,
850        accounts_hash,
851        epoch_accounts_hash,
852        bank_incremental_snapshot_persistence.as_ref(),
853        write_version,
854        should_flush_and_hard_link_storages,
855    )?;
856
857    // now write the full snapshot slot file after serializing so this bank snapshot is loadable
858    let full_snapshot_archive_slot = match snapshot_kind {
859        SnapshotKind::FullSnapshot => snapshot_slot,
860        SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
861    };
862    write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
863        .map_err(|err| {
864            IoError::other(format!(
865                "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
866            ))
867        })?;
868
869    let snapshot_archive_path = match snapshot_kind {
870        SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
871            &snapshot_config.full_snapshot_archives_dir,
872            snapshot_slot,
873            &snapshot_hash,
874            snapshot_config.archive_format,
875        ),
876        SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
877            // After the snapshot has been serialized, it is now safe (and required) to prune all
878            // the storages that are *not* to be archived for this incremental snapshot.
879            snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
880            build_incremental_snapshot_archive_path(
881                &snapshot_config.incremental_snapshot_archives_dir,
882                incremental_snapshot_base_slot,
883                snapshot_slot,
884                &snapshot_hash,
885                snapshot_config.archive_format,
886            )
887        }
888    };
889
890    let snapshot_archive_info = archive_snapshot(
891        snapshot_kind,
892        snapshot_slot,
893        snapshot_hash,
894        snapshot_storages.as_slice(),
895        &bank_snapshot_info.snapshot_dir,
896        snapshot_archive_path,
897        snapshot_config.archive_format,
898    )?;
899
900    Ok(snapshot_archive_info)
901}
902
903/// Serializes a snapshot into `bank_snapshots_dir`
904#[allow(clippy::too_many_arguments)]
905fn serialize_snapshot(
906    bank_snapshots_dir: impl AsRef<Path>,
907    snapshot_version: SnapshotVersion,
908    snapshot_storages: &[Arc<AccountStorageEntry>],
909    slot_deltas: &[BankSlotDelta],
910    mut bank_fields: BankFieldsToSerialize,
911    bank_hash_stats: BankHashStats,
912    accounts_delta_hash: AccountsDeltaHash,
913    accounts_hash: AccountsHash,
914    epoch_accounts_hash: Option<EpochAccountsHash>,
915    bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
916    write_version: u64,
917    should_flush_and_hard_link_storages: bool,
918) -> Result<BankSnapshotInfo> {
919    let slot = bank_fields.slot;
920
921    // this closure is here to facilitate converting between
922    // the AddBankSnapshotError and SnapshotError types
923    let do_serialize_snapshot = || {
924        let mut measure_everything = Measure::start("");
925        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
926        if bank_snapshot_dir.exists() {
927            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
928                bank_snapshot_dir,
929            ));
930        }
931        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
932            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
933        })?;
934
935        // the bank snapshot is stored as bank_snapshots_dir/slot/slot
936        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
937        info!(
938            "Creating bank snapshot for slot {slot} at '{}'",
939            bank_snapshot_path.display(),
940        );
941
942        let (flush_storages_us, hard_link_storages_us) = if should_flush_and_hard_link_storages {
943            let flush_measure = Measure::start("");
944            for storage in snapshot_storages {
945                storage.flush().map_err(|err| {
946                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
947                })?;
948            }
949            let flush_us = flush_measure.end_as_us();
950            let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
951                &bank_snapshot_dir,
952                slot,
953                snapshot_storages
954            )
955            .map_err(AddBankSnapshotError::HardLinkStorages)?);
956            write_storages_flushed_file(&bank_snapshot_dir)
957                .map_err(AddBankSnapshotError::MarkStoragesFlushed)?;
958            Some((flush_us, hard_link_us))
959        } else {
960            None
961        }
962        .unzip();
963
964        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
965            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
966            let extra_fields = ExtraFieldsToSerialize {
967                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
968                incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
969                epoch_accounts_hash,
970                versioned_epoch_stakes,
971                accounts_lt_hash: bank_fields.accounts_lt_hash.clone().map(Into::into),
972            };
973            serde_snapshot::serialize_bank_snapshot_into(
974                stream,
975                bank_fields,
976                bank_hash_stats,
977                accounts_delta_hash,
978                accounts_hash,
979                &get_storages_to_serialize(snapshot_storages),
980                extra_fields,
981                write_version,
982            )?;
983            Ok(())
984        };
985        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
986            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
987                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
988            "bank serialize"
989        );
990
991        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
992        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
993            snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
994                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
995        );
996
997        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
998        let (_, write_version_file_us) = measure_us!(fs::write(
999            &version_path,
1000            snapshot_version.as_str().as_bytes(),
1001        )
1002        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
1003
1004        // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
1005        let (_, write_state_complete_file_us) = measure_us!({
1006            write_snapshot_state_complete_file(&bank_snapshot_dir)
1007                .map_err(AddBankSnapshotError::MarkSnapshotComplete)?
1008        });
1009
1010        measure_everything.stop();
1011
1012        // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
1013        datapoint_info!(
1014            "snapshot_bank",
1015            ("slot", slot, i64),
1016            ("bank_size", bank_snapshot_consumed_size, i64),
1017            ("status_cache_size", status_cache_consumed_size, i64),
1018            ("flush_storages_us", flush_storages_us, Option<i64>),
1019            ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
1020            ("bank_serialize_us", bank_serialize.as_us(), i64),
1021            ("status_cache_serialize_us", status_cache_serialize_us, i64),
1022            ("write_version_file_us", write_version_file_us, i64),
1023            (
1024                "write_state_complete_file_us",
1025                write_state_complete_file_us,
1026                i64
1027            ),
1028            ("total_us", measure_everything.as_us(), i64),
1029        );
1030
1031        info!(
1032            "{} for slot {} at {}",
1033            bank_serialize,
1034            slot,
1035            bank_snapshot_path.display(),
1036        );
1037
1038        Ok(BankSnapshotInfo {
1039            slot,
1040            snapshot_kind: BankSnapshotKind::Pre,
1041            snapshot_dir: bank_snapshot_dir,
1042            snapshot_version,
1043        })
1044    };
1045
1046    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
1047}
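
// After a successful `serialize_snapshot()`, the bank snapshot directory contains (file names
// come from the constants at the top of this module; the last two entries exist only when
// `should_flush_and_hard_link_storages` is true):
//
//     <bank_snapshots_dir>/<slot>/<slot>                 (serialized bank, still BankSnapshotKind::Pre)
//     <bank_snapshots_dir>/<slot>/status_cache
//     <bank_snapshots_dir>/<slot>/version
//     <bank_snapshots_dir>/<slot>/state_complete
//     <bank_snapshots_dir>/<slot>/storages_flushed       (optional)
//     <bank_snapshots_dir>/<slot>/accounts_hardlinks/    (optional)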
1048
1049/// Archives a snapshot into `archive_path`
1050fn archive_snapshot(
1051    snapshot_kind: SnapshotKind,
1052    snapshot_slot: Slot,
1053    snapshot_hash: SnapshotHash,
1054    snapshot_storages: &[Arc<AccountStorageEntry>],
1055    bank_snapshot_dir: impl AsRef<Path>,
1056    archive_path: impl AsRef<Path>,
1057    archive_format: ArchiveFormat,
1058) -> Result<SnapshotArchiveInfo> {
1059    use ArchiveSnapshotPackageError as E;
1060    const SNAPSHOTS_DIR: &str = "snapshots";
1061    const ACCOUNTS_DIR: &str = "accounts";
1062    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
1063
1064    let mut timer = Measure::start("snapshot_package-package_snapshots");
1065    let tar_dir = archive_path
1066        .as_ref()
1067        .parent()
1068        .expect("Tar output path is invalid");
1069
1070    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
1071
1072    // Create the staging directories
1073    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
1074    let staging_dir = tempfile::Builder::new()
1075        .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
1076        .tempdir_in(tar_dir)
1077        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
1078    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
1079
1080    let slot_str = snapshot_slot.to_string();
1081    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
1082    // Creates staging snapshots/<slot>/
1083    fs::create_dir_all(&staging_snapshot_dir)
1084        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
1085
1086    // To be a source for symlinking and archiving, the path needs to be absolute
1087    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
1088        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
1089    })?;
1090    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
1091    let src_snapshot_file = src_snapshot_dir.join(slot_str);
1092    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
1093        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
1094
1095    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
1096    // like in the snapshot dir.
1097    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1098    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1099    symlink::symlink_file(&src_status_cache, &staging_status_cache)
1100        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
1101
1102    // The bank snapshot has the version file, so symlink it to the correct staging path
1103    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
1104    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1105    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
1106        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
1107    })?;
1108
1109    // Tar the staging directory into the archive at `staging_archive_path`
1110    let staging_archive_path = tar_dir.join(format!(
1111        "{}{}.{}",
1112        staging_dir_prefix,
1113        snapshot_slot,
1114        archive_format.extension(),
1115    ));
1116
1117    {
1118        let archive_file = fs::File::create(&staging_archive_path)
1119            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
1120
1121        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
1122            let mut archive = tar::Builder::new(encoder);
1123            // Disable sparse file handling.  This seems to be the root cause of an issue when
1124            // upgrading v2.0 to v2.1, and the tar crate from 0.4.41 to 0.4.42.
1125            // Since the tarball will still go through compression (zstd/etc) afterwards, disabling
1126            // sparse handling in the tar itself should be fine.
1127            //
1128            // Likely introduced in [^1].  Tracking resolution in [^2].
1129            // [^1] https://github.com/alexcrichton/tar-rs/pull/375
1130            // [^2] https://github.com/alexcrichton/tar-rs/issues/403
1131            archive.sparse(false);
1132            // Serialize the version file and the snapshots dir before the accounts so we can quickly determine the version
1133            // and other bank fields.  This is necessary if we want to interleave unpacking with reconstruction.
1134            archive
1135                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
1136                .map_err(E::ArchiveVersionFile)?;
1137            archive
1138                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
1139                .map_err(E::ArchiveSnapshotsDir)?;
1140
1141            for storage in snapshot_storages {
1142                let path_in_archive = Path::new(ACCOUNTS_DIR)
1143                    .join(AccountsFile::file_name(storage.slot(), storage.id()));
1144
1145                let reader =
1146                    AccountStorageReader::new(storage, Some(snapshot_slot)).map_err(|err| {
1147                        E::AccountStorageReaderError(err, storage.path().to_path_buf())
1148                    })?;
1149                let mut header = tar::Header::new_gnu();
1150                header.set_path(path_in_archive).map_err(|err| {
1151                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1152                })?;
1153                header.set_size(reader.len() as u64);
1154                header.set_cksum();
1155                archive.append(&header, reader).map_err(|err| {
1156                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1157                })?;
1158            }
1159
1160            archive.into_inner().map_err(E::FinishArchive)?;
1161            Ok(())
1162        };
1163
1164        match archive_format {
1165            ArchiveFormat::TarZstd { config } => {
1166                let mut encoder =
1167                    zstd::stream::Encoder::new(archive_file, config.compression_level)
1168                        .map_err(E::CreateEncoder)?;
1169                do_archive_files(&mut encoder)?;
1170                encoder.finish().map_err(E::FinishEncoder)?;
1171            }
1172            ArchiveFormat::TarLz4 => {
1173                let mut encoder = lz4::EncoderBuilder::new()
1174                    .level(1)
1175                    .build(archive_file)
1176                    .map_err(E::CreateEncoder)?;
1177                do_archive_files(&mut encoder)?;
1178                let (_output, result) = encoder.finish();
1179                result.map_err(E::FinishEncoder)?;
1180            }
1181            _ => panic!("archiving snapshot with '{archive_format}' is not supported"),
1182        };
1183    }
1184
1185    // Atomically move the archive into position for other validators to find
1186    let metadata = fs::metadata(&staging_archive_path)
1187        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
1188    let archive_path = archive_path.as_ref().to_path_buf();
1189    fs::rename(&staging_archive_path, &archive_path)
1190        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
1191
1192    timer.stop();
1193    info!(
1194        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
1195        archive_path.display(),
1196        snapshot_slot,
1197        timer.as_ms(),
1198        metadata.len()
1199    );
1200
1201    datapoint_info!(
1202        "archive-snapshot-package",
1203        ("slot", snapshot_slot, i64),
1204        ("archive_format", archive_format.to_string(), String),
1205        ("duration_ms", timer.as_ms(), i64),
1206        (
1207            if snapshot_kind.is_full_snapshot() {
1208                "full-snapshot-archive-size"
1209            } else {
1210                "incremental-snapshot-archive-size"
1211            },
1212            metadata.len(),
1213            i64
1214        ),
1215    );
1216    Ok(SnapshotArchiveInfo {
1217        path: archive_path,
1218        slot: snapshot_slot,
1219        hash: snapshot_hash,
1220        archive_format,
1221    })
1222}
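
// For reference, the archive produced above has this internal layout (the account storage file
// name format comes from `AccountsFile::file_name`, assumed here to be "<slot>.<id>"):
//
//     version                    (appended first so readers can detect the version early)
//     snapshots/status_cache
//     snapshots/<slot>/<slot>    (the serialized bank snapshot)
//     accounts/<slot>.<id>       (one entry per account storage)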
1223
1224/// Get the bank snapshots in a directory
1225pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1226    let mut bank_snapshots = Vec::default();
1227    match fs::read_dir(&bank_snapshots_dir) {
1228        Err(err) => {
1229            info!(
1230                "Unable to read bank snapshots directory '{}': {err}",
1231                bank_snapshots_dir.as_ref().display(),
1232            );
1233        }
1234        Ok(paths) => paths
1235            .filter_map(|entry| {
1236                // check if this entry is a directory whose name parses as a Slot
1237                // bank snapshots are bank_snapshots_dir/slot/slot(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION)
1238                entry
1239                    .ok()
1240                    .filter(|entry| entry.path().is_dir())
1241                    .and_then(|entry| {
1242                        entry
1243                            .path()
1244                            .file_name()
1245                            .and_then(|file_name| file_name.to_str())
1246                            .and_then(|file_name| file_name.parse::<Slot>().ok())
1247                    })
1248            })
1249            .for_each(
1250                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1251                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1252                    // Other threads may be modifying bank snapshots in parallel; only return
1253                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
1254                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1255                },
1256            ),
1257    }
1258    bank_snapshots
1259}
1260
1261/// Get the bank snapshots in a directory
1262///
1263/// This function retains only the bank snapshots of kind BankSnapshotKind::Pre
1264pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1265    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1266    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1267    bank_snapshots
1268}
1269
1270/// Get the bank snapshots in a directory
1271///
1272/// This function retains only the bank snapshots of kind BankSnapshotKind::Post
1273pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1274    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1275    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1276    bank_snapshots
1277}
1278
1279/// Get the bank snapshot with the highest slot in a directory
1280///
1281/// This function gets the highest bank snapshot of kind BankSnapshotKind::Pre
1282pub fn get_highest_bank_snapshot_pre(
1283    bank_snapshots_dir: impl AsRef<Path>,
1284) -> Option<BankSnapshotInfo> {
1285    do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1286}
1287
1288/// Get the bank snapshot with the highest slot in a directory
1289///
1290/// This function gets the highest bank snapshot of kind BankSnapshotKind::Post
1291pub fn get_highest_bank_snapshot_post(
1292    bank_snapshots_dir: impl AsRef<Path>,
1293) -> Option<BankSnapshotInfo> {
1294    do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1295}
1296
1297/// Get the bank snapshot with the highest slot in a directory
1298///
1299/// This function gets the highest bank snapshot of any kind
1300pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1301    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1302}
1303
1304fn do_get_highest_bank_snapshot(
1305    mut bank_snapshots: Vec<BankSnapshotInfo>,
1306) -> Option<BankSnapshotInfo> {
1307    bank_snapshots.sort_unstable();
1308    bank_snapshots.into_iter().next_back()
1309}
1310
1311pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1312where
1313    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1314{
1315    serialize_snapshot_data_file_capped::<F>(
1316        data_file_path,
1317        MAX_SNAPSHOT_DATA_FILE_SIZE,
1318        serializer,
1319    )
1320}
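
// Caller sketch: the serializer closure receives a `BufWriter`, and any I/O error it propagates
// with `?` converts into `SnapshotError::Io` (the file path below is hypothetical):
//
//     let consumed_size = serialize_snapshot_data_file(Path::new("/tmp/example-data"), |stream| {
//         stream.write_all(b"serialized bank bytes go here")?;
//         Ok(())
//     })?;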
1321
1322pub fn deserialize_snapshot_data_file<T: Sized>(
1323    data_file_path: &Path,
1324    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1325) -> Result<T> {
1326    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1327        deserializer(streams.full_snapshot_stream)
1328    };
1329
1330    let wrapped_data_file_path = SnapshotRootPaths {
1331        full_snapshot_root_file_path: data_file_path.to_path_buf(),
1332        incremental_snapshot_root_file_path: None,
1333    };
1334
1335    deserialize_snapshot_data_files_capped(
1336        &wrapped_data_file_path,
1337        MAX_SNAPSHOT_DATA_FILE_SIZE,
1338        wrapped_deserializer,
1339    )
1340}
1341
1342pub fn deserialize_snapshot_data_files<T: Sized>(
1343    snapshot_root_paths: &SnapshotRootPaths,
1344    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1345) -> Result<T> {
1346    deserialize_snapshot_data_files_capped(
1347        snapshot_root_paths,
1348        MAX_SNAPSHOT_DATA_FILE_SIZE,
1349        deserializer,
1350    )
1351}
1352
1353fn serialize_snapshot_data_file_capped<F>(
1354    data_file_path: &Path,
1355    maximum_file_size: u64,
1356    serializer: F,
1357) -> Result<u64>
1358where
1359    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1360{
1361    let data_file = fs::File::create(data_file_path)?;
1362    let mut data_file_stream = BufWriter::new(data_file);
1363    serializer(&mut data_file_stream)?;
1364    data_file_stream.flush()?;
1365
1366    let consumed_size = data_file_stream.stream_position()?;
1367    if consumed_size > maximum_file_size {
1368        let error_message = format!(
1369            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1370            data_file_path.display(),
1371        );
1372        return Err(IoError::other(error_message).into());
1373    }
1374    Ok(consumed_size)
1375}
1376
1377fn deserialize_snapshot_data_files_capped<T: Sized>(
1378    snapshot_root_paths: &SnapshotRootPaths,
1379    maximum_file_size: u64,
1380    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1381) -> Result<T> {
1382    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1383        create_snapshot_data_file_stream(
1384            &snapshot_root_paths.full_snapshot_root_file_path,
1385            maximum_file_size,
1386        )?;
1387
1388    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1389        if let Some(ref incremental_snapshot_root_file_path) =
1390            snapshot_root_paths.incremental_snapshot_root_file_path
1391        {
1392            Some(create_snapshot_data_file_stream(
1393                incremental_snapshot_root_file_path,
1394                maximum_file_size,
1395            )?)
1396        } else {
1397            None
1398        }
1399        .unzip();
1400
1401    let mut snapshot_streams = SnapshotStreams {
1402        full_snapshot_stream: &mut full_snapshot_data_file_stream,
1403        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1404    };
1405    let ret = deserializer(&mut snapshot_streams)?;
1406
1407    check_deserialize_file_consumed(
1408        full_snapshot_file_size,
1409        &snapshot_root_paths.full_snapshot_root_file_path,
1410        &mut full_snapshot_data_file_stream,
1411    )?;
1412
1413    if let Some(ref incremental_snapshot_root_file_path) =
1414        snapshot_root_paths.incremental_snapshot_root_file_path
1415    {
1416        check_deserialize_file_consumed(
1417            incremental_snapshot_file_size.unwrap(),
1418            incremental_snapshot_root_file_path,
1419            incremental_snapshot_data_file_stream.as_mut().unwrap(),
1420        )?;
1421    }
1422
1423    Ok(ret)
1424}
1425
1426/// Before running the deserializer function, perform common operations on the snapshot archive
1427/// files, such as checking the file size and opening the file into a stream.
1428fn create_snapshot_data_file_stream(
1429    snapshot_root_file_path: impl AsRef<Path>,
1430    maximum_file_size: u64,
1431) -> Result<(u64, BufReader<std::fs::File>)> {
1432    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1433
1434    if snapshot_file_size > maximum_file_size {
1435        let error_message = format!(
1436            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1437            snapshot_root_file_path.as_ref().display(),
1438            snapshot_file_size,
1439            maximum_file_size,
1440        );
1441        return Err(IoError::other(error_message).into());
1442    }
1443
1444    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1445    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1446
1447    Ok((snapshot_file_size, snapshot_data_file_stream))
1448}
1449
1450/// After running the deserializer function, perform common checks to ensure the snapshot archive
1451/// files were consumed correctly.
1452fn check_deserialize_file_consumed(
1453    file_size: u64,
1454    file_path: impl AsRef<Path>,
1455    file_stream: &mut BufReader<std::fs::File>,
1456) -> Result<()> {
1457    let consumed_size = file_stream.stream_position()?;
1458
1459    if consumed_size != file_size {
1460        let error_message = format!(
1461            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1462            file_path.as_ref().display(),
1463            file_size,
1464            consumed_size,
1465        );
1466        return Err(IoError::other(error_message).into());
1467    }
1468
1469    Ok(())
1470}
1471
1472/// Return the account path from the appendvec path after checking its format.
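///
/// A minimal sketch (paths are illustrative):
/// ```ignore
/// use std::path::{Path, PathBuf};
/// // An appendvec under <account_path>/run/ maps back to <account_path> ...
/// let account_path = get_account_path_from_appendvec_path(Path::new("/ledger/accounts/run/123.0"));
/// assert_eq!(account_path, Some(PathBuf::from("/ledger/accounts")));
/// // ... while a path whose parent directory is not run/ yields None.
/// assert_eq!(get_account_path_from_appendvec_path(Path::new("/tmp/123.0")), None);
/// ```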
1473fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1474    let run_path = appendvec_path.parent()?;
1475    let run_file_name = run_path.file_name()?;
1476    // All appendvec files should be under <account_path>/run/.
1477    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1478    if run_file_name != ACCOUNTS_RUN_DIR {
1479        error!(
1480            "The account path {} does not have run/ as its immediate parent directory.",
1481            run_path.display()
1482        );
1483        return None;
1484    }
1485    let account_path = run_path.parent()?;
1486    Some(account_path.to_path_buf())
1487}
1488
1489/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
1490/// directory does not exist, create it.
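///
/// For example (illustrative paths), an appendvec at `<account_path>/run/123.0` with `bank_slot`
/// 100 yields `<account_path>/snapshot/100`, creating that directory (and an
/// `account_path_<idx>` symlink to it under `hardlinks_dir`) the first time the account path is
/// seen.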
1491fn get_snapshot_accounts_hardlink_dir(
1492    appendvec_path: &Path,
1493    bank_slot: Slot,
1494    account_paths: &mut HashSet<PathBuf>,
1495    hardlinks_dir: impl AsRef<Path>,
1496) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1497    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1498        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1499    })?;
1500
1501    let snapshot_hardlink_dir = account_path
1502        .join(ACCOUNTS_SNAPSHOT_DIR)
1503        .join(bank_slot.to_string());
1504
1505    // Use the hashset to track seen account paths, to avoid checking the file system.  Only set up the hardlink directory
1506    // and the symlink to it the first time the account_path is seen.
1507    if !account_paths.contains(&account_path) {
1508        let idx = account_paths.len();
1509        debug!(
1510            "for appendvec_path {}, create hard-link path {}",
1511            appendvec_path.display(),
1512            snapshot_hardlink_dir.display()
1513        );
1514        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1515            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1516                err,
1517                snapshot_hardlink_dir.clone(),
1518            )
1519        })?;
1520        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1521        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1522            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1523                source: err,
1524                original: snapshot_hardlink_dir.clone(),
1525                link: symlink_path,
1526            }
1527        })?;
1528        account_paths.insert(account_path);
1529    };
1530
1531    Ok(snapshot_hardlink_dir)
1532}
1533
1534/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1535/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1536/// in the file names are also updated, in case a file was recycled and has an inconsistent slot
1537/// and id.
1538pub fn hard_link_storages_to_snapshot(
1539    bank_snapshot_dir: impl AsRef<Path>,
1540    bank_slot: Slot,
1541    snapshot_storages: &[Arc<AccountStorageEntry>],
1542) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1543    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1544    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1545        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1546            err,
1547            accounts_hardlinks_dir.clone(),
1548        )
1549    })?;
1550
1551    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1552    for storage in snapshot_storages {
1553        let storage_path = storage.accounts.path();
1554        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1555            storage_path,
1556            bank_slot,
1557            &mut account_paths,
1558            &accounts_hardlinks_dir,
1559        )?;
1560        // The appendvec could be recycled, so its filename may not be consistent with the slot and id.
1561        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1562        let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1563        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1564        fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1565            HardLinkStoragesToSnapshotError::HardLinkStorage(
1566                err,
1567                storage_path.to_path_buf(),
1568                hard_link_path,
1569            )
1570        })?;
1571    }
1572    Ok(())
1573}
1574
1575/// Serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but the runtime data structure is Vec<Arc<AccountStorageEntry>>;
1576/// this translates it into what we need.
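///
/// Shape-wise (illustrative), `[s0, s1, s2]` becomes `[[s0], [s1], [s2]]`.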
1577pub(crate) fn get_storages_to_serialize(
1578    snapshot_storages: &[Arc<AccountStorageEntry>],
1579) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1580    snapshot_storages
1581        .iter()
1582        .map(|storage| vec![Arc::clone(storage)])
1583        .collect::<Vec<_>>()
1584}
1585
1586// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
1587const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1588
1589/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
1590pub fn verify_and_unarchive_snapshots(
1591    bank_snapshots_dir: impl AsRef<Path>,
1592    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1593    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1594    account_paths: &[PathBuf],
1595    storage_access: StorageAccess,
1596) -> Result<(UnarchivedSnapshots, UnarchivedSnapshotsGuard)> {
1597    check_are_snapshots_compatible(
1598        full_snapshot_archive_info,
1599        incremental_snapshot_archive_info,
1600    )?;
1601
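    // For example (illustrative core counts): 4 cores -> 1 reader, 16 cores -> 4 readers,
    // 64 cores -> still 4 readers (capped by PARALLEL_UNTAR_READERS_DEFAULT).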
1602    let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1603
1604    let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1605    let UnarchivedSnapshot {
1606        unpack_dir: full_unpack_dir,
1607        storage: full_storage,
1608        bank_fields: full_bank_fields,
1609        accounts_db_fields: full_accounts_db_fields,
1610        unpacked_snapshots_dir_and_version: full_unpacked_snapshots_dir_and_version,
1611        measure_untar: full_measure_untar,
1612    } = unarchive_snapshot(
1613        &bank_snapshots_dir,
1614        TMP_SNAPSHOT_ARCHIVE_PREFIX,
1615        full_snapshot_archive_info.path(),
1616        "snapshot untar",
1617        account_paths,
1618        full_snapshot_archive_info.archive_format(),
1619        parallel_divisions,
1620        next_append_vec_id.clone(),
1621        storage_access,
1622    )?;
1623
1624    let (
1625        incremental_unpack_dir,
1626        incremental_storage,
1627        incremental_bank_fields,
1628        incremental_accounts_db_fields,
1629        incremental_unpacked_snapshots_dir_and_version,
1630        incremental_measure_untar,
1631    ) = if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1632        let UnarchivedSnapshot {
1633            unpack_dir,
1634            storage,
1635            bank_fields,
1636            accounts_db_fields,
1637            unpacked_snapshots_dir_and_version,
1638            measure_untar,
1639        } = unarchive_snapshot(
1640            &bank_snapshots_dir,
1641            TMP_SNAPSHOT_ARCHIVE_PREFIX,
1642            incremental_snapshot_archive_info.path(),
1643            "incremental snapshot untar",
1644            account_paths,
1645            incremental_snapshot_archive_info.archive_format(),
1646            parallel_divisions,
1647            next_append_vec_id.clone(),
1648            storage_access,
1649        )?;
1650        (
1651            Some(unpack_dir),
1652            Some(storage),
1653            Some(bank_fields),
1654            Some(accounts_db_fields),
1655            Some(unpacked_snapshots_dir_and_version),
1656            Some(measure_untar),
1657        )
1658    } else {
1659        (None, None, None, None, None, None)
1660    };
1661
1662    let bank_fields = SnapshotBankFields::new(full_bank_fields, incremental_bank_fields);
1663    let accounts_db_fields =
1664        SnapshotAccountsDbFields::new(full_accounts_db_fields, incremental_accounts_db_fields);
1665    let next_append_vec_id = Arc::try_unwrap(next_append_vec_id).unwrap();
1666
1667    Ok((
1668        UnarchivedSnapshots {
1669            full_storage,
1670            incremental_storage,
1671            bank_fields,
1672            accounts_db_fields,
1673            full_unpacked_snapshots_dir_and_version,
1674            incremental_unpacked_snapshots_dir_and_version,
1675            full_measure_untar,
1676            incremental_measure_untar,
1677            next_append_vec_id,
1678        },
1679        UnarchivedSnapshotsGuard {
1680            full_unpack_dir,
1681            incremental_unpack_dir,
1682        },
1683    ))
1684}
1685
1686/// Spawns a thread for unpacking a snapshot
1687fn spawn_unpack_snapshot_thread(
1688    file_sender: Sender<PathBuf>,
1689    account_paths: Arc<Vec<PathBuf>>,
1690    ledger_dir: Arc<PathBuf>,
1691    mut archive: Archive<SharedBufferReader>,
1692    parallel_selector: Option<ParallelSelector>,
1693    thread_index: usize,
1694) -> JoinHandle<()> {
1695    Builder::new()
1696        .name(format!("solUnpkSnpsht{thread_index:02}"))
1697        .spawn(move || {
1698            hardened_unpack::streaming_unpack_snapshot(
1699                &mut archive,
1700                ledger_dir.as_path(),
1701                &account_paths,
1702                parallel_selector,
1703                &file_sender,
1704            )
1705            .unwrap();
1706        })
1707        .unwrap()
1708}
1709
1710/// Streams unpacked files across a channel
1711fn streaming_unarchive_snapshot(
1712    file_sender: Sender<PathBuf>,
1713    account_paths: Vec<PathBuf>,
1714    ledger_dir: PathBuf,
1715    snapshot_archive_path: PathBuf,
1716    archive_format: ArchiveFormat,
1717    num_threads: usize,
1718) -> Vec<JoinHandle<()>> {
1719    let account_paths = Arc::new(account_paths);
1720    let ledger_dir = Arc::new(ledger_dir);
1721    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1722
1723    // All shared buffer readers need to be created before the threads are spawned
1724    let archives: Vec<_> = (0..num_threads)
1725        .map(|_| {
1726            let reader = SharedBufferReader::new(&shared_buffer);
1727            Archive::new(reader)
1728        })
1729        .collect();
1730
1731    archives
1732        .into_iter()
1733        .enumerate()
1734        .map(|(thread_index, archive)| {
1735            let parallel_selector = Some(ParallelSelector {
1736                index: thread_index,
1737                divisions: num_threads,
1738            });
1739
1740            spawn_unpack_snapshot_thread(
1741                file_sender.clone(),
1742                account_paths.clone(),
1743                ledger_dir.clone(),
1744                archive,
1745                parallel_selector,
1746                thread_index,
1747            )
1748        })
1749        .collect()
1750}
1751
1752/// Used to determine if a filename is structured like a version file, bank file, or storage file
1753#[derive(PartialEq, Debug)]
1754enum SnapshotFileKind {
1755    Version,
1756    BankFields,
1757    Storage,
1758}
1759
1760/// Determines `SnapshotFileKind` for `filename` if any
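///
/// A minimal classification sketch (filenames are illustrative):
/// ```ignore
/// assert_eq!(get_snapshot_file_kind("version"), Some(SnapshotFileKind::Version));
/// assert_eq!(get_snapshot_file_kind("12345"), Some(SnapshotFileKind::BankFields));
/// assert_eq!(get_snapshot_file_kind("12345.pre"), Some(SnapshotFileKind::BankFields));
/// assert_eq!(get_snapshot_file_kind("12345.67"), Some(SnapshotFileKind::Storage));
/// assert_eq!(get_snapshot_file_kind("status_cache"), None);
/// ```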
1761fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
1762    static VERSION_FILE_REGEX: LazyLock<Regex> =
1763        LazyLock::new(|| Regex::new(r"^version$").unwrap());
1764    static BANK_FIELDS_FILE_REGEX: LazyLock<Regex> =
1765        LazyLock::new(|| Regex::new(r"^[0-9]+(\.pre)?$").unwrap());
1766
1767    if VERSION_FILE_REGEX.is_match(filename) {
1768        Some(SnapshotFileKind::Version)
1769    } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
1770        Some(SnapshotFileKind::BankFields)
1771    } else if get_slot_and_append_vec_id(filename).is_ok() {
1772        Some(SnapshotFileKind::Storage)
1773    } else {
1774        None
1775    }
1776}
1777
1778/// Waits for the snapshot (bank fields) file and the version file.
1779/// Due to parallel unpacking, we may receive some append_vec files before the snapshot file.
1780/// This function pushes append_vec files into a buffer until the snapshot file is received.
1781fn get_version_and_snapshot_files(
1782    file_receiver: &Receiver<PathBuf>,
1783) -> (PathBuf, PathBuf, Vec<PathBuf>) {
1784    let mut append_vec_files = Vec::with_capacity(1024);
1785    let mut snapshot_version_path = None;
1786    let mut snapshot_file_path = None;
1787
1788    loop {
1789        if let Ok(path) = file_receiver.recv() {
1790            let filename = path.file_name().unwrap().to_str().unwrap();
1791            match get_snapshot_file_kind(filename) {
1792                Some(SnapshotFileKind::Version) => {
1793                    snapshot_version_path = Some(path);
1794
1795                    // break if we have both the snapshot file and the version file
1796                    if snapshot_file_path.is_some() {
1797                        break;
1798                    }
1799                }
1800                Some(SnapshotFileKind::BankFields) => {
1801                    snapshot_file_path = Some(path);
1802
1803                    // break if we have both the snapshot file and the version file
1804                    if snapshot_version_path.is_some() {
1805                        break;
1806                    }
1807                }
1808                Some(SnapshotFileKind::Storage) => {
1809                    append_vec_files.push(path);
1810                }
1811                None => {} // do nothing for other kinds of files
1812            }
1813        } else {
1814            panic!("did not receive snapshot file from unpacking threads");
1815        }
1816    }
1817    let snapshot_version_path = snapshot_version_path.unwrap();
1818    let snapshot_file_path = snapshot_file_path.unwrap();
1819
1820    (snapshot_version_path, snapshot_file_path, append_vec_files)
1821}
1822
1823/// Fields and information parsed from the snapshot.
1824struct SnapshotFieldsBundle {
1825    snapshot_version: SnapshotVersion,
1826    bank_fields: BankFieldsToDeserialize,
1827    accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
1828    append_vec_files: Vec<PathBuf>,
1829}
1830
1831/// Parses fields and information from the snapshot files provided by
1832/// `file_receiver`.
1833fn snapshot_fields_from_files(file_receiver: &Receiver<PathBuf>) -> Result<SnapshotFieldsBundle> {
1834    let (snapshot_version_path, snapshot_file_path, append_vec_files) =
1835        get_version_and_snapshot_files(file_receiver);
1836    let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
1837    let snapshot_version = snapshot_version_str.parse().map_err(|err| {
1838        IoError::other(format!(
1839            "unsupported snapshot version '{snapshot_version_str}': {err}",
1840        ))
1841    })?;
1842
1843    let snapshot_file = fs::File::open(snapshot_file_path).unwrap();
1844    let mut snapshot_stream = BufReader::new(snapshot_file);
1845    let (bank_fields, accounts_db_fields) = match snapshot_version {
1846        SnapshotVersion::V1_2_0 => serde_snapshot::fields_from_stream(&mut snapshot_stream)?,
1847    };
1848
1849    Ok(SnapshotFieldsBundle {
1850        snapshot_version,
1851        bank_fields,
1852        accounts_db_fields,
1853        append_vec_files,
1854    })
1855}
1856
1857/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1858/// as a valid one.  A dir unpacked from an archive lacks these files.  Create them here so
1859/// the new_from_dir() checks pass.  These checks are not needed for unpacked dirs,
1860/// but it is not clean to add another flag to new_from_dir() to skip them.
1861fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1862    let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1863    if !snapshots_dir.is_dir() {
1864        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1865    }
1866
1867    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1868    let slot_dir = std::fs::read_dir(&snapshots_dir)
1869        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1870        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1871        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1872        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1873        .path();
1874
1875    let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1876    fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1877
1878    let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1879    fs::hard_link(
1880        status_cache_file,
1881        slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1882    )?;
1883
1884    write_snapshot_state_complete_file(slot_dir)?;
1885
1886    Ok(())
1887}
1888
1889/// Perform the common tasks when unarchiving a snapshot.  Handles creating the temporary
1890/// directories, untarring, reading the version file, and then returning those fields plus the
1891/// rebuilt storage
1892fn unarchive_snapshot(
1893    bank_snapshots_dir: impl AsRef<Path>,
1894    unpacked_snapshots_dir_prefix: &'static str,
1895    snapshot_archive_path: impl AsRef<Path>,
1896    measure_name: &'static str,
1897    account_paths: &[PathBuf],
1898    archive_format: ArchiveFormat,
1899    parallel_divisions: usize,
1900    next_append_vec_id: Arc<AtomicAccountsFileId>,
1901    storage_access: StorageAccess,
1902) -> Result<UnarchivedSnapshot> {
1903    let unpack_dir = tempfile::Builder::new()
1904        .prefix(unpacked_snapshots_dir_prefix)
1905        .tempdir_in(bank_snapshots_dir)?;
1906    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1907
1908    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1909    streaming_unarchive_snapshot(
1910        file_sender,
1911        account_paths.to_vec(),
1912        unpack_dir.path().to_path_buf(),
1913        snapshot_archive_path.as_ref().to_path_buf(),
1914        archive_format,
1915        parallel_divisions,
1916    );
1917
1918    let num_rebuilder_threads = num_cpus::get_physical()
1919        .saturating_sub(parallel_divisions)
1920        .max(1);
1921    let SnapshotFieldsBundle {
1922        snapshot_version,
1923        bank_fields,
1924        accounts_db_fields,
1925        append_vec_files,
1926        ..
1927    } = snapshot_fields_from_files(&file_receiver)?;
1928    let (storage, measure_untar) = measure_time!(
1929        SnapshotStorageRebuilder::rebuild_storage(
1930            &accounts_db_fields,
1931            append_vec_files,
1932            file_receiver,
1933            num_rebuilder_threads,
1934            next_append_vec_id,
1935            SnapshotFrom::Archive,
1936            storage_access,
1937        )?,
1938        measure_name
1939    );
1940    info!("{}", measure_untar);
1941
1942    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1943
1944    Ok(UnarchivedSnapshot {
1945        unpack_dir,
1946        storage,
1947        bank_fields,
1948        accounts_db_fields,
1949        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1950            unpacked_snapshots_dir,
1951            snapshot_version,
1952        },
1953        measure_untar,
1954    })
1955}
1956
1957/// Streams snapshot dir files across a channel
1958/// Follows the flow of streaming_unarchive_snapshot(), but handles the from_dir case.
1959fn streaming_snapshot_dir_files(
1960    file_sender: Sender<PathBuf>,
1961    snapshot_file_path: impl Into<PathBuf>,
1962    snapshot_version_path: impl Into<PathBuf>,
1963    account_paths: &[PathBuf],
1964) -> Result<()> {
1965    file_sender.send(snapshot_file_path.into())?;
1966    file_sender.send(snapshot_version_path.into())?;
1967
1968    for account_path in account_paths {
1969        for file in fs::read_dir(account_path)? {
1970            file_sender.send(file?.path())?;
1971        }
1972    }
1973
1974    Ok(())
1975}
1976
1977/// Performs the common tasks when deserializing a snapshot
1978///
1979/// Handles reading the snapshot file and version file,
1980/// then returning those fields plus the rebuilt storages.
1981pub fn rebuild_storages_from_snapshot_dir(
1982    snapshot_info: &BankSnapshotInfo,
1983    account_paths: &[PathBuf],
1984    next_append_vec_id: Arc<AtomicAccountsFileId>,
1985    storage_access: StorageAccess,
1986) -> Result<(
1987    AccountStorageMap,
1988    BankFieldsToDeserialize,
1989    AccountsDbFields<SerializableAccountStorageEntry>,
1990)> {
1991    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1992    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1993    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1994
1995    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1996        IoError::other(format!(
1997            "failed to read accounts hardlinks dir '{}': {err}",
1998            accounts_hardlinks.display(),
1999        ))
2000    })?;
2001    for dir_entry in read_dir {
2002        let symlink_path = dir_entry?.path();
2003        // The symlink points to <account_path>/snapshot/<slot>, which contains the account file hardlinks.
2004        // The corresponding run path should be <account_path>/run/
2005        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
2006            IoError::other(format!(
2007                "failed to read symlink '{}': {err}",
2008                symlink_path.display(),
2009            ))
2010        })?;
2011        let account_run_path = account_snapshot_path
2012            .parent()
2013            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
2014            .parent()
2015            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
2016            .join(ACCOUNTS_RUN_DIR);
2017        if !account_run_paths.contains(&account_run_path) {
2018            // The appendvec from the bank snapshot storage does not match any of the provided account_paths.
2019            // The account paths have changed, so the snapshot is no longer usable.
2020            return Err(SnapshotError::AccountPathsMismatch);
2021        }
2022        // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
2023        // paths be in accounts/
2024        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
2025            IoError::other(format!(
2026                "failed to read account snapshot dir '{}': {err}",
2027                account_snapshot_path.display(),
2028            ))
2029        })?;
2030        for file in read_dir {
2031            let file_path = file?.path();
2032            let file_name = file_path
2033                .file_name()
2034                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
2035            let dest_path = account_run_path.join(file_name);
2036            fs::hard_link(&file_path, &dest_path).map_err(|err| {
2037                IoError::other(format!(
2038                    "failed to hard link from '{}' to '{}': {err}",
2039                    file_path.display(),
2040                    dest_path.display(),
2041                ))
2042            })?;
2043        }
2044    }
2045
2046    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
2047    let snapshot_file_path = &snapshot_info.snapshot_path();
2048    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2049    streaming_snapshot_dir_files(
2050        file_sender,
2051        snapshot_file_path,
2052        snapshot_version_path,
2053        account_paths,
2054    )?;
2055
2056    let SnapshotFieldsBundle {
2057        bank_fields,
2058        accounts_db_fields,
2059        append_vec_files,
2060        ..
2061    } = snapshot_fields_from_files(&file_receiver)?;
2062
2063    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
2064    let storage = SnapshotStorageRebuilder::rebuild_storage(
2065        &accounts_db_fields,
2066        append_vec_files,
2067        file_receiver,
2068        num_rebuilder_threads,
2069        next_append_vec_id,
2070        SnapshotFrom::Dir,
2071        storage_access,
2072    )?;
2073
2074    Ok((storage, bank_fields, accounts_db_fields))
2075}
2076
2077/// Reads the `snapshot_version` from a file. Before opening the file, its size
2078/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
2079/// threshold, it is not opened and an error is returned.
2080fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
2081    // Check file size.
2082    let file_metadata = fs::metadata(&path).map_err(|err| {
2083        IoError::other(format!(
2084            "failed to query snapshot version file metadata '{}': {err}",
2085            path.as_ref().display(),
2086        ))
2087    })?;
2088    let file_size = file_metadata.len();
2089    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
2090        let error_message = format!(
2091            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
2092            path.as_ref().display(),
2093            file_size,
2094            MAX_SNAPSHOT_VERSION_FILE_SIZE,
2095        );
2096        return Err(IoError::other(error_message).into());
2097    }
2098
2099    // Read snapshot_version from file.
2100    let mut snapshot_version = String::new();
2101    let mut file = fs::File::open(&path).map_err(|err| {
2102        IoError::other(format!(
2103            "failed to open snapshot version file '{}': {err}",
2104            path.as_ref().display()
2105        ))
2106    })?;
2107    file.read_to_string(&mut snapshot_version).map_err(|err| {
2108        IoError::other(format!(
2109            "failed to read snapshot version from file '{}': {err}",
2110            path.as_ref().display()
2111        ))
2112    })?;
2113
2114    Ok(snapshot_version.trim().to_string())
2115}
2116
2117/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
2118/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
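///
/// For example, an incremental snapshot whose base slot is 100 is only compatible with the full
/// snapshot taken at slot 100; passing `None` for the incremental snapshot is always compatible.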
2119fn check_are_snapshots_compatible(
2120    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
2121    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
2122) -> Result<()> {
2123    if incremental_snapshot_archive_info.is_none() {
2124        return Ok(());
2125    }
2126
2127    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
2128
2129    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
2130        .then_some(())
2131        .ok_or_else(|| {
2132            SnapshotError::MismatchedBaseSlot(
2133                full_snapshot_archive_info.slot(),
2134                incremental_snapshot_archive_info.base_slot(),
2135            )
2136        })
2137}
2138
2139/// Get the `&str` from a `&Path`
2140pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
2141    path.file_name()
2142        .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
2143        .to_str()
2144        .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
2145}
2146
2147pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
2148    snapshot_archives_dir
2149        .as_ref()
2150        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
2151}
2152
2153/// Build the full snapshot archive path from its components: the snapshot archives directory, the
2154/// snapshot slot, the accounts hash, and the archive format.
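///
/// The file name follows the pattern `snapshot-<slot>-<hash>.<archive extension>`, e.g.
/// (illustrative) `snapshot-100-11111111111111111111111111111111.tar.zst`.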
2155pub fn build_full_snapshot_archive_path(
2156    full_snapshot_archives_dir: impl AsRef<Path>,
2157    slot: Slot,
2158    hash: &SnapshotHash,
2159    archive_format: ArchiveFormat,
2160) -> PathBuf {
2161    full_snapshot_archives_dir.as_ref().join(format!(
2162        "snapshot-{}-{}.{}",
2163        slot,
2164        hash.0,
2165        archive_format.extension(),
2166    ))
2167}
2168
2169/// Build the incremental snapshot archive path from its components: the snapshot archives
2170/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive
2171/// format.
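///
/// The file name follows the pattern `incremental-snapshot-<base slot>-<slot>-<hash>.<archive
/// extension>`, e.g. (illustrative) `incremental-snapshot-100-150-11111111111111111111111111111111.tar.zst`.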
2172pub fn build_incremental_snapshot_archive_path(
2173    incremental_snapshot_archives_dir: impl AsRef<Path>,
2174    base_slot: Slot,
2175    slot: Slot,
2176    hash: &SnapshotHash,
2177    archive_format: ArchiveFormat,
2178) -> PathBuf {
2179    incremental_snapshot_archives_dir.as_ref().join(format!(
2180        "incremental-snapshot-{}-{}-{}.{}",
2181        base_slot,
2182        slot,
2183        hash.0,
2184        archive_format.extension(),
2185    ))
2186}
2187
2188/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format
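///
/// A minimal parsing sketch (slot, hash, and extension are illustrative):
/// ```ignore
/// let (slot, hash, archive_format) =
///     parse_full_snapshot_archive_filename("snapshot-42-11111111111111111111111111111111.tar.zst")?;
/// assert_eq!(slot, 42);
/// ```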
2189pub(crate) fn parse_full_snapshot_archive_filename(
2190    archive_filename: &str,
2191) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
2192    static RE: std::sync::LazyLock<Regex> =
2193        std::sync::LazyLock::new(|| Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap());
2194
2195    let do_parse = || {
2196        RE.captures(archive_filename).and_then(|captures| {
2197            let slot = captures
2198                .name("slot")
2199                .map(|x| x.as_str().parse::<Slot>())?
2200                .ok()?;
2201            let hash = captures
2202                .name("hash")
2203                .map(|x| x.as_str().parse::<Hash>())?
2204                .ok()?;
2205            let archive_format = captures
2206                .name("ext")
2207                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2208                .ok()?;
2209
2210            Some((slot, SnapshotHash(hash), archive_format))
2211        })
2212    };
2213
2214    do_parse().ok_or_else(|| {
2215        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2216    })
2217}
2218
2219/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format
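///
/// For example (illustrative), `incremental-snapshot-100-150-<hash>.tar.zst` parses to base slot
/// 100 and slot 150.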
2220pub(crate) fn parse_incremental_snapshot_archive_filename(
2221    archive_filename: &str,
2222) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
2223    static RE: std::sync::LazyLock<Regex> = std::sync::LazyLock::new(|| {
2224        Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap()
2225    });
2226
2227    let do_parse = || {
2228        RE.captures(archive_filename).and_then(|captures| {
2229            let base_slot = captures
2230                .name("base")
2231                .map(|x| x.as_str().parse::<Slot>())?
2232                .ok()?;
2233            let slot = captures
2234                .name("slot")
2235                .map(|x| x.as_str().parse::<Slot>())?
2236                .ok()?;
2237            let hash = captures
2238                .name("hash")
2239                .map(|x| x.as_str().parse::<Hash>())?
2240                .ok()?;
2241            let archive_format = captures
2242                .name("ext")
2243                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2244                .ok()?;
2245
2246            Some((base_slot, slot, SnapshotHash(hash), archive_format))
2247        })
2248    };
2249
2250    do_parse().ok_or_else(|| {
2251        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2252    })
2253}
2254
2255/// Walk the snapshot archives directory to collect snapshot archive file info
2256fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2257where
2258    F: Fn(PathBuf) -> Result<T>,
2259{
2260    let walk_dir = |dir: &Path| -> Vec<T> {
2261        let entry_iter = fs::read_dir(dir);
2262        match entry_iter {
2263            Err(err) => {
2264                info!(
2265                    "Unable to read snapshot archives directory '{}': {err}",
2266                    dir.display(),
2267                );
2268                vec![]
2269            }
2270            Ok(entries) => entries
2271                .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2272                .collect(),
2273        }
2274    };
2275
2276    let mut ret = walk_dir(snapshot_archives_dir);
2277    let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2278    if remote_dir.exists() {
2279        ret.append(&mut walk_dir(remote_dir.as_ref()));
2280    }
2281    ret
2282}
2283
2284/// Get a list of the full snapshot archives from a directory
2285pub fn get_full_snapshot_archives(
2286    full_snapshot_archives_dir: impl AsRef<Path>,
2287) -> Vec<FullSnapshotArchiveInfo> {
2288    get_snapshot_archives(
2289        full_snapshot_archives_dir.as_ref(),
2290        FullSnapshotArchiveInfo::new_from_path,
2291    )
2292}
2293
2294/// Get a list of the incremental snapshot archives from a directory
2295pub fn get_incremental_snapshot_archives(
2296    incremental_snapshot_archives_dir: impl AsRef<Path>,
2297) -> Vec<IncrementalSnapshotArchiveInfo> {
2298    get_snapshot_archives(
2299        incremental_snapshot_archives_dir.as_ref(),
2300        IncrementalSnapshotArchiveInfo::new_from_path,
2301    )
2302}
2303
2304/// Get the highest slot of the full snapshot archives in a directory
2305pub fn get_highest_full_snapshot_archive_slot(
2306    full_snapshot_archives_dir: impl AsRef<Path>,
2307) -> Option<Slot> {
2308    get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2309        .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2310}
2311
2312/// Get the highest slot of the incremental snapshot archives in a directory, for a given full
2313/// snapshot slot
2314pub fn get_highest_incremental_snapshot_archive_slot(
2315    incremental_snapshot_archives_dir: impl AsRef<Path>,
2316    full_snapshot_slot: Slot,
2317) -> Option<Slot> {
2318    get_highest_incremental_snapshot_archive_info(
2319        incremental_snapshot_archives_dir,
2320        full_snapshot_slot,
2321    )
2322    .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2323}
2324
2325/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
2326pub fn get_highest_full_snapshot_archive_info(
2327    full_snapshot_archives_dir: impl AsRef<Path>,
2328) -> Option<FullSnapshotArchiveInfo> {
2329    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2330    full_snapshot_archives.sort_unstable();
2331    full_snapshot_archives.into_iter().next_back()
2332}
2333
2334/// Get the path for the incremental snapshot archive with the highest slot, for a given full
2335/// snapshot slot, in a directory
2336pub fn get_highest_incremental_snapshot_archive_info(
2337    incremental_snapshot_archives_dir: impl AsRef<Path>,
2338    full_snapshot_slot: Slot,
2339) -> Option<IncrementalSnapshotArchiveInfo> {
2340    // Since we want to filter down to only the incremental snapshot archives that have the same
2341    // full snapshot slot as the value passed in, perform the filtering before sorting to avoid
2342    // doing unnecessary work.
2343    let mut incremental_snapshot_archives =
2344        get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2345            .into_iter()
2346            .filter(|incremental_snapshot_archive_info| {
2347                incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2348            })
2349            .collect::<Vec<_>>();
2350    incremental_snapshot_archives.sort_unstable();
2351    incremental_snapshot_archives.into_iter().next_back()
2352}
2353
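/// Purges old snapshot archives according to the retention limits, keeping the newest ones.
///
/// A worked retention sketch (slots are illustrative): with full snapshot archives at slots
/// {100, 200, 300} and `maximum_full_snapshot_archives_to_retain == 2`, the archives at slots 200
/// and 300 are kept.  Incremental archives based on slot 300 (the highest retained full slot) are
/// kept up to `maximum_incremental_snapshot_archives_to_retain`; those based on slot 200 keep
/// only their newest one; those based on slot 100 are all removed.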
2354pub fn purge_old_snapshot_archives(
2355    full_snapshot_archives_dir: impl AsRef<Path>,
2356    incremental_snapshot_archives_dir: impl AsRef<Path>,
2357    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2358    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2359) {
2360    info!(
2361        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2362        full_snapshot_archives_dir.as_ref().display(),
2363        maximum_full_snapshot_archives_to_retain
2364    );
2365
2366    let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2367    full_snapshot_archives.sort_unstable();
2368    full_snapshot_archives.reverse();
2369
2370    let num_to_retain = full_snapshot_archives
2371        .len()
2372        .min(maximum_full_snapshot_archives_to_retain.get());
2373    trace!(
2374        "There are {} full snapshot archives, retaining {}",
2375        full_snapshot_archives.len(),
2376        num_to_retain,
2377    );
2378
2379    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2380        if full_snapshot_archives.is_empty() {
2381            None
2382        } else {
2383            Some(full_snapshot_archives.split_at(num_to_retain))
2384        }
2385        .unwrap_or_default();
2386
2387    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2388        .iter()
2389        .map(|ai| ai.slot())
2390        .collect::<HashSet<_>>();
2391
2392    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2393        for path in archives.iter().map(|a| a.path()) {
2394            trace!("Removing snapshot archive: {}", path.display());
2395            let result = fs::remove_file(path);
2396            if let Err(err) = result {
2397                info!(
2398                    "Failed to remove snapshot archive '{}': {err}",
2399                    path.display()
2400                );
2401            }
2402        }
2403    }
2404    remove_archives(full_snapshot_archives_to_remove);
2405
2406    info!(
2407        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2408        incremental_snapshot_archives_dir.as_ref().display(),
2409        maximum_incremental_snapshot_archives_to_retain
2410    );
2411    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2412    for incremental_snapshot_archive in
2413        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2414    {
2415        incremental_snapshot_archives_by_base_slot
2416            .entry(incremental_snapshot_archive.base_slot())
2417            .or_default()
2418            .push(incremental_snapshot_archive)
2419    }
2420
2421    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2422    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2423    {
2424        incremental_snapshot_archives.sort_unstable();
2425        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2426            maximum_incremental_snapshot_archives_to_retain.get()
2427        } else {
2428            usize::from(retained_full_snapshot_slots.contains(&base_slot))
2429        };
2430        trace!(
2431            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2432            incremental_snapshot_archives.len(),
2433            base_slot,
2434            incremental_snapshot_archives
2435                .len()
2436                .saturating_sub(num_to_retain),
2437        );
2438
2439        incremental_snapshot_archives.truncate(
2440            incremental_snapshot_archives
2441                .len()
2442                .saturating_sub(num_to_retain),
2443        );
2444        remove_archives(&incremental_snapshot_archives);
2445    }
2446}
2447
2448#[cfg(feature = "dev-context-only-utils")]
2449fn unpack_snapshot_local(
2450    shared_buffer: SharedBuffer,
2451    ledger_dir: &Path,
2452    account_paths: &[PathBuf],
2453    parallel_divisions: usize,
2454) -> Result<UnpackedAppendVecMap> {
2455    assert!(parallel_divisions > 0);
2456
2457    // allocate all readers before any readers start reading
2458    let readers = (0..parallel_divisions)
2459        .map(|_| SharedBufferReader::new(&shared_buffer))
2460        .collect::<Vec<_>>();
2461
2462    // create 'parallel_divisions' # of parallel workers, each responsible for 1/parallel_divisions of all the files to extract.
2463    let all_unpacked_append_vec_map = readers
2464        .into_par_iter()
2465        .enumerate()
2466        .map(|(index, reader)| {
2467            let parallel_selector = Some(ParallelSelector {
2468                index,
2469                divisions: parallel_divisions,
2470            });
2471            let mut archive = Archive::new(reader);
2472            hardened_unpack::unpack_snapshot(
2473                &mut archive,
2474                ledger_dir,
2475                account_paths,
2476                parallel_selector,
2477            )
2478        })
2479        .collect::<Vec<_>>();
2480
2481    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
2482    for h in all_unpacked_append_vec_map {
2483        unpacked_append_vec_map.extend(h?);
2484    }
2485
2486    Ok(unpacked_append_vec_map)
2487}
2488
2489fn untar_snapshot_create_shared_buffer(
2490    snapshot_tar: &Path,
2491    archive_format: ArchiveFormat,
2492) -> SharedBuffer {
2493    let open_file = || {
2494        fs::File::open(snapshot_tar)
2495            .map_err(|err| {
2496                IoError::other(format!(
2497                    "failed to open snapshot archive '{}': {err}",
2498                    snapshot_tar.display(),
2499                ))
2500            })
2501            .unwrap()
2502    };
2503    // Apply buffered reader for decoders that do not buffer internally.
2504    match archive_format {
2505        ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(open_file())),
2506        ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(open_file())),
2507        ArchiveFormat::TarZstd { .. } => {
2508            SharedBuffer::new(zstd::stream::read::Decoder::new(open_file()).unwrap())
2509        }
2510        ArchiveFormat::TarLz4 => SharedBuffer::new(lz4::Decoder::new(open_file()).unwrap()),
2511        ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2512    }
2513}
2514
2515#[cfg(feature = "dev-context-only-utils")]
2516fn untar_snapshot_in(
2517    snapshot_tar: impl AsRef<Path>,
2518    unpack_dir: &Path,
2519    account_paths: &[PathBuf],
2520    archive_format: ArchiveFormat,
2521    parallel_divisions: usize,
2522) -> Result<UnpackedAppendVecMap> {
2523    let shared_buffer = untar_snapshot_create_shared_buffer(snapshot_tar.as_ref(), archive_format);
2524    unpack_snapshot_local(shared_buffer, unpack_dir, account_paths, parallel_divisions)
2525}
2526
2527pub fn verify_unpacked_snapshots_dir_and_version(
2528    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2529) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2530    info!(
2531        "snapshot version: {}",
2532        &unpacked_snapshots_dir_and_version.snapshot_version
2533    );
2534
2535    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2536    let mut bank_snapshots =
2537        get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2538    if bank_snapshots.len() > 1 {
2539        return Err(IoError::other(format!(
2540            "invalid snapshot format: only one snapshot allowed, but found {}",
2541            bank_snapshots.len(),
2542        ))
2543        .into());
2544    }
2545    let root_paths = bank_snapshots.pop().ok_or_else(|| {
2546        IoError::other(format!(
2547            "no snapshots found in snapshots directory '{}'",
2548            unpacked_snapshots_dir_and_version
2549                .unpacked_snapshots_dir
2550                .display(),
2551        ))
2552    })?;
2553    Ok((snapshot_version, root_paths))
2554}
2555
2556/// Returns the file name of the bank snapshot for `slot`
2557pub fn get_snapshot_file_name(slot: Slot) -> String {
2558    slot.to_string()
2559}
2560
2561/// Constructs the path to the bank snapshot directory for `slot` within `bank_snapshots_dir`
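///
/// For example (illustrative path), slot 100 under `/ledger/snapshots` yields
/// `/ledger/snapshots/100`.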
2562pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2563    bank_snapshots_dir
2564        .as_ref()
2565        .join(get_snapshot_file_name(slot))
2566}
2567
2568#[derive(Debug, Copy, Clone)]
2569/// Allow tests to specify what happened to the serialized format
2570pub enum VerifyBank {
2571    /// the bank's serialized format is expected to be identical to what we are comparing against
2572    Deterministic,
2573    /// the serialized bank was 'reserialized' into a non-deterministic format
2574    /// so, deserialize both files and compare deserialized results
2575    NonDeterministic,
2576}
2577
2578#[cfg(feature = "dev-context-only-utils")]
2579pub fn verify_snapshot_archive(
2580    snapshot_archive: impl AsRef<Path>,
2581    snapshots_to_verify: impl AsRef<Path>,
2582    archive_format: ArchiveFormat,
2583    verify_bank: VerifyBank,
2584    slot: Slot,
2585) {
2586    let temp_dir = tempfile::TempDir::new().unwrap();
2587    let unpack_dir = temp_dir.path();
2588    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
2589    untar_snapshot_in(
2590        snapshot_archive,
2591        unpack_dir,
2592        &[unpack_account_dir.clone()],
2593        archive_format,
2594        1,
2595    )
2596    .unwrap();
2597
2598    // Check snapshots are the same
2599    let unpacked_snapshots = unpack_dir.join("snapshots");
2600
2601    // Since the unpack code collects all the appendvecs into one directory unpack_account_dir, we need to
2602    // collect all the appendvecs in <account_path>/snapshot/<slot>/ into one directory for later comparison.
2603    let storages_to_verify = unpack_dir.join("storages_to_verify");
2604    // Create the directory if it doesn't exist
2605    fs::create_dir_all(&storages_to_verify).unwrap();
2606
2607    let slot = slot.to_string();
2608    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);
2609
2610    if let VerifyBank::NonDeterministic = verify_bank {
2611        // file contents may be different, but deserialized structs should be equal
2612        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
2613        let p2 = unpacked_snapshots.join(&slot).join(&slot);
2614        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
2615        fs::remove_file(p1).unwrap();
2616        fs::remove_file(p2).unwrap();
2617    }
2618
2619    // The new status_cache file is inside the slot directory together with the snapshot file.
2620    // When unpacking an archive, the status_cache file from the archive is one level up, outside of
2621    // the slot directory.
2622    // The unpacked status_cache file needs to be put back into the slot directory for the directory
2623    // comparison to pass.
2624    let existing_unpacked_status_cache_file =
2625        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2626    let new_unpacked_status_cache_file = unpacked_snapshots
2627        .join(&slot)
2628        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
2629    fs::rename(
2630        existing_unpacked_status_cache_file,
2631        new_unpacked_status_cache_file,
2632    )
2633    .unwrap();
2634
2635    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2636    if accounts_hardlinks_dir.is_dir() {
2637        // This directory contains symlinks to all <account_path>/snapshot/<slot> directories.
2638        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
2639            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
2640            // Copy all the files in dst_path into the storages_to_verify directory.
2641            for entry in fs::read_dir(&link_dst_path).unwrap() {
2642                let src_path = entry.unwrap().path();
2643                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
2644                fs::copy(src_path, dst_path).unwrap();
2645            }
2646        }
2647        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
2648    }
2649
2650    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
2651    if version_path.is_file() {
2652        fs::remove_file(version_path).unwrap();
2653    }
2654
2655    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2656    if state_complete_path.is_file() {
2657        fs::remove_file(state_complete_path).unwrap();
2658    }
2659
2660    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
2661
2662    // In the unarchiving case, there is an extra empty "accounts" directory. The account
2663    // files in the archive accounts/ have been expanded to [account_paths].
2664    // Remove the empty "accounts" directory for the directory comparison below.
2665    // In some test cases the directories to compare do not come from unarchiving.
2666    // Ignore the error when this directory does not exist.
2667    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
2668    // Check the account entries are the same
2669    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
2670}
2671
2672/// Purges all bank snapshots
2673pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2674    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2675    purge_bank_snapshots(&bank_snapshots);
2676}
2677
2678/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
2679pub fn purge_old_bank_snapshots(
2680    bank_snapshots_dir: impl AsRef<Path>,
2681    num_bank_snapshots_to_retain: usize,
2682    filter_by_kind: Option<BankSnapshotKind>,
2683) {
2684    let mut bank_snapshots = match filter_by_kind {
2685        Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2686        Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2687        None => get_bank_snapshots(&bank_snapshots_dir),
2688    };
2689
2690    bank_snapshots.sort_unstable();
2691    purge_bank_snapshots(
2692        bank_snapshots
2693            .iter()
2694            .rev()
2695            .skip(num_bank_snapshots_to_retain),
2696    );
2697}
2698
2699/// At startup, purge old (i.e. unusable) bank snapshots
2700///
2701/// Only a single bank snapshot could be needed at startup (when using fast boot), so
2702/// retain the highest bank snapshot "post", and purge the rest.
2703pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2704    purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2705    purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2706
2707    let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2708    if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2709        debug!(
2710            "Retained bank snapshot for slot {}, and purged the rest.",
2711            highest_bank_snapshot_post.slot
2712        );
2713    }
2714}
2715
2716/// Purges bank snapshots that are older than `slot`
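///
/// # Example
///
/// An illustrative sketch only; the directory path and slot below are hypothetical.
///
/// ```ignore
/// // Purge every bank snapshot whose slot is strictly less than 1000.
/// purge_bank_snapshots_older_than_slot("/path/to/bank_snapshots", 1000);
/// ```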
2717pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2718    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2719    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2720    purge_bank_snapshots(&bank_snapshots);
2721}
2722
2723/// Purges all `bank_snapshots`
2724///
2725/// Does not exit early if there is an error while purging a bank snapshot.
2726fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2727    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2728        if purge_bank_snapshot(snapshot_dir).is_err() {
2729            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2730        }
2731    }
2732}
2733
2734/// Remove the bank snapshot at this path
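///
/// # Example
///
/// An illustrative sketch only; the path below is hypothetical.
///
/// ```ignore
/// // Remove the bank snapshot directory for slot 1234, along with the account
/// // snapshot directories reachable through its accounts hardlinks.
/// purge_bank_snapshot("/path/to/bank_snapshots/1234")?;
/// ```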
2735pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2736    const FN_ERR: &str = "failed to purge bank snapshot";
2737    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2738    if accounts_hardlinks_dir.is_dir() {
2739        // This directory contains symlinks to all accounts snapshot directories.
2740        // They should all be removed.
2741        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2742            IoError::other(format!(
2743                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2744                accounts_hardlinks_dir.display(),
2745            ))
2746        })?;
2747        for entry in read_dir {
2748            let accounts_hardlink_dir = entry?.path();
2749            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2750                IoError::other(format!(
2751                    "{FN_ERR}: failed to read symlink '{}': {err}",
2752                    accounts_hardlink_dir.display(),
2753                ))
2754            })?;
2755            move_and_async_delete_path(&accounts_hardlink_dir);
2756        }
2757    }
2758    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2759        IoError::other(format!(
2760            "{FN_ERR}: failed to remove dir '{}': {err}",
2761            bank_snapshot_dir.as_ref().display(),
2762        ))
2763    })?;
2764    Ok(())
2765}
2766
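/// Returns true if a full snapshot should be taken at this block height
///
/// This is a pure interval check: a full snapshot is due whenever `block_height`
/// is a multiple of `full_snapshot_archive_interval_slots`.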
2767pub fn should_take_full_snapshot(
2768    block_height: Slot,
2769    full_snapshot_archive_interval_slots: Slot,
2770) -> bool {
2771    block_height % full_snapshot_archive_interval_slots == 0
2772}
2773
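/// Returns true if an incremental snapshot should be taken at this block height
///
/// In addition to the interval check, an incremental snapshot requires that a full
/// snapshot has already been taken (i.e. `latest_full_snapshot_slot` is `Some`),
/// since incremental snapshots are built on top of a full snapshot.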
2774pub fn should_take_incremental_snapshot(
2775    block_height: Slot,
2776    incremental_snapshot_archive_interval_slots: Slot,
2777    latest_full_snapshot_slot: Option<Slot>,
2778) -> bool {
2779    block_height % incremental_snapshot_archive_interval_slots == 0
2780        && latest_full_snapshot_slot.is_some()
2781}
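
// A minimal, self-contained sketch (not part of the original source) showing how the two
// cadence checks above behave; the interval and slot values are arbitrary examples.
#[cfg(test)]
mod snapshot_cadence_examples {
    use super::{should_take_full_snapshot, should_take_incremental_snapshot};

    #[test]
    fn example_snapshot_cadence() {
        // Full snapshots are due exactly on interval boundaries.
        assert!(should_take_full_snapshot(25_000, 25_000));
        assert!(!should_take_full_snapshot(25_001, 25_000));

        // Incremental snapshots additionally require that a full snapshot already exists.
        assert!(should_take_incremental_snapshot(25_100, 100, Some(25_000)));
        assert!(!should_take_incremental_snapshot(25_100, 100, None));
        assert!(!should_take_incremental_snapshot(25_101, 100, Some(25_000)));
    }
}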
2782
2783/// Creates an "accounts path" directory for tests
2784///
2785/// This temporary directory will contain the "run" and "snapshot"
2786/// sub-directories required by a validator.
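///
/// # Example
///
/// An illustrative sketch only:
///
/// ```ignore
/// let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
/// // `accounts_dir` lives inside `_tmp_dir` and is removed when `_tmp_dir` is dropped,
/// // so keep `_tmp_dir` alive for as long as the accounts directory is needed.
/// ```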
2787#[cfg(feature = "dev-context-only-utils")]
2788pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
2789    let tmp_dir = tempfile::TempDir::new().unwrap();
2790    let account_dir = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap().0;
2791    (tmp_dir, account_dir)
2792}
2793
2794#[cfg(test)]
2795mod tests {
2796    use {
2797        super::*,
2798        assert_matches::assert_matches,
2799        bincode::{deserialize_from, serialize_into},
2800        std::{convert::TryFrom, mem::size_of},
2801        tempfile::NamedTempFile,
2802    };
2803
2804    #[test]
2805    fn test_serialize_snapshot_data_file_under_limit() {
2806        let temp_dir = tempfile::TempDir::new().unwrap();
2807        let expected_consumed_size = size_of::<u32>() as u64;
2808        let consumed_size = serialize_snapshot_data_file_capped(
2809            &temp_dir.path().join("data-file"),
2810            expected_consumed_size,
2811            |stream| {
2812                serialize_into(stream, &2323_u32)?;
2813                Ok(())
2814            },
2815        )
2816        .unwrap();
2817        assert_eq!(consumed_size, expected_consumed_size);
2818    }
2819
2820    #[test]
2821    fn test_serialize_snapshot_data_file_over_limit() {
2822        let temp_dir = tempfile::TempDir::new().unwrap();
2823        let expected_consumed_size = size_of::<u32>() as u64;
2824        let result = serialize_snapshot_data_file_capped(
2825            &temp_dir.path().join("data-file"),
2826            expected_consumed_size - 1,
2827            |stream| {
2828                serialize_into(stream, &2323_u32)?;
2829                Ok(())
2830            },
2831        );
2832        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2833    }
2834
2835    #[test]
2836    fn test_deserialize_snapshot_data_file_under_limit() {
2837        let expected_data = 2323_u32;
2838        let expected_consumed_size = size_of::<u32>() as u64;
2839
2840        let temp_dir = tempfile::TempDir::new().unwrap();
2841        serialize_snapshot_data_file_capped(
2842            &temp_dir.path().join("data-file"),
2843            expected_consumed_size,
2844            |stream| {
2845                serialize_into(stream, &expected_data)?;
2846                Ok(())
2847            },
2848        )
2849        .unwrap();
2850
2851        let snapshot_root_paths = SnapshotRootPaths {
2852            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2853            incremental_snapshot_root_file_path: None,
2854        };
2855
2856        let actual_data = deserialize_snapshot_data_files_capped(
2857            &snapshot_root_paths,
2858            expected_consumed_size,
2859            |stream| {
2860                Ok(deserialize_from::<_, u32>(
2861                    &mut stream.full_snapshot_stream,
2862                )?)
2863            },
2864        )
2865        .unwrap();
2866        assert_eq!(actual_data, expected_data);
2867    }
2868
2869    #[test]
2870    fn test_deserialize_snapshot_data_file_over_limit() {
2871        let expected_data = 2323_u32;
2872        let expected_consumed_size = size_of::<u32>() as u64;
2873
2874        let temp_dir = tempfile::TempDir::new().unwrap();
2875        serialize_snapshot_data_file_capped(
2876            &temp_dir.path().join("data-file"),
2877            expected_consumed_size,
2878            |stream| {
2879                serialize_into(stream, &expected_data)?;
2880                Ok(())
2881            },
2882        )
2883        .unwrap();
2884
2885        let snapshot_root_paths = SnapshotRootPaths {
2886            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2887            incremental_snapshot_root_file_path: None,
2888        };
2889
2890        let result = deserialize_snapshot_data_files_capped(
2891            &snapshot_root_paths,
2892            expected_consumed_size - 1,
2893            |stream| {
2894                Ok(deserialize_from::<_, u32>(
2895                    &mut stream.full_snapshot_stream,
2896                )?)
2897            },
2898        );
2899        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2900    }
2901
2902    #[test]
2903    fn test_deserialize_snapshot_data_file_extra_data() {
2904        let expected_data = 2323_u32;
2905        let expected_consumed_size = size_of::<u32>() as u64;
2906
2907        let temp_dir = tempfile::TempDir::new().unwrap();
2908        serialize_snapshot_data_file_capped(
2909            &temp_dir.path().join("data-file"),
2910            expected_consumed_size * 2,
2911            |stream| {
2912                serialize_into(stream.by_ref(), &expected_data)?;
2913                serialize_into(stream.by_ref(), &expected_data)?;
2914                Ok(())
2915            },
2916        )
2917        .unwrap();
2918
2919        let snapshot_root_paths = SnapshotRootPaths {
2920            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2921            incremental_snapshot_root_file_path: None,
2922        };
2923
2924        let result = deserialize_snapshot_data_files_capped(
2925            &snapshot_root_paths,
2926            expected_consumed_size * 2,
2927            |stream| {
2928                Ok(deserialize_from::<_, u32>(
2929                    &mut stream.full_snapshot_stream,
2930                )?)
2931            },
2932        );
2933        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2934    }
2935
2936    #[test]
2937    fn test_snapshot_version_from_file_under_limit() {
2938        let file_content = SnapshotVersion::default().as_str();
2939        let mut file = NamedTempFile::new().unwrap();
2940        file.write_all(file_content.as_bytes()).unwrap();
2941        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2942        assert_eq!(version_from_file, file_content);
2943    }
2944
2945    #[test]
2946    fn test_snapshot_version_from_file_over_limit() {
2947        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2948        let file_content = vec![7u8; over_limit_size];
2949        let mut file = NamedTempFile::new().unwrap();
2950        file.write_all(&file_content).unwrap();
2951        assert_matches!(
2952            snapshot_version_from_file(file.path()),
2953            Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2954        );
2955    }
2956
2957    #[test]
2958    fn test_parse_full_snapshot_archive_filename() {
2959        assert_eq!(
2960            parse_full_snapshot_archive_filename(&format!(
2961                "snapshot-42-{}.tar.bz2",
2962                Hash::default()
2963            ))
2964            .unwrap(),
2965            (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2966        );
2967        assert_eq!(
2968            parse_full_snapshot_archive_filename(&format!(
2969                "snapshot-43-{}.tar.zst",
2970                Hash::default()
2971            ))
2972            .unwrap(),
2973            (
2974                43,
2975                SnapshotHash(Hash::default()),
2976                ArchiveFormat::TarZstd {
2977                    config: ZstdConfig::default(),
2978                }
2979            )
2980        );
2981        assert_eq!(
2982            parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2983                .unwrap(),
2984            (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2985        );
2986        assert_eq!(
2987            parse_full_snapshot_archive_filename(&format!(
2988                "snapshot-45-{}.tar.lz4",
2989                Hash::default()
2990            ))
2991            .unwrap(),
2992            (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2993        );
2994
2995        assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2996        assert!(
2997            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2998        );
2999
3000        assert!(
3001            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
3002        );
3003        assert!(parse_full_snapshot_archive_filename(&format!(
3004            "snapshot-12345678-{}.bad!ext",
3005            Hash::new_unique()
3006        ))
3007        .is_err());
3008        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
3009
3010        assert!(parse_full_snapshot_archive_filename(&format!(
3011            "snapshot-bad!slot-{}.bad!ext",
3012            Hash::new_unique()
3013        ))
3014        .is_err());
3015        assert!(parse_full_snapshot_archive_filename(&format!(
3016            "snapshot-12345678-{}.bad!ext",
3017            Hash::new_unique()
3018        ))
3019        .is_err());
3020        assert!(parse_full_snapshot_archive_filename(&format!(
3021            "snapshot-bad!slot-{}.tar",
3022            Hash::new_unique()
3023        ))
3024        .is_err());
3025
3026        assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
3027        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
3028        assert!(parse_full_snapshot_archive_filename(&format!(
3029            "snapshot-bad!slot-{}.tar",
3030            Hash::new_unique()
3031        ))
3032        .is_err());
3033    }
3034
3035    #[test]
3036    fn test_parse_incremental_snapshot_archive_filename() {
3037        assert_eq!(
3038            parse_incremental_snapshot_archive_filename(&format!(
3039                "incremental-snapshot-42-123-{}.tar.bz2",
3040                Hash::default()
3041            ))
3042            .unwrap(),
3043            (
3044                42,
3045                123,
3046                SnapshotHash(Hash::default()),
3047                ArchiveFormat::TarBzip2
3048            )
3049        );
3050        assert_eq!(
3051            parse_incremental_snapshot_archive_filename(&format!(
3052                "incremental-snapshot-43-234-{}.tar.zst",
3053                Hash::default()
3054            ))
3055            .unwrap(),
3056            (
3057                43,
3058                234,
3059                SnapshotHash(Hash::default()),
3060                ArchiveFormat::TarZstd {
3061                    config: ZstdConfig::default(),
3062                }
3063            )
3064        );
3065        assert_eq!(
3066            parse_incremental_snapshot_archive_filename(&format!(
3067                "incremental-snapshot-44-345-{}.tar",
3068                Hash::default()
3069            ))
3070            .unwrap(),
3071            (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
3072        );
3073        assert_eq!(
3074            parse_incremental_snapshot_archive_filename(&format!(
3075                "incremental-snapshot-45-456-{}.tar.lz4",
3076                Hash::default()
3077            ))
3078            .unwrap(),
3079            (
3080                45,
3081                456,
3082                SnapshotHash(Hash::default()),
3083                ArchiveFormat::TarLz4
3084            )
3085        );
3086
3087        assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
3088        assert!(parse_incremental_snapshot_archive_filename(&format!(
3089            "snapshot-42-{}.tar",
3090            Hash::new_unique()
3091        ))
3092        .is_err());
3093        assert!(parse_incremental_snapshot_archive_filename(
3094            "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
3095        )
3096        .is_err());
3097
3098        assert!(parse_incremental_snapshot_archive_filename(&format!(
3099            "incremental-snapshot-bad!slot-56785678-{}.tar",
3100            Hash::new_unique()
3101        ))
3102        .is_err());
3103
3104        assert!(parse_incremental_snapshot_archive_filename(&format!(
3105            "incremental-snapshot-12345678-bad!slot-{}.tar",
3106            Hash::new_unique()
3107        ))
3108        .is_err());
3109
3110        assert!(parse_incremental_snapshot_archive_filename(
3111            "incremental-snapshot-12341234-56785678-bad!HASH.tar"
3112        )
3113        .is_err());
3114
3115        assert!(parse_incremental_snapshot_archive_filename(&format!(
3116            "incremental-snapshot-12341234-56785678-{}.bad!ext",
3117            Hash::new_unique()
3118        ))
3119        .is_err());
3120    }
3121
3122    #[test]
3123    fn test_check_are_snapshots_compatible() {
3124        let slot1: Slot = 1234;
3125        let slot2: Slot = 5678;
3126        let slot3: Slot = 999_999;
3127
3128        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
3129            format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
3130        ))
3131        .unwrap();
3132
3133        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None).is_ok());
3134
3135        let incremental_snapshot_archive_info =
3136            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
3137                "/dir/incremental-snapshot-{}-{}-{}.tar",
3138                slot1,
3139                slot2,
3140                Hash::new_unique()
3141            )))
3142            .unwrap();
3143
3144        assert!(check_are_snapshots_compatible(
3145            &full_snapshot_archive_info,
3146            Some(&incremental_snapshot_archive_info)
3147        )
3148        .is_ok());
3149
3150        let incremental_snapshot_archive_info =
3151            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
3152                "/dir/incremental-snapshot-{}-{}-{}.tar",
3153                slot2,
3154                slot3,
3155                Hash::new_unique()
3156            )))
3157            .unwrap();
3158
3159        assert!(check_are_snapshots_compatible(
3160            &full_snapshot_archive_info,
3161            Some(&incremental_snapshot_archive_info)
3162        )
3163        .is_err());
3164    }
3165
3166    /// A test helper function that creates bank snapshot files
3167    fn common_create_bank_snapshot_files(
3168        bank_snapshots_dir: &Path,
3169        min_slot: Slot,
3170        max_slot: Slot,
3171    ) {
3172        for slot in min_slot..max_slot {
3173            let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
3174            fs::create_dir_all(&snapshot_dir).unwrap();
3175
3176            let snapshot_filename = get_snapshot_file_name(slot);
3177            let snapshot_path = snapshot_dir.join(snapshot_filename);
3178            fs::File::create(snapshot_path).unwrap();
3179
3180            let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
3181            fs::File::create(status_cache_file).unwrap();
3182
3183            let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
3184            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
3185
3186            // Mark this directory complete so it can be used.  Loading code checks this flag before selecting a snapshot for deserialization.
3187            write_snapshot_state_complete_file(snapshot_dir).unwrap();
3188        }
3189    }
3190
3191    #[test]
3192    fn test_get_bank_snapshots() {
3193        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3194        let min_slot = 10;
3195        let max_slot = 20;
3196        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3197
3198        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
3199        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
3200    }
3201
3202    #[test]
3203    fn test_get_highest_bank_snapshot_post() {
3204        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3205        let min_slot = 99;
3206        let max_slot = 123;
3207        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3208
3209        let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
3210        assert!(highest_bank_snapshot.is_some());
3211        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
3212    }
3213
3214    /// A test helper function that creates full and incremental snapshot archive files.  Creates
3215    /// full snapshot files in the range [`min_full_snapshot_slot`, `max_full_snapshot_slot`), and
3216    /// incremental snapshot files in the range [`min_incremental_snapshot_slot`,
3217    /// `max_incremental_snapshot_slot`).  Additionally, "bad" files are created for both full and
3218    /// incremental snapshots to ensure the tests properly filter them out.
3219    fn common_create_snapshot_archive_files(
3220        full_snapshot_archives_dir: &Path,
3221        incremental_snapshot_archives_dir: &Path,
3222        min_full_snapshot_slot: Slot,
3223        max_full_snapshot_slot: Slot,
3224        min_incremental_snapshot_slot: Slot,
3225        max_incremental_snapshot_slot: Slot,
3226    ) {
3227        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3228        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3229        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3230            for incremental_snapshot_slot in
3231                min_incremental_snapshot_slot..max_incremental_snapshot_slot
3232            {
3233                let snapshot_filename = format!(
3234                    "incremental-snapshot-{}-{}-{}.tar",
3235                    full_snapshot_slot,
3236                    incremental_snapshot_slot,
3237                    Hash::default()
3238                );
3239                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3240                fs::File::create(snapshot_filepath).unwrap();
3241            }
3242
3243            let snapshot_filename =
3244                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3245            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3246            fs::File::create(snapshot_filepath).unwrap();
3247
3248            // Add in an incremental snapshot with a bad filename and high slot to ensure filenames are filtered and sorted correctly
3249            let bad_filename = format!(
3250                "incremental-snapshot-{}-{}-bad!hash.tar",
3251                full_snapshot_slot,
3252                max_incremental_snapshot_slot + 1,
3253            );
3254            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3255            fs::File::create(bad_filepath).unwrap();
3256        }
3257
3258        // Add in a snapshot with a bad filename and high slot to ensure filenames are filtered and
3259        // sorted correctly
3260        let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3261        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3262        fs::File::create(bad_filepath).unwrap();
3263    }
3264
3265    #[test]
3266    fn test_get_full_snapshot_archives() {
3267        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3268        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3269        let min_slot = 123;
3270        let max_slot = 456;
3271        common_create_snapshot_archive_files(
3272            full_snapshot_archives_dir.path(),
3273            incremental_snapshot_archives_dir.path(),
3274            min_slot,
3275            max_slot,
3276            0,
3277            0,
3278        );
3279
3280        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3281        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3282    }
3283
3284    #[test]
3285    fn test_get_full_snapshot_archives_remote() {
3286        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3287        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3288        let min_slot = 123;
3289        let max_slot = 456;
3290        common_create_snapshot_archive_files(
3291            &full_snapshot_archives_dir
3292                .path()
3293                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3294            &incremental_snapshot_archives_dir
3295                .path()
3296                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3297            min_slot,
3298            max_slot,
3299            0,
3300            0,
3301        );
3302
3303        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3304        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3305        assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3306    }
3307
3308    #[test]
3309    fn test_get_incremental_snapshot_archives() {
3310        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3311        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3312        let min_full_snapshot_slot = 12;
3313        let max_full_snapshot_slot = 23;
3314        let min_incremental_snapshot_slot = 34;
3315        let max_incremental_snapshot_slot = 45;
3316        common_create_snapshot_archive_files(
3317            full_snapshot_archives_dir.path(),
3318            incremental_snapshot_archives_dir.path(),
3319            min_full_snapshot_slot,
3320            max_full_snapshot_slot,
3321            min_incremental_snapshot_slot,
3322            max_incremental_snapshot_slot,
3323        );
3324
3325        let incremental_snapshot_archives =
3326            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3327        assert_eq!(
3328            incremental_snapshot_archives.len() as Slot,
3329            (max_full_snapshot_slot - min_full_snapshot_slot)
3330                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3331        );
3332    }
3333
3334    #[test]
3335    fn test_get_incremental_snapshot_archives_remote() {
3336        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3337        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3338        let min_full_snapshot_slot = 12;
3339        let max_full_snapshot_slot = 23;
3340        let min_incremental_snapshot_slot = 34;
3341        let max_incremental_snapshot_slot = 45;
3342        common_create_snapshot_archive_files(
3343            &full_snapshot_archives_dir
3344                .path()
3345                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3346            &incremental_snapshot_archives_dir
3347                .path()
3348                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3349            min_full_snapshot_slot,
3350            max_full_snapshot_slot,
3351            min_incremental_snapshot_slot,
3352            max_incremental_snapshot_slot,
3353        );
3354
3355        let incremental_snapshot_archives =
3356            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3357        assert_eq!(
3358            incremental_snapshot_archives.len() as Slot,
3359            (max_full_snapshot_slot - min_full_snapshot_slot)
3360                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3361        );
3362        assert!(incremental_snapshot_archives
3363            .iter()
3364            .all(|info| info.is_remote()));
3365    }
3366
3367    #[test]
3368    fn test_get_highest_full_snapshot_archive_slot() {
3369        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3370        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3371        let min_slot = 123;
3372        let max_slot = 456;
3373        common_create_snapshot_archive_files(
3374            full_snapshot_archives_dir.path(),
3375            incremental_snapshot_archives_dir.path(),
3376            min_slot,
3377            max_slot,
3378            0,
3379            0,
3380        );
3381
3382        assert_eq!(
3383            get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3384            Some(max_slot - 1)
3385        );
3386    }
3387
3388    #[test]
3389    fn test_get_highest_incremental_snapshot_slot() {
3390        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3391        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3392        let min_full_snapshot_slot = 12;
3393        let max_full_snapshot_slot = 23;
3394        let min_incremental_snapshot_slot = 34;
3395        let max_incremental_snapshot_slot = 45;
3396        common_create_snapshot_archive_files(
3397            full_snapshot_archives_dir.path(),
3398            incremental_snapshot_archives_dir.path(),
3399            min_full_snapshot_slot,
3400            max_full_snapshot_slot,
3401            min_incremental_snapshot_slot,
3402            max_incremental_snapshot_slot,
3403        );
3404
3405        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3406            assert_eq!(
3407                get_highest_incremental_snapshot_archive_slot(
3408                    incremental_snapshot_archives_dir.path(),
3409                    full_snapshot_slot
3410                ),
3411                Some(max_incremental_snapshot_slot - 1)
3412            );
3413        }
3414
3415        assert_eq!(
3416            get_highest_incremental_snapshot_archive_slot(
3417                incremental_snapshot_archives_dir.path(),
3418                max_full_snapshot_slot
3419            ),
3420            None
3421        );
3422    }
3423
3424    fn common_test_purge_old_snapshot_archives(
3425        snapshot_names: &[&String],
3426        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3427        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3428        expected_snapshots: &[&String],
3429    ) {
3430        let temp_snap_dir = tempfile::TempDir::new().unwrap();
3431
3432        for snap_name in snapshot_names {
3433            let snap_path = temp_snap_dir.path().join(snap_name);
3434            let mut _snap_file = fs::File::create(snap_path);
3435        }
3436        purge_old_snapshot_archives(
3437            temp_snap_dir.path(),
3438            temp_snap_dir.path(),
3439            maximum_full_snapshot_archives_to_retain,
3440            maximum_incremental_snapshot_archives_to_retain,
3441        );
3442
3443        let mut retained_snaps = HashSet::new();
3444        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3445            let entry_path_buf = entry.unwrap().path();
3446            let entry_path = entry_path_buf.as_path();
3447            let snapshot_name = entry_path
3448                .file_name()
3449                .unwrap()
3450                .to_str()
3451                .unwrap()
3452                .to_string();
3453            retained_snaps.insert(snapshot_name);
3454        }
3455
3456        for snap_name in expected_snapshots {
3457            assert!(
3458                retained_snaps.contains(snap_name.as_str()),
3459                "{snap_name} not found"
3460            );
3461        }
3462        assert_eq!(retained_snaps.len(), expected_snapshots.len());
3463    }
3464
3465    #[test]
3466    fn test_purge_old_full_snapshot_archives() {
3467        let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
3468        let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
3469        let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
3470        let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
3471
3472        // expecting only the newest to be retained
3473        let expected_snapshots = vec![&snap3_name];
3474        common_test_purge_old_snapshot_archives(
3475            &snapshot_names,
3476            NonZeroUsize::new(1).unwrap(),
3477            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3478            &expected_snapshots,
3479        );
3480
3481        // retaining 2, expecting the 2 newest to be retained
3482        let expected_snapshots = vec![&snap2_name, &snap3_name];
3483        common_test_purge_old_snapshot_archives(
3484            &snapshot_names,
3485            NonZeroUsize::new(2).unwrap(),
3486            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3487            &expected_snapshots,
3488        );
3489
3490        // retaining 3, all three should be retained
3491        let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
3492        common_test_purge_old_snapshot_archives(
3493            &snapshot_names,
3494            NonZeroUsize::new(3).unwrap(),
3495            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3496            &expected_snapshots,
3497        );
3498    }
3499
3500    /// Mimic a running node's behavior w.r.t. purging old snapshot archives.  Take snapshots in a
3501    /// loop, and periodically purge old snapshot archives.  After purging, check to make sure the
3502    /// snapshot archives on disk are correct.
3503    #[test]
3504    fn test_purge_old_full_snapshot_archives_in_the_loop() {
3505        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3506        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3507        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
3508        let starting_slot: Slot = 42;
3509
3510        for slot in (starting_slot..).take(100) {
3511            let full_snapshot_archive_file_name =
3512                format!("snapshot-{}-{}.tar", slot, Hash::default());
3513            let full_snapshot_archive_path = full_snapshot_archives_dir
3514                .as_ref()
3515                .join(full_snapshot_archive_file_name);
3516            fs::File::create(full_snapshot_archive_path).unwrap();
3517
3518            // don't purge-and-check until enough snapshot archives have been created
3519            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
3520                continue;
3521            }
3522
3523            // purge infrequently, so there will always be snapshot archives to purge
3524            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
3525                continue;
3526            }
3527
3528            purge_old_snapshot_archives(
3529                &full_snapshot_archives_dir,
3530                &incremental_snapshot_archives_dir,
3531                maximum_snapshots_to_retain,
3532                NonZeroUsize::new(usize::MAX).unwrap(),
3533            );
3534            let mut full_snapshot_archives =
3535                get_full_snapshot_archives(&full_snapshot_archives_dir);
3536            full_snapshot_archives.sort_unstable();
3537            assert_eq!(
3538                full_snapshot_archives.len(),
3539                maximum_snapshots_to_retain.get()
3540            );
3541            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
3542            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
3543                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
3544            }
3545        }
3546    }
3547
3548    #[test]
3549    fn test_purge_old_incremental_snapshot_archives() {
3550        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3551        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3552        let starting_slot = 100_000;
3553
3554        let maximum_incremental_snapshot_archives_to_retain =
3555            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3556        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3557
3558        let incremental_snapshot_interval = 100;
3559        let num_incremental_snapshots_per_full_snapshot =
3560            maximum_incremental_snapshot_archives_to_retain.get() * 2;
3561        let full_snapshot_interval =
3562            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3563
3564        let mut snapshot_filenames = vec![];
3565        (starting_slot..)
3566            .step_by(full_snapshot_interval)
3567            .take(
3568                maximum_full_snapshot_archives_to_retain
3569                    .checked_mul(NonZeroUsize::new(2).unwrap())
3570                    .unwrap()
3571                    .get(),
3572            )
3573            .for_each(|full_snapshot_slot| {
3574                let snapshot_filename =
3575                    format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3576                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3577                fs::File::create(snapshot_path).unwrap();
3578                snapshot_filenames.push(snapshot_filename);
3579
3580                (full_snapshot_slot..)
3581                    .step_by(incremental_snapshot_interval)
3582                    .take(num_incremental_snapshots_per_full_snapshot)
3583                    .skip(1)
3584                    .for_each(|incremental_snapshot_slot| {
3585                        let snapshot_filename = format!(
3586                            "incremental-snapshot-{}-{}-{}.tar",
3587                            full_snapshot_slot,
3588                            incremental_snapshot_slot,
3589                            Hash::default()
3590                        );
3591                        let snapshot_path = incremental_snapshot_archives_dir
3592                            .path()
3593                            .join(&snapshot_filename);
3594                        fs::File::create(snapshot_path).unwrap();
3595                        snapshot_filenames.push(snapshot_filename);
3596                    });
3597            });
3598
3599        purge_old_snapshot_archives(
3600            full_snapshot_archives_dir.path(),
3601            incremental_snapshot_archives_dir.path(),
3602            maximum_full_snapshot_archives_to_retain,
3603            maximum_incremental_snapshot_archives_to_retain,
3604        );
3605
3606        // Ensure correct number of full snapshot archives are purged/retained
3607        let mut remaining_full_snapshot_archives =
3608            get_full_snapshot_archives(full_snapshot_archives_dir.path());
3609        assert_eq!(
3610            remaining_full_snapshot_archives.len(),
3611            maximum_full_snapshot_archives_to_retain.get(),
3612        );
3613        remaining_full_snapshot_archives.sort_unstable();
3614        let latest_full_snapshot_archive_slot =
3615            remaining_full_snapshot_archives.last().unwrap().slot();
3616
3617        // Ensure correct number of incremental snapshot archives are purged/retained
3618        // For each additional full snapshot archive, one additional (the newest)
3619        // incremental snapshot archive is retained. This is accounted for by the
3620        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
3621        let mut remaining_incremental_snapshot_archives =
3622            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3623        assert_eq!(
3624            remaining_incremental_snapshot_archives.len(),
3625            maximum_incremental_snapshot_archives_to_retain
3626                .get()
3627                .saturating_add(
3628                    maximum_full_snapshot_archives_to_retain
3629                        .get()
3630                        .saturating_sub(1)
3631                )
3632        );
3633        remaining_incremental_snapshot_archives.sort_unstable();
3634        remaining_incremental_snapshot_archives.reverse();
3635
3636        // Ensure there exists one incremental snapshot for all but the latest full snapshot
3637        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3638            let incremental_snapshot_archive =
3639                remaining_incremental_snapshot_archives.pop().unwrap();
3640
3641            let expected_base_slot =
3642                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3643            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3644            let expected_slot = expected_base_slot
3645                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3646            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3647        }
3648
3649        // Ensure all remaining incremental snapshots are only for the latest full snapshot
3650        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3651            assert_eq!(
3652                incremental_snapshot_archive.base_slot(),
3653                latest_full_snapshot_archive_slot
3654            );
3655        }
3656
3657        // Ensure the remaining incremental snapshots are at the right slot
3658        let expected_remaining_incremental_snapshot_archive_slots =
3659            (latest_full_snapshot_archive_slot..)
3660                .step_by(incremental_snapshot_interval)
3661                .take(num_incremental_snapshots_per_full_snapshot)
3662                .skip(
3663                    num_incremental_snapshots_per_full_snapshot
3664                        - maximum_incremental_snapshot_archives_to_retain.get(),
3665                )
3666                .collect::<HashSet<_>>();
3667
3668        let actual_remaining_incremental_snapshot_archive_slots =
3669            remaining_incremental_snapshot_archives
3670                .iter()
3671                .map(|snapshot| snapshot.slot())
3672                .collect::<HashSet<_>>();
3673        assert_eq!(
3674            actual_remaining_incremental_snapshot_archive_slots,
3675            expected_remaining_incremental_snapshot_archive_slots
3676        );
3677    }
3678
3679    #[test]
3680    fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3681        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3682        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3683
3684        for snapshot_filenames in [
3685            format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
3686            format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
3687            format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
3688            format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
3689            format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
3690            format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
3691            format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
3692            format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
3693        ] {
3694            let snapshot_path = incremental_snapshot_archives_dir
3695                .path()
3696                .join(snapshot_filenames);
3697            fs::File::create(snapshot_path).unwrap();
3698        }
3699
3700        purge_old_snapshot_archives(
3701            full_snapshot_archives_dir.path(),
3702            incremental_snapshot_archives_dir.path(),
3703            NonZeroUsize::new(usize::MAX).unwrap(),
3704            NonZeroUsize::new(usize::MAX).unwrap(),
3705        );
3706
3707        let remaining_incremental_snapshot_archives =
3708            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3709        assert!(remaining_incremental_snapshot_archives.is_empty());
3710    }
3711
3712    #[test]
3713    fn test_get_snapshot_accounts_hardlink_dir() {
3714        let slot: Slot = 1;
3715
3716        let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3717
3718        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3719        let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3720        let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3721        fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3722
3723        let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3724        let appendvec_filename = format!("{slot}.0");
3725        let appendvec_path = accounts_dir.join(appendvec_filename);
3726
3727        let ret = get_snapshot_accounts_hardlink_dir(
3728            &appendvec_path,
3729            slot,
3730            &mut account_paths_set,
3731            &accounts_hardlinks_dir,
3732        );
3733        assert!(ret.is_ok());
3734
3735        let wrong_appendvec_path = appendvec_path
3736            .parent()
3737            .unwrap()
3738            .parent()
3739            .unwrap()
3740            .join(appendvec_path.file_name().unwrap());
3741        let ret = get_snapshot_accounts_hardlink_dir(
3742            &wrong_appendvec_path,
3743            slot,
3744            &mut account_paths_set,
3745            accounts_hardlinks_dir,
3746        );
3747
3748        assert_matches!(
3749            ret,
3750            Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3751        );
3752    }
3753
3754    #[test]
3755    fn test_get_snapshot_file_kind() {
3756        assert_eq!(None, get_snapshot_file_kind("file.txt"));
3757        assert_eq!(
3758            Some(SnapshotFileKind::Version),
3759            get_snapshot_file_kind(SNAPSHOT_VERSION_FILENAME)
3760        );
3761        assert_eq!(
3762            Some(SnapshotFileKind::BankFields),
3763            get_snapshot_file_kind("1234")
3764        );
3765        assert_eq!(
3766            Some(SnapshotFileKind::Storage),
3767            get_snapshot_file_kind("1000.999")
3768        );
3769    }
3770
3771    #[test]
3772    fn test_full_snapshot_slot_file_good() {
3773        let slot_written = 123_456_789;
3774        let bank_snapshot_dir = TempDir::new().unwrap();
3775        write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
3776
3777        let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
3778        assert_eq!(slot_read, slot_written);
3779    }
3780
3781    #[test]
3782    fn test_full_snapshot_slot_file_bad() {
3783        const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
3784        let too_small = [1u8; SLOT_SIZE - 1];
3785        let too_large = [1u8; SLOT_SIZE + 1];
3786
3787        for contents in [too_small.as_slice(), too_large.as_slice()] {
3788            let bank_snapshot_dir = TempDir::new().unwrap();
3789            let full_snapshot_slot_path = bank_snapshot_dir
3790                .as_ref()
3791                .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
3792            fs::write(full_snapshot_slot_path, contents).unwrap();
3793
3794            let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
3795            assert!(err
3796                .to_string()
3797                .starts_with("invalid full snapshot slot file size"));
3798        }
3799    }
3800}