solana_runtime/
snapshot_utils.rs

1use {
2    crate::{
3        bank::{BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4        serde_snapshot::{
5            self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6        },
7        snapshot_archive_info::{
8            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9            SnapshotArchiveInfoGetter,
10        },
11        snapshot_bank_utils,
12        snapshot_config::SnapshotConfig,
13        snapshot_hash::SnapshotHash,
14        snapshot_package::{SnapshotKind, SnapshotPackage},
15        snapshot_utils::snapshot_storage_rebuilder::{
16            RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17        },
18    },
19    bzip2::read::BzDecoder,
20    crossbeam_channel::Sender,
21    flate2::read::GzDecoder,
22    log::*,
23    regex::Regex,
24    solana_accounts_db::{
25        account_storage::AccountStorageMap,
26        account_storage_reader::AccountStorageReader,
27        accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
28        accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
29        accounts_hash::{AccountsDeltaHash, AccountsHash},
30        epoch_accounts_hash::EpochAccountsHash,
31        hardened_unpack::{self, ParallelSelector, UnpackError},
32        shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34    },
35    solana_clock::{Epoch, Slot},
36    solana_hash::Hash,
37    solana_measure::{measure::Measure, measure_time, measure_us},
38    std::{
39        cmp::Ordering,
40        collections::{HashMap, HashSet},
41        fmt, fs,
42        io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
43        mem,
44        num::NonZeroUsize,
45        ops::RangeInclusive,
46        path::{Path, PathBuf},
47        process::ExitStatus,
48        str::FromStr,
49        sync::Arc,
50        thread::{Builder, JoinHandle},
51    },
52    tar::{self, Archive},
53    tempfile::TempDir,
54    thiserror::Error,
55};
56#[cfg(feature = "dev-context-only-utils")]
57use {
58    hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
59    solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
60};
61
62mod archive_format;
63pub mod snapshot_storage_rebuilder;
64pub use archive_format::*;
65
/// Name of the status cache file inside a bank snapshot directory.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// Name of the file inside a bank snapshot directory that holds the snapshot version string.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Name of the marker file whose presence flags a bank snapshot directory as complete.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Name of the marker file whose presence flags that the snapshot's storages were flushed.
pub const SNAPSHOT_STORAGES_FLUSHED_FILENAME: &str = "storages_flushed";
/// Name of the directory inside a bank snapshot directory that holds the symlinks to the account
/// snapshot hard link directories.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
// NOTE(review): presumably the subdirectory that downloaded snapshot archives are placed in;
// confirm against the callers.
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// Name of the file inside a bank snapshot directory that stores the full snapshot slot.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// Size limit for a snapshot data file, in bytes.
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
/// Size limit for the snapshot version file, in bytes.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // bytes
/// String form of `SnapshotVersion::V1_2_0`.
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// File name prefix of the temporary entries removed by `remove_tmp_snapshot_archives()`.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// File extension that marks a bank snapshot file as "pre" accounts hash.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
// `NonZeroUsize::new()` returns an `Option<NonZeroUsize>`, so the constants below unwrap it in a
// const context.  This is fine because the values are fixed, known non-zero literals; a zero here
// would fail const evaluation at compile time instead of panicking at runtime.
/// Default for the maximum number of full snapshot archives to retain.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(2).unwrap();
/// Default for the maximum number of incremental snapshot archives to retain.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(4).unwrap();
/// Pattern for full snapshot archive file names; named captures: `slot`, `hash`, `ext`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Pattern for incremental snapshot archive file names; named captures: `base`, `slot`, `hash`,
/// `ext`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
88
/// The snapshot versions this module knows how to name and parse.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum SnapshotVersion {
    /// Version "1.2.0"; this is also the default version.
    #[default]
    V1_2_0,
}
94
95impl fmt::Display for SnapshotVersion {
96    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
97        f.write_str(From::from(*self))
98    }
99}
100
101impl From<SnapshotVersion> for &'static str {
102    fn from(snapshot_version: SnapshotVersion) -> &'static str {
103        match snapshot_version {
104            SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
105        }
106    }
107}
108
109impl FromStr for SnapshotVersion {
110    type Err = &'static str;
111
112    fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
113        // Remove leading 'v' or 'V' from slice
114        let version_string = if version_string
115            .get(..1)
116            .is_some_and(|s| s.eq_ignore_ascii_case("v"))
117        {
118            &version_string[1..]
119        } else {
120            version_string
121        };
122        match version_string {
123            VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
124            _ => Err("unsupported snapshot version"),
125        }
126    }
127}
128
129impl SnapshotVersion {
130    pub fn as_str(self) -> &'static str {
131        <&str as From<Self>>::from(self)
132    }
133}
134
135/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
136/// the kind of the snapshot.
137#[derive(PartialEq, Eq, Debug)]
138pub struct BankSnapshotInfo {
139    /// Slot of the bank
140    pub slot: Slot,
141    /// Snapshot kind
142    pub snapshot_kind: BankSnapshotKind,
143    /// Path to the bank snapshot directory
144    pub snapshot_dir: PathBuf,
145    /// Snapshot version
146    pub snapshot_version: SnapshotVersion,
147}
148
149impl PartialOrd for BankSnapshotInfo {
150    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
151        Some(self.cmp(other))
152    }
153}
154
155// Order BankSnapshotInfo by slot (ascending), which practically is sorting chronologically
156impl Ord for BankSnapshotInfo {
157    fn cmp(&self, other: &Self) -> Ordering {
158        self.slot.cmp(&other.slot)
159    }
160}
161
162impl BankSnapshotInfo {
163    pub fn new_from_dir(
164        bank_snapshots_dir: impl AsRef<Path>,
165        slot: Slot,
166    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
167        // check this directory to see if there is a BankSnapshotPre and/or
168        // BankSnapshotPost file
169        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
170
171        if !bank_snapshot_dir.is_dir() {
172            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
173                bank_snapshot_dir,
174            ));
175        }
176
177        // Among the files checks, the completion flag file check should be done first to avoid the later
178        // I/O errors.
179
180        // There is a time window from the slot directory being created, and the content being completely
181        // filled.  Check the completion to avoid using a highest found slot directory with missing content.
182        if !is_bank_snapshot_complete(&bank_snapshot_dir) {
183            return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
184        }
185
186        let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
187        if !status_cache_file.is_file() {
188            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
189                status_cache_file,
190            ));
191        }
192
193        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
194        let version_str = snapshot_version_from_file(&version_path).or(Err(
195            SnapshotNewFromDirError::MissingVersionFile(version_path),
196        ))?;
197        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
198            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
199
200        let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
201        let bank_snapshot_pre_path =
202            bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
203
204        // NOTE: It is important that checking for "Pre" happens before "Post.
205        //
206        // Consider the scenario where AccountsHashVerifier is actively processing an
207        // AccountsPackage for a snapshot/slot; if AHV is in the middle of reserializing the
208        // bank snapshot file (writing the new "Post" file), and then the process dies,
209        // there will be an incomplete "Post" file on disk.  We do not want only the existence of
210        // this "Post" file to be sufficient for deciding the snapshot kind as "Post".  More so,
211        // "Post" *requires* the *absence* of a "Pre" file.
212        let snapshot_kind = if bank_snapshot_pre_path.is_file() {
213            BankSnapshotKind::Pre
214        } else if bank_snapshot_post_path.is_file() {
215            BankSnapshotKind::Post
216        } else {
217            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
218                bank_snapshot_dir,
219            ));
220        };
221
222        Ok(BankSnapshotInfo {
223            slot,
224            snapshot_kind,
225            snapshot_dir: bank_snapshot_dir,
226            snapshot_version,
227        })
228    }
229
230    pub fn snapshot_path(&self) -> PathBuf {
231        let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
232
233        let ext = match self.snapshot_kind {
234            BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
235            BankSnapshotKind::Post => "",
236        };
237        bank_snapshot_path.set_extension(ext);
238
239        bank_snapshot_path
240    }
241}
242
/// Tells whether a bank snapshot already contains its calculated accounts hash.
///
/// Originally the accounts hash was calculated before a bank snapshot was serialized.  That
/// calculation is slow, so it was moved off the serialization path as an optimization.  The bank
/// serialization format itself did not change, hence this separate marker.
///
/// A freshly taken bank snapshot does not have the calculated accounts hash yet: it is "pre"
/// accounts hash.  Once the hash has been calculated the bank snapshot is re-serialized, and from
/// then on it is "post" accounts hash.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BankSnapshotKind {
    /// The accounts hash has *not* been calculated for this bank snapshot yet
    Pre,
    /// The accounts hash *has* been calculated for this bank snapshot
    Post,
}
258
/// Selects the source a bank is constructed from.  Traditionally a bank was always built from a
/// snapshot archive; now it can be built from either a snapshot archive or a snapshot directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotFrom {
    /// Build from the snapshot archive
    Archive,
    /// Build directly from the bank snapshot directory
    Dir,
}
269
/// Root file paths used when rebuilding from snapshots.  Covers both cases: rebuilding from a
/// full snapshot alone, or from a full snapshot plus an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root file path of the full snapshot
    pub full_snapshot_root_file_path: PathBuf,
    /// Root file path of the incremental snapshot; `None` when rebuilding from a full snapshot only
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
277
278/// Helper type to bundle up the results from `unarchive_snapshot()`
279#[derive(Debug)]
280pub struct UnarchivedSnapshot {
281    #[allow(dead_code)]
282    unpack_dir: TempDir,
283    pub storage: AccountStorageMap,
284    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
285    pub measure_untar: Measure,
286}
287
288/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
289#[derive(Debug)]
290pub struct UnpackedSnapshotsDirAndVersion {
291    pub unpacked_snapshots_dir: PathBuf,
292    pub snapshot_version: SnapshotVersion,
293}
294
295/// Helper type for passing around account storage map and next append vec id
296/// for reconstructing accounts from a snapshot
297pub(crate) struct StorageAndNextAccountsFileId {
298    pub storage: AccountStorageMap,
299    pub next_append_vec_id: AtomicAccountsFileId,
300}
301
302#[derive(Error, Debug)]
303#[allow(clippy::large_enum_variant)]
304pub enum SnapshotError {
305    #[error("I/O error: {0}")]
306    Io(#[from] IoError),
307
308    #[error("AccountsFile error: {0}")]
309    AccountsFileError(#[from] AccountsFileError),
310
311    #[error("serialization error: {0}")]
312    Serialize(#[from] bincode::Error),
313
314    #[error("crossbeam send error: {0}")]
315    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),
316
317    #[error("archive generation failure {0}")]
318    ArchiveGenerationFailure(ExitStatus),
319
320    #[error("Unpack error: {0}")]
321    UnpackError(#[from] UnpackError),
322
323    #[error("source({1}) - I/O error: {0}")]
324    IoWithSource(IoError, &'static str),
325
326    #[error("could not get file name from path '{0}'")]
327    PathToFileNameError(PathBuf),
328
329    #[error("could not get str from file name '{0}'")]
330    FileNameToStrError(PathBuf),
331
332    #[error("could not parse snapshot archive's file name '{0}'")]
333    ParseSnapshotArchiveFileNameError(String),
334
335    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
336    MismatchedBaseSlot(Slot, Slot),
337
338    #[error("no snapshot archives to load from '{0}'")]
339    NoSnapshotArchives(PathBuf),
340
341    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
342    MismatchedSlot(Slot, Slot),
343
344    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
345    MismatchedHash(SnapshotHash, SnapshotHash),
346
347    #[error("snapshot slot deltas are invalid: {0}")]
348    VerifySlotDeltas(#[from] VerifySlotDeltasError),
349
350    #[error("snapshot epoch stakes are invalid: {0}")]
351    VerifyEpochStakes(#[from] VerifyEpochStakesError),
352
353    #[error("bank_snapshot_info new_from_dir failed: {0}")]
354    NewFromDir(#[from] SnapshotNewFromDirError),
355
356    #[error("invalid snapshot dir path '{0}'")]
357    InvalidSnapshotDirPath(PathBuf),
358
359    #[error("invalid AppendVec path '{0}'")]
360    InvalidAppendVecPath(PathBuf),
361
362    #[error("invalid account path '{0}'")]
363    InvalidAccountPath(PathBuf),
364
365    #[error("no valid snapshot dir found under '{0}'")]
366    NoSnapshotSlotDir(PathBuf),
367
368    #[error("snapshot dir account paths mismatching")]
369    AccountPathsMismatch,
370
371    #[error("failed to add bank snapshot for slot {1}: {0}")]
372    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),
373
374    #[error("failed to archive snapshot package: {0}")]
375    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
376
377    #[error("failed to rebuild snapshot storages: {0}")]
378    RebuildStorages(String),
379}
380
381#[derive(Error, Debug)]
382pub enum SnapshotNewFromDirError {
383    #[error("invalid bank snapshot directory '{0}'")]
384    InvalidBankSnapshotDir(PathBuf),
385
386    #[error("missing status cache file '{0}'")]
387    MissingStatusCacheFile(PathBuf),
388
389    #[error("missing version file '{0}'")]
390    MissingVersionFile(PathBuf),
391
392    #[error("invalid snapshot version '{0}'")]
393    InvalidVersion(String),
394
395    #[error("snapshot directory incomplete '{0}'")]
396    IncompleteDir(PathBuf),
397
398    #[error("missing snapshot file '{0}'")]
399    MissingSnapshotFile(PathBuf),
400}
401
402pub type Result<T> = std::result::Result<T, SnapshotError>;
403
404/// Errors that can happen in `verify_slot_deltas()`
405#[derive(Error, Debug, PartialEq, Eq)]
406pub enum VerifySlotDeltasError {
407    #[error("too many entries: {0} (max: {1})")]
408    TooManyEntries(usize, usize),
409
410    #[error("slot {0} is not a root")]
411    SlotIsNotRoot(Slot),
412
413    #[error("slot {0} is greater than bank slot {1}")]
414    SlotGreaterThanMaxRoot(Slot, Slot),
415
416    #[error("slot {0} has multiple entries")]
417    SlotHasMultipleEntries(Slot),
418
419    #[error("slot {0} was not found in slot history")]
420    SlotNotFoundInHistory(Slot),
421
422    #[error("slot {0} was in history but missing from slot deltas")]
423    SlotNotFoundInDeltas(Slot),
424
425    #[error("slot history is bad and cannot be used to verify slot deltas")]
426    BadSlotHistory,
427}
428
429/// Errors that can happen in `verify_epoch_stakes()`
430#[derive(Error, Debug, PartialEq, Eq)]
431pub enum VerifyEpochStakesError {
432    #[error("epoch {0} is greater than the max {1}")]
433    EpochGreaterThanMax(Epoch, Epoch),
434
435    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
436    StakesNotFound(Epoch, RangeInclusive<Epoch>),
437}
438
439/// Errors that can happen in `add_bank_snapshot()`
440#[derive(Error, Debug)]
441pub enum AddBankSnapshotError {
442    #[error("bank snapshot dir already exists '{0}'")]
443    SnapshotDirAlreadyExists(PathBuf),
444
445    #[error("failed to create snapshot dir '{1}': {0}")]
446    CreateSnapshotDir(#[source] IoError, PathBuf),
447
448    #[error("failed to flush storage '{1}': {0}")]
449    FlushStorage(#[source] AccountsFileError, PathBuf),
450
451    #[error("failed to mark snapshot storages as 'flushed': {0}")]
452    MarkStoragesFlushed(#[source] IoError),
453
454    #[error("failed to hard link storages: {0}")]
455    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),
456
457    #[error("failed to serialize bank: {0}")]
458    SerializeBank(#[source] Box<SnapshotError>),
459
460    #[error("failed to serialize status cache: {0}")]
461    SerializeStatusCache(#[source] Box<SnapshotError>),
462
463    #[error("failed to write snapshot version file '{1}': {0}")]
464    WriteSnapshotVersionFile(#[source] IoError, PathBuf),
465
466    #[error("failed to mark snapshot as 'complete': {0}")]
467    MarkSnapshotComplete(#[source] IoError),
468}
469
470/// Errors that can happen in `archive_snapshot_package()`
471#[derive(Error, Debug)]
472pub enum ArchiveSnapshotPackageError {
473    #[error("failed to create archive path '{1}': {0}")]
474    CreateArchiveDir(#[source] IoError, PathBuf),
475
476    #[error("failed to create staging dir inside '{1}': {0}")]
477    CreateStagingDir(#[source] IoError, PathBuf),
478
479    #[error("failed to create snapshot staging dir '{1}': {0}")]
480    CreateSnapshotStagingDir(#[source] IoError, PathBuf),
481
482    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
483    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),
484
485    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
486    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),
487
488    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
489    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),
490
491    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
492    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),
493
494    #[error("failed to create archive file '{1}': {0}")]
495    CreateArchiveFile(#[source] IoError, PathBuf),
496
497    #[error("failed to archive version file: {0}")]
498    ArchiveVersionFile(#[source] IoError),
499
500    #[error("failed to archive snapshots dir: {0}")]
501    ArchiveSnapshotsDir(#[source] IoError),
502
503    #[error("failed to archive account storage file '{1}': {0}")]
504    ArchiveAccountStorageFile(#[source] IoError, PathBuf),
505
506    #[error("failed to archive snapshot: {0}")]
507    FinishArchive(#[source] IoError),
508
509    #[error("failed to create encoder: {0}")]
510    CreateEncoder(#[source] IoError),
511
512    #[error("failed to encode archive: {0}")]
513    FinishEncoder(#[source] IoError),
514
515    #[error("failed to query archive metadata '{1}': {0}")]
516    QueryArchiveMetadata(#[source] IoError, PathBuf),
517
518    #[error("failed to move archive from '{1}' to '{2}': {0}")]
519    MoveArchive(#[source] IoError, PathBuf, PathBuf),
520
521    #[error("failed to create account storage reader '{1}': {0}")]
522    AccountStorageReaderError(#[source] IoError, PathBuf),
523}
524
525/// Errors that can happen in `hard_link_storages_to_snapshot()`
526#[derive(Error, Debug)]
527pub enum HardLinkStoragesToSnapshotError {
528    #[error("failed to create accounts hard links dir '{1}': {0}")]
529    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),
530
531    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
532    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),
533
534    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
535    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
536}
537
538/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
539#[derive(Error, Debug)]
540pub enum GetSnapshotAccountsHardLinkDirError {
541    #[error("invalid account storage path '{0}'")]
542    GetAccountPath(PathBuf),
543
544    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
545    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),
546
547    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
548    SymlinkSnapshotHardLinkDir {
549        source: IoError,
550        original: PathBuf,
551        link: PathBuf,
552    },
553}
554
555/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
556/// from <account_path>/run taken at snapshot <slot> time.  They are referenced by the symlinks from the
557/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We observed that sometimes the bank snapshot dir
558/// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
559/// or some legacy code not using the symlinks to clean up the account snapshot hardlink directories.
560/// This function cleans up any account snapshot directories that are no longer referenced by the bank
561/// snapshot dirs, to ensure proper snapshot operations.
562pub fn clean_orphaned_account_snapshot_dirs(
563    bank_snapshots_dir: impl AsRef<Path>,
564    account_snapshot_paths: &[PathBuf],
565) -> IoResult<()> {
566    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
567    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
568    let mut account_snapshot_dirs_referenced = HashSet::new();
569    let snapshots = get_bank_snapshots(bank_snapshots_dir);
570    for snapshot in snapshots {
571        let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
572        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
573        let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
574            // The bank snapshot may not have a hard links dir with the storages.
575            // This is fine, and happens for bank snapshots we do *not* fastboot from.
576            // In this case, log it and go to the next bank snapshot.
577            debug!(
578                "failed to read account hardlinks dir '{}'",
579                account_hardlinks_dir.display(),
580            );
581            continue;
582        };
583        for entry in read_dir {
584            let path = entry?.path();
585            let target = fs::read_link(&path).map_err(|err| {
586                IoError::other(format!(
587                    "failed to read symlink '{}': {err}",
588                    path.display(),
589                ))
590            })?;
591            account_snapshot_dirs_referenced.insert(target);
592        }
593    }
594
595    // loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
596    for account_snapshot_path in account_snapshot_paths {
597        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
598            IoError::other(format!(
599                "failed to read account snapshot dir '{}': {err}",
600                account_snapshot_path.display(),
601            ))
602        })?;
603        for entry in read_dir {
604            let path = entry?.path();
605            if !account_snapshot_dirs_referenced.contains(&path) {
606                info!(
607                    "Removing orphaned account snapshot hardlink directory '{}'...",
608                    path.display()
609                );
610                move_and_async_delete_path(&path);
611            }
612        }
613    }
614
615    Ok(())
616}
617
618/// Purges incomplete bank snapshots
619pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
620    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
621        // If we cannot read the bank snapshots dir, then there's nothing to do
622        return;
623    };
624
625    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
626
627    let incomplete_dirs: Vec<_> = read_dir_iter
628        .filter_map(|entry| entry.ok())
629        .map(|entry| entry.path())
630        .filter(|path| path.is_dir())
631        .filter(is_incomplete)
632        .collect();
633
634    // attempt to purge all the incomplete directories; do not exit early
635    for incomplete_dir in incomplete_dirs {
636        let result = purge_bank_snapshot(&incomplete_dir);
637        match result {
638            Ok(_) => info!(
639                "Purged incomplete snapshot dir: {}",
640                incomplete_dir.display()
641            ),
642            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
643        }
644    }
645}
646
647/// Is the bank snapshot complete?
648fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
649    let state_complete_path = bank_snapshot_dir
650        .as_ref()
651        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
652    state_complete_path.is_file()
653}
654
655/// Marks the bank snapshot as complete
656fn write_snapshot_state_complete_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<()> {
657    let state_complete_path = bank_snapshot_dir
658        .as_ref()
659        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
660    fs::File::create(&state_complete_path).map_err(|err| {
661        IoError::other(format!(
662            "failed to create file '{}': {err}",
663            state_complete_path.display(),
664        ))
665    })?;
666    Ok(())
667}
668
669/// Writes the full snapshot slot file into the bank snapshot dir
670pub fn write_full_snapshot_slot_file(
671    bank_snapshot_dir: impl AsRef<Path>,
672    full_snapshot_slot: Slot,
673) -> IoResult<()> {
674    let full_snapshot_slot_path = bank_snapshot_dir
675        .as_ref()
676        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
677    fs::write(
678        &full_snapshot_slot_path,
679        Slot::to_le_bytes(full_snapshot_slot),
680    )
681    .map_err(|err| {
682        IoError::other(format!(
683            "failed to write full snapshot slot file '{}': {err}",
684            full_snapshot_slot_path.display(),
685        ))
686    })
687}
688
689// Reads the full snapshot slot file from the bank snapshot dir
690pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
691    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
692    let full_snapshot_slot_path = bank_snapshot_dir
693        .as_ref()
694        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
695    let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
696    if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
697        let error_message = format!(
698            "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
699            full_snapshot_slot_path.display(),
700            full_snapshot_slot_file_metadata.len(),
701            SLOT_SIZE,
702        );
703        return Err(IoError::other(error_message));
704    }
705    let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
706    let mut buffer = [0; SLOT_SIZE];
707    full_snapshot_slot_file.read_exact(&mut buffer)?;
708    let slot = Slot::from_le_bytes(buffer);
709    Ok(slot)
710}
711
712/// Writes the 'snapshot storages have been flushed' file to the bank snapshot dir
713pub fn write_storages_flushed_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<()> {
714    let flushed_storages_path = bank_snapshot_dir
715        .as_ref()
716        .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
717    fs::File::create(&flushed_storages_path).map_err(|err| {
718        IoError::other(format!(
719            "failed to create file '{}': {err}",
720            flushed_storages_path.display(),
721        ))
722    })?;
723    Ok(())
724}
725
726/// Were the snapshot storages flushed in this bank snapshot?
727fn are_bank_snapshot_storages_flushed(bank_snapshot_dir: impl AsRef<Path>) -> bool {
728    let flushed_storages = bank_snapshot_dir
729        .as_ref()
730        .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
731    flushed_storages.is_file()
732}
733
734/// Gets the highest, loadable, bank snapshot
735///
736/// The highest bank snapshot is the one with the highest slot.
737/// To be loadable, the bank snapshot must be a BankSnapshotKind::Post.
738/// And if we're generating snapshots (e.g. running a normal validator), then
739/// the full snapshot file's slot must match the highest full snapshot archive's.
740/// Lastly, the account storages must have been flushed to be loadable.
741pub fn get_highest_loadable_bank_snapshot(
742    snapshot_config: &SnapshotConfig,
743) -> Option<BankSnapshotInfo> {
744    let highest_bank_snapshot =
745        get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
746
747    // If we're *not* generating snapshots, e.g. running ledger-tool, then we *can* load
748    // this bank snapshot, and we do not need to check for anything else.
749    if !snapshot_config.should_generate_snapshots() {
750        return Some(highest_bank_snapshot);
751    }
752
753    // Otherwise, the bank snapshot's full snapshot slot *must* be the same as
754    // the highest full snapshot archive's slot.
755    let highest_full_snapshot_archive_slot =
756        get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
757    let full_snapshot_file_slot =
758        read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
759    let are_storages_flushed =
760        are_bank_snapshot_storages_flushed(&highest_bank_snapshot.snapshot_dir);
761    (are_storages_flushed && (full_snapshot_file_slot == highest_full_snapshot_archive_slot))
762        .then_some(highest_bank_snapshot)
763}
764
765/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
766/// directory won't be cleaned up.  Call this function to clean them up.
767pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
768    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
769        for entry in entries.flatten() {
770            if entry
771                .file_name()
772                .to_str()
773                .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
774                .unwrap_or(false)
775            {
776                let path = entry.path();
777                let result = if path.is_dir() {
778                    fs::remove_dir_all(&path)
779                } else {
780                    fs::remove_file(&path)
781                };
782                if let Err(err) = result {
783                    warn!(
784                        "Failed to remove temporary snapshot archive '{}': {err}",
785                        path.display(),
786                    );
787                }
788            }
789        }
790    }
791}
792
793/// Serializes and archives a snapshot package
794pub fn serialize_and_archive_snapshot_package(
795    snapshot_package: SnapshotPackage,
796    snapshot_config: &SnapshotConfig,
797    should_flush_and_hard_link_storages: bool,
798) -> Result<SnapshotArchiveInfo> {
799    let SnapshotPackage {
800        snapshot_kind,
801        slot: snapshot_slot,
802        block_height,
803        hash: snapshot_hash,
804        mut snapshot_storages,
805        status_cache_slot_deltas,
806        bank_fields_to_serialize,
807        bank_hash_stats,
808        accounts_delta_hash,
809        accounts_hash,
810        epoch_accounts_hash,
811        bank_incremental_snapshot_persistence,
812        write_version,
813        enqueued: _,
814    } = snapshot_package;
815
816    let bank_snapshot_info = serialize_snapshot(
817        &snapshot_config.bank_snapshots_dir,
818        snapshot_config.snapshot_version,
819        snapshot_storages.as_slice(),
820        status_cache_slot_deltas.as_slice(),
821        bank_fields_to_serialize,
822        bank_hash_stats,
823        accounts_delta_hash,
824        accounts_hash,
825        epoch_accounts_hash,
826        bank_incremental_snapshot_persistence.as_ref(),
827        write_version,
828        should_flush_and_hard_link_storages,
829    )?;
830
831    // now write the full snapshot slot file after serializing so this bank snapshot is loadable
832    let full_snapshot_archive_slot = match snapshot_kind {
833        SnapshotKind::FullSnapshot => snapshot_slot,
834        SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
835    };
836    write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
837        .map_err(|err| {
838            IoError::other(format!(
839                "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
840            ))
841        })?;
842
843    let snapshot_archive_path = match snapshot_package.snapshot_kind {
844        SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
845            &snapshot_config.full_snapshot_archives_dir,
846            snapshot_package.slot,
847            &snapshot_package.hash,
848            snapshot_config.archive_format,
849        ),
850        SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
851            // After the snapshot has been serialized, it is now safe (and required) to prune all
852            // the storages that are *not* to be archived for this incremental snapshot.
853            snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
854            build_incremental_snapshot_archive_path(
855                &snapshot_config.incremental_snapshot_archives_dir,
856                incremental_snapshot_base_slot,
857                snapshot_package.slot,
858                &snapshot_package.hash,
859                snapshot_config.archive_format,
860            )
861        }
862    };
863
864    let snapshot_archive_info = archive_snapshot(
865        snapshot_kind,
866        snapshot_slot,
867        snapshot_hash,
868        snapshot_storages.as_slice(),
869        &bank_snapshot_info.snapshot_dir,
870        snapshot_archive_path,
871        snapshot_config.archive_format,
872    )?;
873
874    Ok(snapshot_archive_info)
875}
876
877/// Serializes a snapshot into `bank_snapshots_dir`
878#[allow(clippy::too_many_arguments)]
879fn serialize_snapshot(
880    bank_snapshots_dir: impl AsRef<Path>,
881    snapshot_version: SnapshotVersion,
882    snapshot_storages: &[Arc<AccountStorageEntry>],
883    slot_deltas: &[BankSlotDelta],
884    mut bank_fields: BankFieldsToSerialize,
885    bank_hash_stats: BankHashStats,
886    accounts_delta_hash: AccountsDeltaHash,
887    accounts_hash: AccountsHash,
888    epoch_accounts_hash: Option<EpochAccountsHash>,
889    bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
890    write_version: u64,
891    should_flush_and_hard_link_storages: bool,
892) -> Result<BankSnapshotInfo> {
893    let slot = bank_fields.slot;
894
895    // this lambda function is to facilitate converting between
896    // the AddBankSnapshotError and SnapshotError types
897    let do_serialize_snapshot = || {
898        let mut measure_everything = Measure::start("");
899        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
900        if bank_snapshot_dir.exists() {
901            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
902                bank_snapshot_dir,
903            ));
904        }
905        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
906            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
907        })?;
908
909        // the bank snapshot is stored as bank_snapshots_dir/slot/slot
910        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
911        info!(
912            "Creating bank snapshot for slot {slot} at '{}'",
913            bank_snapshot_path.display(),
914        );
915
916        let (flush_storages_us, hard_link_storages_us) = if should_flush_and_hard_link_storages {
917            let flush_measure = Measure::start("");
918            for storage in snapshot_storages {
919                storage.flush().map_err(|err| {
920                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
921                })?;
922            }
923            let flush_us = flush_measure.end_as_us();
924            let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
925                &bank_snapshot_dir,
926                slot,
927                snapshot_storages
928            )
929            .map_err(AddBankSnapshotError::HardLinkStorages)?);
930            write_storages_flushed_file(&bank_snapshot_dir)
931                .map_err(AddBankSnapshotError::MarkStoragesFlushed)?;
932            Some((flush_us, hard_link_us))
933        } else {
934            None
935        }
936        .unzip();
937
938        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
939            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
940            let extra_fields = ExtraFieldsToSerialize {
941                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
942                incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
943                epoch_accounts_hash,
944                versioned_epoch_stakes,
945                accounts_lt_hash: bank_fields.accounts_lt_hash.clone().map(Into::into),
946            };
947            serde_snapshot::serialize_bank_snapshot_into(
948                stream,
949                bank_fields,
950                bank_hash_stats,
951                accounts_delta_hash,
952                accounts_hash,
953                &get_storages_to_serialize(snapshot_storages),
954                extra_fields,
955                write_version,
956            )?;
957            Ok(())
958        };
959        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
960            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
961                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
962            "bank serialize"
963        );
964
965        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
966        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
967            snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
968                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
969        );
970
971        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
972        let (_, write_version_file_us) = measure_us!(fs::write(
973            &version_path,
974            snapshot_version.as_str().as_bytes(),
975        )
976        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
977
978        // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
979        let (_, write_state_complete_file_us) = measure_us!({
980            write_snapshot_state_complete_file(&bank_snapshot_dir)
981                .map_err(AddBankSnapshotError::MarkSnapshotComplete)?
982        });
983
984        measure_everything.stop();
985
986        // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
987        datapoint_info!(
988            "snapshot_bank",
989            ("slot", slot, i64),
990            ("bank_size", bank_snapshot_consumed_size, i64),
991            ("status_cache_size", status_cache_consumed_size, i64),
992            ("flush_storages_us", flush_storages_us, Option<i64>),
993            ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
994            ("bank_serialize_us", bank_serialize.as_us(), i64),
995            ("status_cache_serialize_us", status_cache_serialize_us, i64),
996            ("write_version_file_us", write_version_file_us, i64),
997            (
998                "write_state_complete_file_us",
999                write_state_complete_file_us,
1000                i64
1001            ),
1002            ("total_us", measure_everything.as_us(), i64),
1003        );
1004
1005        info!(
1006            "{} for slot {} at {}",
1007            bank_serialize,
1008            slot,
1009            bank_snapshot_path.display(),
1010        );
1011
1012        Ok(BankSnapshotInfo {
1013            slot,
1014            snapshot_kind: BankSnapshotKind::Pre,
1015            snapshot_dir: bank_snapshot_dir,
1016            snapshot_version,
1017        })
1018    };
1019
1020    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
1021}
1022
/// Archives a snapshot into `archive_path`
///
/// The archive is assembled as follows:
/// 1. The parent directory of `archive_path` is created if needed, and a temporary staging
///    directory (named with `TMP_SNAPSHOT_ARCHIVE_PREFIX` and the slot) is created inside it.
/// 2. The bank snapshot file, the status cache file, and the version file from
///    `bank_snapshot_dir` are symlinked into the staging directory.
/// 3. A tar stream is written to a staging archive file next to the final archive: first the
///    version file, then the staged snapshots directory, then every storage in
///    `snapshot_storages` under `accounts/`.  The tar stream is compressed per `archive_format`.
/// 4. The staging archive file is renamed to `archive_path`.
///
/// Returns the `SnapshotArchiveInfo` for the new archive.
///
/// # Panics
///
/// Panics if `archive_path` has no parent directory, or if `archive_format` is anything other
/// than `TarZstd` or `TarLz4`.
fn archive_snapshot(
    snapshot_kind: SnapshotKind,
    snapshot_slot: Slot,
    snapshot_hash: SnapshotHash,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    bank_snapshot_dir: impl AsRef<Path>,
    archive_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
) -> Result<SnapshotArchiveInfo> {
    use ArchiveSnapshotPackageError as E;
    // Names of the top-level directories inside the archive
    const SNAPSHOTS_DIR: &str = "snapshots";
    const ACCOUNTS_DIR: &str = "accounts";
    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    // Staging happens in the same directory as the final archive
    let tar_dir = archive_path
        .as_ref()
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

    // Create the staging directories
    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
    let staging_dir = tempfile::Builder::new()
        .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
        .tempdir_in(tar_dir)
        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);

    let slot_str = snapshot_slot.to_string();
    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
    // Creates staging snapshots/<slot>/
    fs::create_dir_all(&staging_snapshot_dir)
        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

    // To be a source for symlinking and archiving, the path need to be an absolute path
    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
    })?;
    // The bank snapshot file is named after the slot, both in the source and in staging
    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
    let src_snapshot_file = src_snapshot_dir.join(slot_str);
    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
    // like in the snapshot dir.
    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    symlink::symlink_file(&src_status_cache, &staging_status_cache)
        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

    // The bank snapshot has the version file, so symlink it to the correct staging path
    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
    })?;

    // Tar the staging directory into the archive at `staging_archive_path`
    let staging_archive_path = tar_dir.join(format!(
        "{}{}.{}",
        staging_dir_prefix,
        snapshot_slot,
        archive_format.extension(),
    ));

    {
        let archive_file = fs::File::create(&staging_archive_path)
            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;

        // Writes the whole tar stream into `encoder`; shared by all compression formats below
        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
            let mut archive = tar::Builder::new(encoder);
            // Disable sparse file handling.  This seems to be the root cause of an issue when
            // upgrading v2.0 to v2.1, and the tar crate from 0.4.41 to 0.4.42.
            // Since the tarball will still go through compression (zstd/etc) afterwards, disabling
            // sparse handling in the tar itself should be fine.
            //
            // Likely introduced in [^1].  Tracking resolution in [^2].
            // [^1] https://github.com/alexcrichton/tar-rs/pull/375
            // [^2] https://github.com/alexcrichton/tar-rs/issues/403
            archive.sparse(false);
            // Serialize the version and snapshots files before accounts so we can quickly determine the version
            // and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
            archive
                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
                .map_err(E::ArchiveVersionFile)?;
            archive
                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
                .map_err(E::ArchiveSnapshotsDir)?;

            for storage in snapshot_storages {
                // Each storage is archived as accounts/<file name derived from its slot and id>
                let path_in_archive = Path::new(ACCOUNTS_DIR)
                    .join(AccountsFile::file_name(storage.slot(), storage.id()));

                let reader =
                    AccountStorageReader::new(storage, Some(snapshot_slot)).map_err(|err| {
                        E::AccountStorageReaderError(err, storage.path().to_path_buf())
                    })?;
                // The storage contents come from `reader` rather than a file path, so the tar
                // header is built by hand with the size reported by the reader
                let mut header = tar::Header::new_gnu();
                header.set_path(path_in_archive).map_err(|err| {
                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                })?;
                header.set_size(reader.len() as u64);
                header.set_cksum();
                archive.append(&header, reader).map_err(|err| {
                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                })?;
            }

            archive.into_inner().map_err(E::FinishArchive)?;
            Ok(())
        };

        // Wrap the archive file in the encoder for the requested format, write the tar stream
        // through it, then finish the encoder
        match archive_format {
            ArchiveFormat::TarZstd { config } => {
                let mut encoder =
                    zstd::stream::Encoder::new(archive_file, config.compression_level)
                        .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarLz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(1)
                    .build(archive_file)
                    .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                let (_output, result) = encoder.finish();
                result.map_err(E::FinishEncoder)?;
            }
            _ => panic!("archiving snapshot with '{archive_format}' is not supported"),
        };
    }

    // Atomically move the archive into position for other validators to find
    let metadata = fs::metadata(&staging_archive_path)
        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
    let archive_path = archive_path.as_ref().to_path_buf();
    fs::rename(&staging_archive_path, &archive_path)
        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;

    timer.stop();
    info!(
        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
        archive_path.display(),
        snapshot_slot,
        timer.as_ms(),
        metadata.len()
    );

    // The archive size is reported under a different field name for full vs incremental snapshots
    datapoint_info!(
        "archive-snapshot-package",
        ("slot", snapshot_slot, i64),
        ("archive_format", archive_format.to_string(), String),
        ("duration_ms", timer.as_ms(), i64),
        (
            if snapshot_kind.is_full_snapshot() {
                "full-snapshot-archive-size"
            } else {
                "incremental-snapshot-archive-size"
            },
            metadata.len(),
            i64
        ),
    );
    Ok(SnapshotArchiveInfo {
        path: archive_path,
        slot: snapshot_slot,
        hash: snapshot_hash,
        archive_format,
    })
}
1197
1198/// Get the bank snapshots in a directory
1199pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1200    let mut bank_snapshots = Vec::default();
1201    match fs::read_dir(&bank_snapshots_dir) {
1202        Err(err) => {
1203            info!(
1204                "Unable to read bank snapshots directory '{}': {err}",
1205                bank_snapshots_dir.as_ref().display(),
1206            );
1207        }
1208        Ok(paths) => paths
1209            .filter_map(|entry| {
1210                // check if this entry is a directory and only a Slot
1211                // bank snapshots are bank_snapshots_dir/slot/slot(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION)
1212                entry
1213                    .ok()
1214                    .filter(|entry| entry.path().is_dir())
1215                    .and_then(|entry| {
1216                        entry
1217                            .path()
1218                            .file_name()
1219                            .and_then(|file_name| file_name.to_str())
1220                            .and_then(|file_name| file_name.parse::<Slot>().ok())
1221                    })
1222            })
1223            .for_each(
1224                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1225                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1226                    // Other threads may be modifying bank snapshots in parallel; only return
1227                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
1228                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1229                },
1230            ),
1231    }
1232    bank_snapshots
1233}
1234
1235/// Get the bank snapshots in a directory
1236///
1237/// This function retains only the bank snapshots of kind BankSnapshotKind::Pre
1238pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1239    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1240    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1241    bank_snapshots
1242}
1243
1244/// Get the bank snapshots in a directory
1245///
1246/// This function retains only the bank snapshots of kind BankSnapshotKind::Post
1247pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1248    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1249    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1250    bank_snapshots
1251}
1252
1253/// Get the bank snapshot with the highest slot in a directory
1254///
1255/// This function gets the highest bank snapshot of kind BankSnapshotKind::Pre
1256pub fn get_highest_bank_snapshot_pre(
1257    bank_snapshots_dir: impl AsRef<Path>,
1258) -> Option<BankSnapshotInfo> {
1259    do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1260}
1261
1262/// Get the bank snapshot with the highest slot in a directory
1263///
1264/// This function gets the highest bank snapshot of kind BankSnapshotKind::Post
1265pub fn get_highest_bank_snapshot_post(
1266    bank_snapshots_dir: impl AsRef<Path>,
1267) -> Option<BankSnapshotInfo> {
1268    do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1269}
1270
1271/// Get the bank snapshot with the highest slot in a directory
1272///
1273/// This function gets the highest bank snapshot of any kind
1274pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1275    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1276}
1277
1278fn do_get_highest_bank_snapshot(
1279    mut bank_snapshots: Vec<BankSnapshotInfo>,
1280) -> Option<BankSnapshotInfo> {
1281    bank_snapshots.sort_unstable();
1282    bank_snapshots.into_iter().next_back()
1283}
1284
1285pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1286where
1287    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1288{
1289    serialize_snapshot_data_file_capped::<F>(
1290        data_file_path,
1291        MAX_SNAPSHOT_DATA_FILE_SIZE,
1292        serializer,
1293    )
1294}
1295
1296pub fn deserialize_snapshot_data_file<T: Sized>(
1297    data_file_path: &Path,
1298    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1299) -> Result<T> {
1300    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1301        deserializer(streams.full_snapshot_stream)
1302    };
1303
1304    let wrapped_data_file_path = SnapshotRootPaths {
1305        full_snapshot_root_file_path: data_file_path.to_path_buf(),
1306        incremental_snapshot_root_file_path: None,
1307    };
1308
1309    deserialize_snapshot_data_files_capped(
1310        &wrapped_data_file_path,
1311        MAX_SNAPSHOT_DATA_FILE_SIZE,
1312        wrapped_deserializer,
1313    )
1314}
1315
1316pub fn deserialize_snapshot_data_files<T: Sized>(
1317    snapshot_root_paths: &SnapshotRootPaths,
1318    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1319) -> Result<T> {
1320    deserialize_snapshot_data_files_capped(
1321        snapshot_root_paths,
1322        MAX_SNAPSHOT_DATA_FILE_SIZE,
1323        deserializer,
1324    )
1325}
1326
1327fn serialize_snapshot_data_file_capped<F>(
1328    data_file_path: &Path,
1329    maximum_file_size: u64,
1330    serializer: F,
1331) -> Result<u64>
1332where
1333    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1334{
1335    let data_file = fs::File::create(data_file_path)?;
1336    let mut data_file_stream = BufWriter::new(data_file);
1337    serializer(&mut data_file_stream)?;
1338    data_file_stream.flush()?;
1339
1340    let consumed_size = data_file_stream.stream_position()?;
1341    if consumed_size > maximum_file_size {
1342        let error_message = format!(
1343            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1344            data_file_path.display(),
1345        );
1346        return Err(IoError::other(error_message).into());
1347    }
1348    Ok(consumed_size)
1349}
1350
1351fn deserialize_snapshot_data_files_capped<T: Sized>(
1352    snapshot_root_paths: &SnapshotRootPaths,
1353    maximum_file_size: u64,
1354    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1355) -> Result<T> {
1356    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1357        create_snapshot_data_file_stream(
1358            &snapshot_root_paths.full_snapshot_root_file_path,
1359            maximum_file_size,
1360        )?;
1361
1362    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1363        if let Some(ref incremental_snapshot_root_file_path) =
1364            snapshot_root_paths.incremental_snapshot_root_file_path
1365        {
1366            Some(create_snapshot_data_file_stream(
1367                incremental_snapshot_root_file_path,
1368                maximum_file_size,
1369            )?)
1370        } else {
1371            None
1372        }
1373        .unzip();
1374
1375    let mut snapshot_streams = SnapshotStreams {
1376        full_snapshot_stream: &mut full_snapshot_data_file_stream,
1377        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1378    };
1379    let ret = deserializer(&mut snapshot_streams)?;
1380
1381    check_deserialize_file_consumed(
1382        full_snapshot_file_size,
1383        &snapshot_root_paths.full_snapshot_root_file_path,
1384        &mut full_snapshot_data_file_stream,
1385    )?;
1386
1387    if let Some(ref incremental_snapshot_root_file_path) =
1388        snapshot_root_paths.incremental_snapshot_root_file_path
1389    {
1390        check_deserialize_file_consumed(
1391            incremental_snapshot_file_size.unwrap(),
1392            incremental_snapshot_root_file_path,
1393            incremental_snapshot_data_file_stream.as_mut().unwrap(),
1394        )?;
1395    }
1396
1397    Ok(ret)
1398}
1399
1400/// Before running the deserializer function, perform common operations on the snapshot archive
1401/// files, such as checking the file size and opening the file into a stream.
1402fn create_snapshot_data_file_stream(
1403    snapshot_root_file_path: impl AsRef<Path>,
1404    maximum_file_size: u64,
1405) -> Result<(u64, BufReader<std::fs::File>)> {
1406    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1407
1408    if snapshot_file_size > maximum_file_size {
1409        let error_message = format!(
1410            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1411            snapshot_root_file_path.as_ref().display(),
1412            snapshot_file_size,
1413            maximum_file_size,
1414        );
1415        return Err(IoError::other(error_message).into());
1416    }
1417
1418    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1419    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1420
1421    Ok((snapshot_file_size, snapshot_data_file_stream))
1422}
1423
1424/// After running the deserializer function, perform common checks to ensure the snapshot archive
1425/// files were consumed correctly.
1426fn check_deserialize_file_consumed(
1427    file_size: u64,
1428    file_path: impl AsRef<Path>,
1429    file_stream: &mut BufReader<std::fs::File>,
1430) -> Result<()> {
1431    let consumed_size = file_stream.stream_position()?;
1432
1433    if consumed_size != file_size {
1434        let error_message = format!(
1435            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1436            file_path.as_ref().display(),
1437            file_size,
1438            consumed_size,
1439        );
1440        return Err(IoError::other(error_message).into());
1441    }
1442
1443    Ok(())
1444}
1445
1446/// Return account path from the appendvec path after checking its format.
1447fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1448    let run_path = appendvec_path.parent()?;
1449    let run_file_name = run_path.file_name()?;
1450    // All appendvec files should be under <account_path>/run/.
1451    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1452    if run_file_name != ACCOUNTS_RUN_DIR {
1453        error!(
1454            "The account path {} does not have run/ as its immediate parent directory.",
1455            run_path.display()
1456        );
1457        return None;
1458    }
1459    let account_path = run_path.parent()?;
1460    Some(account_path.to_path_buf())
1461}
1462
1463/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
1464/// directory does not exist, create it.
1465fn get_snapshot_accounts_hardlink_dir(
1466    appendvec_path: &Path,
1467    bank_slot: Slot,
1468    account_paths: &mut HashSet<PathBuf>,
1469    hardlinks_dir: impl AsRef<Path>,
1470) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1471    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1472        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1473    })?;
1474
1475    let snapshot_hardlink_dir = account_path
1476        .join(ACCOUNTS_SNAPSHOT_DIR)
1477        .join(bank_slot.to_string());
1478
1479    // Use the hashset to track, to avoid checking the file system.  Only set up the hardlink directory
1480    // and the symlink to it at the first time of seeing the account_path.
1481    if !account_paths.contains(&account_path) {
1482        let idx = account_paths.len();
1483        debug!(
1484            "for appendvec_path {}, create hard-link path {}",
1485            appendvec_path.display(),
1486            snapshot_hardlink_dir.display()
1487        );
1488        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1489            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1490                err,
1491                snapshot_hardlink_dir.clone(),
1492            )
1493        })?;
1494        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1495        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1496            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1497                source: err,
1498                original: snapshot_hardlink_dir.clone(),
1499                link: symlink_path,
1500            }
1501        })?;
1502        account_paths.insert(account_path);
1503    };
1504
1505    Ok(snapshot_hardlink_dir)
1506}
1507
1508/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1509/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1510/// in the file names are also updated in case its file is a recycled one with inconsistent slot
1511/// and id.
1512pub fn hard_link_storages_to_snapshot(
1513    bank_snapshot_dir: impl AsRef<Path>,
1514    bank_slot: Slot,
1515    snapshot_storages: &[Arc<AccountStorageEntry>],
1516) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1517    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1518    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1519        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1520            err,
1521            accounts_hardlinks_dir.clone(),
1522        )
1523    })?;
1524
1525    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1526    for storage in snapshot_storages {
1527        let storage_path = storage.accounts.path();
1528        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1529            storage_path,
1530            bank_slot,
1531            &mut account_paths,
1532            &accounts_hardlinks_dir,
1533        )?;
1534        // The appendvec could be recycled, so its filename may not be consistent to the slot and id.
1535        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1536        let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1537        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1538        fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1539            HardLinkStoragesToSnapshotError::HardLinkStorage(
1540                err,
1541                storage_path.to_path_buf(),
1542                hard_link_path,
1543            )
1544        })?;
1545    }
1546    Ok(())
1547}
1548
1549/// serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but data structure at runtime is Vec<Arc<AccountStorageEntry>>
1550/// translates to what we need
1551pub(crate) fn get_storages_to_serialize(
1552    snapshot_storages: &[Arc<AccountStorageEntry>],
1553) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1554    snapshot_storages
1555        .iter()
1556        .map(|storage| vec![Arc::clone(storage)])
1557        .collect::<Vec<_>>()
1558}
1559
1560// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
1561const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1562
1563/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
1564pub fn verify_and_unarchive_snapshots(
1565    bank_snapshots_dir: impl AsRef<Path>,
1566    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1567    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1568    account_paths: &[PathBuf],
1569    storage_access: StorageAccess,
1570) -> Result<(
1571    UnarchivedSnapshot,
1572    Option<UnarchivedSnapshot>,
1573    AtomicAccountsFileId,
1574)> {
1575    check_are_snapshots_compatible(
1576        full_snapshot_archive_info,
1577        incremental_snapshot_archive_info,
1578    )?;
1579
1580    let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1581
1582    let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1583    let unarchived_full_snapshot = unarchive_snapshot(
1584        &bank_snapshots_dir,
1585        TMP_SNAPSHOT_ARCHIVE_PREFIX,
1586        full_snapshot_archive_info.path(),
1587        "snapshot untar",
1588        account_paths,
1589        full_snapshot_archive_info.archive_format(),
1590        parallel_divisions,
1591        next_append_vec_id.clone(),
1592        storage_access,
1593    )?;
1594
1595    let unarchived_incremental_snapshot =
1596        if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1597            let unarchived_incremental_snapshot = unarchive_snapshot(
1598                &bank_snapshots_dir,
1599                TMP_SNAPSHOT_ARCHIVE_PREFIX,
1600                incremental_snapshot_archive_info.path(),
1601                "incremental snapshot untar",
1602                account_paths,
1603                incremental_snapshot_archive_info.archive_format(),
1604                parallel_divisions,
1605                next_append_vec_id.clone(),
1606                storage_access,
1607            )?;
1608            Some(unarchived_incremental_snapshot)
1609        } else {
1610            None
1611        };
1612
1613    Ok((
1614        unarchived_full_snapshot,
1615        unarchived_incremental_snapshot,
1616        Arc::try_unwrap(next_append_vec_id).unwrap(),
1617    ))
1618}
1619
1620/// Spawns a thread for unpacking a snapshot
1621fn spawn_unpack_snapshot_thread(
1622    file_sender: Sender<PathBuf>,
1623    account_paths: Arc<Vec<PathBuf>>,
1624    ledger_dir: Arc<PathBuf>,
1625    mut archive: Archive<SharedBufferReader>,
1626    parallel_selector: Option<ParallelSelector>,
1627    thread_index: usize,
1628) -> JoinHandle<()> {
1629    Builder::new()
1630        .name(format!("solUnpkSnpsht{thread_index:02}"))
1631        .spawn(move || {
1632            hardened_unpack::streaming_unpack_snapshot(
1633                &mut archive,
1634                ledger_dir.as_path(),
1635                &account_paths,
1636                parallel_selector,
1637                &file_sender,
1638            )
1639            .unwrap();
1640        })
1641        .unwrap()
1642}
1643
1644/// Streams unpacked files across channel
1645fn streaming_unarchive_snapshot(
1646    file_sender: Sender<PathBuf>,
1647    account_paths: Vec<PathBuf>,
1648    ledger_dir: PathBuf,
1649    snapshot_archive_path: PathBuf,
1650    archive_format: ArchiveFormat,
1651    num_threads: usize,
1652) -> Vec<JoinHandle<()>> {
1653    let account_paths = Arc::new(account_paths);
1654    let ledger_dir = Arc::new(ledger_dir);
1655    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1656
1657    // All shared buffer readers need to be created before the threads are spawned
1658    let archives: Vec<_> = (0..num_threads)
1659        .map(|_| {
1660            let reader = SharedBufferReader::new(&shared_buffer);
1661            Archive::new(reader)
1662        })
1663        .collect();
1664
1665    archives
1666        .into_iter()
1667        .enumerate()
1668        .map(|(thread_index, archive)| {
1669            let parallel_selector = Some(ParallelSelector {
1670                index: thread_index,
1671                divisions: num_threads,
1672            });
1673
1674            spawn_unpack_snapshot_thread(
1675                file_sender.clone(),
1676                account_paths.clone(),
1677                ledger_dir.clone(),
1678                archive,
1679                parallel_selector,
1680                thread_index,
1681            )
1682        })
1683        .collect()
1684}
1685
1686/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1687/// as a valid one.  A dir unpacked from an archive lacks these files.  Fill them here to
1688/// allow new_from_dir() checks to pass.  These checks are not needed for unpacked dirs,
1689/// but it is not clean to add another flag to new_from_dir() to skip them.
1690fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1691    let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1692    if !snapshots_dir.is_dir() {
1693        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1694    }
1695
1696    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1697    let slot_dir = std::fs::read_dir(&snapshots_dir)
1698        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1699        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1700        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1701        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1702        .path();
1703
1704    let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1705    fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1706
1707    let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1708    fs::hard_link(
1709        status_cache_file,
1710        slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1711    )?;
1712
1713    write_snapshot_state_complete_file(slot_dir)?;
1714
1715    Ok(())
1716}
1717
1718/// Perform the common tasks when unarchiving a snapshot.  Handles creating the temporary
1719/// directories, untaring, reading the version file, and then returning those fields plus the
1720/// rebuilt storage
1721fn unarchive_snapshot(
1722    bank_snapshots_dir: impl AsRef<Path>,
1723    unpacked_snapshots_dir_prefix: &'static str,
1724    snapshot_archive_path: impl AsRef<Path>,
1725    measure_name: &'static str,
1726    account_paths: &[PathBuf],
1727    archive_format: ArchiveFormat,
1728    parallel_divisions: usize,
1729    next_append_vec_id: Arc<AtomicAccountsFileId>,
1730    storage_access: StorageAccess,
1731) -> Result<UnarchivedSnapshot> {
1732    let unpack_dir = tempfile::Builder::new()
1733        .prefix(unpacked_snapshots_dir_prefix)
1734        .tempdir_in(bank_snapshots_dir)?;
1735    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1736
1737    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1738    streaming_unarchive_snapshot(
1739        file_sender,
1740        account_paths.to_vec(),
1741        unpack_dir.path().to_path_buf(),
1742        snapshot_archive_path.as_ref().to_path_buf(),
1743        archive_format,
1744        parallel_divisions,
1745    );
1746
1747    let num_rebuilder_threads = num_cpus::get_physical()
1748        .saturating_sub(parallel_divisions)
1749        .max(1);
1750    let (version_and_storages, measure_untar) = measure_time!(
1751        SnapshotStorageRebuilder::rebuild_storage(
1752            file_receiver,
1753            num_rebuilder_threads,
1754            next_append_vec_id,
1755            SnapshotFrom::Archive,
1756            storage_access,
1757        )?,
1758        measure_name
1759    );
1760    info!("{}", measure_untar);
1761
1762    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1763
1764    let RebuiltSnapshotStorage {
1765        snapshot_version,
1766        storage,
1767    } = version_and_storages;
1768    Ok(UnarchivedSnapshot {
1769        unpack_dir,
1770        storage,
1771        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1772            unpacked_snapshots_dir,
1773            snapshot_version,
1774        },
1775        measure_untar,
1776    })
1777}
1778
1779/// Streams snapshot dir files across channel
1780/// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case.
1781fn streaming_snapshot_dir_files(
1782    file_sender: Sender<PathBuf>,
1783    snapshot_file_path: impl Into<PathBuf>,
1784    snapshot_version_path: impl Into<PathBuf>,
1785    account_paths: &[PathBuf],
1786) -> Result<()> {
1787    file_sender.send(snapshot_file_path.into())?;
1788    file_sender.send(snapshot_version_path.into())?;
1789
1790    for account_path in account_paths {
1791        for file in fs::read_dir(account_path)? {
1792            file_sender.send(file?.path())?;
1793        }
1794    }
1795
1796    Ok(())
1797}
1798
1799/// Performs the common tasks when deserializing a snapshot
1800///
1801/// Handles reading the snapshot file and version file,
1802/// then returning those fields plus the rebuilt storages.
1803pub fn rebuild_storages_from_snapshot_dir(
1804    snapshot_info: &BankSnapshotInfo,
1805    account_paths: &[PathBuf],
1806    next_append_vec_id: Arc<AtomicAccountsFileId>,
1807    storage_access: StorageAccess,
1808) -> Result<AccountStorageMap> {
1809    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1810    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1811    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1812
1813    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1814        IoError::other(format!(
1815            "failed to read accounts hardlinks dir '{}': {err}",
1816            accounts_hardlinks.display(),
1817        ))
1818    })?;
1819    for dir_entry in read_dir {
1820        let symlink_path = dir_entry?.path();
1821        // The symlink point to <account_path>/snapshot/<slot> which contain the account files hardlinks
1822        // The corresponding run path should be <account_path>/run/
1823        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1824            IoError::other(format!(
1825                "failed to read symlink '{}': {err}",
1826                symlink_path.display(),
1827            ))
1828        })?;
1829        let account_run_path = account_snapshot_path
1830            .parent()
1831            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1832            .parent()
1833            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1834            .join(ACCOUNTS_RUN_DIR);
1835        if !account_run_paths.contains(&account_run_path) {
1836            // The appendvec from the bank snapshot storage does not match any of the provided account_paths set.
1837            // The accout paths have changed so the snapshot is no longer usable.
1838            return Err(SnapshotError::AccountPathsMismatch);
1839        }
1840        // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
1841        // paths be in accounts/
1842        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1843            IoError::other(format!(
1844                "failed to read account snapshot dir '{}': {err}",
1845                account_snapshot_path.display(),
1846            ))
1847        })?;
1848        for file in read_dir {
1849            let file_path = file?.path();
1850            let file_name = file_path
1851                .file_name()
1852                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1853            let dest_path = account_run_path.join(file_name);
1854            fs::hard_link(&file_path, &dest_path).map_err(|err| {
1855                IoError::other(format!(
1856                    "failed to hard link from '{}' to '{}': {err}",
1857                    file_path.display(),
1858                    dest_path.display(),
1859                ))
1860            })?;
1861        }
1862    }
1863
1864    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1865    let snapshot_file_path = &snapshot_info.snapshot_path();
1866    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1867    streaming_snapshot_dir_files(
1868        file_sender,
1869        snapshot_file_path,
1870        snapshot_version_path,
1871        account_paths,
1872    )?;
1873
1874    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1875    let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1876        file_receiver,
1877        num_rebuilder_threads,
1878        next_append_vec_id,
1879        SnapshotFrom::Dir,
1880        storage_access,
1881    )?;
1882
1883    let RebuiltSnapshotStorage {
1884        snapshot_version: _,
1885        storage,
1886    } = version_and_storages;
1887    Ok(storage)
1888}
1889
1890/// Reads the `snapshot_version` from a file. Before opening the file, its size
1891/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
1892/// threshold, it is not opened and an error is returned.
1893fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1894    // Check file size.
1895    let file_metadata = fs::metadata(&path).map_err(|err| {
1896        IoError::other(format!(
1897            "failed to query snapshot version file metadata '{}': {err}",
1898            path.as_ref().display(),
1899        ))
1900    })?;
1901    let file_size = file_metadata.len();
1902    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1903        let error_message = format!(
1904            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1905            path.as_ref().display(),
1906            file_size,
1907            MAX_SNAPSHOT_VERSION_FILE_SIZE,
1908        );
1909        return Err(IoError::other(error_message).into());
1910    }
1911
1912    // Read snapshot_version from file.
1913    let mut snapshot_version = String::new();
1914    let mut file = fs::File::open(&path).map_err(|err| {
1915        IoError::other(format!(
1916            "failed to open snapshot version file '{}': {err}",
1917            path.as_ref().display()
1918        ))
1919    })?;
1920    file.read_to_string(&mut snapshot_version).map_err(|err| {
1921        IoError::other(format!(
1922            "failed to read snapshot version from file '{}': {err}",
1923            path.as_ref().display()
1924        ))
1925    })?;
1926
1927    Ok(snapshot_version.trim().to_string())
1928}
1929
1930/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
1931/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
1932fn check_are_snapshots_compatible(
1933    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1934    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1935) -> Result<()> {
1936    if incremental_snapshot_archive_info.is_none() {
1937        return Ok(());
1938    }
1939
1940    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1941
1942    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1943        .then_some(())
1944        .ok_or_else(|| {
1945            SnapshotError::MismatchedBaseSlot(
1946                full_snapshot_archive_info.slot(),
1947                incremental_snapshot_archive_info.base_slot(),
1948            )
1949        })
1950}
1951
1952/// Get the `&str` from a `&Path`
1953pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1954    path.file_name()
1955        .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1956        .to_str()
1957        .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1958}
1959
1960pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1961    snapshot_archives_dir
1962        .as_ref()
1963        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1964}
1965
1966/// Build the full snapshot archive path from its components: the snapshot archives directory, the
1967/// snapshot slot, the accounts hash, and the archive format.
1968pub fn build_full_snapshot_archive_path(
1969    full_snapshot_archives_dir: impl AsRef<Path>,
1970    slot: Slot,
1971    hash: &SnapshotHash,
1972    archive_format: ArchiveFormat,
1973) -> PathBuf {
1974    full_snapshot_archives_dir.as_ref().join(format!(
1975        "snapshot-{}-{}.{}",
1976        slot,
1977        hash.0,
1978        archive_format.extension(),
1979    ))
1980}
1981
1982/// Build the incremental snapshot archive path from its components: the snapshot archives
1983/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive
1984/// format.
1985pub fn build_incremental_snapshot_archive_path(
1986    incremental_snapshot_archives_dir: impl AsRef<Path>,
1987    base_slot: Slot,
1988    slot: Slot,
1989    hash: &SnapshotHash,
1990    archive_format: ArchiveFormat,
1991) -> PathBuf {
1992    incremental_snapshot_archives_dir.as_ref().join(format!(
1993        "incremental-snapshot-{}-{}-{}.{}",
1994        base_slot,
1995        slot,
1996        hash.0,
1997        archive_format.extension(),
1998    ))
1999}
2000
2001/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format
2002pub(crate) fn parse_full_snapshot_archive_filename(
2003    archive_filename: &str,
2004) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
2005    static RE: std::sync::LazyLock<Regex> =
2006        std::sync::LazyLock::new(|| Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap());
2007
2008    let do_parse = || {
2009        RE.captures(archive_filename).and_then(|captures| {
2010            let slot = captures
2011                .name("slot")
2012                .map(|x| x.as_str().parse::<Slot>())?
2013                .ok()?;
2014            let hash = captures
2015                .name("hash")
2016                .map(|x| x.as_str().parse::<Hash>())?
2017                .ok()?;
2018            let archive_format = captures
2019                .name("ext")
2020                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2021                .ok()?;
2022
2023            Some((slot, SnapshotHash(hash), archive_format))
2024        })
2025    };
2026
2027    do_parse().ok_or_else(|| {
2028        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2029    })
2030}
2031
2032/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format
2033pub(crate) fn parse_incremental_snapshot_archive_filename(
2034    archive_filename: &str,
2035) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
2036    static RE: std::sync::LazyLock<Regex> = std::sync::LazyLock::new(|| {
2037        Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap()
2038    });
2039
2040    let do_parse = || {
2041        RE.captures(archive_filename).and_then(|captures| {
2042            let base_slot = captures
2043                .name("base")
2044                .map(|x| x.as_str().parse::<Slot>())?
2045                .ok()?;
2046            let slot = captures
2047                .name("slot")
2048                .map(|x| x.as_str().parse::<Slot>())?
2049                .ok()?;
2050            let hash = captures
2051                .name("hash")
2052                .map(|x| x.as_str().parse::<Hash>())?
2053                .ok()?;
2054            let archive_format = captures
2055                .name("ext")
2056                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2057                .ok()?;
2058
2059            Some((base_slot, slot, SnapshotHash(hash), archive_format))
2060        })
2061    };
2062
2063    do_parse().ok_or_else(|| {
2064        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2065    })
2066}
2067
2068/// Walk down the snapshot archive to collect snapshot archive file info
2069fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2070where
2071    F: Fn(PathBuf) -> Result<T>,
2072{
2073    let walk_dir = |dir: &Path| -> Vec<T> {
2074        let entry_iter = fs::read_dir(dir);
2075        match entry_iter {
2076            Err(err) => {
2077                info!(
2078                    "Unable to read snapshot archives directory '{}': {err}",
2079                    dir.display(),
2080                );
2081                vec![]
2082            }
2083            Ok(entries) => entries
2084                .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2085                .collect(),
2086        }
2087    };
2088
2089    let mut ret = walk_dir(snapshot_archives_dir);
2090    let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2091    if remote_dir.exists() {
2092        ret.append(&mut walk_dir(remote_dir.as_ref()));
2093    }
2094    ret
2095}
2096
2097/// Get a list of the full snapshot archives from a directory
2098pub fn get_full_snapshot_archives(
2099    full_snapshot_archives_dir: impl AsRef<Path>,
2100) -> Vec<FullSnapshotArchiveInfo> {
2101    get_snapshot_archives(
2102        full_snapshot_archives_dir.as_ref(),
2103        FullSnapshotArchiveInfo::new_from_path,
2104    )
2105}
2106
2107/// Get a list of the incremental snapshot archives from a directory
2108pub fn get_incremental_snapshot_archives(
2109    incremental_snapshot_archives_dir: impl AsRef<Path>,
2110) -> Vec<IncrementalSnapshotArchiveInfo> {
2111    get_snapshot_archives(
2112        incremental_snapshot_archives_dir.as_ref(),
2113        IncrementalSnapshotArchiveInfo::new_from_path,
2114    )
2115}
2116
2117/// Get the highest slot of the full snapshot archives in a directory
2118pub fn get_highest_full_snapshot_archive_slot(
2119    full_snapshot_archives_dir: impl AsRef<Path>,
2120) -> Option<Slot> {
2121    get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2122        .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2123}
2124
2125/// Get the highest slot of the incremental snapshot archives in a directory, for a given full
2126/// snapshot slot
2127pub fn get_highest_incremental_snapshot_archive_slot(
2128    incremental_snapshot_archives_dir: impl AsRef<Path>,
2129    full_snapshot_slot: Slot,
2130) -> Option<Slot> {
2131    get_highest_incremental_snapshot_archive_info(
2132        incremental_snapshot_archives_dir,
2133        full_snapshot_slot,
2134    )
2135    .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2136}
2137
2138/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
2139pub fn get_highest_full_snapshot_archive_info(
2140    full_snapshot_archives_dir: impl AsRef<Path>,
2141) -> Option<FullSnapshotArchiveInfo> {
2142    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2143    full_snapshot_archives.sort_unstable();
2144    full_snapshot_archives.into_iter().next_back()
2145}
2146
2147/// Get the path for the incremental snapshot archive with the highest slot, for a given full
2148/// snapshot slot, in a directory
2149pub fn get_highest_incremental_snapshot_archive_info(
2150    incremental_snapshot_archives_dir: impl AsRef<Path>,
2151    full_snapshot_slot: Slot,
2152) -> Option<IncrementalSnapshotArchiveInfo> {
2153    // Since we want to filter down to only the incremental snapshot archives that have the same
2154    // full snapshot slot as the value passed in, perform the filtering before sorting to avoid
2155    // doing unnecessary work.
2156    let mut incremental_snapshot_archives =
2157        get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2158            .into_iter()
2159            .filter(|incremental_snapshot_archive_info| {
2160                incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2161            })
2162            .collect::<Vec<_>>();
2163    incremental_snapshot_archives.sort_unstable();
2164    incremental_snapshot_archives.into_iter().next_back()
2165}
2166
2167pub fn purge_old_snapshot_archives(
2168    full_snapshot_archives_dir: impl AsRef<Path>,
2169    incremental_snapshot_archives_dir: impl AsRef<Path>,
2170    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2171    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2172) {
2173    info!(
2174        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2175        full_snapshot_archives_dir.as_ref().display(),
2176        maximum_full_snapshot_archives_to_retain
2177    );
2178
2179    let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2180    full_snapshot_archives.sort_unstable();
2181    full_snapshot_archives.reverse();
2182
2183    let num_to_retain = full_snapshot_archives
2184        .len()
2185        .min(maximum_full_snapshot_archives_to_retain.get());
2186    trace!(
2187        "There are {} full snapshot archives, retaining {}",
2188        full_snapshot_archives.len(),
2189        num_to_retain,
2190    );
2191
2192    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2193        if full_snapshot_archives.is_empty() {
2194            None
2195        } else {
2196            Some(full_snapshot_archives.split_at(num_to_retain))
2197        }
2198        .unwrap_or_default();
2199
2200    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2201        .iter()
2202        .map(|ai| ai.slot())
2203        .collect::<HashSet<_>>();
2204
2205    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2206        for path in archives.iter().map(|a| a.path()) {
2207            trace!("Removing snapshot archive: {}", path.display());
2208            let result = fs::remove_file(path);
2209            if let Err(err) = result {
2210                info!(
2211                    "Failed to remove snapshot archive '{}': {err}",
2212                    path.display()
2213                );
2214            }
2215        }
2216    }
2217    remove_archives(full_snapshot_archives_to_remove);
2218
2219    info!(
2220        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2221        incremental_snapshot_archives_dir.as_ref().display(),
2222        maximum_incremental_snapshot_archives_to_retain
2223    );
2224    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2225    for incremental_snapshot_archive in
2226        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2227    {
2228        incremental_snapshot_archives_by_base_slot
2229            .entry(incremental_snapshot_archive.base_slot())
2230            .or_default()
2231            .push(incremental_snapshot_archive)
2232    }
2233
2234    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2235    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2236    {
2237        incremental_snapshot_archives.sort_unstable();
2238        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2239            maximum_incremental_snapshot_archives_to_retain.get()
2240        } else {
2241            usize::from(retained_full_snapshot_slots.contains(&base_slot))
2242        };
2243        trace!(
2244            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2245            incremental_snapshot_archives.len(),
2246            base_slot,
2247            incremental_snapshot_archives
2248                .len()
2249                .saturating_sub(num_to_retain),
2250        );
2251
2252        incremental_snapshot_archives.truncate(
2253            incremental_snapshot_archives
2254                .len()
2255                .saturating_sub(num_to_retain),
2256        );
2257        remove_archives(&incremental_snapshot_archives);
2258    }
2259}
2260
/// Unpacks a snapshot archive from `shared_buffer` with `parallel_divisions` parallel workers,
/// and merges the maps returned by the workers into one.
///
/// # Errors
///
/// Returns the first error (in worker-index order) produced by a worker.
///
/// # Panics
///
/// Panics if `parallel_divisions` is zero.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // allocate all readers before any readers start reading
    let mut readers = Vec::with_capacity(parallel_divisions);
    for _ in 0..parallel_divisions {
        readers.push(SharedBufferReader::new(&shared_buffer));
    }

    // create 'parallel_divisions' # of parallel workers, each responsible for 1/parallel_divisions of all the files to extract.
    let worker_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let mut archive = Archive::new(reader);
            let parallel_selector = ParallelSelector {
                index,
                divisions: parallel_divisions,
            };
            hardened_unpack::unpack_snapshot(
                &mut archive,
                ledger_dir,
                account_paths,
                Some(parallel_selector),
            )
        })
        .collect();

    let mut merged_append_vec_map = UnpackedAppendVecMap::new();
    for worker_result in worker_results {
        merged_append_vec_map.extend(worker_result?);
    }

    Ok(merged_append_vec_map)
}
2301
2302fn untar_snapshot_create_shared_buffer(
2303    snapshot_tar: &Path,
2304    archive_format: ArchiveFormat,
2305) -> SharedBuffer {
2306    let open_file = || {
2307        fs::File::open(snapshot_tar)
2308            .map_err(|err| {
2309                IoError::other(format!(
2310                    "failed to open snapshot archive '{}': {err}",
2311                    snapshot_tar.display(),
2312                ))
2313            })
2314            .unwrap()
2315    };
2316    // Apply buffered reader for decoders that do not buffer internally.
2317    match archive_format {
2318        ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(open_file())),
2319        ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(open_file())),
2320        ArchiveFormat::TarZstd { .. } => {
2321            SharedBuffer::new(zstd::stream::read::Decoder::new(open_file()).unwrap())
2322        }
2323        ArchiveFormat::TarLz4 => SharedBuffer::new(lz4::Decoder::new(open_file()).unwrap()),
2324        ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2325    }
2326}
2327
/// Unpacks the snapshot archive at `snapshot_tar` into `unpack_dir` and `account_paths`.
///
/// The archive is opened as a shared buffer for `archive_format` and then extracted by
/// `parallel_divisions` workers via `unpack_snapshot_local`.
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let archive_path = snapshot_tar.as_ref();
    unpack_snapshot_local(
        untar_snapshot_create_shared_buffer(archive_path, archive_format),
        unpack_dir,
        account_paths,
        parallel_divisions,
    )
}
2339
2340pub fn verify_unpacked_snapshots_dir_and_version(
2341    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2342) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2343    info!(
2344        "snapshot version: {}",
2345        &unpacked_snapshots_dir_and_version.snapshot_version
2346    );
2347
2348    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2349    let mut bank_snapshots =
2350        get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2351    if bank_snapshots.len() > 1 {
2352        return Err(IoError::other(format!(
2353            "invalid snapshot format: only one snapshot allowed, but found {}",
2354            bank_snapshots.len(),
2355        ))
2356        .into());
2357    }
2358    let root_paths = bank_snapshots.pop().ok_or_else(|| {
2359        IoError::other(format!(
2360            "no snapshots found in snapshots directory '{}'",
2361            unpacked_snapshots_dir_and_version
2362                .unpacked_snapshots_dir
2363                .display(),
2364        ))
2365    })?;
2366    Ok((snapshot_version, root_paths))
2367}
2368
2369/// Returns the file name of the bank snapshot for `slot`
2370pub fn get_snapshot_file_name(slot: Slot) -> String {
2371    slot.to_string()
2372}
2373
2374/// Constructs the path to the bank snapshot directory for `slot` within `bank_snapshots_dir`
2375pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2376    bank_snapshots_dir
2377        .as_ref()
2378        .join(get_snapshot_file_name(slot))
2379}
2380
/// Lets tests state how the serialized bank file should be compared during verification.
#[derive(Debug, Copy, Clone)]
pub enum VerifyBank {
    /// The bank's serialized format is expected to be identical to what we are comparing
    /// against.
    Deterministic,
    /// The serialized bank was 'reserialized' into a non-deterministic format, so both files
    /// are deserialized and the deserialized results are compared instead.
    NonDeterministic,
}
2390
/// Unpacks `snapshot_archive` into a temporary directory and asserts that its contents match
/// the bank snapshot for `slot` found under `snapshots_to_verify`.
///
/// With `VerifyBank::NonDeterministic`, the two serialized bank files are compared after
/// deserialization and then removed from both sides before the directory comparison.
///
/// NOTE: this function modifies `snapshots_to_verify`. It removes the accounts hardlinks
/// directory, the version file and the state-complete file from the slot directory (and, in
/// the non-deterministic case, the serialized bank file) so the directory comparison can pass.
///
/// # Panics
///
/// Panics on any I/O failure and whenever a comparison finds a difference.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    // Unpack with a single division; the account files land in `unpack_account_dir`.
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    // Check snapshots are the same
    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Since the unpack code collects all the appendvecs into one directory, unpack_account_dir,
    // we need to collect all the appendvecs in account_paths/<slot>/snapshot/ into one directory
    // for later comparison.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    // Create the directory if it doesn't exist
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // file contents may be different, but deserialized structs should be equal
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        // Remove both bank files so they do not take part in the directory comparison below.
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // The new status_cache file is inside the slot directory, together with the snapshot file.
    // When unpacking an archive, the status_cache file from the archive is one level up,
    // outside of the slot directory.
    // The unpacked status_cache file needs to be put back into the slot directory for the
    // directory comparison to pass.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        // This directory contains symlinks to all <account_path>/snapshot/<slot> directories.
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            // Copy all the files in dst_path into the storages_to_verify directory.
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        // Remove the hardlinks directory so it does not take part in the snapshot directory
        // comparison below.
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // The version file, if present, is removed before the directory comparison.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    // Likewise for the state-complete marker file.
    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // In the unarchiving case, there is an extra empty "accounts" directory. The account
    // files in the archive's accounts/ directory have been expanded to [account_paths].
    // Remove the empty "accounts" directory for the directory comparison below.
    // In some test cases the directories to compare do not come from unarchiving.
    // Ignore the error when this directory does not exist.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    // Check the account entries are the same
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2484
2485/// Purges all bank snapshots
2486pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2487    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2488    purge_bank_snapshots(&bank_snapshots);
2489}
2490
2491/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
2492pub fn purge_old_bank_snapshots(
2493    bank_snapshots_dir: impl AsRef<Path>,
2494    num_bank_snapshots_to_retain: usize,
2495    filter_by_kind: Option<BankSnapshotKind>,
2496) {
2497    let mut bank_snapshots = match filter_by_kind {
2498        Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2499        Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2500        None => get_bank_snapshots(&bank_snapshots_dir),
2501    };
2502
2503    bank_snapshots.sort_unstable();
2504    purge_bank_snapshots(
2505        bank_snapshots
2506            .iter()
2507            .rev()
2508            .skip(num_bank_snapshots_to_retain),
2509    );
2510}
2511
2512/// At startup, purge old (i.e. unusable) bank snapshots
2513///
2514/// Only a single bank snapshot could be needed at startup (when using fast boot), so
2515/// retain the highest bank snapshot "post", and purge the rest.
2516pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2517    purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2518    purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2519
2520    let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2521    if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2522        debug!(
2523            "Retained bank snapshot for slot {}, and purged the rest.",
2524            highest_bank_snapshot_post.slot
2525        );
2526    }
2527}
2528
2529/// Purges bank snapshots that are older than `slot`
2530pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2531    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2532    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2533    purge_bank_snapshots(&bank_snapshots);
2534}
2535
2536/// Purges all `bank_snapshots`
2537///
2538/// Does not exit early if there is an error while purging a bank snapshot.
2539fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2540    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2541        if purge_bank_snapshot(snapshot_dir).is_err() {
2542            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2543        }
2544    }
2545}
2546
2547/// Remove the bank snapshot at this path
2548pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2549    const FN_ERR: &str = "failed to purge bank snapshot";
2550    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2551    if accounts_hardlinks_dir.is_dir() {
2552        // This directory contain symlinks to all accounts snapshot directories.
2553        // They should all be removed.
2554        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2555            IoError::other(format!(
2556                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2557                accounts_hardlinks_dir.display(),
2558            ))
2559        })?;
2560        for entry in read_dir {
2561            let accounts_hardlink_dir = entry?.path();
2562            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2563                IoError::other(format!(
2564                    "{FN_ERR}: failed to read symlink '{}': {err}",
2565                    accounts_hardlink_dir.display(),
2566                ))
2567            })?;
2568            move_and_async_delete_path(&accounts_hardlink_dir);
2569        }
2570    }
2571    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2572        IoError::other(format!(
2573            "{FN_ERR}: failed to remove dir '{}': {err}",
2574            bank_snapshot_dir.as_ref().display(),
2575        ))
2576    })?;
2577    Ok(())
2578}
2579
2580pub fn should_take_full_snapshot(
2581    block_height: Slot,
2582    full_snapshot_archive_interval_slots: Slot,
2583) -> bool {
2584    block_height % full_snapshot_archive_interval_slots == 0
2585}
2586
2587pub fn should_take_incremental_snapshot(
2588    block_height: Slot,
2589    incremental_snapshot_archive_interval_slots: Slot,
2590    latest_full_snapshot_slot: Option<Slot>,
2591) -> bool {
2592    block_height % incremental_snapshot_archive_interval_slots == 0
2593        && latest_full_snapshot_slot.is_some()
2594}
2595
/// Creates an "accounts path" directory for tests
///
/// A fresh temporary directory is created and populated with the "run" and "snapshot"
/// sub-directories required by a validator. Returns the temporary directory handle together
/// with the first path reported by `create_accounts_run_and_snapshot_dirs`.
///
/// # Panics
///
/// Panics if the temporary directory or its sub-directories cannot be created.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let created_dirs = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, created_dirs.0)
}
2606
2607#[cfg(test)]
2608mod tests {
2609    use {
2610        super::*,
2611        assert_matches::assert_matches,
2612        bincode::{deserialize_from, serialize_into},
2613        std::{convert::TryFrom, mem::size_of},
2614        tempfile::NamedTempFile,
2615    };
2616
2617    #[test]
2618    fn test_serialize_snapshot_data_file_under_limit() {
2619        let temp_dir = tempfile::TempDir::new().unwrap();
2620        let expected_consumed_size = size_of::<u32>() as u64;
2621        let consumed_size = serialize_snapshot_data_file_capped(
2622            &temp_dir.path().join("data-file"),
2623            expected_consumed_size,
2624            |stream| {
2625                serialize_into(stream, &2323_u32)?;
2626                Ok(())
2627            },
2628        )
2629        .unwrap();
2630        assert_eq!(consumed_size, expected_consumed_size);
2631    }
2632
2633    #[test]
2634    fn test_serialize_snapshot_data_file_over_limit() {
2635        let temp_dir = tempfile::TempDir::new().unwrap();
2636        let expected_consumed_size = size_of::<u32>() as u64;
2637        let result = serialize_snapshot_data_file_capped(
2638            &temp_dir.path().join("data-file"),
2639            expected_consumed_size - 1,
2640            |stream| {
2641                serialize_into(stream, &2323_u32)?;
2642                Ok(())
2643            },
2644        );
2645        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2646    }
2647
2648    #[test]
2649    fn test_deserialize_snapshot_data_file_under_limit() {
2650        let expected_data = 2323_u32;
2651        let expected_consumed_size = size_of::<u32>() as u64;
2652
2653        let temp_dir = tempfile::TempDir::new().unwrap();
2654        serialize_snapshot_data_file_capped(
2655            &temp_dir.path().join("data-file"),
2656            expected_consumed_size,
2657            |stream| {
2658                serialize_into(stream, &expected_data)?;
2659                Ok(())
2660            },
2661        )
2662        .unwrap();
2663
2664        let snapshot_root_paths = SnapshotRootPaths {
2665            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2666            incremental_snapshot_root_file_path: None,
2667        };
2668
2669        let actual_data = deserialize_snapshot_data_files_capped(
2670            &snapshot_root_paths,
2671            expected_consumed_size,
2672            |stream| {
2673                Ok(deserialize_from::<_, u32>(
2674                    &mut stream.full_snapshot_stream,
2675                )?)
2676            },
2677        )
2678        .unwrap();
2679        assert_eq!(actual_data, expected_data);
2680    }
2681
2682    #[test]
2683    fn test_deserialize_snapshot_data_file_over_limit() {
2684        let expected_data = 2323_u32;
2685        let expected_consumed_size = size_of::<u32>() as u64;
2686
2687        let temp_dir = tempfile::TempDir::new().unwrap();
2688        serialize_snapshot_data_file_capped(
2689            &temp_dir.path().join("data-file"),
2690            expected_consumed_size,
2691            |stream| {
2692                serialize_into(stream, &expected_data)?;
2693                Ok(())
2694            },
2695        )
2696        .unwrap();
2697
2698        let snapshot_root_paths = SnapshotRootPaths {
2699            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2700            incremental_snapshot_root_file_path: None,
2701        };
2702
2703        let result = deserialize_snapshot_data_files_capped(
2704            &snapshot_root_paths,
2705            expected_consumed_size - 1,
2706            |stream| {
2707                Ok(deserialize_from::<_, u32>(
2708                    &mut stream.full_snapshot_stream,
2709                )?)
2710            },
2711        );
2712        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2713    }
2714
2715    #[test]
2716    fn test_deserialize_snapshot_data_file_extra_data() {
2717        let expected_data = 2323_u32;
2718        let expected_consumed_size = size_of::<u32>() as u64;
2719
2720        let temp_dir = tempfile::TempDir::new().unwrap();
2721        serialize_snapshot_data_file_capped(
2722            &temp_dir.path().join("data-file"),
2723            expected_consumed_size * 2,
2724            |stream| {
2725                serialize_into(stream.by_ref(), &expected_data)?;
2726                serialize_into(stream.by_ref(), &expected_data)?;
2727                Ok(())
2728            },
2729        )
2730        .unwrap();
2731
2732        let snapshot_root_paths = SnapshotRootPaths {
2733            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2734            incremental_snapshot_root_file_path: None,
2735        };
2736
2737        let result = deserialize_snapshot_data_files_capped(
2738            &snapshot_root_paths,
2739            expected_consumed_size * 2,
2740            |stream| {
2741                Ok(deserialize_from::<_, u32>(
2742                    &mut stream.full_snapshot_stream,
2743                )?)
2744            },
2745        );
2746        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2747    }
2748
2749    #[test]
2750    fn test_snapshot_version_from_file_under_limit() {
2751        let file_content = SnapshotVersion::default().as_str();
2752        let mut file = NamedTempFile::new().unwrap();
2753        file.write_all(file_content.as_bytes()).unwrap();
2754        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2755        assert_eq!(version_from_file, file_content);
2756    }
2757
2758    #[test]
2759    fn test_snapshot_version_from_file_over_limit() {
2760        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2761        let file_content = vec![7u8; over_limit_size];
2762        let mut file = NamedTempFile::new().unwrap();
2763        file.write_all(&file_content).unwrap();
2764        assert_matches!(
2765            snapshot_version_from_file(file.path()),
2766            Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2767        );
2768    }
2769
    /// Checks that well-formed full snapshot archive filenames parse into the expected
    /// (slot, hash, archive format) triple, and that malformed filenames are rejected.
    #[test]
    fn test_parse_full_snapshot_archive_filename() {
        // Well-formed names, one per archive extension exercised here.
        assert_eq!(
            parse_full_snapshot_archive_filename(&format!(
                "snapshot-42-{}.tar.bz2",
                Hash::default()
            ))
            .unwrap(),
            (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
        );
        assert_eq!(
            parse_full_snapshot_archive_filename(&format!(
                "snapshot-43-{}.tar.zst",
                Hash::default()
            ))
            .unwrap(),
            (
                43,
                SnapshotHash(Hash::default()),
                ArchiveFormat::TarZstd {
                    config: ZstdConfig::default(),
                }
            )
        );
        assert_eq!(
            parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
                .unwrap(),
            (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
        );
        assert_eq!(
            parse_full_snapshot_archive_filename(&format!(
                "snapshot-45-{}.tar.lz4",
                Hash::default()
            ))
            .unwrap(),
            (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
        );

        // Everything below must fail to parse.
        // Not a snapshot name at all, then bad slot + bad hash + bad extension.
        assert!(parse_full_snapshot_archive_filename("invalid").is_err());
        assert!(
            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
        );

        // Numeric slot, with a bad hash and/or a bad extension.
        assert!(
            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
        );
        assert!(parse_full_snapshot_archive_filename(&format!(
            "snapshot-12345678-{}.bad!ext",
            Hash::new_unique()
        ))
        .is_err());
        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());

        // Hash taken from `Hash::new_unique()`, with a bad slot and/or a bad extension.
        assert!(parse_full_snapshot_archive_filename(&format!(
            "snapshot-bad!slot-{}.bad!ext",
            Hash::new_unique()
        ))
        .is_err());
        assert!(parse_full_snapshot_archive_filename(&format!(
            "snapshot-12345678-{}.bad!ext",
            Hash::new_unique()
        ))
        .is_err());
        assert!(parse_full_snapshot_archive_filename(&format!(
            "snapshot-bad!slot-{}.tar",
            Hash::new_unique()
        ))
        .is_err());

        // `.tar` extension, with a bad slot and/or a bad hash.
        assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
        assert!(parse_full_snapshot_archive_filename(&format!(
            "snapshot-bad!slot-{}.tar",
            Hash::new_unique()
        ))
        .is_err());
    }
2847
    /// Checks that well-formed incremental snapshot archive filenames parse into the expected
    /// (base slot, slot, hash, archive format) tuple, and that malformed filenames are
    /// rejected.
    #[test]
    fn test_parse_incremental_snapshot_archive_filename() {
        // Well-formed names, one per archive extension exercised here.
        assert_eq!(
            parse_incremental_snapshot_archive_filename(&format!(
                "incremental-snapshot-42-123-{}.tar.bz2",
                Hash::default()
            ))
            .unwrap(),
            (
                42,
                123,
                SnapshotHash(Hash::default()),
                ArchiveFormat::TarBzip2
            )
        );
        assert_eq!(
            parse_incremental_snapshot_archive_filename(&format!(
                "incremental-snapshot-43-234-{}.tar.zst",
                Hash::default()
            ))
            .unwrap(),
            (
                43,
                234,
                SnapshotHash(Hash::default()),
                ArchiveFormat::TarZstd {
                    config: ZstdConfig::default(),
                }
            )
        );
        assert_eq!(
            parse_incremental_snapshot_archive_filename(&format!(
                "incremental-snapshot-44-345-{}.tar",
                Hash::default()
            ))
            .unwrap(),
            (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
        );
        assert_eq!(
            parse_incremental_snapshot_archive_filename(&format!(
                "incremental-snapshot-45-456-{}.tar.lz4",
                Hash::default()
            ))
            .unwrap(),
            (
                45,
                456,
                SnapshotHash(Hash::default()),
                ArchiveFormat::TarLz4
            )
        );

        // Everything below must fail to parse.
        // Not a snapshot name, a full snapshot name, and a name where every field is bad.
        assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
        assert!(parse_incremental_snapshot_archive_filename(&format!(
            "snapshot-42-{}.tar",
            Hash::new_unique()
        ))
        .is_err());
        assert!(parse_incremental_snapshot_archive_filename(
            "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
        )
        .is_err());

        // Only the first slot field is bad.
        assert!(parse_incremental_snapshot_archive_filename(&format!(
            "incremental-snapshot-bad!slot-56785678-{}.tar",
            Hash::new_unique()
        ))
        .is_err());

        // Only the second slot field is bad.
        assert!(parse_incremental_snapshot_archive_filename(&format!(
            "incremental-snapshot-12345678-bad!slot-{}.tar",
            Hash::new_unique()
        ))
        .is_err());

        // Only the hash is bad.
        assert!(parse_incremental_snapshot_archive_filename(
            "incremental-snapshot-12341234-56785678-bad!HASH.tar"
        )
        .is_err());

        // Only the extension is bad.
        assert!(parse_incremental_snapshot_archive_filename(&format!(
            "incremental-snapshot-12341234-56785678-{}.bad!ext",
            Hash::new_unique()
        ))
        .is_err());
    }
2934
2935    #[test]
2936    fn test_check_are_snapshots_compatible() {
2937        let slot1: Slot = 1234;
2938        let slot2: Slot = 5678;
2939        let slot3: Slot = 999_999;
2940
2941        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2942            format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2943        ))
2944        .unwrap();
2945
2946        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2947
2948        let incremental_snapshot_archive_info =
2949            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2950                "/dir/incremental-snapshot-{}-{}-{}.tar",
2951                slot1,
2952                slot2,
2953                Hash::new_unique()
2954            )))
2955            .unwrap();
2956
2957        assert!(check_are_snapshots_compatible(
2958            &full_snapshot_archive_info,
2959            Some(&incremental_snapshot_archive_info)
2960        )
2961        .is_ok());
2962
2963        let incremental_snapshot_archive_info =
2964            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2965                "/dir/incremental-snapshot-{}-{}-{}.tar",
2966                slot2,
2967                slot3,
2968                Hash::new_unique()
2969            )))
2970            .unwrap();
2971
2972        assert!(check_are_snapshots_compatible(
2973            &full_snapshot_archive_info,
2974            Some(&incremental_snapshot_archive_info)
2975        )
2976        .is_err());
2977    }
2978
2979    /// A test heler function that creates bank snapshot files
2980    fn common_create_bank_snapshot_files(
2981        bank_snapshots_dir: &Path,
2982        min_slot: Slot,
2983        max_slot: Slot,
2984    ) {
2985        for slot in min_slot..max_slot {
2986            let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2987            fs::create_dir_all(&snapshot_dir).unwrap();
2988
2989            let snapshot_filename = get_snapshot_file_name(slot);
2990            let snapshot_path = snapshot_dir.join(snapshot_filename);
2991            fs::File::create(snapshot_path).unwrap();
2992
2993            let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2994            fs::File::create(status_cache_file).unwrap();
2995
2996            let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2997            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2998
2999            // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
3000            write_snapshot_state_complete_file(snapshot_dir).unwrap();
3001        }
3002    }
3003
3004    #[test]
3005    fn test_get_bank_snapshots() {
3006        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3007        let min_slot = 10;
3008        let max_slot = 20;
3009        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3010
3011        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
3012        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
3013    }
3014
3015    #[test]
3016    fn test_get_highest_bank_snapshot_post() {
3017        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3018        let min_slot = 99;
3019        let max_slot = 123;
3020        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3021
3022        let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
3023        assert!(highest_bank_snapshot.is_some());
3024        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
3025    }
3026
3027    /// A test helper function that creates full and incremental snapshot archive files.  Creates
3028    /// full snapshot files in the range (`min_full_snapshot_slot`, `max_full_snapshot_slot`], and
3029    /// incremental snapshot files in the range (`min_incremental_snapshot_slot`,
3030    /// `max_incremental_snapshot_slot`].  Additionally, "bad" files are created for both full and
3031    /// incremental snapshots to ensure the tests properly filter them out.
3032    fn common_create_snapshot_archive_files(
3033        full_snapshot_archives_dir: &Path,
3034        incremental_snapshot_archives_dir: &Path,
3035        min_full_snapshot_slot: Slot,
3036        max_full_snapshot_slot: Slot,
3037        min_incremental_snapshot_slot: Slot,
3038        max_incremental_snapshot_slot: Slot,
3039    ) {
3040        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3041        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3042        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3043            for incremental_snapshot_slot in
3044                min_incremental_snapshot_slot..max_incremental_snapshot_slot
3045            {
3046                let snapshot_filename = format!(
3047                    "incremental-snapshot-{}-{}-{}.tar",
3048                    full_snapshot_slot,
3049                    incremental_snapshot_slot,
3050                    Hash::default()
3051                );
3052                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3053                fs::File::create(snapshot_filepath).unwrap();
3054            }
3055
3056            let snapshot_filename =
3057                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3058            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3059            fs::File::create(snapshot_filepath).unwrap();
3060
3061            // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly
3062            let bad_filename = format!(
3063                "incremental-snapshot-{}-{}-bad!hash.tar",
3064                full_snapshot_slot,
3065                max_incremental_snapshot_slot + 1,
3066            );
3067            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3068            fs::File::create(bad_filepath).unwrap();
3069        }
3070
3071        // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
3072        // sorted correctly
3073        let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3074        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3075        fs::File::create(bad_filepath).unwrap();
3076    }
3077
/// The number of full snapshot archives found must equal the number of well-formed files the
/// helper created, i.e. one per slot in `min_slot..max_slot`.
#[test]
fn test_get_full_snapshot_archives() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (123, 456);
    // The incremental range 0..0 is empty, so no well-formed incremental archives are created.
    common_create_snapshot_archive_files(
        full_archives_tmp.path(),
        incremental_archives_tmp.path(),
        min_slot,
        max_slot,
        0,
        0,
    );

    let found_archives = get_full_snapshot_archives(full_archives_tmp);
    let expected_count = max_slot - min_slot;
    assert_eq!(found_archives.len() as Slot, expected_count);
}
3096
/// Full snapshot archives placed under the `SNAPSHOT_ARCHIVE_DOWNLOAD_DIR` subdirectory must
/// all be found, and every one of them must report itself as remote.
#[test]
fn test_get_full_snapshot_archives_remote() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (123, 456);

    let full_download_dir = full_archives_tmp.path().join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    let incremental_download_dir = incremental_archives_tmp
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    common_create_snapshot_archive_files(
        &full_download_dir,
        &incremental_download_dir,
        min_slot,
        max_slot,
        0,
        0,
    );

    // Search from the top-level directory, not from the download subdirectory
    let found_archives = get_full_snapshot_archives(full_archives_tmp);
    assert_eq!(found_archives.len() as Slot, max_slot - min_slot);
    let all_remote = found_archives.iter().all(|archive| archive.is_remote());
    assert!(all_remote);
}
3120
/// The number of incremental snapshot archives found must equal the number of well-formed files
/// the helper created: one per (full snapshot slot, incremental snapshot slot) pair.
#[test]
fn test_get_incremental_snapshot_archives() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();
    let (min_full_slot, max_full_slot) = (12, 23);
    let (min_incremental_slot, max_incremental_slot) = (34, 45);
    common_create_snapshot_archive_files(
        full_archives_tmp.path(),
        incremental_archives_tmp.path(),
        min_full_slot,
        max_full_slot,
        min_incremental_slot,
        max_incremental_slot,
    );

    let found_archives = get_incremental_snapshot_archives(incremental_archives_tmp);

    let num_full_slots = max_full_slot - min_full_slot;
    let num_incremental_slots_per_full = max_incremental_slot - min_incremental_slot;
    assert_eq!(
        found_archives.len() as Slot,
        num_full_slots * num_incremental_slots_per_full
    );
}
3146
/// Incremental snapshot archives placed under the `SNAPSHOT_ARCHIVE_DOWNLOAD_DIR` subdirectory
/// must all be found, and every one of them must report itself as remote.
#[test]
fn test_get_incremental_snapshot_archives_remote() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();
    let (min_full_slot, max_full_slot) = (12, 23);
    let (min_incremental_slot, max_incremental_slot) = (34, 45);

    let full_download_dir = full_archives_tmp.path().join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    let incremental_download_dir = incremental_archives_tmp
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    common_create_snapshot_archive_files(
        &full_download_dir,
        &incremental_download_dir,
        min_full_slot,
        max_full_slot,
        min_incremental_slot,
        max_incremental_slot,
    );

    // Search from the top-level directory, not from the download subdirectory
    let found_archives = get_incremental_snapshot_archives(incremental_archives_tmp);

    let num_full_slots = max_full_slot - min_full_slot;
    let num_incremental_slots_per_full = max_incremental_slot - min_incremental_slot;
    assert_eq!(
        found_archives.len() as Slot,
        num_full_slots * num_incremental_slots_per_full
    );
    let all_remote = found_archives.iter().all(|archive| archive.is_remote());
    assert!(all_remote);
}
3179
/// The highest full snapshot archive slot must be the last well-formed one the helper created
/// (`max_slot - 1`), not the higher slot used by the badly-named file.
#[test]
fn test_get_highest_full_snapshot_archive_slot() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (123, 456);
    common_create_snapshot_archive_files(
        full_archives_tmp.path(),
        incremental_archives_tmp.path(),
        min_slot,
        max_slot,
        0,
        0,
    );

    let highest_slot = get_highest_full_snapshot_archive_slot(full_archives_tmp.path());
    assert_eq!(highest_slot, Some(max_slot - 1));
}
3200
/// For every full snapshot slot the helper created, the highest incremental snapshot archive
/// slot must be `max_incremental_slot - 1`; for a full snapshot slot with no archives (here
/// `max_full_slot`), the lookup must return `None`.
#[test]
fn test_get_highest_incremental_snapshot_slot() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();
    let (min_full_slot, max_full_slot) = (12, 23);
    let (min_incremental_slot, max_incremental_slot) = (34, 45);
    common_create_snapshot_archive_files(
        full_archives_tmp.path(),
        incremental_archives_tmp.path(),
        min_full_slot,
        max_full_slot,
        min_incremental_slot,
        max_incremental_slot,
    );

    let incremental_archives_dir = incremental_archives_tmp.path();
    let expected_highest = Some(max_incremental_slot - 1);
    for full_snapshot_slot in min_full_slot..max_full_slot {
        let highest = get_highest_incremental_snapshot_archive_slot(
            incremental_archives_dir,
            full_snapshot_slot,
        );
        assert_eq!(highest, expected_highest);
    }

    // No full snapshot was created at `max_full_slot`, so nothing is based on it
    let highest_without_base =
        get_highest_incremental_snapshot_archive_slot(incremental_archives_dir, max_full_slot);
    assert_eq!(highest_without_base, None);
}
3236
3237    fn common_test_purge_old_snapshot_archives(
3238        snapshot_names: &[&String],
3239        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3240        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3241        expected_snapshots: &[&String],
3242    ) {
3243        let temp_snap_dir = tempfile::TempDir::new().unwrap();
3244
3245        for snap_name in snapshot_names {
3246            let snap_path = temp_snap_dir.path().join(snap_name);
3247            let mut _snap_file = fs::File::create(snap_path);
3248        }
3249        purge_old_snapshot_archives(
3250            temp_snap_dir.path(),
3251            temp_snap_dir.path(),
3252            maximum_full_snapshot_archives_to_retain,
3253            maximum_incremental_snapshot_archives_to_retain,
3254        );
3255
3256        let mut retained_snaps = HashSet::new();
3257        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3258            let entry_path_buf = entry.unwrap().path();
3259            let entry_path = entry_path_buf.as_path();
3260            let snapshot_name = entry_path
3261                .file_name()
3262                .unwrap()
3263                .to_str()
3264                .unwrap()
3265                .to_string();
3266            retained_snaps.insert(snapshot_name);
3267        }
3268
3269        for snap_name in expected_snapshots {
3270            assert!(
3271                retained_snaps.contains(snap_name.as_str()),
3272                "{snap_name} not found"
3273            );
3274        }
3275        assert_eq!(retained_snaps.len(), expected_snapshots.len());
3276    }
3277
/// Purging with a retention limit of 1, 2 and 3 full snapshot archives must keep exactly that
/// many of the newest (highest-slot) archives.
#[test]
fn test_purge_old_full_snapshot_archives() {
    let oldest = format!("snapshot-1-{}.tar.zst", Hash::default());
    let middle = format!("snapshot-3-{}.tar.zst", Hash::default());
    let newest = format!("snapshot-50-{}.tar.zst", Hash::default());
    let all_snapshots = [&oldest, &middle, &newest];

    // retaining 1, expecting only the newest to be retained
    let newest_one = [&newest];
    // retaining 2, expecting the 2 newest to be retained
    let newest_two = [&middle, &newest];
    // retaining 3, all three should be retained
    let newest_three = [&oldest, &middle, &newest];

    let cases = [
        (1, &newest_one[..]),
        (2, &newest_two[..]),
        (3, &newest_three[..]),
    ];
    for (num_to_retain, expected_snapshots) in cases {
        common_test_purge_old_snapshot_archives(
            &all_snapshots,
            NonZeroUsize::new(num_to_retain).unwrap(),
            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
            expected_snapshots,
        );
    }
}
3312
/// Imitates how a running node purges old snapshot archives: a new full snapshot archive is
/// created for each of 100 consecutive slots, and every so often the old archives are purged.
/// After each purge, the archives left on disk must be exactly the most recent ones, ending at
/// the current slot.
#[test]
fn test_purge_old_full_snapshot_archives_in_the_loop() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
    let starting_slot: Slot = 42;

    let num_to_retain = maximum_snapshots_to_retain.get();
    let purge_interval = num_to_retain as Slot * 2;
    let first_purgeable_slot = starting_slot + num_to_retain as Slot;

    for slot in starting_slot..starting_slot + 100 {
        let archive_file_name = format!("snapshot-{}-{}.tar", slot, Hash::default());
        fs::File::create(full_snapshot_archives_dir.path().join(archive_file_name)).unwrap();

        // don't purge-and-check until enough snapshot archives have been created
        let enough_archives_created = slot >= first_purgeable_slot;
        // purge infrequently, so there will always be snapshot archives to purge
        let purge_is_due = slot % purge_interval == 0;
        if !(enough_archives_created && purge_is_due) {
            continue;
        }

        purge_old_snapshot_archives(
            &full_snapshot_archives_dir,
            &incremental_snapshot_archives_dir,
            maximum_snapshots_to_retain,
            NonZeroUsize::new(usize::MAX).unwrap(),
        );

        let mut remaining_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
        remaining_archives.sort_unstable();
        assert_eq!(remaining_archives.len(), num_to_retain);
        assert_eq!(remaining_archives.last().unwrap().slot(), slot);

        // Walking backwards from the newest archive, slots must count down one by one from `slot`
        let expected_slots_descending = (0..=slot).rev();
        for (archive, expected_slot) in remaining_archives
            .iter()
            .rev()
            .zip(expected_slots_descending)
        {
            assert_eq!(archive.slot(), expected_slot);
        }
    }
}
3360
/// Creates twice as many full snapshot archives as will be retained, each with a series of
/// incremental snapshot archives based on it, then purges with the default retention limits
/// and checks which full and incremental snapshot archives remain.
#[test]
fn test_purge_old_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let starting_slot = 100_000;

    let maximum_incremental_snapshot_archives_to_retain =
        DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
    let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;

    let incremental_snapshot_interval = 100;
    let num_incremental_snapshots_per_full_snapshot =
        maximum_incremental_snapshot_archives_to_retain.get() * 2;
    let full_snapshot_interval =
        incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;

    // Create twice as many full snapshot archives as will be retained, so some must be purged
    let num_full_snapshots = maximum_full_snapshot_archives_to_retain
        .checked_mul(NonZeroUsize::new(2).unwrap())
        .unwrap()
        .get();
    for full_snapshot_slot in (starting_slot..)
        .step_by(full_snapshot_interval)
        .take(num_full_snapshots)
    {
        let snapshot_filename =
            format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
        let snapshot_path = full_snapshot_archives_dir.path().join(snapshot_filename);
        fs::File::create(snapshot_path).unwrap();

        // The first slot of the series is the full snapshot's own slot, hence the `skip(1)`
        for incremental_snapshot_slot in (full_snapshot_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(1)
        {
            let snapshot_filename = format!(
                "incremental-snapshot-{}-{}-{}.tar",
                full_snapshot_slot,
                incremental_snapshot_slot,
                Hash::default()
            );
            let snapshot_path = incremental_snapshot_archives_dir
                .path()
                .join(snapshot_filename);
            fs::File::create(snapshot_path).unwrap();
        }
    }

    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
    );

    // Ensure correct number of full snapshot archives are purged/retained
    let mut remaining_full_snapshot_archives =
        get_full_snapshot_archives(full_snapshot_archives_dir.path());
    assert_eq!(
        remaining_full_snapshot_archives.len(),
        maximum_full_snapshot_archives_to_retain.get(),
    );
    remaining_full_snapshot_archives.sort_unstable();
    let latest_full_snapshot_archive_slot =
        remaining_full_snapshot_archives.last().unwrap().slot();

    // Ensure correct number of incremental snapshot archives are purged/retained
    // For each additional full snapshot archive, one additional (the newest)
    // incremental snapshot archive is retained. This is accounted for by the
    // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
    let mut remaining_incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert_eq!(
        remaining_incremental_snapshot_archives.len(),
        maximum_incremental_snapshot_archives_to_retain
            .get()
            .saturating_add(
                maximum_full_snapshot_archives_to_retain
                    .get()
                    .saturating_sub(1)
            )
    );
    // Sort descending, so that `pop()` below yields the archives in ascending order
    remaining_incremental_snapshot_archives.sort_unstable();
    remaining_incremental_snapshot_archives.reverse();

    // Ensure there exists one incremental snapshot for all but the latest full snapshot
    for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
        let incremental_snapshot_archive =
            remaining_incremental_snapshot_archives.pop().unwrap();

        let expected_base_slot =
            latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
        // The retained one must be the newest incremental snapshot created for that base
        let expected_slot = expected_base_slot
            + (full_snapshot_interval - incremental_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
    }

    // Ensure all remaining incremental snapshots are only for the latest full snapshot
    for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
        assert_eq!(
            incremental_snapshot_archive.base_slot(),
            latest_full_snapshot_archive_slot
        );
    }

    // Ensure the remaining incremental snapshots are at the right slot
    let expected_remaining_incremental_snapshot_archive_slots =
        (latest_full_snapshot_archive_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(
                num_incremental_snapshots_per_full_snapshot
                    - maximum_incremental_snapshot_archives_to_retain.get(),
            )
            .collect::<HashSet<_>>();

    let actual_remaining_incremental_snapshot_archive_slots =
        remaining_incremental_snapshot_archives
            .iter()
            .map(|snapshot| snapshot.slot())
            .collect::<HashSet<_>>();
    assert_eq!(
        actual_remaining_incremental_snapshot_archive_slots,
        expected_remaining_incremental_snapshot_archive_slots
    );
}
3491
/// With no full snapshot archives on disk, purging must remove every incremental snapshot
/// archive, even when the retention limits are as large as possible.
#[test]
fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();

    // (base slot, slot) of each incremental snapshot archive to create
    let incremental_snapshot_slots = [
        (100, 120),
        (100, 140),
        (100, 160),
        (100, 180),
        (200, 220),
        (200, 240),
        (200, 260),
        (200, 280),
    ];
    for (base_slot, slot) in incremental_snapshot_slots {
        let snapshot_filename = format!(
            "incremental-snapshot-{}-{}-{}.tar",
            base_slot,
            slot,
            Hash::default()
        );
        let snapshot_path = incremental_snapshot_archives_dir
            .path()
            .join(snapshot_filename);
        fs::File::create(snapshot_path).unwrap();
    }

    let no_limit = NonZeroUsize::new(usize::MAX).unwrap();
    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        no_limit,
        no_limit,
    );

    let remaining_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert!(remaining_archives.is_empty());
}
3524
/// `get_snapshot_accounts_hardlink_dir()` must succeed for an appendvec path inside the
/// temporary accounts directory, and must fail with a `GetAccountPath` error when the same file
/// name is placed two directory levels higher.
#[test]
fn test_get_snapshot_accounts_hardlink_dir() {
    let slot: Slot = 1;
    let mut account_paths_set: HashSet<PathBuf> = HashSet::new();

    // <bank snapshots dir>/<slot>/<hardlinks dir>
    let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
    let hardlinks_dir = bank_snapshots_dir_tmp
        .path()
        .join(slot.to_string())
        .join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    fs::create_dir_all(&hardlinks_dir).unwrap();

    let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
    let appendvec_path = accounts_dir.join(format!("{slot}.0"));

    let good_result = get_snapshot_accounts_hardlink_dir(
        &appendvec_path,
        slot,
        &mut account_paths_set,
        &hardlinks_dir,
    );
    assert!(good_result.is_ok());

    // Same file name, but located in the grandparent of the directory used above
    let grandparent_dir = appendvec_path.parent().and_then(Path::parent).unwrap();
    let misplaced_appendvec_path = grandparent_dir.join(appendvec_path.file_name().unwrap());
    let bad_result = get_snapshot_accounts_hardlink_dir(
        &misplaced_appendvec_path,
        slot,
        &mut account_paths_set,
        hardlinks_dir,
    );

    assert_matches!(
        bad_result,
        Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
    );
}
3566
/// A slot written with `write_full_snapshot_slot_file()` must be read back unchanged by
/// `read_full_snapshot_slot_file()`.
#[test]
fn test_full_snapshot_slot_file_good() {
    let bank_snapshot_dir = TempDir::new().unwrap();
    let written_slot = 123_456_789;
    write_full_snapshot_slot_file(&bank_snapshot_dir, written_slot).unwrap();

    let round_tripped_slot = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
    assert_eq!(round_tripped_slot, written_slot);
}
3576
/// Reading a full snapshot slot file whose size is one byte smaller or one byte larger than a
/// `Slot` must fail with an "invalid full snapshot slot file size" error.
#[test]
fn test_full_snapshot_slot_file_bad() {
    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();

    for bad_size in [SLOT_SIZE - 1, SLOT_SIZE + 1] {
        let contents = vec![1u8; bad_size];
        let bank_snapshot_dir = TempDir::new().unwrap();
        let slot_file_path = bank_snapshot_dir
            .path()
            .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
        fs::write(slot_file_path, &contents).unwrap();

        let error_message = read_full_snapshot_slot_file(&bank_snapshot_dir)
            .unwrap_err()
            .to_string();
        assert!(error_message.starts_with("invalid full snapshot slot file size"));
    }
}
3596}