snarkos_node/validator/mod.rs
1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19
20use snarkos_account::Account;
21use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
22use snarkos_node_cdn::CdnBlockSync;
23use snarkos_node_consensus::Consensus;
24use snarkos_node_network::{NodeType, PeerPoolHandling};
25use snarkos_node_rest::Rest;
26use snarkos_node_router::{
27 Heartbeat,
28 Inbound,
29 Outbound,
30 Router,
31 Routing,
32 messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
33};
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_node_tcp::{
36 P2P,
37 protocols::{Disconnect, Handshake, OnConnect, Reading},
38};
39use snarkvm::prelude::{
40 Ledger,
41 Network,
42 block::{Block, Header},
43 puzzle::Solution,
44 store::ConsensusStorage,
45};
46
47use aleo_std::StorageMode;
48use anyhow::Result;
49use core::future::Future;
50#[cfg(feature = "locktick")]
51use locktick::parking_lot::Mutex;
52#[cfg(not(feature = "locktick"))]
53use parking_lot::Mutex;
54use std::{
55 net::SocketAddr,
56 sync::{Arc, atomic::AtomicBool},
57 time::Duration,
58};
59use tokio::task::JoinHandle;
60
/// A validator is a full node, capable of validating blocks.
///
/// The type derives `Clone`: clones are handed to spawned tasks, to the REST server, and to the
/// signal handler. The `Arc`-wrapped fields below are shared between all clones.
#[derive(Clone)]
pub struct Validator<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The consensus module of the node.
    consensus: Consensus<N>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node; `None` unless a REST address was supplied at construction.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic (used in the Router impl).
    sync: Arc<BlockSync<N>>,
    /// The handles of the tasks spawned by this node; every handle is aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal; it is set to `true` when the node shuts down.
    shutdown: Arc<AtomicBool>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
}
81
82impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
83 /// Initializes a new validator node.
84 pub async fn new(
85 node_ip: SocketAddr,
86 bft_ip: Option<SocketAddr>,
87 rest_ip: Option<SocketAddr>,
88 rest_rps: u32,
89 account: Account<N>,
90 trusted_peers: &[SocketAddr],
91 trusted_validators: &[SocketAddr],
92 genesis: Block<N>,
93 cdn: Option<http::Uri>,
94 storage_mode: StorageMode,
95 trusted_peers_only: bool,
96 dev_txs: bool,
97 dev: Option<u16>,
98 shutdown: Arc<AtomicBool>,
99 ) -> Result<Self> {
100 // Initialize the signal handler.
101 let signal_node = Self::handle_signals(shutdown.clone());
102
103 // Initialize the ledger.
104 let ledger = Ledger::load(genesis, storage_mode.clone())?;
105
106 // Initialize the ledger service.
107 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
108
109 // Initialize the node router.
110 let router = Router::new(
111 node_ip,
112 NodeType::Validator,
113 account.clone(),
114 ledger_service.clone(),
115 trusted_peers,
116 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
117 trusted_peers_only,
118 storage_mode.clone(),
119 dev.is_some(),
120 )
121 .await?;
122
123 // Initialize the block synchronization logic.
124 let sync = Arc::new(BlockSync::new(ledger_service.clone()));
125 let locators = sync.get_block_locators()?;
126 let ping = Arc::new(Ping::new(router.clone(), locators));
127
128 // Initialize the consensus layer.
129 let consensus = Consensus::new(
130 account.clone(),
131 ledger_service.clone(),
132 sync.clone(),
133 bft_ip,
134 trusted_validators,
135 trusted_peers_only,
136 storage_mode.clone(),
137 ping.clone(),
138 dev,
139 )
140 .await?;
141
142 // Initialize the node.
143 let mut node = Self {
144 ledger: ledger.clone(),
145 consensus: consensus.clone(),
146 router,
147 rest: None,
148 sync: sync.clone(),
149 ping,
150 handles: Default::default(),
151 shutdown: shutdown.clone(),
152 };
153
154 // Perform sync with CDN (if enabled).
155 let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown)));
156
157 // Initialize the transaction pool.
158 node.initialize_transaction_pool(dev, dev_txs)?;
159
160 // Initialize the REST server.
161 if let Some(rest_ip) = rest_ip {
162 node.rest = Some(
163 Rest::start(
164 rest_ip,
165 rest_rps,
166 Some(consensus),
167 ledger.clone(),
168 Arc::new(node.clone()),
169 cdn_sync.clone(),
170 sync,
171 )
172 .await?,
173 );
174 }
175
176 // Set up everything else after CDN sync is done.
177 if let Some(cdn_sync) = cdn_sync {
178 if let Err(error) = cdn_sync.wait().await {
179 crate::log_clean_error(&storage_mode);
180 node.shut_down().await;
181 return Err(error);
182 }
183 }
184
185 // Initialize the routing.
186 node.initialize_routing().await;
187 // Initialize the notification message loop.
188 node.handles.lock().push(crate::start_notification_message_loop());
189 // Pass the node to the signal handler.
190 let _ = signal_node.set(node.clone());
191 // Return the node.
192 Ok(node)
193 }
194
195 /// Returns the ledger.
196 pub fn ledger(&self) -> &Ledger<N, C> {
197 &self.ledger
198 }
199
200 /// Returns the REST server.
201 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
202 &self.rest
203 }
204
205 /// Returns the router.
206 pub fn router(&self) -> &Router<N> {
207 &self.router
208 }
209
210 // /// Initialize the transaction pool.
211 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
212 // use snarkvm::{
213 // console::{
214 // account::ViewKey,
215 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
216 // types::U64,
217 // },
218 // ledger::block::transition::Output,
219 // };
220 // use std::str::FromStr;
221 //
222 // // Initialize the locator.
223 // let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
224 // // Initialize the record name.
225 // let record_name = Identifier::from_str("credits")?;
226 //
227 // /// Searches the genesis block for the mint record.
228 // fn search_genesis_for_mint<N: Network>(
229 // block: Block<N>,
230 // view_key: &ViewKey<N>,
231 // ) -> Option<Record<N, Plaintext<N>>> {
232 // for transition in block.transitions().filter(|t| t.is_mint()) {
233 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
234 // if ciphertext.is_owner(view_key) {
235 // match ciphertext.decrypt(view_key) {
236 // Ok(record) => return Some(record),
237 // Err(error) => {
238 // error!("Failed to decrypt the mint output record - {error}");
239 // return None;
240 // }
241 // }
242 // }
243 // }
244 // }
245 // None
246 // }
247 //
248 // /// Searches the block for the split record.
249 // fn search_block_for_split<N: Network>(
250 // block: Block<N>,
251 // view_key: &ViewKey<N>,
252 // ) -> Option<Record<N, Plaintext<N>>> {
253 // let mut found = None;
254 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
255 // // block.transitions().rev().for_each(|t| {
256 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
257 // splits.iter().rev().for_each(|t| {
258 // if found.is_some() {
259 // return;
260 // }
261 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
262 // error!("Failed to find the split output record");
263 // return;
264 // };
265 // if ciphertext.is_owner(view_key) {
266 // match ciphertext.decrypt(view_key) {
267 // Ok(record) => found = Some(record),
268 // Err(error) => {
269 // error!("Failed to decrypt the split output record - {error}");
270 // }
271 // }
272 // }
273 // });
274 // found
275 // }
276 //
277 // let self_ = self.clone();
278 // self.spawn(async move {
279 // // Retrieve the view key.
280 // let view_key = self_.view_key();
281 // // Initialize the record.
282 // let mut record = {
283 // let mut found = None;
284 // let mut height = self_.ledger.latest_height();
285 // while found.is_none() && height > 0 {
286 // // Retrieve the block.
287 // let Ok(block) = self_.ledger.get_block(height) else {
288 // error!("Failed to get block at height {}", height);
289 // break;
290 // };
291 // // Search for the latest split record.
292 // if let Some(record) = search_block_for_split(block, view_key) {
293 // found = Some(record);
294 // }
295 // // Decrement the height.
296 // height = height.saturating_sub(1);
297 // }
298 // match found {
299 // Some(record) => record,
300 // None => {
301 // // Retrieve the genesis block.
302 // let Ok(block) = self_.ledger.get_block(0) else {
303 // error!("Failed to get the genesis block");
304 // return;
305 // };
306 // // Search the genesis block for the mint record.
307 // if let Some(record) = search_genesis_for_mint(block, view_key) {
308 // found = Some(record);
309 // }
310 // found.expect("Failed to find the split output record")
311 // }
312 // }
313 // };
314 // info!("Starting transaction pool...");
315 // // Start the transaction loop.
316 // loop {
317 // tokio::time::sleep(Duration::from_secs(1)).await;
318 // // If the node is running in development mode, only generate if you are allowed.
319 // if let Some(dev) = dev {
320 // if dev != 0 {
321 // continue;
322 // }
323 // }
324 //
325 // // Prepare the inputs.
326 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
327 // // Execute the transaction.
328 // let transaction = match self_.ledger.vm().execute(
329 // self_.private_key(),
330 // locator,
331 // inputs,
332 // None,
333 // None,
334 // &mut rand::thread_rng(),
335 // ) {
336 // Ok(transaction) => transaction,
337 // Err(error) => {
338 // error!("Transaction pool encountered an execution error - {error}");
339 // continue;
340 // }
341 // };
342 // // Retrieve the transition.
343 // let Some(transition) = transaction.transitions().next() else {
344 // error!("Transaction pool encountered a missing transition");
345 // continue;
346 // };
347 // // Retrieve the second output.
348 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
349 // error!("Transaction pool encountered a missing output");
350 // continue;
351 // };
352 // // Save the second output record.
353 // let Ok(next_record) = ciphertext.decrypt(view_key) else {
354 // error!("Transaction pool encountered a decryption error");
355 // continue;
356 // };
357 // // Broadcast the transaction.
358 // if self_
359 // .unconfirmed_transaction(
360 // self_.router.local_ip(),
361 // UnconfirmedTransaction::from(transaction.clone()),
362 // transaction.clone(),
363 // )
364 // .await
365 // {
366 // info!("Transaction pool broadcasted the transaction");
367 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
368 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
369 // tokio::time::sleep(Duration::from_secs(1)).await;
370 // }
371 // info!("Transaction accepted by the ledger");
372 // }
373 // // Save the record.
374 // record = next_record;
375 // }
376 // });
377 // Ok(())
378 // }
379
380 /// Initializes the transaction pool (if in development mode).
381 ///
382 /// Spawns a background task that periodically issues transactions to the network.
383 fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
384 use snarkvm::console::{
385 program::{Identifier, Literal, ProgramID, Value},
386 types::U64,
387 };
388 use std::str::FromStr;
389
390 // Initialize the locator.
391 let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
392
393 // Determine whether to start the loop.
394 match dev {
395 // If the node is running in development mode, only generate if you are allowed.
396 Some(id) => {
397 // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
398 if id != 0 || !dev_txs {
399 return Ok(());
400 }
401 }
402 // If the node is not running in development mode, do not generate dev traffic.
403 _ => return Ok(()),
404 }
405
406 let self_ = self.clone();
407 self.spawn(async move {
408 tokio::time::sleep(Duration::from_secs(3)).await;
409 info!("Starting transaction pool...");
410
411 // Start the transaction loop.
412 loop {
413 tokio::time::sleep(Duration::from_millis(500)).await;
414
415 // Prepare the inputs.
416 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
417 // Execute the transaction.
418 let self__ = self_.clone();
419 let transaction = match spawn_blocking!(self__.ledger.vm().execute(
420 self__.private_key(),
421 locator,
422 inputs.into_iter(),
423 None,
424 10_000,
425 None,
426 &mut rand::thread_rng(),
427 )) {
428 Ok(transaction) => transaction,
429 Err(error) => {
430 error!("Transaction pool encountered an execution error - {error}");
431 continue;
432 }
433 };
434 // Broadcast the transaction.
435 if self_
436 .unconfirmed_transaction(
437 self_.router.local_ip(),
438 UnconfirmedTransaction::from(transaction.clone()),
439 transaction.clone(),
440 )
441 .await
442 {
443 info!("Transaction pool broadcasted the transaction");
444 }
445 }
446 });
447 Ok(())
448 }
449
450 /// Spawns a task with the given future; it should only be used for long-running tasks.
451 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
452 self.handles.lock().push(tokio::spawn(future));
453 }
454}
455
456#[async_trait]
457impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
458 /// Shuts down the node.
459 async fn shut_down(&self) {
460 info!("Shutting down...");
461
462 // Shut down the node.
463 trace!("Shutting down the node...");
464 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
465
466 // Abort the tasks.
467 trace!("Shutting down the validator...");
468 self.handles.lock().iter().for_each(|handle| handle.abort());
469
470 // Shut down the router.
471 self.router.shut_down().await;
472
473 // Shut down consensus.
474 trace!("Shutting down consensus...");
475 self.consensus.shut_down().await;
476
477 info!("Node has shut down.");
478 }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    ///
    /// The test always ends with an error, as a reminder to keep it marked as `#[ignore]`.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // The addresses for the node and its REST server.
        let node_ip: SocketAddr = "0.0.0.0:4130".parse().unwrap();
        let rest_ip: SocketAddr = "0.0.0.0:3030".parse().unwrap();

        // An (insecure) fixed RNG keeps the account and the genesis block reproducible.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        // Create the account.
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
        // Create a fresh VM on top of a test store.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(StorageMode::new_test(None))?;
        let vm = VM::from(store)?;
        // Create the genesis block.
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_ip,
            None,
            Some(rest_ip),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            StorageMode::Development(0),
            false,
            true,
            None,
            Default::default(),
        )
        .await
        .unwrap();

        println!("Loaded validator node with {} blocks", validator.ledger.latest_height());

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}
543}