// snarkos_node/client/mod.rs
// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
16mod router;
17
18use crate::{
19    bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking},
20    cdn::CdnBlockSync,
21    traits::NodeInterface,
22};
23
24use snarkos_account::Account;
25use snarkos_node_network::NodeType;
26use snarkos_node_rest::Rest;
27use snarkos_node_router::{
28    Heartbeat,
29    Inbound,
30    Outbound,
31    Router,
32    Routing,
33    messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
34};
35use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
36use snarkos_node_tcp::{
37    P2P,
38    protocols::{Disconnect, Handshake, OnConnect, Reading},
39};
40use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
41
42use snarkvm::{
43    console::network::Network,
44    ledger::{
45        Ledger,
46        block::{Block, Header},
47        puzzle::{Puzzle, Solution, SolutionID},
48        store::ConsensusStorage,
49    },
50    prelude::{VM, block::Transaction},
51    utilities::flatten_error,
52};
53
54use aleo_std::StorageMode;
55use anyhow::{Context, Result};
56use core::future::Future;
57use indexmap::IndexMap;
58#[cfg(feature = "locktick")]
59use locktick::parking_lot::Mutex;
60use lru::LruCache;
61#[cfg(not(feature = "locktick"))]
62use parking_lot::Mutex;
63use std::{
64    net::SocketAddr,
65    num::NonZeroUsize,
66    sync::{
67        Arc,
68        atomic::{
69            AtomicUsize,
70            Ordering::{Acquire, Relaxed},
71        },
72    },
73    time::Duration,
74};
75use tokio::{
76    task::JoinHandle,
77    time::{sleep, timeout},
78};
79
/// The maximum number of solutions to verify in parallel.
/// Note: worst case memory to verify a solution is 0.5 GiB.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity (1024 entries) for storing unconfirmed deployments.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity (1024 entries) for storing unconfirmed executions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity (1024 entries) for storing unconfirmed solutions.
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;

/// Transaction details needed for propagation:
/// (the peer that sent it, the still-serialized message, the deserialized transaction).
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// Solution details needed for propagation:
/// (the peer that sent it, the still-serialized message, the deserialized solution).
/// We preserve the serialized solution for faster propagation.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
99
/// A client node is a full node, capable of querying with the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node (only present when a REST IP was configured).
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle (used to verify incoming solutions).
    puzzle: Puzzle<N>,
    /// The unconfirmed solutions queue, an LRU cache keyed by solution ID.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The unconfirmed deployments queue, an LRU cache keyed by transaction ID.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The unconfirmed executions queue, an LRU cache keyed by transaction ID.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The amount of solutions currently being verified (gates parallel verification).
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The amount of deployments currently being verified (gates parallel verification).
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The amount of executions currently being verified (gates parallel verification).
    num_verifying_executions: Arc<AtomicUsize>,
    /// The spawned handles (aborted on shutdown).
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
    /// The signal handling logic; background loops poll it to know when to stop.
    signal_handler: Arc<SignalHandler>,
}
134
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
    ///
    /// Startup order matters here: the ledger is loaded first, then the router and
    /// sync module are created, the REST server is (optionally) started, the CDN
    /// sync (if enabled) is awaited to completion, and only then are the routing,
    /// sync, and verification background tasks spawned.
    ///
    /// Returns an error if the ledger fails to load, the router fails to bind,
    /// the REST server fails to start, or the CDN sync fails.
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<http::Uri>,
        storage_mode: StorageMode,
        node_data_dir: NodeDataDir,
        trusted_peers_only: bool,
        dev: Option<u16>,
        signal_handler: Arc<SignalHandler>,
    ) -> Result<Self> {
        // Initialize the ledger.
        // Loading is heavy (disk + validation), so it is moved off the async runtime
        // via `spawn_blocking!`; the clones exist only to move owned values into it.
        let ledger = {
            let storage_mode = storage_mode.clone();
            let genesis = genesis.clone();

            spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
        }
        .with_context(|| "Failed to initialize the ledger")?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone()));
        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            trusted_peers_only,
            node_data_dir.clone(),
            dev.is_some(),
        )
        .await?;

        // Initialize the sync module.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));

        // Set up the ping logic.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            signal_handler: signal_handler.clone(),
        };

        // Perform sync with CDN (if enabled).
        // The CDN sync is only constructed here; it is awaited further below so the
        // REST server can be started (and expose progress) in the meantime.
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
        });

        // Initialize the REST server.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Set up everything else after CDN sync is done.
        // On CDN failure the node is shut down cleanly before the error is returned.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing.
        node.initialize_routing().await;
        // Initialize the sync module.
        node.initialize_sync();
        // Initialize solution verification.
        node.initialize_solution_verification();
        // Initialize deployment verification.
        node.initialize_deploy_verification();
        // Initialize execution verification.
        node.initialize_execute_verification();
        // Initialize the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Return the node.
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server, if one was started.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }

    /// Returns the router.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
}
256
/// Sync-specific code.
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// The maximum time to wait for peer updates before timing out and attempting to issue new requests.
    /// This only exists as a fallback for the (unlikely) case a task does not get notified about updates.
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

    /// Spawns the tasks that performs the syncing logic for this client.
    ///
    /// Two independent loops are started: one generates outgoing block requests,
    /// the other processes incoming block responses. Both run until the signal
    /// handler reports a stop.
    fn initialize_sync(&self) {
        // Start the block request generation loop (outgoing).
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.signal_handler.is_stopped() {
                // Perform the sync routine.
                // Rate limiting happens inside: `try_issuing_block_requests` awaits a
                // peer-update notification (or MAX_SYNC_INTERVAL) before doing work.
                self_.try_issuing_block_requests().await;
            }

            info!("Stopped block request generation");
        });

        // Start the block response processing loop (incoming).
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.signal_handler.is_stopped() {
                // Wait until there is something to do or until the timeout.
                let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;

                // Perform the sync routine.
                self_.try_advancing_block_synchronization().await;

                // We perform no additional rate limiting here as
                // requests are already rate-limited.
            }

            debug!("Stopped block response processing");
        });
    }

    /// Client-side version of [`snarkvm_node_bft::Sync::try_advancing_block_synchronization`].
    ///
    /// Attempts to apply pending block responses to the ledger; if any new blocks
    /// were applied, refreshes the block locators advertised via pings.
    async fn try_advancing_block_synchronization(&self) {
        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
            Ok(val) => val,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                return;
            }
        };

        // If there are new blocks, we need to update the block locators.
        if has_new_blocks {
            match self.sync.get_block_locators() {
                Ok(locators) => self.ping.update_block_locators(locators),
                Err(err) => error!("Failed to get block locators: {err}"),
            }
        }
    }

    /// Client-side version of `snarkvm_node_bft::Sync::try_block_sync()`.
    ///
    /// Waits for peer updates (or the fallback timeout), re-issues any timed-out
    /// block requests, advances the ledger with pending responses, and finally
    /// prepares and sends new block requests if syncing is still needed.
    async fn try_issuing_block_requests(&self) {
        // Wait for peer updates or timeout
        let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await;

        // For sanity, check that sync height is never below ledger height.
        // (if the ledger height is lower or equal to the current sync height, this is a noop)
        self.sync.set_sync_height(self.ledger.latest_height());

        match self.sync.handle_block_request_timeouts(&self.router) {
            Ok(Some((requests, sync_peers))) => {
                // Re-request blocks instead of performing regular block sync.
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                // Abort and retry later.
                error!("{}", flatten_error(&err));
                return;
            }
        }

        // Do not attempt to sync if there are not blocks to sync.
        // This prevents redundant log messages and performing unnecessary computation.
        if !self.sync.can_block_sync() {
            trace!("Nothing to sync. Will not issue new block requests");
            return;
        }

        // First, try to advance the ledger with new responses.
        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
            Ok(val) => val,
            Err(err) => {
                error!("{err}");
                return;
            }
        };

        if has_new_blocks {
            match self.sync.get_block_locators() {
                Ok(locators) => self.ping.update_block_locators(locators),
                Err(err) => error!("Failed to get block locators: {err}"),
            }

            // If these were the last blocks to process, do not continue.
            if !self.sync.can_block_sync() {
                return;
            }
        }

        // Prepare the block requests, if any.
        // In the process, we update the state of `is_block_synced` for the sync module.
        let (block_requests, sync_peers) = self.sync.prepare_block_requests();

        // If there are no block requests, but there are pending block responses in the sync pool,
        // then try to advance the ledger using these pending block responses.
        if block_requests.is_empty() {
            let total_requests = self.sync.num_total_block_requests();
            let num_outstanding = self.sync.num_outstanding_block_requests();
            if total_requests > 0 {
                trace!(
                    "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
                );
            } else {
                // This can happen during peer rotation and should not be a warning.
                debug!(
                    "Not block synced yet, and there are no outstanding block requests or \
                 new block requests to send"
                );
            }
        } else {
            self.send_block_requests(block_requests, sync_peers).await;
        }
    }

    /// Sends the given block requests to the given sync peers, in batches of at most
    /// `DataBlocks::MAXIMUM_NUMBER_OF_BLOCKS` requests, sleeping between batches.
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        // Issues the block requests in batches.
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await {
                // Stop if we fail to process a batch of requests.
                break;
            }

            // Sleep to avoid triggering spam detection.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

    /// Initializes solution verification.
    ///
    /// Spawns a loop that drains `solution_queue` and verifies each solution on a
    /// blocking thread, bounded by `MAX_PARALLEL_SOLUTION_VERIFICATIONS` via the
    /// `num_verifying_solutions` counter. Valid solutions are re-propagated.
    fn initialize_solution_verification(&self) {
        // Start the solution verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down solution verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.solution_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new solutions.
                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;

                // Sleep to allow the queue to be filled or solutions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify solutions.
                // NOTE(review): the queue guard is held for the whole inner loop; this is
                // safe here because nothing below is awaited (spawn_blocking is not awaited).
                let mut solution_queue = node.solution_queue.lock();
                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each solution, spawn a task to verify it.
                    // The task always decrements the counter before it exits.
                    tokio::task::spawn_blocking(move || {
                        // Retrieve the latest epoch hash.
                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
                            // Check if the prover has reached their solution limit.
                            // While snarkVM will ultimately abort any excess solutions for safety, performing this check
                            // here prevents the to-be aborted solutions from propagating through the network.
                            // NOTE(review): this branch only logs — it does not skip verification or
                            // propagation of the over-limit solution. Confirm whether an early return
                            // (with a counter decrement) was intended here.
                            let prover_address = solution.address();
                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
                            }
                            // Retrieve the latest proof target.
                            let proof_target = _node.ledger.latest_block().header().proof_target();
                            // Ensure that the solution is valid for the given epoch.
                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);

                            match is_valid {
                                // If the solution is valid, propagate the `UnconfirmedSolution`.
                                Ok(()) => {
                                    let message = Message::UnconfirmedSolution(serialized);
                                    // Propagate the "UnconfirmedSolution".
                                    // The originating peer is excluded from the propagation.
                                    _node.propagate(message, &[peer_ip]);
                                }
                                // If an error occurs after the first 10 blocks of the epoch, log it at debug level, otherwise ignore.
                                // (Solutions racing an epoch boundary are expected to fail briefly.)
                                Err(error) => {
                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
                                    }
                                }
                            }
                        } else {
                            warn!("Failed to retrieve the latest epoch hash.");
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more solutions.
                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Initializes deploy verification.
    ///
    /// Spawns a loop that drains `deploy_queue` and verifies each deployment on a
    /// blocking thread, bounded by `VM::MAX_PARALLEL_DEPLOY_VERIFICATIONS` via the
    /// `num_verifying_deploys` counter. Verified (or unverifiable-but-plausible)
    /// deployments are re-propagated.
    fn initialize_deploy_verification(&self) {
        // Start the deploy verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down deployment verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.deploy_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full =
                    node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify deployments.
                // Unlike the solution loop, the queue lock is re-acquired on each iteration.
                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each deployment, spawn a task to verify it.
                    // Every exit path below decrements the counter exactly once.
                    tokio::task::spawn_blocking(move || {
                        // First collect the state root.
                        let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
                            debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            return;
                        };
                        // Check if the state root is in the ledger.
                        // If it is unknown (e.g. this node is behind), propagate without verifying.
                        if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                            debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
                            // Propagate the `UnconfirmedTransaction`.
                            _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            return;
                            // Also skip the `check_transaction_basic` call if it is already propagated.
                        }
                        // Check the deployment.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more deployments.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Initializes execute verification.
    ///
    /// Spawns a loop that drains `execute_queue` and verifies each execution on a
    /// blocking thread, bounded by `VM::MAX_PARALLEL_EXECUTE_VERIFICATIONS` via the
    /// `num_verifying_executions` counter. Mirrors the deployment loop, but checks
    /// both the execution's and the fee's global state roots.
    fn initialize_execute_verification(&self) {
        // Start the execute verification loop.
        let node = self.clone();
        self.spawn(async move {
            loop {
                // If the Ctrl-C handler registered the signal, stop the node.
                if node.signal_handler.is_stopped() {
                    info!("Shutting down execution verification");
                    break;
                }

                // Determine if the queue contains txs to verify.
                let queue_is_empty = node.execute_queue.lock().is_empty();
                // Determine if our verification counter has space to verify new txs.
                let counter_is_full =
                    node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;

                // Sleep to allow the queue to be filled or transactions to be validated.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Try to verify executions.
                // The queue lock is re-acquired on each iteration.
                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
                    // Increment the verification counter.
                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    // For each execution, spawn a task to verify it.
                    // Every exit path below decrements the counter exactly once.
                    tokio::task::spawn_blocking(move || {
                        // First collect the state roots (execution root and optional fee root).
                        let state_roots = [
                            transaction.execution().map(|t| t.global_state_root()),
                            transaction.fee_transition().map(|t| t.global_state_root()),
                        ]
                        .into_iter()
                        .flatten();

                        // If any state root is unknown (e.g. this node is behind),
                        // propagate without verifying.
                        for state_root in state_roots {
                            if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                                debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                                _node.num_verifying_executions.fetch_sub(1, Relaxed);
                                return;
                                // Also skip the `check_transaction_basic` call if it is already propagated.
                            }
                        }
                        // Check the execution.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the `UnconfirmedTransaction`.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
                            }
                        }
                        // Decrement the verification counter.
                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
                    });
                    // If we are already at capacity, don't verify more executions.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

    /// Spawns a task with the given future; it should only be used for long-running tasks.
    /// The handle is recorded so `shut_down` can abort it.
    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
}
624
625#[async_trait]
626impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
627    /// Shuts down the node.
628    async fn shut_down(&self) {
629        info!("Shutting down...");
630
631        // Shut down the node.
632        trace!("Shutting down the node...");
633
634        // Abort the tasks.
635        trace!("Shutting down the client...");
636        self.handles.lock().iter().for_each(|handle| handle.abort());
637
638        // Shut down the router.
639        self.router.shut_down().await;
640
641        info!("Node has shut down.");
642    }
643}