snarkos_node/client/mod.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod router;

use crate::traits::NodeInterface;

use snarkos_account::Account;
use snarkos_node_bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService};
use snarkos_node_cdn::CdnBlockSync;
use snarkos_node_rest::Rest;
use snarkos_node_router::{
    Heartbeat,
    Inbound,
    Outbound,
    Router,
    Routing,
    messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkos_node_tcp::{
    P2P,
    protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use snarkvm::{
    console::network::Network,
    ledger::{
        Ledger,
        block::{Block, Header},
        puzzle::{Puzzle, Solution, SolutionID},
        store::ConsensusStorage,
    },
    prelude::block::Transaction,
};

use aleo_std::StorageMode;
use anyhow::Result;
use core::future::Future;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    net::SocketAddr,
    num::NonZeroUsize,
    sync::{
        Arc,
        atomic::{
            AtomicBool,
            AtomicUsize,
            Ordering::{Acquire, Relaxed},
        },
    },
    time::{Duration, Instant},
};
use tokio::{
    task::JoinHandle,
    time::{sleep, timeout},
};

/// The maximum number of deployments to verify in parallel.
/// Note: the worst-case memory to verify a deployment (MAX_DEPLOYMENT_CONSTRAINTS = 1 << 20) is ~2 GiB.
const MAX_PARALLEL_DEPLOY_VERIFICATIONS: usize = 5;
/// The maximum number of executions to verify in parallel.
/// Note: the worst-case memory to verify an execution is ~0.01 GiB.
const MAX_PARALLEL_EXECUTE_VERIFICATIONS: usize = 1000;
/// The maximum number of solutions to verify in parallel.
/// Note: the worst-case memory to verify a solution is ~0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity for storing unconfirmed deployments.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity for storing unconfirmed executions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity for storing unconfirmed solutions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
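// Taken together, each cap above bounds peak verification memory at roughly the same
// back-of-envelope figure of ~10 GiB: 5 × ~2 GiB for deployments, 1000 × ~0.01 GiB for
// executions, and 20 × ~0.5 GiB for solutions (an estimate derived from the notes above,
// not a measured limit).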

/// Transaction details needed for propagation.
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation.
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);

/// A client node is a full node, capable of querying the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle.
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solutions currently being verified.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployments currently being verified.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of executions currently being verified.
    num_verifying_executions: Arc<AtomicUsize>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal.
    shutdown: Arc<AtomicBool>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
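    ///
    /// A minimal construction sketch (hypothetical values; not a doctest):
    /// ```ignore
    /// let client = Client::<N, C>::new(
    ///     "0.0.0.0:4130".parse()?,           // node_ip
    ///     Some("0.0.0.0:3030".parse()?),     // rest_ip
    ///     10,                                // rest_rps (requests per second)
    ///     account,                           // an `Account<N>`
    ///     &[],                               // trusted_peers
    ///     genesis,                           // the genesis `Block<N>`
    ///     None,                              // CDN base URL (disabled here)
    ///     StorageMode::Production,
    ///     false,                             // rotate_external_peers
    ///     Arc::new(AtomicBool::new(false)),  // shutdown signal
    /// )
    /// .await?;
    /// ```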
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<String>,
        storage_mode: StorageMode,
        rotate_external_peers: bool,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger.
        let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
        // Determine if the client should allow external peers.
        let allow_external_peers = true;

        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            rotate_external_peers,
            allow_external_peers,
            matches!(storage_mode, StorageMode::Development(_)),
        )
        .await?;

        // Initialize the sync module.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));

        // Set up the ping logic.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            shutdown: shutdown.clone(),
        };

        // Initialize the CDN block sync (if enabled).
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown))
        });

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Wait for the CDN sync to complete before setting up everything else.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await {
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the sync module.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler.
        let _ = signal_node.set(node.clone());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    const SYNC_INTERVAL: Duration = Duration::from_secs(5);

    /// Spawns the task that performs the syncing logic for this client.
    fn initialize_sync(&self) {
        // Start the sync loop.
        let _self = self.clone();
        let mut last_update = Instant::now();

        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if _self.shutdown.load(Acquire) {
                    info!("Shutting down block sync");
                    break;
                }

                // Make sure we do not sync too often.
                let now = Instant::now();
                let elapsed = now.saturating_duration_since(last_update);
                let sleep_time = Self::SYNC_INTERVAL.saturating_sub(elapsed);

                if !sleep_time.is_zero() {
                    sleep(sleep_time).await;
                }

                // Perform the sync routine.
                _self.try_block_sync().await;
                last_update = now;
            }
        }));
    }

    /// Client-side version of `snarkos_node_bft::Sync::try_block_sync()`.
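    ///
    /// In order, this: waits (bounded by `SYNC_INTERVAL`) for a sync update, reconciles the
    /// sync height with the ledger height, re-issues any timed-out block requests, and then
    /// either sends new block requests or tries to advance the ledger from pending responses.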
    async fn try_block_sync(&self) {
        // Wait for a sync update (bounded by the sync interval); this also throttles the loop
        // to avoid triggering spam detection.
        let _ = timeout(Self::SYNC_INTERVAL, self.sync.wait_for_update()).await;

        // For sanity, check that the sync height is never below the ledger height.
        // (If the ledger height is lower than or equal to the current sync height, this is a no-op.)
        self.sync.set_sync_height(self.ledger.latest_height());

        let new_requests = self.sync.handle_block_request_timeouts(self);
        if let Some((block_requests, sync_peers)) = new_requests {
            self.send_block_requests(block_requests, sync_peers).await;
        }

        // Do not attempt to sync if there are no blocks to sync.
        // This prevents redundant log messages and unnecessary computation.
        if !self.sync.can_block_sync() {
            return;
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (block_requests, sync_peers) = self.sync.prepare_block_requests();

        // If there are no block requests, but there are pending block responses in the sync pool,
        // then try to advance the ledger using these pending block responses.
        if block_requests.is_empty() && self.sync.has_pending_responses() {
            // Try to advance the ledger with the sync pool.
            trace!("No block requests to send. Will process pending responses.");
            let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
                Ok(val) => val,
                Err(err) => {
                    error!("{err}");
                    return;
                }
            };

            if has_new_blocks {
                match self.sync.get_block_locators() {
                    Ok(locators) => self.ping.update_block_locators(locators),
                    Err(err) => error!("Failed to get block locators: {err}"),
                }
            }
        } else if block_requests.is_empty() {
            let total_requests = self.sync.num_total_block_requests();
            let num_outstanding = self.sync.num_outstanding_block_requests();
            if total_requests > 0 {
                trace!(
                    "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
                );
            } else {
                // This can happen during peer rotation and should not be a warning.
                debug!("Not block synced yet, and there are no outstanding block requests or new block requests to send");
            }
        } else {
            self.send_block_requests(block_requests, sync_peers).await;
        }
    }

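    /// Sends the given block requests to the sync peers in batches of at most
    /// `DataBlocks::MAXIMUM_NUMBER_OF_BLOCKS`, pausing `BLOCK_REQUEST_BATCH_DELAY`
    /// between batches, and stops early if a batch fails to send.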
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        // Issue the block requests in batches.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.sync.send_block_requests(self, &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Initializes solution verification.
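    ///
    /// Solutions are drained from the LRU queue and verified on blocking tasks, with an
    /// atomic counter capping in-flight verifications at `MAX_PARALLEL_SOLUTION_VERIFICATIONS`
    /// to bound memory usage.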
    fn initialize_solution_verification(&self) {
        // Start the solution verification loop.
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down solution verification");
                    break;
                }

                // Determine if the queue contains solutions to verify.
                let queue_is_empty = node.solution_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new solutions.
                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;

                // Sleep to allow the queue to be filled or solutions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify solutions.
                let mut solution_queue = node.solution_queue.lock();
                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each solution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Retrieve the latest epoch hash.
                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
                            // Check if the prover has reached their solution limit.
                            // While snarkVM will ultimately abort any excess solutions for safety, performing this check
                            // here prevents the to-be-aborted solutions from propagating through the network.
                            let prover_address = solution.address();
                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
                                // Decrement the verification counter before skipping this solution.
                                _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                                return;
                            }
                            // Retrieve the latest proof target.
                            let proof_target = _node.ledger.latest_block().header().proof_target();
                            // Ensure that the solution is valid for the given epoch.
                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);

                            match is_valid {
                                // If the solution is valid, propagate the `UnconfirmedSolution`.
                                Ok(()) => {
                                    let message = Message::UnconfirmedSolution(serialized);
                                    // Propagate the `UnconfirmedSolution`.
                                    _node.propagate(message, &[peer_ip]);
                                }
                                // If an error occurs after the first 10 blocks of the epoch, log it at debug level; otherwise, ignore it.
                                Err(error) => {
                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
                                    }
                                }
                            }
                        } else {
                            warn!("Failed to retrieve the latest epoch hash.");
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more solutions.
                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

    /// Initializes deploy verification.
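    ///
    /// Follows the same queue-drain, counter-capped pattern as solution verification,
    /// bounded by `MAX_PARALLEL_DEPLOY_VERIFICATIONS`.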
    fn initialize_deploy_verification(&self) {
        // Start the deploy verification loop.
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down deployment verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.deploy_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full = node.num_verifying_deploys.load(Acquire) >= MAX_PARALLEL_DEPLOY_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify deployments.
                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each deployment, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Check the deployment.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more deployments.
                    if previous_counter + 1 >= MAX_PARALLEL_DEPLOY_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

    /// Initializes execute verification.
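    ///
    /// Follows the same queue-drain, counter-capped pattern as solution verification,
    /// bounded by `MAX_PARALLEL_EXECUTE_VERIFICATIONS`.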
    fn initialize_execute_verification(&self) {
        // Start the execute verification loop.
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down execution verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.execute_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full = node.num_verifying_executions.load(Acquire) >= MAX_PARALLEL_EXECUTE_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify executions.
                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each execution, spawn a task to verify it.
                    tokio::task::spawn_blocking(move || {
                        // Check the execution.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more executions.
                    if previous_counter + 1 >= MAX_PARALLEL_EXECUTE_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
    /// Shuts down the node.
    async fn shut_down(&self) {
        info!("Shutting down...");

        // Shut down the node.
        trace!("Shutting down the node...");
        self.shutdown.store(true, std::sync::atomic::Ordering::Release);

        // Abort the tasks.
        trace!("Shutting down the client...");
        self.handles.lock().iter().for_each(|handle| handle.abort());

        // Shut down the router.
        self.router.shut_down().await;

        info!("Node has shut down.");
    }
}