snarkos_node_network/
peering.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver, bootstrap_peers};
17
18use aleo_std::{StorageMode, aleo_ledger_dir};
19use snarkos_node_tcp::{P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
20use snarkvm::prelude::{Address, Network};
21
22use anyhow::{Result, bail};
23#[cfg(feature = "locktick")]
24use locktick::parking_lot::RwLock;
25#[cfg(not(feature = "locktick"))]
26use parking_lot::RwLock;
27use std::{
28    cmp,
29    collections::{
30        HashSet,
31        hash_map::{Entry, HashMap},
32    },
33    fs,
34    io::{self, Write},
35    net::{IpAddr, SocketAddr},
36    str::FromStr,
37    time::Instant,
38};
39use tokio::task;
40use tracing::*;
41
/// Peer pool bookkeeping shared by nodes built on top of the [`P2P`] layer.
///
/// Implementors provide the storage (`peer_pool`, `resolver`) and a handful of
/// node-level settings; the provided methods build connection checks, candidate
/// management, peer caching and IP banning on top of them.
pub trait PeerPoolHandling<N: Network>: P2P {
    /// A label identifying the implementor; the provided methods use it as the
    /// prefix of the log and error messages they produce.
    const OWNER: &str;

    /// The maximum number of peers permitted to be stored in the peer pool.
    const MAXIMUM_POOL_SIZE: usize;

    /// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached.
    /// It must be lower than `MAXIMUM_POOL_SIZE`. A value of `0` disables slashing altogether.
    const PEER_SLASHING_COUNT: usize;

    /// Returns the lock-guarded peer pool, keyed by the listener address of each peer.
    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;

    /// Returns the lock-guarded resolver, which the provided methods use to look up
    /// and remove the address mappings of connected peers.
    fn resolver(&self) -> &RwLock<Resolver<N>>;

    /// Returns `true` if the owning node is in development mode.
    fn is_dev(&self) -> bool;

    /// Returns `true` if the node is in trusted peers only mode.
    fn trusted_peers_only(&self) -> bool;

    /// Returns the node type.
    fn node_type(&self) -> NodeType;
64
65    /// Returns the listener address of this node.
66    fn local_ip(&self) -> SocketAddr {
67        self.tcp().listening_addr().expect("The TCP listener is not enabled")
68    }
69
70    /// Returns `true` if the given IP is this node.
71    fn is_local_ip(&self, addr: SocketAddr) -> bool {
72        addr == self.local_ip()
73            || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
74    }
75
76    /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
77    fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
78        !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
79    }
80
81    /// Returns the maximum number of connected peers.
82    fn max_connected_peers(&self) -> usize {
83        self.tcp().config().max_connections as usize
84    }
85
86    /// Ensure we are allowed to connect to the given listener address of a peer.
87    ///
88    /// # Return Values
89    /// - `Ok(true)` if already connected (or connecting) to the peer.
90    /// - `Ok(false)` if not connected to the peer but allowed to.
91    /// - `Err(err)` if not allowed to connect to the peer.
92    fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<bool> {
93        // Ensure the peer IP is not this node.
94        if self.is_local_ip(listener_addr) {
95            bail!("{} Dropping connection attempt to '{listener_addr}' (attempted to self-connect)", Self::OWNER);
96        }
97        // Ensure the node does not surpass the maximum number of peer connections.
98        if self.number_of_connected_peers() >= self.max_connected_peers() {
99            bail!("{} Dropping connection attempt to '{listener_addr}' (maximum peers reached)", Self::OWNER);
100        }
101        // Ensure the node is not already connected to this peer.
102        if self.is_connected(listener_addr) {
103            debug!("{} Dropping connection attempt to '{listener_addr}' (already connected)", Self::OWNER);
104            return Ok(true);
105        }
106        // Ensure the node is not already connecting to this peer.
107        if self.is_connecting(listener_addr) {
108            debug!("{} Dropping connection attempt to '{listener_addr}' (already connecting)", Self::OWNER);
109            return Ok(true);
110        }
111        // If the IP is already banned, reject the attempt.
112        if self.is_ip_banned(listener_addr.ip()) {
113            bail!("{} Rejected a connection attempt to a banned IP '{}'", Self::OWNER, listener_addr.ip());
114        }
115        // If the node is in trusted peers only mode, ensure the peer is trusted.
116        if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
117            bail!("{} Dropping connection attempt to '{listener_addr}' (untrusted)", Self::OWNER);
118        }
119        Ok(false)
120    }
121
122    /// Attempts to connect to the given peer's listener address.
123    ///
124    /// Returns None if we are already connected to the peer or cannot connect.
125    /// Otherwise, it returns a handle to the tokio tasks that sets up the connection.
126    fn connect(&self, listener_addr: SocketAddr) -> Option<task::JoinHandle<bool>> {
127        // Return early if the attempt is against the protocol rules.
128        match self.check_connection_attempt(listener_addr) {
129            Ok(true) => return None,
130            Ok(false) => {}
131            Err(error) => {
132                warn!("{} {error}", Self::OWNER);
133                return None;
134            }
135        }
136
137        // Determine whether the peer is trusted or a bootstrap node in order to decide
138        // how problematic any potential connection issues are.
139        let is_trusted_or_bootstrap =
140            self.is_trusted(listener_addr) || bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr);
141
142        let tcp = self.tcp().clone();
143        Some(tokio::spawn(async move {
144            debug!("{} Connecting to {listener_addr}...", Self::OWNER);
145            // Attempt to connect to the peer.
146            match tcp.connect(listener_addr).await {
147                Ok(_) => true,
148                Err(error) => {
149                    if is_trusted_or_bootstrap {
150                        warn!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
151                    } else {
152                        debug!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
153                    }
154                    false
155                }
156            }
157        }))
158    }
159
160    /// Disconnects from the given peer IP, if the peer is connected. The returned boolean
161    /// indicates whether the peer was actually disconnected from, or if this was a noop.
162    fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
163        if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
164            let tcp = self.tcp().clone();
165            tokio::spawn(async move { tcp.disconnect(connected_addr).await })
166        } else {
167            tokio::spawn(async { false })
168        }
169    }
170
171    /// Downgrades a connected peer to candidate status.
172    fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) {
173        if let Some(peer) = self.peer_pool().write().get_mut(&listener_addr) {
174            if let Peer::Connected(peer) = peer {
175                // Exception: the BootstrapClient only has a single Resolver,
176                // so it may only map a validator's Aleo address once, for its
177                // Gateway-mode connection. This also means that the Router-mode
178                // connection may not remove that mapping.
179                let aleo_addr = if self.node_type() == NodeType::BootstrapClient
180                    && peer.connection_mode == ConnectionMode::Router
181                {
182                    None
183                } else {
184                    Some(peer.aleo_addr)
185                };
186                self.resolver().write().remove_peer(peer.connected_addr, aleo_addr);
187            }
188            peer.downgrade_to_candidate(listener_addr);
189        }
190    }
191
192    /// Adds new candidate peers to the peer pool, ensuring their validity and following the
193    /// limit on the number of peers in the pool. The listener addresses may be paired with
194    /// the last known block height of the associated peer.
195    fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
196        let trusted_peers = self.trusted_peers();
197
198        // Hold a write guard from now on, so as not to accidentally slash multiple times
199        // based on multiple batches of candidate peers, and to not overwrite any entries.
200        let mut peer_pool = self.peer_pool().write();
201
202        // Perform filtering to ensure candidate validity. Also count how many entries are updates.
203        let mut num_updates: usize = 0;
204        listener_addrs.retain(|&(addr, height)| {
205            !self.is_ip_banned(addr.ip())
206                && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
207                && peer_pool
208                    .get(&addr)
209                    .map(|peer| peer.is_candidate() && height.is_some())
210                    .inspect(|is_valid_update| {
211                        if *is_valid_update {
212                            num_updates += 1
213                        }
214                    })
215                    .unwrap_or(true)
216        });
217
218        // If we've managed to filter out every entry, there's nothing to do.
219        if listener_addrs.is_empty() {
220            return;
221        }
222
223        // If we're about to exceed the peer pool size limit, apply candidate slashing.
224        if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
225            && Self::PEER_SLASHING_COUNT != 0
226        {
227            // Collect the addresses of prospect peers.
228            let mut peers_to_slash = peer_pool
229                .iter()
230                .filter_map(|(addr, peer)| {
231                    (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
232                })
233                .collect::<Vec<_>>();
234
235            // Get the low-level peer stats.
236            let known_peers = self.tcp().known_peers().snapshot();
237
238            // Sort the list of candidate peers by failure count (descending) and timestamp (ascending).
239            let default_value = (0, Instant::now());
240            peers_to_slash.sort_unstable_by_key(|addr| {
241                let (num_failures, last_seen) = known_peers
242                    .get(&addr.ip())
243                    .map(|stats| (stats.failures(), stats.timestamp()))
244                    .unwrap_or(default_value);
245                (cmp::Reverse(num_failures), last_seen)
246            });
247
248            // Retain the candidate peers with the most failures and oldest timestamps.
249            peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
250
251            // Remove the peers to slash from the pool.
252            peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
253
254            // Remove the peers to slash from the low-level list of known peers.
255            self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
256        }
257
258        // Make sure that we won't breach the pool size limit in case the slashing didn't suffice.
259        listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
260
261        // If we've managed to truncate to 0, exit.
262        if listener_addrs.is_empty() {
263            return;
264        }
265
266        // Insert or update the applicable candidate peers.
267        for (addr, height) in listener_addrs {
268            match peer_pool.entry(addr) {
269                Entry::Vacant(entry) => {
270                    entry.insert(Peer::new_candidate(addr, false));
271                }
272                Entry::Occupied(mut entry) => {
273                    if let Peer::Candidate(peer) = entry.get_mut() {
274                        peer.last_height_seen = height;
275                    }
276                }
277            }
278        }
279    }
280
281    /// Completely removes an entry from the peer pool.
282    fn remove_peer(&self, listener_addr: SocketAddr) {
283        self.peer_pool().write().remove(&listener_addr);
284    }
285
286    /// Returns the connected peer address from the listener IP address.
287    fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
288        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
289            Some(peer.connected_addr)
290        } else {
291            None
292        }
293    }
294
295    /// Returns the connected peer aleo address from the listener IP address.
296    fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
297        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
298            Some(peer.aleo_addr)
299        } else {
300            None
301        }
302    }
303
304    /// Returns `true` if the node is connecting to the given peer's listener address.
305    fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
306        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
307    }
308
309    /// Returns `true` if the node is connected to the given peer listener address.
310    fn is_connected(&self, listener_addr: SocketAddr) -> bool {
311        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
312    }
313
314    /// Returns `true` if the node is connected to the given Aleo address.
315    fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
316        // The resolver only contains data on connected peers.
317        self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
318    }
319
320    /// Returns `true` if the given listener address is trusted.
321    fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
322        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
323    }
324
325    /// Returns the number of all peers.
326    fn number_of_peers(&self) -> usize {
327        self.peer_pool().read().len()
328    }
329
330    /// Returns the number of connected peers.
331    fn number_of_connected_peers(&self) -> usize {
332        self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
333    }
334
335    /// Returns the number of connecting peers.
336    fn number_of_connecting_peers(&self) -> usize {
337        self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
338    }
339
340    /// Returns the number of candidate peers.
341    fn number_of_candidate_peers(&self) -> usize {
342        self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
343    }
344
345    /// Returns the connected peer given the peer IP, if it exists.
346    fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
347        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
348            Some(peer.clone())
349        } else {
350            None
351        }
352    }
353
354    /// Updates the connected peer - if it exists -  given the peer IP and a closure.
355    /// The returned status indicates whether the update was successful, i.e. the peer had existed.
356    fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
357        &self,
358        listener_addr: &SocketAddr,
359        mut update_fn: F,
360    ) -> bool {
361        if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
362            update_fn(peer);
363            true
364        } else {
365            false
366        }
367    }
368
369    /// Returns the list of all peers (connected, connecting, and candidate).
370    fn get_peers(&self) -> Vec<Peer<N>> {
371        self.peer_pool().read().values().cloned().collect()
372    }
373
374    /// Returns all connected peers.
375    fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
376        self.filter_connected_peers(|_| true)
377    }
378
379    /// Returns an optionally bounded list of all connected peers sorted by their
380    /// block height (highest first) and failure count (lowest first).
381    fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
382        // Get a snapshot of the currently connected peers.
383        let mut peers = self.get_connected_peers();
384        // Get the low-level peer stats.
385        let known_peers = self.tcp().known_peers().snapshot();
386
387        // Sort the prospect peers.
388        peers.sort_unstable_by_key(|peer| {
389            if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
390                // Prioritize greatest height, then lowest failure count.
391                (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
392            } else {
393                // Unreachable; use an else-compatible dummy.
394                (cmp::Reverse(peer.last_height_seen), 0)
395            }
396        });
397        if let Some(max) = max_entries {
398            peers.truncate(max);
399        }
400
401        peers
402    }
403
404    /// Returns all connected peers that satisify the given predicate.
405    fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
406        self.peer_pool()
407            .read()
408            .values()
409            .filter_map(|p| {
410                if let Peer::Connected(peer) = p
411                    && predicate(peer)
412                {
413                    Some(peer)
414                } else {
415                    None
416                }
417            })
418            .cloned()
419            .collect()
420    }
421
422    /// Returns the list of connected peers.
423    fn connected_peers(&self) -> Vec<SocketAddr> {
424        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
425    }
426
427    /// Returns the list of trusted peers.
428    fn trusted_peers(&self) -> Vec<SocketAddr> {
429        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
430    }
431
432    /// Returns the list of candidate peers.
433    fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
434        self.peer_pool()
435            .read()
436            .values()
437            .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
438            .collect()
439    }
440
441    /// Returns the list of unconnected trusted peers.
442    fn unconnected_trusted_peers(&self) -> HashSet<SocketAddr> {
443        self.peer_pool()
444            .read()
445            .iter()
446            .filter_map(
447                |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None },
448            )
449            .collect()
450    }
451
452    /// Loads any previously cached peer addresses so they can be introduced as initial
453    /// candidate peers to connect to.
454    fn load_cached_peers(storage_mode: &StorageMode, filename: &str) -> Result<Vec<SocketAddr>> {
455        let mut peer_cache_path = aleo_ledger_dir(N::ID, storage_mode);
456        peer_cache_path.push(filename);
457
458        let peers = match fs::read_to_string(&peer_cache_path) {
459            Ok(cached_peers_str) => {
460                let mut cached_peers = Vec::new();
461                for peer_addr_str in cached_peers_str.lines() {
462                    match SocketAddr::from_str(peer_addr_str) {
463                        Ok(addr) => cached_peers.push(addr),
464                        Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
465                    }
466                }
467                cached_peers
468            }
469            Err(error) if error.kind() == io::ErrorKind::NotFound => {
470                // Not an issue - the cache may not exist yet.
471                Vec::new()
472            }
473            Err(error) => {
474                warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, peer_cache_path.display());
475                Vec::new()
476            }
477        };
478
479        Ok(peers)
480    }
481
482    /// Preserve the peers who have the greatest known block heights, and the lowest
483    /// number of registered network failures.
484    fn save_best_peers(&self, storage_mode: &StorageMode, filename: &str, max_entries: Option<usize>) -> Result<()> {
485        // Collect all prospect peers.
486        let mut peers = self.get_peers();
487
488        // Get the low-level peer stats.
489        let known_peers = self.tcp().known_peers().snapshot();
490
491        // Sort the list of peers.
492        peers.sort_unstable_by_key(|peer| {
493            if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
494                // Prioritize greatest height, then lowest failure count.
495                (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
496            } else {
497                // Unreachable; use an else-compatible dummy.
498                (cmp::Reverse(peer.last_height_seen()), 0)
499            }
500        });
501        if let Some(max) = max_entries {
502            peers.truncate(max);
503        }
504
505        // Dump the connected peers to a file.
506        let mut path = aleo_ledger_dir(N::ID, storage_mode);
507        path.push(filename);
508        let mut file = fs::File::create(path)?;
509        for peer in peers {
510            writeln!(file, "{}", peer.listener_addr())?;
511        }
512
513        Ok(())
514    }
515
516    // Introduces a new connecting peer into the peer pool if unknown, or promotes
517    // a known candidate peer to a connecting one. The returned boolean indicates
518    // whether the peer has been added/promoted, or rejected due to already being
519    // shaken hands with or connected.
520    fn add_connecting_peer(&self, listener_addr: SocketAddr) -> bool {
521        match self.peer_pool().write().entry(listener_addr) {
522            Entry::Vacant(entry) => {
523                entry.insert(Peer::new_connecting(listener_addr, false));
524                true
525            }
526            Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => {
527                entry.insert(Peer::new_connecting(listener_addr, entry.get().is_trusted()));
528                true
529            }
530            Entry::Occupied(_) => false,
531        }
532    }
533
534    /// Temporarily IP-ban and disconnect from the peer with the given listener address and an
535    /// optional reason for the ban. This also removes the peer from the candidate pool.
536    fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
537        let ip = listener_addr.ip();
538        debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
539
540        // Insert/update the low-level IP ban list.
541        self.tcp().banned_peers().update_ip_ban(ip);
542
543        // Disconnect from the peer.
544        self.disconnect(listener_addr);
545        // Remove the peer from the pool.
546        self.remove_peer(listener_addr);
547    }
548
549    /// Check whether the given IP address is currently banned.
550    fn is_ip_banned(&self, ip: IpAddr) -> bool {
551        self.tcp().banned_peers().is_ip_banned(&ip)
552    }
553
554    /// Insert or update a banned IP.
555    fn update_ip_ban(&self, ip: IpAddr) {
556        self.tcp().banned_peers().update_ip_ban(ip);
557    }
558}