solana_runtime/snapshot_utils.rs

1use {
2    crate::{
3        bank::{BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4        serde_snapshot::{
5            self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6        },
7        snapshot_archive_info::{
8            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9            SnapshotArchiveInfoGetter,
10        },
11        snapshot_bank_utils,
12        snapshot_config::SnapshotConfig,
13        snapshot_hash::SnapshotHash,
14        snapshot_package::{SnapshotKind, SnapshotPackage},
15        snapshot_utils::snapshot_storage_rebuilder::{
16            RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17        },
18    },
19    bzip2::bufread::BzDecoder,
20    crossbeam_channel::Sender,
21    flate2::read::GzDecoder,
22    lazy_static::lazy_static,
23    log::*,
24    regex::Regex,
25    solana_accounts_db::{
26        account_storage::{meta::StoredMetaWriteVersion, AccountStorageMap},
27        accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
28        accounts_file::{AccountsFile, AccountsFileError, InternalsForArchive, StorageAccess},
29        accounts_hash::{AccountsDeltaHash, AccountsHash},
30        epoch_accounts_hash::EpochAccountsHash,
31        hardened_unpack::{self, ParallelSelector, UnpackError},
32        shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34    },
35    solana_measure::{measure::Measure, measure_time, measure_us},
36    solana_sdk::{
37        clock::{Epoch, Slot},
38        hash::Hash,
39    },
40    std::{
41        cmp::Ordering,
42        collections::{HashMap, HashSet},
43        fmt, fs,
44        io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45        mem,
46        num::NonZeroUsize,
47        ops::RangeInclusive,
48        path::{Path, PathBuf},
49        process::ExitStatus,
50        str::FromStr,
51        sync::Arc,
52        thread::{Builder, JoinHandle},
53    },
54    tar::{self, Archive},
55    tempfile::TempDir,
56    thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60    hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61    solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
68pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
69pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
70pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
71pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
72pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
73pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
74pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
75const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // bytes
76const VERSION_STRING_V1_2_0: &str = "1.2.0";
77pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
78pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
79// The following unsafes are:
80// - Safe because the values are fixed, known non-zero constants
81// - Necessary in order to have a plain NonZeroUsize as the constant, since
82//   NonZeroUsize::new() returns an Option<NonZeroUsize> and we can't .unwrap() it at compile time
83pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
84    unsafe { NonZeroUsize::new_unchecked(2) };
85pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
86    unsafe { NonZeroUsize::new_unchecked(4) };
87pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
88pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
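// Illustrative archive file names matched by the regexes above (the slots and hashes are made-up
// examples):
//   snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJRC7b5QUPkLXs89y.tar.zst
//   incremental-snapshot-100-150-AvFf9oS8A8U78HdjT9YG2sTTThLHJRC7b5QUPkLXs89y.tar.zst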
89
90#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
91pub enum SnapshotVersion {
92    #[default]
93    V1_2_0,
94}
95
96impl fmt::Display for SnapshotVersion {
97    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98        f.write_str(From::from(*self))
99    }
100}
101
102impl From<SnapshotVersion> for &'static str {
103    fn from(snapshot_version: SnapshotVersion) -> &'static str {
104        match snapshot_version {
105            SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
106        }
107    }
108}
109
110impl FromStr for SnapshotVersion {
111    type Err = &'static str;
112
113    fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
114        // Remove leading 'v' or 'V' from slice
115        let version_string = if version_string
116            .get(..1)
117            .is_some_and(|s| s.eq_ignore_ascii_case("v"))
118        {
119            &version_string[1..]
120        } else {
121            version_string
122        };
123        match version_string {
124            VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
125            _ => Err("unsupported snapshot version"),
126        }
127    }
128}
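// A parsing sketch for the FromStr impl above (illustrative):
//   assert_eq!(SnapshotVersion::from_str("1.2.0"), Ok(SnapshotVersion::V1_2_0));
//   assert_eq!(SnapshotVersion::from_str("V1.2.0"), Ok(SnapshotVersion::V1_2_0));
//   assert!(SnapshotVersion::from_str("2.0.0").is_err());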
129
130impl SnapshotVersion {
131    pub fn as_str(self) -> &'static str {
132        <&str as From<Self>>::from(self)
133    }
134}
135
136/// Information about a bank snapshot, namely the slot of the bank, the kind of snapshot, the path
137/// to the snapshot directory, and the snapshot version.
138#[derive(PartialEq, Eq, Debug)]
139pub struct BankSnapshotInfo {
140    /// Slot of the bank
141    pub slot: Slot,
142    /// Snapshot kind
143    pub snapshot_kind: BankSnapshotKind,
144    /// Path to the bank snapshot directory
145    pub snapshot_dir: PathBuf,
146    /// Snapshot version
147    pub snapshot_version: SnapshotVersion,
148}
149
150impl PartialOrd for BankSnapshotInfo {
151    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
152        Some(self.cmp(other))
153    }
154}
155
156// Order BankSnapshotInfo by slot (ascending), which in practice sorts them chronologically
157impl Ord for BankSnapshotInfo {
158    fn cmp(&self, other: &Self) -> Ordering {
159        self.slot.cmp(&other.slot)
160    }
161}
162
163impl BankSnapshotInfo {
164    pub fn new_from_dir(
165        bank_snapshots_dir: impl AsRef<Path>,
166        slot: Slot,
167    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
168        // check this directory to see if there is a BankSnapshotPre and/or
169        // BankSnapshotPost file
170        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
171
172        if !bank_snapshot_dir.is_dir() {
173            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
174                bank_snapshot_dir,
175            ));
176        }
177
178        // Among the file checks, the completion-flag check should be done first to avoid later
179        // I/O errors.
180
181        // There is a time window between the slot directory being created and its contents being
182        // completely written.  Check for completion to avoid using the highest-slot directory while its contents are still missing.
183        if !is_bank_snapshot_complete(&bank_snapshot_dir) {
184            return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
185        }
186
187        let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
188        if !status_cache_file.is_file() {
189            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
190                status_cache_file,
191            ));
192        }
193
194        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
195        let version_str = snapshot_version_from_file(&version_path).or(Err(
196            SnapshotNewFromDirError::MissingVersionFile(version_path),
197        ))?;
198        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
199            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
200
201        let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
202        let bank_snapshot_pre_path =
203            bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
204
205        // NOTE: It is important that checking for "Pre" happens before "Post".
206        //
207        // Consider the scenario where AccountsHashVerifier is actively processing an
208        // AccountsPackage for a snapshot/slot; if AHV is in the middle of reserializing the
209        // bank snapshot file (writing the new "Post" file), and then the process dies,
210        // there will be an incomplete "Post" file on disk.  We do not want only the existence of
211        // this "Post" file to be sufficient for deciding the snapshot kind as "Post".  Moreover,
212        // "Post" *requires* the *absence* of a "Pre" file.
213        let snapshot_kind = if bank_snapshot_pre_path.is_file() {
214            BankSnapshotKind::Pre
215        } else if bank_snapshot_post_path.is_file() {
216            BankSnapshotKind::Post
217        } else {
218            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
219                bank_snapshot_dir,
220            ));
221        };
222
223        Ok(BankSnapshotInfo {
224            slot,
225            snapshot_kind,
226            snapshot_dir: bank_snapshot_dir,
227            snapshot_version,
228        })
229    }
230
231    pub fn snapshot_path(&self) -> PathBuf {
232        let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
233
234        let ext = match self.snapshot_kind {
235            BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
236            BankSnapshotKind::Post => "",
237        };
238        bank_snapshot_path.set_extension(ext);
239
240        bank_snapshot_path
241    }
242}
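// For example, for slot 100 (illustrative bank_snapshots_dir): a Post snapshot's path resolves to
// "<bank_snapshots_dir>/100/100", while a Pre snapshot's resolves to "<bank_snapshots_dir>/100/100.pre".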
243
244/// Bank snapshots traditionally had their accounts hash calculated prior to serialization.  Since
245/// the hash calculation takes a long time, an optimization has been put in to offload the accounts
246/// hash calculation.  The bank serialization format has not changed, so we need another way to
247/// identify if a bank snapshot contains the calculated accounts hash or not.
248///
249/// When a bank snapshot is first taken, it does not have the calculated accounts hash.  It is said
250/// that this bank snapshot is "pre" accounts hash.  Later, when the accounts hash is calculated,
251/// the bank snapshot is re-serialized, and is now "post" accounts hash.
252#[derive(Debug, Copy, Clone, Eq, PartialEq)]
253pub enum BankSnapshotKind {
254    /// This bank snapshot has *not* yet had its accounts hash calculated
255    Pre,
256    /// This bank snapshot *has* had its accounts hash calculated
257    Post,
258}
259
260/// When constructing a bank from a snapshot, traditionally the snapshot was from a snapshot archive.  Now,
261/// the snapshot can be from a snapshot directory, or from a snapshot archive.  This is the flag to
262/// indicate which.
263#[derive(Clone, Copy, Debug, Eq, PartialEq)]
264pub enum SnapshotFrom {
265    /// Build from the snapshot archive
266    Archive,
267    /// Build directly from the bank snapshot directory
268    Dir,
269}
270
271/// Helper type when rebuilding from snapshots.  Designed to handle when rebuilding from just a
272/// full snapshot, or from both a full snapshot and an incremental snapshot.
273#[derive(Debug)]
274pub struct SnapshotRootPaths {
275    pub full_snapshot_root_file_path: PathBuf,
276    pub incremental_snapshot_root_file_path: Option<PathBuf>,
277}
278
279/// Helper type to bundle up the results from `unarchive_snapshot()`
280#[derive(Debug)]
281pub struct UnarchivedSnapshot {
282    #[allow(dead_code)]
283    unpack_dir: TempDir,
284    pub storage: AccountStorageMap,
285    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
286    pub measure_untar: Measure,
287}
288
289/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
290#[derive(Debug)]
291pub struct UnpackedSnapshotsDirAndVersion {
292    pub unpacked_snapshots_dir: PathBuf,
293    pub snapshot_version: SnapshotVersion,
294}
295
296/// Helper type for passing around account storage map and next append vec id
297/// for reconstructing accounts from a snapshot
298pub(crate) struct StorageAndNextAccountsFileId {
299    pub storage: AccountStorageMap,
300    pub next_append_vec_id: AtomicAccountsFileId,
301}
302
303#[derive(Error, Debug)]
304#[allow(clippy::large_enum_variant)]
305pub enum SnapshotError {
306    #[error("I/O error: {0}")]
307    Io(#[from] IoError),
308
309    #[error("AccountsFile error: {0}")]
310    AccountsFileError(#[from] AccountsFileError),
311
312    #[error("serialization error: {0}")]
313    Serialize(#[from] bincode::Error),
314
315    #[error("crossbeam send error: {0}")]
316    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),
317
318    #[error("archive generation failure {0}")]
319    ArchiveGenerationFailure(ExitStatus),
320
321    #[error("Unpack error: {0}")]
322    UnpackError(#[from] UnpackError),
323
324    #[error("source({1}) - I/O error: {0}")]
325    IoWithSource(IoError, &'static str),
326
327    #[error("could not get file name from path '{0}'")]
328    PathToFileNameError(PathBuf),
329
330    #[error("could not get str from file name '{0}'")]
331    FileNameToStrError(PathBuf),
332
333    #[error("could not parse snapshot archive's file name '{0}'")]
334    ParseSnapshotArchiveFileNameError(String),
335
336    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
337    MismatchedBaseSlot(Slot, Slot),
338
339    #[error("no snapshot archives to load from '{0}'")]
340    NoSnapshotArchives(PathBuf),
341
342    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
343    MismatchedSlot(Slot, Slot),
344
345    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
346    MismatchedHash(SnapshotHash, SnapshotHash),
347
348    #[error("snapshot slot deltas are invalid: {0}")]
349    VerifySlotDeltas(#[from] VerifySlotDeltasError),
350
351    #[error("snapshot epoch stakes are invalid: {0}")]
352    VerifyEpochStakes(#[from] VerifyEpochStakesError),
353
354    #[error("bank_snapshot_info new_from_dir failed: {0}")]
355    NewFromDir(#[from] SnapshotNewFromDirError),
356
357    #[error("invalid snapshot dir path '{0}'")]
358    InvalidSnapshotDirPath(PathBuf),
359
360    #[error("invalid AppendVec path '{0}'")]
361    InvalidAppendVecPath(PathBuf),
362
363    #[error("invalid account path '{0}'")]
364    InvalidAccountPath(PathBuf),
365
366    #[error("no valid snapshot dir found under '{0}'")]
367    NoSnapshotSlotDir(PathBuf),
368
369    #[error("snapshot dir account paths mismatching")]
370    AccountPathsMismatch,
371
372    #[error("failed to add bank snapshot for slot {1}: {0}")]
373    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),
374
375    #[error("failed to archive snapshot package: {0}")]
376    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
377
378    #[error("failed to rebuild snapshot storages: {0}")]
379    RebuildStorages(String),
380}
381
382#[derive(Error, Debug)]
383pub enum SnapshotNewFromDirError {
384    #[error("invalid bank snapshot directory '{0}'")]
385    InvalidBankSnapshotDir(PathBuf),
386
387    #[error("missing status cache file '{0}'")]
388    MissingStatusCacheFile(PathBuf),
389
390    #[error("missing version file '{0}'")]
391    MissingVersionFile(PathBuf),
392
393    #[error("invalid snapshot version '{0}'")]
394    InvalidVersion(String),
395
396    #[error("snapshot directory incomplete '{0}'")]
397    IncompleteDir(PathBuf),
398
399    #[error("missing snapshot file '{0}'")]
400    MissingSnapshotFile(PathBuf),
401}
402
403pub type Result<T> = std::result::Result<T, SnapshotError>;
404
405/// Errors that can happen in `verify_slot_deltas()`
406#[derive(Error, Debug, PartialEq, Eq)]
407pub enum VerifySlotDeltasError {
408    #[error("too many entries: {0} (max: {1})")]
409    TooManyEntries(usize, usize),
410
411    #[error("slot {0} is not a root")]
412    SlotIsNotRoot(Slot),
413
414    #[error("slot {0} is greater than bank slot {1}")]
415    SlotGreaterThanMaxRoot(Slot, Slot),
416
417    #[error("slot {0} has multiple entries")]
418    SlotHasMultipleEntries(Slot),
419
420    #[error("slot {0} was not found in slot history")]
421    SlotNotFoundInHistory(Slot),
422
423    #[error("slot {0} was in history but missing from slot deltas")]
424    SlotNotFoundInDeltas(Slot),
425
426    #[error("slot history is bad and cannot be used to verify slot deltas")]
427    BadSlotHistory,
428}
429
430/// Errors that can happen in `verify_epoch_stakes()`
431#[derive(Error, Debug, PartialEq, Eq)]
432pub enum VerifyEpochStakesError {
433    #[error("epoch {0} is greater than the max {1}")]
434    EpochGreaterThanMax(Epoch, Epoch),
435
436    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
437    StakesNotFound(Epoch, RangeInclusive<Epoch>),
438}
439
440/// Errors that can happen in `add_bank_snapshot()`
441#[derive(Error, Debug)]
442pub enum AddBankSnapshotError {
443    #[error("bank snapshot dir already exists '{0}'")]
444    SnapshotDirAlreadyExists(PathBuf),
445
446    #[error("failed to create snapshot dir '{1}': {0}")]
447    CreateSnapshotDir(#[source] IoError, PathBuf),
448
449    #[error("failed to flush storage '{1}': {0}")]
450    FlushStorage(#[source] AccountsFileError, PathBuf),
451
452    #[error("failed to hard link storages: {0}")]
453    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),
454
455    #[error("failed to serialize bank: {0}")]
456    SerializeBank(#[source] Box<SnapshotError>),
457
458    #[error("failed to serialize status cache: {0}")]
459    SerializeStatusCache(#[source] Box<SnapshotError>),
460
461    #[error("failed to write snapshot version file '{1}': {0}")]
462    WriteSnapshotVersionFile(#[source] IoError, PathBuf),
463
464    #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
465    CreateStateCompleteFile(#[source] IoError, PathBuf),
466}
467
468/// Errors that can happen in `archive_snapshot_package()`
469#[derive(Error, Debug)]
470pub enum ArchiveSnapshotPackageError {
471    #[error("failed to create archive path '{1}': {0}")]
472    CreateArchiveDir(#[source] IoError, PathBuf),
473
474    #[error("failed to create staging dir inside '{1}': {0}")]
475    CreateStagingDir(#[source] IoError, PathBuf),
476
477    #[error("failed to create snapshot staging dir '{1}': {0}")]
478    CreateSnapshotStagingDir(#[source] IoError, PathBuf),
479
480    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
481    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),
482
483    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
484    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),
485
486    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
487    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),
488
489    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
490    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),
491
492    #[error("failed to create archive file '{1}': {0}")]
493    CreateArchiveFile(#[source] IoError, PathBuf),
494
495    #[error("failed to archive version file: {0}")]
496    ArchiveVersionFile(#[source] IoError),
497
498    #[error("failed to archive snapshots dir: {0}")]
499    ArchiveSnapshotsDir(#[source] IoError),
500
501    #[error("failed to archive account storage file '{1}': {0}")]
502    ArchiveAccountStorageFile(#[source] IoError, PathBuf),
503
504    #[error("failed to archive snapshot: {0}")]
505    FinishArchive(#[source] IoError),
506
507    #[error("failed to create encoder: {0}")]
508    CreateEncoder(#[source] IoError),
509
510    #[error("failed to encode archive: {0}")]
511    FinishEncoder(#[source] IoError),
512
513    #[error("failed to query archive metadata '{1}': {0}")]
514    QueryArchiveMetadata(#[source] IoError, PathBuf),
515
516    #[error("failed to move archive from '{1}' to '{2}': {0}")]
517    MoveArchive(#[source] IoError, PathBuf, PathBuf),
518}
519
520/// Errors that can happen in `hard_link_storages_to_snapshot()`
521#[derive(Error, Debug)]
522pub enum HardLinkStoragesToSnapshotError {
523    #[error("failed to create accounts hard links dir '{1}': {0}")]
524    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),
525
526    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
527    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),
528
529    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
530    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
531}
532
533/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
534#[derive(Error, Debug)]
535pub enum GetSnapshotAccountsHardLinkDirError {
536    #[error("invalid account storage path '{0}'")]
537    GetAccountPath(PathBuf),
538
539    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
540    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),
541
542    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
543    SymlinkSnapshotHardLinkDir {
544        source: IoError,
545        original: PathBuf,
546        link: PathBuf,
547    },
548}
549
550/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hard-linked
551/// from <account_path>/run at the time snapshot <slot> was taken.  They are referenced by the symlinks in the
552/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We have observed that sometimes the bank snapshot dir
553/// is deleted but the account snapshot directories are left behind, possibly by manual operations
554/// or by legacy code that did not use the symlinks to clean up the account snapshot hardlink directories.
555/// This function cleans up any account snapshot directories that are no longer referenced by the bank
556/// snapshot dirs, to keep snapshot operations consistent.
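///
/// Illustrative layout (paths are examples, not exact values):
///   <bank_snapshots_dir>/<slot>/accounts_hardlinks/<N> -> <account_path>/snapshot/<slot>   (symlink)
///   <account_path>/snapshot/<slot>/...                    (hard-linked account storage files)
///   <account_path>/run/...                                (operational account storage files)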
557pub fn clean_orphaned_account_snapshot_dirs(
558    bank_snapshots_dir: impl AsRef<Path>,
559    account_snapshot_paths: &[PathBuf],
560) -> IoResult<()> {
561    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
562    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
563    let mut account_snapshot_dirs_referenced = HashSet::new();
564    let snapshots = get_bank_snapshots(bank_snapshots_dir);
565    for snapshot in snapshots {
566        let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
567        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
568        let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
569            IoError::other(format!(
570                "failed to read account hardlinks dir '{}': {err}",
571                account_hardlinks_dir.display(),
572            ))
573        })?;
574        for entry in read_dir {
575            let path = entry?.path();
576            let target = fs::read_link(&path).map_err(|err| {
577                IoError::other(format!(
578                    "failed to read symlink '{}': {err}",
579                    path.display(),
580                ))
581            })?;
582            account_snapshot_dirs_referenced.insert(target);
583        }
584    }
585
586    // loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
587    for account_snapshot_path in account_snapshot_paths {
588        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
589            IoError::other(format!(
590                "failed to read account snapshot dir '{}': {err}",
591                account_snapshot_path.display(),
592            ))
593        })?;
594        for entry in read_dir {
595            let path = entry?.path();
596            if !account_snapshot_dirs_referenced.contains(&path) {
597                info!(
598                    "Removing orphaned account snapshot hardlink directory '{}'...",
599                    path.display()
600                );
601                move_and_async_delete_path(&path);
602            }
603        }
604    }
605
606    Ok(())
607}
608
609/// Purges incomplete bank snapshots
610pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
611    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
612        // If we cannot read the bank snapshots dir, then there's nothing to do
613        return;
614    };
615
616    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
617
618    let incomplete_dirs: Vec<_> = read_dir_iter
619        .filter_map(|entry| entry.ok())
620        .map(|entry| entry.path())
621        .filter(|path| path.is_dir())
622        .filter(is_incomplete)
623        .collect();
624
625    // attempt to purge all the incomplete directories; do not exit early
626    for incomplete_dir in incomplete_dirs {
627        let result = purge_bank_snapshot(&incomplete_dir);
628        match result {
629            Ok(_) => info!(
630                "Purged incomplete snapshot dir: {}",
631                incomplete_dir.display()
632            ),
633            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
634        }
635    }
636}
637
638/// Is the bank snapshot complete?
639fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
640    let state_complete_path = bank_snapshot_dir
641        .as_ref()
642        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
643    state_complete_path.is_file()
644}
645
646/// Writes the full snapshot slot file into the bank snapshot dir
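///
/// A minimal round-trip sketch pairing this with `read_full_snapshot_slot_file` (illustrative
/// path only, not an actual call site):
/// ```ignore
/// let bank_snapshot_dir = std::env::temp_dir().join("example-bank-snapshot-100");
/// std::fs::create_dir_all(&bank_snapshot_dir).unwrap();
/// write_full_snapshot_slot_file(&bank_snapshot_dir, 100).unwrap();
/// assert_eq!(read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap(), 100);
/// ```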
647pub fn write_full_snapshot_slot_file(
648    bank_snapshot_dir: impl AsRef<Path>,
649    full_snapshot_slot: Slot,
650) -> IoResult<()> {
651    let full_snapshot_slot_path = bank_snapshot_dir
652        .as_ref()
653        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
654    fs::write(
655        &full_snapshot_slot_path,
656        Slot::to_le_bytes(full_snapshot_slot),
657    )
658    .map_err(|err| {
659        IoError::other(format!(
660            "failed to write full snapshot slot file '{}': {err}",
661            full_snapshot_slot_path.display(),
662        ))
663    })
664}
665
666/// Reads the full snapshot slot file from the bank snapshot dir
667pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
668    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
669    let full_snapshot_slot_path = bank_snapshot_dir
670        .as_ref()
671        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
672    let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
673    if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
674        let error_message = format!(
675            "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
676            full_snapshot_slot_path.display(),
677            full_snapshot_slot_file_metadata.len(),
678            SLOT_SIZE,
679        );
680        return Err(IoError::other(error_message));
681    }
682    let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
683    let mut buffer = [0; SLOT_SIZE];
684    full_snapshot_slot_file.read_exact(&mut buffer)?;
685    let slot = Slot::from_le_bytes(buffer);
686    Ok(slot)
687}
688
689/// Gets the highest loadable bank snapshot
690///
691/// The highest bank snapshot is the one with the highest slot.
692/// To be loadable, the bank snapshot must be a BankSnapshotKind::Post.
693/// And if we're generating snapshots (e.g. running a normal validator), then
694/// the full snapshot file's slot must match the highest full snapshot archive's.
695pub fn get_highest_loadable_bank_snapshot(
696    snapshot_config: &SnapshotConfig,
697) -> Option<BankSnapshotInfo> {
698    let highest_bank_snapshot =
699        get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
700
701    // If we're *not* generating snapshots, e.g. running ledger-tool, then we *can* load
702    // this bank snapshot, and we do not need to check for anything else.
703    if !snapshot_config.should_generate_snapshots() {
704        return Some(highest_bank_snapshot);
705    }
706
707    // Otherwise, the bank snapshot's full snapshot slot *must* be the same as
708    // the highest full snapshot archive's slot.
709    let highest_full_snapshot_archive_slot =
710        get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
711    let full_snapshot_file_slot =
712        read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
713    (full_snapshot_file_slot == highest_full_snapshot_archive_slot).then_some(highest_bank_snapshot)
714}
715
716/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
717/// directories won't be cleaned up.  Call this function to clean them up.
718pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
719    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
720        for entry in entries.flatten() {
721            if entry
722                .file_name()
723                .to_str()
724                .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
725                .unwrap_or(false)
726            {
727                let path = entry.path();
728                let result = if path.is_dir() {
729                    fs::remove_dir_all(&path)
730                } else {
731                    fs::remove_file(&path)
732                };
733                if let Err(err) = result {
734                    warn!(
735                        "Failed to remove temporary snapshot archive '{}': {err}",
736                        path.display(),
737                    );
738                }
739            }
740        }
741    }
742}
743
744/// Serializes and archives a snapshot package
745pub fn serialize_and_archive_snapshot_package(
746    snapshot_package: SnapshotPackage,
747    snapshot_config: &SnapshotConfig,
748) -> Result<SnapshotArchiveInfo> {
749    let SnapshotPackage {
750        snapshot_kind,
751        slot: snapshot_slot,
752        block_height,
753        hash: snapshot_hash,
754        mut snapshot_storages,
755        status_cache_slot_deltas,
756        bank_fields_to_serialize,
757        bank_hash_stats,
758        accounts_delta_hash,
759        accounts_hash,
760        epoch_accounts_hash,
761        bank_incremental_snapshot_persistence,
762        write_version,
763        enqueued: _,
764    } = snapshot_package;
765
766    let bank_snapshot_info = serialize_snapshot(
767        &snapshot_config.bank_snapshots_dir,
768        snapshot_config.snapshot_version,
769        snapshot_storages.as_slice(),
770        status_cache_slot_deltas.as_slice(),
771        bank_fields_to_serialize,
772        bank_hash_stats,
773        accounts_delta_hash,
774        accounts_hash,
775        epoch_accounts_hash,
776        bank_incremental_snapshot_persistence.as_ref(),
777        write_version,
778    )?;
779
780    // now write the full snapshot slot file after serializing so this bank snapshot is loadable
781    let full_snapshot_archive_slot = match snapshot_kind {
782        SnapshotKind::FullSnapshot => snapshot_slot,
783        SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
784    };
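    // For example (illustrative slots): a FullSnapshot taken at slot 150 records 150 here, while an
    // IncrementalSnapshot(100) taken at slot 150 records its full snapshot base slot, 100.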
785    write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
786        .map_err(|err| {
787            IoError::other(format!(
788                "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
789            ))
790        })?;
791
792    let snapshot_archive_path = match snapshot_package.snapshot_kind {
793        SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
794            &snapshot_config.full_snapshot_archives_dir,
795            snapshot_package.slot,
796            &snapshot_package.hash,
797            snapshot_config.archive_format,
798        ),
799        SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
800            // After the snapshot has been serialized, it is now safe (and required) to prune all
801            // the storages that are *not* to be archived for this incremental snapshot.
802            snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
803            build_incremental_snapshot_archive_path(
804                &snapshot_config.incremental_snapshot_archives_dir,
805                incremental_snapshot_base_slot,
806                snapshot_package.slot,
807                &snapshot_package.hash,
808                snapshot_config.archive_format,
809            )
810        }
811    };
812
813    let snapshot_archive_info = archive_snapshot(
814        snapshot_kind,
815        snapshot_slot,
816        snapshot_hash,
817        snapshot_storages.as_slice(),
818        &bank_snapshot_info.snapshot_dir,
819        snapshot_archive_path,
820        snapshot_config.archive_format,
821    )?;
822
823    Ok(snapshot_archive_info)
824}
825
826/// Serializes a snapshot into `bank_snapshots_dir`
827#[allow(clippy::too_many_arguments)]
828fn serialize_snapshot(
829    bank_snapshots_dir: impl AsRef<Path>,
830    snapshot_version: SnapshotVersion,
831    snapshot_storages: &[Arc<AccountStorageEntry>],
832    slot_deltas: &[BankSlotDelta],
833    mut bank_fields: BankFieldsToSerialize,
834    bank_hash_stats: BankHashStats,
835    accounts_delta_hash: AccountsDeltaHash,
836    accounts_hash: AccountsHash,
837    epoch_accounts_hash: Option<EpochAccountsHash>,
838    bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
839    write_version: StoredMetaWriteVersion,
840) -> Result<BankSnapshotInfo> {
841    let slot = bank_fields.slot;
842
843    // this closure exists to facilitate converting between
844    // the AddBankSnapshotError and SnapshotError types
845    let do_serialize_snapshot = || {
846        let mut measure_everything = Measure::start("");
847        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
848        if bank_snapshot_dir.exists() {
849            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
850                bank_snapshot_dir,
851            ));
852        }
853        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
854            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
855        })?;
856
857        // the bank snapshot is stored as bank_snapshots_dir/slot/slot
858        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
859        info!(
860            "Creating bank snapshot for slot {slot} at '{}'",
861            bank_snapshot_path.display(),
862        );
863
864        let (_, flush_storages_us) = measure_us!({
865            for storage in snapshot_storages {
866                storage.flush().map_err(|err| {
867                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
868                })?;
869            }
870        });
871
872        // We are constructing the snapshot directory to contain the full snapshot state information, to allow
873        // constructing a bank from this directory.  It acts like an archive, including the full state.
874        // The set of account storage files is a necessary part of this snapshot state.  Hard-link them
875        // from the operational accounts/ directory to here.
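        // Illustrative resulting layout for slot 100 (names are examples only):
        //   <bank_snapshots_dir>/100/100                     (serialized bank fields, written below)
        //   <bank_snapshots_dir>/100/status_cache
        //   <bank_snapshots_dir>/100/version
        //   <bank_snapshots_dir>/100/state_complete
        //   <bank_snapshots_dir>/100/accounts_hardlinks/...  (symlinks to the hard-linked storage dirs)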
876        let (_, hard_link_storages_us) = measure_us!(hard_link_storages_to_snapshot(
877            &bank_snapshot_dir,
878            slot,
879            snapshot_storages
880        )
881        .map_err(AddBankSnapshotError::HardLinkStorages)?);
882
883        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
884            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
885            let extra_fields = ExtraFieldsToSerialize {
886                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
887                incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
888                epoch_accounts_hash,
889                versioned_epoch_stakes,
890                accounts_lt_hash: bank_fields.accounts_lt_hash.clone().map(Into::into),
891            };
892            serde_snapshot::serialize_bank_snapshot_into(
893                stream,
894                bank_fields,
895                bank_hash_stats,
896                accounts_delta_hash,
897                accounts_hash,
898                &get_storages_to_serialize(snapshot_storages),
899                extra_fields,
900                write_version,
901            )?;
902            Ok(())
903        };
904        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
905            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
906                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
907            "bank serialize"
908        );
909
910        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
911        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
912            snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
913                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
914        );
915
916        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
917        let (_, write_version_file_us) = measure_us!(fs::write(
918            &version_path,
919            snapshot_version.as_str().as_bytes(),
920        )
921        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
922
923        // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
924        let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
925        let (_, write_state_complete_file_us) = measure_us!(fs::File::create(&state_complete_path)
926            .map_err(|err| {
927                AddBankSnapshotError::CreateStateCompleteFile(err, state_complete_path)
928            })?);
929
930        measure_everything.stop();
931
932        // Monitor sizes because they're capped at MAX_SNAPSHOT_DATA_FILE_SIZE
933        datapoint_info!(
934            "snapshot_bank",
935            ("slot", slot, i64),
936            ("bank_size", bank_snapshot_consumed_size, i64),
937            ("status_cache_size", status_cache_consumed_size, i64),
938            ("flush_storages_us", flush_storages_us, i64),
939            ("hard_link_storages_us", hard_link_storages_us, i64),
940            ("bank_serialize_us", bank_serialize.as_us(), i64),
941            ("status_cache_serialize_us", status_cache_serialize_us, i64),
942            ("write_version_file_us", write_version_file_us, i64),
943            (
944                "write_state_complete_file_us",
945                write_state_complete_file_us,
946                i64
947            ),
948            ("total_us", measure_everything.as_us(), i64),
949        );
950
951        info!(
952            "{} for slot {} at {}",
953            bank_serialize,
954            slot,
955            bank_snapshot_path.display(),
956        );
957
958        Ok(BankSnapshotInfo {
959            slot,
960            snapshot_kind: BankSnapshotKind::Pre,
961            snapshot_dir: bank_snapshot_dir,
962            snapshot_version,
963        })
964    };
965
966    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
967}
968
969/// Archives a snapshot into `archive_path`
970fn archive_snapshot(
971    snapshot_kind: SnapshotKind,
972    snapshot_slot: Slot,
973    snapshot_hash: SnapshotHash,
974    snapshot_storages: &[Arc<AccountStorageEntry>],
975    bank_snapshot_dir: impl AsRef<Path>,
976    archive_path: impl AsRef<Path>,
977    archive_format: ArchiveFormat,
978) -> Result<SnapshotArchiveInfo> {
979    use ArchiveSnapshotPackageError as E;
980    const SNAPSHOTS_DIR: &str = "snapshots";
981    const ACCOUNTS_DIR: &str = "accounts";
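    // Illustrative layout of the archive built below (storage file names are examples only):
    //   version
    //   snapshots/status_cache
    //   snapshots/<slot>/<slot>
    //   accounts/<one file per account storage, named by slot and id>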
982    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
983
984    let mut timer = Measure::start("snapshot_package-package_snapshots");
985    let tar_dir = archive_path
986        .as_ref()
987        .parent()
988        .expect("Tar output path is invalid");
989
990    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
991
992    // Create the staging directories
993    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
994    let staging_dir = tempfile::Builder::new()
995        .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
996        .tempdir_in(tar_dir)
997        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
998    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
999
1000    let slot_str = snapshot_slot.to_string();
1001    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
1002    // Creates staging snapshots/<slot>/
1003    fs::create_dir_all(&staging_snapshot_dir)
1004        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
1005
1006    // To be a source for symlinking and archiving, the path needs to be an absolute path
1007    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
1008        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
1009    })?;
1010    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
1011    let src_snapshot_file = src_snapshot_dir.join(slot_str);
1012    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
1013        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
1014
1015    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
1016    // like in the snapshot dir.
1017    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1018    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1019    symlink::symlink_file(&src_status_cache, &staging_status_cache)
1020        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
1021
1022    // The bank snapshot has the version file, so symlink it to the correct staging path
1023    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
1024    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1025    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
1026        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
1027    })?;
1028
1029    // Tar the staging directory into the archive at `staging_archive_path`
1030    let staging_archive_path = tar_dir.join(format!(
1031        "{}{}.{}",
1032        staging_dir_prefix,
1033        snapshot_slot,
1034        archive_format.extension(),
1035    ));
1036
1037    {
1038        let mut archive_file = fs::File::create(&staging_archive_path)
1039            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
1040
1041        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
1042            let mut archive = tar::Builder::new(encoder);
1043            // Disable sparse file handling.  This seems to be the root cause of an issue when
1044            // upgrading v2.0 to v2.1, and the tar crate from 0.4.41 to 0.4.42.
1045            // Since the tarball will still go through compression (zstd/etc) afterwards, disabling
1046            // sparse handling in the tar itself should be fine.
1047            //
1048            // Likely introduced in [^1].  Tracking resolution in [^2].
1049            // [^1] https://github.com/alexcrichton/tar-rs/pull/375
1050            // [^2] https://github.com/alexcrichton/tar-rs/issues/403
1051            archive.sparse(false);
1052            // Serialize the version and snapshots files before accounts so we can quickly determine the version
1053            // and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
1054            archive
1055                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
1056                .map_err(E::ArchiveVersionFile)?;
1057            archive
1058                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
1059                .map_err(E::ArchiveSnapshotsDir)?;
1060
1061            for storage in snapshot_storages {
1062                let path_in_archive = Path::new(ACCOUNTS_DIR)
1063                    .join(AccountsFile::file_name(storage.slot(), storage.id()));
1064                match storage.accounts.internals_for_archive() {
1065                    InternalsForArchive::Mmap(data) => {
1066                        let mut header = tar::Header::new_gnu();
1067                        header.set_path(path_in_archive).map_err(|err| {
1068                            E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1069                        })?;
1070                        header.set_size(storage.capacity());
1071                        header.set_cksum();
1072                        archive.append(&header, data)
1073                    }
1074                    InternalsForArchive::FileIo(path) => {
1075                        archive.append_path_with_name(path, path_in_archive)
1076                    }
1077                }
1078                .map_err(|err| E::ArchiveAccountStorageFile(err, storage.path().to_path_buf()))?;
1079            }
1080
1081            archive.into_inner().map_err(E::FinishArchive)?;
1082            Ok(())
1083        };
1084
1085        match archive_format {
1086            ArchiveFormat::TarBzip2 => {
1087                let mut encoder =
1088                    bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
1089                do_archive_files(&mut encoder)?;
1090                encoder.finish().map_err(E::FinishEncoder)?;
1091            }
1092            ArchiveFormat::TarGzip => {
1093                let mut encoder =
1094                    flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
1095                do_archive_files(&mut encoder)?;
1096                encoder.finish().map_err(E::FinishEncoder)?;
1097            }
1098            ArchiveFormat::TarZstd { config } => {
1099                let mut encoder =
1100                    zstd::stream::Encoder::new(archive_file, config.compression_level)
1101                        .map_err(E::CreateEncoder)?;
1102                do_archive_files(&mut encoder)?;
1103                encoder.finish().map_err(E::FinishEncoder)?;
1104            }
1105            ArchiveFormat::TarLz4 => {
1106                let mut encoder = lz4::EncoderBuilder::new()
1107                    .level(1)
1108                    .build(archive_file)
1109                    .map_err(E::CreateEncoder)?;
1110                do_archive_files(&mut encoder)?;
1111                let (_output, result) = encoder.finish();
1112                result.map_err(E::FinishEncoder)?;
1113            }
1114            ArchiveFormat::Tar => {
1115                do_archive_files(&mut archive_file)?;
1116            }
1117        };
1118    }
1119
1120    // Atomically move the archive into position for other validators to find
1121    let metadata = fs::metadata(&staging_archive_path)
1122        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
1123    let archive_path = archive_path.as_ref().to_path_buf();
1124    fs::rename(&staging_archive_path, &archive_path)
1125        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
1126
1127    timer.stop();
1128    info!(
1129        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
1130        archive_path.display(),
1131        snapshot_slot,
1132        timer.as_ms(),
1133        metadata.len()
1134    );
1135
1136    datapoint_info!(
1137        "archive-snapshot-package",
1138        ("slot", snapshot_slot, i64),
1139        ("archive_format", archive_format.to_string(), String),
1140        ("duration_ms", timer.as_ms(), i64),
1141        (
1142            if snapshot_kind.is_full_snapshot() {
1143                "full-snapshot-archive-size"
1144            } else {
1145                "incremental-snapshot-archive-size"
1146            },
1147            metadata.len(),
1148            i64
1149        ),
1150    );
1151    Ok(SnapshotArchiveInfo {
1152        path: archive_path,
1153        slot: snapshot_slot,
1154        hash: snapshot_hash,
1155        archive_format,
1156    })
1157}
1158
1159/// Get the bank snapshots in a directory
1160pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1161    let mut bank_snapshots = Vec::default();
1162    match fs::read_dir(&bank_snapshots_dir) {
1163        Err(err) => {
1164            info!(
1165                "Unable to read bank snapshots directory '{}': {err}",
1166                bank_snapshots_dir.as_ref().display(),
1167            );
1168        }
1169        Ok(paths) => paths
1170            .filter_map(|entry| {
1171                // check if this entry is a directory whose name parses as a Slot
1172                // bank snapshots are bank_snapshots_dir/slot/slot, optionally with BANK_SNAPSHOT_PRE_FILENAME_EXTENSION
1173                entry
1174                    .ok()
1175                    .filter(|entry| entry.path().is_dir())
1176                    .and_then(|entry| {
1177                        entry
1178                            .path()
1179                            .file_name()
1180                            .and_then(|file_name| file_name.to_str())
1181                            .and_then(|file_name| file_name.parse::<Slot>().ok())
1182                    })
1183            })
1184            .for_each(
1185                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1186                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1187                    // Other threads may be modifying bank snapshots in parallel; only return
1188                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
1189                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1190                },
1191            ),
1192    }
1193    bank_snapshots
1194}
1195
1196/// Get the bank snapshots in a directory
1197///
1198/// This function retains only the bank snapshots of kind BankSnapshotKind::Pre
1199pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1200    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1201    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1202    bank_snapshots
1203}
1204
1205/// Get the bank snapshots in a directory
1206///
1207/// This function retains only the bank snapshots of kind BankSnapshotKind::Post
1208pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1209    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1210    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1211    bank_snapshots
1212}
1213
1214/// Get the bank snapshot with the highest slot in a directory
1215///
1216/// This function gets the highest bank snapshot of kind BankSnapshotKind::Pre
1217pub fn get_highest_bank_snapshot_pre(
1218    bank_snapshots_dir: impl AsRef<Path>,
1219) -> Option<BankSnapshotInfo> {
1220    do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1221}
1222
1223/// Get the bank snapshot with the highest slot in a directory
1224///
1225/// This function gets the highest bank snapshot of kind BankSnapshotKind::Post
1226pub fn get_highest_bank_snapshot_post(
1227    bank_snapshots_dir: impl AsRef<Path>,
1228) -> Option<BankSnapshotInfo> {
1229    do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1230}
1231
1232/// Get the bank snapshot with the highest slot in a directory
1233///
1234/// This function gets the highest bank snapshot of any kind
1235pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1236    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1237}
1238
1239fn do_get_highest_bank_snapshot(
1240    mut bank_snapshots: Vec<BankSnapshotInfo>,
1241) -> Option<BankSnapshotInfo> {
1242    bank_snapshots.sort_unstable();
1243    bank_snapshots.into_iter().next_back()
1244}
1245
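/// Serializes snapshot data into `data_file_path` via `serializer`, enforcing the
/// MAX_SNAPSHOT_DATA_FILE_SIZE cap and returning the number of bytes written.
///
/// A minimal round-trip sketch pairing this with `deserialize_snapshot_data_file` (illustrative
/// path and payload only):
/// ```ignore
/// let path = std::env::temp_dir().join("example-snapshot-data");
/// let written = serialize_snapshot_data_file(&path, |stream| {
///     bincode::serialize_into(stream, &42u64)?;
///     Ok(())
/// }).unwrap();
/// assert_eq!(written, std::mem::size_of::<u64>() as u64);
/// let value: u64 = deserialize_snapshot_data_file(&path, |stream| {
///     Ok(bincode::deserialize_from(stream)?)
/// }).unwrap();
/// assert_eq!(value, 42);
/// ```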
1246pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1247where
1248    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1249{
1250    serialize_snapshot_data_file_capped::<F>(
1251        data_file_path,
1252        MAX_SNAPSHOT_DATA_FILE_SIZE,
1253        serializer,
1254    )
1255}
1256
1257pub fn deserialize_snapshot_data_file<T: Sized>(
1258    data_file_path: &Path,
1259    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1260) -> Result<T> {
1261    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1262        deserializer(streams.full_snapshot_stream)
1263    };
1264
1265    let wrapped_data_file_path = SnapshotRootPaths {
1266        full_snapshot_root_file_path: data_file_path.to_path_buf(),
1267        incremental_snapshot_root_file_path: None,
1268    };
1269
1270    deserialize_snapshot_data_files_capped(
1271        &wrapped_data_file_path,
1272        MAX_SNAPSHOT_DATA_FILE_SIZE,
1273        wrapped_deserializer,
1274    )
1275}
1276
1277pub fn deserialize_snapshot_data_files<T: Sized>(
1278    snapshot_root_paths: &SnapshotRootPaths,
1279    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1280) -> Result<T> {
1281    deserialize_snapshot_data_files_capped(
1282        snapshot_root_paths,
1283        MAX_SNAPSHOT_DATA_FILE_SIZE,
1284        deserializer,
1285    )
1286}
1287
1288fn serialize_snapshot_data_file_capped<F>(
1289    data_file_path: &Path,
1290    maximum_file_size: u64,
1291    serializer: F,
1292) -> Result<u64>
1293where
1294    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1295{
1296    let data_file = fs::File::create(data_file_path)?;
1297    let mut data_file_stream = BufWriter::new(data_file);
1298    serializer(&mut data_file_stream)?;
1299    data_file_stream.flush()?;
1300
1301    let consumed_size = data_file_stream.stream_position()?;
1302    if consumed_size > maximum_file_size {
1303        let error_message = format!(
1304            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1305            data_file_path.display(),
1306        );
1307        return Err(IoError::other(error_message).into());
1308    }
1309    Ok(consumed_size)
1310}
1311
1312fn deserialize_snapshot_data_files_capped<T: Sized>(
1313    snapshot_root_paths: &SnapshotRootPaths,
1314    maximum_file_size: u64,
1315    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1316) -> Result<T> {
1317    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1318        create_snapshot_data_file_stream(
1319            &snapshot_root_paths.full_snapshot_root_file_path,
1320            maximum_file_size,
1321        )?;
1322
1323    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1324        if let Some(ref incremental_snapshot_root_file_path) =
1325            snapshot_root_paths.incremental_snapshot_root_file_path
1326        {
1327            Some(create_snapshot_data_file_stream(
1328                incremental_snapshot_root_file_path,
1329                maximum_file_size,
1330            )?)
1331        } else {
1332            None
1333        }
1334        .unzip();
1335
1336    let mut snapshot_streams = SnapshotStreams {
1337        full_snapshot_stream: &mut full_snapshot_data_file_stream,
1338        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1339    };
1340    let ret = deserializer(&mut snapshot_streams)?;
1341
1342    check_deserialize_file_consumed(
1343        full_snapshot_file_size,
1344        &snapshot_root_paths.full_snapshot_root_file_path,
1345        &mut full_snapshot_data_file_stream,
1346    )?;
1347
1348    if let Some(ref incremental_snapshot_root_file_path) =
1349        snapshot_root_paths.incremental_snapshot_root_file_path
1350    {
1351        check_deserialize_file_consumed(
1352            incremental_snapshot_file_size.unwrap(),
1353            incremental_snapshot_root_file_path,
1354            incremental_snapshot_data_file_stream.as_mut().unwrap(),
1355        )?;
1356    }
1357
1358    Ok(ret)
1359}
1360
1361/// Before running the deserializer function, perform common operations on the snapshot archive
1362/// files, such as checking the file size and opening the file into a stream.
1363fn create_snapshot_data_file_stream(
1364    snapshot_root_file_path: impl AsRef<Path>,
1365    maximum_file_size: u64,
1366) -> Result<(u64, BufReader<std::fs::File>)> {
1367    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1368
1369    if snapshot_file_size > maximum_file_size {
1370        let error_message = format!(
1371            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1372            snapshot_root_file_path.as_ref().display(),
1373            snapshot_file_size,
1374            maximum_file_size,
1375        );
1376        return Err(IoError::other(error_message).into());
1377    }
1378
1379    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1380    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1381
1382    Ok((snapshot_file_size, snapshot_data_file_stream))
1383}
1384
1385/// After running the deserializer function, perform common checks to ensure the snapshot archive
1386/// files were consumed correctly.
1387fn check_deserialize_file_consumed(
1388    file_size: u64,
1389    file_path: impl AsRef<Path>,
1390    file_stream: &mut BufReader<std::fs::File>,
1391) -> Result<()> {
1392    let consumed_size = file_stream.stream_position()?;
1393
1394    if consumed_size != file_size {
1395        let error_message = format!(
1396            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1397            file_path.as_ref().display(),
1398            file_size,
1399            consumed_size,
1400        );
1401        return Err(IoError::other(error_message).into());
1402    }
1403
1404    Ok(())
1405}
1406
1407/// Return account path from the appendvec path after checking its format.
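/// e.g. `<account_path>/run/<slot>.<id>` yields `Some(<account_path>)`, while a path whose
/// immediate parent directory is not `run/` yields `None`.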
1408fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1409    let run_path = appendvec_path.parent()?;
1410    let run_file_name = run_path.file_name()?;
1411    // All appendvec files should be under <account_path>/run/.
1412    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1413    if run_file_name != ACCOUNTS_RUN_DIR {
1414        error!(
1415            "The account path {} does not have run/ as its immediate parent directory.",
1416            run_path.display()
1417        );
1418        return None;
1419    }
1420    let account_path = run_path.parent()?;
1421    Some(account_path.to_path_buf())
1422}
1423
1424/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
1425/// directory does not exist, create it.
1426fn get_snapshot_accounts_hardlink_dir(
1427    appendvec_path: &Path,
1428    bank_slot: Slot,
1429    account_paths: &mut HashSet<PathBuf>,
1430    hardlinks_dir: impl AsRef<Path>,
1431) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1432    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1433        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1434    })?;
1435
1436    let snapshot_hardlink_dir = account_path
1437        .join(ACCOUNTS_SNAPSHOT_DIR)
1438        .join(bank_slot.to_string());
1439
1440    // Track account paths already seen in the hashset, to avoid checking the file system.  Only set
1441    // up the hardlink directory and the symlink to it the first time an account_path is seen.
1442    if !account_paths.contains(&account_path) {
1443        let idx = account_paths.len();
1444        debug!(
1445            "for appendvec_path {}, create hard-link path {}",
1446            appendvec_path.display(),
1447            snapshot_hardlink_dir.display()
1448        );
1449        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1450            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1451                err,
1452                snapshot_hardlink_dir.clone(),
1453            )
1454        })?;
1455        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1456        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1457            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1458                source: err,
1459                original: snapshot_hardlink_dir.clone(),
1460                link: symlink_path,
1461            }
1462        })?;
1463        account_paths.insert(account_path);
1464    };
1465
1466    Ok(snapshot_hardlink_dir)
1467}
1468
1469/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1470/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1471/// in the file names are also updated in case a file is a recycled one with an inconsistent slot
1472/// and id.
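///
/// For illustration (paths abridged), the resulting layout is roughly:
///   `<bank_snapshot_dir>/accounts_hardlinks/account_path_<idx>` -> `<account_path>/snapshot/<bank_slot>/`
///   `<account_path>/snapshot/<bank_slot>/<slot>.<id>` is a hard link of the matching file under `<account_path>/run/`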
1473pub fn hard_link_storages_to_snapshot(
1474    bank_snapshot_dir: impl AsRef<Path>,
1475    bank_slot: Slot,
1476    snapshot_storages: &[Arc<AccountStorageEntry>],
1477) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1478    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1479    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1480        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1481            err,
1482            accounts_hardlinks_dir.clone(),
1483        )
1484    })?;
1485
1486    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1487    for storage in snapshot_storages {
1488        let storage_path = storage.accounts.path();
1489        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1490            storage_path,
1491            bank_slot,
1492            &mut account_paths,
1493            &accounts_hardlinks_dir,
1494        )?;
1495        // The appendvec could be recycled, so its filename may not be consistent with the slot and id.
1496        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1497        let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1498        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1499        fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1500            HardLinkStoragesToSnapshotError::HardLinkStorage(
1501                err,
1502                storage_path.to_path_buf(),
1503                hard_link_path,
1504            )
1505        })?;
1506    }
1507    Ok(())
1508}
1509
1510/// Serializing needs `Vec<Vec<Arc<AccountStorageEntry>>>`, but the runtime data structure is
1511/// `Vec<Arc<AccountStorageEntry>>`; this translates the latter into what serialization needs.
1512pub(crate) fn get_storages_to_serialize(
1513    snapshot_storages: &[Arc<AccountStorageEntry>],
1514) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1515    snapshot_storages
1516        .iter()
1517        .map(|storage| vec![Arc::clone(storage)])
1518        .collect::<Vec<_>>()
1519}
1520
1521// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
1522const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1523
1524/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
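///
/// # Example
///
/// An illustrative sketch (not compiled here); `bank_snapshots_dir`, `account_paths`
/// (a `Vec<PathBuf>`), and `storage_access` are assumed to be set up by the caller:
///
/// ```ignore
/// let full_snapshot_archive_info =
///     get_highest_full_snapshot_archive_info(&full_snapshot_archives_dir)
///         .expect("a full snapshot archive");
/// let (unarchived_full, unarchived_incremental, next_append_vec_id) =
///     verify_and_unarchive_snapshots(
///         &bank_snapshots_dir,
///         &full_snapshot_archive_info,
///         None, // no incremental snapshot archive
///         &account_paths,
///         storage_access,
///     )?;
/// ```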
1525pub fn verify_and_unarchive_snapshots(
1526    bank_snapshots_dir: impl AsRef<Path>,
1527    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1528    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1529    account_paths: &[PathBuf],
1530    storage_access: StorageAccess,
1531) -> Result<(
1532    UnarchivedSnapshot,
1533    Option<UnarchivedSnapshot>,
1534    AtomicAccountsFileId,
1535)> {
1536    check_are_snapshots_compatible(
1537        full_snapshot_archive_info,
1538        incremental_snapshot_archive_info,
1539    )?;
1540
1541    let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1542
1543    let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1544    let unarchived_full_snapshot = unarchive_snapshot(
1545        &bank_snapshots_dir,
1546        TMP_SNAPSHOT_ARCHIVE_PREFIX,
1547        full_snapshot_archive_info.path(),
1548        "snapshot untar",
1549        account_paths,
1550        full_snapshot_archive_info.archive_format(),
1551        parallel_divisions,
1552        next_append_vec_id.clone(),
1553        storage_access,
1554    )?;
1555
1556    let unarchived_incremental_snapshot =
1557        if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1558            let unarchived_incremental_snapshot = unarchive_snapshot(
1559                &bank_snapshots_dir,
1560                TMP_SNAPSHOT_ARCHIVE_PREFIX,
1561                incremental_snapshot_archive_info.path(),
1562                "incremental snapshot untar",
1563                account_paths,
1564                incremental_snapshot_archive_info.archive_format(),
1565                parallel_divisions,
1566                next_append_vec_id.clone(),
1567                storage_access,
1568            )?;
1569            Some(unarchived_incremental_snapshot)
1570        } else {
1571            None
1572        };
1573
1574    Ok((
1575        unarchived_full_snapshot,
1576        unarchived_incremental_snapshot,
1577        Arc::try_unwrap(next_append_vec_id).unwrap(),
1578    ))
1579}
1580
1581/// Spawns a thread for unpacking a snapshot
1582fn spawn_unpack_snapshot_thread(
1583    file_sender: Sender<PathBuf>,
1584    account_paths: Arc<Vec<PathBuf>>,
1585    ledger_dir: Arc<PathBuf>,
1586    mut archive: Archive<SharedBufferReader>,
1587    parallel_selector: Option<ParallelSelector>,
1588    thread_index: usize,
1589) -> JoinHandle<()> {
1590    Builder::new()
1591        .name(format!("solUnpkSnpsht{thread_index:02}"))
1592        .spawn(move || {
1593            hardened_unpack::streaming_unpack_snapshot(
1594                &mut archive,
1595                ledger_dir.as_path(),
1596                &account_paths,
1597                parallel_selector,
1598                &file_sender,
1599            )
1600            .unwrap();
1601        })
1602        .unwrap()
1603}
1604
1605/// Streams unpacked files across channel
1606fn streaming_unarchive_snapshot(
1607    file_sender: Sender<PathBuf>,
1608    account_paths: Vec<PathBuf>,
1609    ledger_dir: PathBuf,
1610    snapshot_archive_path: PathBuf,
1611    archive_format: ArchiveFormat,
1612    num_threads: usize,
1613) -> Vec<JoinHandle<()>> {
1614    let account_paths = Arc::new(account_paths);
1615    let ledger_dir = Arc::new(ledger_dir);
1616    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1617
1618    // All shared buffer readers need to be created before the threads are spawned
1619    let archives: Vec<_> = (0..num_threads)
1620        .map(|_| {
1621            let reader = SharedBufferReader::new(&shared_buffer);
1622            Archive::new(reader)
1623        })
1624        .collect();
1625
1626    archives
1627        .into_iter()
1628        .enumerate()
1629        .map(|(thread_index, archive)| {
1630            let parallel_selector = Some(ParallelSelector {
1631                index: thread_index,
1632                divisions: num_threads,
1633            });
1634
1635            spawn_unpack_snapshot_thread(
1636                file_sender.clone(),
1637                account_paths.clone(),
1638                ledger_dir.clone(),
1639                archive,
1640                parallel_selector,
1641                thread_index,
1642            )
1643        })
1644        .collect()
1645}
1646
1647/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1648/// as a valid one.  A dir unpacked from an archive lacks these files.  Create them here so
1649/// that the new_from_dir() checks pass.  These checks are not needed for unpacked dirs,
1650/// but it is not clean to add another flag to new_from_dir() to skip them.
1651fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1652    let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1653    if !snapshots_dir.is_dir() {
1654        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1655    }
1656
1657    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1658    let slot_dir = std::fs::read_dir(&snapshots_dir)
1659        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1660        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1661        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1662        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1663        .path();
1664
1665    let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1666    fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1667
1668    let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1669    fs::hard_link(
1670        status_cache_file,
1671        slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1672    )?;
1673
1674    let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1675    fs::File::create(state_complete_file)?;
1676
1677    Ok(())
1678}
1679
1680/// Performs the common tasks when unarchiving a snapshot.  Handles creating the temporary
1681/// directories, untarring, reading the version file, and then returning those fields plus the
1682/// rebuilt storage.
1683fn unarchive_snapshot(
1684    bank_snapshots_dir: impl AsRef<Path>,
1685    unpacked_snapshots_dir_prefix: &'static str,
1686    snapshot_archive_path: impl AsRef<Path>,
1687    measure_name: &'static str,
1688    account_paths: &[PathBuf],
1689    archive_format: ArchiveFormat,
1690    parallel_divisions: usize,
1691    next_append_vec_id: Arc<AtomicAccountsFileId>,
1692    storage_access: StorageAccess,
1693) -> Result<UnarchivedSnapshot> {
1694    let unpack_dir = tempfile::Builder::new()
1695        .prefix(unpacked_snapshots_dir_prefix)
1696        .tempdir_in(bank_snapshots_dir)?;
1697    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1698
1699    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1700    streaming_unarchive_snapshot(
1701        file_sender,
1702        account_paths.to_vec(),
1703        unpack_dir.path().to_path_buf(),
1704        snapshot_archive_path.as_ref().to_path_buf(),
1705        archive_format,
1706        parallel_divisions,
1707    );
1708
1709    let num_rebuilder_threads = num_cpus::get_physical()
1710        .saturating_sub(parallel_divisions)
1711        .max(1);
1712    let (version_and_storages, measure_untar) = measure_time!(
1713        SnapshotStorageRebuilder::rebuild_storage(
1714            file_receiver,
1715            num_rebuilder_threads,
1716            next_append_vec_id,
1717            SnapshotFrom::Archive,
1718            storage_access,
1719        )?,
1720        measure_name
1721    );
1722    info!("{}", measure_untar);
1723
1724    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1725
1726    let RebuiltSnapshotStorage {
1727        snapshot_version,
1728        storage,
1729    } = version_and_storages;
1730    Ok(UnarchivedSnapshot {
1731        unpack_dir,
1732        storage,
1733        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1734            unpacked_snapshots_dir,
1735            snapshot_version,
1736        },
1737        measure_untar,
1738    })
1739}
1740
1741/// Streams snapshot dir files across channel
1742/// Follows the flow of streaming_unarchive_snapshot(), but handles the from_dir case.
1743fn streaming_snapshot_dir_files(
1744    file_sender: Sender<PathBuf>,
1745    snapshot_file_path: impl Into<PathBuf>,
1746    snapshot_version_path: impl Into<PathBuf>,
1747    account_paths: &[PathBuf],
1748) -> Result<()> {
1749    file_sender.send(snapshot_file_path.into())?;
1750    file_sender.send(snapshot_version_path.into())?;
1751
1752    for account_path in account_paths {
1753        for file in fs::read_dir(account_path)? {
1754            file_sender.send(file?.path())?;
1755        }
1756    }
1757
1758    Ok(())
1759}
1760
1761/// Performs the common tasks when deserializing a snapshot
1762///
1763/// Handles reading the snapshot file and version file,
1764/// then returning those fields plus the rebuilt storages.
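///
/// # Example
///
/// An illustrative sketch (not compiled here); it assumes a "post" bank snapshot already exists
/// on disk and that `account_paths` and `storage_access` are configured by the caller:
///
/// ```ignore
/// let bank_snapshot =
///     get_highest_bank_snapshot_post(&bank_snapshots_dir).expect("a bank snapshot");
/// let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
/// let storage = rebuild_storages_from_snapshot_dir(
///     &bank_snapshot,
///     &account_paths,
///     next_append_vec_id,
///     storage_access,
/// )?;
/// ```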
1765pub fn rebuild_storages_from_snapshot_dir(
1766    snapshot_info: &BankSnapshotInfo,
1767    account_paths: &[PathBuf],
1768    next_append_vec_id: Arc<AtomicAccountsFileId>,
1769    storage_access: StorageAccess,
1770) -> Result<AccountStorageMap> {
1771    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1772    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1773    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1774
1775    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1776        IoError::other(format!(
1777            "failed to read accounts hardlinks dir '{}': {err}",
1778            accounts_hardlinks.display(),
1779        ))
1780    })?;
1781    for dir_entry in read_dir {
1782        let symlink_path = dir_entry?.path();
1783        // The symlink points to <account_path>/snapshot/<slot>, which contains the hard links to the
1784        // account files.  The corresponding run path should be <account_path>/run/.
1785        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1786            IoError::other(format!(
1787                "failed to read symlink '{}': {err}",
1788                symlink_path.display(),
1789            ))
1790        })?;
1791        let account_run_path = account_snapshot_path
1792            .parent()
1793            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1794            .parent()
1795            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1796            .join(ACCOUNTS_RUN_DIR);
1797        if !account_run_paths.contains(&account_run_path) {
1798            // The appendvec from the bank snapshot storage does not match any of the provided account paths.
1799            // The account paths have changed, so the snapshot is no longer usable.
1800            return Err(SnapshotError::AccountPathsMismatch);
1801        }
1802        // Hard-link the account files into the main <account_path>/run/ directory so that the new
1803        // appendvec paths resolve under run/.
1804        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1805            IoError::other(format!(
1806                "failed to read account snapshot dir '{}': {err}",
1807                account_snapshot_path.display(),
1808            ))
1809        })?;
1810        for file in read_dir {
1811            let file_path = file?.path();
1812            let file_name = file_path
1813                .file_name()
1814                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1815            let dest_path = account_run_path.join(file_name);
1816            fs::hard_link(&file_path, &dest_path).map_err(|err| {
1817                IoError::other(format!(
1818                    "failed to hard link from '{}' to '{}': {err}",
1819                    file_path.display(),
1820                    dest_path.display(),
1821                ))
1822            })?;
1823        }
1824    }
1825
1826    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1827    let snapshot_file_path = &snapshot_info.snapshot_path();
1828    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1829    streaming_snapshot_dir_files(
1830        file_sender,
1831        snapshot_file_path,
1832        snapshot_version_path,
1833        account_paths,
1834    )?;
1835
1836    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1837    let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1838        file_receiver,
1839        num_rebuilder_threads,
1840        next_append_vec_id,
1841        SnapshotFrom::Dir,
1842        storage_access,
1843    )?;
1844
1845    let RebuiltSnapshotStorage {
1846        snapshot_version: _,
1847        storage,
1848    } = version_and_storages;
1849    Ok(storage)
1850}
1851
1852/// Reads the `snapshot_version` from a file. Before opening the file, its size
1853/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
1854/// threshold, it is not opened and an error is returned.
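///
/// For example, reading a bank snapshot's `version` file typically yields a short version
/// string such as `"1.2.0"`.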
1855fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1856    // Check file size.
1857    let file_metadata = fs::metadata(&path).map_err(|err| {
1858        IoError::other(format!(
1859            "failed to query snapshot version file metadata '{}': {err}",
1860            path.as_ref().display(),
1861        ))
1862    })?;
1863    let file_size = file_metadata.len();
1864    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1865        let error_message = format!(
1866            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1867            path.as_ref().display(),
1868            file_size,
1869            MAX_SNAPSHOT_VERSION_FILE_SIZE,
1870        );
1871        return Err(IoError::other(error_message).into());
1872    }
1873
1874    // Read snapshot_version from file.
1875    let mut snapshot_version = String::new();
1876    let mut file = fs::File::open(&path).map_err(|err| {
1877        IoError::other(format!(
1878            "failed to open snapshot version file '{}': {err}",
1879            path.as_ref().display()
1880        ))
1881    })?;
1882    file.read_to_string(&mut snapshot_version).map_err(|err| {
1883        IoError::other(format!(
1884            "failed to read snapshot version from file '{}': {err}",
1885            path.as_ref().display()
1886        ))
1887    })?;
1888
1889    Ok(snapshot_version.trim().to_string())
1890}
1891
1892/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
1893/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
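///
/// For example, a full snapshot archive `snapshot-100-<hash>.tar.zst` is compatible with an
/// incremental snapshot archive `incremental-snapshot-100-150-<hash>.tar.zst` (base slot 100),
/// but not with `incremental-snapshot-90-150-<hash>.tar.zst`.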
1894fn check_are_snapshots_compatible(
1895    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1896    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1897) -> Result<()> {
1898    if incremental_snapshot_archive_info.is_none() {
1899        return Ok(());
1900    }
1901
1902    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1903
1904    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1905        .then_some(())
1906        .ok_or_else(|| {
1907            SnapshotError::MismatchedBaseSlot(
1908                full_snapshot_archive_info.slot(),
1909                incremental_snapshot_archive_info.base_slot(),
1910            )
1911        })
1912}
1913
1914/// Get the `&str` from a `&Path`
1915pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1916    path.file_name()
1917        .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1918        .to_str()
1919        .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1920}
1921
1922pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1923    snapshot_archives_dir
1924        .as_ref()
1925        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1926}
1927
1928/// Build the full snapshot archive path from its components: the snapshot archives directory, the
1929/// snapshot slot, the accounts hash, and the archive format.
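///
/// For example, slot `100`, hash `<hash>`, and a zstd archive format produce
/// `<full_snapshot_archives_dir>/snapshot-100-<hash>.tar.zst`.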
1930pub fn build_full_snapshot_archive_path(
1931    full_snapshot_archives_dir: impl AsRef<Path>,
1932    slot: Slot,
1933    hash: &SnapshotHash,
1934    archive_format: ArchiveFormat,
1935) -> PathBuf {
1936    full_snapshot_archives_dir.as_ref().join(format!(
1937        "snapshot-{}-{}.{}",
1938        slot,
1939        hash.0,
1940        archive_format.extension(),
1941    ))
1942}
1943
1944/// Build the incremental snapshot archive path from its components: the snapshot archives
1945/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive
1946/// format.
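///
/// For example, base slot `100`, slot `150`, hash `<hash>`, and a zstd archive format produce
/// `<incremental_snapshot_archives_dir>/incremental-snapshot-100-150-<hash>.tar.zst`.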
1947pub fn build_incremental_snapshot_archive_path(
1948    incremental_snapshot_archives_dir: impl AsRef<Path>,
1949    base_slot: Slot,
1950    slot: Slot,
1951    hash: &SnapshotHash,
1952    archive_format: ArchiveFormat,
1953) -> PathBuf {
1954    incremental_snapshot_archives_dir.as_ref().join(format!(
1955        "incremental-snapshot-{}-{}-{}.{}",
1956        base_slot,
1957        slot,
1958        hash.0,
1959        archive_format.extension(),
1960    ))
1961}
1962
1963/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format
1964pub(crate) fn parse_full_snapshot_archive_filename(
1965    archive_filename: &str,
1966) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1967    lazy_static! {
1968        static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1969    }
1970
1971    let do_parse = || {
1972        RE.captures(archive_filename).and_then(|captures| {
1973            let slot = captures
1974                .name("slot")
1975                .map(|x| x.as_str().parse::<Slot>())?
1976                .ok()?;
1977            let hash = captures
1978                .name("hash")
1979                .map(|x| x.as_str().parse::<Hash>())?
1980                .ok()?;
1981            let archive_format = captures
1982                .name("ext")
1983                .map(|x| x.as_str().parse::<ArchiveFormat>())?
1984                .ok()?;
1985
1986            Some((slot, SnapshotHash(hash), archive_format))
1987        })
1988    };
1989
1990    do_parse().ok_or_else(|| {
1991        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1992    })
1993}
1994
1995/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format
1996pub(crate) fn parse_incremental_snapshot_archive_filename(
1997    archive_filename: &str,
1998) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1999    lazy_static! {
2000        static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
2001    }
2002
2003    let do_parse = || {
2004        RE.captures(archive_filename).and_then(|captures| {
2005            let base_slot = captures
2006                .name("base")
2007                .map(|x| x.as_str().parse::<Slot>())?
2008                .ok()?;
2009            let slot = captures
2010                .name("slot")
2011                .map(|x| x.as_str().parse::<Slot>())?
2012                .ok()?;
2013            let hash = captures
2014                .name("hash")
2015                .map(|x| x.as_str().parse::<Hash>())?
2016                .ok()?;
2017            let archive_format = captures
2018                .name("ext")
2019                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2020                .ok()?;
2021
2022            Some((base_slot, slot, SnapshotHash(hash), archive_format))
2023        })
2024    };
2025
2026    do_parse().ok_or_else(|| {
2027        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2028    })
2029}
2030
2031/// Walk the snapshot archives directory (including its remote/ subdirectory) to collect snapshot archive file info
2032fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2033where
2034    F: Fn(PathBuf) -> Result<T>,
2035{
2036    let walk_dir = |dir: &Path| -> Vec<T> {
2037        let entry_iter = fs::read_dir(dir);
2038        match entry_iter {
2039            Err(err) => {
2040                info!(
2041                    "Unable to read snapshot archives directory '{}': {err}",
2042                    dir.display(),
2043                );
2044                vec![]
2045            }
2046            Ok(entries) => entries
2047                .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2048                .collect(),
2049        }
2050    };
2051
2052    let mut ret = walk_dir(snapshot_archives_dir);
2053    let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2054    if remote_dir.exists() {
2055        ret.append(&mut walk_dir(remote_dir.as_ref()));
2056    }
2057    ret
2058}
2059
2060/// Get a list of the full snapshot archives from a directory
2061pub fn get_full_snapshot_archives(
2062    full_snapshot_archives_dir: impl AsRef<Path>,
2063) -> Vec<FullSnapshotArchiveInfo> {
2064    get_snapshot_archives(
2065        full_snapshot_archives_dir.as_ref(),
2066        FullSnapshotArchiveInfo::new_from_path,
2067    )
2068}
2069
2070/// Get a list of the incremental snapshot archives from a directory
2071pub fn get_incremental_snapshot_archives(
2072    incremental_snapshot_archives_dir: impl AsRef<Path>,
2073) -> Vec<IncrementalSnapshotArchiveInfo> {
2074    get_snapshot_archives(
2075        incremental_snapshot_archives_dir.as_ref(),
2076        IncrementalSnapshotArchiveInfo::new_from_path,
2077    )
2078}
2079
2080/// Get the highest slot of the full snapshot archives in a directory
2081pub fn get_highest_full_snapshot_archive_slot(
2082    full_snapshot_archives_dir: impl AsRef<Path>,
2083) -> Option<Slot> {
2084    get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2085        .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2086}
2087
2088/// Get the highest slot of the incremental snapshot archives in a directory, for a given full
2089/// snapshot slot
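///
/// # Example
///
/// An illustrative sketch (not compiled here) of pairing this with the highest full snapshot
/// slot:
///
/// ```ignore
/// if let Some(full_snapshot_slot) =
///     get_highest_full_snapshot_archive_slot(&full_snapshot_archives_dir)
/// {
///     let highest_incremental_snapshot_slot = get_highest_incremental_snapshot_archive_slot(
///         &incremental_snapshot_archives_dir,
///         full_snapshot_slot,
///     );
/// }
/// ```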
2090pub fn get_highest_incremental_snapshot_archive_slot(
2091    incremental_snapshot_archives_dir: impl AsRef<Path>,
2092    full_snapshot_slot: Slot,
2093) -> Option<Slot> {
2094    get_highest_incremental_snapshot_archive_info(
2095        incremental_snapshot_archives_dir,
2096        full_snapshot_slot,
2097    )
2098    .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2099}
2100
2101/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
2102pub fn get_highest_full_snapshot_archive_info(
2103    full_snapshot_archives_dir: impl AsRef<Path>,
2104) -> Option<FullSnapshotArchiveInfo> {
2105    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2106    full_snapshot_archives.sort_unstable();
2107    full_snapshot_archives.into_iter().next_back()
2108}
2109
2110/// Get the path for the incremental snapshot archive with the highest slot, for a given full
2111/// snapshot slot, in a directory
2112pub fn get_highest_incremental_snapshot_archive_info(
2113    incremental_snapshot_archives_dir: impl AsRef<Path>,
2114    full_snapshot_slot: Slot,
2115) -> Option<IncrementalSnapshotArchiveInfo> {
2116    // Since we want to filter down to only the incremental snapshot archives that have the same
2117    // full snapshot slot as the value passed in, perform the filtering before sorting to avoid
2118    // doing unnecessary work.
2119    let mut incremental_snapshot_archives =
2120        get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2121            .into_iter()
2122            .filter(|incremental_snapshot_archive_info| {
2123                incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2124            })
2125            .collect::<Vec<_>>();
2126    incremental_snapshot_archives.sort_unstable();
2127    incremental_snapshot_archives.into_iter().next_back()
2128}
2129
2130pub fn purge_old_snapshot_archives(
2131    full_snapshot_archives_dir: impl AsRef<Path>,
2132    incremental_snapshot_archives_dir: impl AsRef<Path>,
2133    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2134    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2135) {
2136    info!(
2137        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2138        full_snapshot_archives_dir.as_ref().display(),
2139        maximum_full_snapshot_archives_to_retain
2140    );
2141
2142    let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2143    full_snapshot_archives.sort_unstable();
2144    full_snapshot_archives.reverse();
2145
2146    let num_to_retain = full_snapshot_archives
2147        .len()
2148        .min(maximum_full_snapshot_archives_to_retain.get());
2149    trace!(
2150        "There are {} full snapshot archives, retaining {}",
2151        full_snapshot_archives.len(),
2152        num_to_retain,
2153    );
2154
2155    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2156        if full_snapshot_archives.is_empty() {
2157            None
2158        } else {
2159            Some(full_snapshot_archives.split_at(num_to_retain))
2160        }
2161        .unwrap_or_default();
2162
2163    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2164        .iter()
2165        .map(|ai| ai.slot())
2166        .collect::<HashSet<_>>();
2167
2168    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2169        for path in archives.iter().map(|a| a.path()) {
2170            trace!("Removing snapshot archive: {}", path.display());
2171            let result = fs::remove_file(path);
2172            if let Err(err) = result {
2173                info!(
2174                    "Failed to remove snapshot archive '{}': {err}",
2175                    path.display()
2176                );
2177            }
2178        }
2179    }
2180    remove_archives(full_snapshot_archives_to_remove);
2181
2182    info!(
2183        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2184        incremental_snapshot_archives_dir.as_ref().display(),
2185        maximum_incremental_snapshot_archives_to_retain
2186    );
2187    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2188    for incremental_snapshot_archive in
2189        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2190    {
2191        incremental_snapshot_archives_by_base_slot
2192            .entry(incremental_snapshot_archive.base_slot())
2193            .or_default()
2194            .push(incremental_snapshot_archive)
2195    }
2196
2197    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
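    // Retention policy for incremental snapshot archives, per base slot:
    // - the highest retained full snapshot slot keeps up to the configured maximum
    // - any other retained full snapshot slot keeps a single incremental snapshot archive
    // - a base slot whose full snapshot archive is no longer retained keeps none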
2198    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2199    {
2200        incremental_snapshot_archives.sort_unstable();
2201        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2202            maximum_incremental_snapshot_archives_to_retain.get()
2203        } else {
2204            usize::from(retained_full_snapshot_slots.contains(&base_slot))
2205        };
2206        trace!(
2207            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2208            incremental_snapshot_archives.len(),
2209            base_slot,
2210            incremental_snapshot_archives
2211                .len()
2212                .saturating_sub(num_to_retain),
2213        );
2214
2215        incremental_snapshot_archives.truncate(
2216            incremental_snapshot_archives
2217                .len()
2218                .saturating_sub(num_to_retain),
2219        );
2220        remove_archives(&incremental_snapshot_archives);
2221    }
2222}
2223
2224#[cfg(feature = "dev-context-only-utils")]
2225fn unpack_snapshot_local(
2226    shared_buffer: SharedBuffer,
2227    ledger_dir: &Path,
2228    account_paths: &[PathBuf],
2229    parallel_divisions: usize,
2230) -> Result<UnpackedAppendVecMap> {
2231    assert!(parallel_divisions > 0);
2232
2233    // allocate all readers before any readers start reading
2234    let readers = (0..parallel_divisions)
2235        .map(|_| SharedBufferReader::new(&shared_buffer))
2236        .collect::<Vec<_>>();
2237
2238    // create 'parallel_divisions' # of parallel workers, each responsible for 1/parallel_divisions of all the files to extract.
2239    let all_unpacked_append_vec_map = readers
2240        .into_par_iter()
2241        .enumerate()
2242        .map(|(index, reader)| {
2243            let parallel_selector = Some(ParallelSelector {
2244                index,
2245                divisions: parallel_divisions,
2246            });
2247            let mut archive = Archive::new(reader);
2248            hardened_unpack::unpack_snapshot(
2249                &mut archive,
2250                ledger_dir,
2251                account_paths,
2252                parallel_selector,
2253            )
2254        })
2255        .collect::<Vec<_>>();
2256
2257    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
2258    for h in all_unpacked_append_vec_map {
2259        unpacked_append_vec_map.extend(h?);
2260    }
2261
2262    Ok(unpacked_append_vec_map)
2263}
2264
2265fn untar_snapshot_create_shared_buffer(
2266    snapshot_tar: &Path,
2267    archive_format: ArchiveFormat,
2268) -> SharedBuffer {
2269    let open_file = || {
2270        fs::File::open(snapshot_tar)
2271            .map_err(|err| {
2272                IoError::other(format!(
2273                    "failed to open snapshot archive '{}': {err}",
2274                    snapshot_tar.display(),
2275                ))
2276            })
2277            .unwrap()
2278    };
2279    match archive_format {
2280        ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
2281        ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
2282        ArchiveFormat::TarZstd { .. } => SharedBuffer::new(
2283            zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
2284        ),
2285        ArchiveFormat::TarLz4 => {
2286            SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
2287        }
2288        ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2289    }
2290}
2291
2292#[cfg(feature = "dev-context-only-utils")]
2293fn untar_snapshot_in(
2294    snapshot_tar: impl AsRef<Path>,
2295    unpack_dir: &Path,
2296    account_paths: &[PathBuf],
2297    archive_format: ArchiveFormat,
2298    parallel_divisions: usize,
2299) -> Result<UnpackedAppendVecMap> {
2300    let shared_buffer = untar_snapshot_create_shared_buffer(snapshot_tar.as_ref(), archive_format);
2301    unpack_snapshot_local(shared_buffer, unpack_dir, account_paths, parallel_divisions)
2302}
2303
2304pub fn verify_unpacked_snapshots_dir_and_version(
2305    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2306) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2307    info!(
2308        "snapshot version: {}",
2309        &unpacked_snapshots_dir_and_version.snapshot_version
2310    );
2311
2312    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2313    let mut bank_snapshots =
2314        get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2315    if bank_snapshots.len() > 1 {
2316        return Err(IoError::other(format!(
2317            "invalid snapshot format: only one snapshot allowed, but found {}",
2318            bank_snapshots.len(),
2319        ))
2320        .into());
2321    }
2322    let root_paths = bank_snapshots.pop().ok_or_else(|| {
2323        IoError::other(format!(
2324            "no snapshots found in snapshots directory '{}'",
2325            unpacked_snapshots_dir_and_version
2326                .unpacked_snapshots_dir
2327                .display(),
2328        ))
2329    })?;
2330    Ok((snapshot_version, root_paths))
2331}
2332
2333/// Returns the file name of the bank snapshot for `slot`
2334pub fn get_snapshot_file_name(slot: Slot) -> String {
2335    slot.to_string()
2336}
2337
2338/// Constructs the path to the bank snapshot directory for `slot` within `bank_snapshots_dir`
2339pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2340    bank_snapshots_dir
2341        .as_ref()
2342        .join(get_snapshot_file_name(slot))
2343}
2344
2345#[derive(Debug, Copy, Clone)]
2346/// allow tests to specify what happened to the serialized format
2347pub enum VerifyBank {
2348    /// the bank's serialized format is expected to be identical to what we are comparing against
2349    Deterministic,
2350    /// the serialized bank was 'reserialized' into a non-deterministic format
2351    /// so, deserialize both files and compare deserialized results
2352    NonDeterministic,
2353}
2354
2355#[cfg(feature = "dev-context-only-utils")]
2356pub fn verify_snapshot_archive(
2357    snapshot_archive: impl AsRef<Path>,
2358    snapshots_to_verify: impl AsRef<Path>,
2359    archive_format: ArchiveFormat,
2360    verify_bank: VerifyBank,
2361    slot: Slot,
2362) {
2363    let temp_dir = tempfile::TempDir::new().unwrap();
2364    let unpack_dir = temp_dir.path();
2365    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
2366    untar_snapshot_in(
2367        snapshot_archive,
2368        unpack_dir,
2369        &[unpack_account_dir.clone()],
2370        archive_format,
2371        1,
2372    )
2373    .unwrap();
2374
2375    // Check snapshots are the same
2376    let unpacked_snapshots = unpack_dir.join("snapshots");
2377
2378    // Since the unpack code collects all the appendvecs into one directory unpack_account_dir, we need to
2379    // collect all the appendvecs in account_paths/<slot>/snapshot/ into one directory for later comparison.
2380    let storages_to_verify = unpack_dir.join("storages_to_verify");
2381    // Create the directory if it doesn't exist
2382    fs::create_dir_all(&storages_to_verify).unwrap();
2383
2384    let slot = slot.to_string();
2385    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);
2386
2387    if let VerifyBank::NonDeterministic = verify_bank {
2388        // file contents may be different, but deserialized structs should be equal
2389        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
2390        let p2 = unpacked_snapshots.join(&slot).join(&slot);
2391        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
2392        fs::remove_file(p1).unwrap();
2393        fs::remove_file(p2).unwrap();
2394    }
2395
2396    // The new status_cache file is inside the slot directory together with the snapshot file.
2397    // When unpacking an archive, the status_cache file from the archive is one level up, outside of
2398    // the slot directory.
2399    // The unpacked status_cache file needs to be put back into the slot directory for the directory
2400    // comparison to pass.
2401    let existing_unpacked_status_cache_file =
2402        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2403    let new_unpacked_status_cache_file = unpacked_snapshots
2404        .join(&slot)
2405        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
2406    fs::rename(
2407        existing_unpacked_status_cache_file,
2408        new_unpacked_status_cache_file,
2409    )
2410    .unwrap();
2411
2412    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2413    if accounts_hardlinks_dir.is_dir() {
2414        // This directory contains symlinks to all <account_path>/snapshot/<slot> directories.
2415        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
2416            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
2417            // Copy all the files in link_dst_path into the storages_to_verify directory.
2418            for entry in fs::read_dir(&link_dst_path).unwrap() {
2419                let src_path = entry.unwrap().path();
2420                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
2421                fs::copy(src_path, dst_path).unwrap();
2422            }
2423        }
2424        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
2425    }
2426
2427    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
2428    if version_path.is_file() {
2429        fs::remove_file(version_path).unwrap();
2430    }
2431
2432    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2433    if state_complete_path.is_file() {
2434        fs::remove_file(state_complete_path).unwrap();
2435    }
2436
2437    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
2438
2439    // In the unarchiving case, there is an extra empty "accounts" directory. The account
2440    // files in the archive accounts/ have been expanded to [account_paths].
2441    // Remove the empty "accounts" directory for the directory comparison below.
2442    // In some test cases the directories to compare do not come from unarchiving.
2443    // Ignore the error when this directory does not exist.
2444    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
2445    // Check the account entries are the same
2446    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
2447}
2448
2449/// Purges all bank snapshots
2450pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2451    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2452    purge_bank_snapshots(&bank_snapshots);
2453}
2454
2455/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
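///
/// For example, to keep only the two newest "post" bank snapshots (an illustrative call):
///
/// ```ignore
/// purge_old_bank_snapshots(&bank_snapshots_dir, 2, Some(BankSnapshotKind::Post));
/// ```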
2456pub fn purge_old_bank_snapshots(
2457    bank_snapshots_dir: impl AsRef<Path>,
2458    num_bank_snapshots_to_retain: usize,
2459    filter_by_kind: Option<BankSnapshotKind>,
2460) {
2461    let mut bank_snapshots = match filter_by_kind {
2462        Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2463        Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2464        None => get_bank_snapshots(&bank_snapshots_dir),
2465    };
2466
2467    bank_snapshots.sort_unstable();
2468    purge_bank_snapshots(
2469        bank_snapshots
2470            .iter()
2471            .rev()
2472            .skip(num_bank_snapshots_to_retain),
2473    );
2474}
2475
2476/// At startup, purge old (i.e. unusable) bank snapshots
2477///
2478/// At most a single bank snapshot may be needed at startup (when using fast boot), so
2479/// retain the highest "post" bank snapshot and purge the rest.
2480pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2481    purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2482    purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2483
2484    let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2485    if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2486        debug!(
2487            "Retained bank snapshot for slot {}, and purged the rest.",
2488            highest_bank_snapshot_post.slot
2489        );
2490    }
2491}
2492
2493/// Purges bank snapshots that are older than `slot`
2494pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2495    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2496    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2497    purge_bank_snapshots(&bank_snapshots);
2498}
2499
2500/// Purges all `bank_snapshots`
2501///
2502/// Does not exit early if there is an error while purging a bank snapshot.
2503fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2504    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2505        if purge_bank_snapshot(snapshot_dir).is_err() {
2506            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2507        }
2508    }
2509}
2510
2511/// Remove the bank snapshot at this path
2512pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2513    const FN_ERR: &str = "failed to purge bank snapshot";
2514    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2515    if accounts_hardlinks_dir.is_dir() {
2516        // This directory contains symlinks to all accounts snapshot directories.
2517        // They should all be removed.
2518        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2519            IoError::other(format!(
2520                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2521                accounts_hardlinks_dir.display(),
2522            ))
2523        })?;
2524        for entry in read_dir {
2525            let accounts_hardlink_dir = entry?.path();
2526            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2527                IoError::other(format!(
2528                    "{FN_ERR}: failed to read symlink '{}': {err}",
2529                    accounts_hardlink_dir.display(),
2530                ))
2531            })?;
2532            move_and_async_delete_path(&accounts_hardlink_dir);
2533        }
2534    }
2535    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2536        IoError::other(format!(
2537            "{FN_ERR}: failed to remove dir '{}': {err}",
2538            bank_snapshot_dir.as_ref().display(),
2539        ))
2540    })?;
2541    Ok(())
2542}
2543
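/// Returns true when a full snapshot should be taken, i.e. when `block_height` is a multiple of
/// `full_snapshot_archive_interval_slots`.
///
/// For example, with an illustrative interval of 25_000 slots:
///
/// ```ignore
/// assert!(should_take_full_snapshot(50_000, 25_000));
/// assert!(!should_take_full_snapshot(50_001, 25_000));
/// ```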
2544pub fn should_take_full_snapshot(
2545    block_height: Slot,
2546    full_snapshot_archive_interval_slots: Slot,
2547) -> bool {
2548    block_height % full_snapshot_archive_interval_slots == 0
2549}
2550
2551pub fn should_take_incremental_snapshot(
2552    block_height: Slot,
2553    incremental_snapshot_archive_interval_slots: Slot,
2554    latest_full_snapshot_slot: Option<Slot>,
2555) -> bool {
2556    block_height % incremental_snapshot_archive_interval_slots == 0
2557        && latest_full_snapshot_slot.is_some()
2558}
2559
2560/// Creates an "accounts path" directory for tests
2561///
2562/// This temporary directory will contain the "run" and "snapshot"
2563/// sub-directories required by a validator.
2564#[cfg(feature = "dev-context-only-utils")]
2565pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
2566    let tmp_dir = tempfile::TempDir::new().unwrap();
2567    let account_dir = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap().0;
2568    (tmp_dir, account_dir)
2569}
2570
2571#[cfg(test)]
2572mod tests {
2573    use {
2574        super::*,
2575        assert_matches::assert_matches,
2576        bincode::{deserialize_from, serialize_into},
2577        std::{convert::TryFrom, mem::size_of},
2578        tempfile::NamedTempFile,
2579    };
2580
2581    #[test]
2582    fn test_serialize_snapshot_data_file_under_limit() {
2583        let temp_dir = tempfile::TempDir::new().unwrap();
2584        let expected_consumed_size = size_of::<u32>() as u64;
2585        let consumed_size = serialize_snapshot_data_file_capped(
2586            &temp_dir.path().join("data-file"),
2587            expected_consumed_size,
2588            |stream| {
2589                serialize_into(stream, &2323_u32)?;
2590                Ok(())
2591            },
2592        )
2593        .unwrap();
2594        assert_eq!(consumed_size, expected_consumed_size);
2595    }
2596
2597    #[test]
2598    fn test_serialize_snapshot_data_file_over_limit() {
2599        let temp_dir = tempfile::TempDir::new().unwrap();
2600        let expected_consumed_size = size_of::<u32>() as u64;
2601        let result = serialize_snapshot_data_file_capped(
2602            &temp_dir.path().join("data-file"),
2603            expected_consumed_size - 1,
2604            |stream| {
2605                serialize_into(stream, &2323_u32)?;
2606                Ok(())
2607            },
2608        );
2609        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2610    }
2611
2612    #[test]
2613    fn test_deserialize_snapshot_data_file_under_limit() {
2614        let expected_data = 2323_u32;
2615        let expected_consumed_size = size_of::<u32>() as u64;
2616
2617        let temp_dir = tempfile::TempDir::new().unwrap();
2618        serialize_snapshot_data_file_capped(
2619            &temp_dir.path().join("data-file"),
2620            expected_consumed_size,
2621            |stream| {
2622                serialize_into(stream, &expected_data)?;
2623                Ok(())
2624            },
2625        )
2626        .unwrap();
2627
2628        let snapshot_root_paths = SnapshotRootPaths {
2629            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2630            incremental_snapshot_root_file_path: None,
2631        };
2632
2633        let actual_data = deserialize_snapshot_data_files_capped(
2634            &snapshot_root_paths,
2635            expected_consumed_size,
2636            |stream| {
2637                Ok(deserialize_from::<_, u32>(
2638                    &mut stream.full_snapshot_stream,
2639                )?)
2640            },
2641        )
2642        .unwrap();
2643        assert_eq!(actual_data, expected_data);
2644    }
2645
2646    #[test]
2647    fn test_deserialize_snapshot_data_file_over_limit() {
2648        let expected_data = 2323_u32;
2649        let expected_consumed_size = size_of::<u32>() as u64;
2650
2651        let temp_dir = tempfile::TempDir::new().unwrap();
2652        serialize_snapshot_data_file_capped(
2653            &temp_dir.path().join("data-file"),
2654            expected_consumed_size,
2655            |stream| {
2656                serialize_into(stream, &expected_data)?;
2657                Ok(())
2658            },
2659        )
2660        .unwrap();
2661
2662        let snapshot_root_paths = SnapshotRootPaths {
2663            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2664            incremental_snapshot_root_file_path: None,
2665        };
2666
2667        let result = deserialize_snapshot_data_files_capped(
2668            &snapshot_root_paths,
2669            expected_consumed_size - 1,
2670            |stream| {
2671                Ok(deserialize_from::<_, u32>(
2672                    &mut stream.full_snapshot_stream,
2673                )?)
2674            },
2675        );
2676        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2677    }
2678
2679    #[test]
2680    fn test_deserialize_snapshot_data_file_extra_data() {
2681        let expected_data = 2323_u32;
2682        let expected_consumed_size = size_of::<u32>() as u64;
2683
2684        let temp_dir = tempfile::TempDir::new().unwrap();
2685        serialize_snapshot_data_file_capped(
2686            &temp_dir.path().join("data-file"),
2687            expected_consumed_size * 2,
2688            |stream| {
2689                serialize_into(stream.by_ref(), &expected_data)?;
2690                serialize_into(stream.by_ref(), &expected_data)?;
2691                Ok(())
2692            },
2693        )
2694        .unwrap();
2695
2696        let snapshot_root_paths = SnapshotRootPaths {
2697            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2698            incremental_snapshot_root_file_path: None,
2699        };
2700
2701        let result = deserialize_snapshot_data_files_capped(
2702            &snapshot_root_paths,
2703            expected_consumed_size * 2,
2704            |stream| {
2705                Ok(deserialize_from::<_, u32>(
2706                    &mut stream.full_snapshot_stream,
2707                )?)
2708            },
2709        );
2710        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2711    }
2712
2713    #[test]
2714    fn test_snapshot_version_from_file_under_limit() {
2715        let file_content = SnapshotVersion::default().as_str();
2716        let mut file = NamedTempFile::new().unwrap();
2717        file.write_all(file_content.as_bytes()).unwrap();
2718        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2719        assert_eq!(version_from_file, file_content);
2720    }
2721
2722    #[test]
2723    fn test_snapshot_version_from_file_over_limit() {
2724        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2725        let file_content = vec![7u8; over_limit_size];
2726        let mut file = NamedTempFile::new().unwrap();
2727        file.write_all(&file_content).unwrap();
2728        assert_matches!(
2729            snapshot_version_from_file(file.path()),
2730            Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2731        );
2732    }
2733
2734    #[test]
2735    fn test_parse_full_snapshot_archive_filename() {
2736        assert_eq!(
2737            parse_full_snapshot_archive_filename(&format!(
2738                "snapshot-42-{}.tar.bz2",
2739                Hash::default()
2740            ))
2741            .unwrap(),
2742            (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2743        );
2744        assert_eq!(
2745            parse_full_snapshot_archive_filename(&format!(
2746                "snapshot-43-{}.tar.zst",
2747                Hash::default()
2748            ))
2749            .unwrap(),
2750            (
2751                43,
2752                SnapshotHash(Hash::default()),
2753                ArchiveFormat::TarZstd {
2754                    config: ZstdConfig::default(),
2755                }
2756            )
2757        );
2758        assert_eq!(
2759            parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2760                .unwrap(),
2761            (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2762        );
2763        assert_eq!(
2764            parse_full_snapshot_archive_filename(&format!(
2765                "snapshot-45-{}.tar.lz4",
2766                Hash::default()
2767            ))
2768            .unwrap(),
2769            (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2770        );
2771
2772        assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2773        assert!(
2774            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2775        );
2776
2777        assert!(
2778            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2779        );
2780        assert!(parse_full_snapshot_archive_filename(&format!(
2781            "snapshot-12345678-{}.bad!ext",
2782            Hash::new_unique()
2783        ))
2784        .is_err());
2785        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2786
2787        assert!(parse_full_snapshot_archive_filename(&format!(
2788            "snapshot-bad!slot-{}.bad!ext",
2789            Hash::new_unique()
2790        ))
2791        .is_err());
2792        assert!(parse_full_snapshot_archive_filename(&format!(
2793            "snapshot-12345678-{}.bad!ext",
2794            Hash::new_unique()
2795        ))
2796        .is_err());
2797        assert!(parse_full_snapshot_archive_filename(&format!(
2798            "snapshot-bad!slot-{}.tar",
2799            Hash::new_unique()
2800        ))
2801        .is_err());
2802
2803        assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2804        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2805        assert!(parse_full_snapshot_archive_filename(&format!(
2806            "snapshot-bad!slot-{}.tar",
2807            Hash::new_unique()
2808        ))
2809        .is_err());
2810    }
2811
2812    #[test]
2813    fn test_parse_incremental_snapshot_archive_filename() {
2814        assert_eq!(
2815            parse_incremental_snapshot_archive_filename(&format!(
2816                "incremental-snapshot-42-123-{}.tar.bz2",
2817                Hash::default()
2818            ))
2819            .unwrap(),
2820            (
2821                42,
2822                123,
2823                SnapshotHash(Hash::default()),
2824                ArchiveFormat::TarBzip2
2825            )
2826        );
2827        assert_eq!(
2828            parse_incremental_snapshot_archive_filename(&format!(
2829                "incremental-snapshot-43-234-{}.tar.zst",
2830                Hash::default()
2831            ))
2832            .unwrap(),
2833            (
2834                43,
2835                234,
2836                SnapshotHash(Hash::default()),
2837                ArchiveFormat::TarZstd {
2838                    config: ZstdConfig::default(),
2839                }
2840            )
2841        );
2842        assert_eq!(
2843            parse_incremental_snapshot_archive_filename(&format!(
2844                "incremental-snapshot-44-345-{}.tar",
2845                Hash::default()
2846            ))
2847            .unwrap(),
2848            (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2849        );
2850        assert_eq!(
2851            parse_incremental_snapshot_archive_filename(&format!(
2852                "incremental-snapshot-45-456-{}.tar.lz4",
2853                Hash::default()
2854            ))
2855            .unwrap(),
2856            (
2857                45,
2858                456,
2859                SnapshotHash(Hash::default()),
2860                ArchiveFormat::TarLz4
2861            )
2862        );
2863
2864        assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2865        assert!(parse_incremental_snapshot_archive_filename(&format!(
2866            "snapshot-42-{}.tar",
2867            Hash::new_unique()
2868        ))
2869        .is_err());
2870        assert!(parse_incremental_snapshot_archive_filename(
2871            "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2872        )
2873        .is_err());
2874
2875        assert!(parse_incremental_snapshot_archive_filename(&format!(
2876            "incremental-snapshot-bad!slot-56785678-{}.tar",
2877            Hash::new_unique()
2878        ))
2879        .is_err());
2880
2881        assert!(parse_incremental_snapshot_archive_filename(&format!(
2882            "incremental-snapshot-12345678-bad!slot-{}.tar",
2883            Hash::new_unique()
2884        ))
2885        .is_err());
2886
2887        assert!(parse_incremental_snapshot_archive_filename(
2888            "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2889        )
2890        .is_err());
2891
2892        assert!(parse_incremental_snapshot_archive_filename(&format!(
2893            "incremental-snapshot-12341234-56785678-{}.bad!ext",
2894            Hash::new_unique()
2895        ))
2896        .is_err());
2897    }
2898
2899    #[test]
2900    fn test_check_are_snapshots_compatible() {
2901        let slot1: Slot = 1234;
2902        let slot2: Slot = 5678;
2903        let slot3: Slot = 999_999;
2904
2905        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2906            format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2907        ))
2908        .unwrap();
2909
2910        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None).is_ok());
2911
2912        let incremental_snapshot_archive_info =
2913            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2914                "/dir/incremental-snapshot-{}-{}-{}.tar",
2915                slot1,
2916                slot2,
2917                Hash::new_unique()
2918            )))
2919            .unwrap();
2920
2921        assert!(check_are_snapshots_compatible(
2922            &full_snapshot_archive_info,
2923            Some(&incremental_snapshot_archive_info)
2924        )
2925        .is_ok());
2926
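        // An incremental snapshot whose base slot (slot2) does not match the full snapshot's slot (slot1) is not compatible.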
2927        let incremental_snapshot_archive_info =
2928            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2929                "/dir/incremental-snapshot-{}-{}-{}.tar",
2930                slot2,
2931                slot3,
2932                Hash::new_unique()
2933            )))
2934            .unwrap();
2935
2936        assert!(check_are_snapshots_compatible(
2937            &full_snapshot_archive_info,
2938            Some(&incremental_snapshot_archive_info)
2939        )
2940        .is_err());
2941    }
2942
2943    /// A test helper function that creates bank snapshot files.
2944    fn common_create_bank_snapshot_files(
2945        bank_snapshots_dir: &Path,
2946        min_slot: Slot,
2947        max_slot: Slot,
2948    ) {
2949        for slot in min_slot..max_slot {
2950            let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2951            fs::create_dir_all(&snapshot_dir).unwrap();
2952
2953            let snapshot_filename = get_snapshot_file_name(slot);
2954            let snapshot_path = snapshot_dir.join(snapshot_filename);
2955            fs::File::create(snapshot_path).unwrap();
2956
2957            let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2958            fs::File::create(status_cache_file).unwrap();
2959
2960            let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2961            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2962
2963            // Mark this directory complete so it can be used.  This flag is checked before a snapshot is selected for deserialization.
2964            let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2965            fs::File::create(state_complete_path).unwrap();
2966        }
2967    }
2968
2969    #[test]
2970    fn test_get_bank_snapshots() {
2971        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2972        let min_slot = 10;
2973        let max_slot = 20;
2974        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2975
2976        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2977        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2978    }
2979
2980    #[test]
2981    fn test_get_highest_bank_snapshot_post() {
2982        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2983        let min_slot = 99;
2984        let max_slot = 123;
2985        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2986
2987        let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2988        assert!(highest_bank_snapshot.is_some());
2989        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2990    }
2991
2992    /// A test helper function that creates full and incremental snapshot archive files.  Creates
2993    /// full snapshot files in the range [`min_full_snapshot_slot`, `max_full_snapshot_slot`), and
2994    /// incremental snapshot files in the range [`min_incremental_snapshot_slot`,
2995    /// `max_incremental_snapshot_slot`).  Additionally, "bad" files are created for both full and
2996    /// incremental snapshots to ensure the tests properly filter them out.
2997    fn common_create_snapshot_archive_files(
2998        full_snapshot_archives_dir: &Path,
2999        incremental_snapshot_archives_dir: &Path,
3000        min_full_snapshot_slot: Slot,
3001        max_full_snapshot_slot: Slot,
3002        min_incremental_snapshot_slot: Slot,
3003        max_incremental_snapshot_slot: Slot,
3004    ) {
3005        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3006        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3007        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3008            for incremental_snapshot_slot in
3009                min_incremental_snapshot_slot..max_incremental_snapshot_slot
3010            {
3011                let snapshot_filename = format!(
3012                    "incremental-snapshot-{}-{}-{}.tar",
3013                    full_snapshot_slot,
3014                    incremental_snapshot_slot,
3015                    Hash::default()
3016                );
3017                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3018                fs::File::create(snapshot_filepath).unwrap();
3019            }
3020
3021            let snapshot_filename =
3022                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3023            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3024            fs::File::create(snapshot_filepath).unwrap();
3025
3026            // Add in an incremental snapshot with a bad filename and high slot to ensure filenames are filtered and sorted correctly
3027            let bad_filename = format!(
3028                "incremental-snapshot-{}-{}-bad!hash.tar",
3029                full_snapshot_slot,
3030                max_incremental_snapshot_slot + 1,
3031            );
3032            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3033            fs::File::create(bad_filepath).unwrap();
3034        }
3035
3036        // Add in a snapshot with a bad filename and high slot to ensure filenames are filtered and
3037        // sorted correctly
3038        let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3039        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3040        fs::File::create(bad_filepath).unwrap();
3041    }
3042
3043    #[test]
3044    fn test_get_full_snapshot_archives() {
3045        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3046        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3047        let min_slot = 123;
3048        let max_slot = 456;
3049        common_create_snapshot_archive_files(
3050            full_snapshot_archives_dir.path(),
3051            incremental_snapshot_archives_dir.path(),
3052            min_slot,
3053            max_slot,
3054            0,
3055            0,
3056        );
3057
3058        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3059        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3060    }
3061
3062    #[test]
3063    fn test_get_full_snapshot_archives_remote() {
3064        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3065        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3066        let min_slot = 123;
3067        let max_slot = 456;
3068        common_create_snapshot_archive_files(
3069            &full_snapshot_archives_dir
3070                .path()
3071                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3072            &incremental_snapshot_archives_dir
3073                .path()
3074                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3075            min_slot,
3076            max_slot,
3077            0,
3078            0,
3079        );
3080
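        // Archives created under the remote download subdirectory should still be discovered, and flagged as remote.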
3081        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3082        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3083        assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3084    }
3085
3086    #[test]
3087    fn test_get_incremental_snapshot_archives() {
3088        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3089        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3090        let min_full_snapshot_slot = 12;
3091        let max_full_snapshot_slot = 23;
3092        let min_incremental_snapshot_slot = 34;
3093        let max_incremental_snapshot_slot = 45;
3094        common_create_snapshot_archive_files(
3095            full_snapshot_archives_dir.path(),
3096            incremental_snapshot_archives_dir.path(),
3097            min_full_snapshot_slot,
3098            max_full_snapshot_slot,
3099            min_incremental_snapshot_slot,
3100            max_incremental_snapshot_slot,
3101        );
3102
3103        let incremental_snapshot_archives =
3104            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3105        assert_eq!(
3106            incremental_snapshot_archives.len() as Slot,
3107            (max_full_snapshot_slot - min_full_snapshot_slot)
3108                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3109        );
3110    }
3111
3112    #[test]
3113    fn test_get_incremental_snapshot_archives_remote() {
3114        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3115        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3116        let min_full_snapshot_slot = 12;
3117        let max_full_snapshot_slot = 23;
3118        let min_incremental_snapshot_slot = 34;
3119        let max_incremental_snapshot_slot = 45;
3120        common_create_snapshot_archive_files(
3121            &full_snapshot_archives_dir
3122                .path()
3123                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3124            &incremental_snapshot_archives_dir
3125                .path()
3126                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3127            min_full_snapshot_slot,
3128            max_full_snapshot_slot,
3129            min_incremental_snapshot_slot,
3130            max_incremental_snapshot_slot,
3131        );
3132
3133        let incremental_snapshot_archives =
3134            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3135        assert_eq!(
3136            incremental_snapshot_archives.len() as Slot,
3137            (max_full_snapshot_slot - min_full_snapshot_slot)
3138                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3139        );
3140        assert!(incremental_snapshot_archives
3141            .iter()
3142            .all(|info| info.is_remote()));
3143    }
3144
3145    #[test]
3146    fn test_get_highest_full_snapshot_archive_slot() {
3147        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3148        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3149        let min_slot = 123;
3150        let max_slot = 456;
3151        common_create_snapshot_archive_files(
3152            full_snapshot_archives_dir.path(),
3153            incremental_snapshot_archives_dir.path(),
3154            min_slot,
3155            max_slot,
3156            0,
3157            0,
3158        );
3159
3160        assert_eq!(
3161            get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3162            Some(max_slot - 1)
3163        );
3164    }
3165
3166    #[test]
3167    fn test_get_highest_incremental_snapshot_slot() {
3168        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3169        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3170        let min_full_snapshot_slot = 12;
3171        let max_full_snapshot_slot = 23;
3172        let min_incremental_snapshot_slot = 34;
3173        let max_incremental_snapshot_slot = 45;
3174        common_create_snapshot_archive_files(
3175            full_snapshot_archives_dir.path(),
3176            incremental_snapshot_archives_dir.path(),
3177            min_full_snapshot_slot,
3178            max_full_snapshot_slot,
3179            min_incremental_snapshot_slot,
3180            max_incremental_snapshot_slot,
3181        );
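        // Each full snapshot slot in the range has incremental archives up to max_incremental_snapshot_slot - 1; a base slot with no incremental archives yields None.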
3182
3183        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3184            assert_eq!(
3185                get_highest_incremental_snapshot_archive_slot(
3186                    incremental_snapshot_archives_dir.path(),
3187                    full_snapshot_slot
3188                ),
3189                Some(max_incremental_snapshot_slot - 1)
3190            );
3191        }
3192
3193        assert_eq!(
3194            get_highest_incremental_snapshot_archive_slot(
3195                incremental_snapshot_archives_dir.path(),
3196                max_full_snapshot_slot
3197            ),
3198            None
3199        );
3200    }
3201
3202    fn common_test_purge_old_snapshot_archives(
3203        snapshot_names: &[&String],
3204        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3205        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3206        expected_snapshots: &[&String],
3207    ) {
3208        let temp_snap_dir = tempfile::TempDir::new().unwrap();
3209
3210        for snap_name in snapshot_names {
3211            let snap_path = temp_snap_dir.path().join(snap_name);
3212            fs::File::create(snap_path).unwrap();
3213        }
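        // Note: this helper passes the same directory for both the full and incremental snapshot archive locations.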
3214        purge_old_snapshot_archives(
3215            temp_snap_dir.path(),
3216            temp_snap_dir.path(),
3217            maximum_full_snapshot_archives_to_retain,
3218            maximum_incremental_snapshot_archives_to_retain,
3219        );
3220
3221        let mut retained_snaps = HashSet::new();
3222        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3223            let entry_path_buf = entry.unwrap().path();
3224            let entry_path = entry_path_buf.as_path();
3225            let snapshot_name = entry_path
3226                .file_name()
3227                .unwrap()
3228                .to_str()
3229                .unwrap()
3230                .to_string();
3231            retained_snaps.insert(snapshot_name);
3232        }
3233
3234        for snap_name in expected_snapshots {
3235            assert!(
3236                retained_snaps.contains(snap_name.as_str()),
3237                "{snap_name} not found"
3238            );
3239        }
3240        assert_eq!(retained_snaps.len(), expected_snapshots.len());
3241    }
3242
3243    #[test]
3244    fn test_purge_old_full_snapshot_archives() {
3245        let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
3246        let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
3247        let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
3248        let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
3249
3250        // expecting only the newest to be retained
3251        let expected_snapshots = vec![&snap3_name];
3252        common_test_purge_old_snapshot_archives(
3253            &snapshot_names,
3254            NonZeroUsize::new(1).unwrap(),
3255            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3256            &expected_snapshots,
3257        );
3258
3259        // retaining 2, expecting the 2 newest to be retained
3260        let expected_snapshots = vec![&snap2_name, &snap3_name];
3261        common_test_purge_old_snapshot_archives(
3262            &snapshot_names,
3263            NonZeroUsize::new(2).unwrap(),
3264            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3265            &expected_snapshots,
3266        );
3267
3268        // retaining 3, all three should be retained
3269        let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
3270        common_test_purge_old_snapshot_archives(
3271            &snapshot_names,
3272            NonZeroUsize::new(3).unwrap(),
3273            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3274            &expected_snapshots,
3275        );
3276    }
3277
3278    /// Mimic a running node's behavior w.r.t. purging old snapshot archives.  Take snapshots in a
3279    /// loop, and periodically purge old snapshot archives.  After purging, check to make sure the
3280    /// snapshot archives on disk are correct.
3281    #[test]
3282    fn test_purge_old_full_snapshot_archives_in_the_loop() {
3283        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3284        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3285        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
3286        let starting_slot: Slot = 42;
3287
3288        for slot in (starting_slot..).take(100) {
3289            let full_snapshot_archive_file_name =
3290                format!("snapshot-{}-{}.tar", slot, Hash::default());
3291            let full_snapshot_archive_path = full_snapshot_archives_dir
3292                .as_ref()
3293                .join(full_snapshot_archive_file_name);
3294            fs::File::create(full_snapshot_archive_path).unwrap();
3295
3296            // don't purge-and-check until enough snapshot archives have been created
3297            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
3298                continue;
3299            }
3300
3301            // purge infrequently, so there will always be snapshot archives to purge
3302            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
3303                continue;
3304            }
3305
3306            purge_old_snapshot_archives(
3307                &full_snapshot_archives_dir,
3308                &incremental_snapshot_archives_dir,
3309                maximum_snapshots_to_retain,
3310                NonZeroUsize::new(usize::MAX).unwrap(),
3311            );
3312            let mut full_snapshot_archives =
3313                get_full_snapshot_archives(&full_snapshot_archives_dir);
3314            full_snapshot_archives.sort_unstable();
3315            assert_eq!(
3316                full_snapshot_archives.len(),
3317                maximum_snapshots_to_retain.get()
3318            );
3319            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
3320            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
3321                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
3322            }
3323        }
3324    }
3325
3326    #[test]
3327    fn test_purge_old_incremental_snapshot_archives() {
3328        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3329        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3330        let starting_slot = 100_000;
3331
3332        let maximum_incremental_snapshot_archives_to_retain =
3333            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3334        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3335
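        // Create twice as many incremental snapshots per full snapshot as can be retained, ensuring the purge below always has older archives to remove.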
3336        let incremental_snapshot_interval = 100;
3337        let num_incremental_snapshots_per_full_snapshot =
3338            maximum_incremental_snapshot_archives_to_retain.get() * 2;
3339        let full_snapshot_interval =
3340            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3341
3342        let mut snapshot_filenames = vec![];
3343        (starting_slot..)
3344            .step_by(full_snapshot_interval)
3345            .take(
3346                maximum_full_snapshot_archives_to_retain
3347                    .checked_mul(NonZeroUsize::new(2).unwrap())
3348                    .unwrap()
3349                    .get(),
3350            )
3351            .for_each(|full_snapshot_slot| {
3352                let snapshot_filename =
3353                    format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3354                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3355                fs::File::create(snapshot_path).unwrap();
3356                snapshot_filenames.push(snapshot_filename);
3357
3358                (full_snapshot_slot..)
3359                    .step_by(incremental_snapshot_interval)
3360                    .take(num_incremental_snapshots_per_full_snapshot)
3361                    .skip(1)
3362                    .for_each(|incremental_snapshot_slot| {
3363                        let snapshot_filename = format!(
3364                            "incremental-snapshot-{}-{}-{}.tar",
3365                            full_snapshot_slot,
3366                            incremental_snapshot_slot,
3367                            Hash::default()
3368                        );
3369                        let snapshot_path = incremental_snapshot_archives_dir
3370                            .path()
3371                            .join(&snapshot_filename);
3372                        fs::File::create(snapshot_path).unwrap();
3373                        snapshot_filenames.push(snapshot_filename);
3374                    });
3375            });
3376
3377        purge_old_snapshot_archives(
3378            full_snapshot_archives_dir.path(),
3379            incremental_snapshot_archives_dir.path(),
3380            maximum_full_snapshot_archives_to_retain,
3381            maximum_incremental_snapshot_archives_to_retain,
3382        );
3383
3384        // Ensure correct number of full snapshot archives are purged/retained
3385        let mut remaining_full_snapshot_archives =
3386            get_full_snapshot_archives(full_snapshot_archives_dir.path());
3387        assert_eq!(
3388            remaining_full_snapshot_archives.len(),
3389            maximum_full_snapshot_archives_to_retain.get(),
3390        );
3391        remaining_full_snapshot_archives.sort_unstable();
3392        let latest_full_snapshot_archive_slot =
3393            remaining_full_snapshot_archives.last().unwrap().slot();
3394
3395        // Ensure correct number of incremental snapshot archives are purged/retained
3396        // For each additional full snapshot archive, one additional (the newest)
3397        // incremental snapshot archive is retained.  This is accounted for by the
3398        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)` term below.
3399        let mut remaining_incremental_snapshot_archives =
3400            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3401        assert_eq!(
3402            remaining_incremental_snapshot_archives.len(),
3403            maximum_incremental_snapshot_archives_to_retain
3404                .get()
3405                .saturating_add(
3406                    maximum_full_snapshot_archives_to_retain
3407                        .get()
3408                        .saturating_sub(1)
3409                )
3410        );
3411        remaining_incremental_snapshot_archives.sort_unstable();
3412        remaining_incremental_snapshot_archives.reverse();
3413
3414        // Ensure there exists one incremental snapshot for all but the latest full snapshot
3415        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3416            let incremental_snapshot_archive =
3417                remaining_incremental_snapshot_archives.pop().unwrap();
3418
3419            let expected_base_slot =
3420                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3421            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3422            let expected_slot = expected_base_slot
3423                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3424            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3425        }
3426
3427        // Ensure all remaining incremental snapshots are only for the latest full snapshot
3428        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3429            assert_eq!(
3430                incremental_snapshot_archive.base_slot(),
3431                latest_full_snapshot_archive_slot
3432            );
3433        }
3434
3435        // Ensure the remaining incremental snapshots are at the right slots
3436        let expected_remaining_incremental_snapshot_archive_slots =
3437            (latest_full_snapshot_archive_slot..)
3438                .step_by(incremental_snapshot_interval)
3439                .take(num_incremental_snapshots_per_full_snapshot)
3440                .skip(
3441                    num_incremental_snapshots_per_full_snapshot
3442                        - maximum_incremental_snapshot_archives_to_retain.get(),
3443                )
3444                .collect::<HashSet<_>>();
3445
3446        let actual_remaining_incremental_snapshot_archive_slots =
3447            remaining_incremental_snapshot_archives
3448                .iter()
3449                .map(|snapshot| snapshot.slot())
3450                .collect::<HashSet<_>>();
3451        assert_eq!(
3452            actual_remaining_incremental_snapshot_archive_slots,
3453            expected_remaining_incremental_snapshot_archive_slots
3454        );
3455    }
3456
3457    #[test]
3458    fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3459        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3460        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3461
3462        for snapshot_filenames in [
3463            format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
3464            format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
3465            format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
3466            format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
3467            format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
3468            format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
3469            format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
3470            format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
3471        ] {
3472            let snapshot_path = incremental_snapshot_archives_dir
3473                .path()
3474                .join(snapshot_filenames);
3475            fs::File::create(snapshot_path).unwrap();
3476        }
3477
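        // With no full snapshot archives present, every incremental snapshot archive should be purged, regardless of the retain limits.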
3478        purge_old_snapshot_archives(
3479            full_snapshot_archives_dir.path(),
3480            incremental_snapshot_archives_dir.path(),
3481            NonZeroUsize::new(usize::MAX).unwrap(),
3482            NonZeroUsize::new(usize::MAX).unwrap(),
3483        );
3484
3485        let remaining_incremental_snapshot_archives =
3486            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3487        assert!(remaining_incremental_snapshot_archives.is_empty());
3488    }
3489
3490    #[test]
3491    fn test_get_snapshot_accounts_hardlink_dir() {
3492        let slot: Slot = 1;
3493
3494        let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3495
3496        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3497        let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3498        let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3499        fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3500
3501        let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3502        let appendvec_filename = format!("{slot}.0");
3503        let appendvec_path = accounts_dir.join(appendvec_filename);
3504
3505        let ret = get_snapshot_accounts_hardlink_dir(
3506            &appendvec_path,
3507            slot,
3508            &mut account_paths_set,
3509            &accounts_hardlinks_dir,
3510        );
3511        assert!(ret.is_ok());
3512
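        // Move two directory levels up so the file no longer resides under the accounts directory; resolving its account path should fail.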
3513        let wrong_appendvec_path = appendvec_path
3514            .parent()
3515            .unwrap()
3516            .parent()
3517            .unwrap()
3518            .join(appendvec_path.file_name().unwrap());
3519        let ret = get_snapshot_accounts_hardlink_dir(
3520            &wrong_appendvec_path,
3521            slot,
3522            &mut account_paths_set,
3523            accounts_hardlinks_dir,
3524        );
3525
3526        assert_matches!(
3527            ret,
3528            Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3529        );
3530    }
3531
3532    #[test]
3533    fn test_full_snapshot_slot_file_good() {
3534        let slot_written = 123_456_789;
3535        let bank_snapshot_dir = TempDir::new().unwrap();
3536        write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
3537
3538        let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
3539        assert_eq!(slot_read, slot_written);
3540    }
3541
3542    #[test]
3543    fn test_full_snapshot_slot_file_bad() {
3544        const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
3545        let too_small = [1u8; SLOT_SIZE - 1];
3546        let too_large = [1u8; SLOT_SIZE + 1];
3547
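        // Contents one byte smaller or one byte larger than a serialized Slot should both be rejected.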
3548        for contents in [too_small.as_slice(), too_large.as_slice()] {
3549            let bank_snapshot_dir = TempDir::new().unwrap();
3550            let full_snapshot_slot_path = bank_snapshot_dir
3551                .as_ref()
3552                .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
3553            fs::write(full_snapshot_slot_path, contents).unwrap();
3554
3555            let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
3556            assert!(err
3557                .to_string()
3558                .starts_with("invalid full snapshot slot file size"));
3559        }
3560    }
3561}