snarkos_node/validator/
mod.rs

1// Copyright 2024-2025 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService, spawn_blocking};
21use snarkos_node_consensus::Consensus;
22use snarkos_node_rest::Rest;
23use snarkos_node_router::{
24    Heartbeat,
25    Inbound,
26    Outbound,
27    Router,
28    Routing,
29    messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
30};
31use snarkos_node_sync::{BlockSync, BlockSyncMode};
32use snarkos_node_tcp::{
33    P2P,
34    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
35};
36use snarkvm::prelude::{
37    Ledger,
38    Network,
39    block::{Block, Header},
40    puzzle::Solution,
41    store::ConsensusStorage,
42};
43
44use aleo_std::StorageMode;
45use anyhow::Result;
46use core::future::Future;
47#[cfg(feature = "locktick")]
48use locktick::parking_lot::Mutex;
49#[cfg(not(feature = "locktick"))]
50use parking_lot::Mutex;
51use std::{
52    net::SocketAddr,
53    sync::{Arc, atomic::AtomicBool},
54    time::Duration,
55};
56use tokio::task::JoinHandle;
57
/// A validator is a full node, capable of validating blocks.
///
/// The struct derives `Clone`; clones of the node are handed to the REST server, the signal
/// handler, and spawned background tasks. The `handles` and `shutdown` fields are behind `Arc`,
/// so every clone observes the same task list and the same shutdown flag.
#[derive(Clone)]
pub struct Validator<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node, loaded from the genesis block when the node is created.
    ledger: Ledger<N, C>,
    /// The consensus module of the node; it is started in `new` and stopped in `shut_down`.
    consensus: Consensus<N>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node; `None` when no REST address was supplied to `new`.
    rest: Option<Rest<N, C, Self>>,
    /// The sync module (created with `BlockSyncMode::Gateway`).
    sync: BlockSync<N>,
    /// The join handles of the tasks spawned by this node; they are aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal; `shut_down` sets it to `true`.
    shutdown: Arc<AtomicBool>,
}
76
77impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
78    /// Initializes a new validator node.
79    pub async fn new(
80        node_ip: SocketAddr,
81        bft_ip: Option<SocketAddr>,
82        rest_ip: Option<SocketAddr>,
83        rest_rps: u32,
84        account: Account<N>,
85        trusted_peers: &[SocketAddr],
86        trusted_validators: &[SocketAddr],
87        genesis: Block<N>,
88        cdn: Option<String>,
89        storage_mode: StorageMode,
90        allow_external_peers: bool,
91        dev_txs: bool,
92        shutdown: Arc<AtomicBool>,
93    ) -> Result<Self> {
94        // Initialize the signal handler.
95        let signal_node = Self::handle_signals(shutdown.clone());
96
97        // Initialize the ledger.
98        let ledger = Ledger::load(genesis, storage_mode.clone())?;
99
100        // Initialize the CDN.
101        if let Some(base_url) = cdn {
102            // Sync the ledger with the CDN.
103            if let Err((_, error)) =
104                snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
105            {
106                crate::log_clean_error(&storage_mode);
107                return Err(error);
108            }
109        }
110
111        // Initialize the ledger service.
112        let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
113
114        // Initialize the consensus.
115        let mut consensus =
116            Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
117        // Initialize the primary channels.
118        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
119        // Start the consensus.
120        consensus.run(primary_sender, primary_receiver).await?;
121        // Determine if the validator should rotate external peers.
122        let rotate_external_peers = false;
123
124        // Initialize the node router.
125        let router = Router::new(
126            node_ip,
127            NodeType::Validator,
128            account,
129            trusted_peers,
130            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
131            rotate_external_peers,
132            allow_external_peers,
133            matches!(storage_mode, StorageMode::Development(_)),
134        )
135        .await?;
136
137        // Initialize the sync module.
138        let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());
139
140        // Initialize the node.
141        let mut node = Self {
142            ledger: ledger.clone(),
143            consensus: consensus.clone(),
144            router,
145            rest: None,
146            sync,
147            handles: Default::default(),
148            shutdown,
149        };
150        // Initialize the transaction pool.
151        node.initialize_transaction_pool(storage_mode, dev_txs)?;
152
153        // Initialize the REST server.
154        if let Some(rest_ip) = rest_ip {
155            node.rest =
156                Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone(), Arc::new(node.clone())).await?);
157        }
158        // Initialize the routing.
159        node.initialize_routing().await;
160        // Initialize the notification message loop.
161        node.handles.lock().push(crate::start_notification_message_loop());
162        // Pass the node to the signal handler.
163        let _ = signal_node.set(node.clone());
164        // Return the node.
165        Ok(node)
166    }
167
    /// Returns a shared reference to the ledger held by this validator.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }
172
    /// Returns a reference to the REST server slot of this validator.
    ///
    /// The slot is `None` when the node was created without a REST address.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
177}
178
179impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
180    // /// Initialize the transaction pool.
181    // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
182    //     use snarkvm::{
183    //         console::{
184    //             account::ViewKey,
185    //             program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
186    //             types::U64,
187    //         },
188    //         ledger::block::transition::Output,
189    //     };
190    //     use std::str::FromStr;
191    //
192    //     // Initialize the locator.
193    //     let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
194    //     // Initialize the record name.
195    //     let record_name = Identifier::from_str("credits")?;
196    //
197    //     /// Searches the genesis block for the mint record.
198    //     fn search_genesis_for_mint<N: Network>(
199    //         block: Block<N>,
200    //         view_key: &ViewKey<N>,
201    //     ) -> Option<Record<N, Plaintext<N>>> {
202    //         for transition in block.transitions().filter(|t| t.is_mint()) {
203    //             if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
204    //                 if ciphertext.is_owner(view_key) {
205    //                     match ciphertext.decrypt(view_key) {
206    //                         Ok(record) => return Some(record),
207    //                         Err(error) => {
208    //                             error!("Failed to decrypt the mint output record - {error}");
209    //                             return None;
210    //                         }
211    //                     }
212    //                 }
213    //             }
214    //         }
215    //         None
216    //     }
217    //
218    //     /// Searches the block for the split record.
219    //     fn search_block_for_split<N: Network>(
220    //         block: Block<N>,
221    //         view_key: &ViewKey<N>,
222    //     ) -> Option<Record<N, Plaintext<N>>> {
223    //         let mut found = None;
224    //         // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
225    //         // block.transitions().rev().for_each(|t| {
226    //         let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
227    //         splits.iter().rev().for_each(|t| {
228    //             if found.is_some() {
229    //                 return;
230    //             }
231    //             let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
232    //                 error!("Failed to find the split output record");
233    //                 return;
234    //             };
235    //             if ciphertext.is_owner(view_key) {
236    //                 match ciphertext.decrypt(view_key) {
237    //                     Ok(record) => found = Some(record),
238    //                     Err(error) => {
239    //                         error!("Failed to decrypt the split output record - {error}");
240    //                     }
241    //                 }
242    //             }
243    //         });
244    //         found
245    //     }
246    //
247    //     let self_ = self.clone();
248    //     self.spawn(async move {
249    //         // Retrieve the view key.
250    //         let view_key = self_.view_key();
251    //         // Initialize the record.
252    //         let mut record = {
253    //             let mut found = None;
254    //             let mut height = self_.ledger.latest_height();
255    //             while found.is_none() && height > 0 {
256    //                 // Retrieve the block.
257    //                 let Ok(block) = self_.ledger.get_block(height) else {
258    //                     error!("Failed to get block at height {}", height);
259    //                     break;
260    //                 };
261    //                 // Search for the latest split record.
262    //                 if let Some(record) = search_block_for_split(block, view_key) {
263    //                     found = Some(record);
264    //                 }
265    //                 // Decrement the height.
266    //                 height = height.saturating_sub(1);
267    //             }
268    //             match found {
269    //                 Some(record) => record,
270    //                 None => {
271    //                     // Retrieve the genesis block.
272    //                     let Ok(block) = self_.ledger.get_block(0) else {
273    //                         error!("Failed to get the genesis block");
274    //                         return;
275    //                     };
276    //                     // Search the genesis block for the mint record.
277    //                     if let Some(record) = search_genesis_for_mint(block, view_key) {
278    //                         found = Some(record);
279    //                     }
280    //                     found.expect("Failed to find the split output record")
281    //                 }
282    //             }
283    //         };
284    //         info!("Starting transaction pool...");
285    //         // Start the transaction loop.
286    //         loop {
287    //             tokio::time::sleep(Duration::from_secs(1)).await;
288    //             // If the node is running in development mode, only generate if you are allowed.
289    //             if let Some(dev) = dev {
290    //                 if dev != 0 {
291    //                     continue;
292    //                 }
293    //             }
294    //
295    //             // Prepare the inputs.
296    //             let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
297    //             // Execute the transaction.
298    //             let transaction = match self_.ledger.vm().execute(
299    //                 self_.private_key(),
300    //                 locator,
301    //                 inputs,
302    //                 None,
303    //                 None,
304    //                 &mut rand::thread_rng(),
305    //             ) {
306    //                 Ok(transaction) => transaction,
307    //                 Err(error) => {
308    //                     error!("Transaction pool encountered an execution error - {error}");
309    //                     continue;
310    //                 }
311    //             };
312    //             // Retrieve the transition.
313    //             let Some(transition) = transaction.transitions().next() else {
314    //                 error!("Transaction pool encountered a missing transition");
315    //                 continue;
316    //             };
317    //             // Retrieve the second output.
318    //             let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
319    //                 error!("Transaction pool encountered a missing output");
320    //                 continue;
321    //             };
322    //             // Save the second output record.
323    //             let Ok(next_record) = ciphertext.decrypt(view_key) else {
324    //                 error!("Transaction pool encountered a decryption error");
325    //                 continue;
326    //             };
327    //             // Broadcast the transaction.
328    //             if self_
329    //                 .unconfirmed_transaction(
330    //                     self_.router.local_ip(),
331    //                     UnconfirmedTransaction::from(transaction.clone()),
332    //                     transaction.clone(),
333    //                 )
334    //                 .await
335    //             {
336    //                 info!("Transaction pool broadcasted the transaction");
337    //                 let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
338    //                 while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
339    //                     tokio::time::sleep(Duration::from_secs(1)).await;
340    //                 }
341    //                 info!("Transaction accepted by the ledger");
342    //             }
343    //             // Save the record.
344    //             record = next_record;
345    //         }
346    //     });
347    //     Ok(())
348    // }
349
350    /// Initialize the transaction pool.
351    fn initialize_transaction_pool(&self, storage_mode: StorageMode, dev_txs: bool) -> Result<()> {
352        use snarkvm::console::{
353            program::{Identifier, Literal, ProgramID, Value},
354            types::U64,
355        };
356        use std::str::FromStr;
357
358        // Initialize the locator.
359        let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
360
361        // Determine whether to start the loop.
362        match storage_mode {
363            // If the node is running in development mode, only generate if you are allowed.
364            StorageMode::Development(id) => {
365                // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
366                if id != 0 || !dev_txs {
367                    return Ok(());
368                }
369            }
370            // If the node is not running in development mode, do not generate dev traffic.
371            _ => return Ok(()),
372        }
373
374        let self_ = self.clone();
375        self.spawn(async move {
376            tokio::time::sleep(Duration::from_secs(3)).await;
377            info!("Starting transaction pool...");
378
379            // Start the transaction loop.
380            loop {
381                tokio::time::sleep(Duration::from_millis(500)).await;
382
383                // Prepare the inputs.
384                let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
385                // Execute the transaction.
386                let self__ = self_.clone();
387                let transaction = match spawn_blocking!(self__.ledger.vm().execute(
388                    self__.private_key(),
389                    locator,
390                    inputs.into_iter(),
391                    None,
392                    10_000,
393                    None,
394                    &mut rand::thread_rng(),
395                )) {
396                    Ok(transaction) => transaction,
397                    Err(error) => {
398                        error!("Transaction pool encountered an execution error - {error}");
399                        continue;
400                    }
401                };
402                // Broadcast the transaction.
403                if self_
404                    .unconfirmed_transaction(
405                        self_.router.local_ip(),
406                        UnconfirmedTransaction::from(transaction.clone()),
407                        transaction.clone(),
408                    )
409                    .await
410                {
411                    info!("Transaction pool broadcasted the transaction");
412                }
413            }
414        });
415        Ok(())
416    }
417
418    /// Spawns a task with the given future; it should only be used for long-running tasks.
419    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
420        self.handles.lock().push(tokio::spawn(future));
421    }
422}
423
424#[async_trait]
425impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
426    /// Shuts down the node.
427    async fn shut_down(&self) {
428        info!("Shutting down...");
429
430        // Shut down the node.
431        trace!("Shutting down the node...");
432        self.shutdown.store(true, std::sync::atomic::Ordering::Release);
433
434        // Abort the tasks.
435        trace!("Shutting down the validator...");
436        self.handles.lock().iter().for_each(|handle| handle.abort());
437
438        // Shut down the router.
439        self.router.shut_down().await;
440
441        // Shut down consensus.
442        trace!("Shutting down consensus...");
443        self.consensus.shut_down().await;
444
445        info!("Node has shut down.");
446    }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use std::str::FromStr;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    ///
    /// Boots a development validator (id 0) with dev traffic enabled and a REST server, then
    /// deliberately fails with a reminder so that the test is not left enabled by accident.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // The addresses and the mode the node is started with.
        let node_addr = SocketAddr::from_str("0.0.0.0:4130").unwrap();
        let rest_addr = SocketAddr::from_str("0.0.0.0:3030").unwrap();
        let storage_mode = StorageMode::Development(0);
        let dev_txs = true;

        // An (insecure) fixed RNG, so that the account and the genesis block are reproducible.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        // The account of the validator.
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
        // A fresh VM, used to produce the genesis block.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(StorageMode::new_test(None))?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_addr,
            None,
            Some(rest_addr),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            storage_mode,
            false,
            dev_txs,
            Default::default(),
        )
        .await
        .unwrap();

        let height = validator.ledger.latest_height();
        println!("Loaded validator node with {} blocks", height);

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}