use crate::synchronizer::{
IBD_BLOCK_FETCH_TOKEN, NOT_IBD_BLOCK_FETCH_TOKEN, SEND_GET_HEADERS_TOKEN,
TIMEOUT_EVICTION_TOKEN,
};
use crate::tests::TestNode;
use crate::{SyncShared, Synchronizer};
use ckb_chain::chain::ChainService;
use ckb_chain_spec::consensus::ConsensusBuilder;
use ckb_channel::bounded;
use ckb_dao::DaoCalculator;
use ckb_dao_utils::genesis_dao_data;
use ckb_network::SupportProtocols;
use ckb_reward_calculator::RewardCalculator;
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_systemtime::{self, unix_time_as_millis};
use ckb_test_chain_utils::always_success_cell;
use ckb_types::prelude::*;
use ckb_types::{
bytes::Bytes,
core::{cell::resolve_transaction, BlockBuilder, EpochNumberWithFraction, TransactionBuilder},
packed::{self, CellInput, CellOutputBuilder, OutPoint},
utilities::difficulty_to_compact,
U256,
};
use ckb_util::RwLock;
use ckb_verification_traits::Switch;
use std::collections::HashSet;
use std::sync::Arc;
// Capacity passed to `bounded` for the signal channels created in `basic_sync`.
const DEFAULT_CHANNEL: usize = 128;
/// Connects a node whose chain was extended by 1 block to a node whose chain
/// was extended by 3 blocks over the sync protocol, then checks that both
/// report a tip number of 3.
#[test]
fn basic_sync() {
    // Pin the fake clock to 0 for the duration of the test.
    let faketime_guard = ckb_systemtime::faketime();
    faketime_guard.set_faketime(0);
    let thread_name = String::from("fake_time=0");

    let (mut lagging_node, lagging_shared) = setup_node(1);
    let (mut leading_node, leading_shared) = setup_node(3);
    lagging_node.connect(&mut leading_node, SupportProtocols::Sync.protocol_id());

    // The predicate is true only for a `SendBlock` message carrying block
    // number 3; presumably `start` signals on `block_seen_tx` when the
    // predicate returns true -- confirm against `TestNode::start`.
    let (block_seen_tx, block_seen_rx) = bounded(DEFAULT_CHANNEL);
    lagging_node.start(thread_name.clone(), block_seen_tx, |data| {
        let message = packed::SyncMessage::from_compatible_slice(&data)
            .expect("sync message")
            .to_enum();
        match message.as_reader() {
            packed::SyncMessageUnionReader::SendBlock(send_block) => {
                let received = send_block.block().to_entity().into_view();
                received.header().number() == 3
            }
            _ => false,
        }
    });

    // The second node's predicate never matches, and the receiving half of
    // its channel is dropped right away.
    let (unused_tx, _) = bounded(DEFAULT_CHANNEL);
    leading_node.start(thread_name, unused_tx, |_| false);

    // Block until the first node signals (or its sender goes away); the
    // result itself is intentionally ignored.
    let _ = block_seen_rx.recv();
    lagging_node.stop();
    leading_node.stop();

    let lagging_tip = lagging_shared.snapshot().tip_number();
    assert_eq!(lagging_tip, 3);
    assert_eq!(lagging_tip, leading_shared.snapshot().tip_number());
}
fn setup_node(height: u64) -> (TestNode, Shared) {
let (always_success_cell, always_success_cell_data, always_success_script) =
always_success_cell();
let always_success_tx = TransactionBuilder::default()
.witness(always_success_script.clone().into_witness())
.input(CellInput::new(OutPoint::null(), 0))
.output(always_success_cell.clone())
.output_data(always_success_cell_data.pack())
.build();
let dao = genesis_dao_data(vec![&always_success_tx]).unwrap();
let mut block = BlockBuilder::default()
.timestamp(unix_time_as_millis().pack())
.compact_target(difficulty_to_compact(U256::from(1000u64)).pack())
.dao(dao)
.transaction(always_success_tx)
.build();
let consensus = ConsensusBuilder::default()
.genesis_block(block.clone())
.cellbase_maturity(EpochNumberWithFraction::new(0, 0, 1))
.build();
let (shared, mut pack) = SharedBuilder::with_temp_db()
.consensus(consensus)
.build()
.unwrap();
let chain_service = ChainService::new(shared.clone(), pack.take_proposal_table());
let chain_controller = chain_service.start::<&str>(None);
for _i in 0..height {
let number = block.header().number() + 1;
let timestamp = block.header().timestamp() + 1;
let snapshot = shared.snapshot();
let epoch = snapshot
.consensus()
.next_epoch_ext(&block.header(), &snapshot.borrow_as_data_loader())
.unwrap()
.epoch();
let (_, reward) = RewardCalculator::new(snapshot.consensus(), snapshot.as_ref())
.block_reward_to_finalize(&block.header())
.unwrap();
let builder = TransactionBuilder::default()
.input(CellInput::new_cellbase_input(number))
.witness(always_success_script.to_owned().into_witness());
let cellbase = if number <= snapshot.consensus().finalization_delay_length() {
builder.build()
} else {
builder
.output(
CellOutputBuilder::default()
.capacity(reward.total.pack())
.lock(always_success_script.to_owned())
.build(),
)
.output_data(Bytes::default().pack())
.build()
};
let dao = {
let resolved_cellbase = resolve_transaction(
cellbase.clone(),
&mut HashSet::new(),
snapshot.as_ref(),
snapshot.as_ref(),
)
.unwrap();
let data_loader = snapshot.borrow_as_data_loader();
DaoCalculator::new(shared.consensus(), &data_loader)
.dao_field([resolved_cellbase].iter(), &block.header())
.unwrap()
};
let chain_root = snapshot
.chain_root_mmr(block.header().number())
.get_root()
.expect("chain root_mmr");
let bytes = chain_root.calc_mmr_hash().as_bytes().pack();
block = BlockBuilder::default()
.transaction(cellbase)
.parent_hash(block.header().hash())
.number(number.pack())
.epoch(epoch.number_with_fraction(number).pack())
.timestamp(timestamp.pack())
.compact_target(epoch.compact_target().pack())
.dao(dao)
.extension(Some(bytes))
.build();
chain_controller
.internal_process_block(Arc::new(block.clone()), Switch::DISABLE_ALL)
.expect("process block should be OK");
}
let sync_shared = Arc::new(SyncShared::new(
shared.clone(),
Default::default(),
pack.take_relay_tx_receiver(),
));
let synchronizer = Synchronizer::new(chain_controller, sync_shared);
let mut node = TestNode::new();
let protocol = Arc::new(RwLock::new(synchronizer)) as Arc<_>;
node.add_protocol(
SupportProtocols::Sync.protocol_id(),
&protocol,
&[
SEND_GET_HEADERS_TOKEN,
IBD_BLOCK_FETCH_TOKEN,
NOT_IBD_BLOCK_FETCH_TOKEN,
TIMEOUT_EVICTION_TOKEN,
],
);
(node, shared)
}