// snarkos_node/client/mod.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::CoreLedgerService;
21use snarkos_node_rest::Rest;
22use snarkos_node_router::{
23    Heartbeat,
24    Inbound,
25    Outbound,
26    Router,
27    Routing,
28    messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
29};
30use snarkos_node_sync::{BlockSync, BlockSyncMode};
31use snarkos_node_tcp::{
32    P2P,
33    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
34};
35use snarkvm::{
36    console::network::Network,
37    ledger::{
38        Ledger,
39        block::{Block, Header},
40        puzzle::{Puzzle, Solution, SolutionID},
41        store::ConsensusStorage,
42    },
43    prelude::block::Transaction,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use core::future::Future;
49use lru::LruCache;
50use parking_lot::Mutex;
51use std::{
52    net::SocketAddr,
53    num::NonZeroUsize,
54    sync::{
55        Arc,
56        atomic::{
57            AtomicBool,
58            AtomicUsize,
59            Ordering::{Acquire, Relaxed},
60        },
61    },
62    time::Duration,
63};
64use tokio::{task::JoinHandle, time::sleep};
65
/// The maximum number of deployments to verify in parallel.
/// Note: worst case memory to verify a deployment (MAX_DEPLOYMENT_CONSTRAINTS = 1 << 20) is ~2 GiB.
const MAX_PARALLEL_DEPLOY_VERIFICATIONS: usize = 5;
/// The maximum number of executions to verify in parallel.
/// Note: worst case memory to verify an execution is 0.01 GiB.
const MAX_PARALLEL_EXECUTE_VERIFICATIONS: usize = 1000;
/// The maximum number of solutions to verify in parallel.
/// Note: worst case memory to verify a solution is 0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity for storing unconfirmed deployments.
/// Note: This is an inbound LRU queue capacity, not a Narwhal-enforced capacity;
/// once full, the least-recently-used entry is evicted on insert.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity for storing unconfirmed executions.
/// Note: This is an inbound LRU queue capacity, not a Narwhal-enforced capacity;
/// once full, the least-recently-used entry is evicted on insert.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity for storing unconfirmed solutions.
/// Note: This is an inbound LRU queue capacity, not a Narwhal-enforced capacity;
/// once full, the least-recently-used entry is evicted on insert.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;

/// Transaction details needed for propagation:
/// the originating peer, the serialized message, and the deserialized transaction.
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation:
/// the originating peer, the serialized message, and the deserialized solution.
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
91
/// A client node is a full node, capable of querying the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node, handling peer connections and message passing.
    router: Router<N>,
    /// The REST server of the node (`None` when no REST IP was provided).
    rest: Option<Rest<N, C, Self>>,
    /// The sync module, keeping the ledger in sync with peers.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle, used to verify unconfirmed solutions.
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue (LRU, bounded by `CAPACITY_FOR_SOLUTIONS`).
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue (LRU, bounded by `CAPACITY_FOR_DEPLOYMENTS`).
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue (LRU, bounded by `CAPACITY_FOR_EXECUTIONS`).
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solutions currently being verified (capped at `MAX_PARALLEL_SOLUTION_VERIFICATIONS`).
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployments currently being verified (capped at `MAX_PARALLEL_DEPLOY_VERIFICATIONS`).
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of executions currently being verified (capped at `MAX_PARALLEL_EXECUTE_VERIFICATIONS`).
    num_verifying_executions: Arc<AtomicUsize>,
    /// The handles of the spawned background tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal, shared with the signal handler and background loops.
    shutdown: Arc<AtomicBool>,
}
124
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
    ///
    /// Loads the ledger from `storage_mode` using the given `genesis` block,
    /// optionally syncs it from a CDN, initializes the router and (optionally)
    /// the REST server, and spawns the background sync, verification, and
    /// notification tasks. Returns the fully-initialized node.
    ///
    /// - `node_ip`: the address the node listens on for P2P connections.
    /// - `rest_ip`/`rest_rps`: the REST server address and rate limit; no REST
    ///   server is started when `rest_ip` is `None`.
    /// - `account`: the node's account (keys) used by the router.
    /// - `trusted_peers`: peers the router always connects to.
    /// - `cdn`: an optional CDN base URL to fast-sync the ledger from.
    /// - `rotate_external_peers`: whether the router rotates external peers.
    /// - `shutdown`: shared flag observed by all background loops.
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<String>,
        storage_mode: StorageMode,
        rotate_external_peers: bool,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler (registered before any long-running work
        // so Ctrl-C is handled during CDN sync as well).
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger.
        let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;

        // Initialize the CDN.
        if let Some(base_url) = cdn {
            // Sync the ledger with the CDN.
            if let Err((_, error)) =
                snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
            {
                // On failure, log guidance for cleaning the local storage before returning the error.
                crate::log_clean_error(&storage_mode);
                return Err(error);
            }
        }

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
        // Determine if the client should allow external peers.
        let allow_external_peers = true;

        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            rotate_external_peers,
            allow_external_peers,
            // Development mode is enabled iff the storage mode is `Development`.
            matches!(storage_mode, StorageMode::Development(_)),
        )
        .await?;

        // Initialize the sync module (router mode: this node requests blocks over the router's TCP).
        let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: Arc::new(sync),
            genesis,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            shutdown,
        };

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone())).await?);
        }
        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the sync module.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler.
        let _ = signal_node.set(node.clone());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
}
227
228impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
229    /// Initializes the sync pool.
230    fn initialize_sync(&self) {
231        // Start the sync loop.
232        let node = self.clone();
233        self.handles.lock().push(tokio::spawn(async move {
234            loop {
235                // If the Ctrl-C handler registered the signal, stop the node.
236                if node.shutdown.load(std::sync::atomic::Ordering::Acquire) {
237                    info!("Shutting down block production");
238                    break;
239                }
240
241                // Sleep briefly to avoid triggering spam detection.
242                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
243                // Perform the sync routine.
244                node.sync.try_block_sync(&node).await;
245            }
246        }));
247    }
248
249    /// Initializes solution verification.
250    fn initialize_solution_verification(&self) {
251        // Start the solution verification loop.
252        let node = self.clone();
253        self.handles.lock().push(tokio::spawn(async move {
254            loop {
255                // If the Ctrl-C handler registered the signal, stop the node.
256                if node.shutdown.load(Acquire) {
257                    info!("Shutting down solution verification");
258                    break;
259                }
260
261                // Determine if the queue contains txs to verify.
262                let queue_is_empty = node.solution_queue.lock().is_empty();
263                // Determine if our verification counter has space to verify new solutions.
264                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
265
266                // Sleep to allow the queue to be filled or solutions to be validated.
267                if queue_is_empty || counter_is_full {
268                    sleep(Duration::from_millis(50)).await;
269                    continue;
270                }
271
272                // Try to verify solutions.
273                let mut solution_queue = node.solution_queue.lock();
274                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
275                    // Increment the verification counter.
276                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
277                    let _node = node.clone();
278                    // For each solution, spawn a task to verify it.
279                    tokio::task::spawn_blocking(move || {
280                        // Retrieve the latest epoch hash.
281                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
282                            // Retrieve the latest proof target.
283                            let proof_target = _node.ledger.latest_block().header().proof_target();
284                            // Ensure that the solution is valid for the given epoch.
285                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
286
287                            match is_valid {
288                                // If the solution is valid, propagate the `UnconfirmedSolution`.
289                                Ok(()) => {
290                                    let message = Message::UnconfirmedSolution(serialized);
291                                    // Propagate the "UnconfirmedSolution".
292                                    _node.propagate(message, &[peer_ip]);
293                                }
294                                // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
295                                Err(error) => {
296                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
297                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
298                                    }
299                                }
300                            }
301                        } else {
302                            warn!("Failed to retrieve the latest epoch hash.");
303                        }
304                        // Decrement the verification counter.
305                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
306                    });
307                    // If we are already at capacity, don't verify more solutions.
308                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
309                        break;
310                    }
311                }
312            }
313        }));
314    }
315
316    /// Initializes deploy verification.
317    fn initialize_deploy_verification(&self) {
318        // Start the deploy verification loop.
319        let node = self.clone();
320        self.handles.lock().push(tokio::spawn(async move {
321            loop {
322                // If the Ctrl-C handler registered the signal, stop the node.
323                if node.shutdown.load(Acquire) {
324                    info!("Shutting down deployment verification");
325                    break;
326                }
327
328                // Determine if the queue contains txs to verify.
329                let queue_is_empty = node.deploy_queue.lock().is_empty();
330                // Determine if our verification counter has space to verify new txs.
331                let counter_is_full = node.num_verifying_deploys.load(Acquire) >= MAX_PARALLEL_DEPLOY_VERIFICATIONS;
332
333                // Sleep to allow the queue to be filled or transactions to be validated.
334                if queue_is_empty || counter_is_full {
335                    sleep(Duration::from_millis(50)).await;
336                    continue;
337                }
338
339                // Try to verify deployments.
340                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
341                    // Increment the verification counter.
342                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
343                    let _node = node.clone();
344                    // For each deployment, spawn a task to verify it.
345                    tokio::task::spawn_blocking(move || {
346                        // Check the deployment.
347                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
348                            Ok(_) => {
349                                // Propagate the `UnconfirmedTransaction`.
350                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
351                            }
352                            Err(error) => {
353                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
354                            }
355                        }
356                        // Decrement the verification counter.
357                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
358                    });
359                    // If we are already at capacity, don't verify more deployments.
360                    if previous_counter + 1 >= MAX_PARALLEL_DEPLOY_VERIFICATIONS {
361                        break;
362                    }
363                }
364            }
365        }));
366    }
367
368    /// Initializes execute verification.
369    fn initialize_execute_verification(&self) {
370        // Start the execute verification loop.
371        let node = self.clone();
372        self.handles.lock().push(tokio::spawn(async move {
373            loop {
374                // If the Ctrl-C handler registered the signal, stop the node.
375                if node.shutdown.load(Acquire) {
376                    info!("Shutting down execution verification");
377                    break;
378                }
379
380                // Determine if the queue contains txs to verify.
381                let queue_is_empty = node.execute_queue.lock().is_empty();
382                // Determine if our verification counter has space to verify new txs.
383                let counter_is_full = node.num_verifying_executions.load(Acquire) >= MAX_PARALLEL_EXECUTE_VERIFICATIONS;
384
385                // Sleep to allow the queue to be filled or transactions to be validated.
386                if queue_is_empty || counter_is_full {
387                    sleep(Duration::from_millis(50)).await;
388                    continue;
389                }
390
391                // Try to verify executions.
392                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
393                    // Increment the verification counter.
394                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
395                    let _node = node.clone();
396                    // For each execution, spawn a task to verify it.
397                    tokio::task::spawn_blocking(move || {
398                        // Check the execution.
399                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
400                            Ok(_) => {
401                                // Propagate the `UnconfirmedTransaction`.
402                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
403                            }
404                            Err(error) => {
405                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
406                            }
407                        }
408                        // Decrement the verification counter.
409                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
410                    });
411                    // If we are already at capacity, don't verify more executions.
412                    if previous_counter + 1 >= MAX_PARALLEL_EXECUTE_VERIFICATIONS {
413                        break;
414                    }
415                }
416            }
417        }));
418    }
419
420    /// Spawns a task with the given future; it should only be used for long-running tasks.
421    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
422        self.handles.lock().push(tokio::spawn(future));
423    }
424}
425
426#[async_trait]
427impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
428    /// Shuts down the node.
429    async fn shut_down(&self) {
430        info!("Shutting down...");
431
432        // Shut down the node.
433        trace!("Shutting down the node...");
434        self.shutdown.store(true, std::sync::atomic::Ordering::Release);
435
436        // Abort the tasks.
437        trace!("Shutting down the validator...");
438        self.handles.lock().iter().for_each(|handle| handle.abort());
439
440        // Shut down the router.
441        self.router.shut_down().await;
442
443        info!("Node has shut down.");
444    }
445}