snarkos_node/validator/mod.rs
// Copyright 2024-2025 Aleo Network Foundation
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService, spawn_blocking};
21use snarkos_node_consensus::Consensus;
22use snarkos_node_rest::Rest;
23use snarkos_node_router::{
24 Heartbeat,
25 Inbound,
26 Outbound,
27 Router,
28 Routing,
29 messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
30};
31use snarkos_node_sync::{BlockSync, BlockSyncMode};
32use snarkos_node_tcp::{
33 P2P,
34 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
35};
36use snarkvm::prelude::{
37 Ledger,
38 Network,
39 block::{Block, Header},
40 puzzle::Solution,
41 store::ConsensusStorage,
42};
43
44use aleo_std::StorageMode;
45use anyhow::Result;
46use core::future::Future;
47#[cfg(feature = "locktick")]
48use locktick::parking_lot::Mutex;
49#[cfg(not(feature = "locktick"))]
50use parking_lot::Mutex;
51use std::{
52 net::SocketAddr,
53 sync::{Arc, atomic::AtomicBool},
54 time::Duration,
55};
56use tokio::task::JoinHandle;
57
58/// A validator is a full node, capable of validating blocks.
59#[derive(Clone)]
60pub struct Validator<N: Network, C: ConsensusStorage<N>> {
61 /// The ledger of the node.
62 ledger: Ledger<N, C>,
63 /// The consensus module of the node.
64 consensus: Consensus<N>,
65 /// The router of the node.
66 router: Router<N>,
67 /// The REST server of the node.
68 rest: Option<Rest<N, C, Self>>,
69 /// The sync module.
70 sync: BlockSync<N>,
71 /// The spawned handles.
72 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
73 /// The shutdown signal.
74 shutdown: Arc<AtomicBool>,
75}
76
77impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
78 /// Initializes a new validator node.
79 pub async fn new(
80 node_ip: SocketAddr,
81 bft_ip: Option<SocketAddr>,
82 rest_ip: Option<SocketAddr>,
83 rest_rps: u32,
84 account: Account<N>,
85 trusted_peers: &[SocketAddr],
86 trusted_validators: &[SocketAddr],
87 genesis: Block<N>,
88 cdn: Option<String>,
89 storage_mode: StorageMode,
90 allow_external_peers: bool,
91 dev_txs: bool,
92 shutdown: Arc<AtomicBool>,
93 ) -> Result<Self> {
94 // Initialize the signal handler.
95 let signal_node = Self::handle_signals(shutdown.clone());
96
97 // Initialize the ledger.
98 let ledger = Ledger::load(genesis, storage_mode.clone())?;
99
100 // Initialize the CDN.
101 if let Some(base_url) = cdn {
102 // Sync the ledger with the CDN.
103 if let Err((_, error)) =
104 snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
105 {
106 crate::log_clean_error(&storage_mode);
107 return Err(error);
108 }
109 }
110
111 // Initialize the ledger service.
112 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
113
114 // Initialize the consensus.
115 let mut consensus =
116 Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
117 // Initialize the primary channels.
118 let (primary_sender, primary_receiver) = init_primary_channels::<N>();
119 // Start the consensus.
120 consensus.run(primary_sender, primary_receiver).await?;
121 // Determine if the validator should rotate external peers.
122 let rotate_external_peers = false;
123
124 // Initialize the node router.
125 let router = Router::new(
126 node_ip,
127 NodeType::Validator,
128 account,
129 trusted_peers,
130 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
131 rotate_external_peers,
132 allow_external_peers,
133 matches!(storage_mode, StorageMode::Development(_)),
134 )
135 .await?;
136
137 // Initialize the sync module.
138 let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());
139
140 // Initialize the node.
141 let mut node = Self {
142 ledger: ledger.clone(),
143 consensus: consensus.clone(),
144 router,
145 rest: None,
146 sync,
147 handles: Default::default(),
148 shutdown,
149 };
150 // Initialize the transaction pool.
151 node.initialize_transaction_pool(storage_mode, dev_txs)?;
152
153 // Initialize the REST server.
154 if let Some(rest_ip) = rest_ip {
155 node.rest =
156 Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone(), Arc::new(node.clone())).await?);
157 }
158 // Initialize the routing.
159 node.initialize_routing().await;
160 // Initialize the notification message loop.
161 node.handles.lock().push(crate::start_notification_message_loop());
162 // Pass the node to the signal handler.
163 let _ = signal_node.set(node.clone());
164 // Return the node.
165 Ok(node)
166 }
167
168 /// Returns the ledger.
169 pub fn ledger(&self) -> &Ledger<N, C> {
170 &self.ledger
171 }
172
173 /// Returns the REST server.
174 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
175 &self.rest
176 }
177}
178
179impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
180 // /// Initialize the transaction pool.
181 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
182 // use snarkvm::{
183 // console::{
184 // account::ViewKey,
185 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
186 // types::U64,
187 // },
188 // ledger::block::transition::Output,
189 // };
190 // use std::str::FromStr;
191 //
192 // // Initialize the locator.
193 // let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
194 // // Initialize the record name.
195 // let record_name = Identifier::from_str("credits")?;
196 //
197 // /// Searches the genesis block for the mint record.
198 // fn search_genesis_for_mint<N: Network>(
199 // block: Block<N>,
200 // view_key: &ViewKey<N>,
201 // ) -> Option<Record<N, Plaintext<N>>> {
202 // for transition in block.transitions().filter(|t| t.is_mint()) {
203 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
204 // if ciphertext.is_owner(view_key) {
205 // match ciphertext.decrypt(view_key) {
206 // Ok(record) => return Some(record),
207 // Err(error) => {
208 // error!("Failed to decrypt the mint output record - {error}");
209 // return None;
210 // }
211 // }
212 // }
213 // }
214 // }
215 // None
216 // }
217 //
218 // /// Searches the block for the split record.
219 // fn search_block_for_split<N: Network>(
220 // block: Block<N>,
221 // view_key: &ViewKey<N>,
222 // ) -> Option<Record<N, Plaintext<N>>> {
223 // let mut found = None;
224 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
225 // // block.transitions().rev().for_each(|t| {
226 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
227 // splits.iter().rev().for_each(|t| {
228 // if found.is_some() {
229 // return;
230 // }
231 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
232 // error!("Failed to find the split output record");
233 // return;
234 // };
235 // if ciphertext.is_owner(view_key) {
236 // match ciphertext.decrypt(view_key) {
237 // Ok(record) => found = Some(record),
238 // Err(error) => {
239 // error!("Failed to decrypt the split output record - {error}");
240 // }
241 // }
242 // }
243 // });
244 // found
245 // }
246 //
247 // let self_ = self.clone();
248 // self.spawn(async move {
249 // // Retrieve the view key.
250 // let view_key = self_.view_key();
251 // // Initialize the record.
252 // let mut record = {
253 // let mut found = None;
254 // let mut height = self_.ledger.latest_height();
255 // while found.is_none() && height > 0 {
256 // // Retrieve the block.
257 // let Ok(block) = self_.ledger.get_block(height) else {
258 // error!("Failed to get block at height {}", height);
259 // break;
260 // };
261 // // Search for the latest split record.
262 // if let Some(record) = search_block_for_split(block, view_key) {
263 // found = Some(record);
264 // }
265 // // Decrement the height.
266 // height = height.saturating_sub(1);
267 // }
268 // match found {
269 // Some(record) => record,
270 // None => {
271 // // Retrieve the genesis block.
272 // let Ok(block) = self_.ledger.get_block(0) else {
273 // error!("Failed to get the genesis block");
274 // return;
275 // };
276 // // Search the genesis block for the mint record.
277 // if let Some(record) = search_genesis_for_mint(block, view_key) {
278 // found = Some(record);
279 // }
280 // found.expect("Failed to find the split output record")
281 // }
282 // }
283 // };
284 // info!("Starting transaction pool...");
285 // // Start the transaction loop.
286 // loop {
287 // tokio::time::sleep(Duration::from_secs(1)).await;
288 // // If the node is running in development mode, only generate if you are allowed.
289 // if let Some(dev) = dev {
290 // if dev != 0 {
291 // continue;
292 // }
293 // }
294 //
295 // // Prepare the inputs.
296 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
297 // // Execute the transaction.
298 // let transaction = match self_.ledger.vm().execute(
299 // self_.private_key(),
300 // locator,
301 // inputs,
302 // None,
303 // None,
304 // &mut rand::thread_rng(),
305 // ) {
306 // Ok(transaction) => transaction,
307 // Err(error) => {
308 // error!("Transaction pool encountered an execution error - {error}");
309 // continue;
310 // }
311 // };
312 // // Retrieve the transition.
313 // let Some(transition) = transaction.transitions().next() else {
314 // error!("Transaction pool encountered a missing transition");
315 // continue;
316 // };
317 // // Retrieve the second output.
318 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
319 // error!("Transaction pool encountered a missing output");
320 // continue;
321 // };
322 // // Save the second output record.
323 // let Ok(next_record) = ciphertext.decrypt(view_key) else {
324 // error!("Transaction pool encountered a decryption error");
325 // continue;
326 // };
327 // // Broadcast the transaction.
328 // if self_
329 // .unconfirmed_transaction(
330 // self_.router.local_ip(),
331 // UnconfirmedTransaction::from(transaction.clone()),
332 // transaction.clone(),
333 // )
334 // .await
335 // {
336 // info!("Transaction pool broadcasted the transaction");
337 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
338 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
339 // tokio::time::sleep(Duration::from_secs(1)).await;
340 // }
341 // info!("Transaction accepted by the ledger");
342 // }
343 // // Save the record.
344 // record = next_record;
345 // }
346 // });
347 // Ok(())
348 // }
349
350 /// Initialize the transaction pool.
351 fn initialize_transaction_pool(&self, storage_mode: StorageMode, dev_txs: bool) -> Result<()> {
352 use snarkvm::console::{
353 program::{Identifier, Literal, ProgramID, Value},
354 types::U64,
355 };
356 use std::str::FromStr;
357
358 // Initialize the locator.
359 let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
360
361 // Determine whether to start the loop.
362 match storage_mode {
363 // If the node is running in development mode, only generate if you are allowed.
364 StorageMode::Development(id) => {
365 // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
366 if id != 0 || !dev_txs {
367 return Ok(());
368 }
369 }
370 // If the node is not running in development mode, do not generate dev traffic.
371 _ => return Ok(()),
372 }
373
374 let self_ = self.clone();
375 self.spawn(async move {
376 tokio::time::sleep(Duration::from_secs(3)).await;
377 info!("Starting transaction pool...");
378
379 // Start the transaction loop.
380 loop {
381 tokio::time::sleep(Duration::from_millis(500)).await;
382
383 // Prepare the inputs.
384 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
385 // Execute the transaction.
386 let self__ = self_.clone();
387 let transaction = match spawn_blocking!(self__.ledger.vm().execute(
388 self__.private_key(),
389 locator,
390 inputs.into_iter(),
391 None,
392 10_000,
393 None,
394 &mut rand::thread_rng(),
395 )) {
396 Ok(transaction) => transaction,
397 Err(error) => {
398 error!("Transaction pool encountered an execution error - {error}");
399 continue;
400 }
401 };
402 // Broadcast the transaction.
403 if self_
404 .unconfirmed_transaction(
405 self_.router.local_ip(),
406 UnconfirmedTransaction::from(transaction.clone()),
407 transaction.clone(),
408 )
409 .await
410 {
411 info!("Transaction pool broadcasted the transaction");
412 }
413 }
414 });
415 Ok(())
416 }
417
418 /// Spawns a task with the given future; it should only be used for long-running tasks.
419 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
420 self.handles.lock().push(tokio::spawn(future));
421 }
422}
423
424#[async_trait]
425impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
426 /// Shuts down the node.
427 async fn shut_down(&self) {
428 info!("Shutting down...");
429
430 // Shut down the node.
431 trace!("Shutting down the node...");
432 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
433
434 // Abort the tasks.
435 trace!("Shutting down the validator...");
436 self.handles.lock().iter().for_each(|handle| handle.abort());
437
438 // Shut down the router.
439 self.router.shut_down().await;
440
441 // Shut down consensus.
442 trace!("Shutting down consensus...");
443 self.consensus.shut_down().await;
444
445 info!("Node has shut down.");
446 }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use std::str::FromStr;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Boots a development validator (with dev transactions enabled) for profiling.
    ///
    /// The test is ignored by default and always ends with an error as a reminder to keep it ignored.
    ///
    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // The addresses the node and its REST server listen on.
        let node_ip = SocketAddr::from_str("0.0.0.0:4130").unwrap();
        let rest_ip = SocketAddr::from_str("0.0.0.0:3030").unwrap();

        // An (insecure) fixed-seed RNG keeps the account and the genesis block reproducible.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        // Create the account.
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
        // Create a VM on top of a test store, and use it to produce the genesis block.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(StorageMode::new_test(None))?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_ip,
            None,
            Some(rest_ip),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            // Run as the first development node ...
            StorageMode::Development(0),
            false,
            // ... with development transactions enabled.
            true,
            Default::default(),
        )
        .await
        .unwrap();

        let latest_height = validator.ledger.latest_height();
        println!("Loaded validator node with {} blocks", latest_height);

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}