Skip to main content

snarkos_node/client/
mod.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::{
19    bft::{helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking},
20    cdn::CdnBlockSync,
21    traits::NodeInterface,
22};
23
24use snarkos_account::Account;
25use snarkos_node_network::{ConnectionMode, NodeType};
26use snarkos_node_rest::Rest;
27use snarkos_node_router::{
28    Heartbeat,
29    Inbound,
30    Outbound,
31    Router,
32    Routing,
33    messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
34};
35use snarkos_node_sync::{BlockSync, Ping};
36use snarkos_node_tcp::{
37    P2P,
38    protocols::{Disconnect, Handshake, OnConnect, Reading},
39};
40use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
41
42use snarkvm::{
43    console::network::Network,
44    ledger::{
45        Ledger,
46        block::{Block, Header},
47        puzzle::{Puzzle, Solution, SolutionID},
48        store::ConsensusStorage,
49    },
50    prelude::{VM, block::Transaction},
51};
52
53use aleo_std::StorageMode;
54use anyhow::{Context, Result};
55use core::future::Future;
56#[cfg(feature = "locktick")]
57use locktick::parking_lot::Mutex;
58use lru::LruCache;
59#[cfg(not(feature = "locktick"))]
60use parking_lot::Mutex;
61use std::{
62    net::SocketAddr,
63    num::NonZeroUsize,
64    sync::{
65        Arc,
66        atomic::{
67            AtomicUsize,
68            Ordering::{Acquire, Relaxed},
69        },
70    },
71    time::Duration,
72};
73use tokio::{
74    task::JoinHandle,
75    time::{sleep, timeout},
76};
77
/// Upper bound on the number of solutions that are verified concurrently.
/// Note: verifying a single solution may need up to 0.5 GiB of memory in the worst case.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// Number of unconfirmed deployments the inbound queue can hold.
/// Note: This bounds the local inbound queue only; it is not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1024;
/// Number of unconfirmed executions the inbound queue can hold.
/// Note: This bounds the local inbound queue only; it is not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1024;
/// Number of unconfirmed solutions the inbound queue can hold.
/// Note: This bounds the local inbound queue only; it is not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1024;
90
/// Transaction details needed for propagation: the address bound as `peer_ip` where queue
/// entries are consumed, the `UnconfirmedTransaction` message, and the `Transaction` itself.
/// We preserve the serialized transaction for faster propagation: the stored message is
/// wrapped in `Message::UnconfirmedTransaction` as-is, while the `Transaction` is what gets checked.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation: the address bound as `peer_ip` where queue
/// entries are consumed, the `UnconfirmedSolution` message, and the `Solution` itself.
/// We preserve the serialized solution for faster propagation: the stored message is
/// wrapped in `Message::UnconfirmedSolution` as-is, while the `Solution` is what gets checked.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
97
98/// A client node is a full node, capable of querying with the network.
99#[derive(Clone)]
100pub struct Client<N: Network, C: ConsensusStorage<N>> {
101    /// The ledger of the node.
102    ledger: Ledger<N, C>,
103    /// The router of the node.
104    router: Router<N>,
105    /// The REST server of the node.
106    rest: Option<Rest<N, C, Self>>,
107    /// The block synchronization logic.
108    sync: Arc<BlockSync<N>>,
109    /// The genesis block.
110    genesis: Block<N>,
111    /// The puzzle.
112    puzzle: Puzzle<N>,
113    /// The unconfirmed solutions queue.
114    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
115    /// The unconfirmed deployments queue.
116    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
117    /// The unconfirmed executions queue.
118    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
119    /// The amount of solutions currently being verified.
120    num_verifying_solutions: Arc<AtomicUsize>,
121    /// The amount of deployments currently being verified.
122    num_verifying_deploys: Arc<AtomicUsize>,
123    /// The amount of executions currently being verified.
124    num_verifying_executions: Arc<AtomicUsize>,
125    /// The spawned handles.
126    pub(crate) handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
127    /// Keeps track of sending pings.
128    ping: Arc<Ping<N>>,
129    /// The signal handling logic.
130    signal_handler: Arc<SignalHandler>,
131}
132
133impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
    ///
    /// The steps run in this order: load the ledger, build the router, block sync and ping
    /// logic, optionally start the REST server, optionally wait for CDN sync to finish, and
    /// only then start routing, syncing and the verification loops.
    ///
    /// # Arguments
    ///
    /// * `node_ip` - Address handed to the router.
    /// * `rest_ip` - If `Some`, a REST server is started on this address.
    /// * `rest_rps` - Forwarded to `Rest::start`.
    /// * `account` - Account handed to the router.
    /// * `trusted_peers` / `trusted_peers_only` - Forwarded to the router.
    /// * `genesis` - Genesis block used to load the ledger; also stored on the node.
    /// * `cdn` - If `Some`, a CDN block sync is created with this base URL and awaited.
    /// * `storage_mode` - Used to load the ledger and passed to `log_clean_error` if CDN sync fails.
    /// * `node_data_dir` - Forwarded to the router.
    /// * `dev` - Only `dev.is_some()` is forwarded to the router.
    /// * `_slipstream_configs` - Only read when the `slipstream-plugins` feature is enabled.
    /// * `signal_handler` - Shared with the ledger service and CDN sync, and stored on the node.
    ///
    /// # Errors
    ///
    /// Returns an error if the ledger cannot be loaded, the Slipstream plugin manager cannot be
    /// created (feature-gated), the router fails to initialize, the block locators cannot be
    /// read, the REST server fails to start, or CDN synchronization fails (in which case the
    /// node is shut down before the error is returned).
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<http::Uri>,
        storage_mode: StorageMode,
        node_data_dir: NodeDataDir,
        trusted_peers_only: bool,
        dev: Option<u16>,
        _slipstream_configs: &[std::path::PathBuf],
        signal_handler: Arc<SignalHandler>,
    ) -> Result<Self> {
        // Initialize the ledger.
        // Clones are made because `storage_mode` and `genesis` are both used again below.
        let ledger = {
            let storage_mode = storage_mode.clone();
            let genesis = genesis.clone();

            // NOTE(review): `spawn_blocking!` is a crate macro; presumably it runs the load off
            // the async executor and awaits the result - confirm against its definition.
            spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
        }
        .with_context(|| "Failed to initialize the ledger")?;

        // Initialize the Slipstream plugin manager (if any config files were provided).
        #[cfg(feature = "slipstream-plugins")]
        if !_slipstream_configs.is_empty() {
            let manager =
                snarkvm::slipstream_plugin_manager::SlipstreamPluginManager::from_config_files(_slipstream_configs)
                    .context("Failed to initialize Slipstream plugin manager")?;
            ledger.vm().finalize_store().set_slipstream_plugin_manager(manager);
            // The logged count is the number of config files that were passed in.
            let num_plugins = _slipstream_configs.len();
            tracing::info!(target: "slipstream", "Slipstream plugin manager registered ({num_plugins} plugin(s))");
        }

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone()));
        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            trusted_peers_only,
            node_data_dir.clone(),
            dev.is_some(),
        )
        .await?;

        // Initialize the sync module.
        let sync = Arc::new(BlockSync::new(ledger_service.clone(), ConnectionMode::Router));

        // Set up the ping logic, seeded with the current block locators.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node. The REST server is attached later, so `rest` starts out as `None`.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            signal_handler: signal_handler.clone(),
        };

        // Create the CDN sync (if enabled); its completion is awaited further below.
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
        });

        // Initialize the REST server.
        // This happens before waiting on the CDN sync, and the server is given a handle to it.
        // The node clone passed to the server is taken while `node.rest` is still `None`.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Set up everything else after CDN sync is done.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
                // On failure, log the clean-up hint, shut the node down and surface the error.
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing.
        node.initialize_routing().await;
        // Start the block sync tasks.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop and track its handle so shutdown aborts it.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Return the node.
        Ok(node)
    }
250
    /// Returns a reference to the ledger held by this node.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }
255
    /// Returns a reference to the optional REST server.
    ///
    /// The value is `None` when no REST server has been attached to this node value
    /// (`new` only attaches one when a `rest_ip` is provided).
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
260
    /// Returns a reference to the router held by this node.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
265}
266
267/// Sync-specific code.
268impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// The maximum time to wait for peer updates (or block responses) before timing out and running the sync routine anyway.
    /// This only exists as a fallback for the (unlikely) case that a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
272
273    /// Spawns the tasks that performs the syncing logic for this client.
274    fn initialize_sync(&self) {
275        // Start the block request generation loop (outgoing).
276        let self_ = self.clone();
277        self.spawn(async move {
278            while !self_.signal_handler.is_stopped() {
279                // Wait for peer updates or timeout
280                let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_peer_update()).await;
281
282                // Perform the sync routine.
283                self_.try_issuing_block_requests().await;
284            }
285
286            info!("Stopped block request generation");
287        });
288
289        // Start the block response processing loop (incoming).
290        let self_ = self.clone();
291        self.spawn(async move {
292            while !self_.signal_handler.is_stopped() {
293                // Wait until there is something to do or until the timeout.
294                let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;
295
296                // Perform the sync routine.
297                self_.try_advancing_block_synchronization().await;
298
299                // We perform no additional rate limiting here as
300                // requests are already rate-limited.
301            }
302
303            debug!("Stopped block response processing");
304        });
305    }
306
307    /// Client-side version of [`snarkvm_node_bft::Sync::try_advancing_block_synchronization`].
308    async fn try_advancing_block_synchronization(&self) {
309        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
310            Ok(val) => val,
311            Err(err) => {
312                error!("Block synchronization failed - {err}");
313                return;
314            }
315        };
316
317        // If there are new blocks, we need to update the block locators.
318        if has_new_blocks {
319            match self.sync.get_block_locators() {
320                Ok(locators) => self.ping.update_block_locators(locators),
321                Err(err) => error!("Failed to get block locators: {err}"),
322            }
323        }
324    }
325
326    /// Client-side version of `snarkvm_node_bft::Sync::try_block_sync()`.
327    async fn try_issuing_block_requests(&self) {
328        self.sync.try_issuing_block_requests(self.router()).await;
329    }
330
331    /// Initializes solution verification.
332    fn initialize_solution_verification(&self) {
333        // Start the solution verification loop.
334        let node = self.clone();
335        self.spawn(async move {
336            loop {
337                // If the Ctrl-C handler registered the signal, stop the node.
338                if node.signal_handler.is_stopped() {
339                    info!("Shutting down solution verification");
340                    break;
341                }
342
343                // Determine if the queue contains txs to verify.
344                let queue_is_empty = node.solution_queue.lock().is_empty();
345                // Determine if our verification counter has space to verify new solutions.
346                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
347
348                // Sleep to allow the queue to be filled or solutions to be validated.
349                if queue_is_empty || counter_is_full {
350                    sleep(Duration::from_millis(50)).await;
351                    continue;
352                }
353
354                // Try to verify solutions.
355                let mut solution_queue = node.solution_queue.lock();
356                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
357                    // Increment the verification counter.
358                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
359                    let _node = node.clone();
360                    // For each solution, spawn a task to verify it.
361                    tokio::task::spawn_blocking(move || {
362                        // Retrieve the latest epoch hash.
363                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
364                            // Check if the prover has reached their solution limit.
365                            // While snarkVM will ultimately abort any excess solutions for safety, performing this check
366                            // here prevents the to-be aborted solutions from propagating through the network.
367                            let prover_address = solution.address();
368                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
369                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
370                            }
371                            // Retrieve the latest proof target.
372                            let proof_target = _node.ledger.latest_block().header().proof_target();
373                            // Ensure that the solution is valid for the given epoch.
374                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
375
376                            match is_valid {
377                                // If the solution is valid, propagate the `UnconfirmedSolution`.
378                                Ok(()) => {
379                                    let message = Message::UnconfirmedSolution(serialized);
380                                    // Propagate the "UnconfirmedSolution".
381                                    _node.propagate(message, &[peer_ip]);
382                                }
383                                // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
384                                Err(error) => {
385                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
386                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
387                                    }
388                                }
389                            }
390                        } else {
391                            warn!("Failed to retrieve the latest epoch hash.");
392                        }
393                        // Decrement the verification counter.
394                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
395                    });
396                    // If we are already at capacity, don't verify more solutions.
397                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
398                        break;
399                    }
400                }
401            }
402        });
403    }
404
405    /// Initializes deploy verification.
406    fn initialize_deploy_verification(&self) {
407        // Start the deploy verification loop.
408        let node = self.clone();
409        self.spawn(async move {
410            loop {
411                // If the Ctrl-C handler registered the signal, stop the node.
412                if node.signal_handler.is_stopped() {
413                    info!("Shutting down deployment verification");
414                    break;
415                }
416
417                // Determine if the queue contains txs to verify.
418                let queue_is_empty = node.deploy_queue.lock().is_empty();
419                // Determine if our verification counter has space to verify new txs.
420                let counter_is_full =
421                    node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;
422
423                // Sleep to allow the queue to be filled or transactions to be validated.
424                if queue_is_empty || counter_is_full {
425                    sleep(Duration::from_millis(50)).await;
426                    continue;
427                }
428
429                // Try to verify deployments.
430                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
431                    // Increment the verification counter.
432                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
433                    let _node = node.clone();
434                    // For each deployment, spawn a task to verify it.
435                    tokio::task::spawn_blocking(move || {
436                        // First collect the state root.
437                        let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
438                            debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
439                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
440                            return;
441                        };
442                        // Check if the state root is in the ledger.
443                        if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
444                            debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
445                            // Propagate the `UnconfirmedTransaction`.
446                            _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
447                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
448                            return;
449                            // Also skip the `check_transaction_basic` call if it is already propagated.
450                        }
451                        // Check the deployment.
452                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::rng()) {
453                            Ok(_) => {
454                                // Propagate the `UnconfirmedTransaction`.
455                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
456                            }
457                            Err(error) => {
458                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
459                            }
460                        }
461                        // Decrement the verification counter.
462                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
463                    });
464                    // If we are already at capacity, don't verify more deployments.
465                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
466                        break;
467                    }
468                }
469            }
470        });
471    }
472
473    /// Initializes execute verification.
474    fn initialize_execute_verification(&self) {
475        // Start the execute verification loop.
476        let node = self.clone();
477        self.spawn(async move {
478            loop {
479                // If the Ctrl-C handler registered the signal, stop the node.
480                if node.signal_handler.is_stopped() {
481                    info!("Shutting down execution verification");
482                    break;
483                }
484
485                // Determine if the queue contains txs to verify.
486                let queue_is_empty = node.execute_queue.lock().is_empty();
487                // Determine if our verification counter has space to verify new txs.
488                let counter_is_full =
489                    node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;
490
491                // Sleep to allow the queue to be filled or transactions to be validated.
492                if queue_is_empty || counter_is_full {
493                    sleep(Duration::from_millis(50)).await;
494                    continue;
495                }
496
497                // Try to verify executions.
498                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
499                    // Increment the verification counter.
500                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
501                    let _node = node.clone();
502                    // For each execution, spawn a task to verify it.
503                    tokio::task::spawn_blocking(move || {
504                        // First collect the state roots.
505                        let state_roots = [
506                            transaction.execution().map(|t| t.global_state_root()),
507                            transaction.fee_transition().map(|t| t.global_state_root()),
508                        ]
509                        .into_iter()
510                        .flatten();
511
512                        for state_root in state_roots {
513                            if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
514                                debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
515                                // Propagate the `UnconfirmedTransaction`.
516                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
517                                _node.num_verifying_executions.fetch_sub(1, Relaxed);
518                                return;
519                                // Also skip the `check_transaction_basic` call if it is already propagated.
520                            }
521                        }
522                        // Check the execution.
523                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::rng()) {
524                            Ok(_) => {
525                                // Propagate the `UnconfirmedTransaction`.
526                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
527                            }
528                            Err(error) => {
529                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
530                            }
531                        }
532                        // Decrement the verification counter.
533                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
534                    });
535                    // If we are already at capacity, don't verify more executions.
536                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
537                        break;
538                    }
539                }
540            }
541        });
542    }
543
544    /// Spawns a task with the given future; it should only be used for long-running tasks.
545    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
546        self.handles.lock().push(tokio::spawn(future));
547    }
548}
549
550#[async_trait]
551impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
552    /// Shuts down the node.
553    async fn shut_down(&self) {
554        info!("Shutting down...");
555
556        // Shut down the node.
557        trace!("Shutting down the node...");
558
559        // Shut down the Slipstream plugin service.
560        #[cfg(feature = "slipstream-plugins")]
561        if let Some(manager) = self.ledger.vm().finalize_store().slipstream_plugin_manager().write().as_mut() {
562            manager.unload();
563        }
564
565        // Shut down the REST instance.
566        if let Some(rest) = &self.rest {
567            trace!("Shutting down the REST server...");
568            rest.shut_down();
569        }
570
571        // Abort the tasks.
572        trace!("Shutting down the client...");
573        self.handles.lock().iter().for_each(|handle| handle.abort());
574
575        // Shut down the router.
576        self.router.shut_down().await;
577
578        info!("Node has shut down.");
579    }
580}