solana_runtime/
snapshot_utils.rs

1#[cfg(feature = "dev-context-only-utils")]
2use solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs;
3use {
4    crate::{
5        bank::{BankFieldsToDeserialize, BankFieldsToSerialize, BankHashStats, BankSlotDelta},
6        serde_snapshot::{
7            self, AccountsDbFields, ExtraFieldsToSerialize, SerdeObsoleteAccountsMap,
8            SerializableAccountStorageEntry, SnapshotAccountsDbFields, SnapshotBankFields,
9            SnapshotStreams,
10        },
11        snapshot_package::SnapshotPackage,
12        snapshot_utils::snapshot_storage_rebuilder::{
13            get_slot_and_append_vec_id, SnapshotStorageRebuilder,
14        },
15    },
16    agave_snapshots::{
17        archive_snapshot,
18        error::{
19            AddBankSnapshotError, GetSnapshotAccountsHardLinkDirError,
20            HardLinkStoragesToSnapshotError, SnapshotError, SnapshotFastbootError,
21            SnapshotNewFromDirError,
22        },
23        paths::{self as snapshot_paths, get_incremental_snapshot_archives},
24        snapshot_archive_info::{
25            FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
26            SnapshotArchiveInfoGetter,
27        },
28        snapshot_config::SnapshotConfig,
29        streaming_unarchive_snapshot, ArchiveFormat, Result, SnapshotKind, SnapshotVersion,
30    },
31    crossbeam_channel::{Receiver, Sender},
32    log::*,
33    regex::Regex,
34    semver::Version,
35    solana_accounts_db::{
36        account_storage::AccountStorageMap,
37        accounts_db::{AccountStorageEntry, AccountsDbConfig, AtomicAccountsFileId},
38        accounts_file::{AccountsFile, StorageAccess},
39        utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
40    },
41    solana_clock::Slot,
42    solana_measure::{measure::Measure, measure_time, measure_us},
43    std::{
44        cmp::Ordering,
45        collections::{HashMap, HashSet},
46        fs,
47        io::{self, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
48        mem,
49        num::NonZeroUsize,
50        path::{Path, PathBuf},
51        str::FromStr,
52        sync::{Arc, LazyLock},
53    },
54    tempfile::TempDir,
55};
56
57pub mod snapshot_storage_rebuilder;
58
/// Limit the size of the obsolete accounts file.
/// If the file exceeds this limit, remove it, which will force a restore from archives.
/// The limit assumes 24 bytes per entry and 5% of 10 billion accounts:
/// 500 million entries * 24 bytes = 12 GB. The constant itself is written in binary units
/// (12 GiB), which is slightly above that estimate.
pub const MAX_OBSOLETE_ACCOUNTS_FILE_SIZE: u64 = 1024 * 1024 * 1024 * 12; // 12 GiB
/// Cap applied to serialized snapshot data files (bank and status cache sizes are monitored
/// against it when a bank snapshot is written).
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
/// Maximum size of the snapshot version file, in bytes.
/// NOTE(review): the code enforcing this limit is outside this view -- confirm at the reader.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // bytes
66
// Snapshot Fastboot Version History
// Legacy - No fastboot version file; the presence of the storages flushed file determines
//          whether the snapshot is loadable
// 1.0.0 - Initial version file. Backwards and forwards compatible with Legacy.
// 2.0.0 - Obsolete Accounts File added; the storages flushed file is not written anymore
//         Snapshots created with version 2.0.0 will not fastboot to older versions
//         Snapshots created with versions <2.0.0 will fastboot to version 2.0.0
//
// This is the version written into the fastboot version file of newly created bank snapshots.
const SNAPSHOT_FASTBOOT_VERSION: Version = Version::new(2, 0, 0);
74
/// Information about a bank snapshot: the slot of the bank, the path to the bank snapshot
/// directory, the snapshot version, and (when known) the fastboot version.
///
/// Equality is derived and therefore compares every field.
#[derive(PartialEq, Eq, Debug)]
pub struct BankSnapshotInfo {
    /// Slot of the bank
    pub slot: Slot,
    /// Path to the bank snapshot directory
    pub snapshot_dir: PathBuf,
    /// Snapshot version
    pub snapshot_version: SnapshotVersion,
    /// Fastboot version; `None` when no fastboot version is known for this snapshot
    pub fastboot_version: Option<Version>,
}
88
// Delegates to `Ord::cmp`, so `partial_cmp` never returns `None` and always agrees with the
// total order defined by the `Ord` impl.
impl PartialOrd for BankSnapshotInfo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
94
// Order BankSnapshotInfo by slot (ascending), which practically is sorting chronologically.
//
// NOTE(review): only `slot` takes part in the ordering, while the derived `PartialEq`/`Eq`
// compare every field. Two values with the same slot but a different directory or version
// therefore compare as `Ordering::Equal` here yet are not `==`.
impl Ord for BankSnapshotInfo {
    fn cmp(&self, other: &Self) -> Ordering {
        self.slot.cmp(&other.slot)
    }
}
101
102impl BankSnapshotInfo {
103    pub fn new_from_dir(
104        bank_snapshots_dir: impl AsRef<Path>,
105        slot: Slot,
106    ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
107        // check this directory to see if there is a BankSnapshotPre and/or
108        // BankSnapshotPost file
109        let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(&bank_snapshots_dir, slot);
110
111        if !bank_snapshot_dir.is_dir() {
112            return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
113                bank_snapshot_dir,
114            ));
115        }
116
117        // Among the files checks, the completion flag file check should be done first to avoid the later
118        // I/O errors.
119
120        // There is a time window from the slot directory being created, and the content being completely
121        // filled.  Check the version file as it is the last file written to avoid using a highest
122        // found slot directory with missing content
123        let version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
124        let version_str = snapshot_version_from_file(&version_path).map_err(|err| {
125            SnapshotNewFromDirError::IncompleteDir(err, bank_snapshot_dir.clone())
126        })?;
127
128        let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
129            .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
130
131        let status_cache_file =
132            bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
133        if !status_cache_file.is_file() {
134            return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
135                status_cache_file,
136            ));
137        }
138
139        let bank_snapshot_path =
140            bank_snapshot_dir.join(snapshot_paths::get_snapshot_file_name(slot));
141        if !bank_snapshot_path.is_file() {
142            return Err(SnapshotNewFromDirError::MissingSnapshotFile(
143                bank_snapshot_dir,
144            ));
145        };
146
147        let snapshot_fastboot_version_path =
148            bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_FASTBOOT_VERSION_FILENAME);
149
150        // If the version file is absent, fastboot_version will be None. This allows versions 3.1+
151        // to load snapshots created by versions <3.1. In version 3.2, the version file will become
152        // mandatory, and its absence can be treated as an error.
153        let fastboot_version = fs::read_to_string(&snapshot_fastboot_version_path)
154            .ok()
155            .map(|version_string| {
156                Version::from_str(version_string.trim())
157                    .map_err(|_| SnapshotNewFromDirError::InvalidFastbootVersion(version_string))
158            })
159            .transpose()?;
160
161        Ok(BankSnapshotInfo {
162            slot,
163            snapshot_dir: bank_snapshot_dir,
164            snapshot_version,
165            fastboot_version,
166        })
167    }
168
169    pub fn snapshot_path(&self) -> PathBuf {
170        self.snapshot_dir
171            .join(snapshot_paths::get_snapshot_file_name(self.slot))
172    }
173}
174
/// When constructing a bank from a snapshot, traditionally the snapshot was a snapshot archive.
/// Now, the snapshot can be from a snapshot directory, or from a snapshot archive.  This is the
/// flag to indicate which.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// Build from the snapshot archive
    Archive,
    /// Build directly from the bank snapshot directory
    Dir,
}
185
/// Helper type when rebuilding from snapshots.  Designed to handle when rebuilding from just a
/// full snapshot, or from both a full snapshot and an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root file path of the full snapshot
    pub full_snapshot_root_file_path: PathBuf,
    /// Root file path of the incremental snapshot; `None` when rebuilding from a full snapshot
    /// only
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
193
/// Helper type to bundle up the results from `unarchive_snapshot()`
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    // Never read, hence the `dead_code` allowance. It is held only so the temporary unpack
    // directory lives as long as this value.
    // NOTE(review): presumably the directory is removed when the `TempDir` is dropped -- confirm.
    #[allow(dead_code)]
    unpack_dir: TempDir,
    /// Account storages rebuilt from the unpacked snapshot
    pub storage: AccountStorageMap,
    /// Bank fields deserialized from the snapshot
    pub bank_fields: BankFieldsToDeserialize,
    /// Accounts db fields deserialized from the snapshot
    pub accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
    /// Directory holding the unpacked bank snapshot, together with its snapshot version
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Measurement of the untar step (name-based; the producer is outside this view)
    pub measure_untar: Measure,
}
205
/// Helper type to bundle up the results from `verify_and_unarchive_snapshots()`.
///
/// Every `incremental_*` field is optional; presumably they are all `None` when only a full
/// snapshot was unarchived (the producer is outside this view -- confirm there).
#[derive(Debug)]
pub struct UnarchivedSnapshots {
    /// Account storages from the full snapshot
    pub full_storage: AccountStorageMap,
    /// Account storages from the incremental snapshot, if any
    pub incremental_storage: Option<AccountStorageMap>,
    /// Bank fields of the snapshot(s)
    pub bank_fields: SnapshotBankFields,
    /// Accounts db fields of the snapshot(s)
    pub accounts_db_fields: SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
    /// Unpack directory and snapshot version of the full snapshot
    pub full_unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Unpack directory and snapshot version of the incremental snapshot, if any
    pub incremental_unpacked_snapshots_dir_and_version: Option<UnpackedSnapshotsDirAndVersion>,
    /// Measurement of the full snapshot's untar step
    pub full_measure_untar: Measure,
    /// Measurement of the incremental snapshot's untar step, if any
    pub incremental_measure_untar: Option<Measure>,
    /// Next append vec id to use after reconstructing the storages
    pub next_append_vec_id: AtomicAccountsFileId,
}
219
/// Guard type that keeps the unpack directories of snapshots alive.
/// Once dropped, the unpack directories are removed.
// The fields are never read (hence `dead_code`); they exist only to be held until drop.
#[allow(dead_code)]
#[derive(Debug)]
pub struct UnarchivedSnapshotsGuard {
    // Unpack directory of the full snapshot
    full_unpack_dir: TempDir,
    // Unpack directory of the incremental snapshot, if there is one
    incremental_unpack_dir: Option<TempDir>,
}
/// Helper type for passing around the unpacked snapshots dir and the snapshot version together
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    /// Directory the snapshots were unpacked into
    pub unpacked_snapshots_dir: PathBuf,
    /// Snapshot version of the unpacked snapshot
    pub snapshot_version: SnapshotVersion,
}
234
/// Helper type for passing around account storage map and next append vec id
/// for reconstructing accounts from a snapshot
pub(crate) struct StorageAndNextAccountsFileId {
    /// The account storage map
    pub storage: AccountStorageMap,
    /// The next append vec id to hand out
    pub next_append_vec_id: AtomicAccountsFileId,
}
241
242/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
243/// from <account_path>/run taken at snapshot <slot> time.  They are referenced by the symlinks from the
244/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/.  We observed that sometimes the bank snapshot dir
245/// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
246/// or some legacy code not using the symlinks to clean up the account snapshot hardlink directories.
247/// This function cleans up any account snapshot directories that are no longer referenced by the bank
248/// snapshot dirs, to ensure proper snapshot operations.
249pub fn clean_orphaned_account_snapshot_dirs(
250    bank_snapshots_dir: impl AsRef<Path>,
251    account_snapshot_paths: &[PathBuf],
252) -> io::Result<()> {
253    // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
254    // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
255    let mut account_snapshot_dirs_referenced = HashSet::new();
256    let snapshots = get_bank_snapshots(bank_snapshots_dir);
257    for snapshot in snapshots {
258        let account_hardlinks_dir = snapshot
259            .snapshot_dir
260            .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
261        // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
262        let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
263            // The bank snapshot may not have a hard links dir with the storages.
264            // This is fine, and happens for bank snapshots we do *not* fastboot from.
265            // In this case, log it and go to the next bank snapshot.
266            debug!(
267                "failed to read account hardlinks dir '{}'",
268                account_hardlinks_dir.display(),
269            );
270            continue;
271        };
272        for entry in read_dir {
273            let path = entry?.path();
274            let target = fs::read_link(&path).map_err(|err| {
275                IoError::other(format!(
276                    "failed to read symlink '{}': {err}",
277                    path.display(),
278                ))
279            })?;
280            account_snapshot_dirs_referenced.insert(target);
281        }
282    }
283
284    // loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
285    for account_snapshot_path in account_snapshot_paths {
286        let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
287            IoError::other(format!(
288                "failed to read account snapshot dir '{}': {err}",
289                account_snapshot_path.display(),
290            ))
291        })?;
292        for entry in read_dir {
293            let path = entry?.path();
294            if !account_snapshot_dirs_referenced.contains(&path) {
295                info!(
296                    "Removing orphaned account snapshot hardlink directory '{}'...",
297                    path.display()
298                );
299                move_and_async_delete_path(&path);
300            }
301        }
302    }
303
304    Ok(())
305}
306
307/// Purges incomplete bank snapshots
308pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
309    let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
310        // If we cannot read the bank snapshots dir, then there's nothing to do
311        return;
312    };
313
314    let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
315
316    let incomplete_dirs: Vec<_> = read_dir_iter
317        .filter_map(|entry| entry.ok())
318        .map(|entry| entry.path())
319        .filter(|path| path.is_dir())
320        .filter(is_incomplete)
321        .collect();
322
323    // attempt to purge all the incomplete directories; do not exit early
324    for incomplete_dir in incomplete_dirs {
325        let result = purge_bank_snapshot(&incomplete_dir);
326        match result {
327            Ok(_) => info!(
328                "Purged incomplete snapshot dir: {}",
329                incomplete_dir.display()
330            ),
331            Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
332        }
333    }
334}
335
336/// Is the bank snapshot complete?
337fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
338    let version_path = bank_snapshot_dir
339        .as_ref()
340        .join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
341    version_path.is_file()
342}
343
344/// Writes the full snapshot slot file into the bank snapshot dir
345pub fn write_full_snapshot_slot_file(
346    bank_snapshot_dir: impl AsRef<Path>,
347    full_snapshot_slot: Slot,
348) -> io::Result<()> {
349    let full_snapshot_slot_path = bank_snapshot_dir
350        .as_ref()
351        .join(snapshot_paths::SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
352    fs::write(
353        &full_snapshot_slot_path,
354        Slot::to_le_bytes(full_snapshot_slot),
355    )
356    .map_err(|err| {
357        IoError::other(format!(
358            "failed to write full snapshot slot file '{}': {err}",
359            full_snapshot_slot_path.display(),
360        ))
361    })
362}
363
364// Reads the full snapshot slot file from the bank snapshot dir
365pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<Slot> {
366    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
367    let full_snapshot_slot_path = bank_snapshot_dir
368        .as_ref()
369        .join(snapshot_paths::SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
370    let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
371    if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
372        let error_message = format!(
373            "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
374            full_snapshot_slot_path.display(),
375            full_snapshot_slot_file_metadata.len(),
376            SLOT_SIZE,
377        );
378        return Err(IoError::other(error_message));
379    }
380    let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
381    let mut buffer = [0; SLOT_SIZE];
382    full_snapshot_slot_file.read_exact(&mut buffer)?;
383    let slot = Slot::from_le_bytes(buffer);
384    Ok(slot)
385}
386
387/// Writes files that indicate the bank snapshot is loadable by fastboot
388pub fn mark_bank_snapshot_as_loadable(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<()> {
389    let snapshot_fastboot_version_path = bank_snapshot_dir
390        .as_ref()
391        .join(snapshot_paths::SNAPSHOT_FASTBOOT_VERSION_FILENAME);
392    fs::write(
393        &snapshot_fastboot_version_path,
394        SNAPSHOT_FASTBOOT_VERSION.to_string(),
395    )
396    .map_err(|err| {
397        IoError::other(format!(
398            "failed to write fastboot version file '{}': {err}",
399            snapshot_fastboot_version_path.display(),
400        ))
401    })?;
402    Ok(())
403}
404
405/// Is this bank snapshot loadable?
406fn is_bank_snapshot_loadable(
407    bank_snapshot_dir: impl AsRef<Path>,
408    fastboot_version: Option<&Version>,
409) -> std::result::Result<bool, SnapshotFastbootError> {
410    // Legacy storages flushed file
411    // Read in v3.1 for backwards compatibility, can be removed in v3.2
412    let flushed_storages = bank_snapshot_dir
413        .as_ref()
414        .join(snapshot_paths::SNAPSHOT_STORAGES_FLUSHED_FILENAME);
415    if flushed_storages.is_file() {
416        return Ok(true);
417    }
418
419    if let Some(fastboot_version) = fastboot_version {
420        is_snapshot_fastboot_compatible(fastboot_version)
421    } else {
422        // No fastboot version file, so this is not a fastbootable
423        Ok(false)
424    }
425}
426
427/// Is the fastboot snapshot version compatible?
428fn is_snapshot_fastboot_compatible(
429    version: &Version,
430) -> std::result::Result<bool, SnapshotFastbootError> {
431    if version.major <= SNAPSHOT_FASTBOOT_VERSION.major {
432        Ok(true)
433    } else {
434        Err(SnapshotFastbootError::IncompatibleVersion(version.clone()))
435    }
436}
437
438/// Gets the highest, loadable, bank snapshot
439///
440/// The highest bank snapshot is the one with the highest slot.
441pub fn get_highest_loadable_bank_snapshot(
442    snapshot_config: &SnapshotConfig,
443) -> Option<BankSnapshotInfo> {
444    let highest_bank_snapshot = get_highest_bank_snapshot(&snapshot_config.bank_snapshots_dir)?;
445
446    let is_bank_snapshot_loadable = is_bank_snapshot_loadable(
447        &highest_bank_snapshot.snapshot_dir,
448        highest_bank_snapshot.fastboot_version.as_ref(),
449    );
450
451    match is_bank_snapshot_loadable {
452        Ok(true) => Some(highest_bank_snapshot),
453        Ok(false) => None,
454        Err(err) => {
455            warn!(
456                "Bank snapshot is not loadable '{}': {err}",
457                highest_bank_snapshot.snapshot_dir.display()
458            );
459            None
460        }
461    }
462}
463
464/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
465/// directory won't be cleaned up.  Call this function to clean them up.
466pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
467    if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
468        for entry in entries.flatten() {
469            if entry
470                .file_name()
471                .to_str()
472                .map(|file_name| file_name.starts_with(snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX))
473                .unwrap_or(false)
474            {
475                let path = entry.path();
476                let result = if path.is_dir() {
477                    fs::remove_dir_all(&path)
478                } else {
479                    fs::remove_file(&path)
480                };
481                if let Err(err) = result {
482                    warn!(
483                        "Failed to remove temporary snapshot archive '{}': {err}",
484                        path.display(),
485                    );
486                }
487            }
488        }
489    }
490}
491
492/// Serializes and archives a snapshot package
493pub fn serialize_and_archive_snapshot_package(
494    snapshot_package: SnapshotPackage,
495    snapshot_config: &SnapshotConfig,
496    should_flush_and_hard_link_storages: bool,
497) -> Result<SnapshotArchiveInfo> {
498    let SnapshotPackage {
499        snapshot_kind,
500        slot: snapshot_slot,
501        block_height,
502        hash: snapshot_hash,
503        mut snapshot_storages,
504        status_cache_slot_deltas,
505        bank_fields_to_serialize,
506        bank_hash_stats,
507        write_version,
508        enqueued: _,
509    } = snapshot_package;
510
511    let bank_snapshot_info = serialize_snapshot(
512        &snapshot_config.bank_snapshots_dir,
513        snapshot_config.snapshot_version,
514        snapshot_storages.as_slice(),
515        status_cache_slot_deltas.as_slice(),
516        bank_fields_to_serialize,
517        bank_hash_stats,
518        write_version,
519        should_flush_and_hard_link_storages,
520    )?;
521
522    // now write the full snapshot slot file after serializing so this bank snapshot is loadable
523    let full_snapshot_archive_slot = match snapshot_kind {
524        SnapshotKind::FullSnapshot => snapshot_slot,
525        SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
526    };
527    write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
528        .map_err(|err| {
529            IoError::other(format!(
530                "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, \
531                 kind {snapshot_kind:?}: {err}",
532            ))
533        })?;
534
535    let snapshot_archive_path = match snapshot_package.snapshot_kind {
536        SnapshotKind::FullSnapshot => snapshot_paths::build_full_snapshot_archive_path(
537            &snapshot_config.full_snapshot_archives_dir,
538            snapshot_package.slot,
539            &snapshot_package.hash,
540            snapshot_config.archive_format,
541        ),
542        SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
543            // After the snapshot has been serialized, it is now safe (and required) to prune all
544            // the storages that are *not* to be archived for this incremental snapshot.
545            snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
546            snapshot_paths::build_incremental_snapshot_archive_path(
547                &snapshot_config.incremental_snapshot_archives_dir,
548                incremental_snapshot_base_slot,
549                snapshot_package.slot,
550                &snapshot_package.hash,
551                snapshot_config.archive_format,
552            )
553        }
554    };
555
556    let snapshot_archive_info = archive_snapshot(
557        snapshot_kind,
558        snapshot_slot,
559        snapshot_hash,
560        snapshot_storages.as_slice(),
561        &bank_snapshot_info.snapshot_dir,
562        snapshot_archive_path,
563        snapshot_config.archive_format,
564    )?;
565
566    Ok(snapshot_archive_info)
567}
568
/// Serializes a snapshot into `bank_snapshots_dir`
///
/// The bank snapshot dir for the bank's slot is created (it must not exist yet) and filled in
/// this order: the serialized bank, the status cache, and the snapshot version file.  When
/// `should_flush_and_hard_link_storages` is set, the storages are then flushed and hard linked
/// into the bank snapshot dir, the obsolete accounts are written, and the bank snapshot is
/// marked as loadable for fastboot.
///
/// On success, returns the `BankSnapshotInfo` describing the new bank snapshot dir.
///
/// # Errors
///
/// Any failure is reported as `SnapshotError::AddBankSnapshot`, carrying the underlying
/// `AddBankSnapshotError` and the slot.
#[allow(clippy::too_many_arguments)]
fn serialize_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    snapshot_version: SnapshotVersion,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    slot_deltas: &[BankSlotDelta],
    mut bank_fields: BankFieldsToSerialize,
    bank_hash_stats: BankHashStats,
    write_version: u64,
    should_flush_and_hard_link_storages: bool,
) -> Result<BankSnapshotInfo> {
    // Read the slot up front; `bank_fields` is moved into the serializer closure below.
    let slot = bank_fields.slot;

    // this lambda function is to facilitate converting between
    // the AddBankSnapshotError and SnapshotError types
    let do_serialize_snapshot = || {
        let mut measure_everything = Measure::start("");
        let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(&bank_snapshots_dir, slot);
        // Refuse to overwrite an existing bank snapshot for this slot.
        if bank_snapshot_dir.exists() {
            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
                bank_snapshot_dir,
            ));
        }
        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
        })?;

        // the bank snapshot is stored as bank_snapshots_dir/slot/slot
        let bank_snapshot_path =
            bank_snapshot_dir.join(snapshot_paths::get_snapshot_file_name(slot));
        info!(
            "Creating bank snapshot for slot {slot} at '{}'",
            bank_snapshot_path.display(),
        );

        // Writes the bank into `stream`.  The epoch stakes are taken out of `bank_fields` so
        // they can be passed along as part of the extra fields.
        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
            let extra_fields = ExtraFieldsToSerialize {
                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
                obsolete_incremental_snapshot_persistence: None,
                obsolete_epoch_accounts_hash: None,
                versioned_epoch_stakes,
                accounts_lt_hash: Some(bank_fields.accounts_lt_hash.clone().into()),
            };
            serde_snapshot::serialize_bank_snapshot_into(
                stream,
                bank_fields,
                bank_hash_stats,
                &get_storages_to_serialize(snapshot_storages),
                extra_fields,
                write_version,
            )?;
            Ok(())
        };
        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
            "bank serialize"
        );

        let status_cache_path =
            bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
            serde_snapshot::serialize_status_cache(slot_deltas, &status_cache_path)
                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
        );

        // The version file marks the bank snapshot dir as complete, so it is written only after
        // the bank and the status cache.
        // NOTE(review): the fastboot-related files below are written *after* the version file,
        // so the version file is not strictly the last file written -- confirm this is intended.
        let version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
        let (_, write_version_file_us) = measure_us!(fs::write(
            &version_path,
            snapshot_version.as_str().as_bytes(),
        )
        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);

        // Optional fastboot preparation; the timings stay `None` when it is skipped.
        let (flush_storages_us, hard_link_storages_us, serialize_obsolete_accounts_us) =
            if should_flush_and_hard_link_storages {
                let flush_measure = Measure::start("");
                for storage in snapshot_storages {
                    storage.flush().map_err(|err| {
                        AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
                    })?;
                }
                let flush_us = flush_measure.end_as_us();
                let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
                    &bank_snapshot_dir,
                    slot,
                    snapshot_storages
                )
                .map_err(AddBankSnapshotError::HardLinkStorages)?);

                let (_, serialize_obsolete_accounts_us) = measure_us!({
                    write_obsolete_accounts_to_snapshot(&bank_snapshot_dir, snapshot_storages, slot)
                        .map_err(|err| {
                            AddBankSnapshotError::SerializeObsoleteAccounts(Box::new(err))
                        })?
                });

                // Written last, once everything fastboot needs is in place.
                mark_bank_snapshot_as_loadable(&bank_snapshot_dir)
                    .map_err(AddBankSnapshotError::MarkSnapshotLoadable)?;

                (
                    Some(flush_us),
                    Some(hard_link_us),
                    Some(serialize_obsolete_accounts_us),
                )
            } else {
                (None, None, None)
            };

        measure_everything.stop();

        // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
        datapoint_info!(
            "snapshot_bank",
            ("slot", slot, i64),
            ("bank_size", bank_snapshot_consumed_size, i64),
            ("status_cache_size", status_cache_consumed_size, i64),
            ("flush_storages_us", flush_storages_us, Option<i64>),
            ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
            ("serialize_obsolete_accounts_us", serialize_obsolete_accounts_us, Option<i64>),
            ("bank_serialize_us", bank_serialize.as_us(), i64),
            ("status_cache_serialize_us", status_cache_serialize_us, i64),
            ("write_version_file_us", write_version_file_us, i64),
            ("total_us", measure_everything.as_us(), i64),
        );

        info!(
            "{} for slot {} at {}",
            bank_serialize,
            slot,
            bank_snapshot_path.display(),
        );

        // NOTE(review): `fastboot_version` is `None` even when the fastboot version file was
        // just written above; re-reading the dir via `BankSnapshotInfo::new_from_dir()` would
        // report `Some(..)` in that case -- confirm callers do not rely on this field.
        Ok(BankSnapshotInfo {
            slot,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
            fastboot_version: None,
        })
    };

    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
}
713
714/// Get the bank snapshots in a directory
715pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
716    let mut bank_snapshots = Vec::default();
717    match fs::read_dir(&bank_snapshots_dir) {
718        Err(err) => {
719            info!(
720                "Unable to read bank snapshots directory '{}': {err}",
721                bank_snapshots_dir.as_ref().display(),
722            );
723        }
724        Ok(paths) => paths
725            .filter_map(|entry| {
726                // check if this entry is a directory and only a Slot
727                // bank snapshots are bank_snapshots_dir/slot/slot
728                entry
729                    .ok()
730                    .filter(|entry| entry.path().is_dir())
731                    .and_then(|entry| {
732                        entry
733                            .path()
734                            .file_name()
735                            .and_then(|file_name| file_name.to_str())
736                            .and_then(|file_name| file_name.parse::<Slot>().ok())
737                    })
738            })
739            .for_each(
740                |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
741                    Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
742                    // Other threads may be modifying bank snapshots in parallel; only return
743                    // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
744                    Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
745                },
746            ),
747    }
748    bank_snapshots
749}
750
751/// Get the bank snapshot with the highest slot in a directory
752///
753/// This function gets the highest bank snapshot of any kind
754pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
755    do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
756}
757
758fn do_get_highest_bank_snapshot(
759    mut bank_snapshots: Vec<BankSnapshotInfo>,
760) -> Option<BankSnapshotInfo> {
761    bank_snapshots.sort_unstable();
762    bank_snapshots.into_iter().next_back()
763}
764
765pub fn write_obsolete_accounts_to_snapshot(
766    bank_snapshot_dir: impl AsRef<Path>,
767    snapshot_storages: &[Arc<AccountStorageEntry>],
768    snapshot_slot: Slot,
769) -> Result<u64> {
770    let obsolete_accounts =
771        SerdeObsoleteAccountsMap::new_from_storages(snapshot_storages, snapshot_slot);
772    serialize_obsolete_accounts(
773        bank_snapshot_dir,
774        &obsolete_accounts,
775        MAX_OBSOLETE_ACCOUNTS_FILE_SIZE,
776    )
777}
778
779fn serialize_obsolete_accounts(
780    bank_snapshot_dir: impl AsRef<Path>,
781    obsolete_accounts_map: &SerdeObsoleteAccountsMap,
782    maximum_obsolete_accounts_file_size: u64,
783) -> Result<u64> {
784    let obsolete_accounts_path = bank_snapshot_dir
785        .as_ref()
786        .join(snapshot_paths::SNAPSHOT_OBSOLETE_ACCOUNTS_FILENAME);
787    let obsolete_accounts_file = fs::File::create(&obsolete_accounts_path)?;
788    let mut file_stream = BufWriter::new(obsolete_accounts_file);
789
790    serde_snapshot::serialize_into(&mut file_stream, obsolete_accounts_map)?;
791
792    file_stream.flush()?;
793
794    let consumed_size = file_stream.stream_position()?;
795    if consumed_size > maximum_obsolete_accounts_file_size {
796        let error_message = format!(
797            "too large obsolete accounts file to serialize: '{}' has {consumed_size} bytes, max \
798             size is {maximum_obsolete_accounts_file_size}",
799            obsolete_accounts_path.display(),
800        );
801        return Err(IoError::other(error_message).into());
802    }
803    Ok(consumed_size)
804}
805
806fn deserialize_obsolete_accounts(
807    bank_snapshot_dir: impl AsRef<Path>,
808    maximum_obsolete_accounts_file_size: u64,
809) -> Result<SerdeObsoleteAccountsMap> {
810    let obsolete_accounts_path = bank_snapshot_dir
811        .as_ref()
812        .join(snapshot_paths::SNAPSHOT_OBSOLETE_ACCOUNTS_FILENAME);
813    let obsolete_accounts_file = fs::File::open(&obsolete_accounts_path)?;
814    // If the file is too large return error
815    let obsolete_accounts_file_metadata = fs::metadata(&obsolete_accounts_path)?;
816    if obsolete_accounts_file_metadata.len() > maximum_obsolete_accounts_file_size {
817        let error_message = format!(
818            "too large obsolete accounts file to deserialize: '{}' has {} bytes (max size is \
819             {maximum_obsolete_accounts_file_size} bytes)",
820            obsolete_accounts_path.display(),
821            obsolete_accounts_file_metadata.len(),
822        );
823        return Err(IoError::other(error_message).into());
824    }
825
826    let mut data_file_stream = BufReader::new(obsolete_accounts_file);
827
828    let obsolete_accounts = serde_snapshot::deserialize_from(&mut data_file_stream)?;
829
830    Ok(obsolete_accounts)
831}
832
833pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
834where
835    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
836{
837    serialize_snapshot_data_file_capped::<F>(
838        data_file_path,
839        MAX_SNAPSHOT_DATA_FILE_SIZE,
840        serializer,
841    )
842}
843
844pub fn deserialize_snapshot_data_file<T: Sized>(
845    data_file_path: &Path,
846    deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
847) -> Result<T> {
848    let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
849        deserializer(streams.full_snapshot_stream)
850    };
851
852    let wrapped_data_file_path = SnapshotRootPaths {
853        full_snapshot_root_file_path: data_file_path.to_path_buf(),
854        incremental_snapshot_root_file_path: None,
855    };
856
857    deserialize_snapshot_data_files_capped(
858        &wrapped_data_file_path,
859        MAX_SNAPSHOT_DATA_FILE_SIZE,
860        wrapped_deserializer,
861    )
862}
863
864pub fn deserialize_snapshot_data_files<T: Sized>(
865    snapshot_root_paths: &SnapshotRootPaths,
866    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
867) -> Result<T> {
868    deserialize_snapshot_data_files_capped(
869        snapshot_root_paths,
870        MAX_SNAPSHOT_DATA_FILE_SIZE,
871        deserializer,
872    )
873}
874
875fn serialize_snapshot_data_file_capped<F>(
876    data_file_path: &Path,
877    maximum_file_size: u64,
878    serializer: F,
879) -> Result<u64>
880where
881    F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
882{
883    let data_file = fs::File::create(data_file_path)?;
884    let mut data_file_stream = BufWriter::new(data_file);
885    serializer(&mut data_file_stream)?;
886    data_file_stream.flush()?;
887
888    let consumed_size = data_file_stream.stream_position()?;
889    if consumed_size > maximum_file_size {
890        let error_message = format!(
891            "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
892            data_file_path.display(),
893        );
894        return Err(IoError::other(error_message).into());
895    }
896    Ok(consumed_size)
897}
898
899fn deserialize_snapshot_data_files_capped<T: Sized>(
900    snapshot_root_paths: &SnapshotRootPaths,
901    maximum_file_size: u64,
902    deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
903) -> Result<T> {
904    let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
905        create_snapshot_data_file_stream(
906            &snapshot_root_paths.full_snapshot_root_file_path,
907            maximum_file_size,
908        )?;
909
910    let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
911        if let Some(ref incremental_snapshot_root_file_path) =
912            snapshot_root_paths.incremental_snapshot_root_file_path
913        {
914            Some(create_snapshot_data_file_stream(
915                incremental_snapshot_root_file_path,
916                maximum_file_size,
917            )?)
918        } else {
919            None
920        }
921        .unzip();
922
923    let mut snapshot_streams = SnapshotStreams {
924        full_snapshot_stream: &mut full_snapshot_data_file_stream,
925        incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
926    };
927    let ret = deserializer(&mut snapshot_streams)?;
928
929    check_deserialize_file_consumed(
930        full_snapshot_file_size,
931        &snapshot_root_paths.full_snapshot_root_file_path,
932        &mut full_snapshot_data_file_stream,
933    )?;
934
935    if let Some(ref incremental_snapshot_root_file_path) =
936        snapshot_root_paths.incremental_snapshot_root_file_path
937    {
938        check_deserialize_file_consumed(
939            incremental_snapshot_file_size.unwrap(),
940            incremental_snapshot_root_file_path,
941            incremental_snapshot_data_file_stream.as_mut().unwrap(),
942        )?;
943    }
944
945    Ok(ret)
946}
947
948/// Before running the deserializer function, perform common operations on the snapshot archive
949/// files, such as checking the file size and opening the file into a stream.
950fn create_snapshot_data_file_stream(
951    snapshot_root_file_path: impl AsRef<Path>,
952    maximum_file_size: u64,
953) -> Result<(u64, BufReader<std::fs::File>)> {
954    let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
955
956    if snapshot_file_size > maximum_file_size {
957        let error_message = format!(
958            "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
959            snapshot_root_file_path.as_ref().display(),
960            snapshot_file_size,
961            maximum_file_size,
962        );
963        return Err(IoError::other(error_message).into());
964    }
965
966    let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
967    let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
968
969    Ok((snapshot_file_size, snapshot_data_file_stream))
970}
971
972/// After running the deserializer function, perform common checks to ensure the snapshot archive
973/// files were consumed correctly.
974fn check_deserialize_file_consumed(
975    file_size: u64,
976    file_path: impl AsRef<Path>,
977    file_stream: &mut BufReader<std::fs::File>,
978) -> Result<()> {
979    let consumed_size = file_stream.stream_position()?;
980
981    if consumed_size != file_size {
982        let error_message = format!(
983            "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to \
984             deserialize",
985            file_path.as_ref().display(),
986            file_size,
987            consumed_size,
988        );
989        return Err(IoError::other(error_message).into());
990    }
991
992    Ok(())
993}
994
995/// Return account path from the appendvec path after checking its format.
996fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
997    let run_path = appendvec_path.parent()?;
998    let run_file_name = run_path.file_name()?;
999    // All appendvec files should be under <account_path>/run/.
1000    // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
1001    if run_file_name != ACCOUNTS_RUN_DIR {
1002        error!(
1003            "The account path {} does not have run/ as its immediate parent directory.",
1004            run_path.display()
1005        );
1006        return None;
1007    }
1008    let account_path = run_path.parent()?;
1009    Some(account_path.to_path_buf())
1010}
1011
1012/// From an appendvec path, derive the snapshot hardlink path.  If the corresponding snapshot hardlink
1013/// directory does not exist, create it.
1014fn get_snapshot_accounts_hardlink_dir(
1015    appendvec_path: &Path,
1016    bank_slot: Slot,
1017    account_paths: &mut HashSet<PathBuf>,
1018    hardlinks_dir: impl AsRef<Path>,
1019) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1020    let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1021        GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1022    })?;
1023
1024    let snapshot_hardlink_dir = account_path
1025        .join(ACCOUNTS_SNAPSHOT_DIR)
1026        .join(bank_slot.to_string());
1027
1028    // Use the hashset to track, to avoid checking the file system.  Only set up the hardlink directory
1029    // and the symlink to it at the first time of seeing the account_path.
1030    if !account_paths.contains(&account_path) {
1031        let idx = account_paths.len();
1032        debug!(
1033            "for appendvec_path {}, create hard-link path {}",
1034            appendvec_path.display(),
1035            snapshot_hardlink_dir.display()
1036        );
1037        fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1038            GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1039                err,
1040                snapshot_hardlink_dir.clone(),
1041            )
1042        })?;
1043        let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1044        symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1045            GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1046                source: err,
1047                original: snapshot_hardlink_dir.clone(),
1048                link: symlink_path,
1049            }
1050        })?;
1051        account_paths.insert(account_path);
1052    };
1053
1054    Ok(snapshot_hardlink_dir)
1055}
1056
1057/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
1058/// This keeps the appendvec files alive and with the bank snapshot.  The slot and id
1059/// in the file names are also updated in case its file is a recycled one with inconsistent slot
1060/// and id.
1061pub fn hard_link_storages_to_snapshot(
1062    bank_snapshot_dir: impl AsRef<Path>,
1063    bank_slot: Slot,
1064    snapshot_storages: &[Arc<AccountStorageEntry>],
1065) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1066    let accounts_hardlinks_dir = bank_snapshot_dir
1067        .as_ref()
1068        .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
1069    fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1070        HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1071            err,
1072            accounts_hardlinks_dir.clone(),
1073        )
1074    })?;
1075
1076    let mut account_paths: HashSet<PathBuf> = HashSet::new();
1077    for storage in snapshot_storages {
1078        let storage_path = storage.accounts.path();
1079        let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1080            storage_path,
1081            bank_slot,
1082            &mut account_paths,
1083            &accounts_hardlinks_dir,
1084        )?;
1085        // The appendvec could be recycled, so its filename may not be consistent to the slot and id.
1086        // Use the storage slot and id to compose a consistent file name for the hard-link file.
1087        let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1088        let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1089        fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1090            HardLinkStoragesToSnapshotError::HardLinkStorage(
1091                err,
1092                storage_path.to_path_buf(),
1093                hard_link_path,
1094            )
1095        })?;
1096    }
1097    Ok(())
1098}
1099
1100/// serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but data structure at runtime is Vec<Arc<AccountStorageEntry>>
1101/// translates to what we need
1102pub(crate) fn get_storages_to_serialize(
1103    snapshot_storages: &[Arc<AccountStorageEntry>],
1104) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1105    snapshot_storages
1106        .iter()
1107        .map(|storage| vec![Arc::clone(storage)])
1108        .collect::<Vec<_>>()
1109}
1110
1111/// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
1112pub fn verify_and_unarchive_snapshots(
1113    bank_snapshots_dir: impl AsRef<Path>,
1114    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1115    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1116    account_paths: &[PathBuf],
1117    accounts_db_config: &AccountsDbConfig,
1118) -> Result<(UnarchivedSnapshots, UnarchivedSnapshotsGuard)> {
1119    check_are_snapshots_compatible(
1120        full_snapshot_archive_info,
1121        incremental_snapshot_archive_info,
1122    )?;
1123
1124    let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1125    let UnarchivedSnapshot {
1126        unpack_dir: full_unpack_dir,
1127        storage: full_storage,
1128        bank_fields: full_bank_fields,
1129        accounts_db_fields: full_accounts_db_fields,
1130        unpacked_snapshots_dir_and_version: full_unpacked_snapshots_dir_and_version,
1131        measure_untar: full_measure_untar,
1132    } = unarchive_snapshot(
1133        &bank_snapshots_dir,
1134        snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX,
1135        full_snapshot_archive_info.path(),
1136        "snapshot untar",
1137        account_paths,
1138        full_snapshot_archive_info.archive_format(),
1139        next_append_vec_id.clone(),
1140        accounts_db_config,
1141    )?;
1142
1143    let (
1144        incremental_unpack_dir,
1145        incremental_storage,
1146        incremental_bank_fields,
1147        incremental_accounts_db_fields,
1148        incremental_unpacked_snapshots_dir_and_version,
1149        incremental_measure_untar,
1150    ) = if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1151        let UnarchivedSnapshot {
1152            unpack_dir,
1153            storage,
1154            bank_fields,
1155            accounts_db_fields,
1156            unpacked_snapshots_dir_and_version,
1157            measure_untar,
1158        } = unarchive_snapshot(
1159            &bank_snapshots_dir,
1160            snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX,
1161            incremental_snapshot_archive_info.path(),
1162            "incremental snapshot untar",
1163            account_paths,
1164            incremental_snapshot_archive_info.archive_format(),
1165            next_append_vec_id.clone(),
1166            accounts_db_config,
1167        )?;
1168        (
1169            Some(unpack_dir),
1170            Some(storage),
1171            Some(bank_fields),
1172            Some(accounts_db_fields),
1173            Some(unpacked_snapshots_dir_and_version),
1174            Some(measure_untar),
1175        )
1176    } else {
1177        (None, None, None, None, None, None)
1178    };
1179
1180    let bank_fields = SnapshotBankFields::new(full_bank_fields, incremental_bank_fields);
1181    let accounts_db_fields =
1182        SnapshotAccountsDbFields::new(full_accounts_db_fields, incremental_accounts_db_fields);
1183    let next_append_vec_id = Arc::try_unwrap(next_append_vec_id).unwrap();
1184
1185    Ok((
1186        UnarchivedSnapshots {
1187            full_storage,
1188            incremental_storage,
1189            bank_fields,
1190            accounts_db_fields,
1191            full_unpacked_snapshots_dir_and_version,
1192            incremental_unpacked_snapshots_dir_and_version,
1193            full_measure_untar,
1194            incremental_measure_untar,
1195            next_append_vec_id,
1196        },
1197        UnarchivedSnapshotsGuard {
1198            full_unpack_dir,
1199            incremental_unpack_dir,
1200        },
1201    ))
1202}
1203
/// Classifies a file name as one of the kinds of files that are recognized while reading a
/// snapshot: the version file, a bank fields file, or an account storage file.
#[derive(Debug, PartialEq)]
enum SnapshotFileKind {
    /// The file holding the snapshot version
    Version,
    /// A file holding the serialized bank fields
    BankFields,
    /// An account storage file
    Storage,
}
1211
1212/// Determines `SnapshotFileKind` for `filename` if any
1213fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
1214    static VERSION_FILE_REGEX: LazyLock<Regex> =
1215        LazyLock::new(|| Regex::new(r"^version$").unwrap());
1216    static BANK_FIELDS_FILE_REGEX: LazyLock<Regex> =
1217        LazyLock::new(|| Regex::new(r"^[0-9]+(\.pre)?$").unwrap());
1218
1219    if VERSION_FILE_REGEX.is_match(filename) {
1220        Some(SnapshotFileKind::Version)
1221    } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
1222        Some(SnapshotFileKind::BankFields)
1223    } else if get_slot_and_append_vec_id(filename).is_ok() {
1224        Some(SnapshotFileKind::Storage)
1225    } else {
1226        None
1227    }
1228}
1229
1230/// Waits for snapshot file
1231/// Due to parallel unpacking, we may receive some append_vec files before the snapshot file
1232/// This function will push append_vec files into a buffer until we receive the snapshot file
1233fn get_version_and_snapshot_files(
1234    file_receiver: &Receiver<PathBuf>,
1235) -> Result<(PathBuf, PathBuf, Vec<PathBuf>)> {
1236    let mut append_vec_files = Vec::with_capacity(1024);
1237    let mut snapshot_version_path = None;
1238    let mut snapshot_file_path = None;
1239
1240    loop {
1241        if let Ok(path) = file_receiver.recv() {
1242            let filename = path.file_name().unwrap().to_str().unwrap();
1243            match get_snapshot_file_kind(filename) {
1244                Some(SnapshotFileKind::Version) => {
1245                    snapshot_version_path = Some(path);
1246
1247                    // break if we have both the snapshot file and the version file
1248                    if snapshot_file_path.is_some() {
1249                        break;
1250                    }
1251                }
1252                Some(SnapshotFileKind::BankFields) => {
1253                    snapshot_file_path = Some(path);
1254
1255                    // break if we have both the snapshot file and the version file
1256                    if snapshot_version_path.is_some() {
1257                        break;
1258                    }
1259                }
1260                Some(SnapshotFileKind::Storage) => {
1261                    append_vec_files.push(path);
1262                }
1263                None => {} // do nothing for other kinds of files
1264            }
1265        } else {
1266            return Err(SnapshotError::RebuildStorages(
1267                "did not receive snapshot file from unpacking threads".to_string(),
1268            ));
1269        }
1270    }
1271    let snapshot_version_path = snapshot_version_path.unwrap();
1272    let snapshot_file_path = snapshot_file_path.unwrap();
1273
1274    Ok((snapshot_version_path, snapshot_file_path, append_vec_files))
1275}
1276
/// Fields and information parsed from the snapshot.
///
/// Produced by `snapshot_fields_from_files()`.
struct SnapshotFieldsBundle {
    /// Version parsed from the contents of the snapshot version file
    snapshot_version: SnapshotVersion,
    /// Bank fields read from the snapshot file
    bank_fields: BankFieldsToDeserialize,
    /// Accounts db fields read from the snapshot file
    accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
    /// Storage file paths that were received while waiting for the version and snapshot files
    append_vec_files: Vec<PathBuf>,
}
1284
1285/// Parses fields and information from the snapshot files provided by
1286/// `file_receiver`.
1287fn snapshot_fields_from_files(file_receiver: &Receiver<PathBuf>) -> Result<SnapshotFieldsBundle> {
1288    let (snapshot_version_path, snapshot_file_path, append_vec_files) =
1289        get_version_and_snapshot_files(file_receiver)?;
1290    let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
1291    let snapshot_version = snapshot_version_str.parse().map_err(|err| {
1292        IoError::other(format!(
1293            "unsupported snapshot version '{snapshot_version_str}': {err}",
1294        ))
1295    })?;
1296
1297    let snapshot_file = fs::File::open(snapshot_file_path).unwrap();
1298    let mut snapshot_stream = BufReader::new(snapshot_file);
1299    let (bank_fields, accounts_db_fields) = match snapshot_version {
1300        SnapshotVersion::V1_2_0 => serde_snapshot::fields_from_stream(&mut snapshot_stream)?,
1301    };
1302
1303    Ok(SnapshotFieldsBundle {
1304        snapshot_version,
1305        bank_fields,
1306        accounts_db_fields,
1307        append_vec_files,
1308    })
1309}
1310
1311/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
1312/// as a valid one.  A dir unpacked from an archive lacks these files.  Fill them here to
1313/// allow new_from_dir() checks to pass.  These checks are not needed for unpacked dirs,
1314/// but it is not clean to add another flag to new_from_dir() to skip them.
1315fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1316    let snapshots_dir = unpack_dir.as_ref().join(snapshot_paths::BANK_SNAPSHOTS_DIR);
1317    if !snapshots_dir.is_dir() {
1318        return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1319    }
1320
1321    // The unpacked dir has a single slot dir, which is the snapshot slot dir.
1322    let slot_dir = std::fs::read_dir(&snapshots_dir)
1323        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1324        .find(|entry| entry.as_ref().unwrap().path().is_dir())
1325        .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1326        .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1327        .path();
1328
1329    let version_file = unpack_dir
1330        .as_ref()
1331        .join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
1332    fs::hard_link(
1333        version_file,
1334        slot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME),
1335    )?;
1336
1337    let status_cache_file = snapshots_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
1338    fs::hard_link(
1339        status_cache_file,
1340        slot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME),
1341    )?;
1342
1343    Ok(())
1344}
1345
/// Perform the common tasks when unarchiving a snapshot.  Handles creating the temporary
/// directories, untaring, reading the version file, and then returning those fields plus the
/// rebuilt storage
///
/// The archive at `snapshot_archive_path` is unpacked into a temporary directory created
/// inside `bank_snapshots_dir` with the prefix `unpacked_snapshots_dir_prefix`.
/// `measure_name` labels the storage rebuild measurement, which is logged and returned.
///
/// An error from the unarchiving itself takes precedence over an error from parsing the
/// snapshot fields or rebuilding the storages.
fn unarchive_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    unpacked_snapshots_dir_prefix: &'static str,
    snapshot_archive_path: impl AsRef<Path>,
    measure_name: &'static str,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    accounts_db_config: &AccountsDbConfig,
) -> Result<UnarchivedSnapshot> {
    let unpack_dir = tempfile::Builder::new()
        .prefix(unpacked_snapshots_dir_prefix)
        .tempdir_in(bank_snapshots_dir)?;
    let unpacked_snapshots_dir = unpack_dir.path().join(snapshot_paths::BANK_SNAPSHOTS_DIR);

    // Unpacked file paths are delivered through this channel.
    // NOTE(review): presumably the unarchiving runs on its own thread(s); the returned handle
    // is joined at the end of this function -- confirm against streaming_unarchive_snapshot().
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let unarchive_handle = streaming_unarchive_snapshot(
        file_sender,
        account_paths.to_vec(),
        unpack_dir.path().to_path_buf(),
        snapshot_archive_path.as_ref().to_path_buf(),
        archive_format,
        accounts_db_config.memlock_budget_size,
    );

    // Use all physical cores but one for rebuilding, and always at least one.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    // The closure takes ownership of `file_receiver`, so the receiver is dropped once
    // `and_then()` returns, whether or not the closure ran.
    let snapshot_result = snapshot_fields_from_files(&file_receiver).and_then(
        |SnapshotFieldsBundle {
             snapshot_version,
             bank_fields,
             accounts_db_fields,
             append_vec_files,
             ..
         }| {
            let (storage, measure_untar) = measure_time!(
                SnapshotStorageRebuilder::rebuild_storage(
                    &accounts_db_fields,
                    append_vec_files,
                    file_receiver,
                    num_rebuilder_threads,
                    next_append_vec_id,
                    SnapshotFrom::Archive,
                    accounts_db_config.storage_access,
                    None,
                )?,
                measure_name
            );
            info!("{measure_untar}");
            // Add the meta files that BankSnapshotInfo::new_from_dir() requires.
            create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;

            Ok(UnarchivedSnapshot {
                unpack_dir,
                storage,
                bank_fields,
                accounts_db_fields,
                unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
                    unpacked_snapshots_dir,
                    snapshot_version,
                },
                measure_untar,
            })
        },
    );
    // Always wait for the unarchiving to finish.  A panic in it is propagated, and its error,
    // if any, is returned in preference to `snapshot_result`.
    unarchive_handle.join().unwrap()?;
    snapshot_result
}
1415
1416/// Streams snapshot dir files across channel
1417/// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case.
1418fn streaming_snapshot_dir_files(
1419    file_sender: Sender<PathBuf>,
1420    snapshot_file_path: impl Into<PathBuf>,
1421    snapshot_version_path: impl Into<PathBuf>,
1422    account_paths: &[PathBuf],
1423) -> Result<()> {
1424    file_sender.send(snapshot_file_path.into())?;
1425    file_sender.send(snapshot_version_path.into())?;
1426
1427    for account_path in account_paths {
1428        for file in fs::read_dir(account_path)? {
1429            file_sender.send(file?.path())?;
1430        }
1431    }
1432
1433    Ok(())
1434}
1435
/// Performs the common tasks when deserializing a snapshot
///
/// Handles reading the snapshot file and version file,
/// then returning those fields plus the rebuilt storages.
///
/// Before rebuilding, every account file that the bank snapshot references through its
/// accounts hard-links directory is hard-linked into the matching entry of `account_paths`.
///
/// # Errors
///
/// Returns `SnapshotError::AccountPathsMismatch` if a symlink in the accounts hard-links
/// directory resolves to an account path whose run directory is not in `account_paths`.
/// Failures to read directories, read symlinks, create hard links, or read the snapshot
/// files are returned as errors as well.
pub fn rebuild_storages_from_snapshot_dir(
    snapshot_info: &BankSnapshotInfo,
    account_paths: &[PathBuf],
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<(
    AccountStorageMap,
    BankFieldsToDeserialize,
    AccountsDbFields<SerializableAccountStorageEntry>,
)> {
    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
    let accounts_hardlinks = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
    // Set of the provided account paths, used for the membership check below.
    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);

    // With fastboot_version >= 2, obsolete accounts are tracked and stored in the snapshot.
    // Even if obsolete accounts are not enabled, the snapshot may still contain obsolete accounts
    // as the feature may have been enabled in previous validator runs.
    let obsolete_accounts = snapshot_info
        .fastboot_version
        .as_ref()
        .is_some_and(|fastboot_version| fastboot_version.major >= 2)
        .then(|| deserialize_obsolete_accounts(bank_snapshot_dir, MAX_OBSOLETE_ACCOUNTS_FILE_SIZE))
        .transpose()
        .map_err(|err| {
            IoError::other(format!(
                "failed to read obsolete accounts file '{}': {err}",
                bank_snapshot_dir.display()
            ))
        })?;

    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
        IoError::other(format!(
            "failed to read accounts hardlinks dir '{}': {err}",
            accounts_hardlinks.display(),
        ))
    })?;
    for dir_entry in read_dir {
        let symlink_path = dir_entry?.path();
        // The symlink points to <account_path>/snapshot/<slot>, which contains the hard links
        // to the account files.  The corresponding run path should be <account_path>/run/.
        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
            IoError::other(format!(
                "failed to read symlink '{}': {err}",
                symlink_path.display(),
            ))
        })?;
        // Go up two levels (<slot>, then snapshot/) to reach <account_path>.
        let account_run_path = account_snapshot_path
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .join(ACCOUNTS_RUN_DIR);
        if !account_run_paths.contains(&account_run_path) {
            // The appendvec from the bank snapshot storage does not match any of the provided account_paths set.
            // The account paths have changed, so the snapshot is no longer usable.
            return Err(SnapshotError::AccountPathsMismatch);
        }
        // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
        // paths be in accounts/
        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        for file in read_dir {
            let file_path = file?.path();
            let file_name = file_path
                .file_name()
                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
            let dest_path = account_run_path.join(file_name);
            fs::hard_link(&file_path, &dest_path).map_err(|err| {
                IoError::other(format!(
                    "failed to hard link from '{}' to '{}': {err}",
                    file_path.display(),
                    dest_path.display(),
                ))
            })?;
        }
    }

    // Feed the snapshot file, the version file, and the account files through a channel, the
    // same way the archive flow delivers unpacked files.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let snapshot_file_path = &snapshot_info.snapshot_path();
    let snapshot_version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
    streaming_snapshot_dir_files(
        file_sender,
        snapshot_file_path,
        snapshot_version_path,
        account_paths,
    )?;

    let SnapshotFieldsBundle {
        bank_fields,
        accounts_db_fields,
        append_vec_files,
        ..
    } = snapshot_fields_from_files(&file_receiver)?;

    // Use all physical cores but one for rebuilding, and always at least one.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let storage = SnapshotStorageRebuilder::rebuild_storage(
        &accounts_db_fields,
        append_vec_files,
        file_receiver,
        num_rebuilder_threads,
        next_append_vec_id,
        SnapshotFrom::Dir,
        storage_access,
        obsolete_accounts,
    )?;

    Ok((storage, bank_fields, accounts_db_fields))
}
1552
1553/// Reads the `snapshot_version` from a file. Before opening the file, its size
1554/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
1555/// threshold, it is not opened and an error is returned.
1556fn snapshot_version_from_file(path: impl AsRef<Path>) -> io::Result<String> {
1557    // Check file size.
1558    let file_metadata = fs::metadata(&path).map_err(|err| {
1559        IoError::other(format!(
1560            "failed to query snapshot version file metadata '{}': {err}",
1561            path.as_ref().display(),
1562        ))
1563    })?;
1564    let file_size = file_metadata.len();
1565    if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1566        let error_message = format!(
1567            "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1568            path.as_ref().display(),
1569            file_size,
1570            MAX_SNAPSHOT_VERSION_FILE_SIZE,
1571        );
1572        return Err(IoError::other(error_message));
1573    }
1574
1575    // Read snapshot_version from file.
1576    let mut snapshot_version = String::new();
1577    let mut file = fs::File::open(&path).map_err(|err| {
1578        IoError::other(format!(
1579            "failed to open snapshot version file '{}': {err}",
1580            path.as_ref().display()
1581        ))
1582    })?;
1583    file.read_to_string(&mut snapshot_version).map_err(|err| {
1584        IoError::other(format!(
1585            "failed to read snapshot version from file '{}': {err}",
1586            path.as_ref().display()
1587        ))
1588    })?;
1589
1590    Ok(snapshot_version.trim().to_string())
1591}
1592
1593/// Check if an incremental snapshot is compatible with a full snapshot.  This is done by checking
1594/// if the incremental snapshot's base slot is the same as the full snapshot's slot.
1595fn check_are_snapshots_compatible(
1596    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1597    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1598) -> Result<()> {
1599    if incremental_snapshot_archive_info.is_none() {
1600        return Ok(());
1601    }
1602
1603    let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1604
1605    (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1606        .then_some(())
1607        .ok_or_else(|| {
1608            SnapshotError::MismatchedBaseSlot(
1609                full_snapshot_archive_info.slot(),
1610                incremental_snapshot_archive_info.base_slot(),
1611            )
1612        })
1613}
1614
1615pub fn purge_old_snapshot_archives(
1616    full_snapshot_archives_dir: impl AsRef<Path>,
1617    incremental_snapshot_archives_dir: impl AsRef<Path>,
1618    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
1619    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
1620) {
1621    info!(
1622        "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
1623        full_snapshot_archives_dir.as_ref().display(),
1624        maximum_full_snapshot_archives_to_retain
1625    );
1626
1627    let mut full_snapshot_archives =
1628        snapshot_paths::get_full_snapshot_archives(&full_snapshot_archives_dir);
1629    full_snapshot_archives.sort_unstable();
1630    full_snapshot_archives.reverse();
1631
1632    let num_to_retain = full_snapshot_archives
1633        .len()
1634        .min(maximum_full_snapshot_archives_to_retain.get());
1635    trace!(
1636        "There are {} full snapshot archives, retaining {}",
1637        full_snapshot_archives.len(),
1638        num_to_retain,
1639    );
1640
1641    let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
1642        if full_snapshot_archives.is_empty() {
1643            None
1644        } else {
1645            Some(full_snapshot_archives.split_at(num_to_retain))
1646        }
1647        .unwrap_or_default();
1648
1649    let retained_full_snapshot_slots = full_snapshot_archives_to_retain
1650        .iter()
1651        .map(|ai| ai.slot())
1652        .collect::<HashSet<_>>();
1653
1654    fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
1655        for path in archives.iter().map(|a| a.path()) {
1656            trace!("Removing snapshot archive: {}", path.display());
1657            let result = fs::remove_file(path);
1658            if let Err(err) = result {
1659                info!(
1660                    "Failed to remove snapshot archive '{}': {err}",
1661                    path.display()
1662                );
1663            }
1664        }
1665    }
1666    remove_archives(full_snapshot_archives_to_remove);
1667
1668    info!(
1669        "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
1670        incremental_snapshot_archives_dir.as_ref().display(),
1671        maximum_incremental_snapshot_archives_to_retain
1672    );
1673    let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
1674    for incremental_snapshot_archive in
1675        get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
1676    {
1677        incremental_snapshot_archives_by_base_slot
1678            .entry(incremental_snapshot_archive.base_slot())
1679            .or_default()
1680            .push(incremental_snapshot_archive)
1681    }
1682
1683    let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
1684    for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
1685    {
1686        incremental_snapshot_archives.sort_unstable();
1687        let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
1688            maximum_incremental_snapshot_archives_to_retain.get()
1689        } else {
1690            usize::from(retained_full_snapshot_slots.contains(&base_slot))
1691        };
1692        trace!(
1693            "There are {} incremental snapshot archives for base slot {}, removing {} of them",
1694            incremental_snapshot_archives.len(),
1695            base_slot,
1696            incremental_snapshot_archives
1697                .len()
1698                .saturating_sub(num_to_retain),
1699        );
1700
1701        incremental_snapshot_archives.truncate(
1702            incremental_snapshot_archives
1703                .len()
1704                .saturating_sub(num_to_retain),
1705        );
1706        remove_archives(&incremental_snapshot_archives);
1707    }
1708}
1709
1710pub fn verify_unpacked_snapshots_dir_and_version(
1711    unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
1712) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
1713    info!(
1714        "snapshot version: {}",
1715        &unpacked_snapshots_dir_and_version.snapshot_version
1716    );
1717
1718    let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
1719    let mut bank_snapshots =
1720        get_bank_snapshots(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
1721    if bank_snapshots.len() > 1 {
1722        return Err(IoError::other(format!(
1723            "invalid snapshot format: only one snapshot allowed, but found {}",
1724            bank_snapshots.len(),
1725        ))
1726        .into());
1727    }
1728    let root_paths = bank_snapshots.pop().ok_or_else(|| {
1729        IoError::other(format!(
1730            "no snapshots found in snapshots directory '{}'",
1731            unpacked_snapshots_dir_and_version
1732                .unpacked_snapshots_dir
1733                .display(),
1734        ))
1735    })?;
1736    Ok((snapshot_version, root_paths))
1737}
1738
/// Lets tests state what happened to the serialized format of the bank being verified.
#[derive(Clone, Copy, Debug)]
pub enum VerifyBank {
    /// The bank's serialized format is expected to be identical to the one it is
    /// compared against.
    Deterministic,
    /// The serialized bank was 'reserialized' into a non-deterministic format, so both
    /// files are deserialized and the deserialized results are compared.
    NonDeterministic,
}
1748
1749/// Purges all bank snapshots
1750pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
1751    let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
1752    purge_bank_snapshots(&bank_snapshots);
1753}
1754
1755/// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
1756pub fn purge_old_bank_snapshots(
1757    bank_snapshots_dir: impl AsRef<Path>,
1758    num_bank_snapshots_to_retain: usize,
1759) {
1760    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
1761
1762    bank_snapshots.sort_unstable();
1763    purge_bank_snapshots(
1764        bank_snapshots
1765            .iter()
1766            .rev()
1767            .skip(num_bank_snapshots_to_retain),
1768    );
1769}
1770
1771/// At startup, purge old (i.e. unusable) bank snapshots
1772pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
1773    purge_old_bank_snapshots(&bank_snapshots_dir, 1);
1774
1775    let highest_bank_snapshot = get_highest_bank_snapshot(&bank_snapshots_dir);
1776    if let Some(highest_bank_snapshot) = highest_bank_snapshot {
1777        debug!(
1778            "Retained bank snapshot for slot {}, and purged the rest.",
1779            highest_bank_snapshot.slot
1780        );
1781    }
1782}
1783
1784/// Purges bank snapshots that are older than `slot`
1785pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
1786    let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
1787    bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
1788    purge_bank_snapshots(&bank_snapshots);
1789}
1790
1791/// Purges all `bank_snapshots`
1792///
1793/// Does not exit early if there is an error while purging a bank snapshot.
1794fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
1795    for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
1796        if purge_bank_snapshot(snapshot_dir).is_err() {
1797            warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
1798        }
1799    }
1800}
1801
1802/// Remove the bank snapshot at this path
1803pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
1804    const FN_ERR: &str = "failed to purge bank snapshot";
1805    let accounts_hardlinks_dir = bank_snapshot_dir
1806        .as_ref()
1807        .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
1808    if accounts_hardlinks_dir.is_dir() {
1809        // This directory contain symlinks to all accounts snapshot directories.
1810        // They should all be removed.
1811        let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
1812            IoError::other(format!(
1813                "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
1814                accounts_hardlinks_dir.display(),
1815            ))
1816        })?;
1817        for entry in read_dir {
1818            let accounts_hardlink_dir = entry?.path();
1819            let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
1820                IoError::other(format!(
1821                    "{FN_ERR}: failed to read symlink '{}': {err}",
1822                    accounts_hardlink_dir.display(),
1823                ))
1824            })?;
1825            move_and_async_delete_path(&accounts_hardlink_dir);
1826        }
1827    }
1828    fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
1829        IoError::other(format!(
1830            "{FN_ERR}: failed to remove dir '{}': {err}",
1831            bank_snapshot_dir.as_ref().display(),
1832        ))
1833    })?;
1834    Ok(())
1835}
1836
/// Returns whether a full snapshot should be taken at `block_height`.
///
/// This is the case when `block_height` is a multiple of
/// `full_snapshot_archive_interval_slots`.  An interval of zero never matches, so
/// the function returns `false` rather than panicking on a division by zero.
pub fn should_take_full_snapshot(
    block_height: Slot,
    full_snapshot_archive_interval_slots: Slot,
) -> bool {
    // `checked_rem` yields `None` for a zero divisor instead of panicking.
    block_height.checked_rem(full_snapshot_archive_interval_slots) == Some(0)
}
1843
/// Returns whether an incremental snapshot should be taken at `block_height`.
///
/// This is the case when `block_height` is a multiple of
/// `incremental_snapshot_archive_interval_slots` and a full snapshot slot is known
/// (`latest_full_snapshot_slot` is `Some`).  An interval of zero never matches, so
/// the function returns `false` rather than panicking on a division by zero.
pub fn should_take_incremental_snapshot(
    block_height: Slot,
    incremental_snapshot_archive_interval_slots: Slot,
    latest_full_snapshot_slot: Option<Slot>,
) -> bool {
    // `checked_rem` yields `None` for a zero divisor instead of panicking.
    let is_on_interval =
        block_height.checked_rem(incremental_snapshot_archive_interval_slots) == Some(0);
    is_on_interval && latest_full_snapshot_slot.is_some()
}
1852
/// Creates an "accounts path" directory for tests
///
/// A fresh temporary directory is created and passed to
/// `create_accounts_run_and_snapshot_dirs`, which sets up the "run" and "snapshot"
/// sub-directories required by a validator.  The temporary directory handle is
/// returned alongside the first path produced by that call.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let accounts_tmp_dir = TempDir::new().unwrap();
    let created_dirs = create_accounts_run_and_snapshot_dirs(&accounts_tmp_dir).unwrap();
    (accounts_tmp_dir, created_dirs.0)
}
1863
1864#[cfg(test)]
1865mod tests {
1866    use {
1867        super::*,
1868        agave_snapshots::{
1869            paths::{
1870                get_full_snapshot_archives, get_highest_full_snapshot_archive_slot,
1871                get_highest_incremental_snapshot_archive_slot,
1872            },
1873            snapshot_config::{
1874                DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
1875                DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
1876            },
1877        },
1878        assert_matches::assert_matches,
1879        bincode::{deserialize_from, serialize_into},
1880        solana_accounts_db::accounts_file::AccountsFileProvider,
1881        solana_hash::Hash,
1882        std::{convert::TryFrom, mem::size_of},
1883        tempfile::NamedTempFile,
1884        test_case::test_case,
1885    };
1886
1887    #[test]
1888    fn test_serialize_snapshot_data_file_under_limit() {
1889        let temp_dir = tempfile::TempDir::new().unwrap();
1890        let expected_consumed_size = size_of::<u32>() as u64;
1891        let consumed_size = serialize_snapshot_data_file_capped(
1892            &temp_dir.path().join("data-file"),
1893            expected_consumed_size,
1894            |stream| {
1895                serialize_into(stream, &2323_u32)?;
1896                Ok(())
1897            },
1898        )
1899        .unwrap();
1900        assert_eq!(consumed_size, expected_consumed_size);
1901    }
1902
1903    #[test]
1904    fn test_serialize_snapshot_data_file_over_limit() {
1905        let temp_dir = tempfile::TempDir::new().unwrap();
1906        let expected_consumed_size = size_of::<u32>() as u64;
1907        let result = serialize_snapshot_data_file_capped(
1908            &temp_dir.path().join("data-file"),
1909            expected_consumed_size - 1,
1910            |stream| {
1911                serialize_into(stream, &2323_u32)?;
1912                Ok(())
1913            },
1914        );
1915        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
1916    }
1917
1918    #[test]
1919    fn test_deserialize_snapshot_data_file_under_limit() {
1920        let expected_data = 2323_u32;
1921        let expected_consumed_size = size_of::<u32>() as u64;
1922
1923        let temp_dir = tempfile::TempDir::new().unwrap();
1924        serialize_snapshot_data_file_capped(
1925            &temp_dir.path().join("data-file"),
1926            expected_consumed_size,
1927            |stream| {
1928                serialize_into(stream, &expected_data)?;
1929                Ok(())
1930            },
1931        )
1932        .unwrap();
1933
1934        let snapshot_root_paths = SnapshotRootPaths {
1935            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
1936            incremental_snapshot_root_file_path: None,
1937        };
1938
1939        let actual_data = deserialize_snapshot_data_files_capped(
1940            &snapshot_root_paths,
1941            expected_consumed_size,
1942            |stream| {
1943                Ok(deserialize_from::<_, u32>(
1944                    &mut stream.full_snapshot_stream,
1945                )?)
1946            },
1947        )
1948        .unwrap();
1949        assert_eq!(actual_data, expected_data);
1950    }
1951
1952    #[test]
1953    fn test_deserialize_snapshot_data_file_over_limit() {
1954        let expected_data = 2323_u32;
1955        let expected_consumed_size = size_of::<u32>() as u64;
1956
1957        let temp_dir = tempfile::TempDir::new().unwrap();
1958        serialize_snapshot_data_file_capped(
1959            &temp_dir.path().join("data-file"),
1960            expected_consumed_size,
1961            |stream| {
1962                serialize_into(stream, &expected_data)?;
1963                Ok(())
1964            },
1965        )
1966        .unwrap();
1967
1968        let snapshot_root_paths = SnapshotRootPaths {
1969            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
1970            incremental_snapshot_root_file_path: None,
1971        };
1972
1973        let result = deserialize_snapshot_data_files_capped(
1974            &snapshot_root_paths,
1975            expected_consumed_size - 1,
1976            |stream| {
1977                Ok(deserialize_from::<_, u32>(
1978                    &mut stream.full_snapshot_stream,
1979                )?)
1980            },
1981        );
1982        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
1983    }
1984
1985    #[test]
1986    fn test_deserialize_snapshot_data_file_extra_data() {
1987        let expected_data = 2323_u32;
1988        let expected_consumed_size = size_of::<u32>() as u64;
1989
1990        let temp_dir = tempfile::TempDir::new().unwrap();
1991        serialize_snapshot_data_file_capped(
1992            &temp_dir.path().join("data-file"),
1993            expected_consumed_size * 2,
1994            |stream| {
1995                serialize_into(stream.by_ref(), &expected_data)?;
1996                serialize_into(stream.by_ref(), &expected_data)?;
1997                Ok(())
1998            },
1999        )
2000        .unwrap();
2001
2002        let snapshot_root_paths = SnapshotRootPaths {
2003            full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2004            incremental_snapshot_root_file_path: None,
2005        };
2006
2007        let result = deserialize_snapshot_data_files_capped(
2008            &snapshot_root_paths,
2009            expected_consumed_size * 2,
2010            |stream| {
2011                Ok(deserialize_from::<_, u32>(
2012                    &mut stream.full_snapshot_stream,
2013                )?)
2014            },
2015        );
2016        assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2017    }
2018
2019    #[test]
2020    fn test_snapshot_version_from_file_under_limit() {
2021        let file_content = SnapshotVersion::default().as_str();
2022        let mut file = NamedTempFile::new().unwrap();
2023        file.write_all(file_content.as_bytes()).unwrap();
2024        let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2025        assert_eq!(version_from_file, file_content);
2026    }
2027
2028    #[test]
2029    fn test_snapshot_version_from_file_over_limit() {
2030        let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2031        let file_content = vec![7u8; over_limit_size];
2032        let mut file = NamedTempFile::new().unwrap();
2033        file.write_all(&file_content).unwrap();
2034        assert_matches!(
2035            snapshot_version_from_file(file.path()),
2036            Err(ref message) if message.to_string().starts_with("snapshot version file too large")
2037        );
2038    }
2039
2040    #[test]
2041    fn test_check_are_snapshots_compatible() {
2042        let slot1: Slot = 1234;
2043        let slot2: Slot = 5678;
2044        let slot3: Slot = 999_999;
2045
2046        let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2047            format!("/dir/snapshot-{}-{}.tar.zst", slot1, Hash::new_unique()),
2048        ))
2049        .unwrap();
2050
2051        assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2052
2053        let incremental_snapshot_archive_info =
2054            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2055                "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
2056                slot1,
2057                slot2,
2058                Hash::new_unique()
2059            )))
2060            .unwrap();
2061
2062        assert!(check_are_snapshots_compatible(
2063            &full_snapshot_archive_info,
2064            Some(&incremental_snapshot_archive_info)
2065        )
2066        .is_ok());
2067
2068        let incremental_snapshot_archive_info =
2069            IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2070                "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
2071                slot2,
2072                slot3,
2073                Hash::new_unique()
2074            )))
2075            .unwrap();
2076
2077        assert!(check_are_snapshots_compatible(
2078            &full_snapshot_archive_info,
2079            Some(&incremental_snapshot_archive_info)
2080        )
2081        .is_err());
2082    }
2083
2084    /// A test heler function that creates bank snapshot files
2085    fn common_create_bank_snapshot_files(
2086        bank_snapshots_dir: &Path,
2087        min_slot: Slot,
2088        max_slot: Slot,
2089    ) {
2090        for slot in min_slot..max_slot {
2091            let snapshot_dir = snapshot_paths::get_bank_snapshot_dir(bank_snapshots_dir, slot);
2092            fs::create_dir_all(&snapshot_dir).unwrap();
2093
2094            let snapshot_filename = snapshot_paths::get_snapshot_file_name(slot);
2095            let snapshot_path = snapshot_dir.join(snapshot_filename);
2096            fs::File::create(snapshot_path).unwrap();
2097
2098            let status_cache_file =
2099                snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
2100            fs::File::create(status_cache_file).unwrap();
2101
2102            let version_path = snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
2103            fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2104        }
2105    }
2106
2107    #[test]
2108    fn test_get_bank_snapshots() {
2109        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2110        let min_slot = 10;
2111        let max_slot = 20;
2112        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2113
2114        let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2115        assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2116    }
2117
2118    #[test]
2119    fn test_get_highest_bank_snapshot() {
2120        let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2121        let min_slot = 99;
2122        let max_slot = 123;
2123        common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2124
2125        let highest_bank_snapshot = get_highest_bank_snapshot(temp_snapshots_dir.path());
2126        assert!(highest_bank_snapshot.is_some());
2127        assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2128    }
2129
2130    /// A test helper function that creates full and incremental snapshot archive files.  Creates
2131    /// full snapshot files in the range (`min_full_snapshot_slot`, `max_full_snapshot_slot`], and
2132    /// incremental snapshot files in the range (`min_incremental_snapshot_slot`,
2133    /// `max_incremental_snapshot_slot`].  Additionally, "bad" files are created for both full and
2134    /// incremental snapshots to ensure the tests properly filter them out.
2135    fn common_create_snapshot_archive_files(
2136        full_snapshot_archives_dir: &Path,
2137        incremental_snapshot_archives_dir: &Path,
2138        min_full_snapshot_slot: Slot,
2139        max_full_snapshot_slot: Slot,
2140        min_incremental_snapshot_slot: Slot,
2141        max_incremental_snapshot_slot: Slot,
2142    ) {
2143        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
2144        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
2145        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2146            for incremental_snapshot_slot in
2147                min_incremental_snapshot_slot..max_incremental_snapshot_slot
2148            {
2149                let snapshot_filename = format!(
2150                    "incremental-snapshot-{}-{}-{}.tar.zst",
2151                    full_snapshot_slot,
2152                    incremental_snapshot_slot,
2153                    Hash::default()
2154                );
2155                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
2156                fs::File::create(snapshot_filepath).unwrap();
2157            }
2158
2159            let snapshot_filename = format!(
2160                "snapshot-{}-{}.tar.zst",
2161                full_snapshot_slot,
2162                Hash::default()
2163            );
2164            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
2165            fs::File::create(snapshot_filepath).unwrap();
2166
2167            // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly
2168            let bad_filename = format!(
2169                "incremental-snapshot-{}-{}-bad!hash.tar.zst",
2170                full_snapshot_slot,
2171                max_incremental_snapshot_slot + 1,
2172            );
2173            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
2174            fs::File::create(bad_filepath).unwrap();
2175        }
2176
2177        // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
2178        // sorted correctly
2179        let bad_filename = format!("snapshot-{}-bad!hash.tar.zst", max_full_snapshot_slot + 1);
2180        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
2181        fs::File::create(bad_filepath).unwrap();
2182    }
2183
2184    #[test]
2185    fn test_get_full_snapshot_archives() {
2186        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2187        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2188        let min_slot = 123;
2189        let max_slot = 456;
2190        common_create_snapshot_archive_files(
2191            full_snapshot_archives_dir.path(),
2192            incremental_snapshot_archives_dir.path(),
2193            min_slot,
2194            max_slot,
2195            0,
2196            0,
2197        );
2198
2199        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2200        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
2201    }
2202
2203    #[test]
2204    fn test_get_full_snapshot_archives_remote() {
2205        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2206        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2207        let min_slot = 123;
2208        let max_slot = 456;
2209        common_create_snapshot_archive_files(
2210            &full_snapshot_archives_dir
2211                .path()
2212                .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2213            &incremental_snapshot_archives_dir
2214                .path()
2215                .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2216            min_slot,
2217            max_slot,
2218            0,
2219            0,
2220        );
2221
2222        let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2223        assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
2224        assert!(snapshot_archives.iter().all(|info| info.is_remote()));
2225    }
2226
2227    #[test]
2228    fn test_get_incremental_snapshot_archives() {
2229        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2230        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2231        let min_full_snapshot_slot = 12;
2232        let max_full_snapshot_slot = 23;
2233        let min_incremental_snapshot_slot = 34;
2234        let max_incremental_snapshot_slot = 45;
2235        common_create_snapshot_archive_files(
2236            full_snapshot_archives_dir.path(),
2237            incremental_snapshot_archives_dir.path(),
2238            min_full_snapshot_slot,
2239            max_full_snapshot_slot,
2240            min_incremental_snapshot_slot,
2241            max_incremental_snapshot_slot,
2242        );
2243
2244        let incremental_snapshot_archives =
2245            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
2246        assert_eq!(
2247            incremental_snapshot_archives.len() as Slot,
2248            (max_full_snapshot_slot - min_full_snapshot_slot)
2249                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
2250        );
2251    }
2252
2253    #[test]
2254    fn test_get_incremental_snapshot_archives_remote() {
2255        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2256        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2257        let min_full_snapshot_slot = 12;
2258        let max_full_snapshot_slot = 23;
2259        let min_incremental_snapshot_slot = 34;
2260        let max_incremental_snapshot_slot = 45;
2261        common_create_snapshot_archive_files(
2262            &full_snapshot_archives_dir
2263                .path()
2264                .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2265            &incremental_snapshot_archives_dir
2266                .path()
2267                .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2268            min_full_snapshot_slot,
2269            max_full_snapshot_slot,
2270            min_incremental_snapshot_slot,
2271            max_incremental_snapshot_slot,
2272        );
2273
2274        let incremental_snapshot_archives =
2275            get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
2276        assert_eq!(
2277            incremental_snapshot_archives.len() as Slot,
2278            (max_full_snapshot_slot - min_full_snapshot_slot)
2279                * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
2280        );
2281        assert!(incremental_snapshot_archives
2282            .iter()
2283            .all(|info| info.is_remote()));
2284    }
2285
2286    #[test]
2287    fn test_get_highest_full_snapshot_archive_slot() {
2288        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2289        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2290        let min_slot = 123;
2291        let max_slot = 456;
2292        common_create_snapshot_archive_files(
2293            full_snapshot_archives_dir.path(),
2294            incremental_snapshot_archives_dir.path(),
2295            min_slot,
2296            max_slot,
2297            0,
2298            0,
2299        );
2300
2301        assert_eq!(
2302            get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
2303            Some(max_slot - 1)
2304        );
2305    }
2306
2307    #[test]
2308    fn test_get_highest_incremental_snapshot_slot() {
2309        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2310        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2311        let min_full_snapshot_slot = 12;
2312        let max_full_snapshot_slot = 23;
2313        let min_incremental_snapshot_slot = 34;
2314        let max_incremental_snapshot_slot = 45;
2315        common_create_snapshot_archive_files(
2316            full_snapshot_archives_dir.path(),
2317            incremental_snapshot_archives_dir.path(),
2318            min_full_snapshot_slot,
2319            max_full_snapshot_slot,
2320            min_incremental_snapshot_slot,
2321            max_incremental_snapshot_slot,
2322        );
2323
2324        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2325            assert_eq!(
2326                get_highest_incremental_snapshot_archive_slot(
2327                    incremental_snapshot_archives_dir.path(),
2328                    full_snapshot_slot
2329                ),
2330                Some(max_incremental_snapshot_slot - 1)
2331            );
2332        }
2333
2334        assert_eq!(
2335            get_highest_incremental_snapshot_archive_slot(
2336                incremental_snapshot_archives_dir.path(),
2337                max_full_snapshot_slot
2338            ),
2339            None
2340        );
2341    }
2342
2343    fn common_test_purge_old_snapshot_archives(
2344        snapshot_names: &[&String],
2345        maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2346        maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2347        expected_snapshots: &[&String],
2348    ) {
2349        let temp_snap_dir = tempfile::TempDir::new().unwrap();
2350
2351        for snap_name in snapshot_names {
2352            let snap_path = temp_snap_dir.path().join(snap_name);
2353            let mut _snap_file = fs::File::create(snap_path);
2354        }
2355        purge_old_snapshot_archives(
2356            temp_snap_dir.path(),
2357            temp_snap_dir.path(),
2358            maximum_full_snapshot_archives_to_retain,
2359            maximum_incremental_snapshot_archives_to_retain,
2360        );
2361
2362        let mut retained_snaps = HashSet::new();
2363        for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
2364            let entry_path_buf = entry.unwrap().path();
2365            let entry_path = entry_path_buf.as_path();
2366            let snapshot_name = entry_path
2367                .file_name()
2368                .unwrap()
2369                .to_str()
2370                .unwrap()
2371                .to_string();
2372            retained_snaps.insert(snapshot_name);
2373        }
2374
2375        for snap_name in expected_snapshots {
2376            assert!(
2377                retained_snaps.contains(snap_name.as_str()),
2378                "{snap_name} not found"
2379            );
2380        }
2381        assert_eq!(retained_snaps.len(), expected_snapshots.len());
2382    }
2383
2384    #[test]
2385    fn test_purge_old_full_snapshot_archives() {
2386        let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
2387        let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
2388        let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
2389        let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
2390
2391        // expecting only the newest to be retained
2392        let expected_snapshots = vec![&snap3_name];
2393        common_test_purge_old_snapshot_archives(
2394            &snapshot_names,
2395            NonZeroUsize::new(1).unwrap(),
2396            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2397            &expected_snapshots,
2398        );
2399
2400        // retaining 2, expecting the 2 newest to be retained
2401        let expected_snapshots = vec![&snap2_name, &snap3_name];
2402        common_test_purge_old_snapshot_archives(
2403            &snapshot_names,
2404            NonZeroUsize::new(2).unwrap(),
2405            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2406            &expected_snapshots,
2407        );
2408
2409        // retaining 3, all three should be retained
2410        let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
2411        common_test_purge_old_snapshot_archives(
2412            &snapshot_names,
2413            NonZeroUsize::new(3).unwrap(),
2414            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2415            &expected_snapshots,
2416        );
2417    }
2418
2419    /// Mimic a running node's behavior w.r.t. purging old snapshot archives.  Take snapshots in a
2420    /// loop, and periodically purge old snapshot archives.  After purging, check to make sure the
2421    /// snapshot archives on disk are correct.
2422    #[test]
2423    fn test_purge_old_full_snapshot_archives_in_the_loop() {
2424        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2425        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2426        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
2427        let starting_slot: Slot = 42;
2428
2429        for slot in (starting_slot..).take(100) {
2430            let full_snapshot_archive_file_name =
2431                format!("snapshot-{}-{}.tar.zst", slot, Hash::default());
2432            let full_snapshot_archive_path = full_snapshot_archives_dir
2433                .as_ref()
2434                .join(full_snapshot_archive_file_name);
2435            fs::File::create(full_snapshot_archive_path).unwrap();
2436
2437            // don't purge-and-check until enough snapshot archives have been created
2438            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
2439                continue;
2440            }
2441
2442            // purge infrequently, so there will always be snapshot archives to purge
2443            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
2444                continue;
2445            }
2446
2447            purge_old_snapshot_archives(
2448                &full_snapshot_archives_dir,
2449                &incremental_snapshot_archives_dir,
2450                maximum_snapshots_to_retain,
2451                NonZeroUsize::new(usize::MAX).unwrap(),
2452            );
2453            let mut full_snapshot_archives =
2454                get_full_snapshot_archives(&full_snapshot_archives_dir);
2455            full_snapshot_archives.sort_unstable();
2456            assert_eq!(
2457                full_snapshot_archives.len(),
2458                maximum_snapshots_to_retain.get()
2459            );
2460            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
2461            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
2462                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
2463            }
2464        }
2465    }
2466
2467    #[test]
2468    fn test_purge_old_incremental_snapshot_archives() {
2469        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2470        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2471        let starting_slot = 100_000;
2472
2473        let maximum_incremental_snapshot_archives_to_retain =
2474            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
2475        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
2476
2477        let incremental_snapshot_interval = 100;
2478        let num_incremental_snapshots_per_full_snapshot =
2479            maximum_incremental_snapshot_archives_to_retain.get() * 2;
2480        let full_snapshot_interval =
2481            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
2482
2483        let mut snapshot_filenames = vec![];
2484        (starting_slot..)
2485            .step_by(full_snapshot_interval)
2486            .take(
2487                maximum_full_snapshot_archives_to_retain
2488                    .checked_mul(NonZeroUsize::new(2).unwrap())
2489                    .unwrap()
2490                    .get(),
2491            )
2492            .for_each(|full_snapshot_slot| {
2493                let snapshot_filename = format!(
2494                    "snapshot-{}-{}.tar.zst",
2495                    full_snapshot_slot,
2496                    Hash::default()
2497                );
2498                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
2499                fs::File::create(snapshot_path).unwrap();
2500                snapshot_filenames.push(snapshot_filename);
2501
2502                (full_snapshot_slot..)
2503                    .step_by(incremental_snapshot_interval)
2504                    .take(num_incremental_snapshots_per_full_snapshot)
2505                    .skip(1)
2506                    .for_each(|incremental_snapshot_slot| {
2507                        let snapshot_filename = format!(
2508                            "incremental-snapshot-{}-{}-{}.tar.zst",
2509                            full_snapshot_slot,
2510                            incremental_snapshot_slot,
2511                            Hash::default()
2512                        );
2513                        let snapshot_path = incremental_snapshot_archives_dir
2514                            .path()
2515                            .join(&snapshot_filename);
2516                        fs::File::create(snapshot_path).unwrap();
2517                        snapshot_filenames.push(snapshot_filename);
2518                    });
2519            });
2520
2521        purge_old_snapshot_archives(
2522            full_snapshot_archives_dir.path(),
2523            incremental_snapshot_archives_dir.path(),
2524            maximum_full_snapshot_archives_to_retain,
2525            maximum_incremental_snapshot_archives_to_retain,
2526        );
2527
2528        // Ensure correct number of full snapshot archives are purged/retained
2529        let mut remaining_full_snapshot_archives =
2530            get_full_snapshot_archives(full_snapshot_archives_dir.path());
2531        assert_eq!(
2532            remaining_full_snapshot_archives.len(),
2533            maximum_full_snapshot_archives_to_retain.get(),
2534        );
2535        remaining_full_snapshot_archives.sort_unstable();
2536        let latest_full_snapshot_archive_slot =
2537            remaining_full_snapshot_archives.last().unwrap().slot();
2538
2539        // Ensure correct number of incremental snapshot archives are purged/retained
2540        // For each additional full snapshot archive, one additional (the newest)
2541        // incremental snapshot archive is retained. This is accounted for by the
2542        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
2543        let mut remaining_incremental_snapshot_archives =
2544            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
2545        assert_eq!(
2546            remaining_incremental_snapshot_archives.len(),
2547            maximum_incremental_snapshot_archives_to_retain
2548                .get()
2549                .saturating_add(
2550                    maximum_full_snapshot_archives_to_retain
2551                        .get()
2552                        .saturating_sub(1)
2553                )
2554        );
2555        remaining_incremental_snapshot_archives.sort_unstable();
2556        remaining_incremental_snapshot_archives.reverse();
2557
2558        // Ensure there exists one incremental snapshot all but the latest full snapshot
2559        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
2560            let incremental_snapshot_archive =
2561                remaining_incremental_snapshot_archives.pop().unwrap();
2562
2563            let expected_base_slot =
2564                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
2565            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
2566            let expected_slot = expected_base_slot
2567                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
2568            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
2569        }
2570
2571        // Ensure all remaining incremental snapshots are only for the latest full snapshot
2572        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
2573            assert_eq!(
2574                incremental_snapshot_archive.base_slot(),
2575                latest_full_snapshot_archive_slot
2576            );
2577        }
2578
2579        // Ensure the remaining incremental snapshots are at the right slot
2580        let expected_remaining_incremental_snapshot_archive_slots =
2581            (latest_full_snapshot_archive_slot..)
2582                .step_by(incremental_snapshot_interval)
2583                .take(num_incremental_snapshots_per_full_snapshot)
2584                .skip(
2585                    num_incremental_snapshots_per_full_snapshot
2586                        - maximum_incremental_snapshot_archives_to_retain.get(),
2587                )
2588                .collect::<HashSet<_>>();
2589
2590        let actual_remaining_incremental_snapshot_archive_slots =
2591            remaining_incremental_snapshot_archives
2592                .iter()
2593                .map(|snapshot| snapshot.slot())
2594                .collect::<HashSet<_>>();
2595        assert_eq!(
2596            actual_remaining_incremental_snapshot_archive_slots,
2597            expected_remaining_incremental_snapshot_archive_slots
2598        );
2599    }
2600
2601    #[test]
2602    fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
2603        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2604        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2605
2606        for snapshot_filenames in [
2607            format!("incremental-snapshot-100-120-{}.tar.zst", Hash::default()),
2608            format!("incremental-snapshot-100-140-{}.tar.zst", Hash::default()),
2609            format!("incremental-snapshot-100-160-{}.tar.zst", Hash::default()),
2610            format!("incremental-snapshot-100-180-{}.tar.zst", Hash::default()),
2611            format!("incremental-snapshot-200-220-{}.tar.zst", Hash::default()),
2612            format!("incremental-snapshot-200-240-{}.tar.zst", Hash::default()),
2613            format!("incremental-snapshot-200-260-{}.tar.zst", Hash::default()),
2614            format!("incremental-snapshot-200-280-{}.tar.zst", Hash::default()),
2615        ] {
2616            let snapshot_path = incremental_snapshot_archives_dir
2617                .path()
2618                .join(snapshot_filenames);
2619            fs::File::create(snapshot_path).unwrap();
2620        }
2621
2622        purge_old_snapshot_archives(
2623            full_snapshot_archives_dir.path(),
2624            incremental_snapshot_archives_dir.path(),
2625            NonZeroUsize::new(usize::MAX).unwrap(),
2626            NonZeroUsize::new(usize::MAX).unwrap(),
2627        );
2628
2629        let remaining_incremental_snapshot_archives =
2630            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
2631        assert!(remaining_incremental_snapshot_archives.is_empty());
2632    }
2633
2634    #[test]
2635    fn test_get_snapshot_accounts_hardlink_dir() {
2636        let slot: Slot = 1;
2637
2638        let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
2639
2640        let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
2641        let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
2642        let accounts_hardlinks_dir =
2643            bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
2644        fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
2645
2646        let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
2647        let appendvec_filename = format!("{slot}.0");
2648        let appendvec_path = accounts_dir.join(appendvec_filename);
2649
2650        let ret = get_snapshot_accounts_hardlink_dir(
2651            &appendvec_path,
2652            slot,
2653            &mut account_paths_set,
2654            &accounts_hardlinks_dir,
2655        );
2656        assert!(ret.is_ok());
2657
2658        let wrong_appendvec_path = appendvec_path
2659            .parent()
2660            .unwrap()
2661            .parent()
2662            .unwrap()
2663            .join(appendvec_path.file_name().unwrap());
2664        let ret = get_snapshot_accounts_hardlink_dir(
2665            &wrong_appendvec_path,
2666            slot,
2667            &mut account_paths_set,
2668            accounts_hardlinks_dir,
2669        );
2670
2671        assert_matches!(
2672            ret,
2673            Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
2674        );
2675    }
2676
2677    #[test]
2678    fn test_get_snapshot_file_kind() {
2679        assert_eq!(None, get_snapshot_file_kind("file.txt"));
2680        assert_eq!(
2681            Some(SnapshotFileKind::Version),
2682            get_snapshot_file_kind(snapshot_paths::SNAPSHOT_VERSION_FILENAME)
2683        );
2684        assert_eq!(
2685            Some(SnapshotFileKind::BankFields),
2686            get_snapshot_file_kind("1234")
2687        );
2688        assert_eq!(
2689            Some(SnapshotFileKind::Storage),
2690            get_snapshot_file_kind("1000.999")
2691        );
2692    }
2693
2694    #[test]
2695    fn test_full_snapshot_slot_file_good() {
2696        let slot_written = 123_456_789;
2697        let bank_snapshot_dir = TempDir::new().unwrap();
2698        write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
2699
2700        let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
2701        assert_eq!(slot_read, slot_written);
2702    }
2703
2704    #[test]
2705    fn test_full_snapshot_slot_file_bad() {
2706        const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
2707        let too_small = [1u8; SLOT_SIZE - 1];
2708        let too_large = [1u8; SLOT_SIZE + 1];
2709
2710        for contents in [too_small.as_slice(), too_large.as_slice()] {
2711            let bank_snapshot_dir = TempDir::new().unwrap();
2712            let full_snapshot_slot_path = bank_snapshot_dir
2713                .as_ref()
2714                .join(snapshot_paths::SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
2715            fs::write(full_snapshot_slot_path, contents).unwrap();
2716
2717            let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
2718            assert!(err
2719                .to_string()
2720                .starts_with("invalid full snapshot slot file size"));
2721        }
2722    }
2723
2724    #[test_case(0)]
2725    #[test_case(1)]
2726    #[test_case(10)]
2727    fn test_serialize_deserialize_account_storage_entries(num_storages: u64) {
2728        let temp_dir = tempfile::tempdir().unwrap();
2729        let bank_snapshot_dir = temp_dir.path();
2730        let snapshot_slot = num_storages + 1 as Slot;
2731
2732        // Create AccountStorageEntries
2733        let mut snapshot_storages = Vec::new();
2734        for i in 0..num_storages {
2735            let storage = Arc::new(AccountStorageEntry::new(
2736                &PathBuf::new(),
2737                i,        // Incrementing slot
2738                i as u32, // Incrementing id
2739                1024,
2740                AccountsFileProvider::AppendVec,
2741                StorageAccess::File,
2742            ));
2743            snapshot_storages.push(storage);
2744        }
2745
2746        // write obsolete accounts to snapshot
2747        write_obsolete_accounts_to_snapshot(bank_snapshot_dir, &snapshot_storages, snapshot_slot)
2748            .unwrap();
2749
2750        // Deserialize
2751        let deserialized_accounts =
2752            deserialize_obsolete_accounts(bank_snapshot_dir, MAX_OBSOLETE_ACCOUNTS_FILE_SIZE)
2753                .unwrap();
2754
2755        // Verify
2756        for storage in &snapshot_storages {
2757            assert!(deserialized_accounts.remove(&storage.slot()).unwrap().2 == 0);
2758        }
2759    }
2760
2761    #[test]
2762    #[should_panic(expected = "too large obsolete accounts file to serialize")]
2763    fn test_serialize_obsolete_accounts_too_large_file() {
2764        let temp_dir = tempfile::tempdir().unwrap();
2765        let bank_snapshot_dir = temp_dir.path();
2766        let num_storages = 10;
2767        let snapshot_slot = num_storages + 1 as Slot;
2768
2769        // Create AccountStorageEntries
2770        let mut snapshot_storages = Vec::new();
2771        for i in 0..num_storages {
2772            let storage = Arc::new(AccountStorageEntry::new(
2773                &PathBuf::new(),
2774                i,        // Incrementing slot
2775                i as u32, // Incrementing id
2776                1024,
2777                AccountsFileProvider::AppendVec,
2778                StorageAccess::File,
2779            ));
2780            snapshot_storages.push(storage);
2781        }
2782
2783        // write obsolete accounts to snapshot
2784        let obsolete_accounts =
2785            SerdeObsoleteAccountsMap::new_from_storages(&snapshot_storages, snapshot_slot);
2786
2787        // Limit the file size to something low for the test
2788        serialize_obsolete_accounts(bank_snapshot_dir, &obsolete_accounts, 100).unwrap();
2789    }
2790
2791    #[test]
2792    #[should_panic(expected = "too large obsolete accounts file to deserialize")]
2793    fn test_deserialize_obsolete_accounts_too_large_file() {
2794        let temp_dir = tempfile::tempdir().unwrap();
2795        let bank_snapshot_dir = temp_dir.path();
2796        let num_storages = 10;
2797        let snapshot_slot = num_storages + 1 as Slot;
2798
2799        // Create AccountStorageEntries
2800        let mut snapshot_storages = Vec::new();
2801        for i in 0..num_storages {
2802            let storage = Arc::new(AccountStorageEntry::new(
2803                &PathBuf::new(),
2804                i,        // Incrementing slot
2805                i as u32, // Incrementing id
2806                1024,
2807                AccountsFileProvider::AppendVec,
2808                StorageAccess::File,
2809            ));
2810            snapshot_storages.push(storage);
2811        }
2812
2813        // Write obsolete accounts to snapshot
2814        write_obsolete_accounts_to_snapshot(bank_snapshot_dir, &snapshot_storages, snapshot_slot)
2815            .unwrap();
2816
2817        // Set a very low maximum file size for deserialization
2818        // This should panic
2819        deserialize_obsolete_accounts(bank_snapshot_dir, 100).unwrap();
2820    }
2821}