// snarkos_node/client/mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::CoreLedgerService;
21use snarkos_node_rest::Rest;
22use snarkos_node_router::{
23    Heartbeat,
24    Inbound,
25    Outbound,
26    Router,
27    Routing,
28    messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
29};
30use snarkos_node_sync::{BlockSync, BlockSyncMode};
31use snarkos_node_tcp::{
32    P2P,
33    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
34};
35use snarkvm::{
36    console::network::Network,
37    ledger::{
38        Ledger,
39        block::{Block, Header},
40        puzzle::{Puzzle, Solution, SolutionID},
41        store::ConsensusStorage,
42    },
43    prelude::block::Transaction,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use core::future::Future;
49#[cfg(feature = "locktick")]
50use locktick::parking_lot::Mutex;
51use lru::LruCache;
52#[cfg(not(feature = "locktick"))]
53use parking_lot::Mutex;
54use std::{
55    net::SocketAddr,
56    num::NonZeroUsize,
57    sync::{
58        Arc,
59        atomic::{
60            AtomicBool,
61            AtomicUsize,
62            Ordering::{Acquire, Relaxed},
63        },
64    },
65    time::Duration,
66};
67use tokio::{task::JoinHandle, time::sleep};
68
/// The maximum number of deployments to verify in parallel.
/// Note: worst case memory to verify a deployment (MAX_DEPLOYMENT_CONSTRAINTS = 1 << 20) is ~2 GiB.
const MAX_PARALLEL_DEPLOY_VERIFICATIONS: usize = 5;
/// The maximum number of executions to verify in parallel.
/// Note: worst case memory to verify an execution is 0.01 GiB.
const MAX_PARALLEL_EXECUTE_VERIFICATIONS: usize = 1000;
/// The maximum number of solutions to verify in parallel.
/// Note: worst case memory to verify a solution is 0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity for storing unconfirmed deployments.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
/// Oldest entries are evicted (LRU) once the queue is full.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity for storing unconfirmed executions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
/// Oldest entries are evicted (LRU) once the queue is full.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity for storing unconfirmed solutions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
/// Oldest entries are evicted (LRU) once the queue is full.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;

/// Transaction details needed for propagation:
/// (source peer, serialized message for re-broadcast, deserialized transaction).
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation:
/// (source peer, serialized message for re-broadcast, deserialized solution).
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
94
/// A client node is a full node, capable of querying with the network.
///
/// Cloning a `Client` is cheap: all shared state (queues, counters, handles,
/// shutdown flag, sync module) is behind `Arc`s, so clones observe the same state.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node (present only when a REST IP was configured).
    rest: Option<Rest<N, C, Self>>,
    /// The sync module.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle.
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue (bounded LRU, capacity `CAPACITY_FOR_SOLUTIONS`).
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue (bounded LRU, capacity `CAPACITY_FOR_DEPLOYMENTS`).
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue (bounded LRU, capacity `CAPACITY_FOR_EXECUTIONS`).
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The amount of solutions currently being verified (capped at `MAX_PARALLEL_SOLUTION_VERIFICATIONS`).
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The amount of deployments currently being verified (capped at `MAX_PARALLEL_DEPLOY_VERIFICATIONS`).
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The amount of executions currently being verified (capped at `MAX_PARALLEL_EXECUTE_VERIFICATIONS`).
    num_verifying_executions: Arc<AtomicUsize>,
    /// The spawned handles, aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal, polled by the background loops.
    shutdown: Arc<AtomicBool>,
}
127
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
    ///
    /// Loads the ledger from `storage_mode`, optionally fast-syncs it from a CDN,
    /// starts the router and block-sync module, spawns the background verification
    /// loops, and (if `rest_ip` is provided) starts the REST server.
    ///
    /// # Errors
    /// Returns an error if the ledger fails to load, the CDN sync fails,
    /// the router fails to initialize, or the REST server fails to start.
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<String>,
        storage_mode: StorageMode,
        rotate_external_peers: bool,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler.
        // The fully-constructed node is handed to it at the end of this function.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger.
        let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;

        // Initialize the CDN.
        if let Some(base_url) = cdn {
            // Sync the ledger with the CDN.
            if let Err((_, error)) =
                snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
            {
                // Log storage-cleanup guidance before bailing out, then surface the CDN error.
                crate::log_clean_error(&storage_mode);
                return Err(error);
            }
        }

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
        // Determine if the client should allow external peers.
        // Clients always accept external peers (unlike more restrictive node types).
        let allow_external_peers = true;

        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            rotate_external_peers,
            allow_external_peers,
            // Development mode relaxes peer checks for local networks.
            matches!(storage_mode, StorageMode::Development(_)),
        )
        .await?;

        // Initialize the sync module.
        // Router mode: blocks are requested via the router's peer connections.
        let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: Arc::new(sync),
            genesis,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            shutdown,
        };

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone())).await?);
        }
        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the sync module.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler.
        let _ = signal_node.set(node.clone());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server, if one was started.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
}
231
232impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
233    /// Initializes the sync pool.
234    fn initialize_sync(&self) {
235        // Start the sync loop.
236        let node = self.clone();
237        self.handles.lock().push(tokio::spawn(async move {
238            loop {
239                // If the Ctrl-C handler registered the signal, stop the node.
240                if node.shutdown.load(std::sync::atomic::Ordering::Acquire) {
241                    info!("Shutting down block production");
242                    break;
243                }
244
245                // Sleep briefly to avoid triggering spam detection.
246                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
247                // Perform the sync routine.
248                node.sync.try_block_sync(&node).await;
249            }
250        }));
251    }
252
253    /// Initializes solution verification.
254    fn initialize_solution_verification(&self) {
255        // Start the solution verification loop.
256        let node = self.clone();
257        self.handles.lock().push(tokio::spawn(async move {
258            loop {
259                // If the Ctrl-C handler registered the signal, stop the node.
260                if node.shutdown.load(Acquire) {
261                    info!("Shutting down solution verification");
262                    break;
263                }
264
265                // Determine if the queue contains txs to verify.
266                let queue_is_empty = node.solution_queue.lock().is_empty();
267                // Determine if our verification counter has space to verify new solutions.
268                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
269
270                // Sleep to allow the queue to be filled or solutions to be validated.
271                if queue_is_empty || counter_is_full {
272                    sleep(Duration::from_millis(50)).await;
273                    continue;
274                }
275
276                // Try to verify solutions.
277                let mut solution_queue = node.solution_queue.lock();
278                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
279                    // Increment the verification counter.
280                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
281                    let _node = node.clone();
282                    // For each solution, spawn a task to verify it.
283                    tokio::task::spawn_blocking(move || {
284                        // Retrieve the latest epoch hash.
285                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
286                            // Retrieve the latest proof target.
287                            let proof_target = _node.ledger.latest_block().header().proof_target();
288                            // Ensure that the solution is valid for the given epoch.
289                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
290
291                            match is_valid {
292                                // If the solution is valid, propagate the `UnconfirmedSolution`.
293                                Ok(()) => {
294                                    let message = Message::UnconfirmedSolution(serialized);
295                                    // Propagate the "UnconfirmedSolution".
296                                    _node.propagate(message, &[peer_ip]);
297                                }
298                                // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
299                                Err(error) => {
300                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
301                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
302                                    }
303                                }
304                            }
305                        } else {
306                            warn!("Failed to retrieve the latest epoch hash.");
307                        }
308                        // Decrement the verification counter.
309                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
310                    });
311                    // If we are already at capacity, don't verify more solutions.
312                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
313                        break;
314                    }
315                }
316            }
317        }));
318    }
319
320    /// Initializes deploy verification.
321    fn initialize_deploy_verification(&self) {
322        // Start the deploy verification loop.
323        let node = self.clone();
324        self.handles.lock().push(tokio::spawn(async move {
325            loop {
326                // If the Ctrl-C handler registered the signal, stop the node.
327                if node.shutdown.load(Acquire) {
328                    info!("Shutting down deployment verification");
329                    break;
330                }
331
332                // Determine if the queue contains txs to verify.
333                let queue_is_empty = node.deploy_queue.lock().is_empty();
334                // Determine if our verification counter has space to verify new txs.
335                let counter_is_full = node.num_verifying_deploys.load(Acquire) >= MAX_PARALLEL_DEPLOY_VERIFICATIONS;
336
337                // Sleep to allow the queue to be filled or transactions to be validated.
338                if queue_is_empty || counter_is_full {
339                    sleep(Duration::from_millis(50)).await;
340                    continue;
341                }
342
343                // Try to verify deployments.
344                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
345                    // Increment the verification counter.
346                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
347                    let _node = node.clone();
348                    // For each deployment, spawn a task to verify it.
349                    tokio::task::spawn_blocking(move || {
350                        // Check the deployment.
351                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
352                            Ok(_) => {
353                                // Propagate the `UnconfirmedTransaction`.
354                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
355                            }
356                            Err(error) => {
357                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
358                            }
359                        }
360                        // Decrement the verification counter.
361                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
362                    });
363                    // If we are already at capacity, don't verify more deployments.
364                    if previous_counter + 1 >= MAX_PARALLEL_DEPLOY_VERIFICATIONS {
365                        break;
366                    }
367                }
368            }
369        }));
370    }
371
372    /// Initializes execute verification.
373    fn initialize_execute_verification(&self) {
374        // Start the execute verification loop.
375        let node = self.clone();
376        self.handles.lock().push(tokio::spawn(async move {
377            loop {
378                // If the Ctrl-C handler registered the signal, stop the node.
379                if node.shutdown.load(Acquire) {
380                    info!("Shutting down execution verification");
381                    break;
382                }
383
384                // Determine if the queue contains txs to verify.
385                let queue_is_empty = node.execute_queue.lock().is_empty();
386                // Determine if our verification counter has space to verify new txs.
387                let counter_is_full = node.num_verifying_executions.load(Acquire) >= MAX_PARALLEL_EXECUTE_VERIFICATIONS;
388
389                // Sleep to allow the queue to be filled or transactions to be validated.
390                if queue_is_empty || counter_is_full {
391                    sleep(Duration::from_millis(50)).await;
392                    continue;
393                }
394
395                // Try to verify executions.
396                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
397                    // Increment the verification counter.
398                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
399                    let _node = node.clone();
400                    // For each execution, spawn a task to verify it.
401                    tokio::task::spawn_blocking(move || {
402                        // Check the execution.
403                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
404                            Ok(_) => {
405                                // Propagate the `UnconfirmedTransaction`.
406                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
407                            }
408                            Err(error) => {
409                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
410                            }
411                        }
412                        // Decrement the verification counter.
413                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
414                    });
415                    // If we are already at capacity, don't verify more executions.
416                    if previous_counter + 1 >= MAX_PARALLEL_EXECUTE_VERIFICATIONS {
417                        break;
418                    }
419                }
420            }
421        }));
422    }
423
424    /// Spawns a task with the given future; it should only be used for long-running tasks.
425    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
426        self.handles.lock().push(tokio::spawn(future));
427    }
428}
429
430#[async_trait]
431impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
432    /// Shuts down the node.
433    async fn shut_down(&self) {
434        info!("Shutting down...");
435
436        // Shut down the node.
437        trace!("Shutting down the node...");
438        self.shutdown.store(true, std::sync::atomic::Ordering::Release);
439
440        // Abort the tasks.
441        trace!("Shutting down the validator...");
442        self.handles.lock().iter().for_each(|handle| handle.abort());
443
444        // Shut down the router.
445        self.router.shut_down().await;
446
447        info!("Node has shut down.");
448    }
449}