tycho_network/overlay/
public_overlay.rs

1use std::borrow::Borrow;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Duration;
5
6use anyhow::Result;
7use arc_swap::ArcSwapOption;
8use bytes::{Bytes, BytesMut};
9use indexmap::{IndexMap, IndexSet};
10use parking_lot::{Mutex, RwLock, RwLockReadGuard};
11use rand::Rng;
12use tokio::sync::Notify;
13use tycho_util::futures::BoxFutureOrNoop;
14use tycho_util::{FastDashSet, FastHasherState};
15
16use crate::dht::{PeerResolver, PeerResolverHandle};
17use crate::network::Network;
18use crate::overlay::OverlayId;
19use crate::overlay::metrics::Metrics;
20use crate::proto::overlay::{PublicEntry, PublicEntryToSign, rpc};
21use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest};
22use crate::util::NetworkExt;
23
24pub struct PublicOverlayBuilder {
25    overlay_id: OverlayId,
26    min_capacity: usize,
27    entry_ttl: Duration,
28    banned_peer_ids: FastDashSet<PeerId>,
29    peer_resolver: Option<PeerResolver>,
30    name: Option<&'static str>,
31}
32
33impl PublicOverlayBuilder {
34    /// Minimum capacity for public overlay.
35    /// Public overlay will use suggested peers from untrusted sources to fill the overlay
36    /// until it reaches this capacity.
37    ///
38    /// Default: 100.
39    pub fn with_min_capacity(mut self, min_capacity: usize) -> Self {
40        self.min_capacity = min_capacity;
41        self
42    }
43
44    /// Time-to-live for each entry in the overlay.
45    ///
46    /// Default: 1 hour.
47    pub fn with_entry_ttl(mut self, entry_ttl: Duration) -> Self {
48        self.entry_ttl = entry_ttl;
49        self
50    }
51
52    /// Banned peers that will not be ignored by the overlay.
53    pub fn with_banned_peers<I>(mut self, banned_peers: I) -> Self
54    where
55        I: IntoIterator,
56        I::Item: Borrow<PeerId>,
57    {
58        self.banned_peer_ids
59            .extend(banned_peers.into_iter().map(|id| *id.borrow()));
60        self
61    }
62
63    /// Whether to resolve peers with the provided resolver.
64    ///
65    /// Does not resolve peers by default.
66    pub fn with_peer_resolver(mut self, peer_resolver: PeerResolver) -> Self {
67        self.peer_resolver = Some(peer_resolver);
68        self
69    }
70
71    /// Name of the overlay used in metrics.
72    pub fn named(mut self, name: &'static str) -> Self {
73        self.name = Some(name);
74        self
75    }
76
77    pub fn build<S>(self, service: S) -> PublicOverlay
78    where
79        S: Send + Sync + 'static,
80        S: Service<ServiceRequest, QueryResponse = Response>,
81    {
82        const UNRESOLVED_QUEUE_CAPACITY: usize = 5; // peers
83
84        let request_prefix = tl_proto::serialize(rpc::Prefix {
85            overlay_id: self.overlay_id.as_bytes(),
86        });
87
88        let entries = PublicOverlayEntries {
89            items: Default::default(),
90        };
91
92        let entry_ttl_sec = self.entry_ttl.as_secs().try_into().unwrap_or(u32::MAX);
93
94        PublicOverlay {
95            inner: Arc::new(Inner {
96                overlay_id: self.overlay_id,
97                min_capacity: self.min_capacity,
98                entry_ttl_sec,
99                peer_resolver: self.peer_resolver,
100                entries: RwLock::new(entries),
101                entries_added: Notify::new(),
102                entries_changed: Notify::new(),
103                entries_removed: Notify::new(),
104                entry_count: AtomicUsize::new(0),
105                own_signed_entry: Default::default(),
106                unknown_peers_queue: UnknownPeersQueue::with_capacity(UNRESOLVED_QUEUE_CAPACITY),
107                banned_peer_ids: self.banned_peer_ids,
108                service: service.boxed(),
109                request_prefix: request_prefix.into_boxed_slice(),
110                metrics: self
111                    .name
112                    .map(|label| Metrics::new("tycho_public_overlay", label))
113                    .unwrap_or_default(),
114            }),
115        }
116    }
117}
118
119#[derive(Clone)]
120#[repr(transparent)]
121pub struct PublicOverlay {
122    inner: Arc<Inner>,
123}
124
125impl PublicOverlay {
126    pub fn builder(overlay_id: OverlayId) -> PublicOverlayBuilder {
127        PublicOverlayBuilder {
128            overlay_id,
129            min_capacity: 100,
130            entry_ttl: Duration::from_secs(3600),
131            banned_peer_ids: Default::default(),
132            peer_resolver: None,
133            name: None,
134        }
135    }
136
137    #[inline]
138    pub fn overlay_id(&self) -> &OverlayId {
139        &self.inner.overlay_id
140    }
141
142    pub fn entry_ttl_sec(&self) -> u32 {
143        self.inner.entry_ttl_sec
144    }
145
146    pub fn peer_resolver(&self) -> &Option<PeerResolver> {
147        &self.inner.peer_resolver
148    }
149
150    pub fn unknown_peers_queue(&self) -> &UnknownPeersQueue {
151        &self.inner.unknown_peers_queue
152    }
153
154    pub async fn query(
155        &self,
156        network: &Network,
157        peer_id: &PeerId,
158        mut request: Request,
159    ) -> Result<Response> {
160        self.inner.metrics.record_tx(request.body.len());
161        self.prepend_prefix_to_body(&mut request.body);
162        network.query(peer_id, request).await
163    }
164
165    pub async fn send(
166        &self,
167        network: &Network,
168        peer_id: &PeerId,
169        mut request: Request,
170    ) -> Result<()> {
171        self.inner.metrics.record_tx(request.body.len());
172        self.prepend_prefix_to_body(&mut request.body);
173        network.send(peer_id, request).await
174    }
175
176    /// Bans the given peer from the overlay.
177    ///
178    /// Returns `true` if the peer was not already banned.
179    pub fn ban_peer(&self, peer_id: PeerId) -> bool {
180        self.inner.banned_peer_ids.insert(peer_id)
181    }
182
183    /// Unbans the given peer from the overlay.
184    ///
185    /// Returns `true` if the peer was banned.
186    pub fn unban_peer(&self, peer_id: &PeerId) -> bool {
187        self.inner.banned_peer_ids.remove(peer_id).is_some()
188    }
189
190    pub fn read_entries(&self) -> PublicOverlayEntriesReadGuard<'_> {
191        PublicOverlayEntriesReadGuard {
192            entries: self.inner.entries.read(),
193        }
194    }
195
196    /// Notifies when new entries are added to the overlay.
197    pub fn entires_added(&self) -> &Notify {
198        &self.inner.entries_added
199    }
200
201    /// Notifies when entries are updated in the overlay (added or updated).
202    pub fn entries_changed(&self) -> &Notify {
203        &self.inner.entries_changed
204    }
205
206    pub fn entries_removed(&self) -> &Notify {
207        &self.inner.entries_removed
208    }
209
210    /// Own signed public entry.
211    pub fn own_signed_entry(&self) -> Option<Arc<PublicEntry>> {
212        self.inner.own_signed_entry.load_full()
213    }
214
215    pub(crate) fn set_own_signed_entry(&self, entry: Arc<PublicEntry>) {
216        self.inner.own_signed_entry.store(Some(entry));
217    }
218
219    pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
220        self.inner.metrics.record_rx(req.body.len());
221        if self.check_peer_id(&req.metadata.peer_id) {
222            BoxFutureOrNoop::future(self.inner.service.on_query(req))
223        } else {
224            BoxFutureOrNoop::Noop
225        }
226    }
227
228    pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> {
229        self.inner.metrics.record_rx(req.body.len());
230        if self.check_peer_id(&req.metadata.peer_id) {
231            BoxFutureOrNoop::future(self.inner.service.on_message(req))
232        } else {
233            BoxFutureOrNoop::Noop
234        }
235    }
236
237    fn check_peer_id(&self, peer_id: &PeerId) -> bool {
238        // TODO: Merge `banned_peer_ids` with `entires`?
239        if self.inner.banned_peer_ids.contains(peer_id) {
240            // Discard requests from banned peers.
241            return false;
242        }
243
244        // NOTE: We are checking `is_full` before `entries.read()`
245        // to reduce the amount of locks when we receive lots of requests
246        // from different peers.
247        if !self.inner.unknown_peers_queue.is_full() && !self.inner.entries.read().contains(peer_id)
248        {
249            // Push unknown peers into queue to resolve.
250            if self.inner.unknown_peers_queue.push(peer_id) {
251                tracing::debug!(%peer_id, "found new unknown peer to resolve");
252            }
253        }
254
255        true
256    }
257
258    /// Adds the given entries to the overlay.
259    ///
260    /// NOTE: Will deadlock if called while `PublicOverlayEntriesReadGuard` is held.
261    pub(crate) fn add_untrusted_entries(
262        &self,
263        local_id: &PeerId,
264        entries: &[Arc<PublicEntry>],
265        now: u32,
266    ) -> bool {
267        if entries.is_empty() {
268            return false;
269        }
270
271        let this = self.inner.as_ref();
272
273        // Check if we can add more entries to the overlay and optimistically
274        // increase the entry count. (if no other thread has already done so).
275        let to_add = entries.len();
276        let mut entry_count = this.entry_count.load(Ordering::Acquire);
277        let to_add = loop {
278            let to_add = match this.min_capacity.checked_sub(entry_count) {
279                Some(capacity) if capacity > 0 => std::cmp::min(to_add, capacity),
280                _ => return false,
281            };
282
283            let res = this.entry_count.compare_exchange_weak(
284                entry_count,
285                entry_count + to_add,
286                Ordering::Release,
287                Ordering::Acquire,
288            );
289            match res {
290                Ok(_) => break to_add,
291                Err(n) => entry_count = n,
292            }
293        };
294
295        // Prepare validation state
296        let mut is_valid = vec![false; entries.len()];
297        let mut has_valid = false;
298
299        // First pass: verify all entries
300        for (entry, is_valid) in std::iter::zip(entries, is_valid.iter_mut()) {
301            if entry.is_expired(now, this.entry_ttl_sec)
302                || self.inner.banned_peer_ids.contains(&entry.peer_id)
303                || entry.peer_id == local_id
304            {
305                // Skip expired or banned peers early
306                continue;
307            }
308
309            let Some(pubkey) = entry.peer_id.as_public_key() else {
310                // Skip entries with invalid public keys
311                continue;
312            };
313
314            if !pubkey.verify_tl(
315                PublicEntryToSign {
316                    overlay_id: this.overlay_id.as_bytes(),
317                    peer_id: &entry.peer_id,
318                    created_at: entry.created_at,
319                },
320                &entry.signature,
321            ) {
322                // Skip entries with invalid signatures
323                continue;
324            }
325
326            // NOTE: check all entries, even if we have more than `to_add`.
327            // We might need them if some are duplicates af known entries.
328            *is_valid = true;
329            has_valid = true;
330        }
331
332        // Second pass: insert all valid entries (if any)
333        //
334        // NOTE: two passes are necessary because public key parsing and
335        // signature verification can be expensive and we want to avoid
336        // holding the lock for too long.
337        let mut added = 0;
338        let mut changed = false;
339        if has_valid {
340            let mut stored = this.entries.write();
341            for (entry, is_valid) in std::iter::zip(entries, is_valid) {
342                if !is_valid {
343                    continue;
344                }
345
346                let status = stored.insert(&this.peer_resolver, entry);
347                changed |= status.is_changed();
348                added += status.is_added() as usize;
349
350                if added >= to_add {
351                    break;
352                }
353            }
354        }
355
356        // Rollback entries that were not valid and not inserted
357        if added < to_add {
358            this.entry_count
359                .fetch_sub(to_add - added, Ordering::Release);
360        }
361
362        if added > 0 {
363            this.entries_added.notify_waiters();
364        }
365        if changed {
366            this.entries_changed.notify_waiters();
367        }
368
369        changed || added > 0
370    }
371
372    /// Removes all expired and banned entries from the overlay.
373    pub(crate) fn remove_invalid_entries(&self, now: u32) {
374        let this = self.inner.as_ref();
375
376        let mut should_notify = false;
377        let mut entries = this.entries.write();
378        entries.retain(|item| {
379            let retain = !item.entry.is_expired(now, this.entry_ttl_sec)
380                && !this.banned_peer_ids.contains(&item.entry.peer_id);
381            should_notify |= !retain;
382            retain
383        });
384
385        if should_notify {
386            self.inner.entries_removed.notify_waiters();
387        }
388    }
389
390    fn prepend_prefix_to_body(&self, body: &mut Bytes) {
391        let this = self.inner.as_ref();
392
393        // TODO: reduce allocations
394        let mut res = BytesMut::with_capacity(this.request_prefix.len() + body.len());
395        res.extend_from_slice(&this.request_prefix);
396        res.extend_from_slice(body);
397        *body = res.freeze();
398    }
399}
400
401impl std::fmt::Debug for PublicOverlay {
402    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
403        f.debug_struct("PublicOverlay")
404            .field("overlay_id", &self.inner.overlay_id)
405            .finish()
406    }
407}
408
/// Shared state behind every [`PublicOverlay`] handle.
struct Inner {
    /// Overlay id; used for the request prefix and for entry signature checks.
    overlay_id: OverlayId,
    /// Upper bound for entries accepted by `add_untrusted_entries`.
    min_capacity: usize,
    /// Entry time-to-live in seconds.
    entry_ttl_sec: u32,
    /// Optional resolver that provides handles for newly inserted entries.
    peer_resolver: Option<PeerResolver>,
    /// Known overlay entries.
    entries: RwLock<PublicOverlayEntries>,
    /// Number of entry slots reserved by `add_untrusted_entries`;
    /// compared against `min_capacity` before taking the entries lock.
    entry_count: AtomicUsize,
    /// Signalled when at least one new entry was inserted.
    entries_added: Notify,
    /// Signalled when at least one entry was inserted or updated.
    entries_changed: Notify,
    /// Signalled when at least one entry was removed.
    entries_removed: Notify,
    /// Own signed entry, if it has been set.
    own_signed_entry: ArcSwapOption<PublicEntry>,
    /// Senders of incoming requests that have no entry in `entries`.
    unknown_peers_queue: UnknownPeersQueue,
    /// Peers whose requests are discarded and whose entries are rejected.
    banned_peer_ids: FastDashSet<PeerId>,
    /// Service that handles incoming queries and messages.
    service: BoxService<ServiceRequest, Response>,
    /// Serialized `rpc::Prefix` prepended to every outgoing request body.
    request_prefix: Box<[u8]>,
    /// Rx/tx byte counters.
    metrics: Metrics,
}
426
427pub struct PublicOverlayEntries {
428    items: OverlayItems,
429}
430
431impl PublicOverlayEntries {
432    /// Returns `true` if the set contains no elements.
433    pub fn is_empty(&self) -> bool {
434        self.items.is_empty()
435    }
436
437    /// Returns the number of elements in the set, also referred to as its 'length'.
438    pub fn len(&self) -> usize {
439        self.items.len()
440    }
441
442    /// Returns true if the set contains the specified peer id.
443    pub fn contains(&self, peer_id: &PeerId) -> bool {
444        self.items.contains_key(peer_id)
445    }
446
447    /// Returns an iterator over the entries.
448    ///
449    /// The order is not random, but is not defined.
450    pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PublicOverlayEntryData> {
451        self.items.values()
452    }
453
454    /// Returns a reference to one random element of the slice,
455    /// or `None` if the slice is empty.
456    pub fn choose<R>(&self, rng: &mut R) -> Option<&PublicOverlayEntryData>
457    where
458        R: Rng + ?Sized,
459    {
460        let index = rng.random_range(0..self.items.len());
461        let (_, value) = self.items.get_index(index)?;
462        Some(value)
463    }
464
465    /// Chooses `n` entries from the set, without repetition,
466    /// and in random order.
467    pub fn choose_multiple<R>(
468        &self,
469        rng: &mut R,
470        n: usize,
471    ) -> ChooseMultiplePublicOverlayEntries<'_>
472    where
473        R: Rng + ?Sized,
474    {
475        let len = self.items.len();
476        ChooseMultiplePublicOverlayEntries {
477            items: &self.items,
478            indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(),
479        }
480    }
481
482    /// Chooses all entries from the set, without repetition,
483    /// and in random order.
484    pub fn choose_all<R>(&self, rng: &mut R) -> ChooseMultiplePublicOverlayEntries<'_>
485    where
486        R: Rng + ?Sized,
487    {
488        self.choose_multiple(rng, self.items.len())
489    }
490
491    fn insert(&mut self, peer_resolver: &Option<PeerResolver>, item: &PublicEntry) -> UpdateStatus {
492        match self.items.entry(item.peer_id) {
493            // No entry for the peer_id, insert a new one
494            indexmap::map::Entry::Vacant(entry) => {
495                let resolver_handle = peer_resolver.as_ref().map_or_else(
496                    || PeerResolverHandle::new_noop(&item.peer_id),
497                    |resolver| resolver.insert(&item.peer_id, false),
498                );
499
500                entry.insert(PublicOverlayEntryData {
501                    entry: Arc::new(item.clone()),
502                    resolver_handle,
503                });
504
505                UpdateStatus::Added
506            }
507            // Entry for the peer_id exists, update it if the new item is newer
508            indexmap::map::Entry::Occupied(mut entry) => {
509                let existing = entry.get_mut();
510                if existing.entry.created_at >= item.created_at {
511                    return UpdateStatus::Skipped;
512                }
513
514                // Try to reuse the existing Arc if possible
515                match Arc::get_mut(&mut existing.entry) {
516                    Some(existing) => existing.clone_from(item),
517                    None => existing.entry = Arc::new(item.clone()),
518                }
519                UpdateStatus::Updated
520            }
521        }
522    }
523
524    fn retain<F>(&mut self, mut f: F)
525    where
526        F: FnMut(&PublicOverlayEntryData) -> bool,
527    {
528        self.items.retain(|_, item| f(item));
529    }
530}
531
532#[derive(Clone)]
533pub struct PublicOverlayEntryData {
534    pub entry: Arc<PublicEntry>,
535    pub resolver_handle: PeerResolverHandle,
536}
537
538impl PublicOverlayEntryData {
539    pub fn is_expired(&self, now: u32, ttl: u32) -> bool {
540        self.entry.is_expired(now, ttl)
541    }
542
543    pub fn expires_at(&self, ttl: u32) -> u32 {
544        self.entry.created_at.saturating_add(ttl)
545    }
546}
547
548pub struct PublicOverlayEntriesReadGuard<'a> {
549    entries: RwLockReadGuard<'a, PublicOverlayEntries>,
550}
551
552impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> {
553    type Target = PublicOverlayEntries;
554
555    #[inline]
556    fn deref(&self) -> &Self::Target {
557        &self.entries
558    }
559}
560
561pub struct UnknownPeersQueue {
562    peer_ids: Mutex<IndexSet<PeerId, FastHasherState>>,
563    peer_id_count: AtomicUsize,
564    capacity: usize,
565}
566
567impl UnknownPeersQueue {
568    pub fn with_capacity(capacity: usize) -> Self {
569        Self {
570            peer_ids: Mutex::new(IndexSet::with_capacity_and_hasher(
571                capacity,
572                Default::default(),
573            )),
574            peer_id_count: AtomicUsize::new(0),
575            capacity,
576        }
577    }
578
579    pub fn is_empty(&self) -> bool {
580        self.len() == 0
581    }
582
583    pub fn is_full(&self) -> bool {
584        self.len() >= self.capacity
585    }
586
587    pub fn len(&self) -> usize {
588        self.peer_id_count.load(Ordering::Acquire)
589    }
590
591    /// Tries to push a peer id to the queue.
592    /// Returns true if this id was really added.
593    pub fn push(&self, peer_id: &PeerId) -> bool {
594        // NOTE: We could also optimistically check `is_full` here, but we are
595        // already doing it in the outer scope before the "known entry" check.
596
597        let mut peer_ids = self.peer_ids.lock();
598        if peer_ids.len() >= self.capacity {
599            return false;
600        }
601
602        let added = peer_ids.insert(*peer_id);
603        self.peer_id_count.fetch_add(added as _, Ordering::Release);
604        added
605    }
606
607    /// Pops all collected peer ids.
608    pub fn pop_multiple(&self) -> Option<IndexSet<PeerId, FastHasherState>> {
609        if self.is_empty() {
610            return None;
611        }
612
613        let mut peer_ids = self.peer_ids.lock();
614        self.peer_id_count.store(0, Ordering::Release);
615        let res = std::mem::take(&mut *peer_ids);
616        if res.is_empty() { None } else { Some(res) }
617    }
618}
619
/// Outcome of inserting a single entry into the overlay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateStatus {
    /// The stored entry was kept as is.
    Skipped,
    /// An existing entry was replaced.
    Updated,
    /// A new entry was inserted.
    Added,
}

impl UpdateStatus {
    /// Returns `true` for both `Updated` and `Added`.
    fn is_changed(self) -> bool {
        match self {
            Self::Updated | Self::Added => true,
            Self::Skipped => false,
        }
    }

    /// Returns `true` only for `Added`.
    fn is_added(self) -> bool {
        self == Self::Added
    }
}
636
637pub struct ChooseMultiplePublicOverlayEntries<'a> {
638    items: &'a OverlayItems,
639    indices: rand::seq::index::IndexVecIntoIter,
640}
641
642impl<'a> Iterator for ChooseMultiplePublicOverlayEntries<'a> {
643    type Item = &'a PublicOverlayEntryData;
644
645    fn next(&mut self) -> Option<Self::Item> {
646        self.indices.next().and_then(|i| {
647            let (_, value) = self.items.get_index(i)?;
648            Some(value)
649        })
650    }
651
652    fn size_hint(&self) -> (usize, Option<usize>) {
653        (self.indices.len(), Some(self.indices.len()))
654    }
655}
656
657impl ExactSizeIterator for ChooseMultiplePublicOverlayEntries<'_> {
658    fn len(&self) -> usize {
659        self.indices.len()
660    }
661}
662
/// Map from peer id to its overlay entry data; entries can also be
/// addressed by position, which is what the random selection helpers use.
type OverlayItems = IndexMap<PeerId, PublicOverlayEntryData, FastHasherState>;
664
#[cfg(test)]
mod tests {
    use tycho_crypto::ed25519;
    use tycho_util::time::now_sec;

    use super::*;

    /// Creates an entry for a fresh random keypair, signed for `overlay`
    /// with `created_at` set to `now`.
    fn generate_public_entry(overlay: &PublicOverlay, now: u32) -> Arc<PublicEntry> {
        let keypair = rand::random::<ed25519::KeyPair>();
        let peer_id: PeerId = keypair.public_key.into();
        let signature = keypair.sign_tl(crate::proto::overlay::PublicEntryToSign {
            overlay_id: overlay.overlay_id().as_bytes(),
            peer_id: &peer_id,
            created_at: now,
        });
        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature: Box::new(signature),
        })
    }

    /// Creates an entry for a fresh random keypair with an all-zero
    /// signature instead of a real one.
    fn generate_invalid_public_entry(now: u32) -> Arc<PublicEntry> {
        let keypair = rand::random::<ed25519::KeyPair>();
        let peer_id: PeerId = keypair.public_key.into();
        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature: Box::new([0; 64]),
        })
    }

    /// Creates `n` properly signed entries for `overlay`.
    fn generate_public_entries(
        overlay: &PublicOverlay,
        now: u32,
        n: usize,
    ) -> Vec<Arc<PublicEntry>> {
        (0..n)
            .map(|_| generate_public_entry(overlay, now))
            .collect()
    }

    /// Returns the tracked entry count after asserting that it matches the
    /// number of entries actually stored in the map.
    fn count_entries(overlay: &PublicOverlay) -> usize {
        let tracked_count = overlay.inner.entry_count.load(Ordering::Acquire);
        let guard = overlay.read_entries();
        assert_eq!(guard.entries.items.len(), tracked_count);
        tracked_count
    }

    /// Builds an overlay with a random id, the given minimum capacity and a
    /// service that answers every query with `None`.
    fn make_overlay_with_min_capacity(min_capacity: usize) -> PublicOverlay {
        PublicOverlay::builder(rand::random())
            .with_min_capacity(min_capacity)
            .build(crate::service_query_fn(|_| {
                futures_util::future::ready(None)
            }))
    }

    /// Checks the capacity limit and the counter rollback when entries are
    /// added from a single thread.
    #[test]
    fn min_capacity_works_with_single_thread() {
        let now = now_sec();
        let local_id: PeerId = rand::random();

        // Add with small portions
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);

            overlay.add_untrusted_entries(&local_id, &entries[..5], now);
            assert_eq!(count_entries(&overlay), 5);

            overlay.add_untrusted_entries(&local_id, &entries[5..], now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Add exact
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Add once but too much
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 20);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Add once but zero capacity
        {
            let overlay = make_overlay_with_min_capacity(0);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Add all invalid entries
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = (0..10)
                .map(|_| generate_invalid_public_entry(now))
                .collect::<Vec<_>>();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Add mixed invalid entries
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = [
                generate_invalid_public_entry(now),
                generate_public_entry(&overlay, now),
                generate_invalid_public_entry(now),
                generate_public_entry(&overlay, now),
                generate_invalid_public_entry(now),
                generate_public_entry(&overlay, now),
                generate_invalid_public_entry(now),
                generate_public_entry(&overlay, now),
                generate_invalid_public_entry(now),
                generate_public_entry(&overlay, now),
            ];
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 5);
        }

        // Add mixed invalid entries on edge
        // (the valid entries come after more invalid ones than the capacity)
        {
            let overlay = make_overlay_with_min_capacity(3);
            let entries = [
                generate_invalid_public_entry(now),
                generate_invalid_public_entry(now),
                generate_invalid_public_entry(now),
                generate_invalid_public_entry(now),
                generate_invalid_public_entry(now),
                generate_public_entry(&overlay, now),
                generate_public_entry(&overlay, now),
                generate_public_entry(&overlay, now),
                generate_public_entry(&overlay, now),
                generate_public_entry(&overlay, now),
            ];
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 3);
        }
    }

    /// Offers 210 valid entries from 10 threads (3 batches of 7 per thread)
    /// to an overlay with capacity 201 and expects exactly 201 to be stored.
    #[test]
    fn min_capacity_works_with_multi_thread() {
        let now = now_sec();
        let local_id: PeerId = rand::random();

        let overlay = make_overlay_with_min_capacity(201);
        let entries = generate_public_entries(&overlay, now, 7 * 3 * 10);

        std::thread::scope(|s| {
            for entries in entries.chunks_exact(7 * 3) {
                s.spawn(|| {
                    for entries in entries.chunks_exact(7) {
                        overlay.add_untrusted_entries(&local_id, entries, now);
                    }
                });
            }
        });

        assert_eq!(count_entries(&overlay), 201);
    }

    /// Walks a queue of capacity 5 through push, duplicate push, overflow,
    /// pop and reuse after pop.
    #[test]
    fn unknown_peers_queue() {
        let queue = UnknownPeersQueue::with_capacity(5);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        // Add
        let added = queue.push(&PeerId([0; 32]));
        assert!(added);
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
        assert!(!queue.is_full());

        // Pushing the same id again must not change the queue
        let added = queue.push(&PeerId([0; 32]));
        assert!(!added);
        assert_eq!(queue.len(), 1);

        for i in 1..=3 {
            let added = queue.push(&PeerId([i; 32]));
            assert!(added);
            assert_eq!(queue.len(), i as usize + 1);
            assert!(!queue.is_empty());
            assert!(!queue.is_full());
        }

        // The fifth id fills the queue
        let added = queue.push(&PeerId([4; 32]));
        assert!(added);
        assert_eq!(queue.len(), 5);
        assert!(queue.is_full());

        // A sixth id is rejected
        let added = queue.push(&PeerId([5; 32]));
        assert!(!added);
        assert_eq!(queue.len(), 5);
        assert!(queue.is_full());

        // Pop
        let items = queue.pop_multiple().unwrap();
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(items.len(), 5);
        for i in 0..5 {
            assert!(items.contains(&PeerId([i; 32])));
        }

        // Popping an empty queue yields nothing
        let items = queue.pop_multiple();
        assert!(items.is_none());

        // Add
        let added = queue.push(&PeerId([0; 32]));
        assert!(added);
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
        assert!(!queue.is_full());

        // Pop
        let items = queue.pop_multiple().unwrap();
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(items.len(), 1);
        assert!(items.contains(&PeerId([0; 32])));
    }
}