use anyhow::format_err;
use exonum_derive::{BinaryValue, ObjectHash};
use exonum_merkledb::{
access::{Access, AccessExt, RawAccessMut},
impl_binary_key_for_binary_value,
indexes::{Entries, Values},
Entry, KeySetIndex, ListIndex, MapIndex, ObjectHash, ProofEntry, ProofListIndex, ProofMapIndex,
};
use exonum_proto::ProtobufConvert;
use std::fmt;
use super::{Block, BlockProof, CallProof, ConsensusConfig};
use crate::{
crypto::{Hash, PublicKey},
helpers::{Height, ValidatorId},
messages::{AnyTx, Precommit, Verified},
proto::schema::blockchain as pb_blockchain,
runtime::{ExecutionError, ExecutionErrorAux, InstanceId},
};
// Declares one private `&str` constant per `NAME => "value";` entry.
// Each constant's value is the entry's string literal prefixed with `core.`,
// concatenated at compile time via `concat!`.
macro_rules! define_names {
(
$(
$name:ident => $value:expr;
)+
) => (
$(const $name: &str = concat!("core.", $value);)*
)
}
// Names of the storage indexes accessed by `Schema`. After expansion every
// constant holds its value with the `core.` prefix (e.g. `TRANSACTIONS` is
// `"core.transactions"`).
define_names!(
TRANSACTIONS => "transactions";
CALL_ERRORS => "call_errors";
CALL_ERRORS_AUX => "call_errors_aux";
TRANSACTIONS_LEN => "transactions_len";
TRANSACTIONS_POOL => "transactions_pool";
TRANSACTIONS_POOL_LEN => "transactions_pool_len";
TRANSACTIONS_LOCATIONS => "transactions_locations";
BLOCKS => "blocks";
BLOCK_HASHES_BY_HEIGHT => "block_hashes_by_height";
BLOCK_TRANSACTIONS => "block_transactions";
BLOCK_SKIP => "block_skip";
PRECOMMITS => "precommits";
CONSENSUS_CONFIG => "consensus_config";
);
/// Location of a transaction in the blockchain: the height of the containing
/// block together with the transaction's position inside that block.
///
/// Converted to and from the `pb_blockchain::TxLocation` Protobuf message.
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Serialize, Deserialize)]
#[derive(ProtobufConvert, BinaryValue, ObjectHash)]
#[protobuf_convert(source = "pb_blockchain::TxLocation")]
pub struct TxLocation {
/// Height of the block that contains the transaction.
block_height: Height,
/// Index of the transaction within the block's transaction list.
position_in_block: u32,
}
impl TxLocation {
pub fn new(block_height: Height, position_in_block: u32) -> Self {
Self {
block_height,
position_in_block,
}
}
pub fn block_height(&self) -> Height {
self.block_height
}
pub fn position_in_block(&self) -> u32 {
self.position_in_block
}
}
/// Typed view over the core blockchain storage.
///
/// Wraps a storage access object and exposes the `core.*` indexes
/// (transactions, blocks, precommits, consensus configuration, ...) through
/// the methods implemented on this type.
#[derive(Debug, Clone, Copy)]
pub struct Schema<T> {
/// Storage access from which all indexes are obtained.
access: T,
}
impl<T: Access> Schema<T> {
#[doc(hidden)]
pub fn new(access: T) -> Self {
Self { access }
}
pub fn transactions(&self) -> MapIndex<T::Base, Hash, Verified<AnyTx>> {
self.access.get_map(TRANSACTIONS)
}
pub(crate) fn call_errors_map(
&self,
block_height: Height,
) -> ProofMapIndex<T::Base, CallInBlock, ExecutionError> {
self.access.get_proof_map((CALL_ERRORS, &block_height.0))
}
fn call_errors_aux(
&self,
block_height: Height,
) -> MapIndex<T::Base, CallInBlock, ExecutionErrorAux> {
self.access.get_map((CALL_ERRORS_AUX, &block_height.0))
}
pub fn call_records(&self, block_height: Height) -> Option<CallRecords<T>> {
self.block_hash_by_height(block_height)?;
Some(CallRecords {
height: block_height,
errors: self.call_errors_map(block_height),
errors_aux: self.call_errors_aux(block_height),
access: self.access.clone(),
})
}
pub fn transaction_result(&self, location: TxLocation) -> Option<Result<(), ExecutionError>> {
let records = self.call_records(location.block_height)?;
let txs_in_block = self.block_transactions(location.block_height).len();
if txs_in_block <= u64::from(location.position_in_block) {
return None;
}
let status = records.get(CallInBlock::transaction(location.position_in_block));
Some(status)
}
fn transactions_len_index(&self) -> Entry<T::Base, u64> {
self.access.get_entry(TRANSACTIONS_LEN)
}
pub fn transactions_len(&self) -> u64 {
let pool = self.transactions_len_index();
pool.get().unwrap_or(0)
}
pub fn transactions_pool(&self) -> KeySetIndex<T::Base, Hash> {
self.access.get_key_set(TRANSACTIONS_POOL)
}
#[doc(hidden)]
pub fn transactions_pool_len_index(&self) -> Entry<T::Base, u64> {
self.access.get_entry(TRANSACTIONS_POOL_LEN)
}
pub fn transactions_pool_len(&self) -> u64 {
let pool = self.transactions_pool_len_index();
pool.get().unwrap_or(0)
}
pub fn transactions_locations(&self) -> MapIndex<T::Base, Hash, TxLocation> {
self.access.get_map(TRANSACTIONS_LOCATIONS)
}
pub fn blocks(&self) -> MapIndex<T::Base, Hash, Block> {
self.access.get_map(BLOCKS)
}
pub fn block_hashes_by_height(&self) -> ListIndex<T::Base, Hash> {
self.access.get_list(BLOCK_HASHES_BY_HEIGHT)
}
pub fn block_transactions(&self, height: Height) -> ProofListIndex<T::Base, Hash> {
let height: u64 = height.into();
self.access.get_proof_list((BLOCK_TRANSACTIONS, &height))
}
fn block_skip_entry(&self) -> Entry<T::Base, Block> {
self.access.get_entry(BLOCK_SKIP)
}
pub fn block_skip(&self) -> Option<Block> {
self.block_skip_entry().get()
}
pub fn block_skip_and_precommits(&self) -> Option<BlockProof> {
let block = self.block_skip_entry().get()?;
let precommits = self.precommits(&block.object_hash()).iter().collect();
Some(BlockProof::new(block, precommits))
}
pub fn precommits(&self, hash: &Hash) -> ListIndex<T::Base, Verified<Precommit>> {
self.access.get_list((PRECOMMITS, hash))
}
#[doc(hidden)]
pub fn consensus_config_entry(&self) -> ProofEntry<T::Base, ConsensusConfig> {
self.access.get_proof_entry(CONSENSUS_CONFIG)
}
pub fn block_hash_by_height(&self, height: Height) -> Option<Hash> {
self.block_hashes_by_height().get(height.into())
}
pub fn block_and_precommits(&self, height: Height) -> Option<BlockProof> {
let block_hash = self.block_hash_by_height(height)?;
let block = self.blocks().get(&block_hash).unwrap();
let precommits = self.precommits(&block_hash).iter().collect();
Some(BlockProof::new(block, precommits))
}
pub fn last_block(&self) -> Block {
let hash = self
.block_hashes_by_height()
.last()
.expect("An attempt to get the `last_block` during creating the genesis block.");
self.blocks().get(&hash).unwrap()
}
pub fn height(&self) -> Height {
let len = self.block_hashes_by_height().len();
assert!(
len > 0,
"An attempt to get the actual `height` during creating the genesis block."
);
Height(len - 1)
}
pub fn next_height(&self) -> Height {
let len = self.block_hashes_by_height().len();
Height(len)
}
pub fn consensus_config(&self) -> ConsensusConfig {
self.consensus_config_entry()
.get()
.expect("Consensus configuration is absent")
}
pub fn validator_id(&self, service_public_key: PublicKey) -> Option<ValidatorId> {
self.consensus_config()
.find_validator(|validator_keys| service_public_key == validator_keys.service_key)
}
}
impl<T: Access> Schema<T>
where
    T::Base: RawAccessMut,
{
    /// Adds a transaction to the pool: records its hash in the pool set,
    /// increments the pool length counter and stores the transaction itself.
    ///
    /// The counter is incremented unconditionally; this method does not check
    /// whether the hash is already present in the pool.
    #[doc(hidden)]
    pub fn add_transaction_into_pool(&mut self, tx: Verified<AnyTx>) {
        // Compute the hash once and reuse it for both the pool and the
        // transactions map instead of hashing the message twice.
        let tx_hash = tx.object_hash();

        self.transactions_pool().insert(&tx_hash);
        let pool_len = self.transactions_pool_len_index().get().unwrap_or(0);
        self.transactions_pool_len_index().set(pool_len + 1);
        self.transactions().put(&tx_hash, tx);
    }

    /// Records a transaction as part of the block at `height`.
    ///
    /// The transaction is stored in the transactions map only if its hash is
    /// not there yet; the hash is always appended to the block's transaction
    /// list.
    pub(crate) fn commit_transaction(&mut self, hash: &Hash, height: Height, tx: Verified<AnyTx>) {
        if !self.transactions().contains(hash) {
            self.transactions().put(hash, tx);
        }
        self.block_transactions(height).push(*hash);
    }

    /// Updates the transaction counters using the transactions of the block
    /// at the current height.
    ///
    /// The total transaction counter grows by the number of transactions in
    /// the block. Every block transaction that is present in the pool is
    /// removed from it, and the pool length counter is decreased by the
    /// number of removed hashes.
    ///
    /// # Panics
    ///
    /// Panics if no block has been recorded yet (see `height`).
    pub(crate) fn update_transaction_count(&mut self) {
        let block_transactions = self.block_transactions(self.height());

        let mut len_index = self.transactions_len_index();
        let new_len = len_index.get().unwrap_or(0) + block_transactions.len();
        len_index.set(new_len);

        // Not every block transaction has to be in the pool, so only the
        // hashes actually found (and removed) there are counted. An explicit
        // loop keeps the mutation visible instead of hiding it in a `filter`
        // predicate, and counting in `u64` avoids a `usize` cast.
        let mut pool = self.transactions_pool();
        let mut removed_from_pool = 0_u64;
        for tx_hash in block_transactions.iter() {
            if pool.contains(&tx_hash) {
                pool.remove(&tx_hash);
                removed_from_pool += 1;
            }
        }

        // NOTE(review): this relies on the stored pool counter being at least
        // `removed_from_pool`; otherwise the subtraction underflows.
        let mut pool_len_index = self.transactions_pool_len_index();
        let new_pool_len = pool_len_index.get().unwrap_or(0) - removed_from_pool;
        pool_len_index.set(new_pool_len);
    }

    /// Saves an execution error for the specified call in the block at
    /// `height`.
    ///
    /// The auxiliary part is split off the error and stored in a separate
    /// map under the same key as the remaining error.
    pub(crate) fn save_error(
        &mut self,
        height: Height,
        call: CallInBlock,
        mut err: ExecutionError,
    ) {
        let aux = err.split_aux();
        self.call_errors_map(height).put(&call, err);
        self.call_errors_aux(height).put(&call, aux);
    }

    /// Removes the stored block skip, if any, and clears the precommits saved
    /// under its hash.
    pub(super) fn clear_block_skip(&mut self) {
        if let Some(block_skip) = self.block_skip_entry().take() {
            let block_hash = block_skip.object_hash();
            self.precommits(&block_hash).clear();
        }
    }

    /// Stores a new block skip, first clearing the previous one together with
    /// its precommits.
    pub(super) fn store_block_skip(&mut self, block_skip: Block) {
        self.clear_block_skip();
        self.block_skip_entry().set(block_skip);
    }
}
/// Execution errors recorded for the calls of a single block.
///
/// Obtained from `Schema::call_records`.
#[derive(Debug)]
pub struct CallRecords<T: Access> {
/// Height of the block the records belong to.
height: Height,
/// Errors of the block's calls, keyed by call location.
errors: ProofMapIndex<T::Base, CallInBlock, ExecutionError>,
/// Auxiliary error information, keyed by the same call locations as `errors`.
errors_aux: MapIndex<T::Base, CallInBlock, ExecutionErrorAux>,
/// Storage access, used to build the block proof in `get_proof`.
access: T,
}
impl<T: Access> CallRecords<T> {
    /// Iterates over the errors recorded for the block; each yielded error
    /// is recombined with its auxiliary information.
    pub fn errors(&self) -> CallErrorsIter<'_> {
        let errors_iter = self.errors.iter();
        let aux_iter = self.errors_aux.values();
        CallErrorsIter {
            errors_iter,
            aux_iter,
        }
    }

    /// Returns the result of the specified call: `Ok(())` if no error is
    /// recorded for it, otherwise the recorded error recombined with its
    /// auxiliary information.
    ///
    /// # Panics
    ///
    /// Panics if an error is recorded for the call but its auxiliary
    /// information is missing.
    pub fn get(&self, call: CallInBlock) -> Result<(), ExecutionError> {
        let mut error = match self.errors.get(&call) {
            Some(error) => error,
            None => return Ok(()),
        };

        let aux = self
            .errors_aux
            .get(&call)
            .expect("BUG: Aux info is not saved for an error");
        error.recombine_with_aux(aux);
        Err(error)
    }

    /// Builds a proof for the specified call, consisting of the block proof
    /// for the records' height, the proof from the errors map and the error
    /// description (if auxiliary information is stored for the call).
    ///
    /// # Panics
    ///
    /// Panics if the block with precommits cannot be loaded for the stored
    /// height.
    pub fn get_proof(&self, call: CallInBlock) -> CallProof {
        let schema = Schema::new(self.access.clone());
        let block_proof = schema.block_and_precommits(self.height).unwrap();

        let error_description = self.errors_aux.get(&call).map(|aux| aux.description);
        let call_proof = self.errors.get_proof(call);
        CallProof::new(block_proof, call_proof, error_description)
    }
}
/// Iterator over the errors recorded for a block, returned by
/// `CallRecords::errors`.
///
/// Advances the errors iterator and the auxiliary-information iterator in
/// lockstep.
#[derive(Debug)]
pub struct CallErrorsIter<'a> {
/// Iterator over `(call, error)` entries of the errors map.
errors_iter: Entries<'a, CallInBlock, ExecutionError>,
/// Iterator over the values of the auxiliary information map.
aux_iter: Values<'a, ExecutionErrorAux>,
}
impl Iterator for CallErrorsIter<'_> {
    type Item = (CallInBlock, ExecutionError);

    /// Yields the next `(call, error)` pair with the error recombined with
    /// the next value from the auxiliary iterator.
    ///
    /// # Panics
    ///
    /// Panics if the auxiliary iterator is exhausted before the errors
    /// iterator.
    fn next(&mut self) -> Option<Self::Item> {
        // NOTE(review): pairing by position assumes both maps hold the same
        // keys in the same order — `Schema::save_error` writes both under the
        // same key; confirm iteration order matches.
        self.errors_iter.next().map(|(call, mut error)| {
            let aux = self
                .aux_iter
                .next()
                .expect("BUG: Aux info is not saved for an error");
            error.recombine_with_aux(aux);
            (call, error)
        })
    }
}
/// Location of a call within a block.
///
/// Serialized by serde as an internally tagged value: the `type` field holds
/// the variant name in `snake_case`, alongside the variant's own field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Serialize, Deserialize, BinaryValue, ObjectHash)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum CallInBlock {
/// `before_transactions` call for a service.
BeforeTransactions {
/// ID of the service instance.
id: InstanceId,
},
/// Transaction within the block.
Transaction {
/// Zero-based index of the transaction in the block (displayed as `index + 1`).
index: u32,
},
/// `after_transactions` call for a service.
AfterTransactions {
/// ID of the service instance.
id: InstanceId,
},
}
impl ProtobufConvert for CallInBlock {
    type ProtoStruct = pb_blockchain::CallInBlock;

    /// Converts the location into its Protobuf message, setting the single
    /// field that corresponds to the variant.
    fn to_pb(&self) -> Self::ProtoStruct {
        let mut message = Self::ProtoStruct::new();
        match *self {
            Self::BeforeTransactions { id } => message.set_before_transactions(id),
            Self::Transaction { index } => message.set_transaction(index),
            Self::AfterTransactions { id } => message.set_after_transactions(id),
        }
        message
    }

    /// Restores the location from its Protobuf message.
    ///
    /// # Errors
    ///
    /// Returns an error if none of the `before_transactions`, `transaction`
    /// or `after_transactions` fields is set.
    fn from_pb(pb: Self::ProtoStruct) -> anyhow::Result<Self> {
        // The fields are probed in the same order as the variants are declared.
        if pb.has_before_transactions() {
            let id = pb.get_before_transactions();
            return Ok(Self::BeforeTransactions { id });
        }
        if pb.has_transaction() {
            let index = pb.get_transaction();
            return Ok(Self::Transaction { index });
        }
        if pb.has_after_transactions() {
            let id = pb.get_after_transactions();
            return Ok(Self::AfterTransactions { id });
        }
        Err(format_err!("Invalid location format"))
    }
}
impl CallInBlock {
pub fn before_transactions(id: InstanceId) -> Self {
Self::BeforeTransactions { id }
}
pub fn transaction(index: u32) -> Self {
Self::Transaction { index }
}
pub fn after_transactions(id: InstanceId) -> Self {
Self::AfterTransactions { id }
}
}
// `CallInBlock` is used as the key type of the call error maps in `Schema`.
// NOTE(review): judging by its name, this macro derives the key encoding from
// the type's `BinaryValue` implementation — confirm against the
// `exonum_merkledb` documentation.
impl_binary_key_for_binary_value!(CallInBlock);
impl fmt::Display for CallInBlock {
    /// Writes a human-readable description of the call location.
    ///
    /// Transactions are displayed with a 1-based ordinal, i.e. the
    /// transaction with index 0 is shown as `transaction #1`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::BeforeTransactions { id } => write!(
                formatter,
                "`before_transactions` for service with ID {}",
                id
            ),
            Self::Transaction { index } => {
                // Widen before adding one: `index + 1` on `u32` overflows for
                // `u32::MAX` (a panic in debug builds, `#0` in release builds).
                let ordinal = u64::from(*index) + 1;
                write!(formatter, "transaction #{}", ordinal)
            }
            Self::AfterTransactions { id } => {
                write!(formatter, "`after_transactions` for service with ID {}", id)
            }
        }
    }
}
/// Checks the JSON form of `CallInBlock`: the variant name goes into the
/// `type` field in `snake_case`, next to the variant's own field.
#[test]
fn location_json_serialization() {
    use pretty_assertions::assert_eq;
    use serde_json::json;

    let cases = vec![
        (
            CallInBlock::transaction(1),
            json!({ "type": "transaction", "index": 1 }),
        ),
        (
            CallInBlock::after_transactions(1_000),
            json!({ "type": "after_transactions", "id": 1_000 }),
        ),
    ];

    for (location, expected) in cases {
        assert_eq!(serde_json::to_value(location).unwrap(), expected);
    }
}