1use crate::{
17 BootstrapClient,
18 Client,
19 Prover,
20 Validator,
21 network::{NodeType, Peer, PeerPoolHandling},
22 router::Outbound,
23 traits::NodeInterface,
24};
25
26use snarkos_account::Account;
27use snarkos_utilities::{NodeDataDir, SignalHandler};
28
29use snarkvm::prelude::{
30 Address,
31 Header,
32 Ledger,
33 Network,
34 PrivateKey,
35 ViewKey,
36 block::Block,
37 store::helpers::{memory::ConsensusMemory, rocksdb::ConsensusDB},
38};
39
40use aleo_std::{StorageMode, aleo_ledger_dir};
41use anyhow::{Result, bail};
42
43#[cfg(feature = "locktick")]
44use locktick::parking_lot::RwLock;
45#[cfg(not(feature = "locktick"))]
46use parking_lot::RwLock;
47use std::{
48 cmp,
49 collections::HashMap,
50 fs,
51 net::SocketAddr,
52 path::{Path, PathBuf},
53 str::FromStr,
54 sync::Arc,
55 time::Duration,
56};
57use tokio::task;
58
59const CHECKPOINT_BLOCK_FREQUENCY: u32 = 1000;
61
62const MAX_AUTO_CHECKPOINTS: usize = 5;
64
65fn existing_startup_checkpoint_height(auto_checkpoint_path: &Path, startup_height: u32) -> Option<u32> {
66 let mut checkpoint_path = auto_checkpoint_path.to_path_buf();
67 checkpoint_path.push(format!("checkpoint_{startup_height}"));
68 checkpoint_path.is_dir().then_some(startup_height)
69}
70
71#[derive(Clone)]
72pub enum Node<N: Network> {
73 Validator(Arc<Validator<N, ConsensusDB<N>>>),
75 Prover(Arc<Prover<N, ConsensusMemory<N>>>),
77 Client(Arc<Client<N, ConsensusDB<N>>>),
79 BootstrapClient(BootstrapClient<N>),
81}
82
83impl<N: Network> Node<N> {
84 pub async fn new_validator(
86 node_ip: SocketAddr,
87 bft_ip: Option<SocketAddr>,
88 rest_ip: Option<SocketAddr>,
89 rest_rps: u32,
90 account: Account<N>,
91 trusted_peers: &[SocketAddr],
92 trusted_validators: &[SocketAddr],
93 genesis: Block<N>,
94 cdn: Option<http::Uri>,
95 storage_mode: StorageMode,
96 node_data_dir: NodeDataDir,
97 trusted_peers_only: bool,
98 auto_db_checkpoints: Option<PathBuf>,
99 dev_txs: bool,
100 dev: Option<u16>,
101 slipstream_configs: &[PathBuf],
102 dev_num_validators_for_committee_hotswap: Option<u16>,
103 signal_handler: Arc<SignalHandler>,
104 ) -> Result<Self> {
105 let validator = Arc::new(
106 Validator::new(
107 node_ip,
108 bft_ip,
109 rest_ip,
110 rest_rps,
111 account,
112 trusted_peers,
113 trusted_validators,
114 genesis,
115 cdn,
116 storage_mode,
117 node_data_dir,
118 trusted_peers_only,
119 dev_txs,
120 dev,
121 slipstream_configs,
122 dev_num_validators_for_committee_hotswap,
123 signal_handler,
124 )
125 .await?,
126 );
127
128 let node = Self::Validator(validator.clone());
129
130 if let Some(path) = auto_db_checkpoints {
132 if let Some(handle) = node.perform_auto_checkpoints(path)? {
133 validator.handles.lock().push(handle);
134 }
135 }
136
137 Ok(node)
138 }
139
140 pub async fn new_prover(
142 node_ip: SocketAddr,
143 account: Account<N>,
144 trusted_peers: &[SocketAddr],
145 genesis: Block<N>,
146 node_data_dir: NodeDataDir,
147 trusted_peers_only: bool,
148 dev: Option<u16>,
149 signal_handler: Arc<SignalHandler>,
150 ) -> Result<Self> {
151 Ok(Self::Prover(Arc::new(
152 Prover::new(
153 node_ip,
154 account,
155 trusted_peers,
156 genesis,
157 node_data_dir,
158 trusted_peers_only,
159 dev,
160 signal_handler,
161 )
162 .await?,
163 )))
164 }
165
166 pub async fn new_client(
168 node_ip: SocketAddr,
169 rest_ip: Option<SocketAddr>,
170 rest_rps: u32,
171 account: Account<N>,
172 trusted_peers: &[SocketAddr],
173 genesis: Block<N>,
174 cdn: Option<http::Uri>,
175 storage_mode: StorageMode,
176 node_data_dir: NodeDataDir,
177 trusted_peers_only: bool,
178 auto_db_checkpoints: Option<PathBuf>,
179 dev: Option<u16>,
180 slipstream_configs: &[PathBuf],
181 signal_handler: Arc<SignalHandler>,
182 ) -> Result<Self> {
183 let client = Arc::new(
184 Client::new(
185 node_ip,
186 rest_ip,
187 rest_rps,
188 account,
189 trusted_peers,
190 genesis,
191 cdn,
192 storage_mode,
193 node_data_dir,
194 trusted_peers_only,
195 dev,
196 slipstream_configs,
197 signal_handler,
198 )
199 .await?,
200 );
201
202 let node = Self::Client(client.clone());
203
204 if let Some(path) = auto_db_checkpoints {
206 if let Some(handle) = node.perform_auto_checkpoints(path)? {
207 client.handles.lock().push(handle);
208 }
209 }
210
211 Ok(node)
212 }
213
214 pub async fn new_bootstrap_client(
216 listener_addr: SocketAddr,
217 account: Account<N>,
218 genesis_header: Header<N>,
219 dev: Option<u16>,
220 ) -> Result<Self> {
221 Ok(Self::BootstrapClient(BootstrapClient::new(listener_addr, account, genesis_header, dev).await?))
222 }
223
224 pub fn node_type(&self) -> NodeType {
226 match self {
227 Self::Validator(validator) => validator.node_type(),
228 Self::Prover(prover) => prover.node_type(),
229 Self::Client(client) => client.node_type(),
230 Self::BootstrapClient(_) => NodeType::BootstrapClient,
231 }
232 }
233
234 pub fn private_key(&self) -> &PrivateKey<N> {
236 match self {
237 Self::Validator(node) => node.private_key(),
238 Self::Prover(node) => node.private_key(),
239 Self::Client(node) => node.private_key(),
240 Self::BootstrapClient(node) => node.private_key(),
241 }
242 }
243
244 pub fn view_key(&self) -> &ViewKey<N> {
246 match self {
247 Self::Validator(node) => node.view_key(),
248 Self::Prover(node) => node.view_key(),
249 Self::Client(node) => node.view_key(),
250 Self::BootstrapClient(node) => node.view_key(),
251 }
252 }
253
254 pub fn address(&self) -> Address<N> {
256 match self {
257 Self::Validator(node) => node.address(),
258 Self::Prover(node) => node.address(),
259 Self::Client(node) => node.address(),
260 Self::BootstrapClient(node) => node.address(),
261 }
262 }
263
264 pub fn is_dev(&self) -> bool {
266 match self {
267 Self::Validator(node) => node.is_dev(),
268 Self::Prover(node) => node.is_dev(),
269 Self::Client(node) => node.is_dev(),
270 Self::BootstrapClient(node) => node.is_dev(),
271 }
272 }
273
274 pub fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
276 match self {
277 Self::Validator(validator) => validator.router().peer_pool(),
278 Self::Prover(prover) => prover.router().peer_pool(),
279 Self::Client(client) => client.router().peer_pool(),
280 Self::BootstrapClient(client) => client.peer_pool(),
281 }
282 }
283
284 pub fn ledger(&self) -> Option<&Ledger<N, ConsensusDB<N>>> {
286 match self {
287 Self::Validator(node) => Some(node.ledger()),
288 Self::Prover(_) => None,
289 Self::Client(node) => Some(node.ledger()),
290 Self::BootstrapClient(_) => None,
291 }
292 }
293
294 pub fn is_block_synced(&self) -> bool {
296 match self {
297 Self::Validator(node) => node.is_block_synced(),
298 Self::Prover(node) => node.is_block_synced(),
299 Self::Client(node) => node.is_block_synced(),
300 Self::BootstrapClient(_) => true,
301 }
302 }
303
304 pub fn num_blocks_behind(&self) -> Option<u32> {
307 match self {
308 Self::Validator(node) => node.num_blocks_behind(),
309 Self::Prover(node) => node.num_blocks_behind(),
310 Self::Client(node) => node.num_blocks_behind(),
311 Self::BootstrapClient(_) => Some(0),
312 }
313 }
314
315 pub fn get_sync_speed(&self) -> f64 {
318 match self {
319 Self::Validator(node) => node.get_sync_speed(),
320 Self::Prover(node) => node.get_sync_speed(),
321 Self::Client(node) => node.get_sync_speed(),
322 Self::BootstrapClient(_) => 0.0,
323 }
324 }
325
326 pub async fn shut_down(&self) {
328 match self {
329 Self::Validator(node) => node.shut_down().await,
330 Self::Prover(node) => node.shut_down().await,
331 Self::Client(node) => node.shut_down().await,
332 Self::BootstrapClient(node) => node.shut_down().await,
333 }
334 }
335
336 pub async fn wait_for_signals(&self, signal_handler: &SignalHandler) {
338 match self {
339 Self::Validator(node) => node.wait_for_signals(signal_handler).await,
340 Self::Prover(node) => node.wait_for_signals(signal_handler).await,
341 Self::Client(node) => node.wait_for_signals(signal_handler).await,
342 Self::BootstrapClient(node) => node.wait_for_signals(signal_handler).await,
343 }
344 }
345
346 pub fn perform_auto_checkpoints(&self, auto_checkpoint_path: PathBuf) -> Result<Option<task::JoinHandle<()>>> {
348 let Some(ledger) = self.ledger().cloned() else {
350 return Ok(None);
351 };
352
353 if !auto_checkpoint_path.exists() {
355 if let Err(e) = fs::create_dir_all(&auto_checkpoint_path) {
356 bail!("Couldn't create the specified path for the automatic ledger checkpoints: {e}");
357 }
358 } else if auto_checkpoint_path.exists() && !auto_checkpoint_path.is_dir() {
359 bail!("The specified path for automatic ledger checkpoints is not a directory");
360 }
361
362 let handle = tokio::spawn(async move {
364 info!("Starting the automatic ledger checkpoint routine...");
365
366 let startup_height = ledger.vm().block_store().current_block_height();
368 let mut last_checkpoint_height =
369 existing_startup_checkpoint_height(auto_checkpoint_path.as_path(), startup_height);
370 let mut existing_checkpoints = Vec::with_capacity(MAX_AUTO_CHECKPOINTS + 1);
371 let mut block_tree_path = aleo_ledger_dir(N::ID, ledger.vm().block_store().storage_mode());
372 block_tree_path.push("block_tree");
373
374 loop {
375 tokio::time::sleep(Duration::from_millis(500)).await;
379
380 let current_height = ledger.vm().block_store().current_block_height();
383 if last_checkpoint_height.is_some_and(|checkpoint_height| {
384 current_height.saturating_sub(checkpoint_height) < CHECKPOINT_BLOCK_FREQUENCY
385 }) {
386 continue;
387 }
388
389 let mut checkpoint_path = auto_checkpoint_path.clone();
391 checkpoint_path.push(format!("checkpoint_{current_height}"));
392 if let Err(e) = ledger.backup_database(&checkpoint_path) {
393 warn!("Couldn't automatically store a checkpoint at {}: {e}", checkpoint_path.display());
394 continue;
395 }
396 last_checkpoint_height = Some(current_height);
397
398 let ledger_clone = ledger.clone();
400 let source_block_tree_path = block_tree_path.clone();
401 tokio::spawn(async move {
402 if let Err(e) = ledger_clone.cache_block_tree() {
403 warn!("Couldn't cache the block tree for a ledger checkpoint: {e}");
404 return;
405 }
406
407 checkpoint_path.push("block_tree");
409 if let Err(e) = fs::copy(source_block_tree_path, checkpoint_path) {
410 warn!("Couldn't copy the block tree file to a ledger checkpoint: {e}");
411 }
412 });
413
414 existing_checkpoints.clear();
416 let checkpoint_dir = match auto_checkpoint_path.read_dir() {
417 Ok(dir) => dir,
418 Err(e) => {
419 warn!("IO error while accessing the automatic checkpoints: {e}");
420 continue;
421 }
422 };
423 for entry in checkpoint_dir {
424 let entry = match entry {
426 Ok(entry) => entry,
427 Err(e) => {
428 warn!("IO error while counting the automatic checkpoints: {e}");
429 continue;
430 }
431 };
432
433 let path = entry.path();
435 if !path.is_dir() {
436 continue;
437 }
438
439 let file_name = entry.file_name().into_string().unwrap(); let mut name_iter = file_name.split("_");
442 if name_iter.next() != Some("checkpoint") {
443 continue;
444 }
445 let Some(height) = name_iter.next() else {
446 continue;
447 };
448 let Ok(height) = u32::from_str(height) else {
449 continue;
450 };
451 existing_checkpoints.push((path, height));
452 }
453 existing_checkpoints.sort_unstable_by_key(|(_, height)| cmp::Reverse(*height));
454
455 let surplus_checkpoints = existing_checkpoints.len().saturating_sub(MAX_AUTO_CHECKPOINTS);
457 for _ in 0..surplus_checkpoints {
458 if let Some((checkpoint_path, _)) = existing_checkpoints.pop() {
459 if let Err(e) = fs::remove_dir_all(checkpoint_path) {
460 warn!("Couldn't remove an automatic ledger checkpoint: {e}");
461 }
462 }
463 }
464 }
465 });
466
467 Ok(Some(handle))
468 }
469}
470
471#[cfg(test)]
472mod tests {
473 use super::existing_startup_checkpoint_height;
474 use std::{
475 fs,
476 time::{SystemTime, UNIX_EPOCH},
477 };
478
479 #[test]
480 fn seeds_last_checkpoint_height_when_startup_checkpoint_directory_exists() {
481 let startup_height = 42;
482 let unique = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
483 let base_path = std::env::temp_dir().join(format!("snarkos_checkpoint_seed_test_{unique}"));
484 let checkpoint_path = base_path.join(format!("checkpoint_{startup_height}"));
485 fs::create_dir_all(&checkpoint_path).unwrap();
486
487 let seeded_height = existing_startup_checkpoint_height(base_path.as_path(), startup_height);
488 assert_eq!(seeded_height, Some(startup_height));
489
490 fs::remove_dir_all(base_path).unwrap();
491 }
492
493 #[test]
494 fn does_not_seed_last_checkpoint_height_when_startup_checkpoint_directory_missing() {
495 let startup_height = 42;
496 let unique = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
497 let base_path = std::env::temp_dir().join(format!("snarkos_checkpoint_seed_test_{unique}"));
498 fs::create_dir_all(&base_path).unwrap();
499
500 let seeded_height = existing_startup_checkpoint_height(base_path.as_path(), startup_height);
501 assert_eq!(seeded_height, None);
502
503 fs::remove_dir_all(base_path).unwrap();
504 }
505}