snarkos_node/validator/mod.rs
1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19
20use snarkos_account::Account;
21use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
22use snarkos_node_cdn::CdnBlockSync;
23use snarkos_node_consensus::Consensus;
24use snarkos_node_network::{NodeType, PeerPoolHandling};
25use snarkos_node_rest::Rest;
26use snarkos_node_router::{
27 Heartbeat,
28 Inbound,
29 Outbound,
30 Router,
31 Routing,
32 messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
33};
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_node_tcp::{
36 P2P,
37 protocols::{Disconnect, Handshake, OnConnect, Reading},
38};
39use snarkos_utilities::{NodeDataDir, SignalHandler};
40
41use snarkvm::prelude::{
42 Ledger,
43 Network,
44 block::{Block, Header},
45 puzzle::Solution,
46 store::ConsensusStorage,
47};
48
49use aleo_std::StorageMode;
50use anyhow::{Context, Result};
51use core::future::Future;
52#[cfg(feature = "locktick")]
53use locktick::parking_lot::Mutex;
54#[cfg(not(feature = "locktick"))]
55use parking_lot::Mutex;
56use std::{net::SocketAddr, sync::Arc, time::Duration};
57use tokio::task::JoinHandle;
58
59/// A validator is a full node, capable of validating blocks.
60#[derive(Clone)]
61pub struct Validator<N: Network, C: ConsensusStorage<N>> {
62 /// The ledger of the node.
63 ledger: Ledger<N, C>,
64 /// The consensus module of the node.
65 consensus: Consensus<N>,
66 /// The router of the node.
67 router: Router<N>,
68 /// The REST server of the node.
69 rest: Option<Rest<N, C, Self>>,
70 /// The block synchronization logic (used in the Router impl).
71 sync: Arc<BlockSync<N>>,
72 /// The spawned handles.
73 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
74 /// Keeps track of sending pings.
75 ping: Arc<Ping<N>>,
76}
77
78impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
79 /// Initializes a new validator node.
80 pub async fn new(
81 node_ip: SocketAddr,
82 bft_ip: Option<SocketAddr>,
83 rest_ip: Option<SocketAddr>,
84 rest_rps: u32,
85 account: Account<N>,
86 trusted_peers: &[SocketAddr],
87 trusted_validators: &[SocketAddr],
88 genesis: Block<N>,
89 cdn: Option<http::Uri>,
90 storage_mode: StorageMode,
91 node_data_dir: NodeDataDir,
92 trusted_peers_only: bool,
93 dev_txs: bool,
94 dev: Option<u16>,
95 signal_handler: Arc<SignalHandler>,
96 ) -> Result<Self> {
97 // Initialize the ledger.
98 let ledger = {
99 let storage_mode = storage_mode.clone();
100 let genesis = genesis.clone();
101
102 spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
103 }
104 .with_context(|| "Failed to initialize the ledger")?;
105
106 // Initialize the ledger service.
107 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone()));
108
109 // Initialize the node router.
110 let router = Router::new(
111 node_ip,
112 NodeType::Validator,
113 account.clone(),
114 ledger_service.clone(),
115 trusted_peers,
116 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
117 trusted_peers_only,
118 node_data_dir.clone(),
119 dev.is_some(),
120 )
121 .await?;
122
123 // Initialize the block synchronization logic.
124 let sync = Arc::new(BlockSync::new(ledger_service.clone()));
125 let locators = sync.get_block_locators()?;
126 let ping = Arc::new(Ping::new(router.clone(), locators));
127
128 // Initialize the consensus layer.
129 let consensus = Consensus::new(
130 account.clone(),
131 ledger_service.clone(),
132 sync.clone(),
133 bft_ip,
134 trusted_validators,
135 trusted_peers_only,
136 storage_mode.clone(),
137 node_data_dir.clone(),
138 ping.clone(),
139 dev,
140 )
141 .await?;
142
143 // Initialize the node.
144 let mut node = Self {
145 ledger: ledger.clone(),
146 consensus: consensus.clone(),
147 router,
148 rest: None,
149 sync: sync.clone(),
150 ping,
151 handles: Default::default(),
152 };
153
154 // Perform sync with CDN (if enabled).
155 let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)));
156
157 // Initialize the transaction pool.
158 node.initialize_transaction_pool(dev, dev_txs)?;
159
160 // Initialize the REST server.
161 if let Some(rest_ip) = rest_ip {
162 node.rest = Some(
163 Rest::start(
164 rest_ip,
165 rest_rps,
166 Some(consensus),
167 ledger.clone(),
168 Arc::new(node.clone()),
169 cdn_sync.clone(),
170 sync,
171 )
172 .await?,
173 );
174 }
175
176 // Set up everything else after CDN sync is done.
177 if let Some(cdn_sync) = cdn_sync {
178 if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
179 crate::log_clean_error(&storage_mode);
180 node.shut_down().await;
181 return Err(error);
182 }
183 }
184
185 // Initialize the routing.
186 node.initialize_routing().await;
187 // Initialize the notification message loop.
188 node.handles.lock().push(crate::start_notification_message_loop());
189
190 // Return the node.
191 Ok(node)
192 }
193
194 /// Returns the ledger.
195 pub fn ledger(&self) -> &Ledger<N, C> {
196 &self.ledger
197 }
198
199 /// Returns the REST server.
200 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
201 &self.rest
202 }
203
204 /// Returns the router.
205 pub fn router(&self) -> &Router<N> {
206 &self.router
207 }
208
209 // /// Initialize the transaction pool.
210 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
211 // use snarkvm::{
212 // console::{
213 // account::ViewKey,
214 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
215 // types::U64,
216 // },
217 // ledger::block::transition::Output,
218 // };
219 // use std::str::FromStr;
220 //
221 // // Initialize the locator.
222 // let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
223 // // Initialize the record name.
224 // let record_name = Identifier::from_str("credits")?;
225 //
226 // /// Searches the genesis block for the mint record.
227 // fn search_genesis_for_mint<N: Network>(
228 // block: Block<N>,
229 // view_key: &ViewKey<N>,
230 // ) -> Option<Record<N, Plaintext<N>>> {
231 // for transition in block.transitions().filter(|t| t.is_mint()) {
232 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
233 // if ciphertext.is_owner(view_key) {
234 // match ciphertext.decrypt(view_key) {
235 // Ok(record) => return Some(record),
236 // Err(error) => {
237 // error!("Failed to decrypt the mint output record - {error}");
238 // return None;
239 // }
240 // }
241 // }
242 // }
243 // }
244 // None
245 // }
246 //
247 // /// Searches the block for the split record.
248 // fn search_block_for_split<N: Network>(
249 // block: Block<N>,
250 // view_key: &ViewKey<N>,
251 // ) -> Option<Record<N, Plaintext<N>>> {
252 // let mut found = None;
253 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
254 // // block.transitions().rev().for_each(|t| {
255 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
256 // splits.iter().rev().for_each(|t| {
257 // if found.is_some() {
258 // return;
259 // }
260 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
261 // error!("Failed to find the split output record");
262 // return;
263 // };
264 // if ciphertext.is_owner(view_key) {
265 // match ciphertext.decrypt(view_key) {
266 // Ok(record) => found = Some(record),
267 // Err(error) => {
268 // error!("Failed to decrypt the split output record - {error}");
269 // }
270 // }
271 // }
272 // });
273 // found
274 // }
275 //
276 // let self_ = self.clone();
277 // self.spawn(async move {
278 // // Retrieve the view key.
279 // let view_key = self_.view_key();
280 // // Initialize the record.
281 // let mut record = {
282 // let mut found = None;
283 // let mut height = self_.ledger.latest_height();
284 // while found.is_none() && height > 0 {
285 // // Retrieve the block.
286 // let Ok(block) = self_.ledger.get_block(height) else {
287 // error!("Failed to get block at height {}", height);
288 // break;
289 // };
290 // // Search for the latest split record.
291 // if let Some(record) = search_block_for_split(block, view_key) {
292 // found = Some(record);
293 // }
294 // // Decrement the height.
295 // height = height.saturating_sub(1);
296 // }
297 // match found {
298 // Some(record) => record,
299 // None => {
300 // // Retrieve the genesis block.
301 // let Ok(block) = self_.ledger.get_block(0) else {
302 // error!("Failed to get the genesis block");
303 // return;
304 // };
305 // // Search the genesis block for the mint record.
306 // if let Some(record) = search_genesis_for_mint(block, view_key) {
307 // found = Some(record);
308 // }
309 // found.expect("Failed to find the split output record")
310 // }
311 // }
312 // };
313 // info!("Starting transaction pool...");
314 // // Start the transaction loop.
315 // loop {
316 // tokio::time::sleep(Duration::from_secs(1)).await;
317 // // If the node is running in development mode, only generate if you are allowed.
318 // if let Some(dev) = dev {
319 // if dev != 0 {
320 // continue;
321 // }
322 // }
323 //
324 // // Prepare the inputs.
325 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
326 // // Execute the transaction.
327 // let transaction = match self_.ledger.vm().execute(
328 // self_.private_key(),
329 // locator,
330 // inputs,
331 // None,
332 // None,
333 // &mut rand::thread_rng(),
334 // ) {
335 // Ok(transaction) => transaction,
336 // Err(error) => {
337 // error!("Transaction pool encountered an execution error - {error}");
338 // continue;
339 // }
340 // };
341 // // Retrieve the transition.
342 // let Some(transition) = transaction.transitions().next() else {
343 // error!("Transaction pool encountered a missing transition");
344 // continue;
345 // };
346 // // Retrieve the second output.
347 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
348 // error!("Transaction pool encountered a missing output");
349 // continue;
350 // };
351 // // Save the second output record.
352 // let Ok(next_record) = ciphertext.decrypt(view_key) else {
353 // error!("Transaction pool encountered a decryption error");
354 // continue;
355 // };
356 // // Broadcast the transaction.
357 // if self_
358 // .unconfirmed_transaction(
359 // self_.router.local_ip(),
360 // UnconfirmedTransaction::from(transaction.clone()),
361 // transaction.clone(),
362 // )
363 // .await
364 // {
365 // info!("Transaction pool broadcasted the transaction");
366 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
367 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
368 // tokio::time::sleep(Duration::from_secs(1)).await;
369 // }
370 // info!("Transaction accepted by the ledger");
371 // }
372 // // Save the record.
373 // record = next_record;
374 // }
375 // });
376 // Ok(())
377 // }
378
379 /// Initializes the transaction pool (if in development mode).
380 ///
381 /// Spawns a background task that periodically issues transactions to the network.
382 fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
383 use snarkvm::console::{
384 program::{Identifier, Literal, ProgramID, Value},
385 types::U64,
386 };
387 use std::str::FromStr;
388
389 // Initialize the locator.
390 let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
391
392 // Determine whether to start the loop.
393 match dev {
394 // If the node is running in development mode, only generate if you are allowed.
395 Some(id) => {
396 // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
397 if id != 0 || !dev_txs {
398 return Ok(());
399 }
400 }
401 // If the node is not running in development mode, do not generate dev traffic.
402 _ => return Ok(()),
403 }
404
405 let self_ = self.clone();
406 self.spawn(async move {
407 tokio::time::sleep(Duration::from_secs(3)).await;
408 info!("Starting transaction pool...");
409
410 // Start the transaction loop.
411 loop {
412 tokio::time::sleep(Duration::from_millis(500)).await;
413
414 // Prepare the inputs.
415 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
416 // Execute the transaction.
417 let self__ = self_.clone();
418 let transaction = match spawn_blocking!(self__.ledger.vm().execute(
419 self__.private_key(),
420 locator,
421 inputs.into_iter(),
422 None,
423 10_000,
424 None,
425 &mut rand::thread_rng(),
426 )) {
427 Ok(transaction) => transaction,
428 Err(error) => {
429 error!("Transaction pool encountered an execution error - {error}");
430 continue;
431 }
432 };
433 // Broadcast the transaction.
434 if self_
435 .unconfirmed_transaction(
436 self_.router.local_ip(),
437 UnconfirmedTransaction::from(transaction.clone()),
438 transaction.clone(),
439 )
440 .await
441 {
442 info!("Transaction pool broadcasted the transaction");
443 }
444 }
445 });
446 Ok(())
447 }
448
449 /// Spawns a task with the given future; it should only be used for long-running tasks.
450 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
451 self.handles.lock().push(tokio::spawn(future));
452 }
453}
454
455#[async_trait]
456impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
457 /// Shuts down the node.
458 async fn shut_down(&self) {
459 info!("Shutting down...");
460
461 // Shut down the node.
462 trace!("Shutting down the node...");
463
464 // Abort the tasks.
465 trace!("Shutting down the validator...");
466 self.handles.lock().iter().for_each(|handle| handle.abort());
467
468 // Shut down the router.
469 self.router.shut_down().await;
470
471 // Shut down consensus.
472 trace!("Shutting down consensus...");
473 self.consensus.shut_down().await;
474
475 info!("Node has shut down.");
476 }
477}
478
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Starts a validator node on an in-memory ledger, for profiling purposes.
    ///
    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    ///
    /// The test deliberately finishes with an error, as a reminder to keep it `#[ignore]`d.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // Specify the node attributes.
        let node_ip: SocketAddr = "0.0.0.0:4130".parse().unwrap();
        let rest_ip: SocketAddr = "0.0.0.0:3030".parse().unwrap();
        let storage_mode = StorageMode::Development(0);
        let node_data_dir = NodeDataDir::new_development(CurrentNetwork::ID, 0);
        let dev_txs = true;

        // Initialize an (insecure) fixed RNG.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        // Initialize the account.
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
        // Initialize a new VM on top of a fresh test store.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(StorageMode::new_test(None))?;
        let vm = VM::from(store)?;
        // Initialize the genesis block.
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_ip,
            None,
            Some(rest_ip),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            storage_mode,
            node_data_dir,
            false,
            dev_txs,
            None,
            SignalHandler::new(),
        )
        .await
        .unwrap();

        let height = validator.ledger.latest_height();
        println!("Loaded validator node with {} blocks", height);

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}