snarkos_node_router/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27
28mod handshake;
29
30mod heartbeat;
31pub use heartbeat::*;
32
33mod helpers;
34pub use helpers::*;
35
36mod inbound;
37pub use inbound::*;
38
39mod outbound;
40pub use outbound::*;
41
42mod routing;
43pub use routing::*;
44
45mod writing;
46
47pub use crate::messages::NodeType;
48use crate::messages::{BlockRequest, Message, MessageCodec};
49
50use snarkos_account::Account;
51use snarkos_node_bft_ledger_service::LedgerService;
52use snarkos_node_sync_communication_service::CommunicationService;
53use snarkos_node_tcp::{Config, ConnectionSide, P2P, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip};
54
55use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
56
57use aleo_std::{StorageMode, aleo_ledger_dir};
58use anyhow::{Result, bail};
59#[cfg(feature = "locktick")]
60use locktick::parking_lot::{Mutex, RwLock};
61#[cfg(not(feature = "locktick"))]
62use parking_lot::{Mutex, RwLock};
63use std::{
64    cmp,
65    collections::{HashMap, HashSet, hash_map::Entry},
66    fs,
67    future::Future,
68    io::{self, Write},
69    net::{IpAddr, SocketAddr},
70    ops::Deref,
71    str::FromStr,
72    sync::Arc,
73    time::{Duration, Instant},
74};
75use tokio::task::JoinHandle;
76
/// The default port used by the router (the hard-coded bootstrap peers in `bootstrap_peers`
/// also listen on this port).
pub const DEFAULT_NODE_PORT: u16 = 4130;

/// The name of the file, inside the ledger directory, in which the best known peers are cached
/// between runs; it is written by `Router::shut_down` and read back by `Router::new`.
const PEER_CACHE_FILENAME: &str = "cached_router_peers";
82
83pub trait PeerPoolHandling<N: Network>: P2P {
84    const OWNER: &str;
85
86    /// The maximum number of peers permitted to be stored in the peer pool.
87    const MAXIMUM_POOL_SIZE: usize;
88
89    /// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached.
90    /// It must be lower than `MAXIMUM_POOL_SIZE`.
91    const PEER_SLASHING_COUNT: usize;
92
93    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
94
95    fn resolver(&self) -> &RwLock<Resolver<N>>;
96
97    /// Returns `true` if the owning node is in development mode.
98    fn is_dev(&self) -> bool;
99
100    /// Returns the listener address of this node.
101    fn local_ip(&self) -> SocketAddr {
102        self.tcp().listening_addr().expect("The TCP listener is not enabled")
103    }
104
105    /// Returns `true` if the given IP is this node.
106    fn is_local_ip(&self, addr: SocketAddr) -> bool {
107        addr == self.local_ip()
108            || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
109    }
110
111    /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
112    fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
113        !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
114    }
115
116    /// Returns the maximum number of connected peers.
117    fn max_connected_peers(&self) -> usize {
118        self.tcp().config().max_connections as usize
119    }
120
121    /// Ensure we are allowed to connect to the given listener address of a peer.
122    ///
123    /// # Return Values
124    /// - `Ok(true)` if already connected (or connecting) to the peer.
125    /// - `Ok(false)` if not connected to the peer but allowed to.
126    /// - `Err(err)` if not allowed to connect to the peer.
127    fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<bool> {
128        // Ensure the peer IP is not this node.
129        if self.is_local_ip(listener_addr) {
130            bail!("{} Dropping connection attempt to '{listener_addr}' (attempted to self-connect)", Self::OWNER);
131        }
132        // Ensure the node does not surpass the maximum number of peer connections.
133        if self.number_of_connected_peers() >= self.max_connected_peers() {
134            bail!("{} Dropping connection attempt to '{listener_addr}' (maximum peers reached)", Self::OWNER);
135        }
136        // Ensure the node is not already connected to this peer.
137        if self.is_connected(listener_addr) {
138            debug!("{} Dropping connection attempt to '{listener_addr}' (already connected)", Self::OWNER);
139            return Ok(true);
140        }
141        // Ensure the node is not already connecting to this peer.
142        if self.is_connecting(listener_addr) {
143            debug!("{} Dropping connection attempt to '{listener_addr}' (already connecting)", Self::OWNER);
144            return Ok(true);
145        }
146        // If the IP is already banned, reject the attempt.
147        if self.is_ip_banned(listener_addr.ip()) {
148            bail!("{} Rejected a connection attempt to a banned IP '{}'", Self::OWNER, listener_addr.ip());
149        }
150        Ok(false)
151    }
152
153    /// Attempts to connect to the given peer's listener address.
154    ///
155    /// Returns None if we are already connected to the peer or cannot connect.
156    /// Otherwise, it returns a handle to the tokio tasks that sets up the connection.
157    fn connect(&self, listener_addr: SocketAddr) -> Option<JoinHandle<bool>> {
158        // Return early if the attempt is against the protocol rules.
159        match self.check_connection_attempt(listener_addr) {
160            Ok(true) => return None,
161            Ok(false) => {}
162            Err(error) => {
163                warn!("{} {error}", Self::OWNER);
164                return None;
165            }
166        }
167
168        // Determine whether the peer is trusted or a bootstrap node in order to decide
169        // how problematic any potential connection issues are.
170        let is_trusted_or_bootstrap =
171            self.is_trusted(listener_addr) || bootstrap_peers::<N>(false).contains(&listener_addr);
172
173        let tcp = self.tcp().clone();
174        Some(tokio::spawn(async move {
175            debug!("{} Connecting to {listener_addr}...", Self::OWNER);
176            // Attempt to connect to the peer.
177            match tcp.connect(listener_addr).await {
178                Ok(_) => true,
179                Err(error) => {
180                    if is_trusted_or_bootstrap {
181                        warn!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
182                    } else {
183                        debug!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
184                    }
185                    false
186                }
187            }
188        }))
189    }
190
191    /// Disconnects from the given peer IP, if the peer is connected. The returned boolean
192    /// indicates whether the peer was actually disconnected from, or if this was a noop.
193    fn disconnect(&self, listener_addr: SocketAddr) -> JoinHandle<bool> {
194        if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
195            let tcp = self.tcp().clone();
196            tokio::spawn(async move { tcp.disconnect(connected_addr).await })
197        } else {
198            tokio::spawn(async { false })
199        }
200    }
201
202    /// Downgrades a connected peer to candidate status.
203    fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) {
204        if let Some(peer) = self.peer_pool().write().get_mut(&listener_addr) {
205            if let Peer::Connected(peer) = peer {
206                // Only validators get their aleo address registered with the resolver.
207                let aleo_addr = if peer.node_type == NodeType::Validator { Some(peer.aleo_addr) } else { None };
208                self.resolver().write().remove_peer(peer.connected_addr, aleo_addr);
209            }
210            peer.downgrade_to_candidate(listener_addr);
211        }
212    }
213
214    /// Adds new candidate peers to the peer pool, ensuring their validity and following the
215    /// limit on the number of peers in the pool. The listener addresses may be paired with
216    /// the last known block height of the associated peer.
217    fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
218        // Hold a write guard from now on, so as not to accidentally slash multiple times
219        // based on multiple batches of candidate peers, and to not overwrite any entries.
220        let mut peer_pool = self.peer_pool().write();
221
222        // Perform filtering to ensure candidate validity. Also count how many entries are updates.
223        let mut num_updates: usize = 0;
224        listener_addrs.retain(|&(addr, height)| {
225            !self.is_ip_banned(addr.ip())
226                && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
227                && peer_pool
228                    .get(&addr)
229                    .map(|peer| peer.is_candidate() && height.is_some())
230                    .inspect(|is_valid_update| {
231                        if *is_valid_update {
232                            num_updates += 1
233                        }
234                    })
235                    .unwrap_or(true)
236        });
237
238        // If we've managed to filter out every entry, there's nothing to do.
239        if listener_addrs.is_empty() {
240            return;
241        }
242
243        // If we're about to exceed the peer pool size limit, apply candidate slashing.
244        if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
245            && Self::PEER_SLASHING_COUNT != 0
246        {
247            // Collect the addresses of prospect peers.
248            let mut peers_to_slash = peer_pool
249                .iter()
250                .filter_map(|(addr, peer)| (matches!(peer, Peer::Candidate(_))).then_some(*addr))
251                .collect::<Vec<_>>();
252
253            // Get the low-level peer stats.
254            let known_peers = self.tcp().known_peers().snapshot();
255
256            // Sort the list of candidate peers by failure count (descending) and timestamp (ascending).
257            let default_value = (0, Instant::now());
258            peers_to_slash.sort_unstable_by_key(|addr| {
259                let (num_failures, last_seen) = known_peers
260                    .get(&addr.ip())
261                    .map(|stats| (stats.failures(), stats.timestamp()))
262                    .unwrap_or(default_value);
263                (cmp::Reverse(num_failures), last_seen)
264            });
265
266            // Retain the candidate peers with the most failures and oldest timestamps.
267            peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
268
269            // Remove the peers to slash from the pool.
270            peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
271        }
272
273        // Make sure that we won't breach the pool size limit in case the slashing didn't suffice.
274        listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
275
276        // If we've managed to truncate to 0, exit.
277        if listener_addrs.is_empty() {
278            return;
279        }
280
281        // Insert or update the applicable candidate peers.
282        for (addr, height) in listener_addrs {
283            match peer_pool.entry(addr) {
284                Entry::Vacant(entry) => {
285                    entry.insert(Peer::new_candidate(addr, false));
286                }
287                Entry::Occupied(mut entry) => {
288                    if let Peer::Candidate(peer) = entry.get_mut() {
289                        peer.last_height_seen = height;
290                    }
291                }
292            }
293        }
294    }
295
296    /// Completely removes an entry from the peer pool.
297    fn remove_peer(&self, listener_addr: SocketAddr) {
298        self.peer_pool().write().remove(&listener_addr);
299    }
300
301    /// Returns the connected peer address from the listener IP address.
302    fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
303        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
304            Some(peer.connected_addr)
305        } else {
306            None
307        }
308    }
309
310    /// Returns the connected peer aleo address from the listener IP address.
311    fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
312        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
313            Some(peer.aleo_addr)
314        } else {
315            None
316        }
317    }
318
319    /// Returns `true` if the node is connecting to the given peer's listener address.
320    fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
321        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
322    }
323
324    /// Returns `true` if the node is connected to the given peer listener address.
325    fn is_connected(&self, listener_addr: SocketAddr) -> bool {
326        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
327    }
328
329    /// Returns `true` if the given listener address is trusted.
330    fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
331        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
332    }
333
334    /// Returns the number of all peers.
335    fn number_of_peers(&self) -> usize {
336        self.peer_pool().read().len()
337    }
338
339    /// Returns the number of connected peers.
340    fn number_of_connected_peers(&self) -> usize {
341        self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
342    }
343
344    /// Returns the number of connecting peers.
345    fn number_of_connecting_peers(&self) -> usize {
346        self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
347    }
348
349    /// Returns the number of candidate peers.
350    fn number_of_candidate_peers(&self) -> usize {
351        self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
352    }
353
354    /// Returns the connected peer given the peer IP, if it exists.
355    fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
356        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
357            Some(peer.clone())
358        } else {
359            None
360        }
361    }
362
363    /// Updates the connected peer - if it exists -  given the peer IP and a closure.
364    /// The returned status indicates whether the update was successful, i.e. the peer had existed.
365    fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
366        &self,
367        listener_addr: &SocketAddr,
368        mut update_fn: F,
369    ) -> bool {
370        if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
371            update_fn(peer);
372            true
373        } else {
374            false
375        }
376    }
377
378    /// Returns the list of all peers (connected, connecting, and candidate).
379    fn get_peers(&self) -> Vec<Peer<N>> {
380        self.peer_pool().read().values().cloned().collect()
381    }
382
383    /// Returns all connected peers.
384    fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
385        self.filter_connected_peers(|_| true)
386    }
387
388    /// Returns an optionally bounded list of all connected peers sorted by their
389    /// block height (highest first) and failure count (lowest first).
390    fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
391        // Get a snapshot of the currently connected peers.
392        let mut peers = self.get_connected_peers();
393        // Get the low-level peer stats.
394        let known_peers = self.tcp().known_peers().snapshot();
395
396        // Sort the prospect peers.
397        peers.sort_unstable_by_key(|peer| {
398            if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
399                // Prioritize greatest height, then lowest failure count.
400                (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
401            } else {
402                // Unreachable; use an else-compatible dummy.
403                (cmp::Reverse(peer.last_height_seen), 0)
404            }
405        });
406        if let Some(max) = max_entries {
407            peers.truncate(max);
408        }
409
410        peers
411    }
412
413    /// Returns all connected peers that satisify the given predicate.
414    fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
415        self.peer_pool()
416            .read()
417            .values()
418            .filter_map(|p| {
419                if let Peer::Connected(peer) = p
420                    && predicate(peer)
421                {
422                    Some(peer)
423                } else {
424                    None
425                }
426            })
427            .cloned()
428            .collect()
429    }
430
431    /// Returns the list of connected peers.
432    fn connected_peers(&self) -> Vec<SocketAddr> {
433        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
434    }
435
436    /// Returns the list of trusted peers.
437    fn trusted_peers(&self) -> Vec<SocketAddr> {
438        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
439    }
440
441    /// Returns the list of candidate peers.
442    fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
443        self.peer_pool()
444            .read()
445            .values()
446            .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
447            .collect()
448    }
449
450    /// Returns the list of unconnected trusted peers.
451    fn unconnected_trusted_peers(&self) -> HashSet<SocketAddr> {
452        self.peer_pool()
453            .read()
454            .iter()
455            .filter_map(
456                |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None },
457            )
458            .collect()
459    }
460
461    /// Loads any previously cached peer addresses so they can be introduced as initial
462    /// candidate peers to connect to.
463    fn load_cached_peers(storage_mode: &StorageMode, filename: &str) -> Result<Vec<SocketAddr>> {
464        let mut peer_cache_path = aleo_ledger_dir(N::ID, storage_mode);
465        peer_cache_path.push(filename);
466
467        let peers = match fs::read_to_string(&peer_cache_path) {
468            Ok(cached_peers_str) => {
469                let mut cached_peers = Vec::new();
470                for peer_addr_str in cached_peers_str.lines() {
471                    match SocketAddr::from_str(peer_addr_str) {
472                        Ok(addr) => cached_peers.push(addr),
473                        Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
474                    }
475                }
476                cached_peers
477            }
478            Err(error) if error.kind() == io::ErrorKind::NotFound => {
479                // Not an issue - the cache may not exist yet.
480                Vec::new()
481            }
482            Err(error) => {
483                warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, peer_cache_path.display());
484                Vec::new()
485            }
486        };
487
488        Ok(peers)
489    }
490
491    /// Preserve the peers who have the greatest known block heights, and the lowest
492    /// number of registered network failures.
493    fn save_best_peers(&self, storage_mode: &StorageMode, filename: &str, max_entries: Option<usize>) -> Result<()> {
494        // Collect all prospect peers.
495        let mut peers = self.get_peers();
496
497        // Get the low-level peer stats.
498        let known_peers = self.tcp().known_peers().snapshot();
499
500        // Sort the list of peers.
501        peers.sort_unstable_by_key(|peer| {
502            if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
503                // Prioritize greatest height, then lowest failure count.
504                (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
505            } else {
506                // Unreachable; use an else-compatible dummy.
507                (cmp::Reverse(peer.last_height_seen()), 0)
508            }
509        });
510        if let Some(max) = max_entries {
511            peers.truncate(max);
512        }
513
514        // Dump the connected peers to a file.
515        let mut path = aleo_ledger_dir(N::ID, storage_mode);
516        path.push(filename);
517        let mut file = fs::File::create(path)?;
518        for peer in peers {
519            writeln!(file, "{}", peer.listener_addr())?;
520        }
521
522        Ok(())
523    }
524
525    // Introduces a new connecting peer into the peer pool if unknown, or promotes
526    // a known candidate peer to a connecting one. The returned boolean indicates
527    // whether the peer has been added/promoted, or rejected due to already being
528    // shaken hands with or connected.
529    fn add_connecting_peer(&self, listener_addr: SocketAddr) -> bool {
530        match self.peer_pool().write().entry(listener_addr) {
531            Entry::Vacant(entry) => {
532                entry.insert(Peer::new_connecting(listener_addr, false));
533                true
534            }
535            Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => {
536                entry.insert(Peer::new_connecting(listener_addr, entry.get().is_trusted()));
537                true
538            }
539            Entry::Occupied(_) => false,
540        }
541    }
542
543    /// Temporarily IP-ban and disconnect from the peer with the given listener address and an
544    /// optional reason for the ban. This also removes the peer from the candidate pool.
545    fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
546        let ip = listener_addr.ip();
547        debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
548
549        // Insert/update the low-level IP ban list.
550        self.tcp().banned_peers().update_ip_ban(ip);
551
552        // Disconnect from the peer.
553        self.disconnect(listener_addr);
554        // Remove the peer from the pool.
555        self.remove_peer(listener_addr);
556    }
557
558    /// Check whether the given IP address is currently banned.
559    fn is_ip_banned(&self, ip: IpAddr) -> bool {
560        self.tcp().banned_peers().is_ip_banned(&ip)
561    }
562
563    /// Insert or update a banned IP.
564    fn update_ip_ban(&self, ip: IpAddr) {
565        self.tcp().banned_peers().update_ip_ban(ip);
566    }
567}
568
/// The router keeps track of candidate, connecting, and connected peers.
/// The actual network communication happens in Inbound/Outbound,
/// which is implemented by Validator, Prover, and Client.
///
/// `Router` is a thin handle around a shared [`InnerRouter`]: cloning it only clones the
/// inner `Arc`, so every clone observes the same peer pool and state.
#[derive(Clone)]
pub struct Router<N: Network>(Arc<InnerRouter<N>>);
574
575impl<N: Network> Deref for Router<N> {
576    type Target = Arc<InnerRouter<N>>;
577
578    fn deref(&self) -> &Self::Target {
579        &self.0
580    }
581}
582
583impl<N: Network> PeerPoolHandling<N> for Router<N> {
584    const MAXIMUM_POOL_SIZE: usize = 10_000;
585    const OWNER: &str = "[Router]";
586    const PEER_SLASHING_COUNT: usize = 200;
587
588    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
589        &self.peer_pool
590    }
591
592    fn resolver(&self) -> &RwLock<Resolver<N>> {
593        &self.resolver
594    }
595
596    fn is_dev(&self) -> bool {
597        self.is_dev
598    }
599}
600
/// The state shared by all clones of a [`Router`].
pub struct InnerRouter<N: Network> {
    /// The TCP stack.
    tcp: Tcp,
    /// The node type; it determines the accepted message versions.
    node_type: NodeType,
    /// The account of the node (source of its private key, view key, and address).
    account: Account<N>,
    /// The ledger service, used e.g. to read the latest block height.
    ledger: Arc<dyn LedgerService<N>>,
    /// The cache.
    cache: Cache<N>,
    /// The resolver, mapping connected (ambiguous) addresses back to listener addresses.
    resolver: RwLock<Resolver<N>>,
    /// The collection of candidate, connecting, and connected peers, keyed by listener address.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The handles of long-running tasks started via `Router::spawn`; aborted on shutdown.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// If the flag is set, the node will periodically evict more external peers.
    rotate_external_peers: bool,
    /// If the flag is set, the node will engage in P2P gossip to request more peers.
    allow_external_peers: bool,
    /// The storage mode; used to locate the peer cache file in the ledger directory.
    storage_mode: StorageMode,
    /// The boolean flag for the development mode.
    is_dev: bool,
}
627
impl<N: Network> Router<N> {
    /// Presumably the length, in seconds, of the window within which connection attempts from one
    /// IP are counted against `MAX_CONNECTION_ATTEMPTS`; exceeding it is considered malicious.
    // NOTE(review): this was previously documented as the "minimum permitted interval between
    // connection attempts", which contradicts the companion constant's "within a 10 second
    // threshold" wording — confirm against its usage.
    #[cfg(not(feature = "test"))]
    const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
    /// The maximum amount of connection attempts within a 10 second threshold
    #[cfg(not(feature = "test"))]
    const MAX_CONNECTION_ATTEMPTS: usize = 10;
    /// The duration after which a connected peer is considered inactive or
    /// disconnected if no message has been received in the meantime.
    const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150); // 2.5 minutes
}
639
640impl<N: Network> Router<N> {
641    /// Initializes a new `Router` instance.
642    #[allow(clippy::too_many_arguments)]
643    pub async fn new(
644        node_ip: SocketAddr,
645        node_type: NodeType,
646        account: Account<N>,
647        ledger: Arc<dyn LedgerService<N>>,
648        trusted_peers: &[SocketAddr],
649        max_peers: u16,
650        rotate_external_peers: bool,
651        allow_external_peers: bool,
652        storage_mode: StorageMode,
653        is_dev: bool,
654    ) -> Result<Self> {
655        // Initialize the TCP stack.
656        let tcp = Tcp::new(Config::new(node_ip, max_peers));
657
658        // Prepare the collection of the initial peers.
659        let mut initial_peers = HashMap::new();
660
661        // Load entries from the peer cache (if present).
662        let cached_peers = Self::load_cached_peers(&storage_mode, PEER_CACHE_FILENAME)?;
663        for addr in cached_peers {
664            initial_peers.insert(addr, Peer::new_candidate(addr, false));
665        }
666
667        // Add the trusted peers to the list of the initial peers; this may promote
668        // some of the cached peers to trusted ones.
669        initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
670
671        // Initialize the router.
672        Ok(Self(Arc::new(InnerRouter {
673            tcp,
674            node_type,
675            account,
676            ledger,
677            cache: Default::default(),
678            resolver: Default::default(),
679            peer_pool: RwLock::new(initial_peers),
680            handles: Default::default(),
681            rotate_external_peers,
682            allow_external_peers,
683            storage_mode,
684            is_dev,
685        })))
686    }
687}
688
689impl<N: Network> Router<N> {
690    /// Returns `true` if the message version is valid.
691    pub fn is_valid_message_version(&self, message_version: u32) -> bool {
692        // Determine the minimum message version this node will accept, based on its role.
693        // - Provers always operate at the latest message version.
694        // - Validators and clients may accept older versions, depending on their current block height.
695        let lowest_accepted_message_version = match self.node_type {
696            // Provers should always use the latest version. The bootstrap clients are forced to
697            // be strict, as they don't follow the current chain height.
698            NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
699            // Validators and clients accept messages from lower version based on the migration height.
700            NodeType::Validator | NodeType::Client => {
701                Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
702            }
703        };
704
705        // Check if the incoming message version is valid.
706        message_version >= lowest_accepted_message_version
707    }
708
709    /// Returns the node type.
710    pub fn node_type(&self) -> NodeType {
711        self.node_type
712    }
713
714    /// Returns the account private key of the node.
715    pub fn private_key(&self) -> &PrivateKey<N> {
716        self.account.private_key()
717    }
718
719    /// Returns the account view key of the node.
720    pub fn view_key(&self) -> &ViewKey<N> {
721        self.account.view_key()
722    }
723
724    /// Returns the account address of the node.
725    pub fn address(&self) -> Address<N> {
726        self.account.address()
727    }
728
729    /// Returns a reference to the cache.
730    pub fn cache(&self) -> &Cache<N> {
731        &self.cache
732    }
733
734    /// Returns `true` if the node is periodically evicting more external peers.
735    pub fn rotate_external_peers(&self) -> bool {
736        self.rotate_external_peers
737    }
738
739    /// Returns `true` if the node is engaging in P2P gossip to request more peers.
740    pub fn allow_external_peers(&self) -> bool {
741        self.allow_external_peers
742    }
743
744    /// Returns the listener IP address from the (ambiguous) peer address.
745    pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
746        self.resolver.read().get_listener(connected_addr)
747    }
748
749    /// Returns the list of metrics for the connected peers.
750    pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
751        self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
752    }
753
754    #[cfg(feature = "metrics")]
755    pub fn update_metrics(&self) {
756        metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
757        metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
758    }
759
760    pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
761        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
762            peer.update_last_seen();
763        }
764    }
765
766    /// Spawns a task with the given future; it should only be used for long-running tasks.
767    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
768        self.handles.lock().push(tokio::spawn(future));
769    }
770
771    /// Shuts down the router.
772    pub async fn shut_down(&self) {
773        info!("Shutting down the router...");
774        // Save the best peers for future use.
775        if let Err(e) = self.save_best_peers(&self.storage_mode, PEER_CACHE_FILENAME, Some(MAX_PEERS_TO_SEND)) {
776            warn!("Failed to persist best peers to disk: {e}");
777        }
778        // Abort the tasks.
779        self.handles.lock().iter().for_each(|handle| handle.abort());
780        // Close the listener.
781        self.tcp.shut_down().await;
782    }
783}
784
785#[async_trait]
786impl<N: Network> CommunicationService for Router<N> {
787    /// The message type.
788    type Message = Message<N>;
789
790    /// Prepares a block request to be sent.
791    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
792        debug_assert!(start_height < end_height, "Invalid block request format");
793        Message::BlockRequest(BlockRequest { start_height, end_height })
794    }
795
796    /// Sends the given message to specified peer.
797    ///
798    /// This function returns as soon as the message is queued to be sent,
799    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
800    /// which can be used to determine when and whether the message has been delivered.
801    async fn send(
802        &self,
803        peer_ip: SocketAddr,
804        message: Self::Message,
805    ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
806        self.send(peer_ip, message)
807    }
808}
809
810/// Returns the list of bootstrap peers.
811#[allow(clippy::if_same_then_else)]
812pub fn bootstrap_peers<N: Network>(is_dev: bool) -> Vec<SocketAddr> {
813    if cfg!(feature = "test") || is_dev {
814        // Development testing contains optional bootstrap peers loaded from the environment.
815        match std::env::var("TEST_BOOTSTRAP_PEERS") {
816            Ok(peers) => peers.split(',').map(|peer| SocketAddr::from_str(peer).unwrap()).collect(),
817            Err(err) => {
818                warn!("Failed to load bootstrap peers from environment: {err}");
819                vec![]
820            }
821        }
822    } else if N::ID == snarkvm::console::network::MainnetV0::ID {
823        // Mainnet contains the following bootstrap peers.
824        vec![
825            SocketAddr::from_str("35.231.67.219:4130").unwrap(),
826            SocketAddr::from_str("34.73.195.196:4130").unwrap(),
827            SocketAddr::from_str("34.23.225.202:4130").unwrap(),
828            SocketAddr::from_str("34.148.16.111:4130").unwrap(),
829        ]
830    } else if N::ID == snarkvm::console::network::TestnetV0::ID {
831        // TestnetV0 contains the following bootstrap peers.
832        vec![
833            SocketAddr::from_str("34.138.104.159:4130").unwrap(),
834            SocketAddr::from_str("35.231.46.237:4130").unwrap(),
835            SocketAddr::from_str("34.148.251.155:4130").unwrap(),
836            SocketAddr::from_str("35.190.141.234:4130").unwrap(),
837        ]
838    } else if N::ID == snarkvm::console::network::CanaryV0::ID {
839        // CanaryV0 contains the following bootstrap peers.
840        vec![
841            SocketAddr::from_str("34.139.88.58:4130").unwrap(),
842            SocketAddr::from_str("34.139.252.207:4130").unwrap(),
843            SocketAddr::from_str("35.185.98.12:4130").unwrap(),
844            SocketAddr::from_str("35.231.106.26:4130").unwrap(),
845        ]
846    } else {
847        // Unrecognized networks contain no bootstrap peers.
848        vec![]
849    }
850}