snarkos_node/validator/mod.rs
// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19
20use snarkos_account::Account;
21use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
22use snarkos_node_cdn::CdnBlockSync;
23use snarkos_node_consensus::Consensus;
24use snarkos_node_rest::Rest;
25use snarkos_node_router::{
26 Heartbeat,
27 Inbound,
28 Outbound,
29 Router,
30 Routing,
31 messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
32};
33use snarkos_node_sync::{BlockSync, Ping};
34use snarkos_node_tcp::{
35 P2P,
36 protocols::{Disconnect, Handshake, OnConnect, Reading},
37};
38use snarkvm::prelude::{
39 Ledger,
40 Network,
41 block::{Block, Header},
42 puzzle::Solution,
43 store::ConsensusStorage,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use core::future::Future;
49#[cfg(feature = "locktick")]
50use locktick::parking_lot::Mutex;
51#[cfg(not(feature = "locktick"))]
52use parking_lot::Mutex;
53use std::{
54 net::SocketAddr,
55 sync::{Arc, atomic::AtomicBool},
56 time::Duration,
57};
58use tokio::task::JoinHandle;
59
60/// A validator is a full node, capable of validating blocks.
61#[derive(Clone)]
62pub struct Validator<N: Network, C: ConsensusStorage<N>> {
63 /// The ledger of the node.
64 ledger: Ledger<N, C>,
65 /// The consensus module of the node.
66 consensus: Consensus<N>,
67 /// The router of the node.
68 router: Router<N>,
69 /// The REST server of the node.
70 rest: Option<Rest<N, C, Self>>,
71 /// The block synchronization logic (used in the Router impl).
72 sync: Arc<BlockSync<N>>,
73 /// The spawned handles.
74 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
75 /// The shutdown signal.
76 shutdown: Arc<AtomicBool>,
77 /// Keeps track of sending pings.
78 ping: Arc<Ping<N>>,
79}
80
81impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
82 /// Initializes a new validator node.
83 pub async fn new(
84 node_ip: SocketAddr,
85 bft_ip: Option<SocketAddr>,
86 rest_ip: Option<SocketAddr>,
87 rest_rps: u32,
88 account: Account<N>,
89 trusted_peers: &[SocketAddr],
90 trusted_validators: &[SocketAddr],
91 genesis: Block<N>,
92 cdn: Option<String>,
93 storage_mode: StorageMode,
94 allow_external_peers: bool,
95 dev_txs: bool,
96 dev: Option<u16>,
97 shutdown: Arc<AtomicBool>,
98 ) -> Result<Self> {
99 // Initialize the signal handler.
100 let signal_node = Self::handle_signals(shutdown.clone());
101
102 // Initialize the ledger.
103 let ledger = Ledger::load(genesis, storage_mode.clone())?;
104
105 // Initialize the ledger service.
106 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
107
108 // Determine if the validator should rotate external peers.
109 let rotate_external_peers = false;
110
111 // Initialize the node router.
112 let router = Router::new(
113 node_ip,
114 NodeType::Validator,
115 account.clone(),
116 ledger_service.clone(),
117 trusted_peers,
118 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
119 rotate_external_peers,
120 allow_external_peers,
121 dev.is_some(),
122 )
123 .await?;
124
125 // Initialize the block synchronization logic.
126 let sync = Arc::new(BlockSync::new(ledger_service.clone()));
127 let locators = sync.get_block_locators()?;
128 let ping = Arc::new(Ping::new(router.clone(), locators));
129
130 // Initialize the consensus layer.
131 let consensus = Consensus::new(
132 account.clone(),
133 ledger_service.clone(),
134 sync.clone(),
135 bft_ip,
136 trusted_validators,
137 storage_mode.clone(),
138 ping.clone(),
139 dev,
140 )
141 .await?;
142
143 // Initialize the node.
144 let mut node = Self {
145 ledger: ledger.clone(),
146 consensus: consensus.clone(),
147 router,
148 rest: None,
149 sync: sync.clone(),
150 ping,
151 handles: Default::default(),
152 shutdown: shutdown.clone(),
153 };
154
155 // Perform sync with CDN (if enabled).
156 let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown)));
157
158 // Initialize the transaction pool.
159 node.initialize_transaction_pool(dev, dev_txs)?;
160
161 // Initialize the REST server.
162 if let Some(rest_ip) = rest_ip {
163 node.rest = Some(
164 Rest::start(
165 rest_ip,
166 rest_rps,
167 Some(consensus),
168 ledger.clone(),
169 Arc::new(node.clone()),
170 cdn_sync.clone(),
171 sync,
172 )
173 .await?,
174 );
175 }
176
177 // Set up everything else after CDN sync is done.
178 if let Some(cdn_sync) = cdn_sync {
179 if let Err(error) = cdn_sync.wait().await {
180 crate::log_clean_error(&storage_mode);
181 node.shut_down().await;
182 return Err(error);
183 }
184 }
185
186 // Initialize the routing.
187 node.initialize_routing().await;
188 // Initialize the notification message loop.
189 node.handles.lock().push(crate::start_notification_message_loop());
190 // Pass the node to the signal handler.
191 let _ = signal_node.set(node.clone());
192 // Return the node.
193 Ok(node)
194 }
195
196 /// Returns the ledger.
197 pub fn ledger(&self) -> &Ledger<N, C> {
198 &self.ledger
199 }
200
201 /// Returns the REST server.
202 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
203 &self.rest
204 }
205}
206
207impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
208 // /// Initialize the transaction pool.
209 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
210 // use snarkvm::{
211 // console::{
212 // account::ViewKey,
213 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
214 // types::U64,
215 // },
216 // ledger::block::transition::Output,
217 // };
218 // use std::str::FromStr;
219 //
220 // // Initialize the locator.
221 // let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
222 // // Initialize the record name.
223 // let record_name = Identifier::from_str("credits")?;
224 //
225 // /// Searches the genesis block for the mint record.
226 // fn search_genesis_for_mint<N: Network>(
227 // block: Block<N>,
228 // view_key: &ViewKey<N>,
229 // ) -> Option<Record<N, Plaintext<N>>> {
230 // for transition in block.transitions().filter(|t| t.is_mint()) {
231 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
232 // if ciphertext.is_owner(view_key) {
233 // match ciphertext.decrypt(view_key) {
234 // Ok(record) => return Some(record),
235 // Err(error) => {
236 // error!("Failed to decrypt the mint output record - {error}");
237 // return None;
238 // }
239 // }
240 // }
241 // }
242 // }
243 // None
244 // }
245 //
246 // /// Searches the block for the split record.
247 // fn search_block_for_split<N: Network>(
248 // block: Block<N>,
249 // view_key: &ViewKey<N>,
250 // ) -> Option<Record<N, Plaintext<N>>> {
251 // let mut found = None;
252 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
253 // // block.transitions().rev().for_each(|t| {
254 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
255 // splits.iter().rev().for_each(|t| {
256 // if found.is_some() {
257 // return;
258 // }
259 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
260 // error!("Failed to find the split output record");
261 // return;
262 // };
263 // if ciphertext.is_owner(view_key) {
264 // match ciphertext.decrypt(view_key) {
265 // Ok(record) => found = Some(record),
266 // Err(error) => {
267 // error!("Failed to decrypt the split output record - {error}");
268 // }
269 // }
270 // }
271 // });
272 // found
273 // }
274 //
275 // let self_ = self.clone();
276 // self.spawn(async move {
277 // // Retrieve the view key.
278 // let view_key = self_.view_key();
279 // // Initialize the record.
280 // let mut record = {
281 // let mut found = None;
282 // let mut height = self_.ledger.latest_height();
283 // while found.is_none() && height > 0 {
284 // // Retrieve the block.
285 // let Ok(block) = self_.ledger.get_block(height) else {
286 // error!("Failed to get block at height {}", height);
287 // break;
288 // };
289 // // Search for the latest split record.
290 // if let Some(record) = search_block_for_split(block, view_key) {
291 // found = Some(record);
292 // }
293 // // Decrement the height.
294 // height = height.saturating_sub(1);
295 // }
296 // match found {
297 // Some(record) => record,
298 // None => {
299 // // Retrieve the genesis block.
300 // let Ok(block) = self_.ledger.get_block(0) else {
301 // error!("Failed to get the genesis block");
302 // return;
303 // };
304 // // Search the genesis block for the mint record.
305 // if let Some(record) = search_genesis_for_mint(block, view_key) {
306 // found = Some(record);
307 // }
308 // found.expect("Failed to find the split output record")
309 // }
310 // }
311 // };
312 // info!("Starting transaction pool...");
313 // // Start the transaction loop.
314 // loop {
315 // tokio::time::sleep(Duration::from_secs(1)).await;
316 // // If the node is running in development mode, only generate if you are allowed.
317 // if let Some(dev) = dev {
318 // if dev != 0 {
319 // continue;
320 // }
321 // }
322 //
323 // // Prepare the inputs.
324 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
325 // // Execute the transaction.
326 // let transaction = match self_.ledger.vm().execute(
327 // self_.private_key(),
328 // locator,
329 // inputs,
330 // None,
331 // None,
332 // &mut rand::thread_rng(),
333 // ) {
334 // Ok(transaction) => transaction,
335 // Err(error) => {
336 // error!("Transaction pool encountered an execution error - {error}");
337 // continue;
338 // }
339 // };
340 // // Retrieve the transition.
341 // let Some(transition) = transaction.transitions().next() else {
342 // error!("Transaction pool encountered a missing transition");
343 // continue;
344 // };
345 // // Retrieve the second output.
346 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
347 // error!("Transaction pool encountered a missing output");
348 // continue;
349 // };
350 // // Save the second output record.
351 // let Ok(next_record) = ciphertext.decrypt(view_key) else {
352 // error!("Transaction pool encountered a decryption error");
353 // continue;
354 // };
355 // // Broadcast the transaction.
356 // if self_
357 // .unconfirmed_transaction(
358 // self_.router.local_ip(),
359 // UnconfirmedTransaction::from(transaction.clone()),
360 // transaction.clone(),
361 // )
362 // .await
363 // {
364 // info!("Transaction pool broadcasted the transaction");
365 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
366 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
367 // tokio::time::sleep(Duration::from_secs(1)).await;
368 // }
369 // info!("Transaction accepted by the ledger");
370 // }
371 // // Save the record.
372 // record = next_record;
373 // }
374 // });
375 // Ok(())
376 // }
377
378 /// Initialize the transaction pool.
379 fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
380 use snarkvm::console::{
381 program::{Identifier, Literal, ProgramID, Value},
382 types::U64,
383 };
384 use std::str::FromStr;
385
386 // Initialize the locator.
387 let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
388
389 // Determine whether to start the loop.
390 match dev {
391 // If the node is running in development mode, only generate if you are allowed.
392 Some(id) => {
393 // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
394 if id != 0 || !dev_txs {
395 return Ok(());
396 }
397 }
398 // If the node is not running in development mode, do not generate dev traffic.
399 _ => return Ok(()),
400 }
401
402 let self_ = self.clone();
403 self.spawn(async move {
404 tokio::time::sleep(Duration::from_secs(3)).await;
405 info!("Starting transaction pool...");
406
407 // Start the transaction loop.
408 loop {
409 tokio::time::sleep(Duration::from_millis(500)).await;
410
411 // Prepare the inputs.
412 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
413 // Execute the transaction.
414 let self__ = self_.clone();
415 let transaction = match spawn_blocking!(self__.ledger.vm().execute(
416 self__.private_key(),
417 locator,
418 inputs.into_iter(),
419 None,
420 10_000,
421 None,
422 &mut rand::thread_rng(),
423 )) {
424 Ok(transaction) => transaction,
425 Err(error) => {
426 error!("Transaction pool encountered an execution error - {error}");
427 continue;
428 }
429 };
430 // Broadcast the transaction.
431 if self_
432 .unconfirmed_transaction(
433 self_.router.local_ip(),
434 UnconfirmedTransaction::from(transaction.clone()),
435 transaction.clone(),
436 )
437 .await
438 {
439 info!("Transaction pool broadcasted the transaction");
440 }
441 }
442 });
443 Ok(())
444 }
445
446 /// Spawns a task with the given future; it should only be used for long-running tasks.
447 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
448 self.handles.lock().push(tokio::spawn(future));
449 }
450}
451
452#[async_trait]
453impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
454 /// Shuts down the node.
455 async fn shut_down(&self) {
456 info!("Shutting down...");
457
458 // Shut down the node.
459 trace!("Shutting down the node...");
460 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
461
462 // Abort the tasks.
463 trace!("Shutting down the validator...");
464 self.handles.lock().iter().for_each(|handle| handle.abort());
465
466 // Shut down the router.
467 self.router.shut_down().await;
468
469 // Shut down consensus.
470 trace!("Shutting down consensus...");
471 self.consensus.shut_down().await;
472
473 info!("Node has shut down.");
474 }
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use std::str::FromStr;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    ///
    /// The test is ignored by default, and it always ends with an error after the validator
    /// has been initialized.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // The addresses the node and its REST server bind to.
        let node_ip = SocketAddr::from_str("0.0.0.0:4130").unwrap();
        let rest_ip = SocketAddr::from_str("0.0.0.0:3030").unwrap();
        // The remaining node attributes.
        let storage_mode = StorageMode::Development(0);
        let dev_txs = true;

        // An (insecure) fixed RNG; it is consumed first by the account, then by the genesis block.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();

        // A fresh VM over a test store, used to produce the genesis block.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(StorageMode::new_test(None))?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_ip,
            None,
            Some(rest_ip),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            storage_mode,
            false,
            dev_txs,
            None,
            Default::default(),
        )
        .await
        .unwrap();

        let height = validator.ledger.latest_height();
        println!("Loaded validator node with {} blocks", height);

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}