snarkos_node/client/mod.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod router;

use crate::traits::NodeInterface;

use snarkos_account::Account;
use snarkos_node_bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService};
use snarkos_node_cdn::CdnBlockSync;
use snarkos_node_rest::Rest;
use snarkos_node_router::{
    Heartbeat,
    Inbound,
    Outbound,
    Router,
    Routing,
    messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkos_node_tcp::{
    P2P,
    protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use snarkvm::{
    console::network::Network,
    ledger::{
        Ledger,
        block::{Block, Header},
        puzzle::{Puzzle, Solution, SolutionID},
        store::ConsensusStorage,
    },
    prelude::{VM, block::Transaction},
};

use aleo_std::StorageMode;
use anyhow::Result;
use core::future::Future;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    net::SocketAddr,
    num::NonZeroUsize,
    sync::{
        Arc,
        atomic::{
            AtomicBool,
            AtomicUsize,
            Ordering::{Acquire, Relaxed},
        },
    },
    time::{Duration, Instant},
};
use tokio::{
    task::JoinHandle,
    time::{sleep, timeout},
};

/// The maximum number of solutions to verify in parallel.
/// Note: worst case memory to verify a solution is 0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity for storing unconfirmed deployments.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity for storing unconfirmed executions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity for storing unconfirmed solutions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
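// Note: each of the queues sized by these constants is an `lru::LruCache`, so once a queue is at
// capacity, inserting a new entry evicts the least-recently-used one instead of growing unbounded.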

/// Transaction details needed for propagation:
/// the peer that sent it, the original serialized message, and the deserialized transaction.
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation:
/// the peer that sent it, the original serialized message, and the deserialized solution.
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);

/// A client node is a full node, capable of querying the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle.
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solutions currently being verified.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployments currently being verified.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of executions currently being verified.
    num_verifying_executions: Arc<AtomicUsize>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal.
    shutdown: Arc<AtomicBool>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
}
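// Note: `Client` derives `Clone`, and its shared state (the sync module, queues, verification
// counters, task handles, shutdown flag, and ping tracker) is wrapped in `Arc`, so the background
// tasks spawned below clone the node to share that state rather than copy it.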

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<http::Uri>,
        storage_mode: StorageMode,
        rotate_external_peers: bool,
        dev: Option<u16>,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger.
        let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
        // Determine if the client should allow external peers.
        let allow_external_peers = true;

        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            rotate_external_peers,
            allow_external_peers,
            dev.is_some(),
        )
        .await?;

        // Initialize the sync module.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));

        // Set up the ping logic.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            shutdown: shutdown.clone(),
        };

        // Prepare the CDN block sync (if enabled); the node waits for it to complete below.
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown))
        });

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Set up everything else after CDN sync is done.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await {
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the sync loop.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler.
        let _ = signal_node.set(node.clone());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }

    /// Returns the router.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
}

/// Sync-specific code.
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
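    /// The minimum interval between consecutive sync attempts (also used as the wait-for-update timeout).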
    const SYNC_INTERVAL: Duration = Duration::from_secs(5);

    /// Spawns the task that performs the syncing logic for this client.
    fn initialize_sync(&self) {
        // Start the sync loop.
        let _self = self.clone();
        let mut last_update = Instant::now();

        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if _self.shutdown.load(Acquire) {
                    info!("Shutting down block synchronization");
                    break;
                }

                // Make sure we do not sync too often.
                let now = Instant::now();
                let elapsed = now.saturating_duration_since(last_update);
                let sleep_time = Self::SYNC_INTERVAL.saturating_sub(elapsed);

                if !sleep_time.is_zero() {
                    sleep(sleep_time).await;
                }

                // Perform the sync routine.
                _self.try_block_sync().await;
                last_update = now;
            }
        }));
    }

    /// Client-side version of `snarkos_node_bft::Sync::try_block_sync()`.
    async fn try_block_sync(&self) {
        // Wait for a sync update, or time out after the sync interval, to avoid spamming requests.
        let _ = timeout(Self::SYNC_INTERVAL, self.sync.wait_for_update()).await;

        // For sanity, check that sync height is never below ledger height.
        // (if the ledger height is lower than or equal to the current sync height, this is a noop)
        self.sync.set_sync_height(self.ledger.latest_height());

        let new_requests = self.sync.handle_block_request_timeouts(self);
        if let Some((block_requests, sync_peers)) = new_requests {
            self.send_block_requests(block_requests, sync_peers).await;
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and unnecessary computation.
        if !self.sync.can_block_sync() {
            return;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (block_requests, sync_peers) = self.sync.prepare_block_requests();

        // If there are no block requests, but there are pending block responses in the sync pool,
        // then try to advance the ledger using these pending block responses.
        if block_requests.is_empty() && self.sync.has_pending_responses() {
            // Try to advance the ledger with the sync pool.
            trace!("No block requests to send. Will process pending responses.");
            let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
                Ok(val) => val,
                Err(err) => {
                    error!("{err}");
                    return;
                }
            };

            if has_new_blocks {
                match self.sync.get_block_locators() {
                    Ok(locators) => self.ping.update_block_locators(locators),
                    Err(err) => error!("Failed to get block locators: {err}"),
                }
            }
        } else if block_requests.is_empty() {
            let total_requests = self.sync.num_total_block_requests();
            let num_outstanding = self.sync.num_outstanding_block_requests();
            if total_requests > 0 {
                trace!(
                    "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
                );
            } else {
                // This can happen during peer rotation and should not be a warning.
                debug!(
                    "Not block synced yet, and there are no outstanding block requests or \
                     new block requests to send"
                );
            }
        } else {
            self.send_block_requests(block_requests, sync_peers).await;
        }
    }

    /// Sends the given block requests to the given sync peers, in batches.
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        // Issues the block requests in batches.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.sync.send_block_requests(self, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

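    // Note: the three verification loops below (solutions, deployments, executions) follow the
    // same pattern: an LRU queue buffers inbound items, an atomic counter caps how many blocking
    // verification tasks run at once, and verified items are re-propagated to all peers except
    // the one that sent them.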
    /// Initializes solution verification.
    fn initialize_solution_verification(&self) {
        // Start the solution verification loop.
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down solution verification");
                    break;
                }

                // Determine if the queue contains solutions to verify.
                let queue_is_empty = node.solution_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new solutions.
                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;

                // Sleep to allow the queue to be filled or solutions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify solutions.
                let mut solution_queue = node.solution_queue.lock();
                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each solution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Retrieve the latest epoch hash.
                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
                            // Check if the prover has reached their solution limit.
                            // While snarkVM will ultimately abort any excess solutions for safety, performing this check
                            // here prevents the to-be-aborted solutions from propagating through the network.
                            let prover_address = solution.address();
                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
                                // Decrement the verification counter before skipping this solution.
                                _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                                return;
                            }
                            // Retrieve the latest proof target.
                            let proof_target = _node.ledger.latest_block().header().proof_target();
                            // Ensure that the solution is valid for the given epoch.
                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);

                            match is_valid {
                                // If the solution is valid, propagate the `UnconfirmedSolution`.
                                Ok(()) => {
                                    let message = Message::UnconfirmedSolution(serialized);
                                    // Propagate the `UnconfirmedSolution` to all peers except the sender.
                                    _node.propagate(message, &[peer_ip]);
                                }
                                // If an error occurs after the first 10 blocks of the epoch, log it; otherwise ignore it.
                                Err(error) => {
                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
                                    }
                                }
                            }
                        } else {
                            warn!("Failed to retrieve the latest epoch hash.");
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more solutions.
                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

    /// Initializes deploy verification.
    fn initialize_deploy_verification(&self) {
        // Start the deploy verification loop.
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down deployment verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.deploy_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full =
                    node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify deployments.
                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each deployment, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Check the deployment.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more deployments.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

    /// Initializes execute verification.
    fn initialize_execute_verification(&self) {
        // Start the execute verification loop.
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down execution verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.execute_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full =
                    node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify executions.
                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each execution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Check the execution.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more executions.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
    /// Shuts down the node.
    async fn shut_down(&self) {
        info!("Shutting down...");

        // Shut down the node.
        trace!("Shutting down the node...");
        self.shutdown.store(true, std::sync::atomic::Ordering::Release);

        // Abort the tasks.
        trace!("Shutting down the client...");
        self.handles.lock().iter().for_each(|handle| handle.abort());

        // Shut down the router.
        self.router.shut_down().await;

        info!("Node has shut down.");
    }
}