1use crate::{
17 BootstrapClient,
18 Client,
19 Prover,
20 Validator,
21 network::{NodeType, Peer, PeerPoolHandling},
22 router::Outbound,
23 traits::NodeInterface,
24};
25
26use snarkos_account::Account;
27use snarkos_utilities::{NodeDataDir, SignalHandler};
28
29use snarkvm::prelude::{
30 Address,
31 Header,
32 Ledger,
33 Network,
34 PrivateKey,
35 ViewKey,
36 block::Block,
37 store::helpers::{memory::ConsensusMemory, rocksdb::ConsensusDB},
38};
39
40use aleo_std::{StorageMode, aleo_ledger_dir};
41use anyhow::{Result, bail};
42
43#[cfg(feature = "locktick")]
44use locktick::parking_lot::RwLock;
45#[cfg(not(feature = "locktick"))]
46use parking_lot::RwLock;
47use std::{cmp, collections::HashMap, fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc, time::Duration};
48use tokio::task;
49
/// The minimum number of blocks that must pass after an automatic ledger checkpoint
/// before the next one is taken.
const CHECKPOINT_BLOCK_FREQUENCY: u32 = 1_000;

/// The number of automatic ledger checkpoints that are kept on disk; once this is exceeded,
/// the checkpoints with the lowest heights are removed.
const MAX_AUTO_CHECKPOINTS: usize = 5;
55
56#[derive(Clone)]
57pub enum Node<N: Network> {
58 Validator(Arc<Validator<N, ConsensusDB<N>>>),
60 Prover(Arc<Prover<N, ConsensusMemory<N>>>),
62 Client(Arc<Client<N, ConsensusDB<N>>>),
64 BootstrapClient(BootstrapClient<N>),
66}
67
68impl<N: Network> Node<N> {
69 pub async fn new_validator(
71 node_ip: SocketAddr,
72 bft_ip: Option<SocketAddr>,
73 rest_ip: Option<SocketAddr>,
74 rest_rps: u32,
75 account: Account<N>,
76 trusted_peers: &[SocketAddr],
77 trusted_validators: &[SocketAddr],
78 genesis: Block<N>,
79 cdn: Option<http::Uri>,
80 storage_mode: StorageMode,
81 node_data_dir: NodeDataDir,
82 trusted_peers_only: bool,
83 auto_db_checkpoints: Option<PathBuf>,
84 dev_txs: bool,
85 dev: Option<u16>,
86 signal_handler: Arc<SignalHandler>,
87 ) -> Result<Self> {
88 let validator = Arc::new(
89 Validator::new(
90 node_ip,
91 bft_ip,
92 rest_ip,
93 rest_rps,
94 account,
95 trusted_peers,
96 trusted_validators,
97 genesis,
98 cdn,
99 storage_mode,
100 node_data_dir,
101 trusted_peers_only,
102 dev_txs,
103 dev,
104 signal_handler,
105 )
106 .await?,
107 );
108
109 let node = Self::Validator(validator.clone());
110
111 if let Some(path) = auto_db_checkpoints {
113 if let Some(handle) = node.perform_auto_checkpoints(path)? {
114 validator.handles.lock().push(handle);
115 }
116 }
117
118 Ok(node)
119 }
120
121 pub async fn new_prover(
123 node_ip: SocketAddr,
124 account: Account<N>,
125 trusted_peers: &[SocketAddr],
126 genesis: Block<N>,
127 node_data_dir: NodeDataDir,
128 trusted_peers_only: bool,
129 dev: Option<u16>,
130 signal_handler: Arc<SignalHandler>,
131 ) -> Result<Self> {
132 Ok(Self::Prover(Arc::new(
133 Prover::new(
134 node_ip,
135 account,
136 trusted_peers,
137 genesis,
138 node_data_dir,
139 trusted_peers_only,
140 dev,
141 signal_handler,
142 )
143 .await?,
144 )))
145 }
146
147 pub async fn new_client(
149 node_ip: SocketAddr,
150 rest_ip: Option<SocketAddr>,
151 rest_rps: u32,
152 account: Account<N>,
153 trusted_peers: &[SocketAddr],
154 genesis: Block<N>,
155 cdn: Option<http::Uri>,
156 storage_mode: StorageMode,
157 node_data_dir: NodeDataDir,
158 trusted_peers_only: bool,
159 auto_db_checkpoints: Option<PathBuf>,
160 dev: Option<u16>,
161 signal_handler: Arc<SignalHandler>,
162 ) -> Result<Self> {
163 let client = Arc::new(
164 Client::new(
165 node_ip,
166 rest_ip,
167 rest_rps,
168 account,
169 trusted_peers,
170 genesis,
171 cdn,
172 storage_mode,
173 node_data_dir,
174 trusted_peers_only,
175 dev,
176 signal_handler,
177 )
178 .await?,
179 );
180
181 let node = Self::Client(client.clone());
182
183 if let Some(path) = auto_db_checkpoints {
185 if let Some(handle) = node.perform_auto_checkpoints(path)? {
186 client.handles.lock().push(handle);
187 }
188 }
189
190 Ok(node)
191 }
192
193 pub async fn new_bootstrap_client(
195 listener_addr: SocketAddr,
196 account: Account<N>,
197 genesis_header: Header<N>,
198 dev: Option<u16>,
199 ) -> Result<Self> {
200 Ok(Self::BootstrapClient(BootstrapClient::new(listener_addr, account, genesis_header, dev).await?))
201 }
202
203 pub fn node_type(&self) -> NodeType {
205 match self {
206 Self::Validator(validator) => validator.node_type(),
207 Self::Prover(prover) => prover.node_type(),
208 Self::Client(client) => client.node_type(),
209 Self::BootstrapClient(_) => NodeType::BootstrapClient,
210 }
211 }
212
213 pub fn private_key(&self) -> &PrivateKey<N> {
215 match self {
216 Self::Validator(node) => node.private_key(),
217 Self::Prover(node) => node.private_key(),
218 Self::Client(node) => node.private_key(),
219 Self::BootstrapClient(node) => node.private_key(),
220 }
221 }
222
223 pub fn view_key(&self) -> &ViewKey<N> {
225 match self {
226 Self::Validator(node) => node.view_key(),
227 Self::Prover(node) => node.view_key(),
228 Self::Client(node) => node.view_key(),
229 Self::BootstrapClient(node) => node.view_key(),
230 }
231 }
232
233 pub fn address(&self) -> Address<N> {
235 match self {
236 Self::Validator(node) => node.address(),
237 Self::Prover(node) => node.address(),
238 Self::Client(node) => node.address(),
239 Self::BootstrapClient(node) => node.address(),
240 }
241 }
242
243 pub fn is_dev(&self) -> bool {
245 match self {
246 Self::Validator(node) => node.is_dev(),
247 Self::Prover(node) => node.is_dev(),
248 Self::Client(node) => node.is_dev(),
249 Self::BootstrapClient(node) => node.is_dev(),
250 }
251 }
252
253 pub fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
255 match self {
256 Self::Validator(validator) => validator.router().peer_pool(),
257 Self::Prover(prover) => prover.router().peer_pool(),
258 Self::Client(client) => client.router().peer_pool(),
259 Self::BootstrapClient(client) => client.peer_pool(),
260 }
261 }
262
263 pub fn ledger(&self) -> Option<&Ledger<N, ConsensusDB<N>>> {
265 match self {
266 Self::Validator(node) => Some(node.ledger()),
267 Self::Prover(_) => None,
268 Self::Client(node) => Some(node.ledger()),
269 Self::BootstrapClient(_) => None,
270 }
271 }
272
273 pub fn is_block_synced(&self) -> bool {
275 match self {
276 Self::Validator(node) => node.is_block_synced(),
277 Self::Prover(node) => node.is_block_synced(),
278 Self::Client(node) => node.is_block_synced(),
279 Self::BootstrapClient(_) => true,
280 }
281 }
282
283 pub fn num_blocks_behind(&self) -> Option<u32> {
286 match self {
287 Self::Validator(node) => node.num_blocks_behind(),
288 Self::Prover(node) => node.num_blocks_behind(),
289 Self::Client(node) => node.num_blocks_behind(),
290 Self::BootstrapClient(_) => Some(0),
291 }
292 }
293
294 pub fn get_sync_speed(&self) -> f64 {
297 match self {
298 Self::Validator(node) => node.get_sync_speed(),
299 Self::Prover(node) => node.get_sync_speed(),
300 Self::Client(node) => node.get_sync_speed(),
301 Self::BootstrapClient(_) => 0.0,
302 }
303 }
304
305 pub async fn shut_down(&self) {
307 match self {
308 Self::Validator(node) => node.shut_down().await,
309 Self::Prover(node) => node.shut_down().await,
310 Self::Client(node) => node.shut_down().await,
311 Self::BootstrapClient(node) => node.shut_down().await,
312 }
313 }
314
315 pub async fn wait_for_signals(&self, signal_handler: &SignalHandler) {
317 match self {
318 Self::Validator(node) => node.wait_for_signals(signal_handler).await,
319 Self::Prover(node) => node.wait_for_signals(signal_handler).await,
320 Self::Client(node) => node.wait_for_signals(signal_handler).await,
321 Self::BootstrapClient(node) => node.wait_for_signals(signal_handler).await,
322 }
323 }
324
325 pub fn perform_auto_checkpoints(&self, auto_checkpoint_path: PathBuf) -> Result<Option<task::JoinHandle<()>>> {
327 let Some(ledger) = self.ledger().cloned() else {
329 return Ok(None);
330 };
331
332 if !auto_checkpoint_path.exists() {
334 if let Err(e) = fs::create_dir_all(&auto_checkpoint_path) {
335 bail!("Couldn't create the specified path for the automatic ledger checkpoints: {e}");
336 }
337 } else if auto_checkpoint_path.exists() && !auto_checkpoint_path.is_dir() {
338 bail!("The specified path for automatic ledger checkpoints is not a directory");
339 }
340
341 let handle = tokio::spawn(async move {
343 info!("Starting the automatic ledger checkpoint routine...");
344
345 let mut last_checkpoint_height = None;
347 let mut existing_checkpoints = Vec::with_capacity(MAX_AUTO_CHECKPOINTS + 1);
348 let mut block_tree_path = aleo_ledger_dir(N::ID, ledger.vm().block_store().storage_mode());
349 block_tree_path.push("block_tree");
350
351 loop {
352 tokio::time::sleep(Duration::from_millis(500)).await;
356
357 let current_height = ledger.vm().block_store().current_block_height();
360 if last_checkpoint_height.is_some_and(|checkpoint_height| {
361 current_height.saturating_sub(checkpoint_height) < CHECKPOINT_BLOCK_FREQUENCY
362 }) {
363 continue;
364 }
365
366 let mut checkpoint_path = auto_checkpoint_path.clone();
368 checkpoint_path.push(format!("checkpoint_{current_height}"));
369 if let Err(e) = ledger.backup_database(&checkpoint_path) {
370 warn!("Couldn't automatically store a checkpoint at {}: {e}", checkpoint_path.display());
371 continue;
372 }
373 last_checkpoint_height = Some(current_height);
374
375 let ledger_clone = ledger.clone();
377 let source_block_tree_path = block_tree_path.clone();
378 tokio::spawn(async move {
379 if let Err(e) = ledger_clone.cache_block_tree() {
380 warn!("Couldn't cache the block tree for a ledger checkpoint: {e}");
381 return;
382 }
383
384 checkpoint_path.push("block_tree");
386 if let Err(e) = fs::copy(source_block_tree_path, checkpoint_path) {
387 warn!("Couldn't copy the block tree file to a ledger checkpoint: {e}");
388 }
389 });
390
391 existing_checkpoints.clear();
393 let checkpoint_dir = match auto_checkpoint_path.read_dir() {
394 Ok(dir) => dir,
395 Err(e) => {
396 warn!("IO error while accessing the automatic checkpoints: {e}");
397 continue;
398 }
399 };
400 for entry in checkpoint_dir {
401 let entry = match entry {
403 Ok(entry) => entry,
404 Err(e) => {
405 warn!("IO error while counting the automatic checkpoints: {e}");
406 continue;
407 }
408 };
409
410 let path = entry.path();
412 if !path.is_dir() {
413 continue;
414 }
415
416 let file_name = entry.file_name().into_string().unwrap(); let mut name_iter = file_name.split("_");
419 if name_iter.next() != Some("checkpoint") {
420 continue;
421 }
422 let Some(height) = name_iter.next() else {
423 continue;
424 };
425 let Ok(height) = u32::from_str(height) else {
426 continue;
427 };
428 existing_checkpoints.push((path, height));
429 }
430 existing_checkpoints.sort_unstable_by_key(|(_, height)| cmp::Reverse(*height));
431
432 let surplus_checkpoints = existing_checkpoints.len().saturating_sub(MAX_AUTO_CHECKPOINTS);
434 for _ in 0..surplus_checkpoints {
435 if let Some((checkpoint_path, _)) = existing_checkpoints.pop() {
436 if let Err(e) = fs::remove_dir_all(checkpoint_path) {
437 warn!("Couldn't remove an automatic ledger checkpoint: {e}");
438 }
439 }
440 }
441 }
442 });
443
444 Ok(Some(handle))
445 }
446}