solana_runtime/
snapshot_utils.rs

use {
    crate::{
        bank::{BankFieldsToSerialize, BankSlotDelta},
        serde_snapshot::{
            self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
        },
        snapshot_archive_info::{
            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
            SnapshotArchiveInfoGetter,
        },
        snapshot_bank_utils,
        snapshot_config::SnapshotConfig,
        snapshot_hash::SnapshotHash,
        snapshot_package::{SnapshotKind, SnapshotPackage},
        snapshot_utils::snapshot_storage_rebuilder::{
            RebuiltSnapshotStorage, SnapshotStorageRebuilder,
        },
    },
    bzip2::bufread::BzDecoder,
    crossbeam_channel::Sender,
    flate2::read::GzDecoder,
    lazy_static::lazy_static,
    log::*,
    regex::Regex,
    solana_accounts_db::{
        account_storage::{meta::StoredMetaWriteVersion, AccountStorageMap},
        accounts_db::{stats::BankHashStats, AccountStorageEntry, AtomicAccountsFileId},
        accounts_file::{AccountsFile, AccountsFileError, InternalsForArchive, StorageAccess},
        accounts_hash::{AccountsDeltaHash, AccountsHash},
        epoch_accounts_hash::EpochAccountsHash,
        hardened_unpack::{self, ParallelSelector, UnpackError},
        shared_buffer_reader::{SharedBuffer, SharedBufferReader},
        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
    },
    solana_measure::{measure::Measure, measure_time, measure_us},
    solana_sdk::{
        clock::{Epoch, Slot},
        hash::Hash,
    },
    std::{
        cmp::Ordering,
        collections::{HashMap, HashSet},
        fmt, fs,
        io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
        mem,
        num::NonZeroUsize,
        ops::RangeInclusive,
        path::{Path, PathBuf},
        process::ExitStatus,
        str::FromStr,
        sync::Arc,
        thread::{Builder, JoinHandle},
    },
    tar::{self, Archive},
    tempfile::TempDir,
    thiserror::Error,
};
#[cfg(feature = "dev-context-only-utils")]
use {
    hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
    solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
};

mod archive_format;
pub mod snapshot_storage_rebuilder;
pub use archive_format::*;

pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // bytes
const VERSION_STRING_V1_2_0: &str = "1.2.0";
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
// The following unsafes are
// - Safe because the values are fixed, known non-zero constants
// - Necessary in order to have a plain NonZeroUsize as the constant, since
//   NonZeroUsize::new() returns an Option<NonZeroUsize> and we can't .unwrap() at compile time
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    unsafe { NonZeroUsize::new_unchecked(2) };
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    unsafe { NonZeroUsize::new_unchecked(4) };
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
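
// Illustrative sketch (not part of the module's API): how the archive file name regexes above
// capture slot, hash, and extension.  The file name and hash string here are made up for
// demonstration; only the constants and the `regex` crate usage come from this file.
//
//     let re = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
//     let caps = re
//         .captures("snapshot-100-AvFf9oS8A8U95HnpZbNTFkyLruVjvTcKCHe8z9gbKz3N.tar.zst")
//         .unwrap();
//     assert_eq!(&caps["slot"], "100");
//     assert_eq!(&caps["ext"], "tar.zst");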

#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub enum SnapshotVersion {
    #[default]
    V1_2_0,
}

impl fmt::Display for SnapshotVersion {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(From::from(*self))
    }
}

impl From<SnapshotVersion> for &'static str {
    fn from(snapshot_version: SnapshotVersion) -> &'static str {
        match snapshot_version {
            SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
        }
    }
}

impl FromStr for SnapshotVersion {
    type Err = &'static str;

    fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
        // Remove leading 'v' or 'V' from slice
        let version_string = if version_string
            .get(..1)
            .map_or(false, |s| s.eq_ignore_ascii_case("v"))
        {
            &version_string[1..]
        } else {
            version_string
        };
        match version_string {
            VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
            _ => Err("unsupported snapshot version"),
        }
    }
}

impl SnapshotVersion {
    pub fn as_str(self) -> &'static str {
        <&str as From<Self>>::from(self)
    }
}
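
// Illustrative sketch: version strings parse with or without a leading 'v'/'V', and `as_str()`
// round-trips back to the canonical "1.2.0" string.  Purely an example; not part of the module.
//
//     assert_eq!(SnapshotVersion::from_str("v1.2.0"), Ok(SnapshotVersion::V1_2_0));
//     assert_eq!(SnapshotVersion::from_str("1.2.0"), Ok(SnapshotVersion::V1_2_0));
//     assert_eq!(SnapshotVersion::V1_2_0.as_str(), "1.2.0");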

/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
/// the kind of the snapshot.
#[derive(PartialEq, Eq, Debug)]
pub struct BankSnapshotInfo {
    /// Slot of the bank
    pub slot: Slot,
    /// Snapshot kind
    pub snapshot_kind: BankSnapshotKind,
    /// Path to the bank snapshot directory
    pub snapshot_dir: PathBuf,
    /// Snapshot version
    pub snapshot_version: SnapshotVersion,
}

impl PartialOrd for BankSnapshotInfo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Order BankSnapshotInfo by slot (ascending), which practically is sorting chronologically
impl Ord for BankSnapshotInfo {
    fn cmp(&self, other: &Self) -> Ordering {
        self.slot.cmp(&other.slot)
    }
}

impl BankSnapshotInfo {
    pub fn new_from_dir(
        bank_snapshots_dir: impl AsRef<Path>,
        slot: Slot,
    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
        // check this directory to see if there is a BankSnapshotPre and/or
        // BankSnapshotPost file
        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);

        if !bank_snapshot_dir.is_dir() {
            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
                bank_snapshot_dir,
            ));
        }

        // Among the file checks, the completion flag check should be done first to avoid
        // later I/O errors.

        // There is a time window between the slot directory being created and its contents being
        // completely written.  Check the completion flag to avoid using the highest found slot
        // directory while its contents are still missing.
        if !is_bank_snapshot_complete(&bank_snapshot_dir) {
            return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
        }

        let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
        if !status_cache_file.is_file() {
            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
                status_cache_file,
            ));
        }

        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
        let version_str = snapshot_version_from_file(&version_path).or(Err(
            SnapshotNewFromDirError::MissingVersionFile(version_path),
        ))?;
        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;

        let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
        let bank_snapshot_pre_path =
            bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);

        // NOTE: It is important that checking for "Pre" happens before "Post".
        //
        // Consider the scenario where AccountsHashVerifier is actively processing an
        // AccountsPackage for a snapshot/slot; if AHV is in the middle of reserializing the
        // bank snapshot file (writing the new "Post" file), and then the process dies,
        // there will be an incomplete "Post" file on disk.  We do not want only the existence of
        // this "Post" file to be sufficient for deciding the snapshot kind as "Post".  Moreover,
        // "Post" *requires* the *absence* of a "Pre" file.
        let snapshot_kind = if bank_snapshot_pre_path.is_file() {
            BankSnapshotKind::Pre
        } else if bank_snapshot_post_path.is_file() {
            BankSnapshotKind::Post
        } else {
            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
                bank_snapshot_dir,
            ));
        };

        Ok(BankSnapshotInfo {
            slot,
            snapshot_kind,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
        })
    }

    pub fn snapshot_path(&self) -> PathBuf {
        let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));

        let ext = match self.snapshot_kind {
            BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
            BankSnapshotKind::Post => "",
        };
        bank_snapshot_path.set_extension(ext);

        bank_snapshot_path
    }
}
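
// Illustrative sketch of the path naming above (slot and directory are hypothetical, and it is
// assumed that get_snapshot_file_name(slot) renders the slot as a plain string): a "Pre"
// snapshot for slot 100 resolves to ".../100/100.pre", while a "Post" one resolves to
// ".../100/100" with no extension.
//
//     let info = BankSnapshotInfo {
//         slot: 100,
//         snapshot_kind: BankSnapshotKind::Pre,
//         snapshot_dir: PathBuf::from("/ledger/snapshot/100"),
//         snapshot_version: SnapshotVersion::V1_2_0,
//     };
//     assert!(info.snapshot_path().ends_with("100/100.pre"));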

/// Bank snapshots traditionally had their accounts hash calculated prior to serialization.  Since
/// the hash calculation takes a long time, an optimization has been put in to offload the accounts
/// hash calculation.  The bank serialization format has not changed, so we need another way to
/// identify if a bank snapshot contains the calculated accounts hash or not.
///
/// When a bank snapshot is first taken, it does not have the calculated accounts hash.  It is said
/// that this bank snapshot is "pre" accounts hash.  Later, when the accounts hash is calculated,
/// the bank snapshot is re-serialized, and is now "post" accounts hash.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BankSnapshotKind {
    /// This bank snapshot has *not* yet had its accounts hash calculated
    Pre,
    /// This bank snapshot *has* had its accounts hash calculated
    Post,
}

/// When constructing a bank from a snapshot, traditionally the snapshot was from a snapshot archive.  Now,
/// the snapshot can be from a snapshot directory, or from a snapshot archive.  This is the flag to
/// indicate which.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// Build from the snapshot archive
    Archive,
    /// Build directly from the bank snapshot directory
    Dir,
}

/// Helper type when rebuilding from snapshots.  Designed to handle rebuilding from just a
/// full snapshot, or from both a full snapshot and an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    pub full_snapshot_root_file_path: PathBuf,
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}

/// Helper type to bundle up the results from `unarchive_snapshot()`
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    #[allow(dead_code)]
    unpack_dir: TempDir,
    pub storage: AccountStorageMap,
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    pub measure_untar: Measure,
}

/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    pub unpacked_snapshots_dir: PathBuf,
    pub snapshot_version: SnapshotVersion,
}

/// Helper type for passing around account storage map and next append vec id
/// for reconstructing accounts from a snapshot
pub(crate) struct StorageAndNextAccountsFileId {
    pub storage: AccountStorageMap,
    pub next_append_vec_id: AtomicAccountsFileId,
}

#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    #[error("snapshot has mismatch: deserialized bank: {0:?}, snapshot archive info: {1:?}")]
    MismatchedSlotHash((Slot, SnapshotHash), (Slot, SnapshotHash)),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}

#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}

pub type Result<T> = std::result::Result<T, SnapshotError>;

/// Errors that can happen in `verify_slot_deltas()`
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("slot history is bad and cannot be used to verify slot deltas")]
    BadSlotHistory,
}

/// Errors that can happen in `verify_epoch_stakes()`
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}

/// Errors that can happen in `add_bank_snapshot()`
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
    CreateStateCompleteFile(#[source] IoError, PathBuf),
}

/// Errors that can happen in `archive_snapshot_package()`
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),
}

/// Errors that can happen in `hard_link_storages_to_snapshot()`
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}

/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}

/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
/// from <account_path>/run taken at snapshot <slot> time.  They are referenced by the symlinks from the
/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We observed that sometimes the bank snapshot dir
/// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
/// or some legacy code not using the symlinks to clean up the account snapshot hardlink directories.
/// This function cleans up any account snapshot directories that are no longer referenced by the bank
/// snapshot dirs, to ensure proper snapshot operations.
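///
/// Illustrative layout (the paths are hypothetical; only the directory names come from the
/// constants in this module):
///
/// ```text
/// <accounts_path>/run/...                                 operational account files
/// <accounts_path>/snapshot/<slot>/...                     hardlinked copies taken for <slot>
/// <bank_snapshots_dir>/<slot>/accounts_hardlinks/<N>  ->  symlink to <accounts_path>/snapshot/<slot>
/// ```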
pub fn clean_orphaned_account_snapshot_dirs(
    bank_snapshots_dir: impl AsRef<Path>,
    account_snapshot_paths: &[PathBuf],
) -> IoResult<()> {
    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
    let mut account_snapshot_dirs_referenced = HashSet::new();
    let snapshots = get_bank_snapshots(bank_snapshots_dir);
    for snapshot in snapshots {
        let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
        let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
            IoError::other(format!(
                "failed to read account hardlinks dir '{}': {err}",
                account_hardlinks_dir.display(),
            ))
        })?;
        for entry in read_dir {
            let path = entry?.path();
            let target = fs::read_link(&path).map_err(|err| {
                IoError::other(format!(
                    "failed to read symlink '{}': {err}",
                    path.display(),
                ))
            })?;
            account_snapshot_dirs_referenced.insert(target);
        }
    }

    // loop through the account snapshot hardlink directories; if a directory is not in the
    // account_snapshot_dirs_referenced set, delete it
    for account_snapshot_path in account_snapshot_paths {
        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        for entry in read_dir {
            let path = entry?.path();
            if !account_snapshot_dirs_referenced.contains(&path) {
                info!(
                    "Removing orphaned account snapshot hardlink directory '{}'...",
                    path.display()
                );
                move_and_async_delete_path(&path);
            }
        }
    }

    Ok(())
}

/// Purges incomplete bank snapshots
pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
        // If we cannot read the bank snapshots dir, then there's nothing to do
        return;
    };

    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);

    let incomplete_dirs: Vec<_> = read_dir_iter
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|path| path.is_dir())
        .filter(is_incomplete)
        .collect();

    // attempt to purge all the incomplete directories; do not exit early
    for incomplete_dir in incomplete_dirs {
        let result = purge_bank_snapshot(&incomplete_dir);
        match result {
            Ok(_) => info!(
                "Purged incomplete snapshot dir: {}",
                incomplete_dir.display()
            ),
            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
        }
    }
}

/// Is the bank snapshot complete?
fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
    let state_complete_path = bank_snapshot_dir
        .as_ref()
        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    state_complete_path.is_file()
}

/// Writes the full snapshot slot file into the bank snapshot dir
pub fn write_full_snapshot_slot_file(
    bank_snapshot_dir: impl AsRef<Path>,
    full_snapshot_slot: Slot,
) -> IoResult<()> {
    let full_snapshot_slot_path = bank_snapshot_dir
        .as_ref()
        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
    fs::write(
        &full_snapshot_slot_path,
        Slot::to_le_bytes(full_snapshot_slot),
    )
    .map_err(|err| {
        IoError::other(format!(
            "failed to write full snapshot slot file '{}': {err}",
            full_snapshot_slot_path.display(),
        ))
    })
}

/// Reads the full snapshot slot file from the bank snapshot dir
pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
    let full_snapshot_slot_path = bank_snapshot_dir
        .as_ref()
        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
    let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
    if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
        let error_message = format!(
            "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
            full_snapshot_slot_path.display(),
            full_snapshot_slot_file_metadata.len(),
            SLOT_SIZE,
        );
        return Err(IoError::other(error_message));
    }
    let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
    let mut buffer = [0; SLOT_SIZE];
    full_snapshot_slot_file.read_exact(&mut buffer)?;
    let slot = Slot::from_le_bytes(buffer);
    Ok(slot)
}
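
// Illustrative sketch: the full snapshot slot file is just the slot as 8 little-endian bytes,
// so a write followed by a read round-trips.  The `TempDir` here is only for the example.
//
//     let dir = TempDir::new().unwrap();
//     write_full_snapshot_slot_file(dir.path(), 12345).unwrap();
//     assert_eq!(read_full_snapshot_slot_file(dir.path()).unwrap(), 12345);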

/// Gets the highest, loadable, bank snapshot
///
/// The highest bank snapshot is the one with the highest slot.
/// To be loadable, the bank snapshot must be a BankSnapshotKind::Post.
/// And if we're generating snapshots (e.g. running a normal validator), then
/// the full snapshot file's slot must match the highest full snapshot archive's.
pub fn get_highest_loadable_bank_snapshot(
    snapshot_config: &SnapshotConfig,
) -> Option<BankSnapshotInfo> {
    let highest_bank_snapshot =
        get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;

    // If we're *not* generating snapshots, e.g. running ledger-tool, then we *can* load
    // this bank snapshot, and we do not need to check for anything else.
    if !snapshot_config.should_generate_snapshots() {
        return Some(highest_bank_snapshot);
    }

    // Otherwise, the bank snapshot's full snapshot slot *must* be the same as
    // the highest full snapshot archive's slot.
    let highest_full_snapshot_archive_slot =
        get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
    let full_snapshot_file_slot =
        read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
    (full_snapshot_file_slot == highest_full_snapshot_archive_slot).then_some(highest_bank_snapshot)
}

/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
/// directory won't be cleaned up.  Call this function to clean up any leftovers.
pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
        for entry in entries.flatten() {
            if entry
                .file_name()
                .to_str()
                .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
                .unwrap_or(false)
            {
                let path = entry.path();
                let result = if path.is_dir() {
                    fs::remove_dir_all(&path)
                } else {
                    fs::remove_file(&path)
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove temporary snapshot archive '{}': {err}",
                        path.display(),
                    );
                }
            }
        }
    }
}

/// Serializes and archives a snapshot package
pub fn serialize_and_archive_snapshot_package(
    snapshot_package: SnapshotPackage,
    snapshot_config: &SnapshotConfig,
) -> Result<SnapshotArchiveInfo> {
    let SnapshotPackage {
        snapshot_kind,
        slot: snapshot_slot,
        block_height,
        hash: snapshot_hash,
        mut snapshot_storages,
        status_cache_slot_deltas,
        bank_fields_to_serialize,
        bank_hash_stats,
        accounts_delta_hash,
        accounts_hash,
        epoch_accounts_hash,
        bank_incremental_snapshot_persistence,
        write_version,
        enqueued: _,
    } = snapshot_package;

    let bank_snapshot_info = serialize_snapshot(
        &snapshot_config.bank_snapshots_dir,
        snapshot_config.snapshot_version,
        snapshot_storages.as_slice(),
        status_cache_slot_deltas.as_slice(),
        bank_fields_to_serialize,
        bank_hash_stats,
        accounts_delta_hash,
        accounts_hash,
        epoch_accounts_hash,
        bank_incremental_snapshot_persistence.as_ref(),
        write_version,
    )?;

    // now write the full snapshot slot file after serializing so this bank snapshot is loadable
    let full_snapshot_archive_slot = match snapshot_kind {
        SnapshotKind::FullSnapshot => snapshot_slot,
        SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
    };
    write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
        .map_err(|err| {
            IoError::other(format!(
                "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
            ))
        })?;

    // Use the destructured bindings below; `snapshot_package` itself was moved out above.
    let snapshot_archive_path = match snapshot_kind {
        SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
            &snapshot_config.full_snapshot_archives_dir,
            snapshot_slot,
            &snapshot_hash,
            snapshot_config.archive_format,
        ),
        SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
            // After the snapshot has been serialized, it is now safe (and required) to prune all
            // the storages that are *not* to be archived for this incremental snapshot.
            snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
            build_incremental_snapshot_archive_path(
                &snapshot_config.incremental_snapshot_archives_dir,
                incremental_snapshot_base_slot,
                snapshot_slot,
                &snapshot_hash,
                snapshot_config.archive_format,
            )
        }
    };

    let snapshot_archive_info = archive_snapshot(
        snapshot_kind,
        snapshot_slot,
        snapshot_hash,
        snapshot_storages.as_slice(),
        &bank_snapshot_info.snapshot_dir,
        snapshot_archive_path,
        snapshot_config.archive_format,
    )?;

    Ok(snapshot_archive_info)
}

/// Serializes a snapshot into `bank_snapshots_dir`
#[allow(clippy::too_many_arguments)]
fn serialize_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    snapshot_version: SnapshotVersion,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    slot_deltas: &[BankSlotDelta],
    mut bank_fields: BankFieldsToSerialize,
    bank_hash_stats: BankHashStats,
    accounts_delta_hash: AccountsDeltaHash,
    accounts_hash: AccountsHash,
    epoch_accounts_hash: Option<EpochAccountsHash>,
    bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
    write_version: StoredMetaWriteVersion,
) -> Result<BankSnapshotInfo> {
    let slot = bank_fields.slot;

    // this closure facilitates converting between
    // the AddBankSnapshotError and SnapshotError types
    let do_serialize_snapshot = || {
        let mut measure_everything = Measure::start("");
        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
        if bank_snapshot_dir.exists() {
            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
                bank_snapshot_dir,
            ));
        }
        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
        })?;

        // the bank snapshot is stored as bank_snapshots_dir/slot/slot
        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
        info!(
            "Creating bank snapshot for slot {slot} at '{}'",
            bank_snapshot_path.display(),
        );

        let (_, flush_storages_us) = measure_us!({
            for storage in snapshot_storages {
                storage.flush().map_err(|err| {
                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
                })?;
            }
        });

        // The snapshot directory must contain the full snapshot state so that a bank can be
        // constructed directly from it; it acts like an archive of the full state.  The set of
        // account storage files is a necessary part of this state, so hard-link them from the
        // operational accounts/ directory into the snapshot directory.
        let (_, hard_link_storages_us) = measure_us!(hard_link_storages_to_snapshot(
            &bank_snapshot_dir,
            slot,
            snapshot_storages
        )
        .map_err(AddBankSnapshotError::HardLinkStorages)?);

        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
            let extra_fields = ExtraFieldsToSerialize {
                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
                incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
                epoch_accounts_hash,
                versioned_epoch_stakes,
            };
            serde_snapshot::serialize_bank_snapshot_into(
                stream,
                bank_fields,
                bank_hash_stats,
                accounts_delta_hash,
                accounts_hash,
                &get_storages_to_serialize(snapshot_storages),
                extra_fields,
                write_version,
            )?;
            Ok(())
        };
        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
            "bank serialize"
        );

        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
            snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
        );

        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
        let (_, write_version_file_us) = measure_us!(fs::write(
            &version_path,
            snapshot_version.as_str().as_bytes(),
        )
        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);

        // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
        let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
        let (_, write_state_complete_file_us) = measure_us!(fs::File::create(&state_complete_path)
            .map_err(|err| {
                AddBankSnapshotError::CreateStateCompleteFile(err, state_complete_path)
            })?);

        measure_everything.stop();

        // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
        datapoint_info!(
            "snapshot_bank",
            ("slot", slot, i64),
            ("bank_size", bank_snapshot_consumed_size, i64),
            ("status_cache_size", status_cache_consumed_size, i64),
            ("flush_storages_us", flush_storages_us, i64),
            ("hard_link_storages_us", hard_link_storages_us, i64),
            ("bank_serialize_us", bank_serialize.as_us(), i64),
            ("status_cache_serialize_us", status_cache_serialize_us, i64),
            ("write_version_file_us", write_version_file_us, i64),
            (
                "write_state_complete_file_us",
                write_state_complete_file_us,
                i64
            ),
            ("total_us", measure_everything.as_us(), i64),
        );

        info!(
            "{} for slot {} at {}",
            bank_serialize,
            slot,
            bank_snapshot_path.display(),
        );

        Ok(BankSnapshotInfo {
            slot,
            snapshot_kind: BankSnapshotKind::Pre,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
        })
    };

    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
}

/// Archives a snapshot into `archive_path`
fn archive_snapshot(
    snapshot_kind: SnapshotKind,
    snapshot_slot: Slot,
    snapshot_hash: SnapshotHash,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    bank_snapshot_dir: impl AsRef<Path>,
    archive_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
) -> Result<SnapshotArchiveInfo> {
    use ArchiveSnapshotPackageError as E;
    const SNAPSHOTS_DIR: &str = "snapshots";
    const ACCOUNTS_DIR: &str = "accounts";
    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    let tar_dir = archive_path
        .as_ref()
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

    // Create the staging directories
    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
    let staging_dir = tempfile::Builder::new()
        .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
        .tempdir_in(tar_dir)
        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);

    let slot_str = snapshot_slot.to_string();
    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
    // Creates staging snapshots/<slot>/
    fs::create_dir_all(&staging_snapshot_dir)
        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

    // To be a source for symlinking and archiving, the path needs to be an absolute path
    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
    })?;
    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
    let src_snapshot_file = src_snapshot_dir.join(slot_str);
    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
    // like in the snapshot dir.
    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    symlink::symlink_file(&src_status_cache, &staging_status_cache)
        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

    // The bank snapshot has the version file, so symlink it to the correct staging path
    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
    })?;

    // Tar the staging directory into the archive at `staging_archive_path`
    let staging_archive_path = tar_dir.join(format!(
        "{}{}.{}",
        staging_dir_prefix,
        snapshot_slot,
        archive_format.extension(),
    ));

    {
        let mut archive_file = fs::File::create(&staging_archive_path)
            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;

        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
            let mut archive = tar::Builder::new(encoder);
            // Disable sparse file handling.  This seems to be the root cause of an issue when
            // upgrading v2.0 to v2.1, and the tar crate from 0.4.41 to 0.4.42.
            // Since the tarball will still go through compression (zstd/etc) afterwards, disabling
            // sparse handling in the tar itself should be fine.
            //
            // Likely introduced in [^1].  Tracking resolution in [^2].
            // [^1] https://github.com/alexcrichton/tar-rs/pull/375
            // [^2] https://github.com/alexcrichton/tar-rs/issues/403
            archive.sparse(false);
            // Serialize the version and snapshots files before accounts so we can quickly determine the version
            // and other bank fields.  This is necessary if we want to interleave unpacking with reconstruction.
            archive
                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
                .map_err(E::ArchiveVersionFile)?;
            archive
                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
                .map_err(E::ArchiveSnapshotsDir)?;

            for storage in snapshot_storages {
                let path_in_archive = Path::new(ACCOUNTS_DIR)
                    .join(AccountsFile::file_name(storage.slot(), storage.id()));
                match storage.accounts.internals_for_archive() {
                    InternalsForArchive::Mmap(data) => {
                        let mut header = tar::Header::new_gnu();
                        header.set_path(path_in_archive).map_err(|err| {
                            E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                        })?;
                        header.set_size(storage.capacity());
                        header.set_cksum();
                        archive.append(&header, data)
                    }
                    InternalsForArchive::FileIo(path) => {
                        archive.append_path_with_name(path, path_in_archive)
                    }
                }
                .map_err(|err| E::ArchiveAccountStorageFile(err, storage.path().to_path_buf()))?;
            }

            archive.into_inner().map_err(E::FinishArchive)?;
            Ok(())
        };

        match archive_format {
            ArchiveFormat::TarBzip2 => {
                let mut encoder =
                    bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarGzip => {
                let mut encoder =
                    flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarZstd { config } => {
                let mut encoder =
                    zstd::stream::Encoder::new(archive_file, config.compression_level)
                        .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarLz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(1)
                    .build(archive_file)
                    .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                let (_output, result) = encoder.finish();
                result.map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::Tar => {
                do_archive_files(&mut archive_file)?;
            }
        };
    }

    // Atomically move the archive into position for other validators to find
    let metadata = fs::metadata(&staging_archive_path)
        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
    let archive_path = archive_path.as_ref().to_path_buf();
    fs::rename(&staging_archive_path, &archive_path)
        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;

    timer.stop();
    info!(
        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
        archive_path.display(),
        snapshot_slot,
        timer.as_ms(),
        metadata.len()
    );

    datapoint_info!(
        "archive-snapshot-package",
        ("slot", snapshot_slot, i64),
        ("archive_format", archive_format.to_string(), String),
        ("duration_ms", timer.as_ms(), i64),
        (
            if snapshot_kind.is_full_snapshot() {
                "full-snapshot-archive-size"
            } else {
                "incremental-snapshot-archive-size"
            },
            metadata.len(),
            i64
        ),
    );
    Ok(SnapshotArchiveInfo {
        path: archive_path,
        slot: snapshot_slot,
        hash: snapshot_hash,
        archive_format,
    })
}
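
// Illustrative sketch of the resulting archive layout (entries are hypothetical; the directory
// and file names come from the constants used in archive_snapshot() above):
//
//     version                    snapshot version string (e.g. "1.2.0")
//     snapshots/status_cache     serialized status cache
//     snapshots/<slot>/<slot>    serialized bank fields
//     accounts/<slot>.<id>       one entry per account storage file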

/// Get the bank snapshots in a directory
pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
    let mut bank_snapshots = Vec::default();
    match fs::read_dir(&bank_snapshots_dir) {
        Err(err) => {
            info!(
                "Unable to read bank snapshots directory '{}': {err}",
                bank_snapshots_dir.as_ref().display(),
            );
        }
        Ok(paths) => paths
            .filter_map(|entry| {
                // check that this entry is a directory whose name parses as a Slot;
                // bank snapshots are bank_snapshots_dir/slot/slot(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION)
                entry
                    .ok()
                    .filter(|entry| entry.path().is_dir())
                    .and_then(|entry| {
                        entry
                            .path()
                            .file_name()
                            .and_then(|file_name| file_name.to_str())
                            .and_then(|file_name| file_name.parse::<Slot>().ok())
                    })
            })
            .for_each(
                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
                    // Other threads may be modifying bank snapshots in parallel; only return
                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
                },
            ),
    }
    bank_snapshots
}

/// Get the bank snapshots in a directory
///
/// This function retains only the bank snapshots of kind BankSnapshotKind::Pre
pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
    bank_snapshots
}

/// Get the bank snapshots in a directory
///
/// This function retains only the bank snapshots of kind BankSnapshotKind::Post
pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
    bank_snapshots
}

/// Get the bank snapshot with the highest slot in a directory
///
/// This function gets the highest bank snapshot of kind BankSnapshotKind::Pre
pub fn get_highest_bank_snapshot_pre(
    bank_snapshots_dir: impl AsRef<Path>,
) -> Option<BankSnapshotInfo> {
    do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
}

/// Get the bank snapshot with the highest slot in a directory
///
/// This function gets the highest bank snapshot of kind BankSnapshotKind::Post
pub fn get_highest_bank_snapshot_post(
    bank_snapshots_dir: impl AsRef<Path>,
) -> Option<BankSnapshotInfo> {
    do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
}

/// Get the bank snapshot with the highest slot in a directory
///
/// This function gets the highest bank snapshot of any kind
pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
}

fn do_get_highest_bank_snapshot(
    mut bank_snapshots: Vec<BankSnapshotInfo>,
) -> Option<BankSnapshotInfo> {
    bank_snapshots.sort_unstable();
    bank_snapshots.into_iter().next_back()
}
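
// Illustrative sketch (slots and directory are hypothetical): BankSnapshotInfo's Ord impl
// compares slots, so the snapshot with the highest slot is returned.
//
//     // Suppose the dir contains completed "Post" snapshots for slots 100, 250, and 175.
//     let highest = get_highest_bank_snapshot_post(&bank_snapshots_dir);
//     assert_eq!(highest.map(|snapshot| snapshot.slot), Some(250));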

pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
where
    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
{
    serialize_snapshot_data_file_capped::<F>(
        data_file_path,
        MAX_SNAPSHOT_DATA_FILE_SIZE,
        serializer,
    )
}

pub fn deserialize_snapshot_data_file<T: Sized>(
    data_file_path: &Path,
    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
) -> Result<T> {
    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
        deserializer(streams.full_snapshot_stream)
    };

    let wrapped_data_file_path = SnapshotRootPaths {
        full_snapshot_root_file_path: data_file_path.to_path_buf(),
        incremental_snapshot_root_file_path: None,
    };

    deserialize_snapshot_data_files_capped(
        &wrapped_data_file_path,
        MAX_SNAPSHOT_DATA_FILE_SIZE,
        wrapped_deserializer,
    )
}
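
// Illustrative sketch: round-tripping a value through the capped data-file helpers above.
// The path and payload are hypothetical; bincode is already used by this module (see the
// `Serialize` variant of `SnapshotError`).
//
//     let path = bank_snapshot_dir.join("example");
//     serialize_snapshot_data_file(&path, |stream| {
//         bincode::serialize_into(stream, &42u64).map_err(SnapshotError::Serialize)
//     })?;
//     let value: u64 = deserialize_snapshot_data_file(&path, |stream| {
//         bincode::deserialize_from(stream).map_err(SnapshotError::Serialize)
//     })?;
//     assert_eq!(value, 42);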
1272
1273pub fn deserialize_snapshot_data_files<T: Sized>(
1274    snapshot_root_paths: &SnapshotRootPaths,
1275    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1276) -> Result<T> {
1277    deserialize_snapshot_data_files_capped(
1278        snapshot_root_paths,
1279        MAX_SNAPSHOT_DATA_FILE_SIZE,
1280        deserializer,
1281    )
1282}
1283
1284fn serialize_snapshot_data_file_capped<F>(
1285    data_file_path: &Path,
1286    maximum_file_size: u64,
1287    serializer: F,
1288) -> Result<u64>
1289where
1290    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1291{
1292    let data_file = fs::File::create(data_file_path)?;
1293    let mut data_file_stream = BufWriter::new(data_file);
1294    serializer(&mut data_file_stream)?;
1295    data_file_stream.flush()?;
1296
1297    let consumed_size = data_file_stream.stream_position()?;
1298    if consumed_size > maximum_file_size {
1299        let error_message = format!(
1300            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1301            data_file_path.display(),
1302        );
1303        return Err(IoError::other(error_message).into());
1304    }
1305    Ok(consumed_size)
1306}
1307
1308fn deserialize_snapshot_data_files_capped<T: Sized>(
1309    snapshot_root_paths: &SnapshotRootPaths,
1310    maximum_file_size: u64,
1311    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1312) -> Result<T> {
1313    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1314        create_snapshot_data_file_stream(
1315            &snapshot_root_paths.full_snapshot_root_file_path,
1316            maximum_file_size,
1317        )?;
1318
1319    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1320        if let Some(ref incremental_snapshot_root_file_path) =
1321            snapshot_root_paths.incremental_snapshot_root_file_path
1322        {
1323            Some(create_snapshot_data_file_stream(
1324                incremental_snapshot_root_file_path,
1325                maximum_file_size,
1326            )?)
1327        } else {
1328            None
1329        }
1330        .unzip();
1331
1332    let mut snapshot_streams = SnapshotStreams {
1333        full_snapshot_stream: &mut full_snapshot_data_file_stream,
1334        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1335    };
1336    let ret = deserializer(&mut snapshot_streams)?;
1337
1338    check_deserialize_file_consumed(
1339        full_snapshot_file_size,
1340        &snapshot_root_paths.full_snapshot_root_file_path,
1341        &mut full_snapshot_data_file_stream,
1342    )?;
1343
1344    if let Some(ref incremental_snapshot_root_file_path) =
1345        snapshot_root_paths.incremental_snapshot_root_file_path
1346    {
1347        check_deserialize_file_consumed(
1348            incremental_snapshot_file_size.unwrap(),
1349            incremental_snapshot_root_file_path,
1350            incremental_snapshot_data_file_stream.as_mut().unwrap(),
1351        )?;
1352    }
1353
1354    Ok(ret)
1355}
1356
1357/// Before running the deserializer function, perform common operations on the snapshot archive
1358/// files, such as checking the file size and opening the file into a stream.
1359fn create_snapshot_data_file_stream(
1360    snapshot_root_file_path: impl AsRef<Path>,
1361    maximum_file_size: u64,
1362) -> Result<(u64, BufReader<std::fs::File>)> {
1363    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1364
1365    if snapshot_file_size > maximum_file_size {
1366        let error_message = format!(
1367            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1368            snapshot_root_file_path.as_ref().display(),
1369            snapshot_file_size,
1370            maximum_file_size,
1371        );
1372        return Err(IoError::other(error_message).into());
1373    }
1374
1375    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1376    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1377
1378    Ok((snapshot_file_size, snapshot_data_file_stream))
1379}
1380
1381/// After running the deserializer function, perform common checks to ensure the snapshot archive
1382/// files were consumed correctly.
1383fn check_deserialize_file_consumed(
1384    file_size: u64,
1385    file_path: impl AsRef<Path>,
1386    file_stream: &mut BufReader<std::fs::File>,
1387) -> Result<()> {
1388    let consumed_size = file_stream.stream_position()?;
1389
1390    if consumed_size != file_size {
1391        let error_message = format!(
1392            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1393            file_path.as_ref().display(),
1394            file_size,
1395            consumed_size,
1396        );
1397        return Err(IoError::other(error_message).into());
1398    }
1399
1400    Ok(())
1401}
1402
1403/// Returns the account path derived from the appendvec path, after checking its format.
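///
/// For example (paths illustrative), an appendvec at `<ledger>/accounts/run/123.0` maps back to
/// the account path `<ledger>/accounts`; a sketch of the expected behavior:
///
/// ```ignore
/// let appendvec_path = Path::new("/ledger/accounts/run/123.0");
/// assert_eq!(
///     get_account_path_from_appendvec_path(appendvec_path),
///     Some(PathBuf::from("/ledger/accounts")),
/// );
/// ```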
1404fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1405    let run_path = appendvec_path.parent()?;
1406    let run_file_name = run_path.file_name()?;
1407    // All appendvec files should be under <account_path>/run/.
1408    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1409    if run_file_name != ACCOUNTS_RUN_DIR {
1410        error!(
1411            "The account path {} does not have run/ as its immediate parent directory.",
1412            run_path.display()
1413        );
1414        return None;
1415    }
1416    let account_path = run_path.parent()?;
1417    Some(account_path.to_path_buf())
1418}
1419
1420/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
1421/// directory does not exist, create it.
1422fn get_snapshot_accounts_hardlink_dir(
1423    appendvec_path: &Path,
1424    bank_slot: Slot,
1425    account_paths: &mut HashSet<PathBuf>,
1426    hardlinks_dir: impl AsRef<Path>,
1427) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1428    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1429        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1430    })?;
1431
1432    let snapshot_hardlink_dir = account_path
1433        .join(ACCOUNTS_SNAPSHOT_DIR)
1434        .join(bank_slot.to_string());
1435
1436    // Use the hashset to track which account paths have been seen, to avoid checking the file system.
1437    // Only set up the hardlink directory and the symlink to it the first time the account_path is seen.
1438    if !account_paths.contains(&account_path) {
1439        let idx = account_paths.len();
1440        debug!(
1441            "for appendvec_path {}, create hard-link path {}",
1442            appendvec_path.display(),
1443            snapshot_hardlink_dir.display()
1444        );
1445        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1446            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1447                err,
1448                snapshot_hardlink_dir.clone(),
1449            )
1450        })?;
1451        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1452        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1453            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1454                source: err,
1455                original: snapshot_hardlink_dir.clone(),
1456                link: symlink_path,
1457            }
1458        })?;
1459        account_paths.insert(account_path);
1460    };
1461
1462    Ok(snapshot_hardlink_dir)
1463}
1464
1465/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1466/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1467/// in the file names are also updated in case a file is a recycled one with an inconsistent
1468/// slot and id.
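///
/// A sketch of the resulting layout (paths and file names illustrative; `accounts_hardlinks` is
/// `SNAPSHOT_ACCOUNTS_HARDLINKS`):
///
/// ```text
/// <bank_snapshot_dir>/accounts_hardlinks/account_path_0 -> <account_path>/snapshot/<bank_slot>/
/// <account_path>/snapshot/<bank_slot>/<slot>.<id>  (hard link to the appendvec in <account_path>/run/)
/// ```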
1469pub fn hard_link_storages_to_snapshot(
1470    bank_snapshot_dir: impl AsRef<Path>,
1471    bank_slot: Slot,
1472    snapshot_storages: &[Arc<AccountStorageEntry>],
1473) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1474    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1475    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1476        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1477            err,
1478            accounts_hardlinks_dir.clone(),
1479        )
1480    })?;
1481
1482    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1483    for storage in snapshot_storages {
1484        let storage_path = storage.accounts.path();
1485        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1486            storage_path,
1487            bank_slot,
1488            &mut account_paths,
1489            &accounts_hardlinks_dir,
1490        )?;
1491        // The appendvec could be recycled, so its filename may not be consistent with the slot and id.
1492        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1493        let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1494        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1495        fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1496            HardLinkStoragesToSnapshotError::HardLinkStorage(
1497                err,
1498                storage_path.to_path_buf(),
1499                hard_link_path,
1500            )
1501        })?;
1502    }
1503    Ok(())
1504}
1505
1506/// Serialization needs `Vec<Vec<Arc<AccountStorageEntry>>>`, but the runtime data structure is
1507/// `Vec<Arc<AccountStorageEntry>>`; this translates to what serialization needs.
1508pub(crate) fn get_storages_to_serialize(
1509    snapshot_storages: &[Arc<AccountStorageEntry>],
1510) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1511    snapshot_storages
1512        .iter()
1513        .map(|storage| vec![Arc::clone(storage)])
1514        .collect::<Vec<_>>()
1515}
1516
1517// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
1518const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1519
1520/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
1521pub fn verify_and_unarchive_snapshots(
1522    bank_snapshots_dir: impl AsRef<Path>,
1523    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1524    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1525    account_paths: &[PathBuf],
1526    storage_access: StorageAccess,
1527) -> Result<(
1528    UnarchivedSnapshot,
1529    Option<UnarchivedSnapshot>,
1530    AtomicAccountsFileId,
1531)> {
1532    check_are_snapshots_compatible(
1533        full_snapshot_archive_info,
1534        incremental_snapshot_archive_info,
1535    )?;
1536
1537    let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1538
1539    let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1540    let unarchived_full_snapshot = unarchive_snapshot(
1541        &bank_snapshots_dir,
1542        TMP_SNAPSHOT_ARCHIVE_PREFIX,
1543        full_snapshot_archive_info.path(),
1544        "snapshot untar",
1545        account_paths,
1546        full_snapshot_archive_info.archive_format(),
1547        parallel_divisions,
1548        next_append_vec_id.clone(),
1549        storage_access,
1550    )?;
1551
1552    let unarchived_incremental_snapshot =
1553        if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1554            let unarchived_incremental_snapshot = unarchive_snapshot(
1555                &bank_snapshots_dir,
1556                TMP_SNAPSHOT_ARCHIVE_PREFIX,
1557                incremental_snapshot_archive_info.path(),
1558                "incremental snapshot untar",
1559                account_paths,
1560                incremental_snapshot_archive_info.archive_format(),
1561                parallel_divisions,
1562                next_append_vec_id.clone(),
1563                storage_access,
1564            )?;
1565            Some(unarchived_incremental_snapshot)
1566        } else {
1567            None
1568        };
1569
1570    Ok((
1571        unarchived_full_snapshot,
1572        unarchived_incremental_snapshot,
1573        Arc::try_unwrap(next_append_vec_id).unwrap(),
1574    ))
1575}
1576
1577/// Spawns a thread for unpacking a snapshot
1578fn spawn_unpack_snapshot_thread(
1579    file_sender: Sender<PathBuf>,
1580    account_paths: Arc<Vec<PathBuf>>,
1581    ledger_dir: Arc<PathBuf>,
1582    mut archive: Archive<SharedBufferReader>,
1583    parallel_selector: Option<ParallelSelector>,
1584    thread_index: usize,
1585) -> JoinHandle<()> {
1586    Builder::new()
1587        .name(format!("solUnpkSnpsht{thread_index:02}"))
1588        .spawn(move || {
1589            hardened_unpack::streaming_unpack_snapshot(
1590                &mut archive,
1591                ledger_dir.as_path(),
1592                &account_paths,
1593                parallel_selector,
1594                &file_sender,
1595            )
1596            .unwrap();
1597        })
1598        .unwrap()
1599}
1600
1601/// Streams unpacked files across channel
1602fn streaming_unarchive_snapshot(
1603    file_sender: Sender<PathBuf>,
1604    account_paths: Vec<PathBuf>,
1605    ledger_dir: PathBuf,
1606    snapshot_archive_path: PathBuf,
1607    archive_format: ArchiveFormat,
1608    num_threads: usize,
1609) -> Vec<JoinHandle<()>> {
1610    let account_paths = Arc::new(account_paths);
1611    let ledger_dir = Arc::new(ledger_dir);
1612    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1613
1614    // All shared buffer readers need to be created before the threads are spawned
1615    let archives: Vec<_> = (0..num_threads)
1616        .map(|_| {
1617            let reader = SharedBufferReader::new(&shared_buffer);
1618            Archive::new(reader)
1619        })
1620        .collect();
1621
1622    archives
1623        .into_iter()
1624        .enumerate()
1625        .map(|(thread_index, archive)| {
1626            let parallel_selector = Some(ParallelSelector {
1627                index: thread_index,
1628                divisions: num_threads,
1629            });
1630
1631            spawn_unpack_snapshot_thread(
1632                file_sender.clone(),
1633                account_paths.clone(),
1634                ledger_dir.clone(),
1635                archive,
1636                parallel_selector,
1637                thread_index,
1638            )
1639        })
1640        .collect()
1641}
1642
1643/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1644/// as a valid one.  A dir unpacked from an archive lacks these files.  Fill them here to
1645/// allow new_from_dir() checks to pass.  These checks are not needed for unpacked dirs,
1646/// but it is not clean to add another flag to new_from_dir() to skip them.
1647fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1648    let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1649    if !snapshots_dir.is_dir() {
1650        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1651    }
1652
1653    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1654    let slot_dir = std::fs::read_dir(&snapshots_dir)
1655        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1656        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1657        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1658        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1659        .path();
1660
1661    let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1662    fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1663
1664    let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1665    fs::hard_link(
1666        status_cache_file,
1667        slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1668    )?;
1669
1670    let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1671    fs::File::create(state_complete_file)?;
1672
1673    Ok(())
1674}
1675
1676/// Perform the common tasks when unarchiving a snapshot.  Handles creating the temporary
1677/// directories, untarring, reading the version file, and then returning those fields plus the
1678/// rebuilt storage
1679fn unarchive_snapshot(
1680    bank_snapshots_dir: impl AsRef<Path>,
1681    unpacked_snapshots_dir_prefix: &'static str,
1682    snapshot_archive_path: impl AsRef<Path>,
1683    measure_name: &'static str,
1684    account_paths: &[PathBuf],
1685    archive_format: ArchiveFormat,
1686    parallel_divisions: usize,
1687    next_append_vec_id: Arc<AtomicAccountsFileId>,
1688    storage_access: StorageAccess,
1689) -> Result<UnarchivedSnapshot> {
1690    let unpack_dir = tempfile::Builder::new()
1691        .prefix(unpacked_snapshots_dir_prefix)
1692        .tempdir_in(bank_snapshots_dir)?;
1693    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1694
1695    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1696    streaming_unarchive_snapshot(
1697        file_sender,
1698        account_paths.to_vec(),
1699        unpack_dir.path().to_path_buf(),
1700        snapshot_archive_path.as_ref().to_path_buf(),
1701        archive_format,
1702        parallel_divisions,
1703    );
1704
1705    let num_rebuilder_threads = num_cpus::get_physical()
1706        .saturating_sub(parallel_divisions)
1707        .max(1);
1708    let (version_and_storages, measure_untar) = measure_time!(
1709        SnapshotStorageRebuilder::rebuild_storage(
1710            file_receiver,
1711            num_rebuilder_threads,
1712            next_append_vec_id,
1713            SnapshotFrom::Archive,
1714            storage_access,
1715        )?,
1716        measure_name
1717    );
1718    info!("{}", measure_untar);
1719
1720    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1721
1722    let RebuiltSnapshotStorage {
1723        snapshot_version,
1724        storage,
1725    } = version_and_storages;
1726    Ok(UnarchivedSnapshot {
1727        unpack_dir,
1728        storage,
1729        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1730            unpacked_snapshots_dir,
1731            snapshot_version,
1732        },
1733        measure_untar,
1734    })
1735}
1736
1737/// Streams snapshot dir files across the channel
1738/// Follows the flow of streaming_unarchive_snapshot(), but handles the from_dir case.
1739fn streaming_snapshot_dir_files(
1740    file_sender: Sender<PathBuf>,
1741    snapshot_file_path: impl Into<PathBuf>,
1742    snapshot_version_path: impl Into<PathBuf>,
1743    account_paths: &[PathBuf],
1744) -> Result<()> {
1745    file_sender.send(snapshot_file_path.into())?;
1746    file_sender.send(snapshot_version_path.into())?;
1747
1748    for account_path in account_paths {
1749        for file in fs::read_dir(account_path)? {
1750            file_sender.send(file?.path())?;
1751        }
1752    }
1753
1754    Ok(())
1755}
1756
1757/// Performs the common tasks when deserializing a snapshot
1758///
1759/// Handles reading the snapshot file and version file,
1760/// then returning those fields plus the rebuilt storages.
1761pub fn rebuild_storages_from_snapshot_dir(
1762    snapshot_info: &BankSnapshotInfo,
1763    account_paths: &[PathBuf],
1764    next_append_vec_id: Arc<AtomicAccountsFileId>,
1765    storage_access: StorageAccess,
1766) -> Result<AccountStorageMap> {
1767    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1768    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1769    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1770
1771    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1772        IoError::other(format!(
1773            "failed to read accounts hardlinks dir '{}': {err}",
1774            accounts_hardlinks.display(),
1775        ))
1776    })?;
1777    for dir_entry in read_dir {
1778        let symlink_path = dir_entry?.path();
1779        // The symlink points to <account_path>/snapshot/<slot>, which contains the hard links to the account files.
1780        // The corresponding run path should be <account_path>/run/.
1781        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1782            IoError::other(format!(
1783                "failed to read symlink '{}': {err}",
1784                symlink_path.display(),
1785            ))
1786        })?;
1787        let account_run_path = account_snapshot_path
1788            .parent()
1789            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1790            .parent()
1791            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1792            .join(ACCOUNTS_RUN_DIR);
1793        if !account_run_paths.contains(&account_run_path) {
1794            // The appendvec from the bank snapshot storage does not match any of the provided account_paths.
1795            // The account paths have changed, so the snapshot is no longer usable.
1796            return Err(SnapshotError::AccountPathsMismatch);
1797        }
1798        // Generate hard-links to make the account files available in the main accounts/ run directory,
1799        // and let the new appendvec paths be in accounts/.
1800        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1801            IoError::other(format!(
1802                "failed to read account snapshot dir '{}': {err}",
1803                account_snapshot_path.display(),
1804            ))
1805        })?;
1806        for file in read_dir {
1807            let file_path = file?.path();
1808            let file_name = file_path
1809                .file_name()
1810                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1811            let dest_path = account_run_path.join(file_name);
1812            fs::hard_link(&file_path, &dest_path).map_err(|err| {
1813                IoError::other(format!(
1814                    "failed to hard link from '{}' to '{}': {err}",
1815                    file_path.display(),
1816                    dest_path.display(),
1817                ))
1818            })?;
1819        }
1820    }
1821
1822    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1823    let snapshot_file_path = &snapshot_info.snapshot_path();
1824    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1825    streaming_snapshot_dir_files(
1826        file_sender,
1827        snapshot_file_path,
1828        snapshot_version_path,
1829        account_paths,
1830    )?;
1831
1832    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1833    let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1834        file_receiver,
1835        num_rebuilder_threads,
1836        next_append_vec_id,
1837        SnapshotFrom::Dir,
1838        storage_access,
1839    )?;
1840
1841    let RebuiltSnapshotStorage {
1842        snapshot_version: _,
1843        storage,
1844    } = version_and_storages;
1845    Ok(storage)
1846}
1847
1848/// Reads the `snapshot_version` from a file. Before opening the file, its size
1849/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
1850/// threshold, it is not opened and an error is returned.
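///
/// A minimal sketch of the expected behavior (illustrative only; assumes `path` holds a short
/// version string, possibly with surrounding whitespace):
///
/// ```ignore
/// fs::write(&path, "1.2.0\n")?;
/// assert_eq!(snapshot_version_from_file(&path)?, "1.2.0");
/// ```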
1851fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1852    // Check file size.
1853    let file_metadata = fs::metadata(&path).map_err(|err| {
1854        IoError::other(format!(
1855            "failed to query snapshot version file metadata '{}': {err}",
1856            path.as_ref().display(),
1857        ))
1858    })?;
1859    let file_size = file_metadata.len();
1860    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1861        let error_message = format!(
1862            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1863            path.as_ref().display(),
1864            file_size,
1865            MAX_SNAPSHOT_VERSION_FILE_SIZE,
1866        );
1867        return Err(IoError::other(error_message).into());
1868    }
1869
1870    // Read snapshot_version from file.
1871    let mut snapshot_version = String::new();
1872    let mut file = fs::File::open(&path).map_err(|err| {
1873        IoError::other(format!(
1874            "failed to open snapshot version file '{}': {err}",
1875            path.as_ref().display()
1876        ))
1877    })?;
1878    file.read_to_string(&mut snapshot_version).map_err(|err| {
1879        IoError::other(format!(
1880            "failed to read snapshot version from file '{}': {err}",
1881            path.as_ref().display()
1882        ))
1883    })?;
1884
1885    Ok(snapshot_version.trim().to_string())
1886}
1887
1888/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
1889/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
1890fn check_are_snapshots_compatible(
1891    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1892    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1893) -> Result<()> {
1894    if incremental_snapshot_archive_info.is_none() {
1895        return Ok(());
1896    }
1897
1898    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1899
1900    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1901        .then_some(())
1902        .ok_or_else(|| {
1903            SnapshotError::MismatchedBaseSlot(
1904                full_snapshot_archive_info.slot(),
1905                incremental_snapshot_archive_info.base_slot(),
1906            )
1907        })
1908}
1909
1910/// Get the file name `&str` from a `&Path`
1911pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1912    path.file_name()
1913        .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1914        .to_str()
1915        .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1916}
1917
1918pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1919    snapshot_archives_dir
1920        .as_ref()
1921        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1922}
1923
1924/// Build the full snapshot archive path from its components: the snapshot archives directory, the
1925/// snapshot slot, the accounts hash, and the archive format.
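///
/// A sketch of the resulting file name (slot and hash illustrative):
///
/// ```text
/// <full_snapshot_archives_dir>/snapshot-<slot>-<hash>.<ext>
/// e.g. full_snapshot_archives_dir/snapshot-42-11111111111111111111111111111111.tar.zst
/// ```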
1926pub fn build_full_snapshot_archive_path(
1927    full_snapshot_archives_dir: impl AsRef<Path>,
1928    slot: Slot,
1929    hash: &SnapshotHash,
1930    archive_format: ArchiveFormat,
1931) -> PathBuf {
1932    full_snapshot_archives_dir.as_ref().join(format!(
1933        "snapshot-{}-{}.{}",
1934        slot,
1935        hash.0,
1936        archive_format.extension(),
1937    ))
1938}
1939
1940/// Build the incremental snapshot archive path from its components: the snapshot archives
1941/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive
1942/// format.
1943pub fn build_incremental_snapshot_archive_path(
1944    incremental_snapshot_archives_dir: impl AsRef<Path>,
1945    base_slot: Slot,
1946    slot: Slot,
1947    hash: &SnapshotHash,
1948    archive_format: ArchiveFormat,
1949) -> PathBuf {
1950    incremental_snapshot_archives_dir.as_ref().join(format!(
1951        "incremental-snapshot-{}-{}-{}.{}",
1952        base_slot,
1953        slot,
1954        hash.0,
1955        archive_format.extension(),
1956    ))
1957}
1958
1959/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format
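///
/// A sketch of the expected parse (mirrors the unit tests at the bottom of this module):
///
/// ```ignore
/// let (slot, hash, archive_format) =
///     parse_full_snapshot_archive_filename(&format!("snapshot-42-{}.tar.bz2", Hash::default()))?;
/// assert_eq!(slot, 42);
/// assert_eq!(hash, SnapshotHash(Hash::default()));
/// assert_eq!(archive_format, ArchiveFormat::TarBzip2);
/// ```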
1960pub(crate) fn parse_full_snapshot_archive_filename(
1961    archive_filename: &str,
1962) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1963    lazy_static! {
1964        static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1965    }
1966
1967    let do_parse = || {
1968        RE.captures(archive_filename).and_then(|captures| {
1969            let slot = captures
1970                .name("slot")
1971                .map(|x| x.as_str().parse::<Slot>())?
1972                .ok()?;
1973            let hash = captures
1974                .name("hash")
1975                .map(|x| x.as_str().parse::<Hash>())?
1976                .ok()?;
1977            let archive_format = captures
1978                .name("ext")
1979                .map(|x| x.as_str().parse::<ArchiveFormat>())?
1980                .ok()?;
1981
1982            Some((slot, SnapshotHash(hash), archive_format))
1983        })
1984    };
1985
1986    do_parse().ok_or_else(|| {
1987        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1988    })
1989}
1990
1991/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format
1992pub(crate) fn parse_incremental_snapshot_archive_filename(
1993    archive_filename: &str,
1994) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1995    lazy_static! {
1996        static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1997    }
1998
1999    let do_parse = || {
2000        RE.captures(archive_filename).and_then(|captures| {
2001            let base_slot = captures
2002                .name("base")
2003                .map(|x| x.as_str().parse::<Slot>())?
2004                .ok()?;
2005            let slot = captures
2006                .name("slot")
2007                .map(|x| x.as_str().parse::<Slot>())?
2008                .ok()?;
2009            let hash = captures
2010                .name("hash")
2011                .map(|x| x.as_str().parse::<Hash>())?
2012                .ok()?;
2013            let archive_format = captures
2014                .name("ext")
2015                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2016                .ok()?;
2017
2018            Some((base_slot, slot, SnapshotHash(hash), archive_format))
2019        })
2020    };
2021
2022    do_parse().ok_or_else(|| {
2023        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2024    })
2025}
2026
2027/// Walk the snapshot archives directory to collect snapshot archive file info
2028fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2029where
2030    F: Fn(PathBuf) -> Result<T>,
2031{
2032    let walk_dir = |dir: &Path| -> Vec<T> {
2033        let entry_iter = fs::read_dir(dir);
2034        match entry_iter {
2035            Err(err) => {
2036                info!(
2037                    "Unable to read snapshot archives directory '{}': {err}",
2038                    dir.display(),
2039                );
2040                vec![]
2041            }
2042            Ok(entries) => entries
2043                .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2044                .collect(),
2045        }
2046    };
2047
2048    let mut ret = walk_dir(snapshot_archives_dir);
2049    let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2050    if remote_dir.exists() {
2051        ret.append(&mut walk_dir(remote_dir.as_ref()));
2052    }
2053    ret
2054}
2055
2056/// Get a list of the full snapshot archives from a directory
2057pub fn get_full_snapshot_archives(
2058    full_snapshot_archives_dir: impl AsRef<Path>,
2059) -> Vec<FullSnapshotArchiveInfo> {
2060    get_snapshot_archives(
2061        full_snapshot_archives_dir.as_ref(),
2062        FullSnapshotArchiveInfo::new_from_path,
2063    )
2064}
2065
2066/// Get a list of the incremental snapshot archives from a directory
2067pub fn get_incremental_snapshot_archives(
2068    incremental_snapshot_archives_dir: impl AsRef<Path>,
2069) -> Vec<IncrementalSnapshotArchiveInfo> {
2070    get_snapshot_archives(
2071        incremental_snapshot_archives_dir.as_ref(),
2072        IncrementalSnapshotArchiveInfo::new_from_path,
2073    )
2074}
2075
2076/// Get the highest slot of the full snapshot archives in a directory
2077pub fn get_highest_full_snapshot_archive_slot(
2078    full_snapshot_archives_dir: impl AsRef<Path>,
2079) -> Option<Slot> {
2080    get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2081        .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2082}
2083
2084/// Get the highest slot of the incremental snapshot archives in a directory, for a given full
2085/// snapshot slot
2086pub fn get_highest_incremental_snapshot_archive_slot(
2087    incremental_snapshot_archives_dir: impl AsRef<Path>,
2088    full_snapshot_slot: Slot,
2089) -> Option<Slot> {
2090    get_highest_incremental_snapshot_archive_info(
2091        incremental_snapshot_archives_dir,
2092        full_snapshot_slot,
2093    )
2094    .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2095}
2096
2097/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
2098pub fn get_highest_full_snapshot_archive_info(
2099    full_snapshot_archives_dir: impl AsRef<Path>,
2100) -> Option<FullSnapshotArchiveInfo> {
2101    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2102    full_snapshot_archives.sort_unstable();
2103    full_snapshot_archives.into_iter().next_back()
2104}
2105
2106/// Get the path for the incremental snapshot archive with the highest slot, for a given full
2107/// snapshot slot, in a directory
2108pub fn get_highest_incremental_snapshot_archive_info(
2109    incremental_snapshot_archives_dir: impl AsRef<Path>,
2110    full_snapshot_slot: Slot,
2111) -> Option<IncrementalSnapshotArchiveInfo> {
2112    // Since we want to filter down to only the incremental snapshot archives that have the same
2113    // full snapshot slot as the value passed in, perform the filtering before sorting to avoid
2114    // doing unnecessary work.
2115    let mut incremental_snapshot_archives =
2116        get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2117            .into_iter()
2118            .filter(|incremental_snapshot_archive_info| {
2119                incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2120            })
2121            .collect::<Vec<_>>();
2122    incremental_snapshot_archives.sort_unstable();
2123    incremental_snapshot_archives.into_iter().next_back()
2124}
2125
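/// Purges old snapshot archives, retaining at most `maximum_full_snapshot_archives_to_retain`
/// full snapshot archives (the ones with the highest slots) and, for the highest retained full
/// snapshot slot, at most `maximum_incremental_snapshot_archives_to_retain` incremental snapshot
/// archives.  Other retained full snapshot slots keep only their newest incremental archive;
/// incremental archives whose base full snapshot was purged are removed entirely.
///
/// A worked sketch of the retention rule (slots illustrative; retain limits of 2 full and 2 incremental):
///
/// ```text
/// full archives:               100, 200, 300   -> keep 200, 300
/// incrementals with base 300:  310, 320, 330   -> keep 320, 330 (base is the highest retained full)
/// incrementals with base 200:  210, 220        -> keep 220 (one per other retained full)
/// incrementals with base 100:  110             -> removed (base full was purged)
/// ```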
2126pub fn purge_old_snapshot_archives(
2127    full_snapshot_archives_dir: impl AsRef<Path>,
2128    incremental_snapshot_archives_dir: impl AsRef<Path>,
2129    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2130    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2131) {
2132    info!(
2133        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2134        full_snapshot_archives_dir.as_ref().display(),
2135        maximum_full_snapshot_archives_to_retain
2136    );
2137
2138    let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2139    full_snapshot_archives.sort_unstable();
2140    full_snapshot_archives.reverse();
2141
2142    let num_to_retain = full_snapshot_archives
2143        .len()
2144        .min(maximum_full_snapshot_archives_to_retain.get());
2145    trace!(
2146        "There are {} full snapshot archives, retaining {}",
2147        full_snapshot_archives.len(),
2148        num_to_retain,
2149    );
2150
2151    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2152        if full_snapshot_archives.is_empty() {
2153            None
2154        } else {
2155            Some(full_snapshot_archives.split_at(num_to_retain))
2156        }
2157        .unwrap_or_default();
2158
2159    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2160        .iter()
2161        .map(|ai| ai.slot())
2162        .collect::<HashSet<_>>();
2163
2164    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2165        for path in archives.iter().map(|a| a.path()) {
2166            trace!("Removing snapshot archive: {}", path.display());
2167            let result = fs::remove_file(path);
2168            if let Err(err) = result {
2169                info!(
2170                    "Failed to remove snapshot archive '{}': {err}",
2171                    path.display()
2172                );
2173            }
2174        }
2175    }
2176    remove_archives(full_snapshot_archives_to_remove);
2177
2178    info!(
2179        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2180        incremental_snapshot_archives_dir.as_ref().display(),
2181        maximum_incremental_snapshot_archives_to_retain
2182    );
2183    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2184    for incremental_snapshot_archive in
2185        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2186    {
2187        incremental_snapshot_archives_by_base_slot
2188            .entry(incremental_snapshot_archive.base_slot())
2189            .or_default()
2190            .push(incremental_snapshot_archive)
2191    }
2192
2193    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2194    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2195    {
2196        incremental_snapshot_archives.sort_unstable();
2197        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2198            maximum_incremental_snapshot_archives_to_retain.get()
2199        } else {
2200            usize::from(retained_full_snapshot_slots.contains(&base_slot))
2201        };
2202        trace!(
2203            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2204            incremental_snapshot_archives.len(),
2205            base_slot,
2206            incremental_snapshot_archives
2207                .len()
2208                .saturating_sub(num_to_retain),
2209        );
2210
2211        incremental_snapshot_archives.truncate(
2212            incremental_snapshot_archives
2213                .len()
2214                .saturating_sub(num_to_retain),
2215        );
2216        remove_archives(&incremental_snapshot_archives);
2217    }
2218}
2219
2220#[cfg(feature = "dev-context-only-utils")]
2221fn unpack_snapshot_local(
2222    shared_buffer: SharedBuffer,
2223    ledger_dir: &Path,
2224    account_paths: &[PathBuf],
2225    parallel_divisions: usize,
2226) -> Result<UnpackedAppendVecMap> {
2227    assert!(parallel_divisions > 0);
2228
2229    // allocate all readers before any readers start reading
2230    let readers = (0..parallel_divisions)
2231        .map(|_| SharedBufferReader::new(&shared_buffer))
2232        .collect::<Vec<_>>();
2233
2234    // Create `parallel_divisions` parallel workers, each responsible for 1/parallel_divisions of all the files to extract.
2235    let all_unpacked_append_vec_map = readers
2236        .into_par_iter()
2237        .enumerate()
2238        .map(|(index, reader)| {
2239            let parallel_selector = Some(ParallelSelector {
2240                index,
2241                divisions: parallel_divisions,
2242            });
2243            let mut archive = Archive::new(reader);
2244            hardened_unpack::unpack_snapshot(
2245                &mut archive,
2246                ledger_dir,
2247                account_paths,
2248                parallel_selector,
2249            )
2250        })
2251        .collect::<Vec<_>>();
2252
2253    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
2254    for h in all_unpacked_append_vec_map {
2255        unpacked_append_vec_map.extend(h?);
2256    }
2257
2258    Ok(unpacked_append_vec_map)
2259}
2260
2261fn untar_snapshot_create_shared_buffer(
2262    snapshot_tar: &Path,
2263    archive_format: ArchiveFormat,
2264) -> SharedBuffer {
2265    let open_file = || {
2266        fs::File::open(snapshot_tar)
2267            .map_err(|err| {
2268                IoError::other(format!(
2269                    "failed to open snapshot archive '{}': {err}",
2270                    snapshot_tar.display(),
2271                ))
2272            })
2273            .unwrap()
2274    };
2275    match archive_format {
2276        ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
2277        ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
2278        ArchiveFormat::TarZstd { .. } => SharedBuffer::new(
2279            zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
2280        ),
2281        ArchiveFormat::TarLz4 => {
2282            SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
2283        }
2284        ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2285    }
2286}
2287
2288#[cfg(feature = "dev-context-only-utils")]
2289fn untar_snapshot_in(
2290    snapshot_tar: impl AsRef<Path>,
2291    unpack_dir: &Path,
2292    account_paths: &[PathBuf],
2293    archive_format: ArchiveFormat,
2294    parallel_divisions: usize,
2295) -> Result<UnpackedAppendVecMap> {
2296    let shared_buffer = untar_snapshot_create_shared_buffer(snapshot_tar.as_ref(), archive_format);
2297    unpack_snapshot_local(shared_buffer, unpack_dir, account_paths, parallel_divisions)
2298}
2299
2300pub fn verify_unpacked_snapshots_dir_and_version(
2301    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2302) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2303    info!(
2304        "snapshot version: {}",
2305        &unpacked_snapshots_dir_and_version.snapshot_version
2306    );
2307
2308    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2309    let mut bank_snapshots =
2310        get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2311    if bank_snapshots.len() > 1 {
2312        return Err(IoError::other(format!(
2313            "invalid snapshot format: only one snapshot allowed, but found {}",
2314            bank_snapshots.len(),
2315        ))
2316        .into());
2317    }
2318    let root_paths = bank_snapshots.pop().ok_or_else(|| {
2319        IoError::other(format!(
2320            "no snapshots found in snapshots directory '{}'",
2321            unpacked_snapshots_dir_and_version
2322                .unpacked_snapshots_dir
2323                .display(),
2324        ))
2325    })?;
2326    Ok((snapshot_version, root_paths))
2327}
2328
2329/// Returns the file name of the bank snapshot for `slot`
2330pub fn get_snapshot_file_name(slot: Slot) -> String {
2331    slot.to_string()
2332}
2333
2334/// Constructs the path to the bank snapshot directory for `slot` within `bank_snapshots_dir`
2335pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2336    bank_snapshots_dir
2337        .as_ref()
2338        .join(get_snapshot_file_name(slot))
2339}
2340
2341#[derive(Debug, Copy, Clone)]
2342/// allow tests to specify what happened to the serialized format
2343pub enum VerifyBank {
2344    /// the bank's serialized format is expected to be identical to what we are comparing against
2345    Deterministic,
2346    /// the serialized bank was 'reserialized' into a non-deterministic format
2347    /// so, deserialize both files and compare deserialized results
2348    NonDeterministic,
2349}
2350
2351#[cfg(feature = "dev-context-only-utils")]
2352pub fn verify_snapshot_archive(
2353    snapshot_archive: impl AsRef<Path>,
2354    snapshots_to_verify: impl AsRef<Path>,
2355    archive_format: ArchiveFormat,
2356    verify_bank: VerifyBank,
2357    slot: Slot,
2358) {
2359    let temp_dir = tempfile::TempDir::new().unwrap();
2360    let unpack_dir = temp_dir.path();
2361    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
2362    untar_snapshot_in(
2363        snapshot_archive,
2364        unpack_dir,
2365        &[unpack_account_dir.clone()],
2366        archive_format,
2367        1,
2368    )
2369    .unwrap();
2370
2371    // Check snapshots are the same
2372    let unpacked_snapshots = unpack_dir.join("snapshots");
2373
2374    // Since the unpack code collects all the appendvecs into one directory unpack_account_dir, we need to
2375    // collect all the appendvecs in <account_path>/snapshot/<slot>/ into one directory for later comparison.
2376    let storages_to_verify = unpack_dir.join("storages_to_verify");
2377    // Create the directory if it doesn't exist
2378    fs::create_dir_all(&storages_to_verify).unwrap();
2379
2380    let slot = slot.to_string();
2381    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);
2382
2383    if let VerifyBank::NonDeterministic = verify_bank {
2384        // file contents may be different, but deserialized structs should be equal
2385        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
2386        let p2 = unpacked_snapshots.join(&slot).join(&slot);
2387        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
2388        fs::remove_file(p1).unwrap();
2389        fs::remove_file(p2).unwrap();
2390    }
2391
2392    // The new status_cache file is inside the slot directory together with the snapshot file.
2393    // When unpacking an archive, the status_cache file from the archive is one level up, outside of
2394    // the slot directory.
2395    // The unpacked status_cache file needs to be put back into the slot directory for the directory
2396    // comparison to pass.
2397    let existing_unpacked_status_cache_file =
2398        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2399    let new_unpacked_status_cache_file = unpacked_snapshots
2400        .join(&slot)
2401        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
2402    fs::rename(
2403        existing_unpacked_status_cache_file,
2404        new_unpacked_status_cache_file,
2405    )
2406    .unwrap();
2407
2408    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2409    if accounts_hardlinks_dir.is_dir() {
2410        // This directory contains symlinks to all <account_path>/snapshot/<slot> directories.
2411        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
2412            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
2413            // Copy all the files in dst_path into the storages_to_verify directory.
2414            for entry in fs::read_dir(&link_dst_path).unwrap() {
2415                let src_path = entry.unwrap().path();
2416                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
2417                fs::copy(src_path, dst_path).unwrap();
2418            }
2419        }
2420        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
2421    }
2422
2423    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
2424    if version_path.is_file() {
2425        fs::remove_file(version_path).unwrap();
2426    }
2427
2428    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2429    if state_complete_path.is_file() {
2430        fs::remove_file(state_complete_path).unwrap();
2431    }
2432
2433    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
2434
2435    // In the unarchiving case, there is an extra empty "accounts" directory. The account
2436    // files in the archive accounts/ have been expanded to [account_paths].
2437    // Remove the empty "accounts" directory for the directory comparison below.
2438    // In some test cases the directories to compare do not come from unarchiving.
2439    // Ignore the error when this directory does not exist.
2440    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
2441    // Check the account entries are the same
2442    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
2443}
2444
2445/// Purges all bank snapshots
2446pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2447    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2448    purge_bank_snapshots(&bank_snapshots);
2449}
2450
2451/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
2452pub fn purge_old_bank_snapshots(
2453    bank_snapshots_dir: impl AsRef<Path>,
2454    num_bank_snapshots_to_retain: usize,
2455    filter_by_kind: Option<BankSnapshotKind>,
2456) {
2457    let mut bank_snapshots = match filter_by_kind {
2458        Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2459        Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2460        None => get_bank_snapshots(&bank_snapshots_dir),
2461    };
2462
2463    bank_snapshots.sort_unstable();
2464    purge_bank_snapshots(
2465        bank_snapshots
2466            .iter()
2467            .rev()
2468            .skip(num_bank_snapshots_to_retain),
2469    );
2470}
2471
2472/// At startup, purge old (i.e. unusable) bank snapshots
2473///
2474/// Only a single bank snapshot could be needed at startup (when using fast boot), so
2475/// retain the highest bank snapshot "post", and purge the rest.
2476pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2477    purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2478    purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2479
2480    let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2481    if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2482        debug!(
2483            "Retained bank snapshot for slot {}, and purged the rest.",
2484            highest_bank_snapshot_post.slot
2485        );
2486    }
2487}
2488
2489/// Purges bank snapshots that are older than `slot`
2490pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2491    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2492    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2493    purge_bank_snapshots(&bank_snapshots);
2494}
2495
2496/// Purges all `bank_snapshots`
2497///
2498/// Does not exit early if there is an error while purging a bank snapshot.
2499fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2500    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2501        if purge_bank_snapshot(snapshot_dir).is_err() {
2502            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2503        }
2504    }
2505}
2506
2507/// Remove the bank snapshot at this path
2508pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2509    const FN_ERR: &str = "failed to purge bank snapshot";
2510    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2511    if accounts_hardlinks_dir.is_dir() {
2512        // This directory contains symlinks to all accounts snapshot directories.
2513        // They should all be removed.
2514        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2515            IoError::other(format!(
2516                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2517                accounts_hardlinks_dir.display(),
2518            ))
2519        })?;
2520        for entry in read_dir {
2521            let accounts_hardlink_dir = entry?.path();
2522            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2523                IoError::other(format!(
2524                    "{FN_ERR}: failed to read symlink '{}': {err}",
2525                    accounts_hardlink_dir.display(),
2526                ))
2527            })?;
2528            move_and_async_delete_path(&accounts_hardlink_dir);
2529        }
2530    }
2531    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2532        IoError::other(format!(
2533            "{FN_ERR}: failed to remove dir '{}': {err}",
2534            bank_snapshot_dir.as_ref().display(),
2535        ))
2536    })?;
2537    Ok(())
2538}
2539
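/// Returns true when `block_height` falls exactly on the full snapshot archive interval.
///
/// A sketch of the cadence for both helpers below (intervals illustrative):
///
/// ```ignore
/// assert!(should_take_full_snapshot(50_000, 25_000));
/// assert!(!should_take_full_snapshot(50_001, 25_000));
/// // Incremental snapshots additionally require that a full snapshot has already been taken.
/// assert!(should_take_incremental_snapshot(50_100, 100, Some(50_000)));
/// assert!(!should_take_incremental_snapshot(50_100, 100, None));
/// ```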
2540pub fn should_take_full_snapshot(
2541    block_height: Slot,
2542    full_snapshot_archive_interval_slots: Slot,
2543) -> bool {
2544    block_height % full_snapshot_archive_interval_slots == 0
2545}
2546
2547pub fn should_take_incremental_snapshot(
2548    block_height: Slot,
2549    incremental_snapshot_archive_interval_slots: Slot,
2550    latest_full_snapshot_slot: Option<Slot>,
2551) -> bool {
2552    block_height % incremental_snapshot_archive_interval_slots == 0
2553        && latest_full_snapshot_slot.is_some()
2554}
2555
2556/// Creates an "accounts path" directory for tests
2557///
2558/// This temporary directory will contain the "run" and "snapshot"
2559/// sub-directories required by a validator.
2560#[cfg(feature = "dev-context-only-utils")]
2561pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
2562    let tmp_dir = tempfile::TempDir::new().unwrap();
2563    let account_dir = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap().0;
2564    (tmp_dir, account_dir)
2565}
2566
2567#[cfg(test)]
2568mod tests {
2569    use {
2570        super::*,
2571        assert_matches::assert_matches,
2572        bincode::{deserialize_from, serialize_into},
2573        std::{convert::TryFrom, mem::size_of},
2574        tempfile::NamedTempFile,
2575    };
2576
2577    #[test]
2578    fn test_serialize_snapshot_data_file_under_limit() {
2579        let temp_dir = tempfile::TempDir::new().unwrap();
2580        let expected_consumed_size = size_of::<u32>() as u64;
2581        let consumed_size = serialize_snapshot_data_file_capped(
2582            &temp_dir.path().join("data-file"),
2583            expected_consumed_size,
2584            |stream| {
2585                serialize_into(stream, &2323_u32)?;
2586                Ok(())
2587            },
2588        )
2589        .unwrap();
2590        assert_eq!(consumed_size, expected_consumed_size);
2591    }
2592
2593    #[test]
2594    fn test_serialize_snapshot_data_file_over_limit() {
2595        let temp_dir = tempfile::TempDir::new().unwrap();
2596        let expected_consumed_size = size_of::<u32>() as u64;
2597        let result = serialize_snapshot_data_file_capped(
2598            &temp_dir.path().join("data-file"),
2599            expected_consumed_size - 1,
2600            |stream| {
2601                serialize_into(stream, &2323_u32)?;
2602                Ok(())
2603            },
2604        );
2605        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2606    }
2607
2608    #[test]
2609    fn test_deserialize_snapshot_data_file_under_limit() {
2610        let expected_data = 2323_u32;
2611        let expected_consumed_size = size_of::<u32>() as u64;
2612
2613        let temp_dir = tempfile::TempDir::new().unwrap();
2614        serialize_snapshot_data_file_capped(
2615            &temp_dir.path().join("data-file"),
2616            expected_consumed_size,
2617            |stream| {
2618                serialize_into(stream, &expected_data)?;
2619                Ok(())
2620            },
2621        )
2622        .unwrap();
2623
2624        let snapshot_root_paths = SnapshotRootPaths {
2625            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2626            incremental_snapshot_root_file_path: None,
2627        };
2628
2629        let actual_data = deserialize_snapshot_data_files_capped(
2630            &snapshot_root_paths,
2631            expected_consumed_size,
2632            |stream| {
2633                Ok(deserialize_from::<_, u32>(
2634                    &mut stream.full_snapshot_stream,
2635                )?)
2636            },
2637        )
2638        .unwrap();
2639        assert_eq!(actual_data, expected_data);
2640    }
2641
2642    #[test]
2643    fn test_deserialize_snapshot_data_file_over_limit() {
2644        let expected_data = 2323_u32;
2645        let expected_consumed_size = size_of::<u32>() as u64;
2646
2647        let temp_dir = tempfile::TempDir::new().unwrap();
2648        serialize_snapshot_data_file_capped(
2649            &temp_dir.path().join("data-file"),
2650            expected_consumed_size,
2651            |stream| {
2652                serialize_into(stream, &expected_data)?;
2653                Ok(())
2654            },
2655        )
2656        .unwrap();
2657
2658        let snapshot_root_paths = SnapshotRootPaths {
2659            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2660            incremental_snapshot_root_file_path: None,
2661        };
2662
2663        let result = deserialize_snapshot_data_files_capped(
2664            &snapshot_root_paths,
2665            expected_consumed_size - 1,
2666            |stream| {
2667                Ok(deserialize_from::<_, u32>(
2668                    &mut stream.full_snapshot_stream,
2669                )?)
2670            },
2671        );
2672        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2673    }
2674
2675    #[test]
2676    fn test_deserialize_snapshot_data_file_extra_data() {
2677        let expected_data = 2323_u32;
2678        let expected_consumed_size = size_of::<u32>() as u64;
2679
2680        let temp_dir = tempfile::TempDir::new().unwrap();
2681        serialize_snapshot_data_file_capped(
2682            &temp_dir.path().join("data-file"),
2683            expected_consumed_size * 2,
2684            |stream| {
2685                serialize_into(stream.by_ref(), &expected_data)?;
2686                serialize_into(stream.by_ref(), &expected_data)?;
2687                Ok(())
2688            },
2689        )
2690        .unwrap();
2691
2692        let snapshot_root_paths = SnapshotRootPaths {
2693            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2694            incremental_snapshot_root_file_path: None,
2695        };
2696
2697        let result = deserialize_snapshot_data_files_capped(
2698            &snapshot_root_paths,
2699            expected_consumed_size * 2,
2700            |stream| {
2701                Ok(deserialize_from::<_, u32>(
2702                    &mut stream.full_snapshot_stream,
2703                )?)
2704            },
2705        );
2706        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2707    }
2708
2709    #[test]
2710    fn test_snapshot_version_from_file_under_limit() {
2711        let file_content = SnapshotVersion::default().as_str();
2712        let mut file = NamedTempFile::new().unwrap();
2713        file.write_all(file_content.as_bytes()).unwrap();
2714        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2715        assert_eq!(version_from_file, file_content);
2716    }
2717
2718    #[test]
2719    fn test_snapshot_version_from_file_over_limit() {
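        // A version file larger than MAX_SNAPSHOT_VERSION_FILE_SIZE must be rejected.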
2720        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2721        let file_content = vec![7u8; over_limit_size];
2722        let mut file = NamedTempFile::new().unwrap();
2723        file.write_all(&file_content).unwrap();
2724        assert_matches!(
2725            snapshot_version_from_file(file.path()),
2726            Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2727        );
2728    }
2729
2730    #[test]
2731    fn test_parse_full_snapshot_archive_filename() {
2732        assert_eq!(
2733            parse_full_snapshot_archive_filename(&format!(
2734                "snapshot-42-{}.tar.bz2",
2735                Hash::default()
2736            ))
2737            .unwrap(),
2738            (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2739        );
2740        assert_eq!(
2741            parse_full_snapshot_archive_filename(&format!(
2742                "snapshot-43-{}.tar.zst",
2743                Hash::default()
2744            ))
2745            .unwrap(),
2746            (
2747                43,
2748                SnapshotHash(Hash::default()),
2749                ArchiveFormat::TarZstd {
2750                    config: ZstdConfig::default(),
2751                }
2752            )
2753        );
2754        assert_eq!(
2755            parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2756                .unwrap(),
2757            (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2758        );
2759        assert_eq!(
2760            parse_full_snapshot_archive_filename(&format!(
2761                "snapshot-45-{}.tar.lz4",
2762                Hash::default()
2763            ))
2764            .unwrap(),
2765            (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2766        );
2767
2768        assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2769        assert!(
2770            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2771        );
2772
2773        assert!(
2774            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2775        );
2776        assert!(parse_full_snapshot_archive_filename(&format!(
2777            "snapshot-12345678-{}.bad!ext",
2778            Hash::new_unique()
2779        ))
2780        .is_err());
2781        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2782
2783        assert!(parse_full_snapshot_archive_filename(&format!(
2784            "snapshot-bad!slot-{}.bad!ext",
2785            Hash::new_unique()
2786        ))
2787        .is_err());
2788        assert!(parse_full_snapshot_archive_filename(&format!(
2789            "snapshot-12345678-{}.bad!ext",
2790            Hash::new_unique()
2791        ))
2792        .is_err());
2793        assert!(parse_full_snapshot_archive_filename(&format!(
2794            "snapshot-bad!slot-{}.tar",
2795            Hash::new_unique()
2796        ))
2797        .is_err());
2798
2799        assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2800        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2801        assert!(parse_full_snapshot_archive_filename(&format!(
2802            "snapshot-bad!slot-{}.tar",
2803            Hash::new_unique()
2804        ))
2805        .is_err());
2806    }
2807
2808    #[test]
2809    fn test_parse_incremental_snapshot_archive_filename() {
2810        assert_eq!(
2811            parse_incremental_snapshot_archive_filename(&format!(
2812                "incremental-snapshot-42-123-{}.tar.bz2",
2813                Hash::default()
2814            ))
2815            .unwrap(),
2816            (
2817                42,
2818                123,
2819                SnapshotHash(Hash::default()),
2820                ArchiveFormat::TarBzip2
2821            )
2822        );
2823        assert_eq!(
2824            parse_incremental_snapshot_archive_filename(&format!(
2825                "incremental-snapshot-43-234-{}.tar.zst",
2826                Hash::default()
2827            ))
2828            .unwrap(),
2829            (
2830                43,
2831                234,
2832                SnapshotHash(Hash::default()),
2833                ArchiveFormat::TarZstd {
2834                    config: ZstdConfig::default(),
2835                }
2836            )
2837        );
2838        assert_eq!(
2839            parse_incremental_snapshot_archive_filename(&format!(
2840                "incremental-snapshot-44-345-{}.tar",
2841                Hash::default()
2842            ))
2843            .unwrap(),
2844            (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2845        );
2846        assert_eq!(
2847            parse_incremental_snapshot_archive_filename(&format!(
2848                "incremental-snapshot-45-456-{}.tar.lz4",
2849                Hash::default()
2850            ))
2851            .unwrap(),
2852            (
2853                45,
2854                456,
2855                SnapshotHash(Hash::default()),
2856                ArchiveFormat::TarLz4
2857            )
2858        );
2859
2860        assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2861        assert!(parse_incremental_snapshot_archive_filename(&format!(
2862            "snapshot-42-{}.tar",
2863            Hash::new_unique()
2864        ))
2865        .is_err());
2866        assert!(parse_incremental_snapshot_archive_filename(
2867            "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2868        )
2869        .is_err());
2870
2871        assert!(parse_incremental_snapshot_archive_filename(&format!(
2872            "incremental-snapshot-bad!slot-56785678-{}.tar",
2873            Hash::new_unique()
2874        ))
2875        .is_err());
2876
2877        assert!(parse_incremental_snapshot_archive_filename(&format!(
2878            "incremental-snapshot-12345678-bad!slot-{}.tar",
2879            Hash::new_unique()
2880        ))
2881        .is_err());
2882
2883        assert!(parse_incremental_snapshot_archive_filename(
2884            "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2885        )
2886        .is_err());
2887
2888        assert!(parse_incremental_snapshot_archive_filename(&format!(
2889            "incremental-snapshot-12341234-56785678-{}.bad!ext",
2890            Hash::new_unique()
2891        ))
2892        .is_err());
2893    }
2894
2895    #[test]
2896    fn test_check_are_snapshots_compatible() {
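        // An incremental snapshot archive is compatible only when its base slot matches the full
        // snapshot archive's slot (or when no incremental snapshot archive is supplied at all).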
2897        let slot1: Slot = 1234;
2898        let slot2: Slot = 5678;
2899        let slot3: Slot = 999_999;
2900
2901        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2902            format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2903        ))
2904        .unwrap();
2905
2906        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None).is_ok());
2907
2908        let incremental_snapshot_archive_info =
2909            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2910                "/dir/incremental-snapshot-{}-{}-{}.tar",
2911                slot1,
2912                slot2,
2913                Hash::new_unique()
2914            )))
2915            .unwrap();
2916
2917        assert!(check_are_snapshots_compatible(
2918            &full_snapshot_archive_info,
2919            Some(&incremental_snapshot_archive_info)
2920        )
2921        .is_ok());
2922
2923        let incremental_snapshot_archive_info =
2924            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2925                "/dir/incremental-snapshot-{}-{}-{}.tar",
2926                slot2,
2927                slot3,
2928                Hash::new_unique()
2929            )))
2930            .unwrap();
2931
2932        assert!(check_are_snapshots_compatible(
2933            &full_snapshot_archive_info,
2934            Some(&incremental_snapshot_archive_info)
2935        )
2936        .is_err());
2937    }
2938
2939    /// A test helper function that creates bank snapshot files
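    ///
    /// For each slot in the half-open range [`min_slot`, `max_slot`), this creates an (empty)
    /// snapshot file plus status cache, version, and state-complete marker files, mirroring the
    /// layout of a real bank snapshot directory.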
2940    fn common_create_bank_snapshot_files(
2941        bank_snapshots_dir: &Path,
2942        min_slot: Slot,
2943        max_slot: Slot,
2944    ) {
2945        for slot in min_slot..max_slot {
2946            let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2947            fs::create_dir_all(&snapshot_dir).unwrap();
2948
2949            let snapshot_filename = get_snapshot_file_name(slot);
2950            let snapshot_path = snapshot_dir.join(snapshot_filename);
2951            fs::File::create(snapshot_path).unwrap();
2952
2953            let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2954            fs::File::create(status_cache_file).unwrap();
2955
2956            let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2957            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2958
2959            // Mark this directory complete so it can be used.  This flag is checked before a snapshot directory is selected for deserialization.
2960            let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2961            fs::File::create(state_complete_path).unwrap();
2962        }
2963    }
2964
2965    #[test]
2966    fn test_get_bank_snapshots() {
2967        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2968        let min_slot = 10;
2969        let max_slot = 20;
2970        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2971
2972        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2973        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2974    }
2975
2976    #[test]
2977    fn test_get_highest_bank_snapshot_post() {
2978        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2979        let min_slot = 99;
2980        let max_slot = 123;
2981        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2982
2983        let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2984        assert!(highest_bank_snapshot.is_some());
2985        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2986    }
2987
2988    /// A test helper function that creates full and incremental snapshot archive files.  Creates
2989    /// full snapshot files in the half-open range [`min_full_snapshot_slot`, `max_full_snapshot_slot`), and
2990    /// incremental snapshot files in the half-open range [`min_incremental_snapshot_slot`,
2991    /// `max_incremental_snapshot_slot`).  Additionally, "bad" files are created for both full and
2992    /// incremental snapshots to ensure the tests properly filter them out.
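    ///
    /// For illustration (not exhaustive): calling this with full snapshot slots 10..12 and
    /// incremental snapshot slots 20..22 produces archive names such as
    /// `snapshot-10-<hash>.tar` and `incremental-snapshot-10-20-<hash>.tar` (with `Hash::default()`
    /// in place of `<hash>`), alongside the intentionally malformed "bad" filenames.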
2993    fn common_create_snapshot_archive_files(
2994        full_snapshot_archives_dir: &Path,
2995        incremental_snapshot_archives_dir: &Path,
2996        min_full_snapshot_slot: Slot,
2997        max_full_snapshot_slot: Slot,
2998        min_incremental_snapshot_slot: Slot,
2999        max_incremental_snapshot_slot: Slot,
3000    ) {
3001        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3002        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3003        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3004            for incremental_snapshot_slot in
3005                min_incremental_snapshot_slot..max_incremental_snapshot_slot
3006            {
3007                let snapshot_filename = format!(
3008                    "incremental-snapshot-{}-{}-{}.tar",
3009                    full_snapshot_slot,
3010                    incremental_snapshot_slot,
3011                    Hash::default()
3012                );
3013                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3014                fs::File::create(snapshot_filepath).unwrap();
3015            }
3016
3017            let snapshot_filename =
3018                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3019            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3020            fs::File::create(snapshot_filepath).unwrap();
3021
3022            // Add in an incremental snapshot with a bad filename and high slot to ensure filenames are filtered and sorted correctly
3023            let bad_filename = format!(
3024                "incremental-snapshot-{}-{}-bad!hash.tar",
3025                full_snapshot_slot,
3026                max_incremental_snapshot_slot + 1,
3027            );
3028            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3029            fs::File::create(bad_filepath).unwrap();
3030        }
3031
3032        // Add in a snapshot with a bad filename and high slot to ensure filenames are filtered and
3033        // sorted correctly
3034        let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3035        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3036        fs::File::create(bad_filepath).unwrap();
3037    }
3038
3039    #[test]
3040    fn test_get_full_snapshot_archives() {
3041        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3042        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3043        let min_slot = 123;
3044        let max_slot = 456;
3045        common_create_snapshot_archive_files(
3046            full_snapshot_archives_dir.path(),
3047            incremental_snapshot_archives_dir.path(),
3048            min_slot,
3049            max_slot,
3050            0,
3051            0,
3052        );
3053
3054        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3055        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3056    }
3057
3058    #[test]
3059    fn test_get_full_snapshot_archives_remote() {
3060        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3061        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3062        let min_slot = 123;
3063        let max_slot = 456;
3064        common_create_snapshot_archive_files(
3065            &full_snapshot_archives_dir
3066                .path()
3067                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3068            &incremental_snapshot_archives_dir
3069                .path()
3070                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3071            min_slot,
3072            max_slot,
3073            0,
3074            0,
3075        );
3076
3077        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3078        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3079        assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3080    }
3081
3082    #[test]
3083    fn test_get_incremental_snapshot_archives() {
3084        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3085        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3086        let min_full_snapshot_slot = 12;
3087        let max_full_snapshot_slot = 23;
3088        let min_incremental_snapshot_slot = 34;
3089        let max_incremental_snapshot_slot = 45;
3090        common_create_snapshot_archive_files(
3091            full_snapshot_archives_dir.path(),
3092            incremental_snapshot_archives_dir.path(),
3093            min_full_snapshot_slot,
3094            max_full_snapshot_slot,
3095            min_incremental_snapshot_slot,
3096            max_incremental_snapshot_slot,
3097        );
3098
3099        let incremental_snapshot_archives =
3100            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3101        assert_eq!(
3102            incremental_snapshot_archives.len() as Slot,
3103            (max_full_snapshot_slot - min_full_snapshot_slot)
3104                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3105        );
3106    }
3107
3108    #[test]
3109    fn test_get_incremental_snapshot_archives_remote() {
3110        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3111        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3112        let min_full_snapshot_slot = 12;
3113        let max_full_snapshot_slot = 23;
3114        let min_incremental_snapshot_slot = 34;
3115        let max_incremental_snapshot_slot = 45;
3116        common_create_snapshot_archive_files(
3117            &full_snapshot_archives_dir
3118                .path()
3119                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3120            &incremental_snapshot_archives_dir
3121                .path()
3122                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3123            min_full_snapshot_slot,
3124            max_full_snapshot_slot,
3125            min_incremental_snapshot_slot,
3126            max_incremental_snapshot_slot,
3127        );
3128
3129        let incremental_snapshot_archives =
3130            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3131        assert_eq!(
3132            incremental_snapshot_archives.len() as Slot,
3133            (max_full_snapshot_slot - min_full_snapshot_slot)
3134                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3135        );
3136        assert!(incremental_snapshot_archives
3137            .iter()
3138            .all(|info| info.is_remote()));
3139    }
3140
3141    #[test]
3142    fn test_get_highest_full_snapshot_archive_slot() {
3143        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3144        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3145        let min_slot = 123;
3146        let max_slot = 456;
3147        common_create_snapshot_archive_files(
3148            full_snapshot_archives_dir.path(),
3149            incremental_snapshot_archives_dir.path(),
3150            min_slot,
3151            max_slot,
3152            0,
3153            0,
3154        );
3155
3156        assert_eq!(
3157            get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3158            Some(max_slot - 1)
3159        );
3160    }
3161
3162    #[test]
3163    fn test_get_highest_incremental_snapshot_slot() {
3164        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3165        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3166        let min_full_snapshot_slot = 12;
3167        let max_full_snapshot_slot = 23;
3168        let min_incremental_snapshot_slot = 34;
3169        let max_incremental_snapshot_slot = 45;
3170        common_create_snapshot_archive_files(
3171            full_snapshot_archives_dir.path(),
3172            incremental_snapshot_archives_dir.path(),
3173            min_full_snapshot_slot,
3174            max_full_snapshot_slot,
3175            min_incremental_snapshot_slot,
3176            max_incremental_snapshot_slot,
3177        );
3178
3179        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3180            assert_eq!(
3181                get_highest_incremental_snapshot_archive_slot(
3182                    incremental_snapshot_archives_dir.path(),
3183                    full_snapshot_slot
3184                ),
3185                Some(max_incremental_snapshot_slot - 1)
3186            );
3187        }
3188
3189        assert_eq!(
3190            get_highest_incremental_snapshot_archive_slot(
3191                incremental_snapshot_archives_dir.path(),
3192                max_full_snapshot_slot
3193            ),
3194            None
3195        );
3196    }
3197
3198    fn common_test_purge_old_snapshot_archives(
3199        snapshot_names: &[&String],
3200        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3201        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3202        expected_snapshots: &[&String],
3203    ) {
3204        let temp_snap_dir = tempfile::TempDir::new().unwrap();
3205
3206        for snap_name in snapshot_names {
3207            let snap_path = temp_snap_dir.path().join(snap_name);
3208            let _snap_file = fs::File::create(snap_path);
3209        }
3210        purge_old_snapshot_archives(
3211            temp_snap_dir.path(),
3212            temp_snap_dir.path(),
3213            maximum_full_snapshot_archives_to_retain,
3214            maximum_incremental_snapshot_archives_to_retain,
3215        );
3216
3217        let mut retained_snaps = HashSet::new();
3218        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3219            let entry_path_buf = entry.unwrap().path();
3220            let entry_path = entry_path_buf.as_path();
3221            let snapshot_name = entry_path
3222                .file_name()
3223                .unwrap()
3224                .to_str()
3225                .unwrap()
3226                .to_string();
3227            retained_snaps.insert(snapshot_name);
3228        }
3229
3230        for snap_name in expected_snapshots {
3231            assert!(
3232                retained_snaps.contains(snap_name.as_str()),
3233                "{snap_name} not found"
3234            );
3235        }
3236        assert_eq!(retained_snaps.len(), expected_snapshots.len());
3237    }
3238
3239    #[test]
3240    fn test_purge_old_full_snapshot_archives() {
3241        let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
3242        let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
3243        let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
3244        let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
3245
3246        // expecting only the newest to be retained
3247        let expected_snapshots = vec![&snap3_name];
3248        common_test_purge_old_snapshot_archives(
3249            &snapshot_names,
3250            NonZeroUsize::new(1).unwrap(),
3251            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3252            &expected_snapshots,
3253        );
3254
3255        // retaining 2, expecting the 2 newest to be retained
3256        let expected_snapshots = vec![&snap2_name, &snap3_name];
3257        common_test_purge_old_snapshot_archives(
3258            &snapshot_names,
3259            NonZeroUsize::new(2).unwrap(),
3260            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3261            &expected_snapshots,
3262        );
3263
3264        // retaining 3, all three should be retained
3265        let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
3266        common_test_purge_old_snapshot_archives(
3267            &snapshot_names,
3268            NonZeroUsize::new(3).unwrap(),
3269            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3270            &expected_snapshots,
3271        );
3272    }
3273
3274    /// Mimic a running node's behavior w.r.t. purging old snapshot archives.  Take snapshots in a
3275    /// loop, and periodically purge old snapshot archives.  After purging, check to make sure the
3276    /// snapshot archives on disk are correct.
3277    #[test]
3278    fn test_purge_old_full_snapshot_archives_in_the_loop() {
3279        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3280        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3281        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
3282        let starting_slot: Slot = 42;
3283
3284        for slot in (starting_slot..).take(100) {
3285            let full_snapshot_archive_file_name =
3286                format!("snapshot-{}-{}.tar", slot, Hash::default());
3287            let full_snapshot_archive_path = full_snapshot_archives_dir
3288                .as_ref()
3289                .join(full_snapshot_archive_file_name);
3290            fs::File::create(full_snapshot_archive_path).unwrap();
3291
3292            // don't purge-and-check until enough snapshot archives have been created
3293            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
3294                continue;
3295            }
3296
3297            // purge infrequently, so there will always be snapshot archives to purge
3298            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
3299                continue;
3300            }
3301
3302            purge_old_snapshot_archives(
3303                &full_snapshot_archives_dir,
3304                &incremental_snapshot_archives_dir,
3305                maximum_snapshots_to_retain,
3306                NonZeroUsize::new(usize::MAX).unwrap(),
3307            );
3308            let mut full_snapshot_archives =
3309                get_full_snapshot_archives(&full_snapshot_archives_dir);
3310            full_snapshot_archives.sort_unstable();
3311            assert_eq!(
3312                full_snapshot_archives.len(),
3313                maximum_snapshots_to_retain.get()
3314            );
3315            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
3316            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
3317                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
3318            }
3319        }
3320    }
3321
3322    #[test]
3323    fn test_purge_old_incremental_snapshot_archives() {
3324        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3325        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3326        let starting_slot = 100_000;
3327
3328        let maximum_incremental_snapshot_archives_to_retain =
3329            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3330        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3331
3332        let incremental_snapshot_interval = 100;
3333        let num_incremental_snapshots_per_full_snapshot =
3334            maximum_incremental_snapshot_archives_to_retain.get() * 2;
3335        let full_snapshot_interval =
3336            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
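        // Each full snapshot period thus spans twice the incremental retention limit worth of
        // incremental snapshot intervals, so the purge below has incremental archives to discard.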
3337
3338        let mut snapshot_filenames = vec![];
3339        (starting_slot..)
3340            .step_by(full_snapshot_interval)
3341            .take(
3342                maximum_full_snapshot_archives_to_retain
3343                    .checked_mul(NonZeroUsize::new(2).unwrap())
3344                    .unwrap()
3345                    .get(),
3346            )
3347            .for_each(|full_snapshot_slot| {
3348                let snapshot_filename =
3349                    format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3350                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3351                fs::File::create(snapshot_path).unwrap();
3352                snapshot_filenames.push(snapshot_filename);
3353
3354                (full_snapshot_slot..)
3355                    .step_by(incremental_snapshot_interval)
3356                    .take(num_incremental_snapshots_per_full_snapshot)
3357                    .skip(1)
3358                    .for_each(|incremental_snapshot_slot| {
3359                        let snapshot_filename = format!(
3360                            "incremental-snapshot-{}-{}-{}.tar",
3361                            full_snapshot_slot,
3362                            incremental_snapshot_slot,
3363                            Hash::default()
3364                        );
3365                        let snapshot_path = incremental_snapshot_archives_dir
3366                            .path()
3367                            .join(&snapshot_filename);
3368                        fs::File::create(snapshot_path).unwrap();
3369                        snapshot_filenames.push(snapshot_filename);
3370                    });
3371            });
3372
3373        purge_old_snapshot_archives(
3374            full_snapshot_archives_dir.path(),
3375            incremental_snapshot_archives_dir.path(),
3376            maximum_full_snapshot_archives_to_retain,
3377            maximum_incremental_snapshot_archives_to_retain,
3378        );
3379
3380        // Ensure correct number of full snapshot archives are purged/retained
3381        let mut remaining_full_snapshot_archives =
3382            get_full_snapshot_archives(full_snapshot_archives_dir.path());
3383        assert_eq!(
3384            remaining_full_snapshot_archives.len(),
3385            maximum_full_snapshot_archives_to_retain.get(),
3386        );
3387        remaining_full_snapshot_archives.sort_unstable();
3388        let latest_full_snapshot_archive_slot =
3389            remaining_full_snapshot_archives.last().unwrap().slot();
3390
3391        // Ensure correct number of incremental snapshot archives are purged/retained
3392        // For each additional full snapshot archive, one additional (the newest)
3393        // incremental snapshot archive is retained. This is accounted for by the
3394        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
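        //
        // As a worked example (values assumed purely for illustration): retaining 4 incremental
        // and 2 full snapshot archives would leave 4 + (2 - 1) = 5 incremental archives.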
3395        let mut remaining_incremental_snapshot_archives =
3396            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3397        assert_eq!(
3398            remaining_incremental_snapshot_archives.len(),
3399            maximum_incremental_snapshot_archives_to_retain
3400                .get()
3401                .saturating_add(
3402                    maximum_full_snapshot_archives_to_retain
3403                        .get()
3404                        .saturating_sub(1)
3405                )
3406        );
3407        remaining_incremental_snapshot_archives.sort_unstable();
3408        remaining_incremental_snapshot_archives.reverse();
3409
3410        // Ensure there exists one incremental snapshot for all but the latest full snapshot
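        // (namely the newest one for that full snapshot, i.e. the incremental archive taken one
        // incremental interval before the next full snapshot, which the expected_slot math below
        // reflects)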
3411        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3412            let incremental_snapshot_archive =
3413                remaining_incremental_snapshot_archives.pop().unwrap();
3414
3415            let expected_base_slot =
3416                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3417            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3418            let expected_slot = expected_base_slot
3419                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3420            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3421        }
3422
3423        // Ensure all remaining incremental snapshots are only for the latest full snapshot
3424        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3425            assert_eq!(
3426                incremental_snapshot_archive.base_slot(),
3427                latest_full_snapshot_archive_slot
3428            );
3429        }
3430
3431        // Ensure the remaining incremental snapshots are at the right slots
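        // (i.e. the newest `maximum_incremental_snapshot_archives_to_retain` incremental snapshot
        // slots created for the latest full snapshot)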
3432        let expected_remaining_incremental_snapshot_archive_slots =
3433            (latest_full_snapshot_archive_slot..)
3434                .step_by(incremental_snapshot_interval)
3435                .take(num_incremental_snapshots_per_full_snapshot)
3436                .skip(
3437                    num_incremental_snapshots_per_full_snapshot
3438                        - maximum_incremental_snapshot_archives_to_retain.get(),
3439                )
3440                .collect::<HashSet<_>>();
3441
3442        let actual_remaining_incremental_snapshot_archive_slots =
3443            remaining_incremental_snapshot_archives
3444                .iter()
3445                .map(|snapshot| snapshot.slot())
3446                .collect::<HashSet<_>>();
3447        assert_eq!(
3448            actual_remaining_incremental_snapshot_archive_slots,
3449            expected_remaining_incremental_snapshot_archive_slots
3450        );
3451    }
3452
3453    #[test]
3454    fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3455        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3456        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3457
3458        for snapshot_filenames in [
3459            format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
3460            format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
3461            format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
3462            format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
3463            format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
3464            format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
3465            format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
3466            format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
3467        ] {
3468            let snapshot_path = incremental_snapshot_archives_dir
3469                .path()
3470                .join(snapshot_filenames);
3471            fs::File::create(snapshot_path).unwrap();
3472        }
3473
3474        purge_old_snapshot_archives(
3475            full_snapshot_archives_dir.path(),
3476            incremental_snapshot_archives_dir.path(),
3477            NonZeroUsize::new(usize::MAX).unwrap(),
3478            NonZeroUsize::new(usize::MAX).unwrap(),
3479        );
3480
3481        let remaining_incremental_snapshot_archives =
3482            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3483        assert!(remaining_incremental_snapshot_archives.is_empty());
3484    }
3485
3486    #[test]
3487    fn test_get_snapshot_accounts_hardlink_dir() {
3488        let slot: Slot = 1;
3489
3490        let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3491
3492        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3493        let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3494        let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3495        fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3496
3497        let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3498        let appendvec_filename = format!("{slot}.0");
3499        let appendvec_path = accounts_dir.join(appendvec_filename);
3500
3501        let ret = get_snapshot_accounts_hardlink_dir(
3502            &appendvec_path,
3503            slot,
3504            &mut account_paths_set,
3505            &accounts_hardlinks_dir,
3506        );
3507        assert!(ret.is_ok());
3508
3509        let wrong_appendvec_path = appendvec_path
3510            .parent()
3511            .unwrap()
3512            .parent()
3513            .unwrap()
3514            .join(appendvec_path.file_name().unwrap());
3515        let ret = get_snapshot_accounts_hardlink_dir(
3516            &wrong_appendvec_path,
3517            slot,
3518            &mut account_paths_set,
3519            accounts_hardlinks_dir,
3520        );
3521
3522        assert_matches!(
3523            ret,
3524            Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3525        );
3526    }
3527
3528    #[test]
3529    fn test_full_snapshot_slot_file_good() {
3530        let slot_written = 123_456_789;
3531        let bank_snapshot_dir = TempDir::new().unwrap();
3532        write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
3533
3534        let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
3535        assert_eq!(slot_read, slot_written);
3536    }
3537
3538    #[test]
3539    fn test_full_snapshot_slot_file_bad() {
3540        const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
3541        let too_small = [1u8; SLOT_SIZE - 1];
3542        let too_large = [1u8; SLOT_SIZE + 1];
3543
3544        for contents in [too_small.as_slice(), too_large.as_slice()] {
3545            let bank_snapshot_dir = TempDir::new().unwrap();
3546            let full_snapshot_slot_path = bank_snapshot_dir
3547                .as_ref()
3548                .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
3549            fs::write(full_snapshot_slot_path, contents).unwrap();
3550
3551            let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
3552            assert!(err
3553                .to_string()
3554                .starts_with("invalid full snapshot slot file size"));
3555        }
3556    }
3557}