amareleo_node/validator/
mod.rs1use amareleo_chain_account::Account;
17use amareleo_chain_tracing::TracingHandler;
18use amareleo_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService};
19use amareleo_node_consensus::Consensus;
20use amareleo_node_rest::Rest;
21
22use snarkvm::prelude::{Ledger, Network, block::Block, store::ConsensusStorage};
23
24use aleo_std::StorageMode;
25use anyhow::Result;
26use core::future::Future;
27use parking_lot::Mutex;
28use std::{
29 net::SocketAddr,
30 sync::{Arc, atomic::AtomicBool},
31};
32use tokio::task::JoinHandle;
33use tracing::subscriber::DefaultGuard;
34
35#[derive(Clone)]
37pub struct Validator<N: Network, C: ConsensusStorage<N>> {
38 ledger: Ledger<N, C>,
40 consensus: Consensus<N>,
42 rest: Option<Rest<N, C>>,
44 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
46 tracing: Option<TracingHandler>,
48 shutdown: Arc<AtomicBool>,
50}
51
52impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
53 pub async fn new(
55 rest_ip: SocketAddr,
56 rest_rps: u32,
57 account: Account<N>,
58 genesis: Block<N>,
59 keep_state: bool,
60 storage_mode: StorageMode,
61 tracing: Option<TracingHandler>,
62 shutdown: Arc<AtomicBool>,
63 ) -> Result<Self> {
64 let ledger = Ledger::load(genesis, storage_mode.clone())?;
66
67 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), tracing.clone(), shutdown.clone()));
69
70 let mut consensus =
72 Consensus::new(account.clone(), ledger_service.clone(), keep_state, storage_mode.clone(), tracing.clone())?;
73 let (primary_sender, primary_receiver) = init_primary_channels::<N>();
75 consensus.run(primary_sender, primary_receiver).await?;
77
78 let mut node = Self {
80 ledger: ledger.clone(),
81 consensus: consensus.clone(),
82 rest: None,
83 handles: Default::default(),
84 tracing: tracing.clone(),
85 shutdown,
86 };
87
88 node.rest = Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone(), tracing.clone()).await?);
90
91 node.handles.lock().push(crate::start_notification_message_loop());
93
94 Ok(node)
96 }
97
98 pub fn ledger(&self) -> &Ledger<N, C> {
100 &self.ledger
101 }
102
103 pub fn rest(&self) -> &Option<Rest<N, C>> {
105 &self.rest
106 }
107}
108
109impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
110 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
112 self.handles.lock().push(tokio::spawn(future));
113 }
114}
115
116impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
117 pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
119 self.tracing.clone().map(|trace_handle| trace_handle.subscribe_thread())
120 }
121
122 pub async fn shut_down(&self) {
124 let _guard = self.get_tracing_guard();
125
126 info!("Shutting down...");
127
128 if let Some(rest) = &self.rest {
130 trace!("Shutting down the REST server...");
131 rest.shut_down().await;
132 }
133
134 trace!("Shutting down the node...");
136 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
137
138 trace!("Shutting down the validator...");
140 self.handles.lock().iter().for_each(|handle| handle.abort());
141
142 trace!("Shutting down consensus...");
144 self.consensus.shut_down().await;
145
146 info!("Node has shut down.");
147 info!("");
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use amareleo_node_bft::DEVELOPMENT_MODE_RNG_SEED;

    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use std::str::FromStr;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Manual profiling aid: builds a validator from a freshly generated genesis block.
    ///
    /// The test is `#[ignore]`d and deliberately ends with `bail!`, so it fails whenever
    /// it is run, as a reminder to keep it ignored.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // REST address and storage mode for the node under test.
        let rest_addr = SocketAddr::from_str("0.0.0.0:3030").unwrap();
        let storage_mode = StorageMode::Development(0);

        // Deterministic RNG, so the account and genesis block are reproducible.
        let mut rng = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();

        // Build an in-memory VM and derive the genesis block from it.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(None)?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            rest_addr,
            10,
            account,
            genesis,
            false,
            storage_mode,
            None,
            Default::default(),
        )
        .await
        .unwrap();

        println!("Loaded validator node with {} blocks", validator.ledger().latest_height());

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}