Skip to main content

snarkos_node_network/
peering.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver};
17
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Peer;
    use snarkos_node_tcp::{Config, P2P, Tcp};
    use snarkvm::{prelude::Rng, utilities::TestRng};

    use std::{collections::HashMap, net::SocketAddr, time::Instant};

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// A minimal `PeerPoolHandling` implementor that owns just the state the trait requires.
    struct MockPeerPool<N: Network> {
        tcp: Tcp,
        peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
        resolver: RwLock<Resolver<N>>,
    }

    impl<N: Network> MockPeerPool<N> {
        /// Creates a mock with an empty peer pool and a TCP stack configured without a listener IP.
        fn new() -> Self {
            let config = Config { listener_ip: None, ..Default::default() };
            Self { tcp: Tcp::new(config), peer_pool: Default::default(), resolver: Default::default() }
        }
    }

    impl<N: Network> P2P for MockPeerPool<N> {
        fn tcp(&self) -> &Tcp {
            &self.tcp
        }
    }

    impl<N: Network> PeerPoolHandling<N> for MockPeerPool<N> {
        const MAXIMUM_POOL_SIZE: usize = 100;
        const OWNER: &str = "MockPeerPool";
        const PEER_SLASHING_COUNT: usize = 10;

        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            &self.peer_pool
        }

        fn resolver(&self) -> &RwLock<Resolver<N>> {
            &self.resolver
        }

        fn is_dev(&self) -> bool {
            false
        }

        fn trusted_peers_only(&self) -> bool {
            false
        }

        fn node_type(&self) -> NodeType {
            NodeType::Client
        }
    }

    /// Builds an untrusted, Router-mode connected peer on 127.0.0.1 whose listener uses `port`
    /// and whose connected address uses `port + 10000`. Returns it with its listener address.
    fn make_connected_peer(port: u16, node_type: NodeType, rng: &mut TestRng) -> (SocketAddr, Peer<CurrentNetwork>) {
        use snarkvm::prelude::Address;
        let listener_addr = SocketAddr::from(([127, 0, 0, 1], port));
        let connected_addr = SocketAddr::from(([127, 0, 0, 1], port + 10000));
        let now = Instant::now();
        let peer = Peer::Connected(ConnectedPeer {
            listener_addr,
            connected_addr,
            connection_mode: ConnectionMode::Router,
            trusted: false,
            aleo_addr: Address::<CurrentNetwork>::new(rng.random()),
            node_type,
            version: 1,
            snarkos_sha: None,
            last_height_seen: None,
            first_seen: now,
            last_seen: now,
        });
        (listener_addr, peer)
    }

    // NOTE: `number_of_connecting_peers` and `number_of_connected_validators` only exist when the
    // `metrics` feature is enabled, so every assertion that uses them is gated on that feature.
    // Without the gates this module does not compile in a build without `metrics`.

    #[test]
    fn test_peer_state_transitions() {
        use snarkvm::prelude::Address;

        let pool = MockPeerPool::<CurrentNetwork>::new();
        let mut rng = TestRng::default();

        let listener_addr = SocketAddr::from(([192, 0, 2, 1], 4000));
        let connected_addr = SocketAddr::from(([192, 0, 2, 1], 14000));
        let aleo_addr = Address::<CurrentNetwork>::new(rng.random());

        // Step 1: insert as a candidate.
        pool.peer_pool().write().insert(listener_addr, Peer::new_candidate(listener_addr, false));

        assert_eq!(pool.number_of_candidate_peers(), 1);
        #[cfg(feature = "metrics")]
        assert_eq!(pool.number_of_connecting_peers(), Some(0));
        assert_eq!(pool.number_of_connected_peers(), 0);
        assert!(!pool.is_connecting(listener_addr));
        assert!(!pool.is_connected(listener_addr));

        // Step 2: promote to connecting.
        assert!(pool.add_connecting_peer(listener_addr).is_ok());

        assert_eq!(pool.number_of_candidate_peers(), 0);
        #[cfg(feature = "metrics")]
        assert_eq!(pool.number_of_connecting_peers(), Some(1));
        assert_eq!(pool.number_of_connected_peers(), 0);
        assert!(pool.is_connecting(listener_addr));
        assert!(!pool.is_connected(listener_addr));

        // Step 3: complete the handshake — upgrade to connected.
        pool.peer_pool().write().get_mut(&listener_addr).unwrap().upgrade_to_connected(
            connected_addr,
            listener_addr.port(),
            aleo_addr,
            NodeType::Validator,
            1,
            None,
            ConnectionMode::Router,
        );

        assert_eq!(pool.number_of_candidate_peers(), 0);
        #[cfg(feature = "metrics")]
        assert_eq!(pool.number_of_connecting_peers(), Some(0));
        assert_eq!(pool.number_of_connected_peers(), 1);
        assert!(!pool.is_connecting(listener_addr));
        assert!(pool.is_connected(listener_addr));
        #[cfg(feature = "metrics")]
        assert_eq!(pool.number_of_connected_validators(), Some(1));

        // Verify the connected peer's fields.
        let connected = pool.get_connected_peer(listener_addr).expect("peer should be connected");
        assert_eq!(connected.listener_addr, listener_addr);
        assert_eq!(connected.connected_addr, connected_addr);
        assert_eq!(connected.aleo_addr, aleo_addr);
        assert_eq!(connected.node_type, NodeType::Validator);
    }

    #[test]
    fn test_number_of_connected_validators() {
        let pool = MockPeerPool::<CurrentNetwork>::new();
        let mut rng = TestRng::default();

        // Empty pool: no validators.
        #[cfg(feature = "metrics")]
        assert_eq!(pool.number_of_connected_validators(), Some(0));

        // Insert 2 validators and 1 client.
        let (addr1, peer1) = make_connected_peer(3000, NodeType::Validator, &mut rng);
        let (addr2, peer2) = make_connected_peer(3001, NodeType::Validator, &mut rng);
        let (addr3, peer3) = make_connected_peer(3002, NodeType::Client, &mut rng);
        {
            let mut pool_write = pool.peer_pool().write();
            pool_write.insert(addr1, peer1);
            pool_write.insert(addr2, peer2);
            pool_write.insert(addr3, peer3);
        }

        #[cfg(feature = "metrics")]
        assert_eq!(pool.number_of_connected_validators(), Some(2));
        assert_eq!(pool.number_of_connected_peers(), 3);

        // A candidate peer should not be counted as a validator.
        let candidate_addr = SocketAddr::from(([127, 0, 0, 1], 3003));
        pool.peer_pool().write().insert(candidate_addr, Peer::new_candidate(candidate_addr, false));

        #[cfg(feature = "metrics")]
        assert_eq!(pool.number_of_connected_validators(), Some(2));
        assert_eq!(pool.number_of_connected_peers(), 3);
    }
}
180
181use snarkos_node_tcp::{ConnectError, P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
182use snarkvm::prelude::{Address, Network};
183
184use anyhow::Result;
185#[cfg(feature = "locktick")]
186use locktick::parking_lot::RwLock;
187#[cfg(not(feature = "locktick"))]
188use parking_lot::RwLock;
189use std::{
190    cmp,
191    collections::{
192        HashSet,
193        hash_map::{Entry, HashMap},
194    },
195    fs,
196    io::{self, Write},
197    net::{IpAddr, SocketAddr},
198    path::Path,
199    str::FromStr,
200    time::Instant,
201};
202use tokio::task;
203use tracing::*;
204
205/// Application-level errors generated by the peering module.
206/// This is never returned directly, but only as the payload for a `ConnectError`.
207#[derive(Debug)]
208pub enum PeeringError {
209    NoExternalPeersAllowed,
210}
211
212impl snarkos_node_tcp::ApplicationError for PeeringError {}
213
214impl std::fmt::Display for PeeringError {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        match self {
217            Self::NoExternalPeersAllowed => write!(f, "no untrusted peers allowed"),
218        }
219    }
220}
221
222pub trait PeerPoolHandling<N: Network>: P2P {
223    const OWNER: &str;
224
225    /// The maximum number of peers permitted to be stored in the peer pool.
226    const MAXIMUM_POOL_SIZE: usize;
227
228    /// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached.
229    /// It must be lower than `MAXIMUM_POOL_SIZE`.
230    const PEER_SLASHING_COUNT: usize;
231
232    /// Returns the mapping of all known peers (connected or otherwise), keyed by their public listener address.
233    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
234
235    /// Returns the resolver for translating between public listener addresses and connected addresses.
236    fn resolver(&self) -> &RwLock<Resolver<N>>;
237
238    /// Returns `true` if the owning node is in development mode.
239    fn is_dev(&self) -> bool;
240
241    /// Returns `true` if the node is in trusted peers only mode.
242    fn trusted_peers_only(&self) -> bool;
243
244    /// Returns the node type.
245    fn node_type(&self) -> NodeType;
246
247    /// Returns the listener address of this node.
248    fn local_ip(&self) -> SocketAddr {
249        self.tcp().listening_addr().expect("The TCP listener is not enabled")
250    }
251
252    /// Returns `true` if the given IP is this node.
253    fn is_local_ip(&self, addr: SocketAddr) -> bool {
254        addr == self.local_ip()
255            || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
256    }
257
258    /// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
259    fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
260        !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
261    }
262
263    /// Returns the maximum number of connected peers.
264    fn max_connected_peers(&self) -> usize {
265        self.tcp().config().max_connections as usize
266    }
267
268    /// Ensure we can and are allowed to connect to the given listener address of a peer.
269    fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
270        // Ensure the peer IP is not this node.
271        if self.is_local_ip(listener_addr) {
272            return Err(ConnectError::SelfConnect { address: listener_addr });
273        }
274        // Ensure the node does not surpass the maximum number of peer connections.
275        if self.number_of_connected_peers() >= self.max_connected_peers() {
276            return Err(ConnectError::MaximumConnectionsReached { limit: self.max_connected_peers() as u16 });
277        }
278        // Ensure the node is not already connected to this peer.
279        if self.is_connected(listener_addr) {
280            return Err(ConnectError::AlreadyConnected { address: listener_addr });
281        }
282        // Ensure the node is not already connecting to this peer.
283        if self.is_connecting(listener_addr) {
284            return Err(ConnectError::AlreadyConnecting { address: listener_addr });
285        }
286        // Ensure the peer IP is not banned.
287        if self.is_ip_banned(listener_addr.ip()) {
288            return Err(ConnectError::BannedIp { ip: listener_addr.ip() });
289        }
290        // If the node is in trusted peers only mode, ensure the peer is trusted.
291        if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
292            return Err(ConnectError::application(PeeringError::NoExternalPeersAllowed));
293        }
294
295        Ok(())
296    }
297
298    /// Attempts to connect to the given peer's listener address.
299    ///
300    /// Returns an earlier error, if, for example, we are already connected to the peer.
301    /// Otherwise, it returns a handle to the tokio tasks that sets up the connection.
302    ///
303    /// # Concurrency
304    /// Only one task may call this function for a given listener address at a time.
305    fn connect(&self, listener_addr: SocketAddr) -> Result<task::JoinHandle<Result<(), ConnectError>>, ConnectError> {
306        // Return early if the attempt is against the protocol rules.
307        self.check_connection_attempt(listener_addr)?;
308
309        // Update the last connection attempt time for the peer.
310        if let Some(Peer::Candidate(peer)) = self.peer_pool().write().get_mut(&listener_addr) {
311            peer.last_connection_attempt = Some(Instant::now());
312            peer.total_connection_attempts += 1;
313        } else {
314            warn!("{} No candidate peer entry exists for '{listener_addr:?}' while connecting.", Self::OWNER);
315        }
316
317        let tcp = self.tcp().clone();
318        Ok(tokio::spawn(async move {
319            debug!("{} Connecting to {listener_addr}...", Self::OWNER);
320            tcp.connect(listener_addr).await
321        }))
322    }
323
324    /// Disconnects from the given peer IP, if the peer is connected. The returned boolean
325    /// indicates whether the peer was actually disconnected from, or if this was a noop.
326    fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
327        if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
328            let tcp = self.tcp().clone();
329            tokio::spawn(async move { tcp.disconnect(connected_addr).await })
330        } else {
331            tokio::spawn(async { false })
332        }
333    }
334
335    /// Downgrades a connected peer to candidate status.
336    ///
337    /// Returns true if the peer was fully connected.
338    fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
339        let mut peer_pool = self.peer_pool().write();
340        let Some(peer) = peer_pool.get_mut(&listener_addr) else {
341            trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
342            return false;
343        };
344
345        if let Peer::Connected(conn_peer) = peer {
346            // Exception: the BootstrapClient only has a single Resolver,
347            // so it may only map a validator's Aleo address once, for its
348            // Gateway-mode connection. This also means that the Router-mode
349            // connection may not remove that mapping.
350            let aleo_addr = if self.node_type() == NodeType::BootstrapClient
351                && conn_peer.connection_mode == ConnectionMode::Router
352            {
353                None
354            } else {
355                Some(conn_peer.aleo_addr)
356            };
357            self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
358            peer.downgrade_to_candidate(listener_addr);
359            true
360        } else {
361            peer.downgrade_to_candidate(listener_addr);
362            false
363        }
364    }
365
366    /// Adds new candidate peers to the peer pool, ensuring their validity and following the
367    /// limit on the number of peers in the pool. The listener addresses may be paired with
368    /// the last known block height of the associated peer.
369    fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
370        let trusted_peers = self.trusted_peers();
371
372        // Hold a write guard from now on, so as not to accidentally slash multiple times
373        // based on multiple batches of candidate peers, and to not overwrite any entries.
374        let mut peer_pool = self.peer_pool().write();
375
376        // Perform filtering to ensure candidate validity. Also count how many entries are updates.
377        let mut num_updates: usize = 0;
378        listener_addrs.retain(|&(addr, height)| {
379            !self.is_ip_banned(addr.ip())
380                && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
381                && peer_pool
382                    .get(&addr)
383                    .map(|peer| peer.is_candidate() && height.is_some())
384                    .inspect(|is_valid_update| {
385                        if *is_valid_update {
386                            num_updates += 1
387                        }
388                    })
389                    .unwrap_or(true)
390        });
391
392        // If we've managed to filter out every entry, there's nothing to do.
393        if listener_addrs.is_empty() {
394            return;
395        }
396
397        // If we're about to exceed the peer pool size limit, apply candidate slashing.
398        if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
399            && Self::PEER_SLASHING_COUNT != 0
400        {
401            // Collect the addresses of prospect peers.
402            let mut peers_to_slash = peer_pool
403                .iter()
404                .filter_map(|(addr, peer)| {
405                    (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
406                })
407                .collect::<Vec<_>>();
408
409            // Get the low-level peer stats.
410            let known_peers = self.tcp().known_peers().snapshot();
411
412            // Sort the list of candidate peers by failure count (descending) and timestamp (ascending).
413            let default_value = (0, Instant::now());
414            peers_to_slash.sort_unstable_by_key(|addr| {
415                let (num_failures, last_seen) = known_peers
416                    .get(&addr.ip())
417                    .map(|stats| (stats.failures(), stats.timestamp()))
418                    .unwrap_or(default_value);
419                (cmp::Reverse(num_failures), last_seen)
420            });
421
422            // Retain the candidate peers with the most failures and oldest timestamps.
423            peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
424
425            // Remove the peers to slash from the pool.
426            peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
427
428            // Remove the peers to slash from the low-level list of known peers.
429            self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
430        }
431
432        // Make sure that we won't breach the pool size limit in case the slashing didn't suffice.
433        listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
434
435        // If we've managed to truncate to 0, exit.
436        if listener_addrs.is_empty() {
437            return;
438        }
439
440        // Insert or update the applicable candidate peers.
441        for (addr, height) in listener_addrs {
442            match peer_pool.entry(addr) {
443                Entry::Vacant(entry) => {
444                    entry.insert(Peer::new_candidate(addr, false));
445                }
446                Entry::Occupied(mut entry) => {
447                    if let Peer::Candidate(peer) = entry.get_mut() {
448                        peer.last_height_seen = height;
449                    }
450                }
451            }
452        }
453    }
454
455    /// Completely removes an entry from the peer pool.
456    fn remove_peer(&self, listener_addr: SocketAddr) {
457        self.peer_pool().write().remove(&listener_addr);
458    }
459
460    /// Returns the connected peer address from the listener IP address.
461    fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
462        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
463            Some(peer.connected_addr)
464        } else {
465            None
466        }
467    }
468
469    /// Returns the connected peer aleo address from the listener IP address.
470    fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
471        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
472            Some(peer.aleo_addr)
473        } else {
474            None
475        }
476    }
477
478    /// Returns `true` if the node is connecting to the given peer's listener address.
479    fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
480        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
481    }
482
483    /// Returns `true` if the node is connected to the given peer listener address.
484    fn is_connected(&self, listener_addr: SocketAddr) -> bool {
485        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
486    }
487
488    /// Returns `true` if the node is connected to the given Aleo address.
489    fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
490        // The resolver only contains data on connected peers.
491        self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
492    }
493
494    /// Returns `true` if the node is connected or connecting to the given peer listener address.
495    fn is_connecting_or_connected(&self, listener_addr: SocketAddr) -> bool {
496        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting() || peer.is_connected())
497    }
498
499    /// Returns `true` if the given listener address is trusted.
500    fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
501        self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
502    }
503
504    /// Returns the number of all peers.
505    fn number_of_peers(&self) -> usize {
506        self.peer_pool().read().len()
507    }
508
509    /// Returns the number of connected peers.
510    fn number_of_connected_peers(&self) -> usize {
511        self.peer_pool().read().values().filter(|peer| peer.is_connected()).count()
512    }
513
514    /// Returns the number of connected validators.
515    #[cfg(feature = "metrics")]
516    fn number_of_connected_validators(&self) -> Option<usize> {
517        Some(
518            self.peer_pool()
519                .try_read()?
520                .values()
521                .filter(|peer| peer.as_connected().is_some_and(|peer| peer.is_validator()))
522                .count(),
523        )
524    }
525
526    /// Returns the number of connecting peers.
527    #[cfg(feature = "metrics")]
528    fn number_of_connecting_peers(&self) -> Option<usize> {
529        Some(self.peer_pool().try_read()?.values().filter(|peer| peer.is_connecting()).count())
530    }
531
532    /// Returns the number of candidate peers.
533    fn number_of_candidate_peers(&self) -> usize {
534        self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
535    }
536
537    /// Returns the connected peer given the peer IP, if it exists.
538    fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
539        if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
540            Some(peer.clone())
541        } else {
542            None
543        }
544    }
545
546    /// Updates the connected peer - if it exists -  given the peer IP and a closure.
547    /// The returned status indicates whether the update was successful, i.e. the peer had existed.
548    fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
549        &self,
550        listener_addr: &SocketAddr,
551        mut update_fn: F,
552    ) -> bool {
553        if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
554            update_fn(peer);
555            true
556        } else {
557            false
558        }
559    }
560
561    /// Returns the list of all peers (connected, connecting, and candidate).
562    fn get_peers(&self) -> Vec<Peer<N>> {
563        self.peer_pool().read().values().cloned().collect()
564    }
565
566    /// Returns all connected peers.
567    fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
568        self.filter_connected_peers(|_| true)
569    }
570
571    /// Returns an optionally bounded list of all connected peers sorted by their
572    /// block height (highest first) and failure count (lowest first).
573    fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
574        // Get a snapshot of the currently connected peers.
575        let mut peers = self.get_connected_peers();
576        // Get the low-level peer stats.
577        let known_peers = self.tcp().known_peers().snapshot();
578
579        // Sort the prospect peers.
580        peers.sort_unstable_by_key(|peer| {
581            if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
582                // Prioritize greatest height, then lowest failure count.
583                (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
584            } else {
585                // Unreachable; use an else-compatible dummy.
586                (cmp::Reverse(peer.last_height_seen), 0)
587            }
588        });
589        if let Some(max) = max_entries {
590            peers.truncate(max);
591        }
592
593        peers
594    }
595
596    /// Returns all connected peers that satisify the given predicate.
597    fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
598        self.peer_pool()
599            .read()
600            .values()
601            .filter_map(|p| {
602                if let Peer::Connected(peer) = p
603                    && predicate(peer)
604                {
605                    Some(peer)
606                } else {
607                    None
608                }
609            })
610            .cloned()
611            .collect()
612    }
613
614    /// Returns the list of connected peers.
615    fn connected_peers(&self) -> Vec<SocketAddr> {
616        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
617    }
618
619    /// Returns the list of trusted peers.
620    fn trusted_peers(&self) -> Vec<SocketAddr> {
621        self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
622    }
623
624    /// Returns the list of candidate peers.
625    fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
626        self.peer_pool()
627            .read()
628            .values()
629            .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
630            .collect()
631    }
632
633    /// Returns the list of trusted candidate peers.
634    fn get_trusted_candidate_peers(&self) -> Vec<CandidatePeer> {
635        self.peer_pool()
636            .read()
637            .values()
638            .filter_map(|peer| {
639                if let Peer::Candidate(peer) = peer
640                    && peer.trusted
641                {
642                    Some(peer.clone())
643                } else {
644                    None
645                }
646            })
647            .collect()
648    }
649
650    /// Loads any previously cached peer addresses so they can be introduced as initial
651    /// candidate peers to connect to.
652    fn load_cached_peers(path: &Path) -> Result<Vec<SocketAddr>> {
653        let peers = match fs::read_to_string(path) {
654            Ok(cached_peers_str) => {
655                let mut cached_peers = Vec::new();
656                for peer_addr_str in cached_peers_str.lines() {
657                    match SocketAddr::from_str(peer_addr_str) {
658                        Ok(addr) => cached_peers.push(addr),
659                        Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
660                    }
661                }
662                cached_peers
663            }
664            Err(error) if error.kind() == io::ErrorKind::NotFound => {
665                // Not an issue - the cache may not exist yet.
666                Vec::new()
667            }
668            Err(error) => {
669                warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, path.display());
670                Vec::new()
671            }
672        };
673
674        Ok(peers)
675    }
676
677    /// Preserve the peers who have the greatest known block heights, and the lowest
678    /// number of registered network failures.
679    ///
680    /// # Arguments
681    /// * `path` - The path to the file to save the peers to.
682    /// * `max_entries` - The maximum number of peers to save (if there are more, the extra ones are truncated).
683    /// * `store_ports` - Whether to store the ports of the peers, or just the IP addresses.
684    fn save_best_peers(&self, path: &Path, max_entries: Option<usize>, store_ports: bool) -> Result<()> {
685        // Collect all prospect peers.
686        let mut peers = self.get_peers();
687
688        // Get the low-level peer stats.
689        let known_peers = self.tcp().known_peers().snapshot();
690
691        // Sort the list of peers.
692        peers.sort_unstable_by_key(|peer| {
693            if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
694                // Prioritize greatest height, then lowest failure count.
695                (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
696            } else {
697                // Unreachable; use an else-compatible dummy.
698                (cmp::Reverse(peer.last_height_seen()), 0)
699            }
700        });
701        if let Some(max) = max_entries {
702            peers.truncate(max);
703        }
704
705        // Dump the connected and deduplicated peers to a file.
706        let addrs: HashSet<_> = peers
707            .iter()
708            .map(
709                |peer| {
710                    if store_ports { peer.listener_addr().to_string() } else { peer.listener_addr().ip().to_string() }
711                },
712            )
713            .collect();
714
715        let mut file = fs::File::create(path)?;
716        for addr in addrs {
717            writeln!(file, "{addr}")?;
718        }
719
720        Ok(())
721    }
722
723    // Introduces a new connecting peer into the peer pool if unknown, or promotes
724    // a known candidate peer to a connecting one. The returned boolean indicates
725    // whether the peer has been added/promoted, or rejected due to already being
726    // shaken hands with or connected.
727    fn add_connecting_peer(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
728        match self.peer_pool().write().entry(listener_addr) {
729            Entry::Vacant(entry) => {
730                entry.insert(Peer::new_connecting(listener_addr, false));
731                Ok(())
732            }
733            Entry::Occupied(mut entry) => match entry.get() {
734                peer @ Peer::Candidate(_) => {
735                    entry.insert(Peer::new_connecting(listener_addr, peer.is_trusted()));
736                    Ok(())
737                }
738                Peer::Connecting(_) => Err(ConnectError::AlreadyConnecting { address: listener_addr }),
739                Peer::Connected(_) => Err(ConnectError::AlreadyConnected { address: listener_addr }),
740            },
741        }
742    }
743
744    /// Temporarily IP-ban and disconnect from the peer with the given listener address and an
745    /// optional reason for the ban. This also removes the peer from the candidate pool.
746    fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
747        // Ignore IP-banning if we are in dev mode.
748        if self.is_dev() {
749            return;
750        }
751
752        let ip = listener_addr.ip();
753        debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
754
755        // Insert/update the low-level IP ban list.
756        self.tcp().banned_peers().update_ip_ban(ip);
757
758        // Disconnect from the peer.
759        self.disconnect(listener_addr);
760        // Remove the peer from the pool.
761        self.remove_peer(listener_addr);
762    }
763
764    /// Check whether the given IP address is currently banned.
765    fn is_ip_banned(&self, ip: IpAddr) -> bool {
766        self.tcp().banned_peers().is_ip_banned(&ip)
767    }
768
769    /// Insert or update a banned IP.
770    fn update_ip_ban(&self, ip: IpAddr) {
771        self.tcp().banned_peers().update_ip_ban(ip);
772    }
773}