#![allow(clippy::indexing_slicing)]
use std::{sync::Arc, time::Duration};
use tari_common::configuration::Network;
use tari_common_types::types::HashOutput;
use tari_comms::peer_manager::NodeId;
use tari_core::{
base_node::{
BaseNodeStateMachine,
BaseNodeStateMachineConfig,
SyncValidators,
chain_metadata_service::PeerChainMetadata,
state_machine_service::states::{
BlockSync,
DecideNextSync,
HeaderSyncState,
HorizonStateSync,
StateEvent,
StatusInfo,
},
sync::SyncPeer,
},
chain_storage::{BlockchainDatabaseConfig, DbTransaction},
consensus::{BaseNodeConsensusManager, BaseNodeConsensusManagerBuilder},
mempool::MempoolServiceConfig,
proof_of_work::randomx_factory::RandomXFactory,
test_helpers::blockchain::TempDatabase,
validation::mocks::MockValidator,
};
use tari_node_components::blocks::ChainBlock;
use tari_p2p::{P2pConfig, services::liveness::LivenessConfig};
use tari_shutdown::Shutdown;
use tari_transaction_components::{
key_manager::KeyManager,
tari_amount::T,
tari_proof_of_work::Difficulty,
test_helpers::schema_to_transaction,
transaction_components::{Transaction, WalletOutput},
txn_schema,
};
use tempfile::tempdir;
use tokio::sync::{broadcast, watch};
use crate::helpers::{
block_builders::{append_block, create_genesis_block},
nodes::{NodeInterfaces, create_network_with_multiple_base_nodes_with_config},
sample_blockchains,
};
/// Builds a `HeaderSyncState` that will sync headers from `peer_node_interfaces`.
///
/// The sync-peer list holds a single entry derived from the peer's node id and
/// its current chain metadata; the local node's chain metadata seeds the state.
/// Panics if either node's chain metadata cannot be read.
pub fn initialize_sync_headers_with_ping_pong_data(
    local_node_interfaces: &NodeInterfaces,
    peer_node_interfaces: &NodeInterfaces,
) -> HeaderSyncState {
    let peer_node_id = peer_node_interfaces.node_identity.node_id().clone();
    let peer_metadata = peer_node_interfaces.blockchain_db.get_chain_metadata().unwrap();
    let sync_peer = SyncPeer::from(PeerChainMetadata::new(peer_node_id, peer_metadata, None));
    let local_metadata = local_node_interfaces.blockchain_db.get_chain_metadata().unwrap();
    HeaderSyncState::new(vec![sync_peer], local_metadata)
}
/// Advances the header-sync state by one step against `state_machine` and
/// returns the resulting `StateEvent`.
pub async fn sync_headers_execute(
state_machine: &mut BaseNodeStateMachine<TempDatabase>,
header_sync: &mut HeaderSyncState,
) -> StateEvent {
header_sync.next_event(state_machine).await
}
/// Builds a `BlockSync` state targeting `peer_node_interfaces` as the sole sync
/// peer. Panics if the peer's chain metadata cannot be read.
pub fn initialize_sync_blocks(peer_node_interfaces: &NodeInterfaces) -> BlockSync {
    let node_id = peer_node_interfaces.node_identity.node_id().clone();
    let metadata = peer_node_interfaces.blockchain_db.get_chain_metadata().unwrap();
    let sync_peers = vec![SyncPeer::from(PeerChainMetadata::new(node_id, metadata, None))];
    BlockSync::from(sync_peers)
}
/// Advances the block-sync state by one step against `state_machine` and
/// returns the resulting `StateEvent`.
pub async fn sync_blocks_execute(
state_machine: &mut BaseNodeStateMachine<TempDatabase>,
block_sync: &mut BlockSync,
) -> StateEvent {
block_sync.next_event(state_machine).await
}
/// Runs the `DecideNextSync` decision step seeded from a completed header sync
/// and returns the chosen next `StateEvent` (e.g. proceed to horizon or block
/// sync).
pub async fn decide_horizon_sync(
    local_state_machine: &mut BaseNodeStateMachine<TempDatabase>,
    local_header_sync: HeaderSyncState,
) -> StateEvent {
    // `local_header_sync` is owned by this function, so it can be moved into
    // the decision state directly; the previous `.clone()` was redundant.
    let mut next_sync = DecideNextSync::from(local_header_sync);
    next_sync.next_event(local_state_machine).await
}
/// Builds a `HorizonStateSync` state directly (without first running header
/// sync) that targets `peer_node_interfaces` as the sole sync peer.
/// Panics if the peer's chain metadata cannot be read.
pub fn initialize_horizon_sync_without_header_sync(peer_node_interfaces: &NodeInterfaces) -> HorizonStateSync {
    let node_id = peer_node_interfaces.node_identity.node_id().clone();
    let metadata = peer_node_interfaces.blockchain_db.get_chain_metadata().unwrap();
    let sync_peers = vec![SyncPeer::from(PeerChainMetadata::new(node_id, metadata, None))];
    HorizonStateSync::from(sync_peers)
}
/// Advances the horizon-sync state by one step against `state_machine` and
/// returns the resulting `StateEvent`.
pub async fn horizon_sync_execute(
state_machine: &mut BaseNodeStateMachine<TempDatabase>,
horizon_sync: &mut HorizonStateSync,
) -> StateEvent {
horizon_sync.next_event(state_machine).await
}
/// Spins up one base node per entry in `blockchain_db_configs` (minimum 2) on a
/// `LocalNet` network, with liveness auto-ping every 100 ms, and constructs a
/// `BaseNodeStateMachine` per node using always-pass `MockValidator` sync
/// validators.
///
/// Returns the state machines, the node interfaces, the genesis block, the
/// consensus manager, the key manager and the genesis coinbase output.
///
/// # Panics
/// Panics if fewer than 2 database configs are supplied or if any setup step
/// fails (`unwrap` is used throughout, as is usual for test helpers).
pub async fn create_network_with_multiple_nodes(
blockchain_db_configs: Vec<BlockchainDatabaseConfig>,
) -> (
Vec<BaseNodeStateMachine<TempDatabase>>,
Vec<NodeInterfaces>,
ChainBlock,
BaseNodeConsensusManager,
KeyManager,
WalletOutput,
) {
let num_nodes = blockchain_db_configs.len();
if num_nodes < 2 {
panic!("Must have at least 2 nodes");
}
let network = Network::LocalNet;
let temp_dir = tempdir().unwrap();
let key_manager = KeyManager::new_random().unwrap();
let consensus_constants = sample_blockchains::consensus_constants(network).build();
// Genesis block and its coinbase are shared by every node in the network.
let (initial_block, coinbase_wallet_output) = create_genesis_block(&consensus_constants, &key_manager);
let consensus_manager = BaseNodeConsensusManagerBuilder::new(network)
.add_consensus_constants(consensus_constants)
.with_block(initial_block.clone())
.build()
.unwrap();
let (node_interfaces, consensus_manager) = create_network_with_multiple_base_nodes_with_config(
vec![MempoolServiceConfig::default(); num_nodes],
vec![
LivenessConfig {
// Fast pings so peers exchange chain metadata quickly in tests.
auto_ping_interval: Some(Duration::from_millis(100)),
..Default::default()
};
num_nodes
],
blockchain_db_configs,
vec![P2pConfig::default(); num_nodes],
consensus_manager,
temp_dir.path().to_str().unwrap(),
network,
)
.await;
// NOTE(review): `shutdown` is dropped when this function returns; if
// `Shutdown` triggers its signal on drop, the returned state machines would
// observe an immediate shutdown — confirm the intended lifetime.
let shutdown = Shutdown::new();
let mut state_machines = Vec::with_capacity(num_nodes);
for node_interface in node_interfaces.iter().take(num_nodes) {
let (state_change_event_publisher, _) = broadcast::channel(10);
let (status_event_sender, _status_event_receiver) = watch::channel(StatusInfo::new());
state_machines.push(BaseNodeStateMachine::new(
node_interface.blockchain_db.clone().into(),
node_interface.local_nci.clone(),
node_interface.comms.connectivity(),
node_interface.chain_metadata_handle.get_event_stream(),
node_interface.comms.peer_manager(),
node_interface.dht.subscribe_dht_events(),
BaseNodeStateMachineConfig::default(),
// Both block and header validators always pass in these tests.
SyncValidators::new(MockValidator::new(true), MockValidator::new(true)),
status_event_sender,
state_change_event_publisher,
RandomXFactory::default(),
consensus_manager.clone(),
shutdown.to_signal(),
));
}
(
state_machines,
node_interfaces,
initial_block,
consensus_manager,
key_manager,
coinbase_wallet_output,
)
}
/// Selects which chain data `delete_some_blocks_and_headers` removes.
#[allow(dead_code)]
#[derive(Debug)]
pub enum WhatToDelete {
/// Delete both the block bodies and their headers.
BlocksAndHeaders,
/// Delete the block bodies only, leaving the headers in place.
Blocks,
/// Delete the headers only, leaving the block bodies in place.
Headers,
}
/// Queues deletions for `blocks[index]` in `txn`: removes it as tip block and
/// as orphan, then rewinds the best-block pointer to `blocks[index + 1]`.
///
/// NOTE(review): the caller (`delete_some_blocks_and_headers`) reverses the
/// slice before calling, so `blocks[index + 1]` is the deleted block's parent
/// and `index + 1` stays in bounds because iteration stops one short of the
/// end — confirm before reusing this helper elsewhere.
fn delete_block(txn: &mut DbTransaction, node: &NodeInterfaces, blocks: &[ChainBlock], index: usize) {
txn.delete_tip_block(*blocks[index].hash());
txn.delete_orphan(*blocks[index].hash());
// The "previous block" hash written here is the node's *current* best-block
// hash from metadata, not the parent's hash field.
txn.set_best_block(
blocks[index + 1].height(),
blocks[index + 1].accumulated_data().hash,
blocks[index + 1].accumulated_data().total_accumulated_difficulty,
*node.blockchain_db.get_chain_metadata().unwrap().best_block_hash(),
blocks[index + 1].to_chain_header().timestamp(),
);
}
/// Removes blocks and/or headers (per `instruction`) from `node`'s database for
/// every entry in `blocks_with_anchor` except the first (the anchor, which
/// remains as the new tip). Deletion proceeds from the chain tip downwards and
/// each removal is verified with a read-back assertion.
///
/// # Panics
/// Panics if fewer than 2 blocks are supplied, if a database write fails, or
/// if a deletion is not observable afterwards.
pub fn delete_some_blocks_and_headers(
    blocks_with_anchor: &[ChainBlock],
    instruction: WhatToDelete,
    node: &NodeInterfaces,
) {
    // `len() < 2` already covers the empty slice; the previous separate
    // `is_empty()` test was redundant.
    assert!(
        blocks_with_anchor.len() >= 2,
        "blocks must have at least 2 elements"
    );
    // Work tip-first so `delete_block` can treat the next slice entry as the
    // deleted block's parent.
    let mut blocks: Vec<_> = blocks_with_anchor.to_vec();
    blocks.reverse();
    for i in 0..blocks.len() - 1 {
        let mut txn = DbTransaction::new();
        match instruction {
            WhatToDelete::BlocksAndHeaders => {
                delete_block(&mut txn, node, &blocks, i);
                txn.delete_header(blocks[i].height());
            },
            WhatToDelete::Blocks => {
                delete_block(&mut txn, node, &blocks, i);
            },
            WhatToDelete::Headers => {
                txn.delete_header(blocks[i].height());
            },
        }
        node.blockchain_db.write(txn).unwrap();
        // Read-back checks, shared by the match arms below.
        let hash = *blocks[i].hash();
        let block_gone = || {
            !node
                .blockchain_db
                .chain_block_or_orphan_block_exists(hash)
                .unwrap()
        };
        let header_gone = || {
            node.blockchain_db
                .fetch_header_by_block_hash(hash)
                .unwrap()
                .is_none()
        };
        match instruction {
            WhatToDelete::BlocksAndHeaders => {
                assert!(block_gone());
                assert!(header_gone());
            },
            WhatToDelete::Blocks => assert!(block_gone()),
            WhatToDelete::Headers => assert!(header_gone()),
        }
    }
}
/// Overwrites the best-block pointer in `node`'s database with the given
/// block's height, hash, accumulated difficulty and timestamp, using
/// `previous_block_hash` as the recorded previous-block hash.
/// Panics if the database write fails.
#[allow(dead_code)]
pub fn set_best_block(block: &ChainBlock, previous_block_hash: &HashOutput, node: &NodeInterfaces) {
    let mut transaction = DbTransaction::new();
    let accumulated = block.accumulated_data();
    transaction.set_best_block(
        block.height(),
        accumulated.hash,
        accumulated.total_accumulated_difficulty,
        *previous_block_hash,
        block.to_chain_header().timestamp(),
    );
    node.blockchain_db.write(transaction).unwrap();
}
/// Re-adds already-constructed blocks to `node`'s database, panicking if any
/// insertion fails. The add result itself is discarded — only success matters.
pub fn add_some_existing_blocks(blocks: &[ChainBlock], node: &NodeInterfaces) {
    blocks.iter().for_each(|block| {
        let _res = node
            .blockchain_db
            .add_block(block.block().clone().into())
            .unwrap();
    });
}
/// Appends `number_of_blocks` blocks to `node`'s chain on top of `start_block`,
/// using the given per-block `difficulties` and optional per-block
/// `transactions` (defaults to empty transaction lists).
///
/// Returns the resulting chain *including* `start_block`/`start_coinbase` as
/// the first entries, so the output vectors have `number_of_blocks + 1`
/// elements each.
///
/// # Panics
/// Panics if `difficulties`/`transactions` lengths disagree with
/// `number_of_blocks`, or if appending a block fails.
pub fn create_and_add_some_blocks(
    node: &NodeInterfaces,
    start_block: &ChainBlock,
    start_coinbase: &WalletOutput,
    number_of_blocks: usize,
    consensus_manager: &BaseNodeConsensusManager,
    key_manager: &KeyManager,
    difficulties: &[u64],
    transactions: &Option<Vec<Vec<Transaction>>>,
) -> (Vec<ChainBlock>, Vec<WalletOutput>) {
    let transactions = transactions
        .clone()
        .unwrap_or_else(|| vec![vec![]; number_of_blocks]);
    assert!(
        number_of_blocks == difficulties.len() && number_of_blocks == transactions.len(),
        "Number of blocks ({}), transactions length ({}) and difficulties length ({}) must be equal",
        number_of_blocks,
        transactions.len(),
        difficulties.len()
    );
    // Pre-size the output vectors: anchor entry + one entry per new block.
    let mut blocks = Vec::with_capacity(number_of_blocks + 1);
    let mut coinbases = Vec::with_capacity(number_of_blocks + 1);
    blocks.push(start_block.clone());
    coinbases.push(start_coinbase.clone());
    let mut prev_block = start_block.clone();
    for (difficulty, txns) in difficulties.iter().zip(transactions.iter()) {
        let (new_block, coinbase) = append_block(
            &node.blockchain_db,
            &prev_block,
            txns.clone(),
            consensus_manager,
            Difficulty::from_u64(*difficulty).unwrap(),
            key_manager,
        )
        .unwrap();
        // Push a clone, then move the block into `prev_block` and the coinbase
        // into the output — one clone per iteration instead of three.
        blocks.push(new_block.clone());
        prev_block = new_block;
        coinbases.push(coinbase);
    }
    (blocks, coinbases)
}
/// Polls `this_node`'s peer manager every 100 ms for up to `seconds` seconds,
/// returning `true` as soon as `peer_node_id` is banned and `false` on timeout.
///
/// Unlike the previous version, the ban status is checked *before* the first
/// sleep, so an already-banned peer is reported immediately and `seconds == 0`
/// still performs one check.
///
/// # Panics
/// Panics if the peer-manager ban query fails.
pub async fn wait_for_is_peer_banned(this_node: &NodeInterfaces, peer_node_id: &NodeId, seconds: u64) -> bool {
    let interval_ms = 100;
    let intervals = seconds * 1000 / interval_ms;
    for i in 0..=intervals {
        // Sleep between polls only — never before the first one.
        if i > 0 {
            tokio::time::sleep(Duration::from_millis(interval_ms)).await;
        }
        if this_node
            .comms
            .peer_manager()
            .is_peer_banned(peer_node_id)
            .await
            .unwrap()
        {
            return true;
        }
    }
    false
}
/// Returns the human-readable name of a `StateEvent` variant, e.g. for test
/// assertions and log output.
pub fn state_event(event: &StateEvent) -> String {
    // Map each variant to its static name once, then allocate a single String.
    let name = match event {
        StateEvent::Initialized(_) => "Initialized",
        StateEvent::HeadersSynchronized(_, _) => "HeadersSynchronized",
        StateEvent::HeaderSyncFailed(_) => "HeaderSyncFailed",
        StateEvent::ProceedToHorizonSync(_) => "ProceedToHorizonSync",
        StateEvent::ProceedToBlockSync(_) => "ProceedToBlockSync",
        StateEvent::HorizonStateSynchronized => "HorizonStateSynchronized",
        StateEvent::HorizonStateSyncFailure => "HorizonStateSyncFailure",
        StateEvent::BlocksSynchronized => "BlocksSynchronized",
        StateEvent::BlockSyncFailed => "BlockSyncFailed",
        StateEvent::FallenBehind(_) => "FallenBehind",
        StateEvent::NetworkSilence => "NetworkSilence",
        StateEvent::FatalError(_) => "FatalError",
        StateEvent::Continue => "Continue",
        StateEvent::UserQuit => "UserQuit",
    };
    name.to_string()
}
/// Builds a three-phase blockchain on `node`:
/// 1. `spend_genesis_coinbase_in_block - 1` empty blocks;
/// 2. blocks up to height `follow_up_transaction_in_block - 1`, the first of
///    which spends the genesis coinbase into 10 outputs of 1 T;
/// 3. the remaining blocks up to `number_of_blocks`, the first of which spends
///    `follow_up_coinbases_to_spend` earlier coinbases into 20 outputs of 1 T.
///
/// All blocks use difficulty 3. Returns the full chain anchored at
/// `initial_block` together with the matching coinbase outputs.
///
/// # Panics
/// Panics if the height/transaction layout parameters are inconsistent (see
/// the assertions below) or if any block fails to append.
pub fn create_block_chain_with_transactions(
    node: &NodeInterfaces,
    initial_block: &ChainBlock,
    initial_coinbase: &WalletOutput,
    consensus_manager: &BaseNodeConsensusManager,
    key_manager: &KeyManager,
    intermediate_height: u64,
    number_of_blocks: usize,
    spend_genesis_coinbase_in_block: usize,
    follow_up_transaction_in_block: usize,
    follow_up_coinbases_to_spend: usize,
) -> (Vec<ChainBlock>, Vec<WalletOutput>) {
    // Parameter sanity: the two spending blocks must straddle
    // `intermediate_height` and leave room for the final phase.
    assert!(spend_genesis_coinbase_in_block > 1);
    assert!((spend_genesis_coinbase_in_block as u64) < intermediate_height);
    assert!(follow_up_transaction_in_block > spend_genesis_coinbase_in_block + 1);
    assert!((follow_up_transaction_in_block as u64) > intermediate_height);
    assert!(number_of_blocks as u64 > follow_up_transaction_in_block as u64 + intermediate_height + 1);
    let add_blocks_a = spend_genesis_coinbase_in_block - 1;
    let add_blocks_b = follow_up_transaction_in_block - 1 - add_blocks_a;
    let add_blocks_c = number_of_blocks - add_blocks_a - add_blocks_b;
    assert!(follow_up_coinbases_to_spend > add_blocks_a);
    assert!(follow_up_coinbases_to_spend < follow_up_transaction_in_block);

    // Phase A: empty blocks up to (but excluding) the genesis-coinbase spend.
    let (blocks_a, coinbases_a) = create_and_add_some_blocks(
        node,
        initial_block,
        initial_coinbase,
        add_blocks_a,
        consensus_manager,
        key_manager,
        &vec![3; add_blocks_a],
        &None,
    );
    assert_eq!(node.blockchain_db.get_height().unwrap(), add_blocks_a as u64);
    assert_eq!(
        node.blockchain_db.fetch_last_header().unwrap().height,
        add_blocks_a as u64
    );

    // Phase B: the first block spends the genesis coinbase into 10 outputs.
    let schema = txn_schema!(
        from: vec![initial_coinbase.clone()],
        to: vec![1 * T; 10]
    );
    let (txns_genesis_coinbase, _outputs) = schema_to_transaction(&[schema], key_manager);
    let mut txns_all = vec![vec![]; add_blocks_b];
    txns_all[0] = txns_genesis_coinbase
        .into_iter()
        .map(|t| Arc::try_unwrap(t).unwrap())
        .collect::<Vec<_>>();
    let (blocks_b, coinbases_b) = create_and_add_some_blocks(
        node,
        &blocks_a[blocks_a.len() - 1],
        &coinbases_a[coinbases_a.len() - 1],
        add_blocks_b,
        consensus_manager,
        key_manager,
        &vec![3; add_blocks_b],
        &Some(txns_all),
    );
    assert_eq!(
        node.blockchain_db.get_height().unwrap(),
        (add_blocks_a + add_blocks_b) as u64
    );
    assert_eq!(
        node.blockchain_db.fetch_last_header().unwrap().height,
        (add_blocks_a + add_blocks_b) as u64
    );

    // Gather the coinbases to spend next, skipping each phase's anchor entry.
    // One chained iterator replaces the previous pair of push loops: since
    // `follow_up_coinbases_to_spend > add_blocks_a`, the `take` consumes all of
    // phase A's coinbases and then exactly the shortfall from phase B.
    let coinbases_to_spend: Vec<_> = coinbases_a
        .iter()
        .skip(1)
        .chain(coinbases_b.iter().skip(1))
        .take(follow_up_coinbases_to_spend)
        .cloned()
        .collect();
    assert_eq!(coinbases_to_spend.len(), follow_up_coinbases_to_spend);

    // Phase C: the first block spends those coinbases into 20 outputs.
    let schema = txn_schema!(
        from: coinbases_to_spend,
        to: vec![1 * T; 20]
    );
    let (txns_additional_coinbases, _outputs) = schema_to_transaction(&[schema], key_manager);
    let mut txns_all = vec![vec![]; add_blocks_c];
    txns_all[0] = txns_additional_coinbases
        .into_iter()
        .map(|t| Arc::try_unwrap(t).unwrap())
        .collect::<Vec<_>>();
    let (blocks_c, coinbases_c) = create_and_add_some_blocks(
        node,
        &blocks_b[blocks_b.len() - 1],
        &coinbases_b[coinbases_b.len() - 1],
        add_blocks_c,
        consensus_manager,
        key_manager,
        &vec![3; add_blocks_c],
        &Some(txns_all),
    );
    assert_eq!(node.blockchain_db.get_height().unwrap(), number_of_blocks as u64);
    assert_eq!(
        node.blockchain_db.fetch_last_header().unwrap().height,
        number_of_blocks as u64
    );

    // Stitch the phases together, dropping each later phase's duplicated anchor.
    let blocks = [&blocks_a[..], &blocks_b[1..], &blocks_c[1..]].concat();
    let coinbases = [&coinbases_a[..], &coinbases_b[1..], &coinbases_c[1..]].concat();
    (blocks, coinbases)
}