Skip to main content

snarkos_node_network/
peering.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver, bootstrap_peers};
17
18use snarkos_node_tcp::{P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
19use snarkvm::prelude::{Address, Network};
20
21use anyhow::{Result, bail};
22#[cfg(feature = "locktick")]
23use locktick::parking_lot::RwLock;
24#[cfg(not(feature = "locktick"))]
25use parking_lot::RwLock;
26use std::{
27    cmp,
28    collections::{
29        HashSet,
30        hash_map::{Entry, HashMap},
31    },
32    fs,
33    io::{self, Write},
34    net::{IpAddr, SocketAddr},
35    path::Path,
36    str::FromStr,
37    time::Instant,
38};
39use tokio::task;
40use tracing::*;
41
42pub trait PeerPoolHandling<N: Network>: P2P {
43    const OWNER: &str;
44
45    /// The maximum number of peers permitted to be stored in the peer pool.
46    const MAXIMUM_POOL_SIZE: usize;
47
48    /// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached.
49    /// It must be lower than `MAXIMUM_POOL_SIZE`.
50    const PEER_SLASHING_COUNT: usize;
51
52    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
53
54    fn resolver(&self) -> &RwLock<Resolver<N>>;
55
56    /// Returns `true` if the owning node is in development mode.
57    fn is_dev(&self) -> bool;
58
59    /// Returns `true` if the node is in trusted peers only mode.
60    fn trusted_peers_only(&self) -> bool;
61
62    /// Returns the node type.
63    fn node_type(&self) -> NodeType;
64
65    /// Returns the listener address of this node.
66    fn local_ip(&self) -> SocketAddr {
67        self.tcp().listening_addr().expect("The TCP listener is not enabled")
68    }
69
70    /// Returns `true` if the given IP is this node.
71    fn is_local_ip(&self, addr: SocketAddr) -> bool {
72        addr == self.local_ip()
73            || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
74    }
75
76    /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
77    fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
78        !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
79    }
80
81    /// Returns the maximum number of connected peers.
82    fn max_connected_peers(&self) -> usize {
83        self.tcp().config().max_connections as usize
84    }
85
86    /// Ensure we are allowed to connect to the given listener address of a peer.
87    ///
88    /// # Return Values
89    /// - `Ok(true)` if already connected (or connecting) to the peer.
90    /// - `Ok(false)` if not connected to the peer but allowed to.
91    /// - `Err(err)` if not allowed to connect to the peer.
92    fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<bool> {
93        // Ensure the peer IP is not this node.
94        if self.is_local_ip(listener_addr) {
95            bail!("{} Dropping connection attempt to '{listener_addr}' (attempted to self-connect)", Self::OWNER);
96        }
97        // Ensure the node does not surpass the maximum number of peer connections.
98        if self.number_of_connected_peers() >= self.max_connected_peers() {
99            bail!("{} Dropping connection attempt to '{listener_addr}' (maximum peers reached)", Self::OWNER);
100        }
101        // Ensure the node is not already connected to this peer.
102        if self.is_connected(listener_addr) {
103            debug!("{} Dropping connection attempt to '{listener_addr}' (already connected)", Self::OWNER);
104            return Ok(true);
105        }
106        // Ensure the node is not already connecting to this peer.
107        if self.is_connecting(listener_addr) {
108            debug!("{} Dropping connection attempt to '{listener_addr}' (already connecting)", Self::OWNER);
109            return Ok(true);
110        }
111        // If the IP is already banned, reject the attempt.
112        if self.is_ip_banned(listener_addr.ip()) {
113            bail!("{} Rejected a connection attempt to a banned IP '{}'", Self::OWNER, listener_addr.ip());
114        }
115        // If the node is in trusted peers only mode, ensure the peer is trusted.
116        if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
117            bail!("{} Dropping connection attempt to '{listener_addr}' (untrusted)", Self::OWNER);
118        }
119        Ok(false)
120    }
121
122    /// Attempts to connect to the given peer's listener address.
123    ///
124    /// Returns None if we are already connected to the peer or cannot connect.
125    /// Otherwise, it returns a handle to the tokio tasks that sets up the connection.
126    fn connect(&self, listener_addr: SocketAddr) -> Option<task::JoinHandle<bool>> {
127        // Return early if the attempt is against the protocol rules.
128        match self.check_connection_attempt(listener_addr) {
129            Ok(true) => return None,
130            Ok(false) => {}
131            Err(error) => {
132                warn!("{} {error}", Self::OWNER);
133                return None;
134            }
135        }
136
137        // Determine whether the peer is trusted or a bootstrap node in order to decide
138        // how problematic any potential connection issues are.
139        let is_trusted_or_bootstrap =
140            self.is_trusted(listener_addr) || bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr);
141
142        let tcp = self.tcp().clone();
143        Some(tokio::spawn(async move {
144            debug!("{} Connecting to {listener_addr}...", Self::OWNER);
145            // Attempt to connect to the peer.
146            match tcp.connect(listener_addr).await {
147                Ok(_) => true,
148                Err(error) => {
149                    if is_trusted_or_bootstrap {
150                        warn!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
151                    } else {
152                        debug!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
153                    }
154                    false
155                }
156            }
157        }))
158    }
159
160    /// Disconnects from the given peer IP, if the peer is connected. The returned boolean
161    /// indicates whether the peer was actually disconnected from, or if this was a noop.
162    fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
163        if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
164            let tcp = self.tcp().clone();
165            tokio::spawn(async move { tcp.disconnect(connected_addr).await })
166        } else {
167            tokio::spawn(async { false })
168        }
169    }
170
171    /// Downgrades a connected peer to candidate status.
172    ///
173    /// Returns true if the peer was fully connected.
174    fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
175        let mut peer_pool = self.peer_pool().write();
176        let Some(peer) = peer_pool.get_mut(&listener_addr) else {
177            trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
178            return false;
179        };
180
181        if let Peer::Connected(conn_peer) = peer {
182            // Exception: the BootstrapClient only has a single Resolver,
183            // so it may only map a validator's Aleo address once, for its
184            // Gateway-mode connection. This also means that the Router-mode
185            // connection may not remove that mapping.
186            let aleo_addr = if self.node_type() == NodeType::BootstrapClient
187                && conn_peer.connection_mode == ConnectionMode::Router
188            {
189                None
190            } else {
191                Some(conn_peer.aleo_addr)
192            };
193            self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
194            peer.downgrade_to_candidate(listener_addr);
195            true
196        } else {
197            peer.downgrade_to_candidate(listener_addr);
198            false
199        }
200    }
201
202    /// Adds new candidate peers to the peer pool, ensuring their validity and following the
203    /// limit on the number of peers in the pool. The listener addresses may be paired with
204    /// the last known block height of the associated peer.
205    fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
206        let trusted_peers = self.trusted_peers();
207
208        // Hold a write guard from now on, so as not to accidentally slash multiple times
209        // based on multiple batches of candidate peers, and to not overwrite any entries.
210        let mut peer_pool = self.peer_pool().write();
211
212        // Perform filtering to ensure candidate validity. Also count how many entries are updates.
213        let mut num_updates: usize = 0;
214        listener_addrs.retain(|&(addr, height)| {
215            !self.is_ip_banned(addr.ip())
216                && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
217                && peer_pool
218                    .get(&addr)
219                    .map(|peer| peer.is_candidate() && height.is_some())
220                    .inspect(|is_valid_update| {
221                        if *is_valid_update {
222                            num_updates += 1
223                        }
224                    })
225                    .unwrap_or(true)
226        });
227
228        // If we've managed to filter out every entry, there's nothing to do.
229        if listener_addrs.is_empty() {
230            return;
231        }
232
233        // If we're about to exceed the peer pool size limit, apply candidate slashing.
234        if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
235            && Self::PEER_SLASHING_COUNT != 0
236        {
237            // Collect the addresses of prospect peers.
238            let mut peers_to_slash = peer_pool
239                .iter()
240                .filter_map(|(addr, peer)| {
241                    (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
242                })
243                .collect::<Vec<_>>();
244
245            // Get the low-level peer stats.
246            let known_peers = self.tcp().known_peers().snapshot();
247
248            // Sort the list of candidate peers by failure count (descending) and timestamp (ascending).
249            let default_value = (0, Instant::now());
250            peers_to_slash.sort_unstable_by_key(|addr| {
251                let (num_failures, last_seen) = known_peers
252                    .get(&addr.ip())
253                    .map(|stats| (stats.failures(), stats.timestamp()))
254                    .unwrap_or(default_value);
255                (cmp::Reverse(num_failures), last_seen)
256            });
257
258            // Retain the candidate peers with the most failures and oldest timestamps.
259            peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
260
261            // Remove the peers to slash from the pool.
262            peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
263
264            // Remove the peers to slash from the low-level list of known peers.
265            self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
266        }
267
268        // Make sure that we won't breach the pool size limit in case the slashing didn't suffice.
269        listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
270
271        // If we've managed to truncate to 0, exit.
272        if listener_addrs.is_empty() {
273            return;
274        }
275
276        // Insert or update the applicable candidate peers.
277        for (addr, height) in listener_addrs {
278            match peer_pool.entry(addr) {
279                Entry::Vacant(entry) => {
280                    entry.insert(Peer::new_candidate(addr, false));
281                }
282                Entry::Occupied(mut entry) => {
283                    if let Peer::Candidate(peer) = entry.get_mut() {
284                        peer.last_height_seen = height;
285                    }
286                }
287            }
288        }
289    }
290
291    /// Completely removes an entry from the peer pool.
292    fn remove_peer(&self, listener_addr: SocketAddr) {
293        self.peer_pool().write().remove(&listener_addr);
294    }
295
296    /// Returns the connected peer address from the listener IP address.
297    fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
298        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
299            Some(peer.connected_addr)
300        } else {
301            None
302        }
303    }
304
305    /// Returns the connected peer aleo address from the listener IP address.
306    fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
307        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
308            Some(peer.aleo_addr)
309        } else {
310            None
311        }
312    }
313
314    /// Returns `true` if the node is connecting to the given peer's listener address.
315    fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
316        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
317    }
318
319    /// Returns `true` if the node is connected to the given peer listener address.
320    fn is_connected(&self, listener_addr: SocketAddr) -> bool {
321        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
322    }
323
324    /// Returns `true` if the node is connected to the given Aleo address.
325    fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
326        // The resolver only contains data on connected peers.
327        self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
328    }
329
330    /// Returns `true` if the given listener address is trusted.
331    fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
332        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
333    }
334
335    /// Returns the number of all peers.
336    fn number_of_peers(&self) -> usize {
337        self.peer_pool().read().len()
338    }
339
340    /// Returns the number of connected peers.
341    fn number_of_connected_peers(&self) -> usize {
342        self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
343    }
344
345    /// Returns the number of connecting peers.
346    fn number_of_connecting_peers(&self) -> usize {
347        self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
348    }
349
350    /// Returns the number of candidate peers.
351    fn number_of_candidate_peers(&self) -> usize {
352        self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
353    }
354
355    /// Returns the connected peer given the peer IP, if it exists.
356    fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
357        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
358            Some(peer.clone())
359        } else {
360            None
361        }
362    }
363
364    /// Updates the connected peer - if it exists -  given the peer IP and a closure.
365    /// The returned status indicates whether the update was successful, i.e. the peer had existed.
366    fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
367        &self,
368        listener_addr: &SocketAddr,
369        mut update_fn: F,
370    ) -> bool {
371        if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
372            update_fn(peer);
373            true
374        } else {
375            false
376        }
377    }
378
379    /// Returns the list of all peers (connected, connecting, and candidate).
380    fn get_peers(&self) -> Vec<Peer<N>> {
381        self.peer_pool().read().values().cloned().collect()
382    }
383
384    /// Returns all connected peers.
385    fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
386        self.filter_connected_peers(|_| true)
387    }
388
389    /// Returns an optionally bounded list of all connected peers sorted by their
390    /// block height (highest first) and failure count (lowest first).
391    fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
392        // Get a snapshot of the currently connected peers.
393        let mut peers = self.get_connected_peers();
394        // Get the low-level peer stats.
395        let known_peers = self.tcp().known_peers().snapshot();
396
397        // Sort the prospect peers.
398        peers.sort_unstable_by_key(|peer| {
399            if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
400                // Prioritize greatest height, then lowest failure count.
401                (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
402            } else {
403                // Unreachable; use an else-compatible dummy.
404                (cmp::Reverse(peer.last_height_seen), 0)
405            }
406        });
407        if let Some(max) = max_entries {
408            peers.truncate(max);
409        }
410
411        peers
412    }
413
414    /// Returns all connected peers that satisify the given predicate.
415    fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
416        self.peer_pool()
417            .read()
418            .values()
419            .filter_map(|p| {
420                if let Peer::Connected(peer) = p
421                    && predicate(peer)
422                {
423                    Some(peer)
424                } else {
425                    None
426                }
427            })
428            .cloned()
429            .collect()
430    }
431
432    /// Returns the list of connected peers.
433    fn connected_peers(&self) -> Vec<SocketAddr> {
434        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
435    }
436
437    /// Returns the list of trusted peers.
438    fn trusted_peers(&self) -> Vec<SocketAddr> {
439        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
440    }
441
442    /// Returns the list of candidate peers.
443    fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
444        self.peer_pool()
445            .read()
446            .values()
447            .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
448            .collect()
449    }
450
451    /// Returns the list of unconnected trusted peers.
452    fn unconnected_trusted_peers(&self) -> HashSet<SocketAddr> {
453        self.peer_pool()
454            .read()
455            .iter()
456            .filter_map(
457                |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None },
458            )
459            .collect()
460    }
461
462    /// Loads any previously cached peer addresses so they can be introduced as initial
463    /// candidate peers to connect to.
464    fn load_cached_peers(path: &Path) -> Result<Vec<SocketAddr>> {
465        let peers = match fs::read_to_string(path) {
466            Ok(cached_peers_str) => {
467                let mut cached_peers = Vec::new();
468                for peer_addr_str in cached_peers_str.lines() {
469                    match SocketAddr::from_str(peer_addr_str) {
470                        Ok(addr) => cached_peers.push(addr),
471                        Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
472                    }
473                }
474                cached_peers
475            }
476            Err(error) if error.kind() == io::ErrorKind::NotFound => {
477                // Not an issue - the cache may not exist yet.
478                Vec::new()
479            }
480            Err(error) => {
481                warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, path.display());
482                Vec::new()
483            }
484        };
485
486        Ok(peers)
487    }
488
489    /// Preserve the peers who have the greatest known block heights, and the lowest
490    /// number of registered network failures.
491    ///
492    /// # Arguments
493    /// * `path` - The path to the file to save the peers to.
494    /// * `max_entries` - The maximum number of peers to save (if there are more, the extra ones are truncated).
495    /// * `store_ports` - Whether to store the ports of the peers, or just the IP addresses.
496    fn save_best_peers(&self, path: &Path, max_entries: Option<usize>, store_ports: bool) -> Result<()> {
497        // Collect all prospect peers.
498        let mut peers = self.get_peers();
499
500        // Get the low-level peer stats.
501        let known_peers = self.tcp().known_peers().snapshot();
502
503        // Sort the list of peers.
504        peers.sort_unstable_by_key(|peer| {
505            if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
506                // Prioritize greatest height, then lowest failure count.
507                (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
508            } else {
509                // Unreachable; use an else-compatible dummy.
510                (cmp::Reverse(peer.last_height_seen()), 0)
511            }
512        });
513        if let Some(max) = max_entries {
514            peers.truncate(max);
515        }
516
517        // Dump the connected and deduplicated peers to a file.
518        let addrs: HashSet<_> = peers
519            .iter()
520            .map(
521                |peer| {
522                    if store_ports { peer.listener_addr().to_string() } else { peer.listener_addr().ip().to_string() }
523                },
524            )
525            .collect();
526
527        let mut file = fs::File::create(path)?;
528        for addr in addrs {
529            writeln!(file, "{addr}")?;
530        }
531
532        Ok(())
533    }
534
535    // Introduces a new connecting peer into the peer pool if unknown, or promotes
536    // a known candidate peer to a connecting one. The returned boolean indicates
537    // whether the peer has been added/promoted, or rejected due to already being
538    // shaken hands with or connected.
539    fn add_connecting_peer(&self, listener_addr: SocketAddr) -> bool {
540        match self.peer_pool().write().entry(listener_addr) {
541            Entry::Vacant(entry) => {
542                entry.insert(Peer::new_connecting(listener_addr, false));
543                true
544            }
545            Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => {
546                entry.insert(Peer::new_connecting(listener_addr, entry.get().is_trusted()));
547                true
548            }
549            Entry::Occupied(_) => false,
550        }
551    }
552
553    /// Temporarily IP-ban and disconnect from the peer with the given listener address and an
554    /// optional reason for the ban. This also removes the peer from the candidate pool.
555    fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
556        // Ignore IP-banning if we are in dev mode.
557        if self.is_dev() {
558            return;
559        }
560
561        let ip = listener_addr.ip();
562        debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
563
564        // Insert/update the low-level IP ban list.
565        self.tcp().banned_peers().update_ip_ban(ip);
566
567        // Disconnect from the peer.
568        self.disconnect(listener_addr);
569        // Remove the peer from the pool.
570        self.remove_peer(listener_addr);
571    }
572
573    /// Check whether the given IP address is currently banned.
574    fn is_ip_banned(&self, ip: IpAddr) -> bool {
575        self.tcp().banned_peers().is_ip_banned(&ip)
576    }
577
578    /// Insert or update a banned IP.
579    fn update_ip_ban(&self, ip: IpAddr) {
580        self.tcp().banned_peers().update_ip_ban(ip);
581    }
582}