snarkos_node/validator/mod.rs
1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService, spawn_blocking};
21use snarkos_node_consensus::Consensus;
22use snarkos_node_rest::Rest;
23use snarkos_node_router::{
24 Heartbeat,
25 Inbound,
26 Outbound,
27 Router,
28 Routing,
29 messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
30};
31use snarkos_node_sync::{BlockSync, BlockSyncMode};
32use snarkos_node_tcp::{
33 P2P,
34 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
35};
36use snarkvm::prelude::{
37 Ledger,
38 Network,
39 block::{Block, Header},
40 puzzle::Solution,
41 store::ConsensusStorage,
42};
43
44use aleo_std::StorageMode;
45use anyhow::Result;
46use core::future::Future;
47use parking_lot::Mutex;
48use std::{
49 net::SocketAddr,
50 sync::{Arc, atomic::AtomicBool},
51 time::Duration,
52};
53use tokio::task::JoinHandle;
54
55/// A validator is a full node, capable of validating blocks.
56#[derive(Clone)]
57pub struct Validator<N: Network, C: ConsensusStorage<N>> {
58 /// The ledger of the node.
59 ledger: Ledger<N, C>,
60 /// The consensus module of the node.
61 consensus: Consensus<N>,
62 /// The router of the node.
63 router: Router<N>,
64 /// The REST server of the node.
65 rest: Option<Rest<N, C, Self>>,
66 /// The sync module.
67 sync: BlockSync<N>,
68 /// The spawned handles.
69 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
70 /// The shutdown signal.
71 shutdown: Arc<AtomicBool>,
72}
73
74impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
75 /// Initializes a new validator node.
76 pub async fn new(
77 node_ip: SocketAddr,
78 bft_ip: Option<SocketAddr>,
79 rest_ip: Option<SocketAddr>,
80 rest_rps: u32,
81 account: Account<N>,
82 trusted_peers: &[SocketAddr],
83 trusted_validators: &[SocketAddr],
84 genesis: Block<N>,
85 cdn: Option<String>,
86 storage_mode: StorageMode,
87 allow_external_peers: bool,
88 dev_txs: bool,
89 shutdown: Arc<AtomicBool>,
90 ) -> Result<Self> {
91 // Initialize the signal handler.
92 let signal_node = Self::handle_signals(shutdown.clone());
93
94 // Initialize the ledger.
95 let ledger = Ledger::load(genesis, storage_mode.clone())?;
96
97 // Initialize the CDN.
98 if let Some(base_url) = cdn {
99 // Sync the ledger with the CDN.
100 if let Err((_, error)) =
101 snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
102 {
103 crate::log_clean_error(&storage_mode);
104 return Err(error);
105 }
106 }
107
108 // Initialize the ledger service.
109 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
110
111 // Initialize the consensus.
112 let mut consensus =
113 Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
114 // Initialize the primary channels.
115 let (primary_sender, primary_receiver) = init_primary_channels::<N>();
116 // Start the consensus.
117 consensus.run(primary_sender, primary_receiver).await?;
118 // Determine if the validator should rotate external peers.
119 let rotate_external_peers = false;
120
121 // Initialize the node router.
122 let router = Router::new(
123 node_ip,
124 NodeType::Validator,
125 account,
126 trusted_peers,
127 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
128 rotate_external_peers,
129 allow_external_peers,
130 matches!(storage_mode, StorageMode::Development(_)),
131 )
132 .await?;
133
134 // Initialize the sync module.
135 let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());
136
137 // Initialize the node.
138 let mut node = Self {
139 ledger: ledger.clone(),
140 consensus: consensus.clone(),
141 router,
142 rest: None,
143 sync,
144 handles: Default::default(),
145 shutdown,
146 };
147 // Initialize the transaction pool.
148 node.initialize_transaction_pool(storage_mode, dev_txs)?;
149
150 // Initialize the REST server.
151 if let Some(rest_ip) = rest_ip {
152 node.rest =
153 Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone(), Arc::new(node.clone())).await?);
154 }
155 // Initialize the routing.
156 node.initialize_routing().await;
157 // Initialize the notification message loop.
158 node.handles.lock().push(crate::start_notification_message_loop());
159 // Pass the node to the signal handler.
160 let _ = signal_node.set(node.clone());
161 // Return the node.
162 Ok(node)
163 }
164
165 /// Returns the ledger.
166 pub fn ledger(&self) -> &Ledger<N, C> {
167 &self.ledger
168 }
169
170 /// Returns the REST server.
171 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
172 &self.rest
173 }
174}
175
176impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
177 // /// Initialize the transaction pool.
178 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
179 // use snarkvm::{
180 // console::{
181 // account::ViewKey,
182 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
183 // types::U64,
184 // },
185 // ledger::block::transition::Output,
186 // };
187 // use std::str::FromStr;
188 //
189 // // Initialize the locator.
190 // let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
191 // // Initialize the record name.
192 // let record_name = Identifier::from_str("credits")?;
193 //
194 // /// Searches the genesis block for the mint record.
195 // fn search_genesis_for_mint<N: Network>(
196 // block: Block<N>,
197 // view_key: &ViewKey<N>,
198 // ) -> Option<Record<N, Plaintext<N>>> {
199 // for transition in block.transitions().filter(|t| t.is_mint()) {
200 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
201 // if ciphertext.is_owner(view_key) {
202 // match ciphertext.decrypt(view_key) {
203 // Ok(record) => return Some(record),
204 // Err(error) => {
205 // error!("Failed to decrypt the mint output record - {error}");
206 // return None;
207 // }
208 // }
209 // }
210 // }
211 // }
212 // None
213 // }
214 //
215 // /// Searches the block for the split record.
216 // fn search_block_for_split<N: Network>(
217 // block: Block<N>,
218 // view_key: &ViewKey<N>,
219 // ) -> Option<Record<N, Plaintext<N>>> {
220 // let mut found = None;
221 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
222 // // block.transitions().rev().for_each(|t| {
223 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
224 // splits.iter().rev().for_each(|t| {
225 // if found.is_some() {
226 // return;
227 // }
228 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
229 // error!("Failed to find the split output record");
230 // return;
231 // };
232 // if ciphertext.is_owner(view_key) {
233 // match ciphertext.decrypt(view_key) {
234 // Ok(record) => found = Some(record),
235 // Err(error) => {
236 // error!("Failed to decrypt the split output record - {error}");
237 // }
238 // }
239 // }
240 // });
241 // found
242 // }
243 //
244 // let self_ = self.clone();
245 // self.spawn(async move {
246 // // Retrieve the view key.
247 // let view_key = self_.view_key();
248 // // Initialize the record.
249 // let mut record = {
250 // let mut found = None;
251 // let mut height = self_.ledger.latest_height();
252 // while found.is_none() && height > 0 {
253 // // Retrieve the block.
254 // let Ok(block) = self_.ledger.get_block(height) else {
255 // error!("Failed to get block at height {}", height);
256 // break;
257 // };
258 // // Search for the latest split record.
259 // if let Some(record) = search_block_for_split(block, view_key) {
260 // found = Some(record);
261 // }
262 // // Decrement the height.
263 // height = height.saturating_sub(1);
264 // }
265 // match found {
266 // Some(record) => record,
267 // None => {
268 // // Retrieve the genesis block.
269 // let Ok(block) = self_.ledger.get_block(0) else {
270 // error!("Failed to get the genesis block");
271 // return;
272 // };
273 // // Search the genesis block for the mint record.
274 // if let Some(record) = search_genesis_for_mint(block, view_key) {
275 // found = Some(record);
276 // }
277 // found.expect("Failed to find the split output record")
278 // }
279 // }
280 // };
281 // info!("Starting transaction pool...");
282 // // Start the transaction loop.
283 // loop {
284 // tokio::time::sleep(Duration::from_secs(1)).await;
285 // // If the node is running in development mode, only generate if you are allowed.
286 // if let Some(dev) = dev {
287 // if dev != 0 {
288 // continue;
289 // }
290 // }
291 //
292 // // Prepare the inputs.
293 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
294 // // Execute the transaction.
295 // let transaction = match self_.ledger.vm().execute(
296 // self_.private_key(),
297 // locator,
298 // inputs,
299 // None,
300 // None,
301 // &mut rand::thread_rng(),
302 // ) {
303 // Ok(transaction) => transaction,
304 // Err(error) => {
305 // error!("Transaction pool encountered an execution error - {error}");
306 // continue;
307 // }
308 // };
309 // // Retrieve the transition.
310 // let Some(transition) = transaction.transitions().next() else {
311 // error!("Transaction pool encountered a missing transition");
312 // continue;
313 // };
314 // // Retrieve the second output.
315 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
316 // error!("Transaction pool encountered a missing output");
317 // continue;
318 // };
319 // // Save the second output record.
320 // let Ok(next_record) = ciphertext.decrypt(view_key) else {
321 // error!("Transaction pool encountered a decryption error");
322 // continue;
323 // };
324 // // Broadcast the transaction.
325 // if self_
326 // .unconfirmed_transaction(
327 // self_.router.local_ip(),
328 // UnconfirmedTransaction::from(transaction.clone()),
329 // transaction.clone(),
330 // )
331 // .await
332 // {
333 // info!("Transaction pool broadcasted the transaction");
334 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
335 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
336 // tokio::time::sleep(Duration::from_secs(1)).await;
337 // }
338 // info!("Transaction accepted by the ledger");
339 // }
340 // // Save the record.
341 // record = next_record;
342 // }
343 // });
344 // Ok(())
345 // }
346
347 /// Initialize the transaction pool.
348 fn initialize_transaction_pool(&self, storage_mode: StorageMode, dev_txs: bool) -> Result<()> {
349 use snarkvm::console::{
350 program::{Identifier, Literal, ProgramID, Value},
351 types::U64,
352 };
353 use std::str::FromStr;
354
355 // Initialize the locator.
356 let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
357
358 // Determine whether to start the loop.
359 match storage_mode {
360 // If the node is running in development mode, only generate if you are allowed.
361 StorageMode::Development(id) => {
362 // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
363 if id != 0 || !dev_txs {
364 return Ok(());
365 }
366 }
367 // If the node is not running in development mode, do not generate dev traffic.
368 _ => return Ok(()),
369 }
370
371 let self_ = self.clone();
372 self.spawn(async move {
373 tokio::time::sleep(Duration::from_secs(3)).await;
374 info!("Starting transaction pool...");
375
376 // Start the transaction loop.
377 loop {
378 tokio::time::sleep(Duration::from_millis(500)).await;
379
380 // Prepare the inputs.
381 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
382 // Execute the transaction.
383 let self__ = self_.clone();
384 let transaction = match spawn_blocking!(self__.ledger.vm().execute(
385 self__.private_key(),
386 locator,
387 inputs.into_iter(),
388 None,
389 10_000,
390 None,
391 &mut rand::thread_rng(),
392 )) {
393 Ok(transaction) => transaction,
394 Err(error) => {
395 error!("Transaction pool encountered an execution error - {error}");
396 continue;
397 }
398 };
399 // Broadcast the transaction.
400 if self_
401 .unconfirmed_transaction(
402 self_.router.local_ip(),
403 UnconfirmedTransaction::from(transaction.clone()),
404 transaction.clone(),
405 )
406 .await
407 {
408 info!("Transaction pool broadcasted the transaction");
409 }
410 }
411 });
412 Ok(())
413 }
414
415 /// Spawns a task with the given future; it should only be used for long-running tasks.
416 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
417 self.handles.lock().push(tokio::spawn(future));
418 }
419}
420
421#[async_trait]
422impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
423 /// Shuts down the node.
424 async fn shut_down(&self) {
425 info!("Shutting down...");
426
427 // Shut down the node.
428 trace!("Shutting down the node...");
429 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
430
431 // Abort the tasks.
432 trace!("Shutting down the validator...");
433 self.handles.lock().iter().for_each(|handle| handle.abort());
434
435 // Shut down the router.
436 self.router.shut_down().await;
437
438 // Shut down consensus.
439 trace!("Shutting down consensus...");
440 self.consensus.shut_down().await;
441
442 info!("Node has shut down.");
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    type CurrentNetwork = MainnetV0;

    /// Profiling harness that boots a development validator with dev traffic enabled.
    ///
    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    /// The test always ends with an error on purpose, as a reminder to keep it `#[ignore]`d.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // Node configuration.
        let node_addr: SocketAddr = "0.0.0.0:4130".parse().unwrap();
        let rest_addr: SocketAddr = "0.0.0.0:3030".parse().unwrap();
        let storage_mode = StorageMode::Development(0);
        let dev_txs = true;

        // A fixed-seed (insecure) RNG keeps the account and genesis block reproducible.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        // Create the account.
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
        // Create an in-memory VM and derive the genesis block from it.
        let store = ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(None)?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new(
            node_addr,
            None,
            Some(rest_addr),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            storage_mode,
            false,
            dev_txs,
            Default::default(),
        )
        .await
        .unwrap();

        println!("Loaded validator node with {} blocks", validator.ledger.latest_height());

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}
505}