amareleo_node/validator/
mod.rsuse amareleo_chain_account::Account;
use amareleo_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService};
use amareleo_node_consensus::Consensus;
use amareleo_node_rest::Rest;
use snarkvm::prelude::{Ledger, Network, block::Block, store::ConsensusStorage};
use aleo_std::StorageMode;
use anyhow::Result;
use core::future::Future;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use std::{
io,
net::SocketAddr,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use tokio::task::JoinHandle;
#[derive(Clone)]
pub struct Validator<N: Network, C: ConsensusStorage<N>> {
ledger: Ledger<N, C>,
consensus: Consensus<N>,
rest: Option<Rest<N, C>>,
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
shutdown: Arc<AtomicBool>,
}
impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
pub async fn new(
rest_ip: Option<SocketAddr>,
rest_rps: u32,
account: Account<N>,
genesis: Block<N>,
keep_state: bool,
storage_mode: StorageMode,
shutdown: Arc<AtomicBool>,
) -> Result<Self> {
let signal_node = Self::handle_signals(keep_state, shutdown.clone());
let ledger = Ledger::load(genesis, storage_mode.clone())?;
let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
let mut consensus = Consensus::new(account.clone(), ledger_service.clone(), keep_state, storage_mode.clone())?;
let (primary_sender, primary_receiver) = init_primary_channels::<N>();
consensus.run(primary_sender, primary_receiver).await?;
let mut node = Self {
ledger: ledger.clone(),
consensus: consensus.clone(),
rest: None,
handles: Default::default(),
shutdown,
};
if let Some(rest_ip) = rest_ip {
node.rest = Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone()).await?);
}
node.handles.lock().push(crate::start_notification_message_loop());
let _ = signal_node.set(node.clone());
Ok(node)
}
pub fn ledger(&self) -> &Ledger<N, C> {
&self.ledger
}
pub fn rest(&self) -> &Option<Rest<N, C>> {
&self.rest
}
}
impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
fn handle_signals(keep_state: bool, shutdown_flag: Arc<AtomicBool>) -> Arc<OnceCell<Self>> {
let node: Arc<OnceCell<Self>> = Default::default();
#[cfg(target_family = "unix")]
fn signal_listener() -> impl Future<Output = io::Result<()>> {
use tokio::signal::unix::{SignalKind, signal};
let mut s_int = signal(SignalKind::interrupt()).unwrap();
let mut s_term = signal(SignalKind::terminate()).unwrap();
let mut s_quit = signal(SignalKind::quit()).unwrap();
let mut s_hup = signal(SignalKind::hangup()).unwrap();
async move {
tokio::select!(
_ = s_int.recv() => (),
_ = s_term.recv() => (),
_ = s_quit.recv() => (),
_ = s_hup.recv() => (),
);
Ok(())
}
}
#[cfg(not(target_family = "unix"))]
fn signal_listener() -> impl Future<Output = io::Result<()>> {
tokio::signal::ctrl_c()
}
let node_clone = node.clone();
tokio::task::spawn(async move {
match signal_listener().await {
Ok(()) => {
if !keep_state {
info!("================================================================");
info!(" Node state preservation not required. Terminating immediately. ");
info!("================================================================");
std::process::exit(0);
}
warn!("==========================================================================================");
warn!("⚠️ Attention - Starting the graceful shutdown procedure (ETA: 30 seconds)...");
warn!("⚠️ Attention - Avoid DATA CORRUPTION, do NOT interrupt amareleo (or press Ctrl+C again)");
warn!("⚠️ Attention - Please wait until the shutdown gracefully completes (ETA: 30 seconds)");
warn!("==========================================================================================");
match node_clone.get() {
Some(node) => node.shut_down().await,
None => shutdown_flag.store(true, Ordering::Relaxed),
}
tokio::time::sleep(Duration::from_secs(3)).await;
std::process::exit(0);
}
Err(error) => error!("tokio::signal::ctrl_c encountered an error: {}", error),
}
});
node
}
pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
}
}
impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
async fn shut_down(&self) {
info!("Shutting down...");
trace!("Shutting down the node...");
self.shutdown.store(true, std::sync::atomic::Ordering::Release);
trace!("Shutting down the validator...");
self.handles.lock().iter().for_each(|handle| handle.abort());
trace!("Shutting down consensus...");
self.consensus.shut_down().await;
info!("Node has shut down.");
}
}
#[cfg(test)]
mod tests {
use super::*;
use snarkvm::prelude::{
MainnetV0,
VM,
store::{ConsensusStore, helpers::memory::ConsensusMemory},
};
use anyhow::bail;
use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use std::str::FromStr;
type CurrentNetwork = MainnetV0;
#[ignore]
#[tokio::test]
async fn test_profiler() -> Result<()> {
let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap();
let storage_mode = StorageMode::Development(0);
let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(None)?)?;
let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;
println!("Initializing validator node...");
let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new(
Some(rest),
10,
account,
genesis,
false,
storage_mode,
Default::default(),
)
.await
.unwrap();
println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),);
bail!("\n\nRemember to #[ignore] this test!\n\n")
}
}