gemachain_runtime/
snapshot_package.rs

1use crate::{
2    accounts_db::SnapshotStorages,
3    bank::{Bank, BankSlotDelta},
4};
5use crate::{
6    snapshot_archive_info::{SnapshotArchiveInfo, SnapshotArchiveInfoGetter},
7    snapshot_utils::{
8        self, ArchiveFormat, BankSnapshotInfo, Result, SnapshotVersion, TMP_BANK_SNAPSHOT_PREFIX,
9    },
10};
11use log::*;
12use gemachain_sdk::clock::Slot;
13use gemachain_sdk::genesis_config::ClusterType;
14use gemachain_sdk::hash::Hash;
15use std::{
16    fs,
17    path::{Path, PathBuf},
18    sync::{
19        mpsc::{Receiver, SendError, Sender},
20        Arc, Mutex,
21    },
22};
23use tempfile::TempDir;
24
25/// The sender side of the AccountsPackage channel, used by AccountsBackgroundService
26pub type AccountsPackageSender = Sender<AccountsPackage>;
27
28/// The receiver side of the AccountsPackage channel, used by AccountsHashVerifier
29pub type AccountsPackageReceiver = Receiver<AccountsPackage>;
30
31/// The error type when sending an AccountsPackage over the channel fails
32pub type AccountsPackageSendError = SendError<AccountsPackage>;
33
34/// The PendingSnapshotPackage passes a SnapshotPackage from AccountsHashVerifier to
35/// SnapshotPackagerService for archiving
36pub type PendingSnapshotPackage = Arc<Mutex<Option<SnapshotPackage>>>;
37
38#[derive(Debug)]
39pub struct AccountsPackage {
40    pub slot: Slot,
41    pub block_height: Slot,
42    pub slot_deltas: Vec<BankSlotDelta>,
43    pub snapshot_links: TempDir,
44    pub snapshot_storages: SnapshotStorages,
45    pub hash: Hash, // temporarily here while we still have to calculate hash before serializing bank
46    pub archive_format: ArchiveFormat,
47    pub snapshot_version: SnapshotVersion,
48    pub snapshot_archives_dir: PathBuf,
49    pub expected_capitalization: u64,
50    pub hash_for_testing: Option<Hash>,
51    pub cluster_type: ClusterType,
52    pub snapshot_type: Option<SnapshotType>,
53}
54
55impl AccountsPackage {
56    /// Package up bank files, storages, and slot deltas for a snapshot
57    #[allow(clippy::too_many_arguments)]
58    pub fn new(
59        bank: &Bank,
60        bank_snapshot_info: &BankSnapshotInfo,
61        bank_snapshots_dir: impl AsRef<Path>,
62        slot_deltas: Vec<BankSlotDelta>,
63        snapshot_archives_dir: impl AsRef<Path>,
64        snapshot_storages: SnapshotStorages,
65        archive_format: ArchiveFormat,
66        snapshot_version: SnapshotVersion,
67        hash_for_testing: Option<Hash>,
68        snapshot_type: Option<SnapshotType>,
69    ) -> Result<Self> {
70        info!(
71            "Package snapshot for bank {} has {} account storage entries (snapshot type: {:?})",
72            bank.slot(),
73            snapshot_storages.len(),
74            snapshot_type,
75        );
76
77        if let Some(SnapshotType::IncrementalSnapshot(incremental_snapshot_base_slot)) =
78            snapshot_type
79        {
80            assert!(
81                bank.slot() > incremental_snapshot_base_slot,
82                "Incremental snapshot base slot must be less than the bank being snapshotted!"
83            );
84            assert!(
85            snapshot_storages.iter().all(|storage| storage
86                .iter()
87                .all(|entry| entry.slot() > incremental_snapshot_base_slot)),
88            "Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!"
89            );
90        }
91
92        // Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging.
93        let snapshot_links = tempfile::Builder::new()
94            .prefix(&format!("{}{}-", TMP_BANK_SNAPSHOT_PREFIX, bank.slot()))
95            .tempdir_in(bank_snapshots_dir)?;
96        {
97            let snapshot_hardlink_dir = snapshot_links
98                .path()
99                .join(bank_snapshot_info.slot.to_string());
100            fs::create_dir_all(&snapshot_hardlink_dir)?;
101            fs::hard_link(
102                &bank_snapshot_info.snapshot_path,
103                &snapshot_hardlink_dir.join(bank_snapshot_info.slot.to_string()),
104            )?;
105        }
106
107        Ok(Self {
108            slot: bank.slot(),
109            block_height: bank.block_height(),
110            slot_deltas,
111            snapshot_links,
112            snapshot_storages,
113            hash: bank.get_accounts_hash(),
114            archive_format,
115            snapshot_version,
116            snapshot_archives_dir: snapshot_archives_dir.as_ref().to_path_buf(),
117            expected_capitalization: bank.capitalization(),
118            hash_for_testing,
119            cluster_type: bank.cluster_type(),
120            snapshot_type,
121        })
122    }
123}
124
125pub struct SnapshotPackage {
126    pub snapshot_archive_info: SnapshotArchiveInfo,
127    pub block_height: Slot,
128    pub slot_deltas: Vec<BankSlotDelta>,
129    pub snapshot_links: TempDir,
130    pub snapshot_storages: SnapshotStorages,
131    pub snapshot_version: SnapshotVersion,
132    pub snapshot_type: SnapshotType,
133}
134
135impl From<AccountsPackage> for SnapshotPackage {
136    fn from(accounts_package: AccountsPackage) -> Self {
137        assert!(
138            accounts_package.snapshot_type.is_some(),
139            "Cannot make a SnapshotPackage from an AccountsPackage when SnapshotType is None!"
140        );
141
142        let snapshot_archive_path = match accounts_package.snapshot_type.unwrap() {
143            SnapshotType::FullSnapshot => snapshot_utils::build_full_snapshot_archive_path(
144                accounts_package.snapshot_archives_dir,
145                accounts_package.slot,
146                &accounts_package.hash,
147                accounts_package.archive_format,
148            ),
149            SnapshotType::IncrementalSnapshot(incremental_snapshot_base_slot) => {
150                snapshot_utils::build_incremental_snapshot_archive_path(
151                    accounts_package.snapshot_archives_dir,
152                    incremental_snapshot_base_slot,
153                    accounts_package.slot,
154                    &accounts_package.hash,
155                    accounts_package.archive_format,
156                )
157            }
158        };
159
160        Self {
161            snapshot_archive_info: SnapshotArchiveInfo {
162                path: snapshot_archive_path,
163                slot: accounts_package.slot,
164                hash: accounts_package.hash,
165                archive_format: accounts_package.archive_format,
166            },
167            block_height: accounts_package.block_height,
168            slot_deltas: accounts_package.slot_deltas,
169            snapshot_links: accounts_package.snapshot_links,
170            snapshot_storages: accounts_package.snapshot_storages,
171            snapshot_version: accounts_package.snapshot_version,
172            snapshot_type: accounts_package.snapshot_type.unwrap(),
173        }
174    }
175}
176
177impl SnapshotArchiveInfoGetter for SnapshotPackage {
178    fn snapshot_archive_info(&self) -> &SnapshotArchiveInfo {
179        &self.snapshot_archive_info
180    }
181}
182
183/// Snapshots come in two flavors, Full and Incremental.  The IncrementalSnapshot has a Slot field,
184/// which is the incremental snapshot base slot.
185#[derive(Clone, Copy, Debug, Eq, PartialEq)]
186pub enum SnapshotType {
187    FullSnapshot,
188    IncrementalSnapshot(Slot),
189}
190
191impl SnapshotType {
192    pub fn is_full_snapshot(&self) -> bool {
193        matches!(self, SnapshotType::FullSnapshot)
194    }
195    pub fn is_incremental_snapshot(&self) -> bool {
196        matches!(self, SnapshotType::IncrementalSnapshot(_))
197    }
198}