snarkos_node/client/mod.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

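//! The client node: a full node that maintains a ledger, connects to peers via the
//! router, optionally serves a REST API, synchronizes blocks from peers (and an
//! optional CDN), and verifies and propagates unconfirmed solutions, deployments,
//! and executions.
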
mod router;

use crate::traits::NodeInterface;

use snarkos_account::Account;
use snarkos_node_bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService};
use snarkos_node_cdn::CdnBlockSync;
use snarkos_node_network::NodeType;
use snarkos_node_rest::Rest;
use snarkos_node_router::{
    Heartbeat,
    Inbound,
    Outbound,
    Router,
    Routing,
    messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkos_node_tcp::{
    P2P,
    protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use snarkvm::{
    console::network::Network,
    ledger::{
        Ledger,
        block::{Block, Header},
        puzzle::{Puzzle, Solution, SolutionID},
        store::ConsensusStorage,
    },
    prelude::{VM, block::Transaction},
    utilities::log_error,
};

use aleo_std::StorageMode;
use anyhow::Result;
use core::future::Future;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    net::SocketAddr,
    num::NonZeroUsize,
    sync::{
        Arc,
        atomic::{
            AtomicBool,
            AtomicUsize,
            Ordering::{Acquire, Relaxed},
        },
    },
    time::Duration,
};
use tokio::{
    task::JoinHandle,
    time::{sleep, timeout},
};

/// The maximum number of solutions to verify in parallel.
/// Note: worst case memory to verify a solution is 0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity for storing unconfirmed deployments.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity for storing unconfirmed executions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity for storing unconfirmed solutions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;

/// Transaction details needed for propagation.
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation.
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);

/// A client node is a full node, capable of querying the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle.
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solutions currently being verified.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployments currently being verified.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of executions currently being verified.
    num_verifying_executions: Arc<AtomicUsize>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal.
    shutdown: Arc<AtomicBool>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
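    ///
    /// # Example
    ///
    /// A minimal construction sketch (not compiled as a doc-test). The network and
    /// storage types (`CurrentNetwork`, `ConsensusDB`) and the `account`, `genesis`,
    /// and `storage_mode` values are assumptions for illustration only:
    ///
    /// ```ignore
    /// let client = Client::<CurrentNetwork, ConsensusDB<CurrentNetwork>>::new(
    ///     "0.0.0.0:4130".parse()?,       // node_ip
    ///     Some("0.0.0.0:3030".parse()?), // rest_ip
    ///     10,                            // rest_rps
    ///     account,                       // snarkos_account::Account
    ///     &[],                           // trusted_peers
    ///     genesis,                       // genesis block
    ///     None,                          // cdn
    ///     storage_mode,                  // aleo_std::StorageMode
    ///     false,                         // trusted_peers_only
    ///     None,                          // dev
    ///     Arc::new(AtomicBool::new(false)),
    /// )
    /// .await?;
    /// ```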
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<http::Uri>,
        storage_mode: StorageMode,
        trusted_peers_only: bool,
        dev: Option<u16>,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger.
        let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));

        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            trusted_peers_only,
            storage_mode.clone(),
            dev.is_some(),
        )
        .await?;

        // Initialize the sync module.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));

        // Set up the ping logic.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            shutdown: shutdown.clone(),
        };

        // Perform sync with CDN (if enabled).
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown))
        });

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Set up everything else after CDN sync is done.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await {
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the sync module.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler.
        let _ = signal_node.set(node.clone());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }

    /// Returns the router.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
}

/// Sync-specific code.
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Spawns the tasks that perform the syncing logic for this client.
    fn initialize_sync(&self) {
        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.shutdown.load(std::sync::atomic::Ordering::Acquire) {
                // Perform the sync routine.
                self_.try_issuing_block_requests().await;

                // Rate limiting happens in [`Self::send_block_requests`], so no additional sleep is needed here.
            }

            info!("Stopped block request generation");
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.shutdown.load(std::sync::atomic::Ordering::Acquire) {
                // Wait until there is something to do or until the timeout.
                let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;

                // Perform the sync routine.
                self_.try_advancing_block_synchronization().await;

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }

            debug!("Stopped block response processing");
        });
    }

    /// Client-side version of [`snarkos_node_bft::Sync::try_advancing_block_synchronization`].
    async fn try_advancing_block_synchronization(&self) {
        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
            Ok(val) => val,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                return;
            }
        };

        // If there are new blocks, we need to update the block locators.
        if has_new_blocks {
            match self.sync.get_block_locators() {
                Ok(locators) => self.ping.update_block_locators(locators),
                Err(err) => error!("Failed to get block locators: {err}"),
            }
        }
    }

    /// Client-side version of `snarkos_node_bft::Sync::try_block_sync()`.
    async fn try_issuing_block_requests(&self) {
        // Wait for peer updates or timeout.
        let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await;

        // For sanity, check that the sync height is never below the ledger height.
        // (If the ledger height is lower than or equal to the current sync height, this is a no-op.)
        self.sync.set_sync_height(self.ledger.latest_height());

        match self.sync.handle_block_request_timeouts(&self.router) {
            Ok(Some((requests, sync_peers))) => {
                // Re-request blocks instead of performing regular block sync.
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                // Abort and retry later.
                log_error(&err);
                return;
            }
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and unnecessary computation.
        if !self.sync.can_block_sync() {
            trace!("Nothing to sync. Will not issue new block requests");
            return;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (block_requests, sync_peers) = self.sync.prepare_block_requests();

        // If there are no new block requests, log the current sync status instead.
        if block_requests.is_empty() {
            let total_requests = self.sync.num_total_block_requests();
            let num_outstanding = self.sync.num_outstanding_block_requests();
            if total_requests > 0 {
                trace!(
                    "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
                );
            } else {
                // This can happen during peer rotation and should not be a warning.
                debug!(
                    "Not block synced yet, and there are no outstanding block requests or \
                 new block requests to send"
                );
            }
        } else {
            self.send_block_requests(block_requests, sync_peers).await;
        }
    }

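    /// Sends the given block requests to the selected sync peers in batches,
    /// sleeping between batches to avoid triggering spam detection.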
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        // Issues the block requests in batches.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Initializes solution verification.
    fn initialize_solution_verification(&self) {
        // Start the solution verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down solution verification");
                    break;
                }

                // Determine if the queue contains solutions to verify.
                let queue_is_empty = node.solution_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new solutions.
                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;

                // Sleep to allow the queue to be filled or solutions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify solutions.
                let mut solution_queue = node.solution_queue.lock();
                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each solution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Retrieve the latest epoch hash.
                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
                            // Check if the prover has reached their solution limit.
                            // While snarkVM will ultimately abort any excess solutions for safety, performing this check
                            // here prevents the to-be-aborted solutions from propagating through the network.
                            let prover_address = solution.address();
                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
                                // Skip this solution: decrement the verification counter and return without propagating.
                                _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                                return;
                            }
                            // Retrieve the latest proof target.
                            let proof_target = _node.ledger.latest_block().header().proof_target();
                            // Ensure that the solution is valid for the given epoch.
                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);

                            match is_valid {
                                // If the solution is valid, propagate the `UnconfirmedSolution`.
                                Ok(()) => {
                                    let message = Message::UnconfirmedSolution(serialized);
                                    // Propagate the "UnconfirmedSolution".
                                    _node.propagate(message, &[peer_ip]);
                                }
                                // If an error occurs after the first 10 blocks of the epoch, log it at the debug level; otherwise ignore it.
                                Err(error) => {
                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
                                    }
                                }
                            }
                        } else {
                            warn!("Failed to retrieve the latest epoch hash.");
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more solutions.
                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Initializes deploy verification.
    fn initialize_deploy_verification(&self) {
        // Start the deploy verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down deployment verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.deploy_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full =
                    node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify deployments.
                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each deployment, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // First collect the state root.
                        let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
                            debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            return;
                        };
                        // Check if the state root is in the ledger.
                        if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                            debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
                            // Propagate the `UnconfirmedTransaction`.
                            _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            return;
                            // Returning here also skips the `check_transaction_basic` call, since the transaction has already been propagated.
                        }
                        // Check the deployment.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more deployments.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Initializes execute verification.
    fn initialize_execute_verification(&self) {
        // Start the execute verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down execution verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.execute_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full =
                    node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify executions.
                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each execution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // First collect the state roots.
                        let state_roots = [
                            transaction.execution().map(|t| t.global_state_root()),
                            transaction.fee_transition().map(|t| t.global_state_root()),
                        ]
                        .into_iter()
                        .flatten();

                        for state_root in state_roots {
                            if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                                debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                                _node.num_verifying_executions.fetch_sub(1, Relaxed);
                                return;
                                // Returning here also skips the `check_transaction_basic` call, since the transaction has already been propagated.
                            }
                        }
                        // Check the execution.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more executions.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
    /// Shuts down the node.
    async fn shut_down(&self) {
        info!("Shutting down...");

        // Shut down the node.
        trace!("Shutting down the node...");
        self.shutdown.store(true, std::sync::atomic::Ordering::Release);

        // Abort the tasks.
        trace!("Shutting down the client...");
        self.handles.lock().iter().for_each(|handle| handle.abort());

        // Shut down the router.
        self.router.shut_down().await;

        info!("Node has shut down.");
    }
}