1use {
2 crate::{
3 bank::{BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4 serde_snapshot::{
5 self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6 },
7 snapshot_archive_info::{
8 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9 SnapshotArchiveInfoGetter,
10 },
11 snapshot_bank_utils,
12 snapshot_config::SnapshotConfig,
13 snapshot_hash::SnapshotHash,
14 snapshot_package::{SnapshotKind, SnapshotPackage},
15 snapshot_utils::snapshot_storage_rebuilder::{
16 RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17 },
18 },
19 bzip2::bufread::BzDecoder,
20 crossbeam_channel::Sender,
21 flate2::read::GzDecoder,
22 lazy_static::lazy_static,
23 log::*,
24 regex::Regex,
25 solana_accounts_db::{
26 account_storage::{meta::StoredMetaWriteVersion, AccountStorageMap},
27 accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
28 accounts_file::{AccountsFile, AccountsFileError, InternalsForArchive, StorageAccess},
29 accounts_hash::{AccountsDeltaHash, AccountsHash},
30 epoch_accounts_hash::EpochAccountsHash,
31 hardened_unpack::{self, ParallelSelector, UnpackError},
32 shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34 },
35 solana_measure::{measure::Measure, measure_time, measure_us},
36 solana_sdk::{
37 clock::{Epoch, Slot},
38 hash::Hash,
39 },
40 std::{
41 cmp::Ordering,
42 collections::{HashMap, HashSet},
43 fmt, fs,
44 io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45 mem,
46 num::NonZeroUsize,
47 ops::RangeInclusive,
48 path::{Path, PathBuf},
49 process::ExitStatus,
50 str::FromStr,
51 sync::Arc,
52 thread::{Builder, JoinHandle},
53 },
54 tar::{self, Archive},
55 tempfile::TempDir,
56 thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60 hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61 solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
/// File name of the serialized status cache inside a bank snapshot directory.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// File name of the snapshot version file.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Name of the marker file whose presence means a bank snapshot directory is complete.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Name of the sub-directory of a bank snapshot directory that holds the account hard link
/// entries.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Directory name for downloaded snapshot archives.
// NOTE(review): presumably relative to the snapshot archives dir; usage is not visible here.
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// Name of the file, inside a bank snapshot directory, that records a full snapshot slot.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// Upper bound for a snapshot data file: 32 * 1024^3 (32 GiB if the unit is bytes).
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024;
/// Upper bound for the snapshot version file.
// NOTE(review): presumably in bytes; confirm at the use site.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;
/// String form of [`SnapshotVersion::V1_2_0`].
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Prefix shared by the temporary staging directory and staging archive file created while
/// archiving a snapshot.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// File extension that marks a bank snapshot file as [`BankSnapshotKind::Pre`].
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
/// Default number of full snapshot archives to retain.
///
/// Built with the safe constructor; a zero literal would fail at compile time because the
/// `panic!` is evaluated in a const context.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(2) {
        Some(count) => count,
        None => panic!("default max full snapshot archives to retain must be non-zero"),
    };
/// Default number of incremental snapshot archives to retain.
///
/// Built with the safe constructor; a zero literal would fail at compile time because the
/// `panic!` is evaluated in a const context.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(4) {
        Some(count) => count,
        None => panic!("default max incremental snapshot archives to retain must be non-zero"),
    };
/// Regex for full snapshot archive file names; captures `slot`, `hash` and `ext`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Regex for incremental snapshot archive file names; captures `base`, `slot`, `hash` and `ext`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
89
/// Snapshot format versions known to this module.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub enum SnapshotVersion {
    /// Version whose string form is `VERSION_STRING_V1_2_0`; this is also the default.
    #[default]
    V1_2_0,
}
95
96impl fmt::Display for SnapshotVersion {
97 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98 f.write_str(From::from(*self))
99 }
100}
101
102impl From<SnapshotVersion> for &'static str {
103 fn from(snapshot_version: SnapshotVersion) -> &'static str {
104 match snapshot_version {
105 SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
106 }
107 }
108}
109
110impl FromStr for SnapshotVersion {
111 type Err = &'static str;
112
113 fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
114 let version_string = if version_string
116 .get(..1)
117 .is_some_and(|s| s.eq_ignore_ascii_case("v"))
118 {
119 &version_string[1..]
120 } else {
121 version_string
122 };
123 match version_string {
124 VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
125 _ => Err("unsupported snapshot version"),
126 }
127 }
128}
129
130impl SnapshotVersion {
131 pub fn as_str(self) -> &'static str {
132 <&str as From<Self>>::from(self)
133 }
134}
135
/// Information about a bank snapshot directory.
///
/// Note: the derived equality compares every field, while the `Ord`/`PartialOrd` impls in this
/// file order by `slot` only.
#[derive(PartialEq, Eq, Debug)]
pub struct BankSnapshotInfo {
    /// Slot of the bank snapshot
    pub slot: Slot,
    /// Whether the snapshot file is a pre- or post- snapshot (see [`BankSnapshotKind`])
    pub snapshot_kind: BankSnapshotKind,
    /// Path to the bank snapshot directory
    pub snapshot_dir: PathBuf,
    /// Snapshot version recorded for this bank snapshot
    pub snapshot_version: SnapshotVersion,
}
149
150impl PartialOrd for BankSnapshotInfo {
151 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
152 Some(self.cmp(other))
153 }
154}
155
156impl Ord for BankSnapshotInfo {
158 fn cmp(&self, other: &Self) -> Ordering {
159 self.slot.cmp(&other.slot)
160 }
161}
162
163impl BankSnapshotInfo {
164 pub fn new_from_dir(
165 bank_snapshots_dir: impl AsRef<Path>,
166 slot: Slot,
167 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
168 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
171
172 if !bank_snapshot_dir.is_dir() {
173 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
174 bank_snapshot_dir,
175 ));
176 }
177
178 if !is_bank_snapshot_complete(&bank_snapshot_dir) {
184 return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
185 }
186
187 let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
188 if !status_cache_file.is_file() {
189 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
190 status_cache_file,
191 ));
192 }
193
194 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
195 let version_str = snapshot_version_from_file(&version_path).or(Err(
196 SnapshotNewFromDirError::MissingVersionFile(version_path),
197 ))?;
198 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
199 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
200
201 let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
202 let bank_snapshot_pre_path =
203 bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
204
205 let snapshot_kind = if bank_snapshot_pre_path.is_file() {
214 BankSnapshotKind::Pre
215 } else if bank_snapshot_post_path.is_file() {
216 BankSnapshotKind::Post
217 } else {
218 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
219 bank_snapshot_dir,
220 ));
221 };
222
223 Ok(BankSnapshotInfo {
224 slot,
225 snapshot_kind,
226 snapshot_dir: bank_snapshot_dir,
227 snapshot_version,
228 })
229 }
230
231 pub fn snapshot_path(&self) -> PathBuf {
232 let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
233
234 let ext = match self.snapshot_kind {
235 BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
236 BankSnapshotKind::Post => "",
237 };
238 bank_snapshot_path.set_extension(ext);
239
240 bank_snapshot_path
241 }
242}
243
/// Kind of a bank snapshot file.
///
/// In this file a `Pre` snapshot file carries the `BANK_SNAPSHOT_PRE_FILENAME_EXTENSION`
/// extension, while a `Post` snapshot file has no extension.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BankSnapshotKind {
    /// Snapshot file with the "pre" extension
    Pre,
    /// Snapshot file without an extension
    Post,
}
259
/// Source a snapshot is taken from.
// NOTE(review): usage is outside this view; variant meanings are inferred from their names.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// A snapshot archive
    Archive,
    /// A snapshot directory
    Dir,
}
270
/// Paths to the root files of a full snapshot and, optionally, an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root file path of the full snapshot
    pub full_snapshot_root_file_path: PathBuf,
    /// Root file path of the incremental snapshot, if there is one
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
278
/// Result of unarchiving a snapshot.
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    // Never read directly (hence the `allow`); presumably held so the temporary unpack
    // directory lives as long as this value — TODO confirm against the constructor.
    #[allow(dead_code)]
    unpack_dir: TempDir,
    /// Account storages for the snapshot
    pub storage: AccountStorageMap,
    /// Directory holding the unpacked snapshots, together with their snapshot version
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Timing measurement for the untar step
    pub measure_untar: Measure,
}
288
/// An unpacked snapshots directory paired with its snapshot version.
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    /// Path to the unpacked snapshots directory
    pub unpacked_snapshots_dir: PathBuf,
    /// Snapshot version of the unpacked snapshots
    pub snapshot_version: SnapshotVersion,
}
295
/// Account storages bundled with the next accounts file id counter.
pub(crate) struct StorageAndNextAccountsFileId {
    /// The account storages
    pub storage: AccountStorageMap,
    /// Next accounts file (append vec) id
    pub next_append_vec_id: AtomicAccountsFileId,
}
302
/// Top-level error type for snapshot operations in this module.
///
/// Each variant's `#[error]` attribute is its user-facing message. Variants marked `#[from]`
/// can be produced from the wrapped error with `?`.
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    /// I/O error tagged with a static description of where it came from
    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    /// Fields: full snapshot slot, incremental snapshot base slot
    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    /// Fields: deserialized bank slot, snapshot archive slot
    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
    MismatchedSlot(Slot, Slot),

    /// Fields: deserialized bank hash, snapshot archive hash
    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
    MismatchedHash(SnapshotHash, SnapshotHash),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    /// Fields: underlying error, slot of the bank snapshot that failed
    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}
381
/// Errors returned by `BankSnapshotInfo::new_from_dir()`.
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    /// The bank snapshot path is not a directory
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    /// Also used when the version file exists but cannot be read
    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    /// Holds the unsupported version string that was read
    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    /// The directory lacks the "state complete" marker file
    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    /// Neither the pre nor the post snapshot file was found; holds the bank snapshot dir
    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}
402
/// Result alias for this module: the error type is fixed to [`SnapshotError`].
pub type Result<T> = std::result::Result<T, SnapshotError>;
404
/// Errors that can happen while verifying slot deltas.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    /// Fields: actual number of entries, maximum allowed
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    /// Fields: offending slot, bank slot
    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("slot history is bad and cannot be used to verify slot deltas")]
    BadSlotHistory,
}
429
/// Errors that can happen while verifying epoch stakes.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    /// Fields: offending epoch, maximum allowed epoch
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    /// Fields: epoch with no stakes, range of required epochs
    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}
439
/// Errors that can happen while creating a bank snapshot directory (see `serialize_snapshot()`).
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    /// Fields: underlying error, path of the storage that failed to flush
    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
    CreateStateCompleteFile(#[source] IoError, PathBuf),
}
467
/// Errors that can happen while archiving a snapshot (see `archive_snapshot()`).
///
/// In every variant the first field is the underlying I/O error; the remaining fields are the
/// paths named in the message.
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),
}
519
/// Errors that can happen while hard linking account storages into a snapshot.
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    /// Fields: underlying error, source path, destination path
    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}
532
/// Errors that can happen while getting a snapshot's accounts hard link directory.
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    /// `link` is the symlink that could not be created; `original` is the path it should
    /// point at
    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}
549
550pub fn clean_orphaned_account_snapshot_dirs(
558 bank_snapshots_dir: impl AsRef<Path>,
559 account_snapshot_paths: &[PathBuf],
560) -> IoResult<()> {
561 let mut account_snapshot_dirs_referenced = HashSet::new();
564 let snapshots = get_bank_snapshots(bank_snapshots_dir);
565 for snapshot in snapshots {
566 let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
567 let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
569 IoError::other(format!(
570 "failed to read account hardlinks dir '{}': {err}",
571 account_hardlinks_dir.display(),
572 ))
573 })?;
574 for entry in read_dir {
575 let path = entry?.path();
576 let target = fs::read_link(&path).map_err(|err| {
577 IoError::other(format!(
578 "failed to read symlink '{}': {err}",
579 path.display(),
580 ))
581 })?;
582 account_snapshot_dirs_referenced.insert(target);
583 }
584 }
585
586 for account_snapshot_path in account_snapshot_paths {
588 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
589 IoError::other(format!(
590 "failed to read account snapshot dir '{}': {err}",
591 account_snapshot_path.display(),
592 ))
593 })?;
594 for entry in read_dir {
595 let path = entry?.path();
596 if !account_snapshot_dirs_referenced.contains(&path) {
597 info!(
598 "Removing orphaned account snapshot hardlink directory '{}'...",
599 path.display()
600 );
601 move_and_async_delete_path(&path);
602 }
603 }
604 }
605
606 Ok(())
607}
608
609pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
611 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
612 return;
614 };
615
616 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
617
618 let incomplete_dirs: Vec<_> = read_dir_iter
619 .filter_map(|entry| entry.ok())
620 .map(|entry| entry.path())
621 .filter(|path| path.is_dir())
622 .filter(is_incomplete)
623 .collect();
624
625 for incomplete_dir in incomplete_dirs {
627 let result = purge_bank_snapshot(&incomplete_dir);
628 match result {
629 Ok(_) => info!(
630 "Purged incomplete snapshot dir: {}",
631 incomplete_dir.display()
632 ),
633 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
634 }
635 }
636}
637
638fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
640 let state_complete_path = bank_snapshot_dir
641 .as_ref()
642 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
643 state_complete_path.is_file()
644}
645
646pub fn write_full_snapshot_slot_file(
648 bank_snapshot_dir: impl AsRef<Path>,
649 full_snapshot_slot: Slot,
650) -> IoResult<()> {
651 let full_snapshot_slot_path = bank_snapshot_dir
652 .as_ref()
653 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
654 fs::write(
655 &full_snapshot_slot_path,
656 Slot::to_le_bytes(full_snapshot_slot),
657 )
658 .map_err(|err| {
659 IoError::other(format!(
660 "failed to write full snapshot slot file '{}': {err}",
661 full_snapshot_slot_path.display(),
662 ))
663 })
664}
665
666pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
668 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
669 let full_snapshot_slot_path = bank_snapshot_dir
670 .as_ref()
671 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
672 let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
673 if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
674 let error_message = format!(
675 "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
676 full_snapshot_slot_path.display(),
677 full_snapshot_slot_file_metadata.len(),
678 SLOT_SIZE,
679 );
680 return Err(IoError::other(error_message));
681 }
682 let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
683 let mut buffer = [0; SLOT_SIZE];
684 full_snapshot_slot_file.read_exact(&mut buffer)?;
685 let slot = Slot::from_le_bytes(buffer);
686 Ok(slot)
687}
688
689pub fn get_highest_loadable_bank_snapshot(
696 snapshot_config: &SnapshotConfig,
697) -> Option<BankSnapshotInfo> {
698 let highest_bank_snapshot =
699 get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
700
701 if !snapshot_config.should_generate_snapshots() {
704 return Some(highest_bank_snapshot);
705 }
706
707 let highest_full_snapshot_archive_slot =
710 get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
711 let full_snapshot_file_slot =
712 read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
713 (full_snapshot_file_slot == highest_full_snapshot_archive_slot).then_some(highest_bank_snapshot)
714}
715
716pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
719 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
720 for entry in entries.flatten() {
721 if entry
722 .file_name()
723 .to_str()
724 .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
725 .unwrap_or(false)
726 {
727 let path = entry.path();
728 let result = if path.is_dir() {
729 fs::remove_dir_all(&path)
730 } else {
731 fs::remove_file(&path)
732 };
733 if let Err(err) = result {
734 warn!(
735 "Failed to remove temporary snapshot archive '{}': {err}",
736 path.display(),
737 );
738 }
739 }
740 }
741 }
742}
743
744pub fn serialize_and_archive_snapshot_package(
746 snapshot_package: SnapshotPackage,
747 snapshot_config: &SnapshotConfig,
748) -> Result<SnapshotArchiveInfo> {
749 let SnapshotPackage {
750 snapshot_kind,
751 slot: snapshot_slot,
752 block_height,
753 hash: snapshot_hash,
754 mut snapshot_storages,
755 status_cache_slot_deltas,
756 bank_fields_to_serialize,
757 bank_hash_stats,
758 accounts_delta_hash,
759 accounts_hash,
760 epoch_accounts_hash,
761 bank_incremental_snapshot_persistence,
762 write_version,
763 enqueued: _,
764 } = snapshot_package;
765
766 let bank_snapshot_info = serialize_snapshot(
767 &snapshot_config.bank_snapshots_dir,
768 snapshot_config.snapshot_version,
769 snapshot_storages.as_slice(),
770 status_cache_slot_deltas.as_slice(),
771 bank_fields_to_serialize,
772 bank_hash_stats,
773 accounts_delta_hash,
774 accounts_hash,
775 epoch_accounts_hash,
776 bank_incremental_snapshot_persistence.as_ref(),
777 write_version,
778 )?;
779
780 let full_snapshot_archive_slot = match snapshot_kind {
782 SnapshotKind::FullSnapshot => snapshot_slot,
783 SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
784 };
785 write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
786 .map_err(|err| {
787 IoError::other(format!(
788 "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
789 ))
790 })?;
791
792 let snapshot_archive_path = match snapshot_package.snapshot_kind {
793 SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
794 &snapshot_config.full_snapshot_archives_dir,
795 snapshot_package.slot,
796 &snapshot_package.hash,
797 snapshot_config.archive_format,
798 ),
799 SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
800 snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
803 build_incremental_snapshot_archive_path(
804 &snapshot_config.incremental_snapshot_archives_dir,
805 incremental_snapshot_base_slot,
806 snapshot_package.slot,
807 &snapshot_package.hash,
808 snapshot_config.archive_format,
809 )
810 }
811 };
812
813 let snapshot_archive_info = archive_snapshot(
814 snapshot_kind,
815 snapshot_slot,
816 snapshot_hash,
817 snapshot_storages.as_slice(),
818 &bank_snapshot_info.snapshot_dir,
819 snapshot_archive_path,
820 snapshot_config.archive_format,
821 )?;
822
823 Ok(snapshot_archive_info)
824}
825
826#[allow(clippy::too_many_arguments)]
828fn serialize_snapshot(
829 bank_snapshots_dir: impl AsRef<Path>,
830 snapshot_version: SnapshotVersion,
831 snapshot_storages: &[Arc<AccountStorageEntry>],
832 slot_deltas: &[BankSlotDelta],
833 mut bank_fields: BankFieldsToSerialize,
834 bank_hash_stats: BankHashStats,
835 accounts_delta_hash: AccountsDeltaHash,
836 accounts_hash: AccountsHash,
837 epoch_accounts_hash: Option<EpochAccountsHash>,
838 bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
839 write_version: StoredMetaWriteVersion,
840) -> Result<BankSnapshotInfo> {
841 let slot = bank_fields.slot;
842
843 let do_serialize_snapshot = || {
846 let mut measure_everything = Measure::start("");
847 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
848 if bank_snapshot_dir.exists() {
849 return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
850 bank_snapshot_dir,
851 ));
852 }
853 fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
854 AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
855 })?;
856
857 let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
859 info!(
860 "Creating bank snapshot for slot {slot} at '{}'",
861 bank_snapshot_path.display(),
862 );
863
864 let (_, flush_storages_us) = measure_us!({
865 for storage in snapshot_storages {
866 storage.flush().map_err(|err| {
867 AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
868 })?;
869 }
870 });
871
872 let (_, hard_link_storages_us) = measure_us!(hard_link_storages_to_snapshot(
877 &bank_snapshot_dir,
878 slot,
879 snapshot_storages
880 )
881 .map_err(AddBankSnapshotError::HardLinkStorages)?);
882
883 let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
884 let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
885 let extra_fields = ExtraFieldsToSerialize {
886 lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
887 incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
888 epoch_accounts_hash,
889 versioned_epoch_stakes,
890 accounts_lt_hash: bank_fields.accounts_lt_hash.clone().map(Into::into),
891 };
892 serde_snapshot::serialize_bank_snapshot_into(
893 stream,
894 bank_fields,
895 bank_hash_stats,
896 accounts_delta_hash,
897 accounts_hash,
898 &get_storages_to_serialize(snapshot_storages),
899 extra_fields,
900 write_version,
901 )?;
902 Ok(())
903 };
904 let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
905 serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
906 .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
907 "bank serialize"
908 );
909
910 let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
911 let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
912 snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
913 .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
914 );
915
916 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
917 let (_, write_version_file_us) = measure_us!(fs::write(
918 &version_path,
919 snapshot_version.as_str().as_bytes(),
920 )
921 .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
922
923 let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
925 let (_, write_state_complete_file_us) = measure_us!(fs::File::create(&state_complete_path)
926 .map_err(|err| {
927 AddBankSnapshotError::CreateStateCompleteFile(err, state_complete_path)
928 })?);
929
930 measure_everything.stop();
931
932 datapoint_info!(
934 "snapshot_bank",
935 ("slot", slot, i64),
936 ("bank_size", bank_snapshot_consumed_size, i64),
937 ("status_cache_size", status_cache_consumed_size, i64),
938 ("flush_storages_us", flush_storages_us, i64),
939 ("hard_link_storages_us", hard_link_storages_us, i64),
940 ("bank_serialize_us", bank_serialize.as_us(), i64),
941 ("status_cache_serialize_us", status_cache_serialize_us, i64),
942 ("write_version_file_us", write_version_file_us, i64),
943 (
944 "write_state_complete_file_us",
945 write_state_complete_file_us,
946 i64
947 ),
948 ("total_us", measure_everything.as_us(), i64),
949 );
950
951 info!(
952 "{} for slot {} at {}",
953 bank_serialize,
954 slot,
955 bank_snapshot_path.display(),
956 );
957
958 Ok(BankSnapshotInfo {
959 slot,
960 snapshot_kind: BankSnapshotKind::Pre,
961 snapshot_dir: bank_snapshot_dir,
962 snapshot_version,
963 })
964 };
965
966 do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
967}
968
969fn archive_snapshot(
971 snapshot_kind: SnapshotKind,
972 snapshot_slot: Slot,
973 snapshot_hash: SnapshotHash,
974 snapshot_storages: &[Arc<AccountStorageEntry>],
975 bank_snapshot_dir: impl AsRef<Path>,
976 archive_path: impl AsRef<Path>,
977 archive_format: ArchiveFormat,
978) -> Result<SnapshotArchiveInfo> {
979 use ArchiveSnapshotPackageError as E;
980 const SNAPSHOTS_DIR: &str = "snapshots";
981 const ACCOUNTS_DIR: &str = "accounts";
982 info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
983
984 let mut timer = Measure::start("snapshot_package-package_snapshots");
985 let tar_dir = archive_path
986 .as_ref()
987 .parent()
988 .expect("Tar output path is invalid");
989
990 fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
991
992 let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
994 let staging_dir = tempfile::Builder::new()
995 .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
996 .tempdir_in(tar_dir)
997 .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
998 let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
999
1000 let slot_str = snapshot_slot.to_string();
1001 let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
1002 fs::create_dir_all(&staging_snapshot_dir)
1004 .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
1005
1006 let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
1008 E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
1009 })?;
1010 let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
1011 let src_snapshot_file = src_snapshot_dir.join(slot_str);
1012 symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
1013 .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
1014
1015 let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1018 let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1019 symlink::symlink_file(&src_status_cache, &staging_status_cache)
1020 .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
1021
1022 let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
1024 let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1025 symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
1026 E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
1027 })?;
1028
1029 let staging_archive_path = tar_dir.join(format!(
1031 "{}{}.{}",
1032 staging_dir_prefix,
1033 snapshot_slot,
1034 archive_format.extension(),
1035 ));
1036
1037 {
1038 let mut archive_file = fs::File::create(&staging_archive_path)
1039 .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
1040
1041 let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
1042 let mut archive = tar::Builder::new(encoder);
1043 archive.sparse(false);
1052 archive
1055 .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
1056 .map_err(E::ArchiveVersionFile)?;
1057 archive
1058 .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
1059 .map_err(E::ArchiveSnapshotsDir)?;
1060
1061 for storage in snapshot_storages {
1062 let path_in_archive = Path::new(ACCOUNTS_DIR)
1063 .join(AccountsFile::file_name(storage.slot(), storage.id()));
1064 match storage.accounts.internals_for_archive() {
1065 InternalsForArchive::Mmap(data) => {
1066 let mut header = tar::Header::new_gnu();
1067 header.set_path(path_in_archive).map_err(|err| {
1068 E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1069 })?;
1070 header.set_size(storage.capacity());
1071 header.set_cksum();
1072 archive.append(&header, data)
1073 }
1074 InternalsForArchive::FileIo(path) => {
1075 archive.append_path_with_name(path, path_in_archive)
1076 }
1077 }
1078 .map_err(|err| E::ArchiveAccountStorageFile(err, storage.path().to_path_buf()))?;
1079 }
1080
1081 archive.into_inner().map_err(E::FinishArchive)?;
1082 Ok(())
1083 };
1084
1085 match archive_format {
1086 ArchiveFormat::TarBzip2 => {
1087 let mut encoder =
1088 bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
1089 do_archive_files(&mut encoder)?;
1090 encoder.finish().map_err(E::FinishEncoder)?;
1091 }
1092 ArchiveFormat::TarGzip => {
1093 let mut encoder =
1094 flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
1095 do_archive_files(&mut encoder)?;
1096 encoder.finish().map_err(E::FinishEncoder)?;
1097 }
1098 ArchiveFormat::TarZstd { config } => {
1099 let mut encoder =
1100 zstd::stream::Encoder::new(archive_file, config.compression_level)
1101 .map_err(E::CreateEncoder)?;
1102 do_archive_files(&mut encoder)?;
1103 encoder.finish().map_err(E::FinishEncoder)?;
1104 }
1105 ArchiveFormat::TarLz4 => {
1106 let mut encoder = lz4::EncoderBuilder::new()
1107 .level(1)
1108 .build(archive_file)
1109 .map_err(E::CreateEncoder)?;
1110 do_archive_files(&mut encoder)?;
1111 let (_output, result) = encoder.finish();
1112 result.map_err(E::FinishEncoder)?;
1113 }
1114 ArchiveFormat::Tar => {
1115 do_archive_files(&mut archive_file)?;
1116 }
1117 };
1118 }
1119
1120 let metadata = fs::metadata(&staging_archive_path)
1122 .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
1123 let archive_path = archive_path.as_ref().to_path_buf();
1124 fs::rename(&staging_archive_path, &archive_path)
1125 .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
1126
1127 timer.stop();
1128 info!(
1129 "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
1130 archive_path.display(),
1131 snapshot_slot,
1132 timer.as_ms(),
1133 metadata.len()
1134 );
1135
1136 datapoint_info!(
1137 "archive-snapshot-package",
1138 ("slot", snapshot_slot, i64),
1139 ("archive_format", archive_format.to_string(), String),
1140 ("duration_ms", timer.as_ms(), i64),
1141 (
1142 if snapshot_kind.is_full_snapshot() {
1143 "full-snapshot-archive-size"
1144 } else {
1145 "incremental-snapshot-archive-size"
1146 },
1147 metadata.len(),
1148 i64
1149 ),
1150 );
1151 Ok(SnapshotArchiveInfo {
1152 path: archive_path,
1153 slot: snapshot_slot,
1154 hash: snapshot_hash,
1155 archive_format,
1156 })
1157}
1158
1159pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1161 let mut bank_snapshots = Vec::default();
1162 match fs::read_dir(&bank_snapshots_dir) {
1163 Err(err) => {
1164 info!(
1165 "Unable to read bank snapshots directory '{}': {err}",
1166 bank_snapshots_dir.as_ref().display(),
1167 );
1168 }
1169 Ok(paths) => paths
1170 .filter_map(|entry| {
1171 entry
1174 .ok()
1175 .filter(|entry| entry.path().is_dir())
1176 .and_then(|entry| {
1177 entry
1178 .path()
1179 .file_name()
1180 .and_then(|file_name| file_name.to_str())
1181 .and_then(|file_name| file_name.parse::<Slot>().ok())
1182 })
1183 })
1184 .for_each(
1185 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1186 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1187 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1190 },
1191 ),
1192 }
1193 bank_snapshots
1194}
1195
1196pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1200 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1201 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1202 bank_snapshots
1203}
1204
1205pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1209 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1210 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1211 bank_snapshots
1212}
1213
1214pub fn get_highest_bank_snapshot_pre(
1218 bank_snapshots_dir: impl AsRef<Path>,
1219) -> Option<BankSnapshotInfo> {
1220 do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1221}
1222
1223pub fn get_highest_bank_snapshot_post(
1227 bank_snapshots_dir: impl AsRef<Path>,
1228) -> Option<BankSnapshotInfo> {
1229 do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1230}
1231
1232pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1236 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1237}
1238
1239fn do_get_highest_bank_snapshot(
1240 mut bank_snapshots: Vec<BankSnapshotInfo>,
1241) -> Option<BankSnapshotInfo> {
1242 bank_snapshots.sort_unstable();
1243 bank_snapshots.into_iter().next_back()
1244}
1245
1246pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1247where
1248 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1249{
1250 serialize_snapshot_data_file_capped::<F>(
1251 data_file_path,
1252 MAX_SNAPSHOT_DATA_FILE_SIZE,
1253 serializer,
1254 )
1255}
1256
1257pub fn deserialize_snapshot_data_file<T: Sized>(
1258 data_file_path: &Path,
1259 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1260) -> Result<T> {
1261 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1262 deserializer(streams.full_snapshot_stream)
1263 };
1264
1265 let wrapped_data_file_path = SnapshotRootPaths {
1266 full_snapshot_root_file_path: data_file_path.to_path_buf(),
1267 incremental_snapshot_root_file_path: None,
1268 };
1269
1270 deserialize_snapshot_data_files_capped(
1271 &wrapped_data_file_path,
1272 MAX_SNAPSHOT_DATA_FILE_SIZE,
1273 wrapped_deserializer,
1274 )
1275}
1276
1277pub fn deserialize_snapshot_data_files<T: Sized>(
1278 snapshot_root_paths: &SnapshotRootPaths,
1279 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1280) -> Result<T> {
1281 deserialize_snapshot_data_files_capped(
1282 snapshot_root_paths,
1283 MAX_SNAPSHOT_DATA_FILE_SIZE,
1284 deserializer,
1285 )
1286}
1287
1288fn serialize_snapshot_data_file_capped<F>(
1289 data_file_path: &Path,
1290 maximum_file_size: u64,
1291 serializer: F,
1292) -> Result<u64>
1293where
1294 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1295{
1296 let data_file = fs::File::create(data_file_path)?;
1297 let mut data_file_stream = BufWriter::new(data_file);
1298 serializer(&mut data_file_stream)?;
1299 data_file_stream.flush()?;
1300
1301 let consumed_size = data_file_stream.stream_position()?;
1302 if consumed_size > maximum_file_size {
1303 let error_message = format!(
1304 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1305 data_file_path.display(),
1306 );
1307 return Err(IoError::other(error_message).into());
1308 }
1309 Ok(consumed_size)
1310}
1311
1312fn deserialize_snapshot_data_files_capped<T: Sized>(
1313 snapshot_root_paths: &SnapshotRootPaths,
1314 maximum_file_size: u64,
1315 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1316) -> Result<T> {
1317 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1318 create_snapshot_data_file_stream(
1319 &snapshot_root_paths.full_snapshot_root_file_path,
1320 maximum_file_size,
1321 )?;
1322
1323 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1324 if let Some(ref incremental_snapshot_root_file_path) =
1325 snapshot_root_paths.incremental_snapshot_root_file_path
1326 {
1327 Some(create_snapshot_data_file_stream(
1328 incremental_snapshot_root_file_path,
1329 maximum_file_size,
1330 )?)
1331 } else {
1332 None
1333 }
1334 .unzip();
1335
1336 let mut snapshot_streams = SnapshotStreams {
1337 full_snapshot_stream: &mut full_snapshot_data_file_stream,
1338 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1339 };
1340 let ret = deserializer(&mut snapshot_streams)?;
1341
1342 check_deserialize_file_consumed(
1343 full_snapshot_file_size,
1344 &snapshot_root_paths.full_snapshot_root_file_path,
1345 &mut full_snapshot_data_file_stream,
1346 )?;
1347
1348 if let Some(ref incremental_snapshot_root_file_path) =
1349 snapshot_root_paths.incremental_snapshot_root_file_path
1350 {
1351 check_deserialize_file_consumed(
1352 incremental_snapshot_file_size.unwrap(),
1353 incremental_snapshot_root_file_path,
1354 incremental_snapshot_data_file_stream.as_mut().unwrap(),
1355 )?;
1356 }
1357
1358 Ok(ret)
1359}
1360
1361fn create_snapshot_data_file_stream(
1364 snapshot_root_file_path: impl AsRef<Path>,
1365 maximum_file_size: u64,
1366) -> Result<(u64, BufReader<std::fs::File>)> {
1367 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1368
1369 if snapshot_file_size > maximum_file_size {
1370 let error_message = format!(
1371 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1372 snapshot_root_file_path.as_ref().display(),
1373 snapshot_file_size,
1374 maximum_file_size,
1375 );
1376 return Err(IoError::other(error_message).into());
1377 }
1378
1379 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1380 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1381
1382 Ok((snapshot_file_size, snapshot_data_file_stream))
1383}
1384
1385fn check_deserialize_file_consumed(
1388 file_size: u64,
1389 file_path: impl AsRef<Path>,
1390 file_stream: &mut BufReader<std::fs::File>,
1391) -> Result<()> {
1392 let consumed_size = file_stream.stream_position()?;
1393
1394 if consumed_size != file_size {
1395 let error_message = format!(
1396 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1397 file_path.as_ref().display(),
1398 file_size,
1399 consumed_size,
1400 );
1401 return Err(IoError::other(error_message).into());
1402 }
1403
1404 Ok(())
1405}
1406
1407fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1409 let run_path = appendvec_path.parent()?;
1410 let run_file_name = run_path.file_name()?;
1411 if run_file_name != ACCOUNTS_RUN_DIR {
1414 error!(
1415 "The account path {} does not have run/ as its immediate parent directory.",
1416 run_path.display()
1417 );
1418 return None;
1419 }
1420 let account_path = run_path.parent()?;
1421 Some(account_path.to_path_buf())
1422}
1423
1424fn get_snapshot_accounts_hardlink_dir(
1427 appendvec_path: &Path,
1428 bank_slot: Slot,
1429 account_paths: &mut HashSet<PathBuf>,
1430 hardlinks_dir: impl AsRef<Path>,
1431) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1432 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1433 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1434 })?;
1435
1436 let snapshot_hardlink_dir = account_path
1437 .join(ACCOUNTS_SNAPSHOT_DIR)
1438 .join(bank_slot.to_string());
1439
1440 if !account_paths.contains(&account_path) {
1443 let idx = account_paths.len();
1444 debug!(
1445 "for appendvec_path {}, create hard-link path {}",
1446 appendvec_path.display(),
1447 snapshot_hardlink_dir.display()
1448 );
1449 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1450 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1451 err,
1452 snapshot_hardlink_dir.clone(),
1453 )
1454 })?;
1455 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1456 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1457 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1458 source: err,
1459 original: snapshot_hardlink_dir.clone(),
1460 link: symlink_path,
1461 }
1462 })?;
1463 account_paths.insert(account_path);
1464 };
1465
1466 Ok(snapshot_hardlink_dir)
1467}
1468
1469pub fn hard_link_storages_to_snapshot(
1474 bank_snapshot_dir: impl AsRef<Path>,
1475 bank_slot: Slot,
1476 snapshot_storages: &[Arc<AccountStorageEntry>],
1477) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1478 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1479 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1480 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1481 err,
1482 accounts_hardlinks_dir.clone(),
1483 )
1484 })?;
1485
1486 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1487 for storage in snapshot_storages {
1488 let storage_path = storage.accounts.path();
1489 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1490 storage_path,
1491 bank_slot,
1492 &mut account_paths,
1493 &accounts_hardlinks_dir,
1494 )?;
1495 let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1498 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1499 fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1500 HardLinkStoragesToSnapshotError::HardLinkStorage(
1501 err,
1502 storage_path.to_path_buf(),
1503 hard_link_path,
1504 )
1505 })?;
1506 }
1507 Ok(())
1508}
1509
1510pub(crate) fn get_storages_to_serialize(
1513 snapshot_storages: &[Arc<AccountStorageEntry>],
1514) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1515 snapshot_storages
1516 .iter()
1517 .map(|storage| vec![Arc::clone(storage)])
1518 .collect::<Vec<_>>()
1519}
1520
/// Upper bound used by `verify_and_unarchive_snapshots` when clamping the number of parallel
/// untar divisions.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1523
1524pub fn verify_and_unarchive_snapshots(
1526 bank_snapshots_dir: impl AsRef<Path>,
1527 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1528 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1529 account_paths: &[PathBuf],
1530 storage_access: StorageAccess,
1531) -> Result<(
1532 UnarchivedSnapshot,
1533 Option<UnarchivedSnapshot>,
1534 AtomicAccountsFileId,
1535)> {
1536 check_are_snapshots_compatible(
1537 full_snapshot_archive_info,
1538 incremental_snapshot_archive_info,
1539 )?;
1540
1541 let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1542
1543 let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1544 let unarchived_full_snapshot = unarchive_snapshot(
1545 &bank_snapshots_dir,
1546 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1547 full_snapshot_archive_info.path(),
1548 "snapshot untar",
1549 account_paths,
1550 full_snapshot_archive_info.archive_format(),
1551 parallel_divisions,
1552 next_append_vec_id.clone(),
1553 storage_access,
1554 )?;
1555
1556 let unarchived_incremental_snapshot =
1557 if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1558 let unarchived_incremental_snapshot = unarchive_snapshot(
1559 &bank_snapshots_dir,
1560 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1561 incremental_snapshot_archive_info.path(),
1562 "incremental snapshot untar",
1563 account_paths,
1564 incremental_snapshot_archive_info.archive_format(),
1565 parallel_divisions,
1566 next_append_vec_id.clone(),
1567 storage_access,
1568 )?;
1569 Some(unarchived_incremental_snapshot)
1570 } else {
1571 None
1572 };
1573
1574 Ok((
1575 unarchived_full_snapshot,
1576 unarchived_incremental_snapshot,
1577 Arc::try_unwrap(next_append_vec_id).unwrap(),
1578 ))
1579}
1580
1581fn spawn_unpack_snapshot_thread(
1583 file_sender: Sender<PathBuf>,
1584 account_paths: Arc<Vec<PathBuf>>,
1585 ledger_dir: Arc<PathBuf>,
1586 mut archive: Archive<SharedBufferReader>,
1587 parallel_selector: Option<ParallelSelector>,
1588 thread_index: usize,
1589) -> JoinHandle<()> {
1590 Builder::new()
1591 .name(format!("solUnpkSnpsht{thread_index:02}"))
1592 .spawn(move || {
1593 hardened_unpack::streaming_unpack_snapshot(
1594 &mut archive,
1595 ledger_dir.as_path(),
1596 &account_paths,
1597 parallel_selector,
1598 &file_sender,
1599 )
1600 .unwrap();
1601 })
1602 .unwrap()
1603}
1604
1605fn streaming_unarchive_snapshot(
1607 file_sender: Sender<PathBuf>,
1608 account_paths: Vec<PathBuf>,
1609 ledger_dir: PathBuf,
1610 snapshot_archive_path: PathBuf,
1611 archive_format: ArchiveFormat,
1612 num_threads: usize,
1613) -> Vec<JoinHandle<()>> {
1614 let account_paths = Arc::new(account_paths);
1615 let ledger_dir = Arc::new(ledger_dir);
1616 let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1617
1618 let archives: Vec<_> = (0..num_threads)
1620 .map(|_| {
1621 let reader = SharedBufferReader::new(&shared_buffer);
1622 Archive::new(reader)
1623 })
1624 .collect();
1625
1626 archives
1627 .into_iter()
1628 .enumerate()
1629 .map(|(thread_index, archive)| {
1630 let parallel_selector = Some(ParallelSelector {
1631 index: thread_index,
1632 divisions: num_threads,
1633 });
1634
1635 spawn_unpack_snapshot_thread(
1636 file_sender.clone(),
1637 account_paths.clone(),
1638 ledger_dir.clone(),
1639 archive,
1640 parallel_selector,
1641 thread_index,
1642 )
1643 })
1644 .collect()
1645}
1646
1647fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1652 let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1653 if !snapshots_dir.is_dir() {
1654 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1655 }
1656
1657 let slot_dir = std::fs::read_dir(&snapshots_dir)
1659 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1660 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1661 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1662 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1663 .path();
1664
1665 let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1666 fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1667
1668 let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1669 fs::hard_link(
1670 status_cache_file,
1671 slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1672 )?;
1673
1674 let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1675 fs::File::create(state_complete_file)?;
1676
1677 Ok(())
1678}
1679
1680fn unarchive_snapshot(
1684 bank_snapshots_dir: impl AsRef<Path>,
1685 unpacked_snapshots_dir_prefix: &'static str,
1686 snapshot_archive_path: impl AsRef<Path>,
1687 measure_name: &'static str,
1688 account_paths: &[PathBuf],
1689 archive_format: ArchiveFormat,
1690 parallel_divisions: usize,
1691 next_append_vec_id: Arc<AtomicAccountsFileId>,
1692 storage_access: StorageAccess,
1693) -> Result<UnarchivedSnapshot> {
1694 let unpack_dir = tempfile::Builder::new()
1695 .prefix(unpacked_snapshots_dir_prefix)
1696 .tempdir_in(bank_snapshots_dir)?;
1697 let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1698
1699 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1700 streaming_unarchive_snapshot(
1701 file_sender,
1702 account_paths.to_vec(),
1703 unpack_dir.path().to_path_buf(),
1704 snapshot_archive_path.as_ref().to_path_buf(),
1705 archive_format,
1706 parallel_divisions,
1707 );
1708
1709 let num_rebuilder_threads = num_cpus::get_physical()
1710 .saturating_sub(parallel_divisions)
1711 .max(1);
1712 let (version_and_storages, measure_untar) = measure_time!(
1713 SnapshotStorageRebuilder::rebuild_storage(
1714 file_receiver,
1715 num_rebuilder_threads,
1716 next_append_vec_id,
1717 SnapshotFrom::Archive,
1718 storage_access,
1719 )?,
1720 measure_name
1721 );
1722 info!("{}", measure_untar);
1723
1724 create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1725
1726 let RebuiltSnapshotStorage {
1727 snapshot_version,
1728 storage,
1729 } = version_and_storages;
1730 Ok(UnarchivedSnapshot {
1731 unpack_dir,
1732 storage,
1733 unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1734 unpacked_snapshots_dir,
1735 snapshot_version,
1736 },
1737 measure_untar,
1738 })
1739}
1740
1741fn streaming_snapshot_dir_files(
1744 file_sender: Sender<PathBuf>,
1745 snapshot_file_path: impl Into<PathBuf>,
1746 snapshot_version_path: impl Into<PathBuf>,
1747 account_paths: &[PathBuf],
1748) -> Result<()> {
1749 file_sender.send(snapshot_file_path.into())?;
1750 file_sender.send(snapshot_version_path.into())?;
1751
1752 for account_path in account_paths {
1753 for file in fs::read_dir(account_path)? {
1754 file_sender.send(file?.path())?;
1755 }
1756 }
1757
1758 Ok(())
1759}
1760
1761pub fn rebuild_storages_from_snapshot_dir(
1766 snapshot_info: &BankSnapshotInfo,
1767 account_paths: &[PathBuf],
1768 next_append_vec_id: Arc<AtomicAccountsFileId>,
1769 storage_access: StorageAccess,
1770) -> Result<AccountStorageMap> {
1771 let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1772 let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1773 let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1774
1775 let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1776 IoError::other(format!(
1777 "failed to read accounts hardlinks dir '{}': {err}",
1778 accounts_hardlinks.display(),
1779 ))
1780 })?;
1781 for dir_entry in read_dir {
1782 let symlink_path = dir_entry?.path();
1783 let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1786 IoError::other(format!(
1787 "failed to read symlink '{}': {err}",
1788 symlink_path.display(),
1789 ))
1790 })?;
1791 let account_run_path = account_snapshot_path
1792 .parent()
1793 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1794 .parent()
1795 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1796 .join(ACCOUNTS_RUN_DIR);
1797 if !account_run_paths.contains(&account_run_path) {
1798 return Err(SnapshotError::AccountPathsMismatch);
1801 }
1802 let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1805 IoError::other(format!(
1806 "failed to read account snapshot dir '{}': {err}",
1807 account_snapshot_path.display(),
1808 ))
1809 })?;
1810 for file in read_dir {
1811 let file_path = file?.path();
1812 let file_name = file_path
1813 .file_name()
1814 .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1815 let dest_path = account_run_path.join(file_name);
1816 fs::hard_link(&file_path, &dest_path).map_err(|err| {
1817 IoError::other(format!(
1818 "failed to hard link from '{}' to '{}': {err}",
1819 file_path.display(),
1820 dest_path.display(),
1821 ))
1822 })?;
1823 }
1824 }
1825
1826 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1827 let snapshot_file_path = &snapshot_info.snapshot_path();
1828 let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1829 streaming_snapshot_dir_files(
1830 file_sender,
1831 snapshot_file_path,
1832 snapshot_version_path,
1833 account_paths,
1834 )?;
1835
1836 let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1837 let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1838 file_receiver,
1839 num_rebuilder_threads,
1840 next_append_vec_id,
1841 SnapshotFrom::Dir,
1842 storage_access,
1843 )?;
1844
1845 let RebuiltSnapshotStorage {
1846 snapshot_version: _,
1847 storage,
1848 } = version_and_storages;
1849 Ok(storage)
1850}
1851
1852fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1856 let file_metadata = fs::metadata(&path).map_err(|err| {
1858 IoError::other(format!(
1859 "failed to query snapshot version file metadata '{}': {err}",
1860 path.as_ref().display(),
1861 ))
1862 })?;
1863 let file_size = file_metadata.len();
1864 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1865 let error_message = format!(
1866 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1867 path.as_ref().display(),
1868 file_size,
1869 MAX_SNAPSHOT_VERSION_FILE_SIZE,
1870 );
1871 return Err(IoError::other(error_message).into());
1872 }
1873
1874 let mut snapshot_version = String::new();
1876 let mut file = fs::File::open(&path).map_err(|err| {
1877 IoError::other(format!(
1878 "failed to open snapshot version file '{}': {err}",
1879 path.as_ref().display()
1880 ))
1881 })?;
1882 file.read_to_string(&mut snapshot_version).map_err(|err| {
1883 IoError::other(format!(
1884 "failed to read snapshot version from file '{}': {err}",
1885 path.as_ref().display()
1886 ))
1887 })?;
1888
1889 Ok(snapshot_version.trim().to_string())
1890}
1891
1892fn check_are_snapshots_compatible(
1895 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1896 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1897) -> Result<()> {
1898 if incremental_snapshot_archive_info.is_none() {
1899 return Ok(());
1900 }
1901
1902 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1903
1904 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1905 .then_some(())
1906 .ok_or_else(|| {
1907 SnapshotError::MismatchedBaseSlot(
1908 full_snapshot_archive_info.slot(),
1909 incremental_snapshot_archive_info.base_slot(),
1910 )
1911 })
1912}
1913
1914pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1916 path.file_name()
1917 .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1918 .to_str()
1919 .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1920}
1921
1922pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1923 snapshot_archives_dir
1924 .as_ref()
1925 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1926}
1927
1928pub fn build_full_snapshot_archive_path(
1931 full_snapshot_archives_dir: impl AsRef<Path>,
1932 slot: Slot,
1933 hash: &SnapshotHash,
1934 archive_format: ArchiveFormat,
1935) -> PathBuf {
1936 full_snapshot_archives_dir.as_ref().join(format!(
1937 "snapshot-{}-{}.{}",
1938 slot,
1939 hash.0,
1940 archive_format.extension(),
1941 ))
1942}
1943
1944pub fn build_incremental_snapshot_archive_path(
1948 incremental_snapshot_archives_dir: impl AsRef<Path>,
1949 base_slot: Slot,
1950 slot: Slot,
1951 hash: &SnapshotHash,
1952 archive_format: ArchiveFormat,
1953) -> PathBuf {
1954 incremental_snapshot_archives_dir.as_ref().join(format!(
1955 "incremental-snapshot-{}-{}-{}.{}",
1956 base_slot,
1957 slot,
1958 hash.0,
1959 archive_format.extension(),
1960 ))
1961}
1962
1963pub(crate) fn parse_full_snapshot_archive_filename(
1965 archive_filename: &str,
1966) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1967 lazy_static! {
1968 static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1969 }
1970
1971 let do_parse = || {
1972 RE.captures(archive_filename).and_then(|captures| {
1973 let slot = captures
1974 .name("slot")
1975 .map(|x| x.as_str().parse::<Slot>())?
1976 .ok()?;
1977 let hash = captures
1978 .name("hash")
1979 .map(|x| x.as_str().parse::<Hash>())?
1980 .ok()?;
1981 let archive_format = captures
1982 .name("ext")
1983 .map(|x| x.as_str().parse::<ArchiveFormat>())?
1984 .ok()?;
1985
1986 Some((slot, SnapshotHash(hash), archive_format))
1987 })
1988 };
1989
1990 do_parse().ok_or_else(|| {
1991 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1992 })
1993}
1994
1995pub(crate) fn parse_incremental_snapshot_archive_filename(
1997 archive_filename: &str,
1998) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1999 lazy_static! {
2000 static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
2001 }
2002
2003 let do_parse = || {
2004 RE.captures(archive_filename).and_then(|captures| {
2005 let base_slot = captures
2006 .name("base")
2007 .map(|x| x.as_str().parse::<Slot>())?
2008 .ok()?;
2009 let slot = captures
2010 .name("slot")
2011 .map(|x| x.as_str().parse::<Slot>())?
2012 .ok()?;
2013 let hash = captures
2014 .name("hash")
2015 .map(|x| x.as_str().parse::<Hash>())?
2016 .ok()?;
2017 let archive_format = captures
2018 .name("ext")
2019 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2020 .ok()?;
2021
2022 Some((base_slot, slot, SnapshotHash(hash), archive_format))
2023 })
2024 };
2025
2026 do_parse().ok_or_else(|| {
2027 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2028 })
2029}
2030
2031fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2033where
2034 F: Fn(PathBuf) -> Result<T>,
2035{
2036 let walk_dir = |dir: &Path| -> Vec<T> {
2037 let entry_iter = fs::read_dir(dir);
2038 match entry_iter {
2039 Err(err) => {
2040 info!(
2041 "Unable to read snapshot archives directory '{}': {err}",
2042 dir.display(),
2043 );
2044 vec![]
2045 }
2046 Ok(entries) => entries
2047 .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2048 .collect(),
2049 }
2050 };
2051
2052 let mut ret = walk_dir(snapshot_archives_dir);
2053 let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2054 if remote_dir.exists() {
2055 ret.append(&mut walk_dir(remote_dir.as_ref()));
2056 }
2057 ret
2058}
2059
2060pub fn get_full_snapshot_archives(
2062 full_snapshot_archives_dir: impl AsRef<Path>,
2063) -> Vec<FullSnapshotArchiveInfo> {
2064 get_snapshot_archives(
2065 full_snapshot_archives_dir.as_ref(),
2066 FullSnapshotArchiveInfo::new_from_path,
2067 )
2068}
2069
2070pub fn get_incremental_snapshot_archives(
2072 incremental_snapshot_archives_dir: impl AsRef<Path>,
2073) -> Vec<IncrementalSnapshotArchiveInfo> {
2074 get_snapshot_archives(
2075 incremental_snapshot_archives_dir.as_ref(),
2076 IncrementalSnapshotArchiveInfo::new_from_path,
2077 )
2078}
2079
2080pub fn get_highest_full_snapshot_archive_slot(
2082 full_snapshot_archives_dir: impl AsRef<Path>,
2083) -> Option<Slot> {
2084 get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2085 .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2086}
2087
2088pub fn get_highest_incremental_snapshot_archive_slot(
2091 incremental_snapshot_archives_dir: impl AsRef<Path>,
2092 full_snapshot_slot: Slot,
2093) -> Option<Slot> {
2094 get_highest_incremental_snapshot_archive_info(
2095 incremental_snapshot_archives_dir,
2096 full_snapshot_slot,
2097 )
2098 .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2099}
2100
2101pub fn get_highest_full_snapshot_archive_info(
2103 full_snapshot_archives_dir: impl AsRef<Path>,
2104) -> Option<FullSnapshotArchiveInfo> {
2105 let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2106 full_snapshot_archives.sort_unstable();
2107 full_snapshot_archives.into_iter().next_back()
2108}
2109
2110pub fn get_highest_incremental_snapshot_archive_info(
2113 incremental_snapshot_archives_dir: impl AsRef<Path>,
2114 full_snapshot_slot: Slot,
2115) -> Option<IncrementalSnapshotArchiveInfo> {
2116 let mut incremental_snapshot_archives =
2120 get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2121 .into_iter()
2122 .filter(|incremental_snapshot_archive_info| {
2123 incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2124 })
2125 .collect::<Vec<_>>();
2126 incremental_snapshot_archives.sort_unstable();
2127 incremental_snapshot_archives.into_iter().next_back()
2128}
2129
2130pub fn purge_old_snapshot_archives(
2131 full_snapshot_archives_dir: impl AsRef<Path>,
2132 incremental_snapshot_archives_dir: impl AsRef<Path>,
2133 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2134 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2135) {
2136 info!(
2137 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2138 full_snapshot_archives_dir.as_ref().display(),
2139 maximum_full_snapshot_archives_to_retain
2140 );
2141
2142 let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2143 full_snapshot_archives.sort_unstable();
2144 full_snapshot_archives.reverse();
2145
2146 let num_to_retain = full_snapshot_archives
2147 .len()
2148 .min(maximum_full_snapshot_archives_to_retain.get());
2149 trace!(
2150 "There are {} full snapshot archives, retaining {}",
2151 full_snapshot_archives.len(),
2152 num_to_retain,
2153 );
2154
2155 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2156 if full_snapshot_archives.is_empty() {
2157 None
2158 } else {
2159 Some(full_snapshot_archives.split_at(num_to_retain))
2160 }
2161 .unwrap_or_default();
2162
2163 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2164 .iter()
2165 .map(|ai| ai.slot())
2166 .collect::<HashSet<_>>();
2167
2168 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2169 for path in archives.iter().map(|a| a.path()) {
2170 trace!("Removing snapshot archive: {}", path.display());
2171 let result = fs::remove_file(path);
2172 if let Err(err) = result {
2173 info!(
2174 "Failed to remove snapshot archive '{}': {err}",
2175 path.display()
2176 );
2177 }
2178 }
2179 }
2180 remove_archives(full_snapshot_archives_to_remove);
2181
2182 info!(
2183 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2184 incremental_snapshot_archives_dir.as_ref().display(),
2185 maximum_incremental_snapshot_archives_to_retain
2186 );
2187 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2188 for incremental_snapshot_archive in
2189 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2190 {
2191 incremental_snapshot_archives_by_base_slot
2192 .entry(incremental_snapshot_archive.base_slot())
2193 .or_default()
2194 .push(incremental_snapshot_archive)
2195 }
2196
2197 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2198 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2199 {
2200 incremental_snapshot_archives.sort_unstable();
2201 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2202 maximum_incremental_snapshot_archives_to_retain.get()
2203 } else {
2204 usize::from(retained_full_snapshot_slots.contains(&base_slot))
2205 };
2206 trace!(
2207 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2208 incremental_snapshot_archives.len(),
2209 base_slot,
2210 incremental_snapshot_archives
2211 .len()
2212 .saturating_sub(num_to_retain),
2213 );
2214
2215 incremental_snapshot_archives.truncate(
2216 incremental_snapshot_archives
2217 .len()
2218 .saturating_sub(num_to_retain),
2219 );
2220 remove_archives(&incremental_snapshot_archives);
2221 }
2222}
2223
/// Unpacks a snapshot archive, read through `shared_buffer`, into `ledger_dir` and
/// `account_paths` using `parallel_divisions` parallel workers.
///
/// Every worker gets its own reader over the shared buffer and a `ParallelSelector` carrying
/// its index and the total number of divisions. The maps returned by the workers are merged
/// into a single map.
///
/// # Errors
///
/// Returns the first error reported by any worker.
///
/// # Panics
///
/// Panics if `parallel_divisions` is zero.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // Construct every reader up front, before any of them is handed to a worker.
    let readers: Vec<_> = (0..parallel_divisions)
        .map(|_| SharedBufferReader::new(&shared_buffer))
        .collect();

    // One worker per reader; each unpacks with its own selector.
    let worker_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let selector = ParallelSelector {
                index,
                divisions: parallel_divisions,
            };
            let mut archive = Archive::new(reader);
            hardened_unpack::unpack_snapshot(
                &mut archive,
                ledger_dir,
                account_paths,
                Some(selector),
            )
        })
        .collect();

    // Merge the per-worker maps, bailing out on the first failure.
    let mut merged_append_vec_map = UnpackedAppendVecMap::new();
    for worker_result in worker_results {
        let partial_map = worker_result?;
        merged_append_vec_map.extend(partial_map);
    }

    Ok(merged_append_vec_map)
}
2264
2265fn untar_snapshot_create_shared_buffer(
2266 snapshot_tar: &Path,
2267 archive_format: ArchiveFormat,
2268) -> SharedBuffer {
2269 let open_file = || {
2270 fs::File::open(snapshot_tar)
2271 .map_err(|err| {
2272 IoError::other(format!(
2273 "failed to open snapshot archive '{}': {err}",
2274 snapshot_tar.display(),
2275 ))
2276 })
2277 .unwrap()
2278 };
2279 match archive_format {
2280 ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
2281 ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
2282 ArchiveFormat::TarZstd { .. } => SharedBuffer::new(
2283 zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
2284 ),
2285 ArchiveFormat::TarLz4 => {
2286 SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
2287 }
2288 ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2289 }
2290}
2291
/// Unpacks the snapshot archive `snapshot_tar`, compressed as `archive_format`, into
/// `unpack_dir` and `account_paths` using `parallel_divisions` parallel workers.
///
/// # Errors
///
/// Returns an error if unpacking fails.
///
/// # Panics
///
/// Panics if the archive cannot be opened or `parallel_divisions` is zero.
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let snapshot_tar = snapshot_tar.as_ref();
    unpack_snapshot_local(
        untar_snapshot_create_shared_buffer(snapshot_tar, archive_format),
        unpack_dir,
        account_paths,
        parallel_divisions,
    )
}
2303
2304pub fn verify_unpacked_snapshots_dir_and_version(
2305 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2306) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2307 info!(
2308 "snapshot version: {}",
2309 &unpacked_snapshots_dir_and_version.snapshot_version
2310 );
2311
2312 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2313 let mut bank_snapshots =
2314 get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2315 if bank_snapshots.len() > 1 {
2316 return Err(IoError::other(format!(
2317 "invalid snapshot format: only one snapshot allowed, but found {}",
2318 bank_snapshots.len(),
2319 ))
2320 .into());
2321 }
2322 let root_paths = bank_snapshots.pop().ok_or_else(|| {
2323 IoError::other(format!(
2324 "no snapshots found in snapshots directory '{}'",
2325 unpacked_snapshots_dir_and_version
2326 .unpacked_snapshots_dir
2327 .display(),
2328 ))
2329 })?;
2330 Ok((snapshot_version, root_paths))
2331}
2332
2333pub fn get_snapshot_file_name(slot: Slot) -> String {
2335 slot.to_string()
2336}
2337
2338pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2340 bank_snapshots_dir
2341 .as_ref()
2342 .join(get_snapshot_file_name(slot))
2343}
2344
/// Selects how `verify_snapshot_archive` compares the serialized bank file.
#[derive(Clone, Copy, Debug)]
pub enum VerifyBank {
    /// The serialized bank files get no special treatment; they are compared as part of the
    /// directory comparison like every other file.
    Deterministic,
    /// The serialized bank files are compared via `compare_two_serialized_banks` and then
    /// removed, so they are excluded from the directory comparison.
    NonDeterministic,
}
2354
/// Unpacks `snapshot_archive` into a temporary directory and asserts that its contents match
/// the bank snapshot for `slot` stored under `snapshots_to_verify`.
///
/// This modifies `snapshots_to_verify`: the accounts hardlinks directory, the version file and
/// the state-complete file of the slot are removed when present, and with
/// `VerifyBank::NonDeterministic` the serialized bank file is removed as well.
///
/// # Panics
///
/// Panics if any filesystem operation or the unpacking fails, or if a comparison finds a
/// difference.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    let snapshots_to_verify = snapshots_to_verify.as_ref();
    let slot_name = slot.to_string();
    let unpacked_snapshots = unpack_dir.join("snapshots");
    let unpacked_slot_dir = unpacked_snapshots.join(&slot_name);
    let snapshot_slot_dir = snapshots_to_verify.join(&slot_name);

    // Account storage files referenced by the snapshot under test are gathered here, to be
    // compared against the unpacked account directory at the end.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    fs::create_dir_all(&storages_to_verify).unwrap();

    if matches!(verify_bank, VerifyBank::NonDeterministic) {
        // Compare the two bank files through the dedicated comparison, then delete both so the
        // directory comparison below does not see them.
        let expected_bank_file = snapshot_slot_dir.join(&slot_name);
        let unpacked_bank_file = unpacked_slot_dir.join(&slot_name);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(
            &expected_bank_file,
            &unpacked_bank_file
        )
        .unwrap());
        fs::remove_file(expected_bank_file).unwrap();
        fs::remove_file(unpacked_bank_file).unwrap();
    }

    // Move the unpacked status cache file from the top of the unpacked snapshots directory into
    // the slot directory.
    fs::rename(
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME),
        unpacked_slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
    )
    .unwrap();

    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        // Every entry is a symlink to a directory; copy the files of each link target into
        // `storages_to_verify`.
        for hardlink_entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_target = fs::read_link(hardlink_entry.unwrap().path()).unwrap();
            for storage_entry in fs::read_dir(&link_target).unwrap() {
                let storage_path = storage_entry.unwrap().path();
                let copy_path = storages_to_verify.join(storage_path.file_name().unwrap());
                fs::copy(storage_path, copy_path).unwrap();
            }
        }
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // Remove the version and state-complete files of the slot, when present, before comparing.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }
    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(snapshots_to_verify, &unpacked_snapshots).unwrap());

    // Best effort: drop the "accounts" directory inside the unpacked account directory. The
    // error is ignored on purpose, e.g. when that directory does not exist.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    assert!(!dir_diff::is_different(&storages_to_verify, &unpack_account_dir).unwrap());
}
2448
2449pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2451 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2452 purge_bank_snapshots(&bank_snapshots);
2453}
2454
2455pub fn purge_old_bank_snapshots(
2457 bank_snapshots_dir: impl AsRef<Path>,
2458 num_bank_snapshots_to_retain: usize,
2459 filter_by_kind: Option<BankSnapshotKind>,
2460) {
2461 let mut bank_snapshots = match filter_by_kind {
2462 Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2463 Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2464 None => get_bank_snapshots(&bank_snapshots_dir),
2465 };
2466
2467 bank_snapshots.sort_unstable();
2468 purge_bank_snapshots(
2469 bank_snapshots
2470 .iter()
2471 .rev()
2472 .skip(num_bank_snapshots_to_retain),
2473 );
2474}
2475
2476pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2481 purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2482 purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2483
2484 let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2485 if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2486 debug!(
2487 "Retained bank snapshot for slot {}, and purged the rest.",
2488 highest_bank_snapshot_post.slot
2489 );
2490 }
2491}
2492
2493pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2495 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2496 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2497 purge_bank_snapshots(&bank_snapshots);
2498}
2499
2500fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2504 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2505 if purge_bank_snapshot(snapshot_dir).is_err() {
2506 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2507 }
2508 }
2509}
2510
2511pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2513 const FN_ERR: &str = "failed to purge bank snapshot";
2514 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2515 if accounts_hardlinks_dir.is_dir() {
2516 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2519 IoError::other(format!(
2520 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2521 accounts_hardlinks_dir.display(),
2522 ))
2523 })?;
2524 for entry in read_dir {
2525 let accounts_hardlink_dir = entry?.path();
2526 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2527 IoError::other(format!(
2528 "{FN_ERR}: failed to read symlink '{}': {err}",
2529 accounts_hardlink_dir.display(),
2530 ))
2531 })?;
2532 move_and_async_delete_path(&accounts_hardlink_dir);
2533 }
2534 }
2535 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2536 IoError::other(format!(
2537 "{FN_ERR}: failed to remove dir '{}': {err}",
2538 bank_snapshot_dir.as_ref().display(),
2539 ))
2540 })?;
2541 Ok(())
2542}
2543
2544pub fn should_take_full_snapshot(
2545 block_height: Slot,
2546 full_snapshot_archive_interval_slots: Slot,
2547) -> bool {
2548 block_height % full_snapshot_archive_interval_slots == 0
2549}
2550
2551pub fn should_take_incremental_snapshot(
2552 block_height: Slot,
2553 incremental_snapshot_archive_interval_slots: Slot,
2554 latest_full_snapshot_slot: Option<Slot>,
2555) -> bool {
2556 block_height % incremental_snapshot_archive_interval_slots == 0
2557 && latest_full_snapshot_slot.is_some()
2558}
2559
/// Creates a temporary directory with accounts run and snapshot directories inside it.
///
/// Returns the temporary directory handle together with the first path reported by
/// `create_accounts_run_and_snapshot_dirs`. The handle is returned so the caller controls when
/// it is dropped.
///
/// # Panics
///
/// Panics if the temporary directory or the account directories cannot be created.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let (account_dir, ..) = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, account_dir)
}
2570
2571#[cfg(test)]
2572mod tests {
2573 use {
2574 super::*,
2575 assert_matches::assert_matches,
2576 bincode::{deserialize_from, serialize_into},
2577 std::{convert::TryFrom, mem::size_of},
2578 tempfile::NamedTempFile,
2579 };
2580
/// Serializing a payload whose size equals the cap succeeds and reports the payload size.
#[test]
fn test_serialize_snapshot_data_file_under_limit() {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    let size_limit = size_of::<u32>() as u64;

    let consumed_size =
        serialize_snapshot_data_file_capped(&data_file_path, size_limit, |stream| {
            serialize_into(stream, &2323_u32)?;
            Ok(())
        })
        .unwrap();

    assert_eq!(consumed_size, size_limit);
}
2596
/// Serializing a payload one byte larger than the cap fails with a "too large" I/O error.
#[test]
fn test_serialize_snapshot_data_file_over_limit() {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    let payload_size = size_of::<u32>() as u64;

    let result =
        serialize_snapshot_data_file_capped(&data_file_path, payload_size - 1, |stream| {
            serialize_into(stream, &2323_u32)?;
            Ok(())
        });

    assert_matches!(
        result,
        Err(SnapshotError::Io(ref message))
            if message.to_string().starts_with("too large snapshot data file to serialize")
    );
}
2611
/// A data file whose size equals the cap deserializes back to the value that was written.
#[test]
fn test_deserialize_snapshot_data_file_under_limit() {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    let expected_data = 2323_u32;
    let size_limit = size_of::<u32>() as u64;

    serialize_snapshot_data_file_capped(&data_file_path, size_limit, |stream| {
        serialize_into(stream, &expected_data)?;
        Ok(())
    })
    .unwrap();

    let snapshot_root_paths = SnapshotRootPaths {
        full_snapshot_root_file_path: data_file_path,
        incremental_snapshot_root_file_path: None,
    };
    let actual_data =
        deserialize_snapshot_data_files_capped(&snapshot_root_paths, size_limit, |streams| {
            Ok(deserialize_from::<_, u32>(
                &mut streams.full_snapshot_stream,
            )?)
        })
        .unwrap();

    assert_eq!(actual_data, expected_data);
}
2645
/// Deserializing a data file one byte larger than the cap fails with a "too large" I/O error.
#[test]
fn test_deserialize_snapshot_data_file_over_limit() {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    let expected_data = 2323_u32;
    let payload_size = size_of::<u32>() as u64;

    serialize_snapshot_data_file_capped(&data_file_path, payload_size, |stream| {
        serialize_into(stream, &expected_data)?;
        Ok(())
    })
    .unwrap();

    let snapshot_root_paths = SnapshotRootPaths {
        full_snapshot_root_file_path: data_file_path,
        incremental_snapshot_root_file_path: None,
    };
    let result = deserialize_snapshot_data_files_capped(
        &snapshot_root_paths,
        payload_size - 1,
        |streams| {
            Ok(deserialize_from::<_, u32>(
                &mut streams.full_snapshot_stream,
            )?)
        },
    );

    assert_matches!(
        result,
        Err(SnapshotError::Io(ref message))
            if message.to_string().starts_with("too large snapshot data file to deserialize")
    );
}
2678
/// A data file with bytes left over after deserialization is rejected as invalid.
#[test]
fn test_deserialize_snapshot_data_file_extra_data() {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    let expected_data = 2323_u32;
    // The file holds the value twice, but only one value is read back.
    let size_limit = 2 * size_of::<u32>() as u64;

    serialize_snapshot_data_file_capped(&data_file_path, size_limit, |stream| {
        serialize_into(stream.by_ref(), &expected_data)?;
        serialize_into(stream.by_ref(), &expected_data)?;
        Ok(())
    })
    .unwrap();

    let snapshot_root_paths = SnapshotRootPaths {
        full_snapshot_root_file_path: data_file_path,
        incremental_snapshot_root_file_path: None,
    };
    let result =
        deserialize_snapshot_data_files_capped(&snapshot_root_paths, size_limit, |streams| {
            Ok(deserialize_from::<_, u32>(
                &mut streams.full_snapshot_stream,
            )?)
        });

    assert_matches!(
        result,
        Err(SnapshotError::Io(ref message))
            if message.to_string().starts_with("invalid snapshot data file")
    );
}
2712
/// A version file holding the default snapshot version string is read back verbatim.
#[test]
fn test_snapshot_version_from_file_under_limit() {
    let expected_version = SnapshotVersion::default().as_str();
    let mut version_file = NamedTempFile::new().unwrap();
    version_file.write_all(expected_version.as_bytes()).unwrap();

    let actual_version = snapshot_version_from_file(version_file.path()).unwrap();
    assert_eq!(actual_version, expected_version);
}
2721
/// A version file one byte larger than the maximum allowed size is rejected.
#[test]
fn test_snapshot_version_from_file_over_limit() {
    let oversized_len = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
    let oversized_content = vec![7u8; oversized_len];
    let mut version_file = NamedTempFile::new().unwrap();
    version_file.write_all(&oversized_content).unwrap();

    let result = snapshot_version_from_file(version_file.path());
    assert_matches!(
        result,
        Err(SnapshotError::Io(ref message))
            if message.to_string().starts_with("snapshot version file too large")
    );
}
2733
/// Well-formed full snapshot archive file names parse into slot, hash and archive format;
/// names with a bad slot, hash or extension are rejected.
#[test]
fn test_parse_full_snapshot_archive_filename() {
    let valid_cases = [
        (42, "tar.bz2", ArchiveFormat::TarBzip2),
        (
            43,
            "tar.zst",
            ArchiveFormat::TarZstd {
                config: ZstdConfig::default(),
            },
        ),
        (44, "tar", ArchiveFormat::Tar),
        (45, "tar.lz4", ArchiveFormat::TarLz4),
    ];
    for (slot, extension, archive_format) in valid_cases {
        let filename = format!("snapshot-{}-{}.{}", slot, Hash::default(), extension);
        assert_eq!(
            parse_full_snapshot_archive_filename(&filename).unwrap(),
            (slot, SnapshotHash(Hash::default()), archive_format)
        );
    }

    let invalid_filenames = [
        "invalid".to_string(),
        "snapshot-bad!slot-bad!hash.bad!ext".to_string(),
        "snapshot-12345678-bad!hash.bad!ext".to_string(),
        format!("snapshot-12345678-{}.bad!ext", Hash::new_unique()),
        "snapshot-12345678-bad!hash.tar".to_string(),
        format!("snapshot-bad!slot-{}.bad!ext", Hash::new_unique()),
        format!("snapshot-12345678-{}.bad!ext", Hash::new_unique()),
        format!("snapshot-bad!slot-{}.tar", Hash::new_unique()),
        "snapshot-bad!slot-bad!hash.tar".to_string(),
        "snapshot-12345678-bad!hash.tar".to_string(),
        format!("snapshot-bad!slot-{}.tar", Hash::new_unique()),
    ];
    for filename in &invalid_filenames {
        assert!(
            parse_full_snapshot_archive_filename(filename).is_err(),
            "expected '{filename}' to be rejected"
        );
    }
}
2811
/// Well-formed incremental snapshot archive file names parse into base slot, slot, hash and
/// archive format; full snapshot names and names with a bad slot, hash or extension are
/// rejected.
#[test]
fn test_parse_incremental_snapshot_archive_filename() {
    let valid_cases = [
        (42, 123, "tar.bz2", ArchiveFormat::TarBzip2),
        (
            43,
            234,
            "tar.zst",
            ArchiveFormat::TarZstd {
                config: ZstdConfig::default(),
            },
        ),
        (44, 345, "tar", ArchiveFormat::Tar),
        (45, 456, "tar.lz4", ArchiveFormat::TarLz4),
    ];
    for (base_slot, slot, extension, archive_format) in valid_cases {
        let filename = format!(
            "incremental-snapshot-{}-{}-{}.{}",
            base_slot,
            slot,
            Hash::default(),
            extension
        );
        assert_eq!(
            parse_incremental_snapshot_archive_filename(&filename).unwrap(),
            (
                base_slot,
                slot,
                SnapshotHash(Hash::default()),
                archive_format
            )
        );
    }

    let invalid_filenames = [
        "invalid".to_string(),
        format!("snapshot-42-{}.tar", Hash::new_unique()),
        "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext".to_string(),
        format!(
            "incremental-snapshot-bad!slot-56785678-{}.tar",
            Hash::new_unique()
        ),
        format!(
            "incremental-snapshot-12345678-bad!slot-{}.tar",
            Hash::new_unique()
        ),
        "incremental-snapshot-12341234-56785678-bad!HASH.tar".to_string(),
        format!(
            "incremental-snapshot-12341234-56785678-{}.bad!ext",
            Hash::new_unique()
        ),
    ];
    for filename in &invalid_filenames {
        assert!(
            parse_incremental_snapshot_archive_filename(filename).is_err(),
            "expected '{filename}' to be rejected"
        );
    }
}
2898
/// A full snapshot is compatible with no incremental snapshot and with an incremental snapshot
/// based on its slot, but not with one based on a different slot.
#[test]
fn test_check_are_snapshots_compatible() {
    let slot1: Slot = 1234;
    let slot2: Slot = 5678;
    let slot3: Slot = 999_999;

    let new_incremental_archive_info = |base_slot: Slot, slot: Slot| {
        IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
            "/dir/incremental-snapshot-{}-{}-{}.tar",
            base_slot,
            slot,
            Hash::new_unique()
        )))
        .unwrap()
    };

    let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
        format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
    ))
    .unwrap();

    // No incremental snapshot at all.
    assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None).is_ok());

    // Incremental snapshot based on the full snapshot's slot.
    let matching_incremental = new_incremental_archive_info(slot1, slot2);
    assert!(check_are_snapshots_compatible(
        &full_snapshot_archive_info,
        Some(&matching_incremental)
    )
    .is_ok());

    // Incremental snapshot based on some other slot.
    let mismatched_incremental = new_incremental_archive_info(slot2, slot3);
    assert!(check_are_snapshots_compatible(
        &full_snapshot_archive_info,
        Some(&mismatched_incremental)
    )
    .is_err());
}
2942
2943 fn common_create_bank_snapshot_files(
2945 bank_snapshots_dir: &Path,
2946 min_slot: Slot,
2947 max_slot: Slot,
2948 ) {
2949 for slot in min_slot..max_slot {
2950 let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2951 fs::create_dir_all(&snapshot_dir).unwrap();
2952
2953 let snapshot_filename = get_snapshot_file_name(slot);
2954 let snapshot_path = snapshot_dir.join(snapshot_filename);
2955 fs::File::create(snapshot_path).unwrap();
2956
2957 let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2958 fs::File::create(status_cache_file).unwrap();
2959
2960 let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2961 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2962
2963 let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2965 fs::File::create(state_complete_path).unwrap();
2966 }
2967 }
2968
/// `get_bank_snapshots` finds one bank snapshot per created slot directory.
#[test]
fn test_get_bank_snapshots() {
    let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
    let bank_snapshots_dir = temp_snapshots_dir.path();
    let (min_slot, max_slot) = (10, 20);
    common_create_bank_snapshot_files(bank_snapshots_dir, min_slot, max_slot);

    let num_bank_snapshots = get_bank_snapshots(bank_snapshots_dir).len();
    assert_eq!(num_bank_snapshots as Slot, max_slot - min_slot);
}
2979
/// The highest "post" bank snapshot is the one for the last created slot.
#[test]
fn test_get_highest_bank_snapshot_post() {
    let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
    let bank_snapshots_dir = temp_snapshots_dir.path();
    let (min_slot, max_slot) = (99, 123);
    common_create_bank_snapshot_files(bank_snapshots_dir, min_slot, max_slot);

    // The slot range is half-open, so the highest created slot is `max_slot - 1`.
    let highest_slot =
        get_highest_bank_snapshot_post(bank_snapshots_dir).map(|bank_snapshot| bank_snapshot.slot);
    assert_eq!(highest_slot, Some(max_slot - 1));
}
2991
2992 fn common_create_snapshot_archive_files(
2998 full_snapshot_archives_dir: &Path,
2999 incremental_snapshot_archives_dir: &Path,
3000 min_full_snapshot_slot: Slot,
3001 max_full_snapshot_slot: Slot,
3002 min_incremental_snapshot_slot: Slot,
3003 max_incremental_snapshot_slot: Slot,
3004 ) {
3005 fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3006 fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3007 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3008 for incremental_snapshot_slot in
3009 min_incremental_snapshot_slot..max_incremental_snapshot_slot
3010 {
3011 let snapshot_filename = format!(
3012 "incremental-snapshot-{}-{}-{}.tar",
3013 full_snapshot_slot,
3014 incremental_snapshot_slot,
3015 Hash::default()
3016 );
3017 let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3018 fs::File::create(snapshot_filepath).unwrap();
3019 }
3020
3021 let snapshot_filename =
3022 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3023 let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3024 fs::File::create(snapshot_filepath).unwrap();
3025
3026 let bad_filename = format!(
3028 "incremental-snapshot-{}-{}-bad!hash.tar",
3029 full_snapshot_slot,
3030 max_incremental_snapshot_slot + 1,
3031 );
3032 let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3033 fs::File::create(bad_filepath).unwrap();
3034 }
3035
3036 let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3039 let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3040 fs::File::create(bad_filepath).unwrap();
3041 }
3042
/// Only the well-formed full snapshot archives are found, one per created slot.
#[test]
fn test_get_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (123, 456);
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_slot,
        max_slot,
        0,
        0,
    );

    let num_archives = get_full_snapshot_archives(full_snapshot_archives_dir.path()).len();
    assert_eq!(num_archives as Slot, max_slot - min_slot);
}
3061
/// Full snapshot archives placed in the download subdirectory are found and flagged as remote.
#[test]
fn test_get_full_snapshot_archives_remote() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (123, 456);

    let full_download_dir = full_snapshot_archives_dir
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    let incremental_download_dir = incremental_snapshot_archives_dir
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    common_create_snapshot_archive_files(
        &full_download_dir,
        &incremental_download_dir,
        min_slot,
        max_slot,
        0,
        0,
    );

    let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir.path());
    assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
    assert!(snapshot_archives
        .iter()
        .all(|archive_info| archive_info.is_remote()));
}
3085
/// Only the well-formed incremental snapshot archives are found: one per combination of full
/// snapshot slot and incremental snapshot slot.
#[test]
fn test_get_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_full_snapshot_slot, max_full_snapshot_slot) = (12, 23);
    let (min_incremental_snapshot_slot, max_incremental_snapshot_slot) = (34, 45);
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_full_snapshot_slot,
        max_full_snapshot_slot,
        min_incremental_snapshot_slot,
        max_incremental_snapshot_slot,
    );

    let num_full_slots = max_full_snapshot_slot - min_full_snapshot_slot;
    let num_incremental_slots = max_incremental_snapshot_slot - min_incremental_snapshot_slot;
    let num_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path()).len();
    assert_eq!(num_archives as Slot, num_full_slots * num_incremental_slots);
}
3111
/// Incremental snapshot archives placed in the download subdirectory are found and flagged as
/// remote.
#[test]
fn test_get_incremental_snapshot_archives_remote() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_full_snapshot_slot, max_full_snapshot_slot) = (12, 23);
    let (min_incremental_snapshot_slot, max_incremental_snapshot_slot) = (34, 45);

    let full_download_dir = full_snapshot_archives_dir
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    let incremental_download_dir = incremental_snapshot_archives_dir
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    common_create_snapshot_archive_files(
        &full_download_dir,
        &incremental_download_dir,
        min_full_snapshot_slot,
        max_full_snapshot_slot,
        min_incremental_snapshot_slot,
        max_incremental_snapshot_slot,
    );

    let num_full_slots = max_full_snapshot_slot - min_full_snapshot_slot;
    let num_incremental_slots = max_incremental_snapshot_slot - min_incremental_snapshot_slot;
    let incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert_eq!(
        incremental_snapshot_archives.len() as Slot,
        num_full_slots * num_incremental_slots
    );
    assert!(incremental_snapshot_archives
        .iter()
        .all(|archive_info| archive_info.is_remote()));
}
3144
/// The highest full snapshot archive slot is the last created slot; the malformed archive with
/// an even higher slot in its name does not count.
#[test]
fn test_get_highest_full_snapshot_archive_slot() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (123, 456);
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_slot,
        max_slot,
        0,
        0,
    );

    let highest_slot = get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path());
    assert_eq!(highest_slot, Some(max_slot - 1));
}
3165
/// Each full snapshot slot in the created range must report the same highest
/// incremental slot; the range's upper bound itself must report none.
#[test]
fn test_get_highest_incremental_snapshot_slot() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();
    let (full_slot_start, full_slot_end) = (12, 23);
    let (incremental_slot_start, incremental_slot_end) = (34, 45);

    common_create_snapshot_archive_files(
        full_archives_tmp.path(),
        incremental_archives_tmp.path(),
        full_slot_start,
        full_slot_end,
        incremental_slot_start,
        incremental_slot_end,
    );

    let highest_for = |base_slot: Slot| {
        get_highest_incremental_snapshot_archive_slot(incremental_archives_tmp.path(), base_slot)
    };

    let expected_highest = Some(incremental_slot_end - 1);
    for base_slot in full_slot_start..full_slot_end {
        assert_eq!(highest_for(base_slot), expected_highest);
    }

    // The upper bound of the full slot range yields no incremental archive.
    assert_eq!(highest_for(full_slot_end), None);
}
3201
3202 fn common_test_purge_old_snapshot_archives(
3203 snapshot_names: &[&String],
3204 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3205 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3206 expected_snapshots: &[&String],
3207 ) {
3208 let temp_snap_dir = tempfile::TempDir::new().unwrap();
3209
3210 for snap_name in snapshot_names {
3211 let snap_path = temp_snap_dir.path().join(snap_name);
3212 let mut _snap_file = fs::File::create(snap_path);
3213 }
3214 purge_old_snapshot_archives(
3215 temp_snap_dir.path(),
3216 temp_snap_dir.path(),
3217 maximum_full_snapshot_archives_to_retain,
3218 maximum_incremental_snapshot_archives_to_retain,
3219 );
3220
3221 let mut retained_snaps = HashSet::new();
3222 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3223 let entry_path_buf = entry.unwrap().path();
3224 let entry_path = entry_path_buf.as_path();
3225 let snapshot_name = entry_path
3226 .file_name()
3227 .unwrap()
3228 .to_str()
3229 .unwrap()
3230 .to_string();
3231 retained_snaps.insert(snapshot_name);
3232 }
3233
3234 for snap_name in expected_snapshots {
3235 assert!(
3236 retained_snaps.contains(snap_name.as_str()),
3237 "{snap_name} not found"
3238 );
3239 }
3240 assert_eq!(retained_snaps.len(), expected_snapshots.len());
3241 }
3242
/// With three full snapshot archives on disk, retaining N must keep the N with the
/// highest slots.
#[test]
fn test_purge_old_full_snapshot_archives() {
    let slot_1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
    let slot_3_name = format!("snapshot-3-{}.tar.zst", Hash::default());
    let slot_50_name = format!("snapshot-50-{}.tar.zst", Hash::default());
    // Ordered by ascending slot so that a suffix is "the newest N".
    let all_names = [&slot_1_name, &slot_3_name, &slot_50_name];

    let cases = [
        // only the newest is retained
        (1, &all_names[2..]),
        // the two newest are retained
        (2, &all_names[1..]),
        // all three are retained
        (3, &all_names[..]),
    ];
    for (retain_count, expected_names) in cases {
        common_test_purge_old_snapshot_archives(
            &all_names,
            NonZeroUsize::new(retain_count).unwrap(),
            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
            expected_names,
        );
    }
}
3277
/// Creates one full snapshot archive per slot in a loop and purges periodically,
/// checking after every purge that exactly the newest archives remain on disk.
#[test]
fn test_purge_old_full_snapshot_archives_in_the_loop() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();
    let retain_limit = NonZeroUsize::new(5).unwrap();
    let retain_count = retain_limit.get() as Slot;
    let first_slot: Slot = 42;

    for slot in first_slot..first_slot + 100 {
        let archive_name = format!("snapshot-{}-{}.tar", slot, Hash::default());
        fs::File::create(full_archives_tmp.path().join(archive_name)).unwrap();

        // Skip the purge-and-check until more than `retain_count` archives exist, and
        // afterwards only purge on every (2 * retain_count)-th slot so there is always
        // something to purge.
        let enough_archives = slot >= first_slot + retain_count;
        let purge_due = slot % (retain_count * 2) == 0;
        if !(enough_archives && purge_due) {
            continue;
        }

        purge_old_snapshot_archives(
            &full_archives_tmp,
            &incremental_archives_tmp,
            retain_limit,
            NonZeroUsize::new(usize::MAX).unwrap(),
        );

        let mut remaining_archives = get_full_snapshot_archives(&full_archives_tmp);
        remaining_archives.sort_unstable();
        assert_eq!(remaining_archives.len(), retain_limit.get());
        assert_eq!(remaining_archives.last().unwrap().slot(), slot);

        // Walking newest to oldest, the slots must count down by one from `slot`.
        for (age, archive) in remaining_archives.iter().rev().enumerate() {
            assert_eq!(archive.slot(), slot - age as Slot);
        }
    }
}
3325
/// Creates twice as many full snapshot archives as will be retained, each with a run
/// of incremental snapshot archives, purges once, and then verifies which archives
/// survive:
/// - the configured maximum number of full snapshot archives,
/// - for each retained full snapshot other than the latest, exactly one incremental
///   archive (the one with the highest slot for that base),
/// - for the latest full snapshot, the configured maximum number of incremental
///   archives with the highest slots.
#[test]
fn test_purge_old_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let starting_slot = 100_000;

    let maximum_incremental_snapshot_archives_to_retain =
        DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
    let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;

    let incremental_snapshot_interval = 100;
    let num_incremental_snapshots_per_full_snapshot =
        maximum_incremental_snapshot_archives_to_retain.get() * 2;
    let full_snapshot_interval =
        incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;

    // Create twice as many full snapshot archives as will be retained, so the purge has
    // something to remove.
    let num_full_snapshots = maximum_full_snapshot_archives_to_retain
        .checked_mul(NonZeroUsize::new(2).unwrap())
        .unwrap()
        .get();
    for full_snapshot_slot in (starting_slot..)
        .step_by(full_snapshot_interval)
        .take(num_full_snapshots)
    {
        let snapshot_filename =
            format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
        let snapshot_path = full_snapshot_archives_dir.path().join(snapshot_filename);
        fs::File::create(snapshot_path).unwrap();

        // `skip(1)` drops the first step, which would land on the full snapshot's own
        // slot.
        for incremental_snapshot_slot in (full_snapshot_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(1)
        {
            let snapshot_filename = format!(
                "incremental-snapshot-{}-{}-{}.tar",
                full_snapshot_slot,
                incremental_snapshot_slot,
                Hash::default()
            );
            let snapshot_path = incremental_snapshot_archives_dir
                .path()
                .join(snapshot_filename);
            fs::File::create(snapshot_path).unwrap();
        }
    }

    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
    );

    // The number of retained full snapshot archives must equal the configured maximum.
    let mut remaining_full_snapshot_archives =
        get_full_snapshot_archives(full_snapshot_archives_dir.path());
    assert_eq!(
        remaining_full_snapshot_archives.len(),
        maximum_full_snapshot_archives_to_retain.get(),
    );
    remaining_full_snapshot_archives.sort_unstable();
    let latest_full_snapshot_archive_slot =
        remaining_full_snapshot_archives.last().unwrap().slot();

    // Expected incremental count: the configured maximum for the latest full snapshot,
    // plus one for each of the other retained full snapshots.
    let mut remaining_incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert_eq!(
        remaining_incremental_snapshot_archives.len(),
        maximum_incremental_snapshot_archives_to_retain
            .get()
            .saturating_add(
                maximum_full_snapshot_archives_to_retain
                    .get()
                    .saturating_sub(1)
            )
    );
    // Sort descending so that `pop()` yields the archives in ascending order.
    remaining_incremental_snapshot_archives.sort_unstable();
    remaining_incremental_snapshot_archives.reverse();

    // Walk the older retained full snapshots from oldest to newest; each must have
    // exactly one surviving incremental archive, at the last slot created for it.
    for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
        let incremental_snapshot_archive =
            remaining_incremental_snapshot_archives.pop().unwrap();

        let expected_base_slot =
            latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
        let expected_slot = expected_base_slot
            + (full_snapshot_interval - incremental_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
    }

    // Everything still in the list must be based on the latest full snapshot.
    for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
        assert_eq!(
            incremental_snapshot_archive.base_slot(),
            latest_full_snapshot_archive_slot
        );
    }

    // Of the incremental archives created for the latest full snapshot, only the
    // newest `maximum_incremental_snapshot_archives_to_retain` slots should remain.
    let expected_remaining_incremental_snapshot_archive_slots =
        (latest_full_snapshot_archive_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(
                num_incremental_snapshots_per_full_snapshot
                    - maximum_incremental_snapshot_archives_to_retain.get(),
            )
            .collect::<HashSet<_>>();

    let actual_remaining_incremental_snapshot_archive_slots =
        remaining_incremental_snapshot_archives
            .iter()
            .map(|snapshot| snapshot.slot())
            .collect::<HashSet<_>>();
    assert_eq!(
        actual_remaining_incremental_snapshot_archive_slots,
        expected_remaining_incremental_snapshot_archive_slots
    );
}
3456
/// With no full snapshot archives on disk, purging must remove every incremental
/// snapshot archive, even when the retention limits are `usize::MAX`.
#[test]
fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
    let full_archives_tmp = tempfile::TempDir::new().unwrap();
    let incremental_archives_tmp = tempfile::TempDir::new().unwrap();

    // Incremental archives for two base slots (100 and 200); the full snapshot archives
    // dir is left empty.
    let orphaned_archive_names = [
        format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
    ];
    orphaned_archive_names.iter().for_each(|archive_name| {
        fs::File::create(incremental_archives_tmp.path().join(archive_name)).unwrap();
    });

    let unlimited = NonZeroUsize::new(usize::MAX).unwrap();
    purge_old_snapshot_archives(
        full_archives_tmp.path(),
        incremental_archives_tmp.path(),
        unlimited,
        unlimited,
    );

    let leftover_archives = get_incremental_snapshot_archives(incremental_archives_tmp.path());
    assert!(leftover_archives.is_empty());
}
3489
/// An append-vec path inside the temp accounts dir must resolve to a hardlink dir,
/// while the same file name placed two directory levels higher must fail with
/// `GetAccountPath`.
#[test]
fn test_get_snapshot_accounts_hardlink_dir() {
    let slot: Slot = 1;
    let mut seen_account_paths = HashSet::<PathBuf>::new();

    // <tmp>/<slot>/<SNAPSHOT_ACCOUNTS_HARDLINKS>
    let bank_snapshots_tmp = tempfile::TempDir::new().unwrap();
    let hardlinks_dir = bank_snapshots_tmp
        .path()
        .join(slot.to_string())
        .join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    fs::create_dir_all(&hardlinks_dir).unwrap();

    let (_accounts_tmp, accounts_dir) = create_tmp_accounts_dir_for_tests();
    let appendvec_path = accounts_dir.join(format!("{slot}.0"));

    let good_result = get_snapshot_accounts_hardlink_dir(
        &appendvec_path,
        slot,
        &mut seen_account_paths,
        &hardlinks_dir,
    );
    assert!(good_result.is_ok());

    // Same file name, but rooted at the grandparent of the original location.
    let appendvec_file_name = appendvec_path.file_name().unwrap();
    let grandparent_dir = appendvec_path.parent().unwrap().parent().unwrap();
    let misplaced_appendvec_path = grandparent_dir.join(appendvec_file_name);

    let bad_result = get_snapshot_accounts_hardlink_dir(
        &misplaced_appendvec_path,
        slot,
        &mut seen_account_paths,
        hardlinks_dir,
    );
    assert_matches!(
        bad_result,
        Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
    );
}
3531
/// A slot written to the full-snapshot-slot file must read back unchanged.
#[test]
fn test_full_snapshot_slot_file_good() {
    let bank_snapshot_tmp = TempDir::new().unwrap();
    let expected_slot = 123_456_789;

    write_full_snapshot_slot_file(&bank_snapshot_tmp, expected_slot).unwrap();
    let actual_slot = read_full_snapshot_slot_file(&bank_snapshot_tmp).unwrap();

    assert_eq!(actual_slot, expected_slot);
}
3541
/// A full-snapshot-slot file that is one byte shorter or one byte longer than a
/// `Slot` must be rejected with an "invalid ... file size" error.
#[test]
fn test_full_snapshot_slot_file_bad() {
    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
    let one_byte_short = [1u8; SLOT_SIZE - 1];
    let one_byte_long = [1u8; SLOT_SIZE + 1];

    for bad_contents in [&one_byte_short[..], &one_byte_long[..]] {
        let bank_snapshot_tmp = TempDir::new().unwrap();
        let slot_file_path = bank_snapshot_tmp
            .path()
            .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
        fs::write(slot_file_path, bad_contents).unwrap();

        let error_message = read_full_snapshot_slot_file(&bank_snapshot_tmp)
            .unwrap_err()
            .to_string();
        assert!(error_message.starts_with("invalid full snapshot slot file size"));
    }
}
3561}