// Declares the `router` submodule (its contents are not visible in this file).
mod router;
use crate::{
bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking},
cdn::CdnBlockSync,
traits::NodeInterface,
};
use snarkos_account::Account;
use snarkos_node_network::NodeType;
use snarkos_node_rest::Rest;
use snarkos_node_router::{
Heartbeat,
Inbound,
Outbound,
Router,
Routing,
messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkos_node_tcp::{
P2P,
protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
use snarkvm::{
console::network::Network,
ledger::{
Ledger,
block::{Block, Header},
puzzle::{Puzzle, Solution, SolutionID},
store::ConsensusStorage,
},
prelude::{VM, block::Transaction},
utilities::flatten_error,
};
use aleo_std::StorageMode;
use anyhow::{Context, Result};
use core::future::Future;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
net::SocketAddr,
num::NonZeroUsize,
sync::{
Arc,
atomic::{
AtomicUsize,
Ordering::{Acquire, Relaxed},
},
},
time::Duration,
};
use tokio::{
task::JoinHandle,
time::{sleep, timeout},
};
/// The maximum number of solution verifications allowed to run concurrently.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity (in entries) of the LRU queue of unconfirmed deployments awaiting verification.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity (in entries) of the LRU queue of unconfirmed executions awaiting verification.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity (in entries) of the LRU queue of unconfirmed solutions awaiting verification.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// A queued transaction: the peer it arrived from, the serialized message (for re-propagation),
/// and the deserialized transaction (for verification).
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// A queued solution: the peer it arrived from, the serialized message (for re-propagation),
/// and the deserialized solution (for verification).
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
/// A client node.
///
/// Follows the chain via block synchronization, and verifies and re-propagates
/// unconfirmed solutions and transactions received from peers.
/// NOTE(review): description inferred from the visible fields and methods — confirm against crate docs.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
/// The ledger of the node.
ledger: Ledger<N, C>,
/// The router (peer connections and message handling).
router: Router<N>,
/// The REST server, present only when a REST IP was supplied at startup.
rest: Option<Rest<N, C, Self>>,
/// The block synchronization module.
sync: Arc<BlockSync<N>>,
/// The genesis block.
genesis: Block<N>,
/// The puzzle, used to verify unconfirmed solutions against the current proof target.
puzzle: Puzzle<N>,
/// The LRU queue of unconfirmed solutions awaiting verification, keyed by solution ID.
solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
/// The LRU queue of unconfirmed deployments awaiting verification, keyed by transaction ID.
deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
/// The LRU queue of unconfirmed executions awaiting verification, keyed by transaction ID.
execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
/// The number of solution verifications currently in flight (bounds parallelism).
num_verifying_solutions: Arc<AtomicUsize>,
/// The number of deployment verifications currently in flight (bounds parallelism).
num_verifying_deploys: Arc<AtomicUsize>,
/// The number of execution verifications currently in flight (bounds parallelism).
num_verifying_executions: Arc<AtomicUsize>,
/// The handles of spawned background tasks; aborted on shutdown.
pub(crate) handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
/// The ping module, used to keep peers updated with this node's block locators.
ping: Arc<Ping<N>>,
/// The signal handler, polled by background loops to detect node shutdown.
signal_handler: Arc<SignalHandler>,
}
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
/// Initializes a new client node.
///
/// Startup sequence: load the ledger (on a blocking thread), build the router,
/// initialize block sync and ping, optionally start the REST server, optionally
/// wait for CDN sync to complete, then spawn the background loops.
///
/// # Arguments
/// * `node_ip` - The socket address this node listens on.
/// * `rest_ip` - If `Some`, the address to serve the REST API on.
/// * `rest_rps` - The REST rate limit (requests per second).
/// * `account` - The node's account.
/// * `trusted_peers` - Peers to connect to on startup.
/// * `genesis` - The genesis block.
/// * `cdn` - If `Some`, the base URI to sync initial blocks from a CDN.
/// * `storage_mode` - The ledger storage mode.
/// * `node_data_dir` - The node's data directory.
/// * `trusted_peers_only` - Whether to restrict connections to trusted peers.
/// * `dev` - If `Some`, the development-mode node ID.
/// * `signal_handler` - Shared shutdown signal.
///
/// # Errors
/// Returns an error if ledger loading, router construction, block-locator
/// retrieval, REST startup, or CDN synchronization fails.
pub async fn new(
node_ip: SocketAddr,
rest_ip: Option<SocketAddr>,
rest_rps: u32,
account: Account<N>,
trusted_peers: &[SocketAddr],
genesis: Block<N>,
cdn: Option<http::Uri>,
storage_mode: StorageMode,
node_data_dir: NodeDataDir,
trusted_peers_only: bool,
dev: Option<u16>,
signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
// Load the ledger on a blocking thread — `Ledger::load` is CPU/IO heavy.
let ledger = {
let storage_mode = storage_mode.clone();
let genesis = genesis.clone();
spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
}
.with_context(|| "Failed to initialize the ledger")?;
// Wrap the ledger in the service abstraction shared by router and sync.
let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone()));
// Initialize the router for peer connections and message handling.
let router = Router::new(
node_ip,
NodeType::Client,
account,
ledger_service.clone(),
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
trusted_peers_only,
node_data_dir.clone(),
dev.is_some(),
)
.await?;
// Initialize block synchronization and the ping module with the current locators.
let sync = Arc::new(BlockSync::new(ledger_service.clone()));
let locators = sync.get_block_locators()?;
let ping = Arc::new(Ping::new(router.clone(), locators));
// Assemble the node; `rest` is filled in below once the node can be shared.
let mut node = Self {
ledger: ledger.clone(),
router,
rest: None,
sync: sync.clone(),
genesis,
ping,
puzzle: ledger.puzzle().clone(),
solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
num_verifying_solutions: Default::default(),
num_verifying_deploys: Default::default(),
num_verifying_executions: Default::default(),
handles: Default::default(),
signal_handler: signal_handler.clone(),
};
// Start the CDN sync eagerly (if configured) so the REST server can report on it.
let cdn_sync = cdn.map(|base_url| {
trace!("CDN sync is enabled");
Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
});
// Start the REST server before waiting on CDN sync, so status endpoints are live.
if let Some(rest_ip) = rest_ip {
node.rest = Some(
Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
.await?,
);
}
// Block until CDN sync finishes; on failure, shut down cleanly and bail out.
if let Some(cdn_sync) = cdn_sync {
if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
crate::log_clean_error(&storage_mode);
node.shut_down().await;
return Err(error);
}
}
// Spawn the background loops: routing, block sync, and the three verification pumps.
node.initialize_routing().await;
node.initialize_sync();
node.initialize_solution_verification();
node.initialize_deploy_verification();
node.initialize_execute_verification();
node.handles.lock().push(crate::start_notification_message_loop());
Ok(node)
}
/// Returns the ledger.
pub fn ledger(&self) -> &Ledger<N, C> {
&self.ledger
}
/// Returns the REST server, if one was started.
pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
&self.rest
}
/// Returns the router.
pub fn router(&self) -> &Router<N> {
&self.router
}
}
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
/// The maximum time either sync loop will block waiting for an event
/// before re-checking the shutdown signal and retrying.
const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
/// Spawns the two block-synchronization loops:
/// one that issues block requests to peers, and one that processes block responses.
/// Both loops exit once the signal handler reports shutdown.
fn initialize_sync(&self) {
// Loop 1: issue new block requests (and handle request timeouts).
let self_ = self.clone();
self.spawn(async move {
while !self_.signal_handler.is_stopped() {
self_.try_issuing_block_requests().await;
}
info!("Stopped block request generation");
});
// Loop 2: advance the ledger whenever block responses arrive (or the timeout fires).
let self_ = self.clone();
self.spawn(async move {
while !self_.signal_handler.is_stopped() {
// Bounded wait so the shutdown flag is re-checked at least every MAX_SYNC_INTERVAL.
let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;
self_.try_advancing_block_synchronization().await;
}
debug!("Stopped block response processing");
});
}
/// Attempts to advance the ledger with any fully-received blocks.
/// On success with new blocks, refreshes the block locators broadcast via ping.
async fn try_advancing_block_synchronization(&self) {
let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
Ok(val) => val,
Err(err) => {
error!("Block synchronization failed - {err}");
return;
}
};
// Keep peers informed of our new chain tip.
if has_new_blocks {
match self.sync.get_block_locators() {
Ok(locators) => self.ping.update_block_locators(locators),
Err(err) => error!("Failed to get block locators: {err}"),
}
}
}
/// One iteration of the request-issuing loop:
/// wait for a peer update (bounded), re-issue timed-out requests, try to advance
/// the ledger, then prepare and send a fresh batch of block requests if syncing.
async fn try_issuing_block_requests(&self) {
// Bounded wait so the enclosing loop can observe shutdown.
let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await;
self.sync.set_sync_height(self.ledger.latest_height());
// First priority: re-send any requests whose previous peer timed out.
match self.sync.handle_block_request_timeouts(&self.router) {
Ok(Some((requests, sync_peers))) => {
self.send_block_requests(requests, sync_peers).await;
return;
}
Ok(None) => {}
Err(err) => {
error!("{}", flatten_error(&err));
return;
}
}
if !self.sync.can_block_sync() {
trace!("Nothing to sync. Will not issue new block requests");
return;
}
// Try to advance with already-received responses before requesting more.
let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
Ok(val) => val,
Err(err) => {
error!("{err}");
return;
}
};
if has_new_blocks {
match self.sync.get_block_locators() {
Ok(locators) => self.ping.update_block_locators(locators),
Err(err) => error!("Failed to get block locators: {err}"),
}
// Advancing may have caught us up; if so, no new requests are needed.
if !self.sync.can_block_sync() {
return;
}
}
let (block_requests, sync_peers) = self.sync.prepare_block_requests();
if block_requests.is_empty() {
// Nothing new to request — log why, for diagnosability.
let total_requests = self.sync.num_total_block_requests();
let num_outstanding = self.sync.num_outstanding_block_requests();
if total_requests > 0 {
trace!(
"Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
);
} else {
debug!(
"Not block synced yet, and there are no outstanding block requests or \
new block requests to send"
);
}
} else {
self.send_block_requests(block_requests, sync_peers).await;
}
}
/// Sends the given block requests to the given sync peers, in batches of at most
/// `DataBlocks::MAXIMUM_NUMBER_OF_BLOCKS`, pausing `BLOCK_REQUEST_BATCH_DELAY`
/// between batches. Stops early if a batch fails to send.
async fn send_block_requests(
&self,
block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
) {
for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await {
break;
}
// Throttle between batches to avoid flooding peers.
tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
}
}
/// Spawns a background task that drains the solution queue, verifying each solution
/// on a blocking thread. In-flight verifications are capped at
/// `MAX_PARALLEL_SOLUTION_VERIFICATIONS` via the atomic counter.
fn initialize_solution_verification(&self) {
let node = self.clone();
self.spawn(async move {
loop {
if node.signal_handler.is_stopped() {
info!("Shutting down solution verification");
break;
}
// Back off when there is nothing to do or the parallelism cap is reached.
let queue_is_empty = node.solution_queue.lock().is_empty();
let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
if queue_is_empty || counter_is_full {
sleep(Duration::from_millis(50)).await;
continue;
}
// Hold the queue lock while dispatching; no `.await` occurs inside this loop.
let mut solution_queue = node.solution_queue.lock();
while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
// Reserve a verification slot before spawning.
let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
let _node = node.clone();
// Verification is CPU-bound, so run it off the async runtime.
tokio::task::spawn_blocking(move || {
if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
let prover_address = solution.address();
// NOTE(review): when the prover's solution limit is reached this only logs
// and still proceeds to verify and propagate the solution — confirm intended.
if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
}
let proof_target = _node.ledger.latest_block().header().proof_target();
let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
match is_valid {
Ok(()) => {
// Valid: re-propagate to all peers except the sender.
let message = Message::UnconfirmedSolution(serialized);
_node.propagate(message, &[peer_ip]);
}
Err(error) => {
// Only log failures when well past the epoch boundary (height-in-epoch > 10),
// presumably to suppress noise from stale solutions around an epoch change — TODO confirm.
if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
}
}
}
} else {
warn!("Failed to retrieve the latest epoch hash.");
}
// Release the verification slot.
_node.num_verifying_solutions.fetch_sub(1, Relaxed);
});
// Stop dispatching once the cap is reached; resume on the next outer iteration.
if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
break;
}
}
}
});
}
/// Spawns a background task that drains the deployment queue, verifying each
/// deployment transaction on a blocking thread. In-flight verifications are capped
/// at `VM::MAX_PARALLEL_DEPLOY_VERIFICATIONS`.
fn initialize_deploy_verification(&self) {
let node = self.clone();
self.spawn(async move {
loop {
if node.signal_handler.is_stopped() {
info!("Shutting down deployment verification");
break;
}
// Back off when there is nothing to do or the parallelism cap is reached.
let queue_is_empty = node.deploy_queue.lock().is_empty();
let counter_is_full =
node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;
if queue_is_empty || counter_is_full {
sleep(Duration::from_millis(50)).await;
continue;
}
// The lock is taken per pop here (unlike the solution loop, which holds it).
while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
let _node = node.clone();
tokio::task::spawn_blocking(move || {
// A deployment's state root comes from its fee transition.
let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
_node.num_verifying_deploys.fetch_sub(1, Relaxed);
return;
};
// Unknown state root: skip local verification but still propagate,
// as the debug message states.
if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
_node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
_node.num_verifying_deploys.fetch_sub(1, Relaxed);
return;
}
match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
Ok(_) => {
// Valid: re-propagate to all peers except the sender.
_node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
}
Err(error) => {
debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
}
}
// Release the verification slot.
_node.num_verifying_deploys.fetch_sub(1, Relaxed);
});
// Stop dispatching once the cap is reached; resume on the next outer iteration.
if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
break;
}
}
}
});
}
/// Spawns a background task that drains the execution queue, verifying each
/// execution transaction on a blocking thread. In-flight verifications are capped
/// at `VM::MAX_PARALLEL_EXECUTE_VERIFICATIONS`.
fn initialize_execute_verification(&self) {
let node = self.clone();
self.spawn(async move {
loop {
if node.signal_handler.is_stopped() {
info!("Shutting down execution verification");
break;
}
// Back off when there is nothing to do or the parallelism cap is reached.
let queue_is_empty = node.execute_queue.lock().is_empty();
let counter_is_full =
node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;
if queue_is_empty || counter_is_full {
sleep(Duration::from_millis(50)).await;
continue;
}
while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
let _node = node.clone();
tokio::task::spawn_blocking(move || {
// An execution may carry state roots from both its execution and its fee transition.
let state_roots = [
transaction.execution().map(|t| t.global_state_root()),
transaction.fee_transition().map(|t| t.global_state_root()),
]
.into_iter()
.flatten();
for state_root in state_roots {
// Unknown state root: skip local verification but still propagate,
// as the debug message states.
if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
_node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
_node.num_verifying_executions.fetch_sub(1, Relaxed);
return;
}
}
match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
Ok(_) => {
// Valid: re-propagate to all peers except the sender.
_node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
}
Err(error) => {
debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
}
}
// Release the verification slot.
_node.num_verifying_executions.fetch_sub(1, Relaxed);
});
// Stop dispatching once the cap is reached; resume on the next outer iteration.
if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
break;
}
}
}
});
}
/// Spawns the given future on the runtime and records its handle for shutdown.
pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
}
}
#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
/// Shuts the client node down: aborts every recorded background task,
/// then tears down the router.
async fn shut_down(&self) {
info!("Shutting down...");
trace!("Shutting down the node...");
trace!("Shutting down the client...");
// Abort all background tasks spawned via `Client::spawn`.
for handle in self.handles.lock().iter() {
handle.abort();
}
// Shut down the router (peer connections).
self.router.shut_down().await;
info!("Node has shut down.");
}
}