1use std::collections::hash_map::Entry;
4use std::collections::{HashMap, HashSet};
5use std::time::Duration;
6
7use libp2p::ping;
8use libp2p::{PeerId, swarm::ConnectionId};
9use lumina_utils::time::Instant;
10use serde::{Deserialize, Serialize};
11use tokio::sync::watch;
12use tracing::info;
13
14use crate::events::{EventPublisher, NodeEvent};
15
/// Interval between garbage-collection passes over the tracked peers.
///
/// NOTE(review): not referenced in this file; presumably the owner of the
/// tracker calls `PeerTracker::gc` on this cadence — confirm against the caller.
pub(crate) const GC_INTERVAL: Duration = Duration::from_secs(30);
/// How long a peer without any connection is kept before `PeerTracker::gc`
/// removes it (unless it is protected).
const EXPIRED_AFTER: Duration = Duration::from_secs(120);
20
/// Keeps track of every peer the node knows about: its connections, its
/// protection tags and the aggregated counters derived from them.
#[derive(Debug)]
pub(crate) struct PeerTracker {
    /// All known peers, connected or not, keyed by their id.
    peers: HashMap<PeerId, Peer>,
    /// For each protection tag, the number of peers currently carrying it.
    protect_counter: HashMap<u32, usize>,
    /// Sending half of the watch channel that holds the latest counters.
    info_tx: watch::Sender<PeerTrackerInfo>,
    /// Used to publish `PeerConnected` / `PeerDisconnected` node events.
    event_pub: EventPublisher,
}
29
/// Aggregated counters describing the currently connected peers.
///
/// Only peers with at least one open connection are counted.
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct PeerTrackerInfo {
    /// Number of connected peers.
    pub num_connected_peers: u64,
    /// Number of connected peers that are flagged as trusted.
    pub num_connected_trusted_peers: u64,
    /// Number of connected peers identified as full nodes
    /// (both `Full` and `Bridge` node kinds count).
    pub num_connected_full_nodes: u64,
    /// Number of connected peers that were marked as archival.
    pub num_connected_archival_nodes: u64,
}
45
/// Everything the tracker knows about a single peer.
#[derive(Debug)]
pub(crate) struct Peer {
    /// Identity of the peer.
    id: PeerId,
    /// Currently open connections with their per-connection data.
    connections: HashMap<ConnectionId, ConnectionInfo>,
    /// Protection tags attached to the peer; a peer with any tag survives GC.
    protected: HashSet<u32>,
    /// Whether the peer was flagged as trusted.
    trusted: bool,
    /// Whether the peer was marked as archival; cleared when it disconnects.
    archival: bool,
    /// Node kind derived from the peer's agent version; reset to `Unknown`
    /// when the peer disconnects.
    node_kind: NodeKind,
    /// When the peer last had no connections (set at creation and on
    /// disconnect), or `None` while it is connected.
    disconnected_at: Option<Instant>,
}
56
/// Per-connection data tracked for a peer.
#[derive(Debug, Default)]
struct ConnectionInfo {
    /// Round-trip time from the most recent ping event on this connection;
    /// `None` if no ping was recorded yet or the last ping failed.
    ping: Option<Duration>,
}
61
/// Kind of node a peer runs, as far as it can be told from its agent version.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) enum NodeKind {
    /// The agent version was not seen yet or was not recognised.
    #[default]
    Unknown,
    /// A `celestia-node` bridge node.
    Bridge,
    /// A `celestia-node` full node.
    Full,
    /// A light node (`celestia-node` light node or any `lumina` node).
    Light,
}

impl NodeKind {
    /// Derives the node kind from a `/`-separated agent version string.
    ///
    /// * `lumina/...` is always a light node.
    /// * `celestia-node/<network>/<kind>/...` is classified by its third
    ///   field (`bridge`, `full` or `light`).
    /// * Anything else, including truncated strings, is `Unknown`.
    fn from_agent_version(s: &str) -> NodeKind {
        let mut fields = s.split('/');
        let implementation = fields.next();

        if implementation == Some("lumina") {
            return NodeKind::Light;
        }

        if implementation != Some("celestia-node") {
            return NodeKind::Unknown;
        }

        // `nth(1)` skips the second field and yields the third one,
        // which names the node kind.
        match fields.nth(1) {
            Some("bridge") => NodeKind::Bridge,
            Some("full") => NodeKind::Full,
            Some("light") => NodeKind::Light,
            _ => NodeKind::Unknown,
        }
    }

    /// Returns `true` for node kinds that count as full nodes
    /// (`Full` and `Bridge`).
    pub(crate) fn is_full(&self) -> bool {
        match *self {
            NodeKind::Full | NodeKind::Bridge => true,
            NodeKind::Light | NodeKind::Unknown => false,
        }
    }
}
91
92impl Peer {
93 fn new(id: PeerId) -> Self {
94 Peer {
95 id,
96 connections: HashMap::new(),
97 protected: HashSet::new(),
98 trusted: false,
99 archival: false,
100 node_kind: NodeKind::Unknown,
101 disconnected_at: Some(Instant::now()),
103 }
104 }
105
106 pub(crate) fn id(&self) -> &PeerId {
107 &self.id
108 }
109
110 pub(crate) fn is_connected(&self) -> bool {
111 !self.connections.is_empty()
112 }
113
114 pub(crate) fn is_trusted(&self) -> bool {
115 self.trusted
116 }
117
118 pub(crate) fn is_protected(&self) -> bool {
119 !self.protected.is_empty()
120 }
121
122 pub(crate) fn is_protected_with_tag(&self, tag: u32) -> bool {
123 self.protected.contains(&tag)
124 }
125
126 pub(crate) fn is_archival(&self) -> bool {
127 self.archival
128 }
129
130 pub(crate) fn is_full(&self) -> bool {
131 self.node_kind.is_full()
132 }
133
134 #[allow(dead_code)]
135 pub(crate) fn node_kind(&self) -> NodeKind {
136 self.node_kind
137 }
138
139 pub(crate) fn best_ping(&self) -> Option<Duration> {
140 self.connections
141 .iter()
142 .flat_map(|(_, conn_info)| conn_info.ping)
143 .min()
144 }
145}
146
147impl PeerTracker {
148 pub(crate) fn new(event_pub: EventPublisher) -> Self {
150 PeerTracker {
151 peers: HashMap::new(),
152 protect_counter: HashMap::new(),
153 info_tx: watch::channel(PeerTrackerInfo::default()).0,
154 event_pub,
155 }
156 }
157
158 pub(crate) fn info(&self) -> PeerTrackerInfo {
160 self.info_tx.borrow().to_owned()
161 }
162
163 pub(crate) fn info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
165 self.info_tx.subscribe()
166 }
167
168 pub(crate) fn peer(&self, peer_id: &PeerId) -> Option<&Peer> {
169 self.peers.get(peer_id)
170 }
171
172 pub(crate) fn peers(&self) -> impl Iterator<Item = &Peer> {
173 self.peers.values()
174 }
175
176 pub(crate) fn is_connected(&self, peer_id: &PeerId) -> bool {
177 self.peer(peer_id).is_some_and(|p| p.is_connected())
178 }
179
180 pub(crate) fn is_protected(&self, peer_id: &PeerId) -> bool {
181 self.peer(peer_id).is_some_and(|p| p.is_protected())
182 }
183
184 #[allow(dead_code)]
185 pub(crate) fn is_protected_with_tag(&self, peer_id: &PeerId, tag: u32) -> bool {
186 self.peer(peer_id)
187 .is_some_and(|p| p.is_protected_with_tag(tag))
188 }
189
190 pub(crate) fn add_peer_id(&mut self, peer_id: &PeerId) -> bool {
194 match self.peers.entry(*peer_id) {
195 Entry::Vacant(entry) => {
196 entry.insert(Peer::new(*peer_id));
197 true
198 }
199 Entry::Occupied(_) => false,
200 }
201 }
202
203 pub(crate) fn set_trusted(&mut self, peer_id: &PeerId, is_trusted: bool) {
205 let peer = self
206 .peers
207 .entry(*peer_id)
208 .or_insert_with(|| Peer::new(*peer_id));
209
210 peer.trusted = is_trusted;
211 self.recount_peer_tracker_info();
212 }
213
214 pub(crate) fn protect(&mut self, peer_id: &PeerId, tag: u32) -> bool {
220 let peer = self
221 .peers
222 .entry(*peer_id)
223 .or_insert_with(|| Peer::new(*peer_id));
224 let was_protected = peer.is_protected();
225
226 if peer.protected.insert(tag) {
227 *self.protect_counter.entry(tag).or_default() += 1;
228 info!("Protect peer {peer_id} with {tag} tag");
229 }
230
231 !was_protected
232 }
233
234 pub(crate) fn unprotect(&mut self, peer_id: &PeerId, tag: u32) -> bool {
240 let Some(peer) = self.peers.get_mut(peer_id) else {
241 return false;
242 };
243
244 let was_protected = peer.is_protected();
245
246 if peer.protected.remove(&tag) {
247 *self
248 .protect_counter
249 .get_mut(&tag)
250 .expect("protected flag was set but not counted") -= 1;
251
252 info!("Unprotect peer {peer_id} with {tag} tag");
253 }
254
255 was_protected && !peer.is_protected()
257 }
258
259 pub(crate) fn protected_len(&self, tag: u32) -> usize {
260 self.protect_counter.get(&tag).copied().unwrap_or(0)
261 }
262
263 pub(crate) fn add_connection(&mut self, peer_id: &PeerId, connection_id: ConnectionId) {
265 let peer = self
266 .peers
267 .entry(*peer_id)
268 .or_insert_with(|| Peer::new(*peer_id));
269 let prev_connected = peer.is_connected();
270
271 peer.connections
272 .insert(connection_id, ConnectionInfo::default());
273
274 if !prev_connected {
276 let trusted = peer.trusted;
277 peer.disconnected_at.take();
278 self.recount_peer_tracker_info();
279
280 self.event_pub.send(NodeEvent::PeerConnected {
281 id: *peer_id,
282 trusted,
283 });
284 }
285 }
286
287 pub(crate) fn remove_connection(&mut self, peer_id: &PeerId, connection_id: ConnectionId) {
289 let Some(peer) = self.peers.get_mut(peer_id) else {
290 return;
291 };
292
293 peer.connections.retain(|id, _| *id != connection_id);
294
295 if !peer.is_connected() {
297 let trusted = peer.trusted;
298 peer.node_kind = NodeKind::Unknown;
299 peer.archival = false;
300 peer.disconnected_at = Some(Instant::now());
301 self.recount_peer_tracker_info();
302
303 self.event_pub.send(NodeEvent::PeerDisconnected {
304 id: peer_id.to_owned(),
305 trusted,
306 });
307 }
308 }
309
310 pub(crate) fn on_agent_version(&mut self, peer_id: &PeerId, agent_version: &str) {
311 if let Some(peer) = self.peers.get_mut(peer_id)
312 && peer.is_connected()
313 {
314 peer.node_kind = NodeKind::from_agent_version(agent_version);
315 self.recount_peer_tracker_info();
316 }
317 }
318
319 pub(crate) fn on_ping_event(&mut self, ev: &ping::Event) {
320 if let Some(peer) = self.peers.get_mut(&ev.peer)
321 && let Some(conn_info) = peer.connections.get_mut(&ev.connection)
322 {
323 conn_info.ping = ev.result.as_ref().ok().copied();
324 }
325 }
326
327 pub(crate) fn mark_as_archival(&mut self, peer_id: &PeerId) {
328 let peer = self
329 .peers
330 .entry(*peer_id)
331 .or_insert_with(|| Peer::new(*peer_id));
332
333 peer.archival = true;
334 self.recount_peer_tracker_info();
335 }
336
337 pub(crate) fn connections(
338 &self,
339 peer_id: &PeerId,
340 ) -> impl Iterator<Item = ConnectionId> + use<'_> {
341 self.peer(peer_id)
342 .map(|peer| peer.connections.keys().copied())
343 .into_iter()
344 .flatten()
345 }
346
347 pub(crate) fn all_connections(&self) -> impl Iterator<Item = (&PeerId, ConnectionId)> {
349 self.peers()
350 .filter(|peer| peer.is_connected())
351 .flat_map(|peer| {
352 peer.connections
353 .keys()
354 .copied()
355 .map(|conn| (peer.id(), conn))
356 })
357 }
358
359 fn recount_peer_tracker_info(&self) {
360 self.info_tx.send_if_modified(|info| {
361 let mut new_info = PeerTrackerInfo::default();
362
363 for peer in self.peers.values() {
364 if peer.is_connected() {
365 new_info.num_connected_peers += 1;
366
367 if peer.is_trusted() {
368 new_info.num_connected_trusted_peers += 1;
369 }
370
371 if peer.is_full() {
372 new_info.num_connected_full_nodes += 1;
373 }
374
375 if peer.is_archival() {
376 new_info.num_connected_archival_nodes += 1;
377 }
378 }
379 }
380
381 if *info != new_info {
382 *info = new_info;
383 true
384 } else {
385 false
386 }
387 });
388 }
389
390 pub(crate) fn gc(&mut self) {
391 self.peers.retain(|_, peer| {
392 peer.is_connected()
398 || peer.is_protected()
399 || peer
400 .disconnected_at
401 .is_none_or(|tm| tm.elapsed() <= EXPIRED_AFTER)
402 });
403 }
404}
405
#[cfg(test)]
mod tests {
    use crate::events::EventChannel;

    use super::*;

    /// Asserts that the watcher has an unseen update, marks it as seen and
    /// returns the counters it carries.
    #[track_caller]
    fn expect_update(watcher: &mut watch::Receiver<PeerTrackerInfo>) -> PeerTrackerInfo {
        assert!(watcher.has_changed().unwrap());
        watcher.borrow_and_update().to_owned()
    }

    /// Asserts that the watcher has no unseen update.
    #[track_caller]
    fn expect_no_update(watcher: &watch::Receiver<PeerTrackerInfo>) {
        assert!(!watcher.has_changed().unwrap());
    }

    /// Shorthand for building the expected counters.
    fn counters(peers: u64, trusted: u64, full: u64, archival: u64) -> PeerTrackerInfo {
        PeerTrackerInfo {
            num_connected_peers: peers,
            num_connected_trusted_peers: trusted,
            num_connected_full_nodes: full,
            num_connected_archival_nodes: archival,
        }
    }

    #[test]
    fn trust_before_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();

        expect_no_update(&watcher);

        // Trusting a disconnected peer does not affect the counters.
        tracker.set_trusted(&peer_id, true);
        expect_no_update(&watcher);

        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);
    }

    #[test]
    fn trust_after_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();

        expect_no_update(&watcher);

        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 0);

        // Trusting a connected peer is reflected immediately.
        tracker.set_trusted(&peer_id, true);
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);
    }

    #[test]
    fn untrust_after_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();

        expect_no_update(&watcher);

        tracker.set_trusted(&peer_id, true);
        expect_no_update(&watcher);

        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);

        // Revoking trust of a connected peer is reflected immediately.
        tracker.set_trusted(&peer_id, false);
        let info = expect_update(&mut watcher);
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 0);
    }

    #[test]
    fn tracker_info() {
        const FULL_NODE_AGENT: &str = "celestia-node/celestia/full/v0.24.1/fb95d45";

        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();
        let peer2_id = PeerId::random();

        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        assert_eq!(expect_update(&mut watcher), counters(1, 0, 0, 0));

        // Only the connected peer contributes to the archival counter.
        tracker.mark_as_archival(&peer_id);
        tracker.mark_as_archival(&peer2_id);
        assert_eq!(expect_update(&mut watcher), counters(1, 0, 0, 1));

        // Marking again changes nothing.
        tracker.mark_as_archival(&peer_id);
        expect_no_update(&watcher);

        tracker.on_agent_version(&peer_id, FULL_NODE_AGENT);
        assert_eq!(expect_update(&mut watcher), counters(1, 0, 1, 1));

        // Receiving the same agent version again changes nothing.
        tracker.on_agent_version(&peer_id, FULL_NODE_AGENT);
        expect_no_update(&watcher);

        // The second peer was marked as archival before it connected.
        tracker.add_connection(&peer2_id, ConnectionId::new_unchecked(2));
        assert_eq!(expect_update(&mut watcher), counters(2, 0, 1, 2));

        // Disconnecting the first peer removes all of its contributions.
        tracker.remove_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert_eq!(expect_update(&mut watcher), counters(1, 0, 0, 1));

        // After reconnecting, the first peer no longer counts as full or
        // archival.
        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(3));
        assert!(tracker.is_connected(&peer_id));
        assert_eq!(expect_update(&mut watcher), counters(2, 0, 0, 1));
    }

    #[test]
    fn protect() {
        let peer_id = PeerId::random();
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());

        // Unknown peer: nothing is protected and nothing can be unprotected.
        assert!(!tracker.is_protected(&peer_id));
        assert!(!tracker.unprotect(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 0);

        // First tag: the peer becomes protected.
        assert!(!tracker.is_protected_with_tag(&peer_id, 0));
        assert!(tracker.protect(&peer_id, 0));
        assert!(tracker.is_protected(&peer_id));
        assert!(tracker.is_protected_with_tag(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 1);

        // Second tag: the peer was already protected.
        assert!(!tracker.is_protected_with_tag(&peer_id, 1));
        assert!(!tracker.protect(&peer_id, 1));
        assert!(tracker.is_protected(&peer_id));
        assert!(tracker.is_protected_with_tag(&peer_id, 1));
        assert_eq!(tracker.protected_len(1), 1);

        // Re-applying a tag does not count it twice.
        assert!(!tracker.protect(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 1);

        // A different peer with the same tag is counted separately.
        assert!(tracker.protect(&PeerId::random(), 0));
        assert_eq!(tracker.protected_len(0), 2);

        // Removing one of two tags keeps the peer protected.
        assert!(!tracker.unprotect(&peer_id, 0));
        assert!(!tracker.is_protected_with_tag(&peer_id, 0));
        assert!(tracker.is_protected(&peer_id));
        assert_eq!(tracker.protected_len(0), 1);

        // Removing the last tag makes the peer unprotected.
        assert!(tracker.unprotect(&peer_id, 1));
        assert!(!tracker.is_protected_with_tag(&peer_id, 1));
        assert!(!tracker.is_protected(&peer_id));
        assert_eq!(tracker.protected_len(1), 0);
    }

    #[test]
    fn node_kind() {
        let cases = [
            ("lumina/celestia/0.14.0", NodeKind::Light),
            (
                "celestia-node/celestia/bridge/v0.24.1/fb95d45",
                NodeKind::Bridge,
            ),
            (
                "celestia-node/celestia/full/v0.24.1/fb95d45",
                NodeKind::Full,
            ),
            (
                "celestia-node/celestia/light/v0.24.1/fb95d45",
                NodeKind::Light,
            ),
            ("probelab-node/celestia/ant/v0.1.0", NodeKind::Unknown),
        ];

        for (agent_version, expected) in cases {
            assert_eq!(NodeKind::from_agent_version(agent_version), expected);
        }
    }
}