1use std::borrow::Borrow;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Result;
7use bytes::{Bytes, BytesMut};
8use indexmap::IndexMap;
9use parking_lot::{RwLock, RwLockReadGuard};
10use rand::Rng;
11use tokio::sync::Notify;
12use tycho_util::futures::BoxFutureOrNoop;
13use tycho_util::{FastDashSet, FastHasherState};
14
15use crate::dht::{PeerResolver, PeerResolverHandle};
16use crate::network::Network;
17use crate::overlay::metrics::Metrics;
18use crate::overlay::OverlayId;
19use crate::proto::overlay::{rpc, PublicEntry, PublicEntryToSign};
20use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest};
21use crate::util::NetworkExt;
22
23pub struct PublicOverlayBuilder {
24 overlay_id: OverlayId,
25 min_capacity: usize,
26 entry_ttl: Duration,
27 banned_peer_ids: FastDashSet<PeerId>,
28 peer_resolver: Option<PeerResolver>,
29 name: Option<&'static str>,
30}
31
32impl PublicOverlayBuilder {
33 pub fn with_min_capacity(mut self, min_capacity: usize) -> Self {
39 self.min_capacity = min_capacity;
40 self
41 }
42
43 pub fn with_entry_ttl(mut self, entry_ttl: Duration) -> Self {
47 self.entry_ttl = entry_ttl;
48 self
49 }
50
51 pub fn with_banned_peers<I>(mut self, banned_peers: I) -> Self
53 where
54 I: IntoIterator,
55 I::Item: Borrow<PeerId>,
56 {
57 self.banned_peer_ids
58 .extend(banned_peers.into_iter().map(|id| *id.borrow()));
59 self
60 }
61
62 pub fn with_peer_resolver(mut self, peer_resolver: PeerResolver) -> Self {
66 self.peer_resolver = Some(peer_resolver);
67 self
68 }
69
70 pub fn named(mut self, name: &'static str) -> Self {
72 self.name = Some(name);
73 self
74 }
75
76 pub fn build<S>(self, service: S) -> PublicOverlay
77 where
78 S: Send + Sync + 'static,
79 S: Service<ServiceRequest, QueryResponse = Response>,
80 {
81 let request_prefix = tl_proto::serialize(rpc::Prefix {
82 overlay_id: self.overlay_id.as_bytes(),
83 });
84
85 let entries = PublicOverlayEntries {
86 items: Default::default(),
87 };
88
89 let entry_ttl_sec = self.entry_ttl.as_secs().try_into().unwrap_or(u32::MAX);
90
91 PublicOverlay {
92 inner: Arc::new(Inner {
93 overlay_id: self.overlay_id,
94 min_capacity: self.min_capacity,
95 entry_ttl_sec,
96 peer_resolver: self.peer_resolver,
97 entries: RwLock::new(entries),
98 entries_added: Notify::new(),
99 entries_changed: Notify::new(),
100 entries_removed: Notify::new(),
101 entry_count: AtomicUsize::new(0),
102 banned_peer_ids: self.banned_peer_ids,
103 service: service.boxed(),
104 request_prefix: request_prefix.into_boxed_slice(),
105 metrics: self
106 .name
107 .map(|label| Metrics::new("tycho_public_overlay", label))
108 .unwrap_or_default(),
109 }),
110 }
111 }
112}
113
114#[derive(Clone)]
115#[repr(transparent)]
116pub struct PublicOverlay {
117 inner: Arc<Inner>,
118}
119
120impl PublicOverlay {
121 pub fn builder(overlay_id: OverlayId) -> PublicOverlayBuilder {
122 PublicOverlayBuilder {
123 overlay_id,
124 min_capacity: 100,
125 entry_ttl: Duration::from_secs(3600),
126 banned_peer_ids: Default::default(),
127 peer_resolver: None,
128 name: None,
129 }
130 }
131
132 #[inline]
133 pub fn overlay_id(&self) -> &OverlayId {
134 &self.inner.overlay_id
135 }
136
137 pub fn entry_ttl_sec(&self) -> u32 {
138 self.inner.entry_ttl_sec
139 }
140
141 pub fn peer_resolver(&self) -> &Option<PeerResolver> {
142 &self.inner.peer_resolver
143 }
144
145 pub async fn query(
146 &self,
147 network: &Network,
148 peer_id: &PeerId,
149 mut request: Request,
150 ) -> Result<Response> {
151 self.inner.metrics.record_tx(request.body.len());
152 self.prepend_prefix_to_body(&mut request.body);
153 network.query(peer_id, request).await
154 }
155
156 pub async fn send(
157 &self,
158 network: &Network,
159 peer_id: &PeerId,
160 mut request: Request,
161 ) -> Result<()> {
162 self.inner.metrics.record_tx(request.body.len());
163 self.prepend_prefix_to_body(&mut request.body);
164 network.send(peer_id, request).await
165 }
166
167 pub fn ban_peer(&self, peer_id: PeerId) -> bool {
171 self.inner.banned_peer_ids.insert(peer_id)
172 }
173
174 pub fn unban_peer(&self, peer_id: &PeerId) -> bool {
178 self.inner.banned_peer_ids.remove(peer_id).is_some()
179 }
180
181 pub fn read_entries(&self) -> PublicOverlayEntriesReadGuard<'_> {
182 PublicOverlayEntriesReadGuard {
183 entries: self.inner.entries.read(),
184 }
185 }
186
187 pub fn entires_added(&self) -> &Notify {
189 &self.inner.entries_added
190 }
191
192 pub fn entries_changed(&self) -> &Notify {
194 &self.inner.entries_changed
195 }
196
197 pub fn entries_removed(&self) -> &Notify {
198 &self.inner.entries_removed
199 }
200
201 pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
202 self.inner.metrics.record_rx(req.body.len());
203 if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) {
204 BoxFutureOrNoop::future(self.inner.service.on_query(req))
206 } else {
207 BoxFutureOrNoop::Noop
208 }
209 }
210
211 pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> {
212 self.inner.metrics.record_rx(req.body.len());
213 if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) {
214 BoxFutureOrNoop::future(self.inner.service.on_message(req))
216 } else {
217 BoxFutureOrNoop::Noop
218 }
219 }
220
221 pub(crate) fn add_untrusted_entries(
225 &self,
226 local_id: &PeerId,
227 entries: &[Arc<PublicEntry>],
228 now: u32,
229 ) -> bool {
230 if entries.is_empty() {
231 return false;
232 }
233
234 let this = self.inner.as_ref();
235
236 let to_add = entries.len();
239 let mut entry_count = this.entry_count.load(Ordering::Acquire);
240 let to_add = loop {
241 let to_add = match this.min_capacity.checked_sub(entry_count) {
242 Some(capacity) if capacity > 0 => std::cmp::min(to_add, capacity),
243 _ => return false,
244 };
245
246 let res = this.entry_count.compare_exchange_weak(
247 entry_count,
248 entry_count + to_add,
249 Ordering::Release,
250 Ordering::Acquire,
251 );
252 match res {
253 Ok(_) => break to_add,
254 Err(n) => entry_count = n,
255 }
256 };
257
258 let mut is_valid = vec![false; entries.len()];
260 let mut has_valid = false;
261
262 for (entry, is_valid) in std::iter::zip(entries, is_valid.iter_mut()) {
264 if entry.is_expired(now, this.entry_ttl_sec)
265 || self.inner.banned_peer_ids.contains(&entry.peer_id)
266 || entry.peer_id == local_id
267 {
268 continue;
270 }
271
272 let Some(pubkey) = entry.peer_id.as_public_key() else {
273 continue;
275 };
276
277 if !pubkey.verify(
278 PublicEntryToSign {
279 overlay_id: this.overlay_id.as_bytes(),
280 peer_id: &entry.peer_id,
281 created_at: entry.created_at,
282 },
283 &entry.signature,
284 ) {
285 continue;
287 }
288
289 *is_valid = true;
292 has_valid = true;
293 }
294
295 let mut added = 0;
301 let mut changed = false;
302 if has_valid {
303 let mut stored = this.entries.write();
304 for (entry, is_valid) in std::iter::zip(entries, is_valid) {
305 if !is_valid {
306 continue;
307 }
308
309 let status = stored.insert(&this.peer_resolver, entry);
310 changed |= status.is_changed();
311 added += status.is_added() as usize;
312
313 if added >= to_add {
314 break;
315 }
316 }
317 }
318
319 if added < to_add {
321 this.entry_count
322 .fetch_sub(to_add - added, Ordering::Release);
323 }
324
325 if added > 0 {
326 this.entries_added.notify_waiters();
327 }
328 if changed {
329 this.entries_changed.notify_waiters();
330 }
331
332 changed || added > 0
333 }
334
335 pub(crate) fn remove_invalid_entries(&self, now: u32) {
337 let this = self.inner.as_ref();
338
339 let mut should_notify = false;
340 let mut entries = this.entries.write();
341 entries.retain(|item| {
342 let retain = !item.entry.is_expired(now, this.entry_ttl_sec)
343 && !this.banned_peer_ids.contains(&item.entry.peer_id);
344 should_notify |= !retain;
345 retain
346 });
347
348 if should_notify {
349 self.inner.entries_removed.notify_waiters();
350 }
351 }
352
353 fn prepend_prefix_to_body(&self, body: &mut Bytes) {
354 let this = self.inner.as_ref();
355
356 let mut res = BytesMut::with_capacity(this.request_prefix.len() + body.len());
358 res.extend_from_slice(&this.request_prefix);
359 res.extend_from_slice(body);
360 *body = res.freeze();
361 }
362}
363
364impl std::fmt::Debug for PublicOverlay {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 f.debug_struct("PublicOverlay")
367 .field("overlay_id", &self.inner.overlay_id)
368 .finish()
369 }
370}
371
372struct Inner {
373 overlay_id: OverlayId,
374 min_capacity: usize,
375 entry_ttl_sec: u32,
376 peer_resolver: Option<PeerResolver>,
377 entries: RwLock<PublicOverlayEntries>,
378 entry_count: AtomicUsize,
379 entries_added: Notify,
380 entries_changed: Notify,
381 entries_removed: Notify,
382 banned_peer_ids: FastDashSet<PeerId>,
383 service: BoxService<ServiceRequest, Response>,
384 request_prefix: Box<[u8]>,
385 metrics: Metrics,
386}
387
388pub struct PublicOverlayEntries {
389 items: OverlayItems,
390}
391
392impl PublicOverlayEntries {
393 pub fn is_empty(&self) -> bool {
395 self.items.is_empty()
396 }
397
398 pub fn len(&self) -> usize {
400 self.items.len()
401 }
402
403 pub fn contains(&self, peer_id: &PeerId) -> bool {
405 self.items.contains_key(peer_id)
406 }
407
408 pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PublicOverlayEntryData> {
412 self.items.values()
413 }
414
415 pub fn choose<R>(&self, rng: &mut R) -> Option<&PublicOverlayEntryData>
418 where
419 R: Rng + ?Sized,
420 {
421 let index = rng.gen_range(0..self.items.len());
422 let (_, value) = self.items.get_index(index)?;
423 Some(value)
424 }
425
426 pub fn choose_multiple<R>(
429 &self,
430 rng: &mut R,
431 n: usize,
432 ) -> ChooseMultiplePublicOverlayEntries<'_>
433 where
434 R: Rng + ?Sized,
435 {
436 let len = self.items.len();
437 ChooseMultiplePublicOverlayEntries {
438 items: &self.items,
439 indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(),
440 }
441 }
442
443 pub fn choose_all<R>(&self, rng: &mut R) -> ChooseMultiplePublicOverlayEntries<'_>
446 where
447 R: Rng + ?Sized,
448 {
449 self.choose_multiple(rng, self.items.len())
450 }
451
452 fn insert(&mut self, peer_resolver: &Option<PeerResolver>, item: &PublicEntry) -> UpdateStatus {
453 match self.items.entry(item.peer_id) {
454 indexmap::map::Entry::Vacant(entry) => {
456 let resolver_handle = peer_resolver.as_ref().map_or_else(
457 || PeerResolverHandle::new_noop(&item.peer_id),
458 |resolver| resolver.insert(&item.peer_id, false),
459 );
460
461 entry.insert(PublicOverlayEntryData {
462 entry: Arc::new(item.clone()),
463 resolver_handle,
464 });
465
466 UpdateStatus::Added
467 }
468 indexmap::map::Entry::Occupied(mut entry) => {
470 let existing = entry.get_mut();
471 if existing.entry.created_at >= item.created_at {
472 return UpdateStatus::Skipped;
473 }
474
475 match Arc::get_mut(&mut existing.entry) {
477 Some(existing) => existing.clone_from(item),
478 None => existing.entry = Arc::new(item.clone()),
479 }
480 UpdateStatus::Updated
481 }
482 }
483 }
484
485 fn retain<F>(&mut self, mut f: F)
486 where
487 F: FnMut(&PublicOverlayEntryData) -> bool,
488 {
489 self.items.retain(|_, item| f(item));
490 }
491}
492
493#[derive(Clone)]
494pub struct PublicOverlayEntryData {
495 pub entry: Arc<PublicEntry>,
496 pub resolver_handle: PeerResolverHandle,
497}
498
499impl PublicOverlayEntryData {
500 pub fn is_expired(&self, now: u32, ttl: u32) -> bool {
501 self.entry.is_expired(now, ttl)
502 }
503
504 pub fn expires_at(&self, ttl: u32) -> u32 {
505 self.entry.created_at.saturating_add(ttl)
506 }
507}
508
509pub struct PublicOverlayEntriesReadGuard<'a> {
510 entries: RwLockReadGuard<'a, PublicOverlayEntries>,
511}
512
513impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> {
514 type Target = PublicOverlayEntries;
515
516 #[inline]
517 fn deref(&self) -> &Self::Target {
518 &self.entries
519 }
520}
521
/// Outcome of trying to store a single entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateStatus {
    /// The stored entry was at least as new; nothing was changed.
    Skipped,
    /// An older stored entry was replaced.
    Updated,
    /// No entry existed for the peer; a new one was stored.
    Added,
}
528
529impl UpdateStatus {
530 fn is_changed(self) -> bool {
531 matches!(self, Self::Updated | Self::Added)
532 }
533
534 fn is_added(self) -> bool {
535 matches!(self, Self::Added)
536 }
537}
538
539pub struct ChooseMultiplePublicOverlayEntries<'a> {
540 items: &'a OverlayItems,
541 indices: rand::seq::index::IndexVecIntoIter,
542}
543
544impl<'a> Iterator for ChooseMultiplePublicOverlayEntries<'a> {
545 type Item = &'a PublicOverlayEntryData;
546
547 fn next(&mut self) -> Option<Self::Item> {
548 self.indices.next().and_then(|i| {
549 let (_, value) = self.items.get_index(i)?;
550 Some(value)
551 })
552 }
553
554 fn size_hint(&self) -> (usize, Option<usize>) {
555 (self.indices.len(), Some(self.indices.len()))
556 }
557}
558
559impl ExactSizeIterator for ChooseMultiplePublicOverlayEntries<'_> {
560 fn len(&self) -> usize {
561 self.indices.len()
562 }
563}
564
565type OverlayItems = IndexMap<PeerId, PublicOverlayEntryData, FastHasherState>;
566
#[cfg(test)]
mod tests {
    use everscale_crypto::ed25519;
    use tycho_util::time::now_sec;

    use super::*;

    /// Creates an entry for a fresh random key, correctly signed for `overlay`.
    fn generate_public_entry(overlay: &PublicOverlay, now: u32) -> Arc<PublicEntry> {
        let keypair = ed25519::KeyPair::generate(&mut rand::thread_rng());
        let peer_id: PeerId = keypair.public_key.into();
        let to_sign = crate::proto::overlay::PublicEntryToSign {
            overlay_id: overlay.overlay_id().as_bytes(),
            peer_id: &peer_id,
            created_at: now,
        };
        let signature = Box::new(keypair.sign(to_sign));
        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature,
        })
    }

    /// Creates an entry for a fresh random key with an all-zero signature.
    fn generate_invalid_public_entry(now: u32) -> Arc<PublicEntry> {
        let keypair = ed25519::KeyPair::generate(&mut rand::thread_rng());
        let peer_id: PeerId = keypair.public_key.into();
        let signature = Box::new([0; 64]);
        Arc::new(PublicEntry {
            peer_id,
            created_at: now,
            signature,
        })
    }

    /// Creates `n` correctly signed entries for `overlay`.
    fn generate_public_entries(
        overlay: &PublicOverlay,
        now: u32,
        n: usize,
    ) -> Vec<Arc<PublicEntry>> {
        std::iter::repeat_with(|| generate_public_entry(overlay, now))
            .take(n)
            .collect()
    }

    /// Returns the tracked entry count after checking that it matches the
    /// number of entries actually stored.
    fn count_entries(overlay: &PublicOverlay) -> usize {
        let tracked_count = overlay.inner.entry_count.load(Ordering::Acquire);
        let stored_count = overlay.read_entries().len();
        assert_eq!(stored_count, tracked_count);
        tracked_count
    }

    /// Builds an overlay with a random id, the given capacity and a service
    /// that answers every query with `None`.
    fn make_overlay_with_min_capacity(min_capacity: usize) -> PublicOverlay {
        let service = crate::service_query_fn(|_| futures_util::future::ready(None));
        PublicOverlay::builder(rand::random())
            .with_min_capacity(min_capacity)
            .build(service)
    }

    #[test]
    fn min_capacity_works_with_single_thread() {
        let now = now_sec();
        let local_id: PeerId = rand::random();

        // Two batches that together fill the overlay exactly.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            let (first_half, second_half) = entries.split_at(5);

            overlay.add_untrusted_entries(&local_id, first_half, now);
            assert_eq!(count_entries(&overlay), 5);

            overlay.add_untrusted_entries(&local_id, second_half, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // A single batch that fills the overlay exactly.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // A batch larger than the capacity is truncated.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries = generate_public_entries(&overlay, now, 20);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 10);
        }

        // Zero capacity accepts nothing.
        {
            let overlay = make_overlay_with_min_capacity(0);
            let entries = generate_public_entries(&overlay, now, 10);
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Entries with invalid signatures are all rejected.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries: Vec<_> = std::iter::repeat_with(|| generate_invalid_public_entry(now))
                .take(10)
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 0);
        }

        // Alternating invalid and valid entries: only the valid half is stored.
        {
            let overlay = make_overlay_with_min_capacity(10);
            let entries: Vec<_> = (0..10)
                .map(|i| {
                    if i % 2 == 0 {
                        generate_invalid_public_entry(now)
                    } else {
                        generate_public_entry(&overlay, now)
                    }
                })
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 5);
        }

        // Invalid entries first, then more valid entries than the capacity.
        {
            let overlay = make_overlay_with_min_capacity(3);
            let entries: Vec<_> = (0..10)
                .map(|i| {
                    if i < 5 {
                        generate_invalid_public_entry(now)
                    } else {
                        generate_public_entry(&overlay, now)
                    }
                })
                .collect();
            overlay.add_untrusted_entries(&local_id, &entries, now);
            assert_eq!(count_entries(&overlay), 3);
        }
    }

    #[test]
    fn min_capacity_works_with_multi_thread() {
        const BATCH_LEN: usize = 7;
        const BATCHES_PER_THREAD: usize = 3;
        const THREADS: usize = 10;

        let now = now_sec();
        let local_id: PeerId = rand::random();

        // The capacity is below the total of 210 entries and not a multiple
        // of the batch size.
        let overlay = make_overlay_with_min_capacity(201);
        let entries =
            generate_public_entries(&overlay, now, BATCH_LEN * BATCHES_PER_THREAD * THREADS);

        std::thread::scope(|s| {
            for per_thread in entries.chunks_exact(BATCH_LEN * BATCHES_PER_THREAD) {
                s.spawn(|| {
                    for batch in per_thread.chunks_exact(BATCH_LEN) {
                        overlay.add_untrusted_entries(&local_id, batch, now);
                    }
                });
            }
        });

        assert_eq!(count_entries(&overlay), 201);
    }
}