//! Central storage component.
//!
//! The storage component persists blocks, deploys, deploy execution metadata and transfers to
//! disk using LMDB, and provides a small general-purpose key/value state store for other
//! components.

mod lmdb_ext;
#[cfg(test)]
mod tests;
use std::{
collections::BTreeMap,
fmt::{self, Display, Formatter},
fs, io,
path::PathBuf,
sync::Arc,
};
#[cfg(test)]
use std::{collections::BTreeSet, convert::TryFrom};
use datasize::DataSize;
use derive_more::From;
use lmdb::{
Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags,
};
use serde::{Deserialize, Serialize};
#[cfg(test)]
use tempfile::TempDir;
use thiserror::Error;
use tracing::info;
use super::Component;
#[cfg(test)]
use crate::crypto::hash::Digest;
use crate::{
effect::{
requests::{StateStoreRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
fatal,
types::{Block, BlockHash, Deploy, DeployHash, DeployMetadata},
utils::WithDir,
Chainspec, NodeRng,
};
use casper_types::{ExecutionResult, Transfer, Transform};
use lmdb_ext::{LmdbExtError, TransactionExt, WriteTransactionExt};
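/// Filename of the single LMDB database file inside the storage root.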
const STORAGE_DB_FILENAME: &str = "storage.lmdb";
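/// Maximum number of LMDB read transactions. The component is accessed by a single
/// reader/writer at any one time, so this can be kept minimal.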
const MAX_TRANSACTIONS: u32 = 1;
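/// One gibibyte.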
const GIB: usize = 1024 * 1024 * 1024;
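// Default maximum database sizes, in bytes.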
const DEFAULT_MAX_BLOCK_STORE_SIZE: usize = 450 * GIB;
const DEFAULT_MAX_DEPLOY_STORE_SIZE: usize = 300 * GIB;
const DEFAULT_MAX_DEPLOY_METADATA_STORE_SIZE: usize = 300 * GIB;
const DEFAULT_MAX_STATE_STORE_SIZE: usize = 10 * GIB;
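/// Number of named databases in the environment (blocks, deploys, deploy metadata, transfers,
/// state store).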
const MAX_DB_COUNT: u32 = 5;
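/// OS-specific LMDB flags.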
#[cfg(not(target_os = "macos"))]
const OS_FLAGS: EnvironmentFlags = EnvironmentFlags::WRITE_MAP;
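/// OS-specific LMDB flags.
///
/// `WRITE_MAP` is avoided on macOS, which exhibits performance problems when it is used.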
#[cfg(target_os = "macos")]
const OS_FLAGS: EnvironmentFlags = EnvironmentFlags::empty();
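/// A storage component event: either a general storage request or a state store request.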
#[derive(Debug, From, Serialize)]
pub enum Event {
#[from]
StorageRequest(StorageRequest),
#[from]
StateStoreRequest(StateStoreRequest),
}
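/// A storage component error.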
#[derive(Debug, Error)]
pub enum Error {
#[error("failed to create database directory `{}`: {}", .0.display(), .1)]
CreateDatabaseDirectory(PathBuf, io::Error),
#[error("duplicate entries for block at height {height}: {first} / {second}")]
DuplicateBlockIndex {
height: u64,
first: BlockHash,
second: BlockHash,
},
#[error("duplicate execution result for deploy {deploy_hash} in block {block_hash}")]
DuplicateExecutionResult {
deploy_hash: DeployHash,
block_hash: BlockHash,
},
#[error("internal database error: {0}")]
InternalStorage(#[from] LmdbExtError),
}
impl From<lmdb::Error> for Error {
fn from(err: lmdb::Error) -> Self {
LmdbExtError::from(err).into()
}
}
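/// The storage component.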
#[derive(DataSize, Debug)]
pub struct Storage {
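    /// The root directory containing the database file.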
root: PathBuf,
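    /// Environment holding the LMDB databases.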
#[data_size(skip)]
env: Environment,
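    /// The block database.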
#[data_size(skip)]
block_db: Database,
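    /// The deploy database.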
#[data_size(skip)]
deploy_db: Database,
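    /// The deploy metadata (execution results) database.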
#[data_size(skip)]
deploy_metadata_db: Database,
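    /// The transfer database, keyed by block hash.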
#[data_size(skip)]
transfer_db: Database,
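    /// The state store database, a general-purpose key/value store for other components.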
#[data_size(skip)]
state_store_db: Database,
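    /// An in-memory index from block height to block hash, rebuilt from the block database on
    /// startup.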
block_height_index: BTreeMap<u64, BlockHash>,
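    /// In-memory cache of the most recently stored chainspec; lookups currently ignore the
    /// requested version.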
chainspec_cache: Option<Arc<Chainspec>>,
}
impl<REv> Component<REv> for Storage {
type Event = Event;
type ConstructionError = Error;
fn handle_event(
&mut self,
effect_builder: EffectBuilder<REv>,
_rng: &mut NodeRng,
event: Self::Event,
) -> Effects<Self::Event> {
let result = match event {
Event::StorageRequest(req) => self.handle_storage_request::<REv>(req),
Event::StateStoreRequest(req) => {
self.handle_state_store_request::<REv>(effect_builder, req)
}
};
match result {
Ok(effects) => effects,
Err(err) => fatal!(effect_builder, "storage error: {}", err),
}
}
}
impl Storage {
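    /// Creates a new storage component.
    ///
    /// Opens (or creates) the LMDB environment under the configured path and rebuilds the block
    /// height index by scanning the entire block database.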
pub(crate) fn new(cfg: &WithDir<Config>) -> Result<Self, Error> {
let config = cfg.value();
let root = cfg.with_dir(config.path.clone());
if !root.exists() {
fs::create_dir_all(&root)
.map_err(|err| Error::CreateDatabaseDirectory(root.clone(), err))?;
}
        // The memory map must be large enough to hold all databases, so the environment size is
        // the sum of the individual maximum store sizes.
        let total_size = config
            .max_block_store_size
            .saturating_add(config.max_deploy_store_size)
            .saturating_add(config.max_deploy_metadata_store_size)
            .saturating_add(config.max_state_store_size);
        let env = Environment::new()
            .set_flags(
                OS_FLAGS
                    // We manage our own directory and point LMDB at a single file.
                    | EnvironmentFlags::NO_SUB_DIR
                    // Disable thread-local storage; the component is driven by tasks that may
                    // migrate between threads.
                    | EnvironmentFlags::NO_TLS,
            )
            .set_max_readers(MAX_TRANSACTIONS)
            .set_max_dbs(MAX_DB_COUNT)
            .set_map_size(total_size)
            .open(&root.join(STORAGE_DB_FILENAME))?;
let block_db = env.create_db(Some("blocks"), DatabaseFlags::empty())?;
let deploy_db = env.create_db(Some("deploys"), DatabaseFlags::empty())?;
let deploy_metadata_db = env.create_db(Some("deploy_metadata"), DatabaseFlags::empty())?;
let transfer_db = env.create_db(Some("transfer"), DatabaseFlags::empty())?;
let state_store_db = env.create_db(Some("state_store"), DatabaseFlags::empty())?;
info!("reindexing block store");
let mut block_height_index = BTreeMap::new();
let block_txn = env.begin_ro_txn()?;
let mut cursor = block_txn.open_ro_cursor(block_db)?;
for (raw_key, raw_val) in cursor.iter() {
let block: Block = lmdb_ext::deserialize(raw_val)?;
assert_eq!(
raw_key,
block.hash().as_ref(),
"found corrupt block in database"
);
let header = block.header();
            if let Some(duplicate) = block_height_index.insert(header.height(), *block.hash()) {
                // `insert` returned a previously indexed hash, so two distinct blocks claim the
                // same height.
                return Err(Error::DuplicateBlockIndex {
                    height: header.height(),
                    first: duplicate,
                    second: *block.hash(),
                });
            }
}
info!("block store reindexing complete");
drop(cursor);
drop(block_txn);
Ok(Storage {
root,
env,
block_db,
deploy_db,
deploy_metadata_db,
transfer_db,
state_store_db,
block_height_index,
chainspec_cache: None,
})
}
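    /// Handles a [`StateStoreRequest`], storing or loading raw bytes in the state store
    /// database.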
fn handle_state_store_request<REv>(
&mut self,
_effect_builder: EffectBuilder<REv>,
req: StateStoreRequest,
) -> Result<Effects<Event>, Error>
where
Self: Component<REv>,
{
match req {
StateStoreRequest::Save {
key,
data,
responder,
} => {
let mut txn = self.env.begin_rw_txn()?;
txn.put(self.state_store_db, &key, &data, WriteFlags::default())?;
txn.commit()?;
Ok(responder.respond(()).ignore())
}
StateStoreRequest::Load { key, responder } => {
let txn = self.env.begin_ro_txn()?;
let bytes = match txn.get(self.state_store_db, &key) {
Ok(slice) => Some(slice.to_owned()),
Err(lmdb::Error::NotFound) => None,
Err(err) => return Err(err.into()),
};
Ok(responder.respond(bytes).ignore())
}
}
}
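    /// Handles a [`StorageRequest`].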
fn handle_storage_request<REv>(&mut self, req: StorageRequest) -> Result<Effects<Event>, Error>
where
Self: Component<REv>,
{
Ok(match req {
StorageRequest::PutBlock { block, responder } => {
let mut txn = self.env.begin_rw_txn()?;
let outcome = txn.put_value(self.block_db, block.hash(), &block, true)?;
txn.commit()?;
                if outcome {
                    // Only update the height index if the block was actually written, and never
                    // allow two distinct blocks to claim the same height.
                    if let Some(first) = self.block_height_index.get(&block.height()) {
                        if first != block.hash() {
                            return Err(Error::DuplicateBlockIndex {
                                height: block.height(),
                                first: *first,
                                second: *block.hash(),
                            });
                        }
                    }
                    self.block_height_index
                        .insert(block.height(), *block.hash());
                }
                responder.respond(outcome).ignore()
}
StorageRequest::GetBlock {
block_hash,
responder,
} => responder
.respond(self.get_single_block(&mut self.env.begin_ro_txn()?, &block_hash)?)
.ignore(),
StorageRequest::GetBlockAtHeight { height, responder } => responder
.respond(self.get_block_by_height(&mut self.env.begin_ro_txn()?, height)?)
.ignore(),
StorageRequest::GetHighestBlock { responder } => {
let mut txn = self.env.begin_ro_txn()?;
responder
.respond(
self.block_height_index
.keys()
.last()
.and_then(|&height| {
self.get_block_by_height(&mut txn, height).transpose()
})
.transpose()?,
)
.ignore()
}
StorageRequest::GetBlockHeader {
block_hash,
responder,
} => responder
.respond(
self.get_single_block(&mut self.env.begin_ro_txn()?, &block_hash)?
.map(|block| block.header().clone()),
)
.ignore(),
StorageRequest::GetBlockTransfers {
block_hash,
responder,
} => responder
.respond(self.get_transfers(&mut self.env.begin_ro_txn()?, &block_hash)?)
.ignore(),
StorageRequest::PutDeploy { deploy, responder } => {
let mut txn = self.env.begin_rw_txn()?;
let outcome = txn.put_value(self.deploy_db, deploy.id(), &deploy, false)?;
txn.commit()?;
responder.respond(outcome).ignore()
}
StorageRequest::GetDeploys {
deploy_hashes,
responder,
} => responder
.respond(self.get_deploys(&mut self.env.begin_ro_txn()?, deploy_hashes.as_slice())?)
.ignore(),
StorageRequest::GetDeployHeaders {
deploy_hashes,
responder,
} => responder
.respond(
self.get_deploys(&mut self.env.begin_ro_txn()?, deploy_hashes.as_slice())?
.into_iter()
.map(|opt| opt.map(|deploy| deploy.header().clone()))
.collect(),
)
.ignore(),
StorageRequest::PutExecutionResults {
block_hash,
execution_results,
responder,
} => {
let mut txn = self.env.begin_rw_txn()?;
let mut transfers: Vec<Transfer> = vec![];
for (deploy_hash, execution_result) in execution_results {
let mut metadata = self
.get_deploy_metadata(&mut txn, &deploy_hash)?
.unwrap_or_default();
                    // Extract transfers from successful executions before the duplicate check,
                    // so that a re-put of identical results still contributes its transfers to
                    // the block's transfer list written below.
                    if let ExecutionResult::Success { effect, .. } = execution_result.clone() {
                        for transform_entry in effect.transforms {
                            if let Transform::WriteTransfer(transfer) = transform_entry.transform {
                                transfers.push(transfer);
                            }
                        }
                    }
                    // Any previously stored execution result for this deploy and block must
                    // match the incoming one exactly.
                    if let Some(prev) = metadata.execution_results.get(&block_hash) {
                        if prev != &execution_result {
                            return Err(Error::DuplicateExecutionResult {
                                deploy_hash,
                                block_hash,
                            });
                        }
                        // Identical result already stored; nothing further to write.
                        continue;
                    }
                    metadata
                        .execution_results
                        .insert(block_hash, execution_result);
let was_written =
txn.put_value(self.deploy_metadata_db, &deploy_hash, &metadata, true)?;
assert!(
was_written,
"failed to write deploy metadata for block_hash {} deploy_hash {}",
block_hash, deploy_hash
);
}
let was_written = txn.put_value(self.transfer_db, &block_hash, &transfers, true)?;
assert!(
was_written,
"failed to write transfers for block_hash {}",
block_hash
);
txn.commit()?;
responder.respond(()).ignore()
}
StorageRequest::GetDeployAndMetadata {
deploy_hash,
responder,
} => {
let mut txn = self.env.begin_ro_txn()?;
let deploy: Deploy =
if let Some(deploy) = txn.get_value(self.deploy_db, &deploy_hash)? {
deploy
} else {
return Ok(responder.respond(None).ignore());
};
let metadata = self
.get_deploy_metadata(&mut txn, &deploy_hash)?
.unwrap_or_default();
responder.respond(Some((deploy, metadata))).ignore()
}
StorageRequest::PutChainspec {
chainspec,
responder,
} => {
self.chainspec_cache = Some(chainspec);
responder.respond(()).ignore()
}
StorageRequest::GetChainspec {
version: _version,
responder,
} => responder.respond(self.chainspec_cache.clone()).ignore(),
})
}
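    /// Retrieves a block by height, resolving the hash through the in-memory index.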
fn get_block_by_height<Tx: Transaction>(
&self,
tx: &mut Tx,
height: u64,
) -> Result<Option<Block>, LmdbExtError> {
self.block_height_index
.get(&height)
.and_then(|block_hash| self.get_single_block(tx, block_hash).transpose())
.transpose()
}
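    /// Retrieves a single block by hash.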
fn get_single_block<Tx: Transaction>(
&self,
tx: &mut Tx,
block_hash: &BlockHash,
) -> Result<Option<Block>, LmdbExtError> {
        tx.get_value(self.block_db, block_hash)
}
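    /// Retrieves a set of deploys, in the same order as the given hashes.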
fn get_deploys<Tx: Transaction>(
&self,
tx: &mut Tx,
deploy_hashes: &[DeployHash],
) -> Result<Vec<Option<Deploy>>, LmdbExtError> {
deploy_hashes
.iter()
.map(|deploy_hash| tx.get_value(self.deploy_db, deploy_hash))
.collect()
}
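    /// Retrieves the metadata (per-block execution results) stored for a deploy, if any.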
fn get_deploy_metadata<Tx: Transaction>(
&self,
tx: &mut Tx,
deploy_hash: &DeployHash,
) -> Result<Option<DeployMetadata>, Error> {
Ok(tx.get_value(self.deploy_metadata_db, deploy_hash)?)
}
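    /// Retrieves the transfers recorded for a block, if any.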
fn get_transfers<Tx: Transaction>(
&self,
tx: &mut Tx,
block_hash: &BlockHash,
) -> Result<Option<Vec<Transfer>>, Error> {
Ok(tx.get_value(self.transfer_db, block_hash)?)
}
}
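/// On-disk storage configuration.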
#[derive(Clone, DataSize, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
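    /// The path to the directory where the storage component keeps its database file.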
pub(crate) path: PathBuf,
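    /// Maximum size of the block store, in bytes.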
max_block_store_size: usize,
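    /// Maximum size of the deploy store, in bytes.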
max_deploy_store_size: usize,
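    /// Maximum size of the deploy metadata store, in bytes.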
max_deploy_metadata_store_size: usize,
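    /// Maximum size of the state store, in bytes.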
max_state_store_size: usize,
}
impl Default for Config {
fn default() -> Self {
Config {
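            // Defaults to a path that cannot be opened, so a configuration that never sets the
            // storage path fails loudly instead of writing somewhere unintended.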
path: "/dev/null".into(),
max_block_store_size: DEFAULT_MAX_BLOCK_STORE_SIZE,
max_deploy_store_size: DEFAULT_MAX_DEPLOY_STORE_SIZE,
max_deploy_metadata_store_size: DEFAULT_MAX_DEPLOY_METADATA_STORE_SIZE,
max_state_store_size: DEFAULT_MAX_STATE_STORE_SIZE,
}
}
}
impl Config {
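    /// Returns a default `Config` suitable for tests, along with the `TempDir` that must be kept
    /// alive for the duration of the test.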
#[cfg(test)]
pub(crate) fn default_for_tests() -> (Self, TempDir) {
let tempdir = tempfile::tempdir().expect("should get tempdir");
let path = tempdir.path().join("lmdb");
let config = Config {
path,
..Default::default()
};
(config, tempdir)
}
}
impl Display for Event {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Event::StorageRequest(req) => req.fmt(f),
Event::StateStoreRequest(req) => req.fmt(f),
}
}
}
impl Storage {
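    /// Retrieves a deploy directly from storage, bypassing the usual request/response flow.
    ///
    /// Kept for legacy callers; panics on any database error.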
pub fn handle_legacy_direct_deploy_request(&self, deploy_hash: DeployHash) -> Option<Deploy> {
self.env
.begin_ro_txn()
.map_err(Into::into)
.and_then(|mut tx| tx.get_value(self.deploy_db, &deploy_hash))
.expect("legacy direct deploy request failed")
}
}
#[cfg(test)]
impl Storage {
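    /// Directly returns a deploy from internal storage. Test-only; panics on error.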
pub fn get_deploy_by_hash(&self, deploy_hash: DeployHash) -> Option<Deploy> {
let mut txn = self
.env
.begin_ro_txn()
.expect("could not create RO transaction");
txn.get_value(self.deploy_db, &deploy_hash)
.expect("could not retrieve value from storage")
}
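    /// Returns the set of all deploy hashes currently held in storage. Test-only.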
pub fn get_all_deploy_hashes(&self) -> BTreeSet<DeployHash> {
let txn = self
.env
.begin_ro_txn()
.expect("could not create RO transaction");
let mut cursor = txn
.open_ro_cursor(self.deploy_db)
.expect("could not create cursor");
cursor
.iter()
.map(|(raw_key, _)| {
DeployHash::new(Digest::try_from(raw_key).expect("malformed deploy hash in DB"))
})
.collect()
}
}