mod router;
use crate::traits::NodeInterface;
use snarkos_account::Account;
use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
use snarkos_node_cdn::CdnBlockSync;
use snarkos_node_consensus::Consensus;
use snarkos_node_network::{NodeType, PeerPoolHandling};
use snarkos_node_rest::Rest;
use snarkos_node_router::{
Heartbeat,
Inbound,
Outbound,
Router,
Routing,
messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BlockSync, Ping};
use snarkos_node_tcp::{
P2P,
protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use snarkos_utilities::{NodeDataDir, SignalHandler};
use snarkvm::prelude::{
Ledger,
Network,
block::{Block, Header},
puzzle::Solution,
store::ConsensusStorage,
};
use aleo_std::StorageMode;
use anyhow::{Context, Result};
use core::future::Future;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::task::JoinHandle;
#[derive(Clone)]
pub struct Validator<N: Network, C: ConsensusStorage<N>> {
ledger: Ledger<N, C>,
consensus: Consensus<N>,
router: Router<N>,
rest: Option<Rest<N, C, Self>>,
sync: Arc<BlockSync<N>>,
pub(crate) handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
ping: Arc<Ping<N>>,
}
impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
pub async fn new(
node_ip: SocketAddr,
bft_ip: Option<SocketAddr>,
rest_ip: Option<SocketAddr>,
rest_rps: u32,
account: Account<N>,
trusted_peers: &[SocketAddr],
trusted_validators: &[SocketAddr],
genesis: Block<N>,
cdn: Option<http::Uri>,
storage_mode: StorageMode,
node_data_dir: NodeDataDir,
trusted_peers_only: bool,
dev_txs: bool,
dev: Option<u16>,
signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
let ledger = {
let storage_mode = storage_mode.clone();
let genesis = genesis.clone();
spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
}
.with_context(|| "Failed to initialize the ledger")?;
let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone()));
let router = Router::new(
node_ip,
NodeType::Validator,
account.clone(),
ledger_service.clone(),
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
trusted_peers_only,
node_data_dir.clone(),
dev.is_some(),
)
.await?;
let sync = Arc::new(BlockSync::new(ledger_service.clone()));
let locators = sync.get_block_locators()?;
let ping = Arc::new(Ping::new(router.clone(), locators));
let consensus = Consensus::new(
account.clone(),
ledger_service.clone(),
sync.clone(),
bft_ip,
trusted_validators,
trusted_peers_only,
storage_mode.clone(),
node_data_dir.clone(),
ping.clone(),
dev,
)
.await?;
let mut node = Self {
ledger: ledger.clone(),
consensus: consensus.clone(),
router,
rest: None,
sync: sync.clone(),
ping,
handles: Default::default(),
};
let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)));
node.initialize_transaction_pool(dev, dev_txs)?;
if let Some(rest_ip) = rest_ip {
node.rest = Some(
Rest::start(
rest_ip,
rest_rps,
Some(consensus),
ledger.clone(),
Arc::new(node.clone()),
cdn_sync.clone(),
sync,
)
.await?,
);
}
if let Some(cdn_sync) = cdn_sync {
if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
crate::log_clean_error(&storage_mode);
node.shut_down().await;
return Err(error);
}
}
node.initialize_routing().await;
node.handles.lock().push(crate::start_notification_message_loop());
Ok(node)
}
pub fn ledger(&self) -> &Ledger<N, C> {
&self.ledger
}
pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
&self.rest
}
pub fn router(&self) -> &Router<N> {
&self.router
}
fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
use snarkvm::console::{
program::{Identifier, Literal, ProgramID, Value},
types::U64,
};
use std::str::FromStr;
let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
match dev {
Some(id) => {
if id != 0 || !dev_txs {
return Ok(());
}
}
_ => return Ok(()),
}
let self_ = self.clone();
self.spawn(async move {
tokio::time::sleep(Duration::from_secs(3)).await;
info!("Starting transaction pool...");
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
let self__ = self_.clone();
let transaction = match spawn_blocking!(
self__
.ledger
.vm()
.execute(
self__.private_key(),
locator,
inputs.into_iter(),
None,
10_000,
None,
&mut rand::thread_rng(),
)
.map_err(|err| err.into())
) {
Ok(transaction) => transaction,
Err(error) => {
error!("Transaction pool encountered an execution error - {error}");
continue;
}
};
if self_
.unconfirmed_transaction(
self_.router.local_ip(),
UnconfirmedTransaction::from(transaction.clone()),
transaction.clone(),
)
.await
{
info!("Transaction pool broadcasted the transaction");
}
}
});
Ok(())
}
pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
}
}
#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
async fn shut_down(&self) {
info!("Shutting down...");
trace!("Shutting down the node...");
trace!("Shutting down the validator...");
self.handles.lock().iter().for_each(|handle| handle.abort());
self.router.shut_down().await;
trace!("Shutting down consensus...");
self.consensus.shut_down().await;
info!("Node has shut down.");
}
}
#[cfg(test)]
mod tests {
use super::*;
use snarkvm::prelude::{
MainnetV0,
VM,
store::{ConsensusStore, helpers::memory::ConsensusMemory},
};
use anyhow::bail;
use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use std::str::FromStr;
type CurrentNetwork = MainnetV0;
#[ignore]
#[tokio::test]
async fn test_profiler() -> Result<()> {
let node = SocketAddr::from_str("0.0.0.0:4130").unwrap();
let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap();
let storage_mode = StorageMode::Development(0);
let node_data_dir = NodeDataDir::new_development(CurrentNetwork::ID, 0);
let dev_txs = true;
let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(
StorageMode::new_test(None),
)?)?;
let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;
println!("Initializing validator node...");
let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new(
node,
None,
Some(rest),
10,
account,
&[],
&[],
genesis,
None,
storage_mode,
node_data_dir,
false,
dev_txs,
None,
SignalHandler::new(),
)
.await
.unwrap();
println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),);
bail!("\n\nRemember to #[ignore] this test!\n\n")
}
}