mod blob_cache;
mod lmdb_ext;
#[cfg(test)]
mod tests;
#[cfg(test)]
use std::collections::BTreeSet;
use std::{
collections::{btree_map::Entry, BTreeMap, HashSet},
convert::TryFrom,
fmt::{self, Display, Formatter},
fs, io, mem,
path::{Path, PathBuf},
sync::Arc,
};
use datasize::DataSize;
use derive_more::From;
use lmdb::{
Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, RwTransaction, Transaction,
WriteFlags,
};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use static_assertions::const_assert;
#[cfg(test)]
use tempfile::TempDir;
use thiserror::Error;
use tracing::{debug, error, info, warn};
use casper_hashing::Digest;
use casper_types::{
bytesrepr::{FromBytes, ToBytes},
EraId, ExecutionResult, ProtocolVersion, PublicKey, Transfer, Transform,
};
use crate::{
components::Component,
crypto,
effect::{
requests::{StateStoreRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
fatal,
reactor::ReactorEvent,
types::{
error::BlockValidationError, Block, BlockBody, BlockHash, BlockHeader,
BlockHeaderWithMetadata, BlockSignatures, Deploy, DeployHash, DeployHeader, DeployMetadata,
DeployWithFinalizedApprovals, FinalizedApprovals, HashingAlgorithmVersion, Item,
MerkleBlockBody, MerkleBlockBodyPart, MerkleLinkedListNode, SharedObject, TimeDiff,
},
utils::{display_error, WithDir},
NodeRng,
};
use blob_cache::BlobCache;
use lmdb_ext::{LmdbExtError, TransactionExt, WriteTransactionExt};
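/// Filename of the LMDB database created by the storage component.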
const STORAGE_DB_FILENAME: &str = "storage.lmdb";
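/// Maximum number of LMDB read transactions; only a single reader/writer accesses the component
/// at any one time, so this can be set very low.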
const MAX_TRANSACTIONS: u32 = 1;
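/// One gibibyte (2^30 bytes).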
const GIB: usize = 1024 * 1024 * 1024;
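/// Default maximum size of the block store, in bytes.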
const DEFAULT_MAX_BLOCK_STORE_SIZE: usize = 450 * GIB;
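/// Default maximum size of the deploy store, in bytes.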
const DEFAULT_MAX_DEPLOY_STORE_SIZE: usize = 300 * GIB;
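/// Default maximum size of the deploy metadata store, in bytes.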
const DEFAULT_MAX_DEPLOY_METADATA_STORE_SIZE: usize = 300 * GIB;
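/// Default maximum size of the state store, in bytes.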
const DEFAULT_MAX_STATE_STORE_SIZE: usize = 10 * GIB;
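/// Maximum number of named databases in the LMDB environment.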
const MAX_DB_COUNT: u32 = 12;
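/// OS-specific LMDB flags.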
#[cfg(not(target_os = "macos"))]
const OS_FLAGS: EnvironmentFlags = EnvironmentFlags::WRITE_MAP;
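/// OS-specific LMDB flags; `WRITE_MAP` is not used on macOS.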
#[cfg(target_os = "macos")]
const OS_FLAGS: EnvironmentFlags = EnvironmentFlags::empty();
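// Compile-time check that the storage `Event` does not grow beyond 96 bytes.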
const _STORAGE_EVENT_SIZE: usize = mem::size_of::<Event>();
const_assert!(_STORAGE_EVENT_SIZE <= 96);
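/// Storage files that are moved into the network-named subdirectory on startup.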
const STORAGE_FILES: [&str; 5] = [
"data.lmdb",
"data.lmdb-lock",
"storage.lmdb",
"storage.lmdb-lock",
"sse_index",
];
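/// A storage component event.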
#[derive(Debug, From, Serialize)]
#[repr(u8)]
pub(crate) enum Event {
#[from]
StorageRequest(StorageRequest),
#[from]
StateStoreRequest(StateStoreRequest),
}
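/// A storage component error.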
#[derive(Debug, Error)]
pub enum Error {
#[error("failed to create database directory `{}`: {}", .0.display(), .1)]
CreateDatabaseDirectory(PathBuf, io::Error),
#[error("duplicate entries for block at height {height}: {first} / {second}")]
DuplicateBlockIndex {
height: u64,
first: BlockHash,
second: BlockHash,
},
#[error("duplicate entries for switch block at era id {era_id}: {first} / {second}")]
DuplicateEraIdIndex {
era_id: EraId,
first: BlockHash,
second: BlockHash,
},
#[error("duplicate entries for blocks for deploy {deploy_hash}: {first} / {second}")]
DuplicateDeployIndex {
deploy_hash: DeployHash,
first: BlockHash,
second: BlockHash,
},
#[error("internal database error: {0}")]
InternalStorage(#[from] LmdbExtError),
#[error("unable to move file {source_path} to {dest_path}: {original_error}")]
UnableToMoveFile {
source_path: PathBuf,
dest_path: PathBuf,
original_error: io::Error,
},
#[error("expected files to exist: {missing_files:?}.")]
MissingStorageFiles {
missing_files: Vec<PathBuf>,
},
#[error(transparent)]
BlockValidation(#[from] BlockValidationError),
#[error(
"Block header not stored under its hash. \
Queried block hash bytes: {queried_block_hash_bytes:x?}, \
Found block header hash bytes: {found_block_header_hash:x?}, \
Block header: {block_header}"
)]
BlockHeaderNotStoredUnderItsHash {
queried_block_hash_bytes: Vec<u8>,
found_block_header_hash: BlockHash,
block_header: Box<BlockHeader>,
},
#[error(
"No block header corresponding to block body found in LMDB. \
Block body hash: {block_body_hash:?}, \
Hashing algorithm version: {hashing_algorithm_version:?}, \
Block body: {block_body:?}"
)]
NoBlockHeaderForBlockBody {
block_body_hash: Digest,
hashing_algorithm_version: HashingAlgorithmVersion,
block_body: Box<BlockBody>,
},
#[error(
"Unexpected hashing algorithm version. \
Expected: {expected_hashing_algorithm_version:?}, \
Actual: {actual_hashing_algorithm_version:?}"
)]
UnexpectedHashingAlgorithmVersion {
expected_hashing_algorithm_version: HashingAlgorithmVersion,
actual_hashing_algorithm_version: HashingAlgorithmVersion,
},
#[error(
"Could not find block body part with Merkle linked list node hash: \
{merkle_linked_list_node_hash:?}"
)]
CouldNotFindBlockBodyPart {
block_hash: BlockHash,
merkle_linked_list_node_hash: Digest,
},
#[error("{0} in signature verification. Database is corrupted.")]
SignatureVerification(crypto::Error),
#[error(
"Block signatures not indexed by their block hash. \
Key bytes in LMDB: {raw_key:x?}, \
Block hash bytes in record: {block_hash_bytes:x?}"
)]
CorruptedBlockSignatureIndex {
raw_key: Vec<u8>,
block_hash_bytes: Vec<u8>,
},
#[error("Non-existent block body referred to by header. Block header: {0:?}")]
NonExistentBlockBodyReferredToByHeader(Box<BlockHeader>),
#[error(
"Non-existent deploy or transfer in block. \
Deploy hash: {deploy_hash:?}, \
Block: {block:?}"
)]
NonExistentDeploy {
deploy_hash: DeployHash,
block: Box<Block>,
},
#[error(
"Found an unexpected part of a block body in the database: \
{part_hash:?}"
)]
UnexpectedBlockBodyPart {
block_body_hash: Digest,
part_hash: Digest,
},
#[error(
"Tried to store FinalizedApprovals for a nonexistent deploy. \
Deploy hash: {deploy_hash:?}"
)]
UnexpectedFinalizedApprovals {
deploy_hash: DeployHash,
},
}
impl From<lmdb::Error> for Error {
fn from(err: lmdb::Error) -> Self {
LmdbExtError::from(err).into()
}
}
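/// The storage component.
///
/// Wraps the LMDB environment and its databases, plus in-memory indices that are rebuilt from
/// the block header database on startup.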
#[derive(DataSize, Debug)]
pub struct Storage {
root: PathBuf,
#[data_size(skip)]
env: Environment,
#[data_size(skip)]
block_header_db: Database,
#[data_size(skip)]
block_body_v1_db: Database,
#[data_size(skip)]
block_body_v2_db: Database,
#[data_size(skip)]
deploy_hashes_db: Database,
#[data_size(skip)]
transfer_hashes_db: Database,
#[data_size(skip)]
proposer_db: Database,
#[data_size(skip)]
block_metadata_db: Database,
#[data_size(skip)]
deploy_db: Database,
#[data_size(skip)]
deploy_metadata_db: Database,
#[data_size(skip)]
transfer_db: Database,
#[data_size(skip)]
state_store_db: Database,
#[data_size(skip)]
finalized_approvals_db: Database,
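    /// An in-memory index of block height to the hash of the block at that height.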
block_height_index: BTreeMap<u64, BlockHash>,
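    /// An in-memory index of era ID to the hash of that era's switch block.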
switch_block_era_id_index: BTreeMap<EraId, BlockHash>,
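    /// An in-memory index of deploy hash to the hash of the block containing the deploy.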
deploy_hash_index: BTreeMap<DeployHash, BlockHash>,
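    /// Whether in-memory deduplication of serialized deploys is enabled.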
enable_mem_deduplication: bool,
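    /// An in-memory cache of serialized deploys, used for deduplication when enabled.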
deploy_cache: BlobCache<<Deploy as Item>::Id>,
}
impl<REv> Component<REv> for Storage
where
REv: ReactorEvent,
{
type Event = Event;
type ConstructionError = Error;
fn handle_event(
&mut self,
effect_builder: EffectBuilder<REv>,
_rng: &mut NodeRng,
event: Self::Event,
) -> Effects<Self::Event> {
let result = match event {
Event::StorageRequest(req) => self.handle_storage_request::<REv>(req),
Event::StateStoreRequest(req) => {
self.handle_state_store_request::<REv>(effect_builder, req)
}
};
match result {
Ok(effects) => effects,
Err(err) => fatal!(effect_builder, "storage error: {}", err).ignore(),
}
}
}
impl Storage {
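    /// Creates a new storage component.
    ///
    /// Rebuilds the in-memory indices from the block header database, optionally deleting blocks
    /// at or after `hard_reset_to_start_of_era` that predate `protocol_version`, and runs
    /// integrity checks if `should_check_integrity` is set.
    ///
    /// A minimal usage sketch (not compiled as a doc-test; `config` and the remaining arguments
    /// are assumed to come from the node's setup code):
    ///
    /// ```ignore
    /// let storage = Storage::new(
    ///     &config,                  // WithDir<Config>
    ///     None,                     // no hard reset
    ///     protocol_version,
    ///     false,                    // skip integrity checks
    ///     "casper-example-network", // hypothetical network name
    /// )?;
    /// ```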
pub fn new(
cfg: &WithDir<Config>,
hard_reset_to_start_of_era: Option<EraId>,
protocol_version: ProtocolVersion,
should_check_integrity: bool,
network_name: &str,
) -> Result<Self, Error> {
let config = cfg.value();
let mut root = cfg.with_dir(config.path.clone());
let network_subdir = root.join(network_name);
if !network_subdir.exists() {
fs::create_dir_all(&network_subdir)
.map_err(|err| Error::CreateDatabaseDirectory(network_subdir.clone(), err))?;
}
if should_move_storage_files_to_network_subdir(&root, &STORAGE_FILES)? {
move_storage_files_to_network_subdir(&root, &network_subdir, &STORAGE_FILES)?;
}
root = network_subdir;
let total_size = config
.max_block_store_size
.saturating_add(config.max_deploy_store_size)
.saturating_add(config.max_deploy_metadata_store_size);
let env = Environment::new()
.set_flags(
OS_FLAGS
| EnvironmentFlags::NO_SUB_DIR
| EnvironmentFlags::NO_TLS
| EnvironmentFlags::NO_READAHEAD,
)
.set_max_readers(MAX_TRANSACTIONS)
.set_max_dbs(MAX_DB_COUNT)
.set_map_size(total_size)
.open(&root.join(STORAGE_DB_FILENAME))?;
let block_header_db = env.create_db(Some("block_header"), DatabaseFlags::empty())?;
let block_metadata_db = env.create_db(Some("block_metadata"), DatabaseFlags::empty())?;
let deploy_db = env.create_db(Some("deploys"), DatabaseFlags::empty())?;
let deploy_metadata_db = env.create_db(Some("deploy_metadata"), DatabaseFlags::empty())?;
let transfer_db = env.create_db(Some("transfer"), DatabaseFlags::empty())?;
let state_store_db = env.create_db(Some("state_store"), DatabaseFlags::empty())?;
let finalized_approvals_db =
env.create_db(Some("finalized_approvals"), DatabaseFlags::empty())?;
let block_body_v1_db = env.create_db(Some("block_body"), DatabaseFlags::empty())?;
let block_body_v2_db = env.create_db(Some("block_body_merkle"), DatabaseFlags::empty())?;
let deploy_hashes_db = env.create_db(Some("deploy_hashes"), DatabaseFlags::empty())?;
let transfer_hashes_db = env.create_db(Some("transfer_hashes"), DatabaseFlags::empty())?;
let proposer_db = env.create_db(Some("proposers"), DatabaseFlags::empty())?;
info!("reindexing block store");
let mut block_height_index = BTreeMap::new();
let mut switch_block_era_id_index = BTreeMap::new();
let mut deploy_hash_index = BTreeMap::new();
let mut block_txn = env.begin_rw_txn()?;
let mut cursor = block_txn.open_rw_cursor(block_header_db)?;
let mut deleted_block_hashes = HashSet::new();
let mut deleted_block_body_hashes_v1 = HashSet::new();
for (raw_key, raw_val) in cursor.iter() {
let block_header: BlockHeader = lmdb_ext::deserialize(raw_val)?;
if let Some(invalid_era) = hard_reset_to_start_of_era {
if block_header.era_id() >= invalid_era
&& block_header.protocol_version() < protocol_version
{
if block_header.hashing_algorithm_version() == HashingAlgorithmVersion::V1 {
let _ = deleted_block_body_hashes_v1.insert(*block_header.body_hash());
}
let _ = deleted_block_hashes.insert(block_header.hash());
cursor.del(WriteFlags::empty())?;
continue;
}
}
if should_check_integrity {
let found_block_header_hash = block_header.hash();
if raw_key != found_block_header_hash.as_ref() {
return Err(Error::BlockHeaderNotStoredUnderItsHash {
queried_block_hash_bytes: raw_key.to_vec(),
found_block_header_hash,
block_header: Box::new(block_header),
});
}
}
insert_to_block_header_indices(
&mut block_height_index,
&mut switch_block_era_id_index,
&block_header,
)?;
let mut body_txn = env.begin_ro_txn()?;
let block_body: BlockBody = match block_header.hashing_algorithm_version() {
HashingAlgorithmVersion::V1 => body_txn
.get_value(block_body_v1_db, block_header.body_hash())?
.ok_or_else(|| {
Error::NonExistentBlockBodyReferredToByHeader(Box::new(
block_header.clone(),
))
})?,
HashingAlgorithmVersion::V2 => get_single_block_body_v2(
&mut body_txn,
block_body_v2_db,
deploy_hashes_db,
transfer_hashes_db,
proposer_db,
block_header.body_hash(),
)?
.ok_or_else(|| {
Error::NonExistentBlockBodyReferredToByHeader(Box::new(block_header.clone()))
})?,
};
if should_check_integrity {
Block::new_from_header_and_body(block_header.clone(), block_body.clone())?;
}
insert_to_deploy_index(&mut deploy_hash_index, block_header.hash(), &block_body)?;
}
info!("block store reindexing complete");
drop(cursor);
block_txn.commit()?;
let deleted_block_hashes_raw = deleted_block_hashes.iter().map(BlockHash::as_ref).collect();
initialize_block_body_v1_db(
&env,
&block_header_db,
&block_body_v1_db,
&deleted_block_body_hashes_v1
.iter()
.map(Digest::as_ref)
.collect(),
should_check_integrity,
)?;
initialize_block_body_v2_db(
&env,
&block_header_db,
&block_body_v2_db,
&deploy_hashes_db,
&transfer_hashes_db,
&proposer_db,
should_check_integrity,
)?;
initialize_block_metadata_db(
&env,
&block_metadata_db,
&deleted_block_hashes_raw,
should_check_integrity,
)?;
initialize_deploy_metadata_db(&env, &deploy_metadata_db, &deleted_block_hashes)?;
Ok(Storage {
root,
env,
block_header_db,
block_body_v1_db,
block_body_v2_db,
deploy_hashes_db,
transfer_hashes_db,
proposer_db,
block_metadata_db,
deploy_db,
deploy_metadata_db,
transfer_db,
state_store_db,
finalized_approvals_db,
block_height_index,
switch_block_era_id_index,
deploy_hash_index,
enable_mem_deduplication: config.enable_mem_deduplication,
deploy_cache: BlobCache::new(config.mem_pool_prune_interval),
})
}
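    /// Handles a [`StateStoreRequest`], saving or loading raw bytes under the given key.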
fn handle_state_store_request<REv>(
&mut self,
_effect_builder: EffectBuilder<REv>,
req: StateStoreRequest,
) -> Result<Effects<Event>, Error>
where
Self: Component<REv>,
{
match req {
StateStoreRequest::Save {
key,
data,
responder,
} => {
let mut txn = self.env.begin_rw_txn()?;
txn.put(self.state_store_db, &key, &data, WriteFlags::default())?;
txn.commit()?;
Ok(responder.respond(()).ignore())
}
StateStoreRequest::Load { key, responder } => {
let bytes = self.read_state_store(&key)?;
Ok(responder.respond(bytes).ignore())
}
}
}
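    /// Reads a value from the state storage database.
    ///
    /// Returns `Ok(None)` if the key is not present.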
pub(crate) fn read_state_store<K>(&self, key: &K) -> Result<Option<Vec<u8>>, Error>
where
K: AsRef<[u8]>,
{
let txn = self.env.begin_ro_txn()?;
let bytes = match txn.get(self.state_store_db, &key) {
Ok(slice) => Some(slice.to_owned()),
Err(lmdb::Error::NotFound) => None,
Err(err) => return Err(err.into()),
};
Ok(bytes)
}
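    /// Deletes a value from the state storage database.
    ///
    /// Returns `true` if the key was present and has been deleted, `false` otherwise.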
pub(crate) fn del_state_store<K>(&self, key: K) -> Result<bool, Error>
where
K: AsRef<[u8]>,
{
let mut txn = self.env.begin_rw_txn()?;
let result = match txn.del(self.state_store_db, &key, None) {
Ok(_) => Ok(true),
Err(lmdb::Error::NotFound) => Ok(false),
Err(err) => Err(err),
}?;
txn.commit()?;
Ok(result)
}
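    /// Returns the path to the storage folder.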
pub(crate) fn root_path(&self) -> &Path {
&self.root
}
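    /// Handles a [`StorageRequest`].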
fn handle_storage_request<REv>(&mut self, req: StorageRequest) -> Result<Effects<Event>, Error>
where
Self: Component<REv>,
{
Ok(match req {
StorageRequest::PutBlock { block, responder } => {
responder.respond(self.write_block(&*block)?).ignore()
}
StorageRequest::GetBlock {
block_hash,
responder,
} => responder.respond(self.read_block(&block_hash)?).ignore(),
StorageRequest::GetBlockHeaderAtHeight { height, responder } => responder
.respond(self.get_block_header_by_height(&mut self.env.begin_ro_txn()?, height)?)
.ignore(),
StorageRequest::GetBlockAtHeight { height, responder } => responder
.respond(self.get_block_by_height(&mut self.env.begin_ro_txn()?, height)?)
.ignore(),
StorageRequest::GetHighestBlock { responder } => {
let mut txn = self.env.begin_ro_txn()?;
responder
.respond(self.get_highest_block(&mut txn)?)
.ignore()
}
StorageRequest::GetSwitchBlockHeaderAtEraId { era_id, responder } => responder
.respond(
self.get_switch_block_header_by_era_id(&mut self.env.begin_ro_txn()?, era_id)?,
)
.ignore(),
StorageRequest::GetBlockHeaderForDeploy {
deploy_hash,
responder,
} => {
responder
.respond(self.get_block_header_by_deploy_hash(
&mut self.env.begin_ro_txn()?,
deploy_hash,
)?)
.ignore()
}
StorageRequest::GetBlockHeader {
block_hash,
responder,
} => responder
.respond(self.get_single_block_header(&mut self.env.begin_ro_txn()?, &block_hash)?)
.ignore(),
StorageRequest::GetBlockTransfers {
block_hash,
responder,
} => responder
.respond(self.get_transfers(&mut self.env.begin_ro_txn()?, &block_hash)?)
.ignore(),
StorageRequest::PutDeploy { deploy, responder } => {
responder.respond(self.put_deploy(&*deploy)?).ignore()
}
StorageRequest::GetDeploys {
deploy_hashes,
responder,
} => responder
.respond(self.get_deploys_with_finalized_approvals(
&mut self.env.begin_ro_txn()?,
deploy_hashes.as_slice(),
)?)
.ignore(),
StorageRequest::PutExecutionResults {
block_hash,
execution_results,
responder,
} => {
let mut txn = self.env.begin_rw_txn()?;
let mut transfers: Vec<Transfer> = vec![];
for (deploy_hash, execution_result) in execution_results {
let mut metadata = self
.get_deploy_metadata(&mut txn, &deploy_hash)?
.unwrap_or_default();
if let Some(prev) = metadata.execution_results.get(&block_hash) {
if prev == &execution_result {
continue;
} else {
debug!(%deploy_hash, %block_hash, "different execution result");
}
}
if let ExecutionResult::Success { effect, .. } = execution_result.clone() {
for transform_entry in effect.transforms {
if let Transform::WriteTransfer(transfer) = transform_entry.transform {
transfers.push(transfer);
}
}
}
metadata
.execution_results
.insert(*block_hash, execution_result);
let was_written =
txn.put_value(self.deploy_metadata_db, &deploy_hash, &metadata, true)?;
if !was_written {
error!(?block_hash, ?deploy_hash, "failed to write deploy metadata");
debug_assert!(was_written);
}
}
let was_written =
txn.put_value(self.transfer_db, &*block_hash, &transfers, true)?;
if !was_written {
error!(?block_hash, "failed to write transfers");
debug_assert!(was_written);
}
txn.commit()?;
responder.respond(()).ignore()
}
StorageRequest::GetDeployAndMetadata {
deploy_hash,
responder,
} => {
let mut txn = self.env.begin_ro_txn()?;
let deploy = {
let opt_deploy =
self.get_deploy_with_finalized_approvals(&mut txn, &deploy_hash)?;
if let Some(deploy) = opt_deploy {
deploy
} else {
return Ok(responder.respond(None).ignore());
}
};
let metadata = self
.get_deploy_metadata(&mut txn, &deploy_hash)?
.unwrap_or_default();
responder.respond(Some((deploy, metadata))).ignore()
}
StorageRequest::GetBlockAndMetadataByHash {
block_hash,
responder,
} => {
let mut txn = self.env.begin_ro_txn()?;
let block: Block =
if let Some(block) = self.get_single_block(&mut txn, &block_hash)? {
block
} else {
return Ok(responder.respond(None).ignore());
};
assert_eq!(&block_hash, block.hash());
let signatures = match self.get_finality_signatures(&mut txn, &block_hash)? {
Some(signatures) => signatures,
None => BlockSignatures::new(block_hash, block.header().era_id()),
};
                // Stored signatures are expected to verify against the block.
                debug_assert!(signatures.verify().is_ok());
responder.respond(Some((block, signatures))).ignore()
}
StorageRequest::GetBlockAndMetadataByHeight {
block_height,
responder,
} => {
let mut txn = self.env.begin_ro_txn()?;
let block: Block =
if let Some(block) = self.get_block_by_height(&mut txn, block_height)? {
block
} else {
return Ok(responder.respond(None).ignore());
};
let hash = block.hash();
let signatures = match self.get_finality_signatures(&mut txn, hash)? {
Some(signatures) => signatures,
None => BlockSignatures::new(*hash, block.header().era_id()),
};
responder.respond(Some((block, signatures))).ignore()
}
StorageRequest::GetHighestBlockWithMetadata { responder } => {
let mut txn = self.env.begin_ro_txn()?;
let highest_block: Block = if let Some(block) = self
.block_height_index
.keys()
.last()
.and_then(|&height| self.get_block_by_height(&mut txn, height).transpose())
.transpose()?
{
block
} else {
return Ok(responder.respond(None).ignore());
};
let hash = highest_block.hash();
let signatures = match self.get_finality_signatures(&mut txn, hash)? {
Some(signatures) => signatures,
None => BlockSignatures::new(*hash, highest_block.header().era_id()),
};
responder
.respond(Some((highest_block, signatures)))
.ignore()
}
StorageRequest::PutBlockSignatures {
signatures,
responder,
} => {
let mut txn = self.env.begin_rw_txn()?;
let old_data: Option<BlockSignatures> =
txn.get_value(self.block_metadata_db, &signatures.block_hash)?;
let new_data = match old_data {
None => signatures,
Some(mut data) => {
for (pk, sig) in signatures.proofs {
data.insert_proof(pk, sig);
}
data
}
};
let outcome = txn.put_value(
self.block_metadata_db,
&new_data.block_hash,
&new_data,
true,
)?;
txn.commit()?;
responder.respond(outcome).ignore()
}
StorageRequest::GetBlockSignatures {
block_hash,
responder,
} => {
let result =
self.get_finality_signatures(&mut self.env.begin_ro_txn()?, &block_hash)?;
responder.respond(result).ignore()
}
StorageRequest::GetFinalizedDeploys { ttl, responder } => {
responder.respond(self.get_finalized_deploys(ttl)?).ignore()
}
StorageRequest::StoreFinalizedApprovals {
ref deploy_hash,
ref finalized_approvals,
responder,
} => responder
.respond(self.store_finalized_approvals(deploy_hash, finalized_approvals)?)
.ignore(),
})
}
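    /// Writes a deploy to storage.
    ///
    /// Returns `true` if the deploy was written, `false` if a deploy with the same hash already
    /// existed (existing deploys are never overwritten).
    ///
    /// A behavioral sketch (not compiled as a doc-test; assumes a constructed `storage` and a
    /// `deploy` value from the surrounding test code):
    ///
    /// ```ignore
    /// assert!(storage.put_deploy(&deploy)?);  // first write stores the deploy
    /// assert!(!storage.put_deploy(&deploy)?); // repeated write is a no-op
    /// ```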
pub fn put_deploy(&self, deploy: &Deploy) -> Result<bool, Error> {
let mut txn = self.env.begin_rw_txn()?;
let outcome = txn.put_value(self.deploy_db, deploy.id(), &deploy, false)?;
txn.commit()?;
Ok(outcome)
}
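    /// Retrieves a block header by height, together with its finality signatures; if no
    /// signatures are stored, an empty set is returned in their place.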
fn get_block_header_and_metadata_by_height<Tx: Transaction>(
&self,
tx: &mut Tx,
height: u64,
) -> Result<Option<BlockHeaderWithMetadata>, Error> {
let block_hash = match self.block_height_index.get(&height) {
None => return Ok(None),
Some(block_hash) => block_hash,
};
let block_header = match self.get_single_block_header(tx, block_hash)? {
None => return Ok(None),
Some(block_header) => block_header,
};
let block_signatures = match self.get_finality_signatures(tx, block_hash)? {
None => BlockSignatures::new(*block_hash, block_header.era_id()),
Some(signatures) => signatures,
};
Ok(Some(BlockHeaderWithMetadata {
block_header,
block_signatures,
}))
}
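    /// Retrieves a single block by hash in a separate read-only transaction.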
pub fn read_block(&self, block_hash: &BlockHash) -> Result<Option<Block>, Error> {
self.get_single_block(&mut self.env.begin_ro_txn()?, block_hash)
}
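    /// Directly returns a deploy from the internal deploy store.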
pub fn read_deploy_by_hash(&self, deploy_hash: DeployHash) -> Result<Option<Deploy>, Error> {
let mut txn = self.env.begin_ro_txn()?;
Ok(txn.get_value(self.deploy_db, &deploy_hash)?)
}
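    /// Writes a block to storage and updates the in-memory height and deploy indices.
    ///
    /// The block is verified before writing. Returns `Ok(false)` if the body or header could not
    /// be inserted.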
pub fn write_block(&mut self, block: &Block) -> Result<bool, Error> {
block.verify()?;
let mut txn = self.env.begin_rw_txn()?;
{
let block_body_hash = block.header().body_hash();
let block_body = block.body();
let success = match block.header().hashing_algorithm_version() {
HashingAlgorithmVersion::V1 => {
self.put_single_block_body_v1(&mut txn, block_body_hash, block_body)?
}
HashingAlgorithmVersion::V2 => {
self.put_single_block_body_v2(&mut txn, block_body)?
}
};
if !success {
error!("Could not insert body for: {}", block);
txn.abort();
return Ok(false);
}
}
if !txn.put_value(self.block_header_db, block.hash(), block.header(), true)? {
error!("Could not insert block header for block: {}", block);
txn.abort();
return Ok(false);
}
insert_to_block_header_indices(
&mut self.block_height_index,
&mut self.switch_block_era_id_index,
block.header(),
)?;
insert_to_deploy_index(
&mut self.deploy_hash_index,
block.header().hash(),
block.body(),
)?;
txn.commit()?;
Ok(true)
}
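    /// Retrieves a block header by height, together with its finality signatures, in a separate
    /// read-only transaction.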
pub(crate) fn read_block_header_and_finality_signatures_by_height(
&self,
height: u64,
) -> Result<Option<BlockHeaderWithMetadata>, Error> {
let mut txn = self.env.begin_ro_txn()?;
let maybe_block_header_and_finality_signatures =
self.get_block_header_and_metadata_by_height(&mut txn, height)?;
drop(txn);
Ok(maybe_block_header_and_finality_signatures)
}
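    /// Retrieves a single block header by height, using the in-memory height index.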
fn get_block_header_by_height<Tx: Transaction>(
&self,
tx: &mut Tx,
height: u64,
) -> Result<Option<BlockHeader>, Error> {
self.block_height_index
.get(&height)
.and_then(|block_hash| self.get_single_block_header(tx, block_hash).transpose())
.transpose()
}
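    /// Retrieves a single block by height in a separate read-only transaction.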
pub fn read_block_by_height(&self, height: u64) -> Result<Option<Block>, Error> {
self.get_block_by_height(&mut self.env.begin_ro_txn()?, height)
}
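    /// Retrieves the highest block in a separate read-only transaction.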
pub fn read_highest_block(&self) -> Result<Option<Block>, Error> {
self.get_highest_block(&mut self.env.begin_ro_txn()?)
}
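    /// Retrieves a single block by height, using the in-memory height index.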
fn get_block_by_height<Tx: Transaction>(
&self,
tx: &mut Tx,
height: u64,
) -> Result<Option<Block>, Error> {
self.block_height_index
.get(&height)
.and_then(|block_hash| self.get_single_block(tx, block_hash).transpose())
.transpose()
}
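    /// Retrieves the header of the switch block of an era, using the switch block index.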
fn get_switch_block_header_by_era_id<Tx: Transaction>(
&self,
tx: &mut Tx,
era_id: EraId,
) -> Result<Option<BlockHeader>, Error> {
debug!(switch_block_era_id_index = ?self.switch_block_era_id_index);
self.switch_block_era_id_index
.get(&era_id)
.and_then(|block_hash| self.get_single_block_header(tx, block_hash).transpose())
.transpose()
}
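    /// Retrieves the header of the block containing a deploy, using the deploy index.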
fn get_block_header_by_deploy_hash<Tx: Transaction>(
&self,
tx: &mut Tx,
deploy_hash: DeployHash,
) -> Result<Option<BlockHeader>, Error> {
self.deploy_hash_index
.get(&deploy_hash)
.and_then(|block_hash| self.get_single_block_header(tx, block_hash).transpose())
.transpose()
}
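    /// Retrieves the highest block from storage, if any.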
fn get_highest_block<Tx: Transaction>(&self, txn: &mut Tx) -> Result<Option<Block>, Error> {
self.block_height_index
.keys()
.last()
.and_then(|&height| self.get_block_by_height(txn, height).transpose())
.transpose()
}
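    /// Returns the blocks obtained by walking back from the highest block for as long as the
    /// predicate holds for each visited block.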
fn get_blocks_while<F, Tx: Transaction>(
&self,
txn: &mut Tx,
predicate: F,
) -> Result<Vec<Block>, Error>
where
F: Fn(&Block) -> bool,
{
let mut next_block = self.get_highest_block(txn)?;
let mut blocks = Vec::new();
loop {
match next_block {
None => break,
Some(block) if !predicate(&block) => break,
Some(block) => {
next_block = match block.parent() {
None => None,
Some(parent_hash) => self.get_single_block(txn, parent_hash)?,
};
blocks.push(block);
}
}
}
Ok(blocks)
}
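    /// Returns the hashes and headers of all deploys contained in blocks whose time-to-live has
    /// not yet expired, skipping deploys whose own TTL has already expired.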
fn get_finalized_deploys(
&self,
ttl: TimeDiff,
) -> Result<Vec<(DeployHash, DeployHeader)>, Error> {
let mut txn = self.env.begin_ro_txn()?;
        // We want deploys from blocks whose TTL has not yet expired.
        let ttl_not_expired = |block: &Block| block.timestamp().elapsed() < ttl;
        let mut deploys = Vec::new();
        for block in self.get_blocks_while(&mut txn, ttl_not_expired)? {
for deploy_hash in block
.body()
.deploy_hashes()
.iter()
.chain(block.body().transfer_hashes())
{
let deploy_header = match self.get_deploy_header(&mut txn, deploy_hash)? {
Some(deploy_header) => deploy_header,
None => {
let deploy_hash = deploy_hash.to_owned();
return Err(Error::NonExistentDeploy {
block: Box::new(block),
deploy_hash,
});
}
};
if deploy_header.timestamp().elapsed() > ttl {
continue;
}
deploys.push((*deploy_hash, deploy_header));
}
}
Ok(deploys)
}
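    /// Retrieves the sorted, deduplicated state root hashes of all stored block headers.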
#[allow(dead_code)]
pub(crate) fn read_state_root_hashes_for_trie_check(&self) -> Result<Vec<Digest>, Error> {
let mut blake_hashes: Vec<Digest> = Vec::new();
let txn = self.env.begin_ro_txn()?;
let mut cursor = txn.open_ro_cursor(self.block_header_db)?;
for (_, raw_val) in cursor.iter() {
let header: BlockHeader = lmdb_ext::deserialize(raw_val)?;
let blake_hash = *header.state_root_hash();
blake_hashes.push(blake_hash);
}
blake_hashes.sort();
blake_hashes.dedup();
Ok(blake_hashes)
}
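    /// Retrieves a single block header by hash, verifying that the header is in fact stored
    /// under its own hash.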
fn get_single_block_header<Tx: Transaction>(
&self,
tx: &mut Tx,
block_hash: &BlockHash,
) -> Result<Option<BlockHeader>, Error> {
let block_header: BlockHeader = match tx.get_value(self.block_header_db, &block_hash)? {
Some(block_header) => block_header,
None => return Ok(None),
};
let found_block_header_hash = block_header.hash();
if found_block_header_hash != *block_hash {
return Err(Error::BlockHeaderNotStoredUnderItsHash {
queried_block_hash_bytes: block_hash.as_ref().to_vec(),
found_block_header_hash,
block_header: Box::new(block_header),
});
};
Ok(Some(block_header))
}
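    /// Retrieves a single Merklized (v2) block body by its hash.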
fn get_single_block_body_v2<Tx: Transaction>(
&self,
tx: &mut Tx,
block_body_hash: &Digest,
) -> Result<Option<BlockBody>, LmdbExtError> {
get_single_block_body_v2(
tx,
self.block_body_v2_db,
self.deploy_hashes_db,
self.transfer_hashes_db,
self.proposer_db,
block_body_hash,
)
}
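    /// Writes a single (v1) block body under its hash.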
fn put_single_block_body_v1(
&self,
tx: &mut RwTransaction,
block_body_hash: &Digest,
block_body: &BlockBody,
) -> Result<bool, LmdbExtError> {
tx.put_value(self.block_body_v1_db, block_body_hash, block_body, true)
.map_err(Into::into)
}
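    /// Writes one part of a Merklized block body: the linked-list node goes into the v2 block
    /// body database and the part's value into `part_database`.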
fn put_merkle_block_body_part<'a, T>(
&self,
tx: &mut RwTransaction,
part_database: Database,
merklized_block_body_part: &MerkleBlockBodyPart<'a, T>,
) -> Result<bool, LmdbExtError>
where
T: ToBytes,
{
if !tx.put_value_bytesrepr(
self.block_body_v2_db,
merklized_block_body_part.merkle_linked_list_node_hash(),
&merklized_block_body_part.value_and_rest_hashes_pair(),
true,
)? {
return Ok(false);
};
if !tx.put_value_bytesrepr(
part_database,
merklized_block_body_part.value_hash(),
merklized_block_body_part.value(),
true,
)? {
return Ok(false);
};
Ok(true)
}
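    /// Writes a Merklized (v2) block body part by part: deploy hashes, transfer hashes, then the
    /// proposer.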
fn put_single_block_body_v2(
&self,
tx: &mut RwTransaction,
block_body: &BlockBody,
) -> Result<bool, LmdbExtError> {
let merkle_block_body = block_body.merklize();
let MerkleBlockBody {
deploy_hashes,
transfer_hashes,
proposer,
} = &merkle_block_body;
if !self.put_merkle_block_body_part(tx, self.deploy_hashes_db, deploy_hashes)? {
return Ok(false);
};
if !self.put_merkle_block_body_part(tx, self.transfer_hashes_db, transfer_hashes)? {
return Ok(false);
};
if !self.put_merkle_block_body_part(tx, self.proposer_db, proposer)? {
return Ok(false);
};
Ok(true)
}
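    /// Retrieves a single block header by hash in a separate read-only transaction.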
pub(crate) fn get_block_header_by_hash(
&self,
block_hash: &BlockHash,
) -> Result<Option<BlockHeader>, Error> {
let mut txn = self.env.begin_ro_txn()?;
let maybe_block_header = self.get_single_block_header(&mut txn, block_hash)?;
drop(txn);
Ok(maybe_block_header)
}
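    /// Retrieves a single block by hash, combining its stored header and body.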
fn get_single_block<Tx: Transaction>(
&self,
tx: &mut Tx,
block_hash: &BlockHash,
) -> Result<Option<Block>, Error> {
let block_header: BlockHeader = match self.get_single_block_header(tx, block_hash)? {
Some(block_header) => block_header,
None => return Ok(None),
};
let maybe_block_body = self.get_body_for_block_header(tx, &block_header)?;
let block_body = match maybe_block_body {
Some(block_body) => block_body,
None => {
warn!(
?block_header,
"retrieved block header but block body is missing from database"
);
return Ok(None);
}
};
let block = Block::new_from_header_and_body(block_header, block_body)?;
Ok(Some(block))
}
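    /// Retrieves the block body belonging to a header, dispatching on the header's hashing
    /// algorithm version.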
fn get_body_for_block_header<Tx: Transaction>(
&self,
tx: &mut Tx,
block_header: &BlockHeader,
) -> Result<Option<BlockBody>, LmdbExtError> {
match block_header.hashing_algorithm_version() {
HashingAlgorithmVersion::V1 => {
self.get_single_block_body_v1(tx, block_header.body_hash())
}
HashingAlgorithmVersion::V2 => {
self.get_single_block_body_v2(tx, block_header.body_hash())
}
}
}
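    /// Retrieves a single (v1) block body by its hash.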
fn get_single_block_body_v1<Tx: Transaction>(
&self,
tx: &mut Tx,
block_body_hash: &Digest,
) -> Result<Option<BlockBody>, LmdbExtError> {
tx.get_value(self.block_body_v1_db, block_body_hash)
}
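    /// Retrieves a batch of deploys, each together with its finalized approvals, if present.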
fn get_deploys_with_finalized_approvals<Tx: Transaction>(
&self,
tx: &mut Tx,
deploy_hashes: &[DeployHash],
) -> Result<SmallVec<[Option<DeployWithFinalizedApprovals>; 1]>, LmdbExtError> {
deploy_hashes
.iter()
.map(|deploy_hash| self.get_deploy_with_finalized_approvals(tx, deploy_hash))
.collect()
}
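    /// Retrieves a single deploy together with its finalized approvals, if present.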
fn get_deploy_with_finalized_approvals<Tx: Transaction>(
&self,
tx: &mut Tx,
deploy_hash: &DeployHash,
) -> Result<Option<DeployWithFinalizedApprovals>, LmdbExtError> {
let maybe_original_deploy = tx.get_value(self.deploy_db, deploy_hash)?;
if let Some(deploy) = maybe_original_deploy {
let maybe_finalized_approvals =
tx.get_value(self.finalized_approvals_db, deploy_hash)?;
Ok(Some(DeployWithFinalizedApprovals::new(
deploy,
maybe_finalized_approvals,
)))
} else {
Ok(None)
}
}
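    /// Retrieves the header of a stored deploy.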
fn get_deploy_header<Tx: Transaction>(
&self,
txn: &mut Tx,
deploy_hash: &DeployHash,
) -> Result<Option<DeployHeader>, LmdbExtError> {
let maybe_deploy: Option<Deploy> = txn.get_value(self.deploy_db, deploy_hash)?;
Ok(maybe_deploy.map(|deploy| deploy.header().clone()))
}
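    /// Retrieves the metadata (execution results) associated with a deploy.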
fn get_deploy_metadata<Tx: Transaction>(
&self,
tx: &mut Tx,
deploy_hash: &DeployHash,
) -> Result<Option<DeployMetadata>, Error> {
Ok(tx.get_value(self.deploy_metadata_db, deploy_hash)?)
}
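    /// Retrieves the transfers stored for a block.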
fn get_transfers<Tx: Transaction>(
&self,
tx: &mut Tx,
block_hash: &BlockHash,
) -> Result<Option<Vec<Transfer>>, Error> {
Ok(tx.get_value(self.transfer_db, block_hash)?)
}
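    /// Retrieves the finality signatures stored for a block.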
fn get_finality_signatures<Tx: Transaction>(
&self,
tx: &mut Tx,
block_hash: &BlockHash,
) -> Result<Option<BlockSignatures>, Error> {
Ok(tx.get_value(self.block_metadata_db, block_hash)?)
}
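    /// Retrieves the switch block of an era, using the switch block index.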
fn get_switch_block_by_era_id<Tx: Transaction>(
&self,
tx: &mut Tx,
era_id: EraId,
) -> Result<Option<Block>, Error> {
self.switch_block_era_id_index
.get(&era_id)
.and_then(|block_hash| self.get_single_block(tx, block_hash).transpose())
.transpose()
}
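    /// Stores a set of finalized approvals for a deploy, but only if they differ from the
    /// approvals stored with the original deploy.
    ///
    /// Returns an error if the deploy is not present in storage.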
pub fn store_finalized_approvals(
&self,
deploy_hash: &DeployHash,
finalized_approvals: &FinalizedApprovals,
) -> Result<(), Error> {
let maybe_original_deploy = self.read_deploy_by_hash(*deploy_hash)?;
let original_deploy = maybe_original_deploy.ok_or(Error::UnexpectedFinalizedApprovals {
deploy_hash: *deploy_hash,
})?;
if original_deploy.approvals() != finalized_approvals.as_ref() {
let mut txn = self.env.begin_rw_txn()?;
let _ = txn.put_value(
self.finalized_approvals_db,
deploy_hash,
finalized_approvals,
true,
)?;
txn.commit()?;
}
Ok(())
}
}
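/// Inserts the relevant entries into the two in-memory block indices, returning an error if the
/// height or era ID is already mapped to a different block hash.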
fn insert_to_block_header_indices(
block_height_index: &mut BTreeMap<u64, BlockHash>,
switch_block_era_id_index: &mut BTreeMap<EraId, BlockHash>,
block_header: &BlockHeader,
) -> Result<(), Error> {
let block_hash = block_header.hash();
if let Some(first) = block_height_index.get(&block_header.height()) {
if *first != block_hash {
return Err(Error::DuplicateBlockIndex {
height: block_header.height(),
first: *first,
second: block_hash,
});
}
}
if block_header.is_switch_block() {
match switch_block_era_id_index.entry(block_header.era_id()) {
Entry::Vacant(entry) => {
let _ = entry.insert(block_hash);
}
Entry::Occupied(entry) => {
if *entry.get() != block_hash {
return Err(Error::DuplicateEraIdIndex {
era_id: block_header.era_id(),
first: *entry.get(),
second: block_hash,
});
}
}
}
}
let _ = block_height_index.insert(block_header.height(), block_hash);
Ok(())
}
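/// Inserts the deploy and transfer hashes of a block body into the deploy index, returning an
/// error if any of them is already mapped to a different block hash.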
fn insert_to_deploy_index(
deploy_hash_index: &mut BTreeMap<DeployHash, BlockHash>,
block_hash: BlockHash,
block_body: &BlockBody,
) -> Result<(), Error> {
if let Some(hash) = block_body
.deploy_hashes()
.iter()
.chain(block_body.transfer_hashes().iter())
.find(|hash| {
deploy_hash_index
.get(hash)
.map_or(false, |old_block_hash| *old_block_hash != block_hash)
})
{
return Err(Error::DuplicateDeployIndex {
deploy_hash: *hash,
first: deploy_hash_index[hash],
second: block_hash,
});
}
for hash in block_body
.deploy_hashes()
.iter()
.chain(block_body.transfer_hashes().iter())
{
deploy_hash_index.insert(*hash, block_hash);
}
Ok(())
}
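/// Returns `true` if all the storage files are found directly under `root` and should be moved
/// into the network subdirectory; returns an error if only some of them are present.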
fn should_move_storage_files_to_network_subdir(
root: &Path,
file_names: &[&str],
) -> Result<bool, Error> {
let mut files_found = vec![];
let mut files_not_found = vec![];
file_names.iter().for_each(|file_name| {
let file_path = root.join(file_name);
match file_path.exists() {
true => files_found.push(file_path),
false => files_not_found.push(file_path),
}
});
let should_move_files = files_found.len() == file_names.len();
if !should_move_files && !files_found.is_empty() {
error!(
"found storage files: {:?}, missing storage files: {:?}",
files_found, files_not_found
);
return Err(Error::MissingStorageFiles {
missing_files: files_not_found,
});
}
Ok(should_move_files)
}
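/// Moves storage files from `root` into `subdir`.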
fn move_storage_files_to_network_subdir(
root: &Path,
subdir: &Path,
file_names: &[&str],
) -> Result<(), Error> {
file_names
.iter()
.map(|file_name| {
let source_path = root.join(file_name);
let dest_path = subdir.join(file_name);
fs::rename(&source_path, &dest_path).map_err(|original_error| Error::UnableToMoveFile {
source_path,
dest_path,
original_error,
})
})
.collect::<Result<Vec<_>, Error>>()?;
info!(
"moved files: {:?} from: {:?} to: {:?}",
file_names, root, subdir
);
Ok(())
}
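/// The storage component configuration.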
#[derive(Clone, DataSize, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
pub path: PathBuf,
max_block_store_size: usize,
max_deploy_store_size: usize,
max_deploy_metadata_store_size: usize,
max_state_store_size: usize,
enable_mem_deduplication: bool,
mem_pool_prune_interval: u16,
}
impl Default for Config {
fn default() -> Self {
Config {
path: "/dev/null".into(),
max_block_store_size: DEFAULT_MAX_BLOCK_STORE_SIZE,
max_deploy_store_size: DEFAULT_MAX_DEPLOY_STORE_SIZE,
max_deploy_metadata_store_size: DEFAULT_MAX_DEPLOY_METADATA_STORE_SIZE,
max_state_store_size: DEFAULT_MAX_STATE_STORE_SIZE,
enable_mem_deduplication: false,
mem_pool_prune_interval: 1024,
}
}
}
impl Config {
#[cfg(test)]
pub(crate) fn default_for_tests() -> (Self, TempDir) {
let tempdir = tempfile::tempdir().expect("should get tempdir");
let path = tempdir.path().join("lmdb");
let config = Config {
path,
..Default::default()
};
(config, tempdir)
}
}
impl Display for Event {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Event::StorageRequest(req) => req.fmt(f),
Event::StateStoreRequest(req) => req.fmt(f),
}
}
}
impl Storage {
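    /// Retrieves a deploy directly from storage for a legacy direct deploy request.
    ///
    /// Panics on any internal storage error.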
fn handle_legacy_direct_deploy_request(&self, deploy_hash: DeployHash) -> Option<Deploy> {
self.env
.begin_ro_txn()
.map_err(Into::into)
.and_then(|mut tx| tx.get_value(self.deploy_db, &deploy_hash))
.expect("legacy direct deploy request failed")
}
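    /// Retrieves a serialized deploy for a legacy direct deploy request, using the in-memory
    /// deploy cache for deduplication if enabled.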
pub(crate) fn handle_deduplicated_legacy_direct_deploy_request(
&mut self,
deploy_hash: DeployHash,
) -> Option<SharedObject<Vec<u8>>> {
if self.enable_mem_deduplication {
if let Some(serialized) = self.deploy_cache.get(&deploy_hash) {
return Some(SharedObject::shared(serialized));
}
}
let deploy = self.handle_legacy_direct_deploy_request(deploy_hash)?;
match bincode::serialize(&deploy) {
Ok(serialized) => {
if self.enable_mem_deduplication {
let arc = Arc::new(serialized);
self.deploy_cache.put(deploy_hash, Arc::downgrade(&arc));
Some(SharedObject::shared(arc))
} else {
Some(SharedObject::owned(serialized))
}
}
Err(err) => {
error!(
err = display_error(&err),
"failed to create (shared) get-response"
);
None
}
}
}
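    /// Retrieves the switch block of the era with the given number inside a dedicated read-only
    /// transaction.
    ///
    /// Panics on any LMDB error.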
pub fn transactional_get_switch_block_by_era_id(
&self,
switch_block_era_num: u64,
) -> Option<Block> {
let mut read_only_lmdb_transaction = self
.env
.begin_ro_txn()
.expect("Could not start read only transaction for lmdb");
let switch_block = self
.get_switch_block_by_era_id(
&mut read_only_lmdb_transaction,
EraId::from(switch_block_era_num),
)
.expect("LMDB panicked trying to get switch block");
read_only_lmdb_transaction
.commit()
.expect("Could not commit transaction");
switch_block
}
}
#[cfg(test)]
impl Storage {
pub(crate) fn get_deploy_by_hash(&self, deploy_hash: DeployHash) -> Option<Deploy> {
let mut txn = self
.env
.begin_ro_txn()
.expect("could not create RO transaction");
txn.get_value(self.deploy_db, &deploy_hash)
.expect("could not retrieve value from storage")
}
pub(crate) fn get_deploy_metadata_by_hash(
&self,
deploy_hash: &DeployHash,
) -> Option<DeployMetadata> {
let mut txn = self
.env
.begin_ro_txn()
.expect("could not create RO transaction");
self.get_deploy_metadata(&mut txn, deploy_hash)
.expect("could not retrieve deploy metadata from storage")
}
pub(crate) fn get_deploy_with_finalized_approvals_by_hash(
&self,
deploy_hash: &DeployHash,
) -> Option<DeployWithFinalizedApprovals> {
let mut txn = self
.env
.begin_ro_txn()
.expect("could not create RO transaction");
self.get_deploy_with_finalized_approvals(&mut txn, deploy_hash)
.expect("could not retrieve a deploy with finalized approvals from storage")
}
pub(crate) fn get_all_deploy_hashes(&self) -> BTreeSet<DeployHash> {
let txn = self
.env
.begin_ro_txn()
.expect("could not create RO transaction");
let mut cursor = txn
.open_ro_cursor(self.deploy_db)
.expect("could not create cursor");
cursor
.iter()
.map(|(raw_key, _)| {
DeployHash::new(Digest::try_from(raw_key).expect("malformed deploy hash in DB"))
})
.collect()
}
}
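/// Builds a reverse lookup from block body hash to block header by scanning the block header
/// database.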
fn construct_block_body_to_block_header_reverse_lookup(
tx: &impl Transaction,
block_header_db: &Database,
) -> Result<BTreeMap<Digest, BlockHeader>, LmdbExtError> {
let mut block_body_hash_to_header_map: BTreeMap<Digest, BlockHeader> = BTreeMap::new();
for (_raw_key, raw_val) in tx.open_ro_cursor(*block_header_db)?.iter() {
let block_header: BlockHeader = lmdb_ext::deserialize(raw_val)?;
block_body_hash_to_header_map.insert(block_header.body_hash().to_owned(), block_header);
}
Ok(block_body_hash_to_header_map)
}
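/// Purges orphaned entries from the v1 block body database and, if requested, verifies that each
/// remaining body belongs to a header using the V1 hashing algorithm and reconstructs its block.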
fn initialize_block_body_v1_db(
env: &Environment,
block_header_db: &Database,
block_body_v1_db: &Database,
deleted_block_body_hashes_raw: &HashSet<&[u8]>,
should_check_integrity: bool,
) -> Result<(), Error> {
info!("initializing v1 block body database");
let mut txn = env.begin_rw_txn()?;
let block_body_hash_to_header_map =
construct_block_body_to_block_header_reverse_lookup(&txn, block_header_db)?;
let mut cursor = txn.open_rw_cursor(*block_body_v1_db)?;
for (raw_key, _raw_val) in cursor.iter() {
let block_body_hash =
Digest::try_from(raw_key).map_err(|err| LmdbExtError::DataCorrupted(Box::new(err)))?;
if !block_body_hash_to_header_map.contains_key(&block_body_hash) {
if !deleted_block_body_hashes_raw.contains(raw_key) {
warn!(?raw_key, "orphaned block body detected");
}
info!(?raw_key, "deleting v1 block body");
cursor.del(WriteFlags::empty())?;
}
}
drop(cursor);
if should_check_integrity {
let expected_hashing_algorithm_version = HashingAlgorithmVersion::V1;
for (raw_key, raw_val) in txn.open_ro_cursor(*block_body_v1_db)?.iter() {
let block_body_hash = Digest::try_from(raw_key)
.map_err(|err| LmdbExtError::DataCorrupted(Box::new(err)))?;
let block_body: BlockBody = lmdb_ext::deserialize(raw_val)?;
if let Some(block_header) = block_body_hash_to_header_map.get(&block_body_hash) {
let actual_hashing_algorithm_version = block_header.hashing_algorithm_version();
if expected_hashing_algorithm_version != actual_hashing_algorithm_version {
return Err(Error::UnexpectedHashingAlgorithmVersion {
expected_hashing_algorithm_version,
actual_hashing_algorithm_version,
});
}
Block::new_from_header_and_body(block_header.to_owned(), block_body)?;
} else {
return Err(Error::NoBlockHeaderForBlockBody {
block_body_hash,
hashing_algorithm_version: expected_hashing_algorithm_version,
block_body: Box::new(block_body),
});
}
}
}
txn.commit()?;
info!("v1 block body database initialized");
Ok(())
}
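/// Retrieves a single Merkle linked-list node: the pair of value hash and proof-of-rest is read
/// from the v2 block body database, then the value itself from `part_database`.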
fn get_merkle_linked_list_node<Tx, T>(
tx: &mut Tx,
block_body_v2_db: Database,
part_database: Database,
key_to_block_body_db: &Digest,
) -> Result<Option<MerkleLinkedListNode<T>>, LmdbExtError>
where
Tx: Transaction,
T: FromBytes,
{
let (part_to_value_db, merkle_proof_of_rest): (Digest, Digest) =
match tx.get_value_bytesrepr(block_body_v2_db, key_to_block_body_db)? {
Some(slice) => slice,
None => return Ok(None),
};
let value = match tx.get_value_bytesrepr(part_database, &part_to_value_db)? {
Some(value) => value,
None => return Ok(None),
};
Ok(Some(MerkleLinkedListNode::new(value, merkle_proof_of_rest)))
}
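/// Retrieves a Merklized (v2) block body by following the linked list of its parts: deploy
/// hashes, transfer hashes, then the proposer.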
fn get_single_block_body_v2<Tx: Transaction>(
tx: &mut Tx,
block_body_v2_db: Database,
deploy_hashes_db: Database,
transfer_hashes_db: Database,
proposer_db: Database,
block_body_hash: &Digest,
) -> Result<Option<BlockBody>, LmdbExtError> {
let deploy_hashes_with_proof: MerkleLinkedListNode<Vec<DeployHash>> =
match get_merkle_linked_list_node(tx, block_body_v2_db, deploy_hashes_db, block_body_hash)?
{
Some(deploy_hashes_with_proof) => deploy_hashes_with_proof,
None => return Ok(None),
};
let transfer_hashes_with_proof: MerkleLinkedListNode<Vec<DeployHash>> =
match get_merkle_linked_list_node(
tx,
block_body_v2_db,
transfer_hashes_db,
deploy_hashes_with_proof.merkle_proof_of_rest(),
)? {
Some(transfer_hashes_with_proof) => transfer_hashes_with_proof,
None => return Ok(None),
};
let proposer_with_proof: MerkleLinkedListNode<PublicKey> = match get_merkle_linked_list_node(
tx,
block_body_v2_db,
proposer_db,
transfer_hashes_with_proof.merkle_proof_of_rest(),
)? {
Some(proposer_with_proof) => {
debug_assert_eq!(
*proposer_with_proof.merkle_proof_of_rest(),
Digest::SENTINEL_RFOLD
);
proposer_with_proof
}
None => return Ok(None),
};
let block_body = BlockBody::new(
proposer_with_proof.take_value(),
deploy_hashes_with_proof.take_value(),
transfer_hashes_with_proof.take_value(),
);
Ok(Some(block_body))
}
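/// Deletes entries from the v2 block body databases that are no longer reachable from any stored
/// block header.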
#[allow(dead_code)]
fn garbage_collect_block_body_v2_db(
txn: &mut RwTransaction,
block_body_v2_db: &Database,
deploy_hashes_db: &Database,
transfer_hashes_db: &Database,
proposer_db: &Database,
block_body_hash_to_header_map: &BTreeMap<Digest, BlockHeader>,
) -> Result<(), Error> {
let mut live_digests: [HashSet<Digest>; BlockBody::PARTS_COUNT + 1] = [
HashSet::new(),
HashSet::new(),
HashSet::new(),
HashSet::new(),
];
for (body_hash, header) in block_body_hash_to_header_map {
if header.hashing_algorithm_version() != HashingAlgorithmVersion::V2 {
continue;
}
let mut current_digest = *body_hash;
let mut live_digests_index = 1;
        while current_digest != Digest::SENTINEL_RFOLD && !live_digests[0].contains(&current_digest)
{
live_digests[0].insert(current_digest);
let (key_to_part_db, merkle_proof_of_rest): (Digest, Digest) =
                match txn.get_value_bytesrepr(*block_body_v2_db, &current_digest)? {
Some(slice) => slice,
None => {
return Err(Error::CouldNotFindBlockBodyPart {
block_hash: header.hash(),
merkle_linked_list_node_hash: current_digest,
})
}
};
if live_digests_index < live_digests.len() {
live_digests[live_digests_index].insert(key_to_part_db);
} else {
return Err(Error::UnexpectedBlockBodyPart {
block_body_hash: *body_hash,
part_hash: key_to_part_db,
});
}
live_digests_index += 1;
current_digest = merkle_proof_of_rest;
}
}
let databases_to_clean: [(&Database, &str); BlockBody::PARTS_COUNT + 1] = [
(block_body_v2_db, "deleting v2 block body part"),
(deploy_hashes_db, "deleting v2 deploy hashes entry"),
(transfer_hashes_db, "deleting v2 transfer hashes entry"),
(proposer_db, "deleting v2 proposer entry"),
];
for (index, (database, info_text)) in databases_to_clean.iter().enumerate() {
let mut cursor = txn.open_rw_cursor(**database)?;
for (raw_key, _raw_val) in cursor.iter() {
let key = Digest::try_from(raw_key)
.map_err(|err| LmdbExtError::DataCorrupted(Box::new(err)))?;
if !live_digests[index].contains(&key) {
                info!(?raw_key, "{}", info_text);
cursor.del(WriteFlags::empty())?;
}
}
drop(cursor);
}
Ok(())
}
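/// Initializes the v2 block body database, verifying its integrity against the stored block
/// headers if requested.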
#[allow(clippy::too_many_arguments)]
fn initialize_block_body_v2_db(
env: &Environment,
block_header_db: &Database,
block_body_v2_db: &Database,
deploy_hashes_db: &Database,
transfer_hashes_db: &Database,
proposer_db: &Database,
should_check_integrity: bool,
) -> Result<(), Error> {
info!("initializing v2 block body database");
if should_check_integrity {
let txn = env.begin_rw_txn()?;
let block_body_hash_to_header_map =
construct_block_body_to_block_header_reverse_lookup(&txn, block_header_db)?;
let expected_hashing_algorithm_version = HashingAlgorithmVersion::V2;
for (raw_key, _raw_val) in txn.open_ro_cursor(*block_body_v2_db)?.iter() {
let block_body_hash = Digest::try_from(raw_key)
.map_err(|err| LmdbExtError::DataCorrupted(Box::new(err)))?;
let block_header = match block_body_hash_to_header_map.get(&block_body_hash) {
Some(block_header) => block_header,
None => {
continue;
}
};
let actual_hashing_algorithm_version = block_header.hashing_algorithm_version();
if expected_hashing_algorithm_version != actual_hashing_algorithm_version {
return Err(Error::UnexpectedHashingAlgorithmVersion {
expected_hashing_algorithm_version,
actual_hashing_algorithm_version,
});
}
let mut txn2 = env.begin_ro_txn()?;
match get_single_block_body_v2(
&mut txn2,
*block_body_v2_db,
*deploy_hashes_db,
*transfer_hashes_db,
*proposer_db,
&block_body_hash,
)? {
Some(block_body) => {
Block::new_from_header_and_body(block_header.to_owned(), block_body)?;
}
None => {
info!(?block_body_hash, "incomplete block body found");
}
};
txn2.commit()?;
}
txn.commit()?;
}
info!("v2 block body database initialized");
Ok(())
}
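/// Purges the metadata of deleted blocks from the block metadata database and, if requested,
/// verifies the stored finality signatures.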
fn initialize_block_metadata_db(
env: &Environment,
block_metadata_db: &Database,
deleted_block_hashes: &HashSet<&[u8]>,
should_check_integrity: bool,
) -> Result<(), Error> {
info!("initializing block metadata database");
let mut txn = env.begin_rw_txn()?;
let mut cursor = txn.open_rw_cursor(*block_metadata_db)?;
for (raw_key, raw_val) in cursor.iter() {
if deleted_block_hashes.contains(raw_key) {
cursor.del(WriteFlags::empty())?;
continue;
}
if should_check_integrity {
let signatures: BlockSignatures = lmdb_ext::deserialize(raw_val)?;
signatures.verify().map_err(Error::SignatureVerification)?;
if raw_key != signatures.block_hash.as_ref() {
return Err(Error::CorruptedBlockSignatureIndex {
raw_key: raw_key.to_vec(),
block_hash_bytes: signatures.block_hash.as_ref().to_vec(),
});
};
}
}
drop(cursor);
txn.commit()?;
info!("block metadata database initialized");
Ok(())
}
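/// Purges the execution results of deleted blocks from the deploy metadata database, removing
/// records that become empty.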
fn initialize_deploy_metadata_db(
env: &Environment,
deploy_metadata_db: &Database,
deleted_block_hashes: &HashSet<BlockHash>,
) -> Result<(), LmdbExtError> {
info!("initializing deploy metadata database");
if !deleted_block_hashes.is_empty() {
let mut txn = env.begin_rw_txn()?;
let mut cursor = txn.open_rw_cursor(*deploy_metadata_db)?;
for (raw_key, raw_val) in cursor.iter() {
let mut deploy_metadata: DeployMetadata = lmdb_ext::deserialize(raw_val)?;
let len_before = deploy_metadata.execution_results.len();
deploy_metadata.execution_results = deploy_metadata
.execution_results
.drain()
.filter(|(block_hash, _)| !deleted_block_hashes.contains(block_hash))
.collect();
if deploy_metadata.execution_results.is_empty() {
cursor.del(WriteFlags::empty())?;
} else if len_before != deploy_metadata.execution_results.len() {
let buffer = lmdb_ext::serialize(&deploy_metadata)?;
cursor.put(&raw_key, &buffer, WriteFlags::empty())?;
}
}
drop(cursor);
txn.commit()?;
}
info!("deploy metadata database initialized");
Ok(())
}