use crate::common::{CurrentNetwork, TranslucentLedgerService, primary};
use snarkos_account::Account;
use snarkos_node_bft::{
Gateway,
Worker,
helpers::{PrimarySender, Storage},
storage_service::BFTMemoryService,
};
use snarkos_utilities::{NodeDataDir, SimpleStoppable};
use snarkvm::{
console::account::Address,
ledger::{
committee::Committee,
narwhal::{BatchHeader, Data},
store::helpers::memory::ConsensusMemory,
},
prelude::{
Field,
Network,
TestRng,
Uniform,
block::Transaction,
committee::MIN_VALIDATOR_STAKE,
puzzle::{Solution, SolutionID},
},
};
use std::{str::FromStr, sync::Arc, time::Duration};
use ::bytes::Bytes;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use rand::Rng;
use tokio::{sync::oneshot, task::JoinHandle, time::sleep};
use tracing::*;
use tracing_subscriber::{
layer::{Layer, SubscriberExt},
util::SubscriberInitExt,
};
/// Attempts to install a `tracing` subscriber whose log level is derived from `verbosity`.
///
/// Level mapping: `1` selects `debug`, `2..=4` selects `trace`, and every other value
/// (that is, `0` and anything above `4`) selects `info`.
///
/// Logs from `mio`, `tokio_util`, `hyper`, `reqwest`, `want`, and `h2` are always switched
/// off. Logs from `snarkos_node_tcp` are enabled at `trace` only when `verbosity > 3` and
/// are switched off otherwise. Event targets are included in the output only when
/// `verbosity > 2`.
///
/// The result of `try_init` is deliberately discarded, so a failed initialization is not
/// reported to the caller.
///
/// # Panics
///
/// Panics if the level string or any of the hard-coded directives fails to parse.
pub fn initialize_logger(verbosity: u8) {
    // Translate the numeric verbosity into a base log level.
    let level = match verbosity {
        1 => "debug",
        2..=4 => "trace",
        _ => "info",
    };

    // Start from the base level, then silence the noisy dependencies one by one.
    let mut filter = tracing_subscriber::EnvFilter::from_str(level).unwrap();
    for directive in ["mio=off", "tokio_util=off", "hyper=off", "reqwest=off", "want=off", "h2=off"] {
        filter = filter.add_directive(directive.parse().unwrap());
    }

    // The TCP layer is only logged at the highest verbosity settings.
    let tcp_directive = if verbosity > 3 { "snarkos_node_tcp=trace" } else { "snarkos_node_tcp=off" };
    let filter = filter.add_directive(tcp_directive.parse().unwrap());

    // Build the formatting layer and try to register it; the outcome is ignored on purpose.
    let fmt_layer = tracing_subscriber::fmt::Layer::default().with_target(verbosity > 2).with_filter(filter);
    let _ = tracing_subscriber::registry().with(fmt_layer).try_init();
}
/// Spawns a Tokio task that endlessly submits fake unconfirmed solutions through `sender`.
///
/// Each iteration samples a solution ID (from a random `u64`) together with 1024 random
/// bytes wrapped in `Data::Buffer`. Even iterations draw from an RNG seeded with the fixed
/// constant `123456789`; odd iterations draw from an RNG seeded with `node_id`
/// (presumably so that half of the solutions coincide across nodes and half are unique
/// per node - confirm against `TestRng::fixed`).
///
/// The sample is sent on `sender.tx_unconfirmed_solution` together with a oneshot
/// callback. The task then waits for the callback to resolve (ignoring its outcome) and
/// sleeps for `interval_ms` milliseconds before the next iteration.
///
/// A failed send is logged with `error!` and the loop keeps running. The loop has no exit
/// condition, so the task only stops when it is cancelled through the returned
/// `JoinHandle` or the runtime shuts down.
pub fn fire_unconfirmed_solutions(
    sender: &PrimarySender<CurrentNetwork>,
    node_id: u16,
    interval_ms: u64,
) -> JoinHandle<()> {
    let tx_unconfirmed_solution = sender.tx_unconfirmed_solution.clone();
    tokio::task::spawn(async move {
        // RNG seeded with a constant that does not depend on the node.
        let mut shared_rng = TestRng::fixed(123456789);
        // RNG seeded with this node's ID.
        let mut unique_rng = TestRng::fixed(u64::from(node_id));

        // Samples a fake solution ID and a fake solution payload.
        // This helper performs no asynchronous work, so it is a plain function
        // (matching the helper used for unconfirmed transactions).
        fn sample(mut rng: impl Rng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
            // The ID is drawn before the payload; keep this order to preserve the RNG stream.
            let solution_id = rng.r#gen::<u64>().into();
            let mut vec = vec![0u8; 1024];
            rng.fill_bytes(&mut vec);
            let solution = Data::Buffer(Bytes::from(vec));
            (solution_id, solution)
        }

        let mut counter = 0;
        loop {
            // Alternate between the shared RNG (even) and the node-specific RNG (odd).
            let (solution_id, solution) =
                if counter % 2 == 0 { sample(&mut shared_rng) } else { sample(&mut unique_rng) };
            // Send the solution along with a callback on which the receiver can reply.
            let (callback, callback_receiver) = oneshot::channel();
            if let Err(e) = tx_unconfirmed_solution.send((solution_id, solution, callback)).await {
                error!("Failed to send unconfirmed solution: {e}");
            }
            // Wait for the reply (or for the callback to be dropped); the result is irrelevant.
            let _ = callback_receiver.await;
            counter += 1;
            sleep(Duration::from_millis(interval_ms)).await;
        }
    })
}
/// Spawns a Tokio task that endlessly submits fake unconfirmed transactions through `sender`.
///
/// Each iteration samples a transaction ID (from a random field element) together with
/// 1024 random bytes wrapped in `Data::Buffer`. Even iterations draw from an RNG seeded
/// with the fixed constant `123456789`; odd iterations draw from an RNG seeded with
/// `node_id`.
///
/// The sample is sent on `sender.tx_unconfirmed_transaction` together with a oneshot
/// callback. The task then waits for the callback to resolve (ignoring its outcome) and
/// sleeps for `interval_ms` milliseconds before the next iteration.
///
/// A failed send is logged with `error!` and the loop keeps running; the loop itself has
/// no exit condition.
pub fn fire_unconfirmed_transactions(
    sender: &PrimarySender<CurrentNetwork>,
    node_id: u16,
    interval_ms: u64,
) -> JoinHandle<()> {
    let tx_unconfirmed_transaction = sender.tx_unconfirmed_transaction.clone();
    tokio::task::spawn(async move {
        // One RNG with a node-independent seed, one seeded with this node's ID.
        let mut shared_rng = TestRng::fixed(123456789);
        let mut unique_rng = TestRng::fixed(node_id as u64);

        // Produces a fake transaction ID followed by a fake transaction payload.
        fn sample<R: Rng>(
            mut rng: R,
        ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
            // The ID is drawn first, then the payload bytes.
            let id = Field::<CurrentNetwork>::rand(&mut rng).into();
            let mut payload = vec![0u8; 1024];
            rng.fill_bytes(&mut payload);
            (id, Data::Buffer(Bytes::from(payload)))
        }

        let mut round = 0;
        loop {
            // Pick the RNG for this round: shared on even rounds, node-specific on odd rounds.
            let rng = if round % 2 == 0 { &mut shared_rng } else { &mut unique_rng };
            let (id, transaction) = sample(rng);

            // Hand the transaction over together with a reply channel.
            let (callback, callback_receiver) = oneshot::channel();
            if let Err(e) = tx_unconfirmed_transaction.send((id, transaction, callback)).await {
                error!("Failed to send unconfirmed transaction: {e}");
            }
            // Block until the reply arrives or the callback is dropped; the result is unused.
            let _ = callback_receiver.await;

            round += 1;
            sleep(Duration::from_millis(interval_ms)).await;
        }
    })
}
/// Builds a genesis ledger for the given accounts and committee, wrapped in a
/// `TranslucentLedgerService`.
///
/// * `accounts` - every account receives the same public balance; the private key of the
///   first account is passed to `primary::genesis_ledger` as the genesis key.
/// * `committee` - each member `(address, (amount, _, _))` contributes a bonded-balance
///   entry `address -> (address, address, amount)`.
/// * `rng` - forwarded to `primary::genesis_ledger`.
///
/// The public balance per account is
/// `(STARTING_SUPPLY - num_members * MIN_VALIDATOR_STAKE) / num_members`, where
/// `num_members` is the committee size.
///
/// # Panics
///
/// Panics if `accounts` is empty or if the committee reports zero members (division by
/// zero). The balance arithmetic is unchecked, so it can also overflow or underflow for
/// sufficiently large committees.
pub fn sample_ledger(
    accounts: &[Account<CurrentNetwork>],
    committee: &Committee<CurrentNetwork>,
    rng: &mut TestRng,
) -> Arc<TranslucentLedgerService<CurrentNetwork, ConsensusMemory<CurrentNetwork>>> {
    let num_nodes = committee.num_members() as u64;

    // Every committee member's own address fills both address slots of its bonded entry.
    let bonded_balances: IndexMap<_, _> = committee
        .members()
        .iter()
        .map(|(address, (amount, _, _))| (*address, (*address, *address, *amount)))
        .collect();

    // The first account provides the genesis private key.
    let gen_key = *accounts.first().expect("sample_ledger requires at least one account").private_key();

    // Split the supply that is not reserved for validator stakes evenly.
    let public_balance_per_validator =
        (CurrentNetwork::STARTING_SUPPLY - num_nodes * MIN_VALIDATOR_STAKE) / num_nodes;
    let balances: IndexMap<Address<CurrentNetwork>, u64> =
        accounts.iter().map(|account| (account.address(), public_balance_per_validator)).collect();

    // The maps are not needed afterwards, so they are moved instead of cloned.
    let gen_ledger = primary::genesis_ledger(gen_key, committee.clone(), balances, bonded_balances, rng);
    Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new()))
}
/// Creates a `Storage` on top of `ledger`, backed by a fresh `BFTMemoryService`.
///
/// The third argument passed to `Storage::new` is `BatchHeader::<N>::MAX_GC_ROUNDS`,
/// converted to `u64`.
pub fn sample_storage<N: Network>(ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>) -> Storage<N> {
    // Name the GC-round limit so the constructor call reads clearly.
    let max_gc_rounds = BatchHeader::<N>::MAX_GC_ROUNDS as u64;
    Storage::new(ledger, Arc::new(BFTMemoryService::new()), max_gc_rounds)
}
/// Creates a `Gateway` for `account` using the given `storage` and `ledger`.
///
/// The remaining `Gateway::new` arguments are fixed: `None`, an empty slice, `false`,
/// a data directory obtained from `NodeDataDir::new_test(None)`, and `None`.
/// NOTE(review): the meaning of these positional arguments is defined by `Gateway::new`;
/// check its signature before changing them.
///
/// # Panics
///
/// Panics if `Gateway::new` returns an error.
pub fn sample_gateway<N: Network>(
    account: Account<N>,
    storage: Storage<N>,
    ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>,
) -> Gateway<N> {
    // Data directory handle produced by the test constructor.
    let node_data_dir = NodeDataDir::new_test(None);
    Gateway::new(account, storage, ledger, None, &[], false, node_data_dir, None).unwrap()
}
/// Creates a `Worker` with the given `id`, wired to a fresh storage and gateway that are
/// built from `account` and `ledger` via `sample_storage` and `sample_gateway`.
///
/// The worker starts with an empty proposed batch (`None` behind an `RwLock`).
///
/// # Panics
///
/// Panics if `Worker::new` returns an error, or if `sample_gateway` panics.
pub fn sample_worker<N: Network>(
    id: u8,
    account: Account<N>,
    ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>,
) -> Worker<N> {
    // The storage is shared between the gateway and the worker, so it is cloned once here.
    let storage = sample_storage(ledger.clone());
    let gateway = sample_gateway(account, storage.clone(), ledger.clone());
    let proposed_batch = Arc::new(RwLock::new(None));
    // `gateway`, `storage`, and `ledger` are not used afterwards, so they are moved.
    Worker::new(id, Arc::new(gateway), storage, ledger, proposed_batch).unwrap()
}