1use std::borrow::Borrow;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Duration;
5
6use anyhow::Result;
7use arc_swap::ArcSwapOption;
8use bytes::{Bytes, BytesMut};
9use indexmap::{IndexMap, IndexSet};
10use parking_lot::{Mutex, RwLock, RwLockReadGuard};
11use rand::Rng;
12use tokio::sync::Notify;
13use tycho_util::futures::BoxFutureOrNoop;
14use tycho_util::{FastDashSet, FastHasherState};
15
16use crate::dht::{PeerResolver, PeerResolverHandle};
17use crate::network::Network;
18use crate::overlay::OverlayId;
19use crate::overlay::metrics::Metrics;
20use crate::proto::overlay::{PublicEntry, PublicEntryToSign, rpc};
21use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest};
22use crate::util::NetworkExt;
23
24pub struct PublicOverlayBuilder {
25 overlay_id: OverlayId,
26 min_capacity: usize,
27 entry_ttl: Duration,
28 banned_peer_ids: FastDashSet<PeerId>,
29 peer_resolver: Option<PeerResolver>,
30 name: Option<&'static str>,
31}
32
33impl PublicOverlayBuilder {
34 pub fn with_min_capacity(mut self, min_capacity: usize) -> Self {
40 self.min_capacity = min_capacity;
41 self
42 }
43
44 pub fn with_entry_ttl(mut self, entry_ttl: Duration) -> Self {
48 self.entry_ttl = entry_ttl;
49 self
50 }
51
52 pub fn with_banned_peers<I>(mut self, banned_peers: I) -> Self
54 where
55 I: IntoIterator,
56 I::Item: Borrow<PeerId>,
57 {
58 self.banned_peer_ids
59 .extend(banned_peers.into_iter().map(|id| *id.borrow()));
60 self
61 }
62
63 pub fn with_peer_resolver(mut self, peer_resolver: PeerResolver) -> Self {
67 self.peer_resolver = Some(peer_resolver);
68 self
69 }
70
71 pub fn named(mut self, name: &'static str) -> Self {
73 self.name = Some(name);
74 self
75 }
76
77 pub fn build<S>(self, service: S) -> PublicOverlay
78 where
79 S: Send + Sync + 'static,
80 S: Service<ServiceRequest, QueryResponse = Response>,
81 {
82 const UNRESOLVED_QUEUE_CAPACITY: usize = 5; let request_prefix = tl_proto::serialize(rpc::Prefix {
85 overlay_id: self.overlay_id.as_bytes(),
86 });
87
88 let entries = PublicOverlayEntries {
89 items: Default::default(),
90 };
91
92 let entry_ttl_sec = self.entry_ttl.as_secs().try_into().unwrap_or(u32::MAX);
93
94 PublicOverlay {
95 inner: Arc::new(Inner {
96 overlay_id: self.overlay_id,
97 min_capacity: self.min_capacity,
98 entry_ttl_sec,
99 peer_resolver: self.peer_resolver,
100 entries: RwLock::new(entries),
101 entries_added: Notify::new(),
102 entries_changed: Notify::new(),
103 entries_removed: Notify::new(),
104 entry_count: AtomicUsize::new(0),
105 own_signed_entry: Default::default(),
106 unknown_peers_queue: UnknownPeersQueue::with_capacity(UNRESOLVED_QUEUE_CAPACITY),
107 banned_peer_ids: self.banned_peer_ids,
108 service: service.boxed(),
109 request_prefix: request_prefix.into_boxed_slice(),
110 metrics: self
111 .name
112 .map(|label| Metrics::new("tycho_public_overlay", label))
113 .unwrap_or_default(),
114 }),
115 }
116 }
117}
118
119#[derive(Clone)]
120#[repr(transparent)]
121pub struct PublicOverlay {
122 inner: Arc<Inner>,
123}
124
125impl PublicOverlay {
126 pub fn builder(overlay_id: OverlayId) -> PublicOverlayBuilder {
127 PublicOverlayBuilder {
128 overlay_id,
129 min_capacity: 100,
130 entry_ttl: Duration::from_secs(3600),
131 banned_peer_ids: Default::default(),
132 peer_resolver: None,
133 name: None,
134 }
135 }
136
137 #[inline]
138 pub fn overlay_id(&self) -> &OverlayId {
139 &self.inner.overlay_id
140 }
141
142 pub fn entry_ttl_sec(&self) -> u32 {
143 self.inner.entry_ttl_sec
144 }
145
146 pub fn peer_resolver(&self) -> &Option<PeerResolver> {
147 &self.inner.peer_resolver
148 }
149
150 pub fn unknown_peers_queue(&self) -> &UnknownPeersQueue {
151 &self.inner.unknown_peers_queue
152 }
153
154 pub async fn query(
155 &self,
156 network: &Network,
157 peer_id: &PeerId,
158 mut request: Request,
159 ) -> Result<Response> {
160 self.inner.metrics.record_tx(request.body.len());
161 self.prepend_prefix_to_body(&mut request.body);
162 network.query(peer_id, request).await
163 }
164
165 pub async fn send(
166 &self,
167 network: &Network,
168 peer_id: &PeerId,
169 mut request: Request,
170 ) -> Result<()> {
171 self.inner.metrics.record_tx(request.body.len());
172 self.prepend_prefix_to_body(&mut request.body);
173 network.send(peer_id, request).await
174 }
175
176 pub fn ban_peer(&self, peer_id: PeerId) -> bool {
180 self.inner.banned_peer_ids.insert(peer_id)
181 }
182
183 pub fn unban_peer(&self, peer_id: &PeerId) -> bool {
187 self.inner.banned_peer_ids.remove(peer_id).is_some()
188 }
189
190 pub fn read_entries(&self) -> PublicOverlayEntriesReadGuard<'_> {
191 PublicOverlayEntriesReadGuard {
192 entries: self.inner.entries.read(),
193 }
194 }
195
196 pub fn entires_added(&self) -> &Notify {
198 &self.inner.entries_added
199 }
200
201 pub fn entries_changed(&self) -> &Notify {
203 &self.inner.entries_changed
204 }
205
206 pub fn entries_removed(&self) -> &Notify {
207 &self.inner.entries_removed
208 }
209
210 pub fn own_signed_entry(&self) -> Option<Arc<PublicEntry>> {
212 self.inner.own_signed_entry.load_full()
213 }
214
215 pub(crate) fn set_own_signed_entry(&self, entry: Arc<PublicEntry>) {
216 self.inner.own_signed_entry.store(Some(entry));
217 }
218
219 pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
220 self.inner.metrics.record_rx(req.body.len());
221 if self.check_peer_id(&req.metadata.peer_id) {
222 BoxFutureOrNoop::future(self.inner.service.on_query(req))
223 } else {
224 BoxFutureOrNoop::Noop
225 }
226 }
227
228 pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> {
229 self.inner.metrics.record_rx(req.body.len());
230 if self.check_peer_id(&req.metadata.peer_id) {
231 BoxFutureOrNoop::future(self.inner.service.on_message(req))
232 } else {
233 BoxFutureOrNoop::Noop
234 }
235 }
236
237 fn check_peer_id(&self, peer_id: &PeerId) -> bool {
238 if self.inner.banned_peer_ids.contains(peer_id) {
240 return false;
242 }
243
244 if !self.inner.unknown_peers_queue.is_full() && !self.inner.entries.read().contains(peer_id)
248 {
249 if self.inner.unknown_peers_queue.push(peer_id) {
251 tracing::debug!(%peer_id, "found new unknown peer to resolve");
252 }
253 }
254
255 true
256 }
257
258 pub(crate) fn add_untrusted_entries(
262 &self,
263 local_id: &PeerId,
264 entries: &[Arc<PublicEntry>],
265 now: u32,
266 ) -> bool {
267 if entries.is_empty() {
268 return false;
269 }
270
271 let this = self.inner.as_ref();
272
273 let to_add = entries.len();
276 let mut entry_count = this.entry_count.load(Ordering::Acquire);
277 let to_add = loop {
278 let to_add = match this.min_capacity.checked_sub(entry_count) {
279 Some(capacity) if capacity > 0 => std::cmp::min(to_add, capacity),
280 _ => return false,
281 };
282
283 let res = this.entry_count.compare_exchange_weak(
284 entry_count,
285 entry_count + to_add,
286 Ordering::Release,
287 Ordering::Acquire,
288 );
289 match res {
290 Ok(_) => break to_add,
291 Err(n) => entry_count = n,
292 }
293 };
294
295 let mut is_valid = vec![false; entries.len()];
297 let mut has_valid = false;
298
299 for (entry, is_valid) in std::iter::zip(entries, is_valid.iter_mut()) {
301 if entry.is_expired(now, this.entry_ttl_sec)
302 || self.inner.banned_peer_ids.contains(&entry.peer_id)
303 || entry.peer_id == local_id
304 {
305 continue;
307 }
308
309 let Some(pubkey) = entry.peer_id.as_public_key() else {
310 continue;
312 };
313
314 if !pubkey.verify_tl(
315 PublicEntryToSign {
316 overlay_id: this.overlay_id.as_bytes(),
317 peer_id: &entry.peer_id,
318 created_at: entry.created_at,
319 },
320 &entry.signature,
321 ) {
322 continue;
324 }
325
326 *is_valid = true;
329 has_valid = true;
330 }
331
332 let mut added = 0;
338 let mut changed = false;
339 if has_valid {
340 let mut stored = this.entries.write();
341 for (entry, is_valid) in std::iter::zip(entries, is_valid) {
342 if !is_valid {
343 continue;
344 }
345
346 let status = stored.insert(&this.peer_resolver, entry);
347 changed |= status.is_changed();
348 added += status.is_added() as usize;
349
350 if added >= to_add {
351 break;
352 }
353 }
354 }
355
356 if added < to_add {
358 this.entry_count
359 .fetch_sub(to_add - added, Ordering::Release);
360 }
361
362 if added > 0 {
363 this.entries_added.notify_waiters();
364 }
365 if changed {
366 this.entries_changed.notify_waiters();
367 }
368
369 changed || added > 0
370 }
371
372 pub(crate) fn remove_invalid_entries(&self, now: u32) {
374 let this = self.inner.as_ref();
375
376 let mut removed = 0;
377 let mut entries = this.entries.write();
378 entries.retain(|item| {
379 let retain = !item.entry.is_expired(now, this.entry_ttl_sec)
380 && !this.banned_peer_ids.contains(&item.entry.peer_id);
381
382 if !retain {
383 removed += 1;
384 }
385
386 retain
387 });
388
389 if removed > 0 {
390 this.entry_count.fetch_sub(removed, Ordering::Release);
391 self.inner.entries_removed.notify_waiters();
392 }
393 }
394
395 fn prepend_prefix_to_body(&self, body: &mut Bytes) {
396 let this = self.inner.as_ref();
397
398 let mut res = BytesMut::with_capacity(this.request_prefix.len() + body.len());
400 res.extend_from_slice(&this.request_prefix);
401 res.extend_from_slice(body);
402 *body = res.freeze();
403 }
404}
405
406impl std::fmt::Debug for PublicOverlay {
407 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408 f.debug_struct("PublicOverlay")
409 .field("overlay_id", &self.inner.overlay_id)
410 .finish()
411 }
412}
413
414struct Inner {
415 overlay_id: OverlayId,
416 min_capacity: usize,
417 entry_ttl_sec: u32,
418 peer_resolver: Option<PeerResolver>,
419 entries: RwLock<PublicOverlayEntries>,
420 entry_count: AtomicUsize,
421 entries_added: Notify,
422 entries_changed: Notify,
423 entries_removed: Notify,
424 own_signed_entry: ArcSwapOption<PublicEntry>,
425 unknown_peers_queue: UnknownPeersQueue,
426 banned_peer_ids: FastDashSet<PeerId>,
427 service: BoxService<ServiceRequest, Response>,
428 request_prefix: Box<[u8]>,
429 metrics: Metrics,
430}
431
432pub struct PublicOverlayEntries {
433 items: OverlayItems,
434}
435
436impl PublicOverlayEntries {
437 pub fn is_empty(&self) -> bool {
439 self.items.is_empty()
440 }
441
442 pub fn len(&self) -> usize {
444 self.items.len()
445 }
446
447 pub fn contains(&self, peer_id: &PeerId) -> bool {
449 self.items.contains_key(peer_id)
450 }
451
452 pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PublicOverlayEntryData> {
456 self.items.values()
457 }
458
459 pub fn choose<R>(&self, rng: &mut R) -> Option<&PublicOverlayEntryData>
462 where
463 R: Rng + ?Sized,
464 {
465 let index = rng.random_range(0..self.items.len());
466 let (_, value) = self.items.get_index(index)?;
467 Some(value)
468 }
469
470 pub fn choose_multiple<R>(
473 &self,
474 rng: &mut R,
475 n: usize,
476 ) -> ChooseMultiplePublicOverlayEntries<'_>
477 where
478 R: Rng + ?Sized,
479 {
480 let len = self.items.len();
481 ChooseMultiplePublicOverlayEntries {
482 items: &self.items,
483 indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(),
484 }
485 }
486
487 pub fn choose_all<R>(&self, rng: &mut R) -> ChooseMultiplePublicOverlayEntries<'_>
490 where
491 R: Rng + ?Sized,
492 {
493 self.choose_multiple(rng, self.items.len())
494 }
495
496 fn insert(&mut self, peer_resolver: &Option<PeerResolver>, item: &PublicEntry) -> UpdateStatus {
497 match self.items.entry(item.peer_id) {
498 indexmap::map::Entry::Vacant(entry) => {
500 let resolver_handle = peer_resolver.as_ref().map_or_else(
501 || PeerResolverHandle::new_noop(&item.peer_id),
502 |resolver| resolver.insert(&item.peer_id, false),
503 );
504
505 entry.insert(PublicOverlayEntryData {
506 entry: Arc::new(item.clone()),
507 resolver_handle,
508 });
509
510 UpdateStatus::Added
511 }
512 indexmap::map::Entry::Occupied(mut entry) => {
514 let existing = entry.get_mut();
515 if existing.entry.created_at >= item.created_at {
516 return UpdateStatus::Skipped;
517 }
518
519 match Arc::get_mut(&mut existing.entry) {
521 Some(existing) => existing.clone_from(item),
522 None => existing.entry = Arc::new(item.clone()),
523 }
524 UpdateStatus::Updated
525 }
526 }
527 }
528
529 fn retain<F>(&mut self, mut f: F)
530 where
531 F: FnMut(&PublicOverlayEntryData) -> bool,
532 {
533 self.items.retain(|_, item| f(item));
534 }
535}
536
537#[derive(Clone)]
538pub struct PublicOverlayEntryData {
539 pub entry: Arc<PublicEntry>,
540 pub resolver_handle: PeerResolverHandle,
541}
542
543impl PublicOverlayEntryData {
544 pub fn is_expired(&self, now: u32, ttl: u32) -> bool {
545 self.entry.is_expired(now, ttl)
546 }
547
548 pub fn expires_at(&self, ttl: u32) -> u32 {
549 self.entry.created_at.saturating_add(ttl)
550 }
551}
552
553pub struct PublicOverlayEntriesReadGuard<'a> {
554 entries: RwLockReadGuard<'a, PublicOverlayEntries>,
555}
556
557impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> {
558 type Target = PublicOverlayEntries;
559
560 #[inline]
561 fn deref(&self) -> &Self::Target {
562 &self.entries
563 }
564}
565
566pub struct UnknownPeersQueue {
567 peer_ids: Mutex<IndexSet<PeerId, FastHasherState>>,
568 peer_id_count: AtomicUsize,
569 capacity: usize,
570}
571
572impl UnknownPeersQueue {
573 pub fn with_capacity(capacity: usize) -> Self {
574 Self {
575 peer_ids: Mutex::new(IndexSet::with_capacity_and_hasher(
576 capacity,
577 Default::default(),
578 )),
579 peer_id_count: AtomicUsize::new(0),
580 capacity,
581 }
582 }
583
584 pub fn is_empty(&self) -> bool {
585 self.len() == 0
586 }
587
588 pub fn is_full(&self) -> bool {
589 self.len() >= self.capacity
590 }
591
592 pub fn len(&self) -> usize {
593 self.peer_id_count.load(Ordering::Acquire)
594 }
595
596 pub fn push(&self, peer_id: &PeerId) -> bool {
599 let mut peer_ids = self.peer_ids.lock();
603 if peer_ids.len() >= self.capacity {
604 return false;
605 }
606
607 let added = peer_ids.insert(*peer_id);
608 self.peer_id_count.fetch_add(added as _, Ordering::Release);
609 added
610 }
611
612 pub fn pop_multiple(&self) -> Option<IndexSet<PeerId, FastHasherState>> {
614 if self.is_empty() {
615 return None;
616 }
617
618 let mut peer_ids = self.peer_ids.lock();
619 self.peer_id_count.store(0, Ordering::Release);
620 let res = std::mem::take(&mut *peer_ids);
621 if res.is_empty() { None } else { Some(res) }
622 }
623}
624
/// Outcome of inserting an entry into the entries set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateStatus {
    /// The existing entry was kept as is.
    Skipped,
    /// An existing entry was replaced.
    Updated,
    /// A new entry was stored.
    Added,
}

impl UpdateStatus {
    /// Returns `true` when the set was modified (entry added or updated).
    fn is_changed(self) -> bool {
        match self {
            Self::Skipped => false,
            Self::Updated | Self::Added => true,
        }
    }

    /// Returns `true` only when a new entry was stored.
    fn is_added(self) -> bool {
        self == Self::Added
    }
}
641
642pub struct ChooseMultiplePublicOverlayEntries<'a> {
643 items: &'a OverlayItems,
644 indices: rand::seq::index::IndexVecIntoIter,
645}
646
647impl<'a> Iterator for ChooseMultiplePublicOverlayEntries<'a> {
648 type Item = &'a PublicOverlayEntryData;
649
650 fn next(&mut self) -> Option<Self::Item> {
651 self.indices.next().and_then(|i| {
652 let (_, value) = self.items.get_index(i)?;
653 Some(value)
654 })
655 }
656
657 fn size_hint(&self) -> (usize, Option<usize>) {
658 (self.indices.len(), Some(self.indices.len()))
659 }
660}
661
662impl ExactSizeIterator for ChooseMultiplePublicOverlayEntries<'_> {
663 fn len(&self) -> usize {
664 self.indices.len()
665 }
666}
667
668type OverlayItems = IndexMap<PeerId, PublicOverlayEntryData, FastHasherState>;
669
#[cfg(test)]
mod tests {
    use tycho_crypto::ed25519;
    use tycho_util::time::now_sec;

    use super::*;

    /// Creates an entry for a fresh random key, correctly signed for `overlay`.
    fn generate_public_entry(overlay: &PublicOverlay, now: u32) -> Arc<PublicEntry> {
        let keypair = rand::random::<ed25519::KeyPair>();
        let peer_id: PeerId = keypair.public_key.into();
        let to_sign = crate::proto::overlay::PublicEntryToSign {
            overlay_id: overlay.overlay_id().as_bytes(),
            peer_id: &peer_id,
            created_at: now,
        };
        let signature = keypair.sign_tl(to_sign);

        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature: Box::new(signature),
        })
    }

    /// Creates an entry for a fresh random key with an all-zero signature.
    fn generate_invalid_public_entry(now: u32) -> Arc<PublicEntry> {
        let keypair = rand::random::<ed25519::KeyPair>();
        let peer_id: PeerId = keypair.public_key.into();

        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature: Box::new([0; 64]),
        })
    }

    /// Creates `n` correctly signed entries for `overlay`.
    fn generate_public_entries(
        overlay: &PublicOverlay,
        now: u32,
        n: usize,
    ) -> Vec<Arc<PublicEntry>> {
        let mut entries = Vec::with_capacity(n);
        for _ in 0..n {
            entries.push(generate_public_entry(overlay, now));
        }
        entries
    }

    /// Returns the tracked entry count after checking that it matches the
    /// number of stored entries.
    fn count_entries(overlay: &PublicOverlay) -> usize {
        let tracked_count = overlay.inner.entry_count.load(Ordering::Acquire);
        let stored_count = overlay.read_entries().len();
        assert_eq!(stored_count, tracked_count);
        tracked_count
    }

    /// Builds an overlay with a random id, the given capacity and a service
    /// that answers every query with `None`.
    fn make_overlay_with_min_capacity(min_capacity: usize) -> PublicOverlay {
        let service = crate::service_query_fn(|_| futures_util::future::ready(None));
        PublicOverlay::builder(rand::random())
            .with_min_capacity(min_capacity)
            .build(service)
    }

    #[test]
    fn min_capacity_works_with_single_thread() {
        let now = now_sec();
        let local_id: PeerId = rand::random();

        // Two halves fill the overlay exactly.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            let (first_half, second_half) = entries.split_at(5);

            overlay.add_untrusted_entries(&local_id, first_half, now);
            assert_eq!(count_entries(&overlay), 5);

            overlay.add_untrusted_entries(&local_id, second_half, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // A single batch of exactly `min_capacity` entries.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // A batch larger than `min_capacity` is truncated.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 20);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Zero capacity accepts nothing.
        {
            let overlay = make_overlay_with_min_capacity(0);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Only invalid entries: nothing is stored.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let mut entries = Vec::with_capacity(10);
            for _ in 0..10 {
                entries.push(generate_invalid_public_entry(now));
            }
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Invalid and valid entries interleaved: only the valid ones are stored.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries: Vec<_> = (0..5)
                .flat_map(|_| {
                    [
                        generate_invalid_public_entry(now),
                        generate_public_entry(&overlay, now),
                    ]
                })
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 5);
        }

        // Invalid entries first, then more valid ones than the capacity allows.
        {
            let overlay = make_overlay_with_min_capacity(3);
            let invalid = (0..5).map(|_| generate_invalid_public_entry(now));
            let valid = (0..5).map(|_| generate_public_entry(&overlay, now));
            let entries: Vec<_> = invalid.chain(valid).collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 3);
        }
    }

    #[test]
    fn min_capacity_works_with_multi_thread() {
        const BATCH_LEN: usize = 7;
        const BATCHES_PER_THREAD: usize = 3;
        const THREADS: usize = 10;

        let now = now_sec();
        let local_id: PeerId = rand::random();

        // The capacity is deliberately smaller than the total number of entries.
        let overlay = make_overlay_with_min_capacity(201);
        let entries =
            generate_public_entries(&overlay, now, BATCH_LEN * BATCHES_PER_THREAD * THREADS);

        std::thread::scope(|s| {
            for per_thread in entries.chunks_exact(BATCH_LEN * BATCHES_PER_THREAD) {
                s.spawn(|| {
                    for batch in per_thread.chunks_exact(BATCH_LEN) {
                        overlay.add_untrusted_entries(&local_id, batch, now);
                    }
                });
            }
        });

        assert_eq!(count_entries(&overlay), 201);
    }

    #[test]
    fn unknown_peers_queue() {
        let peer = |byte: u8| PeerId([byte; 32]);

        let queue = UnknownPeersQueue::with_capacity(5);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        // The first push of a peer succeeds.
        assert!(queue.push(&peer(0)));
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
        assert!(!queue.is_full());

        // Pushing the same peer again is rejected.
        assert!(!queue.push(&peer(0)));
        assert_eq!(queue.len(), 1);

        for i in 1..=3u8 {
            assert!(queue.push(&peer(i)));
            assert_eq!(queue.len(), usize::from(i) + 1);
            assert!(!queue.is_empty());
            assert!(!queue.is_full());
        }

        // The fifth distinct peer fills the queue.
        assert!(queue.push(&peer(4)));
        assert_eq!(queue.len(), 5);
        assert!(queue.is_full());

        // A full queue rejects new peers.
        assert!(!queue.push(&peer(5)));
        assert_eq!(queue.len(), 5);
        assert!(queue.is_full());

        // Popping drains everything.
        let items = queue.pop_multiple().unwrap();
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(items.len(), 5);
        assert!((0..5u8).all(|i| items.contains(&peer(i))));

        assert!(queue.pop_multiple().is_none());

        // The queue is usable again after being drained.
        assert!(queue.push(&peer(0)));
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
        assert!(!queue.is_full());

        let items = queue.pop_multiple().unwrap();
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(items.len(), 1);
        assert!(items.contains(&peer(0)));
    }
}