// snarkos_node/node.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    BootstrapClient,
18    Client,
19    Prover,
20    Validator,
21    network::{NodeType, Peer, PeerPoolHandling},
22    router::Outbound,
23    traits::NodeInterface,
24};
25
26use snarkos_account::Account;
27use snarkos_utilities::{NodeDataDir, SignalHandler};
28
29use snarkvm::prelude::{
30    Address,
31    Header,
32    Ledger,
33    Network,
34    PrivateKey,
35    ViewKey,
36    block::Block,
37    store::helpers::{memory::ConsensusMemory, rocksdb::ConsensusDB},
38};
39
40use aleo_std::{StorageMode, aleo_ledger_dir};
41use anyhow::{Result, bail};
42
43#[cfg(feature = "locktick")]
44use locktick::parking_lot::RwLock;
45#[cfg(not(feature = "locktick"))]
46use parking_lot::RwLock;
47use std::{cmp, collections::HashMap, fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc, time::Duration};
48use tokio::task;
49
/// The number of blocks between automatic database checkpoints.
/// A new checkpoint is only created once the ledger height has advanced
/// by at least this many blocks since the previous one.
const CHECKPOINT_BLOCK_FREQUENCY: u32 = 1000;

/// The maximum number of automatic database checkpoints kept at any time.
/// Once exceeded, the checkpoints with the lowest block heights are removed.
const MAX_AUTO_CHECKPOINTS: usize = 5;
55
/// The set of node types a snarkOS instance can run as.
/// Each variant wraps the concrete node implementation; validators and clients
/// are backed by a persistent database, provers by in-memory storage.
#[derive(Clone)]
pub enum Node<N: Network> {
    /// A validator is a full node, capable of validating blocks.
    Validator(Arc<Validator<N, ConsensusDB<N>>>),
    /// A prover is a light node, capable of producing proofs for consensus.
    Prover(Arc<Prover<N, ConsensusMemory<N>>>),
    /// A client node is a full node, capable of querying the network.
    Client(Arc<Client<N, ConsensusDB<N>>>),
    /// A bootstrap client node is a light node dedicated to serving lists of peers.
    BootstrapClient(BootstrapClient<N>),
}
67
68impl<N: Network> Node<N> {
69    /// Initializes a new validator node.
70    pub async fn new_validator(
71        node_ip: SocketAddr,
72        bft_ip: Option<SocketAddr>,
73        rest_ip: Option<SocketAddr>,
74        rest_rps: u32,
75        account: Account<N>,
76        trusted_peers: &[SocketAddr],
77        trusted_validators: &[SocketAddr],
78        genesis: Block<N>,
79        cdn: Option<http::Uri>,
80        storage_mode: StorageMode,
81        node_data_dir: NodeDataDir,
82        trusted_peers_only: bool,
83        auto_db_checkpoints: Option<PathBuf>,
84        dev_txs: bool,
85        dev: Option<u16>,
86        signal_handler: Arc<SignalHandler>,
87    ) -> Result<Self> {
88        let validator = Arc::new(
89            Validator::new(
90                node_ip,
91                bft_ip,
92                rest_ip,
93                rest_rps,
94                account,
95                trusted_peers,
96                trusted_validators,
97                genesis,
98                cdn,
99                storage_mode,
100                node_data_dir,
101                trusted_peers_only,
102                dev_txs,
103                dev,
104                signal_handler,
105            )
106            .await?,
107        );
108
109        let node = Self::Validator(validator.clone());
110
111        // Perform automatic ledger checkpoints.
112        if let Some(path) = auto_db_checkpoints {
113            if let Some(handle) = node.perform_auto_checkpoints(path)? {
114                validator.handles.lock().push(handle);
115            }
116        }
117
118        Ok(node)
119    }
120
121    /// Initializes a new prover node.
122    pub async fn new_prover(
123        node_ip: SocketAddr,
124        account: Account<N>,
125        trusted_peers: &[SocketAddr],
126        genesis: Block<N>,
127        node_data_dir: NodeDataDir,
128        trusted_peers_only: bool,
129        dev: Option<u16>,
130        signal_handler: Arc<SignalHandler>,
131    ) -> Result<Self> {
132        Ok(Self::Prover(Arc::new(
133            Prover::new(
134                node_ip,
135                account,
136                trusted_peers,
137                genesis,
138                node_data_dir,
139                trusted_peers_only,
140                dev,
141                signal_handler,
142            )
143            .await?,
144        )))
145    }
146
147    /// Initializes a new client node.
148    pub async fn new_client(
149        node_ip: SocketAddr,
150        rest_ip: Option<SocketAddr>,
151        rest_rps: u32,
152        account: Account<N>,
153        trusted_peers: &[SocketAddr],
154        genesis: Block<N>,
155        cdn: Option<http::Uri>,
156        storage_mode: StorageMode,
157        node_data_dir: NodeDataDir,
158        trusted_peers_only: bool,
159        auto_db_checkpoints: Option<PathBuf>,
160        dev: Option<u16>,
161        signal_handler: Arc<SignalHandler>,
162    ) -> Result<Self> {
163        let client = Arc::new(
164            Client::new(
165                node_ip,
166                rest_ip,
167                rest_rps,
168                account,
169                trusted_peers,
170                genesis,
171                cdn,
172                storage_mode,
173                node_data_dir,
174                trusted_peers_only,
175                dev,
176                signal_handler,
177            )
178            .await?,
179        );
180
181        let node = Self::Client(client.clone());
182
183        // Perform automatic ledger checkpoints.
184        if let Some(path) = auto_db_checkpoints {
185            if let Some(handle) = node.perform_auto_checkpoints(path)? {
186                client.handles.lock().push(handle);
187            }
188        }
189
190        Ok(node)
191    }
192
193    /// Initializes a new bootstrap client node.
194    pub async fn new_bootstrap_client(
195        listener_addr: SocketAddr,
196        account: Account<N>,
197        genesis_header: Header<N>,
198        dev: Option<u16>,
199    ) -> Result<Self> {
200        Ok(Self::BootstrapClient(BootstrapClient::new(listener_addr, account, genesis_header, dev).await?))
201    }
202
203    /// Returns the node type.
204    pub fn node_type(&self) -> NodeType {
205        match self {
206            Self::Validator(validator) => validator.node_type(),
207            Self::Prover(prover) => prover.node_type(),
208            Self::Client(client) => client.node_type(),
209            Self::BootstrapClient(_) => NodeType::BootstrapClient,
210        }
211    }
212
213    /// Returns the account private key of the node.
214    pub fn private_key(&self) -> &PrivateKey<N> {
215        match self {
216            Self::Validator(node) => node.private_key(),
217            Self::Prover(node) => node.private_key(),
218            Self::Client(node) => node.private_key(),
219            Self::BootstrapClient(node) => node.private_key(),
220        }
221    }
222
223    /// Returns the account view key of the node.
224    pub fn view_key(&self) -> &ViewKey<N> {
225        match self {
226            Self::Validator(node) => node.view_key(),
227            Self::Prover(node) => node.view_key(),
228            Self::Client(node) => node.view_key(),
229            Self::BootstrapClient(node) => node.view_key(),
230        }
231    }
232
233    /// Returns the account address of the node.
234    pub fn address(&self) -> Address<N> {
235        match self {
236            Self::Validator(node) => node.address(),
237            Self::Prover(node) => node.address(),
238            Self::Client(node) => node.address(),
239            Self::BootstrapClient(node) => node.address(),
240        }
241    }
242
243    /// Returns `true` if the node is in development mode.
244    pub fn is_dev(&self) -> bool {
245        match self {
246            Self::Validator(node) => node.is_dev(),
247            Self::Prover(node) => node.is_dev(),
248            Self::Client(node) => node.is_dev(),
249            Self::BootstrapClient(node) => node.is_dev(),
250        }
251    }
252
253    /// Returns a reference to the underlying peer pool.
254    pub fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
255        match self {
256            Self::Validator(validator) => validator.router().peer_pool(),
257            Self::Prover(prover) => prover.router().peer_pool(),
258            Self::Client(client) => client.router().peer_pool(),
259            Self::BootstrapClient(client) => client.peer_pool(),
260        }
261    }
262
263    /// Get the underlying ledger (if any).
264    pub fn ledger(&self) -> Option<&Ledger<N, ConsensusDB<N>>> {
265        match self {
266            Self::Validator(node) => Some(node.ledger()),
267            Self::Prover(_) => None,
268            Self::Client(node) => Some(node.ledger()),
269            Self::BootstrapClient(_) => None,
270        }
271    }
272
273    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
274    pub fn is_block_synced(&self) -> bool {
275        match self {
276            Self::Validator(node) => node.is_block_synced(),
277            Self::Prover(node) => node.is_block_synced(),
278            Self::Client(node) => node.is_block_synced(),
279            Self::BootstrapClient(_) => true,
280        }
281    }
282
283    /// Returns the number of blocks this node is behind the greatest peer height,
284    /// or `None` if not connected to peers yet.
285    pub fn num_blocks_behind(&self) -> Option<u32> {
286        match self {
287            Self::Validator(node) => node.num_blocks_behind(),
288            Self::Prover(node) => node.num_blocks_behind(),
289            Self::Client(node) => node.num_blocks_behind(),
290            Self::BootstrapClient(_) => Some(0),
291        }
292    }
293
294    /// Calculates the current sync speed in blocks per second.
295    /// Returns None if sync speed cannot be calculated (e.g., not syncing or insufficient data).
296    pub fn get_sync_speed(&self) -> f64 {
297        match self {
298            Self::Validator(node) => node.get_sync_speed(),
299            Self::Prover(node) => node.get_sync_speed(),
300            Self::Client(node) => node.get_sync_speed(),
301            Self::BootstrapClient(_) => 0.0,
302        }
303    }
304
305    /// Shuts down the node.
306    pub async fn shut_down(&self) {
307        match self {
308            Self::Validator(node) => node.shut_down().await,
309            Self::Prover(node) => node.shut_down().await,
310            Self::Client(node) => node.shut_down().await,
311            Self::BootstrapClient(node) => node.shut_down().await,
312        }
313    }
314
315    /// Waits until the node receives a signal.
316    pub async fn wait_for_signals(&self, signal_handler: &SignalHandler) {
317        match self {
318            Self::Validator(node) => node.wait_for_signals(signal_handler).await,
319            Self::Prover(node) => node.wait_for_signals(signal_handler).await,
320            Self::Client(node) => node.wait_for_signals(signal_handler).await,
321            Self::BootstrapClient(node) => node.wait_for_signals(signal_handler).await,
322        }
323    }
324
325    /// Periodically creates automated ledger checkpoints.
326    pub fn perform_auto_checkpoints(&self, auto_checkpoint_path: PathBuf) -> Result<Option<task::JoinHandle<()>>> {
327        // Only perform checkpoints if there's a database involved.
328        let Some(ledger) = self.ledger().cloned() else {
329            return Ok(None);
330        };
331
332        // Ensure that the target path exists as a folder or create it.
333        if !auto_checkpoint_path.exists() {
334            if let Err(e) = fs::create_dir_all(&auto_checkpoint_path) {
335                bail!("Couldn't create the specified path for the automatic ledger checkpoints: {e}");
336            }
337        } else if auto_checkpoint_path.exists() && !auto_checkpoint_path.is_dir() {
338            bail!("The specified path for automatic ledger checkpoints is not a directory");
339        }
340
341        // Spawn a loop that will periodically create the checkpoints.
342        let handle = tokio::spawn(async move {
343            info!("Starting the automatic ledger checkpoint routine...");
344
345            // Prepare some object that will be useful throughout the routine.
346            let mut last_checkpoint_height = None;
347            let mut existing_checkpoints = Vec::with_capacity(MAX_AUTO_CHECKPOINTS + 1);
348            let mut block_tree_path = aleo_ledger_dir(N::ID, ledger.vm().block_store().storage_mode());
349            block_tree_path.push("block_tree");
350
351            loop {
352                // A small delay that's smaller than block time. There are technically situations when
353                // blocks can be inserted one after the other more quickly (syncing, multiple blocks in
354                // a Subdag), those are edge cases unlikely to be encountered under normal conditions.
355                tokio::time::sleep(Duration::from_millis(500)).await;
356
357                // Skip if we've already created a checkpoint during this run, and the
358                // number of blocks baked since then is lower than the configured threshold.
359                let current_height = ledger.vm().block_store().current_block_height();
360                if last_checkpoint_height.is_some_and(|checkpoint_height| {
361                    current_height.saturating_sub(checkpoint_height) < CHECKPOINT_BLOCK_FREQUENCY
362                }) {
363                    continue;
364                }
365
366                // Create a checkpoint.
367                let mut checkpoint_path = auto_checkpoint_path.clone();
368                checkpoint_path.push(format!("checkpoint_{current_height}"));
369                if let Err(e) = ledger.backup_database(&checkpoint_path) {
370                    warn!("Couldn't automatically store a checkpoint at {}: {e}", checkpoint_path.display());
371                    continue;
372                }
373                last_checkpoint_height = Some(current_height);
374
375                // Immediately procure and copy the applicable block tree in the background.
376                let ledger_clone = ledger.clone();
377                let source_block_tree_path = block_tree_path.clone();
378                tokio::spawn(async move {
379                    if let Err(e) = ledger_clone.cache_block_tree() {
380                        warn!("Couldn't cache the block tree for a ledger checkpoint: {e}");
381                        return;
382                    }
383
384                    // Copy the block tree file to the new checkpoint.
385                    checkpoint_path.push("block_tree");
386                    if let Err(e) = fs::copy(source_block_tree_path, checkpoint_path) {
387                        warn!("Couldn't copy the block tree file to a ledger checkpoint: {e}");
388                    }
389                });
390
391                // Count the existing auto checkpoints.
392                existing_checkpoints.clear();
393                let checkpoint_dir = match auto_checkpoint_path.read_dir() {
394                    Ok(dir) => dir,
395                    Err(e) => {
396                        warn!("IO error while accessing the automatic checkpoints: {e}");
397                        continue;
398                    }
399                };
400                for entry in checkpoint_dir {
401                    // Handle possible IO errors.
402                    let entry = match entry {
403                        Ok(entry) => entry,
404                        Err(e) => {
405                            warn!("IO error while counting the automatic checkpoints: {e}");
406                            continue;
407                        }
408                    };
409
410                    // Skip non-directories.
411                    let path = entry.path();
412                    if !path.is_dir() {
413                        continue;
414                    }
415
416                    // Recognize checkpoints by the "checkpoint_height" name.
417                    let file_name = entry.file_name().into_string().unwrap(); // can't fail - we create Unicode filenames
418                    let mut name_iter = file_name.split("_");
419                    if name_iter.next() != Some("checkpoint") {
420                        continue;
421                    }
422                    let Some(height) = name_iter.next() else {
423                        continue;
424                    };
425                    let Ok(height) = u32::from_str(height) else {
426                        continue;
427                    };
428                    existing_checkpoints.push((path, height));
429                }
430                existing_checkpoints.sort_unstable_by_key(|(_, height)| cmp::Reverse(*height));
431
432                // If we have a sufficient number of checkpoints, delete the oldest one(s).
433                let surplus_checkpoints = existing_checkpoints.len().saturating_sub(MAX_AUTO_CHECKPOINTS);
434                for _ in 0..surplus_checkpoints {
435                    if let Some((checkpoint_path, _)) = existing_checkpoints.pop() {
436                        if let Err(e) = fs::remove_dir_all(checkpoint_path) {
437                            warn!("Couldn't remove an automatic ledger checkpoint: {e}");
438                        }
439                    }
440                }
441            }
442        });
443
444        Ok(Some(handle))
445    }
446}