pub use exonum_merkledb::Error as FatalError;
pub use crate::runtime::{
error::{ErrorKind as ExecutionErrorKind, ExecutionStatus},
ExecutionError,
};
pub use self::{
block::{Block, BlockProof},
builder::{BlockchainBuilder, InstanceCollection},
config::{ConsensusConfig, ValidatorKeys},
schema::{IndexCoordinates, Schema, SchemaOrigin, TxLocation},
};
pub mod config;
use exonum_crypto::gen_keypair;
use exonum_merkledb::{
access::RawAccess, Database, Fork, MapIndex, ObjectHash, Patch, Result as StorageResult,
Snapshot, TemporaryDB,
};
use failure::{format_err, Error};
use futures::Future;
use std::{
collections::{BTreeMap, HashMap},
iter,
sync::Arc,
};
use crate::{
blockchain::config::GenesisConfig,
crypto::{Hash, PublicKey, SecretKey},
helpers::{Height, Round, ValidateInput, ValidatorId},
messages::{AnyTx, Connect, Message, Precommit, Verified},
node::ApiSender,
runtime::{error::catch_panic, ArtifactSpec, Dispatcher},
};
mod block;
mod builder;
mod schema;
#[cfg(test)]
pub mod tests;
/// Shared read-only handle to the blockchain storage and service keys.
/// Cheap to clone: the database is behind an `Arc` and the API sender is
/// clonable.
#[derive(Debug, Clone)]
pub struct Blockchain {
    // Channel used to hand transactions/messages to the node's API layer
    // (see `broadcast_raw_transaction`).
    pub(crate) api_sender: ApiSender,
    // Underlying key-value database.
    db: Arc<dyn Database>,
    // Keypair used to sign transactions produced on behalf of this node's
    // service.
    service_keypair: (PublicKey, SecretKey),
}
impl Blockchain {
    /// Creates a blockchain handle over the given database. Service
    /// transactions are signed with `service_keypair` and broadcast through
    /// `api_sender`.
    pub fn new(
        database: impl Into<Arc<dyn Database>>,
        service_keypair: (PublicKey, SecretKey),
        api_sender: ApiSender,
    ) -> Self {
        let db = database.into();
        Self {
            api_sender,
            db,
            service_keypair,
        }
    }

    /// Creates a throwaway blockchain over a temporary database, with a
    /// random service keypair and a closed API channel. Intended for tests.
    pub fn build_for_tests() -> Self {
        let database = TemporaryDB::new();
        let keypair = gen_keypair();
        Self::new(database, keypair, ApiSender::closed())
    }

    /// Returns a read-only snapshot of the current database state.
    pub fn snapshot(&self) -> Box<dyn Snapshot> {
        self.db.snapshot()
    }

    /// Returns the hash of the latest committed block, or the default
    /// hash if no blocks have been committed yet.
    pub fn last_hash(&self) -> Hash {
        let snapshot = self.snapshot();
        let block_hashes = Schema::new(&snapshot).block_hashes_by_height();
        block_hashes.last().unwrap_or_default()
    }

    /// Returns the latest committed block.
    pub fn last_block(&self) -> Block {
        let snapshot = self.snapshot();
        Schema::new(&snapshot).last_block()
    }

    /// Signs `tx` with this node's service keypair and queues it for
    /// broadcast through the API sender.
    #[doc(hidden)]
    pub fn broadcast_raw_transaction(&self, tx: AnyTx) -> Result<(), Error> {
        let (public_key, secret_key) = &self.service_keypair;
        let signed = Verified::from_value(tx, *public_key, secret_key);
        self.api_sender.broadcast_transaction(signed)
    }

    /// Returns the number of transactions currently in the pool.
    pub fn pool_size(&self) -> u64 {
        let snapshot = self.snapshot();
        Schema::new(&snapshot).transactions_pool_len()
    }

    /// Collects all peers saved in the peer cache.
    pub fn get_saved_peers(&self) -> HashMap<PublicKey, Verified<Connect>> {
        let snapshot = self.snapshot();
        let schema = Schema::new(&snapshot);
        schema.peers_cache().iter().collect()
    }

    /// Wraps this handle into a builder producing a mutable blockchain with
    /// the given genesis configuration.
    #[cfg(test)]
    pub fn into_mut(self, genesis_config: GenesisConfig) -> BlockchainBuilder {
        BlockchainBuilder::new(self, genesis_config)
    }

    /// Like `into_mut`, but generates a single-node testnet configuration
    /// that reuses this instance's service keypair.
    #[cfg(test)]
    pub fn into_mut_with_dummy_config(self) -> BlockchainBuilder {
        use crate::{blockchain::config::GenesisConfigBuilder, helpers::generate_testnet_config};
        use exonum_crypto::KeyPair;

        let mut node_config = generate_testnet_config(1, 0).pop().unwrap();
        node_config.keys.service = KeyPair::from(self.service_keypair.clone());
        let genesis = GenesisConfigBuilder::with_consensus_config(node_config.consensus).build();
        self.into_mut(genesis)
    }

    /// Returns the sender used to communicate with the node's API layer.
    pub fn sender(&self) -> &ApiSender {
        &self.api_sender
    }

    /// Returns the keypair used to sign service transactions.
    pub fn service_keypair(&self) -> &(PublicKey, SecretKey) {
        &self.service_keypair
    }
}
/// Mutable blockchain handle: couples the shared `Blockchain` state with
/// the runtime `Dispatcher` that deploys artifacts and executes
/// transactions.
#[derive(Debug)]
pub struct BlockchainMut {
    // Shared read-only part (database handle, keys, API sender).
    inner: Blockchain,
    // Runtime dispatcher used to execute transactions and service hooks.
    dispatcher: Dispatcher,
}
/// Lets a `BlockchainMut` be used wherever a shared `&Blockchain` reference
/// is expected.
impl AsRef<Blockchain> for BlockchainMut {
    fn as_ref(&self) -> &Blockchain {
        &self.inner
    }
}
impl BlockchainMut {
    /// Returns a mutable reference to the wrapped `Blockchain` (tests only).
    #[cfg(test)]
    pub(crate) fn inner(&mut self) -> &mut Blockchain {
        &mut self.inner
    }

    /// Returns a mutable reference to the runtime dispatcher (tests only).
    #[cfg(test)]
    pub(crate) fn dispatcher(&mut self) -> &mut Dispatcher {
        &mut self.dispatcher
    }

    /// Returns a cloned read-only handle to this blockchain.
    pub fn immutable_view(&self) -> Blockchain {
        self.inner.clone()
    }

    /// Returns a read-only snapshot of the current storage state.
    pub fn snapshot(&self) -> Box<dyn Snapshot> {
        self.inner.snapshot()
    }

    /// Starts a new fork (mutable working copy) of the database.
    pub fn fork(&self) -> Fork {
        self.inner.db.fork()
    }

    /// Merges a patch into the database, persisting its changes.
    pub fn merge(&mut self, patch: Patch) -> StorageResult<()> {
        self.inner.db.merge(patch)
    }

    /// Creates and commits the genesis block from `genesis_config`.
    ///
    /// The sequence, whose order matters, is:
    /// 1. validate and persist the consensus configuration;
    /// 2. commit and deploy each artifact (`wait()` blocks until the
    ///    deployment future resolves, so deployment is synchronous here);
    /// 3. start the built-in service instances;
    /// 4. run the dispatcher's commit hooks and merge the resulting state;
    /// 5. build block #0 itself via `create_patch` (no transactions),
    ///    notify the runtimes about the commit, and merge the final patch.
    fn create_genesis_block(&mut self, genesis_config: GenesisConfig) -> Result<(), Error> {
        genesis_config.consensus_config.validate()?;
        let mut fork = self.fork();
        Schema::new(&fork)
            .consensus_config_entry()
            .set(genesis_config.consensus_config);
        for ArtifactSpec { artifact, payload } in genesis_config.artifacts {
            Dispatcher::commit_artifact(&fork, artifact.clone(), payload.clone())?;
            self.dispatcher.deploy_artifact(artifact, payload).wait()?
        }
        for inst in genesis_config.builtin_instances {
            self.dispatcher
                .add_builtin_service(&mut fork, inst.instance_spec, inst.constructor)?;
        }
        self.dispatcher.before_commit(&mut fork);
        self.dispatcher.commit_block(&mut fork);
        // Persist service state before creating the block so that the block's
        // state hash covers the freshly started services.
        self.merge(fork.into_patch())?;
        let (_, patch) = self.create_patch(
            ValidatorId::zero(),
            Height::zero(),
            &[],
            &mut BTreeMap::new(),
        );
        let fork = Fork::from(patch);
        self.dispatcher
            .notify_runtimes_about_commit(fork.snapshot_without_unflushed_changes());
        self.merge(fork.into_patch())?;
        info!(
            "GENESIS_BLOCK ====== hash={}",
            self.inner.last_hash().to_hex()
        );
        Ok(())
    }

    /// Executes the given transactions on top of the latest committed state
    /// and forms a block from the result.
    ///
    /// Returns the hash of the created block together with the storage patch
    /// containing all accumulated changes; the patch is NOT merged here —
    /// that happens later in `commit`.
    pub fn create_patch(
        &self,
        proposer_id: ValidatorId,
        height: Height,
        tx_hashes: &[Hash],
        tx_cache: &mut BTreeMap<Hash, Verified<AnyTx>>,
    ) -> (Hash, Patch) {
        let mut fork = self.fork();
        let last_hash = self.inner.last_hash();
        // Execute transactions in order. Failure here means a transaction
        // could not be *found* (see `execute_transaction`), which is a bug.
        for (index, hash) in tx_hashes.iter().enumerate() {
            self.execute_transaction(*hash, height, index, &mut fork, tx_cache)
                .expect("Transaction execution error");
        }
        // The `before_commit` hooks are skipped for the genesis block.
        if height > Height(0) {
            self.dispatcher.before_commit(&mut fork);
        }
        let schema = Schema::new(&fork);
        // Aggregate the core schema's state hashes and every runtime's state
        // hashes into a single Merkle root for the block header.
        let state_hash = {
            let mut sum_table = schema.state_hash_aggregator();
            // Rebuild the aggregator from scratch each block.
            sum_table.clear();
            let state_hashes = self
                .dispatcher
                .state_hash(fork.snapshot_without_unflushed_changes())
                .into_iter()
                .chain(IndexCoordinates::locate(
                    SchemaOrigin::Core,
                    schema.state_hash(),
                ));
            for (coordinate, hash) in state_hashes {
                sum_table.put(&coordinate, hash);
            }
            sum_table.object_hash()
        };
        let tx_hash = schema.block_transactions(height).object_hash();
        let block = Block::new(
            proposer_id,
            height,
            tx_hashes.len() as u32,
            last_hash,
            tx_hash,
            state_hash,
        );
        trace!("execute block = {:?}", block);
        // Store the block header under its own hash.
        let block_hash = block.object_hash();
        let schema = Schema::new(&fork);
        schema.block_hashes_by_height().push(block_hash);
        schema.blocks().put(&block_hash, block);
        (block_hash, fork.into_patch())
    }

    /// Looks up a transaction by hash (persistent index first, then
    /// `tx_cache`), executes it, and records its result and location.
    ///
    /// Returns `Err` only if the transaction cannot be found at all; an
    /// execution failure is recorded as an `ExecutionStatus` in
    /// `transaction_results` instead of being propagated.
    fn execute_transaction(
        &self,
        tx_hash: Hash,
        height: Height,
        index: usize,
        fork: &mut Fork,
        tx_cache: &mut BTreeMap<Hash, Verified<AnyTx>>,
    ) -> Result<(), Error> {
        let schema = Schema::new(&*fork);
        let transaction = get_transaction(&tx_hash, &schema.transactions(), &tx_cache)
            .ok_or_else(|| format_err!("BUG: Cannot find transaction {:?} in database", tx_hash))?;
        // Flush changes accumulated so far so that a later `rollback` only
        // discards this transaction's own modifications.
        fork.flush();
        // `catch_panic` converts a panicking service into an execution error
        // instead of crashing the node.
        let tx_result = catch_panic(|| self.dispatcher.execute(fork, tx_hash, &transaction));
        match &tx_result {
            Ok(_) => {
                // Success: persist the transaction's changes into the patch.
                fork.flush();
            }
            Err(e) => {
                if e.kind == ExecutionErrorKind::Panic {
                    error!("{:?} transaction execution panicked: {:?}", transaction, e);
                } else {
                    info!("{:?} transaction execution failed: {:?}", tx_hash, e);
                }
                // Failure: drop the transaction's unflushed changes.
                fork.rollback();
            }
        }
        // Record outcome and location regardless of success or failure.
        let mut schema = Schema::new(&*fork);
        schema
            .transaction_results()
            .put(&tx_hash, ExecutionStatus(tx_result));
        schema.commit_transaction(&tx_hash, height, transaction);
        tx_cache.remove(&tx_hash);
        let location = TxLocation::new(height, index as u64);
        schema.transactions_locations().put(&tx_hash, location);
        fork.flush();
        Ok(())
    }

    /// Commits a block patch (produced by `create_patch`) to the database.
    ///
    /// Stores the `precommits` confirming the block, clears the consensus
    /// message cache, updates the committed-transaction counter, returns any
    /// transactions still in `tx_cache` to the persistent pool, notifies the
    /// runtimes about the committed block, and merges everything.
    pub fn commit<I>(
        &mut self,
        patch: Patch,
        block_hash: Hash,
        precommits: I,
        tx_cache: &mut BTreeMap<Hash, Verified<AnyTx>>,
    ) -> Result<(), Error>
    where
        I: IntoIterator<Item = Verified<Precommit>>,
    {
        let mut fork: Fork = patch.into();
        let mut schema = Schema::new(&fork);
        schema.precommits(&block_hash).extend(precommits);
        // Consensus messages cache is useful only during one height, so it
        // is cleared when a new block is committed.
        schema.consensus_messages_cache().clear();
        let txs_in_block = schema.last_block().tx_count();
        schema.update_transaction_count(u64::from(txs_in_block));
        // Move transactions left in the cache (not included in this block)
        // into the persistent pool, skipping ones already known.
        let tx_hashes = tx_cache.keys().cloned().collect::<Vec<Hash>>();
        for tx_hash in tx_hashes {
            if let Some(tx) = tx_cache.remove(&tx_hash) {
                if !schema.transactions().contains(&tx_hash) {
                    schema.add_transaction_into_pool(tx);
                }
            }
        }
        self.dispatcher.commit_block_and_notify_runtimes(&mut fork);
        self.merge(fork.into_patch())?;
        Ok(())
    }

    /// Adds the given verified transactions to the persistent pool.
    #[doc(hidden)]
    pub fn add_transactions_into_pool(
        &mut self,
        transactions: impl IntoIterator<Item = Verified<AnyTx>>,
    ) {
        Self::add_transactions_into_db_pool(self.inner.db.as_ref(), transactions);
    }

    /// Adds verified transactions directly to `db`'s persistent pool,
    /// skipping transactions that are already known.
    ///
    /// # Panics
    ///
    /// Panics if merging the resulting patch into the database fails.
    #[doc(hidden)]
    pub fn add_transactions_into_db_pool<Db: Database + ?Sized>(
        db: &Db,
        transactions: impl IntoIterator<Item = Verified<AnyTx>>,
    ) {
        let fork = db.fork();
        let mut schema = Schema::new(&fork);
        for transaction in transactions {
            if !schema.transactions().contains(&transaction.object_hash()) {
                schema.add_transaction_into_pool(transaction);
            }
        }
        db.merge(fork.into_patch())
            .expect("Cannot update transaction pool");
    }

    /// Checks whether `tx` can be dispatched (delegates to the dispatcher).
    pub fn check_tx(&self, tx: &Verified<AnyTx>) -> Result<(), ExecutionError> {
        self.dispatcher.check_tx(tx)
    }

    /// Shuts down the runtime dispatcher.
    pub fn shutdown(&mut self) {
        self.dispatcher.shutdown();
    }

    /// Saves a single consensus message to the cache (see `save_messages`).
    pub(crate) fn save_message<T: Into<Message>>(&mut self, round: Round, message: T) {
        self.save_messages(round, iter::once(message.into()));
    }

    /// Appends consensus messages to the cache and records the current
    /// round, merging the change immediately.
    ///
    /// # Panics
    ///
    /// Panics if the merge into the database fails.
    pub(crate) fn save_messages<I>(&mut self, round: Round, iter: I)
    where
        I: IntoIterator<Item = Message>,
    {
        let fork = self.fork();
        let mut schema = Schema::new(&fork);
        schema.consensus_messages_cache().extend(iter);
        schema.set_consensus_round(round);
        self.merge(fork.into_patch())
            .expect("Unable to save messages to the consensus cache");
    }

    /// Saves a peer's `Connect` message to the peer cache.
    ///
    /// # Panics
    ///
    /// Panics if the merge into the database fails.
    pub(crate) fn save_peer(&mut self, pubkey: &PublicKey, peer: Verified<Connect>) {
        let fork = self.fork();
        Schema::new(&fork).peers_cache().put(pubkey, peer);
        self.merge(fork.into_patch())
            .expect("Unable to save peer to the peers cache");
    }

    /// Removes a peer from the peer cache by public key.
    ///
    /// # Panics
    ///
    /// Panics if the merge into the database fails.
    pub fn remove_peer_with_pubkey(&mut self, key: &PublicKey) {
        let fork = self.fork();
        Schema::new(&fork).peers_cache().remove(key);
        self.merge(fork.into_patch())
            .expect("Unable to remove peer from the peers cache");
    }
}
/// Returns the transaction with the given hash, consulting the persistent
/// `txs` index first and falling back to the in-memory `tx_cache`.
///
/// Cloning from the cache is required because the map owns its values.
pub(crate) fn get_transaction<T: RawAccess>(
    hash: &Hash,
    txs: &MapIndex<T, Hash, Verified<AnyTx>>,
    tx_cache: &BTreeMap<Hash, Verified<AnyTx>>,
) -> Option<Verified<AnyTx>> {
    // `hash` is already a reference; the previous `&hash` created a
    // redundant `&&Hash` that only worked via deref coercion.
    txs.get(hash).or_else(|| tx_cache.get(hash).cloned())
}
/// Returns `true` if a transaction with the given hash is present either in
/// the persistent `txs` index or in the in-memory `tx_cache`.
pub(crate) fn contains_transaction<T: RawAccess>(
    hash: &Hash,
    txs: &MapIndex<T, Hash, Verified<AnyTx>>,
    tx_cache: &BTreeMap<Hash, Verified<AnyTx>>,
) -> bool {
    // `hash` is already a reference; the previous `&hash` created a
    // redundant `&&Hash` that only worked via deref coercion.
    txs.contains(hash) || tx_cache.contains_key(hash)
}