snarkos_node_router/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23pub use snarkos_node_router_messages as messages;
24
25mod handshake;
26
27mod heartbeat;
28pub use heartbeat::*;
29
30mod helpers;
31pub use helpers::*;
32
33mod inbound;
34pub use inbound::*;
35
36mod outbound;
37pub use outbound::*;
38
39mod routing;
40pub use routing::*;
41
42use crate::messages::{Message, NodeType};
43use snarkos_account::Account;
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_tcp::{Config, P2P, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip};
46
47use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
48
49use anyhow::{Result, bail};
50#[cfg(feature = "locktick")]
51use locktick::parking_lot::{Mutex, RwLock};
52#[cfg(not(feature = "locktick"))]
53use parking_lot::{Mutex, RwLock};
54#[cfg(not(any(test)))]
55use std::net::IpAddr;
56use std::{
57    collections::{HashMap, HashSet, hash_map::Entry},
58    future::Future,
59    net::SocketAddr,
60    ops::Deref,
61    str::FromStr,
62    sync::Arc,
63    time::Instant,
64};
65use tokio::task::JoinHandle;
66
67#[derive(Clone)]
68pub struct Router<N: Network>(Arc<InnerRouter<N>>);
69
70impl<N: Network> Deref for Router<N> {
71    type Target = Arc<InnerRouter<N>>;
72
73    fn deref(&self) -> &Self::Target {
74        &self.0
75    }
76}
77
78pub struct InnerRouter<N: Network> {
79    /// The TCP stack.
80    tcp: Tcp,
81    /// The node type.
82    node_type: NodeType,
83    /// The account of the node.
84    account: Account<N>,
85    /// The ledger service.
86    ledger: Arc<dyn LedgerService<N>>,
87    /// The cache.
88    cache: Cache<N>,
89    /// The resolver.
90    resolver: Resolver,
91    /// The set of trusted peers.
92    trusted_peers: HashSet<SocketAddr>,
93    /// The map of connected peer IPs to their peer handlers.
94    connected_peers: RwLock<HashMap<SocketAddr, Peer<N>>>,
95    /// The set of handshaking peers. While `Tcp` already recognizes the connecting IP addresses
96    /// and prevents duplicate outbound connection attempts to the same IP address, it is unable to
97    /// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously
98    /// attempt to connect to each other). This set is used to prevent this from happening.
99    connecting_peers: Mutex<HashMap<SocketAddr, Option<Peer<N>>>>,
100    /// The set of candidate peer IPs.
101    candidate_peers: RwLock<HashSet<SocketAddr>>,
102    /// The set of restricted peer IPs.
103    restricted_peers: RwLock<HashMap<SocketAddr, Instant>>,
104    /// The spawned handles.
105    handles: Mutex<Vec<JoinHandle<()>>>,
106    /// If the flag is set, the node will periodically evict more external peers.
107    rotate_external_peers: bool,
108    /// If the flag is set, the node will engage in P2P gossip to request more peers.
109    allow_external_peers: bool,
110    /// The boolean flag for the development mode.
111    is_dev: bool,
112}
113
114impl<N: Network> Router<N> {
115    /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
116    #[cfg(not(any(test)))]
117    const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
118    /// The maximum number of candidate peers permitted to be stored in the node.
119    const MAXIMUM_CANDIDATE_PEERS: usize = 10_000;
120    /// The maximum number of connection failures permitted by an inbound connecting peer.
121    const MAXIMUM_CONNECTION_FAILURES: usize = 5;
122    /// The maximum amount of connection attempts withing a 10 second threshold
123    #[cfg(not(any(test)))]
124    const MAX_CONNECTION_ATTEMPTS: usize = 10;
125    /// The duration in seconds after which a connected peer is considered inactive or
126    /// disconnected if no message has been received in the meantime.
127    const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes
128}
129
130impl<N: Network> Router<N> {
131    /// Initializes a new `Router` instance.
132    #[allow(clippy::too_many_arguments)]
133    pub async fn new(
134        node_ip: SocketAddr,
135        node_type: NodeType,
136        account: Account<N>,
137        ledger: Arc<dyn LedgerService<N>>,
138        trusted_peers: &[SocketAddr],
139        max_peers: u16,
140        rotate_external_peers: bool,
141        allow_external_peers: bool,
142        is_dev: bool,
143    ) -> Result<Self> {
144        // Initialize the TCP stack.
145        let tcp = Tcp::new(Config::new(node_ip, max_peers));
146        // Initialize the router.
147        Ok(Self(Arc::new(InnerRouter {
148            tcp,
149            node_type,
150            account,
151            ledger,
152            cache: Default::default(),
153            resolver: Default::default(),
154            trusted_peers: trusted_peers.iter().copied().collect(),
155            connected_peers: Default::default(),
156            connecting_peers: Default::default(),
157            candidate_peers: Default::default(),
158            restricted_peers: Default::default(),
159            handles: Default::default(),
160            rotate_external_peers,
161            allow_external_peers,
162            is_dev,
163        })))
164    }
165}
166
167impl<N: Network> Router<N> {
168    /// Attempts to connect to the given peer IP.
169    ///
170    /// Returns None if we are already connected to the peer or cannot connect.
171    /// Otherwise, it returns a handle to the tokio tasks that sets up the connection.
172    pub fn connect(&self, peer_ip: SocketAddr) -> Option<JoinHandle<bool>> {
173        // Return early if the attempt is against the protocol rules.
174        match self.check_connection_attempt(peer_ip) {
175            Ok(true) => return None,
176            Ok(false) => {}
177            Err(forbidden_message) => {
178                warn!("{forbidden_message}");
179                return None;
180            }
181        }
182
183        let router = self.clone();
184        Some(tokio::spawn(async move {
185            // Attempt to connect to the candidate peer.
186            match router.tcp.connect(peer_ip).await {
187                // Remove the peer from the candidate peers.
188                Ok(()) => {
189                    router.remove_candidate_peer(peer_ip);
190                    true
191                }
192                // If the connection was not allowed, log the error.
193                Err(error) => {
194                    router.connecting_peers.lock().remove(&peer_ip);
195                    warn!("Unable to connect to '{peer_ip}' - {error}");
196                    false
197                }
198            }
199        }))
200    }
201
202    /// Checks if we can and are allowed to connect to the given peer.
203    ///
204    /// # Return Values
205    /// - `Ok(true)` if already connected (or connecting) to the peer.
206    /// - `Ok(false)` if not connected to the peer but allowed to.
207    /// - `Err(err)` if not allowed to connect to the peer.
208    fn check_connection_attempt(&self, peer_ip: SocketAddr) -> Result<bool> {
209        // Ensure the peer IP is not this node.
210        if self.is_local_ip(&peer_ip) {
211            bail!("Dropping connection attempt to '{peer_ip}' (attempted to self-connect)")
212        }
213        // Ensure the node does not surpass the maximum number of peer connections.
214        if self.number_of_connected_peers() >= self.max_connected_peers() {
215            bail!("Dropping connection attempt to '{peer_ip}' (maximum peers reached)")
216        }
217        // Ensure the node is not already connected to this peer.
218        if self.is_connected(&peer_ip) {
219            debug!("Dropping connection attempt to '{peer_ip}' (already connected)");
220            return Ok(true);
221        }
222        // Ensure the peer is not restricted.
223        if self.is_restricted(&peer_ip) {
224            bail!("Dropping connection attempt to '{peer_ip}' (restricted)")
225        }
226        // Ensure the node is not already connecting to this peer.
227        match self.connecting_peers.lock().entry(peer_ip) {
228            Entry::Vacant(entry) => {
229                entry.insert(None);
230                Ok(false)
231            }
232            Entry::Occupied(_) => {
233                debug!("Dropping connection attempt to '{peer_ip}' (already shaking hands as the initiator)");
234                Ok(true)
235            }
236        }
237    }
238
239    /// Disconnects from the given peer IP, if the peer is connected.
240    pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle<bool> {
241        let router = self.clone();
242        tokio::spawn(async move {
243            if let Some(peer_addr) = router.resolve_to_ambiguous(&peer_ip) {
244                // Disconnect from this peer.
245                let disconnected = router.tcp.disconnect(peer_addr).await;
246                // FIXME (ljedrz): this shouldn't be necessary; it's a double-check
247                //  that the higher-level collection is cleaned up after the lower-level disconnect.
248                if router.is_connected(&peer_ip) && !router.tcp.is_connected(peer_addr) {
249                    warn!("Disconnecting with fallback safety (report this to @ljedrz)");
250                    router.remove_connected_peer(peer_ip);
251                }
252                disconnected
253            } else {
254                false
255            }
256        })
257    }
258
259    /// Returns the IP address of this node.
260    pub fn local_ip(&self) -> SocketAddr {
261        self.tcp.listening_addr().expect("The TCP listener is not enabled")
262    }
263
264    /// Returns `true` if the given IP is this node.
265    pub fn is_local_ip(&self, ip: &SocketAddr) -> bool {
266        *ip == self.local_ip()
267            || (ip.ip().is_unspecified() || ip.ip().is_loopback()) && ip.port() == self.local_ip().port()
268    }
269
270    /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
271    pub fn is_valid_peer_ip(&self, ip: &SocketAddr) -> bool {
272        !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
273    }
274
275    /// Returns `true` if the message version is valid.
276    pub fn is_valid_message_version(&self, message_version: u32) -> bool {
277        // Determine the minimum message version this node will accept, based on its role.
278        // - Provers always operate at the latest message version.
279        // - Validators and clients may accept older versions, depending on their current block height.
280        let lowest_accepted_message_version = match self.node_type {
281            // Provers should always use the latest version.
282            NodeType::Prover => Message::<N>::latest_message_version(),
283            // Validators and clients accept messages from lower version based on the migration height.
284            NodeType::Validator | NodeType::Client => {
285                Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
286            }
287        };
288
289        // Check if the incoming message version is valid.
290        message_version >= lowest_accepted_message_version
291    }
292
293    /// Returns the node type.
294    pub fn node_type(&self) -> NodeType {
295        self.node_type
296    }
297
298    /// Returns the account private key of the node.
299    pub fn private_key(&self) -> &PrivateKey<N> {
300        self.account.private_key()
301    }
302
303    /// Returns the account view key of the node.
304    pub fn view_key(&self) -> &ViewKey<N> {
305        self.account.view_key()
306    }
307
308    /// Returns the account address of the node.
309    pub fn address(&self) -> Address<N> {
310        self.account.address()
311    }
312
313    /// Returns `true` if the node is in development mode.
314    pub fn is_dev(&self) -> bool {
315        self.is_dev
316    }
317
318    /// Returns `true` if the node is periodically evicting more external peers.
319    pub fn rotate_external_peers(&self) -> bool {
320        self.rotate_external_peers
321    }
322
323    /// Returns `true` if the node is engaging in P2P gossip to request more peers.
324    pub fn allow_external_peers(&self) -> bool {
325        self.allow_external_peers
326    }
327
328    /// Returns the listener IP address from the (ambiguous) peer address.
329    pub fn resolve_to_listener(&self, peer_addr: &SocketAddr) -> Option<SocketAddr> {
330        self.resolver.get_listener(peer_addr)
331    }
332
333    /// Returns the (ambiguous) peer address from the listener IP address.
334    pub fn resolve_to_ambiguous(&self, peer_ip: &SocketAddr) -> Option<SocketAddr> {
335        self.resolver.get_ambiguous(peer_ip)
336    }
337
338    /// Returns `true` if the node is connected to the given peer IP.
339    pub fn is_connected(&self, ip: &SocketAddr) -> bool {
340        self.connected_peers.read().contains_key(ip)
341    }
342
343    /// Returns `true` if the given peer IP is a connected validator.
344    pub fn is_connected_validator(&self, peer_ip: &SocketAddr) -> bool {
345        self.connected_peers.read().get(peer_ip).map_or(false, |peer| peer.is_validator())
346    }
347
348    /// Returns `true` if the given peer IP is a connected prover.
349    pub fn is_connected_prover(&self, peer_ip: &SocketAddr) -> bool {
350        self.connected_peers.read().get(peer_ip).map_or(false, |peer| peer.is_prover())
351    }
352
353    /// Returns `true` if the given peer IP is a connected client.
354    pub fn is_connected_client(&self, peer_ip: &SocketAddr) -> bool {
355        self.connected_peers.read().get(peer_ip).map_or(false, |peer| peer.is_client())
356    }
357
358    /// Returns `true` if the node is currently connecting to the given peer IP.
359    pub fn is_connecting(&self, ip: &SocketAddr) -> bool {
360        self.connecting_peers.lock().contains_key(ip)
361    }
362
363    /// Returns `true` if the given IP is restricted.
364    pub fn is_restricted(&self, ip: &SocketAddr) -> bool {
365        self.restricted_peers
366            .read()
367            .get(ip)
368            .map(|time| time.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS)
369            .unwrap_or(false)
370    }
371
372    /// Returns `true` if the given IP is trusted.
373    pub fn is_trusted(&self, ip: &SocketAddr) -> bool {
374        self.trusted_peers.contains(ip)
375    }
376
377    /// Returns the maximum number of connected peers.
378    pub fn max_connected_peers(&self) -> usize {
379        self.tcp.config().max_connections as usize
380    }
381
382    /// Returns the number of connected peers.
383    pub fn number_of_connected_peers(&self) -> usize {
384        self.connected_peers.read().len()
385    }
386
387    /// Returns the number of connected validators.
388    pub fn number_of_connected_validators(&self) -> usize {
389        self.connected_peers.read().values().filter(|peer| peer.is_validator()).count()
390    }
391
392    /// Returns the number of connected provers.
393    pub fn number_of_connected_provers(&self) -> usize {
394        self.connected_peers.read().values().filter(|peer| peer.is_prover()).count()
395    }
396
397    /// Returns the number of connected clients.
398    pub fn number_of_connected_clients(&self) -> usize {
399        self.connected_peers.read().values().filter(|peer| peer.is_client()).count()
400    }
401
402    /// Returns the number of candidate peers.
403    pub fn number_of_candidate_peers(&self) -> usize {
404        self.candidate_peers.read().len()
405    }
406
407    /// Returns the number of restricted peers.
408    pub fn number_of_restricted_peers(&self) -> usize {
409        self.restricted_peers.read().len()
410    }
411
412    /// Returns the connected peer given the peer IP, if it exists.
413    pub fn get_connected_peer(&self, ip: &SocketAddr) -> Option<Peer<N>> {
414        self.connected_peers.read().get(ip).cloned()
415    }
416
417    /// Returns the connected peers.
418    pub fn get_connected_peers(&self) -> Vec<Peer<N>> {
419        self.connected_peers.read().values().cloned().collect()
420    }
421
422    /// Returns the list of connected peers.
423    pub fn connected_peers(&self) -> Vec<SocketAddr> {
424        self.connected_peers.read().keys().copied().collect()
425    }
426
427    /// Returns the list of connected validators.
428    pub fn connected_validators(&self) -> Vec<SocketAddr> {
429        self.connected_peers.read().iter().filter(|(_, peer)| peer.is_validator()).map(|(ip, _)| *ip).collect()
430    }
431
432    /// Returns the list of connected provers.
433    pub fn connected_provers(&self) -> Vec<SocketAddr> {
434        self.connected_peers.read().iter().filter(|(_, peer)| peer.is_prover()).map(|(ip, _)| *ip).collect()
435    }
436
437    /// Returns the list of connected clients.
438    pub fn connected_clients(&self) -> Vec<SocketAddr> {
439        self.connected_peers.read().iter().filter(|(_, peer)| peer.is_client()).map(|(ip, _)| *ip).collect()
440    }
441
442    /// Returns the list of candidate peers.
443    pub fn candidate_peers(&self) -> HashSet<SocketAddr> {
444        let banned_ips = self.tcp().banned_peers().get_banned_ips();
445        self.candidate_peers.read().iter().filter(|peer| !banned_ips.contains(&peer.ip())).copied().collect()
446    }
447
448    /// Returns the list of restricted peers.
449    pub fn restricted_peers(&self) -> Vec<SocketAddr> {
450        self.restricted_peers.read().keys().copied().collect()
451    }
452
453    /// Returns the list of trusted peers.
454    pub fn trusted_peers(&self) -> &HashSet<SocketAddr> {
455        &self.trusted_peers
456    }
457
458    /// Returns the list of bootstrap peers.
459    #[allow(clippy::if_same_then_else)]
460    pub fn bootstrap_peers(&self) -> Vec<SocketAddr> {
461        if cfg!(feature = "test") || self.is_dev {
462            // Development testing contains no bootstrap peers.
463            vec![]
464        } else if N::ID == snarkvm::console::network::MainnetV0::ID {
465            // Mainnet contains the following bootstrap peers.
466            vec![
467                SocketAddr::from_str("35.231.67.219:4130").unwrap(),
468                SocketAddr::from_str("34.73.195.196:4130").unwrap(),
469                SocketAddr::from_str("34.23.225.202:4130").unwrap(),
470                SocketAddr::from_str("34.148.16.111:4130").unwrap(),
471            ]
472        } else if N::ID == snarkvm::console::network::TestnetV0::ID {
473            // TestnetV0 contains the following bootstrap peers.
474            vec![
475                SocketAddr::from_str("34.138.104.159:4130").unwrap(),
476                SocketAddr::from_str("35.231.46.237:4130").unwrap(),
477                SocketAddr::from_str("34.148.251.155:4130").unwrap(),
478                SocketAddr::from_str("35.190.141.234:4130").unwrap(),
479            ]
480        } else if N::ID == snarkvm::console::network::CanaryV0::ID {
481            // CanaryV0 contains the following bootstrap peers.
482            vec![
483                SocketAddr::from_str("34.139.88.58:4130").unwrap(),
484                SocketAddr::from_str("34.139.252.207:4130").unwrap(),
485                SocketAddr::from_str("35.185.98.12:4130").unwrap(),
486                SocketAddr::from_str("35.231.106.26:4130").unwrap(),
487            ]
488        } else {
489            // Unrecognized networks contain no bootstrap peers.
490            vec![]
491        }
492    }
493
494    /// Check whether the given IP address is currently banned.
495    #[cfg(not(any(test)))]
496    fn is_ip_banned(&self, ip: IpAddr) -> bool {
497        self.tcp.banned_peers().is_ip_banned(&ip)
498    }
499
500    /// Insert or update a banned IP.
501    #[cfg(not(any(test)))]
502    fn update_ip_ban(&self, ip: IpAddr) {
503        self.tcp.banned_peers().update_ip_ban(ip);
504    }
505
506    /// Returns the list of metrics for the connected peers.
507    pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
508        self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect()
509    }
510
511    #[cfg(feature = "metrics")]
512    fn update_metrics(&self) {
513        metrics::gauge(metrics::router::CONNECTED, self.connected_peers.read().len() as f64);
514        metrics::gauge(metrics::router::CANDIDATE, self.candidate_peers.read().len() as f64);
515        metrics::gauge(metrics::router::RESTRICTED, self.restricted_peers.read().len() as f64);
516    }
517
518    /// Inserts the given peer into the connected peers.
519    pub fn insert_connected_peer(&self, peer_ip: SocketAddr) {
520        // Move the peer from "connecting" to "connected".
521        let peer = match self.connecting_peers.lock().remove(&peer_ip) {
522            Some(Some(peer)) => peer,
523            Some(None) => {
524                warn!("Couldn't promote {peer_ip} from \"connecting\" to \"connected\": Handshake not completed");
525                return;
526            }
527            None => {
528                warn!("Couldn't promote {peer_ip} from \"connecting\" to \"connected\": Public/listen address unknown");
529                return;
530            }
531        };
532        // Add an entry for this `Peer` in the connected peers.
533        self.connected_peers.write().insert(peer_ip, peer);
534        // Remove this peer from the candidate peers, if it exists.
535        self.candidate_peers.write().remove(&peer_ip);
536        // Remove this peer from the restricted peers, if it exists.
537        self.restricted_peers.write().remove(&peer_ip);
538        #[cfg(feature = "metrics")]
539        self.update_metrics();
540        info!("Connected to '{peer_ip}'");
541    }
542
543    /// Inserts the given peer IPs to the set of candidate peers.
544    ///
545    /// This method skips adding any given peers if the combined size exceeds the threshold,
546    /// as the peer providing this list could be subverting the protocol.
547    pub fn insert_candidate_peers(&self, peers: &[SocketAddr]) {
548        // Compute the maximum number of candidate peers.
549        let max_candidate_peers = Self::MAXIMUM_CANDIDATE_PEERS.saturating_sub(self.number_of_candidate_peers());
550        // Ensure the combined number of peers does not surpass the threshold.
551        let eligible_peers = peers
552            .iter()
553            .filter(|peer_ip| {
554                // Ensure the peer is not itself, is not already connected, and is not restricted.
555                !self.is_local_ip(peer_ip) && !self.is_connected(peer_ip) && !self.is_restricted(peer_ip)
556            })
557            .take(max_candidate_peers);
558
559        // Proceed to insert the eligible candidate peer IPs.
560        self.candidate_peers.write().extend(eligible_peers);
561        #[cfg(feature = "metrics")]
562        self.update_metrics();
563    }
564
565    /// Inserts the given peer into the restricted peers.
566    pub fn insert_restricted_peer(&self, peer_ip: SocketAddr) {
567        // Remove this peer from the candidate peers, if it exists.
568        self.candidate_peers.write().remove(&peer_ip);
569        // Add the peer to the restricted peers.
570        self.restricted_peers.write().insert(peer_ip, Instant::now());
571        #[cfg(feature = "metrics")]
572        self.update_metrics();
573    }
574
575    /// Updates the connected peer with the given function.
576    pub fn update_connected_peer<Fn: FnMut(&mut Peer<N>)>(
577        &self,
578        peer_ip: SocketAddr,
579        node_type: NodeType,
580        mut write_fn: Fn,
581    ) -> Result<()> {
582        // Retrieve the peer.
583        if let Some(peer) = self.connected_peers.write().get_mut(&peer_ip) {
584            // Ensure the node type has not changed.
585            if peer.node_type() != node_type {
586                bail!("Peer '{peer_ip}' has changed node types from {} to {node_type}", peer.node_type())
587            }
588            // Lastly, update the peer with the given function.
589            write_fn(peer);
590        }
591        Ok(())
592    }
593
594    pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
595        if let Some(peer) = self.connected_peers.write().get_mut(&peer_ip) {
596            peer.set_last_seen(Instant::now());
597        }
598    }
599
600    /// Removes the connected peer and adds them to the candidate peers.
601    pub fn remove_connected_peer(&self, peer_ip: SocketAddr) {
602        // Removes the bidirectional map between the listener address and (ambiguous) peer address.
603        self.resolver.remove_peer(&peer_ip);
604        // Remove this peer from the connected peers, if it exists.
605        self.connected_peers.write().remove(&peer_ip);
606        // Add the peer to the candidate peers.
607        self.candidate_peers.write().insert(peer_ip);
608        // Clear cached entries applicable to the peer.
609        self.cache.clear_peer_entries(peer_ip);
610        #[cfg(feature = "metrics")]
611        self.update_metrics();
612    }
613
614    #[cfg(feature = "test")]
615    pub fn clear_candidate_peers(&self) {
616        self.candidate_peers.write().clear();
617        #[cfg(feature = "metrics")]
618        self.update_metrics();
619    }
620
621    /// Removes the given address from the candidate peers, if it exists.
622    pub fn remove_candidate_peer(&self, peer_ip: SocketAddr) {
623        self.candidate_peers.write().remove(&peer_ip);
624        #[cfg(feature = "metrics")]
625        self.update_metrics();
626    }
627
628    /// Spawns a task with the given future; it should only be used for long-running tasks.
629    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
630        self.handles.lock().push(tokio::spawn(future));
631    }
632
633    /// Shuts down the router.
634    pub async fn shut_down(&self) {
635        info!("Shutting down the router...");
636        // Abort the tasks.
637        self.handles.lock().iter().for_each(|handle| handle.abort());
638        // Close the listener.
639        self.tcp.shut_down().await;
640    }
641}