snarkos_node/validator/
mod.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService, spawn_blocking};
21use snarkos_node_consensus::Consensus;
22use snarkos_node_rest::Rest;
23use snarkos_node_router::{
24    Heartbeat,
25    Inbound,
26    Outbound,
27    Router,
28    Routing,
29    messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
30};
31use snarkos_node_sync::{BlockSync, BlockSyncMode};
32use snarkos_node_tcp::{
33    P2P,
34    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
35};
36use snarkvm::prelude::{
37    Ledger,
38    Network,
39    block::{Block, Header},
40    puzzle::Solution,
41    store::ConsensusStorage,
42};
43
44use aleo_std::StorageMode;
45use anyhow::Result;
46use core::future::Future;
47use parking_lot::Mutex;
48use std::{
49    net::SocketAddr,
50    sync::{Arc, atomic::AtomicBool},
51    time::Duration,
52};
53use tokio::task::JoinHandle;
54
/// A validator is a full node, capable of validating blocks.
///
/// The struct derives `Clone`; `Validator::new` relies on this to hand copies of the node to
/// the REST server and to the signal handler.
#[derive(Clone)]
pub struct Validator<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node, loaded from the genesis block in `new`.
    ledger: Ledger<N, C>,
    /// The consensus module of the node; it is started in `new` and stopped in `shut_down`.
    consensus: Consensus<N>,
    /// The router of the node, created with `NodeType::Validator`.
    router: Router<N>,
    /// The REST server of the node; `None` when no REST address was given to `new`.
    rest: Option<Rest<N, C, Self>>,
    /// The sync module, created with `BlockSyncMode::Gateway`.
    sync: BlockSync<N>,
    /// The handles of the tasks spawned by this node; they are aborted in `shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal; `shut_down` stores `true` into it.
    shutdown: Arc<AtomicBool>,
}
73
74impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
    /// Initializes a new validator node.
    ///
    /// The start-up sequence is: install the signal handler, load the ledger, optionally sync it
    /// with the CDN, start consensus, create the router and the sync module, start the
    /// development transaction pool (if applicable), optionally start the REST server, initialize
    /// routing, start the notification message loop, and finally hand the node to the signal
    /// handler.
    ///
    /// # Arguments
    ///
    /// * `node_ip` - The address passed to the router.
    /// * `bft_ip` - The optional address passed to the consensus module.
    /// * `rest_ip` - The address of the REST server; the server is only started when this is `Some`.
    /// * `rest_rps` - Forwarded to `Rest::start` (presumably a requests-per-second limit — TODO confirm).
    /// * `account` - The account of the node, shared by the consensus module and the router.
    /// * `trusted_peers` - The trusted peers passed to the router.
    /// * `trusted_validators` - The trusted validators passed to the consensus module.
    /// * `genesis` - The genesis block the ledger is loaded with.
    /// * `cdn` - The optional CDN base URL; when `Some`, the ledger is synced with it first.
    /// * `storage_mode` - The storage mode used by the ledger and consensus; a `Development`
    ///   mode is additionally reported to the router and gates the transaction pool.
    /// * `allow_external_peers` - Forwarded to the router.
    /// * `dev_txs` - Whether the development transaction pool may generate traffic.
    /// * `shutdown` - The shutdown flag shared with the signal handler, the CDN sync and the
    ///   ledger service.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the ledger, syncing with the CDN, creating or starting
    /// consensus, creating the router, initializing the transaction pool, or starting the REST
    /// server fails.
    pub async fn new(
        node_ip: SocketAddr,
        bft_ip: Option<SocketAddr>,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        trusted_validators: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<String>,
        storage_mode: StorageMode,
        allow_external_peers: bool,
        dev_txs: bool,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler.
        // The returned cell is filled with the node at the very end of this function.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger.
        let ledger = Ledger::load(genesis, storage_mode.clone())?;

        // Initialize the CDN.
        if let Some(base_url) = cdn {
            // Sync the ledger with the CDN.
            // Only the error half of the failure tuple is propagated; the first element is ignored.
            if let Err((_, error)) =
                snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
            {
                crate::log_clean_error(&storage_mode);
                return Err(error);
            }
        }

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));

        // Initialize the consensus.
        let mut consensus =
            Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
        // Initialize the primary channels.
        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
        // Start the consensus.
        consensus.run(primary_sender, primary_receiver).await?;
        // Determine if the validator should rotate external peers.
        // Validators never rotate external peers: the flag is hard-coded to `false`.
        let rotate_external_peers = false;

        // Initialize the node router.
        // NOTE(review): `as u16` truncates silently if `MAXIMUM_NUMBER_OF_PEERS` ever exceeds
        // `u16::MAX` — the constant is defined outside this view, confirm its range.
        let router = Router::new(
            node_ip,
            NodeType::Validator,
            account,
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            rotate_external_peers,
            allow_external_peers,
            matches!(storage_mode, StorageMode::Development(_)),
        )
        .await?;

        // Initialize the sync module.
        let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());

        // Initialize the node.
        // The REST server starts out as `None`, because `Rest::start` below needs a copy of the node.
        let mut node = Self {
            ledger: ledger.clone(),
            consensus: consensus.clone(),
            router,
            rest: None,
            sync,
            handles: Default::default(),
            shutdown,
        };
        // Initialize the transaction pool.
        node.initialize_transaction_pool(storage_mode, dev_txs)?;

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            // The node copy given to the REST server is taken before `node.rest` is assigned,
            // so that copy has `rest == None`.
            node.rest =
                Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone(), Arc::new(node.clone())).await?);
        }
        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler.
        // The result of `set` is deliberately discarded.
        let _ = signal_node.set(node.clone());
        // Return the node.
        Ok(node)
    }
164
165    /// Returns the ledger.
166    pub fn ledger(&self) -> &Ledger<N, C> {
167        &self.ledger
168    }
169
170    /// Returns the REST server.
171    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
172        &self.rest
173    }
174}
175
176impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
177    // /// Initialize the transaction pool.
178    // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
179    //     use snarkvm::{
180    //         console::{
181    //             account::ViewKey,
182    //             program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
183    //             types::U64,
184    //         },
185    //         ledger::block::transition::Output,
186    //     };
187    //     use std::str::FromStr;
188    //
189    //     // Initialize the locator.
190    //     let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
191    //     // Initialize the record name.
192    //     let record_name = Identifier::from_str("credits")?;
193    //
194    //     /// Searches the genesis block for the mint record.
195    //     fn search_genesis_for_mint<N: Network>(
196    //         block: Block<N>,
197    //         view_key: &ViewKey<N>,
198    //     ) -> Option<Record<N, Plaintext<N>>> {
199    //         for transition in block.transitions().filter(|t| t.is_mint()) {
200    //             if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
201    //                 if ciphertext.is_owner(view_key) {
202    //                     match ciphertext.decrypt(view_key) {
203    //                         Ok(record) => return Some(record),
204    //                         Err(error) => {
205    //                             error!("Failed to decrypt the mint output record - {error}");
206    //                             return None;
207    //                         }
208    //                     }
209    //                 }
210    //             }
211    //         }
212    //         None
213    //     }
214    //
215    //     /// Searches the block for the split record.
216    //     fn search_block_for_split<N: Network>(
217    //         block: Block<N>,
218    //         view_key: &ViewKey<N>,
219    //     ) -> Option<Record<N, Plaintext<N>>> {
220    //         let mut found = None;
221    //         // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
222    //         // block.transitions().rev().for_each(|t| {
223    //         let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
224    //         splits.iter().rev().for_each(|t| {
225    //             if found.is_some() {
226    //                 return;
227    //             }
228    //             let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
229    //                 error!("Failed to find the split output record");
230    //                 return;
231    //             };
232    //             if ciphertext.is_owner(view_key) {
233    //                 match ciphertext.decrypt(view_key) {
234    //                     Ok(record) => found = Some(record),
235    //                     Err(error) => {
236    //                         error!("Failed to decrypt the split output record - {error}");
237    //                     }
238    //                 }
239    //             }
240    //         });
241    //         found
242    //     }
243    //
244    //     let self_ = self.clone();
245    //     self.spawn(async move {
246    //         // Retrieve the view key.
247    //         let view_key = self_.view_key();
248    //         // Initialize the record.
249    //         let mut record = {
250    //             let mut found = None;
251    //             let mut height = self_.ledger.latest_height();
252    //             while found.is_none() && height > 0 {
253    //                 // Retrieve the block.
254    //                 let Ok(block) = self_.ledger.get_block(height) else {
255    //                     error!("Failed to get block at height {}", height);
256    //                     break;
257    //                 };
258    //                 // Search for the latest split record.
259    //                 if let Some(record) = search_block_for_split(block, view_key) {
260    //                     found = Some(record);
261    //                 }
262    //                 // Decrement the height.
263    //                 height = height.saturating_sub(1);
264    //             }
265    //             match found {
266    //                 Some(record) => record,
267    //                 None => {
268    //                     // Retrieve the genesis block.
269    //                     let Ok(block) = self_.ledger.get_block(0) else {
270    //                         error!("Failed to get the genesis block");
271    //                         return;
272    //                     };
273    //                     // Search the genesis block for the mint record.
274    //                     if let Some(record) = search_genesis_for_mint(block, view_key) {
275    //                         found = Some(record);
276    //                     }
277    //                     found.expect("Failed to find the split output record")
278    //                 }
279    //             }
280    //         };
281    //         info!("Starting transaction pool...");
282    //         // Start the transaction loop.
283    //         loop {
284    //             tokio::time::sleep(Duration::from_secs(1)).await;
285    //             // If the node is running in development mode, only generate if you are allowed.
286    //             if let Some(dev) = dev {
287    //                 if dev != 0 {
288    //                     continue;
289    //                 }
290    //             }
291    //
292    //             // Prepare the inputs.
293    //             let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
294    //             // Execute the transaction.
295    //             let transaction = match self_.ledger.vm().execute(
296    //                 self_.private_key(),
297    //                 locator,
298    //                 inputs,
299    //                 None,
300    //                 None,
301    //                 &mut rand::thread_rng(),
302    //             ) {
303    //                 Ok(transaction) => transaction,
304    //                 Err(error) => {
305    //                     error!("Transaction pool encountered an execution error - {error}");
306    //                     continue;
307    //                 }
308    //             };
309    //             // Retrieve the transition.
310    //             let Some(transition) = transaction.transitions().next() else {
311    //                 error!("Transaction pool encountered a missing transition");
312    //                 continue;
313    //             };
314    //             // Retrieve the second output.
315    //             let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
316    //                 error!("Transaction pool encountered a missing output");
317    //                 continue;
318    //             };
319    //             // Save the second output record.
320    //             let Ok(next_record) = ciphertext.decrypt(view_key) else {
321    //                 error!("Transaction pool encountered a decryption error");
322    //                 continue;
323    //             };
324    //             // Broadcast the transaction.
325    //             if self_
326    //                 .unconfirmed_transaction(
327    //                     self_.router.local_ip(),
328    //                     UnconfirmedTransaction::from(transaction.clone()),
329    //                     transaction.clone(),
330    //                 )
331    //                 .await
332    //             {
333    //                 info!("Transaction pool broadcasted the transaction");
334    //                 let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
335    //                 while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
336    //                     tokio::time::sleep(Duration::from_secs(1)).await;
337    //                 }
338    //                 info!("Transaction accepted by the ledger");
339    //             }
340    //             // Save the record.
341    //             record = next_record;
342    //         }
343    //     });
344    //     Ok(())
345    // }
346
347    /// Initialize the transaction pool.
348    fn initialize_transaction_pool(&self, storage_mode: StorageMode, dev_txs: bool) -> Result<()> {
349        use snarkvm::console::{
350            program::{Identifier, Literal, ProgramID, Value},
351            types::U64,
352        };
353        use std::str::FromStr;
354
355        // Initialize the locator.
356        let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
357
358        // Determine whether to start the loop.
359        match storage_mode {
360            // If the node is running in development mode, only generate if you are allowed.
361            StorageMode::Development(id) => {
362                // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
363                if id != 0 || !dev_txs {
364                    return Ok(());
365                }
366            }
367            // If the node is not running in development mode, do not generate dev traffic.
368            _ => return Ok(()),
369        }
370
371        let self_ = self.clone();
372        self.spawn(async move {
373            tokio::time::sleep(Duration::from_secs(3)).await;
374            info!("Starting transaction pool...");
375
376            // Start the transaction loop.
377            loop {
378                tokio::time::sleep(Duration::from_millis(500)).await;
379
380                // Prepare the inputs.
381                let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
382                // Execute the transaction.
383                let self__ = self_.clone();
384                let transaction = match spawn_blocking!(self__.ledger.vm().execute(
385                    self__.private_key(),
386                    locator,
387                    inputs.into_iter(),
388                    None,
389                    10_000,
390                    None,
391                    &mut rand::thread_rng(),
392                )) {
393                    Ok(transaction) => transaction,
394                    Err(error) => {
395                        error!("Transaction pool encountered an execution error - {error}");
396                        continue;
397                    }
398                };
399                // Broadcast the transaction.
400                if self_
401                    .unconfirmed_transaction(
402                        self_.router.local_ip(),
403                        UnconfirmedTransaction::from(transaction.clone()),
404                        transaction.clone(),
405                    )
406                    .await
407                {
408                    info!("Transaction pool broadcasted the transaction");
409                }
410            }
411        });
412        Ok(())
413    }
414
415    /// Spawns a task with the given future; it should only be used for long-running tasks.
416    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
417        self.handles.lock().push(tokio::spawn(future));
418    }
419}
420
421#[async_trait]
422impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
423    /// Shuts down the node.
424    async fn shut_down(&self) {
425        info!("Shutting down...");
426
427        // Shut down the node.
428        trace!("Shutting down the node...");
429        self.shutdown.store(true, std::sync::atomic::Ordering::Release);
430
431        // Abort the tasks.
432        trace!("Shutting down the validator...");
433        self.handles.lock().iter().for_each(|handle| handle.abort());
434
435        // Shut down the router.
436        self.router.shut_down().await;
437
438        // Shut down consensus.
439        trace!("Shutting down consensus...");
440        self.consensus.shut_down().await;
441
442        info!("Node has shut down.");
443    }
444}
445
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use std::str::FromStr;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Boots a development validator (node 0, dev traffic enabled) on an in-memory ledger for profiling.
    ///
    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    /// The test always ends in an error on purpose, as a reminder to keep it ignored.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // The addresses the node and its REST server use.
        let node_ip = SocketAddr::from_str("0.0.0.0:4130").unwrap();
        let rest_ip = SocketAddr::from_str("0.0.0.0:3030").unwrap();

        // An (insecure) fixed RNG.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        // The account of the validator.
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
        // A fresh VM, used only to produce the genesis block.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(None)?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_ip,
            None,
            Some(rest_ip),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            // Run as the first development node...
            StorageMode::Development(0),
            false,
            // ...with development traffic enabled.
            true,
            Default::default(),
        )
        .await
        .unwrap();

        let height = validator.ledger.latest_height();
        println!("Loaded validator node with {} blocks", height);

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}