1use {
2 crate::{
3 bank::{BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4 serde_snapshot::{
5 self, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize, SnapshotStreams,
6 },
7 snapshot_archive_info::{
8 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
9 SnapshotArchiveInfoGetter,
10 },
11 snapshot_bank_utils,
12 snapshot_config::SnapshotConfig,
13 snapshot_hash::SnapshotHash,
14 snapshot_package::{SnapshotKind, SnapshotPackage},
15 snapshot_utils::snapshot_storage_rebuilder::{
16 RebuiltSnapshotStorage, SnapshotStorageRebuilder,
17 },
18 },
19 bzip2::read::BzDecoder,
20 crossbeam_channel::Sender,
21 flate2::read::GzDecoder,
22 log::*,
23 regex::Regex,
24 solana_accounts_db::{
25 account_storage::AccountStorageMap,
26 account_storage_reader::AccountStorageReader,
27 accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
28 accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
29 accounts_hash::{AccountsDeltaHash, AccountsHash},
30 epoch_accounts_hash::EpochAccountsHash,
31 hardened_unpack::{self, ParallelSelector, UnpackError},
32 shared_buffer_reader::{SharedBuffer, SharedBufferReader},
33 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
34 },
35 solana_clock::{Epoch, Slot},
36 solana_hash::Hash,
37 solana_measure::{measure::Measure, measure_time, measure_us},
38 std::{
39 cmp::Ordering,
40 collections::{HashMap, HashSet},
41 fmt, fs,
42 io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
43 mem,
44 num::NonZeroUsize,
45 ops::RangeInclusive,
46 path::{Path, PathBuf},
47 process::ExitStatus,
48 str::FromStr,
49 sync::Arc,
50 thread::{Builder, JoinHandle},
51 },
52 tar::{self, Archive},
53 tempfile::TempDir,
54 thiserror::Error,
55};
56#[cfg(feature = "dev-context-only-utils")]
57use {
58 hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
59 solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
60};
61
62mod archive_format;
63pub mod snapshot_storage_rebuilder;
64pub use archive_format::*;
65
/// Name of the status cache file inside a bank snapshot directory.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// Name of the snapshot version file inside a bank snapshot directory.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Name of the marker file whose presence means a bank snapshot directory is complete.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Name of the marker file whose presence means the snapshot's storages were flushed.
pub const SNAPSHOT_STORAGES_FLUSHED_FILENAME: &str = "storages_flushed";
/// Name of the sub-directory of a bank snapshot directory whose entries are read as symlinks
/// to account snapshot directories.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// NOTE(review): presumably the directory name used for downloaded snapshot archives;
/// not used in this part of the file, confirm against callers.
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// Name of the file that stores the full snapshot slot as little-endian bytes.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// 32 * 1024^3; presumably a limit in bytes (32 GiB), confirm at the use site.
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024;
/// Presumably a limit in bytes for the version file, confirm at the use site.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;
/// String form of `SnapshotVersion::V1_2_0`.
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// File name prefix of temporary snapshot archives and their staging directories.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// File extension that distinguishes a `Pre` bank snapshot file from a `Post` one.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
/// Default for how many full snapshot archives to retain (per the name; the consumer is
/// not visible here).
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(2).unwrap();
/// Default for how many incremental snapshot archives to retain (per the name; the consumer
/// is not visible here).
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(4).unwrap();
/// Regex for full snapshot archive file names; captures `slot`, `hash` and `ext`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Regex for incremental snapshot archive file names; captures `base`, `slot`, `hash` and `ext`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
88
/// Snapshot format versions understood by this module.
///
/// `V1_2_0` is the only variant and is also the `Default`.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub enum SnapshotVersion {
    #[default]
    V1_2_0,
}
94
95impl fmt::Display for SnapshotVersion {
96 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
97 f.write_str(From::from(*self))
98 }
99}
100
101impl From<SnapshotVersion> for &'static str {
102 fn from(snapshot_version: SnapshotVersion) -> &'static str {
103 match snapshot_version {
104 SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
105 }
106 }
107}
108
109impl FromStr for SnapshotVersion {
110 type Err = &'static str;
111
112 fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
113 let version_string = if version_string
115 .get(..1)
116 .is_some_and(|s| s.eq_ignore_ascii_case("v"))
117 {
118 &version_string[1..]
119 } else {
120 version_string
121 };
122 match version_string {
123 VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
124 _ => Err("unsupported snapshot version"),
125 }
126 }
127}
128
129impl SnapshotVersion {
130 pub fn as_str(self) -> &'static str {
131 <&str as From<Self>>::from(self)
132 }
133}
134
/// Information about a bank snapshot directory on disk.
///
/// Ordering (see the `Ord` impl) considers only `slot`.
#[derive(PartialEq, Eq, Debug)]
pub struct BankSnapshotInfo {
    /// Slot of the bank snapshot
    pub slot: Slot,
    /// Whether the serialized bank file is the `Pre` or the `Post` file
    pub snapshot_kind: BankSnapshotKind,
    /// Path to the bank snapshot directory
    pub snapshot_dir: PathBuf,
    /// Snapshot version, as parsed from the directory's version file
    pub snapshot_version: SnapshotVersion,
}
148
149impl PartialOrd for BankSnapshotInfo {
150 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
151 Some(self.cmp(other))
152 }
153}
154
155impl Ord for BankSnapshotInfo {
157 fn cmp(&self, other: &Self) -> Ordering {
158 self.slot.cmp(&other.slot)
159 }
160}
161
162impl BankSnapshotInfo {
163 pub fn new_from_dir(
164 bank_snapshots_dir: impl AsRef<Path>,
165 slot: Slot,
166 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
167 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
170
171 if !bank_snapshot_dir.is_dir() {
172 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
173 bank_snapshot_dir,
174 ));
175 }
176
177 if !is_bank_snapshot_complete(&bank_snapshot_dir) {
183 return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
184 }
185
186 let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
187 if !status_cache_file.is_file() {
188 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
189 status_cache_file,
190 ));
191 }
192
193 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
194 let version_str = snapshot_version_from_file(&version_path).or(Err(
195 SnapshotNewFromDirError::MissingVersionFile(version_path),
196 ))?;
197 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
198 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
199
200 let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
201 let bank_snapshot_pre_path =
202 bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
203
204 let snapshot_kind = if bank_snapshot_pre_path.is_file() {
213 BankSnapshotKind::Pre
214 } else if bank_snapshot_post_path.is_file() {
215 BankSnapshotKind::Post
216 } else {
217 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
218 bank_snapshot_dir,
219 ));
220 };
221
222 Ok(BankSnapshotInfo {
223 slot,
224 snapshot_kind,
225 snapshot_dir: bank_snapshot_dir,
226 snapshot_version,
227 })
228 }
229
230 pub fn snapshot_path(&self) -> PathBuf {
231 let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
232
233 let ext = match self.snapshot_kind {
234 BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
235 BankSnapshotKind::Post => "",
236 };
237 bank_snapshot_path.set_extension(ext);
238
239 bank_snapshot_path
240 }
241}
242
/// Which serialized bank file a bank snapshot directory holds.
///
/// In this file the two kinds are told apart by file name only: the `Pre` file carries the
/// `BANK_SNAPSHOT_PRE_FILENAME_EXTENSION` extension, the `Post` file has no extension.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BankSnapshotKind {
    /// The bank file with the "pre" extension
    Pre,
    /// The bank file without an extension
    Post,
}
258
/// Where a snapshot is loaded from.
///
/// NOTE(review): the variants are not used in this part of the file; the descriptions below
/// are inferred from the names and should be confirmed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// Presumably: build from a snapshot archive
    Archive,
    /// Presumably: build directly from a bank snapshot directory
    Dir,
}
269
/// Paths to the root file of a full snapshot and, optionally, of an incremental snapshot.
///
/// NOTE(review): what a "root file" is exactly is not visible in this part of the file.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    pub full_snapshot_root_file_path: PathBuf,
    /// `None` when there is no incremental snapshot
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
277
/// The pieces produced by unarchiving a snapshot.
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    // NOTE(review): never read (hence the allow); presumably held only so the temporary
    // directory lives as long as this value — confirm.
    #[allow(dead_code)]
    unpack_dir: TempDir,
    pub storage: AccountStorageMap,
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Presumably the timing of the untar step — confirm at the construction site
    pub measure_untar: Measure,
}
287
/// An unpacked snapshots directory paired with its snapshot version.
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    pub unpacked_snapshots_dir: PathBuf,
    pub snapshot_version: SnapshotVersion,
}
294
/// An account storage map bundled with an accounts file id counter.
///
/// NOTE(review): per the field name, the counter is the next id to hand out; the producer is
/// not visible in this part of the file.
pub(crate) struct StorageAndNextAccountsFileId {
    pub storage: AccountStorageMap,
    pub next_append_vec_id: AtomicAccountsFileId,
}
301
/// Top-level error type of the snapshot utilities; the error half of this module's
/// [`Result`] alias.
///
/// Variants marked `#[from]` are created automatically by `?` from the wrapped error type.
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    /// An I/O error together with a static label naming where it came from
    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    /// Fields: (full snapshot slot, incremental snapshot base slot)
    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    /// Fields: (deserialized bank slot, snapshot archive slot)
    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
    MismatchedSlot(Slot, Slot),

    /// Fields: (deserialized bank hash, snapshot archive hash)
    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
    MismatchedHash(SnapshotHash, SnapshotHash),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    /// Fields: (underlying error, slot of the bank snapshot being added)
    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}
380
/// Errors returned by `BankSnapshotInfo::new_from_dir()`.
///
/// Each variant carries the offending path, except `InvalidVersion`, which carries the
/// version string that failed to parse.
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}
401
/// Convenience alias for results whose error type is [`SnapshotError`].
pub type Result<T> = std::result::Result<T, SnapshotError>;
403
/// Errors describing why a snapshot's slot deltas are invalid.
///
/// Converts into `SnapshotError::VerifySlotDeltas`.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    /// Fields: (number of entries, maximum allowed)
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    /// Fields: (offending slot, bank slot)
    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("slot history is bad and cannot be used to verify slot deltas")]
    BadSlotHistory,
}
428
/// Errors describing why a snapshot's epoch stakes are invalid.
///
/// Converts into `SnapshotError::VerifyEpochStakes`.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    /// Fields: (offending epoch, maximum epoch)
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    /// Fields: (epoch without stakes, range of required epochs)
    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}
438
/// Errors from the individual steps of writing a bank snapshot directory (see
/// `serialize_snapshot()`); wrapped in `SnapshotError::AddBankSnapshot` together with the slot.
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    /// Fields: (underlying error, path of the storage that failed to flush)
    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    #[error("failed to mark snapshot storages as 'flushed': {0}")]
    MarkStoragesFlushed(#[source] IoError),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': {0}")]
    MarkSnapshotComplete(#[source] IoError),
}
469
/// Errors from the individual steps of building a snapshot archive (see
/// `archive_snapshot()`).
///
/// Converts into `SnapshotError::ArchiveSnapshotPackage`. In every variant the first field is
/// the underlying I/O error; any `PathBuf` fields name the path(s) involved.
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    /// Fields: (underlying error, source path, link path)
    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    /// Fields: (underlying error, source path, link path)
    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    /// Fields: (underlying error, source path, link path)
    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    /// Fields: (underlying error, source path, destination path)
    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create account storage reader '{1}': {0}")]
    AccountStorageReaderError(#[source] IoError, PathBuf),
}
524
/// Errors from hard linking account storages into a bank snapshot directory; wrapped by
/// `AddBankSnapshotError::HardLinkStorages`.
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    /// Fields: (underlying error, source path, link path)
    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}
537
/// Errors from locating or creating a snapshot's accounts hard link directory.
///
/// Converts into `HardLinkStoragesToSnapshotError::GetSnapshotHardLinksDir`.
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}
554
555pub fn clean_orphaned_account_snapshot_dirs(
563 bank_snapshots_dir: impl AsRef<Path>,
564 account_snapshot_paths: &[PathBuf],
565) -> IoResult<()> {
566 let mut account_snapshot_dirs_referenced = HashSet::new();
569 let snapshots = get_bank_snapshots(bank_snapshots_dir);
570 for snapshot in snapshots {
571 let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
572 let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
574 debug!(
578 "failed to read account hardlinks dir '{}'",
579 account_hardlinks_dir.display(),
580 );
581 continue;
582 };
583 for entry in read_dir {
584 let path = entry?.path();
585 let target = fs::read_link(&path).map_err(|err| {
586 IoError::other(format!(
587 "failed to read symlink '{}': {err}",
588 path.display(),
589 ))
590 })?;
591 account_snapshot_dirs_referenced.insert(target);
592 }
593 }
594
595 for account_snapshot_path in account_snapshot_paths {
597 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
598 IoError::other(format!(
599 "failed to read account snapshot dir '{}': {err}",
600 account_snapshot_path.display(),
601 ))
602 })?;
603 for entry in read_dir {
604 let path = entry?.path();
605 if !account_snapshot_dirs_referenced.contains(&path) {
606 info!(
607 "Removing orphaned account snapshot hardlink directory '{}'...",
608 path.display()
609 );
610 move_and_async_delete_path(&path);
611 }
612 }
613 }
614
615 Ok(())
616}
617
618pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
620 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
621 return;
623 };
624
625 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
626
627 let incomplete_dirs: Vec<_> = read_dir_iter
628 .filter_map(|entry| entry.ok())
629 .map(|entry| entry.path())
630 .filter(|path| path.is_dir())
631 .filter(is_incomplete)
632 .collect();
633
634 for incomplete_dir in incomplete_dirs {
636 let result = purge_bank_snapshot(&incomplete_dir);
637 match result {
638 Ok(_) => info!(
639 "Purged incomplete snapshot dir: {}",
640 incomplete_dir.display()
641 ),
642 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
643 }
644 }
645}
646
647fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
649 let state_complete_path = bank_snapshot_dir
650 .as_ref()
651 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
652 state_complete_path.is_file()
653}
654
655fn write_snapshot_state_complete_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<()> {
657 let state_complete_path = bank_snapshot_dir
658 .as_ref()
659 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
660 fs::File::create(&state_complete_path).map_err(|err| {
661 IoError::other(format!(
662 "failed to create file '{}': {err}",
663 state_complete_path.display(),
664 ))
665 })?;
666 Ok(())
667}
668
669pub fn write_full_snapshot_slot_file(
671 bank_snapshot_dir: impl AsRef<Path>,
672 full_snapshot_slot: Slot,
673) -> IoResult<()> {
674 let full_snapshot_slot_path = bank_snapshot_dir
675 .as_ref()
676 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
677 fs::write(
678 &full_snapshot_slot_path,
679 Slot::to_le_bytes(full_snapshot_slot),
680 )
681 .map_err(|err| {
682 IoError::other(format!(
683 "failed to write full snapshot slot file '{}': {err}",
684 full_snapshot_slot_path.display(),
685 ))
686 })
687}
688
689pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
691 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
692 let full_snapshot_slot_path = bank_snapshot_dir
693 .as_ref()
694 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
695 let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
696 if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
697 let error_message = format!(
698 "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
699 full_snapshot_slot_path.display(),
700 full_snapshot_slot_file_metadata.len(),
701 SLOT_SIZE,
702 );
703 return Err(IoError::other(error_message));
704 }
705 let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
706 let mut buffer = [0; SLOT_SIZE];
707 full_snapshot_slot_file.read_exact(&mut buffer)?;
708 let slot = Slot::from_le_bytes(buffer);
709 Ok(slot)
710}
711
712pub fn write_storages_flushed_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<()> {
714 let flushed_storages_path = bank_snapshot_dir
715 .as_ref()
716 .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
717 fs::File::create(&flushed_storages_path).map_err(|err| {
718 IoError::other(format!(
719 "failed to create file '{}': {err}",
720 flushed_storages_path.display(),
721 ))
722 })?;
723 Ok(())
724}
725
726fn are_bank_snapshot_storages_flushed(bank_snapshot_dir: impl AsRef<Path>) -> bool {
728 let flushed_storages = bank_snapshot_dir
729 .as_ref()
730 .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
731 flushed_storages.is_file()
732}
733
734pub fn get_highest_loadable_bank_snapshot(
742 snapshot_config: &SnapshotConfig,
743) -> Option<BankSnapshotInfo> {
744 let highest_bank_snapshot =
745 get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
746
747 if !snapshot_config.should_generate_snapshots() {
750 return Some(highest_bank_snapshot);
751 }
752
753 let highest_full_snapshot_archive_slot =
756 get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
757 let full_snapshot_file_slot =
758 read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
759 let are_storages_flushed =
760 are_bank_snapshot_storages_flushed(&highest_bank_snapshot.snapshot_dir);
761 (are_storages_flushed && (full_snapshot_file_slot == highest_full_snapshot_archive_slot))
762 .then_some(highest_bank_snapshot)
763}
764
765pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
768 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
769 for entry in entries.flatten() {
770 if entry
771 .file_name()
772 .to_str()
773 .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
774 .unwrap_or(false)
775 {
776 let path = entry.path();
777 let result = if path.is_dir() {
778 fs::remove_dir_all(&path)
779 } else {
780 fs::remove_file(&path)
781 };
782 if let Err(err) = result {
783 warn!(
784 "Failed to remove temporary snapshot archive '{}': {err}",
785 path.display(),
786 );
787 }
788 }
789 }
790 }
791}
792
793pub fn serialize_and_archive_snapshot_package(
795 snapshot_package: SnapshotPackage,
796 snapshot_config: &SnapshotConfig,
797 should_flush_and_hard_link_storages: bool,
798) -> Result<SnapshotArchiveInfo> {
799 let SnapshotPackage {
800 snapshot_kind,
801 slot: snapshot_slot,
802 block_height,
803 hash: snapshot_hash,
804 mut snapshot_storages,
805 status_cache_slot_deltas,
806 bank_fields_to_serialize,
807 bank_hash_stats,
808 accounts_delta_hash,
809 accounts_hash,
810 epoch_accounts_hash,
811 bank_incremental_snapshot_persistence,
812 write_version,
813 enqueued: _,
814 } = snapshot_package;
815
816 let bank_snapshot_info = serialize_snapshot(
817 &snapshot_config.bank_snapshots_dir,
818 snapshot_config.snapshot_version,
819 snapshot_storages.as_slice(),
820 status_cache_slot_deltas.as_slice(),
821 bank_fields_to_serialize,
822 bank_hash_stats,
823 accounts_delta_hash,
824 accounts_hash,
825 epoch_accounts_hash,
826 bank_incremental_snapshot_persistence.as_ref(),
827 write_version,
828 should_flush_and_hard_link_storages,
829 )?;
830
831 let full_snapshot_archive_slot = match snapshot_kind {
833 SnapshotKind::FullSnapshot => snapshot_slot,
834 SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
835 };
836 write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
837 .map_err(|err| {
838 IoError::other(format!(
839 "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
840 ))
841 })?;
842
843 let snapshot_archive_path = match snapshot_package.snapshot_kind {
844 SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
845 &snapshot_config.full_snapshot_archives_dir,
846 snapshot_package.slot,
847 &snapshot_package.hash,
848 snapshot_config.archive_format,
849 ),
850 SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
851 snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
854 build_incremental_snapshot_archive_path(
855 &snapshot_config.incremental_snapshot_archives_dir,
856 incremental_snapshot_base_slot,
857 snapshot_package.slot,
858 &snapshot_package.hash,
859 snapshot_config.archive_format,
860 )
861 }
862 };
863
864 let snapshot_archive_info = archive_snapshot(
865 snapshot_kind,
866 snapshot_slot,
867 snapshot_hash,
868 snapshot_storages.as_slice(),
869 &bank_snapshot_info.snapshot_dir,
870 snapshot_archive_path,
871 snapshot_config.archive_format,
872 )?;
873
874 Ok(snapshot_archive_info)
875}
876
877#[allow(clippy::too_many_arguments)]
879fn serialize_snapshot(
880 bank_snapshots_dir: impl AsRef<Path>,
881 snapshot_version: SnapshotVersion,
882 snapshot_storages: &[Arc<AccountStorageEntry>],
883 slot_deltas: &[BankSlotDelta],
884 mut bank_fields: BankFieldsToSerialize,
885 bank_hash_stats: BankHashStats,
886 accounts_delta_hash: AccountsDeltaHash,
887 accounts_hash: AccountsHash,
888 epoch_accounts_hash: Option<EpochAccountsHash>,
889 bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
890 write_version: u64,
891 should_flush_and_hard_link_storages: bool,
892) -> Result<BankSnapshotInfo> {
893 let slot = bank_fields.slot;
894
895 let do_serialize_snapshot = || {
898 let mut measure_everything = Measure::start("");
899 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
900 if bank_snapshot_dir.exists() {
901 return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
902 bank_snapshot_dir,
903 ));
904 }
905 fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
906 AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
907 })?;
908
909 let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
911 info!(
912 "Creating bank snapshot for slot {slot} at '{}'",
913 bank_snapshot_path.display(),
914 );
915
916 let (flush_storages_us, hard_link_storages_us) = if should_flush_and_hard_link_storages {
917 let flush_measure = Measure::start("");
918 for storage in snapshot_storages {
919 storage.flush().map_err(|err| {
920 AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
921 })?;
922 }
923 let flush_us = flush_measure.end_as_us();
924 let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
925 &bank_snapshot_dir,
926 slot,
927 snapshot_storages
928 )
929 .map_err(AddBankSnapshotError::HardLinkStorages)?);
930 write_storages_flushed_file(&bank_snapshot_dir)
931 .map_err(AddBankSnapshotError::MarkStoragesFlushed)?;
932 Some((flush_us, hard_link_us))
933 } else {
934 None
935 }
936 .unzip();
937
938 let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
939 let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
940 let extra_fields = ExtraFieldsToSerialize {
941 lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
942 incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
943 epoch_accounts_hash,
944 versioned_epoch_stakes,
945 accounts_lt_hash: bank_fields.accounts_lt_hash.clone().map(Into::into),
946 };
947 serde_snapshot::serialize_bank_snapshot_into(
948 stream,
949 bank_fields,
950 bank_hash_stats,
951 accounts_delta_hash,
952 accounts_hash,
953 &get_storages_to_serialize(snapshot_storages),
954 extra_fields,
955 write_version,
956 )?;
957 Ok(())
958 };
959 let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
960 serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
961 .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
962 "bank serialize"
963 );
964
965 let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
966 let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
967 snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
968 .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
969 );
970
971 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
972 let (_, write_version_file_us) = measure_us!(fs::write(
973 &version_path,
974 snapshot_version.as_str().as_bytes(),
975 )
976 .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
977
978 let (_, write_state_complete_file_us) = measure_us!({
980 write_snapshot_state_complete_file(&bank_snapshot_dir)
981 .map_err(AddBankSnapshotError::MarkSnapshotComplete)?
982 });
983
984 measure_everything.stop();
985
986 datapoint_info!(
988 "snapshot_bank",
989 ("slot", slot, i64),
990 ("bank_size", bank_snapshot_consumed_size, i64),
991 ("status_cache_size", status_cache_consumed_size, i64),
992 ("flush_storages_us", flush_storages_us, Option<i64>),
993 ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
994 ("bank_serialize_us", bank_serialize.as_us(), i64),
995 ("status_cache_serialize_us", status_cache_serialize_us, i64),
996 ("write_version_file_us", write_version_file_us, i64),
997 (
998 "write_state_complete_file_us",
999 write_state_complete_file_us,
1000 i64
1001 ),
1002 ("total_us", measure_everything.as_us(), i64),
1003 );
1004
1005 info!(
1006 "{} for slot {} at {}",
1007 bank_serialize,
1008 slot,
1009 bank_snapshot_path.display(),
1010 );
1011
1012 Ok(BankSnapshotInfo {
1013 slot,
1014 snapshot_kind: BankSnapshotKind::Pre,
1015 snapshot_dir: bank_snapshot_dir,
1016 snapshot_version,
1017 })
1018 };
1019
1020 do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
1021}
1022
1023fn archive_snapshot(
1025 snapshot_kind: SnapshotKind,
1026 snapshot_slot: Slot,
1027 snapshot_hash: SnapshotHash,
1028 snapshot_storages: &[Arc<AccountStorageEntry>],
1029 bank_snapshot_dir: impl AsRef<Path>,
1030 archive_path: impl AsRef<Path>,
1031 archive_format: ArchiveFormat,
1032) -> Result<SnapshotArchiveInfo> {
1033 use ArchiveSnapshotPackageError as E;
1034 const SNAPSHOTS_DIR: &str = "snapshots";
1035 const ACCOUNTS_DIR: &str = "accounts";
1036 info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
1037
1038 let mut timer = Measure::start("snapshot_package-package_snapshots");
1039 let tar_dir = archive_path
1040 .as_ref()
1041 .parent()
1042 .expect("Tar output path is invalid");
1043
1044 fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
1045
1046 let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
1048 let staging_dir = tempfile::Builder::new()
1049 .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
1050 .tempdir_in(tar_dir)
1051 .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
1052 let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
1053
1054 let slot_str = snapshot_slot.to_string();
1055 let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
1056 fs::create_dir_all(&staging_snapshot_dir)
1058 .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
1059
1060 let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
1062 E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
1063 })?;
1064 let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
1065 let src_snapshot_file = src_snapshot_dir.join(slot_str);
1066 symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
1067 .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
1068
1069 let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1072 let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1073 symlink::symlink_file(&src_status_cache, &staging_status_cache)
1074 .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
1075
1076 let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
1078 let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1079 symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
1080 E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
1081 })?;
1082
1083 let staging_archive_path = tar_dir.join(format!(
1085 "{}{}.{}",
1086 staging_dir_prefix,
1087 snapshot_slot,
1088 archive_format.extension(),
1089 ));
1090
1091 {
1092 let archive_file = fs::File::create(&staging_archive_path)
1093 .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
1094
1095 let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
1096 let mut archive = tar::Builder::new(encoder);
1097 archive.sparse(false);
1106 archive
1109 .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
1110 .map_err(E::ArchiveVersionFile)?;
1111 archive
1112 .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
1113 .map_err(E::ArchiveSnapshotsDir)?;
1114
1115 for storage in snapshot_storages {
1116 let path_in_archive = Path::new(ACCOUNTS_DIR)
1117 .join(AccountsFile::file_name(storage.slot(), storage.id()));
1118
1119 let reader =
1120 AccountStorageReader::new(storage, Some(snapshot_slot)).map_err(|err| {
1121 E::AccountStorageReaderError(err, storage.path().to_path_buf())
1122 })?;
1123 let mut header = tar::Header::new_gnu();
1124 header.set_path(path_in_archive).map_err(|err| {
1125 E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1126 })?;
1127 header.set_size(reader.len() as u64);
1128 header.set_cksum();
1129 archive.append(&header, reader).map_err(|err| {
1130 E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
1131 })?;
1132 }
1133
1134 archive.into_inner().map_err(E::FinishArchive)?;
1135 Ok(())
1136 };
1137
1138 match archive_format {
1139 ArchiveFormat::TarZstd { config } => {
1140 let mut encoder =
1141 zstd::stream::Encoder::new(archive_file, config.compression_level)
1142 .map_err(E::CreateEncoder)?;
1143 do_archive_files(&mut encoder)?;
1144 encoder.finish().map_err(E::FinishEncoder)?;
1145 }
1146 ArchiveFormat::TarLz4 => {
1147 let mut encoder = lz4::EncoderBuilder::new()
1148 .level(1)
1149 .build(archive_file)
1150 .map_err(E::CreateEncoder)?;
1151 do_archive_files(&mut encoder)?;
1152 let (_output, result) = encoder.finish();
1153 result.map_err(E::FinishEncoder)?;
1154 }
1155 _ => panic!("archiving snapshot with '{archive_format}' is not supported"),
1156 };
1157 }
1158
1159 let metadata = fs::metadata(&staging_archive_path)
1161 .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
1162 let archive_path = archive_path.as_ref().to_path_buf();
1163 fs::rename(&staging_archive_path, &archive_path)
1164 .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
1165
1166 timer.stop();
1167 info!(
1168 "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
1169 archive_path.display(),
1170 snapshot_slot,
1171 timer.as_ms(),
1172 metadata.len()
1173 );
1174
1175 datapoint_info!(
1176 "archive-snapshot-package",
1177 ("slot", snapshot_slot, i64),
1178 ("archive_format", archive_format.to_string(), String),
1179 ("duration_ms", timer.as_ms(), i64),
1180 (
1181 if snapshot_kind.is_full_snapshot() {
1182 "full-snapshot-archive-size"
1183 } else {
1184 "incremental-snapshot-archive-size"
1185 },
1186 metadata.len(),
1187 i64
1188 ),
1189 );
1190 Ok(SnapshotArchiveInfo {
1191 path: archive_path,
1192 slot: snapshot_slot,
1193 hash: snapshot_hash,
1194 archive_format,
1195 })
1196}
1197
1198pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1200 let mut bank_snapshots = Vec::default();
1201 match fs::read_dir(&bank_snapshots_dir) {
1202 Err(err) => {
1203 info!(
1204 "Unable to read bank snapshots directory '{}': {err}",
1205 bank_snapshots_dir.as_ref().display(),
1206 );
1207 }
1208 Ok(paths) => paths
1209 .filter_map(|entry| {
1210 entry
1213 .ok()
1214 .filter(|entry| entry.path().is_dir())
1215 .and_then(|entry| {
1216 entry
1217 .path()
1218 .file_name()
1219 .and_then(|file_name| file_name.to_str())
1220 .and_then(|file_name| file_name.parse::<Slot>().ok())
1221 })
1222 })
1223 .for_each(
1224 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1225 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1226 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1229 },
1230 ),
1231 }
1232 bank_snapshots
1233}
1234
1235pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1239 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1240 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1241 bank_snapshots
1242}
1243
1244pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1248 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1249 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1250 bank_snapshots
1251}
1252
1253pub fn get_highest_bank_snapshot_pre(
1257 bank_snapshots_dir: impl AsRef<Path>,
1258) -> Option<BankSnapshotInfo> {
1259 do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1260}
1261
1262pub fn get_highest_bank_snapshot_post(
1266 bank_snapshots_dir: impl AsRef<Path>,
1267) -> Option<BankSnapshotInfo> {
1268 do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1269}
1270
1271pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1275 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1276}
1277
1278fn do_get_highest_bank_snapshot(
1279 mut bank_snapshots: Vec<BankSnapshotInfo>,
1280) -> Option<BankSnapshotInfo> {
1281 bank_snapshots.sort_unstable();
1282 bank_snapshots.into_iter().next_back()
1283}
1284
1285pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1286where
1287 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1288{
1289 serialize_snapshot_data_file_capped::<F>(
1290 data_file_path,
1291 MAX_SNAPSHOT_DATA_FILE_SIZE,
1292 serializer,
1293 )
1294}
1295
1296pub fn deserialize_snapshot_data_file<T: Sized>(
1297 data_file_path: &Path,
1298 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1299) -> Result<T> {
1300 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1301 deserializer(streams.full_snapshot_stream)
1302 };
1303
1304 let wrapped_data_file_path = SnapshotRootPaths {
1305 full_snapshot_root_file_path: data_file_path.to_path_buf(),
1306 incremental_snapshot_root_file_path: None,
1307 };
1308
1309 deserialize_snapshot_data_files_capped(
1310 &wrapped_data_file_path,
1311 MAX_SNAPSHOT_DATA_FILE_SIZE,
1312 wrapped_deserializer,
1313 )
1314}
1315
1316pub fn deserialize_snapshot_data_files<T: Sized>(
1317 snapshot_root_paths: &SnapshotRootPaths,
1318 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1319) -> Result<T> {
1320 deserialize_snapshot_data_files_capped(
1321 snapshot_root_paths,
1322 MAX_SNAPSHOT_DATA_FILE_SIZE,
1323 deserializer,
1324 )
1325}
1326
1327fn serialize_snapshot_data_file_capped<F>(
1328 data_file_path: &Path,
1329 maximum_file_size: u64,
1330 serializer: F,
1331) -> Result<u64>
1332where
1333 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1334{
1335 let data_file = fs::File::create(data_file_path)?;
1336 let mut data_file_stream = BufWriter::new(data_file);
1337 serializer(&mut data_file_stream)?;
1338 data_file_stream.flush()?;
1339
1340 let consumed_size = data_file_stream.stream_position()?;
1341 if consumed_size > maximum_file_size {
1342 let error_message = format!(
1343 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1344 data_file_path.display(),
1345 );
1346 return Err(IoError::other(error_message).into());
1347 }
1348 Ok(consumed_size)
1349}
1350
1351fn deserialize_snapshot_data_files_capped<T: Sized>(
1352 snapshot_root_paths: &SnapshotRootPaths,
1353 maximum_file_size: u64,
1354 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1355) -> Result<T> {
1356 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1357 create_snapshot_data_file_stream(
1358 &snapshot_root_paths.full_snapshot_root_file_path,
1359 maximum_file_size,
1360 )?;
1361
1362 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1363 if let Some(ref incremental_snapshot_root_file_path) =
1364 snapshot_root_paths.incremental_snapshot_root_file_path
1365 {
1366 Some(create_snapshot_data_file_stream(
1367 incremental_snapshot_root_file_path,
1368 maximum_file_size,
1369 )?)
1370 } else {
1371 None
1372 }
1373 .unzip();
1374
1375 let mut snapshot_streams = SnapshotStreams {
1376 full_snapshot_stream: &mut full_snapshot_data_file_stream,
1377 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1378 };
1379 let ret = deserializer(&mut snapshot_streams)?;
1380
1381 check_deserialize_file_consumed(
1382 full_snapshot_file_size,
1383 &snapshot_root_paths.full_snapshot_root_file_path,
1384 &mut full_snapshot_data_file_stream,
1385 )?;
1386
1387 if let Some(ref incremental_snapshot_root_file_path) =
1388 snapshot_root_paths.incremental_snapshot_root_file_path
1389 {
1390 check_deserialize_file_consumed(
1391 incremental_snapshot_file_size.unwrap(),
1392 incremental_snapshot_root_file_path,
1393 incremental_snapshot_data_file_stream.as_mut().unwrap(),
1394 )?;
1395 }
1396
1397 Ok(ret)
1398}
1399
1400fn create_snapshot_data_file_stream(
1403 snapshot_root_file_path: impl AsRef<Path>,
1404 maximum_file_size: u64,
1405) -> Result<(u64, BufReader<std::fs::File>)> {
1406 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1407
1408 if snapshot_file_size > maximum_file_size {
1409 let error_message = format!(
1410 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1411 snapshot_root_file_path.as_ref().display(),
1412 snapshot_file_size,
1413 maximum_file_size,
1414 );
1415 return Err(IoError::other(error_message).into());
1416 }
1417
1418 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1419 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1420
1421 Ok((snapshot_file_size, snapshot_data_file_stream))
1422}
1423
1424fn check_deserialize_file_consumed(
1427 file_size: u64,
1428 file_path: impl AsRef<Path>,
1429 file_stream: &mut BufReader<std::fs::File>,
1430) -> Result<()> {
1431 let consumed_size = file_stream.stream_position()?;
1432
1433 if consumed_size != file_size {
1434 let error_message = format!(
1435 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1436 file_path.as_ref().display(),
1437 file_size,
1438 consumed_size,
1439 );
1440 return Err(IoError::other(error_message).into());
1441 }
1442
1443 Ok(())
1444}
1445
1446fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1448 let run_path = appendvec_path.parent()?;
1449 let run_file_name = run_path.file_name()?;
1450 if run_file_name != ACCOUNTS_RUN_DIR {
1453 error!(
1454 "The account path {} does not have run/ as its immediate parent directory.",
1455 run_path.display()
1456 );
1457 return None;
1458 }
1459 let account_path = run_path.parent()?;
1460 Some(account_path.to_path_buf())
1461}
1462
1463fn get_snapshot_accounts_hardlink_dir(
1466 appendvec_path: &Path,
1467 bank_slot: Slot,
1468 account_paths: &mut HashSet<PathBuf>,
1469 hardlinks_dir: impl AsRef<Path>,
1470) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1471 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1472 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1473 })?;
1474
1475 let snapshot_hardlink_dir = account_path
1476 .join(ACCOUNTS_SNAPSHOT_DIR)
1477 .join(bank_slot.to_string());
1478
1479 if !account_paths.contains(&account_path) {
1482 let idx = account_paths.len();
1483 debug!(
1484 "for appendvec_path {}, create hard-link path {}",
1485 appendvec_path.display(),
1486 snapshot_hardlink_dir.display()
1487 );
1488 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1489 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1490 err,
1491 snapshot_hardlink_dir.clone(),
1492 )
1493 })?;
1494 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1495 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1496 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1497 source: err,
1498 original: snapshot_hardlink_dir.clone(),
1499 link: symlink_path,
1500 }
1501 })?;
1502 account_paths.insert(account_path);
1503 };
1504
1505 Ok(snapshot_hardlink_dir)
1506}
1507
1508pub fn hard_link_storages_to_snapshot(
1513 bank_snapshot_dir: impl AsRef<Path>,
1514 bank_slot: Slot,
1515 snapshot_storages: &[Arc<AccountStorageEntry>],
1516) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1517 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1518 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1519 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1520 err,
1521 accounts_hardlinks_dir.clone(),
1522 )
1523 })?;
1524
1525 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1526 for storage in snapshot_storages {
1527 let storage_path = storage.accounts.path();
1528 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1529 storage_path,
1530 bank_slot,
1531 &mut account_paths,
1532 &accounts_hardlinks_dir,
1533 )?;
1534 let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1537 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1538 fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1539 HardLinkStoragesToSnapshotError::HardLinkStorage(
1540 err,
1541 storage_path.to_path_buf(),
1542 hard_link_path,
1543 )
1544 })?;
1545 }
1546 Ok(())
1547}
1548
1549pub(crate) fn get_storages_to_serialize(
1552 snapshot_storages: &[Arc<AccountStorageEntry>],
1553) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1554 snapshot_storages
1555 .iter()
1556 .map(|storage| vec![Arc::clone(storage)])
1557 .collect::<Vec<_>>()
1558}
1559
1560const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1562
1563pub fn verify_and_unarchive_snapshots(
1565 bank_snapshots_dir: impl AsRef<Path>,
1566 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1567 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1568 account_paths: &[PathBuf],
1569 storage_access: StorageAccess,
1570) -> Result<(
1571 UnarchivedSnapshot,
1572 Option<UnarchivedSnapshot>,
1573 AtomicAccountsFileId,
1574)> {
1575 check_are_snapshots_compatible(
1576 full_snapshot_archive_info,
1577 incremental_snapshot_archive_info,
1578 )?;
1579
1580 let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1581
1582 let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1583 let unarchived_full_snapshot = unarchive_snapshot(
1584 &bank_snapshots_dir,
1585 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1586 full_snapshot_archive_info.path(),
1587 "snapshot untar",
1588 account_paths,
1589 full_snapshot_archive_info.archive_format(),
1590 parallel_divisions,
1591 next_append_vec_id.clone(),
1592 storage_access,
1593 )?;
1594
1595 let unarchived_incremental_snapshot =
1596 if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1597 let unarchived_incremental_snapshot = unarchive_snapshot(
1598 &bank_snapshots_dir,
1599 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1600 incremental_snapshot_archive_info.path(),
1601 "incremental snapshot untar",
1602 account_paths,
1603 incremental_snapshot_archive_info.archive_format(),
1604 parallel_divisions,
1605 next_append_vec_id.clone(),
1606 storage_access,
1607 )?;
1608 Some(unarchived_incremental_snapshot)
1609 } else {
1610 None
1611 };
1612
1613 Ok((
1614 unarchived_full_snapshot,
1615 unarchived_incremental_snapshot,
1616 Arc::try_unwrap(next_append_vec_id).unwrap(),
1617 ))
1618}
1619
1620fn spawn_unpack_snapshot_thread(
1622 file_sender: Sender<PathBuf>,
1623 account_paths: Arc<Vec<PathBuf>>,
1624 ledger_dir: Arc<PathBuf>,
1625 mut archive: Archive<SharedBufferReader>,
1626 parallel_selector: Option<ParallelSelector>,
1627 thread_index: usize,
1628) -> JoinHandle<()> {
1629 Builder::new()
1630 .name(format!("solUnpkSnpsht{thread_index:02}"))
1631 .spawn(move || {
1632 hardened_unpack::streaming_unpack_snapshot(
1633 &mut archive,
1634 ledger_dir.as_path(),
1635 &account_paths,
1636 parallel_selector,
1637 &file_sender,
1638 )
1639 .unwrap();
1640 })
1641 .unwrap()
1642}
1643
1644fn streaming_unarchive_snapshot(
1646 file_sender: Sender<PathBuf>,
1647 account_paths: Vec<PathBuf>,
1648 ledger_dir: PathBuf,
1649 snapshot_archive_path: PathBuf,
1650 archive_format: ArchiveFormat,
1651 num_threads: usize,
1652) -> Vec<JoinHandle<()>> {
1653 let account_paths = Arc::new(account_paths);
1654 let ledger_dir = Arc::new(ledger_dir);
1655 let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1656
1657 let archives: Vec<_> = (0..num_threads)
1659 .map(|_| {
1660 let reader = SharedBufferReader::new(&shared_buffer);
1661 Archive::new(reader)
1662 })
1663 .collect();
1664
1665 archives
1666 .into_iter()
1667 .enumerate()
1668 .map(|(thread_index, archive)| {
1669 let parallel_selector = Some(ParallelSelector {
1670 index: thread_index,
1671 divisions: num_threads,
1672 });
1673
1674 spawn_unpack_snapshot_thread(
1675 file_sender.clone(),
1676 account_paths.clone(),
1677 ledger_dir.clone(),
1678 archive,
1679 parallel_selector,
1680 thread_index,
1681 )
1682 })
1683 .collect()
1684}
1685
1686fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1691 let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1692 if !snapshots_dir.is_dir() {
1693 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1694 }
1695
1696 let slot_dir = std::fs::read_dir(&snapshots_dir)
1698 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1699 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1700 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1701 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1702 .path();
1703
1704 let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1705 fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1706
1707 let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1708 fs::hard_link(
1709 status_cache_file,
1710 slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1711 )?;
1712
1713 write_snapshot_state_complete_file(slot_dir)?;
1714
1715 Ok(())
1716}
1717
1718fn unarchive_snapshot(
1722 bank_snapshots_dir: impl AsRef<Path>,
1723 unpacked_snapshots_dir_prefix: &'static str,
1724 snapshot_archive_path: impl AsRef<Path>,
1725 measure_name: &'static str,
1726 account_paths: &[PathBuf],
1727 archive_format: ArchiveFormat,
1728 parallel_divisions: usize,
1729 next_append_vec_id: Arc<AtomicAccountsFileId>,
1730 storage_access: StorageAccess,
1731) -> Result<UnarchivedSnapshot> {
1732 let unpack_dir = tempfile::Builder::new()
1733 .prefix(unpacked_snapshots_dir_prefix)
1734 .tempdir_in(bank_snapshots_dir)?;
1735 let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1736
1737 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1738 streaming_unarchive_snapshot(
1739 file_sender,
1740 account_paths.to_vec(),
1741 unpack_dir.path().to_path_buf(),
1742 snapshot_archive_path.as_ref().to_path_buf(),
1743 archive_format,
1744 parallel_divisions,
1745 );
1746
1747 let num_rebuilder_threads = num_cpus::get_physical()
1748 .saturating_sub(parallel_divisions)
1749 .max(1);
1750 let (version_and_storages, measure_untar) = measure_time!(
1751 SnapshotStorageRebuilder::rebuild_storage(
1752 file_receiver,
1753 num_rebuilder_threads,
1754 next_append_vec_id,
1755 SnapshotFrom::Archive,
1756 storage_access,
1757 )?,
1758 measure_name
1759 );
1760 info!("{}", measure_untar);
1761
1762 create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1763
1764 let RebuiltSnapshotStorage {
1765 snapshot_version,
1766 storage,
1767 } = version_and_storages;
1768 Ok(UnarchivedSnapshot {
1769 unpack_dir,
1770 storage,
1771 unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1772 unpacked_snapshots_dir,
1773 snapshot_version,
1774 },
1775 measure_untar,
1776 })
1777}
1778
1779fn streaming_snapshot_dir_files(
1782 file_sender: Sender<PathBuf>,
1783 snapshot_file_path: impl Into<PathBuf>,
1784 snapshot_version_path: impl Into<PathBuf>,
1785 account_paths: &[PathBuf],
1786) -> Result<()> {
1787 file_sender.send(snapshot_file_path.into())?;
1788 file_sender.send(snapshot_version_path.into())?;
1789
1790 for account_path in account_paths {
1791 for file in fs::read_dir(account_path)? {
1792 file_sender.send(file?.path())?;
1793 }
1794 }
1795
1796 Ok(())
1797}
1798
1799pub fn rebuild_storages_from_snapshot_dir(
1804 snapshot_info: &BankSnapshotInfo,
1805 account_paths: &[PathBuf],
1806 next_append_vec_id: Arc<AtomicAccountsFileId>,
1807 storage_access: StorageAccess,
1808) -> Result<AccountStorageMap> {
1809 let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1810 let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1811 let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1812
1813 let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1814 IoError::other(format!(
1815 "failed to read accounts hardlinks dir '{}': {err}",
1816 accounts_hardlinks.display(),
1817 ))
1818 })?;
1819 for dir_entry in read_dir {
1820 let symlink_path = dir_entry?.path();
1821 let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1824 IoError::other(format!(
1825 "failed to read symlink '{}': {err}",
1826 symlink_path.display(),
1827 ))
1828 })?;
1829 let account_run_path = account_snapshot_path
1830 .parent()
1831 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1832 .parent()
1833 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1834 .join(ACCOUNTS_RUN_DIR);
1835 if !account_run_paths.contains(&account_run_path) {
1836 return Err(SnapshotError::AccountPathsMismatch);
1839 }
1840 let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1843 IoError::other(format!(
1844 "failed to read account snapshot dir '{}': {err}",
1845 account_snapshot_path.display(),
1846 ))
1847 })?;
1848 for file in read_dir {
1849 let file_path = file?.path();
1850 let file_name = file_path
1851 .file_name()
1852 .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1853 let dest_path = account_run_path.join(file_name);
1854 fs::hard_link(&file_path, &dest_path).map_err(|err| {
1855 IoError::other(format!(
1856 "failed to hard link from '{}' to '{}': {err}",
1857 file_path.display(),
1858 dest_path.display(),
1859 ))
1860 })?;
1861 }
1862 }
1863
1864 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1865 let snapshot_file_path = &snapshot_info.snapshot_path();
1866 let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1867 streaming_snapshot_dir_files(
1868 file_sender,
1869 snapshot_file_path,
1870 snapshot_version_path,
1871 account_paths,
1872 )?;
1873
1874 let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1875 let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1876 file_receiver,
1877 num_rebuilder_threads,
1878 next_append_vec_id,
1879 SnapshotFrom::Dir,
1880 storage_access,
1881 )?;
1882
1883 let RebuiltSnapshotStorage {
1884 snapshot_version: _,
1885 storage,
1886 } = version_and_storages;
1887 Ok(storage)
1888}
1889
1890fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1894 let file_metadata = fs::metadata(&path).map_err(|err| {
1896 IoError::other(format!(
1897 "failed to query snapshot version file metadata '{}': {err}",
1898 path.as_ref().display(),
1899 ))
1900 })?;
1901 let file_size = file_metadata.len();
1902 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1903 let error_message = format!(
1904 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1905 path.as_ref().display(),
1906 file_size,
1907 MAX_SNAPSHOT_VERSION_FILE_SIZE,
1908 );
1909 return Err(IoError::other(error_message).into());
1910 }
1911
1912 let mut snapshot_version = String::new();
1914 let mut file = fs::File::open(&path).map_err(|err| {
1915 IoError::other(format!(
1916 "failed to open snapshot version file '{}': {err}",
1917 path.as_ref().display()
1918 ))
1919 })?;
1920 file.read_to_string(&mut snapshot_version).map_err(|err| {
1921 IoError::other(format!(
1922 "failed to read snapshot version from file '{}': {err}",
1923 path.as_ref().display()
1924 ))
1925 })?;
1926
1927 Ok(snapshot_version.trim().to_string())
1928}
1929
1930fn check_are_snapshots_compatible(
1933 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1934 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1935) -> Result<()> {
1936 if incremental_snapshot_archive_info.is_none() {
1937 return Ok(());
1938 }
1939
1940 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1941
1942 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1943 .then_some(())
1944 .ok_or_else(|| {
1945 SnapshotError::MismatchedBaseSlot(
1946 full_snapshot_archive_info.slot(),
1947 incremental_snapshot_archive_info.base_slot(),
1948 )
1949 })
1950}
1951
1952pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1954 path.file_name()
1955 .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1956 .to_str()
1957 .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1958}
1959
1960pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1961 snapshot_archives_dir
1962 .as_ref()
1963 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1964}
1965
1966pub fn build_full_snapshot_archive_path(
1969 full_snapshot_archives_dir: impl AsRef<Path>,
1970 slot: Slot,
1971 hash: &SnapshotHash,
1972 archive_format: ArchiveFormat,
1973) -> PathBuf {
1974 full_snapshot_archives_dir.as_ref().join(format!(
1975 "snapshot-{}-{}.{}",
1976 slot,
1977 hash.0,
1978 archive_format.extension(),
1979 ))
1980}
1981
1982pub fn build_incremental_snapshot_archive_path(
1986 incremental_snapshot_archives_dir: impl AsRef<Path>,
1987 base_slot: Slot,
1988 slot: Slot,
1989 hash: &SnapshotHash,
1990 archive_format: ArchiveFormat,
1991) -> PathBuf {
1992 incremental_snapshot_archives_dir.as_ref().join(format!(
1993 "incremental-snapshot-{}-{}-{}.{}",
1994 base_slot,
1995 slot,
1996 hash.0,
1997 archive_format.extension(),
1998 ))
1999}
2000
2001pub(crate) fn parse_full_snapshot_archive_filename(
2003 archive_filename: &str,
2004) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
2005 static RE: std::sync::LazyLock<Regex> =
2006 std::sync::LazyLock::new(|| Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap());
2007
2008 let do_parse = || {
2009 RE.captures(archive_filename).and_then(|captures| {
2010 let slot = captures
2011 .name("slot")
2012 .map(|x| x.as_str().parse::<Slot>())?
2013 .ok()?;
2014 let hash = captures
2015 .name("hash")
2016 .map(|x| x.as_str().parse::<Hash>())?
2017 .ok()?;
2018 let archive_format = captures
2019 .name("ext")
2020 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2021 .ok()?;
2022
2023 Some((slot, SnapshotHash(hash), archive_format))
2024 })
2025 };
2026
2027 do_parse().ok_or_else(|| {
2028 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2029 })
2030}
2031
2032pub(crate) fn parse_incremental_snapshot_archive_filename(
2034 archive_filename: &str,
2035) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
2036 static RE: std::sync::LazyLock<Regex> = std::sync::LazyLock::new(|| {
2037 Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap()
2038 });
2039
2040 let do_parse = || {
2041 RE.captures(archive_filename).and_then(|captures| {
2042 let base_slot = captures
2043 .name("base")
2044 .map(|x| x.as_str().parse::<Slot>())?
2045 .ok()?;
2046 let slot = captures
2047 .name("slot")
2048 .map(|x| x.as_str().parse::<Slot>())?
2049 .ok()?;
2050 let hash = captures
2051 .name("hash")
2052 .map(|x| x.as_str().parse::<Hash>())?
2053 .ok()?;
2054 let archive_format = captures
2055 .name("ext")
2056 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2057 .ok()?;
2058
2059 Some((base_slot, slot, SnapshotHash(hash), archive_format))
2060 })
2061 };
2062
2063 do_parse().ok_or_else(|| {
2064 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2065 })
2066}
2067
2068fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2070where
2071 F: Fn(PathBuf) -> Result<T>,
2072{
2073 let walk_dir = |dir: &Path| -> Vec<T> {
2074 let entry_iter = fs::read_dir(dir);
2075 match entry_iter {
2076 Err(err) => {
2077 info!(
2078 "Unable to read snapshot archives directory '{}': {err}",
2079 dir.display(),
2080 );
2081 vec![]
2082 }
2083 Ok(entries) => entries
2084 .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2085 .collect(),
2086 }
2087 };
2088
2089 let mut ret = walk_dir(snapshot_archives_dir);
2090 let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2091 if remote_dir.exists() {
2092 ret.append(&mut walk_dir(remote_dir.as_ref()));
2093 }
2094 ret
2095}
2096
2097pub fn get_full_snapshot_archives(
2099 full_snapshot_archives_dir: impl AsRef<Path>,
2100) -> Vec<FullSnapshotArchiveInfo> {
2101 get_snapshot_archives(
2102 full_snapshot_archives_dir.as_ref(),
2103 FullSnapshotArchiveInfo::new_from_path,
2104 )
2105}
2106
2107pub fn get_incremental_snapshot_archives(
2109 incremental_snapshot_archives_dir: impl AsRef<Path>,
2110) -> Vec<IncrementalSnapshotArchiveInfo> {
2111 get_snapshot_archives(
2112 incremental_snapshot_archives_dir.as_ref(),
2113 IncrementalSnapshotArchiveInfo::new_from_path,
2114 )
2115}
2116
2117pub fn get_highest_full_snapshot_archive_slot(
2119 full_snapshot_archives_dir: impl AsRef<Path>,
2120) -> Option<Slot> {
2121 get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2122 .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2123}
2124
2125pub fn get_highest_incremental_snapshot_archive_slot(
2128 incremental_snapshot_archives_dir: impl AsRef<Path>,
2129 full_snapshot_slot: Slot,
2130) -> Option<Slot> {
2131 get_highest_incremental_snapshot_archive_info(
2132 incremental_snapshot_archives_dir,
2133 full_snapshot_slot,
2134 )
2135 .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2136}
2137
2138pub fn get_highest_full_snapshot_archive_info(
2140 full_snapshot_archives_dir: impl AsRef<Path>,
2141) -> Option<FullSnapshotArchiveInfo> {
2142 let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2143 full_snapshot_archives.sort_unstable();
2144 full_snapshot_archives.into_iter().next_back()
2145}
2146
2147pub fn get_highest_incremental_snapshot_archive_info(
2150 incremental_snapshot_archives_dir: impl AsRef<Path>,
2151 full_snapshot_slot: Slot,
2152) -> Option<IncrementalSnapshotArchiveInfo> {
2153 let mut incremental_snapshot_archives =
2157 get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2158 .into_iter()
2159 .filter(|incremental_snapshot_archive_info| {
2160 incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2161 })
2162 .collect::<Vec<_>>();
2163 incremental_snapshot_archives.sort_unstable();
2164 incremental_snapshot_archives.into_iter().next_back()
2165}
2166
2167pub fn purge_old_snapshot_archives(
2168 full_snapshot_archives_dir: impl AsRef<Path>,
2169 incremental_snapshot_archives_dir: impl AsRef<Path>,
2170 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2171 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2172) {
2173 info!(
2174 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2175 full_snapshot_archives_dir.as_ref().display(),
2176 maximum_full_snapshot_archives_to_retain
2177 );
2178
2179 let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2180 full_snapshot_archives.sort_unstable();
2181 full_snapshot_archives.reverse();
2182
2183 let num_to_retain = full_snapshot_archives
2184 .len()
2185 .min(maximum_full_snapshot_archives_to_retain.get());
2186 trace!(
2187 "There are {} full snapshot archives, retaining {}",
2188 full_snapshot_archives.len(),
2189 num_to_retain,
2190 );
2191
2192 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2193 if full_snapshot_archives.is_empty() {
2194 None
2195 } else {
2196 Some(full_snapshot_archives.split_at(num_to_retain))
2197 }
2198 .unwrap_or_default();
2199
2200 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2201 .iter()
2202 .map(|ai| ai.slot())
2203 .collect::<HashSet<_>>();
2204
2205 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2206 for path in archives.iter().map(|a| a.path()) {
2207 trace!("Removing snapshot archive: {}", path.display());
2208 let result = fs::remove_file(path);
2209 if let Err(err) = result {
2210 info!(
2211 "Failed to remove snapshot archive '{}': {err}",
2212 path.display()
2213 );
2214 }
2215 }
2216 }
2217 remove_archives(full_snapshot_archives_to_remove);
2218
2219 info!(
2220 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2221 incremental_snapshot_archives_dir.as_ref().display(),
2222 maximum_incremental_snapshot_archives_to_retain
2223 );
2224 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2225 for incremental_snapshot_archive in
2226 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2227 {
2228 incremental_snapshot_archives_by_base_slot
2229 .entry(incremental_snapshot_archive.base_slot())
2230 .or_default()
2231 .push(incremental_snapshot_archive)
2232 }
2233
2234 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2235 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2236 {
2237 incremental_snapshot_archives.sort_unstable();
2238 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2239 maximum_incremental_snapshot_archives_to_retain.get()
2240 } else {
2241 usize::from(retained_full_snapshot_slots.contains(&base_slot))
2242 };
2243 trace!(
2244 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2245 incremental_snapshot_archives.len(),
2246 base_slot,
2247 incremental_snapshot_archives
2248 .len()
2249 .saturating_sub(num_to_retain),
2250 );
2251
2252 incremental_snapshot_archives.truncate(
2253 incremental_snapshot_archives
2254 .len()
2255 .saturating_sub(num_to_retain),
2256 );
2257 remove_archives(&incremental_snapshot_archives);
2258 }
2259}
2260
/// Unpacks a snapshot archive from `shared_buffer` using `parallel_divisions`
/// readers that run in parallel, and merges what each one unpacked into a
/// single map.
///
/// Every reader is given a `ParallelSelector` carrying its own index and the
/// total number of divisions.
///
/// # Panics
///
/// Panics if `parallel_divisions` is zero.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // All readers are created up front, before any unpacking starts.
    let readers: Vec<_> = std::iter::repeat_with(|| SharedBufferReader::new(&shared_buffer))
        .take(parallel_divisions)
        .collect();

    let per_division_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let selector = ParallelSelector {
                index,
                divisions: parallel_divisions,
            };
            let mut archive = Archive::new(reader);
            hardened_unpack::unpack_snapshot(
                &mut archive,
                ledger_dir,
                account_paths,
                Some(selector),
            )
        })
        .collect();

    // Every division has finished by now; the first error (if any) is returned.
    let mut merged = UnpackedAppendVecMap::new();
    for division_result in per_division_results {
        merged.extend(division_result?);
    }
    Ok(merged)
}
2301
2302fn untar_snapshot_create_shared_buffer(
2303 snapshot_tar: &Path,
2304 archive_format: ArchiveFormat,
2305) -> SharedBuffer {
2306 let open_file = || {
2307 fs::File::open(snapshot_tar)
2308 .map_err(|err| {
2309 IoError::other(format!(
2310 "failed to open snapshot archive '{}': {err}",
2311 snapshot_tar.display(),
2312 ))
2313 })
2314 .unwrap()
2315 };
2316 match archive_format {
2318 ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(open_file())),
2319 ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(open_file())),
2320 ArchiveFormat::TarZstd { .. } => {
2321 SharedBuffer::new(zstd::stream::read::Decoder::new(open_file()).unwrap())
2322 }
2323 ArchiveFormat::TarLz4 => SharedBuffer::new(lz4::Decoder::new(open_file()).unwrap()),
2324 ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2325 }
2326}
2327
/// Unpacks the archive at `snapshot_tar` into `unpack_dir` and
/// `account_paths`, using `parallel_divisions` parallel readers.
///
/// Returns the map produced by `unpack_snapshot_local`.
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let archive_path = snapshot_tar.as_ref();
    unpack_snapshot_local(
        untar_snapshot_create_shared_buffer(archive_path, archive_format),
        unpack_dir,
        account_paths,
        parallel_divisions,
    )
}
2339
2340pub fn verify_unpacked_snapshots_dir_and_version(
2341 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2342) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2343 info!(
2344 "snapshot version: {}",
2345 &unpacked_snapshots_dir_and_version.snapshot_version
2346 );
2347
2348 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2349 let mut bank_snapshots =
2350 get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2351 if bank_snapshots.len() > 1 {
2352 return Err(IoError::other(format!(
2353 "invalid snapshot format: only one snapshot allowed, but found {}",
2354 bank_snapshots.len(),
2355 ))
2356 .into());
2357 }
2358 let root_paths = bank_snapshots.pop().ok_or_else(|| {
2359 IoError::other(format!(
2360 "no snapshots found in snapshots directory '{}'",
2361 unpacked_snapshots_dir_and_version
2362 .unpacked_snapshots_dir
2363 .display(),
2364 ))
2365 })?;
2366 Ok((snapshot_version, root_paths))
2367}
2368
2369pub fn get_snapshot_file_name(slot: Slot) -> String {
2371 slot.to_string()
2372}
2373
2374pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2376 bank_snapshots_dir
2377 .as_ref()
2378 .join(get_snapshot_file_name(slot))
2379}
2380
/// Selects how `verify_snapshot_archive` compares the serialized bank file.
#[derive(Clone, Copy, Debug)]
pub enum VerifyBank {
    /// The serialized bank file gets no special handling; it is covered by the
    /// directory comparison like every other file.
    Deterministic,
    /// The two serialized bank files are compared with
    /// `compare_two_serialized_banks` and then removed, so the directory
    /// comparison does not see them.
    NonDeterministic,
}
2390
/// Unpacks `snapshot_archive` into a temporary directory and asserts that its
/// contents match the bank snapshot for `slot` under `snapshots_to_verify`.
///
/// This function modifies `snapshots_to_verify`: files that should not take
/// part in the comparison (the accounts hardlinks directory, the version
/// file, the state-complete file and, for `VerifyBank::NonDeterministic`, the
/// serialized bank file) are deleted from it before comparing.
///
/// # Panics
///
/// Panics on any I/O failure and whenever a comparison finds a difference.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let expected_snapshots_dir = snapshots_to_verify.as_ref();

    // Unpack the archive into a fresh temporary directory.
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        std::slice::from_ref(&unpack_account_dir),
        archive_format,
        1,
    )
    .unwrap();

    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Storage files reachable from the expected snapshot are gathered here so
    // they can be compared against the unpacked account directory.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot_name = slot.to_string();
    let expected_slot_dir = expected_snapshots_dir.join(&slot_name);
    let unpacked_slot_dir = unpacked_snapshots.join(&slot_name);

    if matches!(verify_bank, VerifyBank::NonDeterministic) {
        // Compare the bank files with the dedicated helper, then delete both
        // so the directory comparison below does not look at them.
        let expected_bank_file = expected_slot_dir.join(&slot_name);
        let unpacked_bank_file = unpacked_slot_dir.join(&slot_name);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(
            &expected_bank_file,
            &unpacked_bank_file
        )
        .unwrap());
        fs::remove_file(expected_bank_file).unwrap();
        fs::remove_file(unpacked_bank_file).unwrap();
    }

    // Move the unpacked status cache from the snapshots root into the slot
    // directory so both trees have the same layout.
    fs::rename(
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME),
        unpacked_slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
    )
    .unwrap();

    let accounts_hardlinks_dir = expected_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        for hardlink_entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            // Each entry is a symlink; follow it to the directory with the files.
            let link_target = fs::read_link(hardlink_entry.unwrap().path()).unwrap();
            for storage_entry in fs::read_dir(&link_target).unwrap() {
                let storage_path = storage_entry.unwrap().path();
                let copy_path = storages_to_verify.join(storage_path.file_name().unwrap());
                fs::copy(storage_path, copy_path).unwrap();
            }
        }
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    let version_file = expected_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_file.is_file() {
        fs::remove_file(version_file).unwrap();
    }

    let state_complete_file = expected_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_file.is_file() {
        fs::remove_file(state_complete_file).unwrap();
    }

    assert!(!dir_diff::is_different(expected_snapshots_dir, unpacked_snapshots).unwrap());

    // Best effort: `remove_dir` only succeeds on an empty directory, and its
    // result is deliberately ignored.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2484
2485pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2487 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2488 purge_bank_snapshots(&bank_snapshots);
2489}
2490
2491pub fn purge_old_bank_snapshots(
2493 bank_snapshots_dir: impl AsRef<Path>,
2494 num_bank_snapshots_to_retain: usize,
2495 filter_by_kind: Option<BankSnapshotKind>,
2496) {
2497 let mut bank_snapshots = match filter_by_kind {
2498 Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2499 Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2500 None => get_bank_snapshots(&bank_snapshots_dir),
2501 };
2502
2503 bank_snapshots.sort_unstable();
2504 purge_bank_snapshots(
2505 bank_snapshots
2506 .iter()
2507 .rev()
2508 .skip(num_bank_snapshots_to_retain),
2509 );
2510}
2511
2512pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2517 purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2518 purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2519
2520 let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2521 if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2522 debug!(
2523 "Retained bank snapshot for slot {}, and purged the rest.",
2524 highest_bank_snapshot_post.slot
2525 );
2526 }
2527}
2528
2529pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2531 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2532 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2533 purge_bank_snapshots(&bank_snapshots);
2534}
2535
2536fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2540 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2541 if purge_bank_snapshot(snapshot_dir).is_err() {
2542 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2543 }
2544 }
2545}
2546
2547pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2549 const FN_ERR: &str = "failed to purge bank snapshot";
2550 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2551 if accounts_hardlinks_dir.is_dir() {
2552 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2555 IoError::other(format!(
2556 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2557 accounts_hardlinks_dir.display(),
2558 ))
2559 })?;
2560 for entry in read_dir {
2561 let accounts_hardlink_dir = entry?.path();
2562 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2563 IoError::other(format!(
2564 "{FN_ERR}: failed to read symlink '{}': {err}",
2565 accounts_hardlink_dir.display(),
2566 ))
2567 })?;
2568 move_and_async_delete_path(&accounts_hardlink_dir);
2569 }
2570 }
2571 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2572 IoError::other(format!(
2573 "{FN_ERR}: failed to remove dir '{}': {err}",
2574 bank_snapshot_dir.as_ref().display(),
2575 ))
2576 })?;
2577 Ok(())
2578}
2579
2580pub fn should_take_full_snapshot(
2581 block_height: Slot,
2582 full_snapshot_archive_interval_slots: Slot,
2583) -> bool {
2584 block_height % full_snapshot_archive_interval_slots == 0
2585}
2586
2587pub fn should_take_incremental_snapshot(
2588 block_height: Slot,
2589 incremental_snapshot_archive_interval_slots: Slot,
2590 latest_full_snapshot_slot: Option<Slot>,
2591) -> bool {
2592 block_height % incremental_snapshot_archive_interval_slots == 0
2593 && latest_full_snapshot_slot.is_some()
2594}
2595
/// Creates a temporary directory and sets up accounts directories inside it
/// with `create_accounts_run_and_snapshot_dirs`.
///
/// Returns the `TempDir` guard and the first path reported by that helper.
///
/// # Panics
///
/// Panics if the temporary directory or the accounts directories cannot be
/// created.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let accounts_dirs = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, accounts_dirs.0)
}
2606
2607#[cfg(test)]
2608mod tests {
2609 use {
2610 super::*,
2611 assert_matches::assert_matches,
2612 bincode::{deserialize_from, serialize_into},
2613 std::{convert::TryFrom, mem::size_of},
2614 tempfile::NamedTempFile,
2615 };
2616
/// Serializing exactly as many bytes as the cap allows succeeds and reports
/// the number of bytes written.
#[test]
fn test_serialize_snapshot_data_file_under_limit() {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    let size_limit = size_of::<u32>() as u64;

    let bytes_written =
        serialize_snapshot_data_file_capped(&data_file_path, size_limit, |stream| {
            serialize_into(stream, &2323_u32)?;
            Ok(())
        })
        .unwrap();

    assert_eq!(bytes_written, size_limit);
}
2632
/// Serializing one byte more than the cap allows is rejected.
#[test]
fn test_serialize_snapshot_data_file_over_limit() {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    let payload_size = size_of::<u32>() as u64;

    let result =
        serialize_snapshot_data_file_capped(&data_file_path, payload_size - 1, |stream| {
            serialize_into(stream, &2323_u32)?;
            Ok(())
        });

    assert_matches!(
        result,
        Err(SnapshotError::Io(ref message))
            if message.to_string().starts_with("too large snapshot data file to serialize")
    );
}
2647
/// A data file that fits within the cap deserializes back to the value that
/// was written.
#[test]
fn test_deserialize_snapshot_data_file_under_limit() {
    let written_value = 2323_u32;
    let size_limit = size_of::<u32>() as u64;

    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    serialize_snapshot_data_file_capped(&data_file_path, size_limit, |stream| {
        serialize_into(stream, &written_value)?;
        Ok(())
    })
    .unwrap();

    let snapshot_root_paths = SnapshotRootPaths {
        full_snapshot_root_file_path: data_file_path,
        incremental_snapshot_root_file_path: None,
    };

    let read_value =
        deserialize_snapshot_data_files_capped(&snapshot_root_paths, size_limit, |stream| {
            Ok(deserialize_from::<_, u32>(
                &mut stream.full_snapshot_stream,
            )?)
        })
        .unwrap();

    assert_eq!(read_value, written_value);
}
2681
/// Deserializing a data file that is one byte larger than the cap is
/// rejected.
#[test]
fn test_deserialize_snapshot_data_file_over_limit() {
    let written_value = 2323_u32;
    let payload_size = size_of::<u32>() as u64;

    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    serialize_snapshot_data_file_capped(&data_file_path, payload_size, |stream| {
        serialize_into(stream, &written_value)?;
        Ok(())
    })
    .unwrap();

    let snapshot_root_paths = SnapshotRootPaths {
        full_snapshot_root_file_path: data_file_path,
        incremental_snapshot_root_file_path: None,
    };

    let result = deserialize_snapshot_data_files_capped(
        &snapshot_root_paths,
        payload_size - 1,
        |stream| {
            Ok(deserialize_from::<_, u32>(
                &mut stream.full_snapshot_stream,
            )?)
        },
    );

    assert_matches!(
        result,
        Err(SnapshotError::Io(ref message))
            if message.to_string().starts_with("too large snapshot data file to deserialize")
    );
}
2714
/// A data file with bytes left over after deserialization is rejected as
/// invalid: two values are written, only one is read back.
#[test]
fn test_deserialize_snapshot_data_file_extra_data() {
    let written_value = 2323_u32;
    let size_limit = 2 * size_of::<u32>() as u64;

    let temp_dir = tempfile::TempDir::new().unwrap();
    let data_file_path = temp_dir.path().join("data-file");
    serialize_snapshot_data_file_capped(&data_file_path, size_limit, |stream| {
        serialize_into(stream.by_ref(), &written_value)?;
        serialize_into(stream.by_ref(), &written_value)?;
        Ok(())
    })
    .unwrap();

    let snapshot_root_paths = SnapshotRootPaths {
        full_snapshot_root_file_path: data_file_path,
        incremental_snapshot_root_file_path: None,
    };

    let result =
        deserialize_snapshot_data_files_capped(&snapshot_root_paths, size_limit, |stream| {
            Ok(deserialize_from::<_, u32>(
                &mut stream.full_snapshot_stream,
            )?)
        });

    assert_matches!(
        result,
        Err(SnapshotError::Io(ref message))
            if message.to_string().starts_with("invalid snapshot data file")
    );
}
2748
/// A version file holding the default snapshot version string is read back
/// unchanged.
#[test]
fn test_snapshot_version_from_file_under_limit() {
    let expected_version = SnapshotVersion::default().as_str();

    let mut version_file = NamedTempFile::new().unwrap();
    version_file.write_all(expected_version.as_bytes()).unwrap();

    let actual_version = snapshot_version_from_file(version_file.path()).unwrap();
    assert_eq!(actual_version, expected_version);
}
2757
/// A version file one byte larger than `MAX_SNAPSHOT_VERSION_FILE_SIZE` is
/// rejected.
#[test]
fn test_snapshot_version_from_file_over_limit() {
    let oversized_len = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
    let oversized_content = vec![7u8; oversized_len];

    let mut version_file = NamedTempFile::new().unwrap();
    version_file.write_all(&oversized_content).unwrap();

    let result = snapshot_version_from_file(version_file.path());
    assert_matches!(
        result,
        Err(SnapshotError::Io(ref message))
            if message.to_string().starts_with("snapshot version file too large")
    );
}
2769
/// Well-formed full snapshot archive file names parse into their slot, hash
/// and archive format; malformed names are rejected.
#[test]
fn test_parse_full_snapshot_archive_filename() {
    // (slot, file extension, expected archive format)
    let valid_cases = [
        (42, "tar.bz2", ArchiveFormat::TarBzip2),
        (
            43,
            "tar.zst",
            ArchiveFormat::TarZstd {
                config: ZstdConfig::default(),
            },
        ),
        (44, "tar", ArchiveFormat::Tar),
        (45, "tar.lz4", ArchiveFormat::TarLz4),
    ];
    for (slot, extension, archive_format) in valid_cases {
        let filename = format!("snapshot-{slot}-{}.{extension}", Hash::default());
        assert_eq!(
            parse_full_snapshot_archive_filename(&filename).unwrap(),
            (slot, SnapshotHash(Hash::default()), archive_format)
        );
    }

    // Every name below has a bad slot, a bad hash, a bad extension, or a
    // combination of those.
    let invalid_filenames = vec![
        "invalid".to_string(),
        "snapshot-bad!slot-bad!hash.bad!ext".to_string(),
        "snapshot-12345678-bad!hash.bad!ext".to_string(),
        format!("snapshot-12345678-{}.bad!ext", Hash::new_unique()),
        "snapshot-12345678-bad!hash.tar".to_string(),
        format!("snapshot-bad!slot-{}.bad!ext", Hash::new_unique()),
        format!("snapshot-12345678-{}.bad!ext", Hash::new_unique()),
        format!("snapshot-bad!slot-{}.tar", Hash::new_unique()),
        "snapshot-bad!slot-bad!hash.tar".to_string(),
        "snapshot-12345678-bad!hash.tar".to_string(),
        format!("snapshot-bad!slot-{}.tar", Hash::new_unique()),
    ];
    for filename in &invalid_filenames {
        assert!(
            parse_full_snapshot_archive_filename(filename).is_err(),
            "expected '{filename}' to be rejected"
        );
    }
}
2847
/// Well-formed incremental snapshot archive file names parse into their base
/// slot, slot, hash and archive format; malformed names are rejected.
#[test]
fn test_parse_incremental_snapshot_archive_filename() {
    // (base slot, slot, file extension, expected archive format)
    let valid_cases = [
        (42, 123, "tar.bz2", ArchiveFormat::TarBzip2),
        (
            43,
            234,
            "tar.zst",
            ArchiveFormat::TarZstd {
                config: ZstdConfig::default(),
            },
        ),
        (44, 345, "tar", ArchiveFormat::Tar),
        (45, 456, "tar.lz4", ArchiveFormat::TarLz4),
    ];
    for (base_slot, slot, extension, archive_format) in valid_cases {
        let filename = format!(
            "incremental-snapshot-{base_slot}-{slot}-{}.{extension}",
            Hash::default()
        );
        assert_eq!(
            parse_incremental_snapshot_archive_filename(&filename).unwrap(),
            (
                base_slot,
                slot,
                SnapshotHash(Hash::default()),
                archive_format
            )
        );
    }

    let invalid_filenames = vec![
        "invalid".to_string(),
        // A full snapshot archive name is not an incremental one.
        format!("snapshot-42-{}.tar", Hash::new_unique()),
        "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext".to_string(),
        format!(
            "incremental-snapshot-bad!slot-56785678-{}.tar",
            Hash::new_unique()
        ),
        format!(
            "incremental-snapshot-12345678-bad!slot-{}.tar",
            Hash::new_unique()
        ),
        "incremental-snapshot-12341234-56785678-bad!HASH.tar".to_string(),
        format!(
            "incremental-snapshot-12341234-56785678-{}.bad!ext",
            Hash::new_unique()
        ),
    ];
    for filename in &invalid_filenames {
        assert!(
            parse_incremental_snapshot_archive_filename(filename).is_err(),
            "expected '{filename}' to be rejected"
        );
    }
}
2934
/// A full snapshot is compatible with no incremental snapshot and with an
/// incremental snapshot based on its slot, but not with one based on another
/// slot.
#[test]
fn test_check_are_snapshots_compatible() {
    let slot1: Slot = 1234;
    let slot2: Slot = 5678;
    let slot3: Slot = 999_999;

    let new_incremental_info = |base_slot: Slot, slot: Slot| {
        IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
            "/dir/incremental-snapshot-{base_slot}-{slot}-{}.tar",
            Hash::new_unique()
        )))
        .unwrap()
    };

    let full_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
        "/dir/snapshot-{slot1}-{}.tar",
        Hash::new_unique()
    )))
    .unwrap();

    // No incremental snapshot at all.
    assert!(check_are_snapshots_compatible(&full_info, None).is_ok());

    // Incremental snapshot based on the full snapshot's slot.
    let matching_incremental_info = new_incremental_info(slot1, slot2);
    assert!(
        check_are_snapshots_compatible(&full_info, Some(&matching_incremental_info)).is_ok()
    );

    // Incremental snapshot based on a different slot.
    let mismatched_incremental_info = new_incremental_info(slot2, slot3);
    assert!(
        check_are_snapshots_compatible(&full_info, Some(&mismatched_incremental_info)).is_err()
    );
}
2978
2979 fn common_create_bank_snapshot_files(
2981 bank_snapshots_dir: &Path,
2982 min_slot: Slot,
2983 max_slot: Slot,
2984 ) {
2985 for slot in min_slot..max_slot {
2986 let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2987 fs::create_dir_all(&snapshot_dir).unwrap();
2988
2989 let snapshot_filename = get_snapshot_file_name(slot);
2990 let snapshot_path = snapshot_dir.join(snapshot_filename);
2991 fs::File::create(snapshot_path).unwrap();
2992
2993 let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2994 fs::File::create(status_cache_file).unwrap();
2995
2996 let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2997 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2998
2999 write_snapshot_state_complete_file(snapshot_dir).unwrap();
3001 }
3002 }
3003
/// `get_bank_snapshots` finds one bank snapshot per created slot.
#[test]
fn test_get_bank_snapshots() {
    let bank_snapshots_dir = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (10, 20);
    common_create_bank_snapshot_files(bank_snapshots_dir.path(), min_slot, max_slot);

    let found = get_bank_snapshots(bank_snapshots_dir.path());
    assert_eq!(found.len() as Slot, max_slot - min_slot);
}
3014
/// `get_highest_bank_snapshot_post` returns the snapshot of the last created
/// slot.
#[test]
fn test_get_highest_bank_snapshot_post() {
    let bank_snapshots_dir = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (99, 123);
    common_create_bank_snapshot_files(bank_snapshots_dir.path(), min_slot, max_slot);

    let highest = get_highest_bank_snapshot_post(bank_snapshots_dir.path());
    assert!(highest.is_some());
    // The slot range is half-open, so the highest created slot is `max_slot - 1`.
    assert_eq!(highest.unwrap().slot, max_slot - 1);
}
3026
3027 fn common_create_snapshot_archive_files(
3033 full_snapshot_archives_dir: &Path,
3034 incremental_snapshot_archives_dir: &Path,
3035 min_full_snapshot_slot: Slot,
3036 max_full_snapshot_slot: Slot,
3037 min_incremental_snapshot_slot: Slot,
3038 max_incremental_snapshot_slot: Slot,
3039 ) {
3040 fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3041 fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3042 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3043 for incremental_snapshot_slot in
3044 min_incremental_snapshot_slot..max_incremental_snapshot_slot
3045 {
3046 let snapshot_filename = format!(
3047 "incremental-snapshot-{}-{}-{}.tar",
3048 full_snapshot_slot,
3049 incremental_snapshot_slot,
3050 Hash::default()
3051 );
3052 let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3053 fs::File::create(snapshot_filepath).unwrap();
3054 }
3055
3056 let snapshot_filename =
3057 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3058 let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3059 fs::File::create(snapshot_filepath).unwrap();
3060
3061 let bad_filename = format!(
3063 "incremental-snapshot-{}-{}-bad!hash.tar",
3064 full_snapshot_slot,
3065 max_incremental_snapshot_slot + 1,
3066 );
3067 let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3068 fs::File::create(bad_filepath).unwrap();
3069 }
3070
3071 let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3074 let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3075 fs::File::create(bad_filepath).unwrap();
3076 }
3077
/// `get_full_snapshot_archives` returns one entry per valid full snapshot
/// archive and skips the file with the unparsable name.
#[test]
fn test_get_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (123, 456);
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_slot,
        max_slot,
        0,
        0,
    );

    let full_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
    assert_eq!(full_archives.len() as Slot, max_slot - min_slot);
}
3096
/// Full snapshot archives stored in the `SNAPSHOT_ARCHIVE_DOWNLOAD_DIR`
/// subdirectory are found too, and all of them are reported as remote.
#[test]
fn test_get_full_snapshot_archives_remote() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_slot, max_slot) = (123, 456);

    let full_remote_dir = full_snapshot_archives_dir
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    let incremental_remote_dir = incremental_snapshot_archives_dir
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    common_create_snapshot_archive_files(
        &full_remote_dir,
        &incremental_remote_dir,
        min_slot,
        max_slot,
        0,
        0,
    );

    let full_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
    assert_eq!(full_archives.len() as Slot, max_slot - min_slot);
    assert!(full_archives.iter().all(|info| info.is_remote()));
}
3120
/// `get_incremental_snapshot_archives` returns one entry per valid
/// (full slot, incremental slot) pair and skips the unparsable names.
#[test]
fn test_get_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_full_snapshot_slot, max_full_snapshot_slot) = (12, 23);
    let (min_incremental_snapshot_slot, max_incremental_snapshot_slot) = (34, 45);
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_full_snapshot_slot,
        max_full_snapshot_slot,
        min_incremental_snapshot_slot,
        max_incremental_snapshot_slot,
    );

    let incremental_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir);

    let num_full_slots = max_full_snapshot_slot - min_full_snapshot_slot;
    let num_incremental_slots = max_incremental_snapshot_slot - min_incremental_snapshot_slot;
    assert_eq!(
        incremental_archives.len() as Slot,
        num_full_slots * num_incremental_slots
    );
}
3146
/// Incremental snapshot archives stored in the `SNAPSHOT_ARCHIVE_DOWNLOAD_DIR`
/// subdirectory are found too, and all of them are reported as remote.
#[test]
fn test_get_incremental_snapshot_archives_remote() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_full_snapshot_slot, max_full_snapshot_slot) = (12, 23);
    let (min_incremental_snapshot_slot, max_incremental_snapshot_slot) = (34, 45);

    let full_remote_dir = full_snapshot_archives_dir
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    let incremental_remote_dir = incremental_snapshot_archives_dir
        .path()
        .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR);
    common_create_snapshot_archive_files(
        &full_remote_dir,
        &incremental_remote_dir,
        min_full_snapshot_slot,
        max_full_snapshot_slot,
        min_incremental_snapshot_slot,
        max_incremental_snapshot_slot,
    );

    let incremental_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir);

    let num_full_slots = max_full_snapshot_slot - min_full_snapshot_slot;
    let num_incremental_slots = max_incremental_snapshot_slot - min_incremental_snapshot_slot;
    assert_eq!(
        incremental_archives.len() as Slot,
        num_full_slots * num_incremental_slots
    );
    assert!(incremental_archives.iter().all(|info| info.is_remote()));
}
3179
3180 #[test]
3181 fn test_get_highest_full_snapshot_archive_slot() {
3182 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3183 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3184 let min_slot = 123;
3185 let max_slot = 456;
3186 common_create_snapshot_archive_files(
3187 full_snapshot_archives_dir.path(),
3188 incremental_snapshot_archives_dir.path(),
3189 min_slot,
3190 max_slot,
3191 0,
3192 0,
3193 );
3194
3195 assert_eq!(
3196 get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3197 Some(max_slot - 1)
3198 );
3199 }
3200
3201 #[test]
3202 fn test_get_highest_incremental_snapshot_slot() {
3203 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3204 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3205 let min_full_snapshot_slot = 12;
3206 let max_full_snapshot_slot = 23;
3207 let min_incremental_snapshot_slot = 34;
3208 let max_incremental_snapshot_slot = 45;
3209 common_create_snapshot_archive_files(
3210 full_snapshot_archives_dir.path(),
3211 incremental_snapshot_archives_dir.path(),
3212 min_full_snapshot_slot,
3213 max_full_snapshot_slot,
3214 min_incremental_snapshot_slot,
3215 max_incremental_snapshot_slot,
3216 );
3217
3218 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3219 assert_eq!(
3220 get_highest_incremental_snapshot_archive_slot(
3221 incremental_snapshot_archives_dir.path(),
3222 full_snapshot_slot
3223 ),
3224 Some(max_incremental_snapshot_slot - 1)
3225 );
3226 }
3227
3228 assert_eq!(
3229 get_highest_incremental_snapshot_archive_slot(
3230 incremental_snapshot_archives_dir.path(),
3231 max_full_snapshot_slot
3232 ),
3233 None
3234 );
3235 }
3236
3237 fn common_test_purge_old_snapshot_archives(
3238 snapshot_names: &[&String],
3239 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3240 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3241 expected_snapshots: &[&String],
3242 ) {
3243 let temp_snap_dir = tempfile::TempDir::new().unwrap();
3244
3245 for snap_name in snapshot_names {
3246 let snap_path = temp_snap_dir.path().join(snap_name);
3247 let mut _snap_file = fs::File::create(snap_path);
3248 }
3249 purge_old_snapshot_archives(
3250 temp_snap_dir.path(),
3251 temp_snap_dir.path(),
3252 maximum_full_snapshot_archives_to_retain,
3253 maximum_incremental_snapshot_archives_to_retain,
3254 );
3255
3256 let mut retained_snaps = HashSet::new();
3257 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3258 let entry_path_buf = entry.unwrap().path();
3259 let entry_path = entry_path_buf.as_path();
3260 let snapshot_name = entry_path
3261 .file_name()
3262 .unwrap()
3263 .to_str()
3264 .unwrap()
3265 .to_string();
3266 retained_snaps.insert(snapshot_name);
3267 }
3268
3269 for snap_name in expected_snapshots {
3270 assert!(
3271 retained_snaps.contains(snap_name.as_str()),
3272 "{snap_name} not found"
3273 );
3274 }
3275 assert_eq!(retained_snaps.len(), expected_snapshots.len());
3276 }
3277
3278 #[test]
3279 fn test_purge_old_full_snapshot_archives() {
3280 let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
3281 let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
3282 let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
3283 let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
3284
3285 let expected_snapshots = vec![&snap3_name];
3287 common_test_purge_old_snapshot_archives(
3288 &snapshot_names,
3289 NonZeroUsize::new(1).unwrap(),
3290 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3291 &expected_snapshots,
3292 );
3293
3294 let expected_snapshots = vec![&snap2_name, &snap3_name];
3296 common_test_purge_old_snapshot_archives(
3297 &snapshot_names,
3298 NonZeroUsize::new(2).unwrap(),
3299 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3300 &expected_snapshots,
3301 );
3302
3303 let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
3305 common_test_purge_old_snapshot_archives(
3306 &snapshot_names,
3307 NonZeroUsize::new(3).unwrap(),
3308 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3309 &expected_snapshots,
3310 );
3311 }
3312
3313 #[test]
3317 fn test_purge_old_full_snapshot_archives_in_the_loop() {
3318 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3319 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3320 let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
3321 let starting_slot: Slot = 42;
3322
3323 for slot in (starting_slot..).take(100) {
3324 let full_snapshot_archive_file_name =
3325 format!("snapshot-{}-{}.tar", slot, Hash::default());
3326 let full_snapshot_archive_path = full_snapshot_archives_dir
3327 .as_ref()
3328 .join(full_snapshot_archive_file_name);
3329 fs::File::create(full_snapshot_archive_path).unwrap();
3330
3331 if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
3333 continue;
3334 }
3335
3336 if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
3338 continue;
3339 }
3340
3341 purge_old_snapshot_archives(
3342 &full_snapshot_archives_dir,
3343 &incremental_snapshot_archives_dir,
3344 maximum_snapshots_to_retain,
3345 NonZeroUsize::new(usize::MAX).unwrap(),
3346 );
3347 let mut full_snapshot_archives =
3348 get_full_snapshot_archives(&full_snapshot_archives_dir);
3349 full_snapshot_archives.sort_unstable();
3350 assert_eq!(
3351 full_snapshot_archives.len(),
3352 maximum_snapshots_to_retain.get()
3353 );
3354 assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
3355 for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
3356 assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
3357 }
3358 }
3359 }
3360
3361 #[test]
3362 fn test_purge_old_incremental_snapshot_archives() {
3363 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3364 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3365 let starting_slot = 100_000;
3366
3367 let maximum_incremental_snapshot_archives_to_retain =
3368 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3369 let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3370
3371 let incremental_snapshot_interval = 100;
3372 let num_incremental_snapshots_per_full_snapshot =
3373 maximum_incremental_snapshot_archives_to_retain.get() * 2;
3374 let full_snapshot_interval =
3375 incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3376
3377 let mut snapshot_filenames = vec![];
3378 (starting_slot..)
3379 .step_by(full_snapshot_interval)
3380 .take(
3381 maximum_full_snapshot_archives_to_retain
3382 .checked_mul(NonZeroUsize::new(2).unwrap())
3383 .unwrap()
3384 .get(),
3385 )
3386 .for_each(|full_snapshot_slot| {
3387 let snapshot_filename =
3388 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3389 let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3390 fs::File::create(snapshot_path).unwrap();
3391 snapshot_filenames.push(snapshot_filename);
3392
3393 (full_snapshot_slot..)
3394 .step_by(incremental_snapshot_interval)
3395 .take(num_incremental_snapshots_per_full_snapshot)
3396 .skip(1)
3397 .for_each(|incremental_snapshot_slot| {
3398 let snapshot_filename = format!(
3399 "incremental-snapshot-{}-{}-{}.tar",
3400 full_snapshot_slot,
3401 incremental_snapshot_slot,
3402 Hash::default()
3403 );
3404 let snapshot_path = incremental_snapshot_archives_dir
3405 .path()
3406 .join(&snapshot_filename);
3407 fs::File::create(snapshot_path).unwrap();
3408 snapshot_filenames.push(snapshot_filename);
3409 });
3410 });
3411
3412 purge_old_snapshot_archives(
3413 full_snapshot_archives_dir.path(),
3414 incremental_snapshot_archives_dir.path(),
3415 maximum_full_snapshot_archives_to_retain,
3416 maximum_incremental_snapshot_archives_to_retain,
3417 );
3418
3419 let mut remaining_full_snapshot_archives =
3421 get_full_snapshot_archives(full_snapshot_archives_dir.path());
3422 assert_eq!(
3423 remaining_full_snapshot_archives.len(),
3424 maximum_full_snapshot_archives_to_retain.get(),
3425 );
3426 remaining_full_snapshot_archives.sort_unstable();
3427 let latest_full_snapshot_archive_slot =
3428 remaining_full_snapshot_archives.last().unwrap().slot();
3429
3430 let mut remaining_incremental_snapshot_archives =
3435 get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3436 assert_eq!(
3437 remaining_incremental_snapshot_archives.len(),
3438 maximum_incremental_snapshot_archives_to_retain
3439 .get()
3440 .saturating_add(
3441 maximum_full_snapshot_archives_to_retain
3442 .get()
3443 .saturating_sub(1)
3444 )
3445 );
3446 remaining_incremental_snapshot_archives.sort_unstable();
3447 remaining_incremental_snapshot_archives.reverse();
3448
3449 for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3451 let incremental_snapshot_archive =
3452 remaining_incremental_snapshot_archives.pop().unwrap();
3453
3454 let expected_base_slot =
3455 latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3456 assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3457 let expected_slot = expected_base_slot
3458 + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3459 assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3460 }
3461
3462 for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3464 assert_eq!(
3465 incremental_snapshot_archive.base_slot(),
3466 latest_full_snapshot_archive_slot
3467 );
3468 }
3469
3470 let expected_remaining_incremental_snapshot_archive_slots =
3472 (latest_full_snapshot_archive_slot..)
3473 .step_by(incremental_snapshot_interval)
3474 .take(num_incremental_snapshots_per_full_snapshot)
3475 .skip(
3476 num_incremental_snapshots_per_full_snapshot
3477 - maximum_incremental_snapshot_archives_to_retain.get(),
3478 )
3479 .collect::<HashSet<_>>();
3480
3481 let actual_remaining_incremental_snapshot_archive_slots =
3482 remaining_incremental_snapshot_archives
3483 .iter()
3484 .map(|snapshot| snapshot.slot())
3485 .collect::<HashSet<_>>();
3486 assert_eq!(
3487 actual_remaining_incremental_snapshot_archive_slots,
3488 expected_remaining_incremental_snapshot_archive_slots
3489 );
3490 }
3491
3492 #[test]
3493 fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3494 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3495 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3496
3497 for snapshot_filenames in [
3498 format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
3499 format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
3500 format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
3501 format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
3502 format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
3503 format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
3504 format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
3505 format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
3506 ] {
3507 let snapshot_path = incremental_snapshot_archives_dir
3508 .path()
3509 .join(snapshot_filenames);
3510 fs::File::create(snapshot_path).unwrap();
3511 }
3512
3513 purge_old_snapshot_archives(
3514 full_snapshot_archives_dir.path(),
3515 incremental_snapshot_archives_dir.path(),
3516 NonZeroUsize::new(usize::MAX).unwrap(),
3517 NonZeroUsize::new(usize::MAX).unwrap(),
3518 );
3519
3520 let remaining_incremental_snapshot_archives =
3521 get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3522 assert!(remaining_incremental_snapshot_archives.is_empty());
3523 }
3524
3525 #[test]
3526 fn test_get_snapshot_accounts_hardlink_dir() {
3527 let slot: Slot = 1;
3528
3529 let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3530
3531 let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3532 let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3533 let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3534 fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3535
3536 let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3537 let appendvec_filename = format!("{slot}.0");
3538 let appendvec_path = accounts_dir.join(appendvec_filename);
3539
3540 let ret = get_snapshot_accounts_hardlink_dir(
3541 &appendvec_path,
3542 slot,
3543 &mut account_paths_set,
3544 &accounts_hardlinks_dir,
3545 );
3546 assert!(ret.is_ok());
3547
3548 let wrong_appendvec_path = appendvec_path
3549 .parent()
3550 .unwrap()
3551 .parent()
3552 .unwrap()
3553 .join(appendvec_path.file_name().unwrap());
3554 let ret = get_snapshot_accounts_hardlink_dir(
3555 &wrong_appendvec_path,
3556 slot,
3557 &mut account_paths_set,
3558 accounts_hardlinks_dir,
3559 );
3560
3561 assert_matches!(
3562 ret,
3563 Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3564 );
3565 }
3566
3567 #[test]
3568 fn test_full_snapshot_slot_file_good() {
3569 let slot_written = 123_456_789;
3570 let bank_snapshot_dir = TempDir::new().unwrap();
3571 write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
3572
3573 let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
3574 assert_eq!(slot_read, slot_written);
3575 }
3576
3577 #[test]
3578 fn test_full_snapshot_slot_file_bad() {
3579 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
3580 let too_small = [1u8; SLOT_SIZE - 1];
3581 let too_large = [1u8; SLOT_SIZE + 1];
3582
3583 for contents in [too_small.as_slice(), too_large.as_slice()] {
3584 let bank_snapshot_dir = TempDir::new().unwrap();
3585 let full_snapshot_slot_path = bank_snapshot_dir
3586 .as_ref()
3587 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
3588 fs::write(full_snapshot_slot_path, contents).unwrap();
3589
3590 let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
3591 assert!(err
3592 .to_string()
3593 .starts_with("invalid full snapshot slot file size"));
3594 }
3595 }
3596}