amareleo_node/validator/
mod.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use amareleo_chain_account::Account;
17use amareleo_chain_tracing::TracingHandler;
18use amareleo_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService};
19use amareleo_node_consensus::Consensus;
20use amareleo_node_rest::Rest;
21
22use snarkvm::prelude::{Ledger, Network, block::Block, store::ConsensusStorage};
23
24use aleo_std::StorageMode;
25use anyhow::Result;
26use core::future::Future;
27use parking_lot::Mutex;
28use std::{
29    net::SocketAddr,
30    sync::{Arc, atomic::AtomicBool},
31};
32use tokio::task::JoinHandle;
33use tracing::subscriber::DefaultGuard;
34
35/// A validator is a full node, capable of validating blocks.
36#[derive(Clone)]
37pub struct Validator<N: Network, C: ConsensusStorage<N>> {
38    /// The ledger of the node.
39    ledger: Ledger<N, C>,
40    /// The consensus module of the node.
41    consensus: Consensus<N>,
42    /// The REST server of the node.
43    rest: Option<Rest<N, C>>,
44    /// The spawned handles.
45    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
46    /// Tracing handle
47    tracing: Option<TracingHandler>,
48    /// The shutdown signal.
49    shutdown: Arc<AtomicBool>,
50}
51
52impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
53    /// Initializes a new validator node.
54    pub async fn new(
55        rest_ip: SocketAddr,
56        rest_rps: u32,
57        account: Account<N>,
58        genesis: Block<N>,
59        keep_state: bool,
60        storage_mode: StorageMode,
61        tracing: Option<TracingHandler>,
62        shutdown: Arc<AtomicBool>,
63    ) -> Result<Self> {
64        // Initialize the ledger.
65        let ledger = Ledger::load(genesis, storage_mode.clone())?;
66
67        // Initialize the ledger service.
68        let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), tracing.clone(), shutdown.clone()));
69
70        // Initialize the consensus.
71        let mut consensus =
72            Consensus::new(account.clone(), ledger_service.clone(), keep_state, storage_mode.clone(), tracing.clone())?;
73        // Initialize the primary channels.
74        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
75        // Start the consensus.
76        consensus.run(primary_sender, primary_receiver).await?;
77
78        // Initialize the node.
79        let mut node = Self {
80            ledger: ledger.clone(),
81            consensus: consensus.clone(),
82            rest: None,
83            handles: Default::default(),
84            tracing: tracing.clone(),
85            shutdown,
86        };
87
88        // Initialize the REST server.
89        node.rest = Some(Rest::start(rest_ip, rest_rps, Some(consensus), ledger.clone(), tracing.clone()).await?);
90
91        // Initialize the notification message loop.
92        node.handles.lock().push(crate::start_notification_message_loop());
93
94        // Return the node.
95        Ok(node)
96    }
97
98    /// Returns the ledger.
99    pub fn ledger(&self) -> &Ledger<N, C> {
100        &self.ledger
101    }
102
103    /// Returns the REST server.
104    pub fn rest(&self) -> &Option<Rest<N, C>> {
105        &self.rest
106    }
107}
108
109impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
110    /// Spawns a task with the given future; it should only be used for long-running tasks.
111    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
112        self.handles.lock().push(tokio::spawn(future));
113    }
114}
115
116impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
117    /// Retruns tracing guard
118    pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
119        self.tracing.clone().map(|trace_handle| trace_handle.subscribe_thread())
120    }
121
122    /// Shuts down the node.
123    pub async fn shut_down(&self) {
124        let _guard = self.get_tracing_guard();
125
126        info!("Shutting down...");
127
128        // Shut down rest server.
129        if let Some(rest) = &self.rest {
130            trace!("Shutting down the REST server...");
131            rest.shut_down().await;
132        }
133
134        // Shut down the node.
135        trace!("Shutting down the node...");
136        self.shutdown.store(true, std::sync::atomic::Ordering::Release);
137
138        // Abort the tasks.
139        trace!("Shutting down the validator...");
140        self.handles.lock().iter().for_each(|handle| handle.abort());
141
142        // Shut down consensus.
143        trace!("Shutting down consensus...");
144        self.consensus.shut_down().await;
145
146        info!("Node has shut down.");
147        info!("");
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use amareleo_node_bft::DEVELOPMENT_MODE_RNG_SEED;

    use snarkvm::prelude::{
        MainnetV0,
        VM,
        store::{ConsensusStore, helpers::memory::ConsensusMemory},
    };

    use anyhow::bail;
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use std::str::FromStr;

    type CurrentNetwork = MainnetV0;
    type CurrentStorage = ConsensusMemory<CurrentNetwork>;

    /// Use `RUST_MIN_STACK=67108864 cargo test --release profiler --features timer` to run this test.
    ///
    /// The test always finishes with `bail!`, so it reports a failure whenever it is run; it is
    /// meant to stay `#[ignore]`d.
    #[ignore]
    #[tokio::test]
    async fn test_profiler() -> Result<()> {
        // Node attributes.
        let rest_ip = SocketAddr::from_str("0.0.0.0:3030").unwrap();
        let storage_mode = StorageMode::Development(0);

        // An (insecure) fixed RNG; the account and the genesis block draw from it in this order.
        let mut rng = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
        let account = Account::<CurrentNetwork>::new(&mut rng).unwrap();

        // A fresh VM, used to produce the genesis block.
        let store = ConsensusStore::<CurrentNetwork, CurrentStorage>::open(None)?;
        let vm = VM::from(store)?;
        let genesis = vm.genesis_beacon(account.private_key(), &mut rng)?;

        println!("Initializing validator node...");

        let validator = Validator::<CurrentNetwork, CurrentStorage>::new(
            rest_ip,
            10,
            account,
            genesis,
            false,
            storage_mode,
            None,
            Default::default(),
        )
        .await
        .unwrap();

        let height = validator.ledger().latest_height();
        println!("Loaded validator node with {} blocks", height);

        bail!("\n\nRemember to #[ignore] this test!\n\n")
    }
}