1use std::borrow::Borrow;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::Duration;
5
6use anyhow::Result;
7use arc_swap::ArcSwapOption;
8use bytes::{Bytes, BytesMut};
9use indexmap::{IndexMap, IndexSet};
10use parking_lot::{Mutex, RwLock, RwLockReadGuard};
11use rand::Rng;
12use tokio::sync::Notify;
13use tycho_util::futures::BoxFutureOrNoop;
14use tycho_util::{FastDashSet, FastHasherState};
15
16use crate::dht::{PeerResolver, PeerResolverHandle};
17use crate::network::Network;
18use crate::overlay::OverlayId;
19use crate::overlay::metrics::Metrics;
20use crate::proto::overlay::{PublicEntry, PublicEntryToSign, rpc};
21use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest};
22use crate::util::NetworkExt;
23
24pub struct PublicOverlayBuilder {
25 overlay_id: OverlayId,
26 min_capacity: usize,
27 entry_ttl: Duration,
28 banned_peer_ids: FastDashSet<PeerId>,
29 peer_resolver: Option<PeerResolver>,
30 name: Option<&'static str>,
31}
32
33impl PublicOverlayBuilder {
34 pub fn with_min_capacity(mut self, min_capacity: usize) -> Self {
40 self.min_capacity = min_capacity;
41 self
42 }
43
44 pub fn with_entry_ttl(mut self, entry_ttl: Duration) -> Self {
48 self.entry_ttl = entry_ttl;
49 self
50 }
51
52 pub fn with_banned_peers<I>(mut self, banned_peers: I) -> Self
54 where
55 I: IntoIterator,
56 I::Item: Borrow<PeerId>,
57 {
58 self.banned_peer_ids
59 .extend(banned_peers.into_iter().map(|id| *id.borrow()));
60 self
61 }
62
63 pub fn with_peer_resolver(mut self, peer_resolver: PeerResolver) -> Self {
67 self.peer_resolver = Some(peer_resolver);
68 self
69 }
70
71 pub fn named(mut self, name: &'static str) -> Self {
73 self.name = Some(name);
74 self
75 }
76
77 pub fn build<S>(self, service: S) -> PublicOverlay
78 where
79 S: Send + Sync + 'static,
80 S: Service<ServiceRequest, QueryResponse = Response>,
81 {
82 const UNRESOLVED_QUEUE_CAPACITY: usize = 5; let request_prefix = tl_proto::serialize(rpc::Prefix {
85 overlay_id: self.overlay_id.as_bytes(),
86 });
87
88 let entries = PublicOverlayEntries {
89 items: Default::default(),
90 };
91
92 let entry_ttl_sec = self.entry_ttl.as_secs().try_into().unwrap_or(u32::MAX);
93
94 PublicOverlay {
95 inner: Arc::new(Inner {
96 overlay_id: self.overlay_id,
97 min_capacity: self.min_capacity,
98 entry_ttl_sec,
99 peer_resolver: self.peer_resolver,
100 entries: RwLock::new(entries),
101 entries_added: Notify::new(),
102 entries_changed: Notify::new(),
103 entries_removed: Notify::new(),
104 entry_count: AtomicUsize::new(0),
105 own_signed_entry: Default::default(),
106 unknown_peers_queue: UnknownPeersQueue::with_capacity(UNRESOLVED_QUEUE_CAPACITY),
107 banned_peer_ids: self.banned_peer_ids,
108 service: service.boxed(),
109 request_prefix: request_prefix.into_boxed_slice(),
110 metrics: self
111 .name
112 .map(|label| Metrics::new("tycho_public_overlay", label))
113 .unwrap_or_default(),
114 }),
115 }
116 }
117}
118
119#[derive(Clone)]
120#[repr(transparent)]
121pub struct PublicOverlay {
122 inner: Arc<Inner>,
123}
124
125impl PublicOverlay {
126 pub fn builder(overlay_id: OverlayId) -> PublicOverlayBuilder {
127 PublicOverlayBuilder {
128 overlay_id,
129 min_capacity: 100,
130 entry_ttl: Duration::from_secs(3600),
131 banned_peer_ids: Default::default(),
132 peer_resolver: None,
133 name: None,
134 }
135 }
136
137 #[inline]
138 pub fn overlay_id(&self) -> &OverlayId {
139 &self.inner.overlay_id
140 }
141
142 pub fn entry_ttl_sec(&self) -> u32 {
143 self.inner.entry_ttl_sec
144 }
145
146 pub fn peer_resolver(&self) -> &Option<PeerResolver> {
147 &self.inner.peer_resolver
148 }
149
150 pub fn unknown_peers_queue(&self) -> &UnknownPeersQueue {
151 &self.inner.unknown_peers_queue
152 }
153
154 pub async fn query(
155 &self,
156 network: &Network,
157 peer_id: &PeerId,
158 mut request: Request,
159 ) -> Result<Response> {
160 self.inner.metrics.record_tx(request.body.len());
161 self.prepend_prefix_to_body(&mut request.body);
162 network.query(peer_id, request).await
163 }
164
165 pub async fn send(
166 &self,
167 network: &Network,
168 peer_id: &PeerId,
169 mut request: Request,
170 ) -> Result<()> {
171 self.inner.metrics.record_tx(request.body.len());
172 self.prepend_prefix_to_body(&mut request.body);
173 network.send(peer_id, request).await
174 }
175
176 pub fn ban_peer(&self, peer_id: PeerId) -> bool {
180 self.inner.banned_peer_ids.insert(peer_id)
181 }
182
183 pub fn unban_peer(&self, peer_id: &PeerId) -> bool {
187 self.inner.banned_peer_ids.remove(peer_id).is_some()
188 }
189
190 pub fn read_entries(&self) -> PublicOverlayEntriesReadGuard<'_> {
191 PublicOverlayEntriesReadGuard {
192 entries: self.inner.entries.read(),
193 }
194 }
195
196 pub fn entires_added(&self) -> &Notify {
198 &self.inner.entries_added
199 }
200
201 pub fn entries_changed(&self) -> &Notify {
203 &self.inner.entries_changed
204 }
205
206 pub fn entries_removed(&self) -> &Notify {
207 &self.inner.entries_removed
208 }
209
210 pub fn own_signed_entry(&self) -> Option<Arc<PublicEntry>> {
212 self.inner.own_signed_entry.load_full()
213 }
214
215 pub(crate) fn set_own_signed_entry(&self, entry: Arc<PublicEntry>) {
216 self.inner.own_signed_entry.store(Some(entry));
217 }
218
219 pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
220 self.inner.metrics.record_rx(req.body.len());
221 if self.check_peer_id(&req.metadata.peer_id) {
222 BoxFutureOrNoop::future(self.inner.service.on_query(req))
223 } else {
224 BoxFutureOrNoop::Noop
225 }
226 }
227
228 pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> {
229 self.inner.metrics.record_rx(req.body.len());
230 if self.check_peer_id(&req.metadata.peer_id) {
231 BoxFutureOrNoop::future(self.inner.service.on_message(req))
232 } else {
233 BoxFutureOrNoop::Noop
234 }
235 }
236
237 fn check_peer_id(&self, peer_id: &PeerId) -> bool {
238 if self.inner.banned_peer_ids.contains(peer_id) {
240 return false;
242 }
243
244 if !self.inner.unknown_peers_queue.is_full() && !self.inner.entries.read().contains(peer_id)
248 {
249 if self.inner.unknown_peers_queue.push(peer_id) {
251 tracing::debug!(%peer_id, "found new unknown peer to resolve");
252 }
253 }
254
255 true
256 }
257
258 pub(crate) fn add_untrusted_entries(
262 &self,
263 local_id: &PeerId,
264 entries: &[Arc<PublicEntry>],
265 now: u32,
266 ) -> bool {
267 if entries.is_empty() {
268 return false;
269 }
270
271 let this = self.inner.as_ref();
272
273 let to_add = entries.len();
276 let mut entry_count = this.entry_count.load(Ordering::Acquire);
277 let to_add = loop {
278 let to_add = match this.min_capacity.checked_sub(entry_count) {
279 Some(capacity) if capacity > 0 => std::cmp::min(to_add, capacity),
280 _ => return false,
281 };
282
283 let res = this.entry_count.compare_exchange_weak(
284 entry_count,
285 entry_count + to_add,
286 Ordering::Release,
287 Ordering::Acquire,
288 );
289 match res {
290 Ok(_) => break to_add,
291 Err(n) => entry_count = n,
292 }
293 };
294
295 let mut is_valid = vec![false; entries.len()];
297 let mut has_valid = false;
298
299 for (entry, is_valid) in std::iter::zip(entries, is_valid.iter_mut()) {
301 if entry.is_expired(now, this.entry_ttl_sec)
302 || self.inner.banned_peer_ids.contains(&entry.peer_id)
303 || entry.peer_id == local_id
304 {
305 continue;
307 }
308
309 let Some(pubkey) = entry.peer_id.as_public_key() else {
310 continue;
312 };
313
314 if !pubkey.verify_tl(
315 PublicEntryToSign {
316 overlay_id: this.overlay_id.as_bytes(),
317 peer_id: &entry.peer_id,
318 created_at: entry.created_at,
319 },
320 &entry.signature,
321 ) {
322 continue;
324 }
325
326 *is_valid = true;
329 has_valid = true;
330 }
331
332 let mut added = 0;
338 let mut changed = false;
339 if has_valid {
340 let mut stored = this.entries.write();
341 for (entry, is_valid) in std::iter::zip(entries, is_valid) {
342 if !is_valid {
343 continue;
344 }
345
346 let status = stored.insert(&this.peer_resolver, entry);
347 changed |= status.is_changed();
348 added += status.is_added() as usize;
349
350 if added >= to_add {
351 break;
352 }
353 }
354 }
355
356 if added < to_add {
358 this.entry_count
359 .fetch_sub(to_add - added, Ordering::Release);
360 }
361
362 if added > 0 {
363 this.entries_added.notify_waiters();
364 }
365 if changed {
366 this.entries_changed.notify_waiters();
367 }
368
369 changed || added > 0
370 }
371
372 pub(crate) fn remove_invalid_entries(&self, now: u32) {
374 let this = self.inner.as_ref();
375
376 let mut should_notify = false;
377 let mut entries = this.entries.write();
378 entries.retain(|item| {
379 let retain = !item.entry.is_expired(now, this.entry_ttl_sec)
380 && !this.banned_peer_ids.contains(&item.entry.peer_id);
381 should_notify |= !retain;
382 retain
383 });
384
385 if should_notify {
386 self.inner.entries_removed.notify_waiters();
387 }
388 }
389
390 fn prepend_prefix_to_body(&self, body: &mut Bytes) {
391 let this = self.inner.as_ref();
392
393 let mut res = BytesMut::with_capacity(this.request_prefix.len() + body.len());
395 res.extend_from_slice(&this.request_prefix);
396 res.extend_from_slice(body);
397 *body = res.freeze();
398 }
399}
400
401impl std::fmt::Debug for PublicOverlay {
402 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
403 f.debug_struct("PublicOverlay")
404 .field("overlay_id", &self.inner.overlay_id)
405 .finish()
406 }
407}
408
/// Shared state behind every [`PublicOverlay`] handle.
struct Inner {
    /// Identifier of the overlay.
    overlay_id: OverlayId,
    /// Entry count up to which untrusted entries are accepted.
    min_capacity: usize,
    /// Entry time-to-live in seconds, used for expiration checks.
    entry_ttl_sec: u32,
    /// Optional resolver used to create handles for newly stored entries.
    peer_resolver: Option<PeerResolver>,
    /// Stored entries of other peers.
    entries: RwLock<PublicOverlayEntries>,
    /// Number of entries counted against `min_capacity`; slots are reserved
    /// here before insertion and given back for entries that were not added.
    entry_count: AtomicUsize,
    /// Notified when at least one new entry was added.
    entries_added: Notify,
    /// Notified when at least one entry was added or updated.
    entries_changed: Notify,
    /// Notified when at least one entry was removed.
    entries_removed: Notify,
    /// Signed entry of the local peer, if one was stored.
    own_signed_entry: ArcSwapOption<PublicEntry>,
    /// Peers that sent requests but have no stored entry.
    unknown_peers_queue: UnknownPeersQueue,
    /// Peers whose requests and entries are rejected.
    banned_peer_ids: FastDashSet<PeerId>,
    /// Service that handles incoming queries and messages.
    service: BoxService<ServiceRequest, Response>,
    /// Serialized overlay prefix prepended to outgoing request bodies.
    request_prefix: Box<[u8]>,
    /// Counters for transmitted and received body sizes.
    metrics: Metrics,
}
426
427pub struct PublicOverlayEntries {
428 items: OverlayItems,
429}
430
431impl PublicOverlayEntries {
432 pub fn is_empty(&self) -> bool {
434 self.items.is_empty()
435 }
436
437 pub fn len(&self) -> usize {
439 self.items.len()
440 }
441
442 pub fn contains(&self, peer_id: &PeerId) -> bool {
444 self.items.contains_key(peer_id)
445 }
446
447 pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PublicOverlayEntryData> {
451 self.items.values()
452 }
453
454 pub fn choose<R>(&self, rng: &mut R) -> Option<&PublicOverlayEntryData>
457 where
458 R: Rng + ?Sized,
459 {
460 let index = rng.random_range(0..self.items.len());
461 let (_, value) = self.items.get_index(index)?;
462 Some(value)
463 }
464
465 pub fn choose_multiple<R>(
468 &self,
469 rng: &mut R,
470 n: usize,
471 ) -> ChooseMultiplePublicOverlayEntries<'_>
472 where
473 R: Rng + ?Sized,
474 {
475 let len = self.items.len();
476 ChooseMultiplePublicOverlayEntries {
477 items: &self.items,
478 indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(),
479 }
480 }
481
482 pub fn choose_all<R>(&self, rng: &mut R) -> ChooseMultiplePublicOverlayEntries<'_>
485 where
486 R: Rng + ?Sized,
487 {
488 self.choose_multiple(rng, self.items.len())
489 }
490
491 fn insert(&mut self, peer_resolver: &Option<PeerResolver>, item: &PublicEntry) -> UpdateStatus {
492 match self.items.entry(item.peer_id) {
493 indexmap::map::Entry::Vacant(entry) => {
495 let resolver_handle = peer_resolver.as_ref().map_or_else(
496 || PeerResolverHandle::new_noop(&item.peer_id),
497 |resolver| resolver.insert(&item.peer_id, false),
498 );
499
500 entry.insert(PublicOverlayEntryData {
501 entry: Arc::new(item.clone()),
502 resolver_handle,
503 });
504
505 UpdateStatus::Added
506 }
507 indexmap::map::Entry::Occupied(mut entry) => {
509 let existing = entry.get_mut();
510 if existing.entry.created_at >= item.created_at {
511 return UpdateStatus::Skipped;
512 }
513
514 match Arc::get_mut(&mut existing.entry) {
516 Some(existing) => existing.clone_from(item),
517 None => existing.entry = Arc::new(item.clone()),
518 }
519 UpdateStatus::Updated
520 }
521 }
522 }
523
524 fn retain<F>(&mut self, mut f: F)
525 where
526 F: FnMut(&PublicOverlayEntryData) -> bool,
527 {
528 self.items.retain(|_, item| f(item));
529 }
530}
531
532#[derive(Clone)]
533pub struct PublicOverlayEntryData {
534 pub entry: Arc<PublicEntry>,
535 pub resolver_handle: PeerResolverHandle,
536}
537
538impl PublicOverlayEntryData {
539 pub fn is_expired(&self, now: u32, ttl: u32) -> bool {
540 self.entry.is_expired(now, ttl)
541 }
542
543 pub fn expires_at(&self, ttl: u32) -> u32 {
544 self.entry.created_at.saturating_add(ttl)
545 }
546}
547
548pub struct PublicOverlayEntriesReadGuard<'a> {
549 entries: RwLockReadGuard<'a, PublicOverlayEntries>,
550}
551
552impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> {
553 type Target = PublicOverlayEntries;
554
555 #[inline]
556 fn deref(&self) -> &Self::Target {
557 &self.entries
558 }
559}
560
561pub struct UnknownPeersQueue {
562 peer_ids: Mutex<IndexSet<PeerId, FastHasherState>>,
563 peer_id_count: AtomicUsize,
564 capacity: usize,
565}
566
567impl UnknownPeersQueue {
568 pub fn with_capacity(capacity: usize) -> Self {
569 Self {
570 peer_ids: Mutex::new(IndexSet::with_capacity_and_hasher(
571 capacity,
572 Default::default(),
573 )),
574 peer_id_count: AtomicUsize::new(0),
575 capacity,
576 }
577 }
578
579 pub fn is_empty(&self) -> bool {
580 self.len() == 0
581 }
582
583 pub fn is_full(&self) -> bool {
584 self.len() >= self.capacity
585 }
586
587 pub fn len(&self) -> usize {
588 self.peer_id_count.load(Ordering::Acquire)
589 }
590
591 pub fn push(&self, peer_id: &PeerId) -> bool {
594 let mut peer_ids = self.peer_ids.lock();
598 if peer_ids.len() >= self.capacity {
599 return false;
600 }
601
602 let added = peer_ids.insert(*peer_id);
603 self.peer_id_count.fetch_add(added as _, Ordering::Release);
604 added
605 }
606
607 pub fn pop_multiple(&self) -> Option<IndexSet<PeerId, FastHasherState>> {
609 if self.is_empty() {
610 return None;
611 }
612
613 let mut peer_ids = self.peer_ids.lock();
614 self.peer_id_count.store(0, Ordering::Release);
615 let res = std::mem::take(&mut *peer_ids);
616 if res.is_empty() { None } else { Some(res) }
617 }
618}
619
/// Outcome of inserting an entry into the overlay entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateStatus {
    /// Nothing was stored.
    Skipped,
    /// An existing entry was replaced.
    Updated,
    /// A new entry was stored.
    Added,
}

impl UpdateStatus {
    /// Returns `true` if the stored entries were modified in any way.
    fn is_changed(self) -> bool {
        match self {
            Self::Skipped => false,
            Self::Updated | Self::Added => true,
        }
    }

    /// Returns `true` only if a new entry was stored.
    fn is_added(self) -> bool {
        self == Self::Added
    }
}
636
637pub struct ChooseMultiplePublicOverlayEntries<'a> {
638 items: &'a OverlayItems,
639 indices: rand::seq::index::IndexVecIntoIter,
640}
641
642impl<'a> Iterator for ChooseMultiplePublicOverlayEntries<'a> {
643 type Item = &'a PublicOverlayEntryData;
644
645 fn next(&mut self) -> Option<Self::Item> {
646 self.indices.next().and_then(|i| {
647 let (_, value) = self.items.get_index(i)?;
648 Some(value)
649 })
650 }
651
652 fn size_hint(&self) -> (usize, Option<usize>) {
653 (self.indices.len(), Some(self.indices.len()))
654 }
655}
656
657impl ExactSizeIterator for ChooseMultiplePublicOverlayEntries<'_> {
658 fn len(&self) -> usize {
659 self.indices.len()
660 }
661}
662
/// Insertion-ordered map from peer id to its stored entry data.
type OverlayItems = IndexMap<PeerId, PublicOverlayEntryData, FastHasherState>;
664
#[cfg(test)]
mod tests {
    use tycho_crypto::ed25519;
    use tycho_util::time::now_sec;

    use super::*;

    /// Builds an entry for `overlay` signed by a freshly generated key.
    fn generate_public_entry(overlay: &PublicOverlay, now: u32) -> Arc<PublicEntry> {
        let keypair: ed25519::KeyPair = rand::random();
        let peer_id: PeerId = keypair.public_key.into();
        let to_sign = crate::proto::overlay::PublicEntryToSign {
            overlay_id: overlay.overlay_id().as_bytes(),
            peer_id: &peer_id,
            created_at: now,
        };
        let signature = Box::new(keypair.sign_tl(to_sign));
        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature,
        })
    }

    /// Builds an entry with a valid peer id but an all-zero signature.
    fn generate_invalid_public_entry(now: u32) -> Arc<PublicEntry> {
        let keypair: ed25519::KeyPair = rand::random();
        let peer_id: PeerId = keypair.public_key.into();
        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature: Box::new([0; 64]),
        })
    }

    /// Builds `n` validly signed entries for `overlay`.
    fn generate_public_entries(
        overlay: &PublicOverlay,
        now: u32,
        n: usize,
    ) -> Vec<Arc<PublicEntry>> {
        std::iter::repeat_with(|| generate_public_entry(overlay, now))
            .take(n)
            .collect()
    }

    /// Returns the tracked entry count after checking that it matches the
    /// number of stored entries.
    fn count_entries(overlay: &PublicOverlay) -> usize {
        let tracked = overlay.inner.entry_count.load(Ordering::Acquire);
        let stored = overlay.read_entries().entries.items.len();
        assert_eq!(stored, tracked);
        tracked
    }

    /// Creates an overlay with the given capacity and a service that never
    /// answers.
    fn make_overlay_with_min_capacity(min_capacity: usize) -> PublicOverlay {
        let service = crate::service_query_fn(|_| futures_util::future::ready(None));
        PublicOverlay::builder(rand::random())
            .with_min_capacity(min_capacity)
            .build(service)
    }

    #[test]
    fn min_capacity_works_with_single_thread() {
        let now = now_sec();
        let local_id: PeerId = rand::random();

        // Adding in two halves fills the overlay exactly.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            let (first_half, second_half) = entries.split_at(5);

            overlay.add_untrusted_entries(&local_id, first_half, now);
            assert_eq!(count_entries(&overlay), 5);

            overlay.add_untrusted_entries(&local_id, second_half, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Adding exactly as many entries as the capacity.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Adding more entries than the capacity.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 20);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Zero capacity accepts nothing.
        {
            let overlay = make_overlay_with_min_capacity(0);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Only invalid entries: nothing is stored and the counter is rolled back.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries: Vec<_> = std::iter::repeat_with(|| generate_invalid_public_entry(now))
                .take(10)
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Alternating invalid and valid entries: only the valid half is stored.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries: Vec<_> = (0..10)
                .map(|i| {
                    if i % 2 == 0 {
                        generate_invalid_public_entry(now)
                    } else {
                        generate_public_entry(&overlay, now)
                    }
                })
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 5);
        }

        // Invalid entries first: the valid tail still fills the capacity.
        {
            let overlay = make_overlay_with_min_capacity(3);
            let invalid = std::iter::repeat_with(|| generate_invalid_public_entry(now)).take(5);
            let valid = std::iter::repeat_with(|| generate_public_entry(&overlay, now)).take(5);
            let entries: Vec<_> = invalid.chain(valid).collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 3);
        }
    }

    #[test]
    fn min_capacity_works_with_multi_thread() {
        const CHUNK: usize = 7;
        const CHUNKS_PER_THREAD: usize = 3;
        const THREADS: usize = 10;

        let now = now_sec();
        let local_id: PeerId = rand::random();

        let overlay = make_overlay_with_min_capacity(201);
        let entries = generate_public_entries(&overlay, now, CHUNK * CHUNKS_PER_THREAD * THREADS);

        let overlay_ref = &overlay;
        let local_id_ref = &local_id;
        std::thread::scope(|scope| {
            for batch in entries.chunks_exact(CHUNK * CHUNKS_PER_THREAD) {
                scope.spawn(move || {
                    for chunk in batch.chunks_exact(CHUNK) {
                        overlay_ref.add_untrusted_entries(local_id_ref, chunk, now);
                    }
                });
            }
        });

        assert_eq!(count_entries(&overlay), 201);
    }

    #[test]
    fn unknown_peers_queue() {
        let peer = |byte: u8| PeerId([byte; 32]);

        let queue = UnknownPeersQueue::with_capacity(5);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        // The first push is accepted.
        assert!(queue.push(&peer(0)));
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
        assert!(!queue.is_full());

        // A duplicate is rejected and does not change the length.
        assert!(!queue.push(&peer(0)));
        assert_eq!(queue.len(), 1);

        for i in 1..=3u8 {
            assert!(queue.push(&peer(i)));
            assert_eq!(queue.len(), usize::from(i) + 1);
            assert!(!queue.is_empty());
            assert!(!queue.is_full());
        }

        // The fifth distinct peer fills the queue.
        assert!(queue.push(&peer(4)));
        assert_eq!(queue.len(), 5);
        assert!(queue.is_full());

        // A full queue rejects new peers.
        assert!(!queue.push(&peer(5)));
        assert_eq!(queue.len(), 5);
        assert!(queue.is_full());

        // Popping drains everything at once.
        let drained = queue.pop_multiple().unwrap();
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(drained.len(), 5);
        for i in 0..5u8 {
            assert!(drained.contains(&peer(i)));
        }

        assert!(queue.pop_multiple().is_none());

        // The queue is usable again after being drained.
        assert!(queue.push(&peer(0)));
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
        assert!(!queue.is_full());

        let drained = queue.pop_multiple().unwrap();
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(drained.len(), 1);
        assert!(drained.contains(&peer(0)));
    }
}