use std::cmp::max;
use std::cmp::{min, Ordering};
use std::collections::VecDeque;
use std::iter::Iterator;
use std::net::SocketAddr;
use std::time::Instant;
use bytes::{Bytes, BytesMut};
use log::{debug, error, info, trace, warn};
use super::TimeSpan;
use crate::loss_compression::compress_loss_list;
use crate::packet::{
AckControlInfo, ControlPacket, ControlTypes, DataEncryption, DataPacket, HandshakeControlInfo,
Packet, SrtControlPacket,
};
use crate::protocol::handshake::Handshake;
use crate::protocol::TimeStamp;
use crate::{seq_number::seq_num_range, ConnectionSettings, SeqNumber};
mod buffer;
mod time;
use buffer::RecvBuffer;
use time::{ReceiveTimers, RTT};
/// The next step produced by `Receiver::next_algorithm_action` for the code
/// driving the receiver.
#[derive(Debug, Clone)]
// Silences clippy's variant-size-difference lint for the variants below
// (no variant is boxed).
#[allow(clippy::large_enum_variant)]
pub enum ReceiverAlgorithmAction {
/// Nothing to emit right now; carries the instant of the next timer or
/// message release (presumably the caller waits for input until then).
TimeBoundedReceive(Instant),
/// A queued control packet together with the remote address it is meant for.
SendControl(ControlPacket, SocketAddr),
/// A payload released from the receive buffer, paired with an `Instant`.
OutputData((Instant, Bytes)),
/// Shutdown was requested and the receiver reports itself as flushed.
Close,
}
/// One sequence number currently recorded as lost, plus the state used to
/// pace repeated NAK reports for it.
struct LossListEntry {
/// Sequence number of the missing packet.
seq_num: SeqNumber,
/// Timestamp of the most recent NAK that reported this loss.
feedback_time: TimeStamp,
/// Multiplier applied to the mean RTT before the loss is reported again;
/// starts at 2 and is incremented each time the loss is re-reported.
k: i32,
}
/// Record of one ACK that was sent, kept so a later ACK2 can be matched to it.
struct AckHistoryEntry {
/// The sequence number that the ACK acknowledged.
ack_number: SeqNumber,
/// The ACK's own sequence number; ACK2 packets are looked up by this value.
ack_seq_num: i32,
/// Timestamp at which the ACK was queued, used to measure RTT on ACK2.
timestamp: TimeStamp,
}
/// Receiving half of a connection: tracks losses, schedules ACK/NAK control
/// packets, and releases buffered data to the consumer.
pub struct Receiver {
/// Connection parameters (remote address, socket ids, crypto manager, ...).
settings: ConnectionSettings,
/// Handles handshake packets that arrive on this connection.
handshake: Handshake,
/// ACK and NAK timers.
timers: ReceiveTimers,
/// Outgoing control packets queued by `send_control`.
control_packets: VecDeque<Packet>,
/// Payloads already taken out of the receive buffer, waiting to be output.
data_release: VecDeque<(Instant, Bytes)>,
/// Round-trip-time estimate, updated when an ACK2 is matched.
rtt: RTT,
/// Lost sequence numbers; binary-searched by sequence number, so it is
/// expected to stay sorted.
loss_list: Vec<LossListEntry>,
/// ACKs sent so far; binary-searched by ACK sequence number.
ack_history_window: Vec<AckHistoryEntry>,
/// `(sequence number, arrival timestamp)` records of received data packets;
/// the latest 16 feed the receive-rate estimate.
packet_history_window: Vec<(SeqNumber, TimeStamp)>,
/// `(sequence number, interval)` records for probe packet pairs; the latest
/// 16 feed the link-capacity estimate.
packet_pair_window: Vec<(SeqNumber, TimeSpan)>,
/// One past the largest data sequence number received so far.
lrsn: SeqNumber,
/// ACK sequence number to use for the next ACK; starts at 1.
next_ack: i32,
/// Arrival timestamp of a packet whose sequence number is a multiple of 16,
/// held until the following packet completes the pair.
probe_time: Option<TimeStamp>,
/// `(ACK sequence number, acknowledged sequence number)` taken from the most
/// recently matched ACK2.
lr_ack_acked: (i32, SeqNumber),
/// Buffer holding received data until it is released.
receive_buffer: RecvBuffer,
/// Set once shutdown has been requested locally or by the peer.
shutdown_flag: bool,
}
impl Receiver {
/// Creates a receiver for a connection described by `settings`.
///
/// `handshake` is stored so that handshake packets arriving later can still
/// be answered. The initial receive sequence number from `settings` seeds
/// both the "largest received" marker and the last-acknowledged state.
pub fn new(settings: ConnectionSettings, handshake: Handshake) -> Self {
    let init_seq_num = settings.init_recv_seq_num;

    info!(
        "Receiving started from {:?}, with latency={:?}",
        settings.remote, settings.recv_tsbpd_latency
    );

    // Build everything that only needs to borrow `settings` first, so that
    // `settings` itself can be moved into the struct without cloning it.
    let timers = ReceiveTimers::new(settings.socket_start_time);
    let receive_buffer = RecvBuffer::with(&settings);

    Receiver {
        settings,
        handshake,
        timers,
        control_packets: VecDeque::new(),
        data_release: VecDeque::new(),
        rtt: RTT::new(),
        loss_list: Vec::new(),
        ack_history_window: Vec::new(),
        packet_history_window: Vec::new(),
        packet_pair_window: Vec::new(),
        lrsn: init_seq_num,
        next_ack: 1,
        probe_time: None,
        lr_ack_acked: (0, init_seq_num),
        receive_buffer,
        shutdown_flag: false,
    }
}
/// Marks the receiver as shutting down.
///
/// The flag is only recorded here; `next_algorithm_action` reports `Close`
/// once the flag is set and the receiver is flushed.
pub fn handle_shutdown(&mut self) {
    self.shutdown_flag = true;
}
/// Processes one packet received from the network at time `now`.
///
/// Packets from an address other than the connected remote, or addressed to
/// a socket id other than the local one, are logged and dropped. Control
/// packets first resynchronise the receive buffer's clock and are then
/// dispatched by type; data packets go to `handle_data_packet`.
///
/// Control types this receiver does not act on (ACK, NAK, drop request) are
/// logged and ignored, so that input from the peer can never panic the
/// receiver.
pub fn handle_packet(&mut self, now: Instant, (packet, from): (Packet, SocketAddr)) {
    // Only the connected peer may talk to this receiver.
    if from != self.settings.remote {
        info!("Packet received from unknown address: {:?}", from);
        return;
    }

    if self.settings.local_sockid != packet.dest_sockid() {
        info!(
            "Packet send to socket id ({:?}) that does not match local ({:?})",
            packet.dest_sockid(),
            self.settings.local_sockid
        );
        return;
    }

    trace!("Received packet: {:?}", packet);

    match packet {
        Packet::Control(ctrl) => {
            // Every control packet carries a timestamp used for clock sync.
            self.receive_buffer.synchronize_clock(now, ctrl.timestamp);

            match ctrl.control_type {
                ControlTypes::Ack { .. } => warn!("Receiver received ACK packet, unusual"),
                ControlTypes::Ack2(seq_num) => self.handle_ack2(seq_num, now),
                // Not supported yet: ignore instead of panicking on input
                // that the remote side controls.
                ControlTypes::DropRequest { .. } => {
                    warn!("Receiver received DropRequest packet, not supported, ignoring")
                }
                ControlTypes::Handshake(shake) => self.handle_handshake_packet(now, shake),
                ControlTypes::KeepAlive => {}
                ControlTypes::Nak { .. } => warn!("Receiver received NAK packet, unusual"),
                ControlTypes::Shutdown => {
                    info!(
                        "{:?}: Shutdown packet received, flushing receiver...",
                        self.settings.local_sockid
                    );
                    self.shutdown_flag = true;
                }
                ControlTypes::Srt(srt_packet) => {
                    self.handle_srt_control_packet(srt_packet);
                }
            }
        }
        Packet::Data(data) => self.handle_data_packet(data, now),
    };
}
pub fn next_algorithm_action(&mut self, now: Instant) -> ReceiverAlgorithmAction {
use ReceiverAlgorithmAction::*;
if self.timers.ack.check_expired(now).is_some() {
self.on_ack_event(now);
}
if self.timers.nak.check_expired(now).is_some() {
self.on_nak_event(now);
}
if let Some(data) = self.pop_data(now) {
OutputData(data)
} else if let Some(Packet::Control(packet)) = self.pop_conotrol_packet() {
SendControl(packet, self.settings.remote)
} else if self.shutdown_flag && self.is_flushed() {
Close
} else {
TimeBoundedReceive(self.next_timer(now))
}
}
/// Returns `true` when the receive buffer has no message ready and the
/// sequence number confirmed by the last ACK2 equals the buffer's next
/// release position.
pub fn is_flushed(&self) -> bool {
    if self.receive_buffer.next_msg_ready().is_some() {
        return false;
    }
    self.lr_ack_acked.1 == self.receive_buffer.next_release()
}
/// Handles expiry of the ACK timer: decides whether an ACK is needed and, if
/// so, queues it and records it in the ACK history.
///
/// No ACK is sent when the number to acknowledge equals the one already
/// confirmed by the latest ACK2, or when it equals the number in the most
/// recently sent ACK and less than two mean RTTs have passed since then.
///
/// # Panics
///
/// Panics if the number to acknowledge is smaller than the one in the most
/// recently sent ACK.
fn on_ack_event(&mut self, now: Instant) {
    trace!("Ack event hit {:?}", self.settings.local_sockid);

    // Acknowledge up to the first missing packet, or up to everything
    // received when nothing is missing.
    let ack_number = match self.loss_list.first() {
        Some(first_lost) => first_lost.seq_num,
        None => self.lrsn,
    };

    // Already confirmed by an ACK2: nothing new to report.
    if ack_number == self.lr_ack_acked.1 {
        return;
    }

    // ACK numbers must never move backwards.
    if let Some(previous) = self.ack_history_window.last() {
        assert!(previous.ack_number <= ack_number);
    }

    trace!(
        "Sending ACK; ack_num={:?}, lr_ack_acked={:?}",
        ack_number,
        self.lr_ack_acked.1
    );

    // Suppress a repeat of the most recently sent ACK if it went out less
    // than two RTTs ago. The history is appended to, so the most recent
    // entry is the last one.
    if let Some(&AckHistoryEntry {
        ack_number: last_ack_number,
        timestamp: last_timestamp,
        ..
    }) = self.ack_history_window.last()
    {
        if last_ack_number == ack_number
            && (self.receive_buffer.timestamp_from(now) - last_timestamp)
                < (self.rtt.mean() * 2)
        {
            return;
        }
    }

    let ack_seq_num = self.next_ack;
    self.next_ack += 1;

    // Receive rate in packets per second, from the intervals between the
    // latest 16 arrivals. Intervals far from the median (outside roughly
    // 1/8x..8x) are discarded; the estimate is 0 unless more than 8 remain.
    let packet_recv_rate = if self.packet_history_window.len() < 16 {
        0
    } else {
        let mut last_16: Vec<_> = self.packet_history_window
            [self.packet_history_window.len() - 16..]
            .windows(2)
            .map(|w| w[1].1 - w[0].1)
            .collect();
        last_16.sort();

        // Median inter-arrival interval.
        let ai = last_16[last_16.len() / 2];

        let filtered: Vec<TimeSpan> = last_16
            .iter()
            .filter(|&&n| n / 8 < ai && n > ai / 8)
            .cloned()
            .collect();

        if filtered.len() > 8 {
            (1_000_000 * filtered.len()) as u64
                / filtered
                    .iter()
                    .map(|dt| i64::from(dt.as_micros()))
                    .sum::<i64>() as u64
        } else {
            0
        }
    } as u32;

    // Link capacity in packets per second: the inverse of the median of the
    // latest 16 packet-pair intervals.
    let est_link_cap = {
        if self.packet_pair_window.len() < 16 {
            0
        } else {
            let pi = {
                let mut last_16: Vec<_> = self.packet_pair_window
                    [self.packet_pair_window.len() - 16..]
                    .iter()
                    .map(|&(_, time)| time)
                    .collect();
                last_16.sort_unstable();
                last_16[last_16.len() / 2]
            };
            (1. / (pi.as_secs_f64())) as i32
        }
    };

    self.send_control(
        now,
        ControlTypes::Ack(AckControlInfo {
            ack_seq_num,
            ack_number,
            rtt: Some(self.rtt.mean()),
            rtt_variance: Some(self.rtt.variance()),
            buffer_available: None,
            packet_recv_rate: Some(packet_recv_rate),
            est_link_cap: Some(est_link_cap),
        }),
    );

    // Remember the ACK so the matching ACK2 can be used to measure RTT.
    let ts_now = self.receive_buffer.timestamp_from(now);
    self.ack_history_window.push(AckHistoryEntry {
        ack_number,
        ack_seq_num,
        timestamp: ts_now,
    });
}
/// Handles expiry of the NAK timer: re-reports every lost packet whose last
/// report is older than `k` mean RTTs.
///
/// Each re-reported entry gets its `k` incremented and its feedback time
/// reset to now. Nothing is sent when no entry is due.
fn on_nak_event(&mut self, now: Instant) {
    self.timers.update_rtt(&self.rtt);

    let ts_now = self.receive_buffer.timestamp_from(now);
    let mean_rtt = self.rtt.mean();

    let mut due = Vec::new();
    for entry in self.loss_list.iter_mut() {
        if ts_now - entry.feedback_time > mean_rtt * entry.k {
            entry.k += 1;
            entry.feedback_time = ts_now;
            due.push(entry.seq_num);
        }
    }

    if !due.is_empty() {
        self.send_nak(now, due.into_iter());
    }
}
/// Passes a handshake packet to the stored handshake handler and queues the
/// reply, if the handler produces one.
fn handle_handshake_packet(&mut self, now: Instant, control_info: HandshakeControlInfo) {
    let reply = self.handshake.handle_handshake(control_info);
    if let Some(control) = reply {
        self.send_control(now, control);
    }
}
fn handle_srt_control_packet(&mut self, pack: SrtControlPacket) {
use self::SrtControlPacket::*;
match pack {
HandshakeRequest(_) | HandshakeResponse(_) => {
warn!("Received handshake SRT packet, HSv5 expected");
}
_ => unimplemented!(),
}
}
/// Handles an ACK2 carrying ACK sequence number `seq_num`.
///
/// When the matching ACK is found in the history, the last-acknowledged
/// state is updated, the RTT estimate is fed the time elapsed since that
/// ACK was sent, and the timers are refreshed from the new RTT. An unknown
/// `seq_num` is only logged.
fn handle_ack2(&mut self, seq_num: i32, now: Instant) {
    let lookup = self
        .ack_history_window
        .binary_search_by_key(&seq_num, |entry| entry.ack_seq_num);

    match lookup {
        Ok(idx) => {
            let entry = &self.ack_history_window[idx];
            let (sent_at, ack_number) = (entry.timestamp, entry.ack_number);

            self.lr_ack_acked = (seq_num, ack_number);

            let elapsed = self.receive_buffer.timestamp_from(now) - sent_at;
            self.rtt.update(elapsed);
            self.timers.update_rtt(&self.rtt);
        }
        Err(_) => {
            warn!(
                "ACK sequence number in ACK2 packet not found in ACK history: {}",
                seq_num
            );
        }
    }
}
/// Processes one received data packet.
///
/// Records arrival statistics (arrival history and probe packet pairs),
/// updates the loss list (adding any gap and sending a NAK for it, or
/// removing a retransmitted packet), advances the largest-received marker,
/// and finally decrypts the packet if needed and stores it in the receive
/// buffer. Packets older than the buffer's next release position are
/// dropped as duplicates.
///
/// Both statistics windows are capped at the 16 most recent entries, which
/// is all the ACK estimates read, so they do not grow for the lifetime of
/// the connection.
fn handle_data_packet(&mut self, mut data: DataPacket, now: Instant) {
    /// Number of most-recent samples the rate/capacity estimates read.
    const WINDOW_LEN: usize = 16;

    /// Drops the oldest entries so that at most `keep` remain.
    fn keep_latest<T>(window: &mut Vec<T>, keep: usize) {
        if window.len() > keep {
            let excess = window.len() - keep;
            drop(window.drain(..excess));
        }
    }

    let ts_now = self.receive_buffer.timestamp_from(now);

    // A packet whose sequence number is a multiple of 16 opens a probe
    // pair; the directly following sequence number closes it.
    if data.seq_number % 16 == 0 {
        self.probe_time = Some(ts_now)
    } else if data.seq_number % 16 == 1 {
        if let Some(pt) = self.probe_time.take() {
            self.packet_pair_window.push((data.seq_number, ts_now - pt));
            keep_latest(&mut self.packet_pair_window, WINDOW_LEN);
        }
    }

    self.packet_history_window.push((data.seq_number, ts_now));
    keep_latest(&mut self.packet_history_window, WINDOW_LEN);

    match data.seq_number.cmp(&self.lrsn) {
        Ordering::Greater => {
            // Everything between the expected number and this packet is
            // missing: record it and report it immediately.
            self.loss_list.extend(
                seq_num_range(self.lrsn, data.seq_number).map(|seq_num| LossListEntry {
                    seq_num,
                    feedback_time: ts_now,
                    k: 2,
                }),
            );
            self.send_nak(now, seq_num_range(self.lrsn, data.seq_number));
        }
        Ordering::Less => {
            // An older packet: normally a retransmission of a recorded loss.
            match self.loss_list[..].binary_search_by(|ll| ll.seq_num.cmp(&data.seq_number)) {
                Ok(i) => {
                    self.loss_list.remove(i);
                }
                Err(_) => {
                    debug!(
                        "Packet received that's not in the loss list: {:?}, loss_list={:?}",
                        data.seq_number,
                        self.loss_list
                            .iter()
                            .map(|ll| ll.seq_num.as_raw())
                            .collect::<Vec<_>>()
                    );
                }
            };
        }
        Ordering::Equal => {}
    }

    self.lrsn = max(data.seq_number + 1, self.lrsn);

    // Already released to the consumer: a duplicate, nothing to store.
    if self.receive_buffer.next_release() > data.seq_number {
        debug!("Received packet {:?} twice", data.seq_number);
        return;
    }

    if data.encryption != DataEncryption::None {
        self.decrypt_packet(&mut data);
    }

    self.receive_buffer.add(data);
}
/// Decrypts `data`'s payload in place using the connection's crypto manager.
///
/// When no crypto manager is configured an error is logged and the payload
/// is left untouched.
fn decrypt_packet(&self, data: &mut DataPacket) {
    if let Some(cm) = &self.settings.crypto_manager {
        // Copy the payload into a mutable buffer for decryption.
        let mut plain = BytesMut::with_capacity(data.payload.len());
        plain.extend_from_slice(&data.payload[..]);

        cm.decrypt(data.seq_number, data.encryption, &mut plain);

        data.payload = plain.freeze();
    } else {
        error!("Unexpcted encrypted packet!");
    }
}
/// Queues a NAK control packet reporting `lost_seq_nums`, encoded with
/// `compress_loss_list`.
fn send_nak(&mut self, now: Instant, lost_seq_nums: impl Iterator<Item = SeqNumber>) {
    let lost: Vec<_> = lost_seq_nums.collect();
    debug!("Sending NAK for={:?}", lost);

    let compressed = compress_loss_list(lost.iter().cloned()).collect();
    self.send_control(now, ControlTypes::Nak(compressed));
}
/// Moves every message the receive buffer yields at `now` into the release
/// queue, asks the buffer to drop packets that are too late, and returns the
/// oldest queued payload, if any.
fn pop_data(&mut self, now: Instant) -> Option<(Instant, Bytes)> {
    while let Some(ready) = self.receive_buffer.next_msg_tsbpd(now) {
        self.data_release.push_back(ready);
    }

    // The returned value is not used here.
    let _dropped = self.receive_buffer.drop_too_late_packets(now);

    self.data_release.pop_front()
}
/// Removes and returns the oldest queued outgoing control packet, or `None`
/// when the queue is empty.
fn pop_conotrol_packet(&mut self) -> Option<Packet> {
    let queue = &mut self.control_packets;
    queue.pop_front()
}
/// Returns the earlier of the next ACK/NAK timer and the receive buffer's
/// next message release time; just the timer when no release is pending.
fn next_timer(&self, now: Instant) -> Instant {
    let release = self.receive_buffer.next_message_release_time(now);
    let timer = self.timers.next_timer(now);
    release.map_or(timer, |release_at| min(timer, release_at))
}
/// Wraps `control` in a control packet addressed to the remote socket id,
/// stamped with the receive buffer's timestamp for `now`, and appends it to
/// the outgoing control queue.
fn send_control(&mut self, now: Instant, control: ControlTypes) {
    let timestamp = self.receive_buffer.timestamp_from(now);
    let packet = ControlPacket {
        timestamp,
        dest_sockid: self.settings.remote_sockid,
        control_type: control,
    };
    self.control_packets.push_back(Packet::Control(packet));
}
}