use std::net::{Ipv4Addr, SocketAddrV4};
use std::sync::Arc;
use anyhow::Result;
use sha2::Digest;
use tl_proto::TlWrite;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use crate::adnl::channel::*;
use crate::adnl::handshake::*;
use crate::adnl::keystore::Key;
use crate::adnl::node_id::NodeIdShort;
use crate::adnl::peer::*;
use crate::adnl::Node;
use crate::proto;
use crate::util::*;
impl Node {
pub(super) fn start_sender(
self: &Arc<Self>,
socket: Arc<UdpSocket>,
mut sender_queue_rx: SenderQueueRx,
) {
use futures_util::future::{select, Either};
let complete_signal = self.cancellation_token.clone();
tokio::spawn(async move {
tokio::pin!(let cancelled = complete_signal.cancelled(););
while let Some(packet) = {
tokio::pin!(let recv = sender_queue_rx.recv(););
match select(recv, &mut cancelled).await {
Either::Left((packet, _)) => packet,
Either::Right(_) => {
tracing::debug!("sender loop finished");
return;
}
}
} {
socket.send_to(&packet.data, packet.destination).await.ok();
}
});
}
pub(super) fn send_message(
&self,
local_id: &NodeIdShort,
peer_id: &NodeIdShort,
message: proto::adnl::Message,
priority: bool,
) -> Result<()> {
const MAX_ADNL_MESSAGE_SIZE: usize = 1024;
const MSG_ANSWER_SIZE: usize = 44;
const MSG_CONFIRM_CHANNEL_SIZE: usize = 72;
const MSG_CREATE_CHANNEL_SIZE: usize = 40;
const MSG_CUSTOM_SIZE: usize = 12;
const MSG_NOP_SIZE: usize = 4;
const MSG_QUERY_SIZE: usize = 44;
const MSG_PART_PREFIX_SIZE: usize = 40;
let peers = self.get_peers(local_id)?;
let peer = match peers.get(peer_id) {
Some(peer) => peer,
None => return Err(AdnlSenderError::UnknownPeer.into()),
};
let peer = peer.value();
let local_key = self.keystore.key_by_id(local_id)?;
let channel = self.channels_by_peers.get(peer_id);
let mut force_handshake = false;
let (additional_size, additional_message) = match &channel {
Some(channel) if channel.ready() => (0, None),
Some(channel_data) => {
tracing::trace!(%local_id, %peer_id, "sending ConfirmChannel");
force_handshake = true;
(
MSG_CONFIRM_CHANNEL_SIZE,
Some(proto::adnl::Message::ConfirmChannel {
key: peer.channel_key().public_key.as_bytes(),
peer_key: channel_data.peer_channel_public_key().as_bytes(),
date: channel_data.peer_channel_date(),
}),
)
}
None => {
tracing::trace!(%local_id, %peer_id, "sending CreateChannel");
(
MSG_CREATE_CHANNEL_SIZE,
Some(proto::adnl::Message::CreateChannel {
key: peer.channel_key().public_key.as_bytes(),
date: now(),
}),
)
}
};
let mut size = additional_size;
size += match message {
proto::adnl::Message::Answer { answer, .. } => answer.len() + MSG_ANSWER_SIZE,
proto::adnl::Message::ConfirmChannel { .. } => MSG_CONFIRM_CHANNEL_SIZE,
proto::adnl::Message::Custom { data } => data.len() + MSG_CUSTOM_SIZE,
proto::adnl::Message::Nop => MSG_NOP_SIZE,
proto::adnl::Message::Query { query, .. } => query.len() + MSG_QUERY_SIZE,
_ => return Err(AdnlSenderError::UnexpectedMessageToSend.into()),
};
let signer = match channel.as_ref() {
Some(channel) if !force_handshake => MessageSigner::Channel {
channel: channel.value(),
priority,
},
_ => MessageSigner::Random(local_key),
};
if size <= MAX_ADNL_MESSAGE_SIZE {
let mut buffer = Vec::with_capacity(size);
let messages = match additional_message {
Some(additional_message) => {
additional_message.write_to(&mut buffer);
message.write_to(&mut buffer);
proto::adnl::OutgoingMessages::Pair(&buffer)
}
None => {
message.write_to(&mut buffer);
proto::adnl::OutgoingMessages::Single(&buffer)
}
};
self.send_packet(peer_id, peer, signer, messages)
} else {
pub fn build_part_message<'a>(
data: &'a [u8],
hash: &'a [u8; 32],
max_size: usize,
offset: &mut usize,
) -> proto::adnl::Message<'a> {
let len = std::cmp::min(data.len(), *offset + max_size);
let result = proto::adnl::Message::Part {
hash,
total_size: data.len() as u32,
offset: *offset as u32,
data: if *offset < len {
&data[*offset..len]
} else {
&data[..0]
},
};
*offset += len;
result
}
let data = tl_proto::serialize(message);
let hash: [u8; 32] = sha2::Sha256::digest(&data).into();
let mut offset = 0;
let mut buffer = Vec::with_capacity(MAX_ADNL_MESSAGE_SIZE);
if let Some(additional_message) = additional_message {
additional_message.write_to(&mut buffer);
let message = build_part_message(
&data,
&hash,
MAX_ADNL_MESSAGE_SIZE - MSG_PART_PREFIX_SIZE - additional_size,
&mut offset,
);
message.write_to(&mut buffer);
ok!(self.send_packet(
peer_id,
peer,
signer,
proto::adnl::OutgoingMessages::Pair(&buffer),
));
}
while offset < data.len() {
buffer.clear();
let message = build_part_message(&data, &hash, MAX_ADNL_MESSAGE_SIZE, &mut offset);
message.write_to(&mut buffer);
ok!(self.send_packet(
peer_id,
peer,
signer,
proto::adnl::OutgoingMessages::Single(&buffer),
));
}
Ok(())
}
}
fn send_packet(
&self,
peer_id: &NodeIdShort,
peer: &Peer,
mut signer: MessageSigner,
messages: proto::adnl::OutgoingMessages,
) -> Result<()> {
const MAX_PRIORITY_ATTEMPTS: u64 = 10;
let priority = if let MessageSigner::Channel { priority, .. } = &mut signer {
if peer.receiver_state().history(*priority).seqno() == 0
&& peer.sender_state().history(true).seqno() > MAX_PRIORITY_ATTEMPTS
{
*priority = false;
}
*priority
} else {
false
};
let mut local_addr = self.socket_addr;
let mut peer_addr = peer.addr();
if self.options.use_loopback_for_neighbours
&& local_addr.ip() == peer_addr.ip()
&& !peer_addr.ip().is_loopback()
{
local_addr.set_ip(Ipv4Addr::LOCALHOST);
peer_addr.set_ip(Ipv4Addr::LOCALHOST);
}
let rand_bytes: [u8; 10] = gen_fast_bytes();
let now = now();
let address = proto::adnl::AddressList {
address: Some(proto::adnl::Address::from(&local_addr)),
version: now,
reinit_date: self.start_time,
expire_at: now + self.options.address_list_timeout_sec,
};
let mut packet = proto::adnl::OutgoingPacketContents {
rand1: &rand_bytes[..3],
from: match signer {
MessageSigner::Channel { .. } => None,
MessageSigner::Random(local_key) => Some(local_key.full_id().as_tl()),
},
messages,
address,
seqno: peer.sender_state().history(priority).bump_seqno(),
confirm_seqno: peer.receiver_state().history(priority).seqno(),
reinit_dates: match signer {
MessageSigner::Channel { .. } => None,
MessageSigner::Random(_) => Some(proto::adnl::ReinitDates {
local: self.start_time,
target: peer.sender_state().reinit_date(),
}),
},
signature: None,
rand2: &rand_bytes[3..],
};
let signature = match signer {
MessageSigner::Random(signer) => Some(signer.sign(&packet)),
MessageSigner::Channel { .. } => None,
};
packet.signature = signature.as_ref().map(<[u8; 64]>::as_slice);
let adnl_version = self.options.version;
let prefix_len = match &signer {
MessageSigner::Channel { .. } => Channel::compute_prefix_len(adnl_version),
MessageSigner::Random(..) => compute_handshake_prefix_len(adnl_version),
};
let mut data = Vec::with_capacity(prefix_len + packet.max_size_hint());
packet.write_to(&mut data);
match signer {
MessageSigner::Channel { channel, priority } => {
channel.encrypt(&mut data, priority, adnl_version)
}
MessageSigner::Random(_) => {
build_handshake_packet(peer_id, peer.id(), &mut data, adnl_version)
}
}
if self
.sender_queue_tx
.send(PacketToSend {
destination: peer_addr,
data,
})
.is_err()
{
return Err(AdnlSenderError::FailedToSendPacket.into());
}
Ok(())
}
}
#[derive(Copy, Clone)]
enum MessageSigner<'a> {
Channel {
channel: &'a Arc<Channel>,
priority: bool,
},
Random(&'a Arc<Key>),
}
pub struct PacketToSend {
destination: SocketAddrV4,
data: Vec<u8>,
}
pub type SenderQueueTx = mpsc::UnboundedSender<PacketToSend>;
pub type SenderQueueRx = mpsc::UnboundedReceiver<PacketToSend>;
#[derive(thiserror::Error, Debug)]
enum AdnlSenderError {
#[error("Unknown peer")]
UnknownPeer,
#[error("Unexpected message to send")]
UnexpectedMessageToSend,
#[error("Failed to send ADNL packet")]
FailedToSendPacket,
}