pub use self::{
api_sender::{ApiSender, SendError},
block::{
AdditionalHeaders, Block, BlockHeaderKey, BlockProof, CallProof, Epoch, IndexProof,
ProofError, ProposerId, SkipFlag,
},
builder::BlockchainBuilder,
config::{ConsensusConfig, ConsensusConfigBuilder, ValidatorKeys},
schema::{CallErrorsIter, CallInBlock, CallRecords, Schema, TxLocation},
};
pub mod config;
pub(crate) use crate::runtime::ExecutionError;
use exonum_crypto::{Hash, KeyPair};
use exonum_merkledb::{
access::{Access, RawAccess},
Database, Fork, HashTag, KeySetIndex, MapIndex, ObjectHash, Patch, Result as StorageResult,
Snapshot, SystemSchema, TemporaryDB,
};
use std::{borrow::Cow, collections::BTreeMap, iter, sync::Arc};
use crate::{
blockchain::config::GenesisConfig,
helpers::{Height, ValidateInput, ValidatorId},
messages::{AnyTx, Precommit, Verified},
runtime::Dispatcher,
};
mod api_sender;
mod block;
mod builder;
mod schema;
#[cfg(test)]
pub mod tests;
/// Boxed iterator over `(hash, transaction)` pairs; the return type of
/// [`TransactionCache::transactions`].
///
/// Each transaction is wrapped in a [`Cow`], so an implementation may yield either borrowed
/// or owned values.
pub type Transactions<'a> = Box<dyn Iterator<Item = (Hash, Cow<'a, Verified<AnyTx>>)> + 'a>;
/// Source of transactions that can be looked up by hash and enumerated.
pub trait TransactionCache {
/// Returns the transaction with the given hash, or `None` if this cache does not have it.
fn get_transaction(&self, hash: Hash) -> Option<Verified<AnyTx>>;
/// Checks whether this cache has a transaction with the given hash.
///
/// The default implementation performs a full lookup via `get_transaction` and discards
/// the result; implementations may override it with a cheaper check.
fn contains_transaction(&self, hash: Hash) -> bool {
self.get_transaction(hash).is_some()
}
/// Returns an iterator over the transactions of this cache together with their hashes.
fn transactions(&self) -> Transactions<'_>;
}
/// The unit type acts as an always-empty cache.
impl TransactionCache for () {
/// Always returns `None`: the empty cache has no transactions.
fn get_transaction(&self, _hash: Hash) -> Option<Verified<AnyTx>> {
None
}
/// Returns an iterator that yields nothing.
fn transactions(&self) -> Transactions<'_> {
Box::new(iter::empty())
}
}
/// An in-memory map from transaction hashes to transactions can be used as a cache directly.
impl TransactionCache for BTreeMap<Hash, Verified<AnyTx>> {
    /// Looks the transaction up by its hash and returns an owned copy of it.
    fn get_transaction(&self, hash: Hash) -> Option<Verified<AnyTx>> {
        let found = self.get(&hash)?;
        Some(found.clone())
    }

    /// Checks key presence only; unlike the trait default, no transaction is cloned.
    fn contains_transaction(&self, hash: Hash) -> bool {
        self.contains_key(&hash)
    }

    /// Iterates over the map entries, lending each transaction instead of cloning it.
    fn transactions(&self) -> Transactions<'_> {
        Box::new(
            self.iter()
                .map(|(&tx_hash, tx)| (tx_hash, Cow::Borrowed(tx))),
        )
    }
}
/// Transaction cache that layers a borrowed cache `C` over the transaction indexes obtained
/// from `Schema`.
#[derive(Debug)]
pub struct PersistentPool<'a, C: ?Sized, T: RawAccess> {
// Borrowed cache; lookups consult it before the stored transactions.
cache: &'a C,
// Index from transaction hash to transaction (`Schema::transactions`).
transactions: MapIndex<T, Hash, Verified<AnyTx>>,
// Set of transaction hashes (`Schema::transactions_pool`); drives iteration in `transactions()`.
transactions_pool: KeySetIndex<T, Hash>,
}
impl<'a, C, T> PersistentPool<'a, C, T>
where
    C: TransactionCache + ?Sized,
    T: RawAccess,
{
    /// Creates a pool that combines `cache` with the transaction indexes reachable
    /// through `access`.
    pub fn new<A>(access: A, cache: &'a C) -> Self
    where
        A: Access<Base = T>,
    {
        let schema = Schema::new(access);
        let transactions = schema.transactions();
        let transactions_pool = schema.transactions_pool();
        Self {
            cache,
            transactions,
            transactions_pool,
        }
    }
}
impl<C, T> TransactionCache for PersistentPool<'_, C, T>
where
    C: TransactionCache + ?Sized,
    T: RawAccess,
{
    /// Returns the transaction from the borrowed cache if it is there; otherwise falls back
    /// to the stored transactions.
    fn get_transaction(&self, hash: Hash) -> Option<Verified<AnyTx>> {
        match self.cache.get_transaction(hash) {
            cached @ Some(_) => cached,
            None => self.transactions.get(&hash),
        }
    }

    /// Checks the borrowed cache first and the stored transactions second.
    fn contains_transaction(&self, hash: Hash) -> bool {
        if self.cache.contains_transaction(hash) {
            return true;
        }
        self.transactions.contains(&hash)
    }

    /// Yields all transactions of the borrowed cache, followed by the transactions whose
    /// hashes are in the stored pool.
    ///
    /// # Panics
    ///
    /// The returned iterator panics if a hash from the pool has no matching stored transaction.
    fn transactions(&self) -> Transactions<'_> {
        let persisted = self.transactions_pool.iter().map(move |tx_hash| {
            let tx = self
                .transactions
                .get(&tx_hash)
                .expect("Transaction in pool is lost");
            (tx_hash, Cow::Owned(tx))
        });
        Box::new(self.cache.transactions().chain(persisted))
    }
}
/// Cloneable handle that bundles the database, the service keypair and the API sender.
#[derive(Debug, Clone)]
pub struct Blockchain {
// Sender exposed through `sender()`.
api_sender: ApiSender,
// Shared handle to the database; snapshots are taken from it.
db: Arc<dyn Database>,
// Keypair exposed through `service_keypair()`.
service_keypair: KeyPair,
}
impl Blockchain {
    /// Creates a blockchain handle from the database, service keypair and API sender.
    pub fn new(
        database: impl Into<Arc<dyn Database>>,
        service_keypair: impl Into<KeyPair>,
        api_sender: ApiSender,
    ) -> Self {
        let db = database.into();
        let service_keypair = service_keypair.into();
        Self {
            api_sender,
            db,
            service_keypair,
        }
    }

    /// Creates a blockchain over a `TemporaryDB` with a random service keypair and
    /// a closed API sender.
    pub fn build_for_tests() -> Self {
        let database = TemporaryDB::new();
        let service_keypair = KeyPair::random();
        let api_sender = ApiSender::closed();
        Self::new(database, service_keypair, api_sender)
    }

    /// Returns a reference to the shared database handle.
    pub(crate) fn database(&self) -> &Arc<dyn Database> {
        &self.db
    }

    /// Creates a snapshot of the current database state.
    pub fn snapshot(&self) -> Box<dyn Snapshot> {
        self.db.snapshot()
    }

    /// Returns the last entry of `block_hashes_by_height`, or the zero hash if the list is empty.
    pub fn last_hash(&self) -> Hash {
        let snapshot = self.snapshot();
        let last_hash = Schema::new(&snapshot).block_hashes_by_height().last();
        last_hash.unwrap_or_else(Hash::zero)
    }

    /// Returns the last block as reported by the schema of a fresh snapshot.
    pub fn last_block(&self) -> Block {
        Schema::new(&self.snapshot()).last_block()
    }

    /// Returns the transaction pool length as reported by the schema of a fresh snapshot.
    #[doc(hidden)]
    pub fn pool_size(&self) -> u64 {
        Schema::new(&self.snapshot()).transactions_pool_len()
    }

    /// Starts building a mutable blockchain with the given genesis configuration.
    #[cfg(test)]
    pub fn into_mut(self, genesis_config: GenesisConfig) -> BlockchainBuilder {
        let builder = BlockchainBuilder::new(self);
        builder.with_genesis_config(genesis_config)
    }

    /// Starts building a mutable blockchain with a single-validator test configuration in which
    /// the validator's service key is the public key of this blockchain's service keypair.
    #[cfg(test)]
    pub fn into_mut_with_dummy_config(self) -> BlockchainBuilder {
        use self::config::GenesisConfigBuilder;

        let (mut consensus_config, _) = ConsensusConfig::for_tests(1);
        consensus_config.validator_keys[0].service_key = self.service_keypair.public_key();
        self.into_mut(GenesisConfigBuilder::with_consensus_config(consensus_config).build())
    }

    /// Returns a reference to the API sender.
    pub fn sender(&self) -> &ApiSender {
        &self.api_sender
    }

    /// Returns a reference to the service keypair.
    pub fn service_keypair(&self) -> &KeyPair {
        &self.service_keypair
    }

    /// Checks the transaction against the given snapshot by delegating to
    /// `Dispatcher::check_tx`.
    pub fn check_tx(snapshot: &dyn Snapshot, tx: &Verified<AnyTx>) -> Result<(), ExecutionError> {
        Dispatcher::check_tx(snapshot, tx)
    }
}
/// Parameters of a block to create: who proposes it, at which epoch, and what it contains.
#[derive(Debug, Clone)]
pub struct BlockParams<'a> {
// Written into the block as the `ProposerId` header.
proposer: ValidatorId,
// Written into the block via `Block::add_epoch`.
epoch: Height,
// Selects between a block with transactions and a block skip.
contents: BlockContents<'a>,
}
impl<'a> BlockParams<'a> {
    /// Creates parameters for a block containing the transactions with the given hashes.
    pub fn new(proposer: ValidatorId, epoch: Height, tx_hashes: &'a [Hash]) -> Self {
        Self::with_contents(BlockContents::Transactions(tx_hashes), proposer, epoch)
    }

    /// Creates parameters for a block skip.
    pub fn skip(proposer: ValidatorId, epoch: Height) -> Self {
        Self::with_contents(BlockContents::Skip, proposer, epoch)
    }

    /// Creates parameters with explicitly specified block contents.
    pub fn with_contents(
        contents: BlockContents<'a>,
        proposer: ValidatorId,
        epoch: Height,
    ) -> Self {
        Self {
            contents,
            epoch,
            proposer,
        }
    }

    /// Parameters of the genesis block: validator 0, epoch 0 and no transactions.
    fn for_genesis_block() -> Self {
        Self::new(ValidatorId(0), Height(0), &[])
    }
}
/// Contents of a block to create.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum BlockContents<'a> {
/// Block with the transactions identified by the given hashes (the slice may be empty).
Transactions(&'a [Hash]),
/// Block skip; no transactions are executed for it.
Skip,
}
/// Kind of a block recorded in a `BlockPatch`.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub enum BlockKind {
/// Block produced from `BlockContents::Transactions`.
Normal,
/// Block skip produced from `BlockContents::Skip`.
Skip,
}
/// Storage patch produced for a block, together with the block hash and the block kind.
#[derive(Debug)]
pub struct BlockPatch {
// Storage changes that make up the block.
inner: Patch,
// Object hash of the block header created for this patch.
block_hash: Hash,
// Whether the patch is for a normal block or a block skip; `commit` branches on it.
kind: BlockKind,
}
impl BlockPatch {
    /// Consumes the wrapper and returns the underlying storage patch.
    pub fn into_inner(self) -> Patch {
        let Self { inner, .. } = self;
        inner
    }

    /// Returns the hash of the block this patch was created for.
    pub fn block_hash(&self) -> Hash {
        self.block_hash
    }

    /// Returns the kind of the block this patch was created for.
    pub fn kind(&self) -> BlockKind {
        self.kind
    }
}
/// Gives read-only access to the underlying storage patch.
impl AsRef<Patch> for BlockPatch {
fn as_ref(&self) -> &Patch {
&self.inner
}
}
/// Blockchain together with the dispatcher; provides the operations that create and
/// commit blocks.
#[derive(Debug)]
pub struct BlockchainMut {
// Handle to the database, service keypair and API sender.
inner: Blockchain,
// Runs the block hooks, executes transactions and commits blocks.
dispatcher: Dispatcher,
}
/// Gives read-only access to the wrapped `Blockchain`.
impl AsRef<Blockchain> for BlockchainMut {
fn as_ref(&self) -> &Blockchain {
&self.inner
}
}
impl BlockchainMut {
/// Returns a clone of the wrapped `Blockchain` handle.
pub fn immutable_view(&self) -> Blockchain {
self.inner.clone()
}
/// Returns a mutable reference to the dispatcher. Compiled for tests only.
#[cfg(test)]
pub(crate) fn dispatcher(&mut self) -> &mut Dispatcher {
&mut self.dispatcher
}
/// Creates a snapshot of the current database state (delegates to `Blockchain::snapshot`).
pub fn snapshot(&self) -> Box<dyn Snapshot> {
self.inner.snapshot()
}
/// Creates a fork of the underlying database.
pub fn fork(&self) -> Fork {
self.inner.db.fork()
}
/// Merges the patch into the underlying database and returns the storage result.
pub fn merge(&mut self, patch: Patch) -> StorageResult<()> {
self.inner.db.merge(patch)
}
/// Creates and commits the genesis block described by `genesis_config`.
///
/// The work is split into three separately merged steps:
///
/// 1. The consensus configuration is validated and stored; built-in artifacts and service
///    instances are registered and started.
/// 2. `after_transactions` hooks are run on a fresh fork and the result is committed through
///    the dispatcher.
/// 3. The block itself is created via `create_patch` with the genesis parameters and an empty
///    transaction cache; runtimes are notified about the commit and the patch is merged.
///
/// # Panics
///
/// - If the consensus configuration is invalid.
/// - If a built-in service cannot be added.
/// - If `after_transactions` reports an error for at least one service.
/// - If any of the database merges fails.
fn create_genesis_block(&mut self, genesis_config: GenesisConfig) {
    genesis_config
        .consensus_config
        .validate()
        .expect("Invalid consensus config");

    let mut fork = self.fork();
    // Write the genesis configuration to the blockchain.
    //
    // The transaction pool is intentionally left untouched here. Transactions can be put into
    // the pool before the genesis block exists (`add_transactions_into_db_pool` only needs
    // a `Database`). Clearing the pool set alone would silently drop them while they stay in
    // `Schema::transactions()`, so the duplicate check in `add_transactions_into_db_pool`
    // would refuse to add them again.
    Schema::new(&fork)
        .consensus_config_entry()
        .set(genesis_config.consensus_config);

    // Register built-in artifacts.
    for spec in genesis_config.artifacts {
        self.dispatcher
            .add_builtin_artifact(&fork, spec.artifact, spec.payload);
    }
    // Register built-in service instances.
    for inst in genesis_config.builtin_instances {
        self.dispatcher
            .add_builtin_service(&mut fork, inst.instance_spec, inst.constructor)
            .expect("Unable to add a builtin service");
    }
    // Start the instances and persist the changes made so far.
    let patch = self.dispatcher.start_builtin_instances(fork);
    self.merge(patch).unwrap();

    // `create_patch_inner` skips the `after_transactions` hook at height 0, so the hook is
    // invoked here on a fresh fork instead.
    let mut fork = self.fork();
    let errors = self.dispatcher.after_transactions(&mut fork);
    // The genesis block must not be created if any service failed.
    assert!(
        errors.is_empty(),
        "`after_transactions` failed for at least one service, errors: {:?}",
        &errors
    );
    let patch = self.dispatcher.commit_block(fork);
    self.merge(patch).unwrap();

    // Create the block itself; the genesis block has no transactions, hence the empty cache.
    let block_params = BlockParams::for_genesis_block();
    let BlockPatch { inner: patch, .. } = self.create_patch(block_params, &());
    // Runtimes are notified after the block patch has been created and before it is merged.
    self.dispatcher.notify_runtimes_about_commit(&patch);
    self.merge(patch).unwrap();

    log::info!(
        "GENESIS_BLOCK ====== hash={}",
        self.inner.last_hash().to_hex()
    );
}
/// Creates a patch for a new block according to `block_params`.
///
/// A block with transactions is executed on top of a fresh fork; `tx_cache` supplies
/// transactions in addition to the ones stored in the database. A block skip executes nothing.
// `block_params` is taken by value as part of the public signature, hence the `allow`.
#[allow(clippy::needless_pass_by_value)]
pub fn create_patch<C>(&self, block_params: BlockParams<'_>, tx_cache: &C) -> BlockPatch
where
    C: TransactionCache + ?Sized,
{
    match block_params.contents {
        BlockContents::Skip => self.create_skip_patch(&block_params),
        BlockContents::Transactions(tx_hashes) => {
            let fork = self.fork();
            self.create_patch_inner(fork, &block_params, tx_hashes, tx_cache)
        }
    }
}
/// Creates a patch that stores a block skip built on top of the last block.
///
/// The skip reuses the height and the state hash of the last block, has no transactions and
/// no errors, and carries the proposer and epoch from `block_data`.
fn create_skip_patch(&self, block_data: &BlockParams<'_>) -> BlockPatch {
    let last_block = self.inner.last_block();

    let mut skip = Block {
        // The height is the same as in the last block; a skip does not increase it.
        height: last_block.height,
        tx_count: 0,
        prev_hash: last_block.object_hash(),
        tx_hash: HashTag::empty_list_hash(),
        state_hash: last_block.state_hash,
        error_hash: HashTag::empty_map_hash(),
        additional_headers: AdditionalHeaders::new(),
    };
    skip.add_header::<ProposerId>(block_data.proposer);
    skip.add_epoch(block_data.epoch);
    skip.set_skip();
    // The hash is taken after all headers are in place.
    let block_hash = skip.object_hash();

    let fork = self.fork();
    Schema::new(&fork).store_block_skip(skip);

    BlockPatch {
        kind: BlockKind::Skip,
        block_hash,
        inner: fork.into_patch(),
    }
}
/// Executes the given transactions on top of `fork` and assembles the patch for a normal block.
///
/// Steps, in order: `before_transactions` hooks, the transactions from `tx_hashes`,
/// `after_transactions` hooks, block header creation, and finally recording the block hash
/// and the block itself. Both hooks are skipped when the new block has height 0.
///
/// # Panics
///
/// Panics if a transaction from `tx_hashes` is found neither in `tx_cache` nor in the database
/// (see `execute_transaction`).
pub(crate) fn create_patch_inner<C>(
&self,
mut fork: Fork,
block_data: &BlockParams<'_>,
tx_hashes: &[Hash],
tx_cache: &C,
) -> BlockPatch
where
C: TransactionCache + ?Sized,
{
// Height of the block being created; it ends up in `Block::height`.
let height = Schema::new(&fork).next_height();
// The `before_transactions` hook is not run for the block at height 0.
if height > Height(0) {
let errors = self.dispatcher.before_transactions(&mut fork);
let mut schema = Schema::new(&fork);
// Errors returned by the hook are recorded for this height.
for (location, error) in errors {
schema.save_error(height, location, error);
}
}
// Execute transactions in the given order; `index` is the position within the block.
for (index, hash) in (0..).zip(tx_hashes) {
self.execute_transaction(*hash, height, index, &mut fork, tx_cache);
}
// At height 0 this hook is skipped here; `create_genesis_block` invokes it itself
// before calling `create_patch`.
if height > Height(0) {
let errors = self.dispatcher.after_transactions(&mut fork);
let mut schema = Schema::new(&fork);
for (location, error) in errors {
schema.save_error(height, location, error);
}
}
let (patch, block) = self.create_block_header(fork, block_data, height, tx_hashes);
log::trace!("Executing {:?}", block);
let block_hash = block.object_hash();
// Turn the patch back into a fork to record the block hash and the block on top of it.
let fork = Fork::from(patch);
let schema = Schema::new(&fork);
schema.block_hashes_by_height().push(block_hash);
schema.blocks().put(&block_hash, block);
BlockPatch {
inner: fork.into_patch(),
block_hash,
kind: BlockKind::Normal,
}
}
/// Converts `fork` into a patch and builds the header of the block at `height`.
///
/// The header links to the current last block hash and includes the hashes of the
/// call errors and block transactions recorded for `height`, plus the state hash computed
/// from the resulting patch. A stored block skip, if any, is cleared as part of the patch.
fn create_block_header(
    &self,
    fork: Fork,
    block_data: &BlockParams<'_>,
    height: Height,
    tx_hashes: &[Hash],
) -> (Patch, Block) {
    let prev_hash = self.inner.last_hash();

    // Scope the schema so that its borrow of `fork` ends before the fork is consumed.
    let (error_hash, tx_hash) = {
        let mut schema = Schema::new(&fork);
        let error_hash = schema.call_errors_map(height).object_hash();
        let tx_hash = schema.block_transactions(height).object_hash();
        schema.clear_block_skip();
        (error_hash, tx_hash)
    };

    // The state hash is read from the patch, i.e., after all changes have been collected.
    let patch = fork.into_patch();
    let state_hash = SystemSchema::new(&patch).state_hash();

    let mut header = Block {
        height,
        // NOTE: `as` truncates if there are more than `u32::MAX` hashes.
        tx_count: tx_hashes.len() as u32,
        prev_hash,
        tx_hash,
        state_hash,
        error_hash,
        additional_headers: AdditionalHeaders::new(),
    };
    header.add_header::<ProposerId>(block_data.proposer);
    header.add_epoch(block_data.epoch);

    (patch, header)
}
/// Executes a single transaction within the block being created and records its outcome.
///
/// `index` is the position of the transaction in the block at `height`. The transaction is
/// committed to the schema and its location is stored whether or not execution succeeded;
/// an execution error is additionally saved for the `(height, index)` call.
///
/// # Panics
///
/// Panics if the transaction is found neither in `tx_cache` nor in the stored transactions.
fn execute_transaction<C>(
&self,
tx_hash: Hash,
height: Height,
index: u32,
fork: &mut Fork,
tx_cache: &C,
) where
C: TransactionCache + ?Sized,
{
// Look the transaction up in `tx_cache` first and in the storage second.
let transaction = PersistentPool::new(&*fork, tx_cache)
.get_transaction(tx_hash)
.unwrap_or_else(|| panic!("BUG: Cannot find transaction {:?} in database", tx_hash));
// NOTE(review): presumably checkpoints the changes made so far, so that a failed
// execution can be reverted to this point; confirm against `Fork::flush`.
fork.flush();
let tx_result = self.dispatcher.execute(fork, tx_hash, index, &transaction);
let mut schema = Schema::new(&*fork);
if let Err(e) = tx_result {
schema.save_error(height, CallInBlock::transaction(index), e);
}
// Performed for failed transactions as well.
schema.commit_transaction(&tx_hash, height, transaction);
let location = TxLocation::new(height, index);
schema.transactions_locations().put(&tx_hash, location);
fork.flush();
}
pub fn commit<I>(&mut self, patch: BlockPatch, precommits: I) -> anyhow::Result<()>
where
I: IntoIterator<Item = Verified<Precommit>>,
{
let fork: Fork = patch.inner.into();
let schema = Schema::new(&fork);
schema.precommits(&patch.block_hash).extend(precommits);
match patch.kind {
BlockKind::Skip => {
self.merge(fork.into_patch())?;
}
BlockKind::Normal => {
let patch = self.dispatcher.commit_block_and_notify_runtimes(fork);
self.merge(patch)?;
let new_fork = self.fork();
Schema::new(&new_fork).update_transaction_count();
self.merge(new_fork.into_patch())?;
}
}
Ok(())
}
/// Adds the transactions into the pool of this blockchain's database.
///
/// See `add_transactions_into_db_pool` for details.
#[doc(hidden)]
pub fn add_transactions_into_pool(
    &mut self,
    transactions: impl IntoIterator<Item = Verified<AnyTx>>,
) {
    let db = self.inner.db.as_ref();
    Self::add_transactions_into_db_pool(db, transactions);
}
/// Adds the transactions into the pool stored in `db`, using a single fork and merge.
///
/// Transactions whose hash is already present among the stored transactions are skipped.
///
/// # Panics
///
/// Panics if the changes cannot be merged into the database.
#[doc(hidden)]
pub fn add_transactions_into_db_pool<Db: Database + ?Sized>(
    db: &Db,
    transactions: impl IntoIterator<Item = Verified<AnyTx>>,
) {
    let fork = db.fork();
    {
        let mut schema = Schema::new(&fork);
        for transaction in transactions {
            let tx_hash = transaction.object_hash();
            if schema.transactions().contains(&tx_hash) {
                // Already known; do not add it a second time.
                continue;
            }
            schema.add_transaction_into_pool(transaction);
        }
    }
    db.merge(fork.into_patch())
        .expect("Cannot update transaction pool");
}
}