1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
// Copyright (c) 2019-2026 Provable Inc.
// This file is part of the snarkOS library.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver};
use snarkos_node_tcp::{ConnectError, P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
use snarkvm::prelude::{Address, Network};
use anyhow::Result;
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use std::{
cmp,
collections::{
HashSet,
hash_map::{Entry, HashMap},
},
fs,
io::{self, Write},
net::{IpAddr, SocketAddr},
path::Path,
str::FromStr,
time::Instant,
};
use tokio::task;
use tracing::*;
/// Application-level errors generated by the peering module.
/// This is never returned directly, but only as the payload for a `ConnectError`.
#[derive(Debug)]
pub enum PeeringError {
NoExternalPeersAllowed,
}
impl snarkos_node_tcp::ApplicationError for PeeringError {}
impl std::fmt::Display for PeeringError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NoExternalPeersAllowed => write!(f, "no untrusted peers allowed"),
}
}
}
pub trait PeerPoolHandling<N: Network>: P2P {
const OWNER: &str;
/// The maximum number of peers permitted to be stored in the peer pool.
const MAXIMUM_POOL_SIZE: usize;
/// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached.
/// It must be lower than `MAXIMUM_POOL_SIZE`.
const PEER_SLASHING_COUNT: usize;
/// Returns the mapping of all known peers (connected or otherwise), keyed by their public listener address.
fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
/// Returns the resolver for translating between public listener addresses and connected addresses.
fn resolver(&self) -> &RwLock<Resolver<N>>;
/// Returns `true` if the owning node is in development mode.
fn is_dev(&self) -> bool;
/// Returns `true` if the node is in trusted peers only mode.
fn trusted_peers_only(&self) -> bool;
/// Returns the node type.
fn node_type(&self) -> NodeType;
/// Returns the listener address of this node.
fn local_ip(&self) -> SocketAddr {
self.tcp().listening_addr().expect("The TCP listener is not enabled")
}
/// Returns `true` if the given IP is this node.
fn is_local_ip(&self, addr: SocketAddr) -> bool {
addr == self.local_ip()
|| (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
}
/// Returns `true` if the given IP is not this node, is not a bogon address, and is not unspecified.
fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
!self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
}
/// Returns the maximum number of connected peers.
fn max_connected_peers(&self) -> usize {
self.tcp().config().max_connections as usize
}
/// Ensure we can and are allowed to connect to the given listener address of a peer.
fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
// Ensure the peer IP is not this node.
if self.is_local_ip(listener_addr) {
return Err(ConnectError::SelfConnect { address: listener_addr });
}
// Ensure the node does not surpass the maximum number of peer connections.
if self.number_of_connected_peers() >= self.max_connected_peers() {
return Err(ConnectError::MaximumConnectionsReached { limit: self.max_connected_peers() as u16 });
}
// Ensure the node is not already connected to this peer.
if self.is_connected(listener_addr) {
return Err(ConnectError::AlreadyConnected { address: listener_addr });
}
// Ensure the node is not already connecting to this peer.
if self.is_connecting(listener_addr) {
return Err(ConnectError::AlreadyConnecting { address: listener_addr });
}
// Ensure the peer IP is not banned.
if self.is_ip_banned(listener_addr.ip()) {
return Err(ConnectError::BannedIp { ip: listener_addr.ip() });
}
// If the node is in trusted peers only mode, ensure the peer is trusted.
if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
return Err(ConnectError::application(PeeringError::NoExternalPeersAllowed));
}
Ok(())
}
/// Attempts to connect to the given peer's listener address.
///
/// Returns an earlier error, if, for example, we are already connected to the peer.
/// Otherwise, it returns a handle to the tokio tasks that sets up the connection.
///
/// # Concurrency
/// Only one task may call this function for a given listener address at a time.
fn connect(&self, listener_addr: SocketAddr) -> Result<task::JoinHandle<Result<(), ConnectError>>, ConnectError> {
// Return early if the attempt is against the protocol rules.
self.check_connection_attempt(listener_addr)?;
// Update the last connection attempt time for the peer.
if let Some(Peer::Candidate(peer)) = self.peer_pool().write().get_mut(&listener_addr) {
peer.last_connection_attempt = Some(Instant::now());
peer.total_connection_attempts += 1;
} else {
warn!("{} No candidate peer entry exists for '{listener_addr:?}' while connecting.", Self::OWNER);
}
let tcp = self.tcp().clone();
Ok(tokio::spawn(async move {
debug!("{} Connecting to {listener_addr}...", Self::OWNER);
tcp.connect(listener_addr).await
}))
}
/// Disconnects from the given peer IP, if the peer is connected. The returned boolean
/// indicates whether the peer was actually disconnected from, or if this was a noop.
fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
let tcp = self.tcp().clone();
tokio::spawn(async move { tcp.disconnect(connected_addr).await })
} else {
tokio::spawn(async { false })
}
}
/// Downgrades a connected peer to candidate status.
///
/// Returns true if the peer was fully connected.
fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
let mut peer_pool = self.peer_pool().write();
let Some(peer) = peer_pool.get_mut(&listener_addr) else {
trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
return false;
};
if let Peer::Connected(conn_peer) = peer {
// Exception: the BootstrapClient only has a single Resolver,
// so it may only map a validator's Aleo address once, for its
// Gateway-mode connection. This also means that the Router-mode
// connection may not remove that mapping.
let aleo_addr = if self.node_type() == NodeType::BootstrapClient
&& conn_peer.connection_mode == ConnectionMode::Router
{
None
} else {
Some(conn_peer.aleo_addr)
};
self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
peer.downgrade_to_candidate(listener_addr);
true
} else {
peer.downgrade_to_candidate(listener_addr);
false
}
}
/// Adds new candidate peers to the peer pool, ensuring their validity and following the
/// limit on the number of peers in the pool. The listener addresses may be paired with
/// the last known block height of the associated peer.
fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
let trusted_peers = self.trusted_peers();
// Hold a write guard from now on, so as not to accidentally slash multiple times
// based on multiple batches of candidate peers, and to not overwrite any entries.
let mut peer_pool = self.peer_pool().write();
// Perform filtering to ensure candidate validity. Also count how many entries are updates.
let mut num_updates: usize = 0;
listener_addrs.retain(|&(addr, height)| {
!self.is_ip_banned(addr.ip())
&& if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
&& peer_pool
.get(&addr)
.map(|peer| peer.is_candidate() && height.is_some())
.inspect(|is_valid_update| {
if *is_valid_update {
num_updates += 1
}
})
.unwrap_or(true)
});
// If we've managed to filter out every entry, there's nothing to do.
if listener_addrs.is_empty() {
return;
}
// If we're about to exceed the peer pool size limit, apply candidate slashing.
if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
&& Self::PEER_SLASHING_COUNT != 0
{
// Collect the addresses of prospect peers.
let mut peers_to_slash = peer_pool
.iter()
.filter_map(|(addr, peer)| {
(matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
})
.collect::<Vec<_>>();
// Get the low-level peer stats.
let known_peers = self.tcp().known_peers().snapshot();
// Sort the list of candidate peers by failure count (descending) and timestamp (ascending).
let default_value = (0, Instant::now());
peers_to_slash.sort_unstable_by_key(|addr| {
let (num_failures, last_seen) = known_peers
.get(&addr.ip())
.map(|stats| (stats.failures(), stats.timestamp()))
.unwrap_or(default_value);
(cmp::Reverse(num_failures), last_seen)
});
// Retain the candidate peers with the most failures and oldest timestamps.
peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
// Remove the peers to slash from the pool.
peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
// Remove the peers to slash from the low-level list of known peers.
self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
}
// Make sure that we won't breach the pool size limit in case the slashing didn't suffice.
listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
// If we've managed to truncate to 0, exit.
if listener_addrs.is_empty() {
return;
}
// Insert or update the applicable candidate peers.
for (addr, height) in listener_addrs {
match peer_pool.entry(addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::new_candidate(addr, false));
}
Entry::Occupied(mut entry) => {
if let Peer::Candidate(peer) = entry.get_mut() {
peer.last_height_seen = height;
}
}
}
}
}
/// Completely removes an entry from the peer pool.
fn remove_peer(&self, listener_addr: SocketAddr) {
self.peer_pool().write().remove(&listener_addr);
}
/// Returns the connected peer address from the listener IP address.
fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
Some(peer.connected_addr)
} else {
None
}
}
/// Returns the connected peer aleo address from the listener IP address.
fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
Some(peer.aleo_addr)
} else {
None
}
}
/// Returns `true` if the node is connecting to the given peer's listener address.
fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
}
/// Returns `true` if the node is connected to the given peer listener address.
fn is_connected(&self, listener_addr: SocketAddr) -> bool {
self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
}
/// Returns `true` if the node is connected to the given Aleo address.
fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
// The resolver only contains data on connected peers.
self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
}
/// Returns `true` if the node is connected or connecting to the given peer listener address.
fn is_connecting_or_connected(&self, listener_addr: SocketAddr) -> bool {
self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting() || peer.is_connected())
}
/// Returns `true` if the given listener address is trusted.
fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
}
/// Returns the number of all peers.
fn number_of_peers(&self) -> usize {
self.peer_pool().read().len()
}
/// Returns the number of connected peers.
fn number_of_connected_peers(&self) -> usize {
self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
}
/// Returns the number of connecting peers.
fn number_of_connecting_peers(&self) -> usize {
self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
}
/// Returns the number of candidate peers.
fn number_of_candidate_peers(&self) -> usize {
self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
}
/// Returns the connected peer given the peer IP, if it exists.
fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
Some(peer.clone())
} else {
None
}
}
/// Updates the connected peer - if it exists - given the peer IP and a closure.
/// The returned status indicates whether the update was successful, i.e. the peer had existed.
fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
&self,
listener_addr: &SocketAddr,
mut update_fn: F,
) -> bool {
if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
update_fn(peer);
true
} else {
false
}
}
/// Returns the list of all peers (connected, connecting, and candidate).
fn get_peers(&self) -> Vec<Peer<N>> {
self.peer_pool().read().values().cloned().collect()
}
/// Returns all connected peers.
fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
self.filter_connected_peers(|_| true)
}
/// Returns an optionally bounded list of all connected peers sorted by their
/// block height (highest first) and failure count (lowest first).
fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
// Get a snapshot of the currently connected peers.
let mut peers = self.get_connected_peers();
// Get the low-level peer stats.
let known_peers = self.tcp().known_peers().snapshot();
// Sort the prospect peers.
peers.sort_unstable_by_key(|peer| {
if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
// Prioritize greatest height, then lowest failure count.
(cmp::Reverse(peer.last_height_seen), peer_stats.failures())
} else {
// Unreachable; use an else-compatible dummy.
(cmp::Reverse(peer.last_height_seen), 0)
}
});
if let Some(max) = max_entries {
peers.truncate(max);
}
peers
}
/// Returns all connected peers that satisify the given predicate.
fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
self.peer_pool()
.read()
.values()
.filter_map(|p| {
if let Peer::Connected(peer) = p
&& predicate(peer)
{
Some(peer)
} else {
None
}
})
.cloned()
.collect()
}
/// Returns the list of connected peers.
fn connected_peers(&self) -> Vec<SocketAddr> {
self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
}
/// Returns the list of trusted peers.
fn trusted_peers(&self) -> Vec<SocketAddr> {
self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
}
/// Returns the list of candidate peers.
fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
self.peer_pool()
.read()
.values()
.filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
.collect()
}
/// Returns the list of trusted candidate peers.
fn get_trusted_candidate_peers(&self) -> Vec<CandidatePeer> {
self.peer_pool()
.read()
.values()
.filter_map(|peer| {
if let Peer::Candidate(peer) = peer
&& peer.trusted
{
Some(peer.clone())
} else {
None
}
})
.collect()
}
/// Loads any previously cached peer addresses so they can be introduced as initial
/// candidate peers to connect to.
fn load_cached_peers(path: &Path) -> Result<Vec<SocketAddr>> {
let peers = match fs::read_to_string(path) {
Ok(cached_peers_str) => {
let mut cached_peers = Vec::new();
for peer_addr_str in cached_peers_str.lines() {
match SocketAddr::from_str(peer_addr_str) {
Ok(addr) => cached_peers.push(addr),
Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
}
}
cached_peers
}
Err(error) if error.kind() == io::ErrorKind::NotFound => {
// Not an issue - the cache may not exist yet.
Vec::new()
}
Err(error) => {
warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, path.display());
Vec::new()
}
};
Ok(peers)
}
/// Preserve the peers who have the greatest known block heights, and the lowest
/// number of registered network failures.
///
/// # Arguments
/// * `path` - The path to the file to save the peers to.
/// * `max_entries` - The maximum number of peers to save (if there are more, the extra ones are truncated).
/// * `store_ports` - Whether to store the ports of the peers, or just the IP addresses.
fn save_best_peers(&self, path: &Path, max_entries: Option<usize>, store_ports: bool) -> Result<()> {
// Collect all prospect peers.
let mut peers = self.get_peers();
// Get the low-level peer stats.
let known_peers = self.tcp().known_peers().snapshot();
// Sort the list of peers.
peers.sort_unstable_by_key(|peer| {
if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
// Prioritize greatest height, then lowest failure count.
(cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
} else {
// Unreachable; use an else-compatible dummy.
(cmp::Reverse(peer.last_height_seen()), 0)
}
});
if let Some(max) = max_entries {
peers.truncate(max);
}
// Dump the connected and deduplicated peers to a file.
let addrs: HashSet<_> = peers
.iter()
.map(
|peer| {
if store_ports { peer.listener_addr().to_string() } else { peer.listener_addr().ip().to_string() }
},
)
.collect();
let mut file = fs::File::create(path)?;
for addr in addrs {
writeln!(file, "{addr}")?;
}
Ok(())
}
// Introduces a new connecting peer into the peer pool if unknown, or promotes
// a known candidate peer to a connecting one. The returned boolean indicates
// whether the peer has been added/promoted, or rejected due to already being
// shaken hands with or connected.
fn add_connecting_peer(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
match self.peer_pool().write().entry(listener_addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::new_connecting(listener_addr, false));
Ok(())
}
Entry::Occupied(mut entry) => match entry.get() {
peer @ Peer::Candidate(_) => {
entry.insert(Peer::new_connecting(listener_addr, peer.is_trusted()));
Ok(())
}
Peer::Connecting(_) => Err(ConnectError::AlreadyConnecting { address: listener_addr }),
Peer::Connected(_) => Err(ConnectError::AlreadyConnected { address: listener_addr }),
},
}
}
/// Temporarily IP-ban and disconnect from the peer with the given listener address and an
/// optional reason for the ban. This also removes the peer from the candidate pool.
fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
// Ignore IP-banning if we are in dev mode.
if self.is_dev() {
return;
}
let ip = listener_addr.ip();
debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
// Insert/update the low-level IP ban list.
self.tcp().banned_peers().update_ip_ban(ip);
// Disconnect from the peer.
self.disconnect(listener_addr);
// Remove the peer from the pool.
self.remove_peer(listener_addr);
}
/// Check whether the given IP address is currently banned.
fn is_ip_banned(&self, ip: IpAddr) -> bool {
self.tcp().banned_peers().is_ip_banned(&ip)
}
/// Insert or update a banned IP.
fn update_ip_ban(&self, ip: IpAddr) {
self.tcp().banned_peers().update_ip_ban(ip);
}
}