Skip to main content

snarkos_node/validator/
mod.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19
20use snarkos_account::Account;
21use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
22use snarkos_node_cdn::CdnBlockSync;
23use snarkos_node_consensus::Consensus;
24use snarkos_node_network::{ConnectionMode, NodeType, PeerPoolHandling};
25use snarkos_node_rest::Rest;
26use snarkos_node_router::{
27    Heartbeat,
28    Inbound,
29    Outbound,
30    Router,
31    Routing,
32    messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
33};
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_node_tcp::{
36    P2P,
37    protocols::{Disconnect, Handshake, OnConnect, Reading},
38};
39use snarkos_utilities::{NodeDataDir, SignalHandler};
40
41use snarkvm::prelude::{
42    Ledger,
43    Network,
44    block::{Block, Header},
45    puzzle::Solution,
46    store::ConsensusStorage,
47};
48
49use aleo_std::StorageMode;
50use anyhow::{Context, Result};
51use core::future::Future;
52#[cfg(feature = "locktick")]
53use locktick::parking_lot::Mutex;
54#[cfg(not(feature = "locktick"))]
55use parking_lot::Mutex;
56use std::{net::SocketAddr, sync::Arc, time::Duration};
57use tokio::task::JoinHandle;
58
59/// A validator is a full node, capable of validating blocks.
60#[derive(Clone)]
61pub struct Validator<N: Network, C: ConsensusStorage<N>> {
62    /// The ledger of the node.
63    ledger: Ledger<N, C>,
64    /// The consensus module of the node.
65    consensus: Consensus<N>,
66    /// The router of the node.
67    router: Router<N>,
68    /// The REST server of the node.
69    rest: Option<Rest<N, C, Self>>,
70    /// The block synchronization logic (used in the Router impl).
71    sync: Arc<BlockSync<N>>,
72    /// The spawned handles.
73    pub(crate) handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
74    /// Keeps track of sending pings.
75    ping: Arc<Ping<N>>,
76}
77
78impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
79    /// Initializes a new validator node.
80    pub async fn new(
81        node_ip: SocketAddr,
82        bft_ip: Option<SocketAddr>,
83        rest_ip: Option<SocketAddr>,
84        rest_rps: u32,
85        account: Account<N>,
86        trusted_peers: &[SocketAddr],
87        trusted_validators: &[SocketAddr],
88        genesis: Block<N>,
89        cdn: Option<http::Uri>,
90        storage_mode: StorageMode,
91        node_data_dir: NodeDataDir,
92        trusted_peers_only: bool,
93        dev_txs: bool,
94        dev: Option<u16>,
95        _slipstream_configs: &[std::path::PathBuf],
96        #[cfg(feature = "test_network")] dev_num_validators_for_committee_hotswap: Option<u16>,
97        #[cfg(not(feature = "test_network"))] _dev_num_validators_for_committee_hotswap: Option<u16>,
98        signal_handler: Arc<SignalHandler>,
99    ) -> Result<Self> {
100        // Initialize the ledger.
101        let ledger = {
102            let storage_mode = storage_mode.clone();
103            let genesis = genesis.clone();
104
105            spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
106        }
107        .with_context(|| "Failed to initialize the ledger")?;
108
109        // Initialize the Slipstream plugin manager (if any config files were provided).
110        #[cfg(feature = "slipstream-plugins")]
111        if !_slipstream_configs.is_empty() {
112            let manager =
113                snarkvm::slipstream_plugin_manager::SlipstreamPluginManager::from_config_files(_slipstream_configs)
114                    .context("Failed to initialize Slipstream plugin manager")?;
115            ledger.vm().finalize_store().set_slipstream_plugin_manager(manager);
116            let num_plugins = _slipstream_configs.len();
117            tracing::info!(target: "slipstream", "Slipstream plugin manager registered ({num_plugins} plugin(s))");
118        }
119
120        // Initialize the ledger service.
121        #[cfg(not(feature = "test_network"))]
122        let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone()));
123        #[cfg(feature = "test_network")]
124        // Initialize the ledger service with a deterministic dev committee.
125        let ledger_service = if let Some(dev_num_validators) = dev_num_validators_for_committee_hotswap {
126            Arc::new(CoreLedgerService::new_dev(
127                ledger.clone(),
128                signal_handler.clone(),
129                Some((node_data_dir.clone(), dev_num_validators)),
130            ))
131        // Initialize the ledger service without a deterministic dev committee.
132        } else {
133            Arc::new(CoreLedgerService::new_dev(ledger.clone(), signal_handler.clone(), None))
134        };
135
136        // Initialize the node router.
137        let router = Router::new(
138            node_ip,
139            NodeType::Validator,
140            account.clone(),
141            ledger_service.clone(),
142            trusted_peers,
143            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
144            trusted_peers_only,
145            node_data_dir.clone(),
146            dev.is_some(),
147        )
148        .await?;
149
150        // Initialize the block synchronization logic.
151        let sync = Arc::new(BlockSync::new(ledger_service.clone(), ConnectionMode::Gateway));
152        let locators = sync.get_block_locators()?;
153        let ping = Arc::new(Ping::new(router.clone(), locators));
154
155        // Initialize the consensus layer.
156        let consensus = Consensus::new(
157            account.clone(),
158            ledger_service.clone(),
159            sync.clone(),
160            bft_ip,
161            trusted_validators,
162            trusted_peers_only,
163            storage_mode.clone(),
164            node_data_dir.clone(),
165            ping.clone(),
166            dev,
167        )
168        .await?;
169
170        // Initialize the node.
171        let mut node = Self {
172            ledger: ledger.clone(),
173            consensus: consensus.clone(),
174            router,
175            rest: None,
176            sync: sync.clone(),
177            ping,
178            handles: Default::default(),
179        };
180
181        // Perform sync with CDN (if enabled).
182        let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)));
183
184        // Initialize the transaction pool.
185        node.initialize_transaction_pool(dev, dev_txs)?;
186
187        // Initialize the REST server.
188        if let Some(rest_ip) = rest_ip {
189            node.rest = Some(
190                Rest::start(
191                    rest_ip,
192                    rest_rps,
193                    Some(consensus),
194                    ledger.clone(),
195                    Arc::new(node.clone()),
196                    cdn_sync.clone(),
197                    sync,
198                )
199                .await?,
200            );
201        }
202
203        // Set up everything else after CDN sync is done.
204        if let Some(cdn_sync) = cdn_sync {
205            if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
206                crate::log_clean_error(&storage_mode);
207                node.shut_down().await;
208                return Err(error);
209            }
210        }
211
212        // Initialize the routing.
213        node.initialize_routing().await;
214        // Initialize the notification message loop.
215        node.handles.lock().push(crate::start_notification_message_loop());
216
217        // Return the node.
218        Ok(node)
219    }
220
221    /// Returns the ledger.
222    pub fn ledger(&self) -> &Ledger<N, C> {
223        &self.ledger
224    }
225
226    /// Returns the REST server.
227    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
228        &self.rest
229    }
230
231    /// Returns the router.
232    pub fn router(&self) -> &Router<N> {
233        &self.router
234    }
235
236    // /// Initialize the transaction pool.
237    // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
238    //     use snarkvm::{
239    //         console::{
240    //             account::ViewKey,
241    //             program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
242    //             types::U64,
243    //         },
244    //         ledger::block::transition::Output,
245    //     };
246    //     use std::str::FromStr;
247    //
248    //     // Initialize the locator.
249    //     let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
250    //     // Initialize the record name.
251    //     let record_name = Identifier::from_str("credits")?;
252    //
253    //     /// Searches the genesis block for the mint record.
254    //     fn search_genesis_for_mint<N: Network>(
255    //         block: Block<N>,
256    //         view_key: &ViewKey<N>,
257    //     ) -> Option<Record<N, Plaintext<N>>> {
258    //         for transition in block.transitions().filter(|t| t.is_mint()) {
259    //             if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
260    //                 if ciphertext.is_owner(view_key) {
261    //                     match ciphertext.decrypt(view_key) {
262    //                         Ok(record) => return Some(record),
263    //                         Err(error) => {
264    //                             error!("Failed to decrypt the mint output record - {error}");
265    //                             return None;
266    //                         }
267    //                     }
268    //                 }
269    //             }
270    //         }
271    //         None
272    //     }
273    //
274    //     /// Searches the block for the split record.
275    //     fn search_block_for_split<N: Network>(
276    //         block: Block<N>,
277    //         view_key: &ViewKey<N>,
278    //     ) -> Option<Record<N, Plaintext<N>>> {
279    //         let mut found = None;
280    //         // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
281    //         // block.transitions().rev().for_each(|t| {
282    //         let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
283    //         splits.iter().rev().for_each(|t| {
284    //             if found.is_some() {
285    //                 return;
286    //             }
287    //             let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
288    //                 error!("Failed to find the split output record");
289    //                 return;
290    //             };
291    //             if ciphertext.is_owner(view_key) {
292    //                 match ciphertext.decrypt(view_key) {
293    //                     Ok(record) => found = Some(record),
294    //                     Err(error) => {
295    //                         error!("Failed to decrypt the split output record - {error}");
296    //                     }
297    //                 }
298    //             }
299    //         });
300    //         found
301    //     }
302    //
303    //     let self_ = self.clone();
304    //     self.spawn(async move {
305    //         // Retrieve the view key.
306    //         let view_key = self_.view_key();
307    //         // Initialize the record.
308    //         let mut record = {
309    //             let mut found = None;
310    //             let mut height = self_.ledger.latest_height();
311    //             while found.is_none() && height > 0 {
312    //                 // Retrieve the block.
313    //                 let Ok(block) = self_.ledger.get_block(height) else {
314    //                     error!("Failed to get block at height {}", height);
315    //                     break;
316    //                 };
317    //                 // Search for the latest split record.
318    //                 if let Some(record) = search_block_for_split(block, view_key) {
319    //                     found = Some(record);
320    //                 }
321    //                 // Decrement the height.
322    //                 height = height.saturating_sub(1);
323    //             }
324    //             match found {
325    //                 Some(record) => record,
326    //                 None => {
327    //                     // Retrieve the genesis block.
328    //                     let Ok(block) = self_.ledger.get_block(0) else {
329    //                         error!("Failed to get the genesis block");
330    //                         return;
331    //                     };
332    //                     // Search the genesis block for the mint record.
333    //                     if let Some(record) = search_genesis_for_mint(block, view_key) {
334    //                         found = Some(record);
335    //                     }
336    //                     found.expect("Failed to find the split output record")
337    //                 }
338    //             }
339    //         };
340    //         info!("Starting transaction pool...");
341    //         // Start the transaction loop.
342    //         loop {
343    //             tokio::time::sleep(Duration::from_secs(1)).await;
344    //             // If the node is running in development mode, only generate if you are allowed.
345    //             if let Some(dev) = dev {
346    //                 if dev != 0 {
347    //                     continue;
348    //                 }
349    //             }
350    //
351    //             // Prepare the inputs.
352    //             let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
353    //             // Execute the transaction.
354    //             let transaction = match self_.ledger.vm().execute(
355    //                 self_.private_key(),
356    //                 locator,
357    //                 inputs,
358    //                 None,
359    //                 None,
360    //                 &mut rand::rng(),
361    //             ) {
362    //                 Ok(transaction) => transaction,
363    //                 Err(error) => {
364    //                     error!("Transaction pool encountered an execution error - {error}");
365    //                     continue;
366    //                 }
367    //             };
368    //             // Retrieve the transition.
369    //             let Some(transition) = transaction.transitions().next() else {
370    //                 error!("Transaction pool encountered a missing transition");
371    //                 continue;
372    //             };
373    //             // Retrieve the second output.
374    //             let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
375    //                 error!("Transaction pool encountered a missing output");
376    //                 continue;
377    //             };
378    //             // Save the second output record.
379    //             let Ok(next_record) = ciphertext.decrypt(view_key) else {
380    //                 error!("Transaction pool encountered a decryption error");
381    //                 continue;
382    //             };
383    //             // Broadcast the transaction.
384    //             if self_
385    //                 .unconfirmed_transaction(
386    //                     self_.router.local_ip(),
387    //                     UnconfirmedTransaction::from(transaction.clone()),
388    //                     transaction.clone(),
389    //                 )
390    //                 .await
391    //             {
392    //                 info!("Transaction pool broadcasted the transaction");
393    //                 let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
394    //                 while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
395    //                     tokio::time::sleep(Duration::from_secs(1)).await;
396    //                 }
397    //                 info!("Transaction accepted by the ledger");
398    //             }
399    //             // Save the record.
400    //             record = next_record;
401    //         }
402    //     });
403    //     Ok(())
404    // }
405
406    /// Initializes the transaction pool (if in development mode).
407    ///
408    /// Spawns a background task that periodically issues transactions to the network.
409    fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
410        use snarkvm::console::{
411            program::{Identifier, Literal, ProgramID, Value},
412            types::U64,
413        };
414        use std::str::FromStr;
415
416        // Initialize the locator.
417        let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
418
419        // Determine whether to start the loop.
420        match dev {
421            // If the node is running in development mode, only generate if you are allowed.
422            Some(id) => {
423                // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
424                if id != 0 || !dev_txs {
425                    return Ok(());
426                }
427            }
428            // If the node is not running in development mode, do not generate dev traffic.
429            _ => return Ok(()),
430        }
431
432        let self_ = self.clone();
433        self.spawn(async move {
434            tokio::time::sleep(Duration::from_secs(3)).await;
435            info!("Starting transaction pool...");
436
437            // Start the transaction loop.
438            loop {
439                tokio::time::sleep(Duration::from_millis(500)).await;
440
441                // Prepare the inputs.
442                let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
443                // Execute the transaction.
444                let self__ = self_.clone();
445                let transaction = match tokio::task::spawn_blocking(move || {
446                    self__.ledger.vm().execute(
447                        self__.private_key(),
448                        locator,
449                        inputs.into_iter(),
450                        None,
451                        10_000,
452                        None,
453                        &mut rand::rng(),
454                    )
455                })
456                .await
457                {
458                    Ok(Ok(transaction)) => transaction,
459                    Ok(Err(error)) => {
460                        error!("Transaction pool encountered an execution error - {error}");
461                        continue;
462                    }
463                    Err(error) => {
464                        error!("Transaction pool encountered an execution error - blocking task failed - {error}");
465                        continue;
466                    }
467                };
468                // Broadcast the transaction.
469                if self_
470                    .unconfirmed_transaction(
471                        self_.router.local_ip(),
472                        UnconfirmedTransaction::from(transaction.clone()),
473                        transaction.clone(),
474                    )
475                    .await
476                {
477                    info!("Transaction pool broadcasted the transaction");
478                }
479            }
480        });
481        Ok(())
482    }
483
484    /// Spawns a task with the given future; it should only be used for long-running tasks.
485    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
486        self.handles.lock().push(tokio::spawn(future));
487    }
488}
489
490#[async_trait]
491impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
492    /// Shuts down the node.
493    async fn shut_down(&self) {
494        info!("Shutting down...");
495
496        // Shut down the node.
497        trace!("Shutting down the node...");
498
499        // Shut down the Slipstream plugin manager.
500        #[cfg(feature = "slipstream-plugins")]
501        if let Some(manager) = self.ledger.vm().finalize_store().slipstream_plugin_manager().write().as_mut() {
502            manager.unload();
503        }
504
505        // Shut down the REST instance.
506        if let Some(rest) = &self.rest {
507            trace!("Shutting down the REST server...");
508            rest.shut_down();
509        }
510
511        // Abort the tasks.
512        trace!("Shutting down the validator...");
513        self.handles.lock().iter().for_each(|handle| handle.abort());
514
515        // Shut down the router.
516        self.router.shut_down().await;
517
518        // Shut down consensus.
519        trace!("Shutting down consensus...");
520        self.consensus.shut_down().await;
521
522        info!("Node has shut down.");
523    }
524}
525
526#[cfg(test)]
527mod tests {
528    use super::*;
529    use snarkvm::prelude::{
530        MainnetV0,
531        VM,
532        store::{ConsensusStore, helpers::memory::ConsensusMemory},
533    };
534
535    use anyhow::bail;
536    use rand::SeedableRng;
537    use rand_chacha::ChaChaRng;
538    use std::str::FromStr;
539
540    type CurrentNetwork = MainnetV0;
541
542    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
543    #[ignore]
544    #[tokio::test]
545    async fn test_profiler() -> Result<()> {
546        // Specify the node attributes.
547        let node = SocketAddr::from_str("0.0.0.0:4130").unwrap();
548        let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap();
549        let storage_mode = StorageMode::Development(0);
550        let node_data_dir = NodeDataDir::new_development(CurrentNetwork::ID, 0);
551        let dev_txs = true;
552
553        // Initialize an (insecure) fixed RNG.
554        let mut rng = ChaChaRng::seed_from_u64(snarkos_utilities::DEVELOPMENT_MODE_RNG_SEED);
555        // Initialize the account.
556        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
557        // Initialize a new VM.
558        let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(
559            StorageMode::new_test(None),
560        )?)?;
561        // Initialize the genesis block.
562        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;
563
564        println!("Initializing validator node...");
565
566        let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new(
567            node,
568            None,
569            Some(rest),
570            10,
571            account,
572            &[],
573            &[],
574            genesis,
575            None,
576            storage_mode,
577            node_data_dir,
578            false,
579            dev_txs,
580            None,
581            &[],
582            None,
583            SignalHandler::new(None),
584        )
585        .await
586        .unwrap();
587
588        println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),);
589
590        bail!("\n\nRemember to #[ignore] this test!\n\n")
591    }
592}