1use {
2 crate::{
3 bank::{BankFieldsToDeserialize, BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4 serde_snapshot::{
5 self, AccountsDbFields, ExtraFieldsToSerialize, SerializableAccountStorageEntry,
6 SnapshotAccountsDbFields, SnapshotBankFields, SnapshotStreams,
7 },
8 snapshot_archive_info::{
9 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
10 SnapshotArchiveInfoGetter,
11 },
12 snapshot_config::SnapshotConfig,
13 snapshot_hash::SnapshotHash,
14 snapshot_package::{SnapshotKind, SnapshotPackage},
15 snapshot_utils::snapshot_storage_rebuilder::{
16 get_slot_and_append_vec_id, SnapshotStorageRebuilder,
17 },
18 },
19 crossbeam_channel::{Receiver, Sender},
20 log::*,
21 regex::Regex,
22 solana_accounts_db::{
23 account_storage::{AccountStorageMap, AccountStoragesOrderer},
24 account_storage_reader::AccountStorageReader,
25 accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
26 accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
27 hardened_unpack::{self, UnpackError},
28 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
29 },
30 solana_clock::{Epoch, Slot},
31 solana_hash::Hash,
32 solana_measure::{measure::Measure, measure_time, measure_us},
33 std::{
34 cmp::Ordering,
35 collections::{HashMap, HashSet},
36 fmt, fs,
37 io::{self, BufRead, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
38 mem,
39 num::{NonZeroU64, NonZeroUsize},
40 ops::RangeInclusive,
41 path::{Path, PathBuf},
42 process::ExitStatus,
43 str::FromStr,
44 sync::{Arc, LazyLock},
45 thread::{Builder, JoinHandle},
46 },
47 tar::{self, Archive},
48 tempfile::TempDir,
49 thiserror::Error,
50};
51#[cfg(feature = "dev-context-only-utils")]
52use {
53 hardened_unpack::UnpackedAppendVecMap,
54 solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
55};
56
57mod archive_format;
58mod snapshot_interval;
59pub mod snapshot_storage_rebuilder;
60pub use {archive_format::*, snapshot_interval::SnapshotInterval};
61
/// File name of the serialized status cache inside a bank snapshot directory.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// File name of the snapshot format version file inside a bank snapshot directory.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Marker file whose presence means a bank snapshot directory was completely written.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Marker file written after a snapshot's storages were flushed and hard-linked.
pub const SNAPSHOT_STORAGES_FLUSHED_FILENAME: &str = "storages_flushed";
/// Sub-directory of a bank snapshot directory holding symlinks to account snapshot dirs.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Name of the `remote` directory; presumably for downloaded archives (usage not visible here).
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// File recording, as little-endian bytes, the full snapshot slot a bank snapshot builds on.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// Directory name used for bank snapshots (e.g. inside an archive's staging directory).
pub const BANK_SNAPSHOTS_DIR: &str = "snapshots";

/// Maximum size of a snapshot data file: 32 GiB.
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 << 30;
/// Maximum size, in bytes, of the snapshot version file.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;
/// Version string corresponding to `SnapshotVersion::V1_2_0`.
const VERSION_STRING_V1_2_0: &str = "1.2.0";

/// Name prefix of temporary entries created while building snapshot archives.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// Extension of a "pre" bank snapshot file.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";

/// Default interval, in slots, between full snapshot archives.
pub const DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: NonZeroU64 =
    NonZeroU64::new(100_000).unwrap();
/// Default interval, in slots, between incremental snapshot archives.
pub const DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: NonZeroU64 =
    NonZeroU64::new(100).unwrap();
/// Default number of full snapshot archives to retain.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(2).unwrap();
/// Default number of incremental snapshot archives to retain.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(4).unwrap();

/// Matches `snapshot-<slot>-<hash>.tar.{zst,lz4}`, capturing `slot`, `hash` and `ext`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str =
    r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar\.zst|tar\.lz4)$";
/// Matches `incremental-snapshot-<base>-<slot>-<hash>.tar.{zst,lz4}`, capturing `base`,
/// `slot`, `hash` and `ext`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar\.zst|tar\.lz4)$";

/// Maximum snapshot reader buffer size: 128 MiB.
const MAX_SNAPSHOT_READER_BUF_SIZE: u64 = 128 << 20;
/// (small, large) ratio used when interleaving tar entries; usage not visible here.
const INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO: (usize, usize) = (4, 1);
97
98#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
99pub enum SnapshotVersion {
100 #[default]
101 V1_2_0,
102}
103
104impl fmt::Display for SnapshotVersion {
105 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106 f.write_str(From::from(*self))
107 }
108}
109
110impl From<SnapshotVersion> for &'static str {
111 fn from(snapshot_version: SnapshotVersion) -> &'static str {
112 match snapshot_version {
113 SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
114 }
115 }
116}
117
118impl FromStr for SnapshotVersion {
119 type Err = &'static str;
120
121 fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
122 let version_string = if version_string
124 .get(..1)
125 .is_some_and(|s| s.eq_ignore_ascii_case("v"))
126 {
127 &version_string[1..]
128 } else {
129 version_string
130 };
131 match version_string {
132 VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
133 _ => Err("unsupported snapshot version"),
134 }
135 }
136}
137
138impl SnapshotVersion {
139 pub fn as_str(self) -> &'static str {
140 <&str as From<Self>>::from(self)
141 }
142}
143
/// Describes one bank snapshot directory on disk, as found by `BankSnapshotInfo::new_from_dir`
/// or produced by `serialize_snapshot`.
#[derive(PartialEq, Eq, Debug)]
pub struct BankSnapshotInfo {
    /// Slot of the bank this snapshot was taken from.
    pub slot: Slot,
    /// Which bank snapshot file variant the directory holds (`.pre` or extension-less).
    pub snapshot_kind: BankSnapshotKind,
    /// Directory that contains this bank snapshot's files.
    pub snapshot_dir: PathBuf,
    /// Snapshot format version (read from the directory's version file when loading).
    pub snapshot_version: SnapshotVersion,
}
157
158impl PartialOrd for BankSnapshotInfo {
159 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
160 Some(self.cmp(other))
161 }
162}
163
164impl Ord for BankSnapshotInfo {
166 fn cmp(&self, other: &Self) -> Ordering {
167 self.slot.cmp(&other.slot)
168 }
169}
170
171impl BankSnapshotInfo {
172 pub fn new_from_dir(
173 bank_snapshots_dir: impl AsRef<Path>,
174 slot: Slot,
175 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
176 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
179
180 if !bank_snapshot_dir.is_dir() {
181 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
182 bank_snapshot_dir,
183 ));
184 }
185
186 if !is_bank_snapshot_complete(&bank_snapshot_dir) {
192 return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
193 }
194
195 let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
196 if !status_cache_file.is_file() {
197 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
198 status_cache_file,
199 ));
200 }
201
202 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
203 let version_str = snapshot_version_from_file(&version_path).or(Err(
204 SnapshotNewFromDirError::MissingVersionFile(version_path),
205 ))?;
206 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
207 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
208
209 let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
210 let bank_snapshot_pre_path =
211 bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
212
213 let snapshot_kind = if bank_snapshot_pre_path.is_file() {
222 BankSnapshotKind::Pre
223 } else if bank_snapshot_post_path.is_file() {
224 BankSnapshotKind::Post
225 } else {
226 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
227 bank_snapshot_dir,
228 ));
229 };
230
231 Ok(BankSnapshotInfo {
232 slot,
233 snapshot_kind,
234 snapshot_dir: bank_snapshot_dir,
235 snapshot_version,
236 })
237 }
238
239 pub fn snapshot_path(&self) -> PathBuf {
240 let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
241
242 let ext = match self.snapshot_kind {
243 BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
244 BankSnapshotKind::Post => "",
245 };
246 bank_snapshot_path.set_extension(ext);
247
248 bank_snapshot_path
249 }
250}
251
/// Which bank snapshot file variant is present in a bank snapshot directory.
///
/// `BankSnapshotInfo::new_from_dir` checks for the `.pre` file first and then for the
/// extension-less file.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BankSnapshotKind {
    /// The bank snapshot file carries the `.pre` extension.
    Pre,
    /// The bank snapshot file uses the plain name from `get_snapshot_file_name`, with no extension.
    Post,
}

/// Identifies where a snapshot is loaded from. Usage is not visible here.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotFrom {
    /// Presumably: load from a snapshot archive file.
    Archive,
    /// Presumably: load from a bank snapshot directory.
    Dir,
}
278
/// Paths to the root snapshot file of a full snapshot and, optionally, of an incremental one.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root snapshot file of the full snapshot.
    pub full_snapshot_root_file_path: PathBuf,
    /// Root snapshot file of the incremental snapshot, if there is one.
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}

/// Everything obtained from unpacking a single snapshot archive.
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    // Never read. It is held so the temporary unpack directory, which `TempDir` deletes when
    // dropped, stays on disk while this value is alive.
    #[allow(dead_code)]
    unpack_dir: TempDir,
    /// Account storage map for the unpacked snapshot.
    pub storage: AccountStorageMap,
    /// Deserialized bank fields.
    pub bank_fields: BankFieldsToDeserialize,
    /// Deserialized accounts-db fields.
    pub accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
    /// Directory the snapshots were unpacked into, plus their snapshot version.
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Timing measurement of the untar step.
    pub measure_untar: Measure,
}

/// Results of unpacking a full snapshot archive and, optionally, an incremental one.
#[derive(Debug)]
pub struct UnarchivedSnapshots {
    /// Account storage map from the full snapshot.
    pub full_storage: AccountStorageMap,
    /// Account storage map from the incremental snapshot, if any.
    pub incremental_storage: Option<AccountStorageMap>,
    /// Bank fields from the full and (optional) incremental snapshots.
    pub bank_fields: SnapshotBankFields,
    /// Accounts-db fields from the full and (optional) incremental snapshots.
    pub accounts_db_fields: SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
    /// Unpack directory and version of the full snapshot.
    pub full_unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Unpack directory and version of the incremental snapshot, if any.
    pub incremental_unpacked_snapshots_dir_and_version: Option<UnpackedSnapshotsDirAndVersion>,
    /// Timing measurement of untarring the full snapshot.
    pub full_measure_untar: Measure,
    /// Timing measurement of untarring the incremental snapshot, if any.
    pub incremental_measure_untar: Option<Measure>,
    /// Presumably the next accounts-file id to assign after rebuilding storages (verify at use site).
    pub next_append_vec_id: AtomicAccountsFileId,
}

/// Keeps the temporary directories that full (and optional incremental) snapshot archives were
/// unpacked into; `TempDir` removes its directory when dropped, so the directories live exactly
/// as long as this guard.
// The fields are never read; they exist only to tie the directories' lifetime to the guard.
#[allow(dead_code)]
#[derive(Debug)]
pub struct UnarchivedSnapshotsGuard {
    full_unpack_dir: TempDir,
    incremental_unpack_dir: Option<TempDir>,
}
/// Where snapshots were unpacked, together with their snapshot format version.
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    /// Directory containing the unpacked snapshots.
    pub unpacked_snapshots_dir: PathBuf,
    /// Snapshot format version of the unpacked snapshots.
    pub snapshot_version: SnapshotVersion,
}

/// Rebuilt account storages together with the next accounts-file id counter.
pub(crate) struct StorageAndNextAccountsFileId {
    /// Account storage map.
    pub storage: AccountStorageMap,
    /// Presumably the next accounts-file id to assign (verify at use site).
    pub next_append_vec_id: AtomicAccountsFileId,
}
334
/// Top-level error type for the snapshot operations in this module.
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    /// A plain I/O error.
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    /// An error from an accounts-file (storage) operation.
    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    /// A bincode (de)serialization error.
    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    /// Sending a path over a crossbeam channel failed.
    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    /// Archive generation failed; carries the failing process exit status.
    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    /// Unpacking a snapshot archive failed.
    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    /// An I/O error tagged with a static description of where it originated.
    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    /// A path had no file-name component.
    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    /// A file name was not valid UTF-8.
    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    /// A snapshot archive file name did not have the expected form.
    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    /// The incremental snapshot's base slot differs from the full snapshot slot:
    /// (full snapshot slot, incremental base slot).
    #[error(
        "snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot \
         ({1}) do not match"
    )]
    MismatchedBaseSlot(Slot, Slot),

    /// No snapshot archives were found in the given directory.
    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    /// The deserialized bank's slot differs from the archive's slot: (bank, archive).
    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
    MismatchedSlot(Slot, Slot),

    /// The deserialized bank's hash differs from the archive's hash: (bank, archive).
    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
    MismatchedHash(SnapshotHash, SnapshotHash),

    /// Slot-delta validation failed.
    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    /// Epoch-stakes validation failed.
    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    /// `BankSnapshotInfo::new_from_dir` failed.
    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    /// A snapshot directory path was invalid.
    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    /// An AppendVec (accounts file) path was invalid.
    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    /// An account path was invalid.
    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    /// No valid bank snapshot directory exists under the given path.
    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    /// The snapshot directory's account paths do not match.
    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    /// Serializing a bank snapshot for the given slot failed (see `serialize_snapshot`).
    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    /// Archiving a snapshot package failed.
    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    /// Rebuilding snapshot storages failed; carries a description.
    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}
416
/// Reasons `BankSnapshotInfo::new_from_dir` can reject a bank snapshot directory.
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    /// The bank snapshot path is not a directory.
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    /// The status cache file is not a regular file.
    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    /// The version file could not be read (any read failure is reported this way).
    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    /// The version file's content is not a supported snapshot version.
    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    /// The directory lacks its "state complete" marker file.
    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    /// Neither the `.pre` nor the extension-less bank snapshot file exists in the directory.
    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}

/// Result alias for this module, using [`SnapshotError`].
pub type Result<T> = std::result::Result<T, SnapshotError>;
439
/// Reasons a snapshot's slot deltas (the contents serialized into the status cache) fail
/// validation. The validation code itself is not visible here.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    /// More entries than allowed: (actual, maximum).
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    /// An entry belongs to a slot that is not marked as a root.
    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    /// An entry's slot exceeds the bank's slot: (entry slot, bank slot).
    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    /// The same slot appears in more than one entry.
    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    /// A slot from the slot deltas is absent from the slot history.
    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    /// A slot recorded in the slot history has no slot-delta entry.
    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    /// The slot history itself failed validation.
    #[error("snapshot slot history is invalid: {0}")]
    VerifySlotHistory(#[from] VerifySlotHistoryError),
}

/// Reasons the snapshot's slot history fails validation.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotHistoryError {
    /// The slot history's newest slot does not match the snapshot.
    #[error("newest slot does not match snapshot")]
    InvalidNewestSlot,

    /// The slot history has an unexpected number of entries.
    #[error("invalid number of entries")]
    InvalidNumEntries,
}

/// Reasons the snapshot's epoch stakes fail validation.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    /// An epoch is above the allowed maximum: (epoch, max).
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    /// Stakes for a required epoch are missing: (epoch, required epoch range).
    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}
484
/// Failures while writing a bank snapshot directory in `serialize_snapshot`. Each is wrapped
/// into `SnapshotError::AddBankSnapshot` together with the slot.
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    /// The bank snapshot directory for the slot already exists.
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    /// Creating the bank snapshot directory failed.
    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    /// Flushing the account storage at the given path failed.
    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    /// Writing the "storages flushed" marker file failed.
    #[error("failed to mark snapshot storages as 'flushed': {0}")]
    MarkStoragesFlushed(#[source] IoError),

    /// Hard-linking the account storages into the snapshot failed.
    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    /// Serializing the bank into the bank snapshot file failed.
    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    /// Serializing the status cache failed.
    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    /// Writing the snapshot version file failed.
    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    /// Writing the "state complete" marker file failed.
    #[error("failed to mark snapshot as 'complete': {0}")]
    MarkSnapshotComplete(#[source] IoError),
}
515
/// Failures while turning a bank snapshot into a snapshot archive (see `archive_snapshot`).
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    /// Creating the archive's parent directory failed.
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    /// Creating the temporary staging directory inside the given directory failed.
    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    /// Creating the bank snapshot directory inside the staging area failed.
    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    /// Canonicalizing the source bank snapshot directory failed.
    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    /// Symlinking the bank snapshot file into staging failed: (error, source, link).
    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    /// Symlinking the status cache into staging failed: (error, source, link).
    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    /// Symlinking the version file into staging failed: (error, source, link).
    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    /// Creating the archive file failed.
    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    /// Adding the version file to the archive failed.
    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    /// Adding the snapshots directory to the archive failed.
    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    /// Adding the given account storage file to the archive failed.
    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    /// Finishing the archive failed.
    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    /// Creating the compression encoder failed.
    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    /// Finishing the compression encoder failed.
    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    /// Reading the archive's metadata failed.
    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    /// Moving the finished archive into place failed: (error, from, to).
    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),

    /// Creating an account storage reader for the given path failed.
    #[error("failed to create account storage reader '{1}': {0}")]
    AccountStorageReaderError(#[source] IoError, PathBuf),
}
570
/// Failures while hard-linking account storages into a bank snapshot.
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    /// Creating the accounts hard-links directory failed.
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    /// Determining or creating the snapshot's accounts hard-link directory failed.
    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    /// Hard-linking one storage file failed: (error, from, to).
    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}

/// Failures while obtaining the per-account-path hard-link directory for a snapshot.
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    /// The account storage path was not in the expected form.
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    /// Creating the snapshot hard-link directory failed.
    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    /// Creating the symlink `link` pointing at `original` failed.
    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}
600
601pub fn clean_orphaned_account_snapshot_dirs(
609 bank_snapshots_dir: impl AsRef<Path>,
610 account_snapshot_paths: &[PathBuf],
611) -> io::Result<()> {
612 let mut account_snapshot_dirs_referenced = HashSet::new();
615 let snapshots = get_bank_snapshots(bank_snapshots_dir);
616 for snapshot in snapshots {
617 let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
618 let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
620 debug!(
624 "failed to read account hardlinks dir '{}'",
625 account_hardlinks_dir.display(),
626 );
627 continue;
628 };
629 for entry in read_dir {
630 let path = entry?.path();
631 let target = fs::read_link(&path).map_err(|err| {
632 IoError::other(format!(
633 "failed to read symlink '{}': {err}",
634 path.display(),
635 ))
636 })?;
637 account_snapshot_dirs_referenced.insert(target);
638 }
639 }
640
641 for account_snapshot_path in account_snapshot_paths {
643 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
644 IoError::other(format!(
645 "failed to read account snapshot dir '{}': {err}",
646 account_snapshot_path.display(),
647 ))
648 })?;
649 for entry in read_dir {
650 let path = entry?.path();
651 if !account_snapshot_dirs_referenced.contains(&path) {
652 info!(
653 "Removing orphaned account snapshot hardlink directory '{}'...",
654 path.display()
655 );
656 move_and_async_delete_path(&path);
657 }
658 }
659 }
660
661 Ok(())
662}
663
664pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
666 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
667 return;
669 };
670
671 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
672
673 let incomplete_dirs: Vec<_> = read_dir_iter
674 .filter_map(|entry| entry.ok())
675 .map(|entry| entry.path())
676 .filter(|path| path.is_dir())
677 .filter(is_incomplete)
678 .collect();
679
680 for incomplete_dir in incomplete_dirs {
682 let result = purge_bank_snapshot(&incomplete_dir);
683 match result {
684 Ok(_) => info!(
685 "Purged incomplete snapshot dir: {}",
686 incomplete_dir.display()
687 ),
688 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
689 }
690 }
691}
692
693fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
695 let state_complete_path = bank_snapshot_dir
696 .as_ref()
697 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
698 state_complete_path.is_file()
699}
700
701fn write_snapshot_state_complete_file(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<()> {
703 let state_complete_path = bank_snapshot_dir
704 .as_ref()
705 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
706 fs::File::create(&state_complete_path).map_err(|err| {
707 IoError::other(format!(
708 "failed to create file '{}': {err}",
709 state_complete_path.display(),
710 ))
711 })?;
712 Ok(())
713}
714
715pub fn write_full_snapshot_slot_file(
717 bank_snapshot_dir: impl AsRef<Path>,
718 full_snapshot_slot: Slot,
719) -> io::Result<()> {
720 let full_snapshot_slot_path = bank_snapshot_dir
721 .as_ref()
722 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
723 fs::write(
724 &full_snapshot_slot_path,
725 Slot::to_le_bytes(full_snapshot_slot),
726 )
727 .map_err(|err| {
728 IoError::other(format!(
729 "failed to write full snapshot slot file '{}': {err}",
730 full_snapshot_slot_path.display(),
731 ))
732 })
733}
734
735pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<Slot> {
737 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
738 let full_snapshot_slot_path = bank_snapshot_dir
739 .as_ref()
740 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
741 let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
742 if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
743 let error_message = format!(
744 "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
745 full_snapshot_slot_path.display(),
746 full_snapshot_slot_file_metadata.len(),
747 SLOT_SIZE,
748 );
749 return Err(IoError::other(error_message));
750 }
751 let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
752 let mut buffer = [0; SLOT_SIZE];
753 full_snapshot_slot_file.read_exact(&mut buffer)?;
754 let slot = Slot::from_le_bytes(buffer);
755 Ok(slot)
756}
757
758pub fn write_storages_flushed_file(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<()> {
760 let flushed_storages_path = bank_snapshot_dir
761 .as_ref()
762 .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
763 fs::File::create(&flushed_storages_path).map_err(|err| {
764 IoError::other(format!(
765 "failed to create file '{}': {err}",
766 flushed_storages_path.display(),
767 ))
768 })?;
769 Ok(())
770}
771
772fn are_bank_snapshot_storages_flushed(bank_snapshot_dir: impl AsRef<Path>) -> bool {
774 let flushed_storages = bank_snapshot_dir
775 .as_ref()
776 .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
777 flushed_storages.is_file()
778}
779
780pub fn get_highest_loadable_bank_snapshot(
788 snapshot_config: &SnapshotConfig,
789) -> Option<BankSnapshotInfo> {
790 let highest_bank_snapshot =
791 get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
792
793 if !snapshot_config.should_generate_snapshots() {
796 return Some(highest_bank_snapshot);
797 }
798
799 let highest_full_snapshot_archive_slot =
802 get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
803 let full_snapshot_file_slot =
804 read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
805 let are_storages_flushed =
806 are_bank_snapshot_storages_flushed(&highest_bank_snapshot.snapshot_dir);
807 (are_storages_flushed && (full_snapshot_file_slot == highest_full_snapshot_archive_slot))
808 .then_some(highest_bank_snapshot)
809}
810
811pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
814 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
815 for entry in entries.flatten() {
816 if entry
817 .file_name()
818 .to_str()
819 .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
820 .unwrap_or(false)
821 {
822 let path = entry.path();
823 let result = if path.is_dir() {
824 fs::remove_dir_all(&path)
825 } else {
826 fs::remove_file(&path)
827 };
828 if let Err(err) = result {
829 warn!(
830 "Failed to remove temporary snapshot archive '{}': {err}",
831 path.display(),
832 );
833 }
834 }
835 }
836 }
837}
838
839pub fn serialize_and_archive_snapshot_package(
841 snapshot_package: SnapshotPackage,
842 snapshot_config: &SnapshotConfig,
843 should_flush_and_hard_link_storages: bool,
844) -> Result<SnapshotArchiveInfo> {
845 let SnapshotPackage {
846 snapshot_kind,
847 slot: snapshot_slot,
848 block_height,
849 hash: snapshot_hash,
850 mut snapshot_storages,
851 status_cache_slot_deltas,
852 bank_fields_to_serialize,
853 bank_hash_stats,
854 write_version,
855 enqueued: _,
856 } = snapshot_package;
857
858 let bank_snapshot_info = serialize_snapshot(
859 &snapshot_config.bank_snapshots_dir,
860 snapshot_config.snapshot_version,
861 snapshot_storages.as_slice(),
862 status_cache_slot_deltas.as_slice(),
863 bank_fields_to_serialize,
864 bank_hash_stats,
865 write_version,
866 should_flush_and_hard_link_storages,
867 )?;
868
869 let full_snapshot_archive_slot = match snapshot_kind {
871 SnapshotKind::FullSnapshot => snapshot_slot,
872 SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
873 };
874 write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
875 .map_err(|err| {
876 IoError::other(format!(
877 "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, \
878 kind {snapshot_kind:?}: {err}",
879 ))
880 })?;
881
882 let snapshot_archive_path = match snapshot_package.snapshot_kind {
883 SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
884 &snapshot_config.full_snapshot_archives_dir,
885 snapshot_package.slot,
886 &snapshot_package.hash,
887 snapshot_config.archive_format,
888 ),
889 SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
890 snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
893 build_incremental_snapshot_archive_path(
894 &snapshot_config.incremental_snapshot_archives_dir,
895 incremental_snapshot_base_slot,
896 snapshot_package.slot,
897 &snapshot_package.hash,
898 snapshot_config.archive_format,
899 )
900 }
901 };
902
903 let snapshot_archive_info = archive_snapshot(
904 snapshot_kind,
905 snapshot_slot,
906 snapshot_hash,
907 snapshot_storages.as_slice(),
908 &bank_snapshot_info.snapshot_dir,
909 snapshot_archive_path,
910 snapshot_config.archive_format,
911 )?;
912
913 Ok(snapshot_archive_info)
914}
915
/// Serializes a bank snapshot for `bank_fields.slot` into a new directory under
/// `bank_snapshots_dir`.
///
/// Steps, in order:
/// 1. Create the bank snapshot directory. This fails if the directory already exists.
/// 2. If `should_flush_and_hard_link_storages` is set, flush every storage in
///    `snapshot_storages`, hard-link them into the snapshot directory, and write the
///    "storages flushed" marker.
/// 3. Serialize the bank (fields, hash stats, storages, extra fields) into the bank snapshot
///    file named by `get_snapshot_file_name(slot)`.
/// 4. Serialize the status cache from `slot_deltas`.
/// 5. Write the version file for `snapshot_version`.
/// 6. Write the "state complete" marker last. It is therefore present only if every
///    earlier step succeeded.
///
/// Timings and sizes are reported through the `snapshot_bank` datapoint.
///
/// # Errors
///
/// A failure in any step is returned as `SnapshotError::AddBankSnapshot(_, slot)`. No cleanup
/// happens here, so a failed call may leave a partially written directory without the
/// "state complete" marker.
///
/// NOTE(review): the returned info reports `BankSnapshotKind::Pre`, but the bank file is
/// written under the plain `get_snapshot_file_name(slot)` name. `new_from_dir` would classify
/// that file as `Post` when it carries no `.pre` extension, and `snapshot_path()` on the
/// returned value may not point at the written file. Confirm whether any caller relies on
/// the reported kind.
#[allow(clippy::too_many_arguments)]
fn serialize_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    snapshot_version: SnapshotVersion,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    slot_deltas: &[BankSlotDelta],
    mut bank_fields: BankFieldsToSerialize,
    bank_hash_stats: BankHashStats,
    write_version: u64,
    should_flush_and_hard_link_storages: bool,
) -> Result<BankSnapshotInfo> {
    let slot = bank_fields.slot;

    // All steps live in this closure so that every error can be tagged with `slot` in one
    // place (see the `map_err` at the end).
    let do_serialize_snapshot = || {
        let mut measure_everything = Measure::start("");
        let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
        // Never overwrite an existing bank snapshot directory.
        if bank_snapshot_dir.exists() {
            return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
                bank_snapshot_dir,
            ));
        }
        fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
            AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
        })?;

        let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
        info!(
            "Creating bank snapshot for slot {slot} at '{}'",
            bank_snapshot_path.display(),
        );

        // Both timings are `None` when flushing/hard-linking is skipped.
        let (flush_storages_us, hard_link_storages_us) = if should_flush_and_hard_link_storages {
            let flush_measure = Measure::start("");
            for storage in snapshot_storages {
                storage.flush().map_err(|err| {
                    AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
                })?;
            }
            let flush_us = flush_measure.end_as_us();
            let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
                &bank_snapshot_dir,
                slot,
                snapshot_storages
            )
            .map_err(AddBankSnapshotError::HardLinkStorages)?);
            // Written only after the flush and the hard links have both succeeded.
            write_storages_flushed_file(&bank_snapshot_dir)
                .map_err(AddBankSnapshotError::MarkStoragesFlushed)?;
            Some((flush_us, hard_link_us))
        } else {
            None
        }
        .unzip();

        let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
            // Move the versioned epoch stakes out of `bank_fields` (leaving the default there)
            // so they are passed via `extra_fields` instead.
            let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
            // The `obsolete_*` fields are always written as `None` here; presumably they are
            // kept only for serialization-format compatibility.
            let extra_fields = ExtraFieldsToSerialize {
                lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
                obsolete_incremental_snapshot_persistence: None,
                obsolete_epoch_accounts_hash: None,
                versioned_epoch_stakes,
                accounts_lt_hash: Some(bank_fields.accounts_lt_hash.clone().into()),
            };
            serde_snapshot::serialize_bank_snapshot_into(
                stream,
                bank_fields,
                bank_hash_stats,
                &get_storages_to_serialize(snapshot_storages),
                extra_fields,
                write_version,
            )?;
            Ok(())
        };
        let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
            serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
                .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
            "bank serialize"
        );

        let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
        let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
            serde_snapshot::serialize_status_cache(slot_deltas, &status_cache_path)
                .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
        );

        let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
        let (_, write_version_file_us) = measure_us!(fs::write(
            &version_path,
            snapshot_version.as_str().as_bytes(),
        )
        .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);

        // Must stay the last write: `is_bank_snapshot_complete` keys off this marker.
        let (_, write_state_complete_file_us) = measure_us!({
            write_snapshot_state_complete_file(&bank_snapshot_dir)
                .map_err(AddBankSnapshotError::MarkSnapshotComplete)?
        });

        measure_everything.stop();

        datapoint_info!(
            "snapshot_bank",
            ("slot", slot, i64),
            ("bank_size", bank_snapshot_consumed_size, i64),
            ("status_cache_size", status_cache_consumed_size, i64),
            ("flush_storages_us", flush_storages_us, Option<i64>),
            ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
            ("bank_serialize_us", bank_serialize.as_us(), i64),
            ("status_cache_serialize_us", status_cache_serialize_us, i64),
            ("write_version_file_us", write_version_file_us, i64),
            (
                "write_state_complete_file_us",
                write_state_complete_file_us,
                i64
            ),
            ("total_us", measure_everything.as_us(), i64),
        );

        info!(
            "{} for slot {} at {}",
            bank_serialize,
            slot,
            bank_snapshot_path.display(),
        );

        Ok(BankSnapshotInfo {
            slot,
            snapshot_kind: BankSnapshotKind::Pre,
            snapshot_dir: bank_snapshot_dir,
            snapshot_version,
        })
    };

    do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
}
1055
/// Writes a compressed tar archive of a bank snapshot and its account storages.
///
/// The archive is assembled from a temporary staging directory created in the
/// parent directory of `archive_path`. The bank snapshot file, the status cache
/// file, and the version file from `bank_snapshot_dir` are symlinked into it. The
/// archive then contains, in this order:
///   * `SNAPSHOT_VERSION_FILENAME` at the archive root,
///   * the staged `BANK_SNAPSHOTS_DIR` tree (`<slot>/<slot>` plus the status cache),
///   * one `accounts/<storage file name>` entry per storage in `snapshot_storages`.
///
/// The archive is first written to a `TMP_SNAPSHOT_ARCHIVE_PREFIX`-prefixed file in
/// the same directory. It is renamed to `archive_path` only after the encoder has
/// finished, so nothing is written at `archive_path` until the archive is complete.
///
/// Failures are constructed as `ArchiveSnapshotPackageError` variants that carry
/// the path(s) involved. They are converted by `?` into this function's error type.
///
/// # Panics
///
/// Panics if `archive_path` has no parent directory.
///
/// NOTE(review): if archiving fails after the staging archive file has been created,
/// that file is not removed here. Confirm that stale `TMP_SNAPSHOT_ARCHIVE_PREFIX`
/// files are purged elsewhere.
fn archive_snapshot(
    snapshot_kind: SnapshotKind,
    snapshot_slot: Slot,
    snapshot_hash: SnapshotHash,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    bank_snapshot_dir: impl AsRef<Path>,
    archive_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
) -> Result<SnapshotArchiveInfo> {
    use ArchiveSnapshotPackageError as E;
    // Directory inside the archive that holds the account storage files.
    const ACCOUNTS_DIR: &str = "accounts";
    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    let tar_dir = archive_path
        .as_ref()
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

    // Stage the snapshot files in a temporary directory next to the final archive.
    // `staging_dir` is a `TempDir`, so it is deleted when it goes out of scope.
    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
    let staging_dir = tempfile::Builder::new()
        .prefix(&format!("{staging_dir_prefix}{snapshot_slot}-"))
        .tempdir_in(tar_dir)
        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
    let staging_snapshots_dir = staging_dir.path().join(BANK_SNAPSHOTS_DIR);

    let slot_str = snapshot_slot.to_string();
    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
    // Create the staging slot directory: <staging>/<BANK_SNAPSHOTS_DIR>/<slot>
    fs::create_dir_all(&staging_snapshot_dir)
        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

    // The source dir is canonicalized so the symlinks below get absolute targets.
    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
    })?;
    // Symlink the bank snapshot file: <staging>/<BANK_SNAPSHOTS_DIR>/<slot>/<slot>
    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
    let src_snapshot_file = src_snapshot_dir.join(slot_str);
    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

    // Symlink the status cache: <staging>/<BANK_SNAPSHOTS_DIR>/<SNAPSHOT_STATUS_CACHE_FILENAME>
    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    symlink::symlink_file(&src_status_cache, &staging_status_cache)
        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

    // Symlink the version file: <staging>/<SNAPSHOT_VERSION_FILENAME>
    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
    })?;

    // The archive is written under this temporary name and renamed once complete.
    let staging_archive_path = tar_dir.join(format!(
        "{}{}.{}",
        staging_dir_prefix,
        snapshot_slot,
        archive_format.extension(),
    ));

    {
        let archive_file = fs::File::create(&staging_archive_path)
            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;

        // Writes every tar entry into `encoder`; the caller finishes the encoder.
        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
            let mut archive = tar::Builder::new(encoder);
            // Do not encode archived files as sparse tar entries.
            archive.sparse(false);
            // The version file goes first, then the bank snapshot tree.
            // `get_version_and_snapshot_files` waits for both the version file and the
            // bank fields file before storage rebuilding starts. Presumably they are
            // placed early so that can begin sooner.
            // `tar::Builder` follows symlinks by default, so the staged symlinks are
            // archived as the files they point to.
            archive
                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
                .map_err(E::ArchiveVersionFile)?;
            archive
                .append_dir_all(BANK_SNAPSHOTS_DIR, &staging_snapshots_dir)
                .map_err(E::ArchiveSnapshotsDir)?;

            // Storages are emitted in the order chosen by `AccountStoragesOrderer`.
            // Presumably this interleaves small and large files per
            // INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO (TODO confirm).
            let storages_orderer = AccountStoragesOrderer::with_small_to_large_ratio(
                snapshot_storages,
                INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO,
            );
            for storage in storages_orderer.iter() {
                let path_in_archive = Path::new(ACCOUNTS_DIR)
                    .join(AccountsFile::file_name(storage.slot(), storage.id()));

                // Each entry's contents come from an `AccountStorageReader` (given
                // `snapshot_slot`) rather than from the file on disk. The tar header
                // size is taken from the reader's length.
                let reader =
                    AccountStorageReader::new(storage, Some(snapshot_slot)).map_err(|err| {
                        E::AccountStorageReaderError(err, storage.path().to_path_buf())
                    })?;
                let mut header = tar::Header::new_gnu();
                header.set_path(path_in_archive).map_err(|err| {
                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                })?;
                header.set_size(reader.len() as u64);
                header.set_cksum();
                archive.append(&header, reader).map_err(|err| {
                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                })?;
            }

            // Finishes the tar stream (end-of-archive marker) and releases the encoder.
            archive.into_inner().map_err(E::FinishArchive)?;
            Ok(())
        };

        match archive_format {
            ArchiveFormat::TarZstd { config } => {
                let mut encoder =
                    zstd::stream::Encoder::new(archive_file, config.compression_level)
                        .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarLz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(1)
                    .build(archive_file)
                    .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                // lz4's `finish` hands back the inner writer together with the result.
                let (_output, result) = encoder.finish();
                result.map_err(E::FinishEncoder)?;
            }
        };
    }

    // The size is queried before the rename; it is logged and reported below.
    let metadata = fs::metadata(&staging_archive_path)
        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
    let archive_path = archive_path.as_ref().to_path_buf();
    fs::rename(&staging_archive_path, &archive_path)
        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;

    timer.stop();
    info!(
        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
        archive_path.display(),
        snapshot_slot,
        timer.as_ms(),
        metadata.len()
    );

    // The size metric's name depends on whether this is a full or incremental archive.
    datapoint_info!(
        "archive-snapshot-package",
        ("slot", snapshot_slot, i64),
        ("archive_format", archive_format.to_string(), String),
        ("duration_ms", timer.as_ms(), i64),
        (
            if snapshot_kind.is_full_snapshot() {
                "full-snapshot-archive-size"
            } else {
                "incremental-snapshot-archive-size"
            },
            metadata.len(),
            i64
        ),
    );
    Ok(SnapshotArchiveInfo {
        path: archive_path,
        slot: snapshot_slot,
        hash: snapshot_hash,
        archive_format,
    })
}
1232
1233pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1235 let mut bank_snapshots = Vec::default();
1236 match fs::read_dir(&bank_snapshots_dir) {
1237 Err(err) => {
1238 info!(
1239 "Unable to read bank snapshots directory '{}': {err}",
1240 bank_snapshots_dir.as_ref().display(),
1241 );
1242 }
1243 Ok(paths) => paths
1244 .filter_map(|entry| {
1245 entry
1248 .ok()
1249 .filter(|entry| entry.path().is_dir())
1250 .and_then(|entry| {
1251 entry
1252 .path()
1253 .file_name()
1254 .and_then(|file_name| file_name.to_str())
1255 .and_then(|file_name| file_name.parse::<Slot>().ok())
1256 })
1257 })
1258 .for_each(
1259 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1260 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1261 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1264 },
1265 ),
1266 }
1267 bank_snapshots
1268}
1269
1270pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1274 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1275 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1276 bank_snapshots
1277}
1278
1279pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1283 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1284 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1285 bank_snapshots
1286}
1287
1288pub fn get_highest_bank_snapshot_pre(
1292 bank_snapshots_dir: impl AsRef<Path>,
1293) -> Option<BankSnapshotInfo> {
1294 do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1295}
1296
1297pub fn get_highest_bank_snapshot_post(
1301 bank_snapshots_dir: impl AsRef<Path>,
1302) -> Option<BankSnapshotInfo> {
1303 do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1304}
1305
1306pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1310 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1311}
1312
1313fn do_get_highest_bank_snapshot(
1314 mut bank_snapshots: Vec<BankSnapshotInfo>,
1315) -> Option<BankSnapshotInfo> {
1316 bank_snapshots.sort_unstable();
1317 bank_snapshots.into_iter().next_back()
1318}
1319
1320pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1321where
1322 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1323{
1324 serialize_snapshot_data_file_capped::<F>(
1325 data_file_path,
1326 MAX_SNAPSHOT_DATA_FILE_SIZE,
1327 serializer,
1328 )
1329}
1330
1331pub fn deserialize_snapshot_data_file<T: Sized>(
1332 data_file_path: &Path,
1333 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1334) -> Result<T> {
1335 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1336 deserializer(streams.full_snapshot_stream)
1337 };
1338
1339 let wrapped_data_file_path = SnapshotRootPaths {
1340 full_snapshot_root_file_path: data_file_path.to_path_buf(),
1341 incremental_snapshot_root_file_path: None,
1342 };
1343
1344 deserialize_snapshot_data_files_capped(
1345 &wrapped_data_file_path,
1346 MAX_SNAPSHOT_DATA_FILE_SIZE,
1347 wrapped_deserializer,
1348 )
1349}
1350
1351pub fn deserialize_snapshot_data_files<T: Sized>(
1352 snapshot_root_paths: &SnapshotRootPaths,
1353 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1354) -> Result<T> {
1355 deserialize_snapshot_data_files_capped(
1356 snapshot_root_paths,
1357 MAX_SNAPSHOT_DATA_FILE_SIZE,
1358 deserializer,
1359 )
1360}
1361
1362fn serialize_snapshot_data_file_capped<F>(
1363 data_file_path: &Path,
1364 maximum_file_size: u64,
1365 serializer: F,
1366) -> Result<u64>
1367where
1368 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1369{
1370 let data_file = fs::File::create(data_file_path)?;
1371 let mut data_file_stream = BufWriter::new(data_file);
1372 serializer(&mut data_file_stream)?;
1373 data_file_stream.flush()?;
1374
1375 let consumed_size = data_file_stream.stream_position()?;
1376 if consumed_size > maximum_file_size {
1377 let error_message = format!(
1378 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1379 data_file_path.display(),
1380 );
1381 return Err(IoError::other(error_message).into());
1382 }
1383 Ok(consumed_size)
1384}
1385
1386fn deserialize_snapshot_data_files_capped<T: Sized>(
1387 snapshot_root_paths: &SnapshotRootPaths,
1388 maximum_file_size: u64,
1389 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1390) -> Result<T> {
1391 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1392 create_snapshot_data_file_stream(
1393 &snapshot_root_paths.full_snapshot_root_file_path,
1394 maximum_file_size,
1395 )?;
1396
1397 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1398 if let Some(ref incremental_snapshot_root_file_path) =
1399 snapshot_root_paths.incremental_snapshot_root_file_path
1400 {
1401 Some(create_snapshot_data_file_stream(
1402 incremental_snapshot_root_file_path,
1403 maximum_file_size,
1404 )?)
1405 } else {
1406 None
1407 }
1408 .unzip();
1409
1410 let mut snapshot_streams = SnapshotStreams {
1411 full_snapshot_stream: &mut full_snapshot_data_file_stream,
1412 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1413 };
1414 let ret = deserializer(&mut snapshot_streams)?;
1415
1416 check_deserialize_file_consumed(
1417 full_snapshot_file_size,
1418 &snapshot_root_paths.full_snapshot_root_file_path,
1419 &mut full_snapshot_data_file_stream,
1420 )?;
1421
1422 if let Some(ref incremental_snapshot_root_file_path) =
1423 snapshot_root_paths.incremental_snapshot_root_file_path
1424 {
1425 check_deserialize_file_consumed(
1426 incremental_snapshot_file_size.unwrap(),
1427 incremental_snapshot_root_file_path,
1428 incremental_snapshot_data_file_stream.as_mut().unwrap(),
1429 )?;
1430 }
1431
1432 Ok(ret)
1433}
1434
1435fn create_snapshot_data_file_stream(
1438 snapshot_root_file_path: impl AsRef<Path>,
1439 maximum_file_size: u64,
1440) -> Result<(u64, BufReader<std::fs::File>)> {
1441 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1442
1443 if snapshot_file_size > maximum_file_size {
1444 let error_message = format!(
1445 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1446 snapshot_root_file_path.as_ref().display(),
1447 snapshot_file_size,
1448 maximum_file_size,
1449 );
1450 return Err(IoError::other(error_message).into());
1451 }
1452
1453 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1454 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1455
1456 Ok((snapshot_file_size, snapshot_data_file_stream))
1457}
1458
1459fn check_deserialize_file_consumed(
1462 file_size: u64,
1463 file_path: impl AsRef<Path>,
1464 file_stream: &mut BufReader<std::fs::File>,
1465) -> Result<()> {
1466 let consumed_size = file_stream.stream_position()?;
1467
1468 if consumed_size != file_size {
1469 let error_message = format!(
1470 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to \
1471 deserialize",
1472 file_path.as_ref().display(),
1473 file_size,
1474 consumed_size,
1475 );
1476 return Err(IoError::other(error_message).into());
1477 }
1478
1479 Ok(())
1480}
1481
1482fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1484 let run_path = appendvec_path.parent()?;
1485 let run_file_name = run_path.file_name()?;
1486 if run_file_name != ACCOUNTS_RUN_DIR {
1489 error!(
1490 "The account path {} does not have run/ as its immediate parent directory.",
1491 run_path.display()
1492 );
1493 return None;
1494 }
1495 let account_path = run_path.parent()?;
1496 Some(account_path.to_path_buf())
1497}
1498
1499fn get_snapshot_accounts_hardlink_dir(
1502 appendvec_path: &Path,
1503 bank_slot: Slot,
1504 account_paths: &mut HashSet<PathBuf>,
1505 hardlinks_dir: impl AsRef<Path>,
1506) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1507 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1508 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1509 })?;
1510
1511 let snapshot_hardlink_dir = account_path
1512 .join(ACCOUNTS_SNAPSHOT_DIR)
1513 .join(bank_slot.to_string());
1514
1515 if !account_paths.contains(&account_path) {
1518 let idx = account_paths.len();
1519 debug!(
1520 "for appendvec_path {}, create hard-link path {}",
1521 appendvec_path.display(),
1522 snapshot_hardlink_dir.display()
1523 );
1524 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1525 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1526 err,
1527 snapshot_hardlink_dir.clone(),
1528 )
1529 })?;
1530 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1531 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1532 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1533 source: err,
1534 original: snapshot_hardlink_dir.clone(),
1535 link: symlink_path,
1536 }
1537 })?;
1538 account_paths.insert(account_path);
1539 };
1540
1541 Ok(snapshot_hardlink_dir)
1542}
1543
1544pub fn hard_link_storages_to_snapshot(
1549 bank_snapshot_dir: impl AsRef<Path>,
1550 bank_slot: Slot,
1551 snapshot_storages: &[Arc<AccountStorageEntry>],
1552) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1553 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1554 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1555 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1556 err,
1557 accounts_hardlinks_dir.clone(),
1558 )
1559 })?;
1560
1561 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1562 for storage in snapshot_storages {
1563 let storage_path = storage.accounts.path();
1564 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1565 storage_path,
1566 bank_slot,
1567 &mut account_paths,
1568 &accounts_hardlinks_dir,
1569 )?;
1570 let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1573 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1574 fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1575 HardLinkStoragesToSnapshotError::HardLinkStorage(
1576 err,
1577 storage_path.to_path_buf(),
1578 hard_link_path,
1579 )
1580 })?;
1581 }
1582 Ok(())
1583}
1584
1585pub(crate) fn get_storages_to_serialize(
1588 snapshot_storages: &[Arc<AccountStorageEntry>],
1589) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1590 snapshot_storages
1591 .iter()
1592 .map(|storage| vec![Arc::clone(storage)])
1593 .collect::<Vec<_>>()
1594}
1595
1596pub fn verify_and_unarchive_snapshots(
1598 bank_snapshots_dir: impl AsRef<Path>,
1599 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1600 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1601 account_paths: &[PathBuf],
1602 storage_access: StorageAccess,
1603) -> Result<(UnarchivedSnapshots, UnarchivedSnapshotsGuard)> {
1604 check_are_snapshots_compatible(
1605 full_snapshot_archive_info,
1606 incremental_snapshot_archive_info,
1607 )?;
1608
1609 let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1610 let UnarchivedSnapshot {
1611 unpack_dir: full_unpack_dir,
1612 storage: full_storage,
1613 bank_fields: full_bank_fields,
1614 accounts_db_fields: full_accounts_db_fields,
1615 unpacked_snapshots_dir_and_version: full_unpacked_snapshots_dir_and_version,
1616 measure_untar: full_measure_untar,
1617 } = unarchive_snapshot(
1618 &bank_snapshots_dir,
1619 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1620 full_snapshot_archive_info.path(),
1621 "snapshot untar",
1622 account_paths,
1623 full_snapshot_archive_info.archive_format(),
1624 next_append_vec_id.clone(),
1625 storage_access,
1626 )?;
1627
1628 let (
1629 incremental_unpack_dir,
1630 incremental_storage,
1631 incremental_bank_fields,
1632 incremental_accounts_db_fields,
1633 incremental_unpacked_snapshots_dir_and_version,
1634 incremental_measure_untar,
1635 ) = if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1636 let UnarchivedSnapshot {
1637 unpack_dir,
1638 storage,
1639 bank_fields,
1640 accounts_db_fields,
1641 unpacked_snapshots_dir_and_version,
1642 measure_untar,
1643 } = unarchive_snapshot(
1644 &bank_snapshots_dir,
1645 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1646 incremental_snapshot_archive_info.path(),
1647 "incremental snapshot untar",
1648 account_paths,
1649 incremental_snapshot_archive_info.archive_format(),
1650 next_append_vec_id.clone(),
1651 storage_access,
1652 )?;
1653 (
1654 Some(unpack_dir),
1655 Some(storage),
1656 Some(bank_fields),
1657 Some(accounts_db_fields),
1658 Some(unpacked_snapshots_dir_and_version),
1659 Some(measure_untar),
1660 )
1661 } else {
1662 (None, None, None, None, None, None)
1663 };
1664
1665 let bank_fields = SnapshotBankFields::new(full_bank_fields, incremental_bank_fields);
1666 let accounts_db_fields =
1667 SnapshotAccountsDbFields::new(full_accounts_db_fields, incremental_accounts_db_fields);
1668 let next_append_vec_id = Arc::try_unwrap(next_append_vec_id).unwrap();
1669
1670 Ok((
1671 UnarchivedSnapshots {
1672 full_storage,
1673 incremental_storage,
1674 bank_fields,
1675 accounts_db_fields,
1676 full_unpacked_snapshots_dir_and_version,
1677 incremental_unpacked_snapshots_dir_and_version,
1678 full_measure_untar,
1679 incremental_measure_untar,
1680 next_append_vec_id,
1681 },
1682 UnarchivedSnapshotsGuard {
1683 full_unpack_dir,
1684 incremental_unpack_dir,
1685 },
1686 ))
1687}
1688
1689fn streaming_unarchive_snapshot(
1691 file_sender: Sender<PathBuf>,
1692 account_paths: Vec<PathBuf>,
1693 ledger_dir: PathBuf,
1694 snapshot_archive_path: PathBuf,
1695 archive_format: ArchiveFormat,
1696) -> JoinHandle<Result<()>> {
1697 Builder::new()
1698 .name("solTarUnpack".to_string())
1699 .spawn(move || {
1700 let archive_size = fs::metadata(&snapshot_archive_path)?.len();
1701 let buf_size = archive_size.min(MAX_SNAPSHOT_READER_BUF_SIZE);
1702 let decompressor =
1703 decompressed_tar_reader(archive_format, snapshot_archive_path, buf_size)?;
1704 hardened_unpack::streaming_unpack_snapshot(
1705 Archive::new(decompressor),
1706 archive_size,
1707 ledger_dir.as_path(),
1708 &account_paths,
1709 &file_sender,
1710 )?;
1711 Ok(())
1712 })
1713 .unwrap()
1714}
1715
1716fn decompressed_tar_reader(
1717 archive_format: ArchiveFormat,
1718 archive_path: impl AsRef<Path>,
1719 buf_size: u64,
1720) -> Result<ArchiveFormatDecompressor<Box<dyn BufRead + 'static>>> {
1721 let buf_reader =
1722 solana_accounts_db::large_file_buf_reader(archive_path.as_ref(), buf_size as usize)
1723 .map_err(|err| {
1724 io::Error::other(format!(
1725 "failed to open snapshot archive '{}': {err}",
1726 archive_path.as_ref().display(),
1727 ))
1728 })?;
1729 Ok(ArchiveFormatDecompressor::new(archive_format, buf_reader)?)
1730}
1731
/// Classification of a file path received while reading an unpacked snapshot;
/// produced by `get_snapshot_file_kind`.
#[derive(PartialEq, Debug)]
enum SnapshotFileKind {
    /// The snapshot version file (named exactly `version`).
    Version,
    /// The bank fields file: a decimal slot number, optionally suffixed with `.pre`.
    BankFields,
    /// An account storage file, i.e. a name `get_slot_and_append_vec_id` accepts.
    Storage,
}
1739
1740fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
1742 static VERSION_FILE_REGEX: LazyLock<Regex> =
1743 LazyLock::new(|| Regex::new(r"^version$").unwrap());
1744 static BANK_FIELDS_FILE_REGEX: LazyLock<Regex> =
1745 LazyLock::new(|| Regex::new(r"^[0-9]+(\.pre)?$").unwrap());
1746
1747 if VERSION_FILE_REGEX.is_match(filename) {
1748 Some(SnapshotFileKind::Version)
1749 } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
1750 Some(SnapshotFileKind::BankFields)
1751 } else if get_slot_and_append_vec_id(filename).is_ok() {
1752 Some(SnapshotFileKind::Storage)
1753 } else {
1754 None
1755 }
1756}
1757
1758fn get_version_and_snapshot_files(
1762 file_receiver: &Receiver<PathBuf>,
1763) -> Result<(PathBuf, PathBuf, Vec<PathBuf>)> {
1764 let mut append_vec_files = Vec::with_capacity(1024);
1765 let mut snapshot_version_path = None;
1766 let mut snapshot_file_path = None;
1767
1768 loop {
1769 if let Ok(path) = file_receiver.recv() {
1770 let filename = path.file_name().unwrap().to_str().unwrap();
1771 match get_snapshot_file_kind(filename) {
1772 Some(SnapshotFileKind::Version) => {
1773 snapshot_version_path = Some(path);
1774
1775 if snapshot_file_path.is_some() {
1777 break;
1778 }
1779 }
1780 Some(SnapshotFileKind::BankFields) => {
1781 snapshot_file_path = Some(path);
1782
1783 if snapshot_version_path.is_some() {
1785 break;
1786 }
1787 }
1788 Some(SnapshotFileKind::Storage) => {
1789 append_vec_files.push(path);
1790 }
1791 None => {} }
1793 } else {
1794 return Err(SnapshotError::RebuildStorages(
1795 "did not receive snapshot file from unpacking threads".to_string(),
1796 ));
1797 }
1798 }
1799 let snapshot_version_path = snapshot_version_path.unwrap();
1800 let snapshot_file_path = snapshot_file_path.unwrap();
1801
1802 Ok((snapshot_version_path, snapshot_file_path, append_vec_files))
1803}
1804
/// Result of `snapshot_fields_from_files`: the parsed snapshot metadata plus the
/// storage file paths that arrived before it.
struct SnapshotFieldsBundle {
    /// Version parsed from the snapshot version file.
    snapshot_version: SnapshotVersion,
    /// Bank fields deserialized from the bank fields file.
    bank_fields: BankFieldsToDeserialize,
    /// Accounts-db fields deserialized from the same bank fields file.
    accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
    /// Storage file paths received before the version and bank fields files.
    append_vec_files: Vec<PathBuf>,
}
1812
1813fn snapshot_fields_from_files(file_receiver: &Receiver<PathBuf>) -> Result<SnapshotFieldsBundle> {
1816 let (snapshot_version_path, snapshot_file_path, append_vec_files) =
1817 get_version_and_snapshot_files(file_receiver)?;
1818 let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
1819 let snapshot_version = snapshot_version_str.parse().map_err(|err| {
1820 IoError::other(format!(
1821 "unsupported snapshot version '{snapshot_version_str}': {err}",
1822 ))
1823 })?;
1824
1825 let snapshot_file = fs::File::open(snapshot_file_path).unwrap();
1826 let mut snapshot_stream = BufReader::new(snapshot_file);
1827 let (bank_fields, accounts_db_fields) = match snapshot_version {
1828 SnapshotVersion::V1_2_0 => serde_snapshot::fields_from_stream(&mut snapshot_stream)?,
1829 };
1830
1831 Ok(SnapshotFieldsBundle {
1832 snapshot_version,
1833 bank_fields,
1834 accounts_db_fields,
1835 append_vec_files,
1836 })
1837}
1838
1839fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1844 let snapshots_dir = unpack_dir.as_ref().join(BANK_SNAPSHOTS_DIR);
1845 if !snapshots_dir.is_dir() {
1846 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1847 }
1848
1849 let slot_dir = std::fs::read_dir(&snapshots_dir)
1851 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1852 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1853 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1854 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1855 .path();
1856
1857 let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1858 fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1859
1860 let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1861 fs::hard_link(
1862 status_cache_file,
1863 slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1864 )?;
1865
1866 write_snapshot_state_complete_file(slot_dir)?;
1867
1868 Ok(())
1869}
1870
/// Unpacks one snapshot archive and rebuilds its account storages.
///
/// A temporary directory prefixed with `unpacked_snapshots_dir_prefix` is created in
/// `bank_snapshots_dir`. A background `solTarUnpack` thread
/// (`streaming_unarchive_snapshot`) unpacks the archive, with that directory and
/// `account_paths` as destinations, and streams unpacked file paths over a channel.
/// This thread first reads the version and bank fields files from the stream
/// (`snapshot_fields_from_files`). It then hands the rest of the stream to
/// `SnapshotStorageRebuilder::rebuild_storage`. Finally the bank snapshot directory
/// inside the unpack directory gets its version, status cache, and state-complete
/// files.
///
/// `measure_name` labels the timing of the storage rebuild, which is logged and
/// returned as `measure_untar`.
///
/// # Errors
///
/// If the unpack thread returns an error, that error is returned instead of the
/// result of the rebuild side.
///
/// # Panics
///
/// Panics if the unpack thread panics.
fn unarchive_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    unpacked_snapshots_dir_prefix: &'static str,
    snapshot_archive_path: impl AsRef<Path>,
    measure_name: &'static str,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<UnarchivedSnapshot> {
    let unpack_dir = tempfile::Builder::new()
        .prefix(unpacked_snapshots_dir_prefix)
        .tempdir_in(bank_snapshots_dir)?;
    let unpacked_snapshots_dir = unpack_dir.path().join(BANK_SNAPSHOTS_DIR);

    // `file_sender` moves into the unpack thread and is dropped when that thread
    // finishes, which disconnects `file_receiver`.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let unarchive_handle = streaming_unarchive_snapshot(
        file_sender,
        account_paths.to_vec(),
        unpack_dir.path().to_path_buf(),
        snapshot_archive_path.as_ref().to_path_buf(),
        archive_format,
    );

    // Physical cores minus one, but at least one.
    // NOTE(review): presumably leaves a core for the unpack thread; confirm.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let snapshot_result = snapshot_fields_from_files(&file_receiver).and_then(
        |SnapshotFieldsBundle {
             snapshot_version,
             bank_fields,
             accounts_db_fields,
             append_vec_files,
             ..
         }| {
            // The rebuild consumes the remaining paths while unpacking continues.
            let (storage, measure_untar) = measure_time!(
                SnapshotStorageRebuilder::rebuild_storage(
                    &accounts_db_fields,
                    append_vec_files,
                    file_receiver,
                    num_rebuilder_threads,
                    next_append_vec_id,
                    SnapshotFrom::Archive,
                    storage_access,
                )?,
                measure_name
            );
            info!("{measure_untar}");
            create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;

            Ok(UnarchivedSnapshot {
                unpack_dir,
                storage,
                bank_fields,
                accounts_db_fields,
                unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
                    unpacked_snapshots_dir,
                    snapshot_version,
                },
                measure_untar,
            })
        },
    );
    // Always join the unpack thread. Its error (if any) takes precedence over
    // `snapshot_result`, since an unpack failure also makes the reading side observe
    // a disconnected channel.
    unarchive_handle.join().unwrap()?;
    snapshot_result
}
1938
1939fn streaming_snapshot_dir_files(
1942 file_sender: Sender<PathBuf>,
1943 snapshot_file_path: impl Into<PathBuf>,
1944 snapshot_version_path: impl Into<PathBuf>,
1945 account_paths: &[PathBuf],
1946) -> Result<()> {
1947 file_sender.send(snapshot_file_path.into())?;
1948 file_sender.send(snapshot_version_path.into())?;
1949
1950 for account_path in account_paths {
1951 for file in fs::read_dir(account_path)? {
1952 file_sender.send(file?.path())?;
1953 }
1954 }
1955
1956 Ok(())
1957}
1958
/// Rebuilds account storages from a local bank snapshot directory (not an archive).
///
/// Each entry of `<snapshot_dir>/<SNAPSHOT_ACCOUNTS_HARDLINKS>` is read as a symlink
/// to `<account_path>/<ACCOUNTS_SNAPSHOT_DIR>/<slot>`. Every file in that target
/// directory is hard-linked into `<account_path>/<ACCOUNTS_RUN_DIR>`, which must
/// appear in `account_paths`. The bank fields file, the version file, and every
/// entry of `account_paths` are then streamed to
/// `SnapshotStorageRebuilder::rebuild_storage`, the same channel-based rebuild
/// that archives use.
///
/// Returns the rebuilt storage map with the deserialized bank and accounts-db fields.
///
/// # Errors
///
/// Returns `SnapshotError::AccountPathsMismatch` if a linked account path's run
/// directory is not in `account_paths`. Returns `SnapshotError::InvalidAccountPath`
/// or `InvalidAppendVecPath` for malformed paths. Returns I/O errors (naming the
/// path) for unreadable directories or symlinks and failed hard links.
pub fn rebuild_storages_from_snapshot_dir(
    snapshot_info: &BankSnapshotInfo,
    account_paths: &[PathBuf],
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<(
    AccountStorageMap,
    BankFieldsToDeserialize,
    AccountsDbFields<SerializableAccountStorageEntry>,
)> {
    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);

    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
        IoError::other(format!(
            "failed to read accounts hardlinks dir '{}': {err}",
            accounts_hardlinks.display(),
        ))
    })?;
    for dir_entry in read_dir {
        let symlink_path = dir_entry?.path();
        // The symlink target is `<account_path>/<ACCOUNTS_SNAPSHOT_DIR>/<slot>`, the
        // layout created by `get_snapshot_accounts_hardlink_dir`.
        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
            IoError::other(format!(
                "failed to read symlink '{}': {err}",
                symlink_path.display(),
            ))
        })?;
        // Two levels up is `<account_path>`; storages are restored into its run dir.
        let account_run_path = account_snapshot_path
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .join(ACCOUNTS_RUN_DIR);
        if !account_run_paths.contains(&account_run_path) {
            // The snapshot refers to an account path not among `account_paths`.
            return Err(SnapshotError::AccountPathsMismatch);
        }
        // Hard-link every snapshot storage file back into the run dir.
        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        for file in read_dir {
            let file_path = file?.path();
            let file_name = file_path
                .file_name()
                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
            let dest_path = account_run_path.join(file_name);
            fs::hard_link(&file_path, &dest_path).map_err(|err| {
                IoError::other(format!(
                    "failed to hard link from '{}' to '{}': {err}",
                    file_path.display(),
                    dest_path.display(),
                ))
            })?;
        }
    }

    // The channel is unbounded, so all paths are queued before rebuilding starts;
    // the sender is dropped when `streaming_snapshot_dir_files` returns.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let snapshot_file_path = &snapshot_info.snapshot_path();
    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    streaming_snapshot_dir_files(
        file_sender,
        snapshot_file_path,
        snapshot_version_path,
        account_paths,
    )?;

    let SnapshotFieldsBundle {
        bank_fields,
        accounts_db_fields,
        append_vec_files,
        ..
    } = snapshot_fields_from_files(&file_receiver)?;

    // Physical cores minus one, but at least one (same sizing as `unarchive_snapshot`).
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let storage = SnapshotStorageRebuilder::rebuild_storage(
        &accounts_db_fields,
        append_vec_files,
        file_receiver,
        num_rebuilder_threads,
        next_append_vec_id,
        SnapshotFrom::Dir,
        storage_access,
    )?;

    Ok((storage, bank_fields, accounts_db_fields))
}
2058
2059fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
2063 let file_metadata = fs::metadata(&path).map_err(|err| {
2065 IoError::other(format!(
2066 "failed to query snapshot version file metadata '{}': {err}",
2067 path.as_ref().display(),
2068 ))
2069 })?;
2070 let file_size = file_metadata.len();
2071 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
2072 let error_message = format!(
2073 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
2074 path.as_ref().display(),
2075 file_size,
2076 MAX_SNAPSHOT_VERSION_FILE_SIZE,
2077 );
2078 return Err(IoError::other(error_message).into());
2079 }
2080
2081 let mut snapshot_version = String::new();
2083 let mut file = fs::File::open(&path).map_err(|err| {
2084 IoError::other(format!(
2085 "failed to open snapshot version file '{}': {err}",
2086 path.as_ref().display()
2087 ))
2088 })?;
2089 file.read_to_string(&mut snapshot_version).map_err(|err| {
2090 IoError::other(format!(
2091 "failed to read snapshot version from file '{}': {err}",
2092 path.as_ref().display()
2093 ))
2094 })?;
2095
2096 Ok(snapshot_version.trim().to_string())
2097}
2098
2099fn check_are_snapshots_compatible(
2102 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
2103 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
2104) -> Result<()> {
2105 if incremental_snapshot_archive_info.is_none() {
2106 return Ok(());
2107 }
2108
2109 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
2110
2111 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
2112 .then_some(())
2113 .ok_or_else(|| {
2114 SnapshotError::MismatchedBaseSlot(
2115 full_snapshot_archive_info.slot(),
2116 incremental_snapshot_archive_info.base_slot(),
2117 )
2118 })
2119}
2120
2121pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
2123 path.file_name()
2124 .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
2125 .to_str()
2126 .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
2127}
2128
2129pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
2130 snapshot_archives_dir
2131 .as_ref()
2132 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
2133}
2134
2135pub fn build_full_snapshot_archive_path(
2138 full_snapshot_archives_dir: impl AsRef<Path>,
2139 slot: Slot,
2140 hash: &SnapshotHash,
2141 archive_format: ArchiveFormat,
2142) -> PathBuf {
2143 full_snapshot_archives_dir.as_ref().join(format!(
2144 "snapshot-{}-{}.{}",
2145 slot,
2146 hash.0,
2147 archive_format.extension(),
2148 ))
2149}
2150
2151pub fn build_incremental_snapshot_archive_path(
2155 incremental_snapshot_archives_dir: impl AsRef<Path>,
2156 base_slot: Slot,
2157 slot: Slot,
2158 hash: &SnapshotHash,
2159 archive_format: ArchiveFormat,
2160) -> PathBuf {
2161 incremental_snapshot_archives_dir.as_ref().join(format!(
2162 "incremental-snapshot-{}-{}-{}.{}",
2163 base_slot,
2164 slot,
2165 hash.0,
2166 archive_format.extension(),
2167 ))
2168}
2169
2170pub(crate) fn parse_full_snapshot_archive_filename(
2172 archive_filename: &str,
2173) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
2174 static RE: std::sync::LazyLock<Regex> =
2175 std::sync::LazyLock::new(|| Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap());
2176
2177 let do_parse = || {
2178 RE.captures(archive_filename).and_then(|captures| {
2179 let slot = captures
2180 .name("slot")
2181 .map(|x| x.as_str().parse::<Slot>())?
2182 .ok()?;
2183 let hash = captures
2184 .name("hash")
2185 .map(|x| x.as_str().parse::<Hash>())?
2186 .ok()?;
2187 let archive_format = captures
2188 .name("ext")
2189 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2190 .ok()?;
2191
2192 Some((slot, SnapshotHash(hash), archive_format))
2193 })
2194 };
2195
2196 do_parse().ok_or_else(|| {
2197 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2198 })
2199}
2200
2201pub(crate) fn parse_incremental_snapshot_archive_filename(
2203 archive_filename: &str,
2204) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
2205 static RE: std::sync::LazyLock<Regex> = std::sync::LazyLock::new(|| {
2206 Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap()
2207 });
2208
2209 let do_parse = || {
2210 RE.captures(archive_filename).and_then(|captures| {
2211 let base_slot = captures
2212 .name("base")
2213 .map(|x| x.as_str().parse::<Slot>())?
2214 .ok()?;
2215 let slot = captures
2216 .name("slot")
2217 .map(|x| x.as_str().parse::<Slot>())?
2218 .ok()?;
2219 let hash = captures
2220 .name("hash")
2221 .map(|x| x.as_str().parse::<Hash>())?
2222 .ok()?;
2223 let archive_format = captures
2224 .name("ext")
2225 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2226 .ok()?;
2227
2228 Some((base_slot, slot, SnapshotHash(hash), archive_format))
2229 })
2230 };
2231
2232 do_parse().ok_or_else(|| {
2233 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2234 })
2235}
2236
2237fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2239where
2240 F: Fn(PathBuf) -> Result<T>,
2241{
2242 let walk_dir = |dir: &Path| -> Vec<T> {
2243 let entry_iter = fs::read_dir(dir);
2244 match entry_iter {
2245 Err(err) => {
2246 info!(
2247 "Unable to read snapshot archives directory '{}': {err}",
2248 dir.display(),
2249 );
2250 vec![]
2251 }
2252 Ok(entries) => entries
2253 .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2254 .collect(),
2255 }
2256 };
2257
2258 let mut ret = walk_dir(snapshot_archives_dir);
2259 let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2260 if remote_dir.exists() {
2261 ret.append(&mut walk_dir(remote_dir.as_ref()));
2262 }
2263 ret
2264}
2265
2266pub fn get_full_snapshot_archives(
2268 full_snapshot_archives_dir: impl AsRef<Path>,
2269) -> Vec<FullSnapshotArchiveInfo> {
2270 get_snapshot_archives(
2271 full_snapshot_archives_dir.as_ref(),
2272 FullSnapshotArchiveInfo::new_from_path,
2273 )
2274}
2275
2276pub fn get_incremental_snapshot_archives(
2278 incremental_snapshot_archives_dir: impl AsRef<Path>,
2279) -> Vec<IncrementalSnapshotArchiveInfo> {
2280 get_snapshot_archives(
2281 incremental_snapshot_archives_dir.as_ref(),
2282 IncrementalSnapshotArchiveInfo::new_from_path,
2283 )
2284}
2285
2286pub fn get_highest_full_snapshot_archive_slot(
2288 full_snapshot_archives_dir: impl AsRef<Path>,
2289) -> Option<Slot> {
2290 get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2291 .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2292}
2293
2294pub fn get_highest_incremental_snapshot_archive_slot(
2297 incremental_snapshot_archives_dir: impl AsRef<Path>,
2298 full_snapshot_slot: Slot,
2299) -> Option<Slot> {
2300 get_highest_incremental_snapshot_archive_info(
2301 incremental_snapshot_archives_dir,
2302 full_snapshot_slot,
2303 )
2304 .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2305}
2306
2307pub fn get_highest_full_snapshot_archive_info(
2309 full_snapshot_archives_dir: impl AsRef<Path>,
2310) -> Option<FullSnapshotArchiveInfo> {
2311 let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2312 full_snapshot_archives.sort_unstable();
2313 full_snapshot_archives.into_iter().next_back()
2314}
2315
2316pub fn get_highest_incremental_snapshot_archive_info(
2319 incremental_snapshot_archives_dir: impl AsRef<Path>,
2320 full_snapshot_slot: Slot,
2321) -> Option<IncrementalSnapshotArchiveInfo> {
2322 let mut incremental_snapshot_archives =
2326 get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2327 .into_iter()
2328 .filter(|incremental_snapshot_archive_info| {
2329 incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2330 })
2331 .collect::<Vec<_>>();
2332 incremental_snapshot_archives.sort_unstable();
2333 incremental_snapshot_archives.into_iter().next_back()
2334}
2335
2336pub fn purge_old_snapshot_archives(
2337 full_snapshot_archives_dir: impl AsRef<Path>,
2338 incremental_snapshot_archives_dir: impl AsRef<Path>,
2339 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2340 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2341) {
2342 info!(
2343 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2344 full_snapshot_archives_dir.as_ref().display(),
2345 maximum_full_snapshot_archives_to_retain
2346 );
2347
2348 let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2349 full_snapshot_archives.sort_unstable();
2350 full_snapshot_archives.reverse();
2351
2352 let num_to_retain = full_snapshot_archives
2353 .len()
2354 .min(maximum_full_snapshot_archives_to_retain.get());
2355 trace!(
2356 "There are {} full snapshot archives, retaining {}",
2357 full_snapshot_archives.len(),
2358 num_to_retain,
2359 );
2360
2361 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2362 if full_snapshot_archives.is_empty() {
2363 None
2364 } else {
2365 Some(full_snapshot_archives.split_at(num_to_retain))
2366 }
2367 .unwrap_or_default();
2368
2369 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2370 .iter()
2371 .map(|ai| ai.slot())
2372 .collect::<HashSet<_>>();
2373
2374 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2375 for path in archives.iter().map(|a| a.path()) {
2376 trace!("Removing snapshot archive: {}", path.display());
2377 let result = fs::remove_file(path);
2378 if let Err(err) = result {
2379 info!(
2380 "Failed to remove snapshot archive '{}': {err}",
2381 path.display()
2382 );
2383 }
2384 }
2385 }
2386 remove_archives(full_snapshot_archives_to_remove);
2387
2388 info!(
2389 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2390 incremental_snapshot_archives_dir.as_ref().display(),
2391 maximum_incremental_snapshot_archives_to_retain
2392 );
2393 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2394 for incremental_snapshot_archive in
2395 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2396 {
2397 incremental_snapshot_archives_by_base_slot
2398 .entry(incremental_snapshot_archive.base_slot())
2399 .or_default()
2400 .push(incremental_snapshot_archive)
2401 }
2402
2403 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2404 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2405 {
2406 incremental_snapshot_archives.sort_unstable();
2407 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2408 maximum_incremental_snapshot_archives_to_retain.get()
2409 } else {
2410 usize::from(retained_full_snapshot_slots.contains(&base_slot))
2411 };
2412 trace!(
2413 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2414 incremental_snapshot_archives.len(),
2415 base_slot,
2416 incremental_snapshot_archives
2417 .len()
2418 .saturating_sub(num_to_retain),
2419 );
2420
2421 incremental_snapshot_archives.truncate(
2422 incremental_snapshot_archives
2423 .len()
2424 .saturating_sub(num_to_retain),
2425 );
2426 remove_archives(&incremental_snapshot_archives);
2427 }
2428}
2429
/// Decompresses and unpacks the snapshot archive at `snapshot_path` into `ledger_dir` and
/// `account_paths`, returning the map produced by `hardened_unpack::unpack_snapshot`.
///
/// `num_threads` must be non-zero (asserted). Apart from that assertion, it is not used here.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    snapshot_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    num_threads: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(num_threads > 0);
    let archive_size = fs::metadata(&snapshot_path)?.len();
    let tar_reader = decompressed_tar_reader(archive_format, snapshot_path, archive_size)?;
    Ok(hardened_unpack::unpack_snapshot(
        Archive::new(tar_reader),
        archive_size,
        ledger_dir,
        account_paths,
    )?)
}
2450
2451pub fn verify_unpacked_snapshots_dir_and_version(
2452 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2453) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2454 info!(
2455 "snapshot version: {}",
2456 &unpacked_snapshots_dir_and_version.snapshot_version
2457 );
2458
2459 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2460 let mut bank_snapshots =
2461 get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2462 if bank_snapshots.len() > 1 {
2463 return Err(IoError::other(format!(
2464 "invalid snapshot format: only one snapshot allowed, but found {}",
2465 bank_snapshots.len(),
2466 ))
2467 .into());
2468 }
2469 let root_paths = bank_snapshots.pop().ok_or_else(|| {
2470 IoError::other(format!(
2471 "no snapshots found in snapshots directory '{}'",
2472 unpacked_snapshots_dir_and_version
2473 .unpacked_snapshots_dir
2474 .display(),
2475 ))
2476 })?;
2477 Ok((snapshot_version, root_paths))
2478}
2479
2480pub fn get_snapshot_file_name(slot: Slot) -> String {
2482 slot.to_string()
2483}
2484
2485pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2487 bank_snapshots_dir
2488 .as_ref()
2489 .join(get_snapshot_file_name(slot))
2490}
2491
/// Selects how `verify_snapshot_archive` checks the serialized bank file
/// (`<slot>/<slot>` inside the bank snapshot directory).
#[derive(Debug, Copy, Clone)]
pub enum VerifyBank {
    /// The serialized bank files are left in place, so they must be byte-for-byte identical
    /// to pass the directory diff.
    Deterministic,
    /// The serialized bank files may differ byte-wise. They are compared with
    /// `compare_two_serialized_banks` and then deleted before the directory diff.
    NonDeterministic,
}
2501
/// Test helper: unpacks `snapshot_archive` into a temporary directory and asserts that it
/// matches the bank snapshot tree `snapshots_to_verify` for `slot`.
///
/// Panics (via `unwrap`/`assert!`) on any I/O failure or mismatch.
///
/// NOTE: this mutates `snapshots_to_verify`. Under `snapshots_to_verify/<slot>` it deletes the
/// serialized bank file (only for `VerifyBank::NonDeterministic`), the accounts hardlinks
/// directory, the version file and the state-complete file, so that the rest can be diffed.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    // First path returned by the helper; presumably the accounts "run" dir — TODO confirm.
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    // Unpack with a single thread; account files go into `unpack_account_dir`.
    unpack_snapshot_local(
        snapshot_archive,
        archive_format,
        unpack_dir,
        &[unpack_account_dir.clone()],
        1,
    )
    .unwrap();

    // Bank snapshots from the archive, to be diffed against `snapshots_to_verify`.
    let unpacked_snapshots = unpack_dir.join(BANK_SNAPSHOTS_DIR);

    // Copies of the storage files referenced by `snapshots_to_verify`'s accounts hardlinks
    // are gathered here, to be diffed against the unpacked account files.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // Bytes may differ, so compare the serialized banks with `compare_two_serialized_banks`,
        // then delete both files so the directory diff below skips them.
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // The unpacked status cache sits at the top of the bank snapshots dir. Move it into the
    // slot dir so the layout lines up with `snapshots_to_verify` for the diff below.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    // Each entry in the accounts hardlinks dir is read as a symlink. Every file in the linked
    // directory is copied into `storages_to_verify`, then the hardlinks dir is deleted so it
    // does not take part in the bank snapshot diff.
    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // The version and state-complete files are deleted from the side being verified before
    // the diff; presumably the unpacked tree lacks them at this location — TODO confirm.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    // The bank snapshot trees must now be identical.
    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // Best-effort removal of an `accounts` subdir (`remove_dir` only succeeds if it is empty);
    // any error is ignored.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    // The collected storage files must match the unpacked account files.
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2595
2596pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2598 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2599 purge_bank_snapshots(&bank_snapshots);
2600}
2601
2602pub fn purge_old_bank_snapshots(
2604 bank_snapshots_dir: impl AsRef<Path>,
2605 num_bank_snapshots_to_retain: usize,
2606 filter_by_kind: Option<BankSnapshotKind>,
2607) {
2608 let mut bank_snapshots = match filter_by_kind {
2609 Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2610 Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2611 None => get_bank_snapshots(&bank_snapshots_dir),
2612 };
2613
2614 bank_snapshots.sort_unstable();
2615 purge_bank_snapshots(
2616 bank_snapshots
2617 .iter()
2618 .rev()
2619 .skip(num_bank_snapshots_to_retain),
2620 );
2621}
2622
2623pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2628 purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2629 purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2630
2631 let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2632 if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2633 debug!(
2634 "Retained bank snapshot for slot {}, and purged the rest.",
2635 highest_bank_snapshot_post.slot
2636 );
2637 }
2638}
2639
2640pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2642 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2643 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2644 purge_bank_snapshots(&bank_snapshots);
2645}
2646
2647fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2651 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2652 if purge_bank_snapshot(snapshot_dir).is_err() {
2653 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2654 }
2655 }
2656}
2657
2658pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2660 const FN_ERR: &str = "failed to purge bank snapshot";
2661 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2662 if accounts_hardlinks_dir.is_dir() {
2663 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2666 IoError::other(format!(
2667 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2668 accounts_hardlinks_dir.display(),
2669 ))
2670 })?;
2671 for entry in read_dir {
2672 let accounts_hardlink_dir = entry?.path();
2673 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2674 IoError::other(format!(
2675 "{FN_ERR}: failed to read symlink '{}': {err}",
2676 accounts_hardlink_dir.display(),
2677 ))
2678 })?;
2679 move_and_async_delete_path(&accounts_hardlink_dir);
2680 }
2681 }
2682 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2683 IoError::other(format!(
2684 "{FN_ERR}: failed to remove dir '{}': {err}",
2685 bank_snapshot_dir.as_ref().display(),
2686 ))
2687 })?;
2688 Ok(())
2689}
2690
2691pub fn should_take_full_snapshot(
2692 block_height: Slot,
2693 full_snapshot_archive_interval_slots: Slot,
2694) -> bool {
2695 block_height % full_snapshot_archive_interval_slots == 0
2696}
2697
2698pub fn should_take_incremental_snapshot(
2699 block_height: Slot,
2700 incremental_snapshot_archive_interval_slots: Slot,
2701 latest_full_snapshot_slot: Option<Slot>,
2702) -> bool {
2703 block_height % incremental_snapshot_archive_interval_slots == 0
2704 && latest_full_snapshot_slot.is_some()
2705}
2706
/// Test helper: creates a temporary directory with accounts directories inside it.
///
/// Returns the `TempDir` guard and the first path returned by
/// `create_accounts_run_and_snapshot_dirs` (presumably the accounts "run" dir — TODO confirm).
/// Keep the guard alive while the directory is in use: `TempDir` deletes it on drop.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = TempDir::new().unwrap();
    let accounts_dirs = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, accounts_dirs.0)
}
2717
2718#[cfg(test)]
2719mod tests {
2720 use {
2721 super::*,
2722 assert_matches::assert_matches,
2723 bincode::{deserialize_from, serialize_into},
2724 std::{convert::TryFrom, mem::size_of},
2725 tempfile::NamedTempFile,
2726 };
2727
2728 #[test]
2729 fn test_serialize_snapshot_data_file_under_limit() {
2730 let temp_dir = tempfile::TempDir::new().unwrap();
2731 let expected_consumed_size = size_of::<u32>() as u64;
2732 let consumed_size = serialize_snapshot_data_file_capped(
2733 &temp_dir.path().join("data-file"),
2734 expected_consumed_size,
2735 |stream| {
2736 serialize_into(stream, &2323_u32)?;
2737 Ok(())
2738 },
2739 )
2740 .unwrap();
2741 assert_eq!(consumed_size, expected_consumed_size);
2742 }
2743
2744 #[test]
2745 fn test_serialize_snapshot_data_file_over_limit() {
2746 let temp_dir = tempfile::TempDir::new().unwrap();
2747 let expected_consumed_size = size_of::<u32>() as u64;
2748 let result = serialize_snapshot_data_file_capped(
2749 &temp_dir.path().join("data-file"),
2750 expected_consumed_size - 1,
2751 |stream| {
2752 serialize_into(stream, &2323_u32)?;
2753 Ok(())
2754 },
2755 );
2756 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2757 }
2758
2759 #[test]
2760 fn test_deserialize_snapshot_data_file_under_limit() {
2761 let expected_data = 2323_u32;
2762 let expected_consumed_size = size_of::<u32>() as u64;
2763
2764 let temp_dir = tempfile::TempDir::new().unwrap();
2765 serialize_snapshot_data_file_capped(
2766 &temp_dir.path().join("data-file"),
2767 expected_consumed_size,
2768 |stream| {
2769 serialize_into(stream, &expected_data)?;
2770 Ok(())
2771 },
2772 )
2773 .unwrap();
2774
2775 let snapshot_root_paths = SnapshotRootPaths {
2776 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2777 incremental_snapshot_root_file_path: None,
2778 };
2779
2780 let actual_data = deserialize_snapshot_data_files_capped(
2781 &snapshot_root_paths,
2782 expected_consumed_size,
2783 |stream| {
2784 Ok(deserialize_from::<_, u32>(
2785 &mut stream.full_snapshot_stream,
2786 )?)
2787 },
2788 )
2789 .unwrap();
2790 assert_eq!(actual_data, expected_data);
2791 }
2792
2793 #[test]
2794 fn test_deserialize_snapshot_data_file_over_limit() {
2795 let expected_data = 2323_u32;
2796 let expected_consumed_size = size_of::<u32>() as u64;
2797
2798 let temp_dir = tempfile::TempDir::new().unwrap();
2799 serialize_snapshot_data_file_capped(
2800 &temp_dir.path().join("data-file"),
2801 expected_consumed_size,
2802 |stream| {
2803 serialize_into(stream, &expected_data)?;
2804 Ok(())
2805 },
2806 )
2807 .unwrap();
2808
2809 let snapshot_root_paths = SnapshotRootPaths {
2810 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2811 incremental_snapshot_root_file_path: None,
2812 };
2813
2814 let result = deserialize_snapshot_data_files_capped(
2815 &snapshot_root_paths,
2816 expected_consumed_size - 1,
2817 |stream| {
2818 Ok(deserialize_from::<_, u32>(
2819 &mut stream.full_snapshot_stream,
2820 )?)
2821 },
2822 );
2823 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2824 }
2825
2826 #[test]
2827 fn test_deserialize_snapshot_data_file_extra_data() {
2828 let expected_data = 2323_u32;
2829 let expected_consumed_size = size_of::<u32>() as u64;
2830
2831 let temp_dir = tempfile::TempDir::new().unwrap();
2832 serialize_snapshot_data_file_capped(
2833 &temp_dir.path().join("data-file"),
2834 expected_consumed_size * 2,
2835 |stream| {
2836 serialize_into(stream.by_ref(), &expected_data)?;
2837 serialize_into(stream.by_ref(), &expected_data)?;
2838 Ok(())
2839 },
2840 )
2841 .unwrap();
2842
2843 let snapshot_root_paths = SnapshotRootPaths {
2844 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2845 incremental_snapshot_root_file_path: None,
2846 };
2847
2848 let result = deserialize_snapshot_data_files_capped(
2849 &snapshot_root_paths,
2850 expected_consumed_size * 2,
2851 |stream| {
2852 Ok(deserialize_from::<_, u32>(
2853 &mut stream.full_snapshot_stream,
2854 )?)
2855 },
2856 );
2857 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2858 }
2859
2860 #[test]
2861 fn test_snapshot_version_from_file_under_limit() {
2862 let file_content = SnapshotVersion::default().as_str();
2863 let mut file = NamedTempFile::new().unwrap();
2864 file.write_all(file_content.as_bytes()).unwrap();
2865 let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2866 assert_eq!(version_from_file, file_content);
2867 }
2868
2869 #[test]
2870 fn test_snapshot_version_from_file_over_limit() {
2871 let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2872 let file_content = vec![7u8; over_limit_size];
2873 let mut file = NamedTempFile::new().unwrap();
2874 file.write_all(&file_content).unwrap();
2875 assert_matches!(
2876 snapshot_version_from_file(file.path()),
2877 Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2878 );
2879 }
2880
2881 #[test]
2882 fn test_parse_full_snapshot_archive_filename() {
2883 assert_eq!(
2884 parse_full_snapshot_archive_filename(&format!(
2885 "snapshot-43-{}.tar.zst",
2886 Hash::default()
2887 ))
2888 .unwrap(),
2889 (
2890 43,
2891 SnapshotHash(Hash::default()),
2892 ArchiveFormat::TarZstd {
2893 config: ZstdConfig::default(),
2894 }
2895 )
2896 );
2897 assert_eq!(
2898 parse_full_snapshot_archive_filename(&format!(
2899 "snapshot-45-{}.tar.lz4",
2900 Hash::default()
2901 ))
2902 .unwrap(),
2903 (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2904 );
2905
2906 assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2907 assert!(
2908 parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2909 );
2910
2911 assert!(
2912 parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2913 );
2914 assert!(parse_full_snapshot_archive_filename(&format!(
2915 "snapshot-12345678-{}.bad!ext",
2916 Hash::new_unique()
2917 ))
2918 .is_err());
2919 assert!(
2920 parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar.zst").is_err()
2921 );
2922
2923 assert!(parse_full_snapshot_archive_filename(&format!(
2924 "snapshot-bad!slot-{}.bad!ext",
2925 Hash::new_unique()
2926 ))
2927 .is_err());
2928 assert!(parse_full_snapshot_archive_filename(&format!(
2929 "snapshot-12345678-{}.bad!ext",
2930 Hash::new_unique()
2931 ))
2932 .is_err());
2933 assert!(parse_full_snapshot_archive_filename(&format!(
2934 "snapshot-bad!slot-{}.tar.zst",
2935 Hash::new_unique()
2936 ))
2937 .is_err());
2938
2939 assert!(
2940 parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar.zst").is_err()
2941 );
2942 assert!(
2943 parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar.zst").is_err()
2944 );
2945 assert!(parse_full_snapshot_archive_filename(&format!(
2946 "snapshot-bad!slot-{}.tar.zst",
2947 Hash::new_unique()
2948 ))
2949 .is_err());
2950 }
2951
2952 #[test]
2953 fn test_parse_incremental_snapshot_archive_filename() {
2954 assert_eq!(
2955 parse_incremental_snapshot_archive_filename(&format!(
2956 "incremental-snapshot-43-234-{}.tar.zst",
2957 Hash::default()
2958 ))
2959 .unwrap(),
2960 (
2961 43,
2962 234,
2963 SnapshotHash(Hash::default()),
2964 ArchiveFormat::TarZstd {
2965 config: ZstdConfig::default(),
2966 }
2967 )
2968 );
2969 assert_eq!(
2970 parse_incremental_snapshot_archive_filename(&format!(
2971 "incremental-snapshot-45-456-{}.tar.lz4",
2972 Hash::default()
2973 ))
2974 .unwrap(),
2975 (
2976 45,
2977 456,
2978 SnapshotHash(Hash::default()),
2979 ArchiveFormat::TarLz4
2980 )
2981 );
2982
2983 assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2984 assert!(parse_incremental_snapshot_archive_filename(&format!(
2985 "snapshot-42-{}.tar.zst",
2986 Hash::new_unique()
2987 ))
2988 .is_err());
2989 assert!(parse_incremental_snapshot_archive_filename(
2990 "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2991 )
2992 .is_err());
2993
2994 assert!(parse_incremental_snapshot_archive_filename(&format!(
2995 "incremental-snapshot-bad!slot-56785678-{}.tar.zst",
2996 Hash::new_unique()
2997 ))
2998 .is_err());
2999
3000 assert!(parse_incremental_snapshot_archive_filename(&format!(
3001 "incremental-snapshot-12345678-bad!slot-{}.tar.zst",
3002 Hash::new_unique()
3003 ))
3004 .is_err());
3005
3006 assert!(parse_incremental_snapshot_archive_filename(
3007 "incremental-snapshot-12341234-56785678-bad!HASH.tar.zst"
3008 )
3009 .is_err());
3010
3011 assert!(parse_incremental_snapshot_archive_filename(&format!(
3012 "incremental-snapshot-12341234-56785678-{}.bad!ext",
3013 Hash::new_unique()
3014 ))
3015 .is_err());
3016 }
3017
3018 #[test]
3019 fn test_check_are_snapshots_compatible() {
3020 let slot1: Slot = 1234;
3021 let slot2: Slot = 5678;
3022 let slot3: Slot = 999_999;
3023
3024 let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
3025 format!("/dir/snapshot-{}-{}.tar.zst", slot1, Hash::new_unique()),
3026 ))
3027 .unwrap();
3028
3029 assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
3030
3031 let incremental_snapshot_archive_info =
3032 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
3033 "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
3034 slot1,
3035 slot2,
3036 Hash::new_unique()
3037 )))
3038 .unwrap();
3039
3040 assert!(check_are_snapshots_compatible(
3041 &full_snapshot_archive_info,
3042 Some(&incremental_snapshot_archive_info)
3043 )
3044 .is_ok());
3045
3046 let incremental_snapshot_archive_info =
3047 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
3048 "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
3049 slot2,
3050 slot3,
3051 Hash::new_unique()
3052 )))
3053 .unwrap();
3054
3055 assert!(check_are_snapshots_compatible(
3056 &full_snapshot_archive_info,
3057 Some(&incremental_snapshot_archive_info)
3058 )
3059 .is_err());
3060 }
3061
3062 fn common_create_bank_snapshot_files(
3064 bank_snapshots_dir: &Path,
3065 min_slot: Slot,
3066 max_slot: Slot,
3067 ) {
3068 for slot in min_slot..max_slot {
3069 let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
3070 fs::create_dir_all(&snapshot_dir).unwrap();
3071
3072 let snapshot_filename = get_snapshot_file_name(slot);
3073 let snapshot_path = snapshot_dir.join(snapshot_filename);
3074 fs::File::create(snapshot_path).unwrap();
3075
3076 let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
3077 fs::File::create(status_cache_file).unwrap();
3078
3079 let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
3080 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
3081
3082 write_snapshot_state_complete_file(snapshot_dir).unwrap();
3084 }
3085 }
3086
3087 #[test]
3088 fn test_get_bank_snapshots() {
3089 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3090 let min_slot = 10;
3091 let max_slot = 20;
3092 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3093
3094 let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
3095 assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
3096 }
3097
3098 #[test]
3099 fn test_get_highest_bank_snapshot_post() {
3100 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3101 let min_slot = 99;
3102 let max_slot = 123;
3103 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3104
3105 let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
3106 assert!(highest_bank_snapshot.is_some());
3107 assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
3108 }
3109
3110 fn common_create_snapshot_archive_files(
3116 full_snapshot_archives_dir: &Path,
3117 incremental_snapshot_archives_dir: &Path,
3118 min_full_snapshot_slot: Slot,
3119 max_full_snapshot_slot: Slot,
3120 min_incremental_snapshot_slot: Slot,
3121 max_incremental_snapshot_slot: Slot,
3122 ) {
3123 fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3124 fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3125 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3126 for incremental_snapshot_slot in
3127 min_incremental_snapshot_slot..max_incremental_snapshot_slot
3128 {
3129 let snapshot_filename = format!(
3130 "incremental-snapshot-{}-{}-{}.tar.zst",
3131 full_snapshot_slot,
3132 incremental_snapshot_slot,
3133 Hash::default()
3134 );
3135 let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3136 fs::File::create(snapshot_filepath).unwrap();
3137 }
3138
3139 let snapshot_filename = format!(
3140 "snapshot-{}-{}.tar.zst",
3141 full_snapshot_slot,
3142 Hash::default()
3143 );
3144 let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3145 fs::File::create(snapshot_filepath).unwrap();
3146
3147 let bad_filename = format!(
3149 "incremental-snapshot-{}-{}-bad!hash.tar.zst",
3150 full_snapshot_slot,
3151 max_incremental_snapshot_slot + 1,
3152 );
3153 let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3154 fs::File::create(bad_filepath).unwrap();
3155 }
3156
3157 let bad_filename = format!("snapshot-{}-bad!hash.tar.zst", max_full_snapshot_slot + 1);
3160 let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3161 fs::File::create(bad_filepath).unwrap();
3162 }
3163
/// Every well-formed full snapshot archive in the directory is found; the malformed
/// (bad-hash) archive is not counted.
#[test]
fn test_get_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let min_slot = 123;
    let max_slot = 456;
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_slot,
        max_slot,
        0,
        0,
    );

    // Borrow the path (as the other tests in this module do) rather than moving the `TempDir`
    // into the call, so the fixture directory stays alive until the end of the test.
    let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir.path());
    // One archive per slot in the half-open range `min_slot..max_slot`.
    assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
}
3182
/// Full snapshot archives placed in the `SNAPSHOT_ARCHIVE_DOWNLOAD_DIR` subdirectory are found
/// and reported as remote.
#[test]
fn test_get_full_snapshot_archives_remote() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let min_slot = 123;
    let max_slot = 456;
    common_create_snapshot_archive_files(
        &full_snapshot_archives_dir
            .path()
            .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
        &incremental_snapshot_archives_dir
            .path()
            .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
        min_slot,
        max_slot,
        0,
        0,
    );

    // Borrow the path rather than moving the `TempDir` into the call, so the fixture
    // directory is not dropped before the remaining assertions run.
    let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir.path());
    assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
    assert!(snapshot_archives.iter().all(|info| info.is_remote()));
}
3206
/// Every well-formed incremental snapshot archive is found: one per (full slot, incremental
/// slot) pair. The malformed (bad-hash) archives are not counted.
#[test]
fn test_get_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let min_full_snapshot_slot = 12;
    let max_full_snapshot_slot = 23;
    let min_incremental_snapshot_slot = 34;
    let max_incremental_snapshot_slot = 45;
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_full_snapshot_slot,
        max_full_snapshot_slot,
        min_incremental_snapshot_slot,
        max_incremental_snapshot_slot,
    );

    // Borrow the path (as the other tests in this module do) rather than moving the `TempDir`
    // into the call, so the fixture directory stays alive until the end of the test.
    let incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert_eq!(
        incremental_snapshot_archives.len() as Slot,
        (max_full_snapshot_slot - min_full_snapshot_slot)
            * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
    );
}
3232
/// Incremental snapshot archives placed in the `SNAPSHOT_ARCHIVE_DOWNLOAD_DIR` subdirectory
/// are found and reported as remote.
#[test]
fn test_get_incremental_snapshot_archives_remote() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let min_full_snapshot_slot = 12;
    let max_full_snapshot_slot = 23;
    let min_incremental_snapshot_slot = 34;
    let max_incremental_snapshot_slot = 45;
    common_create_snapshot_archive_files(
        &full_snapshot_archives_dir
            .path()
            .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
        &incremental_snapshot_archives_dir
            .path()
            .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
        min_full_snapshot_slot,
        max_full_snapshot_slot,
        min_incremental_snapshot_slot,
        max_incremental_snapshot_slot,
    );

    // Borrow the path rather than moving the `TempDir` into the call, so the fixture
    // directory is not dropped before the remaining assertions run.
    let incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert_eq!(
        incremental_snapshot_archives.len() as Slot,
        (max_full_snapshot_slot - min_full_snapshot_slot)
            * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
    );
    assert!(incremental_snapshot_archives
        .iter()
        .all(|info| info.is_remote()));
}
3265
/// The highest full snapshot archive slot is the last slot of the generated range, even though
/// a malformed archive exists at a higher slot.
#[test]
fn test_get_highest_full_snapshot_archive_slot() {
    let (min_slot, max_slot) = (123, 456);
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_slot,
        max_slot,
        0,
        0,
    );

    // Slots are generated over the half-open range `min_slot..max_slot`; the bad-hash archive
    // at `max_slot + 1` must not win.
    let highest_slot = get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path());
    assert_eq!(highest_slot, Some(max_slot - 1));
}
3286
/// For each full snapshot slot that has incremental archives, the highest incremental slot is
/// the last of the generated range; a full slot with no incrementals yields `None`.
#[test]
fn test_get_highest_incremental_snapshot_slot() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let (min_full_snapshot_slot, max_full_snapshot_slot) = (12, 23);
    let (min_incremental_snapshot_slot, max_incremental_snapshot_slot) = (34, 45);
    common_create_snapshot_archive_files(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        min_full_snapshot_slot,
        max_full_snapshot_slot,
        min_incremental_snapshot_slot,
        max_incremental_snapshot_slot,
    );

    let highest_incremental_slot_for = |full_snapshot_slot: Slot| {
        get_highest_incremental_snapshot_archive_slot(
            incremental_snapshot_archives_dir.path(),
            full_snapshot_slot,
        )
    };

    // Incremental slots are generated over a half-open range, so the top one is `max - 1`.
    for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
        assert_eq!(
            highest_incremental_slot_for(full_snapshot_slot),
            Some(max_incremental_snapshot_slot - 1)
        );
    }

    // No incremental archives were generated on top of `max_full_snapshot_slot`.
    assert_eq!(highest_incremental_slot_for(max_full_snapshot_slot), None);
}
3322
3323 fn common_test_purge_old_snapshot_archives(
3324 snapshot_names: &[&String],
3325 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3326 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3327 expected_snapshots: &[&String],
3328 ) {
3329 let temp_snap_dir = tempfile::TempDir::new().unwrap();
3330
3331 for snap_name in snapshot_names {
3332 let snap_path = temp_snap_dir.path().join(snap_name);
3333 let mut _snap_file = fs::File::create(snap_path);
3334 }
3335 purge_old_snapshot_archives(
3336 temp_snap_dir.path(),
3337 temp_snap_dir.path(),
3338 maximum_full_snapshot_archives_to_retain,
3339 maximum_incremental_snapshot_archives_to_retain,
3340 );
3341
3342 let mut retained_snaps = HashSet::new();
3343 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3344 let entry_path_buf = entry.unwrap().path();
3345 let entry_path = entry_path_buf.as_path();
3346 let snapshot_name = entry_path
3347 .file_name()
3348 .unwrap()
3349 .to_str()
3350 .unwrap()
3351 .to_string();
3352 retained_snaps.insert(snapshot_name);
3353 }
3354
3355 for snap_name in expected_snapshots {
3356 assert!(
3357 retained_snaps.contains(snap_name.as_str()),
3358 "{snap_name} not found"
3359 );
3360 }
3361 assert_eq!(retained_snaps.len(), expected_snapshots.len());
3362 }
3363
/// Purging full snapshot archives keeps the N highest-slot archives, for N = 1, 2 and 3.
#[test]
fn test_purge_old_full_snapshot_archives() {
    let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
    let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
    let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
    let snapshot_names = [&snap1_name, &snap2_name, &snap3_name];

    // (number of full archives to retain, archives expected to survive the purge)
    let cases: [(usize, &[&String]); 3] = [
        (1, &[&snap3_name]),
        (2, &[&snap2_name, &snap3_name]),
        (3, &[&snap1_name, &snap2_name, &snap3_name]),
    ];

    for &(num_to_retain, expected_snapshots) in &cases {
        common_test_purge_old_snapshot_archives(
            &snapshot_names,
            NonZeroUsize::new(num_to_retain).unwrap(),
            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
            expected_snapshots,
        );
    }
}
3398
/// Repeatedly add full snapshot archives and purge periodically. After each purge exactly the
/// newest `maximum_snapshots_to_retain` archives, ending at the current slot, must remain.
#[test]
fn test_purge_old_full_snapshot_archives_in_the_loop() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
    let starting_slot: Slot = 42;
    let num_to_retain = maximum_snapshots_to_retain.get() as Slot;

    for slot in starting_slot..starting_slot + 100 {
        let archive_name = format!("snapshot-{}-{}.tar.zst", slot, Hash::default());
        fs::File::create(full_snapshot_archives_dir.path().join(archive_name)).unwrap();

        // Purge only once enough archives exist to fill the retention window, and then only on
        // slots that are multiples of twice the retention count.
        let window_is_full = slot >= starting_slot + num_to_retain;
        let is_purge_slot = slot % (num_to_retain * 2) == 0;
        if !(window_is_full && is_purge_slot) {
            continue;
        }

        purge_old_snapshot_archives(
            full_snapshot_archives_dir.path(),
            incremental_snapshot_archives_dir.path(),
            maximum_snapshots_to_retain,
            NonZeroUsize::new(usize::MAX).unwrap(),
        );

        let mut remaining = get_full_snapshot_archives(full_snapshot_archives_dir.path());
        remaining.sort_unstable();
        assert_eq!(remaining.len(), maximum_snapshots_to_retain.get());
        assert_eq!(remaining.last().unwrap().slot(), slot);
        // Walking backwards from the newest, the slots must be contiguous.
        for (offset, archive) in (0..).zip(remaining.iter().rev()) {
            assert_eq!(archive.slot(), slot - offset);
        }
    }
}
3446
/// Purging keeps:
/// - the newest `maximum_full_snapshot_archives_to_retain` full archives,
/// - the newest `maximum_incremental_snapshot_archives_to_retain` incremental archives based
///   on the latest full archive, and
/// - exactly one incremental archive (the highest-slot one) for each older retained full
///   archive.
#[test]
fn test_purge_old_incremental_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let starting_slot = 100_000;

    let maximum_incremental_snapshot_archives_to_retain =
        DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
    let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;

    // Create twice as many archives as will be retained, both full archives and incremental
    // archives per full archive, so the purge has something to remove in each category.
    let incremental_snapshot_interval = 100;
    let num_incremental_snapshots_per_full_snapshot =
        maximum_incremental_snapshot_archives_to_retain.get() * 2;
    let full_snapshot_interval =
        incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
    let num_full_snapshots = maximum_full_snapshot_archives_to_retain
        .checked_mul(NonZeroUsize::new(2).unwrap())
        .unwrap()
        .get();

    for full_snapshot_slot in (starting_slot..)
        .step_by(full_snapshot_interval)
        .take(num_full_snapshots)
    {
        let snapshot_filename = format!(
            "snapshot-{}-{}.tar.zst",
            full_snapshot_slot,
            Hash::default()
        );
        let snapshot_path = full_snapshot_archives_dir.path().join(snapshot_filename);
        fs::File::create(snapshot_path).unwrap();

        // `skip(1)` drops the first generated slot, which equals the full snapshot slot.
        for incremental_snapshot_slot in (full_snapshot_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(1)
        {
            let snapshot_filename = format!(
                "incremental-snapshot-{}-{}-{}.tar.zst",
                full_snapshot_slot,
                incremental_snapshot_slot,
                Hash::default()
            );
            let snapshot_path = incremental_snapshot_archives_dir
                .path()
                .join(snapshot_filename);
            fs::File::create(snapshot_path).unwrap();
        }
    }

    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
    );

    // Full archives: exactly the retention limit remains.
    let mut remaining_full_snapshot_archives =
        get_full_snapshot_archives(full_snapshot_archives_dir.path());
    assert_eq!(
        remaining_full_snapshot_archives.len(),
        maximum_full_snapshot_archives_to_retain.get(),
    );
    remaining_full_snapshot_archives.sort_unstable();
    let latest_full_snapshot_archive_slot =
        remaining_full_snapshot_archives.last().unwrap().slot();

    // Incremental archives: the retention limit for the latest full archive, plus one for each
    // older retained full archive.
    let mut remaining_incremental_snapshot_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert_eq!(
        remaining_incremental_snapshot_archives.len(),
        maximum_incremental_snapshot_archives_to_retain
            .get()
            .saturating_add(
                maximum_full_snapshot_archives_to_retain
                    .get()
                    .saturating_sub(1)
            )
    );
    // Sort ascending and then reverse, so `pop()` yields the smallest remaining archive
    // according to the archive info's `Ord`.
    remaining_incremental_snapshot_archives.sort_unstable();
    remaining_incremental_snapshot_archives.reverse();

    // The single survivor for each older full archive, oldest full archive first, is the
    // highest incremental slot created for that base.
    for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
        let incremental_snapshot_archive =
            remaining_incremental_snapshot_archives.pop().unwrap();

        let expected_base_slot =
            latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
        let expected_slot = expected_base_slot
            + (full_snapshot_interval - incremental_snapshot_interval) as u64;
        assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
    }

    // Everything left is based on the latest full archive...
    for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
        assert_eq!(
            incremental_snapshot_archive.base_slot(),
            latest_full_snapshot_archive_slot
        );
    }

    // ...and consists of exactly the newest incremental slots generated for it.
    let expected_remaining_incremental_snapshot_archive_slots =
        (latest_full_snapshot_archive_slot..)
            .step_by(incremental_snapshot_interval)
            .take(num_incremental_snapshots_per_full_snapshot)
            .skip(
                num_incremental_snapshots_per_full_snapshot
                    - maximum_incremental_snapshot_archives_to_retain.get(),
            )
            .collect::<HashSet<_>>();

    let actual_remaining_incremental_snapshot_archive_slots =
        remaining_incremental_snapshot_archives
            .iter()
            .map(|snapshot| snapshot.slot())
            .collect::<HashSet<_>>();
    assert_eq!(
        actual_remaining_incremental_snapshot_archive_slots,
        expected_remaining_incremental_snapshot_archive_slots
    );
}
3580
/// Incremental archives whose base full snapshot archives do not exist are all purged. This
/// holds even with retention limits of `usize::MAX`, which would otherwise keep them.
#[test]
fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();

    // Incremental archives based on full slots 100 and 200; the full archives directory is
    // left empty.
    let orphaned_archive_names = [
        format!("incremental-snapshot-100-120-{}.tar.zst", Hash::default()),
        format!("incremental-snapshot-100-140-{}.tar.zst", Hash::default()),
        format!("incremental-snapshot-100-160-{}.tar.zst", Hash::default()),
        format!("incremental-snapshot-100-180-{}.tar.zst", Hash::default()),
        format!("incremental-snapshot-200-220-{}.tar.zst", Hash::default()),
        format!("incremental-snapshot-200-240-{}.tar.zst", Hash::default()),
        format!("incremental-snapshot-200-260-{}.tar.zst", Hash::default()),
        format!("incremental-snapshot-200-280-{}.tar.zst", Hash::default()),
    ];
    orphaned_archive_names.iter().for_each(|archive_name| {
        let archive_path = incremental_snapshot_archives_dir.path().join(archive_name);
        fs::File::create(archive_path).unwrap();
    });

    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_snapshot_archives_dir.path(),
        NonZeroUsize::new(usize::MAX).unwrap(),
        NonZeroUsize::new(usize::MAX).unwrap(),
    );

    let leftover_incremental_archives =
        get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
    assert!(leftover_incremental_archives.is_empty());
}
3613
/// `get_snapshot_accounts_hardlink_dir` succeeds for a storage file inside an accounts
/// directory. It fails with `GetAccountPath` when the same file name sits one level above
/// that directory.
#[test]
fn test_get_snapshot_accounts_hardlink_dir() {
    let slot: Slot = 1;

    let mut account_paths_set: HashSet<PathBuf> = HashSet::new();

    let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
    let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
    let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    fs::create_dir_all(&accounts_hardlinks_dir).unwrap();

    let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
    let appendvec_filename = format!("{slot}.0");
    let appendvec_path = accounts_dir.join(appendvec_filename);

    let ret = get_snapshot_accounts_hardlink_dir(
        &appendvec_path,
        slot,
        &mut account_paths_set,
        &accounts_hardlinks_dir,
    );
    // Match on the whole `Result` so a failure reports the actual error, rather than just
    // "assertion failed: ret.is_ok()".
    assert_matches!(ret, Ok(_));

    // Same file name, but moved one directory level up, out of `accounts_dir`.
    let wrong_appendvec_path = appendvec_path
        .parent()
        .unwrap()
        .parent()
        .unwrap()
        .join(appendvec_path.file_name().unwrap());
    let ret = get_snapshot_accounts_hardlink_dir(
        &wrong_appendvec_path,
        slot,
        &mut account_paths_set,
        accounts_hardlinks_dir,
    );

    assert_matches!(
        ret,
        Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
    );
}
3655
/// File names are classified as the version file, bank fields (a bare slot), storage
/// (`slot.id`), or nothing.
#[test]
fn test_get_snapshot_file_kind() {
    let cases = [
        ("file.txt", None),
        (SNAPSHOT_VERSION_FILENAME, Some(SnapshotFileKind::Version)),
        ("1234", Some(SnapshotFileKind::BankFields)),
        ("1000.999", Some(SnapshotFileKind::Storage)),
    ];
    for (file_name, expected_kind) in cases {
        assert_eq!(expected_kind, get_snapshot_file_kind(file_name));
    }
}
3672
/// Round trip: the slot written to the full-snapshot-slot file is read back unchanged.
#[test]
fn test_full_snapshot_slot_file_good() {
    let bank_snapshot_dir = TempDir::new().unwrap();
    let expected_slot = 123_456_789;
    write_full_snapshot_slot_file(&bank_snapshot_dir, expected_slot).unwrap();

    let actual_slot = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
    assert_eq!(actual_slot, expected_slot);
}
3682
/// A full-snapshot-slot file one byte shorter or one byte longer than a serialized `Slot` is
/// rejected with an "invalid full snapshot slot file size" error.
#[test]
fn test_full_snapshot_slot_file_bad() {
    const SLOT_SIZE: usize = std::mem::size_of::<Slot>();

    for bad_len in [SLOT_SIZE - 1, SLOT_SIZE + 1] {
        let contents = vec![1u8; bad_len];
        let bank_snapshot_dir = TempDir::new().unwrap();
        let slot_file_path = bank_snapshot_dir
            .path()
            .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
        fs::write(slot_file_path, &contents).unwrap();

        let message = read_full_snapshot_slot_file(&bank_snapshot_dir)
            .unwrap_err()
            .to_string();
        assert!(message.starts_with("invalid full snapshot slot file size"));
    }
}
3702}