1#[cfg(feature = "dev-context-only-utils")]
2use solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs;
3use {
4 crate::{
5 bank::{BankFieldsToDeserialize, BankFieldsToSerialize, BankHashStats, BankSlotDelta},
6 serde_snapshot::{
7 self, AccountsDbFields, ExtraFieldsToSerialize, SerdeObsoleteAccountsMap,
8 SerializableAccountStorageEntry, SnapshotAccountsDbFields, SnapshotBankFields,
9 SnapshotStreams,
10 },
11 snapshot_package::SnapshotPackage,
12 snapshot_utils::snapshot_storage_rebuilder::{
13 get_slot_and_append_vec_id, SnapshotStorageRebuilder,
14 },
15 },
16 agave_snapshots::{
17 archive_snapshot,
18 error::{
19 AddBankSnapshotError, GetSnapshotAccountsHardLinkDirError,
20 HardLinkStoragesToSnapshotError, SnapshotError, SnapshotFastbootError,
21 SnapshotNewFromDirError,
22 },
23 paths::{self as snapshot_paths, get_incremental_snapshot_archives},
24 snapshot_archive_info::{
25 FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
26 SnapshotArchiveInfoGetter,
27 },
28 snapshot_config::SnapshotConfig,
29 streaming_unarchive_snapshot, ArchiveFormat, Result, SnapshotKind, SnapshotVersion,
30 },
31 crossbeam_channel::{Receiver, Sender},
32 log::*,
33 regex::Regex,
34 semver::Version,
35 solana_accounts_db::{
36 account_storage::AccountStorageMap,
37 accounts_db::{AccountStorageEntry, AccountsDbConfig, AtomicAccountsFileId},
38 accounts_file::{AccountsFile, StorageAccess},
39 utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
40 },
41 solana_clock::Slot,
42 solana_measure::{measure::Measure, measure_time, measure_us},
43 std::{
44 cmp::Ordering,
45 collections::{HashMap, HashSet},
46 fs,
47 io::{self, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
48 mem,
49 num::NonZeroUsize,
50 path::{Path, PathBuf},
51 str::FromStr,
52 sync::{Arc, LazyLock},
53 },
54 tempfile::TempDir,
55};
56
57pub mod snapshot_storage_rebuilder;
58
59pub const MAX_OBSOLETE_ACCOUNTS_FILE_SIZE: u64 = 1024 * 1024 * 1024 * 12; pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; const SNAPSHOT_FASTBOOT_VERSION: Version = Version::new(2, 0, 0);
74
75#[derive(PartialEq, Eq, Debug)]
78pub struct BankSnapshotInfo {
79 pub slot: Slot,
81 pub snapshot_dir: PathBuf,
83 pub snapshot_version: SnapshotVersion,
85 pub fastboot_version: Option<Version>,
87}
88
89impl PartialOrd for BankSnapshotInfo {
90 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
91 Some(self.cmp(other))
92 }
93}
94
95impl Ord for BankSnapshotInfo {
97 fn cmp(&self, other: &Self) -> Ordering {
98 self.slot.cmp(&other.slot)
99 }
100}
101
102impl BankSnapshotInfo {
103 pub fn new_from_dir(
104 bank_snapshots_dir: impl AsRef<Path>,
105 slot: Slot,
106 ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
107 let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(&bank_snapshots_dir, slot);
110
111 if !bank_snapshot_dir.is_dir() {
112 return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
113 bank_snapshot_dir,
114 ));
115 }
116
117 let version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
124 let version_str = snapshot_version_from_file(&version_path).map_err(|err| {
125 SnapshotNewFromDirError::IncompleteDir(err, bank_snapshot_dir.clone())
126 })?;
127
128 let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
129 .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
130
131 let status_cache_file =
132 bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
133 if !status_cache_file.is_file() {
134 return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
135 status_cache_file,
136 ));
137 }
138
139 let bank_snapshot_path =
140 bank_snapshot_dir.join(snapshot_paths::get_snapshot_file_name(slot));
141 if !bank_snapshot_path.is_file() {
142 return Err(SnapshotNewFromDirError::MissingSnapshotFile(
143 bank_snapshot_dir,
144 ));
145 };
146
147 let snapshot_fastboot_version_path =
148 bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_FASTBOOT_VERSION_FILENAME);
149
150 let fastboot_version = fs::read_to_string(&snapshot_fastboot_version_path)
154 .ok()
155 .map(|version_string| {
156 Version::from_str(version_string.trim())
157 .map_err(|_| SnapshotNewFromDirError::InvalidFastbootVersion(version_string))
158 })
159 .transpose()?;
160
161 Ok(BankSnapshotInfo {
162 slot,
163 snapshot_dir: bank_snapshot_dir,
164 snapshot_version,
165 fastboot_version,
166 })
167 }
168
169 pub fn snapshot_path(&self) -> PathBuf {
170 self.snapshot_dir
171 .join(snapshot_paths::get_snapshot_file_name(self.slot))
172 }
173}
174
175#[derive(Clone, Copy, Debug, Eq, PartialEq)]
179pub enum SnapshotFrom {
180 Archive,
182 Dir,
184}
185
186#[derive(Debug)]
189pub struct SnapshotRootPaths {
190 pub full_snapshot_root_file_path: PathBuf,
191 pub incremental_snapshot_root_file_path: Option<PathBuf>,
192}
193
194#[derive(Debug)]
196pub struct UnarchivedSnapshot {
197 #[allow(dead_code)]
198 unpack_dir: TempDir,
199 pub storage: AccountStorageMap,
200 pub bank_fields: BankFieldsToDeserialize,
201 pub accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
202 pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
203 pub measure_untar: Measure,
204}
205
206#[derive(Debug)]
208pub struct UnarchivedSnapshots {
209 pub full_storage: AccountStorageMap,
210 pub incremental_storage: Option<AccountStorageMap>,
211 pub bank_fields: SnapshotBankFields,
212 pub accounts_db_fields: SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
213 pub full_unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
214 pub incremental_unpacked_snapshots_dir_and_version: Option<UnpackedSnapshotsDirAndVersion>,
215 pub full_measure_untar: Measure,
216 pub incremental_measure_untar: Option<Measure>,
217 pub next_append_vec_id: AtomicAccountsFileId,
218}
219
220#[allow(dead_code)]
223#[derive(Debug)]
224pub struct UnarchivedSnapshotsGuard {
225 full_unpack_dir: TempDir,
226 incremental_unpack_dir: Option<TempDir>,
227}
228#[derive(Debug)]
230pub struct UnpackedSnapshotsDirAndVersion {
231 pub unpacked_snapshots_dir: PathBuf,
232 pub snapshot_version: SnapshotVersion,
233}
234
235pub(crate) struct StorageAndNextAccountsFileId {
238 pub storage: AccountStorageMap,
239 pub next_append_vec_id: AtomicAccountsFileId,
240}
241
242pub fn clean_orphaned_account_snapshot_dirs(
250 bank_snapshots_dir: impl AsRef<Path>,
251 account_snapshot_paths: &[PathBuf],
252) -> io::Result<()> {
253 let mut account_snapshot_dirs_referenced = HashSet::new();
256 let snapshots = get_bank_snapshots(bank_snapshots_dir);
257 for snapshot in snapshots {
258 let account_hardlinks_dir = snapshot
259 .snapshot_dir
260 .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
261 let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
263 debug!(
267 "failed to read account hardlinks dir '{}'",
268 account_hardlinks_dir.display(),
269 );
270 continue;
271 };
272 for entry in read_dir {
273 let path = entry?.path();
274 let target = fs::read_link(&path).map_err(|err| {
275 IoError::other(format!(
276 "failed to read symlink '{}': {err}",
277 path.display(),
278 ))
279 })?;
280 account_snapshot_dirs_referenced.insert(target);
281 }
282 }
283
284 for account_snapshot_path in account_snapshot_paths {
286 let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
287 IoError::other(format!(
288 "failed to read account snapshot dir '{}': {err}",
289 account_snapshot_path.display(),
290 ))
291 })?;
292 for entry in read_dir {
293 let path = entry?.path();
294 if !account_snapshot_dirs_referenced.contains(&path) {
295 info!(
296 "Removing orphaned account snapshot hardlink directory '{}'...",
297 path.display()
298 );
299 move_and_async_delete_path(&path);
300 }
301 }
302 }
303
304 Ok(())
305}
306
307pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
309 let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
310 return;
312 };
313
314 let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
315
316 let incomplete_dirs: Vec<_> = read_dir_iter
317 .filter_map(|entry| entry.ok())
318 .map(|entry| entry.path())
319 .filter(|path| path.is_dir())
320 .filter(is_incomplete)
321 .collect();
322
323 for incomplete_dir in incomplete_dirs {
325 let result = purge_bank_snapshot(&incomplete_dir);
326 match result {
327 Ok(_) => info!(
328 "Purged incomplete snapshot dir: {}",
329 incomplete_dir.display()
330 ),
331 Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
332 }
333 }
334}
335
336fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
338 let version_path = bank_snapshot_dir
339 .as_ref()
340 .join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
341 version_path.is_file()
342}
343
344pub fn write_full_snapshot_slot_file(
346 bank_snapshot_dir: impl AsRef<Path>,
347 full_snapshot_slot: Slot,
348) -> io::Result<()> {
349 let full_snapshot_slot_path = bank_snapshot_dir
350 .as_ref()
351 .join(snapshot_paths::SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
352 fs::write(
353 &full_snapshot_slot_path,
354 Slot::to_le_bytes(full_snapshot_slot),
355 )
356 .map_err(|err| {
357 IoError::other(format!(
358 "failed to write full snapshot slot file '{}': {err}",
359 full_snapshot_slot_path.display(),
360 ))
361 })
362}
363
364pub fn read_full_snapshot_slot_file(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<Slot> {
366 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
367 let full_snapshot_slot_path = bank_snapshot_dir
368 .as_ref()
369 .join(snapshot_paths::SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
370 let full_snapshot_slot_file_metadata = fs::metadata(&full_snapshot_slot_path)?;
371 if full_snapshot_slot_file_metadata.len() != SLOT_SIZE as u64 {
372 let error_message = format!(
373 "invalid full snapshot slot file size: '{}' has {} bytes (should be {} bytes)",
374 full_snapshot_slot_path.display(),
375 full_snapshot_slot_file_metadata.len(),
376 SLOT_SIZE,
377 );
378 return Err(IoError::other(error_message));
379 }
380 let mut full_snapshot_slot_file = fs::File::open(&full_snapshot_slot_path)?;
381 let mut buffer = [0; SLOT_SIZE];
382 full_snapshot_slot_file.read_exact(&mut buffer)?;
383 let slot = Slot::from_le_bytes(buffer);
384 Ok(slot)
385}
386
387pub fn mark_bank_snapshot_as_loadable(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<()> {
389 let snapshot_fastboot_version_path = bank_snapshot_dir
390 .as_ref()
391 .join(snapshot_paths::SNAPSHOT_FASTBOOT_VERSION_FILENAME);
392 fs::write(
393 &snapshot_fastboot_version_path,
394 SNAPSHOT_FASTBOOT_VERSION.to_string(),
395 )
396 .map_err(|err| {
397 IoError::other(format!(
398 "failed to write fastboot version file '{}': {err}",
399 snapshot_fastboot_version_path.display(),
400 ))
401 })?;
402 Ok(())
403}
404
405fn is_bank_snapshot_loadable(
407 bank_snapshot_dir: impl AsRef<Path>,
408 fastboot_version: Option<&Version>,
409) -> std::result::Result<bool, SnapshotFastbootError> {
410 let flushed_storages = bank_snapshot_dir
413 .as_ref()
414 .join(snapshot_paths::SNAPSHOT_STORAGES_FLUSHED_FILENAME);
415 if flushed_storages.is_file() {
416 return Ok(true);
417 }
418
419 if let Some(fastboot_version) = fastboot_version {
420 is_snapshot_fastboot_compatible(fastboot_version)
421 } else {
422 Ok(false)
424 }
425}
426
427fn is_snapshot_fastboot_compatible(
429 version: &Version,
430) -> std::result::Result<bool, SnapshotFastbootError> {
431 if version.major <= SNAPSHOT_FASTBOOT_VERSION.major {
432 Ok(true)
433 } else {
434 Err(SnapshotFastbootError::IncompatibleVersion(version.clone()))
435 }
436}
437
438pub fn get_highest_loadable_bank_snapshot(
442 snapshot_config: &SnapshotConfig,
443) -> Option<BankSnapshotInfo> {
444 let highest_bank_snapshot = get_highest_bank_snapshot(&snapshot_config.bank_snapshots_dir)?;
445
446 let is_bank_snapshot_loadable = is_bank_snapshot_loadable(
447 &highest_bank_snapshot.snapshot_dir,
448 highest_bank_snapshot.fastboot_version.as_ref(),
449 );
450
451 match is_bank_snapshot_loadable {
452 Ok(true) => Some(highest_bank_snapshot),
453 Ok(false) => None,
454 Err(err) => {
455 warn!(
456 "Bank snapshot is not loadable '{}': {err}",
457 highest_bank_snapshot.snapshot_dir.display()
458 );
459 None
460 }
461 }
462}
463
464pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
467 if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
468 for entry in entries.flatten() {
469 if entry
470 .file_name()
471 .to_str()
472 .map(|file_name| file_name.starts_with(snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX))
473 .unwrap_or(false)
474 {
475 let path = entry.path();
476 let result = if path.is_dir() {
477 fs::remove_dir_all(&path)
478 } else {
479 fs::remove_file(&path)
480 };
481 if let Err(err) = result {
482 warn!(
483 "Failed to remove temporary snapshot archive '{}': {err}",
484 path.display(),
485 );
486 }
487 }
488 }
489 }
490}
491
492pub fn serialize_and_archive_snapshot_package(
494 snapshot_package: SnapshotPackage,
495 snapshot_config: &SnapshotConfig,
496 should_flush_and_hard_link_storages: bool,
497) -> Result<SnapshotArchiveInfo> {
498 let SnapshotPackage {
499 snapshot_kind,
500 slot: snapshot_slot,
501 block_height,
502 hash: snapshot_hash,
503 mut snapshot_storages,
504 status_cache_slot_deltas,
505 bank_fields_to_serialize,
506 bank_hash_stats,
507 write_version,
508 enqueued: _,
509 } = snapshot_package;
510
511 let bank_snapshot_info = serialize_snapshot(
512 &snapshot_config.bank_snapshots_dir,
513 snapshot_config.snapshot_version,
514 snapshot_storages.as_slice(),
515 status_cache_slot_deltas.as_slice(),
516 bank_fields_to_serialize,
517 bank_hash_stats,
518 write_version,
519 should_flush_and_hard_link_storages,
520 )?;
521
522 let full_snapshot_archive_slot = match snapshot_kind {
524 SnapshotKind::FullSnapshot => snapshot_slot,
525 SnapshotKind::IncrementalSnapshot(base_slot) => base_slot,
526 };
527 write_full_snapshot_slot_file(&bank_snapshot_info.snapshot_dir, full_snapshot_archive_slot)
528 .map_err(|err| {
529 IoError::other(format!(
530 "failed to serialize snapshot slot {snapshot_slot}, block height {block_height}, \
531 kind {snapshot_kind:?}: {err}",
532 ))
533 })?;
534
535 let snapshot_archive_path = match snapshot_package.snapshot_kind {
536 SnapshotKind::FullSnapshot => snapshot_paths::build_full_snapshot_archive_path(
537 &snapshot_config.full_snapshot_archives_dir,
538 snapshot_package.slot,
539 &snapshot_package.hash,
540 snapshot_config.archive_format,
541 ),
542 SnapshotKind::IncrementalSnapshot(incremental_snapshot_base_slot) => {
543 snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
546 snapshot_paths::build_incremental_snapshot_archive_path(
547 &snapshot_config.incremental_snapshot_archives_dir,
548 incremental_snapshot_base_slot,
549 snapshot_package.slot,
550 &snapshot_package.hash,
551 snapshot_config.archive_format,
552 )
553 }
554 };
555
556 let snapshot_archive_info = archive_snapshot(
557 snapshot_kind,
558 snapshot_slot,
559 snapshot_hash,
560 snapshot_storages.as_slice(),
561 &bank_snapshot_info.snapshot_dir,
562 snapshot_archive_path,
563 snapshot_config.archive_format,
564 )?;
565
566 Ok(snapshot_archive_info)
567}
568
569#[allow(clippy::too_many_arguments)]
571fn serialize_snapshot(
572 bank_snapshots_dir: impl AsRef<Path>,
573 snapshot_version: SnapshotVersion,
574 snapshot_storages: &[Arc<AccountStorageEntry>],
575 slot_deltas: &[BankSlotDelta],
576 mut bank_fields: BankFieldsToSerialize,
577 bank_hash_stats: BankHashStats,
578 write_version: u64,
579 should_flush_and_hard_link_storages: bool,
580) -> Result<BankSnapshotInfo> {
581 let slot = bank_fields.slot;
582
583 let do_serialize_snapshot = || {
586 let mut measure_everything = Measure::start("");
587 let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(&bank_snapshots_dir, slot);
588 if bank_snapshot_dir.exists() {
589 return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
590 bank_snapshot_dir,
591 ));
592 }
593 fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
594 AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
595 })?;
596
597 let bank_snapshot_path =
599 bank_snapshot_dir.join(snapshot_paths::get_snapshot_file_name(slot));
600 info!(
601 "Creating bank snapshot for slot {slot} at '{}'",
602 bank_snapshot_path.display(),
603 );
604
605 let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
606 let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
607 let extra_fields = ExtraFieldsToSerialize {
608 lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
609 obsolete_incremental_snapshot_persistence: None,
610 obsolete_epoch_accounts_hash: None,
611 versioned_epoch_stakes,
612 accounts_lt_hash: Some(bank_fields.accounts_lt_hash.clone().into()),
613 };
614 serde_snapshot::serialize_bank_snapshot_into(
615 stream,
616 bank_fields,
617 bank_hash_stats,
618 &get_storages_to_serialize(snapshot_storages),
619 extra_fields,
620 write_version,
621 )?;
622 Ok(())
623 };
624 let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
625 serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
626 .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
627 "bank serialize"
628 );
629
630 let status_cache_path =
631 bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
632 let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
633 serde_snapshot::serialize_status_cache(slot_deltas, &status_cache_path)
634 .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
635 );
636
637 let version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
638 let (_, write_version_file_us) = measure_us!(fs::write(
639 &version_path,
640 snapshot_version.as_str().as_bytes(),
641 )
642 .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
643
644 let (flush_storages_us, hard_link_storages_us, serialize_obsolete_accounts_us) =
645 if should_flush_and_hard_link_storages {
646 let flush_measure = Measure::start("");
647 for storage in snapshot_storages {
648 storage.flush().map_err(|err| {
649 AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
650 })?;
651 }
652 let flush_us = flush_measure.end_as_us();
653 let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
654 &bank_snapshot_dir,
655 slot,
656 snapshot_storages
657 )
658 .map_err(AddBankSnapshotError::HardLinkStorages)?);
659
660 let (_, serialize_obsolete_accounts_us) = measure_us!({
661 write_obsolete_accounts_to_snapshot(&bank_snapshot_dir, snapshot_storages, slot)
662 .map_err(|err| {
663 AddBankSnapshotError::SerializeObsoleteAccounts(Box::new(err))
664 })?
665 });
666
667 mark_bank_snapshot_as_loadable(&bank_snapshot_dir)
668 .map_err(AddBankSnapshotError::MarkSnapshotLoadable)?;
669
670 (
671 Some(flush_us),
672 Some(hard_link_us),
673 Some(serialize_obsolete_accounts_us),
674 )
675 } else {
676 (None, None, None)
677 };
678
679 measure_everything.stop();
680
681 datapoint_info!(
683 "snapshot_bank",
684 ("slot", slot, i64),
685 ("bank_size", bank_snapshot_consumed_size, i64),
686 ("status_cache_size", status_cache_consumed_size, i64),
687 ("flush_storages_us", flush_storages_us, Option<i64>),
688 ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
689 ("serialize_obsolete_accounts_us", serialize_obsolete_accounts_us, Option<i64>),
690 ("bank_serialize_us", bank_serialize.as_us(), i64),
691 ("status_cache_serialize_us", status_cache_serialize_us, i64),
692 ("write_version_file_us", write_version_file_us, i64),
693 ("total_us", measure_everything.as_us(), i64),
694 );
695
696 info!(
697 "{} for slot {} at {}",
698 bank_serialize,
699 slot,
700 bank_snapshot_path.display(),
701 );
702
703 Ok(BankSnapshotInfo {
704 slot,
705 snapshot_dir: bank_snapshot_dir,
706 snapshot_version,
707 fastboot_version: None,
708 })
709 };
710
711 do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
712}
713
714pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
716 let mut bank_snapshots = Vec::default();
717 match fs::read_dir(&bank_snapshots_dir) {
718 Err(err) => {
719 info!(
720 "Unable to read bank snapshots directory '{}': {err}",
721 bank_snapshots_dir.as_ref().display(),
722 );
723 }
724 Ok(paths) => paths
725 .filter_map(|entry| {
726 entry
729 .ok()
730 .filter(|entry| entry.path().is_dir())
731 .and_then(|entry| {
732 entry
733 .path()
734 .file_name()
735 .and_then(|file_name| file_name.to_str())
736 .and_then(|file_name| file_name.parse::<Slot>().ok())
737 })
738 })
739 .for_each(
740 |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
741 Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
742 Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
745 },
746 ),
747 }
748 bank_snapshots
749}
750
751pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
755 do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
756}
757
758fn do_get_highest_bank_snapshot(
759 mut bank_snapshots: Vec<BankSnapshotInfo>,
760) -> Option<BankSnapshotInfo> {
761 bank_snapshots.sort_unstable();
762 bank_snapshots.into_iter().next_back()
763}
764
765pub fn write_obsolete_accounts_to_snapshot(
766 bank_snapshot_dir: impl AsRef<Path>,
767 snapshot_storages: &[Arc<AccountStorageEntry>],
768 snapshot_slot: Slot,
769) -> Result<u64> {
770 let obsolete_accounts =
771 SerdeObsoleteAccountsMap::new_from_storages(snapshot_storages, snapshot_slot);
772 serialize_obsolete_accounts(
773 bank_snapshot_dir,
774 &obsolete_accounts,
775 MAX_OBSOLETE_ACCOUNTS_FILE_SIZE,
776 )
777}
778
779fn serialize_obsolete_accounts(
780 bank_snapshot_dir: impl AsRef<Path>,
781 obsolete_accounts_map: &SerdeObsoleteAccountsMap,
782 maximum_obsolete_accounts_file_size: u64,
783) -> Result<u64> {
784 let obsolete_accounts_path = bank_snapshot_dir
785 .as_ref()
786 .join(snapshot_paths::SNAPSHOT_OBSOLETE_ACCOUNTS_FILENAME);
787 let obsolete_accounts_file = fs::File::create(&obsolete_accounts_path)?;
788 let mut file_stream = BufWriter::new(obsolete_accounts_file);
789
790 serde_snapshot::serialize_into(&mut file_stream, obsolete_accounts_map)?;
791
792 file_stream.flush()?;
793
794 let consumed_size = file_stream.stream_position()?;
795 if consumed_size > maximum_obsolete_accounts_file_size {
796 let error_message = format!(
797 "too large obsolete accounts file to serialize: '{}' has {consumed_size} bytes, max \
798 size is {maximum_obsolete_accounts_file_size}",
799 obsolete_accounts_path.display(),
800 );
801 return Err(IoError::other(error_message).into());
802 }
803 Ok(consumed_size)
804}
805
806fn deserialize_obsolete_accounts(
807 bank_snapshot_dir: impl AsRef<Path>,
808 maximum_obsolete_accounts_file_size: u64,
809) -> Result<SerdeObsoleteAccountsMap> {
810 let obsolete_accounts_path = bank_snapshot_dir
811 .as_ref()
812 .join(snapshot_paths::SNAPSHOT_OBSOLETE_ACCOUNTS_FILENAME);
813 let obsolete_accounts_file = fs::File::open(&obsolete_accounts_path)?;
814 let obsolete_accounts_file_metadata = fs::metadata(&obsolete_accounts_path)?;
816 if obsolete_accounts_file_metadata.len() > maximum_obsolete_accounts_file_size {
817 let error_message = format!(
818 "too large obsolete accounts file to deserialize: '{}' has {} bytes (max size is \
819 {maximum_obsolete_accounts_file_size} bytes)",
820 obsolete_accounts_path.display(),
821 obsolete_accounts_file_metadata.len(),
822 );
823 return Err(IoError::other(error_message).into());
824 }
825
826 let mut data_file_stream = BufReader::new(obsolete_accounts_file);
827
828 let obsolete_accounts = serde_snapshot::deserialize_from(&mut data_file_stream)?;
829
830 Ok(obsolete_accounts)
831}
832
833pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
834where
835 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
836{
837 serialize_snapshot_data_file_capped::<F>(
838 data_file_path,
839 MAX_SNAPSHOT_DATA_FILE_SIZE,
840 serializer,
841 )
842}
843
844pub fn deserialize_snapshot_data_file<T: Sized>(
845 data_file_path: &Path,
846 deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
847) -> Result<T> {
848 let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
849 deserializer(streams.full_snapshot_stream)
850 };
851
852 let wrapped_data_file_path = SnapshotRootPaths {
853 full_snapshot_root_file_path: data_file_path.to_path_buf(),
854 incremental_snapshot_root_file_path: None,
855 };
856
857 deserialize_snapshot_data_files_capped(
858 &wrapped_data_file_path,
859 MAX_SNAPSHOT_DATA_FILE_SIZE,
860 wrapped_deserializer,
861 )
862}
863
864pub fn deserialize_snapshot_data_files<T: Sized>(
865 snapshot_root_paths: &SnapshotRootPaths,
866 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
867) -> Result<T> {
868 deserialize_snapshot_data_files_capped(
869 snapshot_root_paths,
870 MAX_SNAPSHOT_DATA_FILE_SIZE,
871 deserializer,
872 )
873}
874
875fn serialize_snapshot_data_file_capped<F>(
876 data_file_path: &Path,
877 maximum_file_size: u64,
878 serializer: F,
879) -> Result<u64>
880where
881 F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
882{
883 let data_file = fs::File::create(data_file_path)?;
884 let mut data_file_stream = BufWriter::new(data_file);
885 serializer(&mut data_file_stream)?;
886 data_file_stream.flush()?;
887
888 let consumed_size = data_file_stream.stream_position()?;
889 if consumed_size > maximum_file_size {
890 let error_message = format!(
891 "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
892 data_file_path.display(),
893 );
894 return Err(IoError::other(error_message).into());
895 }
896 Ok(consumed_size)
897}
898
899fn deserialize_snapshot_data_files_capped<T: Sized>(
900 snapshot_root_paths: &SnapshotRootPaths,
901 maximum_file_size: u64,
902 deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
903) -> Result<T> {
904 let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
905 create_snapshot_data_file_stream(
906 &snapshot_root_paths.full_snapshot_root_file_path,
907 maximum_file_size,
908 )?;
909
910 let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
911 if let Some(ref incremental_snapshot_root_file_path) =
912 snapshot_root_paths.incremental_snapshot_root_file_path
913 {
914 Some(create_snapshot_data_file_stream(
915 incremental_snapshot_root_file_path,
916 maximum_file_size,
917 )?)
918 } else {
919 None
920 }
921 .unzip();
922
923 let mut snapshot_streams = SnapshotStreams {
924 full_snapshot_stream: &mut full_snapshot_data_file_stream,
925 incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
926 };
927 let ret = deserializer(&mut snapshot_streams)?;
928
929 check_deserialize_file_consumed(
930 full_snapshot_file_size,
931 &snapshot_root_paths.full_snapshot_root_file_path,
932 &mut full_snapshot_data_file_stream,
933 )?;
934
935 if let Some(ref incremental_snapshot_root_file_path) =
936 snapshot_root_paths.incremental_snapshot_root_file_path
937 {
938 check_deserialize_file_consumed(
939 incremental_snapshot_file_size.unwrap(),
940 incremental_snapshot_root_file_path,
941 incremental_snapshot_data_file_stream.as_mut().unwrap(),
942 )?;
943 }
944
945 Ok(ret)
946}
947
948fn create_snapshot_data_file_stream(
951 snapshot_root_file_path: impl AsRef<Path>,
952 maximum_file_size: u64,
953) -> Result<(u64, BufReader<std::fs::File>)> {
954 let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
955
956 if snapshot_file_size > maximum_file_size {
957 let error_message = format!(
958 "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
959 snapshot_root_file_path.as_ref().display(),
960 snapshot_file_size,
961 maximum_file_size,
962 );
963 return Err(IoError::other(error_message).into());
964 }
965
966 let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
967 let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
968
969 Ok((snapshot_file_size, snapshot_data_file_stream))
970}
971
972fn check_deserialize_file_consumed(
975 file_size: u64,
976 file_path: impl AsRef<Path>,
977 file_stream: &mut BufReader<std::fs::File>,
978) -> Result<()> {
979 let consumed_size = file_stream.stream_position()?;
980
981 if consumed_size != file_size {
982 let error_message = format!(
983 "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to \
984 deserialize",
985 file_path.as_ref().display(),
986 file_size,
987 consumed_size,
988 );
989 return Err(IoError::other(error_message).into());
990 }
991
992 Ok(())
993}
994
995fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
997 let run_path = appendvec_path.parent()?;
998 let run_file_name = run_path.file_name()?;
999 if run_file_name != ACCOUNTS_RUN_DIR {
1002 error!(
1003 "The account path {} does not have run/ as its immediate parent directory.",
1004 run_path.display()
1005 );
1006 return None;
1007 }
1008 let account_path = run_path.parent()?;
1009 Some(account_path.to_path_buf())
1010}
1011
1012fn get_snapshot_accounts_hardlink_dir(
1015 appendvec_path: &Path,
1016 bank_slot: Slot,
1017 account_paths: &mut HashSet<PathBuf>,
1018 hardlinks_dir: impl AsRef<Path>,
1019) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
1020 let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
1021 GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
1022 })?;
1023
1024 let snapshot_hardlink_dir = account_path
1025 .join(ACCOUNTS_SNAPSHOT_DIR)
1026 .join(bank_slot.to_string());
1027
1028 if !account_paths.contains(&account_path) {
1031 let idx = account_paths.len();
1032 debug!(
1033 "for appendvec_path {}, create hard-link path {}",
1034 appendvec_path.display(),
1035 snapshot_hardlink_dir.display()
1036 );
1037 fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
1038 GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
1039 err,
1040 snapshot_hardlink_dir.clone(),
1041 )
1042 })?;
1043 let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
1044 symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
1045 GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
1046 source: err,
1047 original: snapshot_hardlink_dir.clone(),
1048 link: symlink_path,
1049 }
1050 })?;
1051 account_paths.insert(account_path);
1052 };
1053
1054 Ok(snapshot_hardlink_dir)
1055}
1056
1057pub fn hard_link_storages_to_snapshot(
1062 bank_snapshot_dir: impl AsRef<Path>,
1063 bank_slot: Slot,
1064 snapshot_storages: &[Arc<AccountStorageEntry>],
1065) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
1066 let accounts_hardlinks_dir = bank_snapshot_dir
1067 .as_ref()
1068 .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
1069 fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
1070 HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
1071 err,
1072 accounts_hardlinks_dir.clone(),
1073 )
1074 })?;
1075
1076 let mut account_paths: HashSet<PathBuf> = HashSet::new();
1077 for storage in snapshot_storages {
1078 let storage_path = storage.accounts.path();
1079 let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
1080 storage_path,
1081 bank_slot,
1082 &mut account_paths,
1083 &accounts_hardlinks_dir,
1084 )?;
1085 let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
1088 let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
1089 fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
1090 HardLinkStoragesToSnapshotError::HardLinkStorage(
1091 err,
1092 storage_path.to_path_buf(),
1093 hard_link_path,
1094 )
1095 })?;
1096 }
1097 Ok(())
1098}
1099
1100pub(crate) fn get_storages_to_serialize(
1103 snapshot_storages: &[Arc<AccountStorageEntry>],
1104) -> Vec<Vec<Arc<AccountStorageEntry>>> {
1105 snapshot_storages
1106 .iter()
1107 .map(|storage| vec![Arc::clone(storage)])
1108 .collect::<Vec<_>>()
1109}
1110
1111pub fn verify_and_unarchive_snapshots(
1113 bank_snapshots_dir: impl AsRef<Path>,
1114 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1115 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1116 account_paths: &[PathBuf],
1117 accounts_db_config: &AccountsDbConfig,
1118) -> Result<(UnarchivedSnapshots, UnarchivedSnapshotsGuard)> {
1119 check_are_snapshots_compatible(
1120 full_snapshot_archive_info,
1121 incremental_snapshot_archive_info,
1122 )?;
1123
1124 let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
1125 let UnarchivedSnapshot {
1126 unpack_dir: full_unpack_dir,
1127 storage: full_storage,
1128 bank_fields: full_bank_fields,
1129 accounts_db_fields: full_accounts_db_fields,
1130 unpacked_snapshots_dir_and_version: full_unpacked_snapshots_dir_and_version,
1131 measure_untar: full_measure_untar,
1132 } = unarchive_snapshot(
1133 &bank_snapshots_dir,
1134 snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX,
1135 full_snapshot_archive_info.path(),
1136 "snapshot untar",
1137 account_paths,
1138 full_snapshot_archive_info.archive_format(),
1139 next_append_vec_id.clone(),
1140 accounts_db_config,
1141 )?;
1142
1143 let (
1144 incremental_unpack_dir,
1145 incremental_storage,
1146 incremental_bank_fields,
1147 incremental_accounts_db_fields,
1148 incremental_unpacked_snapshots_dir_and_version,
1149 incremental_measure_untar,
1150 ) = if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
1151 let UnarchivedSnapshot {
1152 unpack_dir,
1153 storage,
1154 bank_fields,
1155 accounts_db_fields,
1156 unpacked_snapshots_dir_and_version,
1157 measure_untar,
1158 } = unarchive_snapshot(
1159 &bank_snapshots_dir,
1160 snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX,
1161 incremental_snapshot_archive_info.path(),
1162 "incremental snapshot untar",
1163 account_paths,
1164 incremental_snapshot_archive_info.archive_format(),
1165 next_append_vec_id.clone(),
1166 accounts_db_config,
1167 )?;
1168 (
1169 Some(unpack_dir),
1170 Some(storage),
1171 Some(bank_fields),
1172 Some(accounts_db_fields),
1173 Some(unpacked_snapshots_dir_and_version),
1174 Some(measure_untar),
1175 )
1176 } else {
1177 (None, None, None, None, None, None)
1178 };
1179
1180 let bank_fields = SnapshotBankFields::new(full_bank_fields, incremental_bank_fields);
1181 let accounts_db_fields =
1182 SnapshotAccountsDbFields::new(full_accounts_db_fields, incremental_accounts_db_fields);
1183 let next_append_vec_id = Arc::try_unwrap(next_append_vec_id).unwrap();
1184
1185 Ok((
1186 UnarchivedSnapshots {
1187 full_storage,
1188 incremental_storage,
1189 bank_fields,
1190 accounts_db_fields,
1191 full_unpacked_snapshots_dir_and_version,
1192 incremental_unpacked_snapshots_dir_and_version,
1193 full_measure_untar,
1194 incremental_measure_untar,
1195 next_append_vec_id,
1196 },
1197 UnarchivedSnapshotsGuard {
1198 full_unpack_dir,
1199 incremental_unpack_dir,
1200 },
1201 ))
1202}
1203
1204#[derive(PartialEq, Debug)]
1206enum SnapshotFileKind {
1207 Version,
1208 BankFields,
1209 Storage,
1210}
1211
1212fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
1214 static VERSION_FILE_REGEX: LazyLock<Regex> =
1215 LazyLock::new(|| Regex::new(r"^version$").unwrap());
1216 static BANK_FIELDS_FILE_REGEX: LazyLock<Regex> =
1217 LazyLock::new(|| Regex::new(r"^[0-9]+(\.pre)?$").unwrap());
1218
1219 if VERSION_FILE_REGEX.is_match(filename) {
1220 Some(SnapshotFileKind::Version)
1221 } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
1222 Some(SnapshotFileKind::BankFields)
1223 } else if get_slot_and_append_vec_id(filename).is_ok() {
1224 Some(SnapshotFileKind::Storage)
1225 } else {
1226 None
1227 }
1228}
1229
1230fn get_version_and_snapshot_files(
1234 file_receiver: &Receiver<PathBuf>,
1235) -> Result<(PathBuf, PathBuf, Vec<PathBuf>)> {
1236 let mut append_vec_files = Vec::with_capacity(1024);
1237 let mut snapshot_version_path = None;
1238 let mut snapshot_file_path = None;
1239
1240 loop {
1241 if let Ok(path) = file_receiver.recv() {
1242 let filename = path.file_name().unwrap().to_str().unwrap();
1243 match get_snapshot_file_kind(filename) {
1244 Some(SnapshotFileKind::Version) => {
1245 snapshot_version_path = Some(path);
1246
1247 if snapshot_file_path.is_some() {
1249 break;
1250 }
1251 }
1252 Some(SnapshotFileKind::BankFields) => {
1253 snapshot_file_path = Some(path);
1254
1255 if snapshot_version_path.is_some() {
1257 break;
1258 }
1259 }
1260 Some(SnapshotFileKind::Storage) => {
1261 append_vec_files.push(path);
1262 }
1263 None => {} }
1265 } else {
1266 return Err(SnapshotError::RebuildStorages(
1267 "did not receive snapshot file from unpacking threads".to_string(),
1268 ));
1269 }
1270 }
1271 let snapshot_version_path = snapshot_version_path.unwrap();
1272 let snapshot_file_path = snapshot_file_path.unwrap();
1273
1274 Ok((snapshot_version_path, snapshot_file_path, append_vec_files))
1275}
1276
1277struct SnapshotFieldsBundle {
1279 snapshot_version: SnapshotVersion,
1280 bank_fields: BankFieldsToDeserialize,
1281 accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
1282 append_vec_files: Vec<PathBuf>,
1283}
1284
1285fn snapshot_fields_from_files(file_receiver: &Receiver<PathBuf>) -> Result<SnapshotFieldsBundle> {
1288 let (snapshot_version_path, snapshot_file_path, append_vec_files) =
1289 get_version_and_snapshot_files(file_receiver)?;
1290 let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
1291 let snapshot_version = snapshot_version_str.parse().map_err(|err| {
1292 IoError::other(format!(
1293 "unsupported snapshot version '{snapshot_version_str}': {err}",
1294 ))
1295 })?;
1296
1297 let snapshot_file = fs::File::open(snapshot_file_path).unwrap();
1298 let mut snapshot_stream = BufReader::new(snapshot_file);
1299 let (bank_fields, accounts_db_fields) = match snapshot_version {
1300 SnapshotVersion::V1_2_0 => serde_snapshot::fields_from_stream(&mut snapshot_stream)?,
1301 };
1302
1303 Ok(SnapshotFieldsBundle {
1304 snapshot_version,
1305 bank_fields,
1306 accounts_db_fields,
1307 append_vec_files,
1308 })
1309}
1310
1311fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
1316 let snapshots_dir = unpack_dir.as_ref().join(snapshot_paths::BANK_SNAPSHOTS_DIR);
1317 if !snapshots_dir.is_dir() {
1318 return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
1319 }
1320
1321 let slot_dir = std::fs::read_dir(&snapshots_dir)
1323 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1324 .find(|entry| entry.as_ref().unwrap().path().is_dir())
1325 .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1326 .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
1327 .path();
1328
1329 let version_file = unpack_dir
1330 .as_ref()
1331 .join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
1332 fs::hard_link(
1333 version_file,
1334 slot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME),
1335 )?;
1336
1337 let status_cache_file = snapshots_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
1338 fs::hard_link(
1339 status_cache_file,
1340 slot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME),
1341 )?;
1342
1343 Ok(())
1344}
1345
1346fn unarchive_snapshot(
1350 bank_snapshots_dir: impl AsRef<Path>,
1351 unpacked_snapshots_dir_prefix: &'static str,
1352 snapshot_archive_path: impl AsRef<Path>,
1353 measure_name: &'static str,
1354 account_paths: &[PathBuf],
1355 archive_format: ArchiveFormat,
1356 next_append_vec_id: Arc<AtomicAccountsFileId>,
1357 accounts_db_config: &AccountsDbConfig,
1358) -> Result<UnarchivedSnapshot> {
1359 let unpack_dir = tempfile::Builder::new()
1360 .prefix(unpacked_snapshots_dir_prefix)
1361 .tempdir_in(bank_snapshots_dir)?;
1362 let unpacked_snapshots_dir = unpack_dir.path().join(snapshot_paths::BANK_SNAPSHOTS_DIR);
1363
1364 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1365 let unarchive_handle = streaming_unarchive_snapshot(
1366 file_sender,
1367 account_paths.to_vec(),
1368 unpack_dir.path().to_path_buf(),
1369 snapshot_archive_path.as_ref().to_path_buf(),
1370 archive_format,
1371 accounts_db_config.memlock_budget_size,
1372 );
1373
1374 let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1375 let snapshot_result = snapshot_fields_from_files(&file_receiver).and_then(
1376 |SnapshotFieldsBundle {
1377 snapshot_version,
1378 bank_fields,
1379 accounts_db_fields,
1380 append_vec_files,
1381 ..
1382 }| {
1383 let (storage, measure_untar) = measure_time!(
1384 SnapshotStorageRebuilder::rebuild_storage(
1385 &accounts_db_fields,
1386 append_vec_files,
1387 file_receiver,
1388 num_rebuilder_threads,
1389 next_append_vec_id,
1390 SnapshotFrom::Archive,
1391 accounts_db_config.storage_access,
1392 None,
1393 )?,
1394 measure_name
1395 );
1396 info!("{measure_untar}");
1397 create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
1398
1399 Ok(UnarchivedSnapshot {
1400 unpack_dir,
1401 storage,
1402 bank_fields,
1403 accounts_db_fields,
1404 unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
1405 unpacked_snapshots_dir,
1406 snapshot_version,
1407 },
1408 measure_untar,
1409 })
1410 },
1411 );
1412 unarchive_handle.join().unwrap()?;
1413 snapshot_result
1414}
1415
1416fn streaming_snapshot_dir_files(
1419 file_sender: Sender<PathBuf>,
1420 snapshot_file_path: impl Into<PathBuf>,
1421 snapshot_version_path: impl Into<PathBuf>,
1422 account_paths: &[PathBuf],
1423) -> Result<()> {
1424 file_sender.send(snapshot_file_path.into())?;
1425 file_sender.send(snapshot_version_path.into())?;
1426
1427 for account_path in account_paths {
1428 for file in fs::read_dir(account_path)? {
1429 file_sender.send(file?.path())?;
1430 }
1431 }
1432
1433 Ok(())
1434}
1435
1436pub fn rebuild_storages_from_snapshot_dir(
1441 snapshot_info: &BankSnapshotInfo,
1442 account_paths: &[PathBuf],
1443 next_append_vec_id: Arc<AtomicAccountsFileId>,
1444 storage_access: StorageAccess,
1445) -> Result<(
1446 AccountStorageMap,
1447 BankFieldsToDeserialize,
1448 AccountsDbFields<SerializableAccountStorageEntry>,
1449)> {
1450 let bank_snapshot_dir = &snapshot_info.snapshot_dir;
1451 let accounts_hardlinks = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
1452 let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
1453
1454 let obsolete_accounts = snapshot_info
1458 .fastboot_version
1459 .as_ref()
1460 .is_some_and(|fastboot_version| fastboot_version.major >= 2)
1461 .then(|| deserialize_obsolete_accounts(bank_snapshot_dir, MAX_OBSOLETE_ACCOUNTS_FILE_SIZE))
1462 .transpose()
1463 .map_err(|err| {
1464 IoError::other(format!(
1465 "failed to read obsolete accounts file '{}': {err}",
1466 bank_snapshot_dir.display()
1467 ))
1468 })?;
1469
1470 let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
1471 IoError::other(format!(
1472 "failed to read accounts hardlinks dir '{}': {err}",
1473 accounts_hardlinks.display(),
1474 ))
1475 })?;
1476 for dir_entry in read_dir {
1477 let symlink_path = dir_entry?.path();
1478 let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
1481 IoError::other(format!(
1482 "failed to read symlink '{}': {err}",
1483 symlink_path.display(),
1484 ))
1485 })?;
1486 let account_run_path = account_snapshot_path
1487 .parent()
1488 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1489 .parent()
1490 .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
1491 .join(ACCOUNTS_RUN_DIR);
1492 if !account_run_paths.contains(&account_run_path) {
1493 return Err(SnapshotError::AccountPathsMismatch);
1496 }
1497 let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
1500 IoError::other(format!(
1501 "failed to read account snapshot dir '{}': {err}",
1502 account_snapshot_path.display(),
1503 ))
1504 })?;
1505 for file in read_dir {
1506 let file_path = file?.path();
1507 let file_name = file_path
1508 .file_name()
1509 .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
1510 let dest_path = account_run_path.join(file_name);
1511 fs::hard_link(&file_path, &dest_path).map_err(|err| {
1512 IoError::other(format!(
1513 "failed to hard link from '{}' to '{}': {err}",
1514 file_path.display(),
1515 dest_path.display(),
1516 ))
1517 })?;
1518 }
1519 }
1520
1521 let (file_sender, file_receiver) = crossbeam_channel::unbounded();
1522 let snapshot_file_path = &snapshot_info.snapshot_path();
1523 let snapshot_version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
1524 streaming_snapshot_dir_files(
1525 file_sender,
1526 snapshot_file_path,
1527 snapshot_version_path,
1528 account_paths,
1529 )?;
1530
1531 let SnapshotFieldsBundle {
1532 bank_fields,
1533 accounts_db_fields,
1534 append_vec_files,
1535 ..
1536 } = snapshot_fields_from_files(&file_receiver)?;
1537
1538 let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
1539 let storage = SnapshotStorageRebuilder::rebuild_storage(
1540 &accounts_db_fields,
1541 append_vec_files,
1542 file_receiver,
1543 num_rebuilder_threads,
1544 next_append_vec_id,
1545 SnapshotFrom::Dir,
1546 storage_access,
1547 obsolete_accounts,
1548 )?;
1549
1550 Ok((storage, bank_fields, accounts_db_fields))
1551}
1552
1553fn snapshot_version_from_file(path: impl AsRef<Path>) -> io::Result<String> {
1557 let file_metadata = fs::metadata(&path).map_err(|err| {
1559 IoError::other(format!(
1560 "failed to query snapshot version file metadata '{}': {err}",
1561 path.as_ref().display(),
1562 ))
1563 })?;
1564 let file_size = file_metadata.len();
1565 if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
1566 let error_message = format!(
1567 "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
1568 path.as_ref().display(),
1569 file_size,
1570 MAX_SNAPSHOT_VERSION_FILE_SIZE,
1571 );
1572 return Err(IoError::other(error_message));
1573 }
1574
1575 let mut snapshot_version = String::new();
1577 let mut file = fs::File::open(&path).map_err(|err| {
1578 IoError::other(format!(
1579 "failed to open snapshot version file '{}': {err}",
1580 path.as_ref().display()
1581 ))
1582 })?;
1583 file.read_to_string(&mut snapshot_version).map_err(|err| {
1584 IoError::other(format!(
1585 "failed to read snapshot version from file '{}': {err}",
1586 path.as_ref().display()
1587 ))
1588 })?;
1589
1590 Ok(snapshot_version.trim().to_string())
1591}
1592
1593fn check_are_snapshots_compatible(
1596 full_snapshot_archive_info: &FullSnapshotArchiveInfo,
1597 incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
1598) -> Result<()> {
1599 if incremental_snapshot_archive_info.is_none() {
1600 return Ok(());
1601 }
1602
1603 let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
1604
1605 (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
1606 .then_some(())
1607 .ok_or_else(|| {
1608 SnapshotError::MismatchedBaseSlot(
1609 full_snapshot_archive_info.slot(),
1610 incremental_snapshot_archive_info.base_slot(),
1611 )
1612 })
1613}
1614
1615pub fn purge_old_snapshot_archives(
1616 full_snapshot_archives_dir: impl AsRef<Path>,
1617 incremental_snapshot_archives_dir: impl AsRef<Path>,
1618 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
1619 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
1620) {
1621 info!(
1622 "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
1623 full_snapshot_archives_dir.as_ref().display(),
1624 maximum_full_snapshot_archives_to_retain
1625 );
1626
1627 let mut full_snapshot_archives =
1628 snapshot_paths::get_full_snapshot_archives(&full_snapshot_archives_dir);
1629 full_snapshot_archives.sort_unstable();
1630 full_snapshot_archives.reverse();
1631
1632 let num_to_retain = full_snapshot_archives
1633 .len()
1634 .min(maximum_full_snapshot_archives_to_retain.get());
1635 trace!(
1636 "There are {} full snapshot archives, retaining {}",
1637 full_snapshot_archives.len(),
1638 num_to_retain,
1639 );
1640
1641 let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
1642 if full_snapshot_archives.is_empty() {
1643 None
1644 } else {
1645 Some(full_snapshot_archives.split_at(num_to_retain))
1646 }
1647 .unwrap_or_default();
1648
1649 let retained_full_snapshot_slots = full_snapshot_archives_to_retain
1650 .iter()
1651 .map(|ai| ai.slot())
1652 .collect::<HashSet<_>>();
1653
1654 fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
1655 for path in archives.iter().map(|a| a.path()) {
1656 trace!("Removing snapshot archive: {}", path.display());
1657 let result = fs::remove_file(path);
1658 if let Err(err) = result {
1659 info!(
1660 "Failed to remove snapshot archive '{}': {err}",
1661 path.display()
1662 );
1663 }
1664 }
1665 }
1666 remove_archives(full_snapshot_archives_to_remove);
1667
1668 info!(
1669 "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
1670 incremental_snapshot_archives_dir.as_ref().display(),
1671 maximum_incremental_snapshot_archives_to_retain
1672 );
1673 let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
1674 for incremental_snapshot_archive in
1675 get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
1676 {
1677 incremental_snapshot_archives_by_base_slot
1678 .entry(incremental_snapshot_archive.base_slot())
1679 .or_default()
1680 .push(incremental_snapshot_archive)
1681 }
1682
1683 let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
1684 for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
1685 {
1686 incremental_snapshot_archives.sort_unstable();
1687 let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
1688 maximum_incremental_snapshot_archives_to_retain.get()
1689 } else {
1690 usize::from(retained_full_snapshot_slots.contains(&base_slot))
1691 };
1692 trace!(
1693 "There are {} incremental snapshot archives for base slot {}, removing {} of them",
1694 incremental_snapshot_archives.len(),
1695 base_slot,
1696 incremental_snapshot_archives
1697 .len()
1698 .saturating_sub(num_to_retain),
1699 );
1700
1701 incremental_snapshot_archives.truncate(
1702 incremental_snapshot_archives
1703 .len()
1704 .saturating_sub(num_to_retain),
1705 );
1706 remove_archives(&incremental_snapshot_archives);
1707 }
1708}
1709
1710pub fn verify_unpacked_snapshots_dir_and_version(
1711 unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
1712) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
1713 info!(
1714 "snapshot version: {}",
1715 &unpacked_snapshots_dir_and_version.snapshot_version
1716 );
1717
1718 let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
1719 let mut bank_snapshots =
1720 get_bank_snapshots(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
1721 if bank_snapshots.len() > 1 {
1722 return Err(IoError::other(format!(
1723 "invalid snapshot format: only one snapshot allowed, but found {}",
1724 bank_snapshots.len(),
1725 ))
1726 .into());
1727 }
1728 let root_paths = bank_snapshots.pop().ok_or_else(|| {
1729 IoError::other(format!(
1730 "no snapshots found in snapshots directory '{}'",
1731 unpacked_snapshots_dir_and_version
1732 .unpacked_snapshots_dir
1733 .display(),
1734 ))
1735 })?;
1736 Ok((snapshot_version, root_paths))
1737}
1738
1739#[derive(Debug, Copy, Clone)]
1740pub enum VerifyBank {
1742 Deterministic,
1744 NonDeterministic,
1747}
1748
1749pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
1751 let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
1752 purge_bank_snapshots(&bank_snapshots);
1753}
1754
1755pub fn purge_old_bank_snapshots(
1757 bank_snapshots_dir: impl AsRef<Path>,
1758 num_bank_snapshots_to_retain: usize,
1759) {
1760 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
1761
1762 bank_snapshots.sort_unstable();
1763 purge_bank_snapshots(
1764 bank_snapshots
1765 .iter()
1766 .rev()
1767 .skip(num_bank_snapshots_to_retain),
1768 );
1769}
1770
1771pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
1773 purge_old_bank_snapshots(&bank_snapshots_dir, 1);
1774
1775 let highest_bank_snapshot = get_highest_bank_snapshot(&bank_snapshots_dir);
1776 if let Some(highest_bank_snapshot) = highest_bank_snapshot {
1777 debug!(
1778 "Retained bank snapshot for slot {}, and purged the rest.",
1779 highest_bank_snapshot.slot
1780 );
1781 }
1782}
1783
1784pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
1786 let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
1787 bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
1788 purge_bank_snapshots(&bank_snapshots);
1789}
1790
1791fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
1795 for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
1796 if purge_bank_snapshot(snapshot_dir).is_err() {
1797 warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
1798 }
1799 }
1800}
1801
1802pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
1804 const FN_ERR: &str = "failed to purge bank snapshot";
1805 let accounts_hardlinks_dir = bank_snapshot_dir
1806 .as_ref()
1807 .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
1808 if accounts_hardlinks_dir.is_dir() {
1809 let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
1812 IoError::other(format!(
1813 "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
1814 accounts_hardlinks_dir.display(),
1815 ))
1816 })?;
1817 for entry in read_dir {
1818 let accounts_hardlink_dir = entry?.path();
1819 let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
1820 IoError::other(format!(
1821 "{FN_ERR}: failed to read symlink '{}': {err}",
1822 accounts_hardlink_dir.display(),
1823 ))
1824 })?;
1825 move_and_async_delete_path(&accounts_hardlink_dir);
1826 }
1827 }
1828 fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
1829 IoError::other(format!(
1830 "{FN_ERR}: failed to remove dir '{}': {err}",
1831 bank_snapshot_dir.as_ref().display(),
1832 ))
1833 })?;
1834 Ok(())
1835}
1836
1837pub fn should_take_full_snapshot(
1838 block_height: Slot,
1839 full_snapshot_archive_interval_slots: Slot,
1840) -> bool {
1841 block_height % full_snapshot_archive_interval_slots == 0
1842}
1843
1844pub fn should_take_incremental_snapshot(
1845 block_height: Slot,
1846 incremental_snapshot_archive_interval_slots: Slot,
1847 latest_full_snapshot_slot: Option<Slot>,
1848) -> bool {
1849 block_height % incremental_snapshot_archive_interval_slots == 0
1850 && latest_full_snapshot_slot.is_some()
1851}
1852
1853#[cfg(feature = "dev-context-only-utils")]
1858pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
1859 let tmp_dir = tempfile::TempDir::new().unwrap();
1860 let account_dir = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap().0;
1861 (tmp_dir, account_dir)
1862}
1863
1864#[cfg(test)]
1865mod tests {
1866 use {
1867 super::*,
1868 agave_snapshots::{
1869 paths::{
1870 get_full_snapshot_archives, get_highest_full_snapshot_archive_slot,
1871 get_highest_incremental_snapshot_archive_slot,
1872 },
1873 snapshot_config::{
1874 DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
1875 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
1876 },
1877 },
1878 assert_matches::assert_matches,
1879 bincode::{deserialize_from, serialize_into},
1880 solana_accounts_db::accounts_file::AccountsFileProvider,
1881 solana_hash::Hash,
1882 std::{convert::TryFrom, mem::size_of},
1883 tempfile::NamedTempFile,
1884 test_case::test_case,
1885 };
1886
1887 #[test]
1888 fn test_serialize_snapshot_data_file_under_limit() {
1889 let temp_dir = tempfile::TempDir::new().unwrap();
1890 let expected_consumed_size = size_of::<u32>() as u64;
1891 let consumed_size = serialize_snapshot_data_file_capped(
1892 &temp_dir.path().join("data-file"),
1893 expected_consumed_size,
1894 |stream| {
1895 serialize_into(stream, &2323_u32)?;
1896 Ok(())
1897 },
1898 )
1899 .unwrap();
1900 assert_eq!(consumed_size, expected_consumed_size);
1901 }
1902
1903 #[test]
1904 fn test_serialize_snapshot_data_file_over_limit() {
1905 let temp_dir = tempfile::TempDir::new().unwrap();
1906 let expected_consumed_size = size_of::<u32>() as u64;
1907 let result = serialize_snapshot_data_file_capped(
1908 &temp_dir.path().join("data-file"),
1909 expected_consumed_size - 1,
1910 |stream| {
1911 serialize_into(stream, &2323_u32)?;
1912 Ok(())
1913 },
1914 );
1915 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
1916 }
1917
1918 #[test]
1919 fn test_deserialize_snapshot_data_file_under_limit() {
1920 let expected_data = 2323_u32;
1921 let expected_consumed_size = size_of::<u32>() as u64;
1922
1923 let temp_dir = tempfile::TempDir::new().unwrap();
1924 serialize_snapshot_data_file_capped(
1925 &temp_dir.path().join("data-file"),
1926 expected_consumed_size,
1927 |stream| {
1928 serialize_into(stream, &expected_data)?;
1929 Ok(())
1930 },
1931 )
1932 .unwrap();
1933
1934 let snapshot_root_paths = SnapshotRootPaths {
1935 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
1936 incremental_snapshot_root_file_path: None,
1937 };
1938
1939 let actual_data = deserialize_snapshot_data_files_capped(
1940 &snapshot_root_paths,
1941 expected_consumed_size,
1942 |stream| {
1943 Ok(deserialize_from::<_, u32>(
1944 &mut stream.full_snapshot_stream,
1945 )?)
1946 },
1947 )
1948 .unwrap();
1949 assert_eq!(actual_data, expected_data);
1950 }
1951
1952 #[test]
1953 fn test_deserialize_snapshot_data_file_over_limit() {
1954 let expected_data = 2323_u32;
1955 let expected_consumed_size = size_of::<u32>() as u64;
1956
1957 let temp_dir = tempfile::TempDir::new().unwrap();
1958 serialize_snapshot_data_file_capped(
1959 &temp_dir.path().join("data-file"),
1960 expected_consumed_size,
1961 |stream| {
1962 serialize_into(stream, &expected_data)?;
1963 Ok(())
1964 },
1965 )
1966 .unwrap();
1967
1968 let snapshot_root_paths = SnapshotRootPaths {
1969 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
1970 incremental_snapshot_root_file_path: None,
1971 };
1972
1973 let result = deserialize_snapshot_data_files_capped(
1974 &snapshot_root_paths,
1975 expected_consumed_size - 1,
1976 |stream| {
1977 Ok(deserialize_from::<_, u32>(
1978 &mut stream.full_snapshot_stream,
1979 )?)
1980 },
1981 );
1982 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
1983 }
1984
1985 #[test]
1986 fn test_deserialize_snapshot_data_file_extra_data() {
1987 let expected_data = 2323_u32;
1988 let expected_consumed_size = size_of::<u32>() as u64;
1989
1990 let temp_dir = tempfile::TempDir::new().unwrap();
1991 serialize_snapshot_data_file_capped(
1992 &temp_dir.path().join("data-file"),
1993 expected_consumed_size * 2,
1994 |stream| {
1995 serialize_into(stream.by_ref(), &expected_data)?;
1996 serialize_into(stream.by_ref(), &expected_data)?;
1997 Ok(())
1998 },
1999 )
2000 .unwrap();
2001
2002 let snapshot_root_paths = SnapshotRootPaths {
2003 full_snapshot_root_file_path: temp_dir.path().join("data-file"),
2004 incremental_snapshot_root_file_path: None,
2005 };
2006
2007 let result = deserialize_snapshot_data_files_capped(
2008 &snapshot_root_paths,
2009 expected_consumed_size * 2,
2010 |stream| {
2011 Ok(deserialize_from::<_, u32>(
2012 &mut stream.full_snapshot_stream,
2013 )?)
2014 },
2015 );
2016 assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
2017 }
2018
2019 #[test]
2020 fn test_snapshot_version_from_file_under_limit() {
2021 let file_content = SnapshotVersion::default().as_str();
2022 let mut file = NamedTempFile::new().unwrap();
2023 file.write_all(file_content.as_bytes()).unwrap();
2024 let version_from_file = snapshot_version_from_file(file.path()).unwrap();
2025 assert_eq!(version_from_file, file_content);
2026 }
2027
2028 #[test]
2029 fn test_snapshot_version_from_file_over_limit() {
2030 let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
2031 let file_content = vec![7u8; over_limit_size];
2032 let mut file = NamedTempFile::new().unwrap();
2033 file.write_all(&file_content).unwrap();
2034 assert_matches!(
2035 snapshot_version_from_file(file.path()),
2036 Err(ref message) if message.to_string().starts_with("snapshot version file too large")
2037 );
2038 }
2039
2040 #[test]
2041 fn test_check_are_snapshots_compatible() {
2042 let slot1: Slot = 1234;
2043 let slot2: Slot = 5678;
2044 let slot3: Slot = 999_999;
2045
2046 let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
2047 format!("/dir/snapshot-{}-{}.tar.zst", slot1, Hash::new_unique()),
2048 ))
2049 .unwrap();
2050
2051 assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
2052
2053 let incremental_snapshot_archive_info =
2054 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2055 "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
2056 slot1,
2057 slot2,
2058 Hash::new_unique()
2059 )))
2060 .unwrap();
2061
2062 assert!(check_are_snapshots_compatible(
2063 &full_snapshot_archive_info,
2064 Some(&incremental_snapshot_archive_info)
2065 )
2066 .is_ok());
2067
2068 let incremental_snapshot_archive_info =
2069 IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
2070 "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
2071 slot2,
2072 slot3,
2073 Hash::new_unique()
2074 )))
2075 .unwrap();
2076
2077 assert!(check_are_snapshots_compatible(
2078 &full_snapshot_archive_info,
2079 Some(&incremental_snapshot_archive_info)
2080 )
2081 .is_err());
2082 }
2083
2084 fn common_create_bank_snapshot_files(
2086 bank_snapshots_dir: &Path,
2087 min_slot: Slot,
2088 max_slot: Slot,
2089 ) {
2090 for slot in min_slot..max_slot {
2091 let snapshot_dir = snapshot_paths::get_bank_snapshot_dir(bank_snapshots_dir, slot);
2092 fs::create_dir_all(&snapshot_dir).unwrap();
2093
2094 let snapshot_filename = snapshot_paths::get_snapshot_file_name(slot);
2095 let snapshot_path = snapshot_dir.join(snapshot_filename);
2096 fs::File::create(snapshot_path).unwrap();
2097
2098 let status_cache_file =
2099 snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
2100 fs::File::create(status_cache_file).unwrap();
2101
2102 let version_path = snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
2103 fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
2104 }
2105 }
2106
2107 #[test]
2108 fn test_get_bank_snapshots() {
2109 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2110 let min_slot = 10;
2111 let max_slot = 20;
2112 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2113
2114 let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
2115 assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
2116 }
2117
2118 #[test]
2119 fn test_get_highest_bank_snapshot() {
2120 let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
2121 let min_slot = 99;
2122 let max_slot = 123;
2123 common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
2124
2125 let highest_bank_snapshot = get_highest_bank_snapshot(temp_snapshots_dir.path());
2126 assert!(highest_bank_snapshot.is_some());
2127 assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
2128 }
2129
2130 fn common_create_snapshot_archive_files(
2136 full_snapshot_archives_dir: &Path,
2137 incremental_snapshot_archives_dir: &Path,
2138 min_full_snapshot_slot: Slot,
2139 max_full_snapshot_slot: Slot,
2140 min_incremental_snapshot_slot: Slot,
2141 max_incremental_snapshot_slot: Slot,
2142 ) {
2143 fs::create_dir_all(full_snapshot_archives_dir).unwrap();
2144 fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
2145 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2146 for incremental_snapshot_slot in
2147 min_incremental_snapshot_slot..max_incremental_snapshot_slot
2148 {
2149 let snapshot_filename = format!(
2150 "incremental-snapshot-{}-{}-{}.tar.zst",
2151 full_snapshot_slot,
2152 incremental_snapshot_slot,
2153 Hash::default()
2154 );
2155 let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
2156 fs::File::create(snapshot_filepath).unwrap();
2157 }
2158
2159 let snapshot_filename = format!(
2160 "snapshot-{}-{}.tar.zst",
2161 full_snapshot_slot,
2162 Hash::default()
2163 );
2164 let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
2165 fs::File::create(snapshot_filepath).unwrap();
2166
2167 let bad_filename = format!(
2169 "incremental-snapshot-{}-{}-bad!hash.tar.zst",
2170 full_snapshot_slot,
2171 max_incremental_snapshot_slot + 1,
2172 );
2173 let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
2174 fs::File::create(bad_filepath).unwrap();
2175 }
2176
2177 let bad_filename = format!("snapshot-{}-bad!hash.tar.zst", max_full_snapshot_slot + 1);
2180 let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
2181 fs::File::create(bad_filepath).unwrap();
2182 }
2183
2184 #[test]
2185 fn test_get_full_snapshot_archives() {
2186 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2187 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2188 let min_slot = 123;
2189 let max_slot = 456;
2190 common_create_snapshot_archive_files(
2191 full_snapshot_archives_dir.path(),
2192 incremental_snapshot_archives_dir.path(),
2193 min_slot,
2194 max_slot,
2195 0,
2196 0,
2197 );
2198
2199 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2200 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
2201 }
2202
2203 #[test]
2204 fn test_get_full_snapshot_archives_remote() {
2205 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2206 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2207 let min_slot = 123;
2208 let max_slot = 456;
2209 common_create_snapshot_archive_files(
2210 &full_snapshot_archives_dir
2211 .path()
2212 .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2213 &incremental_snapshot_archives_dir
2214 .path()
2215 .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2216 min_slot,
2217 max_slot,
2218 0,
2219 0,
2220 );
2221
2222 let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
2223 assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
2224 assert!(snapshot_archives.iter().all(|info| info.is_remote()));
2225 }
2226
2227 #[test]
2228 fn test_get_incremental_snapshot_archives() {
2229 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2230 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2231 let min_full_snapshot_slot = 12;
2232 let max_full_snapshot_slot = 23;
2233 let min_incremental_snapshot_slot = 34;
2234 let max_incremental_snapshot_slot = 45;
2235 common_create_snapshot_archive_files(
2236 full_snapshot_archives_dir.path(),
2237 incremental_snapshot_archives_dir.path(),
2238 min_full_snapshot_slot,
2239 max_full_snapshot_slot,
2240 min_incremental_snapshot_slot,
2241 max_incremental_snapshot_slot,
2242 );
2243
2244 let incremental_snapshot_archives =
2245 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
2246 assert_eq!(
2247 incremental_snapshot_archives.len() as Slot,
2248 (max_full_snapshot_slot - min_full_snapshot_slot)
2249 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
2250 );
2251 }
2252
2253 #[test]
2254 fn test_get_incremental_snapshot_archives_remote() {
2255 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2256 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2257 let min_full_snapshot_slot = 12;
2258 let max_full_snapshot_slot = 23;
2259 let min_incremental_snapshot_slot = 34;
2260 let max_incremental_snapshot_slot = 45;
2261 common_create_snapshot_archive_files(
2262 &full_snapshot_archives_dir
2263 .path()
2264 .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2265 &incremental_snapshot_archives_dir
2266 .path()
2267 .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
2268 min_full_snapshot_slot,
2269 max_full_snapshot_slot,
2270 min_incremental_snapshot_slot,
2271 max_incremental_snapshot_slot,
2272 );
2273
2274 let incremental_snapshot_archives =
2275 get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
2276 assert_eq!(
2277 incremental_snapshot_archives.len() as Slot,
2278 (max_full_snapshot_slot - min_full_snapshot_slot)
2279 * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
2280 );
2281 assert!(incremental_snapshot_archives
2282 .iter()
2283 .all(|info| info.is_remote()));
2284 }
2285
2286 #[test]
2287 fn test_get_highest_full_snapshot_archive_slot() {
2288 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2289 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2290 let min_slot = 123;
2291 let max_slot = 456;
2292 common_create_snapshot_archive_files(
2293 full_snapshot_archives_dir.path(),
2294 incremental_snapshot_archives_dir.path(),
2295 min_slot,
2296 max_slot,
2297 0,
2298 0,
2299 );
2300
2301 assert_eq!(
2302 get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
2303 Some(max_slot - 1)
2304 );
2305 }
2306
2307 #[test]
2308 fn test_get_highest_incremental_snapshot_slot() {
2309 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2310 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2311 let min_full_snapshot_slot = 12;
2312 let max_full_snapshot_slot = 23;
2313 let min_incremental_snapshot_slot = 34;
2314 let max_incremental_snapshot_slot = 45;
2315 common_create_snapshot_archive_files(
2316 full_snapshot_archives_dir.path(),
2317 incremental_snapshot_archives_dir.path(),
2318 min_full_snapshot_slot,
2319 max_full_snapshot_slot,
2320 min_incremental_snapshot_slot,
2321 max_incremental_snapshot_slot,
2322 );
2323
2324 for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
2325 assert_eq!(
2326 get_highest_incremental_snapshot_archive_slot(
2327 incremental_snapshot_archives_dir.path(),
2328 full_snapshot_slot
2329 ),
2330 Some(max_incremental_snapshot_slot - 1)
2331 );
2332 }
2333
2334 assert_eq!(
2335 get_highest_incremental_snapshot_archive_slot(
2336 incremental_snapshot_archives_dir.path(),
2337 max_full_snapshot_slot
2338 ),
2339 None
2340 );
2341 }
2342
2343 fn common_test_purge_old_snapshot_archives(
2344 snapshot_names: &[&String],
2345 maximum_full_snapshot_archives_to_retain: NonZeroUsize,
2346 maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
2347 expected_snapshots: &[&String],
2348 ) {
2349 let temp_snap_dir = tempfile::TempDir::new().unwrap();
2350
2351 for snap_name in snapshot_names {
2352 let snap_path = temp_snap_dir.path().join(snap_name);
2353 let mut _snap_file = fs::File::create(snap_path);
2354 }
2355 purge_old_snapshot_archives(
2356 temp_snap_dir.path(),
2357 temp_snap_dir.path(),
2358 maximum_full_snapshot_archives_to_retain,
2359 maximum_incremental_snapshot_archives_to_retain,
2360 );
2361
2362 let mut retained_snaps = HashSet::new();
2363 for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
2364 let entry_path_buf = entry.unwrap().path();
2365 let entry_path = entry_path_buf.as_path();
2366 let snapshot_name = entry_path
2367 .file_name()
2368 .unwrap()
2369 .to_str()
2370 .unwrap()
2371 .to_string();
2372 retained_snaps.insert(snapshot_name);
2373 }
2374
2375 for snap_name in expected_snapshots {
2376 assert!(
2377 retained_snaps.contains(snap_name.as_str()),
2378 "{snap_name} not found"
2379 );
2380 }
2381 assert_eq!(retained_snaps.len(), expected_snapshots.len());
2382 }
2383
2384 #[test]
2385 fn test_purge_old_full_snapshot_archives() {
2386 let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
2387 let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
2388 let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
2389 let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
2390
2391 let expected_snapshots = vec![&snap3_name];
2393 common_test_purge_old_snapshot_archives(
2394 &snapshot_names,
2395 NonZeroUsize::new(1).unwrap(),
2396 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2397 &expected_snapshots,
2398 );
2399
2400 let expected_snapshots = vec![&snap2_name, &snap3_name];
2402 common_test_purge_old_snapshot_archives(
2403 &snapshot_names,
2404 NonZeroUsize::new(2).unwrap(),
2405 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2406 &expected_snapshots,
2407 );
2408
2409 let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
2411 common_test_purge_old_snapshot_archives(
2412 &snapshot_names,
2413 NonZeroUsize::new(3).unwrap(),
2414 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
2415 &expected_snapshots,
2416 );
2417 }
2418
2419 #[test]
2423 fn test_purge_old_full_snapshot_archives_in_the_loop() {
2424 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2425 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2426 let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
2427 let starting_slot: Slot = 42;
2428
2429 for slot in (starting_slot..).take(100) {
2430 let full_snapshot_archive_file_name =
2431 format!("snapshot-{}-{}.tar.zst", slot, Hash::default());
2432 let full_snapshot_archive_path = full_snapshot_archives_dir
2433 .as_ref()
2434 .join(full_snapshot_archive_file_name);
2435 fs::File::create(full_snapshot_archive_path).unwrap();
2436
2437 if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
2439 continue;
2440 }
2441
2442 if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
2444 continue;
2445 }
2446
2447 purge_old_snapshot_archives(
2448 &full_snapshot_archives_dir,
2449 &incremental_snapshot_archives_dir,
2450 maximum_snapshots_to_retain,
2451 NonZeroUsize::new(usize::MAX).unwrap(),
2452 );
2453 let mut full_snapshot_archives =
2454 get_full_snapshot_archives(&full_snapshot_archives_dir);
2455 full_snapshot_archives.sort_unstable();
2456 assert_eq!(
2457 full_snapshot_archives.len(),
2458 maximum_snapshots_to_retain.get()
2459 );
2460 assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
2461 for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
2462 assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
2463 }
2464 }
2465 }
2466
2467 #[test]
2468 fn test_purge_old_incremental_snapshot_archives() {
2469 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2470 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2471 let starting_slot = 100_000;
2472
2473 let maximum_incremental_snapshot_archives_to_retain =
2474 DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
2475 let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
2476
2477 let incremental_snapshot_interval = 100;
2478 let num_incremental_snapshots_per_full_snapshot =
2479 maximum_incremental_snapshot_archives_to_retain.get() * 2;
2480 let full_snapshot_interval =
2481 incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
2482
2483 let mut snapshot_filenames = vec![];
2484 (starting_slot..)
2485 .step_by(full_snapshot_interval)
2486 .take(
2487 maximum_full_snapshot_archives_to_retain
2488 .checked_mul(NonZeroUsize::new(2).unwrap())
2489 .unwrap()
2490 .get(),
2491 )
2492 .for_each(|full_snapshot_slot| {
2493 let snapshot_filename = format!(
2494 "snapshot-{}-{}.tar.zst",
2495 full_snapshot_slot,
2496 Hash::default()
2497 );
2498 let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
2499 fs::File::create(snapshot_path).unwrap();
2500 snapshot_filenames.push(snapshot_filename);
2501
2502 (full_snapshot_slot..)
2503 .step_by(incremental_snapshot_interval)
2504 .take(num_incremental_snapshots_per_full_snapshot)
2505 .skip(1)
2506 .for_each(|incremental_snapshot_slot| {
2507 let snapshot_filename = format!(
2508 "incremental-snapshot-{}-{}-{}.tar.zst",
2509 full_snapshot_slot,
2510 incremental_snapshot_slot,
2511 Hash::default()
2512 );
2513 let snapshot_path = incremental_snapshot_archives_dir
2514 .path()
2515 .join(&snapshot_filename);
2516 fs::File::create(snapshot_path).unwrap();
2517 snapshot_filenames.push(snapshot_filename);
2518 });
2519 });
2520
2521 purge_old_snapshot_archives(
2522 full_snapshot_archives_dir.path(),
2523 incremental_snapshot_archives_dir.path(),
2524 maximum_full_snapshot_archives_to_retain,
2525 maximum_incremental_snapshot_archives_to_retain,
2526 );
2527
2528 let mut remaining_full_snapshot_archives =
2530 get_full_snapshot_archives(full_snapshot_archives_dir.path());
2531 assert_eq!(
2532 remaining_full_snapshot_archives.len(),
2533 maximum_full_snapshot_archives_to_retain.get(),
2534 );
2535 remaining_full_snapshot_archives.sort_unstable();
2536 let latest_full_snapshot_archive_slot =
2537 remaining_full_snapshot_archives.last().unwrap().slot();
2538
2539 let mut remaining_incremental_snapshot_archives =
2544 get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
2545 assert_eq!(
2546 remaining_incremental_snapshot_archives.len(),
2547 maximum_incremental_snapshot_archives_to_retain
2548 .get()
2549 .saturating_add(
2550 maximum_full_snapshot_archives_to_retain
2551 .get()
2552 .saturating_sub(1)
2553 )
2554 );
2555 remaining_incremental_snapshot_archives.sort_unstable();
2556 remaining_incremental_snapshot_archives.reverse();
2557
2558 for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
2560 let incremental_snapshot_archive =
2561 remaining_incremental_snapshot_archives.pop().unwrap();
2562
2563 let expected_base_slot =
2564 latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
2565 assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
2566 let expected_slot = expected_base_slot
2567 + (full_snapshot_interval - incremental_snapshot_interval) as u64;
2568 assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
2569 }
2570
2571 for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
2573 assert_eq!(
2574 incremental_snapshot_archive.base_slot(),
2575 latest_full_snapshot_archive_slot
2576 );
2577 }
2578
2579 let expected_remaining_incremental_snapshot_archive_slots =
2581 (latest_full_snapshot_archive_slot..)
2582 .step_by(incremental_snapshot_interval)
2583 .take(num_incremental_snapshots_per_full_snapshot)
2584 .skip(
2585 num_incremental_snapshots_per_full_snapshot
2586 - maximum_incremental_snapshot_archives_to_retain.get(),
2587 )
2588 .collect::<HashSet<_>>();
2589
2590 let actual_remaining_incremental_snapshot_archive_slots =
2591 remaining_incremental_snapshot_archives
2592 .iter()
2593 .map(|snapshot| snapshot.slot())
2594 .collect::<HashSet<_>>();
2595 assert_eq!(
2596 actual_remaining_incremental_snapshot_archive_slots,
2597 expected_remaining_incremental_snapshot_archive_slots
2598 );
2599 }
2600
2601 #[test]
2602 fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
2603 let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2604 let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
2605
2606 for snapshot_filenames in [
2607 format!("incremental-snapshot-100-120-{}.tar.zst", Hash::default()),
2608 format!("incremental-snapshot-100-140-{}.tar.zst", Hash::default()),
2609 format!("incremental-snapshot-100-160-{}.tar.zst", Hash::default()),
2610 format!("incremental-snapshot-100-180-{}.tar.zst", Hash::default()),
2611 format!("incremental-snapshot-200-220-{}.tar.zst", Hash::default()),
2612 format!("incremental-snapshot-200-240-{}.tar.zst", Hash::default()),
2613 format!("incremental-snapshot-200-260-{}.tar.zst", Hash::default()),
2614 format!("incremental-snapshot-200-280-{}.tar.zst", Hash::default()),
2615 ] {
2616 let snapshot_path = incremental_snapshot_archives_dir
2617 .path()
2618 .join(snapshot_filenames);
2619 fs::File::create(snapshot_path).unwrap();
2620 }
2621
2622 purge_old_snapshot_archives(
2623 full_snapshot_archives_dir.path(),
2624 incremental_snapshot_archives_dir.path(),
2625 NonZeroUsize::new(usize::MAX).unwrap(),
2626 NonZeroUsize::new(usize::MAX).unwrap(),
2627 );
2628
2629 let remaining_incremental_snapshot_archives =
2630 get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
2631 assert!(remaining_incremental_snapshot_archives.is_empty());
2632 }
2633
2634 #[test]
2635 fn test_get_snapshot_accounts_hardlink_dir() {
2636 let slot: Slot = 1;
2637
2638 let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
2639
2640 let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
2641 let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
2642 let accounts_hardlinks_dir =
2643 bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
2644 fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
2645
2646 let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
2647 let appendvec_filename = format!("{slot}.0");
2648 let appendvec_path = accounts_dir.join(appendvec_filename);
2649
2650 let ret = get_snapshot_accounts_hardlink_dir(
2651 &appendvec_path,
2652 slot,
2653 &mut account_paths_set,
2654 &accounts_hardlinks_dir,
2655 );
2656 assert!(ret.is_ok());
2657
2658 let wrong_appendvec_path = appendvec_path
2659 .parent()
2660 .unwrap()
2661 .parent()
2662 .unwrap()
2663 .join(appendvec_path.file_name().unwrap());
2664 let ret = get_snapshot_accounts_hardlink_dir(
2665 &wrong_appendvec_path,
2666 slot,
2667 &mut account_paths_set,
2668 accounts_hardlinks_dir,
2669 );
2670
2671 assert_matches!(
2672 ret,
2673 Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
2674 );
2675 }
2676
2677 #[test]
2678 fn test_get_snapshot_file_kind() {
2679 assert_eq!(None, get_snapshot_file_kind("file.txt"));
2680 assert_eq!(
2681 Some(SnapshotFileKind::Version),
2682 get_snapshot_file_kind(snapshot_paths::SNAPSHOT_VERSION_FILENAME)
2683 );
2684 assert_eq!(
2685 Some(SnapshotFileKind::BankFields),
2686 get_snapshot_file_kind("1234")
2687 );
2688 assert_eq!(
2689 Some(SnapshotFileKind::Storage),
2690 get_snapshot_file_kind("1000.999")
2691 );
2692 }
2693
2694 #[test]
2695 fn test_full_snapshot_slot_file_good() {
2696 let slot_written = 123_456_789;
2697 let bank_snapshot_dir = TempDir::new().unwrap();
2698 write_full_snapshot_slot_file(&bank_snapshot_dir, slot_written).unwrap();
2699
2700 let slot_read = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap();
2701 assert_eq!(slot_read, slot_written);
2702 }
2703
2704 #[test]
2705 fn test_full_snapshot_slot_file_bad() {
2706 const SLOT_SIZE: usize = std::mem::size_of::<Slot>();
2707 let too_small = [1u8; SLOT_SIZE - 1];
2708 let too_large = [1u8; SLOT_SIZE + 1];
2709
2710 for contents in [too_small.as_slice(), too_large.as_slice()] {
2711 let bank_snapshot_dir = TempDir::new().unwrap();
2712 let full_snapshot_slot_path = bank_snapshot_dir
2713 .as_ref()
2714 .join(snapshot_paths::SNAPSHOT_FULL_SNAPSHOT_SLOT_FILENAME);
2715 fs::write(full_snapshot_slot_path, contents).unwrap();
2716
2717 let err = read_full_snapshot_slot_file(&bank_snapshot_dir).unwrap_err();
2718 assert!(err
2719 .to_string()
2720 .starts_with("invalid full snapshot slot file size"));
2721 }
2722 }
2723
2724 #[test_case(0)]
2725 #[test_case(1)]
2726 #[test_case(10)]
2727 fn test_serialize_deserialize_account_storage_entries(num_storages: u64) {
2728 let temp_dir = tempfile::tempdir().unwrap();
2729 let bank_snapshot_dir = temp_dir.path();
2730 let snapshot_slot = num_storages + 1 as Slot;
2731
2732 let mut snapshot_storages = Vec::new();
2734 for i in 0..num_storages {
2735 let storage = Arc::new(AccountStorageEntry::new(
2736 &PathBuf::new(),
2737 i, i as u32, 1024,
2740 AccountsFileProvider::AppendVec,
2741 StorageAccess::File,
2742 ));
2743 snapshot_storages.push(storage);
2744 }
2745
2746 write_obsolete_accounts_to_snapshot(bank_snapshot_dir, &snapshot_storages, snapshot_slot)
2748 .unwrap();
2749
2750 let deserialized_accounts =
2752 deserialize_obsolete_accounts(bank_snapshot_dir, MAX_OBSOLETE_ACCOUNTS_FILE_SIZE)
2753 .unwrap();
2754
2755 for storage in &snapshot_storages {
2757 assert!(deserialized_accounts.remove(&storage.slot()).unwrap().2 == 0);
2758 }
2759 }
2760
2761 #[test]
2762 #[should_panic(expected = "too large obsolete accounts file to serialize")]
2763 fn test_serialize_obsolete_accounts_too_large_file() {
2764 let temp_dir = tempfile::tempdir().unwrap();
2765 let bank_snapshot_dir = temp_dir.path();
2766 let num_storages = 10;
2767 let snapshot_slot = num_storages + 1 as Slot;
2768
2769 let mut snapshot_storages = Vec::new();
2771 for i in 0..num_storages {
2772 let storage = Arc::new(AccountStorageEntry::new(
2773 &PathBuf::new(),
2774 i, i as u32, 1024,
2777 AccountsFileProvider::AppendVec,
2778 StorageAccess::File,
2779 ));
2780 snapshot_storages.push(storage);
2781 }
2782
2783 let obsolete_accounts =
2785 SerdeObsoleteAccountsMap::new_from_storages(&snapshot_storages, snapshot_slot);
2786
2787 serialize_obsolete_accounts(bank_snapshot_dir, &obsolete_accounts, 100).unwrap();
2789 }
2790
2791 #[test]
2792 #[should_panic(expected = "too large obsolete accounts file to deserialize")]
2793 fn test_deserialize_obsolete_accounts_too_large_file() {
2794 let temp_dir = tempfile::tempdir().unwrap();
2795 let bank_snapshot_dir = temp_dir.path();
2796 let num_storages = 10;
2797 let snapshot_slot = num_storages + 1 as Slot;
2798
2799 let mut snapshot_storages = Vec::new();
2801 for i in 0..num_storages {
2802 let storage = Arc::new(AccountStorageEntry::new(
2803 &PathBuf::new(),
2804 i, i as u32, 1024,
2807 AccountsFileProvider::AppendVec,
2808 StorageAccess::File,
2809 ));
2810 snapshot_storages.push(storage);
2811 }
2812
2813 write_obsolete_accounts_to_snapshot(bank_snapshot_dir, &snapshot_storages, snapshot_slot)
2815 .unwrap();
2816
2817 deserialize_obsolete_accounts(bank_snapshot_dir, 100).unwrap();
2820 }
2821}