1#![cfg(feature = "full")]
5
6use std::{mem::take, sync::Arc};
7
8use hashbrown::{HashMap, HashSet};
9use libp2p::{Multiaddr, PeerId};
10use tokio::sync::RwLock;
11
12use super::{
13 error::Error,
14 info::{PeerInfo, PeerRelation},
15};
16use crate::{alias, config::Peer, init::global, swarm::protocols::iota_gossip::GossipSender};
17
18const REMOTE_PEERS_INITIAL_CAP: usize = 8;
19const LOCAL_ADDRS_INITIAL_CAP: usize = 4;
20
21#[derive(Debug, Clone)]
23pub struct PeerListWrapper(pub Arc<RwLock<PeerList>>);
24
25impl PeerListWrapper {
26 pub fn new(peerlist: PeerList) -> Self {
27 Self(Arc::new(RwLock::new(peerlist)))
28 }
29}
30
31#[derive(Debug)]
32pub struct PeerList {
33 local_id: PeerId,
34 local_addrs: HashSet<Multiaddr>,
35 peers: HashMap<PeerId, (PeerInfo, PeerState, PeerMetrics)>,
36 banned_peers: HashSet<PeerId>,
37 banned_addrs: HashSet<Multiaddr>,
38}
39
40impl PeerList {
41 pub fn new(local_id: PeerId) -> Self {
42 Self {
43 local_id,
44 local_addrs: HashSet::with_capacity(LOCAL_ADDRS_INITIAL_CAP),
45 peers: HashMap::with_capacity(REMOTE_PEERS_INITIAL_CAP),
46 banned_peers: HashSet::default(),
47 banned_addrs: HashSet::default(),
48 }
49 }
50
51 pub fn from_peers(local_id: PeerId, peers: Vec<Peer>) -> Self {
52 let mut p = HashMap::with_capacity(REMOTE_PEERS_INITIAL_CAP);
53
54 p.extend(peers.into_iter().map(|peer| {
55 let peer_id = peer.peer_id;
57 (
58 peer_id,
59 (
60 PeerInfo {
61 address: peer.multiaddr,
62 alias: peer.alias.unwrap_or_else(|| alias!(peer_id).to_owned()),
63 relation: PeerRelation::Known,
64 },
65 PeerState::default(),
66 PeerMetrics::default(),
67 ),
68 )
69 }));
70
71 Self {
72 local_id,
73 local_addrs: HashSet::with_capacity(LOCAL_ADDRS_INITIAL_CAP),
74 peers: p,
75 banned_peers: HashSet::default(),
76 banned_addrs: HashSet::default(),
77 }
78 }
79
80 pub fn add(&mut self, peer_id: PeerId, peer_info: PeerInfo) -> Result<(), (PeerId, PeerInfo, Error)> {
81 if self.contains(&peer_id) {
82 return Err((peer_id, peer_info, Error::PeerIsDuplicate(peer_id)));
83 }
84
85 let _ = self
87 .peers
88 .insert(peer_id, (peer_info, PeerState::default(), PeerMetrics::default()));
89
90 Ok(())
91 }
92
93 pub fn remove(&mut self, peer_id: &PeerId) -> Result<PeerInfo, Error> {
94 let (info, _, _) = self.peers.remove(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
95
96 Ok(info)
97 }
98
99 pub fn contains(&self, peer_id: &PeerId) -> bool {
100 self.peers.contains_key(peer_id)
101 }
102
103 pub fn info(&self, peer_id: &PeerId) -> Result<PeerInfo, Error> {
104 self.peers
105 .get(peer_id)
106 .ok_or(Error::PeerNotPresent(*peer_id))
107 .map(|(info, _, _)| info.clone())
108 }
109
110 pub fn metrics(&self, peer_id: &PeerId) -> Result<PeerMetrics, Error> {
111 self.peers
112 .get(peer_id)
113 .ok_or(Error::PeerNotPresent(*peer_id))
114 .map(|(_, _, metrics)| metrics.clone())
115 }
116
117 pub fn len(&self) -> usize {
118 self.peers.len()
119 }
120
121 pub fn add_local_addr(&mut self, addr: Multiaddr) -> Result<(), (Multiaddr, Error)> {
123 if self.local_addrs.insert(addr.clone()) {
124 Ok(())
125 } else {
126 Err((addr.clone(), Error::AddressIsDuplicate(addr)))
127 }
128 }
129
130 pub fn update_info<U>(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error>
131 where
132 U: FnMut(&mut PeerInfo),
133 {
134 let (info, _, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
135
136 update(info);
137
138 Ok(())
139 }
140
141 pub fn update_state<U>(&mut self, peer_id: &PeerId, mut update: U) -> Result<Option<GossipSender>, Error>
142 where
143 U: FnMut(&mut PeerState) -> Option<GossipSender>,
144 {
145 let (_, state, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
146
147 Ok(update(state))
148 }
149
150 pub fn update_metrics<U>(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error>
151 where
152 U: FnMut(&mut PeerMetrics),
153 {
154 let (_, _, metrics) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
155
156 update(metrics);
157
158 Ok(())
159 }
160
161 pub fn satisfies<P>(&self, peer_id: &PeerId, predicate: P) -> Result<bool, Error>
162 where
163 P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
164 {
165 self.peers
166 .get(peer_id)
167 .ok_or(Error::PeerNotPresent(*peer_id))
168 .map(|(info, state, metrics)| predicate(info, state, metrics))
169 }
170
171 pub fn filter<'a, P: 'a>(&'a self, predicate: P) -> impl Iterator<Item = (PeerId, PeerInfo, PeerMetrics)> + '_
172 where
173 P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
174 {
175 self.peers.iter().filter_map(move |(peer_id, (info, state, metrics))| {
176 if predicate(info, state, metrics) {
177 Some((*peer_id, info.clone(), metrics.clone()))
178 } else {
179 None
180 }
181 })
182 }
183
184 pub fn filter_count<P>(&self, predicate: P) -> usize
185 where
186 P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
187 {
188 self.peers.iter().fold(0, |count, (_, (info, state, metrics))| {
189 if predicate(info, state, metrics) {
190 count + 1
191 } else {
192 count
193 }
194 })
195 }
196
197 pub fn filter_remove<P>(&mut self, peer_id: &PeerId, predicate: P) -> bool
198 where
199 P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
200 {
201 if self
205 .peers
206 .get(peer_id)
207 .filter(|(info, state, metrics)| predicate(info, state, metrics))
208 .is_some()
209 {
210 self.peers.remove(peer_id).is_some()
212 } else {
213 false
214 }
215 }
216
217 #[cfg(test)]
218 pub fn clear(&mut self) {
219 self.peers.clear();
220 self.banned_peers.clear();
221 self.banned_addrs.clear();
222 }
223
224 pub fn ban_peer(&mut self, peer_id: PeerId) -> Result<(), Error> {
225 if self.banned_peers.insert(peer_id) {
227 Ok(())
228 } else {
229 Err(Error::PeerIsBanned(peer_id))
230 }
231 }
232
233 pub fn ban_address(&mut self, address: Multiaddr) -> Result<(), Error> {
234 if self.banned_addrs.insert(address.clone()) {
236 Ok(())
237 } else {
238 Err(Error::AddressIsBanned(address))
239 }
240 }
241
242 pub fn unban_peer(&mut self, peer_id: &PeerId) -> Result<(), Error> {
243 if self.banned_peers.remove(peer_id) {
244 Ok(())
245 } else {
246 Err(Error::PeerIsUnbanned(*peer_id))
247 }
248 }
249
250 pub fn unban_address(&mut self, addr: &Multiaddr) -> Result<(), Error> {
251 if self.banned_addrs.remove(addr) {
252 Ok(())
253 } else {
254 Err(Error::AddressIsUnbanned(addr.clone()))
255 }
256 }
257
258 pub fn is_peer_banned(&self, peer_id: &PeerId) -> bool {
259 self.banned_peers.contains(peer_id)
260 }
261
262 pub fn is_addr_banned(&self, addr: &Multiaddr) -> bool {
263 self.banned_addrs.contains(addr)
264 }
265
266 pub fn accepts_incoming_peer(&self, peer_id: &PeerId, peer_addr: &Multiaddr) -> Result<(), Error> {
267 if peer_id == &self.local_id {
276 Err(Error::PeerIsLocal(*peer_id))
277 } else if self.local_addrs.contains(peer_addr) {
278 Err(Error::AddressIsLocal(peer_addr.clone()))
279 } else if self.banned_peers.contains(peer_id) {
280 Err(Error::PeerIsBanned(*peer_id))
281 } else if self.banned_addrs.contains(peer_addr) {
282 Err(Error::AddressIsBanned(peer_addr.clone()))
283 } else if self
284 .satisfies(peer_id, |_, state, _| state.is_connected())
285 .unwrap_or(false)
286 {
287 Err(Error::PeerIsConnected(*peer_id))
288 } else if !self.contains(peer_id)
289 && self.filter_count(|info, _, _| info.relation.is_unknown()) >= global::max_unknown_peers()
290 {
291 Err(Error::ExceedsUnknownPeerLimit(global::max_unknown_peers()))
292 } else if !self.contains(peer_id)
293 && self.filter_count(|info, _, _| info.relation.is_discovered()) >= global::max_discovered_peers()
294 {
295 Err(Error::ExceedsDiscoveredPeerLimit(global::max_discovered_peers()))
296 } else {
297 Ok(())
299 }
300 }
301
302 pub fn allows_dialing_peer(&self, peer_id: &PeerId) -> Result<(), Error> {
303 if peer_id == &self.local_id {
313 Err(Error::PeerIsLocal(*peer_id))
314 } else if !self.contains(peer_id) {
315 Err(Error::PeerNotPresent(*peer_id))
316 } else if self.banned_peers.contains(peer_id) {
317 Err(Error::PeerIsBanned(*peer_id))
318 } else if self
319 .satisfies(peer_id, |_, state, _| state.is_connected())
320 .unwrap_or(false)
321 {
322 Err(Error::PeerIsConnected(*peer_id))
323 } else {
324 let (peer_info, _, _) = self.peers.get(peer_id).unwrap();
325
326 if self.local_addrs.contains(&peer_info.address) {
327 Err(Error::AddressIsLocal(peer_info.address.clone()))
328 } else if self.banned_addrs.contains(&peer_info.address) {
329 Err(Error::AddressIsBanned(peer_info.address.clone()))
330 } else if peer_info.relation.is_unknown()
331 && self.filter_count(|info, status, _| info.relation.is_unknown() && status.is_connected())
332 >= global::max_unknown_peers()
333 {
334 Err(Error::ExceedsUnknownPeerLimit(global::max_unknown_peers()))
335 } else if peer_info.relation.is_discovered()
336 && self.filter_count(|info, status, _| info.relation.is_discovered() && status.is_connected())
337 >= global::max_discovered_peers()
338 {
339 Err(Error::ExceedsDiscoveredPeerLimit(global::max_discovered_peers()))
340 } else {
341 Ok(())
343 }
344 }
345 }
346
347 pub fn allows_dialing_addr(&self, addr: &Multiaddr) -> Result<(), Error> {
348 if self.local_addrs.contains(addr) {
353 Err(Error::AddressIsLocal(addr.clone()))
354 } else if self.banned_addrs.contains(addr) {
355 Err(Error::AddressIsBanned(addr.clone()))
356 } else if let Some(peer_id) = self.find_peer_if_connected(addr) {
357 Err(Error::PeerIsConnected(peer_id))
358 } else {
359 Ok(())
361 }
362 }
363
364 fn find_peer_if_connected(&self, addr: &Multiaddr) -> Option<PeerId> {
365 self.filter(|info, state, _| state.is_connected() && info.address == *addr)
366 .next()
367 .map(|(peer_id, _, _)| peer_id)
368 }
369}
370
371#[cfg(test)]
372mod tests {
373 use libp2p::{identity::ed25519::Keypair, multiaddr::Protocol};
374
375 use super::*;
376
377 #[test]
378 fn new_list() {
379 let pl = PeerList::new(gen_constant_peer_id());
380
381 assert_eq!(pl.len(), 0);
382 }
383
384 #[test]
385 fn add_peers() {
386 let local_id = gen_constant_peer_id();
387 let mut pl = PeerList::new(local_id);
388
389 for i in 1..=3 {
390 assert!(
391 pl.add(
392 gen_random_peer_id(),
393 gen_deterministic_peer_info(i, PeerRelation::Known)
394 )
395 .is_ok()
396 );
397 assert_eq!(pl.len(), i as usize);
398 }
399 }
400
401 #[test]
402 fn insert() {
403 let local_id = gen_constant_peer_id();
404 let mut pl = PeerList::new(local_id);
405
406 let peer_id = gen_constant_peer_id();
407
408 assert!(pl.add(peer_id, gen_constant_peer_info()).is_ok());
409
410 assert!(matches!(
412 pl.add(peer_id, gen_constant_peer_info()),
413 Err((_, _, Error::PeerIsDuplicate(_)))
414 ));
415 }
416
417 #[test]
418 fn deny_incoming_local_peer() {
419 let local_id = gen_constant_peer_id();
420
421 let pl = PeerList::new(local_id);
422
423 assert!(matches!(
424 pl.accepts_incoming_peer(&local_id, &gen_constant_peer_info().address),
425 Err(Error::PeerIsLocal(_))
426 ));
427 }
428
429 #[test]
430 fn allow_incoming_added_peer() {
431 let local_id = gen_constant_peer_id();
432 let peer_id = gen_random_peer_id();
433 let peer_info = gen_constant_peer_info();
434
435 let mut pl = PeerList::new(local_id);
436
437 pl.add(peer_id, peer_info.clone()).unwrap();
438 pl.accepts_incoming_peer(&peer_id, &peer_info.address).unwrap();
439 }
440
441 #[test]
442 fn conditional_remove() {
443 let local_id = gen_constant_peer_id();
444 let mut pl = PeerList::new(local_id);
445
446 let peer_id = gen_random_peer_id();
447
448 pl.add(peer_id, gen_deterministic_peer_info(0, PeerRelation::Known))
449 .unwrap();
450 assert_eq!(1, pl.len());
451
452 pl.filter_remove(&peer_id, |info, _, _| info.relation.is_unknown());
453 assert_eq!(1, pl.len());
454
455 pl.filter_remove(&peer_id, |info, _, _| info.relation.is_known());
456 assert_eq!(0, pl.len());
457 }
458
459 pub fn gen_constant_peer_id() -> PeerId {
462 "12D3KooWJWEKvSFbben74C7H4YtKjhPMTDxd7gP7zxWSUEeF27st".parse().unwrap()
463 }
464
465 pub fn gen_random_peer_id() -> PeerId {
466 PeerId::from_public_key(&libp2p_core::PublicKey::Ed25519(Keypair::generate().public()))
467 }
468
469 pub fn gen_deterministic_peer_info(port: u16, relation: PeerRelation) -> PeerInfo {
470 PeerInfo {
471 address: gen_deterministic_addr(port),
472 alias: port.to_string(),
473 relation,
474 }
475 }
476
477 pub fn gen_constant_peer_info() -> PeerInfo {
478 PeerInfo {
479 address: gen_deterministic_addr(1),
480 alias: String::new(),
481 relation: PeerRelation::Known,
482 }
483 }
484
485 pub fn gen_deterministic_addr(port: u16) -> Multiaddr {
486 let mut addr = Multiaddr::empty();
487 addr.push(Protocol::Dns("localhost".into()));
488 addr.push(Protocol::Tcp(port));
489 addr
490 }
491}
492
493#[derive(Clone, Debug)]
494pub enum PeerState {
495 Disconnected,
496 Connected(GossipSender),
497}
498
499#[derive(Clone, Debug, Default)]
500pub struct PeerMetrics {
501 pub(crate) num_dials: usize,
502 pub(crate) identified_at: Option<u64>,
503}
504
505impl Default for PeerState {
506 fn default() -> Self {
507 Self::Disconnected
508 }
509}
510
511impl PeerState {
512 pub fn is_disconnected(&self) -> bool {
513 matches!(self, Self::Disconnected)
514 }
515
516 pub fn is_connected(&self) -> bool {
517 matches!(self, Self::Connected(_))
518 }
519
520 pub fn set_connected(&mut self, gossip_sender: GossipSender) -> Option<GossipSender> {
521 *self = Self::Connected(gossip_sender);
522 None
523 }
524
525 pub fn set_disconnected(&mut self) -> Option<GossipSender> {
526 match take(self) {
527 Self::Disconnected => None,
528 Self::Connected(sender) => Some(sender),
529 }
530 }
531}
532
533#[cfg(test)]
534mod peerstate_tests {
535 use super::*;
536 use crate::swarm::protocols::iota_gossip::channel;
537
538 #[test]
539 fn new_peer_state() {
540 let peerstate = PeerState::default();
541
542 assert!(peerstate.is_disconnected());
543 }
544
545 #[test]
546 fn peer_state_change() {
547 let mut peerstate = PeerState::Disconnected;
548 let (tx, _rx) = channel();
549
550 peerstate.set_connected(tx);
551 assert!(peerstate.is_connected());
552
553 assert!(peerstate.set_disconnected().is_some());
554 assert!(peerstate.is_disconnected());
555 assert!(peerstate.set_disconnected().is_none());
556 }
557}