1use std::collections::hash_map::Entry;
4use std::collections::{HashMap, HashSet};
5use std::time::Duration;
6
7use libp2p::ping;
8use libp2p::{PeerId, swarm::ConnectionId};
9use lumina_utils::time::Instant;
10use rand::seq::SliceRandom;
11use serde::{Deserialize, Serialize};
12use smallvec::SmallVec;
13use tokio::sync::watch;
14use tracing::info;
15
16use crate::events::{EventPublisher, NodeEvent};
17
/// Interval associated with peer garbage collection.
///
/// NOTE(review): not referenced in this file; presumably the period at which
/// the owner of the tracker calls [`PeerTracker::gc`] — confirm against callers.
pub(crate) const GC_INTERVAL: Duration = Duration::from_secs(30);

/// How long a disconnected, unprotected peer is kept by [`PeerTracker::gc`]
/// before it gets removed.
const EXPIRED_AFTER: Duration = Duration::from_secs(2 * 60);
22
23#[derive(Debug)]
25pub(crate) struct PeerTracker {
26 peers: HashMap<PeerId, Peer>,
27 protect_counter: HashMap<u32, usize>,
28 info_tx: watch::Sender<PeerTrackerInfo>,
29 event_pub: EventPublisher,
30}
31
32#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
34#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
35pub struct PeerTrackerInfo {
36 pub num_connected_peers: u64,
38 pub num_connected_trusted_peers: u64,
40 pub num_connected_full_nodes: u64,
43 pub num_connected_archival_nodes: u64,
46}
47
48#[derive(Debug)]
49pub(crate) struct Peer {
50 id: PeerId,
51 connections: HashMap<ConnectionId, ConnectionInfo>,
52 protected: HashSet<u32>,
53 trusted: bool,
54 archival: bool,
55 node_kind: NodeKind,
56 disconnected_at: Option<Instant>,
57}
58
/// Per-connection bookkeeping.
#[derive(Debug, Default)]
struct ConnectionInfo {
    /// Duration reported by the most recent ping event of this connection.
    /// `None` if no ping event was seen yet or the last one failed.
    ping: Option<Duration>,
}
63
/// Kind of a node, as derived from the agent version string it advertises.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) enum NodeKind {
    /// The agent version was not recognised (or not received yet).
    #[default]
    Unknown,
    /// A `celestia-node` running as a bridge node.
    Bridge,
    /// A `celestia-node` running as a full node.
    Full,
    /// A light node: any `lumina` node, or a `celestia-node` running as light.
    Light,
}

impl NodeKind {
    /// Derives the node kind from a `/`-separated agent version string.
    ///
    /// Recognised shapes:
    /// - `lumina/...` is always [`NodeKind::Light`].
    /// - `celestia-node/<skipped>/<kind>/...` where `<kind>` is `bridge`,
    ///   `full` or `light`.
    ///
    /// Anything else yields [`NodeKind::Unknown`].
    fn from_agent_version(s: &str) -> NodeKind {
        let mut segments = s.split('/');
        let implementation = segments.next();

        if implementation == Some("lumina") {
            return NodeKind::Light;
        }

        if implementation != Some("celestia-node") {
            return NodeKind::Unknown;
        }

        // The segment right after the implementation name is not relevant
        // for the node kind, so it is skipped.
        let _skipped = segments.next();

        match segments.next() {
            Some("bridge") => NodeKind::Bridge,
            Some("full") => NodeKind::Full,
            Some("light") => NodeKind::Light,
            _ => NodeKind::Unknown,
        }
    }

    /// Returns `true` for the kinds that are counted as full nodes,
    /// i.e. [`NodeKind::Full`] and [`NodeKind::Bridge`].
    pub(crate) fn is_full(&self) -> bool {
        match self {
            NodeKind::Full | NodeKind::Bridge => true,
            NodeKind::Light | NodeKind::Unknown => false,
        }
    }
}
93
94impl Peer {
95 fn new(id: PeerId) -> Self {
96 Peer {
97 id,
98 connections: HashMap::new(),
99 protected: HashSet::new(),
100 trusted: false,
101 archival: false,
102 node_kind: NodeKind::Unknown,
103 disconnected_at: Some(Instant::now()),
105 }
106 }
107
108 pub(crate) fn id(&self) -> &PeerId {
109 &self.id
110 }
111
112 pub(crate) fn is_connected(&self) -> bool {
113 !self.connections.is_empty()
114 }
115
116 pub(crate) fn is_trusted(&self) -> bool {
117 self.trusted
118 }
119
120 pub(crate) fn is_protected(&self) -> bool {
121 !self.protected.is_empty()
122 }
123
124 pub(crate) fn is_protected_with_tag(&self, tag: u32) -> bool {
125 self.protected.contains(&tag)
126 }
127
128 pub(crate) fn is_archival(&self) -> bool {
129 self.archival
130 }
131
132 pub(crate) fn is_full(&self) -> bool {
133 self.node_kind.is_full()
134 }
135
136 #[allow(dead_code)]
137 pub(crate) fn node_kind(&self) -> NodeKind {
138 self.node_kind
139 }
140
141 pub(crate) fn best_ping(&self) -> Option<Duration> {
142 self.connections
143 .iter()
144 .flat_map(|(_, conn_info)| conn_info.ping)
145 .min()
146 }
147}
148
149impl PeerTracker {
150 pub(crate) fn new(event_pub: EventPublisher) -> Self {
152 PeerTracker {
153 peers: HashMap::new(),
154 protect_counter: HashMap::new(),
155 info_tx: watch::channel(PeerTrackerInfo::default()).0,
156 event_pub,
157 }
158 }
159
160 pub(crate) fn info(&self) -> PeerTrackerInfo {
162 self.info_tx.borrow().to_owned()
163 }
164
165 pub(crate) fn info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
167 self.info_tx.subscribe()
168 }
169
170 pub(crate) fn peer(&self, peer_id: &PeerId) -> Option<&Peer> {
171 self.peers.get(peer_id)
172 }
173
174 pub(crate) fn peers(&self) -> impl Iterator<Item = &Peer> {
175 self.peers.values()
176 }
177
178 pub(crate) fn is_connected(&self, peer_id: &PeerId) -> bool {
179 self.peer(peer_id).is_some_and(|p| p.is_connected())
180 }
181
182 pub(crate) fn is_protected(&self, peer_id: &PeerId) -> bool {
183 self.peer(peer_id).is_some_and(|p| p.is_protected())
184 }
185
186 #[allow(dead_code)]
187 pub(crate) fn is_protected_with_tag(&self, peer_id: &PeerId, tag: u32) -> bool {
188 self.peer(peer_id)
189 .is_some_and(|p| p.is_protected_with_tag(tag))
190 }
191
192 pub(crate) fn add_peer_id(&mut self, peer_id: &PeerId) -> bool {
196 match self.peers.entry(*peer_id) {
197 Entry::Vacant(entry) => {
198 entry.insert(Peer::new(*peer_id));
199 true
200 }
201 Entry::Occupied(_) => false,
202 }
203 }
204
205 pub(crate) fn set_trusted(&mut self, peer_id: &PeerId, is_trusted: bool) {
207 let peer = self
208 .peers
209 .entry(*peer_id)
210 .or_insert_with(|| Peer::new(*peer_id));
211
212 peer.trusted = is_trusted;
213 self.recount_peer_tracker_info();
214 }
215
216 pub(crate) fn protect(&mut self, peer_id: &PeerId, tag: u32) -> bool {
222 let peer = self
223 .peers
224 .entry(*peer_id)
225 .or_insert_with(|| Peer::new(*peer_id));
226 let was_protected = peer.is_protected();
227
228 if peer.protected.insert(tag) {
229 *self.protect_counter.entry(tag).or_default() += 1;
230 info!("Protect peer {peer_id} with {tag} tag");
231 }
232
233 !was_protected
234 }
235
236 pub(crate) fn unprotect(&mut self, peer_id: &PeerId, tag: u32) -> bool {
242 let Some(peer) = self.peers.get_mut(peer_id) else {
243 return false;
244 };
245
246 let was_protected = peer.is_protected();
247
248 if peer.protected.remove(&tag) {
249 *self
250 .protect_counter
251 .get_mut(&tag)
252 .expect("protected flag was set but not counted") -= 1;
253
254 info!("Unprotect peer {peer_id} with {tag} tag");
255 }
256
257 was_protected && !peer.is_protected()
259 }
260
261 pub(crate) fn protected_len(&self, tag: u32) -> usize {
262 self.protect_counter.get(&tag).copied().unwrap_or(0)
263 }
264
265 pub(crate) fn add_connection(&mut self, peer_id: &PeerId, connection_id: ConnectionId) {
267 let peer = self
268 .peers
269 .entry(*peer_id)
270 .or_insert_with(|| Peer::new(*peer_id));
271 let prev_connected = peer.is_connected();
272
273 peer.connections
274 .insert(connection_id, ConnectionInfo::default());
275
276 if !prev_connected {
278 let trusted = peer.trusted;
279 peer.disconnected_at.take();
280 self.recount_peer_tracker_info();
281
282 self.event_pub.send(NodeEvent::PeerConnected {
283 id: *peer_id,
284 trusted,
285 });
286 }
287 }
288
289 pub(crate) fn remove_connection(&mut self, peer_id: &PeerId, connection_id: ConnectionId) {
291 let Some(peer) = self.peers.get_mut(peer_id) else {
292 return;
293 };
294
295 peer.connections.retain(|id, _| *id != connection_id);
296
297 if !peer.is_connected() {
299 let trusted = peer.trusted;
300 peer.node_kind = NodeKind::Unknown;
301 peer.archival = false;
302 peer.disconnected_at = Some(Instant::now());
303 self.recount_peer_tracker_info();
304
305 self.event_pub.send(NodeEvent::PeerDisconnected {
306 id: peer_id.to_owned(),
307 trusted,
308 });
309 }
310 }
311
312 pub(crate) fn on_agent_version(&mut self, peer_id: &PeerId, agent_version: &str) {
313 if let Some(peer) = self.peers.get_mut(peer_id)
314 && peer.is_connected()
315 {
316 peer.node_kind = NodeKind::from_agent_version(agent_version);
317 self.recount_peer_tracker_info();
318 }
319 }
320
321 pub(crate) fn on_ping_event(&mut self, ev: &ping::Event) {
322 if let Some(peer) = self.peers.get_mut(&ev.peer)
323 && let Some(conn_info) = peer.connections.get_mut(&ev.connection)
324 {
325 conn_info.ping = ev.result.as_ref().ok().copied();
326 }
327 }
328
329 pub(crate) fn mark_as_archival(&mut self, peer_id: &PeerId) {
330 let peer = self
331 .peers
332 .entry(*peer_id)
333 .or_insert_with(|| Peer::new(*peer_id));
334
335 peer.archival = true;
336 self.recount_peer_tracker_info();
337 }
338
339 pub(crate) fn connections(
340 &self,
341 peer_id: &PeerId,
342 ) -> impl Iterator<Item = ConnectionId> + use<'_> {
343 self.peer(peer_id)
344 .map(|peer| peer.connections.keys().copied())
345 .into_iter()
346 .flatten()
347 }
348
349 pub(crate) fn all_connections(&self) -> impl Iterator<Item = (&PeerId, ConnectionId)> {
351 self.peers()
352 .filter(|peer| peer.is_connected())
353 .flat_map(|peer| {
354 peer.connections
355 .keys()
356 .copied()
357 .map(|conn| (peer.id(), conn))
358 })
359 }
360
361 pub(crate) fn best_peer(&self) -> Option<PeerId> {
363 const MAX_PEER_SAMPLE: usize = 128;
364
365 let mut peers = self
367 .peers
368 .iter()
369 .filter(|(_, peer)| peer.is_connected())
370 .take(MAX_PEER_SAMPLE)
371 .map(|(peer_id, _)| peer_id)
372 .collect::<SmallVec<[_; MAX_PEER_SAMPLE]>>();
373
374 peers.shuffle(&mut rand::thread_rng());
375
376 peers.first().copied().copied()
377 }
378
379 fn recount_peer_tracker_info(&self) {
380 self.info_tx.send_if_modified(|info| {
381 let mut new_info = PeerTrackerInfo::default();
382
383 for peer in self.peers.values() {
384 if peer.is_connected() {
385 new_info.num_connected_peers += 1;
386
387 if peer.is_trusted() {
388 new_info.num_connected_trusted_peers += 1;
389 }
390
391 if peer.is_full() {
392 new_info.num_connected_full_nodes += 1;
393 }
394
395 if peer.is_archival() {
396 new_info.num_connected_archival_nodes += 1;
397 }
398 }
399 }
400
401 if *info != new_info {
402 *info = new_info;
403 true
404 } else {
405 false
406 }
407 });
408 }
409
410 pub(crate) fn gc(&mut self) {
411 self.peers.retain(|_, peer| {
412 peer.is_connected()
418 || peer.is_protected()
419 || peer
420 .disconnected_at
421 .is_none_or(|tm| tm.elapsed() <= EXPIRED_AFTER)
422 });
423 }
424}
425
#[cfg(test)]
mod tests {
    use crate::events::EventChannel;

    use super::*;

    /// Agent version advertised by a `celestia-node` full node.
    const FULL_NODE_AGENT: &str = "celestia-node/celestia/full/v0.24.1/fb95d45";

    /// Asserts that the watcher has an unseen update and returns it,
    /// marking it as seen.
    #[track_caller]
    fn expect_update(watcher: &mut watch::Receiver<PeerTrackerInfo>) -> PeerTrackerInfo {
        assert!(watcher.has_changed().unwrap());
        watcher.borrow_and_update().clone()
    }

    /// Asserts that the watcher has no unseen update.
    #[track_caller]
    fn expect_no_update(watcher: &watch::Receiver<PeerTrackerInfo>) {
        assert!(!watcher.has_changed().unwrap());
    }

    /// Shorthand for building the expected statistics.
    fn counts(peers: u64, trusted: u64, full: u64, archival: u64) -> PeerTrackerInfo {
        PeerTrackerInfo {
            num_connected_peers: peers,
            num_connected_trusted_peers: trusted,
            num_connected_full_nodes: full,
            num_connected_archival_nodes: archival,
        }
    }

    #[test]
    fn trust_before_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer = PeerId::random();

        expect_no_update(&watcher);

        // Trusting a peer that is not connected does not change the statistics.
        tracker.set_trusted(&peer, true);
        expect_no_update(&watcher);

        // Once it connects, it is counted as a trusted peer right away.
        tracker.add_connection(&peer, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer));
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);
    }

    #[test]
    fn trust_after_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer = PeerId::random();

        expect_no_update(&watcher);

        // A freshly connected peer is not trusted.
        tracker.add_connection(&peer, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer));
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 0);

        // Trusting it while connected updates the statistics.
        tracker.set_trusted(&peer, true);
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);
    }

    #[test]
    fn untrust_after_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer = PeerId::random();

        expect_no_update(&watcher);

        tracker.set_trusted(&peer, true);
        expect_no_update(&watcher);

        tracker.add_connection(&peer, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer));
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);

        // Revoking the trust keeps the peer connected but no longer trusted.
        tracker.set_trusted(&peer, false);
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 0);
    }

    #[test]
    fn tracker_info() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let first = PeerId::random();
        let second = PeerId::random();

        tracker.add_connection(&first, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&first));
        assert_eq!(expect_update(&mut watcher), counts(1, 0, 0, 0));

        // Only the connected peer shows up in the archival counter.
        tracker.mark_as_archival(&first);
        tracker.mark_as_archival(&second);
        assert_eq!(expect_update(&mut watcher), counts(1, 0, 0, 1));

        // Marking the same peer again changes nothing.
        tracker.mark_as_archival(&first);
        expect_no_update(&watcher);

        tracker.on_agent_version(&first, FULL_NODE_AGENT);
        assert_eq!(expect_update(&mut watcher), counts(1, 0, 1, 1));

        // Receiving the same agent version again changes nothing.
        tracker.on_agent_version(&first, FULL_NODE_AGENT);
        expect_no_update(&watcher);

        // The second peer was marked as archival before it connected,
        // so it is counted as archival as soon as it connects.
        tracker.add_connection(&second, ConnectionId::new_unchecked(2));
        assert_eq!(expect_update(&mut watcher), counts(2, 0, 1, 2));

        // Dropping the only connection of the first peer removes it from
        // every counter.
        tracker.remove_connection(&first, ConnectionId::new_unchecked(1));
        assert_eq!(expect_update(&mut watcher), counts(1, 0, 0, 1));

        // Reconnecting does not bring back the node kind nor the archival flag.
        tracker.add_connection(&first, ConnectionId::new_unchecked(3));
        assert!(tracker.is_connected(&first));
        assert_eq!(expect_update(&mut watcher), counts(2, 0, 0, 1));
    }

    #[test]
    fn protect() {
        const TAG_A: u32 = 0;
        const TAG_B: u32 = 1;

        let peer = PeerId::random();
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());

        assert!(!tracker.is_protected(&peer));
        // Unprotecting an unknown peer is a no-op.
        assert!(!tracker.unprotect(&peer, TAG_A));
        assert_eq!(tracker.protected_len(TAG_A), 0);

        assert!(!tracker.is_protected_with_tag(&peer, TAG_A));
        // The first tag flips the peer to protected.
        assert!(tracker.protect(&peer, TAG_A));
        assert!(tracker.is_protected(&peer));
        assert!(tracker.is_protected_with_tag(&peer, TAG_A));
        assert_eq!(tracker.protected_len(TAG_A), 1);
        assert!(!tracker.is_protected_with_tag(&peer, TAG_B));
        // A second tag does not: the peer was already protected.
        assert!(!tracker.protect(&peer, TAG_B));
        assert!(tracker.is_protected(&peer));
        assert!(tracker.is_protected_with_tag(&peer, TAG_B));
        assert_eq!(tracker.protected_len(TAG_B), 1);

        // Protecting with the same tag twice is not counted twice.
        assert!(!tracker.protect(&peer, TAG_A));
        assert_eq!(tracker.protected_len(TAG_A), 1);
        // Another peer under the same tag is.
        assert!(tracker.protect(&PeerId::random(), TAG_A));
        assert_eq!(tracker.protected_len(TAG_A), 2);

        // Removing one of the two tags keeps the peer protected.
        assert!(!tracker.unprotect(&peer, TAG_A));
        assert!(!tracker.is_protected_with_tag(&peer, TAG_A));
        assert!(tracker.is_protected(&peer));
        assert_eq!(tracker.protected_len(TAG_A), 1);
        // Removing the last tag makes the peer unprotected.
        assert!(tracker.unprotect(&peer, TAG_B));
        assert!(!tracker.is_protected_with_tag(&peer, TAG_B));
        assert!(!tracker.is_protected(&peer));
        assert_eq!(tracker.protected_len(TAG_B), 0);
    }

    #[test]
    fn node_kind() {
        let cases = [
            ("lumina/celestia/0.14.0", NodeKind::Light),
            (
                "celestia-node/celestia/bridge/v0.24.1/fb95d45",
                NodeKind::Bridge,
            ),
            (
                "celestia-node/celestia/full/v0.24.1/fb95d45",
                NodeKind::Full,
            ),
            (
                "celestia-node/celestia/light/v0.24.1/fb95d45",
                NodeKind::Light,
            ),
            ("probelab-node/celestia/ant/v0.1.0", NodeKind::Unknown),
        ];

        for (agent_version, expected) in cases {
            assert_eq!(NodeKind::from_agent_version(agent_version), expected);
        }
    }
}