#![warn(
missing_debug_implementations,
missing_docs,
unsafe_code,
bare_trait_objects
)]
#![warn(clippy::pedantic, clippy::nursery)]
#![allow(
// Next `cast_*` lints don't give alternatives.
clippy::cast_possible_wrap, clippy::cast_possible_truncation, clippy::cast_sign_loss,
// Next lints produce too much noise/false positives.
clippy::module_name_repetitions, clippy::similar_names, clippy::must_use_candidate,
clippy::pub_enum_variant_names,
// '... may panic' lints.
clippy::indexing_slicing,
// Too much work to fix.
clippy::missing_errors_doc, clippy::missing_const_for_fn
)]
pub use crate::{
api::{ApiKind, RequestBuilder, TestKitApi, TestKitApiClient},
builder::TestKitBuilder,
network::{TestNetwork, TestNode},
};
pub use exonum_explorer as explorer;
pub use exonum_rust_runtime::spec::Spec;
use exonum::{
blockchain::{
config::GenesisConfig, ApiSender, BlockParams, Blockchain, BlockchainBuilder,
BlockchainMut, ConsensusConfig,
},
crypto::{self, Hash},
helpers::{byzantine_quorum, Height, ValidatorId},
merkledb::{BinaryValue, Database, ObjectHash, Snapshot, TemporaryDB},
messages::{AnyTx, Verified},
runtime::{InstanceId, RuntimeInstance, SnapshotExt},
};
use exonum_api::{
ApiAccess, ApiAggregator, ApiManager, ApiManagerConfig, UpdateEndpoints, WebServerConfig,
};
use exonum_explorer::{BlockWithTransactions, BlockchainExplorer};
use exonum_rust_runtime::{RustRuntimeBuilder, ServiceFactory};
use futures::{
channel::mpsc,
prelude::*,
stream::{self, BoxStream},
StreamExt,
};
#[cfg(feature = "exonum-node")]
use exonum_node::{ExternalMessage, NodePlugin, PluginApiContext, SharedNodeState};
use std::{
collections::HashMap,
fmt, iter, mem,
net::SocketAddr,
sync::{Arc, Mutex},
};
use crate::{
checkpoint_db::{CheckpointDb, CheckpointDbHandler},
server::TestKitActor,
};
mod api;
mod builder;
mod checkpoint_db;
pub mod migrations;
mod network;
pub mod server;
/// Channel used to notify the testkit about updates of the set of HTTP API endpoints.
/// The sender half is handed out to runtimes (see `StoppedTestKit::resume_with_runtimes`);
/// the receiver is drained by `TestKit::update_aggregator`.
type ApiNotifierChannel = (
    mpsc::Sender<UpdateEndpoints>,
    mpsc::Receiver<UpdateEndpoints>,
);
/// Testkit emulating a blockchain network with a fully controllable
/// block creation process.
pub struct TestKit {
    // Mutable blockchain instance the testkit operates on.
    blockchain: BlockchainMut,
    // Handle to the checkpointed DB; enables `checkpoint` / `rollback` / `stop`.
    db_handler: CheckpointDbHandler<TemporaryDB>,
    // Stream that validates and pools transactions sent via `api_sender`;
    // drained by `poll_events`.
    events_stream: BoxStream<'static, ()>,
    // Lock synchronizing transaction intake with block creation.
    processing_lock: Arc<Mutex<()>>,
    // Emulated network of test nodes (validators and, possibly, an auditor).
    network: TestNetwork,
    // Sender used to push transactions into `events_stream`.
    api_sender: ApiSender,
    // Channel with HTTP endpoint updates; receiver drained in `update_aggregator`.
    api_notifier_channel: ApiNotifierChannel,
    // Current aggregate of API endpoints exposed via `TestKitApi`.
    api_aggregator: ApiAggregator,
    // Node plugins whose APIs are wired into the aggregator.
    #[cfg(feature = "exonum-node")]
    plugins: Vec<Box<dyn NodePlugin>>,
    // Channel collecting `ExternalMessage`s emitted by plugin APIs;
    // drained via `poll_control_messages`.
    #[cfg(feature = "exonum-node")]
    control_channel: (
        mpsc::Sender<ExternalMessage>,
        mpsc::Receiver<ExternalMessage>,
    ),
}
impl fmt::Debug for TestKit {
    /// Formats a compact representation of the testkit, showing only the fields
    /// that implement `Debug` (streams, channels and locks are skipped).
    // Uses the `fmt::Result` alias for consistency with the `Debug` impl
    // for `StoppedTestKit` below (it is the same type as `Result<(), fmt::Error>`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TestKit")
            .field("blockchain", &self.blockchain)
            .field("network", &self.network)
            .finish()
    }
}
impl TestKit {
    /// Creates a testkit with a single validator running the given Rust service,
    /// instantiated with the provided name, instance ID and constructor argument.
    pub fn for_rust_service(
        service_factory: impl ServiceFactory,
        name: impl Into<String>,
        id: InstanceId,
        constructor: impl BinaryValue,
    ) -> Self {
        let spec = Spec::new(service_factory).with_instance(id, name, constructor);
        TestKitBuilder::validator().with(spec).build()
    }

    /// Assembles a testkit from its parts: wires the blockchain to the database,
    /// installs the runtimes and sets up the transaction intake stream.
    fn assemble(
        database: impl Into<CheckpointDb<TemporaryDB>>,
        network: TestNetwork,
        genesis_config: Option<GenesisConfig>,
        runtimes: Vec<RuntimeInstance>,
        api_notifier_channel: ApiNotifierChannel,
    ) -> Self {
        let api_channel = mpsc::channel(1_000);
        let api_sender = ApiSender::new(api_channel.0.clone());

        let db = database.into();
        // Keep a handler so the DB can be checkpointed / rolled back / extracted later.
        let db_handler = db.handler();
        let db = Arc::new(db);
        let blockchain = Blockchain::new(
            Arc::clone(&db) as Arc<dyn Database>,
            network.us().service_keypair(),
            api_sender.clone(),
        );

        let mut builder = BlockchainBuilder::new(blockchain);
        // The genesis config is absent when resuming a stopped testkit.
        if let Some(genesis_config) = genesis_config {
            builder = builder.with_genesis_config(genesis_config);
        }
        for runtime in runtimes {
            builder = builder.with_runtime(runtime);
        }
        let blockchain = builder.build();

        let processing_lock = Arc::new(Mutex::new(()));
        let processing_lock_ = Arc::clone(&processing_lock);
        // Each transaction sent via `api_sender` is checked against the current
        // state and, if correct, placed into the persistent pool. The lock
        // prevents this from racing with block creation in `do_create_block`.
        let events_stream = api_channel.1.map(move |transaction| {
            let _guard = processing_lock_.lock().unwrap();
            let snapshot = db.snapshot();
            if let Err(error) = Blockchain::check_tx(&snapshot, &transaction) {
                log::warn!(
                    "Did not add transaction {:?} to pool because it is incorrect. {}",
                    transaction.payload(),
                    error
                );
            } else {
                BlockchainMut::add_transactions_into_db_pool(db.as_ref(), iter::once(transaction));
            }
        });

        Self {
            blockchain,
            db_handler,
            api_sender,
            events_stream: events_stream.boxed(),
            processing_lock,
            network,
            api_notifier_channel,
            api_aggregator: ApiAggregator::new(),
            #[cfg(feature = "exonum-node")]
            plugins: vec![],
            #[cfg(feature = "exonum-node")]
            control_channel: mpsc::channel(100),
        }
    }

    /// Installs node plugins and rebuilds the API aggregator to include their
    /// endpoints. Intended to be called at most once (no plugins set yet).
    #[cfg(feature = "exonum-node")]
    pub(crate) fn set_plugins(&mut self, plugins: Vec<Box<dyn NodePlugin>>) {
        debug_assert!(self.plugins.is_empty());
        self.plugins = plugins;
        self.api_aggregator = self.create_api_aggregator();
    }

    /// Builds an API aggregator containing the endpoints wired by the node plugins.
    #[cfg(feature = "exonum-node")]
    fn create_api_aggregator(&self) -> ApiAggregator {
        let mut aggregator = ApiAggregator::new();
        let node_state = SharedNodeState::new(10_000);
        let plugin_api_context = PluginApiContext::new(
            self.blockchain.as_ref(),
            &node_state,
            // Messages sent by plugin APIs end up in `control_channel`.
            ApiSender::new(self.control_channel.0.clone()),
        );
        for plugin in &self.plugins {
            aggregator.extend(plugin.wire_api(plugin_api_context.clone()));
        }
        aggregator
    }

    /// Builds an empty API aggregator (plugins are unsupported without the
    /// `exonum-node` feature).
    #[cfg(not(feature = "exonum-node"))]
    fn create_api_aggregator(&self) -> ApiAggregator {
        ApiAggregator::new()
    }

    /// Drains and returns all control messages sent by plugin APIs so far.
    /// Non-blocking; returns an empty vector if no messages are pending.
    #[cfg(feature = "exonum-node")]
    pub fn poll_control_messages(&mut self) -> Vec<ExternalMessage> {
        let mut buffer = vec![];
        while let Some(message) = self.control_channel.1.next().now_or_never().flatten() {
            buffer.push(message);
        }
        buffer
    }

    /// Creates an API handle for testing HTTP endpoints of the testkit node.
    pub fn api(&mut self) -> TestKitApi {
        TestKitApi::new(self)
    }

    /// Drains pending endpoint updates and, if any arrived, rebuilds the API
    /// aggregator from the most recent one. Returns a clone of the current aggregator.
    fn update_aggregator(&mut self) -> ApiAggregator {
        let mut maybe_update = None;
        // Only the latest update matters; drain the channel completely.
        while let Some(update) = self.api_notifier_channel.1.next().now_or_never().flatten() {
            maybe_update = Some(update);
        }
        if let Some(update) = maybe_update {
            let mut aggregator = self.create_api_aggregator();
            aggregator.extend(update.into_endpoints());
            self.api_aggregator = aggregator;
        }
        self.api_aggregator.clone()
    }

    /// Processes all transactions sent via `api_sender` but not yet handled,
    /// moving the correct ones into the transaction pool. Non-blocking.
    pub fn poll_events(&mut self) {
        // `now_or_never` turns the async stream poll into a synchronous drain.
        while let Some(()) = self.events_stream.next().now_or_never().flatten() {
        }
    }

    /// Returns a snapshot of the current blockchain state.
    pub fn snapshot(&self) -> Box<dyn Snapshot> {
        self.blockchain.snapshot()
    }

    /// Returns an owned immutable handle to the underlying blockchain.
    pub fn blockchain(&self) -> Blockchain {
        self.blockchain.as_ref().to_owned()
    }

    /// Saves a checkpoint of the database that `rollback` can restore later.
    pub fn checkpoint(&mut self) {
        self.db_handler.checkpoint()
    }

    /// Rolls the database back to the last checkpoint saved with `checkpoint`.
    pub fn rollback(&mut self) {
        self.db_handler.rollback()
    }

    /// Creates and commits a block with the transactions identified by `tx_hashes`,
    /// emulating consensus by collecting precommits from every validator. Updates
    /// the emulated network if the block changed the consensus config, notifies
    /// plugins, and returns the committed block together with its transactions.
    fn do_create_block(&mut self, tx_hashes: &[Hash]) -> BlockWithTransactions {
        let new_block_height = self.height().next();
        let saved_consensus_config = self.consensus_config();
        // NOTE(review): assumes the leader node has a validator ID;
        // the `unwrap` would panic otherwise — confirm callers guarantee this.
        let validator_id = self.leader().validator_id().unwrap();

        // Block transaction intake while the block is created and committed.
        let guard = self.processing_lock.lock().unwrap();
        let block_params = BlockParams::new(validator_id, new_block_height, tx_hashes);
        let patch = self.blockchain.create_patch(block_params, &());
        let block_hash = patch.block_hash();

        // Emulated consensus: every validator precommits to the new block.
        let precommits: Vec<_> = self
            .network()
            .validators()
            .iter()
            .map(|validator| validator.create_precommit(new_block_height, block_hash))
            .collect();
        self.blockchain
            .commit(patch, precommits.into_iter())
            .unwrap();
        drop(guard);

        // Propagate a consensus config change (if any) to the emulated network.
        let actual_consensus_config = self.consensus_config();
        if actual_consensus_config != saved_consensus_config {
            self.network_mut()
                .update_consensus_config(&actual_consensus_config);
        }

        self.poll_events();
        let snapshot = self.snapshot();
        #[cfg(feature = "exonum-node")]
        for plugin in &self.plugins {
            plugin.after_commit(&snapshot);
        }
        BlockchainExplorer::new(&snapshot)
            .block_with_txs(self.height())
            .unwrap()
    }

    /// Creates a block containing exactly the given transactions. Transactions
    /// not yet known to the node are added to the pool first.
    ///
    /// # Panics
    ///
    /// Panics if any of the transactions is already committed to the blockchain.
    pub fn create_block_with_transactions<I>(&mut self, txs: I) -> BlockWithTransactions
    where
        I: IntoIterator<Item = Verified<AnyTx>>,
    {
        let snapshot = self.snapshot();
        let schema = snapshot.for_core();
        let mut unknown_transactions = vec![];
        let tx_hashes: Vec<_> = txs
            .into_iter()
            .map(|tx| {
                let tx_id = tx.object_hash();
                // A transaction present in `transactions` but not in the pool
                // has already been committed.
                let tx_not_found = !schema.transactions().contains(&tx_id);
                let tx_in_pool = schema.transactions_pool().contains(&tx_id);
                assert!(
                    tx_not_found || tx_in_pool,
                    "Transaction is already committed: {:?}",
                    tx
                );
                if tx_not_found {
                    unknown_transactions.push(tx);
                }
                tx_id
            })
            .collect();
        self.blockchain
            .add_transactions_into_pool(unknown_transactions);
        self.create_block_with_tx_hashes(&tx_hashes)
    }

    /// Creates a block containing a single transaction.
    ///
    /// # Panics
    ///
    /// Panics if the transaction is already committed to the blockchain.
    pub fn create_block_with_transaction(&mut self, tx: Verified<AnyTx>) -> BlockWithTransactions {
        self.create_block_with_transactions(iter::once(tx))
    }

    /// Creates a block containing the pooled transactions with the given hashes.
    ///
    /// # Panics
    ///
    /// Panics if any hash is absent from the pool or refers to a transaction
    /// that fails the correctness check against the current state.
    pub fn create_block_with_tx_hashes(
        &mut self,
        tx_hashes: &[crypto::Hash],
    ) -> BlockWithTransactions {
        self.poll_events();
        let snapshot = self.blockchain.snapshot();
        let schema = snapshot.for_core();
        for hash in tx_hashes {
            assert!(
                schema.transactions_pool().contains(hash),
                "Transaction with hash {:?} is not found in the transaction pool",
                hash
            );
            let transaction = schema
                .transactions()
                .get(hash)
                .expect("Transaction is saved in pool, but not in the `transactions` map");
            // Re-check validity: pooled transactions may have become incorrect
            // due to state changes since they were added.
            if let Err(error) = Blockchain::check_tx(&snapshot, &transaction) {
                panic!(
                    "Cannot create block with incorrect transaction (hash = {:?}): {}",
                    hash, error
                );
            }
        }
        self.do_create_block(tx_hashes)
    }

    /// Creates a block from all correct transactions currently in the pool.
    /// Incorrect transactions are skipped with a logged warning.
    pub fn create_block(&mut self) -> BlockWithTransactions {
        self.poll_events();
        let snapshot = self.snapshot();
        let core_schema = snapshot.for_core();
        let transactions = core_schema.transactions();

        // Filter predicate: keep only transactions that still pass the check.
        let filter_transactions = |hash: &Hash| {
            let transaction = transactions
                .get(hash)
                .expect("Transaction is saved in pool, but not in the `transactions` map");
            if let Err(error) = Blockchain::check_tx(&snapshot, &transaction) {
                log::warn!(
                    "Skipped transaction with hash = {:?} when creating a block \
                    because the transaction is incorrect: {}",
                    hash,
                    error
                );
                false
            } else {
                true
            }
        };
        let tx_hashes: Vec<_> = core_schema
            .transactions_pool()
            .iter()
            .filter(filter_transactions)
            .collect();
        self.do_create_block(&tx_hashes)
    }

    /// Adds a transaction to the pool.
    ///
    /// # Panics
    ///
    /// Panics if the transaction fails the correctness check.
    pub fn add_tx(&mut self, transaction: Verified<AnyTx>) {
        if let Err(error) = Blockchain::check_tx(&self.blockchain.snapshot(), &transaction) {
            panic!(
                "Attempt to add incorrect transaction in the pool: {}",
                error
            );
        }
        self.blockchain
            .add_transactions_into_pool(iter::once(transaction));
    }

    /// Checks whether a transaction with the given hash is in the pool.
    pub fn is_tx_in_pool(&self, tx_hash: &Hash) -> bool {
        self.snapshot()
            .for_core()
            .transactions_pool()
            .contains(tx_hash)
    }

    /// Creates blocks until the blockchain reaches the specified height.
    /// Does nothing if the height is already reached.
    pub fn create_blocks_until(&mut self, height: Height) {
        while self.height() < height {
            self.create_block();
        }
    }

    /// Returns the hash of the latest committed block.
    pub fn last_block_hash(&self) -> crypto::Hash {
        self.blockchain.as_ref().last_hash()
    }

    /// Returns the height of the latest committed block.
    pub fn height(&self) -> Height {
        self.blockchain.as_ref().last_block().height
    }

    /// Returns the consensus configuration from the current blockchain state.
    pub fn consensus_config(&self) -> ConsensusConfig {
        self.snapshot().for_core().consensus_config()
    }

    /// Returns the validator node with the given ID.
    ///
    /// # Panics
    ///
    /// Panics if the ID is out of bounds of the validator list.
    pub fn validator(&self, id: ValidatorId) -> TestNode {
        self.network.validators()[id.0 as usize].clone()
    }

    /// Returns the number of validators constituting a Byzantine quorum
    /// for the current network size.
    pub fn majority_count(&self) -> usize {
        byzantine_quorum(self.network().validators().len())
    }

    /// Returns the node used as the block proposer (the first validator).
    pub fn leader(&self) -> TestNode {
        self.network().validators()[0].clone()
    }

    /// Returns a reference to the emulated test network.
    pub fn network(&self) -> &TestNetwork {
        &self.network
    }

    /// Returns a mutable reference to the emulated test network.
    pub fn network_mut(&mut self) -> &mut TestNetwork {
        &mut self.network
    }

    /// Runs the testkit as an HTTP server with public and private APIs at the
    /// given addresses. Resolves when the events task, the actor task, or the
    /// API manager terminates, whichever happens first.
    #[allow(clippy::mut_mut)]
    async fn run(mut self, public_api_address: SocketAddr, private_api_address: SocketAddr) {
        let events_task = self.remove_events_stream().fuse();
        futures::pin_mut!(events_task);
        // Hand the endpoint receiver over to the API manager, replacing it with
        // a dummy channel so `self` can be moved into the actor below.
        let endpoints_rx = mem::replace(&mut self.api_notifier_channel.1, mpsc::channel(0).1);
        let (api_aggregator, actor_task) = TestKitActor::spawn(self).await;
        let mut actor_task = actor_task.fuse();

        let mut servers = HashMap::new();
        servers.insert(ApiAccess::Public, WebServerConfig::new(public_api_address));
        servers.insert(
            ApiAccess::Private,
            WebServerConfig::new(private_api_address),
        );
        let api_manager_config = ApiManagerConfig::new(servers, api_aggregator);
        let manager_task = ApiManager::new(api_manager_config).run(endpoints_rx).fuse();
        futures::pin_mut!(manager_task);

        // Complete as soon as any of the constituent tasks completes.
        futures::select! {
            () = events_task => (),
            () = actor_task => (),
            res = manager_task => if let Err(e) = res {
                log::error!("Error running testkit server API: {}", e);
            }
        }
    }

    /// Takes the events stream out of the testkit, returning a future that
    /// drives it to completion. The testkit is left with an empty stream.
    pub(crate) fn remove_events_stream(&mut self) -> impl Future<Output = ()> {
        let stream = mem::replace(&mut self.events_stream, Box::pin(stream::empty()));
        stream.for_each(|_| async {})
    }

    /// Returns the node that the testkit emulates.
    pub fn us(&self) -> TestNode {
        self.network().us().clone()
    }

    /// Stops the testkit, retaining the database state, the emulated network,
    /// the API notifier channel (and plugins, if enabled) so the testkit can
    /// later be resumed via `StoppedTestKit::resume`.
    pub fn stop(self) -> StoppedTestKit {
        let db = self.db_handler.into_inner();
        let network = self.network;
        let api_notifier_channel = self.api_notifier_channel;
        #[cfg(feature = "exonum-node")]
        let plugins = self.plugins;
        StoppedTestKit {
            network,
            db,
            api_notifier_channel,
            #[cfg(feature = "exonum-node")]
            plugins,
        }
    }
}
/// Persisted state of a stopped `TestKit`, which can be turned back into a
/// running testkit via `resume` / `resume_with_runtimes`.
pub struct StoppedTestKit {
    // Database holding the blockchain state at the moment of stopping.
    db: CheckpointDb<TemporaryDB>,
    // Node plugins carried over to the resumed testkit.
    #[cfg(feature = "exonum-node")]
    plugins: Vec<Box<dyn NodePlugin>>,
    // Emulated network of test nodes.
    network: TestNetwork,
    // Endpoint-update channel carried over to the resumed testkit.
    api_notifier_channel: ApiNotifierChannel,
}
impl fmt::Debug for StoppedTestKit {
    /// Debug-formats the stopped testkit, showing the persisted blockchain
    /// height and the emulated network.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug = formatter.debug_struct("StoppedTestKit");
        debug.field("height", &self.height());
        debug.field("network", &self.network);
        debug.finish()
    }
}
impl StoppedTestKit {
    /// Returns a snapshot of the database state at the moment the testkit
    /// was stopped.
    pub fn snapshot(&self) -> Box<dyn Snapshot> {
        self.db.snapshot()
    }

    /// Returns the height of the latest committed block.
    pub fn height(&self) -> Height {
        self.snapshot().for_core().height()
    }

    /// Returns a reference to the emulated test network.
    pub fn network(&self) -> &TestNetwork {
        &self.network
    }

    /// Resumes the testkit with the given Rust runtime and no external runtimes.
    pub fn resume(self, rust_runtime: RustRuntimeBuilder) -> TestKit {
        self.resume_with_runtimes(rust_runtime, Vec::new())
    }

    /// Resumes the testkit with the given Rust runtime plus additional
    /// externally constructed runtimes.
    pub fn resume_with_runtimes(
        self,
        rust_runtime: RustRuntimeBuilder,
        external_runtimes: Vec<RuntimeInstance>,
    ) -> TestKit {
        // Hand the sender half of the endpoint-update channel to the runtime.
        let built_runtime = rust_runtime.build(self.api_notifier_channel.0.clone());
        let mut all_runtimes = external_runtimes;
        all_runtimes.push(built_runtime.into());
        self.do_resume(all_runtimes)
    }

    /// Re-assembles a `TestKit` from the stored state, restoring node plugins.
    #[cfg(feature = "exonum-node")]
    fn do_resume(self, runtimes: Vec<RuntimeInstance>) -> TestKit {
        let mut testkit = TestKit::assemble(
            self.db,
            self.network,
            None,
            runtimes,
            self.api_notifier_channel,
        );
        testkit.set_plugins(self.plugins);
        testkit
    }

    /// Re-assembles a `TestKit` from the stored state.
    #[cfg(not(feature = "exonum-node"))]
    fn do_resume(self, runtimes: Vec<RuntimeInstance>) -> TestKit {
        TestKit::assemble(
            self.db,
            self.network,
            None,
            runtimes,
            self.api_notifier_channel,
        )
    }
}
#[test]
fn test_create_block_heights() {
    // A freshly built testkit starts at the genesis height.
    let mut testkit = TestKitBuilder::validator().build();
    assert_eq!(testkit.height(), Height(0));
    // Creating a single block advances the height by one.
    testkit.create_block();
    assert_eq!(testkit.height(), Height(1));
    // `create_blocks_until` advances exactly to the requested height.
    testkit.create_blocks_until(Height(6));
    assert_eq!(testkit.height(), Height(6));
}
#[test]
fn test_number_of_validators_in_builder() {
    // Auditor with the default single validator: "us" is not that validator.
    let testkit = TestKitBuilder::auditor().build();
    assert_eq!(1, testkit.network().validators().len());
    assert_ne!(testkit.us(), testkit.validator(ValidatorId(0)));

    // Single validator: "us" is the sole validator.
    let testkit = TestKitBuilder::validator().build();
    assert_eq!(1, testkit.network().validators().len());
    assert_eq!(testkit.us(), testkit.validator(ValidatorId(0)));

    // Auditor next to 3 validators: "us" is absent from the validator list.
    let testkit = TestKitBuilder::auditor().with_validators(3).build();
    assert_eq!(3, testkit.network().validators().len());
    let our_node = testkit.us();
    assert!(!testkit
        .network()
        .validators()
        .into_iter()
        .any(|validator| validator == our_node));

    // Validator among 5: "us" is the first validator.
    let testkit = TestKitBuilder::validator().with_validators(5).build();
    assert_eq!(5, testkit.network().validators().len());
    assert_eq!(testkit.us(), testkit.validator(ValidatorId(0)));
}
#[test]
#[should_panic(expected = "validator should be present")]
fn test_zero_validators_in_builder() {
    // Building a network without any validators must panic.
    let _ = TestKitBuilder::auditor().with_validators(0).build();
}
#[test]
#[should_panic(expected = "Number of validators is already specified")]
fn test_multiple_spec_of_validators_in_builder() {
    // Specifying the validator count twice is a configuration error.
    let builder = TestKitBuilder::auditor().with_validators(5);
    drop(builder.with_validators(2).build());
}
#[test]
fn test_stop() {
    // A testkit can be stopped right after creation without panicking.
    let kit = TestKitBuilder::validator().with_logger().build();
    let _stopped = kit.stop();
}