#[cfg(all(target_os = "linux", target_env = "gnu"))]
use std::{
ffi::{CStr, CString},
path::Path,
};
use {
crate::{
bank::{Bank, BankFieldsToDeserialize, BankFieldsToSerialize, BankHashStats, BankRc},
epoch_stakes::{DeserializableVersionedEpochStakes, VersionedEpochStakes},
rent_collector::RentCollector,
runtime_config::RuntimeConfig,
snapshot_utils::StorageAndNextAccountsFileId,
stake_account::StakeAccount,
stakes::{DeserializableStakes, Stakes, serialize_stake_accounts_to_delegation_format},
},
agave_fs::FileInfo,
agave_snapshots::error::SnapshotError,
bincode::{self, Error, config::Options},
log::*,
serde::{Deserialize, Serialize, de::DeserializeOwned},
smallvec::SmallVec,
solana_accounts_db::{
ObsoleteAccounts,
account_storage_entry::AccountStorageEntry,
accounts::Accounts,
accounts_db::{
AccountsDb, AccountsDbConfig, AccountsFileId, AtomicAccountsFileId, IndexGenerationInfo,
},
accounts_file::{AccountsFile, StorageAccess},
accounts_hash::AccountsLtHash,
accounts_update_notifier_interface::AccountsUpdateNotifier,
blockhash_queue::BlockhashQueue,
},
solana_clock::{Epoch, Slot, UnixTimestamp},
solana_epoch_schedule::EpochSchedule,
solana_fee_calculator::FeeRateGovernor,
solana_genesis_config::GenesisConfig,
solana_hard_forks::HardForks,
solana_hash::Hash,
solana_inflation::Inflation,
solana_lattice_hash::lt_hash::LtHash,
solana_measure::measure::Measure,
solana_pubkey::Pubkey,
solana_serde::default_on_eof,
solana_stake_interface::state::Delegation,
std::{
collections::{HashMap, HashSet},
io::{self, BufReader, Read, Write},
path::PathBuf,
result::Result,
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
thread,
time::Instant,
},
storage::SerializableStorage,
types::SerdeAccountsLtHash,
};
mod obsolete_accounts;
mod status_cache;
mod storage;
mod tests;
mod types;
mod utils;
pub(crate) use {
obsolete_accounts::SerdeObsoleteAccountsMap,
status_cache::{deserialize_status_cache, serialize_status_cache},
storage::{SerializableAccountStorageEntry, SerializedAccountsFileId},
};
const MAX_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024;
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Debug, Deserialize)]
pub(crate) struct AccountsDbFields<T>(
Vec<(Slot, SmallVec<[T; 1]>)>,
u64, Slot,
BankHashInfo,
#[serde(deserialize_with = "default_on_eof")]
Vec<Slot>,
#[serde(deserialize_with = "default_on_eof")]
Vec<(Slot, Hash)>,
);
impl<T: SerializableStorage> AccountsDbFields<T> {
pub(crate) fn get_storage_lengths_for_snapshot_slots(
&self,
base_slot: Option<Slot>,
) -> Result<HashMap<Slot, usize>, SnapshotError> {
let AccountsDbFields(snapshot_storage, _, snapshot_slot, ..) = self;
let filtered_min_slot = base_slot.map(|slot| slot + 1).unwrap_or(Slot::MIN);
let mut lengths = HashMap::with_capacity(snapshot_storage.len());
for (slot, slot_storage) in snapshot_storage {
if slot > snapshot_slot {
return Err(SnapshotError::MismatchedSnapshotStorageSlot(
*slot,
*snapshot_slot,
));
}
if *slot < filtered_min_slot {
continue;
}
assert_eq!(slot_storage.len(), 1, "invalid storage count (slot={slot})");
let storage_entry = &slot_storage[0];
lengths.insert(*slot, storage_entry.current_len());
}
Ok(lengths)
}
}
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[cfg_attr(feature = "dev-context-only-utils", derive(Default, PartialEq))]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct UnusedIncrementalSnapshotPersistence {
pub full_slot: u64,
pub full_hash: [u8; 32],
pub full_capitalization: u64,
pub incremental_hash: [u8; 32],
pub incremental_capitalization: u64,
}
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct BankHashInfo {
unused_accounts_delta_hash: [u8; 32],
unused_accounts_hash: [u8; 32],
stats: BankHashStats,
}
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Default, Clone, PartialEq, Eq, Debug, Deserialize, Serialize)]
struct UnusedAccounts {
unused1: HashSet<Pubkey>,
unused2: HashSet<Pubkey>,
unused3: HashMap<Pubkey, u64>,
}
#[derive(Clone, Deserialize)]
struct DeserializableVersionedBank {
blockhash_queue: BlockhashQueue,
_unused_ancestors: HashMap<Slot, usize>,
hash: Hash,
parent_hash: Hash,
parent_slot: Slot,
hard_forks: HardForks,
transaction_count: u64,
tick_height: u64,
signature_count: u64,
capitalization: u64,
max_tick_height: u64,
hashes_per_tick: Option<u64>,
ticks_per_slot: u64,
ns_per_slot: u128,
genesis_creation_time: UnixTimestamp,
slots_per_year: f64,
accounts_data_len: u64,
slot: Slot,
_unused_epoch: Epoch,
block_height: u64,
leader_id: Pubkey,
_unused_collector_fees: u64,
_unused_fee_calculator: u64,
fee_rate_governor: FeeRateGovernor,
_unused_collected_rent: u64,
_unused_rent_collector: RentCollector,
epoch_schedule: EpochSchedule,
inflation: Inflation,
stakes: DeserializableStakes<Delegation>,
_unused_accounts: UnusedAccounts,
unused_epoch_stakes: HashMap<Epoch, ()>,
is_delta: bool,
}
impl From<DeserializableVersionedBank> for BankFieldsToDeserialize {
fn from(dvb: DeserializableVersionedBank) -> Self {
const LT_HASH_CANARY: LtHash = LtHash([0xCAFE; LtHash::NUM_ELEMENTS]);
BankFieldsToDeserialize {
blockhash_queue: dvb.blockhash_queue,
hash: dvb.hash,
parent_hash: dvb.parent_hash,
parent_slot: dvb.parent_slot,
hard_forks: dvb.hard_forks,
transaction_count: dvb.transaction_count,
tick_height: dvb.tick_height,
signature_count: dvb.signature_count,
capitalization: dvb.capitalization,
max_tick_height: dvb.max_tick_height,
hashes_per_tick: dvb.hashes_per_tick,
ticks_per_slot: dvb.ticks_per_slot,
ns_per_slot: dvb.ns_per_slot,
genesis_creation_time: dvb.genesis_creation_time,
slots_per_year: dvb.slots_per_year,
accounts_data_len: dvb.accounts_data_len,
slot: dvb.slot,
block_height: dvb.block_height,
leader_id: dvb.leader_id,
fee_rate_governor: dvb.fee_rate_governor,
epoch_schedule: dvb.epoch_schedule,
inflation: dvb.inflation,
stakes: dvb.stakes,
is_delta: dvb.is_delta,
versioned_epoch_stakes: vec![], accounts_lt_hash: AccountsLtHash(LT_HASH_CANARY), bank_hash_stats: BankHashStats::default(), }
}
}
#[derive(Serialize)]
struct SerializableVersionedBank {
blockhash_queue: BlockhashQueue,
unused_ancestors: HashMap<Slot, usize>,
hash: Hash,
parent_hash: Hash,
parent_slot: Slot,
hard_forks: HardForks,
transaction_count: u64,
tick_height: u64,
signature_count: u64,
capitalization: u64,
max_tick_height: u64,
hashes_per_tick: Option<u64>,
ticks_per_slot: u64,
ns_per_slot: u128,
genesis_creation_time: UnixTimestamp,
slots_per_year: f64,
accounts_data_len: u64,
slot: Slot,
epoch: Epoch,
block_height: u64,
leader_id: Pubkey,
unused_collector_fees: u64,
unused_fee_calculator: u64,
fee_rate_governor: FeeRateGovernor,
unused_collected_rent: u64,
rent_collector: RentCollector,
epoch_schedule: EpochSchedule,
inflation: Inflation,
#[serde(serialize_with = "serialize_stake_accounts_to_delegation_format")]
stakes: Stakes<StakeAccount<Delegation>>,
unused_accounts: UnusedAccounts,
unused_epoch_stakes: HashMap<Epoch, ()>,
is_delta: bool,
}
impl From<BankFieldsToSerialize> for SerializableVersionedBank {
fn from(rhs: BankFieldsToSerialize) -> Self {
Self {
blockhash_queue: rhs.blockhash_queue,
unused_ancestors: HashMap::default(),
hash: rhs.hash,
parent_hash: rhs.parent_hash,
parent_slot: rhs.parent_slot,
hard_forks: rhs.hard_forks,
transaction_count: rhs.transaction_count,
tick_height: rhs.tick_height,
signature_count: rhs.signature_count,
capitalization: rhs.capitalization,
max_tick_height: rhs.max_tick_height,
hashes_per_tick: rhs.hashes_per_tick,
ticks_per_slot: rhs.ticks_per_slot,
ns_per_slot: rhs.ns_per_slot,
genesis_creation_time: rhs.genesis_creation_time,
slots_per_year: rhs.slots_per_year,
accounts_data_len: rhs.accounts_data_len,
slot: rhs.slot,
epoch: rhs.epoch,
block_height: rhs.block_height,
leader_id: rhs.leader_id,
unused_collector_fees: 0,
unused_fee_calculator: 0,
fee_rate_governor: rhs.fee_rate_governor,
unused_collected_rent: u64::default(),
rent_collector: rhs.rent_collector,
epoch_schedule: rhs.epoch_schedule,
inflation: rhs.inflation,
stakes: rhs.stakes,
unused_accounts: UnusedAccounts::default(),
unused_epoch_stakes: HashMap::default(),
is_delta: rhs.is_delta,
}
}
}
#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::TransparentAsHelper for SerializableVersionedBank {}
pub struct SnapshotStreams<'a, R> {
pub full_snapshot_stream: &'a mut BufReader<R>,
pub incremental_snapshot_stream: Option<&'a mut BufReader<R>>,
}
#[derive(Debug)]
pub struct SnapshotBankFields {
full: BankFieldsToDeserialize,
incremental: Option<BankFieldsToDeserialize>,
}
impl SnapshotBankFields {
pub fn new(
full: BankFieldsToDeserialize,
incremental: Option<BankFieldsToDeserialize>,
) -> Self {
Self { full, incremental }
}
pub fn collapse_into(self) -> BankFieldsToDeserialize {
self.incremental.unwrap_or(self.full)
}
}
#[derive(Debug)]
pub struct SnapshotAccountsDbFields<T> {
full_snapshot_accounts_db_fields: AccountsDbFields<T>,
incremental_snapshot_accounts_db_fields: Option<AccountsDbFields<T>>,
}
impl<T> SnapshotAccountsDbFields<T> {
pub(crate) fn new(
full_snapshot_accounts_db_fields: AccountsDbFields<T>,
incremental_snapshot_accounts_db_fields: Option<AccountsDbFields<T>>,
) -> Self {
Self {
full_snapshot_accounts_db_fields,
incremental_snapshot_accounts_db_fields,
}
}
fn into_bank_hash_info(self) -> BankHashInfo {
let AccountsDbFields(
_snapshot_storages,
_snapshot_write_version,
_snapshot_slot,
snapshot_bank_hash_info,
_snapshot_historical_roots,
_snapshot_historical_roots_with_hash,
) = self
.incremental_snapshot_accounts_db_fields
.unwrap_or(self.full_snapshot_accounts_db_fields);
snapshot_bank_hash_info
}
}
pub(crate) fn serialize_into<W, T>(writer: W, value: &T) -> bincode::Result<()>
where
W: Write,
T: Serialize,
{
bincode::options()
.with_fixint_encoding()
.with_limit(MAX_STREAM_SIZE)
.serialize_into(writer, value)
}
pub(crate) fn deserialize_from<R, T>(reader: R) -> bincode::Result<T>
where
R: Read,
T: DeserializeOwned,
{
bincode::options()
.with_limit(MAX_STREAM_SIZE)
.with_fixint_encoding()
.allow_trailing_bytes()
.deserialize_from::<R, T>(reader)
}
fn deserialize_accounts_db_fields<R>(
stream: &mut BufReader<R>,
) -> Result<AccountsDbFields<SerializableAccountStorageEntry>, Error>
where
R: Read,
{
deserialize_from::<_, _>(stream)
}
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Clone, Debug, Deserialize)]
struct ExtraFieldsToDeserialize {
#[serde(deserialize_with = "default_on_eof")]
lamports_per_signature: u64,
#[serde(deserialize_with = "default_on_eof")]
_unused_incremental_snapshot_persistence: Option<UnusedIncrementalSnapshotPersistence>,
#[serde(deserialize_with = "default_on_eof")]
_unused_epoch_accounts_hash: Option<Hash>,
#[serde(deserialize_with = "default_on_eof")]
versioned_epoch_stakes: Vec<(u64, DeserializableVersionedEpochStakes)>,
#[serde(deserialize_with = "default_on_eof")]
accounts_lt_hash: Option<SerdeAccountsLtHash>,
#[allow(dead_code)]
#[serde(deserialize_with = "default_on_eof")]
block_id: Option<Hash>,
}
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[cfg_attr(feature = "dev-context-only-utils", derive(Default, PartialEq))]
#[derive(Debug, Serialize)]
pub struct ExtraFieldsToSerialize {
pub lamports_per_signature: u64,
pub unused_incremental_snapshot_persistence: Option<UnusedIncrementalSnapshotPersistence>,
pub unused_epoch_accounts_hash: Option<Hash>,
pub versioned_epoch_stakes: HashMap<u64, VersionedEpochStakes>,
pub accounts_lt_hash: Option<SerdeAccountsLtHash>,
}
fn deserialize_bank_fields<R>(
mut stream: &mut BufReader<R>,
) -> Result<
(
BankFieldsToDeserialize,
AccountsDbFields<SerializableAccountStorageEntry>,
),
Error,
>
where
R: Read,
{
let deserializable_bank = deserialize_from::<_, DeserializableVersionedBank>(&mut stream)?;
if !deserializable_bank.unused_epoch_stakes.is_empty() {
return Err(Box::new(bincode::ErrorKind::Custom(
"Expected deserialized bank's unused_epoch_stakes field to be empty".to_string(),
)));
}
let mut bank_fields = BankFieldsToDeserialize::from(deserializable_bank);
let accounts_db_fields = deserialize_accounts_db_fields(stream)?;
let extra_fields = deserialize_from(stream)?;
let ExtraFieldsToDeserialize {
lamports_per_signature,
_unused_incremental_snapshot_persistence,
_unused_epoch_accounts_hash,
versioned_epoch_stakes,
accounts_lt_hash,
block_id: _,
} = extra_fields;
bank_fields.fee_rate_governor = bank_fields
.fee_rate_governor
.clone_with_lamports_per_signature(lamports_per_signature);
bank_fields.versioned_epoch_stakes = versioned_epoch_stakes;
bank_fields.accounts_lt_hash = accounts_lt_hash
.expect("snapshot must have accounts_lt_hash")
.into();
Ok((bank_fields, accounts_db_fields))
}
pub(crate) fn fields_from_stream<R: Read>(
snapshot_stream: &mut BufReader<R>,
) -> std::result::Result<
(
BankFieldsToDeserialize,
AccountsDbFields<SerializableAccountStorageEntry>,
),
Error,
> {
deserialize_bank_fields(snapshot_stream)
}
#[cfg(feature = "dev-context-only-utils")]
pub(crate) fn fields_from_streams(
snapshot_streams: &mut SnapshotStreams<impl Read>,
) -> std::result::Result<
(
SnapshotBankFields,
SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
),
Error,
> {
let (full_snapshot_bank_fields, full_snapshot_accounts_db_fields) =
fields_from_stream(snapshot_streams.full_snapshot_stream)?;
let (incremental_snapshot_bank_fields, incremental_snapshot_accounts_db_fields) =
snapshot_streams
.incremental_snapshot_stream
.as_mut()
.map(|stream| fields_from_stream(stream))
.transpose()?
.unzip();
let snapshot_bank_fields = SnapshotBankFields {
full: full_snapshot_bank_fields,
incremental: incremental_snapshot_bank_fields,
};
let snapshot_accounts_db_fields = SnapshotAccountsDbFields {
full_snapshot_accounts_db_fields,
incremental_snapshot_accounts_db_fields,
};
Ok((snapshot_bank_fields, snapshot_accounts_db_fields))
}
#[derive(Debug)]
pub struct BankFromStreamsInfo {
pub calculated_accounts_lt_hash: AccountsLtHash,
}
#[allow(clippy::too_many_arguments)]
#[cfg(test)]
pub(crate) fn bank_from_streams<R>(
snapshot_streams: &mut SnapshotStreams<R>,
account_paths: &[PathBuf],
storage_and_next_append_vec_id: StorageAndNextAccountsFileId,
genesis_config: &GenesisConfig,
runtime_config: &RuntimeConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
limit_load_slot_count_from_snapshot: Option<usize>,
verify_index: bool,
accounts_db_config: AccountsDbConfig,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: Arc<AtomicBool>,
) -> std::result::Result<(Bank, BankFromStreamsInfo), Error>
where
R: Read,
{
let (bank_fields, accounts_db_fields) = fields_from_streams(snapshot_streams)?;
let (bank, info) = reconstruct_bank_from_fields(
bank_fields,
accounts_db_fields,
genesis_config,
runtime_config,
account_paths,
storage_and_next_append_vec_id,
debug_keys,
limit_load_slot_count_from_snapshot,
verify_index,
accounts_db_config,
accounts_update_notifier,
exit,
)?;
Ok((
bank,
BankFromStreamsInfo {
calculated_accounts_lt_hash: info.calculated_accounts_lt_hash,
},
))
}
#[cfg(test)]
pub(crate) fn bank_to_stream<W>(
stream: &mut io::BufWriter<W>,
bank: &Bank,
snapshot_storages: &[Arc<AccountStorageEntry>],
) -> Result<(), Error>
where
W: Write,
{
bincode::serialize_into(
stream,
&SerializableBankAndStorage {
bank,
snapshot_storages,
},
)
}
pub fn serialize_bank_snapshot_into(
stream: &mut dyn Write,
bank_fields: BankFieldsToSerialize,
bank_hash_stats: BankHashStats,
account_storage_entries: &[Arc<AccountStorageEntry>],
extra_fields: ExtraFieldsToSerialize,
) -> Result<(), Error> {
let mut serializer = bincode::Serializer::new(
stream,
bincode::DefaultOptions::new().with_fixint_encoding(),
);
serialize_bank_snapshot_with(
&mut serializer,
bank_fields,
bank_hash_stats,
account_storage_entries,
extra_fields,
)
}
pub fn serialize_bank_snapshot_with<S>(
serializer: S,
bank_fields: BankFieldsToSerialize,
bank_hash_stats: BankHashStats,
account_storage_entries: &[Arc<AccountStorageEntry>],
extra_fields: ExtraFieldsToSerialize,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let slot = bank_fields.slot;
let serializable_bank = SerializableVersionedBank::from(bank_fields);
let serializable_accounts_db = SerializableAccountsDb::<'_> {
slot,
account_storage_entries,
bank_hash_stats,
};
(serializable_bank, serializable_accounts_db, extra_fields).serialize(serializer)
}
#[cfg(test)]
struct SerializableBankAndStorage<'a> {
bank: &'a Bank,
snapshot_storages: &'a [Arc<AccountStorageEntry>],
}
#[cfg(test)]
impl Serialize for SerializableBankAndStorage<'_> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let slot = self.bank.slot();
let mut bank_fields = self.bank.get_fields_to_serialize();
let bank_hash_stats = self.bank.get_bank_hash_stats();
let lamports_per_signature = bank_fields.fee_rate_governor.lamports_per_signature;
let versioned_epoch_stakes = std::mem::take(&mut bank_fields.versioned_epoch_stakes);
let accounts_lt_hash = Some(bank_fields.accounts_lt_hash.clone().into());
let bank_fields_to_serialize = (
SerializableVersionedBank::from(bank_fields),
SerializableAccountsDb::<'_> {
slot,
account_storage_entries: self.snapshot_storages,
bank_hash_stats,
},
ExtraFieldsToSerialize {
lamports_per_signature,
unused_incremental_snapshot_persistence: None,
unused_epoch_accounts_hash: None,
versioned_epoch_stakes,
accounts_lt_hash,
},
);
bank_fields_to_serialize.serialize(serializer)
}
}
#[cfg(test)]
struct SerializableBankAndStorageNoExtra<'a> {
bank: &'a Bank,
snapshot_storages: &'a [Arc<AccountStorageEntry>],
}
#[cfg(test)]
impl Serialize for SerializableBankAndStorageNoExtra<'_> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let slot = self.bank.slot();
let bank_fields = self.bank.get_fields_to_serialize();
let bank_hash_stats = self.bank.get_bank_hash_stats();
(
SerializableVersionedBank::from(bank_fields),
SerializableAccountsDb::<'_> {
slot,
account_storage_entries: self.snapshot_storages,
bank_hash_stats,
},
)
.serialize(serializer)
}
}
#[cfg(test)]
impl<'a> From<SerializableBankAndStorageNoExtra<'a>> for SerializableBankAndStorage<'a> {
fn from(s: SerializableBankAndStorageNoExtra<'a>) -> SerializableBankAndStorage<'a> {
let SerializableBankAndStorageNoExtra {
bank,
snapshot_storages,
} = s;
SerializableBankAndStorage {
bank,
snapshot_storages,
}
}
}
struct SerializableAccountsDb<'a> {
slot: Slot,
account_storage_entries: &'a [Arc<AccountStorageEntry>],
bank_hash_stats: BankHashStats,
}
impl Serialize for SerializableAccountsDb<'_> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let entries = utils::serialize_iter_as_map(self.account_storage_entries.iter().map(|x| {
(
x.slot(),
utils::serialize_iter_as_seq(
[x].into_iter()
.map(|x| SerializableAccountStorageEntry::new(x, self.slot)),
),
)
}));
let bank_hash_info = BankHashInfo {
unused_accounts_delta_hash: [0; 32],
unused_accounts_hash: [0; 32],
stats: self.bank_hash_stats.clone(),
};
let historical_roots = Vec::<Slot>::default();
let historical_roots_with_hash = Vec::<(Slot, Hash)>::default();
let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms");
let result = (
entries,
0u64, self.slot,
bank_hash_info,
historical_roots,
historical_roots_with_hash,
)
.serialize(serializer);
serialize_account_storage_timer.stop();
datapoint_info!(
"serialize_account_storage_ms",
("duration", serialize_account_storage_timer.as_ms(), i64),
("num_entries", self.account_storage_entries.len(), i64),
);
result
}
}
#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::TransparentAsHelper for SerializableAccountsDb<'_> {}
#[derive(Debug)]
pub(crate) struct ReconstructedBankInfo {
pub(crate) calculated_accounts_lt_hash: AccountsLtHash,
pub(crate) calculated_capitalization: u64,
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn reconstruct_bank_from_fields<E>(
bank_fields: SnapshotBankFields,
snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
genesis_config: &GenesisConfig,
runtime_config: &RuntimeConfig,
account_paths: &[PathBuf],
storage_and_next_append_vec_id: StorageAndNextAccountsFileId,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
limit_load_slot_count_from_snapshot: Option<usize>,
verify_index: bool,
accounts_db_config: AccountsDbConfig,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: Arc<AtomicBool>,
) -> Result<(Bank, ReconstructedBankInfo), Error>
where
E: SerializableStorage + std::marker::Sync,
{
let mut bank_fields = bank_fields.collapse_into();
let deserializable_epoch_stakes = std::mem::take(&mut bank_fields.versioned_epoch_stakes);
let epoch_stakes_handle = thread::Builder::new()
.name("solRctEpochStk".into())
.spawn(|| {
deserializable_epoch_stakes
.into_iter()
.map(|(epoch, stakes)| (epoch, stakes.into()))
.collect()
})?;
let (accounts_db, reconstructed_accounts_db_info) = reconstruct_accountsdb_from_fields(
snapshot_accounts_db_fields,
account_paths,
storage_and_next_append_vec_id,
limit_load_slot_count_from_snapshot,
verify_index,
accounts_db_config,
accounts_update_notifier,
exit,
)?;
bank_fields.bank_hash_stats = reconstructed_accounts_db_info.bank_hash_stats;
let bank_rc = BankRc::new(Accounts::new(Arc::new(accounts_db)));
let runtime_config = Arc::new(runtime_config.clone());
let epoch_stakes = epoch_stakes_handle.join().expect("calculate epoch stakes");
let bank = Bank::new_from_snapshot(
bank_rc,
genesis_config,
runtime_config,
bank_fields,
debug_keys,
reconstructed_accounts_db_info.accounts_data_len,
epoch_stakes,
);
info!("rent_collector: {:?}", bank.rent_collector());
Ok((
bank,
ReconstructedBankInfo {
calculated_accounts_lt_hash: reconstructed_accounts_db_info.calculated_accounts_lt_hash,
calculated_capitalization: reconstructed_accounts_db_info.calculated_capitalization,
},
))
}
pub(crate) fn reconstruct_single_storage(
slot: &Slot,
append_vec_file_info: FileInfo,
current_len: usize,
id: AccountsFileId,
storage_access: StorageAccess,
obsolete_accounts: Option<(ObsoleteAccounts, AccountsFileId, usize)>,
) -> Result<Arc<AccountStorageEntry>, SnapshotError> {
let (current_len, obsolete_accounts) = if let Some(obsolete_accounts) = obsolete_accounts {
let updated_len = current_len + obsolete_accounts.2;
if obsolete_accounts.1 != id {
return Err(SnapshotError::MismatchedAccountsFileId(
id,
obsolete_accounts.1,
));
}
(updated_len, obsolete_accounts.0)
} else {
(current_len, ObsoleteAccounts::default())
};
let accounts_file =
AccountsFile::new_for_startup(append_vec_file_info, current_len, storage_access)?;
Ok(Arc::new(AccountStorageEntry::new_existing(
*slot,
id,
accounts_file,
obsolete_accounts,
)))
}
pub(crate) fn remap_append_vec_file(
slot: Slot,
old_append_vec_id: SerializedAccountsFileId,
append_vec_file_info: FileInfo,
next_append_vec_id: &AtomicAccountsFileId,
num_collisions: &AtomicUsize,
) -> io::Result<(AccountsFileId, FileInfo)> {
#[cfg(all(target_os = "linux", target_env = "gnu"))]
let append_vec_path_cstr = cstring_from_path(&append_vec_file_info.path)?;
let mut remapped_append_vec_path = append_vec_file_info.path.clone();
let (remapped_append_vec_id, remapped_append_vec_path) = loop {
let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::AcqRel);
if old_append_vec_id == remapped_append_vec_id as SerializedAccountsFileId {
break (remapped_append_vec_id, remapped_append_vec_path);
}
let remapped_file_name = AccountsFile::file_name(slot, remapped_append_vec_id);
remapped_append_vec_path = remapped_append_vec_path
.parent()
.unwrap()
.join(remapped_file_name);
#[cfg(all(target_os = "linux", target_env = "gnu"))]
{
let remapped_append_vec_path_cstr = cstring_from_path(&remapped_append_vec_path)?;
match rename_no_replace(&append_vec_path_cstr, &remapped_append_vec_path_cstr) {
Ok(_) => break (remapped_append_vec_id, remapped_append_vec_path),
Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {}
Err(e) => return Err(e),
}
}
#[cfg(any(
not(target_os = "linux"),
all(target_os = "linux", not(target_env = "gnu"))
))]
if std::fs::metadata(&remapped_append_vec_path).is_err() {
break (remapped_append_vec_id, remapped_append_vec_path);
}
num_collisions.fetch_add(1, Ordering::Relaxed);
};
#[cfg(any(
not(target_os = "linux"),
all(target_os = "linux", not(target_env = "gnu"))
))]
if old_append_vec_id != remapped_append_vec_id as SerializedAccountsFileId {
std::fs::rename(&append_vec_file_info.path, &remapped_append_vec_path)?;
}
Ok((
remapped_append_vec_id,
FileInfo {
path: remapped_append_vec_path,
..append_vec_file_info
},
))
}
pub(crate) fn remap_and_reconstruct_single_storage(
slot: Slot,
old_append_vec_id: SerializedAccountsFileId,
current_len: usize,
append_vec_file_info: FileInfo,
next_append_vec_id: &AtomicAccountsFileId,
num_collisions: &AtomicUsize,
storage_access: StorageAccess,
) -> Result<Arc<AccountStorageEntry>, SnapshotError> {
let (remapped_append_vec_id, remapped_append_vec_file_info) = remap_append_vec_file(
slot,
old_append_vec_id,
append_vec_file_info,
next_append_vec_id,
num_collisions,
)?;
let storage = reconstruct_single_storage(
&slot,
remapped_append_vec_file_info,
current_len,
remapped_append_vec_id,
storage_access,
None,
)?;
Ok(storage)
}
#[derive(Debug)]
pub struct ReconstructedAccountsDbInfo {
pub accounts_data_len: u64,
pub calculated_accounts_lt_hash: AccountsLtHash,
pub calculated_capitalization: u64,
pub bank_hash_stats: BankHashStats,
}
#[allow(clippy::too_many_arguments)]
fn reconstruct_accountsdb_from_fields<E>(
snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
account_paths: &[PathBuf],
storage_and_next_append_vec_id: StorageAndNextAccountsFileId,
limit_load_slot_count_from_snapshot: Option<usize>,
verify_index: bool,
accounts_db_config: AccountsDbConfig,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: Arc<AtomicBool>,
) -> Result<(AccountsDb, ReconstructedAccountsDbInfo), Error>
where
E: SerializableStorage + std::marker::Sync,
{
let mut accounts_db = AccountsDb::new_with_config(
account_paths.to_vec(),
accounts_db_config,
accounts_update_notifier,
exit,
);
let snapshot_bank_hash_info = snapshot_accounts_db_fields.into_bank_hash_info();
for path in &accounts_db.paths {
std::fs::create_dir_all(path)
.unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err));
}
let StorageAndNextAccountsFileId {
storage,
next_append_vec_id,
} = storage_and_next_append_vec_id;
assert!(
!storage.is_empty(),
"At least one storage entry must exist from deserializing stream"
);
let next_append_vec_id = next_append_vec_id.load(Ordering::Acquire);
let max_append_vec_id = next_append_vec_id - 1;
assert!(
max_append_vec_id <= AccountsFileId::MAX / 2,
"Storage id {max_append_vec_id} larger than allowed max"
);
accounts_db.storage.initialize(storage);
accounts_db
.next_id
.store(next_append_vec_id, Ordering::Release);
info!("Building accounts index...");
let start = Instant::now();
let IndexGenerationInfo {
accounts_data_len,
calculated_accounts_lt_hash,
calculated_capitalization,
} = accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index);
info!("Building accounts index... Done in {:?}", start.elapsed());
Ok((
accounts_db,
ReconstructedAccountsDbInfo {
accounts_data_len,
calculated_accounts_lt_hash,
calculated_capitalization,
bank_hash_stats: snapshot_bank_hash_info.stats,
},
))
}
#[cfg(all(target_os = "linux", target_env = "gnu"))]
fn rename_no_replace(src: &CStr, dest: &CStr) -> io::Result<()> {
let ret = unsafe {
libc::renameat2(
libc::AT_FDCWD,
src.as_ptr() as *const _,
libc::AT_FDCWD,
dest.as_ptr() as *const _,
libc::RENAME_NOREPLACE,
)
};
if ret == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
}
#[cfg(all(target_os = "linux", target_env = "gnu"))]
fn cstring_from_path(path: &Path) -> io::Result<CString> {
CString::new(path.as_os_str().as_encoded_bytes())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}