bee_gossip/peer/
list.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4#![cfg(feature = "full")]
5
6use std::{mem::take, sync::Arc};
7
8use hashbrown::{HashMap, HashSet};
9use libp2p::{Multiaddr, PeerId};
10use tokio::sync::RwLock;
11
12use super::{
13    error::Error,
14    info::{PeerInfo, PeerRelation},
15};
16use crate::{alias, config::Peer, init::global, swarm::protocols::iota_gossip::GossipSender};
17
/// Initial capacity reserved for the map of remote peers of a [`PeerList`].
const REMOTE_PEERS_INITIAL_CAP: usize = 8;
/// Initial capacity reserved for the set of local addresses of a [`PeerList`].
const LOCAL_ADDRS_INITIAL_CAP: usize = 4;
20
21/// A thread-safe wrapper around a [`PeerList`].
22#[derive(Debug, Clone)]
23pub struct PeerListWrapper(pub Arc<RwLock<PeerList>>);
24
25impl PeerListWrapper {
26    pub fn new(peerlist: PeerList) -> Self {
27        Self(Arc::new(RwLock::new(peerlist)))
28    }
29}
30
/// Bookkeeping for the remote peers of the local node, together with the local identity and the ban lists.
#[derive(Debug)]
pub struct PeerList {
    /// Id of the local node; used to reject ourselves as a peer.
    local_id: PeerId,
    /// Addresses registered as belonging to the local node; used to reject connections to/from ourselves.
    local_addrs: HashSet<Multiaddr>,
    /// All peers currently in the list, keyed by peer id, with their info, connection state and metrics.
    peers: HashMap<PeerId, (PeerInfo, PeerState, PeerMetrics)>,
    /// Peer ids that are currently banned (kept in memory only).
    banned_peers: HashSet<PeerId>,
    /// Addresses that are currently banned (kept in memory only).
    banned_addrs: HashSet<Multiaddr>,
}
39
40impl PeerList {
41    pub fn new(local_id: PeerId) -> Self {
42        Self {
43            local_id,
44            local_addrs: HashSet::with_capacity(LOCAL_ADDRS_INITIAL_CAP),
45            peers: HashMap::with_capacity(REMOTE_PEERS_INITIAL_CAP),
46            banned_peers: HashSet::default(),
47            banned_addrs: HashSet::default(),
48        }
49    }
50
51    pub fn from_peers(local_id: PeerId, peers: Vec<Peer>) -> Self {
52        let mut p = HashMap::with_capacity(REMOTE_PEERS_INITIAL_CAP);
53
54        p.extend(peers.into_iter().map(|peer| {
55            // Realiasing because of otherwise partial move.
56            let peer_id = peer.peer_id;
57            (
58                peer_id,
59                (
60                    PeerInfo {
61                        address: peer.multiaddr,
62                        alias: peer.alias.unwrap_or_else(|| alias!(peer_id).to_owned()),
63                        relation: PeerRelation::Known,
64                    },
65                    PeerState::default(),
66                    PeerMetrics::default(),
67                ),
68            )
69        }));
70
71        Self {
72            local_id,
73            local_addrs: HashSet::with_capacity(LOCAL_ADDRS_INITIAL_CAP),
74            peers: p,
75            banned_peers: HashSet::default(),
76            banned_addrs: HashSet::default(),
77        }
78    }
79
80    pub fn add(&mut self, peer_id: PeerId, peer_info: PeerInfo) -> Result<(), (PeerId, PeerInfo, Error)> {
81        if self.contains(&peer_id) {
82            return Err((peer_id, peer_info, Error::PeerIsDuplicate(peer_id)));
83        }
84
85        // Since we already checked that such a `peer_id` is not yet present, the returned value is always `None`.
86        let _ = self
87            .peers
88            .insert(peer_id, (peer_info, PeerState::default(), PeerMetrics::default()));
89
90        Ok(())
91    }
92
93    pub fn remove(&mut self, peer_id: &PeerId) -> Result<PeerInfo, Error> {
94        let (info, _, _) = self.peers.remove(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
95
96        Ok(info)
97    }
98
99    pub fn contains(&self, peer_id: &PeerId) -> bool {
100        self.peers.contains_key(peer_id)
101    }
102
103    pub fn info(&self, peer_id: &PeerId) -> Result<PeerInfo, Error> {
104        self.peers
105            .get(peer_id)
106            .ok_or(Error::PeerNotPresent(*peer_id))
107            .map(|(info, _, _)| info.clone())
108    }
109
110    pub fn metrics(&self, peer_id: &PeerId) -> Result<PeerMetrics, Error> {
111        self.peers
112            .get(peer_id)
113            .ok_or(Error::PeerNotPresent(*peer_id))
114            .map(|(_, _, metrics)| metrics.clone())
115    }
116
117    pub fn len(&self) -> usize {
118        self.peers.len()
119    }
120
121    /// Note: Returns an error if the address trying to be added is a duplicate.
122    pub fn add_local_addr(&mut self, addr: Multiaddr) -> Result<(), (Multiaddr, Error)> {
123        if self.local_addrs.insert(addr.clone()) {
124            Ok(())
125        } else {
126            Err((addr.clone(), Error::AddressIsDuplicate(addr)))
127        }
128    }
129
130    pub fn update_info<U>(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error>
131    where
132        U: FnMut(&mut PeerInfo),
133    {
134        let (info, _, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
135
136        update(info);
137
138        Ok(())
139    }
140
141    pub fn update_state<U>(&mut self, peer_id: &PeerId, mut update: U) -> Result<Option<GossipSender>, Error>
142    where
143        U: FnMut(&mut PeerState) -> Option<GossipSender>,
144    {
145        let (_, state, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
146
147        Ok(update(state))
148    }
149
150    pub fn update_metrics<U>(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error>
151    where
152        U: FnMut(&mut PeerMetrics),
153    {
154        let (_, _, metrics) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
155
156        update(metrics);
157
158        Ok(())
159    }
160
161    pub fn satisfies<P>(&self, peer_id: &PeerId, predicate: P) -> Result<bool, Error>
162    where
163        P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
164    {
165        self.peers
166            .get(peer_id)
167            .ok_or(Error::PeerNotPresent(*peer_id))
168            .map(|(info, state, metrics)| predicate(info, state, metrics))
169    }
170
171    pub fn filter<'a, P: 'a>(&'a self, predicate: P) -> impl Iterator<Item = (PeerId, PeerInfo, PeerMetrics)> + '_
172    where
173        P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
174    {
175        self.peers.iter().filter_map(move |(peer_id, (info, state, metrics))| {
176            if predicate(info, state, metrics) {
177                Some((*peer_id, info.clone(), metrics.clone()))
178            } else {
179                None
180            }
181        })
182    }
183
184    pub fn filter_count<P>(&self, predicate: P) -> usize
185    where
186        P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
187    {
188        self.peers.iter().fold(0, |count, (_, (info, state, metrics))| {
189            if predicate(info, state, metrics) {
190                count + 1
191            } else {
192                count
193            }
194        })
195    }
196
197    pub fn filter_remove<P>(&mut self, peer_id: &PeerId, predicate: P) -> bool
198    where
199        P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
200    {
201        // NB: Since we drop a potential reference to `&(PeerInfo, PeerState)` this code won't create a deadlock in case
202        // we refactor `PeerList` in a way that `.remove` would only take a `&self`.
203
204        if self
205            .peers
206            .get(peer_id)
207            .filter(|(info, state, metrics)| predicate(info, state, metrics))
208            .is_some()
209        {
210            // Should always return `true`, because we know it's there.
211            self.peers.remove(peer_id).is_some()
212        } else {
213            false
214        }
215    }
216
217    #[cfg(test)]
218    pub fn clear(&mut self) {
219        self.peers.clear();
220        self.banned_peers.clear();
221        self.banned_addrs.clear();
222    }
223
224    pub fn ban_peer(&mut self, peer_id: PeerId) -> Result<(), Error> {
225        // TODO: use storage to persist banned peers
226        if self.banned_peers.insert(peer_id) {
227            Ok(())
228        } else {
229            Err(Error::PeerIsBanned(peer_id))
230        }
231    }
232
233    pub fn ban_address(&mut self, address: Multiaddr) -> Result<(), Error> {
234        // TODO: use storage to persist banned addrs
235        if self.banned_addrs.insert(address.clone()) {
236            Ok(())
237        } else {
238            Err(Error::AddressIsBanned(address))
239        }
240    }
241
242    pub fn unban_peer(&mut self, peer_id: &PeerId) -> Result<(), Error> {
243        if self.banned_peers.remove(peer_id) {
244            Ok(())
245        } else {
246            Err(Error::PeerIsUnbanned(*peer_id))
247        }
248    }
249
250    pub fn unban_address(&mut self, addr: &Multiaddr) -> Result<(), Error> {
251        if self.banned_addrs.remove(addr) {
252            Ok(())
253        } else {
254            Err(Error::AddressIsUnbanned(addr.clone()))
255        }
256    }
257
258    pub fn is_peer_banned(&self, peer_id: &PeerId) -> bool {
259        self.banned_peers.contains(peer_id)
260    }
261
262    pub fn is_addr_banned(&self, addr: &Multiaddr) -> bool {
263        self.banned_addrs.contains(addr)
264    }
265
266    pub fn accepts_incoming_peer(&self, peer_id: &PeerId, peer_addr: &Multiaddr) -> Result<(), Error> {
267        // Checks performed are:
268        // - Deny ourself as peer.
269        // - Deny one of our own addresses.
270        // - Deny banned peers.
271        // - Deny banned addresses.
272        // - Deny already connected peers.
273        // - Deny more than the configured unknown peers.
274        // - Deny more than the configured discovered peers.
275        if peer_id == &self.local_id {
276            Err(Error::PeerIsLocal(*peer_id))
277        } else if self.local_addrs.contains(peer_addr) {
278            Err(Error::AddressIsLocal(peer_addr.clone()))
279        } else if self.banned_peers.contains(peer_id) {
280            Err(Error::PeerIsBanned(*peer_id))
281        } else if self.banned_addrs.contains(peer_addr) {
282            Err(Error::AddressIsBanned(peer_addr.clone()))
283        } else if self
284            .satisfies(peer_id, |_, state, _| state.is_connected())
285            .unwrap_or(false)
286        {
287            Err(Error::PeerIsConnected(*peer_id))
288        } else if !self.contains(peer_id)
289            && self.filter_count(|info, _, _| info.relation.is_unknown()) >= global::max_unknown_peers()
290        {
291            Err(Error::ExceedsUnknownPeerLimit(global::max_unknown_peers()))
292        } else if !self.contains(peer_id)
293            && self.filter_count(|info, _, _| info.relation.is_discovered()) >= global::max_discovered_peers()
294        {
295            Err(Error::ExceedsDiscoveredPeerLimit(global::max_discovered_peers()))
296        } else {
297            // All checks passed! Accept that peer.
298            Ok(())
299        }
300    }
301
302    pub fn allows_dialing_peer(&self, peer_id: &PeerId) -> Result<(), Error> {
303        // Checks performed are:
304        // - Deny dialing ourself as peer.
305        // - Deny dialing a peer that has not been added first. TODO: check if we might want to allow this!
306        // - Deny dialing a banned peer.
307        // - Deny dialing an already connected peer.
308        // - Deny dialing a local address.
309        // - Deny dialing a banned address.
310        // - Deny dialing more than configured unknown peers.
311        // - Deny dialing more than configured discovered peers.
312        if peer_id == &self.local_id {
313            Err(Error::PeerIsLocal(*peer_id))
314        } else if !self.contains(peer_id) {
315            Err(Error::PeerNotPresent(*peer_id))
316        } else if self.banned_peers.contains(peer_id) {
317            Err(Error::PeerIsBanned(*peer_id))
318        } else if self
319            .satisfies(peer_id, |_, state, _| state.is_connected())
320            .unwrap_or(false)
321        {
322            Err(Error::PeerIsConnected(*peer_id))
323        } else {
324            let (peer_info, _, _) = self.peers.get(peer_id).unwrap();
325
326            if self.local_addrs.contains(&peer_info.address) {
327                Err(Error::AddressIsLocal(peer_info.address.clone()))
328            } else if self.banned_addrs.contains(&peer_info.address) {
329                Err(Error::AddressIsBanned(peer_info.address.clone()))
330            } else if peer_info.relation.is_unknown()
331                && self.filter_count(|info, status, _| info.relation.is_unknown() && status.is_connected())
332                    >= global::max_unknown_peers()
333            {
334                Err(Error::ExceedsUnknownPeerLimit(global::max_unknown_peers()))
335            } else if peer_info.relation.is_discovered()
336                && self.filter_count(|info, status, _| info.relation.is_discovered() && status.is_connected())
337                    >= global::max_discovered_peers()
338            {
339                Err(Error::ExceedsDiscoveredPeerLimit(global::max_discovered_peers()))
340            } else {
341                // All checks passed! Allow dialing that peer.
342                Ok(())
343            }
344        }
345    }
346
347    pub fn allows_dialing_addr(&self, addr: &Multiaddr) -> Result<(), Error> {
348        // Checks performed are:
349        // - Deny dialing a local address.
350        // - Deny dialing a banned address.
351        // - Deny dialing an already connected peer (with that address).
352        if self.local_addrs.contains(addr) {
353            Err(Error::AddressIsLocal(addr.clone()))
354        } else if self.banned_addrs.contains(addr) {
355            Err(Error::AddressIsBanned(addr.clone()))
356        } else if let Some(peer_id) = self.find_peer_if_connected(addr) {
357            Err(Error::PeerIsConnected(peer_id))
358        } else {
359            // All checks passed! Allow dialing that address.
360            Ok(())
361        }
362    }
363
364    fn find_peer_if_connected(&self, addr: &Multiaddr) -> Option<PeerId> {
365        self.filter(|info, state, _| state.is_connected() && info.address == *addr)
366            .next()
367            .map(|(peer_id, _, _)| peer_id)
368    }
369}
370
#[cfg(test)]
mod tests {
    use libp2p::{identity::ed25519::Keypair, multiaddr::Protocol};

    use super::*;

    /// A freshly created list holds no peers.
    #[test]
    fn new_list() {
        let list = PeerList::new(gen_constant_peer_id());

        assert_eq!(list.len(), 0);
    }

    /// Every successfully added peer increases the length by one.
    #[test]
    fn add_peers() {
        let mut list = PeerList::new(gen_constant_peer_id());

        for port in 1u16..=3 {
            let outcome = list.add(
                gen_random_peer_id(),
                gen_deterministic_peer_info(port, PeerRelation::Known),
            );

            assert!(outcome.is_ok());
            assert_eq!(list.len(), usize::from(port));
        }
    }

    /// The same peer id cannot be added twice.
    #[test]
    fn insert() {
        let mut list = PeerList::new(gen_constant_peer_id());
        let peer_id = gen_constant_peer_id();

        let first = list.add(peer_id, gen_constant_peer_info());
        assert!(first.is_ok());

        // Do not allow inserting the same peer id twice.
        let second = list.add(peer_id, gen_constant_peer_info());
        assert!(matches!(second, Err((_, _, Error::PeerIsDuplicate(_)))));
    }

    /// The local peer id is rejected as an incoming peer.
    #[test]
    fn deny_incoming_local_peer() {
        let local_id = gen_constant_peer_id();
        let list = PeerList::new(local_id);
        let address = gen_constant_peer_info().address;

        let verdict = list.accepts_incoming_peer(&local_id, &address);

        assert!(matches!(verdict, Err(Error::PeerIsLocal(_))));
    }

    /// A peer that was added beforehand (and is not connected) is accepted as an incoming peer.
    #[test]
    fn allow_incoming_added_peer() {
        let mut list = PeerList::new(gen_constant_peer_id());

        let remote_id = gen_random_peer_id();
        let remote_info = gen_constant_peer_info();
        let remote_addr = remote_info.address.clone();

        list.add(remote_id, remote_info).unwrap();
        list.accepts_incoming_peer(&remote_id, &remote_addr).unwrap();
    }

    /// `filter_remove` only removes the peer if the predicate holds for it.
    #[test]
    fn conditional_remove() {
        let mut list = PeerList::new(gen_constant_peer_id());
        let remote_id = gen_random_peer_id();

        let info = gen_deterministic_peer_info(0, PeerRelation::Known);
        list.add(remote_id, info).unwrap();
        assert_eq!(1, list.len());

        // The peer is known, so a predicate asking for an unknown peer must leave it in place.
        list.filter_remove(&remote_id, |info, _, _| info.relation.is_unknown());
        assert_eq!(1, list.len());

        list.filter_remove(&remote_id, |info, _, _| info.relation.is_known());
        assert_eq!(0, list.len());
    }

    // ===== helpers =====

    /// Returns the same, hard-coded peer id on every call.
    pub fn gen_constant_peer_id() -> PeerId {
        let encoded = "12D3KooWJWEKvSFbben74C7H4YtKjhPMTDxd7gP7zxWSUEeF27st";

        encoded.parse().unwrap()
    }

    /// Returns a peer id derived from a freshly generated Ed25519 keypair.
    pub fn gen_random_peer_id() -> PeerId {
        let public_key = Keypair::generate().public();

        PeerId::from_public_key(&libp2p_core::PublicKey::Ed25519(public_key))
    }

    /// Returns a peer info whose address and alias are both derived from `port`.
    pub fn gen_deterministic_peer_info(port: u16, relation: PeerRelation) -> PeerInfo {
        let address = gen_deterministic_addr(port);
        let alias = port.to_string();

        PeerInfo {
            address,
            alias,
            relation,
        }
    }

    /// Returns the same peer info on every call: known relation, port 1 and an empty alias.
    pub fn gen_constant_peer_info() -> PeerInfo {
        PeerInfo {
            alias: String::new(),
            ..gen_deterministic_peer_info(1, PeerRelation::Known)
        }
    }

    /// Returns the address `/dns/localhost/tcp/<port>`.
    pub fn gen_deterministic_addr(port: u16) -> Multiaddr {
        let host = Protocol::Dns("localhost".into());
        let transport = Protocol::Tcp(port);

        let mut addr = Multiaddr::empty();
        addr.push(host);
        addr.push(transport);
        addr
    }
}
492
/// The connection state of a peer.
#[derive(Clone, Debug)]
pub enum PeerState {
    /// The peer is not connected. This is the default state.
    Disconnected,
    /// The peer is connected; carries the [`GossipSender`] that was handed over when the connection was set.
    Connected(GossipSender),
}
498
/// Per-peer metrics; all values start out at their defaults (`0` / `None`).
#[derive(Clone, Debug, Default)]
pub struct PeerMetrics {
    // NOTE(review): presumably the number of dial attempts made for this peer - confirm against the code updating it.
    pub(crate) num_dials: usize,
    // NOTE(review): presumably a timestamp of when the peer was identified; the unit is not visible here - confirm.
    pub(crate) identified_at: Option<u64>,
}
504
505impl Default for PeerState {
506    fn default() -> Self {
507        Self::Disconnected
508    }
509}
510
511impl PeerState {
512    pub fn is_disconnected(&self) -> bool {
513        matches!(self, Self::Disconnected)
514    }
515
516    pub fn is_connected(&self) -> bool {
517        matches!(self, Self::Connected(_))
518    }
519
520    pub fn set_connected(&mut self, gossip_sender: GossipSender) -> Option<GossipSender> {
521        *self = Self::Connected(gossip_sender);
522        None
523    }
524
525    pub fn set_disconnected(&mut self) -> Option<GossipSender> {
526        match take(self) {
527            Self::Disconnected => None,
528            Self::Connected(sender) => Some(sender),
529        }
530    }
531}
532
#[cfg(test)]
mod peerstate_tests {
    use super::*;
    use crate::swarm::protocols::iota_gossip::channel;

    /// The default state is disconnected.
    #[test]
    fn new_peer_state() {
        let state = PeerState::default();

        assert!(state.is_disconnected());
    }

    /// Connecting and disconnecting toggles the state; only the first disconnect hands out the sender.
    #[test]
    fn peer_state_change() {
        let (sender, _receiver) = channel();
        let mut state = PeerState::Disconnected;

        state.set_connected(sender);
        assert!(state.is_connected());

        assert!(state.set_disconnected().is_some());
        assert!(state.is_disconnected());

        // There is no sender left to hand out the second time.
        assert!(state.set_disconnected().is_none());
    }
}
557}