use crate::common::{
CurrentNetwork,
TranslucentLedgerService,
utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions, initialize_logger},
};
use snarkos_account::Account;
use snarkos_node_bft::{
BFT,
MAX_BATCH_DELAY_IN_MS,
MEMORY_POOL_PORT,
Primary,
helpers::{PrimarySender, Storage, init_primary_channels},
};
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_network::PeerPoolHandling;
use snarkos_node_sync::BlockSync;
use snarkos_utilities::{NodeDataDir, SimpleStoppable};
use snarkvm::{
console::{
account::{Address, PrivateKey},
algorithms::{BHP256, Hash},
network::Network,
},
ledger::{
Ledger,
block::Block,
committee::{Committee, MIN_VALIDATOR_STAKE},
narwhal::BatchHeader,
store::{ConsensusStore, helpers::memory::ConsensusMemory},
},
prelude::{CryptoRng, FromBytes, Rng, TestRng, ToBits, ToBytes, VM},
utilities::to_bytes_le,
};
use aleo_std::StorageMode;
use indexmap::IndexMap;
use itertools::Itertools;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
ops::RangeBounds,
sync::{Arc, OnceLock},
time::Duration,
};
use tokio::{task::JoinHandle, time::sleep};
use tracing::*;
/// Settings that control how a [`TestNetwork`] is constructed and started.
#[derive(Debug, Clone, Copy)]
pub struct TestNetworkConfig {
    /// Number of validators (accounts / committee members) to create.
    pub num_nodes: u16,
    /// When `true`, each validator is built around a `BFT` instance;
    /// otherwise a standalone `Primary` is created.
    pub bft: bool,
    /// When `true`, `TestNetwork::start` finishes by calling `connect_all`.
    pub connect_all: bool,
    /// When `Some(interval_ms)`, `TestNetwork::start` begins firing transmissions
    /// at every validator with that interval (in milliseconds).
    pub fire_transmissions: Option<u64>,
    /// When `Some(level)`, the logger is initialized with this level in `TestNetwork::new`.
    pub log_level: Option<u8>,
    /// When `true`, `TestNetwork::start` spawns a connection-logging task per validator.
    pub log_connections: bool,
}
/// A set of in-process test validators together with the configuration they were built from.
#[derive(Clone)]
pub struct TestNetwork {
/// The configuration passed to `TestNetwork::new`.
pub config: TestNetworkConfig,
/// The validators, keyed by their numeric id (the same value stored in `TestValidator::id`).
pub validators: HashMap<u16, TestValidator>,
}
/// A single validator of a `TestNetwork`.
///
/// Cloning shares the `handles` list with the original, because it lives behind an `Arc`.
#[derive(Clone)]
pub struct TestValidator {
/// Numeric id of the validator; also its key in `TestNetwork::validators`.
pub id: u16,
/// The primary of this validator.
pub primary: Primary<CurrentNetwork>,
/// Sender half of the primary channels; `None` until `TestNetwork::start` sets it.
pub primary_sender: Option<PrimarySender<CurrentNetwork>>,
/// The BFT instance; only set by `TestNetwork::new` when the config enables `bft`.
pub bft: OnceLock<BFT<CurrentNetwork>>,
/// Join handles of the background tasks spawned for this validator.
pub handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
/// The ledger type used by the test network: a `Ledger` over `CurrentNetwork` with `ConsensusMemory` storage.
pub type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
impl TestValidator {
    /// Starts the `fire_unconfirmed_solutions` and `fire_unconfirmed_transactions` helpers
    /// against this validator's primary sender, forwarding `interval_ms`, and records both
    /// returned join handles in `handles`.
    ///
    /// # Panics
    ///
    /// Panics if `primary_sender` is still `None`.
    pub fn fire_transmissions(&mut self, interval_ms: u64) {
        let sender = self.primary_sender.as_mut().unwrap();
        let solution_task = fire_unconfirmed_solutions(sender, self.id, interval_ms);
        let transaction_task = fire_unconfirmed_transactions(sender, self.id, interval_ms);
        // Keep the same ordering as before: solutions first, then transactions.
        self.handles.lock().extend([solution_task, transaction_task]);
    }

    /// Spawns a task that, every 5 seconds, logs the number of peers connected to this
    /// validator's gateway (at `info`) and each peer (at `debug`). The task never finishes
    /// on its own; its join handle is stored in `handles`.
    pub fn log_connections(&mut self) {
        // The task needs an owned copy because it outlives this borrow of `self`.
        let validator = self.clone();
        let task = tokio::task::spawn(async move {
            loop {
                let peers = validator.primary.gateway().connected_peers();
                info!("{} connections", peers.len());
                for peer in peers {
                    debug!(" {}", peer);
                }
                sleep(Duration::from_secs(5)).await;
            }
        });
        self.handles.lock().push(task);
    }
}
impl TestNetwork {
pub fn new(config: TestNetworkConfig) -> Self {
let mut rng = TestRng::default();
if let Some(log_level) = config.log_level {
initialize_logger(log_level);
}
let (accounts, committee) = new_test_committee(config.num_nodes, &mut rng);
let bonded_balances: IndexMap<_, _> = committee
.members()
.iter()
.map(|(address, (amount, _, _))| (*address, (*address, *address, *amount)))
.collect();
let gen_key = *accounts[0].private_key();
let public_balance_per_validator = (CurrentNetwork::STARTING_SUPPLY
- (config.num_nodes as u64) * MIN_VALIDATOR_STAKE)
/ (config.num_nodes as u64);
let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new();
for account in accounts.iter() {
balances.insert(account.address(), public_balance_per_validator);
}
let mut validators = HashMap::with_capacity(config.num_nodes as usize);
for (id, account) in accounts.into_iter().enumerate() {
let gen_ledger =
genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), &mut rng);
let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new()));
let storage = Storage::new(
ledger.clone(),
Arc::new(BFTMemoryService::new()),
BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64,
);
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
let (primary, bft) = if config.bft {
let bft = BFT::<CurrentNetwork>::new(
account,
storage,
ledger,
block_sync,
Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
&[],
false,
NodeDataDir::new_test(None),
None,
)
.unwrap();
(bft.primary().clone(), Some(bft))
} else {
let primary = Primary::<CurrentNetwork>::new(
account,
storage,
ledger,
block_sync,
Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)),
&[],
false,
NodeDataDir::new_test(None),
None,
)
.unwrap();
(primary, None)
};
let test_validator = TestValidator {
id: id as u16,
primary,
primary_sender: None,
bft: OnceLock::new(),
handles: Default::default(),
};
if let Some(bft) = bft {
assert!(test_validator.bft.set(bft).is_ok());
}
validators.insert(id as u16, test_validator);
}
Self { config, validators }
}
/// Starts every validator of the network.
///
/// For each validator this creates fresh primary channels, stores a clone of the sender on
/// the validator, and runs the BFT if one is set (otherwise the primary directly). Depending
/// on the config it then starts firing transmissions and/or the connection logger. Once all
/// validators are running, `connect_all` is invoked if the config asks for it.
///
/// # Panics
///
/// Panics if running a BFT or primary returns an error.
pub async fn start(&mut self) {
    // The config is `Copy`; take a snapshot so it can be read while validators are borrowed.
    let config = self.config;

    for validator in self.validators.values_mut() {
        let (primary_sender, primary_receiver) = init_primary_channels();
        validator.primary_sender = Some(primary_sender.clone());

        match validator.bft.get_mut() {
            Some(bft) => {
                bft.run(None, None, primary_sender, primary_receiver).await.unwrap();
            }
            None => {
                validator.primary.run(None, None, None, primary_sender, primary_receiver).await.unwrap();
            }
        }

        if let Some(interval_ms) = config.fire_transmissions {
            validator.fire_transmissions(interval_ms);
        }
        if config.log_connections {
            validator.log_connections();
        }
    }

    if config.connect_all {
        self.connect_all().await;
    }
}
/// Starts firing transmissions at the validator with the given `id`, using `interval_ms`.
///
/// # Panics
///
/// Panics if no validator with `id` exists.
pub fn fire_transmissions_at(&mut self, id: u16, interval_ms: u64) {
    let validator = self.validators.get_mut(&id).unwrap();
    validator.fire_transmissions(interval_ms);
}
/// Asks the gateway of validator `first_id` to connect to the local IP of validator
/// `second_id`, then waits 100 ms. The result of the connect call is discarded.
///
/// # Panics
///
/// Panics if either id is unknown.
pub async fn connect_validators(&self, first_id: u16, second_id: u16) {
    let initiator = self.validators.get(&first_id).unwrap();
    let target = self.validators.get(&second_id).unwrap();

    let target_ip = target.primary.gateway().local_ip();
    let _ = initiator.primary.gateway().connect(target_ip);

    // Short pause after issuing the connect request.
    sleep(Duration::from_millis(100)).await;
}
pub async fn connect_all(&self) {
for (validator, other_validator) in self.validators.values().tuple_combinations() {
let ip = other_validator.primary.gateway().local_ip();
let _ = validator.primary.gateway().connect(ip);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
pub async fn connect_one(&self, id: u16) {
let target_validator = self.validators.get(&id).unwrap();
let target_ip = target_validator.primary.gateway().local_ip();
for validator in self.validators.values() {
if validator.id != id {
let _ = validator.primary.gateway().connect(target_ip);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
}
/// Disconnects the first `num_nodes` validators (in the map's iteration order) from all of
/// their currently connected peers, then waits 100 ms.
pub async fn disconnect(&self, num_nodes: u16) {
    let selected = self.validators.values().take(num_nodes as usize);
    for validator in selected {
        let peers = validator.primary.gateway().connected_peers();
        for peer_ip in peers.iter() {
            validator.primary.gateway().disconnect(*peer_ip);
        }
    }

    sleep(Duration::from_millis(100)).await;
}
/// Disconnects validator `id` from all of its currently connected peers, then waits 100 ms.
///
/// # Panics
///
/// Panics if no validator with `id` exists.
pub async fn disconnect_one(&self, id: u16) {
    let validator = self.validators.get(&id).unwrap();

    let peers = validator.primary.gateway().connected_peers();
    for peer_ip in peers.iter() {
        validator.primary.gateway().disconnect(*peer_ip);
    }

    sleep(Duration::from_millis(100)).await;
}
/// Returns `true` if a quorum of validators has reached at least `round`.
///
/// With `n` validators the quorum threshold is `n - (n - 1) / 3` (integer division).
/// An empty network has no validator that could have reached any round, so it yields `false`.
pub fn is_round_reached(&self, round: u64) -> bool {
    let total = self.validators.len();
    // Guard the `total - 1` below: on an empty map it would underflow `usize`.
    if total == 0 {
        return false;
    }

    let quorum_threshold = total - (total - 1) / 3;
    let reached = self.validators.values().filter(|v| v.primary.current_round() >= round).count();
    reached >= quorum_threshold
}
/// Checks whether the network has stopped advancing rounds.
///
/// Records the highest current round over all validators, waits for twice
/// `MAX_BATCH_DELAY_IN_MS`, and returns `true` only if no validator is then past that round.
///
/// # Panics
///
/// Panics if the network has no validators.
pub async fn is_halted(&self) -> bool {
    let highest_before = self.validators.values().map(|v| v.primary.current_round()).max().unwrap();

    sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS * 2)).await;

    let someone_advanced = self.validators.values().any(|v| v.primary.current_round() > highest_before);
    !someone_advanced
}
pub fn is_committee_coherent<T>(&self, rounds_range: T) -> bool
where
T: RangeBounds<u64> + IntoIterator<Item = u64>,
{
for round in rounds_range.into_iter() {
let mut last: Option<Committee<CurrentNetwork>> = None;
for validator in self.validators.values() {
if let Ok(committee) = validator.primary.ledger().get_committee_for_round(round) {
match last.clone() {
None => last = Some(committee),
Some(first) => {
if first != committee {
return false;
}
}
}
}
}
}
true
}
/// Returns `true` if, for every round in `rounds_range`, all validators' storages report the
/// same certificates for that round.
///
/// The per-validator results are collapsed with `dedup` (which merges consecutive equal
/// items) and exactly one item must remain. A network without validators therefore yields
/// `false` for any non-empty range.
pub fn is_certificate_round_coherent<T>(&self, rounds_range: T) -> bool
where
    T: RangeBounds<u64> + IntoIterator<Item = u64>,
{
    for round in rounds_range {
        let distinct_runs = self
            .validators
            .values()
            .map(|validator| validator.primary.storage().get_certificates_for_round(round))
            .dedup()
            .count();
        if distinct_runs != 1 {
            return false;
        }
    }
    true
}
}
/// Creates `n` fresh accounts and a committee containing all of them.
///
/// Each member is registered with `MIN_VALIDATOR_STAKE`, a `false` flag, and a random value
/// drawn from `0..100` (presumably the "is open" flag and the commission; verify against
/// `Committee`). The committee is created with `0` as its first argument (presumably the
/// starting round). Each generated address is logged at `info` level.
///
/// # Panics
///
/// Panics if account generation or committee construction fails.
pub fn new_test_committee(n: u16, rng: &mut TestRng) -> (Vec<Account<CurrentNetwork>>, Committee<CurrentNetwork>) {
    let capacity = n as usize;
    let mut accounts = Vec::with_capacity(capacity);
    let mut members = IndexMap::with_capacity(capacity);

    for index in 0..n {
        // Note: the RNG is consumed in a fixed order (account first, then the random value).
        let account = Account::new(rng).unwrap();
        info!("Validator {}: {}", index, account.address());

        let member_data = (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100));
        members.insert(account.address(), member_data);
        accounts.push(account);
    }

    let committee = Committee::<CurrentNetwork>::new(0u64, members).unwrap();
    (accounts, committee)
}
fn genesis_cache() -> &'static Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>> {
static CACHE: OnceLock<Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>>> = OnceLock::new();
CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}
/// Produces a genesis block for the given committee and balances.
///
/// Opens a fresh `ConsensusStore` with `StorageMode::new_test(None)`, builds a `VM` on top
/// of it, and delegates to `VM::genesis_quorum`.
///
/// # Panics
///
/// Panics if the store cannot be opened, the VM cannot be created, or genesis creation fails.
pub fn genesis_block(
    genesis_private_key: PrivateKey<CurrentNetwork>,
    committee: Committee<CurrentNetwork>,
    public_balances: IndexMap<Address<CurrentNetwork>, u64>,
    bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
    rng: &mut (impl Rng + CryptoRng),
) -> Block<CurrentNetwork> {
    let storage_mode = StorageMode::new_test(None);
    let consensus_store =
        ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(storage_mode).unwrap();
    let vm = VM::from(consensus_store).unwrap();

    let genesis = vm.genesis_quorum(&genesis_private_key, committee, public_balances, bonded_balances, rng);
    genesis.unwrap()
}
/// Loads a ledger from a (possibly cached) genesis block for the given parameters.
///
/// Genesis blocks are cached at two levels:
/// * in memory, in the process-wide `genesis_cache`, and
/// * on disk, as `<hash-of-cache-key>.genesis` in the system temp directory.
///
/// The cache key is built from the private key, the committee, and the public balances.
/// NOTE(review): `bonded_balances` is not part of the key, so two calls that differ only in
/// their bonded balances share one cached block; confirm that this is intended.
///
/// An on-disk file that cannot be read or decoded (for example, one that is partially
/// written or left over from an incompatible version) is treated as a cache miss and
/// regenerated. New files are written to a per-process temporary name and then renamed into
/// place, so other processes do not observe a half-written cache file.
///
/// The in-memory cache lock is held while the block is read or generated.
///
/// # Panics
///
/// Panics if serialization, hashing, genesis creation, writing the cache file, or loading
/// the ledger fails.
pub fn genesis_ledger(
    genesis_private_key: PrivateKey<CurrentNetwork>,
    committee: Committee<CurrentNetwork>,
    public_balances: IndexMap<Address<CurrentNetwork>, u64>,
    bonded_balances: IndexMap<Address<CurrentNetwork>, (Address<CurrentNetwork>, Address<CurrentNetwork>, u64)>,
    rng: &mut (impl Rng + CryptoRng),
) -> CurrentLedger {
    let cache_key =
        to_bytes_le![genesis_private_key, committee, public_balances.iter().collect::<Vec<(_, _)>>()].unwrap();

    let block = genesis_cache()
        .lock()
        .entry(cache_key.clone())
        .or_insert_with(|| {
            // The on-disk file name is derived from a hash of the cache key.
            let hasher = BHP256::<CurrentNetwork>::setup("aleo.dev.block").unwrap();
            let file_name = hasher.hash(&cache_key.to_bits_le()).unwrap().to_string() + ".genesis";
            let cache_dir = std::env::temp_dir();
            let file_path = cache_dir.join(&file_name);

            // Reuse the on-disk copy only if it can be read and decoded; anything else
            // (missing, truncated, stale format) falls through to regeneration.
            let cached = std::fs::read(&file_path)
                .ok()
                .and_then(|bytes| Block::<CurrentNetwork>::from_bytes_le(&bytes).ok());
            if let Some(block) = cached {
                return block;
            }

            let block = genesis_block(genesis_private_key, committee, public_balances, bonded_balances, rng);

            // Write to a per-process staging file in the same directory, then rename it into
            // place so concurrent readers never see a partially written cache file.
            let staging_path = cache_dir.join(format!("{}.{}.tmp", file_name, std::process::id()));
            std::fs::write(&staging_path, block.to_bytes_le().unwrap()).unwrap();
            std::fs::rename(&staging_path, &file_path).unwrap();
            block
        })
        .clone();

    CurrentLedger::load(block, StorageMode::new_test(None)).unwrap()
}