atlas_runtime/
snapshot_utils.rs

1use {
2    crate::{
3        bank::{BankFieldsToDeserialize, BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4        serde_snapshot::{
5            self, AccountsDbFields, ExtraFieldsToSerialize, SerializableAccountStorageEntry,
6            SnapshotAccountsDbFields, SnapshotBankFields, SnapshotStreams,
7        },
8        snapshot_archive_info::{
9            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
10            SnapshotArchiveInfoGetter,
11        },
12        snapshot_config::SnapshotConfig,
13        snapshot_hash::SnapshotHash,
14        snapshot_package::{SnapshotKind, SnapshotPackage},
15        snapshot_utils::snapshot_storage_rebuilder::{
16            get_slot_and_append_vec_id, SnapshotStorageRebuilder,
17        },
18    },
19    crossbeam_channel::{Receiver, Sender},
20    log::*,
21    regex::Regex,
22    solana_accounts_db::{
23        account_storage::{AccountStorageMap, AccountStoragesOrderer},
24        account_storage_reader::AccountStorageReader,
25        accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
26        accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
27        hardened_unpack::{self, UnpackError},
28        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
29    },
30    solana_clock::{Epoch, Slot},
31    solana_hash::Hash,
32    solana_measure::{measure::Measure, measure_time, measure_us},
33    std::{
34        cmp::Ordering,
35        collections::{HashMap, HashSet},
36        fmt, fs,
37        io::{self, BufRead, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
38        mem,
39        num::{NonZeroU64, NonZeroUsize},
40        ops::RangeInclusive,
41        path::{Path, PathBuf},
42        process::ExitStatus,
43        str::FromStr,
44        sync::{Arc, LazyLock},
45        thread::{Builder, JoinHandle},
46    },
47    tar::{self, Archive},
48    tempfile::TempDir,
49    thiserror::Error,
50};
51#[cfg(feature = "dev-context-only-utils")]
52use {
53    hardened_unpack::UnpackedAppendVecMap,
54    solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
55};
56
57mod archive_format;
58mod snapshot_interval;
59pub mod snapshot_storage_rebuilder;
60pub use {archive_format::*, snapshot_interval::SnapshotInterval};
61
// ---- Names of files and directories inside a bank snapshot directory ----

/// File name of the serialized status cache inside a bank snapshot dir.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// File name of the snapshot version file inside a bank snapshot dir.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Marker file; its presence means the bank snapshot dir has been completely written.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Marker file; its presence means the bank snapshot's account storages were flushed.
pub const SNAPSHOT_STORAGES_FLUSHED_FILENAME: &str = "storages_flushed";
/// Subdirectory of a bank snapshot dir holding symlinks to the account snapshot hardlink dirs.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// File recording, as little-endian bytes, the full snapshot slot of a bank snapshot.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// When a snapshot is taken of a bank, the state is serialized under this directory.
/// Specifically in `BANK_SNAPSHOTS_DIR/SLOT/`.
/// This is also where the bank state is located in the snapshot archive.
pub const BANK_SNAPSHOTS_DIR: &str = "snapshots";

// ---- Size limits ----

/// Upper bound for snapshot data files: 32 GiB.
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 << 30;
/// Upper bound for the snapshot version file, in bytes.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;

// ---- Version strings and file naming ----

const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Name prefix of the temporary staging entries created while archiving a snapshot package.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// Extension that marks a bank snapshot file as `BankSnapshotKind::Pre`.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";

// ---- Archive interval and retention defaults ----

/// Default number of slots between full snapshot archives.
pub const DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: NonZeroU64 =
    NonZeroU64::new(100_000).unwrap();
/// Default number of slots between incremental snapshot archives.
pub const DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: NonZeroU64 =
    NonZeroU64::new(100).unwrap();
/// Default number of full snapshot archives to retain.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(2).unwrap();
/// Default number of incremental snapshot archives to retain.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(4).unwrap();

// ---- Archive file name patterns ----

/// Matches `snapshot-<slot>-<hash>.<ext>` where `<ext>` is `tar.zst` or `tar.lz4`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str =
    r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar\.zst|tar\.lz4)$";
/// Matches `incremental-snapshot-<base>-<slot>-<hash>.<ext>` where `<ext>` is `tar.zst` or
/// `tar.lz4`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar\.zst|tar\.lz4)$";

// ---- Archive I/O tuning ----

/// Read buffer size (128 MiB).  Large enough to schedule many reads at once, so that transient
/// disk access delays do not stall decompression (unless read bandwidth is saturated).
const MAX_SNAPSHOT_READER_BUF_SIZE: u64 = 128 << 20;
/// Interleaving of small and large files in the snapshot tar, biased towards small files
/// (4 small per 1 large).  During unpacking, large writes are mixed with file metadata
/// operations, and towards the end of the archive (where sizes even out) each write is more
/// than 256KiB per file.
const INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO: (usize, usize) = (4, 1);
97
98#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
99pub enum SnapshotVersion {
100    #[default]
101    V1_2_0,
102}
103
104impl fmt::Display for SnapshotVersion {
105    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106        f.write_str(From::from(*self))
107    }
108}
109
110impl From<SnapshotVersion> for &'static str {
111    fn from(snapshot_version: SnapshotVersion) -> &'static str {
112        match snapshot_version {
113            SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
114        }
115    }
116}
117
118impl FromStr for SnapshotVersion {
119    type Err = &'static str;
120
121    fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
122        // Remove leading 'v' or 'V' from slice
123        let version_string = if version_string
124            .get(..1)
125            .is_some_and(|s| s.eq_ignore_ascii_case("v"))
126        {
127            &version_string[1..]
128        } else {
129            version_string
130        };
131        match version_string {
132            VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
133            _ => Err("unsupported snapshot version"),
134        }
135    }
136}
137
138impl SnapshotVersion {
139    pub fn as_str(self) -> &'static str {
140        <&str as From<Self>>::from(self)
141    }
142}
143
144/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
145/// the kind of the snapshot.
146#[derive(PartialEq, Eq, Debug)]
147pub struct BankSnapshotInfo {
148    /// Slot of the bank
149    pub slot: Slot,
150    /// Snapshot kind
151    pub snapshot_kind: BankSnapshotKind,
152    /// Path to the bank snapshot directory
153    pub snapshot_dir: PathBuf,
154    /// Snapshot version
155    pub snapshot_version: SnapshotVersion,
156}
157
158impl PartialOrd for BankSnapshotInfo {
159    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
160        Some(self.cmp(other))
161    }
162}
163
164// Order BankSnapshotInfo by slot (ascending), which practically is sorting chronologically
165impl Ord for BankSnapshotInfo {
166    fn cmp(&self, other: &Self) -> Ordering {
167        self.slot.cmp(&other.slot)
168    }
169}
170
171impl BankSnapshotInfo {
172    pub fn new_from_dir(
173        bank_snapshots_dir: impl AsRef<Path>,
174        slot: Slot,
175    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
176        // check this directory to see if there is a BankSnapshotPre and/or
177        // BankSnapshotPost file
178        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
179
180        if !bank_snapshot_dir.is_dir() {
181            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
182                bank_snapshot_dir,
183            ));
184        }
185
186        // Among the files checks, the completion flag file check should be done first to avoid the later
187        // I/O errors.
188
189        // There is a time window from the slot directory being created, and the content being completely
190        // filled.  Check the completion to avoid using a highest found slot directory with missing content.
191        if !is_bank_snapshot_complete(&bank_snapshot_dir) {
192            return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
193        }
194
195        let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
196        if !status_cache_file.is_file() {
197            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
198                status_cache_file,
199            ));
200        }
201
202        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
203        let version_str = snapshot_version_from_file(&version_path).or(Err(
204            SnapshotNewFromDirError::MissingVersionFile(version_path),
205        ))?;
206        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
207            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
208
209        let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
210        let bank_snapshot_pre_path =
211            bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
212
213        // NOTE: It is important that checking for "Pre" happens before "Post.
214        //
215        // Consider the scenario where AccountsHashVerifier is actively processing an
216        // AccountsPackage for a snapshot/slot; if AHV is in the middle of reserializing the
217        // bank snapshot file (writing the new "Post" file), and then the process dies,
218        // there will be an incomplete "Post" file on disk.  We do not want only the existence of
219        // this "Post" file to be sufficient for deciding the snapshot kind as "Post".  More so,
220        // "Post" *requires* the *absence* of a "Pre" file.
221        let snapshot_kind = if bank_snapshot_pre_path.is_file() {
222            BankSnapshotKind::Pre
223        } else if bank_snapshot_post_path.is_file() {
224            BankSnapshotKind::Post
225        } else {
226            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
227                bank_snapshot_dir,
228            ));
229        };
230
231        Ok(BankSnapshotInfo {
232            slot,
233            snapshot_kind,
234            snapshot_dir: bank_snapshot_dir,
235            snapshot_version,
236        })
237    }
238
239    pub fn snapshot_path(&self) -> PathBuf {
240        let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
241
242        let ext = match self.snapshot_kind {
243            BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
244            BankSnapshotKind::Post => "",
245        };
246        bank_snapshot_path.set_extension(ext);
247
248        bank_snapshot_path
249    }
250}
251
/// Bank snapshots traditionally had their accounts hash calculated prior to serialization.  Since
/// the hash calculation takes a long time, an optimization has been put in to offload the accounts
/// hash calculation.  The bank serialization format has not changed, so we need another way to
/// identify if a bank snapshot contains the calculated accounts hash or not.
///
/// When a bank snapshot is first taken, it does not have the calculated accounts hash.  It is said
/// that this bank snapshot is "pre" accounts hash.  Later, when the accounts hash is calculated,
/// the bank snapshot is re-serialized, and is now "post" accounts hash.
///
/// On disk, the kind is encoded in the bank snapshot file name. A "pre" file carries the
/// `BANK_SNAPSHOT_PRE_FILENAME_EXTENSION` extension and a "post" file has no extension (see
/// `BankSnapshotInfo::snapshot_path()`).
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BankSnapshotKind {
    /// This bank snapshot has *not* yet had its accounts hash calculated
    Pre,
    /// This bank snapshot *has* had its accounts hash calculated
    Post,
}
267
/// When constructing a bank from a snapshot, traditionally the snapshot was from a snapshot
/// archive.  Now, the snapshot can be from a snapshot directory, or from a snapshot archive.  This
/// is the flag to indicate which.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// Build from the snapshot archive
    Archive,
    /// Build directly from the bank snapshot directory
    Dir,
}
278
/// Helper type when rebuilding from snapshots.  Designed to handle when rebuilding from just a
/// full snapshot, or from both a full snapshot and an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Path to the full snapshot's root file
    pub full_snapshot_root_file_path: PathBuf,
    /// Path to the incremental snapshot's root file; `None` when rebuilding from just a full
    /// snapshot
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
286
/// Helper type to bundle up the results from `unarchive_snapshot()`
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    // Never read.  The field exists only so that the temporary unpack directory stays alive as
    // long as this value does (`TempDir` removes its directory when dropped), hence the allow.
    #[allow(dead_code)]
    unpack_dir: TempDir,
    /// Account storages of the unarchived snapshot
    pub storage: AccountStorageMap,
    /// Deserialized bank fields
    pub bank_fields: BankFieldsToDeserialize,
    /// Deserialized accounts-db fields
    pub accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
    /// Where the snapshot was unpacked, and its snapshot version
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Timing of the untar step
    pub measure_untar: Measure,
}
298
/// Helper type to bundle up the results from `verify_and_unarchive_snapshots()`.
///
/// `full_*` fields come from the full snapshot archive and `incremental_*` fields from the
/// incremental one.  The `Option` fields are presumably `None` when no incremental snapshot was
/// unarchived (TODO confirm against the producer).
#[derive(Debug)]
pub struct UnarchivedSnapshots {
    pub full_storage: AccountStorageMap,
    pub incremental_storage: Option<AccountStorageMap>,
    pub bank_fields: SnapshotBankFields,
    pub accounts_db_fields: SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
    pub full_unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    pub incremental_unpacked_snapshots_dir_and_version: Option<UnpackedSnapshotsDirAndVersion>,
    pub full_measure_untar: Measure,
    pub incremental_measure_untar: Option<Measure>,
    /// Next accounts file id to use when reconstructing accounts
    pub next_append_vec_id: AtomicAccountsFileId,
}
312
/// Guard type that keeps the unpack directories of snapshots alive.
/// Once dropped, the unpack directories are removed.
// The fields are never read; they exist only so that dropping the guard drops (and thereby
// removes) the `TempDir`s, hence `dead_code` is allowed.
#[allow(dead_code)]
#[derive(Debug)]
pub struct UnarchivedSnapshotsGuard {
    /// Unpack directory of the full snapshot
    full_unpack_dir: TempDir,
    /// Unpack directory of the incremental snapshot, if there is one
    incremental_unpack_dir: Option<TempDir>,
}
/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    /// Path of the unpacked snapshots dir
    pub unpacked_snapshots_dir: PathBuf,
    /// Version of the unpacked snapshot
    pub snapshot_version: SnapshotVersion,
}
327
/// Helper type for passing around account storage map and next append vec id
/// for reconstructing accounts from a snapshot
pub(crate) struct StorageAndNextAccountsFileId {
    /// Account storages used to reconstruct the accounts
    pub storage: AccountStorageMap,
    /// Next accounts file id to hand out (presumably past every id already in `storage` — TODO
    /// confirm)
    pub next_append_vec_id: AtomicAccountsFileId,
}
334
/// Top-level error type of the snapshot utilities in this module.
///
/// Many variants wrap a more specific error (I/O, accounts file, (de)serialization, unpacking,
/// slot-delta / epoch-stakes verification, bank snapshot creation, archiving).  The
/// [`Result`] alias in this module uses it as the error type.
#[derive(Error, Debug)]
// NOTE(review): presumably allowed because a few variants are much larger than the rest; boxing
// them would shrink every `Result<T, SnapshotError>` — confirm before changing.
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    /// The archiving process finished with a failing exit status.
    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    /// An I/O error tagged with a static label (`.1`) naming where it happened.  The I/O error
    /// is only rendered in the message; it is not marked as the error `source`.
    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    /// `.0` is the full snapshot slot, `.1` the incremental snapshot's base slot.
    #[error(
        "snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot \
         ({1}) do not match"
    )]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    /// `.0` is the deserialized bank's slot, `.1` the snapshot archive's slot.
    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
    MismatchedSlot(Slot, Slot),

    /// `.0` is the deserialized bank's hash, `.1` the snapshot archive's hash.
    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
    MismatchedHash(SnapshotHash, SnapshotHash),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    /// Failure in `add_bank_snapshot()`; `.1` is the slot being snapshotted.
    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}
416
/// Errors that can happen in `BankSnapshotInfo::new_from_dir()`
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    /// The bank snapshot directory for the slot does not exist (or is not a directory)
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    /// The version file could not be read (not necessarily because it is absent)
    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    /// The version file's contents are not a supported `SnapshotVersion`
    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    /// The directory has not been marked complete
    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    /// Neither a "pre" nor a "post" bank snapshot file was found in the directory
    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}
437
/// Result alias for the snapshot utilities; the error type is [`SnapshotError`].
pub type Result<T> = std::result::Result<T, SnapshotError>;
439
/// Errors that can happen in `verify_slot_deltas()`
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    /// `.0` is the number of entries found, `.1` the maximum allowed
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    /// `.0` is the offending slot, `.1` the bank slot it exceeds
    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("snapshot slot history is invalid: {0}")]
    VerifySlotHistory(#[from] VerifySlotHistoryError),
}
464
/// Errors that can happen in `verify_slot_history()`
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotHistoryError {
    /// The slot history's newest slot does not match the snapshot
    #[error("newest slot does not match snapshot")]
    InvalidNewestSlot,

    #[error("invalid number of entries")]
    InvalidNumEntries,
}
474
/// Errors that can happen in `verify_epoch_stakes()`
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    /// `.0` is the offending epoch, `.1` the maximum allowed epoch
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    /// `.0` is the epoch without stakes, `.1` the range of epochs that must have stakes
    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}
484
/// Errors that can happen in `add_bank_snapshot()`
///
/// Variants carrying a `PathBuf` name the file or directory involved.  The wrapped error is
/// exposed as the error `source`.
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    #[error("failed to mark snapshot storages as 'flushed': {0}")]
    MarkStoragesFlushed(#[source] IoError),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    // Boxed because `SnapshotError` can itself contain an `AddBankSnapshotError`; a recursive
    // type needs indirection.
    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': {0}")]
    MarkSnapshotComplete(#[source] IoError),
}
515
/// Errors that can happen in `archive_snapshot_package()`
///
/// Each variant names the archiving step that failed.  Extra `PathBuf` fields identify the
/// paths involved, and the wrapped I/O error is exposed as the error `source`.
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    // For the symlink variants, `.1` is the symlink source and `.2` its destination.
    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    // `.1` is where the archive was, `.2` where it was being moved to.
    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create account storage reader '{1}': {0}")]
    AccountStorageReaderError(#[source] IoError, PathBuf),
}
570
/// Errors that can happen in `hard_link_storages_to_snapshot()`
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    /// `.1` is the existing storage file, `.2` the hard link that could not be created.
    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}
583
/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    // A field named `source` is treated by `thiserror` as the error source automatically.
    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}
600
601/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
602/// from <account_path>/run taken at snapshot <slot> time.  They are referenced by the symlinks from the
603/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We observed that sometimes the bank snapshot dir
604/// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
605/// or some legacy code not using the symlinks to clean up the account snapshot hardlink directories.
606/// This function cleans up any account snapshot directories that are no longer referenced by the bank
607/// snapshot dirs, to ensure proper snapshot operations.
608pub fn clean_orphaned_account_snapshot_dirs(
609    bank_snapshots_dir: impl AsRef<Path>,
610    account_snapshot_paths: &[PathBuf],
611) -> io::Result<()> {
612    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
613    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
614    let mut account_snapshot_dirs_referenced = HashSet::new();
615    let snapshots = get_bank_snapshots(bank_snapshots_dir);
616    for snapshot in snapshots {
617        let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
618        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
619        let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
620            // The bank snapshot may not have a hard links dir with the storages.
621            // This is fine, and happens for bank snapshots we do *not* fastboot from.
622            // In this case, log it and go to the next bank snapshot.
623            debug!(
624                "failed to read account hardlinks dir '{}'",
625                account_hardlinks_dir.display(),
626            );
627            continue;
628        };
629        for entry in read_dir {
630            let path = entry?.path();
631            let target = fs::read_link(&path).map_err(|err| {
632                IoError::other(format!(
633                    "failed to read symlink '{}': {err}",
634                    path.display(),
635                ))
636            })?;
637            account_snapshot_dirs_referenced.insert(target);
638        }
639    }
640
641    // loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
642    for account_snapshot_path in account_snapshot_paths {
643        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
644            IoError::other(format!(
645                "failed to read account snapshot dir '{}': {err}",
646                account_snapshot_path.display(),
647            ))
648        })?;
649        for entry in read_dir {
650            let path = entry?.path();
651            if !account_snapshot_dirs_referenced.contains(&path) {
652                info!(
653                    "Removing orphaned account snapshot hardlink directory '{}'...",
654                    path.display()
655                );
656                move_and_async_delete_path(&path);
657            }
658        }
659    }
660
661    Ok(())
662}
663
664/// Purges incomplete bank snapshots
665pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
666    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
667        // If we cannot read the bank snapshots dir, then there's nothing to do
668        return;
669    };
670
671    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
672
673    let incomplete_dirs: Vec<_> = read_dir_iter
674        .filter_map(|entry| entry.ok())
675        .map(|entry| entry.path())
676        .filter(|path| path.is_dir())
677        .filter(is_incomplete)
678        .collect();
679
680    // attempt to purge all the incomplete directories; do not exit early
681    for incomplete_dir in incomplete_dirs {
682        let result = purge_bank_snapshot(&incomplete_dir);
683        match result {
684            Ok(_) => info!(
685                "Purged incomplete snapshot dir: {}",
686                incomplete_dir.display()
687            ),
688            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
689        }
690    }
691}
692
693/// Is the bank snapshot complete?
694fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
695    let state_complete_path = bank_snapshot_dir
696        .as_ref()
697        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
698    state_complete_path.is_file()
699}
700
701/// Marks the bank snapshot as complete
702fn write_snapshot_state_complete_file(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<()> {
703    let state_complete_path = bank_snapshot_dir
704        .as_ref()
705        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
706    fs::File::create(&state_complete_path).map_err(|err| {
707        IoError::other(format!(
708            "failed to create file '{}': {err}",
709            state_complete_path.display(),
710        ))
711    })?;
712    Ok(())
713}
714
715/// Writes the full snapshot slot file into the bank snapshot dir
716pub fn write_full_snapshot_slot_file(
717    bank_snapshot_dir: impl AsRef<Path>,
718    full_snapshot_slot: Slot,
719) -> io::Result<()> {
720    let full_snapshot_slot_path = bank_snapshot_dir
721        .as_ref()
722        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
723    fs::write(
724        &full_snapshot_slot_path,
725        Slot::to_le_bytes(full_snapshot_slot),
726    )
727    .map_err(|err| {
728        IoError::other(format!(
729            "failed to write full snapshot slot file '{}': {err}",
730            full_snapshot_slot_path.display(),
731        ))
732    })
733}
734
735// Reads the full snapshot slot file from the bank snapshot dir
736pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<Slot> {
737    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
738    let full_snapshot_slot_path = bank_snapshot_dir
739        .as_ref()
740        .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
741    let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
742    if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
743        let error_message = format!(
744            "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
745            full_snapshot_slot_path.display(),
746            full_snapshot_slot_file_metadata.len(),
747            SLOT_SIZE,
748        );
749        return Err(IoError::other(error_message));
750    }
751    let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
752    let mut buffer = [0; SLOT_SIZE];
753    full_snapshot_slot_file.read_exact(&mut buffer)?;
754    let slot = Slot::from_le_bytes(buffer);
755    Ok(slot)
756}
757
758/// Writes the 'snapshot storages have been flushed' file to the bank snapshot dir
759pub fn write_storages_flushed_file(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<()> {
760    let flushed_storages_path = bank_snapshot_dir
761        .as_ref()
762        .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
763    fs::File::create(&flushed_storages_path).map_err(|err| {
764        IoError::other(format!(
765            "failed to create file '{}': {err}",
766            flushed_storages_path.display(),
767        ))
768    })?;
769    Ok(())
770}
771
772/// Were the snapshot storages flushed in this bank snapshot?
773fn are_bank_snapshot_storages_flushed(bank_snapshot_dir: impl AsRef<Path>) -> bool {
774    let flushed_storages = bank_snapshot_dir
775        .as_ref()
776        .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
777    flushed_storages.is_file()
778}
779
780/// Gets the highest, loadable, bank snapshot
781///
782/// The highest bank snapshot is the one with the highest slot.
783/// To be loadable, the bank snapshot must be a BankSnapshotKind::Post.
784/// And if we're generating snapshots (e.g. running a normal validator), then
785/// the full snapshot file's slot must match the highest full snapshot archive's.
786/// Lastly, the account storages must have been flushed to be loadable.
787pub fn get_highest_loadable_bank_snapshot(
788    snapshot_config: &SnapshotConfig,
789) -> Option<BankSnapshotInfo> {
790    let highest_bank_snapshot =
791        get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
792
793    // If we're *not* generating snapshots, e.g. running ledger-tool, then we *can* load
794    // this bank snapshot, and we do not need to check for anything else.
795    if !snapshot_config.should_generate_snapshots() {
796        return Some(highest_bank_snapshot);
797    }
798
799    // Otherwise, the bank snapshot's full snapshot slot *must* be the same as
800    // the highest full snapshot archive's slot.
801    let highest_full_snapshot_archive_slot =
802        get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
803    let full_snapshot_file_slot =
804        read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
805    let are_storages_flushed =
806        are_bank_snapshot_storages_flushed(&highest_bank_snapshot.snapshot_dir);
807    (are_storages_flushed && (full_snapshot_file_slot == highest_full_snapshot_archive_slot))
808        .then_some(highest_bank_snapshot)
809}
810
811/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
812/// directory won't be cleaned up.  Call this function to clean them up.
813pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
814    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
815        for entry in entries.flatten() {
816            if entry
817                .file_name()
818                .to_str()
819                .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
820                .unwrap_or(false)
821            {
822                let path = entry.path();
823                let result = if path.is_dir() {
824                    fs::remove_dir_all(&path)
825                } else {
826                    fs::remove_file(&path)
827                };
828                if let Err(err) = result {
829                    warn!(
830                        "Failed to remove temporary snapshot archive '{}': {err}",
831                        path.display(),
832                    );
833                }
834            }
835        }
836    }
837}
838
839/// Serializes and archives a snapshot package
840pub fn serialize_and_archive_snapshot_package(
841    snapshot_package: SnapshotPackage,
842    snapshot_config: &SnapshotConfig,
843    should_flush_and_hard_link_storages: bool,
844) -> Result<SnapshotArchiveInfo> {
845    let SnapshotPackage {
846        snapshot_kind,
847        slot: snapshot_slot,
848        block_height,
849        hash: snapshot_hash,
850        mut snapshot_storages,
851        status_cache_slot_deltas,
852        bank_fields_to_serialize,
853        bank_hash_stats,
854        write_version,
855        enqueued: _,
856    } = snapshot_package;
857
858    let bank_snapshot_info = serialize_snapshot(
859        &snapshot_config.bank_snapshots_dir,
860        snapshot_config.snapshot_version,
861        snapshot_storages.as_slice(),
862        status_cache_slot_deltas.as_slice(),
863        bank_fields_to_serialize,
864        bank_hash_stats,
865        write_version,
866        should_flush_and_hard_link_storages,
867    )?;
868
869    // now write the full snapshot slot file after serializing so this bank snapshot is loadable
870    let full_snapshot_archive_slot = match snapshot_kind {
871        SnapshotKind::FullSnapshot => snapshot_slot,
872        SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
873    };
874    write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
875        .map_err(|err| {
876            IoError::other(format!(
877                "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, \
878                 kind {snapshot_kind:?}: {err}",
879            ))
880        })?;
881
882    let snapshot_archive_path = match snapshot_package.snapshot_kind {
883        SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
884            &snapshot_config.full_snapshot_archives_dir,
885            snapshot_package.slot,
886            &snapshot_package.hash,
887            snapshot_config.archive_format,
888        ),
889        SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
890            // After the snapshot has been serialized, it is now safe (and required) to prune all
891            // the storages that are *not* to be archived for this incremental snapshot.
892            snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
893            build_incremental_snapshot_archive_path(
894                &snapshot_config.incremental_snapshot_archives_dir,
895                incremental_snapshot_base_slot,
896                snapshot_package.slot,
897                &snapshot_package.hash,
898                snapshot_config.archive_format,
899            )
900        }
901    };
902
903    let snapshot_archive_info = archive_snapshot(
904        snapshot_kind,
905        snapshot_slot,
906        snapshot_hash,
907        snapshot_storages.as_slice(),
908        &bank_snapshot_info.snapshot_dir,
909        snapshot_archive_path,
910        snapshot_config.archive_format,
911    )?;
912
913    Ok(snapshot_archive_info)
914}
915
/// Serializes a snapshot into `bank_snapshots_dir`
///
/// Creates `bank_snapshots_dir/<slot>/` and fails if it already exists. Writes into it, in order:
/// - if `should_flush_and_hard_link_storages`: every storage in `snapshot_storages` is flushed,
///   `hard_link_storages_to_snapshot()` is run, and the 'storages flushed' marker is written
/// - the serialized bank (`bank_fields`, `bank_hash_stats`, the storages and extra fields), at
///   `bank_snapshots_dir/<slot>/<slot>`
/// - the status cache (`slot_deltas`), at `SNAPSHOT_STATUS_CACHE_FILENAME`
/// - the snapshot version file, at `SNAPSHOT_VERSION_FILENAME`
/// - the 'state complete' marker, written last so the directory is only usable once everything
///   else succeeded
///
/// Sizes and timings are reported via the `snapshot_bank` datapoint.
///
/// # Errors
/// Any failure is returned as `SnapshotError::AddBankSnapshot(_, slot)`. NOTE(review): the
/// partially written directory is not removed on failure here.
///
/// NOTE(review): the returned info is tagged `BankSnapshotKind::Pre`, while the bank file name
/// comes from `get_snapshot_file_name(slot)`. Confirm this matches what
/// `BankSnapshotInfo::new_from_dir` would detect for this directory.
#[allow(clippy::too_many_arguments)]
fn serialize_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    snapshot_version: SnapshotVersion,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    slot_deltas: &[BankSlotDelta],
    mut bank_fields: BankFieldsToSerialize,
    bank_hash_stats: BankHashStats,
    write_version: u64,
    should_flush_and_hard_link_storages: bool,
) -> Result<BankSnapshotInfo> {
    let slot = bank_fields.slot;

    // this lambda function is to facilitate converting between
    // the AddBankSnapshotError and SnapshotError types
    let do_serialize_snapshot = || {
        let mut measure_everything = Measure::start("");
        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
        // Never overwrite an existing bank snapshot for this slot.
        if bank_snapshot_dir.exists() {
            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
                bank_snapshot_dir,
            ));
        }
        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
        })?;

        // the bank snapshot is stored as bank_snapshots_dir/slot/slot
        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
        info!(
            "Creating bank snapshot for slot {slot} at '{}'",
            bank_snapshot_path.display(),
        );

        // Optionally flush + hard-link the storages, then mark them flushed.  Both timings are
        // `None` when this step is skipped (`Option::unzip` splits the pair).
        let (flush_storages_us, hard_link_storages_us) = if should_flush_and_hard_link_storages {
            let flush_measure = Measure::start("");
            for storage in snapshot_storages {
                storage.flush().map_err(|err| {
                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
                })?;
            }
            let flush_us = flush_measure.end_as_us();
            let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
                &bank_snapshot_dir,
                slot,
                snapshot_storages
            )
            .map_err(AddBankSnapshotError::HardLinkStorages)?);
            // Only written after flushing and hard-linking both succeeded.
            write_storages_flushed_file(&bank_snapshot_dir)
                .map_err(AddBankSnapshotError::MarkStoragesFlushed)?;
            Some((flush_us, hard_link_us))
        } else {
            None
        }
        .unzip();

        // `bank_fields` is moved into this closure.  `versioned_epoch_stakes` is taken out of it
        // (leaving the default behind) so it can be serialized as part of the extra fields.
        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
            let extra_fields = ExtraFieldsToSerialize {
                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
                obsolete_incremental_snapshot_persistence: None,
                obsolete_epoch_accounts_hash: None,
                versioned_epoch_stakes,
                accounts_lt_hash: Some(bank_fields.accounts_lt_hash.clone().into()),
            };
            serde_snapshot::serialize_bank_snapshot_into(
                stream,
                bank_fields,
                bank_hash_stats,
                &get_storages_to_serialize(snapshot_storages),
                extra_fields,
                write_version,
            )?;
            Ok(())
        };
        // Writes the bank file (size-capped by `serialize_snapshot_data_file`) and keeps its size.
        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
            "bank serialize"
        );

        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
            serde_snapshot::serialize_status_cache(slot_deltas, &status_cache_path)
                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
        );

        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
        let (_, write_version_file_us) = measure_us!(fs::write(
            &version_path,
            snapshot_version.as_str().as_bytes(),
        )
        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);

        // Mark this directory complete so it can be used.  Readers should check this flag before
        // selecting the directory for deserialization.
        let (_, write_state_complete_file_us) = measure_us!({
            write_snapshot_state_complete_file(&bank_snapshot_dir)
                .map_err(AddBankSnapshotError::MarkSnapshotComplete)?
        });

        measure_everything.stop();

        // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
        datapoint_info!(
            "snapshot_bank",
            ("slot", slot, i64),
            ("bank_size", bank_snapshot_consumed_size, i64),
            ("status_cache_size", status_cache_consumed_size, i64),
            ("flush_storages_us", flush_storages_us, Option<i64>),
            ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
            ("bank_serialize_us", bank_serialize.as_us(), i64),
            ("status_cache_serialize_us", status_cache_serialize_us, i64),
            ("write_version_file_us", write_version_file_us, i64),
            (
                "write_state_complete_file_us",
                write_state_complete_file_us,
                i64
            ),
            ("total_us", measure_everything.as_us(), i64),
        );

        info!(
            "{} for slot {} at {}",
            bank_serialize,
            slot,
            bank_snapshot_path.display(),
        );

        Ok(BankSnapshotInfo {
            slot,
            snapshot_kind: BankSnapshotKind::Pre,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
        })
    };

    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
}
1055
/// Archives a snapshot into `archive_path`
///
/// A staging directory (a `tempfile` temp dir named `<TMP_SNAPSHOT_ARCHIVE_PREFIX><slot>-...`)
/// is built next to `archive_path`, containing symlinks into `bank_snapshot_dir`:
/// - `<SNAPSHOT_VERSION_FILENAME>`
/// - `<BANK_SNAPSHOTS_DIR>/<SNAPSHOT_STATUS_CACHE_FILENAME>`
/// - `<BANK_SNAPSHOTS_DIR>/<slot>/<slot>`
///
/// The tarball is written to `<TMP_SNAPSHOT_ARCHIVE_PREFIX><slot>.<ext>` in the same directory.
/// It contains the version file, then the snapshots dir, then each account storage under
/// `accounts/` (read through `AccountStorageReader` as of `snapshot_slot`). The tar stream is
/// compressed per `archive_format` and finally renamed to `archive_path`.
///
/// NOTE(review): archiving the symlinked files relies on `tar::Builder` following symlinks
/// (its default); confirm if that setting is ever changed.
///
/// # Panics
/// If `archive_path` has no parent directory.
fn archive_snapshot(
    snapshot_kind: SnapshotKind,
    snapshot_slot: Slot,
    snapshot_hash: SnapshotHash,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    bank_snapshot_dir: impl AsRef<Path>,
    archive_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
) -> Result<SnapshotArchiveInfo> {
    use ArchiveSnapshotPackageError as E;
    const ACCOUNTS_DIR: &str = "accounts";
    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    let tar_dir = archive_path
        .as_ref()
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

    // Create the staging directories.  The temp dir lives in `tar_dir` and is deleted when
    // `staging_dir` is dropped at the end of this function.
    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
    let staging_dir = tempfile::Builder::new()
        .prefix(&format!("{staging_dir_prefix}{snapshot_slot}-"))
        .tempdir_in(tar_dir)
        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
    let staging_snapshots_dir = staging_dir.path().join(BANK_SNAPSHOTS_DIR);

    let slot_str = snapshot_slot.to_string();
    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
    // Creates staging snapshots/<slot>/
    fs::create_dir_all(&staging_snapshot_dir)
        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

    // To be a source for symlinking and archiving, the path needs to be an absolute path
    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
    })?;
    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
    let src_snapshot_file = src_snapshot_dir.join(slot_str);
    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
    // like in the snapshot dir.
    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    symlink::symlink_file(&src_status_cache, &staging_status_cache)
        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

    // The bank snapshot has the version file, so symlink it to the correct staging path
    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
    })?;

    // Tar the staging directory into the archive at `staging_archive_path`
    let staging_archive_path = tar_dir.join(format!(
        "{}{}.{}",
        staging_dir_prefix,
        snapshot_slot,
        archive_format.extension(),
    ));

    // Scoped so the archive file and encoder are closed before the metadata query and rename.
    {
        let archive_file = fs::File::create(&staging_archive_path)
            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;

        // Writes the whole tar stream into `encoder`; shared by every compression format below.
        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
            let mut archive = tar::Builder::new(encoder);
            // Disable sparse file handling.  This seems to be the root cause of an issue when
            // upgrading v2.0 to v2.1, and the tar crate from 0.4.41 to 0.4.42.
            // Since the tarball will still go through compression (zstd/etc) afterwards, disabling
            // sparse handling in the tar itself should be fine.
            //
            // Likely introduced in [^1].  Tracking resolution in [^2].
            // [^1] https://github.com/alexcrichton/tar-rs/pull/375
            // [^2] https://github.com/alexcrichton/tar-rs/issues/403
            archive.sparse(false);
            // Serialize the version and snapshots files before accounts so we can quickly determine the version
            // and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
            archive
                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
                .map_err(E::ArchiveVersionFile)?;
            archive
                .append_dir_all(BANK_SNAPSHOTS_DIR, &staging_snapshots_dir)
                .map_err(E::ArchiveSnapshotsDir)?;

            // Storages are appended in the order produced by `AccountStoragesOrderer`, which
            // interleaves them per INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO.
            let storages_orderer = AccountStoragesOrderer::with_small_to_large_ratio(
                snapshot_storages,
                INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO,
            );
            for storage in storages_orderer.iter() {
                // Archive entries are named by (slot, id), not by the on-disk file name.
                let path_in_archive = Path::new(ACCOUNTS_DIR)
                    .join(AccountsFile::file_name(storage.slot(), storage.id()));

                // The entry size comes from the reader (not the file on disk), so the header is
                // built by hand rather than via `append_path`.
                let reader =
                    AccountStorageReader::new(storage, Some(snapshot_slot)).map_err(|err| {
                        E::AccountStorageReaderError(err, storage.path().to_path_buf())
                    })?;
                let mut header = tar::Header::new_gnu();
                header.set_path(path_in_archive).map_err(|err| {
                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                })?;
                header.set_size(reader.len() as u64);
                header.set_cksum();
                archive.append(&header, reader).map_err(|err| {
                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                })?;
            }

            archive.into_inner().map_err(E::FinishArchive)?;
            Ok(())
        };

        match archive_format {
            ArchiveFormat::TarZstd { config } => {
                let mut encoder =
                    zstd::stream::Encoder::new(archive_file, config.compression_level)
                        .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarLz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(1)
                    .build(archive_file)
                    .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                let (_output, result) = encoder.finish();
                result.map_err(E::FinishEncoder)?;
            }
        };
    }

    // Atomically move the archive into position for other validators to find
    let metadata = fs::metadata(&staging_archive_path)
        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
    let archive_path = archive_path.as_ref().to_path_buf();
    fs::rename(&staging_archive_path, &archive_path)
        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;

    timer.stop();
    info!(
        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
        archive_path.display(),
        snapshot_slot,
        timer.as_ms(),
        metadata.len()
    );

    datapoint_info!(
        "archive-snapshot-package",
        ("slot", snapshot_slot, i64),
        ("archive_format", archive_format.to_string(), String),
        ("duration_ms", timer.as_ms(), i64),
        (
            if snapshot_kind.is_full_snapshot() {
                "full-snapshot-archive-size"
            } else {
                "incremental-snapshot-archive-size"
            },
            metadata.len(),
            i64
        ),
    );
    Ok(SnapshotArchiveInfo {
        path: archive_path,
        slot: snapshot_slot,
        hash: snapshot_hash,
        archive_format,
    })
}
1232
1233/// Get the bank snapshots in a directory
1234pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1235    let mut bank_snapshots = Vec::default();
1236    match fs::read_dir(&bank_snapshots_dir) {
1237        Err(err) => {
1238            info!(
1239                "Unable to read bank snapshots directory '{}': {err}",
1240                bank_snapshots_dir.as_ref().display(),
1241            );
1242        }
1243        Ok(paths) => paths
1244            .filter_map(|entry| {
1245                // check if this entry is a directory and only a Slot
1246                // bank snapshots are bank_snapshots_dir/slot/slot(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION)
1247                entry
1248                    .ok()
1249                    .filter(|entry| entry.path().is_dir())
1250                    .and_then(|entry| {
1251                        entry
1252                            .path()
1253                            .file_name()
1254                            .and_then(|file_name| file_name.to_str())
1255                            .and_then(|file_name| file_name.parse::<Slot>().ok())
1256                    })
1257            })
1258            .for_each(
1259                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1260                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1261                    // Other threads may be modifying bank snapshots in parallel; only return
1262                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
1263                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1264                },
1265            ),
1266    }
1267    bank_snapshots
1268}
1269
1270/// Get the bank snapshots in a directory
1271///
1272/// This function retains only the bank snapshots of kind BankSnapshotKind::Pre
1273pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1274    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1275    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1276    bank_snapshots
1277}
1278
1279/// Get the bank snapshots in a directory
1280///
1281/// This function retains only the bank snapshots of kind BankSnapshotKind::Post
1282pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1283    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1284    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1285    bank_snapshots
1286}
1287
1288/// Get the bank snapshot with the highest slot in a directory
1289///
1290/// This function gets the highest bank snapshot of kind BankSnapshotKind::Pre
1291pub fn get_highest_bank_snapshot_pre(
1292    bank_snapshots_dir: impl AsRef<Path>,
1293) -> Option<BankSnapshotInfo> {
1294    do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1295}
1296
1297/// Get the bank snapshot with the highest slot in a directory
1298///
1299/// This function gets the highest bank snapshot of kind BankSnapshotKind::Post
1300pub fn get_highest_bank_snapshot_post(
1301    bank_snapshots_dir: impl AsRef<Path>,
1302) -> Option<BankSnapshotInfo> {
1303    do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1304}
1305
1306/// Get the bank snapshot with the highest slot in a directory
1307///
1308/// This function gets the highest bank snapshot of any kind
1309pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1310    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1311}
1312
1313fn do_get_highest_bank_snapshot(
1314    mut bank_snapshots: Vec<BankSnapshotInfo>,
1315) -> Option<BankSnapshotInfo> {
1316    bank_snapshots.sort_unstable();
1317    bank_snapshots.into_iter().next_back()
1318}
1319
1320pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1321where
1322    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1323{
1324    serialize_snapshot_data_file_capped::<F>(
1325        data_file_path,
1326        MAX_SNAPSHOT_DATA_FILE_SIZE,
1327        serializer,
1328    )
1329}
1330
1331pub fn deserialize_snapshot_data_file<T: Sized>(
1332    data_file_path: &Path,
1333    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1334) -> Result<T> {
1335    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1336        deserializer(streams.full_snapshot_stream)
1337    };
1338
1339    let wrapped_data_file_path = SnapshotRootPaths {
1340        full_snapshot_root_file_path: data_file_path.to_path_buf(),
1341        incremental_snapshot_root_file_path: None,
1342    };
1343
1344    deserialize_snapshot_data_files_capped(
1345        &wrapped_data_file_path,
1346        MAX_SNAPSHOT_DATA_FILE_SIZE,
1347        wrapped_deserializer,
1348    )
1349}
1350
1351pub fn deserialize_snapshot_data_files<T: Sized>(
1352    snapshot_root_paths: &SnapshotRootPaths,
1353    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1354) -> Result<T> {
1355    deserialize_snapshot_data_files_capped(
1356        snapshot_root_paths,
1357        MAX_SNAPSHOT_DATA_FILE_SIZE,
1358        deserializer,
1359    )
1360}
1361
1362fn serialize_snapshot_data_file_capped<F>(
1363    data_file_path: &Path,
1364    maximum_file_size: u64,
1365    serializer: F,
1366) -> Result<u64>
1367where
1368    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1369{
1370    let data_file = fs::File::create(data_file_path)?;
1371    let mut data_file_stream = BufWriter::new(data_file);
1372    serializer(&mut data_file_stream)?;
1373    data_file_stream.flush()?;
1374
1375    let consumed_size = data_file_stream.stream_position()?;
1376    if consumed_size > maximum_file_size {
1377        let error_message = format!(
1378            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1379            data_file_path.display(),
1380        );
1381        return Err(IoError::other(error_message).into());
1382    }
1383    Ok(consumed_size)
1384}
1385
1386fn deserialize_snapshot_data_files_capped<T: Sized>(
1387    snapshot_root_paths: &SnapshotRootPaths,
1388    maximum_file_size: u64,
1389    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1390) -> Result<T> {
1391    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1392        create_snapshot_data_file_stream(
1393            &snapshot_root_paths.full_snapshot_root_file_path,
1394            maximum_file_size,
1395        )?;
1396
1397    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1398        if let Some(ref incremental_snapshot_root_file_path) =
1399            snapshot_root_paths.incremental_snapshot_root_file_path
1400        {
1401            Some(create_snapshot_data_file_stream(
1402                incremental_snapshot_root_file_path,
1403                maximum_file_size,
1404            )?)
1405        } else {
1406            None
1407        }
1408        .unzip();
1409
1410    let mut snapshot_streams = SnapshotStreams {
1411        full_snapshot_stream: &mut full_snapshot_data_file_stream,
1412        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1413    };
1414    let ret = deserializer(&mut snapshot_streams)?;
1415
1416    check_deserialize_file_consumed(
1417        full_snapshot_file_size,
1418        &snapshot_root_paths.full_snapshot_root_file_path,
1419        &mut full_snapshot_data_file_stream,
1420    )?;
1421
1422    if let Some(ref incremental_snapshot_root_file_path) =
1423        snapshot_root_paths.incremental_snapshot_root_file_path
1424    {
1425        check_deserialize_file_consumed(
1426            incremental_snapshot_file_size.unwrap(),
1427            incremental_snapshot_root_file_path,
1428            incremental_snapshot_data_file_stream.as_mut().unwrap(),
1429        )?;
1430    }
1431
1432    Ok(ret)
1433}
1434
1435/// Before running the deserializer function, perform common operations on the snapshot archive
1436/// files, such as checking the file size and opening the file into a stream.
1437fn create_snapshot_data_file_stream(
1438    snapshot_root_file_path: impl AsRef<Path>,
1439    maximum_file_size: u64,
1440) -> Result<(u64, BufReader<std::fs::File>)> {
1441    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1442
1443    if snapshot_file_size > maximum_file_size {
1444        let error_message = format!(
1445            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1446            snapshot_root_file_path.as_ref().display(),
1447            snapshot_file_size,
1448            maximum_file_size,
1449        );
1450        return Err(IoError::other(error_message).into());
1451    }
1452
1453    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1454    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1455
1456    Ok((snapshot_file_size, snapshot_data_file_stream))
1457}
1458
1459/// After running the deserializer function, perform common checks to ensure the snapshot archive
1460/// files were consumed correctly.
1461fn check_deserialize_file_consumed(
1462    file_size: u64,
1463    file_path: impl AsRef<Path>,
1464    file_stream: &mut BufReader<std::fs::File>,
1465) -> Result<()> {
1466    let consumed_size = file_stream.stream_position()?;
1467
1468    if consumed_size != file_size {
1469        let error_message = format!(
1470            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to \
1471             deserialize",
1472            file_path.as_ref().display(),
1473            file_size,
1474            consumed_size,
1475        );
1476        return Err(IoError::other(error_message).into());
1477    }
1478
1479    Ok(())
1480}
1481
1482/// Return account path from the appendvec path after checking its format.
1483fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1484    let run_path = appendvec_path.parent()?;
1485    let run_file_name = run_path.file_name()?;
1486    // All appendvec files should be under <account_path>/run/.
1487    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1488    if run_file_name != ACCOUNTS_RUN_DIR {
1489        error!(
1490            "The account path {} does not have run/ as its immediate parent directory.",
1491            run_path.display()
1492        );
1493        return None;
1494    }
1495    let account_path = run_path.parent()?;
1496    Some(account_path.to_path_buf())
1497}
1498
1499/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
1500/// directory does not exist, create it.
1501fn get_snapshot_accounts_hardlink_dir(
1502    appendvec_path: &Path,
1503    bank_slot: Slot,
1504    account_paths: &mut HashSet<PathBuf>,
1505    hardlinks_dir: impl AsRef<Path>,
1506) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1507    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1508        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1509    })?;
1510
1511    let snapshot_hardlink_dir = account_path
1512        .join(ACCOUNTS_SNAPSHOT_DIR)
1513        .join(bank_slot.to_string());
1514
1515    // Use the hashset to track, to avoid checking the file system.  Only set up the hardlink directory
1516    // and the symlink to it at the first time of seeing the account_path.
1517    if !account_paths.contains(&account_path) {
1518        let idx = account_paths.len();
1519        debug!(
1520            "for appendvec_path {}, create hard-link path {}",
1521            appendvec_path.display(),
1522            snapshot_hardlink_dir.display()
1523        );
1524        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1525            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1526                err,
1527                snapshot_hardlink_dir.clone(),
1528            )
1529        })?;
1530        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1531        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1532            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1533                source: err,
1534                original: snapshot_hardlink_dir.clone(),
1535                link: symlink_path,
1536            }
1537        })?;
1538        account_paths.insert(account_path);
1539    };
1540
1541    Ok(snapshot_hardlink_dir)
1542}
1543
1544/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1545/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1546/// in the file names are also updated in case its file is a recycled one with inconsistent slot
1547/// and id.
1548pub fn hard_link_storages_to_snapshot(
1549    bank_snapshot_dir: impl AsRef<Path>,
1550    bank_slot: Slot,
1551    snapshot_storages: &[Arc<AccountStorageEntry>],
1552) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1553    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1554    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1555        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1556            err,
1557            accounts_hardlinks_dir.clone(),
1558        )
1559    })?;
1560
1561    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1562    for storage in snapshot_storages {
1563        let storage_path = storage.accounts.path();
1564        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1565            storage_path,
1566            bank_slot,
1567            &mut account_paths,
1568            &accounts_hardlinks_dir,
1569        )?;
1570        // The appendvec could be recycled, so its filename may not be consistent to the slot and id.
1571        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1572        let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1573        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1574        fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1575            HardLinkStoragesToSnapshotError::HardLinkStorage(
1576                err,
1577                storage_path.to_path_buf(),
1578                hard_link_path,
1579            )
1580        })?;
1581    }
1582    Ok(())
1583}
1584
1585/// serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but data structure at runtime is Vec<Arc<AccountStorageEntry>>
1586/// translates to what we need
1587pub(crate) fn get_storages_to_serialize(
1588    snapshot_storages: &[Arc<AccountStorageEntry>],
1589) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1590    snapshot_storages
1591        .iter()
1592        .map(|storage| vec![Arc::clone(storage)])
1593        .collect::<Vec<_>>()
1594}
1595
1596/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
1597pub fn verify_and_unarchive_snapshots(
1598    bank_snapshots_dir: impl AsRef<Path>,
1599    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1600    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1601    account_paths: &[PathBuf],
1602    storage_access: StorageAccess,
1603) -> Result<(UnarchivedSnapshots, UnarchivedSnapshotsGuard)> {
1604    check_are_snapshots_compatible(
1605        full_snapshot_archive_info,
1606        incremental_snapshot_archive_info,
1607    )?;
1608
1609    let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1610    let UnarchivedSnapshot {
1611        unpack_dir: full_unpack_dir,
1612        storage: full_storage,
1613        bank_fields: full_bank_fields,
1614        accounts_db_fields: full_accounts_db_fields,
1615        unpacked_snapshots_dir_and_version: full_unpacked_snapshots_dir_and_version,
1616        measure_untar: full_measure_untar,
1617    } = unarchive_snapshot(
1618        &bank_snapshots_dir,
1619        TMP_SNAPSHOT_ARCHIVE_PREFIX,
1620        full_snapshot_archive_info.path(),
1621        "snapshot untar",
1622        account_paths,
1623        full_snapshot_archive_info.archive_format(),
1624        next_append_vec_id.clone(),
1625        storage_access,
1626    )?;
1627
1628    let (
1629        incremental_unpack_dir,
1630        incremental_storage,
1631        incremental_bank_fields,
1632        incremental_accounts_db_fields,
1633        incremental_unpacked_snapshots_dir_and_version,
1634        incremental_measure_untar,
1635    ) = if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1636        let UnarchivedSnapshot {
1637            unpack_dir,
1638            storage,
1639            bank_fields,
1640            accounts_db_fields,
1641            unpacked_snapshots_dir_and_version,
1642            measure_untar,
1643        } = unarchive_snapshot(
1644            &bank_snapshots_dir,
1645            TMP_SNAPSHOT_ARCHIVE_PREFIX,
1646            incremental_snapshot_archive_info.path(),
1647            "incremental snapshot untar",
1648            account_paths,
1649            incremental_snapshot_archive_info.archive_format(),
1650            next_append_vec_id.clone(),
1651            storage_access,
1652        )?;
1653        (
1654            Some(unpack_dir),
1655            Some(storage),
1656            Some(bank_fields),
1657            Some(accounts_db_fields),
1658            Some(unpacked_snapshots_dir_and_version),
1659            Some(measure_untar),
1660        )
1661    } else {
1662        (None, None, None, None, None, None)
1663    };
1664
1665    let bank_fields = SnapshotBankFields::new(full_bank_fields, incremental_bank_fields);
1666    let accounts_db_fields =
1667        SnapshotAccountsDbFields::new(full_accounts_db_fields, incremental_accounts_db_fields);
1668    let next_append_vec_id = Arc::try_unwrap(next_append_vec_id).unwrap();
1669
1670    Ok((
1671        UnarchivedSnapshots {
1672            full_storage,
1673            incremental_storage,
1674            bank_fields,
1675            accounts_db_fields,
1676            full_unpacked_snapshots_dir_and_version,
1677            incremental_unpacked_snapshots_dir_and_version,
1678            full_measure_untar,
1679            incremental_measure_untar,
1680            next_append_vec_id,
1681        },
1682        UnarchivedSnapshotsGuard {
1683            full_unpack_dir,
1684            incremental_unpack_dir,
1685        },
1686    ))
1687}
1688
1689/// Streams unpacked files across channel
1690fn streaming_unarchive_snapshot(
1691    file_sender: Sender<PathBuf>,
1692    account_paths: Vec<PathBuf>,
1693    ledger_dir: PathBuf,
1694    snapshot_archive_path: PathBuf,
1695    archive_format: ArchiveFormat,
1696) -> JoinHandle<Result<()>> {
1697    Builder::new()
1698        .name("solTarUnpack".to_string())
1699        .spawn(move || {
1700            let archive_size = fs::metadata(&snapshot_archive_path)?.len();
1701            let buf_size = archive_size.min(MAX_SNAPSHOT_READER_BUF_SIZE);
1702            let decompressor =
1703                decompressed_tar_reader(archive_format, snapshot_archive_path, buf_size)?;
1704            hardened_unpack::streaming_unpack_snapshot(
1705                Archive::new(decompressor),
1706                archive_size,
1707                ledger_dir.as_path(),
1708                &account_paths,
1709                &file_sender,
1710            )?;
1711            Ok(())
1712        })
1713        .unwrap()
1714}
1715
1716fn decompressed_tar_reader(
1717    archive_format: ArchiveFormat,
1718    archive_path: impl AsRef<Path>,
1719    buf_size: u64,
1720) -> Result<ArchiveFormatDecompressor<Box<dyn BufRead + 'static>>> {
1721    let buf_reader =
1722        solana_accounts_db::large_file_buf_reader(archive_path.as_ref(), buf_size as usize)
1723            .map_err(|err| {
1724                io::Error::other(format!(
1725                    "failed to open snapshot archive '{}': {err}",
1726                    archive_path.as_ref().display(),
1727                ))
1728            })?;
1729    Ok(ArchiveFormatDecompressor::new(archive_format, buf_reader)?)
1730}
1731
/// Classification of a file received while unpacking or streaming a snapshot, based on its file
/// name: the version file, the bank fields (snapshot) file, or an account storage file.
///
/// A fieldless tag enum, so it is cheap to copy and has total equality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SnapshotFileKind {
    /// The snapshot `version` file.
    Version,
    /// A bank fields file: a decimal number, optionally followed by `.pre`.
    BankFields,
    /// An account storage (append vec) file.
    Storage,
}
1739
1740/// Determines `SnapshotFileKind` for `filename` if any
1741fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
1742    static VERSION_FILE_REGEX: LazyLock<Regex> =
1743        LazyLock::new(|| Regex::new(r"^version$").unwrap());
1744    static BANK_FIELDS_FILE_REGEX: LazyLock<Regex> =
1745        LazyLock::new(|| Regex::new(r"^[0-9]+(\.pre)?$").unwrap());
1746
1747    if VERSION_FILE_REGEX.is_match(filename) {
1748        Some(SnapshotFileKind::Version)
1749    } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
1750        Some(SnapshotFileKind::BankFields)
1751    } else if get_slot_and_append_vec_id(filename).is_ok() {
1752        Some(SnapshotFileKind::Storage)
1753    } else {
1754        None
1755    }
1756}
1757
1758/// Waits for snapshot file
1759/// Due to parallel unpacking, we may receive some append_vec files before the snapshot file
1760/// This function will push append_vec files into a buffer until we receive the snapshot file
1761fn get_version_and_snapshot_files(
1762    file_receiver: &Receiver<PathBuf>,
1763) -> Result<(PathBuf, PathBuf, Vec<PathBuf>)> {
1764    let mut append_vec_files = Vec::with_capacity(1024);
1765    let mut snapshot_version_path = None;
1766    let mut snapshot_file_path = None;
1767
1768    loop {
1769        if let Ok(path) = file_receiver.recv() {
1770            let filename = path.file_name().unwrap().to_str().unwrap();
1771            match get_snapshot_file_kind(filename) {
1772                Some(SnapshotFileKind::Version) => {
1773                    snapshot_version_path = Some(path);
1774
1775                    // break if we have both the snapshot file and the version file
1776                    if snapshot_file_path.is_some() {
1777                        break;
1778                    }
1779                }
1780                Some(SnapshotFileKind::BankFields) => {
1781                    snapshot_file_path = Some(path);
1782
1783                    // break if we have both the snapshot file and the version file
1784                    if snapshot_version_path.is_some() {
1785                        break;
1786                    }
1787                }
1788                Some(SnapshotFileKind::Storage) => {
1789                    append_vec_files.push(path);
1790                }
1791                None => {} // do nothing for other kinds of files
1792            }
1793        } else {
1794            return Err(SnapshotError::RebuildStorages(
1795                "did not receive snapshot file from unpacking threads".to_string(),
1796            ));
1797        }
1798    }
1799    let snapshot_version_path = snapshot_version_path.unwrap();
1800    let snapshot_file_path = snapshot_file_path.unwrap();
1801
1802    Ok((snapshot_version_path, snapshot_file_path, append_vec_files))
1803}
1804
1805/// Fields and information parsed from the snapshot.
1806struct SnapshotFieldsBundle {
1807    snapshot_version: SnapshotVersion,
1808    bank_fields: BankFieldsToDeserialize,
1809    accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
1810    append_vec_files: Vec<PathBuf>,
1811}
1812
1813/// Parses fields and information from the snapshot files provided by
1814/// `file_receiver`.
1815fn snapshot_fields_from_files(file_receiver: &Receiver<PathBuf>) -> Result<SnapshotFieldsBundle> {
1816    let (snapshot_version_path, snapshot_file_path, append_vec_files) =
1817        get_version_and_snapshot_files(file_receiver)?;
1818    let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
1819    let snapshot_version = snapshot_version_str.parse().map_err(|err| {
1820        IoError::other(format!(
1821            "unsupported snapshot version '{snapshot_version_str}': {err}",
1822        ))
1823    })?;
1824
1825    let snapshot_file = fs::File::open(snapshot_file_path).unwrap();
1826    let mut snapshot_stream = BufReader::new(snapshot_file);
1827    let (bank_fields, accounts_db_fields) = match snapshot_version {
1828        SnapshotVersion::V1_2_0 => serde_snapshot::fields_from_stream(&mut snapshot_stream)?,
1829    };
1830
1831    Ok(SnapshotFieldsBundle {
1832        snapshot_version,
1833        bank_fields,
1834        accounts_db_fields,
1835        append_vec_files,
1836    })
1837}
1838
1839/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1840/// as a valid one.  A dir unpacked from an archive lacks these files.  Fill them here to
1841/// allow new_from_dir() checks to pass.  These checks are not needed for unpacked dirs,
1842/// but it is not clean to add another flag to new_from_dir() to skip them.
1843fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1844    let snapshots_dir = unpack_dir.as_ref().join(BANK_SNAPSHOTS_DIR);
1845    if !snapshots_dir.is_dir() {
1846        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1847    }
1848
1849    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1850    let slot_dir = std::fs::read_dir(&snapshots_dir)
1851        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1852        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1853        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1854        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1855        .path();
1856
1857    let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1858    fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1859
1860    let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1861    fs::hard_link(
1862        status_cache_file,
1863        slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1864    )?;
1865
1866    write_snapshot_state_complete_file(slot_dir)?;
1867
1868    Ok(())
1869}
1870
/// Perform the common tasks when unarchiving a snapshot.  Handles creating the temporary
/// directories, untaring, reading the version file, and then returning those fields plus the
/// rebuilt storage
///
/// Flow:
/// 1. A temporary unpack dir is created inside `bank_snapshots_dir`, with the
///    `unpacked_snapshots_dir_prefix` prefix.
/// 2. `streaming_unarchive_snapshot` spawns a thread that unpacks the archive into that dir and
///    streams file paths over an unbounded channel.
/// 3. On this thread, `snapshot_fields_from_files` waits for the version and snapshot files and
///    parses them.
/// 4. `SnapshotStorageRebuilder::rebuild_storage` consumes the rest of the channel. Its duration
///    is measured under `measure_name` and logged.
/// 5. The meta files needed by `BankSnapshotInfo::new_from_dir()` are created.
///
/// The returned `UnarchivedSnapshot` owns the `TempDir`.
///
/// # Errors / panics
///
/// If the unpack thread returns an error, that error is returned, taking precedence over the
/// result computed here. Panics if the unpack thread itself panicked (`join().unwrap()`).
fn unarchive_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    unpacked_snapshots_dir_prefix: &'static str,
    snapshot_archive_path: impl AsRef<Path>,
    measure_name: &'static str,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<UnarchivedSnapshot> {
    let unpack_dir = tempfile::Builder::new()
        .prefix(unpacked_snapshots_dir_prefix)
        .tempdir_in(bank_snapshots_dir)?;
    let unpacked_snapshots_dir = unpack_dir.path().join(BANK_SNAPSHOTS_DIR);

    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let unarchive_handle = streaming_unarchive_snapshot(
        file_sender,
        account_paths.to_vec(),
        unpack_dir.path().to_path_buf(),
        snapshot_archive_path.as_ref().to_path_buf(),
        archive_format,
    );

    // Leave one physical core free, but always use at least one rebuilder thread.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    // The closure below takes ownership of `file_receiver` (it is passed by value to
    // `rebuild_storage`). If `snapshot_fields_from_files` fails, the closure is dropped without
    // running, and the receiver is dropped with it before the unpack thread is joined.
    let snapshot_result = snapshot_fields_from_files(&file_receiver).and_then(
        |SnapshotFieldsBundle {
             snapshot_version,
             bank_fields,
             accounts_db_fields,
             append_vec_files,
             ..
         }| {
            let (storage, measure_untar) = measure_time!(
                SnapshotStorageRebuilder::rebuild_storage(
                    &accounts_db_fields,
                    append_vec_files,
                    file_receiver,
                    num_rebuilder_threads,
                    next_append_vec_id,
                    SnapshotFrom::Archive,
                    storage_access,
                )?,
                measure_name
            );
            info!("{measure_untar}");
            create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;

            Ok(UnarchivedSnapshot {
                unpack_dir,
                storage,
                bank_fields,
                accounts_db_fields,
                unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
                    unpacked_snapshots_dir,
                    snapshot_version,
                },
                measure_untar,
            })
        },
    );
    // Always join the unpack thread. Its error, if any, is returned in place of
    // `snapshot_result`.
    unarchive_handle.join().unwrap()?;
    snapshot_result
}
1938
1939/// Streams snapshot dir files across channel
1940/// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case.
1941fn streaming_snapshot_dir_files(
1942    file_sender: Sender<PathBuf>,
1943    snapshot_file_path: impl Into<PathBuf>,
1944    snapshot_version_path: impl Into<PathBuf>,
1945    account_paths: &[PathBuf],
1946) -> Result<()> {
1947    file_sender.send(snapshot_file_path.into())?;
1948    file_sender.send(snapshot_version_path.into())?;
1949
1950    for account_path in account_paths {
1951        for file in fs::read_dir(account_path)? {
1952            file_sender.send(file?.path())?;
1953        }
1954    }
1955
1956    Ok(())
1957}
1958
/// Performs the common tasks when deserializing a snapshot
///
/// Handles reading the snapshot file and version file,
/// then returning those fields plus the rebuilt storages.
///
/// Steps, in order (the order matters, because step 1 populates the directories that step 2
/// reads):
/// 1. For every symlink in `<snapshot_dir>/<SNAPSHOT_ACCOUNTS_HARDLINKS>`, resolve its target
///    `<account_path>/snapshot/<slot>`. Then hard-link every file in it into
///    `<account_path>/<ACCOUNTS_RUN_DIR>`.
/// 2. Stream the snapshot file, the version file, and the contents of `account_paths` through a
///    channel.
/// 3. Parse the snapshot fields, then rebuild the account storages from the remaining files.
///
/// `account_paths` is compared against `<account_path>/<ACCOUNTS_RUN_DIR>` directories, so its
/// entries are expected to be the accounts run dirs. The parsed snapshot version is not returned.
///
/// # Errors
///
/// - `SnapshotError::AccountPathsMismatch` if a symlinked account path's run dir is not among
///   `account_paths`.
/// - `SnapshotError::InvalidAccountPath` / `SnapshotError::InvalidAppendVecPath` for malformed
///   paths.
/// - I/O errors (with context) from reading dirs, reading symlinks, or creating hard links.
/// - Errors from parsing the snapshot fields or rebuilding storages.
pub fn rebuild_storages_from_snapshot_dir(
    snapshot_info: &BankSnapshotInfo,
    account_paths: &[PathBuf],
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<(
    AccountStorageMap,
    BankFieldsToDeserialize,
    AccountsDbFields<SerializableAccountStorageEntry>,
)> {
    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);

    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
        IoError::other(format!(
            "failed to read accounts hardlinks dir '{}': {err}",
            accounts_hardlinks.display(),
        ))
    })?;
    for dir_entry in read_dir {
        let symlink_path = dir_entry?.path();
        // The symlink points to <account_path>/snapshot/<slot>, which contains the account files' hardlinks.
        // The corresponding run path should be <account_path>/run/
        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
            IoError::other(format!(
                "failed to read symlink '{}': {err}",
                symlink_path.display(),
            ))
        })?;
        // <account_path>/snapshot/<slot> -> <account_path> -> <account_path>/run
        let account_run_path = account_snapshot_path
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .join(ACCOUNTS_RUN_DIR);
        if !account_run_paths.contains(&account_run_path) {
            // The appendvec from the bank snapshot storage does not match any of the provided account_paths set.
            // The account paths have changed so the snapshot is no longer usable.
            return Err(SnapshotError::AccountPathsMismatch);
        }
        // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
        // paths be in accounts/
        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        for file in read_dir {
            let file_path = file?.path();
            let file_name = file_path
                .file_name()
                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
            let dest_path = account_run_path.join(file_name);
            fs::hard_link(&file_path, &dest_path).map_err(|err| {
                IoError::other(format!(
                    "failed to hard link from '{}' to '{}': {err}",
                    file_path.display(),
                    dest_path.display(),
                ))
            })?;
        }
    }

    // `file_sender` is moved into `streaming_snapshot_dir_files` and dropped when it returns,
    // so the receiver sees every file followed by disconnection.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let snapshot_file_path = &snapshot_info.snapshot_path();
    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    streaming_snapshot_dir_files(
        file_sender,
        snapshot_file_path,
        snapshot_version_path,
        account_paths,
    )?;

    let SnapshotFieldsBundle {
        bank_fields,
        accounts_db_fields,
        append_vec_files,
        ..
    } = snapshot_fields_from_files(&file_receiver)?;

    // Leave one physical core free, but always use at least one rebuilder thread.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let storage = SnapshotStorageRebuilder::rebuild_storage(
        &accounts_db_fields,
        append_vec_files,
        file_receiver,
        num_rebuilder_threads,
        next_append_vec_id,
        SnapshotFrom::Dir,
        storage_access,
    )?;

    Ok((storage, bank_fields, accounts_db_fields))
}
2058
2059/// Reads the `snapshot_version` from a file. Before opening the file, its size
2060/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
2061/// threshold, it is not opened and an error is returned.
2062fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
2063    // Check file size.
2064    let file_metadata = fs::metadata(&path).map_err(|err| {
2065        IoError::other(format!(
2066            "failed to query snapshot version file metadata '{}': {err}",
2067            path.as_ref().display(),
2068        ))
2069    })?;
2070    let file_size = file_metadata.len();
2071    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
2072        let error_message = format!(
2073            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
2074            path.as_ref().display(),
2075            file_size,
2076            MAX_SNAPSHOT_VERSION_FILE_SIZE,
2077        );
2078        return Err(IoError::other(error_message).into());
2079    }
2080
2081    // Read snapshot_version from file.
2082    let mut snapshot_version = String::new();
2083    let mut file = fs::File::open(&path).map_err(|err| {
2084        IoError::other(format!(
2085            "failed to open snapshot version file '{}': {err}",
2086            path.as_ref().display()
2087        ))
2088    })?;
2089    file.read_to_string(&mut snapshot_version).map_err(|err| {
2090        IoError::other(format!(
2091            "failed to read snapshot version from file '{}': {err}",
2092            path.as_ref().display()
2093        ))
2094    })?;
2095
2096    Ok(snapshot_version.trim().to_string())
2097}
2098
2099/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
2100/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
2101fn check_are_snapshots_compatible(
2102    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
2103    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
2104) -> Result<()> {
2105    if incremental_snapshot_archive_info.is_none() {
2106        return Ok(());
2107    }
2108
2109    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
2110
2111    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
2112        .then_some(())
2113        .ok_or_else(|| {
2114            SnapshotError::MismatchedBaseSlot(
2115                full_snapshot_archive_info.slot(),
2116                incremental_snapshot_archive_info.base_slot(),
2117            )
2118        })
2119}
2120
2121/// Get the `&str` from a `&Path`
2122pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
2123    path.file_name()
2124        .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
2125        .to_str()
2126        .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
2127}
2128
2129pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
2130    snapshot_archives_dir
2131        .as_ref()
2132        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
2133}
2134
2135/// Build the full snapshot archive path from its components: the snapshot archives directory, the
2136/// snapshot slot, the accounts hash, and the archive format.
2137pub fn build_full_snapshot_archive_path(
2138    full_snapshot_archives_dir: impl AsRef<Path>,
2139    slot: Slot,
2140    hash: &SnapshotHash,
2141    archive_format: ArchiveFormat,
2142) -> PathBuf {
2143    full_snapshot_archives_dir.as_ref().join(format!(
2144        "snapshot-{}-{}.{}",
2145        slot,
2146        hash.0,
2147        archive_format.extension(),
2148    ))
2149}
2150
2151/// Build the incremental snapshot archive path from its components: the snapshot archives
2152/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive
2153/// format.
2154pub fn build_incremental_snapshot_archive_path(
2155    incremental_snapshot_archives_dir: impl AsRef<Path>,
2156    base_slot: Slot,
2157    slot: Slot,
2158    hash: &SnapshotHash,
2159    archive_format: ArchiveFormat,
2160) -> PathBuf {
2161    incremental_snapshot_archives_dir.as_ref().join(format!(
2162        "incremental-snapshot-{}-{}-{}.{}",
2163        base_slot,
2164        slot,
2165        hash.0,
2166        archive_format.extension(),
2167    ))
2168}
2169
2170/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format
2171pub(crate) fn parse_full_snapshot_archive_filename(
2172    archive_filename: &str,
2173) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
2174    static RE: std::sync::LazyLock<Regex> =
2175        std::sync::LazyLock::new(|| Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap());
2176
2177    let do_parse = || {
2178        RE.captures(archive_filename).and_then(|captures| {
2179            let slot = captures
2180                .name("slot")
2181                .map(|x| x.as_str().parse::<Slot>())?
2182                .ok()?;
2183            let hash = captures
2184                .name("hash")
2185                .map(|x| x.as_str().parse::<Hash>())?
2186                .ok()?;
2187            let archive_format = captures
2188                .name("ext")
2189                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2190                .ok()?;
2191
2192            Some((slot, SnapshotHash(hash), archive_format))
2193        })
2194    };
2195
2196    do_parse().ok_or_else(|| {
2197        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2198    })
2199}
2200
2201/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format
2202pub(crate) fn parse_incremental_snapshot_archive_filename(
2203    archive_filename: &str,
2204) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
2205    static RE: std::sync::LazyLock<Regex> = std::sync::LazyLock::new(|| {
2206        Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap()
2207    });
2208
2209    let do_parse = || {
2210        RE.captures(archive_filename).and_then(|captures| {
2211            let base_slot = captures
2212                .name("base")
2213                .map(|x| x.as_str().parse::<Slot>())?
2214                .ok()?;
2215            let slot = captures
2216                .name("slot")
2217                .map(|x| x.as_str().parse::<Slot>())?
2218                .ok()?;
2219            let hash = captures
2220                .name("hash")
2221                .map(|x| x.as_str().parse::<Hash>())?
2222                .ok()?;
2223            let archive_format = captures
2224                .name("ext")
2225                .map(|x| x.as_str().parse::<ArchiveFormat>())?
2226                .ok()?;
2227
2228            Some((base_slot, slot, SnapshotHash(hash), archive_format))
2229        })
2230    };
2231
2232    do_parse().ok_or_else(|| {
2233        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2234    })
2235}
2236
2237/// Walk down the snapshot archive to collect snapshot archive file info
2238fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2239where
2240    F: Fn(PathBuf) -> Result<T>,
2241{
2242    let walk_dir = |dir: &Path| -> Vec<T> {
2243        let entry_iter = fs::read_dir(dir);
2244        match entry_iter {
2245            Err(err) => {
2246                info!(
2247                    "Unable to read snapshot archives directory '{}': {err}",
2248                    dir.display(),
2249                );
2250                vec![]
2251            }
2252            Ok(entries) => entries
2253                .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2254                .collect(),
2255        }
2256    };
2257
2258    let mut ret = walk_dir(snapshot_archives_dir);
2259    let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2260    if remote_dir.exists() {
2261        ret.append(&mut walk_dir(remote_dir.as_ref()));
2262    }
2263    ret
2264}
2265
2266/// Get a list of the full snapshot archives from a directory
2267pub fn get_full_snapshot_archives(
2268    full_snapshot_archives_dir: impl AsRef<Path>,
2269) -> Vec<FullSnapshotArchiveInfo> {
2270    get_snapshot_archives(
2271        full_snapshot_archives_dir.as_ref(),
2272        FullSnapshotArchiveInfo::new_from_path,
2273    )
2274}
2275
2276/// Get a list of the incremental snapshot archives from a directory
2277pub fn get_incremental_snapshot_archives(
2278    incremental_snapshot_archives_dir: impl AsRef<Path>,
2279) -> Vec<IncrementalSnapshotArchiveInfo> {
2280    get_snapshot_archives(
2281        incremental_snapshot_archives_dir.as_ref(),
2282        IncrementalSnapshotArchiveInfo::new_from_path,
2283    )
2284}
2285
2286/// Get the highest slot of the full snapshot archives in a directory
2287pub fn get_highest_full_snapshot_archive_slot(
2288    full_snapshot_archives_dir: impl AsRef<Path>,
2289) -> Option<Slot> {
2290    get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2291        .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2292}
2293
2294/// Get the highest slot of the incremental snapshot archives in a directory, for a given full
2295/// snapshot slot
2296pub fn get_highest_incremental_snapshot_archive_slot(
2297    incremental_snapshot_archives_dir: impl AsRef<Path>,
2298    full_snapshot_slot: Slot,
2299) -> Option<Slot> {
2300    get_highest_incremental_snapshot_archive_info(
2301        incremental_snapshot_archives_dir,
2302        full_snapshot_slot,
2303    )
2304    .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2305}
2306
2307/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
2308pub fn get_highest_full_snapshot_archive_info(
2309    full_snapshot_archives_dir: impl AsRef<Path>,
2310) -> Option<FullSnapshotArchiveInfo> {
2311    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2312    full_snapshot_archives.sort_unstable();
2313    full_snapshot_archives.into_iter().next_back()
2314}
2315
2316/// Get the path for the incremental snapshot archive with the highest slot, for a given full
2317/// snapshot slot, in a directory
2318pub fn get_highest_incremental_snapshot_archive_info(
2319    incremental_snapshot_archives_dir: impl AsRef<Path>,
2320    full_snapshot_slot: Slot,
2321) -> Option<IncrementalSnapshotArchiveInfo> {
2322    // Since we want to filter down to only the incremental snapshot archives that have the same
2323    // full snapshot slot as the value passed in, perform the filtering before sorting to avoid
2324    // doing unnecessary work.
2325    let mut incremental_snapshot_archives =
2326        get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2327            .into_iter()
2328            .filter(|incremental_snapshot_archive_info| {
2329                incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2330            })
2331            .collect::<Vec<_>>();
2332    incremental_snapshot_archives.sort_unstable();
2333    incremental_snapshot_archives.into_iter().next_back()
2334}
2335
2336pub fn purge_old_snapshot_archives(
2337    full_snapshot_archives_dir: impl AsRef<Path>,
2338    incremental_snapshot_archives_dir: impl AsRef<Path>,
2339    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2340    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2341) {
2342    info!(
2343        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2344        full_snapshot_archives_dir.as_ref().display(),
2345        maximum_full_snapshot_archives_to_retain
2346    );
2347
2348    let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2349    full_snapshot_archives.sort_unstable();
2350    full_snapshot_archives.reverse();
2351
2352    let num_to_retain = full_snapshot_archives
2353        .len()
2354        .min(maximum_full_snapshot_archives_to_retain.get());
2355    trace!(
2356        "There are {} full snapshot archives, retaining {}",
2357        full_snapshot_archives.len(),
2358        num_to_retain,
2359    );
2360
2361    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2362        if full_snapshot_archives.is_empty() {
2363            None
2364        } else {
2365            Some(full_snapshot_archives.split_at(num_to_retain))
2366        }
2367        .unwrap_or_default();
2368
2369    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2370        .iter()
2371        .map(|ai| ai.slot())
2372        .collect::<HashSet<_>>();
2373
2374    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2375        for path in archives.iter().map(|a| a.path()) {
2376            trace!("Removing snapshot archive: {}", path.display());
2377            let result = fs::remove_file(path);
2378            if let Err(err) = result {
2379                info!(
2380                    "Failed to remove snapshot archive '{}': {err}",
2381                    path.display()
2382                );
2383            }
2384        }
2385    }
2386    remove_archives(full_snapshot_archives_to_remove);
2387
2388    info!(
2389        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2390        incremental_snapshot_archives_dir.as_ref().display(),
2391        maximum_incremental_snapshot_archives_to_retain
2392    );
2393    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2394    for incremental_snapshot_archive in
2395        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2396    {
2397        incremental_snapshot_archives_by_base_slot
2398            .entry(incremental_snapshot_archive.base_slot())
2399            .or_default()
2400            .push(incremental_snapshot_archive)
2401    }
2402
2403    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2404    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2405    {
2406        incremental_snapshot_archives.sort_unstable();
2407        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2408            maximum_incremental_snapshot_archives_to_retain.get()
2409        } else {
2410            usize::from(retained_full_snapshot_slots.contains(&base_slot))
2411        };
2412        trace!(
2413            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2414            incremental_snapshot_archives.len(),
2415            base_slot,
2416            incremental_snapshot_archives
2417                .len()
2418                .saturating_sub(num_to_retain),
2419        );
2420
2421        incremental_snapshot_archives.truncate(
2422            incremental_snapshot_archives
2423                .len()
2424                .saturating_sub(num_to_retain),
2425        );
2426        remove_archives(&incremental_snapshot_archives);
2427    }
2428}
2429
/// Unpacks the snapshot archive at `snapshot_path` (compressed as `archive_format`) into
/// `ledger_dir`, extracting account storage files into `account_paths`.
///
/// NOTE(review): `num_threads` is only checked to be non-zero. The unpack itself does not use
/// it here.
///
/// # Panics
///
/// Panics if `num_threads` is zero.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    snapshot_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    num_threads: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(num_threads > 0);
    let archive_size = fs::metadata(&snapshot_path)?.len();
    let tar_reader = decompressed_tar_reader(archive_format, snapshot_path, archive_size)?;
    let archive = Archive::new(tar_reader);
    Ok(hardened_unpack::unpack_snapshot(
        archive,
        archive_size,
        ledger_dir,
        account_paths,
    )?)
}
2450
2451pub fn verify_unpacked_snapshots_dir_and_version(
2452    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2453) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2454    info!(
2455        "snapshot version: {}",
2456        &unpacked_snapshots_dir_and_version.snapshot_version
2457    );
2458
2459    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2460    let mut bank_snapshots =
2461        get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2462    if bank_snapshots.len() > 1 {
2463        return Err(IoError::other(format!(
2464            "invalid snapshot format: only one snapshot allowed, but found {}",
2465            bank_snapshots.len(),
2466        ))
2467        .into());
2468    }
2469    let root_paths = bank_snapshots.pop().ok_or_else(|| {
2470        IoError::other(format!(
2471            "no snapshots found in snapshots directory '{}'",
2472            unpacked_snapshots_dir_and_version
2473                .unpacked_snapshots_dir
2474                .display(),
2475        ))
2476    })?;
2477    Ok((snapshot_version, root_paths))
2478}
2479
2480/// Returns the file name of the bank snapshot for `slot`
2481pub fn get_snapshot_file_name(slot: Slot) -> String {
2482    slot.to_string()
2483}
2484
2485/// Constructs the path to the bank snapshot directory for `slot` within `bank_snapshots_dir`
2486pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2487    bank_snapshots_dir
2488        .as_ref()
2489        .join(get_snapshot_file_name(slot))
2490}
2491
/// Lets tests tell `verify_snapshot_archive` how the serialized bank file should be compared.
#[derive(Debug, Copy, Clone)]
pub enum VerifyBank {
    /// The bank's serialized bytes are expected to be identical to what we are comparing
    /// against, so the file is left for the plain directory comparison.
    Deterministic,
    /// The bank was re-serialized into a non-deterministic format, so both files are
    /// deserialized and the resulting values are compared instead of the raw bytes.
    NonDeterministic,
}
2501
/// Test helper: unpacks `snapshot_archive` into a temporary directory and asserts that its
/// contents match the bank snapshot directory `snapshots_to_verify` for `slot`.
///
/// Two `dir_diff` comparisons are made:
/// 1. the unpacked bank snapshots directory against `snapshots_to_verify`, and
/// 2. the unpacked account storage files against copies of the storage files reachable
///    through the accounts-hardlinks symlinks under `snapshots_to_verify/<slot>/`.
///
/// NOTE: this MUTATES `snapshots_to_verify`. It removes the accounts-hardlinks directory, the
/// version file and the state-complete file under `<slot>/`. For
/// `VerifyBank::NonDeterministic` it also removes the serialized bank file `<slot>/<slot>`.
/// Only directly comparable content is left behind.
///
/// # Panics
///
/// Panics on any I/O failure or if either comparison finds a difference.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    unpack_snapshot_local(
        snapshot_archive,
        archive_format,
        unpack_dir,
        &[unpack_account_dir.clone()],
        1,
    )
    .unwrap();

    // Check snapshots are the same
    let unpacked_snapshots = unpack_dir.join(BANK_SNAPSHOTS_DIR);

    // The unpack code collects all the account storage files into the single directory
    // `unpack_account_dir`. For comparison, we likewise gather all the storage files found
    // under account_paths/<slot>/snapshot/ into one directory.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    // Create the directory if it doesn't exist
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // File contents may differ, but the deserialized structs must be equal. Remove both
        // files afterwards so the byte-level directory comparison below skips them.
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // In a bank snapshot directory the status_cache file sits inside the slot directory,
    // next to the snapshot file. An unpacked archive places the status_cache file one level
    // up, outside the slot directory. Move the unpacked status_cache file into the slot
    // directory so the directory comparison can pass.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        // This directory contains symlinks to all <account_path>/snapshot/<slot> directories.
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            // Copy all the files in dst_path into the storages_to_verify directory.
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        // The unpacked archive has no such directory; remove it before comparing.
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // The version and state-complete files are only checked for existence here and removed,
    // so they do not take part in the directory comparison below.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // In the unarchiving case, there is an extra empty "accounts" directory. The account
    // files in the archive accounts/ have been expanded to [account_paths].
    // Remove the empty "accounts" directory for the directory comparison below.
    // In some test cases the directories being compared do not come from unarchiving.
    // Ignore the error when this directory does not exist.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    // Check the account entries are the same
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2595
2596/// Purges all bank snapshots
2597pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2598    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2599    purge_bank_snapshots(&bank_snapshots);
2600}
2601
2602/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
2603pub fn purge_old_bank_snapshots(
2604    bank_snapshots_dir: impl AsRef<Path>,
2605    num_bank_snapshots_to_retain: usize,
2606    filter_by_kind: Option<BankSnapshotKind>,
2607) {
2608    let mut bank_snapshots = match filter_by_kind {
2609        Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2610        Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2611        None => get_bank_snapshots(&bank_snapshots_dir),
2612    };
2613
2614    bank_snapshots.sort_unstable();
2615    purge_bank_snapshots(
2616        bank_snapshots
2617            .iter()
2618            .rev()
2619            .skip(num_bank_snapshots_to_retain),
2620    );
2621}
2622
2623/// At startup, purge old (i.e. unusable) bank snapshots
2624///
2625/// Only a single bank snapshot could be needed at startup (when using fast boot), so
2626/// retain the highest bank snapshot "post", and purge the rest.
2627pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2628    purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2629    purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2630
2631    let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2632    if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2633        debug!(
2634            "Retained bank snapshot for slot {}, and purged the rest.",
2635            highest_bank_snapshot_post.slot
2636        );
2637    }
2638}
2639
2640/// Purges bank snapshots that are older than `slot`
2641pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2642    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2643    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2644    purge_bank_snapshots(&bank_snapshots);
2645}
2646
2647/// Purges all `bank_snapshots`
2648///
2649/// Does not exit early if there is an error while purging a bank snapshot.
2650fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2651    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2652        if purge_bank_snapshot(snapshot_dir).is_err() {
2653            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2654        }
2655    }
2656}
2657
2658/// Remove the bank snapshot at this path
2659pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2660    const FN_ERR: &str = "failed to purge bank snapshot";
2661    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2662    if accounts_hardlinks_dir.is_dir() {
2663        // This directory contain symlinks to all accounts snapshot directories.
2664        // They should all be removed.
2665        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2666            IoError::other(format!(
2667                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2668                accounts_hardlinks_dir.display(),
2669            ))
2670        })?;
2671        for entry in read_dir {
2672            let accounts_hardlink_dir = entry?.path();
2673            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2674                IoError::other(format!(
2675                    "{FN_ERR}: failed to read symlink '{}': {err}",
2676                    accounts_hardlink_dir.display(),
2677                ))
2678            })?;
2679            move_and_async_delete_path(&accounts_hardlink_dir);
2680        }
2681    }
2682    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2683        IoError::other(format!(
2684            "{FN_ERR}: failed to remove dir '{}': {err}",
2685            bank_snapshot_dir.as_ref().display(),
2686        ))
2687    })?;
2688    Ok(())
2689}
2690
2691pub fn should_take_full_snapshot(
2692    block_height: Slot,
2693    full_snapshot_archive_interval_slots: Slot,
2694) -> bool {
2695    block_height % full_snapshot_archive_interval_slots == 0
2696}
2697
2698pub fn should_take_incremental_snapshot(
2699    block_height: Slot,
2700    incremental_snapshot_archive_interval_slots: Slot,
2701    latest_full_snapshot_slot: Option<Slot>,
2702) -> bool {
2703    block_height % incremental_snapshot_archive_interval_slots == 0
2704        && latest_full_snapshot_slot.is_some()
2705}
2706
/// Creates an "accounts path" directory for tests.
///
/// Returns the temporary directory guard (keep it alive for as long as the path is used) and
/// the accounts path. The path is the first element returned by
/// `create_accounts_run_and_snapshot_dirs`, which creates the "run" and "snapshot"
/// sub-directories a validator requires.
///
/// # Panics
///
/// Panics if the temporary directory or its sub-directories cannot be created.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let (account_dir, ..) = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, account_dir)
}
2717
2718#[cfg(test)]
2719mod tests {
2720    use {
2721        super::*,
2722        assert_matches::assert_matches,
2723        bincode::{deserialize_from, serialize_into},
2724        std::{convert::TryFrom, mem::size_of},
2725        tempfile::NamedTempFile,
2726    };
2727
2728    #[test]
2729    fn test_serialize_snapshot_data_file_under_limit() {
2730        let temp_dir = tempfile::TempDir::new().unwrap();
2731        let expected_consumed_size = size_of::<u32>() as u64;
2732        let consumed_size = serialize_snapshot_data_file_capped(
2733            &temp_dir.path().join("data-file"),
2734            expected_consumed_size,
2735            |stream| {
2736                serialize_into(stream, &2323_u32)?;
2737                Ok(())
2738            },
2739        )
2740        .unwrap();
2741        assert_eq!(consumed_size, expected_consumed_size);
2742    }
2743
2744    #[test]
2745    fn test_serialize_snapshot_data_file_over_limit() {
2746        let temp_dir = tempfile::TempDir::new().unwrap();
2747        let expected_consumed_size = size_of::<u32>() as u64;
2748        let result = serialize_snapshot_data_file_capped(
2749            &temp_dir.path().join("data-file"),
2750            expected_consumed_size - 1,
2751            |stream| {
2752                serialize_into(stream, &2323_u32)?;
2753                Ok(())
2754            },
2755        );
2756        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2757    }
2758
2759    #[test]
2760    fn test_deserialize_snapshot_data_file_under_limit() {
2761        let expected_data = 2323_u32;
2762        let expected_consumed_size = size_of::<u32>() as u64;
2763
2764        let temp_dir = tempfile::TempDir::new().unwrap();
2765        serialize_snapshot_data_file_capped(
2766            &temp_dir.path().join("data-file"),
2767            expected_consumed_size,
2768            |stream| {
2769                serialize_into(stream, &expected_data)?;
2770                Ok(())
2771            },
2772        )
2773        .unwrap();
2774
2775        let snapshot_root_paths = SnapshotRootPaths {
2776            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2777            incremental_snapshot_root_file_path: None,
2778        };
2779
2780        let actual_data = deserialize_snapshot_data_files_capped(
2781            &snapshot_root_paths,
2782            expected_consumed_size,
2783            |stream| {
2784                Ok(deserialize_from::<_, u32>(
2785                    &mut stream.full_snapshot_stream,
2786                )?)
2787            },
2788        )
2789        .unwrap();
2790        assert_eq!(actual_data, expected_data);
2791    }
2792
2793    #[test]
2794    fn test_deserialize_snapshot_data_file_over_limit() {
2795        let expected_data = 2323_u32;
2796        let expected_consumed_size = size_of::<u32>() as u64;
2797
2798        let temp_dir = tempfile::TempDir::new().unwrap();
2799        serialize_snapshot_data_file_capped(
2800            &temp_dir.path().join("data-file"),
2801            expected_consumed_size,
2802            |stream| {
2803                serialize_into(stream, &expected_data)?;
2804                Ok(())
2805            },
2806        )
2807        .unwrap();
2808
2809        let snapshot_root_paths = SnapshotRootPaths {
2810            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2811            incremental_snapshot_root_file_path: None,
2812        };
2813
2814        let result = deserialize_snapshot_data_files_capped(
2815            &snapshot_root_paths,
2816            expected_consumed_size - 1,
2817            |stream| {
2818                Ok(deserialize_from::<_, u32>(
2819                    &mut stream.full_snapshot_stream,
2820                )?)
2821            },
2822        );
2823        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2824    }
2825
2826    #[test]
2827    fn test_deserialize_snapshot_data_file_extra_data() {
2828        let expected_data = 2323_u32;
2829        let expected_consumed_size = size_of::<u32>() as u64;
2830
2831        let temp_dir = tempfile::TempDir::new().unwrap();
2832        serialize_snapshot_data_file_capped(
2833            &temp_dir.path().join("data-file"),
2834            expected_consumed_size * 2,
2835            |stream| {
2836                serialize_into(stream.by_ref(), &expected_data)?;
2837                serialize_into(stream.by_ref(), &expected_data)?;
2838                Ok(())
2839            },
2840        )
2841        .unwrap();
2842
2843        let snapshot_root_paths = SnapshotRootPaths {
2844            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2845            incremental_snapshot_root_file_path: None,
2846        };
2847
2848        let result = deserialize_snapshot_data_files_capped(
2849            &snapshot_root_paths,
2850            expected_consumed_size * 2,
2851            |stream| {
2852                Ok(deserialize_from::<_, u32>(
2853                    &mut stream.full_snapshot_stream,
2854                )?)
2855            },
2856        );
2857        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2858    }
2859
2860    #[test]
2861    fn test_snapshot_version_from_file_under_limit() {
2862        let file_content = SnapshotVersion::default().as_str();
2863        let mut file = NamedTempFile::new().unwrap();
2864        file.write_all(file_content.as_bytes()).unwrap();
2865        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2866        assert_eq!(version_from_file, file_content);
2867    }
2868
2869    #[test]
2870    fn test_snapshot_version_from_file_over_limit() {
2871        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2872        let file_content = vec![7u8; over_limit_size];
2873        let mut file = NamedTempFile::new().unwrap();
2874        file.write_all(&file_content).unwrap();
2875        assert_matches!(
2876            snapshot_version_from_file(file.path()),
2877            Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2878        );
2879    }
2880
2881    #[test]
2882    fn test_parse_full_snapshot_archive_filename() {
2883        assert_eq!(
2884            parse_full_snapshot_archive_filename(&format!(
2885                "snapshot-43-{}.tar.zst",
2886                Hash::default()
2887            ))
2888            .unwrap(),
2889            (
2890                43,
2891                SnapshotHash(Hash::default()),
2892                ArchiveFormat::TarZstd {
2893                    config: ZstdConfig::default(),
2894                }
2895            )
2896        );
2897        assert_eq!(
2898            parse_full_snapshot_archive_filename(&format!(
2899                "snapshot-45-{}.tar.lz4",
2900                Hash::default()
2901            ))
2902            .unwrap(),
2903            (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2904        );
2905
2906        assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2907        assert!(
2908            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2909        );
2910
2911        assert!(
2912            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2913        );
2914        assert!(parse_full_snapshot_archive_filename(&format!(
2915            "snapshot-12345678-{}.bad!ext",
2916            Hash::new_unique()
2917        ))
2918        .is_err());
2919        assert!(
2920            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar.zst").is_err()
2921        );
2922
2923        assert!(parse_full_snapshot_archive_filename(&format!(
2924            "snapshot-bad!slot-{}.bad!ext",
2925            Hash::new_unique()
2926        ))
2927        .is_err());
2928        assert!(parse_full_snapshot_archive_filename(&format!(
2929            "snapshot-12345678-{}.bad!ext",
2930            Hash::new_unique()
2931        ))
2932        .is_err());
2933        assert!(parse_full_snapshot_archive_filename(&format!(
2934            "snapshot-bad!slot-{}.tar.zst",
2935            Hash::new_unique()
2936        ))
2937        .is_err());
2938
2939        assert!(
2940            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar.zst").is_err()
2941        );
2942        assert!(
2943            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar.zst").is_err()
2944        );
2945        assert!(parse_full_snapshot_archive_filename(&format!(
2946            "snapshot-bad!slot-{}.tar.zst",
2947            Hash::new_unique()
2948        ))
2949        .is_err());
2950    }
2951
2952    #[test]
2953    fn test_parse_incremental_snapshot_archive_filename() {
2954        assert_eq!(
2955            parse_incremental_snapshot_archive_filename(&format!(
2956                "incremental-snapshot-43-234-{}.tar.zst",
2957                Hash::default()
2958            ))
2959            .unwrap(),
2960            (
2961                43,
2962                234,
2963                SnapshotHash(Hash::default()),
2964                ArchiveFormat::TarZstd {
2965                    config: ZstdConfig::default(),
2966                }
2967            )
2968        );
2969        assert_eq!(
2970            parse_incremental_snapshot_archive_filename(&format!(
2971                "incremental-snapshot-45-456-{}.tar.lz4",
2972                Hash::default()
2973            ))
2974            .unwrap(),
2975            (
2976                45,
2977                456,
2978                SnapshotHash(Hash::default()),
2979                ArchiveFormat::TarLz4
2980            )
2981        );
2982
2983        assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2984        assert!(parse_incremental_snapshot_archive_filename(&format!(
2985            "snapshot-42-{}.tar.zst",
2986            Hash::new_unique()
2987        ))
2988        .is_err());
2989        assert!(parse_incremental_snapshot_archive_filename(
2990            "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2991        )
2992        .is_err());
2993
2994        assert!(parse_incremental_snapshot_archive_filename(&format!(
2995            "incremental-snapshot-bad!slot-56785678-{}.tar.zst",
2996            Hash::new_unique()
2997        ))
2998        .is_err());
2999
3000        assert!(parse_incremental_snapshot_archive_filename(&format!(
3001            "incremental-snapshot-12345678-bad!slot-{}.tar.zst",
3002            Hash::new_unique()
3003        ))
3004        .is_err());
3005
3006        assert!(parse_incremental_snapshot_archive_filename(
3007            "incremental-snapshot-12341234-56785678-bad!HASH.tar.zst"
3008        )
3009        .is_err());
3010
3011        assert!(parse_incremental_snapshot_archive_filename(&format!(
3012            "incremental-snapshot-12341234-56785678-{}.bad!ext",
3013            Hash::new_unique()
3014        ))
3015        .is_err());
3016    }
3017
3018    #[test]
3019    fn test_check_are_snapshots_compatible() {
3020        let slot1: Slot = 1234;
3021        let slot2: Slot = 5678;
3022        let slot3: Slot = 999_999;
3023
3024        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
3025            format!("/dir/snapshot-{}-{}.tar.zst", slot1, Hash::new_unique()),
3026        ))
3027        .unwrap();
3028
3029        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
3030
3031        let incremental_snapshot_archive_info =
3032            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
3033                "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
3034                slot1,
3035                slot2,
3036                Hash::new_unique()
3037            )))
3038            .unwrap();
3039
3040        assert!(check_are_snapshots_compatible(
3041            &full_snapshot_archive_info,
3042            Some(&incremental_snapshot_archive_info)
3043        )
3044        .is_ok());
3045
3046        let incremental_snapshot_archive_info =
3047            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
3048                "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
3049                slot2,
3050                slot3,
3051                Hash::new_unique()
3052            )))
3053            .unwrap();
3054
3055        assert!(check_are_snapshots_compatible(
3056            &full_snapshot_archive_info,
3057            Some(&incremental_snapshot_archive_info)
3058        )
3059        .is_err());
3060    }
3061
3062    /// A test heler function that creates bank snapshot files
3063    fn common_create_bank_snapshot_files(
3064        bank_snapshots_dir: &Path,
3065        min_slot: Slot,
3066        max_slot: Slot,
3067    ) {
3068        for slot in min_slot..max_slot {
3069            let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
3070            fs::create_dir_all(&snapshot_dir).unwrap();
3071
3072            let snapshot_filename = get_snapshot_file_name(slot);
3073            let snapshot_path = snapshot_dir.join(snapshot_filename);
3074            fs::File::create(snapshot_path).unwrap();
3075
3076            let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
3077            fs::File::create(status_cache_file).unwrap();
3078
3079            let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
3080            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
3081
3082            // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
3083            write_snapshot_state_complete_file(snapshot_dir).unwrap();
3084        }
3085    }
3086
3087    #[test]
3088    fn test_get_bank_snapshots() {
3089        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3090        let min_slot = 10;
3091        let max_slot = 20;
3092        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3093
3094        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
3095        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
3096    }
3097
3098    #[test]
3099    fn test_get_highest_bank_snapshot_post() {
3100        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3101        let min_slot = 99;
3102        let max_slot = 123;
3103        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3104
3105        let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
3106        assert!(highest_bank_snapshot.is_some());
3107        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
3108    }
3109
3110    /// A test helper function that creates full and incremental snapshot archive files.  Creates
3111    /// full snapshot files in the range (`min_full_snapshot_slot`, `max_full_snapshot_slot`], and
3112    /// incremental snapshot files in the range (`min_incremental_snapshot_slot`,
3113    /// `max_incremental_snapshot_slot`].  Additionally, "bad" files are created for both full and
3114    /// incremental snapshots to ensure the tests properly filter them out.
3115    fn common_create_snapshot_archive_files(
3116        full_snapshot_archives_dir: &Path,
3117        incremental_snapshot_archives_dir: &Path,
3118        min_full_snapshot_slot: Slot,
3119        max_full_snapshot_slot: Slot,
3120        min_incremental_snapshot_slot: Slot,
3121        max_incremental_snapshot_slot: Slot,
3122    ) {
3123        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3124        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3125        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3126            for incremental_snapshot_slot in
3127                min_incremental_snapshot_slot..max_incremental_snapshot_slot
3128            {
3129                let snapshot_filename = format!(
3130                    "incremental-snapshot-{}-{}-{}.tar.zst",
3131                    full_snapshot_slot,
3132                    incremental_snapshot_slot,
3133                    Hash::default()
3134                );
3135                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3136                fs::File::create(snapshot_filepath).unwrap();
3137            }
3138
3139            let snapshot_filename = format!(
3140                "snapshot-{}-{}.tar.zst",
3141                full_snapshot_slot,
3142                Hash::default()
3143            );
3144            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3145            fs::File::create(snapshot_filepath).unwrap();
3146
3147            // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly
3148            let bad_filename = format!(
3149                "incremental-snapshot-{}-{}-bad!hash.tar.zst",
3150                full_snapshot_slot,
3151                max_incremental_snapshot_slot + 1,
3152            );
3153            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3154            fs::File::create(bad_filepath).unwrap();
3155        }
3156
3157        // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
3158        // sorted correctly
3159        let bad_filename = format!("snapshot-{}-bad!hash.tar.zst", max_full_snapshot_slot + 1);
3160        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3161        fs::File::create(bad_filepath).unwrap();
3162    }
3163
3164    #[test]
3165    fn test_get_full_snapshot_archives() {
3166        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3167        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3168        let min_slot = 123;
3169        let max_slot = 456;
3170        common_create_snapshot_archive_files(
3171            full_snapshot_archives_dir.path(),
3172            incremental_snapshot_archives_dir.path(),
3173            min_slot,
3174            max_slot,
3175            0,
3176            0,
3177        );
3178
3179        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3180        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3181    }
3182
3183    #[test]
3184    fn test_get_full_snapshot_archives_remote() {
3185        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3186        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3187        let min_slot = 123;
3188        let max_slot = 456;
3189        common_create_snapshot_archive_files(
3190            &full_snapshot_archives_dir
3191                .path()
3192                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3193            &incremental_snapshot_archives_dir
3194                .path()
3195                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3196            min_slot,
3197            max_slot,
3198            0,
3199            0,
3200        );
3201
3202        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3203        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3204        assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3205    }
3206
3207    #[test]
3208    fn test_get_incremental_snapshot_archives() {
3209        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3210        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3211        let min_full_snapshot_slot = 12;
3212        let max_full_snapshot_slot = 23;
3213        let min_incremental_snapshot_slot = 34;
3214        let max_incremental_snapshot_slot = 45;
3215        common_create_snapshot_archive_files(
3216            full_snapshot_archives_dir.path(),
3217            incremental_snapshot_archives_dir.path(),
3218            min_full_snapshot_slot,
3219            max_full_snapshot_slot,
3220            min_incremental_snapshot_slot,
3221            max_incremental_snapshot_slot,
3222        );
3223
3224        let incremental_snapshot_archives =
3225            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3226        assert_eq!(
3227            incremental_snapshot_archives.len() as Slot,
3228            (max_full_snapshot_slot - min_full_snapshot_slot)
3229                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3230        );
3231    }
3232
3233    #[test]
3234    fn test_get_incremental_snapshot_archives_remote() {
3235        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3236        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3237        let min_full_snapshot_slot = 12;
3238        let max_full_snapshot_slot = 23;
3239        let min_incremental_snapshot_slot = 34;
3240        let max_incremental_snapshot_slot = 45;
3241        common_create_snapshot_archive_files(
3242            &full_snapshot_archives_dir
3243                .path()
3244                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3245            &incremental_snapshot_archives_dir
3246                .path()
3247                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3248            min_full_snapshot_slot,
3249            max_full_snapshot_slot,
3250            min_incremental_snapshot_slot,
3251            max_incremental_snapshot_slot,
3252        );
3253
3254        let incremental_snapshot_archives =
3255            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3256        assert_eq!(
3257            incremental_snapshot_archives.len() as Slot,
3258            (max_full_snapshot_slot - min_full_snapshot_slot)
3259                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3260        );
3261        assert!(incremental_snapshot_archives
3262            .iter()
3263            .all(|info| info.is_remote()));
3264    }
3265
3266    #[test]
3267    fn test_get_highest_full_snapshot_archive_slot() {
3268        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3269        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3270        let min_slot = 123;
3271        let max_slot = 456;
3272        common_create_snapshot_archive_files(
3273            full_snapshot_archives_dir.path(),
3274            incremental_snapshot_archives_dir.path(),
3275            min_slot,
3276            max_slot,
3277            0,
3278            0,
3279        );
3280
3281        assert_eq!(
3282            get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3283            Some(max_slot - 1)
3284        );
3285    }
3286
3287    #[test]
3288    fn test_get_highest_incremental_snapshot_slot() {
3289        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3290        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3291        let min_full_snapshot_slot = 12;
3292        let max_full_snapshot_slot = 23;
3293        let min_incremental_snapshot_slot = 34;
3294        let max_incremental_snapshot_slot = 45;
3295        common_create_snapshot_archive_files(
3296            full_snapshot_archives_dir.path(),
3297            incremental_snapshot_archives_dir.path(),
3298            min_full_snapshot_slot,
3299            max_full_snapshot_slot,
3300            min_incremental_snapshot_slot,
3301            max_incremental_snapshot_slot,
3302        );
3303
3304        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3305            assert_eq!(
3306                get_highest_incremental_snapshot_archive_slot(
3307                    incremental_snapshot_archives_dir.path(),
3308                    full_snapshot_slot
3309                ),
3310                Some(max_incremental_snapshot_slot - 1)
3311            );
3312        }
3313
3314        assert_eq!(
3315            get_highest_incremental_snapshot_archive_slot(
3316                incremental_snapshot_archives_dir.path(),
3317                max_full_snapshot_slot
3318            ),
3319            None
3320        );
3321    }
3322
3323    fn common_test_purge_old_snapshot_archives(
3324        snapshot_names: &[&String],
3325        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3326        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3327        expected_snapshots: &[&String],
3328    ) {
3329        let temp_snap_dir = tempfile::TempDir::new().unwrap();
3330
3331        for snap_name in snapshot_names {
3332            let snap_path = temp_snap_dir.path().join(snap_name);
3333            let mut _snap_file = fs::File::create(snap_path);
3334        }
3335        purge_old_snapshot_archives(
3336            temp_snap_dir.path(),
3337            temp_snap_dir.path(),
3338            maximum_full_snapshot_archives_to_retain,
3339            maximum_incremental_snapshot_archives_to_retain,
3340        );
3341
3342        let mut retained_snaps = HashSet::new();
3343        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3344            let entry_path_buf = entry.unwrap().path();
3345            let entry_path = entry_path_buf.as_path();
3346            let snapshot_name = entry_path
3347                .file_name()
3348                .unwrap()
3349                .to_str()
3350                .unwrap()
3351                .to_string();
3352            retained_snaps.insert(snapshot_name);
3353        }
3354
3355        for snap_name in expected_snapshots {
3356            assert!(
3357                retained_snaps.contains(snap_name.as_str()),
3358                "{snap_name} not found"
3359            );
3360        }
3361        assert_eq!(retained_snaps.len(), expected_snapshots.len());
3362    }
3363
3364    #[test]
3365    fn test_purge_old_full_snapshot_archives() {
3366        let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
3367        let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
3368        let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
3369        let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
3370
3371        // expecting only the newest to be retained
3372        let expected_snapshots = vec![&snap3_name];
3373        common_test_purge_old_snapshot_archives(
3374            &snapshot_names,
3375            NonZeroUsize::new(1).unwrap(),
3376            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3377            &expected_snapshots,
3378        );
3379
3380        // retaining 2, expecting the 2 newest to be retained
3381        let expected_snapshots = vec![&snap2_name, &snap3_name];
3382        common_test_purge_old_snapshot_archives(
3383            &snapshot_names,
3384            NonZeroUsize::new(2).unwrap(),
3385            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3386            &expected_snapshots,
3387        );
3388
3389        // retaining 3, all three should be retained
3390        let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
3391        common_test_purge_old_snapshot_archives(
3392            &snapshot_names,
3393            NonZeroUsize::new(3).unwrap(),
3394            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3395            &expected_snapshots,
3396        );
3397    }
3398
3399    /// Mimic a running node's behavior w.r.t. purging old snapshot archives.  Take snapshots in a
3400    /// loop, and periodically purge old snapshot archives.  After purging, check to make sure the
3401    /// snapshot archives on disk are correct.
3402    #[test]
3403    fn test_purge_old_full_snapshot_archives_in_the_loop() {
3404        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3405        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3406        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
3407        let starting_slot: Slot = 42;
3408
3409        for slot in (starting_slot..).take(100) {
3410            let full_snapshot_archive_file_name =
3411                format!("snapshot-{}-{}.tar.zst", slot, Hash::default());
3412            let full_snapshot_archive_path = full_snapshot_archives_dir
3413                .as_ref()
3414                .join(full_snapshot_archive_file_name);
3415            fs::File::create(full_snapshot_archive_path).unwrap();
3416
3417            // don't purge-and-check until enough snapshot archives have been created
3418            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
3419                continue;
3420            }
3421
3422            // purge infrequently, so there will always be snapshot archives to purge
3423            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
3424                continue;
3425            }
3426
3427            purge_old_snapshot_archives(
3428                &full_snapshot_archives_dir,
3429                &incremental_snapshot_archives_dir,
3430                maximum_snapshots_to_retain,
3431                NonZeroUsize::new(usize::MAX).unwrap(),
3432            );
3433            let mut full_snapshot_archives =
3434                get_full_snapshot_archives(&full_snapshot_archives_dir);
3435            full_snapshot_archives.sort_unstable();
3436            assert_eq!(
3437                full_snapshot_archives.len(),
3438                maximum_snapshots_to_retain.get()
3439            );
3440            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
3441            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
3442                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
3443            }
3444        }
3445    }
3446
3447    #[test]
3448    fn test_purge_old_incremental_snapshot_archives() {
3449        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3450        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3451        let starting_slot = 100_000;
3452
3453        let maximum_incremental_snapshot_archives_to_retain =
3454            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3455        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3456
3457        let incremental_snapshot_interval = 100;
3458        let num_incremental_snapshots_per_full_snapshot =
3459            maximum_incremental_snapshot_archives_to_retain.get() * 2;
3460        let full_snapshot_interval =
3461            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3462
3463        let mut snapshot_filenames = vec![];
3464        (starting_slot..)
3465            .step_by(full_snapshot_interval)
3466            .take(
3467                maximum_full_snapshot_archives_to_retain
3468                    .checked_mul(NonZeroUsize::new(2).unwrap())
3469                    .unwrap()
3470                    .get(),
3471            )
3472            .for_each(|full_snapshot_slot| {
3473                let snapshot_filename = format!(
3474                    "snapshot-{}-{}.tar.zst",
3475                    full_snapshot_slot,
3476                    Hash::default()
3477                );
3478                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3479                fs::File::create(snapshot_path).unwrap();
3480                snapshot_filenames.push(snapshot_filename);
3481
3482                (full_snapshot_slot..)
3483                    .step_by(incremental_snapshot_interval)
3484                    .take(num_incremental_snapshots_per_full_snapshot)
3485                    .skip(1)
3486                    .for_each(|incremental_snapshot_slot| {
3487                        let snapshot_filename = format!(
3488                            "incremental-snapshot-{}-{}-{}.tar.zst",
3489                            full_snapshot_slot,
3490                            incremental_snapshot_slot,
3491                            Hash::default()
3492                        );
3493                        let snapshot_path = incremental_snapshot_archives_dir
3494                            .path()
3495                            .join(&snapshot_filename);
3496                        fs::File::create(snapshot_path).unwrap();
3497                        snapshot_filenames.push(snapshot_filename);
3498                    });
3499            });
3500
3501        purge_old_snapshot_archives(
3502            full_snapshot_archives_dir.path(),
3503            incremental_snapshot_archives_dir.path(),
3504            maximum_full_snapshot_archives_to_retain,
3505            maximum_incremental_snapshot_archives_to_retain,
3506        );
3507
3508        // Ensure correct number of full snapshot archives are purged/retained
3509        let mut remaining_full_snapshot_archives =
3510            get_full_snapshot_archives(full_snapshot_archives_dir.path());
3511        assert_eq!(
3512            remaining_full_snapshot_archives.len(),
3513            maximum_full_snapshot_archives_to_retain.get(),
3514        );
3515        remaining_full_snapshot_archives.sort_unstable();
3516        let latest_full_snapshot_archive_slot =
3517            remaining_full_snapshot_archives.last().unwrap().slot();
3518
3519        // Ensure correct number of incremental snapshot archives are purged/retained
3520        // For each additional full snapshot archive, one additional (the newest)
3521        // incremental snapshot archive is retained. This is accounted for by the
3522        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
3523        let mut remaining_incremental_snapshot_archives =
3524            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3525        assert_eq!(
3526            remaining_incremental_snapshot_archives.len(),
3527            maximum_incremental_snapshot_archives_to_retain
3528                .get()
3529                .saturating_add(
3530                    maximum_full_snapshot_archives_to_retain
3531                        .get()
3532                        .saturating_sub(1)
3533                )
3534        );
3535        remaining_incremental_snapshot_archives.sort_unstable();
3536        remaining_incremental_snapshot_archives.reverse();
3537
3538        // Ensure there exists one incremental snapshot all but the latest full snapshot
3539        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3540            let incremental_snapshot_archive =
3541                remaining_incremental_snapshot_archives.pop().unwrap();
3542
3543            let expected_base_slot =
3544                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3545            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3546            let expected_slot = expected_base_slot
3547                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3548            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3549        }
3550
3551        // Ensure all remaining incremental snapshots are only for the latest full snapshot
3552        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3553            assert_eq!(
3554                incremental_snapshot_archive.base_slot(),
3555                latest_full_snapshot_archive_slot
3556            );
3557        }
3558
3559        // Ensure the remaining incremental snapshots are at the right slot
3560        let expected_remaining_incremental_snapshot_archive_slots =
3561            (latest_full_snapshot_archive_slot..)
3562                .step_by(incremental_snapshot_interval)
3563                .take(num_incremental_snapshots_per_full_snapshot)
3564                .skip(
3565                    num_incremental_snapshots_per_full_snapshot
3566                        - maximum_incremental_snapshot_archives_to_retain.get(),
3567                )
3568                .collect::<HashSet<_>>();
3569
3570        let actual_remaining_incremental_snapshot_archive_slots =
3571            remaining_incremental_snapshot_archives
3572                .iter()
3573                .map(|snapshot| snapshot.slot())
3574                .collect::<HashSet<_>>();
3575        assert_eq!(
3576            actual_remaining_incremental_snapshot_archive_slots,
3577            expected_remaining_incremental_snapshot_archive_slots
3578        );
3579    }
3580
3581    #[test]
3582    fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3583        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3584        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3585
3586        for snapshot_filenames in [
3587            format!("incremental-snapshot-100-120-{}.tar.zst", Hash::default()),
3588            format!("incremental-snapshot-100-140-{}.tar.zst", Hash::default()),
3589            format!("incremental-snapshot-100-160-{}.tar.zst", Hash::default()),
3590            format!("incremental-snapshot-100-180-{}.tar.zst", Hash::default()),
3591            format!("incremental-snapshot-200-220-{}.tar.zst", Hash::default()),
3592            format!("incremental-snapshot-200-240-{}.tar.zst", Hash::default()),
3593            format!("incremental-snapshot-200-260-{}.tar.zst", Hash::default()),
3594            format!("incremental-snapshot-200-280-{}.tar.zst", Hash::default()),
3595        ] {
3596            let snapshot_path = incremental_snapshot_archives_dir
3597                .path()
3598                .join(snapshot_filenames);
3599            fs::File::create(snapshot_path).unwrap();
3600        }
3601
3602        purge_old_snapshot_archives(
3603            full_snapshot_archives_dir.path(),
3604            incremental_snapshot_archives_dir.path(),
3605            NonZeroUsize::new(usize::MAX).unwrap(),
3606            NonZeroUsize::new(usize::MAX).unwrap(),
3607        );
3608
3609        let remaining_incremental_snapshot_archives =
3610            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3611        assert!(remaining_incremental_snapshot_archives.is_empty());
3612    }
3613
3614    #[test]
3615    fn test_get_snapshot_accounts_hardlink_dir() {
3616        let slot: Slot = 1;
3617
3618        let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3619
3620        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3621        let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3622        let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3623        fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3624
3625        let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3626        let appendvec_filename = format!("{slot}.0");
3627        let appendvec_path = accounts_dir.join(appendvec_filename);
3628
3629        let ret = get_snapshot_accounts_hardlink_dir(
3630            &appendvec_path,
3631            slot,
3632            &mut account_paths_set,
3633            &accounts_hardlinks_dir,
3634        );
3635        assert!(ret.is_ok());
3636
3637        let wrong_appendvec_path = appendvec_path
3638            .parent()
3639            .unwrap()
3640            .parent()
3641            .unwrap()
3642            .join(appendvec_path.file_name().unwrap());
3643        let ret = get_snapshot_accounts_hardlink_dir(
3644            &wrong_appendvec_path,
3645            slot,
3646            &mut account_paths_set,
3647            accounts_hardlinks_dir,
3648        );
3649
3650        assert_matches!(
3651            ret,
3652            Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3653        );
3654    }
3655
3656    #[test]
3657    fn test_get_snapshot_file_kind() {
3658        assert_eq!(None, get_snapshot_file_kind("file.txt"));
3659        assert_eq!(
3660            Some(SnapshotFileKind::Version),
3661            get_snapshot_file_kind(SNAPSHOT_VERSION_FILENAME)
3662        );
3663        assert_eq!(
3664            Some(SnapshotFileKind::BankFields),
3665            get_snapshot_file_kind("1234")
3666        );
3667        assert_eq!(
3668            Some(SnapshotFileKind::Storage),
3669            get_snapshot_file_kind("1000.999")
3670        );
3671    }
3672
3673    #[test]
3674    fn test_full_snapshot_slot_file_good() {
3675        let slot_written = 123_456_789;
3676        let bank_snapshot_dir = TempDir::new().unwrap();
3677        write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
3678
3679        let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
3680        assert_eq!(slot_read, slot_written);
3681    }
3682
3683    #[test]
3684    fn test_full_snapshot_slot_file_bad() {
3685        const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
3686        let too_small = [1u8; SLOT_SIZE - 1];
3687        let too_large = [1u8; SLOT_SIZE + 1];
3688
3689        for contents in [too_small.as_slice(), too_large.as_slice()] {
3690            let bank_snapshot_dir = TempDir::new().unwrap();
3691            let full_snapshot_slot_path = bank_snapshot_dir
3692                .as_ref()
3693                .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
3694            fs::write(full_snapshot_slot_path, contents).unwrap();
3695
3696            let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
3697            assert!(err
3698                .to_string()
3699                .starts_with("invalid full snapshot slot file size"));
3700        }
3701    }
3702}