//! Snapshot utilities — miraland_runtime/snapshot_utils.rs

1use {
2    crate::{
3        serde_snapshot::SnapshotStreams,
4        snapshot_archive_info::{
5            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfoGetter,
6        },
7        snapshot_hash::SnapshotHash,
8        snapshot_package::SnapshotPackage,
9        snapshot_utils::snapshot_storage_rebuilder::{
10            RebuiltSnapshotStorage, SnapshotStorageRebuilder,
11        },
12    },
13    bzip2::bufread::BzDecoder,
14    crossbeam_channel::Sender,
15    flate2::read::GzDecoder,
16    lazy_static::lazy_static,
17    log::*,
18    miraland_accounts_db::{
19        account_storage::AccountStorageMap,
20        accounts_db::{AccountStorageEntry, AtomicAppendVecId},
21        accounts_file::AccountsFileError,
22        append_vec::AppendVec,
23        hardened_unpack::{self, ParallelSelector, UnpackError},
24        shared_buffer_reader::{SharedBuffer, SharedBufferReader},
25        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
26    },
27    miraland_measure::{measure, measure::Measure},
28    regex::Regex,
29    miraland_sdk::{clock::Slot, hash::Hash},
30    std::{
31        cmp::Ordering,
32        collections::{HashMap, HashSet},
33        fmt, fs,
34        io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
35        num::NonZeroUsize,
36        path::{Path, PathBuf},
37        process::ExitStatus,
38        str::FromStr,
39        sync::Arc,
40        thread::{Builder, JoinHandle},
41    },
42    tar::{self, Archive},
43    tempfile::TempDir,
44    thiserror::Error,
45};
46#[cfg(feature = "dev-context-only-utils")]
47use {
48    hardened_unpack::UnpackedAppendVecMap,
49    miraland_accounts_db::utils::create_accounts_run_and_snapshot_dirs, rayon::prelude::*,
50};
51
52mod archive_format;
53pub mod snapshot_storage_rebuilder;
54pub use archive_format::*;
55
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // byte
const VERSION_STRING_V1_2_0: &str = "1.2.0";
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
/// Builds a `NonZeroUsize` in const context without `unsafe`.
///
/// `NonZeroUsize::new()` returns an `Option`, which could not be `.unwrap()`ed
/// in a const item on older toolchains; a `match` with a const panic achieves
/// the same compile-time guarantee safely (const panics are stable since 1.57).
const fn const_non_zero_usize(value: usize) -> NonZeroUsize {
    match NonZeroUsize::new(value) {
        Some(value) => value,
        None => panic!("value must be non-zero"),
    }
}
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize = const_non_zero_usize(2);
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    const_non_zero_usize(4);
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
76
/// The serialization format version of a snapshot.
///
/// Currently only one version is supported.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SnapshotVersion {
    /// Snapshot format 1.2.0, the current default
    #[default]
    V1_2_0,
}
82
83impl fmt::Display for SnapshotVersion {
84    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85        f.write_str(From::from(*self))
86    }
87}
88
89impl From<SnapshotVersion> for &'static str {
90    fn from(snapshot_version: SnapshotVersion) -> &'static str {
91        match snapshot_version {
92            SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
93        }
94    }
95}
96
97impl FromStr for SnapshotVersion {
98    type Err = &'static str;
99
100    fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
101        // Remove leading 'v' or 'V' from slice
102        let version_string = if version_string
103            .get(..1)
104            .map_or(false, |s| s.eq_ignore_ascii_case("v"))
105        {
106            &version_string[1..]
107        } else {
108            version_string
109        };
110        match version_string {
111            VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
112            _ => Err("unsupported snapshot version"),
113        }
114    }
115}
116
117impl SnapshotVersion {
118    pub fn as_str(self) -> &'static str {
119        <&str as From<Self>>::from(self)
120    }
121}
122
123/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
124/// the kind of the snapshot.
125#[derive(PartialEq, Eq, Debug)]
126pub struct BankSnapshotInfo {
127    /// Slot of the bank
128    pub slot: Slot,
129    /// Snapshot kind
130    pub snapshot_kind: BankSnapshotKind,
131    /// Path to the bank snapshot directory
132    pub snapshot_dir: PathBuf,
133    /// Snapshot version
134    pub snapshot_version: SnapshotVersion,
135}
136
137impl PartialOrd for BankSnapshotInfo {
138    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139        Some(self.cmp(other))
140    }
141}
142
143// Order BankSnapshotInfo by slot (ascending), which practically is sorting chronologically
144impl Ord for BankSnapshotInfo {
145    fn cmp(&self, other: &Self) -> Ordering {
146        self.slot.cmp(&other.slot)
147    }
148}
149
150impl BankSnapshotInfo {
151    pub fn new_from_dir(
152        bank_snapshots_dir: impl AsRef<Path>,
153        slot: Slot,
154    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
155        // check this directory to see if there is a BankSnapshotPre and/or
156        // BankSnapshotPost file
157        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
158
159        if !bank_snapshot_dir.is_dir() {
160            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
161                bank_snapshot_dir,
162            ));
163        }
164
165        // Among the files checks, the completion flag file check should be done first to avoid the later
166        // I/O errors.
167
168        // There is a time window from the slot directory being created, and the content being completely
169        // filled.  Check the completion to avoid using a highest found slot directory with missing content.
170        if !is_bank_snapshot_complete(&bank_snapshot_dir) {
171            return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
172        }
173
174        let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
175        if !status_cache_file.is_file() {
176            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
177                status_cache_file,
178            ));
179        }
180
181        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
182        let version_str = snapshot_version_from_file(&version_path).or(Err(
183            SnapshotNewFromDirError::MissingVersionFile(version_path),
184        ))?;
185        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
186            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
187
188        let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
189        let bank_snapshot_pre_path =
190            bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
191
192        // NOTE: It is important that checking for "Pre" happens before "Post.
193        //
194        // Consider the scenario where AccountsHashVerifier is actively processing an
195        // AccountsPackage for a snapshot/slot; if AHV is in the middle of reserializing the
196        // bank snapshot file (writing the new "Post" file), and then the process dies,
197        // there will be an incomplete "Post" file on disk.  We do not want only the existence of
198        // this "Post" file to be sufficient for deciding the snapshot kind as "Post".  More so,
199        // "Post" *requires* the *absence* of a "Pre" file.
200        let snapshot_kind = if bank_snapshot_pre_path.is_file() {
201            BankSnapshotKind::Pre
202        } else if bank_snapshot_post_path.is_file() {
203            BankSnapshotKind::Post
204        } else {
205            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
206                bank_snapshot_dir,
207            ));
208        };
209
210        Ok(BankSnapshotInfo {
211            slot,
212            snapshot_kind,
213            snapshot_dir: bank_snapshot_dir,
214            snapshot_version,
215        })
216    }
217
218    pub fn snapshot_path(&self) -> PathBuf {
219        let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
220
221        let ext = match self.snapshot_kind {
222            BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
223            BankSnapshotKind::Post => "",
224        };
225        bank_snapshot_path.set_extension(ext);
226
227        bank_snapshot_path
228    }
229}
/// Distinguishes bank snapshots taken before vs after the accounts hash calculation.
///
/// Bank snapshots traditionally had their accounts hash calculated prior to serialization.
/// Since the hash calculation takes a long time, an optimization offloads it.  The bank
/// serialization format has not changed, so this kind is how we identify whether a bank
/// snapshot contains the calculated accounts hash or not.
///
/// When a bank snapshot is first taken, it does not have the calculated accounts hash and is
/// said to be "pre" accounts hash.  Later, when the accounts hash is calculated, the bank
/// snapshot is re-serialized and is then "post" accounts hash.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BankSnapshotKind {
    /// The accounts hash has *not* yet been calculated for this bank snapshot
    Pre,
    /// The accounts hash *has* been calculated for this bank snapshot
    Post,
}
245
/// Indicates the source a bank is being rebuilt from.
///
/// Traditionally a bank was constructed from a snapshot archive; now it can also be
/// constructed directly from a bank snapshot directory.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// Build from the snapshot archive
    Archive,
    /// Build directly from the bank snapshot directory
    Dir,
}
256
/// Helper type used when rebuilding from snapshots.
///
/// Handles rebuilding from just a full snapshot, or from a full snapshot plus an
/// incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    pub full_snapshot_root_file_path: PathBuf,
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
264
265/// Helper type to bundle up the results from `unarchive_snapshot()`
266#[derive(Debug)]
267pub struct UnarchivedSnapshot {
268    #[allow(dead_code)]
269    unpack_dir: TempDir,
270    pub storage: AccountStorageMap,
271    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
272    pub measure_untar: Measure,
273}
274
275/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
276#[derive(Debug)]
277pub struct UnpackedSnapshotsDirAndVersion {
278    pub unpacked_snapshots_dir: PathBuf,
279    pub snapshot_version: SnapshotVersion,
280}
281
282/// Helper type for passing around account storage map and next append vec id
283/// for reconstructing accounts from a snapshot
284pub(crate) struct StorageAndNextAppendVecId {
285    pub storage: AccountStorageMap,
286    pub next_append_vec_id: AtomicAppendVecId,
287}
288
/// Errors that can happen when creating, loading, or managing snapshots
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    #[error("snapshot has mismatch: deserialized bank: {0:?}, snapshot archive info: {1:?}")]
    MismatchedSlotHash((Slot, SnapshotHash), (Slot, SnapshotHash)),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
}
358
/// Errors that can happen in `BankSnapshotInfo::new_from_dir()`
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}
379
/// Result type alias for snapshot operations, with `SnapshotError` as the error type
pub type Result<T> = std::result::Result<T, SnapshotError>;
381
/// Errors that can happen in `verify_slot_deltas()`
/// (validating a snapshot's slot deltas against the bank's slot history)
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("slot history is bad and cannot be used to verify slot deltas")]
    BadSlotHistory,
}
406
/// Errors that can happen in `add_bank_snapshot()`
/// (writing a new bank snapshot — dir creation, hard links, serialization, marker files)
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
    CreateStateCompleteFile(#[source] IoError, PathBuf),
}
431
/// Errors that can happen in `archive_snapshot_package()`
/// (staging, symlinking, tarring, compressing, and moving the snapshot archive)
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create accounts staging dir '{1}': {0}")]
    CreateAccountsStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to flush account storage file '{1}': {0}")]
    FlushAccountStorageFile(#[source] AccountsFileError, PathBuf),

    #[error("failed to canonicalize account storage file '{1}': {0}")]
    CanonicalizeAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to symlink account storage file from '{1}' to '{2}': {0}")]
    SymlinkAccountStorageFile(#[source] IoError, PathBuf, PathBuf),

    #[error("account storage staging file is invalid '{0}'")]
    InvalidAccountStorageStagingFile(PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive accounts dir: {0}")]
    ArchiveAccountsDir(#[source] IoError),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),
}
498
/// Errors that can happen in `hard_link_storages_to_snapshot()`
/// (flushing storages and hard-linking them into the snapshot dir)
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to flush storage: {0}")]
    FlushStorage(#[source] AccountsFileError),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}
514
/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
/// (resolving/creating the hard link dir and symlinking it into the snapshot)
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}
531
532/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
533/// from <account_path>/run taken at snapshot <slot> time.  They are referenced by the symlinks from the
534/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We observed that sometimes the bank snapshot dir
535/// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
536/// or some legacy code not using the symlinks to clean up the acccount snapshot hardlink directories.
537/// This function cleans up any account snapshot directories that are no longer referenced by the bank
538/// snapshot dirs, to ensure proper snapshot operations.
539pub fn clean_orphaned_account_snapshot_dirs(
540    bank_snapshots_dir: impl AsRef<Path>,
541    account_snapshot_paths: &[PathBuf],
542) -> IoResult<()> {
543    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
544    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
545    let mut account_snapshot_dirs_referenced = HashSet::new();
546    let snapshots = get_bank_snapshots(bank_snapshots_dir);
547    for snapshot in snapshots {
548        let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
549        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
550        let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
551            IoError::other(format!(
552                "failed to read account hardlinks dir '{}': {err}",
553                account_hardlinks_dir.display(),
554            ))
555        })?;
556        for entry in read_dir {
557            let path = entry?.path();
558            let target = fs::read_link(&path).map_err(|err| {
559                IoError::other(format!(
560                    "failed to read symlink '{}': {err}",
561                    path.display(),
562                ))
563            })?;
564            account_snapshot_dirs_referenced.insert(target);
565        }
566    }
567
568    // loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
569    for account_snapshot_path in account_snapshot_paths {
570        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
571            IoError::other(format!(
572                "failed to read account snapshot dir '{}': {err}",
573                account_snapshot_path.display(),
574            ))
575        })?;
576        for entry in read_dir {
577            let path = entry?.path();
578            if !account_snapshot_dirs_referenced.contains(&path) {
579                info!(
580                    "Removing orphaned account snapshot hardlink directory '{}'...",
581                    path.display()
582                );
583                move_and_async_delete_path(&path);
584            }
585        }
586    }
587
588    Ok(())
589}
590
591/// Purges incomplete bank snapshots
592pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
593    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
594        // If we cannot read the bank snapshots dir, then there's nothing to do
595        return;
596    };
597
598    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
599
600    let incomplete_dirs: Vec<_> = read_dir_iter
601        .filter_map(|entry| entry.ok())
602        .map(|entry| entry.path())
603        .filter(|path| path.is_dir())
604        .filter(is_incomplete)
605        .collect();
606
607    // attempt to purge all the incomplete directories; do not exit early
608    for incomplete_dir in incomplete_dirs {
609        let result = purge_bank_snapshot(&incomplete_dir);
610        match result {
611            Ok(_) => info!(
612                "Purged incomplete snapshot dir: {}",
613                incomplete_dir.display()
614            ),
615            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
616        }
617    }
618}
619
620/// Is the bank snapshot complete?
621fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
622    let state_complete_path = bank_snapshot_dir
623        .as_ref()
624        .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
625    state_complete_path.is_file()
626}
627
628/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
629/// directory won't be cleaned up.  Call this function to clean them up.
630pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
631    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
632        for entry in entries.flatten() {
633            if entry
634                .file_name()
635                .to_str()
636                .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
637                .unwrap_or(false)
638            {
639                let path = entry.path();
640                let result = if path.is_dir() {
641                    fs::remove_dir_all(&path)
642                } else {
643                    fs::remove_file(&path)
644                };
645                if let Err(err) = result {
646                    warn!(
647                        "Failed to remove temporary snapshot archive '{}': {err}",
648                        path.display(),
649                    );
650                }
651            }
652        }
653    }
654}
655
656/// Make a snapshot archive out of the snapshot package
657pub fn archive_snapshot_package(
658    snapshot_package: &SnapshotPackage,
659    full_snapshot_archives_dir: impl AsRef<Path>,
660    incremental_snapshot_archives_dir: impl AsRef<Path>,
661    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
662    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
663) -> Result<()> {
664    use ArchiveSnapshotPackageError as E;
665    const SNAPSHOTS_DIR: &str = "snapshots";
666    const ACCOUNTS_DIR: &str = "accounts";
667    info!(
668        "Generating snapshot archive for slot {}",
669        snapshot_package.slot()
670    );
671
672    let mut timer = Measure::start("snapshot_package-package_snapshots");
673    let tar_dir = snapshot_package
674        .path()
675        .parent()
676        .expect("Tar output path is invalid");
677
678    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
679
680    // Create the staging directories
681    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
682    let staging_dir = tempfile::Builder::new()
683        .prefix(&format!(
684            "{}{}-",
685            staging_dir_prefix,
686            snapshot_package.slot()
687        ))
688        .tempdir_in(tar_dir)
689        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
690
691    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
692    let staging_accounts_dir = staging_dir.path().join(ACCOUNTS_DIR);
693
694    // Create staging/accounts/
695    fs::create_dir_all(&staging_accounts_dir)
696        .map_err(|err| E::CreateAccountsStagingDir(err, staging_accounts_dir.clone()))?;
697
698    let slot_str = snapshot_package.slot().to_string();
699    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
700    // Creates staging snapshots/<slot>/
701    fs::create_dir_all(&staging_snapshot_dir)
702        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
703
704    let src_snapshot_dir = &snapshot_package.bank_snapshot_dir;
705    // To be a source for symlinking and archiving, the path need to be an absolute path
706    let src_snapshot_dir = src_snapshot_dir
707        .canonicalize()
708        .map_err(|err| E::CanonicalizeSnapshotSourceDir(err, src_snapshot_dir.clone()))?;
709    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
710    let src_snapshot_file = src_snapshot_dir.join(slot_str);
711    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
712        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
713
714    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
715    // like in the snapshot dir.
716    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
717    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
718    symlink::symlink_file(&src_status_cache, &staging_status_cache)
719        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
720
721    // The bank snapshot has the version file, so symlink it to the correct staging path
722    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
723    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
724    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
725        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
726    })?;
727
728    // Add the AppendVecs into the compressible list
729    for storage in snapshot_package.snapshot_storages.iter() {
730        let storage_path = storage.get_path();
731        storage
732            .flush()
733            .map_err(|err| E::FlushAccountStorageFile(err, storage_path.clone()))?;
734        let staging_storage_path = staging_accounts_dir.join(AppendVec::file_name(
735            storage.slot(),
736            storage.append_vec_id(),
737        ));
738
739        // `src_storage_path` - The file path where the AppendVec itself is located
740        // `staging_storage_path` - The file path where the AppendVec will be placed in the staging directory.
741        let src_storage_path = fs::canonicalize(&storage_path)
742            .map_err(|err| E::CanonicalizeAccountStorageFile(err, storage_path))?;
743        symlink::symlink_file(&src_storage_path, &staging_storage_path).map_err(|err| {
744            E::SymlinkAccountStorageFile(err, src_storage_path, staging_storage_path.clone())
745        })?;
746        if !staging_storage_path.is_file() {
747            return Err(E::InvalidAccountStorageStagingFile(staging_storage_path).into());
748        }
749    }
750
751    // Tar the staging directory into the archive at `archive_path`
752    let archive_path = tar_dir.join(format!(
753        "{}{}.{}",
754        staging_dir_prefix,
755        snapshot_package.slot(),
756        snapshot_package.archive_format().extension(),
757    ));
758
759    {
760        let mut archive_file = fs::File::create(&archive_path)
761            .map_err(|err| E::CreateArchiveFile(err, archive_path.clone()))?;
762
763        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
764            let mut archive = tar::Builder::new(encoder);
765            // Serialize the version and snapshots files before accounts so we can quickly determine the version
766            // and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
767            archive
768                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
769                .map_err(E::ArchiveVersionFile)?;
770            archive
771                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
772                .map_err(E::ArchiveSnapshotsDir)?;
773            archive
774                .append_dir_all(ACCOUNTS_DIR, &staging_accounts_dir)
775                .map_err(E::ArchiveAccountsDir)?;
776            archive.into_inner().map_err(E::FinishArchive)?;
777            Ok(())
778        };
779
780        match snapshot_package.archive_format() {
781            ArchiveFormat::TarBzip2 => {
782                let mut encoder =
783                    bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
784                do_archive_files(&mut encoder)?;
785                encoder.finish().map_err(E::FinishEncoder)?;
786            }
787            ArchiveFormat::TarGzip => {
788                let mut encoder =
789                    flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
790                do_archive_files(&mut encoder)?;
791                encoder.finish().map_err(E::FinishEncoder)?;
792            }
793            ArchiveFormat::TarZstd => {
794                let mut encoder =
795                    zstd::stream::Encoder::new(archive_file, 0).map_err(E::CreateEncoder)?;
796                do_archive_files(&mut encoder)?;
797                encoder.finish().map_err(E::FinishEncoder)?;
798            }
799            ArchiveFormat::TarLz4 => {
800                let mut encoder = lz4::EncoderBuilder::new()
801                    .level(1)
802                    .build(archive_file)
803                    .map_err(E::CreateEncoder)?;
804                do_archive_files(&mut encoder)?;
805                let (_output, result) = encoder.finish();
806                result.map_err(E::FinishEncoder)?;
807            }
808            ArchiveFormat::Tar => {
809                do_archive_files(&mut archive_file)?;
810            }
811        };
812    }
813
814    // Atomically move the archive into position for other validators to find
815    let metadata = fs::metadata(&archive_path)
816        .map_err(|err| E::QueryArchiveMetadata(err, archive_path.clone()))?;
817    fs::rename(&archive_path, snapshot_package.path())
818        .map_err(|err| E::MoveArchive(err, archive_path, snapshot_package.path().clone()))?;
819
820    purge_old_snapshot_archives(
821        full_snapshot_archives_dir,
822        incremental_snapshot_archives_dir,
823        maximum_full_snapshot_archives_to_retain,
824        maximum_incremental_snapshot_archives_to_retain,
825    );
826
827    timer.stop();
828    info!(
829        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
830        snapshot_package.path().display(),
831        snapshot_package.slot(),
832        timer.as_ms(),
833        metadata.len()
834    );
835
836    datapoint_info!(
837        "archive-snapshot-package",
838        ("slot", snapshot_package.slot(), i64),
839        (
840            "archive_format",
841            snapshot_package.archive_format().to_string(),
842            String
843        ),
844        ("duration_ms", timer.as_ms(), i64),
845        (
846            if snapshot_package.snapshot_kind.is_full_snapshot() {
847                "full-snapshot-archive-size"
848            } else {
849                "incremental-snapshot-archive-size"
850            },
851            metadata.len(),
852            i64
853        ),
854    );
855    Ok(())
856}
857
858/// Get the bank snapshots in a directory
859pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
860    let mut bank_snapshots = Vec::default();
861    match fs::read_dir(&bank_snapshots_dir) {
862        Err(err) => {
863            info!(
864                "Unable to read bank snapshots directory '{}': {err}",
865                bank_snapshots_dir.as_ref().display(),
866            );
867        }
868        Ok(paths) => paths
869            .filter_map(|entry| {
870                // check if this entry is a directory and only a Slot
871                // bank snapshots are bank_snapshots_dir/slot/slot(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION)
872                entry
873                    .ok()
874                    .filter(|entry| entry.path().is_dir())
875                    .and_then(|entry| {
876                        entry
877                            .path()
878                            .file_name()
879                            .and_then(|file_name| file_name.to_str())
880                            .and_then(|file_name| file_name.parse::<Slot>().ok())
881                    })
882            })
883            .for_each(
884                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
885                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
886                    // Other threads may be modifying bank snapshots in parallel; only return
887                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
888                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
889                },
890            ),
891    }
892    bank_snapshots
893}
894
895/// Get the bank snapshots in a directory
896///
897/// This function retains only the bank snapshots of kind BankSnapshotKind::Pre
898pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
899    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
900    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
901    bank_snapshots
902}
903
904/// Get the bank snapshots in a directory
905///
906/// This function retains only the bank snapshots of kind BankSnapshotKind::Post
907pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
908    let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
909    bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
910    bank_snapshots
911}
912
913/// Get the bank snapshot with the highest slot in a directory
914///
915/// This function gets the highest bank snapshot of kind BankSnapshotKind::Pre
916pub fn get_highest_bank_snapshot_pre(
917    bank_snapshots_dir: impl AsRef<Path>,
918) -> Option<BankSnapshotInfo> {
919    do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
920}
921
922/// Get the bank snapshot with the highest slot in a directory
923///
924/// This function gets the highest bank snapshot of kind BankSnapshotKind::Post
925pub fn get_highest_bank_snapshot_post(
926    bank_snapshots_dir: impl AsRef<Path>,
927) -> Option<BankSnapshotInfo> {
928    do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
929}
930
931/// Get the bank snapshot with the highest slot in a directory
932///
933/// This function gets the highest bank snapshot of any kind
934pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
935    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
936}
937
938fn do_get_highest_bank_snapshot(
939    mut bank_snapshots: Vec<BankSnapshotInfo>,
940) -> Option<BankSnapshotInfo> {
941    bank_snapshots.sort_unstable();
942    bank_snapshots.into_iter().next_back()
943}
944
945pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
946where
947    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
948{
949    serialize_snapshot_data_file_capped::<F>(
950        data_file_path,
951        MAX_SNAPSHOT_DATA_FILE_SIZE,
952        serializer,
953    )
954}
955
956pub fn deserialize_snapshot_data_file<T: Sized>(
957    data_file_path: &Path,
958    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
959) -> Result<T> {
960    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
961        deserializer(streams.full_snapshot_stream)
962    };
963
964    let wrapped_data_file_path = SnapshotRootPaths {
965        full_snapshot_root_file_path: data_file_path.to_path_buf(),
966        incremental_snapshot_root_file_path: None,
967    };
968
969    deserialize_snapshot_data_files_capped(
970        &wrapped_data_file_path,
971        MAX_SNAPSHOT_DATA_FILE_SIZE,
972        wrapped_deserializer,
973    )
974}
975
976pub fn deserialize_snapshot_data_files<T: Sized>(
977    snapshot_root_paths: &SnapshotRootPaths,
978    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
979) -> Result<T> {
980    deserialize_snapshot_data_files_capped(
981        snapshot_root_paths,
982        MAX_SNAPSHOT_DATA_FILE_SIZE,
983        deserializer,
984    )
985}
986
987fn serialize_snapshot_data_file_capped<F>(
988    data_file_path: &Path,
989    maximum_file_size: u64,
990    serializer: F,
991) -> Result<u64>
992where
993    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
994{
995    let data_file = fs::File::create(data_file_path)?;
996    let mut data_file_stream = BufWriter::new(data_file);
997    serializer(&mut data_file_stream)?;
998    data_file_stream.flush()?;
999
1000    let consumed_size = data_file_stream.stream_position()?;
1001    if consumed_size > maximum_file_size {
1002        let error_message = format!(
1003            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1004            data_file_path.display(),
1005        );
1006        return Err(IoError::other(error_message).into());
1007    }
1008    Ok(consumed_size)
1009}
1010
1011fn deserialize_snapshot_data_files_capped<T: Sized>(
1012    snapshot_root_paths: &SnapshotRootPaths,
1013    maximum_file_size: u64,
1014    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1015) -> Result<T> {
1016    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1017        create_snapshot_data_file_stream(
1018            &snapshot_root_paths.full_snapshot_root_file_path,
1019            maximum_file_size,
1020        )?;
1021
1022    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1023        if let Some(ref incremental_snapshot_root_file_path) =
1024            snapshot_root_paths.incremental_snapshot_root_file_path
1025        {
1026            Some(create_snapshot_data_file_stream(
1027                incremental_snapshot_root_file_path,
1028                maximum_file_size,
1029            )?)
1030        } else {
1031            None
1032        }
1033        .unzip();
1034
1035    let mut snapshot_streams = SnapshotStreams {
1036        full_snapshot_stream: &mut full_snapshot_data_file_stream,
1037        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1038    };
1039    let ret = deserializer(&mut snapshot_streams)?;
1040
1041    check_deserialize_file_consumed(
1042        full_snapshot_file_size,
1043        &snapshot_root_paths.full_snapshot_root_file_path,
1044        &mut full_snapshot_data_file_stream,
1045    )?;
1046
1047    if let Some(ref incremental_snapshot_root_file_path) =
1048        snapshot_root_paths.incremental_snapshot_root_file_path
1049    {
1050        check_deserialize_file_consumed(
1051            incremental_snapshot_file_size.unwrap(),
1052            incremental_snapshot_root_file_path,
1053            incremental_snapshot_data_file_stream.as_mut().unwrap(),
1054        )?;
1055    }
1056
1057    Ok(ret)
1058}
1059
1060/// Before running the deserializer function, perform common operations on the snapshot archive
1061/// files, such as checking the file size and opening the file into a stream.
1062fn create_snapshot_data_file_stream(
1063    snapshot_root_file_path: impl AsRef<Path>,
1064    maximum_file_size: u64,
1065) -> Result<(u64, BufReader<std::fs::File>)> {
1066    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1067
1068    if snapshot_file_size > maximum_file_size {
1069        let error_message = format!(
1070            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1071            snapshot_root_file_path.as_ref().display(),
1072            snapshot_file_size,
1073            maximum_file_size,
1074        );
1075        return Err(IoError::other(error_message).into());
1076    }
1077
1078    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1079    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1080
1081    Ok((snapshot_file_size, snapshot_data_file_stream))
1082}
1083
1084/// After running the deserializer function, perform common checks to ensure the snapshot archive
1085/// files were consumed correctly.
1086fn check_deserialize_file_consumed(
1087    file_size: u64,
1088    file_path: impl AsRef<Path>,
1089    file_stream: &mut BufReader<std::fs::File>,
1090) -> Result<()> {
1091    let consumed_size = file_stream.stream_position()?;
1092
1093    if consumed_size != file_size {
1094        let error_message = format!(
1095            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1096            file_path.as_ref().display(),
1097            file_size,
1098            consumed_size,
1099        );
1100        return Err(IoError::other(error_message).into());
1101    }
1102
1103    Ok(())
1104}
1105
1106/// Return account path from the appendvec path after checking its format.
1107fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1108    let run_path = appendvec_path.parent()?;
1109    let run_file_name = run_path.file_name()?;
1110    // All appendvec files should be under <account_path>/run/.
1111    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1112    if run_file_name != ACCOUNTS_RUN_DIR {
1113        error!(
1114            "The account path {} does not have run/ as its immediate parent directory.",
1115            run_path.display()
1116        );
1117        return None;
1118    }
1119    let account_path = run_path.parent()?;
1120    Some(account_path.to_path_buf())
1121}
1122
/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
/// directory does not exist, create it.
///
/// `account_paths` tracks which account paths have already been handled: the hardlink
/// directory (and the symlink to it under `hardlinks_dir`) is only created the first
/// time a given account path is seen.  Returns the `<account_path>/snapshot/<bank_slot>`
/// directory that the caller should hard-link appendvec files into.
fn get_snapshot_accounts_hardlink_dir(
    appendvec_path: &Path,
    bank_slot: Slot,
    account_paths: &mut HashSet<PathBuf>,
    hardlinks_dir: impl AsRef<Path>,
) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
    // Derive <account_path> from <account_path>/run/<appendvec file>.
    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
    })?;

    let snapshot_hardlink_dir = account_path
        .join(ACCOUNTS_SNAPSHOT_DIR)
        .join(bank_slot.to_string());

    // Use the hashset to track, to avoid checking the file system.  Only set up the hardlink directory
    // and the symlink to it at the first time of seeing the account_path.
    if !account_paths.contains(&account_path) {
        // Symlink names are indexed by insertion order: account_path_0, account_path_1, ...
        let idx = account_paths.len();
        debug!(
            "for appendvec_path {}, create hard-link path {}",
            appendvec_path.display(),
            snapshot_hardlink_dir.display()
        );
        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
                err,
                snapshot_hardlink_dir.clone(),
            )
        })?;
        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
                source: err,
                original: snapshot_hardlink_dir.clone(),
                link: symlink_path,
            }
        })?;
        account_paths.insert(account_path);
    };

    Ok(snapshot_hardlink_dir)
}
1167
1168/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1169/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1170/// in the file names are also updated in case its file is a recycled one with inconsistent slot
1171/// and id.
1172pub fn hard_link_storages_to_snapshot(
1173    bank_snapshot_dir: impl AsRef<Path>,
1174    bank_slot: Slot,
1175    snapshot_storages: &[Arc<AccountStorageEntry>],
1176) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1177    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1178    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1179        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1180            err,
1181            accounts_hardlinks_dir.clone(),
1182        )
1183    })?;
1184
1185    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1186    for storage in snapshot_storages {
1187        storage
1188            .flush()
1189            .map_err(HardLinkStoragesToSnapshotError::FlushStorage)?;
1190        let storage_path = storage.accounts.get_path();
1191        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1192            &storage_path,
1193            bank_slot,
1194            &mut account_paths,
1195            &accounts_hardlinks_dir,
1196        )?;
1197        // The appendvec could be recycled, so its filename may not be consistent to the slot and id.
1198        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1199        let hardlink_filename = AppendVec::file_name(storage.slot(), storage.append_vec_id());
1200        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1201        fs::hard_link(&storage_path, &hard_link_path).map_err(|err| {
1202            HardLinkStoragesToSnapshotError::HardLinkStorage(err, storage_path, hard_link_path)
1203        })?;
1204    }
1205    Ok(())
1206}
1207
1208/// serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but data structure at runtime is Vec<Arc<AccountStorageEntry>>
1209/// translates to what we need
1210pub(crate) fn get_storages_to_serialize(
1211    snapshot_storages: &[Arc<AccountStorageEntry>],
1212) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1213    snapshot_storages
1214        .iter()
1215        .map(|storage| vec![Arc::clone(storage)])
1216        .collect::<Vec<_>>()
1217}
1218
// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
/// Upper bound on the number of parallel untar readers used when unarchiving a snapshot.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1221
1222/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
1223pub fn verify_and_unarchive_snapshots(
1224    bank_snapshots_dir: impl AsRef<Path>,
1225    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1226    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1227    account_paths: &[PathBuf],
1228) -> Result<(
1229    UnarchivedSnapshot,
1230    Option<UnarchivedSnapshot>,
1231    AtomicAppendVecId,
1232)> {
1233    check_are_snapshots_compatible(
1234        full_snapshot_archive_info,
1235        incremental_snapshot_archive_info,
1236    )?;
1237
1238    let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1239
1240    let next_append_vec_id = Arc::new(AtomicAppendVecId::new(0));
1241    let unarchived_full_snapshot = unarchive_snapshot(
1242        &bank_snapshots_dir,
1243        TMP_SNAPSHOT_ARCHIVE_PREFIX,
1244        full_snapshot_archive_info.path(),
1245        "snapshot untar",
1246        account_paths,
1247        full_snapshot_archive_info.archive_format(),
1248        parallel_divisions,
1249        next_append_vec_id.clone(),
1250    )?;
1251
1252    let unarchived_incremental_snapshot =
1253        if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1254            let unarchived_incremental_snapshot = unarchive_snapshot(
1255                &bank_snapshots_dir,
1256                TMP_SNAPSHOT_ARCHIVE_PREFIX,
1257                incremental_snapshot_archive_info.path(),
1258                "incremental snapshot untar",
1259                account_paths,
1260                incremental_snapshot_archive_info.archive_format(),
1261                parallel_divisions,
1262                next_append_vec_id.clone(),
1263            )?;
1264            Some(unarchived_incremental_snapshot)
1265        } else {
1266            None
1267        };
1268
1269    Ok((
1270        unarchived_full_snapshot,
1271        unarchived_incremental_snapshot,
1272        Arc::try_unwrap(next_append_vec_id).unwrap(),
1273    ))
1274}
1275
1276/// Spawns a thread for unpacking a snapshot
1277fn spawn_unpack_snapshot_thread(
1278    file_sender: Sender<PathBuf>,
1279    account_paths: Arc<Vec<PathBuf>>,
1280    ledger_dir: Arc<PathBuf>,
1281    mut archive: Archive<SharedBufferReader>,
1282    parallel_selector: Option<ParallelSelector>,
1283    thread_index: usize,
1284) -> JoinHandle<()> {
1285    Builder::new()
1286        .name(format!("mlnUnpkSnpsht{thread_index:02}"))
1287        .spawn(move || {
1288            hardened_unpack::streaming_unpack_snapshot(
1289                &mut archive,
1290                ledger_dir.as_path(),
1291                &account_paths,
1292                parallel_selector,
1293                &file_sender,
1294            )
1295            .unwrap();
1296        })
1297        .unwrap()
1298}
1299
1300/// Streams unpacked files across channel
1301fn streaming_unarchive_snapshot(
1302    file_sender: Sender<PathBuf>,
1303    account_paths: Vec<PathBuf>,
1304    ledger_dir: PathBuf,
1305    snapshot_archive_path: PathBuf,
1306    archive_format: ArchiveFormat,
1307    num_threads: usize,
1308) -> Vec<JoinHandle<()>> {
1309    let account_paths = Arc::new(account_paths);
1310    let ledger_dir = Arc::new(ledger_dir);
1311    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1312
1313    // All shared buffer readers need to be created before the threads are spawned
1314    #[allow(clippy::needless_collect)]
1315    let archives: Vec<_> = (0..num_threads)
1316        .map(|_| {
1317            let reader = SharedBufferReader::new(&shared_buffer);
1318            Archive::new(reader)
1319        })
1320        .collect();
1321
1322    archives
1323        .into_iter()
1324        .enumerate()
1325        .map(|(thread_index, archive)| {
1326            let parallel_selector = Some(ParallelSelector {
1327                index: thread_index,
1328                divisions: num_threads,
1329            });
1330
1331            spawn_unpack_snapshot_thread(
1332                file_sender.clone(),
1333                account_paths.clone(),
1334                ledger_dir.clone(),
1335                archive,
1336                parallel_selector,
1337                thread_index,
1338            )
1339        })
1340        .collect()
1341}
1342
1343/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1344/// as a valid one.  A dir unpacked from an archive lacks these files.  Fill them here to
1345/// allow new_from_dir() checks to pass.  These checks are not needed for unpacked dirs,
1346/// but it is not clean to add another flag to new_from_dir() to skip them.
1347fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1348    let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1349    if !snapshots_dir.is_dir() {
1350        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1351    }
1352
1353    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1354    let slot_dir = std::fs::read_dir(&snapshots_dir)
1355        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1356        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1357        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1358        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1359        .path();
1360
1361    let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1362    fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1363
1364    let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1365    fs::hard_link(
1366        status_cache_file,
1367        slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1368    )?;
1369
1370    let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1371    fs::File::create(state_complete_file)?;
1372
1373    Ok(())
1374}
1375
/// Perform the common tasks when unarchiving a snapshot.  Handles creating the temporary
/// directories, untaring, reading the version file, and then returning those fields plus the
/// rebuilt storage
fn unarchive_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    unpacked_snapshots_dir_prefix: &'static str,
    snapshot_archive_path: impl AsRef<Path>,
    measure_name: &'static str,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
    next_append_vec_id: Arc<AtomicAppendVecId>,
) -> Result<UnarchivedSnapshot> {
    // Unpack into a temp dir under bank_snapshots_dir; the TempDir handle is returned
    // inside UnarchivedSnapshot so the directory lives (and is cleaned up) with the result.
    let unpack_dir = tempfile::Builder::new()
        .prefix(unpacked_snapshots_dir_prefix)
        .tempdir_in(bank_snapshots_dir)?;
    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");

    // The untar threads stream each unpacked file's path through this channel to the
    // storage rebuilder below, interleaving unpacking with reconstruction.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    streaming_unarchive_snapshot(
        file_sender,
        account_paths.to_vec(),
        unpack_dir.path().to_path_buf(),
        snapshot_archive_path.as_ref().to_path_buf(),
        archive_format,
        parallel_divisions,
    );

    // Leave the untar threads their cores; rebuild on the remaining physical cores (>= 1).
    let num_rebuilder_threads = num_cpus::get_physical()
        .saturating_sub(parallel_divisions)
        .max(1);
    let (version_and_storages, measure_untar) = measure!(
        SnapshotStorageRebuilder::rebuild_storage(
            file_receiver,
            num_rebuilder_threads,
            next_append_vec_id,
            SnapshotFrom::Archive,
        )?,
        measure_name
    );
    info!("{}", measure_untar);

    // Add the meta files BankSnapshotInfo::new_from_dir() expects but the archive lacks.
    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;

    let RebuiltSnapshotStorage {
        snapshot_version,
        storage,
    } = version_and_storages;
    Ok(UnarchivedSnapshot {
        unpack_dir,
        storage,
        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
            unpacked_snapshots_dir,
            snapshot_version,
        },
        measure_untar,
    })
}
1434
1435/// Streams snapshot dir files across channel
1436/// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case.
1437fn streaming_snapshot_dir_files(
1438    file_sender: Sender<PathBuf>,
1439    snapshot_file_path: impl Into<PathBuf>,
1440    snapshot_version_path: impl Into<PathBuf>,
1441    account_paths: &[PathBuf],
1442) -> Result<()> {
1443    file_sender.send(snapshot_file_path.into())?;
1444    file_sender.send(snapshot_version_path.into())?;
1445
1446    for account_path in account_paths {
1447        for file in fs::read_dir(account_path)? {
1448            file_sender.send(file?.path())?;
1449        }
1450    }
1451
1452    Ok(())
1453}
1454
/// Performs the common tasks when deserializing a snapshot
///
/// Handles reading the snapshot file and version file,
/// then returning those fields plus the rebuilt storages.
///
/// Errors if any hardlinked account path in the snapshot does not map into the provided
/// `account_paths` (i.e. the account paths changed since the snapshot was taken), or if
/// any of the directory reads / hard-links fail.
pub fn rebuild_storages_from_snapshot_dir(
    snapshot_info: &BankSnapshotInfo,
    account_paths: &[PathBuf],
    next_append_vec_id: Arc<AtomicAppendVecId>,
) -> Result<AccountStorageMap> {
    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    // Set of the caller's run paths, used to validate each symlinked account path below.
    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);

    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
        IoError::other(format!(
            "failed to read accounts hardlinks dir '{}': {err}",
            accounts_hardlinks.display(),
        ))
    })?;
    for dir_entry in read_dir {
        let symlink_path = dir_entry?.path();
        // The symlink point to <account_path>/snapshot/<slot> which contain the account files hardlinks
        // The corresponding run path should be <account_path>/run/
        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
            IoError::other(format!(
                "failed to read symlink '{}': {err}",
                symlink_path.display(),
            ))
        })?;
        // Walk up two levels (snapshot/<slot>) to <account_path>, then down into run/.
        let account_run_path = account_snapshot_path
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .join(ACCOUNTS_RUN_DIR);
        if !account_run_paths.contains(&account_run_path) {
            // The appendvec from the bank snapshot storage does not match any of the provided account_paths set.
            // The account paths have changed so the snapshot is no longer usable.
            return Err(SnapshotError::AccountPathsMismatch);
        }
        // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
        // paths be in accounts/
        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        for file in read_dir {
            let file_path = file?.path();
            let file_name = file_path
                .file_name()
                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
            let dest_path = account_run_path.join(file_name);
            fs::hard_link(&file_path, &dest_path).map_err(|err| {
                IoError::other(format!(
                    "failed to hard link from '{}' to '{}': {err}",
                    file_path.display(),
                    dest_path.display(),
                ))
            })?;
        }
    }

    // Stream the snapshot/version/account files to the rebuilder, mirroring the
    // from-archive flow but sourcing from the snapshot dir.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let snapshot_file_path = &snapshot_info.snapshot_path();
    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    streaming_snapshot_dir_files(
        file_sender,
        snapshot_file_path,
        snapshot_version_path,
        account_paths,
    )?;

    // No untar threads here, so leave one core free and use the rest for rebuilding.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
        file_receiver,
        num_rebuilder_threads,
        next_append_vec_id,
        SnapshotFrom::Dir,
    )?;

    let RebuiltSnapshotStorage {
        snapshot_version: _,
        storage,
    } = version_and_storages;
    Ok(storage)
}
1543
1544/// Reads the `snapshot_version` from a file. Before opening the file, its size
1545/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
1546/// threshold, it is not opened and an error is returned.
1547fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1548    // Check file size.
1549    let file_metadata = fs::metadata(&path).map_err(|err| {
1550        IoError::other(format!(
1551            "failed to query snapshot version file metadata '{}': {err}",
1552            path.as_ref().display(),
1553        ))
1554    })?;
1555    let file_size = file_metadata.len();
1556    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1557        let error_message = format!(
1558            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1559            path.as_ref().display(),
1560            file_size,
1561            MAX_SNAPSHOT_VERSION_FILE_SIZE,
1562        );
1563        return Err(IoError::other(error_message).into());
1564    }
1565
1566    // Read snapshot_version from file.
1567    let mut snapshot_version = String::new();
1568    let mut file = fs::File::open(&path).map_err(|err| {
1569        IoError::other(format!(
1570            "failed to open snapshot version file '{}': {err}",
1571            path.as_ref().display()
1572        ))
1573    })?;
1574    file.read_to_string(&mut snapshot_version).map_err(|err| {
1575        IoError::other(format!(
1576            "failed to read snapshot version from file '{}': {err}",
1577            path.as_ref().display()
1578        ))
1579    })?;
1580
1581    Ok(snapshot_version.trim().to_string())
1582}
1583
1584/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
1585/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
1586fn check_are_snapshots_compatible(
1587    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1588    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1589) -> Result<()> {
1590    if incremental_snapshot_archive_info.is_none() {
1591        return Ok(());
1592    }
1593
1594    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1595
1596    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1597        .then_some(())
1598        .ok_or_else(|| {
1599            SnapshotError::MismatchedBaseSlot(
1600                full_snapshot_archive_info.slot(),
1601                incremental_snapshot_archive_info.base_slot(),
1602            )
1603        })
1604}
1605
1606/// Get the `&str` from a `&Path`
1607pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1608    path.file_name()
1609        .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1610        .to_str()
1611        .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1612}
1613
1614pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1615    snapshot_archives_dir
1616        .as_ref()
1617        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1618}
1619
1620/// Build the full snapshot archive path from its components: the snapshot archives directory, the
1621/// snapshot slot, the accounts hash, and the archive format.
1622pub fn build_full_snapshot_archive_path(
1623    full_snapshot_archives_dir: impl AsRef<Path>,
1624    slot: Slot,
1625    hash: &SnapshotHash,
1626    archive_format: ArchiveFormat,
1627) -> PathBuf {
1628    full_snapshot_archives_dir.as_ref().join(format!(
1629        "snapshot-{}-{}.{}",
1630        slot,
1631        hash.0,
1632        archive_format.extension(),
1633    ))
1634}
1635
1636/// Build the incremental snapshot archive path from its components: the snapshot archives
1637/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive
1638/// format.
1639pub fn build_incremental_snapshot_archive_path(
1640    incremental_snapshot_archives_dir: impl AsRef<Path>,
1641    base_slot: Slot,
1642    slot: Slot,
1643    hash: &SnapshotHash,
1644    archive_format: ArchiveFormat,
1645) -> PathBuf {
1646    incremental_snapshot_archives_dir.as_ref().join(format!(
1647        "incremental-snapshot-{}-{}-{}.{}",
1648        base_slot,
1649        slot,
1650        hash.0,
1651        archive_format.extension(),
1652    ))
1653}
1654
1655/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format
1656pub(crate) fn parse_full_snapshot_archive_filename(
1657    archive_filename: &str,
1658) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1659    lazy_static! {
1660        static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1661    }
1662
1663    let do_parse = || {
1664        RE.captures(archive_filename).and_then(|captures| {
1665            let slot = captures
1666                .name("slot")
1667                .map(|x| x.as_str().parse::<Slot>())?
1668                .ok()?;
1669            let hash = captures
1670                .name("hash")
1671                .map(|x| x.as_str().parse::<Hash>())?
1672                .ok()?;
1673            let archive_format = captures
1674                .name("ext")
1675                .map(|x| x.as_str().parse::<ArchiveFormat>())?
1676                .ok()?;
1677
1678            Some((slot, SnapshotHash(hash), archive_format))
1679        })
1680    };
1681
1682    do_parse().ok_or_else(|| {
1683        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1684    })
1685}
1686
1687/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format
1688pub(crate) fn parse_incremental_snapshot_archive_filename(
1689    archive_filename: &str,
1690) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1691    lazy_static! {
1692        static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1693    }
1694
1695    let do_parse = || {
1696        RE.captures(archive_filename).and_then(|captures| {
1697            let base_slot = captures
1698                .name("base")
1699                .map(|x| x.as_str().parse::<Slot>())?
1700                .ok()?;
1701            let slot = captures
1702                .name("slot")
1703                .map(|x| x.as_str().parse::<Slot>())?
1704                .ok()?;
1705            let hash = captures
1706                .name("hash")
1707                .map(|x| x.as_str().parse::<Hash>())?
1708                .ok()?;
1709            let archive_format = captures
1710                .name("ext")
1711                .map(|x| x.as_str().parse::<ArchiveFormat>())?
1712                .ok()?;
1713
1714            Some((base_slot, slot, SnapshotHash(hash), archive_format))
1715        })
1716    };
1717
1718    do_parse().ok_or_else(|| {
1719        SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1720    })
1721}
1722
1723/// Walk down the snapshot archive to collect snapshot archive file info
1724fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
1725where
1726    F: Fn(PathBuf) -> Result<T>,
1727{
1728    let walk_dir = |dir: &Path| -> Vec<T> {
1729        let entry_iter = fs::read_dir(dir);
1730        match entry_iter {
1731            Err(err) => {
1732                info!(
1733                    "Unable to read snapshot archives directory '{}': {err}",
1734                    dir.display(),
1735                );
1736                vec![]
1737            }
1738            Ok(entries) => entries
1739                .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
1740                .collect(),
1741        }
1742    };
1743
1744    let mut ret = walk_dir(snapshot_archives_dir);
1745    let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
1746    if remote_dir.exists() {
1747        ret.append(&mut walk_dir(remote_dir.as_ref()));
1748    }
1749    ret
1750}
1751
1752/// Get a list of the full snapshot archives from a directory
1753pub fn get_full_snapshot_archives(
1754    full_snapshot_archives_dir: impl AsRef<Path>,
1755) -> Vec<FullSnapshotArchiveInfo> {
1756    get_snapshot_archives(
1757        full_snapshot_archives_dir.as_ref(),
1758        FullSnapshotArchiveInfo::new_from_path,
1759    )
1760}
1761
1762/// Get a list of the incremental snapshot archives from a directory
1763pub fn get_incremental_snapshot_archives(
1764    incremental_snapshot_archives_dir: impl AsRef<Path>,
1765) -> Vec<IncrementalSnapshotArchiveInfo> {
1766    get_snapshot_archives(
1767        incremental_snapshot_archives_dir.as_ref(),
1768        IncrementalSnapshotArchiveInfo::new_from_path,
1769    )
1770}
1771
1772/// Get the highest slot of the full snapshot archives in a directory
1773pub fn get_highest_full_snapshot_archive_slot(
1774    full_snapshot_archives_dir: impl AsRef<Path>,
1775) -> Option<Slot> {
1776    get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
1777        .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
1778}
1779
1780/// Get the highest slot of the incremental snapshot archives in a directory, for a given full
1781/// snapshot slot
1782pub fn get_highest_incremental_snapshot_archive_slot(
1783    incremental_snapshot_archives_dir: impl AsRef<Path>,
1784    full_snapshot_slot: Slot,
1785) -> Option<Slot> {
1786    get_highest_incremental_snapshot_archive_info(
1787        incremental_snapshot_archives_dir,
1788        full_snapshot_slot,
1789    )
1790    .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
1791}
1792
1793/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory
1794pub fn get_highest_full_snapshot_archive_info(
1795    full_snapshot_archives_dir: impl AsRef<Path>,
1796) -> Option<FullSnapshotArchiveInfo> {
1797    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
1798    full_snapshot_archives.sort_unstable();
1799    full_snapshot_archives.into_iter().next_back()
1800}
1801
1802/// Get the path for the incremental snapshot archive with the highest slot, for a given full
1803/// snapshot slot, in a directory
1804pub fn get_highest_incremental_snapshot_archive_info(
1805    incremental_snapshot_archives_dir: impl AsRef<Path>,
1806    full_snapshot_slot: Slot,
1807) -> Option<IncrementalSnapshotArchiveInfo> {
1808    // Since we want to filter down to only the incremental snapshot archives that have the same
1809    // full snapshot slot as the value passed in, perform the filtering before sorting to avoid
1810    // doing unnecessary work.
1811    let mut incremental_snapshot_archives =
1812        get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1813            .into_iter()
1814            .filter(|incremental_snapshot_archive_info| {
1815                incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
1816            })
1817            .collect::<Vec<_>>();
1818    incremental_snapshot_archives.sort_unstable();
1819    incremental_snapshot_archives.into_iter().next_back()
1820}
1821
1822pub fn purge_old_snapshot_archives(
1823    full_snapshot_archives_dir: impl AsRef<Path>,
1824    incremental_snapshot_archives_dir: impl AsRef<Path>,
1825    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
1826    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
1827) {
1828    info!(
1829        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
1830        full_snapshot_archives_dir.as_ref().display(),
1831        maximum_full_snapshot_archives_to_retain
1832    );
1833
1834    let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
1835    full_snapshot_archives.sort_unstable();
1836    full_snapshot_archives.reverse();
1837
1838    let num_to_retain = full_snapshot_archives
1839        .len()
1840        .min(maximum_full_snapshot_archives_to_retain.get());
1841    trace!(
1842        "There are {} full snapshot archives, retaining {}",
1843        full_snapshot_archives.len(),
1844        num_to_retain,
1845    );
1846
1847    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
1848        if full_snapshot_archives.is_empty() {
1849            None
1850        } else {
1851            Some(full_snapshot_archives.split_at(num_to_retain))
1852        }
1853        .unwrap_or_default();
1854
1855    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
1856        .iter()
1857        .map(|ai| ai.slot())
1858        .collect::<HashSet<_>>();
1859
1860    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
1861        for path in archives.iter().map(|a| a.path()) {
1862            trace!("Removing snapshot archive: {}", path.display());
1863            let result = fs::remove_file(path);
1864            if let Err(err) = result {
1865                info!(
1866                    "Failed to remove snapshot archive '{}': {err}",
1867                    path.display()
1868                );
1869            }
1870        }
1871    }
1872    remove_archives(full_snapshot_archives_to_remove);
1873
1874    info!(
1875        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
1876        incremental_snapshot_archives_dir.as_ref().display(),
1877        maximum_incremental_snapshot_archives_to_retain
1878    );
1879    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
1880    for incremental_snapshot_archive in
1881        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
1882    {
1883        incremental_snapshot_archives_by_base_slot
1884            .entry(incremental_snapshot_archive.base_slot())
1885            .or_default()
1886            .push(incremental_snapshot_archive)
1887    }
1888
1889    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
1890    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
1891    {
1892        incremental_snapshot_archives.sort_unstable();
1893        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
1894            maximum_incremental_snapshot_archives_to_retain.get()
1895        } else {
1896            usize::from(retained_full_snapshot_slots.contains(&base_slot))
1897        };
1898        trace!(
1899            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
1900            incremental_snapshot_archives.len(),
1901            base_slot,
1902            incremental_snapshot_archives
1903                .len()
1904                .saturating_sub(num_to_retain),
1905        );
1906
1907        incremental_snapshot_archives.truncate(
1908            incremental_snapshot_archives
1909                .len()
1910                .saturating_sub(num_to_retain),
1911        );
1912        remove_archives(&incremental_snapshot_archives);
1913    }
1914}
1915
/// Unpacks a snapshot archive (already wrapped in `shared_buffer`) using
/// `parallel_divisions` workers, each extracting its share of the files.
///
/// Panics if `parallel_divisions` is zero.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // All readers must be allocated before any of them starts consuming the buffer.
    let readers: Vec<_> = (0..parallel_divisions)
        .map(|_| SharedBufferReader::new(&shared_buffer))
        .collect();

    // One worker per division; worker `index` is responsible for 1/parallel_divisions
    // of all the files to extract.
    let per_division_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let parallel_selector = Some(ParallelSelector {
                index,
                divisions: parallel_divisions,
            });
            let mut archive = Archive::new(reader);
            hardened_unpack::unpack_snapshot(
                &mut archive,
                ledger_dir,
                account_paths,
                parallel_selector,
            )
        })
        .collect();

    // Merge the per-worker maps, propagating the first error encountered.
    let mut merged = UnpackedAppendVecMap::new();
    for result in per_division_results {
        merged.extend(result?);
    }
    Ok(merged)
}
1956
1957fn untar_snapshot_create_shared_buffer(
1958    snapshot_tar: &Path,
1959    archive_format: ArchiveFormat,
1960) -> SharedBuffer {
1961    let open_file = || {
1962        fs::File::open(snapshot_tar)
1963            .map_err(|err| {
1964                IoError::other(format!(
1965                    "failed to open snapshot archive '{}': {err}",
1966                    snapshot_tar.display(),
1967                ))
1968            })
1969            .unwrap()
1970    };
1971    match archive_format {
1972        ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
1973        ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
1974        ArchiveFormat::TarZstd => SharedBuffer::new(
1975            zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
1976        ),
1977        ArchiveFormat::TarLz4 => {
1978            SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
1979        }
1980        ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
1981    }
1982}
1983
/// Unpacks the snapshot archive at `snapshot_tar` into `unpack_dir`, placing account files
/// under `account_paths`, using `parallel_divisions` workers
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let snapshot_tar = snapshot_tar.as_ref();
    let shared_buffer = untar_snapshot_create_shared_buffer(snapshot_tar, archive_format);
    unpack_snapshot_local(shared_buffer, unpack_dir, account_paths, parallel_divisions)
}
1995
1996pub fn verify_unpacked_snapshots_dir_and_version(
1997    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
1998) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
1999    info!(
2000        "snapshot version: {}",
2001        &unpacked_snapshots_dir_and_version.snapshot_version
2002    );
2003
2004    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2005    let mut bank_snapshots =
2006        get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2007    if bank_snapshots.len() > 1 {
2008        return Err(IoError::other(format!(
2009            "invalid snapshot format: only one snapshot allowed, but found {}",
2010            bank_snapshots.len(),
2011        ))
2012        .into());
2013    }
2014    let root_paths = bank_snapshots.pop().ok_or_else(|| {
2015        IoError::other(format!(
2016            "no snapshots found in snapshots directory '{}'",
2017            unpacked_snapshots_dir_and_version
2018                .unpacked_snapshots_dir
2019                .display(),
2020        ))
2021    })?;
2022    Ok((snapshot_version, root_paths))
2023}
2024
2025/// Returns the file name of the bank snapshot for `slot`
2026pub fn get_snapshot_file_name(slot: Slot) -> String {
2027    slot.to_string()
2028}
2029
2030/// Constructs the path to the bank snapshot directory for `slot` within `bank_snapshots_dir`
2031pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2032    bank_snapshots_dir
2033        .as_ref()
2034        .join(get_snapshot_file_name(slot))
2035}
2036
#[derive(Debug, Copy, Clone)]
/// Allows tests to specify what happened to the serialized bank format, which
/// controls how `verify_snapshot_archive` compares the two bank files
pub enum VerifyBank {
    /// The bank's serialized format is expected to be byte-identical to what we are
    /// comparing against
    Deterministic,
    /// The serialized bank was 'reserialized' into a non-deterministic format,
    /// so deserialize both files and compare the deserialized results instead
    NonDeterministic,
}
2046
/// Verifies that unpacking `snapshot_archive` yields the same bank snapshot and account
/// files as the on-disk bank snapshot for `slot` under `snapshots_to_verify`
///
/// The archive is unpacked into a temp directory, then both trees are normalized (status
/// cache moved back into the slot directory, hardlinked account storages gathered into one
/// directory, marker files removed) so that direct directory diffs can pass.
///
/// Test-only helper: panics (via `unwrap`/`assert`) on any I/O failure or mismatch.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    // Unpack the archive into a fresh temp dir, with a single account path
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    // Check snapshots are the same
    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Since the unpack code collects all the appendvecs into one directory unpack_account_dir, we need to
    // collect all the appendvecs in account_paths/<slot>/snapshot/ into one directory for later comparison.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    // Create the directory if it doesn't exist
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // file contents may be different, but deserialized structs should be equal
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        // Remove both bank files so the byte-level directory diff below skips them
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // The new the status_cache file is inside the slot directory together with the snapshot file.
    // When unpacking an archive, the status_cache file from the archive is one-level up outside of
    //  the slot directory.
    // The unpacked status_cache file need to be put back into the slot directory for the directory
    // comparison to pass.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        // This directory contain symlinks to all <account_path>/snapshot/<slot> directories.
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            // Copy all the files in dst_path into the storages_to_verify directory.
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        // The hardlinks dir has no counterpart in the unpacked tree; remove it before diffing
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // Remove marker files that the unpacked tree is not expected to contain, so the diff can pass
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // In the unarchiving case, there is an extra empty "accounts" directory. The account
    // files in the archive accounts/ have been expanded to [account_paths].
    // Remove the empty "accounts" directory for the directory comparison below.
    // In some test cases the directory to compare do not come from unarchiving.
    // Ignore the error when this directory does not exist.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    // Check the account entries are the same
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2140
2141/// Purges all bank snapshots
2142pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2143    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2144    purge_bank_snapshots(&bank_snapshots);
2145}
2146
2147/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
2148pub fn purge_old_bank_snapshots(
2149    bank_snapshots_dir: impl AsRef<Path>,
2150    num_bank_snapshots_to_retain: usize,
2151    filter_by_kind: Option<BankSnapshotKind>,
2152) {
2153    let mut bank_snapshots = match filter_by_kind {
2154        Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2155        Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2156        None => get_bank_snapshots(&bank_snapshots_dir),
2157    };
2158
2159    bank_snapshots.sort_unstable();
2160    purge_bank_snapshots(
2161        bank_snapshots
2162            .iter()
2163            .rev()
2164            .skip(num_bank_snapshots_to_retain),
2165    );
2166}
2167
2168/// At startup, purge old (i.e. unusable) bank snapshots
2169///
2170/// Only a single bank snapshot could be needed at startup (when using fast boot), so
2171/// retain the highest bank snapshot "post", and purge the rest.
2172pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2173    purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2174    purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2175
2176    let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2177    if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2178        debug!(
2179            "Retained bank snapshot for slot {}, and purged the rest.",
2180            highest_bank_snapshot_post.slot
2181        );
2182    }
2183}
2184
2185/// Purges bank snapshots that are older than `slot`
2186pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2187    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2188    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2189    purge_bank_snapshots(&bank_snapshots);
2190}
2191
2192/// Purges all `bank_snapshots`
2193///
2194/// Does not exit early if there is an error while purging a bank snapshot.
2195fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2196    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2197        if purge_bank_snapshot(snapshot_dir).is_err() {
2198            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2199        }
2200    }
2201}
2202
2203/// Remove the bank snapshot at this path
2204pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2205    const FN_ERR: &str = "failed to purge bank snapshot";
2206    let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2207    if accounts_hardlinks_dir.is_dir() {
2208        // This directory contain symlinks to all accounts snapshot directories.
2209        // They should all be removed.
2210        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2211            IoError::other(format!(
2212                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2213                accounts_hardlinks_dir.display(),
2214            ))
2215        })?;
2216        for entry in read_dir {
2217            let accounts_hardlink_dir = entry?.path();
2218            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2219                IoError::other(format!(
2220                    "{FN_ERR}: failed to read symlink '{}': {err}",
2221                    accounts_hardlink_dir.display(),
2222                ))
2223            })?;
2224            move_and_async_delete_path(&accounts_hardlink_dir);
2225        }
2226    }
2227    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2228        IoError::other(format!(
2229            "{FN_ERR}: failed to remove dir '{}': {err}",
2230            bank_snapshot_dir.as_ref().display(),
2231        ))
2232    })?;
2233    Ok(())
2234}
2235
2236pub fn should_take_full_snapshot(
2237    block_height: Slot,
2238    full_snapshot_archive_interval_slots: Slot,
2239) -> bool {
2240    block_height % full_snapshot_archive_interval_slots == 0
2241}
2242
2243pub fn should_take_incremental_snapshot(
2244    block_height: Slot,
2245    incremental_snapshot_archive_interval_slots: Slot,
2246    last_full_snapshot_slot: Option<Slot>,
2247) -> bool {
2248    block_height % incremental_snapshot_archive_interval_slots == 0
2249        && last_full_snapshot_slot.is_some()
2250}
2251
/// Creates an "accounts path" directory for tests
///
/// This temporary directory will contain the "run" and "snapshot"
/// sub-directories required by a validator.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let dirs = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, dirs.0)
}
2262
2263#[cfg(test)]
2264mod tests {
2265    use {
2266        super::*,
2267        assert_matches::assert_matches,
2268        bincode::{deserialize_from, serialize_into},
2269        std::{convert::TryFrom, mem::size_of},
2270        tempfile::NamedTempFile,
2271    };
    #[test]
    fn test_serialize_snapshot_data_file_under_limit() {
        // Serializing a u32 with a size cap equal to the payload size should succeed
        // and report exactly that many bytes consumed.
        let temp_dir = tempfile::TempDir::new().unwrap();
        let expected_consumed_size = size_of::<u32>() as u64;
        let consumed_size = serialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| {
                serialize_into(stream, &2323_u32)?;
                Ok(())
            },
        )
        .unwrap();
        assert_eq!(consumed_size, expected_consumed_size);
    }
2287
    #[test]
    fn test_serialize_snapshot_data_file_over_limit() {
        // A size cap one byte smaller than the serialized payload must produce a
        // "too large" I/O error instead of succeeding.
        let temp_dir = tempfile::TempDir::new().unwrap();
        let expected_consumed_size = size_of::<u32>() as u64;
        let result = serialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size - 1,
            |stream| {
                serialize_into(stream, &2323_u32)?;
                Ok(())
            },
        );
        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
    }
2302
    #[test]
    fn test_deserialize_snapshot_data_file_under_limit() {
        // Round-trip: serialize a u32 to a capped data file, then deserialize it back
        // through the capped reader and expect the original value.
        let expected_data = 2323_u32;
        let expected_consumed_size = size_of::<u32>() as u64;

        let temp_dir = tempfile::TempDir::new().unwrap();
        serialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| {
                serialize_into(stream, &expected_data)?;
                Ok(())
            },
        )
        .unwrap();

        // Only a full snapshot stream is provided; no incremental snapshot file.
        let snapshot_root_paths = SnapshotRootPaths {
            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
            incremental_snapshot_root_file_path: None,
        };

        let actual_data = deserialize_snapshot_data_files_capped(
            &snapshot_root_paths,
            expected_consumed_size,
            |stream| {
                Ok(deserialize_from::<_, u32>(
                    &mut stream.full_snapshot_stream,
                )?)
            },
        )
        .unwrap();
        assert_eq!(actual_data, expected_data);
    }
2336
2337    #[test]
2338    fn test_deserialize_snapshot_data_file_over_limit() {
2339        let expected_data = 2323_u32;
2340        let expected_consumed_size = size_of::<u32>() as u64;
2341
2342        let temp_dir = tempfile::TempDir::new().unwrap();
2343        serialize_snapshot_data_file_capped(
2344            &temp_dir.path().join("data-file"),
2345            expected_consumed_size,
2346            |stream| {
2347                serialize_into(stream, &expected_data)?;
2348                Ok(())
2349            },
2350        )
2351        .unwrap();
2352
2353        let snapshot_root_paths = SnapshotRootPaths {
2354            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2355            incremental_snapshot_root_file_path: None,
2356        };
2357
2358        let result = deserialize_snapshot_data_files_capped(
2359            &snapshot_root_paths,
2360            expected_consumed_size - 1,
2361            |stream| {
2362                Ok(deserialize_from::<_, u32>(
2363                    &mut stream.full_snapshot_stream,
2364                )?)
2365            },
2366        );
2367        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2368    }
2369
2370    #[test]
2371    fn test_deserialize_snapshot_data_file_extra_data() {
2372        let expected_data = 2323_u32;
2373        let expected_consumed_size = size_of::<u32>() as u64;
2374
2375        let temp_dir = tempfile::TempDir::new().unwrap();
2376        serialize_snapshot_data_file_capped(
2377            &temp_dir.path().join("data-file"),
2378            expected_consumed_size * 2,
2379            |stream| {
2380                serialize_into(stream.by_ref(), &expected_data)?;
2381                serialize_into(stream.by_ref(), &expected_data)?;
2382                Ok(())
2383            },
2384        )
2385        .unwrap();
2386
2387        let snapshot_root_paths = SnapshotRootPaths {
2388            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2389            incremental_snapshot_root_file_path: None,
2390        };
2391
2392        let result = deserialize_snapshot_data_files_capped(
2393            &snapshot_root_paths,
2394            expected_consumed_size * 2,
2395            |stream| {
2396                Ok(deserialize_from::<_, u32>(
2397                    &mut stream.full_snapshot_stream,
2398                )?)
2399            },
2400        );
2401        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2402    }
2403
2404    #[test]
2405    fn test_snapshot_version_from_file_under_limit() {
2406        let file_content = SnapshotVersion::default().as_str();
2407        let mut file = NamedTempFile::new().unwrap();
2408        file.write_all(file_content.as_bytes()).unwrap();
2409        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2410        assert_eq!(version_from_file, file_content);
2411    }
2412
2413    #[test]
2414    fn test_snapshot_version_from_file_over_limit() {
2415        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2416        let file_content = vec![7u8; over_limit_size];
2417        let mut file = NamedTempFile::new().unwrap();
2418        file.write_all(&file_content).unwrap();
2419        assert_matches!(
2420            snapshot_version_from_file(file.path()),
2421            Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2422        );
2423    }
2424
2425    #[test]
2426    fn test_parse_full_snapshot_archive_filename() {
2427        assert_eq!(
2428            parse_full_snapshot_archive_filename(&format!(
2429                "snapshot-42-{}.tar.bz2",
2430                Hash::default()
2431            ))
2432            .unwrap(),
2433            (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2434        );
2435        assert_eq!(
2436            parse_full_snapshot_archive_filename(&format!(
2437                "snapshot-43-{}.tar.zst",
2438                Hash::default()
2439            ))
2440            .unwrap(),
2441            (43, SnapshotHash(Hash::default()), ArchiveFormat::TarZstd)
2442        );
2443        assert_eq!(
2444            parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2445                .unwrap(),
2446            (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2447        );
2448        assert_eq!(
2449            parse_full_snapshot_archive_filename(&format!(
2450                "snapshot-45-{}.tar.lz4",
2451                Hash::default()
2452            ))
2453            .unwrap(),
2454            (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2455        );
2456
2457        assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2458        assert!(
2459            parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2460        );
2461
2462        assert!(
2463            parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2464        );
2465        assert!(parse_full_snapshot_archive_filename(&format!(
2466            "snapshot-12345678-{}.bad!ext",
2467            Hash::new_unique()
2468        ))
2469        .is_err());
2470        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2471
2472        assert!(parse_full_snapshot_archive_filename(&format!(
2473            "snapshot-bad!slot-{}.bad!ext",
2474            Hash::new_unique()
2475        ))
2476        .is_err());
2477        assert!(parse_full_snapshot_archive_filename(&format!(
2478            "snapshot-12345678-{}.bad!ext",
2479            Hash::new_unique()
2480        ))
2481        .is_err());
2482        assert!(parse_full_snapshot_archive_filename(&format!(
2483            "snapshot-bad!slot-{}.tar",
2484            Hash::new_unique()
2485        ))
2486        .is_err());
2487
2488        assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2489        assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2490        assert!(parse_full_snapshot_archive_filename(&format!(
2491            "snapshot-bad!slot-{}.tar",
2492            Hash::new_unique()
2493        ))
2494        .is_err());
2495    }
2496
2497    #[test]
2498    fn test_parse_incremental_snapshot_archive_filename() {
2499        assert_eq!(
2500            parse_incremental_snapshot_archive_filename(&format!(
2501                "incremental-snapshot-42-123-{}.tar.bz2",
2502                Hash::default()
2503            ))
2504            .unwrap(),
2505            (
2506                42,
2507                123,
2508                SnapshotHash(Hash::default()),
2509                ArchiveFormat::TarBzip2
2510            )
2511        );
2512        assert_eq!(
2513            parse_incremental_snapshot_archive_filename(&format!(
2514                "incremental-snapshot-43-234-{}.tar.zst",
2515                Hash::default()
2516            ))
2517            .unwrap(),
2518            (
2519                43,
2520                234,
2521                SnapshotHash(Hash::default()),
2522                ArchiveFormat::TarZstd
2523            )
2524        );
2525        assert_eq!(
2526            parse_incremental_snapshot_archive_filename(&format!(
2527                "incremental-snapshot-44-345-{}.tar",
2528                Hash::default()
2529            ))
2530            .unwrap(),
2531            (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2532        );
2533        assert_eq!(
2534            parse_incremental_snapshot_archive_filename(&format!(
2535                "incremental-snapshot-45-456-{}.tar.lz4",
2536                Hash::default()
2537            ))
2538            .unwrap(),
2539            (
2540                45,
2541                456,
2542                SnapshotHash(Hash::default()),
2543                ArchiveFormat::TarLz4
2544            )
2545        );
2546
2547        assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2548        assert!(parse_incremental_snapshot_archive_filename(&format!(
2549            "snapshot-42-{}.tar",
2550            Hash::new_unique()
2551        ))
2552        .is_err());
2553        assert!(parse_incremental_snapshot_archive_filename(
2554            "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2555        )
2556        .is_err());
2557
2558        assert!(parse_incremental_snapshot_archive_filename(&format!(
2559            "incremental-snapshot-bad!slot-56785678-{}.tar",
2560            Hash::new_unique()
2561        ))
2562        .is_err());
2563
2564        assert!(parse_incremental_snapshot_archive_filename(&format!(
2565            "incremental-snapshot-12345678-bad!slot-{}.tar",
2566            Hash::new_unique()
2567        ))
2568        .is_err());
2569
2570        assert!(parse_incremental_snapshot_archive_filename(
2571            "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2572        )
2573        .is_err());
2574
2575        assert!(parse_incremental_snapshot_archive_filename(&format!(
2576            "incremental-snapshot-12341234-56785678-{}.bad!ext",
2577            Hash::new_unique()
2578        ))
2579        .is_err());
2580    }
2581
2582    #[test]
2583    fn test_check_are_snapshots_compatible() {
2584        let slot1: Slot = 1234;
2585        let slot2: Slot = 5678;
2586        let slot3: Slot = 999_999;
2587
2588        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2589            format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2590        ))
2591        .unwrap();
2592
2593        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2594
2595        let incremental_snapshot_archive_info =
2596            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2597                "/dir/incremental-snapshot-{}-{}-{}.tar",
2598                slot1,
2599                slot2,
2600                Hash::new_unique()
2601            )))
2602            .unwrap();
2603
2604        assert!(check_are_snapshots_compatible(
2605            &full_snapshot_archive_info,
2606            Some(&incremental_snapshot_archive_info)
2607        )
2608        .is_ok());
2609
2610        let incremental_snapshot_archive_info =
2611            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2612                "/dir/incremental-snapshot-{}-{}-{}.tar",
2613                slot2,
2614                slot3,
2615                Hash::new_unique()
2616            )))
2617            .unwrap();
2618
2619        assert!(check_are_snapshots_compatible(
2620            &full_snapshot_archive_info,
2621            Some(&incremental_snapshot_archive_info)
2622        )
2623        .is_err());
2624    }
2625
    /// A test helper function that creates bank snapshot files for every slot
    /// in the half-open range [`min_slot`, `max_slot`) — no snapshot is
    /// created for `max_slot` itself.
    ///
    /// Each slot gets its own bank snapshot directory containing an empty
    /// snapshot file, an empty status cache file, a snapshot version file,
    /// and a "state complete" marker file.
    fn common_create_bank_snapshot_files(
        bank_snapshots_dir: &Path,
        min_slot: Slot,
        max_slot: Slot,
    ) {
        for slot in min_slot..max_slot {
            let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
            fs::create_dir_all(&snapshot_dir).unwrap();

            // The snapshot and status cache files only need to exist; their
            // contents are irrelevant to the directory-scanning tests.
            let snapshot_filename = get_snapshot_file_name(slot);
            let snapshot_path = snapshot_dir.join(snapshot_filename);
            fs::File::create(snapshot_path).unwrap();

            let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
            fs::File::create(status_cache_file).unwrap();

            let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();

            // Mark this directory complete so it can be used.  Check this flag first before selecting for deserialization.
            let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
            fs::File::create(state_complete_path).unwrap();
        }
    }
2651
2652    #[test]
2653    fn test_get_bank_snapshots() {
2654        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2655        let min_slot = 10;
2656        let max_slot = 20;
2657        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2658
2659        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2660        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2661    }
2662
2663    #[test]
2664    fn test_get_highest_bank_snapshot_post() {
2665        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2666        let min_slot = 99;
2667        let max_slot = 123;
2668        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2669
2670        let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2671        assert!(highest_bank_snapshot.is_some());
2672        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2673    }
2674
    /// A test helper function that creates full and incremental snapshot archive files.  Creates
    /// full snapshot files for each slot in the half-open range [`min_full_snapshot_slot`,
    /// `max_full_snapshot_slot`), and, per full snapshot slot, incremental snapshot files for each
    /// slot in [`min_incremental_snapshot_slot`, `max_incremental_snapshot_slot`) — the `max_*`
    /// slots themselves get no archive.  Additionally, "bad" files are created for both full and
    /// incremental snapshots to ensure the tests properly filter them out.
    fn common_create_snapshot_archive_files(
        full_snapshot_archives_dir: &Path,
        incremental_snapshot_archives_dir: &Path,
        min_full_snapshot_slot: Slot,
        max_full_snapshot_slot: Slot,
        min_incremental_snapshot_slot: Slot,
        max_incremental_snapshot_slot: Slot,
    ) {
        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
            for incremental_snapshot_slot in
                min_incremental_snapshot_slot..max_incremental_snapshot_slot
            {
                // The archives only need well-formed names; contents are irrelevant.
                let snapshot_filename = format!(
                    "incremental-snapshot-{}-{}-{}.tar",
                    full_snapshot_slot,
                    incremental_snapshot_slot,
                    Hash::default()
                );
                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
                fs::File::create(snapshot_filepath).unwrap();
            }

            let snapshot_filename =
                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
            fs::File::create(snapshot_filepath).unwrap();

            // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly
            let bad_filename = format!(
                "incremental-snapshot-{}-{}-bad!hash.tar",
                full_snapshot_slot,
                max_incremental_snapshot_slot + 1,
            );
            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
            fs::File::create(bad_filepath).unwrap();
        }

        // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
        // sorted correctly
        let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
        fs::File::create(bad_filepath).unwrap();
    }
2725
2726    #[test]
2727    fn test_get_full_snapshot_archives() {
2728        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2729        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2730        let min_slot = 123;
2731        let max_slot = 456;
2732        common_create_snapshot_archive_files(
2733            full_snapshot_archives_dir.path(),
2734            incremental_snapshot_archives_dir.path(),
2735            min_slot,
2736            max_slot,
2737            0,
2738            0,
2739        );
2740
2741        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2742        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
2743    }
2744
2745    #[test]
2746    fn test_get_full_snapshot_archives_remote() {
2747        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2748        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2749        let min_slot = 123;
2750        let max_slot = 456;
2751        common_create_snapshot_archive_files(
2752            &full_snapshot_archives_dir
2753                .path()
2754                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2755            &incremental_snapshot_archives_dir
2756                .path()
2757                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2758            min_slot,
2759            max_slot,
2760            0,
2761            0,
2762        );
2763
2764        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2765        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
2766        assert!(snapshot_archives.iter().all(|info| info.is_remote()));
2767    }
2768
2769    #[test]
2770    fn test_get_incremental_snapshot_archives() {
2771        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2772        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2773        let min_full_snapshot_slot = 12;
2774        let max_full_snapshot_slot = 23;
2775        let min_incremental_snapshot_slot = 34;
2776        let max_incremental_snapshot_slot = 45;
2777        common_create_snapshot_archive_files(
2778            full_snapshot_archives_dir.path(),
2779            incremental_snapshot_archives_dir.path(),
2780            min_full_snapshot_slot,
2781            max_full_snapshot_slot,
2782            min_incremental_snapshot_slot,
2783            max_incremental_snapshot_slot,
2784        );
2785
2786        let incremental_snapshot_archives =
2787            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
2788        assert_eq!(
2789            incremental_snapshot_archives.len() as Slot,
2790            (max_full_snapshot_slot - min_full_snapshot_slot)
2791                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
2792        );
2793    }
2794
2795    #[test]
2796    fn test_get_incremental_snapshot_archives_remote() {
2797        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2798        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2799        let min_full_snapshot_slot = 12;
2800        let max_full_snapshot_slot = 23;
2801        let min_incremental_snapshot_slot = 34;
2802        let max_incremental_snapshot_slot = 45;
2803        common_create_snapshot_archive_files(
2804            &full_snapshot_archives_dir
2805                .path()
2806                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2807            &incremental_snapshot_archives_dir
2808                .path()
2809                .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2810            min_full_snapshot_slot,
2811            max_full_snapshot_slot,
2812            min_incremental_snapshot_slot,
2813            max_incremental_snapshot_slot,
2814        );
2815
2816        let incremental_snapshot_archives =
2817            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
2818        assert_eq!(
2819            incremental_snapshot_archives.len() as Slot,
2820            (max_full_snapshot_slot - min_full_snapshot_slot)
2821                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
2822        );
2823        assert!(incremental_snapshot_archives
2824            .iter()
2825            .all(|info| info.is_remote()));
2826    }
2827
2828    #[test]
2829    fn test_get_highest_full_snapshot_archive_slot() {
2830        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2831        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2832        let min_slot = 123;
2833        let max_slot = 456;
2834        common_create_snapshot_archive_files(
2835            full_snapshot_archives_dir.path(),
2836            incremental_snapshot_archives_dir.path(),
2837            min_slot,
2838            max_slot,
2839            0,
2840            0,
2841        );
2842
2843        assert_eq!(
2844            get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
2845            Some(max_slot - 1)
2846        );
2847    }
2848
2849    #[test]
2850    fn test_get_highest_incremental_snapshot_slot() {
2851        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2852        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2853        let min_full_snapshot_slot = 12;
2854        let max_full_snapshot_slot = 23;
2855        let min_incremental_snapshot_slot = 34;
2856        let max_incremental_snapshot_slot = 45;
2857        common_create_snapshot_archive_files(
2858            full_snapshot_archives_dir.path(),
2859            incremental_snapshot_archives_dir.path(),
2860            min_full_snapshot_slot,
2861            max_full_snapshot_slot,
2862            min_incremental_snapshot_slot,
2863            max_incremental_snapshot_slot,
2864        );
2865
2866        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2867            assert_eq!(
2868                get_highest_incremental_snapshot_archive_slot(
2869                    incremental_snapshot_archives_dir.path(),
2870                    full_snapshot_slot
2871                ),
2872                Some(max_incremental_snapshot_slot - 1)
2873            );
2874        }
2875
2876        assert_eq!(
2877            get_highest_incremental_snapshot_archive_slot(
2878                incremental_snapshot_archives_dir.path(),
2879                max_full_snapshot_slot
2880            ),
2881            None
2882        );
2883    }
2884
2885    fn common_test_purge_old_snapshot_archives(
2886        snapshot_names: &[&String],
2887        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2888        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2889        expected_snapshots: &[&String],
2890    ) {
2891        let temp_snap_dir = tempfile::TempDir::new().unwrap();
2892
2893        for snap_name in snapshot_names {
2894            let snap_path = temp_snap_dir.path().join(snap_name);
2895            let mut _snap_file = fs::File::create(snap_path);
2896        }
2897        purge_old_snapshot_archives(
2898            temp_snap_dir.path(),
2899            temp_snap_dir.path(),
2900            maximum_full_snapshot_archives_to_retain,
2901            maximum_incremental_snapshot_archives_to_retain,
2902        );
2903
2904        let mut retained_snaps = HashSet::new();
2905        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
2906            let entry_path_buf = entry.unwrap().path();
2907            let entry_path = entry_path_buf.as_path();
2908            let snapshot_name = entry_path
2909                .file_name()
2910                .unwrap()
2911                .to_str()
2912                .unwrap()
2913                .to_string();
2914            retained_snaps.insert(snapshot_name);
2915        }
2916
2917        for snap_name in expected_snapshots {
2918            assert!(
2919                retained_snaps.contains(snap_name.as_str()),
2920                "{snap_name} not found"
2921            );
2922        }
2923        assert_eq!(retained_snaps.len(), expected_snapshots.len());
2924    }
2925
2926    #[test]
2927    fn test_purge_old_full_snapshot_archives() {
2928        let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
2929        let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
2930        let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
2931        let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
2932
2933        // expecting only the newest to be retained
2934        let expected_snapshots = vec![&snap3_name];
2935        common_test_purge_old_snapshot_archives(
2936            &snapshot_names,
2937            NonZeroUsize::new(1).unwrap(),
2938            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2939            &expected_snapshots,
2940        );
2941
2942        // retaining 2, expecting the 2 newest to be retained
2943        let expected_snapshots = vec![&snap2_name, &snap3_name];
2944        common_test_purge_old_snapshot_archives(
2945            &snapshot_names,
2946            NonZeroUsize::new(2).unwrap(),
2947            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2948            &expected_snapshots,
2949        );
2950
2951        // retaining 3, all three should be retained
2952        let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
2953        common_test_purge_old_snapshot_archives(
2954            &snapshot_names,
2955            NonZeroUsize::new(3).unwrap(),
2956            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2957            &expected_snapshots,
2958        );
2959    }
2960
    /// Mimic a running node's behavior w.r.t. purging old snapshot archives.  Take snapshots in a
    /// loop, and periodically purge old snapshot archives.  After purging, check to make sure the
    /// snapshot archives on disk are correct.
    #[test]
    fn test_purge_old_full_snapshot_archives_in_the_loop() {
        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
        let starting_slot: Slot = 42;

        for slot in (starting_slot..).take(100) {
            // Create an (empty) full snapshot archive file for this slot.
            let full_snapshot_archive_file_name =
                format!("snapshot-{}-{}.tar", slot, Hash::default());
            let full_snapshot_archive_path = full_snapshot_archives_dir
                .as_ref()
                .join(full_snapshot_archive_file_name);
            fs::File::create(full_snapshot_archive_path).unwrap();

            // don't purge-and-check until enough snapshot archives have been created
            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
                continue;
            }

            // purge infrequently, so there will always be snapshot archives to purge
            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
                continue;
            }

            purge_old_snapshot_archives(
                &full_snapshot_archives_dir,
                &incremental_snapshot_archives_dir,
                maximum_snapshots_to_retain,
                NonZeroUsize::new(usize::MAX).unwrap(),
            );
            // After the purge, exactly `maximum_snapshots_to_retain` archives
            // should remain: the newest ones, ending at the current `slot`
            // with consecutive slots (no gaps).
            let mut full_snapshot_archives =
                get_full_snapshot_archives(&full_snapshot_archives_dir);
            full_snapshot_archives.sort_unstable();
            assert_eq!(
                full_snapshot_archives.len(),
                maximum_snapshots_to_retain.get()
            );
            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
            }
        }
    }
3008
3009    #[test]
3010    fn test_purge_old_incremental_snapshot_archives() {
3011        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3012        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3013        let starting_slot = 100_000;
3014
3015        let maximum_incremental_snapshot_archives_to_retain =
3016            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3017        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3018
3019        let incremental_snapshot_interval = 100;
3020        let num_incremental_snapshots_per_full_snapshot =
3021            maximum_incremental_snapshot_archives_to_retain.get() * 2;
3022        let full_snapshot_interval =
3023            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3024
3025        let mut snapshot_filenames = vec![];
3026        (starting_slot..)
3027            .step_by(full_snapshot_interval)
3028            .take(
3029                maximum_full_snapshot_archives_to_retain
3030                    .checked_mul(NonZeroUsize::new(2).unwrap())
3031                    .unwrap()
3032                    .get(),
3033            )
3034            .for_each(|full_snapshot_slot| {
3035                let snapshot_filename =
3036                    format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3037                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3038                fs::File::create(snapshot_path).unwrap();
3039                snapshot_filenames.push(snapshot_filename);
3040
3041                (full_snapshot_slot..)
3042                    .step_by(incremental_snapshot_interval)
3043                    .take(num_incremental_snapshots_per_full_snapshot)
3044                    .skip(1)
3045                    .for_each(|incremental_snapshot_slot| {
3046                        let snapshot_filename = format!(
3047                            "incremental-snapshot-{}-{}-{}.tar",
3048                            full_snapshot_slot,
3049                            incremental_snapshot_slot,
3050                            Hash::default()
3051                        );
3052                        let snapshot_path = incremental_snapshot_archives_dir
3053                            .path()
3054                            .join(&snapshot_filename);
3055                        fs::File::create(snapshot_path).unwrap();
3056                        snapshot_filenames.push(snapshot_filename);
3057                    });
3058            });
3059
3060        purge_old_snapshot_archives(
3061            full_snapshot_archives_dir.path(),
3062            incremental_snapshot_archives_dir.path(),
3063            maximum_full_snapshot_archives_to_retain,
3064            maximum_incremental_snapshot_archives_to_retain,
3065        );
3066
3067        // Ensure correct number of full snapshot archives are purged/retained
3068        let mut remaining_full_snapshot_archives =
3069            get_full_snapshot_archives(full_snapshot_archives_dir.path());
3070        assert_eq!(
3071            remaining_full_snapshot_archives.len(),
3072            maximum_full_snapshot_archives_to_retain.get(),
3073        );
3074        remaining_full_snapshot_archives.sort_unstable();
3075        let latest_full_snapshot_archive_slot =
3076            remaining_full_snapshot_archives.last().unwrap().slot();
3077
3078        // Ensure correct number of incremental snapshot archives are purged/retained
3079        // For each additional full snapshot archive, one additional (the newest)
3080        // incremental snapshot archive is retained. This is accounted for by the
3081        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
3082        let mut remaining_incremental_snapshot_archives =
3083            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3084        assert_eq!(
3085            remaining_incremental_snapshot_archives.len(),
3086            maximum_incremental_snapshot_archives_to_retain
3087                .get()
3088                .saturating_add(
3089                    maximum_full_snapshot_archives_to_retain
3090                        .get()
3091                        .saturating_sub(1)
3092                )
3093        );
3094        remaining_incremental_snapshot_archives.sort_unstable();
3095        remaining_incremental_snapshot_archives.reverse();
3096
        // Ensure there exists one incremental snapshot for all but the latest full snapshot
3098        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3099            let incremental_snapshot_archive =
3100                remaining_incremental_snapshot_archives.pop().unwrap();
3101
3102            let expected_base_slot =
3103                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3104            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3105            let expected_slot = expected_base_slot
3106                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3107            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3108        }
3109
3110        // Ensure all remaining incremental snapshots are only for the latest full snapshot
3111        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3112            assert_eq!(
3113                incremental_snapshot_archive.base_slot(),
3114                latest_full_snapshot_archive_slot
3115            );
3116        }
3117
3118        // Ensure the remaining incremental snapshots are at the right slot
3119        let expected_remaining_incremental_snapshot_archive_slots =
3120            (latest_full_snapshot_archive_slot..)
3121                .step_by(incremental_snapshot_interval)
3122                .take(num_incremental_snapshots_per_full_snapshot)
3123                .skip(
3124                    num_incremental_snapshots_per_full_snapshot
3125                        - maximum_incremental_snapshot_archives_to_retain.get(),
3126                )
3127                .collect::<HashSet<_>>();
3128
3129        let actual_remaining_incremental_snapshot_archive_slots =
3130            remaining_incremental_snapshot_archives
3131                .iter()
3132                .map(|snapshot| snapshot.slot())
3133                .collect::<HashSet<_>>();
3134        assert_eq!(
3135            actual_remaining_incremental_snapshot_archive_slots,
3136            expected_remaining_incremental_snapshot_archive_slots
3137        );
3138    }
3139
3140    #[test]
3141    fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3142        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3143        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3144
3145        for snapshot_filenames in [
3146            format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
3147            format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
3148            format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
3149            format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
3150            format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
3151            format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
3152            format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
3153            format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
3154        ] {
3155            let snapshot_path = incremental_snapshot_archives_dir
3156                .path()
3157                .join(snapshot_filenames);
3158            fs::File::create(snapshot_path).unwrap();
3159        }
3160
3161        purge_old_snapshot_archives(
3162            full_snapshot_archives_dir.path(),
3163            incremental_snapshot_archives_dir.path(),
3164            NonZeroUsize::new(usize::MAX).unwrap(),
3165            NonZeroUsize::new(usize::MAX).unwrap(),
3166        );
3167
3168        let remaining_incremental_snapshot_archives =
3169            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3170        assert!(remaining_incremental_snapshot_archives.is_empty());
3171    }
3172
3173    #[test]
3174    fn test_get_snapshot_accounts_hardlink_dir() {
3175        let slot: Slot = 1;
3176
3177        let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3178
3179        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3180        let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3181        let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3182        fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3183
3184        let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3185        let appendvec_filename = format!("{slot}.0");
3186        let appendvec_path = accounts_dir.join(appendvec_filename);
3187
3188        let ret = get_snapshot_accounts_hardlink_dir(
3189            &appendvec_path,
3190            slot,
3191            &mut account_paths_set,
3192            &accounts_hardlinks_dir,
3193        );
3194        assert!(ret.is_ok());
3195
3196        let wrong_appendvec_path = appendvec_path
3197            .parent()
3198            .unwrap()
3199            .parent()
3200            .unwrap()
3201            .join(appendvec_path.file_name().unwrap());
3202        let ret = get_snapshot_accounts_hardlink_dir(
3203            &wrong_appendvec_path,
3204            slot,
3205            &mut account_paths_set,
3206            accounts_hardlinks_dir,
3207        );
3208
3209        assert_matches!(
3210            ret,
3211            Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3212        );
3213    }
3214}