lumina_node/
peer_tracker.rs

1//! Primitives related to tracking the state of peers in the network.
2
3use std::collections::hash_map::Entry;
4use std::collections::{HashMap, HashSet};
5use std::time::Duration;
6
7use libp2p::ping;
8use libp2p::{PeerId, swarm::ConnectionId};
9use lumina_utils::time::Instant;
10use serde::{Deserialize, Serialize};
11use tokio::sync::watch;
12use tracing::info;
13
14use crate::events::{EventPublisher, NodeEvent};
15
/// Interval at which the garbage collector of the peer tracker should be called.
pub(crate) const GC_INTERVAL: Duration = Duration::from_secs(30);
/// How long a `Peer` has to stay disconnected before it is considered expired.
const EXPIRED_AFTER: Duration = Duration::from_secs(2 * 60);
20
/// Keeps track of various information about peers.
#[derive(Debug)]
pub(crate) struct PeerTracker {
    /// All known peers, connected or not, keyed by their ID.
    peers: HashMap<PeerId, Peer>,
    /// Number of peers that currently hold each protection tag.
    protect_counter: HashMap<u32, usize>,
    /// Sender side of the watch channel carrying the latest [`PeerTrackerInfo`].
    info_tx: watch::Sender<PeerTrackerInfo>,
    /// Publisher used for emitting peer connected / disconnected events.
    event_pub: EventPublisher,
}
29
/// Statistics of the connected peers.
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct PeerTrackerInfo {
    /// Number of the connected peers.
    pub num_connected_peers: u64,
    /// Number of the connected trusted peers.
    pub num_connected_trusted_peers: u64,
    /// Number of the connected full nodes.
    ///
    /// Bridge nodes are counted here as well.
    // This is used by `SwarmManager` in order to trigger `peer_health_check`.
    pub num_connected_full_nodes: u64,
    /// Number of the connected archival nodes.
    // This is used by `SwarmManager` in order to trigger `peer_health_check`.
    pub num_connected_archival_nodes: u64,
}
45
/// State tracked for a single peer.
#[derive(Debug)]
pub(crate) struct Peer {
    /// ID of the peer.
    id: PeerId,
    /// Currently tracked connections of the peer, keyed by connection ID.
    connections: HashMap<ConnectionId, ConnectionInfo>,
    /// Tags under which the peer is protected. Empty means unprotected.
    protected: HashSet<u32>,
    /// Whether the peer is marked as trusted.
    trusted: bool,
    /// Whether the peer is marked as archival.
    ///
    /// Cleared when the last connection of the peer is removed.
    archival: bool,
    /// Kind of the node, derived from its agent version.
    ///
    /// Reset to `NodeKind::Unknown` when the last connection of the peer is removed.
    node_kind: NodeKind,
    /// Time at which the peer became disconnected, `None` while it is connected.
    disconnected_at: Option<Instant>,
}
56
/// State tracked for a single connection of a peer.
#[derive(Debug, Default)]
struct ConnectionInfo {
    /// Duration carried by the most recent ping event of this connection.
    ///
    /// `None` if no ping event was handled yet or if the most recent one failed.
    ping: Option<Duration>,
}
61
/// Kind of a node, as advertised through its agent version.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) enum NodeKind {
    /// The kind could not be determined.
    #[default]
    Unknown,
    /// Bridge node.
    Bridge,
    /// Full node.
    Full,
    /// Light node.
    Light,
}

impl NodeKind {
    /// Derives the node kind from a `/`-separated agent version string.
    ///
    /// * Anything whose first segment is `lumina` is a light node.
    /// * For `celestia-node`, the third segment (`bridge`, `full` or `light`)
    ///   selects the kind.
    /// * Everything else is [`NodeKind::Unknown`].
    fn from_agent_version(s: &str) -> NodeKind {
        let mut segments = s.split('/');

        // First segment names the implementation, the third one carries the
        // node type (the second segment is skipped).
        let implementation = segments.next();
        let node_type = segments.nth(1);

        match (implementation, node_type) {
            (Some("lumina"), _) => NodeKind::Light,
            (Some("celestia-node"), Some("bridge")) => NodeKind::Bridge,
            (Some("celestia-node"), Some("full")) => NodeKind::Full,
            (Some("celestia-node"), Some("light")) => NodeKind::Light,
            _ => NodeKind::Unknown,
        }
    }

    /// Returns `true` for full and bridge nodes.
    pub(crate) fn is_full(&self) -> bool {
        match self {
            NodeKind::Full | NodeKind::Bridge => true,
            NodeKind::Unknown | NodeKind::Light => false,
        }
    }
}
91
92impl Peer {
93    fn new(id: PeerId) -> Self {
94        Peer {
95            id,
96            connections: HashMap::new(),
97            protected: HashSet::new(),
98            trusted: false,
99            archival: false,
100            node_kind: NodeKind::Unknown,
101            // We start as disconnected
102            disconnected_at: Some(Instant::now()),
103        }
104    }
105
106    pub(crate) fn id(&self) -> &PeerId {
107        &self.id
108    }
109
110    pub(crate) fn is_connected(&self) -> bool {
111        !self.connections.is_empty()
112    }
113
114    pub(crate) fn is_trusted(&self) -> bool {
115        self.trusted
116    }
117
118    pub(crate) fn is_protected(&self) -> bool {
119        !self.protected.is_empty()
120    }
121
122    pub(crate) fn is_protected_with_tag(&self, tag: u32) -> bool {
123        self.protected.contains(&tag)
124    }
125
126    pub(crate) fn is_archival(&self) -> bool {
127        self.archival
128    }
129
130    pub(crate) fn is_full(&self) -> bool {
131        self.node_kind.is_full()
132    }
133
134    #[allow(dead_code)]
135    pub(crate) fn node_kind(&self) -> NodeKind {
136        self.node_kind
137    }
138
139    pub(crate) fn best_ping(&self) -> Option<Duration> {
140        self.connections
141            .iter()
142            .flat_map(|(_, conn_info)| conn_info.ping)
143            .min()
144    }
145}
146
147impl PeerTracker {
148    /// Constructs an empty PeerTracker.
149    pub(crate) fn new(event_pub: EventPublisher) -> Self {
150        PeerTracker {
151            peers: HashMap::new(),
152            protect_counter: HashMap::new(),
153            info_tx: watch::channel(PeerTrackerInfo::default()).0,
154            event_pub,
155        }
156    }
157
158    /// Returns the current [`PeerTrackerInfo`].
159    pub(crate) fn info(&self) -> PeerTrackerInfo {
160        self.info_tx.borrow().to_owned()
161    }
162
163    /// Returns a watcher for any [`PeerTrackerInfo`] changes.
164    pub(crate) fn info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
165        self.info_tx.subscribe()
166    }
167
168    pub(crate) fn peer(&self, peer_id: &PeerId) -> Option<&Peer> {
169        self.peers.get(peer_id)
170    }
171
172    pub(crate) fn peers(&self) -> impl Iterator<Item = &Peer> {
173        self.peers.values()
174    }
175
176    pub(crate) fn is_connected(&self, peer_id: &PeerId) -> bool {
177        self.peer(peer_id).is_some_and(|p| p.is_connected())
178    }
179
180    pub(crate) fn is_protected(&self, peer_id: &PeerId) -> bool {
181        self.peer(peer_id).is_some_and(|p| p.is_protected())
182    }
183
184    #[allow(dead_code)]
185    pub(crate) fn is_protected_with_tag(&self, peer_id: &PeerId, tag: u32) -> bool {
186        self.peer(peer_id)
187            .is_some_and(|p| p.is_protected_with_tag(tag))
188    }
189
190    /// Adds a peer ID.
191    ///
192    /// Returns `true` if peer was not known from before.
193    pub(crate) fn add_peer_id(&mut self, peer_id: &PeerId) -> bool {
194        match self.peers.entry(*peer_id) {
195            Entry::Vacant(entry) => {
196                entry.insert(Peer::new(*peer_id));
197                true
198            }
199            Entry::Occupied(_) => false,
200        }
201    }
202
203    /// Sets peer as trusted.
204    pub(crate) fn set_trusted(&mut self, peer_id: &PeerId, is_trusted: bool) {
205        let peer = self
206            .peers
207            .entry(*peer_id)
208            .or_insert_with(|| Peer::new(*peer_id));
209
210        peer.trusted = is_trusted;
211        self.recount_peer_tracker_info();
212    }
213
214    /// Add protect flag to the peer.
215    ///
216    /// Tag allows having different reasons for protection without interfering with one another.
217    ///
218    /// Returns `true` if the peer changes state from unprotected to protected.
219    pub(crate) fn protect(&mut self, peer_id: &PeerId, tag: u32) -> bool {
220        let peer = self
221            .peers
222            .entry(*peer_id)
223            .or_insert_with(|| Peer::new(*peer_id));
224        let was_protected = peer.is_protected();
225
226        if peer.protected.insert(tag) {
227            *self.protect_counter.entry(tag).or_default() += 1;
228            info!("Protect peer {peer_id} with {tag} tag");
229        }
230
231        !was_protected
232    }
233
234    /// Remove protect flag from the peer.
235    ///
236    /// Tag allows having different reasons for protection without interfering with one another.
237    ///
238    /// Returns `true` if the peer changes state from protected to unprotected.
239    pub(crate) fn unprotect(&mut self, peer_id: &PeerId, tag: u32) -> bool {
240        let Some(peer) = self.peers.get_mut(peer_id) else {
241            return false;
242        };
243
244        let was_protected = peer.is_protected();
245
246        if peer.protected.remove(&tag) {
247            *self
248                .protect_counter
249                .get_mut(&tag)
250                .expect("protected flag was set but not counted") -= 1;
251
252            info!("Unprotect peer {peer_id} with {tag} tag");
253        }
254
255        // Return true if `protected` state changed
256        was_protected && !peer.is_protected()
257    }
258
259    pub(crate) fn protected_len(&self, tag: u32) -> usize {
260        self.protect_counter.get(&tag).copied().unwrap_or(0)
261    }
262
263    /// Add an active connection of a peer.
264    pub(crate) fn add_connection(&mut self, peer_id: &PeerId, connection_id: ConnectionId) {
265        let peer = self
266            .peers
267            .entry(*peer_id)
268            .or_insert_with(|| Peer::new(*peer_id));
269        let prev_connected = peer.is_connected();
270
271        peer.connections
272            .insert(connection_id, ConnectionInfo::default());
273
274        // If peer was not already connected from before
275        if !prev_connected {
276            let trusted = peer.trusted;
277            peer.disconnected_at.take();
278            self.recount_peer_tracker_info();
279
280            self.event_pub.send(NodeEvent::PeerConnected {
281                id: *peer_id,
282                trusted,
283            });
284        }
285    }
286
287    /// Remove a connection from the peer.
288    pub(crate) fn remove_connection(&mut self, peer_id: &PeerId, connection_id: ConnectionId) {
289        let Some(peer) = self.peers.get_mut(peer_id) else {
290            return;
291        };
292
293        peer.connections.retain(|id, _| *id != connection_id);
294
295        // If this is the last connection from the peer.
296        if !peer.is_connected() {
297            let trusted = peer.trusted;
298            peer.node_kind = NodeKind::Unknown;
299            peer.archival = false;
300            peer.disconnected_at = Some(Instant::now());
301            self.recount_peer_tracker_info();
302
303            self.event_pub.send(NodeEvent::PeerDisconnected {
304                id: peer_id.to_owned(),
305                trusted,
306            });
307        }
308    }
309
310    pub(crate) fn on_agent_version(&mut self, peer_id: &PeerId, agent_version: &str) {
311        if let Some(peer) = self.peers.get_mut(peer_id)
312            && peer.is_connected()
313        {
314            peer.node_kind = NodeKind::from_agent_version(agent_version);
315            self.recount_peer_tracker_info();
316        }
317    }
318
319    pub(crate) fn on_ping_event(&mut self, ev: &ping::Event) {
320        if let Some(peer) = self.peers.get_mut(&ev.peer)
321            && let Some(conn_info) = peer.connections.get_mut(&ev.connection)
322        {
323            conn_info.ping = ev.result.as_ref().ok().copied();
324        }
325    }
326
327    pub(crate) fn mark_as_archival(&mut self, peer_id: &PeerId) {
328        let peer = self
329            .peers
330            .entry(*peer_id)
331            .or_insert_with(|| Peer::new(*peer_id));
332
333        peer.archival = true;
334        self.recount_peer_tracker_info();
335    }
336
337    pub(crate) fn connections(
338        &self,
339        peer_id: &PeerId,
340    ) -> impl Iterator<Item = ConnectionId> + use<'_> {
341        self.peer(peer_id)
342            .map(|peer| peer.connections.keys().copied())
343            .into_iter()
344            .flatten()
345    }
346
347    /// Returns all connections.
348    pub(crate) fn all_connections(&self) -> impl Iterator<Item = (&PeerId, ConnectionId)> {
349        self.peers()
350            .filter(|peer| peer.is_connected())
351            .flat_map(|peer| {
352                peer.connections
353                    .keys()
354                    .copied()
355                    .map(|conn| (peer.id(), conn))
356            })
357    }
358
359    fn recount_peer_tracker_info(&self) {
360        self.info_tx.send_if_modified(|info| {
361            let mut new_info = PeerTrackerInfo::default();
362
363            for peer in self.peers.values() {
364                if peer.is_connected() {
365                    new_info.num_connected_peers += 1;
366
367                    if peer.is_trusted() {
368                        new_info.num_connected_trusted_peers += 1;
369                    }
370
371                    if peer.is_full() {
372                        new_info.num_connected_full_nodes += 1;
373                    }
374
375                    if peer.is_archival() {
376                        new_info.num_connected_archival_nodes += 1;
377                    }
378                }
379            }
380
381            if *info != new_info {
382                *info = new_info;
383                true
384            } else {
385                false
386            }
387        });
388    }
389
390    pub(crate) fn gc(&mut self) {
391        self.peers.retain(|_, peer| {
392            // We keep:
393            //
394            // * Connected peers
395            // * Protected peers
396            // * Recently disconnected peers
397            peer.is_connected()
398                || peer.is_protected()
399                || peer
400                    .disconnected_at
401                    .is_none_or(|tm| tm.elapsed() <= EXPIRED_AFTER)
402        });
403    }
404}
405
406#[cfg(test)]
407mod tests {
408    use crate::events::EventChannel;
409
410    use super::*;
411
412    #[test]
413    fn trust_before_connect() {
414        let event_channel = EventChannel::new();
415        let mut tracker = PeerTracker::new(event_channel.publisher());
416        let mut watcher = tracker.info_watcher();
417        let peer_id = PeerId::random();
418
419        assert!(!watcher.has_changed().unwrap());
420
421        tracker.set_trusted(&peer_id, true);
422        assert!(!watcher.has_changed().unwrap());
423
424        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
425        assert!(tracker.is_connected(&peer_id));
426        assert!(watcher.has_changed().unwrap());
427        let info = watcher.borrow_and_update().to_owned();
428        assert_eq!(info.num_connected_peers, 1);
429        assert_eq!(info.num_connected_trusted_peers, 1);
430    }
431
432    #[test]
433    fn trust_after_connect() {
434        let event_channel = EventChannel::new();
435        let mut tracker = PeerTracker::new(event_channel.publisher());
436        let mut watcher = tracker.info_watcher();
437        let peer_id = PeerId::random();
438
439        assert!(!watcher.has_changed().unwrap());
440
441        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
442        assert!(tracker.is_connected(&peer_id));
443        assert!(watcher.has_changed().unwrap());
444        let info = watcher.borrow_and_update().to_owned();
445        assert_eq!(info.num_connected_peers, 1);
446        assert_eq!(info.num_connected_trusted_peers, 0);
447
448        tracker.set_trusted(&peer_id, true);
449        assert!(watcher.has_changed().unwrap());
450        let info = watcher.borrow_and_update().to_owned();
451        assert_eq!(info.num_connected_peers, 1);
452        assert_eq!(info.num_connected_trusted_peers, 1);
453    }
454
455    #[test]
456    fn untrust_after_connect() {
457        let event_channel = EventChannel::new();
458        let mut tracker = PeerTracker::new(event_channel.publisher());
459        let mut watcher = tracker.info_watcher();
460        let peer_id = PeerId::random();
461
462        assert!(!watcher.has_changed().unwrap());
463
464        tracker.set_trusted(&peer_id, true);
465        assert!(!watcher.has_changed().unwrap());
466
467        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
468        assert!(tracker.is_connected(&peer_id));
469        assert!(watcher.has_changed().unwrap());
470        let info = watcher.borrow_and_update().to_owned();
471        assert_eq!(info.num_connected_peers, 1);
472        assert_eq!(info.num_connected_trusted_peers, 1);
473
474        tracker.set_trusted(&peer_id, false);
475        assert!(watcher.has_changed().unwrap());
476        let info = watcher.borrow_and_update().to_owned();
477        assert_eq!(info.num_connected_peers, 1);
478        assert_eq!(info.num_connected_trusted_peers, 0);
479    }
480
    /// Walks two peers through connect, archival marking, agent version
    /// updates, disconnect and reconnect, and checks the published
    /// `PeerTrackerInfo` after every step.
    #[test]
    fn tracker_info() {
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());
        let mut watcher = tracker.info_watcher();
        let peer_id = PeerId::random();
        let peer2_id = PeerId::random();

        // First peer connects, nothing else is known about it yet.
        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(tracker.is_connected(&peer_id));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 1,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 0,
                num_connected_archival_nodes: 0,
            }
        );

        // Both peers are marked as archival, but only the connected one is
        // counted.
        tracker.mark_as_archival(&peer_id);
        tracker.mark_as_archival(&peer2_id);
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 1,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 0,
                num_connected_archival_nodes: 1,
            }
        );

        // Marking again changes nothing, so watchers are not notified.
        tracker.mark_as_archival(&peer_id);
        assert!(!watcher.has_changed().unwrap());

        // The agent version reveals that the first peer is a full node.
        tracker.on_agent_version(&peer_id, "celestia-node/celestia/full/v0.24.1/fb95d45");
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 1,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 1,
                num_connected_archival_nodes: 1,
            }
        );

        // Receiving the same agent version again changes nothing.
        tracker.on_agent_version(&peer_id, "celestia-node/celestia/full/v0.24.1/fb95d45");
        assert!(!watcher.has_changed().unwrap());

        // peer2_id connected, check that previous `mark_as_archival` is
        // propagated in `PeerTrackerInfo`.
        tracker.add_connection(&peer2_id, ConnectionId::new_unchecked(2));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 2,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 1,
                num_connected_archival_nodes: 2,
            }
        );

        // Peer gets disconnected
        tracker.remove_connection(&peer_id, ConnectionId::new_unchecked(1));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 1,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 0,
                num_connected_archival_nodes: 1,
            }
        );

        // Peer gets reconnected. Its node kind and archival flag were reset
        // on disconnect, so it counts only as a plain connected peer.
        tracker.add_connection(&peer_id, ConnectionId::new_unchecked(3));
        assert!(tracker.is_connected(&peer_id));
        assert!(watcher.has_changed().unwrap());
        let info = watcher.borrow_and_update().to_owned();
        assert_eq!(
            info,
            PeerTrackerInfo {
                num_connected_peers: 2,
                num_connected_trusted_peers: 0,
                num_connected_full_nodes: 0,
                num_connected_archival_nodes: 1,
            }
        );
    }
580
    /// Checks the return values of `protect` / `unprotect`, the per-peer
    /// protection state and the per-tag counters while tags are added and
    /// removed.
    #[test]
    fn protect() {
        let peer_id = PeerId::random();
        let event_channel = EventChannel::new();
        let mut tracker = PeerTracker::new(event_channel.publisher());

        // Unknown peers are always unprotected, so state doesn't change
        assert!(!tracker.is_protected(&peer_id));
        assert!(!tracker.unprotect(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 0);

        // Now the state changes from unprotected to protected
        assert!(!tracker.is_protected_with_tag(&peer_id, 0));
        assert!(tracker.protect(&peer_id, 0));
        assert!(tracker.is_protected(&peer_id));
        assert!(tracker.is_protected_with_tag(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 1);
        // Adding more tags doesn't change the state
        assert!(!tracker.is_protected_with_tag(&peer_id, 1));
        assert!(!tracker.protect(&peer_id, 1));
        assert!(tracker.is_protected(&peer_id));
        assert!(tracker.is_protected_with_tag(&peer_id, 1));
        assert_eq!(tracker.protected_len(1), 1);

        // Adding an existing tag to a peer doesn't change the counter
        assert!(!tracker.protect(&peer_id, 0));
        assert_eq!(tracker.protected_len(0), 1);
        // Adding a tag to a peer must increase the counter
        assert!(tracker.protect(&PeerId::random(), 0));
        assert_eq!(tracker.protected_len(0), 2);

        // Removing only some of the tags doesn't change the state
        assert!(!tracker.unprotect(&peer_id, 0));
        assert!(!tracker.is_protected_with_tag(&peer_id, 0));
        assert!(tracker.is_protected(&peer_id));
        assert_eq!(tracker.protected_len(0), 1);
        // Removing all tags, changes the state from protected to unprotected
        assert!(tracker.unprotect(&peer_id, 1));
        assert!(!tracker.is_protected_with_tag(&peer_id, 1));
        assert!(!tracker.is_protected(&peer_id));
        assert_eq!(tracker.protected_len(1), 0);
    }
623
624    #[test]
625    fn node_kind() {
626        assert_eq!(
627            NodeKind::from_agent_version("lumina/celestia/0.14.0"),
628            NodeKind::Light
629        );
630
631        assert_eq!(
632            NodeKind::from_agent_version("celestia-node/celestia/bridge/v0.24.1/fb95d45"),
633            NodeKind::Bridge
634        );
635
636        assert_eq!(
637            NodeKind::from_agent_version("celestia-node/celestia/full/v0.24.1/fb95d45"),
638            NodeKind::Full
639        );
640
641        assert_eq!(
642            NodeKind::from_agent_version("celestia-node/celestia/light/v0.24.1/fb95d45"),
643            NodeKind::Light
644        );
645
646        assert_eq!(
647            NodeKind::from_agent_version("probelab-node/celestia/ant/v0.1.0"),
648            NodeKind::Unknown
649        );
650    }
651}