1use {
2 crate::{
3 serde_snapshot::SnapshotStreams,
4 snapshot_archive_info::{
5 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfoGetter,
6 },
7 snapshot_hash::SnapshotHash,
8 snapshot_package::SnapshotPackage,
9 snapshot_utils::snapshot_storage_rebuilder::{
10 RebuiltSnapshotStorage, SnapshotStorageRebuilder,
11 },
12 },
13 bzip2::bufread::BzDecoder,
14 crossbeam_channel::Sender,
15 flate2::read::GzDecoder,
16 lazy_static::lazy_static,
17 log::*,
18 miraland_accounts_db::{
19 account_storage::AccountStorageMap,
20 accounts_db::{AccountStorageEntry, AtomicAppendVecId},
21 accounts_file::AccountsFileError,
22 append_vec::AppendVec,
23 hardened_unpack::{self, ParallelSelector, UnpackError},
24 shared_buffer_reader::{SharedBuffer, SharedBufferReader},
25 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
26 },
27 miraland_measure::{measure, measure::Measure},
28 regex::Regex,
29 miraland_sdk::{clock::Slot, hash::Hash},
30 std::{
31 cmp::Ordering,
32 collections::{HashMap, HashSet},
33 fmt, fs,
34 io::{BufReader, BufWriter, Error as IoError, Read, Result as IoResult, Seek, Write},
35 num::NonZeroUsize,
36 path::{Path, PathBuf},
37 process::ExitStatus,
38 str::FromStr,
39 sync::Arc,
40 thread::{Builder, JoinHandle},
41 },
42 tar::{self, Archive},
43 tempfile::TempDir,
44 thiserror::Error,
45};
46#[cfg(feature = "dev-context-only-utils")]
47use {
48 hardened_unpack::UnpackedAppendVecMap,
49 miraland_accounts_db::utils::create_accounts_run_and_snapshot_dirs, rayon::prelude::*,
50};
51
52mod archive_format;
53pub mod snapshot_storage_rebuilder;
54pub use archive_format::*;
55
/// File name of the serialized status cache inside a bank snapshot directory.
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
/// File name of the snapshot version file.
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
/// Marker file whose presence means a bank snapshot directory is complete.
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
/// Name of the sub-directory of a bank snapshot that holds the account hard-link symlinks.
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
/// Directory name `"remote"`; presumably where downloaded snapshot archives are placed — confirm.
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
/// Maximum size, in bytes, of a serialized snapshot data file (32 GiB).
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024;
/// Maximum size, in bytes, of the snapshot version file.
const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;
/// Version string of [`SnapshotVersion::V1_2_0`].
const VERSION_STRING_V1_2_0: &str = "1.2.0";
/// Name prefix of temporary archives and staging directories created while archiving.
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
/// File extension marking a "pre" bank snapshot file.
pub const BANK_SNAPSHOT_PRE_FILENAME_EXTENSION: &str = "pre";
/// Default number of full snapshot archives to keep.
// Built with a safe const `match` instead of `unsafe { new_unchecked }`: a zero literal
// here becomes a compile-time error rather than undefined behavior.
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(2) {
        Some(value) => value,
        None => panic!("DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Default number of incremental snapshot archives to keep.
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: NonZeroUsize =
    match NonZeroUsize::new(4) {
        Some(value) => value,
        None => panic!("DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN must be non-zero"),
    };
/// Full snapshot archive file name: `snapshot-<slot>-<hash>.<ext>`.
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
/// Incremental snapshot archive file name: `incremental-snapshot-<base>-<slot>-<hash>.<ext>`.
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz|tar\.lz4)$";
76
/// On-disk format version of a snapshot.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SnapshotVersion {
    /// Format "1.2.0" — the only variant, and therefore the default.
    #[default]
    V1_2_0,
}
82
83impl fmt::Display for SnapshotVersion {
84 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85 f.write_str(From::from(*self))
86 }
87}
88
89impl From<SnapshotVersion> for &'static str {
90 fn from(snapshot_version: SnapshotVersion) -> &'static str {
91 match snapshot_version {
92 SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
93 }
94 }
95}
96
97impl FromStr for SnapshotVersion {
98 type Err = &'static str;
99
100 fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
101 let version_string = if version_string
103 .get(..1)
104 .map_or(false, |s| s.eq_ignore_ascii_case("v"))
105 {
106 &version_string[1..]
107 } else {
108 version_string
109 };
110 match version_string {
111 VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
112 _ => Err("unsupported snapshot version"),
113 }
114 }
115}
116
117impl SnapshotVersion {
118 pub fn as_str(self) -> &'static str {
119 <&str as From<Self>>::from(self)
120 }
121}
122
123#[derive(PartialEq, Eq, Debug)]
126pub struct BankSnapshotInfo {
127 pub slot: Slot,
129 pub snapshot_kind: BankSnapshotKind,
131 pub snapshot_dir: PathBuf,
133 pub snapshot_version: SnapshotVersion,
135}
136
137impl PartialOrd for BankSnapshotInfo {
138 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139 Some(self.cmp(other))
140 }
141}
142
143impl Ord for BankSnapshotInfo {
145 fn cmp(&self, other: &Self) -> Ordering {
146 self.slot.cmp(&other.slot)
147 }
148}
149
150impl BankSnapshotInfo {
151 pub fn new_from_dir(
152 bank_snapshots_dir: impl AsRef<Path>,
153 slot: Slot,
154 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
155 let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
158
159 if !bank_snapshot_dir.is_dir() {
160 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
161 bank_snapshot_dir,
162 ));
163 }
164
165 if !is_bank_snapshot_complete(&bank_snapshot_dir) {
171 return Err(SnapshotNewFromDirError::IncompleteDir(bank_snapshot_dir));
172 }
173
174 let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
175 if !status_cache_file.is_file() {
176 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
177 status_cache_file,
178 ));
179 }
180
181 let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
182 let version_str = snapshot_version_from_file(&version_path).or(Err(
183 SnapshotNewFromDirError::MissingVersionFile(version_path),
184 ))?;
185 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
186 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
187
188 let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
189 let bank_snapshot_pre_path =
190 bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
191
192 let snapshot_kind = if bank_snapshot_pre_path.is_file() {
201 BankSnapshotKind::Pre
202 } else if bank_snapshot_post_path.is_file() {
203 BankSnapshotKind::Post
204 } else {
205 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
206 bank_snapshot_dir,
207 ));
208 };
209
210 Ok(BankSnapshotInfo {
211 slot,
212 snapshot_kind,
213 snapshot_dir: bank_snapshot_dir,
214 snapshot_version,
215 })
216 }
217
218 pub fn snapshot_path(&self) -> PathBuf {
219 let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
220
221 let ext = match self.snapshot_kind {
222 BankSnapshotKind::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
223 BankSnapshotKind::Post => "",
224 };
225 bank_snapshot_path.set_extension(ext);
226
227 bank_snapshot_path
228 }
229}
/// Which variant of the bank file a snapshot directory holds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BankSnapshotKind {
    /// The bank file carries the `.pre` extension.
    Pre,
    /// The bank file has no extension.
    Post,
}
245
/// Source a snapshot is loaded from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SnapshotFrom {
    /// A snapshot archive file.
    Archive,
    /// A snapshot directory.
    Dir,
}
256
/// Root data-file paths of a full snapshot and, optionally, an incremental snapshot.
#[derive(Debug)]
pub struct SnapshotRootPaths {
    /// Root data file of the full snapshot.
    pub full_snapshot_root_file_path: PathBuf,
    /// Root data file of the incremental snapshot, when one is present.
    pub incremental_snapshot_root_file_path: Option<PathBuf>,
}
264
265#[derive(Debug)]
267pub struct UnarchivedSnapshot {
268 #[allow(dead_code)]
269 unpack_dir: TempDir,
270 pub storage: AccountStorageMap,
271 pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
272 pub measure_untar: Measure,
273}
274
275#[derive(Debug)]
277pub struct UnpackedSnapshotsDirAndVersion {
278 pub unpacked_snapshots_dir: PathBuf,
279 pub snapshot_version: SnapshotVersion,
280}
281
282pub(crate) struct StorageAndNextAppendVecId {
285 pub storage: AccountStorageMap,
286 pub next_append_vec_id: AtomicAppendVecId,
287}
288
289#[derive(Error, Debug)]
290#[allow(clippy::large_enum_variant)]
291pub enum SnapshotError {
292 #[error("I/O error: {0}")]
293 Io(#[from] IoError),
294
295 #[error("AccountsFile error: {0}")]
296 AccountsFileError(#[from] AccountsFileError),
297
298 #[error("serialization error: {0}")]
299 Serialize(#[from] bincode::Error),
300
301 #[error("crossbeam send error: {0}")]
302 CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),
303
304 #[error("archive generation failure {0}")]
305 ArchiveGenerationFailure(ExitStatus),
306
307 #[error("Unpack error: {0}")]
308 UnpackError(#[from] UnpackError),
309
310 #[error("source({1}) - I/O error: {0}")]
311 IoWithSource(IoError, &'static str),
312
313 #[error("could not get file name from path '{0}'")]
314 PathToFileNameError(PathBuf),
315
316 #[error("could not get str from file name '{0}'")]
317 FileNameToStrError(PathBuf),
318
319 #[error("could not parse snapshot archive's file name '{0}'")]
320 ParseSnapshotArchiveFileNameError(String),
321
322 #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")]
323 MismatchedBaseSlot(Slot, Slot),
324
325 #[error("no snapshot archives to load from '{0}'")]
326 NoSnapshotArchives(PathBuf),
327
328 #[error("snapshot has mismatch: deserialized bank: {0:?}, snapshot archive info: {1:?}")]
329 MismatchedSlotHash((Slot, SnapshotHash), (Slot, SnapshotHash)),
330
331 #[error("snapshot slot deltas are invalid: {0}")]
332 VerifySlotDeltas(#[from] VerifySlotDeltasError),
333
334 #[error("bank_snapshot_info new_from_dir failed: {0}")]
335 NewFromDir(#[from] SnapshotNewFromDirError),
336
337 #[error("invalid snapshot dir path '{0}'")]
338 InvalidSnapshotDirPath(PathBuf),
339
340 #[error("invalid AppendVec path '{0}'")]
341 InvalidAppendVecPath(PathBuf),
342
343 #[error("invalid account path '{0}'")]
344 InvalidAccountPath(PathBuf),
345
346 #[error("no valid snapshot dir found under '{0}'")]
347 NoSnapshotSlotDir(PathBuf),
348
349 #[error("snapshot dir account paths mismatching")]
350 AccountPathsMismatch,
351
352 #[error("failed to add bank snapshot for slot {1}: {0}")]
353 AddBankSnapshot(#[source] AddBankSnapshotError, Slot),
354
355 #[error("failed to archive snapshot package: {0}")]
356 ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
357}
358
359#[derive(Error, Debug)]
360pub enum SnapshotNewFromDirError {
361 #[error("invalid bank snapshot directory '{0}'")]
362 InvalidBankSnapshotDir(PathBuf),
363
364 #[error("missing status cache file '{0}'")]
365 MissingStatusCacheFile(PathBuf),
366
367 #[error("missing version file '{0}'")]
368 MissingVersionFile(PathBuf),
369
370 #[error("invalid snapshot version '{0}'")]
371 InvalidVersion(String),
372
373 #[error("snapshot directory incomplete '{0}'")]
374 IncompleteDir(PathBuf),
375
376 #[error("missing snapshot file '{0}'")]
377 MissingSnapshotFile(PathBuf),
378}
379
380pub type Result<T> = std::result::Result<T, SnapshotError>;
381
382#[derive(Error, Debug, PartialEq, Eq)]
384pub enum VerifySlotDeltasError {
385 #[error("too many entries: {0} (max: {1})")]
386 TooManyEntries(usize, usize),
387
388 #[error("slot {0} is not a root")]
389 SlotIsNotRoot(Slot),
390
391 #[error("slot {0} is greater than bank slot {1}")]
392 SlotGreaterThanMaxRoot(Slot, Slot),
393
394 #[error("slot {0} has multiple entries")]
395 SlotHasMultipleEntries(Slot),
396
397 #[error("slot {0} was not found in slot history")]
398 SlotNotFoundInHistory(Slot),
399
400 #[error("slot {0} was in history but missing from slot deltas")]
401 SlotNotFoundInDeltas(Slot),
402
403 #[error("slot history is bad and cannot be used to verify slot deltas")]
404 BadSlotHistory,
405}
406
407#[derive(Error, Debug)]
409pub enum AddBankSnapshotError {
410 #[error("bank snapshot dir already exists '{0}'")]
411 SnapshotDirAlreadyExists(PathBuf),
412
413 #[error("failed to create snapshot dir '{1}': {0}")]
414 CreateSnapshotDir(#[source] IoError, PathBuf),
415
416 #[error("failed to hard link storages: {0}")]
417 HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),
418
419 #[error("failed to serialize bank: {0}")]
420 SerializeBank(#[source] Box<SnapshotError>),
421
422 #[error("failed to serialize status cache: {0}")]
423 SerializeStatusCache(#[source] Box<SnapshotError>),
424
425 #[error("failed to write snapshot version file '{1}': {0}")]
426 WriteSnapshotVersionFile(#[source] IoError, PathBuf),
427
428 #[error("failed to mark snapshot as 'complete': failed to create file '{1}': {0}")]
429 CreateStateCompleteFile(#[source] IoError, PathBuf),
430}
431
432#[derive(Error, Debug)]
434pub enum ArchiveSnapshotPackageError {
435 #[error("failed to create archive path '{1}': {0}")]
436 CreateArchiveDir(#[source] IoError, PathBuf),
437
438 #[error("failed to create staging dir inside '{1}': {0}")]
439 CreateStagingDir(#[source] IoError, PathBuf),
440
441 #[error("failed to create accounts staging dir '{1}': {0}")]
442 CreateAccountsStagingDir(#[source] IoError, PathBuf),
443
444 #[error("failed to create snapshot staging dir '{1}': {0}")]
445 CreateSnapshotStagingDir(#[source] IoError, PathBuf),
446
447 #[error("failed to canonicalize snapshot source dir '{1}': {0}")]
448 CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),
449
450 #[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
451 SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),
452
453 #[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
454 SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),
455
456 #[error("failed to symlink version file from '{1}' to '{2}': {0}")]
457 SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),
458
459 #[error("failed to flush account storage file '{1}': {0}")]
460 FlushAccountStorageFile(#[source] AccountsFileError, PathBuf),
461
462 #[error("failed to canonicalize account storage file '{1}': {0}")]
463 CanonicalizeAccountStorageFile(#[source] IoError, PathBuf),
464
465 #[error("failed to symlink account storage file from '{1}' to '{2}': {0}")]
466 SymlinkAccountStorageFile(#[source] IoError, PathBuf, PathBuf),
467
468 #[error("account storage staging file is invalid '{0}'")]
469 InvalidAccountStorageStagingFile(PathBuf),
470
471 #[error("failed to create archive file '{1}': {0}")]
472 CreateArchiveFile(#[source] IoError, PathBuf),
473
474 #[error("failed to archive version file: {0}")]
475 ArchiveVersionFile(#[source] IoError),
476
477 #[error("failed to archive snapshots dir: {0}")]
478 ArchiveSnapshotsDir(#[source] IoError),
479
480 #[error("failed to archive accounts dir: {0}")]
481 ArchiveAccountsDir(#[source] IoError),
482
483 #[error("failed to archive snapshot: {0}")]
484 FinishArchive(#[source] IoError),
485
486 #[error("failed to create encoder: {0}")]
487 CreateEncoder(#[source] IoError),
488
489 #[error("failed to encode archive: {0}")]
490 FinishEncoder(#[source] IoError),
491
492 #[error("failed to query archive metadata '{1}': {0}")]
493 QueryArchiveMetadata(#[source] IoError, PathBuf),
494
495 #[error("failed to move archive from '{1}' to '{2}': {0}")]
496 MoveArchive(#[source] IoError, PathBuf, PathBuf),
497}
498
499#[derive(Error, Debug)]
501pub enum HardLinkStoragesToSnapshotError {
502 #[error("failed to create accounts hard links dir '{1}': {0}")]
503 CreateAccountsHardLinksDir(#[source] IoError, PathBuf),
504
505 #[error("failed to flush storage: {0}")]
506 FlushStorage(#[source] AccountsFileError),
507
508 #[error("failed to get the snapshot's accounts hard link dir: {0}")]
509 GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),
510
511 #[error("failed to hard link storage from '{1}' to '{2}': {0}")]
512 HardLinkStorage(#[source] IoError, PathBuf, PathBuf),
513}
514
515#[derive(Error, Debug)]
517pub enum GetSnapshotAccountsHardLinkDirError {
518 #[error("invalid account storage path '{0}'")]
519 GetAccountPath(PathBuf),
520
521 #[error("failed to create the snapshot hard link dir '{1}': {0}")]
522 CreateSnapshotHardLinkDir(#[source] IoError, PathBuf),
523
524 #[error("failed to symlink snapshot hard link dir '{link}' to '{original}': {source}")]
525 SymlinkSnapshotHardLinkDir {
526 source: IoError,
527 original: PathBuf,
528 link: PathBuf,
529 },
530}
531
532pub fn clean_orphaned_account_snapshot_dirs(
540 bank_snapshots_dir: impl AsRef<Path>,
541 account_snapshot_paths: &[PathBuf],
542) -> IoResult<()> {
543 let mut account_snapshot_dirs_referenced = HashSet::new();
546 let snapshots = get_bank_snapshots(bank_snapshots_dir);
547 for snapshot in snapshots {
548 let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
549 let read_dir = fs::read_dir(&account_hardlinks_dir).map_err(|err| {
551 IoError::other(format!(
552 "failed to read account hardlinks dir '{}': {err}",
553 account_hardlinks_dir.display(),
554 ))
555 })?;
556 for entry in read_dir {
557 let path = entry?.path();
558 let target = fs::read_link(&path).map_err(|err| {
559 IoError::other(format!(
560 "failed to read symlink '{}': {err}",
561 path.display(),
562 ))
563 })?;
564 account_snapshot_dirs_referenced.insert(target);
565 }
566 }
567
568 for account_snapshot_path in account_snapshot_paths {
570 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
571 IoError::other(format!(
572 "failed to read account snapshot dir '{}': {err}",
573 account_snapshot_path.display(),
574 ))
575 })?;
576 for entry in read_dir {
577 let path = entry?.path();
578 if !account_snapshot_dirs_referenced.contains(&path) {
579 info!(
580 "Removing orphaned account snapshot hardlink directory '{}'...",
581 path.display()
582 );
583 move_and_async_delete_path(&path);
584 }
585 }
586 }
587
588 Ok(())
589}
590
591pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
593 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
594 return;
596 };
597
598 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
599
600 let incomplete_dirs: Vec<_> = read_dir_iter
601 .filter_map(|entry| entry.ok())
602 .map(|entry| entry.path())
603 .filter(|path| path.is_dir())
604 .filter(is_incomplete)
605 .collect();
606
607 for incomplete_dir in incomplete_dirs {
609 let result = purge_bank_snapshot(&incomplete_dir);
610 match result {
611 Ok(_) => info!(
612 "Purged incomplete snapshot dir: {}",
613 incomplete_dir.display()
614 ),
615 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
616 }
617 }
618}
619
620fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
622 let state_complete_path = bank_snapshot_dir
623 .as_ref()
624 .join(SNAPSHOT_STATE_COMPLETE_FILENAME);
625 state_complete_path.is_file()
626}
627
628pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
631 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
632 for entry in entries.flatten() {
633 if entry
634 .file_name()
635 .to_str()
636 .map(|file_name| file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX))
637 .unwrap_or(false)
638 {
639 let path = entry.path();
640 let result = if path.is_dir() {
641 fs::remove_dir_all(&path)
642 } else {
643 fs::remove_file(&path)
644 };
645 if let Err(err) = result {
646 warn!(
647 "Failed to remove temporary snapshot archive '{}': {err}",
648 path.display(),
649 );
650 }
651 }
652 }
653 }
654}
655
/// Builds the archive file for `snapshot_package` and moves it to `snapshot_package.path()`.
///
/// Steps visible below:
/// 1. Create a staging directory (prefixed with `TMP_SNAPSHOT_ARCHIVE_PREFIX` and the slot)
///    next to the target path. It is populated with symlinks to the bank snapshot file, status
///    cache, version file and every account storage file; each storage is flushed before it is
///    linked.
/// 2. Write a tar archive (optionally bzip2/gzip/zstd/lz4 compressed, per
///    `snapshot_package.archive_format()`) to a temporary, prefix-named file in the same
///    directory, then rename it to the final path.
/// 3. Call `purge_old_snapshot_archives` with the retention limits, then log and report
///    timing and size metrics.
///
/// # Errors
/// Any failing step is reported as an `ArchiveSnapshotPackageError` converted into
/// `SnapshotError`.
///
/// # Panics
/// Panics if `snapshot_package.path()` has no parent directory.
pub fn archive_snapshot_package(
    snapshot_package: &SnapshotPackage,
    full_snapshot_archives_dir: impl AsRef<Path>,
    incremental_snapshot_archives_dir: impl AsRef<Path>,
    maximum_full_snapshot_archives_to_retain: NonZeroUsize,
    maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
) -> Result<()> {
    use ArchiveSnapshotPackageError as E;
    // Top-level directory names used inside the archive.
    const SNAPSHOTS_DIR: &str = "snapshots";
    const ACCOUNTS_DIR: &str = "accounts";
    info!(
        "Generating snapshot archive for slot {}",
        snapshot_package.slot()
    );

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    let tar_dir = snapshot_package
        .path()
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

    // The staging dir is a `TempDir`, so it is deleted when it is dropped at the end of this
    // function. Its name carries the same temporary prefix that `remove_tmp_snapshot_archives`
    // cleans up.
    let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
    let staging_dir = tempfile::Builder::new()
        .prefix(&format!(
            "{}{}-",
            staging_dir_prefix,
            snapshot_package.slot()
        ))
        .tempdir_in(tar_dir)
        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;

    let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
    let staging_accounts_dir = staging_dir.path().join(ACCOUNTS_DIR);

    fs::create_dir_all(&staging_accounts_dir)
        .map_err(|err| E::CreateAccountsStagingDir(err, staging_accounts_dir.clone()))?;

    // Layout: snapshots/<slot>/<slot> holds the bank snapshot file.
    let slot_str = snapshot_package.slot().to_string();
    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
    fs::create_dir_all(&staging_snapshot_dir)
        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

    // Symlinks point at canonicalized (absolute) source paths.
    let src_snapshot_dir = &snapshot_package.bank_snapshot_dir;
    let src_snapshot_dir = src_snapshot_dir
        .canonicalize()
        .map_err(|err| E::CanonicalizeSnapshotSourceDir(err, src_snapshot_dir.clone()))?;
    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
    let src_snapshot_file = src_snapshot_dir.join(slot_str);
    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

    // The status cache sits directly under snapshots/.
    let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    symlink::symlink_file(&src_status_cache, &staging_status_cache)
        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

    // The version file sits at the staging root.
    let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
    let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
    })?;

    // Flush each storage so its on-disk contents are current, then link it into accounts/
    // under its AppendVec file name.
    for storage in snapshot_package.snapshot_storages.iter() {
        let storage_path = storage.get_path();
        storage
            .flush()
            .map_err(|err| E::FlushAccountStorageFile(err, storage_path.clone()))?;
        let staging_storage_path = staging_accounts_dir.join(AppendVec::file_name(
            storage.slot(),
            storage.append_vec_id(),
        ));

        let src_storage_path = fs::canonicalize(&storage_path)
            .map_err(|err| E::CanonicalizeAccountStorageFile(err, storage_path))?;
        symlink::symlink_file(&src_storage_path, &staging_storage_path).map_err(|err| {
            E::SymlinkAccountStorageFile(err, src_storage_path, staging_storage_path.clone())
        })?;
        // `is_file` follows the symlink, so this confirms the link resolves to a regular file.
        if !staging_storage_path.is_file() {
            return Err(E::InvalidAccountStorageStagingFile(staging_storage_path).into());
        }
    }

    // Write to a temporary, prefix-named file first. The final path only appears via the
    // rename below, after the archive has been fully written.
    let archive_path = tar_dir.join(format!(
        "{}{}.{}",
        staging_dir_prefix,
        snapshot_package.slot(),
        snapshot_package.archive_format().extension(),
    ));

    // Scoped so the archive file/encoder is dropped before the metadata query and rename.
    {
        let mut archive_file = fs::File::create(&archive_path)
            .map_err(|err| E::CreateArchiveFile(err, archive_path.clone()))?;

        // Writes the tar stream into `encoder`. tar's `Builder` follows symlinks by default, so
        // the archive should receive the files' contents rather than the links (confirm if
        // the Builder configuration changes).
        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
            let mut archive = tar::Builder::new(encoder);
            // Entry order is fixed here: version file, then snapshots/, then accounts/.
            // Presumably this lets readers see the version and bank snapshot before the
            // (large) account storages — confirm before reordering.
            archive
                .append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
                .map_err(E::ArchiveVersionFile)?;
            archive
                .append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
                .map_err(E::ArchiveSnapshotsDir)?;
            archive
                .append_dir_all(ACCOUNTS_DIR, &staging_accounts_dir)
                .map_err(E::ArchiveAccountsDir)?;
            // `into_inner` finishes the tar stream.
            archive.into_inner().map_err(E::FinishArchive)?;
            Ok(())
        };

        // Each compressed format must call `finish` on its encoder to write trailing data.
        match snapshot_package.archive_format() {
            ArchiveFormat::TarBzip2 => {
                let mut encoder =
                    bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarGzip => {
                let mut encoder =
                    flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarZstd => {
                // Level 0 selects zstd's default compression level (per the zstd crate docs).
                let mut encoder =
                    zstd::stream::Encoder::new(archive_file, 0).map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                encoder.finish().map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::TarLz4 => {
                let mut encoder = lz4::EncoderBuilder::new()
                    .level(1)
                    .build(archive_file)
                    .map_err(E::CreateEncoder)?;
                do_archive_files(&mut encoder)?;
                // lz4's `finish` returns the writer and the result separately.
                let (_output, result) = encoder.finish();
                result.map_err(E::FinishEncoder)?;
            }
            ArchiveFormat::Tar => {
                do_archive_files(&mut archive_file)?;
            }
        };
    }

    // Read the size before renaming, then move the finished archive into place.
    let metadata = fs::metadata(&archive_path)
        .map_err(|err| E::QueryArchiveMetadata(err, archive_path.clone()))?;
    fs::rename(&archive_path, snapshot_package.path())
        .map_err(|err| E::MoveArchive(err, archive_path, snapshot_package.path().clone()))?;

    purge_old_snapshot_archives(
        full_snapshot_archives_dir,
        incremental_snapshot_archives_dir,
        maximum_full_snapshot_archives_to_retain,
        maximum_incremental_snapshot_archives_to_retain,
    );

    timer.stop();
    info!(
        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
        snapshot_package.path().display(),
        snapshot_package.slot(),
        timer.as_ms(),
        metadata.len()
    );

    // The archive-size metric name depends on whether this is a full or incremental snapshot.
    datapoint_info!(
        "archive-snapshot-package",
        ("slot", snapshot_package.slot(), i64),
        (
            "archive_format",
            snapshot_package.archive_format().to_string(),
            String
        ),
        ("duration_ms", timer.as_ms(), i64),
        (
            if snapshot_package.snapshot_kind.is_full_snapshot() {
                "full-snapshot-archive-size"
            } else {
                "incremental-snapshot-archive-size"
            },
            metadata.len(),
            i64
        ),
    );
    Ok(())
}
857
858pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
860 let mut bank_snapshots = Vec::default();
861 match fs::read_dir(&bank_snapshots_dir) {
862 Err(err) => {
863 info!(
864 "Unable to read bank snapshots directory '{}': {err}",
865 bank_snapshots_dir.as_ref().display(),
866 );
867 }
868 Ok(paths) => paths
869 .filter_map(|entry| {
870 entry
873 .ok()
874 .filter(|entry| entry.path().is_dir())
875 .and_then(|entry| {
876 entry
877 .path()
878 .file_name()
879 .and_then(|file_name| file_name.to_str())
880 .and_then(|file_name| file_name.parse::<Slot>().ok())
881 })
882 })
883 .for_each(
884 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
885 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
886 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
889 },
890 ),
891 }
892 bank_snapshots
893}
894
895pub fn get_bank_snapshots_pre(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
899 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
900 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Pre);
901 bank_snapshots
902}
903
904pub fn get_bank_snapshots_post(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
908 let mut bank_snapshots = get_bank_snapshots(bank_snapshots_dir);
909 bank_snapshots.retain(|bank_snapshot| bank_snapshot.snapshot_kind == BankSnapshotKind::Post);
910 bank_snapshots
911}
912
913pub fn get_highest_bank_snapshot_pre(
917 bank_snapshots_dir: impl AsRef<Path>,
918) -> Option<BankSnapshotInfo> {
919 do_get_highest_bank_snapshot(get_bank_snapshots_pre(bank_snapshots_dir))
920}
921
922pub fn get_highest_bank_snapshot_post(
926 bank_snapshots_dir: impl AsRef<Path>,
927) -> Option<BankSnapshotInfo> {
928 do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
929}
930
931pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
935 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
936}
937
938fn do_get_highest_bank_snapshot(
939 mut bank_snapshots: Vec<BankSnapshotInfo>,
940) -> Option<BankSnapshotInfo> {
941 bank_snapshots.sort_unstable();
942 bank_snapshots.into_iter().next_back()
943}
944
945pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
946where
947 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
948{
949 serialize_snapshot_data_file_capped::<F>(
950 data_file_path,
951 MAX_SNAPSHOT_DATA_FILE_SIZE,
952 serializer,
953 )
954}
955
956pub fn deserialize_snapshot_data_file<T: Sized>(
957 data_file_path: &Path,
958 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
959) -> Result<T> {
960 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
961 deserializer(streams.full_snapshot_stream)
962 };
963
964 let wrapped_data_file_path = SnapshotRootPaths {
965 full_snapshot_root_file_path: data_file_path.to_path_buf(),
966 incremental_snapshot_root_file_path: None,
967 };
968
969 deserialize_snapshot_data_files_capped(
970 &wrapped_data_file_path,
971 MAX_SNAPSHOT_DATA_FILE_SIZE,
972 wrapped_deserializer,
973 )
974}
975
976pub fn deserialize_snapshot_data_files<T: Sized>(
977 snapshot_root_paths: &SnapshotRootPaths,
978 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
979) -> Result<T> {
980 deserialize_snapshot_data_files_capped(
981 snapshot_root_paths,
982 MAX_SNAPSHOT_DATA_FILE_SIZE,
983 deserializer,
984 )
985}
986
987fn serialize_snapshot_data_file_capped<F>(
988 data_file_path: &Path,
989 maximum_file_size: u64,
990 serializer: F,
991) -> Result<u64>
992where
993 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
994{
995 let data_file = fs::File::create(data_file_path)?;
996 let mut data_file_stream = BufWriter::new(data_file);
997 serializer(&mut data_file_stream)?;
998 data_file_stream.flush()?;
999
1000 let consumed_size = data_file_stream.stream_position()?;
1001 if consumed_size > maximum_file_size {
1002 let error_message = format!(
1003 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
1004 data_file_path.display(),
1005 );
1006 return Err(IoError::other(error_message).into());
1007 }
1008 Ok(consumed_size)
1009}
1010
1011fn deserialize_snapshot_data_files_capped<T: Sized>(
1012 snapshot_root_paths: &SnapshotRootPaths,
1013 maximum_file_size: u64,
1014 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
1015) -> Result<T> {
1016 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
1017 create_snapshot_data_file_stream(
1018 &snapshot_root_paths.full_snapshot_root_file_path,
1019 maximum_file_size,
1020 )?;
1021
1022 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
1023 if let Some(ref incremental_snapshot_root_file_path) =
1024 snapshot_root_paths.incremental_snapshot_root_file_path
1025 {
1026 Some(create_snapshot_data_file_stream(
1027 incremental_snapshot_root_file_path,
1028 maximum_file_size,
1029 )?)
1030 } else {
1031 None
1032 }
1033 .unzip();
1034
1035 let mut snapshot_streams = SnapshotStreams {
1036 full_snapshot_stream: &mut full_snapshot_data_file_stream,
1037 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
1038 };
1039 let ret = deserializer(&mut snapshot_streams)?;
1040
1041 check_deserialize_file_consumed(
1042 full_snapshot_file_size,
1043 &snapshot_root_paths.full_snapshot_root_file_path,
1044 &mut full_snapshot_data_file_stream,
1045 )?;
1046
1047 if let Some(ref incremental_snapshot_root_file_path) =
1048 snapshot_root_paths.incremental_snapshot_root_file_path
1049 {
1050 check_deserialize_file_consumed(
1051 incremental_snapshot_file_size.unwrap(),
1052 incremental_snapshot_root_file_path,
1053 incremental_snapshot_data_file_stream.as_mut().unwrap(),
1054 )?;
1055 }
1056
1057 Ok(ret)
1058}
1059
1060fn create_snapshot_data_file_stream(
1063 snapshot_root_file_path: impl AsRef<Path>,
1064 maximum_file_size: u64,
1065) -> Result<(u64, BufReader<std::fs::File>)> {
1066 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
1067
1068 if snapshot_file_size > maximum_file_size {
1069 let error_message = format!(
1070 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
1071 snapshot_root_file_path.as_ref().display(),
1072 snapshot_file_size,
1073 maximum_file_size,
1074 );
1075 return Err(IoError::other(error_message).into());
1076 }
1077
1078 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
1079 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
1080
1081 Ok((snapshot_file_size, snapshot_data_file_stream))
1082}
1083
1084fn check_deserialize_file_consumed(
1087 file_size: u64,
1088 file_path: impl AsRef<Path>,
1089 file_stream: &mut BufReader<std::fs::File>,
1090) -> Result<()> {
1091 let consumed_size = file_stream.stream_position()?;
1092
1093 if consumed_size != file_size {
1094 let error_message = format!(
1095 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to deserialize",
1096 file_path.as_ref().display(),
1097 file_size,
1098 consumed_size,
1099 );
1100 return Err(IoError::other(error_message).into());
1101 }
1102
1103 Ok(())
1104}
1105
1106fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
1108 let run_path = appendvec_path.parent()?;
1109 let run_file_name = run_path.file_name()?;
1110 if run_file_name != ACCOUNTS_RUN_DIR {
1113 error!(
1114 "The account path {} does not have run/ as its immediate parent directory.",
1115 run_path.display()
1116 );
1117 return None;
1118 }
1119 let account_path = run_path.parent()?;
1120 Some(account_path.to_path_buf())
1121}
1122
1123fn get_snapshot_accounts_hardlink_dir(
1126 appendvec_path: &Path,
1127 bank_slot: Slot,
1128 account_paths: &mut HashSet<PathBuf>,
1129 hardlinks_dir: impl AsRef<Path>,
1130) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1131 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1132 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1133 })?;
1134
1135 let snapshot_hardlink_dir = account_path
1136 .join(ACCOUNTS_SNAPSHOT_DIR)
1137 .join(bank_slot.to_string());
1138
1139 if !account_paths.contains(&account_path) {
1142 let idx = account_paths.len();
1143 debug!(
1144 "for appendvec_path {}, create hard-link path {}",
1145 appendvec_path.display(),
1146 snapshot_hardlink_dir.display()
1147 );
1148 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1149 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1150 err,
1151 snapshot_hardlink_dir.clone(),
1152 )
1153 })?;
1154 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1155 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1156 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1157 source: err,
1158 original: snapshot_hardlink_dir.clone(),
1159 link: symlink_path,
1160 }
1161 })?;
1162 account_paths.insert(account_path);
1163 };
1164
1165 Ok(snapshot_hardlink_dir)
1166}
1167
1168pub fn hard_link_storages_to_snapshot(
1173 bank_snapshot_dir: impl AsRef<Path>,
1174 bank_slot: Slot,
1175 snapshot_storages: &[Arc<AccountStorageEntry>],
1176) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1177 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1178 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1179 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1180 err,
1181 accounts_hardlinks_dir.clone(),
1182 )
1183 })?;
1184
1185 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1186 for storage in snapshot_storages {
1187 storage
1188 .flush()
1189 .map_err(HardLinkStoragesToSnapshotError::FlushStorage)?;
1190 let storage_path = storage.accounts.get_path();
1191 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1192 &storage_path,
1193 bank_slot,
1194 &mut account_paths,
1195 &accounts_hardlinks_dir,
1196 )?;
1197 let hardlink_filename = AppendVec::file_name(storage.slot(), storage.append_vec_id());
1200 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1201 fs::hard_link(&storage_path, &hard_link_path).map_err(|err| {
1202 HardLinkStoragesToSnapshotError::HardLinkStorage(err, storage_path, hard_link_path)
1203 })?;
1204 }
1205 Ok(())
1206}
1207
1208pub(crate) fn get_storages_to_serialize(
1211 snapshot_storages: &[Arc<AccountStorageEntry>],
1212) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1213 snapshot_storages
1214 .iter()
1215 .map(|storage| vec![Arc::clone(storage)])
1216 .collect::<Vec<_>>()
1217}
1218
/// Upper bound on the number of parallel untar readers. `verify_and_unarchive_snapshots`
/// clamps a quarter of the logical CPU count into `1..=PARALLEL_UNTAR_READERS_DEFAULT`.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
1221
1222pub fn verify_and_unarchive_snapshots(
1224 bank_snapshots_dir: impl AsRef<Path>,
1225 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1226 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1227 account_paths: &[PathBuf],
1228) -> Result<(
1229 UnarchivedSnapshot,
1230 Option<UnarchivedSnapshot>,
1231 AtomicAppendVecId,
1232)> {
1233 check_are_snapshots_compatible(
1234 full_snapshot_archive_info,
1235 incremental_snapshot_archive_info,
1236 )?;
1237
1238 let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
1239
1240 let next_append_vec_id = Arc::new(AtomicAppendVecId::new(0));
1241 let unarchived_full_snapshot = unarchive_snapshot(
1242 &bank_snapshots_dir,
1243 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1244 full_snapshot_archive_info.path(),
1245 "snapshot untar",
1246 account_paths,
1247 full_snapshot_archive_info.archive_format(),
1248 parallel_divisions,
1249 next_append_vec_id.clone(),
1250 )?;
1251
1252 let unarchived_incremental_snapshot =
1253 if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1254 let unarchived_incremental_snapshot = unarchive_snapshot(
1255 &bank_snapshots_dir,
1256 TMP_SNAPSHOT_ARCHIVE_PREFIX,
1257 incremental_snapshot_archive_info.path(),
1258 "incremental snapshot untar",
1259 account_paths,
1260 incremental_snapshot_archive_info.archive_format(),
1261 parallel_divisions,
1262 next_append_vec_id.clone(),
1263 )?;
1264 Some(unarchived_incremental_snapshot)
1265 } else {
1266 None
1267 };
1268
1269 Ok((
1270 unarchived_full_snapshot,
1271 unarchived_incremental_snapshot,
1272 Arc::try_unwrap(next_append_vec_id).unwrap(),
1273 ))
1274}
1275
1276fn spawn_unpack_snapshot_thread(
1278 file_sender: Sender<PathBuf>,
1279 account_paths: Arc<Vec<PathBuf>>,
1280 ledger_dir: Arc<PathBuf>,
1281 mut archive: Archive<SharedBufferReader>,
1282 parallel_selector: Option<ParallelSelector>,
1283 thread_index: usize,
1284) -> JoinHandle<()> {
1285 Builder::new()
1286 .name(format!("mlnUnpkSnpsht{thread_index:02}"))
1287 .spawn(move || {
1288 hardened_unpack::streaming_unpack_snapshot(
1289 &mut archive,
1290 ledger_dir.as_path(),
1291 &account_paths,
1292 parallel_selector,
1293 &file_sender,
1294 )
1295 .unwrap();
1296 })
1297 .unwrap()
1298}
1299
1300fn streaming_unarchive_snapshot(
1302 file_sender: Sender<PathBuf>,
1303 account_paths: Vec<PathBuf>,
1304 ledger_dir: PathBuf,
1305 snapshot_archive_path: PathBuf,
1306 archive_format: ArchiveFormat,
1307 num_threads: usize,
1308) -> Vec<JoinHandle<()>> {
1309 let account_paths = Arc::new(account_paths);
1310 let ledger_dir = Arc::new(ledger_dir);
1311 let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
1312
1313 #[allow(clippy::needless_collect)]
1315 let archives: Vec<_> = (0..num_threads)
1316 .map(|_| {
1317 let reader = SharedBufferReader::new(&shared_buffer);
1318 Archive::new(reader)
1319 })
1320 .collect();
1321
1322 archives
1323 .into_iter()
1324 .enumerate()
1325 .map(|(thread_index, archive)| {
1326 let parallel_selector = Some(ParallelSelector {
1327 index: thread_index,
1328 divisions: num_threads,
1329 });
1330
1331 spawn_unpack_snapshot_thread(
1332 file_sender.clone(),
1333 account_paths.clone(),
1334 ledger_dir.clone(),
1335 archive,
1336 parallel_selector,
1337 thread_index,
1338 )
1339 })
1340 .collect()
1341}
1342
1343fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1348 let snapshots_dir = unpack_dir.as_ref().join("snapshots");
1349 if !snapshots_dir.is_dir() {
1350 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1351 }
1352
1353 let slot_dir = std::fs::read_dir(&snapshots_dir)
1355 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1356 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1357 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1358 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1359 .path();
1360
1361 let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
1362 fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
1363
1364 let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
1365 fs::hard_link(
1366 status_cache_file,
1367 slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
1368 )?;
1369
1370 let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
1371 fs::File::create(state_complete_file)?;
1372
1373 Ok(())
1374}
1375
1376fn unarchive_snapshot(
1380 bank_snapshots_dir: impl AsRef<Path>,
1381 unpacked_snapshots_dir_prefix: &'static str,
1382 snapshot_archive_path: impl AsRef<Path>,
1383 measure_name: &'static str,
1384 account_paths: &[PathBuf],
1385 archive_format: ArchiveFormat,
1386 parallel_divisions: usize,
1387 next_append_vec_id: Arc<AtomicAppendVecId>,
1388) -> Result<UnarchivedSnapshot> {
1389 let unpack_dir = tempfile::Builder::new()
1390 .prefix(unpacked_snapshots_dir_prefix)
1391 .tempdir_in(bank_snapshots_dir)?;
1392 let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
1393
1394 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1395 streaming_unarchive_snapshot(
1396 file_sender,
1397 account_paths.to_vec(),
1398 unpack_dir.path().to_path_buf(),
1399 snapshot_archive_path.as_ref().to_path_buf(),
1400 archive_format,
1401 parallel_divisions,
1402 );
1403
1404 let num_rebuilder_threads = num_cpus::get_physical()
1405 .saturating_sub(parallel_divisions)
1406 .max(1);
1407 let (version_and_storages, measure_untar) = measure!(
1408 SnapshotStorageRebuilder::rebuild_storage(
1409 file_receiver,
1410 num_rebuilder_threads,
1411 next_append_vec_id,
1412 SnapshotFrom::Archive,
1413 )?,
1414 measure_name
1415 );
1416 info!("{}", measure_untar);
1417
1418 create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1419
1420 let RebuiltSnapshotStorage {
1421 snapshot_version,
1422 storage,
1423 } = version_and_storages;
1424 Ok(UnarchivedSnapshot {
1425 unpack_dir,
1426 storage,
1427 unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1428 unpacked_snapshots_dir,
1429 snapshot_version,
1430 },
1431 measure_untar,
1432 })
1433}
1434
1435fn streaming_snapshot_dir_files(
1438 file_sender: Sender<PathBuf>,
1439 snapshot_file_path: impl Into<PathBuf>,
1440 snapshot_version_path: impl Into<PathBuf>,
1441 account_paths: &[PathBuf],
1442) -> Result<()> {
1443 file_sender.send(snapshot_file_path.into())?;
1444 file_sender.send(snapshot_version_path.into())?;
1445
1446 for account_path in account_paths {
1447 for file in fs::read_dir(account_path)? {
1448 file_sender.send(file?.path())?;
1449 }
1450 }
1451
1452 Ok(())
1453}
1454
1455pub fn rebuild_storages_from_snapshot_dir(
1460 snapshot_info: &BankSnapshotInfo,
1461 account_paths: &[PathBuf],
1462 next_append_vec_id: Arc<AtomicAppendVecId>,
1463) -> Result<AccountStorageMap> {
1464 let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1465 let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
1466 let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1467
1468 let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1469 IoError::other(format!(
1470 "failed to read accounts hardlinks dir '{}': {err}",
1471 accounts_hardlinks.display(),
1472 ))
1473 })?;
1474 for dir_entry in read_dir {
1475 let symlink_path = dir_entry?.path();
1476 let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1479 IoError::other(format!(
1480 "failed to read symlink '{}': {err}",
1481 symlink_path.display(),
1482 ))
1483 })?;
1484 let account_run_path = account_snapshot_path
1485 .parent()
1486 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1487 .parent()
1488 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1489 .join(ACCOUNTS_RUN_DIR);
1490 if !account_run_paths.contains(&account_run_path) {
1491 return Err(SnapshotError::AccountPathsMismatch);
1494 }
1495 let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1498 IoError::other(format!(
1499 "failed to read account snapshot dir '{}': {err}",
1500 account_snapshot_path.display(),
1501 ))
1502 })?;
1503 for file in read_dir {
1504 let file_path = file?.path();
1505 let file_name = file_path
1506 .file_name()
1507 .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1508 let dest_path = account_run_path.join(file_name);
1509 fs::hard_link(&file_path, &dest_path).map_err(|err| {
1510 IoError::other(format!(
1511 "failed to hard link from '{}' to '{}': {err}",
1512 file_path.display(),
1513 dest_path.display(),
1514 ))
1515 })?;
1516 }
1517 }
1518
1519 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1520 let snapshot_file_path = &snapshot_info.snapshot_path();
1521 let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
1522 streaming_snapshot_dir_files(
1523 file_sender,
1524 snapshot_file_path,
1525 snapshot_version_path,
1526 account_paths,
1527 )?;
1528
1529 let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1530 let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
1531 file_receiver,
1532 num_rebuilder_threads,
1533 next_append_vec_id,
1534 SnapshotFrom::Dir,
1535 )?;
1536
1537 let RebuiltSnapshotStorage {
1538 snapshot_version: _,
1539 storage,
1540 } = version_and_storages;
1541 Ok(storage)
1542}
1543
1544fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
1548 let file_metadata = fs::metadata(&path).map_err(|err| {
1550 IoError::other(format!(
1551 "failed to query snapshot version file metadata '{}': {err}",
1552 path.as_ref().display(),
1553 ))
1554 })?;
1555 let file_size = file_metadata.len();
1556 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1557 let error_message = format!(
1558 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1559 path.as_ref().display(),
1560 file_size,
1561 MAX_SNAPSHOT_VERSION_FILE_SIZE,
1562 );
1563 return Err(IoError::other(error_message).into());
1564 }
1565
1566 let mut snapshot_version = String::new();
1568 let mut file = fs::File::open(&path).map_err(|err| {
1569 IoError::other(format!(
1570 "failed to open snapshot version file '{}': {err}",
1571 path.as_ref().display()
1572 ))
1573 })?;
1574 file.read_to_string(&mut snapshot_version).map_err(|err| {
1575 IoError::other(format!(
1576 "failed to read snapshot version from file '{}': {err}",
1577 path.as_ref().display()
1578 ))
1579 })?;
1580
1581 Ok(snapshot_version.trim().to_string())
1582}
1583
1584fn check_are_snapshots_compatible(
1587 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1588 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1589) -> Result<()> {
1590 if incremental_snapshot_archive_info.is_none() {
1591 return Ok(());
1592 }
1593
1594 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1595
1596 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1597 .then_some(())
1598 .ok_or_else(|| {
1599 SnapshotError::MismatchedBaseSlot(
1600 full_snapshot_archive_info.slot(),
1601 incremental_snapshot_archive_info.base_slot(),
1602 )
1603 })
1604}
1605
1606pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
1608 path.file_name()
1609 .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))?
1610 .to_str()
1611 .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
1612}
1613
1614pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
1615 snapshot_archives_dir
1616 .as_ref()
1617 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
1618}
1619
1620pub fn build_full_snapshot_archive_path(
1623 full_snapshot_archives_dir: impl AsRef<Path>,
1624 slot: Slot,
1625 hash: &SnapshotHash,
1626 archive_format: ArchiveFormat,
1627) -> PathBuf {
1628 full_snapshot_archives_dir.as_ref().join(format!(
1629 "snapshot-{}-{}.{}",
1630 slot,
1631 hash.0,
1632 archive_format.extension(),
1633 ))
1634}
1635
1636pub fn build_incremental_snapshot_archive_path(
1640 incremental_snapshot_archives_dir: impl AsRef<Path>,
1641 base_slot: Slot,
1642 slot: Slot,
1643 hash: &SnapshotHash,
1644 archive_format: ArchiveFormat,
1645) -> PathBuf {
1646 incremental_snapshot_archives_dir.as_ref().join(format!(
1647 "incremental-snapshot-{}-{}-{}.{}",
1648 base_slot,
1649 slot,
1650 hash.0,
1651 archive_format.extension(),
1652 ))
1653}
1654
1655pub(crate) fn parse_full_snapshot_archive_filename(
1657 archive_filename: &str,
1658) -> Result<(Slot, SnapshotHash, ArchiveFormat)> {
1659 lazy_static! {
1660 static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1661 }
1662
1663 let do_parse = || {
1664 RE.captures(archive_filename).and_then(|captures| {
1665 let slot = captures
1666 .name("slot")
1667 .map(|x| x.as_str().parse::<Slot>())?
1668 .ok()?;
1669 let hash = captures
1670 .name("hash")
1671 .map(|x| x.as_str().parse::<Hash>())?
1672 .ok()?;
1673 let archive_format = captures
1674 .name("ext")
1675 .map(|x| x.as_str().parse::<ArchiveFormat>())?
1676 .ok()?;
1677
1678 Some((slot, SnapshotHash(hash), archive_format))
1679 })
1680 };
1681
1682 do_parse().ok_or_else(|| {
1683 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1684 })
1685}
1686
1687pub(crate) fn parse_incremental_snapshot_archive_filename(
1689 archive_filename: &str,
1690) -> Result<(Slot, Slot, SnapshotHash, ArchiveFormat)> {
1691 lazy_static! {
1692 static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap();
1693 }
1694
1695 let do_parse = || {
1696 RE.captures(archive_filename).and_then(|captures| {
1697 let base_slot = captures
1698 .name("base")
1699 .map(|x| x.as_str().parse::<Slot>())?
1700 .ok()?;
1701 let slot = captures
1702 .name("slot")
1703 .map(|x| x.as_str().parse::<Slot>())?
1704 .ok()?;
1705 let hash = captures
1706 .name("hash")
1707 .map(|x| x.as_str().parse::<Hash>())?
1708 .ok()?;
1709 let archive_format = captures
1710 .name("ext")
1711 .map(|x| x.as_str().parse::<ArchiveFormat>())?
1712 .ok()?;
1713
1714 Some((base_slot, slot, SnapshotHash(hash), archive_format))
1715 })
1716 };
1717
1718 do_parse().ok_or_else(|| {
1719 SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string())
1720 })
1721}
1722
1723fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
1725where
1726 F: Fn(PathBuf) -> Result<T>,
1727{
1728 let walk_dir = |dir: &Path| -> Vec<T> {
1729 let entry_iter = fs::read_dir(dir);
1730 match entry_iter {
1731 Err(err) => {
1732 info!(
1733 "Unable to read snapshot archives directory '{}': {err}",
1734 dir.display(),
1735 );
1736 vec![]
1737 }
1738 Ok(entries) => entries
1739 .filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
1740 .collect(),
1741 }
1742 };
1743
1744 let mut ret = walk_dir(snapshot_archives_dir);
1745 let remote_dir = build_snapshot_archives_remote_dir(snapshot_archives_dir);
1746 if remote_dir.exists() {
1747 ret.append(&mut walk_dir(remote_dir.as_ref()));
1748 }
1749 ret
1750}
1751
1752pub fn get_full_snapshot_archives(
1754 full_snapshot_archives_dir: impl AsRef<Path>,
1755) -> Vec<FullSnapshotArchiveInfo> {
1756 get_snapshot_archives(
1757 full_snapshot_archives_dir.as_ref(),
1758 FullSnapshotArchiveInfo::new_from_path,
1759 )
1760}
1761
1762pub fn get_incremental_snapshot_archives(
1764 incremental_snapshot_archives_dir: impl AsRef<Path>,
1765) -> Vec<IncrementalSnapshotArchiveInfo> {
1766 get_snapshot_archives(
1767 incremental_snapshot_archives_dir.as_ref(),
1768 IncrementalSnapshotArchiveInfo::new_from_path,
1769 )
1770}
1771
1772pub fn get_highest_full_snapshot_archive_slot(
1774 full_snapshot_archives_dir: impl AsRef<Path>,
1775) -> Option<Slot> {
1776 get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
1777 .map(|full_snapshot_archive_info| full_snapshot_archive_info.slot())
1778}
1779
1780pub fn get_highest_incremental_snapshot_archive_slot(
1783 incremental_snapshot_archives_dir: impl AsRef<Path>,
1784 full_snapshot_slot: Slot,
1785) -> Option<Slot> {
1786 get_highest_incremental_snapshot_archive_info(
1787 incremental_snapshot_archives_dir,
1788 full_snapshot_slot,
1789 )
1790 .map(|incremental_snapshot_archive_info| incremental_snapshot_archive_info.slot())
1791}
1792
1793pub fn get_highest_full_snapshot_archive_info(
1795 full_snapshot_archives_dir: impl AsRef<Path>,
1796) -> Option<FullSnapshotArchiveInfo> {
1797 let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
1798 full_snapshot_archives.sort_unstable();
1799 full_snapshot_archives.into_iter().next_back()
1800}
1801
1802pub fn get_highest_incremental_snapshot_archive_info(
1805 incremental_snapshot_archives_dir: impl AsRef<Path>,
1806 full_snapshot_slot: Slot,
1807) -> Option<IncrementalSnapshotArchiveInfo> {
1808 let mut incremental_snapshot_archives =
1812 get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
1813 .into_iter()
1814 .filter(|incremental_snapshot_archive_info| {
1815 incremental_snapshot_archive_info.base_slot() == full_snapshot_slot
1816 })
1817 .collect::<Vec<_>>();
1818 incremental_snapshot_archives.sort_unstable();
1819 incremental_snapshot_archives.into_iter().next_back()
1820}
1821
1822pub fn purge_old_snapshot_archives(
1823 full_snapshot_archives_dir: impl AsRef<Path>,
1824 incremental_snapshot_archives_dir: impl AsRef<Path>,
1825 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
1826 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
1827) {
1828 info!(
1829 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
1830 full_snapshot_archives_dir.as_ref().display(),
1831 maximum_full_snapshot_archives_to_retain
1832 );
1833
1834 let mut full_snapshot_archives = get_full_snapshot_archives(&full_snapshot_archives_dir);
1835 full_snapshot_archives.sort_unstable();
1836 full_snapshot_archives.reverse();
1837
1838 let num_to_retain = full_snapshot_archives
1839 .len()
1840 .min(maximum_full_snapshot_archives_to_retain.get());
1841 trace!(
1842 "There are {} full snapshot archives, retaining {}",
1843 full_snapshot_archives.len(),
1844 num_to_retain,
1845 );
1846
1847 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
1848 if full_snapshot_archives.is_empty() {
1849 None
1850 } else {
1851 Some(full_snapshot_archives.split_at(num_to_retain))
1852 }
1853 .unwrap_or_default();
1854
1855 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
1856 .iter()
1857 .map(|ai| ai.slot())
1858 .collect::<HashSet<_>>();
1859
1860 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
1861 for path in archives.iter().map(|a| a.path()) {
1862 trace!("Removing snapshot archive: {}", path.display());
1863 let result = fs::remove_file(path);
1864 if let Err(err) = result {
1865 info!(
1866 "Failed to remove snapshot archive '{}': {err}",
1867 path.display()
1868 );
1869 }
1870 }
1871 }
1872 remove_archives(full_snapshot_archives_to_remove);
1873
1874 info!(
1875 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
1876 incremental_snapshot_archives_dir.as_ref().display(),
1877 maximum_incremental_snapshot_archives_to_retain
1878 );
1879 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
1880 for incremental_snapshot_archive in
1881 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
1882 {
1883 incremental_snapshot_archives_by_base_slot
1884 .entry(incremental_snapshot_archive.base_slot())
1885 .or_default()
1886 .push(incremental_snapshot_archive)
1887 }
1888
1889 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
1890 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
1891 {
1892 incremental_snapshot_archives.sort_unstable();
1893 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
1894 maximum_incremental_snapshot_archives_to_retain.get()
1895 } else {
1896 usize::from(retained_full_snapshot_slots.contains(&base_slot))
1897 };
1898 trace!(
1899 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
1900 incremental_snapshot_archives.len(),
1901 base_slot,
1902 incremental_snapshot_archives
1903 .len()
1904 .saturating_sub(num_to_retain),
1905 );
1906
1907 incremental_snapshot_archives.truncate(
1908 incremental_snapshot_archives
1909 .len()
1910 .saturating_sub(num_to_retain),
1911 );
1912 remove_archives(&incremental_snapshot_archives);
1913 }
1914}
1915
#[cfg(feature = "dev-context-only-utils")]
/// Unpacks a snapshot archive from `shared_buffer` using `parallel_divisions` rayon workers,
/// each reading through its own `SharedBufferReader` with a matching `ParallelSelector`.
///
/// Every worker runs to completion. The per-worker maps are then merged in worker order, and
/// the first error encountered (in that order) is returned.
///
/// # Panics
/// Panics if `parallel_divisions` is zero.
fn unpack_snapshot_local(
    shared_buffer: SharedBuffer,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_divisions > 0);

    // All readers are attached to the shared buffer before any unpacking starts.
    let readers: Vec<SharedBufferReader> =
        std::iter::repeat_with(|| SharedBufferReader::new(&shared_buffer))
            .take(parallel_divisions)
            .collect();

    let per_reader_results: Vec<_> = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let mut archive = Archive::new(reader);
            hardened_unpack::unpack_snapshot(
                &mut archive,
                ledger_dir,
                account_paths,
                Some(ParallelSelector {
                    index,
                    divisions: parallel_divisions,
                }),
            )
        })
        .collect();

    let mut merged = UnpackedAppendVecMap::new();
    for reader_result in per_reader_results {
        merged.extend(reader_result?);
    }
    Ok(merged)
}
1956
1957fn untar_snapshot_create_shared_buffer(
1958 snapshot_tar: &Path,
1959 archive_format: ArchiveFormat,
1960) -> SharedBuffer {
1961 let open_file = || {
1962 fs::File::open(snapshot_tar)
1963 .map_err(|err| {
1964 IoError::other(format!(
1965 "failed to open snapshot archive '{}': {err}",
1966 snapshot_tar.display(),
1967 ))
1968 })
1969 .unwrap()
1970 };
1971 match archive_format {
1972 ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
1973 ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
1974 ArchiveFormat::TarZstd => SharedBuffer::new(
1975 zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
1976 ),
1977 ArchiveFormat::TarLz4 => {
1978 SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
1979 }
1980 ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
1981 }
1982}
1983
#[cfg(feature = "dev-context-only-utils")]
/// Unpacks the snapshot archive at `snapshot_tar` into `unpack_dir` / `account_paths` using
/// `parallel_divisions` parallel readers, returning the map of unpacked append vecs.
fn untar_snapshot_in(
    snapshot_tar: impl AsRef<Path>,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    let archive_path = snapshot_tar.as_ref();
    let shared_buffer = untar_snapshot_create_shared_buffer(archive_path, archive_format);
    unpack_snapshot_local(shared_buffer, unpack_dir, account_paths, parallel_divisions)
}
1995
1996pub fn verify_unpacked_snapshots_dir_and_version(
1997 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
1998) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
1999 info!(
2000 "snapshot version: {}",
2001 &unpacked_snapshots_dir_and_version.snapshot_version
2002 );
2003
2004 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
2005 let mut bank_snapshots =
2006 get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
2007 if bank_snapshots.len() > 1 {
2008 return Err(IoError::other(format!(
2009 "invalid snapshot format: only one snapshot allowed, but found {}",
2010 bank_snapshots.len(),
2011 ))
2012 .into());
2013 }
2014 let root_paths = bank_snapshots.pop().ok_or_else(|| {
2015 IoError::other(format!(
2016 "no snapshots found in snapshots directory '{}'",
2017 unpacked_snapshots_dir_and_version
2018 .unpacked_snapshots_dir
2019 .display(),
2020 ))
2021 })?;
2022 Ok((snapshot_version, root_paths))
2023}
2024
2025pub fn get_snapshot_file_name(slot: Slot) -> String {
2027 slot.to_string()
2028}
2029
2030pub fn get_bank_snapshot_dir(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) -> PathBuf {
2032 bank_snapshots_dir
2033 .as_ref()
2034 .join(get_snapshot_file_name(slot))
2035}
2036
/// Selects how `verify_snapshot_archive` treats the `<slot>/<slot>` bank file of a snapshot.
#[derive(Debug, Copy, Clone)]
pub enum VerifyBank {
    /// The bank file is left in place and is covered by the `dir_diff` comparison of the two
    /// snapshot directories.
    Deterministic,
    /// The two bank files are compared with `serde_snapshot::compare_two_serialized_banks`
    /// and then deleted, so they are excluded from the `dir_diff` comparison.
    NonDeterministic,
}
2046
/// Test helper: unpacks `snapshot_archive` into a temporary directory and asserts that
/// 1. its `snapshots/` tree matches the bank snapshot directory `snapshots_to_verify`, and
/// 2. its unpacked account files match copies of the storages that `snapshots_to_verify`'s
///    `accounts_hardlinks` symlinks point to.
///
/// Note: this mutates `snapshots_to_verify/<slot>`. It may delete the bank file
/// (`NonDeterministic`), the `accounts_hardlinks` directory, and the version and
/// state-complete files.
///
/// Panics (via `unwrap`/`assert!`) on any I/O error or mismatch.
#[cfg(feature = "dev-context-only-utils")]
pub fn verify_snapshot_archive(
    snapshot_archive: impl AsRef<Path>,
    snapshots_to_verify: impl AsRef<Path>,
    archive_format: ArchiveFormat,
    verify_bank: VerifyBank,
    slot: Slot,
) {
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    let unpack_account_dir = create_accounts_run_and_snapshot_dirs(unpack_dir).unwrap().0;
    // Single-threaded unpack (one parallel division).
    untar_snapshot_in(
        snapshot_archive,
        unpack_dir,
        &[unpack_account_dir.clone()],
        archive_format,
        1,
    )
    .unwrap();

    // Bank snapshot files from the archive live under `<unpack_dir>/snapshots`.
    let unpacked_snapshots = unpack_dir.join("snapshots");

    // Storages referenced by `snapshots_to_verify` are copied here for the final comparison.
    let storages_to_verify = unpack_dir.join("storages_to_verify");
    fs::create_dir_all(&storages_to_verify).unwrap();

    let slot = slot.to_string();
    let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);

    if let VerifyBank::NonDeterministic = verify_bank {
        // Compare the `<slot>/<slot>` bank files semantically, then delete both so the
        // directory diff below does not compare them.
        let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
        let p2 = unpacked_snapshots.join(&slot).join(&slot);
        assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
        fs::remove_file(p1).unwrap();
        fs::remove_file(p2).unwrap();
    }

    // The archive has the status cache directly under `snapshots/`; move it into the slot
    // directory so the layout matches `snapshots_to_verify`.
    let existing_unpacked_status_cache_file =
        unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
    let new_unpacked_status_cache_file = unpacked_snapshots
        .join(&slot)
        .join(SNAPSHOT_STATUS_CACHE_FILENAME);
    fs::rename(
        existing_unpacked_status_cache_file,
        new_unpacked_status_cache_file,
    )
    .unwrap();

    // Copy every storage file reachable through the accounts hard-link symlinks into
    // `storages_to_verify`, then remove the hard-link directory so it is not part of the
    // directory diff below.
    let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    if accounts_hardlinks_dir.is_dir() {
        for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
            // Each entry is a symlink to a directory of storage hard links.
            let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
            for entry in fs::read_dir(&link_dst_path).unwrap() {
                let src_path = entry.unwrap().path();
                let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
                fs::copy(src_path, dst_path).unwrap();
            }
        }
        fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
    }

    // Remove the version and state-complete files, if present, from the directory being
    // verified before the diff. NOTE(review): presumably the unpacked `snapshots/<slot>` has no
    // counterpart for these; confirm.
    let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
    if version_path.is_file() {
        fs::remove_file(version_path).unwrap();
    }

    let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
    if state_complete_path.is_file() {
        fs::remove_file(state_complete_path).unwrap();
    }

    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // Best-effort removal of `<unpack_account_dir>/accounts`; the result is deliberately
    // ignored (`fs::remove_dir` only succeeds on an empty directory).
    _ = fs::remove_dir(unpack_account_dir.join("accounts"));
    // The unpacked account files must match the copied storages.
    assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
2140
2141pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
2143 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2144 purge_bank_snapshots(&bank_snapshots);
2145}
2146
2147pub fn purge_old_bank_snapshots(
2149 bank_snapshots_dir: impl AsRef<Path>,
2150 num_bank_snapshots_to_retain: usize,
2151 filter_by_kind: Option<BankSnapshotKind>,
2152) {
2153 let mut bank_snapshots = match filter_by_kind {
2154 Some(BankSnapshotKind::Pre) => get_bank_snapshots_pre(&bank_snapshots_dir),
2155 Some(BankSnapshotKind::Post) => get_bank_snapshots_post(&bank_snapshots_dir),
2156 None => get_bank_snapshots(&bank_snapshots_dir),
2157 };
2158
2159 bank_snapshots.sort_unstable();
2160 purge_bank_snapshots(
2161 bank_snapshots
2162 .iter()
2163 .rev()
2164 .skip(num_bank_snapshots_to_retain),
2165 );
2166}
2167
2168pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
2173 purge_old_bank_snapshots(&bank_snapshots_dir, 0, Some(BankSnapshotKind::Pre));
2174 purge_old_bank_snapshots(&bank_snapshots_dir, 1, Some(BankSnapshotKind::Post));
2175
2176 let highest_bank_snapshot_post = get_highest_bank_snapshot_post(&bank_snapshots_dir);
2177 if let Some(highest_bank_snapshot_post) = highest_bank_snapshot_post {
2178 debug!(
2179 "Retained bank snapshot for slot {}, and purged the rest.",
2180 highest_bank_snapshot_post.slot
2181 );
2182 }
2183}
2184
2185pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
2187 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
2188 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
2189 purge_bank_snapshots(&bank_snapshots);
2190}
2191
2192fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
2196 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
2197 if purge_bank_snapshot(snapshot_dir).is_err() {
2198 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
2199 }
2200 }
2201}
2202
2203pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
2205 const FN_ERR: &str = "failed to purge bank snapshot";
2206 let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
2207 if accounts_hardlinks_dir.is_dir() {
2208 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
2211 IoError::other(format!(
2212 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
2213 accounts_hardlinks_dir.display(),
2214 ))
2215 })?;
2216 for entry in read_dir {
2217 let accounts_hardlink_dir = entry?.path();
2218 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
2219 IoError::other(format!(
2220 "{FN_ERR}: failed to read symlink '{}': {err}",
2221 accounts_hardlink_dir.display(),
2222 ))
2223 })?;
2224 move_and_async_delete_path(&accounts_hardlink_dir);
2225 }
2226 }
2227 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
2228 IoError::other(format!(
2229 "{FN_ERR}: failed to remove dir '{}': {err}",
2230 bank_snapshot_dir.as_ref().display(),
2231 ))
2232 })?;
2233 Ok(())
2234}
2235
2236pub fn should_take_full_snapshot(
2237 block_height: Slot,
2238 full_snapshot_archive_interval_slots: Slot,
2239) -> bool {
2240 block_height % full_snapshot_archive_interval_slots == 0
2241}
2242
2243pub fn should_take_incremental_snapshot(
2244 block_height: Slot,
2245 incremental_snapshot_archive_interval_slots: Slot,
2246 last_full_snapshot_slot: Option<Slot>,
2247) -> bool {
2248 block_height % incremental_snapshot_archive_interval_slots == 0
2249 && last_full_snapshot_slot.is_some()
2250}
2251
/// Creates a temporary directory and sets up the accounts run/snapshot directories in it via
/// `create_accounts_run_and_snapshot_dirs`, for use in tests.
///
/// Returns the `TempDir` guard (the directory is deleted when it is dropped, so keep it alive)
/// together with the first path returned by `create_accounts_run_and_snapshot_dirs`.
///
/// # Panics
///
/// Panics if the temporary directory or the accounts directories cannot be created.
#[cfg(feature = "dev-context-only-utils")]
pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
    let tmp_dir = TempDir::new().unwrap();
    let created_dirs = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap();
    (tmp_dir, created_dirs.0)
}
2262
2263#[cfg(test)]
2264mod tests {
2265 use {
2266 super::*,
2267 assert_matches::assert_matches,
2268 bincode::{deserialize_from, serialize_into},
2269 std::{convert::TryFrom, mem::size_of},
2270 tempfile::NamedTempFile,
2271 };
2272 #[test]
2273 fn test_serialize_snapshot_data_file_under_limit() {
2274 let temp_dir = tempfile::TempDir::new().unwrap();
2275 let expected_consumed_size = size_of::<u32>() as u64;
2276 let consumed_size = serialize_snapshot_data_file_capped(
2277 &temp_dir.path().join("data-file"),
2278 expected_consumed_size,
2279 |stream| {
2280 serialize_into(stream, &2323_u32)?;
2281 Ok(())
2282 },
2283 )
2284 .unwrap();
2285 assert_eq!(consumed_size, expected_consumed_size);
2286 }
2287
2288 #[test]
2289 fn test_serialize_snapshot_data_file_over_limit() {
2290 let temp_dir = tempfile::TempDir::new().unwrap();
2291 let expected_consumed_size = size_of::<u32>() as u64;
2292 let result = serialize_snapshot_data_file_capped(
2293 &temp_dir.path().join("data-file"),
2294 expected_consumed_size - 1,
2295 |stream| {
2296 serialize_into(stream, &2323_u32)?;
2297 Ok(())
2298 },
2299 );
2300 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
2301 }
2302
2303 #[test]
2304 fn test_deserialize_snapshot_data_file_under_limit() {
2305 let expected_data = 2323_u32;
2306 let expected_consumed_size = size_of::<u32>() as u64;
2307
2308 let temp_dir = tempfile::TempDir::new().unwrap();
2309 serialize_snapshot_data_file_capped(
2310 &temp_dir.path().join("data-file"),
2311 expected_consumed_size,
2312 |stream| {
2313 serialize_into(stream, &expected_data)?;
2314 Ok(())
2315 },
2316 )
2317 .unwrap();
2318
2319 let snapshot_root_paths = SnapshotRootPaths {
2320 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2321 incremental_snapshot_root_file_path: None,
2322 };
2323
2324 let actual_data = deserialize_snapshot_data_files_capped(
2325 &snapshot_root_paths,
2326 expected_consumed_size,
2327 |stream| {
2328 Ok(deserialize_from::<_, u32>(
2329 &mut stream.full_snapshot_stream,
2330 )?)
2331 },
2332 )
2333 .unwrap();
2334 assert_eq!(actual_data, expected_data);
2335 }
2336
2337 #[test]
2338 fn test_deserialize_snapshot_data_file_over_limit() {
2339 let expected_data = 2323_u32;
2340 let expected_consumed_size = size_of::<u32>() as u64;
2341
2342 let temp_dir = tempfile::TempDir::new().unwrap();
2343 serialize_snapshot_data_file_capped(
2344 &temp_dir.path().join("data-file"),
2345 expected_consumed_size,
2346 |stream| {
2347 serialize_into(stream, &expected_data)?;
2348 Ok(())
2349 },
2350 )
2351 .unwrap();
2352
2353 let snapshot_root_paths = SnapshotRootPaths {
2354 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2355 incremental_snapshot_root_file_path: None,
2356 };
2357
2358 let result = deserialize_snapshot_data_files_capped(
2359 &snapshot_root_paths,
2360 expected_consumed_size - 1,
2361 |stream| {
2362 Ok(deserialize_from::<_, u32>(
2363 &mut stream.full_snapshot_stream,
2364 )?)
2365 },
2366 );
2367 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
2368 }
2369
2370 #[test]
2371 fn test_deserialize_snapshot_data_file_extra_data() {
2372 let expected_data = 2323_u32;
2373 let expected_consumed_size = size_of::<u32>() as u64;
2374
2375 let temp_dir = tempfile::TempDir::new().unwrap();
2376 serialize_snapshot_data_file_capped(
2377 &temp_dir.path().join("data-file"),
2378 expected_consumed_size * 2,
2379 |stream| {
2380 serialize_into(stream.by_ref(), &expected_data)?;
2381 serialize_into(stream.by_ref(), &expected_data)?;
2382 Ok(())
2383 },
2384 )
2385 .unwrap();
2386
2387 let snapshot_root_paths = SnapshotRootPaths {
2388 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2389 incremental_snapshot_root_file_path: None,
2390 };
2391
2392 let result = deserialize_snapshot_data_files_capped(
2393 &snapshot_root_paths,
2394 expected_consumed_size * 2,
2395 |stream| {
2396 Ok(deserialize_from::<_, u32>(
2397 &mut stream.full_snapshot_stream,
2398 )?)
2399 },
2400 );
2401 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2402 }
2403
2404 #[test]
2405 fn test_snapshot_version_from_file_under_limit() {
2406 let file_content = SnapshotVersion::default().as_str();
2407 let mut file = NamedTempFile::new().unwrap();
2408 file.write_all(file_content.as_bytes()).unwrap();
2409 let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2410 assert_eq!(version_from_file, file_content);
2411 }
2412
2413 #[test]
2414 fn test_snapshot_version_from_file_over_limit() {
2415 let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2416 let file_content = vec![7u8; over_limit_size];
2417 let mut file = NamedTempFile::new().unwrap();
2418 file.write_all(&file_content).unwrap();
2419 assert_matches!(
2420 snapshot_version_from_file(file.path()),
2421 Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("snapshot version file too large")
2422 );
2423 }
2424
2425 #[test]
2426 fn test_parse_full_snapshot_archive_filename() {
2427 assert_eq!(
2428 parse_full_snapshot_archive_filename(&format!(
2429 "snapshot-42-{}.tar.bz2",
2430 Hash::default()
2431 ))
2432 .unwrap(),
2433 (42, SnapshotHash(Hash::default()), ArchiveFormat::TarBzip2)
2434 );
2435 assert_eq!(
2436 parse_full_snapshot_archive_filename(&format!(
2437 "snapshot-43-{}.tar.zst",
2438 Hash::default()
2439 ))
2440 .unwrap(),
2441 (43, SnapshotHash(Hash::default()), ArchiveFormat::TarZstd)
2442 );
2443 assert_eq!(
2444 parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default()))
2445 .unwrap(),
2446 (44, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2447 );
2448 assert_eq!(
2449 parse_full_snapshot_archive_filename(&format!(
2450 "snapshot-45-{}.tar.lz4",
2451 Hash::default()
2452 ))
2453 .unwrap(),
2454 (45, SnapshotHash(Hash::default()), ArchiveFormat::TarLz4)
2455 );
2456
2457 assert!(parse_full_snapshot_archive_filename("invalid").is_err());
2458 assert!(
2459 parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err()
2460 );
2461
2462 assert!(
2463 parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err()
2464 );
2465 assert!(parse_full_snapshot_archive_filename(&format!(
2466 "snapshot-12345678-{}.bad!ext",
2467 Hash::new_unique()
2468 ))
2469 .is_err());
2470 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2471
2472 assert!(parse_full_snapshot_archive_filename(&format!(
2473 "snapshot-bad!slot-{}.bad!ext",
2474 Hash::new_unique()
2475 ))
2476 .is_err());
2477 assert!(parse_full_snapshot_archive_filename(&format!(
2478 "snapshot-12345678-{}.bad!ext",
2479 Hash::new_unique()
2480 ))
2481 .is_err());
2482 assert!(parse_full_snapshot_archive_filename(&format!(
2483 "snapshot-bad!slot-{}.tar",
2484 Hash::new_unique()
2485 ))
2486 .is_err());
2487
2488 assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err());
2489 assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err());
2490 assert!(parse_full_snapshot_archive_filename(&format!(
2491 "snapshot-bad!slot-{}.tar",
2492 Hash::new_unique()
2493 ))
2494 .is_err());
2495 }
2496
2497 #[test]
2498 fn test_parse_incremental_snapshot_archive_filename() {
2499 assert_eq!(
2500 parse_incremental_snapshot_archive_filename(&format!(
2501 "incremental-snapshot-42-123-{}.tar.bz2",
2502 Hash::default()
2503 ))
2504 .unwrap(),
2505 (
2506 42,
2507 123,
2508 SnapshotHash(Hash::default()),
2509 ArchiveFormat::TarBzip2
2510 )
2511 );
2512 assert_eq!(
2513 parse_incremental_snapshot_archive_filename(&format!(
2514 "incremental-snapshot-43-234-{}.tar.zst",
2515 Hash::default()
2516 ))
2517 .unwrap(),
2518 (
2519 43,
2520 234,
2521 SnapshotHash(Hash::default()),
2522 ArchiveFormat::TarZstd
2523 )
2524 );
2525 assert_eq!(
2526 parse_incremental_snapshot_archive_filename(&format!(
2527 "incremental-snapshot-44-345-{}.tar",
2528 Hash::default()
2529 ))
2530 .unwrap(),
2531 (44, 345, SnapshotHash(Hash::default()), ArchiveFormat::Tar)
2532 );
2533 assert_eq!(
2534 parse_incremental_snapshot_archive_filename(&format!(
2535 "incremental-snapshot-45-456-{}.tar.lz4",
2536 Hash::default()
2537 ))
2538 .unwrap(),
2539 (
2540 45,
2541 456,
2542 SnapshotHash(Hash::default()),
2543 ArchiveFormat::TarLz4
2544 )
2545 );
2546
2547 assert!(parse_incremental_snapshot_archive_filename("invalid").is_err());
2548 assert!(parse_incremental_snapshot_archive_filename(&format!(
2549 "snapshot-42-{}.tar",
2550 Hash::new_unique()
2551 ))
2552 .is_err());
2553 assert!(parse_incremental_snapshot_archive_filename(
2554 "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext"
2555 )
2556 .is_err());
2557
2558 assert!(parse_incremental_snapshot_archive_filename(&format!(
2559 "incremental-snapshot-bad!slot-56785678-{}.tar",
2560 Hash::new_unique()
2561 ))
2562 .is_err());
2563
2564 assert!(parse_incremental_snapshot_archive_filename(&format!(
2565 "incremental-snapshot-12345678-bad!slot-{}.tar",
2566 Hash::new_unique()
2567 ))
2568 .is_err());
2569
2570 assert!(parse_incremental_snapshot_archive_filename(
2571 "incremental-snapshot-12341234-56785678-bad!HASH.tar"
2572 )
2573 .is_err());
2574
2575 assert!(parse_incremental_snapshot_archive_filename(&format!(
2576 "incremental-snapshot-12341234-56785678-{}.bad!ext",
2577 Hash::new_unique()
2578 ))
2579 .is_err());
2580 }
2581
2582 #[test]
2583 fn test_check_are_snapshots_compatible() {
2584 let slot1: Slot = 1234;
2585 let slot2: Slot = 5678;
2586 let slot3: Slot = 999_999;
2587
2588 let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2589 format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()),
2590 ))
2591 .unwrap();
2592
2593 assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2594
2595 let incremental_snapshot_archive_info =
2596 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2597 "/dir/incremental-snapshot-{}-{}-{}.tar",
2598 slot1,
2599 slot2,
2600 Hash::new_unique()
2601 )))
2602 .unwrap();
2603
2604 assert!(check_are_snapshots_compatible(
2605 &full_snapshot_archive_info,
2606 Some(&incremental_snapshot_archive_info)
2607 )
2608 .is_ok());
2609
2610 let incremental_snapshot_archive_info =
2611 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2612 "/dir/incremental-snapshot-{}-{}-{}.tar",
2613 slot2,
2614 slot3,
2615 Hash::new_unique()
2616 )))
2617 .unwrap();
2618
2619 assert!(check_are_snapshots_compatible(
2620 &full_snapshot_archive_info,
2621 Some(&incremental_snapshot_archive_info)
2622 )
2623 .is_err());
2624 }
2625
2626 fn common_create_bank_snapshot_files(
2628 bank_snapshots_dir: &Path,
2629 min_slot: Slot,
2630 max_slot: Slot,
2631 ) {
2632 for slot in min_slot..max_slot {
2633 let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
2634 fs::create_dir_all(&snapshot_dir).unwrap();
2635
2636 let snapshot_filename = get_snapshot_file_name(slot);
2637 let snapshot_path = snapshot_dir.join(snapshot_filename);
2638 fs::File::create(snapshot_path).unwrap();
2639
2640 let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
2641 fs::File::create(status_cache_file).unwrap();
2642
2643 let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
2644 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2645
2646 let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
2648 fs::File::create(state_complete_path).unwrap();
2649 }
2650 }
2651
2652 #[test]
2653 fn test_get_bank_snapshots() {
2654 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2655 let min_slot = 10;
2656 let max_slot = 20;
2657 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2658
2659 let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2660 assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2661 }
2662
2663 #[test]
2664 fn test_get_highest_bank_snapshot_post() {
2665 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2666 let min_slot = 99;
2667 let max_slot = 123;
2668 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2669
2670 let highest_bank_snapshot = get_highest_bank_snapshot_post(temp_snapshots_dir.path());
2671 assert!(highest_bank_snapshot.is_some());
2672 assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2673 }
2674
    /// Creates empty snapshot archive files for tests (all ranges are end-exclusive).
    ///
    /// For every slot in `min_full_snapshot_slot..max_full_snapshot_slot`:
    /// - a full snapshot archive is created in `full_snapshot_archives_dir`;
    /// - for every slot in `min_incremental_snapshot_slot..max_incremental_snapshot_slot`, an
    ///   incremental snapshot archive based on that full slot is created in
    ///   `incremental_snapshot_archives_dir`.
    ///
    /// It also creates archives whose hash component is the literal text "bad!hash", at slots
    /// just outside the valid ranges: one incremental per full slot and one full overall. The
    /// callers' count and highest-slot assertions rely on these being ignored.
    fn common_create_snapshot_archive_files(
        full_snapshot_archives_dir: &Path,
        incremental_snapshot_archives_dir: &Path,
        min_full_snapshot_slot: Slot,
        max_full_snapshot_slot: Slot,
        min_incremental_snapshot_slot: Slot,
        max_incremental_snapshot_slot: Slot,
    ) {
        fs::create_dir_all(full_snapshot_archives_dir).unwrap();
        fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
        for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
            for incremental_snapshot_slot in
                min_incremental_snapshot_slot..max_incremental_snapshot_slot
            {
                let snapshot_filename = format!(
                    "incremental-snapshot-{}-{}-{}.tar",
                    full_snapshot_slot,
                    incremental_snapshot_slot,
                    Hash::default()
                );
                let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
                fs::File::create(snapshot_filepath).unwrap();
            }

            let snapshot_filename =
                format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
            let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
            fs::File::create(snapshot_filepath).unwrap();

            // Malformed incremental archive (invalid hash) at a slot past the valid range.
            let bad_filename = format!(
                "incremental-snapshot-{}-{}-bad!hash.tar",
                full_snapshot_slot,
                max_incremental_snapshot_slot + 1,
            );
            let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
            fs::File::create(bad_filepath).unwrap();
        }

        // Malformed full archive (invalid hash) at a slot past the valid range.
        let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
        let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
        fs::File::create(bad_filepath).unwrap();
    }
2725
2726 #[test]
2727 fn test_get_full_snapshot_archives() {
2728 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2729 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2730 let min_slot = 123;
2731 let max_slot = 456;
2732 common_create_snapshot_archive_files(
2733 full_snapshot_archives_dir.path(),
2734 incremental_snapshot_archives_dir.path(),
2735 min_slot,
2736 max_slot,
2737 0,
2738 0,
2739 );
2740
2741 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2742 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
2743 }
2744
2745 #[test]
2746 fn test_get_full_snapshot_archives_remote() {
2747 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2748 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2749 let min_slot = 123;
2750 let max_slot = 456;
2751 common_create_snapshot_archive_files(
2752 &full_snapshot_archives_dir
2753 .path()
2754 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2755 &incremental_snapshot_archives_dir
2756 .path()
2757 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2758 min_slot,
2759 max_slot,
2760 0,
2761 0,
2762 );
2763
2764 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2765 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
2766 assert!(snapshot_archives.iter().all(|info| info.is_remote()));
2767 }
2768
2769 #[test]
2770 fn test_get_incremental_snapshot_archives() {
2771 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2772 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2773 let min_full_snapshot_slot = 12;
2774 let max_full_snapshot_slot = 23;
2775 let min_incremental_snapshot_slot = 34;
2776 let max_incremental_snapshot_slot = 45;
2777 common_create_snapshot_archive_files(
2778 full_snapshot_archives_dir.path(),
2779 incremental_snapshot_archives_dir.path(),
2780 min_full_snapshot_slot,
2781 max_full_snapshot_slot,
2782 min_incremental_snapshot_slot,
2783 max_incremental_snapshot_slot,
2784 );
2785
2786 let incremental_snapshot_archives =
2787 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
2788 assert_eq!(
2789 incremental_snapshot_archives.len() as Slot,
2790 (max_full_snapshot_slot - min_full_snapshot_slot)
2791 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
2792 );
2793 }
2794
2795 #[test]
2796 fn test_get_incremental_snapshot_archives_remote() {
2797 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2798 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2799 let min_full_snapshot_slot = 12;
2800 let max_full_snapshot_slot = 23;
2801 let min_incremental_snapshot_slot = 34;
2802 let max_incremental_snapshot_slot = 45;
2803 common_create_snapshot_archive_files(
2804 &full_snapshot_archives_dir
2805 .path()
2806 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2807 &incremental_snapshot_archives_dir
2808 .path()
2809 .join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2810 min_full_snapshot_slot,
2811 max_full_snapshot_slot,
2812 min_incremental_snapshot_slot,
2813 max_incremental_snapshot_slot,
2814 );
2815
2816 let incremental_snapshot_archives =
2817 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
2818 assert_eq!(
2819 incremental_snapshot_archives.len() as Slot,
2820 (max_full_snapshot_slot - min_full_snapshot_slot)
2821 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
2822 );
2823 assert!(incremental_snapshot_archives
2824 .iter()
2825 .all(|info| info.is_remote()));
2826 }
2827
2828 #[test]
2829 fn test_get_highest_full_snapshot_archive_slot() {
2830 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2831 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2832 let min_slot = 123;
2833 let max_slot = 456;
2834 common_create_snapshot_archive_files(
2835 full_snapshot_archives_dir.path(),
2836 incremental_snapshot_archives_dir.path(),
2837 min_slot,
2838 max_slot,
2839 0,
2840 0,
2841 );
2842
2843 assert_eq!(
2844 get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
2845 Some(max_slot - 1)
2846 );
2847 }
2848
2849 #[test]
2850 fn test_get_highest_incremental_snapshot_slot() {
2851 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2852 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2853 let min_full_snapshot_slot = 12;
2854 let max_full_snapshot_slot = 23;
2855 let min_incremental_snapshot_slot = 34;
2856 let max_incremental_snapshot_slot = 45;
2857 common_create_snapshot_archive_files(
2858 full_snapshot_archives_dir.path(),
2859 incremental_snapshot_archives_dir.path(),
2860 min_full_snapshot_slot,
2861 max_full_snapshot_slot,
2862 min_incremental_snapshot_slot,
2863 max_incremental_snapshot_slot,
2864 );
2865
2866 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2867 assert_eq!(
2868 get_highest_incremental_snapshot_archive_slot(
2869 incremental_snapshot_archives_dir.path(),
2870 full_snapshot_slot
2871 ),
2872 Some(max_incremental_snapshot_slot - 1)
2873 );
2874 }
2875
2876 assert_eq!(
2877 get_highest_incremental_snapshot_archive_slot(
2878 incremental_snapshot_archives_dir.path(),
2879 max_full_snapshot_slot
2880 ),
2881 None
2882 );
2883 }
2884
2885 fn common_test_purge_old_snapshot_archives(
2886 snapshot_names: &[&String],
2887 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2888 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2889 expected_snapshots: &[&String],
2890 ) {
2891 let temp_snap_dir = tempfile::TempDir::new().unwrap();
2892
2893 for snap_name in snapshot_names {
2894 let snap_path = temp_snap_dir.path().join(snap_name);
2895 let mut _snap_file = fs::File::create(snap_path);
2896 }
2897 purge_old_snapshot_archives(
2898 temp_snap_dir.path(),
2899 temp_snap_dir.path(),
2900 maximum_full_snapshot_archives_to_retain,
2901 maximum_incremental_snapshot_archives_to_retain,
2902 );
2903
2904 let mut retained_snaps = HashSet::new();
2905 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
2906 let entry_path_buf = entry.unwrap().path();
2907 let entry_path = entry_path_buf.as_path();
2908 let snapshot_name = entry_path
2909 .file_name()
2910 .unwrap()
2911 .to_str()
2912 .unwrap()
2913 .to_string();
2914 retained_snaps.insert(snapshot_name);
2915 }
2916
2917 for snap_name in expected_snapshots {
2918 assert!(
2919 retained_snaps.contains(snap_name.as_str()),
2920 "{snap_name} not found"
2921 );
2922 }
2923 assert_eq!(retained_snaps.len(), expected_snapshots.len());
2924 }
2925
2926 #[test]
2927 fn test_purge_old_full_snapshot_archives() {
2928 let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
2929 let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
2930 let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
2931 let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
2932
2933 let expected_snapshots = vec![&snap3_name];
2935 common_test_purge_old_snapshot_archives(
2936 &snapshot_names,
2937 NonZeroUsize::new(1).unwrap(),
2938 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2939 &expected_snapshots,
2940 );
2941
2942 let expected_snapshots = vec![&snap2_name, &snap3_name];
2944 common_test_purge_old_snapshot_archives(
2945 &snapshot_names,
2946 NonZeroUsize::new(2).unwrap(),
2947 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2948 &expected_snapshots,
2949 );
2950
2951 let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
2953 common_test_purge_old_snapshot_archives(
2954 &snapshot_names,
2955 NonZeroUsize::new(3).unwrap(),
2956 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2957 &expected_snapshots,
2958 );
2959 }
2960
    #[test]
    fn test_purge_old_full_snapshot_archives_in_the_loop() {
        // Simulates a node producing one full snapshot archive per slot and periodically
        // purging. After each purge, exactly the newest `maximum_snapshots_to_retain` archives,
        // at consecutive slots ending at the current one, must remain.
        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
        let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
        let starting_slot: Slot = 42;

        for slot in (starting_slot..).take(100) {
            let full_snapshot_archive_file_name =
                format!("snapshot-{}-{}.tar", slot, Hash::default());
            let full_snapshot_archive_path = full_snapshot_archives_dir
                .as_ref()
                .join(full_snapshot_archive_file_name);
            fs::File::create(full_snapshot_archive_path).unwrap();

            // Don't purge until at least `maximum_snapshots_to_retain` archives exist, so the
            // length assertion below is meaningful.
            if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
                continue;
            }

            // Only purge every `2 * maximum_snapshots_to_retain` slots, so each purge has more
            // archives than the retention limit to remove.
            if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
                continue;
            }

            purge_old_snapshot_archives(
                &full_snapshot_archives_dir,
                &incremental_snapshot_archives_dir,
                maximum_snapshots_to_retain,
                NonZeroUsize::new(usize::MAX).unwrap(),
            );
            let mut full_snapshot_archives =
                get_full_snapshot_archives(&full_snapshot_archives_dir);
            full_snapshot_archives.sort_unstable();
            assert_eq!(
                full_snapshot_archives.len(),
                maximum_snapshots_to_retain.get()
            );
            assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
            // Remaining archives are the consecutive slots `slot, slot - 1, ...`.
            for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
                assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
            }
        }
    }
3008
3009 #[test]
3010 fn test_purge_old_incremental_snapshot_archives() {
3011 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3012 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
3013 let starting_slot = 100_000;
3014
3015 let maximum_incremental_snapshot_archives_to_retain =
3016 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3017 let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
3018
3019 let incremental_snapshot_interval = 100;
3020 let num_incremental_snapshots_per_full_snapshot =
3021 maximum_incremental_snapshot_archives_to_retain.get() * 2;
3022 let full_snapshot_interval =
3023 incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
3024
3025 let mut snapshot_filenames = vec![];
3026 (starting_slot..)
3027 .step_by(full_snapshot_interval)
3028 .take(
3029 maximum_full_snapshot_archives_to_retain
3030 .checked_mul(NonZeroUsize::new(2).unwrap())
3031 .unwrap()
3032 .get(),
3033 )
3034 .for_each(|full_snapshot_slot| {
3035 let snapshot_filename =
3036 format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
3037 let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
3038 fs::File::create(snapshot_path).unwrap();
3039 snapshot_filenames.push(snapshot_filename);
3040
3041 (full_snapshot_slot..)
3042 .step_by(incremental_snapshot_interval)
3043 .take(num_incremental_snapshots_per_full_snapshot)
3044 .skip(1)
3045 .for_each(|incremental_snapshot_slot| {
3046 let snapshot_filename = format!(
3047 "incremental-snapshot-{}-{}-{}.tar",
3048 full_snapshot_slot,
3049 incremental_snapshot_slot,
3050 Hash::default()
3051 );
3052 let snapshot_path = incremental_snapshot_archives_dir
3053 .path()
3054 .join(&snapshot_filename);
3055 fs::File::create(snapshot_path).unwrap();
3056 snapshot_filenames.push(snapshot_filename);
3057 });
3058 });
3059
3060 purge_old_snapshot_archives(
3061 full_snapshot_archives_dir.path(),
3062 incremental_snapshot_archives_dir.path(),
3063 maximum_full_snapshot_archives_to_retain,
3064 maximum_incremental_snapshot_archives_to_retain,
3065 );
3066
3067 let mut remaining_full_snapshot_archives =
3069 get_full_snapshot_archives(full_snapshot_archives_dir.path());
3070 assert_eq!(
3071 remaining_full_snapshot_archives.len(),
3072 maximum_full_snapshot_archives_to_retain.get(),
3073 );
3074 remaining_full_snapshot_archives.sort_unstable();
3075 let latest_full_snapshot_archive_slot =
3076 remaining_full_snapshot_archives.last().unwrap().slot();
3077
3078 let mut remaining_incremental_snapshot_archives =
3083 get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
3084 assert_eq!(
3085 remaining_incremental_snapshot_archives.len(),
3086 maximum_incremental_snapshot_archives_to_retain
3087 .get()
3088 .saturating_add(
3089 maximum_full_snapshot_archives_to_retain
3090 .get()
3091 .saturating_sub(1)
3092 )
3093 );
3094 remaining_incremental_snapshot_archives.sort_unstable();
3095 remaining_incremental_snapshot_archives.reverse();
3096
3097 for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
3099 let incremental_snapshot_archive =
3100 remaining_incremental_snapshot_archives.pop().unwrap();
3101
3102 let expected_base_slot =
3103 latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
3104 assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
3105 let expected_slot = expected_base_slot
3106 + (full_snapshot_interval - incremental_snapshot_interval) as u64;
3107 assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
3108 }
3109
3110 for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
3112 assert_eq!(
3113 incremental_snapshot_archive.base_slot(),
3114 latest_full_snapshot_archive_slot
3115 );
3116 }
3117
3118 let expected_remaining_incremental_snapshot_archive_slots =
3120 (latest_full_snapshot_archive_slot..)
3121 .step_by(incremental_snapshot_interval)
3122 .take(num_incremental_snapshots_per_full_snapshot)
3123 .skip(
3124 num_incremental_snapshots_per_full_snapshot
3125 - maximum_incremental_snapshot_archives_to_retain.get(),
3126 )
3127 .collect::<HashSet<_>>();
3128
3129 let actual_remaining_incremental_snapshot_archive_slots =
3130 remaining_incremental_snapshot_archives
3131 .iter()
3132 .map(|snapshot| snapshot.slot())
3133 .collect::<HashSet<_>>();
3134 assert_eq!(
3135 actual_remaining_incremental_snapshot_archive_slots,
3136 expected_remaining_incremental_snapshot_archive_slots
3137 );
3138 }
3139
/// With the full-snapshot archive directory left empty, purging must remove every
/// incremental snapshot archive, even when both retention limits are `usize::MAX`.
#[test]
fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
    let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
    let incremental_dir = incremental_snapshot_archives_dir.path();

    // Two groups of incremental archives, based on slots 100 and 200; no full archive
    // for either base slot is ever created.
    let archive_filenames = [
        format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
        format!("incremental-snapshot-100-180-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-220-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-240-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
        format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
    ];
    for archive_filename in &archive_filenames {
        fs::File::create(incremental_dir.join(archive_filename)).unwrap();
    }

    // Unlimited retention: anything removed is removed for lack of a full archive.
    let retain_everything = NonZeroUsize::new(usize::MAX).unwrap();
    purge_old_snapshot_archives(
        full_snapshot_archives_dir.path(),
        incremental_dir,
        retain_everything,
        retain_everything,
    );

    assert!(get_incremental_snapshot_archives(incremental_dir).is_empty());
}
3172
/// `get_snapshot_accounts_hardlink_dir` succeeds for an appendvec path located inside the
/// directory returned by `create_tmp_accounts_dir_for_tests`, and fails with
/// `GetSnapshotAccountsHardLinkDirError::GetAccountPath` when the same file name is placed
/// one directory level higher (presumably because no account path can be derived from it).
#[test]
fn test_get_snapshot_accounts_hardlink_dir() {
    let slot: Slot = 1;

    let mut account_paths_set: HashSet<PathBuf> = HashSet::new();

    // Creates `<bank_snapshots_dir>/<slot>/accounts_hardlinks`.
    let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
    let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
    let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
    fs::create_dir_all(&accounts_hardlinks_dir).unwrap();

    // `_tmp_dir` keeps the temporary accounts directory alive for the whole test.
    let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
    let appendvec_filename = format!("{slot}.0");
    let appendvec_path = accounts_dir.join(appendvec_filename);

    let ret = get_snapshot_accounts_hardlink_dir(
        &appendvec_path,
        slot,
        &mut account_paths_set,
        &accounts_hardlinks_dir,
    );
    // Unlike `assert!(ret.is_ok())`, this reports the actual error value on failure.
    assert_matches!(ret, Ok(_));

    // Same file name, but moved one directory level above `accounts_dir`.
    let wrong_appendvec_path = appendvec_path
        .parent()
        .unwrap()
        .parent()
        .unwrap()
        .join(appendvec_path.file_name().unwrap());
    let ret = get_snapshot_accounts_hardlink_dir(
        &wrong_appendvec_path,
        slot,
        &mut account_paths_set,
        &accounts_hardlinks_dir,
    );

    assert_matches!(
        ret,
        Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
    );
}
3214}