use futures::StreamExt;
use tari_comms::protocol::rpc::{RpcStatusCode, mock::RpcRequestMock};
use tari_service_framework::reply_channel;
use tari_test_utils::{streams::convert_mpsc_to_stream, unpack_enum};
use tokio::sync::broadcast;
use super::BaseNodeSyncRpcService;
use crate::{
base_node::{BaseNodeSyncService, LocalNodeCommsInterface},
chain_storage::BlockchainDatabase,
proto::base_node::{SyncBlocksRequest, SyncUtxosRequest},
test_helpers::{
blockchain::{TempDatabase, create_main_chain, create_new_blockchain},
create_peer_manager,
},
};
fn setup() -> (
BaseNodeSyncRpcService<TempDatabase>,
BlockchainDatabase<TempDatabase>,
RpcRequestMock,
) {
let peer_manager = create_peer_manager();
let request_mock = RpcRequestMock::new(peer_manager);
let db = create_new_blockchain();
let (req_tx, _) = reply_channel::unbounded();
let (block_tx, _) = reply_channel::unbounded();
let (block_event_tx, _) = broadcast::channel(1);
let service = BaseNodeSyncRpcService::new(
db.clone().into(),
LocalNodeCommsInterface::new(req_tx, block_tx, block_event_tx),
);
(service, db, request_mock)
}
mod sync_blocks {
use super::*;
#[tokio::test]
async fn it_returns_not_found_if_unknown_hash() {
let (service, _, rpc_request_mock) = setup();
let msg = SyncBlocksRequest {
start_hash: vec![0; 32],
end_hash: vec![0; 32],
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let err = service.sync_blocks(req).await.unwrap_err();
unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
}
#[tokio::test]
async fn it_sends_bad_request_on_bad_response() {
let (service, db, rpc_request_mock) = setup();
let (_, chain) = create_main_chain(&db, block_specs!(["A->GB"]));
let block = chain.get("A").unwrap();
let msg = SyncBlocksRequest {
start_hash: block.hash().to_vec(),
end_hash: block.hash().to_vec(),
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
assert!(service.sync_blocks(req).await.is_err());
}
#[tokio::test]
async fn it_streams_blocks_until_end() {
let (service, db, rpc_request_mock) = setup();
let (_, chain) = create_main_chain(&db, block_specs!(["A->GB"], ["B->A"], ["C->B"], ["D->C"], ["E->D"]));
let first_block = chain.get("A").unwrap();
let last_block = chain.get("E").unwrap();
let msg = SyncBlocksRequest {
start_hash: first_block.hash().to_vec(),
end_hash: last_block.hash().to_vec(),
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let mut streaming = service.sync_blocks(req).await.unwrap().into_inner();
let blocks = convert_mpsc_to_stream(&mut streaming)
.map(|block| block.unwrap())
.collect::<Vec<_>>()
.await;
assert_eq!(blocks.len(), 4);
blocks.iter().zip(["B", "C", "D", "E"]).for_each(|(block, name)| {
assert_eq!(*chain.get(name).unwrap().hash(), block.hash);
});
}
}
mod sync_utxos {
use super::*;
#[tokio::test]
async fn it_returns_not_found_if_unknown_hash() {
let (service, db, rpc_request_mock) = setup();
let gen_block_hash = db.fetch_header(0).unwrap().unwrap().hash();
let msg = SyncUtxosRequest {
start_header_hash: gen_block_hash.to_vec(),
end_header_hash: vec![0; 32],
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let err = service.sync_utxos(req).await.unwrap_err();
unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
}
#[tokio::test]
async fn it_returns_not_found_if_start_not_found() {
let (service, db, rpc_request_mock) = setup();
let (_, chain) = create_main_chain(&db, block_specs!(["A->GB"]));
let gb = chain.get("GB").unwrap();
let msg = SyncUtxosRequest {
start_header_hash: vec![0; 32],
end_header_hash: gb.hash().to_vec(),
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let err = service.sync_utxos(req).await.unwrap_err();
unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
}
}