1use std::borrow::Borrow;
4
5use dashmap::mapref::entry::Entry;
6use dashmap::mapref::one::RefMut;
7use dashmap::DashMap;
8use libp2p::{swarm::ConnectionId, Multiaddr, PeerId};
9use rand::seq::SliceRandom;
10use serde::{Deserialize, Serialize};
11use smallvec::SmallVec;
12use tokio::sync::watch;
13
14use crate::events::{EventPublisher, NodeEvent};
15
16#[derive(Debug)]
18pub struct PeerTracker {
19 peers: DashMap<PeerId, PeerInfo>,
20 info_tx: watch::Sender<PeerTrackerInfo>,
21 event_pub: EventPublisher,
22}
23
24#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
26#[derive(Debug, Clone, Default, Serialize, Deserialize)]
27pub struct PeerTrackerInfo {
28 pub num_connected_peers: u64,
30 pub num_connected_trusted_peers: u64,
32}
33
34#[derive(Debug)]
35struct PeerInfo {
36 state: PeerState,
37 addrs: SmallVec<[Multiaddr; 4]>,
38 connections: SmallVec<[ConnectionId; 1]>,
39 trusted: bool,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43enum PeerState {
44 Discovered,
45 AddressesFound,
46 Connected,
47}
48
49impl PeerInfo {
50 fn is_connected(&self) -> bool {
51 matches!(self.state, PeerState::Connected)
52 }
53}
54
55impl PeerTracker {
56 pub fn new(event_pub: EventPublisher) -> Self {
58 PeerTracker {
59 peers: DashMap::new(),
60 info_tx: watch::channel(PeerTrackerInfo::default()).0,
61 event_pub,
62 }
63 }
64
65 pub fn info(&self) -> PeerTrackerInfo {
67 self.info_tx.borrow().to_owned()
68 }
69
70 pub fn info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
72 self.info_tx.subscribe()
73 }
74
75 pub fn set_maybe_discovered(&self, peer: PeerId) -> bool {
79 match self.peers.entry(peer) {
80 Entry::Vacant(entry) => {
81 entry.insert(PeerInfo {
82 state: PeerState::Discovered,
83 addrs: SmallVec::new(),
84 connections: SmallVec::new(),
85 trusted: false,
86 });
87 true
88 }
89 Entry::Occupied(_) => false,
90 }
91 }
92
93 fn get(&self, peer: PeerId) -> RefMut<'_, PeerId, PeerInfo> {
97 self.peers.entry(peer).or_insert_with(|| PeerInfo {
98 state: PeerState::Discovered,
99 addrs: SmallVec::new(),
100 connections: SmallVec::new(),
101 trusted: false,
102 })
103 }
104
105 pub fn add_addresses<I, A>(&self, peer: PeerId, addrs: I)
107 where
108 I: IntoIterator<Item = A>,
109 A: Borrow<Multiaddr>,
110 {
111 let mut state = self.get(peer);
112
113 for addr in addrs {
114 let addr = addr.borrow();
115
116 if !state.addrs.contains(addr) {
117 state.addrs.push(addr.to_owned());
118 }
119 }
120
121 if state.state == PeerState::Discovered && !state.addrs.is_empty() {
123 state.state = PeerState::AddressesFound;
124 }
125 }
126
127 pub fn set_trusted(&self, peer: PeerId, is_trusted: bool) {
129 let mut peer_info = self.get(peer);
130
131 if peer_info.trusted == is_trusted {
132 return;
134 }
135
136 peer_info.trusted = is_trusted;
137
138 if peer_info.is_connected() {
141 self.info_tx.send_modify(|tracker_info| {
142 if is_trusted {
143 tracker_info.num_connected_trusted_peers += 1;
144 } else {
145 tracker_info.num_connected_trusted_peers -= 1;
146 }
147 });
148 }
149 }
150
151 pub fn set_connected(
153 &self,
154 peer: PeerId,
155 connection_id: ConnectionId,
156 address: impl Into<Option<Multiaddr>>,
157 ) {
158 let mut peer_info = self.get(peer);
159
160 if let Some(address) = address.into() {
161 if !peer_info.addrs.contains(&address) {
162 peer_info.addrs.push(address);
163 }
164 }
165
166 peer_info.connections.push(connection_id);
167
168 if !peer_info.is_connected() {
170 peer_info.state = PeerState::Connected;
171
172 increment_connected_peers(&self.info_tx, peer_info.trusted);
173
174 self.event_pub.send(NodeEvent::PeerConnected {
175 id: peer,
176 trusted: peer_info.trusted,
177 });
178 }
179 }
180
181 pub fn set_maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool {
185 let mut peer_info = self.get(peer);
186
187 peer_info.connections.retain(|id| *id != connection_id);
188
189 if peer_info.connections.is_empty() {
191 if peer_info.addrs.is_empty() {
192 peer_info.state = PeerState::Discovered;
193 } else {
194 peer_info.state = PeerState::AddressesFound;
195 }
196
197 decrement_connected_peers(&self.info_tx, peer_info.trusted);
198
199 self.event_pub.send(NodeEvent::PeerDisconnected {
200 id: peer,
201 trusted: peer_info.trusted,
202 });
203
204 true
205 } else {
206 false
207 }
208 }
209
210 #[allow(dead_code)]
212 pub fn is_connected(&self, peer: PeerId) -> bool {
213 self.get(peer).is_connected()
214 }
215
216 #[allow(dead_code)]
218 pub fn addresses(&self, peer: PeerId) -> SmallVec<[Multiaddr; 4]> {
219 self.get(peer).addrs.clone()
220 }
221
222 #[allow(dead_code)]
224 pub fn remove(&self, peer: PeerId) {
225 self.peers.remove(&peer);
226 }
227
228 pub fn connected_peers(&self) -> Vec<PeerId> {
230 self.peers
231 .iter()
232 .filter(|pair| pair.value().is_connected())
233 .map(|pair| pair.key().to_owned())
234 .collect()
235 }
236
237 pub fn connections(&self) -> Vec<(PeerId, SmallVec<[ConnectionId; 1]>)> {
238 self.peers
239 .iter()
240 .filter(|pair| pair.value().is_connected())
241 .map(|pair| (pair.key().to_owned(), pair.value().connections.clone()))
242 .collect()
243 }
244
245 pub fn best_peer(&self) -> Option<PeerId> {
247 const MAX_PEER_SAMPLE: usize = 128;
248
249 let mut peers = self
251 .peers
252 .iter()
253 .filter(|pair| pair.value().is_connected())
254 .take(MAX_PEER_SAMPLE)
255 .map(|pair| pair.key().to_owned())
256 .collect::<SmallVec<[_; MAX_PEER_SAMPLE]>>();
257
258 peers.shuffle(&mut rand::thread_rng());
259
260 peers.first().copied()
261 }
262
263 #[allow(dead_code)]
265 pub fn best_n_peers(&self, limit: usize) -> Vec<PeerId> {
266 self.peers
268 .iter()
269 .filter(|pair| pair.value().is_connected())
270 .take(limit)
271 .map(|pair| pair.key().to_owned())
272 .collect()
274 }
275
276 pub fn trusted_n_peers(&self, limit: usize) -> Vec<PeerId> {
278 self.peers
279 .iter()
280 .filter(|pair| pair.value().is_connected() && pair.value().trusted)
281 .take(limit)
282 .map(|pair| pair.key().to_owned())
283 .collect()
285 }
286}
287
288fn increment_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
289 info_tx.send_modify(|tracker_info| {
290 tracker_info.num_connected_peers += 1;
291
292 if trusted {
293 tracker_info.num_connected_trusted_peers += 1;
294 }
295 });
296}
297
298fn decrement_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
299 info_tx.send_modify(|tracker_info| {
300 tracker_info.num_connected_peers -= 1;
301
302 if trusted {
303 tracker_info.num_connected_trusted_peers -= 1;
304 }
305 });
306}
307
308#[cfg(test)]
309mod tests {
310 use crate::events::EventChannel;
311
312 use super::*;
313
314 #[test]
315 fn trust_before_connect() {
316 let event_channel = EventChannel::new();
317 let tracker = PeerTracker::new(event_channel.publisher());
318 let mut watcher = tracker.info_watcher();
319 let peer = PeerId::random();
320
321 assert!(!watcher.has_changed().unwrap());
322
323 tracker.set_trusted(peer, true);
324 assert!(!watcher.has_changed().unwrap());
325
326 tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
327 assert!(watcher.has_changed().unwrap());
328 let info = watcher.borrow_and_update().to_owned();
329 assert_eq!(info.num_connected_peers, 1);
330 assert_eq!(info.num_connected_trusted_peers, 1);
331 }
332
333 #[test]
334 fn trust_after_connect() {
335 let event_channel = EventChannel::new();
336 let tracker = PeerTracker::new(event_channel.publisher());
337 let mut watcher = tracker.info_watcher();
338 let peer = PeerId::random();
339
340 assert!(!watcher.has_changed().unwrap());
341
342 tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
343 assert!(watcher.has_changed().unwrap());
344 let info = watcher.borrow_and_update().to_owned();
345 assert_eq!(info.num_connected_peers, 1);
346 assert_eq!(info.num_connected_trusted_peers, 0);
347
348 tracker.set_trusted(peer, true);
349 assert!(watcher.has_changed().unwrap());
350 let info = watcher.borrow_and_update().to_owned();
351 assert_eq!(info.num_connected_peers, 1);
352 assert_eq!(info.num_connected_trusted_peers, 1);
353 }
354
355 #[test]
356 fn untrust_after_connect() {
357 let event_channel = EventChannel::new();
358 let tracker = PeerTracker::new(event_channel.publisher());
359 let mut watcher = tracker.info_watcher();
360 let peer = PeerId::random();
361
362 assert!(!watcher.has_changed().unwrap());
363
364 tracker.set_trusted(peer, true);
365 assert!(!watcher.has_changed().unwrap());
366
367 tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
368 assert!(watcher.has_changed().unwrap());
369 let info = watcher.borrow_and_update().to_owned();
370 assert_eq!(info.num_connected_peers, 1);
371 assert_eq!(info.num_connected_trusted_peers, 1);
372
373 tracker.set_trusted(peer, false);
374 assert!(watcher.has_changed().unwrap());
375 let info = watcher.borrow_and_update().to_owned();
376 assert_eq!(info.num_connected_peers, 1);
377 assert_eq!(info.num_connected_trusted_peers, 0);
378 }
379}