use actix::prelude::*;
use exonum::{blockchain::ConsensusConfig, crypto::Hash, helpers::Height};
use exonum_api::{self as api, ApiAggregator, ApiBuilder};
use exonum_explorer::{BlockWithTransactions, BlockchainExplorer};
use futures::FutureExt;
use serde::{Deserialize, Serialize};
use tokio::task::LocalSet;
use crate::{TestKit, TestNode};
/// Actix actor that owns a `TestKit` instance and processes the messages sent
/// by the testkit API endpoints (`GetStatus`, `CreateBlock`, `RollBack`).
#[derive(Debug)]
pub(crate) struct TestKitActor(TestKit);
impl TestKitActor {
    /// Starts an actor wrapping `testkit` and registers its API in the aggregator.
    ///
    /// Returns the aggregator obtained from `testkit.update_aggregator()`, extended with the
    /// `"testkit"` API, together with the `LocalSet` on which the actix system and the actor
    /// were spawned. The tests in this file wrap every API request in `LocalSet::run_until`,
    /// so the returned set must be kept and driven by the caller.
    pub(crate) async fn spawn(mut testkit: TestKit) -> (ApiAggregator, LocalSet) {
        let mut aggregator = testkit.update_aggregator();

        // The actix system lives on the same local task set as the actor itself.
        let local_set = LocalSet::new();
        let system = System::run_in_tokio("testkit", &local_set);
        local_set.spawn_local(system);

        // The actor is started from within the local set; only its address leaves the set.
        let actor_addr = local_set.run_until(async { Self(testkit).start() }).await;
        aggregator.insert("testkit", Self::api(actor_addr));
        (aggregator, local_set)
    }

    /// Builds the private API of the testkit server.
    ///
    /// Every endpoint forwards its query to the actor behind `addr` as a message and
    /// flattens the mailbox / handler result with `flatten_err`.
    fn api(addr: Addr<Self>) -> ApiBuilder {
        let mut builder = ApiBuilder::new();
        let scope = builder.private_scope();

        let status_addr = addr.clone();
        scope.endpoint("v1/status", move |()| {
            status_addr.send(GetStatus).map(flatten_err)
        });

        let rollback_addr = addr.clone();
        scope.endpoint_mut("v1/blocks/rollback", move |height| {
            rollback_addr.send(RollBack(height)).map(flatten_err)
        });

        // The last endpoint takes ownership of the original address, so no clone is needed.
        let create_addr = addr;
        scope.endpoint_mut("v1/blocks/create", move |query: CreateBlock| {
            create_addr.send(query).map(flatten_err)
        });

        builder
    }
}
/// Makes `TestKitActor` an actix actor that runs in a plain `Context`.
impl Actor for TestKitActor {
type Context = Context<Self>;
}
fn flatten_err<T>(res: Result<Result<T, api::Error>, MailboxError>) -> Result<T, api::Error> {
match res {
Ok(Ok(value)) => Ok(value),
Ok(Err(e)) => Err(e),
Err(e) => Err(api::Error::internal(e)),
}
}
/// Actor message requesting the current testkit status; sent by the `v1/status` endpoint.
#[derive(Debug)]
struct GetStatus;
/// The reply to `GetStatus` is a `TestKitStatus` wrapped in an API result.
impl Message for GetStatus {
type Result = api::Result<TestKitStatus>;
}
/// Testkit status returned by the `v1/status` endpoint.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct TestKitStatus {
/// Blockchain height, as reported by `TestKit::height()`.
pub height: Height,
/// Consensus configuration, as reported by `TestKit::consensus_config()`.
pub configuration: ConsensusConfig,
/// Copy of the node list of the testkit network (`network.nodes()`).
pub nodes: Vec<TestNode>,
}
/// Answers `GetStatus` with a snapshot of the wrapped testkit's state.
impl Handler<GetStatus> for TestKitActor {
    type Result = api::Result<TestKitStatus>;

    /// Collects the height, consensus configuration and node list; never fails.
    fn handle(&mut self, _msg: GetStatus, _ctx: &mut Self::Context) -> Self::Result {
        let testkit = &self.0;
        let height = testkit.height();
        let configuration = testkit.consensus_config();
        let nodes = testkit.network.nodes().to_vec();

        Ok(TestKitStatus {
            height,
            configuration,
            nodes,
        })
    }
}
/// Query accepted by the `v1/blocks/create` endpoint; also the actor message
/// that triggers block creation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct CreateBlock {
/// Hashes of the transactions to include into the block. With `None` (which is
/// also what an omitted field deserializes to, because of `#[serde(default)]`)
/// the handler calls `TestKit::create_block()` without an explicit transaction list.
#[serde(default)]
pub tx_hashes: Option<Vec<Hash>>,
}
impl CreateBlock {
    /// Creates a query asking for a block built from the transactions with the given hashes.
    pub fn with_tx_hashes(tx_hashes: Vec<Hash>) -> Self {
        let tx_hashes = Some(tx_hashes);
        Self { tx_hashes }
    }

    /// Creates a query without an explicit hash list (`tx_hashes` is `None`), leaving the
    /// choice of transactions to `TestKit::create_block()`.
    pub fn with_all_transactions() -> Self {
        let tx_hashes = None;
        Self { tx_hashes }
    }
}
/// The reply to `CreateBlock` is the created block with its transactions,
/// or an API error.
impl Message for CreateBlock {
type Result = api::Result<BlockWithTransactions>;
}
/// Creates a new block on request, checkpointing the testkit first so that the
/// block can later be undone by the rollback handler.
impl Handler<CreateBlock> for TestKitActor {
    type Result = api::Result<BlockWithTransactions>;

    /// Creates a block either from the requested transaction hashes or, when none
    /// are given, via `TestKit::create_block()`.
    ///
    /// # Errors
    ///
    /// Returns a "bad request" error (without creating a checkpoint) if any of the
    /// requested hashes is not in the transaction pool.
    fn handle(&mut self, msg: CreateBlock, _ctx: &mut Self::Context) -> Self::Result {
        // Validate the explicit hash list before touching the testkit state.
        if let Some(requested) = &msg.tx_hashes {
            let first_missing = requested.iter().find(|hash| !self.0.is_tx_in_pool(hash));
            if let Some(missing_tx) = first_missing {
                let detail = format!("Transaction not in mempool: {}", missing_tx);
                return Err(api::Error::bad_request()
                    .title("Creating block failed")
                    .detail(detail));
            }
        }

        // Both creation modes are preceded by exactly one checkpoint.
        self.0.checkpoint();
        let block_info = match msg.tx_hashes {
            Some(requested) => self.0.create_block_with_tx_hashes(&requested),
            None => self.0.create_block(),
        };
        Ok(block_info)
    }
}
/// Actor message asking to roll the blockchain back; sent by the
/// `v1/blocks/rollback` endpoint. The wrapped height is the lowest block height
/// to discard: when it does not exceed the current height, the handler performs
/// `current - height + 1` rollbacks.
#[derive(Debug)]
struct RollBack(Height);
/// The reply to `RollBack` is the block found at the testkit height after the
/// rollback (if the explorer returns one), or an API error.
impl Message for RollBack {
type Result = api::Result<Option<BlockWithTransactions>>;
}
/// Rolls the testkit back so that the block at the requested height and all later
/// blocks are discarded.
impl Handler<RollBack> for TestKitActor {
    type Result = api::Result<Option<BlockWithTransactions>>;

    /// Performs one `TestKit::rollback()` per block in `height..=current_height` and
    /// returns the block at the resulting height.
    ///
    /// If `height` is greater than the current height, nothing is rolled back and the
    /// latest block is returned.
    ///
    /// NOTE(review): this presumably relies on every block having been preceded by a
    /// checkpoint (as the `CreateBlock` handler does) — confirm against `TestKit::rollback`.
    ///
    /// # Errors
    ///
    /// Returns a "bad request" error if `height` is zero, i.e. the genesis block would
    /// have to be discarded.
    fn handle(&mut self, RollBack(height): RollBack, _ctx: &mut Self::Context) -> Self::Result {
        if height == Height(0) {
            return Err(api::Error::bad_request().title("Cannot rollback past genesis block"));
        }

        // Iterate over the heights in `u64` instead of casting the count to `usize`,
        // which would silently truncate on 32-bit targets. The inclusive range is
        // empty when `height` exceeds the current height, so no extra guard is needed.
        let current_height = self.0.height();
        for _ in height.0..=current_height.0 {
            self.0.rollback();
        }

        let snapshot = self.0.snapshot();
        let explorer = BlockchainExplorer::new(snapshot.as_ref());
        Ok(explorer.block_with_txs(self.0.height()))
    }
}
// Tests for the testkit API endpoints: each test starts a `TestKitActor` around a
// testkit with a sample service and talks to it through `TestKitApi`.
#[cfg(test)]
mod tests {
use exonum::{
crypto::{gen_keypair, Hash},
helpers::{Height, ValidatorId},
messages::{AnyTx, Verified},
runtime::{ExecutionContext, ExecutionError},
};
use exonum_derive::{exonum_interface, ServiceDispatcher, ServiceFactory};
use exonum_explorer::BlockWithTransactions;
use exonum_merkledb::ObjectHash;
use exonum_rust_runtime::{api, spec::Spec, Service};
use pretty_assertions::assert_eq;
use tokio::time::delay_for;
use std::time::Duration;
use super::*;
use crate::{TestKitApi, TestKitBuilder};
// Numeric ID and name of the sample service instance deployed by `init_handler`.
const TIMESTAMP_SERVICE_ID: u32 = 2;
const TIMESTAMP_SERVICE_NAME: &str = "sample";
/// Creates a `timestamp` transaction with payload `s` addressed to the sample
/// service, using a freshly generated keypair.
fn timestamp(s: &str) -> Verified<AnyTx> {
gen_keypair().timestamp(TIMESTAMP_SERVICE_ID, s.to_owned())
}
/// Minimal service used by the tests; its only method does nothing.
#[derive(Debug, ServiceDispatcher, ServiceFactory)]
#[service_factory(artifact_name = "sample-service")]
#[service_dispatcher(implements("SampleInterface"))]
struct SampleService;
/// Interface of the sample service with a single `timestamp` method.
#[exonum_interface(auto_ids)]
trait SampleInterface<Ctx> {
type Output;
fn timestamp(&self, ctx: Ctx, arg: String) -> Self::Output;
}
impl SampleInterface<ExecutionContext<'_>> for SampleService {
type Output = Result<(), ExecutionError>;
// The transaction always succeeds and ignores its argument.
fn timestamp(&self, _ctx: ExecutionContext<'_>, _arg: String) -> Self::Output {
Ok(())
}
}
impl Service for SampleService {}
/// Builds a validator testkit with the sample service, creates blocks until
/// `height`, and starts the actor. Returns the API client together with the
/// `LocalSet` returned by `TestKitActor::spawn`; the tests run their requests
/// inside `LocalSet::run_until`.
async fn init_handler(height: Height) -> (TestKitApi, LocalSet) {
let mut testkit = TestKitBuilder::validator()
.with(Spec::new(SampleService).with_instance(
TIMESTAMP_SERVICE_ID,
TIMESTAMP_SERVICE_NAME,
(),
))
.build();
testkit.create_blocks_until(height);
// NOTE(review): presumably drains the testkit event stream in the background
// so that sent transactions get processed — confirm against `TestKit`.
tokio::spawn(testkit.remove_events_stream());
// The sender is cloned before the testkit is moved into the actor.
let api_sender = testkit.api_sender.clone();
let (aggregator, actor_task) = TestKitActor::spawn(testkit).await;
let api = TestKitApi::from_raw_parts(aggregator, api_sender);
(api, actor_task)
}
/// Waits for 20 ms; used after sending transactions, presumably to give them
/// time to reach the pool before a block is requested.
async fn sleep() {
delay_for(Duration::from_millis(20)).await;
}
/// Checks that the status endpoint reports height 0 and a single node that is
/// validator #0 and whose keys make up the consensus validator keys.
async fn test_status(api: TestKitApi) {
let status: TestKitStatus = api.private("api/testkit").get("v1/status").await.unwrap();
assert_eq!(status.height, Height(0));
assert_eq!(status.nodes.len(), 1);
let our_node = &status.nodes[0];
assert_eq!(our_node.validator_id(), Some(ValidatorId(0)));
assert_eq!(
status.configuration.validator_keys,
[our_node.public_keys()]
);
}
#[tokio::test]
async fn status() {
let (api, local_set) = init_handler(Height(0)).await;
local_set.run_until(test_status(api)).await;
}
/// Creates a block without specifying hashes, rolls it back, and creates it again;
/// both times the previously sent transaction must end up in block 1.
async fn test_create_block_with_empty_body(api: TestKitApi) {
let tx = timestamp("foo");
api.send(tx.clone()).await;
sleep().await;
let block_info: BlockWithTransactions = api
.private("api/testkit")
.query(&CreateBlock { tx_hashes: None })
.post("v1/blocks/create")
.await
.unwrap();
assert_eq!(block_info.header.height, Height(1));
assert_eq!(block_info.transactions.len(), 1);
assert_eq!(block_info.transactions[0].message(), &tx);
// Rolling back to height 1 discards block 1, so the genesis block is returned.
let block_info: BlockWithTransactions = api
.private("api/testkit")
.query(&Height(1))
.post("v1/blocks/rollback")
.await
.unwrap();
assert_eq!(block_info.header.height, Height(0));
// The transaction is re-sent after the rollback before the block is recreated.
api.send(tx.clone()).await;
sleep().await;
let block_info: BlockWithTransactions = api
.private("api/testkit")
.query(&CreateBlock { tx_hashes: None })
.post("v1/blocks/create")
.await
.unwrap();
assert_eq!(block_info.header.height, Height(1));
assert_eq!(block_info.transactions.len(), 1);
assert_eq!(block_info.transactions[0].message(), &tx);
}
#[tokio::test]
async fn create_block_with_empty_body() {
let (api, local_set) = init_handler(Height(0)).await;
local_set
.run_until(test_create_block_with_empty_body(api))
.await;
}
/// Sends two transactions and creates two blocks, each explicitly requesting
/// exactly one of them by hash.
async fn test_create_block_with_specified_transactions(api: TestKitApi) {
let tx_foo = timestamp("foo");
let tx_bar = timestamp("bar");
api.send(tx_foo.clone()).await;
api.send(tx_bar.clone()).await;
sleep().await;
// The first block must contain only `tx_foo`.
let body = CreateBlock {
tx_hashes: Some(vec![tx_foo.object_hash()]),
};
let block_info: BlockWithTransactions = api
.private("api/testkit")
.query(&body)
.post("v1/blocks/create")
.await
.unwrap();
assert_eq!(block_info.header.height, Height(1));
assert_eq!(block_info.transactions.len(), 1);
assert_eq!(block_info.transactions[0].message(), &tx_foo);
// The second block must contain only `tx_bar`.
let body = CreateBlock {
tx_hashes: Some(vec![tx_bar.object_hash()]),
};
let block_info: BlockWithTransactions = api
.private("api/testkit")
.query(&body)
.post("v1/blocks/create")
.await
.unwrap();
assert_eq!(block_info.header.height, Height(2));
assert_eq!(block_info.transactions.len(), 1);
assert_eq!(block_info.transactions[0].message(), &tx_bar);
}
#[tokio::test]
async fn create_block_with_specified_transactions() {
let (api, local_set) = init_handler(Height(0)).await;
local_set
.run_until(test_create_block_with_specified_transactions(api))
.await;
}
/// Requests a block with a hash that was never sent (the zero hash) and checks
/// the resulting "bad request" error, including its title and detail.
async fn test_create_block_with_bogus_transaction(api: TestKitApi) {
let body = CreateBlock {
tx_hashes: Some(vec![Hash::zero()]),
};
let err = api
.private("api/testkit")
.query(&body)
.post::<BlockWithTransactions>("v1/blocks/create")
.await
.unwrap_err();
assert_eq!(err.http_code, api::HttpStatusCode::BAD_REQUEST);
assert_eq!(err.body.title, "Creating block failed");
assert_eq!(
err.body.detail,
format!("Transaction not in mempool: {}", Hash::zero())
);
}
#[tokio::test]
async fn create_block_with_bogus_transaction() {
let (api, local_set) = init_handler(Height(0)).await;
local_set
.run_until(test_create_block_with_bogus_transaction(api))
.await;
}
/// Creates four blocks and exercises rollbacks to heights above, at, and below
/// the current height.
async fn test_rollback_normal(api: TestKitApi) {
for i in 0..4 {
let block: BlockWithTransactions = api
.private("api/testkit")
.query(&CreateBlock { tx_hashes: None })
.post("v1/blocks/create")
.await
.unwrap();
assert_eq!(block.height(), Height(i + 1));
}
// Rolling back to a height above the current one changes nothing:
// the latest block (height 4) is returned.
let block_info: BlockWithTransactions = api
.private("api/testkit")
.query(&Height(10))
.post("v1/blocks/rollback")
.await
.unwrap();
assert_eq!(block_info.header.height, Height(4));
// The first request discards block 4; the second is a no-op because the
// height is already 3. Both return block 3.
for _ in 0..2 {
let block_info: BlockWithTransactions = api
.private("api/testkit")
.query(&Height(4))
.post("v1/blocks/rollback")
.await
.unwrap();
assert_eq!(block_info.header.height, Height(3));
}
// Rolling back to height 1 discards all remaining blocks except genesis.
let block = api
.private("api/testkit")
.query(&Height(1))
.post::<BlockWithTransactions>("v1/blocks/rollback")
.await
.unwrap();
assert_eq!(block.header.height, Height(0));
}
#[tokio::test]
async fn rollback_normal() {
let (api, local_set) = init_handler(Height(0)).await;
local_set.run_until(test_rollback_normal(api)).await;
}
/// Checks that a rollback to height 0 is rejected with a "bad request" error.
async fn test_rollback_past_genesis(api: TestKitApi) {
let err = api
.private("api/testkit")
.query(&Height(0))
.post::<BlockWithTransactions>("v1/blocks/rollback")
.await
.unwrap_err();
assert_eq!(err.http_code, api::HttpStatusCode::BAD_REQUEST);
assert_eq!(err.body.title, "Cannot rollback past genesis block");
}
#[tokio::test]
async fn rollback_past_genesis() {
// The testkit starts at height 4 here, so the error does not stem from an empty chain.
let (api, local_set) = init_handler(Height(4)).await;
local_set.run_until(test_rollback_past_genesis(api)).await;
}
}