snarkos_node/validator/mod.rs
1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService, spawn_blocking};
21use snarkos_node_consensus::Consensus;
22use snarkos_node_rest::Rest;
23use snarkos_node_router::{
24 Heartbeat,
25 Inbound,
26 Outbound,
27 Router,
28 Routing,
29 messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
30};
31use snarkos_node_sync::{BlockSync, BlockSyncMode};
32use snarkos_node_tcp::{
33 P2P,
34 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
35};
36use snarkvm::prelude::{
37 Ledger,
38 Network,
39 block::{Block, Header},
40 puzzle::Solution,
41 store::ConsensusStorage,
42};
43
44use aleo_std::StorageMode;
45use anyhow::Result;
46use core::future::Future;
47#[cfg(feature = "locktick")]
48use locktick::parking_lot::Mutex;
49#[cfg(not(feature = "locktick"))]
50use parking_lot::Mutex;
51use std::{
52 net::SocketAddr,
53 sync::{Arc, atomic::AtomicBool},
54 time::Duration,
55};
56use tokio::task::JoinHandle;
57
58/// A validator is a full node, capable of validating blocks.
59#[derive(Clone)]
60pub struct Validator<N: Network, C: ConsensusStorage<N>> {
61 /// The ledger of the node.
62 ledger: Ledger<N, C>,
63 /// The consensus module of the node.
64 consensus: Consensus<N>,
65 /// The router of the node.
66 router: Router<N>,
67 /// The REST server of the node.
68 rest: Option<Rest<N, C, Self>>,
69 /// The sync module.
70 sync: BlockSync<N>,
71 /// The spawned handles.
72 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
73 /// The shutdown signal.
74 shutdown: Arc<AtomicBool>,
75}
76
77impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
78 /// Initializes a new validator node.
79 pub async fn new(
80 node_ip: SocketAddr,
81 bft_ip: Option<SocketAddr>,
82 rest_ip: Option<SocketAddr>,
83 rest_rps: u32,
84 account: Account<N>,
85 trusted_peers: &[SocketAddr],
86 trusted_validators: &[SocketAddr],
87 genesis: Block<N>,
88 cdn: Option<String>,
89 storage_mode: StorageMode,
90 allow_external_peers: bool,
91 dev_txs: bool,
92 shutdown: Arc<AtomicBool>,
93 ) -> Result<Self> {
94 // Initialize the signal handler.
95 let signal_node = Self::handle_signals(shutdown.clone());
96
97 // Initialize the ledger.
98 let ledger = Ledger::load(genesis, storage_mode.clone())?;
99
100 // Initialize the CDN.
101 if let Some(base_url) = cdn {
102 // Sync the ledger with the CDN.
103 if let Err((_, error)) =
104 snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
105 {
106 crate::log_clean_error(&storage_mode);
107 return Err(error);
108 }
109 }
110
111 // Initialize the ledger service.
112 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
113
114 // Initialize the consensus.
115 let mut consensus =
116 Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
117 // Initialize the primary channels.
118 let (primary_sender, primary_receiver) = init_primary_channels::<N>();
119 // Start the consensus.
120 consensus.run(primary_sender, primary_receiver).await?;
121 // Determine if the validator should rotate external peers.
122 let rotate_external_peers = false;
123
124 // Initialize the node router.
125 let router = Router::new(
126 node_ip,
127 NodeType::Validator,
128 account.clone(),
129 ledger_service.clone(),
130 trusted_peers,
131 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
132 rotate_external_peers,
133 allow_external_peers,
134 matches!(storage_mode, StorageMode::Development(_)),
135 )
136 .await?;
137
138 // Initialize the sync module.
139 let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());
140
141 // Initialize the node.
142 let mut node = Self {
143 ledger: ledger.clone(),
144 consensus: consensus.clone(),
145 router,
146 rest: None,
147 sync,
148 handles: Default::default(),
149 shutdown,
150 };
151 // Initialize the transaction pool.
152 node.initialize_transaction_pool(storage_mode, dev_txs)?;
153
154 // Initialize the REST server.
155 if let Some(rest_ip) = rest_ip {
156 node.rest =
157 Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone(), Arc::new(node.clone())).await?);
158 }
159 // Initialize the routing.
160 node.initialize_routing().await;
161 // Initialize the notification message loop.
162 node.handles.lock().push(crate::start_notification_message_loop());
163 // Pass the node to the signal handler.
164 let _ = signal_node.set(node.clone());
165 // Return the node.
166 Ok(node)
167 }
168
169 /// Returns the ledger.
170 pub fn ledger(&self) -> &Ledger<N, C> {
171 &self.ledger
172 }
173
174 /// Returns the REST server.
175 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
176 &self.rest
177 }
178}
179
180impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
181 // /// Initialize the transaction pool.
182 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
183 // use snarkvm::{
184 // console::{
185 // account::ViewKey,
186 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
187 // types::U64,
188 // },
189 // ledger::block::transition::Output,
190 // };
191 // use std::str::FromStr;
192 //
193 // // Initialize the locator.
194 // let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
195 // // Initialize the record name.
196 // let record_name = Identifier::from_str("credits")?;
197 //
198 // /// Searches the genesis block for the mint record.
199 // fn search_genesis_for_mint<N: Network>(
200 // block: Block<N>,
201 // view_key: &ViewKey<N>,
202 // ) -> Option<Record<N, Plaintext<N>>> {
203 // for transition in block.transitions().filter(|t| t.is_mint()) {
204 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
205 // if ciphertext.is_owner(view_key) {
206 // match ciphertext.decrypt(view_key) {
207 // Ok(record) => return Some(record),
208 // Err(error) => {
209 // error!("Failed to decrypt the mint output record - {error}");
210 // return None;
211 // }
212 // }
213 // }
214 // }
215 // }
216 // None
217 // }
218 //
219 // /// Searches the block for the split record.
220 // fn search_block_for_split<N: Network>(
221 // block: Block<N>,
222 // view_key: &ViewKey<N>,
223 // ) -> Option<Record<N, Plaintext<N>>> {
224 // let mut found = None;
225 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
226 // // block.transitions().rev().for_each(|t| {
227 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
228 // splits.iter().rev().for_each(|t| {
229 // if found.is_some() {
230 // return;
231 // }
232 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
233 // error!("Failed to find the split output record");
234 // return;
235 // };
236 // if ciphertext.is_owner(view_key) {
237 // match ciphertext.decrypt(view_key) {
238 // Ok(record) => found = Some(record),
239 // Err(error) => {
240 // error!("Failed to decrypt the split output record - {error}");
241 // }
242 // }
243 // }
244 // });
245 // found
246 // }
247 //
248 // let self_ = self.clone();
249 // self.spawn(async move {
250 // // Retrieve the view key.
251 // let view_key = self_.view_key();
252 // // Initialize the record.
253 // let mut record = {
254 // let mut found = None;
255 // let mut height = self_.ledger.latest_height();
256 // while found.is_none() && height > 0 {
257 // // Retrieve the block.
258 // let Ok(block) = self_.ledger.get_block(height) else {
259 // error!("Failed to get block at height {}", height);
260 // break;
261 // };
262 // // Search for the latest split record.
263 // if let Some(record) = search_block_for_split(block, view_key) {
264 // found = Some(record);
265 // }
266 // // Decrement the height.
267 // height = height.saturating_sub(1);
268 // }
269 // match found {
270 // Some(record) => record,
271 // None => {
272 // // Retrieve the genesis block.
273 // let Ok(block) = self_.ledger.get_block(0) else {
274 // error!("Failed to get the genesis block");
275 // return;
276 // };
277 // // Search the genesis block for the mint record.
278 // if let Some(record) = search_genesis_for_mint(block, view_key) {
279 // found = Some(record);
280 // }
281 // found.expect("Failed to find the split output record")
282 // }
283 // }
284 // };
285 // info!("Starting transaction pool...");
286 // // Start the transaction loop.
287 // loop {
288 // tokio::time::sleep(Duration::from_secs(1)).await;
289 // // If the node is running in development mode, only generate if you are allowed.
290 // if let Some(dev) = dev {
291 // if dev != 0 {
292 // continue;
293 // }
294 // }
295 //
296 // // Prepare the inputs.
297 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
298 // // Execute the transaction.
299 // let transaction = match self_.ledger.vm().execute(
300 // self_.private_key(),
301 // locator,
302 // inputs,
303 // None,
304 // None,
305 // &mut rand::thread_rng(),
306 // ) {
307 // Ok(transaction) => transaction,
308 // Err(error) => {
309 // error!("Transaction pool encountered an execution error - {error}");
310 // continue;
311 // }
312 // };
313 // // Retrieve the transition.
314 // let Some(transition) = transaction.transitions().next() else {
315 // error!("Transaction pool encountered a missing transition");
316 // continue;
317 // };
318 // // Retrieve the second output.
319 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
320 // error!("Transaction pool encountered a missing output");
321 // continue;
322 // };
323 // // Save the second output record.
324 // let Ok(next_record) = ciphertext.decrypt(view_key) else {
325 // error!("Transaction pool encountered a decryption error");
326 // continue;
327 // };
328 // // Broadcast the transaction.
329 // if self_
330 // .unconfirmed_transaction(
331 // self_.router.local_ip(),
332 // UnconfirmedTransaction::from(transaction.clone()),
333 // transaction.clone(),
334 // )
335 // .await
336 // {
337 // info!("Transaction pool broadcasted the transaction");
338 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
339 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
340 // tokio::time::sleep(Duration::from_secs(1)).await;
341 // }
342 // info!("Transaction accepted by the ledger");
343 // }
344 // // Save the record.
345 // record = next_record;
346 // }
347 // });
348 // Ok(())
349 // }
350
351 /// Initialize the transaction pool.
352 fn initialize_transaction_pool(&self, storage_mode: StorageMode, dev_txs: bool) -> Result<()> {
353 use snarkvm::console::{
354 program::{Identifier, Literal, ProgramID, Value},
355 types::U64,
356 };
357 use std::str::FromStr;
358
359 // Initialize the locator.
360 let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
361
362 // Determine whether to start the loop.
363 match storage_mode {
364 // If the node is running in development mode, only generate if you are allowed.
365 StorageMode::Development(id) => {
366 // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
367 if id != 0 || !dev_txs {
368 return Ok(());
369 }
370 }
371 // If the node is not running in development mode, do not generate dev traffic.
372 _ => return Ok(()),
373 }
374
375 let self_ = self.clone();
376 self.spawn(async move {
377 tokio::time::sleep(Duration::from_secs(3)).await;
378 info!("Starting transaction pool...");
379
380 // Start the transaction loop.
381 loop {
382 tokio::time::sleep(Duration::from_millis(500)).await;
383
384 // Prepare the inputs.
385 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
386 // Execute the transaction.
387 let self__ = self_.clone();
388 let transaction = match spawn_blocking!(self__.ledger.vm().execute(
389 self__.private_key(),
390 locator,
391 inputs.into_iter(),
392 None,
393 10_000,
394 None,
395 &mut rand::thread_rng(),
396 )) {
397 Ok(transaction) => transaction,
398 Err(error) => {
399 error!("Transaction pool encountered an execution error - {error}");
400 continue;
401 }
402 };
403 // Broadcast the transaction.
404 if self_
405 .unconfirmed_transaction(
406 self_.router.local_ip(),
407 UnconfirmedTransaction::from(transaction.clone()),
408 transaction.clone(),
409 )
410 .await
411 {
412 info!("Transaction pool broadcasted the transaction");
413 }
414 }
415 });
416 Ok(())
417 }
418
419 /// Spawns a task with the given future; it should only be used for long-running tasks.
420 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
421 self.handles.lock().push(tokio::spawn(future));
422 }
423}
424
425#[async_trait]
426impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
427 /// Shuts down the node.
428 async fn shut_down(&self) {
429 info!("Shutting down...");
430
431 // Shut down the node.
432 trace!("Shutting down the node...");
433 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
434
435 // Abort the tasks.
436 trace!("Shutting down the validator...");
437 self.handles.lock().iter().for_each(|handle| handle.abort());
438
439 // Shut down the router.
440 self.router.shut_down().await;
441
442 // Shut down consensus.
443 trace!("Shutting down consensus...");
444 self.consensus.shut_down().await;
445
446 info!("Node has shut down.");
447 }
448}
449
450#[cfg(test)]
451mod tests {
452 use super::*;
453 use snarkvm::prelude::{
454 MainnetV0,
455 VM,
456 store::{ConsensusStore, helpers::memory::ConsensusMemory},
457 };
458
459 use anyhow::bail;
460 use rand::SeedableRng;
461 use rand_chacha::ChaChaRng;
462 use std::str::FromStr;
463
464 type CurrentNetwork = MainnetV0;
465
466 /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
467 #[ignore]
468 #[tokio::test]
469 async fn test_profiler() -> Result<()> {
470 // Specify the node attributes.
471 let node = SocketAddr::from_str("0.0.0.0:4130").unwrap();
472 let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap();
473 let storage_mode = StorageMode::Development(0);
474 let dev_txs = true;
475
476 // Initialize an (insecure) fixed RNG.
477 let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
478 // Initialize the account.
479 let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
480 // Initialize a new VM.
481 let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(
482 StorageMode::new_test(None),
483 )?)?;
484 // Initialize the genesis block.
485 let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;
486
487 println!("Initializing validator node...");
488
489 let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new(
490 node,
491 None,
492 Some(rest),
493 10,
494 account,
495 &[],
496 &[],
497 genesis,
498 None,
499 storage_mode,
500 false,
501 dev_txs,
502 Default::default(),
503 )
504 .await
505 .unwrap();
506
507 println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),);
508
509 bail!("\n\nRemember to #[ignore] this test!\n\n")
510 }
511}