snarkos_node/validator/
mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19
20use snarkos_account::Account;
21use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking};
22use snarkos_node_cdn::CdnBlockSync;
23use snarkos_node_consensus::Consensus;
24use snarkos_node_network::{NodeType, PeerPoolHandling};
25use snarkos_node_rest::Rest;
26use snarkos_node_router::{
27    Heartbeat,
28    Inbound,
29    Outbound,
30    Router,
31    Routing,
32    messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
33};
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_node_tcp::{
36    P2P,
37    protocols::{Disconnect, Handshake, OnConnect, Reading},
38};
39use snarkvm::prelude::{
40    Ledger,
41    Network,
42    block::{Block, Header},
43    puzzle::Solution,
44    store::ConsensusStorage,
45};
46
47use aleo_std::StorageMode;
48use anyhow::Result;
49use core::future::Future;
50#[cfg(feature = "locktick")]
51use locktick::parking_lot::Mutex;
52#[cfg(not(feature = "locktick"))]
53use parking_lot::Mutex;
54use std::{
55    net::SocketAddr,
56    sync::{Arc, atomic::AtomicBool},
57    time::Duration,
58};
59use tokio::task::JoinHandle;
60
61/// A validator is a full node, capable of validating blocks.
62#[derive(Clone)]
63pub struct Validator<N: Network, C: ConsensusStorage<N>> {
64    /// The ledger of the node.
65    ledger: Ledger<N, C>,
66    /// The consensus module of the node.
67    consensus: Consensus<N>,
68    /// The router of the node.
69    router: Router<N>,
70    /// The REST server of the node.
71    rest: Option<Rest<N, C, Self>>,
72    /// The block synchronization logic (used in the Router impl).
73    sync: Arc<BlockSync<N>>,
74    /// The spawned handles.
75    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
76    /// The shutdown signal.
77    shutdown: Arc<AtomicBool>,
78    /// Keeps track of sending pings.
79    ping: Arc<Ping<N>>,
80}
81
82impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
83    /// Initializes a new validator node.
84    pub async fn new(
85        node_ip: SocketAddr,
86        bft_ip: Option<SocketAddr>,
87        rest_ip: Option<SocketAddr>,
88        rest_rps: u32,
89        account: Account<N>,
90        trusted_peers: &[SocketAddr],
91        trusted_validators: &[SocketAddr],
92        genesis: Block<N>,
93        cdn: Option<http::Uri>,
94        storage_mode: StorageMode,
95        trusted_peers_only: bool,
96        dev_txs: bool,
97        dev: Option<u16>,
98        shutdown: Arc<AtomicBool>,
99    ) -> Result<Self> {
100        // Initialize the signal handler.
101        let signal_node = Self::handle_signals(shutdown.clone());
102
103        // Initialize the ledger.
104        let ledger = Ledger::load(genesis, storage_mode.clone())?;
105
106        // Initialize the ledger service.
107        let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
108
109        // Initialize the node router.
110        let router = Router::new(
111            node_ip,
112            NodeType::Validator,
113            account.clone(),
114            ledger_service.clone(),
115            trusted_peers,
116            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
117            trusted_peers_only,
118            storage_mode.clone(),
119            dev.is_some(),
120        )
121        .await?;
122
123        // Initialize the block synchronization logic.
124        let sync = Arc::new(BlockSync::new(ledger_service.clone()));
125        let locators = sync.get_block_locators()?;
126        let ping = Arc::new(Ping::new(router.clone(), locators));
127
128        // Initialize the consensus layer.
129        let consensus = Consensus::new(
130            account.clone(),
131            ledger_service.clone(),
132            sync.clone(),
133            bft_ip,
134            trusted_validators,
135            trusted_peers_only,
136            storage_mode.clone(),
137            ping.clone(),
138            dev,
139        )
140        .await?;
141
142        // Initialize the node.
143        let mut node = Self {
144            ledger: ledger.clone(),
145            consensus: consensus.clone(),
146            router,
147            rest: None,
148            sync: sync.clone(),
149            ping,
150            handles: Default::default(),
151            shutdown: shutdown.clone(),
152        };
153
154        // Perform sync with CDN (if enabled).
155        let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown)));
156
157        // Initialize the transaction pool.
158        node.initialize_transaction_pool(dev, dev_txs)?;
159
160        // Initialize the REST server.
161        if let Some(rest_ip) = rest_ip {
162            node.rest = Some(
163                Rest::start(
164                    rest_ip,
165                    rest_rps,
166                    Some(consensus),
167                    ledger.clone(),
168                    Arc::new(node.clone()),
169                    cdn_sync.clone(),
170                    sync,
171                )
172                .await?,
173            );
174        }
175
176        // Set up everything else after CDN sync is done.
177        if let Some(cdn_sync) = cdn_sync {
178            if let Err(error) = cdn_sync.wait().await {
179                crate::log_clean_error(&storage_mode);
180                node.shut_down().await;
181                return Err(error);
182            }
183        }
184
185        // Initialize the routing.
186        node.initialize_routing().await;
187        // Initialize the notification message loop.
188        node.handles.lock().push(crate::start_notification_message_loop());
189        // Pass the node to the signal handler.
190        let _ = signal_node.set(node.clone());
191        // Return the node.
192        Ok(node)
193    }
194
195    /// Returns the ledger.
196    pub fn ledger(&self) -> &Ledger<N, C> {
197        &self.ledger
198    }
199
200    /// Returns the REST server.
201    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
202        &self.rest
203    }
204
205    /// Returns the router.
206    pub fn router(&self) -> &Router<N> {
207        &self.router
208    }
209
210    // /// Initialize the transaction pool.
211    // fn initialize_transaction_pool(&self, dev: Option<u16>) -> Result<()> {
212    //     use snarkvm::{
213    //         console::{
214    //             account::ViewKey,
215    //             program::{Identifier, Literal, Plaintext, ProgramID, Record, Value},
216    //             types::U64,
217    //         },
218    //         ledger::block::transition::Output,
219    //     };
220    //     use std::str::FromStr;
221    //
222    //     // Initialize the locator.
223    //     let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("split")?);
224    //     // Initialize the record name.
225    //     let record_name = Identifier::from_str("credits")?;
226    //
227    //     /// Searches the genesis block for the mint record.
228    //     fn search_genesis_for_mint<N: Network>(
229    //         block: Block<N>,
230    //         view_key: &ViewKey<N>,
231    //     ) -> Option<Record<N, Plaintext<N>>> {
232    //         for transition in block.transitions().filter(|t| t.is_mint()) {
233    //             if let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[0] {
234    //                 if ciphertext.is_owner(view_key) {
235    //                     match ciphertext.decrypt(view_key) {
236    //                         Ok(record) => return Some(record),
237    //                         Err(error) => {
238    //                             error!("Failed to decrypt the mint output record - {error}");
239    //                             return None;
240    //                         }
241    //                     }
242    //                 }
243    //             }
244    //         }
245    //         None
246    //     }
247    //
248    //     /// Searches the block for the split record.
249    //     fn search_block_for_split<N: Network>(
250    //         block: Block<N>,
251    //         view_key: &ViewKey<N>,
252    //     ) -> Option<Record<N, Plaintext<N>>> {
253    //         let mut found = None;
254    //         // TODO (howardwu): Switch to the iterator when DoubleEndedIterator is supported.
255    //         // block.transitions().rev().for_each(|t| {
256    //         let splits = block.transitions().filter(|t| t.is_split()).collect::<Vec<_>>();
257    //         splits.iter().rev().for_each(|t| {
258    //             if found.is_some() {
259    //                 return;
260    //             }
261    //             let Output::Record(_, _, Some(ciphertext)) = &t.outputs()[1] else {
262    //                 error!("Failed to find the split output record");
263    //                 return;
264    //             };
265    //             if ciphertext.is_owner(view_key) {
266    //                 match ciphertext.decrypt(view_key) {
267    //                     Ok(record) => found = Some(record),
268    //                     Err(error) => {
269    //                         error!("Failed to decrypt the split output record - {error}");
270    //                     }
271    //                 }
272    //             }
273    //         });
274    //         found
275    //     }
276    //
277    //     let self_ = self.clone();
278    //     self.spawn(async move {
279    //         // Retrieve the view key.
280    //         let view_key = self_.view_key();
281    //         // Initialize the record.
282    //         let mut record = {
283    //             let mut found = None;
284    //             let mut height = self_.ledger.latest_height();
285    //             while found.is_none() && height > 0 {
286    //                 // Retrieve the block.
287    //                 let Ok(block) = self_.ledger.get_block(height) else {
288    //                     error!("Failed to get block at height {}", height);
289    //                     break;
290    //                 };
291    //                 // Search for the latest split record.
292    //                 if let Some(record) = search_block_for_split(block, view_key) {
293    //                     found = Some(record);
294    //                 }
295    //                 // Decrement the height.
296    //                 height = height.saturating_sub(1);
297    //             }
298    //             match found {
299    //                 Some(record) => record,
300    //                 None => {
301    //                     // Retrieve the genesis block.
302    //                     let Ok(block) = self_.ledger.get_block(0) else {
303    //                         error!("Failed to get the genesis block");
304    //                         return;
305    //                     };
306    //                     // Search the genesis block for the mint record.
307    //                     if let Some(record) = search_genesis_for_mint(block, view_key) {
308    //                         found = Some(record);
309    //                     }
310    //                     found.expect("Failed to find the split output record")
311    //                 }
312    //             }
313    //         };
314    //         info!("Starting transaction pool...");
315    //         // Start the transaction loop.
316    //         loop {
317    //             tokio::time::sleep(Duration::from_secs(1)).await;
318    //             // If the node is running in development mode, only generate if you are allowed.
319    //             if let Some(dev) = dev {
320    //                 if dev != 0 {
321    //                     continue;
322    //                 }
323    //             }
324    //
325    //             // Prepare the inputs.
326    //             let inputs = [Value::from(record.clone()), Value::from(Literal::U64(U64::new(1)))].into_iter();
327    //             // Execute the transaction.
328    //             let transaction = match self_.ledger.vm().execute(
329    //                 self_.private_key(),
330    //                 locator,
331    //                 inputs,
332    //                 None,
333    //                 None,
334    //                 &mut rand::thread_rng(),
335    //             ) {
336    //                 Ok(transaction) => transaction,
337    //                 Err(error) => {
338    //                     error!("Transaction pool encountered an execution error - {error}");
339    //                     continue;
340    //                 }
341    //             };
342    //             // Retrieve the transition.
343    //             let Some(transition) = transaction.transitions().next() else {
344    //                 error!("Transaction pool encountered a missing transition");
345    //                 continue;
346    //             };
347    //             // Retrieve the second output.
348    //             let Output::Record(_, _, Some(ciphertext)) = &transition.outputs()[1] else {
349    //                 error!("Transaction pool encountered a missing output");
350    //                 continue;
351    //             };
352    //             // Save the second output record.
353    //             let Ok(next_record) = ciphertext.decrypt(view_key) else {
354    //                 error!("Transaction pool encountered a decryption error");
355    //                 continue;
356    //             };
357    //             // Broadcast the transaction.
358    //             if self_
359    //                 .unconfirmed_transaction(
360    //                     self_.router.local_ip(),
361    //                     UnconfirmedTransaction::from(transaction.clone()),
362    //                     transaction.clone(),
363    //                 )
364    //                 .await
365    //             {
366    //                 info!("Transaction pool broadcasted the transaction");
367    //                 let commitment = next_record.to_commitment(&locator.0, &record_name).unwrap();
368    //                 while !self_.ledger.contains_commitment(&commitment).unwrap_or(false) {
369    //                     tokio::time::sleep(Duration::from_secs(1)).await;
370    //                 }
371    //                 info!("Transaction accepted by the ledger");
372    //             }
373    //             // Save the record.
374    //             record = next_record;
375    //         }
376    //     });
377    //     Ok(())
378    // }
379
380    /// Initializes the transaction pool (if in development mode).
381    ///
382    /// Spawns a background task that periodically issues transactions to the network.
383    fn initialize_transaction_pool(&self, dev: Option<u16>, dev_txs: bool) -> Result<()> {
384        use snarkvm::console::{
385            program::{Identifier, Literal, ProgramID, Value},
386            types::U64,
387        };
388        use std::str::FromStr;
389
390        // Initialize the locator.
391        let locator = (ProgramID::from_str("credits.aleo")?, Identifier::from_str("transfer_public")?);
392
393        // Determine whether to start the loop.
394        match dev {
395            // If the node is running in development mode, only generate if you are allowed.
396            Some(id) => {
397                // If the node is not the first node, or if we should not create dev traffic, do not start the loop.
398                if id != 0 || !dev_txs {
399                    return Ok(());
400                }
401            }
402            // If the node is not running in development mode, do not generate dev traffic.
403            _ => return Ok(()),
404        }
405
406        let self_ = self.clone();
407        self.spawn(async move {
408            tokio::time::sleep(Duration::from_secs(3)).await;
409            info!("Starting transaction pool...");
410
411            // Start the transaction loop.
412            loop {
413                tokio::time::sleep(Duration::from_millis(500)).await;
414
415                // Prepare the inputs.
416                let inputs = [Value::from(Literal::Address(self_.address())), Value::from(Literal::U64(U64::new(1)))];
417                // Execute the transaction.
418                let self__ = self_.clone();
419                let transaction = match spawn_blocking!(self__.ledger.vm().execute(
420                    self__.private_key(),
421                    locator,
422                    inputs.into_iter(),
423                    None,
424                    10_000,
425                    None,
426                    &mut rand::thread_rng(),
427                )) {
428                    Ok(transaction) => transaction,
429                    Err(error) => {
430                        error!("Transaction pool encountered an execution error - {error}");
431                        continue;
432                    }
433                };
434                // Broadcast the transaction.
435                if self_
436                    .unconfirmed_transaction(
437                        self_.router.local_ip(),
438                        UnconfirmedTransaction::from(transaction.clone()),
439                        transaction.clone(),
440                    )
441                    .await
442                {
443                    info!("Transaction pool broadcasted the transaction");
444                }
445            }
446        });
447        Ok(())
448    }
449
450    /// Spawns a task with the given future; it should only be used for long-running tasks.
451    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
452        self.handles.lock().push(tokio::spawn(future));
453    }
454}
455
456#[async_trait]
457impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
458    /// Shuts down the node.
459    async fn shut_down(&self) {
460        info!("Shutting down...");
461
462        // Shut down the node.
463        trace!("Shutting down the node...");
464        self.shutdown.store(true, std::sync::atomic::Ordering::Release);
465
466        // Abort the tasks.
467        trace!("Shutting down the validator...");
468        self.handles.lock().iter().for_each(|handle| handle.abort());
469
470        // Shut down the router.
471        self.router.shut_down().await;
472
473        // Shut down consensus.
474        trace!("Shutting down consensus...");
475        self.consensus.shut_down().await;
476
477        info!("Node has shut down.");
478    }
479}
480
481#[cfg(test)]
482mod tests {
483    use super::*;
484    use snarkvm::prelude::{
485        MainnetV0,
486        VM,
487        store::{ConsensusStore, helpers::memory::ConsensusMemory},
488    };
489
490    use anyhow::bail;
491    use rand::SeedableRng;
492    use rand_chacha::ChaChaRng;
493    use std::str::FromStr;
494
495    type CurrentNetwork = MainnetV0;
496
497    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
498    #[ignore]
499    #[tokio::test]
500    async fn test_profiler() -> Result<()> {
501        // Specify the node attributes.
502        let node = SocketAddr::from_str("0.0.0.0:4130").unwrap();
503        let rest = SocketAddr::from_str("0.0.0.0:3030").unwrap();
504        let storage_mode = StorageMode::Development(0);
505        let dev_txs = true;
506
507        // Initialize an (insecure) fixed RNG.
508        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
509        // Initialize the account.
510        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();
511        // Initialize a new VM.
512        let vm = VM::from(ConsensusStore::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::open(
513            StorageMode::new_test(None),
514        )?)?;
515        // Initialize the genesis block.
516        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;
517
518        println!("Initializing validator node...");
519
520        let validator = Validator::<CurrentNetwork, ConsensusMemory<CurrentNetwork>>::new(
521            node,
522            None,
523            Some(rest),
524            10,
525            account,
526            &[],
527            &[],
528            genesis,
529            None,
530            storage_mode,
531            false,
532            dev_txs,
533            None,
534            Default::default(),
535        )
536        .await
537        .unwrap();
538
539        println!("Loaded validator node with {} blocks", validator.ledger.latest_height(),);
540
541        bail!("\n\nRemember to #[ignore] this test!\n\n")
542    }
543}