tycho_network/overlay/
public_overlay.rs

1use std::borrow::Borrow;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Duration;
5
6use anyhow::Result;
7use arc_swap::ArcSwapOption;
8use bytes::{Bytes, BytesMut};
9use indexmap::{IndexMap, IndexSet};
10use parking_lot::{Mutex, RwLock, RwLockReadGuard};
11use rand::Rng;
12use tokio::sync::Notify;
13use tycho_util::futures::BoxFutureOrNoop;
14use tycho_util::{FastDashSet, FastHasherState};
15
16use crate::dht::{PeerResolver, PeerResolverHandle};
17use crate::network::Network;
18use crate::overlay::OverlayId;
19use crate::overlay::metrics::Metrics;
20use crate::proto::overlay::{PublicEntry, PublicEntryToSign, rpc};
21use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest};
22use crate::util::NetworkExt;
23
24pub struct PublicOverlayBuilder {
25    overlay_id: OverlayId,
26    min_capacity: usize,
27    entry_ttl: Duration,
28    banned_peer_ids: FastDashSet<PeerId>,
29    peer_resolver: Option<PeerResolver>,
30    name: Option<&'static str>,
31}
32
33impl PublicOverlayBuilder {
34    /// Minimum capacity for public overlay.
35    /// Public overlay will use suggested peers from untrusted sources to fill the overlay
36    /// until it reaches this capacity.
37    ///
38    /// Default: 100.
39    pub fn with_min_capacity(mut self, min_capacity: usize) -> Self {
40        self.min_capacity = min_capacity;
41        self
42    }
43
44    /// Time-to-live for each entry in the overlay.
45    ///
46    /// Default: 1 hour.
47    pub fn with_entry_ttl(mut self, entry_ttl: Duration) -> Self {
48        self.entry_ttl = entry_ttl;
49        self
50    }
51
52    /// Banned peers that will not be ignored by the overlay.
53    pub fn with_banned_peers<I>(mut self, banned_peers: I) -> Self
54    where
55        I: IntoIterator,
56        I::Item: Borrow<PeerId>,
57    {
58        self.banned_peer_ids
59            .extend(banned_peers.into_iter().map(|id| *id.borrow()));
60        self
61    }
62
63    /// Whether to resolve peers with the provided resolver.
64    ///
65    /// Does not resolve peers by default.
66    pub fn with_peer_resolver(mut self, peer_resolver: PeerResolver) -> Self {
67        self.peer_resolver = Some(peer_resolver);
68        self
69    }
70
71    /// Name of the overlay used in metrics.
72    pub fn named(mut self, name: &'static str) -> Self {
73        self.name = Some(name);
74        self
75    }
76
77    pub fn build<S>(self, service: S) -> PublicOverlay
78    where
79        S: Send + Sync + 'static,
80        S: Service<ServiceRequest, QueryResponse = Response>,
81    {
82        const UNRESOLVED_QUEUE_CAPACITY: usize = 5; // peers
83
84        let request_prefix = tl_proto::serialize(rpc::Prefix {
85            overlay_id: self.overlay_id.as_bytes(),
86        });
87
88        let entries = PublicOverlayEntries {
89            items: Default::default(),
90        };
91
92        let entry_ttl_sec = self.entry_ttl.as_secs().try_into().unwrap_or(u32::MAX);
93
94        PublicOverlay {
95            inner: Arc::new(Inner {
96                overlay_id: self.overlay_id,
97                min_capacity: self.min_capacity,
98                entry_ttl_sec,
99                peer_resolver: self.peer_resolver,
100                entries: RwLock::new(entries),
101                entries_added: Notify::new(),
102                entries_changed: Notify::new(),
103                entries_removed: Notify::new(),
104                entry_count: AtomicUsize::new(0),
105                own_signed_entry: Default::default(),
106                unknown_peers_queue: UnknownPeersQueue::with_capacity(UNRESOLVED_QUEUE_CAPACITY),
107                banned_peer_ids: self.banned_peer_ids,
108                service: service.boxed(),
109                request_prefix: request_prefix.into_boxed_slice(),
110                metrics: self
111                    .name
112                    .map(|label| Metrics::new("tycho_public_overlay", label))
113                    .unwrap_or_default(),
114            }),
115        }
116    }
117}
118
119#[derive(Clone)]
120#[repr(transparent)]
121pub struct PublicOverlay {
122    inner: Arc<Inner>,
123}
124
125impl PublicOverlay {
126    pub fn builder(overlay_id: OverlayId) -> PublicOverlayBuilder {
127        PublicOverlayBuilder {
128            overlay_id,
129            min_capacity: 100,
130            entry_ttl: Duration::from_secs(3600),
131            banned_peer_ids: Default::default(),
132            peer_resolver: None,
133            name: None,
134        }
135    }
136
137    #[inline]
138    pub fn overlay_id(&self) -> &OverlayId {
139        &self.inner.overlay_id
140    }
141
142    pub fn entry_ttl_sec(&self) -> u32 {
143        self.inner.entry_ttl_sec
144    }
145
146    pub fn peer_resolver(&self) -> &Option<PeerResolver> {
147        &self.inner.peer_resolver
148    }
149
150    pub fn unknown_peers_queue(&self) -> &UnknownPeersQueue {
151        &self.inner.unknown_peers_queue
152    }
153
154    pub async fn query(
155        &self,
156        network: &Network,
157        peer_id: &PeerId,
158        mut request: Request,
159    ) -> Result<Response> {
160        self.inner.metrics.record_tx(request.body.len());
161        self.prepend_prefix_to_body(&mut request.body);
162        network.query(peer_id, request).await
163    }
164
165    pub async fn send(
166        &self,
167        network: &Network,
168        peer_id: &PeerId,
169        mut request: Request,
170    ) -> Result<()> {
171        self.inner.metrics.record_tx(request.body.len());
172        self.prepend_prefix_to_body(&mut request.body);
173        network.send(peer_id, request).await
174    }
175
176    /// Bans the given peer from the overlay.
177    ///
178    /// Returns `true` if the peer was not already banned.
179    pub fn ban_peer(&self, peer_id: PeerId) -> bool {
180        self.inner.banned_peer_ids.insert(peer_id)
181    }
182
183    /// Unbans the given peer from the overlay.
184    ///
185    /// Returns `true` if the peer was banned.
186    pub fn unban_peer(&self, peer_id: &PeerId) -> bool {
187        self.inner.banned_peer_ids.remove(peer_id).is_some()
188    }
189
190    pub fn read_entries(&self) -> PublicOverlayEntriesReadGuard<'_> {
191        PublicOverlayEntriesReadGuard {
192            entries: self.inner.entries.read(),
193        }
194    }
195
196    /// Notifies when new entries are added to the overlay.
197    pub fn entires_added(&self) -> &Notify {
198        &self.inner.entries_added
199    }
200
201    /// Notifies when entries are updated in the overlay (added or updated).
202    pub fn entries_changed(&self) -> &Notify {
203        &self.inner.entries_changed
204    }
205
206    pub fn entries_removed(&self) -> &Notify {
207        &self.inner.entries_removed
208    }
209
210    /// Own signed public entry.
211    pub fn own_signed_entry(&self) -> Option<Arc<PublicEntry>> {
212        self.inner.own_signed_entry.load_full()
213    }
214
215    pub(crate) fn set_own_signed_entry(&self, entry: Arc<PublicEntry>) {
216        self.inner.own_signed_entry.store(Some(entry));
217    }
218
219    pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
220        self.inner.metrics.record_rx(req.body.len());
221        if self.check_peer_id(&req.metadata.peer_id) {
222            BoxFutureOrNoop::future(self.inner.service.on_query(req))
223        } else {
224            BoxFutureOrNoop::Noop
225        }
226    }
227
228    pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> {
229        self.inner.metrics.record_rx(req.body.len());
230        if self.check_peer_id(&req.metadata.peer_id) {
231            BoxFutureOrNoop::future(self.inner.service.on_message(req))
232        } else {
233            BoxFutureOrNoop::Noop
234        }
235    }
236
237    fn check_peer_id(&self, peer_id: &PeerId) -> bool {
238        // TODO: Merge `banned_peer_ids` with `entires`?
239        if self.inner.banned_peer_ids.contains(peer_id) {
240            // Discard requests from banned peers.
241            return false;
242        }
243
244        // NOTE: We are checking `is_full` before `entries.read()`
245        // to reduce the amount of locks when we receive lots of requests
246        // from different peers.
247        if !self.inner.unknown_peers_queue.is_full() && !self.inner.entries.read().contains(peer_id)
248        {
249            // Push unknown peers into queue to resolve.
250            if self.inner.unknown_peers_queue.push(peer_id) {
251                tracing::debug!(%peer_id, "found new unknown peer to resolve");
252            }
253        }
254
255        true
256    }
257
258    /// Adds the given entries to the overlay.
259    ///
260    /// NOTE: Will deadlock if called while `PublicOverlayEntriesReadGuard` is held.
261    pub(crate) fn add_untrusted_entries(
262        &self,
263        local_id: &PeerId,
264        entries: &[Arc<PublicEntry>],
265        now: u32,
266    ) -> bool {
267        if entries.is_empty() {
268            return false;
269        }
270
271        let this = self.inner.as_ref();
272
273        // Check if we can add more entries to the overlay and optimistically
274        // increase the entry count. (if no other thread has already done so).
275        let to_add = entries.len();
276        let mut entry_count = this.entry_count.load(Ordering::Acquire);
277        let to_add = loop {
278            let to_add = match this.min_capacity.checked_sub(entry_count) {
279                Some(capacity) if capacity > 0 => std::cmp::min(to_add, capacity),
280                _ => return false,
281            };
282
283            let res = this.entry_count.compare_exchange_weak(
284                entry_count,
285                entry_count + to_add,
286                Ordering::Release,
287                Ordering::Acquire,
288            );
289            match res {
290                Ok(_) => break to_add,
291                Err(n) => entry_count = n,
292            }
293        };
294
295        // Prepare validation state
296        let mut is_valid = vec![false; entries.len()];
297        let mut has_valid = false;
298
299        // First pass: verify all entries
300        for (entry, is_valid) in std::iter::zip(entries, is_valid.iter_mut()) {
301            if entry.is_expired(now, this.entry_ttl_sec)
302                || self.inner.banned_peer_ids.contains(&entry.peer_id)
303                || entry.peer_id == local_id
304            {
305                // Skip expired or banned peers early
306                continue;
307            }
308
309            let Some(pubkey) = entry.peer_id.as_public_key() else {
310                // Skip entries with invalid public keys
311                continue;
312            };
313
314            if !pubkey.verify_tl(
315                PublicEntryToSign {
316                    overlay_id: this.overlay_id.as_bytes(),
317                    peer_id: &entry.peer_id,
318                    created_at: entry.created_at,
319                },
320                &entry.signature,
321            ) {
322                // Skip entries with invalid signatures
323                continue;
324            }
325
326            // NOTE: check all entries, even if we have more than `to_add`.
327            // We might need them if some are duplicates af known entries.
328            *is_valid = true;
329            has_valid = true;
330        }
331
332        // Second pass: insert all valid entries (if any)
333        //
334        // NOTE: two passes are necessary because public key parsing and
335        // signature verification can be expensive and we want to avoid
336        // holding the lock for too long.
337        let mut added = 0;
338        let mut changed = false;
339        if has_valid {
340            let mut stored = this.entries.write();
341            for (entry, is_valid) in std::iter::zip(entries, is_valid) {
342                if !is_valid {
343                    continue;
344                }
345
346                let status = stored.insert(&this.peer_resolver, entry);
347                changed |= status.is_changed();
348                added += status.is_added() as usize;
349
350                if added >= to_add {
351                    break;
352                }
353            }
354        }
355
356        // Rollback entries that were not valid and not inserted
357        if added < to_add {
358            this.entry_count
359                .fetch_sub(to_add - added, Ordering::Release);
360        }
361
362        if added > 0 {
363            this.entries_added.notify_waiters();
364        }
365        if changed {
366            this.entries_changed.notify_waiters();
367        }
368
369        changed || added > 0
370    }
371
372    /// Removes all expired and banned entries from the overlay.
373    pub(crate) fn remove_invalid_entries(&self, now: u32) {
374        let this = self.inner.as_ref();
375
376        let mut removed = 0;
377        let mut entries = this.entries.write();
378        entries.retain(|item| {
379            let retain = !item.entry.is_expired(now, this.entry_ttl_sec)
380                && !this.banned_peer_ids.contains(&item.entry.peer_id);
381
382            if !retain {
383                removed += 1;
384            }
385
386            retain
387        });
388
389        if removed > 0 {
390            this.entry_count.fetch_sub(removed, Ordering::Release);
391            self.inner.entries_removed.notify_waiters();
392        }
393    }
394
395    fn prepend_prefix_to_body(&self, body: &mut Bytes) {
396        let this = self.inner.as_ref();
397
398        // TODO: reduce allocations
399        let mut res = BytesMut::with_capacity(this.request_prefix.len() + body.len());
400        res.extend_from_slice(&this.request_prefix);
401        res.extend_from_slice(body);
402        *body = res.freeze();
403    }
404}
405
406impl std::fmt::Debug for PublicOverlay {
407    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408        f.debug_struct("PublicOverlay")
409            .field("overlay_id", &self.inner.overlay_id)
410            .finish()
411    }
412}
413
414struct Inner {
415    overlay_id: OverlayId,
416    min_capacity: usize,
417    entry_ttl_sec: u32,
418    peer_resolver: Option<PeerResolver>,
419    entries: RwLock<PublicOverlayEntries>,
420    entry_count: AtomicUsize,
421    entries_added: Notify,
422    entries_changed: Notify,
423    entries_removed: Notify,
424    own_signed_entry: ArcSwapOption<PublicEntry>,
425    unknown_peers_queue: UnknownPeersQueue,
426    banned_peer_ids: FastDashSet<PeerId>,
427    service: BoxService<ServiceRequest, Response>,
428    request_prefix: Box<[u8]>,
429    metrics: Metrics,
430}
431
432pub struct PublicOverlayEntries {
433    items: OverlayItems,
434}
435
436impl PublicOverlayEntries {
437    /// Returns `true` if the set contains no elements.
438    pub fn is_empty(&self) -> bool {
439        self.items.is_empty()
440    }
441
442    /// Returns the number of elements in the set, also referred to as its 'length'.
443    pub fn len(&self) -> usize {
444        self.items.len()
445    }
446
447    /// Returns true if the set contains the specified peer id.
448    pub fn contains(&self, peer_id: &PeerId) -> bool {
449        self.items.contains_key(peer_id)
450    }
451
452    /// Returns an iterator over the entries.
453    ///
454    /// The order is not random, but is not defined.
455    pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PublicOverlayEntryData> {
456        self.items.values()
457    }
458
459    /// Returns a reference to one random element of the slice,
460    /// or `None` if the slice is empty.
461    pub fn choose<R>(&self, rng: &mut R) -> Option<&PublicOverlayEntryData>
462    where
463        R: Rng + ?Sized,
464    {
465        let index = rng.random_range(0..self.items.len());
466        let (_, value) = self.items.get_index(index)?;
467        Some(value)
468    }
469
470    /// Chooses `n` entries from the set, without repetition,
471    /// and in random order.
472    pub fn choose_multiple<R>(
473        &self,
474        rng: &mut R,
475        n: usize,
476    ) -> ChooseMultiplePublicOverlayEntries<'_>
477    where
478        R: Rng + ?Sized,
479    {
480        let len = self.items.len();
481        ChooseMultiplePublicOverlayEntries {
482            items: &self.items,
483            indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(),
484        }
485    }
486
487    /// Chooses all entries from the set, without repetition,
488    /// and in random order.
489    pub fn choose_all<R>(&self, rng: &mut R) -> ChooseMultiplePublicOverlayEntries<'_>
490    where
491        R: Rng + ?Sized,
492    {
493        self.choose_multiple(rng, self.items.len())
494    }
495
496    fn insert(&mut self, peer_resolver: &Option<PeerResolver>, item: &PublicEntry) -> UpdateStatus {
497        match self.items.entry(item.peer_id) {
498            // No entry for the peer_id, insert a new one
499            indexmap::map::Entry::Vacant(entry) => {
500                let resolver_handle = peer_resolver.as_ref().map_or_else(
501                    || PeerResolverHandle::new_noop(&item.peer_id),
502                    |resolver| resolver.insert(&item.peer_id, false),
503                );
504
505                entry.insert(PublicOverlayEntryData {
506                    entry: Arc::new(item.clone()),
507                    resolver_handle,
508                });
509
510                UpdateStatus::Added
511            }
512            // Entry for the peer_id exists, update it if the new item is newer
513            indexmap::map::Entry::Occupied(mut entry) => {
514                let existing = entry.get_mut();
515                if existing.entry.created_at >= item.created_at {
516                    return UpdateStatus::Skipped;
517                }
518
519                // Try to reuse the existing Arc if possible
520                match Arc::get_mut(&mut existing.entry) {
521                    Some(existing) => existing.clone_from(item),
522                    None => existing.entry = Arc::new(item.clone()),
523                }
524                UpdateStatus::Updated
525            }
526        }
527    }
528
529    fn retain<F>(&mut self, mut f: F)
530    where
531        F: FnMut(&PublicOverlayEntryData) -> bool,
532    {
533        self.items.retain(|_, item| f(item));
534    }
535}
536
537#[derive(Clone)]
538pub struct PublicOverlayEntryData {
539    pub entry: Arc<PublicEntry>,
540    pub resolver_handle: PeerResolverHandle,
541}
542
543impl PublicOverlayEntryData {
544    pub fn is_expired(&self, now: u32, ttl: u32) -> bool {
545        self.entry.is_expired(now, ttl)
546    }
547
548    pub fn expires_at(&self, ttl: u32) -> u32 {
549        self.entry.created_at.saturating_add(ttl)
550    }
551}
552
553pub struct PublicOverlayEntriesReadGuard<'a> {
554    entries: RwLockReadGuard<'a, PublicOverlayEntries>,
555}
556
557impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> {
558    type Target = PublicOverlayEntries;
559
560    #[inline]
561    fn deref(&self) -> &Self::Target {
562        &self.entries
563    }
564}
565
566pub struct UnknownPeersQueue {
567    peer_ids: Mutex<IndexSet<PeerId, FastHasherState>>,
568    peer_id_count: AtomicUsize,
569    capacity: usize,
570}
571
572impl UnknownPeersQueue {
573    pub fn with_capacity(capacity: usize) -> Self {
574        Self {
575            peer_ids: Mutex::new(IndexSet::with_capacity_and_hasher(
576                capacity,
577                Default::default(),
578            )),
579            peer_id_count: AtomicUsize::new(0),
580            capacity,
581        }
582    }
583
584    pub fn is_empty(&self) -> bool {
585        self.len() == 0
586    }
587
588    pub fn is_full(&self) -> bool {
589        self.len() >= self.capacity
590    }
591
592    pub fn len(&self) -> usize {
593        self.peer_id_count.load(Ordering::Acquire)
594    }
595
596    /// Tries to push a peer id to the queue.
597    /// Returns true if this id was really added.
598    pub fn push(&self, peer_id: &PeerId) -> bool {
599        // NOTE: We could also optimistically check `is_full` here, but we are
600        // already doing it in the outer scope before the "known entry" check.
601
602        let mut peer_ids = self.peer_ids.lock();
603        if peer_ids.len() >= self.capacity {
604            return false;
605        }
606
607        let added = peer_ids.insert(*peer_id);
608        self.peer_id_count.fetch_add(added as _, Ordering::Release);
609        added
610    }
611
612    /// Pops all collected peer ids.
613    pub fn pop_multiple(&self) -> Option<IndexSet<PeerId, FastHasherState>> {
614        if self.is_empty() {
615            return None;
616        }
617
618        let mut peer_ids = self.peer_ids.lock();
619        self.peer_id_count.store(0, Ordering::Release);
620        let res = std::mem::take(&mut *peer_ids);
621        if res.is_empty() { None } else { Some(res) }
622    }
623}
624
625#[derive(Debug, Clone, Copy, PartialEq, Eq)]
626enum UpdateStatus {
627    Skipped,
628    Updated,
629    Added,
630}
631
632impl UpdateStatus {
633    fn is_changed(self) -> bool {
634        matches!(self, Self::Updated | Self::Added)
635    }
636
637    fn is_added(self) -> bool {
638        matches!(self, Self::Added)
639    }
640}
641
642pub struct ChooseMultiplePublicOverlayEntries<'a> {
643    items: &'a OverlayItems,
644    indices: rand::seq::index::IndexVecIntoIter,
645}
646
647impl<'a> Iterator for ChooseMultiplePublicOverlayEntries<'a> {
648    type Item = &'a PublicOverlayEntryData;
649
650    fn next(&mut self) -> Option<Self::Item> {
651        self.indices.next().and_then(|i| {
652            let (_, value) = self.items.get_index(i)?;
653            Some(value)
654        })
655    }
656
657    fn size_hint(&self) -> (usize, Option<usize>) {
658        (self.indices.len(), Some(self.indices.len()))
659    }
660}
661
662impl ExactSizeIterator for ChooseMultiplePublicOverlayEntries<'_> {
663    fn len(&self) -> usize {
664        self.indices.len()
665    }
666}
667
668type OverlayItems = IndexMap<PeerId, PublicOverlayEntryData, FastHasherState>;
669
670#[cfg(test)]
671mod tests {
672    use tycho_crypto::ed25519;
673    use tycho_util::time::now_sec;
674
675    use super::*;
676
677    fn generate_public_entry(overlay: &PublicOverlay, now: u32) -> Arc<PublicEntry> {
678        let keypair = rand::random::<ed25519::KeyPair>();
679        let peer_id: PeerId = keypair.public_key.into();
680        let signature = keypair.sign_tl(crate::proto::overlay::PublicEntryToSign {
681            overlay_id: overlay.overlay_id().as_bytes(),
682            peer_id: &peer_id,
683            created_at: now,
684        });
685        Arc::new(PublicEntry {
686            peer_id,
687            created_at: now,
688            signature: Box::new(signature),
689        })
690    }
691
692    fn generate_invalid_public_entry(now: u32) -> Arc<PublicEntry> {
693        let keypair = rand::random::<ed25519::KeyPair>();
694        let peer_id: PeerId = keypair.public_key.into();
695        Arc::new(PublicEntry {
696            peer_id,
697            created_at: now,
698            signature: Box::new([0; 64]),
699        })
700    }
701
702    fn generate_public_entries(
703        overlay: &PublicOverlay,
704        now: u32,
705        n: usize,
706    ) -> Vec<Arc<PublicEntry>> {
707        (0..n)
708            .map(|_| generate_public_entry(overlay, now))
709            .collect()
710    }
711
712    fn count_entries(overlay: &PublicOverlay) -> usize {
713        let tracked_count = overlay.inner.entry_count.load(Ordering::Acquire);
714        let guard = overlay.read_entries();
715        assert_eq!(guard.entries.items.len(), tracked_count);
716        tracked_count
717    }
718
719    fn make_overlay_with_min_capacity(min_capacity: usize) -> PublicOverlay {
720        PublicOverlay::builder(rand::random())
721            .with_min_capacity(min_capacity)
722            .build(crate::service_query_fn(|_| {
723                futures_util::future::ready(None)
724            }))
725    }
726
727    #[test]
728    fn min_capacity_works_with_single_thread() {
729        let now = now_sec();
730        let local_id: PeerId = rand::random();
731
732        // Add with small portions
733        {
734            let overlay = make_overlay_with_min_capacity(10);
735            let entries = generate_public_entries(&overlay, now, 10);
736
737            overlay.add_untrusted_entries(&local_id, &entries[..5], now);
738            assert_eq!(count_entries(&overlay), 5);
739
740            overlay.add_untrusted_entries(&local_id, &entries[5..], now);
741            assert_eq!(count_entries(&overlay), 10);
742        }
743
744        // Add exact
745        {
746            let overlay = make_overlay_with_min_capacity(10);
747            let entries = generate_public_entries(&overlay, now, 10);
748            overlay.add_untrusted_entries(&local_id, &entries, now);
749            assert_eq!(count_entries(&overlay), 10);
750        }
751
752        // Add once but too much
753        {
754            let overlay = make_overlay_with_min_capacity(10);
755            let entries = generate_public_entries(&overlay, now, 20);
756            overlay.add_untrusted_entries(&local_id, &entries, now);
757            assert_eq!(count_entries(&overlay), 10);
758        }
759
760        // Add once but zero capacity
761        {
762            let overlay = make_overlay_with_min_capacity(0);
763            let entries = generate_public_entries(&overlay, now, 10);
764            overlay.add_untrusted_entries(&local_id, &entries, now);
765            assert_eq!(count_entries(&overlay), 0);
766        }
767
768        // Add all invalid entries
769        {
770            let overlay = make_overlay_with_min_capacity(10);
771            let entries = (0..10)
772                .map(|_| generate_invalid_public_entry(now))
773                .collect::<Vec<_>>();
774            overlay.add_untrusted_entries(&local_id, &entries, now);
775            assert_eq!(count_entries(&overlay), 0);
776        }
777
778        // Add mixed invalid entries
779        {
780            let overlay = make_overlay_with_min_capacity(10);
781            let entries = [
782                generate_invalid_public_entry(now),
783                generate_public_entry(&overlay, now),
784                generate_invalid_public_entry(now),
785                generate_public_entry(&overlay, now),
786                generate_invalid_public_entry(now),
787                generate_public_entry(&overlay, now),
788                generate_invalid_public_entry(now),
789                generate_public_entry(&overlay, now),
790                generate_invalid_public_entry(now),
791                generate_public_entry(&overlay, now),
792            ];
793            overlay.add_untrusted_entries(&local_id, &entries, now);
794            assert_eq!(count_entries(&overlay), 5);
795        }
796
797        // Add mixed invalid entries on edge
798        {
799            let overlay = make_overlay_with_min_capacity(3);
800            let entries = [
801                generate_invalid_public_entry(now),
802                generate_invalid_public_entry(now),
803                generate_invalid_public_entry(now),
804                generate_invalid_public_entry(now),
805                generate_invalid_public_entry(now),
806                generate_public_entry(&overlay, now),
807                generate_public_entry(&overlay, now),
808                generate_public_entry(&overlay, now),
809                generate_public_entry(&overlay, now),
810                generate_public_entry(&overlay, now),
811            ];
812            overlay.add_untrusted_entries(&local_id, &entries, now);
813            assert_eq!(count_entries(&overlay), 3);
814        }
815    }
816
817    #[test]
818    fn min_capacity_works_with_multi_thread() {
819        let now = now_sec();
820        let local_id: PeerId = rand::random();
821
822        let overlay = make_overlay_with_min_capacity(201);
823        let entries = generate_public_entries(&overlay, now, 7 * 3 * 10);
824
825        std::thread::scope(|s| {
826            for entries in entries.chunks_exact(7 * 3) {
827                s.spawn(|| {
828                    for entries in entries.chunks_exact(7) {
829                        overlay.add_untrusted_entries(&local_id, entries, now);
830                    }
831                });
832            }
833        });
834
835        assert_eq!(count_entries(&overlay), 201);
836    }
837
838    #[test]
839    fn unknown_peers_queue() {
840        let queue = UnknownPeersQueue::with_capacity(5);
841        assert!(queue.is_empty());
842        assert!(!queue.is_full());
843
844        // Add
845        let added = queue.push(&PeerId([0; 32]));
846        assert!(added);
847        assert_eq!(queue.len(), 1);
848        assert!(!queue.is_empty());
849        assert!(!queue.is_full());
850
851        let added = queue.push(&PeerId([0; 32]));
852        assert!(!added);
853        assert_eq!(queue.len(), 1);
854
855        for i in 1..=3 {
856            let added = queue.push(&PeerId([i; 32]));
857            assert!(added);
858            assert_eq!(queue.len(), i as usize + 1);
859            assert!(!queue.is_empty());
860            assert!(!queue.is_full());
861        }
862
863        let added = queue.push(&PeerId([4; 32]));
864        assert!(added);
865        assert_eq!(queue.len(), 5);
866        assert!(queue.is_full());
867
868        let added = queue.push(&PeerId([5; 32]));
869        assert!(!added);
870        assert_eq!(queue.len(), 5);
871        assert!(queue.is_full());
872
873        // Pop
874        let items = queue.pop_multiple().unwrap();
875        assert!(queue.is_empty());
876        assert!(!queue.is_full());
877        assert_eq!(items.len(), 5);
878        for i in 0..5 {
879            assert!(items.contains(&PeerId([i; 32])));
880        }
881
882        let items = queue.pop_multiple();
883        assert!(items.is_none());
884
885        // Add
886        let added = queue.push(&PeerId([0; 32]));
887        assert!(added);
888        assert_eq!(queue.len(), 1);
889        assert!(!queue.is_empty());
890        assert!(!queue.is_full());
891
892        // Pop
893        let items = queue.pop_multiple().unwrap();
894        assert!(queue.is_empty());
895        assert!(!queue.is_full());
896        assert_eq!(items.len(), 1);
897        assert!(items.contains(&PeerId([0; 32])));
898    }
899}