Skip to main content

snarkos_node_network/
peering.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver};
17
18use snarkos_node_tcp::{ConnectError, P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
19use snarkvm::prelude::{Address, Network};
20
21use anyhow::Result;
22#[cfg(feature = "locktick")]
23use locktick::parking_lot::RwLock;
24#[cfg(not(feature = "locktick"))]
25use parking_lot::RwLock;
26use std::{
27    cmp,
28    collections::{
29        HashSet,
30        hash_map::{Entry, HashMap},
31    },
32    fs,
33    io::{self, Write},
34    net::{IpAddr, SocketAddr},
35    path::Path,
36    str::FromStr,
37    time::Instant,
38};
39use tokio::task;
40use tracing::*;
41
/// Application-level errors generated by the peering module.
///
/// This is never returned directly, but only as the payload for a `ConnectError`.
/// The type is a plain field-less enum, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeeringError {
    /// The node only accepts trusted peers, and the peer in question is not one of them.
    NoExternalPeersAllowed,
}
48
// Marker impl with no methods of its own. Presumably this is what allows a
// `PeeringError` to be wrapped with `ConnectError::application` - confirm
// against the `snarkos_node_tcp` crate.
impl snarkos_node_tcp::ApplicationError for PeeringError {}
50
51impl std::fmt::Display for PeeringError {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        match self {
54            Self::NoExternalPeersAllowed => write!(f, "no untrusted peers allowed"),
55        }
56    }
57}
58
59pub trait PeerPoolHandling<N: Network>: P2P {
    /// A label identifying the implementor; the default methods of this trait
    /// prepend it to the log messages they emit.
    const OWNER: &str;

    /// The maximum number of peers permitted to be stored in the peer pool.
    const MAXIMUM_POOL_SIZE: usize;

    /// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached.
    /// It must be lower than `MAXIMUM_POOL_SIZE`.
    /// A value of `0` disables slashing in `insert_candidate_peers`.
    const PEER_SLASHING_COUNT: usize;

    /// Returns the mapping of all known peers (connected or otherwise), keyed by their public listener address.
    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;

    /// Returns the resolver for translating between public listener addresses and connected addresses.
    fn resolver(&self) -> &RwLock<Resolver<N>>;

    /// Returns `true` if the owning node is in development mode.
    fn is_dev(&self) -> bool;

    /// Returns `true` if the node is in trusted peers only mode.
    fn trusted_peers_only(&self) -> bool;

    /// Returns the node type.
    fn node_type(&self) -> NodeType;
83
84    /// Returns the listener address of this node.
85    fn local_ip(&self) -> SocketAddr {
86        self.tcp().listening_addr().expect("The TCP listener is not enabled")
87    }
88
89    /// Returns `true` if the given IP is this node.
90    fn is_local_ip(&self, addr: SocketAddr) -> bool {
91        addr == self.local_ip()
92            || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
93    }
94
95    /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
96    fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
97        !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
98    }
99
100    /// Returns the maximum number of connected peers.
101    fn max_connected_peers(&self) -> usize {
102        self.tcp().config().max_connections as usize
103    }
104
105    /// Ensure we can and are allowed to connect to the given listener address of a peer.
106    fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
107        // Ensure the peer IP is not this node.
108        if self.is_local_ip(listener_addr) {
109            return Err(ConnectError::SelfConnect { address: listener_addr });
110        }
111        // Ensure the node does not surpass the maximum number of peer connections.
112        if self.number_of_connected_peers() >= self.max_connected_peers() {
113            return Err(ConnectError::MaximumConnectionsReached { limit: self.max_connected_peers() as u16 });
114        }
115        // Ensure the node is not already connected to this peer.
116        if self.is_connected(listener_addr) {
117            return Err(ConnectError::AlreadyConnected { address: listener_addr });
118        }
119        // Ensure the node is not already connecting to this peer.
120        if self.is_connecting(listener_addr) {
121            return Err(ConnectError::AlreadyConnecting { address: listener_addr });
122        }
123        // Ensure the peer IP is not banned.
124        if self.is_ip_banned(listener_addr.ip()) {
125            return Err(ConnectError::BannedIp { ip: listener_addr.ip() });
126        }
127        // If the node is in trusted peers only mode, ensure the peer is trusted.
128        if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
129            return Err(ConnectError::application(PeeringError::NoExternalPeersAllowed));
130        }
131
132        Ok(())
133    }
134
135    /// Attempts to connect to the given peer's listener address.
136    ///
137    /// Returns an earlier error, if, for example, we are already connected to the peer.
138    /// Otherwise, it returns a handle to the tokio tasks that sets up the connection.
139    ///
140    /// # Concurrency
141    /// Only one task may call this function for a given listener address at a time.
142    fn connect(&self, listener_addr: SocketAddr) -> Result<task::JoinHandle<Result<(), ConnectError>>, ConnectError> {
143        // Return early if the attempt is against the protocol rules.
144        self.check_connection_attempt(listener_addr)?;
145
146        // Update the last connection attempt time for the peer.
147        if let Some(Peer::Candidate(peer)) = self.peer_pool().write().get_mut(&listener_addr) {
148            peer.last_connection_attempt = Some(Instant::now());
149            peer.total_connection_attempts += 1;
150        } else {
151            warn!("{} No candidate peer entry exists for '{listener_addr:?}' while connecting.", Self::OWNER);
152        }
153
154        let tcp = self.tcp().clone();
155        Ok(tokio::spawn(async move {
156            debug!("{} Connecting to {listener_addr}...", Self::OWNER);
157            tcp.connect(listener_addr).await
158        }))
159    }
160
161    /// Disconnects from the given peer IP, if the peer is connected. The returned boolean
162    /// indicates whether the peer was actually disconnected from, or if this was a noop.
163    fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
164        if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
165            let tcp = self.tcp().clone();
166            tokio::spawn(async move { tcp.disconnect(connected_addr).await })
167        } else {
168            tokio::spawn(async { false })
169        }
170    }
171
172    /// Downgrades a connected peer to candidate status.
173    ///
174    /// Returns true if the peer was fully connected.
175    fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
176        let mut peer_pool = self.peer_pool().write();
177        let Some(peer) = peer_pool.get_mut(&listener_addr) else {
178            trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
179            return false;
180        };
181
182        if let Peer::Connected(conn_peer) = peer {
183            // Exception: the BootstrapClient only has a single Resolver,
184            // so it may only map a validator's Aleo address once, for its
185            // Gateway-mode connection. This also means that the Router-mode
186            // connection may not remove that mapping.
187            let aleo_addr = if self.node_type() == NodeType::BootstrapClient
188                && conn_peer.connection_mode == ConnectionMode::Router
189            {
190                None
191            } else {
192                Some(conn_peer.aleo_addr)
193            };
194            self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
195            peer.downgrade_to_candidate(listener_addr);
196            true
197        } else {
198            peer.downgrade_to_candidate(listener_addr);
199            false
200        }
201    }
202
203    /// Adds new candidate peers to the peer pool, ensuring their validity and following the
204    /// limit on the number of peers in the pool. The listener addresses may be paired with
205    /// the last known block height of the associated peer.
206    fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
207        let trusted_peers = self.trusted_peers();
208
209        // Hold a write guard from now on, so as not to accidentally slash multiple times
210        // based on multiple batches of candidate peers, and to not overwrite any entries.
211        let mut peer_pool = self.peer_pool().write();
212
213        // Perform filtering to ensure candidate validity. Also count how many entries are updates.
214        let mut num_updates: usize = 0;
215        listener_addrs.retain(|&(addr, height)| {
216            !self.is_ip_banned(addr.ip())
217                && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
218                && peer_pool
219                    .get(&addr)
220                    .map(|peer| peer.is_candidate() && height.is_some())
221                    .inspect(|is_valid_update| {
222                        if *is_valid_update {
223                            num_updates += 1
224                        }
225                    })
226                    .unwrap_or(true)
227        });
228
229        // If we've managed to filter out every entry, there's nothing to do.
230        if listener_addrs.is_empty() {
231            return;
232        }
233
234        // If we're about to exceed the peer pool size limit, apply candidate slashing.
235        if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
236            && Self::PEER_SLASHING_COUNT != 0
237        {
238            // Collect the addresses of prospect peers.
239            let mut peers_to_slash = peer_pool
240                .iter()
241                .filter_map(|(addr, peer)| {
242                    (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
243                })
244                .collect::<Vec<_>>();
245
246            // Get the low-level peer stats.
247            let known_peers = self.tcp().known_peers().snapshot();
248
249            // Sort the list of candidate peers by failure count (descending) and timestamp (ascending).
250            let default_value = (0, Instant::now());
251            peers_to_slash.sort_unstable_by_key(|addr| {
252                let (num_failures, last_seen) = known_peers
253                    .get(&addr.ip())
254                    .map(|stats| (stats.failures(), stats.timestamp()))
255                    .unwrap_or(default_value);
256                (cmp::Reverse(num_failures), last_seen)
257            });
258
259            // Retain the candidate peers with the most failures and oldest timestamps.
260            peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
261
262            // Remove the peers to slash from the pool.
263            peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
264
265            // Remove the peers to slash from the low-level list of known peers.
266            self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
267        }
268
269        // Make sure that we won't breach the pool size limit in case the slashing didn't suffice.
270        listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
271
272        // If we've managed to truncate to 0, exit.
273        if listener_addrs.is_empty() {
274            return;
275        }
276
277        // Insert or update the applicable candidate peers.
278        for (addr, height) in listener_addrs {
279            match peer_pool.entry(addr) {
280                Entry::Vacant(entry) => {
281                    entry.insert(Peer::new_candidate(addr, false));
282                }
283                Entry::Occupied(mut entry) => {
284                    if let Peer::Candidate(peer) = entry.get_mut() {
285                        peer.last_height_seen = height;
286                    }
287                }
288            }
289        }
290    }
291
292    /// Completely removes an entry from the peer pool.
293    fn remove_peer(&self, listener_addr: SocketAddr) {
294        self.peer_pool().write().remove(&listener_addr);
295    }
296
297    /// Returns the connected peer address from the listener IP address.
298    fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
299        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
300            Some(peer.connected_addr)
301        } else {
302            None
303        }
304    }
305
306    /// Returns the connected peer aleo address from the listener IP address.
307    fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
308        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
309            Some(peer.aleo_addr)
310        } else {
311            None
312        }
313    }
314
315    /// Returns `true` if the node is connecting to the given peer's listener address.
316    fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
317        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
318    }
319
320    /// Returns `true` if the node is connected to the given peer listener address.
321    fn is_connected(&self, listener_addr: SocketAddr) -> bool {
322        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
323    }
324
325    /// Returns `true` if the node is connected to the given Aleo address.
326    fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
327        // The resolver only contains data on connected peers.
328        self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
329    }
330
331    /// Returns `true` if the node is connected or connecting to the given peer listener address.
332    fn is_connecting_or_connected(&self, listener_addr: SocketAddr) -> bool {
333        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting() || peer.is_connected())
334    }
335
336    /// Returns `true` if the given listener address is trusted.
337    fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
338        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
339    }
340
341    /// Returns the number of all peers.
342    fn number_of_peers(&self) -> usize {
343        self.peer_pool().read().len()
344    }
345
346    /// Returns the number of connected peers.
347    fn number_of_connected_peers(&self) -> usize {
348        self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
349    }
350
351    /// Returns the number of connecting peers.
352    fn number_of_connecting_peers(&self) -> usize {
353        self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
354    }
355
356    /// Returns the number of candidate peers.
357    fn number_of_candidate_peers(&self) -> usize {
358        self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
359    }
360
361    /// Returns the connected peer given the peer IP, if it exists.
362    fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
363        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
364            Some(peer.clone())
365        } else {
366            None
367        }
368    }
369
370    /// Updates the connected peer - if it exists -  given the peer IP and a closure.
371    /// The returned status indicates whether the update was successful, i.e. the peer had existed.
372    fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
373        &self,
374        listener_addr: &SocketAddr,
375        mut update_fn: F,
376    ) -> bool {
377        if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
378            update_fn(peer);
379            true
380        } else {
381            false
382        }
383    }
384
385    /// Returns the list of all peers (connected, connecting, and candidate).
386    fn get_peers(&self) -> Vec<Peer<N>> {
387        self.peer_pool().read().values().cloned().collect()
388    }
389
390    /// Returns all connected peers.
391    fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
392        self.filter_connected_peers(|_| true)
393    }
394
395    /// Returns an optionally bounded list of all connected peers sorted by their
396    /// block height (highest first) and failure count (lowest first).
397    fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
398        // Get a snapshot of the currently connected peers.
399        let mut peers = self.get_connected_peers();
400        // Get the low-level peer stats.
401        let known_peers = self.tcp().known_peers().snapshot();
402
403        // Sort the prospect peers.
404        peers.sort_unstable_by_key(|peer| {
405            if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
406                // Prioritize greatest height, then lowest failure count.
407                (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
408            } else {
409                // Unreachable; use an else-compatible dummy.
410                (cmp::Reverse(peer.last_height_seen), 0)
411            }
412        });
413        if let Some(max) = max_entries {
414            peers.truncate(max);
415        }
416
417        peers
418    }
419
420    /// Returns all connected peers that satisify the given predicate.
421    fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
422        self.peer_pool()
423            .read()
424            .values()
425            .filter_map(|p| {
426                if let Peer::Connected(peer) = p
427                    && predicate(peer)
428                {
429                    Some(peer)
430                } else {
431                    None
432                }
433            })
434            .cloned()
435            .collect()
436    }
437
438    /// Returns the list of connected peers.
439    fn connected_peers(&self) -> Vec<SocketAddr> {
440        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
441    }
442
443    /// Returns the list of trusted peers.
444    fn trusted_peers(&self) -> Vec<SocketAddr> {
445        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
446    }
447
448    /// Returns the list of candidate peers.
449    fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
450        self.peer_pool()
451            .read()
452            .values()
453            .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
454            .collect()
455    }
456
457    /// Returns the list of trusted candidate peers.
458    fn get_trusted_candidate_peers(&self) -> Vec<CandidatePeer> {
459        self.peer_pool()
460            .read()
461            .values()
462            .filter_map(|peer| {
463                if let Peer::Candidate(peer) = peer
464                    && peer.trusted
465                {
466                    Some(peer.clone())
467                } else {
468                    None
469                }
470            })
471            .collect()
472    }
473
474    /// Loads any previously cached peer addresses so they can be introduced as initial
475    /// candidate peers to connect to.
476    fn load_cached_peers(path: &Path) -> Result<Vec<SocketAddr>> {
477        let peers = match fs::read_to_string(path) {
478            Ok(cached_peers_str) => {
479                let mut cached_peers = Vec::new();
480                for peer_addr_str in cached_peers_str.lines() {
481                    match SocketAddr::from_str(peer_addr_str) {
482                        Ok(addr) => cached_peers.push(addr),
483                        Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
484                    }
485                }
486                cached_peers
487            }
488            Err(error) if error.kind() == io::ErrorKind::NotFound => {
489                // Not an issue - the cache may not exist yet.
490                Vec::new()
491            }
492            Err(error) => {
493                warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, path.display());
494                Vec::new()
495            }
496        };
497
498        Ok(peers)
499    }
500
501    /// Preserve the peers who have the greatest known block heights, and the lowest
502    /// number of registered network failures.
503    ///
504    /// # Arguments
505    /// * `path` - The path to the file to save the peers to.
506    /// * `max_entries` - The maximum number of peers to save (if there are more, the extra ones are truncated).
507    /// * `store_ports` - Whether to store the ports of the peers, or just the IP addresses.
508    fn save_best_peers(&self, path: &Path, max_entries: Option<usize>, store_ports: bool) -> Result<()> {
509        // Collect all prospect peers.
510        let mut peers = self.get_peers();
511
512        // Get the low-level peer stats.
513        let known_peers = self.tcp().known_peers().snapshot();
514
515        // Sort the list of peers.
516        peers.sort_unstable_by_key(|peer| {
517            if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
518                // Prioritize greatest height, then lowest failure count.
519                (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
520            } else {
521                // Unreachable; use an else-compatible dummy.
522                (cmp::Reverse(peer.last_height_seen()), 0)
523            }
524        });
525        if let Some(max) = max_entries {
526            peers.truncate(max);
527        }
528
529        // Dump the connected and deduplicated peers to a file.
530        let addrs: HashSet<_> = peers
531            .iter()
532            .map(
533                |peer| {
534                    if store_ports { peer.listener_addr().to_string() } else { peer.listener_addr().ip().to_string() }
535                },
536            )
537            .collect();
538
539        let mut file = fs::File::create(path)?;
540        for addr in addrs {
541            writeln!(file, "{addr}")?;
542        }
543
544        Ok(())
545    }
546
547    // Introduces a new connecting peer into the peer pool if unknown, or promotes
548    // a known candidate peer to a connecting one. The returned boolean indicates
549    // whether the peer has been added/promoted, or rejected due to already being
550    // shaken hands with or connected.
551    fn add_connecting_peer(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
552        match self.peer_pool().write().entry(listener_addr) {
553            Entry::Vacant(entry) => {
554                entry.insert(Peer::new_connecting(listener_addr, false));
555                Ok(())
556            }
557            Entry::Occupied(mut entry) => match entry.get() {
558                peer @ Peer::Candidate(_) => {
559                    entry.insert(Peer::new_connecting(listener_addr, peer.is_trusted()));
560                    Ok(())
561                }
562                Peer::Connecting(_) => Err(ConnectError::AlreadyConnecting { address: listener_addr }),
563                Peer::Connected(_) => Err(ConnectError::AlreadyConnected { address: listener_addr }),
564            },
565        }
566    }
567
568    /// Temporarily IP-ban and disconnect from the peer with the given listener address and an
569    /// optional reason for the ban. This also removes the peer from the candidate pool.
570    fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
571        // Ignore IP-banning if we are in dev mode.
572        if self.is_dev() {
573            return;
574        }
575
576        let ip = listener_addr.ip();
577        debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
578
579        // Insert/update the low-level IP ban list.
580        self.tcp().banned_peers().update_ip_ban(ip);
581
582        // Disconnect from the peer.
583        self.disconnect(listener_addr);
584        // Remove the peer from the pool.
585        self.remove_peer(listener_addr);
586    }
587
588    /// Check whether the given IP address is currently banned.
589    fn is_ip_banned(&self, ip: IpAddr) -> bool {
590        self.tcp().banned_peers().is_ip_banned(&ip)
591    }
592
593    /// Insert or update a banned IP.
594    fn update_ip_ban(&self, ip: IpAddr) {
595        self.tcp().banned_peers().update_ip_ban(ip);
596    }
597}