use std::convert::TryFrom;
use std::hash::BuildHasherDefault;
use std::net::SocketAddrV4;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use crossbeam_queue::SegQueue;
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use sha2::Digest;
use smallvec::SmallVec;
use tl_proto::{HashWrapper, TlWrite};
use tokio::sync::mpsc;
use super::overlay_id::IdShort;
use super::{broadcast_receiver::*, MAX_OVERLAY_PEERS};
use crate::adnl;
use crate::proto;
use crate::rldp::{self, compression, RaptorQDecoder, RaptorQEncoder};
use crate::util::*;
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct OverlayOptions {
pub max_neighbours: u32,
pub max_broadcast_log: u32,
pub broadcast_gc_interval_ms: u64,
pub overlay_peers_timeout_ms: u64,
pub max_ordinary_broadcast_len: usize,
pub broadcast_target_count: u32,
pub secondary_broadcast_target_count: u32,
pub secondary_fec_broadcast_target_count: u32,
pub fec_broadcast_wave_len: usize,
pub fec_broadcast_wave_interval_ms: u64,
pub broadcast_timeout_sec: u64,
pub force_compression: bool,
}
impl Default for OverlayOptions {
fn default() -> Self {
Self {
max_neighbours: 200,
max_broadcast_log: 1000,
broadcast_gc_interval_ms: 1000,
overlay_peers_timeout_ms: 60000,
max_ordinary_broadcast_len: 768,
broadcast_target_count: 5,
secondary_broadcast_target_count: 3,
secondary_fec_broadcast_target_count: 3,
fec_broadcast_wave_len: 20,
fec_broadcast_wave_interval_ms: 10,
broadcast_timeout_sec: 60,
force_compression: false,
}
}
}
pub struct Overlay {
id: IdShort,
node_key: Arc<adnl::Key>,
options: OverlayOptions,
owned_broadcasts: FxDashMap<BroadcastId, Arc<OwnedBroadcast>>,
finished_broadcasts: SegQueue<BroadcastId>,
finished_broadcast_count: AtomicU32,
received_peers: Arc<Mutex<ReceivedPeersMap>>,
received_broadcasts: Arc<BroadcastReceiver<IncomingBroadcastInfo>>,
nodes: FxDashMap<adnl::NodeIdShort, proto::overlay::NodeOwned>,
ignored_peers: FxDashSet<adnl::NodeIdShort>,
known_peers: adnl::PeersSet,
neighbours: adnl::PeersSet,
query_prefix: Vec<u8>,
message_prefix: Vec<u8>,
}
impl Overlay {
pub(super) fn new(
node_key: Arc<adnl::Key>,
id: IdShort,
peers: &[adnl::NodeIdShort],
options: OverlayOptions,
) -> Arc<Self> {
let query_prefix = tl_proto::serialize(proto::rpc::OverlayQuery {
overlay: id.as_slice(),
});
let message_prefix = tl_proto::serialize(proto::overlay::Message {
overlay: id.as_slice(),
});
let known_peers = adnl::PeersSet::with_peers_and_capacity(peers, MAX_OVERLAY_PEERS);
let overlay = Arc::new(Self {
id,
node_key,
options,
owned_broadcasts: FxDashMap::default(),
finished_broadcasts: SegQueue::new(),
finished_broadcast_count: AtomicU32::new(0),
received_peers: Arc::new(Default::default()),
received_broadcasts: Arc::new(BroadcastReceiver::default()),
nodes: FxDashMap::default(),
ignored_peers: FxDashSet::default(),
known_peers,
neighbours: adnl::PeersSet::with_capacity(options.max_neighbours),
query_prefix,
message_prefix,
});
if !peers.is_empty() {
overlay.update_neighbours(overlay.options.max_neighbours);
}
let overlay_ref = Arc::downgrade(&overlay);
let gc_interval = Duration::from_millis(options.broadcast_gc_interval_ms);
tokio::spawn(async move {
let mut peers_timeout = 0;
while let Some(overlay) = overlay_ref.upgrade() {
while overlay.finished_broadcast_count.load(Ordering::Acquire)
> options.max_broadcast_log
{
if let Some(broadcast_id) = overlay.finished_broadcasts.pop() {
overlay.owned_broadcasts.remove(&broadcast_id);
}
overlay
.finished_broadcast_count
.fetch_sub(1, Ordering::Release);
}
peers_timeout += options.broadcast_gc_interval_ms;
if peers_timeout > options.overlay_peers_timeout_ms {
overlay.update_neighbours(1);
peers_timeout = 0;
}
tokio::time::sleep(gc_interval).await;
}
});
overlay
}
#[inline(always)]
pub fn options(&self) -> &OverlayOptions {
&self.options
}
pub fn metrics(&self) -> OverlayMetrics {
OverlayMetrics {
owned_broadcasts_len: self.owned_broadcasts.len(),
finished_broadcasts_len: self.finished_broadcast_count.load(Ordering::Acquire),
node_count: self.nodes.len(),
known_peers: self.known_peers.len(),
neighbours: self.neighbours.len(),
received_broadcasts_data_len: self.received_broadcasts.data_len(),
received_broadcasts_barrier_count: self.received_broadcasts.barriers_len(),
}
}
pub fn id(&self) -> &IdShort {
&self.id
}
pub fn overlay_key(&self) -> &Arc<adnl::Key> {
&self.node_key
}
pub fn add_public_peer(
&self,
adnl: &adnl::Node,
addr: SocketAddrV4,
node: proto::overlay::Node<'_>,
) -> Result<Option<adnl::NodeIdShort>> {
if let Err(e) = self.id.verify_overlay_node(&node) {
tracing::warn!(overlay_id = %self.id, %addr, "invalid public overlay node: {e:?}");
return Ok(None);
}
let peer_id_full = adnl::NodeIdFull::try_from(node.id)?;
let peer_id = peer_id_full.compute_short_id();
let is_new_peer = adnl.add_peer(
adnl::NewPeerContext::PublicOverlay,
self.overlay_key().id(),
&peer_id,
addr,
peer_id_full,
)?;
if is_new_peer {
self.insert_public_peer(&peer_id, node);
Ok(Some(peer_id))
} else {
Ok(None)
}
}
pub fn add_public_peers<'a, I>(
&self,
adnl: &adnl::Node,
nodes: I,
) -> Result<Vec<adnl::NodeIdShort>>
where
I: IntoIterator<Item = (SocketAddrV4, proto::overlay::Node<'a>)>,
{
let local_id = self.overlay_key().id();
let mut result = Vec::new();
for (addr, node) in nodes {
if let Err(e) = self.id.verify_overlay_node(&node) {
tracing::warn!(overlay_id = %self.id, %addr, "invalid public overlay node: {e:?}");
continue;
}
let peer_id_full = adnl::NodeIdFull::try_from(node.id)?;
let peer_id = peer_id_full.compute_short_id();
let is_new_peer = adnl.add_peer(
adnl::NewPeerContext::PublicOverlay,
local_id,
&peer_id,
addr,
peer_id_full,
)?;
if is_new_peer {
self.insert_public_peer(&peer_id, node);
result.push(peer_id);
tracing::trace!(overlay_id = %self.id, %peer_id, %addr, "new public peer");
}
}
Ok(result)
}
pub fn remove_public_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
if !self.ignored_peers.insert(*peer_id) {
return false;
}
tracing::warn!(overlay_id = %self.id, %peer_id, "removing public overlay peer");
if self.neighbours.contains(peer_id) {
self.update_neighbours(self.options.max_neighbours);
}
true
}
pub fn is_known_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
self.known_peers.contains(peer_id)
}
pub fn is_active_public_peer(&self, peer_id: &adnl::NodeIdShort) -> bool {
self.known_peers.contains(peer_id) && !self.ignored_peers.contains(peer_id)
}
pub fn write_cached_peers(&self, amount: u32, dst: &adnl::PeersSet) {
dst.randomly_fill_from(&self.known_peers, amount, Some(&self.ignored_peers));
}
#[inline(always)]
pub fn query_prefix(&self) -> &[u8] {
&self.query_prefix
}
#[inline(always)]
pub fn message_prefix(&self) -> &[u8] {
&self.message_prefix
}
pub fn send_message(
&self,
adnl: &adnl::Node,
peer_id: &adnl::NodeIdShort,
data: &[u8],
) -> Result<()> {
let local_id = self.overlay_key().id();
let mut buffer = Vec::with_capacity(self.message_prefix().len() + data.len());
buffer.extend_from_slice(self.message_prefix());
buffer.extend_from_slice(data);
adnl.send_custom_message(local_id, peer_id, &buffer)
}
pub async fn adnl_query<Q>(
&self,
adnl: &adnl::Node,
peer_id: &adnl::NodeIdShort,
query: Q,
timeout: Option<u64>,
) -> Result<Option<Vec<u8>>>
where
Q: TlWrite,
{
let local_id = self.overlay_key().id();
type Value = tl_proto::OwnedRawBytes<tl_proto::Boxed>;
match adnl
.query_with_prefix::<Q, Value>(local_id, peer_id, self.query_prefix(), query, timeout)
.await?
{
Some(answer) => Ok(Some(answer.into_inner())),
None => Ok(None),
}
}
pub async fn rldp_query<Q>(
&self,
rldp: &rldp::Node,
peer_id: &adnl::NodeIdShort,
query: Q,
roundtrip: Option<u64>,
) -> Result<(Option<Vec<u8>>, u64)>
where
Q: TlWrite,
{
let local_id = self.overlay_key().id();
let prefix = self.query_prefix();
let mut query_data = Vec::with_capacity(prefix.len() + query.max_size_hint());
query_data.extend_from_slice(prefix);
query.write_to(&mut query_data);
rldp.query(local_id, peer_id, query_data, roundtrip).await
}
pub fn broadcast(
self: &Arc<Self>,
adnl: &Arc<adnl::Node>,
data: Vec<u8>,
source: Option<&Arc<adnl::Key>>,
target: BroadcastTarget,
) -> OutgoingBroadcastInfo {
let local_id = self.overlay_key().id();
let key = match source {
Some(key) => key,
None => &self.node_key,
};
if data.len() <= self.options.max_ordinary_broadcast_len {
self.send_broadcast(adnl, local_id, data, key, target)
} else {
self.send_fec_broadcast(adnl, local_id, data, key, target)
}
}
pub async fn wait_for_broadcast(&self) -> IncomingBroadcastInfo {
self.received_broadcasts.pop().await
}
pub fn take_new_peers(&self) -> ReceivedPeersMap {
let mut peers = self.received_peers.lock();
std::mem::take(&mut *peers)
}
pub fn sign_local_node(&self) -> proto::overlay::NodeOwned {
let key = self.overlay_key();
let version = now();
let node_to_sign = proto::overlay::NodeToSign {
id: key.id().as_slice(),
overlay: self.id().as_slice(),
version,
};
let signature = key.sign(&node_to_sign);
proto::overlay::NodeOwned {
id: key.full_id().as_tl().as_equivalent_owned(),
overlay: *self.id().as_slice(),
version,
signature: signature.to_vec().into(),
}
}
pub async fn exchange_random_peers(
&self,
adnl: &adnl::Node,
peer_id: &adnl::NodeIdShort,
existing_peers: &FxDashSet<adnl::NodeIdShort>,
timeout: Option<u64>,
) -> Result<Option<Vec<adnl::NodeIdShort>>> {
let query = proto::rpc::OverlayGetRandomPeersOwned {
peers: self.prepare_random_peers(),
};
let answer = match self.adnl_query(adnl, peer_id, query, timeout).await? {
Some(answer) => answer,
None => {
tracing::trace!(overlay_id = %self.id, %peer_id, "no random peers found");
return Ok(None);
}
};
let answer = tl_proto::deserialize_as_boxed(&answer)?;
tracing::trace!(overlay_id = %self.id, %peer_id, "got random peers");
let proto::overlay::Nodes { nodes } = self.filter_nodes(answer);
let nodes = nodes
.into_iter()
.filter_map(|node| match adnl::NodeIdFull::try_from(node.id) {
Ok(full_id) => {
let peer_id = full_id.compute_short_id();
if !existing_peers.contains(&peer_id) {
Some(peer_id)
} else {
None
}
}
Err(e) => {
tracing::warn!(overlay_id = %self.id, %peer_id, "failed to process peer: {e}");
None
}
})
.collect();
Ok(Some(nodes))
}
pub(super) async fn receive_broadcast(
self: &Arc<Self>,
adnl: &adnl::Node,
local_id: &adnl::NodeIdShort,
peer_id: &adnl::NodeIdShort,
broadcast: proto::overlay::OverlayBroadcast<'_>,
raw_data: &[u8],
) -> Result<()> {
if self.is_broadcast_outdated(broadcast.date) {
return Ok(());
}
let node_id = adnl::NodeIdFull::try_from(broadcast.src)?;
let node_peer_id = node_id.compute_short_id();
let source = match broadcast.flags {
flags if flags & BROADCAST_FLAG_ANY_SENDER == 0 => Some(node_peer_id),
_ => None,
};
let broadcast_data = match compression::decompress(broadcast.data) {
Some(decompressed) => {
let broadcast_to_sign =
make_broadcast_to_sign(&decompressed, broadcast.date, source.as_ref());
match node_id.verify(&broadcast_to_sign, broadcast.signature) {
Ok(()) => {
let broadcast_id = broadcast_to_sign.compute_broadcast_id();
if !self.create_broadcast(broadcast_id) {
return Ok(());
}
Some((broadcast_id, decompressed))
}
Err(_) => None,
}
}
None => None,
};
let (broadcast_id, data) = match broadcast_data {
Some((id, data)) => (id, data),
None => {
let broadcast_to_sign =
make_broadcast_to_sign(broadcast.data, broadcast.date, source.as_ref());
node_id.verify(&broadcast_to_sign, broadcast.signature)?;
let broadcast_id = broadcast_to_sign.compute_broadcast_id();
if !self.create_broadcast(broadcast_id) {
return Ok(());
}
(broadcast_id, broadcast.data.to_vec())
}
};
self.received_broadcasts.push(IncomingBroadcastInfo {
packets: 1,
data,
from: node_peer_id,
});
let neighbours = self
.neighbours
.get_random_peers(self.options.secondary_broadcast_target_count, Some(peer_id));
self.distribute_broadcast(adnl, local_id, &neighbours, raw_data);
self.spawn_broadcast_gc_task(broadcast_id);
Ok(())
}
pub(super) async fn receive_fec_broadcast(
self: &Arc<Self>,
adnl: &adnl::Node,
local_id: &adnl::NodeIdShort,
peer_id: &adnl::NodeIdShort,
broadcast: proto::overlay::OverlayBroadcastFec<'_>,
raw_data: &[u8],
) -> Result<()> {
use dashmap::mapref::entry::Entry;
if self.is_broadcast_outdated(broadcast.date) {
return Ok(());
}
let broadcast_id = *broadcast.data_hash;
let node_id = adnl::NodeIdFull::try_from(broadcast.src)?;
let source = node_id.compute_short_id();
let signature = match broadcast.signature.len() {
64 => broadcast.signature.try_into().unwrap(),
_ => return Err(OverlayError::UnsupportedSignature.into()),
};
let transfer = match self.owned_broadcasts.entry(broadcast_id) {
Entry::Vacant(entry) => {
self.spawn_fec_transfer_receiver(broadcast.fec, broadcast_id, source, entry)?
}
Entry::Occupied(entry) => entry.get().clone(),
};
let transfer = match transfer.as_ref() {
OwnedBroadcast::Incoming(transfer) => transfer,
OwnedBroadcast::Other => return Ok(()),
};
transfer.updated_at.refresh();
if transfer.source != source {
tracing::trace!(
overlay_id = %self.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"same broadcast but parts from different sources"
);
return Ok(());
}
if !transfer.history.deliver_packet(broadcast.seqno as u64) {
return Ok(());
}
if !transfer.completed.load(Ordering::Acquire) {
transfer.broadcast_tx.send(BroadcastFec {
node_id,
data_hash: broadcast_id,
data_size: broadcast.data_size,
flags: broadcast.flags,
data: broadcast.data.to_vec(),
seqno: broadcast.seqno,
fec_type: broadcast.fec,
date: broadcast.date,
signature,
})?;
}
let neighbours = self.neighbours.get_random_peers(
self.options.secondary_fec_broadcast_target_count,
Some(peer_id),
);
self.distribute_broadcast(adnl, local_id, &neighbours, raw_data);
Ok(())
}
pub(super) fn process_get_random_peers(
&self,
query: proto::rpc::OverlayGetRandomPeers<'_>,
) -> proto::overlay::NodesOwned {
use std::collections::hash_map::Entry;
let peers = self.filter_nodes(query.peers).nodes;
let mut received_peers = self.received_peers.lock();
for node in peers {
match received_peers.entry(HashWrapper(node.id.as_equivalent_owned())) {
Entry::Occupied(mut entry) => {
if entry.get().version < node.version {
entry.insert(node.as_equivalent_owned());
}
}
Entry::Vacant(entry) => {
entry.insert(node.as_equivalent_owned());
}
}
}
drop(received_peers);
self.prepare_random_peers()
}
fn send_broadcast(
self: &Arc<Self>,
adnl: &adnl::Node,
local_id: &adnl::NodeIdShort,
mut data: Vec<u8>,
key: &Arc<adnl::Key>,
target: BroadcastTarget,
) -> OutgoingBroadcastInfo {
let date = now();
let broadcast_to_sign = make_broadcast_to_sign(&data, date, None);
let broadcast_id = broadcast_to_sign.compute_broadcast_id();
if !self.create_broadcast(broadcast_id) {
tracing::warn!(
overlay_id = %self.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"trying to send duplicated broadcast"
);
return Default::default();
}
let signature = key.sign(broadcast_to_sign);
if self.options.force_compression {
if let Err(e) = compression::compress(&mut data) {
tracing::warn!(
overlay_id = %self.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"failed to compress overlay broadcast: {e:?}"
);
}
}
let broadcast = proto::overlay::Broadcast::Broadcast(proto::overlay::OverlayBroadcast {
src: key.full_id().as_tl(),
certificate: proto::overlay::Certificate::EmptyCertificate,
flags: BROADCAST_FLAG_ANY_SENDER,
data: &data,
date,
signature: &signature,
});
let mut buffer = Vec::with_capacity(self.message_prefix.len() + broadcast.max_size_hint());
buffer.extend_from_slice(&self.message_prefix);
broadcast.write_to(&mut buffer);
drop(data);
let neighbours = match target {
BroadcastTarget::RandomNeighbours => OwnedBroadcastTarget::Neighbours(
self.neighbours
.get_random_peers(self.options.broadcast_target_count, None),
),
BroadcastTarget::Explicit(neighbours) => OwnedBroadcastTarget::Explicit(neighbours),
};
self.distribute_broadcast(adnl, local_id, neighbours.as_ref(), &buffer);
self.spawn_broadcast_gc_task(broadcast_id);
OutgoingBroadcastInfo {
packets: 1,
recipient_count: neighbours.as_ref().len(),
}
}
fn send_fec_broadcast(
self: &Arc<Self>,
adnl: &Arc<adnl::Node>,
local_id: &adnl::NodeIdShort,
mut data: Vec<u8>,
key: &Arc<adnl::Key>,
target: BroadcastTarget,
) -> OutgoingBroadcastInfo {
let broadcast_id = sha2::Sha256::digest(&data).into();
if !self.create_broadcast(broadcast_id) {
tracing::warn!(
overlay_id = %self.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"trying to send duplicated broadcast",
);
return Default::default();
}
if self.options.force_compression {
if let Err(e) = compression::compress(&mut data) {
tracing::warn!(
overlay_id = %self.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"failed to compress overlay FEC broadcast: {e:?}"
);
}
}
let data_size = data.len() as u32;
let mut outgoing_transfer = OutgoingFecTransfer {
broadcast_id,
encoder: RaptorQEncoder::with_data(&data),
seqno: 0,
};
drop(data);
let neighbours = match target {
BroadcastTarget::RandomNeighbours => OwnedBroadcastTarget::Neighbours(
self.neighbours
.get_random_peers(self.options.broadcast_target_count, None),
),
BroadcastTarget::Explicit(neighbours) => OwnedBroadcastTarget::Explicit(neighbours),
};
let info = OutgoingBroadcastInfo {
packets: (data_size / outgoing_transfer.encoder.params().packet_len + 1) * 3 / 2,
recipient_count: neighbours.as_ref().len(),
};
let wave_len = self.options.fec_broadcast_wave_len;
let waves_interval = Duration::from_millis(self.options.fec_broadcast_wave_interval_ms);
let overlay = self.clone();
let adnl = adnl.clone();
let local_id = *local_id;
let key = key.clone();
tokio::spawn(async move {
'outer: while outgoing_transfer.seqno <= info.packets {
for _ in 0..wave_len {
let data = match overlay.prepare_fec_broadcast(&mut outgoing_transfer, &key) {
Ok(data) => data,
Err(e) => {
tracing::warn!(
overlay_id = %overlay.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"failed to send overlay broadcast: {e}"
);
break 'outer;
}
};
overlay.distribute_broadcast(&adnl, &local_id, neighbours.as_ref(), &data);
if outgoing_transfer.seqno > info.packets {
break 'outer;
}
}
tokio::time::sleep(waves_interval).await;
}
});
self.spawn_broadcast_gc_task(broadcast_id);
info
}
fn filter_nodes<'a>(&self, mut nodes: proto::overlay::Nodes<'a>) -> proto::overlay::Nodes<'a> {
nodes.nodes.retain(|node| {
if !matches!(
node.id,
everscale_crypto::tl::PublicKey::Ed25519 { key }
if key != self.node_key.full_id().public_key().as_bytes()
) {
return false;
}
if let Err(e) = self.id.verify_overlay_node(node) {
tracing::warn!(overlay_id = %self.id, "invalid overlay node: {e:?}");
return false;
}
true
});
nodes
}
fn prepare_random_peers(&self) -> proto::overlay::NodesOwned {
const MAX_PEERS_IN_RESPONSE: u32 = 4;
let mut nodes = SmallVec::with_capacity(MAX_PEERS_IN_RESPONSE as usize + 1);
nodes.push(self.sign_local_node());
let peers = adnl::PeersSet::with_capacity(MAX_PEERS_IN_RESPONSE);
peers.randomly_fill_from(&self.neighbours, MAX_PEERS_IN_RESPONSE, None);
for peer_id in &peers {
if let Some(node) = self.nodes.get(peer_id) {
nodes.push(node.clone());
}
}
proto::overlay::NodesOwned { nodes }
}
fn update_neighbours(&self, amount: u32) {
tracing::trace!(overlay_id = %self.id, amount, "updating neighbours");
self.neighbours
.randomly_fill_from(&self.known_peers, amount, Some(&self.ignored_peers));
}
fn insert_public_peer(&self, peer_id: &adnl::NodeIdShort, node: proto::overlay::Node<'_>) {
use dashmap::mapref::entry::Entry;
self.ignored_peers.remove(peer_id);
self.known_peers.insert(*peer_id);
if !self.neighbours.is_full() {
self.neighbours.insert(*peer_id);
}
match self.nodes.entry(*peer_id) {
Entry::Occupied(mut entry) => {
if entry.get().version < node.version {
entry.insert(node.as_equivalent_owned());
}
}
Entry::Vacant(entry) => {
entry.insert(node.as_equivalent_owned());
}
}
}
fn create_broadcast(&self, broadcast_id: BroadcastId) -> bool {
use dashmap::mapref::entry::Entry;
match self.owned_broadcasts.entry(broadcast_id) {
Entry::Vacant(entry) => {
entry.insert(Arc::new(OwnedBroadcast::Other));
true
}
Entry::Occupied(_) => false,
}
}
fn spawn_fec_transfer_receiver(
self: &Arc<Self>,
fec_type: proto::rldp::RaptorQFecType,
broadcast_id: BroadcastId,
peer_id: adnl::NodeIdShort,
entry: VacantBroadcastEntry<'_>,
) -> Result<Arc<OwnedBroadcast>> {
let (broadcast_tx, mut broadcast_rx) = mpsc::unbounded_channel();
let entry = entry
.insert(Arc::new(OwnedBroadcast::Incoming(IncomingFecTransfer {
completed: AtomicBool::new(false),
history: PacketsHistory::for_recv(),
broadcast_tx,
source: peer_id,
updated_at: Default::default(),
})))
.clone();
let overlay = self.clone();
tokio::spawn(async move {
let mut decoder = RaptorQDecoder::with_params(fec_type);
let mut packets = 0;
while let Some(broadcast) = broadcast_rx.recv().await {
packets += 1;
match process_fec_broadcast(&mut decoder, broadcast) {
Ok(Some(data)) => {
let data = IncomingBroadcastInfo {
packets,
data,
from: peer_id,
};
overlay.received_broadcasts.push(data);
break;
}
Ok(None) => continue,
Err(e) => {
tracing::warn!(
overlay_id = %overlay.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"error when receiving overlay broadcast: {e}"
);
break;
}
}
}
if let Some(broadcast) = overlay.owned_broadcasts.get(&broadcast_id) {
match broadcast.value().as_ref() {
OwnedBroadcast::Incoming(transfer) => {
transfer.completed.store(true, Ordering::Release);
}
_ => {
tracing::error!(
overlay_id = %overlay.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"incoming fec broadcast mismatch"
);
}
}
}
});
let overlay = self.clone();
let broadcast_timeout_sec = self.options.broadcast_timeout_sec;
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(broadcast_timeout_sec * 100)).await;
if let Some(broadcast) = overlay.owned_broadcasts.get(&broadcast_id) {
match broadcast.value().as_ref() {
OwnedBroadcast::Incoming(transfer)
if !transfer.completed.load(Ordering::Acquire)
&& !transfer.updated_at.is_expired(broadcast_timeout_sec) =>
{
continue
}
OwnedBroadcast::Incoming(_) => {}
_ => {
tracing::error!(
overlay_id = %overlay.id,
broadcast_id = %DisplayBroadcastId(&broadcast_id),
"incoming fec broadcast mismatch"
);
}
}
}
break;
}
overlay.spawn_broadcast_gc_task(broadcast_id);
});
Ok(entry)
}
fn prepare_fec_broadcast(
&self,
transfer: &mut OutgoingFecTransfer,
key: &Arc<adnl::Key>,
) -> Result<Vec<u8>> {
let chunk = transfer.encoder.encode(&mut transfer.seqno)?;
let date = now();
let broadcast_to_sign = make_fec_part_to_sign(
&transfer.broadcast_id,
transfer.encoder.params().total_len,
date,
BROADCAST_FLAG_ANY_SENDER,
transfer.encoder.params(),
&chunk,
transfer.seqno,
None,
);
let signature = key.sign(&broadcast_to_sign);
let broadcast =
proto::overlay::Broadcast::BroadcastFec(proto::overlay::OverlayBroadcastFec {
src: key.full_id().as_tl(),
certificate: proto::overlay::Certificate::EmptyCertificate,
data_hash: &transfer.broadcast_id,
data_size: transfer.encoder.params().total_len,
flags: BROADCAST_FLAG_ANY_SENDER,
data: &chunk,
seqno: transfer.seqno,
fec: *transfer.encoder.params(),
date,
signature: &signature,
});
transfer.seqno += 1;
let mut buffer = Vec::with_capacity(self.message_prefix.len() + broadcast.max_size_hint());
buffer.extend_from_slice(&self.message_prefix);
broadcast.write_to(&mut buffer);
Ok(buffer)
}
fn distribute_broadcast(
&self,
adnl: &adnl::Node,
local_id: &adnl::NodeIdShort,
neighbours: &[adnl::NodeIdShort],
data: &[u8],
) {
for peer_id in neighbours {
if let Err(e) = adnl.send_custom_message(local_id, peer_id, data) {
tracing::warn!(
overlay_id = %self.id,
%peer_id,
"failed to distribute broadcast: {e}"
);
}
}
}
fn is_broadcast_outdated(&self, date: u32) -> bool {
date + (self.options.broadcast_timeout_sec as u32) < now()
}
fn spawn_broadcast_gc_task(self: &Arc<Self>, broadcast_id: BroadcastId) {
let overlay = self.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(overlay.options.broadcast_timeout_sec)).await;
overlay
.finished_broadcast_count
.fetch_add(1, Ordering::Release);
overlay.finished_broadcasts.push(broadcast_id);
});
}
}
#[derive(Debug, Clone)]
pub enum BroadcastTarget {
RandomNeighbours,
Explicit(Arc<Vec<adnl::NodeIdShort>>),
}
impl Default for BroadcastTarget {
fn default() -> Self {
Self::RandomNeighbours
}
}
enum OwnedBroadcastTarget {
Neighbours(Vec<adnl::NodeIdShort>),
Explicit(Arc<Vec<adnl::NodeIdShort>>),
}
impl AsRef<[adnl::NodeIdShort]> for OwnedBroadcastTarget {
fn as_ref(&self) -> &[adnl::NodeIdShort] {
match self {
OwnedBroadcastTarget::Neighbours(neighbours) => neighbours.as_ref(),
OwnedBroadcastTarget::Explicit(neighbours) => neighbours.as_ref(),
}
}
}
#[derive(Debug, Copy, Clone)]
pub struct OverlayMetrics {
pub owned_broadcasts_len: usize,
pub finished_broadcasts_len: u32,
pub node_count: usize,
pub known_peers: usize,
pub neighbours: usize,
pub received_broadcasts_data_len: usize,
pub received_broadcasts_barrier_count: usize,
}
fn process_fec_broadcast(
decoder: &mut RaptorQDecoder,
broadcast: BroadcastFec,
) -> Result<Option<Vec<u8>>> {
let broadcast_id = &broadcast.data_hash;
let broadcast_to_sign = make_fec_part_to_sign(
broadcast_id,
broadcast.data_size,
broadcast.date,
broadcast.flags,
&broadcast.fec_type,
&broadcast.data,
broadcast.seqno,
if broadcast.flags & BROADCAST_FLAG_ANY_SENDER == 0 {
Some(broadcast.node_id.compute_short_id())
} else {
None
},
);
broadcast
.node_id
.verify(&broadcast_to_sign, &broadcast.signature)?;
match decoder.decode(broadcast.seqno as u32, broadcast.data) {
Some(result) if result.len() != broadcast.data_size as usize => {
Err(OverlayError::DataSizeMismatch.into())
}
Some(result) => match compression::decompress(&result) {
Some(decompressed)
if sha2::Sha256::digest(&decompressed).as_slice() == broadcast_id =>
{
Ok(Some(decompressed))
}
_ => {
let data_hash = sha2::Sha256::digest(&result);
if data_hash.as_slice() == broadcast_id {
Ok(Some(result))
} else {
Err(OverlayError::DataHashMismatch.into())
}
}
},
None => Ok(None),
}
}
#[derive(TlWrite)]
#[tl(boxed, id = "overlay.broadcast.toSign", scheme = "scheme.tl")]
struct OverlayBroadcastToSign {
hash: [u8; 32],
date: u32,
}
impl OverlayBroadcastToSign {
fn compute_broadcast_id(&self) -> BroadcastId {
tl_proto::hash(self)
}
}
fn make_broadcast_to_sign(
data: &[u8],
date: u32,
source: Option<&adnl::NodeIdShort>,
) -> OverlayBroadcastToSign {
const BROADCAST_ID: u32 = tl_proto::id!("overlay.broadcast.id", scheme = "scheme.tl");
let mut broadcast_hash = sha2::Sha256::new();
broadcast_hash.update(BROADCAST_ID.to_le_bytes());
broadcast_hash.update(source.map(adnl::NodeIdShort::as_slice).unwrap_or(&[0; 32]));
broadcast_hash.update(sha2::Sha256::digest(data).as_slice());
broadcast_hash.update(BROADCAST_FLAG_ANY_SENDER.to_le_bytes());
let broadcast_hash = broadcast_hash.finalize();
OverlayBroadcastToSign {
hash: broadcast_hash.into(),
date,
}
}
fn make_fec_part_to_sign(
data_hash: &[u8; 32],
data_size: u32,
date: u32,
flags: u32,
params: &proto::rldp::RaptorQFecType,
part: &[u8],
seqno: u32,
source: Option<adnl::NodeIdShort>,
) -> OverlayBroadcastToSign {
const BROADCAST_FEC_ID: u32 = tl_proto::id!("overlay.broadcastFec.id", scheme = "scheme.tl");
const BROADCAST_FEC_PART_ID: u32 =
tl_proto::id!("overlay.broadcastFec.partId", scheme = "scheme.tl");
let mut broadcast_hash = sha2::Sha256::new();
broadcast_hash.update(BROADCAST_FEC_ID.to_le_bytes());
broadcast_hash.update(
source
.as_ref()
.map(adnl::NodeIdShort::as_slice)
.unwrap_or(&[0; 32]),
);
broadcast_hash.update(tl_proto::hash(params));
broadcast_hash.update(data_hash);
broadcast_hash.update(data_size.to_le_bytes());
broadcast_hash.update(flags.to_le_bytes());
let broadcast_hash = broadcast_hash.finalize();
let mut part_hash = sha2::Sha256::new();
part_hash.update(BROADCAST_FEC_PART_ID.to_le_bytes());
part_hash.update(broadcast_hash);
part_hash.update(sha2::Sha256::digest(part).as_slice());
part_hash.update(seqno.to_le_bytes());
let part_hash = part_hash.finalize();
OverlayBroadcastToSign {
hash: part_hash.into(),
date,
}
}
pub struct IncomingBroadcastInfo {
pub packets: u32,
pub data: Vec<u8>,
pub from: adnl::NodeIdShort,
}
#[derive(Default, Copy, Clone)]
pub struct OutgoingBroadcastInfo {
pub packets: u32,
pub recipient_count: usize,
}
struct IncomingFecTransfer {
completed: AtomicBool,
history: PacketsHistory,
broadcast_tx: BroadcastFecTx,
source: adnl::NodeIdShort,
updated_at: UpdatedAt,
}
struct OutgoingFecTransfer {
broadcast_id: BroadcastId,
encoder: RaptorQEncoder,
seqno: u32,
}
enum OwnedBroadcast {
Other,
Incoming(IncomingFecTransfer),
}
#[derive(Debug)]
struct BroadcastFec {
node_id: adnl::NodeIdFull,
data_hash: BroadcastId,
data_size: u32,
flags: u32,
data: Vec<u8>,
seqno: u32,
fec_type: proto::rldp::RaptorQFecType,
date: u32,
signature: [u8; 64],
}
type VacantBroadcastEntry<'a> = dashmap::mapref::entry::VacantEntry<
'a,
BroadcastId,
Arc<OwnedBroadcast>,
BuildHasherDefault<rustc_hash::FxHasher>,
>;
pub type ReceivedPeersMap =
FxHashMap<HashWrapper<everscale_crypto::tl::PublicKeyOwned>, proto::overlay::NodeOwned>;
type BroadcastFecTx = mpsc::UnboundedSender<BroadcastFec>;
#[derive(Copy, Clone)]
pub struct DisplayBroadcastId<'a>(pub &'a BroadcastId);
impl std::fmt::Display for DisplayBroadcastId<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut output = [0u8; 64];
hex::encode_to_slice(self.0, &mut output).ok();
let output = unsafe { std::str::from_utf8_unchecked(&output) };
f.write_str(output)
}
}
type BroadcastId = [u8; 32];
#[derive(thiserror::Error, Debug)]
enum OverlayError {
#[error("Unsupported signature")]
UnsupportedSignature,
#[error("Data size mismatch")]
DataSizeMismatch,
#[error("Data hash mismatch")]
DataHashMismatch,
}
const BROADCAST_FLAG_ANY_SENDER: u32 = 1;