snarkos_node/validator/
mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService, spawn_blocking};
21use snarkos_node_consensus::Consensus;
22use snarkos_node_rest::Rest;
23use snarkos_node_router::{
24    Heartbeat,
25    Inbound,
26    Outbound,
27    Router,
28    Routing,
29    messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
30};
31use snarkos_node_sync::{BlockSync, BlockSyncMode};
32use snarkos_node_tcp::{
33    P2P,
34    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
35};
36use snarkvm::prelude::{
37    Ledger,
38    Network,
39    block::{Block, Header},
40    puzzle::Solution,
41    store::ConsensusStorage,
42};
43
44use aleo_std::StorageMode;
45use anyhow::Result;
46use core::future::Future;
47#[cfg(feature = "locktick")]
48use locktick::parking_lot::Mutex;
49#[cfg(not(feature = "locktick"))]
50use parking_lot::Mutex;
51use std::{
52    net::SocketAddr,
53    sync::{Arc, atomic::AtomicBool},
54    time::Duration,
55};
56use tokio::task::JoinHandle;
57
58/// A validator is a full node, capable of validating blocks.
59#[derive(Clone)]
60pub struct Validator<N: Network, C: ConsensusStorage<N>> {
61    /// The ledger of the node.
62    ledger: Ledger<N, C>,
63    /// The consensus module of the node.
64    consensus: Consensus<N>,
65    /// The router of the node.
66    router: Router<N>,
67    /// The REST server of the node.
68    rest: Option<Rest<N, C, Self>>,
69    /// The sync module.
70    sync: BlockSync<N>,
71    /// The spawned handles.
72    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
73    /// The shutdown signal.
74    shutdown: Arc<AtomicBool>,
75}
76
77impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
78    /// Initializes a new validator node.
79    pub async fn new(
80        node_ip: SocketAddr,
81        bft_ip: Option<SocketAddr>,
82        rest_ip: Option<SocketAddr>,
83        rest_rps: u32,
84        account: Account<N>,
85        trusted_peers: &[SocketAddr],
86        trusted_validators: &[SocketAddr],
87        genesis: Block<N>,
88        cdn: Option<String>,
89        storage_mode: StorageMode,
90        allow_external_peers: bool,
91        dev_txs: bool,
92        shutdown: Arc<AtomicBool>,
93    ) -> Result<Self> {
94        // Initialize the signal handler.
95        let signal_node = Self::handle_signals(shutdown.clone());
96
97        // Initialize the ledger.
98        let ledger = Ledger::load(genesis, storage_mode.clone())?;
99
100        // Initialize the CDN.
101        if let Some(base_url) = cdn {
102            // Sync the ledger with the CDN.
103            if let Err((_, error)) =
104                snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
105            {
106                crate::log_clean_error(&storage_mode);
107                return Err(error);
108            }
109        }
110
111        // Initialize the ledger service.
112        let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
113
114        // Initialize the consensus.
115        let mut consensus =
116            Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
117        // Initialize the primary channels.
118        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
119        // Start the consensus.
120        consensus.run(primary_sender, primary_receiver).await?;
121        // Determine if the validator should rotate external peers.
122        let rotate_external_peers = false;
123
124        // Initialize the node router.
125        let router = Router::new(
126            node_ip,
127            NodeType::Validator,
128            account.clone(),
129            ledger_service.clone(),
130            trusted_peers,
131            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
132            rotate_external_peers,
133            allow_external_peers,
134            matches!(storage_mode, StorageMode::Development(_)),
135        )
136        .await?;
137
138        // Initialize the sync module.
139        let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());
140
141        // Initialize the node.
142        let mut node = Self {
143            ledger: ledger.clone(),
144            consensus: consensus.clone(),
145            router,
146            rest: None,
147            sync,
148            handles: Default::default(),
149            shutdown,
150        };
151        // Initialize the transaction pool.
152        node.initialize_transaction_pool(storage_mode, dev_txs)?;
153
154        // Initialize the REST server.
155        if let Some(rest_ip) = rest_ip {
156            node.rest =
157                Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone(), Arc::new(node.clone())).await?);
158        }
159        // Initialize the routing.
160        node.initialize_routing().await;
161        // Initialize the notification message loop.
162        node.handles.lock().push(crate::start_notification_message_loop());
163        // Pass the node to the signal handler.
164        let _ = signal_node.set(node.clone());
165        // Return the node.
166        Ok(node)
167    }
168
169    /// Returns the ledger.
170    pub fn ledger(&self) -> &Ledger<N, C> {
171        &self.ledger
172    }
173
174    /// Returns the REST server.
175    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
176        &self.rest
177    }
178}
179
180impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
181    // /// Initialize the transaction pool.
182    // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
183    //     use snarkvm::{
184    //         console::{
185    //             account::ViewKey,
186    //             program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
187    //             types::U64,
188    //         },
189    //         ledger::block::transition::Output,
190    //     };
191    //     use std::str::FromStr;
192    //
193    //     // Initialize the locator.
194    //     let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
195    //     // Initialize the record name.
196    //     let record_name = Identifier::from_str("credits")?;
197    //
198    //     /// Searches the genesis block for the mint record.
199    //     fn search_genesis_for_mint<N: Network>(
200    //         block: Block<N>,
201    //         view_key: &ViewKey<N>,
202    //     ) -> Option<Record<N, Plaintext<N>>> {
203    //         for transition in block.transitions().filter(|t| t.is_mint()) {
204    //             if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
205    //                 if ciphertext.is_owner(view_key) {
206    //                     match ciphertext.decrypt(view_key) {
207    //                         Ok(record) => return Some(record),
208    //                         Err(error) => {
209    //                             error!("Failed to decrypt the mint output record - {error}");
210    //                             return None;
211    //                         }
212    //                     }
213    //                 }
214    //             }
215    //         }
216    //         None
217    //     }
218    //
219    //     /// Searches the block for the split record.
220    //     fn search_block_for_split<N: Network>(
221    //         block: Block<N>,
222    //         view_key: &ViewKey<N>,
223    //     ) -> Option<Record<N, Plaintext<N>>> {
224    //         let mut found = None;
225    //         // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
226    //         // block.transitions().rev().for_each(|t| {
227    //         let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
228    //         splits.iter().rev().for_each(|t| {
229    //             if found.is_some() {
230    //                 return;
231    //             }
232    //             let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
233    //                 error!("Failed to find the split output record");
234    //                 return;
235    //             };
236    //             if ciphertext.is_owner(view_key) {
237    //                 match ciphertext.decrypt(view_key) {
238    //                     Ok(record) => found = Some(record),
239    //                     Err(error) => {
240    //                         error!("Failed to decrypt the split output record - {error}");
241    //                     }
242    //                 }
243    //             }
244    //         });
245    //         found
246    //     }
247    //
248    //     let self_ = self.clone();
249    //     self.spawn(async move {
250    //         // Retrieve the view key.
251    //         let view_key = self_.view_key();
252    //         // Initialize the record.
253    //         let mut record = {
254    //             let mut found = None;
255    //             let mut height = self_.ledger.latest_height();
256    //             while found.is_none() && height > 0 {
257    //                 // Retrieve the block.
258    //                 let Ok(block) = self_.ledger.get_block(height) else {
259    //                     error!("Failed to get block at height {}", height);
260    //                     break;
261    //                 };
262    //                 // Search for the latest split record.
263    //                 if let Some(record) = search_block_for_split(block, view_key) {
264    //                     found = Some(record);
265    //                 }
266    //                 // Decrement the height.
267    //                 height = height.saturating_sub(1);
268    //             }
269    //             match found {
270    //                 Some(record) => record,
271    //                 None => {
272    //                     // Retrieve the genesis block.
273    //                     let Ok(block) = self_.ledger.get_block(0) else {
274    //                         error!("Failed to get the genesis block");
275    //                         return;
276    //                     };
277    //                     // Search the genesis block for the mint record.
278    //                     if let Some(record) = search_genesis_for_mint(block, view_key) {
279    //                         found = Some(record);
280    //                     }
281    //                     found.expect("Failed to find the split output record")
282    //                 }
283    //             }
284    //         };
285    //         info!("Starting transaction pool...");
286    //         // Start the transaction loop.
287    //         loop {
288    //             tokio::time::sleep(Duration::from_secs(1)).await;
289    //             // If the node is running in development mode, only generate if you are allowed.
290    //             if let Some(dev) = dev {
291    //                 if dev != 0 {
292    //                     continue;
293    //                 }
294    //             }
295    //
296    //             // Prepare the inputs.
297    //             let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
298    //             // Execute the transaction.
299    //             let transaction = match self_.ledger.vm().execute(
300    //                 self_.private_key(),
301    //                 locator,
302    //                 inputs,
303    //                 None,
304    //                 None,
305    //                 &mut rand::thread_rng(),
306    //             ) {
307    //                 Ok(transaction) => transaction,
308    //                 Err(error) => {
309    //                     error!("Transaction pool encountered an execution error - {error}");
310    //                     continue;
311    //                 }
312    //             };
313    //             // Retrieve the transition.
314    //             let Some(transition) = transaction.transitions().next() else {
315    //                 error!("Transaction pool encountered a missing transition");
316    //                 continue;
317    //             };
318    //             // Retrieve the second output.
319    //             let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
320    //                 error!("Transaction pool encountered a missing output");
321    //                 continue;
322    //             };
323    //             // Save the second output record.
324    //             let Ok(next_record) = ciphertext.decrypt(view_key) else {
325    //                 error!("Transaction pool encountered a decryption error");
326    //                 continue;
327    //             };
328    //             // Broadcast the transaction.
329    //             if self_
330    //                 .unconfirmed_transaction(
331    //                     self_.router.local_ip(),
332    //                     UnconfirmedTransaction::from(transaction.clone()),
333    //                     transaction.clone(),
334    //                 )
335    //                 .await
336    //             {
337    //                 info!("Transaction pool broadcasted the transaction");
338    //                 let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
339    //                 while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
340    //                     tokio::time::sleep(Duration::from_secs(1)).await;
341    //                 }
342    //                 info!("Transaction accepted by the ledger");
343    //             }
344    //             // Save the record.
345    //             record = next_record;
346    //         }
347    //     });
348    //     Ok(())
349    // }
350
351    /// Initialize the transaction pool.
352    fn initialize_transaction_pool(&self, storage_mode: StorageMode, dev_txs: bool) -> Result<()> {
353        use snarkvm::console::{
354            program::{Identifier, Literal, ProgramID, Value},
355            types::U64,
356        };
357        use std::str::FromStr;
358
359        // Initialize the locator.
360        let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
361
362        // Determine whether to start the loop.
363        match storage_mode {
364            // If the node is running in development mode, only generate if you are allowed.
365            StorageMode::Development(id) => {
366                // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
367                if id != 0 || !dev_txs {
368                    return Ok(());
369                }
370            }
371            // If the node is not running in development mode, do not generate dev traffic.
372            _ => return Ok(()),
373        }
374
375        let self_ = self.clone();
376        self.spawn(async move {
377            tokio::time::sleep(Duration::from_secs(3)).await;
378            info!("Starting transaction pool...");
379
380            // Start the transaction loop.
381            loop {
382                tokio::time::sleep(Duration::from_millis(500)).await;
383
384                // Prepare the inputs.
385                let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
386                // Execute the transaction.
387                let self__ = self_.clone();
388                let transaction = match spawn_blocking!(self__.ledger.vm().execute(
389                    self__.private_key(),
390                    locator,
391                    inputs.into_iter(),
392                    None,
393                    10_000,
394                    None,
395                    &mut rand::thread_rng(),
396                )) {
397                    Ok(transaction) => transaction,
398                    Err(error) => {
399                        error!("Transaction pool encountered an execution error - {error}");
400                        continue;
401                    }
402                };
403                // Broadcast the transaction.
404                if self_
405                    .unconfirmed_transaction(
406                        self_.router.local_ip(),
407                        UnconfirmedTransaction::from(transaction.clone()),
408                        transaction.clone(),
409                    )
410                    .await
411                {
412                    info!("Transaction pool broadcasted the transaction");
413                }
414            }
415        });
416        Ok(())
417    }
418
419    /// Spawns a task with the given future; it should only be used for long-running tasks.
420    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
421        self.handles.lock().push(tokio::spawn(future));
422    }
423}
424
425#[async_trait]
426impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
427    /// Shuts down the node.
428    async fn shut_down(&self) {
429        info!("Shutting down...");
430
431        // Shut down the node.
432        trace!("Shutting down the node...");
433        self.shutdown.store(true, std::sync::atomic::Ordering::Release);
434
435        // Abort the tasks.
436        trace!("Shutting down the validator...");
437        self.handles.lock().iter().for_each(|handle| handle.abort());
438
439        // Shut down the router.
440        self.router.shut_down().await;
441
442        // Shut down consensus.
443        trace!("Shutting down consensus...");
444        self.consensus.shut_down().await;
445
446        info!("Node has shut down.");
447    }
448}
449
450#[cfg(test)]
451mod tests {
452    use super::*;
453    use snarkvm::prelude::{
454        MainnetV0,
455        VM,
456        store::{ConsensusStore, helpers::memory::ConsensusMemory},
457    };
458
459    use anyhow::bail;
460    use rand::SeedableRng;
461    use rand_chacha::ChaChaRng;
462    use std::str::FromStr;
463
464    type CurrentNetwork = MainnetV0;
465
466    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
467    #[ignore]
468    #[tokio::test]
469    async fn test_profiler() -> Result<()> {
470        // Specify the node attributes.
471        let node = SocketAddr::from_str("0.0.0.0:4130").unwrap();
472        let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap();
473        let storage_mode = StorageMode::Development(0);
474        let dev_txs = true;
475
476        // Initialize an (insecure) fixed RNG.
477        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
478        // Initialize the account.
479        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
480        // Initialize a new VM.
481        let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(
482            StorageMode::new_test(None),
483        )?)?;
484        // Initialize the genesis block.
485        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;
486
487        println!("Initializing validator node...");
488
489        let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new(
490            node,
491            None,
492            Some(rest),
493            10,
494            account,
495            &[],
496            &[],
497            genesis,
498            None,
499            storage_mode,
500            false,
501            dev_txs,
502            Default::default(),
503        )
504        .await
505        .unwrap();
506
507        println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),);
508
509        bail!("\n\nRemember to #[ignore] this test!\n\n")
510    }
511}