1use {
2 crate::{
3 bank::{BankFieldsToDeserialize, BankFieldsToSerialize, BankHashStats, BankSlotDelta},
4 serde_snapshot::{
5 self, AccountsDbFields, BankIncrementalSnapshotPersistence, ExtraFieldsToSerialize,
6 SerializableAccountStorageEntry, SnapshotAccountsDbFields, SnapshotBankFields,
7 SnapshotStreams,
8 },
9 snapshot_archive_info::{
10 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
11 SnapshotArchiveInfoGetter,
12 },
13 snapshot_bank_utils,
14 snapshot_config::SnapshotConfig,
15 snapshot_hash::SnapshotHash,
16 snapshot_package::{SnapshotKind, SnapshotPackage},
17 snapshot_utils::snapshot_storage_rebuilder::{
18 get_slot_and_append_vec_id, SnapshotStorageRebuilder,
19 },
20 },
21 bzip2::read::BzDecoder,
22 crossbeam_channel::{Receiver, Sender},
23 flate2::read::GzDecoder,
24 log::*,
25 regex::Regex,
26 solana_accounts_db::{
27 account_storage::AccountStorageMap,
28 account_storage_reader::AccountStorageReader,
29 accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
30 accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
31 accounts_hash::{AccountsDeltaHash, AccountsHash},
32 epoch_accounts_hash::EpochAccountsHash,
33 hardened_unpack::{self, ParallelSelector, UnpackError},
34 shared_buffer_reader::{SharedBuffer, SharedBufferReader},
35 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
36 },
37 solana_clock::{Epoch, Slot},
38 solana_hash::Hash,
39 solana_measure::{measure::Measure, measure_time, measure_us},
40 std::{
41 cmp::Ordering,
42 collections::{HashMap, HashSet},
43 fmt, fs,
44 io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
45 mem,
46 num::NonZeroUsize,
47 ops::RangeInclusive,
48 path::{Path, PathBuf},
49 process::ExitStatus,
50 str::FromStr,
51 sync::{Arc, LazyLock},
52 thread::{Builder, JoinHandle},
53 },
54 tar::{self, Archive},
55 tempfile::TempDir,
56 thiserror::Error,
57};
58#[cfg(feature = "dev-context-only-utils")]
59use {
60 hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
61 solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
62};
63
64mod archive_format;
65pub mod snapshot_storage_rebuilder;
66pub use archive_format::*;
67
/// File name of the serialized status cache inside a bank snapshot directory.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// File name of the snapshot version file inside a bank snapshot directory.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Marker file whose presence means a bank snapshot directory was completely written.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Marker file written after the bank snapshot's account storages were flushed and
/// hard-linked into the snapshot directory.
pub const SNAPSHOT_STORAGES_FLUSHED_FILENAME: &str = "storages_flushed";
/// Directory (inside a bank snapshot directory) whose entries are symlinks to the account
/// storage hard-link directories.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Directory name for downloaded snapshot archives.
/// NOTE(review): not used within this view; confirm its base directory at the call sites.
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// File recording, as little-endian bytes, the slot of the full snapshot that a bank snapshot
/// builds on.
pub const SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME: &str = "full_snapshot_slot";
/// Maximum size of a snapshot data file: 32 GiB (enforced by code outside this view).
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024;
/// Maximum size, in bytes, of the snapshot version file (enforced by code outside this view).
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;
/// Version string for `SnapshotVersion::V1_2_0`.
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Name prefix for temporary staging entries created while building a snapshot archive;
/// `remove_tmp_snapshot_archives` deletes entries carrying this prefix.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// Extension marking a "pre" bank snapshot file (see `BankSnapshotKind::Pre`).
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
/// Default number of full snapshot archives to retain.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(2).unwrap();
/// Default number of incremental snapshot archives to retain.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    NonZeroUsize::new(4).unwrap();
/// Matches full snapshot archive file names: `snapshot-<slot>-<hash>.<ext>`, where `<ext>` is
/// one of `tar`, `tar.bz2`, `tar.zst`, `tar.gz`, `tar.lz4`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Matches incremental snapshot archive file names:
/// `incremental-snapshot-<base>-<slot>-<hash>.<ext>`, with the same extensions as above.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
90
91#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
92pub enum SnapshotVersion {
93 #[default]
94 V1_2_0,
95}
96
97impl fmt::Display for SnapshotVersion {
98 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99 f.write_str(From::from(*self))
100 }
101}
102
103impl From<SnapshotVersion> for &'static str {
104 fn from(snapshot_version: SnapshotVersion) -> &'static str {
105 match snapshot_version {
106 SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
107 }
108 }
109}
110
111impl FromStr for SnapshotVersion {
112 type Err = &'static str;
113
114 fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
115 let version_string = if version_string
117 .get(..1)
118 .is_some_and(|s| s.eq_ignore_ascii_case("v"))
119 {
120 &version_string[1..]
121 } else {
122 version_string
123 };
124 match version_string {
125 VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
126 _ => Err("unsupported snapshot version"),
127 }
128 }
129}
130
131impl SnapshotVersion {
132 pub fn as_str(self) -> &'static str {
133 <&str as From<Self>>::from(self)
134 }
135}
136
137#[derive(PartialEq, Eq, Debug)]
140pub struct BankSnapshotInfo {
141 pub slot: Slot,
143 pub snapshot_kind: BankSnapshotKind,
145 pub snapshot_dir: PathBuf,
147 pub snapshot_version: SnapshotVersion,
149}
150
151impl PartialOrd for BankSnapshotInfo {
152 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
153 Some(self.cmp(other))
154 }
155}
156
157impl Ord for BankSnapshotInfo {
159 fn cmp(&self, other: &Self) -> Ordering {
160 self.slot.cmp(&other.slot)
161 }
162}
163
164impl BankSnapshotInfo {
165 pub fn new_from_dir(
166 bank_snapshots_dir: impl AsRef<Path>,
167 slot: Slot,
168 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
169 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
172
173 if !bank_snapshot_dir.is_dir() {
174 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
175 bank_snapshot_dir,
176 ));
177 }
178
179 if !is_bank_snapshot_complete(&bank_snapshot_dir) {
185 return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
186 }
187
188 let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
189 if !status_cache_file.is_file() {
190 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
191 status_cache_file,
192 ));
193 }
194
195 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
196 let version_str = snapshot_version_from_file(&version_path).or(Err(
197 SnapshotNewFromDirError::MissingVersionFile(version_path),
198 ))?;
199 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
200 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
201
202 let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
203 let bank_snapshot_pre_path =
204 bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
205
206 let snapshot_kind = if bank_snapshot_pre_path.is_file() {
215 BankSnapshotKind::Pre
216 } else if bank_snapshot_post_path.is_file() {
217 BankSnapshotKind::Post
218 } else {
219 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
220 bank_snapshot_dir,
221 ));
222 };
223
224 Ok(BankSnapshotInfo {
225 slot,
226 snapshot_kind,
227 snapshot_dir: bank_snapshot_dir,
228 snapshot_version,
229 })
230 }
231
232 pub fn snapshot_path(&self) -> PathBuf {
233 let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
234
235 let ext = match self.snapshot_kind {
236 BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
237 BankSnapshotKind::Post => "",
238 };
239 bank_snapshot_path.set_extension(ext);
240
241 bank_snapshot_path
242 }
243}
244
/// The two on-disk forms of a bank snapshot file.
///
/// `BankSnapshotInfo::new_from_dir` picks `Pre` when a file with the "pre" extension exists.
/// Otherwise it picks `Post` when the plain file exists.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BankSnapshotKind {
    /// The bank snapshot file carries the `BANK_SNAPSHOT_PRE_FILENAME_EXTENSION` extension.
    Pre,
    /// The bank snapshot file has no extension.
    Post,
}
260
/// Source a snapshot is taken from: an archive, or a bank snapshot directory.
/// NOTE(review): consumers are outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotFrom {
    /// From a snapshot archive.
    Archive,
    /// From a bank snapshot directory.
    Dir,
}
271
/// Paths to the root files of a full snapshot and, optionally, an incremental snapshot.
/// NOTE(review): "root file" presumably means the bank snapshot file; confirm with producers.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root file path of the full snapshot.
    pub full_snapshot_root_file_path: PathBuf,
    /// Root file path of the incremental snapshot, when there is one.
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
279
/// Everything produced by unpacking a single snapshot archive.
/// NOTE(review): producers are outside this view.
#[derive(Debug)]
pub struct UnarchivedSnapshot {
    // Never read; it is held only so the temporary unpack directory stays alive as long as this
    // struct (`TempDir` deletes its directory when dropped).
    #[allow(dead_code)]
    unpack_dir: TempDir,
    /// Account storages rebuilt from the archive.
    pub storage: AccountStorageMap,
    /// Deserialized bank fields.
    pub bank_fields: BankFieldsToDeserialize,
    /// Deserialized accounts-db fields.
    pub accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
    /// Where the snapshot was unpacked, and its version.
    pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Timing of the untar step.
    pub measure_untar: Measure,
}
291
/// Combined result of unpacking a full snapshot archive and an optional incremental one.
/// NOTE(review): producers are outside this view.
#[derive(Debug)]
pub struct UnarchivedSnapshots {
    /// Account storages from the full snapshot.
    pub full_storage: AccountStorageMap,
    /// Account storages from the incremental snapshot, if any.
    pub incremental_storage: Option<AccountStorageMap>,
    /// Bank fields from the full (and incremental) snapshots.
    pub bank_fields: SnapshotBankFields,
    /// Accounts-db fields from the full (and incremental) snapshots.
    pub accounts_db_fields: SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
    /// Unpack location and version of the full snapshot.
    pub full_unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
    /// Unpack location and version of the incremental snapshot, if any.
    pub incremental_unpacked_snapshots_dir_and_version: Option<UnpackedSnapshotsDirAndVersion>,
    /// Timing of untarring the full snapshot.
    pub full_measure_untar: Measure,
    /// Timing of untarring the incremental snapshot, if any.
    pub incremental_measure_untar: Option<Measure>,
    /// Next accounts file id to hand out after rebuilding storages.
    pub next_append_vec_id: AtomicAccountsFileId,
}
305
/// Keeps the temporary unpack directories of unarchived snapshots alive.
///
/// `TempDir` deletes its directory when dropped, so dropping this guard removes them. The
/// fields are never read, hence the `dead_code` allow.
#[allow(dead_code)]
#[derive(Debug)]
pub struct UnarchivedSnapshotsGuard {
    full_unpack_dir: TempDir,
    incremental_unpack_dir: Option<TempDir>,
}
/// Location of an unpacked snapshot together with its snapshot format version.
#[derive(Debug)]
pub struct UnpackedSnapshotsDirAndVersion {
    /// Directory containing the unpacked snapshot files.
    pub unpacked_snapshots_dir: PathBuf,
    /// Snapshot format version.
    pub snapshot_version: SnapshotVersion,
}
320
/// Rebuilt account storages paired with the next accounts file id to allocate.
pub(crate) struct StorageAndNextAccountsFileId {
    /// Rebuilt account storages.
    pub storage: AccountStorageMap,
    /// Next accounts file id to hand out.
    pub next_append_vec_id: AtomicAccountsFileId,
}
327
/// Top-level error for snapshot operations in this module (aliased by [`Result`]).
///
/// Variants differ widely in size; clippy's `large_enum_variant` lint is silenced on purpose.
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
    #[error("I/O error: {0}")]
    Io(#[from] IoError),

    #[error("AccountsFile error: {0}")]
    AccountsFileError(#[from] AccountsFileError),

    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    #[error("crossbeam send error: {0}")]
    CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),

    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    #[error("Unpack error: {0}")]
    UnpackError(#[from] UnpackError),

    /// I/O error tagged with a static description of where it happened.
    #[error("source({1}) - I/O error: {0}")]
    IoWithSource(IoError, &'static str),

    #[error("could not get file name from path '{0}'")]
    PathToFileNameError(PathBuf),

    #[error("could not get str from file name '{0}'")]
    FileNameToStrError(PathBuf),

    #[error("could not parse snapshot archive's file name '{0}'")]
    ParseSnapshotArchiveFileNameError(String),

    #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
    MismatchedBaseSlot(Slot, Slot),

    #[error("no snapshot archives to load from '{0}'")]
    NoSnapshotArchives(PathBuf),

    #[error("snapshot slot mismatch: deserialized bank: {0}, snapshot archive: {1}")]
    MismatchedSlot(Slot, Slot),

    #[error("snapshot hash mismatch: deserialized bank: {0:?}, snapshot archive: {1:?}")]
    MismatchedHash(SnapshotHash, SnapshotHash),

    #[error("snapshot slot deltas are invalid: {0}")]
    VerifySlotDeltas(#[from] VerifySlotDeltasError),

    #[error("snapshot epoch stakes are invalid: {0}")]
    VerifyEpochStakes(#[from] VerifyEpochStakesError),

    #[error("bank_snapshot_info new_from_dir failed: {0}")]
    NewFromDir(#[from] SnapshotNewFromDirError),

    #[error("invalid snapshot dir path '{0}'")]
    InvalidSnapshotDirPath(PathBuf),

    #[error("invalid AppendVec path '{0}'")]
    InvalidAppendVecPath(PathBuf),

    #[error("invalid account path '{0}'")]
    InvalidAccountPath(PathBuf),

    #[error("no valid snapshot dir found under '{0}'")]
    NoSnapshotSlotDir(PathBuf),

    #[error("snapshot dir account paths mismatching")]
    AccountPathsMismatch,

    /// Any failure inside `serialize_snapshot`, tagged with the slot being serialized.
    #[error("failed to add bank snapshot for slot {1}: {0}")]
    AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

    #[error("failed to archive snapshot package: {0}")]
    ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),

    #[error("failed to rebuild snapshot storages: {0}")]
    RebuildStorages(String),
}
406
/// Reasons `BankSnapshotInfo::new_from_dir` rejects a bank snapshot directory.
#[derive(Error, Debug)]
pub enum SnapshotNewFromDirError {
    #[error("invalid bank snapshot directory '{0}'")]
    InvalidBankSnapshotDir(PathBuf),

    #[error("missing status cache file '{0}'")]
    MissingStatusCacheFile(PathBuf),

    /// Also returned when the version file exists but cannot be read.
    #[error("missing version file '{0}'")]
    MissingVersionFile(PathBuf),

    #[error("invalid snapshot version '{0}'")]
    InvalidVersion(String),

    /// The directory lacks the state-complete marker file.
    #[error("snapshot directory incomplete '{0}'")]
    IncompleteDir(PathBuf),

    /// Neither the "pre" nor the "post" bank snapshot file exists.
    #[error("missing snapshot file '{0}'")]
    MissingSnapshotFile(PathBuf),
}
427
/// Result type for this module's fallible snapshot operations.
pub type Result<T> = std::result::Result<T, SnapshotError>;
429
/// Ways a snapshot's status-cache slot deltas can fail verification.
/// NOTE(review): the verification code itself is outside this view.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifySlotDeltasError {
    #[error("too many entries: {0} (max: {1})")]
    TooManyEntries(usize, usize),

    #[error("slot {0} is not a root")]
    SlotIsNotRoot(Slot),

    #[error("slot {0} is greater than bank slot {1}")]
    SlotGreaterThanMaxRoot(Slot, Slot),

    #[error("slot {0} has multiple entries")]
    SlotHasMultipleEntries(Slot),

    #[error("slot {0} was not found in slot history")]
    SlotNotFoundInHistory(Slot),

    #[error("slot {0} was in history but missing from slot deltas")]
    SlotNotFoundInDeltas(Slot),

    #[error("slot history is bad and cannot be used to verify slot deltas")]
    BadSlotHistory,
}
454
/// Ways a snapshot's epoch stakes can fail verification.
/// NOTE(review): the verification code itself is outside this view.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum VerifyEpochStakesError {
    #[error("epoch {0} is greater than the max {1}")]
    EpochGreaterThanMax(Epoch, Epoch),

    /// Field `1` is the inclusive range of epochs that must be present.
    #[error("stakes not found for epoch {0} (required epochs: {1:?})")]
    StakesNotFound(Epoch, RangeInclusive<Epoch>),
}
464
/// Failures while writing a bank snapshot directory in `serialize_snapshot`.
///
/// Each variant corresponds to one step there.
#[derive(Error, Debug)]
pub enum AddBankSnapshotError {
    #[error("bank snapshot dir already exists '{0}'")]
    SnapshotDirAlreadyExists(PathBuf),

    #[error("failed to create snapshot dir '{1}': {0}")]
    CreateSnapshotDir(#[source] IoError, PathBuf),

    #[error("failed to flush storage '{1}': {0}")]
    FlushStorage(#[source] AccountsFileError, PathBuf),

    #[error("failed to mark snapshot storages as 'flushed': {0}")]
    MarkStoragesFlushed(#[source] IoError),

    #[error("failed to hard link storages: {0}")]
    HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),

    // Boxed: `SnapshotError` itself contains `AddBankSnapshotError`, so it must be indirect.
    #[error("failed to serialize bank: {0}")]
    SerializeBank(#[source] Box<SnapshotError>),

    #[error("failed to serialize status cache: {0}")]
    SerializeStatusCache(#[source] Box<SnapshotError>),

    #[error("failed to write snapshot version file '{1}': {0}")]
    WriteSnapshotVersionFile(#[source] IoError, PathBuf),

    #[error("failed to mark snapshot as 'complete': {0}")]
    MarkSnapshotComplete(#[source] IoError),
}
495
/// Failures while staging and archiving a bank snapshot (see `archive_snapshot`).
///
/// For the symlink variants, field `1` is the source path and field `2` is the staging path.
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
    #[error("failed to create archive path '{1}': {0}")]
    CreateArchiveDir(#[source] IoError, PathBuf),

    #[error("failed to create staging dir inside '{1}': {0}")]
    CreateStagingDir(#[source] IoError, PathBuf),

    #[error("failed to create snapshot staging dir '{1}': {0}")]
    CreateSnapshotStagingDir(#[source] IoError, PathBuf),

    #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
    CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
    SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
    SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
    SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create archive file '{1}': {0}")]
    CreateArchiveFile(#[source] IoError, PathBuf),

    #[error("failed to archive version file: {0}")]
    ArchiveVersionFile(#[source] IoError),

    #[error("failed to archive snapshots dir: {0}")]
    ArchiveSnapshotsDir(#[source] IoError),

    #[error("failed to archive account storage file '{1}': {0}")]
    ArchiveAccountStorageFile(#[source] IoError, PathBuf),

    #[error("failed to archive snapshot: {0}")]
    FinishArchive(#[source] IoError),

    #[error("failed to create encoder: {0}")]
    CreateEncoder(#[source] IoError),

    #[error("failed to encode archive: {0}")]
    FinishEncoder(#[source] IoError),

    #[error("failed to query archive metadata '{1}': {0}")]
    QueryArchiveMetadata(#[source] IoError, PathBuf),

    #[error("failed to move archive from '{1}' to '{2}': {0}")]
    MoveArchive(#[source] IoError, PathBuf, PathBuf),

    #[error("failed to create account storage reader '{1}': {0}")]
    AccountStorageReaderError(#[source] IoError, PathBuf),
}
550
/// Failures while hard-linking account storages into a bank snapshot directory.
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
    #[error("failed to create accounts hard links dir '{1}': {0}")]
    CreateAccountsHardLinksDir(#[source] IoError, PathBuf),

    #[error("failed to get the snapshot's accounts hard link dir: {0}")]
    GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),

    /// Field `1` is the source path and field `2` is the link path.
    #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
    HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
}
563
/// Failures while locating or creating a snapshot's hard-link directory for an account path.
/// NOTE(review): the producing function is outside this view.
#[derive(Error, Debug)]
pub enum GetSnapshotAccountsHardLinkDirError {
    #[error("invalid account storage path '{0}'")]
    GetAccountPath(PathBuf),

    #[error("failed to create the snapshot hard link dir '{1}': {0}")]
    CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),

    #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
    SymlinkSnapshotHardLinkDir {
        source: IoError,
        original: PathBuf,
        link: PathBuf,
    },
}
580
581pub fn clean_orphaned_account_snapshot_dirs(
589 bank_snapshots_dir: impl AsRef<Path>,
590 account_snapshot_paths: &[PathBuf],
591) -> IoResult<()> {
592 let mut account_snapshot_dirs_referenced = HashSet::new();
595 let snapshots = get_bank_snapshots(bank_snapshots_dir);
596 for snapshot in snapshots {
597 let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
598 let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
600 debug!(
604 "failed to read account hardlinks dir '{}'",
605 account_hardlinks_dir.display(),
606 );
607 continue;
608 };
609 for entry in read_dir {
610 let path = entry?.path();
611 let target = fs::read_link(&path).map_err(|err| {
612 IoError::other(format!(
613 "failed to read symlink '{}': {err}",
614 path.display(),
615 ))
616 })?;
617 account_snapshot_dirs_referenced.insert(target);
618 }
619 }
620
621 for account_snapshot_path in account_snapshot_paths {
623 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
624 IoError::other(format!(
625 "failed to read account snapshot dir '{}': {err}",
626 account_snapshot_path.display(),
627 ))
628 })?;
629 for entry in read_dir {
630 let path = entry?.path();
631 if !account_snapshot_dirs_referenced.contains(&path) {
632 info!(
633 "Removing orphaned account snapshot hardlink directory '{}'...",
634 path.display()
635 );
636 move_and_async_delete_path(&path);
637 }
638 }
639 }
640
641 Ok(())
642}
643
644pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
646 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
647 return;
649 };
650
651 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
652
653 let incomplete_dirs: Vec<_> = read_dir_iter
654 .filter_map(|entry| entry.ok())
655 .map(|entry| entry.path())
656 .filter(|path| path.is_dir())
657 .filter(is_incomplete)
658 .collect();
659
660 for incomplete_dir in incomplete_dirs {
662 let result = purge_bank_snapshot(&incomplete_dir);
663 match result {
664 Ok(_) => info!(
665 "Purged incomplete snapshot dir: {}",
666 incomplete_dir.display()
667 ),
668 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
669 }
670 }
671}
672
673fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
675 let state_complete_path = bank_snapshot_dir
676 .as_ref()
677 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
678 state_complete_path.is_file()
679}
680
681fn write_snapshot_state_complete_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<()> {
683 let state_complete_path = bank_snapshot_dir
684 .as_ref()
685 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
686 fs::File::create(&state_complete_path).map_err(|err| {
687 IoError::other(format!(
688 "failed to create file '{}': {err}",
689 state_complete_path.display(),
690 ))
691 })?;
692 Ok(())
693}
694
695pub fn write_full_snapshot_slot_file(
697 bank_snapshot_dir: impl AsRef<Path>,
698 full_snapshot_slot: Slot,
699) -> IoResult<()> {
700 let full_snapshot_slot_path = bank_snapshot_dir
701 .as_ref()
702 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
703 fs::write(
704 &full_snapshot_slot_path,
705 Slot::to_le_bytes(full_snapshot_slot),
706 )
707 .map_err(|err| {
708 IoError::other(format!(
709 "failed to write full snapshot slot file '{}': {err}",
710 full_snapshot_slot_path.display(),
711 ))
712 })
713}
714
715pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<Slot> {
717 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
718 let full_snapshot_slot_path = bank_snapshot_dir
719 .as_ref()
720 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
721 let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
722 if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
723 let error_message = format!(
724 "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
725 full_snapshot_slot_path.display(),
726 full_snapshot_slot_file_metadata.len(),
727 SLOT_SIZE,
728 );
729 return Err(IoError::other(error_message));
730 }
731 let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
732 let mut buffer = [0; SLOT_SIZE];
733 full_snapshot_slot_file.read_exact(&mut buffer)?;
734 let slot = Slot::from_le_bytes(buffer);
735 Ok(slot)
736}
737
738pub fn write_storages_flushed_file(bank_snapshot_dir: impl AsRef<Path>) -> IoResult<()> {
740 let flushed_storages_path = bank_snapshot_dir
741 .as_ref()
742 .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
743 fs::File::create(&flushed_storages_path).map_err(|err| {
744 IoError::other(format!(
745 "failed to create file '{}': {err}",
746 flushed_storages_path.display(),
747 ))
748 })?;
749 Ok(())
750}
751
752fn are_bank_snapshot_storages_flushed(bank_snapshot_dir: impl AsRef<Path>) -> bool {
754 let flushed_storages = bank_snapshot_dir
755 .as_ref()
756 .join(SNAPSHOT_STORAGES_FLUSHED_FILENAME);
757 flushed_storages.is_file()
758}
759
760pub fn get_highest_loadable_bank_snapshot(
768 snapshot_config: &SnapshotConfig,
769) -> Option<BankSnapshotInfo> {
770 let highest_bank_snapshot =
771 get_highest_bank_snapshot_post(&snapshot_config.bank_snapshots_dir)?;
772
773 if !snapshot_config.should_generate_snapshots() {
776 return Some(highest_bank_snapshot);
777 }
778
779 let highest_full_snapshot_archive_slot =
782 get_highest_full_snapshot_archive_slot(&snapshot_config.full_snapshot_archives_dir)?;
783 let full_snapshot_file_slot =
784 read_full_snapshot_slot_file(&highest_bank_snapshot.snapshot_dir).ok()?;
785 let are_storages_flushed =
786 are_bank_snapshot_storages_flushed(&highest_bank_snapshot.snapshot_dir);
787 (are_storages_flushed && (full_snapshot_file_slot == highest_full_snapshot_archive_slot))
788 .then_some(highest_bank_snapshot)
789}
790
791pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
794 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
795 for entry in entries.flatten() {
796 if entry
797 .file_name()
798 .to_str()
799 .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
800 .unwrap_or(false)
801 {
802 let path = entry.path();
803 let result = if path.is_dir() {
804 fs::remove_dir_all(&path)
805 } else {
806 fs::remove_file(&path)
807 };
808 if let Err(err) = result {
809 warn!(
810 "Failed to remove temporary snapshot archive '{}': {err}",
811 path.display(),
812 );
813 }
814 }
815 }
816 }
817}
818
819pub fn serialize_and_archive_snapshot_package(
821 snapshot_package: SnapshotPackage,
822 snapshot_config: &SnapshotConfig,
823 should_flush_and_hard_link_storages: bool,
824) -> Result<SnapshotArchiveInfo> {
825 let SnapshotPackage {
826 snapshot_kind,
827 slot: snapshot_slot,
828 block_height,
829 hash: snapshot_hash,
830 mut snapshot_storages,
831 status_cache_slot_deltas,
832 bank_fields_to_serialize,
833 bank_hash_stats,
834 accounts_delta_hash,
835 accounts_hash,
836 epoch_accounts_hash,
837 bank_incremental_snapshot_persistence,
838 write_version,
839 enqueued: _,
840 } = snapshot_package;
841
842 let bank_snapshot_info = serialize_snapshot(
843 &snapshot_config.bank_snapshots_dir,
844 snapshot_config.snapshot_version,
845 snapshot_storages.as_slice(),
846 status_cache_slot_deltas.as_slice(),
847 bank_fields_to_serialize,
848 bank_hash_stats,
849 accounts_delta_hash,
850 accounts_hash,
851 epoch_accounts_hash,
852 bank_incremental_snapshot_persistence.as_ref(),
853 write_version,
854 should_flush_and_hard_link_storages,
855 )?;
856
857 let full_snapshot_archive_slot = match snapshot_kind {
859 SnapshotKind::FullSnapshot => snapshot_slot,
860 SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
861 };
862 write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
863 .map_err(|err| {
864 IoError::other(format!(
865 "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, kind {snapshot_kind:?}: {err}",
866 ))
867 })?;
868
869 let snapshot_archive_path = match snapshot_package.snapshot_kind {
870 SnapshotKind::FullSnapshot => build_full_snapshot_archive_path(
871 &snapshot_config.full_snapshot_archives_dir,
872 snapshot_package.slot,
873 &snapshot_package.hash,
874 snapshot_config.archive_format,
875 ),
876 SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
877 snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
880 build_incremental_snapshot_archive_path(
881 &snapshot_config.incremental_snapshot_archives_dir,
882 incremental_snapshot_base_slot,
883 snapshot_package.slot,
884 &snapshot_package.hash,
885 snapshot_config.archive_format,
886 )
887 }
888 };
889
890 let snapshot_archive_info = archive_snapshot(
891 snapshot_kind,
892 snapshot_slot,
893 snapshot_hash,
894 snapshot_storages.as_slice(),
895 &bank_snapshot_info.snapshot_dir,
896 snapshot_archive_path,
897 snapshot_config.archive_format,
898 )?;
899
900 Ok(snapshot_archive_info)
901}
902
903#[allow(clippy::too_many_arguments)]
905fn serialize_snapshot(
906 bank_snapshots_dir: impl AsRef<Path>,
907 snapshot_version: SnapshotVersion,
908 snapshot_storages: &[Arc<AccountStorageEntry>],
909 slot_deltas: &[BankSlotDelta],
910 mut bank_fields: BankFieldsToSerialize,
911 bank_hash_stats: BankHashStats,
912 accounts_delta_hash: AccountsDeltaHash,
913 accounts_hash: AccountsHash,
914 epoch_accounts_hash: Option<EpochAccountsHash>,
915 bank_incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
916 write_version: u64,
917 should_flush_and_hard_link_storages: bool,
918) -> Result<BankSnapshotInfo> {
919 let slot = bank_fields.slot;
920
921 let do_serialize_snapshot = || {
924 let mut measure_everything = Measure::start("");
925 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
926 if bank_snapshot_dir.exists() {
927 return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
928 bank_snapshot_dir,
929 ));
930 }
931 fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
932 AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
933 })?;
934
935 let bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
937 info!(
938 "Creating bank snapshot for slot {slot} at '{}'",
939 bank_snapshot_path.display(),
940 );
941
942 let (flush_storages_us, hard_link_storages_us) = if should_flush_and_hard_link_storages {
943 let flush_measure = Measure::start("");
944 for storage in snapshot_storages {
945 storage.flush().map_err(|err| {
946 AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
947 })?;
948 }
949 let flush_us = flush_measure.end_as_us();
950 let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
951 &bank_snapshot_dir,
952 slot,
953 snapshot_storages
954 )
955 .map_err(AddBankSnapshotError::HardLinkStorages)?);
956 write_storages_flushed_file(&bank_snapshot_dir)
957 .map_err(AddBankSnapshotError::MarkStoragesFlushed)?;
958 Some((flush_us, hard_link_us))
959 } else {
960 None
961 }
962 .unzip();
963
964 let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
965 let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
966 let extra_fields = ExtraFieldsToSerialize {
967 lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
968 incremental_snapshot_persistence: bank_incremental_snapshot_persistence,
969 epoch_accounts_hash,
970 versioned_epoch_stakes,
971 accounts_lt_hash: bank_fields.accounts_lt_hash.clone().map(Into::into),
972 };
973 serde_snapshot::serialize_bank_snapshot_into(
974 stream,
975 bank_fields,
976 bank_hash_stats,
977 accounts_delta_hash,
978 accounts_hash,
979 &get_storages_to_serialize(snapshot_storages),
980 extra_fields,
981 write_version,
982 )?;
983 Ok(())
984 };
985 let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
986 serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
987 .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
988 "bank serialize"
989 );
990
991 let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
992 let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
993 snapshot_bank_utils::serialize_status_cache(slot_deltas, &status_cache_path)
994 .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
995 );
996
997 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
998 let (_, write_version_file_us) = measure_us!(fs::write(
999 &version_path,
1000 snapshot_version.as_str().as_bytes(),
1001 )
1002 .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
1003
1004 let (_, write_state_complete_file_us) = measure_us!({
1006 write_snapshot_state_complete_file(&bank_snapshot_dir)
1007 .map_err(AddBankSnapshotError::MarkSnapshotComplete)?
1008 });
1009
1010 measure_everything.stop();
1011
1012 datapoint_info!(
1014 "snapshot_bank",
1015 ("slot", slot, i64),
1016 ("bank_size", bank_snapshot_consumed_size, i64),
1017 ("status_cache_size", status_cache_consumed_size, i64),
1018 ("flush_storages_us", flush_storages_us, Option<i64>),
1019 ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
1020 ("bank_serialize_us", bank_serialize.as_us(), i64),
1021 ("status_cache_serialize_us", status_cache_serialize_us, i64),
1022 ("write_version_file_us", write_version_file_us, i64),
1023 (
1024 "write_state_complete_file_us",
1025 write_state_complete_file_us,
1026 i64
1027 ),
1028 ("total_us", measure_everything.as_us(), i64),
1029 );
1030
1031 info!(
1032 "{} for slot {} at {}",
1033 bank_serialize,
1034 slot,
1035 bank_snapshot_path.display(),
1036 );
1037
1038 Ok(BankSnapshotInfo {
1039 slot,
1040 snapshot_kind: BankSnapshotKind::Pre,
1041 snapshot_dir: bank_snapshot_dir,
1042 snapshot_version,
1043 })
1044 };
1045
1046 do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
1047}
1048
/// Builds a compressed tar archive of a bank snapshot plus its account storages.
///
/// Archive layout, as assembled below:
/// - the version file, stored under `SNAPSHOT_VERSION_FILENAME`;
/// - `snapshots/`, containing `<slot>/<slot>` (the bank snapshot file) and the status cache
///   file;
/// - `accounts/<storage file name>` for each entry in `snapshot_storages`.
///
/// The bank snapshot, status cache and version files are exposed to the tar builder through
/// symlinks in a temporary staging directory created in the parent directory of
/// `archive_path`. NOTE(review): this relies on `tar::Builder` following symlinks (its
/// default). The staging directory is a `TempDir`, presumably removed when it is dropped.
///
/// The archive is first written to a staging file in the same parent directory. It is only
/// renamed to `archive_path` after the tar stream and the encoder have been finished, so
/// `archive_path` itself is never observed half-written.
///
/// # Errors
/// Returns an error (built from `ArchiveSnapshotPackageError`) if any directory, symlink,
/// archiving, encoder, metadata or rename step fails.
///
/// # Panics
/// Panics if `archive_path` has no parent directory, or if `archive_format` is neither
/// `TarZstd` nor `TarLz4`.
fn archive_snapshot(
    snapshot_kind: SnapshotKind,
    snapshot_slot: Slot,
    snapshot_hash: SnapshotHash,
    snapshot_storages: &[Arc<AccountStorageEntry>],
    bank_snapshot_dir: impl AsRef<Path>,
    archive_path: impl AsRef<Path>,
    archive_format: ArchiveFormat,
) -> Result<SnapshotArchiveInfo> {
    use ArchiveSnapshotPackageError as E;
    const SNAPSHOTS_DIR: &str = "snapshots";
    const ACCOUNTS_DIR: &str = "accounts";
    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    // Staging dir and staging archive file both live next to the final archive.
    let tar_dir = archive_path
        .as_ref()
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

    // Staging dir name is prefixed with the temporary-archive prefix and the slot.
    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
    let staging_dir = tempfile::Builder::new()
        .prefix(&format!("{}{}-", staging_dir_prefix, snapshot_slot))
        .tempdir_in(tar_dir)
        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);

    let slot_str = snapshot_slot.to_string();
    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
    fs::create_dir_all(&staging_snapshot_dir)
        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

    // Link the bank snapshot file as `snapshots/<slot>/<slot>` in the staging area.
    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
    })?;
    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
    let src_snapshot_file = src_snapshot_dir.join(slot_str);
    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

    // The status cache sits directly under `snapshots/`.
    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    symlink::symlink_file(&src_status_cache, &staging_status_cache)
        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

    // The version file sits at the root of the staging area.
    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
    })?;

    // Write to a temporary archive name first; it is renamed to `archive_path` at the end.
    let staging_archive_path = tar_dir.join(format!(
        "{}{}.{}",
        staging_dir_prefix,
        snapshot_slot,
        archive_format.extension(),
    ));

    {
        let archive_file = fs::File::create(&staging_archive_path)
            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;

        // Writes the whole tar stream into `encoder`, whatever compressor it is.
        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
            let mut archive = tar::Builder::new(encoder);
            // Do not emit sparse tar entries.
            archive.sparse(false);
            archive
                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
                .map_err(E::ArchiveVersionFile)?;
            archive
                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
                .map_err(E::ArchiveSnapshotsDir)?;

            for storage in snapshot_storages {
                let path_in_archive = Path::new(ACCOUNTS_DIR)
                    .join(AccountsFile::file_name(storage.slot(), storage.id()));

                // Storages are streamed through a reader instead of being appended by path.
                // NOTE(review): `Some(snapshot_slot)` presumably limits the reader to data as
                // of the snapshot slot; confirm against `AccountStorageReader`.
                let reader =
                    AccountStorageReader::new(storage, Some(snapshot_slot)).map_err(|err| {
                        E::AccountStorageReaderError(err, storage.path().to_path_buf())
                    })?;
                // The header is built by hand because the entry's data comes from a reader,
                // so its size must be taken from `reader.len()`.
                let mut header = tar::Header::new_gnu();
                header.set_path(path_in_archive).map_err(|err| {
                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                })?;
                header.set_size(reader.len() as u64);
                header.set_cksum();
                archive.append(&header, reader).map_err(|err| {
                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
                })?;
            }

            // Finalize the tar stream before the encoder itself is finished below.
            archive.into_inner().map_err(E::FinishArchive)?;
            Ok(())
        };

        // Only zstd and lz4 archives are produced here.
        match archive_format {
            ArchiveFormat::TarZstd { config } => {
                let mut encoder =
                    zstd::stream::Encoder::new(archive_file, config.compression_level)
                        .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarLz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(1)
                    .build(archive_file)
                    .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                let (_output, result) = encoder.finish();
                result.map_err(E::FinishEncoder)?;
            }
            _ => panic!("archiving snapshot with '{archive_format}' is not supported"),
        };
    }

    // Capture the archive size before the rename, for logging and metrics.
    let metadata = fs::metadata(&staging_archive_path)
        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
    let archive_path = archive_path.as_ref().to_path_buf();
    fs::rename(&staging_archive_path, &archive_path)
        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;

    timer.stop();
    info!(
        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
        archive_path.display(),
        snapshot_slot,
        timer.as_ms(),
        metadata.len()
    );

    datapoint_info!(
        "archive-snapshot-package",
        ("slot", snapshot_slot, i64),
        ("archive_format", archive_format.to_string(), String),
        ("duration_ms", timer.as_ms(), i64),
        (
            if snapshot_kind.is_full_snapshot() {
                "full-snapshot-archive-size"
            } else {
                "incremental-snapshot-archive-size"
            },
            metadata.len(),
            i64
        ),
    );
    Ok(SnapshotArchiveInfo {
        path: archive_path,
        slot: snapshot_slot,
        hash: snapshot_hash,
        archive_format,
    })
}
1223
1224pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1226 let mut bank_snapshots = Vec::default();
1227 match fs::read_dir(&bank_snapshots_dir) {
1228 Err(err) => {
1229 info!(
1230 "Unable to read bank snapshots directory '{}': {err}",
1231 bank_snapshots_dir.as_ref().display(),
1232 );
1233 }
1234 Ok(paths) => paths
1235 .filter_map(|entry| {
1236 entry
1239 .ok()
1240 .filter(|entry| entry.path().is_dir())
1241 .and_then(|entry| {
1242 entry
1243 .path()
1244 .file_name()
1245 .and_then(|file_name| file_name.to_str())
1246 .and_then(|file_name| file_name.parse::<Slot>().ok())
1247 })
1248 })
1249 .for_each(
1250 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
1251 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
1252 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
1255 },
1256 ),
1257 }
1258 bank_snapshots
1259}
1260
1261pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1265 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1266 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
1267 bank_snapshots
1268}
1269
1270pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
1274 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
1275 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
1276 bank_snapshots
1277}
1278
1279pub fn get_highest_bank_snapshot_pre(
1283 bank_snapshots_dir: impl AsRef<Path>,
1284) -> Option<BankSnapshotInfo> {
1285 do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
1286}
1287
1288pub fn get_highest_bank_snapshot_post(
1292 bank_snapshots_dir: impl AsRef<Path>,
1293) -> Option<BankSnapshotInfo> {
1294 do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
1295}
1296
1297pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
1301 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
1302}
1303
1304fn do_get_highest_bank_snapshot(
1305 mut bank_snapshots: Vec<BankSnapshotInfo>,
1306) -> Option<BankSnapshotInfo> {
1307 bank_snapshots.sort_unstable();
1308 bank_snapshots.into_iter().next_back()
1309}
1310
1311pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
1312where
1313 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1314{
1315 serialize_snapshot_data_file_capped::<F>(
1316 data_file_path,
1317 MAX_SNAPSHOT_DATA_FILE_SIZE,
1318 serializer,
1319 )
1320}
1321
1322pub fn deserialize_snapshot_data_file<T: Sized>(
1323 data_file_path: &Path,
1324 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
1325) -> Result<T> {
1326 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
1327 deserializer(streams.full_snapshot_stream)
1328 };
1329
1330 let wrapped_data_file_path = SnapshotRootPaths {
1331 full_snapshot_root_file_path: data_file_path.to_path_buf(),
1332 incremental_snapshot_root_file_path: None,
1333 };
1334
1335 deserialize_snapshot_data_files_capped(
1336 &wrapped_data_file_path,
1337 MAX_SNAPSHOT_DATA_FILE_SIZE,
1338 wrapped_deserializer,
1339 )
1340}
1341
1342pub fn deserialize_snapshot_data_files<T: Sized>(
1343 snapshot_root_paths: &SnapshotRootPaths,
1344 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1345) -> Result<T> {
1346 deserialize_snapshot_data_files_capped(
1347 snapshot_root_paths,
1348 MAX_SNAPSHOT_DATA_FILE_SIZE,
1349 deserializer,
1350 )
1351}
1352
1353fn serialize_snapshot_data_file_capped<F>(
1354 data_file_path: &Path,
1355 maximum_file_size: u64,
1356 serializer: F,
1357) -> Result<u64>
1358where
1359 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
1360{
1361 let data_file = fs::File::create(data_file_path)?;
1362 let mut data_file_stream = BufWriter::new(data_file);
1363 serializer(&mut data_file_stream)?;
1364 data_file_stream.flush()?;
1365
1366 let consumed_size = data_file_stream.stream_position()?;
1367 if consumed_size > maximum_file_size {
1368 let error_message = format!(
1369 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1370 data_file_path.display(),
1371 );
1372 return Err(IoError::other(error_message).into());
1373 }
1374 Ok(consumed_size)
1375}
1376
1377fn deserialize_snapshot_data_files_capped<T: Sized>(
1378 snapshot_root_paths: &SnapshotRootPaths,
1379 maximum_file_size: u64,
1380 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1381) -> Result<T> {
1382 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1383 create_snapshot_data_file_stream(
1384 &snapshot_root_paths.full_snapshot_root_file_path,
1385 maximum_file_size,
1386 )?;
1387
1388 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1389 if let Some(ref incremental_snapshot_root_file_path) =
1390 snapshot_root_paths.incremental_snapshot_root_file_path
1391 {
1392 Some(create_snapshot_data_file_stream(
1393 incremental_snapshot_root_file_path,
1394 maximum_file_size,
1395 )?)
1396 } else {
1397 None
1398 }
1399 .unzip();
1400
1401 let mut snapshot_streams = SnapshotStreams {
1402 full_snapshot_stream: &mut full_snapshot_data_file_stream,
1403 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1404 };
1405 let ret = deserializer(&mut snapshot_streams)?;
1406
1407 check_deserialize_file_consumed(
1408 full_snapshot_file_size,
1409 &snapshot_root_paths.full_snapshot_root_file_path,
1410 &mut full_snapshot_data_file_stream,
1411 )?;
1412
1413 if let Some(ref incremental_snapshot_root_file_path) =
1414 snapshot_root_paths.incremental_snapshot_root_file_path
1415 {
1416 check_deserialize_file_consumed(
1417 incremental_snapshot_file_size.unwrap(),
1418 incremental_snapshot_root_file_path,
1419 incremental_snapshot_data_file_stream.as_mut().unwrap(),
1420 )?;
1421 }
1422
1423 Ok(ret)
1424}
1425
1426fn create_snapshot_data_file_stream(
1429 snapshot_root_file_path: impl AsRef<Path>,
1430 maximum_file_size: u64,
1431) -> Result<(u64, BufReader<std::fs::File>)> {
1432 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1433
1434 if snapshot_file_size > maximum_file_size {
1435 let error_message = format!(
1436 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1437 snapshot_root_file_path.as_ref().display(),
1438 snapshot_file_size,
1439 maximum_file_size,
1440 );
1441 return Err(IoError::other(error_message).into());
1442 }
1443
1444 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1445 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1446
1447 Ok((snapshot_file_size, snapshot_data_file_stream))
1448}
1449
1450fn check_deserialize_file_consumed(
1453 file_size: u64,
1454 file_path: impl AsRef<Path>,
1455 file_stream: &mut BufReader<std::fs::File>,
1456) -> Result<()> {
1457 let consumed_size = file_stream.stream_position()?;
1458
1459 if consumed_size != file_size {
1460 let error_message = format!(
1461 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1462 file_path.as_ref().display(),
1463 file_size,
1464 consumed_size,
1465 );
1466 return Err(IoError::other(error_message).into());
1467 }
1468
1469 Ok(())
1470}
1471
1472fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1474 let run_path = appendvec_path.parent()?;
1475 let run_file_name = run_path.file_name()?;
1476 if run_file_name != ACCOUNTS_RUN_DIR {
1479 error!(
1480 "The account path {} does not have run/ as its immediate parent directory.",
1481 run_path.display()
1482 );
1483 return None;
1484 }
1485 let account_path = run_path.parent()?;
1486 Some(account_path.to_path_buf())
1487}
1488
1489fn get_snapshot_accounts_hardlink_dir(
1492 appendvec_path: &Path,
1493 bank_slot: Slot,
1494 account_paths: &mut HashSet<PathBuf>,
1495 hardlinks_dir: impl AsRef<Path>,
1496) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1497 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1498 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1499 })?;
1500
1501 let snapshot_hardlink_dir = account_path
1502 .join(ACCOUNTS_SNAPSHOT_DIR)
1503 .join(bank_slot.to_string());
1504
1505 if !account_paths.contains(&account_path) {
1508 let idx = account_paths.len();
1509 debug!(
1510 "for appendvec_path {}, create hard-link path {}",
1511 appendvec_path.display(),
1512 snapshot_hardlink_dir.display()
1513 );
1514 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1515 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1516 err,
1517 snapshot_hardlink_dir.clone(),
1518 )
1519 })?;
1520 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1521 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1522 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1523 source: err,
1524 original: snapshot_hardlink_dir.clone(),
1525 link: symlink_path,
1526 }
1527 })?;
1528 account_paths.insert(account_path);
1529 };
1530
1531 Ok(snapshot_hardlink_dir)
1532}
1533
1534pub fn hard_link_storages_to_snapshot(
1539 bank_snapshot_dir: impl AsRef<Path>,
1540 bank_slot: Slot,
1541 snapshot_storages: &[Arc<AccountStorageEntry>],
1542) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1543 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1544 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1545 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1546 err,
1547 accounts_hardlinks_dir.clone(),
1548 )
1549 })?;
1550
1551 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1552 for storage in snapshot_storages {
1553 let storage_path = storage.accounts.path();
1554 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1555 storage_path,
1556 bank_slot,
1557 &mut account_paths,
1558 &accounts_hardlinks_dir,
1559 )?;
1560 let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1563 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1564 fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1565 HardLinkStoragesToSnapshotError::HardLinkStorage(
1566 err,
1567 storage_path.to_path_buf(),
1568 hard_link_path,
1569 )
1570 })?;
1571 }
1572 Ok(())
1573}
1574
1575pub(crate) fn get_storages_to_serialize(
1578 snapshot_storages: &[Arc<AccountStorageEntry>],
1579) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1580 snapshot_storages
1581 .iter()
1582 .map(|storage| vec![Arc::clone(storage)])
1583 .collect::<Vec<_>>()
1584}
1585
/// Upper bound on the number of parallel untar reader threads per snapshot archive.
/// `verify_and_unarchive_snapshots` clamps its CPU-derived thread count to
/// `1..=PARALLEL_UNTAR_READERS_DEFAULT`.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1588
1589pub fn verify_and_unarchive_snapshots(
1591 bank_snapshots_dir: impl AsRef<Path>,
1592 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1593 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1594 account_paths: &[PathBuf],
1595 storage_access: StorageAccess,
1596) -> Result<(UnarchivedSnapshots, UnarchivedSnapshotsGuard)> {
1597 check_are_snapshots_compatible(
1598 full_snapshot_archive_info,
1599 incremental_snapshot_archive_info,
1600 )?;
1601
1602 let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1603
1604 let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1605 let UnarchivedSnapshot {
1606 unpack_dir: full_unpack_dir,
1607 storage: full_storage,
1608 bank_fields: full_bank_fields,
1609 accounts_db_fields: full_accounts_db_fields,
1610 unpacked_snapshots_dir_and_version: full_unpacked_snapshots_dir_and_version,
1611 measure_untar: full_measure_untar,
1612 } = unarchive_snapshot(
1613 &bank_snapshots_dir,
1614 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1615 full_snapshot_archive_info.path(),
1616 "snapshot untar",
1617 account_paths,
1618 full_snapshot_archive_info.archive_format(),
1619 parallel_divisions,
1620 next_append_vec_id.clone(),
1621 storage_access,
1622 )?;
1623
1624 let (
1625 incremental_unpack_dir,
1626 incremental_storage,
1627 incremental_bank_fields,
1628 incremental_accounts_db_fields,
1629 incremental_unpacked_snapshots_dir_and_version,
1630 incremental_measure_untar,
1631 ) = if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1632 let UnarchivedSnapshot {
1633 unpack_dir,
1634 storage,
1635 bank_fields,
1636 accounts_db_fields,
1637 unpacked_snapshots_dir_and_version,
1638 measure_untar,
1639 } = unarchive_snapshot(
1640 &bank_snapshots_dir,
1641 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1642 incremental_snapshot_archive_info.path(),
1643 "incremental snapshot untar",
1644 account_paths,
1645 incremental_snapshot_archive_info.archive_format(),
1646 parallel_divisions,
1647 next_append_vec_id.clone(),
1648 storage_access,
1649 )?;
1650 (
1651 Some(unpack_dir),
1652 Some(storage),
1653 Some(bank_fields),
1654 Some(accounts_db_fields),
1655 Some(unpacked_snapshots_dir_and_version),
1656 Some(measure_untar),
1657 )
1658 } else {
1659 (None, None, None, None, None, None)
1660 };
1661
1662 let bank_fields = SnapshotBankFields::new(full_bank_fields, incremental_bank_fields);
1663 let accounts_db_fields =
1664 SnapshotAccountsDbFields::new(full_accounts_db_fields, incremental_accounts_db_fields);
1665 let next_append_vec_id = Arc::try_unwrap(next_append_vec_id).unwrap();
1666
1667 Ok((
1668 UnarchivedSnapshots {
1669 full_storage,
1670 incremental_storage,
1671 bank_fields,
1672 accounts_db_fields,
1673 full_unpacked_snapshots_dir_and_version,
1674 incremental_unpacked_snapshots_dir_and_version,
1675 full_measure_untar,
1676 incremental_measure_untar,
1677 next_append_vec_id,
1678 },
1679 UnarchivedSnapshotsGuard {
1680 full_unpack_dir,
1681 incremental_unpack_dir,
1682 },
1683 ))
1684}
1685
1686fn spawn_unpack_snapshot_thread(
1688 file_sender: Sender<PathBuf>,
1689 account_paths: Arc<Vec<PathBuf>>,
1690 ledger_dir: Arc<PathBuf>,
1691 mut archive: Archive<SharedBufferReader>,
1692 parallel_selector: Option<ParallelSelector>,
1693 thread_index: usize,
1694) -> JoinHandle<()> {
1695 Builder::new()
1696 .name(format!("solUnpkSnpsht{thread_index:02}"))
1697 .spawn(move || {
1698 hardened_unpack::streaming_unpack_snapshot(
1699 &mut archive,
1700 ledger_dir.as_path(),
1701 &account_paths,
1702 parallel_selector,
1703 &file_sender,
1704 )
1705 .unwrap();
1706 })
1707 .unwrap()
1708}
1709
/// Spawns `num_threads` threads that unpack the archive at `snapshot_archive_path` in
/// parallel, and returns their join handles.
///
/// All threads read from one shared buffer created by `untar_snapshot_create_shared_buffer`.
/// Each thread gets its own `SharedBufferReader`, a `ParallelSelector` with its index and
/// `num_threads` divisions, and a clone of `file_sender`. The selector presumably restricts
/// each thread to its share of the archive entries; confirm in `hardened_unpack`.
///
/// The `file_sender` passed in is dropped when this function returns, so afterwards only the
/// per-thread clones keep the channel open.
fn streaming_unarchive_snapshot(
    file_sender: Sender<PathBuf>,
    account_paths: Vec<PathBuf>,
    ledger_dir: PathBuf,
    snapshot_archive_path: PathBuf,
    archive_format: ArchiveFormat,
    num_threads: usize,
) -> Vec<JoinHandle<()>> {
    let account_paths = Arc::new(account_paths);
    let ledger_dir = Arc::new(ledger_dir);
    let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);

    // Create every reader before spawning any thread. NOTE(review): this ordering presumably
    // ensures all readers are registered with the shared buffer before data starts being
    // consumed. Confirm against `SharedBuffer` before merging this into the spawn loop.
    let archives: Vec<_> = (0..num_threads)
        .map(|_| {
            let reader = SharedBufferReader::new(&shared_buffer);
            Archive::new(reader)
        })
        .collect();

    archives
        .into_iter()
        .enumerate()
        .map(|(thread_index, archive)| {
            let parallel_selector = Some(ParallelSelector {
                index: thread_index,
                divisions: num_threads,
            });

            spawn_unpack_snapshot_thread(
                file_sender.clone(),
                account_paths.clone(),
                ledger_dir.clone(),
                archive,
                parallel_selector,
                thread_index,
            )
        })
        .collect()
}
1751
/// Classification of a file received while unpacking or reading a snapshot.
#[derive(Debug, PartialEq)]
enum SnapshotFileKind {
    /// The snapshot version file.
    Version,
    /// The bank fields file (a name of digits, optionally followed by `.pre`).
    BankFields,
    /// An account storage file.
    Storage,
}
1759
1760fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
1762 static VERSION_FILE_REGEX: LazyLock<Regex> =
1763 LazyLock::new(|| Regex::new(r"^version$").unwrap());
1764 static BANK_FIELDS_FILE_REGEX: LazyLock<Regex> =
1765 LazyLock::new(|| Regex::new(r"^[0-9]+(\.pre)?$").unwrap());
1766
1767 if VERSION_FILE_REGEX.is_match(filename) {
1768 Some(SnapshotFileKind::Version)
1769 } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
1770 Some(SnapshotFileKind::BankFields)
1771 } else if get_slot_and_append_vec_id(filename).is_ok() {
1772 Some(SnapshotFileKind::Storage)
1773 } else {
1774 None
1775 }
1776}
1777
1778fn get_version_and_snapshot_files(
1782 file_receiver: &Receiver<PathBuf>,
1783) -> (PathBuf, PathBuf, Vec<PathBuf>) {
1784 let mut append_vec_files = Vec::with_capacity(1024);
1785 let mut snapshot_version_path = None;
1786 let mut snapshot_file_path = None;
1787
1788 loop {
1789 if let Ok(path) = file_receiver.recv() {
1790 let filename = path.file_name().unwrap().to_str().unwrap();
1791 match get_snapshot_file_kind(filename) {
1792 Some(SnapshotFileKind::Version) => {
1793 snapshot_version_path = Some(path);
1794
1795 if snapshot_file_path.is_some() {
1797 break;
1798 }
1799 }
1800 Some(SnapshotFileKind::BankFields) => {
1801 snapshot_file_path = Some(path);
1802
1803 if snapshot_version_path.is_some() {
1805 break;
1806 }
1807 }
1808 Some(SnapshotFileKind::Storage) => {
1809 append_vec_files.push(path);
1810 }
1811 None => {} }
1813 } else {
1814 panic!("did not receive snapshot file from unpacking threads");
1815 }
1816 }
1817 let snapshot_version_path = snapshot_version_path.unwrap();
1818 let snapshot_file_path = snapshot_file_path.unwrap();
1819
1820 (snapshot_version_path, snapshot_file_path, append_vec_files)
1821}
1822
/// Data read from a snapshot's version file and bank fields file, plus the storage file paths
/// that were received while searching for those two files (see `snapshot_fields_from_files`).
struct SnapshotFieldsBundle {
    /// Parsed contents of the snapshot version file.
    snapshot_version: SnapshotVersion,
    /// Bank fields deserialized from the bank fields (snapshot) file.
    bank_fields: BankFieldsToDeserialize,
    /// Accounts-db fields deserialized from the same file.
    accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
    /// Storage file paths received before both metadata files had been found.
    append_vec_files: Vec<PathBuf>,
}
1830
1831fn snapshot_fields_from_files(file_receiver: &Receiver<PathBuf>) -> Result<SnapshotFieldsBundle> {
1834 let (snapshot_version_path, snapshot_file_path, append_vec_files) =
1835 get_version_and_snapshot_files(file_receiver);
1836 let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
1837 let snapshot_version = snapshot_version_str.parse().map_err(|err| {
1838 IoError::other(format!(
1839 "unsupported snapshot version '{snapshot_version_str}': {err}",
1840 ))
1841 })?;
1842
1843 let snapshot_file = fs::File::open(snapshot_file_path).unwrap();
1844 let mut snapshot_stream = BufReader::new(snapshot_file);
1845 let (bank_fields, accounts_db_fields) = match snapshot_version {
1846 SnapshotVersion::V1_2_0 => serde_snapshot::fields_from_stream(&mut snapshot_stream)?,
1847 };
1848
1849 Ok(SnapshotFieldsBundle {
1850 snapshot_version,
1851 bank_fields,
1852 accounts_db_fields,
1853 append_vec_files,
1854 })
1855}
1856
1857fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1862 let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1863 if !snapshots_dir.is_dir() {
1864 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1865 }
1866
1867 let slot_dir = std::fs::read_dir(&snapshots_dir)
1869 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1870 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1871 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1872 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1873 .path();
1874
1875 let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1876 fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1877
1878 let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1879 fs::hard_link(
1880 status_cache_file,
1881 slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1882 )?;
1883
1884 write_snapshot_state_complete_file(slot_dir)?;
1885
1886 Ok(())
1887}
1888
/// Unpacks one snapshot archive into a fresh temporary directory and rebuilds its account
/// storages.
///
/// Steps, in order:
/// 1. Create a temp dir, prefixed with `unpacked_snapshots_dir_prefix`, inside
///    `bank_snapshots_dir`.
/// 2. Start `parallel_divisions` unpacking threads that send file paths over a channel.
/// 3. Read the version and bank fields files from that channel.
/// 4. Hand the remaining paths to `SnapshotStorageRebuilder::rebuild_storage`, timed under
///    `measure_name`.
/// 5. Create the snapshot meta files inside the unpacked slot directory.
///
/// The returned `UnarchivedSnapshot` owns the temp dir (`unpack_dir`).
///
/// # Errors
/// Propagates failures from creating the temp dir, reading the snapshot fields, rebuilding
/// storage, or creating the meta files.
fn unarchive_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    unpacked_snapshots_dir_prefix: &'static str,
    snapshot_archive_path: impl AsRef<Path>,
    measure_name: &'static str,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<UnarchivedSnapshot> {
    let unpack_dir = tempfile::Builder::new()
        .prefix(unpacked_snapshots_dir_prefix)
        .tempdir_in(bank_snapshots_dir)?;
    let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");

    // The returned join handles are discarded, so the unpacking threads are not joined here.
    // This thread consumes what they produce through `file_receiver` instead.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    streaming_unarchive_snapshot(
        file_sender,
        account_paths.to_vec(),
        unpack_dir.path().to_path_buf(),
        snapshot_archive_path.as_ref().to_path_buf(),
        archive_format,
        parallel_divisions,
    );

    // Rebuilder threads get the physical cores not used by unpacking threads, at least one.
    let num_rebuilder_threads = num_cpus::get_physical()
        .saturating_sub(parallel_divisions)
        .max(1);
    let SnapshotFieldsBundle {
        snapshot_version,
        bank_fields,
        accounts_db_fields,
        append_vec_files,
        ..
    } = snapshot_fields_from_files(&file_receiver)?;
    // Only `rebuild_storage` is timed. It consumes paths while the unpacking threads are
    // still producing them, so the measurement presumably covers most of the untar as well.
    let (storage, measure_untar) = measure_time!(
        SnapshotStorageRebuilder::rebuild_storage(
            &accounts_db_fields,
            append_vec_files,
            file_receiver,
            num_rebuilder_threads,
            next_append_vec_id,
            SnapshotFrom::Archive,
            storage_access,
        )?,
        measure_name
    );
    info!("{}", measure_untar);

    create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;

    Ok(UnarchivedSnapshot {
        unpack_dir,
        storage,
        bank_fields,
        accounts_db_fields,
        unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
            unpacked_snapshots_dir,
            snapshot_version,
        },
        measure_untar,
    })
}
1956
1957fn streaming_snapshot_dir_files(
1960 file_sender: Sender<PathBuf>,
1961 snapshot_file_path: impl Into<PathBuf>,
1962 snapshot_version_path: impl Into<PathBuf>,
1963 account_paths: &[PathBuf],
1964) -> Result<()> {
1965 file_sender.send(snapshot_file_path.into())?;
1966 file_sender.send(snapshot_version_path.into())?;
1967
1968 for account_path in account_paths {
1969 for file in fs::read_dir(account_path)? {
1970 file_sender.send(file?.path())?;
1971 }
1972 }
1973
1974 Ok(())
1975}
1976
/// Rebuilds account storages from a bank snapshot directory rather than from an archive.
///
/// This works in two phases:
/// 1. For each symlink in the snapshot's `SNAPSHOT_ACCOUNTS_HARDLINKS` directory, the link
///    target (an account snapshot directory) is mapped back to `<account path>/run`. That run
///    directory must be one of `account_paths`. Every file in the target is then hard-linked
///    into the run directory.
/// 2. The snapshot file, the version file, and all entries of `account_paths` are streamed to
///    `SnapshotStorageRebuilder::rebuild_storage`.
///
/// Returns the rebuilt storage map together with the deserialized bank and accounts-db
/// fields.
///
/// # Errors
/// - `SnapshotError::AccountPathsMismatch` if a hardlink target does not correspond to any
///   entry in `account_paths`.
/// - `SnapshotError::InvalidAccountPath` / `SnapshotError::InvalidAppendVecPath` for
///   malformed paths.
/// - I/O errors from reading directories or symlinks and from hard-linking.
/// - Any error from deserializing the snapshot fields or rebuilding storage.
pub fn rebuild_storages_from_snapshot_dir(
    snapshot_info: &BankSnapshotInfo,
    account_paths: &[PathBuf],
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<(
    AccountStorageMap,
    BankFieldsToDeserialize,
    AccountsDbFields<SerializableAccountStorageEntry>,
)> {
    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
    let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    // NOTE(review): `account_paths` are expected to be the `run/` directories, since they are
    // compared against `<account path>/ACCOUNTS_RUN_DIR` below.
    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);

    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
        IoError::other(format!(
            "failed to read accounts hardlinks dir '{}': {err}",
            accounts_hardlinks.display(),
        ))
    })?;
    for dir_entry in read_dir {
        let symlink_path = dir_entry?.path();
        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
            IoError::other(format!(
                "failed to read symlink '{}': {err}",
                symlink_path.display(),
            ))
        })?;
        // The symlink target is `<account path>/<snapshot dir>/<slot>` (as created by
        // `get_snapshot_accounts_hardlink_dir`). Two `parent()` calls recover
        // `<account path>`, whose run dir is where the storages must be linked.
        let account_run_path = account_snapshot_path
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .join(ACCOUNTS_RUN_DIR);
        // Refuse to link storages into a run directory the caller did not pass in.
        if !account_run_paths.contains(&account_run_path) {
            return Err(SnapshotError::AccountPathsMismatch);
        }
        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        for file in read_dir {
            let file_path = file?.path();
            let file_name = file_path
                .file_name()
                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
            let dest_path = account_run_path.join(file_name);
            fs::hard_link(&file_path, &dest_path).map_err(|err| {
                IoError::other(format!(
                    "failed to hard link from '{}' to '{}': {err}",
                    file_path.display(),
                    dest_path.display(),
                ))
            })?;
        }
    }

    // Everything is sent up front; the sender is dropped inside `streaming_snapshot_dir_files`.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let snapshot_file_path = &snapshot_info.snapshot_path();
    let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    streaming_snapshot_dir_files(
        file_sender,
        snapshot_file_path,
        snapshot_version_path,
        account_paths,
    )?;

    let SnapshotFieldsBundle {
        bank_fields,
        accounts_db_fields,
        append_vec_files,
        ..
    } = snapshot_fields_from_files(&file_receiver)?;

    // Rebuilder threads: all physical cores but one, and at least one.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let storage = SnapshotStorageRebuilder::rebuild_storage(
        &accounts_db_fields,
        append_vec_files,
        file_receiver,
        num_rebuilder_threads,
        next_append_vec_id,
        SnapshotFrom::Dir,
        storage_access,
    )?;

    Ok((storage, bank_fields, accounts_db_fields))
}
2076
2077fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
2081 let file_metadata = fs::metadata(&path).map_err(|err| {
2083 IoError::other(format!(
2084 "failed to query snapshot version file metadata '{}': {err}",
2085 path.as_ref().display(),
2086 ))
2087 })?;
2088 let file_size = file_metadata.len();
2089 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
2090 let error_message = format!(
2091 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
2092 path.as_ref().display(),
2093 file_size,
2094 MAX_SNAPSHOT_VERSION_FILE_SIZE,
2095 );
2096 return Err(IoError::other(error_message).into());
2097 }
2098
2099 let mut snapshot_version = String::new();
2101 let mut file = fs::File::open(&path).map_err(|err| {
2102 IoError::other(format!(
2103 "failed to open snapshot version file '{}': {err}",
2104 path.as_ref().display()
2105 ))
2106 })?;
2107 file.read_to_string(&mut snapshot_version).map_err(|err| {
2108 IoError::other(format!(
2109 "failed to read snapshot version from file '{}': {err}",
2110 path.as_ref().display()
2111 ))
2112 })?;
2113
2114 Ok(snapshot_version.trim().to_string())
2115}
2116
2117fn check_are_snapshots_compatible(
2120 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
2121 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
2122) -> Result<()> {
2123 if incremental_snapshot_archive_info.is_none() {
2124 return Ok(());
2125 }
2126
2127 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
2128
2129 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
2130 .then_some(())
2131 .ok_or_else(|| {
2132 SnapshotError::MismatchedBaseSlot(
2133 full_snapshot_archive_info.slot(),
2134 incremental_snapshot_archive_info.base_slot(),
2135 )
2136 })
2137}
2138
2139pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
2141 path.file_name()
2142 .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
2143 .to_str()
2144 .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
2145}
2146
2147pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
2148 snapshot_archives_dir
2149 .as_ref()
2150 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
2151}
2152
2153pub fn build_full_snapshot_archive_path(
2156 full_snapshot_archives_dir: impl AsRef<Path>,
2157 slot: Slot,
2158 hash: &SnapshotHash,
2159 archive_format: ArchiveFormat,
2160) -> PathBuf {
2161 full_snapshot_archives_dir.as_ref().join(format!(
2162 "snapshot-{}-{}.{}",
2163 slot,
2164 hash.0,
2165 archive_format.extension(),
2166 ))
2167}
2168
2169pub fn build_incremental_snapshot_archive_path(
2173 incremental_snapshot_archives_dir: impl AsRef<Path>,
2174 base_slot: Slot,
2175 slot: Slot,
2176 hash: &SnapshotHash,
2177 archive_format: ArchiveFormat,
2178) -> PathBuf {
2179 incremental_snapshot_archives_dir.as_ref().join(format!(
2180 "incremental-snapshot-{}-{}-{}.{}",
2181 base_slot,
2182 slot,
2183 hash.0,
2184 archive_format.extension(),
2185 ))
2186}
2187
2188pub(crate) fn parse_full_snapshot_archive_filename(
2190 archive_filename: &str,
2191) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
2192 static RE: std::sync::LazyLock<Regex> =
2193 std::sync::LazyLock::new(|| Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap());
2194
2195 let do_parse = || {
2196 RE.captures(archive_filename).and_then(|captures| {
2197 let slot = captures
2198 .name("slot")
2199 .map(|x| x.as_str().parse::<Slot>())?
2200 .ok()?;
2201 let hash = captures
2202 .name("hash")
2203 .map(|x| x.as_str().parse::<Hash>())?
2204 .ok()?;
2205 let archive_format = captures
2206 .name("ext")
2207 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2208 .ok()?;
2209
2210 Some((slot, SnapshotHash(hash), archive_format))
2211 })
2212 };
2213
2214 do_parse().ok_or_else(|| {
2215 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2216 })
2217}
2218
2219pub(crate) fn parse_incremental_snapshot_archive_filename(
2221 archive_filename: &str,
2222) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
2223 static RE: std::sync::LazyLock<Regex> = std::sync::LazyLock::new(|| {
2224 Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap()
2225 });
2226
2227 let do_parse = || {
2228 RE.captures(archive_filename).and_then(|captures| {
2229 let base_slot = captures
2230 .name("base")
2231 .map(|x| x.as_str().parse::<Slot>())?
2232 .ok()?;
2233 let slot = captures
2234 .name("slot")
2235 .map(|x| x.as_str().parse::<Slot>())?
2236 .ok()?;
2237 let hash = captures
2238 .name("hash")
2239 .map(|x| x.as_str().parse::<Hash>())?
2240 .ok()?;
2241 let archive_format = captures
2242 .name("ext")
2243 .map(|x| x.as_str().parse::<ArchiveFormat>())?
2244 .ok()?;
2245
2246 Some((base_slot, slot, SnapshotHash(hash), archive_format))
2247 })
2248 };
2249
2250 do_parse().ok_or_else(|| {
2251 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
2252 })
2253}
2254
2255fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
2257where
2258 F: Fn(PathBuf) -> Result<T>,
2259{
2260 let walk_dir = |dir: &Path| -> Vec<T> {
2261 let entry_iter = fs::read_dir(dir);
2262 match entry_iter {
2263 Err(err) => {
2264 info!(
2265 "Unable to read snapshot archives directory '{}': {err}",
2266 dir.display(),
2267 );
2268 vec![]
2269 }
2270 Ok(entries) => entries
2271 .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
2272 .collect(),
2273 }
2274 };
2275
2276 let mut ret = walk_dir(snapshot_archives_dir);
2277 let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
2278 if remote_dir.exists() {
2279 ret.append(&mut walk_dir(remote_dir.as_ref()));
2280 }
2281 ret
2282}
2283
2284pub fn get_full_snapshot_archives(
2286 full_snapshot_archives_dir: impl AsRef<Path>,
2287) -> Vec<FullSnapshotArchiveInfo> {
2288 get_snapshot_archives(
2289 full_snapshot_archives_dir.as_ref(),
2290 FullSnapshotArchiveInfo::new_from_path,
2291 )
2292}
2293
2294pub fn get_incremental_snapshot_archives(
2296 incremental_snapshot_archives_dir: impl AsRef<Path>,
2297) -> Vec<IncrementalSnapshotArchiveInfo> {
2298 get_snapshot_archives(
2299 incremental_snapshot_archives_dir.as_ref(),
2300 IncrementalSnapshotArchiveInfo::new_from_path,
2301 )
2302}
2303
2304pub fn get_highest_full_snapshot_archive_slot(
2306 full_snapshot_archives_dir: impl AsRef<Path>,
2307) -> Option<Slot> {
2308 get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
2309 .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
2310}
2311
2312pub fn get_highest_incremental_snapshot_archive_slot(
2315 incremental_snapshot_archives_dir: impl AsRef<Path>,
2316 full_snapshot_slot: Slot,
2317) -> Option<Slot> {
2318 get_highest_incremental_snapshot_archive_info(
2319 incremental_snapshot_archives_dir,
2320 full_snapshot_slot,
2321 )
2322 .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
2323}
2324
2325pub fn get_highest_full_snapshot_archive_info(
2327 full_snapshot_archives_dir: impl AsRef<Path>,
2328) -> Option<FullSnapshotArchiveInfo> {
2329 let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2330 full_snapshot_archives.sort_unstable();
2331 full_snapshot_archives.into_iter().next_back()
2332}
2333
2334pub fn get_highest_incremental_snapshot_archive_info(
2337 incremental_snapshot_archives_dir: impl AsRef<Path>,
2338 full_snapshot_slot: Slot,
2339) -> Option<IncrementalSnapshotArchiveInfo> {
2340 let mut incremental_snapshot_archives =
2344 get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
2345 .into_iter()
2346 .filter(|incremental_snapshot_archive_info| {
2347 incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
2348 })
2349 .collect::<Vec<_>>();
2350 incremental_snapshot_archives.sort_unstable();
2351 incremental_snapshot_archives.into_iter().next_back()
2352}
2353
2354pub fn purge_old_snapshot_archives(
2355 full_snapshot_archives_dir: impl AsRef<Path>,
2356 incremental_snapshot_archives_dir: impl AsRef<Path>,
2357 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2358 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2359) {
2360 info!(
2361 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
2362 full_snapshot_archives_dir.as_ref().display(),
2363 maximum_full_snapshot_archives_to_retain
2364 );
2365
2366 let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
2367 full_snapshot_archives.sort_unstable();
2368 full_snapshot_archives.reverse();
2369
2370 let num_to_retain = full_snapshot_archives
2371 .len()
2372 .min(maximum_full_snapshot_archives_to_retain.get());
2373 trace!(
2374 "There are {} full snapshot archives, retaining {}",
2375 full_snapshot_archives.len(),
2376 num_to_retain,
2377 );
2378
2379 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
2380 if full_snapshot_archives.is_empty() {
2381 None
2382 } else {
2383 Some(full_snapshot_archives.split_at(num_to_retain))
2384 }
2385 .unwrap_or_default();
2386
2387 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
2388 .iter()
2389 .map(|ai| ai.slot())
2390 .collect::<HashSet<_>>();
2391
2392 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
2393 for path in archives.iter().map(|a| a.path()) {
2394 trace!("Removing snapshot archive: {}", path.display());
2395 let result = fs::remove_file(path);
2396 if let Err(err) = result {
2397 info!(
2398 "Failed to remove snapshot archive '{}': {err}",
2399 path.display()
2400 );
2401 }
2402 }
2403 }
2404 remove_archives(full_snapshot_archives_to_remove);
2405
2406 info!(
2407 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
2408 incremental_snapshot_archives_dir.as_ref().display(),
2409 maximum_incremental_snapshot_archives_to_retain
2410 );
2411 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
2412 for incremental_snapshot_archive in
2413 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
2414 {
2415 incremental_snapshot_archives_by_base_slot
2416 .entry(incremental_snapshot_archive.base_slot())
2417 .or_default()
2418 .push(incremental_snapshot_archive)
2419 }
2420
2421 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
2422 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
2423 {
2424 incremental_snapshot_archives.sort_unstable();
2425 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
2426 maximum_incremental_snapshot_archives_to_retain.get()
2427 } else {
2428 usize::from(retained_full_snapshot_slots.contains(&base_slot))
2429 };
2430 trace!(
2431 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
2432 incremental_snapshot_archives.len(),
2433 base_slot,
2434 incremental_snapshot_archives
2435 .len()
2436 .saturating_sub(num_to_retain),
2437 );
2438
2439 incremental_snapshot_archives.truncate(
2440 incremental_snapshot_archives
2441 .len()
2442 .saturating_sub(num_to_retain),
2443 );
2444 remove_archives(&incremental_snapshot_archives);
2445 }
2446}
2447
/// Unpacks the tar stream in `shared_buffer` into `ledger_dir` / `account_paths`, splitting the
/// work across `parallel_divisions` readers that run on the rayon pool.
///
/// Each reader gets its own `ParallelSelector { index, divisions }`. Their resulting append-vec
/// maps are merged in index order (later entries overwrite earlier ones with the same key).
///
/// # Panics
///
/// Panics if `parallel_divisions` is 0.
///
/// # Errors
///
/// Returns the first (in index order) unpack error produced by any reader.
#[cfg(feature = "dev-context-only-utils")]
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    let division_readers: Vec<_> = (0..parallel_divisions)
        .map(|_| SharedBufferReader::new(&shared_buffer))
        .collect();

    let division_results: Vec<_> = division_readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let mut archive = Archive::new(reader);
            hardened_unpack::unpack_snapshot(
                &mut archive,
                ledger_dir,
                account_paths,
                Some(ParallelSelector {
                    index,
                    divisions: parallel_divisions,
                }),
            )
        })
        .collect();

    division_results.into_iter().try_fold(
        UnpackedAppendVecMap::new(),
        |mut merged, division_result| -> Result<UnpackedAppendVecMap> {
            merged.extend(division_result?);
            Ok(merged)
        },
    )
}
2488
2489fn untar_snapshot_create_shared_buffer(
2490 snapshot_tar: &Path,
2491 archive_format: ArchiveFormat,
2492) -> SharedBuffer {
2493 let open_file = || {
2494 fs::File::open(snapshot_tar)
2495 .map_err(|err| {
2496 IoError::other(format!(
2497 "failed to open snapshot archive '{}': {err}",
2498 snapshot_tar.display(),
2499 ))
2500 })
2501 .unwrap()
2502 };
2503 match archive_format {
2505 ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(open_file())),
2506 ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(open_file())),
2507 ArchiveFormat::TarZstd { .. } => {
2508 SharedBuffer::new(zstd::stream::read::Decoder::new(open_file()).unwrap())
2509 }
2510 ArchiveFormat::TarLz4 => SharedBuffer::new(lz4::Decoder::new(open_file()).unwrap()),
2511 ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
2512 }
2513}
2514
/// Decompresses `snapshot_tar` (per `archive_format`) and unpacks it into `unpack_dir` /
/// `account_paths` using `parallel_divisions` parallel readers.
#[cfg(feature = "dev-context-only-utils")]
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let snapshot_tar = snapshot_tar.as_ref();
    unpack_snapshot_local(
        untar_snapshot_create_shared_buffer(snapshot_tar, archive_format),
        unpack_dir,
        account_paths,
        parallel_divisions,
    )
}
2526
2527pub fn verify_unpacked_snapshots_dir_and_version(
2528 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
2529) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
2530 info!(
2531 "snapshot version: {}",
2532 &unpacked_snapshots_dir_and_version.snapshot_version
2533 );
2534
2535 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2536 let mut bank_snapshots =
2537 get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2538 if bank_snapshots.len() > 1 {
2539 return Err(IoError::other(format!(
2540 "invalid snapshot format: only one snapshot allowed, but found {}",
2541 bank_snapshots.len(),
2542 ))
2543 .into());
2544 }
2545 let root_paths = bank_snapshots.pop().ok_or_else(|| {
2546 IoError::other(format!(
2547 "no snapshots found in snapshots directory '{}'",
2548 unpacked_snapshots_dir_and_version
2549 .unpacked_snapshots_dir
2550 .display(),
2551 ))
2552 })?;
2553 Ok((snapshot_version, root_paths))
2554}
2555
2556pub fn get_snapshot_file_name(slot: Slot) -> String {
2558 slot.to_string()
2559}
2560
2561pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2563 bank_snapshots_dir
2564 .as_ref()
2565 .join(get_snapshot_file_name(slot))
2566}
2567
/// Selects how `verify_snapshot_archive` checks the serialized bank file of the slot being
/// verified.
#[derive(Debug, Copy, Clone)]
pub enum VerifyBank {
    /// The serialized bank file is left in place. It is therefore checked byte-for-byte as part
    /// of the directory comparison.
    Deterministic,
    /// The two serialized bank files are compared with
    /// `serde_snapshot::compare_two_serialized_banks` and then deleted. They are therefore
    /// excluded from the byte-for-byte directory comparison.
    NonDeterministic,
}
2577
/// Unpacks `snapshot_archive` into a temporary directory and asserts that its contents match
/// the bank snapshot for `slot` in `snapshots_to_verify`.
///
/// Two comparisons are made with `dir_diff`:
/// 1. `snapshots_to_verify` against the unpacked `snapshots` directory;
/// 2. the storage files reachable through the accounts hardlinks of `snapshots_to_verify`
///    against the unpacked account storages.
///
/// NOTE: this mutates `snapshots_to_verify` before comparing. It may delete the serialized bank
/// file (for `VerifyBank::NonDeterministic`), the accounts hardlinks dir, the version file and
/// the state-complete file from the slot's directory.
///
/// # Panics
///
/// Panics on any I/O failure or if either comparison finds a difference.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    // Bank snapshot files from the archive are expected under `<unpack_dir>/snapshots`.
    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Storage files referenced by `snapshots_to_verify` are copied here (below) so they can be
    // diffed against the storages unpacked from the archive.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // Compare the serialized banks with `compare_two_serialized_banks` instead of byte for
        // byte, then delete both so the directory diff below does not see them.
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // In the unpacked archive the status cache sits directly under `snapshots/`. Move it into
    // the slot directory (presumably to mirror the layout of `snapshots_to_verify` for the
    // diff below).
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    // Copy every file from the directories the accounts hardlinks point to into
    // `storages_to_verify`, then remove the hardlinks dir so it is excluded from the diff.
    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            // each entry is read as a symlink; its target is a directory of files to copy
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // Remove the version and state-complete files (if present) from the slot directory before
    // the diff. NOTE(review): presumably the archive does not contain them at this location.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // Best-effort removal of an `accounts` subdirectory (`remove_dir` only succeeds if it is
    // empty), so the storage diff below compares only storage files; the result is ignored.
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2671
2672pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2674 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2675 purge_bank_snapshots(&bank_snapshots);
2676}
2677
2678pub fn purge_old_bank_snapshots(
2680 bank_snapshots_dir: impl AsRef<Path>,
2681 num_bank_snapshots_to_retain: usize,
2682 filter_by_kind: Option<BankSnapshotKind>,
2683) {
2684 let mut bank_snapshots = match filter_by_kind {
2685 Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2686 Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2687 None => get_bank_snapshots(&bank_snapshots_dir),
2688 };
2689
2690 bank_snapshots.sort_unstable();
2691 purge_bank_snapshots(
2692 bank_snapshots
2693 .iter()
2694 .rev()
2695 .skip(num_bank_snapshots_to_retain),
2696 );
2697}
2698
2699pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2704 purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2705 purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2706
2707 let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2708 if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2709 debug!(
2710 "Retained bank snapshot for slot {}, and purged the rest.",
2711 highest_bank_snapshot_post.slot
2712 );
2713 }
2714}
2715
2716pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2718 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2719 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2720 purge_bank_snapshots(&bank_snapshots);
2721}
2722
2723fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2727 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2728 if purge_bank_snapshot(snapshot_dir).is_err() {
2729 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2730 }
2731 }
2732}
2733
2734pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2736 const FN_ERR: &str = "failed to purge bank snapshot";
2737 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2738 if accounts_hardlinks_dir.is_dir() {
2739 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2742 IoError::other(format!(
2743 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2744 accounts_hardlinks_dir.display(),
2745 ))
2746 })?;
2747 for entry in read_dir {
2748 let accounts_hardlink_dir = entry?.path();
2749 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2750 IoError::other(format!(
2751 "{FN_ERR}: failed to read symlink '{}': {err}",
2752 accounts_hardlink_dir.display(),
2753 ))
2754 })?;
2755 move_and_async_delete_path(&accounts_hardlink_dir);
2756 }
2757 }
2758 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2759 IoError::other(format!(
2760 "{FN_ERR}: failed to remove dir '{}': {err}",
2761 bank_snapshot_dir.as_ref().display(),
2762 ))
2763 })?;
2764 Ok(())
2765}
2766
2767pub fn should_take_full_snapshot(
2768 block_height: Slot,
2769 full_snapshot_archive_interval_slots: Slot,
2770) -> bool {
2771 block_height % full_snapshot_archive_interval_slots == 0
2772}
2773
2774pub fn should_take_incremental_snapshot(
2775 block_height: Slot,
2776 incremental_snapshot_archive_interval_slots: Slot,
2777 latest_full_snapshot_slot: Option<Slot>,
2778) -> bool {
2779 block_height % incremental_snapshot_archive_interval_slots == 0
2780 && latest_full_snapshot_slot.is_some()
2781}
2782
/// Creates a fresh temporary directory with the accounts run/snapshot directory layout inside.
///
/// Returns the `TempDir` guard (keep it alive for as long as the directory is needed) and the
/// first path returned by `create_accounts_run_and_snapshot_dirs`.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = TempDir::new().unwrap();
    let accounts_dirs = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, accounts_dirs.0)
}
2793
2794#[cfg(test)]
2795mod tests {
2796 use {
2797 super::*,
2798 assert_matches::assert_matches,
2799 bincode::{deserialize_from, serialize_into},
2800 std::{convert::TryFrom, mem::size_of},
2801 tempfile::NamedTempFile,
2802 };
2803
2804 #[test]
2805 fn test_serialize_snapshot_data_file_under_limit() {
2806 let temp_dir = tempfile::TempDir::new().unwrap();
2807 let expected_consumed_size = size_of::<u32>() as u64;
2808 let consumed_size = serialize_snapshot_data_file_capped(
2809 &temp_dir.path().join("data-file"),
2810 expected_consumed_size,
2811 |stream| {
2812 serialize_into(stream, &2323_u32)?;
2813 Ok(())
2814 },
2815 )
2816 .unwrap();
2817 assert_eq!(consumed_size, expected_consumed_size);
2818 }
2819
2820 #[test]
2821 fn test_serialize_snapshot_data_file_over_limit() {
2822 let temp_dir = tempfile::TempDir::new().unwrap();
2823 let expected_consumed_size = size_of::<u32>() as u64;
2824 let result = serialize_snapshot_data_file_capped(
2825 &temp_dir.path().join("data-file"),
2826 expected_consumed_size - 1,
2827 |stream| {
2828 serialize_into(stream, &2323_u32)?;
2829 Ok(())
2830 },
2831 );
2832 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2833 }
2834
2835 #[test]
2836 fn test_deserialize_snapshot_data_file_under_limit() {
2837 let expected_data = 2323_u32;
2838 let expected_consumed_size = size_of::<u32>() as u64;
2839
2840 let temp_dir = tempfile::TempDir::new().unwrap();
2841 serialize_snapshot_data_file_capped(
2842 &temp_dir.path().join("data-file"),
2843 expected_consumed_size,
2844 |stream| {
2845 serialize_into(stream, &expected_data)?;
2846 Ok(())
2847 },
2848 )
2849 .unwrap();
2850
2851 let snapshot_root_paths = SnapshotRootPaths {
2852 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2853 incremental_snapshot_root_file_path: None,
2854 };
2855
2856 let actual_data = deserialize_snapshot_data_files_capped(
2857 &snapshot_root_paths,
2858 expected_consumed_size,
2859 |stream| {
2860 Ok(deserialize_from::<_, u32>(
2861 &mut stream.full_snapshot_stream,
2862 )?)
2863 },
2864 )
2865 .unwrap();
2866 assert_eq!(actual_data, expected_data);
2867 }
2868
2869 #[test]
2870 fn test_deserialize_snapshot_data_file_over_limit() {
2871 let expected_data = 2323_u32;
2872 let expected_consumed_size = size_of::<u32>() as u64;
2873
2874 let temp_dir = tempfile::TempDir::new().unwrap();
2875 serialize_snapshot_data_file_capped(
2876 &temp_dir.path().join("data-file"),
2877 expected_consumed_size,
2878 |stream| {
2879 serialize_into(stream, &expected_data)?;
2880 Ok(())
2881 },
2882 )
2883 .unwrap();
2884
2885 let snapshot_root_paths = SnapshotRootPaths {
2886 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2887 incremental_snapshot_root_file_path: None,
2888 };
2889
2890 let result = deserialize_snapshot_data_files_capped(
2891 &snapshot_root_paths,
2892 expected_consumed_size - 1,
2893 |stream| {
2894 Ok(deserialize_from::<_, u32>(
2895 &mut stream.full_snapshot_stream,
2896 )?)
2897 },
2898 );
2899 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2900 }
2901
2902 #[test]
2903 fn test_deserialize_snapshot_data_file_extra_data() {
2904 let expected_data = 2323_u32;
2905 let expected_consumed_size = size_of::<u32>() as u64;
2906
2907 let temp_dir = tempfile::TempDir::new().unwrap();
2908 serialize_snapshot_data_file_capped(
2909 &temp_dir.path().join("data-file"),
2910 expected_consumed_size * 2,
2911 |stream| {
2912 serialize_into(stream.by_ref(), &expected_data)?;
2913 serialize_into(stream.by_ref(), &expected_data)?;
2914 Ok(())
2915 },
2916 )
2917 .unwrap();
2918
2919 let snapshot_root_paths = SnapshotRootPaths {
2920 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2921 incremental_snapshot_root_file_path: None,
2922 };
2923
2924 let result = deserialize_snapshot_data_files_capped(
2925 &snapshot_root_paths,
2926 expected_consumed_size * 2,
2927 |stream| {
2928 Ok(deserialize_from::<_, u32>(
2929 &mut stream.full_snapshot_stream,
2930 )?)
2931 },
2932 );
2933 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2934 }
2935
2936 #[test]
2937 fn test_snapshot_version_from_file_under_limit() {
2938 let file_content = SnapshotVersion::default().as_str();
2939 let mut file = NamedTempFile::new().unwrap();
2940 file.write_all(file_content.as_bytes()).unwrap();
2941 let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2942 assert_eq!(version_from_file, file_content);
2943 }
2944
2945 #[test]
2946 fn test_snapshot_version_from_file_over_limit() {
2947 let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2948 let file_content = vec![7u8; over_limit_size];
2949 let mut file = NamedTempFile::new().unwrap();
2950 file.write_all(&file_content).unwrap();
2951 assert_matches!(
2952 snapshot_version_from_file(file.path()),
2953 Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2954 );
2955 }
2956
2957 #[test]
2958 fn test_parse_full_snapshot_archive_filename() {
2959 assert_eq!(
2960 parse_full_snapshot_archive_filename(&format!(
2961 "snapshot-42-{}.tar.bz2",
2962 Hash::default()
2963 ))
2964 .unwrap(),
2965 (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2966 );
2967 assert_eq!(
2968 parse_full_snapshot_archive_filename(&format!(
2969 "snapshot-43-{}.tar.zst",
2970 Hash::default()
2971 ))
2972 .unwrap(),
2973 (
2974 43,
2975 SnapshotHash(Hash::default()),
2976 ArchiveFormat::TarZstd {
2977 config: ZstdConfig::default(),
2978 }
2979 )
2980 );
2981 assert_eq!(
2982 parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2983 .unwrap(),
2984 (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2985 );
2986 assert_eq!(
2987 parse_full_snapshot_archive_filename(&format!(
2988 "snapshot-45-{}.tar.lz4",
2989 Hash::default()
2990 ))
2991 .unwrap(),
2992 (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2993 );
2994
2995 assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2996 assert!(
2997 parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2998 );
2999
3000 assert!(
3001 parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
3002 );
3003 assert!(parse_full_snapshot_archive_filename(&format!(
3004 "snapshot-12345678-{}.bad!ext",
3005 Hash::new_unique()
3006 ))
3007 .is_err());
3008 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
3009
3010 assert!(parse_full_snapshot_archive_filename(&format!(
3011 "snapshot-bad!slot-{}.bad!ext",
3012 Hash::new_unique()
3013 ))
3014 .is_err());
3015 assert!(parse_full_snapshot_archive_filename(&format!(
3016 "snapshot-12345678-{}.bad!ext",
3017 Hash::new_unique()
3018 ))
3019 .is_err());
3020 assert!(parse_full_snapshot_archive_filename(&format!(
3021 "snapshot-bad!slot-{}.tar",
3022 Hash::new_unique()
3023 ))
3024 .is_err());
3025
3026 assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
3027 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
3028 assert!(parse_full_snapshot_archive_filename(&format!(
3029 "snapshot-bad!slot-{}.tar",
3030 Hash::new_unique()
3031 ))
3032 .is_err());
3033 }
3034
3035 #[test]
3036 fn test_parse_incremental_snapshot_archive_filename() {
3037 assert_eq!(
3038 parse_incremental_snapshot_archive_filename(&format!(
3039 "incremental-snapshot-42-123-{}.tar.bz2",
3040 Hash::default()
3041 ))
3042 .unwrap(),
3043 (
3044 42,
3045 123,
3046 SnapshotHash(Hash::default()),
3047 ArchiveFormat::TarBzip2
3048 )
3049 );
3050 assert_eq!(
3051 parse_incremental_snapshot_archive_filename(&format!(
3052 "incremental-snapshot-43-234-{}.tar.zst",
3053 Hash::default()
3054 ))
3055 .unwrap(),
3056 (
3057 43,
3058 234,
3059 SnapshotHash(Hash::default()),
3060 ArchiveFormat::TarZstd {
3061 config: ZstdConfig::default(),
3062 }
3063 )
3064 );
3065 assert_eq!(
3066 parse_incremental_snapshot_archive_filename(&format!(
3067 "incremental-snapshot-44-345-{}.tar",
3068 Hash::default()
3069 ))
3070 .unwrap(),
3071 (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
3072 );
3073 assert_eq!(
3074 parse_incremental_snapshot_archive_filename(&format!(
3075 "incremental-snapshot-45-456-{}.tar.lz4",
3076 Hash::default()
3077 ))
3078 .unwrap(),
3079 (
3080 45,
3081 456,
3082 SnapshotHash(Hash::default()),
3083 ArchiveFormat::TarLz4
3084 )
3085 );
3086
3087 assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
3088 assert!(parse_incremental_snapshot_archive_filename(&format!(
3089 "snapshot-42-{}.tar",
3090 Hash::new_unique()
3091 ))
3092 .is_err());
3093 assert!(parse_incremental_snapshot_archive_filename(
3094 "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
3095 )
3096 .is_err());
3097
3098 assert!(parse_incremental_snapshot_archive_filename(&format!(
3099 "incremental-snapshot-bad!slot-56785678-{}.tar",
3100 Hash::new_unique()
3101 ))
3102 .is_err());
3103
3104 assert!(parse_incremental_snapshot_archive_filename(&format!(
3105 "incremental-snapshot-12345678-bad!slot-{}.tar",
3106 Hash::new_unique()
3107 ))
3108 .is_err());
3109
3110 assert!(parse_incremental_snapshot_archive_filename(
3111 "incremental-snapshot-12341234-56785678-bad!HASH.tar"
3112 )
3113 .is_err());
3114
3115 assert!(parse_incremental_snapshot_archive_filename(&format!(
3116 "incremental-snapshot-12341234-56785678-{}.bad!ext",
3117 Hash::new_unique()
3118 ))
3119 .is_err());
3120 }
3121
3122 #[test]
3123 fn test_check_are_snapshots_compatible() {
3124 let slot1: Slot = 1234;
3125 let slot2: Slot = 5678;
3126 let slot3: Slot = 999_999;
3127
3128 let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
3129 format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
3130 ))
3131 .unwrap();
3132
3133 assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
3134
3135 let incremental_snapshot_archive_info =
3136 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
3137 "/dir/incremental-snapshot-{}-{}-{}.tar",
3138 slot1,
3139 slot2,
3140 Hash::new_unique()
3141 )))
3142 .unwrap();
3143
3144 assert!(check_are_snapshots_compatible(
3145 &full_snapshot_archive_info,
3146 Some(&incremental_snapshot_archive_info)
3147 )
3148 .is_ok());
3149
3150 let incremental_snapshot_archive_info =
3151 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
3152 "/dir/incremental-snapshot-{}-{}-{}.tar",
3153 slot2,
3154 slot3,
3155 Hash::new_unique()
3156 )))
3157 .unwrap();
3158
3159 assert!(check_are_snapshots_compatible(
3160 &full_snapshot_archive_info,
3161 Some(&incremental_snapshot_archive_info)
3162 )
3163 .is_err());
3164 }
3165
3166 fn common_create_bank_snapshot_files(
3168 bank_snapshots_dir: &Path,
3169 min_slot: Slot,
3170 max_slot: Slot,
3171 ) {
3172 for slot in min_slot..max_slot {
3173 let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
3174 fs::create_dir_all(&snapshot_dir).unwrap();
3175
3176 let snapshot_filename = get_snapshot_file_name(slot);
3177 let snapshot_path = snapshot_dir.join(snapshot_filename);
3178 fs::File::create(snapshot_path).unwrap();
3179
3180 let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
3181 fs::File::create(status_cache_file).unwrap();
3182
3183 let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
3184 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
3185
3186 write_snapshot_state_complete_file(snapshot_dir).unwrap();
3188 }
3189 }
3190
3191 #[test]
3192 fn test_get_bank_snapshots() {
3193 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3194 let min_slot = 10;
3195 let max_slot = 20;
3196 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3197
3198 let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
3199 assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
3200 }
3201
3202 #[test]
3203 fn test_get_highest_bank_snapshot_post() {
3204 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
3205 let min_slot = 99;
3206 let max_slot = 123;
3207 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
3208
3209 let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
3210 assert!(highest_bank_snapshot.is_some());
3211 assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
3212 }
3213
3214 fn common_create_snapshot_archive_files(
3220 full_snapshot_archives_dir: &Path,
3221 incremental_snapshot_archives_dir: &Path,
3222 min_full_snapshot_slot: Slot,
3223 max_full_snapshot_slot: Slot,
3224 min_incremental_snapshot_slot: Slot,
3225 max_incremental_snapshot_slot: Slot,
3226 ) {
3227 fs::create_dir_all(full_snapshot_archives_dir).unwrap();
3228 fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
3229 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3230 for incremental_snapshot_slot in
3231 min_incremental_snapshot_slot..max_incremental_snapshot_slot
3232 {
3233 let snapshot_filename = format!(
3234 "incremental-snapshot-{}-{}-{}.tar",
3235 full_snapshot_slot,
3236 incremental_snapshot_slot,
3237 Hash::default()
3238 );
3239 let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
3240 fs::File::create(snapshot_filepath).unwrap();
3241 }
3242
3243 let snapshot_filename =
3244 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3245 let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
3246 fs::File::create(snapshot_filepath).unwrap();
3247
3248 let bad_filename = format!(
3250 "incremental-snapshot-{}-{}-bad!hash.tar",
3251 full_snapshot_slot,
3252 max_incremental_snapshot_slot + 1,
3253 );
3254 let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
3255 fs::File::create(bad_filepath).unwrap();
3256 }
3257
3258 let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
3261 let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
3262 fs::File::create(bad_filepath).unwrap();
3263 }
3264
3265 #[test]
3266 fn test_get_full_snapshot_archives() {
3267 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3268 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3269 let min_slot = 123;
3270 let max_slot = 456;
3271 common_create_snapshot_archive_files(
3272 full_snapshot_archives_dir.path(),
3273 incremental_snapshot_archives_dir.path(),
3274 min_slot,
3275 max_slot,
3276 0,
3277 0,
3278 );
3279
3280 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3281 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3282 }
3283
3284 #[test]
3285 fn test_get_full_snapshot_archives_remote() {
3286 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3287 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3288 let min_slot = 123;
3289 let max_slot = 456;
3290 common_create_snapshot_archive_files(
3291 &full_snapshot_archives_dir
3292 .path()
3293 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3294 &incremental_snapshot_archives_dir
3295 .path()
3296 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3297 min_slot,
3298 max_slot,
3299 0,
3300 0,
3301 );
3302
3303 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
3304 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
3305 assert!(snapshot_archives.iter().all(|info| info.is_remote()));
3306 }
3307
3308 #[test]
3309 fn test_get_incremental_snapshot_archives() {
3310 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3311 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3312 let min_full_snapshot_slot = 12;
3313 let max_full_snapshot_slot = 23;
3314 let min_incremental_snapshot_slot = 34;
3315 let max_incremental_snapshot_slot = 45;
3316 common_create_snapshot_archive_files(
3317 full_snapshot_archives_dir.path(),
3318 incremental_snapshot_archives_dir.path(),
3319 min_full_snapshot_slot,
3320 max_full_snapshot_slot,
3321 min_incremental_snapshot_slot,
3322 max_incremental_snapshot_slot,
3323 );
3324
3325 let incremental_snapshot_archives =
3326 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3327 assert_eq!(
3328 incremental_snapshot_archives.len() as Slot,
3329 (max_full_snapshot_slot - min_full_snapshot_slot)
3330 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3331 );
3332 }
3333
3334 #[test]
3335 fn test_get_incremental_snapshot_archives_remote() {
3336 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3337 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3338 let min_full_snapshot_slot = 12;
3339 let max_full_snapshot_slot = 23;
3340 let min_incremental_snapshot_slot = 34;
3341 let max_incremental_snapshot_slot = 45;
3342 common_create_snapshot_archive_files(
3343 &full_snapshot_archives_dir
3344 .path()
3345 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3346 &incremental_snapshot_archives_dir
3347 .path()
3348 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
3349 min_full_snapshot_slot,
3350 max_full_snapshot_slot,
3351 min_incremental_snapshot_slot,
3352 max_incremental_snapshot_slot,
3353 );
3354
3355 let incremental_snapshot_archives =
3356 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
3357 assert_eq!(
3358 incremental_snapshot_archives.len() as Slot,
3359 (max_full_snapshot_slot - min_full_snapshot_slot)
3360 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
3361 );
3362 assert!(incremental_snapshot_archives
3363 .iter()
3364 .all(|info| info.is_remote()));
3365 }
3366
3367 #[test]
3368 fn test_get_highest_full_snapshot_archive_slot() {
3369 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3370 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3371 let min_slot = 123;
3372 let max_slot = 456;
3373 common_create_snapshot_archive_files(
3374 full_snapshot_archives_dir.path(),
3375 incremental_snapshot_archives_dir.path(),
3376 min_slot,
3377 max_slot,
3378 0,
3379 0,
3380 );
3381
3382 assert_eq!(
3383 get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
3384 Some(max_slot - 1)
3385 );
3386 }
3387
3388 #[test]
3389 fn test_get_highest_incremental_snapshot_slot() {
3390 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3391 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3392 let min_full_snapshot_slot = 12;
3393 let max_full_snapshot_slot = 23;
3394 let min_incremental_snapshot_slot = 34;
3395 let max_incremental_snapshot_slot = 45;
3396 common_create_snapshot_archive_files(
3397 full_snapshot_archives_dir.path(),
3398 incremental_snapshot_archives_dir.path(),
3399 min_full_snapshot_slot,
3400 max_full_snapshot_slot,
3401 min_incremental_snapshot_slot,
3402 max_incremental_snapshot_slot,
3403 );
3404
3405 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
3406 assert_eq!(
3407 get_highest_incremental_snapshot_archive_slot(
3408 incremental_snapshot_archives_dir.path(),
3409 full_snapshot_slot
3410 ),
3411 Some(max_incremental_snapshot_slot - 1)
3412 );
3413 }
3414
3415 assert_eq!(
3416 get_highest_incremental_snapshot_archive_slot(
3417 incremental_snapshot_archives_dir.path(),
3418 max_full_snapshot_slot
3419 ),
3420 None
3421 );
3422 }
3423
3424 fn common_test_purge_old_snapshot_archives(
3425 snapshot_names: &[&String],
3426 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
3427 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
3428 expected_snapshots: &[&String],
3429 ) {
3430 let temp_snap_dir = tempfile::TempDir::new().unwrap();
3431
3432 for snap_name in snapshot_names {
3433 let snap_path = temp_snap_dir.path().join(snap_name);
3434 let mut _snap_file = fs::File::create(snap_path);
3435 }
3436 purge_old_snapshot_archives(
3437 temp_snap_dir.path(),
3438 temp_snap_dir.path(),
3439 maximum_full_snapshot_archives_to_retain,
3440 maximum_incremental_snapshot_archives_to_retain,
3441 );
3442
3443 let mut retained_snaps = HashSet::new();
3444 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
3445 let entry_path_buf = entry.unwrap().path();
3446 let entry_path = entry_path_buf.as_path();
3447 let snapshot_name = entry_path
3448 .file_name()
3449 .unwrap()
3450 .to_str()
3451 .unwrap()
3452 .to_string();
3453 retained_snaps.insert(snapshot_name);
3454 }
3455
3456 for snap_name in expected_snapshots {
3457 assert!(
3458 retained_snaps.contains(snap_name.as_str()),
3459 "{snap_name} not found"
3460 );
3461 }
3462 assert_eq!(retained_snaps.len(), expected_snapshots.len());
3463 }
3464
3465 #[test]
3466 fn test_purge_old_full_snapshot_archives() {
3467 let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
3468 let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
3469 let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
3470 let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
3471
3472 let expected_snapshots = vec![&snap3_name];
3474 common_test_purge_old_snapshot_archives(
3475 &snapshot_names,
3476 NonZeroUsize::new(1).unwrap(),
3477 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3478 &expected_snapshots,
3479 );
3480
3481 let expected_snapshots = vec![&snap2_name, &snap3_name];
3483 common_test_purge_old_snapshot_archives(
3484 &snapshot_names,
3485 NonZeroUsize::new(2).unwrap(),
3486 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3487 &expected_snapshots,
3488 );
3489
3490 let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
3492 common_test_purge_old_snapshot_archives(
3493 &snapshot_names,
3494 NonZeroUsize::new(3).unwrap(),
3495 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
3496 &expected_snapshots,
3497 );
3498 }
3499
3500 #[test]
3504 fn test_purge_old_full_snapshot_archives_in_the_loop() {
3505 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3506 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3507 let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
3508 let starting_slot: Slot = 42;
3509
3510 for slot in (starting_slot..).take(100) {
3511 let full_snapshot_archive_file_name =
3512 format!("snapshot-{}-{}.tar", slot, Hash::default());
3513 let full_snapshot_archive_path = full_snapshot_archives_dir
3514 .as_ref()
3515 .join(full_snapshot_archive_file_name);
3516 fs::File::create(full_snapshot_archive_path).unwrap();
3517
3518 if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
3520 continue;
3521 }
3522
3523 if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
3525 continue;
3526 }
3527
3528 purge_old_snapshot_archives(
3529 &full_snapshot_archives_dir,
3530 &incremental_snapshot_archives_dir,
3531 maximum_snapshots_to_retain,
3532 NonZeroUsize::new(usize::MAX).unwrap(),
3533 );
3534 let mut full_snapshot_archives =
3535 get_full_snapshot_archives(&full_snapshot_archives_dir);
3536 full_snapshot_archives.sort_unstable();
3537 assert_eq!(
3538 full_snapshot_archives.len(),
3539 maximum_snapshots_to_retain.get()
3540 );
3541 assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
3542 for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
3543 assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
3544 }
3545 }
3546 }
3547
3548 #[test]
3549 fn test_purge_old_incremental_snapshot_archives() {
3550 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3551 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3552 let starting_slot = 100_000;
3553
3554 let maximum_incremental_snapshot_archives_to_retain =
3555 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3556 let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3557
3558 let incremental_snapshot_interval = 100;
3559 let num_incremental_snapshots_per_full_snapshot =
3560 maximum_incremental_snapshot_archives_to_retain.get() * 2;
3561 let full_snapshot_interval =
3562 incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3563
3564 let mut snapshot_filenames = vec![];
3565 (starting_slot..)
3566 .step_by(full_snapshot_interval)
3567 .take(
3568 maximum_full_snapshot_archives_to_retain
3569 .checked_mul(NonZeroUsize::new(2).unwrap())
3570 .unwrap()
3571 .get(),
3572 )
3573 .for_each(|full_snapshot_slot| {
3574 let snapshot_filename =
3575 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3576 let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3577 fs::File::create(snapshot_path).unwrap();
3578 snapshot_filenames.push(snapshot_filename);
3579
3580 (full_snapshot_slot..)
3581 .step_by(incremental_snapshot_interval)
3582 .take(num_incremental_snapshots_per_full_snapshot)
3583 .skip(1)
3584 .for_each(|incremental_snapshot_slot| {
3585 let snapshot_filename = format!(
3586 "incremental-snapshot-{}-{}-{}.tar",
3587 full_snapshot_slot,
3588 incremental_snapshot_slot,
3589 Hash::default()
3590 );
3591 let snapshot_path = incremental_snapshot_archives_dir
3592 .path()
3593 .join(&snapshot_filename);
3594 fs::File::create(snapshot_path).unwrap();
3595 snapshot_filenames.push(snapshot_filename);
3596 });
3597 });
3598
3599 purge_old_snapshot_archives(
3600 full_snapshot_archives_dir.path(),
3601 incremental_snapshot_archives_dir.path(),
3602 maximum_full_snapshot_archives_to_retain,
3603 maximum_incremental_snapshot_archives_to_retain,
3604 );
3605
3606 let mut remaining_full_snapshot_archives =
3608 get_full_snapshot_archives(full_snapshot_archives_dir.path());
3609 assert_eq!(
3610 remaining_full_snapshot_archives.len(),
3611 maximum_full_snapshot_archives_to_retain.get(),
3612 );
3613 remaining_full_snapshot_archives.sort_unstable();
3614 let latest_full_snapshot_archive_slot =
3615 remaining_full_snapshot_archives.last().unwrap().slot();
3616
3617 let mut remaining_incremental_snapshot_archives =
3622 get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3623 assert_eq!(
3624 remaining_incremental_snapshot_archives.len(),
3625 maximum_incremental_snapshot_archives_to_retain
3626 .get()
3627 .saturating_add(
3628 maximum_full_snapshot_archives_to_retain
3629 .get()
3630 .saturating_sub(1)
3631 )
3632 );
3633 remaining_incremental_snapshot_archives.sort_unstable();
3634 remaining_incremental_snapshot_archives.reverse();
3635
3636 for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3638 let incremental_snapshot_archive =
3639 remaining_incremental_snapshot_archives.pop().unwrap();
3640
3641 let expected_base_slot =
3642 latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3643 assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3644 let expected_slot = expected_base_slot
3645 + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3646 assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3647 }
3648
3649 for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3651 assert_eq!(
3652 incremental_snapshot_archive.base_slot(),
3653 latest_full_snapshot_archive_slot
3654 );
3655 }
3656
3657 let expected_remaining_incremental_snapshot_archive_slots =
3659 (latest_full_snapshot_archive_slot..)
3660 .step_by(incremental_snapshot_interval)
3661 .take(num_incremental_snapshots_per_full_snapshot)
3662 .skip(
3663 num_incremental_snapshots_per_full_snapshot
3664 - maximum_incremental_snapshot_archives_to_retain.get(),
3665 )
3666 .collect::<HashSet<_>>();
3667
3668 let actual_remaining_incremental_snapshot_archive_slots =
3669 remaining_incremental_snapshot_archives
3670 .iter()
3671 .map(|snapshot| snapshot.slot())
3672 .collect::<HashSet<_>>();
3673 assert_eq!(
3674 actual_remaining_incremental_snapshot_archive_slots,
3675 expected_remaining_incremental_snapshot_archive_slots
3676 );
3677 }
3678
3679 #[test]
3680 fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
3681 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3682 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3683
3684 for snapshot_filenames in [
3685 format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
3686 format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
3687 format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
3688 format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
3689 format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
3690 format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
3691 format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
3692 format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
3693 ] {
3694 let snapshot_path = incremental_snapshot_archives_dir
3695 .path()
3696 .join(snapshot_filenames);
3697 fs::File::create(snapshot_path).unwrap();
3698 }
3699
3700 purge_old_snapshot_archives(
3701 full_snapshot_archives_dir.path(),
3702 incremental_snapshot_archives_dir.path(),
3703 NonZeroUsize::new(usize::MAX).unwrap(),
3704 NonZeroUsize::new(usize::MAX).unwrap(),
3705 );
3706
3707 let remaining_incremental_snapshot_archives =
3708 get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3709 assert!(remaining_incremental_snapshot_archives.is_empty());
3710 }
3711
3712 #[test]
3713 fn test_get_snapshot_accounts_hardlink_dir() {
3714 let slot: Slot = 1;
3715
3716 let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
3717
3718 let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
3719 let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
3720 let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
3721 fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
3722
3723 let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
3724 let appendvec_filename = format!("{slot}.0");
3725 let appendvec_path = accounts_dir.join(appendvec_filename);
3726
3727 let ret = get_snapshot_accounts_hardlink_dir(
3728 &appendvec_path,
3729 slot,
3730 &mut account_paths_set,
3731 &accounts_hardlinks_dir,
3732 );
3733 assert!(ret.is_ok());
3734
3735 let wrong_appendvec_path = appendvec_path
3736 .parent()
3737 .unwrap()
3738 .parent()
3739 .unwrap()
3740 .join(appendvec_path.file_name().unwrap());
3741 let ret = get_snapshot_accounts_hardlink_dir(
3742 &wrong_appendvec_path,
3743 slot,
3744 &mut account_paths_set,
3745 accounts_hardlinks_dir,
3746 );
3747
3748 assert_matches!(
3749 ret,
3750 Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
3751 );
3752 }
3753
3754 #[test]
3755 fn test_get_snapshot_file_kind() {
3756 assert_eq!(None, get_snapshot_file_kind("file.txt"));
3757 assert_eq!(
3758 Some(SnapshotFileKind::Version),
3759 get_snapshot_file_kind(SNAPSHOT_VERSION_FILENAME)
3760 );
3761 assert_eq!(
3762 Some(SnapshotFileKind::BankFields),
3763 get_snapshot_file_kind("1234")
3764 );
3765 assert_eq!(
3766 Some(SnapshotFileKind::Storage),
3767 get_snapshot_file_kind("1000.999")
3768 );
3769 }
3770
3771 #[test]
3772 fn test_full_snapshot_slot_file_good() {
3773 let slot_written = 123_456_789;
3774 let bank_snapshot_dir = TempDir::new().unwrap();
3775 write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
3776
3777 let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
3778 assert_eq!(slot_read, slot_written);
3779 }
3780
3781 #[test]
3782 fn test_full_snapshot_slot_file_bad() {
3783 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
3784 let too_small = [1u8; SLOT_SIZE - 1];
3785 let too_large = [1u8; SLOT_SIZE + 1];
3786
3787 for contents in [too_small.as_slice(), too_large.as_slice()] {
3788 let bank_snapshot_dir = TempDir::new().unwrap();
3789 let full_snapshot_slot_path = bank_snapshot_dir
3790 .as_ref()
3791 .join(SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
3792 fs::write(full_snapshot_slot_path, contents).unwrap();
3793
3794 let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
3795 assert!(err
3796 .to_string()
3797 .starts_with("invalid full snapshot slot file size"));
3798 }
3799 }
3800}