#![deny(
missing_debug_implementations,
missing_docs,
unsafe_code,
bare_trait_objects
)]
extern crate actix_web;
#[cfg_attr(test, macro_use)]
#[cfg(test)]
extern crate assert_matches;
extern crate exonum;
#[macro_use]
extern crate failure;
extern crate futures;
#[macro_use]
extern crate log;
extern crate reqwest;
extern crate serde;
#[macro_use]
extern crate serde_derive;
#[cfg_attr(test, macro_use)]
#[cfg(test)]
extern crate exonum_derive;
extern crate protobuf;
extern crate serde_json;
extern crate serde_urlencoded;
extern crate tokio_core;
pub use api::{ApiKind, TestKitApi};
pub use compare::ComparableSnapshot;
pub use network::{TestNetwork, TestNetworkConfiguration, TestNode};
pub mod compare;
pub mod proto;
use futures::{sync::mpsc, Future, Stream};
use tokio_core::reactor::Core;
use std::sync::{Arc, RwLock};
use std::{fmt, net::SocketAddr};
use exonum::{
api::{
backends::actix::{ApiRuntimeConfig, SystemRuntimeConfig},
ApiAccess,
},
blockchain::{Blockchain, Schema as CoreSchema, Service, StoredConfiguration},
crypto::{self, Hash},
explorer::{BlockWithTransactions, BlockchainExplorer},
helpers::{Height, ValidatorId},
messages::{RawTransaction, Signed},
node::{ApiSender, ExternalMessage, State as NodeState},
storage::{MemoryDB, Patch, Snapshot},
};
use checkpoint_db::{CheckpointDb, CheckpointDbHandler};
use poll_events::poll_events;
#[macro_use]
mod macros;
mod api;
mod checkpoint_db;
mod network;
mod poll_events;
mod server;
pub struct TestKitBuilder {
our_validator_id: Option<ValidatorId>,
validator_count: Option<u16>,
services: Vec<Box<dyn Service>>,
logger: bool,
}
impl fmt::Debug for TestKitBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("TestKitBuilder")
.field(
"us",
&self
.our_validator_id
.map_or("Auditor".to_string(), |id| format!("Validator #{}", id.0)),
)
.field("validator_count", &self.validator_count)
.field(
"services",
&self
.services
.iter()
.map(|x| x.service_name())
.collect::<Vec<_>>(),
)
.field("logger", &self.logger)
.finish()
}
}
impl TestKitBuilder {
pub fn validator() -> Self {
TestKitBuilder {
validator_count: None,
our_validator_id: Some(ValidatorId(0)),
services: Vec::new(),
logger: false,
}
}
pub fn auditor() -> Self {
TestKitBuilder {
validator_count: None,
our_validator_id: None,
services: Vec::new(),
logger: false,
}
}
pub fn with_validators(mut self, validator_count: u16) -> Self {
assert!(
self.validator_count.is_none(),
"Number of validators is already specified"
);
self.validator_count = Some(validator_count);
self
}
pub fn with_service<S>(mut self, service: S) -> Self
where
S: Into<Box<dyn Service>>,
{
self.services.push(service.into());
self
}
pub fn with_logger(mut self) -> Self {
self.logger = true;
self
}
pub fn create(self) -> TestKit {
if self.logger {
exonum::helpers::init_logger().ok();
}
crypto::init();
TestKit::assemble(
self.services,
TestNetwork::with_our_role(self.our_validator_id, self.validator_count.unwrap_or(1)),
)
}
pub fn serve(self, public_api_address: SocketAddr, private_api_address: SocketAddr) {
let testkit = self.create();
testkit.run(public_api_address, private_api_address);
}
}
pub struct TestKit {
blockchain: Blockchain,
db_handler: CheckpointDbHandler<MemoryDB>,
events_stream: Box<dyn Stream<Item = (), Error = ()> + Send + Sync>,
network: TestNetwork,
api_sender: ApiSender,
cfg_proposal: Option<ConfigurationProposalState>,
}
impl fmt::Debug for TestKit {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("TestKit")
.field("blockchain", &self.blockchain)
.field("network", &self.network)
.field("cfg_change_proposal", &self.cfg_proposal)
.finish()
}
}
impl TestKit {
pub fn for_service<S>(service: S) -> Self
where
S: Into<Box<dyn Service>>,
{
TestKitBuilder::validator().with_service(service).create()
}
fn assemble(services: Vec<Box<dyn Service>>, network: TestNetwork) -> Self {
let api_channel = mpsc::channel(1_000);
let api_sender = ApiSender::new(api_channel.0.clone());
let db = CheckpointDb::new(MemoryDB::new());
let db_handler = db.handler();
let mut blockchain = Blockchain::new(
db,
services,
*network.us().service_keypair().0,
network.us().service_keypair().1.clone(),
api_sender.clone(),
);
let genesis = network.genesis_config();
blockchain.initialize(genesis.clone()).unwrap();
let events_stream: Box<dyn Stream<Item = (), Error = ()> + Send + Sync> = {
let mut blockchain = blockchain.clone();
Box::new(api_channel.1.and_then(move |event| {
let mut fork = blockchain.fork();
{
let mut schema = CoreSchema::new(&mut fork);
match event {
ExternalMessage::Transaction(tx) => {
let hash = tx.hash();
if !schema.transactions().contains(&hash) {
schema.add_transaction_into_pool(tx.clone());
}
}
ExternalMessage::PeerAdd(_)
| ExternalMessage::Enable(_)
| ExternalMessage::Rebroadcast
| ExternalMessage::Shutdown => { }
}
}
blockchain.merge(fork.into_patch()).unwrap();
Ok(())
}))
};
TestKit {
blockchain,
db_handler,
api_sender,
events_stream,
network,
cfg_proposal: None,
}
}
pub fn api(&self) -> TestKitApi {
TestKitApi::new(self)
}
pub fn poll_events(&mut self) {
poll_events(&mut self.events_stream);
}
pub fn snapshot(&self) -> Box<dyn Snapshot> {
self.blockchain.snapshot()
}
pub fn blockchain(&self) -> &Blockchain {
&self.blockchain
}
pub fn blockchain_mut(&mut self) -> &mut Blockchain {
&mut self.blockchain
}
pub fn checkpoint(&mut self) {
self.db_handler.checkpoint()
}
pub fn rollback(&mut self) {
self.db_handler.rollback()
}
pub fn probe_all<I>(&mut self, transactions: I) -> Box<dyn Snapshot>
where
I: IntoIterator<Item = Signed<RawTransaction>>,
{
self.poll_events();
let schema = CoreSchema::new(self.snapshot());
let uncommitted_txs = transactions.into_iter().filter(|tx| {
!schema.transactions().contains(&tx.hash())
|| schema.transactions_pool().contains(&tx.hash())
});
self.checkpoint();
self.create_block_with_transactions(uncommitted_txs);
let snapshot = self.snapshot();
self.rollback();
snapshot
}
pub fn probe(&mut self, transaction: Signed<RawTransaction>) -> Box<dyn Snapshot> {
self.probe_all(vec![transaction])
}
fn do_create_block(&mut self, tx_hashes: &[crypto::Hash]) -> BlockWithTransactions {
let new_block_height = self.height().next();
let last_hash = self.last_block_hash();
let config_patch = self.update_configuration(new_block_height);
let (block_hash, patch) = {
let validator_id = self.leader().validator_id().unwrap();
self.blockchain
.create_patch(validator_id, new_block_height, tx_hashes)
};
let patch = if let Some(config_patch) = config_patch {
let mut fork = self.blockchain.fork();
fork.merge(config_patch);
fork.merge(patch);
fork.into_patch()
} else {
patch
};
let propose = self
.leader()
.create_propose(new_block_height, &last_hash, tx_hashes);
let precommits: Vec<_> = self
.network()
.validators()
.iter()
.map(|v| v.create_precommit(&propose, &block_hash))
.collect();
self.blockchain
.commit(&patch, block_hash, precommits.into_iter())
.unwrap();
self.poll_events();
BlockchainExplorer::new(&self.blockchain)
.block_with_txs(self.height())
.unwrap()
}
fn update_configuration(&mut self, new_block_height: Height) -> Option<Patch> {
use ConfigurationProposalState::*;
let actual_from = new_block_height.next();
if let Some(cfg_proposal) = self.cfg_proposal.take() {
match cfg_proposal {
Uncommitted(cfg_proposal) => {
let stored = cfg_proposal.stored_configuration().clone();
let mut fork = self.blockchain.fork();
CoreSchema::new(&mut fork).commit_configuration(stored);
self.cfg_proposal = Some(Committed(cfg_proposal));
return Some(fork.into_patch());
}
Committed(cfg_proposal) => {
if cfg_proposal.actual_from() == actual_from {
self.network_mut().update_configuration(cfg_proposal);
} else {
self.cfg_proposal = Some(Committed(cfg_proposal));
}
}
}
}
None
}
pub fn next_configuration(&self) -> Option<&TestNetworkConfiguration> {
use ConfigurationProposalState::*;
self.cfg_proposal.as_ref().map(|p| match *p {
Committed(ref proposal) | Uncommitted(ref proposal) => proposal,
})
}
pub fn create_block_with_transactions<I>(&mut self, txs: I) -> BlockWithTransactions
where
I: IntoIterator<Item = Signed<RawTransaction>>,
{
let tx_hashes: Vec<_> = {
let blockchain = self.blockchain_mut();
let mut fork = blockchain.fork();
let hashes = {
let mut schema = CoreSchema::new(&mut fork);
txs.into_iter()
.map(|tx| {
let tx_id = tx.hash();
let tx_not_found = !schema.transactions().contains(&tx_id);
let tx_in_pool = schema.transactions_pool().contains(&tx_id);
assert!(
tx_not_found || tx_in_pool,
"Transaction is already committed: {:?}",
tx
);
if tx_not_found {
schema.add_transaction_into_pool(tx.clone());
}
tx_id
})
.collect()
};
blockchain.merge(fork.into_patch()).unwrap();
hashes
};
self.create_block_with_tx_hashes(&tx_hashes)
}
pub fn create_block_with_transaction(
&mut self,
tx: Signed<RawTransaction>,
) -> BlockWithTransactions {
self.create_block_with_transactions(txvec![tx])
}
pub fn create_block_with_tx_hashes(
&mut self,
tx_hashes: &[crypto::Hash],
) -> BlockWithTransactions {
self.poll_events();
{
let snapshot = self.blockchain.snapshot();
let schema = CoreSchema::new(&snapshot);
for hash in tx_hashes {
assert!(schema.transactions_pool().contains(hash));
}
}
self.do_create_block(tx_hashes)
}
pub fn create_block(&mut self) -> BlockWithTransactions {
self.poll_events();
let snapshot = self.blockchain.snapshot();
let schema = CoreSchema::new(&snapshot);
let txs = schema.transactions_pool();
let tx_hashes: Vec<_> = txs.iter().collect();
{
let blockchain = self.blockchain_mut();
let fork = blockchain.fork();
blockchain.merge(fork.into_patch()).unwrap();
}
self.do_create_block(&tx_hashes)
}
pub fn add_tx(&mut self, transaction: Signed<RawTransaction>) {
let mut fork = self.blockchain.fork();
let mut schema = CoreSchema::new(&mut fork);
schema.add_transaction_into_pool(transaction)
}
pub fn is_tx_in_pool(&self, tx_hash: &Hash) -> bool {
let snapshot = self.blockchain.snapshot();
let schema = CoreSchema::new(&snapshot);
schema.transactions_pool().contains(tx_hash)
}
pub fn create_blocks_until(&mut self, height: Height) {
while self.height() < height {
self.create_block();
}
}
pub fn last_block_hash(&self) -> crypto::Hash {
self.blockchain.last_hash()
}
pub fn height(&self) -> Height {
self.blockchain.last_block().height()
}
pub fn explorer(&self) -> BlockchainExplorer {
BlockchainExplorer::new(&self.blockchain)
}
pub fn actual_configuration(&self) -> StoredConfiguration {
CoreSchema::new(&self.snapshot()).actual_configuration()
}
pub fn validator(&self, id: ValidatorId) -> &TestNode {
&self.network.validators()[id.0 as usize]
}
pub fn majority_count(&self) -> usize {
NodeState::byzantine_majority_count(self.network().validators().len())
}
pub fn leader(&self) -> &TestNode {
&self.network().validators()[0]
}
pub fn network(&self) -> &TestNetwork {
&self.network
}
pub fn network_mut(&mut self) -> &mut TestNetwork {
&mut self.network
}
pub fn configuration_change_proposal(&self) -> TestNetworkConfiguration {
let stored_configuration = CoreSchema::new(&self.snapshot()).actual_configuration();
TestNetworkConfiguration::new(self.network(), stored_configuration)
}
pub fn commit_configuration_change(&mut self, proposal: TestNetworkConfiguration) {
use self::ConfigurationProposalState::*;
assert!(
self.height() < proposal.actual_from(),
"The `actual_from` height should be greater than the current."
);
assert!(
self.cfg_proposal.is_none(),
"There is an active configuration change proposal."
);
self.cfg_proposal = Some(Uncommitted(proposal));
}
fn run(mut self, public_api_address: SocketAddr, private_api_address: SocketAddr) {
let events_stream = self.remove_events_stream();
let testkit_ref = Arc::new(RwLock::new(self));
let system_runtime_config = SystemRuntimeConfig {
api_runtimes: vec![
ApiRuntimeConfig::new(public_api_address, ApiAccess::Public),
ApiRuntimeConfig::new(private_api_address, ApiAccess::Private),
],
api_aggregator: server::create_testkit_api_aggregator(&testkit_ref),
};
let system_runtime = system_runtime_config.start().unwrap();
let mut core = Core::new().unwrap();
core.run(events_stream).unwrap();
system_runtime.stop().unwrap();
}
fn remove_events_stream(&mut self) -> Box<dyn Future<Item = (), Error = ()>> {
let stream = std::mem::replace(&mut self.events_stream, Box::new(futures::stream::empty()));
Box::new(stream.for_each(|_| Ok(())))
}
pub fn us(&self) -> &TestNode {
self.network().us()
}
}
#[derive(Debug)]
enum ConfigurationProposalState {
Uncommitted(TestNetworkConfiguration),
Committed(TestNetworkConfiguration),
}
#[test]
fn test_create_block_heights() {
let mut testkit = TestKitBuilder::validator().create();
assert_eq!(Height(0), testkit.height());
testkit.create_block();
assert_eq!(Height(1), testkit.height());
testkit.create_blocks_until(Height(6));
assert_eq!(Height(6), testkit.height());
}
#[test]
fn test_number_of_validators_in_builder() {
let testkit = TestKitBuilder::auditor().create();
assert_eq!(testkit.network().validators().len(), 1);
assert_ne!(testkit.validator(ValidatorId(0)), testkit.us());
let testkit = TestKitBuilder::validator().create();
assert_eq!(testkit.network().validators().len(), 1);
assert_eq!(testkit.validator(ValidatorId(0)), testkit.us());
let testkit = TestKitBuilder::auditor().with_validators(3).create();
assert_eq!(testkit.network().validators().len(), 3);
let us = testkit.us();
assert!(!testkit.network().validators().iter().any(|v| v == us));
let testkit = TestKitBuilder::validator().with_validators(5).create();
assert_eq!(testkit.network().validators().len(), 5);
assert_eq!(testkit.validator(ValidatorId(0)), testkit.us());
}
#[test]
#[should_panic(expected = "validator should be present")]
fn test_zero_validators_in_builder() {
let testkit = TestKitBuilder::auditor().with_validators(0).create();
drop(testkit);
}
#[test]
#[should_panic(expected = "Number of validators is already specified")]
fn test_multiple_spec_of_validators_in_builder() {
let testkit = TestKitBuilder::auditor()
.with_validators(5)
.with_validators(2)
.create();
drop(testkit);
}