use ckb_chain::{ChainController, ChainServiceScope};
use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder};
use ckb_constant::sync::{CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME, MAX_TIP_AGE};
use ckb_dao::DaoCalculator;
use ckb_error::InternalErrorKind;
use ckb_network::{
Behaviour, CKBProtocolContext, Peer, PeerId, PeerIndex, ProtocolId, SessionType, TargetSession,
async_trait, bytes::Bytes,
};
use ckb_reward_calculator::RewardCalculator;
use ckb_shared::types::HeaderIndex;
use ckb_shared::{Shared, SharedBuilder, Snapshot};
use ckb_store::ChainStore;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::{
U256,
core::{
BlockBuilder, BlockNumber, BlockView, EpochExt, HeaderView, TransactionBuilder,
TransactionView, cell::resolve_transaction,
},
packed::{
self, Byte32, CellInput, CellOutputBuilder, Script, SendBlockBuilder, SendHeadersBuilder,
},
prelude::*,
utilities::difficulty_to_compact,
};
use ckb_util::Mutex;
use ckb_verification_traits::Switch;
use futures::future::Future;
use std::{
collections::{HashMap, HashSet},
ops::Deref,
pin::Pin,
sync::{Arc, atomic::Ordering},
time::Duration,
};
use crate::{
Status, StatusCode, SyncShared,
synchronizer::{BlockFetcher, BlockProcess, GetBlocksProcess, HeadersProcess, Synchronizer},
types::{HeadersSyncController, IBDState, PeerState},
};
/// Builds a fresh chain backed by a temporary database and wires a
/// `Synchronizer` around it. Returns the chain service scope (kept alive so
/// the service keeps running), the shared state, and the synchronizer.
fn start_chain(consensus: Option<Consensus>) -> (ChainServiceScope, Shared, Synchronizer) {
    let shared_builder = SharedBuilder::with_temp_db().consensus(consensus.unwrap_or_default());
    let (shared, mut pack) = shared_builder.build().unwrap();
    let chain = ChainServiceScope::new(pack.take_chain_services_builder());
    // Spin until startup verification of any leftover unverified blocks has
    // finished; tests assume a quiescent chain on entry.
    while chain
        .chain_controller()
        .is_verifying_unverified_blocks_on_startup()
    {
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
    let sync_shared = Arc::new(SyncShared::new(
        shared.clone(),
        Default::default(),
        pack.take_relay_tx_receiver(),
    ));
    let synchronizer = Synchronizer::new(chain.chain_controller().clone(), sync_shared);
    (chain, shared, synchronizer)
}
/// Creates the cellbase (coinbase) transaction for the block at `number`,
/// paying the finalized reward computed from `parent_header`.
fn create_cellbase(
    shared: &Shared,
    parent_header: &HeaderView,
    number: BlockNumber,
) -> TransactionView {
    let (_, block_reward) = RewardCalculator::new(shared.consensus(), shared.snapshot().as_ref())
        .block_reward_to_finalize(parent_header)
        .unwrap();
    let tx_builder = TransactionBuilder::default()
        .input(CellInput::new_cellbase_input(number))
        .witness(Script::default().into_witness());
    // Within the finalization delay window no reward has matured yet, so the
    // cellbase carries no output; afterwards it pays out the full reward.
    if number > shared.consensus().finalization_delay_length() {
        tx_builder
            .output(
                CellOutputBuilder::default()
                    .capacity(block_reward.total)
                    .build(),
            )
            .output_data(Bytes::new())
            .build()
    } else {
        tx_builder.build()
    }
}
/// Assembles a child block of `parent_header` containing only a cellbase.
/// The DAO field is recomputed from the resolved cellbase and the chain-root
/// MMR hash (up to the parent's number) is stored in the `extension` field.
fn gen_block(
    shared: &Shared,
    parent_header: &HeaderView,
    epoch: &EpochExt,
    nonce: u128,
) -> BlockView {
    // Strictly increasing timestamp so header verification accepts the child.
    let now = 1 + parent_header.timestamp();
    let number = parent_header.number() + 1;
    let cellbase = create_cellbase(shared, parent_header, number);
    let dao = {
        let snapshot: &Snapshot = &shared.snapshot();
        // The cellbase must be resolved against the current snapshot before
        // the DAO field can be computed for the new block.
        let resolved_cellbase =
            resolve_transaction(cellbase.clone(), &mut HashSet::new(), snapshot, snapshot).unwrap();
        let data_loader = shared.store().borrow_as_data_loader();
        DaoCalculator::new(shared.consensus(), &data_loader)
            .dao_field([resolved_cellbase].iter(), parent_header)
            .unwrap()
    };
    let chain_root = shared
        .snapshot()
        .chain_root_mmr(parent_header.number())
        .get_root()
        .expect("chain root_mmr");
    let bytes = chain_root.calc_mmr_hash().as_bytes().into();
    BlockBuilder::default()
        .transaction(cellbase)
        .parent_hash(parent_header.hash())
        .timestamp(now)
        .epoch(epoch.number_with_fraction(number))
        .number(number)
        .compact_target(epoch.compact_target())
        .nonce(nonce)
        .dao(dao)
        .extension(Some(bytes))
        .build()
}
/// Generates the block at height `number` on top of the block at
/// `number - 1` and pushes it through the chain controller with extension
/// verification disabled.
fn insert_block(
    chain_controller: &ChainController,
    shared: &Shared,
    nonce: u128,
    number: BlockNumber,
) {
    let snapshot = shared.snapshot();
    let parent_hash = snapshot.get_block_hash(number - 1).unwrap();
    let parent_header = snapshot.get_block_header(&parent_hash).unwrap();
    let epoch_ext = snapshot
        .consensus()
        .next_epoch_ext(&parent_header, &snapshot.borrow_as_data_loader())
        .unwrap()
        .epoch();
    let new_block = gen_block(shared, &parent_header, &epoch_ext, nonce);
    chain_controller
        .blocking_process_block_with_switch(Arc::new(new_block), Switch::DISABLE_EXTENSION)
        .expect("process block ok");
}
// Verifies that the block locator walks back from the tip with unit steps
// first, then exponentially widening gaps, and always ends at genesis.
#[test]
fn test_locator() {
    let (chain, shared, synchronizer) = start_chain(None);
    let num = 200;
    // Expected locator heights for tip 199: ten unit steps, then doubling
    // gaps (2, 4, 8, ...) back toward genesis.
    let index = [
        199, 198, 197, 196, 195, 194, 193, 192, 191, 190, 188, 184, 176, 160, 128, 64,
    ];
    for i in 1..num {
        insert_block(chain.chain_controller(), &shared, u128::from(i), i);
    }
    let locator = synchronizer
        .shared
        .active_chain()
        .get_locator(shared.snapshot().tip_header().into());
    let mut expect = Vec::new();
    for i in index.iter() {
        expect.push(shared.store().get_block_hash(*i).unwrap());
    }
    // The locator is terminated by the genesis hash.
    expect.push(shared.genesis_hash());
    assert_eq!(expect, locator);
}
// Verifies `locate_latest_common_block`: two chains with entirely different
// blocks (differing nonces) share only genesis, while chains that diverge
// after height 192 share a common block at the last locator entry <= 192.
#[test]
fn test_locate_latest_common_block() {
    let _log_guard = ckb_logger_service::init_for_test("debug").expect("init log");
    let consensus = Consensus::default();
    let (chain1, shared1, synchronizer1) = start_chain(Some(consensus.clone()));
    let (chain2, shared2, synchronizer2) = start_chain(Some(consensus.clone()));
    let num = 200;
    for i in 1..num {
        insert_block(chain1.chain_controller(), &shared1, u128::from(i), i);
    }
    // chain2 uses different nonces throughout, so every non-genesis block
    // differs from chain1's.
    for i in 1..num {
        insert_block(chain2.chain_controller(), &shared2, u128::from(i + 1), i);
    }
    let locator1 = synchronizer1
        .shared
        .active_chain()
        .get_locator(shared1.snapshot().tip_header().into());
    let latest_common = synchronizer2
        .shared
        .active_chain()
        .locate_latest_common_block(&Byte32::zero(), &locator1[..]);
    // Only genesis (height 0) is shared.
    assert_eq!(latest_common, Some(0));
    // chain3 matches chain1 up to height 192 and diverges afterwards.
    let (chain3, shared3, synchronizer3) = start_chain(Some(consensus));
    for i in 1..num {
        let j = if i > 192 { i + 1 } else { i };
        insert_block(chain3.chain_controller(), &shared3, u128::from(j), i);
    }
    let latest_common3 = synchronizer3
        .shared
        .active_chain()
        .locate_latest_common_block(&Byte32::zero(), &locator1[..]);
    assert_eq!(latest_common3, Some(192));
}
// Verifies `locate_latest_common_block` against a fork: chain2 first mirrors
// chain1, then reorganizes onto a longer fork branching at height 151. The
// located common block must coincide with the fork point.
#[test]
fn test_locate_latest_common_block2() {
    let consensus = Consensus::default();
    let (chain1, shared1, synchronizer1) = start_chain(Some(consensus.clone()));
    let (chain2, shared2, synchronizer2) = start_chain(Some(consensus.clone()));
    let block_number = 200;
    let mut blocks: Vec<BlockView> = Vec::new();
    let mut parent = consensus.genesis_block().header();
    // Feed the identical main chain into both chain1 and chain2.
    for i in 1..block_number {
        let store = shared1.store();
        let epoch = shared1
            .consensus()
            .next_epoch_ext(&parent, &store.borrow_as_data_loader())
            .unwrap()
            .epoch();
        let new_block = gen_block(&shared1, &parent, &epoch, i);
        blocks.push(new_block.clone());
        chain1
            .chain_controller()
            .blocking_process_block_with_switch(Arc::new(new_block.clone()), Switch::DISABLE_ALL)
            .expect("process block ok");
        chain2
            .chain_controller()
            .blocking_process_block_with_switch(Arc::new(new_block.clone()), Switch::DISABLE_ALL)
            .expect("process block ok");
        parent = new_block.header().to_owned();
    }
    // Fork off chain2 from blocks[150] (height 151) with a longer branch;
    // different nonces (i + 100) make the fork blocks distinct.
    parent = blocks[150].header();
    let fork = parent.number();
    for i in 1..=block_number {
        let store = shared2.store();
        let epoch = shared2
            .consensus()
            .next_epoch_ext(&parent, &store.borrow_as_data_loader())
            .unwrap()
            .epoch();
        let new_block = gen_block(&shared2, &parent, &epoch, i + 100);
        chain2
            .chain_controller()
            .blocking_process_block_with_switch(Arc::new(new_block.clone()), Switch::DISABLE_ALL)
            .expect("process block ok");
        parent = new_block.header().to_owned();
    }
    let locator1 = synchronizer1
        .shared
        .active_chain()
        .get_locator(shared1.snapshot().tip_header().into());
    let latest_common = synchronizer2
        .shared
        .active_chain()
        .locate_latest_common_block(&Byte32::zero(), &locator1[..])
        .unwrap();
    // Sanity: both chains agree at the fork point but not one block later.
    assert_eq!(
        shared1.snapshot().get_block_hash(fork).unwrap(),
        shared2.snapshot().get_block_hash(fork).unwrap()
    );
    assert!(
        shared1.snapshot().get_block_hash(fork + 1).unwrap()
            != shared2.snapshot().get_block_hash(fork + 1).unwrap()
    );
    // The latest common block is exactly the fork point.
    assert_eq!(
        shared1.snapshot().get_block_hash(latest_common).unwrap(),
        shared1.snapshot().get_block_hash(fork).unwrap()
    );
}
// Verifies `get_ancestor`: an interior height and the tip height resolve to
// the corresponding headers, while a height beyond the tip returns `None`.
#[test]
fn test_get_ancestor() {
    let consensus = Consensus::default();
    let (chain, shared, synchronizer) = start_chain(Some(consensus));
    let num = 200;
    for i in 1..num {
        insert_block(chain.chain_controller(), &shared, u128::from(i), i);
    }
    let header = synchronizer
        .shared
        .active_chain()
        .get_ancestor(&shared.snapshot().tip_header().hash(), 100);
    let tip = synchronizer
        .shared
        .active_chain()
        .get_ancestor(&shared.snapshot().tip_header().hash(), 199);
    // 200 is past the tip (199), so no ancestor exists at that height.
    let noop = synchronizer
        .shared
        .active_chain()
        .get_ancestor(&shared.snapshot().tip_header().hash(), 200);
    assert!(tip.is_some());
    assert!(header.is_some());
    assert!(noop.is_none());
    assert_eq!(tip.unwrap().hash(), shared.snapshot().tip_header().hash());
    assert_eq!(
        header.unwrap().hash(),
        shared
            .store()
            .get_block_header(&shared.store().get_block_hash(100).unwrap())
            .unwrap()
            .hash()
    );
}
// Builds 1999 blocks on chain1, then feeds them one by one into a second
// synchronizer via `blocking_insert_new_block`, and checks the second chain's
// tip ends up at chain1's last block.
#[test]
fn test_process_new_block() {
    let consensus = Consensus::default();
    let (chain1, shared1, _) = start_chain(Some(consensus.clone()));
    let (_chain2, shared2, synchronizer) = start_chain(Some(consensus));
    let block_number = 2000;
    let mut blocks: Vec<BlockView> = Vec::new();
    let mut parent = shared1
        .store()
        .get_block_header(&shared1.store().get_block_hash(0).unwrap())
        .unwrap();
    for i in 1..block_number {
        let store = shared1.store();
        let epoch = shared1
            .consensus()
            .next_epoch_ext(&parent, &store.borrow_as_data_loader())
            .unwrap()
            .epoch();
        let new_block = gen_block(&shared1, &parent, &epoch, i + 100);
        chain1
            .chain_controller()
            .blocking_process_block_with_switch(Arc::new(new_block.clone()), Switch::DISABLE_ALL)
            .expect("process block ok");
        parent = new_block.header().to_owned();
        blocks.push(new_block);
    }
    let chain1_last_block = blocks.last().cloned().unwrap();
    // Replay every block through the synchronizer path (not the raw chain
    // controller) so the sync insertion logic is exercised.
    blocks.into_iter().for_each(|block| {
        synchronizer
            .shared()
            .blocking_insert_new_block(&synchronizer.chain, Arc::new(block))
            .expect("Insert new block failed");
    });
    assert_eq!(&chain1_last_block.header(), shared2.snapshot().tip_header());
}
// Verifies `get_locator_response`: asking from height 180 returns the headers
// for blocks 181..=200 (indices 180..=199 in `blocks`), forming a contiguous
// parent-linked chain.
#[test]
fn test_get_locator_response() {
    let consensus = Consensus::default();
    let (chain, shared, synchronizer) = start_chain(Some(consensus));
    let block_number = 200;
    let mut blocks: Vec<BlockView> = Vec::new();
    let mut parent = shared
        .store()
        .get_block_header(&shared.store().get_block_hash(0).unwrap())
        .unwrap();
    for i in 1..=block_number {
        let store = shared.snapshot();
        let epoch = shared
            .consensus()
            .next_epoch_ext(&parent, &store.borrow_as_data_loader())
            .unwrap()
            .epoch();
        let new_block = gen_block(&shared, &parent, &epoch, i + 100);
        blocks.push(new_block.clone());
        chain
            .chain_controller()
            .blocking_process_block_with_switch(Arc::new(new_block.clone()), Switch::DISABLE_ALL)
            .expect("process block ok");
        parent = new_block.header().to_owned();
    }
    let headers = synchronizer
        .shared
        .active_chain()
        .get_locator_response(180, &Byte32::zero());
    // blocks[180] is the block at height 181 — the first one after 180.
    assert_eq!(headers.first().unwrap(), &blocks[180].header());
    assert_eq!(headers.last().unwrap(), &blocks[199].header());
    // Each returned header must reference the previous one as its parent.
    for window in headers.windows(2) {
        if let [parent, header] = &window {
            assert_eq!(header.data().raw().parent_hash(), parent.hash());
        }
    }
}
/// Minimal network-context stub for tests: serves a fixed peer table and
/// records which peers get disconnected; all message sends are no-ops.
#[derive(Clone)]
struct DummyNetworkContext {
    // Peers returned by `get_peer`.
    pub peers: HashMap<PeerIndex, Peer>,
    // Set of peer indexes that `disconnect` / `async_disconnect` were called with.
    pub disconnected: Arc<Mutex<HashSet<PeerIndex>>>,
}
/// Builds an outbound `Peer` with a freshly generated random peer id.
fn mock_peer_info() -> Peer {
    let multiaddr = format!(
        "/ip4/127.0.0.1/tcp/42/p2p/{}",
        PeerId::random().to_base58()
    );
    Peer::new(
        0.into(),
        SessionType::Outbound,
        multiaddr.parse().expect("parse multiaddr"),
        false,
    )
}
/// Builds a `HeaderIndex` at height 0 with the given total difficulty and a
/// default (zero) hash.
fn mock_header_index(total_difficulty: u64) -> HeaderIndex {
    let difficulty = U256::from(total_difficulty);
    HeaderIndex::new(0, Default::default(), difficulty)
}
// Stub implementation of `CKBProtocolContext` for tests. Sends and broadcasts
// succeed silently; the `quick_*` variants delegate to their plain
// counterparts; disconnects are recorded in `self.disconnected` so tests can
// assert eviction behavior. Notification scheduling and peer enumeration are
// unimplemented because no test exercises them.
#[async_trait]
impl CKBProtocolContext for DummyNetworkContext {
    async fn set_notify(&self, _interval: Duration, _token: u64) -> Result<(), ckb_network::Error> {
        unimplemented!();
    }
    async fn remove_notify(&self, _token: u64) -> Result<(), ckb_network::Error> {
        unimplemented!()
    }
    async fn async_future_task(
        &self,
        _task: Pin<Box<dyn Future<Output = ()> + 'static + Send>>,
        _blocking: bool,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    async fn async_quick_send_message(
        &self,
        proto_id: ProtocolId,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        // "Quick" variant has no priority lane in the stub; delegate.
        self.send_message(proto_id, peer_index, data)
    }
    async fn async_quick_send_message_to(
        &self,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        self.send_message_to(peer_index, data)
    }
    async fn async_quick_filter_broadcast(
        &self,
        target: TargetSession,
        data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        self.filter_broadcast(target, data)
    }
    async fn async_send_message(
        &self,
        _proto_id: ProtocolId,
        _peer_index: PeerIndex,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    async fn async_send_message_to(
        &self,
        _peer_index: PeerIndex,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    async fn async_filter_broadcast(
        &self,
        _target: TargetSession,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    async fn async_filter_broadcast_with_proto(
        &self,
        _proto_id: ProtocolId,
        _target: TargetSession,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    async fn async_quick_filter_broadcast_with_proto(
        &self,
        _proto_id: ProtocolId,
        _target: TargetSession,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    async fn async_disconnect(
        &self,
        peer_index: PeerIndex,
        _msg: &str,
    ) -> Result<(), ckb_network::Error> {
        // Record the disconnect so tests can assert which peers were evicted.
        self.disconnected.lock().insert(peer_index);
        Ok(())
    }
    fn future_task(
        &self,
        _task: Pin<Box<dyn Future<Output = ()> + 'static + Send>>,
        _blocking: bool,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    fn quick_send_message(
        &self,
        proto_id: ProtocolId,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        self.send_message(proto_id, peer_index, data)
    }
    fn quick_send_message_to(
        &self,
        peer_index: PeerIndex,
        data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        self.send_message_to(peer_index, data)
    }
    fn quick_filter_broadcast(
        &self,
        target: TargetSession,
        data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        self.filter_broadcast(target, data)
    }
    fn send_message(
        &self,
        _proto_id: ProtocolId,
        _peer_index: PeerIndex,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    fn send_message_to(
        &self,
        _peer_index: PeerIndex,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    fn filter_broadcast(
        &self,
        _target: TargetSession,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    fn quick_filter_broadcast_with_proto(
        &self,
        _proto_id: ProtocolId,
        _target: TargetSession,
        _data: Bytes,
    ) -> Result<(), ckb_network::Error> {
        Ok(())
    }
    fn disconnect(&self, peer_index: PeerIndex, _msg: &str) -> Result<(), ckb_network::Error> {
        // Record the disconnect so tests can assert which peers were evicted.
        self.disconnected.lock().insert(peer_index);
        Ok(())
    }
    fn get_peer(&self, peer_index: PeerIndex) -> Option<Peer> {
        self.peers.get(&peer_index).cloned()
    }
    fn with_peer_mut(&self, _peer_index: PeerIndex, _f: Box<dyn FnOnce(&mut Peer)>) {}
    fn connected_peers(&self) -> Vec<PeerIndex> {
        unimplemented!();
    }
    fn full_relay_connected_peers(&self) -> Vec<PeerIndex> {
        unimplemented!();
    }
    fn report_peer(&self, _peer_index: PeerIndex, _behaviour: Behaviour) {}
    fn ban_peer(&self, _peer_index: PeerIndex, _duration: Duration, _reason: String) {}
    fn protocol_id(&self) -> ProtocolId {
        ProtocolId::new(1)
    }
}
fn mock_network_context(peer_num: usize) -> DummyNetworkContext {
let mut peers = HashMap::default();
for peer in 0..peer_num {
peers.insert(peer.into(), mock_peer_info());
}
DummyNetworkContext {
peers,
disconnected: Arc::new(Mutex::new(HashSet::default())),
}
}
// End-to-end sync flow: node2 forks from node1 after height 192; node1
// processes node2's headers, fetches and processes the fork blocks, then
// receives one more header and re-runs block fetching, which updates its
// last_common_header for the peer.
#[test]
fn test_sync_process() {
    let consensus = Consensus::default();
    let (chain1, shared1, synchronizer1) = start_chain(Some(consensus.clone()));
    let (chain2, shared2, synchronizer2) = start_chain(Some(consensus));
    let num = 200;
    for i in 1..num {
        insert_block(chain1.chain_controller(), &shared1, u128::from(i), i);
    }
    let locator1 = synchronizer1
        .shared
        .active_chain()
        .get_locator(shared1.snapshot().tip_header().into());
    // chain2 matches chain1 up to 192, then diverges (different nonces).
    for i in 1..=num {
        let j = if i > 192 { i + 1 } else { i };
        insert_block(chain2.chain_controller(), &shared2, u128::from(j), i);
    }
    let latest_common = synchronizer2
        .shared
        .active_chain()
        .locate_latest_common_block(&Byte32::zero(), &locator1[..]);
    assert_eq!(latest_common, Some(192));
    let headers = synchronizer2
        .shared
        .active_chain()
        .get_locator_response(192, &Byte32::zero());
    assert_eq!(
        headers.first().unwrap().hash(),
        shared2.store().get_block_hash(193).unwrap()
    );
    assert_eq!(
        headers.last().unwrap().hash(),
        shared2.store().get_block_hash(200).unwrap()
    );
    let sendheaders = SendHeadersBuilder::default()
        .headers(headers.iter().map(|h| h.data()).collect::<Vec<_>>())
        .build();
    let mock_nc = Arc::new(mock_network_context(4));
    let peer1: PeerIndex = 1.into();
    let peer2: PeerIndex = 2.into();
    synchronizer1.on_connected(mock_nc.as_ref(), peer1);
    synchronizer1.on_connected(mock_nc.as_ref(), peer2);
    // node1 accepts node2's headers for blocks 193..=200.
    assert_eq!(
        HeadersProcess::new(
            sendheaders.as_reader(),
            &synchronizer1,
            peer1,
            &(Arc::clone(&mock_nc) as Arc<dyn ckb_network::CKBProtocolContext + Sync>)
        )
        .execute(),
        Status::ok(),
    );
    let best_known_header = synchronizer1.peers().get_best_known_header(peer1);
    assert_eq!(
        best_known_header.unwrap().hash(),
        headers.last().unwrap().hash()
    );
    // The fetch plan for peer1 should cover exactly the fork range 193..=200.
    let blocks_to_fetch = synchronizer1
        .get_blocks_to_fetch(peer1, IBDState::Out)
        .unwrap();
    assert_eq!(
        blocks_to_fetch[0].first().unwrap(),
        &shared2.store().get_block_hash(193).unwrap()
    );
    assert_eq!(
        blocks_to_fetch[0].last().unwrap(),
        &shared2.store().get_block_hash(200).unwrap()
    );
    let mut fetched_blocks = Vec::new();
    for block_hash in &blocks_to_fetch[0] {
        fetched_blocks.push(shared2.store().get_block(block_hash).unwrap());
    }
    for block in &fetched_blocks {
        let block = SendBlockBuilder::default().block(block.data()).build();
        let nc = Arc::new(mock_network_context(1));
        assert_eq!(
            BlockProcess::new(block.as_reader(), &synchronizer1, peer1, nc).blocking_execute(),
            Status::ok(),
        );
    }
    // Clear in-flight state so the next get_blocks_to_fetch starts fresh.
    synchronizer1
        .shared()
        .state()
        .write_inflight_blocks()
        .remove_by_peer(peer1);
    insert_block(&synchronizer2.chain, &shared2, 201u128, 201);
    let headers = [synchronizer2.shared.active_chain().tip_header()];
    let sendheaders = SendHeadersBuilder::default()
        .headers(headers.iter().map(|h| h.data()).collect::<Vec<_>>())
        .build();
    assert_eq!(
        HeadersProcess::new(
            sendheaders.as_reader(),
            &synchronizer1,
            peer1,
            &(mock_nc as Arc<dyn ckb_network::CKBProtocolContext + Sync>)
        )
        .execute(),
        Status::ok(),
    );
    synchronizer1
        .get_blocks_to_fetch(peer1, IBDState::Out)
        .unwrap();
    let last_common_header2 = synchronizer1.peers().get_last_common_header(peer1).unwrap();
    assert_eq!(
        &last_common_header2.hash(),
        blocks_to_fetch[0].last().unwrap(),
        "last_common_header change because it update during get_blocks_to_fetch",
    );
}
// Eviction during IBD: outbound peers with an expired headers-sync controller
// are disconnected regardless of protect/whitelist flags, while a peer whose
// controller has not timed out survives.
#[test]
fn test_header_sync_timeout() {
    let _faketime_guard = ckb_systemtime::faketime();
    _faketime_guard.set_faketime(0);
    let (_chain, _, synchronizer) = start_chain(None);
    let network_context = Arc::new(mock_network_context(5));
    // Advance the clock far past the tip age so the node is in IBD.
    _faketime_guard.set_faketime(MAX_TIP_AGE * 2);
    assert!(
        synchronizer
            .shared
            .active_chain()
            .is_initial_block_download()
    );
    let peers = synchronizer.peers();
    {
        // `timeout` expired at t=0; `not_timeout` is anchored at the current
        // fake time and is therefore still live.
        let timeout = HeadersSyncController::new(0, 0, 0, 0, false);
        let not_timeout = HeadersSyncController::new(MAX_TIP_AGE * 2, 0, MAX_TIP_AGE * 2, 0, false);
        let mut state_0 = PeerState::default();
        state_0.peer_flags.is_protect = true;
        state_0.peer_flags.is_outbound = true;
        state_0.headers_sync_controller = Some(timeout);
        let mut state_1 = PeerState::default();
        state_1.peer_flags.is_outbound = true;
        state_1.headers_sync_controller = Some(timeout);
        let mut state_2 = PeerState::default();
        state_2.peer_flags.is_whitelist = true;
        state_2.peer_flags.is_outbound = true;
        state_2.headers_sync_controller = Some(timeout);
        let mut state_3 = PeerState::default();
        state_3.peer_flags.is_outbound = true;
        state_3.headers_sync_controller = Some(not_timeout);
        peers.state.insert(0.into(), state_0);
        peers.state.insert(1.into(), state_1);
        peers.state.insert(2.into(), state_2);
        peers.state.insert(3.into(), state_3);
    }
    synchronizer.eviction(&(Arc::clone(&network_context) as Arc<dyn CKBProtocolContext + Sync>));
    // Peers 0-2 timed out and are evicted; peer 3 remains connected.
    let disconnected = network_context.disconnected.lock();
    assert_eq!(
        disconnected.deref(),
        &vec![0, 1, 2].into_iter().map(Into::into).collect()
    )
}
// Chain-sync eviction protocol, driven with fake time across three phases:
// 1) first eviction pass stamps lagging unprotected outbound peers (3, 4)
//    with a chain_sync work header and timeout; nobody is disconnected;
// 2) after CHAIN_SYNC_TIMEOUT their timeout is extended by the
//    EVICTION_HEADERS_RESPONSE_TIME grace period;
// 3) after the grace period expires they are disconnected, while protected /
//    whitelisted / ahead-of-tip peers survive.
#[test]
fn test_chain_sync_timeout() {
    let _faketime_guard = ckb_systemtime::faketime();
    _faketime_guard.set_faketime(0);
    let consensus = Consensus::default();
    // Build a genesis with total difficulty 3 so per-peer best-known
    // difficulties of 1 and 3 fall below / at the local chain's work.
    let block = BlockBuilder::default()
        .compact_target(difficulty_to_compact(U256::from(3u64)))
        .transaction(consensus.genesis_block().transactions()[0].clone())
        .build();
    let consensus = ConsensusBuilder::default().genesis_block(block).build();
    let (_chain, shared, synchronizer) = start_chain(Some(consensus));
    assert_eq!(shared.snapshot().total_difficulty(), &U256::from(3u64));
    let network_context = Arc::new(mock_network_context(7));
    let peers = synchronizer.peers();
    let not_timeout = HeadersSyncController::new(MAX_TIP_AGE * 2, 0, MAX_TIP_AGE * 2, 0, false);
    let sync_protected_peer = 0.into();
    {
        // Peers 0-2: protected outbound. Peers 3-5: plain outbound.
        // Peer 6: whitelisted outbound.
        let mut state_0 = PeerState::default();
        state_0.peer_flags.is_protect = true;
        state_0.peer_flags.is_outbound = true;
        state_0.headers_sync_controller = Some(not_timeout);
        let mut state_1 = PeerState::default();
        state_1.peer_flags.is_protect = true;
        state_1.peer_flags.is_outbound = true;
        state_1.headers_sync_controller = Some(not_timeout);
        let mut state_2 = PeerState::default();
        state_2.peer_flags.is_protect = true;
        state_2.peer_flags.is_outbound = true;
        state_2.headers_sync_controller = Some(not_timeout);
        let mut state_3 = PeerState::default();
        state_3.peer_flags.is_outbound = true;
        state_3.headers_sync_controller = Some(not_timeout);
        let mut state_4 = PeerState::default();
        state_4.peer_flags.is_outbound = true;
        state_4.headers_sync_controller = Some(not_timeout);
        let mut state_5 = PeerState::default();
        state_5.peer_flags.is_outbound = true;
        state_5.headers_sync_controller = Some(not_timeout);
        let mut state_6 = PeerState::default();
        state_6.peer_flags.is_whitelist = true;
        state_6.peer_flags.is_outbound = true;
        state_6.headers_sync_controller = Some(not_timeout);
        peers.state.insert(0.into(), state_0);
        peers.state.insert(1.into(), state_1);
        peers.state.insert(2.into(), state_2);
        peers.state.insert(3.into(), state_3);
        peers.state.insert(4.into(), state_4);
        peers.state.insert(5.into(), state_5);
        peers.state.insert(6.into(), state_6);
    }
    // Peers 0 and 3 lag (difficulty 1 < 3); peers 2 and 5 match the local
    // difficulty (3); peers 1, 4, 6 have no best-known header at all.
    peers.may_set_best_known_header(0.into(), mock_header_index(1));
    peers.may_set_best_known_header(2.into(), mock_header_index(3));
    peers.may_set_best_known_header(3.into(), mock_header_index(1));
    peers.may_set_best_known_header(5.into(), mock_header_index(3));
    {
        peers
            .state
            .get_mut(&sync_protected_peer)
            .unwrap()
            .start_sync(not_timeout);
        synchronizer
            .shared()
            .state()
            .n_sync_started()
            .fetch_add(1, Ordering::AcqRel);
    }
    synchronizer.eviction(&(Arc::clone(&network_context) as Arc<dyn CKBProtocolContext + Sync>));
    {
        // Protected peer 0 keeps syncing; nothing is disconnected yet.
        assert!(
            peers
                .state
                .get(&sync_protected_peer)
                .unwrap()
                .sync_started(),
        );
        assert_eq!(
            synchronizer
                .shared()
                .state()
                .n_sync_started()
                .load(Ordering::Acquire),
            1
        );
        assert!({ network_context.disconnected.lock().is_empty() });
        // Peer 2 already matches local difficulty, so no work header is set.
        assert!(
            peers
                .state
                .get(&2.into())
                .unwrap()
                .chain_sync
                .work_header
                .is_none()
        );
        let (tip, total_difficulty) = {
            let snapshot = shared.snapshot();
            let header = snapshot.tip_header().to_owned();
            let total_difficulty = snapshot.total_difficulty().to_owned();
            (header, total_difficulty)
        };
        // Lagging unprotected peers 3 and 4 get stamped with the local tip
        // as the work header they must catch up to.
        assert_eq!(
            peers.state.get(&3.into()).unwrap().chain_sync.work_header,
            Some(tip.clone())
        );
        assert_eq!(
            peers
                .state
                .get(&3.into())
                .unwrap()
                .chain_sync
                .total_difficulty,
            Some(total_difficulty.clone())
        );
        assert_eq!(
            peers.state.get(&4.into()).unwrap().chain_sync.work_header,
            Some(tip)
        );
        assert_eq!(
            peers
                .state
                .get(&4.into())
                .unwrap()
                .chain_sync
                .total_difficulty,
            Some(total_difficulty)
        );
        // NOTE(review): `proto_id` here is actually iterating peer indexes,
        // not protocol ids — the name is a misnomer.
        for proto_id in &[0usize, 1, 3, 4, 6] {
            assert_eq!(
                peers
                    .state
                    .get(&(*proto_id).into())
                    .unwrap()
                    .chain_sync
                    .timeout,
                CHAIN_SYNC_TIMEOUT
            );
        }
    }
    // Phase 2: chain-sync timeout elapsed — grace period is granted.
    _faketime_guard.set_faketime(CHAIN_SYNC_TIMEOUT + 1);
    synchronizer.eviction(&(Arc::clone(&network_context) as Arc<dyn CKBProtocolContext + Sync>));
    {
        assert!({ network_context.disconnected.lock().is_empty() });
        assert_eq!(
            peers.state.get(&3.into()).unwrap().chain_sync.timeout,
            unix_time_as_millis() + EVICTION_HEADERS_RESPONSE_TIME
        );
        assert_eq!(
            peers.state.get(&4.into()).unwrap().chain_sync.timeout,
            unix_time_as_millis() + EVICTION_HEADERS_RESPONSE_TIME
        );
    }
    // Phase 3: grace period expired — peers 3 and 4 are evicted and the
    // protected peer's sync session is stopped.
    _faketime_guard.set_faketime(unix_time_as_millis() + EVICTION_HEADERS_RESPONSE_TIME + 1);
    synchronizer.eviction(&(Arc::clone(&network_context) as Arc<dyn CKBProtocolContext + Sync>));
    {
        assert!(
            !peers
                .state
                .get(&sync_protected_peer)
                .unwrap()
                .sync_started(),
        );
        assert_eq!(
            synchronizer
                .shared()
                .state()
                .n_sync_started()
                .load(Ordering::Acquire),
            0
        );
        let disconnected = network_context.disconnected.lock();
        assert_eq!(
            disconnected.deref(),
            &vec![3, 4].into_iter().map(Into::into).collect()
        )
    }
}
// Verifies the n_sync_started counter bookkeeping for a single protected
// peer through the eviction timeout sequence: the peer is never
// disconnected, but once its grace period lapses its sync session ends and
// the global counter drops back to 0.
#[test]
fn test_n_sync_started() {
    let _faketime_guard = ckb_systemtime::faketime();
    _faketime_guard.set_faketime(0);
    let consensus = Consensus::default();
    // Genesis with total difficulty 3 (same setup as test_chain_sync_timeout).
    let block = BlockBuilder::default()
        .compact_target(difficulty_to_compact(U256::from(3u64)))
        .transaction(consensus.genesis_block().transactions()[0].clone())
        .build();
    let consensus = ConsensusBuilder::default().genesis_block(block).build();
    let (_chain, shared, synchronizer) = start_chain(Some(consensus));
    assert_eq!(shared.snapshot().total_difficulty(), &U256::from(3u64));
    let network_context = Arc::new(mock_network_context(1));
    let peers = synchronizer.peers();
    let not_timeout = HeadersSyncController::new(MAX_TIP_AGE * 2, 0, MAX_TIP_AGE * 2, 0, false);
    let sync_protected_peer = 0.into();
    {
        let mut state_0 = PeerState::default();
        state_0.peer_flags.is_protect = true;
        state_0.peer_flags.is_outbound = true;
        state_0.headers_sync_controller = Some(not_timeout);
        peers.state.insert(0.into(), state_0);
    }
    {
        peers
            .state
            .get_mut(&sync_protected_peer)
            .unwrap()
            .start_sync(not_timeout);
        synchronizer
            .shared()
            .state()
            .n_sync_started()
            .fetch_add(1, Ordering::AcqRel);
    }
    synchronizer.eviction(&(Arc::clone(&network_context) as Arc<dyn CKBProtocolContext + Sync>));
    assert!({ network_context.disconnected.lock().is_empty() });
    // After the chain-sync timeout the protected peer is granted a grace
    // period instead of being disconnected.
    _faketime_guard.set_faketime(CHAIN_SYNC_TIMEOUT + 1);
    synchronizer.eviction(&(Arc::clone(&network_context) as Arc<dyn CKBProtocolContext + Sync>));
    {
        assert!({ network_context.disconnected.lock().is_empty() });
        assert_eq!(
            peers
                .state
                .get(&sync_protected_peer)
                .unwrap()
                .chain_sync
                .timeout,
            unix_time_as_millis() + EVICTION_HEADERS_RESPONSE_TIME
        );
    }
    // Grace period lapses: sync session ends, counter returns to 0, but the
    // protected peer stays connected.
    _faketime_guard.set_faketime(unix_time_as_millis() + EVICTION_HEADERS_RESPONSE_TIME + 1);
    synchronizer.eviction(&(network_context as Arc<dyn CKBProtocolContext + Sync>));
    {
        assert!(
            !peers
                .state
                .get(&sync_protected_peer)
                .unwrap()
                .sync_started(),
        );
        assert_eq!(
            synchronizer
                .shared()
                .state()
                .n_sync_started()
                .load(Ordering::Acquire),
            0
        );
    }
    // Exercise tip_synced on an already-stopped sync state; must not
    // underflow the counter or panic.
    let mut state = peers.state.get_mut(&sync_protected_peer).unwrap();
    synchronizer.shared().state().tip_synced(&mut state);
}
// Verifies `BlockFetcher::update_last_common_header` against a main chain
// M0..M6 and a fork F0..F7 that shares M0..M3 (fork point at height 3).
// Blocks are labeled "M<height>" / "F<height>"; each case sets a peer's
// (last_common, best_known) pair and checks the recomputed last common block.
#[test]
fn test_fix_last_common_header() {
    let m_ = |number| format!("M{number}");
    let f_ = |number| format!("F{number}");
    let mut graph = HashMap::new();
    let mut graph_exts = HashMap::new();
    let main_tip_number = 6u64;
    let fork_tip_number = 7u64;
    let fork_point = 3u64;
    // Build the main chain M0..M6 on a throwaway node and capture its blocks.
    {
        let (chain, shared, _) = start_chain(Some(Consensus::default()));
        for number in 1..=main_tip_number {
            insert_block(
                chain.chain_controller(),
                &shared,
                u128::from(number),
                number,
            );
        }
        for number in 0..=main_tip_number {
            let block_hash = shared.snapshot().get_block_hash(number).unwrap();
            let block = shared.snapshot().get_block(&block_hash).unwrap();
            let block_ext = shared.snapshot().get_block_ext(&block_hash).unwrap();
            graph.insert(m_(number), block);
            graph_exts.insert(m_(number), block_ext);
        }
    }
    // Build the fork F0..F7: `number % (fork_point + 1)` reproduces the main
    // chain's nonces for heights 1..=3, so F1..F3 == M1..M3, then diverges.
    {
        let (chain, shared, _) = start_chain(Some(Consensus::default()));
        for number in 1..=fork_tip_number {
            insert_block(
                chain.chain_controller(),
                &shared,
                u128::from(number % (fork_point + 1)),
                number,
            );
        }
        for number in 0..=fork_tip_number {
            let block_hash = shared.snapshot().get_block_hash(number).unwrap();
            let block = shared.snapshot().get_block(&block_hash).unwrap();
            let block_ext = shared.snapshot().get_block_ext(&block_hash).unwrap();
            graph.insert(f_(number), block);
            graph_exts.insert(f_(number), block_ext);
        }
    }
    // The node under test holds the main chain and knows the fork's headers.
    let (_chain, _, synchronizer) = start_chain(Some(Consensus::default()));
    for number in 1..=main_tip_number {
        let key = m_(number);
        let block = graph.get(&key).cloned().unwrap();
        synchronizer
            .chain
            .blocking_process_block(Arc::new(block))
            .unwrap();
    }
    {
        let nc = Arc::new(mock_network_context(1));
        let peer: PeerIndex = 0.into();
        let fork_headers = (1..=fork_tip_number)
            .map(|number| graph.get(&f_(number)).cloned().unwrap())
            .map(|block| block.header().data())
            .collect::<Vec<_>>();
        let sendheaders = SendHeadersBuilder::default().headers(fork_headers).build();
        synchronizer.on_connected(nc.as_ref(), peer);
        assert!(
            HeadersProcess::new(
                sendheaders.as_reader(),
                &synchronizer,
                peer,
                &(nc as Arc<dyn ckb_network::CKBProtocolContext + Sync>)
            )
            .execute()
            .is_ok()
        );
    }
    // (last_common, best_known, expected fixed last_common) triples.
    let cases = vec![
        (None, "M2", Some("M2")),
        (None, "F5", Some("M3")),
        (None, "M5", Some("M5")),
        (Some("M1"), "M5", Some("M1")),
        (Some("M1"), "F7", Some("M1")),
        (Some("M4"), "F7", Some("M3")),
        (Some("F4"), "M6", Some("M3")),
        (Some("F4"), "F7", Some("F4")),
        (Some("F7"), "M6", Some("M3")), ];
    let nc = mock_network_context(cases.len());
    for (case, (last_common, best_known, fix_last_common)) in cases.into_iter().enumerate() {
        let peer: PeerIndex = case.into();
        synchronizer.on_connected(&nc, peer);
        let last_common_header = last_common.map(|key| graph.get(key).cloned().unwrap().header());
        let best_known_header = {
            let header = graph.get(best_known).cloned().unwrap().header();
            let total_difficulty = graph_exts
                .get(best_known)
                .cloned()
                .unwrap()
                .total_difficulty;
            HeaderIndex::new(header.number(), header.hash(), total_difficulty)
        };
        if let Some(mut state) = synchronizer.shared.state().peers().state.get_mut(&peer) {
            state.last_common_header = last_common_header.map(Into::into);
            state.best_known_header = Some(best_known_header.clone());
        }
        let expected = fix_last_common.map(|mark| mark.to_string());
        // Map the resulting header back to its "M.."/"F.." label: if the hash
        // doesn't match the main-chain block at that height, it's a fork block.
        let actual = BlockFetcher::new(Arc::clone(&synchronizer.shared), peer, IBDState::In)
            .update_last_common_header(&best_known_header.number_and_hash())
            .map(|header| {
                if graph
                    .get(&m_(header.number()))
                    .map(|b| b.hash() != header.hash())
                    .unwrap_or(false)
                {
                    f_(header.number())
                } else {
                    m_(header.number())
                }
            });
        assert_eq!(
            expected, actual,
            "Case: {case}, last_common: {last_common:?}, best_known: {best_known:?}, expected: {expected:?}, actual: {actual:?}"
        );
    }
}
// Verifies `GetBlocksProcess` rejects malformed requests: asking for the
// genesis block and asking for the same block twice both return error
// statuses instead of blocks.
#[test]
fn get_blocks_process() {
    let consensus = Consensus::default();
    let (chain, shared, synchronizer) = start_chain(Some(consensus));
    let num = 2;
    for i in 1..num {
        insert_block(chain.chain_controller(), &shared, u128::from(i), i);
    }
    // Requesting genesis is never valid — peers already have it.
    let genesis_hash = shared.consensus().genesis_hash();
    let message_with_genesis = packed::GetBlocks::new_builder()
        .block_hashes(vec![genesis_hash])
        .build();
    let nc = Arc::new(mock_network_context(1)) as Arc<dyn CKBProtocolContext + Sync + 'static>;
    let peer: PeerIndex = 1.into();
    let process = GetBlocksProcess::new(message_with_genesis.as_reader(), &synchronizer, peer, &nc);
    assert_eq!(
        process.execute(),
        StatusCode::RequestGenesis.with_context("Request genesis block")
    );
    // Duplicate hashes in one request are also rejected.
    let hash = shared.snapshot().get_block_hash(1).unwrap();
    let message_with_dup = packed::GetBlocks::new_builder()
        .block_hashes(vec![hash.clone(), hash])
        .build();
    let nc = Arc::new(mock_network_context(1)) as Arc<dyn CKBProtocolContext + Sync + 'static>;
    let peer: PeerIndex = 1.into();
    let process = GetBlocksProcess::new(message_with_dup.as_reader(), &synchronizer, peer, &nc);
    assert_eq!(
        process.execute(),
        StatusCode::RequestDuplicate.with_context("Request duplicate block")
    );
}
// Verifies that a database-level failure raised by the chain controller
// surfaces through `blocking_insert_new_block` as an internal DB error.
// Uses a `faux` mock of `ChainController` to inject the failure.
#[test]
fn test_internal_db_error() {
    use ckb_error::is_internal_db_error;
    let consensus = Consensus::default();
    let mut builder = SharedBuilder::with_temp_db();
    builder = builder.consensus(consensus);
    let (shared, mut pack) = builder.build().unwrap();
    let sync_shared = Arc::new(SyncShared::new(
        shared,
        Default::default(),
        pack.take_relay_tx_receiver(),
    ));
    let mut chain_controller = ChainController::faux();
    let block = Arc::new(BlockBuilder::default().build());
    // Stub the controller to fail with a mocked database error.
    faux::when!(chain_controller.blocking_process_block(Arc::clone(&block))).then_return(Err(
        InternalErrorKind::Database.other("mocked db error").into(),
    ));
    let synchronizer = Synchronizer::new(chain_controller, sync_shared);
    let status = synchronizer
        .shared()
        .blocking_insert_new_block(&synchronizer.chain, Arc::clone(&block));
    // The injected DB error must propagate, not be swallowed or reclassified.
    assert!(is_internal_db_error(&status.err().unwrap()));
}