use crate::{
BootstrapClient,
Client,
Prover,
Validator,
network::{NodeType, Peer, PeerPoolHandling},
router::Outbound,
traits::NodeInterface,
};
use snarkos_account::Account;
use snarkos_utilities::{NodeDataDir, SignalHandler};
use snarkvm::prelude::{
Address,
Header,
Ledger,
Network,
PrivateKey,
ViewKey,
block::Block,
store::helpers::{memory::ConsensusMemory, rocksdb::ConsensusDB},
};
use aleo_std::{StorageMode, aleo_ledger_dir};
use anyhow::{Result, bail};
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use std::{cmp, collections::HashMap, fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc, time::Duration};
use tokio::task;
/// The minimum number of blocks that must be added since the last automatic ledger
/// checkpoint before the next one is attempted.
const CHECKPOINT_BLOCK_FREQUENCY: u32 = 1000;
/// The maximum number of automatic ledger checkpoints kept on disk; after a new checkpoint
/// is stored, the ones with the lowest heights beyond this count are removed.
const MAX_AUTO_CHECKPOINTS: usize = 5;
/// A node of one of the supported kinds, exposing a common interface over all of them.
///
/// The validator, prover and client variants hold their node behind an `Arc`;
/// the bootstrap client is stored directly.
#[derive(Clone)]
pub enum Node<N: Network> {
/// A validator node, using the RocksDB-backed consensus storage (`ConsensusDB`).
Validator(Arc<Validator<N, ConsensusDB<N>>>),
/// A prover node, using the in-memory consensus storage (`ConsensusMemory`).
Prover(Arc<Prover<N, ConsensusMemory<N>>>),
/// A client node, using the RocksDB-backed consensus storage (`ConsensusDB`).
Client(Arc<Client<N, ConsensusDB<N>>>),
/// A bootstrap client node; it exposes no ledger through `Node::ledger`.
BootstrapClient(BootstrapClient<N>),
}
impl<N: Network> Node<N> {
pub async fn new_validator(
node_ip: SocketAddr,
bft_ip: Option<SocketAddr>,
rest_ip: Option<SocketAddr>,
rest_rps: u32,
account: Account<N>,
trusted_peers: &[SocketAddr],
trusted_validators: &[SocketAddr],
genesis: Block<N>,
cdn: Option<http::Uri>,
storage_mode: StorageMode,
node_data_dir: NodeDataDir,
trusted_peers_only: bool,
auto_db_checkpoints: Option<PathBuf>,
dev_txs: bool,
dev: Option<u16>,
signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
let validator = Arc::new(
Validator::new(
node_ip,
bft_ip,
rest_ip,
rest_rps,
account,
trusted_peers,
trusted_validators,
genesis,
cdn,
storage_mode,
node_data_dir,
trusted_peers_only,
dev_txs,
dev,
signal_handler,
)
.await?,
);
let node = Self::Validator(validator.clone());
if let Some(path) = auto_db_checkpoints {
if let Some(handle) = node.perform_auto_checkpoints(path)? {
validator.handles.lock().push(handle);
}
}
Ok(node)
}
/// Initializes a new prover node.
///
/// Builds the underlying `Prover` from the given arguments and wraps it in `Node::Prover`.
///
/// # Errors
///
/// Propagates any error returned by `Prover::new`.
pub async fn new_prover(
    node_ip: SocketAddr,
    account: Account<N>,
    trusted_peers: &[SocketAddr],
    genesis: Block<N>,
    node_data_dir: NodeDataDir,
    trusted_peers_only: bool,
    dev: Option<u16>,
    signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
    let prover = Prover::new(
        node_ip,
        account,
        trusted_peers,
        genesis,
        node_data_dir,
        trusted_peers_only,
        dev,
        signal_handler,
    )
    .await?;

    Ok(Self::Prover(Arc::new(prover)))
}
pub async fn new_client(
node_ip: SocketAddr,
rest_ip: Option<SocketAddr>,
rest_rps: u32,
account: Account<N>,
trusted_peers: &[SocketAddr],
genesis: Block<N>,
cdn: Option<http::Uri>,
storage_mode: StorageMode,
node_data_dir: NodeDataDir,
trusted_peers_only: bool,
auto_db_checkpoints: Option<PathBuf>,
dev: Option<u16>,
signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
let client = Arc::new(
Client::new(
node_ip,
rest_ip,
rest_rps,
account,
trusted_peers,
genesis,
cdn,
storage_mode,
node_data_dir,
trusted_peers_only,
dev,
signal_handler,
)
.await?,
);
let node = Self::Client(client.clone());
if let Some(path) = auto_db_checkpoints {
if let Some(handle) = node.perform_auto_checkpoints(path)? {
client.handles.lock().push(handle);
}
}
Ok(node)
}
/// Initializes a new bootstrap client node.
///
/// # Errors
///
/// Propagates any error returned by `BootstrapClient::new`.
pub async fn new_bootstrap_client(
    listener_addr: SocketAddr,
    account: Account<N>,
    genesis_header: Header<N>,
    dev: Option<u16>,
) -> Result<Self> {
    let bootstrap_client = BootstrapClient::new(listener_addr, account, genesis_header, dev).await?;
    Ok(Self::BootstrapClient(bootstrap_client))
}
/// Returns the type of this node.
///
/// The bootstrap client variant always reports `NodeType::BootstrapClient`; the other
/// variants forward the call to the wrapped node.
pub fn node_type(&self) -> NodeType {
    match self {
        Self::BootstrapClient(_) => NodeType::BootstrapClient,
        Self::Client(node) => node.node_type(),
        Self::Prover(node) => node.node_type(),
        Self::Validator(node) => node.node_type(),
    }
}
/// Returns the account private key of the wrapped node.
pub fn private_key(&self) -> &PrivateKey<N> {
    match self {
        Self::BootstrapClient(inner) => inner.private_key(),
        Self::Client(inner) => inner.private_key(),
        Self::Prover(inner) => inner.private_key(),
        Self::Validator(inner) => inner.private_key(),
    }
}
/// Returns the account view key of the wrapped node.
pub fn view_key(&self) -> &ViewKey<N> {
    match self {
        Self::BootstrapClient(inner) => inner.view_key(),
        Self::Client(inner) => inner.view_key(),
        Self::Prover(inner) => inner.view_key(),
        Self::Validator(inner) => inner.view_key(),
    }
}
/// Returns the account address of the wrapped node.
pub fn address(&self) -> Address<N> {
    match self {
        Self::BootstrapClient(inner) => inner.address(),
        Self::Client(inner) => inner.address(),
        Self::Prover(inner) => inner.address(),
        Self::Validator(inner) => inner.address(),
    }
}
/// Returns the wrapped node's `is_dev` flag.
pub fn is_dev(&self) -> bool {
    match self {
        Self::BootstrapClient(inner) => inner.is_dev(),
        Self::Client(inner) => inner.is_dev(),
        Self::Prover(inner) => inner.is_dev(),
        Self::Validator(inner) => inner.is_dev(),
    }
}
/// Returns a reference to the lock-guarded peer pool of the wrapped node.
///
/// Validators, provers and clients expose the pool through their router, while the
/// bootstrap client exposes it directly.
pub fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
    match self {
        Self::BootstrapClient(inner) => inner.peer_pool(),
        Self::Client(inner) => inner.router().peer_pool(),
        Self::Prover(inner) => inner.router().peer_pool(),
        Self::Validator(inner) => inner.router().peer_pool(),
    }
}
/// Returns a reference to the ledger, if the wrapped node has one.
///
/// Only validators and clients yield a ledger here; provers and bootstrap clients
/// always return `None`.
pub fn ledger(&self) -> Option<&Ledger<N, ConsensusDB<N>>> {
    match self {
        Self::Validator(inner) => Some(inner.ledger()),
        Self::Client(inner) => Some(inner.ledger()),
        Self::Prover(_) | Self::BootstrapClient(_) => None,
    }
}
/// Returns `true` if the wrapped node reports itself as block-synced.
///
/// A bootstrap client is always reported as synced.
pub fn is_block_synced(&self) -> bool {
    match self {
        Self::BootstrapClient(_) => true,
        Self::Client(inner) => inner.is_block_synced(),
        Self::Prover(inner) => inner.is_block_synced(),
        Self::Validator(inner) => inner.is_block_synced(),
    }
}
/// Returns the number of blocks the wrapped node reports being behind, if available.
///
/// A bootstrap client always reports `Some(0)`.
pub fn num_blocks_behind(&self) -> Option<u32> {
    match self {
        Self::BootstrapClient(_) => Some(0),
        Self::Client(inner) => inner.num_blocks_behind(),
        Self::Prover(inner) => inner.num_blocks_behind(),
        Self::Validator(inner) => inner.num_blocks_behind(),
    }
}
/// Returns the sync speed reported by the wrapped node.
///
/// A bootstrap client always reports `0.0`.
pub fn get_sync_speed(&self) -> f64 {
    match self {
        Self::BootstrapClient(_) => 0.0,
        Self::Client(inner) => inner.get_sync_speed(),
        Self::Prover(inner) => inner.get_sync_speed(),
        Self::Validator(inner) => inner.get_sync_speed(),
    }
}
/// Shuts down the wrapped node by awaiting its own `shut_down` routine.
pub async fn shut_down(&self) {
    match self {
        Self::BootstrapClient(inner) => inner.shut_down().await,
        Self::Client(inner) => inner.shut_down().await,
        Self::Prover(inner) => inner.shut_down().await,
        Self::Validator(inner) => inner.shut_down().await,
    }
}
/// Awaits the wrapped node's `wait_for_signals` routine, passing along the given
/// signal handler.
pub async fn wait_for_signals(&self, signal_handler: &SignalHandler) {
    match self {
        Self::BootstrapClient(inner) => inner.wait_for_signals(signal_handler).await,
        Self::Client(inner) => inner.wait_for_signals(signal_handler).await,
        Self::Prover(inner) => inner.wait_for_signals(signal_handler).await,
        Self::Validator(inner) => inner.wait_for_signals(signal_handler).await,
    }
}
/// Starts a background task that periodically stores ledger checkpoints in the given directory.
///
/// Returns `Ok(None)` without spawning anything if this node exposes no ledger
/// (see `Self::ledger`); otherwise returns the handle of the spawned task.
///
/// The task polls the current block height every 500 ms. The first checkpoint is attempted
/// right away; later ones only once at least `CHECKPOINT_BLOCK_FREQUENCY` blocks were added
/// since the last successful checkpoint. Each checkpoint is written to a
/// `checkpoint_<height>` subdirectory, and afterwards the checkpoints with the lowest
/// heights are removed until at most `MAX_AUTO_CHECKPOINTS` remain.
///
/// # Errors
///
/// Fails if `auto_checkpoint_path` doesn't exist and can't be created, or if it exists
/// but is not a directory.
pub fn perform_auto_checkpoints(&self, auto_checkpoint_path: PathBuf) -> Result<Option<task::JoinHandle<()>>> {
    // Nodes without a ledger have nothing to checkpoint.
    let Some(ledger) = self.ledger().cloned() else {
        return Ok(None);
    };

    // Ensure that the target is (or becomes) a directory before spawning the routine.
    if !auto_checkpoint_path.exists() {
        if let Err(e) = fs::create_dir_all(&auto_checkpoint_path) {
            bail!("Couldn't create the specified path for the automatic ledger checkpoints: {e}");
        }
    } else if !auto_checkpoint_path.is_dir() {
        bail!("The specified path for automatic ledger checkpoints is not a directory");
    }

    let handle = tokio::spawn(async move {
        info!("Starting the automatic ledger checkpoint routine...");

        // The height of the last successful checkpoint; `None` until the first one is stored.
        let mut last_checkpoint_height = None;
        // A reusable buffer for the checkpoints found on disk during pruning.
        let mut existing_checkpoints = Vec::with_capacity(MAX_AUTO_CHECKPOINTS + 1);

        // The location of the block tree file within the ledger directory.
        let mut block_tree_path = aleo_ledger_dir(N::ID, ledger.vm().block_store().storage_mode());
        block_tree_path.push("block_tree");

        loop {
            tokio::time::sleep(Duration::from_millis(500)).await;

            // Skip this round if not enough blocks were added since the last checkpoint.
            let current_height = ledger.vm().block_store().current_block_height();
            if last_checkpoint_height.is_some_and(|checkpoint_height| {
                current_height.saturating_sub(checkpoint_height) < CHECKPOINT_BLOCK_FREQUENCY
            }) {
                continue;
            }

            // Store the checkpoint; on failure it is retried in the next round.
            let mut checkpoint_path = auto_checkpoint_path.clone();
            checkpoint_path.push(format!("checkpoint_{current_height}"));
            if let Err(e) = ledger.backup_database(&checkpoint_path) {
                warn!("Couldn't automatically store a checkpoint at {}: {e}", checkpoint_path.display());
                continue;
            }
            last_checkpoint_height = Some(current_height);

            // Cache the block tree and copy it into the checkpoint in a separate task,
            // so that the pruning below doesn't wait for it.
            let ledger_clone = ledger.clone();
            let source_block_tree_path = block_tree_path.clone();
            tokio::spawn(async move {
                if let Err(e) = ledger_clone.cache_block_tree() {
                    warn!("Couldn't cache the block tree for a ledger checkpoint: {e}");
                    return;
                }
                checkpoint_path.push("block_tree");
                if let Err(e) = fs::copy(source_block_tree_path, checkpoint_path) {
                    warn!("Couldn't copy the block tree file to a ledger checkpoint: {e}");
                }
            });

            // Collect the automatic checkpoints that currently exist on disk.
            existing_checkpoints.clear();
            let checkpoint_dir = match auto_checkpoint_path.read_dir() {
                Ok(dir) => dir,
                Err(e) => {
                    warn!("IO error while accessing the automatic checkpoints: {e}");
                    continue;
                }
            };
            for entry in checkpoint_dir {
                let entry = match entry {
                    Ok(entry) => entry,
                    Err(e) => {
                        warn!("IO error while counting the automatic checkpoints: {e}");
                        continue;
                    }
                };
                // Checkpoints are directories; ignore anything else.
                let path = entry.path();
                if !path.is_dir() {
                    continue;
                }
                // A name that isn't valid UTF-8 can't be one of our checkpoints; skip it
                // instead of panicking and taking the whole routine down.
                let Ok(file_name) = entry.file_name().into_string() else {
                    continue;
                };
                // Only consider entries named `checkpoint_<height>`.
                let mut name_iter = file_name.split('_');
                if name_iter.next() != Some("checkpoint") {
                    continue;
                }
                let Some(height) = name_iter.next() else {
                    continue;
                };
                let Ok(height) = u32::from_str(height) else {
                    continue;
                };
                existing_checkpoints.push((path, height));
            }

            // Sort by descending height, so that the oldest checkpoints end up at the back,
            // and pop the surplus ones off the end.
            existing_checkpoints.sort_unstable_by_key(|(_, height)| cmp::Reverse(*height));
            let surplus_checkpoints = existing_checkpoints.len().saturating_sub(MAX_AUTO_CHECKPOINTS);
            for _ in 0..surplus_checkpoints {
                if let Some((checkpoint_path, _)) = existing_checkpoints.pop() {
                    if let Err(e) = fs::remove_dir_all(checkpoint_path) {
                        warn!("Couldn't remove an automatic ledger checkpoint: {e}");
                    }
                }
            }
        }
    });

    Ok(Some(handle))
}
}