tycho_network/overlay/
public_overlay.rs

1use std::borrow::Borrow;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Result;
7use bytes::{Bytes, BytesMut};
8use indexmap::IndexMap;
9use parking_lot::{RwLock, RwLockReadGuard};
10use rand::Rng;
11use tokio::sync::Notify;
12use tycho_util::futures::BoxFutureOrNoop;
13use tycho_util::{FastDashSet, FastHasherState};
14
15use crate::dht::{PeerResolver, PeerResolverHandle};
16use crate::network::Network;
17use crate::overlay::metrics::Metrics;
18use crate::overlay::OverlayId;
19use crate::proto::overlay::{rpc, PublicEntry, PublicEntryToSign};
20use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest};
21use crate::util::NetworkExt;
22
23pub struct PublicOverlayBuilder {
24    overlay_id: OverlayId,
25    min_capacity: usize,
26    entry_ttl: Duration,
27    banned_peer_ids: FastDashSet<PeerId>,
28    peer_resolver: Option<PeerResolver>,
29    name: Option<&'static str>,
30}
31
32impl PublicOverlayBuilder {
33    /// Minimum capacity for public overlay.
34    /// Public overlay will use suggested peers from untrusted sources to fill the overlay
35    /// until it reaches this capacity.
36    ///
37    /// Default: 100.
38    pub fn with_min_capacity(mut self, min_capacity: usize) -> Self {
39        self.min_capacity = min_capacity;
40        self
41    }
42
43    /// Time-to-live for each entry in the overlay.
44    ///
45    /// Default: 1 hour.
46    pub fn with_entry_ttl(mut self, entry_ttl: Duration) -> Self {
47        self.entry_ttl = entry_ttl;
48        self
49    }
50
51    /// Banned peers that will not be ignored by the overlay.
52    pub fn with_banned_peers<I>(mut self, banned_peers: I) -> Self
53    where
54        I: IntoIterator,
55        I::Item: Borrow<PeerId>,
56    {
57        self.banned_peer_ids
58            .extend(banned_peers.into_iter().map(|id| *id.borrow()));
59        self
60    }
61
62    /// Whether to resolve peers with the provided resolver.
63    ///
64    /// Does not resolve peers by default.
65    pub fn with_peer_resolver(mut self, peer_resolver: PeerResolver) -> Self {
66        self.peer_resolver = Some(peer_resolver);
67        self
68    }
69
70    /// Name of the overlay used in metrics.
71    pub fn named(mut self, name: &'static str) -> Self {
72        self.name = Some(name);
73        self
74    }
75
76    pub fn build<S>(self, service: S) -> PublicOverlay
77    where
78        S: Send + Sync + 'static,
79        S: Service<ServiceRequest, QueryResponse = Response>,
80    {
81        let request_prefix = tl_proto::serialize(rpc::Prefix {
82            overlay_id: self.overlay_id.as_bytes(),
83        });
84
85        let entries = PublicOverlayEntries {
86            items: Default::default(),
87        };
88
89        let entry_ttl_sec = self.entry_ttl.as_secs().try_into().unwrap_or(u32::MAX);
90
91        PublicOverlay {
92            inner: Arc::new(Inner {
93                overlay_id: self.overlay_id,
94                min_capacity: self.min_capacity,
95                entry_ttl_sec,
96                peer_resolver: self.peer_resolver,
97                entries: RwLock::new(entries),
98                entries_added: Notify::new(),
99                entries_changed: Notify::new(),
100                entries_removed: Notify::new(),
101                entry_count: AtomicUsize::new(0),
102                banned_peer_ids: self.banned_peer_ids,
103                service: service.boxed(),
104                request_prefix: request_prefix.into_boxed_slice(),
105                metrics: self
106                    .name
107                    .map(|label| Metrics::new("tycho_public_overlay", label))
108                    .unwrap_or_default(),
109            }),
110        }
111    }
112}
113
114#[derive(Clone)]
115#[repr(transparent)]
116pub struct PublicOverlay {
117    inner: Arc<Inner>,
118}
119
120impl PublicOverlay {
121    pub fn builder(overlay_id: OverlayId) -> PublicOverlayBuilder {
122        PublicOverlayBuilder {
123            overlay_id,
124            min_capacity: 100,
125            entry_ttl: Duration::from_secs(3600),
126            banned_peer_ids: Default::default(),
127            peer_resolver: None,
128            name: None,
129        }
130    }
131
132    #[inline]
133    pub fn overlay_id(&self) -> &OverlayId {
134        &self.inner.overlay_id
135    }
136
137    pub fn entry_ttl_sec(&self) -> u32 {
138        self.inner.entry_ttl_sec
139    }
140
141    pub fn peer_resolver(&self) -> &Option<PeerResolver> {
142        &self.inner.peer_resolver
143    }
144
145    pub async fn query(
146        &self,
147        network: &Network,
148        peer_id: &PeerId,
149        mut request: Request,
150    ) -> Result<Response> {
151        self.inner.metrics.record_tx(request.body.len());
152        self.prepend_prefix_to_body(&mut request.body);
153        network.query(peer_id, request).await
154    }
155
156    pub async fn send(
157        &self,
158        network: &Network,
159        peer_id: &PeerId,
160        mut request: Request,
161    ) -> Result<()> {
162        self.inner.metrics.record_tx(request.body.len());
163        self.prepend_prefix_to_body(&mut request.body);
164        network.send(peer_id, request).await
165    }
166
167    /// Bans the given peer from the overlay.
168    ///
169    /// Returns `true` if the peer was not already banned.
170    pub fn ban_peer(&self, peer_id: PeerId) -> bool {
171        self.inner.banned_peer_ids.insert(peer_id)
172    }
173
174    /// Unbans the given peer from the overlay.
175    ///
176    /// Returns `true` if the peer was banned.
177    pub fn unban_peer(&self, peer_id: &PeerId) -> bool {
178        self.inner.banned_peer_ids.remove(peer_id).is_some()
179    }
180
181    pub fn read_entries(&self) -> PublicOverlayEntriesReadGuard<'_> {
182        PublicOverlayEntriesReadGuard {
183            entries: self.inner.entries.read(),
184        }
185    }
186
187    /// Notifies when new entries are added to the overlay.
188    pub fn entires_added(&self) -> &Notify {
189        &self.inner.entries_added
190    }
191
192    /// Notifies when entries are updated in the overlay (added or updated).
193    pub fn entries_changed(&self) -> &Notify {
194        &self.inner.entries_changed
195    }
196
197    pub fn entries_removed(&self) -> &Notify {
198        &self.inner.entries_removed
199    }
200
201    pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
202        self.inner.metrics.record_rx(req.body.len());
203        if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) {
204            // TODO: add peer from metadata to the overlay
205            BoxFutureOrNoop::future(self.inner.service.on_query(req))
206        } else {
207            BoxFutureOrNoop::Noop
208        }
209    }
210
211    pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> {
212        self.inner.metrics.record_rx(req.body.len());
213        if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) {
214            // TODO: add peer from metadata to the overlay
215            BoxFutureOrNoop::future(self.inner.service.on_message(req))
216        } else {
217            BoxFutureOrNoop::Noop
218        }
219    }
220
221    /// Adds the given entries to the overlay.
222    ///
223    /// NOTE: Will deadlock if called while `PublicOverlayEntriesReadGuard` is held.
224    pub(crate) fn add_untrusted_entries(
225        &self,
226        local_id: &PeerId,
227        entries: &[Arc<PublicEntry>],
228        now: u32,
229    ) -> bool {
230        if entries.is_empty() {
231            return false;
232        }
233
234        let this = self.inner.as_ref();
235
236        // Check if we can add more entries to the overlay and optimistically
237        // increase the entry count. (if no other thread has already done so).
238        let to_add = entries.len();
239        let mut entry_count = this.entry_count.load(Ordering::Acquire);
240        let to_add = loop {
241            let to_add = match this.min_capacity.checked_sub(entry_count) {
242                Some(capacity) if capacity > 0 => std::cmp::min(to_add, capacity),
243                _ => return false,
244            };
245
246            let res = this.entry_count.compare_exchange_weak(
247                entry_count,
248                entry_count + to_add,
249                Ordering::Release,
250                Ordering::Acquire,
251            );
252            match res {
253                Ok(_) => break to_add,
254                Err(n) => entry_count = n,
255            }
256        };
257
258        // Prepare validation state
259        let mut is_valid = vec![false; entries.len()];
260        let mut has_valid = false;
261
262        // First pass: verify all entries
263        for (entry, is_valid) in std::iter::zip(entries, is_valid.iter_mut()) {
264            if entry.is_expired(now, this.entry_ttl_sec)
265                || self.inner.banned_peer_ids.contains(&entry.peer_id)
266                || entry.peer_id == local_id
267            {
268                // Skip expired or banned peers early
269                continue;
270            }
271
272            let Some(pubkey) = entry.peer_id.as_public_key() else {
273                // Skip entries with invalid public keys
274                continue;
275            };
276
277            if !pubkey.verify(
278                PublicEntryToSign {
279                    overlay_id: this.overlay_id.as_bytes(),
280                    peer_id: &entry.peer_id,
281                    created_at: entry.created_at,
282                },
283                &entry.signature,
284            ) {
285                // Skip entries with invalid signatures
286                continue;
287            }
288
289            // NOTE: check all entries, even if we have more than `to_add`.
290            // We might need them if some are duplicates af known entries.
291            *is_valid = true;
292            has_valid = true;
293        }
294
295        // Second pass: insert all valid entries (if any)
296        //
297        // NOTE: two passes are necessary because public key parsing and
298        // signature verification can be expensive and we want to avoid
299        // holding the lock for too long.
300        let mut added = 0;
301        let mut changed = false;
302        if has_valid {
303            let mut stored = this.entries.write();
304            for (entry, is_valid) in std::iter::zip(entries, is_valid) {
305                if !is_valid {
306                    continue;
307                }
308
309                let status = stored.insert(&this.peer_resolver, entry);
310                changed |= status.is_changed();
311                added += status.is_added() as usize;
312
313                if added >= to_add {
314                    break;
315                }
316            }
317        }
318
319        // Rollback entries that were not valid and not inserted
320        if added < to_add {
321            this.entry_count
322                .fetch_sub(to_add - added, Ordering::Release);
323        }
324
325        if added > 0 {
326            this.entries_added.notify_waiters();
327        }
328        if changed {
329            this.entries_changed.notify_waiters();
330        }
331
332        changed || added > 0
333    }
334
335    /// Removes all expired and banned entries from the overlay.
336    pub(crate) fn remove_invalid_entries(&self, now: u32) {
337        let this = self.inner.as_ref();
338
339        let mut should_notify = false;
340        let mut entries = this.entries.write();
341        entries.retain(|item| {
342            let retain = !item.entry.is_expired(now, this.entry_ttl_sec)
343                && !this.banned_peer_ids.contains(&item.entry.peer_id);
344            should_notify |= !retain;
345            retain
346        });
347
348        if should_notify {
349            self.inner.entries_removed.notify_waiters();
350        }
351    }
352
353    fn prepend_prefix_to_body(&self, body: &mut Bytes) {
354        let this = self.inner.as_ref();
355
356        // TODO: reduce allocations
357        let mut res = BytesMut::with_capacity(this.request_prefix.len() + body.len());
358        res.extend_from_slice(&this.request_prefix);
359        res.extend_from_slice(body);
360        *body = res.freeze();
361    }
362}
363
364impl std::fmt::Debug for PublicOverlay {
365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366        f.debug_struct("PublicOverlay")
367            .field("overlay_id", &self.inner.overlay_id)
368            .finish()
369    }
370}
371
/// Shared state behind every [`PublicOverlay`] handle.
struct Inner {
    /// Id of the overlay.
    overlay_id: OverlayId,
    /// Untrusted entries are accepted only while `entry_count` is below this value.
    min_capacity: usize,
    /// Entry time-to-live in seconds.
    entry_ttl_sec: u32,
    /// Optional resolver which is given the peer id of every newly inserted entry.
    peer_resolver: Option<PeerResolver>,
    /// Stored entries.
    entries: RwLock<PublicOverlayEntries>,
    /// Number of occupied entry slots. It is increased optimistically before
    /// entries are inserted and rolled back for the slots that were not used.
    entry_count: AtomicUsize,
    /// Notified after new entries were added.
    entries_added: Notify,
    /// Notified after entries were added or updated.
    entries_changed: Notify,
    /// Notified after entries were removed.
    entries_removed: Notify,
    /// Peers whose requests and entries are ignored.
    banned_peer_ids: FastDashSet<PeerId>,
    /// Service which handles incoming queries and messages.
    service: BoxService<ServiceRequest, Response>,
    /// Serialized `rpc::Prefix` which is prepended to every outgoing request body.
    request_prefix: Box<[u8]>,
    /// Rx/tx metrics of this overlay.
    metrics: Metrics,
}
387
388pub struct PublicOverlayEntries {
389    items: OverlayItems,
390}
391
392impl PublicOverlayEntries {
393    /// Returns `true` if the set contains no elements.
394    pub fn is_empty(&self) -> bool {
395        self.items.is_empty()
396    }
397
398    /// Returns the number of elements in the set, also referred to as its 'length'.
399    pub fn len(&self) -> usize {
400        self.items.len()
401    }
402
403    /// Returns true if the set contains the specified peer id.
404    pub fn contains(&self, peer_id: &PeerId) -> bool {
405        self.items.contains_key(peer_id)
406    }
407
408    /// Returns an iterator over the entries.
409    ///
410    /// The order is not random, but is not defined.
411    pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PublicOverlayEntryData> {
412        self.items.values()
413    }
414
415    /// Returns a reference to one random element of the slice,
416    /// or `None` if the slice is empty.
417    pub fn choose<R>(&self, rng: &mut R) -> Option<&PublicOverlayEntryData>
418    where
419        R: Rng + ?Sized,
420    {
421        let index = rng.gen_range(0..self.items.len());
422        let (_, value) = self.items.get_index(index)?;
423        Some(value)
424    }
425
426    /// Chooses `n` entries from the set, without repetition,
427    /// and in random order.
428    pub fn choose_multiple<R>(
429        &self,
430        rng: &mut R,
431        n: usize,
432    ) -> ChooseMultiplePublicOverlayEntries<'_>
433    where
434        R: Rng + ?Sized,
435    {
436        let len = self.items.len();
437        ChooseMultiplePublicOverlayEntries {
438            items: &self.items,
439            indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(),
440        }
441    }
442
443    /// Chooses all entries from the set, without repetition,
444    /// and in random order.
445    pub fn choose_all<R>(&self, rng: &mut R) -> ChooseMultiplePublicOverlayEntries<'_>
446    where
447        R: Rng + ?Sized,
448    {
449        self.choose_multiple(rng, self.items.len())
450    }
451
452    fn insert(&mut self, peer_resolver: &Option<PeerResolver>, item: &PublicEntry) -> UpdateStatus {
453        match self.items.entry(item.peer_id) {
454            // No entry for the peer_id, insert a new one
455            indexmap::map::Entry::Vacant(entry) => {
456                let resolver_handle = peer_resolver.as_ref().map_or_else(
457                    || PeerResolverHandle::new_noop(&item.peer_id),
458                    |resolver| resolver.insert(&item.peer_id, false),
459                );
460
461                entry.insert(PublicOverlayEntryData {
462                    entry: Arc::new(item.clone()),
463                    resolver_handle,
464                });
465
466                UpdateStatus::Added
467            }
468            // Entry for the peer_id exists, update it if the new item is newer
469            indexmap::map::Entry::Occupied(mut entry) => {
470                let existing = entry.get_mut();
471                if existing.entry.created_at >= item.created_at {
472                    return UpdateStatus::Skipped;
473                }
474
475                // Try to reuse the existing Arc if possible
476                match Arc::get_mut(&mut existing.entry) {
477                    Some(existing) => existing.clone_from(item),
478                    None => existing.entry = Arc::new(item.clone()),
479                }
480                UpdateStatus::Updated
481            }
482        }
483    }
484
485    fn retain<F>(&mut self, mut f: F)
486    where
487        F: FnMut(&PublicOverlayEntryData) -> bool,
488    {
489        self.items.retain(|_, item| f(item));
490    }
491}
492
493#[derive(Clone)]
494pub struct PublicOverlayEntryData {
495    pub entry: Arc<PublicEntry>,
496    pub resolver_handle: PeerResolverHandle,
497}
498
499impl PublicOverlayEntryData {
500    pub fn is_expired(&self, now: u32, ttl: u32) -> bool {
501        self.entry.is_expired(now, ttl)
502    }
503
504    pub fn expires_at(&self, ttl: u32) -> u32 {
505        self.entry.created_at.saturating_add(ttl)
506    }
507}
508
509pub struct PublicOverlayEntriesReadGuard<'a> {
510    entries: RwLockReadGuard<'a, PublicOverlayEntries>,
511}
512
513impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> {
514    type Target = PublicOverlayEntries;
515
516    #[inline]
517    fn deref(&self) -> &Self::Target {
518        &self.entries
519    }
520}
521
/// Outcome of inserting an entry into the overlay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateStatus {
    /// Nothing was stored.
    Skipped,
    /// An existing entry was replaced.
    Updated,
    /// A new entry was stored.
    Added,
}

impl UpdateStatus {
    /// Returns `true` if the stored entries were modified in any way.
    fn is_changed(self) -> bool {
        match self {
            Self::Skipped => false,
            Self::Updated | Self::Added => true,
        }
    }

    /// Returns `true` only if a new entry was stored.
    fn is_added(self) -> bool {
        self == Self::Added
    }
}
538
539pub struct ChooseMultiplePublicOverlayEntries<'a> {
540    items: &'a OverlayItems,
541    indices: rand::seq::index::IndexVecIntoIter,
542}
543
544impl<'a> Iterator for ChooseMultiplePublicOverlayEntries<'a> {
545    type Item = &'a PublicOverlayEntryData;
546
547    fn next(&mut self) -> Option<Self::Item> {
548        self.indices.next().and_then(|i| {
549            let (_, value) = self.items.get_index(i)?;
550            Some(value)
551        })
552    }
553
554    fn size_hint(&self) -> (usize, Option<usize>) {
555        (self.indices.len(), Some(self.indices.len()))
556    }
557}
558
559impl ExactSizeIterator for ChooseMultiplePublicOverlayEntries<'_> {
560    fn len(&self) -> usize {
561        self.indices.len()
562    }
563}
564
/// Index map from peer id to its stored entry data, hashed with `FastHasherState`.
type OverlayItems = IndexMap<PeerId, PublicOverlayEntryData, FastHasherState>;
566
#[cfg(test)]
mod tests {
    use everscale_crypto::ed25519;
    use tycho_util::time::now_sec;

    use super::*;

    /// Creates an entry for a fresh random peer with a valid signature for `overlay`.
    fn generate_public_entry(overlay: &PublicOverlay, now: u32) -> Arc<PublicEntry> {
        let keypair = ed25519::KeyPair::generate(&mut rand::thread_rng());
        let peer_id: PeerId = keypair.public_key.into();

        let to_sign = crate::proto::overlay::PublicEntryToSign {
            overlay_id: overlay.overlay_id().as_bytes(),
            peer_id: &peer_id,
            created_at: now,
        };
        let signature = keypair.sign(to_sign);

        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature: Box::new(signature),
        })
    }

    /// Creates an entry for a fresh random peer with an all-zero signature.
    fn generate_invalid_public_entry(now: u32) -> Arc<PublicEntry> {
        let keypair = ed25519::KeyPair::generate(&mut rand::thread_rng());
        let peer_id: PeerId = keypair.public_key.into();

        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature: Box::new([0; 64]),
        })
    }

    /// Creates `n` valid entries for `overlay`.
    fn generate_public_entries(
        overlay: &PublicOverlay,
        now: u32,
        n: usize,
    ) -> Vec<Arc<PublicEntry>> {
        std::iter::repeat_with(|| generate_public_entry(overlay, now))
            .take(n)
            .collect()
    }

    /// Returns the tracked entry count after checking
    /// that it matches the number of stored entries.
    fn count_entries(overlay: &PublicOverlay) -> usize {
        let tracked_count = overlay.inner.entry_count.load(Ordering::Acquire);
        let stored = overlay.read_entries();
        assert_eq!(stored.len(), tracked_count);
        tracked_count
    }

    /// Creates an overlay with the given capacity and a service that answers nothing.
    fn make_overlay_with_min_capacity(min_capacity: usize) -> PublicOverlay {
        let service = crate::service_query_fn(|_| futures_util::future::ready(None));

        PublicOverlay::builder(rand::random())
            .with_min_capacity(min_capacity)
            .build(service)
    }

    #[test]
    fn min_capacity_works_with_single_thread() {
        let now = now_sec();
        let local_id: PeerId = rand::random();

        // Add with small portions
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            let (head, tail) = entries.split_at(5);

            overlay.add_untrusted_entries(&local_id, head, now);
            assert_eq!(count_entries(&overlay), 5);

            overlay.add_untrusted_entries(&local_id, tail, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Add exact
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Add once but too much
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 20);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Add once but zero capacity
        {
            let overlay = make_overlay_with_min_capacity(0);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Add all invalid entries
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries: Vec<_> = std::iter::repeat_with(|| generate_invalid_public_entry(now))
                .take(10)
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Add mixed invalid entries (invalid and valid entries alternate)
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries: Vec<_> = (0..10)
                .map(|i| {
                    if i % 2 == 0 {
                        generate_invalid_public_entry(now)
                    } else {
                        generate_public_entry(&overlay, now)
                    }
                })
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 5);
        }

        // Add mixed invalid entries on edge (five invalid entries, then five valid ones)
        {
            let overlay = make_overlay_with_min_capacity(3);
            let entries: Vec<_> = (0..10)
                .map(|i| {
                    if i < 5 {
                        generate_invalid_public_entry(now)
                    } else {
                        generate_public_entry(&overlay, now)
                    }
                })
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 3);
        }
    }

    #[test]
    fn min_capacity_works_with_multi_thread() {
        const BATCH_LEN: usize = 7;
        const BATCHES_PER_THREAD: usize = 3;
        const THREADS: usize = 10;

        let now = now_sec();
        let local_id: PeerId = rand::random();

        let overlay = make_overlay_with_min_capacity(201);
        let entries = generate_public_entries(&overlay, now, BATCH_LEN * BATCHES_PER_THREAD * THREADS);

        std::thread::scope(|scope| {
            for per_thread in entries.chunks_exact(BATCH_LEN * BATCHES_PER_THREAD) {
                scope.spawn(|| {
                    for batch in per_thread.chunks_exact(BATCH_LEN) {
                        overlay.add_untrusted_entries(&local_id, batch, now);
                    }
                });
            }
        });

        assert_eq!(count_entries(&overlay), 201);
    }
}
734}