everscale_network/overlay/
overlay.rs

1use std::convert::TryFrom;
2use std::net::SocketAddrV4;
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::Result;
8use crossbeam_queue::SegQueue;
9use parking_lot::Mutex;
10use sha2::Digest;
11use smallvec::SmallVec;
12use tl_proto::{HashWrapper, TlWrite};
13use tokio::sync::mpsc;
14
15use super::overlay_id::IdShort;
16use super::{broadcast_receiver::*, MAX_OVERLAY_PEERS};
17use crate::adnl;
18use crate::proto;
19use crate::rldp::{self, compression, RaptorQDecoder, RaptorQEncoder};
20use crate::util::*;
21
22/// Overlay configuration
23#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
24#[serde(default)]
25pub struct OverlayOptions {
26    /// More persistent list of peers. Used to distribute broadcasts.
27    ///
28    /// Default: `200`
29    pub max_neighbours: u32,
30
31    /// Max simultaneous broadcasts.
32    ///
33    /// Default: `1000`
34    pub max_broadcast_log: u32,
35
36    /// Broadcasts GC interval. Will leave at most `max_broadcast_log` each iteration.
37    ///
38    /// Default: `1000` ms
39    pub broadcast_gc_interval_ms: u64,
40
41    /// Neighbours or random peers update interval.
42    ///
43    /// Default: `60000` ms
44    pub overlay_peers_timeout_ms: u64,
45
46    /// Packets with length bigger than this will be sent using FEC broadcast.
47    /// See [`Overlay::broadcast`]
48    ///
49    /// Default: `768` bytes
50    pub max_ordinary_broadcast_len: usize,
51
52    /// Max number of peers to distribute broadcast to.
53    ///
54    /// Default: `5`
55    pub broadcast_target_count: u32,
56
57    /// Max number of peers to redistribute ordinary broadcast to.
58    ///
59    /// Default: `3`
60    pub secondary_broadcast_target_count: u32,
61
62    /// Max number of peers to redistribute FEC broadcast to.
63    ///
64    /// Default: `3`
65    pub secondary_fec_broadcast_target_count: u32,
66
67    /// Number of FEC messages to send in group. There will be a short delay between them.
68    ///
69    /// Default: `20`
70    pub fec_broadcast_wave_len: usize,
71
72    /// Interval between FEC broadcast waves.
73    ///
74    /// Default: `10` ms
75    pub fec_broadcast_wave_interval_ms: u64,
76
77    /// Overlay broadcast timeout. It will be forcefully dropped if not received in this time.
78    ///
79    /// Default: `60` sec
80    pub broadcast_timeout_sec: u64,
81
82    /// Whether requests will be compressed.
83    ///
84    /// Default: `false`
85    pub force_compression: bool,
86}
87
88impl Default for OverlayOptions {
89    fn default() -> Self {
90        Self {
91            max_neighbours: 200,
92            max_broadcast_log: 1000,
93            broadcast_gc_interval_ms: 1000,
94            overlay_peers_timeout_ms: 60000,
95            max_ordinary_broadcast_len: 768,
96            broadcast_target_count: 5,
97            secondary_broadcast_target_count: 3,
98            secondary_fec_broadcast_target_count: 3,
99            fec_broadcast_wave_len: 20,
100            fec_broadcast_wave_interval_ms: 10,
101            broadcast_timeout_sec: 60,
102            force_compression: false,
103        }
104    }
105}
106
107/// P2P messages distribution layer
108pub struct Overlay {
109    /// Unique overlay id
110    id: IdShort,
111    /// Local ADNL key
112    node_key: Arc<adnl::Key>,
113    // Configuration
114    options: OverlayOptions,
115
116    /// Broadcasts in progress
117    owned_broadcasts: FastDashMap<BroadcastId, Arc<OwnedBroadcast>>,
118    /// Broadcasts removal queue
119    finished_broadcasts: SegQueue<BroadcastId>,
120    /// Broadcasts removal queue len
121    finished_broadcast_count: AtomicU32,
122
123    /// New peers to add
124    received_peers: Arc<Mutex<ReceivedPeersMap>>,
125    /// Complete incoming broadcasts queue
126    received_broadcasts: Arc<BroadcastReceiver<IncomingBroadcastInfo>>,
127
128    /// Raw overlay nodes
129    nodes: FastDashMap<adnl::NodeIdShort, proto::overlay::NodeOwned>,
130    /// Peers to exclude from random selection
131    ignored_peers: FastDashSet<adnl::NodeIdShort>,
132    /// All known peers
133    known_peers: adnl::PeersSet,
134    /// Random peers subset
135    neighbours: adnl::PeersSet,
136
137    /// Serialized [`proto::rpc::OverlayQuery`] with own overlay id
138    query_prefix: Vec<u8>,
139    /// Serialized [`proto::overlay::Message`] with own overlay id
140    message_prefix: Vec<u8>,
141}
142
143impl Overlay {
144    /// Create new overlay node on top of the given ADNL node
145    pub(super) fn new(
146        node_key: Arc<adnl::Key>,
147        id: IdShort,
148        peers: &[adnl::NodeIdShort],
149        options: OverlayOptions,
150    ) -> Arc<Self> {
151        let query_prefix = tl_proto::serialize(proto::rpc::OverlayQuery {
152            overlay: id.as_slice(),
153        });
154        let message_prefix = tl_proto::serialize(proto::overlay::Message {
155            overlay: id.as_slice(),
156        });
157
158        let known_peers = adnl::PeersSet::with_peers_and_capacity(peers, MAX_OVERLAY_PEERS);
159
160        let overlay = Arc::new(Self {
161            id,
162            node_key,
163            options,
164            owned_broadcasts: FastDashMap::default(),
165            finished_broadcasts: SegQueue::new(),
166            finished_broadcast_count: AtomicU32::new(0),
167            received_peers: Arc::new(Default::default()),
168            received_broadcasts: Arc::new(BroadcastReceiver::default()),
169            nodes: FastDashMap::default(),
170            ignored_peers: FastDashSet::default(),
171            known_peers,
172            neighbours: adnl::PeersSet::with_capacity(options.max_neighbours),
173            query_prefix,
174            message_prefix,
175        });
176
177        if !peers.is_empty() {
178            overlay.update_neighbours(overlay.options.max_neighbours);
179        }
180
181        let overlay_ref = Arc::downgrade(&overlay);
182        let gc_interval = Duration::from_millis(options.broadcast_gc_interval_ms);
183        tokio::spawn(async move {
184            let mut peers_timeout = 0;
185            while let Some(overlay) = overlay_ref.upgrade() {
186                while overlay.finished_broadcast_count.load(Ordering::Acquire)
187                    > options.max_broadcast_log
188                {
189                    if let Some(broadcast_id) = overlay.finished_broadcasts.pop() {
190                        overlay.owned_broadcasts.remove(&broadcast_id);
191                    }
192                    overlay
193                        .finished_broadcast_count
194                        .fetch_sub(1, Ordering::Release);
195                }
196
197                peers_timeout += options.broadcast_gc_interval_ms;
198                if peers_timeout > options.overlay_peers_timeout_ms {
199                    overlay.update_neighbours(1);
200                    peers_timeout = 0;
201                }
202
203                tokio::time::sleep(gc_interval).await;
204            }
205        });
206
207        overlay
208    }
209
210    /// Configuration
211    #[inline(always)]
212    pub fn options(&self) -> &OverlayOptions {
213        &self.options
214    }
215
216    /// Instant metrics
217    pub fn metrics(&self) -> OverlayMetrics {
218        OverlayMetrics {
219            owned_broadcasts_len: self.owned_broadcasts.len(),
220            finished_broadcasts_len: self.finished_broadcast_count.load(Ordering::Acquire),
221            node_count: self.nodes.len(),
222            known_peers: self.known_peers.len(),
223            neighbours: self.neighbours.len(),
224            received_broadcasts_data_len: self.received_broadcasts.data_len(),
225            received_broadcasts_barrier_count: self.received_broadcasts.barriers_len(),
226        }
227    }
228
229    /// Short overlay id
230    pub fn id(&self) -> &IdShort {
231        &self.id
232    }
233
234    /// Returns local ADNL key for public overlay
235    pub fn overlay_key(&self) -> &Arc<adnl::Key> {
236        &self.node_key
237    }
238
239    /// Verifies and adds new peer to the overlay. Returns `Some` short peer id
240    /// if new peer was successfully added and `None` if peer already existed.
241    ///
242    /// See [`Overlay::add_public_peers`] for multiple peers.
243    pub fn add_public_peer(
244        &self,
245        adnl: &adnl::Node,
246        addr: SocketAddrV4,
247        node: proto::overlay::Node<'_>,
248    ) -> Result<Option<adnl::NodeIdShort>> {
249        if let Err(e) = self.id.verify_overlay_node(&node) {
250            tracing::warn!(overlay_id = %self.id, %addr, "invalid public overlay node: {e:?}");
251            return Ok(None);
252        }
253
254        let peer_id_full = adnl::NodeIdFull::try_from(node.id)?;
255        let peer_id = peer_id_full.compute_short_id();
256
257        let is_new_peer = adnl.add_peer(
258            adnl::NewPeerContext::PublicOverlay,
259            self.overlay_key().id(),
260            &peer_id,
261            addr,
262            peer_id_full,
263        )?;
264        if is_new_peer {
265            self.insert_public_peer(&peer_id, node);
266            Ok(Some(peer_id))
267        } else {
268            Ok(None)
269        }
270    }
271
272    /// Verifies and adds new peers to the overlay. Returns a list of successfully added peers.
273    ///
274    /// See [`Overlay::add_public_peer`] for single peer.
275    pub fn add_public_peers<'a, I>(
276        &self,
277        adnl: &adnl::Node,
278        nodes: I,
279    ) -> Result<Vec<adnl::NodeIdShort>>
280    where
281        I: IntoIterator<Item = (SocketAddrV4, proto::overlay::Node<'a>)>,
282    {
283        let local_id = self.overlay_key().id();
284
285        let mut result = Vec::new();
286        for (addr, node) in nodes {
287            if let Err(e) = self.id.verify_overlay_node(&node) {
288                tracing::warn!(overlay_id = %self.id, %addr, "invalid public overlay node: {e:?}");
289                continue;
290            }
291
292            let peer_id_full = adnl::NodeIdFull::try_from(node.id)?;
293            let peer_id = peer_id_full.compute_short_id();
294
295            let is_new_peer = adnl.add_peer(
296                adnl::NewPeerContext::PublicOverlay,
297                local_id,
298                &peer_id,
299                addr,
300                peer_id_full,
301            )?;
302            if is_new_peer {
303                self.insert_public_peer(&peer_id, node);
304                result.push(peer_id);
305                tracing::trace!(overlay_id = %self.id, %peer_id, %addr, "new public peer");
306            }
307        }
308
309        Ok(result)
310    }
311
312    /// Removes peer from random peers and adds it to ignored peers
313    pub fn remove_public_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
314        if !self.ignored_peers.insert(*peer_id) {
315            return false;
316        }
317        tracing::warn!(overlay_id = %self.id, %peer_id, "removing public overlay peer");
318        if self.neighbours.contains(peer_id) {
319            self.update_neighbours(self.options.max_neighbours);
320        }
321        true
322    }
323
324    /// Checks whether the specified peer has ever been in this public overlay
325    ///
326    /// NOTE: Peer might have been excluded. If you need to check whether the
327    /// specified peer is still in this overlay use [`Overlay::is_active_public_peer`]
328    pub fn is_known_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
329        self.known_peers.contains(peer_id)
330    }
331
332    /// Checks whether the specified peer is in the current public overlay
333    pub fn is_active_public_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
334        self.known_peers.contains(peer_id) && !self.ignored_peers.contains(peer_id)
335    }
336
337    /// Fill `dst` with `amount` peers from known peers
338    pub fn write_cached_peers(&self, amount: u32, dst: &adnl::PeersSet) {
339        dst.randomly_fill_from(&self.known_peers, amount, Some(&self.ignored_peers));
340    }
341
342    /// Serialized [`proto::rpc::OverlayQuery`] with own overlay id
343    #[inline(always)]
344    pub fn query_prefix(&self) -> &[u8] {
345        &self.query_prefix
346    }
347
348    /// Serialized [`proto::overlay::Message`] with own overlay id
349    #[inline(always)]
350    pub fn message_prefix(&self) -> &[u8] {
351        &self.message_prefix
352    }
353
354    /// Sends direct ADNL message ([`proto::adnl::Message::Custom`]) to the given peer.
355    ///
356    /// NOTE: Local id ([`Overlay::overlay_key`]) will be used as sender
357    pub fn send_message(
358        &self,
359        adnl: &adnl::Node,
360        peer_id: &adnl::NodeIdShort,
361        data: &[u8],
362    ) -> Result<()> {
363        let local_id = self.overlay_key().id();
364
365        let mut buffer = Vec::with_capacity(self.message_prefix().len() + data.len());
366        buffer.extend_from_slice(self.message_prefix());
367        buffer.extend_from_slice(data);
368        adnl.send_custom_message(local_id, peer_id, &buffer)
369    }
370
371    /// Sends ADNL query directly to the given peer. In case of timeout returns `Ok(None)`
372    ///
373    /// NOTE: Local id ([`Overlay::overlay_key`]) will be used as sender
374    pub async fn adnl_query<Q>(
375        &self,
376        adnl: &adnl::Node,
377        peer_id: &adnl::NodeIdShort,
378        query: Q,
379        timeout: Option<u64>,
380    ) -> Result<Option<Vec<u8>>>
381    where
382        Q: TlWrite,
383    {
384        let local_id = self.overlay_key().id();
385        type Value = tl_proto::OwnedRawBytes<tl_proto::Boxed>;
386        match adnl
387            .query_with_prefix::<Q, Value>(local_id, peer_id, self.query_prefix(), query, timeout)
388            .await?
389        {
390            Some(answer) => Ok(Some(answer.into_inner())),
391            None => Ok(None),
392        }
393    }
394
395    /// Sends RLDP query directly to the given peer. In case of timeout returns `Ok((None, max_timeout))`
396    ///
397    /// NOTE: Local id ([`Overlay::overlay_key`]) will be used as sender
398    pub async fn rldp_query<Q>(
399        &self,
400        rldp: &rldp::Node,
401        peer_id: &adnl::NodeIdShort,
402        query: Q,
403        roundtrip: Option<u64>,
404    ) -> Result<(Option<Vec<u8>>, u64)>
405    where
406        Q: TlWrite,
407    {
408        let local_id = self.overlay_key().id();
409
410        let prefix = self.query_prefix();
411        let mut query_data = Vec::with_capacity(prefix.len() + query.max_size_hint());
412        query_data.extend_from_slice(prefix);
413        query.write_to(&mut query_data);
414
415        rldp.query(local_id, peer_id, query_data, roundtrip).await
416    }
417
418    /// Distributes provided message to the neighbours subset.
419    ///
420    /// See `broadcast_target_count` in [`OverlayOptions`]
421    ///
422    /// NOTE: If `data` len is greater than
423    pub fn broadcast(
424        self: &Arc<Self>,
425        adnl: &Arc<adnl::Node>,
426        data: Vec<u8>,
427        source: Option<&Arc<adnl::Key>>,
428        target: BroadcastTarget,
429    ) -> OutgoingBroadcastInfo {
430        let local_id = self.overlay_key().id();
431
432        let key = match source {
433            Some(key) => key,
434            None => &self.node_key,
435        };
436
437        if data.len() <= self.options.max_ordinary_broadcast_len {
438            self.send_broadcast(adnl, local_id, data, key, target)
439        } else {
440            self.send_fec_broadcast(adnl, local_id, data, key, target)
441        }
442    }
443
444    /// Waits until the next received broadcast.
445    ///
446    /// NOTE: It is important to keep polling this method because otherwise
447    /// received broadcasts queue will consume all the memory.
448    pub async fn wait_for_broadcast(&self) -> IncomingBroadcastInfo {
449        self.received_broadcasts.pop().await
450    }
451
452    /// Take received peers map
453    pub fn take_new_peers(&self) -> ReceivedPeersMap {
454        let mut peers = self.received_peers.lock();
455        std::mem::take(&mut *peers)
456    }
457
458    /// Returns raw signed overlay node
459    pub fn sign_local_node(&self) -> proto::overlay::NodeOwned {
460        let key = self.overlay_key();
461        let version = now();
462
463        let node_to_sign = &proto::overlay::NodeToSign {
464            id: key.id().as_slice(),
465            overlay: self.id().as_slice(),
466            version,
467        };
468        let signature = key.sign(node_to_sign);
469
470        proto::overlay::NodeOwned {
471            id: key.full_id().as_tl().as_equivalent_owned(),
472            overlay: *self.id().as_slice(),
473            version,
474            signature: signature.to_vec().into(),
475        }
476    }
477
478    /// Exchanges random peers with the specified peer. Returns `Ok(None)` in case of timeout.
479    /// Uses the default existing peers filter.
480    pub async fn exchange_random_peers(
481        &self,
482        adnl: &adnl::Node,
483        peer_id: &adnl::NodeIdShort,
484        timeout: Option<u64>,
485    ) -> Result<Option<Vec<adnl::NodeIdShort>>> {
486        struct KnownPeers<'a>(&'a adnl::PeersSet);
487
488        impl ExistingPeersFilter for KnownPeers<'_> {
489            fn contains(&self, peer_id: &adnl::NodeIdShort) -> bool {
490                self.0.contains(peer_id)
491            }
492        }
493
494        self.exchange_random_peers_ext(adnl, peer_id, timeout, &KnownPeers(&self.known_peers))
495            .await
496    }
497
498    /// Exchanges random peers with the specified peer. Returns `Ok(None)` in case of timeout.
499    /// Uses the specified existing peers filter.
500    pub async fn exchange_random_peers_ext(
501        &self,
502        adnl: &adnl::Node,
503        peer_id: &adnl::NodeIdShort,
504        timeout: Option<u64>,
505        existing_peers: &dyn ExistingPeersFilter,
506    ) -> Result<Option<Vec<adnl::NodeIdShort>>> {
507        let query = proto::rpc::OverlayGetRandomPeersOwned {
508            peers: self.prepare_random_peers(),
509        };
510        let answer = match self.adnl_query(adnl, peer_id, query, timeout).await? {
511            Some(answer) => answer,
512            None => {
513                tracing::trace!(overlay_id = %self.id, %peer_id, "no random peers found");
514                return Ok(None);
515            }
516        };
517
518        let answer = tl_proto::deserialize_as_boxed(&answer)?;
519        tracing::trace!(overlay_id = %self.id, %peer_id, "got random peers");
520        let proto::overlay::Nodes { nodes } = self.filter_nodes(answer);
521
522        let nodes = nodes
523            .into_iter()
524            .filter_map(|node| match adnl::NodeIdFull::try_from(node.id) {
525                Ok(full_id) => {
526                    let peer_id = full_id.compute_short_id();
527                    if !existing_peers.contains(&peer_id) {
528                        Some(peer_id)
529                    } else {
530                        None
531                    }
532                }
533                Err(e) => {
534                    tracing::warn!(overlay_id = %self.id, %peer_id, "failed to process peer: {e}");
535                    None
536                }
537            })
538            .collect();
539        Ok(Some(nodes))
540    }
541
542    /// Process ordinary broadcast
543    pub(super) async fn receive_broadcast(
544        self: &Arc<Self>,
545        adnl: &adnl::Node,
546        local_id: &adnl::NodeIdShort,
547        peer_id: &adnl::NodeIdShort,
548        broadcast: proto::overlay::OverlayBroadcast<'_>,
549        raw_data: &[u8],
550    ) -> Result<()> {
551        if self.is_broadcast_outdated(broadcast.date) {
552            return Ok(());
553        }
554
555        let node_id = adnl::NodeIdFull::try_from(broadcast.src)?;
556        let node_peer_id = node_id.compute_short_id();
557        let source = match broadcast.flags {
558            flags if flags & BROADCAST_FLAG_ANY_SENDER == 0 => Some(node_peer_id),
559            _ => None,
560        };
561
562        let broadcast_data = match compression::decompress(broadcast.data) {
563            Some(decompressed) => {
564                let broadcast_to_sign =
565                    make_broadcast_to_sign(&decompressed, broadcast.date, source.as_ref());
566                match node_id.verify(&broadcast_to_sign, broadcast.signature) {
567                    Ok(()) => {
568                        let broadcast_id = broadcast_to_sign.compute_broadcast_id();
569                        if !self.create_broadcast(broadcast_id) {
570                            return Ok(());
571                        }
572                        Some((broadcast_id, decompressed))
573                    }
574                    Err(_) => None,
575                }
576            }
577            None => None,
578        };
579
580        let (broadcast_id, data) = match broadcast_data {
581            Some((id, data)) => (id, data),
582            None => {
583                let broadcast_to_sign =
584                    make_broadcast_to_sign(broadcast.data, broadcast.date, source.as_ref());
585                node_id.verify(&broadcast_to_sign, broadcast.signature)?;
586
587                let broadcast_id = broadcast_to_sign.compute_broadcast_id();
588                if !self.create_broadcast(broadcast_id) {
589                    return Ok(());
590                }
591                (broadcast_id, broadcast.data.to_vec())
592            }
593        };
594
595        self.received_broadcasts.push(IncomingBroadcastInfo {
596            packets: 1,
597            data,
598            from: node_peer_id,
599        });
600
601        let neighbours = self
602            .neighbours
603            .get_random_peers(self.options.secondary_broadcast_target_count, Some(peer_id));
604        self.distribute_broadcast(adnl, local_id, &neighbours, raw_data);
605        self.spawn_broadcast_gc_task(broadcast_id);
606
607        Ok(())
608    }
609
610    /// Process FEC broadcast
611    pub(super) async fn receive_fec_broadcast(
612        self: &Arc<Self>,
613        adnl: &adnl::Node,
614        local_id: &adnl::NodeIdShort,
615        peer_id: &adnl::NodeIdShort,
616        broadcast: proto::overlay::OverlayBroadcastFec<'_>,
617        raw_data: &[u8],
618    ) -> Result<()> {
619        use dashmap::mapref::entry::Entry;
620
621        if self.is_broadcast_outdated(broadcast.date) {
622            return Ok(());
623        }
624
625        let broadcast_id = *broadcast.data_hash;
626        let node_id = adnl::NodeIdFull::try_from(broadcast.src)?;
627        let source = node_id.compute_short_id();
628
629        let signature = match broadcast.signature.len() {
630            64 => broadcast.signature.try_into().unwrap(),
631            _ => return Err(OverlayError::UnsupportedSignature.into()),
632        };
633
634        let transfer = match self.owned_broadcasts.entry(broadcast_id) {
635            // First packet of the broadcast
636            Entry::Vacant(entry) => {
637                self.spawn_fec_transfer_receiver(broadcast.fec, broadcast_id, source, entry)?
638            }
639            // Broadcast was already started
640            Entry::Occupied(entry) => entry.get().clone(),
641        };
642        let transfer = match transfer.as_ref() {
643            OwnedBroadcast::Incoming(transfer) => transfer,
644            OwnedBroadcast::Other => return Ok(()),
645        };
646
647        transfer.updated_at.refresh();
648        if transfer.source != source {
649            tracing::trace!(
650                overlay_id = %self.id,
651                broadcast_id = %DisplayBroadcastId(&broadcast_id),
652                "same broadcast but parts from different sources"
653            );
654            return Ok(());
655        }
656
657        // Ignore duplicate packets
658        if !transfer.history.deliver_packet(broadcast.seqno as u64) {
659            return Ok(());
660        }
661
662        // Send broadcast to the processing queue
663        if !transfer.completed.load(Ordering::Acquire) {
664            transfer.broadcast_tx.send(BroadcastFec {
665                node_id,
666                data_hash: broadcast_id,
667                data_size: broadcast.data_size,
668                flags: broadcast.flags,
669                data: broadcast.data.to_vec(),
670                seqno: broadcast.seqno,
671                fec_type: broadcast.fec,
672                date: broadcast.date,
673                signature,
674            })?;
675        }
676
677        // Redistribute broadcast
678        let neighbours = self.neighbours.get_random_peers(
679            self.options.secondary_fec_broadcast_target_count,
680            Some(peer_id),
681        );
682        self.distribute_broadcast(adnl, local_id, &neighbours, raw_data);
683
684        Ok(())
685    }
686
687    /// Process random peers request
688    pub(super) fn process_get_random_peers(
689        &self,
690        query: proto::rpc::OverlayGetRandomPeers<'_>,
691    ) -> proto::overlay::NodesOwned {
692        use std::collections::hash_map::Entry;
693
694        // Update received peers
695        let peers = self.filter_nodes(query.peers).nodes;
696
697        // Insert received peers
698        let mut received_peers = self.received_peers.lock();
699        for node in peers {
700            match received_peers.entry(HashWrapper(node.id.as_equivalent_owned())) {
701                Entry::Occupied(mut entry) => {
702                    if entry.get().version < node.version {
703                        entry.insert(node.as_equivalent_owned());
704                    }
705                }
706                Entry::Vacant(entry) => {
707                    entry.insert(node.as_equivalent_owned());
708                }
709            }
710        }
711
712        // NOTE: reduce lock scope
713        drop(received_peers);
714
715        // Return random peers from our side
716        self.prepare_random_peers()
717    }
718
719    /// Send ordinary broadcast
720    fn send_broadcast(
721        self: &Arc<Self>,
722        adnl: &adnl::Node,
723        local_id: &adnl::NodeIdShort,
724        mut data: Vec<u8>,
725        key: &Arc<adnl::Key>,
726        target: BroadcastTarget,
727    ) -> OutgoingBroadcastInfo {
728        let date = now();
729        let broadcast_to_sign = make_broadcast_to_sign(&data, date, None);
730        let broadcast_id = broadcast_to_sign.compute_broadcast_id();
731        if !self.create_broadcast(broadcast_id) {
732            tracing::warn!(
733                overlay_id = %self.id,
734                broadcast_id = %DisplayBroadcastId(&broadcast_id),
735                "trying to send duplicated broadcast"
736            );
737            return Default::default();
738        }
739        let signature = key.sign(broadcast_to_sign);
740
741        if self.options.force_compression {
742            if let Err(e) = compression::compress(&mut data) {
743                tracing::warn!(
744                    overlay_id = %self.id,
745                    broadcast_id = %DisplayBroadcastId(&broadcast_id),
746                    "failed to compress overlay broadcast: {e:?}"
747                );
748            }
749        }
750
751        let broadcast = proto::overlay::Broadcast::Broadcast(proto::overlay::OverlayBroadcast {
752            src: key.full_id().as_tl(),
753            certificate: proto::overlay::Certificate::EmptyCertificate,
754            flags: BROADCAST_FLAG_ANY_SENDER,
755            data: &data,
756            date,
757            signature: &signature,
758        });
759
760        let mut buffer = Vec::with_capacity(self.message_prefix.len() + broadcast.max_size_hint());
761        buffer.extend_from_slice(&self.message_prefix);
762        broadcast.write_to(&mut buffer);
763        drop(data);
764
765        let neighbours = match target {
766            BroadcastTarget::RandomNeighbours => OwnedBroadcastTarget::Neighbours(
767                self.neighbours
768                    .get_random_peers(self.options.broadcast_target_count, None),
769            ),
770            BroadcastTarget::Explicit(neighbours) => OwnedBroadcastTarget::Explicit(neighbours),
771        };
772
773        self.distribute_broadcast(adnl, local_id, neighbours.as_ref(), &buffer);
774        self.spawn_broadcast_gc_task(broadcast_id);
775
776        OutgoingBroadcastInfo {
777            packets: 1,
778            recipient_count: neighbours.as_ref().len(),
779        }
780    }
781
782    /// Send FEC broadcast
783    fn send_fec_broadcast(
784        self: &Arc<Self>,
785        adnl: &Arc<adnl::Node>,
786        local_id: &adnl::NodeIdShort,
787        mut data: Vec<u8>,
788        key: &Arc<adnl::Key>,
789        target: BroadcastTarget,
790    ) -> OutgoingBroadcastInfo {
791        let broadcast_id = sha2::Sha256::digest(&data).into();
792        if !self.create_broadcast(broadcast_id) {
793            tracing::warn!(
794                overlay_id = %self.id,
795                broadcast_id = %DisplayBroadcastId(&broadcast_id),
796                "trying to send duplicated broadcast",
797            );
798            return Default::default();
799        }
800
801        if self.options.force_compression {
802            if let Err(e) = compression::compress(&mut data) {
803                tracing::warn!(
804                    overlay_id = %self.id,
805                    broadcast_id = %DisplayBroadcastId(&broadcast_id),
806                    "failed to compress overlay FEC broadcast: {e:?}"
807                );
808            }
809        }
810
811        let data_size = data.len() as u32;
812        let mut outgoing_transfer = OutgoingFecTransfer {
813            broadcast_id,
814            encoder: RaptorQEncoder::with_data(&data),
815            seqno: 0,
816        };
817
818        // NOTE: Data is already in encoder and not needed anymore
819        drop(data);
820
821        let neighbours = match target {
822            BroadcastTarget::RandomNeighbours => OwnedBroadcastTarget::Neighbours(
823                self.neighbours
824                    .get_random_peers(self.options.broadcast_target_count, None),
825            ),
826            BroadcastTarget::Explicit(neighbours) => OwnedBroadcastTarget::Explicit(neighbours),
827        };
828
829        let info = OutgoingBroadcastInfo {
830            packets: (data_size / outgoing_transfer.encoder.params().packet_len + 1) * 3 / 2,
831            recipient_count: neighbours.as_ref().len(),
832        };
833
834        // Spawn sender
835        let wave_len = self.options.fec_broadcast_wave_len;
836        let waves_interval = Duration::from_millis(self.options.fec_broadcast_wave_interval_ms);
837        let overlay = self.clone();
838        let adnl = adnl.clone();
839        let local_id = *local_id;
840        let key = key.clone();
841        tokio::spawn(async move {
842            // Send broadcast in waves
843            'outer: while outgoing_transfer.seqno <= info.packets {
844                for _ in 0..wave_len {
845                    let data = match overlay.prepare_fec_broadcast(&mut outgoing_transfer, &key) {
846                        Ok(data) => data,
847                        // Rare case, it is easier to just ignore it
848                        Err(e) => {
849                            tracing::warn!(
850                                overlay_id = %overlay.id,
851                                broadcast_id = %DisplayBroadcastId(&broadcast_id),
852                                "failed to send overlay broadcast: {e}"
853                            );
854                            break 'outer;
855                        }
856                    };
857
858                    overlay.distribute_broadcast(&adnl, &local_id, neighbours.as_ref(), &data);
859                    if outgoing_transfer.seqno > info.packets {
860                        break 'outer;
861                    }
862                }
863
864                // Sleep between waves
865                tokio::time::sleep(waves_interval).await;
866            }
867        });
868
869        // Schedule broadcast cleanup
870        self.spawn_broadcast_gc_task(broadcast_id);
871
872        // Done
873        info
874    }
875
876    /// Verifies and retains only valid remote peers
877    fn filter_nodes<'a>(&self, mut nodes: proto::overlay::Nodes<'a>) -> proto::overlay::Nodes<'a> {
878        nodes.nodes.retain(|node| {
879            if !matches!(
880                node.id,
881                everscale_crypto::tl::PublicKey::Ed25519 { key }
882                if key != self.node_key.full_id().public_key().as_bytes()
883            ) {
884                return false;
885            }
886
887            if let Err(e) = self.id.verify_overlay_node(node) {
888                tracing::warn!(overlay_id = %self.id, "invalid overlay node: {e:?}");
889                return false;
890            }
891
892            true
893        });
894
895        nodes
896    }
897
898    /// Creates nodes list
899    fn prepare_random_peers(&self) -> proto::overlay::NodesOwned {
900        const MAX_PEERS_IN_RESPONSE: u32 = 4;
901
902        let mut nodes = SmallVec::with_capacity(MAX_PEERS_IN_RESPONSE as usize + 1);
903        nodes.push(self.sign_local_node());
904
905        let peers = adnl::PeersSet::with_capacity(MAX_PEERS_IN_RESPONSE);
906        peers.randomly_fill_from(&self.neighbours, MAX_PEERS_IN_RESPONSE, None);
907        for peer_id in &peers {
908            if let Some(node) = self.nodes.get(peer_id) {
909                nodes.push(node.clone());
910            }
911        }
912
913        proto::overlay::NodesOwned { nodes }
914    }
915
916    /// Fills neighbours with a random subset from known peers
917    fn update_neighbours(&self, amount: u32) {
918        tracing::trace!(overlay_id = %self.id, amount, "updating neighbours");
919        self.neighbours
920            .randomly_fill_from(&self.known_peers, amount, Some(&self.ignored_peers));
921    }
922
923    /// Adds public peer info
924    fn insert_public_peer(&self, peer_id: &adnl::NodeIdShort, node: proto::overlay::Node<'_>) {
925        use dashmap::mapref::entry::Entry;
926
927        self.ignored_peers.remove(peer_id);
928        self.known_peers.insert(*peer_id);
929
930        if !self.neighbours.is_full() {
931            self.neighbours.insert(*peer_id);
932        }
933
934        match self.nodes.entry(*peer_id) {
935            Entry::Occupied(mut entry) => {
936                if entry.get().version < node.version {
937                    entry.insert(node.as_equivalent_owned());
938                }
939            }
940            Entry::Vacant(entry) => {
941                entry.insert(node.as_equivalent_owned());
942            }
943        }
944    }
945
946    /// Adds new broadcast id
947    fn create_broadcast(&self, broadcast_id: BroadcastId) -> bool {
948        use dashmap::mapref::entry::Entry;
949
950        match self.owned_broadcasts.entry(broadcast_id) {
951            Entry::Vacant(entry) => {
952                entry.insert(Arc::new(OwnedBroadcast::Other));
953                true
954            }
955            Entry::Occupied(_) => false,
956        }
957    }
958
959    /// Creates incoming FEC broadcast
960    fn spawn_fec_transfer_receiver(
961        self: &Arc<Self>,
962        fec_type: proto::rldp::RaptorQFecType,
963        broadcast_id: BroadcastId,
964        peer_id: adnl::NodeIdShort,
965        entry: VacantBroadcastEntry<'_>,
966    ) -> Result<Arc<OwnedBroadcast>> {
967        let (broadcast_tx, mut broadcast_rx) = mpsc::unbounded_channel();
968
969        let entry = entry
970            .insert(Arc::new(OwnedBroadcast::Incoming(IncomingFecTransfer {
971                completed: AtomicBool::new(false),
972                history: PacketsHistory::for_recv(),
973                broadcast_tx,
974                source: peer_id,
975                updated_at: Default::default(),
976            })))
977            .clone();
978
979        // Spawn packets receiver
980        let overlay = self.clone();
981        tokio::spawn(async move {
982            let mut decoder = RaptorQDecoder::with_params(fec_type);
983
984            // For each fec broadcast packet
985            let mut packets = 0;
986            while let Some(broadcast) = broadcast_rx.recv().await {
987                packets += 1;
988
989                // Add new data to the encoder
990                match process_fec_broadcast(&mut decoder, broadcast) {
991                    // Broadcast complete and successfully decoded
992                    Ok(Some(data)) => {
993                        let data = IncomingBroadcastInfo {
994                            packets,
995                            data,
996                            from: peer_id,
997                        };
998                        overlay.received_broadcasts.push(data);
999                        break;
1000                    }
1001                    // Broadcast is not complete yet
1002                    Ok(None) => continue,
1003                    // Error during decoding
1004                    Err(e) => {
1005                        tracing::warn!(
1006                            overlay_id = %overlay.id,
1007                            broadcast_id = %DisplayBroadcastId(&broadcast_id),
1008                            "error when receiving overlay broadcast: {e}"
1009                        );
1010                        break;
1011                    }
1012                }
1013            }
1014
1015            // Mark broadcast as completed
1016            if let Some(broadcast) = overlay.owned_broadcasts.get(&broadcast_id) {
1017                match broadcast.value().as_ref() {
1018                    OwnedBroadcast::Incoming(transfer) => {
1019                        transfer.completed.store(true, Ordering::Release);
1020                    }
1021                    _ => {
1022                        tracing::error!(
1023                            overlay_id = %overlay.id,
1024                            broadcast_id = %DisplayBroadcastId(&broadcast_id),
1025                            "incoming fec broadcast mismatch"
1026                        );
1027                    }
1028                }
1029            }
1030        });
1031
1032        // Spawn broadcast cleanup task
1033        let overlay = self.clone();
1034        let broadcast_timeout_sec = self.options.broadcast_timeout_sec;
1035        tokio::spawn(async move {
1036            loop {
1037                tokio::time::sleep(Duration::from_millis(broadcast_timeout_sec * 100)).await;
1038
1039                // Find incoming broadcast
1040                if let Some(broadcast) = overlay.owned_broadcasts.get(&broadcast_id) {
1041                    match broadcast.value().as_ref() {
1042                        // Keep waiting if broadcast is not expired or not complete
1043                        OwnedBroadcast::Incoming(transfer)
1044                            if !transfer.completed.load(Ordering::Acquire)
1045                                && !transfer.updated_at.is_expired(broadcast_timeout_sec) =>
1046                        {
1047                            continue
1048                        }
1049                        OwnedBroadcast::Incoming(_) => {}
1050                        _ => {
1051                            tracing::error!(
1052                                overlay_id = %overlay.id,
1053                                broadcast_id = %DisplayBroadcastId(&broadcast_id),
1054                                "incoming fec broadcast mismatch"
1055                            );
1056                        }
1057                    }
1058                }
1059
1060                break;
1061            }
1062
1063            overlay.spawn_broadcast_gc_task(broadcast_id);
1064        });
1065
1066        Ok(entry)
1067    }
1068
1069    /// Encodes next chunk of FEC broadcast
1070    fn prepare_fec_broadcast(
1071        &self,
1072        transfer: &mut OutgoingFecTransfer,
1073        key: &Arc<adnl::Key>,
1074    ) -> Result<Vec<u8>> {
1075        let chunk = transfer.encoder.encode(&mut transfer.seqno)?;
1076        let date = now();
1077
1078        let broadcast_to_sign = &make_fec_part_to_sign(
1079            &transfer.broadcast_id,
1080            transfer.encoder.params().total_len,
1081            date,
1082            BROADCAST_FLAG_ANY_SENDER,
1083            transfer.encoder.params(),
1084            &chunk,
1085            transfer.seqno,
1086            None,
1087        );
1088        let signature = key.sign(broadcast_to_sign);
1089
1090        let broadcast =
1091            proto::overlay::Broadcast::BroadcastFec(proto::overlay::OverlayBroadcastFec {
1092                src: key.full_id().as_tl(),
1093                certificate: proto::overlay::Certificate::EmptyCertificate,
1094                data_hash: &transfer.broadcast_id,
1095                data_size: transfer.encoder.params().total_len,
1096                flags: BROADCAST_FLAG_ANY_SENDER,
1097                data: &chunk,
1098                seqno: transfer.seqno,
1099                fec: *transfer.encoder.params(),
1100                date,
1101                signature: &signature,
1102            });
1103
1104        transfer.seqno += 1;
1105
1106        let mut buffer = Vec::with_capacity(self.message_prefix.len() + broadcast.max_size_hint());
1107        buffer.extend_from_slice(&self.message_prefix);
1108        broadcast.write_to(&mut buffer);
1109
1110        Ok(buffer)
1111    }
1112
1113    /// Sends ADNL messages to neighbours
1114    fn distribute_broadcast(
1115        &self,
1116        adnl: &adnl::Node,
1117        local_id: &adnl::NodeIdShort,
1118        neighbours: &[adnl::NodeIdShort],
1119        data: &[u8],
1120    ) {
1121        for peer_id in neighbours {
1122            if let Err(e) = adnl.send_custom_message(local_id, peer_id, data) {
1123                tracing::warn!(
1124                    overlay_id = %self.id,
1125                    %peer_id,
1126                    "failed to distribute broadcast: {e}"
1127                );
1128            }
1129        }
1130    }
1131
1132    fn is_broadcast_outdated(&self, date: u32) -> bool {
1133        date + (self.options.broadcast_timeout_sec as u32) < now()
1134    }
1135
1136    fn spawn_broadcast_gc_task(self: &Arc<Self>, broadcast_id: BroadcastId) {
1137        let overlay = self.clone();
1138        tokio::spawn(async move {
1139            tokio::time::sleep(Duration::from_secs(overlay.options.broadcast_timeout_sec)).await;
1140            overlay
1141                .finished_broadcast_count
1142                .fetch_add(1, Ordering::Release);
1143            overlay.finished_broadcasts.push(broadcast_id);
1144        });
1145    }
1146}
1147
1148/// Overlay broadcast target
1149#[derive(Debug, Clone)]
1150pub enum BroadcastTarget {
1151    /// Select N random peers from current neighbours
1152    RandomNeighbours,
1153    /// Explicit neighbour ids
1154    Explicit(Arc<Vec<adnl::NodeIdShort>>),
1155}
1156
1157impl Default for BroadcastTarget {
1158    fn default() -> Self {
1159        Self::RandomNeighbours
1160    }
1161}
1162
1163/// Filter for overlay peers exchange.
1164pub trait ExistingPeersFilter: Send + Sync {
1165    fn contains(&self, peer_id: &adnl::NodeIdShort) -> bool;
1166}
1167
1168impl ExistingPeersFilter for () {
1169    fn contains(&self, _: &adnl::NodeIdShort) -> bool {
1170        false
1171    }
1172}
1173
1174impl ExistingPeersFilter for bool {
1175    fn contains(&self, _: &adnl::NodeIdShort) -> bool {
1176        *self
1177    }
1178}
1179
1180impl<S> ExistingPeersFilter for std::collections::HashSet<adnl::NodeIdShort, S>
1181where
1182    S: std::hash::BuildHasher + Send + Sync,
1183{
1184    fn contains(&self, peer_id: &adnl::NodeIdShort) -> bool {
1185        std::collections::HashSet::contains(self, peer_id)
1186    }
1187}
1188
1189impl<S> ExistingPeersFilter for dashmap::DashSet<adnl::NodeIdShort, S>
1190where
1191    S: std::hash::BuildHasher + Send + Sync + Clone,
1192{
1193    fn contains(&self, peer_id: &adnl::NodeIdShort) -> bool {
1194        dashmap::DashSet::contains(self, peer_id)
1195    }
1196}
1197
1198enum OwnedBroadcastTarget {
1199    Neighbours(Vec<adnl::NodeIdShort>),
1200    Explicit(Arc<Vec<adnl::NodeIdShort>>),
1201}
1202
1203impl AsRef<[adnl::NodeIdShort]> for OwnedBroadcastTarget {
1204    fn as_ref(&self) -> &[adnl::NodeIdShort] {
1205        match self {
1206            OwnedBroadcastTarget::Neighbours(neighbours) => neighbours.as_ref(),
1207            OwnedBroadcastTarget::Explicit(neighbours) => neighbours.as_ref(),
1208        }
1209    }
1210}
1211
1212/// Instant overlay metrics
1213#[derive(Debug, Copy, Clone)]
1214pub struct OverlayMetrics {
1215    pub owned_broadcasts_len: usize,
1216    pub finished_broadcasts_len: u32,
1217    pub node_count: usize,
1218    pub known_peers: usize,
1219    pub neighbours: usize,
1220    pub received_broadcasts_data_len: usize,
1221    pub received_broadcasts_barrier_count: usize,
1222}
1223
1224fn process_fec_broadcast(
1225    decoder: &mut RaptorQDecoder,
1226    broadcast: BroadcastFec,
1227) -> Result<Option<Vec<u8>>> {
1228    let broadcast_id = &broadcast.data_hash;
1229
1230    let broadcast_to_sign = &make_fec_part_to_sign(
1231        broadcast_id,
1232        broadcast.data_size,
1233        broadcast.date,
1234        broadcast.flags,
1235        &broadcast.fec_type,
1236        &broadcast.data,
1237        broadcast.seqno,
1238        if broadcast.flags & BROADCAST_FLAG_ANY_SENDER == 0 {
1239            Some(broadcast.node_id.compute_short_id())
1240        } else {
1241            None
1242        },
1243    );
1244    broadcast
1245        .node_id
1246        .verify(broadcast_to_sign, &broadcast.signature)?;
1247
1248    match decoder.decode(broadcast.seqno, broadcast.data) {
1249        Some(result) if result.len() != broadcast.data_size as usize => {
1250            Err(OverlayError::DataSizeMismatch.into())
1251        }
1252        Some(result) => match compression::decompress(&result) {
1253            Some(decompressed)
1254                if sha2::Sha256::digest(&decompressed).as_slice() == broadcast_id =>
1255            {
1256                Ok(Some(decompressed))
1257            }
1258            _ => {
1259                let data_hash = sha2::Sha256::digest(&result);
1260                if data_hash.as_slice() == broadcast_id {
1261                    Ok(Some(result))
1262                } else {
1263                    Err(OverlayError::DataHashMismatch.into())
1264                }
1265            }
1266        },
1267        None => Ok(None),
1268    }
1269}
1270
1271#[derive(TlWrite)]
1272#[tl(boxed, id = "overlay.broadcast.toSign", scheme = "scheme.tl")]
1273struct OverlayBroadcastToSign {
1274    hash: [u8; 32],
1275    date: u32,
1276}
1277
1278impl OverlayBroadcastToSign {
1279    fn compute_broadcast_id(&self) -> BroadcastId {
1280        tl_proto::hash(self)
1281    }
1282}
1283
1284fn make_broadcast_to_sign(
1285    data: &[u8],
1286    date: u32,
1287    source: Option<&adnl::NodeIdShort>,
1288) -> OverlayBroadcastToSign {
1289    const BROADCAST_ID: u32 = tl_proto::id!("overlay.broadcast.id", scheme = "scheme.tl");
1290
1291    let mut broadcast_hash = sha2::Sha256::new();
1292    broadcast_hash.update(BROADCAST_ID.to_le_bytes());
1293    broadcast_hash.update(source.map(adnl::NodeIdShort::as_slice).unwrap_or(&[0; 32]));
1294    broadcast_hash.update(sha2::Sha256::digest(data).as_slice());
1295    broadcast_hash.update(BROADCAST_FLAG_ANY_SENDER.to_le_bytes());
1296    let broadcast_hash = broadcast_hash.finalize();
1297
1298    OverlayBroadcastToSign {
1299        hash: broadcast_hash.into(),
1300        date,
1301    }
1302}
1303
1304fn make_fec_part_to_sign(
1305    data_hash: &[u8; 32],
1306    data_size: u32,
1307    date: u32,
1308    flags: u32,
1309    params: &proto::rldp::RaptorQFecType,
1310    part: &[u8],
1311    seqno: u32,
1312    source: Option<adnl::NodeIdShort>,
1313) -> OverlayBroadcastToSign {
1314    const BROADCAST_FEC_ID: u32 = tl_proto::id!("overlay.broadcastFec.id", scheme = "scheme.tl");
1315    const BROADCAST_FEC_PART_ID: u32 =
1316        tl_proto::id!("overlay.broadcastFec.partId", scheme = "scheme.tl");
1317
1318    let mut broadcast_hash = sha2::Sha256::new();
1319    broadcast_hash.update(BROADCAST_FEC_ID.to_le_bytes());
1320    broadcast_hash.update(
1321        source
1322            .as_ref()
1323            .map(adnl::NodeIdShort::as_slice)
1324            .unwrap_or(&[0; 32]),
1325    );
1326    broadcast_hash.update(tl_proto::hash(params));
1327    broadcast_hash.update(data_hash);
1328    broadcast_hash.update(data_size.to_le_bytes());
1329    broadcast_hash.update(flags.to_le_bytes());
1330    let broadcast_hash = broadcast_hash.finalize();
1331
1332    let mut part_hash = sha2::Sha256::new();
1333    part_hash.update(BROADCAST_FEC_PART_ID.to_le_bytes());
1334    part_hash.update(broadcast_hash);
1335    part_hash.update(sha2::Sha256::digest(part).as_slice());
1336    part_hash.update(seqno.to_le_bytes());
1337    let part_hash = part_hash.finalize();
1338
1339    OverlayBroadcastToSign {
1340        hash: part_hash.into(),
1341        date,
1342    }
1343}
1344
1345/// Received overlay broadcast
1346pub struct IncomingBroadcastInfo {
1347    pub packets: u32,
1348    pub data: Vec<u8>,
1349    pub from: adnl::NodeIdShort,
1350}
1351
1352/// Sent overlay broadcast info
1353#[derive(Default, Copy, Clone)]
1354pub struct OutgoingBroadcastInfo {
1355    pub packets: u32,
1356    pub recipient_count: usize,
1357}
1358
1359struct IncomingFecTransfer {
1360    completed: AtomicBool,
1361    history: PacketsHistory,
1362    broadcast_tx: BroadcastFecTx,
1363    source: adnl::NodeIdShort,
1364    updated_at: UpdatedAt,
1365}
1366
1367struct OutgoingFecTransfer {
1368    broadcast_id: BroadcastId,
1369    encoder: RaptorQEncoder,
1370    seqno: u32,
1371}
1372
1373enum OwnedBroadcast {
1374    Other,
1375    Incoming(IncomingFecTransfer),
1376}
1377
1378#[derive(Debug)]
1379struct BroadcastFec {
1380    node_id: adnl::NodeIdFull,
1381    data_hash: BroadcastId,
1382    data_size: u32,
1383    flags: u32,
1384    data: Vec<u8>,
1385    seqno: u32,
1386    fec_type: proto::rldp::RaptorQFecType,
1387    date: u32,
1388    signature: [u8; 64],
1389}
1390
1391type VacantBroadcastEntry<'a> =
1392    dashmap::mapref::entry::VacantEntry<'a, BroadcastId, Arc<OwnedBroadcast>, FastHasherState>;
1393
1394/// Type alias for received nodes
1395pub type ReceivedPeersMap =
1396    FastHashMap<HashWrapper<everscale_crypto::tl::PublicKeyOwned>, proto::overlay::NodeOwned>;
1397
1398type BroadcastFecTx = mpsc::UnboundedSender<BroadcastFec>;
1399
1400#[derive(Copy, Clone)]
1401pub struct DisplayBroadcastId<'a>(pub &'a BroadcastId);
1402
1403impl std::fmt::Display for DisplayBroadcastId<'_> {
1404    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1405        let mut output = [0u8; 64];
1406        hex::encode_to_slice(self.0, &mut output).ok();
1407
1408        // SAFETY: output is guaranteed to contain only [0-9a-f]
1409        let output = unsafe { std::str::from_utf8_unchecked(&output) };
1410        f.write_str(output)
1411    }
1412}
1413
1414type BroadcastId = [u8; 32];
1415
1416#[derive(thiserror::Error, Debug)]
1417enum OverlayError {
1418    #[error("Unsupported signature")]
1419    UnsupportedSignature,
1420    #[error("Data size mismatch")]
1421    DataSizeMismatch,
1422    #[error("Data hash mismatch")]
1423    DataHashMismatch,
1424}
1425
1426const BROADCAST_FLAG_ANY_SENDER: u32 = 1; // Any sender