#[macro_use]
extern crate tracing;
#[cfg(feature = "metrics")]
extern crate snarkos_node_metrics as metrics;
use snarkos_account::Account;
use snarkos_node_bft::{
BFT,
MEMORY_POOL_PORT,
Primary,
helpers::{ConsensusReceiver, PrimarySender, Storage, init_consensus_channels, init_primary_channels},
};
use snarkos_node_bft_ledger_service::TranslucentLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_sync::BlockSync;
use snarkos_utilities::{NodeDataDir, SimpleStoppable};
use aleo_std::StorageMode;
use snarkvm::{
console::{account::PrivateKey, algorithms::BHP256, types::Address},
ledger::{
Block,
Ledger,
block::Transaction,
committee::{Committee, MIN_VALIDATOR_STAKE},
narwhal::{BatchHeader, Data},
puzzle::{Solution, SolutionID},
store::{ConsensusStore, helpers::memory::ConsensusMemory},
},
prelude::{Field, Hash, Network, Uniform, VM},
utilities::{FromBytes, TestRng, ToBits, ToBytes, to_bytes_le},
};
use ::bytes::Bytes;
use anyhow::{Error, Result, anyhow, ensure};
use axum::{
Router,
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
};
use axum_extra::response::ErasedJson;
use clap::{Parser, ValueEnum};
use indexmap::IndexMap;
use rand::{CryptoRng, Rng, SeedableRng};
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
str::FromStr,
sync::{Arc, Mutex, OnceLock},
};
use tokio::{net::TcpListener, sync::oneshot};
use tracing_subscriber::{
layer::{Layer, SubscriberExt},
util::SubscriberInitExt,
};
/// The network that every generic component in this file is instantiated with.
type CurrentNetwork = snarkvm::prelude::MainnetV0;
/// Installs the global `tracing` subscriber used by this node.
///
/// `verbosity` selects the base filter and which noisy targets are enabled:
/// - `0` => `info`
/// - `1` => `debug`
/// - `2` and above => `trace`
/// - above `2`: the event target is included in every log line
/// - above `3`: `snarkos_node_bft::gateway` is logged at `trace` (otherwise off)
/// - above `4`: `snarkos_node_tcp` is logged at `trace` (otherwise off)
///
/// Logs from `mio`, `tokio_util`, `hyper`, `reqwest`, `want` and `h2` are always turned off.
/// The result of `try_init` is discarded, so a failure to install the subscriber is ignored.
///
/// # Panics
/// Panics if one of the hard-coded filter directives fails to parse.
pub fn initialize_logger(verbosity: u8) {
    // Every level from 2 upwards maps to `trace`. Previously levels above 4 fell back to
    // `info`, even though those are exactly the levels that enable the extra TCP tracing below.
    let verbosity_str = match verbosity {
        0 => "info",
        1 => "debug",
        _ => "trace",
    };

    // Start from the base level and silence the dependencies that are never of interest.
    let mut filter = tracing_subscriber::EnvFilter::from_str(verbosity_str).unwrap();
    for directive in ["mio=off", "tokio_util=off", "hyper=off", "reqwest=off", "want=off", "h2=off"] {
        filter = filter.add_directive(directive.parse().unwrap());
    }

    // The gateway and TCP layers are only logged at the highest verbosity levels.
    let gateway_directive =
        if verbosity > 3 { "snarkos_node_bft::gateway=trace" } else { "snarkos_node_bft::gateway=off" };
    let tcp_directive = if verbosity > 4 { "snarkos_node_tcp=trace" } else { "snarkos_node_tcp=off" };
    let filter =
        filter.add_directive(gateway_directive.parse().unwrap()).add_directive(tcp_directive.parse().unwrap());

    // Deliberately ignore the result of `try_init`.
    let _ = tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::Layer::default().with_target(verbosity > 2).with_filter(filter))
        .try_init();
}
/// Builds and starts a BFT instance for node `node_id` of a `num_nodes`-member committee.
///
/// The node listens on the address registered for `node_id` in `peers`, or on
/// `127.0.0.1:(MEMORY_POOL_PORT + node_id)` when it has no entry. All other nodes are passed
/// to the BFT as trusted validators. A consensus handler task is spawned to consume the
/// BFT's consensus channel, and a Ctrl-C handler is installed for the underlying primary.
///
/// Returns the running BFT together with the sender half of its primary channels.
///
/// # Errors
/// Fails if the committee/account cannot be created, or if constructing or running the BFT fails.
pub async fn start_bft(
    node_id: u16,
    num_nodes: u16,
    peers: HashMap<u16, SocketAddr>,
) -> Result<(BFT<CurrentNetwork>, PrimarySender<CurrentNetwork>)> {
    // Channels used to talk to the primary.
    let (primary_sender, primary_receiver) = init_primary_channels();

    // Committee, account, ledger and storage for this node.
    let (committee, account) = initialize_components(node_id, num_nodes)?;
    let ledger = create_ledger(&account, num_nodes, committee, node_id);
    let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);

    // Prefer the configured address; otherwise fall back to a per-node localhost port.
    let listen_addr = peers
        .get(&node_id)
        .copied()
        .unwrap_or_else(|| SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + node_id));

    let node_data_dir = NodeDataDir::new_test(None);
    let trusted = trusted_validators(node_id, num_nodes, peers);

    // Spawn the task that consumes and acknowledges the BFT's consensus messages.
    let (consensus_sender, consensus_receiver) = init_consensus_channels::<CurrentNetwork>();
    consensus_handler(consensus_receiver);

    let block_sync = Arc::new(BlockSync::new(ledger.clone()));

    // Connections are not restricted to trusted peers (`false`).
    let mut bft = BFT::<CurrentNetwork>::new(
        account,
        storage,
        ledger,
        block_sync,
        Some(listen_addr),
        &trusted,
        false,
        node_data_dir,
        None,
    )?;
    bft.run(None, Some(consensus_sender), primary_sender.clone(), primary_receiver).await?;

    // Shut the primary down cleanly on Ctrl-C.
    handle_signals(bft.primary());

    Ok((bft, primary_sender))
}
/// Builds and starts a standalone primary for node `node_id` of a `num_nodes`-member committee.
///
/// The node listens on the address registered for `node_id` in `peers`, or on
/// `127.0.0.1:(MEMORY_POOL_PORT + node_id)` when it has no entry. All other nodes are passed
/// to the primary as trusted validators, and a Ctrl-C handler is installed for it.
///
/// Returns the running primary together with the sender half of its channels.
///
/// # Errors
/// Fails if the committee/account cannot be created, or if constructing or running the primary fails.
pub async fn start_primary(
    node_id: u16,
    num_nodes: u16,
    peers: HashMap<u16, SocketAddr>,
) -> Result<(Primary<CurrentNetwork>, PrimarySender<CurrentNetwork>)> {
    // Channels used to talk to the primary.
    let (primary_sender, primary_receiver) = init_primary_channels();

    // Committee, account, ledger and storage for this node.
    let (committee, account) = initialize_components(node_id, num_nodes)?;
    let ledger = create_ledger(&account, num_nodes, committee, node_id);
    let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);

    // Prefer the configured address; otherwise fall back to a per-node localhost port.
    let listen_addr = peers
        .get(&node_id)
        .copied()
        .unwrap_or_else(|| SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + node_id));

    let node_data_dir = NodeDataDir::new_test(None);
    let trusted = trusted_validators(node_id, num_nodes, peers);
    let block_sync = Arc::new(BlockSync::new(ledger.clone()));

    // Connections are not restricted to trusted peers (`false`).
    let primary = Primary::<CurrentNetwork>::new(
        account,
        storage,
        ledger,
        block_sync,
        Some(listen_addr),
        &trusted,
        false,
        node_data_dir,
        None,
    )?;
    primary.run(None, None, None, primary_sender.clone(), primary_receiver).await?;

    // Shut the primary down cleanly on Ctrl-C.
    handle_signals(&primary);

    Ok((primary, primary_sender))
}
fn create_ledger(
account: &Account<CurrentNetwork>,
num_nodes: u16,
committee: Committee<snarkvm::prelude::MainnetV0>,
node_id: u16,
) -> Arc<TranslucentLedgerService<snarkvm::prelude::MainnetV0, ConsensusMemory<snarkvm::prelude::MainnetV0>>> {
let gen_key = account.private_key();
let public_balance_per_validator =
(CurrentNetwork::STARTING_SUPPLY - (num_nodes as u64) * MIN_VALIDATOR_STAKE) / (num_nodes as u64);
let mut balances = IndexMap::<Address<CurrentNetwork>, u64>::new();
for address in committee.members().keys() {
balances.insert(*address, public_balance_per_validator);
}
let mut rng = TestRng::default();
let gen_ledger = genesis_ledger(*gen_key, committee.clone(), balances.clone(), node_id, &mut rng);
Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new()))
}
/// The ledger type used by this file: a `Ledger` over `CurrentNetwork` with `ConsensusMemory` storage.
pub type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
/// Returns the process-wide cache of genesis blocks, keyed by the serialized genesis inputs.
///
/// The cache is created empty on first access and lives for the rest of the process.
fn genesis_cache() -> &'static Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>> {
    type Cache = Mutex<HashMap<Vec<u8>, Block<CurrentNetwork>>>;

    static CACHE: OnceLock<Cache> = OnceLock::new();
    CACHE.get_or_init(Cache::default)
}
fn genesis_block(
genesis_private_key: PrivateKey<CurrentNetwork>,
committee: Committee<CurrentNetwork>,
public_balances: IndexMap<Address<CurrentNetwork>, u64>,
rng: &mut (impl Rng + CryptoRng),
) -> Block<CurrentNetwork> {
let store = ConsensusStore::<_, ConsensusMemory<_>>::open(StorageMode::new_test(None)).unwrap();
let vm = VM::from(store).unwrap();
let bonded_balances: IndexMap<_, _> =
committee.members().iter().map(|(address, (amount, _, _))| (*address, (*address, *address, *amount))).collect();
vm.genesis_quorum(&genesis_private_key, committee, public_balances, bonded_balances, rng).unwrap()
}
/// Loads a development ledger for `node_id` from a (possibly cached) genesis block.
///
/// The genesis block is looked up by the serialized `(private key, committee, public balances)`:
/// 1. first in the in-process cache returned by `genesis_cache`,
/// 2. then in a `<hash of the key>.genesis` file in the system temp directory,
/// 3. and only if both miss is it generated with `genesis_block` and written to that file.
///
/// The in-process cache lock is held while the block is read or generated.
///
/// # Panics
/// Panics if serialization, hashing, file I/O, block decoding or loading the ledger fails,
/// or if the cache mutex is poisoned.
fn genesis_ledger(
    genesis_private_key: PrivateKey<CurrentNetwork>,
    committee: Committee<CurrentNetwork>,
    public_balances: IndexMap<Address<CurrentNetwork>, u64>,
    node_id: u16,
    rng: &mut (impl Rng + CryptoRng),
) -> CurrentLedger {
    // The cache key is the byte serialization of all inputs that determine the genesis block.
    let cache_key =
        to_bytes_le![genesis_private_key, committee, public_balances.iter().collect::<Vec<(_, _)>>()].unwrap();

    // Invoked only on an in-process cache miss: read the block from disk, or create and persist it.
    let load_or_create = || {
        let hasher = BHP256::<CurrentNetwork>::setup("aleo.dev.block").unwrap();
        let digest = hasher.hash(&cache_key.to_bits_le()).unwrap();
        let file_path = std::env::temp_dir().join(digest.to_string() + ".genesis");

        if file_path.exists() {
            let buffer = std::fs::read(&file_path).unwrap();
            Block::<CurrentNetwork>::from_bytes_le(&buffer).unwrap()
        } else {
            let block = genesis_block(genesis_private_key, committee, public_balances, rng);
            std::fs::write(&file_path, block.to_bytes_le().unwrap()).unwrap();
            block
        }
    };

    // Fetch (or fill) the cache entry, then release the lock before loading the ledger.
    let mut cache = genesis_cache().lock().unwrap();
    let block = cache.entry(cache_key.clone()).or_insert_with(load_or_create).clone();
    drop(cache);

    CurrentLedger::load(block, aleo_std::StorageMode::Development(node_id)).unwrap()
}
/// Creates the committee for a `num_nodes`-member network and the account of node `node_id`.
///
/// Account `i` is derived from a ChaCha RNG seeded with `i`. Every member is added to the
/// committee at round `0` with `MIN_VALIDATOR_STAKE` and the member tuple
/// `(MIN_VALIDATOR_STAKE, false, i as u8)`. The node's own account and every validator
/// address are printed to stdout.
///
/// # Errors
/// Fails if `node_id >= num_nodes`, or if an account or the committee cannot be created.
fn initialize_components(node_id: u16, num_nodes: u16) -> Result<(Committee<CurrentNetwork>, Account<CurrentNetwork>)> {
    ensure!(node_id < num_nodes, "Node ID {node_id} must be less than {num_nodes}");

    // Deterministically derives the account that belongs to the node with the given index.
    let account_for =
        |index: u16| Account::<CurrentNetwork>::new(&mut rand_chacha::ChaChaRng::seed_from_u64(index as u64));

    // This node's own account.
    let account = account_for(node_id)?;
    println!("\n{account}\n");

    // One committee member per node, inserted in node order.
    let mut members = IndexMap::with_capacity(num_nodes as usize);
    for i in 0..num_nodes {
        let address = account_for(i)?.address();
        members.insert(address, (MIN_VALIDATOR_STAKE, false, i as u8));
        println!(" Validator {}: {}", i, address);
    }
    println!();

    let committee = Committee::<CurrentNetwork>::new(0u64, members)?;
    Ok((committee, account))
}
/// Spawns a task that consumes and acknowledges messages from the consensus channel.
///
/// For every `(subdag, transmissions, callback)` received, the task sleeps for
/// `(number of items across all subdag values) + 25 * (number of transmissions) + 100`
/// milliseconds and then sends `Ok(())` on `callback`, ignoring a failed send.
/// The task ends once the channel is closed.
fn consensus_handler(receiver: ConsensusReceiver<CurrentNetwork>) {
    let ConsensusReceiver { mut rx_consensus_subdag } = receiver;

    tokio::task::spawn(async move {
        loop {
            let Some((subdag, transmissions, callback)) = rx_consensus_subdag.recv().await else {
                break;
            };

            // The delay grows with the amount of data in the message, on top of a fixed 100 ms.
            let subdag_items = subdag.values().flatten().count();
            let delay_ms = subdag_items + transmissions.len() * 25 + 100;
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms as u64)).await;

            // Acknowledge the message; the other side may already be gone.
            callback.send(Ok(())).ok();
        }
    });
}
/// Returns the addresses of all nodes other than `node_id`, in node order.
///
/// A node's address is taken from `peers` when present, and otherwise defaults to
/// `127.0.0.1:(MEMORY_POOL_PORT + i)` for node `i`.
fn trusted_validators(node_id: u16, num_nodes: u16, peers: HashMap<u16, SocketAddr>) -> Vec<SocketAddr> {
    let others = (0..num_nodes).filter_map(|i| {
        // Resolve the address first (as before), then drop this node's own entry.
        let addr = peers
            .get(&i)
            .copied()
            .unwrap_or_else(|| SocketAddr::from_str(&format!("127.0.0.1:{}", MEMORY_POOL_PORT + i)).unwrap());
        (i != node_id).then_some(addr)
    });

    let mut trusted = Vec::with_capacity(num_nodes as usize);
    trusted.extend(others);
    trusted
}
fn handle_signals(primary: &Primary<CurrentNetwork>) {
let node = primary.clone();
tokio::task::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
node.shut_down().await;
std::process::exit(0);
}
Err(error) => error!("tokio::signal::ctrl_c encountered an error: {}", error),
}
});
}
/// Spawns a task that keeps submitting random unconfirmed solutions to the primary.
///
/// Solutions alternate between two RNG streams: one seeded with a fixed constant and one
/// seeded with `node_id`. After each submission the task waits for the callback to resolve
/// (or be dropped) and then sleeps for `interval_ms` milliseconds. A failed send is logged
/// and the loop continues.
fn fire_unconfirmed_solutions(sender: &PrimarySender<CurrentNetwork>, node_id: u16, interval_ms: u64) {
    /// Draws a random solution ID followed by a 1024-byte random payload.
    fn sample(mut rng: impl Rng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
        let solution_id = rng.r#gen::<u64>().into();
        let payload: Vec<u8> = std::iter::repeat_with(|| rng.r#gen::<u8>()).take(1024).collect();
        (solution_id, Data::Buffer(Bytes::from(payload)))
    }

    let tx_unconfirmed_solution = sender.tx_unconfirmed_solution.clone();

    tokio::task::spawn(async move {
        let mut shared_rng = rand_chacha::ChaChaRng::seed_from_u64(123456789);
        let mut unique_rng = rand_chacha::ChaChaRng::seed_from_u64(node_id as u64);

        let mut round = 0;
        loop {
            // Even rounds use the shared stream, odd rounds the node-specific one.
            let rng = if round % 2 == 0 { &mut shared_rng } else { &mut unique_rng };
            let (solution_id, solution) = sample(rng);

            let (callback, callback_receiver) = oneshot::channel();
            if let Err(e) = tx_unconfirmed_solution.send((solution_id, solution, callback)).await {
                error!("Failed to send unconfirmed solution: {e}");
            }
            // Wait until the submission has been handled; the outcome itself is ignored.
            let _ = callback_receiver.await;

            round += 1;
            tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
        }
    });
}
/// Spawns a task that keeps submitting random unconfirmed transactions to the primary.
///
/// Transactions alternate between two RNG streams: one seeded with a fixed constant and one
/// seeded with `node_id`. After each submission the task waits for the callback to resolve
/// (or be dropped) and then sleeps for `interval_ms` milliseconds. A failed send is logged
/// and the loop continues.
fn fire_unconfirmed_transactions(sender: &PrimarySender<CurrentNetwork>, node_id: u16, interval_ms: u64) {
    /// Draws a random transaction ID followed by a 1024-byte random payload.
    fn sample(
        mut rng: impl Rng,
    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
        let id = Field::<CurrentNetwork>::rand(&mut rng).into();
        let payload: Vec<u8> = std::iter::repeat_with(|| rng.r#gen::<u8>()).take(1024).collect();
        (id, Data::Buffer(Bytes::from(payload)))
    }

    let tx_unconfirmed_transaction = sender.tx_unconfirmed_transaction.clone();

    tokio::task::spawn(async move {
        let mut shared_rng = rand_chacha::ChaChaRng::seed_from_u64(123456789);
        let mut unique_rng = rand_chacha::ChaChaRng::seed_from_u64(node_id as u64);

        let mut round = 0;
        loop {
            // Even rounds use the shared stream, odd rounds the node-specific one.
            let rng = if round % 2 == 0 { &mut shared_rng } else { &mut unique_rng };
            let (id, transaction) = sample(rng);

            let (callback, callback_receiver) = oneshot::channel();
            if let Err(e) = tx_unconfirmed_transaction.send((id, transaction, callback)).await {
                error!("Failed to send unconfirmed transaction: {e}");
            }
            // Wait until the submission has been handled; the outcome itself is ignored.
            let _ = callback_receiver.await;

            round += 1;
            tokio::time::sleep(std::time::Duration::from_millis(interval_ms)).await;
        }
    });
}
/// An error returned by the REST handlers, carrying a human-readable message.
pub struct RestError(pub String);

impl IntoResponse for RestError {
    /// Renders the error as a `500 Internal Server Error` whose body contains the message.
    fn into_response(self) -> Response {
        let Self(message) = self;
        let body = format!("Something went wrong: {}", message);
        (StatusCode::INTERNAL_SERVER_ERROR, body).into_response()
    }
}
/// The state shared with every REST handler.
#[derive(Clone)]
struct NodeState {
/// The BFT instance, if one was supplied to `start_server`; `/leader` returns an error when it is `None`.
bft: Option<BFT<CurrentNetwork>>,
/// The primary that the round and certificate endpoints read from.
primary: Primary<CurrentNetwork>,
}
/// Handles `GET /leader`: returns the BFT's leader as pretty-printed JSON,
/// or an error if the node state holds no BFT instance.
async fn get_leader(State(node): State<NodeState>) -> Result<ErasedJson, RestError> {
    let Some(bft) = node.bft.as_ref() else {
        return Err(RestError("BFT is not enabled".into()));
    };
    Ok(ErasedJson::pretty(bft.leader()))
}
/// Handles `GET /round/current`: returns the primary's current round as pretty-printed JSON.
async fn get_current_round(State(node): State<NodeState>) -> Result<ErasedJson, RestError> {
    let current_round = node.primary.current_round();
    Ok(ErasedJson::pretty(current_round))
}
/// Handles `GET /certificates/:round`: returns the certificates that the primary's storage
/// holds for `round`, as pretty-printed JSON.
async fn get_certificates_for_round(
    State(node): State<NodeState>,
    Path(round): Path<u64>,
) -> Result<ErasedJson, RestError> {
    let certificates = node.primary.storage().get_certificates_for_round(round);
    Ok(ErasedJson::pretty(certificates))
}
/// Serves the node's REST API on `127.0.0.1:(3000 + node_id)` until the server stops.
///
/// Routes: `/`, `/leader`, `/round/current` and `/certificates/:round`.
///
/// # Panics
/// Panics if the address cannot be parsed or bound, or if the server returns an error.
async fn start_server(bft: Option<BFT<CurrentNetwork>>, primary: Primary<CurrentNetwork>, node_id: u16) {
    // Register the handlers and attach the shared node state.
    let state = NodeState { bft, primary };
    let router = Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .route("/leader", get(get_leader))
        .route("/round/current", get(get_current_round))
        .route("/certificates/:round", get(get_certificates_for_round))
        .with_state(state);

    // Every node gets its own port, offset by its ID.
    let addr = format!("127.0.0.1:{}", 3000 + node_id);
    info!("Starting the server at '{addr}'...");

    let rest_addr = addr.parse::<SocketAddr>().unwrap();
    let rest_listener = TcpListener::bind(rest_addr).await.unwrap();
    let service = router.into_make_service_with_connect_info::<SocketAddr>();
    axum::serve(rest_listener, service).await.unwrap();
}
/// The mode in which the node is run.
///
/// The variant doc comments below are picked up by clap as the help text of the
/// corresponding `--mode` values.
#[derive(Debug, Clone, ValueEnum)]
enum Mode {
    /// Runs a standalone primary, without the BFT layer.
    Narwhal,
    /// Runs the BFT together with its primary.
    Bft,
}
/// Runs a single node of a local test network, as a standalone primary or as a BFT instance.
#[derive(Parser, Debug)]
struct Args {
    /// The mode to run the node in.
    #[arg(long)]
    mode: Mode,
    /// The ID of this node; must be less than the number of nodes.
    #[arg(long, value_name = "ID")]
    id: u16,
    /// The number of nodes in the network.
    #[arg(long, value_name = "N")]
    num_nodes: u16,
    /// Path to a peers file with one "<node id>=<socket address>" entry per line; nodes
    /// without an entry are assumed to run on localhost.
    #[arg(long, value_name = "PATH")]
    peers: Option<PathBuf>,
    /// Periodically submits random unconfirmed solutions, optionally at the given interval
    /// in milliseconds (default: 450).
    #[arg(long, value_name = "INTERVAL_MS")]
    fire_solutions: Option<Option<u64>>,
    /// Periodically submits random unconfirmed transactions, optionally at the given interval
    /// in milliseconds (default: 450).
    #[arg(long, value_name = "INTERVAL_MS")]
    fire_transactions: Option<Option<u64>>,
    /// Periodically submits both random solutions and random transactions, optionally at the
    /// given interval in milliseconds (default: 450); takes precedence over the two options above.
    #[arg(long, value_name = "INTERVAL_MS")]
    fire_transmissions: Option<Option<u64>>,
    /// Enables the metrics exporter; only has an effect when built with the "metrics" feature.
    #[clap(long, default_value = "false")]
    metrics: bool,
}
fn parse_peers(peers_string: String) -> Result<HashMap<u16, SocketAddr>, Error> {
let mut peers = HashMap::new();
for peer in peers_string.lines() {
let mut split = peer.split('=');
let node_id = u16::from_str(split.next().ok_or_else(|| anyhow!("Bad Format"))?)?;
let addr: String = split.next().ok_or_else(|| anyhow!("Bad Format"))?.parse()?;
let ip = SocketAddr::from_str(addr.as_str())?;
peers.insert(node_id, ip);
}
Ok(peers)
}
/// Entry point: parses the CLI arguments, starts the node in the requested mode, optionally
/// starts the solution/transaction generators and metrics, and then serves the REST API.
#[tokio::main]
async fn main() -> Result<()> {
    initialize_logger(1);

    let args = Args::parse();

    // Load the peer addresses, if a peers file was given.
    let peers = if let Some(path) = args.peers {
        parse_peers(std::fs::read_to_string(path)?)?
    } else {
        HashMap::new()
    };

    // Start the node; the BFT handle only exists in BFT mode.
    let (bft_holder, primary, sender) = match args.mode {
        Mode::Bft => {
            let (bft, sender) = start_bft(args.id, args.num_nodes, peers).await?;
            let primary = bft.primary().clone();
            (Some(bft), primary, sender)
        }
        Mode::Narwhal => {
            let (primary, sender) = start_primary(args.id, args.num_nodes, peers).await?;
            (None, primary, sender)
        }
    };

    const DEFAULT_INTERVAL_MS: u64 = 450;

    // `--fire-transmissions` enables both generators and its interval takes precedence.
    if let Some(rate) = args.fire_transmissions.or(args.fire_solutions) {
        fire_unconfirmed_solutions(&sender, args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS));
    }
    if let Some(rate) = args.fire_transmissions.or(args.fire_transactions) {
        fire_unconfirmed_transactions(&sender, args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS));
    }

    // Metrics are served on a per-node port, and only when compiled in.
    #[cfg(feature = "metrics")]
    if args.metrics {
        info!("Initializing metrics...");
        let metrics_addr = SocketAddr::from_str(&format!("0.0.0.0:{}", 9000 + args.id)).ok();
        metrics::initialize_metrics(metrics_addr);
    }

    // Runs until the REST server stops.
    start_server(bft_holder, primary, args.id).await;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty peers file yields an empty peer map.
    #[test]
    fn parse_peers_empty() -> Result<(), Error> {
        let parsed = parse_peers(String::new())?;
        assert_eq!(parsed.len(), 0);
        Ok(())
    }

    /// Four well-formed entries yield four peers.
    #[test]
    fn parse_peers_ok() -> Result<(), Error> {
        let entries =
            ["0=192.168.1.176:5000", "1=192.168.1.176:5001", "2=192.168.1.176:5002", "3=192.168.1.176:5003"];
        let parsed = parse_peers(entries.join("\n"))?;
        assert_eq!(parsed.len(), 4);
        Ok(())
    }

    /// A non-numeric node ID is rejected.
    #[test]
    fn parse_peers_bad_id() -> Result<(), Error> {
        let result = parse_peers(String::from("A=192.168.1.176:5000"));
        assert!(result.is_err());
        Ok(())
    }

    /// A line without `=` is rejected.
    #[test]
    fn parse_peers_bad_format() -> Result<(), Error> {
        let result = parse_peers(String::from("foo"));
        assert!(result.is_err());
        Ok(())
    }
}