1use {
2 crate::{
3 bank::{BankFieldsToSerialize, BankSlotDelta},
4 serde_snapshot::{
5 self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6 },
7 snapshot_archive_info::{
8 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9 SnapshotArchiveInfoGetter,
10 },
11 snapshot_bank_utils,
12 snapshot_config::SnapshotConfig,
13 snapshot_hash::SnapshotHash,
14 snapshot_package::{SnapshotKind, SnapshotPackage},
15 snapshot_utils::snapshot_storage_rebuilder::{
16 RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17 },
18 },
19 bzip2::bufread::BzDecoder,
20 crossbeam_channel::Sender,
21 flate2::read::GzDecoder,
22 lazy_static::lazy_static,
23 log::*,
24 regex::Regex,
25 solana_accounts_db::{
26 account_storage::{meta::StoredMetaWriteVersion, AccountStorageMap},
27 accounts_db::{stats::BankHashStats, AccountStorageEntry, AtomicAccountsFileId},
28 accounts_file::{AccountsFile, AccountsFileError, InternalsForArchive, StorageAccess},
29 accounts_hash::{AccountsDeltaHash, AccountsHash},
30 epoch_accounts_hash::EpochAccountsHash,
31 hardened_unpack::{self, ParallelSelector, UnpackError},
32 shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34 },
35 solana_measure::{measure::Measure, measure_time, measure_us},
36 solana_sdk::{
37 clock::{Epoch, Slot},
38 hash::Hash,
39 },
40 std::{
41 cmp::Ordering,
42 collections::{HashMap, HashSet},
43 fmt, fs,
44 io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45 mem,
46 num::NonZeroUsize,
47 ops::RangeInclusive,
48 path::{Path, PathBuf},
49 process::ExitStatus,
50 str::FromStr,
51 sync::Arc,
52 thread::{Builder, JoinHandle},
53 },
54 tar::{self, Archive},
55 tempfile::TempDir,
56 thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60 hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61 solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
/// File name of the serialized status cache inside a bank snapshot dir.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// File name of the snapshot version file.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Marker file whose presence means a bank snapshot dir was completely written.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Directory inside a bank snapshot dir whose entries are symlinks to account
/// snapshot (hard link) directories.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Directory name used for downloaded ("remote") snapshot archives.
/// NOTE(review): inferred from the name; confirm against call sites.
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// File inside a bank snapshot dir that records, as little-endian bytes, the
/// slot of the full snapshot archive the bank snapshot corresponds to.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// Size limit for snapshot data files: 32 GiB.
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024;
/// Size limit for the snapshot version file, in bytes.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;
/// Version string of `SnapshotVersion::V1_2_0`.
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Prefix of the temporary files/dirs created while building a snapshot archive.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// Extension of a "pre" bank snapshot file.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
// `NonZeroUsize::new` is a `const fn`, so these defaults need no `unsafe`.
// The `None` arms are unreachable; if they were ever taken, const evaluation
// would panic and the crate would fail to compile rather than misbehave.
/// Default number of full snapshot archives to retain.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(2) {
        Some(retain) => retain,
        None => panic!("DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Default number of incremental snapshot archives to retain.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(4) {
        Some(retain) => retain,
        None => panic!("DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Matches full snapshot archive names: `snapshot-<slot>-<hash>.<ext>`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Matches incremental snapshot archive names:
/// `incremental-snapshot-<base slot>-<slot>-<hash>.<ext>`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
89
/// On-disk snapshot format versions.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SnapshotVersion {
    /// Version `1.2.0`; this is the default.
    #[default]
    V1_2_0,
}
95
96impl fmt::Display for SnapshotVersion {
97 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98 f.write_str(From::from(*self))
99 }
100}
101
102impl From<SnapshotVersion> for &'static str {
103 fn from(snapshot_version: SnapshotVersion) -> &'static str {
104 match snapshot_version {
105 SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
106 }
107 }
108}
109
110impl FromStr for SnapshotVersion {
111 type Err = &'static str;
112
113 fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
114 let version_string = if version_string
116 .get(..1)
117 .map_or(false, |s| s.eq_ignore_ascii_case("v"))
118 {
119 &version_string[1..]
120 } else {
121 version_string
122 };
123 match version_string {
124 VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
125 _ => Err("unsupported snapshot version"),
126 }
127 }
128}
129
130impl SnapshotVersion {
131 pub fn as_str(self) -> &'static str {
132 <&str as From<Self>>::from(self)
133 }
134}
135
136#[derive(PartialEq, Eq, Debug)]
139pub struct BankSnapshotInfo {
140 pub slot: Slot,
142 pub snapshot_kind: BankSnapshotKind,
144 pub snapshot_dir: PathBuf,
146 pub snapshot_version: SnapshotVersion,
148}
149
150impl PartialOrd for BankSnapshotInfo {
151 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
152 Some(self.cmp(other))
153 }
154}
155
156impl Ord for BankSnapshotInfo {
158 fn cmp(&self, other: &Self) -> Ordering {
159 self.slot.cmp(&other.slot)
160 }
161}
162
163impl BankSnapshotInfo {
164 pub fn new_from_dir(
165 bank_snapshots_dir: impl AsRef<Path>,
166 slot: Slot,
167 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
168 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
171
172 if !bank_snapshot_dir.is_dir() {
173 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
174 bank_snapshot_dir,
175 ));
176 }
177
178 if !is_bank_snapshot_complete(&bank_snapshot_dir) {
184 return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
185 }
186
187 let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
188 if !status_cache_file.is_file() {
189 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
190 status_cache_file,
191 ));
192 }
193
194 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
195 let version_str = snapshot_version_from_file(&version_path).or(Err(
196 SnapshotNewFromDirError::MissingVersionFile(version_path),
197 ))?;
198 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
199 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
200
201 let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
202 let bank_snapshot_pre_path =
203 bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
204
205 let snapshot_kind = if bank_snapshot_pre_path.is_file() {
214 BankSnapshotKind::Pre
215 } else if bank_snapshot_post_path.is_file() {
216 BankSnapshotKind::Post
217 } else {
218 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
219 bank_snapshot_dir,
220 ));
221 };
222
223 Ok(BankSnapshotInfo {
224 slot,
225 snapshot_kind,
226 snapshot_dir: bank_snapshot_dir,
227 snapshot_version,
228 })
229 }
230
231 pub fn snapshot_path(&self) -> PathBuf {
232 let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
233
234 let ext = match self.snapshot_kind {
235 BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
236 BankSnapshotKind::Post => "",
237 };
238 bank_snapshot_path.set_extension(ext);
239
240 bank_snapshot_path
241 }
242}
243
/// Which bank snapshot file variant a bank snapshot dir holds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BankSnapshotKind {
    /// The bank snapshot file carries the `pre` extension.
    Pre,
    /// The bank snapshot file has no `pre` extension.
    Post,
}
259
/// Kind of snapshot source: a snapshot `Archive` or a bank snapshot `Dir`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum SnapshotFrom {
    /// A snapshot archive.
    Archive,
    /// A bank snapshot directory.
    Dir,
}
270
/// Root file paths of a full snapshot and, optionally, an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root file of the full snapshot.
    pub full_snapshot_root_file_path: PathBuf,
    /// Root file of the incremental snapshot, when there is one.
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
278
279#[derive(Debug)]
281pub struct UnarchivedSnapshot {
282 #[allow(dead_code)]
283 unpack_dir: TempDir,
284 pub storage: AccountStorageMap,
285 pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
286 pub measure_untar: Measure,
287}
288
289#[derive(Debug)]
291pub struct UnpackedSnapshotsDirAndVersion {
292 pub unpacked_snapshots_dir: PathBuf,
293 pub snapshot_version: SnapshotVersion,
294}
295
296pub(crate) struct StorageAndNextAccountsFileId {
299 pub storage: AccountStorageMap,
300 pub next_append_vec_id: AtomicAccountsFileId,
301}
302
303#[derive(Error, Debug)]
304#[allow(clippy::large_enum_variant)]
305pub enum SnapshotError {
306 #[error("I/O error: {0}")]
307 Io(#[from] IoError),
308
309 #[error("AccountsFile error: {0}")]
310 AccountsFileError(#[from] AccountsFileError),
311
312 #[error("serialization error: {0}")]
313 Serialize(#[from] bincode::Error),
314
315 #[error("crossbeam send error: {0}")]
316 CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),
317
318 #[error("archive generation failure {0}")]
319 ArchiveGenerationFailure(ExitStatus),
320
321 #[error("Unpack error: {0}")]
322 UnpackError(#[from] UnpackError),
323
324 #[error("source({1}) - I/O error: {0}")]
325 IoWithSource(IoError, &'static str),
326
327 #[error("could not get file name from path '{0}'")]
328 PathToFileNameError(PathBuf),
329
330 #[error("could not get str from file name '{0}'")]
331 FileNameToStrError(PathBuf),
332
333 #[error("could not parse snapshot archive's file name '{0}'")]
334 ParseSnapshotArchiveFileNameError(String),
335
336 #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
337 MismatchedBaseSlot(Slot, Slot),
338
339 #[error("no snapshot archives to load from '{0}'")]
340 NoSnapshotArchives(PathBuf),
341
342 #[error("snapshot has mismatch: deserialized bank: {0:?}, snapshot archive info: {1:?}")]
343 MismatchedSlotHash((Slot, SnapshotHash), (Slot, SnapshotHash)),
344
345 #[error("snapshot slot deltas are invalid: {0}")]
346 VerifySlotDeltas(#[from] VerifySlotDeltasError),
347
348 #[error("snapshot epoch stakes are invalid: {0}")]
349 VerifyEpochStakes(#[from] VerifyEpochStakesError),
350
351 #[error("bank_snapshot_info new_from_dir failed: {0}")]
352 NewFromDir(#[from] SnapshotNewFromDirError),
353
354 #[error("invalid snapshot dir path '{0}'")]
355 InvalidSnapshotDirPath(PathBuf),
356
357 #[error("invalid AppendVec path '{0}'")]
358 InvalidAppendVecPath(PathBuf),
359
360 #[error("invalid account path '{0}'")]
361 InvalidAccountPath(PathBuf),
362
363 #[error("no valid snapshot dir found under '{0}'")]
364 NoSnapshotSlotDir(PathBuf),
365
366 #[error("snapshot dir account paths mismatching")]
367 AccountPathsMismatch,
368
369 #[error("failed to add bank snapshot for slot {1}: {0}")]
370 AddBankSnapshot(#[source] AddBankSnapshotError, Slot),
371
372 #[error("failed to archive snapshot package: {0}")]
373 ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
374
375 #[error("failed to rebuild snapshot storages: {0}")]
376 RebuildStorages(String),
377}
378
379#[derive(Error, Debug)]
380pub enum SnapshotNewFromDirError {
381 #[error("invalid bank snapshot directory '{0}'")]
382 InvalidBankSnapshotDir(PathBuf),
383
384 #[error("missing status cache file '{0}'")]
385 MissingStatusCacheFile(PathBuf),
386
387 #[error("missing version file '{0}'")]
388 MissingVersionFile(PathBuf),
389
390 #[error("invalid snapshot version '{0}'")]
391 InvalidVersion(String),
392
393 #[error("snapshot directory incomplete '{0}'")]
394 IncompleteDir(PathBuf),
395
396 #[error("missing snapshot file '{0}'")]
397 MissingSnapshotFile(PathBuf),
398}
399
400pub type Result<T> = std::result::Result<T, SnapshotError>;
401
402#[derive(Error, Debug, PartialEq, Eq)]
404pub enum VerifySlotDeltasError {
405 #[error("too many entries: {0} (max: {1})")]
406 TooManyEntries(usize, usize),
407
408 #[error("slot {0} is not a root")]
409 SlotIsNotRoot(Slot),
410
411 #[error("slot {0} is greater than bank slot {1}")]
412 SlotGreaterThanMaxRoot(Slot, Slot),
413
414 #[error("slot {0} has multiple entries")]
415 SlotHasMultipleEntries(Slot),
416
417 #[error("slot {0} was not found in slot history")]
418 SlotNotFoundInHistory(Slot),
419
420 #[error("slot {0} was in history but missing from slot deltas")]
421 SlotNotFoundInDeltas(Slot),
422
423 #[error("slot history is bad and cannot be used to verify slot deltas")]
424 BadSlotHistory,
425}
426
427#[derive(Error, Debug, PartialEq, Eq)]
429pub enum VerifyEpochStakesError {
430 #[error("epoch {0} is greater than the max {1}")]
431 EpochGreaterThanMax(Epoch, Epoch),
432
433 #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
434 StakesNotFound(Epoch, RangeInclusive<Epoch>),
435}
436
437#[derive(Error, Debug)]
439pub enum AddBankSnapshotError {
440 #[error("bank snapshot dir already exists '{0}'")]
441 SnapshotDirAlreadyExists(PathBuf),
442
443 #[error("failed to create snapshot dir '{1}': {0}")]
444 CreateSnapshotDir(#[source] IoError, PathBuf),
445
446 #[error("failed to flush storage '{1}': {0}")]
447 FlushStorage(#[source] AccountsFileError, PathBuf),
448
449 #[error("failed to hard link storages: {0}")]
450 HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),
451
452 #[error("failed to serialize bank: {0}")]
453 SerializeBank(#[source] Box<SnapshotError>),
454
455 #[error("failed to serialize status cache: {0}")]
456 SerializeStatusCache(#[source] Box<SnapshotError>),
457
458 #[error("failed to write snapshot version file '{1}': {0}")]
459 WriteSnapshotVersionFile(#[source] IoError, PathBuf),
460
461 #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
462 CreateStateCompleteFile(#[source] IoError, PathBuf),
463}
464
465#[derive(Error, Debug)]
467pub enum ArchiveSnapshotPackageError {
468 #[error("failed to create archive path '{1}': {0}")]
469 CreateArchiveDir(#[source] IoError, PathBuf),
470
471 #[error("failed to create staging dir inside '{1}': {0}")]
472 CreateStagingDir(#[source] IoError, PathBuf),
473
474 #[error("failed to create snapshot staging dir '{1}': {0}")]
475 CreateSnapshotStagingDir(#[source] IoError, PathBuf),
476
477 #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
478 CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),
479
480 #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
481 SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),
482
483 #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
484 SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),
485
486 #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
487 SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),
488
489 #[error("failed to create archive file '{1}': {0}")]
490 CreateArchiveFile(#[source] IoError, PathBuf),
491
492 #[error("failed to archive version file: {0}")]
493 ArchiveVersionFile(#[source] IoError),
494
495 #[error("failed to archive snapshots dir: {0}")]
496 ArchiveSnapshotsDir(#[source] IoError),
497
498 #[error("failed to archive account storage file '{1}': {0}")]
499 ArchiveAccountStorageFile(#[source] IoError, PathBuf),
500
501 #[error("failed to archive snapshot: {0}")]
502 FinishArchive(#[source] IoError),
503
504 #[error("failed to create encoder: {0}")]
505 CreateEncoder(#[source] IoError),
506
507 #[error("failed to encode archive: {0}")]
508 FinishEncoder(#[source] IoError),
509
510 #[error("failed to query archive metadata '{1}': {0}")]
511 QueryArchiveMetadata(#[source] IoError, PathBuf),
512
513 #[error("failed to move archive from '{1}' to '{2}': {0}")]
514 MoveArchive(#[source] IoError, PathBuf, PathBuf),
515}
516
517#[derive(Error, Debug)]
519pub enum HardLinkStoragesToSnapshotError {
520 #[error("failed to create accounts hard links dir '{1}': {0}")]
521 CreateAccountsHardLinksDir(#[source] IoError, PathBuf),
522
523 #[error("failed to get the snapshot's accounts hard link dir: {0}")]
524 GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),
525
526 #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
527 HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
528}
529
530#[derive(Error, Debug)]
532pub enum GetSnapshotAccountsHardLinkDirError {
533 #[error("invalid account storage path '{0}'")]
534 GetAccountPath(PathBuf),
535
536 #[error("failed to create the snapshot hard link dir '{1}': {0}")]
537 CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),
538
539 #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
540 SymlinkSnapshotHardLinkDir {
541 source: IoError,
542 original: PathBuf,
543 link: PathBuf,
544 },
545}
546
547pub fn clean_orphaned_account_snapshot_dirs(
555 bank_snapshots_dir: impl AsRef<Path>,
556 account_snapshot_paths: &[PathBuf],
557) -> IoResult<()> {
558 let mut account_snapshot_dirs_referenced = HashSet::new();
561 let snapshots = get_bank_snapshots(bank_snapshots_dir);
562 for snapshot in snapshots {
563 let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
564 let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
566 IoError::other(format!(
567 "failed to read account hardlinks dir '{}': {err}",
568 account_hardlinks_dir.display(),
569 ))
570 })?;
571 for entry in read_dir {
572 let path = entry?.path();
573 let target = fs::read_link(&path).map_err(|err| {
574 IoError::other(format!(
575 "failed to read symlink '{}': {err}",
576 path.display(),
577 ))
578 })?;
579 account_snapshot_dirs_referenced.insert(target);
580 }
581 }
582
583 for account_snapshot_path in account_snapshot_paths {
585 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
586 IoError::other(format!(
587 "failed to read account snapshot dir '{}': {err}",
588 account_snapshot_path.display(),
589 ))
590 })?;
591 for entry in read_dir {
592 let path = entry?.path();
593 if !account_snapshot_dirs_referenced.contains(&path) {
594 info!(
595 "Removing orphaned account snapshot hardlink directory '{}'...",
596 path.display()
597 );
598 move_and_async_delete_path(&path);
599 }
600 }
601 }
602
603 Ok(())
604}
605
606pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
608 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
609 return;
611 };
612
613 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
614
615 let incomplete_dirs: Vec<_> = read_dir_iter
616 .filter_map(|entry| entry.ok())
617 .map(|entry| entry.path())
618 .filter(|path| path.is_dir())
619 .filter(is_incomplete)
620 .collect();
621
622 for incomplete_dir in incomplete_dirs {
624 let result = purge_bank_snapshot(&incomplete_dir);
625 match result {
626 Ok(_) => info!(
627 "Purged incomplete snapshot dir: {}",
628 incomplete_dir.display()
629 ),
630 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
631 }
632 }
633}
634
635fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
637 let state_complete_path = bank_snapshot_dir
638 .as_ref()
639 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
640 state_complete_path.is_file()
641}
642
643pub fn write_full_snapshot_slot_file(
645 bank_snapshot_dir: impl AsRef<Path>,
646 full_snapshot_slot: Slot,
647) -> IoResult<()> {
648 let full_snapshot_slot_path = bank_snapshot_dir
649 .as_ref()
650 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
651 fs::write(
652 &full_snapshot_slot_path,
653 Slot::to_le_bytes(full_snapshot_slot),
654 )
655 .map_err(|err| {
656 IoError::other(format!(
657 "failed to write full snapshot slot file '{}': {err}",
658 full_snapshot_slot_path.display(),
659 ))
660 })
661}
662
663pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
665 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
666 let full_snapshot_slot_path = bank_snapshot_dir
667 .as_ref()
668 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
669 let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
670 if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
671 let error_message = format!(
672 "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
673 full_snapshot_slot_path.display(),
674 full_snapshot_slot_file_metadata.len(),
675 SLOT_SIZE,
676 );
677 return Err(IoError::other(error_message));
678 }
679 let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
680 let mut buffer = [0; SLOT_SIZE];
681 full_snapshot_slot_file.read_exact(&mut buffer)?;
682 let slot = Slot::from_le_bytes(buffer);
683 Ok(slot)
684}
685
686pub fn get_highest_loadable_bank_snapshot(
693 snapshot_config: &SnapshotConfig,
694) -> Option<BankSnapshotInfo> {
695 let highest_bank_snapshot =
696 get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
697
698 if !snapshot_config.should_generate_snapshots() {
701 return Some(highest_bank_snapshot);
702 }
703
704 let highest_full_snapshot_archive_slot =
707 get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
708 let full_snapshot_file_slot =
709 read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
710 (full_snapshot_file_slot == highest_full_snapshot_archive_slot).then_some(highest_bank_snapshot)
711}
712
713pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
716 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
717 for entry in entries.flatten() {
718 if entry
719 .file_name()
720 .to_str()
721 .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
722 .unwrap_or(false)
723 {
724 let path = entry.path();
725 let result = if path.is_dir() {
726 fs::remove_dir_all(&path)
727 } else {
728 fs::remove_file(&path)
729 };
730 if let Err(err) = result {
731 warn!(
732 "Failed to remove temporary snapshot archive '{}': {err}",
733 path.display(),
734 );
735 }
736 }
737 }
738 }
739}
740
741pub fn serialize_and_archive_snapshot_package(
743 snapshot_package: SnapshotPackage,
744 snapshot_config: &SnapshotConfig,
745) -> Result<SnapshotArchiveInfo> {
746 let SnapshotPackage {
747 snapshot_kind,
748 slot: snapshot_slot,
749 block_height,
750 hash: snapshot_hash,
751 mut snapshot_storages,
752 status_cache_slot_deltas,
753 bank_fields_to_serialize,
754 bank_hash_stats,
755 accounts_delta_hash,
756 accounts_hash,
757 epoch_accounts_hash,
758 bank_incremental_snapshot_persistence,
759 write_version,
760 enqueued: _,
761 } = snapshot_package;
762
763 let bank_snapshot_info = serialize_snapshot(
764 &snapshot_config.bank_snapshots_dir,
765 snapshot_config.snapshot_version,
766 snapshot_storages.as_slice(),
767 status_cache_slot_deltas.as_slice(),
768 bank_fields_to_serialize,
769 bank_hash_stats,
770 accounts_delta_hash,
771 accounts_hash,
772 epoch_accounts_hash,
773 bank_incremental_snapshot_persistence.as_ref(),
774 write_version,
775 )?;
776
777 let full_snapshot_archive_slot = match snapshot_kind {
779 SnapshotKind::FullSnapshot => snapshot_slot,
780 SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
781 };
782 write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
783 .map_err(|err| {
784 IoError::other(format!(
785 "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
786 ))
787 })?;
788
789 let snapshot_archive_path = match snapshot_package.snapshot_kind {
790 SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
791 &snapshot_config.full_snapshot_archives_dir,
792 snapshot_package.slot,
793 &snapshot_package.hash,
794 snapshot_config.archive_format,
795 ),
796 SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
797 snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
800 build_incremental_snapshot_archive_path(
801 &snapshot_config.incremental_snapshot_archives_dir,
802 incremental_snapshot_base_slot,
803 snapshot_package.slot,
804 &snapshot_package.hash,
805 snapshot_config.archive_format,
806 )
807 }
808 };
809
810 let snapshot_archive_info = archive_snapshot(
811 snapshot_kind,
812 snapshot_slot,
813 snapshot_hash,
814 snapshot_storages.as_slice(),
815 &bank_snapshot_info.snapshot_dir,
816 snapshot_archive_path,
817 snapshot_config.archive_format,
818 )?;
819
820 Ok(snapshot_archive_info)
821}
822
823#[allow(clippy::too_many_arguments)]
825fn serialize_snapshot(
826 bank_snapshots_dir: impl AsRef<Path>,
827 snapshot_version: SnapshotVersion,
828 snapshot_storages: &[Arc<AccountStorageEntry>],
829 slot_deltas: &[BankSlotDelta],
830 mut bank_fields: BankFieldsToSerialize,
831 bank_hash_stats: BankHashStats,
832 accounts_delta_hash: AccountsDeltaHash,
833 accounts_hash: AccountsHash,
834 epoch_accounts_hash: Option<EpochAccountsHash>,
835 bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
836 write_version: StoredMetaWriteVersion,
837) -> Result<BankSnapshotInfo> {
838 let slot = bank_fields.slot;
839
840 let do_serialize_snapshot = || {
843 let mut measure_everything = Measure::start("");
844 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
845 if bank_snapshot_dir.exists() {
846 return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
847 bank_snapshot_dir,
848 ));
849 }
850 fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
851 AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
852 })?;
853
854 let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
856 info!(
857 "Creating bank snapshot for slot {slot} at '{}'",
858 bank_snapshot_path.display(),
859 );
860
861 let (_, flush_storages_us) = measure_us!({
862 for storage in snapshot_storages {
863 storage.flush().map_err(|err| {
864 AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
865 })?;
866 }
867 });
868
869 let (_, hard_link_storages_us) = measure_us!(hard_link_storages_to_snapshot(
874 &bank_snapshot_dir,
875 slot,
876 snapshot_storages
877 )
878 .map_err(AddBankSnapshotError::HardLinkStorages)?);
879
880 let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
881 let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
882 let extra_fields = ExtraFieldsToSerialize {
883 lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
884 incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
885 epoch_accounts_hash,
886 versioned_epoch_stakes,
887 };
888 serde_snapshot::serialize_bank_snapshot_into(
889 stream,
890 bank_fields,
891 bank_hash_stats,
892 accounts_delta_hash,
893 accounts_hash,
894 &get_storages_to_serialize(snapshot_storages),
895 extra_fields,
896 write_version,
897 )?;
898 Ok(())
899 };
900 let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
901 serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
902 .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
903 "bank serialize"
904 );
905
906 let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
907 let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
908 snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
909 .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
910 );
911
912 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
913 let (_, write_version_file_us) = measure_us!(fs::write(
914 &version_path,
915 snapshot_version.as_str().as_bytes(),
916 )
917 .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
918
919 let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
921 let (_, write_state_complete_file_us) = measure_us!(fs::File::create(&state_complete_path)
922 .map_err(|err| {
923 AddBankSnapshotError::CreateStateCompleteFile(err, state_complete_path)
924 })?);
925
926 measure_everything.stop();
927
928 datapoint_info!(
930 "snapshot_bank",
931 ("slot", slot, i64),
932 ("bank_size", bank_snapshot_consumed_size, i64),
933 ("status_cache_size", status_cache_consumed_size, i64),
934 ("flush_storages_us", flush_storages_us, i64),
935 ("hard_link_storages_us", hard_link_storages_us, i64),
936 ("bank_serialize_us", bank_serialize.as_us(), i64),
937 ("status_cache_serialize_us", status_cache_serialize_us, i64),
938 ("write_version_file_us", write_version_file_us, i64),
939 (
940 "write_state_complete_file_us",
941 write_state_complete_file_us,
942 i64
943 ),
944 ("total_us", measure_everything.as_us(), i64),
945 );
946
947 info!(
948 "{} for slot {} at {}",
949 bank_serialize,
950 slot,
951 bank_snapshot_path.display(),
952 );
953
954 Ok(BankSnapshotInfo {
955 slot,
956 snapshot_kind: BankSnapshotKind::Pre,
957 snapshot_dir: bank_snapshot_dir,
958 snapshot_version,
959 })
960 };
961
962 do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
963}
964
965fn archive_snapshot(
967 snapshot_kind: SnapshotKind,
968 snapshot_slot: Slot,
969 snapshot_hash: SnapshotHash,
970 snapshot_storages: &[Arc<AccountStorageEntry>],
971 bank_snapshot_dir: impl AsRef<Path>,
972 archive_path: impl AsRef<Path>,
973 archive_format: ArchiveFormat,
974) -> Result<SnapshotArchiveInfo> {
975 use ArchiveSnapshotPackageError as E;
976 const SNAPSHOTS_DIR: &str = "snapshots";
977 const ACCOUNTS_DIR: &str = "accounts";
978 info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
979
980 let mut timer = Measure::start("snapshot_package-package_snapshots");
981 let tar_dir = archive_path
982 .as_ref()
983 .parent()
984 .expect("Tar output path is invalid");
985
986 fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
987
988 let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
990 let staging_dir = tempfile::Builder::new()
991 .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
992 .tempdir_in(tar_dir)
993 .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
994 let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
995
996 let slot_str = snapshot_slot.to_string();
997 let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
998 fs::create_dir_all(&staging_snapshot_dir)
1000 .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
1001
1002 let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
1004 E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
1005 })?;
1006 let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
1007 let src_snapshot_file = src_snapshot_dir.join(slot_str);
1008 symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
1009 .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
1010
1011 let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1014 let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1015 symlink::symlink_file(&src_status_cache, &staging_status_cache)
1016 .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
1017
1018 let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
1020 let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1021 symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
1022 E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
1023 })?;
1024
1025 let staging_archive_path = tar_dir.join(format!(
1027 "{}{}.{}",
1028 staging_dir_prefix,
1029 snapshot_slot,
1030 archive_format.extension(),
1031 ));
1032
1033 {
1034 let mut archive_file = fs::File::create(&staging_archive_path)
1035 .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
1036
1037 let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
1038 let mut archive = tar::Builder::new(encoder);
1039 archive.sparse(false);
1048 archive
1051 .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
1052 .map_err(E::ArchiveVersionFile)?;
1053 archive
1054 .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
1055 .map_err(E::ArchiveSnapshotsDir)?;
1056
1057 for storage in snapshot_storages {
1058 let path_in_archive = Path::new(ACCOUNTS_DIR)
1059 .join(AccountsFile::file_name(storage.slot(), storage.id()));
1060 match storage.accounts.internals_for_archive() {
1061 InternalsForArchive::Mmap(data) => {
1062 let mut header = tar::Header::new_gnu();
1063 header.set_path(path_in_archive).map_err(|err| {
1064 E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1065 })?;
1066 header.set_size(storage.capacity());
1067 header.set_cksum();
1068 archive.append(&header, data)
1069 }
1070 InternalsForArchive::FileIo(path) => {
1071 archive.append_path_with_name(path, path_in_archive)
1072 }
1073 }
1074 .map_err(|err| E::ArchiveAccountStorageFile(err, storage.path().to_path_buf()))?;
1075 }
1076
1077 archive.into_inner().map_err(E::FinishArchive)?;
1078 Ok(())
1079 };
1080
1081 match archive_format {
1082 ArchiveFormat::TarBzip2 => {
1083 let mut encoder =
1084 bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
1085 do_archive_files(&mut encoder)?;
1086 encoder.finish().map_err(E::FinishEncoder)?;
1087 }
1088 ArchiveFormat::TarGzip => {
1089 let mut encoder =
1090 flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
1091 do_archive_files(&mut encoder)?;
1092 encoder.finish().map_err(E::FinishEncoder)?;
1093 }
1094 ArchiveFormat::TarZstd { config } => {
1095 let mut encoder =
1096 zstd::stream::Encoder::new(archive_file, config.compression_level)
1097 .map_err(E::CreateEncoder)?;
1098 do_archive_files(&mut encoder)?;
1099 encoder.finish().map_err(E::FinishEncoder)?;
1100 }
1101 ArchiveFormat::TarLz4 => {
1102 let mut encoder = lz4::EncoderBuilder::new()
1103 .level(1)
1104 .build(archive_file)
1105 .map_err(E::CreateEncoder)?;
1106 do_archive_files(&mut encoder)?;
1107 let (_output, result) = encoder.finish();
1108 result.map_err(E::FinishEncoder)?;
1109 }
1110 ArchiveFormat::Tar => {
1111 do_archive_files(&mut archive_file)?;
1112 }
1113 };
1114 }
1115
1116 let metadata = fs::metadata(&staging_archive_path)
1118 .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
1119 let archive_path = archive_path.as_ref().to_path_buf();
1120 fs::rename(&staging_archive_path, &archive_path)
1121 .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
1122
1123 timer.stop();
1124 info!(
1125 "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
1126 archive_path.display(),
1127 snapshot_slot,
1128 timer.as_ms(),
1129 metadata.len()
1130 );
1131
1132 datapoint_info!(
1133 "archive-snapshot-package",
1134 ("slot", snapshot_slot, i64),
1135 ("archive_format", archive_format.to_string(), String),
1136 ("duration_ms", timer.as_ms(), i64),
1137 (
1138 if snapshot_kind.is_full_snapshot() {
1139 "full-snapshot-archive-size"
1140 } else {
1141 "incremental-snapshot-archive-size"
1142 },
1143 metadata.len(),
1144 i64
1145 ),
1146 );
1147 Ok(SnapshotArchiveInfo {
1148 path: archive_path,
1149 slot: snapshot_slot,
1150 hash: snapshot_hash,
1151 archive_format,
1152 })
1153}
1154
1155pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1157 let mut bank_snapshots = Vec::default();
1158 match fs::read_dir(&bank_snapshots_dir) {
1159 Err(err) => {
1160 info!(
1161 "Unable to read bank snapshots directory '{}': {err}",
1162 bank_snapshots_dir.as_ref().display(),
1163 );
1164 }
1165 Ok(paths) => paths
1166 .filter_map(|entry| {
1167 entry
1170 .ok()
1171 .filter(|entry| entry.path().is_dir())
1172 .and_then(|entry| {
1173 entry
1174 .path()
1175 .file_name()
1176 .and_then(|file_name| file_name.to_str())
1177 .and_then(|file_name| file_name.parse::<Slot>().ok())
1178 })
1179 })
1180 .for_each(
1181 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1182 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1183 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1186 },
1187 ),
1188 }
1189 bank_snapshots
1190}
1191
1192pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1196 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1197 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1198 bank_snapshots
1199}
1200
1201pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1205 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1206 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1207 bank_snapshots
1208}
1209
1210pub fn get_highest_bank_snapshot_pre(
1214 bank_snapshots_dir: impl AsRef<Path>,
1215) -> Option<BankSnapshotInfo> {
1216 do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1217}
1218
1219pub fn get_highest_bank_snapshot_post(
1223 bank_snapshots_dir: impl AsRef<Path>,
1224) -> Option<BankSnapshotInfo> {
1225 do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1226}
1227
1228pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1232 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1233}
1234
1235fn do_get_highest_bank_snapshot(
1236 mut bank_snapshots: Vec<BankSnapshotInfo>,
1237) -> Option<BankSnapshotInfo> {
1238 bank_snapshots.sort_unstable();
1239 bank_snapshots.into_iter().next_back()
1240}
1241
1242pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1243where
1244 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1245{
1246 serialize_snapshot_data_file_capped::<F>(
1247 data_file_path,
1248 MAX_SNAPSHOT_DATA_FILE_SIZE,
1249 serializer,
1250 )
1251}
1252
1253pub fn deserialize_snapshot_data_file<T: Sized>(
1254 data_file_path: &Path,
1255 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1256) -> Result<T> {
1257 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1258 deserializer(streams.full_snapshot_stream)
1259 };
1260
1261 let wrapped_data_file_path = SnapshotRootPaths {
1262 full_snapshot_root_file_path: data_file_path.to_path_buf(),
1263 incremental_snapshot_root_file_path: None,
1264 };
1265
1266 deserialize_snapshot_data_files_capped(
1267 &wrapped_data_file_path,
1268 MAX_SNAPSHOT_DATA_FILE_SIZE,
1269 wrapped_deserializer,
1270 )
1271}
1272
1273pub fn deserialize_snapshot_data_files<T: Sized>(
1274 snapshot_root_paths: &SnapshotRootPaths,
1275 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1276) -> Result<T> {
1277 deserialize_snapshot_data_files_capped(
1278 snapshot_root_paths,
1279 MAX_SNAPSHOT_DATA_FILE_SIZE,
1280 deserializer,
1281 )
1282}
1283
1284fn serialize_snapshot_data_file_capped<F>(
1285 data_file_path: &Path,
1286 maximum_file_size: u64,
1287 serializer: F,
1288) -> Result<u64>
1289where
1290 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1291{
1292 let data_file = fs::File::create(data_file_path)?;
1293 let mut data_file_stream = BufWriter::new(data_file);
1294 serializer(&mut data_file_stream)?;
1295 data_file_stream.flush()?;
1296
1297 let consumed_size = data_file_stream.stream_position()?;
1298 if consumed_size > maximum_file_size {
1299 let error_message = format!(
1300 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1301 data_file_path.display(),
1302 );
1303 return Err(IoError::other(error_message).into());
1304 }
1305 Ok(consumed_size)
1306}
1307
1308fn deserialize_snapshot_data_files_capped<T: Sized>(
1309 snapshot_root_paths: &SnapshotRootPaths,
1310 maximum_file_size: u64,
1311 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1312) -> Result<T> {
1313 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1314 create_snapshot_data_file_stream(
1315 &snapshot_root_paths.full_snapshot_root_file_path,
1316 maximum_file_size,
1317 )?;
1318
1319 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1320 if let Some(ref incremental_snapshot_root_file_path) =
1321 snapshot_root_paths.incremental_snapshot_root_file_path
1322 {
1323 Some(create_snapshot_data_file_stream(
1324 incremental_snapshot_root_file_path,
1325 maximum_file_size,
1326 )?)
1327 } else {
1328 None
1329 }
1330 .unzip();
1331
1332 let mut snapshot_streams = SnapshotStreams {
1333 full_snapshot_stream: &mut full_snapshot_data_file_stream,
1334 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1335 };
1336 let ret = deserializer(&mut snapshot_streams)?;
1337
1338 check_deserialize_file_consumed(
1339 full_snapshot_file_size,
1340 &snapshot_root_paths.full_snapshot_root_file_path,
1341 &mut full_snapshot_data_file_stream,
1342 )?;
1343
1344 if let Some(ref incremental_snapshot_root_file_path) =
1345 snapshot_root_paths.incremental_snapshot_root_file_path
1346 {
1347 check_deserialize_file_consumed(
1348 incremental_snapshot_file_size.unwrap(),
1349 incremental_snapshot_root_file_path,
1350 incremental_snapshot_data_file_stream.as_mut().unwrap(),
1351 )?;
1352 }
1353
1354 Ok(ret)
1355}
1356
1357fn create_snapshot_data_file_stream(
1360 snapshot_root_file_path: impl AsRef<Path>,
1361 maximum_file_size: u64,
1362) -> Result<(u64, BufReader<std::fs::File>)> {
1363 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1364
1365 if snapshot_file_size > maximum_file_size {
1366 let error_message = format!(
1367 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1368 snapshot_root_file_path.as_ref().display(),
1369 snapshot_file_size,
1370 maximum_file_size,
1371 );
1372 return Err(IoError::other(error_message).into());
1373 }
1374
1375 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1376 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1377
1378 Ok((snapshot_file_size, snapshot_data_file_stream))
1379}
1380
1381fn check_deserialize_file_consumed(
1384 file_size: u64,
1385 file_path: impl AsRef<Path>,
1386 file_stream: &mut BufReader<std::fs::File>,
1387) -> Result<()> {
1388 let consumed_size = file_stream.stream_position()?;
1389
1390 if consumed_size != file_size {
1391 let error_message = format!(
1392 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1393 file_path.as_ref().display(),
1394 file_size,
1395 consumed_size,
1396 );
1397 return Err(IoError::other(error_message).into());
1398 }
1399
1400 Ok(())
1401}
1402
1403fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1405 let run_path = appendvec_path.parent()?;
1406 let run_file_name = run_path.file_name()?;
1407 if run_file_name != ACCOUNTS_RUN_DIR {
1410 error!(
1411 "The account path {} does not have run/ as its immediate parent directory.",
1412 run_path.display()
1413 );
1414 return None;
1415 }
1416 let account_path = run_path.parent()?;
1417 Some(account_path.to_path_buf())
1418}
1419
1420fn get_snapshot_accounts_hardlink_dir(
1423 appendvec_path: &Path,
1424 bank_slot: Slot,
1425 account_paths: &mut HashSet<PathBuf>,
1426 hardlinks_dir: impl AsRef<Path>,
1427) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1428 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1429 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1430 })?;
1431
1432 let snapshot_hardlink_dir = account_path
1433 .join(ACCOUNTS_SNAPSHOT_DIR)
1434 .join(bank_slot.to_string());
1435
1436 if !account_paths.contains(&account_path) {
1439 let idx = account_paths.len();
1440 debug!(
1441 "for appendvec_path {}, create hard-link path {}",
1442 appendvec_path.display(),
1443 snapshot_hardlink_dir.display()
1444 );
1445 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1446 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1447 err,
1448 snapshot_hardlink_dir.clone(),
1449 )
1450 })?;
1451 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1452 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1453 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1454 source: err,
1455 original: snapshot_hardlink_dir.clone(),
1456 link: symlink_path,
1457 }
1458 })?;
1459 account_paths.insert(account_path);
1460 };
1461
1462 Ok(snapshot_hardlink_dir)
1463}
1464
1465pub fn hard_link_storages_to_snapshot(
1470 bank_snapshot_dir: impl AsRef<Path>,
1471 bank_slot: Slot,
1472 snapshot_storages: &[Arc<AccountStorageEntry>],
1473) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1474 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1475 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1476 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1477 err,
1478 accounts_hardlinks_dir.clone(),
1479 )
1480 })?;
1481
1482 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1483 for storage in snapshot_storages {
1484 let storage_path = storage.accounts.path();
1485 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1486 storage_path,
1487 bank_slot,
1488 &mut account_paths,
1489 &accounts_hardlinks_dir,
1490 )?;
1491 let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1494 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1495 fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1496 HardLinkStoragesToSnapshotError::HardLinkStorage(
1497 err,
1498 storage_path.to_path_buf(),
1499 hard_link_path,
1500 )
1501 })?;
1502 }
1503 Ok(())
1504}
1505
1506pub(crate) fn get_storages_to_serialize(
1509 snapshot_storages: &[Arc<AccountStorageEntry>],
1510) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1511 snapshot_storages
1512 .iter()
1513 .map(|storage| vec![Arc::clone(storage)])
1514 .collect::<Vec<_>>()
1515}
1516
/// Upper bound on the number of parallel unpack (tar reader) threads used when unarchiving a
/// snapshot. `verify_and_unarchive_snapshots` clamps `num_cpus::get() / 4` into
/// `1..=PARALLEL_UNTAR_READERS_DEFAULT`.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1519
1520pub fn verify_and_unarchive_snapshots(
1522 bank_snapshots_dir: impl AsRef<Path>,
1523 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1524 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1525 account_paths: &[PathBuf],
1526 storage_access: StorageAccess,
1527) -> Result<(
1528 UnarchivedSnapshot,
1529 Option<UnarchivedSnapshot>,
1530 AtomicAccountsFileId,
1531)> {
1532 check_are_snapshots_compatible(
1533 full_snapshot_archive_info,
1534 incremental_snapshot_archive_info,
1535 )?;
1536
1537 let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1538
1539 let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1540 let unarchived_full_snapshot = unarchive_snapshot(
1541 &bank_snapshots_dir,
1542 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1543 full_snapshot_archive_info.path(),
1544 "snapshot untar",
1545 account_paths,
1546 full_snapshot_archive_info.archive_format(),
1547 parallel_divisions,
1548 next_append_vec_id.clone(),
1549 storage_access,
1550 )?;
1551
1552 let unarchived_incremental_snapshot =
1553 if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1554 let unarchived_incremental_snapshot = unarchive_snapshot(
1555 &bank_snapshots_dir,
1556 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1557 incremental_snapshot_archive_info.path(),
1558 "incremental snapshot untar",
1559 account_paths,
1560 incremental_snapshot_archive_info.archive_format(),
1561 parallel_divisions,
1562 next_append_vec_id.clone(),
1563 storage_access,
1564 )?;
1565 Some(unarchived_incremental_snapshot)
1566 } else {
1567 None
1568 };
1569
1570 Ok((
1571 unarchived_full_snapshot,
1572 unarchived_incremental_snapshot,
1573 Arc::try_unwrap(next_append_vec_id).unwrap(),
1574 ))
1575}
1576
1577fn spawn_unpack_snapshot_thread(
1579 file_sender: Sender<PathBuf>,
1580 account_paths: Arc<Vec<PathBuf>>,
1581 ledger_dir: Arc<PathBuf>,
1582 mut archive: Archive<SharedBufferReader>,
1583 parallel_selector: Option<ParallelSelector>,
1584 thread_index: usize,
1585) -> JoinHandle<()> {
1586 Builder::new()
1587 .name(format!("solUnpkSnpsht{thread_index:02}"))
1588 .spawn(move || {
1589 hardened_unpack::streaming_unpack_snapshot(
1590 &mut archive,
1591 ledger_dir.as_path(),
1592 &account_paths,
1593 parallel_selector,
1594 &file_sender,
1595 )
1596 .unwrap();
1597 })
1598 .unwrap()
1599}
1600
1601fn streaming_unarchive_snapshot(
1603 file_sender: Sender<PathBuf>,
1604 account_paths: Vec<PathBuf>,
1605 ledger_dir: PathBuf,
1606 snapshot_archive_path: PathBuf,
1607 archive_format: ArchiveFormat,
1608 num_threads: usize,
1609) -> Vec<JoinHandle<()>> {
1610 let account_paths = Arc::new(account_paths);
1611 let ledger_dir = Arc::new(ledger_dir);
1612 let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1613
1614 let archives: Vec<_> = (0..num_threads)
1616 .map(|_| {
1617 let reader = SharedBufferReader::new(&shared_buffer);
1618 Archive::new(reader)
1619 })
1620 .collect();
1621
1622 archives
1623 .into_iter()
1624 .enumerate()
1625 .map(|(thread_index, archive)| {
1626 let parallel_selector = Some(ParallelSelector {
1627 index: thread_index,
1628 divisions: num_threads,
1629 });
1630
1631 spawn_unpack_snapshot_thread(
1632 file_sender.clone(),
1633 account_paths.clone(),
1634 ledger_dir.clone(),
1635 archive,
1636 parallel_selector,
1637 thread_index,
1638 )
1639 })
1640 .collect()
1641}
1642
1643fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1648 let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1649 if !snapshots_dir.is_dir() {
1650 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1651 }
1652
1653 let slot_dir = std::fs::read_dir(&snapshots_dir)
1655 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1656 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1657 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1658 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1659 .path();
1660
1661 let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1662 fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1663
1664 let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1665 fs::hard_link(
1666 status_cache_file,
1667 slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1668 )?;
1669
1670 let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1671 fs::File::create(state_complete_file)?;
1672
1673 Ok(())
1674}
1675
/// Unpacks the snapshot archive at `snapshot_archive_path` and rebuilds its account storages.
///
/// Steps:
/// 1. Create a temporary directory inside `bank_snapshots_dir`, with its name prefixed by
///    `unpacked_snapshots_dir_prefix`.
/// 2. Start `parallel_divisions` unpack threads that stream the archive into that directory
///    and `account_paths`, passing unpacked file paths over a channel.
/// 3. `SnapshotStorageRebuilder` consumes that channel using
///    `num_cpus::get_physical() - parallel_divisions` threads (minimum 1). The time spent in
///    `rebuild_storage` is measured under `measure_name` and logged.
/// 4. The bank-snapshot metadata files are created in the unpacked slot directory.
///
/// The returned [`UnarchivedSnapshot`] holds the `TempDir` (`unpack_dir`), so the unpacked
/// files live as long as that value.
///
/// # Errors
/// Propagates failures from creating the temp dir, rebuilding storages, or creating the
/// metadata files.
fn unarchive_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    unpacked_snapshots_dir_prefix: &'static str,
    snapshot_archive_path: impl AsRef<Path>,
    measure_name: &'static str,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<UnarchivedSnapshot> {
    let unpack_dir = tempfile::Builder::new()
        .prefix(unpacked_snapshots_dir_prefix)
        .tempdir_in(bank_snapshots_dir)?;
    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");

    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    // NOTE(review): the returned JoinHandles are discarded, so the unpack threads are detached.
    // Completion is presumably signaled to the rebuilder by the channel closing once every
    // `file_sender` clone is dropped; confirm.
    streaming_unarchive_snapshot(
        file_sender,
        account_paths.to_vec(),
        unpack_dir.path().to_path_buf(),
        snapshot_archive_path.as_ref().to_path_buf(),
        archive_format,
        parallel_divisions,
    );

    // Leave the physical cores not used by the unpack threads to the rebuilder (at least one).
    let num_rebuilder_threads = num_cpus::get_physical()
        .saturating_sub(parallel_divisions)
        .max(1);
    let (version_and_storages, measure_untar) = measure_time!(
        SnapshotStorageRebuilder::rebuild_storage(
            file_receiver,
            num_rebuilder_threads,
            next_append_vec_id,
            SnapshotFrom::Archive,
            storage_access,
        )?,
        measure_name
    );
    info!("{}", measure_untar);

    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;

    let RebuiltSnapshotStorage {
        snapshot_version,
        storage,
    } = version_and_storages;
    Ok(UnarchivedSnapshot {
        unpack_dir,
        storage,
        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
            unpacked_snapshots_dir,
            snapshot_version,
        },
        measure_untar,
    })
}
1736
1737fn streaming_snapshot_dir_files(
1740 file_sender: Sender<PathBuf>,
1741 snapshot_file_path: impl Into<PathBuf>,
1742 snapshot_version_path: impl Into<PathBuf>,
1743 account_paths: &[PathBuf],
1744) -> Result<()> {
1745 file_sender.send(snapshot_file_path.into())?;
1746 file_sender.send(snapshot_version_path.into())?;
1747
1748 for account_path in account_paths {
1749 for file in fs::read_dir(account_path)? {
1750 file_sender.send(file?.path())?;
1751 }
1752 }
1753
1754 Ok(())
1755}
1756
/// Rebuilds the account storages for the bank snapshot `snapshot_info` from its on-disk
/// directory rather than from an archive.
///
/// Steps:
/// 1. Process each symlink in `<snapshot_dir>/<SNAPSHOT_ACCOUNTS_HARDLINKS>`:
///    - resolve its target, which `get_snapshot_accounts_hardlink_dir` creates as
///      `<account_path>/<ACCOUNTS_SNAPSHOT_DIR>/<slot>`;
///    - derive `<account_path>/<ACCOUNTS_RUN_DIR>` from it and require that path to be one of
///      `account_paths`;
///    - hard-link every file in the target into that run directory.
/// 2. Queue the bank snapshot file, the version file, and every entry of `account_paths` on a
///    channel. Then run `SnapshotStorageRebuilder` over it with
///    `num_cpus::get_physical() - 1` threads (minimum 1).
///
/// # Errors
/// - [`SnapshotError::AccountPathsMismatch`] if a symlink's run directory is not in
///   `account_paths`;
/// - [`SnapshotError::InvalidAccountPath`] / [`SnapshotError::InvalidAppendVecPath`] for
///   malformed paths;
/// - I/O errors (naming the offending path) for directory reads, symlink reads, and hard
///   links;
/// - errors from the storage rebuild.
pub fn rebuild_storages_from_snapshot_dir(
    snapshot_info: &BankSnapshotInfo,
    account_paths: &[PathBuf],
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<AccountStorageMap> {
    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    // `account_paths` are expected to be run directories; see the `contains` check below.
    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);

    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
        IoError::other(format!(
            "failed to read accounts hardlinks dir '{}': {err}",
            accounts_hardlinks.display(),
        ))
    })?;
    for dir_entry in read_dir {
        let symlink_path = dir_entry?.path();
        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
            IoError::other(format!(
                "failed to read symlink '{}': {err}",
                symlink_path.display(),
            ))
        })?;
        // <account_path>/<snapshot dir>/<slot>  ->  <account_path>/<ACCOUNTS_RUN_DIR>
        let account_run_path = account_snapshot_path
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .join(ACCOUNTS_RUN_DIR);
        if !account_run_paths.contains(&account_run_path) {
            return Err(SnapshotError::AccountPathsMismatch);
        }
        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        // Make each snapshotted storage file visible in the run directory under the same name.
        for file in read_dir {
            let file_path = file?.path();
            let file_name = file_path
                .file_name()
                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
            let dest_path = account_run_path.join(file_name);
            fs::hard_link(&file_path, &dest_path).map_err(|err| {
                IoError::other(format!(
                    "failed to hard link from '{}' to '{}': {err}",
                    file_path.display(),
                    dest_path.display(),
                ))
            })?;
        }
    }

    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let snapshot_file_path = &snapshot_info.snapshot_path();
    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    // `file_sender` is moved in and dropped when this returns, so every path is already
    // queued (and the sending side closed) before the rebuild below starts.
    streaming_snapshot_dir_files(
        file_sender,
        snapshot_file_path,
        snapshot_version_path,
        account_paths,
    )?;

    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
        file_receiver,
        num_rebuilder_threads,
        next_append_vec_id,
        SnapshotFrom::Dir,
        storage_access,
    )?;

    let RebuiltSnapshotStorage {
        snapshot_version: _,
        storage,
    } = version_and_storages;
    Ok(storage)
}
1847
1848fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1852 let file_metadata = fs::metadata(&path).map_err(|err| {
1854 IoError::other(format!(
1855 "failed to query snapshot version file metadata '{}': {err}",
1856 path.as_ref().display(),
1857 ))
1858 })?;
1859 let file_size = file_metadata.len();
1860 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1861 let error_message = format!(
1862 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1863 path.as_ref().display(),
1864 file_size,
1865 MAX_SNAPSHOT_VERSION_FILE_SIZE,
1866 );
1867 return Err(IoError::other(error_message).into());
1868 }
1869
1870 let mut snapshot_version = String::new();
1872 let mut file = fs::File::open(&path).map_err(|err| {
1873 IoError::other(format!(
1874 "failed to open snapshot version file '{}': {err}",
1875 path.as_ref().display()
1876 ))
1877 })?;
1878 file.read_to_string(&mut snapshot_version).map_err(|err| {
1879 IoError::other(format!(
1880 "failed to read snapshot version from file '{}': {err}",
1881 path.as_ref().display()
1882 ))
1883 })?;
1884
1885 Ok(snapshot_version.trim().to_string())
1886}
1887
1888fn check_are_snapshots_compatible(
1891 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1892 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1893) -> Result<()> {
1894 if incremental_snapshot_archive_info.is_none() {
1895 return Ok(());
1896 }
1897
1898 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1899
1900 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1901 .then_some(())
1902 .ok_or_else(|| {
1903 SnapshotError::MismatchedBaseSlot(
1904 full_snapshot_archive_info.slot(),
1905 incremental_snapshot_archive_info.base_slot(),
1906 )
1907 })
1908}
1909
1910pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1912 path.file_name()
1913 .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1914 .to_str()
1915 .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1916}
1917
1918pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1919 snapshot_archives_dir
1920 .as_ref()
1921 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1922}
1923
1924pub fn build_full_snapshot_archive_path(
1927 full_snapshot_archives_dir: impl AsRef<Path>,
1928 slot: Slot,
1929 hash: &SnapshotHash,
1930 archive_format: ArchiveFormat,
1931) -> PathBuf {
1932 full_snapshot_archives_dir.as_ref().join(format!(
1933 "snapshot-{}-{}.{}",
1934 slot,
1935 hash.0,
1936 archive_format.extension(),
1937 ))
1938}
1939
1940pub fn build_incremental_snapshot_archive_path(
1944 incremental_snapshot_archives_dir: impl AsRef<Path>,
1945 base_slot: Slot,
1946 slot: Slot,
1947 hash: &SnapshotHash,
1948 archive_format: ArchiveFormat,
1949) -> PathBuf {
1950 incremental_snapshot_archives_dir.as_ref().join(format!(
1951 "incremental-snapshot-{}-{}-{}.{}",
1952 base_slot,
1953 slot,
1954 hash.0,
1955 archive_format.extension(),
1956 ))
1957}
1958
1959pub(crate) fn parse_full_snapshot_archive_filename(
1961 archive_filename: &str,
1962) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1963 lazy_static! {
1964 static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1965 }
1966
1967 let do_parse = || {
1968 RE.captures(archive_filename).and_then(|captures| {
1969 let slot = captures
1970 .name("slot")
1971 .map(|x| x.as_str().parse::<Slot>())?
1972 .ok()?;
1973 let hash = captures
1974 .name("hash")
1975 .map(|x| x.as_str().parse::<Hash>())?
1976 .ok()?;
1977 let archive_format = captures
1978 .name("ext")
1979 .map(|x| x.as_str().parse::<ArchiveFormat>())?
1980 .ok()?;
1981
1982 Some((slot, SnapshotHash(hash), archive_format))
1983 })
1984 };
1985
1986 do_parse().ok_or_else(|| {
1987 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1988 })
1989}
1990
1991pub(crate) fn parse_incremental_snapshot_archive_filename(
1993 archive_filename: &str,
1994) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1995 lazy_static! {
1996 static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1997 }
1998
1999 let do_parse = || {
2000 RE.captures(archive_filename).and_then(|captures| {
2001 let base_slot = captures
2002 .name("base")
2003 .map(|x| x.as_str().parse::<Slot>())?
2004 .ok()?;
2005 let slot = captures
2006 .name("slot")
2007 .map(|x| x.as_str().parse::<Slot>())?
2008 .ok()?;
2009 let hash = captures
2010 .name("hash")
2011 .map(|x| x.as_str().parse::<Hash>())?
2012 .ok()?;
2013 let archive_format = captures
2014 .name("ext")
2015 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2016 .ok()?;
2017
2018 Some((base_slot, slot, SnapshotHash(hash), archive_format))
2019 })
2020 };
2021
2022 do_parse().ok_or_else(|| {
2023 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2024 })
2025}
2026
2027fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2029where
2030 F: Fn(PathBuf) -> Result<T>,
2031{
2032 let walk_dir = |dir: &Path| -> Vec<T> {
2033 let entry_iter = fs::read_dir(dir);
2034 match entry_iter {
2035 Err(err) => {
2036 info!(
2037 "Unable to read snapshot archives directory '{}': {err}",
2038 dir.display(),
2039 );
2040 vec![]
2041 }
2042 Ok(entries) => entries
2043 .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2044 .collect(),
2045 }
2046 };
2047
2048 let mut ret = walk_dir(snapshot_archives_dir);
2049 let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2050 if remote_dir.exists() {
2051 ret.append(&mut walk_dir(remote_dir.as_ref()));
2052 }
2053 ret
2054}
2055
2056pub fn get_full_snapshot_archives(
2058 full_snapshot_archives_dir: impl AsRef<Path>,
2059) -> Vec<FullSnapshotArchiveInfo> {
2060 get_snapshot_archives(
2061 full_snapshot_archives_dir.as_ref(),
2062 FullSnapshotArchiveInfo::new_from_path,
2063 )
2064}
2065
2066pub fn get_incremental_snapshot_archives(
2068 incremental_snapshot_archives_dir: impl AsRef<Path>,
2069) -> Vec<IncrementalSnapshotArchiveInfo> {
2070 get_snapshot_archives(
2071 incremental_snapshot_archives_dir.as_ref(),
2072 IncrementalSnapshotArchiveInfo::new_from_path,
2073 )
2074}
2075
2076pub fn get_highest_full_snapshot_archive_slot(
2078 full_snapshot_archives_dir: impl AsRef<Path>,
2079) -> Option<Slot> {
2080 get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2081 .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2082}
2083
2084pub fn get_highest_incremental_snapshot_archive_slot(
2087 incremental_snapshot_archives_dir: impl AsRef<Path>,
2088 full_snapshot_slot: Slot,
2089) -> Option<Slot> {
2090 get_highest_incremental_snapshot_archive_info(
2091 incremental_snapshot_archives_dir,
2092 full_snapshot_slot,
2093 )
2094 .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2095}
2096
2097pub fn get_highest_full_snapshot_archive_info(
2099 full_snapshot_archives_dir: impl AsRef<Path>,
2100) -> Option<FullSnapshotArchiveInfo> {
2101 let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2102 full_snapshot_archives.sort_unstable();
2103 full_snapshot_archives.into_iter().next_back()
2104}
2105
2106pub fn get_highest_incremental_snapshot_archive_info(
2109 incremental_snapshot_archives_dir: impl AsRef<Path>,
2110 full_snapshot_slot: Slot,
2111) -> Option<IncrementalSnapshotArchiveInfo> {
2112 let mut incremental_snapshot_archives =
2116 get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2117 .into_iter()
2118 .filter(|incremental_snapshot_archive_info| {
2119 incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2120 })
2121 .collect::<Vec<_>>();
2122 incremental_snapshot_archives.sort_unstable();
2123 incremental_snapshot_archives.into_iter().next_back()
2124}
2125
2126pub fn purge_old_snapshot_archives(
2127 full_snapshot_archives_dir: impl AsRef<Path>,
2128 incremental_snapshot_archives_dir: impl AsRef<Path>,
2129 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2130 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2131) {
2132 info!(
2133 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2134 full_snapshot_archives_dir.as_ref().display(),
2135 maximum_full_snapshot_archives_to_retain
2136 );
2137
2138 let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2139 full_snapshot_archives.sort_unstable();
2140 full_snapshot_archives.reverse();
2141
2142 let num_to_retain = full_snapshot_archives
2143 .len()
2144 .min(maximum_full_snapshot_archives_to_retain.get());
2145 trace!(
2146 "There are {} full snapshot archives, retaining {}",
2147 full_snapshot_archives.len(),
2148 num_to_retain,
2149 );
2150
2151 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2152 if full_snapshot_archives.is_empty() {
2153 None
2154 } else {
2155 Some(full_snapshot_archives.split_at(num_to_retain))
2156 }
2157 .unwrap_or_default();
2158
2159 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2160 .iter()
2161 .map(|ai| ai.slot())
2162 .collect::<HashSet<_>>();
2163
2164 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2165 for path in archives.iter().map(|a| a.path()) {
2166 trace!("Removing snapshot archive: {}", path.display());
2167 let result = fs::remove_file(path);
2168 if let Err(err) = result {
2169 info!(
2170 "Failed to remove snapshot archive '{}': {err}",
2171 path.display()
2172 );
2173 }
2174 }
2175 }
2176 remove_archives(full_snapshot_archives_to_remove);
2177
2178 info!(
2179 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2180 incremental_snapshot_archives_dir.as_ref().display(),
2181 maximum_incremental_snapshot_archives_to_retain
2182 );
2183 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2184 for incremental_snapshot_archive in
2185 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2186 {
2187 incremental_snapshot_archives_by_base_slot
2188 .entry(incremental_snapshot_archive.base_slot())
2189 .or_default()
2190 .push(incremental_snapshot_archive)
2191 }
2192
2193 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2194 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2195 {
2196 incremental_snapshot_archives.sort_unstable();
2197 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2198 maximum_incremental_snapshot_archives_to_retain.get()
2199 } else {
2200 usize::from(retained_full_snapshot_slots.contains(&base_slot))
2201 };
2202 trace!(
2203 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2204 incremental_snapshot_archives.len(),
2205 base_slot,
2206 incremental_snapshot_archives
2207 .len()
2208 .saturating_sub(num_to_retain),
2209 );
2210
2211 incremental_snapshot_archives.truncate(
2212 incremental_snapshot_archives
2213 .len()
2214 .saturating_sub(num_to_retain),
2215 );
2216 remove_archives(&incremental_snapshot_archives);
2217 }
2218}
2219
/// Unpacks the snapshot archive held in `shared_buffer` into `ledger_dir` and
/// `account_paths`, spreading the work over `parallel_divisions` readers. Each reader
/// is given its own `ParallelSelector` (its index out of `parallel_divisions`).
///
/// Every division runs to completion. The per-division maps are then merged, and the
/// first error, in division order, is returned if any division failed.
///
/// # Panics
/// Panics if `parallel_divisions` is zero.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // All readers are created up front, before any of them starts reading.
    // NOTE(review): presumably required so the shared buffer knows every reader; confirm
    // before fusing this into the parallel iterator.
    let readers = (0..parallel_divisions)
        .map(|_| SharedBufferReader::new(&shared_buffer))
        .collect::<Vec<_>>();

    let per_division_results = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let selector = ParallelSelector {
                index,
                divisions: parallel_divisions,
            };
            hardened_unpack::unpack_snapshot(
                &mut Archive::new(reader),
                ledger_dir,
                account_paths,
                Some(selector),
            )
        })
        .collect::<Vec<_>>();

    per_division_results.into_iter().try_fold(
        UnpackedAppendVecMap::new(),
        |mut merged, division_result| -> Result<UnpackedAppendVecMap> {
            merged.extend(division_result?);
            Ok(merged)
        },
    )
}
2260
2261fn untar_snapshot_create_shared_buffer(
2262 snapshot_tar: &Path,
2263 archive_format: ArchiveFormat,
2264) -> SharedBuffer {
2265 let open_file = || {
2266 fs::File::open(snapshot_tar)
2267 .map_err(|err| {
2268 IoError::other(format!(
2269 "failed to open snapshot archive '{}': {err}",
2270 snapshot_tar.display(),
2271 ))
2272 })
2273 .unwrap()
2274 };
2275 match archive_format {
2276 ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
2277 ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
2278 ArchiveFormat::TarZstd { .. } => SharedBuffer::new(
2279 zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
2280 ),
2281 ArchiveFormat::TarLz4 => {
2282 SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
2283 }
2284 ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2285 }
2286}
2287
/// Decompresses and unpacks the snapshot archive at `snapshot_tar` into `unpack_dir`
/// and `account_paths`, using `parallel_divisions` parallel readers.
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let snapshot_tar = snapshot_tar.as_ref();
    unpack_snapshot_local(
        untar_snapshot_create_shared_buffer(snapshot_tar, archive_format),
        unpack_dir,
        account_paths,
        parallel_divisions,
    )
}
2299
2300pub fn verify_unpacked_snapshots_dir_and_version(
2301 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2302) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2303 info!(
2304 "snapshot version: {}",
2305 &unpacked_snapshots_dir_and_version.snapshot_version
2306 );
2307
2308 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2309 let mut bank_snapshots =
2310 get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2311 if bank_snapshots.len() > 1 {
2312 return Err(IoError::other(format!(
2313 "invalid snapshot format: only one snapshot allowed, but found {}",
2314 bank_snapshots.len(),
2315 ))
2316 .into());
2317 }
2318 let root_paths = bank_snapshots.pop().ok_or_else(|| {
2319 IoError::other(format!(
2320 "no snapshots found in snapshots directory '{}'",
2321 unpacked_snapshots_dir_and_version
2322 .unpacked_snapshots_dir
2323 .display(),
2324 ))
2325 })?;
2326 Ok((snapshot_version, root_paths))
2327}
2328
2329pub fn get_snapshot_file_name(slot: Slot) -> String {
2331 slot.to_string()
2332}
2333
2334pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2336 bank_snapshots_dir
2337 .as_ref()
2338 .join(get_snapshot_file_name(slot))
2339}
2340
/// Selects how `verify_snapshot_archive` checks the serialized bank file of a snapshot.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum VerifyBank {
    /// The serialized bank file is left in place and is checked by the `dir_diff`
    /// directory comparison along with the rest of the snapshot.
    Deterministic,
    /// The two serialized bank files are compared with `compare_two_serialized_banks`
    /// and then deleted, so the `dir_diff` directory comparison does not see them.
    NonDeterministic,
}
2350
/// Test helper: unpacks `snapshot_archive` into a temporary directory and asserts that
/// its contents match the bank snapshot directory `snapshots_to_verify` for `slot`.
///
/// Both the snapshot directories and the account storages are compared with `dir_diff`.
///
/// WARNING: this mutates `snapshots_to_verify`. It may delete the serialized bank file
/// (for `VerifyBank::NonDeterministic`), the accounts hardlinks directory, the version
/// file and the state-complete file of the `<slot>` subdirectory.
///
/// # Panics
/// Panics (via `unwrap`/`assert!`) on any I/O failure or on any content mismatch.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    // Unpack the archive into a throwaway directory, with a single reader (1 division).
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Account storage files from `snapshots_to_verify` are gathered here, to be compared
    // with the unpacked account storages at the end.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // The serialized bank files (`<slot>/<slot>`) are compared semantically, then
        // deleted so the byte-level directory comparison below skips them.
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // The unpacked status cache sits directly under `snapshots/`. Move it into the
    // `<slot>` subdirectory so the layout matches `snapshots_to_verify` for the
    // comparison below.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    // Each entry in the accounts hardlinks dir is a symlink to a directory of storage
    // files. Copy those files into `storages_to_verify`, then drop the hardlinks dir so
    // it does not take part in the snapshot directory comparison.
    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // Remove the version and state-complete files from the bank snapshot dir, if
    // present, before the directory comparison.
    // NOTE(review): presumably the unpacked archive does not lay these out the same way; confirm.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // Best-effort removal of an `accounts` subdirectory (the error is deliberately
    // ignored). NOTE(review): presumably an empty directory created during setup that
    // would otherwise show up as a difference; confirm.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2444
2445pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2447 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2448 purge_bank_snapshots(&bank_snapshots);
2449}
2450
2451pub fn purge_old_bank_snapshots(
2453 bank_snapshots_dir: impl AsRef<Path>,
2454 num_bank_snapshots_to_retain: usize,
2455 filter_by_kind: Option<BankSnapshotKind>,
2456) {
2457 let mut bank_snapshots = match filter_by_kind {
2458 Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2459 Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2460 None => get_bank_snapshots(&bank_snapshots_dir),
2461 };
2462
2463 bank_snapshots.sort_unstable();
2464 purge_bank_snapshots(
2465 bank_snapshots
2466 .iter()
2467 .rev()
2468 .skip(num_bank_snapshots_to_retain),
2469 );
2470}
2471
2472pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2477 purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2478 purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2479
2480 let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2481 if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2482 debug!(
2483 "Retained bank snapshot for slot {}, and purged the rest.",
2484 highest_bank_snapshot_post.slot
2485 );
2486 }
2487}
2488
2489pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2491 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2492 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2493 purge_bank_snapshots(&bank_snapshots);
2494}
2495
2496fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2500 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2501 if purge_bank_snapshot(snapshot_dir).is_err() {
2502 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2503 }
2504 }
2505}
2506
2507pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2509 const FN_ERR: &str = "failed to purge bank snapshot";
2510 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2511 if accounts_hardlinks_dir.is_dir() {
2512 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2515 IoError::other(format!(
2516 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2517 accounts_hardlinks_dir.display(),
2518 ))
2519 })?;
2520 for entry in read_dir {
2521 let accounts_hardlink_dir = entry?.path();
2522 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2523 IoError::other(format!(
2524 "{FN_ERR}: failed to read symlink '{}': {err}",
2525 accounts_hardlink_dir.display(),
2526 ))
2527 })?;
2528 move_and_async_delete_path(&accounts_hardlink_dir);
2529 }
2530 }
2531 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2532 IoError::other(format!(
2533 "{FN_ERR}: failed to remove dir '{}': {err}",
2534 bank_snapshot_dir.as_ref().display(),
2535 ))
2536 })?;
2537 Ok(())
2538}
2539
2540pub fn should_take_full_snapshot(
2541 block_height: Slot,
2542 full_snapshot_archive_interval_slots: Slot,
2543) -> bool {
2544 block_height % full_snapshot_archive_interval_slots == 0
2545}
2546
2547pub fn should_take_incremental_snapshot(
2548 block_height: Slot,
2549 incremental_snapshot_archive_interval_slots: Slot,
2550 latest_full_snapshot_slot: Option<Slot>,
2551) -> bool {
2552 block_height % incremental_snapshot_archive_interval_slots == 0
2553 && latest_full_snapshot_slot.is_some()
2554}
2555
/// Test helper: creates a temporary directory and, inside it, the accounts run and
/// snapshot dirs. Returns the `TempDir` and the first path returned by
/// `create_accounts_run_and_snapshot_dirs` (the run dir, going by its name).
///
/// `TempDir` deletes its directory on drop, so callers must keep it alive while the
/// returned path is in use.
///
/// # Panics
/// Panics if the temporary directory or the accounts dirs cannot be created.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = TempDir::new().unwrap();
    let accounts_run_dir = create_accounts_run_and_snapshot_dirs(tmp_dir.path())
        .unwrap()
        .0;
    (tmp_dir, accounts_run_dir)
}
2566
2567#[cfg(test)]
2568mod tests {
2569 use {
2570 super::*,
2571 assert_matches::assert_matches,
2572 bincode::{deserialize_from, serialize_into},
2573 std::{convert::TryFrom, mem::size_of},
2574 tempfile::NamedTempFile,
2575 };
2576
2577 #[test]
2578 fn test_serialize_snapshot_data_file_under_limit() {
2579 let temp_dir = tempfile::TempDir::new().unwrap();
2580 let expected_consumed_size = size_of::<u32>() as u64;
2581 let consumed_size = serialize_snapshot_data_file_capped(
2582 &temp_dir.path().join("data-file"),
2583 expected_consumed_size,
2584 |stream| {
2585 serialize_into(stream, &2323_u32)?;
2586 Ok(())
2587 },
2588 )
2589 .unwrap();
2590 assert_eq!(consumed_size, expected_consumed_size);
2591 }
2592
2593 #[test]
2594 fn test_serialize_snapshot_data_file_over_limit() {
2595 let temp_dir = tempfile::TempDir::new().unwrap();
2596 let expected_consumed_size = size_of::<u32>() as u64;
2597 let result = serialize_snapshot_data_file_capped(
2598 &temp_dir.path().join("data-file"),
2599 expected_consumed_size - 1,
2600 |stream| {
2601 serialize_into(stream, &2323_u32)?;
2602 Ok(())
2603 },
2604 );
2605 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2606 }
2607
2608 #[test]
2609 fn test_deserialize_snapshot_data_file_under_limit() {
2610 let expected_data = 2323_u32;
2611 let expected_consumed_size = size_of::<u32>() as u64;
2612
2613 let temp_dir = tempfile::TempDir::new().unwrap();
2614 serialize_snapshot_data_file_capped(
2615 &temp_dir.path().join("data-file"),
2616 expected_consumed_size,
2617 |stream| {
2618 serialize_into(stream, &expected_data)?;
2619 Ok(())
2620 },
2621 )
2622 .unwrap();
2623
2624 let snapshot_root_paths = SnapshotRootPaths {
2625 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2626 incremental_snapshot_root_file_path: None,
2627 };
2628
2629 let actual_data = deserialize_snapshot_data_files_capped(
2630 &snapshot_root_paths,
2631 expected_consumed_size,
2632 |stream| {
2633 Ok(deserialize_from::<_, u32>(
2634 &mut stream.full_snapshot_stream,
2635 )?)
2636 },
2637 )
2638 .unwrap();
2639 assert_eq!(actual_data, expected_data);
2640 }
2641
2642 #[test]
2643 fn test_deserialize_snapshot_data_file_over_limit() {
2644 let expected_data = 2323_u32;
2645 let expected_consumed_size = size_of::<u32>() as u64;
2646
2647 let temp_dir = tempfile::TempDir::new().unwrap();
2648 serialize_snapshot_data_file_capped(
2649 &temp_dir.path().join("data-file"),
2650 expected_consumed_size,
2651 |stream| {
2652 serialize_into(stream, &expected_data)?;
2653 Ok(())
2654 },
2655 )
2656 .unwrap();
2657
2658 let snapshot_root_paths = SnapshotRootPaths {
2659 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2660 incremental_snapshot_root_file_path: None,
2661 };
2662
2663 let result = deserialize_snapshot_data_files_capped(
2664 &snapshot_root_paths,
2665 expected_consumed_size - 1,
2666 |stream| {
2667 Ok(deserialize_from::<_, u32>(
2668 &mut stream.full_snapshot_stream,
2669 )?)
2670 },
2671 );
2672 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2673 }
2674
2675 #[test]
2676 fn test_deserialize_snapshot_data_file_extra_data() {
2677 let expected_data = 2323_u32;
2678 let expected_consumed_size = size_of::<u32>() as u64;
2679
2680 let temp_dir = tempfile::TempDir::new().unwrap();
2681 serialize_snapshot_data_file_capped(
2682 &temp_dir.path().join("data-file"),
2683 expected_consumed_size * 2,
2684 |stream| {
2685 serialize_into(stream.by_ref(), &expected_data)?;
2686 serialize_into(stream.by_ref(), &expected_data)?;
2687 Ok(())
2688 },
2689 )
2690 .unwrap();
2691
2692 let snapshot_root_paths = SnapshotRootPaths {
2693 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2694 incremental_snapshot_root_file_path: None,
2695 };
2696
2697 let result = deserialize_snapshot_data_files_capped(
2698 &snapshot_root_paths,
2699 expected_consumed_size * 2,
2700 |stream| {
2701 Ok(deserialize_from::<_, u32>(
2702 &mut stream.full_snapshot_stream,
2703 )?)
2704 },
2705 );
2706 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2707 }
2708
2709 #[test]
2710 fn test_snapshot_version_from_file_under_limit() {
2711 let file_content = SnapshotVersion::default().as_str();
2712 let mut file = NamedTempFile::new().unwrap();
2713 file.write_all(file_content.as_bytes()).unwrap();
2714 let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2715 assert_eq!(version_from_file, file_content);
2716 }
2717
2718 #[test]
2719 fn test_snapshot_version_from_file_over_limit() {
2720 let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2721 let file_content = vec![7u8; over_limit_size];
2722 let mut file = NamedTempFile::new().unwrap();
2723 file.write_all(&file_content).unwrap();
2724 assert_matches!(
2725 snapshot_version_from_file(file.path()),
2726 Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2727 );
2728 }
2729
2730 #[test]
2731 fn test_parse_full_snapshot_archive_filename() {
2732 assert_eq!(
2733 parse_full_snapshot_archive_filename(&format!(
2734 "snapshot-42-{}.tar.bz2",
2735 Hash::default()
2736 ))
2737 .unwrap(),
2738 (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2739 );
2740 assert_eq!(
2741 parse_full_snapshot_archive_filename(&format!(
2742 "snapshot-43-{}.tar.zst",
2743 Hash::default()
2744 ))
2745 .unwrap(),
2746 (
2747 43,
2748 SnapshotHash(Hash::default()),
2749 ArchiveFormat::TarZstd {
2750 config: ZstdConfig::default(),
2751 }
2752 )
2753 );
2754 assert_eq!(
2755 parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2756 .unwrap(),
2757 (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2758 );
2759 assert_eq!(
2760 parse_full_snapshot_archive_filename(&format!(
2761 "snapshot-45-{}.tar.lz4",
2762 Hash::default()
2763 ))
2764 .unwrap(),
2765 (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2766 );
2767
2768 assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2769 assert!(
2770 parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2771 );
2772
2773 assert!(
2774 parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2775 );
2776 assert!(parse_full_snapshot_archive_filename(&format!(
2777 "snapshot-12345678-{}.bad!ext",
2778 Hash::new_unique()
2779 ))
2780 .is_err());
2781 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2782
2783 assert!(parse_full_snapshot_archive_filename(&format!(
2784 "snapshot-bad!slot-{}.bad!ext",
2785 Hash::new_unique()
2786 ))
2787 .is_err());
2788 assert!(parse_full_snapshot_archive_filename(&format!(
2789 "snapshot-12345678-{}.bad!ext",
2790 Hash::new_unique()
2791 ))
2792 .is_err());
2793 assert!(parse_full_snapshot_archive_filename(&format!(
2794 "snapshot-bad!slot-{}.tar",
2795 Hash::new_unique()
2796 ))
2797 .is_err());
2798
2799 assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2800 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2801 assert!(parse_full_snapshot_archive_filename(&format!(
2802 "snapshot-bad!slot-{}.tar",
2803 Hash::new_unique()
2804 ))
2805 .is_err());
2806 }
2807
2808 #[test]
2809 fn test_parse_incremental_snapshot_archive_filename() {
2810 assert_eq!(
2811 parse_incremental_snapshot_archive_filename(&format!(
2812 "incremental-snapshot-42-123-{}.tar.bz2",
2813 Hash::default()
2814 ))
2815 .unwrap(),
2816 (
2817 42,
2818 123,
2819 SnapshotHash(Hash::default()),
2820 ArchiveFormat::TarBzip2
2821 )
2822 );
2823 assert_eq!(
2824 parse_incremental_snapshot_archive_filename(&format!(
2825 "incremental-snapshot-43-234-{}.tar.zst",
2826 Hash::default()
2827 ))
2828 .unwrap(),
2829 (
2830 43,
2831 234,
2832 SnapshotHash(Hash::default()),
2833 ArchiveFormat::TarZstd {
2834 config: ZstdConfig::default(),
2835 }
2836 )
2837 );
2838 assert_eq!(
2839 parse_incremental_snapshot_archive_filename(&format!(
2840 "incremental-snapshot-44-345-{}.tar",
2841 Hash::default()
2842 ))
2843 .unwrap(),
2844 (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2845 );
2846 assert_eq!(
2847 parse_incremental_snapshot_archive_filename(&format!(
2848 "incremental-snapshot-45-456-{}.tar.lz4",
2849 Hash::default()
2850 ))
2851 .unwrap(),
2852 (
2853 45,
2854 456,
2855 SnapshotHash(Hash::default()),
2856 ArchiveFormat::TarLz4
2857 )
2858 );
2859
2860 assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2861 assert!(parse_incremental_snapshot_archive_filename(&format!(
2862 "snapshot-42-{}.tar",
2863 Hash::new_unique()
2864 ))
2865 .is_err());
2866 assert!(parse_incremental_snapshot_archive_filename(
2867 "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2868 )
2869 .is_err());
2870
2871 assert!(parse_incremental_snapshot_archive_filename(&format!(
2872 "incremental-snapshot-bad!slot-56785678-{}.tar",
2873 Hash::new_unique()
2874 ))
2875 .is_err());
2876
2877 assert!(parse_incremental_snapshot_archive_filename(&format!(
2878 "incremental-snapshot-12345678-bad!slot-{}.tar",
2879 Hash::new_unique()
2880 ))
2881 .is_err());
2882
2883 assert!(parse_incremental_snapshot_archive_filename(
2884 "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2885 )
2886 .is_err());
2887
2888 assert!(parse_incremental_snapshot_archive_filename(&format!(
2889 "incremental-snapshot-12341234-56785678-{}.bad!ext",
2890 Hash::new_unique()
2891 ))
2892 .is_err());
2893 }
2894
2895 #[test]
2896 fn test_check_are_snapshots_compatible() {
2897 let slot1: Slot = 1234;
2898 let slot2: Slot = 5678;
2899 let slot3: Slot = 999_999;
2900
2901 let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2902 format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2903 ))
2904 .unwrap();
2905
2906 assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2907
2908 let incremental_snapshot_archive_info =
2909 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2910 "/dir/incremental-snapshot-{}-{}-{}.tar",
2911 slot1,
2912 slot2,
2913 Hash::new_unique()
2914 )))
2915 .unwrap();
2916
2917 assert!(check_are_snapshots_compatible(
2918 &full_snapshot_archive_info,
2919 Some(&incremental_snapshot_archive_info)
2920 )
2921 .is_ok());
2922
2923 let incremental_snapshot_archive_info =
2924 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2925 "/dir/incremental-snapshot-{}-{}-{}.tar",
2926 slot2,
2927 slot3,
2928 Hash::new_unique()
2929 )))
2930 .unwrap();
2931
2932 assert!(check_are_snapshots_compatible(
2933 &full_snapshot_archive_info,
2934 Some(&incremental_snapshot_archive_info)
2935 )
2936 .is_err());
2937 }
2938
2939 fn common_create_bank_snapshot_files(
2941 bank_snapshots_dir: &Path,
2942 min_slot: Slot,
2943 max_slot: Slot,
2944 ) {
2945 for slot in min_slot..max_slot {
2946 let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2947 fs::create_dir_all(&snapshot_dir).unwrap();
2948
2949 let snapshot_filename = get_snapshot_file_name(slot);
2950 let snapshot_path = snapshot_dir.join(snapshot_filename);
2951 fs::File::create(snapshot_path).unwrap();
2952
2953 let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2954 fs::File::create(status_cache_file).unwrap();
2955
2956 let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2957 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2958
2959 let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2961 fs::File::create(state_complete_path).unwrap();
2962 }
2963 }
2964
2965 #[test]
2966 fn test_get_bank_snapshots() {
2967 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2968 let min_slot = 10;
2969 let max_slot = 20;
2970 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2971
2972 let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2973 assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2974 }
2975
2976 #[test]
2977 fn test_get_highest_bank_snapshot_post() {
2978 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2979 let min_slot = 99;
2980 let max_slot = 123;
2981 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2982
2983 let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2984 assert!(highest_bank_snapshot.is_some());
2985 assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2986 }
2987
2988 fn common_create_snapshot_archive_files(
2994 full_snapshot_archives_dir: &Path,
2995 incremental_snapshot_archives_dir: &Path,
2996 min_full_snapshot_slot: Slot,
2997 max_full_snapshot_slot: Slot,
2998 min_incremental_snapshot_slot: Slot,
2999 max_incremental_snapshot_slot: Slot,
3000 ) {
3001 fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3002 fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3003 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3004 for incremental_snapshot_slot in
3005 min_incremental_snapshot_slot..max_incremental_snapshot_slot
3006 {
3007 let snapshot_filename = format!(
3008 "incremental-snapshot-{}-{}-{}.tar",
3009 full_snapshot_slot,
3010 incremental_snapshot_slot,
3011 Hash::default()
3012 );
3013 let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3014 fs::File::create(snapshot_filepath).unwrap();
3015 }
3016
3017 let snapshot_filename =
3018 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3019 let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3020 fs::File::create(snapshot_filepath).unwrap();
3021
3022 let bad_filename = format!(
3024 "incremental-snapshot-{}-{}-bad!hash.tar",
3025 full_snapshot_slot,
3026 max_incremental_snapshot_slot + 1,
3027 );
3028 let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3029 fs::File::create(bad_filepath).unwrap();
3030 }
3031
3032 let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3035 let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3036 fs::File::create(bad_filepath).unwrap();
3037 }
3038
3039 #[test]
3040 fn test_get_full_snapshot_archives() {
3041 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3042 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3043 let min_slot = 123;
3044 let max_slot = 456;
3045 common_create_snapshot_archive_files(
3046 full_snapshot_archives_dir.path(),
3047 incremental_snapshot_archives_dir.path(),
3048 min_slot,
3049 max_slot,
3050 0,
3051 0,
3052 );
3053
3054 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3055 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3056 }
3057
3058 #[test]
3059 fn test_get_full_snapshot_archives_remote() {
3060 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3061 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3062 let min_slot = 123;
3063 let max_slot = 456;
3064 common_create_snapshot_archive_files(
3065 &full_snapshot_archives_dir
3066 .path()
3067 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3068 &incremental_snapshot_archives_dir
3069 .path()
3070 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3071 min_slot,
3072 max_slot,
3073 0,
3074 0,
3075 );
3076
3077 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3078 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3079 assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3080 }
3081
3082 #[test]
3083 fn test_get_incremental_snapshot_archives() {
3084 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3085 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3086 let min_full_snapshot_slot = 12;
3087 let max_full_snapshot_slot = 23;
3088 let min_incremental_snapshot_slot = 34;
3089 let max_incremental_snapshot_slot = 45;
3090 common_create_snapshot_archive_files(
3091 full_snapshot_archives_dir.path(),
3092 incremental_snapshot_archives_dir.path(),
3093 min_full_snapshot_slot,
3094 max_full_snapshot_slot,
3095 min_incremental_snapshot_slot,
3096 max_incremental_snapshot_slot,
3097 );
3098
3099 let incremental_snapshot_archives =
3100 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3101 assert_eq!(
3102 incremental_snapshot_archives.len() as Slot,
3103 (max_full_snapshot_slot - min_full_snapshot_slot)
3104 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3105 );
3106 }
3107
3108 #[test]
3109 fn test_get_incremental_snapshot_archives_remote() {
3110 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3111 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3112 let min_full_snapshot_slot = 12;
3113 let max_full_snapshot_slot = 23;
3114 let min_incremental_snapshot_slot = 34;
3115 let max_incremental_snapshot_slot = 45;
3116 common_create_snapshot_archive_files(
3117 &full_snapshot_archives_dir
3118 .path()
3119 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3120 &incremental_snapshot_archives_dir
3121 .path()
3122 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3123 min_full_snapshot_slot,
3124 max_full_snapshot_slot,
3125 min_incremental_snapshot_slot,
3126 max_incremental_snapshot_slot,
3127 );
3128
3129 let incremental_snapshot_archives =
3130 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3131 assert_eq!(
3132 incremental_snapshot_archives.len() as Slot,
3133 (max_full_snapshot_slot - min_full_snapshot_slot)
3134 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3135 );
3136 assert!(incremental_snapshot_archives
3137 .iter()
3138 .all(|info| info.is_remote()));
3139 }
3140
3141 #[test]
3142 fn test_get_highest_full_snapshot_archive_slot() {
3143 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3144 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3145 let min_slot = 123;
3146 let max_slot = 456;
3147 common_create_snapshot_archive_files(
3148 full_snapshot_archives_dir.path(),
3149 incremental_snapshot_archives_dir.path(),
3150 min_slot,
3151 max_slot,
3152 0,
3153 0,
3154 );
3155
3156 assert_eq!(
3157 get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3158 Some(max_slot - 1)
3159 );
3160 }
3161
/// For every base (full) slot that has incremental archives, the highest incremental slot is
/// the top of the created range; a base slot with no archives yields `None`.
#[test]
fn test_get_highest_incremental_snapshot_slot() {
    let full_dir = tempfile::TempDir::new().unwrap();
    let incremental_dir = tempfile::TempDir::new().unwrap();
    let full_slots = 12..23;
    let incremental_slots = 34..45;
    common_create_snapshot_archive_files(
        full_dir.path(),
        incremental_dir.path(),
        full_slots.start,
        full_slots.end,
        incremental_slots.start,
        incremental_slots.end,
    );

    // Each created base slot has incremental archives up to, but excluding, the upper bound.
    let expected_highest = Some(incremental_slots.end - 1);
    for base_slot in full_slots.clone() {
        let highest =
            get_highest_incremental_snapshot_archive_slot(incremental_dir.path(), base_slot);
        assert_eq!(highest, expected_highest);
    }

    // The upper full bound itself was never used as a base slot.
    let past_end =
        get_highest_incremental_snapshot_archive_slot(incremental_dir.path(), full_slots.end);
    assert_eq!(past_end, None);
}
3197
3198 fn common_test_purge_old_snapshot_archives(
3199 snapshot_names: &[&String],
3200 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3201 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3202 expected_snapshots: &[&String],
3203 ) {
3204 let temp_snap_dir = tempfile::TempDir::new().unwrap();
3205
3206 for snap_name in snapshot_names {
3207 let snap_path = temp_snap_dir.path().join(snap_name);
3208 let mut _snap_file = fs::File::create(snap_path);
3209 }
3210 purge_old_snapshot_archives(
3211 temp_snap_dir.path(),
3212 temp_snap_dir.path(),
3213 maximum_full_snapshot_archives_to_retain,
3214 maximum_incremental_snapshot_archives_to_retain,
3215 );
3216
3217 let mut retained_snaps = HashSet::new();
3218 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3219 let entry_path_buf = entry.unwrap().path();
3220 let entry_path = entry_path_buf.as_path();
3221 let snapshot_name = entry_path
3222 .file_name()
3223 .unwrap()
3224 .to_str()
3225 .unwrap()
3226 .to_string();
3227 retained_snaps.insert(snapshot_name);
3228 }
3229
3230 for snap_name in expected_snapshots {
3231 assert!(
3232 retained_snaps.contains(snap_name.as_str()),
3233 "{snap_name} not found"
3234 );
3235 }
3236 assert_eq!(retained_snaps.len(), expected_snapshots.len());
3237 }
3238
/// Retaining N full snapshot archives keeps the N archives with the highest slots.
#[test]
fn test_purge_old_full_snapshot_archives() {
    let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
    let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
    let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
    let snapshot_names = [&snap1_name, &snap2_name, &snap3_name];

    // (number of full archives to retain, archives expected to survive)
    let cases: [(usize, Vec<&String>); 3] = [
        (1, vec![&snap3_name]),
        (2, vec![&snap2_name, &snap3_name]),
        (3, vec![&snap1_name, &snap2_name, &snap3_name]),
    ];

    for (full_archives_to_retain, expected_snapshots) in cases {
        common_test_purge_old_snapshot_archives(
            &snapshot_names,
            NonZeroUsize::new(full_archives_to_retain).unwrap(),
            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
            &expected_snapshots,
        );
    }
}
3273
/// Repeatedly creating full snapshot archives and purging them every few slots always leaves
/// exactly the newest `maximum_snapshots_to_retain` archives, at contiguous slots.
#[test]
fn test_purge_old_full_snapshot_archives_in_the_loop() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
    let retain = maximum_snapshots_to_retain.get() as Slot;
    let starting_slot: Slot = 42;

    for slot in starting_slot..starting_slot + 100 {
        let archive_name = format!("snapshot-{}-{}.tar", slot, Hash::default());
        fs::File::create(full_snapshot_archives_dir.path().join(archive_name)).unwrap();

        // Purge only once enough archives exist to fill the retention limit, and then only on
        // every `2 * retain`-th slot, so that extra archives pile up between purges.
        let enough_archives = slot >= starting_slot + retain;
        let purge_now = slot % (retain * 2) == 0;
        if !(enough_archives && purge_now) {
            continue;
        }

        purge_old_snapshot_archives(
            &full_snapshot_archives_dir,
            &incremental_snapshot_archives_dir,
            maximum_snapshots_to_retain,
            NonZeroUsize::new(usize::MAX).unwrap(),
        );

        let mut remaining = get_full_snapshot_archives(&full_snapshot_archives_dir);
        remaining.sort_unstable();
        assert_eq!(remaining.len(), maximum_snapshots_to_retain.get());
        assert_eq!(remaining.last().unwrap().slot(), slot);

        // The survivors must be a contiguous run of slots ending at the current slot.
        for (archive, expected_slot) in remaining.iter().rev().zip((0..=slot).rev()) {
            assert_eq!(archive.slot(), expected_slot);
        }
    }
}
3321
/// Purging with the default retention limits keeps three groups of archives:
/// - a bounded number of full snapshot archives;
/// - `maximum_incremental_snapshot_archives_to_retain` incremental archives based on the
///   latest retained full snapshot (its highest slots);
/// - exactly one incremental archive (the highest-slot one) for each older retained full
///   snapshot.
#[test]
fn test_purge_old_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let starting_slot = 100_000;

    let maximum_incremental_snapshot_archives_to_retain =
        DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
    let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;

    // Each full snapshot interval spans `2 * maximum_incremental_snapshot_archives_to_retain`
    // incremental slots. The first coincides with the full snapshot slot and is skipped
    // below, which still leaves more incremental archives than may be retained.
    let incremental_snapshot_interval = 100;
    let num_incremental_snapshots_per_full_snapshot =
        maximum_incremental_snapshot_archives_to_retain.get() * 2;
    let full_snapshot_interval =
        incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;

    // Create twice as many full snapshot archives as may be retained. For each one, create
    // incremental archives every `incremental_snapshot_interval` slots after it; `skip(1)`
    // drops the slot equal to the full snapshot slot itself.
    let mut snapshot_filenames = vec![];
    (starting_slot..)
        .step_by(full_snapshot_interval)
        .take(
            maximum_full_snapshot_archives_to_retain
                .checked_mul(NonZeroUsize::new(2).unwrap())
                .unwrap()
                .get(),
        )
        .for_each(|full_snapshot_slot| {
            let snapshot_filename =
                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
            let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
            fs::File::create(snapshot_path).unwrap();
            snapshot_filenames.push(snapshot_filename);

            (full_snapshot_slot..)
                .step_by(incremental_snapshot_interval)
                .take(num_incremental_snapshots_per_full_snapshot)
                .skip(1)
                .for_each(|incremental_snapshot_slot| {
                    let snapshot_filename = format!(
                        "incremental-snapshot-{}-{}-{}.tar",
                        full_snapshot_slot,
                        incremental_snapshot_slot,
                        Hash::default()
                    );
                    let snapshot_path = incremental_snapshot_archives_dir
                        .path()
                        .join(&snapshot_filename);
                    fs::File::create(snapshot_path).unwrap();
                    snapshot_filenames.push(snapshot_filename);
                });
        });

    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
    );

    // The number of surviving full snapshot archives equals the full retention limit.
    let mut remaining_full_snapshot_archives =
        get_full_snapshot_archives(full_snapshot_archives_dir.path());
    assert_eq!(
        remaining_full_snapshot_archives.len(),
        maximum_full_snapshot_archives_to_retain.get(),
    );
    remaining_full_snapshot_archives.sort_unstable();
    let latest_full_snapshot_archive_slot =
        remaining_full_snapshot_archives.last().unwrap().slot();

    // Expected surviving incremental archives:
    // - `maximum_incremental_snapshot_archives_to_retain` based on the latest full snapshot;
    // - plus one for each of the other retained full snapshots.
    let mut remaining_incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert_eq!(
        remaining_incremental_snapshot_archives.len(),
        maximum_incremental_snapshot_archives_to_retain
            .get()
            .saturating_add(
                maximum_full_snapshot_archives_to_retain
                    .get()
                    .saturating_sub(1)
            )
    );
    // Sort ascending, then reverse, so that `pop()` yields the lowest-ordered archive first.
    // NOTE(review): the loop below relies on the archive-info ordering sorting by base slot
    // first, then slot; confirm against its `Ord` impl.
    remaining_incremental_snapshot_archives.sort_unstable();
    remaining_incremental_snapshot_archives.reverse();

    // Walk the older retained full snapshots from oldest to newest. Each should keep only
    // its highest-slot incremental archive.
    for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
        let incremental_snapshot_archive =
            remaining_incremental_snapshot_archives.pop().unwrap();

        let expected_base_slot =
            latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
        let expected_slot = expected_base_slot
            + (full_snapshot_interval - incremental_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
    }

    // Every archive not yet popped must be based on the latest full snapshot...
    for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
        assert_eq!(
            incremental_snapshot_archive.base_slot(),
            latest_full_snapshot_archive_slot
        );
    }

    // ...and the set of their slots must be exactly the highest
    // `maximum_incremental_snapshot_archives_to_retain` incremental slots for that base.
    let expected_remaining_incremental_snapshot_archive_slots =
        (latest_full_snapshot_archive_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(
                num_incremental_snapshots_per_full_snapshot
                    - maximum_incremental_snapshot_archives_to_retain.get(),
            )
            .collect::<HashSet<_>>();

    let actual_remaining_incremental_snapshot_archive_slots =
        remaining_incremental_snapshot_archives
            .iter()
            .map(|snapshot| snapshot.slot())
            .collect::<HashSet<_>>();
    assert_eq!(
        actual_remaining_incremental_snapshot_archive_slots,
        expected_remaining_incremental_snapshot_archive_slots
    );
}
3452
/// When no full snapshot archives exist, purging removes every incremental snapshot archive,
/// even with effectively unlimited retention limits.
#[test]
fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();

    // Incremental archives whose base full snapshots (slots 100 and 200) are never created.
    let snapshot_filenames = [
        format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
    ];
    snapshot_filenames
        .iter()
        .map(|snapshot_filename| {
            incremental_snapshot_archives_dir
                .path()
                .join(snapshot_filename)
        })
        .for_each(|snapshot_path| {
            fs::File::create(snapshot_path).unwrap();
        });

    let unlimited = NonZeroUsize::new(usize::MAX).unwrap();
    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        unlimited,
        unlimited,
    );

    let leftovers = get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert!(leftovers.is_empty());
}
3485
/// `get_snapshot_accounts_hardlink_dir` accepts a storage file path under the test accounts
/// directory. It rejects, with `GetAccountPath`, the same file name moved up one directory.
#[test]
fn test_get_snapshot_accounts_hardlink_dir() {
    let slot: Slot = 1;

    let mut account_paths_set: HashSet<PathBuf> = HashSet::new();

    let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
    let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
    let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    fs::create_dir_all(&accounts_hardlinks_dir).unwrap();

    let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
    let appendvec_filename = format!("{slot}.0");
    let appendvec_path = accounts_dir.join(appendvec_filename);

    let ret = get_snapshot_accounts_hardlink_dir(
        &appendvec_path,
        slot,
        &mut account_paths_set,
        &accounts_hardlinks_dir,
    );
    // Report the actual error on failure; a bare `is_ok()` check discards it.
    assert!(ret.is_ok(), "unexpected error: {ret:?}");

    // Same file name, but located two directory levels up from the file, i.e. outside the
    // directory created by `create_tmp_accounts_dir_for_tests`.
    let wrong_appendvec_path = appendvec_path
        .parent()
        .unwrap()
        .parent()
        .unwrap()
        .join(appendvec_path.file_name().unwrap());
    let ret = get_snapshot_accounts_hardlink_dir(
        &wrong_appendvec_path,
        slot,
        &mut account_paths_set,
        accounts_hardlinks_dir,
    );

    assert_matches!(
        ret,
        Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
    );
}
3527
/// A slot written with `write_full_snapshot_slot_file` is read back unchanged.
#[test]
fn test_full_snapshot_slot_file_good() {
    let bank_snapshot_dir = TempDir::new().unwrap();
    let expected_slot = 123_456_789;

    write_full_snapshot_slot_file(&bank_snapshot_dir, expected_slot).unwrap();
    let actual_slot = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();

    assert_eq!(actual_slot, expected_slot);
}
3537
/// Slot files whose size is not exactly `size_of::<Slot>()` bytes are rejected with a
/// size error.
#[test]
fn test_full_snapshot_slot_file_bad() {
    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
    // One byte short and one byte over.
    let too_small = [1u8; SLOT_SIZE - 1];
    let too_large = [1u8; SLOT_SIZE + 1];
    let bad_contents: [&[u8]; 2] = [&too_small, &too_large];

    for contents in bad_contents {
        let bank_snapshot_dir = TempDir::new().unwrap();
        let slot_file_path = bank_snapshot_dir
            .path()
            .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
        fs::write(slot_file_path, contents).unwrap();

        let message = read_full_snapshot_slot_file(&bank_snapshot_dir)
            .unwrap_err()
            .to_string();
        assert!(message.starts_with("invalid full snapshot slot file size"));
    }
}
3557}