1use std::convert::TryFrom;
2use std::net::SocketAddrV4;
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::Result;
8use crossbeam_queue::SegQueue;
9use parking_lot::Mutex;
10use sha2::Digest;
11use smallvec::SmallVec;
12use tl_proto::{HashWrapper, TlWrite};
13use tokio::sync::mpsc;
14
15use super::overlay_id::IdShort;
16use super::{broadcast_receiver::*, MAX_OVERLAY_PEERS};
17use crate::adnl;
18use crate::proto;
19use crate::rldp::{self, compression, RaptorQDecoder, RaptorQEncoder};
20use crate::util::*;
21
/// Tunable parameters of an [`Overlay`].
///
/// The container is marked `#[serde(default)]`, so fields missing from the
/// deserialized input take their values from [`Default`].
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct OverlayOptions {
    /// Capacity of the neighbours set and the amount requested when it is
    /// refilled from the known peers.
    pub max_neighbours: u32,

    /// Number of finished broadcasts that may stay in the log; the background
    /// task removes the oldest ones while the count exceeds this value.
    pub max_broadcast_log: u32,

    /// Interval between iterations of the background maintenance task,
    /// in milliseconds.
    pub broadcast_gc_interval_ms: u64,

    /// Accumulated time, in milliseconds, after which the background task
    /// calls `update_neighbours(1)`.
    pub overlay_peers_timeout_ms: u64,

    /// Broadcasts whose data is at most this long (in bytes) are sent as a
    /// single ordinary broadcast; longer ones are sent as FEC broadcasts.
    pub max_ordinary_broadcast_len: usize,

    /// Number of random neighbours requested for an outgoing broadcast with
    /// the `RandomNeighbours` target.
    pub broadcast_target_count: u32,

    /// Number of random neighbours to which a received ordinary broadcast is
    /// forwarded.
    pub secondary_broadcast_target_count: u32,

    /// Number of random neighbours to which a received FEC broadcast part is
    /// forwarded.
    pub secondary_fec_broadcast_target_count: u32,

    /// Maximum number of FEC packets sent in one wave.
    pub fec_broadcast_wave_len: usize,

    /// Pause between two FEC waves, in milliseconds.
    pub fec_broadcast_wave_interval_ms: u64,

    /// Broadcast lifetime in seconds: older incoming broadcasts are ignored,
    /// and a broadcast id is queued for garbage collection after this delay.
    pub broadcast_timeout_sec: u64,

    /// When set, outgoing broadcast data is passed through
    /// `compression::compress` before being sent.
    pub force_compression: bool,
}
87
88impl Default for OverlayOptions {
89 fn default() -> Self {
90 Self {
91 max_neighbours: 200,
92 max_broadcast_log: 1000,
93 broadcast_gc_interval_ms: 1000,
94 overlay_peers_timeout_ms: 60000,
95 max_ordinary_broadcast_len: 768,
96 broadcast_target_count: 5,
97 secondary_broadcast_target_count: 3,
98 secondary_fec_broadcast_target_count: 3,
99 fec_broadcast_wave_len: 20,
100 fec_broadcast_wave_interval_ms: 10,
101 broadcast_timeout_sec: 60,
102 force_compression: false,
103 }
104 }
105}
106
/// State of a single overlay: its identity, peers and broadcast bookkeeping.
pub struct Overlay {
    /// Short id of this overlay.
    id: IdShort,
    /// Local key used to sign nodes and (by default) outgoing broadcasts.
    node_key: Arc<adnl::Key>,
    /// Options the overlay was created with.
    options: OverlayOptions,

    /// Broadcasts already seen or sent, keyed by broadcast id.
    owned_broadcasts: FastDashMap<BroadcastId, Arc<OwnedBroadcast>>,
    /// Ids of broadcasts whose timeout elapsed and which await removal
    /// from `owned_broadcasts`.
    finished_broadcasts: SegQueue<BroadcastId>,
    /// Counter maintained alongside `finished_broadcasts`.
    finished_broadcast_count: AtomicU32,

    /// Nodes received in `getRandomPeers` queries; drained by `take_new_peers`.
    received_peers: Arc<Mutex<ReceivedPeersMap>>,
    /// Fully received broadcasts waiting for `wait_for_broadcast`.
    received_broadcasts: Arc<BroadcastReceiver<IncomingBroadcastInfo>>,

    /// Signed node descriptions of public peers, keyed by short id.
    nodes: FastDashMap<adnl::NodeIdShort, proto::overlay::NodeOwned>,
    /// Peers excluded from selection (see `remove_public_peer`).
    ignored_peers: FastDashSet<adnl::NodeIdShort>,
    /// All peers known to this overlay.
    known_peers: adnl::PeersSet,
    /// Peers used as broadcast targets; filled from `known_peers`.
    neighbours: adnl::PeersSet,

    /// Serialized `OverlayQuery` header prepended to outgoing queries.
    query_prefix: Vec<u8>,
    /// Serialized `Message` header prepended to outgoing messages.
    message_prefix: Vec<u8>,
}
142
143impl Overlay {
144 pub(super) fn new(
146 node_key: Arc<adnl::Key>,
147 id: IdShort,
148 peers: &[adnl::NodeIdShort],
149 options: OverlayOptions,
150 ) -> Arc<Self> {
151 let query_prefix = tl_proto::serialize(proto::rpc::OverlayQuery {
152 overlay: id.as_slice(),
153 });
154 let message_prefix = tl_proto::serialize(proto::overlay::Message {
155 overlay: id.as_slice(),
156 });
157
158 let known_peers = adnl::PeersSet::with_peers_and_capacity(peers, MAX_OVERLAY_PEERS);
159
160 let overlay = Arc::new(Self {
161 id,
162 node_key,
163 options,
164 owned_broadcasts: FastDashMap::default(),
165 finished_broadcasts: SegQueue::new(),
166 finished_broadcast_count: AtomicU32::new(0),
167 received_peers: Arc::new(Default::default()),
168 received_broadcasts: Arc::new(BroadcastReceiver::default()),
169 nodes: FastDashMap::default(),
170 ignored_peers: FastDashSet::default(),
171 known_peers,
172 neighbours: adnl::PeersSet::with_capacity(options.max_neighbours),
173 query_prefix,
174 message_prefix,
175 });
176
177 if !peers.is_empty() {
178 overlay.update_neighbours(overlay.options.max_neighbours);
179 }
180
181 let overlay_ref = Arc::downgrade(&overlay);
182 let gc_interval = Duration::from_millis(options.broadcast_gc_interval_ms);
183 tokio::spawn(async move {
184 let mut peers_timeout = 0;
185 while let Some(overlay) = overlay_ref.upgrade() {
186 while overlay.finished_broadcast_count.load(Ordering::Acquire)
187 > options.max_broadcast_log
188 {
189 if let Some(broadcast_id) = overlay.finished_broadcasts.pop() {
190 overlay.owned_broadcasts.remove(&broadcast_id);
191 }
192 overlay
193 .finished_broadcast_count
194 .fetch_sub(1, Ordering::Release);
195 }
196
197 peers_timeout += options.broadcast_gc_interval_ms;
198 if peers_timeout > options.overlay_peers_timeout_ms {
199 overlay.update_neighbours(1);
200 peers_timeout = 0;
201 }
202
203 tokio::time::sleep(gc_interval).await;
204 }
205 });
206
207 overlay
208 }
209
210 #[inline(always)]
212 pub fn options(&self) -> &OverlayOptions {
213 &self.options
214 }
215
216 pub fn metrics(&self) -> OverlayMetrics {
218 OverlayMetrics {
219 owned_broadcasts_len: self.owned_broadcasts.len(),
220 finished_broadcasts_len: self.finished_broadcast_count.load(Ordering::Acquire),
221 node_count: self.nodes.len(),
222 known_peers: self.known_peers.len(),
223 neighbours: self.neighbours.len(),
224 received_broadcasts_data_len: self.received_broadcasts.data_len(),
225 received_broadcasts_barrier_count: self.received_broadcasts.barriers_len(),
226 }
227 }
228
229 pub fn id(&self) -> &IdShort {
231 &self.id
232 }
233
234 pub fn overlay_key(&self) -> &Arc<adnl::Key> {
236 &self.node_key
237 }
238
239 pub fn add_public_peer(
244 &self,
245 adnl: &adnl::Node,
246 addr: SocketAddrV4,
247 node: proto::overlay::Node<'_>,
248 ) -> Result<Option<adnl::NodeIdShort>> {
249 if let Err(e) = self.id.verify_overlay_node(&node) {
250 tracing::warn!(overlay_id = %self.id, %addr, "invalid public overlay node: {e:?}");
251 return Ok(None);
252 }
253
254 let peer_id_full = adnl::NodeIdFull::try_from(node.id)?;
255 let peer_id = peer_id_full.compute_short_id();
256
257 let is_new_peer = adnl.add_peer(
258 adnl::NewPeerContext::PublicOverlay,
259 self.overlay_key().id(),
260 &peer_id,
261 addr,
262 peer_id_full,
263 )?;
264 if is_new_peer {
265 self.insert_public_peer(&peer_id, node);
266 Ok(Some(peer_id))
267 } else {
268 Ok(None)
269 }
270 }
271
272 pub fn add_public_peers<'a, I>(
276 &self,
277 adnl: &adnl::Node,
278 nodes: I,
279 ) -> Result<Vec<adnl::NodeIdShort>>
280 where
281 I: IntoIterator<Item = (SocketAddrV4, proto::overlay::Node<'a>)>,
282 {
283 let local_id = self.overlay_key().id();
284
285 let mut result = Vec::new();
286 for (addr, node) in nodes {
287 if let Err(e) = self.id.verify_overlay_node(&node) {
288 tracing::warn!(overlay_id = %self.id, %addr, "invalid public overlay node: {e:?}");
289 continue;
290 }
291
292 let peer_id_full = adnl::NodeIdFull::try_from(node.id)?;
293 let peer_id = peer_id_full.compute_short_id();
294
295 let is_new_peer = adnl.add_peer(
296 adnl::NewPeerContext::PublicOverlay,
297 local_id,
298 &peer_id,
299 addr,
300 peer_id_full,
301 )?;
302 if is_new_peer {
303 self.insert_public_peer(&peer_id, node);
304 result.push(peer_id);
305 tracing::trace!(overlay_id = %self.id, %peer_id, %addr, "new public peer");
306 }
307 }
308
309 Ok(result)
310 }
311
312 pub fn remove_public_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
314 if !self.ignored_peers.insert(*peer_id) {
315 return false;
316 }
317 tracing::warn!(overlay_id = %self.id, %peer_id, "removing public overlay peer");
318 if self.neighbours.contains(peer_id) {
319 self.update_neighbours(self.options.max_neighbours);
320 }
321 true
322 }
323
324 pub fn is_known_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
329 self.known_peers.contains(peer_id)
330 }
331
332 pub fn is_active_public_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
334 self.known_peers.contains(peer_id) && !self.ignored_peers.contains(peer_id)
335 }
336
337 pub fn write_cached_peers(&self, amount: u32, dst: &adnl::PeersSet) {
339 dst.randomly_fill_from(&self.known_peers, amount, Some(&self.ignored_peers));
340 }
341
342 #[inline(always)]
344 pub fn query_prefix(&self) -> &[u8] {
345 &self.query_prefix
346 }
347
348 #[inline(always)]
350 pub fn message_prefix(&self) -> &[u8] {
351 &self.message_prefix
352 }
353
354 pub fn send_message(
358 &self,
359 adnl: &adnl::Node,
360 peer_id: &adnl::NodeIdShort,
361 data: &[u8],
362 ) -> Result<()> {
363 let local_id = self.overlay_key().id();
364
365 let mut buffer = Vec::with_capacity(self.message_prefix().len() + data.len());
366 buffer.extend_from_slice(self.message_prefix());
367 buffer.extend_from_slice(data);
368 adnl.send_custom_message(local_id, peer_id, &buffer)
369 }
370
371 pub async fn adnl_query<Q>(
375 &self,
376 adnl: &adnl::Node,
377 peer_id: &adnl::NodeIdShort,
378 query: Q,
379 timeout: Option<u64>,
380 ) -> Result<Option<Vec<u8>>>
381 where
382 Q: TlWrite,
383 {
384 let local_id = self.overlay_key().id();
385 type Value = tl_proto::OwnedRawBytes<tl_proto::Boxed>;
386 match adnl
387 .query_with_prefix::<Q, Value>(local_id, peer_id, self.query_prefix(), query, timeout)
388 .await?
389 {
390 Some(answer) => Ok(Some(answer.into_inner())),
391 None => Ok(None),
392 }
393 }
394
395 pub async fn rldp_query<Q>(
399 &self,
400 rldp: &rldp::Node,
401 peer_id: &adnl::NodeIdShort,
402 query: Q,
403 roundtrip: Option<u64>,
404 ) -> Result<(Option<Vec<u8>>, u64)>
405 where
406 Q: TlWrite,
407 {
408 let local_id = self.overlay_key().id();
409
410 let prefix = self.query_prefix();
411 let mut query_data = Vec::with_capacity(prefix.len() + query.max_size_hint());
412 query_data.extend_from_slice(prefix);
413 query.write_to(&mut query_data);
414
415 rldp.query(local_id, peer_id, query_data, roundtrip).await
416 }
417
418 pub fn broadcast(
424 self: &Arc<Self>,
425 adnl: &Arc<adnl::Node>,
426 data: Vec<u8>,
427 source: Option<&Arc<adnl::Key>>,
428 target: BroadcastTarget,
429 ) -> OutgoingBroadcastInfo {
430 let local_id = self.overlay_key().id();
431
432 let key = match source {
433 Some(key) => key,
434 None => &self.node_key,
435 };
436
437 if data.len() <= self.options.max_ordinary_broadcast_len {
438 self.send_broadcast(adnl, local_id, data, key, target)
439 } else {
440 self.send_fec_broadcast(adnl, local_id, data, key, target)
441 }
442 }
443
444 pub async fn wait_for_broadcast(&self) -> IncomingBroadcastInfo {
449 self.received_broadcasts.pop().await
450 }
451
452 pub fn take_new_peers(&self) -> ReceivedPeersMap {
454 let mut peers = self.received_peers.lock();
455 std::mem::take(&mut *peers)
456 }
457
458 pub fn sign_local_node(&self) -> proto::overlay::NodeOwned {
460 let key = self.overlay_key();
461 let version = now();
462
463 let node_to_sign = &proto::overlay::NodeToSign {
464 id: key.id().as_slice(),
465 overlay: self.id().as_slice(),
466 version,
467 };
468 let signature = key.sign(node_to_sign);
469
470 proto::overlay::NodeOwned {
471 id: key.full_id().as_tl().as_equivalent_owned(),
472 overlay: *self.id().as_slice(),
473 version,
474 signature: signature.to_vec().into(),
475 }
476 }
477
478 pub async fn exchange_random_peers(
481 &self,
482 adnl: &adnl::Node,
483 peer_id: &adnl::NodeIdShort,
484 timeout: Option<u64>,
485 ) -> Result<Option<Vec<adnl::NodeIdShort>>> {
486 struct KnownPeers<'a>(&'a adnl::PeersSet);
487
488 impl ExistingPeersFilter for KnownPeers<'_> {
489 fn contains(&self, peer_id: &adnl::NodeIdShort) -> bool {
490 self.0.contains(peer_id)
491 }
492 }
493
494 self.exchange_random_peers_ext(adnl, peer_id, timeout, &KnownPeers(&self.known_peers))
495 .await
496 }
497
498 pub async fn exchange_random_peers_ext(
501 &self,
502 adnl: &adnl::Node,
503 peer_id: &adnl::NodeIdShort,
504 timeout: Option<u64>,
505 existing_peers: &dyn ExistingPeersFilter,
506 ) -> Result<Option<Vec<adnl::NodeIdShort>>> {
507 let query = proto::rpc::OverlayGetRandomPeersOwned {
508 peers: self.prepare_random_peers(),
509 };
510 let answer = match self.adnl_query(adnl, peer_id, query, timeout).await? {
511 Some(answer) => answer,
512 None => {
513 tracing::trace!(overlay_id = %self.id, %peer_id, "no random peers found");
514 return Ok(None);
515 }
516 };
517
518 let answer = tl_proto::deserialize_as_boxed(&answer)?;
519 tracing::trace!(overlay_id = %self.id, %peer_id, "got random peers");
520 let proto::overlay::Nodes { nodes } = self.filter_nodes(answer);
521
522 let nodes = nodes
523 .into_iter()
524 .filter_map(|node| match adnl::NodeIdFull::try_from(node.id) {
525 Ok(full_id) => {
526 let peer_id = full_id.compute_short_id();
527 if !existing_peers.contains(&peer_id) {
528 Some(peer_id)
529 } else {
530 None
531 }
532 }
533 Err(e) => {
534 tracing::warn!(overlay_id = %self.id, %peer_id, "failed to process peer: {e}");
535 None
536 }
537 })
538 .collect();
539 Ok(Some(nodes))
540 }
541
542 pub(super) async fn receive_broadcast(
544 self: &Arc<Self>,
545 adnl: &adnl::Node,
546 local_id: &adnl::NodeIdShort,
547 peer_id: &adnl::NodeIdShort,
548 broadcast: proto::overlay::OverlayBroadcast<'_>,
549 raw_data: &[u8],
550 ) -> Result<()> {
551 if self.is_broadcast_outdated(broadcast.date) {
552 return Ok(());
553 }
554
555 let node_id = adnl::NodeIdFull::try_from(broadcast.src)?;
556 let node_peer_id = node_id.compute_short_id();
557 let source = match broadcast.flags {
558 flags if flags & BROADCAST_FLAG_ANY_SENDER == 0 => Some(node_peer_id),
559 _ => None,
560 };
561
562 let broadcast_data = match compression::decompress(broadcast.data) {
563 Some(decompressed) => {
564 let broadcast_to_sign =
565 make_broadcast_to_sign(&decompressed, broadcast.date, source.as_ref());
566 match node_id.verify(&broadcast_to_sign, broadcast.signature) {
567 Ok(()) => {
568 let broadcast_id = broadcast_to_sign.compute_broadcast_id();
569 if !self.create_broadcast(broadcast_id) {
570 return Ok(());
571 }
572 Some((broadcast_id, decompressed))
573 }
574 Err(_) => None,
575 }
576 }
577 None => None,
578 };
579
580 let (broadcast_id, data) = match broadcast_data {
581 Some((id, data)) => (id, data),
582 None => {
583 let broadcast_to_sign =
584 make_broadcast_to_sign(broadcast.data, broadcast.date, source.as_ref());
585 node_id.verify(&broadcast_to_sign, broadcast.signature)?;
586
587 let broadcast_id = broadcast_to_sign.compute_broadcast_id();
588 if !self.create_broadcast(broadcast_id) {
589 return Ok(());
590 }
591 (broadcast_id, broadcast.data.to_vec())
592 }
593 };
594
595 self.received_broadcasts.push(IncomingBroadcastInfo {
596 packets: 1,
597 data,
598 from: node_peer_id,
599 });
600
601 let neighbours = self
602 .neighbours
603 .get_random_peers(self.options.secondary_broadcast_target_count, Some(peer_id));
604 self.distribute_broadcast(adnl, local_id, &neighbours, raw_data);
605 self.spawn_broadcast_gc_task(broadcast_id);
606
607 Ok(())
608 }
609
    /// Handles one incoming packet of a FEC broadcast.
    ///
    /// Outdated packets are dropped. The first packet of an unknown broadcast
    /// creates an incoming transfer (see `spawn_fec_transfer_receiver`). Every
    /// new packet from the original source is handed to the transfer's decoder
    /// task unless the transfer is already completed, and is then forwarded to
    /// random neighbours (excluding `peer_id`).
    ///
    /// # Errors
    /// Fails when the source id is invalid, the signature is not 64 bytes
    /// long, the transfer cannot be created, or the packet cannot be sent to
    /// the decoder task.
    pub(super) async fn receive_fec_broadcast(
        self: &Arc<Self>,
        adnl: &adnl::Node,
        local_id: &adnl::NodeIdShort,
        peer_id: &adnl::NodeIdShort,
        broadcast: proto::overlay::OverlayBroadcastFec<'_>,
        raw_data: &[u8],
    ) -> Result<()> {
        use dashmap::mapref::entry::Entry;

        if self.is_broadcast_outdated(broadcast.date) {
            return Ok(());
        }

        // For FEC broadcasts the hash of the whole data serves as the broadcast id.
        let broadcast_id = *broadcast.data_hash;
        let node_id = adnl::NodeIdFull::try_from(broadcast.src)?;
        let source = node_id.compute_short_id();

        // The length is checked first, so the conversion below cannot fail.
        let signature = match broadcast.signature.len() {
            64 => broadcast.signature.try_into().unwrap(),
            _ => return Err(OverlayError::UnsupportedSignature.into()),
        };

        // Find the transfer for this broadcast or start a new one.
        let transfer = match self.owned_broadcasts.entry(broadcast_id) {
            Entry::Vacant(entry) => {
                self.spawn_fec_transfer_receiver(broadcast.fec, broadcast_id, source, entry)?
            }
            Entry::Occupied(entry) => entry.get().clone(),
        };
        let transfer = match transfer.as_ref() {
            OwnedBroadcast::Incoming(transfer) => transfer,
            // The id belongs to a broadcast that is not an incoming FEC transfer.
            OwnedBroadcast::Other => return Ok(()),
        };

        transfer.updated_at.refresh();
        if transfer.source != source {
            tracing::trace!(
                overlay_id = %self.id,
                broadcast_id = %DisplayBroadcastId(&broadcast_id),
                "same broadcast but parts from different sources"
            );
            return Ok(());
        }

        // Stop here when the packets history does not accept this seqno.
        if !transfer.history.deliver_packet(broadcast.seqno as u64) {
            return Ok(());
        }

        // Feed the decoder task only while the transfer is still in progress.
        if !transfer.completed.load(Ordering::Acquire) {
            transfer.broadcast_tx.send(BroadcastFec {
                node_id,
                data_hash: broadcast_id,
                data_size: broadcast.data_size,
                flags: broadcast.flags,
                data: broadcast.data.to_vec(),
                seqno: broadcast.seqno,
                fec_type: broadcast.fec,
                date: broadcast.date,
                signature,
            })?;
        }

        // Forward the original packet further, but not back to the sender.
        let neighbours = self.neighbours.get_random_peers(
            self.options.secondary_fec_broadcast_target_count,
            Some(peer_id),
        );
        self.distribute_broadcast(adnl, local_id, &neighbours, raw_data);

        Ok(())
    }
686
687 pub(super) fn process_get_random_peers(
689 &self,
690 query: proto::rpc::OverlayGetRandomPeers<'_>,
691 ) -> proto::overlay::NodesOwned {
692 use std::collections::hash_map::Entry;
693
694 let peers = self.filter_nodes(query.peers).nodes;
696
697 let mut received_peers = self.received_peers.lock();
699 for node in peers {
700 match received_peers.entry(HashWrapper(node.id.as_equivalent_owned())) {
701 Entry::Occupied(mut entry) => {
702 if entry.get().version < node.version {
703 entry.insert(node.as_equivalent_owned());
704 }
705 }
706 Entry::Vacant(entry) => {
707 entry.insert(node.as_equivalent_owned());
708 }
709 }
710 }
711
712 drop(received_peers);
714
715 self.prepare_random_peers()
717 }
718
719 fn send_broadcast(
721 self: &Arc<Self>,
722 adnl: &adnl::Node,
723 local_id: &adnl::NodeIdShort,
724 mut data: Vec<u8>,
725 key: &Arc<adnl::Key>,
726 target: BroadcastTarget,
727 ) -> OutgoingBroadcastInfo {
728 let date = now();
729 let broadcast_to_sign = make_broadcast_to_sign(&data, date, None);
730 let broadcast_id = broadcast_to_sign.compute_broadcast_id();
731 if !self.create_broadcast(broadcast_id) {
732 tracing::warn!(
733 overlay_id = %self.id,
734 broadcast_id = %DisplayBroadcastId(&broadcast_id),
735 "trying to send duplicated broadcast"
736 );
737 return Default::default();
738 }
739 let signature = key.sign(broadcast_to_sign);
740
741 if self.options.force_compression {
742 if let Err(e) = compression::compress(&mut data) {
743 tracing::warn!(
744 overlay_id = %self.id,
745 broadcast_id = %DisplayBroadcastId(&broadcast_id),
746 "failed to compress overlay broadcast: {e:?}"
747 );
748 }
749 }
750
751 let broadcast = proto::overlay::Broadcast::Broadcast(proto::overlay::OverlayBroadcast {
752 src: key.full_id().as_tl(),
753 certificate: proto::overlay::Certificate::EmptyCertificate,
754 flags: BROADCAST_FLAG_ANY_SENDER,
755 data: &data,
756 date,
757 signature: &signature,
758 });
759
760 let mut buffer = Vec::with_capacity(self.message_prefix.len() + broadcast.max_size_hint());
761 buffer.extend_from_slice(&self.message_prefix);
762 broadcast.write_to(&mut buffer);
763 drop(data);
764
765 let neighbours = match target {
766 BroadcastTarget::RandomNeighbours => OwnedBroadcastTarget::Neighbours(
767 self.neighbours
768 .get_random_peers(self.options.broadcast_target_count, None),
769 ),
770 BroadcastTarget::Explicit(neighbours) => OwnedBroadcastTarget::Explicit(neighbours),
771 };
772
773 self.distribute_broadcast(adnl, local_id, neighbours.as_ref(), &buffer);
774 self.spawn_broadcast_gc_task(broadcast_id);
775
776 OutgoingBroadcastInfo {
777 packets: 1,
778 recipient_count: neighbours.as_ref().len(),
779 }
780 }
781
    /// Sends `data` as a FEC (RaptorQ-encoded) broadcast.
    ///
    /// The broadcast id is the SHA-256 of the uncompressed data. Packets are
    /// produced and sent by a spawned task in waves of at most
    /// `fec_broadcast_wave_len` packets, with a pause of
    /// `fec_broadcast_wave_interval_ms` between waves.
    ///
    /// Returns the default `OutgoingBroadcastInfo` when a broadcast with the
    /// same id already exists; otherwise the planned packet count and the
    /// number of recipients.
    fn send_fec_broadcast(
        self: &Arc<Self>,
        adnl: &Arc<adnl::Node>,
        local_id: &adnl::NodeIdShort,
        mut data: Vec<u8>,
        key: &Arc<adnl::Key>,
        target: BroadcastTarget,
    ) -> OutgoingBroadcastInfo {
        // The id is computed before compression, i.e. over the original data.
        let broadcast_id = sha2::Sha256::digest(&data).into();
        if !self.create_broadcast(broadcast_id) {
            tracing::warn!(
                overlay_id = %self.id,
                broadcast_id = %DisplayBroadcastId(&broadcast_id),
                "trying to send duplicated broadcast",
            );
            return Default::default();
        }

        // Compression is best-effort: on failure only a warning is logged.
        if self.options.force_compression {
            if let Err(e) = compression::compress(&mut data) {
                tracing::warn!(
                    overlay_id = %self.id,
                    broadcast_id = %DisplayBroadcastId(&broadcast_id),
                    "failed to compress overlay FEC broadcast: {e:?}"
                );
            }
        }

        // NOTE(review): `as u32` truncates for data of 4 GiB or more.
        let data_size = data.len() as u32;
        let mut outgoing_transfer = OutgoingFecTransfer {
            broadcast_id,
            encoder: RaptorQEncoder::with_data(&data),
            seqno: 0,
        };

        // The encoder has been built; the source buffer is released early.
        drop(data);

        let neighbours = match target {
            BroadcastTarget::RandomNeighbours => OwnedBroadcastTarget::Neighbours(
                self.neighbours
                    .get_random_peers(self.options.broadcast_target_count, None),
            ),
            BroadcastTarget::Explicit(neighbours) => OwnedBroadcastTarget::Explicit(neighbours),
        };

        let info = OutgoingBroadcastInfo {
            // Packets needed to cover the data, plus one, times 1.5.
            packets: (data_size / outgoing_transfer.encoder.params().packet_len + 1) * 3 / 2,
            recipient_count: neighbours.as_ref().len(),
        };

        let wave_len = self.options.fec_broadcast_wave_len;
        let waves_interval = Duration::from_millis(self.options.fec_broadcast_wave_interval_ms);
        // Owned copies of everything the sender task needs.
        let overlay = self.clone();
        let adnl = adnl.clone();
        let local_id = *local_id;
        let key = key.clone();
        tokio::spawn(async move {
            // Send waves until the planned number of packets has been exceeded.
            'outer: while outgoing_transfer.seqno <= info.packets {
                for _ in 0..wave_len {
                    let data = match overlay.prepare_fec_broadcast(&mut outgoing_transfer, &key) {
                        Ok(data) => data,
                        Err(e) => {
                            // A failed packet aborts the whole transfer.
                            tracing::warn!(
                                overlay_id = %overlay.id,
                                broadcast_id = %DisplayBroadcastId(&broadcast_id),
                                "failed to send overlay broadcast: {e}"
                            );
                            break 'outer;
                        }
                    };

                    overlay.distribute_broadcast(&adnl, &local_id, neighbours.as_ref(), &data);
                    if outgoing_transfer.seqno > info.packets {
                        break 'outer;
                    }
                }

                tokio::time::sleep(waves_interval).await;
            }
        });

        // Scheduled right away, independently of the sender task's progress.
        self.spawn_broadcast_gc_task(broadcast_id);

        info
    }
875
876 fn filter_nodes<'a>(&self, mut nodes: proto::overlay::Nodes<'a>) -> proto::overlay::Nodes<'a> {
878 nodes.nodes.retain(|node| {
879 if !matches!(
880 node.id,
881 everscale_crypto::tl::PublicKey::Ed25519 { key }
882 if key != self.node_key.full_id().public_key().as_bytes()
883 ) {
884 return false;
885 }
886
887 if let Err(e) = self.id.verify_overlay_node(node) {
888 tracing::warn!(overlay_id = %self.id, "invalid overlay node: {e:?}");
889 return false;
890 }
891
892 true
893 });
894
895 nodes
896 }
897
898 fn prepare_random_peers(&self) -> proto::overlay::NodesOwned {
900 const MAX_PEERS_IN_RESPONSE: u32 = 4;
901
902 let mut nodes = SmallVec::with_capacity(MAX_PEERS_IN_RESPONSE as usize + 1);
903 nodes.push(self.sign_local_node());
904
905 let peers = adnl::PeersSet::with_capacity(MAX_PEERS_IN_RESPONSE);
906 peers.randomly_fill_from(&self.neighbours, MAX_PEERS_IN_RESPONSE, None);
907 for peer_id in &peers {
908 if let Some(node) = self.nodes.get(peer_id) {
909 nodes.push(node.clone());
910 }
911 }
912
913 proto::overlay::NodesOwned { nodes }
914 }
915
916 fn update_neighbours(&self, amount: u32) {
918 tracing::trace!(overlay_id = %self.id, amount, "updating neighbours");
919 self.neighbours
920 .randomly_fill_from(&self.known_peers, amount, Some(&self.ignored_peers));
921 }
922
923 fn insert_public_peer(&self, peer_id: &adnl::NodeIdShort, node: proto::overlay::Node<'_>) {
925 use dashmap::mapref::entry::Entry;
926
927 self.ignored_peers.remove(peer_id);
928 self.known_peers.insert(*peer_id);
929
930 if !self.neighbours.is_full() {
931 self.neighbours.insert(*peer_id);
932 }
933
934 match self.nodes.entry(*peer_id) {
935 Entry::Occupied(mut entry) => {
936 if entry.get().version < node.version {
937 entry.insert(node.as_equivalent_owned());
938 }
939 }
940 Entry::Vacant(entry) => {
941 entry.insert(node.as_equivalent_owned());
942 }
943 }
944 }
945
946 fn create_broadcast(&self, broadcast_id: BroadcastId) -> bool {
948 use dashmap::mapref::entry::Entry;
949
950 match self.owned_broadcasts.entry(broadcast_id) {
951 Entry::Vacant(entry) => {
952 entry.insert(Arc::new(OwnedBroadcast::Other));
953 true
954 }
955 Entry::Occupied(_) => false,
956 }
957 }
958
    /// Registers a new incoming FEC transfer and spawns the tasks serving it.
    ///
    /// Two tasks are spawned:
    /// * a decoder task that consumes packets from the transfer's channel
    ///   until the data is decoded or an error occurs, then marks the
    ///   transfer as completed;
    /// * a watchdog task that periodically checks the transfer and, once it is
    ///   completed or expired, schedules the broadcast for garbage collection.
    ///
    /// Returns the entry that was inserted into `owned_broadcasts`.
    fn spawn_fec_transfer_receiver(
        self: &Arc<Self>,
        fec_type: proto::rldp::RaptorQFecType,
        broadcast_id: BroadcastId,
        peer_id: adnl::NodeIdShort,
        entry: VacantBroadcastEntry<'_>,
    ) -> Result<Arc<OwnedBroadcast>> {
        // Packets are passed to the decoder task through this channel.
        let (broadcast_tx, mut broadcast_rx) = mpsc::unbounded_channel();

        let entry = entry
            .insert(Arc::new(OwnedBroadcast::Incoming(IncomingFecTransfer {
                completed: AtomicBool::new(false),
                history: PacketsHistory::for_recv(),
                broadcast_tx,
                source: peer_id,
                updated_at: Default::default(),
            })))
            .clone();

        let overlay = self.clone();
        // Decoder task.
        tokio::spawn(async move {
            let mut decoder = RaptorQDecoder::with_params(fec_type);

            // Number of packets consumed so far; reported with the result.
            let mut packets = 0;
            while let Some(broadcast) = broadcast_rx.recv().await {
                packets += 1;

                match process_fec_broadcast(&mut decoder, broadcast) {
                    // The whole broadcast has been decoded.
                    Ok(Some(data)) => {
                        let data = IncomingBroadcastInfo {
                            packets,
                            data,
                            from: peer_id,
                        };
                        overlay.received_broadcasts.push(data);
                        break;
                    }
                    // More packets are needed.
                    Ok(None) => continue,
                    // Any error aborts the transfer.
                    Err(e) => {
                        tracing::warn!(
                            overlay_id = %overlay.id,
                            broadcast_id = %DisplayBroadcastId(&broadcast_id),
                            "error when receiving overlay broadcast: {e}"
                        );
                        break;
                    }
                }
            }

            // Mark the transfer as completed whether it succeeded or failed.
            if let Some(broadcast) = overlay.owned_broadcasts.get(&broadcast_id) {
                match broadcast.value().as_ref() {
                    OwnedBroadcast::Incoming(transfer) => {
                        transfer.completed.store(true, Ordering::Release);
                    }
                    _ => {
                        tracing::error!(
                            overlay_id = %overlay.id,
                            broadcast_id = %DisplayBroadcastId(&broadcast_id),
                            "incoming fec broadcast mismatch"
                        );
                    }
                }
            }
        });

        let overlay = self.clone();
        let broadcast_timeout_sec = self.options.broadcast_timeout_sec;
        // Watchdog task.
        tokio::spawn(async move {
            loop {
                // `sec * 100` ms is one tenth of the broadcast timeout.
                tokio::time::sleep(Duration::from_millis(broadcast_timeout_sec * 100)).await;

                if let Some(broadcast) = overlay.owned_broadcasts.get(&broadcast_id) {
                    match broadcast.value().as_ref() {
                        // Still in progress and recently updated: keep waiting.
                        OwnedBroadcast::Incoming(transfer)
                            if !transfer.completed.load(Ordering::Acquire)
                                && !transfer.updated_at.is_expired(broadcast_timeout_sec) =>
                        {
                            continue
                        }
                        // Completed or expired: fall through to the cleanup below.
                        OwnedBroadcast::Incoming(_) => {}
                        _ => {
                            tracing::error!(
                                overlay_id = %overlay.id,
                                broadcast_id = %DisplayBroadcastId(&broadcast_id),
                                "incoming fec broadcast mismatch"
                            );
                        }
                    }
                }

                break;
            }

            overlay.spawn_broadcast_gc_task(broadcast_id);
        });

        Ok(entry)
    }
1068
1069 fn prepare_fec_broadcast(
1071 &self,
1072 transfer: &mut OutgoingFecTransfer,
1073 key: &Arc<adnl::Key>,
1074 ) -> Result<Vec<u8>> {
1075 let chunk = transfer.encoder.encode(&mut transfer.seqno)?;
1076 let date = now();
1077
1078 let broadcast_to_sign = &make_fec_part_to_sign(
1079 &transfer.broadcast_id,
1080 transfer.encoder.params().total_len,
1081 date,
1082 BROADCAST_FLAG_ANY_SENDER,
1083 transfer.encoder.params(),
1084 &chunk,
1085 transfer.seqno,
1086 None,
1087 );
1088 let signature = key.sign(broadcast_to_sign);
1089
1090 let broadcast =
1091 proto::overlay::Broadcast::BroadcastFec(proto::overlay::OverlayBroadcastFec {
1092 src: key.full_id().as_tl(),
1093 certificate: proto::overlay::Certificate::EmptyCertificate,
1094 data_hash: &transfer.broadcast_id,
1095 data_size: transfer.encoder.params().total_len,
1096 flags: BROADCAST_FLAG_ANY_SENDER,
1097 data: &chunk,
1098 seqno: transfer.seqno,
1099 fec: *transfer.encoder.params(),
1100 date,
1101 signature: &signature,
1102 });
1103
1104 transfer.seqno += 1;
1105
1106 let mut buffer = Vec::with_capacity(self.message_prefix.len() + broadcast.max_size_hint());
1107 buffer.extend_from_slice(&self.message_prefix);
1108 broadcast.write_to(&mut buffer);
1109
1110 Ok(buffer)
1111 }
1112
1113 fn distribute_broadcast(
1115 &self,
1116 adnl: &adnl::Node,
1117 local_id: &adnl::NodeIdShort,
1118 neighbours: &[adnl::NodeIdShort],
1119 data: &[u8],
1120 ) {
1121 for peer_id in neighbours {
1122 if let Err(e) = adnl.send_custom_message(local_id, peer_id, data) {
1123 tracing::warn!(
1124 overlay_id = %self.id,
1125 %peer_id,
1126 "failed to distribute broadcast: {e}"
1127 );
1128 }
1129 }
1130 }
1131
1132 fn is_broadcast_outdated(&self, date: u32) -> bool {
1133 date + (self.options.broadcast_timeout_sec as u32) < now()
1134 }
1135
1136 fn spawn_broadcast_gc_task(self: &Arc<Self>, broadcast_id: BroadcastId) {
1137 let overlay = self.clone();
1138 tokio::spawn(async move {
1139 tokio::time::sleep(Duration::from_secs(overlay.options.broadcast_timeout_sec)).await;
1140 overlay
1141 .finished_broadcast_count
1142 .fetch_add(1, Ordering::Release);
1143 overlay.finished_broadcasts.push(broadcast_id);
1144 });
1145 }
1146}
1147
/// Selects the recipients of an outgoing broadcast.
#[derive(Debug, Clone)]
pub enum BroadcastTarget {
    /// Send to `broadcast_target_count` random neighbours of the overlay.
    RandomNeighbours,
    /// Send to exactly the listed peers.
    Explicit(Arc<Vec<adnl::NodeIdShort>>),
}
1156
1157impl Default for BroadcastTarget {
1158 fn default() -> Self {
1159 Self::RandomNeighbours
1160 }
1161}
1162
/// Predicate used by `exchange_random_peers_ext` to leave out peers that the
/// caller already has.
pub trait ExistingPeersFilter: Send + Sync {
    /// Returns `true` when `peer_id` should be treated as already existing.
    fn contains(&self, peer_id: &adnl::NodeIdShort) -> bool;
}
1167
1168impl ExistingPeersFilter for () {
1169 fn contains(&self, _: &adnl::NodeIdShort) -> bool {
1170 false
1171 }
1172}
1173
1174impl ExistingPeersFilter for bool {
1175 fn contains(&self, _: &adnl::NodeIdShort) -> bool {
1176 *self
1177 }
1178}
1179
1180impl<S> ExistingPeersFilter for std::collections::HashSet<adnl::NodeIdShort, S>
1181where
1182 S: std::hash::BuildHasher + Send + Sync,
1183{
1184 fn contains(&self, peer_id: &adnl::NodeIdShort) -> bool {
1185 std::collections::HashSet::contains(self, peer_id)
1186 }
1187}
1188
1189impl<S> ExistingPeersFilter for dashmap::DashSet<adnl::NodeIdShort, S>
1190where
1191 S: std::hash::BuildHasher + Send + Sync + Clone,
1192{
1193 fn contains(&self, peer_id: &adnl::NodeIdShort) -> bool {
1194 dashmap::DashSet::contains(self, peer_id)
1195 }
1196}
1197
/// Resolved recipients of an outgoing broadcast.
enum OwnedBroadcastTarget {
    /// Peers picked at random from the overlay neighbours.
    Neighbours(Vec<adnl::NodeIdShort>),
    /// Peers supplied explicitly by the caller.
    Explicit(Arc<Vec<adnl::NodeIdShort>>),
}
1202
1203impl AsRef<[adnl::NodeIdShort]> for OwnedBroadcastTarget {
1204 fn as_ref(&self) -> &[adnl::NodeIdShort] {
1205 match self {
1206 OwnedBroadcastTarget::Neighbours(neighbours) => neighbours.as_ref(),
1207 OwnedBroadcastTarget::Explicit(neighbours) => neighbours.as_ref(),
1208 }
1209 }
1210}
1211
/// Snapshot of an overlay's internal counters, produced by `Overlay::metrics`.
#[derive(Debug, Copy, Clone)]
pub struct OverlayMetrics {
    /// Number of entries in the map of seen and sent broadcasts.
    pub owned_broadcasts_len: usize,
    /// Value of the finished-broadcasts counter.
    pub finished_broadcasts_len: u32,
    /// Number of stored node descriptions.
    pub node_count: usize,
    /// Number of known peers.
    pub known_peers: usize,
    /// Number of neighbours.
    pub neighbours: usize,
    /// `data_len()` of the received-broadcasts queue.
    pub received_broadcasts_data_len: usize,
    /// `barriers_len()` of the received-broadcasts queue.
    pub received_broadcasts_barrier_count: usize,
}
1223
1224fn process_fec_broadcast(
1225 decoder: &mut RaptorQDecoder,
1226 broadcast: BroadcastFec,
1227) -> Result<Option<Vec<u8>>> {
1228 let broadcast_id = &broadcast.data_hash;
1229
1230 let broadcast_to_sign = &make_fec_part_to_sign(
1231 broadcast_id,
1232 broadcast.data_size,
1233 broadcast.date,
1234 broadcast.flags,
1235 &broadcast.fec_type,
1236 &broadcast.data,
1237 broadcast.seqno,
1238 if broadcast.flags & BROADCAST_FLAG_ANY_SENDER == 0 {
1239 Some(broadcast.node_id.compute_short_id())
1240 } else {
1241 None
1242 },
1243 );
1244 broadcast
1245 .node_id
1246 .verify(broadcast_to_sign, &broadcast.signature)?;
1247
1248 match decoder.decode(broadcast.seqno, broadcast.data) {
1249 Some(result) if result.len() != broadcast.data_size as usize => {
1250 Err(OverlayError::DataSizeMismatch.into())
1251 }
1252 Some(result) => match compression::decompress(&result) {
1253 Some(decompressed)
1254 if sha2::Sha256::digest(&decompressed).as_slice() == broadcast_id =>
1255 {
1256 Ok(Some(decompressed))
1257 }
1258 _ => {
1259 let data_hash = sha2::Sha256::digest(&result);
1260 if data_hash.as_slice() == broadcast_id {
1261 Ok(Some(result))
1262 } else {
1263 Err(OverlayError::DataHashMismatch.into())
1264 }
1265 }
1266 },
1267 None => Ok(None),
1268 }
1269}
1270
/// TL structure that is signed for a broadcast (or a FEC broadcast part).
#[derive(TlWrite)]
#[tl(boxed, id = "overlay.broadcast.toSign", scheme = "scheme.tl")]
struct OverlayBroadcastToSign {
    /// Hash identifying the broadcast contents (see the `make_*_to_sign` helpers).
    hash: [u8; 32],
    /// Creation time of the broadcast.
    date: u32,
}
1277
1278impl OverlayBroadcastToSign {
1279 fn compute_broadcast_id(&self) -> BroadcastId {
1280 tl_proto::hash(self)
1281 }
1282}
1283
1284fn make_broadcast_to_sign(
1285 data: &[u8],
1286 date: u32,
1287 source: Option<&adnl::NodeIdShort>,
1288) -> OverlayBroadcastToSign {
1289 const BROADCAST_ID: u32 = tl_proto::id!("overlay.broadcast.id", scheme = "scheme.tl");
1290
1291 let mut broadcast_hash = sha2::Sha256::new();
1292 broadcast_hash.update(BROADCAST_ID.to_le_bytes());
1293 broadcast_hash.update(source.map(adnl::NodeIdShort::as_slice).unwrap_or(&[0; 32]));
1294 broadcast_hash.update(sha2::Sha256::digest(data).as_slice());
1295 broadcast_hash.update(BROADCAST_FLAG_ANY_SENDER.to_le_bytes());
1296 let broadcast_hash = broadcast_hash.finalize();
1297
1298 OverlayBroadcastToSign {
1299 hash: broadcast_hash.into(),
1300 date,
1301 }
1302}
1303
1304fn make_fec_part_to_sign(
1305 data_hash: &[u8; 32],
1306 data_size: u32,
1307 date: u32,
1308 flags: u32,
1309 params: &proto::rldp::RaptorQFecType,
1310 part: &[u8],
1311 seqno: u32,
1312 source: Option<adnl::NodeIdShort>,
1313) -> OverlayBroadcastToSign {
1314 const BROADCAST_FEC_ID: u32 = tl_proto::id!("overlay.broadcastFec.id", scheme = "scheme.tl");
1315 const BROADCAST_FEC_PART_ID: u32 =
1316 tl_proto::id!("overlay.broadcastFec.partId", scheme = "scheme.tl");
1317
1318 let mut broadcast_hash = sha2::Sha256::new();
1319 broadcast_hash.update(BROADCAST_FEC_ID.to_le_bytes());
1320 broadcast_hash.update(
1321 source
1322 .as_ref()
1323 .map(adnl::NodeIdShort::as_slice)
1324 .unwrap_or(&[0; 32]),
1325 );
1326 broadcast_hash.update(tl_proto::hash(params));
1327 broadcast_hash.update(data_hash);
1328 broadcast_hash.update(data_size.to_le_bytes());
1329 broadcast_hash.update(flags.to_le_bytes());
1330 let broadcast_hash = broadcast_hash.finalize();
1331
1332 let mut part_hash = sha2::Sha256::new();
1333 part_hash.update(BROADCAST_FEC_PART_ID.to_le_bytes());
1334 part_hash.update(broadcast_hash);
1335 part_hash.update(sha2::Sha256::digest(part).as_slice());
1336 part_hash.update(seqno.to_le_bytes());
1337 let part_hash = part_hash.finalize();
1338
1339 OverlayBroadcastToSign {
1340 hash: part_hash.into(),
1341 date,
1342 }
1343}
1344
/// Information about a received broadcast.
///
/// NOTE(review): constructed outside this part of the file; field notes are
/// inferred from names and types - confirm against the producer.
pub struct IncomingBroadcastInfo {
    /// Presumably the number of packets the broadcast was assembled from
    pub packets: u32,
    /// Broadcast payload
    pub data: Vec<u8>,
    /// Short id of the node the broadcast is attributed to
    pub from: adnl::NodeIdShort,
}
1351
/// Information about a sent broadcast.
///
/// NOTE(review): constructed outside this part of the file; field notes are
/// inferred from names - confirm against the producer.
#[derive(Default, Copy, Clone)]
pub struct OutgoingBroadcastInfo {
    /// Presumably the number of packets that were sent
    pub packets: u32,
    /// Presumably the number of peers the broadcast was sent to
    pub recipient_count: usize,
}
1358
/// State of a FEC broadcast that is being received.
///
/// Stored inside [`OwnedBroadcast::Incoming`].
///
/// NOTE(review): the code that drives this state is outside this part of the
/// file; field notes are inferred from names and types - confirm.
struct IncomingFecTransfer {
    /// Presumably set once the transfer is fully received
    completed: AtomicBool,
    /// Presumably tracks which packets were already seen
    history: PacketsHistory,
    /// Sender half of the unbounded channel carrying [`BroadcastFec`] parts
    broadcast_tx: BroadcastFecTx,
    /// Short id of the node associated with this transfer
    source: adnl::NodeIdShort,
    /// Presumably the last activity timestamp
    updated_at: UpdatedAt,
}
1366
/// State of a FEC broadcast that is being sent.
///
/// NOTE(review): used outside this part of the file; field notes are
/// inferred from names and types - confirm.
struct OutgoingFecTransfer {
    /// Id of the broadcast being sent
    broadcast_id: BroadcastId,
    /// RaptorQ encoder producing the parts
    encoder: RaptorQEncoder,
    /// Presumably the sequence number of the next part
    seqno: u32,
}
1372
/// Entry of the `owned_broadcasts` map of [`Overlay`].
enum OwnedBroadcast {
    /// A broadcast without attached transfer state
    Other,
    /// An incoming FEC broadcast together with its receiving state
    Incoming(IncomingFecTransfer),
}
1377
/// A single part of a FEC broadcast, as consumed by `process_fec_broadcast`.
#[derive(Debug)]
struct BroadcastFec {
    /// Full id of the sender; used to verify `signature`
    node_id: adnl::NodeIdFull,
    /// Expected SHA-256 of the complete payload
    data_hash: BroadcastId,
    /// Expected length of the decoded payload
    data_size: u32,
    /// Broadcast flags (see `BROADCAST_FLAG_ANY_SENDER`)
    flags: u32,
    /// Encoded part data which is fed into the decoder
    data: Vec<u8>,
    /// Sequence number of this part
    seqno: u32,
    /// FEC parameters; they take part in the signed broadcast id
    fec_type: proto::rldp::RaptorQFecType,
    /// Broadcast date; it takes part in the signed object
    date: u32,
    /// Signature of the part made by `node_id`
    signature: [u8; 64],
}
1390
/// Vacant entry of a dashmap which maps [`BroadcastId`] to
/// `Arc<OwnedBroadcast>` using `FastHasherState`.
type VacantBroadcastEntry<'a> =
    dashmap::mapref::entry::VacantEntry<'a, BroadcastId, Arc<OwnedBroadcast>, FastHasherState>;

/// Map of overlay nodes keyed by their public key.
pub type ReceivedPeersMap =
    FastHashMap<HashWrapper<everscale_crypto::tl::PublicKeyOwned>, proto::overlay::NodeOwned>;

/// Sender half of an unbounded channel of [`BroadcastFec`] parts.
type BroadcastFecTx = mpsc::UnboundedSender<BroadcastFec>;
1399
1400#[derive(Copy, Clone)]
1401pub struct DisplayBroadcastId<'a>(pub &'a BroadcastId);
1402
1403impl std::fmt::Display for DisplayBroadcastId<'_> {
1404 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1405 let mut output = [0u8; 64];
1406 hex::encode_to_slice(self.0, &mut output).ok();
1407
1408 let output = unsafe { std::str::from_utf8_unchecked(&output) };
1410 f.write_str(output)
1411 }
1412}
1413
/// 32-byte broadcast identifier (a hash, see
/// `OverlayBroadcastToSign::compute_broadcast_id`).
type BroadcastId = [u8; 32];
1415
/// Errors produced while processing overlay broadcasts.
#[derive(thiserror::Error, Debug)]
enum OverlayError {
    /// NOTE(review): raised outside this part of the file; presumably for
    /// signatures that can't be parsed or verified - confirm.
    #[error("Unsupported signature")]
    UnsupportedSignature,
    /// Decoded FEC payload length differs from the announced `data_size`
    #[error("Data size mismatch")]
    DataSizeMismatch,
    /// Neither the decompressed nor the raw decoded FEC payload matches the
    /// announced data hash
    #[error("Data hash mismatch")]
    DataHashMismatch,
}
1425
/// Broadcast flag: when set on a FEC broadcast, the sender id is not included
/// into the signed part id (a zeroed id is hashed instead).
/// Ordinary broadcast ids always hash this flag value.
const BROADCAST_FLAG_ANY_SENDER: u32 = 1;