lumina_node/
peer_tracker.rs

1//! Primitives related to tracking the state of peers in the network.
2
3use std::collections::hash_map::Entry;
4use std::collections::{HashMap, HashSet};
5use std::time::Duration;
6
7use libp2p::ping;
8use libp2p::{PeerId, swarm::ConnectionId};
9use lumina_utils::time::Instant;
10use rand::seq::SliceRandom;
11use serde::{Deserialize, Serialize};
12use smallvec::SmallVec;
13use tokio::sync::watch;
14use tracing::info;
15
16use crate::events::{EventPublisher, NodeEvent};
17
/// How often the garbage collector should be called.
pub(crate) const GC_INTERVAL: Duration = Duration::from_secs(30);
/// How long a `Peer` needs to stay disconnected before it expires.
const EXPIRED_AFTER: Duration = Duration::from_secs(120);
22
/// Keeps track of various information about peers.
#[derive(Debug)]
pub(crate) struct PeerTracker {
    /// All known peers (connected or not), keyed by their ID.
    peers: HashMap<PeerId, Peer>,
    /// Number of peers currently protected with each tag.
    protect_counter: HashMap<u32, usize>,
    /// Sender side of the watch channel that publishes [`PeerTrackerInfo`] updates.
    info_tx: watch::Sender<PeerTrackerInfo>,
    /// Publisher used to emit peer connected/disconnected [`NodeEvent`]s.
    event_pub: EventPublisher,
}
31
/// Statistics of the connected peers.
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct PeerTrackerInfo {
    /// Number of the connected peers.
    pub num_connected_peers: u64,
    /// Number of the connected trusted peers.
    pub num_connected_trusted_peers: u64,
    /// Number of the connected full nodes (bridge nodes are counted as full nodes too).
    // This is used by `SwarmManager` in order to trigger `peer_health_check`.
    pub num_connected_full_nodes: u64,
    /// Number of the connected archival nodes.
    // This is used by `SwarmManager` in order to trigger `peer_health_check`.
    pub num_connected_archival_nodes: u64,
}
47
/// State tracked for a single peer.
#[derive(Debug)]
pub(crate) struct Peer {
    /// ID of the peer.
    id: PeerId,
    /// Active connections of the peer, keyed by connection ID.
    connections: HashMap<ConnectionId, ConnectionInfo>,
    /// Tags that currently protect this peer; the peer is protected while non-empty.
    protected: HashSet<u32>,
    /// Whether the peer was marked as trusted.
    trusted: bool,
    /// Whether the peer was marked as archival. Reset when the last connection is removed.
    archival: bool,
    /// Node kind derived from the agent version. Reset to `Unknown` when the last
    /// connection is removed.
    node_kind: NodeKind,
    /// When the peer became disconnected. `None` once a connection has been added.
    disconnected_at: Option<Instant>,
}
58
/// Per-connection information.
#[derive(Debug, Default)]
struct ConnectionInfo {
    /// Result of the latest ping event on this connection; `None` if there was no
    /// ping event yet or the latest one reported a failure.
    ping: Option<Duration>,
}
63
/// Kind of a node, as derived from its agent version string.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) enum NodeKind {
    /// Agent version was not recognized (or not received yet).
    #[default]
    Unknown,
    /// `celestia-node` running as a bridge node.
    Bridge,
    /// `celestia-node` running as a full node.
    Full,
    /// A light node: either `lumina` or `celestia-node` running as a light node.
    Light,
}

impl NodeKind {
    /// Derives the node kind from a `/`-separated agent version string.
    ///
    /// The first segment identifies the client. For `celestia-node` the third
    /// segment selects the kind (`bridge`, `full` or `light`). Anything that is
    /// not recognized yields [`NodeKind::Unknown`].
    fn from_agent_version(s: &str) -> NodeKind {
        let mut segments = s.split('/');
        let client = segments.next();
        // `nth(1)` skips the segment right after the client name and yields the third one.
        let role = segments.nth(1);

        match (client, role) {
            (Some("lumina"), _) => NodeKind::Light,
            (Some("celestia-node"), Some("bridge")) => NodeKind::Bridge,
            (Some("celestia-node"), Some("full")) => NodeKind::Full,
            (Some("celestia-node"), Some("light")) => NodeKind::Light,
            _ => NodeKind::Unknown,
        }
    }

    /// Returns `true` for full and bridge nodes.
    pub(crate) fn is_full(&self) -> bool {
        match self {
            NodeKind::Full | NodeKind::Bridge => true,
            NodeKind::Unknown | NodeKind::Light => false,
        }
    }
}
93
94impl Peer {
95    fn new(id: PeerId) -> Self {
96        Peer {
97            id,
98            connections: HashMap::new(),
99            protected: HashSet::new(),
100            trusted: false,
101            archival: false,
102            node_kind: NodeKind::Unknown,
103            // We start as disconnected
104            disconnected_at: Some(Instant::now()),
105        }
106    }
107
108    pub(crate) fn id(&self) -> &PeerId {
109        &self.id
110    }
111
112    pub(crate) fn is_connected(&self) -> bool {
113        !self.connections.is_empty()
114    }
115
116    pub(crate) fn is_trusted(&self) -> bool {
117        self.trusted
118    }
119
120    pub(crate) fn is_protected(&self) -> bool {
121        !self.protected.is_empty()
122    }
123
124    pub(crate) fn is_protected_with_tag(&self, tag: u32) -> bool {
125        self.protected.contains(&tag)
126    }
127
128    pub(crate) fn is_archival(&self) -> bool {
129        self.archival
130    }
131
132    pub(crate) fn is_full(&self) -> bool {
133        self.node_kind.is_full()
134    }
135
136    #[allow(dead_code)]
137    pub(crate) fn node_kind(&self) -> NodeKind {
138        self.node_kind
139    }
140
141    pub(crate) fn best_ping(&self) -> Option<Duration> {
142        self.connections
143            .iter()
144            .flat_map(|(_, conn_info)| conn_info.ping)
145            .min()
146    }
147}
148
149impl PeerTracker {
150    /// Constructs an empty PeerTracker.
151    pub(crate) fn new(event_pub: EventPublisher) -> Self {
152        PeerTracker {
153            peers: HashMap::new(),
154            protect_counter: HashMap::new(),
155            info_tx: watch::channel(PeerTrackerInfo::default()).0,
156            event_pub,
157        }
158    }
159
160    /// Returns the current [`PeerTrackerInfo`].
161    pub(crate) fn info(&self) -> PeerTrackerInfo {
162        self.info_tx.borrow().to_owned()
163    }
164
165    /// Returns a watcher for any [`PeerTrackerInfo`] changes.
166    pub(crate) fn info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
167        self.info_tx.subscribe()
168    }
169
170    pub(crate) fn peer(&self, peer_id: &PeerId) -> Option<&Peer> {
171        self.peers.get(peer_id)
172    }
173
174    pub(crate) fn peers(&self) -> impl Iterator<Item = &Peer> {
175        self.peers.values()
176    }
177
178    pub(crate) fn is_connected(&self, peer_id: &PeerId) -> bool {
179        self.peer(peer_id).is_some_and(|p| p.is_connected())
180    }
181
182    pub(crate) fn is_protected(&self, peer_id: &PeerId) -> bool {
183        self.peer(peer_id).is_some_and(|p| p.is_protected())
184    }
185
186    #[allow(dead_code)]
187    pub(crate) fn is_protected_with_tag(&self, peer_id: &PeerId, tag: u32) -> bool {
188        self.peer(peer_id)
189            .is_some_and(|p| p.is_protected_with_tag(tag))
190    }
191
192    /// Adds a peer ID.
193    ///
194    /// Returns `true` if peer was not known from before.
195    pub(crate) fn add_peer_id(&mut self, peer_id: &PeerId) -> bool {
196        match self.peers.entry(*peer_id) {
197            Entry::Vacant(entry) => {
198                entry.insert(Peer::new(*peer_id));
199                true
200            }
201            Entry::Occupied(_) => false,
202        }
203    }
204
205    /// Sets peer as trusted.
206    pub(crate) fn set_trusted(&mut self, peer_id: &PeerId, is_trusted: bool) {
207        let peer = self
208            .peers
209            .entry(*peer_id)
210            .or_insert_with(|| Peer::new(*peer_id));
211
212        peer.trusted = is_trusted;
213        self.recount_peer_tracker_info();
214    }
215
216    /// Add protect flag to the peer.
217    ///
218    /// Tag allows having different reasons for protection without interfering with one another.
219    ///
220    /// Returns `true` if peer changes from unprotected state to protected.
221    pub(crate) fn protect(&mut self, peer_id: &PeerId, tag: u32) -> bool {
222        let peer = self
223            .peers
224            .entry(*peer_id)
225            .or_insert_with(|| Peer::new(*peer_id));
226        let was_protected = peer.is_protected();
227
228        if peer.protected.insert(tag) {
229            *self.protect_counter.entry(tag).or_default() += 1;
230            info!("Protect peer {peer_id} with {tag} tag");
231        }
232
233        !was_protected
234    }
235
236    /// Remove protect flag from the peer.
237    ///
238    /// Tag allows having different reasons for protection without interfering with one another.
239    ///
240    /// Returns `true` if peer changes from protected state to unprotected.
241    pub(crate) fn unprotect(&mut self, peer_id: &PeerId, tag: u32) -> bool {
242        let Some(peer) = self.peers.get_mut(peer_id) else {
243            return false;
244        };
245
246        let was_protected = peer.is_protected();
247
248        if peer.protected.remove(&tag) {
249            *self
250                .protect_counter
251                .get_mut(&tag)
252                .expect("protected flag was set but not counted") -= 1;
253
254            info!("Unprotect peer {peer_id} with {tag} tag");
255        }
256
257        // Return true if `protected` state changed
258        was_protected && !peer.is_protected()
259    }
260
261    pub(crate) fn protected_len(&self, tag: u32) -> usize {
262        self.protect_counter.get(&tag).copied().unwrap_or(0)
263    }
264
265    /// Add an active connection of a peer.
266    pub(crate) fn add_connection(&mut self, peer_id: &PeerId, connection_id: ConnectionId) {
267        let peer = self
268            .peers
269            .entry(*peer_id)
270            .or_insert_with(|| Peer::new(*peer_id));
271        let prev_connected = peer.is_connected();
272
273        peer.connections
274            .insert(connection_id, ConnectionInfo::default());
275
276        // If peer was not already connected from before
277        if !prev_connected {
278            let trusted = peer.trusted;
279            peer.disconnected_at.take();
280            self.recount_peer_tracker_info();
281
282            self.event_pub.send(NodeEvent::PeerConnected {
283                id: *peer_id,
284                trusted,
285            });
286        }
287    }
288
289    /// Remove a connection from the peer.
290    pub(crate) fn remove_connection(&mut self, peer_id: &PeerId, connection_id: ConnectionId) {
291        let Some(peer) = self.peers.get_mut(peer_id) else {
292            return;
293        };
294
295        peer.connections.retain(|id, _| *id != connection_id);
296
297        // If this is the last connection from the peer.
298        if !peer.is_connected() {
299            let trusted = peer.trusted;
300            peer.node_kind = NodeKind::Unknown;
301            peer.archival = false;
302            peer.disconnected_at = Some(Instant::now());
303            self.recount_peer_tracker_info();
304
305            self.event_pub.send(NodeEvent::PeerDisconnected {
306                id: peer_id.to_owned(),
307                trusted,
308            });
309        }
310    }
311
312    pub(crate) fn on_agent_version(&mut self, peer_id: &PeerId, agent_version: &str) {
313        if let Some(peer) = self.peers.get_mut(peer_id)
314            && peer.is_connected()
315        {
316            peer.node_kind = NodeKind::from_agent_version(agent_version);
317            self.recount_peer_tracker_info();
318        }
319    }
320
321    pub(crate) fn on_ping_event(&mut self, ev: &ping::Event) {
322        if let Some(peer) = self.peers.get_mut(&ev.peer)
323            && let Some(conn_info) = peer.connections.get_mut(&ev.connection)
324        {
325            conn_info.ping = ev.result.as_ref().ok().copied();
326        }
327    }
328
329    pub(crate) fn mark_as_archival(&mut self, peer_id: &PeerId) {
330        let peer = self
331            .peers
332            .entry(*peer_id)
333            .or_insert_with(|| Peer::new(*peer_id));
334
335        peer.archival = true;
336        self.recount_peer_tracker_info();
337    }
338
339    pub(crate) fn connections(
340        &self,
341        peer_id: &PeerId,
342    ) -> impl Iterator<Item = ConnectionId> + use<'_> {
343        self.peer(peer_id)
344            .map(|peer| peer.connections.keys().copied())
345            .into_iter()
346            .flatten()
347    }
348
349    /// Returns all connections.
350    pub(crate) fn all_connections(&self) -> impl Iterator<Item = (&PeerId, ConnectionId)> {
351        self.peers()
352            .filter(|peer| peer.is_connected())
353            .flat_map(|peer| {
354                peer.connections
355                    .keys()
356                    .copied()
357                    .map(|conn| (peer.id(), conn))
358            })
359    }
360
361    /// Returns one of the best peers.
362    pub(crate) fn best_peer(&self) -> Option<PeerId> {
363        const MAX_PEER_SAMPLE: usize = 128;
364
365        // TODO: Implement peer score and return the best.
366        let mut peers = self
367            .peers
368            .iter()
369            .filter(|(_, peer)| peer.is_connected())
370            .take(MAX_PEER_SAMPLE)
371            .map(|(peer_id, _)| peer_id)
372            .collect::<SmallVec<[_; MAX_PEER_SAMPLE]>>();
373
374        peers.shuffle(&mut rand::thread_rng());
375
376        peers.first().copied().copied()
377    }
378
379    fn recount_peer_tracker_info(&self) {
380        self.info_tx.send_if_modified(|info| {
381            let mut new_info = PeerTrackerInfo::default();
382
383            for peer in self.peers.values() {
384                if peer.is_connected() {
385                    new_info.num_connected_peers += 1;
386
387                    if peer.is_trusted() {
388                        new_info.num_connected_trusted_peers += 1;
389                    }
390
391                    if peer.is_full() {
392                        new_info.num_connected_full_nodes += 1;
393                    }
394
395                    if peer.is_archival() {
396                        new_info.num_connected_archival_nodes += 1;
397                    }
398                }
399            }
400
401            if *info != new_info {
402                *info = new_info;
403                true
404            } else {
405                false
406            }
407        });
408    }
409
410    pub(crate) fn gc(&mut self) {
411        self.peers.retain(|_, peer| {
412            // We keep:
413            //
414            // * Connected peers
415            // * Protected peers
416            // * Recently disconnected peers
417            peer.is_connected()
418                || peer.is_protected()
419                || peer
420                    .disconnected_at
421                    .is_none_or(|tm| tm.elapsed() <= EXPIRED_AFTER)
422        });
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use crate::events::EventChannel;
429
430    use super::*;
431
    /// A peer trusted while disconnected is counted as trusted once it connects.
    #[test]
    fn trust_before_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();

        assert!(!watcher.has_changed().unwrap());

        // Only connected peers are counted, so trusting a disconnected peer
        // must not modify the published info.
        tracker.set_trusted(&peer_id, true);
        assert!(!watcher.has_changed().unwrap());

        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);
    }
451
    /// Trusting an already connected peer updates the trusted counter.
    #[test]
    fn trust_after_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();

        assert!(!watcher.has_changed().unwrap());

        // The peer connects while untrusted.
        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 0);

        // Trusting it afterwards must be published.
        tracker.set_trusted(&peer_id, true);
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);
    }
474
    /// Removing trust from a connected peer updates the trusted counter.
    #[test]
    fn untrust_after_connect() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();

        assert!(!watcher.has_changed().unwrap());

        // Trusting a disconnected peer does not modify the published info.
        tracker.set_trusted(&peer_id, true);
        assert!(!watcher.has_changed().unwrap());

        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 1);

        // The peer stays connected but is no longer counted as trusted.
        tracker.set_trusted(&peer_id, false);
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(info.num_connected_peers, 1);
        assert_eq!(info.num_connected_trusted_peers, 0);
    }
500
    /// Checks that `PeerTrackerInfo` follows connections, archival marks and
    /// agent versions, and that watchers are notified only on actual changes.
    #[test]
    fn tracker_info() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();
        let peer2_id = PeerId::random();

        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 1,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 0,
                num_connected_archival_nodes: 0,
            }
        );

        // Only `peer_id` is connected, so only it is counted as archival.
        tracker.mark_as_archival(&peer_id);
        tracker.mark_as_archival(&peer2_id);
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 1,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 0,
                num_connected_archival_nodes: 1,
            }
        );

        // Marking again changes nothing, so watchers must not be notified.
        tracker.mark_as_archival(&peer_id);
        assert!(!watcher.has_changed().unwrap());

        tracker.on_agent_version(&peer_id, "celestia-node/celestia/full/v0.24.1/fb95d45");
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 1,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 1,
                num_connected_archival_nodes: 1,
            }
        );

        // Same agent version again, no change and no notification.
        tracker.on_agent_version(&peer_id, "celestia-node/celestia/full/v0.24.1/fb95d45");
        assert!(!watcher.has_changed().unwrap());

        // peer2_id connected, check that previous `mark_as_archival` is
        // propagated in `PeerTrackerInfo`.
        tracker.add_connection(&peer2_id, ConnectionId::new_unchecked(2));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 2,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 1,
                num_connected_archival_nodes: 2,
            }
        );

        // Peer gets disconnected
        tracker.remove_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 1,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 0,
                num_connected_archival_nodes: 1,
            }
        );

        // Peer gets reconnected. Its node kind and archival flag were reset on
        // disconnection, so only `peer2_id` is still counted as archival.
        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(3));
        assert!(tracker.is_connected(&peer_id));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 2,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 0,
                num_connected_archival_nodes: 1,
            }
        );
    }
600
    /// Checks the return values of `protect`/`unprotect` and the per-tag counters.
    #[test]
    fn protect() {
        let peer_id = PeerId::random();
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());

        // Unknown peers are always unprotected, so state doesn't change
        assert!(!tracker.is_protected(&peer_id));
        assert!(!tracker.unprotect(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 0);

        // Now state changed from unprotected to protected
        assert!(!tracker.is_protected_with_tag(&peer_id, 0));
        assert!(tracker.protect(&peer_id, 0));
        assert!(tracker.is_protected(&peer_id));
        assert!(tracker.is_protected_with_tag(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 1);
        // Adding more tags doesn't change the state
        assert!(!tracker.is_protected_with_tag(&peer_id, 1));
        assert!(!tracker.protect(&peer_id, 1));
        assert!(tracker.is_protected(&peer_id));
        assert!(tracker.is_protected_with_tag(&peer_id, 1));
        assert_eq!(tracker.protected_len(1), 1);

        // Adding an existing tag to a peer doesn't change the counter
        assert!(!tracker.protect(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 1);
        // Adding a tag to a peer must increase the counter
        assert!(tracker.protect(&PeerId::random(), 0));
        assert_eq!(tracker.protected_len(0), 2);

        // Removing only some of the tags doesn't change the state
        assert!(!tracker.unprotect(&peer_id, 0));
        assert!(!tracker.is_protected_with_tag(&peer_id, 0));
        assert!(tracker.is_protected(&peer_id));
        assert_eq!(tracker.protected_len(0), 1);
        // Removing all tags changes the state from protected to unprotected
        assert!(tracker.unprotect(&peer_id, 1));
        assert!(!tracker.is_protected_with_tag(&peer_id, 1));
        assert!(!tracker.is_protected(&peer_id));
        assert_eq!(tracker.protected_len(1), 0);
    }
643
644    #[test]
645    fn node_kind() {
646        assert_eq!(
647            NodeKind::from_agent_version("lumina/celestia/0.14.0"),
648            NodeKind::Light
649        );
650
651        assert_eq!(
652            NodeKind::from_agent_version("celestia-node/celestia/bridge/v0.24.1/fb95d45"),
653            NodeKind::Bridge
654        );
655
656        assert_eq!(
657            NodeKind::from_agent_version("celestia-node/celestia/full/v0.24.1/fb95d45"),
658            NodeKind::Full
659        );
660
661        assert_eq!(
662            NodeKind::from_agent_version("celestia-node/celestia/light/v0.24.1/fb95d45"),
663            NodeKind::Light
664        );
665
666        assert_eq!(
667            NodeKind::from_agent_version("probelab-node/celestia/ant/v0.1.0"),
668            NodeKind::Unknown
669        );
670    }
671}