snarkos_node/validator/mod.rs
1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19
20use snarkos_account::Account;
21use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
22use snarkos_node_cdn::CdnBlockSync;
23use snarkos_node_consensus::Consensus;
24use snarkos_node_network::{ConnectionMode, NodeType, PeerPoolHandling};
25use snarkos_node_rest::Rest;
26use snarkos_node_router::{
27 Heartbeat,
28 Inbound,
29 Outbound,
30 Router,
31 Routing,
32 messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
33};
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_node_tcp::{
36 P2P,
37 protocols::{Disconnect, Handshake, OnConnect, Reading},
38};
39use snarkos_utilities::{NodeDataDir, SignalHandler};
40
41use snarkvm::prelude::{
42 Ledger,
43 Network,
44 block::{Block, Header},
45 puzzle::Solution,
46 store::ConsensusStorage,
47};
48
49use aleo_std::StorageMode;
50use anyhow::{Context, Result};
51use core::future::Future;
52#[cfg(feature = "locktick")]
53use locktick::parking_lot::Mutex;
54#[cfg(not(feature = "locktick"))]
55use parking_lot::Mutex;
56use std::{net::SocketAddr, sync::Arc, time::Duration};
57use tokio::task::JoinHandle;
58
59/// A validator is a full node, capable of validating blocks.
60#[derive(Clone)]
61pub struct Validator<N: Network, C: ConsensusStorage<N>> {
62 /// The ledger of the node.
63 ledger: Ledger<N, C>,
64 /// The consensus module of the node.
65 consensus: Consensus<N>,
66 /// The router of the node.
67 router: Router<N>,
68 /// The REST server of the node.
69 rest: Option<Rest<N, C, Self>>,
70 /// The block synchronization logic (used in the Router impl).
71 sync: Arc<BlockSync<N>>,
72 /// The spawned handles.
73 pub(crate) handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
74 /// Keeps track of sending pings.
75 ping: Arc<Ping<N>>,
76}
77
78impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
79 /// Initializes a new validator node.
80 pub async fn new(
81 node_ip: SocketAddr,
82 bft_ip: Option<SocketAddr>,
83 rest_ip: Option<SocketAddr>,
84 rest_rps: u32,
85 account: Account<N>,
86 trusted_peers: &[SocketAddr],
87 trusted_validators: &[SocketAddr],
88 genesis: Block<N>,
89 cdn: Option<http::Uri>,
90 storage_mode: StorageMode,
91 node_data_dir: NodeDataDir,
92 trusted_peers_only: bool,
93 dev_txs: bool,
94 dev: Option<u16>,
95 _slipstream_configs: &[std::path::PathBuf],
96 #[cfg(feature = "test_network")] dev_num_validators_for_committee_hotswap: Option<u16>,
97 #[cfg(not(feature = "test_network"))] _dev_num_validators_for_committee_hotswap: Option<u16>,
98 signal_handler: Arc<SignalHandler>,
99 ) -> Result<Self> {
100 // Initialize the ledger.
101 let ledger = {
102 let storage_mode = storage_mode.clone();
103 let genesis = genesis.clone();
104
105 spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
106 }
107 .with_context(|| "Failed to initialize the ledger")?;
108
109 // Initialize the Slipstream plugin manager (if any config files were provided).
110 #[cfg(feature = "slipstream-plugins")]
111 if !_slipstream_configs.is_empty() {
112 let manager =
113 snarkvm::slipstream_plugin_manager::SlipstreamPluginManager::from_config_files(_slipstream_configs)
114 .context("Failed to initialize Slipstream plugin manager")?;
115 ledger.vm().finalize_store().set_slipstream_plugin_manager(manager);
116 let num_plugins = _slipstream_configs.len();
117 tracing::info!(target: "slipstream", "Slipstream plugin manager registered ({num_plugins} plugin(s))");
118 }
119
120 // Initialize the ledger service.
121 #[cfg(not(feature = "test_network"))]
122 let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone()));
123 #[cfg(feature = "test_network")]
124 // Initialize the ledger service with a deterministic dev committee.
125 let ledger_service = if let Some(dev_num_validators) = dev_num_validators_for_committee_hotswap {
126 Arc::new(CoreLedgerService::new_dev(
127 ledger.clone(),
128 signal_handler.clone(),
129 Some((node_data_dir.clone(), dev_num_validators)),
130 ))
131 // Initialize the ledger service without a deterministic dev committee.
132 } else {
133 Arc::new(CoreLedgerService::new_dev(ledger.clone(), signal_handler.clone(), None))
134 };
135
136 // Initialize the node router.
137 let router = Router::new(
138 node_ip,
139 NodeType::Validator,
140 account.clone(),
141 ledger_service.clone(),
142 trusted_peers,
143 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
144 trusted_peers_only,
145 node_data_dir.clone(),
146 dev.is_some(),
147 )
148 .await?;
149
150 // Initialize the block synchronization logic.
151 let sync = Arc::new(BlockSync::new(ledger_service.clone(), ConnectionMode::Gateway));
152 let locators = sync.get_block_locators()?;
153 let ping = Arc::new(Ping::new(router.clone(), locators));
154
155 // Initialize the consensus layer.
156 let consensus = Consensus::new(
157 account.clone(),
158 ledger_service.clone(),
159 sync.clone(),
160 bft_ip,
161 trusted_validators,
162 trusted_peers_only,
163 storage_mode.clone(),
164 node_data_dir.clone(),
165 ping.clone(),
166 dev,
167 )
168 .await?;
169
170 // Initialize the node.
171 let mut node = Self {
172 ledger: ledger.clone(),
173 consensus: consensus.clone(),
174 router,
175 rest: None,
176 sync: sync.clone(),
177 ping,
178 handles: Default::default(),
179 };
180
181 // Perform sync with CDN (if enabled).
182 let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)));
183
184 // Initialize the transaction pool.
185 node.initialize_transaction_pool(dev, dev_txs)?;
186
187 // Initialize the REST server.
188 if let Some(rest_ip) = rest_ip {
189 node.rest = Some(
190 Rest::start(
191 rest_ip,
192 rest_rps,
193 Some(consensus),
194 ledger.clone(),
195 Arc::new(node.clone()),
196 cdn_sync.clone(),
197 sync,
198 )
199 .await?,
200 );
201 }
202
203 // Set up everything else after CDN sync is done.
204 if let Some(cdn_sync) = cdn_sync {
205 if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
206 crate::log_clean_error(&storage_mode);
207 node.shut_down().await;
208 return Err(error);
209 }
210 }
211
212 // Initialize the routing.
213 node.initialize_routing().await;
214 // Initialize the notification message loop.
215 node.handles.lock().push(crate::start_notification_message_loop());
216
217 // Return the node.
218 Ok(node)
219 }
220
221 /// Returns the ledger.
222 pub fn ledger(&self) -> &Ledger<N, C> {
223 &self.ledger
224 }
225
226 /// Returns the REST server.
227 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
228 &self.rest
229 }
230
231 /// Returns the router.
232 pub fn router(&self) -> &Router<N> {
233 &self.router
234 }
235
236 // /// Initialize the transaction pool.
237 // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
238 // use snarkvm::{
239 // console::{
240 // account::ViewKey,
241 // program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
242 // types::U64,
243 // },
244 // ledger::block::transition::Output,
245 // };
246 // use std::str::FromStr;
247 //
248 // // Initialize the locator.
249 // let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
250 // // Initialize the record name.
251 // let record_name = Identifier::from_str("credits")?;
252 //
253 // /// Searches the genesis block for the mint record.
254 // fn search_genesis_for_mint<N: Network>(
255 // block: Block<N>,
256 // view_key: &ViewKey<N>,
257 // ) -> Option<Record<N, Plaintext<N>>> {
258 // for transition in block.transitions().filter(|t| t.is_mint()) {
259 // if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
260 // if ciphertext.is_owner(view_key) {
261 // match ciphertext.decrypt(view_key) {
262 // Ok(record) => return Some(record),
263 // Err(error) => {
264 // error!("Failed to decrypt the mint output record - {error}");
265 // return None;
266 // }
267 // }
268 // }
269 // }
270 // }
271 // None
272 // }
273 //
274 // /// Searches the block for the split record.
275 // fn search_block_for_split<N: Network>(
276 // block: Block<N>,
277 // view_key: &ViewKey<N>,
278 // ) -> Option<Record<N, Plaintext<N>>> {
279 // let mut found = None;
280 // // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
281 // // block.transitions().rev().for_each(|t| {
282 // let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
283 // splits.iter().rev().for_each(|t| {
284 // if found.is_some() {
285 // return;
286 // }
287 // let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
288 // error!("Failed to find the split output record");
289 // return;
290 // };
291 // if ciphertext.is_owner(view_key) {
292 // match ciphertext.decrypt(view_key) {
293 // Ok(record) => found = Some(record),
294 // Err(error) => {
295 // error!("Failed to decrypt the split output record - {error}");
296 // }
297 // }
298 // }
299 // });
300 // found
301 // }
302 //
303 // let self_ = self.clone();
304 // self.spawn(async move {
305 // // Retrieve the view key.
306 // let view_key = self_.view_key();
307 // // Initialize the record.
308 // let mut record = {
309 // let mut found = None;
310 // let mut height = self_.ledger.latest_height();
311 // while found.is_none() && height > 0 {
312 // // Retrieve the block.
313 // let Ok(block) = self_.ledger.get_block(height) else {
314 // error!("Failed to get block at height {}", height);
315 // break;
316 // };
317 // // Search for the latest split record.
318 // if let Some(record) = search_block_for_split(block, view_key) {
319 // found = Some(record);
320 // }
321 // // Decrement the height.
322 // height = height.saturating_sub(1);
323 // }
324 // match found {
325 // Some(record) => record,
326 // None => {
327 // // Retrieve the genesis block.
328 // let Ok(block) = self_.ledger.get_block(0) else {
329 // error!("Failed to get the genesis block");
330 // return;
331 // };
332 // // Search the genesis block for the mint record.
333 // if let Some(record) = search_genesis_for_mint(block, view_key) {
334 // found = Some(record);
335 // }
336 // found.expect("Failed to find the split output record")
337 // }
338 // }
339 // };
340 // info!("Starting transaction pool...");
341 // // Start the transaction loop.
342 // loop {
343 // tokio::time::sleep(Duration::from_secs(1)).await;
344 // // If the node is running in development mode, only generate if you are allowed.
345 // if let Some(dev) = dev {
346 // if dev != 0 {
347 // continue;
348 // }
349 // }
350 //
351 // // Prepare the inputs.
352 // let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
353 // // Execute the transaction.
354 // let transaction = match self_.ledger.vm().execute(
355 // self_.private_key(),
356 // locator,
357 // inputs,
358 // None,
359 // None,
360 // &mut rand::rng(),
361 // ) {
362 // Ok(transaction) => transaction,
363 // Err(error) => {
364 // error!("Transaction pool encountered an execution error - {error}");
365 // continue;
366 // }
367 // };
368 // // Retrieve the transition.
369 // let Some(transition) = transaction.transitions().next() else {
370 // error!("Transaction pool encountered a missing transition");
371 // continue;
372 // };
373 // // Retrieve the second output.
374 // let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
375 // error!("Transaction pool encountered a missing output");
376 // continue;
377 // };
378 // // Save the second output record.
379 // let Ok(next_record) = ciphertext.decrypt(view_key) else {
380 // error!("Transaction pool encountered a decryption error");
381 // continue;
382 // };
383 // // Broadcast the transaction.
384 // if self_
385 // .unconfirmed_transaction(
386 // self_.router.local_ip(),
387 // UnconfirmedTransaction::from(transaction.clone()),
388 // transaction.clone(),
389 // )
390 // .await
391 // {
392 // info!("Transaction pool broadcasted the transaction");
393 // let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
394 // while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
395 // tokio::time::sleep(Duration::from_secs(1)).await;
396 // }
397 // info!("Transaction accepted by the ledger");
398 // }
399 // // Save the record.
400 // record = next_record;
401 // }
402 // });
403 // Ok(())
404 // }
405
406 /// Initializes the transaction pool (if in development mode).
407 ///
408 /// Spawns a background task that periodically issues transactions to the network.
409 fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
410 use snarkvm::console::{
411 program::{Identifier, Literal, ProgramID, Value},
412 types::U64,
413 };
414 use std::str::FromStr;
415
416 // Initialize the locator.
417 let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
418
419 // Determine whether to start the loop.
420 match dev {
421 // If the node is running in development mode, only generate if you are allowed.
422 Some(id) => {
423 // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
424 if id != 0 || !dev_txs {
425 return Ok(());
426 }
427 }
428 // If the node is not running in development mode, do not generate dev traffic.
429 _ => return Ok(()),
430 }
431
432 let self_ = self.clone();
433 self.spawn(async move {
434 tokio::time::sleep(Duration::from_secs(3)).await;
435 info!("Starting transaction pool...");
436
437 // Start the transaction loop.
438 loop {
439 tokio::time::sleep(Duration::from_millis(500)).await;
440
441 // Prepare the inputs.
442 let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
443 // Execute the transaction.
444 let self__ = self_.clone();
445 let transaction = match tokio::task::spawn_blocking(move || {
446 self__.ledger.vm().execute(
447 self__.private_key(),
448 locator,
449 inputs.into_iter(),
450 None,
451 10_000,
452 None,
453 &mut rand::rng(),
454 )
455 })
456 .await
457 {
458 Ok(Ok(transaction)) => transaction,
459 Ok(Err(error)) => {
460 error!("Transaction pool encountered an execution error - {error}");
461 continue;
462 }
463 Err(error) => {
464 error!("Transaction pool encountered an execution error - blocking task failed - {error}");
465 continue;
466 }
467 };
468 // Broadcast the transaction.
469 if self_
470 .unconfirmed_transaction(
471 self_.router.local_ip(),
472 UnconfirmedTransaction::from(transaction.clone()),
473 transaction.clone(),
474 )
475 .await
476 {
477 info!("Transaction pool broadcasted the transaction");
478 }
479 }
480 });
481 Ok(())
482 }
483
484 /// Spawns a task with the given future; it should only be used for long-running tasks.
485 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
486 self.handles.lock().push(tokio::spawn(future));
487 }
488}
489
490#[async_trait]
491impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
492 /// Shuts down the node.
493 async fn shut_down(&self) {
494 info!("Shutting down...");
495
496 // Shut down the node.
497 trace!("Shutting down the node...");
498
499 // Shut down the Slipstream plugin manager.
500 #[cfg(feature = "slipstream-plugins")]
501 if let Some(manager) = self.ledger.vm().finalize_store().slipstream_plugin_manager().write().as_mut() {
502 manager.unload();
503 }
504
505 // Shut down the REST instance.
506 if let Some(rest) = &self.rest {
507 trace!("Shutting down the REST server...");
508 rest.shut_down();
509 }
510
511 // Abort the tasks.
512 trace!("Shutting down the validator...");
513 self.handles.lock().iter().for_each(|handle| handle.abort());
514
515 // Shut down the router.
516 self.router.shut_down().await;
517
518 // Shut down consensus.
519 trace!("Shutting down consensus...");
520 self.consensus.shut_down().await;
521
522 info!("Node has shut down.");
523 }
524}
525
526#[cfg(test)]
527mod tests {
528 use super::*;
529 use snarkvm::prelude::{
530 MainnetV0,
531 VM,
532 store::{ConsensusStore, helpers::memory::ConsensusMemory},
533 };
534
535 use anyhow::bail;
536 use rand::SeedableRng;
537 use rand_chacha::ChaChaRng;
538 use std::str::FromStr;
539
540 type CurrentNetwork = MainnetV0;
541
542 /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
543 #[ignore]
544 #[tokio::test]
545 async fn test_profiler() -> Result<()> {
546 // Specify the node attributes.
547 let node = SocketAddr::from_str("0.0.0.0:4130").unwrap();
548 let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap();
549 let storage_mode = StorageMode::Development(0);
550 let node_data_dir = NodeDataDir::new_development(CurrentNetwork::ID, 0);
551 let dev_txs = true;
552
553 // Initialize an (insecure) fixed RNG.
554 let mut rng = ChaChaRng::seed_from_u64(snarkos_utilities::DEVELOPMENT_MODE_RNG_SEED);
555 // Initialize the account.
556 let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
557 // Initialize a new VM.
558 let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(
559 StorageMode::new_test(None),
560 )?)?;
561 // Initialize the genesis block.
562 let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;
563
564 println!("Initializing validator node...");
565
566 let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new(
567 node,
568 None,
569 Some(rest),
570 10,
571 account,
572 &[],
573 &[],
574 genesis,
575 None,
576 storage_mode,
577 node_data_dir,
578 false,
579 dev_txs,
580 None,
581 &[],
582 None,
583 SignalHandler::new(None),
584 )
585 .await
586 .unwrap();
587
588 println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),);
589
590 bail!("\n\nRemember to #[ignore] this test!\n\n")
591 }
592}