lumina_node/
peer_tracker.rs

1//! Primitives related to tracking the state of peers in the network.
2
3use std::borrow::Borrow;
4
5use dashmap::mapref::entry::Entry;
6use dashmap::mapref::one::RefMut;
7use dashmap::DashMap;
8use libp2p::{swarm::ConnectionId, Multiaddr, PeerId};
9use rand::seq::SliceRandom;
10use serde::{Deserialize, Serialize};
11use smallvec::SmallVec;
12use tokio::sync::watch;
13
14use crate::events::{EventPublisher, NodeEvent};
15
/// Keeps track of various information about peers.
///
/// Stores per-peer state, maintains aggregate connection counters and
/// publishes events when a peer becomes connected or disconnected.
#[derive(Debug)]
pub struct PeerTracker {
    /// State of every known peer, keyed by its id.
    peers: DashMap<PeerId, PeerInfo>,
    /// Sending half of the watch channel that holds the current
    /// [`PeerTrackerInfo`] counters.
    info_tx: watch::Sender<PeerTrackerInfo>,
    /// Publisher used to emit `NodeEvent::PeerConnected` and
    /// `NodeEvent::PeerDisconnected`.
    event_pub: EventPublisher,
}
23
/// Statistics of the connected peers.
///
/// Both counters start at zero (the derived `Default`).
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PeerTrackerInfo {
    /// Number of the connected peers.
    pub num_connected_peers: u64,
    /// Number of the connected trusted peers.
    pub num_connected_trusted_peers: u64,
}
33
/// Everything the tracker knows about a single peer.
#[derive(Debug)]
struct PeerInfo {
    /// Current lifecycle state of the peer.
    state: PeerState,
    /// Known addresses of the peer, without duplicates.
    addrs: SmallVec<[Multiaddr; 4]>,
    /// Ids of the connections currently recorded for the peer.
    connections: SmallVec<[ConnectionId; 1]>,
    /// Whether the peer has been marked as trusted.
    trusted: bool,
}
41
/// Lifecycle state of a tracked peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeerState {
    /// The peer is known but is not connected and has no recorded addresses.
    Discovered,
    /// The peer is not connected but at least one address is recorded.
    AddressesFound,
    /// The peer has been marked as connected.
    Connected,
}
48
49impl PeerInfo {
50    fn is_connected(&self) -> bool {
51        matches!(self.state, PeerState::Connected)
52    }
53}
54
55impl PeerTracker {
56    /// Constructs an empty PeerTracker.
57    pub fn new(event_pub: EventPublisher) -> Self {
58        PeerTracker {
59            peers: DashMap::new(),
60            info_tx: watch::channel(PeerTrackerInfo::default()).0,
61            event_pub,
62        }
63    }
64
65    /// Returns the current [`PeerTrackerInfo`].
66    pub fn info(&self) -> PeerTrackerInfo {
67        self.info_tx.borrow().to_owned()
68    }
69
    /// Returns a watcher for any [`PeerTrackerInfo`] changes.
    ///
    /// Each call creates a new receiver subscribed to the tracker's
    /// internal watch channel.
    pub fn info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
        self.info_tx.subscribe()
    }
74
75    /// Sets peer as discovered if this is it's first appearance.
76    ///
77    /// Returns `true` if peer was not known from before.
78    pub fn set_maybe_discovered(&self, peer: PeerId) -> bool {
79        match self.peers.entry(peer) {
80            Entry::Vacant(entry) => {
81                entry.insert(PeerInfo {
82                    state: PeerState::Discovered,
83                    addrs: SmallVec::new(),
84                    connections: SmallVec::new(),
85                    trusted: false,
86                });
87                true
88            }
89            Entry::Occupied(_) => false,
90        }
91    }
92
93    /// Get the `PeerInfo` of the peer.
94    ///
95    /// If peer is not found it is added as `PeerState::Discovered`.
96    fn get(&self, peer: PeerId) -> RefMut<'_, PeerId, PeerInfo> {
97        self.peers.entry(peer).or_insert_with(|| PeerInfo {
98            state: PeerState::Discovered,
99            addrs: SmallVec::new(),
100            connections: SmallVec::new(),
101            trusted: false,
102        })
103    }
104
105    /// Add an address for a peer.
106    pub fn add_addresses<I, A>(&self, peer: PeerId, addrs: I)
107    where
108        I: IntoIterator<Item = A>,
109        A: Borrow<Multiaddr>,
110    {
111        let mut state = self.get(peer);
112
113        for addr in addrs {
114            let addr = addr.borrow();
115
116            if !state.addrs.contains(addr) {
117                state.addrs.push(addr.to_owned());
118            }
119        }
120
121        // Upgrade state
122        if state.state == PeerState::Discovered && !state.addrs.is_empty() {
123            state.state = PeerState::AddressesFound;
124        }
125    }
126
127    /// Sets peer as trusted.
128    pub fn set_trusted(&self, peer: PeerId, is_trusted: bool) {
129        let mut peer_info = self.get(peer);
130
131        if peer_info.trusted == is_trusted {
132            // Nothing to be done
133            return;
134        }
135
136        peer_info.trusted = is_trusted;
137
138        // If peer was already connected, then `num_connected_trusted_peers`
139        // needs to be adjusted based on the new information.
140        if peer_info.is_connected() {
141            self.info_tx.send_modify(|tracker_info| {
142                if is_trusted {
143                    tracker_info.num_connected_trusted_peers += 1;
144                } else {
145                    tracker_info.num_connected_trusted_peers -= 1;
146                }
147            });
148        }
149    }
150
151    /// Sets peer as connected.
152    pub fn set_connected(
153        &self,
154        peer: PeerId,
155        connection_id: ConnectionId,
156        address: impl Into<Option<Multiaddr>>,
157    ) {
158        let mut peer_info = self.get(peer);
159
160        if let Some(address) = address.into() {
161            if !peer_info.addrs.contains(&address) {
162                peer_info.addrs.push(address);
163            }
164        }
165
166        peer_info.connections.push(connection_id);
167
168        // If peer was not already connected from before
169        if !peer_info.is_connected() {
170            peer_info.state = PeerState::Connected;
171
172            increment_connected_peers(&self.info_tx, peer_info.trusted);
173
174            self.event_pub.send(NodeEvent::PeerConnected {
175                id: peer,
176                trusted: peer_info.trusted,
177            });
178        }
179    }
180
181    /// Sets peer as disconnected if `connection_id` was the last connection.
182    ///
183    /// Returns `true` if was set to disconnected.
184    pub fn set_maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool {
185        let mut peer_info = self.get(peer);
186
187        peer_info.connections.retain(|id| *id != connection_id);
188
189        // If this is the last connection from the peer
190        if peer_info.connections.is_empty() {
191            if peer_info.addrs.is_empty() {
192                peer_info.state = PeerState::Discovered;
193            } else {
194                peer_info.state = PeerState::AddressesFound;
195            }
196
197            decrement_connected_peers(&self.info_tx, peer_info.trusted);
198
199            self.event_pub.send(NodeEvent::PeerDisconnected {
200                id: peer,
201                trusted: peer_info.trusted,
202            });
203
204            true
205        } else {
206            false
207        }
208    }
209
210    /// Returns true if peer is connected.
211    #[allow(dead_code)]
212    pub fn is_connected(&self, peer: PeerId) -> bool {
213        self.get(peer).is_connected()
214    }
215
216    /// Returns the addresses of the peer.
217    #[allow(dead_code)]
218    pub fn addresses(&self, peer: PeerId) -> SmallVec<[Multiaddr; 4]> {
219        self.get(peer).addrs.clone()
220    }
221
    /// Removes a peer.
    ///
    /// Drops the peer's entry from the tracker; removing an unknown peer is
    /// a no-op.
    ///
    /// NOTE(review): the connected-peer counters are not adjusted and no
    /// event is published here, even if the peer is currently connected —
    /// confirm callers only remove peers that are already disconnected.
    #[allow(dead_code)]
    pub fn remove(&self, peer: PeerId) {
        self.peers.remove(&peer);
    }
227
228    /// Returns connected peers.
229    pub fn connected_peers(&self) -> Vec<PeerId> {
230        self.peers
231            .iter()
232            .filter(|pair| pair.value().is_connected())
233            .map(|pair| pair.key().to_owned())
234            .collect()
235    }
236
237    pub fn connections(&self) -> Vec<(PeerId, SmallVec<[ConnectionId; 1]>)> {
238        self.peers
239            .iter()
240            .filter(|pair| pair.value().is_connected())
241            .map(|pair| (pair.key().to_owned(), pair.value().connections.clone()))
242            .collect()
243    }
244
245    /// Returns one of the best peers.
246    pub fn best_peer(&self) -> Option<PeerId> {
247        const MAX_PEER_SAMPLE: usize = 128;
248
249        // TODO: Implement peer score and return the best.
250        let mut peers = self
251            .peers
252            .iter()
253            .filter(|pair| pair.value().is_connected())
254            .take(MAX_PEER_SAMPLE)
255            .map(|pair| pair.key().to_owned())
256            .collect::<SmallVec<[_; MAX_PEER_SAMPLE]>>();
257
258        peers.shuffle(&mut rand::thread_rng());
259
260        peers.first().copied()
261    }
262
263    /// Returns up to N amount of best peers.
264    #[allow(dead_code)]
265    pub fn best_n_peers(&self, limit: usize) -> Vec<PeerId> {
266        // TODO: Implement peer score and return the best N peers.
267        self.peers
268            .iter()
269            .filter(|pair| pair.value().is_connected())
270            .take(limit)
271            .map(|pair| pair.key().to_owned())
272            // collect instead of returning an iter to not block the dashmap
273            .collect()
274    }
275
276    /// Returns up to N amount of trusted peers.
277    pub fn trusted_n_peers(&self, limit: usize) -> Vec<PeerId> {
278        self.peers
279            .iter()
280            .filter(|pair| pair.value().is_connected() && pair.value().trusted)
281            .take(limit)
282            .map(|pair| pair.key().to_owned())
283            // collect instead of returning an iter to not block the dashmap
284            .collect()
285    }
286}
287
288fn increment_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
289    info_tx.send_modify(|tracker_info| {
290        tracker_info.num_connected_peers += 1;
291
292        if trusted {
293            tracker_info.num_connected_trusted_peers += 1;
294        }
295    });
296}
297
298fn decrement_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
299    info_tx.send_modify(|tracker_info| {
300        tracker_info.num_connected_peers -= 1;
301
302        if trusted {
303            tracker_info.num_connected_trusted_peers -= 1;
304        }
305    });
306}
307
#[cfg(test)]
mod tests {
    use crate::events::EventChannel;

    use super::*;

    /// Asserts that `watcher` has no unseen update.
    fn assert_unchanged(watcher: &watch::Receiver<PeerTrackerInfo>) {
        assert!(!watcher.has_changed().unwrap());
    }

    /// Asserts that `watcher` has an unseen update, marks it as seen and
    /// checks the published counters.
    fn assert_changed_to(
        watcher: &mut watch::Receiver<PeerTrackerInfo>,
        connected: u64,
        trusted: u64,
    ) {
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(info.num_connected_peers, connected);
        assert_eq!(info.num_connected_trusted_peers, trusted);
    }

    #[test]
    fn trust_before_connect() {
        let events = EventChannel::new();
        let tracker = PeerTracker::new(events.publisher());
        let mut watcher = tracker.info_watcher();
        let peer = PeerId::random();

        assert_unchanged(&watcher);

        // Trusting a peer that is not connected must not touch the counters.
        tracker.set_trusted(peer, true);
        assert_unchanged(&watcher);

        tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
        assert_changed_to(&mut watcher, 1, 1);
    }

    #[test]
    fn trust_after_connect() {
        let events = EventChannel::new();
        let tracker = PeerTracker::new(events.publisher());
        let mut watcher = tracker.info_watcher();
        let peer = PeerId::random();

        assert_unchanged(&watcher);

        tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
        assert_changed_to(&mut watcher, 1, 0);

        // Trusting an already connected peer updates the trusted counter.
        tracker.set_trusted(peer, true);
        assert_changed_to(&mut watcher, 1, 1);
    }

    #[test]
    fn untrust_after_connect() {
        let events = EventChannel::new();
        let tracker = PeerTracker::new(events.publisher());
        let mut watcher = tracker.info_watcher();
        let peer = PeerId::random();

        assert_unchanged(&watcher);

        tracker.set_trusted(peer, true);
        assert_unchanged(&watcher);

        tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
        assert_changed_to(&mut watcher, 1, 1);

        // Revoking trust from a connected peer lowers the trusted counter.
        tracker.set_trusted(peer, false);
        assert_changed_to(&mut watcher, 1, 0);
    }
}
379}