Skip to main content

snarkos_node/
node.rs

// Copyright (c) 2019-2026 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
16use crate::{
17    BootstrapClient,
18    Client,
19    Prover,
20    Validator,
21    network::{NodeType, Peer, PeerPoolHandling},
22    router::Outbound,
23    traits::NodeInterface,
24};
25
26use snarkos_account::Account;
27use snarkos_utilities::{NodeDataDir, SignalHandler};
28
29use snarkvm::prelude::{
30    Address,
31    Header,
32    Ledger,
33    Network,
34    PrivateKey,
35    ViewKey,
36    block::Block,
37    store::helpers::{memory::ConsensusMemory, rocksdb::ConsensusDB},
38};
39
40use aleo_std::{StorageMode, aleo_ledger_dir};
41use anyhow::{Result, bail};
42
43#[cfg(feature = "locktick")]
44use locktick::parking_lot::RwLock;
45#[cfg(not(feature = "locktick"))]
46use parking_lot::RwLock;
47use std::{
48    cmp,
49    collections::HashMap,
50    fs,
51    net::SocketAddr,
52    path::{Path, PathBuf},
53    str::FromStr,
54    sync::Arc,
55    time::Duration,
56};
57use tokio::task;
58
/// How many blocks must be added since the last automatic database checkpoint
/// before the next one is taken.
const CHECKPOINT_BLOCK_FREQUENCY: u32 = 1_000;

/// Upper bound on the number of automatic database checkpoints retained at once.
const MAX_AUTO_CHECKPOINTS: usize = 5;
64
/// Reports whether a checkpoint for the startup height is already present on disk.
///
/// Looks for an entry named `checkpoint_{startup_height}` directly inside
/// `auto_checkpoint_path`. Returns `Some(startup_height)` when that entry is a
/// directory, and `None` otherwise (missing, or present but not a directory).
fn existing_startup_checkpoint_height(auto_checkpoint_path: &Path, startup_height: u32) -> Option<u32> {
    let candidate = auto_checkpoint_path.join(format!("checkpoint_{startup_height}"));
    if candidate.is_dir() { Some(startup_height) } else { None }
}
70
71#[derive(Clone)]
72pub enum Node<N: Network> {
73    /// A validator is a full node, capable of validating blocks.
74    Validator(Arc<Validator<N, ConsensusDB<N>>>),
75    /// A prover is a light node, capable of producing proofs for consensus.
76    Prover(Arc<Prover<N, ConsensusMemory<N>>>),
77    /// A client node is a full node, capable of querying with the network.
78    Client(Arc<Client<N, ConsensusDB<N>>>),
79    /// A bootstrap client node is a light node dedicated to serving lists of peers.
80    BootstrapClient(BootstrapClient<N>),
81}
82
83impl<N: Network> Node<N> {
84    /// Initializes a new validator node.
85    pub async fn new_validator(
86        node_ip: SocketAddr,
87        bft_ip: Option<SocketAddr>,
88        rest_ip: Option<SocketAddr>,
89        rest_rps: u32,
90        account: Account<N>,
91        trusted_peers: &[SocketAddr],
92        trusted_validators: &[SocketAddr],
93        genesis: Block<N>,
94        cdn: Option<http::Uri>,
95        storage_mode: StorageMode,
96        node_data_dir: NodeDataDir,
97        trusted_peers_only: bool,
98        auto_db_checkpoints: Option<PathBuf>,
99        dev_txs: bool,
100        dev: Option<u16>,
101        slipstream_configs: &[PathBuf],
102        dev_num_validators_for_committee_hotswap: Option<u16>,
103        signal_handler: Arc<SignalHandler>,
104    ) -> Result<Self> {
105        let validator = Arc::new(
106            Validator::new(
107                node_ip,
108                bft_ip,
109                rest_ip,
110                rest_rps,
111                account,
112                trusted_peers,
113                trusted_validators,
114                genesis,
115                cdn,
116                storage_mode,
117                node_data_dir,
118                trusted_peers_only,
119                dev_txs,
120                dev,
121                slipstream_configs,
122                dev_num_validators_for_committee_hotswap,
123                signal_handler,
124            )
125            .await?,
126        );
127
128        let node = Self::Validator(validator.clone());
129
130        // Perform automatic ledger checkpoints.
131        if let Some(path) = auto_db_checkpoints {
132            if let Some(handle) = node.perform_auto_checkpoints(path)? {
133                validator.handles.lock().push(handle);
134            }
135        }
136
137        Ok(node)
138    }
139
140    /// Initializes a new prover node.
141    pub async fn new_prover(
142        node_ip: SocketAddr,
143        account: Account<N>,
144        trusted_peers: &[SocketAddr],
145        genesis: Block<N>,
146        node_data_dir: NodeDataDir,
147        trusted_peers_only: bool,
148        dev: Option<u16>,
149        signal_handler: Arc<SignalHandler>,
150    ) -> Result<Self> {
151        Ok(Self::Prover(Arc::new(
152            Prover::new(
153                node_ip,
154                account,
155                trusted_peers,
156                genesis,
157                node_data_dir,
158                trusted_peers_only,
159                dev,
160                signal_handler,
161            )
162            .await?,
163        )))
164    }
165
166    /// Initializes a new client node.
167    pub async fn new_client(
168        node_ip: SocketAddr,
169        rest_ip: Option<SocketAddr>,
170        rest_rps: u32,
171        account: Account<N>,
172        trusted_peers: &[SocketAddr],
173        genesis: Block<N>,
174        cdn: Option<http::Uri>,
175        storage_mode: StorageMode,
176        node_data_dir: NodeDataDir,
177        trusted_peers_only: bool,
178        auto_db_checkpoints: Option<PathBuf>,
179        dev: Option<u16>,
180        slipstream_configs: &[PathBuf],
181        signal_handler: Arc<SignalHandler>,
182    ) -> Result<Self> {
183        let client = Arc::new(
184            Client::new(
185                node_ip,
186                rest_ip,
187                rest_rps,
188                account,
189                trusted_peers,
190                genesis,
191                cdn,
192                storage_mode,
193                node_data_dir,
194                trusted_peers_only,
195                dev,
196                slipstream_configs,
197                signal_handler,
198            )
199            .await?,
200        );
201
202        let node = Self::Client(client.clone());
203
204        // Perform automatic ledger checkpoints.
205        if let Some(path) = auto_db_checkpoints {
206            if let Some(handle) = node.perform_auto_checkpoints(path)? {
207                client.handles.lock().push(handle);
208            }
209        }
210
211        Ok(node)
212    }
213
214    /// Initializes a new bootstrap client node.
215    pub async fn new_bootstrap_client(
216        listener_addr: SocketAddr,
217        account: Account<N>,
218        genesis_header: Header<N>,
219        dev: Option<u16>,
220    ) -> Result<Self> {
221        Ok(Self::BootstrapClient(BootstrapClient::new(listener_addr, account, genesis_header, dev).await?))
222    }
223
224    /// Returns the node type.
225    pub fn node_type(&self) -> NodeType {
226        match self {
227            Self::Validator(validator) => validator.node_type(),
228            Self::Prover(prover) => prover.node_type(),
229            Self::Client(client) => client.node_type(),
230            Self::BootstrapClient(_) => NodeType::BootstrapClient,
231        }
232    }
233
234    /// Returns the account private key of the node.
235    pub fn private_key(&self) -> &PrivateKey<N> {
236        match self {
237            Self::Validator(node) => node.private_key(),
238            Self::Prover(node) => node.private_key(),
239            Self::Client(node) => node.private_key(),
240            Self::BootstrapClient(node) => node.private_key(),
241        }
242    }
243
244    /// Returns the account view key of the node.
245    pub fn view_key(&self) -> &ViewKey<N> {
246        match self {
247            Self::Validator(node) => node.view_key(),
248            Self::Prover(node) => node.view_key(),
249            Self::Client(node) => node.view_key(),
250            Self::BootstrapClient(node) => node.view_key(),
251        }
252    }
253
254    /// Returns the account address of the node.
255    pub fn address(&self) -> Address<N> {
256        match self {
257            Self::Validator(node) => node.address(),
258            Self::Prover(node) => node.address(),
259            Self::Client(node) => node.address(),
260            Self::BootstrapClient(node) => node.address(),
261        }
262    }
263
264    /// Returns `true` if the node is in development mode.
265    pub fn is_dev(&self) -> bool {
266        match self {
267            Self::Validator(node) => node.is_dev(),
268            Self::Prover(node) => node.is_dev(),
269            Self::Client(node) => node.is_dev(),
270            Self::BootstrapClient(node) => node.is_dev(),
271        }
272    }
273
274    /// Returns a reference to the underlying peer pool.
275    pub fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
276        match self {
277            Self::Validator(validator) => validator.router().peer_pool(),
278            Self::Prover(prover) => prover.router().peer_pool(),
279            Self::Client(client) => client.router().peer_pool(),
280            Self::BootstrapClient(client) => client.peer_pool(),
281        }
282    }
283
284    /// Get the underlying ledger (if any).
285    pub fn ledger(&self) -> Option<&Ledger<N, ConsensusDB<N>>> {
286        match self {
287            Self::Validator(node) => Some(node.ledger()),
288            Self::Prover(_) => None,
289            Self::Client(node) => Some(node.ledger()),
290            Self::BootstrapClient(_) => None,
291        }
292    }
293
294    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
295    pub fn is_block_synced(&self) -> bool {
296        match self {
297            Self::Validator(node) => node.is_block_synced(),
298            Self::Prover(node) => node.is_block_synced(),
299            Self::Client(node) => node.is_block_synced(),
300            Self::BootstrapClient(_) => true,
301        }
302    }
303
304    /// Returns the number of blocks this node is behind the greatest peer height,
305    /// or `None` if not connected to peers yet.
306    pub fn num_blocks_behind(&self) -> Option<u32> {
307        match self {
308            Self::Validator(node) => node.num_blocks_behind(),
309            Self::Prover(node) => node.num_blocks_behind(),
310            Self::Client(node) => node.num_blocks_behind(),
311            Self::BootstrapClient(_) => Some(0),
312        }
313    }
314
315    /// Calculates the current sync speed in blocks per second.
316    /// Returns None if sync speed cannot be calculated (e.g., not syncing or insufficient data).
317    pub fn get_sync_speed(&self) -> f64 {
318        match self {
319            Self::Validator(node) => node.get_sync_speed(),
320            Self::Prover(node) => node.get_sync_speed(),
321            Self::Client(node) => node.get_sync_speed(),
322            Self::BootstrapClient(_) => 0.0,
323        }
324    }
325
326    /// Shuts down the node.
327    pub async fn shut_down(&self) {
328        match self {
329            Self::Validator(node) => node.shut_down().await,
330            Self::Prover(node) => node.shut_down().await,
331            Self::Client(node) => node.shut_down().await,
332            Self::BootstrapClient(node) => node.shut_down().await,
333        }
334    }
335
336    /// Waits until the node receives a signal.
337    pub async fn wait_for_signals(&self, signal_handler: &SignalHandler) {
338        match self {
339            Self::Validator(node) => node.wait_for_signals(signal_handler).await,
340            Self::Prover(node) => node.wait_for_signals(signal_handler).await,
341            Self::Client(node) => node.wait_for_signals(signal_handler).await,
342            Self::BootstrapClient(node) => node.wait_for_signals(signal_handler).await,
343        }
344    }
345
346    /// Periodically creates automated ledger checkpoints.
347    pub fn perform_auto_checkpoints(&self, auto_checkpoint_path: PathBuf) -> Result<Option<task::JoinHandle<()>>> {
348        // Only perform checkpoints if there's a database involved.
349        let Some(ledger) = self.ledger().cloned() else {
350            return Ok(None);
351        };
352
353        // Ensure that the target path exists as a folder or create it.
354        if !auto_checkpoint_path.exists() {
355            if let Err(e) = fs::create_dir_all(&auto_checkpoint_path) {
356                bail!("Couldn't create the specified path for the automatic ledger checkpoints: {e}");
357            }
358        } else if auto_checkpoint_path.exists() && !auto_checkpoint_path.is_dir() {
359            bail!("The specified path for automatic ledger checkpoints is not a directory");
360        }
361
362        // Spawn a loop that will periodically create the checkpoints.
363        let handle = tokio::spawn(async move {
364            info!("Starting the automatic ledger checkpoint routine...");
365
366            // Prepare some object that will be useful throughout the routine.
367            let startup_height = ledger.vm().block_store().current_block_height();
368            let mut last_checkpoint_height =
369                existing_startup_checkpoint_height(auto_checkpoint_path.as_path(), startup_height);
370            let mut existing_checkpoints = Vec::with_capacity(MAX_AUTO_CHECKPOINTS + 1);
371            let mut block_tree_path = aleo_ledger_dir(N::ID, ledger.vm().block_store().storage_mode());
372            block_tree_path.push("block_tree");
373
374            loop {
375                // A small delay that's smaller than block time. There are technically situations when
376                // blocks can be inserted one after the other more quickly (syncing, multiple blocks in
377                // a Subdag), those are edge cases unlikely to be encountered under normal conditions.
378                tokio::time::sleep(Duration::from_millis(500)).await;
379
380                // Skip if we've already created a checkpoint during this run, and the
381                // number of blocks baked since then is lower than the configured threshold.
382                let current_height = ledger.vm().block_store().current_block_height();
383                if last_checkpoint_height.is_some_and(|checkpoint_height| {
384                    current_height.saturating_sub(checkpoint_height) < CHECKPOINT_BLOCK_FREQUENCY
385                }) {
386                    continue;
387                }
388
389                // Create a checkpoint.
390                let mut checkpoint_path = auto_checkpoint_path.clone();
391                checkpoint_path.push(format!("checkpoint_{current_height}"));
392                if let Err(e) = ledger.backup_database(&checkpoint_path) {
393                    warn!("Couldn't automatically store a checkpoint at {}: {e}", checkpoint_path.display());
394                    continue;
395                }
396                last_checkpoint_height = Some(current_height);
397
398                // Immediately procure and copy the applicable block tree in the background.
399                let ledger_clone = ledger.clone();
400                let source_block_tree_path = block_tree_path.clone();
401                tokio::spawn(async move {
402                    if let Err(e) = ledger_clone.cache_block_tree() {
403                        warn!("Couldn't cache the block tree for a ledger checkpoint: {e}");
404                        return;
405                    }
406
407                    // Copy the block tree file to the new checkpoint.
408                    checkpoint_path.push("block_tree");
409                    if let Err(e) = fs::copy(source_block_tree_path, checkpoint_path) {
410                        warn!("Couldn't copy the block tree file to a ledger checkpoint: {e}");
411                    }
412                });
413
414                // Count the existing auto checkpoints.
415                existing_checkpoints.clear();
416                let checkpoint_dir = match auto_checkpoint_path.read_dir() {
417                    Ok(dir) => dir,
418                    Err(e) => {
419                        warn!("IO error while accessing the automatic checkpoints: {e}");
420                        continue;
421                    }
422                };
423                for entry in checkpoint_dir {
424                    // Handle possible IO errors.
425                    let entry = match entry {
426                        Ok(entry) => entry,
427                        Err(e) => {
428                            warn!("IO error while counting the automatic checkpoints: {e}");
429                            continue;
430                        }
431                    };
432
433                    // Skip non-directories.
434                    let path = entry.path();
435                    if !path.is_dir() {
436                        continue;
437                    }
438
439                    // Recognize checkpoints by the "checkpoint_height" name.
440                    let file_name = entry.file_name().into_string().unwrap(); // can't fail - we create Unicode filenames
441                    let mut name_iter = file_name.split("_");
442                    if name_iter.next() != Some("checkpoint") {
443                        continue;
444                    }
445                    let Some(height) = name_iter.next() else {
446                        continue;
447                    };
448                    let Ok(height) = u32::from_str(height) else {
449                        continue;
450                    };
451                    existing_checkpoints.push((path, height));
452                }
453                existing_checkpoints.sort_unstable_by_key(|(_, height)| cmp::Reverse(*height));
454
455                // If we have a sufficient number of checkpoints, delete the oldest one(s).
456                let surplus_checkpoints = existing_checkpoints.len().saturating_sub(MAX_AUTO_CHECKPOINTS);
457                for _ in 0..surplus_checkpoints {
458                    if let Some((checkpoint_path, _)) = existing_checkpoints.pop() {
459                        if let Err(e) = fs::remove_dir_all(checkpoint_path) {
460                            warn!("Couldn't remove an automatic ledger checkpoint: {e}");
461                        }
462                    }
463                }
464            }
465        });
466
467        Ok(Some(handle))
468    }
469}
470
#[cfg(test)]
mod tests {
    use super::existing_startup_checkpoint_height;
    use std::{
        fs,
        path::PathBuf,
        time::{SystemTime, UNIX_EPOCH},
    };

    /// Builds a scratch directory path that belongs to a single test.
    ///
    /// The test-specific `label` and the process id are part of the name, so tests that run
    /// in parallel (or read the same timestamp from a coarse clock) never share a directory.
    fn scratch_dir(label: &str) -> PathBuf {
        let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
        let pid = std::process::id();
        std::env::temp_dir().join(format!("snarkos_checkpoint_seed_test_{label}_{pid}_{nanos}"))
    }

    /// A `checkpoint_{height}` directory for the startup height seeds the last checkpoint height.
    #[test]
    fn seeds_last_checkpoint_height_when_startup_checkpoint_directory_exists() {
        let startup_height = 42;
        let base_path = scratch_dir("existing");
        let checkpoint_path = base_path.join(format!("checkpoint_{startup_height}"));
        fs::create_dir_all(&checkpoint_path).unwrap();

        let seeded_height = existing_startup_checkpoint_height(base_path.as_path(), startup_height);

        // Clean up before asserting, so that a failed assertion doesn't leave the directory behind.
        fs::remove_dir_all(&base_path).unwrap();
        assert_eq!(seeded_height, Some(startup_height));
    }

    /// Without a `checkpoint_{height}` directory for the startup height, nothing is seeded.
    #[test]
    fn does_not_seed_last_checkpoint_height_when_startup_checkpoint_directory_missing() {
        let startup_height = 42;
        let base_path = scratch_dir("missing");
        fs::create_dir_all(&base_path).unwrap();

        let seeded_height = existing_startup_checkpoint_height(base_path.as_path(), startup_height);

        // Clean up before asserting, so that a failed assertion doesn't leave the directory behind.
        fs::remove_dir_all(&base_path).unwrap();
        assert_eq!(seeded_height, None);
    }
}