snarkos_node/validator/
mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19
20use snarkos_account::Account;
21use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
22use snarkos_node_cdn::CdnBlockSync;
23use snarkos_node_consensus::Consensus;
24use snarkos_node_rest::Rest;
25use snarkos_node_router::{
26    Heartbeat,
27    Inbound,
28    Outbound,
29    Router,
30    Routing,
31    messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
32};
33use snarkos_node_sync::{BlockSync, Ping};
34use snarkos_node_tcp::{
35    P2P,
36    protocols::{Disconnect, Handshake, OnConnect, Reading},
37};
38use snarkvm::prelude::{
39    Ledger,
40    Network,
41    block::{Block, Header},
42    puzzle::Solution,
43    store::ConsensusStorage,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use core::future::Future;
49#[cfg(feature = "locktick")]
50use locktick::parking_lot::Mutex;
51#[cfg(not(feature = "locktick"))]
52use parking_lot::Mutex;
53use std::{
54    net::SocketAddr,
55    sync::{Arc, atomic::AtomicBool},
56    time::Duration,
57};
58use tokio::task::JoinHandle;
59
/// A validator is a full node, capable of validating blocks.
///
/// The type derives `Clone`. The `sync`, `handles`, `shutdown` and `ping` fields are `Arc`s,
/// so every clone shares those values with the node it was cloned from.
#[derive(Clone)]
pub struct Validator<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node, loaded from the genesis block and storage mode given to `new`.
    ledger: Ledger<N, C>,
    /// The consensus module of the node; shut down last in `shut_down`.
    consensus: Consensus<N>,
    /// The router of the node; shut down in `shut_down` after the spawned tasks are aborted.
    router: Router<N>,
    /// The REST server of the node. This is `None` unless a REST address was passed to `new`.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic (used in the Router impl).
    sync: Arc<BlockSync<N>>,
    /// The spawned handles. Tasks are registered here (e.g. via `spawn`) and aborted in `shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal. `shut_down` stores `true` into it.
    shutdown: Arc<AtomicBool>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
}
80
impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
    /// Initializes a new validator node.
    ///
    /// Start-up happens in a fixed order: signal handler, ledger, ledger service, router,
    /// block sync and ping, consensus, the (optional) CDN block sync, the development
    /// transaction generator, the (optional) REST server, waiting for the CDN sync to
    /// finish, routing, and finally the notification message loop.
    ///
    /// # Arguments
    ///
    /// * `node_ip` - The address handed to the router.
    /// * `bft_ip` - The optional address handed to the consensus layer.
    /// * `rest_ip` - If `Some`, a REST server is started on this address.
    /// * `rest_rps` - Forwarded to `Rest::start` (presumably a requests-per-second limit - confirm).
    /// * `account` - The account of the node; cloned into the router and the consensus layer.
    /// * `trusted_peers` - Forwarded to the router.
    /// * `trusted_validators` - Forwarded to the consensus layer.
    /// * `genesis` - The genesis block used to load the ledger.
    /// * `cdn` - If `Some`, the base URL used to construct the CDN block sync.
    /// * `storage_mode` - Used to load the ledger and forwarded to the consensus layer.
    /// * `allow_external_peers` - Forwarded to the router.
    /// * `dev_txs` - Forwarded to `initialize_transaction_pool`.
    /// * `dev` - The optional development ID; forwarded to the consensus layer and
    ///   `initialize_transaction_pool`, and passed to the router as `dev.is_some()`.
    /// * `shutdown` - The shutdown flag shared with the signal handler, the ledger service,
    ///   the CDN block sync, and the node itself.
    ///
    /// # Errors
    ///
    /// Propagates any error from loading the ledger, creating the router, fetching the block
    /// locators, creating the consensus layer, initializing the transaction pool, or starting
    /// the REST server. If the CDN sync fails, the node is shut down before the error is returned.
    pub async fn new(
        node_ip: SocketAddr,
        bft_ip: Option<SocketAddr>,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        trusted_validators: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<String>,
        storage_mode: StorageMode,
        allow_external_peers: bool,
        dev_txs: bool,
        dev: Option<u16>,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler. The node itself is handed to it at the very end,
        // once it has been fully constructed.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger.
        let ledger = Ledger::load(genesis, storage_mode.clone())?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));

        // Determine if the validator should rotate external peers (hard-coded to `false` here).
        let rotate_external_peers = false;

        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Validator,
            account.clone(),
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            rotate_external_peers,
            allow_external_peers,
            dev.is_some(),
        )
        .await?;

        // Initialize the block synchronization logic.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));
        // The ping tracker is seeded with the current block locators.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the consensus layer.
        let consensus = Consensus::new(
            account.clone(),
            ledger_service.clone(),
            sync.clone(),
            bft_ip,
            trusted_validators,
            storage_mode.clone(),
            ping.clone(),
            dev,
        )
        .await?;

        // Initialize the node. The REST server is attached below, so `rest` starts out as `None`.
        let mut node = Self {
            ledger: ledger.clone(),
            consensus: consensus.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            ping,
            handles: Default::default(),
            shutdown: shutdown.clone(),
        };

        // Create the CDN block sync (if a CDN base URL was given); its completion is awaited further below.
        let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown)));

        // Initialize the transaction pool.
        node.initialize_transaction_pool(dev, dev_txs)?;

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            // The REST server receives a clone of the node taken before `node.rest` is assigned,
            // so that clone's own `rest` field is `None`.
            node.rest = Some(
                Rest::start(
                    rest_ip,
                    rest_rps,
                    Some(consensus),
                    ledger.clone(),
                    Arc::new(node.clone()),
                    cdn_sync.clone(),
                    sync,
                )
                .await?,
            );
        }

        // Set up everything else after CDN sync is done.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await {
                // On failure, log, shut the partially started node down, and surface the error.
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the notification message loop; its handle is tracked so `shut_down` can abort it.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler. The result of `set` is deliberately ignored.
        let _ = signal_node.set(node.clone());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server, or `None` if no REST server is attached to this node.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
}
206
207impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
208    // /// Initialize the transaction pool.
209    // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
210    //     use snarkvm::{
211    //         console::{
212    //             account::ViewKey,
213    //             program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
214    //             types::U64,
215    //         },
216    //         ledger::block::transition::Output,
217    //     };
218    //     use std::str::FromStr;
219    //
220    //     // Initialize the locator.
221    //     let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
222    //     // Initialize the record name.
223    //     let record_name = Identifier::from_str("credits")?;
224    //
225    //     /// Searches the genesis block for the mint record.
226    //     fn search_genesis_for_mint<N: Network>(
227    //         block: Block<N>,
228    //         view_key: &ViewKey<N>,
229    //     ) -> Option<Record<N, Plaintext<N>>> {
230    //         for transition in block.transitions().filter(|t| t.is_mint()) {
231    //             if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
232    //                 if ciphertext.is_owner(view_key) {
233    //                     match ciphertext.decrypt(view_key) {
234    //                         Ok(record) => return Some(record),
235    //                         Err(error) => {
236    //                             error!("Failed to decrypt the mint output record - {error}");
237    //                             return None;
238    //                         }
239    //                     }
240    //                 }
241    //             }
242    //         }
243    //         None
244    //     }
245    //
246    //     /// Searches the block for the split record.
247    //     fn search_block_for_split<N: Network>(
248    //         block: Block<N>,
249    //         view_key: &ViewKey<N>,
250    //     ) -> Option<Record<N, Plaintext<N>>> {
251    //         let mut found = None;
252    //         // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
253    //         // block.transitions().rev().for_each(|t| {
254    //         let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
255    //         splits.iter().rev().for_each(|t| {
256    //             if found.is_some() {
257    //                 return;
258    //             }
259    //             let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
260    //                 error!("Failed to find the split output record");
261    //                 return;
262    //             };
263    //             if ciphertext.is_owner(view_key) {
264    //                 match ciphertext.decrypt(view_key) {
265    //                     Ok(record) => found = Some(record),
266    //                     Err(error) => {
267    //                         error!("Failed to decrypt the split output record - {error}");
268    //                     }
269    //                 }
270    //             }
271    //         });
272    //         found
273    //     }
274    //
275    //     let self_ = self.clone();
276    //     self.spawn(async move {
277    //         // Retrieve the view key.
278    //         let view_key = self_.view_key();
279    //         // Initialize the record.
280    //         let mut record = {
281    //             let mut found = None;
282    //             let mut height = self_.ledger.latest_height();
283    //             while found.is_none() && height > 0 {
284    //                 // Retrieve the block.
285    //                 let Ok(block) = self_.ledger.get_block(height) else {
286    //                     error!("Failed to get block at height {}", height);
287    //                     break;
288    //                 };
289    //                 // Search for the latest split record.
290    //                 if let Some(record) = search_block_for_split(block, view_key) {
291    //                     found = Some(record);
292    //                 }
293    //                 // Decrement the height.
294    //                 height = height.saturating_sub(1);
295    //             }
296    //             match found {
297    //                 Some(record) => record,
298    //                 None => {
299    //                     // Retrieve the genesis block.
300    //                     let Ok(block) = self_.ledger.get_block(0) else {
301    //                         error!("Failed to get the genesis block");
302    //                         return;
303    //                     };
304    //                     // Search the genesis block for the mint record.
305    //                     if let Some(record) = search_genesis_for_mint(block, view_key) {
306    //                         found = Some(record);
307    //                     }
308    //                     found.expect("Failed to find the split output record")
309    //                 }
310    //             }
311    //         };
312    //         info!("Starting transaction pool...");
313    //         // Start the transaction loop.
314    //         loop {
315    //             tokio::time::sleep(Duration::from_secs(1)).await;
316    //             // If the node is running in development mode, only generate if you are allowed.
317    //             if let Some(dev) = dev {
318    //                 if dev != 0 {
319    //                     continue;
320    //                 }
321    //             }
322    //
323    //             // Prepare the inputs.
324    //             let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
325    //             // Execute the transaction.
326    //             let transaction = match self_.ledger.vm().execute(
327    //                 self_.private_key(),
328    //                 locator,
329    //                 inputs,
330    //                 None,
331    //                 None,
332    //                 &mut rand::thread_rng(),
333    //             ) {
334    //                 Ok(transaction) => transaction,
335    //                 Err(error) => {
336    //                     error!("Transaction pool encountered an execution error - {error}");
337    //                     continue;
338    //                 }
339    //             };
340    //             // Retrieve the transition.
341    //             let Some(transition) = transaction.transitions().next() else {
342    //                 error!("Transaction pool encountered a missing transition");
343    //                 continue;
344    //             };
345    //             // Retrieve the second output.
346    //             let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
347    //                 error!("Transaction pool encountered a missing output");
348    //                 continue;
349    //             };
350    //             // Save the second output record.
351    //             let Ok(next_record) = ciphertext.decrypt(view_key) else {
352    //                 error!("Transaction pool encountered a decryption error");
353    //                 continue;
354    //             };
355    //             // Broadcast the transaction.
356    //             if self_
357    //                 .unconfirmed_transaction(
358    //                     self_.router.local_ip(),
359    //                     UnconfirmedTransaction::from(transaction.clone()),
360    //                     transaction.clone(),
361    //                 )
362    //                 .await
363    //             {
364    //                 info!("Transaction pool broadcasted the transaction");
365    //                 let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
366    //                 while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
367    //                     tokio::time::sleep(Duration::from_secs(1)).await;
368    //                 }
369    //                 info!("Transaction accepted by the ledger");
370    //             }
371    //             // Save the record.
372    //             record = next_record;
373    //         }
374    //     });
375    //     Ok(())
376    // }
377
378    /// Initialize the transaction pool.
379    fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
380        use snarkvm::console::{
381            program::{Identifier, Literal, ProgramID, Value},
382            types::U64,
383        };
384        use std::str::FromStr;
385
386        // Initialize the locator.
387        let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
388
389        // Determine whether to start the loop.
390        match dev {
391            // If the node is running in development mode, only generate if you are allowed.
392            Some(id) => {
393                // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
394                if id != 0 || !dev_txs {
395                    return Ok(());
396                }
397            }
398            // If the node is not running in development mode, do not generate dev traffic.
399            _ => return Ok(()),
400        }
401
402        let self_ = self.clone();
403        self.spawn(async move {
404            tokio::time::sleep(Duration::from_secs(3)).await;
405            info!("Starting transaction pool...");
406
407            // Start the transaction loop.
408            loop {
409                tokio::time::sleep(Duration::from_millis(500)).await;
410
411                // Prepare the inputs.
412                let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
413                // Execute the transaction.
414                let self__ = self_.clone();
415                let transaction = match spawn_blocking!(self__.ledger.vm().execute(
416                    self__.private_key(),
417                    locator,
418                    inputs.into_iter(),
419                    None,
420                    10_000,
421                    None,
422                    &mut rand::thread_rng(),
423                )) {
424                    Ok(transaction) => transaction,
425                    Err(error) => {
426                        error!("Transaction pool encountered an execution error - {error}");
427                        continue;
428                    }
429                };
430                // Broadcast the transaction.
431                if self_
432                    .unconfirmed_transaction(
433                        self_.router.local_ip(),
434                        UnconfirmedTransaction::from(transaction.clone()),
435                        transaction.clone(),
436                    )
437                    .await
438                {
439                    info!("Transaction pool broadcasted the transaction");
440                }
441            }
442        });
443        Ok(())
444    }
445
446    /// Spawns a task with the given future; it should only be used for long-running tasks.
447    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
448        self.handles.lock().push(tokio::spawn(future));
449    }
450}
451
452#[async_trait]
453impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
454    /// Shuts down the node.
455    async fn shut_down(&self) {
456        info!("Shutting down...");
457
458        // Shut down the node.
459        trace!("Shutting down the node...");
460        self.shutdown.store(true, std::sync::atomic::Ordering::Release);
461
462        // Abort the tasks.
463        trace!("Shutting down the validator...");
464        self.handles.lock().iter().for_each(|handle| handle.abort());
465
466        // Shut down the router.
467        self.router.shut_down().await;
468
469        // Shut down consensus.
470        trace!("Shutting down consensus...");
471        self.consensus.shut_down().await;
472
473        info!("Node has shut down.");
474    }
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use std::str::FromStr;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Profiling harness that boots a validator backed by in-memory storage.
    ///
    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    ///
    /// The test always ends with an error, as a reminder to keep it marked `#[ignore]`.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // Node configuration.
        let dev_txs = true;
        let storage_mode = StorageMode::Development(0);
        let node_ip = SocketAddr::from_str("0.0.0.0:4130").unwrap();
        let rest_ip = SocketAddr::from_str("0.0.0.0:3030").unwrap();

        // A fixed (insecure) seed keeps the account and the genesis block reproducible.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        // Sample the account.
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
        // Open a test store and build a VM on top of it.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(StorageMode::new_test(None))?;
        let vm = VM::from(store)?;
        // Produce the genesis block.
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        // NOTE(review): `dev` is passed as `None` below, and `initialize_transaction_pool`
        // returns early in that case, so `dev_txs = true` appears to have no effect here - confirm intended.
        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            node_ip,
            None,
            Some(rest_ip),
            10,
            account,
            &[],
            &[],
            genesis,
            None,
            storage_mode,
            false,
            dev_txs,
            None,
            Default::default(),
        )
        .await
        .unwrap();

        let height = validator.ledger.latest_height();
        println!("Loaded validator node with {} blocks", height);

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}
539}